Add enrichment table framework: K8s path parser, host info, static labels#38
Conversation
…c tables Introduces the EnrichmentTable trait and three providers that produce Arrow RecordBatches for use as DataFusion lookup tables: - StaticTable: fixed key-value pairs from config (one row) - HostInfoTable: hostname, os_type, os_arch (resolved at startup) - K8sPathTable: parses CRI log paths into namespace, pod_name, pod_uid, container_name (zero K8s API calls, zero deps) SqlTransform gains add_enrichment_table() — registered tables appear alongside `logs` in each DataFusion session. Users can JOIN or CROSS JOIN them in their SQL transforms: SELECT logs.*, env.environment FROM logs CROSS JOIN env SELECT logs.*, k8s.namespace, k8s.pod_name FROM logs LEFT JOIN k8s_pods AS k8s ON logs._source LIKE k8s.log_path_prefix || '%' The K8s path parser handles standard CRI log paths: /var/log/pods/<ns>_<pod>_<uid>/<container>/N.log Design: providers store data behind Arc<RwLock<RecordBatch>>. The pipeline reads a snapshot once per batch (single RwLock read). Background refresh (e.g., K8s API watcher) can update the table without blocking the hot loop. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
WalkthroughAdds a new public Comment |
There was a problem hiding this comment.
Actionable comments posted: 5
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@crates/logfwd-core/src/enrichment.rs`:
- Around line 254-264: The computed log_path_prefix can miss a trailing '/' when
container_end == after_pod_dir.len(), causing ambiguous prefixes; update the
logic in enrichment.rs (around after_pod_dir, container_end, prefix_end,
log_path_prefix) to ensure the prefix always ends with a '/' for container
directories (e.g., detect the missing slash and append one when building
log_path_prefix) so LIKE-based JOINs match only entries under that container and
not similarly named sibling directories.
- Around line 51-68: StaticTable::new currently allows an empty labels slice
which produces a RecordBatch with zero columns (1 row, 0 columns); validate the
input and either return an Err or panic with a clear message when
labels.is_empty(), or document and explicitly handle the empty case by creating
at least one placeholder column. Update the StaticTable::new function to check
labels.is_empty() at the top and then either (a) return a Result and propagate
an error, or (b) panic with a descriptive message like "labels must not be
empty" before building fields, schema, columns, and calling
RecordBatch::try_new; reference StaticTable::new, labels, Field::new,
StringArray, schema, columns, and RecordBatch::try_new when making the change.
In `@crates/logfwd-transform/src/lib.rs`:
- Around line 439-451: When iterating self.enrichment_tables in the block
containing snapshot(), add a debug/trace log when snapshot() returns None so
operators can see why a table was skipped; specifically, inside the for et in
&self.enrichment_tables loop, after the if let Some(snapshot) = et.snapshot()
branch, add a tracing/log::debug call that references et.name() and indicates
the enrichment table was empty and skipped (use the same logging facility used
elsewhere in this crate, e.g. tracing::debug! or log::debug!, to keep
consistency), leaving the existing MemTable::try_new and ctx.register_table
logic unchanged.
- Around line 390-397: The add_enrichment_table method currently blindly pushes
the provided table into self.enrichment_tables which can lead to a confusing
failure later when register_table is called if two tables share the same name;
update add_enrichment_table to check the new table's name() against existing
entries in self.enrichment_tables and handle duplicates early—either (A)
deduplicate by skipping the push and emit a clear warning/log mentioning the
duplicate name, or (B) change the signature to return Result and return an Err
with a descriptive message (including the duplicate name) so callers get
immediate feedback; reference add_enrichment_table, self.enrichment_tables, and
EnrichmentTable::name() when making the change.
- Around line 772-801: The test named test_enrichment_left_join does not
exercise a JOIN because the SQL passed to SqlTransform::new is "SELECT
logs.msg_str, logs.source_str FROM logs" and never references the enrichment
table added via transform.add_enrichment_table(teams); change the test to either
rename it to reflect that the enrichment table is unused (e.g.,
test_enrichment_table_unused_no_error) or modify the SQL to perform a LEFT JOIN
against the enrichment table (reference the StaticTable named teams) — for
example update the SQL passed to SqlTransform::new to include a LEFT JOIN on
teams using logs.source_str = teams.source (or the appropriate column names) so
the test actually validates the join behavior while keeping the rest of the test
(batch, arrays, and assertions) intact.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 97bff41d-cbb3-4194-ac8a-8affbd5609d5
⛔ Files ignored due to path filters (1)
Cargo.lockis excluded by!**/*.lock
📒 Files selected for processing (4)
crates/logfwd-core/Cargo.tomlcrates/logfwd-core/src/enrichment.rscrates/logfwd-core/src/lib.rscrates/logfwd-transform/src/lib.rs
| // Register enrichment tables (snapshots from background providers). | ||
| for et in &self.enrichment_tables { | ||
| if let Some(snapshot) = et.snapshot() { | ||
| let et_table = MemTable::try_new(snapshot.schema(), vec![vec![snapshot]]) | ||
| .map_err(|e| { | ||
| format!("Failed to create enrichment table '{}': {e}", et.name()) | ||
| })?; | ||
| ctx.register_table(et.name(), Arc::new(et_table)) | ||
| .map_err(|e| { | ||
| format!("Failed to register enrichment table '{}': {e}", et.name()) | ||
| })?; | ||
| } | ||
| } |
There was a problem hiding this comment.
🧹 Nitpick | 🔵 Trivial
Silent skip when snapshot() returns None.
Tables like K8sPathTable start empty and won't be registered until populated. This is intentional, but operators may wonder why JOINs fail. A debug/trace log when skipping could aid troubleshooting.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@crates/logfwd-transform/src/lib.rs` around lines 439 - 451, When iterating
self.enrichment_tables in the block containing snapshot(), add a debug/trace log
when snapshot() returns None so operators can see why a table was skipped;
specifically, inside the for et in &self.enrichment_tables loop, after the if
let Some(snapshot) = et.snapshot() branch, add a tracing/log::debug call that
references et.name() and indicates the enrichment table was empty and skipped
(use the same logging facility used elsewhere in this crate, e.g.
tracing::debug! or log::debug!, to keep consistency), leaving the existing
MemTable::try_new and ctx.register_table logic unchanged.
New enrichment providers: - CsvFileTable: loads a CSV file into an Arrow RecordBatch (all Utf8 columns). Supports reload() for periodic refresh. Handles missing fields gracefully (flexible mode). - JsonLinesFileTable: loads JSON Lines (one object per line) with union schema discovery across all rows. Non-string values are stringified. Null/missing fields produce Arrow nulls. Test coverage expanded from 7 to 25 tests: - CRI path parsing: added hyphenated namespaces, underscored pod names, empty/edge-case paths - K8s path table: added initially-empty, ignores-invalid, refresh-replaces - CSV: basic load, missing fields, empty file, reload, from-disk - JSON Lines: basic, union schema, non-string values, blank lines, empty - Static/host: single label, all columns present - Trait dispatch: Box<dyn EnrichmentTable> across all providers - Concurrent access: writer thread + reader thread on K8sPathTable Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Fixes from CodeRabbit review: 1. StaticTable::new() now panics on empty labels (prevents useless 0-column table that would confuse JOIN queries). 2. CRI path parser always produces a trailing '/' on log_path_prefix, preventing LIKE-based JOIN mismatches with similarly-named containers (e.g., "container" vs "container-extra"). 3. add_enrichment_table() validates name uniqueness and rejects "logs" (reserved). Duplicate names panic early with a clear message instead of failing with a confusing DataFusion error at query time. 4. Added eprintln warning when an enrichment table snapshot is None (not yet loaded), so operators can diagnose why JOINs return no enrichment data. 5. Replaced misleading test_enrichment_left_join (which never actually JOINed) with two focused tests: test_enrichment_unused_table_no_error and test_enrichment_empty_table_skipped. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@crates/logfwd-core/src/enrichment.rs`:
- Line 332: The CSV reader currently uses
csv::ReaderBuilder::new().flexible(true) which silently drops overflow fields;
change this to not be flexible (remove flexible(true) or set flexible(false))
and add an explicit validation after reading each record (check record.len()
against num_cols) in the parsing loop (the code using csv_reader and variables
record and num_cols) to return or propagate an error when record.len() >
num_cols so rows wider than the header are rejected (also apply the same check
to the other parsing block around the code referenced at lines ~349-359).
- Around line 165-176: The dedupe currently uses only namespace, pod_name, and
container_name which collapses distinct pods with different pod_uid; update the
comparison keys used in entries.sort_by and entries.dedup_by to include pod_uid
as well (i.e., compare (&a.namespace, &a.pod_name, &a.container_name,
&a.pod_uid) against b's tuple) so each unique pod_uid is preserved and you don't
lose associated log_path_prefix/pod_uid metadata.
- Around line 150-154: The constructor K8sPathTable::new currently initializes
data to None which prevents snapshot() from returning Some(...) until
update_from_paths() runs; change new to initialize data with an empty
snapshot/batch instead of None so the table is immediately registered.
Concretely, inside K8sPathTable::new set data: Arc::new(RwLock::new(Some(<empty
snapshot/batch>))) using the same snapshot/batch type produced by
update_from_paths()/snapshot(), so snapshot() returns Some(empty) initially;
reference the K8sPathTable::new function, the data field, and the
update_from_paths()/snapshot() helpers to implement this.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: ebad5455-51ae-48ce-8b5f-d822ad8186b1
⛔ Files ignored due to path filters (1)
Cargo.lockis excluded by!**/*.lock
📒 Files selected for processing (2)
crates/logfwd-core/Cargo.tomlcrates/logfwd-core/src/enrichment.rs
|
|
||
| /// Read a CSV into an Arrow RecordBatch. All columns are Utf8. | ||
| fn read_csv_to_batch<R: io::Read>(reader: R) -> Result<RecordBatch, String> { | ||
| let mut csv_reader = csv::ReaderBuilder::new().flexible(true).from_reader(reader); |
There was a problem hiding this comment.
Reject CSV rows that are wider than the header.
With flexible(true), record.len() > num_cols is accepted here and every overflow field is discarded. That turns malformed source data into truncated enrichment rows with no load error.
Proposed fix
fn read_csv_to_batch<R: io::Read>(reader: R) -> Result<RecordBatch, String> {
let mut csv_reader = csv::ReaderBuilder::new().flexible(true).from_reader(reader);
@@
for result in csv_reader.records() {
let record = result.map_err(|e| format!("CSV parse error: {e}"))?;
+ if record.len() > num_cols {
+ return Err(format!(
+ "CSV row has {} fields but header has {}",
+ record.len(),
+ num_cols
+ ));
+ }
for (i, field) in record.iter().enumerate() {
- if i < num_cols {
- columns[i].push(field.to_string());
- }
+ columns[i].push(field.to_string());
}
// Pad missing columns with empty string.
for col in columns.iter_mut().skip(record.len()) {
col.push(String::new());
}Also applies to: 349-359
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@crates/logfwd-core/src/enrichment.rs` at line 332, The CSV reader currently
uses csv::ReaderBuilder::new().flexible(true) which silently drops overflow
fields; change this to not be flexible (remove flexible(true) or set
flexible(false)) and add an explicit validation after reading each record (check
record.len() against num_cols) in the parsing loop (the code using csv_reader
and variables record and num_cols) to return or propagate an error when
record.len() > num_cols so rows wider than the header are rejected (also apply
the same check to the other parsing block around the code referenced at lines
~349-359).
There was a problem hiding this comment.
Actionable comments posted: 2
♻️ Duplicate comments (3)
crates/logfwd-core/src/enrichment.rs (3)
169-180:⚠️ Potential issue | 🟠 MajorPreserve distinct pods with different UIDs.
The current sort/dedup key drops one row whenever
{namespace, pod_name, container_name}repeats. That loses valid data for pods recreated with the same name but a new UID, and the survivinglog_path_prefix/pod_uidis whichever entry happened to come first.Proposed fix
entries.sort_by(|a, b| { - (&a.namespace, &a.pod_name, &a.container_name).cmp(&( + (&a.namespace, &a.pod_name, &a.container_name, &a.pod_uid).cmp(&( &b.namespace, &b.pod_name, &b.container_name, + &b.pod_uid, )) }); entries.dedup_by(|a, b| { a.namespace == b.namespace && a.pod_name == b.pod_name && a.container_name == b.container_name + && a.pod_uid == b.pod_uid });🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/logfwd-core/src/enrichment.rs` around lines 169 - 180, The dedup logic in entries.sort_by and entries.dedup_by currently uses (namespace, pod_name, container_name) and therefore drops entries for the same pod name but different pod_uid; update both sort_by and dedup_by to include pod_uid (or whichever UID field represents the pod identity) in the tuple comparison so each distinct pod UID is preserved (i.e., sort_by should compare (&a.namespace, &a.pod_name, &a.container_name, &a.pod_uid) and dedup_by should check a.namespace == b.namespace && a.pod_name == b.pod_name && a.container_name == b.container_name && a.pod_uid == b.pod_uid).
154-158:⚠️ Potential issue | 🟠 MajorRegister
k8s_podsas an empty batch from startup.
SqlTransform::execute()only registers enrichment tables whensnapshot()isSome(_). InitializingK8sPathTablewithNonemeans any SQL that actually references this table fails withtable not founduntil the firstupdate_from_paths(), even though the schema is already fixed.Proposed fix
pub fn new(table_name: impl Into<String>) -> Self { K8sPathTable { table_name: table_name.into(), - data: Arc::new(RwLock::new(None)), + data: Arc::new(RwLock::new(Some(build_k8s_batch(&[])))), } }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/logfwd-core/src/enrichment.rs` around lines 154 - 158, K8sPathTable::new currently sets data to None which prevents SqlTransform::execute() from registering the enrichment table until update_from_paths() runs; change K8sPathTable::new to initialize data with Some(empty snapshot/batch) (i.e. an empty, but typed, snapshot the same shape produced by update_from_paths()) so that SqlTransform::execute() sees snapshot() as Some(_) and the k8s_pods table is registered at startup; ensure the empty value matches the expected type used by snapshot()/update_from_paths() and by any code that reads K8sPathTable::data.
339-367:⚠️ Potential issue | 🟠 MajorReject CSV rows that are wider than the header.
flexible(true)plusif i < num_colssilently truncates overflow fields. That turns malformed enrichment data into corrupted lookup rows instead of failing fast.Proposed fix
fn read_csv_to_batch<R: io::Read>(reader: R) -> Result<RecordBatch, String> { let mut csv_reader = csv::ReaderBuilder::new().flexible(true).from_reader(reader); @@ for result in csv_reader.records() { let record = result.map_err(|e| format!("CSV parse error: {e}"))?; + if record.len() > num_cols { + return Err(format!( + "CSV row has {} fields but header has {}", + record.len(), + num_cols + )); + } for (i, field) in record.iter().enumerate() { - if i < num_cols { - columns[i].push(field.to_string()); - } + columns[i].push(field.to_string()); } // Pad missing columns with empty string. for col in columns.iter_mut().skip(record.len()) { col.push(String::new()); }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/logfwd-core/src/enrichment.rs` around lines 339 - 367, The function read_csv_to_batch currently uses csv::ReaderBuilder::new().flexible(true) and then silently truncates records longer than the header via the if i < num_cols check; change this to fail fast by detecting overflow rows and returning an Err. Specifically, in read_csv_to_batch, after obtaining a record (the result from csv_reader.records()), check if record.len() > num_cols and return a formatted Err (e.g., "CSV row has more fields than header: expected X, got Y") instead of pushing/truncating; you can keep or remove csv::ReaderBuilder::flexible(true) but ensure the overflow check is present before the loop that pushes into columns and before the padding code so malformed wide rows cause an error rather than silent truncation.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@crates/logfwd-transform/src/lib.rs`:
- Around line 393-406: Pipeline::from_config is creating a SqlTransform via
SqlTransform::new() then moving it into the Pipeline without calling
SqlTransform::add_enrichment_table, leaving self.enrichment_tables empty and
breaking enrichment queries; modify Pipeline::from_config to call
add_enrichment_table for each enrichment provider (the same ones
configured/created in that function) before inserting the SqlTransform into the
Pipeline so that SqlTransform::enrichment_tables contains env, host_info,
k8s_pods providers at runtime; locate the SqlTransform instance created by
SqlTransform::new() in Pipeline::from_config and invoke its
add_enrichment_table(...) for each Arc<dyn
logfwd_core::enrichment::EnrichmentTable> you construct or obtain, then move the
populated transform into the Pipeline.
- Around line 757-819: QueryAnalyzer::new() currently ignores JOIN ... ON ...
predicates when building ScanConfig, so log-side columns referenced only in join
conditions aren't requested; fix by walking join predicates in
QueryAnalyzer::new(), extract column refs on the primary/log side from ON
expressions (handle binary ops, AND/OR nesting and qualified names), add those
column names to the ScanConfig requested columns (same place projection/WHERE
columns are added), and ensure any alias handling (e.g., table qualifiers) maps
to the correct source table; add a regression test in the suite that performs a
keyed join (e.g., use SqlTransform::new("SELECT a.owner FROM logs LEFT JOIN
assets a ON logs.host_str = a.hostname") with an assets StaticTable/K8sPathTable
fixture) and assert the output includes correct rows and that host_str was used
(i.e., no error and join behaves as expected).
---
Duplicate comments:
In `@crates/logfwd-core/src/enrichment.rs`:
- Around line 169-180: The dedup logic in entries.sort_by and entries.dedup_by
currently uses (namespace, pod_name, container_name) and therefore drops entries
for the same pod name but different pod_uid; update both sort_by and dedup_by to
include pod_uid (or whichever UID field represents the pod identity) in the
tuple comparison so each distinct pod UID is preserved (i.e., sort_by should
compare (&a.namespace, &a.pod_name, &a.container_name, &a.pod_uid) and dedup_by
should check a.namespace == b.namespace && a.pod_name == b.pod_name &&
a.container_name == b.container_name && a.pod_uid == b.pod_uid).
- Around line 154-158: K8sPathTable::new currently sets data to None which
prevents SqlTransform::execute() from registering the enrichment table until
update_from_paths() runs; change K8sPathTable::new to initialize data with
Some(empty snapshot/batch) (i.e. an empty, but typed, snapshot the same shape
produced by update_from_paths()) so that SqlTransform::execute() sees snapshot()
as Some(_) and the k8s_pods table is registered at startup; ensure the empty
value matches the expected type used by snapshot()/update_from_paths() and by
any code that reads K8sPathTable::data.
- Around line 339-367: The function read_csv_to_batch currently uses
csv::ReaderBuilder::new().flexible(true) and then silently truncates records
longer than the header via the if i < num_cols check; change this to fail fast
by detecting overflow rows and returning an Err. Specifically, in
read_csv_to_batch, after obtaining a record (the result from
csv_reader.records()), check if record.len() > num_cols and return a formatted
Err (e.g., "CSV row has more fields than header: expected X, got Y") instead of
pushing/truncating; you can keep or remove csv::ReaderBuilder::flexible(true)
but ensure the overflow check is present before the loop that pushes into
columns and before the padding code so malformed wide rows cause an error rather
than silent truncation.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: a401a295-132b-47c3-bfe0-4f92a271b7a0
📒 Files selected for processing (2)
crates/logfwd-core/src/enrichment.rscrates/logfwd-transform/src/lib.rs
| pub fn add_enrichment_table( | ||
| &mut self, | ||
| table: Arc<dyn logfwd_core::enrichment::EnrichmentTable>, | ||
| ) { | ||
| let name = table.name(); | ||
| assert!( | ||
| name != "logs", | ||
| "enrichment table cannot be named 'logs' (reserved)" | ||
| ); | ||
| assert!( | ||
| !self.enrichment_tables.iter().any(|t| t.name() == name), | ||
| "duplicate enrichment table name: '{name}'" | ||
| ); | ||
| self.enrichment_tables.push(table); |
There was a problem hiding this comment.
Wire enrichment tables into Pipeline::from_config before merging.
SqlTransform can store providers now, but in crates/logfwd/src/pipeline.rs, Lines 55-105 still only call SqlTransform::new() and then move the transform into Pipeline without ever invoking add_enrichment_table(). In the current integration self.enrichment_tables stays empty, so env, host_info, and k8s_pods queries only work in these unit tests, not from the real pipeline path.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@crates/logfwd-transform/src/lib.rs` around lines 393 - 406,
Pipeline::from_config is creating a SqlTransform via SqlTransform::new() then
moving it into the Pipeline without calling SqlTransform::add_enrichment_table,
leaving self.enrichment_tables empty and breaking enrichment queries; modify
Pipeline::from_config to call add_enrichment_table for each enrichment provider
(the same ones configured/created in that function) before inserting the
SqlTransform into the Pipeline so that SqlTransform::enrichment_tables contains
env, host_info, k8s_pods providers at runtime; locate the SqlTransform instance
created by SqlTransform::new() in Pipeline::from_config and invoke its
add_enrichment_table(...) for each Arc<dyn
logfwd_core::enrichment::EnrichmentTable> you construct or obtain, then move the
populated transform into the Pipeline.
| #[test] | ||
| fn test_enrichment_cross_join() { | ||
| use logfwd_core::enrichment::StaticTable; | ||
|
|
||
| let batch = make_test_batch(); | ||
| let mut transform = | ||
| SqlTransform::new("SELECT logs.*, env.environment FROM logs CROSS JOIN env").unwrap(); | ||
|
|
||
| // Add a static enrichment table. | ||
| let env_table = Arc::new(StaticTable::new( | ||
| "env", | ||
| &[("environment".to_string(), "production".to_string())], | ||
| )); | ||
| transform.add_enrichment_table(env_table); | ||
|
|
||
| let result = transform.execute(batch).unwrap(); | ||
| assert_eq!(result.num_rows(), 4); | ||
|
|
||
| // Should have original columns plus "environment". | ||
| let env_col = result | ||
| .column_by_name("environment") | ||
| .expect("should have environment column") | ||
| .as_any() | ||
| .downcast_ref::<StringArray>() | ||
| .unwrap(); | ||
| for i in 0..4 { | ||
| assert_eq!(env_col.value(i), "production"); | ||
| } | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_enrichment_unused_table_no_error() { | ||
| use logfwd_core::enrichment::StaticTable; | ||
|
|
||
| let batch = make_test_batch(); | ||
| let table = Arc::new(StaticTable::new( | ||
| "unused", | ||
| &[("key".to_string(), "val".to_string())], | ||
| )); | ||
|
|
||
| let mut transform = SqlTransform::new("SELECT * FROM logs").unwrap(); | ||
| transform.add_enrichment_table(table); | ||
|
|
||
| // Enrichment table registered but not referenced in SQL — should not error. | ||
| let result = transform.execute(batch).unwrap(); | ||
| assert_eq!(result.num_rows(), 4); | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_enrichment_empty_table_skipped() { | ||
| use logfwd_core::enrichment::K8sPathTable; | ||
|
|
||
| let batch = make_test_batch(); | ||
| let k8s = Arc::new(K8sPathTable::new("k8s_pods")); | ||
| // Not loaded — snapshot() returns None. | ||
|
|
||
| let mut transform = SqlTransform::new("SELECT * FROM logs").unwrap(); | ||
| transform.add_enrichment_table(k8s); | ||
|
|
||
| // Should not error — empty table just skipped. | ||
| let result = transform.execute(batch).unwrap(); | ||
| assert_eq!(result.num_rows(), 4); | ||
| } |
There was a problem hiding this comment.
Keyed enrichment JOINs still miss the log-side columns.
These tests only exercise CROSS JOIN or unreferenced tables. QueryAnalyzer::new() still ignores JOIN ... ON ... expressions when building ScanConfig, so a query like SELECT a.owner FROM logs LEFT JOIN assets a ON logs.host_str = a.hostname will not request host_str unless it also appears in the projection or WHERE. Please walk join predicates and add a real keyed-join regression here before advertising JOIN support.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@crates/logfwd-transform/src/lib.rs` around lines 757 - 819,
QueryAnalyzer::new() currently ignores JOIN ... ON ... predicates when building
ScanConfig, so log-side columns referenced only in join conditions aren't
requested; fix by walking join predicates in QueryAnalyzer::new(), extract
column refs on the primary/log side from ON expressions (handle binary ops,
AND/OR nesting and qualified names), add those column names to the ScanConfig
requested columns (same place projection/WHERE columns are added), and ensure
any alias handling (e.g., table qualifiers) maps to the correct source table;
add a regression test in the suite that performs a keyed join (e.g., use
SqlTransform::new("SELECT a.owner FROM logs LEFT JOIN assets a ON logs.host_str
= a.hostname") with an assets StaticTable/K8sPathTable fixture) and assert the
output includes correct rows and that host_str was used (i.e., no error and join
behaves as expected).
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@crates/logfwd-ebpf-proto/Cargo.toml`:
- Around line 12-15: The Cargo.toml currently lists serde_json under
[dependencies] but the comment says it's only used for tests; move serde_json =
"1" from the [dependencies] section into the [dev-dependencies] section so it is
only included for test builds. Update the Cargo.toml by removing the serde_json
line from [dependencies] and adding the same version line under
[dev-dependencies] (retain the package name and version string).
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 5201f29e-d03c-4419-9910-14e7bcea9998
⛔ Files ignored due to path filters (1)
Cargo.lockis excluded by!**/*.lock
📒 Files selected for processing (6)
Cargo.tomlcrates/logfwd-ebpf-proto/Cargo.tomlcrates/logfwd-ebpf-proto/src/bpf/mod.rscrates/logfwd-ebpf-proto/src/bpf/pipe_capture.rscrates/logfwd-ebpf-proto/src/common.rscrates/logfwd-ebpf-proto/src/lib.rs
| [dependencies] | ||
| serde_json = "1" # for test helpers only | ||
|
|
||
| [dev-dependencies] |
There was a problem hiding this comment.
Move serde_json to [dev-dependencies].
Comment says "for test helpers only" but it's in [dependencies]. This pulls serde_json into the main dependency graph unnecessarily.
Proposed fix
[dependencies]
-serde_json = "1" # for test helpers only
[dev-dependencies]
+serde_json = "1"📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| [dependencies] | |
| serde_json = "1" # for test helpers only | |
| [dev-dependencies] | |
| [dependencies] | |
| [dev-dependencies] | |
| serde_json = "1" |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@crates/logfwd-ebpf-proto/Cargo.toml` around lines 12 - 15, The Cargo.toml
currently lists serde_json under [dependencies] but the comment says it's only
used for tests; move serde_json = "1" from the [dependencies] section into the
[dev-dependencies] section so it is only included for test builds. Update the
Cargo.toml by removing the serde_json line from [dependencies] and adding the
same version line under [dev-dependencies] (retain the package name and version
string).
a787c08 to
8649dec
Compare
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@crates/logfwd-core/fuzz/fuzz_targets/pipe_event.rs`:
- Around line 13-26: The current local PipeEvent struct and the direct unsafe
cast from a raw byte pointer to &PipeEvent are incorrect: replace the local
PipeEvent usage with the canonical PipeWriteEvent from
crates::logfwd_ebpf_proto::common and stop doing an aligned reference cast;
instead use std::ptr::read_unaligned() to read the bytes into a PipeWriteEvent.
Update all field accesses to use the canonical names (write_len and
captured_len) and replace MAX_DATA with MAX_CAPTURE_BYTES so the fuzzer matches
the real ring-buffer layout and avoids undefined behavior.
In `@crates/logfwd-core/fuzz/fuzz_targets/scanner.rs`:
- Around line 30-31: Remove the tautological asserts on usize
(assert!(batch.num_columns() >= 0) / num_rows) and replace them with actual
invariants: verify per-column length consistency by iterating over columns and
asserting batch.column(i).len() == batch.num_rows() for every i, and apply the
same check to batch2 as well; optionally also assert that batch.num_columns() ==
batch2.num_columns() and batch.num_rows() == batch2.num_rows() if those
cross-batch invariants are intended (use the batch, batch2, num_columns,
num_rows, and column(...) symbols to locate the code).
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 186f2528-b342-4ea1-b9d4-b327b7b7238d
⛔ Files ignored due to path filters (1)
crates/logfwd-core/fuzz/Cargo.lockis excluded by!**/*.lock
📒 Files selected for processing (17)
crates/logfwd-core/fuzz/.gitignorecrates/logfwd-core/fuzz/Cargo.tomlcrates/logfwd-core/fuzz/artifacts/fuzz_scanner/crash-47eef4b87ea82d35dc7da973c3391a3f15800a30crates/logfwd-core/fuzz/artifacts/fuzz_scanner/minimized-from-2d5b3aeca551c891189e4856971b929d4fefab9ecrates/logfwd-core/fuzz/artifacts/fuzz_scanner/minimized-from-47eef4b87ea82d35dc7da973c3391a3f15800a30crates/logfwd-core/fuzz/artifacts/fuzz_scanner/minimized-from-5258ff094acbab2d15a3be0c8f89be128454d4bccrates/logfwd-core/fuzz/artifacts/fuzz_scanner/minimized-from-878968522d930a45a393a40ae4b00fe2a2b95e1ccrates/logfwd-core/fuzz/artifacts/fuzz_scanner/minimized-from-ab479c7a0300377f0cdfa22164d1db0a0a326df2crates/logfwd-core/fuzz/artifacts/fuzz_scanner/minimized-from-aed4337a5c6da1ffddc9b3acc7c42a86377e2420crates/logfwd-core/fuzz/artifacts/fuzz_scanner/minimized-from-cb529b0dba4299132befe93a913113cb667acaa2crates/logfwd-core/fuzz/artifacts/fuzz_scanner/minimized-from-e7f9636f4b6c13489c3188335bd2e0a3d33f2a04crates/logfwd-core/fuzz/fuzz_targets/cri_path_parser.rscrates/logfwd-core/fuzz/fuzz_targets/csv_loader.rscrates/logfwd-core/fuzz/fuzz_targets/format_parser.rscrates/logfwd-core/fuzz/fuzz_targets/jsonl_loader.rscrates/logfwd-core/fuzz/fuzz_targets/pipe_event.rscrates/logfwd-core/fuzz/fuzz_targets/scanner.rs
| // Re-define PipeEvent here to avoid depending on the ebpf-proto crate | ||
| // (which may have different compilation requirements). | ||
| const MAX_DATA: usize = 4096; | ||
|
|
||
| #[repr(C)] | ||
| #[derive(Clone, Copy)] | ||
| struct PipeEvent { | ||
| pid: u32, | ||
| tgid: u32, | ||
| cgroup_id: u64, | ||
| len: u32, | ||
| captured: u32, | ||
| data: [u8; MAX_DATA], | ||
| } |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "=== Local fuzz struct and cast ==="
sed -n '13,38p' crates/logfwd-core/fuzz/fuzz_targets/pipe_event.rs
echo
echo "=== Canonical PipeWriteEvent ==="
sed -n '1,80p' crates/logfwd-ebpf-proto/src/common.rs
echo
echo "=== Field name usage in fuzz target ==="
rg -n 'event\.(len|captured|write_len|captured_len|stream)' crates/logfwd-core/fuzz/fuzz_targets/pipe_event.rsRepository: strawgate/memagent
Length of output: 2205
Unsafe unaligned cast and struct layout mismatch with canonical event.
Line 37 casts the raw byte pointer directly to &PipeEvent, which violates Rust's alignment guarantees and invokes undefined behavior. Additionally, the local struct omits stream and _pad fields and uses different field names (len/captured vs. write_len/captured_len) compared to the canonical PipeWriteEvent in crates/logfwd-ebpf-proto/src/common.rs, meaning the fuzzer does not exercise the actual ring-buffer layout.
Use std::ptr::read_unaligned() with the canonical struct to fix both issues:
Suggested fix
-// Re-define PipeEvent here to avoid depending on the ebpf-proto crate
-const MAX_DATA: usize = 4096;
-
-#[repr(C)]
-#[derive(Clone, Copy)]
-struct PipeEvent {
- pid: u32,
- tgid: u32,
- cgroup_id: u64,
- len: u32,
- captured: u32,
- data: [u8; MAX_DATA],
-}
+use logfwd_ebpf_proto::common::{PipeWriteEvent, MAX_CAPTURE_BYTES};Then replace unsafe reference cast with:
- let event = unsafe { &*(data.as_ptr() as *const PipeEvent) };
+ let event = unsafe { std::ptr::read_unaligned(data.as_ptr() as *const PipeWriteEvent) };Update field accesses: .len → .write_len, .captured → .captured_len, MAX_DATA → MAX_CAPTURE_BYTES.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@crates/logfwd-core/fuzz/fuzz_targets/pipe_event.rs` around lines 13 - 26, The
current local PipeEvent struct and the direct unsafe cast from a raw byte
pointer to &PipeEvent are incorrect: replace the local PipeEvent usage with the
canonical PipeWriteEvent from crates::logfwd_ebpf_proto::common and stop doing
an aligned reference cast; instead use std::ptr::read_unaligned() to read the
bytes into a PipeWriteEvent. Update all field accesses to use the canonical
names (write_len and captured_len) and replace MAX_DATA with MAX_CAPTURE_BYTES
so the fuzzer matches the real ring-buffer layout and avoids undefined behavior.
| assert!(batch.num_columns() >= 0); | ||
| assert!(batch.num_rows() >= 0); |
There was a problem hiding this comment.
Replace tautological non-negative assertions with meaningful invariants.
These checks are always true (usize). They don’t validate behavior. Reuse the per-column length consistency check for batch2 as well to increase bug-finding signal.
Suggested tightening
- assert!(batch.num_columns() >= 0);
- assert!(batch.num_rows() >= 0);
+ // Structural sanity: each column length must match row count.
+ for col_idx in 0..batch.num_columns() {
+ assert_eq!(batch.column(col_idx).len(), batch.num_rows());
+ }
@@
- assert!(batch2.num_rows() >= 0);
+ for col_idx in 0..batch2.num_columns() {
+ assert_eq!(batch2.column(col_idx).len(), batch2.num_rows());
+ }Also applies to: 64-64
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@crates/logfwd-core/fuzz/fuzz_targets/scanner.rs` around lines 30 - 31, Remove
the tautological asserts on usize (assert!(batch.num_columns() >= 0) / num_rows)
and replace them with actual invariants: verify per-column length consistency by
iterating over columns and asserting batch.column(i).len() == batch.num_rows()
for every i, and apply the same check to batch2 as well; optionally also assert
that batch.num_columns() == batch2.num_columns() and batch.num_rows() ==
batch2.num_rows() if those cross-batch invariants are intended (use the batch,
batch2, num_columns, num_rows, and column(...) symbols to locate the code).
- K8sPathTable starts with empty batch (not None) so SQL queries don't fail with "table not found" before first update - Include pod_uid in dedup key to preserve distinct pods with same name - CSV loader rejects rows wider than header (was silently truncating) - Updated tests for new K8sPathTable initial state - Remove accidentally committed fuzz corpus/target files Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
a7cd838 to
ffe3cb6
Compare
Summary
Introduces
EnrichmentTabletrait and providers that produce Arrow RecordBatches registered as DataFusion lookup tables alongsidelogs.Providers
StaticTableHostInfoTablehost_infoK8sPathTableCsvFileTablereload()JsonLinesFileTablereload()Usage
SqlTransform gains
add_enrichment_table(). Tables registered viaArc<RwLock<Option<RecordBatch>>>— pipeline reads once per batch, background refresh doesn't block.Test plan
🤖 Generated with Claude Code