refactor: delete StorageBuilder, unify on Scanner with scan()/scan_detached() #962
Caution: Review failed. Pull request was closed or merged during review.
Walkthrough
This PR consolidates dual-scanner infrastructure (…
Caution: Pre-merge checks failed. Please resolve all errors before merging. Addressing warnings is optional.
❌ Failed checks (2 errors, 2 warnings, 1 inconclusive)
Approvability
Verdict: Needs human review
This is a major refactor that deletes the entire …
Actionable comments posted: 6
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
crates/logfwd-core/fuzz/fuzz_targets/scanner_consistency.rs (1)
37-50: ⚠️ Potential issue | 🟠 Major
This still lets schema and conflict-column drift slip through.
Comparing `BTreeSet`s throws away column order, and the `_ => {}` arm silently accepts `StructArray` conflict columns plus any other unexpected type pair. That means `scan()` and `scan_owned()` can diverge on mixed-type fields or schema layout while this target still passes. Compare ordered fields, recurse into `DataType::Struct(_)` children, and treat any unhandled type pairing as a fuzz failure.
Based on learnings: Applies to `crates/logfwd-arrow/src/**/*.rs`: Use bare names for single-type fields and StructArray for column naming conflicts.
Also applies to: 53-114
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/logfwd-core/fuzz/fuzz_targets/scanner_consistency.rs` around lines 37 - 50, The current equality check uses BTreeSet of column names which loses ordering and misses StructArray vs bare-type mismatches; replace the set comparison with an ordered, index-by-index comparison of owned_batch.schema().fields() vs streaming_batch.schema().fields(), and when a field's DataType is a Struct recurse into its children to compare nested field names/types; treat any unexpected or unhandled type pairing (e.g., one side StructArray and the other a concrete type) as a fuzz failure (panic/assert) so scan() and scan_owned() divergences fail the target; apply the same ordered/composite checks for the analogous block covering lines ~53-114 and follow the convention of using bare names for single-type fields and StructArray for conflict columns.
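The ordered, recursive comparison asked for above could look roughly like the following. This is a std-only sketch, not the fuzz target's code: `DataType`, `Field`, and `check_fields` are hypothetical stand-ins for the Arrow schema types, and the assumption that the only tolerated difference is owned strings (`Utf8`) versus string views (`Utf8View`) is mine.

```rust
// Hypothetical stand-ins for Arrow schema types (illustration only).
#[derive(Debug, Clone, PartialEq)]
enum DataType {
    Int64,
    Float64,
    Utf8,     // owned strings, as an owned/detached scan might produce
    Utf8View, // string views, as a zero-copy scan might produce
    Struct(Vec<Field>),
}

#[derive(Debug, Clone, PartialEq)]
struct Field {
    name: String,
    data_type: DataType,
}

/// Compare two field lists position by position, so column order matters.
/// Returns Err describing the first divergence found.
fn check_fields(owned: &[Field], streaming: &[Field], path: &str) -> Result<(), String> {
    if owned.len() != streaming.len() {
        return Err(format!("{path}: field count {} vs {}", owned.len(), streaming.len()));
    }
    for (o, s) in owned.iter().zip(streaming) {
        if o.name != s.name {
            return Err(format!("{path}: column mismatch: {} vs {}", o.name, s.name));
        }
        let child = format!("{path}/{}", o.name);
        match (&o.data_type, &s.data_type) {
            // Assumed to be the only tolerated representation difference.
            (DataType::Utf8, DataType::Utf8View) => {}
            // Identical scalar types on both sides.
            (DataType::Int64, DataType::Int64) | (DataType::Float64, DataType::Float64) => {}
            // Conflict columns: recurse into the struct children.
            (DataType::Struct(a), DataType::Struct(b)) => check_fields(a, b, &child)?,
            // Everything else is a failure rather than a silent pass.
            (a, b) => return Err(format!("{child}: unhandled type pair {a:?} vs {b:?}")),
        }
    }
    Ok(())
}
```

In a fuzz target the `Err` would be turned into a panic (for example via `unwrap()`), so a struct-versus-scalar mismatch or a reordered schema is reported instead of ignored.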
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@crates/logfwd-arrow/tests/allocation_regression.rs`:
- Around line 53-57: The test is measuring allocation regressions but currently
does heavy input preparation by calling bytes::Bytes::from(data.clone()) inside
the measurement windows; move the expensive allocation out of the Region::new()
measured blocks by creating a single bytes::Bytes value from data (e.g., let
input = bytes::Bytes::from(data.clone()) or construct input once without
including it in the timed region) before each measured region, and then inside
the measured loops call scanner.scan_owned(input.clone()).unwrap() so only the
cheap refcounted clone happens during measurement; update every measured block
that currently uses scanner.scan_owned(bytes::Bytes::from(data.clone())) to use
the pre-allocated input and clone that handle instead.
In `@crates/logfwd-bench/benches/pipeline.rs`:
- Line 14: The import has a duplicated "Streaming" identifier: replace the
incorrect use of StreamingStreamingSimdScanner with the correct type name
StreamingSimdScanner in the use statement and update any other references to
StreamingStreamingSimdScanner in this file (e.g., variable types, instantiations
or pattern matches) to StreamingSimdScanner so the code compiles.
In `@crates/logfwd-bench/src/rss.rs`:
- Around line 95-97: The RSS benchmark is biased because the owned path clones
the full Vec<u8> while the streaming path moves it; instead, convert the input
Vec<u8> into a Bytes once and then clone that Bytes handle (cheap refcount
increment) for both the scan_owned and scan paths so both measure the same input
cost; update the setup used by scan_owned and scan (referencing the scan_owned
and scan calls and the variables creating the input Bytes) to create bytes_input
= Bytes::from(vec) once and use bytes_input.clone() where the owned path
previously cloned the Vec<u8>.
In `@crates/logfwd-core/fuzz/fuzz_targets/scanner_consistency.rs`:
- Around line 22-26: The fuzz target currently returns early when either
StreamingSimdScanner::scan_owned or ::scan returns Err, but doesn't check that
the other call produced the same Result; ensure parity by calling both (or
capturing both Results first) and asserting that both are Ok or both are Err
before proceeding—use the StreamingSimdScanner instances/variables
(owned_scanner, streaming_scanner) and their results (owned_batch,
streaming_batch) to compare outcomes and if one is Ok while the other is Err,
cause the target to fail (panic or unwrap) so the mismatch is reported.
In `@crates/logfwd-core/tests/it/compliance_data.rs`:
- Around line 725-726: Update the inline comment to reflect that
StreamingBuilder::begin_batch() resets field_index (it does not persist across
batches); change the wording near the test around field_index to state that
begin_batch clears field_index so the column still exists for schema stability
but its value should be null at batch start. Reference
StreamingBuilder::begin_batch() and the field_index behavior when editing the
comment.
In `@dev-docs/ARCHITECTURE.md`:
- Around line 162-175: Doc text incorrectly claims finish_batch() and
StreamingSimdScanner::scan(Bytes) are always zero-copy; update the wording to
qualify that finish_batch() is zero-copy only when no decoded strings exist and
that when decoded_buf is populated the implementation builds a combined buffer
(causing a copy) before creating the StringViewArray; change the description of
StreamingSimdScanner::scan(Bytes) to state it returns a zero-copy RecordBatch
only if no decoded strings are present, whereas
StreamingSimdScanner::scan_owned(Bytes) still produces an owned StringArray via
finish_batch_owned() for persistence/compression.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository YAML (base), Organization UI (inherited)
Review profile: ASSERTIVE
Plan: Pro
Run ID: 8424e577-003d-44e3-81bf-6cf987f8e129
⛔ Files ignored due to path filters (1)
- `Cargo.lock` is excluded by `!**/*.lock`
📒 Files selected for processing (37)
- DEVELOPING.md
- book/src/architecture/pipeline.md
- book/src/development/contributing.md
- crates/logfwd-arrow/README.md
- crates/logfwd-arrow/src/conflict_schema.rs
- crates/logfwd-arrow/src/lib.rs
- crates/logfwd-arrow/src/scanner.rs
- crates/logfwd-arrow/src/storage_builder.rs
- crates/logfwd-arrow/tests/allocation_regression.rs
- crates/logfwd-bench/benches/pipeline.rs
- crates/logfwd-bench/src/e2e_profile.rs
- crates/logfwd-bench/src/es_throughput.rs
- crates/logfwd-bench/src/explore.rs
- crates/logfwd-bench/src/rss.rs
- crates/logfwd-bench/src/sizes.rs
- crates/logfwd-core/benches/scanner.rs
- crates/logfwd-core/examples/arrow_ipc_roundtrip.rs
- crates/logfwd-core/fuzz/fuzz_targets/scanner.rs
- crates/logfwd-core/fuzz/fuzz_targets/scanner_consistency.rs
- crates/logfwd-core/fuzz/fuzz_targets/scanner_sink.rs
- crates/logfwd-core/fuzz/fuzz_targets/scanner_transform.rs
- crates/logfwd-core/fuzz/fuzz_targets/structural_index.rs
- crates/logfwd-core/src/scan_config.rs
- crates/logfwd-core/src/scanner.rs
- crates/logfwd-core/tests/it/compliance_data.rs
- crates/logfwd-core/tests/it/scanner_conformance.rs
- crates/logfwd/Cargo.toml
- crates/logfwd/src/pipeline.rs
- crates/logfwd/tests/it/integration.rs
- dev-docs/ARCHITECTURE.md
- dev-docs/DESIGN.md
- dev-docs/SCANNER_CONTRACT.md
- dev-docs/VERIFICATION.md
- dev-docs/research/arrow-ipc-feasibility.md
- dev-docs/research/type-suffix-redesign.md
- docs/ci/documentation-thoroughly-updated.md
- docs/ci/high-quality-rust-practices.md
💤 Files with no reviewable changes (1)
- crates/logfwd-arrow/src/storage_builder.rs
    drop(
        scanner
            .scan_owned(bytes::Bytes::from(data.clone()))
            .unwrap(),
    );
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Verify measured scan_owned loops are cloning Vec input in-place.
rg -n -C2 'scan_owned\(bytes::Bytes::from\([a-zA-Z0-9_]+\.clone\(\)\)\)' crates/logfwd-arrow/tests/allocation_regression.rs
Repository: strawgate/memagent
Length of output: 912
🏁 Script executed:
# Read context around the allocation test to confirm measurement setup
sed -n '40,85p' crates/logfwd-arrow/tests/allocation_regression.rs
Repository: strawgate/memagent
Length of output: 1168
🏁 Script executed:
# Also check the latter measurement sites
sed -n '120,145p' crates/logfwd-arrow/tests/allocation_regression.rs
Repository: strawgate/memagent
Length of output: 963
🏁 Script executed:
# Verify bytes::Bytes is indeed refcounted/cheap clone
rg -A5 'pub struct Bytes' --type rust
Repository: strawgate/memagent
Length of output: 44
Allocation regression measurements are polluted by input cloning inside measurement windows.
Lines 53–57, 62–66, 72–76, 129–131, 136–138: data.clone() allocates per iteration inside Region::new() measurement blocks. The test should measure scanner allocation behavior, not input preparation cost. Pre-allocate bytes::Bytes once before measured regions and clone the refcounted handle inside loops (cheap operation).
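To make the cost difference concrete, here is a small std-only sketch. It uses `Arc<[u8]>` as a stand-in for `bytes::Bytes` (an assumption for illustration; `Arc::from(Vec<u8>)` copies once at conversion, whereas `Bytes::from(Vec<u8>)` takes ownership of the vector). The names `consume` and `clone_sharing` are hypothetical. The point it shows is only that cloning a refcounted handle shares the buffer, while cloning a `Vec<u8>` allocates a new one.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for a scanner call that takes shared, refcounted input.
fn consume(input: Arc<[u8]>) -> usize {
    input.len()
}

/// Returns (arc_clone_shares_buffer, vec_clone_shares_buffer).
fn clone_sharing() -> (bool, bool) {
    let data: Vec<u8> = vec![b'x'; 1024];

    // Deep copy: a second heap buffer is allocated and filled.
    let copy = data.clone();
    let vec_shared = copy.as_ptr() == data.as_ptr();

    // Convert once, outside any measured region.
    let input: Arc<[u8]> = Arc::from(data);

    // Handle clone: bumps a reference count, no new buffer.
    let handle = Arc::clone(&input);
    let arc_shared = Arc::ptr_eq(&input, &handle);

    consume(handle);
    (arc_shared, vec_shared)
}
```

Under these assumptions `clone_sharing()` returns `(true, false)`: only the handle clone reuses the existing allocation, which is why it is the safer thing to leave inside a measured loop.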
Proposed fix
fn owned_scanner_no_leak_across_batches() {
let mut scanner = StreamingSimdScanner::new(ScanConfig::default());
- let data = make_ndjson(500);
+ let input: bytes::Bytes = make_ndjson(500).into();
for _ in 0..5 {
- drop(
- scanner
- .scan_owned(bytes::Bytes::from(data.clone()))
- .unwrap(),
- );
+ drop(scanner.scan_owned(input.clone()).unwrap());
}
let reg1 = Region::new(GLOBAL);
for _ in 0..10 {
- drop(
- scanner
- .scan_owned(bytes::Bytes::from(data.clone()))
- .unwrap(),
- );
+ drop(scanner.scan_owned(input.clone()).unwrap());
}
let reg2 = Region::new(GLOBAL);
for _ in 0..10 {
- drop(
- scanner
- .scan_owned(bytes::Bytes::from(data.clone()))
- .unwrap(),
- );
+ drop(scanner.scan_owned(input.clone()).unwrap());
}
- let data_500 = make_ndjson(500);
+ let data_500: bytes::Bytes = make_ndjson(500).into();
let reg_500 = Region::new(GLOBAL);
let _ = scanner
- .scan_owned(bytes::Bytes::from(data_500.clone()))
+ .scan_owned(data_500.clone())
.unwrap();
- let data_5000 = make_ndjson(5000);
+ let data_5000: bytes::Bytes = make_ndjson(5000).into();
let reg_5000 = Region::new(GLOBAL);
let _ = scanner
- .scan_owned(bytes::Bytes::from(data_5000.clone()))
+ .scan_owned(data_5000.clone())
.unwrap();
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
- drop(
-     scanner
-         .scan_owned(bytes::Bytes::from(data.clone()))
-         .unwrap(),
- );
+ drop(scanner.scan_owned(input.clone()).unwrap());
    use logfwd_arrow::scanner::SimdScanner;
    use bytes::Bytes;
    use logfwd_arrow::scanner::StreamingStreamingSimdScanner;
Typo: StreamingStreamingSimdScanner should be StreamingSimdScanner.
Duplicated "Streaming" prefix will cause compilation failure.
🐛 Fix
-use logfwd_arrow::scanner::StreamingStreamingSimdScanner;
+use logfwd_arrow::scanner::StreamingSimdScanner;
📝 Committable suggestion
- use logfwd_arrow::scanner::StreamingStreamingSimdScanner;
+ use logfwd_arrow::scanner::StreamingSimdScanner;
    // === scan_owned (copies strings) vs scan (zero-copy) ===
    println!("--- scan_owned vs scan (1M simple lines) ---");
    {
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Verify scan_owned path clones full input while scan path does not in RSS comparison block.
rg -n -C3 'scan_owned\(bytes::Bytes::from\(data.clone\(\)\)\)|scan\(bytes::Bytes::from\(data\)\)' crates/logfwd-bench/src/rss.rs
Repository: strawgate/memagent
Length of output: 855
🏁 Script executed:
# Get full context of the benchmark block and function definitions
sed -n '90,130p' crates/logfwd-bench/src/rss.rs
Repository: strawgate/memagent
Length of output: 2004
🏁 Script executed:
# Find generate_simple function definition
rg -n 'fn generate_simple' crates/logfwd-bench/src/rss.rs -A 3
Repository: strawgate/memagent
Length of output: 319
scan_owned vs scan RSS benchmark comparison is biased by unequal input setup.
The owned path clones the full Vec<u8> before wrapping in Bytes, while the streaming path moves it directly. This inflates the owned-path RSS measurement by the clone cost, confounding the builder/output-mode comparison.
To fix, convert to Bytes once, then clone the cheap Bytes handle (refcount increment) for both paths:
- let data = generate_simple(1_000_000);
+ let data: bytes::Bytes = generate_simple(1_000_000).into();
let raw_mb = data.len() as f64 / 1_048_576.0;
let before = rss_mb();
let mut owned_scanner = StreamingSimdScanner::new(ScanConfig::default());
- let batch = owned_scanner.scan_owned(bytes::Bytes::from(data.clone())).unwrap();
+ let batch = owned_scanner.scan_owned(data.clone()).unwrap();
let after_owned = rss_mb();
...
let mid = rss_mb();
let mut streaming_scanner = StreamingSimdScanner::new(ScanConfig::default());
- let batch = streaming_scanner.scan(bytes::Bytes::from(data)).unwrap();
+ let batch = streaming_scanner.scan(data.clone()).unwrap();
Affects lines 103–104 and 112.
    let mut owned_scanner = StreamingSimdScanner::new(ScanConfig::default());
    let Ok(owned_batch) = owned_scanner.scan_owned(bytes::Bytes::copy_from_slice(data)) else { return; };

    let mut streaming_scanner = StreamingSimdScanner::new(ScanConfig::default());
    let Ok(streaming_batch) = streaming_scanner.scan(bytes::Bytes::copy_from_slice(data)) else { return; };
Assert scan() / scan_owned() failure parity before returning.
A one-sided Err is currently ignored. Because the two modes diverge in finalization, the target should fail if one mode succeeds and the other does not.
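The parity rule can also be factored into a small helper. The following is a generic, std-only sketch; `require_parity` is a hypothetical name and not part of the fuzz target. It panics when exactly one side failed, and otherwise returns both values, or `None` when both failed (a legitimate "skip this input" case).

```rust
/// Fail loudly when two modes disagree on success.
/// Returns Some((a, b)) when both succeeded and None when both failed.
fn require_parity<A, B, E1, E2>(
    owned: Result<A, E1>,
    streaming: Result<B, E2>,
) -> Option<(A, B)> {
    // A one-sided Err is a divergence between the modes, so it must panic.
    assert_eq!(owned.is_ok(), streaming.is_ok(), "mode success mismatch");
    match (owned, streaming) {
        (Ok(a), Ok(b)) => Some((a, b)),
        _ => None,
    }
}
```

A fuzz body could then write `let Some((owned_batch, streaming_batch)) = require_parity(owned, streaming) else { return; };` so the early return only happens when both modes rejected the input.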
🧪 Suggested fix
- let mut owned_scanner = StreamingSimdScanner::new(ScanConfig::default());
- let Ok(owned_batch) = owned_scanner.scan_owned(bytes::Bytes::copy_from_slice(data)) else { return; };
-
- let mut streaming_scanner = StreamingSimdScanner::new(ScanConfig::default());
- let Ok(streaming_batch) = streaming_scanner.scan(bytes::Bytes::copy_from_slice(data)) else { return; };
+ let bytes = bytes::Bytes::copy_from_slice(data);
+
+ let mut owned_scanner = StreamingSimdScanner::new(ScanConfig::default());
+ let owned = owned_scanner.scan_owned(bytes.clone());
+
+ let mut streaming_scanner = StreamingSimdScanner::new(ScanConfig::default());
+ let streaming = streaming_scanner.scan(bytes);
+
+ assert_eq!(owned.is_ok(), streaming.is_ok(), "mode success mismatch");
+ let (Ok(owned_batch), Ok(streaming_batch)) = (owned, streaming) else { return; };
📝 Committable suggestion
- let mut owned_scanner = StreamingSimdScanner::new(ScanConfig::default());
- let Ok(owned_batch) = owned_scanner.scan_owned(bytes::Bytes::copy_from_slice(data)) else { return; };
- let mut streaming_scanner = StreamingSimdScanner::new(ScanConfig::default());
- let Ok(streaming_batch) = streaming_scanner.scan(bytes::Bytes::copy_from_slice(data)) else { return; };
+ let bytes = bytes::Bytes::copy_from_slice(data);
+ let mut owned_scanner = StreamingSimdScanner::new(ScanConfig::default());
+ let owned = owned_scanner.scan_owned(bytes.clone());
+ let mut streaming_scanner = StreamingSimdScanner::new(ScanConfig::default());
+ let streaming = streaming_scanner.scan(bytes);
+ assert_eq!(owned.is_ok(), streaming.is_ok(), "mode success mismatch");
+ let (Ok(owned_batch), Ok(streaming_batch)) = (owned, streaming) else { return; };
    // (StreamingBuilder clears collectors on begin_batch, but field_index persists
    // for schema stability. The column exists but the value should be null.)
Inline behavior note is now inaccurate.
This comment says field_index persists, but StreamingBuilder::begin_batch() clears it. Please update the note to match current reset semantics.
Suggested wording
- // (StreamingBuilder clears collectors on begin_batch, but field_index persists
- // for schema stability. The column exists but the value should be null.)
+ // StreamingBuilder resets per-batch state on begin_batch().
+ // Prior batch values must not leak into this batch.
…tached()

Delete StorageBuilder and CopyScanner; both are strictly dominated by StreamingBuilder's dual-output architecture (finish_batch for zero-copy StringViewArray, finish_batch_detached for owned StringArray).

Rename ZeroCopyScanner → Scanner. With only one scanner type, the "ZeroCopy" qualifier adds no information. The method names communicate the distinction: scan() for wire, scan_detached() for persistence.

Renames applied throughout:
- ZeroCopyScanner → Scanner
- CopyScanner → (deleted)
- scan_owned() → scan_detached()
- finish_batch_owned() → finish_batch_detached()

45 files changed, -1041 net lines. All 1,002 workspace tests pass.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2d41c02 to f500df7
Summary
- Delete `StorageBuilder` and `CopyScanner` entirely (-1041 net lines)
- Rename `ZeroCopyScanner` → `Scanner` (only one scanner now, qualifier is noise)
- `scan_owned()` → `scan_detached()`, `finish_batch_owned()` → `finish_batch_detached()` (consistent with `detach` module vocabulary)

Follows #941 which proved `finish_batch_detached()` dominates `StorageBuilder` at every benchmark.

Architecture after this PR
One builder (`StreamingBuilder`), two finish modes:
- `finish_batch()` → `StringViewArray`: views into input buffer (hot/wire path)
- `finish_batch_detached()` → `StringArray`: bulk copy at finalization (persistence path)

Test plan
- `cargo build --workspace` clean
- `cargo test --workspace`: 1,002 tests pass
- `cargo clippy --workspace -- -D warnings` clean
- `cargo fmt --all --check` clean
- No `StorageBuilder`/`CopyScanner`/`ZeroCopyScanner`/`scan_owned`/`finish_batch_owned` references in `.rs` or `.md`

🤖 Generated with Claude Code
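The "one builder, two finish modes" shape described above can be illustrated with a std-only sketch. Everything here is hypothetical (`SketchBuilder`, `scan_lines`, `finish_views`, `finish_detached`); plain `&str` slices and `String`s stand in for `StringViewArray` and `StringArray`, and this is not how `StreamingBuilder` is implemented.

```rust
/// Hypothetical model of a builder with two finish modes.
/// It records byte ranges of values inside a single input buffer.
struct SketchBuilder<'a> {
    input: &'a str,
    spans: Vec<(usize, usize)>, // (start, end) offsets into `input`
}

impl<'a> SketchBuilder<'a> {
    fn new(input: &'a str) -> Self {
        Self { input, spans: Vec::new() }
    }

    /// Record one value per non-empty line.
    fn scan_lines(&mut self) {
        let mut start = 0;
        for line in self.input.split('\n') {
            let end = start + line.len();
            if !line.is_empty() {
                self.spans.push((start, end));
            }
            start = end + 1; // skip the '\n' separator
        }
    }

    /// View mode: values borrow from the input buffer, nothing is copied.
    fn finish_views(&self) -> Vec<&'a str> {
        let input = self.input;
        self.spans.iter().map(move |&(s, e)| &input[s..e]).collect()
    }

    /// Detached mode: values are copied out, so the input can be dropped.
    fn finish_detached(&self) -> Vec<String> {
        self.spans.iter().map(|&(s, e)| self.input[s..e].to_owned()).collect()
    }
}
```

The trade-off mirrors the two paths in the summary: the view output is cheap but keeps the input buffer alive, while the detached output pays for one copy at finalization and then owns its data.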
Note
Delete `StorageBuilder`, `CopyScanner`, and `ZeroCopyScanner` in favor of a unified `Scanner` with `scan()`/`scan_detached()`
- Removes `StorageBuilder` and the two separate scanner types (`CopyScanner`, `ZeroCopyScanner`) from `logfwd-arrow`, replacing them with a single `Scanner` that exposes two methods: `scan()` for zero-copy `StringViewArray` output and `scan_detached()` for owned `StringArray` output.
- Renames `finish_batch_owned` to `finish_batch_detached` on `StreamingBuilder` to align with the new naming convention; no functional logic changes.
- … `Scanner::new()` with the appropriate method.
- `StorageBuilder`, `CopyScanner`, and `ZeroCopyScanner` are no longer exported from `logfwd-arrow`; any code referencing these symbols will not compile.
Macroscope summarized f500df7.