refactor: move format handling and framing out of pipeline into input layer #581
… layer

The pipeline's input_poll_loop previously handled three concerns that belong in the input layer: newline framing, format parsing (CRI/Auto/JSON/Raw), and remainder management. This made the pipeline aware of formats, prevented composing transports with different formats, and required every new input type to wire into the same monolithic loop.

New architecture:
- FormatProcessor (logfwd-io/src/format.rs): enum with Passthrough, Cri, and Auto variants. Processes framed lines into scanner-ready output. Owns CriAggregator for CRI/Auto formats.
- FramedInput (logfwd-io/src/framed.rs): wraps any InputSource with newline framing (remainder management) + format processing. Implements InputSource so it's transparent to the pipeline.

The pipeline now just polls → batches → sends. No format switch, no remainder handling, no CRI aggregator.

Transport, framing, and format are composable: any transport (file, TCP, UDP) can use any format via FramedInput(transport, format).

Tests for CRI extraction and remainder handling moved from pipeline.rs to format.rs and framed.rs respectively. Pipeline integration tests pass unchanged.

Closes #573

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
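To make the composition described in this commit message concrete, here is a minimal sketch of a transport wrapped by a framing and format stage. It is an illustration under stated assumptions, not the code in logfwd-io: the `InputSource` trait shape, the `poll` signature, the `process` method, and the `Chunks` test transport are all hypothetical, and only the `Passthrough` variant is modelled. It also allocates freely, so it does not reflect the hot-path buffer reuse discussed later in the review.

```rust
use std::collections::VecDeque;

/// Hypothetical stand-in for the transport trait; the real `InputSource` may differ.
trait InputSource {
    /// Returns the next chunk of bytes, or None when nothing is ready.
    fn poll(&mut self) -> Option<Vec<u8>>;
}

/// Simplified format stage. The PR's enum also has `Cri` and `Auto` variants.
enum FormatProcessor {
    Passthrough,
}

impl FormatProcessor {
    /// Append scanner-ready bytes for complete, newline-terminated lines to `out`.
    fn process(&mut self, lines: &[u8], out: &mut Vec<u8>) {
        match self {
            FormatProcessor::Passthrough => out.extend_from_slice(lines),
        }
    }
}

/// Wraps any transport with newline framing plus format processing.
struct FramedInput<S: InputSource> {
    inner: S,
    format: FormatProcessor,
    remainder: Vec<u8>,
}

impl<S: InputSource> FramedInput<S> {
    fn new(inner: S, format: FormatProcessor) -> Self {
        Self { inner, format, remainder: Vec::new() }
    }
}

/// The wrapper is itself an `InputSource`, so the caller cannot tell it apart
/// from a bare transport.
impl<S: InputSource> InputSource for FramedInput<S> {
    fn poll(&mut self) -> Option<Vec<u8>> {
        let chunk = self.inner.poll()?;
        self.remainder.extend_from_slice(&chunk);
        // Everything after the last newline is an incomplete line; if there is
        // no newline yet, nothing is emitted on this poll.
        let pos = self.remainder.iter().rposition(|&b| b == b'\n')?;
        let tail = self.remainder.split_off(pos + 1);
        let mut out = Vec::new();
        self.format.process(&self.remainder, &mut out);
        self.remainder = tail;
        Some(out)
    }
}

/// Test transport that replays canned chunks (illustration only).
struct Chunks(VecDeque<Vec<u8>>);

impl InputSource for Chunks {
    fn poll(&mut self) -> Option<Vec<u8>> {
        self.0.pop_front()
    }
}
```

With this shape, `FramedInput::new(transport, FormatProcessor::Passthrough)` can be handed to any code that only knows about `InputSource`, which is the property the commit message relies on.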
Walkthrough

This change moves newline framing and format-specific processing into the input layer by adding two modules to crates/logfwd-io:
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Remove legacy_remainder_tests module: these tested reimplemented inline logic, not the production FramedInput path. Covered by framed.rs tests.
- Remove stale comment breadcrumbs
- Restore useful batching comments in input_poll_loop (explain buffered_since tracking and batch send conditions)
- Fix unnecessary allocation in FramedInput remainder split: reuse self.remainder capacity via extend_from_slice instead of to_vec()

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
crates/logfwd/src/pipeline.rs (2)
509-509: ⚠️ Potential issue | 🟠 Major

`Format::Auto` is still not scanner-ready for plain-text input.

`FormatProcessor::Auto` passes non-CRI lines through verbatim, but `Pipeline::from_config()` only enables `scan_config.keep_raw` for explicit `Format::Raw`. With the new default at Line 509, a plain-text file still reaches the scanner with `keep_raw = false` and collapses into empty rows. Either treat `Auto` as raw-capable when building `scan_config`, or narrow `Auto` so it only falls back to JSON-safe input.

Possible follow-up in `Pipeline::from_config`:

    -    if config
    -        .inputs
    -        .iter()
    -        .any(|i| matches!(i.format, Some(Format::Raw)))
    +    if config
    +        .inputs
    +        .iter()
    +        .any(|i| matches!(i.format, Some(Format::Raw | Format::Auto)))
         {
             scan_config.keep_raw = true;
         }

Also applies to: 592-595
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/logfwd/src/pipeline.rs` at line 509, Pipeline::from_config() currently treats Format::Auto as not enabling scan_config.keep_raw, which causes plain-text inputs to be collapsed; update Pipeline::from_config() so that when cfg.format is Format::Auto it sets scan_config.keep_raw = true (or otherwise marks Auto as raw-capable) OR change the default selection so Auto only applies when JSON-safe input is expected; specifically adjust the logic around Format::Auto (and the same code paths referenced at the other occurrence around the block at lines 592-595) to either enable keep_raw for Auto or restrict Auto to non-raw fallback behavior, ensuring FormatProcessor::Auto receives scan_config with keep_raw=true when plain-text passthrough is required.
526-553: ⚠️ Potential issue | 🟠 Major

Fail non-JSON formats for generator and OTLP inputs.

Both branches force `Format::Json`, so configs like `type: generator, format: raw` or `type: otlp, format: cri` succeed but run with different semantics than requested. Reject non-JSON explicit formats here so unsupported combinations fail at construction instead of being silently coerced.

Suggested validation:

     InputType::Generator => {
    +    if let Some(format) = &cfg.format {
    +        if !matches!(format, Format::Json) {
    +            return Err(format!(
    +                "input '{name}': generator input only supports format=json, got {format:?}"
    +            ));
    +        }
    +    }
         use logfwd_io::generator::{GeneratorConfig, GeneratorInput};
         let events_per_sec = match cfg.listen.as_deref() {
             Some(s) => s.parse().map_err(|_| {
    @@
     InputType::Otlp => {
    +    if let Some(format) = &cfg.format {
    +        if !matches!(format, Format::Json) {
    +            return Err(format!(
    +                "input '{name}': otlp input only supports format=json, got {format:?}"
    +            ));
    +        }
    +    }
         let addr = cfg
             .listen
             .as_ref()

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/logfwd/src/pipeline.rs` around lines 526 - 553, The Generator and Otlp branches currently unconditionally set Format::Json, allowing configs that explicitly requested a different format to succeed; add validation in the InputType::Generator and InputType::Otlp branches to reject any explicit non-JSON format. Concretely, before creating GeneratorInput::new(name, config) and OtlpReceiverInput::new(name, addr), check cfg.format (or cfg.format.as_deref()) and if Some(fmt) != Some("json") return an Err with a clear message like "input '{name}': <generator|otlp> only supports format 'json'"; otherwise proceed to construct the source and keep Format::Json. Ensure you reference the InputType::Generator and InputType::Otlp branches, the Format enum, and the GeneratorInput::new and OtlpReceiverInput::new calls when applying the change.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@crates/logfwd-io/src/format.rs`:
- Around line 97-103: When encountering a malformed or non-CRI line in the else
branch (where passthrough_on_fail is checked), ensure the aggregator's pending
CRI state is cleared so a previous 'P' doesn't bleed into the next valid 'F'
record; call the aggregator reset/clear method (e.g., aggregator.reset_pending()
or aggregator.clear()—use the actual method name on the aggregator type) before
either extending out for passthrough or calling stats.inc_parse_errors(1) so the
parser's pending state is always reset at parse/fallback boundaries.
- Around line 18-28: Add the #[non_exhaustive] attribute to the public enum
FormatProcessor so future variants won't be breaking changes; update the enum
declaration for FormatProcessor (the Passthrough / Cri / Auto variants) to be
annotated with #[non_exhaustive] and run cargo check to ensure any downstream
exhaustive matches get compiler errors requiring a wildcard arm.
In `@crates/logfwd-io/src/framed.rs`:
- Around line 63-99: The hot path allocates twice: creating a new Vec for the
remainder (chunk[pos + 1..].to_vec()) and allocating a fresh Vec for every Data
event; fix by (1) replacing the to_vec() with Vec::split_off on the local chunk
(e.g. after computing pos do let tail = chunk.split_off(pos + 1); self.remainder
= tail;) so you move the tail out without an allocation, and (2) avoiding
per-event Vec::with_capacity by introducing/reusing a spare buffer field (e.g.
add spare_buf: Vec<u8> to the struct) and swap self.out_buf with self.spare_buf
(std::mem::swap(&mut self.out_buf, &mut self.spare_buf)) before pushing
InputEvent::Data { bytes: spare_buf } so capacity is preserved and no new
allocations occur; update FramedInput::poll to use these semantics around chunk,
self.remainder, self.out_buf, result_events and InputEvent::Data.
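The first inline comment above, about clearing pending CRI state at a parse failure, can be sketched as follows. This is a simplified illustration under assumptions: the `Aggregator` type, its `reset` method, the `parse_cri` helper, and the plain `u64` error counter are hypothetical stand-ins for `CriAggregator` and the real stats type, and lines are assumed to arrive without their trailing newline.

```rust
/// Minimal partial-line aggregator (hypothetical; the real CriAggregator may differ).
#[derive(Default)]
struct Aggregator {
    pending: Vec<u8>,
}

impl Aggregator {
    /// Drop any half-assembled record.
    fn reset(&mut self) {
        self.pending.clear();
    }
}

/// Parse "<timestamp> <stream> <P|F> <message>".
/// Returns (is_full, message), or None if the line is not CRI-shaped.
fn parse_cri(line: &[u8]) -> Option<(bool, &[u8])> {
    let mut parts = line.splitn(4, |&b| b == b' ');
    let _timestamp = parts.next()?;
    let _stream = parts.next()?;
    let flag = parts.next()?;
    let msg = parts.next().unwrap_or_default();
    match flag {
        b"P" => Some((false, msg)),
        b"F" => Some((true, msg)),
        _ => None,
    }
}

fn process_line(
    agg: &mut Aggregator,
    line: &[u8],
    passthrough_on_fail: bool,
    parse_errors: &mut u64,
    out: &mut Vec<u8>,
) {
    match parse_cri(line) {
        Some((is_full, msg)) => {
            agg.pending.extend_from_slice(msg);
            if is_full {
                out.extend_from_slice(&agg.pending);
                out.push(b'\n');
                agg.pending.clear();
            }
        }
        None => {
            // Reset first, so an earlier 'P' fragment cannot bleed into the
            // next valid 'F' record, whichever fallback path is taken.
            agg.reset();
            if passthrough_on_fail {
                out.extend_from_slice(line);
                out.push(b'\n');
            } else {
                *parse_errors += 1;
            }
        }
    }
}
```

Without the `agg.reset()` call, feeding a `P` line, a plain-text line, and then an `F` line would prepend the stale `P` fragment to the `F` record.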
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 0777814c-7819-4d13-a125-10e6f8ad2c49
📒 Files selected for processing (4)
crates/logfwd-io/src/format.rs
crates/logfwd-io/src/framed.rs
crates/logfwd-io/src/lib.rs
crates/logfwd/src/pipeline.rs
crates/logfwd/src/pipeline.rs introduces a construction-time format mapper that still silently falls through unsupported input formats to passthrough (make_format, around line 500: _ => FormatProcessor::Passthrough).
This preserves the logfmt/syslog fallthrough behavior instead of failing early at input construction, so misconfigured inputs still reach the JSON scanner and fail at runtime with parse noise.
Please make unsupported input formats explicit and return an input-construction error (while keeping Json/Raw passthrough and Cri/Auto mapped to dedicated processors).
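A fallible mapper along the lines this comment asks for might look like the sketch below. The `Logfmt` and `Syslog` variant names are assumptions inferred from the "logfmt/syslog fallthrough" wording, the `String` error type is a placeholder, and the real `make_format` signature may differ.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum Format {
    Json,
    Raw,
    Cri,
    Auto,
    // Assumed variant names for the formats the input layer cannot process yet.
    Logfmt,
    Syslog,
}

#[derive(Debug, PartialEq)]
enum FormatProcessor {
    Passthrough,
    Cri,
    Auto,
}

/// Map a configured format to a processor, failing at construction time
/// instead of silently falling through to passthrough.
fn make_format(name: &str, format: Format) -> Result<FormatProcessor, String> {
    match format {
        Format::Json | Format::Raw => Ok(FormatProcessor::Passthrough),
        Format::Cri => Ok(FormatProcessor::Cri),
        Format::Auto => Ok(FormatProcessor::Auto),
        // Listing the unsupported variants explicitly (no `_` arm) means a
        // newly added Format variant becomes a compile error here.
        unsupported @ (Format::Logfmt | Format::Syslog) => Err(format!(
            "input '{name}': unsupported format {unsupported:?}"
        )),
    }
}
```

Avoiding a wildcard arm is a deliberate choice in this sketch: it trades a little verbosity for the compiler flagging every place that must decide how a new format is handled.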
Actionable comments posted: 2
♻️ Duplicate comments (1)
crates/logfwd-io/src/framed.rs (1)
63-75: 🛠️ Refactor suggestion | 🟠 Major

The previously flagged hot-path heap churn is still present.

`std::mem::take()` on Line 64 leaves `self.remainder` empty, so Line 74 still allocates a new tail buffer whenever a partial line survives. Lines 98-99 also allocate a fresh `Vec` for every emitted `Data` event. This wrapper now sits on the input hot path, so please keep a reusable spare output buffer and reuse the remainder storage in place, or benchmark the regression explicitly.

As per coding guidelines, "Do not introduce allocations in hot paths without benchmarking" and "No per-record allocations in hot path. Reuse buffers with `.clear()`."

Also applies to: 96-100
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/logfwd-io/src/framed.rs` around lines 63 - 75, Avoid the per-poll allocations: do not call std::mem::take to replace self.remainder (which forces later reallocations) and do not allocate a fresh Vec for every emitted Data event; instead append the incoming bytes into self.remainder in-place, call memchr::memrchr on self.remainder to find the split point, then reuse a reusable spare buffer (e.g. a new field like self.spare_output) to hold the complete-chunk to emit by swapping/truncating rather than allocating; specifically update the logic around self.remainder, chunk, and the memchr::memrchr branch to push/extend into self.remainder, compute pos, then move/truncate data into the spare buffer for emission (and clear/reuse buffers afterward) so no new Vec is allocated on each poll or at the Data emission sites referenced around the current memrchr branch and the Data creation lines.
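The in-place approach this prompt describes can be sketched roughly as below. It is an assumption-laden illustration rather than the actual `FramedInput::poll`: the `Framer` type and `feed` method are hypothetical, `rposition` from the standard library stands in for `memchr::memrchr`, and the complete lines are returned as a borrowed slice instead of an owned `InputEvent::Data` payload.

```rust
/// Hypothetical framer that accumulates bytes and emits only complete lines.
struct Framer {
    /// Bytes received after the last newline seen so far.
    remainder: Vec<u8>,
    /// Output buffer, cleared and reused on every call.
    out_buf: Vec<u8>,
}

impl Framer {
    fn new() -> Self {
        Self {
            remainder: Vec::with_capacity(64 * 1024),
            out_buf: Vec::with_capacity(64 * 1024),
        }
    }

    /// Feed one chunk and return the complete lines it finishes (possibly none).
    fn feed(&mut self, chunk: &[u8]) -> &[u8] {
        self.out_buf.clear(); // keeps capacity
        self.remainder.extend_from_slice(chunk);
        // Standard-library stand-in for memchr::memrchr(b'\n', &self.remainder).
        if let Some(pos) = self.remainder.iter().rposition(|&b| b == b'\n') {
            // drain(..=pos) moves the complete lines out and shifts the partial
            // tail to the front of the same allocation.
            self.out_buf.extend(self.remainder.drain(..=pos));
        }
        &self.out_buf
    }
}
```

Once both buffers have grown to the working-set size, `feed` performs no further heap allocation in this sketch; the cost is a copy of the complete lines and a shift of the tail on each call.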
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@crates/logfwd-io/src/framed.rs`:
- Around line 69-85: The tail-saving branch that does
self.remainder.extend_from_slice(&chunk[pos + 1..]) can stash an unbounded
remainder and must enforce the same MAX_REMAINDER_BYTES cap as the no-newline
branch: after extending/truncating, check if self.remainder.len() >
MAX_REMAINDER_BYTES and if so call self.stats.inc_parse_errors(1), clear the
remainder (self.remainder.clear()), and reset the framing/formatter state (e.g.
call self.formatter.reset() or otherwise clear pending CRI/Auto state) so no
stale formatter state carries into the next message; keep the existing
chunk.truncate(pos + 1) behavior for the emitted frame.
In `@crates/logfwd/src/pipeline.rs`:
- Around line 494-500: make_format currently coerces any unsupported Format into
FormatProcessor::Passthrough, which silently ignores invalid cfg.format values;
change make_format to return Result<FormatProcessor, Error> (or a suitable error
type) and validate the incoming Format enum so only supported variants are
accepted (e.g., allow Cri and Auto where used). Update callers that construct
Generator and OTLP (where cfg.format is currently assumed to be Json) to check
cfg.format explicitly and return an error when the format is not supported
instead of falling back; reference symbols: make_format, Format,
FormatProcessor, Generator, OTLP, and cfg.format to locate and enforce format
validation.
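The remainder cap from the first inline comment above could be enforced on the tail-saving path roughly like this. The `TailBuffer` type, the `save_tail` method, the tiny cap value, and the plain `u64` counter are all hypothetical; the sketch also assumes the previous remainder has already been consumed before the tail is stored.

```rust
/// Illustrative cap only; the real MAX_REMAINDER_BYTES value is not shown in the review.
const MAX_REMAINDER_BYTES: usize = 16;

struct TailBuffer {
    remainder: Vec<u8>,
    parse_errors: u64,
}

impl TailBuffer {
    /// Store the bytes that follow the last newline of a chunk.
    /// Returns false when the tail exceeds the cap and was dropped.
    fn save_tail(&mut self, tail: &[u8]) -> bool {
        self.remainder.clear(); // reuse the existing allocation
        if tail.len() > MAX_REMAINDER_BYTES {
            self.parse_errors += 1;
            // A real implementation would also reset any pending CRI/Auto
            // formatter state here, as the comment requests.
            return false;
        }
        self.remainder.extend_from_slice(tail);
        true
    }
}
```

Checking the length before copying keeps an oversized tail from ever being buffered, which gives the same bound as the no-newline branch without a temporary spike in memory.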
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 141b5495-b102-4292-9919-e6e21109d816
📒 Files selected for processing (2)
crates/logfwd-io/src/framed.rs
crates/logfwd/src/pipeline.rs
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Addressed all 5 unresolved review threads with focused fixes and pushed commit
Tests/validation run
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@crates/logfwd-io/src/framed.rs`:
- Around line 103-109: The current use of std::mem::take(&mut self.spare_buf)
discards spare_buf's reserved capacity causing a reallocation next poll; fix by
preserving capacity before taking: capture let cap = self.spare_buf.capacity();
then take the buffer into data (let data = std::mem::take(&mut self.spare_buf));
then restore capacity with self.spare_buf.reserve(cap); push InputEvent::Data {
bytes: data } into result_events; this keeps spare_buf capacity while still
moving its contents out after the earlier std::mem::swap with out_buf.
In `@crates/logfwd/src/pipeline.rs`:
- Around line 441-445: The poll loop in input_poll_loop only matches
InputEvent::Data and appends bytes to input.buf, so InputEvent::Rotated and
InputEvent::Truncated are being dropped; update the loop that iterates over
events to match those variants too and forward them to the FramedInput instance
(e.g., call the appropriate reset/rotate/truncate handling on input or a public
method on FramedInput) instead of ignoring them, and optionally emit a
metric/log entry when handling InputEvent::Rotated and InputEvent::Truncated for
observability; keep InputEvent::Data handling (extending input.buf) unchanged.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: dd4ea1d6-a9d6-47e1-b80c-c4d0981e9c14
📒 Files selected for processing (3)
crates/logfwd-io/src/format.rs
crates/logfwd-io/src/framed.rs
crates/logfwd/src/pipeline.rs
- preserve FramedInput spare buffer capacity when emitting Data events
- handle Rotated/Truncated input events in pipeline for observability

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Addressed the two unresolved review threads in this PR and pushed commit
Both addressed threads have been resolved. Tests run
Environment limitations encountered:
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@crates/logfwd/src/pipeline.rs`:
- Around line 442-446: The current handling treats InputEvent::Rotated and
InputEvent::Truncated as errors by calling input.stats.inc_errors(); instead add
a dedicated counter for expected operational events: add a new method on the
stats/metrics type (e.g., Stats::inc_rotations() or inc_rotations_truncations())
and invoke that from the pipeline where InputEvent::Rotated and
InputEvent::Truncated are matched instead of inc_errors(); also update the
metrics registration/initialization to expose this new counter so
rotation/truncation events are tracked separately from true errors.
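A dedicated counter of the kind requested here might be wired up as in the following sketch. The `Stats` struct, its field names, and the `apply_events` helper are hypothetical simplifications; the real stats type also feeds OpenTelemetry metrics, which are omitted.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Simplified event type mirroring the variants named in the review.
enum InputEvent {
    Data { bytes: Vec<u8> },
    Rotated,
    Truncated,
}

#[derive(Default)]
struct Stats {
    errors: AtomicU64,
    rotations: AtomicU64,
}

impl Stats {
    /// Counts expected file lifecycle events (both rotations and truncations),
    /// kept separate from `errors`. Relaxed ordering is enough for a counter
    /// that is only read for reporting.
    fn inc_rotations(&self) {
        self.rotations.fetch_add(1, Ordering::Relaxed);
    }
}

/// Apply one batch of polled events to the input buffer and stats.
fn apply_events(events: Vec<InputEvent>, buf: &mut Vec<u8>, stats: &Stats) {
    for event in events {
        match event {
            InputEvent::Data { bytes } => buf.extend_from_slice(&bytes),
            // Operational events, not failures: do not touch the error counter.
            InputEvent::Rotated | InputEvent::Truncated => stats.inc_rotations(),
        }
    }
}
```

Because the `match` has no wildcard arm, adding another `InputEvent` variant later forces this loop to handle it explicitly rather than dropping it silently.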
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: d58a84a5-cd06-4e0e-ad64-e83d701b027c
📒 Files selected for processing (2)
crates/logfwd-io/src/framed.rs
crates/logfwd/src/pipeline.rs
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Addressed the remaining unresolved review feedback.
Tests run
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@crates/logfwd-io/src/diagnostics.rs`:
- Around line 631-638: The pipelines API test doesn't assert the new rotations
field is serialized; update the fixture used by test_pipelines_endpoint to seed
a nonzero rotations value (e.g., set rotations = 1 on the PipelineStats or
fixture builder) and add an assertion that the returned JSON contains
"rotations":1; also apply the same change to the other pipeline-related test
that builds the same fixture/serialization path (the block testing serialization
around the other pipeline endpoint/code that uses the same format) so both tests
validate rotations serialization. Ensure you modify the fixture construction
used by test_pipelines_endpoint and the duplicate fixture in the other test so
the atomic counter increment test and the /api/pipelines response assertion both
verify rotations is present and equals 1.
- Around line 73-76: Add a /// doc comment on the public method inc_rotations()
describing its behavior: note that it increments both rotations_total and
otel_rotations and is invoked by the pipeline for both Rotated and Truncated
events (i.e., it is not strictly "rotation-only"); mention the memory/order
semantics if useful (uses Ordering::Relaxed) and that it updates OpenTelemetry
metrics using otel_attrs so callers understand side effects and usage
expectations.
In `@crates/logfwd/src/pipeline.rs`:
- Around line 520-537: The validation in validate_input_format currently rejects
Format::Cri and Format::Auto for InputType::Udp and InputType::Tcp; remove that
branch (the InputType::Udp | InputType::Tcp arm that returns Err for Cri/Auto)
so TCP/UDP inputs are not blocked from using CRI/Auto. Keep the existing
Generator/Otlp check for Format::Json unchanged and allow the pipeline to rely
on FramedInput/FormatProcessor (e.g., FormatProcessor::cri/auto) to handle
CRI/Auto framing for TCP/UDP.
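The validation rule from the last comment above (keep the JSON-only check for generator and OTLP inputs, and stop rejecting CRI/Auto on TCP/UDP) can be sketched as follows. The enum definitions, the function signature, and the error wording are assumptions for illustration; the real `validate_input_format` may take a config struct instead.

```rust
#[derive(Debug, Clone, Copy)]
enum Format {
    Json,
    Raw,
    Cri,
    Auto,
}

#[derive(Debug, Clone, Copy)]
enum InputType {
    File,
    Tcp,
    Udp,
    Generator,
    Otlp,
}

/// Reject only combinations the input cannot honour. `None` means the format
/// was not set explicitly, so the input's default applies.
fn validate_input_format(
    name: &str,
    input_type: InputType,
    format: Option<Format>,
) -> Result<(), String> {
    match (input_type, format) {
        (InputType::Generator | InputType::Otlp, Some(f)) if !matches!(f, Format::Json) => {
            Err(format!(
                "input '{name}': {input_type:?} input only supports format=json, got {f:?}"
            ))
        }
        // File, TCP and UDP may use any format; framing and CRI/Auto handling
        // are left to the input layer.
        _ => Ok(()),
    }
}
```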
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 69151d76-1ab1-4703-9d4c-ed85445230b4
📒 Files selected for processing (2)
crates/logfwd-io/src/diagnostics.rs
crates/logfwd/src/pipeline.rs
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Addressed all 3 unresolved CodeRabbit threads with minimal fixes and pushed commit
Validation
Reviewed the current PR feedback state and there are no unresolved review threads remaining to address in this run. No code changes were required, so there was nothing to push and no additional threads to resolve. Validation
Replace Vec::with_capacity(self.out_buf.capacity()) per emit with a spare_buf swap pattern. The 64KB capacity bounces between out_buf and spare_buf across polls, so no allocation occurs after initialization.

Before: every emit created a new 64KB Vec (Vec::with_capacity)
After: take out_buf, swap in spare_buf's capacity, zero allocations

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
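One way to read the swap pattern in this commit message is sketched below. It is not the code from framed.rs: the `Emitter` type and its `emit` and `recycle` methods are hypothetical, and the sketch assumes the consumer hands the emitted buffer back, since an owned `Vec` that leaves for good takes its capacity with it.

```rust
/// Hypothetical two-buffer emitter.
struct Emitter {
    /// Buffer currently being filled with scanner-ready bytes.
    out_buf: Vec<u8>,
    /// Empty buffer whose capacity is ready to take over.
    spare_buf: Vec<u8>,
}

impl Emitter {
    fn new(capacity: usize) -> Self {
        Self {
            out_buf: Vec::with_capacity(capacity),
            spare_buf: Vec::with_capacity(capacity),
        }
    }

    /// Hand the filled buffer to the caller and install the spare in its place.
    /// No allocation happens here: `mem::take` leaves an empty, zero-capacity Vec.
    fn emit(&mut self) -> Vec<u8> {
        let spare = std::mem::take(&mut self.spare_buf);
        std::mem::replace(&mut self.out_buf, spare)
    }

    /// Assumed return path: the consumer gives the buffer back once it is done,
    /// so its capacity becomes the next spare.
    fn recycle(&mut self, mut buf: Vec<u8>) {
        buf.clear(); // drops the contents, keeps the capacity
        self.spare_buf = buf;
    }
}
```

If no such return path exists, `spare_buf` is left with zero capacity after the first `emit`, which is the situation the earlier review comment about `std::mem::take` discarding reserved capacity describes.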
Summary
Move format handling (CRI/Auto/JSON/Raw) and newline framing out of `input_poll_loop` in `pipeline.rs` into composable input-layer components in `logfwd-io`. The pipeline no longer knows about formats; it just polls for scanner-ready bytes.

New components:
- `FormatProcessor` (logfwd-io/src/format.rs): enum with Passthrough, Cri, Auto variants. Processes framed lines into scanner-ready output.
- `FramedInput` (logfwd-io/src/framed.rs): wraps any `InputSource` with newline framing + format processing. Implements `InputSource` transparently.

Pipeline simplification:
- `InputState` reduced from 7 fields to 3 (source, buf, stats)
- `input_poll_loop` reduced from ~130 lines to ~50: just poll → batch → send
- `extract_cri_messages` deleted from pipeline.rs (moved to format.rs)
- `MAX_REMAINDER_BYTES` moved to framed.rs

Composability: Any transport (file, TCP, UDP) can use any format:

WIP: Known remaining work
- `just clippy` clean

Test plan
Closes #573
🤖 Generated with Claude Code