Extract FormatParser trait, simplify pipeline hot loop#13
Merged
Conversation
Introduce FormatParser trait in logfwd-core with JsonParser, RawParser, and CriParser implementations. This replaces the format match arm and inline accumulate_json_lines/accumulate_raw_lines functions that were embedded in pipeline_v2.rs. Pipeline InputState is simplified from 4 format-related fields (format, reassembler, partial_line + the match dispatch) to a single Box<dyn FormatParser>. The hot loop is now just: let n = input.parser.process(&bytes, &mut input.json_buf); New formats (logfmt, syslog) can be added by implementing the trait. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Introduce FormatParser trait in logfwd-core with JsonParser, RawParser, and CriParser implementations. This replaces the format match arm and inline accumulate_json_lines/accumulate_raw_lines functions that were embedded in pipeline_v2.rs. Pipeline InputState simplified from 4 format-related fields to a single Box<dyn FormatParser>. The hot loop is now: let n = input.parser.process(&bytes, &mut input.json_buf); Also fixes clippy warnings in grok/regexp_extract UDFs from PR #12 (Default impls, collapsed if-let chains). Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1154b1f to
33e53b3
Compare
- Rename pipeline_v2.rs → pipeline.rs (v1 was removed long ago) - Rename run_v2_pipelines() → run_pipelines() - Update module doc, struct doc, and error messages to drop "v2" - Replace #[allow(dead_code)] with #[expect(dead_code, reason = ...)] - Remove stale "v1" comment in batch_builder.rs Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Remove wildcard re-exports from logfwd-output — only FanOut, BatchMetadata, OutputSink, build_output_sink, Compression, and parse_column_name are public. Sink implementations are crate-private. - Mark placeholder modules (elasticsearch, loki, parquet) with #[allow(dead_code)] at module level since they're not wired in yet. - Use #[expect(dead_code, reason = ...)] for tail.rs path field. - Remove unused Float64Array import in batch_builder tests. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
strawgate
added a commit
that referenced
this pull request
Apr 11, 2026
Systematic audit found 30 potential issues from agent-authored code. After parallel verification by 11 subagents, 13 were confirmed real, 8 were by-design, 5 were won't-fix, and 5 were false positives. This commit fixes all 12 actionable confirmed findings. **High severity:** - Fix OtherStr panic: OTLP sink crashed on non-string attribute types (e.g., hash() UDF returning UInt64). Replaced unreachable!() with array_value_to_string(). Removed dead str_value() function. (#7) - Fix silent struct drop: non-conflict Struct columns now log a warning before being skipped, matching the resource struct behavior. (#6) **Medium severity:** - Fix scanner contract drift: SCANNER_CONTRACT.md said "no escape decoding" but implementation decodes since PR #885. Updated doc. (#19) - Deduplicate calendar math: made core's Kani-verified days_from_civil public; arrow's wrapper now delegates instead of reimplementing. (#21) - Centralize metadata keys: added METADATA_RESOURCE_KEY and METADATA_RESOURCE_PREFIX constants to field_names.rs, replacing 15 bare string literals across 4 files / 3 crates. (#15) - Add TypedColumn::Bytes variant: OTAP bytes attributes now round-trip as BinaryArray instead of being hex-encoded to strings. (#16) **Low severity:** - Deduplicate WELL_KNOWN arrays: star_schema.rs now delegates to field_names::matches_any() instead of maintaining a local copy. Added logfwd-types dependency to logfwd-arrow. (#13) - Centralize _raw column name: added field_names::RAW constant. (#12) - Extract MAX_REQUEST_BODY_SIZE: shared constant in receiver_http.rs replaces 3 independent definitions. (#27) - Import DEFAULT_RETRY_AFTER_SECS: otap_sink and arrow_ipc_sink now import from http_classify instead of redefining. (#29) - Name timing defaults: pipeline build.rs and input_build.rs now use named constants instead of inline unwrap_or literals. (#30) - Add timestamp diagnostic: tracing::debug!() on timestamp parse fallback for operator visibility. (#1) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
FormatParsertrait inlogfwd-core::formatwith three implementations:JsonParser— passes through newline-delimited JSON, carries partial linesRawParser— wraps lines as{"_raw":"<escaped>"}CriParser— parses CRI container log format, reassembles partialspipeline_v2.rs::InputStatefrom 4 format-related fields to oneBox<dyn FormatParser>input.parser.process(&bytes, &mut input.json_buf)Before (pipeline_v2.rs hot loop)
After
Test plan
cargo test --workspace— 93 tests pass (91 original + 6 new format tests - 4 moved)cargo clippy --workspace -- -D warnings— cleancargo fmt --check— clean🤖 Generated with Claude Code