feat: json/json_int/json_float UDFs for raw-first extraction #645
Add json(), json_int(), json_float() DataFusion UDFs that extract fields from raw JSON strings using our SIMD scanner. These enable the raw-first architecture where readers produce _raw lines and SQL handles extraction.
UDFs:
- json(_raw, 'key') → Utf8 (string value)
- json_int(_raw, 'key') → Int64 (NULL if not int)
- json_float(_raw, 'key') → Float64 (NULL if not float)
Side-by-side benchmark comparing:
- Path A (current): scanner → typed columns → SQL
- Path B (raw-first): _raw column → json UDFs in SQL
Results (50K rows × 50 fields):
- Extraction: A=184ms (197 MB/s) vs B=738ms (49 MB/s)
- Passthrough: A=165ms (219 MB/s) vs B=6ms (6,098 MB/s)
Next step: optimizer rule to merge multiple json() calls sharing the same _raw input into a single parse pass.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…raction
When user SQL contains multiple json()/json_int()/json_float() calls sharing the same _raw column, the preprocess pass:
1. Extracts all referenced field names from the SQL
2. Runs the SIMD scanner once for those fields
3. Adds extracted columns to the batch
4. Rewrites the SQL to use direct column references
Benchmark results (10K rows × 100 fields, extracting 10):
- Path A (scanner+SQL): 106ms (133 MB/s)
- Path B (raw UDFs, N parses): 926ms (15 MB/s)
- Path C (raw+preprocess): 44ms (319 MB/s) ← 2.4x faster than A
Path C beats Path A at wide schemas because it only parses the fields the SQL references, while Path A parses all 100 fields.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
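Step 1 of the preprocess pass (collecting the field names the SQL references) could look roughly like the sketch below. It is not the code from this PR: `referenced_keys` and `UDF_NAMES` are hypothetical names, and the matching is plain substring scanning that assumes the exact call shape `json*(_raw, 'key')` with a single space after the comma. Anything beyond that shape (extra whitespace, escaped quotes, aliases) would need a real SQL parser.

```rust
use std::collections::BTreeSet;

/// UDF names the toy extractor looks for (hypothetical constant).
const UDF_NAMES: [&str; 3] = ["json", "json_int", "json_float"];

/// Collect the distinct keys referenced by `json*(_raw, 'key')` calls.
/// Hypothetical helper: substring scanning only, no SQL parsing.
pub fn referenced_keys(sql: &str) -> BTreeSet<String> {
    let mut keys = BTreeSet::new();
    for name in UDF_NAMES.iter() {
        // Exact call prefix, e.g. "json_int(_raw, '".
        let pattern = format!("{}(_raw, '", name);
        let mut rest = sql;
        while let Some(pos) = rest.find(&pattern) {
            let after = &rest[pos + pattern.len()..];
            match after.find('\'') {
                Some(end) => {
                    // Text up to the closing quote is the key.
                    keys.insert(after[..end].to_string());
                    rest = &after[end..];
                }
                // Unterminated string literal: stop scanning for this name.
                None => break,
            }
        }
    }
    keys
}
```

Because the result is a set, a key used in both the SELECT list and the WHERE clause is requested from the scanner only once.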
The shared parse cache approach doesn't work because DataFusion may call UDFs on filtered subsets of rows: the cache holds the full batch but DataFusion expects results matching the filtered row count.
UDFs now parse independently per call (correct but slow for many fields).
The json_preprocess module remains as the optimization path: it runs the scanner once before DataFusion, adds extracted columns to the batch, and rewrites the SQL to use column references.
Benchmark shows preprocess path (C) matches or beats the current scanner pipeline (A) for wide schemas while keeping the raw-first architecture.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Major refactor of the json UDF prototype:
- Deduplicate three separate structs into single `JsonExtractUdf`
parameterized by `JsonExtractMode` enum (Str, Int, Float)
- Fix Utf8View crash: accept Utf8, Utf8View, LargeUtf8 in signatures
and coerce to StringArray before processing
- Fix row-count mismatch: validate scanner output rows match input
- Propagate scanner errors as DataFusionError::Execution instead of
silently returning nulls
- Remove HashMap allocation in hot path — return RecordBatch directly
- Delete json_preprocess.rs (SQL string rewriting too fragile)
Add 38 integration tests covering:
- Basic extraction (string, int, float)
- Type coercion (string-to-int parsing, int-to-float)
- Null handling (NULL rows, missing fields, mixed)
- Edge cases (empty {}, non-JSON, malformed, 12KB strings, unicode,
nested objects, arrays, booleans, JSON null, duplicate keys,
large integers, negative numbers, scientific notation)
- WHERE clause integration
- SELECT * passthrough
- Utf8View input compatibility
Known issues documented in tests:
- NULL _raw rows cause scanner row-count mismatch
- json_int cannot parse quoted-string numeric values ("200")
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Caution: Review failed. The pull request is closed.
Walkthrough
This change adds three DataFusion scalar UDFs…
Actionable comments posted: 5
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@crates/logfwd-transform/Cargo.toml`:
- Around line 22-27: Remove the redundant dev-dependency entry for bytes in
Cargo.toml: delete the bytes = "1" line under [dev-dependencies] because bytes
is already declared under [dependencies]; update the file so only the single
bytes = "1" in the main dependencies block remains.
In `@crates/logfwd-transform/src/udf/json_extract.rs`:
- Around line 232-247: Replace the unsafe .unwrap() on the downcasted array with
a failure-aware message: after casting with arrow::compute::cast(&arr,
&DataType::Utf8) change the .downcast_ref::<StringArray>().unwrap() call to
.downcast_ref::<StringArray>().expect("expected Utf8 cast to produce StringArray
in json_extract parsing") (or return a proper Err if preferred) so failures
include a clear diagnostic; update the same occurrence in the json extraction
code path where str_arr is defined to avoid panics in production.
- Around line 59-67: The suffix_order method contains dead empty-string entries
that will never match any scanner output; update suffix_order (the match arms
for Self::Str, Self::Int, Self::Float) to remove the "" entries so each arm only
returns the actual suffixes the scanner emits (e.g., Self::Str -> &["_str",
"_int", "_float"], Self::Int -> &["_int"], Self::Float -> &["_float", "_int"]).
Keep the function signature and return type unchanged but adjust the returned
static slices to omit the empty-string placeholders.
- Around line 74-109: Add a brief doc comment above the parse_raw function
explaining that the post-scan row-count validation (the comparison between
batch.num_rows() and raw_array.len()) intentionally fails when the `_raw` column
contains NULLs; mention that parse_raw reconstructs NDJSON from raw_array, NULL
entries become empty lines and thus cause a mismatch and that this is a known
limitation covered by tests. Reference parse_raw, raw_array, and the
batch.num_rows() check so future maintainers understand this is intentional
rather than a bug.
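The NULL `_raw` limitation described in the last comment can be illustrated with a small std-only sketch. It is a toy model under stated assumptions, not the PR's `parse_raw`: `to_ndjson`, `scan_row_count`, and `parse_raw_rows` are hypothetical names, `Option<&str>` stands in for a nullable string array, a blank-line-skipping counter stands in for the SIMD scanner, and a `String` error stands in for `DataFusionError::Execution`.

```rust
/// Join rows into NDJSON; a NULL row contributes an empty line.
fn to_ndjson(rows: &[Option<&str>]) -> String {
    let mut buf = String::new();
    for row in rows {
        buf.push_str(row.unwrap_or(""));
        buf.push('\n');
    }
    buf
}

/// Toy stand-in for the scanner: it only counts non-blank lines,
/// which is the behaviour that drops NULL rows.
fn scan_row_count(ndjson: &str) -> usize {
    ndjson.lines().filter(|line| !line.trim().is_empty()).count()
}

/// Mirrors the post-scan validation: scanned rows must match input rows.
pub fn parse_raw_rows(rows: &[Option<&str>]) -> Result<usize, String> {
    let got = scan_row_count(&to_ndjson(rows));
    if got != rows.len() {
        return Err(format!(
            "scanner row count mismatch: got {} rows, expected {}",
            got,
            rows.len()
        ));
    }
    Ok(got)
}
```

With one NULL among three rows, the toy scanner reports two rows and the check fails, which is the case the tests pin down as a known limitation.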
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 20fed77d-b6a7-48e8-97ea-f63941c28bdd
📒 Files selected for processing (6)
crates/logfwd-transform/Cargo.toml
crates/logfwd-transform/src/lib.rs
crates/logfwd-transform/src/udf/json_extract.rs
crates/logfwd-transform/src/udf/mod.rs
crates/logfwd-transform/tests/json_udf_tests.rs
crates/logfwd-transform/tests/raw_first_bench.rs
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@crates/logfwd-transform/tests/json_udf_tests.rs`:
- Around line 275-279: The test currently asserts only that result.is_err()
which is too broad; update the assertion in
crates/logfwd-transform/tests/json_udf_tests.rs (both occurrences around the
result variable at the shown blocks and the similar block at 792-795) to assert
the error is the specific DataFusion execution error containing the scanner
row-count mismatch message (the DataFusionError::Execution text "scanner row
count mismatch: got {} rows, expected {}"). Concretely, unwrap the Err from
result, convert it to its display/string form and assert that it contains (or
equals) the "scanner row count mismatch" substring so the test only passes for
that known limitation and not any unrelated failure.
- Around line 79-84: The helper function `query` currently uses `.find()` on
`batches` which returns only the first non-empty DataFusion batch and drops
subsequent non-empty batches; update `query` (the async function that builds
`ctx` via `make_ctx`, calls `ctx.sql(sql).await.unwrap()`, and collects batches
with `df.collect().await.unwrap()`) to filter for all non-empty batches, and
then concatenate them into a single RecordBatch (using the Arrow/DataFusion
concat utility that accepts a schema and slice of RecordBatches) returning
Some(concatenated_batch) or None if there are no non-empty batches.
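The difference between the `.find()` behaviour and the suggested concatenation can be shown with plain vectors. This is only an analogy: `Vec<i64>` stands in for a `RecordBatch`, and `first_non_empty` and `concat_non_empty` are hypothetical names. The real helper would concatenate Arrow batches rather than integers.

```rust
/// Behaviour flagged in the review: keep only the first non-empty batch.
pub fn first_non_empty(batches: &[Vec<i64>]) -> Option<Vec<i64>> {
    batches.iter().find(|batch| !batch.is_empty()).cloned()
}

/// Suggested behaviour: concatenate every non-empty batch,
/// or return None when no batch has any rows.
pub fn concat_non_empty(batches: &[Vec<i64>]) -> Option<Vec<i64>> {
    // Flattening skips empty batches naturally.
    let rows: Vec<i64> = batches.iter().flatten().copied().collect();
    if rows.is_empty() {
        None
    } else {
        Some(rows)
    }
}
```

When a query result is split across several batches, the first version silently drops rows, so a test comparing against the full expected output could pass or fail depending on how the engine happened to partition the result.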
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: f5498ebf-0942-4611-953d-3eca7c31f2c1
📒 Files selected for processing (1)
crates/logfwd-transform/tests/json_udf_tests.rs
- Remove redundant bytes dev-dependency (already in [dependencies])
- Remove dead empty-string entries from suffix_order (scanner never emits bare column names)
- Replace .unwrap() on downcast with .expect() for clearer diagnostics in Int/Float paths
- Add doc comment to parse_raw explaining the NULL _raw row-count mismatch limitation
- Fix query() test helper to concat all non-empty batches instead of dropping later batches
- Tighten null_raw_row_causes_scanner_mismatch and mixed_valid_invalid_null_rows_errors to assert the specific "scanner row count mismatch" error text, not just is_err()
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@crates/logfwd-transform/src/udf/json_extract.rs`:
- Line 245: Fix the typo in the inline comment inside
crates/logfwd-transform/src/udf/json_extract.rs where the comment currently
reads "unparseable → null."; change "unparseable" to "unparsable" so the comment
reads "unparsable → null." (look for the comment in the json_extract.rs code
block that explains parsing strings as i64).
- Around line 221-228: The branch checking suffix.is_empty() in the closure used
to compute col is dead because mode.suffix_order() never yields empty strings;
simplify the closure in json_extract.rs by removing that branch and always
building col_name with format!("{key}{suffix}") (or string concatenation) before
calling batch.column_by_name(&col_name).map(Arc::clone), referencing the
existing closure that assigns to let col and using mode.suffix_order(), key,
suffix, and batch.column_by_name to locate the column.
In `@crates/logfwd-transform/tests/json_udf_tests.rs`:
- Line 173: Replace the non-standard spelling "unparseable" with "unparsable" in
the comment text and the test identifier: update the inline comment that reads
"// _int column first. The scanner writes 0 for unparseable-as-int strings" to
use "unparsable", and rename the test function whose name contains "unparseable"
(the test at the other flagged location) to use "unparsable" in its identifier
so both the comment and the test name consistently use the standard spelling.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: abade759-35a5-462c-916b-5fb3afab984b
📒 Files selected for processing (3)
crates/logfwd-transform/Cargo.toml
crates/logfwd-transform/src/udf/json_extract.rs
crates/logfwd-transform/tests/json_udf_tests.rs
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
suffix_order() no longer returns any "" entries so the is_empty()
guard is unreachable. Simplify to format!("{key}{suffix}") directly.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
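A minimal sketch of the simplified lookup, assuming the suffix lists given as an example in the review (`_str`, `_int`, `_float`). The enum and method names follow the PR, but `resolve_column` is a hypothetical helper, and a slice of column names stands in for `batch.column_by_name` on the scanner's output.

```rust
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum JsonExtractMode {
    Str,
    Int,
    Float,
}

impl JsonExtractMode {
    /// Typed-column suffixes to try, in priority order.
    /// Assumed from the review's example; no empty-string entries.
    pub fn suffix_order(self) -> &'static [&'static str] {
        match self {
            Self::Str => &["_str", "_int", "_float"],
            Self::Int => &["_int"],
            Self::Float => &["_float", "_int"],
        }
    }
}

/// Return the first `{key}{suffix}` column name that exists.
/// `available` is a stand-in for the scanner's output schema.
pub fn resolve_column(available: &[&str], key: &str, mode: JsonExtractMode) -> Option<String> {
    mode.suffix_order()
        .iter()
        .map(|suffix| format!("{key}{suffix}"))
        .find(|col_name| available.iter().any(|c| *c == col_name.as_str()))
}
```

Since every suffix is non-empty, the name is always built the same way and no special case for a bare column name is needed. Under these assumed lists, `Float` mode falls back to an `_int` column, while `Int` mode never reads `_str` or `_float`.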
Summary
- Add `json(_raw, 'key')`, `json_int(_raw, 'key')`, `json_float(_raw, 'key')` scalar UDFs backed by our SIMD scanner
- Register them in `SqlTransform::ensure_context()` so they're available in all user SQL
- Add `logfwd-arrow` + `bytes` deps to `logfwd-transform` (needed by UDF impl)
- Add a benchmark (`raw_first_bench.rs`) comparing the current scanner-first path vs raw-first path
Motivation
This is the prototype for the raw-first architecture: readers store the original JSON line in a `_raw` column, and SQL extracts fields on demand via these UDFs. This eliminates the type-suffix column naming problem entirely for users who don't need pre-extracted typed columns.
Known limitations (tracked for follow-up)
- Each `json(_raw, ...)` call re-parses `_raw` independently, so N fields = N scans. A DataFusion `OptimizerRule` that batches all `json*(_raw, ...)` calls into a single scan is the planned fix.
- NULL `_raw` rows cause a scanner row-count mismatch error (scanner skips blank lines).
- `json_int(_raw, 'key')` on quoted-string numerics (e.g. `"200"`) returns NULL rather than parsing the string.
Test plan
- `cargo test -p logfwd-transform --test json_udf_tests`: 38 tests pass
- `cargo test -p logfwd-transform`: existing lib tests still pass
- `cargo clippy -p logfwd-transform -- -D warnings`: clean
🤖 Generated with Claude Code