From 4250c91fbada35c970041cc63e3a45bb4fa5e3a2 Mon Sep 17 00:00:00 2001 From: strawgate Date: Sat, 18 Apr 2026 20:30:59 -0500 Subject: [PATCH 1/4] refactor: route csv enrichment through columnar builder --- crates/logfwd-transform/src/enrichment.rs | 234 +++++++++++++++++++--- 1 file changed, 203 insertions(+), 31 deletions(-) diff --git a/crates/logfwd-transform/src/enrichment.rs b/crates/logfwd-transform/src/enrichment.rs index 91dcef556..653ccb4d9 100644 --- a/crates/logfwd-transform/src/enrichment.rs +++ b/crates/logfwd-transform/src/enrichment.rs @@ -9,9 +9,11 @@ use std::io; use std::path::{Path, PathBuf}; use std::sync::{Arc, RwLock}; -use arrow::array::StringArray; +use arrow::array::{ArrayRef, StringArray, new_null_array}; use arrow::datatypes::{DataType, Field, Schema}; use arrow::record_batch::RecordBatch; +use logfwd_arrow::columnar::builder::ColumnarBatchBuilder; +use logfwd_arrow::columnar::plan::{BatchPlan, FieldKind}; use crate::TransformError; @@ -308,7 +310,7 @@ fn build_k8s_batch(entries: &[K8sPodEntry]) -> RecordBatch { // File-based lookup table (CSV) // --------------------------------------------------------------------------- -/// A lookup table loaded from a CSV file. All columns are Utf8. +/// A lookup table loaded from a CSV file. All columns are Utf8View. /// Supports periodic refresh via `reload()`. /// /// ```yaml @@ -372,7 +374,11 @@ impl EnrichmentTable for CsvFileTable { } } -/// Read a CSV into an Arrow RecordBatch. All columns are Utf8. +/// Read a CSV into an Arrow RecordBatch through the shared columnar builder. +/// +/// CSV parsing semantics stay here: delimiter/quote/header handling and row +/// alignment are owned by the CSV reader. `ColumnarBatchBuilder` owns string +/// storage, sparse padding, and Arrow finalization. fn read_csv_to_batch(reader: R) -> Result { let mut csv_reader = csv::ReaderBuilder::new().flexible(true).from_reader(reader); @@ -401,7 +407,108 @@ fn read_csv_to_batch(reader: R) -> Result, _>>()?; + let mut builder = ColumnarBatchBuilder::new(plan); + builder.set_dedup_enabled(false); + builder.begin_batch(); + + for (row_idx, result) in csv_reader.records().enumerate() { + let record = + result.map_err(|e| TransformError::Enrichment(format!("CSV parse error: {e}")))?; + if record.len() > num_cols { + return Err(TransformError::Enrichment(format!( + "CSV row {} has {} fields, expected {} (header count)", + row_idx + 1, + record.len(), + num_cols, + ))); + } + + builder.begin_row(); + for (idx, handle) in handles.iter().enumerate() { + if let Some(field) = record.get(idx) { + builder + .write_str(*handle, field) + .map_err(|e| TransformError::Enrichment(format!("CSV builder error: {e}")))?; + } else { + builder.write_null(*handle); + } + } + builder.end_row(); + } + + let batch = builder + .finish_batch() + .map_err(|e| TransformError::Enrichment(format!("CSV builder error: {e}")))?; + restore_csv_header_columns(&headers, batch) +} + +fn restore_csv_header_columns( + headers: &[String], + batch: RecordBatch, +) -> Result { + let mut fields = Vec::with_capacity(headers.len()); + let mut arrays = Vec::with_capacity(headers.len()); + for header in headers { + match batch + .schema() + .fields() + .iter() + .position(|field| field.name() == header) + { + Some(idx) => { + fields.push(Arc::clone(&batch.schema().fields()[idx])); + arrays.push(Arc::clone(batch.column(idx))); + } + None => { + fields.push(Arc::new(Field::new(header, DataType::Utf8View, true))); + arrays.push(new_null_array(&DataType::Utf8View, batch.num_rows()) as ArrayRef); + } + } + } + + RecordBatch::try_new(Arc::new(Schema::new(fields)), arrays).map_err(TransformError::Arrow) +} + +#[cfg(test)] +fn read_csv_to_legacy_batch_for_test( + reader: R, +) -> Result { + let mut csv_reader = csv::ReaderBuilder::new().flexible(true).from_reader(reader); + + let headers: Vec = csv_reader + .headers() + .map_err(|e| TransformError::Enrichment(format!("CSV header error: {e}")))? + .iter() + .map(ToString::to_string) + .collect(); + + if headers.is_empty() { + return Err(TransformError::Enrichment("CSV has no columns".to_string())); + } + + let mut seen = HashSet::with_capacity(headers.len()); + for h in &headers { + if h.is_empty() { + return Err(TransformError::Enrichment( + "CSV has an empty header name".to_string(), + )); + } + if !seen.insert(h) { + return Err(TransformError::Enrichment(format!( + "CSV has duplicate header name: {h}" + ))); + } + } + let num_cols = headers.len(); let mut columns: Vec>> = vec![Vec::new(); num_cols]; @@ -419,7 +526,6 @@ fn read_csv_to_batch(reader: R) -> Result(reader: R) -> Result> = columns + let arrays: Vec = columns .iter() .map(|col| { let arr: StringArray = col.iter().map(|s| s.as_deref()).collect(); - Arc::new(arr) as _ + Arc::new(arr) as ArrayRef }) .collect(); @@ -1469,6 +1575,18 @@ mod tests { use super::*; use arrow::array::Array; + fn csv_string_column<'a>( + batch: &'a RecordBatch, + name: &str, + ) -> &'a arrow::array::StringViewArray { + batch + .column_by_name(name) + .unwrap_or_else(|| panic!("missing CSV column: {name}")) + .as_any() + .downcast_ref::() + .unwrap_or_else(|| panic!("CSV column {name} should be Utf8View")) + } + // -- CRI path parsing --------------------------------------------------- #[test] @@ -1671,22 +1789,13 @@ mod tests { let batch = table.snapshot().unwrap(); assert_eq!(batch.num_rows(), 2); assert_eq!(batch.num_columns(), 3); + assert_eq!(batch.schema().field(0).data_type(), &DataType::Utf8View); - let hostname = batch - .column_by_name("hostname") - .unwrap() - .as_any() - .downcast_ref::() - .unwrap(); + let hostname = csv_string_column(&batch, "hostname"); assert_eq!(hostname.value(0), "web-1"); assert_eq!(hostname.value(1), "api-2"); - let team = batch - .column_by_name("team") - .unwrap() - .as_any() - .downcast_ref::() - .unwrap(); + let team = csv_string_column(&batch, "team"); assert_eq!(team.value(0), "platform"); assert_eq!(team.value(1), "backend"); } @@ -1698,16 +1807,84 @@ mod tests { table.load_from_reader(&csv_data[..]).unwrap(); let batch = table.snapshot().unwrap(); assert_eq!(batch.num_rows(), 2); - let c = batch - .column_by_name("c") - .unwrap() - .as_any() - .downcast_ref::() - .unwrap(); + let c = csv_string_column(&batch, "c"); assert_eq!(c.value(0), "3"); assert!(c.is_null(1)); // padded with NULL } + #[test] + fn csv_empty_cells_are_empty_strings() { + let csv_data = b"a,b,c\n1,,3\n,5,\n"; + let table = CsvFileTable::new("t", "/fake"); + table.load_from_reader(&csv_data[..]).unwrap(); + let batch = table.snapshot().unwrap(); + + let a = csv_string_column(&batch, "a"); + let b = csv_string_column(&batch, "b"); + let c = csv_string_column(&batch, "c"); + assert_eq!(a.value(1), ""); + assert_eq!(b.value(0), ""); + assert_eq!(c.value(1), ""); + } + + #[test] + fn csv_quoted_commas_stay_in_cell() { + let csv_data = b"host,note\nweb-1,\"hello,team\"\napi-2,\"x,y,z\"\n"; + let table = CsvFileTable::new("t", "/fake"); + table.load_from_reader(&csv_data[..]).unwrap(); + let batch = table.snapshot().unwrap(); + + let note = csv_string_column(&batch, "note"); + assert_eq!(note.value(0), "hello,team"); + assert_eq!(note.value(1), "x,y,z"); + } + + #[test] + fn csv_all_missing_column_preserves_header_as_nulls() { + let csv_data = b"a,b\n1\n2\n"; + let table = CsvFileTable::new("t", "/fake"); + table.load_from_reader(&csv_data[..]).unwrap(); + let batch = table.snapshot().unwrap(); + + assert_eq!(batch.num_rows(), 2); + assert_eq!(batch.num_columns(), 2); + assert_eq!(batch.schema().field(1).name(), "b"); + assert_eq!(batch.schema().field(1).data_type(), &DataType::Utf8View); + let b = csv_string_column(&batch, "b"); + assert!(b.is_null(0)); + assert!(b.is_null(1)); + } + + #[test] + fn csv_columnar_builder_matches_legacy_values() { + let csv_data = b"a,b,c\n1,2,3\n4,,\n5\n"; + let columnar = read_csv_to_batch(&csv_data[..]).unwrap(); + let legacy = read_csv_to_legacy_batch_for_test(&csv_data[..]).unwrap(); + + assert_eq!(columnar.num_rows(), legacy.num_rows()); + assert_eq!(columnar.num_columns(), legacy.num_columns()); + for field in legacy.schema().fields() { + let name = field.name(); + let actual = csv_string_column(&columnar, name); + let expected = legacy + .column_by_name(name) + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + for row in 0..legacy.num_rows() { + assert_eq!( + actual.is_null(row), + expected.is_null(row), + "null mismatch for {name} row {row}", + ); + if !expected.is_null(row) { + assert_eq!(actual.value(row), expected.value(row)); + } + } + } + } + #[test] fn csv_empty_file_fails() { let table = CsvFileTable::new("t", "/fake"); @@ -1756,12 +1933,7 @@ mod tests { assert_eq!(rows, 2); let batch = table.snapshot().unwrap(); - let region = batch - .column_by_name("region") - .unwrap() - .as_any() - .downcast_ref::() - .unwrap(); + let region = csv_string_column(&batch, "region"); assert_eq!(region.value(0), "us-east"); assert_eq!(region.value(1), "eu-west"); } From a778140d05a9342e8826a7fd08be21f749e4e006 Mon Sep 17 00:00:00 2001 From: strawgate Date: Sat, 18 Apr 2026 20:46:16 -0500 Subject: [PATCH 2/4] test: update enrichment join for utf8view csv columns --- crates/logfwd/tests/it/integration.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/crates/logfwd/tests/it/integration.rs b/crates/logfwd/tests/it/integration.rs index 6b6c402b6..5c86daabb 100644 --- a/crates/logfwd/tests/it/integration.rs +++ b/crates/logfwd/tests/it/integration.rs @@ -454,12 +454,12 @@ fn test_enrichment_join() { // Spot-check values: both "auth" rows should map to "platform". let team_col = result.column_by_name("team").expect("team column missing"); - use arrow::array::StringArray; - // The CSV enrichment table stores columns as DataType::Utf8 (StringArray). + use arrow::array::StringViewArray; + // CSV enrichment tables store string columns as Utf8View. let team_arr = team_col .as_any() - .downcast_ref::() - .expect("team column should be DataType::Utf8"); + .downcast_ref::() + .expect("team column should be DataType::Utf8View"); let teams: Vec<&str> = team_arr.iter().map(|v| v.unwrap_or("")).collect(); assert!( teams.contains(&"platform"), From 64bd0521c5edb0cf241ef2beb83b96184623d41b Mon Sep 17 00:00:00 2001 From: strawgate Date: Sat, 18 Apr 2026 20:57:06 -0500 Subject: [PATCH 3/4] refactor: share csv header validation --- crates/logfwd-transform/src/enrichment.rs | 112 ++++++++++------------ 1 file changed, 52 insertions(+), 60 deletions(-) diff --git a/crates/logfwd-transform/src/enrichment.rs b/crates/logfwd-transform/src/enrichment.rs index 653ccb4d9..7c2e896ef 100644 --- a/crates/logfwd-transform/src/enrichment.rs +++ b/crates/logfwd-transform/src/enrichment.rs @@ -4,12 +4,14 @@ //! Each provider produces an Arrow RecordBatch representing a lookup table. //! The SqlTransform registers these as MemTables so users can JOIN against them. -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use std::io; use std::path::{Path, PathBuf}; use std::sync::{Arc, RwLock}; -use arrow::array::{ArrayRef, StringArray, new_null_array}; +#[cfg(test)] +use arrow::array::ArrayRef; +use arrow::array::{StringArray, new_null_array}; use arrow::datatypes::{DataType, Field, Schema}; use arrow::record_batch::RecordBatch; use logfwd_arrow::columnar::builder::ColumnarBatchBuilder; @@ -382,30 +384,7 @@ impl EnrichmentTable for CsvFileTable { fn read_csv_to_batch(reader: R) -> Result { let mut csv_reader = csv::ReaderBuilder::new().flexible(true).from_reader(reader); - let headers: Vec = csv_reader - .headers() - .map_err(|e| TransformError::Enrichment(format!("CSV header error: {e}")))? - .iter() - .map(ToString::to_string) - .collect(); - - if headers.is_empty() { - return Err(TransformError::Enrichment("CSV has no columns".to_string())); - } - - let mut seen = HashSet::with_capacity(headers.len()); - for h in &headers { - if h.is_empty() { - return Err(TransformError::Enrichment( - "CSV has an empty header name".to_string(), - )); - } - if !seen.insert(h) { - return Err(TransformError::Enrichment(format!( - "CSV has duplicate header name: {h}" - ))); - } - } + let headers = read_csv_headers(&mut csv_reader)?; let num_cols = headers.len(); let mut plan = BatchPlan::with_capacity(num_cols); @@ -433,13 +412,12 @@ fn read_csv_to_batch(reader: R) -> Result Result { + let schema = batch.schema(); + let index_by_name: HashMap<&str, usize> = schema + .fields() + .iter() + .enumerate() + .map(|(idx, field)| (field.name().as_str(), idx)) + .collect(); + let mut fields = Vec::with_capacity(headers.len()); let mut arrays = Vec::with_capacity(headers.len()); for header in headers { - match batch - .schema() - .fields() - .iter() - .position(|field| field.name() == header) - { + match index_by_name.get(header.as_str()).copied() { Some(idx) => { - fields.push(Arc::clone(&batch.schema().fields()[idx])); + fields.push(Arc::clone(&schema.fields()[idx])); arrays.push(Arc::clone(batch.column(idx))); } None => { fields.push(Arc::new(Field::new(header, DataType::Utf8View, true))); - arrays.push(new_null_array(&DataType::Utf8View, batch.num_rows()) as ArrayRef); + arrays.push(new_null_array(&DataType::Utf8View, batch.num_rows())); } } } @@ -484,30 +465,7 @@ fn read_csv_to_legacy_batch_for_test( ) -> Result { let mut csv_reader = csv::ReaderBuilder::new().flexible(true).from_reader(reader); - let headers: Vec = csv_reader - .headers() - .map_err(|e| TransformError::Enrichment(format!("CSV header error: {e}")))? - .iter() - .map(ToString::to_string) - .collect(); - - if headers.is_empty() { - return Err(TransformError::Enrichment("CSV has no columns".to_string())); - } - - let mut seen = HashSet::with_capacity(headers.len()); - for h in &headers { - if h.is_empty() { - return Err(TransformError::Enrichment( - "CSV has an empty header name".to_string(), - )); - } - if !seen.insert(h) { - return Err(TransformError::Enrichment(format!( - "CSV has duplicate header name: {h}" - ))); - } - } + let headers = read_csv_headers(&mut csv_reader)?; let num_cols = headers.len(); let mut columns: Vec>> = vec![Vec::new(); num_cols]; @@ -548,6 +506,40 @@ fn read_csv_to_legacy_batch_for_test( RecordBatch::try_new(schema, arrays).map_err(TransformError::Arrow) } +fn read_csv_headers( + csv_reader: &mut csv::Reader, +) -> Result, TransformError> { + let headers: Vec = csv_reader + .headers() + .map_err(|e| TransformError::Enrichment(format!("CSV header error: {e}")))? + .iter() + .map(ToString::to_string) + .collect(); + validate_csv_headers(&headers)?; + Ok(headers) +} + +fn validate_csv_headers(headers: &[String]) -> Result<(), TransformError> { + if headers.is_empty() { + return Err(TransformError::Enrichment("CSV has no columns".to_string())); + } + + let mut seen = HashSet::with_capacity(headers.len()); + for h in headers { + if h.is_empty() { + return Err(TransformError::Enrichment( + "CSV has an empty header name".to_string(), + )); + } + if !seen.insert(h) { + return Err(TransformError::Enrichment(format!( + "CSV has duplicate header name: {h}" + ))); + } + } + Ok(()) +} + // --------------------------------------------------------------------------- // JSON Lines file-based lookup table // --------------------------------------------------------------------------- From 6ec9437be0dce2cc36065b449ccd489a8e0153a8 Mon Sep 17 00:00:00 2001 From: strawgate Date: Sat, 18 Apr 2026 21:01:35 -0500 Subject: [PATCH 4/4] docs: document csv utf8view enrichment output --- book/src/content/docs/configuration/reference.mdx | 6 ++++-- crates/logfwd-transform/src/enrichment.rs | 8 ++++++-- dev-docs/ARCHITECTURE.md | 6 ++++++ dev-docs/DESIGN.md | 7 +++++-- dev-docs/VERIFICATION.md | 1 + 5 files changed, 22 insertions(+), 6 deletions(-) diff --git a/book/src/content/docs/configuration/reference.mdx b/book/src/content/docs/configuration/reference.mdx index c5ff08442..abf68b0a6 100644 --- a/book/src/content/docs/configuration/reference.mdx +++ b/book/src/content/docs/configuration/reference.mdx @@ -804,8 +804,10 @@ Useful for `/etc/os-release`, `.env` files, or ConfigMap-mounted metadata files. ### `csv` enrichment -A multi-row lookup table loaded from a CSV file. All columns are UTF-8 strings. -The first row must be column headers. +A multi-row lookup table loaded from a CSV file. All columns are UTF-8 strings +and are materialized internally as Arrow `Utf8View` columns for SQL execution. +The first row must be column headers. Empty cells are empty strings; missing +trailing cells are `NULL`. ```yaml enrichment: diff --git a/crates/logfwd-transform/src/enrichment.rs b/crates/logfwd-transform/src/enrichment.rs index 7c2e896ef..1162ffb0e 100644 --- a/crates/logfwd-transform/src/enrichment.rs +++ b/crates/logfwd-transform/src/enrichment.rs @@ -312,7 +312,7 @@ fn build_k8s_batch(entries: &[K8sPodEntry]) -> RecordBatch { // File-based lookup table (CSV) // --------------------------------------------------------------------------- -/// A lookup table loaded from a CSV file. All columns are Utf8View. +/// A lookup table loaded from a CSV file. All columns are nullable Utf8View. /// Supports periodic refresh via `reload()`. /// /// ```yaml @@ -340,6 +340,9 @@ impl CsvFileTable { } /// Load the file from a reader (useful for testing). + /// + /// The stored batch preserves CSV header order as nullable `Utf8View` + /// columns. Empty cells are empty strings; missing trailing cells are nulls. pub fn load_from_reader(&self, reader: R) -> Result { let batch = read_csv_to_batch(reader)?; let num_rows = batch.num_rows(); @@ -380,7 +383,8 @@ impl EnrichmentTable for CsvFileTable { /// /// CSV parsing semantics stay here: delimiter/quote/header handling and row /// alignment are owned by the CSV reader. `ColumnarBatchBuilder` owns string -/// storage, sparse padding, and Arrow finalization. +/// storage, sparse padding, and Arrow finalization into nullable Utf8View +/// columns. fn read_csv_to_batch(reader: R) -> Result { let mut csv_reader = csv::ReaderBuilder::new().flexible(true).from_reader(reader); diff --git a/dev-docs/ARCHITECTURE.md b/dev-docs/ARCHITECTURE.md index c0cbfd544..2b3430389 100644 --- a/dev-docs/ARCHITECTURE.md +++ b/dev-docs/ARCHITECTURE.md @@ -199,6 +199,12 @@ scanner only extracts fields the SQL actually uses. Custom UDFs: `int()`, `float()`, `regexp_extract()`, `grok()`, `geo_lookup()`. +CSV enrichment tables are parsed in `logfwd-transform` and materialized through +`ColumnarBatchBuilder` as nullable `Utf8View` columns. Delimiter, quote, header, +duplicate-header, and row-alignment semantics remain local to the transform +crate; `logfwd-arrow` only owns the shared columnar builder and Arrow +finalization mechanics. + ### 7. Output: RecordBatch → wire format ``` diff --git a/dev-docs/DESIGN.md b/dev-docs/DESIGN.md index da0ac0d12..c9f48666e 100644 --- a/dev-docs/DESIGN.md +++ b/dev-docs/DESIGN.md @@ -26,6 +26,8 @@ logfwd-io Produces RecordBatch from external sources. logfwd-transform RecordBatch → RecordBatch via DataFusion SQL. UDFs, enrichment tables, JOINs. + CSV enrichment tables produce Utf8View columns via the + shared columnar builder. logfwd-output Consumes RecordBatch, sends externally. OTLP, Arrow IPC, JSON lines. Parquet/ClickHouse are planned. @@ -177,8 +179,9 @@ field has multiple types within a batch. **Core requirement — round-trip type fidelity.** A field that enters as `int 200` must exit as `int 200`. No type promotion across documents. **Core requirement — clean schemas stay clean.** OTLP, Arrow IPC, CSV, and consistent JSON -produce bare column names with native types. The suffix machinery only activates when there -is an actual per-row type conflict. +produce bare column names with native types. CSV enrichment columns are bare nullable +`Utf8View` string columns. The suffix machinery only activates when there is an actual +per-row type conflict. Output always serializes with the bare JSON key, dispatching on DataType. A view layer provides bare-name access for suffixed columns. diff --git a/dev-docs/VERIFICATION.md b/dev-docs/VERIFICATION.md index bae82afed..cadeb0263 100644 --- a/dev-docs/VERIFICATION.md +++ b/dev-docs/VERIFICATION.md @@ -359,6 +359,7 @@ logfwd-core is the proven kernel. All rules are CI-enforced. | `logfwd-io/udp_input.rs` | UDP bounded-drain predicate (`should_stop_udp_drain`) and per-poll datagram work cap | Kani (3 proofs: zero counters, independent caps, predicate equivalence) + unit tests + proptest predicate equivalence + bounded-drain/recovery regression | | `logfwd-output/elasticsearch.rs` | Elasticsearch bulk NDJSON serialization + timestamp suffix writer | Unit tests + proptest oracle checks (serialize_batch and timestamp suffix fast-vs-simple equivalence) + ignored release microbench guardrails | | `logfwd-output/otlp_sink.rs` | OTLP sink row encoder and generated-fast encoder parity | Unit tests + proptest oracle check (generated-fast vs handwritten encoder equivalence on random UTF-8 rows) | +| `logfwd-transform/enrichment.rs` | CSV enrichment parsing, header validation, nullable `Utf8View` batch production, and DataFusion join integration | Unit tests for CSV edge cases and legacy-value/null parity + integration test for `Utf8View` join output. Kani is not required because this path is heap-heavy CSV/DataFusion integration rather than a bounded pure seam. | | `logfwd-io/polling_input_health.rs` | Polling-input source health reducer for tail/TCP/UDP (`healthy`, backpressure, error-backoff) | Kani exhaustive (3 proofs) + unit tests + proptest sequence checks | | `logfwd-io/receiver_health.rs` | Standalone receiver health reducer (`noop`, backpressure, fatal, shutdown) | Kani exhaustive (6 proofs) + unit tests + proptest sequence checks | | `logfwd-io/format.rs` | CRI metadata injection, Auto-mode fallthrough to passthrough | Kani (4 proofs: inject_cri_metadata output structure, JSON vs plain-text path dispatch) |