Skip to content

feat: StreamingBuilder + StorageBuilder, zero-copy hot path#62

Merged
strawgate merged 1 commit into
masterfrom
arrow-ipc-output
Mar 29, 2026
Merged

feat: StreamingBuilder + StorageBuilder, zero-copy hot path#62
strawgate merged 1 commit into
masterfrom
arrow-ipc-output

Conversation

@strawgate
Copy link
Copy Markdown
Owner

@strawgate strawgate commented Mar 29, 2026

Summary

Two purpose-built Arrow builders sharing a generic SIMD scan loop. Replaces the old incremental null-padding approach with a deferred (row, value) collection pattern that is correct by construction.

StorageBuilder — collects (row, value) records during scanning. At finish_batch, bulk-builds each column independently: fill values at recorded positions, leave gaps as null. No cross-builder coordination, no column alignment bugs. For persistence: scan → build → compress → disk.

StreamingBuilder — zero-copy hot path. String values are 16-byte StringViewArray views pointing into the bytes::Bytes input buffer. No string copies. 20% faster scan. For real-time: scan → query → output → discard.

ScanBuilder trait — generic scan loop (scan_into) works with either builder via monomorphization. One scan implementation, two output modes.

What changed

  • Deleted columnar_builder.rs (superseded)
  • Deleted indexed_builder.rs (replaced by StorageBuilder)
  • storage_builder.rs: rewritten with deferred (row, value) pattern — eliminates the column length mismatch bug that occurred with varying field sets across NDJSON lines
  • streaming_builder.rs: zero-copy builder with bytes::Bytes ownership (no raw pointers)
  • simd_scanner.rs: ScanBuilder trait, generic scan_into/scan_line, SimdScanner + StreamingSimdScanner
  • chunk_classify.rs: fixed scalar fallback escape handling (escaped_next state across block boundaries)
  • scanner_conformance.rs: skip duplicate-key lines in oracle tests (first-writer-wins vs last-writer-wins is implementation-defined per RFC 8259)

Test plan

  • cargo test -p logfwd-core --lib — 133 unit tests
  • cargo test -p logfwd-core --test scanner_conformance — 64 oracle-verified tests
  • cargo test -p logfwd-core --test scanner_correctness — 16 correctness tests
  • PROPTEST_CASES=2000 — zero failures across all suites
  • cargo check — full workspace builds

🤖 Generated with Claude Code

@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented Mar 29, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Use the checkboxes below for quick actions:

  • ▶️ Resume reviews
  • 🔍 Trigger review

Walkthrough

This PR restructures logfwd-core: adds scan_config, storage_builder, and streaming_builder modules; introduces SimdScanner and StreamingSimdScanner backed by a generic ScanBuilder-driven scan loop; removes batch_builder, columnar_builder, indexed_builder, and simd_scanner; tightens several module visibilities in lib.rs; updates chunk_classify string-escape handling; updates tests, benches, and fuzz targets to the new APIs; and adds the bytes = "1" dependency.

Possibly related PRs


Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 6

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
crates/logfwd-core/src/storage_builder.rs (1)

211-231: ⚠️ Potential issue | 🟡 Minor

Duplicate key detection disabled for fields ≥ 64.

When idx >= 64, bit is 0, so the duplicate check written_bits & bit != 0 always passes. First-writer-wins semantics silently don't apply to high-index fields. Consider documenting this limitation or extending tracking (e.g., a second bitset or using written_this_row for duplicate detection too).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/logfwd-core/src/storage_builder.rs` around lines 211 - 231, The
duplicate-key check in append_str_by_idx relies solely on written_bits so when
idx >= 64 bit==0 and duplicates slip through; modify append_str_by_idx to check
written_bits for idx<64 and otherwise consult fields[idx].written_this_row (or
set and check a per-field flag) before returning as a duplicate, then set
fields[idx].written_this_row = true when accepting the first write; update use
of written_bits and fields[idx].written_this_row consistently so
first-writer-wins works for all field indices (referencing append_str_by_idx,
written_bits, fields, and written_this_row).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@crates/logfwd-core/examples/bench_builders.rs`:
- Around line 256-348: This code uses the non-public API types/methods
ColumnarBatchBuilder and finish_batch_compressed (and builder.finish_batch) for
per-column analysis and compression comparison; update the example to use the
new public builders or re-export these helpers: replace direct
ColumnarBatchBuilder usage with the new public builder type(s) (or add a pub
re-export) and switch calls to the new finish/serialize/compress methods
provided by the public API, ensuring the example references the new builder name
and the new compress/finish method(s) so it no longer depends on private
symbols.
- Around line 5-7: The example references
logfwd_core::columnar_builder::ColumnarBatchBuilder but lib.rs does not pub mod
columnar_builder, so the symbol is inaccessible; to fix either (A) re-export the
module by adding pub mod columnar_builder to the crate root (lib.rs) so
ColumnarBatchBuilder becomes accessible, or (B) change the example to use a
publicly exported builder such as StorageBuilder or StreamingBuilder (replace
ColumnarBatchBuilder usage in the bench_builders.rs example with
StorageBuilder/StreamingBuilder and adjust construction calls accordingly).

In `@crates/logfwd-core/examples/bench_e2e.rs`:
- Around line 125-128: The match arm handling the 'n' token advances the parser
pointer unconditionally with `$p += 4`, which can go past the end when input is
malformed; update that arm to clamp the advance against `$eol` (e.g. set `$p =
($p + 4).min($eol)` or equivalent) before calling
`$builder.append_null_by_idx($idx)`, ensuring you still append the null and
avoid out-of-bounds reads.
- Around line 214-233: The example uses the wrong StreamingBuilder API: replace
StreamingBuilder::new(rows) with StreamingBuilder::new(), remove
set_buffer(&buf) calls and instead pass the buffer to begin_batch as
begin_batch(buf_bytes.clone() or Bytes::from(buf)), and change
finish_batch(buf_bytes.clone()) to finish_batch() (use its returned batch);
update the warmup, timing loop (black_box the finish_batch() result), and final
batch construction to call begin_batch(...) with the bytes and then
finish_batch() to match the signatures in streaming_builder.rs.

In `@crates/logfwd-core/src/storage_builder.rs`:
- Around line 36-47: The parameter `capacity` on FieldColumns::new is unused;
either remove it from the signature or mark it as intentionally unused (e.g.,
rename to `_capacity`) so the compiler warning disappears. Update the
constructor signature `fn new(name: &[u8], capacity: usize)` and any callers to
match the chosen change, and keep the body building the FieldColumns struct
(ensure references to `name` and lazy builders like `str_builder`,
`int_builder`, `float_builder` remain unchanged).

In `@crates/logfwd-core/src/streaming_builder.rs`:
- Around line 93-103: The CI failure is due to the clippy lint
clippy::new-without-default; add an impl Default for StreamingBuilder that
returns the same initial state as StreamingBuilder::new (fields:
Vec::with_capacity(32), field_index: HashMap::with_capacity(32), row_count: 0,
written_bits: 0, field_mask: 0, buf: bytes::Bytes::new()) and optionally
refactor StreamingBuilder::new to call Default::default(); implement the Default
trait for the StreamingBuilder type to satisfy the lint.

---

Outside diff comments:
In `@crates/logfwd-core/src/storage_builder.rs`:
- Around line 211-231: The duplicate-key check in append_str_by_idx relies
solely on written_bits so when idx >= 64 bit==0 and duplicates slip through;
modify append_str_by_idx to check written_bits for idx<64 and otherwise consult
fields[idx].written_this_row (or set and check a per-field flag) before
returning as a duplicate, then set fields[idx].written_this_row = true when
accepting the first write; update use of written_bits and
fields[idx].written_this_row consistently so first-writer-wins works for all
field indices (referencing append_str_by_idx, written_bits, fields, and
written_this_row).
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: 3806fae1-16e4-4496-9a6f-4fcf37500205

📥 Commits

Reviewing files that changed from the base of the PR and between 1547d60 and 182da51.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (6)
  • crates/logfwd-core/Cargo.toml
  • crates/logfwd-core/examples/bench_builders.rs
  • crates/logfwd-core/examples/bench_e2e.rs
  • crates/logfwd-core/src/lib.rs
  • crates/logfwd-core/src/storage_builder.rs
  • crates/logfwd-core/src/streaming_builder.rs

Comment thread crates/logfwd-core/examples/bench_builders.rs Outdated
Comment thread crates/logfwd-core/examples/bench_builders.rs Outdated
Comment on lines +125 to +128
b'n' => {
$p += 4;
$builder.append_null_by_idx($idx);
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Potential out-of-bounds: $p += 4 assumes well-formed null.

If 'n' appears near EOL in malformed input (e.g., "key":n\n), $p += 4 could exceed $eol. Consider clamping: $p = ($p + 4).min($eol);

Proposed fix
             b'n' => {
-                $p += 4;
+                $p = ($p + 4).min($eol);
                 $builder.append_null_by_idx($idx);
             }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
b'n' => {
$p += 4;
$builder.append_null_by_idx($idx);
}
b'n' => {
$p = ($p + 4).min($eol);
$builder.append_null_by_idx($idx);
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/logfwd-core/examples/bench_e2e.rs` around lines 125 - 128, The match
arm handling the 'n' token advances the parser pointer unconditionally with `$p
+= 4`, which can go past the end when input is malformed; update that arm to
clamp the advance against `$eol` (e.g. set `$p = ($p + 4).min($eol)` or
equivalent) before calling `$builder.append_null_by_idx($idx)`, ensuring you
still append the null and avoid out-of-bounds reads.

Comment on lines +214 to +233
let mut builder = StreamingBuilder::new(rows);
// Warm
builder.begin_batch();
builder.set_buffer(&buf);
scan_buffer!(&buf, &index, builder, dispatch_value);
let _ = builder.finish_batch(buf_bytes.clone());

let t = Instant::now();
for _ in 0..iters {
builder.begin_batch();
builder.set_buffer(&buf);
scan_buffer!(&buf, &index, builder, dispatch_value);
std::hint::black_box(builder.finish_batch(buf_bytes.clone()));
}
let scan_ms = t.elapsed().as_secs_f64() * 1000.0 / iters as f64;

builder.begin_batch();
builder.set_buffer(&buf);
scan_buffer!(&buf, &index, builder, dispatch_value);
let batch = builder.finish_batch(buf_bytes.clone());
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

API mismatch: StreamingBuilder usage is inverted.

Pipeline failures confirm the example's API calls don't match streaming_builder.rs:

