perf(es): bulk-ingest buffers all input into memory; OOMs on large dumps #443

Description

@MattDevy

Symptom

elastic es helpers bulk-ingest buffers the entire input set into memory before sending the first batch. Large dumps (100 MB+) cause heap bloat; multi-GB dumps OOM.

This is asymmetric with elastic es helpers dump, which streams output one PIT page at a time and is bounded by --size. Round-tripping a large index through dump | bulk-ingest therefore works on the export side but breaks on import.

Where it happens

src/es/helpers/bulk-ingest.ts:

  • resolveRawInputs (current lines 264-291) reads every input source fully into memory:
    • data_file: readFileSync(path, 'utf-8') → entire file as one string.
    • data_dir: files.map(f => readFileSync(f, 'utf-8')) → every matched file held simultaneously.
    • stdin: readFileSync(0, 'utf-8') → entire stdin drained into one string.
  • collectDocuments then parses everything into unknown[] (default modes) or string[] of action+doc pairs (bulk-ndjson mode).
  • splitIntoBatches slices the in-memory array into batches by byte size.

Rough cost on a typical ndjson dump: ~1M × 200-byte docs ≈ 200 MB raw + ~400 MB parsed-object bloat → 600 MB+ heap. 10M docs OOMs on an 8 GB box.
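Here is the same estimate as arithmetic. The 2× parsed-object factor is read off the issue's own 200 MB → ~400 MB figure. It is an assumption, not a measured V8 constant, and estimateHeapBytes is a hypothetical helper for illustration only.

```typescript
// Back-of-envelope heap cost of the current buffer-everything model.
// Assumption: parsed objects cost ~2x the raw bytes, taken from the
// 200 MB raw -> ~400 MB parsed figure above; real overhead depends on doc shape.
const PARSED_OVERHEAD_FACTOR = 2

function estimateHeapBytes(docCount: number, avgDocBytes: number): number {
  const raw = docCount * avgDocBytes // whole input held as one string
  const parsed = raw * PARSED_OVERHEAD_FACTOR // parsed documents held alongside it
  return raw + parsed
}

const MB = 1_000_000
console.log(estimateHeapBytes(1_000_000, 200) / MB) // 600
console.log(estimateHeapBytes(10_000_000, 200) / MB) // 6000
```

At 10M docs the same ratio gives roughly 6 GB for input alone. That is before Node's own baseline and the outgoing request bodies, which is consistent with the OOM described above.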

This is pre-existing behaviour; the bulk-ndjson source format added in #415 inherited the same memory model from collectDocuments / splitIntoBatches.

Suggested fix

Replace resolveRawInputs with a streaming reader that yields lines and accumulates them into a flush buffer:

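```typescript
// Sketch. openSources is hypothetical: it yields one Readable per input
// (the data_file, each data_dir file in turn, or process.stdin).
import * as readline from 'node:readline'
import type { Readable } from 'node:stream'

interface StreamOpts { flush_bytes: number }

async function streamBatches(
  opts: StreamOpts,
  openSources: (opts: StreamOpts) => AsyncIterable<Readable>,
  onBatch: (body: string) => Promise<void>,
): Promise<void> {
  let buf = ''
  let bufBytes = 0
  for await (const source of openSources(opts)) {
    const rl = readline.createInterface({ input: source, crlfDelay: Infinity })
    for await (const line of rl) {
      if (line.length === 0) continue
      buf += line + '\n'
      // Count UTF-8 bytes; line.length counts UTF-16 code units and
      // under-counts non-ASCII text.
      bufBytes += Buffer.byteLength(line, 'utf8') + 1
      if (bufBytes >= opts.flush_bytes) {
        await onBatch(buf)
        buf = ''
        bufBytes = 0
      }
    }
  }
  if (bufBytes > 0) await onBatch(buf)
}
```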

Considerations:

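  • For bulk-ndjson, a flush must land on a pair boundary (after every second non-empty line), never mid-pair. A single slot that holds the pending action line until its document line arrives is enough. This assumes every action carries a document line. A delete action has none, so a strict two-line count would mis-pair inputs that contain deletes.
  • For json mode (one raw doc per line), the flush_bytes accounting should include the action line synthesized for each doc, since it is part of the request body.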
  • For CSV, switch from csv-parse/sync to csv-parse (streaming variant) — header inference still works, the parser emits records as they're read.
  • data_dir: open files one at a time, feeding a single concurrency-bounded queue, rather than reading all of them up front.
  • Concurrency interaction: with N in-flight batches, peak memory is flush_bytes * concurrency. Default 5 MiB × 5 = 25 MiB — bounded and independent of input size.
  • --data-file - already routes to stdin via readRawInput; process.stdin is already a Readable, so wrap it in readline rather than draining.
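This is a minimal sketch of the bulk-ndjson pair-boundary rule, written as an async generator over lines (for example a readline interface). pairBatches is a hypothetical name. It assumes every action line is followed by exactly one document line (no delete actions). Treating a trailing action line with no document as an error is also an assumption about the desired behaviour.

```typescript
// Hypothetical pair-aware batcher for bulk-ndjson. Assumes each action line is
// followed by exactly one document line (index/create/update, not delete).
async function* pairBatches(
  lines: AsyncIterable<string>,
  flushBytes: number,
): AsyncGenerator<string> {
  let buf = ''
  let bufBytes = 0
  let pendingAction: string | null = null // one-slot buffer for an unpaired action

  for await (const line of lines) {
    if (line.length === 0) continue // blank lines are not part of a pair
    if (pendingAction === null) {
      pendingAction = line // hold it until its document line arrives
      continue
    }
    const pair = pendingAction + '\n' + line + '\n'
    pendingAction = null
    buf += pair
    bufBytes += Buffer.byteLength(pair, 'utf8') // UTF-8 bytes, not UTF-16 code units
    // The threshold is only checked after a whole pair, so no batch splits one.
    if (bufBytes >= flushBytes) {
      yield buf
      buf = ''
      bufBytes = 0
    }
  }
  if (pendingAction !== null) {
    // Assumption: a trailing action with no document is treated as malformed input.
    throw new Error('bulk-ndjson input ends with an action line that has no document line')
  }
  if (bufBytes > 0) yield buf
}
```

Because the threshold is only checked after a complete pair, a batch can overshoot flush_bytes by at most one pair. That keeps the per-batch size, and therefore the flush_bytes × concurrency bound, predictable.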
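For json mode, this sketch of the flush accounting counts the synthesized action line as part of each doc's cost. docBatches is hypothetical, and the bare {"index":{}} action is an assumption. If the real helper adds _index or _id, or uses create, ACTION_BYTES changes.

```typescript
// Hypothetical batcher for one-doc-per-line input where the action line is
// synthesized. Assumption: a bare {"index":{}} action; adding _index/_id or
// using create would change ACTION_BYTES.
const ACTION_LINE = JSON.stringify({ index: {} }) + '\n'
const ACTION_BYTES = Buffer.byteLength(ACTION_LINE, 'utf8')

async function* docBatches(
  lines: AsyncIterable<string>,
  flushBytes: number,
): AsyncGenerator<string> {
  let buf = ''
  let bufBytes = 0
  for await (const line of lines) {
    if (line.length === 0) continue
    const doc = line + '\n'
    buf += ACTION_LINE + doc
    // Each doc costs its own bytes plus the synthesized action line.
    bufBytes += ACTION_BYTES + Buffer.byteLength(doc, 'utf8')
    if (bufBytes >= flushBytes) {
      yield buf
      buf = ''
      bufBytes = 0
    }
  }
  if (bufBytes > 0) yield buf
}
```

With ~200-byte docs the bare 13-byte action line adds about 6.5% to each doc. Leaving it out would make every batch that much larger than flush_bytes.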
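The data_dir and stdin points can be sketched as the openSources generator that the streamBatches sketch above iterates over. The option names follow the issue's data_file / data_dir wording. Taking only regular files in sorted order, and falling back to stdin when no file option is set, are assumptions.

```typescript
import { createReadStream } from 'node:fs'
import { readdir } from 'node:fs/promises'
import * as path from 'node:path'
import type { Readable } from 'node:stream'

// Hypothetical option shape mirroring the issue's data_file / data_dir inputs.
interface SourceOpts {
  data_file?: string
  data_dir?: string
}

// Yields one Readable per input source, lazily: a data_dir file is only
// opened when the consumer asks for the next source, i.e. after it has
// finished reading the previous one.
async function* openSources(opts: SourceOpts): AsyncGenerator<Readable> {
  if (opts.data_file !== undefined) {
    // "--data-file -" means stdin; stream it instead of draining it.
    yield opts.data_file === '-' ? process.stdin : createReadStream(opts.data_file)
    return
  }
  if (opts.data_dir !== undefined) {
    const dir = opts.data_dir
    const entries = await readdir(dir, { withFileTypes: true })
    // Regular files only, in sorted order; both choices are assumptions.
    const files = entries.filter((e) => e.isFile()).map((e) => e.name).sort()
    for (const name of files) {
      yield createReadStream(path.join(dir, name))
    }
    return
  }
  yield process.stdin // no file option: read piped input
}
```

Because the generator only opens the next file when asked, at most one file handle and one readline buffer are live at a time, however many files data_dir contains.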
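For the concurrency point: buffered input stays near flush_bytes × concurrency because batches are pulled from the stream only while an in-flight slot is free. dispatchBounded is a hypothetical illustration, not a replacement for the existing concurrency wrapper. It stops pulling input after the first failed batch and rethrows that error once in-flight batches drain, on the assumption that retries already happen inside onBatch.

```typescript
// Hypothetical illustration: run onBatch with at most `concurrency` calls in
// flight. The batch source is only pulled while a slot is free, so the batch
// bodies held in memory stay near concurrency x batch size.
async function dispatchBounded<T>(
  batches: AsyncIterable<T>,
  concurrency: number,
  onBatch: (batch: T) => Promise<void>,
): Promise<void> {
  const inFlight = new Set<Promise<void>>()
  const errors: unknown[] = []
  for await (const batch of batches) {
    // Each tracked promise settles successfully and removes itself; a failure
    // is recorded instead of rejecting, so nothing goes unhandled.
    const tracked: Promise<void> = onBatch(batch).then(
      () => {
        inFlight.delete(tracked)
      },
      (error: unknown) => {
        inFlight.delete(tracked)
        errors.push(error)
      },
    )
    inFlight.add(tracked)
    if (inFlight.size >= concurrency) {
      await Promise.race(inFlight) // wait for a free slot
    }
    if (errors.length > 0) break // stop pulling input after the first failure
  }
  await Promise.all(inFlight) // drain batches that are still running
  if (errors.length > 0) throw errors[0]
}
```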

Acceptance

  • bulk-ingest --source-format bulk-ndjson ingests a 1 GB dump file without exceeding ~50 MiB heap (measure with node --max-old-space-size=128).
  • All four source formats (ndjson, json, csv, bulk-ndjson) stream the same way.
  • Existing retry / concurrency / progress-reporter wrappers work unchanged — only the input acquisition changes.
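For the first acceptance criterion, here is a hypothetical test helper that samples process.memoryUsage().heapUsed while an ingest runs and reports the peak. Timer sampling can miss short spikes, so it complements the --max-old-space-size=128 run rather than replacing it.

```typescript
// Hypothetical test helper: samples V8 heap usage on a timer while `work`
// runs and returns the highest value seen (in bytes). Timer sampling only
// fires when the event loop is free, so short synchronous spikes can be missed.
async function peakHeapDuring(work: () => Promise<void>, intervalMs = 10): Promise<number> {
  let peak = process.memoryUsage().heapUsed
  const timer = setInterval(() => {
    peak = Math.max(peak, process.memoryUsage().heapUsed)
  }, intervalMs)
  try {
    await work()
  } finally {
    clearInterval(timer)
  }
  return Math.max(peak, process.memoryUsage().heapUsed)
}
```

A test could then check that the value returned for an ingest of the 1 GB fixture stays under 50 * 1024 * 1024 bytes. The ingest entry point it drives is left to the test suite.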

Background

Surfaced during review of #415 (dump/restore helpers). Dump side already streams; ingest side does not. Filed as a follow-up because the fix is broader than that PR and touches all source formats, not just bulk-ndjson.

Metadata

Assignees

No one assigned

    Labels

    bug (Something isn't working)

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests
