Refactor SIMD scanner to support multiple builder backends#59
Refactor SIMD scanner to support multiple builder backends#59strawgate wants to merge 4 commits into
Conversation
Replace the HashMap-based BatchBuilder in SimdScanner with the 3x faster
IndexedBatchBuilder (bitset field tracking, direct index-based appends).
Add ColumnarSimdScanner for zero-copy scanning with adaptive dictionary
encoding and zstd-compressed Arrow IPC output.
Key changes:
- Define ScanBuilder trait shared by IndexedBatchBuilder and ColumnarBatchBuilder
- Extract scan loop into generic scan_into<B>() to avoid code duplication
- SimdScanner now uses IndexedBatchBuilder (drop-in, same public API)
- New ColumnarSimdScanner with scan() and scan_compressed() methods
- Default keep_raw to false (was 65% of uncompressed table size)
- Fix all three builders to use RecordBatchOptions for correct row count
on empty-column batches (e.g. scanning "{}" with keep_raw=false)
- Add SIMD scanner + columnar scanner coverage to fuzz harness
https://claude.ai/code/session_01QamNN5H6VX3GE34Y28hqXJ
WalkthroughThis PR refactors the JSON-to-Arrow scanning architecture by introducing a generic Possibly related PRs
Comment |
There was a problem hiding this comment.
Actionable comments posted: 4
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
crates/logfwd-core/src/scanner.rs (1)
36-42:⚠️ Potential issue | 🟠 MajorDon't change
ScanConfig::default()silently.
keep_raw: falsechanges the public default schema for every caller still usingScanConfig::default(). We already have default-based callers incrates/logfwd-core/tests/scanner_correctness.rs:10-19andcrates/logfwd-bench/benches/pipeline.rs:107-111,209-213, so this PR changes their output shape and benchmark behavior with no compile-time signal. Please either keep the old default, or update every default-based call site and document the schema break explicitly.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/logfwd-core/src/scanner.rs` around lines 36 - 42, The change to ScanConfig::default altered the public default schema by setting keep_raw: false; revert keep_raw to its previous value (set keep_raw: true) in the ScanConfig::default implementation to preserve the existing public behavior (ScanConfig::default, wanted_fields, extract_all remain unchanged), or if the schema change is intentional, update every default-based caller to construct ScanConfig explicitly with the new keep_raw value and add a documented, top-level breaking-change note about the schema change; ensure you modify ScanConfig::default (the impl Default for ScanConfig) and either revert keep_raw or update and document callers accordingly.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@crates/logfwd-core/fuzz/fuzz_targets/scanner.rs`:
- Around line 50-85: The fuzz harness must reject non-UTF-8 inputs before
invoking the scanners because the builders (batch_builder.rs,
indexed_builder.rs, columnar_builder.rs) call from_utf8_unchecked and cause UB;
add a UTF-8 gate at the top of the fuzz_target closure (before calling
Scanner::scan, SimdScanner::scan, ColumnarSimdScanner::scan) that checks the
input with std::str::from_utf8 (or equivalent) and returns/continues early if
the check fails so only valid UTF-8 bytes reach the builders.
In `@crates/logfwd-core/src/simd_scanner.rs`:
- Around line 279-296: SimdScanner currently uses IndexedBatchBuilder which
tracks per-row writes in a u64 and will break for >64 extracted fields; add a
guard in SimdScanner::new (or right before calling scan_into in
SimdScanner::scan) that checks the expected or detected max field count and if
it can exceed 64, use a fallback builder (e.g., the previous non-indexed batch
builder) or a safe implementation that supports >64 columns instead of
IndexedBatchBuilder; reference the SimdScanner::new, SimdScanner::scan,
IndexedBatchBuilder, and finish_batch() symbols so the change replaces the
builder selection logic and prevents finish_batch() panics.
- Around line 43-85: CI fails cargo fmt on the forwarding impls for ScanBuilder
on IndexedBatchBuilder and ColumnarBatchBuilder; run rustfmt (e.g., cargo fmt)
and commit the formatted changes so the impl blocks and their methods
(begin_batch, begin_row, end_row, resolve_field, append_str_by_idx,
append_int_by_idx, append_float_by_idx, append_null_by_idx, append_raw, and
set_buffer) match rustfmt's style and satisfy cargo fmt --check.
- Around line 303-309: The dictionary tier selection currently uses
StringDictionaryBuilder::<Int8Type> for cardinality < 256 which will overflow at
128..255 (i8::MAX); update the builder choice so that when you intend to support
up to 255 entries you use StringDictionaryBuilder::<UInt8Type> instead of
Int8Type, or alternatively change the branch threshold from "< 256" to "< 128"
to remain safe with Int8Type; specifically locate the branch that picks
StringDictionaryBuilder::<Int8Type> (and any constants/variables comparing
cardinality against 256) and replace Int8Type with UInt8Type (or lower the
numeric threshold) so the chosen dictionary type matches the supported
cardinality.
---
Outside diff comments:
In `@crates/logfwd-core/src/scanner.rs`:
- Around line 36-42: The change to ScanConfig::default altered the public
default schema by setting keep_raw: false; revert keep_raw to its previous value
(set keep_raw: true) in the ScanConfig::default implementation to preserve the
existing public behavior (ScanConfig::default, wanted_fields, extract_all remain
unchanged), or if the schema change is intentional, update every default-based
caller to construct ScanConfig explicitly with the new keep_raw value and add a
documented, top-level breaking-change note about the schema change; ensure you
modify ScanConfig::default (the impl Default for ScanConfig) and either revert
keep_raw or update and document callers accordingly.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 77398bfd-ac9d-497b-9dba-0cf66402ea8d
📒 Files selected for processing (7)
crates/logfwd-core/fuzz/fuzz_targets/scanner.rscrates/logfwd-core/src/batch_builder.rscrates/logfwd-core/src/columnar_builder.rscrates/logfwd-core/src/indexed_builder.rscrates/logfwd-core/src/scanner.rscrates/logfwd-core/src/simd_scanner.rscrates/logfwd-core/tests/scanner_conformance.proptest-regressions
| pub struct SimdScanner { | ||
| builder: IndexedBatchBuilder, | ||
| config: ScanConfig, | ||
| } | ||
|
|
||
| impl SimdScanner { | ||
| pub fn new(config: ScanConfig, expected_rows: usize) -> Self { | ||
| SimdScanner { | ||
| builder: IndexedBatchBuilder::new(expected_rows, config.keep_raw), | ||
| config, | ||
| } | ||
| } | ||
|
|
||
| /// Scan a buffer of newline-delimited JSON lines and return a RecordBatch. | ||
| pub fn scan(&mut self, buf: &[u8]) -> RecordBatch { | ||
| scan_into(buf, &self.config, &mut self.builder); | ||
| self.builder.finish_batch() | ||
| } |
There was a problem hiding this comment.
SimdScanner now panics on batches with 65+ extracted fields.
This switches the default SIMD backend to IndexedBatchBuilder, but that builder only tracks per-row writes in a u64. After the 64th discovered field, missing-row padding stops covering later columns, so the next row that omits one of them leaves the array short and finish_batch() panics. Please add an explicit guard or a fallback backend before wiring this in as the default.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@crates/logfwd-core/src/simd_scanner.rs` around lines 279 - 296, SimdScanner
currently uses IndexedBatchBuilder which tracks per-row writes in a u64 and will
break for >64 extracted fields; add a guard in SimdScanner::new (or right before
calling scan_into in SimdScanner::scan) that checks the expected or detected max
field count and if it can exceed 64, use a fallback builder (e.g., the previous
non-indexed batch builder) or a safe implementation that supports >64 columns
instead of IndexedBatchBuilder; reference the SimdScanner::new,
SimdScanner::scan, IndexedBatchBuilder, and finish_batch() symbols so the change
replaces the builder selection logic and prevents finish_batch() panics.
Two correctness fixes: 1. chunk_classify scalar fallback: Fix bit/buffer desync in escape handling. The old code advanced `i` by 2 on backslash but `bit` by 1, permanently misaligning all subsequent bitmask positions. Also, closing quotes were incorrectly marked as string interior. Replaced with escaped_next flag that properly carries across 64-byte block boundaries. 2. IndexedBatchBuilder + ColumnarBatchBuilder: Fix >64 field support. The u64 bitset only tracked fields 0-63. Fields 64+ had no duplicate detection (all shared bit 0) and no end-of-row NULL padding, causing column length mismatches. Added per-field written_this_row flag for overflow fields while preserving the fast bitset path for fields 0-63. Both fixes were found by the existing proptest oracle_multi_line test which generates random escape-heavy multi-line NDJSON with 64+ unique fields. https://claude.ai/code/session_01QamNN5H6VX3GE34Y28hqXJ
…ep_raw Address review findings: - Fuzz harness: add UTF-8 validation gate before invoking scanners. All builders use from_utf8_unchecked on extracted values, so non-UTF-8 fuzz input would cause UB. Return early if input fails from_utf8. - ColumnarBatchBuilder: fix Int8 dictionary overflow. The adaptive encoding used Int8Type for cardinality < 256, but i8 max index is 127. Changed threshold to < 128 to stay within i8 range. - Run cargo fmt on ScanBuilder forwarding impls. - Revert ScanConfig::default().keep_raw to true to preserve the existing public API contract. Verified against current code: - >64 field guard: already fixed in prior commit (written_this_row flag) - All 204 tests pass, 0 warnings, cargo fmt clean https://claude.ai/code/session_01QamNN5H6VX3GE34Y28hqXJ
There was a problem hiding this comment.
♻️ Duplicate comments (1)
crates/logfwd-core/src/simd_scanner.rs (1)
303-333:⚠️ Potential issue | 🔴 Critical
ColumnarSimdScannerstill exposes the low-cardinality dictionary overflow.This new path is now public, but
crates/logfwd-core/src/columnar_builder.rsstill choosesStringDictionaryBuilder<Int8Type>forcardinality < 256.Int8Typetops out at 127 dictionary keys, so 128-255 unique strings will panic once this scanner hits that tier. UseUInt8Typefor the first tier or lower the threshold below 128.🛠️ Proposed fix
- use arrow::datatypes::{DataType, Field, Int8Type, Int16Type, Schema}; + use arrow::datatypes::{DataType, Field, Int16Type, Schema, UInt8Type};- /// adaptive dictionary encoding (Dict<Int8>/Dict<Int16>/plain StringArray + /// adaptive dictionary encoding (Dict<UInt8>/Dict<Int16>/plain StringArray- if cardinality < 256 { - let mut builder = StringDictionaryBuilder::<Int8Type>::new(); + if cardinality < 256 { + let mut builder = StringDictionaryBuilder::<UInt8Type>::new(); let mut vi = 0; for row in 0..num_rows { if vi < str_values.len() && str_values[vi].row as usize == row { let vr = str_values[vi]; let bytes = &buf[vr.offset as usize..(vr.offset as usize + vr.len as usize)]; let s = unsafe { std::str::from_utf8_unchecked(bytes) }; builder.append_value(s); vi += 1; } else { builder.append_null(); } } let dict_type = - DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)); + DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)); schema_fields.push(Field::new(col_name, dict_type, true)); arrays.push(Arc::new(builder.finish()) as ArrayRef);For the Apache Arrow Rust crate version used by this repository, what is the maximum dictionary cardinality supported by `StringDictionaryBuilder<Int8Type>`? If the code wants a 256-entry dictionary tier, should it use `UInt8Type` instead?
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: c345a27d-f5d9-494b-a587-4548349fe3d7
📒 Files selected for processing (4)
crates/logfwd-core/src/chunk_classify.rscrates/logfwd-core/src/columnar_builder.rscrates/logfwd-core/src/indexed_builder.rscrates/logfwd-core/src/simd_scanner.rs
There was a problem hiding this comment.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
crates/logfwd-core/src/columnar_builder.rs (1)
269-271: 🧹 Nitpick | 🔵 TrivialSafety invariant depends on caller providing valid UTF-8.
The
from_utf8_uncheckedcalls throughoutfinish_batch()(lines 270, 342, 361, 381, 399) assume the input buffer contains valid UTF-8. This is enforced by the fuzz harness, but production callers must also guarantee this. Consider adding a doc comment onfinish_batchnoting this requirement.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/logfwd-core/src/columnar_builder.rs` around lines 269 - 271, finish_batch currently uses unsafe std::str::from_utf8_unchecked on field buffers (e.g., the uses with fc.name) and thus requires callers to supply valid UTF-8; add a doc comment on the finish_batch function stating this safety invariant (that all field/name/value buffers must be valid UTF-8) so production callers are aware and the unsafety contract is documented, and reference the places using from_utf8_unchecked (e.g., fc.name and the other spots in finish_batch) in the doc comment to make the requirement explicit.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@crates/logfwd-core/fuzz/fuzz_targets/scanner.rs`:
- Around line 81-91: Add coverage for the compressed IPC path by calling
ColumnarSimdScanner::scan_compressed(...) in the same two cases; construct or
reuse the existing compressed input (IPC + zstd) used elsewhere in tests or
compress `data` into the Arrow IPC + zstd form expected by scan_compressed, then
invoke ColumnarSimdScanner::new(make_extract_all_config(),
128).scan_compressed(compressed_data) and
ColumnarSimdScanner::new(make_pushdown_config(),
128).scan_compressed(compressed_data) and pass each result to
validate_batch(&batch, "columnar_extract_all_compressed") /
validate_batch(&batch, "columnar_pushdown_compressed") so that scan_compressed
is exercised alongside scan.
---
Outside diff comments:
In `@crates/logfwd-core/src/columnar_builder.rs`:
- Around line 269-271: finish_batch currently uses unsafe
std::str::from_utf8_unchecked on field buffers (e.g., the uses with fc.name) and
thus requires callers to supply valid UTF-8; add a doc comment on the
finish_batch function stating this safety invariant (that all field/name/value
buffers must be valid UTF-8) so production callers are aware and the unsafety
contract is documented, and reference the places using from_utf8_unchecked
(e.g., fc.name and the other spots in finish_batch) in the doc comment to make
the requirement explicit.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: a735bd97-1e5a-4f6d-946c-e65447fed8d2
📒 Files selected for processing (3)
crates/logfwd-core/fuzz/fuzz_targets/scanner.rscrates/logfwd-core/src/columnar_builder.rscrates/logfwd-core/src/simd_scanner.rs
| // --- Columnar SIMD Scanner --- | ||
| { | ||
| let mut scanner = ColumnarSimdScanner::new(make_extract_all_config(), 128); | ||
| let batch = scanner.scan(data); | ||
| validate_batch(&batch, "columnar_extract_all"); | ||
| } | ||
| { | ||
| let mut scanner = ColumnarSimdScanner::new(make_pushdown_config(), 128); | ||
| let batch = scanner.scan(data); | ||
| validate_batch(&batch, "columnar_pushdown"); | ||
| } |
There was a problem hiding this comment.
🧹 Nitpick | 🔵 Trivial
Consider fuzzing scan_compressed() for completeness.
The ColumnarSimdScanner::scan_compressed() path is not exercised here. While the core scanning logic is shared with scan(), the IPC compression path (zstd + Arrow IPC serialization) could have edge cases worth fuzzing.
💡 Optional: Add scan_compressed coverage
// --- Columnar SIMD Scanner ---
{
let mut scanner = ColumnarSimdScanner::new(make_extract_all_config(), 128);
let batch = scanner.scan(data);
validate_batch(&batch, "columnar_extract_all");
}
{
let mut scanner = ColumnarSimdScanner::new(make_pushdown_config(), 128);
let batch = scanner.scan(data);
validate_batch(&batch, "columnar_pushdown");
}
+ // Compressed IPC path
+ {
+ let mut scanner = ColumnarSimdScanner::new(make_extract_all_config(), 128);
+ let _ = scanner.scan_compressed(data); // validates IPC serialization doesn't panic
+ }
});🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@crates/logfwd-core/fuzz/fuzz_targets/scanner.rs` around lines 81 - 91, Add
coverage for the compressed IPC path by calling
ColumnarSimdScanner::scan_compressed(...) in the same two cases; construct or
reuse the existing compressed input (IPC + zstd) used elsewhere in tests or
compress `data` into the Arrow IPC + zstd form expected by scan_compressed, then
invoke ColumnarSimdScanner::new(make_extract_all_config(),
128).scan_compressed(compressed_data) and
ColumnarSimdScanner::new(make_pushdown_config(),
128).scan_compressed(compressed_data) and pass each result to
validate_batch(&batch, "columnar_extract_all_compressed") /
validate_batch(&batch, "columnar_pushdown_compressed") so that scan_compressed
is exercised alongside scan.
Summary
Refactored the SIMD scanner to decouple the scanning logic from the builder implementation by introducing a
ScanBuildertrait. This enables code reuse across different builder backends while maintaining performance through inlining.Key Changes
Introduced
ScanBuildertrait: A shared interface for index-based field access that bothIndexedBatchBuilderandColumnarBatchBuilderimplement, allowing the core scan loop to be builder-agnostic.Extracted generic
scan_into()function: Moved the core scanning logic into a generic function parameterized overScanBuilder, eliminating code duplication between different scanner implementations.Added
ColumnarSimdScanner: New scanner usingColumnarBatchBuilderfor zero-copy scanning with adaptive dictionary encoding and optional zstd-compressed Arrow IPC output viascan_compressed().Refactored
SimdScanner: Now usesIndexedBatchBuilder(replacing the oldBatchBuilder) with bitset-tracked fields for ~3x faster builder performance via direct index-based appends instead of HashMap lookups.Updated
RecordBatchOptions: All builders now explicitly set row count when constructingRecordBatchobjects for consistency.Enhanced fuzzing: Extended the scanner fuzzer to exercise both
SimdScannerandColumnarSimdScanneracross extract-all and field pushdown modes.Fixed default config: Changed
ScanConfig::keep_rawdefault fromtruetofalseto match typical use cases.Implementation Details
#[inline(always)]to ensure zero-cost abstraction and allow the compiler to inline the scanning loop.set_buffer()method is optional (default no-op) for builders that don't need zero-copy semantics.https://claude.ai/code/session_01QamNN5H6VX3GE34Y28hqXJ