Example calls Actual API
new(rows) new()
begin_batch() begin_batch(buf: bytes::Bytes)
set_buffer(&buf) (doesn't exist)
finish_batch(buf_bytes) finish_batch(&self)
Proposed fix
-            let mut builder = StreamingBuilder::new(rows);
+            let mut builder = StreamingBuilder::new();
             // Warm
-            builder.begin_batch();
-            builder.set_buffer(&buf);
+            builder.begin_batch(buf_bytes.clone());
             scan_buffer!(&buf, &index, builder, dispatch_value);
-            let _ = builder.finish_batch(buf_bytes.clone());
+            let _ = builder.finish_batch();

             let t = Instant::now();
             for _ in 0..iters {
-                builder.begin_batch();
-                builder.set_buffer(&buf);
+                builder.begin_batch(buf_bytes.clone());
                 scan_buffer!(&buf, &index, builder, dispatch_value);
-                std::hint::black_box(builder.finish_batch(buf_bytes.clone()));
+                std::hint::black_box(builder.finish_batch());
             }
             let scan_ms = t.elapsed().as_secs_f64() * 1000.0 / iters as f64;

-            builder.begin_batch();
-            builder.set_buffer(&buf);
+            builder.begin_batch(buf_bytes.clone());
             scan_buffer!(&buf, &index, builder, dispatch_value);
-            let batch = builder.finish_batch(buf_bytes.clone());
+            let batch = builder.finish_batch();
🧰 Tools
🪛 GitHub Actions: CI

[error] 214-214: error[E0061]: StreamingBuilder::new takes 0 arguments but 1 was supplied (StreamingBuilder::new(rows)); unexpected argument of type usize.


[error] 216-216: error[E0061]: builder.begin_batch() takes 1 argument but 0 were supplied; missing argument bytes::Bytes.


[error] 217-217: error[E0599]: no method named set_buffer found for struct StreamingBuilder.


[error] 219-219: error[E0061]: builder.finish_batch takes 0 arguments but 1 was supplied (builder.finish_batch(buf_bytes.clone())); unexpected argument of type bytes::Bytes.


[error] 223-223: error[E0061]: builder.begin_batch() takes 1 argument but 0 were supplied; missing argument bytes::Bytes.


[error] 224-224: error[E0599]: no method named set_buffer found for struct StreamingBuilder.


[error] 226-226: error[E0061]: builder.finish_batch takes 0 arguments but 1 was supplied (builder.finish_batch(buf_bytes.clone())); unexpected argument of type bytes::Bytes.


[error] 230-230: error[E0061]: builder.begin_batch() takes 1 argument but 0 were supplied; missing argument bytes::Bytes.


[error] 231-231: error[E0599]: no method named set_buffer found for struct StreamingBuilder.


[error] 233-233: error[E0061]: builder.finish_batch takes 0 arguments but 1 was supplied (builder.finish_batch(buf_bytes.clone())); unexpected argument of type bytes::Bytes.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/logfwd-core/examples/bench_e2e.rs` around lines 214 - 233, The example
uses the wrong StreamingBuilder API: replace StreamingBuilder::new(rows) with
StreamingBuilder::new(), remove set_buffer(&buf) calls and instead pass the
buffer to begin_batch as begin_batch(buf_bytes.clone() or Bytes::from(buf)), and
change finish_batch(buf_bytes.clone()) to finish_batch() (use its returned
batch); update the warmup, timing loop (black_box the finish_batch() result),
and final batch construction to call begin_batch(...) with the bytes and then
finish_batch() to match the signatures in streaming_builder.rs.

Comment thread crates/logfwd-core/src/storage_builder.rs Outdated
Comment thread crates/logfwd-core/src/streaming_builder.rs
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 9

♻️ Duplicate comments (2)
crates/logfwd-core/examples/bench_builders.rs (1)

6-7: ⚠️ Potential issue | 🔴 Critical

This example still depends on the removed ColumnarBatchBuilder path.

crates/logfwd-core/src/lib.rs no longer exports columnar_builder, so the benchmark and the later size-analysis block still target an API that is not publicly available. Migrate this example to the new public builders or restore the export.

Also applies to: 181-210, 260-327

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/logfwd-core/examples/bench_builders.rs` around lines 6 - 7, The
example imports the removed symbol ColumnarBatchBuilder from logfwd_core; update
the example to use the new public builder API or restore the re-export in
lib.rs. Either replace uses of ColumnarBatchBuilder (and any
ColumnarBatchBuilder::... calls) with the new public builder types/methods
exported by crates/logfwd-core (adjust the use lines and subsequent code to the
new type names), or add a pub mod/export for columnar_builder in
crates/logfwd-core/src/lib.rs so ColumnarBatchBuilder is available; also update
any references to StorageBuilder if its path changed to match the current public
API. Ensure all occurrences in this file (and the noted ranges) are updated to
the chosen fix.
crates/logfwd-core/examples/bench_e2e.rs (1)

214-233: ⚠️ Potential issue | 🔴 Critical

This block still uses the pre-refactor StreamingBuilder API.

StreamingBuilder::new(rows), parameterless begin_batch(), set_buffer(...), and finish_batch(buf_bytes) no longer exist, so this example does not compile.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/logfwd-core/examples/bench_e2e.rs` around lines 214 - 233, The example
uses the old StreamingBuilder API (StreamingBuilder::new(rows), begin_batch(),
set_buffer(...), finish_batch(buf_bytes)) which no longer exists; update the
example to the refactored API by creating the builder with the current
constructor and using the new batch methods — replace
StreamingBuilder::new(rows) with the refactored constructor (e.g.,
StreamingBuilder::new() or the new with_* constructor used across the crate),
remove separate set_buffer/parameterless begin_batch calls and instead call the
new begin/start batch method that accepts the row count and buffer (e.g.,
begin_batch(rows, &buf) or start_batch(&buf, rows) as defined in the refactor),
and call the new finish/complete method with no buf_bytes argument (or with the
new required args) to obtain the batch; adjust the warmup, timed loop, and final
batch sections to use those updated method names and signatures (refer to
StreamingBuilder, begin_batch/start_batch, and finish/complete in the refactor
to match exact names).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@crates/logfwd-core/examples/bench_e2e.rs`:
- Around line 150-153: The benchmark currently generates a separate Vec<u8>
(buf) and passes its slices to ChunkIndex::new and scanning helpers, but
StreamingBuilder/StreamingSimdScanner expects views into a single bytes::Bytes
backing buffer; change the benchmark to convert the generated Vec<u8> into a
bytes::Bytes (e.g., let bytes = Bytes::from(buf)) and then construct
ChunkIndex::new(&bytes) and call scan_buffer! / other scan helpers using that
same bytes variable so zero-copy offsets are correct; apply the same change to
the second occurrence mentioned (lines ~213-219) so both benchmarks use the
identical bytes::Bytes backing buffer for indexing and scanning, and ensure any
begin_batch calls use that bytes value for StreamingBuilder.

In `@crates/logfwd-core/src/batch_builder.rs`:
- Line 11: Remove the unused import RecordBatchOptions from the top-level use
line so it doesn't trigger CI warnings; update the import statement that
currently reads use arrow::record_batch::{RecordBatch, RecordBatchOptions}; to
only import RecordBatch (used by finish_batch) and ensure no other references to
RecordBatchOptions remain in the module.

In `@crates/logfwd-core/src/simd_scanner.rs`:
- Around line 101-103: StreamingSimdScanner currently ignores
ScanConfig.keep_raw because append_raw in StreamingSimdScanner is a no-op;
update the scanner so keep_raw is not silently dropped by either (A) validating
the config in StreamingSimdScanner construction/scan entrypoint and returning an
error if ScanConfig.keep_raw is true, or (B) implementing append_raw to
buffer/emit the raw bytes consistently (matching non-streaming behavior) and
ensure scan_line routes raw data to that buffer; make the change for append_raw
and any other no-op raw handlers noted (lines around append_raw, the scan_line
call sites, and related methods) so behavior is consistent or explicitly
rejected.
- Around line 99-100: append_null_by_idx currently no-ops and thus doesn't mark
the field as written, which breaks duplicate suppression for streaming nulls;
modify append_null_by_idx to perform the same "mark field written" action used
elsewhere (for example call the existing mark_written_by_idx or set the written
bitmap/flag for the given index) so that nulls participate in duplicate
suppression even though no value buffer is materialized; apply the same change
to the other similar implementation region mentioned (the one at the later
occurrence).

In `@crates/logfwd-core/src/storage_builder.rs`:
- Around line 57-58: The current written_bits bitmap and
check_dup()/append_null_by_idx() logic fails for field indices >=64 and never
records null writes, breaking first-writer-wins for wide schemas; replace the
single u64 bitmap (written_bits) with per-field tracking (e.g., a Vec<bool> or
BitVec sized to columns) and update check_dup(), append_null_by_idx(), and any
other paths that mark writes (including string writes that push into str_values)
to set the per-field flag for both non-null and null branches so duplicate
detection correctly prevents later overwrites in finish_batch() and prevents
extra entries in str_values.
- Around line 165-166: The code currently calls std::str::from_utf8_unchecked in
finish_batch (and related places) on untrusted &[u8] coming from public APIs
(resolve_field, append_str_by_idx, append_raw), which is UB for non-UTF-8 bytes;
fix by removing from_utf8_unchecked and either (A) validate and convert at API
boundaries when the bytes are first accepted (e.g., in resolve_field /
append_str_by_idx / append_raw) so only valid UTF-8 Strings are stored, or (B)
replace the unchecked conversion in finish_batch with std::str::from_utf8 and
propagate/return an error when conversion fails (update return types to Result
where needed); update the error paths and tests accordingly so non-UTF-8 input
yields a recoverable error rather than UB.

In `@crates/logfwd-core/src/streaming_builder.rs`:
- Around line 83-87: The written_bits/field_mask approach breaks for field
indices >= 64 because computing bit = 1u64 << idx yields 0 and duplicates /
append_null_by_idx aren't recorded, which corrupts the vi cursor in
finish_batch(); fix by adding explicit support for wide indices—either extend
the bitmask (e.g., to a wider bitmap) or add a separate set/bitmap for indices
>= 64 and use it wherever written_bits/field_mask are consulted; update
append_null_by_idx, any write/append methods that currently compute bit from
idx, and finish_batch so they consult the new structure for idx >= 64 (and still
use written_bits for idx < 64) to ensure duplicates are detected and nulls are
marked correctly.
- Around line 139-164: append_str_by_idx currently relies on offset_of's
debug_assert and on_utf8 unchecked conversions in finish_batch, exposing unsafe
StringView invariants; fix by (1) replacing from_utf8_unchecked(...) in
finish_batch/resolve_field usage with a safe conversion (std::str::from_utf8 or
try_from) and propagate or handle the error, and (2) making offset validation
enforced at runtime by changing offset_of to return Result<u32, Error> or using
assert! (not debug_assert!) so append_str_by_idx (or append_view_unchecked) only
accepts validated offsets, or alternatively mark
append_str_by_idx/append_view_unchecked as unsafe and add clear doc comments
describing the required invariants (slice must be inside self.buf and UTF-8 must
be valid); update callers in finish_batch to either perform the runtime checks
or call the newly unsafe functions with documented safety comments.

In `@crates/logfwd-core/tests/scanner_conformance.rs`:
- Around line 41-65: The byte-scan that computes raw_key_count
(line.windows(2).filter(|w| w == b"\":").count()) incorrectly counts nested
keys; replace it with a small top-level key counter: iterate the bytes of line,
maintain a nesting depth (increment on '{'/'[' and decrement on '}'/']'), track
whether you are inside a string and escaped characters, and only count
occurrences of a closing-quote followed by ':' when depth == 1 (top-level) and
not inside a string; use the same variable raw_key_count so the later comparison
with obj.len() still works and keep the surrounding logic that skips the row
when obj.len() < raw_key_count.

---

Duplicate comments:
In `@crates/logfwd-core/examples/bench_builders.rs`:
- Around line 6-7: The example imports the removed symbol ColumnarBatchBuilder
from logfwd_core; update the example to use the new public builder API or
restore the re-export in lib.rs. Either replace uses of ColumnarBatchBuilder
(and any ColumnarBatchBuilder::... calls) with the new public builder
types/methods exported by crates/logfwd-core (adjust the use lines and
subsequent code to the new type names), or add a pub mod/export for
columnar_builder in crates/logfwd-core/src/lib.rs so ColumnarBatchBuilder is
available; also update any references to StorageBuilder if its path changed to
match the current public API. Ensure all occurrences in this file (and the noted
ranges) are updated to the chosen fix.

In `@crates/logfwd-core/examples/bench_e2e.rs`:
- Around line 214-233: The example uses the old StreamingBuilder API
(StreamingBuilder::new(rows), begin_batch(), set_buffer(...),
finish_batch(buf_bytes)) which no longer exists; update the example to the
refactored API by creating the builder with the current constructor and using
the new batch methods — replace StreamingBuilder::new(rows) with the refactored
constructor (e.g., StreamingBuilder::new() or the new with_* constructor used
across the crate), remove separate set_buffer/parameterless begin_batch calls
and instead call the new begin/start batch method that accepts the row count and
buffer (e.g., begin_batch(rows, &buf) or start_batch(&buf, rows) as defined in
the refactor), and call the new finish/complete method with no buf_bytes
argument (or with the new required args) to obtain the batch; adjust the warmup,
timed loop, and final batch sections to use those updated method names and
signatures (refer to StreamingBuilder, begin_batch/start_batch, and
finish/complete in the refactor to match exact names).
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: 1fcab9c6-48c1-457e-9776-ac32938e07c9

📥 Commits

Reviewing files that changed from the base of the PR and between 182da51 and b4c1216.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (12)
  • crates/logfwd-core/Cargo.toml
  • crates/logfwd-core/examples/bench_builders.rs
  • crates/logfwd-core/examples/bench_e2e.rs
  • crates/logfwd-core/src/batch_builder.rs
  • crates/logfwd-core/src/chunk_classify.rs
  • crates/logfwd-core/src/columnar_builder.rs
  • crates/logfwd-core/src/indexed_builder.rs
  • crates/logfwd-core/src/lib.rs
  • crates/logfwd-core/src/simd_scanner.rs
  • crates/logfwd-core/src/storage_builder.rs
  • crates/logfwd-core/src/streaming_builder.rs
  • crates/logfwd-core/tests/scanner_conformance.rs
💤 Files with no reviewable changes (2)
  • crates/logfwd-core/src/indexed_builder.rs
  • crates/logfwd-core/src/columnar_builder.rs

Comment on lines +150 to +153
let buf = gen_k8s(rows);
let mb = buf.len() as f64 / 1024.0 / 1024.0;
let iters = if rows <= 10_000 { 200 } else { 20 };
let index = ChunkIndex::new(&buf);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Use one backing buffer for the view benchmark.

StreamingBuilder computes string views relative to the bytes::Bytes passed to begin_batch, but this benchmark still tokenizes &buf from a separate Vec<u8> allocation. After the signature fix, that means the streaming path is no longer operating on slices from its own backing buffer, so the zero-copy offsets can be wrong. Match StreamingSimdScanner::scan here and build ChunkIndex / scan_buffer! over the same bytes::Bytes.

Also applies to: 213-219

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/logfwd-core/examples/bench_e2e.rs` around lines 150 - 153, The
benchmark currently generates a separate Vec<u8> (buf) and passes its slices to
ChunkIndex::new and scanning helpers, but StreamingBuilder/StreamingSimdScanner
expects views into a single bytes::Bytes backing buffer; change the benchmark to
convert the generated Vec<u8> into a bytes::Bytes (e.g., let bytes =
Bytes::from(buf)) and then construct ChunkIndex::new(&bytes) and call
scan_buffer! / other scan helpers using that same bytes variable so zero-copy
offsets are correct; apply the same change to the second occurrence mentioned
(lines ~213-219) so both benchmarks use the identical bytes::Bytes backing
buffer for indexing and scanning, and ensure any begin_batch calls use that
bytes value for StreamingBuilder.

Comment thread crates/logfwd-core/src/batch_builder.rs Outdated
Comment thread crates/logfwd-core/src/simd_scanner.rs Outdated
Comment on lines +99 to +100
fn append_null_by_idx(&mut self, _idx: usize) { /* nulls are gaps */
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Streaming nulls still need to mark the field as written.

With this no-op, {"a":null,"a":1} becomes 1 on the streaming path because the first key never records its write. append_null_by_idx needs to participate in duplicate suppression even if it does not materialize a value buffer.

Also applies to: 208-216

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/logfwd-core/src/simd_scanner.rs` around lines 99 - 100,
append_null_by_idx currently no-ops and thus doesn't mark the field as written,
which breaks duplicate suppression for streaming nulls; modify
append_null_by_idx to perform the same "mark field written" action used
elsewhere (for example call the existing mark_written_by_idx or set the written
bitmap/flag for the given index) so that nulls participate in duplicate
suppression even though no value buffer is materialized; apply the same change
to the other similar implementation region mentioned (the one at the later
occurrence).

Comment thread crates/logfwd-core/src/simd_scanner.rs Outdated
Comment on lines +101 to +103
#[inline(always)]
fn append_raw(&mut self, _line: &[u8]) { /* not supported */
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

keep_raw is silently dropped by StreamingSimdScanner.

scan_line still calls append_raw whenever ScanConfig.keep_raw is true, but the streaming implementation discards it and StreamingSimdScanner exposes no guard against that config. Either reject keep_raw on this scanner or emit _raw consistently.

Also applies to: 138-140, 299-309

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/logfwd-core/src/simd_scanner.rs` around lines 101 - 103,
StreamingSimdScanner currently ignores ScanConfig.keep_raw because append_raw in
StreamingSimdScanner is a no-op; update the scanner so keep_raw is not silently
dropped by either (A) validating the config in StreamingSimdScanner
construction/scan entrypoint and returning an error if ScanConfig.keep_raw is
true, or (B) implementing append_raw to buffer/emit the raw bytes consistently
(matching non-streaming behavior) and ensure scan_line routes raw data to that
buffer; make the change for append_raw and any other no-op raw handlers noted
(lines around append_raw, the scan_line call sites, and related methods) so
behavior is consistent or explicitly rejected.

Comment on lines +57 to +58
written_bits: u64,
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

written_bits still breaks first-writer-wins for nulls and field 64+.

check_dup() falls back to “not written” once idx >= 64, and append_null_by_idx() never marks the field as seen. That means wide-schema duplicates are still recorded, and a duplicate string write leaves an extra entry in str_values, so finish_batch() can null out every later row in that column. {k: null, k: 1} also incorrectly resolves to 1. Please switch to per-field tracking and mark the null path too.

Patch sketch
 pub struct StorageBuilder {
     fields: Vec<FieldCollector>,
     field_index: HashMap<Vec<u8>, usize>,
     raw_values: Vec<Vec<u8>>,
     row_count: u32,
     keep_raw: bool,
-    written_bits: u64,
+    written_this_row: Vec<bool>,
 }

 impl StorageBuilder {
     pub fn new(expected_rows: usize, keep_raw: bool) -> Self {
         let _ = expected_rows;
         StorageBuilder {
             fields: Vec::with_capacity(32),
             field_index: HashMap::with_capacity(32),
             raw_values: Vec::new(),
             row_count: 0,
             keep_raw,
-            written_bits: 0,
+            written_this_row: Vec::with_capacity(32),
         }
     }

     #[inline(always)]
     pub fn begin_row(&mut self) {
-        self.written_bits = 0;
+        self.written_this_row.fill(false);
     }

     #[inline]
     pub fn resolve_field(&mut self, key: &[u8]) -> usize {
         if let Some(&idx) = self.field_index.get(key) {
             return idx;
         }
         let idx = self.fields.len();
         self.fields.push(FieldCollector::new(key));
         self.field_index.insert(key.to_vec(), idx);
+        self.written_this_row.push(false);
         idx
     }

     #[inline(always)]
     fn check_dup(&mut self, idx: usize) -> bool {
-        if idx < 64 {
-            let bit = 1u64 << idx;
-            if self.written_bits & bit != 0 {
-                return true;
-            }
-            self.written_bits |= bit;
-            false
-        } else {
-            false
-        }
+        if self.written_this_row[idx] {
+            true
+        } else {
+            self.written_this_row[idx] = true;
+            false
+        }
     }

     #[inline(always)]
-    pub fn append_null_by_idx(&mut self, _idx: usize) {}
+    pub fn append_null_by_idx(&mut self, idx: usize) {
+        let _ = self.check_dup(idx);
+    }
 }

Also applies to: 81-84, 91-99, 102-114, 150-151

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/logfwd-core/src/storage_builder.rs` around lines 57 - 58, The current
written_bits bitmap and check_dup()/append_null_by_idx() logic fails for field
indices >=64 and never records null writes, breaking first-writer-wins for wide
schemas; replace the single u64 bitmap (written_bits) with per-field tracking
(e.g., a Vec<bool> or BitVec sized to columns) and update check_dup(),
append_null_by_idx(), and any other paths that mark writes (including string
writes that push into str_values) to set the per-field flag for both non-null
and null branches so duplicate detection correctly prevents later overwrites in
finish_batch() and prevents extra entries in str_values.

Comment on lines +165 to +166
for fc in &self.fields {
let name = unsafe { std::str::from_utf8_unchecked(&fc.name) };
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🌐 Web query:

Rust std::str::from_utf8_unchecked invalid UTF-8 undefined behavior official documentation

💡 Result:

Yes, passing invalid UTF-8 bytes to std::str::from_utf8_unchecked results in undefined behavior, as stated in the official Rust documentation. The current documentation (https://doc.rust-lang.org/std/str/fn.from_utf8_unchecked.html) includes a safety section: "The bytes passed in must be valid UTF-8." Older versions explicitly state: "This function is unsafe because it does not check that the bytes passed to it are valid UTF-8. If this constraint is violated, undefined behavior results, as the rest of Rust assumes that &str s are valid UTF-8." (e.g., 1.55 docs). This confirms that violating the invariant leads to UB because Rust's string operations assume &str contains valid UTF-8.

Citations:


🏁 Script executed:

# First, let's examine the storage_builder.rs file to understand the context
wc -l crates/logfwd-core/src/storage_builder.rs

Repository: strawgate/memagent

Length of output: 107


🏁 Script executed:

# Look at the specific lines mentioned (165-166 and 204-225)
sed -n '160,230p' crates/logfwd-core/src/storage_builder.rs | cat -n

Repository: strawgate/memagent

Length of output: 3665


🏁 Script executed:

# Search for the public API functions mentioned: resolve_field(), append_str_by_idx(), append_raw()
rg "pub fn (resolve_field|append_str_by_idx|append_raw)" crates/logfwd-core/src/storage_builder.rs -A 5

Repository: strawgate/memagent

Length of output: 644


🏁 Script executed:

# Check for finish_batch function and its implementation
rg "fn finish_batch" crates/logfwd-core/src/storage_builder.rs -A 30

Repository: strawgate/memagent

Length of output: 1425


🏁 Script executed:

# Check FieldCollector struct and its constructor to see if validation happens there
rg "struct FieldCollector" crates/logfwd-core/src/storage_builder.rs -A 20

Repository: strawgate/memagent

Length of output: 630


🏁 Script executed:

# Also check if there's any validation in field.push() or elsewhere
rg "FieldCollector::new" crates/logfwd-core/src/storage_builder.rs -B 2 -A 10

Repository: strawgate/memagent

Length of output: 435


Remove from_utf8_unchecked() calls or validate UTF-8 at API boundaries.

This is a critical soundness violation. The public API functions resolve_field(), append_str_by_idx(), and append_raw() accept arbitrary &[u8] with no documented UTF-8 requirement. However, finish_batch() (a safe function) passes this untrusted data directly to from_utf8_unchecked() at lines 165-166, 204-225, and elsewhere, which is undefined behavior if the bytes are not valid UTF-8. Non-UTF-8 input triggers UB instead of a recoverable error.

Either validate UTF-8 at API boundaries (when storing the bytes), document the UTF-8 requirement clearly in all public functions, or use fallible conversion (from_utf8()) with error handling.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/logfwd-core/src/storage_builder.rs` around lines 165 - 166, The code
currently calls std::str::from_utf8_unchecked in finish_batch (and related
places) on untrusted &[u8] coming from public APIs (resolve_field,
append_str_by_idx, append_raw), which is UB for non-UTF-8 bytes; fix by removing
from_utf8_unchecked and either (A) validate and convert at API boundaries when
the bytes are first accepted (e.g., in resolve_field / append_str_by_idx /
append_raw) so only valid UTF-8 Strings are stored, or (B) replace the unchecked
conversion in finish_batch with std::str::from_utf8 and propagate/return an
error when conversion fails (update return types to Result where needed); update
the error paths and tests accordingly so non-UTF-8 input yields a recoverable
error rather than UB.

Comment on lines +83 to +87
/// Tracks which fields (by index) were written in the current row.
/// Supports up to 64 fields for O(1) duplicate detection;
/// fields 64+ work correctly but without duplicate-key protection.
written_bits: u64,
field_mask: u64,
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

The “64+ fields still work” invariant is false for strings.

For idx >= 64, bit becomes 0, so duplicates are never filtered, and append_null_by_idx() also leaves the field unmarked. A duplicate wide string key inserts a second (row, offset, len) for the same row; then the vi cursor in finish_batch() gets stuck on that stale row and every later row in that column becomes null. The null-first duplicate case is wrong even under 64 fields.

Patch sketch
 pub struct StreamingBuilder {
     fields: Vec<FieldColumns>,
     field_index: HashMap<Vec<u8>, usize>,
     row_count: u32,
-    written_bits: u64,
-    field_mask: u64,
+    written_this_row: Vec<bool>,
     buf: bytes::Bytes,
 }

 impl StreamingBuilder {
     pub fn new() -> Self {
         StreamingBuilder {
             fields: Vec::with_capacity(32),
             field_index: HashMap::with_capacity(32),
             row_count: 0,
-            written_bits: 0,
-            field_mask: 0,
+            written_this_row: Vec::with_capacity(32),
             buf: bytes::Bytes::new(),
         }
     }

     #[inline(always)]
     pub fn begin_row(&mut self) {
-        self.written_bits = 0;
+        self.written_this_row.fill(false);
     }

     #[inline]
     pub fn resolve_field(&mut self, key: &[u8]) -> usize {
         if let Some(&idx) = self.field_index.get(key) {
             return idx;
         }
         let idx = self.fields.len();
         self.fields.push(FieldColumns::new(key));
         self.field_index.insert(key.to_vec(), idx);
-        if idx < 64 {
-            self.field_mask |= 1u64 << idx;
-        }
+        self.written_this_row.push(false);
         idx
     }

     #[inline(always)]
     pub fn append_str_by_idx(&mut self, idx: usize, value: &[u8]) {
-        let bit = if idx < 64 { 1u64 << idx } else { 0 };
-        if self.written_bits & bit != 0 {
+        if std::mem::replace(&mut self.written_this_row[idx], true) {
             return;
         }
-        self.written_bits |= bit;
         let offset = self.offset_of(value);
         let fc = &mut self.fields[idx];
         fc.has_str = true;
         fc.str_views.push((self.row_count, offset, value.len() as u32));
     }

     #[inline(always)]
     pub fn append_int_by_idx(&mut self, idx: usize, value: &[u8]) {
-        let bit = if idx < 64 { 1u64 << idx } else { 0 };
-        if self.written_bits & bit != 0 {
+        if std::mem::replace(&mut self.written_this_row[idx], true) {
             return;
         }
-        self.written_bits |= bit;
         let fc = &mut self.fields[idx];
         fc.has_int = true;
         if let Some(v) = parse_int_fast(value) {
             fc.int_values.push((self.row_count, v));
         }
     }

     #[inline(always)]
     pub fn append_float_by_idx(&mut self, idx: usize, value: &[u8]) {
-        let bit = if idx < 64 { 1u64 << idx } else { 0 };
-        if self.written_bits & bit != 0 {
+        if std::mem::replace(&mut self.written_this_row[idx], true) {
             return;
         }
-        self.written_bits |= bit;
         let fc = &mut self.fields[idx];
         fc.has_float = true;
         if let Some(v) = parse_float_fast(value) {
             fc.float_values.push((self.row_count, v));
         }
     }

     #[inline(always)]
-    pub fn append_null_by_idx(&mut self, _idx: usize) {
-        // Nulls are represented by gaps — rows without a value record.
+    pub fn append_null_by_idx(&mut self, idx: usize) {
+        self.written_this_row[idx] = true;
     }
 }

Also applies to: 125-137, 152-197, 250-263

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/logfwd-core/src/streaming_builder.rs` around lines 83 - 87, The
written_bits/field_mask approach breaks for field indices >= 64 because
computing bit = 1u64 << idx yields 0 and duplicates / append_null_by_idx aren't
recorded, which corrupts the vi cursor in finish_batch(); fix by adding explicit
support for wide indices—either extend the bitmask (e.g., to a wider bitmap) or
add a separate set/bitmap for indices >= 64 and use it wherever
written_bits/field_mask are consulted; update append_null_by_idx, any
write/append methods that currently compute bit from idx, and finish_batch so
they consult the new structure for idx >= 64 (and still use written_bits for idx
< 64) to ensure duplicates are detected and nulls are marked correctly.

Comment on lines +139 to +164
/// Compute offset of a value slice within the buffer.
/// Safe: uses pointer arithmetic without `offset_from` UB preconditions.
#[inline(always)]
fn offset_of(&self, value: &[u8]) -> u32 {
let base = self.buf.as_ptr() as usize;
let ptr = value.as_ptr() as usize;
debug_assert!(
ptr >= base && ptr + value.len() <= base + self.buf.len(),
"value must be within buffer bounds"
);
(ptr - base) as u32
}

#[inline(always)]
pub fn append_str_by_idx(&mut self, idx: usize, value: &[u8]) {
let bit = if idx < 64 { 1u64 << idx } else { 0 };
if self.written_bits & bit != 0 {
return;
}
self.written_bits |= bit;
let offset = self.offset_of(value);
let fc = &mut self.fields[idx];
fc.has_str = true;
fc.str_views
.push((self.row_count, offset, value.len() as u32));
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🌐 Web query:

What are the safety requirements of arrow-rs StringViewBuilder::append_view_unchecked and Rust std::str::from_utf8_unchecked?

💡 Result:

The safety requirements for the two functions are as follows: arrow-rs StringViewBuilder::append_view_unchecked (a method on GenericByteViewBuilder): This unsafe function appends a view into a data block without bounds or validity checks. Callers must ensure: 1. The block ID has been added using GenericByteViewBuilder::append_block. 2. The range offset..offset+len is entirely within the block's bounds. 3. The data in that range is valid for the array type T (for StringViewBuilder, valid UTF-8). Violating these may lead to invalid memory access, incorrect array construction, or memory unsafety. Rust std::str::from_utf8_unchecked: This unsafe function converts &[u8] to &str without validating UTF-8. Callers must ensure the input bytes form valid UTF-8. Violating this causes undefined behavior, as Rust assumes &str contains valid UTF-8 (e.g., string methods may crash, misbehave, or worse).

Citations:


🏁 Script executed:

wc -l crates/logfwd-core/src/streaming_builder.rs

Repository: strawgate/memagent

Length of output: 109


🏁 Script executed:

cat -n crates/logfwd-core/src/streaming_builder.rs | sed -n '139,165p'

Repository: strawgate/memagent

Length of output: 1193


🏁 Script executed:

cat -n crates/logfwd-core/src/streaming_builder.rs | sed -n '214,260p'

Repository: strawgate/memagent

Length of output: 2577


🏁 Script executed:

rg "append_view_unchecked|from_utf8_unchecked" crates/logfwd-core/src/streaming_builder.rs -n -A 2

Repository: strawgate/memagent

Length of output: 330


🏁 Script executed:

rg "fc\.name\s*=" crates/logfwd-core/src/streaming_builder.rs -B 2 -A 2

Repository: strawgate/memagent

Length of output: 44


🏁 Script executed:

grep -n "\.name" crates/logfwd-core/src/streaming_builder.rs | head -20

Repository: strawgate/memagent

Length of output: 140


🏁 Script executed:

cat -n crates/logfwd-core/src/streaming_builder.rs | sed -n '1,100p'

Repository: strawgate/memagent

Length of output: 4229


🏁 Script executed:

cat -n crates/logfwd-core/src/streaming_builder.rs | sed -n '100,150p'

Repository: strawgate/memagent

Length of output: 1889


🏁 Script executed:

rg "append_str_by_idx|FieldColumns::new" crates/logfwd-core/src/streaming_builder.rs -B 2 -A 2

Repository: strawgate/memagent

Length of output: 1620


🏁 Script executed:

cat -n crates/logfwd-core/src/streaming_builder.rs | sed -n '150,220p'

Repository: strawgate/memagent

Length of output: 2984


append_str_by_idx() exposes StringView unsafe preconditions through a safe API.

Two issues in finish_batch():

  1. Line 215: Field names from resolve_field() are converted via from_utf8_unchecked() without UTF-8 validation—user input may contain invalid UTF-8.

  2. Line 257: Offsets passed to append_view_unchecked() rely on validation that exists only via debug_assert! in offset_of(). In release builds, invalid offsets (from slices outside the buffer) bypass validation entirely.

Both functions require their safety invariants to be upheld at runtime. Use runtime checks (assert! or Result) or mark the API unsafe with explicit invariant documentation.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/logfwd-core/src/streaming_builder.rs` around lines 139 - 164,
append_str_by_idx currently relies on offset_of's debug_assert and on_utf8
unchecked conversions in finish_batch, exposing unsafe StringView invariants;
fix by (1) replacing from_utf8_unchecked(...) in finish_batch/resolve_field
usage with a safe conversion (std::str::from_utf8 or try_from) and propagate or
handle the error, and (2) making offset validation enforced at runtime by
changing offset_of to return Result<u32, Error> or using assert! (not
debug_assert!) so append_str_by_idx (or append_view_unchecked) only accepts
validated offsets, or alternatively mark append_str_by_idx/append_view_unchecked
as unsafe and add clear doc comments describing the required invariants (slice
must be inside self.buf and UTF-8 must be valid); update callers in finish_batch
to either perform the runtime checks or call the newly unsafe functions with
documented safety comments.

Comment thread crates/logfwd-core/tests/scanner_conformance.rs
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

♻️ Duplicate comments (6)
crates/logfwd-core/src/simd_scanner.rs (1)

101-103: ⚠️ Potential issue | 🟠 Major

keep_raw is still silently dropped on the streaming path.

scan_line() appends raw bytes whenever the config asks for them, but the streaming adapter no-ops and StreamingSimdScanner still accepts that config. Either reject keep_raw up front or materialize _raw so storage and streaming scanners don't produce different schemas for the same request.

Also applies to: 137-140, 308-319

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/logfwd-core/src/simd_scanner.rs` around lines 101 - 103, The streaming
path silently drops keep_raw because append_raw in simd_scanner.rs is a no-op
while scan_line honors keep_raw, causing divergent schemas; fix by either
validating/rejecting keep_raw when constructing/accepting the config in
StreamingSimdScanner (so it fails fast) or implement append_raw to materialize
and store the _raw bytes in the same field used by scan_line (ensure the same
storage name and schema are used), and update any constructor/config-check code
that currently allows keep_raw to be accepted without backing storage.
crates/logfwd-core/src/streaming_builder.rs (3)

92-101: ⚠️ Potential issue | 🔴 Critical

Add Default for StreamingBuilder to unblock CI.

Clippy is already failing new_without_default on this type.

Minimal fix
+impl Default for StreamingBuilder {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
 impl StreamingBuilder {
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/logfwd-core/src/streaming_builder.rs` around lines 92 - 101, Add a
Default impl for StreamingBuilder and have new delegate to it: implement impl
Default for StreamingBuilder { fn default() -> Self { /* same initialization as
current new: fields: Vec::with_capacity(32), field_index:
HashMap::with_capacity(32), row_count: 0, written_bits: 0, buf:
bytes::Bytes::new() */ } } then change pub fn new() -> Self { Self::default() }
so Clippy's new_without_default warning is resolved; reference StreamingBuilder,
new, and Default in your change.

138-145: ⚠️ Potential issue | 🔴 Critical

Enforce the StringView safety invariants in this safe API.

offset_of() only debug_assert!s that the slice belongs to self.buf and then truncates to u32, while finish_batch() uses both from_utf8_unchecked() and append_view_unchecked(). A caller can pass an out-of-buffer slice, a >4 GiB offset, or invalid key bytes and trigger UB in release. Either validate these conditions at runtime or make the append path unsafe with explicit preconditions.

Also applies to: 210-255

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/logfwd-core/src/streaming_builder.rs` around lines 138 - 145, The safe
API violates StringView invariants: ensure offset_of() and the public
append/finish paths enforce at runtime that the provided slice points inside
self.buf, that (ptr - base) fits in u32, and that any bytes interpreted as str
are valid UTF-8 before calling from_utf8_unchecked() or append_view_unchecked();
specifically, in offset_of() (and in finish_batch()/append path) add explicit
checks that ptr >= base && ptr + value.len() <= base + self.buf.len(), that (ptr
- base) <= u32::MAX, and validate UTF-8 for key/value bytes (or else mark the
append/finish APIs unsafe and document these preconditions) so release builds
cannot produce UB from out-of-buffer slices, >4GiB offsets, or invalid UTF-8.

83-89: ⚠️ Potential issue | 🔴 Critical

Per-row write tracking is incomplete after field 63 and for explicit nulls.

idx >= 64 maps to bit 0, so duplicate suppression disappears for wide objects, and Line 191 makes null invisible to first-writer-wins. For string columns, a duplicate wide-key write can leave two entries for the same row in str_views, which then keeps vi pinned and turns later rows in that column into nulls. The empty branch at Line 131 is also the dead stub clippy is flagging.

Also applies to: 123-133, 148-193

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/logfwd-core/src/streaming_builder.rs` around lines 83 - 89, The
per-row duplicate-tracking must be extended beyond the 64-bit written_bits and
explicit-null writes must mark a field as written; add an overflow tracker
(e.g., a small HashSet<u32>/Vec<u8> named written_overflow on the same per-row
state as written_bits) and change checks that currently do "if idx < 64 { check
written_bits } else { /* empty */ }" to consult written_overflow for idx >= 64;
when any writer writes an explicit null call the same mark-as-written path (set
the bit or insert into written_overflow) so first-writer-wins respects nulls;
update code paths that push into str_views/vi to avoid appending a second entry
when a duplicate is detected via written_overflow, and remove the dead empty
branch referenced at Line 131. Ensure finish_batch and any logic that reads
written_bits also consults written_overflow to compute final written state.
crates/logfwd-core/src/storage_builder.rs (2)

172-174: ⚠️ Potential issue | 🔴 Critical

from_utf8_unchecked is unsafe here without enforced UTF-8 boundaries.

Lines 174/218/233 use unchecked conversion on bytes accepted from safe public APIs. If invalid UTF-8 enters, this is undefined behavior. Replace with checked conversion (or validate at ingestion).

Safer direction
- let name = unsafe { std::str::from_utf8_unchecked(&fc.name) };
+ let name = std::str::from_utf8(&fc.name)
+     .expect("storage_builder: invalid UTF-8 field name");

- let s = unsafe { std::str::from_utf8_unchecked(&fc.str_values[vi].1) };
- builder.append_value(s);
+ match std::str::from_utf8(&fc.str_values[vi].1) {
+     Ok(s) => builder.append_value(s),
+     Err(_) => builder.append_null(),
+ }

- builder.append_value(unsafe { std::str::from_utf8_unchecked(val) });
+ if let Ok(s) = std::str::from_utf8(val) {
+     builder.append_value(s);
+ } else {
+     builder.append_null();
+ }
#!/bin/bash
set -euo pipefail

FILE="$(fd -p storage_builder.rs | head -n1)"

echo "== unchecked UTF-8 callsites =="
rg -n 'from_utf8_unchecked' "$FILE"

echo
echo "== public byte-accepting APIs in this file =="
rg -n 'pub fn (resolve_field|append_str_by_idx|append_raw)\(' "$FILE" -A3 -B1

echo
echo "== checked UTF-8 conversions present =="
rg -n 'std::str::from_utf8\(|String::from_utf8' "$FILE" || true

Also applies to: 217-219, 232-234

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/logfwd-core/src/storage_builder.rs` around lines 172 - 174, The code
currently uses unsafe std::str::from_utf8_unchecked on bytes (e.g. &fc.name
inside the loop over self.fields and the other callsites around lines 217/232),
which must be replaced with checked conversion; change those calls to
std::str::from_utf8(...) and handle the Result (propagate an error or
validate/normalize at ingestion), and update callers such as resolve_field,
append_str_by_idx, and append_raw to accept and handle the possible UTF-8 error
(or ensure bytes are validated earlier), so no unchecked conversions remain.

64-65: ⚠️ Potential issue | 🔴 Critical

First-writer-wins is still broken for nulls and fields at/over 64.

Line 119 falls back to “not duplicate” for idx >= 64, and Line 158 never marks null writes. This still allows same-row duplicates and can desync str_values during finish_batch.

Patch sketch
 pub struct StorageBuilder {
     fields: Vec<FieldCollector>,
     field_index: HashMap<Vec<u8>, usize>,
     raw_values: Vec<Vec<u8>>,
     row_count: u32,
     keep_raw: bool,
-    written_bits: u64,
+    written_this_row: Vec<bool>,
 }

 impl StorageBuilder {
     pub fn new(expected_rows: usize, keep_raw: bool) -> Self {
         let _ = expected_rows; // reserved for future capacity hints
         StorageBuilder {
             fields: Vec::with_capacity(32),
             field_index: HashMap::with_capacity(32),
             raw_values: Vec::new(),
             row_count: 0,
             keep_raw,
-            written_bits: 0,
+            written_this_row: Vec::with_capacity(32),
         }
     }

     #[inline(always)]
     pub fn begin_row(&mut self) {
-        self.written_bits = 0;
+        self.written_this_row.fill(false);
     }

     #[inline]
     pub fn resolve_field(&mut self, key: &[u8]) -> usize {
         if let Some(&idx) = self.field_index.get(key) {
             return idx;
         }
         let idx = self.fields.len();
         self.fields.push(FieldCollector::new(key));
         self.field_index.insert(key.to_vec(), idx);
+        self.written_this_row.push(false);
         idx
     }

     #[inline(always)]
     fn check_dup(&mut self, idx: usize) -> bool {
-        if idx < 64 {
-            let bit = 1u64 << idx;
-            if self.written_bits & bit != 0 {
-                return true;
-            }
-            self.written_bits |= bit;
-            false
-        } else {
-            false
-        }
+        if self.written_this_row[idx] {
+            true
+        } else {
+            self.written_this_row[idx] = true;
+            false
+        }
     }

     #[inline(always)]
-    pub fn append_null_by_idx(&mut self, _idx: usize) {}
+    pub fn append_null_by_idx(&mut self, idx: usize) {
+        let _ = self.check_dup(idx);
+    }
 }

Also applies to: 90-91, 109-121, 157-158

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/logfwd-core/src/storage_builder.rs` around lines 64 - 65, The
first-writer-wins bitset (written_bits) only covers indices <64 and never
records null writes, allowing duplicate writes and desync of str_values in
finish_batch; fix by expanding the bitset to cover all field indices
(replace/uplift written_bits: u64 into a dynamically-sized bitmap like Vec<u64>
sized by num_fields/64) and update all bit checks/writes (the
duplicate-detection logic and any idx >= 64 branches) to compute word = idx / 64
and bit = idx % 64, using that bitmap for is-duplicate and mark-written
operations, and ensure null-value writes also set the bit so finish_batch will
remain in sync with str_values.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@crates/logfwd-core/tests/scanner_conformance.rs`:
- Around line 311-316: The parity tests always use ScanConfig::default(), so
they never exercise config-dependent regressions; modify the test to add
explicit regression cases that instantiate scanners with non-default configs
(e.g., create SimdScanner::new(ScanConfig { keep_raw: true, ..Default::default()
}, 64) and StreamingSimdScanner::new(ScanConfig { keep_raw: true,
..Default::default() }), and also run both scanners on inputs that trigger
first-writer-wins behavior like b"{\"a\":null,\"a\":1}" to assert matching
outputs), and add assertions comparing storage.scan(...) and streaming.scan(...)
results for those fixed inputs to ensure the keep_raw=true and duplicate-key
cases are exercised (refer to SimdScanner::new, StreamingSimdScanner::new,
ScanConfig::default, storage.scan, streaming.scan).

---

Duplicate comments:
In `@crates/logfwd-core/src/simd_scanner.rs`:
- Around line 101-103: The streaming path silently drops keep_raw because
append_raw in simd_scanner.rs is a no-op while scan_line honors keep_raw,
causing divergent schemas; fix by either validating/rejecting keep_raw when
constructing/accepting the config in StreamingSimdScanner (so it fails fast) or
implement append_raw to materialize and store the _raw bytes in the same field
used by scan_line (ensure the same storage name and schema are used), and update
any constructor/config-check code that currently allows keep_raw to be accepted
without backing storage.

In `@crates/logfwd-core/src/storage_builder.rs`:
- Around line 172-174: The code currently uses unsafe
std::str::from_utf8_unchecked on bytes (e.g. &fc.name inside the loop over
self.fields and the other callsites around lines 217/232), which must be
replaced with checked conversion; change those calls to std::str::from_utf8(...)
and handle the Result (propagate an error or validate/normalize at ingestion),
and update callers such as resolve_field, append_str_by_idx, and append_raw to
accept and handle the possible UTF-8 error (or ensure bytes are validated
earlier), so no unchecked conversions remain.
- Around line 64-65: The first-writer-wins bitset (written_bits) only covers
indices <64 and never records null writes, allowing duplicate writes and desync
of str_values in finish_batch; fix by expanding the bitset to cover all field
indices (replace/uplift written_bits: u64 into a dynamically-sized bitmap like
Vec<u64> sized by num_fields/64) and update all bit checks/writes (the
duplicate-detection logic and any idx >= 64 branches) to compute word = idx / 64
and bit = idx % 64, using that bitmap for is-duplicate and mark-written
operations, and ensure null-value writes also set the bit so finish_batch will
remain in sync with str_values.

In `@crates/logfwd-core/src/streaming_builder.rs`:
- Around line 92-101: Add a Default impl for StreamingBuilder and have new
delegate to it: implement impl Default for StreamingBuilder { fn default() ->
Self { /* same initialization as current new: fields: Vec::with_capacity(32),
field_index: HashMap::with_capacity(32), row_count: 0, written_bits: 0, buf:
bytes::Bytes::new() */ } } then change pub fn new() -> Self { Self::default() }
so Clippy's new_without_default warning is resolved; reference StreamingBuilder,
new, and Default in your change.
- Around line 138-145: The safe API violates StringView invariants: ensure
offset_of() and the public append/finish paths enforce at runtime that the
provided slice points inside self.buf, that (ptr - base) fits in u32, and that
any bytes interpreted as str are valid UTF-8 before calling
from_utf8_unchecked() or append_view_unchecked(); specifically, in offset_of()
(and in finish_batch()/append path) add explicit checks that ptr >= base && ptr
+ value.len() <= base + self.buf.len(), that (ptr - base) <= u32::MAX, and
validate UTF-8 for key/value bytes (or else mark the append/finish APIs unsafe
and document these preconditions) so release builds cannot produce UB from
out-of-buffer slices, >4GiB offsets, or invalid UTF-8.
- Around line 83-89: The per-row duplicate-tracking must be extended beyond the
64-bit written_bits and explicit-null writes must mark a field as written; add
an overflow tracker (e.g., a small HashSet<u32>/Vec<u8> named written_overflow
on the same per-row state as written_bits) and change checks that currently do
"if idx < 64 { check written_bits } else { /* empty */ }" to consult
written_overflow for idx >= 64; when any writer writes an explicit null call the
same mark-as-written path (set the bit or insert into written_overflow) so
first-writer-wins respects nulls; update code paths that push into str_views/vi
to avoid appending a second entry when a duplicate is detected via
written_overflow, and remove the dead empty branch referenced at Line 131.
Ensure finish_batch and any logic that reads written_bits also consults
written_overflow to compute final written state.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: adba8257-161b-4ca6-ad2f-87032a3921c2

📥 Commits

Reviewing files that changed from the base of the PR and between b4c1216 and d1f5735.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (11)
  • crates/logfwd-core/Cargo.toml
  • crates/logfwd-core/src/batch_builder.rs
  • crates/logfwd-core/src/chunk_classify.rs
  • crates/logfwd-core/src/columnar_builder.rs
  • crates/logfwd-core/src/indexed_builder.rs
  • crates/logfwd-core/src/lib.rs
  • crates/logfwd-core/src/scanner.rs
  • crates/logfwd-core/src/simd_scanner.rs
  • crates/logfwd-core/src/storage_builder.rs
  • crates/logfwd-core/src/streaming_builder.rs
  • crates/logfwd-core/tests/scanner_conformance.rs
💤 Files with no reviewable changes (2)
  • crates/logfwd-core/src/columnar_builder.rs
  • crates/logfwd-core/src/indexed_builder.rs

Comment thread crates/logfwd-core/src/simd_scanner.rs Outdated
Comment on lines +311 to +316
let mut storage = SimdScanner::new(ScanConfig::default(), 64);
let mut streaming = StreamingSimdScanner::new(ScanConfig::default());

let sb = storage.scan(input);
let stb = streaming.scan(bytes::Bytes::from(input.to_vec()));

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

Parity tests miss the config-dependent regressions.

Both scanners are created with ScanConfig::default(), so this helper never exercises keep_raw: true or the {"a":null,"a":1} first-writer-wins case. Add a couple of fixed regression cases here; the current storage/streaming mismatch still passes this suite untouched.

Also applies to: 390-403

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/logfwd-core/tests/scanner_conformance.rs` around lines 311 - 316, The
parity tests always use ScanConfig::default(), so they never exercise
config-dependent regressions; modify the test to add explicit regression cases
that instantiate scanners with non-default configs (e.g., create
SimdScanner::new(ScanConfig { keep_raw: true, ..Default::default() }, 64) and
StreamingSimdScanner::new(ScanConfig { keep_raw: true, ..Default::default() }),
and also run both scanners on inputs that trigger first-writer-wins behavior
like b"{\"a\":null,\"a\":1}" to assert matching outputs), and add assertions
comparing storage.scan(...) and streaming.scan(...) results for those fixed
inputs to ensure the keep_raw=true and duplicate-key cases are exercised (refer
to SimdScanner::new, StreamingSimdScanner::new, ScanConfig::default,
storage.scan, streaming.scan).

Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
crates/logfwd-bench/benches/pipeline.rs (1)

119-131: 🧹 Nitpick | 🔵 Trivial

Consider importing FieldSpec at the top to reduce verbosity.

The fully-qualified logfwd_core::scan_config::FieldSpec is repeated three times. A single import would clean this up.

♻️ Proposed fix

Add to imports at line 12:

-use logfwd_core::scan_config::ScanConfig;
+use logfwd_core::scan_config::{FieldSpec, ScanConfig};

Then simplify the field specs:

                 let config = ScanConfig {
                     wanted_fields: vec![
-                        logfwd_core::scan_config::FieldSpec {
+                        FieldSpec {
                             name: "level".into(),
                             aliases: vec![],
                         },
-                        logfwd_core::scan_config::FieldSpec {
+                        FieldSpec {
                             name: "status".into(),
                             aliases: vec![],
                         },
-                        logfwd_core::scan_config::FieldSpec {
+                        FieldSpec {
                             name: "duration_ms".into(),
                             aliases: vec![],
                         },
                     ],
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/logfwd-bench/benches/pipeline.rs` around lines 119 - 131, Replace the
repeated fully-qualified type logfwd_core::scan_config::FieldSpec by adding a
use import (use logfwd_core::scan_config::FieldSpec;) near the top of the module
and then change the three occurrences to just FieldSpec { name: "...".into(),
aliases: vec![], } so the field specs read FieldSpec { ... } instead of
logfwd_core::scan_config::FieldSpec { ... }.
♻️ Duplicate comments (3)
crates/logfwd-core/src/streaming_builder.rs (1)

92-101: ⚠️ Potential issue | 🔴 Critical

Add Default implementation.

Pipeline fails: clippy::new-without-default.

+impl Default for StreamingBuilder {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
 impl StreamingBuilder {
     pub fn new() -> Self {
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/logfwd-core/src/streaming_builder.rs` around lines 92 - 101, Clippy
flags new-without-default for StreamingBuilder; implement the Default trait for
StreamingBuilder with fn default() -> Self returning the same initial state as
the current pub fn new(), and then either have pub fn new() call Self::default()
or remove new in favor of Default::default(); ensure the Default impl
initializes fields, field_index, row_count, written_bits, and buf identically to
the current new implementation.
crates/logfwd-core/src/storage_builder.rs (1)

157-158: ⚠️ Potential issue | 🟠 Major

append_null_by_idx doesn't mark field as written.

Explicit nulls don't update written_bits, so {k: null, k: 1} incorrectly resolves to 1 instead of preserving the first (null) value.

 #[inline(always)]
-pub fn append_null_by_idx(&mut self, _idx: usize) {}
+pub fn append_null_by_idx(&mut self, idx: usize) {
+    let _ = self.check_dup(idx);
+}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/logfwd-core/src/storage_builder.rs` around lines 157 - 158,
append_null_by_idx currently does nothing so explicit nulls aren't recorded in
written_bits causing later values to overwrite them; modify append_null_by_idx
to mark the field as written by setting the corresponding bit in written_bits
(the same bit logic used by append_by_idx) and also record the explicit null in
the same storage/slot used for values (e.g., push a None/Null placeholder or set
the value slot to Null) so that subsequent merges see the field as explicitly
written; update append_null_by_idx to use the same indexing/bit-manipulation
helpers as append_by_idx and written_bits to ensure consistent behavior.
crates/logfwd-core/tests/scanner_conformance.rs (1)

308-316: 🧹 Nitpick | 🔵 Trivial

Parity tests still use ScanConfig::default() — add config-dependent regression cases.

As noted in prior review, assert_builders_consistent always uses default config. Add explicit cases for:

  • keep_raw: true
  • Duplicate-key inputs like {"a":null,"a":1} to verify first-writer-wins consistency
♻️ Proposed addition
#[test]
fn consistency_keep_raw() {
    let config = ScanConfig {
        wanted_fields: vec![],
        extract_all: true,
        keep_raw: true,
    };
    let input = br#"{"level":"INFO","msg":"test"}"#;
    
    let mut storage = SimdScanner::new(config.clone(), 64);
    let mut streaming = StreamingSimdScanner::new(config);
    
    let sb = storage.scan(input);
    let stb = streaming.scan(bytes::Bytes::from(input.to_vec()));
    
    assert_eq!(sb.num_rows(), stb.num_rows());
    // Verify _raw column exists and matches
    assert!(sb.column_by_name("_raw").is_some());
    assert!(stb.column_by_name("_raw").is_some());
}

#[test]
fn consistency_duplicate_keys() {
    let input = br#"{"a":null,"a":1}"#;
    // Both should use first-writer-wins
    assert_builders_consistent(input);
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/logfwd-core/tests/scanner_conformance.rs` around lines 308 - 316, The
parity test helper assert_builders_consistent currently always uses
ScanConfig::default(); add explicit test cases that exercise non-default
configs: create a ScanConfig instance with extract_all: true and keep_raw: true
and instantiate SimdScanner::new(config.clone(), 64) and
StreamingSimdScanner::new(config) to assert num_rows parity and that both
builders expose a "_raw" column; also add a duplicate-key test using input
br#"{"a":null,"a":1}"# that calls assert_builders_consistent to ensure both
SimdScanner and StreamingSimdScanner follow first-writer-wins for duplicate
keys.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@crates/logfwd-core/src/format.rs`:
- Line 5: Fix the typo in the module doc comment at the top of format.rs: remove
the duplicated word "the" so the comment reads "lines suitable for the SIMD
scanner." Update the module-level doc comment (the line starting with "//!")
accordingly.

In `@crates/logfwd-core/src/scan_config.rs`:
- Around line 96-101: The function parse_float_fast uses unsafe
std::str::from_utf8_unchecked on unvalidated bytes which can cause UB for
non-UTF8 input; change it to perform a safe conversion using
std::str::from_utf8(bytes) and handle the Result (return None on Err) before
calling s.parse::<f64>().ok(), keeping the rest of parse_float_fast the same
(refer to the parse_float_fast function and the local s/bytes identifiers).

In `@crates/logfwd-core/src/scanner.rs`:
- Around line 208-217: The null-handling branch in scanner.rs currently does pos
+= 4 (b'n' branch) which trusts the next three bytes are "ull" and can land
mid-token for malformed input; replace that fixed skip with the same
delimiter-scanning logic used for booleans: advance pos in a loop while pos <
end and the current byte is part of an identifier (or not a delimiter like ',',
'}', ']', ':' or whitespace), then clamp pos = end if needed; after scanning,
keep the existing builder.resolve_field(key) and builder.append_null_by_idx(idx)
calls so nulls are recorded only after consuming the full token and not when
positioned mid-token.

In `@crates/logfwd-core/src/streaming_builder.rs`:
- Around line 131-132: Remove the no-op conditional by deleting the empty if
branch "if idx < 64 {}" and leave the subsequent "idx" expression as-is; if
there was intended behavior (e.g., clamping or an assertion), replace the empty
block with the proper logic (for example assert!(idx < 64) or idx =
idx.min(63)), otherwise simply remove the entire "if idx < 64 {}" line to
satisfy clippy::needless-ifs.

---

Outside diff comments:
In `@crates/logfwd-bench/benches/pipeline.rs`:
- Around line 119-131: Replace the repeated fully-qualified type
logfwd_core::scan_config::FieldSpec by adding a use import (use
logfwd_core::scan_config::FieldSpec;) near the top of the module and then change
the three occurrences to just FieldSpec { name: "...".into(), aliases: vec![], }
so the field specs read FieldSpec { ... } instead of
logfwd_core::scan_config::FieldSpec { ... }.

---

Duplicate comments:
In `@crates/logfwd-core/src/storage_builder.rs`:
- Around line 157-158: append_null_by_idx currently does nothing so explicit
nulls aren't recorded in written_bits causing later values to overwrite them;
modify append_null_by_idx to mark the field as written by setting the
corresponding bit in written_bits (the same bit logic used by append_by_idx) and
also record the explicit null in the same storage/slot used for values (e.g.,
push a None/Null placeholder or set the value slot to Null) so that subsequent
merges see the field as explicitly written; update append_null_by_idx to use the
same indexing/bit-manipulation helpers as append_by_idx and written_bits to
ensure consistent behavior.

In `@crates/logfwd-core/src/streaming_builder.rs`:
- Around line 92-101: Clippy flags new-without-default for StreamingBuilder;
implement the Default trait for StreamingBuilder with fn default() -> Self
returning the same initial state as the current pub fn new(), and then either
have pub fn new() call Self::default() or remove new in favor of
Default::default(); ensure the Default impl initializes fields, field_index,
row_count, written_bits, and buf identically to the current new implementation.

In `@crates/logfwd-core/tests/scanner_conformance.rs`:
- Around line 308-316: The parity test helper assert_builders_consistent
currently always uses ScanConfig::default(); add explicit test cases that
exercise non-default configs: create a ScanConfig instance with extract_all:
true and keep_raw: true and instantiate SimdScanner::new(config.clone(), 64) and
StreamingSimdScanner::new(config) to assert num_rows parity and that both
builders expose a "_raw" column; also add a duplicate-key test using input
br#"{"a":null,"a":1}"# that calls assert_builders_consistent to ensure both
SimdScanner and StreamingSimdScanner follow first-writer-wins for duplicate
keys.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: e250c199-1145-4339-813d-3ae95fc8a1a5

📥 Commits

Reviewing files that changed from the base of the PR and between d1f5735 and 04eb2f9.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (19)
  • crates/logfwd-bench/benches/pipeline.rs
  • crates/logfwd-core/Cargo.toml
  • crates/logfwd-core/benches/scanner.rs
  • crates/logfwd-core/fuzz/fuzz_targets/scanner.rs
  • crates/logfwd-core/src/batch_builder.rs
  • crates/logfwd-core/src/chunk_classify.rs
  • crates/logfwd-core/src/columnar_builder.rs
  • crates/logfwd-core/src/format.rs
  • crates/logfwd-core/src/indexed_builder.rs
  • crates/logfwd-core/src/lib.rs
  • crates/logfwd-core/src/scan_config.rs
  • crates/logfwd-core/src/scanner.rs
  • crates/logfwd-core/src/simd_scanner.rs
  • crates/logfwd-core/src/storage_builder.rs
  • crates/logfwd-core/src/streaming_builder.rs
  • crates/logfwd-core/tests/scanner_conformance.rs
  • crates/logfwd-core/tests/scanner_correctness.rs
  • crates/logfwd-transform/src/lib.rs
  • crates/logfwd/src/pipeline.rs
💤 Files with no reviewable changes (5)
  • crates/logfwd-core/tests/scanner_correctness.rs
  • crates/logfwd-core/src/simd_scanner.rs
  • crates/logfwd-core/src/columnar_builder.rs
  • crates/logfwd-core/src/indexed_builder.rs
  • crates/logfwd-core/src/batch_builder.rs

Comment thread crates/logfwd-core/src/format.rs Outdated
Comment thread crates/logfwd-core/src/scan_config.rs
Comment thread crates/logfwd-core/src/scanner.rs
Comment thread crates/logfwd-core/src/streaming_builder.rs Outdated
@strawgate strawgate force-pushed the arrow-ipc-output branch 2 times, most recently from a8a46f8 to f969fea Compare March 29, 2026 20:08
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 7

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@crates/logfwd-core/benches/scanner.rs`:
- Around line 2-3: The benchmarks still call the old constructor signature that
included a count; update each benchmark to call the new SimdScanner::new(...)
that takes only the ScanConfig: replace calls like SimdScanner::new(config,
count) or SimdScanner::new(&config, count) with SimdScanner::new(config) (or
SimdScanner::new(&config) matching ownership in your file), and ensure you
create/prepare the ScanConfig/FieldSpec used by the scanner (types referenced:
SimdScanner::new, ScanConfig, FieldSpec) before passing it in.

In `@crates/logfwd-core/fuzz/fuzz_targets/scanner.rs`:
- Around line 5-6: The SimdScanner constructor call uses the old arity including
a batch-size argument; update all calls to SimdScanner::new in this fuzz target
(and the other occurrences referenced) to remove the batch-size parameter so
they match the new signature (call SimdScanner::new with only the remaining
arguments), e.g., change any SimdScanner::new(..., batch_size) invocations to
SimdScanner::new(...) and keep ScanConfig/other parameters unchanged.

In `@crates/logfwd-core/src/scan_config.rs`:
- Around line 32-50: The current ScanConfig::is_wanted performs a linear search
over wanted_fields and their aliases (O(fields×aliases)) on every lookup; change
ScanConfig to precompute a HashSet of byte keys at construction (e.g., a
HashSet<Vec<u8>> or HashSet<Box<[u8]>>) that contains each fs.name.as_bytes()
and each alias.as_bytes(), skip building when extract_all is true, and then
rewrite is_wanted to return true if extract_all or the HashSet contains the key
(using key as a slice lookup). Update the constructor/builder that creates
ScanConfig to populate this wanted_set from wanted_fields and keep wanted_fields
unchanged for compatibility.

In `@crates/logfwd-core/src/scanner.rs`:
- Around line 208-216: The null branch in scanner.rs resolves the field and
calls builder.append_null_by_idx(idx) but that method is a no-op for the
"written" tracking in both storage_builder and streaming_builder, so you must
ensure null writes mark the field as already-written: either update
append_null_by_idx implementations in storage_builder and streaming_builder to
set the per-row "written" flag for idx, or (simpler) after let idx =
builder.resolve_field(key); call a dedicated writer-tracking method (e.g.,
builder.mark_written_by_idx(idx) or builder.mark_field_written(idx)) immediately
after append_null_by_idx so first-writer-wins honors leading nulls.
- Around line 193-206: In the boolean-value branch (the b't' | b'f' match) in
scanner.rs the loop that advances pos should treat carriage return as a
terminator like the numeric branch; update the delimiter checks (the condition
testing buf[pos] != b',' && buf[pos] != b'}' && buf[pos] != b' ' && buf[pos] !=
b'\t') to also check buf[pos] != b'\r' so that
builder.append_str_by_idx(resolve_field(key), &buf[s..pos]) does not include a
trailing '\r' in the extracted value.

In `@crates/logfwd-core/src/storage_builder.rs`:
- Around line 132-142: The current append_int_by_idx sets fc.has_int before
attempting parse_int_fast and silently drops unparseable values, creating
asymmetry with append_str_by_idx; fix by either moving the assignment of
fc.has_int into the Some(v) branch so has_int is only set when parse succeeds
(i.e., inside the if let Some(v) block) or explicitly record a null marker for
that row in fc.int_values when parse_int_fast returns None; additionally add a
short doc comment on append_int_by_idx describing the chosen behavior (and
reference parse_int_fast, append_str_by_idx, fc.has_int, and fc.int_values to
locate the logic).

In `@crates/logfwd-core/src/streaming_builder.rs`:
- Around line 201-203: The streaming builder currently drops raw lines in pub fn
append_raw(&mut self, _line: &[u8]) which breaks ScanConfig.keep_raw usage from
StreamingSimdScanner; either implement storage and emission of raw bytes or fail
fast when keep_raw is requested. Update StreamingBuilder (and its constructor
used by StreamingSimdScanner) to accept a buffer/Vec<Vec<u8>> or similar field
to store appended raw lines in append_raw and ensure those raw bytes are emitted
as the _raw column when building batches, or check ScanConfig.keep_raw at
creation time and return an Err or panic with a clear message if raw views are
unsupported. Ensure you modify the code paths that construct the builder in
StreamingSimdScanner to enforce the new behavior and update append_raw signature
usages accordingly.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: 0074becc-4b98-4f95-a3de-2c0c48e569fe

📥 Commits

Reviewing files that changed from the base of the PR and between 04eb2f9 and a8a46f8.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (19)
  • crates/logfwd-bench/benches/pipeline.rs
  • crates/logfwd-core/Cargo.toml
  • crates/logfwd-core/benches/scanner.rs
  • crates/logfwd-core/fuzz/fuzz_targets/scanner.rs
  • crates/logfwd-core/src/batch_builder.rs
  • crates/logfwd-core/src/chunk_classify.rs
  • crates/logfwd-core/src/columnar_builder.rs
  • crates/logfwd-core/src/format.rs
  • crates/logfwd-core/src/indexed_builder.rs
  • crates/logfwd-core/src/lib.rs
  • crates/logfwd-core/src/scan_config.rs
  • crates/logfwd-core/src/scanner.rs
  • crates/logfwd-core/src/simd_scanner.rs
  • crates/logfwd-core/src/storage_builder.rs
  • crates/logfwd-core/src/streaming_builder.rs
  • crates/logfwd-core/tests/scanner_conformance.rs
  • crates/logfwd-core/tests/scanner_correctness.rs
  • crates/logfwd-transform/src/lib.rs
  • crates/logfwd/src/pipeline.rs
💤 Files with no reviewable changes (5)
  • crates/logfwd-core/tests/scanner_correctness.rs
  • crates/logfwd-core/src/columnar_builder.rs
  • crates/logfwd-core/src/simd_scanner.rs
  • crates/logfwd-core/src/batch_builder.rs
  • crates/logfwd-core/src/indexed_builder.rs

Comment thread crates/logfwd-core/benches/scanner.rs
Comment thread crates/logfwd-core/fuzz/fuzz_targets/scanner.rs
Comment on lines +32 to +50
impl ScanConfig {
/// Is this field name wanted?
#[inline]
pub fn is_wanted(&self, key: &[u8]) -> bool {
if self.extract_all {
return true;
}
for fs in &self.wanted_fields {
if key == fs.name.as_bytes() {
return true;
}
for a in &fs.aliases {
if key == a.as_bytes() {
return true;
}
}
}
false
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

is_wanted has O(fields × aliases) complexity per field lookup.

For hot paths with many wanted fields, consider a HashSet<&[u8]> built at construction time. Current impl is fine for typical small field lists.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/logfwd-core/src/scan_config.rs` around lines 32 - 50, The current
ScanConfig::is_wanted performs a linear search over wanted_fields and their
aliases (O(fields×aliases)) on every lookup; change ScanConfig to precompute a
HashSet of byte keys at construction (e.g., a HashSet<Vec<u8>> or
HashSet<Box<[u8]>>) that contains each fs.name.as_bytes() and each
alias.as_bytes(), skip building when extract_all is true, and then rewrite
is_wanted to return true if extract_all or the HashSet contains the key (using
key as a slice lookup). Update the constructor/builder that creates ScanConfig
to populate this wanted_set from wanted_fields and keep wanted_fields unchanged
for compatibility.

Comment thread crates/logfwd-core/src/scanner.rs
Comment thread crates/logfwd-core/src/scanner.rs
Comment thread crates/logfwd-core/src/storage_builder.rs
Comment on lines +201 to +203
pub fn append_raw(&mut self, _line: &[u8]) {
// No _raw support in streaming builder.
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Don't silently ignore keep_raw in the streaming builder.

crates/logfwd-core/src/scanner.rs:138-140 still calls append_raw() when ScanConfig.keep_raw is enabled, but this implementation drops the line. StreamingSimdScanner therefore accepts keep_raw: true and returns a batch without the requested _raw column. Either implement raw views here or fail fast when that config is used.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/logfwd-core/src/streaming_builder.rs` around lines 201 - 203, The
streaming builder currently drops raw lines in pub fn append_raw(&mut self,
_line: &[u8]) which breaks ScanConfig.keep_raw usage from StreamingSimdScanner;
either implement storage and emission of raw bytes or fail fast when keep_raw is
requested. Update StreamingBuilder (and its constructor used by
StreamingSimdScanner) to accept a buffer/Vec<Vec<u8>> or similar field to store
appended raw lines in append_raw and ensure those raw bytes are emitted as the
_raw column when building batches, or check ScanConfig.keep_raw at creation time
and return an Err or panic with a clear message if raw views are unsupported.
Ensure you modify the code paths that construct the builder in
StreamingSimdScanner to enforce the new behavior and update append_raw signature
usages accordingly.

Two purpose-built Arrow builders sharing a generic SIMD scan loop.
Removes the old scalar scanner, BatchBuilder, and all dead code.

Module layout:
  scan_config   — ScanConfig, FieldSpec, parse_int_fast, parse_float_fast
  scanner       — SimdScanner, StreamingSimdScanner, ScanBuilder trait
  storage_builder    — self-contained persistence builder (deferred pattern)
  streaming_builder  — zero-copy hot-path builder (StringViewArray)
  chunk_classify     — (crate-internal) NEON SIMD structural classifier
  simd_scan          — (crate-internal) per-line SIMD string/nested scanning

Deleted:
  batch_builder.rs      — old HashMap-based builder (658 lines)
  simd_scanner.rs       — renamed to scanner.rs (it IS the scanner now)
  scanner_correctness.rs — redundant with oracle conformance tests

StorageBuilder: deferred (row, value) pattern. Correct by construction — no
cross-builder null padding, no column alignment bugs.

StreamingBuilder: zero-copy StringViewArrays into bytes::Bytes buffer. No
string copies during scanning or at finish.

ScanBuilder trait: generic scan_into() works with either builder via
monomorphization. One scan implementation, two output modes.

Fixes: scalar fallback escape handling, RecordBatchOptions for empty batches,
keep_raw default to false, cross-builder consistency tests, SAFETY comments.

185 tests passing. 66 oracle-verified conformance tests.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@strawgate strawgate merged commit 2fab93e into master Mar 29, 2026
4 checks passed
@strawgate strawgate deleted the arrow-ipc-output branch March 29, 2026 20:26
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant