
Wire v2 Arrow pipeline: config → scan → DataFusion → output #1

strawgate merged 3 commits into master from v2-arrow-pipeline on Mar 29, 2026

@strawgate
Owner

Summary

  • Connects all v2 Arrow pipeline components into a working pipeline driven by YAML config
  • New pipeline_v2.rs: Pipeline::from_config() + run() main loop wiring file tailing → JSON/CRI parsing → Scanner → Arrow RecordBatch → DataFusion SQL → OutputSink
  • New CLI flags: --config, --validate, --dry-run for v2 mode; --blackhole for fake OTLP collector
  • Fix: strip type suffixes in scan_config() so field pushdown works with typed SQL names (level_str → level)
  • Fix: handle empty RecordBatches in batch_builder.rs without panic
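
The suffix-stripping fix can be illustrated with a minimal sketch. It assumes the suffix set is `_str`, `_int`, and `_float` (taken from the typed column names mentioned in the commit notes); `strip_type_suffix` and `pushdown_fields` are hypothetical names, and the actual logic in scan_config() may differ.

```rust
/// Assumed type suffixes, based on the typed column names in this PR
/// (`field_str`, `field_int`, `field_float`).
const TYPE_SUFFIXES: [&str; 3] = ["_str", "_int", "_float"];

/// Hypothetical helper: map a typed SQL column name back to the raw JSON
/// field name so the scanner can be told which fields to extract.
fn strip_type_suffix(column: &str) -> &str {
    for suffix in TYPE_SUFFIXES {
        if let Some(base) = column.strip_suffix(suffix) {
            // Keep the name intact if stripping would leave nothing.
            if !base.is_empty() {
                return base;
            }
        }
    }
    column
}

/// Hypothetical helper: the distinct raw field names a query refers to,
/// sorted so the result is deterministic.
fn pushdown_fields<'a>(columns: &[&'a str]) -> Vec<&'a str> {
    let mut fields: Vec<&'a str> = columns
        .iter()
        .copied()
        .map(|c| strip_type_suffix(c))
        .collect();
    fields.sort_unstable();
    fields.dedup();
    fields
}
```

With this shape, a query that references `level_str` twice and `dur_int` once would push down just the two raw fields `dur` and `level`.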

Performance (5M lines × 176 bytes × 7 JSON fields, single core)

JSON stdout output:
  SELECT * (passthrough)              2,907,415 l/s   343 ns/l   487 MB/s
  WHERE level != DEBUG (25% drop)     2,702,530 l/s   370 ns/l   453 MB/s
  WHERE level = ERROR (75% drop)      3,060,090 l/s   326 ns/l   513 MB/s
  SELECT 3 cols WHERE dur > 200       3,837,732 l/s   260 ns/l   643 MB/s

OTLP protobuf → blackhole collector:
  SELECT * → OTLP (no compress)       2,643,890 l/s   378 ns/l   443 MB/s
  WHERE != DEBUG → OTLP               2,919,754 l/s   342 ns/l   489 MB/s
  SELECT * → OTLP + zstd              3,070,802 l/s   325 ns/l   515 MB/s
  WHERE != DEBUG → OTLP + zstd        2,858,889 l/s   349 ns/l   479 MB/s
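
For readers checking the columns above: the ns/l figures appear to be 10^9 divided by l/s, truncated, and the MB/s figures are close to l/s × 176 bytes ÷ 2^20. The sketch below reproduces that arithmetic; the helper names are illustrative, and since 176 bytes is a rounded line size, a couple of rows come out one unit off in the MB/s column.

```rust
/// Nanoseconds spent per line, truncated toward zero.
fn ns_per_line(lines_per_sec: u64) -> u64 {
    1_000_000_000 / lines_per_sec
}

/// Approximate throughput in MiB/s for a given average line size in bytes.
fn mib_per_sec(lines_per_sec: u64, bytes_per_line: u64) -> u64 {
    lines_per_sec * bytes_per_line / (1024 * 1024)
}
```

For the passthrough row, `ns_per_line(2_907_415)` gives 343 and `mib_per_sec(2_907_415, 176)` gives 487, matching the table.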

Test plan

  • All 107 tests pass (96 existing + 11 new pipeline_v2 tests)
  • --config config.yaml --validate validates and exits
  • --config config.yaml --dry-run constructs pipeline and exits
  • --config config.yaml processes JSON file with SELECT * passthrough
  • WHERE filtering produces correct row counts (25K/100K for ERROR-only)
  • Column projection outputs only selected fields
  • Type preservation: integers stay integers in JSON output
  • OTLP output to blackhole collector works end-to-end
  • Diagnostics endpoints (/health, /metrics, /api/pipelines) report correct stats

🤖 Generated with Claude Code

strawgate and others added 3 commits March 28, 2026 20:59
…g, diagnostics

Architecture: everything goes through Arrow RecordBatches.
- scanner.rs + batch_builder.rs: JSON→Arrow with typed columns (field_str, field_int, field_float)
- transform.rs: DataFusion SQL engine with int()/float() UDFs, plan caching, schema evolution
- output.rs: OutputSink trait with Stdout/JSON/OTLP/FanOut implementations
- config.rs: YAML config parser supporting simple and multi-pipeline configs
- diagnostics.rs: HTTP server with /health, /metrics (Prometheus), /api/pipelines, embedded dashboard
- dashboard.html: Live pipeline visualizer polling /api/pipelines
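
To make the sink layering concrete, here is a rough std-only sketch of how a sink trait with fan-out and capture implementations could fit together. The `Batch` alias stands in for an Arrow RecordBatch, and the method name, error type, and struct layouts are assumptions; the real OutputSink in output.rs may look quite different.

```rust
/// Stand-in for an Arrow RecordBatch so the sketch needs no external crates.
type Batch = Vec<String>;

/// Hypothetical shape of a sink trait; the real `OutputSink` may have
/// different methods and error types.
trait OutputSink {
    fn send(&mut self, batch: &Batch) -> Result<(), String>;
}

/// Collects every row it receives, in the spirit of a test capture sink.
#[derive(Default)]
struct CaptureSink {
    rows: Vec<String>,
}

impl OutputSink for CaptureSink {
    fn send(&mut self, batch: &Batch) -> Result<(), String> {
        self.rows.extend(batch.iter().cloned());
        Ok(())
    }
}

/// Forwards each batch to every child sink, stopping at the first error.
struct FanOut<'a> {
    sinks: Vec<&'a mut dyn OutputSink>,
}

impl<'a> OutputSink for FanOut<'a> {
    fn send(&mut self, batch: &Batch) -> Result<(), String> {
        for sink in self.sinks.iter_mut() {
            sink.send(batch)?;
        }
        Ok(())
    }
}
```

Because FanOut itself implements the trait, the pipeline only ever has to hold one sink, whether the config names one output or several.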

Also includes:
- docs/: Architecture docs, DataFusion research findings, evolution plan
- TODO.md: Detailed handoff for next agent (wire pipeline, SQL rewriter, benchmarks)
- deploy/: K8s DaemonSet manifest for VictoriaMetrics benchmark
- Dockerfile, README.md, DEVELOPING.md

96 tests passing. v1 pipeline preserved for backwards compatibility.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
docs/ARCHITECTURE.md — pipeline model, config schema, data flow, performance
docs/SCANNER_AND_TRANSFORM_DESIGN.md — typed columns, SQL rewriter spec, buffer lifecycle
docs/RESEARCH_BENCHMARKS.md — measured DataFusion costs, dead ends, v1/v2 baselines

Deleted: 5 superseded drafts, empty file, duplicate plan

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Connects all v2 components into a working pipeline driven by YAML config.

- pipeline_v2.rs: Pipeline struct with from_config() and run() main loop
  File tailing → JSON/CRI parsing → Scanner → Arrow RecordBatch →
  DataFusion SQL transform → OutputSink (stdout/OTLP/HTTP)
- main.rs: --config, --validate, --dry-run flags for v2 mode
  --blackhole flag for fake OTLP collector (benchmarking)
- transform.rs: strip type suffixes in scan_config() so field pushdown
  works with typed SQL column names (level_str → level)
- batch_builder.rs: handle empty batches without panic
- output.rs: CaptureSink test helper

Benchmarked at 2.6-3.8M lines/sec/core on 5M lines × 7 fields:
  SELECT * passthrough:         2.9M l/s  343 ns/l
  WHERE filter (25% drop):     2.7M l/s  370 ns/l
  Column projection + filter:  3.8M l/s  260 ns/l
  SELECT * → OTLP + zstd:     3.1M l/s  325 ns/l

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@strawgate strawgate merged commit 8408421 into master Mar 29, 2026
strawgate added a commit that referenced this pull request Apr 11, 2026
Systematic audit found 30 potential issues from agent-authored code.
After parallel verification by 11 subagents, 13 were confirmed real,
8 were by-design, 5 were won't-fix, and 5 were false positives.
This commit fixes all 12 actionable confirmed findings.

**High severity:**
- Fix OtherStr panic: OTLP sink crashed on non-string attribute types
  (e.g., hash() UDF returning UInt64). Replaced unreachable!() with
  array_value_to_string(). Removed dead str_value() function. (#7)
- Fix silent struct drop: non-conflict Struct columns now log a warning
  before being skipped, matching the resource struct behavior. (#6)
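
The OtherStr fix follows a general pattern: replace a "cannot happen" panic with a generic string rendering. The sketch below is a simplified std-only analogue; `AttrValue` and `attr_as_string` are hypothetical, and the real sink operates on Arrow arrays rather than an enum.

```rust
use std::fmt;

/// Simplified stand-in for attribute values. The real sink reads Arrow
/// arrays, which this sketch does not model.
enum AttrValue {
    Str(String),
    Int(i64),
    UInt(u64),
    Float(f64),
}

impl fmt::Display for AttrValue {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            AttrValue::Str(s) => write!(f, "{}", s),
            AttrValue::Int(v) => write!(f, "{}", v),
            AttrValue::UInt(v) => write!(f, "{}", v),
            AttrValue::Float(v) => write!(f, "{}", v),
        }
    }
}

/// Render any attribute as a string instead of panicking on types the
/// string fast path does not expect.
fn attr_as_string(value: &AttrValue) -> String {
    match value {
        // Fast path: the value is already a string.
        AttrValue::Str(s) => s.clone(),
        // Fallback: an arm like this is where a panic would otherwise sit.
        other => other.to_string(),
    }
}
```

A UInt64 result such as the one a hash-style UDF might return is then emitted as its decimal string instead of crashing the sink.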

**Medium severity:**
- Fix scanner contract drift: SCANNER_CONTRACT.md said "no escape
  decoding" but implementation decodes since PR #885. Updated doc. (#19)
- Deduplicate calendar math: made core's Kani-verified days_from_civil
  public; arrow's wrapper now delegates instead of reimplementing. (#21)
- Centralize metadata keys: added METADATA_RESOURCE_KEY and
  METADATA_RESOURCE_PREFIX constants to field_names.rs, replacing 15
  bare string literals across 4 files / 3 crates. (#15)
- Add TypedColumn::Bytes variant: OTAP bytes attributes now round-trip
  as BinaryArray instead of being hex-encoded to strings. (#16)
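
For context on the calendar-math item, the usual days_from_civil computation (the widely used civil-date algorithm attributed to Howard Hinnant) can be sketched as below. The signature is an assumption, the project's Kani-verified version may differ, and `date32_from_ymd` is a hypothetical wrapper showing the "delegate instead of reimplement" idea.

```rust
/// Days since 1970-01-01 for a proleptic Gregorian date.
/// `month` is 1..=12 and `day` is 1..=31; inputs are not validated here.
fn days_from_civil(year: i64, month: u32, day: u32) -> i64 {
    // Treat March as the first month so the leap day falls at year end.
    let y = if month <= 2 { year - 1 } else { year };
    let era = y.div_euclid(400);
    let year_of_era = y.rem_euclid(400); // 0..=399
    let shifted_month = i64::from((month + 9) % 12); // March = 0, February = 11
    let day_of_year = (153 * shifted_month + 2) / 5 + i64::from(day) - 1; // 0..=365
    let day_of_era = year_of_era * 365 + year_of_era / 4 - year_of_era / 100 + day_of_year;
    era * 146_097 + day_of_era - 719_468
}

/// Hypothetical wrapper that delegates to the shared function and narrows
/// the result to a days-since-epoch i32, returning None if it does not fit.
fn date32_from_ymd(year: i32, month: u32, day: u32) -> Option<i32> {
    let days = days_from_civil(i64::from(year), month, day);
    if days >= i64::from(i32::MIN) && days <= i64::from(i32::MAX) {
        Some(days as i32)
    } else {
        None
    }
}
```

Keeping a single implementation means any proof or test effort spent on the core function also covers every caller that delegates to it.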

**Low severity:**
- Deduplicate WELL_KNOWN arrays: star_schema.rs now delegates to
  field_names::matches_any() instead of maintaining a local copy. Added
  logfwd-types dependency to logfwd-arrow. (#13)
- Centralize _raw column name: added field_names::RAW constant. (#12)
- Extract MAX_REQUEST_BODY_SIZE: shared constant in receiver_http.rs
  replaces 3 independent definitions. (#27)
- Import DEFAULT_RETRY_AFTER_SECS: otap_sink and arrow_ipc_sink now
  import from http_classify instead of redefining. (#29)
- Name timing defaults: pipeline build.rs and input_build.rs now use
  named constants instead of inline unwrap_or literals. (#30)
- Add timestamp diagnostic: tracing::debug!() on timestamp parse
  fallback for operator visibility. (#1)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@strawgate strawgate deleted the v2-arrow-pipeline branch April 12, 2026 06:20
strawgate added a commit that referenced this pull request Apr 14, 2026
- Fix pending_acks count in stalled batch scenario: after ACKing #3,
  only #1 and #3 are in pending_acks (2, not 3) since #2 hasn't been
  ACKed yet at that step.

- Remove 'queued' from checkpoint-blocking states. Per the TLA+ spec,
  only sent batches (via begin_send) participate in ordered-ack.
  Queued batches that were never sent are invisible to the checkpoint
  machine and should not block advancement.
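
The ordered-ack rule described here can be sketched generically. This is an illustration under assumptions, not the project's checkpoint machine: `AckTracker`, its fields, and `acked_waiting` are hypothetical, and it does not reproduce the exact `pending_acks` bookkeeping of the scenario above. Only `begin_send` is a name taken from the commit message.

```rust
use std::collections::BTreeMap;

/// Minimal ordered-ack tracker. Only batches registered with `begin_send`
/// participate; batches that are merely queued never enter the map.
#[derive(Default)]
struct AckTracker {
    /// Sent batches by id; the value is true once the batch has been ACKed.
    in_flight: BTreeMap<u64, bool>,
    /// Highest batch id whose checkpoint has been committed.
    checkpoint: Option<u64>,
}

impl AckTracker {
    /// Record that a batch has actually been sent.
    fn begin_send(&mut self, id: u64) {
        self.in_flight.insert(id, false);
    }

    /// Mark a sent batch as ACKed, then advance the checkpoint across the
    /// contiguous run of ACKed batches at the front.
    fn ack(&mut self, id: u64) {
        if let Some(acked) = self.in_flight.get_mut(&id) {
            *acked = true;
        }
        loop {
            let front = match self.in_flight.iter().next() {
                Some((&first, &true)) => first,
                _ => break,
            };
            self.in_flight.remove(&front);
            self.checkpoint = Some(front);
        }
    }

    /// Number of ACKed batches still waiting behind an un-ACKed one.
    fn acked_waiting(&self) -> usize {
        self.in_flight.values().filter(|acked| **acked).count()
    }
}
```

With batches 1, 2, and 3 sent, ACKing 1 and then 3 leaves the checkpoint at 1 with one ACK waiting; ACKing 2 releases both and moves the checkpoint to 3.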

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
strawgate added a commit that referenced this pull request Apr 21, 2026
…ile BytesMut

This patch addresses GitHub issue #2468 by fulfilling #2421 and #2423.
Specifically, it does two things:
1. Removes the legacy `json_prefix` logic from `logfwd-core/src/cri.rs`
and related functions since the Sidecar column path is now the only
CRI metadata path.
2. Modifies the file tailer in `logfwd-io/src/tail/` to read directly
into per-file `BytesMut` buffers instead of allocating `Vec<u8>` on
every poll, advancing towards zero-copy framing (eliminating Copy #1).
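
The buffer-reuse idea in item 2 can be sketched as follows. To stay std-only, a plain `Vec<u8>` stands in for `BytesMut`, so this shows the reuse pattern rather than the actual tailer code; `FileBuffer` and its methods are hypothetical names.

```rust
use std::io::{self, Read};

/// Per-file read buffer that is reused across polls. A `Vec<u8>` stands in
/// for `BytesMut` here so the sketch has no external dependencies.
struct FileBuffer {
    buf: Vec<u8>,
}

impl FileBuffer {
    fn with_capacity(cap: usize) -> Self {
        Self { buf: Vec::with_capacity(cap) }
    }

    /// Append up to `max` bytes from the reader to the existing buffer.
    /// No new allocation happens while the data fits in current capacity.
    fn fill_from<R: Read>(&mut self, reader: &mut R, max: usize) -> io::Result<usize> {
        let start = self.buf.len();
        self.buf.resize(start + max, 0);
        match reader.read(&mut self.buf[start..]) {
            Ok(n) => {
                self.buf.truncate(start + n);
                Ok(n)
            }
            Err(e) => {
                // Undo the temporary extension before reporting the error.
                self.buf.truncate(start);
                Err(e)
            }
        }
    }

    /// Bytes up to and including the last newline. A trailing partial line
    /// stays buffered until a later poll completes it.
    fn complete_lines(&self) -> &[u8] {
        match self.buf.iter().rposition(|&b| b == b'\n') {
            Some(i) => &self.buf[..=i],
            None => &[],
        }
    }

    /// Drop `n` processed bytes from the front, keeping the capacity.
    fn consume(&mut self, n: usize) {
        self.buf.drain(..n);
    }
}
```

The caller frames lines directly out of `complete_lines()` and then calls `consume`, so a partial line at the end of one poll is simply completed by the next read.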

All tests compile and pass without regressions. Type mismatches from earlier were corrected successfully.

Co-authored-by: strawgate <6384545+strawgate@users.noreply.github.com>
strawgate added a commit that referenced this pull request Apr 29, 2026
Add comprehensive verification to prevent the class of bugs found during
bug hunting (path mismatch, PID race, inotify breakage):

TLA+ SupervisorProtocol.tla:
- Models OpAMP client → supervisor → child process interaction
- PathConsistency invariant: write path == read path (prevents #1 bug)
- NeverSignalDeadChild: PID generation tracking (prevents #3 bug)
- ConfigMonotonic, OnlyValidatedConfigsReachMain: safety gates
- ConfigEventuallyApplied, CrashEventuallyRecovered: liveness
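
One way to read "PID generation tracking" is that every spawn gets a fresh generation number, so a handle to an exited child can never match a later child that happens to reuse the same PID. The sketch below illustrates that reading only; the spec itself is not shown here, and all names are hypothetical.

```rust
/// Identifies one incarnation of the child process.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
struct ChildHandle {
    pid: u32,
    generation: u64,
}

/// Tracks which child incarnation, if any, is currently alive.
#[derive(Default)]
struct Supervisor {
    generation: u64,
    current: Option<ChildHandle>,
}

impl Supervisor {
    /// Register a newly spawned child under a fresh generation number.
    fn spawned(&mut self, pid: u32) -> ChildHandle {
        self.generation += 1;
        let handle = ChildHandle { pid, generation: self.generation };
        self.current = Some(handle);
        handle
    }

    /// Record that a child exited; stale handles are ignored.
    fn exited(&mut self, handle: ChildHandle) {
        if self.current == Some(handle) {
            self.current = None;
        }
    }

    /// A signal may be sent only to the live incarnation.
    fn may_signal(&self, handle: ChildHandle) -> bool {
        self.current == Some(handle)
    }
}
```

Comparing the whole handle rather than the bare PID is what rules out signalling a dead child after the operating system recycles its PID.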

New proptests (supervisor.rs):
- path_contract_opamp_writes_where_supervisor_reads
- supervisor_intermediate_differs_from_main
- atomic_write_never_produces_partial_content
- supervisor_state_machine_always_valid (8 events, 50 steps)
- happy_path_returns_to_idle
- invalid_config_never_signals
- dead_child_never_gets_signaled
- filename_filter_matches_target
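
As background for atomic_write_never_produces_partial_content: a common way to get that property is to write a temporary sibling file and rename it over the target. The sketch below assumes that technique; the supervisor's actual write path is not shown in this PR, and `atomic_write` is a hypothetical name.

```rust
use std::fs;
use std::io::{self, Write};
use std::path::{Path, PathBuf};

/// Write `contents` to `target` through a sibling temp file plus rename, so
/// a reader sees either the old file or the complete new one.
/// Assumes the temp file and the target live on the same filesystem.
fn atomic_write(target: &Path, contents: &[u8]) -> io::Result<()> {
    // Build "<target>.tmp" next to the target.
    let mut tmp_name = target.as_os_str().to_owned();
    tmp_name.push(".tmp");
    let tmp = PathBuf::from(tmp_name);

    {
        let mut file = fs::File::create(&tmp)?;
        file.write_all(contents)?;
        // Flush data to disk before the rename makes it visible.
        file.sync_all()?;
    }
    fs::rename(&tmp, target)
}
```

A property test can then interleave writes and reads and check that every read returns one of the complete payloads, never a prefix of one.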

New proptests (ffwd-opamp integration):
- remote_config_path_is_yaml
- remote_config_path_none_never_panics
- remote_config_path_never_collides_with_main

New proptests (ffwd-config diff):
- reloadable_iff_server_unchanged
- total_count_equals_union_cardinality
- diff_is_deterministic

Also:
- Increase bootstrap reload channel capacity from 1 to 4
- Document TLA+ specs in tla/README.md

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>