feat: enrichment tables, processors, and CSV geo backend#2089
Conversation
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
WalkthroughAdds many new snapshot and reloadable enrichment tables and configs (env_vars, kv_file, csv, jsonl, geo_database with CsvRange), new one-row system enrichments (process_info, network_info, container_info, k8s_cluster_info), and updates k8s_path/table_name semantics. Implements periodic reloads for reloadable enrichments and geo DBs, adds Blocklist and HttpEnrich processors, exposes a CSV-range GeoIP backend (CsvRangeDatabase) through UDF re-exports, tightens validation (non-empty names/paths and refresh_interval > 0), updates runtime pipeline wiring to track reload tasks, and expands docs, examples, and tests. Possibly related PRs
Caution Pre-merge checks failedPlease resolve all errors before merging. Addressing warnings is optional.
❌ Failed checks (2 errors, 2 warnings)
✅ Passed checks (1 passed)
Comment |
ApprovabilityVerdict: Needs human review This PR introduces substantial new feature capability including multiple enrichment table types, two new processors (one performing HTTP network I/O with caching), a CSV geo database backend, container/K8s runtime detection, and procfs network discovery. The scope and nature of these changes—new components, new user-facing behavior, network I/O, and system file parsing—warrant human review. Additionally, two unresolved review comments identify logic bugs in container runtime detection and example configuration. You can customize Macroscope's approvability policy. Learn more. |
There was a problem hiding this comment.
Pull request overview
Adds a broad enrichment framework to logfwd, expanding what can be joined into DataFusion SQL (snapshot tables + GeoIP backends) and introducing pre-SQL processor primitives intended for I/O-based enrichment.
Changes:
- Adds multiple new enrichment table types (env vars, process/network/container/K8s cluster info, KV file) plus auto-reload support for file-backed tables and GeoIP DBs.
- Introduces a CSV IP-range GeoIP backend (
csv_range) alongside MMDB and wires it through config + runtime. - Adds new processor implementations (blocklist + HTTP enrichment), plus examples and documentation updates.
Reviewed changes
Copilot reviewed 20 out of 21 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
| examples/use-cases/nginx-geo-enriched-to-otlp.yaml | New example config demonstrating GeoIP enrichment pattern. |
| examples/use-cases/kubernetes-enriched-to-otlp.yaml | New example config showing multi-table K8s/host/container enrichment. |
| examples/use-cases/app-with-metadata-enrichment-to-otlp.yaml | New example config demonstrating env/file metadata enrichment pattern. |
| dev-docs/research/enrichment-architecture-plan-2026-04.md | Adds a research/architecture plan documenting snapshot vs live enrichment split. |
| dev-docs/research/README.md | Links the new enrichment research doc. |
| crates/logfwd/src/transform.rs | Re-exports CsvRangeDatabase for external use. |
| crates/logfwd/src/main.rs | Extends pipeline validation to support new enrichment variants + CSV-range Geo DB. |
| crates/logfwd-transform/src/udf/mod.rs | Registers CSV range geo backend module and re-export. |
| crates/logfwd-transform/src/udf/csv_range_geo.rs | Implements CSV IP-range GeoIP backend + tests. |
| crates/logfwd-transform/src/enrichment.rs | Adds reloadable Geo DB wrapper + new enrichment tables (Env/KV/Process/Network/Container/K8s cluster) + tests. |
| crates/logfwd-runtime/src/transform.rs | Re-exports CsvRangeDatabase for runtime crate. |
| crates/logfwd-runtime/src/processor/mod.rs | Exposes new processor modules (blocklist/http_enrich). |
| crates/logfwd-runtime/src/processor/http_enrich.rs | Adds batch-blocking HTTP enrichment processor with cache + tests. |
| crates/logfwd-runtime/src/processor/blocklist.rs | Adds CSV-backed blocklist processor + tests. |
| crates/logfwd-runtime/src/pipeline/build.rs | Wires new enrichment table types + implements refresh_interval reload tasks; updates tests. |
| crates/logfwd-runtime/Cargo.toml | Adds ureq + csv deps to support new processors. |
| crates/logfwd-config/src/validate.rs | Adds validation for refresh_interval and new enrichment config variants. |
| crates/logfwd-config/src/types.rs | Extends config schema: new enrichment variants, CSV/JSONL refresh_interval, geo format csv_range. |
| crates/logfwd-config/src/lib.rs | Re-exports new config types. |
| book/src/content/docs/configuration/reference.mdx | Updates enrichment documentation; also modifies input reference content. |
| Cargo.lock | Updates lockfile for new deps and dependency bumps. |
Comments suppressed due to low confidence (1)
crates/logfwd-runtime/src/pipeline/build.rs:723
- The unit tests that asserted
from_configrejects emptyinputs/outputs(andpoll_interval_ms == 0) were removed in this change, but the corresponding validation logic still exists above. Please restore equivalent coverage so regressions in these basic config invariants are caught.
fn minimal_config(path: String) -> PipelineConfig {
PipelineConfig {
inputs: vec![minimal_input(path)],
outputs: vec![minimal_output()],
transform: None,
enrichment: vec![],
resource_attrs: std::collections::HashMap::new(),
workers: None,
batch_target_bytes: None,
batch_timeout_ms: None,
poll_interval_ms: None,
}
}
#[test]
fn from_config_uses_explicit_data_dir_for_checkpoint_store() {
let dir = tempfile::tempdir().expect("tempdir");
let log_path = dir.path().join("in.log");
let data_dir = dir.path().join("state");
std::fs::write(&log_path, b"{\"level\":\"INFO\"}\n").expect("write input");
let cfg = PipelineConfig {
inputs: vec![minimal_input(log_path.to_string_lossy().into_owned())],
transform: None,
outputs: vec![minimal_output()],
enrichment: Vec::new(),
resource_attrs: Default::default(),
workers: None,
batch_target_bytes: None,
batch_timeout_ms: None,
poll_interval_ms: None,
};
let pipeline = Pipeline::from_config_with_data_dir(
"p",
&cfg,
&logfwd_test_utils::test_meter(),
None,
Some(&data_dir),
)
.expect("pipeline should build with explicit data dir");
assert!(
pipeline.checkpoint_store.is_some(),
"explicit data dir should open a checkpoint store"
);
assert!(
data_dir.join("p").is_dir(),
"checkpoint store should be rooted under the explicit data dir"
);
}
#[test]
fn from_config_rejects_zero_batch_and_poll_timeouts() {
let dir = tempfile::tempdir().expect("tempdir");
let log_path = dir.path().join("in.log");
std::fs::write(&log_path, b"{\"level\":\"INFO\"}\n").expect("write input");
let cfg = PipelineConfig {
inputs: vec![minimal_input(log_path.to_string_lossy().into_owned())],
transform: None,
outputs: vec![minimal_output()],
enrichment: Vec::new(),
resource_attrs: Default::default(),
workers: None,
batch_target_bytes: None,
batch_timeout_ms: Some(0),
poll_interval_ms: None,
};
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 8b767f190c
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
There was a problem hiding this comment.
Actionable comments posted: 7
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (4)
crates/logfwd-config/src/validate.rs (1)
751-873: 🧹 Nitpick | 🔵 TrivialAdd explicit regression tests for the new enrichment validation paths.
These new branches add user-facing validation rules (
refresh_interval > 0, blankenv_vars.prefix,kv_filepath/table checks), but the provided config tests do not cover them directly. Please add focused validation tests so these rules do not regress silently.Based on learnings, "When config fields change, update parsing code,
book/src/content/docs/configuration/reference.mdx, example configs, and validation tests. Seedev-docs/CHANGE_MAP.mdfor full co-change requirements."🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/logfwd-config/src/validate.rs` around lines 751 - 873, Add regression unit tests that exercise each new EnrichmentConfig validation branch: cover refresh_interval == Some(0) for Geo/Csv/Jsonl/KvFile, blank path checks (absolute path non-existence) for Csv/Jsonl/KvFile, empty table_name for Static/Csv/Jsonl/K8sPath/EnvVars/KvFile, and empty EnvVars.prefix. Locate the validation function that inspects EnrichmentConfig (the code handling EnrichmentConfig::Geo, ::Static, ::Csv, ::Jsonl, ::K8sPath, ::EnvVars, ::KvFile in validate.rs) and create focused tests that build minimal pipeline configs triggering each error message and assert ConfigError::Validation with the expected message; add tests alongside the existing config validation tests so future changes to parsing/refs require updating tests, docs, and examples per CHANGE_MAP guidance.book/src/content/docs/configuration/reference.mdx (1)
931-942:⚠️ Potential issue | 🟠 MajorMove this
enrichmentblock underpipelines.app.In the advanced layout,
enrichmentis part of each pipeline config, not a top-level sibling ofpipelines. As written, the “Complete example” is not a valid multi-pipeline config.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@book/src/content/docs/configuration/reference.mdx` around lines 931 - 942, The `enrichment` block is currently a top-level sibling of `pipelines` but must be nested under the specific pipeline (e.g., pipelines.app) in the advanced layout; move the entire enrichment mapping (the list of types and the static table_name/labels) so it becomes a child of the pipelines.app configuration rather than a top-level key, ensuring the final YAML places enrichment under the pipelines -> app section to produce a valid multi-pipeline config.crates/logfwd-config/src/types.rs (2)
675-690:⚠️ Potential issue | 🟠 MajorThe new processor layer is still unreachable from YAML.
PipelineConfigstill has no processor list/config field, so the newBlocklistProcessorandHttpEnrichProcessoradded in this PR cannot be expressed in config and never enter pipeline construction.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/logfwd-config/src/types.rs` around lines 675 - 690, PipelineConfig is missing a processors field so the new BlocklistProcessor and HttpEnrichProcessor cannot be configured via YAML; add a processors field to the struct (e.g., pub processors: Vec<ProcessorConfig>) with the same serde attributes used for inputs/outputs (#[serde(default, deserialize_with = "deserialize_one_or_many")]) and default so existing configs remain compatible, ensure ProcessorConfig enum/type is referenced here (and that it includes BlocklistProcessor and HttpEnrichProcessor) so pipeline construction can read and instantiate processors from PipelineConfig.
559-673: 🛠️ Refactor suggestion | 🟠 MajorAdd rustdoc for the new public config surface.
GeoDatabaseConfig,StaticEnrichmentConfig,HostInfoConfig,K8sPathConfig,CsvEnrichmentConfig,JsonlEnrichmentConfig,ProcessInfoConfig,KvFileEnrichmentConfig,NetworkInfoConfig,ContainerInfoConfig,K8sClusterInfoConfig, andEnrichmentConfigare public additions, but most of them still have no item-level///docs.As per coding guidelines, "All new public items must have /// doc comments describing behavior, not just the name."
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/logfwd-config/src/types.rs` around lines 559 - 673, Add item-level /// rustdoc comments for each newly public config type and the EnrichmentConfig enum to describe their purpose and behavior: document GeoDatabaseConfig (format, path, refresh behavior), StaticEnrichmentConfig (table_name, labels), HostInfoConfig, K8sPathConfig (default_k8s_table_name behavior), CsvEnrichmentConfig and JsonlEnrichmentConfig (table_name, path, optional refresh_interval semantics), EnvVarsEnrichmentConfig (prefix behavior), ProcessInfoConfig, KvFileEnrichmentConfig (path and refresh semantics), NetworkInfoConfig, ContainerInfoConfig, K8sClusterInfoConfig, and for the EnrichmentConfig enum document each variant (GeoDatabase, Static, HostInfo, K8sPath, Csv, Jsonl, EnvVars, ProcessInfo, KvFile, NetworkInfo, ContainerInfo, K8sClusterInfo) explaining what config each variant expects; keep comments concise, describe fields’ runtime effects and any defaults (e.g., refresh semantics, prefix stripping for EnvVars, default_k8s_table_name) and include brief examples where helpful.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@book/src/content/docs/configuration/reference.mdx`:
- Around line 603-606: The documentation for primary_ipv4/primary_ipv6 is
misleading: update the description of NetworkInfoTable's primary_ipv4 and
primary_ipv6 to state they return the lexicographically first non-loopback
address (after sorting discovered addresses), not the default-route interface
address, and clarify that all_ipv4/all_ipv6 provide the full comma-separated
lists for multihomed hosts; reference the symbols NetworkInfoTable,
primary_ipv4, primary_ipv6, all_ipv4, and all_ipv6 in the updated lines.
In `@crates/logfwd-runtime/src/processor/blocklist.rs`:
- Around line 264-285: In append_pre_built_columns, detect if the batch already
has fields named "{prefix}_match" or "{prefix}_category" before constructing the
new schema: compute match_name and cat_name, iterate batch.schema().fields() (or
field names) to see if either exists, and if so return
Err(ProcessorError::Permanent(...)) with a clear message mentioning the
conflicting name(s) instead of proceeding; otherwise continue to build the
schema and record batch as currently implemented.
In `@crates/logfwd-runtime/src/processor/http_enrich.rs`:
- Around line 347-443: Compute json_col_name and status_col_name, then check
batch.schema().fields() (or use batch.column_by_name) for existing fields with
those names and fail fast if found (return Err(ProcessorError::Permanent(...))
with a clear message) before mutating or appending columns; apply the same check
in both the num_rows == 0 branch and the main column-build branch (the code
paths that push new Field::new(&json_col_name, ...) and
Field::new(&status_col_name, ...)) so you never produce a duplicate-name schema.
In `@crates/logfwd-runtime/src/transform.rs`:
- Line 14: Add a triple-slash doc comment above the public re-export of
CsvRangeDatabase in transform.rs: describe what CsvRangeDatabase represents and
its behavior when used at runtime (purpose, key semantics, any important
constraints or panics/errors, and a short usage note), e.g. a one- or
two-sentence summary plus a brief remark about expected inputs/outputs and
visibility as a runtime-facing re-export; ensure the comment precedes the line
"pub use logfwd_transform::udf::CsvRangeDatabase;" and follows the project's ///
doc style.
In `@crates/logfwd-transform/src/udf/mod.rs`:
- Around line 5-12: Add missing doc comments for the new public UDF surface: add
/// doc comments above each pub mod declaration (csv_range_geo, geo_lookup,
grok, hash, json_extract, regexp_extract) describing what the module provides
and its behavior/usage, and add a /// comment above the pub use CsvRangeDatabase
explaining what CsvRangeDatabase represents, its primary responsibilities,
input/output contract, and any important invariants; ensure the comments follow
project style (behavior-focused, not just the name) and cover public API
expectations so new public items are documented.
In `@crates/logfwd/src/transform.rs`:
- Line 14: Add a triple-slash doc comment above the public re-export
`CsvRangeDatabase` in this facade module describing what the type does and how
it should be used (e.g., that it is a CSV-backed range-indexed database UDF,
expected input/output shape, any important invariants or configuration options).
Place the comment immediately above `pub use
logfwd_transform::udf::CsvRangeDatabase;` and keep it concise but descriptive to
satisfy the project guideline that all new public items have /// documentation.
In `@examples/use-cases/kubernetes-enriched-to-otlp.yaml`:
- Around line 21-55: The enrichment list includes k8s_path and process_info but
the SQL transform never references them (see enrichment types "k8s_path" and
"process_info" and the "transform" SELECT), so remove the unused enrichment
entries or update the transform to JOIN/SELECT their fields; specifically either
delete the k8s_path and process_info entries from enrichment, or add CROSS JOIN
k8s_path and/or CROSS JOIN process_info and select the needed columns (e.g.,
k8s_path.namespace or process_info.*) in the SELECT so the configured
enrichments are actually used.
---
Outside diff comments:
In `@book/src/content/docs/configuration/reference.mdx`:
- Around line 931-942: The `enrichment` block is currently a top-level sibling
of `pipelines` but must be nested under the specific pipeline (e.g.,
pipelines.app) in the advanced layout; move the entire enrichment mapping (the
list of types and the static table_name/labels) so it becomes a child of the
pipelines.app configuration rather than a top-level key, ensuring the final YAML
places enrichment under the pipelines -> app section to produce a valid
multi-pipeline config.
In `@crates/logfwd-config/src/types.rs`:
- Around line 675-690: PipelineConfig is missing a processors field so the new
BlocklistProcessor and HttpEnrichProcessor cannot be configured via YAML; add a
processors field to the struct (e.g., pub processors: Vec<ProcessorConfig>) with
the same serde attributes used for inputs/outputs (#[serde(default,
deserialize_with = "deserialize_one_or_many")]) and default so existing configs
remain compatible, ensure ProcessorConfig enum/type is referenced here (and that
it includes BlocklistProcessor and HttpEnrichProcessor) so pipeline construction
can read and instantiate processors from PipelineConfig.
- Around line 559-673: Add item-level /// rustdoc comments for each newly public
config type and the EnrichmentConfig enum to describe their purpose and
behavior: document GeoDatabaseConfig (format, path, refresh behavior),
StaticEnrichmentConfig (table_name, labels), HostInfoConfig, K8sPathConfig
(default_k8s_table_name behavior), CsvEnrichmentConfig and JsonlEnrichmentConfig
(table_name, path, optional refresh_interval semantics), EnvVarsEnrichmentConfig
(prefix behavior), ProcessInfoConfig, KvFileEnrichmentConfig (path and refresh
semantics), NetworkInfoConfig, ContainerInfoConfig, K8sClusterInfoConfig, and
for the EnrichmentConfig enum document each variant (GeoDatabase, Static,
HostInfo, K8sPath, Csv, Jsonl, EnvVars, ProcessInfo, KvFile, NetworkInfo,
ContainerInfo, K8sClusterInfo) explaining what config each variant expects; keep
comments concise, describe fields’ runtime effects and any defaults (e.g.,
refresh semantics, prefix stripping for EnvVars, default_k8s_table_name) and
include brief examples where helpful.
In `@crates/logfwd-config/src/validate.rs`:
- Around line 751-873: Add regression unit tests that exercise each new
EnrichmentConfig validation branch: cover refresh_interval == Some(0) for
Geo/Csv/Jsonl/KvFile, blank path checks (absolute path non-existence) for
Csv/Jsonl/KvFile, empty table_name for Static/Csv/Jsonl/K8sPath/EnvVars/KvFile,
and empty EnvVars.prefix. Locate the validation function that inspects
EnrichmentConfig (the code handling EnrichmentConfig::Geo, ::Static, ::Csv,
::Jsonl, ::K8sPath, ::EnvVars, ::KvFile in validate.rs) and create focused tests
that build minimal pipeline configs triggering each error message and assert
ConfigError::Validation with the expected message; add tests alongside the
existing config validation tests so future changes to parsing/refs require
updating tests, docs, and examples per CHANGE_MAP guidance.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository YAML (base), Organization UI (inherited)
Review profile: ASSERTIVE
Plan: Pro Plus
Run ID: 5af18fc1-4d42-46b5-a767-87aaa702f124
⛔ Files ignored due to path filters (1)
Cargo.lockis excluded by!**/*.lock
📒 Files selected for processing (20)
book/src/content/docs/configuration/reference.mdxcrates/logfwd-config/src/lib.rscrates/logfwd-config/src/types.rscrates/logfwd-config/src/validate.rscrates/logfwd-runtime/Cargo.tomlcrates/logfwd-runtime/src/pipeline/build.rscrates/logfwd-runtime/src/processor/blocklist.rscrates/logfwd-runtime/src/processor/http_enrich.rscrates/logfwd-runtime/src/processor/mod.rscrates/logfwd-runtime/src/transform.rscrates/logfwd-transform/src/enrichment.rscrates/logfwd-transform/src/udf/csv_range_geo.rscrates/logfwd-transform/src/udf/mod.rscrates/logfwd/src/main.rscrates/logfwd/src/transform.rsdev-docs/research/README.mddev-docs/research/enrichment-architecture-plan-2026-04.mdexamples/use-cases/app-with-metadata-enrichment-to-otlp.yamlexamples/use-cases/kubernetes-enriched-to-otlp.yamlexamples/use-cases/nginx-geo-enriched-to-otlp.yaml
- Add tracing::warn else branches to all 4 tokio::spawn guards in build.rs - Fix O(n²) missing-key detection in HTTP enrich fetch_missing (scope HashSet to current chunk only) - Add output column collision checks in blocklist and HTTP enrich processors - Add doc comments to CsvRangeDatabase re-exports (3 files) - Restore host_metrics input section and table row in reference.mdx - Document primary_ip lexicographic selection rule in reference.mdx - Fix kubernetes example: use k8s_path and process_info in SQL query - Improve env test safety comment (nextest process isolation) Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
There was a problem hiding this comment.
Actionable comments posted: 8
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
crates/logfwd-runtime/src/pipeline/build.rs (1)
229-318: 🧹 Nitpick | 🔵 TrivialConsistent reload patterns across CSV and JSONL enrichments.
The reload loop logic is duplicated but straightforward. Consider extracting a helper if more file-backed enrichment types are added.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/logfwd-runtime/src/pipeline/build.rs` around lines 229 - 318, The CSV and JSONL reload loops duplicate the same async spawn logic; extract a helper like spawn_enrichment_reload_task that takes the Arc table, the table name (String) and interval_secs and contains the Handle::try_current() check, tokio::time::interval ticker, tokio::task::spawn_blocking(move || t.reload()) matching and tracing calls; make the helper generic over the table type (or accept a trait object exposing reload()) so you can replace the duplicated loop around Csv/JsonLines creation sites (e.g. around JsonLinesFileTable::new, the CSV table creation, and the t.reload() calls) with a single call to spawn_enrichment_reload_task(table, name, interval_secs).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@crates/logfwd-runtime/src/processor/blocklist.rs`:
- Around line 378-470: Tests are missing coverage for three hardening branches:
unsupported-type error, duplicate-key conflict rejection, and output-column
collision guard; add unit tests that call BlocklistProcessor::from_reader and
process to exercise these branches. Specifically, add tests named like
unsupported_key_type_returns_error (pass a non-UTF8/unsupported DataType in the
CSV/schema to trigger the unsupported-type error path in
BlocklistProcessor::from_reader), duplicate_key_conflict_rejected (create a CSV
with duplicate keys or conflicting entries to trigger the duplicate-key
rejection logic when constructing the processor), and
output_column_collision_is_error (construct a processor where the output column
names "bl_match"/"bl_category" collide with existing schema fields and assert
from_reader or process returns an error); use BlocklistProcessor::from_reader,
BlocklistProcessor::process, and the metadata helpers shown in existing tests to
locate and exercise the relevant branches.
- Around line 139-145: The None arm of the column lookup in blocklist.rs
currently treats a missing source_column as “no matches” and appends columns,
which silently disables the blocklist; change this to fail fast like the
unsupported-type branch by returning an error (including the missing column name
from self.source_column) instead of calling append_columns so schema drift/typos
are surfaced; update the match arm that handles
batch.column_by_name(&self.source_column) to produce an explicit error (with
clear context referencing the source_column and the blocklist operation) rather
than returning Ok(smallvec![enriched]).
- Around line 18-27: The module docs reference a stale constructor
BlocklistProcessor::new; update the docs to point to the actual public APIs
(BlocklistProcessor::from_reader and BlocklistProcessor::open) and, where
applicable, align the doc comment with CONFIG_REFERENCE.md conventions — replace
the "call `BlocklistProcessor::new` directly" text with guidance to use
`BlocklistProcessor::from_reader` for in-memory readers or
`BlocklistProcessor::open` for file paths, and ensure the doc comment format
matches the project's CONFIG_REFERENCE.md style.
- Around line 273-289: The new schema construction drops existing metadata; when
appending fields to batch.schema() preserve metadata by using
Schema::new_with_metadata instead of Schema::new. Locate the block that reads
let schema = batch.schema() and builds fields, then replace the final
Schema::new(fields) call with Schema::new_with_metadata(fields,
schema.metadata().clone()) (or equivalent) so the new Arc<Schema> includes the
original schema.metadata(); keep the checks for match_name and cat_name and the
Field pushes unchanged.
In `@crates/logfwd-runtime/src/processor/http_enrich.rs`:
- Around line 521-545: The helper function named urlencoded violates the
verb_noun naming rule; rename the function urlencoded to a verb_noun name such
as percent_encode_key (or encode_url_key) and update all call sites (references
to urlencoded) accordingly, preserving the current implementation and signature
so behavior and ownership remain unchanged; ensure any module exports or uses in
http_enrich.rs are updated to the new identifier.
- Around line 229-246: The current HTTP enrich path marks any non-empty 200
response as LookupResult::Hit, which caches/returns arbitrary text; modify the
resp handling in the same block (the code that reads
resp.into_body().as_reader() with self.config.max_body_bytes) to validate the
collected buf as JSON before emitting LookupResult::Hit (use
serde_json::from_str or equivalent to parse/validate), keep the existing
size-exceeded path as LookupResult::Error, and classify non-empty but non-JSON
bodies as LookupResult::Miss or return a LookupResult::Error with a clear
message instead of returning Hit.
- Around line 367-375: The schema rebuilds in http_enrich.rs (the blocks that
create `fields`, push the JSON and status `Field`s, then call
`Schema::new(fields)` and `RecordBatch::new_empty(schema)`) drop existing
metadata; replace both uses of `Schema::new(fields)` with
`Schema::new_with_metadata(fields, batch.schema().metadata().clone())` (the same
change in the other block around the second reconstruction at lines 442–449) so
the new schema preserves `batch.schema().metadata()` before creating the empty
`RecordBatch`.
In `@examples/use-cases/kubernetes-enriched-to-otlp.yaml`:
- Around line 36-61: The JOIN uses a non-existent column kp.path and l._path;
update the k8s_path reference to the actual column kp.log_path_prefix and either
switch l._path to the correct logs column (e.g., l.path) or add a clear
TODO/comment that this is forward-looking until issue `#1346` adds l._path; locate
the CROSS/LEFT JOIN involving k8s_path kp and logs l to make the replacement or
add the explanatory comment so the SQL matches the real schema.
---
Outside diff comments:
In `@crates/logfwd-runtime/src/pipeline/build.rs`:
- Around line 229-318: The CSV and JSONL reload loops duplicate the same async
spawn logic; extract a helper like spawn_enrichment_reload_task that takes the
Arc table, the table name (String) and interval_secs and contains the
Handle::try_current() check, tokio::time::interval ticker,
tokio::task::spawn_blocking(move || t.reload()) matching and tracing calls; make
the helper generic over the table type (or accept a trait object exposing
reload()) so you can replace the duplicated loop around Csv/JsonLines creation
sites (e.g. around JsonLinesFileTable::new, the CSV table creation, and the
t.reload() calls) with a single call to spawn_enrichment_reload_task(table,
name, interval_secs).
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository YAML (base), Organization UI (inherited)
Review profile: ASSERTIVE
Plan: Pro Plus
Run ID: 21579301-6d59-43e7-af5a-f4d35c49bc09
📒 Files selected for processing (9)
book/src/content/docs/configuration/reference.mdxcrates/logfwd-runtime/src/pipeline/build.rscrates/logfwd-runtime/src/processor/blocklist.rscrates/logfwd-runtime/src/processor/http_enrich.rscrates/logfwd-runtime/src/transform.rscrates/logfwd-transform/src/enrichment.rscrates/logfwd-transform/src/udf/mod.rscrates/logfwd/src/transform.rsexamples/use-cases/kubernetes-enriched-to-otlp.yaml
- Add tracing::warn else branches to all 4 tokio::spawn guards in build.rs - Fix O(n²) missing-key detection in HTTP enrich fetch_missing (scope HashSet to current chunk only) - Add output column collision checks in blocklist and HTTP enrich processors - Add doc comments to CsvRangeDatabase re-exports (3 files) - Restore host_metrics input section and table row in reference.mdx - Document primary_ip lexicographic selection rule in reference.mdx - Fix kubernetes example: use k8s_path and process_info in SQL query - Improve env test safety comment (nextest process isolation) Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
6327ec4 to
dadb903
Compare
- Fix k8s_path JOIN to use _source_path = log_path_prefix (macroscope, coderabbit)
- Fix stale blocklist module doc (remove 'not yet wired', dead ::new ref)
- Blocklist: error on missing source_column instead of silent pass-through
- HTTP enrich: reject non-JSON 200 responses (must start with '{' or '[')
- HTTP enrich: rename urlencoded → encode_url_key (verb_noun convention)
- Preserve schema metadata in both processors (Schema::new_with_metadata)
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 52608cd275
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
There was a problem hiding this comment.
Actionable comments posted: 15
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
crates/logfwd/src/main.rs (1)
1334-1450: 🛠️ Refactor suggestion | 🟠 MajorMove read-only enrichment assembly behind a
logfwd-runtimehelper.This branch now reimplements path resolution, table construction, and geo backend opening that already exists in
crates/logfwd-runtime/src/pipeline/build.rs. Keeping two manual builders in sync is brittle, and it pushes runtime/DataFusion knowledge back into the CLI crate.As per coding guidelines, "The binary crate must contain only CLI parsing, config loading, and signal handling. No long-lived runtime orchestration logic — delegate that to
logfwd-runtime."🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/logfwd/src/main.rs` around lines 1334 - 1450, The code duplicates enrichment/table/geo-db construction logic from logfwd-runtime; replace the local match over EnrichmentConfig and manual path resolution with a single call to the runtime helper (e.g., a function in logfwd_runtime::pipeline::build that assembles read-only enrichments). Concretely, remove the long match that mutates enrichment_tables, geo_database, and does path joins for Csv/Jsonl/KvFile, and instead call the runtime helper with the EnrichmentConfig list and base_path to get back the Vec of tables and optional geo DB (preserving error mapping), then assign enrichment_tables and geo_database from its return; also add the appropriate use/import for that helper and adapt error handling to propagate the same messages.
♻️ Duplicate comments (6)
crates/logfwd-runtime/src/processor/blocklist.rs (3)
273-289:⚠️ Potential issue | 🟡 MinorPreserve schema metadata when appending blocklist columns.
Schema::new(fields)dropsbatch.schema().metadata(). If the incoming batch carries Arrow schema annotations, this processor strips them from every enriched batch. UseSchema::new_with_metadata(fields, schema.metadata().clone())here.Suggested fix
- let schema = Arc::new(Schema::new(fields)); + let schema = Arc::new(Schema::new_with_metadata( + fields, + schema.metadata().clone(), + ));🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/logfwd-runtime/src/processor/blocklist.rs` around lines 273 - 289, The schema creation currently uses Schema::new(fields) which discards existing Arrow schema metadata; update the blocklist processor where fields are built (after pushing match_name and cat_name) to construct the schema with the original metadata by calling Schema::new_with_metadata(fields, schema.metadata().clone()) instead of Schema::new(fields), ensuring you keep the original batch.schema() metadata when creating the Arc::new(Schema).
18-27:⚠️ Potential issue | 🟡 MinorFix the stale module docs before shipping this API.
Lines 18-27 still say blocklist is "not yet wired" and tell readers to call
BlocklistProcessor::new, but this type only exposesfrom_reader/openand the PR wires blocklist into YAML. The entry-point docs are currently pointing at a dead API.As per coding guidelines, "All new public items must have /// doc comments matching CONFIG_REFERENCE.md where applicable."
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/logfwd-runtime/src/processor/blocklist.rs` around lines 18 - 27, Update the stale module docs to reflect that blocklist is now wired into the YAML config and replace the obsolete reference to BlocklistProcessor::new with the actual public constructors BlocklistProcessor::from_reader and BlocklistProcessor::open; update the example YAML to show the wired config (e.g., processors: - type: blocklist, source_column, path, prefix) and ensure the module-level /// docs and any public-item doc comments conform to the CONFIG_REFERENCE.md wording for this processor so public API docs are accurate and consistent.
378-470: 🧹 Nitpick | 🔵 TrivialAdd regression tests for the hardened error paths.
This suite still doesn't exercise the unsupported source-type error, duplicate-key conflict rejection, or output-column collision guard. Those are the branches most likely to regress in this file.
As per coding guidelines, "Write one test per behavior. Test function names must describe the scenario (e.g.,
empty_input_returns_none,invalid_json_skipped)."🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/logfwd-runtime/src/processor/blocklist.rs` around lines 378 - 470, Add focused regression tests that exercise the unsupported source-type, duplicate-key conflict rejection, and output-column collision guard branches: write one test per behavior using BlocklistProcessor::from_reader and BlocklistProcessor::process to trigger each error path (e.g., unsupported_source_type_returns_err, duplicate_key_conflict_rejected, output_column_collision_error) and assert the returned Result is Err and contains the expected error variant/message; for duplicate-key test, construct a CSV with duplicate keys and ensure from_reader or process rejects it, for unsupported source-type call from_reader with a source type not handled by the processor, and for output-column collision create a processor where the output column name (e.g., "bl_match" or "bl_category") conflicts with an existing input column and assert an error is returned.crates/logfwd-runtime/src/processor/http_enrich.rs (3)
521-545: 🛠️ Refactor suggestion | 🟠 MajorRename
urlencodedto a verb_noun helper.Line 521 introduces a helper name that doesn't follow the repo's naming rule. Please rename it to something like
percent_encode_keyand update the call site/tests.As per coding guidelines, "Function names must be in
verb_nounformat (e.g.,parse_config,scan_buffer)."🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/logfwd-runtime/src/processor/http_enrich.rs` around lines 521 - 545, Rename the helper function `urlencoded` to a verb_noun name (e.g., `percent_encode_key`) and update all call sites and tests to use the new name; keep the function signature (fn percent_encode_key(s: &str) -> String) and maintain existing behavior and visibility, then run tests to ensure no references to `urlencoded` remain.
367-375:⚠️ Potential issue | 🟡 MinorPreserve schema metadata in both batch rebuild paths.
Both
Schema::new(fields)calls dropbatch.schema().metadata(). Empty and non-empty batches should carry the same upstream Arrow schema annotations after enrichment.Suggested fix
- let schema = Arc::new(Schema::new(fields)); + let schema = Arc::new(Schema::new_with_metadata( + fields, + batch.schema().metadata().clone(), + )); ... - let schema = Arc::new(Schema::new(fields)); + let schema = Arc::new(Schema::new_with_metadata( + fields, + batch.schema().metadata().clone(), + ));Also applies to: 442-449
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/logfwd-runtime/src/processor/http_enrich.rs` around lines 367 - 375, The new schema construction currently drops existing schema metadata by calling Schema::new(fields) before creating the empty or rebuilt batch; update both places (the block that builds `fields` and calls `let schema = Arc::new(Schema::new(fields))` and the similar block around lines 442-449) to preserve the original metadata by cloning or reusing `batch.schema().metadata()` when constructing the new Schema (e.g., build the Schema from fields and the original metadata) so that `RecordBatch::new_empty(schema)` and the non-empty rebuild keep the upstream Arrow schema annotations.
229-246:⚠️ Potential issue | 🟠 MajorDon't mark arbitrary 200 bodies as JSON hits.
Line 245 upgrades any non-empty
200body toLookupResult::Hit, so HTML/plaintext gets cached and emitted through{prefix}_jsonunder"hit". That contradicts the module contract that this column contains JSON and will break downstreamjson()/json_str()consumers on invalid payloads. Validate the body before returningHit, or downgrade invalid payloads toMiss/Error.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/logfwd-runtime/src/processor/http_enrich.rs` around lines 229 - 246, The code currently treats any non-empty 200 response body as LookupResult::Hit which can cache/emit non-JSON into the JSON column; change the logic in http_enrich.rs (around the resp handling using resp.into_body().as_reader().take(...) and the LookupResult branch) to validate the buffered string as JSON (e.g., attempt serde_json::from_str on buf) and only return LookupResult::Hit(buf) when parsing succeeds; if parsing fails return LookupResult::Miss or an Error (with a clear message) instead, keeping the existing max_body_bytes limit and error branch behavior.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@book/src/content/docs/configuration/reference.mdx`:
- Around line 610-613: Doc string for `process_info.start_time` is ambiguous:
update the description to state it is the ISO 8601 UTC timestamp when the
ProcessInfoTable (created by ProcessInfoTable::new during pipeline startup) was
constructed, not the OS process start time, so users won't confuse it with
uptime metadata; locate the `agent_name`/`agent_version`/`pid`/`start_time`
table entry and replace the `start_time` line with wording that mentions
ProcessInfoTable::new and pipeline startup construction time.
- Around line 753-759: The docs and examples fail to state that the enrichment
config field refresh_interval must be > 0 (cannot be 0); update the wording in
the enrichment examples in reference.mdx to explicitly say "refresh_interval
must be a positive integer (0 is invalid)" and update any other snippets (the
kv_file examples) accordingly, then enforce this validation in the config
parsing/validation logic (e.g., add a check in validateEnrichment or
EnrichmentConfig.validate to reject refresh_interval === 0) and add/adjust unit
tests that exercise the validation rule and any example configs in
examples/use-cases so they use a positive refresh_interval.
In `@crates/logfwd-config/src/types.rs`:
- Around line 627-650: Add behavior-focused /// doc comments to the newly public
structs: ProcessInfoConfig, KvFileEnrichmentConfig, NetworkInfoConfig,
ContainerInfoConfig, and K8sClusterInfoConfig. For ProcessInfoConfig document
the exact columns produced (e.g., include agent_version field name used by
ProcessInfoTable), expected semantics and any platform-specific notes; for
KvFileEnrichmentConfig document the meaning of table_name and path, the
refresh_interval semantics (file is reloaded from disk every N seconds when set,
and behavior when absent), and how missing/invalid entries are handled; for
NetworkInfoConfig, ContainerInfoConfig, and K8sClusterInfoConfig state what
enriched columns they produce and intended lifetime/refresh behavior. Ensure
each public type has a concise /// paragraph describing runtime behavior rather
than just the name.
In `@crates/logfwd-config/src/validate.rs`:
- Around line 839-847: The new validation branches currently use
cfg.table_name.is_empty() which allows whitespace-only names; update both
occurrences (the block validating env_vars prefix and the block validating
kv_file path around the later similar lines) to trim table_name before checking
— e.g., use cfg.table_name.trim().is_empty() — so that " " is treated as
empty; ensure you update the checks where cfg.prefix and cfg.path are already
trimmed to keep behavior consistent for table_name in validate.rs.
In `@crates/logfwd-output/src/row_json.rs`:
- Around line 217-221: The three branches that do let v =
arr.as_any().downcast_ref::<StringArray>().unwrap().value(row); must not use
unwrap; replace each unwrap-based downcast with fallible handling (e.g., if let
Some(sa) = arr.as_any().downcast_ref::<StringArray>() { let v = sa.value(row);
... } else { return Err(io::Error::new(io::ErrorKind::InvalidData, "expected
StringArray in <branch description>")); }) so that failures return io::Error
with ErrorKind::InvalidData consistent with the other branches; update the same
pattern for all three StringArray branches referencing arr, StringArray, and
value(row).
In `@crates/logfwd-runtime/Cargo.toml`:
- Around line 61-64: Move the ureq and csv dependencies up to the workspace
manifest by adding them under [workspace.dependencies] with explicit versions
(e.g., the versions currently used here) and any required metadata
(license/registry constraints), then replace the crate-level entries in this
crate's Cargo.toml (the ureq and csv dependency lines) with workspace references
(use workspace = true for each dependency). Update ureq to keep its feature
selection (e.g., rustls) via the crate manifest if needed while still
referencing the workspace dependency.
In `@crates/logfwd-runtime/src/pipeline/build.rs`:
- Around line 122-188: The reload tasks spawned for geo reloading (see
ReloadableGeoDb::new, reload_handle, and the handle.spawn blocks using
tokio::runtime::Handle::try_current()) are leaked because their JoinHandles are
dropped immediately; collect and store those JoinHandles on the pipeline/builder
so their lifetime is tied to the pipeline and they can be aborted/awaited on
drop or when a pipeline is rebuilt. Change the code that spawns the task to save
the returned JoinHandle (e.g., push into a Vec<tokio::task::JoinHandle<()>> or a
JoinSet field on the builder/pipeline), expose that field from the builder so
from_config can attach it to the constructed pipeline, and implement Drop for
the pipeline to abort/await those handles; apply the same fix to the other spawn
sites noted (the other geo reload blocks).
- Around line 130-175: The ticker created in the reload loops uses
tokio::time::interval which defaults to MissedTickBehavior::Burst; update each
reload loop that creates a variable named ticker (e.g., the async task spawned
via tokio::runtime::Handle::try_current() in build.rs and the other similar
reload tasks referenced at the comment) to call
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip)
immediately after creating the interval (i.e., right after let mut ticker =
tokio::time::interval(...)) so missed ticks are skipped and reloads occur at
most once per interval.
In `@crates/logfwd-transform/src/enrichment.rs`:
- Around line 1174-1178: The docs for ContainerInfoTable currently claim
detection uses "/run/containerd/" but the detect_container function does not
check that path; update the ContainerInfoTable /// doc comments to accurately
reflect actual detection sources used by detect_container (remove the
/run/containerd bullet or explicitly add code to probe that path) and ensure any
new public item has a /// doc comment describing its behavior; reference
ContainerInfoTable and detect_container when making the change so the public
docs and implementation stay consistent.
In `@crates/logfwd-transform/src/udf/csv_range_geo.rs`:
- Around line 210-227: The CsvRangeDatabase::lookup currently returns GeoResult
for user CSV private/unresolvable blocks; before converting or searching, detect
and return None for non-routable addresses by checking the parsed IpAddr (in
lookup) for private/unroutable categories (e.g., addr.is_private(),
addr.is_loopback(), addr.is_unspecified(), and for IPv6 addr.is_unique_local())
and short-circuit to None; do this check immediately after IpAddr::from_str
succeeds and before calling ip_to_u128 or using self.ranges so
CsvRangeDatabase::lookup preserves GeoDatabase semantics.
- Around line 84-114: The CSV header lookups (variables start_idx, end_idx,
country_code_idx) currently only check for "ip_range_start"/"start_ip",
"ip_range_end"/"end_ip", and "country_code", causing DB-IP files to be rejected;
update the header position checks on headers (used to compute start_idx,
end_idx, and country_code_idx) to also accept DB‑IP column name variants
"ip_start" and "ip_end" for the start/end indices and "country" for the country
column (preserving existing alternatives like
"start_ip"/"end_ip"/"country_code"). Ensure you adjust the position calls that
set start_idx, end_idx, and country_code_idx so the module truly supports DB‑IP
CSVs.
In `@dev-docs/research/enrichment-architecture-plan-2026-04.md`:
- Around line 9-18: The text incorrectly calls the processor stage
"post-transform processors" while the new blocklist/HTTP enrich processors run
pre-SQL; update the wording to reflect that these are pre-SQL processor stages
(e.g., replace "post-transform processors" with "pre-SQL processors" or "pre-SQL
enrichment processors") and ensure the paragraph that recommends materializing
live enrichment columns before SQL explicitly references the blocklist/HTTP
enrich processors to avoid the execution-order contradiction.
In `@examples/use-cases/app-with-metadata-enrichment-to-otlp.yaml`:
- Around line 2-6: The example's header mentions metadata enrichers but the
example leaves kv_file and env_vars commented out and the query only selects
from process_info/host_info, so update the YAML to actually demonstrate the
enrichers: enable (or add) a kv_file block and an env_vars block with example
keys, wire them into the pipeline/enricher configuration (the symbols to change
are kv_file, env_vars and the query that references process_info/host_info), and
adjust the query to show selection/serialization of the injected metadata fields
so users copying the file see the ConfigMap/env metadata flow end-to-end.
In `@examples/use-cases/kubernetes-enriched-to-otlp.yaml`:
- Line 27: The example registers the enrichment under a different table name
than the join expects: change the registered enrichment type from "k8s_path" to
the default "k8s_pods" so it matches the join reference (the join currently uses
"k8s_path" on Line 59); ensure the enrichment block that contains "- type:
k8s_path" is updated to "- type: k8s_pods" (or alternatively change the join to
"k8s_pods") so both the registration and the join use the same symbol
("k8s_path" vs "k8s_pods").
In `@examples/use-cases/nginx-geo-enriched-to-otlp.yaml`:
- Around line 2-6: The example currently doesn't perform GeoIP enrichment
because the geo_database block is commented out and the SQL never calls
geo_lookup(); update the example by either (A) enabling/configuring the
geo_database section and adding a call to geo_lookup() in the pipeline SQL so
the MaxMind DB is loaded and IPs are enriched (ensure the SQL references
geo_lookup(client_ip) or similar), or (B) if you intend this as a template,
rename the file or add a clear note that GeoIP is disabled; reference the
geo_database key and the geo_lookup() function and ensure the pipeline stage
that currently only adds host_info invokes geo_lookup() to produce geo fields.
---
Outside diff comments:
In `@crates/logfwd/src/main.rs`:
- Around line 1334-1450: The code duplicates enrichment/table/geo-db
construction logic from logfwd-runtime; replace the local match over
EnrichmentConfig and manual path resolution with a single call to the runtime
helper (e.g., a function in logfwd_runtime::pipeline::build that assembles
read-only enrichments). Concretely, remove the long match that mutates
enrichment_tables, geo_database, and does path joins for Csv/Jsonl/KvFile, and
instead call the runtime helper with the EnrichmentConfig list and base_path to
get back the Vec of tables and optional geo DB (preserving error mapping), then
assign enrichment_tables and geo_database from its return; also add the
appropriate use/import for that helper and adapt error handling to propagate the
same messages.
---
Duplicate comments:
In `@crates/logfwd-runtime/src/processor/blocklist.rs`:
- Around line 273-289: The schema creation currently uses Schema::new(fields)
which discards existing Arrow schema metadata; update the blocklist processor
where fields are built (after pushing match_name and cat_name) to construct the
schema with the original metadata by calling Schema::new_with_metadata(fields,
schema.metadata().clone()) instead of Schema::new(fields), ensuring you keep the
original batch.schema() metadata when creating the Arc::new(Schema).
- Around line 18-27: Update the stale module docs to reflect that blocklist is
now wired into the YAML config and replace the obsolete reference to
BlocklistProcessor::new with the actual public constructors
BlocklistProcessor::from_reader and BlocklistProcessor::open; update the example
YAML to show the wired config (e.g., processors: - type: blocklist,
source_column, path, prefix) and ensure the module-level /// docs and any
public-item doc comments conform to the CONFIG_REFERENCE.md wording for this
processor so public API docs are accurate and consistent.
- Around line 378-470: Add focused regression tests that exercise the
unsupported source-type, duplicate-key conflict rejection, and output-column
collision guard branches: write one test per behavior using
BlocklistProcessor::from_reader and BlocklistProcessor::process to trigger each
error path (e.g., unsupported_source_type_returns_err,
duplicate_key_conflict_rejected, output_column_collision_error) and assert the
returned Result is Err and contains the expected error variant/message; for
duplicate-key test, construct a CSV with duplicate keys and ensure from_reader
or process rejects it, for unsupported source-type call from_reader with a
source type not handled by the processor, and for output-column collision create
a processor where the output column name (e.g., "bl_match" or "bl_category")
conflicts with an existing input column and assert an error is returned.
In `@crates/logfwd-runtime/src/processor/http_enrich.rs`:
- Around line 521-545: Rename the helper function `urlencoded` to a verb_noun
name (e.g., `percent_encode_key`) and update all call sites and tests to use the
new name; keep the function signature (fn percent_encode_key(s: &str) -> String)
and maintain existing behavior and visibility, then run tests to ensure no
references to `urlencoded` remain.
- Around line 367-375: The new schema construction currently drops existing
schema metadata by calling Schema::new(fields) before creating the empty or
rebuilt batch; update both places (the block that builds `fields` and calls `let
schema = Arc::new(Schema::new(fields))` and the similar block around lines
442-449) to preserve the original metadata by cloning or reusing
`batch.schema().metadata()` when constructing the new Schema (e.g., build the
Schema from fields and the original metadata) so that
`RecordBatch::new_empty(schema)` and the non-empty rebuild keep the upstream
Arrow schema annotations.
- Around line 229-246: The code currently treats any non-empty 200 response body
as LookupResult::Hit which can cache/emit non-JSON into the JSON column; change
the logic in http_enrich.rs (around the resp handling using
resp.into_body().as_reader().take(...) and the LookupResult branch) to validate
the buffered string as JSON (e.g., attempt serde_json::from_str on buf) and only
return LookupResult::Hit(buf) when parsing succeeds; if parsing fails return
LookupResult::Miss or an Error (with a clear message) instead, keeping the
existing max_body_bytes limit and error branch behavior.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository YAML (base), Organization UI (inherited)
Review profile: ASSERTIVE
Plan: Pro Plus
Run ID: 07463efa-6603-4911-8096-4b3f034dfbba
⛔ Files ignored due to path filters (1)
Cargo.lockis excluded by!**/*.lock
📒 Files selected for processing (21)
book/src/content/docs/configuration/reference.mdxcrates/logfwd-config/src/lib.rscrates/logfwd-config/src/types.rscrates/logfwd-config/src/validate.rscrates/logfwd-output/src/row_json.rscrates/logfwd-runtime/Cargo.tomlcrates/logfwd-runtime/src/pipeline/build.rscrates/logfwd-runtime/src/processor/blocklist.rscrates/logfwd-runtime/src/processor/http_enrich.rscrates/logfwd-runtime/src/processor/mod.rscrates/logfwd-runtime/src/transform.rscrates/logfwd-transform/src/enrichment.rscrates/logfwd-transform/src/udf/csv_range_geo.rscrates/logfwd-transform/src/udf/mod.rscrates/logfwd/src/main.rscrates/logfwd/src/transform.rsdev-docs/research/README.mddev-docs/research/enrichment-architecture-plan-2026-04.mdexamples/use-cases/app-with-metadata-enrichment-to-otlp.yamlexamples/use-cases/kubernetes-enriched-to-otlp.yamlexamples/use-cases/nginx-geo-enriched-to-otlp.yaml
There was a problem hiding this comment.
Actionable comments posted: 2
♻️ Duplicate comments (1)
crates/logfwd-runtime/src/processor/http_enrich.rs (1)
249-257:⚠️ Potential issue | 🟠 MajorValidate the body as JSON before returning
Hit.The current
{/[probe still accepts malformed payloads like{and rejects valid JSON scalars liketrue,"ok", or42. That means{prefix}_jsoncan carry invalid JSON while some valid 200 responses are surfaced as errors. Please parse the body and only emitHitwhen decoding succeeds; add a regression test for both cases.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/logfwd-runtime/src/processor/http_enrich.rs` around lines 249 - 257, The code currently checks only buf.trim_start().starts_with('{') or '[' to decide LookupResult::Hit, which accepts malformed JSON and rejects valid JSON scalars; update the logic in the function containing this match (the block that returns LookupResult::Hit(buf) or LookupResult::Error(...)) to attempt to parse the trimmed body as JSON (e.g., serde_json::from_slice or from_str) and only return LookupResult::Hit(buf) when parsing succeeds, otherwise return LookupResult::Error with the parse error; also add regression tests that assert a Hit for valid JSON scalars like "true", "42", and "\"ok\"" and an Error for malformed payloads like "{" and "[" so both cases are covered.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@crates/logfwd-runtime/src/processor/blocklist.rs`:
- Around line 379-414: Add a test that verifies entries with empty category
strings are treated as NULL: create a new test (e.g.,
matched_entry_with_empty_category_is_null) that uses
BlocklistProcessor::from_reader with the existing CSV constant (which contains
"bad-bot,") and a batch containing Some("bad-bot"); call proc.process(...) and
assert that the "bl_match" column is true and the "bl_category" column is NULL
for that row, thereby exercising the load_blocklist path that filters out empty
category strings; reference BlocklistProcessor::from_reader, proc.process, CSV,
and the "bl_match"/"bl_category" column names to locate code.
- Around line 152-198: The three DataType arms (DataType::Utf8,
DataType::Utf8View, DataType::LargeUtf8) repeat the same looping/append_lookup
logic; extract that into a small helper (e.g., a local closure named
process_string_column or a helper function) that accepts a getter/array
reference and iterates 0..num_rows, computing key (None for null, Some(value)
otherwise) and calling append_lookup(&self.entries, key, &mut match_builder,
&mut cat_builder); then replace each arm with a single call to that helper
passing col.as_string::<i32>(), col.as_string_view(), or col.as_string::<i64>()
respectively (or an appropriate view-wrapper), keeping the existing error arm
that returns ProcessorError::Permanent with self.source_column and
col.data_type().
---
Duplicate comments:
In `@crates/logfwd-runtime/src/processor/http_enrich.rs`:
- Around line 249-257: The code currently checks only
buf.trim_start().starts_with('{') or '[' to decide LookupResult::Hit, which
accepts malformed JSON and rejects valid JSON scalars; update the logic in the
function containing this match (the block that returns LookupResult::Hit(buf) or
LookupResult::Error(...)) to attempt to parse the trimmed body as JSON (e.g.,
serde_json::from_slice or from_str) and only return LookupResult::Hit(buf) when
parsing succeeds, otherwise return LookupResult::Error with the parse error;
also add regression tests that assert a Hit for valid JSON scalars like "true",
"42", and "\"ok\"" and an Error for malformed payloads like "{" and "[" so both
cases are covered.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository YAML (base), Organization UI (inherited)
Review profile: ASSERTIVE
Plan: Pro Plus
Run ID: ca31b8b7-6594-41a8-b353-0d0412c90a8d
📒 Files selected for processing (2)
crates/logfwd-runtime/src/processor/blocklist.rscrates/logfwd-runtime/src/processor/http_enrich.rs
- Replace .unwrap() downcasts with fallible error handling in row_json.rs - Trim table_name in k8s_path, env_vars, kv_file validation - Add /// doc comments to all public enrichment config types - Hoist ureq and csv to workspace dependencies - Fix ContainerInfoTable detection-source docs (remove /run/containerd/) - Clarify process_info.start_time wording in reference.mdx - Add refresh_interval >= 1 notes in reference.mdx - Fix pre-transform vs post-transform wording in research doc - Add test for matched blocklist entry with empty category Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 65e7e958e1
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
There was a problem hiding this comment.
Actionable comments posted: 6
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (4)
crates/logfwd-config/src/validate.rs (2)
778-817:⚠️ Potential issue | 🟡 MinorKeep
table_namewhitespace validation consistent across enrichment types.
k8s_path,env_vars, andkv_filenow reject whitespace-only names, butstatic,csv, andjsonlin the same match still only checkis_empty(). That leavestable_name: " "valid for those three branches.Proposed fix
EnrichmentConfig::Static(cfg) => { - if cfg.table_name.is_empty() { + if cfg.table_name.trim().is_empty() { return Err(ConfigError::Validation(format!( "pipeline '{name}' enrichment #{j}: table_name must not be empty" ))); } @@ EnrichmentConfig::Csv(cfg) => { - if cfg.table_name.is_empty() { + if cfg.table_name.trim().is_empty() { return Err(ConfigError::Validation(format!( "pipeline '{name}' enrichment #{j}: table_name must not be empty" ))); } @@ EnrichmentConfig::Jsonl(cfg) => { - if cfg.table_name.is_empty() { + if cfg.table_name.trim().is_empty() { return Err(ConfigError::Validation(format!( "pipeline '{name}' enrichment #{j}: table_name must not be empty" ))); }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/logfwd-config/src/validate.rs` around lines 778 - 817, Update the table_name validation in the EnrichmentConfig::Csv, EnrichmentConfig::Jsonl, and EnrichmentConfig::Static branches to reject whitespace-only names the same way k8s_path/env_vars/kv_file do: replace the cfg.table_name.is_empty() check with a trimmed/whitespace-aware check (e.g., cfg.table_name.trim().is_empty()) so that values like " " are rejected and the existing error message ("pipeline '{name}' enrichment #{j}: table_name must not be empty") is preserved; look for the table_name checks inside those match arms in validate.rs to change the condition accordingly.
751-873: 🛠️ Refactor suggestion | 🟠 MajorAdd direct tests for the new enrichment validation branches.
This adds several new failure modes (
refresh_interval == 0,env_vars.prefix,kv_file.path/table_name, whitespace-onlyk8s_path.table_name), but the test module below still only covers the older enrichment cases. Please pin each new validator branch with aConfig::load_strregression test. As per coding guidelines, "Write one test per behavior. Test function names must describe the scenario."🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/logfwd-config/src/validate.rs` around lines 751 - 873, The test suite is missing direct regression tests for the new EnrichmentConfig validation branches (refresh_interval == 0, env_vars.prefix empty, kv_file.path/table_name empty, whitespace-only K8sPath table_name); add one focused test per behavior that calls Config::load_str with a minimal pipeline config exercising each failing branch and asserts Err(ConfigError::Validation(...)) with the expected message; create tests named to describe the scenario (e.g., validate_enrichment_refresh_interval_zero, validate_env_vars_prefix_empty, validate_kv_file_missing_path_or_table_name, validate_k8s_path_table_name_whitespace) and ensure each uses the specific variant from the diff (EnrichmentConfig::Csv/Jsonl/KvFile/EnvVars/K8sPath) to pin the regression.book/src/content/docs/configuration/reference.mdx (1)
955-966:⚠️ Potential issue | 🟡 MinorMove
enrichmentunderpipelines.appin this advanced-layout example.In the multi-pipeline schema,
enrichmentis a field of eachPipelineConfig, not a top-level key. Copied as written, this example will not validate.📝 Suggested fix
pipelines: app: inputs: - name: pod_logs type: file path: /var/log/pods/**/*.log format: cri @@ outputs: - name: collector type: otlp endpoint: ${OTEL_ENDPOINT} protocol: grpc compression: zstd - name: debug type: stdout format: console - -enrichment: - - type: host_info - - type: process_info - - type: network_info - - type: container_info - - type: k8s_cluster_info - - type: static - table_name: labels - labels: - environment: ${ENVIRONMENT} - cluster: ${CLUSTER_NAME} + enrichment: + - type: host_info + - type: process_info + - type: network_info + - type: container_info + - type: k8s_cluster_info + - type: static + table_name: labels + labels: + environment: ${ENVIRONMENT} + cluster: ${CLUSTER_NAME}🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@book/src/content/docs/configuration/reference.mdx` around lines 955 - 966, The example places the enrichment block at top-level but enrichment is a per-PipelineConfig field; move the entire enrichment section under the relevant pipeline (e.g., pipelines.app) so it becomes part of that pipeline's config. Locate the enrichment block and relocate it as a child of pipelines.app in the advanced-layout example, ensuring keys like type, table_name and labels remain unchanged and that the resulting YAML nests enrichment inside the PipelineConfig for the app pipeline.crates/logfwd-runtime/src/pipeline/build.rs (1)
739-765:⚠️ Potential issue | 🟡 MinorRestore the zero
poll_interval_msassertion.After this refactor the test only exercises
batch_timeout_ms: Some(0), so a regression inpoll_interval_msvalidation would now slip through despite the test name claiming both cases are covered.As per coding guidelines, "Write one test per behavior. Test function names must describe the scenario (e.g.,
empty_input_returns_none,invalid_json_skipped)."🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/logfwd-runtime/src/pipeline/build.rs` around lines 739 - 765, The test from_config_rejects_zero_batch_and_poll_timeouts no longer checks zero poll interval; restore validation by constructing a PipelineConfig with poll_interval_ms: Some(0) (and a valid non-zero batch_timeout_ms) and call Pipeline::from_config to assert it returns an Err containing a message about poll_interval_ms must be > 0, or alternatively split into two tests: one that sets batch_timeout_ms: Some(0) to assert Pipeline::from_config rejects it and a second test that sets poll_interval_ms: Some(0) to assert Pipeline::from_config rejects poll intervals of zero; reference the existing test name and Pipeline::from_config and the cfg fields poll_interval_ms and batch_timeout_ms to locate where to add/modify the assertions.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@book/src/content/docs/configuration/reference.mdx`:
- Around line 685-698: Update the stale note in the `k8s_path` enrichment
section to reflect that the `logs` table exposes `_source_path` and `k8s_path`
exposes `log_path_prefix`, remove the incorrect claim that the join is not
wired, and replace the example with the correct join using those fields (i.e.,
join `logs` l ON l._source_path = k8s_path.log_path_prefix), referencing the
`k8s_path` enrichment and `logs` table, and ensure the example SQL uses `LEFT
JOIN k8s_path kp ON l._source_path = kp.log_path_prefix`.
In `@crates/logfwd-runtime/src/pipeline/build.rs`:
- Around line 96-191: Alternate builder misses the periodic refresh logic
present in the runtime builder: replicate the refresh_interval handling
(creating ReloadableGeoDb via
crate::transform::enrichment::ReloadableGeoDb::new(initial_db), obtaining
reload_handle, creating a background reload task that uses
tokio::task::spawn_blocking to reopen based on GeoDatabaseFormat (e.g.,
GeoDatabaseFormat::Mmdb and GeoDatabaseFormat::CsvRange), replacing the
reload_handle on success and logging on failure, and storing the JoinHandle in
reload_tasks) in the alternate builder’s construction path so that when
refresh_interval is Some(...) the code uses reloadable (assigned to
geo_database) and schedules the same periodic reload loop instead of performing
only a one-shot reload().
In `@crates/logfwd-runtime/src/processor/http_enrich.rs`:
- Around line 395-398: The code currently treats a missing source column as
all-miss by returning vec![None; num_rows]; change this to fail fast like
BlocklistProcessor by returning an Err(ProcessorError::Permanent(...)) when
batch.column_by_name(&self.config.source_column) is None, referencing the same
self.config.source_column in the error message so typos are visible; update the
match in http_enrich.rs where keys is assigned (the match on
batch.column_by_name) to produce Err(ProcessorError::Permanent(...)) instead of
vec![None; num_rows], preserving the function's Result error type.
In `@crates/logfwd-transform/src/enrichment.rs`:
- Around line 2027-2030: Add a SAFETY comment above the cleanup unsafe block
that calls std::env::remove_var("LOGFWD_DUPTEST_FOO") and
remove_var("LOGFWD_DUPTEST_foo"), mirroring the invariant explained in the
earlier unsafe block: explain why calling these safe std::env functions in an
unsafe block is required (e.g., for test isolation or to match the earlier
unsafe rationale), and explicitly state the invariant being upheld
(thread-safety or single-threaded test execution) so the block complies with the
project's unsafe-block documentation rule.
In `@crates/logfwd-transform/src/udf/csv_range_geo.rs`:
- Around line 138-141: The current check "if end < start { continue; }" silently
drops inverted ranges; change it to return an error instead so inverted ranges
are treated as malformed like the other cases. Replace the silent continue with
a propagated Err(...) (using the same error variant/format the loader uses for
other malformed-row cases) and include a message that includes the start and end
values; keep the rest of the function logic unchanged so row parsing fails fast
for inverted ranges (refer to the same malformed-row error usage elsewhere in
this module for the exact error type/format).
In `@examples/use-cases/kubernetes-enriched-to-otlp.yaml`:
- Around line 8-12: Update the Prerequisites note to explicitly state that
/etc/os-release must be mounted from the host (hostPath) into the DaemonSet so
node metadata (os_name) reflects the Kubernetes node rather than the agent
image; reference the existing bullet that mentions "Mount /etc/os-release from
the host" and replace the ambiguous text with a clear sentence like "Mount
/etc/os-release from the host (hostPath) into the container so node metadata
uses the host's OS info rather than the container image."
---
Outside diff comments:
In `@book/src/content/docs/configuration/reference.mdx`:
- Around line 955-966: The example places the enrichment block at top-level but
enrichment is a per-PipelineConfig field; move the entire enrichment section
under the relevant pipeline (e.g., pipelines.app) so it becomes part of that
pipeline's config. Locate the enrichment block and relocate it as a child of
pipelines.app in the advanced-layout example, ensuring keys like type,
table_name and labels remain unchanged and that the resulting YAML nests
enrichment inside the PipelineConfig for the app pipeline.
In `@crates/logfwd-config/src/validate.rs`:
- Around line 778-817: Update the table_name validation in the
EnrichmentConfig::Csv, EnrichmentConfig::Jsonl, and EnrichmentConfig::Static
branches to reject whitespace-only names the same way k8s_path/env_vars/kv_file
do: replace the cfg.table_name.is_empty() check with a trimmed/whitespace-aware
check (e.g., cfg.table_name.trim().is_empty()) so that values like " " are
rejected and the existing error message ("pipeline '{name}' enrichment #{j}:
table_name must not be empty") is preserved; look for the table_name checks
inside those match arms in validate.rs to change the condition accordingly.
- Around line 751-873: The test suite is missing direct regression tests for the
new EnrichmentConfig validation branches (refresh_interval == 0, env_vars.prefix
empty, kv_file.path/table_name empty, whitespace-only K8sPath table_name); add
one focused test per behavior that calls Config::load_str with a minimal
pipeline config exercising each failing branch and asserts
Err(ConfigError::Validation(...)) with the expected message; create tests named
to describe the scenario (e.g., validate_enrichment_refresh_interval_zero,
validate_env_vars_prefix_empty, validate_kv_file_missing_path_or_table_name,
validate_k8s_path_table_name_whitespace) and ensure each uses the specific
variant from the diff (EnrichmentConfig::Csv/Jsonl/KvFile/EnvVars/K8sPath) to
pin the regression.
In `@crates/logfwd-runtime/src/pipeline/build.rs`:
- Around line 739-765: The test from_config_rejects_zero_batch_and_poll_timeouts
no longer checks zero poll interval; restore validation by constructing a
PipelineConfig with poll_interval_ms: Some(0) (and a valid non-zero
batch_timeout_ms) and call Pipeline::from_config to assert it returns an Err
containing a message about poll_interval_ms must be > 0, or alternatively split
into two tests: one that sets batch_timeout_ms: Some(0) to assert
Pipeline::from_config rejects it and a second test that sets poll_interval_ms:
Some(0) to assert Pipeline::from_config rejects poll intervals of zero;
reference the existing test name and Pipeline::from_config and the cfg fields
poll_interval_ms and batch_timeout_ms to locate where to add/modify the
assertions.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository YAML (base), Organization UI (inherited)
Review profile: ASSERTIVE
Plan: Pro Plus
Run ID: ffbae129-cdfe-4068-8418-f684bf0ff55c
📒 Files selected for processing (14)
Cargo.tomlbook/src/content/docs/configuration/reference.mdxcrates/logfwd-config/src/types.rscrates/logfwd-config/src/validate.rscrates/logfwd-output/src/row_json.rscrates/logfwd-runtime/Cargo.tomlcrates/logfwd-runtime/src/pipeline/build.rscrates/logfwd-runtime/src/pipeline/mod.rscrates/logfwd-runtime/src/processor/blocklist.rscrates/logfwd-runtime/src/processor/http_enrich.rscrates/logfwd-transform/src/enrichment.rscrates/logfwd-transform/src/udf/csv_range_geo.rsdev-docs/research/enrichment-architecture-plan-2026-04.mdexamples/use-cases/kubernetes-enriched-to-otlp.yaml
- Add tracing::warn else branches to all 4 tokio::spawn guards in build.rs - Fix O(n²) missing-key detection in HTTP enrich fetch_missing (scope HashSet to current chunk only) - Add output column collision checks in blocklist and HTTP enrich processors - Add doc comments to CsvRangeDatabase re-exports (3 files) - Restore host_metrics input section and table row in reference.mdx - Document primary_ip lexicographic selection rule in reference.mdx - Fix kubernetes example: use k8s_path and process_info in SQL query - Improve env test safety comment (nextest process isolation) Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
- Fix k8s_path JOIN to use _source_path = log_path_prefix (macroscope, coderabbit)
- Fix stale blocklist module doc (remove 'not yet wired', dead ::new ref)
- Blocklist: error on missing source_column instead of silent pass-through
- HTTP enrich: reject non-JSON 200 responses (must start with '{' or '[')
- HTTP enrich: rename urlencoded → encode_url_key (verb_noun convention)
- Preserve schema metadata in both processors (Schema::new_with_metadata)
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
- Replace .unwrap() downcasts with fallible error handling in row_json.rs - Trim table_name in k8s_path, env_vars, kv_file validation - Add /// doc comments to all public enrichment config types - Hoist ureq and csv to workspace dependencies - Fix ContainerInfoTable detection-source docs (remove /run/containerd/) - Clarify process_info.start_time wording in reference.mdx - Add refresh_interval >= 1 notes in reference.mdx - Fix pre-transform vs post-transform wording in research doc - Add test for matched blocklist entry with empty category Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
65e7e95 to
8a2e157
Compare
…ocessors, geo lookup
Add comprehensive enrichment system with two layers:
1. Snapshot tables (EnrichmentTable trait → DataFusion MemTables, joined via SQL):
- static, host_info, k8s_path, csv, jsonl, env_vars, geo_database
- process_info, kv_file, network_info, container_info, k8s_cluster_info
2. Processor-stage enrichers (run before SQL, can do I/O):
- BlocklistProcessor: zero-copy Arrow array lookups, CSV-loaded blocklists
- HttpEnrichProcessor: batch-blocking HTTP lookups with concurrent fetch,
deduplication, caching, and configurable concurrency
Also includes:
- CSV IP-range geo backend (binary search, no MaxMind dependency)
- Config types, validation, and pipeline wiring for all 12 enrichment types
- Expanded reference docs covering all enrichment types
- 3 example configs (K8s, GeoIP, ConfigMap metadata)
- Architecture plan document
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
- blocklist: use 1-indexed row numbers consistently in error messages - env_vars: error on duplicate column names after lowercasing instead of silently deduplicating (nondeterministic behavior) - container_info: parse Docker cgroup v2 'docker-<id>.scope' paths - Add tests for all three fixes Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
- blocklist: error on unsupported source column types instead of silently treating all rows as non-matches - http_enrich: add max_body_bytes (1 MiB default) to cap response body reads and prevent unbounded memory growth - processor/mod.rs: add doc comments to public module exports - types.rs: document process_info column names in variant doc - enrichment.rs: move FixedGeoDb and remaining tests inside #[cfg(test)] mod tests - enrichment.rs: fix case-insensitive scope suffix stripping in docker cgroup v2 parser (use rsplit_once instead of strip_suffix) - kubernetes example: remove misleading k8s_namespace alias (the k8s_cluster_info.namespace is collector-scoped, not per-log) Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
- http_enrich: compute cache eviction headroom from cacheable entries only, excluding errors that are never inserted - enrichment: drop RwLock read guard before calling GeoDatabase::lookup() to avoid blocking reload writers during lookups - env_vars: skip vars matching prefix exactly (would create empty column) - enrichment: add doc comments to all public constructors - network_info: document that primary_ipv4/ipv6 are lexicographically first, not necessarily the default-route address - env dup test: gate with #[cfg(unix)] since Windows env vars are case-insensitive Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
… ranges - Wrap all 4 tokio::spawn calls in build.rs with Handle::try_current() so Pipeline::from_config does not panic when called without a runtime - Add overlap validation to CsvRangeDatabase::load_from_reader after sorting ranges, preventing ambiguous binary-search lookups - Add u128_to_ip_string helper for readable overlap error messages - Add tests for overlapping and adjacent range edge cases Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
extract_strings now returns an error for unsupported column types instead of silently returning all-None which made config errors indistinguishable from genuine lookup misses. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Switch from std::env::vars() (which panics on non-UTF8 keys/values) to std::env::vars_os() with graceful skip of non-UTF8 entries. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
- Eviction now counts only net-new keys (not already in cache) so refreshing an existing key does not cause unnecessary churn - Capacity trimming applies after filtering out Error results so partial-failure bursts do not discard cacheable entries Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
- Add tracing::warn else branches to all 4 tokio::spawn guards in build.rs - Fix O(n²) missing-key detection in HTTP enrich fetch_missing (scope HashSet to current chunk only) - Add output column collision checks in blocklist and HTTP enrich processors - Add doc comments to CsvRangeDatabase re-exports (3 files) - Restore host_metrics input section and table row in reference.mdx - Document primary_ip lexicographic selection rule in reference.mdx - Fix kubernetes example: use k8s_path and process_info in SQL query - Improve env test safety comment (nextest process isolation) Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Use _source_path and log_path_prefix (the actual column names) instead of the incorrect _path and path placeholders. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
- Fix k8s_path JOIN to use _source_path = log_path_prefix (macroscope, coderabbit)
- Fix stale blocklist module doc (remove 'not yet wired', dead ::new ref)
- Blocklist: error on missing source_column instead of silent pass-through
- HTTP enrich: reject non-JSON 200 responses (must start with '{' or '[')
- HTTP enrich: rename urlencoded → encode_url_key (verb_noun convention)
- Preserve schema metadata in both processors (Schema::new_with_metadata)
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
- Fix broken intra-doc links in csv_range_geo.rs (use crate:: paths) - Add explicit table_name: k8s_path to kubernetes example config (default is k8s_pods, not k8s_path) - Treat HTTP 204 No Content as Miss in http_enrich processor - Update doc comments to reflect 204 handling Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
- Replace .unwrap() downcasts with fallible error handling in row_json.rs - Trim table_name in k8s_path, env_vars, kv_file validation - Add /// doc comments to all public enrichment config types - Hoist ureq and csv to workspace dependencies - Fix ContainerInfoTable detection-source docs (remove /run/containerd/) - Clarify process_info.start_time wording in reference.mdx - Add refresh_interval >= 1 notes in reference.mdx - Fix pre-transform vs post-transform wording in research doc - Add test for matched blocklist entry with empty category Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
8a2e157 to
05f11c9
Compare
Auto-dismissed because every review thread opened from this change request is now resolved. If additional changes are still required, please leave a new review.
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 05f11c945f
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
| if (path.contains("/kubepods") || path.contains("containerd-")) | ||
| && let Some(id) = extract_hex_id_from_path(path) | ||
| { | ||
| let runtime = if path.contains("containerd") { | ||
| "containerd" |
There was a problem hiding this comment.
Detect CRI-O before generic kubepods runtime fallback
The generic kubepods/containerd branch runs before the CRI-O-specific /crio- parsing, so cgroup paths like /kubepods.../crio-<id>.scope are classified as "kubernetes" instead of "cri-o". In CRI-O Kubernetes environments this makes container_runtime consistently wrong and prevents the CRI-O branch below from ever executing. Check for CRI-O first (or include crio in the generic runtime selection) so runtime labels stay accurate.
Useful? React with 👍 / 👎.
| CROSS JOIN process_info p | ||
| CROSS JOIN container_info c | ||
| CROSS JOIN k8s_cluster_info k | ||
| LEFT JOIN k8s_path kp ON l._source_path = kp.log_path_prefix |
There was a problem hiding this comment.
Join k8s_path example using prefix matching
This join uses exact equality between l._source_path and kp.log_path_prefix, but _source_path is a full filename (for example .../container/0.log) while log_path_prefix is only the directory prefix ending in /. As written, the LEFT JOIN will not match normal CRI file rows, so the k8s_namespace/k8s_pod/k8s_container fields remain NULL in the published example. Use a prefix-based predicate (or join on a full-path key) to make enrichment actually work.
Useful? React with 👍 / 👎.
The _source_path is a full file path while log_path_prefix is a directory prefix. Equality never matches — use starts_with() for correct prefix matching. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 8a5a7401d3
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
| pub mod blocklist; | ||
| /// HTTP enrichment processor: per-key HTTP lookups with caching and | ||
| /// concurrency control. | ||
| pub mod http_enrich; |
There was a problem hiding this comment.
Wire new processors into YAML pipeline construction
Exporting blocklist/http_enrich here makes the feature look available, but this commit never adds a processors section to PipelineConfig nor any from_config wiring to instantiate these processors, so pipeline construction from YAML still leaves processors empty and deny_unknown_fields rejects a processors: key. In practice, users cannot enable the new processors through config despite the new processor modules and docs examples.
Useful? React with 👍 / 👎.
Summary
Add a two-layer enrichment system that lets pipelines join static metadata, look up geo-IP data, call HTTP APIs, and filter with blocklists — all configured via YAML.
Enrichment tables (SQL-joinable via DataFusion)
statichost_infonetwork_infoprocess_infocontainer_infok8s_pathk8s_cluster_infocsvjsonlkv_fileKEY=VALUEfileenv_varsgeo_databaseProcessors (run before SQL, can do I/O)
Also includes
tokio::spawnwithHandle::try_current()guardTest plan
cargo test -p logfwd-transform -p logfwd-runtime— 258 tests passSELECT * FROM host_infoin DataFusion SQLcargo clippy --workspace -- -D warnings)Relates to #1671
Note
Add enrichment tables, CSV geo backend, blocklist and HTTP enrichment processors to pipeline
CsvRangeDatabaseincrates/logfwd-transform/src/udf/csv_range_geo.rs: a CSV-backed IP range geo database (DB-IP Lite–style) with binary-search lookup and overlap detection at load time.BlocklistProcessorincrates/logfwd-runtime/src/processor/blocklist.rs: appends<prefix>_match(Boolean) and<prefix>_category(Utf8) columns to each batch from a CSV blocklist file.HttpEnrichProcessorincrates/logfwd-runtime/src/processor/http_enrich.rs: enriches batches via blocking HTTP GET with a TTL cache, bounded concurrency, and<prefix>_json/<prefix>_statusoutput columns.EnrichmentConfigincrates/logfwd-config/src/types.rswithEnvVars,ProcessInfo,KvFile,NetworkInfo,ContainerInfo, andK8sClusterInfovariants, each with a corresponding table implementation wired in the pipeline builder.ReloadableGeoDb/GeoReloadHandleandEnvTabletocrates/logfwd-transform/src/enrichment.rs; geo databases and CSV/JSONL/KV enrichment tables now support periodic background reload via Tokio tasks whenrefresh_intervalis configured.Macroscope summarized 8a5a740.