feat: create logfwd-arrow crate, move builders + scanner structs (Step 1)#307
Caution
Review failed
The pull request is closed.
ℹ️ Recent review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID:
📒 Files selected for processing (6)
Walkthrough
A new crate
Possibly related PRs
🚥 Pre-merge checks | ✅ 1 | ❌ 1
❌ Failed checks (1 warning)
✅ Passed checks (1 passed)
Actionable comments posted: 10
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (4)
crates/logfwd-core/src/scanner.rs (2)
20-30: 🛠️ Refactor suggestion | 🟠 Major
Add doc comment for newly public `ScanBuilder` trait.
This trait is now part of the public API surface. Per coding guidelines, public APIs must have doc comments explaining the trait's purpose and method contracts.
+/// Builder interface for constructing tabular output from scanned JSON lines.
+///
+/// Implementations populate typed columns (string, int, float) from parsed fields.
pub trait ScanBuilder {
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/logfwd-core/src/scanner.rs` around lines 20 - 30, Add a doc comment for the newly public trait ScanBuilder describing its purpose and contract: explain that ScanBuilder orchestrates parsing/emitting rows in a batch (methods: begin_batch, begin_row, end_row), how resolve_field maps a byte-key to a field index, and the semantics expected for the append_* methods (append_str_by_idx, append_int_by_idx, append_float_by_idx, append_null_by_idx) including when they may be called relative to begin_row/end_row; also document append_raw behavior and any thread-safety/ownership expectations and whether implementations must tolerate out-of-order or repeated resolves. Keep the comment concise, use rustdoc style (///) above the trait, and mention any panics or return-value guarantees for resolve_field and append_*.
285-347: ⚠️ Potential issue | 🟠 Major
Remove duplicate `SimdScanner` and `StreamingSimdScanner` structs from `logfwd-core`.
Both structs are identically defined in `crates/logfwd-core/src/scanner.rs` and `crates/logfwd-arrow/src/scanner.rs`. All production code imports from `logfwd_arrow::scanner`; the `logfwd-core` versions are unused except in local tests. This duplication creates maintenance overhead and the illusion of choice (different types despite identical definitions). Keep only the `logfwd-arrow` versions and update `logfwd-core` tests to import from there as well.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/logfwd-core/src/scanner.rs` around lines 285 - 347, Delete the duplicate SimdScanner and StreamingSimdScanner type definitions and their impl blocks (including methods new and scan and uses of StorageBuilder, StreamingBuilder, and scan_into) from crates/logfwd-core/src/scanner.rs, and update any tests in logfwd-core to import and use the canonical implementations from logfwd_arrow::scanner (e.g., use logfwd_arrow::scanner::{SimdScanner, StreamingSimdScanner};) so production code only uses the single source of truth in logfwd-arrow; ensure any test helpers that constructed StorageBuilder/StreamingBuilder are adapted to the public API from logfwd-arrow.
crates/logfwd-core/fuzz/fuzz_targets/scanner_consistency.rs (1)
22-26: ⚠️ Potential issue | 🔴 Critical
`scan()` calls return `Result`, used without unwrapping.
Both `storage_scanner.scan(data)` and `streaming_scanner.scan(...)` return `Result<RecordBatch, ArrowError>`, but the results are used directly as `RecordBatch`. This won't compile.
🐛 Proposed fix
let mut storage_scanner = SimdScanner::new(ScanConfig::default());
-let storage_batch = storage_scanner.scan(data);
+let storage_batch = storage_scanner.scan(data).expect("fuzz: storage scan failed");
let mut streaming_scanner = StreamingSimdScanner::new(ScanConfig::default());
-let streaming_batch = streaming_scanner.scan(bytes::Bytes::copy_from_slice(data));
+let streaming_batch = streaming_scanner.scan(bytes::Bytes::copy_from_slice(data)).expect("fuzz: streaming scan failed");
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/logfwd-core/fuzz/fuzz_targets/scanner_consistency.rs` around lines 22 - 26, The calls to scan() on SimdScanner and StreamingSimdScanner return Result<RecordBatch, ArrowError> but the code treats them as RecordBatch; update the two uses (storage_scanner.scan and streaming_scanner.scan) to handle the Result (e.g., call .expect or .unwrap with a descriptive message, or propagate the error) so you obtain a RecordBatch before further use; locate the calls to SimdScanner::new / StreamingSimdScanner::new and replace direct assignment of storage_batch and streaming_batch with proper Result handling.
crates/logfwd-core/fuzz/fuzz_targets/chunk_index.rs (1)
60-61: ⚠️ Potential issue | 🔴 Critical
`scan()` returns `Result`, but treated as `RecordBatch` directly.
Line 61 calls `batch.num_rows()`, but `scanner.scan(data)` returns `Result<RecordBatch, ArrowError>`. Must unwrap before use.
🐛 Fix
-let batch = scanner.scan(data);
+let batch = scanner.scan(data).expect("fuzz: scan failed");
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/logfwd-core/fuzz/fuzz_targets/chunk_index.rs` around lines 60 - 61, scanner.scan(data) returns Result<RecordBatch, ArrowError> but the code treats its return as a RecordBatch; unwrap or handle the Result before calling batch.num_rows(). Locate the call to scanner.scan in chunk_index fuzz target, replace the direct use with proper result handling (e.g., match/if let or expect) to obtain a RecordBatch value and then call num_rows(), ensuring ArrowError is propagated or logged accordingly.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@crates/logfwd-arrow/src/scanner.rs`:
- Around line 109-127: Add doc comments for the public APIs SimdScanner::new and
SimdScanner::scan: document what SimdScanner represents and what configuration
(ScanConfig) is used, describe SimdScanner::new(config) returns a new scanner
built with the given config and builder behavior, and describe
SimdScanner::scan(&mut self, buf: &[u8]) -> Result<RecordBatch, ArrowError>
including its UTF-8 validation behavior, the effect of ScanConfig.validate_utf8,
and what errors/RecordBatch are returned; ensure comments use /// and briefly
mention side-effects (uses StorageBuilder and finish_batch) so public API
documentation is complete.
- Around line 143-161: Add public API documentation for
StreamingSimdScanner::new and StreamingSimdScanner::scan: add a short /// doc
comment above new describing it constructs a StreamingSimdScanner from a
ScanConfig and initializes the StreamingBuilder, and add a /// doc comment above
scan describing that it consumes a bytes::Bytes buffer, optionally validates
UTF‑8 when config.validate_utf8 is true, begins a batch, performs scanning via
scan_into, and returns a RecordBatch or ArrowError; include examples of
parameters and return value and note error conditions (invalid UTF‑8 or scan
failures) so callers understand usage and failure modes.
In `@crates/logfwd-arrow/src/storage_builder.rs`:
- Around line 52-58: Add a short "# Usage" example to the doc comment for the
self-contained persistence builder showing the typical scan → build → compress →
finish_batch flow (mirror the existing example on StreamingBuilder). Mention the
builder type (StorageBuilder), constructing it, appending records during a scan,
and calling finish_batch (and any resulting return type) so users can copy-paste
a minimal example; keep it concise and consistent with the StreamingBuilder
example.
- Around line 109-121: The duplicate-detection in check_dup only handles idx <
64 and silently ignores larger indices; do two things: extract a shared constant
(e.g., MAX_INLINE_BITS = 64) and a shared helper (e.g., check_dup_bits) used by
this check_dup and StreamingBuilder::check_dup so behavior is consistent, and
replace the else branch with proper handling for idx >= MAX_INLINE_BITS — either
grow a bit-storage (Vec<u64> or similar backing field instead of a single
written_bits u64) and set/test the appropriate word/bit, or explicitly
bound-check and return an error/panic if larger indices should be illegal.
Update the written_bits field and both builders to use the shared helper and
constant so duplicate detection works the same way for indices ≥64.
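One way to read the "grow a bit-storage" option above is a small shared helper that both builders call. The sketch below is only an illustration under assumptions: `WrittenBits` and its methods are hypothetical names, not types from the PR, and the real `check_dup` may have a different signature. It shows a `Vec<u64>` that grows on demand so indices at or above 64 are still tracked.

```rust
/// Hypothetical shared duplicate-key tracker (name and API are assumptions,
/// not taken from the PR). One bit per field index.
#[derive(Default)]
pub struct WrittenBits {
    /// Word `i` covers field indices `64 * i ..= 64 * i + 63`.
    words: Vec<u64>,
}

impl WrittenBits {
    /// Clears every bit at the start of a row, keeping the allocation.
    pub fn clear(&mut self) {
        for w in self.words.iter_mut() {
            *w = 0;
        }
    }

    /// Marks `idx` as written for the current row.
    /// Returns `true` if `idx` had already been written in this row.
    pub fn check_dup(&mut self, idx: usize) -> bool {
        let word = idx / 64;
        let mask = 1u64 << (idx % 64);
        // Grow the backing storage instead of silently ignoring idx >= 64.
        if word >= self.words.len() {
            self.words.resize(word + 1, 0);
        }
        let dup = self.words[word] & mask != 0;
        self.words[word] |= mask;
        dup
    }
}
```

With a helper like this, each builder would call `clear` when a row begins and `check_dup` before appending, so the two builders cannot drift apart in behavior. For schemas under 64 fields the vector holds a single word, so the cost over a bare `u64` should be small, though that is not benchmarked here.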
In `@crates/logfwd-arrow/src/streaming_builder.rs`:
- Around line 84-87: The written_bits u64 field in StreamingBuilder
(written_bits) silently disables duplicate-key protection for field indexes >=
64; update the implementation to either emit a log/metric when an index exceeds
63 (so we can detect/alert on the overflow) and/or add overflow protection using
a secondary structure (e.g., a BitVec, Vec<bool>, or HashSet<usize>) to track
written fields beyond 63; modify places that set/check written_bits (references:
written_bits field and any methods that call/set it in StreamingBuilder) to
consult the overflow set when index >= 64 and to log/metric the first time the
threshold is hit.
- Around line 65-79: The public method append_raw on StreamingBuilder is
currently a silent no-op which confuses callers; update the API by either (A)
adding a clear doc comment on StreamingBuilder::append_raw stating it's
intentionally a no-op (and why, e.g., raw columns not supported or handled
elsewhere) and point callers to the supported methods, or (B) change the
signature to return a Result or log an explicit warning so callers get feedback;
apply the same treatment to the other no-op occurrence around the append_raw at
the 211–213 region so both places consistently document or surface the no-op
behavior.
- Around line 142-153: Add a short doc comment above the fn offset_of(&self,
value: &[u8]) -> u32 that explicitly states the safety invariant: that value
must be a subslice of self.buf (so value.as_ptr() and value.len() lie within
self.buf.as_ptr()..self.buf.as_ptr().add(self.buf.len())), the function uses
usize pointer arithmetic to avoid offset_from UB, and the debug_assert enforces
this in debug builds; reference self.buf, value, and the returned byte offset to
clarify intent for future maintainers.
- Around line 277-281: The unsafe call to builder.append_view_unchecked(...)
violates the "Do not use unsafe code" guideline; replace this unsafe block with
the safe API (e.g., call builder.append_value or the equivalent safe append
method) so UTF‑8 validity is enforced safely (you already validate in
append_str_by_idx), or if zero-copy is absolutely required, remove the unsafe
usage only after adding a documented exception with benchmarks and rationale;
locate the usage by searching for builder.append_view_unchecked in
streaming_builder.rs and adjust the append path to use the safe append_value
method (or wrap the operation in a clearly documented, benchmarked exception).
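The `offset_of` item above asks for the subslice invariant to be written down. A minimal sketch of what that documented function could look like follows, assuming a plain `Vec<u8>` buffer; `BufHolder` is a hypothetical stand-in for `StreamingBuilder`, whose real buffer type is not shown in the review.

```rust
/// Hypothetical stand-in for the builder that owns the input buffer.
pub struct BufHolder {
    buf: Vec<u8>,
}

impl BufHolder {
    pub fn new(buf: Vec<u8>) -> Self {
        BufHolder { buf }
    }

    pub fn buf(&self) -> &[u8] {
        &self.buf
    }

    /// Returns the byte offset of `value` within `self.buf`.
    ///
    /// Invariant: `value` must be a subslice of `self.buf`, that is, its
    /// start and end pointers must both lie inside the buffer's range.
    /// The offset is computed with `usize` arithmetic on the addresses
    /// rather than `offset_from`, and the invariant is checked only in
    /// debug builds via `debug_assert!`.
    pub fn offset_of(&self, value: &[u8]) -> u32 {
        let base = self.buf.as_ptr() as usize;
        let ptr = value.as_ptr() as usize;
        debug_assert!(
            ptr >= base && ptr + value.len() <= base + self.buf.len(),
            "value is not a subslice of buf"
        );
        (ptr - base) as u32
    }
}
```

Note that the cast to `u32` truncates for buffers larger than 4 GiB; whether that can happen depends on batch-size limits that the review does not state.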
In `@crates/logfwd-core/fuzz/Cargo.toml`:
- Line 3: The Cargo.toml incorrectly lists the dependency logfwd-arrow inside
the [package] section; remove the logfwd-arrow = { path = "../../logfwd-arrow" }
entry from the [package] block and ensure the same dependency remains (and only)
under the [dependencies] section so Cargo reads it correctly; search for the
symbol "logfwd-arrow" and delete the duplicate in the package metadata while
keeping the declaration under [dependencies].
In `@crates/logfwd-core/src/scanner.rs`:
- Around line 112-113: Add a public doc comment to the newly public function
scan_into explaining its purpose, parameters, behavior, and preconditions:
describe that scan_into(buf: &[u8], config: &ScanConfig, builder: &mut B) scans
the supplied byte buffer using the provided ScanConfig and feeds parsed
records/events into the ScanBuilder implementation B; document each parameter
(buf, config, builder), the UTF‑8 precondition that buf must contain valid UTF‑8
(or explain how non‑UTF‑8 is handled), and describe the observable effects and
any panics/errors the function may produce; reference the ScanBuilder trait and
ScanConfig type in the docs so callers know where to look for expected builder
behavior and config options.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 0b0fc98c-720d-4358-b64c-cfd0f45e694b
⛔ Files ignored due to path filters (1)
Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (26)
- Cargo.toml
- crates/logfwd-arrow/Cargo.toml
- crates/logfwd-arrow/src/lib.rs
- crates/logfwd-arrow/src/scanner.rs
- crates/logfwd-arrow/src/storage_builder.rs
- crates/logfwd-arrow/src/streaming_builder.rs
- crates/logfwd-bench/Cargo.toml
- crates/logfwd-bench/benches/pipeline.rs
- crates/logfwd-core/Cargo.toml
- crates/logfwd-core/benches/scanner.rs
- crates/logfwd-core/examples/arrow_ipc_roundtrip.rs
- crates/logfwd-core/fuzz/Cargo.toml
- crates/logfwd-core/fuzz/fuzz_targets/chunk_index.rs
- crates/logfwd-core/fuzz/fuzz_targets/scanner.rs
- crates/logfwd-core/fuzz/fuzz_targets/scanner_consistency.rs
- crates/logfwd-core/fuzz/fuzz_targets/scanner_sink.rs
- crates/logfwd-core/fuzz/fuzz_targets/scanner_transform.rs
- crates/logfwd-core/fuzz/fuzz_targets/streaming_scanner.rs
- crates/logfwd-core/src/scanner.rs
- crates/logfwd-core/tests/compliance_data.rs
- crates/logfwd-core/tests/scanner_conformance.rs
- crates/logfwd-transform/Cargo.toml
- crates/logfwd-transform/tests/scanner_datafusion_boundary.rs
- crates/logfwd/Cargo.toml
- crates/logfwd/src/pipeline.rs
- crates/logfwd/tests/integration.rs
@@ -1,5 +1,6 @@
[package]
name = "logfwd-core-fuzz"
logfwd-arrow = { path = "../../logfwd-arrow" }
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
python - <<'PY'
import tomllib
from pathlib import Path
p = Path("crates/logfwd-core/fuzz/Cargo.toml")
data = tomllib.loads(p.read_text())
pkg = data.get("package", {})
deps = data.get("dependencies", {})
print("package contains logfwd-arrow:", "logfwd-arrow" in pkg)
print("dependencies contains logfwd-arrow:", "logfwd-arrow" in deps)
print("package keys:", sorted(pkg.keys()))
PY
Repository: strawgate/memagent
Length of output: 222
Remove dependency from [package] section.
logfwd-arrow is declared on Line 3 within [package], which Cargo ignores as invalid metadata. Keep the dependency only under [dependencies] on Line 14.
Proposed fix
[package]
name = "logfwd-core-fuzz"
-logfwd-arrow = { path = "../../logfwd-arrow" }
version = "0.0.0"
publish = false
edition = "2021"
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@crates/logfwd-core/fuzz/Cargo.toml` at line 3, The Cargo.toml incorrectly
lists the dependency logfwd-arrow inside the [package] section; remove the
logfwd-arrow = { path = "../../logfwd-arrow" } entry from the [package] block
and ensure the same dependency remains (and only) under the [dependencies]
section so Cargo reads it correctly; search for the symbol "logfwd-arrow" and
delete the duplicate in the package metadata while keeping the declaration under
[dependencies].
f45366b to 5997536
@coderabbitai full review please be brutal
Actionable comments posted: 5
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
crates/logfwd-core/src/scanner.rs (1)
16-26: 🛠️ Refactor suggestion | 🟠 Major
Document `ScanBuilder` before exposing it.
`ScanBuilder` is now public, but downstream implementers still have to infer callback ordering and method invariants from the code. Please add rustdoc for the trait and its required call sequence.
As per coding guidelines, "All public APIs must have doc comments."
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/logfwd-core/src/scanner.rs` around lines 16 - 26, Add rustdoc for the public trait ScanBuilder describing its purpose and the required call sequence and invariants: explain when begin_batch, begin_row, end_row are called and that resolve_field returns an index used by append_*_by_idx, that append_raw receives an entire line, and how append_null_by_idx should be used; document expected ordering (begin_batch -> repeat(begin_row -> resolve_field? -> append_*_by_idx* -> end_row) -> end of batch), thread-safety/ownership expectations for key and value byte slices, and any error/overflow behavior implementers must handle (e.g., what to do with unknown fields or out-of-range idx). Reference the trait and its methods (ScanBuilder, begin_batch, begin_row, end_row, resolve_field, append_str_by_idx, append_int_by_idx, append_float_by_idx, append_null_by_idx, append_raw) so implementers can find the contract.
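The call sequence requested in this comment can be made concrete with a documented trait and a toy implementation that records the order of calls. This is a sketch under assumptions: only the `append_raw` signature appears in the PR excerpts; every other signature below is a guess for illustration, and the `append_int_by_idx` / `append_float_by_idx` methods are left out for brevity. `CallLog` and `drive_one_row` are hypothetical helpers.

```rust
/// Receives parsed fields from a scanner, one row at a time.
///
/// Expected call order: `begin_batch`, then for each row `begin_row`,
/// any number of `resolve_field` + `append_*_by_idx` pairs, then `end_row`.
/// (Signatures are assumed for illustration, except `append_raw`.)
pub trait ScanBuilder {
    /// Resets per-batch state before the first row.
    fn begin_batch(&mut self);
    /// Starts a new row; must be paired with `end_row`.
    fn begin_row(&mut self);
    /// Finishes the current row.
    fn end_row(&mut self);
    /// Maps a key to a column index usable by later `append_*_by_idx` calls.
    fn resolve_field(&mut self, key: &[u8]) -> usize;
    /// Appends a string value; only valid between `begin_row` and `end_row`.
    fn append_str_by_idx(&mut self, idx: usize, value: &[u8]);
    /// Marks the column as null for the current row.
    fn append_null_by_idx(&mut self, idx: usize);
    /// Receives the entire unparsed line.
    fn append_raw(&mut self, _line: &[u8]);
}

/// Toy implementation that records each callback so ordering is visible.
#[derive(Default)]
pub struct CallLog {
    keys: Vec<Vec<u8>>,
    pub events: Vec<String>,
}

impl ScanBuilder for CallLog {
    fn begin_batch(&mut self) {
        self.events.push("begin_batch".to_string());
    }
    fn begin_row(&mut self) {
        self.events.push("begin_row".to_string());
    }
    fn end_row(&mut self) {
        self.events.push("end_row".to_string());
    }
    fn resolve_field(&mut self, key: &[u8]) -> usize {
        // Repeated resolves of the same key return the same index.
        if let Some(i) = self.keys.iter().position(|k| k.as_slice() == key) {
            return i;
        }
        self.keys.push(key.to_vec());
        self.keys.len() - 1
    }
    fn append_str_by_idx(&mut self, idx: usize, value: &[u8]) {
        let text = String::from_utf8_lossy(value);
        self.events.push(format!("str[{}]={}", idx, text));
    }
    fn append_null_by_idx(&mut self, idx: usize) {
        self.events.push(format!("null[{}]", idx));
    }
    fn append_raw(&mut self, _line: &[u8]) {
        // This toy builder keeps no raw column.
    }
}

/// Drives a builder through one batch containing a single one-field row.
pub fn drive_one_row<B: ScanBuilder>(builder: &mut B) {
    builder.begin_batch();
    builder.begin_row();
    let idx = builder.resolve_field(b"level");
    builder.append_str_by_idx(idx, b"info");
    builder.end_row();
}
```

A recording implementation like `CallLog` can also serve as a conformance check: a test can assert that a scanner never calls `append_*_by_idx` outside a `begin_row` / `end_row` pair.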
♻️ Duplicate comments (3)
crates/logfwd-core/fuzz/Cargo.toml (1)
3-14: ⚠️ Potential issue | 🟡 Minor
Remove misplaced dependency key from `[package]`.
Line 3 defines `logfwd-arrow` in `[package]`, which is not a dependency section. Keep it only in `[dependencies]` (Line 14). This was already flagged previously and remains unresolved.
💡 Proposed fix
[package]
name = "logfwd-core-fuzz"
-logfwd-arrow = { path = "../../logfwd-arrow" }
version = "0.0.0"
publish = false
edition = "2021"
Run this read-only check to confirm placement:
#!/bin/bash
python - <<'PY'
import tomllib
from pathlib import Path
p = Path("crates/logfwd-core/fuzz/Cargo.toml")
data = tomllib.loads(p.read_text())
pkg = data.get("package", {})
deps = data.get("dependencies", {})
print("package has logfwd-arrow:", "logfwd-arrow" in pkg)
print("dependencies has logfwd-arrow:", "logfwd-arrow" in deps)
print("package keys:", sorted(pkg.keys()))
PY
Expected result: `package has logfwd-arrow: False` and `dependencies has logfwd-arrow: True`.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/logfwd-core/fuzz/Cargo.toml` around lines 3 - 14, The Cargo.toml has a misplaced dependency entry "logfwd-arrow" inside the [package] table; remove the "logfwd-arrow = { path = \"../../logfwd-arrow\" }" key from the [package] section so it only appears under [dependencies], ensuring the [package] keys remain version, publish, and edition and that the dependency stays as logfwd-arrow = { path = "../../logfwd-arrow" } in the [dependencies] table.
crates/logfwd-core/src/scanner.rs (1)
32-33: 🛠️ Refactor suggestion | 🟠 Major
Add rustdoc to `scan_into`.
This newly public function has a non-obvious UTF-8 precondition and builder side effects, but no rustdoc yet.
As per coding guidelines, "All public APIs must have doc comments."
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/logfwd-core/src/scanner.rs` around lines 32 - 33, Add a rustdoc comment to the public function scan_into to describe its purpose, parameters, UTF-8 precondition for buf, what ScanConfig controls, and the side effects on the ScanBuilder implementor (including any items pushed/modified and whether the function may panic or return early). Reference the function signature (scan_into), the ScanBuilder trait and ScanConfig type in the doc so callers know the contract: that buf must be valid UTF-8 (or document how invalid UTF-8 is handled), what builder state is mutated, and any threading/ownership requirements. Ensure the doc follows project style (///) and includes examples or links if appropriate.
crates/logfwd-arrow/src/scanner.rs (1)
109-126: 🛠️ Refactor suggestion | 🟠 Major
Add rustdoc to the new public entry points.
`SimdScanner::new/scan` and `StreamingSimdScanner::new/scan` are public API now, but their method-level contract is still undocumented (`validate_utf8`, ownership/lifetime, and unsupported options like `keep_raw`).
As per coding guidelines, "All public APIs must have doc comments."
Also applies to: 143-160
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/logfwd-arrow/src/scanner.rs` around lines 109 - 126, Add rustdoc comments for the public constructors and methods to document their contracts: annotate SimdScanner::new, SimdScanner::scan and StreamingSimdScanner::new, StreamingSimdScanner::scan with short descriptions of behavior and parameters, explicitly document what validate_utf8 does, the ownership/lifetime expectations of the buf parameter (e.g., that scan borrows the slice and does not take ownership), and the semantics/limitations of config options like keep_raw and any unsupported options; include examples or note corner cases (errors returned, UTF-8 validation behavior) so callers understand usage and error conditions.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@crates/logfwd-arrow/src/scanner.rs`:
- Around line 92-94: The append_raw implementation in StreamingSimdScanner is a
no-op so when ScanConfig.keep_raw is true the streaming path silently drops
_raw; update StreamingSimdScanner to fail fast by checking ScanConfig.keep_raw
during construction or at scan startup and return an ArrowError (or propagate an
error) if true, referencing append_raw, StreamingSimdScanner, StreamingBuilder,
and ScanConfig.keep_raw; alternatively implement plumbing to write _raw through
StreamingBuilder, but if choosing the minimal change prefer the fail-fast check
and add a streaming regression test that sets keep_raw=true to assert the
scanner returns the expected error (also apply the same fix where similar no-op
code exists around append_raw at the other affected block).
In `@crates/logfwd-core/fuzz/fuzz_targets/scanner_consistency.rs`:
- Line 15: The calls to SimdScanner::scan() and StreamingSimdScanner::scan()
return Result<RecordBatch, ArrowError> but are treated as RecordBatch; update
the comparison logic in fuzz_targets/scanner_consistency.rs to match on the two
Results (from SimdScanner::scan and StreamingSimdScanner::scan) and only compare
the RecordBatch values when both are Ok, otherwise return early or handle Err
cases (e.g., skip, log, or assert both Err). Specifically, locate the variables
produced by SimdScanner::scan() and StreamingSimdScanner::scan() and replace
direct uses with a match (Ok(batch_a), Ok(batch_b)) => compare batches, (Err(_),
Err(_)) => return/continue, and other mixed-Err patterns => handle or return as
appropriate.
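The match described for `scanner_consistency.rs` can be sketched independently of Arrow. In the example below, `check_consistency` and `Outcome` are hypothetical names, and the generic `B` / `E` parameters stand in for `RecordBatch` and `ArrowError`. Treating a mixed `Ok` / `Err` pair as a failure is an assumption made here; the comment above leaves that choice open.

```rust
use std::fmt::Debug;

/// What the consistency check observed (hypothetical helper type).
#[derive(Debug, PartialEq)]
pub enum Outcome {
    /// Both scanners succeeded and produced equal batches.
    Compared,
    /// Both scanners rejected the input, so there is nothing to compare.
    BothFailed,
}

/// Compares the results of the storage and streaming scans.
///
/// Panics if the batches differ, or if exactly one scanner failed, since a
/// fuzz target reports findings by panicking.
pub fn check_consistency<B, E>(storage: Result<B, E>, streaming: Result<B, E>) -> Outcome
where
    B: PartialEq + Debug,
    E: Debug,
{
    match (storage, streaming) {
        (Ok(a), Ok(b)) => {
            assert_eq!(a, b, "storage and streaming scanners disagree");
            Outcome::Compared
        }
        // Invalid input rejected by both paths is not a finding.
        (Err(_), Err(_)) => Outcome::BothFailed,
        (Ok(_), Err(e)) => panic!("only the streaming scan failed: {:?}", e),
        (Err(e), Ok(_)) => panic!("only the storage scan failed: {:?}", e),
    }
}
```

Compared with calling `.expect(...)` on each result, this shape avoids reporting a crash when arbitrary fuzz input is rejected by both scanners, for example when UTF-8 validation is enabled.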
In `@crates/logfwd-core/fuzz/fuzz_targets/scanner_sink.rs`:
- Line 15: SimdScanner::scan() returns Result<RecordBatch, ArrowError> but the
diff assigns it directly to batch and later uses batch as a RecordBatch; update
the code around the call to SimdScanner::scan() (the variable named batch) to
handle the Result by unwrapping or propagating the error: either use the ?
operator (if the surrounding fn returns Result) or match the Result and on Err
convert/return/skip appropriately, then bind the Ok(record_batch) to batch so
subsequent uses at the sites referencing batch (later code that expects a
RecordBatch around where scan() is called) operate on a concrete RecordBatch;
ensure ArrowError is handled consistently (map it to the fuzz harness error path
or continue) and remove any wrong type assumptions.
In `@crates/logfwd-core/fuzz/fuzz_targets/scanner_transform.rs`:
- Line 21: SimdScanner::scan returns Result<RecordBatch, ArrowError>, so don't
assign it directly to batch; instead handle the Result from scanner.scan()
(e.g., let batch = scanner.scan()? if the surrounding function returns Result,
or use match/if let Ok(batch) = scanner.scan() { execute(batch) } else { /*
handle or return error */ }) before calling execute(), referencing
scanner.scan(), the batch variable, and execute() to locate the change.
In `@crates/logfwd-core/fuzz/fuzz_targets/streaming_scanner.rs`:
- Line 14: StreamingSimdScanner::scan returns Result<RecordBatch, ArrowError>
but the code assigns that Result directly and passes it to validate_batch which
expects &RecordBatch; change the code to call StreamingSimdScanner::scan(),
match or use ?/unwrap to handle the Result, on Ok(batch) pass &batch to
validate_batch (or bind as mut if needed), and on Err(err) either propagate the
error or handle/log and continue; update any variables (the ones receiving
scan's return) to be RecordBatch after unwrapping, not Result<RecordBatch,
ArrowError>, so validate_batch(&batch) compiles.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 37b73cd7-a010-4c39-a202-3bc7afb66a36
⛔ Files ignored due to path filters (1)
Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (27)
- Cargo.toml
- crates/logfwd-arrow/Cargo.toml
- crates/logfwd-arrow/src/lib.rs
- crates/logfwd-arrow/src/scanner.rs
- crates/logfwd-arrow/src/storage_builder.rs
- crates/logfwd-arrow/src/streaming_builder.rs
- crates/logfwd-bench/Cargo.toml
- crates/logfwd-bench/benches/pipeline.rs
- crates/logfwd-core/Cargo.toml
- crates/logfwd-core/benches/scanner.rs
- crates/logfwd-core/examples/arrow_ipc_roundtrip.rs
- crates/logfwd-core/fuzz/Cargo.toml
- crates/logfwd-core/fuzz/fuzz_targets/chunk_index.rs
- crates/logfwd-core/fuzz/fuzz_targets/scanner.rs
- crates/logfwd-core/fuzz/fuzz_targets/scanner_consistency.rs
- crates/logfwd-core/fuzz/fuzz_targets/scanner_sink.rs
- crates/logfwd-core/fuzz/fuzz_targets/scanner_transform.rs
- crates/logfwd-core/fuzz/fuzz_targets/streaming_scanner.rs
- crates/logfwd-core/src/lib.rs
- crates/logfwd-core/src/scanner.rs
- crates/logfwd-core/tests/compliance_data.rs
- crates/logfwd-core/tests/scanner_conformance.rs
- crates/logfwd-transform/Cargo.toml
- crates/logfwd-transform/tests/scanner_datafusion_boundary.rs
- crates/logfwd/Cargo.toml
- crates/logfwd/src/pipeline.rs
- crates/logfwd/tests/integration.rs
💤 Files with no reviewable changes (1)
- crates/logfwd-core/src/lib.rs
fn append_raw(&mut self, _line: &[u8]) {
    // StreamingBuilder does not support _raw column
}
Fail fast on keep_raw in the streaming path.
append_raw() is a no-op here, so ScanConfig.keep_raw = true silently drops _raw for StreamingSimdScanner while SimdScanner emits it. That makes the same config produce different schemas and loses requested data. Either plumb _raw through StreamingBuilder or return an ArrowError before scanning, and add a streaming regression test.
🛠️ Minimal fail-fast option
pub fn scan(&mut self, buf: bytes::Bytes) -> Result<RecordBatch, ArrowError> {
+ if self.config.keep_raw {
+ return Err(ArrowError::InvalidArgumentError(
+ "StreamingSimdScanner does not support ScanConfig.keep_raw".into(),
+ ));
+ }
if self.config.validate_utf8 {
std::str::from_utf8(&buf).map_err(|e| {
ArrowError::InvalidArgumentError(format!(
Also applies to: 150-160
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@crates/logfwd-arrow/src/scanner.rs` around lines 92 - 94, The append_raw
implementation in StreamingSimdScanner is a no-op so when ScanConfig.keep_raw is
true the streaming path silently drops _raw; update StreamingSimdScanner to fail
fast by checking ScanConfig.keep_raw during construction or at scan startup and
return an ArrowError (or propagate an error) if true, referencing append_raw,
StreamingSimdScanner, StreamingBuilder, and ScanConfig.keep_raw; alternatively
implement plumbing to write _raw through StreamingBuilder, but if choosing the
minimal change prefer the fail-fast check and add a streaming regression test
that sets keep_raw=true to assert the scanner returns the expected error (also
apply the same fix where similar no-op code exists around append_raw at the
other affected block).
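The fail-fast option and its regression test can be sketched in isolation. This is a minimal sketch, not the crate's code: `ScanConfigSketch`, `ScanErrorSketch`, and `StreamingScannerSketch` are hypothetical stand-ins for `ScanConfig`, `ArrowError`, and `StreamingSimdScanner`, and the `usize` line count stands in for a `RecordBatch`.

```rust
/// Hypothetical stand-in for `ScanConfig`; only the two flags named in the
/// review comment are modeled.
#[derive(Default, Clone)]
pub struct ScanConfigSketch {
    pub keep_raw: bool,
    pub validate_utf8: bool,
}

/// Hypothetical stand-in for `ArrowError::InvalidArgumentError`.
#[derive(Debug, PartialEq)]
pub enum ScanErrorSketch {
    InvalidArgument(String),
}

/// Hypothetical stand-in for `StreamingSimdScanner`.
pub struct StreamingScannerSketch {
    config: ScanConfigSketch,
}

impl StreamingScannerSketch {
    pub fn new(config: ScanConfigSketch) -> Self {
        Self { config }
    }

    /// Returns the number of non-empty lines (a placeholder for a batch).
    pub fn scan(&mut self, buf: &[u8]) -> Result<usize, ScanErrorSketch> {
        // Reject the unsupported option up front instead of silently
        // dropping the `_raw` column later.
        if self.config.keep_raw {
            return Err(ScanErrorSketch::InvalidArgument(
                "streaming scanner does not support keep_raw".into(),
            ));
        }
        if self.config.validate_utf8 {
            std::str::from_utf8(buf).map_err(|e| {
                ScanErrorSketch::InvalidArgument(format!("invalid UTF-8: {e}"))
            })?;
        }
        Ok(buf.split(|b| *b == b'\n').filter(|l| !l.is_empty()).count())
    }
}
```

A regression test in this shape would construct the scanner with `keep_raw: true` and assert that `scan` returns the error variant rather than a batch that lacks `_raw`.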
🧩 Analysis chain
🏁 Script executed: Length of output: 51489 Let me check the Arrow reference docs and a few more things. 🏁 Script executed: Length of output: 10114
Review of PR
|
| Category | Count |
|---|---|
| 🔴 Critical (correctness / safety / broken contract) | 3 |
| 🟠 Design (fragility / leaky abstraction / silent bugs) | 5 |
| 🟡 Test coverage / dead code | 4 |
| 🔵 Minor | 3 |
The mechanical move is clean and the test suite is solid for the happy path. The real problems are: (1) the stated goal (arrow out of core) isn't done — enrichment.rs keeps Arrow in production deps; (2) the ScanBuilder trait cannot safely represent both builders because begin_batch has different signatures; and (3) offset_of's debug_assert-only guard before append_view_unchecked is a release-build soundness hole. Those three need to be resolved before this is ready.
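To illustrate the third point: `debug_assert!` is compiled out in release builds, so a bounds invariant that guards an unchecked call needs a check that survives optimization. The sketch below is an assumption about the shape of such a helper, not the crate's `offset_of`; the name `checked_offset_of` and its signature are hypothetical.

```rust
/// Hypothetical checked variant of an offset helper: returns the byte offset
/// of `sub` inside `base`, or `None` when `sub` does not lie within `base`.
/// Unlike a `debug_assert!` guard, this check is also active in release builds.
pub fn checked_offset_of(base: &[u8], sub: &[u8]) -> Option<usize> {
    let base_start = base.as_ptr() as usize;
    let sub_start = sub.as_ptr() as usize;
    // `checked_sub` fails when `sub` starts before `base`.
    let offset = sub_start.checked_sub(base_start)?;
    // The end of `sub` must not run past the end of `base`.
    let end = offset.checked_add(sub.len())?;
    if end <= base.len() {
        Some(offset)
    } else {
        None
    }
}
```

A caller would then skip the unchecked append (or return an error) on `None` instead of relying on an assertion that only exists in debug builds.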
🧠 Learnings used
Learnt from: CR
Repo: strawgate/memagent PR: 0
File: AGENTS.md:0-0
Timestamp: 2026-03-29T23:58:44.359Z
Learning: Consult library-specific reference documentation (`docs/references/arrow-v54.md`, `docs/references/datafusion-v45.md`, `docs/references/tokio-async-patterns.md`, `docs/references/opentelemetry-otlp.md`, `docs/references/notify-memchr-zstd.md`) before working on related code
✅ Actions performed
Full review triggered.
…p 1)
Create logfwd-arrow as the Arrow integration layer. Move Arrow-dependent code out of logfwd-core to prepare for tightening core to no_std.
What moved to logfwd-arrow:
- streaming_builder.rs (StreamingBuilder, zero-copy StringViewArray)
- storage_builder.rs (StorageBuilder, owned columns)
- SimdScanner + StreamingSimdScanner structs
- ScanBuilder impl blocks for both builders
- All 21 inline scanner tests (they test Arrow output)
What stays in logfwd-core (199 lines):
- ScanBuilder trait (now pub)
- scan_into() generic scan loop (now pub)
- scan_line(), skip_ws()
What was deleted from logfwd-core:
- storage_builder.rs (moved, not duplicated)
- streaming_builder.rs (moved, not duplicated)
- Scanner struct definitions (moved)
- ScanBuilder impl blocks (moved)
- 300+ lines of dead code
All imports across workspace updated. All 390+ tests pass. Clippy clean, fmt clean.
Step 1 of the integrated zero-copy pipeline + proven core plan. Closes #264.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
5997536 to
ffd5d51
Review feedback addressed
Fuzz Cargo.toml (criticals on scanner_sink.rs, streaming_scanner.rs, scanner_consistency.rs, scanner_transform.rs): Fixed, removed misplaced
Doc comments: Added to
All tests pass, clippy clean, fmt clean.
- add StorageBuilder usage doc example
- share duplicate tracking helper and support idx >= 64
- document offset_of safety invariant
- remove unsafe append_view_unchecked usage
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Addressed all currently unresolved review threads from this review with a focused follow-up commit.
Validation
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
crates/logfwd-core/fuzz/fuzz_targets/chunk_index.rs (1)
59-71: ⚠️ Potential issue | 🔴 Critical
Missing error handling for `scanner.scan()`: will fail to compile or panic.
Line 60 assigns `scanner.scan(data)` directly to `batch`, but `scan()` returns `Result<RecordBatch, ArrowError>`. Compare with the `scanner.rs` fuzz target (lines 31-33), which handles this correctly:
let Ok(batch) = scanner.scan(data) else { return; };
// --- Full scanner pipeline (uses ChunkIndex internally) ---
let mut scanner = SimdScanner::new(ScanConfig::default());
- let batch = scanner.scan(data);
+ let Ok(batch) = scanner.scan(data) else {
+     return;
+ };
let num_rows = batch.num_rows();
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/logfwd-core/fuzz/fuzz_targets/chunk_index.rs` around lines 59 - 71, The call to scanner.scan(data) returns Result<RecordBatch, ArrowError> but is assigned directly to batch; change this to match the pattern used in the other fuzz target (handle the Err case by early-return). Specifically, update the SimdScanner usage: call scanner.scan(data) and use a let Ok(batch) = scanner.scan(data) else { return; } pattern (or equivalent match) so that scanner.scan’s Result is handled and batch is a RecordBatch before using batch.num_rows(), batch.schema(), and batch.column(...).
crates/logfwd-core/src/scanner.rs (1)
16-26: 🛠️ Refactor suggestion | 🟠 Major
Add doc comment for newly public `ScanBuilder` trait.
`ScanBuilder` is now part of the public API. Document its purpose and contract for downstream implementors (e.g., `StorageBuilder`, `StreamingBuilder` in `logfwd-arrow`).
+/// Trait for receiving parsed JSON fields from the scan loop.
+///
+/// Implementors accumulate parsed data (strings, integers, floats, nulls)
+/// into columnar storage. The scan loop calls methods in this order:
+/// `begin_batch` → (`begin_row` → field appends → `end_row`)* per row.
pub trait ScanBuilder {
As per coding guidelines: "All public APIs must have doc comments."
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/logfwd-core/src/scanner.rs` around lines 16 - 26, Add a doc comment to the newly public ScanBuilder trait describing its purpose (an abstraction for building rows/batches from scanned log input for downstream implementors like StorageBuilder and StreamingBuilder), the lifecycle contract (call sequence: begin_batch -> repeated begin_row, resolve_field, append_* or append_null_by_idx, end_row -> end when done), the semantics of resolve_field (returns an index for the given key; implementors must be stable for the batch and can add new fields), and the meanings of each method (append_str_by_idx/append_int_by_idx/append_float_by_idx append typed values for the given index, append_null_by_idx records a null, append_raw for raw line fallback); document expectations about error handling, ownership of byte slices (they are borrowed), and that implementors must handle repeated calls within a batch consistently.
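The lifecycle contract described above can be sketched with a toy implementor. This is a simplified illustration under stated assumptions: `ScanBuilderSketch` models only a subset of the method names listed in the review, the signatures are guesses rather than the real trait's, and `ToyBuilder` and `drive` are hypothetical helpers.

```rust
/// Hypothetical, simplified stand-in for `ScanBuilder`; signatures are assumed.
pub trait ScanBuilderSketch {
    /// Called once per batch, before any row; resets per-batch state.
    fn begin_batch(&mut self);
    /// Called at the start of every row.
    fn begin_row(&mut self);
    /// Maps a key to a column index that stays stable for the batch.
    fn resolve_field(&mut self, key: &[u8]) -> usize;
    /// Appends a borrowed string value to the column at `idx`.
    fn append_str_by_idx(&mut self, idx: usize, value: &[u8]);
    /// Called at the end of every row.
    fn end_row(&mut self);
}

/// Toy implementor that stores rows as (column index, value) pairs.
#[derive(Default)]
pub struct ToyBuilder {
    keys: Vec<Vec<u8>>,
    rows: Vec<Vec<(usize, String)>>,
    current: Vec<(usize, String)>,
}

impl ToyBuilder {
    pub fn rows(&self) -> &[Vec<(usize, String)>] {
        &self.rows
    }
}

impl ScanBuilderSketch for ToyBuilder {
    fn begin_batch(&mut self) {
        self.keys.clear();
        self.rows.clear();
        self.current.clear();
    }
    fn begin_row(&mut self) {
        self.current.clear();
    }
    fn resolve_field(&mut self, key: &[u8]) -> usize {
        if let Some(i) = self.keys.iter().position(|k| k.as_slice() == key) {
            return i;
        }
        self.keys.push(key.to_vec());
        self.keys.len() - 1
    }
    fn append_str_by_idx(&mut self, idx: usize, value: &[u8]) {
        // Assumed first-write-wins: a repeated key within a row is ignored.
        if self.current.iter().all(|(i, _)| *i != idx) {
            self.current
                .push((idx, String::from_utf8_lossy(value).into_owned()));
        }
    }
    fn end_row(&mut self) {
        self.rows.push(std::mem::take(&mut self.current));
    }
}

/// Drives a builder in the documented order:
/// begin_batch, then (begin_row, field appends, end_row) per row.
pub fn drive<B: ScanBuilderSketch>(builder: &mut B, rows: &[&[(&str, &str)]]) {
    builder.begin_batch();
    for row in rows {
        builder.begin_row();
        for (key, value) in row.iter() {
            let idx = builder.resolve_field(key.as_bytes());
            builder.append_str_by_idx(idx, value.as_bytes());
        }
        builder.end_row();
    }
}
```

Writing the doc comment against a driver like `drive` makes the ordering guarantee concrete: an implementor can assume `resolve_field` and the appends only ever arrive between `begin_row` and `end_row`.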
♻️ Duplicate comments (6)
crates/logfwd-core/fuzz/Cargo.toml (1)
3-3: ⚠️ Potential issue | 🟡 Minor
Remove misplaced dependency from `[package]` section.
Line 3 declares `logfwd-arrow` inside `[package]`; Cargo ignores this. The correct declaration exists at line 14 under `[dependencies]`.
[package]
name = "logfwd-core-fuzz"
-logfwd-arrow = { path = "../../logfwd-arrow" }
version = "0.0.0"
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/logfwd-core/fuzz/Cargo.toml` at line 3, The Cargo.toml has a misplaced dependency declaration: remove the line declaring logfwd-arrow from the [package] section (the stray "logfwd-arrow = { path = \"../../logfwd-arrow\" }") so the dependency only exists under [dependencies] where the correct declaration is already present; ensure no other dependency-like entries remain in the [package] table and save the file.
crates/logfwd-core/fuzz/fuzz_targets/scanner_consistency.rs (1)
15-15: ⚠️ Potential issue | 🔴 Critical
Normalize both `scan()` results before batch comparison.
Lines [23] and [26] hold `Result<RecordBatch, ArrowError>`, but Lines [30+] treat them as concrete `RecordBatch` values.
Proposed fix
let mut storage_scanner = SimdScanner::new(ScanConfig::default());
let storage_batch = storage_scanner.scan(data);
let mut streaming_scanner = StreamingSimdScanner::new(ScanConfig::default());
let streaming_batch = streaming_scanner.scan(bytes::Bytes::copy_from_slice(data));
+
+ let (storage_batch, streaming_batch) = match (storage_batch, streaming_batch) {
+     (Ok(storage_batch), Ok(streaming_batch)) => (storage_batch, streaming_batch),
+     (Err(_), Err(_)) => return,
+     (Ok(_), Err(err)) | (Err(err), Ok(_)) => {
+         panic!("scanner result mismatch: {err}");
+     }
+ };
#!/bin/bash
set -euo pipefail
# Verify scan() signatures in arrow scanner.
rg -n "impl SimdScanner|impl StreamingSimdScanner|pub fn scan\\(.*\\) -> Result<RecordBatch, ArrowError>" \
  crates/logfwd-arrow/src/scanner.rs
# Verify this fuzz target assumes bare RecordBatch after scan().
rg -n "storage_scanner\\.scan\\(|streaming_scanner\\.scan\\(|num_rows\\(|schema\\(|column_by_name\\(" \
  crates/logfwd-core/fuzz/fuzz_targets/scanner_consistency.rs
Also applies to: 23-33, 35-50
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/logfwd-core/fuzz/fuzz_targets/scanner_consistency.rs` at line 15, The test uses SimdScanner::scan and StreamingSimdScanner::scan which return Result<RecordBatch, ArrowError>, but later treats the return values as concrete RecordBatchs (calling num_rows, schema, column_by_name); update the fuzz target to normalize both scan() results into RecordBatch before comparison by matching or using expect/unwrap with a clear message (e.g., call storage_scanner.scan(...).expect("...") and streaming_scanner.scan(...).expect("...") or explicitly match and return on Err), then perform the existing equality/field checks on the extracted RecordBatchs (referencing SimdScanner::scan, StreamingSimdScanner::scan, and subsequent methods num_rows, schema, column_by_name).
crates/logfwd-core/fuzz/fuzz_targets/scanner_transform.rs (1)
21-21: ⚠️ Potential issue | 🔴 Critical
Unwrap `scan()` result before passing to transform.
Line [31] stores `Result<RecordBatch, ArrowError>`, but Line [36] uses it as a `RecordBatch`.
Proposed fix
- let batch = scanner.scan(data);
+ let Ok(batch) = scanner.scan(data) else {
+     return;
+ };
#!/bin/bash
set -euo pipefail
# Verify SimdScanner::scan return type.
rg -n "pub fn scan\\(.*\\) -> Result<RecordBatch, ArrowError>" crates/logfwd-arrow/src/scanner.rs
# Verify this fuzz target call chain.
rg -n "let batch = scanner\\.scan\\(data\\)|transform\\.execute\\(batch\\)" \
  crates/logfwd-core/fuzz/fuzz_targets/scanner_transform.rs
Also applies to: 31-31, 36-36
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/logfwd-core/fuzz/fuzz_targets/scanner_transform.rs` at line 21, The variable `batch` currently holds a Result from `SimdScanner::scan` but is being passed directly to `transform.execute` which expects a `RecordBatch`; change the call site to unwrap or propagate the Result (e.g., replace `let batch = scanner.scan(data); transform.execute(batch)` with unwrapping/handling such as `let batch = scanner.scan(data).unwrap(); transform.execute(batch)` or propagate the error so `transform.execute` receives a `RecordBatch`; update the `batch` usage wherever referenced to use the unwrapped `RecordBatch` from `SimdScanner::scan`.
crates/logfwd-core/fuzz/fuzz_targets/streaming_scanner.rs (1)
14-14: ⚠️ Potential issue | 🔴 Critical
Handle `StreamingSimdScanner::scan()` results before validation.
Lines [39] and [59] produce `Result<RecordBatch, ArrowError>`, but Lines [40] and [60] pass them as `&RecordBatch`.
Proposed fix
- let batch = scanner.scan(bytes::Bytes::copy_from_slice(data));
- validate_batch(&batch, "streaming_extract_all");
+ if let Ok(batch) = scanner.scan(bytes::Bytes::copy_from_slice(data)) {
+     validate_batch(&batch, "streaming_extract_all");
+ }
- let batch2 = scanner2.scan(bytes::Bytes::copy_from_slice(data));
- validate_batch(&batch2, "streaming_pushdown");
+ if let Ok(batch2) = scanner2.scan(bytes::Bytes::copy_from_slice(data)) {
+     validate_batch(&batch2, "streaming_pushdown");
+ }
#!/bin/bash
set -euo pipefail
# Verify StreamingSimdScanner::scan return type.
rg -n "pub fn scan\\(.*\\) -> Result<RecordBatch, ArrowError>" crates/logfwd-arrow/src/scanner.rs
# Verify this target uses Result values as RecordBatch.
rg -n "scanner\\.scan\\(|validate_batch\\(&batch|validate_batch\\(&batch2" \
  crates/logfwd-core/fuzz/fuzz_targets/streaming_scanner.rs
Also applies to: 39-40, 59-60
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/logfwd-core/fuzz/fuzz_targets/streaming_scanner.rs` at line 14, The StreamingSimdScanner::scan() calls return Result<RecordBatch, ArrowError> but the code currently passes their results directly to validate_batch as &RecordBatch; change the code to handle the Result first (e.g., match or if let Ok(batch) = scanner.scan(...) / if let Ok(batch2) = scanner.scan(...) ) and only call validate_batch(&batch) or validate_batch(&batch2) on the Ok variants (handle or early-return/log on Err variants), so that validate_batch receives a concrete &RecordBatch; locate uses by the symbols StreamingSimdScanner::scan, validate_batch, batch and batch2 and update those call sites accordingly.
crates/logfwd-core/fuzz/fuzz_targets/scanner_sink.rs (1)
15-15: ⚠️ Potential issue | 🔴 Critical
Handle `SimdScanner::scan()` result before using `batch`.
Line [25] binds `Result<RecordBatch, ArrowError>`, but Lines [35], [45], [59], and [62] use `batch` as a `RecordBatch`.
Proposed fix
- let batch = scanner.scan(data);
+ let Ok(batch) = scanner.scan(data) else {
+     return;
+ };
#!/bin/bash
set -euo pipefail
# Verify SimdScanner::scan return type.
rg -n "pub fn scan\\(.*\\) -> Result<RecordBatch, ArrowError>" crates/logfwd-arrow/src/scanner.rs
# Verify this fuzz target treats scan() output as a bare RecordBatch.
rg -n "let batch = scanner\\.scan\\(data\\)|serialize_batch\\(&batch\\)|encode_batch\\(&batch|batch\\.num_rows\\(" \
  crates/logfwd-core/fuzz/fuzz_targets/scanner_sink.rs
Also applies to: 25-25, 35-35, 45-45, 59-59, 62-62
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/logfwd-core/fuzz/fuzz_targets/scanner_sink.rs` at line 15, The code binds the result of SimdScanner::scan() to batch but never handles the Result<RecordBatch, ArrowError>, then treats batch as a RecordBatch in subsequent calls (e.g., serialize_batch(&batch), encode_batch(&batch), batch.num_rows()). Update the fuzz target to properly handle the Result returned by SimdScanner::scan: call scanner.scan(data) and either match on Ok(batch) / Err(err) or use the ? operator (or .unwrap_or_else/expect with a clear message) so that you only pass a real RecordBatch into serialize_batch, encode_batch and any batch.num_rows() calls; reference SimdScanner::scan and the local variable batch to locate the changes.
crates/logfwd-arrow/src/scanner.rs (1)
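60-63: ⚠️ Potential issue | 🟠 Major
`StreamingBuilder` is not a valid public `ScanBuilder`.
`logfwd_core::scan_into()` only knows about `begin_batch()` and `append_raw()`, but this impl turns both callbacks into stubs and relies on `StreamingSimdScanner` to inject `Bytes` out of band. With `scan_into()` now public and `StreamingBuilder` re-exported, `scan_into(..., &mut StreamingBuilder)` compiles but cannot satisfy the advertised contract, and `keep_raw=true` still silently drops `_raw`. Either keep `StreamingBuilder` internal for now, or make this impl honor the full callback contract end-to-end and add a streaming regression test for `keep_raw=true`.
Also applies to: 92-94
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/logfwd-arrow/src/scanner.rs` around lines 60 - 63, The StreamingBuilder impl for ScanBuilder is incomplete: it stubs begin_batch() and append_raw() and relies on StreamingSimdScanner to supply Bytes out-of-band, which breaks the public contract used by logfwd_core::scan_into() (and silently drops _raw when keep_raw=true). Fix by either making StreamingBuilder non-public (hide/re-export privately) or implementing the ScanBuilder callbacks end-to-end: have StreamingBuilder::begin_batch(...) initialize any per-batch state and have StreamingBuilder::append_raw(...) accept and store the provided Bytes/_raw according to keep_raw logic, removing the out-of-band dependency on StreamingSimdScanner; also add a streaming regression test exercising scan_into(..., &mut StreamingBuilder) with keep_raw=true to ensure _raw is preserved. Reference symbols: StreamingBuilder, begin_batch, append_raw, StreamingSimdScanner, scan_into, keep_raw.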
Verify each finding against the current code and only fix it if needed. In `@crates/logfwd-arrow/src/scanner.rs` around lines 60 - 63, The StreamingBuilder impl for ScanBuilder is incomplete: it stubs begin_batch() and append_raw() and relies on StreamingSimdScanner to supply Bytes out-of-band, which breaks the public contract used by logfwd_core::scan_into() (and silently drops _raw when keep_raw=true). Fix by either making StreamingBuilder non-public (hide/re-export privately) or implementing the ScanBuilder callbacks end-to-end: have StreamingBuilder::begin_batch(...) initialize any per-batch state and have StreamingBuilder::append_raw(...) accept and store the provided Bytes/_raw according to keep_raw logic, removing the out-of-band dependency on StreamingSimdScanner; also add a streaming regression test exercising scan_into(..., &mut StreamingBuilder) with keep_raw=true to ensure _raw is preserved. Reference symbols: StreamingBuilder, begin_batch, append_raw, StreamingSimdScanner, scan_into, keep_raw.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Outside diff comments:
In `@crates/logfwd-core/fuzz/fuzz_targets/chunk_index.rs`:
- Around line 59-71: The call to scanner.scan(data) returns Result<RecordBatch,
ArrowError> but is assigned directly to batch; change this to match the pattern
used in the other fuzz target (handle the Err case by early-return).
Specifically, update the SimdScanner usage: call scanner.scan(data) and use a
let Ok(batch) = scanner.scan(data) else { return; } pattern (or equivalent
match) so that scanner.scan’s Result is handled and batch is a RecordBatch
before using batch.num_rows(), batch.schema(), and batch.column(...).
In `@crates/logfwd-core/src/scanner.rs`:
- Around line 16-26: Add a doc comment to the newly public ScanBuilder trait
describing its purpose (an abstraction for building rows/batches from scanned
log input for downstream implementors like StorageBuilder and StreamingBuilder),
the lifecycle contract (call sequence: begin_batch -> repeated begin_row,
resolve_field, append_* or append_null_by_idx, end_row -> end when done), the
semantics of resolve_field (returns an index for the given key; implementors
must be stable for the batch and can add new fields), and the meanings of each
method (append_str_by_idx/append_int_by_idx/append_float_by_idx append typed
values for the given index, append_null_by_idx records a null, append_raw for
raw line fallback); document expectations about error handling, ownership of
byte slices (they are borrowed), and that implementors must handle repeated
calls within a batch consistently.
---
Duplicate comments:
In `@crates/logfwd-arrow/src/scanner.rs`:
- Around line 60-63: The StreamingBuilder impl for ScanBuilder is incomplete: it
stubs begin_batch() and append_raw() and relies on StreamingSimdScanner to
supply Bytes out-of-band, which breaks the public contract used by
logfwd_core::scan_into() (and silently drops _raw when keep_raw=true). Fix by
either making StreamingBuilder non-public (hide/re-export privately) or
implementing the ScanBuilder callbacks end-to-end: have
StreamingBuilder::begin_batch(...) initialize any per-batch state and have
StreamingBuilder::append_raw(...) accept and store the provided Bytes/_raw
according to keep_raw logic, removing the out-of-band dependency on
StreamingSimdScanner; also add a streaming regression test exercising
scan_into(..., &mut StreamingBuilder) with keep_raw=true to ensure _raw is
preserved. Reference symbols: StreamingBuilder, begin_batch, append_raw,
StreamingSimdScanner, scan_into, keep_raw.
In `@crates/logfwd-core/fuzz/Cargo.toml`:
- Line 3: The Cargo.toml has a misplaced dependency declaration: remove the line
declaring logfwd-arrow from the [package] section (the stray "logfwd-arrow = {
path = \"../../logfwd-arrow\" }") so the dependency only exists under
[dependencies] where the correct declaration is already present; ensure no other
dependency-like entries remain in the [package] table and save the file.
In `@crates/logfwd-core/fuzz/fuzz_targets/scanner_consistency.rs`:
- Line 15: The test uses SimdScanner::scan and StreamingSimdScanner::scan which
return Result<RecordBatch, ArrowError>, but later treats the return values as
concrete RecordBatchs (calling num_rows, schema, column_by_name); update the
fuzz target to normalize both scan() results into RecordBatch before comparison
by matching or using expect/unwrap with a clear message (e.g., call
storage_scanner.scan(...).expect("...") and
streaming_scanner.scan(...).expect("...") or explicitly match and return on
Err), then perform the existing equality/field checks on the extracted
RecordBatchs (referencing SimdScanner::scan, StreamingSimdScanner::scan, and
subsequent methods num_rows, schema, column_by_name).
In `@crates/logfwd-core/fuzz/fuzz_targets/scanner_sink.rs`:
- Line 15: The code binds the result of SimdScanner::scan() to batch but never
handles the Result<RecordBatch, ArrowError>, then treats batch as a RecordBatch
in subsequent calls (e.g., serialize_batch(&batch), encode_batch(&batch),
batch.num_rows()). Update the fuzz target to properly handle the Result returned
by SimdScanner::scan: call scanner.scan(data) and either match on Ok(batch) /
Err(err) or use the ? operator (or .unwrap_or_else/expect with a clear message)
so that you only pass a real RecordBatch into serialize_batch, encode_batch and
any batch.num_rows() calls; reference SimdScanner::scan and the local variable
batch to locate the changes.
In `@crates/logfwd-core/fuzz/fuzz_targets/scanner_transform.rs`:
- Line 21: The variable `batch` currently holds a Result from
`SimdScanner::scan` but is being passed directly to `transform.execute` which
expects a `RecordBatch`; change the call site to unwrap or propagate the Result
(e.g., replace `let batch = scanner.scan(data); transform.execute(batch)` with
unwrapping/handling such as `let batch = scanner.scan(data).unwrap();
transform.execute(batch)` or propagate the error so `transform.execute` receives
a `RecordBatch`; update the `batch` usage wherever referenced to use the
unwrapped `RecordBatch` from `SimdScanner::scan`.
In `@crates/logfwd-core/fuzz/fuzz_targets/streaming_scanner.rs`:
- Line 14: The StreamingSimdScanner::scan() calls return Result<RecordBatch,
ArrowError> but the code currently passes their results directly to
validate_batch as &RecordBatch; change the code to handle the Result first
(e.g., match or if let Ok(batch) = scanner.scan(...) / if let Ok(batch2) =
scanner.scan(...) ) and only call validate_batch(&batch) or
validate_batch(&batch2) on the Ok variants (handle or early-return/log on Err
variants), so that validate_batch receives a concrete &RecordBatch; locate uses
by the symbols StreamingSimdScanner::scan, validate_batch, batch and batch2 and
update those call sites accordingly.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 99bfb9b3-a73f-4ef7-8c28-70f48eebb029
1. Fix all 5 fuzz targets: properly handle Result from scan() calls.
Previously assigned Result directly to batch variable. Now uses
`let Ok(batch) = scanner.scan(data) else { return; };` pattern.
2. Add comprehensive doc comment to ScanBuilder trait: lifecycle
contract, method semantics, first-write-wins behavior, ownership.
Pre-existing issues tracked separately:
- StreamingBuilder begin_batch/append_raw: #308
- _raw column rethink: #308
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
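The `let ... else` pattern from item 1 can be exercised outside the fuzz harness. In this minimal sketch, `scan_sketch` and `fuzz_body` are hypothetical stand-ins: the former replaces `scanner.scan()` and returns a line count instead of a `RecordBatch`, and the latter returns `Option` only so the early-return path is observable.

```rust
/// Hypothetical stand-in for `scanner.scan()`: fails on invalid UTF-8 and
/// otherwise returns the number of non-empty lines.
pub fn scan_sketch(data: &[u8]) -> Result<usize, std::str::Utf8Error> {
    let text = std::str::from_utf8(data)?;
    Ok(text.lines().filter(|l| !l.is_empty()).count())
}

/// Shape of a fuzz-target body: inputs the scanner rejects are skipped via
/// `let ... else`, so only successful scans reach the invariant checks.
pub fn fuzz_body(data: &[u8]) -> Option<usize> {
    let Ok(rows) = scan_sketch(data) else {
        return None;
    };
    // Invariant check on the success path: there cannot be more non-empty
    // lines than input bytes.
    assert!(rows <= data.len());
    Some(rows)
}
```

In a real target the `else` branch would be a bare `return;`, as in the commit message above.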
Summary
Step 1 of the integrated zero-copy pipeline + proven core plan (#262, #303). Creates the `logfwd-arrow` crate and moves Arrow-dependent code out of `logfwd-core`.
What moved
What stays in logfwd-core
- `ScanBuilder` trait (now `pub`, was `pub(crate)`)
- `scan_into()` generic scan loop (now `pub`)
- `scan_line()`, `skip_ws()`
Why
Preparing logfwd-core for `#![no_std]` + `#![forbid(unsafe_code)]`. The builders depend on Arrow types which require std. Moving them out lets core become the proven pure-logic crate.
Test plan
- `cargo check`: full workspace builds
- `cargo test`: 372+ tests pass, zero failures
- `cargo clippy -- -D warnings`: clean
- `cargo fmt --check`: clean
Closes #264.
🤖 Generated with Claude Code