Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,23 @@ elastic es update --index my-index --id abc123

Run `elastic es <command> --help` for all available options on any command.

##### Helpers: dump and restore

`elastic es helpers dump` exports indices as bulk-format NDJSON using a
Point-in-Time + `search_after`, and `elastic es helpers bulk-ingest
--source-format bulk-ndjson` re-ingests that output verbatim. Typical use
case: capture a remote index for local debugging.

```bash
elastic --use-context remote es helpers dump --indices my-prod-idx \
--skip-index-name --output dump.ndjson
elastic --use-context local es helpers bulk-ingest \
--source-format bulk-ndjson --index local-copy --data-file dump.ndjson
```

See [the dump-and-restore guide](docs/dump-and-restore.md) for flags,
consistency semantics, and a deeper walkthrough.

#### `kb` - Kibana API

Run Kibana API calls. Commands are organised by namespace (e.g. `data-views`,
Expand Down
1 change: 1 addition & 0 deletions docs/docset.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ toc:
children:
- file: cli/installation.md
- file: cli/configuration.md
- file: dump-and-restore.md
- cli: cli/schema.json
folder: cli
title: Elastic CLI command reference
Expand Down
85 changes: 85 additions & 0 deletions docs/dump-and-restore.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
---
description: Export and re-ingest Elasticsearch indices with `elastic es helpers dump` and `bulk-ingest --source-format bulk-ndjson`.
applies_to:
stack: preview
serverless: preview
type: guide
---

# Dump and restore an index

`elastic es helpers dump` exports one or more indices as bulk-format NDJSON
(`{"index":{...}}` + `_source` line pairs) using a per-index Point-in-Time
(PIT) and `search_after` sorted by `_shard_doc` for a consistent snapshot.
The output is shaped so it can be piped or passed directly into
`elastic es helpers bulk-ingest --source-format bulk-ndjson`.

Typical use case: capture a remote index for local debugging.

## Example: Remote → local round trip

```bash
# Export from the remote cluster, omit _index so the dump can be re-targeted,
# and filter by a Query DSL clause.
elastic --use-context remote es helpers dump \
--indices my-prod-idx \
--skip-index-name \
--query '{"range":{"@timestamp":{"gte":"now-1h"}}}' \
--output dump.ndjson

# Re-ingest into the local cluster under a new index name.
elastic --use-context local es helpers bulk-ingest \
--source-format bulk-ndjson \
--index local-copy \
--data-file dump.ndjson
```

Or pipe the two together (be aware that `--use-context` switches mid-pipe via
two separate processes, so each side reads its own active context):

```bash
elastic --use-context remote es helpers dump --indices my-prod-idx --skip-index-name \
| elastic --use-context local es helpers bulk-ingest --source-format bulk-ndjson --index local-copy
```

## `dump` options

Run `elastic es helpers dump --help` for the full list. The most commonly used:

| Option | Description |
|---|---|
| `--indices <list>` | Comma-separated list of indices to dump. Required. |
| `--size <n>` | Documents per search batch. Default `500`. |
| `--keep-alive <duration>` | Point-in-time keep-alive. Default `1m`. |
| `--output <path>` | Write NDJSON to a file. Omit to stream to stdout. |
| `--skip-index-name` | Omit `_index` from action lines so the dump can be re-targeted at a different index downstream. |
| `--add-id` | Include `_id` in action lines so document IDs round-trip. |
| `--query <json>` | Query DSL clause as an inline JSON string. |
| `--query-file <path>` | Path to a file containing a Query DSL clause. Use `-` to read from stdin. |

## Consistency model

The dump opens a PIT per index and pages through it with `search_after` on
`_shard_doc`. The PIT keeps reads consistent against ongoing writes for the
duration of the dump. If the process is interrupted (`SIGINT`/`SIGTERM`), the
active PIT is closed and the output file is flushed before exit.

If a single dump straddles multiple indices, each index gets its own PIT in
sequence — they are not snapshotted as a group.

## `bulk-ingest --source-format bulk-ndjson`

The companion mode streams pre-formatted action+doc line pairs verbatim into
the `_bulk` API. Behavior differs from the default `ndjson` mode:

- `--index` is **optional**. When omitted, requests go to `/_bulk` and the
action lines must carry `_index`. When provided, requests go to
`/{index}/_bulk` and `_index` in the action lines is overridden.
- Only `index` and `create` actions are accepted. `update` (which needs a
`{"doc": ...}` envelope) and `delete` (which has no paired document line)
would break the action+doc pair structure and are rejected at parse time.
- `--pipeline` and `--routing` are applied as URL query parameters so they
affect every action in the batch without rewriting the pre-formatted action
lines.
- `--flush-bytes`, `--concurrency`, `--retries`, and `--retry-delay` work
exactly as in the other source formats.
187 changes: 161 additions & 26 deletions src/es/helpers/bulk-ingest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,16 @@ export interface BulkIngestDeps {

const defaultDeps: BulkIngestDeps = { getEsClient }

const SOURCE_FORMATS = ['ndjson', 'json', 'csv'] as const
const SOURCE_FORMATS = ['ndjson', 'json', 'csv', 'bulk-ndjson'] as const
type SourceFormat = typeof SOURCE_FORMATS[number]

const inputSchema = z.object({
index: z.string().describe('Target index'),
data_file: z.string().optional().describe('Path to data file (NDJSON, JSON array, or CSV)'),
index: z.string().optional().describe('Target index (required unless --source-format bulk-ndjson, in which case action lines may carry _index)'),
data_file: z.string().optional().describe('Path to data file (NDJSON, JSON array, CSV, or pre-formatted bulk NDJSON)'),
data_dir: z.string().optional().describe('Path to directory of data files to ingest'),
glob: z.string().optional().describe('Glob pattern for --data-dir file matching (default: **/*.json, or **/*.csv when --source-format csv)'),
no_recursive: z.boolean().optional().describe('Do not recurse into subdirectories when using --data-dir'),
source_format: z.enum(SOURCE_FORMATS).default('ndjson').describe('Input file format: ndjson, json, or csv'),
source_format: z.enum(SOURCE_FORMATS).default('ndjson').describe('Input file format: ndjson (one doc per line), json (JSON array or one doc per line), csv, or bulk-ndjson (already-formatted action+doc line pairs, as produced by `dump`)'),
csv_delimiter: z.string().optional().describe('CSV column delimiter (default: ",")'),
csv_columns: z.string().optional().describe('Comma-separated list of column names (overrides CSV header row)'),
skip_header: z.boolean().optional().describe('Skip the first row of a CSV file'),
Expand All @@ -52,23 +52,29 @@ const inputSchema = z.object({
type BulkIngestInput = z.infer<typeof inputSchema>

/**
* Splits an array of documents into batches where each batch's serialized
* size does not exceed the byte threshold.
* Splits items into batches where each batch's serialized size does not exceed
* `flushBytes`. `sizeOf` defaults to the JSON-serialised byte length plus a
* trailing newline; pass a custom callback for items already represented as
* strings (e.g. pre-formatted bulk pairs).
*/
function splitIntoBatches (docs: unknown[], flushBytes: number): unknown[][] {
const batches: unknown[][] = []
let currentBatch: unknown[] = []
function splitIntoBatches<T> (
items: T[],
flushBytes: number,
sizeOf: (item: T) => number = (i) => JSON.stringify(i).length + 1,
): T[][] {
const batches: T[][] = []
let currentBatch: T[] = []
let currentSize = 0

for (const doc of docs) {
const docSize = JSON.stringify(doc).length + 1 // +1 for newline
if (currentBatch.length > 0 && currentSize + docSize > flushBytes) {
for (const item of items) {
const itemSize = sizeOf(item)
if (currentBatch.length > 0 && currentSize + itemSize > flushBytes) {
batches.push(currentBatch)
currentBatch = []
currentSize = 0
}
currentBatch.push(doc)
currentSize += docSize
currentBatch.push(item)
currentSize += itemSize
}
if (currentBatch.length > 0) {
batches.push(currentBatch)
Expand All @@ -94,11 +100,67 @@ function parseByFormat (raw: string, opts: BulkIngestInput): unknown[] {
/** Returns the default glob pattern for the given source format. */
function defaultGlob (format: SourceFormat): string {
if (format === 'csv') return '**/*.csv'
if (format === 'bulk-ndjson') return '**/*.ndjson'
return '**/*.{json,ndjson,jsonl}'
}

/** Collects documents from the resolved input source. */
function collectDocuments (opts: BulkIngestInput): { docs: unknown[], filesProcessed: number } {
// Only `index` and `create` are supported. The pair parser assumes every action
// is followed by a document line, which holds for both. `delete` actions carry
// no document line, and `update` actions require a `{"doc": ...}` envelope —
// neither shape is emitted by `dump` (the producer this format is designed for),
// so rejecting them keeps the parser simple and avoids silently corrupting input
// that does not match the expected pair structure.
const BULK_ACTIONS = new Set(['index', 'create'])

/**
* Parses raw text containing pre-formatted bulk action+doc line pairs.
* Each pair is returned as an `"action\ndoc"` string ready to be joined into a `_bulk` body.
* Only `index` and `create` actions are supported (see `BULK_ACTIONS`).
*/
export function parseBulkNdjsonPairs (raw: string): string[] {
const pairs: string[] = []
let action: string | undefined
let lineNum = 0
let nonEmptyCount = 0

for (const line of raw.split('\n')) {
lineNum++
const trimmed = line.trim()
if (trimmed.length === 0) continue
nonEmptyCount++
Comment on lines +121 to +130

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The "every other line" loop scheme appears to overlook the fact that a delete action won't have a paired document on the following line. Also, update actions require documents to be wrapped in a {"doc": ...} envelope.

Since this is ingest path will always come use the output of a dump command, maybe BULK_ACTIONS needs to just be reduced to index and create, or just index?


if (action == null) {
let parsed: unknown
try {
parsed = JSON.parse(trimmed)
} catch (err) {
throw new Error(`bulk-ndjson: invalid action line at line ${lineNum}: ${err instanceof Error ? err.message : String(err)}`, { cause: err })
}
if (parsed == null || typeof parsed !== 'object' || Array.isArray(parsed)) {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A nit, probably for later, but it would be a nice improvement to standardize on using node:assert for all input validation of this sort (basically anywhere Zod or JSON schemas are not used for validation).

throw new Error(`bulk-ndjson: invalid action line at line ${lineNum}: expected an object`)
}
const keys = Object.keys(parsed as Record<string, unknown>)
if (keys.length !== 1 || !BULK_ACTIONS.has(keys[0]!)) {
throw new Error(`bulk-ndjson: invalid action line at line ${lineNum}: expected {"index"|"create": ...}, got: ${trimmed.slice(0, 80)} (delete/update are not supported in pre-formatted bulk-ndjson because their doc shape differs)`)
}
action = trimmed
} else {
pairs.push(`${action}\n${trimmed}`)
action = undefined
}
}

if (action != null) {
throw new Error(`bulk-ndjson: expected an even number of non-empty lines (action + doc pairs), got ${nonEmptyCount}`)
}
return pairs
}

/**
* Resolves the active input source (file, glob'd directory, or stdin) and returns
* one raw text chunk per source. `filesProcessed` is 0 when reading from stdin.
*/
function resolveRawInputs (opts: BulkIngestInput): { raws: string[], filesProcessed: number } {
const { data_file, data_dir } = opts

if (data_file != null && data_dir != null) {
Expand All @@ -113,39 +175,54 @@ function collectDocuments (opts: BulkIngestInput): { docs: unknown[], filesProce
if (files.length === 0) {
throw new Error(`No files matched pattern "${resolvedPattern}" in ${data_dir}`)
}
const allDocs: unknown[] = []
for (const file of files) {
const raw = readFileSync(file, 'utf-8')
allDocs.push(...parseByFormat(raw, opts))
return {
raws: files.map((f) => readFileSync(f, 'utf-8')),
filesProcessed: files.length,
}
return { docs: allDocs, filesProcessed: files.length }
}

if (data_file != null) {
const raw = readRawInput(data_file)
if (raw == null || raw.trim().length === 0) {
throw new Error('No input data received from file')
}
return { docs: parseByFormat(raw, opts), filesProcessed: 1 }
return { raws: [raw], filesProcessed: 1 }
}

// Fall back to stdin
const raw = readRawInput()
if (raw == null || raw.trim().length === 0) {
throw new Error('No input provided. Use --data-file, --data-dir, or pipe data to stdin')
}
return { docs: parseByFormat(raw, opts), filesProcessed: 0 }
return { raws: [raw], filesProcessed: 0 }
}

/** Collects documents from the resolved input source. */
function collectDocuments (opts: BulkIngestInput): { docs: unknown[], filesProcessed: number } {
const { raws, filesProcessed } = resolveRawInputs(opts)
const docs: unknown[] = []
for (const raw of raws) docs.push(...parseByFormat(raw, opts))
return { docs, filesProcessed }
}

/** Sends a single bulk batch to Elasticsearch. Returns the count of errors. */
async function sendBatch (
transport: EsClient,
ndjsonBody: string,
index: string
index: string | undefined,
qs?: { pipeline?: string | undefined, routing?: string | undefined }
): Promise<{ errors: number, total: number }> {
const path = index != null ? `/${encodeURIComponent(index)}/_bulk` : '/_bulk'
const querystring: Record<string, string> = {}
if (qs?.pipeline != null) querystring.pipeline = qs.pipeline
if (qs?.routing != null) querystring.routing = qs.routing
const result = await transport.request(
{ method: 'POST', path, body: ndjsonBody, bulkBody: ndjsonBody }
{
method: 'POST',
path,
body: ndjsonBody,
bulkBody: ndjsonBody,
...(Object.keys(querystring).length > 0 && { querystring }),
}
) as { errors?: boolean, items?: Array<Record<string, { status?: number }>> }

let errorCount = 0
Expand All @@ -165,13 +242,21 @@ function createBulkIngestHandler (deps: BulkIngestDeps = defaultDeps) {
return async (parsed: { input?: BulkIngestInput; options: Record<string, string | number | boolean> }): Promise<JsonValue> => {
const opts = parsed.input!

if (opts.source_format !== 'bulk-ndjson' && opts.index == null) {
return { error: { code: 'input_error', message: '--index is required (omit only when --source-format bulk-ndjson)' } }
}

let transport: EsClient
try {
transport = deps.getEsClient()
} catch (err) {
return missingConfigError(err)
}

if (opts.source_format === 'bulk-ndjson') {
return runBulkNdjson(opts, transport)
}

let docs: unknown[]
let filesProcessed: number
try {
Expand Down Expand Up @@ -229,6 +314,56 @@ function createBulkIngestHandler (deps: BulkIngestDeps = defaultDeps) {
}
}

/**
* Ingests pre-formatted bulk NDJSON (action + document line pairs, as produced by `dump`).
* Reuses retry, concurrency, and progress reporting from the main flow; the only difference
* is that the input is already bulk-shaped, so each pair is sent through verbatim.
*/
async function runBulkNdjson (opts: BulkIngestInput, transport: EsClient): Promise<JsonValue> {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

opts.pipeline and opts.routing are ignored in this function. Should they be handled here, or do we expect the bulk-ndjson format to never include those?

let pairs: string[]
let filesProcessed: number
try {
const resolved = resolveRawInputs(opts)
pairs = resolved.raws.flatMap(parseBulkNdjsonPairs)
filesProcessed = resolved.filesProcessed
} catch (err) {
return { error: { code: 'input_error', message: err instanceof Error ? err.message : String(err) } }
}

if (pairs.length === 0) {
return { total: 0, succeeded: 0, failed: 0, retries: 0, elapsed_ms: 0 }
}

const batches = splitIntoBatches(pairs, opts.flush_bytes, (p) => p.length + 1)
const reporter = new ProgressReporter()
reporter.filesProcessed = filesProcessed

try {
await runWithConcurrency(batches, opts.concurrency, async (batch) => {
const body = batch.join('\n') + '\n'
const result = await retryWithBackoff(
async () => {
// Apply pipeline/routing as URL query params so they affect every
// action in the batch (the action lines are pre-formatted and we do
// not rewrite them).
const res = await sendBatch(transport, body, opts.index, { pipeline: opts.pipeline, routing: opts.routing })
if (res.errors > 0 && res.errors === res.total) {
throw new Error(`Bulk batch failed: ${res.errors}/${res.total} errors`)
}
return res
},
{ retries: opts.retries, delay: opts.retry_delay }
)
reporter.report(result.total, result.errors)
return result
})
} catch (err) {
return transportError(err)
}

return reporter.summary()
}

export function createBulkIngestCommand (deps?: BulkIngestDeps): OpaqueCommandHandle {
return defineCommand({
name: 'bulk-ingest',
Expand Down
Loading