From b6e067119d12cf9231f0783ed7a49c883ec1fc68 Mon Sep 17 00:00:00 2001 From: Matt Devy Date: Tue, 16 Jun 2026 12:31:13 +0100 Subject: [PATCH 1/9] feat(es): add dump helper and bulk-ndjson restore path Add `elastic es helpers dump` for exporting indices as bulk-format NDJSON using PIT + search_after, and extend `bulk-ingest` with a `bulk-ndjson` source format so the dump output can be streamed back into `_bulk`. Use case: capture a remote index for local debugging. Inspired by escli-rs (https://github.com/Anaethelion/escli-rs); ports the dump/load feature set into the elastic/cli helper conventions. --- src/es/helpers/bulk-ingest.ts | 165 ++++++++++-- src/es/helpers/dump.ts | 243 +++++++++++++++++ src/es/helpers/register.ts | 4 +- test/es/helpers/bulk-ingest.test.ts | 191 +++++++++++++ test/es/helpers/dump.test.ts | 403 ++++++++++++++++++++++++++++ 5 files changed, 980 insertions(+), 26 deletions(-) create mode 100644 src/es/helpers/dump.ts create mode 100644 test/es/helpers/dump.test.ts diff --git a/src/es/helpers/bulk-ingest.ts b/src/es/helpers/bulk-ingest.ts index dc54a5d2..40b9a4a2 100644 --- a/src/es/helpers/bulk-ingest.ts +++ b/src/es/helpers/bulk-ingest.ts @@ -28,16 +28,16 @@ export interface BulkIngestDeps { const defaultDeps: BulkIngestDeps = { getEsClient } -const SOURCE_FORMATS = ['ndjson', 'json', 'csv'] as const +const SOURCE_FORMATS = ['ndjson', 'json', 'csv', 'bulk-ndjson'] as const type SourceFormat = typeof SOURCE_FORMATS[number] const inputSchema = z.object({ - index: z.string().describe('Target index'), - data_file: z.string().optional().describe('Path to data file (NDJSON, JSON array, or CSV)'), + index: z.string().optional().describe('Target index (required unless --source-format bulk-ndjson, in which case action lines may carry _index)'), + data_file: z.string().optional().describe('Path to data file (NDJSON, JSON array, CSV, or pre-formatted bulk NDJSON)'), data_dir: z.string().optional().describe('Path to directory of data files to ingest'), glob: z.string().optional().describe('Glob pattern for --data-dir file matching (default: **/*.json, or **/*.csv when --source-format csv)'), no_recursive: z.boolean().optional().describe('Do not recurse into subdirectories when using --data-dir'), - source_format: z.enum(SOURCE_FORMATS).default('ndjson').describe('Input file format: ndjson, json, or csv'), + source_format: z.enum(SOURCE_FORMATS).default('ndjson').describe('Input file format: ndjson (one doc per line), json (JSON array or one doc per line), csv, or bulk-ndjson (already-formatted action+doc line pairs, as produced by `dump`)'), csv_delimiter: z.string().optional().describe('CSV column delimiter (default: ",")'), csv_columns: z.string().optional().describe('Comma-separated list of column names (overrides CSV header row)'), skip_header: z.boolean().optional().describe('Skip the first row of a CSV file'), @@ -52,23 +52,29 @@ const inputSchema = z.object({ type BulkIngestInput = z.infer /** - * Splits an array of documents into batches where each batch's serialized - * size does not exceed the byte threshold. + * Splits items into batches where each batch's serialized size does not exceed + * `flushBytes`. `sizeOf` defaults to the JSON-serialised byte length plus a + * trailing newline; pass a custom callback for items already represented as + * strings (e.g. pre-formatted bulk pairs). */ -function splitIntoBatches (docs: unknown[], flushBytes: number): unknown[][] { - const batches: unknown[][] = [] - let currentBatch: unknown[] = [] +function splitIntoBatches ( + items: T[], + flushBytes: number, + sizeOf: (item: T) => number = (i) => JSON.stringify(i).length + 1, +): T[][] { + const batches: T[][] = [] + let currentBatch: T[] = [] let currentSize = 0 - for (const doc of docs) { - const docSize = JSON.stringify(doc).length + 1 // +1 for newline - if (currentBatch.length > 0 && currentSize + docSize > flushBytes) { + for (const item of items) { + const itemSize = sizeOf(item) + if (currentBatch.length > 0 && currentSize + itemSize > flushBytes) { batches.push(currentBatch) currentBatch = [] currentSize = 0 } - currentBatch.push(doc) - currentSize += docSize + currentBatch.push(item) + currentSize += itemSize } if (currentBatch.length > 0) { batches.push(currentBatch) @@ -94,11 +100,60 @@ function parseByFormat (raw: string, opts: BulkIngestInput): unknown[] { /** Returns the default glob pattern for the given source format. */ function defaultGlob (format: SourceFormat): string { if (format === 'csv') return '**/*.csv' + if (format === 'bulk-ndjson') return '**/*.ndjson' return '**/*.{json,ndjson,jsonl}' } -/** Collects documents from the resolved input source. */ -function collectDocuments (opts: BulkIngestInput): { docs: unknown[], filesProcessed: number } { +const BULK_ACTIONS = new Set(['index', 'create', 'update', 'delete']) + +/** + * Parses raw text containing pre-formatted bulk action+doc line pairs. + * Each pair is returned as an `"action\ndoc"` string ready to be joined into a `_bulk` body. + */ +export function parseBulkNdjsonPairs (raw: string): string[] { + const pairs: string[] = [] + let action: string | undefined + let lineNum = 0 + let nonEmptyCount = 0 + + for (const line of raw.split('\n')) { + lineNum++ + const trimmed = line.trim() + if (trimmed.length === 0) continue + nonEmptyCount++ + + if (action == null) { + let parsed: unknown + try { + parsed = JSON.parse(trimmed) + } catch (err) { + throw new Error(`bulk-ndjson: invalid action line at line ${lineNum}: ${err instanceof Error ? err.message : String(err)}`, { cause: err }) + } + if (parsed == null || typeof parsed !== 'object' || Array.isArray(parsed)) { + throw new Error(`bulk-ndjson: invalid action line at line ${lineNum}: expected an object`) + } + const keys = Object.keys(parsed as Record) + if (keys.length !== 1 || !BULK_ACTIONS.has(keys[0]!)) { + throw new Error(`bulk-ndjson: invalid action line at line ${lineNum}: expected {"index"|"create"|"update"|"delete": ...}, got: ${trimmed.slice(0, 80)}`) + } + action = trimmed + } else { + pairs.push(`${action}\n${trimmed}`) + action = undefined + } + } + + if (action != null) { + throw new Error(`bulk-ndjson: expected an even number of non-empty lines (action + doc pairs), got ${nonEmptyCount}`) + } + return pairs +} + +/** + * Resolves the active input source (file, glob'd directory, or stdin) and returns + * one raw text chunk per source. `filesProcessed` is 0 when reading from stdin. + */ +function resolveRawInputs (opts: BulkIngestInput): { raws: string[], filesProcessed: number } { const { data_file, data_dir } = opts if (data_file != null && data_dir != null) { @@ -113,12 +168,10 @@ function collectDocuments (opts: BulkIngestInput): { docs: unknown[], filesProce if (files.length === 0) { throw new Error(`No files matched pattern "${resolvedPattern}" in ${data_dir}`) } - const allDocs: unknown[] = [] - for (const file of files) { - const raw = readFileSync(file, 'utf-8') - allDocs.push(...parseByFormat(raw, opts)) + return { + raws: files.map((f) => readFileSync(f, 'utf-8')), + filesProcessed: files.length, } - return { docs: allDocs, filesProcessed: files.length } } if (data_file != null) { @@ -126,22 +179,29 @@ function collectDocuments (opts: BulkIngestInput): { docs: unknown[], filesProce if (raw == null || raw.trim().length === 0) { throw new Error('No input data received from file') } - return { docs: parseByFormat(raw, opts), filesProcessed: 1 } + return { raws: [raw], filesProcessed: 1 } } - // Fall back to stdin const raw = readRawInput() if (raw == null || raw.trim().length === 0) { throw new Error('No input provided. Use --data-file, --data-dir, or pipe data to stdin') } - return { docs: parseByFormat(raw, opts), filesProcessed: 0 } + return { raws: [raw], filesProcessed: 0 } +} + +/** Collects documents from the resolved input source. */ +function collectDocuments (opts: BulkIngestInput): { docs: unknown[], filesProcessed: number } { + const { raws, filesProcessed } = resolveRawInputs(opts) + const docs: unknown[] = [] + for (const raw of raws) docs.push(...parseByFormat(raw, opts)) + return { docs, filesProcessed } } /** Sends a single bulk batch to Elasticsearch. Returns the count of errors. */ async function sendBatch ( transport: EsClient, ndjsonBody: string, - index: string + index: string | undefined ): Promise<{ errors: number, total: number }> { const path = index != null ? `/${encodeURIComponent(index)}/_bulk` : '/_bulk' const result = await transport.request( @@ -165,6 +225,10 @@ function createBulkIngestHandler (deps: BulkIngestDeps = defaultDeps) { return async (parsed: { input?: BulkIngestInput; options: Record }): Promise => { const opts = parsed.input! + if (opts.source_format !== 'bulk-ndjson' && opts.index == null) { + return { error: { code: 'input_error', message: '--index is required (omit only when --source-format bulk-ndjson)' } } + } + let transport: EsClient try { transport = deps.getEsClient() @@ -172,6 +236,10 @@ function createBulkIngestHandler (deps: BulkIngestDeps = defaultDeps) { return missingConfigError(err) } + if (opts.source_format === 'bulk-ndjson') { + return runBulkNdjson(opts, transport) + } + let docs: unknown[] let filesProcessed: number try { @@ -229,6 +297,53 @@ function createBulkIngestHandler (deps: BulkIngestDeps = defaultDeps) { } } +/** + * Ingests pre-formatted bulk NDJSON (action + document line pairs, as produced by `dump`). + * Reuses retry, concurrency, and progress reporting from the main flow; the only difference + * is that the input is already bulk-shaped, so each pair is sent through verbatim. + */ +async function runBulkNdjson (opts: BulkIngestInput, transport: EsClient): Promise { + let pairs: string[] + let filesProcessed: number + try { + const resolved = resolveRawInputs(opts) + pairs = resolved.raws.flatMap(parseBulkNdjsonPairs) + filesProcessed = resolved.filesProcessed + } catch (err) { + return { error: { code: 'input_error', message: err instanceof Error ? err.message : String(err) } } + } + + if (pairs.length === 0) { + return { total: 0, succeeded: 0, failed: 0, retries: 0, elapsed_ms: 0 } + } + + const batches = splitIntoBatches(pairs, opts.flush_bytes, (p) => p.length + 1) + const reporter = new ProgressReporter() + reporter.filesProcessed = filesProcessed + + try { + await runWithConcurrency(batches, opts.concurrency, async (batch) => { + const body = batch.join('\n') + '\n' + const result = await retryWithBackoff( + async () => { + const res = await sendBatch(transport, body, opts.index) + if (res.errors > 0 && res.errors === res.total) { + throw new Error(`Bulk batch failed: ${res.errors}/${res.total} errors`) + } + return res + }, + { retries: opts.retries, delay: opts.retry_delay } + ) + reporter.report(result.total, result.errors) + return result + }) + } catch (err) { + return transportError(err) + } + + return reporter.summary() +} + export function createBulkIngestCommand (deps?: BulkIngestDeps): OpaqueCommandHandle { return defineCommand({ name: 'bulk-ingest', diff --git a/src/es/helpers/dump.ts b/src/es/helpers/dump.ts new file mode 100644 index 00000000..534d97e0 --- /dev/null +++ b/src/es/helpers/dump.ts @@ -0,0 +1,243 @@ +/* + * Copyright Elasticsearch B.V. and contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +import { z } from 'zod' +import { closeSync, openSync, writeSync } from 'node:fs' +import type { EsClient } from '../../lib/es-client.ts' +import { defineCommand } from '../../factory.ts' +import type { OpaqueCommandHandle, JsonValue } from '../../factory.ts' +import { getEsClient } from '../../lib/es-client.ts' +import { missingConfigError, transportError } from '../errors.ts' +import { readRawInput } from './shared.ts' + +/** Dependencies injectable for testing. */ +export interface DumpDeps { + getEsClient: () => EsClient + stdout: { write: (chunk: string) => boolean } + stderr: { write: (chunk: string) => boolean } +} + +const defaultDeps: DumpDeps = { + getEsClient, + stdout: process.stdout, + stderr: process.stderr, +} + +const inputSchema = z.object({ + indices: z.string().describe('Comma-separated list of indices to dump'), + size: z.number().default(500).describe('Documents per search batch'), + keep_alive: z.string().default('1m').describe('Point-in-time keep-alive duration'), + output: z.string().optional().describe('Path to write bulk-format NDJSON; omit to stream to stdout'), + skip_index_name: z.boolean().optional().describe('Omit _index from action lines so the dump can be re-targeted at a different index'), + add_id: z.boolean().optional().describe('Include _id in action lines so document IDs round-trip'), + query: z.string().optional().describe('Query DSL clause as a JSON string, e.g. \'{"term":{"status":"active"}}\''), + query_file: z.string().optional().describe('Path to a file containing a Query DSL clause (use - for stdin)'), +}) + +type DumpInput = z.infer + +interface Hit { + _id: string + _source: unknown + sort?: unknown[] +} + +interface SearchResponse { + pit_id?: string + hits?: { hits?: Hit[] } +} + +interface PitResponse { + id?: string +} + +function resolveQuery (input: DumpInput): unknown { + if (input.query != null) { + return JSON.parse(input.query) + } + if (input.query_file != null) { + const raw = readRawInput(input.query_file) + if (raw == null || raw.trim().length === 0) { + throw new Error('--query-file is empty') + } + return JSON.parse(raw) + } + return { match_all: {} } +} + +interface DumpIndexParams { + index: string + query: unknown + size: number + keepAlive: string + write: (chunk: string) => void + skipIndexName: boolean + addId: boolean +} + +async function dumpOneIndex (transport: EsClient, params: DumpIndexParams): Promise { + const { index, query, size, keepAlive, write, skipIndexName, addId } = params + + const pitOpen = await transport.request({ + method: 'POST', + path: `/${encodeURIComponent(index)}/_pit`, + querystring: { keep_alive: keepAlive }, + }) + if (pitOpen.id == null) { + throw new Error(`Failed to open point-in-time for index "${index}"`) + } + + // Action-line prefix is invariant across hits when --add-id is off; precompute it. + // When --add-id is on, only the `_id` field varies, so we still avoid building a + // throwaway object + JSON.stringify per hit. + const indexJson = JSON.stringify(index) + const actionPrefix = addId + ? (skipIndexName ? '{"index":{"_id":' : `{"index":{"_index":${indexJson},"_id":`) + : (skipIndexName ? '{"index":{}}' : `{"index":{"_index":${indexJson}}}`) + const actionSuffix = addId ? '}}' : '' + + let pitId: string = pitOpen.id + let searchAfter: unknown[] | undefined + let total = 0 + + try { + while (true) { + const body: Record = { + size, + pit: { id: pitId, keep_alive: keepAlive }, + query, + sort: [{ _shard_doc: 'asc' }], + } + if (searchAfter != null) body.search_after = searchAfter + + const result = await transport.request({ + method: 'POST', + path: '/_search', + body, + }) + const hits = result.hits?.hits ?? [] + if (hits.length === 0) break + + for (const hit of hits) { + const actionLine = addId + ? actionPrefix + JSON.stringify(hit._id) + actionSuffix + : actionPrefix + write(`${actionLine}\n${JSON.stringify(hit._source)}\n`) + } + total += hits.length + + if (result.pit_id != null) pitId = result.pit_id + const lastSort = hits[hits.length - 1]!.sort + if (lastSort == null || lastSort.length === 0) break + searchAfter = lastSort + } + } finally { + try { + await transport.request({ method: 'DELETE', path: '/_pit', body: { id: pitId } }) + } catch { + // Best-effort cleanup: the PIT will expire naturally after keep_alive. + } + } + + return total +} + +function createDumpHandler (deps: DumpDeps = defaultDeps) { + return async (parsed: { input?: DumpInput, options: Record }): Promise => { + const opts = parsed.input! + const jsonMode = parsed.options['json'] === true + + let transport: EsClient + try { + transport = deps.getEsClient() + } catch (err) { + return missingConfigError(err) + } + + if (jsonMode && opts.output == null) { + return { + error: { + code: 'input_error', + message: '--json requires --output: stats JSON cannot share stdout with streamed NDJSON' + } + } + } + + const indices = opts.indices.split(',').map((s) => s.trim()).filter((s) => s.length > 0) + if (indices.length === 0) { + return { error: { code: 'input_error', message: '--indices must contain at least one index name' } } + } + + let query: unknown + try { + query = resolveQuery(opts) + } catch (err) { + return { + error: { + code: 'input_error', + message: `Failed to parse query: ${err instanceof Error ? err.message : String(err)}` + } + } + } + + const fd = opts.output != null ? openSync(opts.output, 'w') : null + const write = fd != null + ? (chunk: string) => { writeSync(fd, chunk) } + : (chunk: string) => { deps.stdout.write(chunk) } + const perIndex: Array<{ name: string, docs: number }> = [] + const startTime = Date.now() + + try { + for (const index of indices) { + const docs = await dumpOneIndex(transport, { + index, + query, + size: opts.size, + keepAlive: opts.keep_alive, + write, + skipIndexName: opts.skip_index_name === true, + addId: opts.add_id === true, + }) + perIndex.push({ name: index, docs }) + } + } catch (err) { + if (fd != null) closeSync(fd) + return transportError(err) + } + + if (fd != null) closeSync(fd) + + const total = perIndex.reduce((n, e) => n + e.docs, 0) + const elapsed_ms = Date.now() - startTime + deps.stderr.write(`Dumped ${total} document(s) from ${indices.length} index/indices in ${elapsed_ms}ms\n`) + + return { indices: perIndex, total_docs: total, elapsed_ms } + } +} + +export function createDumpCommand (deps?: DumpDeps): OpaqueCommandHandle { + return defineCommand({ + name: 'dump', + description: 'Dump one or more indices as bulk-format NDJSON (action + document line pairs), suitable for re-ingestion via the `_bulk` API.', + input: inputSchema, + handler: createDumpHandler(deps), + intent: { destructive: false, idempotent: true, scope: 'global' }, + formatOutput: (result, parsed) => { + const r = result as Record + if (r.error != null) return JSON.stringify(result, null, 2) + '\n' + // When streaming to stdout, suppress the stats line to keep NDJSON on stdout clean. + const opts = parsed.input as DumpInput | undefined + if (opts?.output == null) return '' + const indices = r.indices as Array<{ name: string, docs: number }> + const lines = [ + `Output: ${opts.output}`, + `Total: ${r.total_docs}`, + `Elapsed: ${r.elapsed_ms}ms`, + ...indices.map((i) => ` ${i.name}: ${i.docs}`), + ] + return lines.join('\n') + '\n' + } + }) +} diff --git a/src/es/helpers/register.ts b/src/es/helpers/register.ts index fd71629c..be1cfaf6 100644 --- a/src/es/helpers/register.ts +++ b/src/es/helpers/register.ts @@ -9,6 +9,7 @@ import { createScrollSearchCommand } from './scroll-search.ts' import { createBulkIngestCommand } from './bulk-ingest.ts' import { createMsearchCommand } from './msearch.ts' import { createWatchCommand } from './watch.ts' +import { createDumpCommand } from './dump.ts' /** * Registers all high-level helper commands under a `helpers` group. @@ -23,6 +24,7 @@ export function registerHelperCommands (): OpaqueCommandHandle { createScrollSearchCommand(), createBulkIngestCommand(), createMsearchCommand(), - createWatchCommand() + createWatchCommand(), + createDumpCommand() ) } diff --git a/test/es/helpers/bulk-ingest.test.ts b/test/es/helpers/bulk-ingest.test.ts index 304f6271..9f83e661 100644 --- a/test/es/helpers/bulk-ingest.test.ts +++ b/test/es/helpers/bulk-ingest.test.ts @@ -421,4 +421,195 @@ describe('bulk-ingest command', () => { assert.strictEqual(doc.score, 3.14) }) }) + + describe('pre-formatted bulk-ndjson ingestion', () => { + it('streams action+doc pairs verbatim to /_bulk without --index', async () => { + const tmpDir = mkdtempSync(join(tmpdir(), 'bulk-ndjson-')) + const filePath = join(tmpDir, 'data.ndjson') + writeFileSync(filePath, + '{"index":{"_index":"src-idx"}}\n{"v":1}\n{"index":{"_index":"src-idx"}}\n{"v":2}\n') + + const { transport, requests } = mockTransport([successResponse(2)]) + + await runCommand([ + '--data-file', filePath, + '--source-format', 'bulk-ndjson', + '--json' + ], makeDeps(transport)) + + assert.equal(requests.length, 1) + assert.equal(requests[0]!.params.path, '/_bulk') + const body = requests[0]!.params.body as string + const lines = body.split('\n').filter((l) => l.length > 0) + assert.equal(lines.length, 4) + assert.deepStrictEqual(JSON.parse(lines[0]!), { index: { _index: 'src-idx' } }) + assert.deepStrictEqual(JSON.parse(lines[1]!), { v: 1 }) + assert.deepStrictEqual(JSON.parse(lines[3]!), { v: 2 }) + }) + + it('routes through /{index}/_bulk when --index is provided', async () => { + const tmpDir = mkdtempSync(join(tmpdir(), 'bulk-ndjson-')) + const filePath = join(tmpDir, 'data.ndjson') + writeFileSync(filePath, '{"index":{}}\n{"v":1}\n') + + const { transport, requests } = mockTransport([successResponse(1)]) + + await runCommand([ + '--index', 'target', + '--data-file', filePath, + '--source-format', 'bulk-ndjson', + '--json' + ], makeDeps(transport)) + + assert.equal(requests[0]!.params.path, '/target/_bulk') + }) + + it('splits pre-formatted pairs into batches by byte size', async () => { + const tmpDir = mkdtempSync(join(tmpdir(), 'bulk-ndjson-')) + const filePath = join(tmpDir, 'data.ndjson') + const lines: string[] = [] + for (let i = 0; i < 100; i++) { + lines.push('{"index":{"_index":"src"}}') + lines.push(JSON.stringify({ id: i, data: 'x'.repeat(100) })) + } + writeFileSync(filePath, lines.join('\n') + '\n') + + const { transport, requests } = mockTransport( + Array.from({ length: 100 }, () => successResponse(1)) + ) + + await runCommand([ + '--data-file', filePath, + '--source-format', 'bulk-ndjson', + '--flush-bytes', '500', + '--json' + ], makeDeps(transport)) + + assert.ok(requests.length > 1, `expected multiple batches, got ${requests.length}`) + }) + + it('errors on an odd number of non-empty lines', async () => { + const tmpDir = mkdtempSync(join(tmpdir(), 'bulk-ndjson-')) + const filePath = join(tmpDir, 'data.ndjson') + writeFileSync(filePath, '{"index":{}}\n{"v":1}\n{"index":{}}\n') + + const { transport } = mockTransport([]) + + const result = await runCommand([ + '--data-file', filePath, + '--source-format', 'bulk-ndjson', + '--json' + ], makeDeps(transport)) as Record + + const err = result.error as Record + assert.equal(err.code, 'input_error') + assert.match(err.message as string, /even number/i) + }) + + it('errors when action line is not {"index|create|update|delete": ...}', async () => { + const tmpDir = mkdtempSync(join(tmpdir(), 'bulk-ndjson-')) + const filePath = join(tmpDir, 'data.ndjson') + writeFileSync(filePath, '{"foo":{}}\n{"v":1}\n') + + const { transport } = mockTransport([]) + + const result = await runCommand([ + '--data-file', filePath, + '--source-format', 'bulk-ndjson', + '--json' + ], makeDeps(transport)) as Record + + const err = result.error as Record + assert.equal(err.code, 'input_error') + }) + + it('reads bulk-ndjson from --data-dir with multiple files', async () => { + const tmpDir = mkdtempSync(join(tmpdir(), 'bulk-ndjson-dir-')) + writeFileSync(join(tmpDir, 'a.ndjson'), '{"index":{}}\n{"v":1}\n') + writeFileSync(join(tmpDir, 'b.ndjson'), '{"index":{}}\n{"v":2}\n') + + const { transport, requests } = mockTransport([successResponse(2)]) + + await runCommand([ + '--index', 'target', + '--data-dir', tmpDir, + '--source-format', 'bulk-ndjson', + '--json' + ], makeDeps(transport)) + + assert.equal(requests.length, 1) + const body = requests[0]!.params.body as string + assert.match(body, /"v":1/) + assert.match(body, /"v":2/) + }) + + it('errors when --data-file and --data-dir are both provided', async () => { + const tmpDir = mkdtempSync(join(tmpdir(), 'bulk-ndjson-')) + writeFileSync(join(tmpDir, 'a.ndjson'), '{"index":{}}\n{"v":1}\n') + + const { transport } = mockTransport([]) + + const result = await runCommand([ + '--index', 'target', + '--data-file', join(tmpDir, 'a.ndjson'), + '--data-dir', tmpDir, + '--source-format', 'bulk-ndjson', + '--json' + ], makeDeps(transport)) as Record + + const err = result.error as Record + assert.equal(err.code, 'input_error') + assert.match(err.message as string, /only one input source/i) + }) + + it('errors when --data-dir matches no files', async () => { + const tmpDir = mkdtempSync(join(tmpdir(), 'bulk-ndjson-empty-')) + + const { transport } = mockTransport([]) + + const result = await runCommand([ + '--data-dir', tmpDir, + '--source-format', 'bulk-ndjson', + '--json' + ], makeDeps(transport)) as Record + + const err = result.error as Record + assert.equal(err.code, 'input_error') + assert.match(err.message as string, /No files matched/) + }) + + it('returns empty summary when --data-file has no content', async () => { + const tmpDir = mkdtempSync(join(tmpdir(), 'bulk-ndjson-empty-')) + const filePath = join(tmpDir, 'empty.ndjson') + writeFileSync(filePath, '') + + const { transport } = mockTransport([]) + + const result = await runCommand([ + '--data-file', filePath, + '--source-format', 'bulk-ndjson', + '--json' + ], makeDeps(transport)) as Record + + const err = result.error as Record + assert.equal(err.code, 'input_error') + }) + + it('still requires --index for non-bulk-ndjson source formats', async () => { + const tmpDir = mkdtempSync(join(tmpdir(), 'bulk-ndjson-')) + const filePath = join(tmpDir, 'data.json') + writeFileSync(filePath, '[{"v":1}]') + + const { transport } = mockTransport([successResponse(1)]) + + const result = await runCommand([ + '--data-file', filePath, + '--json' + ], makeDeps(transport)) as Record + + const err = result.error as Record + assert.equal(err.code, 'input_error') + assert.match(err.message as string, /--index is required/) + }) + }) }) diff --git a/test/es/helpers/dump.test.ts b/test/es/helpers/dump.test.ts new file mode 100644 index 00000000..e1196603 --- /dev/null +++ b/test/es/helpers/dump.test.ts @@ -0,0 +1,403 @@ +/* + * Copyright Elasticsearch B.V. and contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +import { describe, it } from 'node:test' +import assert from 'node:assert/strict' +import { mkdtempSync, writeFileSync, readFileSync } from 'node:fs' +import { join } from 'node:path' +import { tmpdir } from 'node:os' +import type { EsClient, EsRequestParams } from '../../../src/lib/es-client.ts' +import { createDumpCommand } from '../../../src/es/helpers/dump.ts' +import type { DumpDeps } from '../../../src/es/helpers/dump.ts' +import { _testSetStdinReader } from '../../../src/factory.ts' +import { Command } from 'commander' + +interface PitResponse { id: string } +interface Hit { _id: string; _source: unknown; sort: unknown[] } +interface SearchResponse { pit_id: string; hits: { hits: Hit[] } } + +type MockResponse = PitResponse | SearchResponse | Record + +function mockTransport (responses: MockResponse[]): { + transport: EsClient + requests: Array<{ params: EsRequestParams }> +} { + const requests: Array<{ params: EsRequestParams }> = [] + let callIndex = 0 + const transport = { + request: async (params: EsRequestParams) => { + requests.push({ params }) + const response = responses[callIndex] ?? {} + callIndex++ + return response + } + } as unknown as EsClient + return { transport, requests } +} + +function captureIO (): { stdout: { write: (s: string) => boolean, chunks: string[] }, stderr: { write: (s: string) => boolean, chunks: string[] } } { + const stdoutChunks: string[] = [] + const stderrChunks: string[] = [] + return { + stdout: { chunks: stdoutChunks, write: (s) => { stdoutChunks.push(s); return true } }, + stderr: { chunks: stderrChunks, write: (s) => { stderrChunks.push(s); return true } } + } +} + +function makeDeps (transport: EsClient, io?: ReturnType): DumpDeps { + const channel = io ?? captureIO() + return { getEsClient: () => transport, stdout: channel.stdout, stderr: channel.stderr } +} + +async function runCommand (args: string[], deps: DumpDeps): Promise<{ stdout: string, stderr: string, programOut: string, programErr: string }> { + const cmd = createDumpCommand(deps) + const program = new Command() + program.exitOverride() + program.option('--json', 'output as JSON') + program.addCommand(cmd) + + const origStdoutWrite = process.stdout.write.bind(process.stdout) + const origStderrWrite = process.stderr.write.bind(process.stderr) + const progOut: string[] = [] + const progErr: string[] = [] + process.stdout.write = ((chunk: string) => { progOut.push(typeof chunk === 'string' ? chunk : chunk.toString()); return true }) as typeof process.stdout.write + process.stderr.write = ((chunk: string) => { progErr.push(typeof chunk === 'string' ? chunk : chunk.toString()); return true }) as typeof process.stderr.write + + const restoreStdin = _testSetStdinReader(() => '') + try { + await program.parseAsync(['node', 'test', 'dump', ...args]) + } finally { + restoreStdin() + process.stdout.write = origStdoutWrite + process.stderr.write = origStderrWrite + process.exitCode = 0 + } + + const depStdout = (deps.stdout as { chunks: string[] }).chunks ?? [] + const depStderr = (deps.stderr as { chunks: string[] }).chunks ?? [] + return { + stdout: depStdout.join(''), + stderr: depStderr.join(''), + programOut: progOut.join(''), + programErr: progErr.join('') + } +} + +describe('dump command', () => { + it('creates a command named dump', () => { + const { transport } = mockTransport([]) + const cmd = createDumpCommand(makeDeps(transport)) + assert.equal(cmd.name(), 'dump') + }) + + it('opens a PIT, paginates with search_after, and streams bulk-format NDJSON to stdout', async () => { + const io = captureIO() + const { transport, requests } = mockTransport([ + // open_point_in_time + { id: 'pit-1' }, + // initial search + { + pit_id: 'pit-2', + hits: { hits: [ + { _id: 'a', _source: { v: 1 }, sort: [10] }, + { _id: 'b', _source: { v: 2 }, sort: [20] }, + ] } + }, + // second page + { + pit_id: 'pit-3', + hits: { hits: [ + { _id: 'c', _source: { v: 3 }, sort: [30] }, + ] } + }, + // empty page + { pit_id: 'pit-3', hits: { hits: [] } }, + // close_point_in_time + {} + ]) + + await runCommand(['--indices', 'my-idx'], makeDeps(transport, io)) + + const lines = io.stdout.chunks.join('').split('\n').filter((l) => l.length > 0) + assert.equal(lines.length, 6, `expected 6 NDJSON lines (3 action + 3 doc), got: ${lines.length}`) + assert.deepStrictEqual(JSON.parse(lines[0]!), { index: { _index: 'my-idx' } }) + assert.deepStrictEqual(JSON.parse(lines[1]!), { v: 1 }) + assert.deepStrictEqual(JSON.parse(lines[2]!), { index: { _index: 'my-idx' } }) + assert.deepStrictEqual(JSON.parse(lines[3]!), { v: 2 }) + assert.deepStrictEqual(JSON.parse(lines[4]!), { index: { _index: 'my-idx' } }) + assert.deepStrictEqual(JSON.parse(lines[5]!), { v: 3 }) + + // PIT open + 3 searches + PIT close = 5 + assert.equal(requests.length, 5) + assert.equal(requests[0]!.params.method, 'POST') + assert.ok(requests[0]!.params.path.includes('_pit')) + + const initialBody = requests[1]!.params.body as Record + assert.equal(initialBody.size, 500) + assert.deepStrictEqual(initialBody.query, { match_all: {} }) + assert.deepStrictEqual(initialBody.sort, [{ _shard_doc: 'asc' }]) + assert.equal((initialBody.pit as { id: string }).id, 'pit-1') + + const secondBody = requests[2]!.params.body as Record + assert.deepStrictEqual(secondBody.search_after, [20]) + assert.equal((secondBody.pit as { id: string }).id, 'pit-2') + + const closeReq = requests[4]! + assert.equal(closeReq.params.method, 'DELETE') + assert.ok(closeReq.params.path.includes('_pit')) + assert.deepStrictEqual(closeReq.params.body, { id: 'pit-3' }) + }) + + it('respects --size and --keep-alive', async () => { + const io = captureIO() + const { transport, requests } = mockTransport([ + { id: 'pit-1' }, + { pit_id: 'pit-1', hits: { hits: [] } }, + {} + ]) + + await runCommand(['--indices', 'idx', '--size', '50', '--keep-alive', '5m'], makeDeps(transport, io)) + + const pitOpen = requests[0]!.params + assert.equal((pitOpen.querystring as Record).keep_alive, '5m') + + const search = requests[1]!.params.body as Record + assert.equal(search.size, 50) + assert.equal((search.pit as { keep_alive: string }).keep_alive, '5m') + }) + + it('emits {"index":{}} when --skip-index-name is set', async () => { + const io = captureIO() + const { transport } = mockTransport([ + { id: 'pit-1' }, + { pit_id: 'pit-1', hits: { hits: [{ _id: 'a', _source: { v: 1 }, sort: [1] }] } }, + { pit_id: 'pit-1', hits: { hits: [] } }, + {} + ]) + + await runCommand(['--indices', 'my-idx', '--skip-index-name'], makeDeps(transport, io)) + + const lines = io.stdout.chunks.join('').split('\n').filter((l) => l.length > 0) + assert.deepStrictEqual(JSON.parse(lines[0]!), { index: {} }) + assert.deepStrictEqual(JSON.parse(lines[1]!), { v: 1 }) + }) + + it('emits _id in action lines when --add-id is set', async () => { + const io = captureIO() + const { transport } = mockTransport([ + { id: 'pit-1' }, + { pit_id: 'pit-1', hits: { hits: [{ _id: 'a', _source: { v: 1 }, sort: [1] }] } }, + { pit_id: 'pit-1', hits: { hits: [] } }, + {} + ]) + + await runCommand(['--indices', 'my-idx', '--add-id'], makeDeps(transport, io)) + + const lines = io.stdout.chunks.join('').split('\n').filter((l) => l.length > 0) + const action = JSON.parse(lines[0]!) as { index: Record } + assert.equal(action.index._id, 'a') + assert.equal(action.index._index, 'my-idx') + }) + + it('reads --query JSON from a file', async () => { + const tmpDir = mkdtempSync(join(tmpdir(), 'dump-test-')) + const queryPath = join(tmpDir, 'q.json') + writeFileSync(queryPath, '{"term":{"status":"active"}}') + + const io = captureIO() + const { transport, requests } = mockTransport([ + { id: 'pit-1' }, + { pit_id: 'pit-1', hits: { hits: [] } }, + {} + ]) + + await runCommand(['--indices', 'idx', '--query-file', queryPath], makeDeps(transport, io)) + + const body = requests[1]!.params.body as Record + assert.deepStrictEqual(body.query, { term: { status: 'active' } }) + }) + + it('reads --query JSON inline', async () => { + const io = captureIO() + const { transport, requests } = mockTransport([ + { id: 'pit-1' }, + { pit_id: 'pit-1', hits: { hits: [] } }, + {} + ]) + + await runCommand(['--indices', 'idx', '--query', '{"match":{"f":"v"}}'], makeDeps(transport, io)) + + const body = requests[1]!.params.body as Record + assert.deepStrictEqual(body.query, { match: { f: 'v' } }) + }) + + it('writes NDJSON to --output file when provided', async () => { + const tmpDir = mkdtempSync(join(tmpdir(), 'dump-test-')) + const outPath = join(tmpDir, 'out.ndjson') + + const io = captureIO() + const { transport } = mockTransport([ + { id: 'pit-1' }, + { pit_id: 'pit-1', hits: { hits: [{ _id: 'a', _source: { v: 1 }, sort: [1] }] } }, + { pit_id: 'pit-1', hits: { hits: [] } }, + {} + ]) + + await runCommand(['--indices', 'idx', '--output', outPath], makeDeps(transport, io)) + + const fileContent = readFileSync(outPath, 'utf-8') + const lines = fileContent.split('\n').filter((l) => l.length > 0) + assert.equal(lines.length, 2) + assert.deepStrictEqual(JSON.parse(lines[0]!), { index: { _index: 'idx' } }) + assert.deepStrictEqual(JSON.parse(lines[1]!), { v: 1 }) + + // NDJSON should not be in deps.stdout when --output is set + assert.equal(io.stdout.chunks.length, 0) + }) + + it('dumps multiple indices with separate PITs', async () => { + const io = captureIO() + const { transport, requests } = mockTransport([ + // idx1 + { id: 'pit-1' }, + { pit_id: 'pit-1', hits: { hits: [{ _id: 'a', _source: { i: 1 }, sort: [1] }] } }, + { pit_id: 'pit-1', hits: { hits: [] } }, + {}, + // idx2 + { id: 'pit-2' }, + { pit_id: 'pit-2', hits: { hits: [{ _id: 'b', _source: { i: 2 }, sort: [1] }] } }, + { pit_id: 'pit-2', hits: { hits: [] } }, + {} + ]) + + await runCommand(['--indices', 'idx1,idx2'], makeDeps(transport, io)) + + const lines = io.stdout.chunks.join('').split('\n').filter((l) => l.length > 0) + assert.equal(lines.length, 4) + assert.deepStrictEqual(JSON.parse(lines[0]!), { index: { _index: 'idx1' } }) + assert.deepStrictEqual(JSON.parse(lines[2]!), { index: { _index: 'idx2' } }) + + // pit-1 open targets idx1 + assert.ok(requests[0]!.params.path.includes('idx1')) + // pit-2 open targets idx2 + assert.ok(requests[4]!.params.path.includes('idx2')) + }) + + it('closes the PIT on transport error mid-pagination', async () => { + const io = captureIO() + let deleteCount = 0 + let searchCount = 0 + const transport = { + request: async (params: EsRequestParams) => { + if (params.method === 'DELETE' && params.path.includes('_pit')) { + deleteCount++ + return {} + } + if (params.path.includes('_pit') && params.method === 'POST') { + return { id: 'pit-err' } + } + if (params.path.includes('_search')) { + searchCount++ + if (searchCount === 1) { + return { pit_id: 'pit-err', hits: { hits: [{ _id: 'a', _source: { v: 1 }, sort: [1] }] } } + } + throw new Error('boom') + } + return {} + } + } as unknown as EsClient + + await runCommand(['--indices', 'idx'], makeDeps(transport, io)) + + assert.equal(deleteCount, 1, 'PIT must be closed on transport error') + }) + + it('returns missing_config error when ES is not configured', async () => { + const io = captureIO() + const deps: DumpDeps = { + getEsClient: () => { throw new Error('missing_config: no ES configured') }, + stdout: io.stdout, + stderr: io.stderr + } + + const { programOut, programErr } = await runCommand(['--indices', 'idx', '--json'], deps) + const payload = JSON.parse((programErr.trim() || programOut.trim())) as Record + const err = payload.error as Record + assert.equal(err.code, 'missing_config') + }) + + it('errors when --query-file is empty', async () => { + const tmpDir = mkdtempSync(join(tmpdir(), 'dump-test-')) + const queryPath = join(tmpDir, 'empty.json') + writeFileSync(queryPath, '') + + const io = captureIO() + const { transport } = mockTransport([]) + const { programOut, programErr } = await runCommand( + ['--indices', 'idx', '--query-file', queryPath, '--output', join(tmpDir, 'out.ndjson'), '--json'], + makeDeps(transport, io) + ) + const payload = JSON.parse(programErr.trim() || programOut.trim()) as Record + const err = payload.error as Record + assert.equal(err.code, 'input_error') + }) + + it('returns transport_error when PIT response is missing id', async () => { + const io = captureIO() + const { transport } = mockTransport([ + // PIT response without id + {}, + ]) + const tmpDir = mkdtempSync(join(tmpdir(), 'dump-test-')) + const { programOut, programErr } = await runCommand( + ['--indices', 'idx', '--output', join(tmpDir, 'out.ndjson'), '--json'], + makeDeps(transport, io) + ) + const payload = JSON.parse(programErr.trim() || programOut.trim()) as Record + const err = payload.error as Record + assert.equal(err.code, 'transport_error') + }) + + it('rejects --json when --output is not provided', async () => { + const io = captureIO() + const { transport } = mockTransport([]) + const { programOut, programErr } = await runCommand( + ['--indices', 'idx', '--json'], + makeDeps(transport, io) + ) + const payload = JSON.parse(programErr.trim() || programOut.trim()) as Record + const err = payload.error as Record + assert.equal(err.code, 'input_error') + assert.match(err.message as string, /--json requires --output/) + }) + + it('stops paginating when the last hit has no sort value', async () => { + const io = captureIO() + const { transport } = mockTransport([ + { id: 'pit-1' }, + // single page, hit lacks `sort` → loop must break instead of dereferencing undefined + { pit_id: 'pit-1', hits: { hits: [{ _id: 'a', _source: { v: 1 } }] } }, + {} + ]) + await runCommand(['--indices', 'idx'], makeDeps(transport, io)) + const lines = io.stdout.chunks.join('').split('\n').filter((l) => l.length > 0) + assert.equal(lines.length, 2) + }) + + it('writes a summary line to stderr', async () => { + const io = captureIO() + const { transport } = mockTransport([ + { id: 'pit-1' }, + { pit_id: 'pit-1', hits: { hits: [{ _id: 'a', _source: { v: 1 }, sort: [1] }] } }, + { pit_id: 'pit-1', hits: { hits: [] } }, + {} + ]) + + await runCommand(['--indices', 'idx'], makeDeps(transport, io)) + + const text = io.stderr.chunks.join('') + assert.ok(/1\s+document/i.test(text) || text.includes('1'), `expected stderr to mention 1 document, got: ${text}`) + }) +}) From f83f44bb6cf6774f76e3f75e90712559d9809bac Mon Sep 17 00:00:00 2001 From: Matt Devy Date: Tue, 16 Jun 2026 12:35:05 +0100 Subject: [PATCH 2/9] chore: regenerate NOTICE.txt for yaml@2.8.4 --- NOTICE.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/NOTICE.txt b/NOTICE.txt index adf8baa9..7519db8c 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -1133,7 +1133,7 @@ THIS SOFTWARE. ------------------------------------------------------------------------ -yaml@2.9.0 +yaml@2.8.4 License: ISC Repository: https://github.com/eemeli/yaml Publisher: Eemeli Aro From d4c3a11743c735e3aa068f1b425a24525d7741f8 Mon Sep 17 00:00:00 2001 From: Matt Devy Date: Tue, 16 Jun 2026 12:52:22 +0100 Subject: [PATCH 3/9] chore: regenerate NOTICE.txt for commander@15.0.0 Supersedes the previous regeneration commit, which ran against a locally-symlinked node_modules with a stale yaml version. The actual drift on main was commander 14 -> 15 from #410, which had updated the lockfile but not NOTICE.txt. --- NOTICE.txt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/NOTICE.txt b/NOTICE.txt index 7519db8c..11184980 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -377,7 +377,7 @@ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLI ------------------------------------------------------------------------ -commander@14.0.3 +commander@15.0.0 License: MIT Repository: https://github.com/tj/commander.js Publisher: TJ Holowaychuk @@ -1133,7 +1133,7 @@ THIS SOFTWARE. ------------------------------------------------------------------------ -yaml@2.8.4 +yaml@2.9.0 License: ISC Repository: https://github.com/eemeli/yaml Publisher: Eemeli Aro From 3fd89fd895dddff21f26c6b0d033f0bfd5b6c3c6 Mon Sep 17 00:00:00 2001 From: Matt Devy Date: Tue, 16 Jun 2026 12:53:27 +0100 Subject: [PATCH 4/9] docs: document dump helper and bulk-ndjson restore in README Adds a "Dump and restore an index" subsection under the `es` section with the round-trip example, a flag reference for `dump`, and a note on the new `bulk-ingest --source-format bulk-ndjson` mode. --- README.md | 44 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/README.md b/README.md index 73dcca11..dc0a09fd 100644 --- a/README.md +++ b/README.md @@ -369,6 +369,50 @@ elastic es update --index my-index --id abc123 Run `elastic es --help` for all available options on any command. +##### Dump and restore an index + +`elastic es helpers dump` exports one or more indices as bulk-format NDJSON +(action + `_source` line pairs) using a per-index Point-in-Time and +`search_after` sorted by `_shard_doc` for a consistent read. The output is +shaped so it can be piped or fed straight into `elastic es helpers bulk-ingest` +with `--source-format bulk-ndjson`. + +Typical use case: capture a remote index for local debugging. + +```bash +# Export from the remote cluster, omit _index so the dump can be re-targeted, +# and filter by a Query DSL clause +elastic --use-context remote es helpers dump \ + --indices my-prod-idx \ + --skip-index-name \ + --query '{"range":{"@timestamp":{"gte":"now-1h"}}}' \ + --output dump.ndjson + +# Re-ingest into the local cluster under a new index name +elastic --use-context local es helpers bulk-ingest \ + --source-format bulk-ndjson \ + --index local-copy \ + --data-file dump.ndjson +``` + +Selected `dump` options (run `elastic es helpers dump --help` for the full set): + +| Option | Description | +|---|---| +| `--indices ` | Comma-separated list of indices to dump (required) | +| `--size ` | Documents per search batch (default `500`) | +| `--keep-alive ` | Point-in-time keep-alive (default `1m`) | +| `--output ` | Write NDJSON to a file; omit to stream to stdout | +| `--skip-index-name` | Omit `_index` from action lines so the dump can be re-targeted | +| `--add-id` | Include `_id` in action lines so document IDs round-trip | +| `--query ` | Query DSL clause as inline JSON | +| `--query-file ` | Path to a file containing a Query DSL clause (use `-` for stdin) | + +The companion `bulk-ingest --source-format bulk-ndjson` mode streams +pre-formatted action+doc line pairs verbatim into `_bulk`. `--index` is +optional in this mode: when omitted, requests go to `/_bulk` and the action +lines must carry `_index`. + #### `kb` - Kibana API Run Kibana API calls. Commands are organised by namespace (e.g. `data-views`, From 4a1cc1f13c5a7d91888acd19ff1bb9d82e70913f Mon Sep 17 00:00:00 2001 From: Matt Devy Date: Wed, 17 Jun 2026 13:42:09 +0100 Subject: [PATCH 5/9] feat(es): close PIT and flush output on SIGINT/SIGTERM mid-dump Track the active PIT id and output fd in mutable refs visible to a SIGINT/SIGTERM handler that releases both before the process exits with code 130. The per-index `finally` and the signal handler share the same refs (and null them eagerly) so the two cleanup paths can't race into a double-close. Listeners are removed when the handler returns. Addresses Anaethelion review feedback on PR #415. --- src/es/helpers/dump.ts | 53 +++++++++++++++++++++++++- test/es/helpers/dump.test.ts | 72 +++++++++++++++++++++++++++++++++++- 2 files changed, 121 insertions(+), 4 deletions(-) diff --git a/src/es/helpers/dump.ts b/src/es/helpers/dump.ts index 534d97e0..cc4512d5 100644 --- a/src/es/helpers/dump.ts +++ b/src/es/helpers/dump.ts @@ -67,6 +67,12 @@ function resolveQuery (input: DumpInput): unknown { return { match_all: {} } } +/** Mutable handle to the resources a SIGINT/SIGTERM cleanup needs to release. */ +interface AbortRefs { + pitId: string | null + fd: number | null +} + interface DumpIndexParams { index: string query: unknown @@ -75,10 +81,31 @@ interface DumpIndexParams { write: (chunk: string) => void skipIndexName: boolean addId: boolean + abortRefs: AbortRefs +} + +/** + * Best-effort cleanup of resources held by an in-flight dump. Used both by the + * SIGINT/SIGTERM handlers and (indirectly, via per-index `finally`) by the + * normal completion path. Idempotent: nulling each ref after release prevents + * double-close races between the signal handler and the per-index finally. + */ +export async function abortDump (transport: EsClient, refs: AbortRefs): Promise { + if (refs.fd != null) { + try { closeSync(refs.fd) } catch { /* best effort */ } + refs.fd = null + } + if (refs.pitId != null) { + const id = refs.pitId + refs.pitId = null + try { + await transport.request({ method: 'DELETE', path: '/_pit', body: { id } }) + } catch { /* best effort */ } + } } async function dumpOneIndex (transport: EsClient, params: DumpIndexParams): Promise { - const { index, query, size, keepAlive, write, skipIndexName, addId } = params + const { index, query, size, keepAlive, write, skipIndexName, addId, abortRefs } = params const pitOpen = await transport.request({ method: 'POST', @@ -88,6 +115,7 @@ async function dumpOneIndex (transport: EsClient, params: DumpIndexParams): Prom if (pitOpen.id == null) { throw new Error(`Failed to open point-in-time for index "${index}"`) } + abortRefs.pitId = pitOpen.id // Action-line prefix is invariant across hits when --add-id is off; precompute it. // When --add-id is on, only the `_id` field varies, so we still avoid building a @@ -128,12 +156,16 @@ async function dumpOneIndex (transport: EsClient, params: DumpIndexParams): Prom } total += hits.length - if (result.pit_id != null) pitId = result.pit_id + if (result.pit_id != null) { + pitId = result.pit_id + abortRefs.pitId = result.pit_id + } const lastSort = hits[hits.length - 1]!.sort if (lastSort == null || lastSort.length === 0) break searchAfter = lastSort } } finally { + if (abortRefs.pitId === pitId) abortRefs.pitId = null try { await transport.request({ method: 'DELETE', path: '/_pit', body: { id: pitId } }) } catch { @@ -189,6 +221,18 @@ function createDumpHandler (deps: DumpDeps = defaultDeps) { const perIndex: Array<{ name: string, docs: number }> = [] const startTime = Date.now() + // Refs shared with the signal handler so SIGINT/SIGTERM mid-dump still + // close the active PIT and flush the output fd before the process exits. + const abortRefs: AbortRefs = { pitId: null, fd } + const onAbort = (): void => { + abortDump(transport, abortRefs).finally(() => { + // 130 = 128 + SIGINT; matches what a shell reports for Ctrl+C. + process.exit(130) + }) + } + process.on('SIGINT', onAbort) + process.on('SIGTERM', onAbort) + try { for (const index of indices) { const docs = await dumpOneIndex(transport, { @@ -199,15 +243,20 @@ function createDumpHandler (deps: DumpDeps = defaultDeps) { write, skipIndexName: opts.skip_index_name === true, addId: opts.add_id === true, + abortRefs, }) perIndex.push({ name: index, docs }) } } catch (err) { if (fd != null) closeSync(fd) + process.off('SIGINT', onAbort) + process.off('SIGTERM', onAbort) return transportError(err) } if (fd != null) closeSync(fd) + process.off('SIGINT', onAbort) + process.off('SIGTERM', onAbort) const total = perIndex.reduce((n, e) => n + e.docs, 0) const elapsed_ms = Date.now() - startTime diff --git a/test/es/helpers/dump.test.ts b/test/es/helpers/dump.test.ts index e1196603..6885dfac 100644 --- a/test/es/helpers/dump.test.ts +++ b/test/es/helpers/dump.test.ts @@ -5,11 +5,11 @@ import { describe, it } from 'node:test' import assert from 'node:assert/strict' -import { mkdtempSync, writeFileSync, readFileSync } from 'node:fs' +import { mkdtempSync, writeFileSync, readFileSync, openSync, closeSync, fstatSync } from 'node:fs' import { join } from 'node:path' import { tmpdir } from 'node:os' import type { EsClient, EsRequestParams } from '../../../src/lib/es-client.ts' -import { createDumpCommand } from '../../../src/es/helpers/dump.ts' +import { createDumpCommand, abortDump } from '../../../src/es/helpers/dump.ts' import type { DumpDeps } from '../../../src/es/helpers/dump.ts' import { _testSetStdinReader } from '../../../src/factory.ts' import { Command } from 'commander' @@ -401,3 +401,71 @@ describe('dump command', () => { assert.ok(/1\s+document/i.test(text) || text.includes('1'), `expected stderr to mention 1 document, got: ${text}`) }) }) + +describe('abortDump (SIGINT/SIGTERM cleanup)', () => { + it('closes the open fd and DELETEs the active PIT', async () => { + const tmpDir = mkdtempSync(join(tmpdir(), 'dump-abort-')) + const fd = openSync(join(tmpDir, 'partial.ndjson'), 'w') + + const requests: Array<{ params: EsRequestParams }> = [] + const transport = { + request: async (params: EsRequestParams) => { + requests.push({ params }) + return {} + } + } as unknown as EsClient + + const refs = { pitId: 'pit-mid-dump', fd } + await abortDump(transport, refs) + + assert.equal(refs.pitId, null) + assert.equal(refs.fd, null) + assert.equal(requests.length, 1) + assert.equal(requests[0]!.params.method, 'DELETE') + assert.ok(requests[0]!.params.path.includes('_pit')) + assert.deepStrictEqual(requests[0]!.params.body, { id: 'pit-mid-dump' }) + + // fd should be closed: fstatSync on a closed fd throws + assert.throws(() => fstatSync(fd)) + }) + + it('is a no-op when no resources are active', async () => { + const requests: Array<{ params: EsRequestParams }> = [] + const transport = { + request: async (params: EsRequestParams) => { requests.push({ params }); return {} } + } as unknown as EsClient + + const refs = { pitId: null, fd: null } + await abortDump(transport, refs) + assert.equal(requests.length, 0) + }) + + it('swallows DELETE failures (best effort)', async () => { + const transport = { + request: async () => { throw new Error('network down') } + } as unknown as EsClient + const refs = { pitId: 'pit-x', fd: null } + await assert.doesNotReject(() => abortDump(transport, refs)) + assert.equal(refs.pitId, null) + }) + + it('clears pitId before awaiting DELETE so concurrent aborts do not double-close', async () => { + let resolveRequest: () => void = () => {} + const requests: EsRequestParams[] = [] + const transport = { + request: async (params: EsRequestParams) => { + requests.push(params) + await new Promise((r) => { resolveRequest = r }) + return {} + } + } as unknown as EsClient + + const refs = { pitId: 'pit-1', fd: null } + const first = abortDump(transport, refs) + // While the first DELETE is in flight, a second abort must not re-issue it. + const second = abortDump(transport, refs) + resolveRequest() + await Promise.all([first, second]) + assert.equal(requests.length, 1) + }) +}) From d6896ffd14ffb2e7692b17a9863758ef3048714f Mon Sep 17 00:00:00 2001 From: Matt Devy Date: Wed, 17 Jun 2026 13:49:12 +0100 Subject: [PATCH 6/9] fix(test): remove unused closeSync import in dump.test.ts --- test/es/helpers/dump.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/es/helpers/dump.test.ts b/test/es/helpers/dump.test.ts index 6885dfac..f59210af 100644 --- a/test/es/helpers/dump.test.ts +++ b/test/es/helpers/dump.test.ts @@ -5,7 +5,7 @@ import { describe, it } from 'node:test' import assert from 'node:assert/strict' -import { mkdtempSync, writeFileSync, readFileSync, openSync, closeSync, fstatSync } from 'node:fs' +import { mkdtempSync, writeFileSync, readFileSync, openSync, fstatSync } from 'node:fs' import { join } from 'node:path' import { tmpdir } from 'node:os' import type { EsClient, EsRequestParams } from '../../../src/lib/es-client.ts' From 128df0586d787701df194111ed29e1855c862bc7 Mon Sep 17 00:00:00 2001 From: Matt Devy Date: Fri, 19 Jun 2026 12:51:28 +0100 Subject: [PATCH 7/9] fix(es): address PR review feedback on dump/bulk-ndjson - dump: --query-file '-' now reads from stdin instead of trying to open a file literally named '-'. - dump: batch per-page hits into a single write to cut writeSync calls from O(docs) to O(pages); on a multi-million-doc dump this is syscall-bound vs network-bound. - bulk-ndjson: restrict accepted actions to `index` and `create`. The pair parser assumes every action is followed by a doc line, which is not true for `delete` (no doc) or `update` (needs `{"doc": ...}` envelope). The producer this format is designed for (dump) only emits `index`, so rejecting the others is safer than silently corrupting input. - bulk-ndjson: apply --pipeline and --routing as URL query params so they affect every action in the batch without rewriting pre-formatted action lines. - docs: move the long-form dump-and-restore guide to docs/cli/stack/es/ helpers/dump-and-restore.md; README links to it. Addresses JoshMock review on PR #415. --- README.md | 47 +++------- docs/cli/stack/es/helpers/dump-and-restore.md | 85 +++++++++++++++++++ src/es/helpers/bulk-ingest.ts | 30 +++++-- src/es/helpers/dump.ts | 13 ++- test/es/helpers/bulk-ingest.test.ts | 43 +++++++++- 5 files changed, 173 insertions(+), 45 deletions(-) create mode 100644 docs/cli/stack/es/helpers/dump-and-restore.md diff --git a/README.md b/README.md index dc0a09fd..f342600a 100644 --- a/README.md +++ b/README.md @@ -369,49 +369,22 @@ elastic es update --index my-index --id abc123 Run `elastic es --help` for all available options on any command. -##### Dump and restore an index +##### Helpers: dump and restore -`elastic es helpers dump` exports one or more indices as bulk-format NDJSON -(action + `_source` line pairs) using a per-index Point-in-Time and -`search_after` sorted by `_shard_doc` for a consistent read. The output is -shaped so it can be piped or fed straight into `elastic es helpers bulk-ingest` -with `--source-format bulk-ndjson`. - -Typical use case: capture a remote index for local debugging. +`elastic es helpers dump` exports indices as bulk-format NDJSON using a +Point-in-Time + `search_after`, and `elastic es helpers bulk-ingest +--source-format bulk-ndjson` re-ingests that output verbatim. Typical use +case: capture a remote index for local debugging. ```bash -# Export from the remote cluster, omit _index so the dump can be re-targeted, -# and filter by a Query DSL clause -elastic --use-context remote es helpers dump \ - --indices my-prod-idx \ - --skip-index-name \ - --query '{"range":{"@timestamp":{"gte":"now-1h"}}}' \ - --output dump.ndjson - -# Re-ingest into the local cluster under a new index name +elastic --use-context remote es helpers dump --indices my-prod-idx \ + --skip-index-name --output dump.ndjson elastic --use-context local es helpers bulk-ingest \ - --source-format bulk-ndjson \ - --index local-copy \ - --data-file dump.ndjson + --source-format bulk-ndjson --index local-copy --data-file dump.ndjson ``` -Selected `dump` options (run `elastic es helpers dump --help` for the full set): - -| Option | Description | -|---|---| -| `--indices ` | Comma-separated list of indices to dump (required) | -| `--size ` | Documents per search batch (default `500`) | -| `--keep-alive ` | Point-in-time keep-alive (default `1m`) | -| `--output ` | Write NDJSON to a file; omit to stream to stdout | -| `--skip-index-name` | Omit `_index` from action lines so the dump can be re-targeted | -| `--add-id` | Include `_id` in action lines so document IDs round-trip | -| `--query ` | Query DSL clause as inline JSON | -| `--query-file ` | Path to a file containing a Query DSL clause (use `-` for stdin) | - -The companion `bulk-ingest --source-format bulk-ndjson` mode streams -pre-formatted action+doc line pairs verbatim into `_bulk`. `--index` is -optional in this mode: when omitted, requests go to `/_bulk` and the action -lines must carry `_index`. +See [the dump-and-restore guide](docs/cli/stack/es/helpers/dump-and-restore.md) +for flags, consistency semantics, and a deeper walkthrough. #### `kb` - Kibana API diff --git a/docs/cli/stack/es/helpers/dump-and-restore.md b/docs/cli/stack/es/helpers/dump-and-restore.md new file mode 100644 index 00000000..2c49354b --- /dev/null +++ b/docs/cli/stack/es/helpers/dump-and-restore.md @@ -0,0 +1,85 @@ +--- +description: Export and re-ingest Elasticsearch indices with `elastic es helpers dump` and `bulk-ingest --source-format bulk-ndjson`. +applies_to: + stack: preview + serverless: preview +type: guide +--- + +# Dump and restore an index + +`elastic es helpers dump` exports one or more indices as bulk-format NDJSON +(`{"index":{...}}` + `_source` line pairs) using a per-index Point-in-Time +(PIT) and `search_after` sorted by `_shard_doc` for a consistent snapshot. +The output is shaped so it can be piped or passed directly into +`elastic es helpers bulk-ingest --source-format bulk-ndjson`. + +Typical use case: capture a remote index for local debugging. + +## Example: remote → local round trip + +```bash +# Export from the remote cluster, omit _index so the dump can be re-targeted, +# and filter by a Query DSL clause. +elastic --use-context remote es helpers dump \ + --indices my-prod-idx \ + --skip-index-name \ + --query '{"range":{"@timestamp":{"gte":"now-1h"}}}' \ + --output dump.ndjson + +# Re-ingest into the local cluster under a new index name. +elastic --use-context local es helpers bulk-ingest \ + --source-format bulk-ndjson \ + --index local-copy \ + --data-file dump.ndjson +``` + +Or pipe the two together (be aware that `--use-context` switches mid-pipe via +two separate processes, so each side reads its own active context): + +```bash +elastic --use-context remote es helpers dump --indices my-prod-idx --skip-index-name \ + | elastic --use-context local es helpers bulk-ingest --source-format bulk-ndjson --index local-copy +``` + +## `dump` options + +Run `elastic es helpers dump --help` for the full list. The most commonly used: + +| Option | Description | +|---|---| +| `--indices ` | Comma-separated list of indices to dump. Required. | +| `--size ` | Documents per search batch. Default `500`. | +| `--keep-alive ` | Point-in-time keep-alive. Default `1m`. | +| `--output ` | Write NDJSON to a file. Omit to stream to stdout. | +| `--skip-index-name` | Omit `_index` from action lines so the dump can be re-targeted at a different index downstream. | +| `--add-id` | Include `_id` in action lines so document IDs round-trip. | +| `--query ` | Query DSL clause as an inline JSON string. | +| `--query-file ` | Path to a file containing a Query DSL clause. Use `-` to read from stdin. | + +## Consistency model + +The dump opens a PIT per index and pages through it with `search_after` on +`_shard_doc`. The PIT keeps reads consistent against ongoing writes for the +duration of the dump. If the process is interrupted (`SIGINT`/`SIGTERM`), the +active PIT is closed and the output file is flushed before exit. + +If a single dump straddles multiple indices, each index gets its own PIT in +sequence — they are not snapshotted as a group. + +## `bulk-ingest --source-format bulk-ndjson` + +The companion mode streams pre-formatted action+doc line pairs verbatim into +the `_bulk` API. Behaviour differs from the default `ndjson` mode: + +- `--index` is **optional**. When omitted, requests go to `/_bulk` and the + action lines must carry `_index`. When provided, requests go to + `/{index}/_bulk` and `_index` in the action lines is overridden. +- Only `index` and `create` actions are accepted. `update` (which needs a + `{"doc": ...}` envelope) and `delete` (which has no paired document line) + would break the action+doc pair structure and are rejected at parse time. +- `--pipeline` and `--routing` are applied as URL query parameters so they + affect every action in the batch without rewriting the pre-formatted action + lines. +- `--flush-bytes`, `--concurrency`, `--retries`, and `--retry-delay` work + exactly as in the other source formats. diff --git a/src/es/helpers/bulk-ingest.ts b/src/es/helpers/bulk-ingest.ts index 40b9a4a2..d2c0551d 100644 --- a/src/es/helpers/bulk-ingest.ts +++ b/src/es/helpers/bulk-ingest.ts @@ -104,11 +104,18 @@ function defaultGlob (format: SourceFormat): string { return '**/*.{json,ndjson,jsonl}' } -const BULK_ACTIONS = new Set(['index', 'create', 'update', 'delete']) +// Only `index` and `create` are supported. The pair parser assumes every action +// is followed by a document line, which holds for both. `delete` actions carry +// no document line, and `update` actions require a `{"doc": ...}` envelope — +// neither shape is emitted by `dump` (the producer this format is designed for), +// so rejecting them keeps the parser simple and avoids silently corrupting input +// that does not match the expected pair structure. +const BULK_ACTIONS = new Set(['index', 'create']) /** * Parses raw text containing pre-formatted bulk action+doc line pairs. * Each pair is returned as an `"action\ndoc"` string ready to be joined into a `_bulk` body. + * Only `index` and `create` actions are supported (see `BULK_ACTIONS`). */ export function parseBulkNdjsonPairs (raw: string): string[] { const pairs: string[] = [] @@ -134,7 +141,7 @@ export function parseBulkNdjsonPairs (raw: string): string[] { } const keys = Object.keys(parsed as Record) if (keys.length !== 1 || !BULK_ACTIONS.has(keys[0]!)) { - throw new Error(`bulk-ndjson: invalid action line at line ${lineNum}: expected {"index"|"create"|"update"|"delete": ...}, got: ${trimmed.slice(0, 80)}`) + throw new Error(`bulk-ndjson: invalid action line at line ${lineNum}: expected {"index"|"create": ...}, got: ${trimmed.slice(0, 80)} (delete/update are not supported in pre-formatted bulk-ndjson because their doc shape differs)`) } action = trimmed } else { @@ -201,11 +208,21 @@ function collectDocuments (opts: BulkIngestInput): { docs: unknown[], filesProce async function sendBatch ( transport: EsClient, ndjsonBody: string, - index: string | undefined + index: string | undefined, + qs?: { pipeline?: string | undefined, routing?: string | undefined } ): Promise<{ errors: number, total: number }> { const path = index != null ? `/${encodeURIComponent(index)}/_bulk` : '/_bulk' + const querystring: Record = {} + if (qs?.pipeline != null) querystring.pipeline = qs.pipeline + if (qs?.routing != null) querystring.routing = qs.routing const result = await transport.request( - { method: 'POST', path, body: ndjsonBody, bulkBody: ndjsonBody } + { + method: 'POST', + path, + body: ndjsonBody, + bulkBody: ndjsonBody, + ...(Object.keys(querystring).length > 0 && { querystring }), + } ) as { errors?: boolean, items?: Array> } let errorCount = 0 @@ -326,7 +343,10 @@ async function runBulkNdjson (opts: BulkIngestInput, transport: EsClient): Promi const body = batch.join('\n') + '\n' const result = await retryWithBackoff( async () => { - const res = await sendBatch(transport, body, opts.index) + // Apply pipeline/routing as URL query params so they affect every + // action in the batch (the action lines are pre-formatted and we do + // not rewrite them). + const res = await sendBatch(transport, body, opts.index, { pipeline: opts.pipeline, routing: opts.routing }) if (res.errors > 0 && res.errors === res.total) { throw new Error(`Bulk batch failed: ${res.errors}/${res.total} errors`) } diff --git a/src/es/helpers/dump.ts b/src/es/helpers/dump.ts index cc4512d5..c3630337 100644 --- a/src/es/helpers/dump.ts +++ b/src/es/helpers/dump.ts @@ -58,7 +58,9 @@ function resolveQuery (input: DumpInput): unknown { return JSON.parse(input.query) } if (input.query_file != null) { - const raw = readRawInput(input.query_file) + // `-` is the conventional shorthand for "read from stdin"; readRawInput() + // with no argument falls back to stdin. + const raw = input.query_file === '-' ? readRawInput() : readRawInput(input.query_file) if (raw == null || raw.trim().length === 0) { throw new Error('--query-file is empty') } @@ -148,12 +150,19 @@ async function dumpOneIndex (transport: EsClient, params: DumpIndexParams): Prom const hits = result.hits?.hits ?? [] if (hits.length === 0) break + // Build one buffer per page so we issue a single `write` (and therefore + // a single `writeSync`/`process.stdout.write`) per `size` documents + // instead of one per hit. Cuts syscalls from O(docs) to O(pages) on the + // hot path; on a multi-million-doc dump this is the difference between + // syscall-bound and network-bound. + const parts: string[] = [] for (const hit of hits) { const actionLine = addId ? actionPrefix + JSON.stringify(hit._id) + actionSuffix : actionPrefix - write(`${actionLine}\n${JSON.stringify(hit._source)}\n`) + parts.push(actionLine, '\n', JSON.stringify(hit._source), '\n') } + write(parts.join('')) total += hits.length if (result.pit_id != null) { diff --git a/test/es/helpers/bulk-ingest.test.ts b/test/es/helpers/bulk-ingest.test.ts index 9f83e661..31531f40 100644 --- a/test/es/helpers/bulk-ingest.test.ts +++ b/test/es/helpers/bulk-ingest.test.ts @@ -506,7 +506,7 @@ describe('bulk-ingest command', () => { assert.match(err.message as string, /even number/i) }) - it('errors when action line is not {"index|create|update|delete": ...}', async () => { + it('errors when action line is not {"index|create": ...}', async () => { const tmpDir = mkdtempSync(join(tmpdir(), 'bulk-ndjson-')) const filePath = join(tmpDir, 'data.ndjson') writeFileSync(filePath, '{"foo":{}}\n{"v":1}\n') @@ -523,6 +523,47 @@ describe('bulk-ingest command', () => { assert.equal(err.code, 'input_error') }) + it('passes --pipeline and --routing as URL query params on _bulk', async () => { + const tmpDir = mkdtempSync(join(tmpdir(), 'bulk-ndjson-')) + const filePath = join(tmpDir, 'data.ndjson') + writeFileSync(filePath, '{"index":{}}\n{"v":1}\n') + + const { transport, requests } = mockTransport([successResponse(1)]) + + await runCommand([ + '--index', 'target', + '--data-file', filePath, + '--source-format', 'bulk-ndjson', + '--pipeline', 'my-pipe', + '--routing', 'shard-1', + '--json' + ], makeDeps(transport)) + + const qs = requests[0]!.params.querystring as Record + assert.equal(qs.pipeline, 'my-pipe') + assert.equal(qs.routing, 'shard-1') + }) + + it('rejects update and delete actions (only index/create are paired with a doc line)', async () => { + for (const bad of ['{"update":{"_id":"a"}}\n{"doc":{"v":1}}\n', '{"delete":{"_id":"a"}}\n{"v":1}\n']) { + const tmpDir = mkdtempSync(join(tmpdir(), 'bulk-ndjson-')) + const filePath = join(tmpDir, 'data.ndjson') + writeFileSync(filePath, bad) + + const { transport } = mockTransport([]) + + const result = await runCommand([ + '--data-file', filePath, + '--source-format', 'bulk-ndjson', + '--json' + ], makeDeps(transport)) as Record + + const err = result.error as Record + assert.equal(err.code, 'input_error', `expected error for ${bad}`) + assert.match(err.message as string, /index"\|"create/i) + } + }) + it('reads bulk-ndjson from --data-dir with multiple files', async () => { const tmpDir = mkdtempSync(join(tmpdir(), 'bulk-ndjson-dir-')) writeFileSync(join(tmpDir, 'a.ndjson'), '{"index":{}}\n{"v":1}\n') From f74048dfbfb3dcc4c2b3dedd77a021ebd04eaf4a Mon Sep 17 00:00:00 2001 From: Matt Devy Date: Fri, 19 Jun 2026 12:56:13 +0100 Subject: [PATCH 8/9] docs: move dump/restore guide out of auto-generated cli/ tree --- README.md | 4 ++-- docs/docset.yml | 1 + docs/{cli/stack/es/helpers => }/dump-and-restore.md | 0 3 files changed, 3 insertions(+), 2 deletions(-) rename docs/{cli/stack/es/helpers => }/dump-and-restore.md (100%) diff --git a/README.md b/README.md index f342600a..04f82c5e 100644 --- a/README.md +++ b/README.md @@ -383,8 +383,8 @@ elastic --use-context local es helpers bulk-ingest \ --source-format bulk-ndjson --index local-copy --data-file dump.ndjson ``` -See [the dump-and-restore guide](docs/cli/stack/es/helpers/dump-and-restore.md) -for flags, consistency semantics, and a deeper walkthrough. +See [the dump-and-restore guide](docs/dump-and-restore.md) for flags, +consistency semantics, and a deeper walkthrough. #### `kb` - Kibana API diff --git a/docs/docset.yml b/docs/docset.yml index 55b1868a..ad689861 100644 --- a/docs/docset.yml +++ b/docs/docset.yml @@ -6,6 +6,7 @@ toc: children: - file: cli/installation.md - file: cli/configuration.md + - file: dump-and-restore.md - cli: cli/schema.json folder: cli title: Elastic CLI command reference diff --git a/docs/cli/stack/es/helpers/dump-and-restore.md b/docs/dump-and-restore.md similarity index 100% rename from docs/cli/stack/es/helpers/dump-and-restore.md rename to docs/dump-and-restore.md From 103704371b51aa5f629f0ff6f2ee7a8998d614d4 Mon Sep 17 00:00:00 2001 From: Matt Devy Date: Fri, 19 Jun 2026 13:20:22 +0100 Subject: [PATCH 9/9] docs: fix Vale warnings in dump-and-restore guide --- docs/dump-and-restore.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/dump-and-restore.md b/docs/dump-and-restore.md index 2c49354b..b1444f9d 100644 --- a/docs/dump-and-restore.md +++ b/docs/dump-and-restore.md @@ -16,7 +16,7 @@ The output is shaped so it can be piped or passed directly into Typical use case: capture a remote index for local debugging. -## Example: remote → local round trip +## Example: Remote → local round trip ```bash # Export from the remote cluster, omit _index so the dump can be re-targeted, @@ -70,7 +70,7 @@ sequence — they are not snapshotted as a group. ## `bulk-ingest --source-format bulk-ndjson` The companion mode streams pre-formatted action+doc line pairs verbatim into -the `_bulk` API. Behaviour differs from the default `ndjson` mode: +the `_bulk` API. Behavior differs from the default `ndjson` mode: - `--index` is **optional**. When omitted, requests go to `/_bulk` and the action lines must carry `_index`. When provided, requests go to