perf: allocation regression tests + cache DataFusion SessionContext (#465) #609
Scanner allocation tests (logfwd-arrow, stats_alloc):
- storage_scanner_no_leak_across_batches: compares two windows of 10 batches; allocated bytes should be stable, not growing
- streaming_scanner_no_leak_across_batches: same for the zero-copy path
- storage_scanner_allocs_are_linear_not_quadratic: verifies StorageBuilder per-row allocations are O(n), not O(n²); also verifies StreamingBuilder is sub-linear (<5x allocs for 10x data)

Input churn tests (logfwd-io, dhat):
- framed_input_no_buffer_churn: after warmup, new allocations per poll stay bounded (~3/poll for Vec<InputEvent> + data Vec, see #608)
- framed_input_no_leak_across_polls: live allocations stay small after all data is consumed

Tests use separate integration test binaries since #[global_allocator] is per-binary (stats_alloc and dhat can't coexist).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
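For readers unfamiliar with the two-window technique, here is a minimal sketch using only the standard library. `CountingAlloc`, `bytes_allocated_during`, and `two_windows` are hypothetical names for illustration; the PR itself uses stats_alloc's `Region`, which this does not reproduce.

```rust
use std::alloc::{GlobalAlloc, Layout, System};
use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical stand-in for an instrumented allocator: counts bytes requested.
struct CountingAlloc;

static BYTES_ALLOCATED: AtomicUsize = AtomicUsize::new(0);

unsafe impl GlobalAlloc for CountingAlloc {
    unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
        BYTES_ALLOCATED.fetch_add(layout.size(), Ordering::Relaxed);
        unsafe { System.alloc(layout) }
    }

    unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
        unsafe { System.dealloc(ptr, layout) }
    }
}

// One global allocator per binary, which is why such tests live in
// their own integration test binaries.
#[global_allocator]
static GLOBAL: CountingAlloc = CountingAlloc;

/// Bytes allocated process-wide while `work` runs (so callers must run serially).
fn bytes_allocated_during(work: impl FnOnce()) -> usize {
    let before = BYTES_ALLOCATED.load(Ordering::Relaxed);
    work();
    BYTES_ALLOCATED.load(Ordering::Relaxed) - before
}

/// Runs two windows of `batches` iterations and returns the bytes allocated in each.
fn two_windows(batches: usize, mut one_batch: impl FnMut()) -> (usize, usize) {
    let first = bytes_allocated_during(|| (0..batches).for_each(|_| one_batch()));
    let second = bytes_allocated_during(|| (0..batches).for_each(|_| one_batch()));
    (first, second)
}
```

A leak check would then compare the two windows, for example by asserting that the second is not materially larger than the first. The exact margin is a judgment call and is not taken from the PR.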
Walkthrough
Adds multiple allocation-regression and churn integration tests and corresponding dev-dependencies across crates: crates/logfwd, crates/logfwd-arrow, crates/logfwd-io, and crates/logfwd-output. Tests install process-wide allocators/profilers (…)
Actionable comments posted: 4
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@crates/logfwd-arrow/Cargo.toml`:
- Around line 13-14: Update the dev-dependency declaration for stats_alloc in
Cargo.toml to pin the newer patch release: change the version string for the
stats_alloc entry under [dev-dependencies] from "0.1" to "0.1.10" so tests use
the updated 0.1.10 package.
In `@crates/logfwd-arrow/tests/allocation_regression.rs`:
- Around line 88-100: The test currently calls data.clone() inside each loop
iteration causing clone allocations to be measured; move the Bytes creation out
of the Region windows so Region::new(&GLOBAL) and reg1.change()/reg2.change()
only capture scanner.scan allocations: e.g. create the input once
(Bytes::from(data.clone()) or reuse a prebuilt bytes variable) before each
window and then call scanner.scan(bytes.clone() or &bytes) inside the loop so
that scanner.scan (the symbol to isolate) is the only thing measured rather than
repeated data.clone() calls.
- Around line 115-168: The test storage_scanner_allocs_are_linear_not_quadratic
is misleading because it also asserts streaming scanner behavior (uses
StreamingSimdScanner::new and streaming.scan), so either rename the test to
reflect both scanners or split the streaming assertions into a new #[test]
(e.g., streaming_scanner_allocs_are_sublinear). To split: remove the block that
constructs StreamingSimdScanner, its warmup, Region measurements
(reg_s500/reg_s5000) and the streaming_ratio assert from
storage_scanner_allocs_are_linear_not_quadratic and place them into a new test
function that imports make_ndjson, bytes::Bytes, and uses the same warmup and
data_500/data_5000 variables; ensure the new test is annotated with #[test] and
keeps the same assertions and thresholds. If renaming instead, update the test
function name to reflect both SimdScanner and StreamingSimdScanner so the intent
matches the checks.
In `@crates/logfwd-io/tests/allocation_churn.rs`:
- Around line 58-92: The test hardcodes polls_remaining = 45 which couples it to
MockSource::repeating(&chunk, 50) and the warmup loop (for _ in 0..5), making
the assertion brittle; change the test to compute polls_remaining dynamically by
reading the source's configured event count (from MockSource::repeating(...) or
its returned/mock metadata) minus the warmup iterations (the 5 used in the
warmup loop) so max_acceptable = (configured_count - warmup_count) * 5; locate
MockSource::repeating, the warmup for _ in 0..5 loop, and the polls_remaining /
max_acceptable calculations to implement the derived value instead of the
literal 45.
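A minimal sketch of the derivation suggested in the last item, assuming the three values are available as constants. The constant and function names are hypothetical; only the numbers 50, 5, and the factor of 5 come from the review text.

```rust
/// Hypothetical constants mirroring the values quoted in the review.
const TOTAL_CHUNKS: usize = 50; // count passed to the repeating mock source
const WARMUP_POLLS: usize = 5; // polls consumed before measurement starts
const MAX_ALLOCS_PER_POLL: usize = 5; // per-poll allocation budget

/// Upper bound on post-warmup allocations, derived rather than hardcoded.
const fn max_acceptable_allocs(total: usize, warmup: usize, per_poll: usize) -> usize {
    // saturating_sub keeps the bound at zero if warmup ever exceeds total
    total.saturating_sub(warmup) * per_poll
}
```

With the values above the bound works out to (50 - 5) * 5 = 225, and it follows automatically if either constant changes.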
⛔ Files ignored due to path filters (1)
- Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (4)
- crates/logfwd-arrow/Cargo.toml
- crates/logfwd-arrow/tests/allocation_regression.rs
- crates/logfwd-io/Cargo.toml
- crates/logfwd-io/tests/allocation_churn.rs
[dev-dependencies]
stats_alloc = "0.1"
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Check latest stats_alloc version on crates.io
curl -s https://crates.io/api/v1/crates/stats_alloc | jq '.crate.max_version, .crate.updated_at'
Repository: strawgate/memagent
Length of output: 100
Update stats_alloc to version 0.1.10.
The dependency is justified for allocation regression tests, but version 0.1 is outdated. The latest version is 0.1.10 (last updated 2022-03-30).
Output encoding tests (logfwd-output, stats_alloc):
- write_row_json zero-alloc verification (pre-allocated buffer reuse)
- OTLP encode_batch stability across repeated batches

End-to-end pipeline test (logfwd, stats_alloc):
- Full file → scan → transform → null sink pipeline
- Asserts <2KB/row allocation (catches per-row regressions)
- Asserts <32MB retained after exit (catches true leaks vs expected OTel/tokio infrastructure retention of ~8-15MB)

Note: allocation tests with stats_alloc must run with --test-threads=1 since Region snapshots share global allocator state. Parallel test execution causes false interference between tests.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@crates/logfwd-output/tests/allocation_regression.rs`:
- Around line 129-132: The ratio assertion fails when both windows allocate zero
bytes; modify the test around the computation of growth (using
stats2.bytes_allocated and stats1.bytes_allocated) to handle zero-allocation
explicitly: if stats1.bytes_allocated == 0 && stats2.bytes_allocated == 0 then
skip or short-circuit the ratio assertion (treat as stable/return from the
test), otherwise compute growth as now and perform the assert on the ratio;
update the assertion block that references growth to only run when at least one
allocation is non-zero.
- Around line 11-13: Tests that use the global allocator (Region, GLOBAL,
INSTRUMENTED_SYSTEM) must be run serially to avoid cross-test contamination;
either run the test binary with the single-thread harness flag by adding
"--test-threads=1" to the test invocation or serialize access inside the test
module by introducing a static Mutex (e.g., static ALLOC_MUTEX: Mutex<()> =
Mutex::new(())) and locking it at the start of each allocator-instrumented test
that references GLOBAL/INSTRUMENTED_SYSTEM/Region; update the test(s) to acquire
the mutex (or ensure the test runner flag is set) so allocator measurements
aren't interleaved.
In `@crates/logfwd/tests/allocation_e2e.rs`:
- Around line 66-71: The fixed 2s cancellation makes the bytes/row assertion
hardware-dependent; replace the time-based shutdown (Duration::from_secs(2) +
sd.cancel()) with a condition that waits for a minimum processed-row threshold
before calling sd.cancel(), or run a deterministic workload window instead;
locate the test's spawn block that sleeps then calls sd.cancel() and the
following ratio assertion (around the bytes/row check at lines referenced
95-101) and gate the assertion on a MIN_PROCESSED_ROWS constant (or compute the
deterministic number of rows to process) so the ratio is only checked when
enough rows were actually processed.
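The first two suggestions above (the zero-allocation guard and serialized access to the global counters) can be sketched with the standard library alone. `alloc_lock` and `is_stable` are hypothetical helper names. Treating "zero then non-zero" as growth is an assumption of this sketch, not something the review specifies.

```rust
use std::sync::{Mutex, MutexGuard, PoisonError};

/// Serializes tests that read process-wide allocator counters.
static ALLOC_MUTEX: Mutex<()> = Mutex::new(());

/// Takes the lock, recovering from poisoning so one failed test does not
/// cascade into every other allocator test in the binary.
fn alloc_lock() -> MutexGuard<'static, ()> {
    ALLOC_MUTEX.lock().unwrap_or_else(PoisonError::into_inner)
}

/// True when the second window grew by at most `max_ratio` times the first.
fn is_stable(first_bytes: u64, second_bytes: u64, max_ratio: f64) -> bool {
    if first_bytes == 0 {
        // 0 -> 0 is stable; 0 -> non-zero is treated as growth (assumption).
        return second_bytes == 0;
    }
    (second_bytes as f64 / first_bytes as f64) <= max_ratio
}
```

Each allocator-instrumented test would hold the guard for its whole body, for example `let _guard = alloc_lock();` as its first statement, and then assert `is_stable(stats1, stats2, limit)` instead of dividing directly.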
⛔ Files ignored due to path filters (1)
- Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (4)
- crates/logfwd-output/Cargo.toml
- crates/logfwd-output/tests/allocation_regression.rs
- crates/logfwd/Cargo.toml
- crates/logfwd/tests/allocation_e2e.rs
Measured baseline: ~1130 bytes/row, 0.07 allocs/row for 50K rows through the full pipeline (file → scan → transform → null sink).

Tighten threshold from 2048 to 1700 bytes/row (50% above baseline). This catches a single per-row String allocation (~100-200 bytes), which the old 2048 threshold would have missed.

Add eprintln output showing the allocation profile (rows, bytes, per-row stats) for visibility when tests run with --nocapture.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
New scaling test measures per-row allocation at 10K/100K/500K rows:
- 10K rows: ~2089 bytes/row (fixed overhead dominates)
- 100K rows: ~1005 bytes/row (amortized)
- 500K rows: ~905 bytes/row (asymptotic per-row cost)

Asymptotic cost is ~900 bytes/row for ~120 bytes input JSON: 7.5x amplification from flat text to typed Arrow columnar representation. This is expected (3 typed columns × value storage + alignment + metadata).

Tighten e2e threshold from 1700 to 1500 bytes/row based on measured data. The scaling test asserts per-row cost DECREASES with volume (proving overhead is fixed, not per-row).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
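The "fixed overhead, not per-row" reading can be checked against the quoted numbers with a two-point fit. This is an illustrative sketch, assuming a fixed-plus-linear cost model (total bytes = fixed + per_row × rows). `CostModel` is a hypothetical name, and the inputs are the approximate figures from the commit message rather than fresh measurements.

```rust
/// Fixed-plus-linear cost model: total_bytes = fixed_bytes + bytes_per_row * rows.
#[derive(Debug, Clone, Copy)]
struct CostModel {
    fixed_bytes: f64,
    bytes_per_row: f64,
}

impl CostModel {
    /// Fits the model from two (rows, average bytes/row) measurements.
    fn fit(small: (f64, f64), large: (f64, f64)) -> Self {
        let (n1, avg1) = small;
        let (n2, avg2) = large;
        // Slope between the two total-byte points.
        let bytes_per_row = (avg2 * n2 - avg1 * n1) / (n2 - n1);
        let fixed_bytes = avg1 * n1 - bytes_per_row * n1;
        Self { fixed_bytes, bytes_per_row }
    }

    /// Average bytes/row the model predicts for a run of `rows` rows.
    fn average_bytes_per_row(&self, rows: f64) -> f64 {
        self.fixed_bytes / rows + self.bytes_per_row
    }
}

/// Fit using the 10K and 500K figures quoted in the commit message.
fn fitted_model() -> CostModel {
    CostModel::fit((10_000.0, 2089.0), (500_000.0, 905.0))
}
```

Under that assumption the two endpoints imply roughly 880 bytes/row plus about 12 MB of fixed overhead, and the model predicts about 1000 bytes/row at 100K rows, close to the reported ~1005.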
Actionable comments posted: 1
♻️ Duplicate comments (1)
crates/logfwd/tests/allocation_e2e.rs (1)
66-71: ⚠️ Potential issue | 🟠 Major
Make the bytes/row regression gate workload-stable, not wall-clock-stable.
Line 69’s fixed 2s cancel makes Line 109’s bytes/row assertion hardware-dependent; with low processed rows, startup overhead dominates and yields false regressions. Gate the ratio check behind a minimum processed-row threshold (or cancel after reaching a row target).
Based on learnings: Applies to `**/*.rs`: “Do not introduce allocations in hot paths without benchmarking.”
Suggested stabilization
+    const MIN_PROCESSED_ROWS_FOR_RATIO: u64 = 10_000;
+
     // Verify we actually processed data.
     assert!(batches > 0, "pipeline processed no batches");
     assert!(rows > 0, "pipeline processed no rows");
+    assert!(
+        rows >= MIN_PROCESSED_ROWS_FOR_RATIO,
+        "insufficient rows for stable allocation signal: {rows}"
+    );
@@
     assert!(
         bytes_per_row < 1700.0,
Also applies to: 88-90, 109-115
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/logfwd/tests/allocation_e2e.rs` around lines 66 - 71, The test currently cancels the pipeline after a fixed Duration::from_secs(2) (the spawned thread calling sd.cancel()), which makes the bytes/row assertion hardware-dependent; change the cancellation logic to be workload-stable by waiting for a minimum processed-row count instead of a wall-clock sleep—either poll a shared counter (e.g., the test's processed_rows/rows_processed metric) or have the pipeline signal when it has processed a target number of rows, and only call sd.cancel() once that minimum (choose a sensible MIN_ROWS) is reached; also gate the bytes/row assertion (the check around lines 109–115) behind that same minimum processed-row threshold so the regression check only runs when sufficient rows were processed.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@crates/logfwd/tests/allocation_e2e.rs`:
- Around line 28-30: The docstring claims a two-window stability check but the
test only measures one aggregate window; update the test to match the doc or
vice versa: either change the comment/test name to reflect a single-window
aggregation, or implement the promised window-vs-window comparison by capturing
a Region snapshot before the first pipeline run and a second Region after the
second window, compute Region::delta (or the existing delta logic) between those
two snapshots and assert allocations are stable across windows; locate the test
in crates/logfwd/tests/allocation_e2e.rs and modify the Region capture/assert
logic accordingly.
📒 Files selected for processing (1)
crates/logfwd/tests/allocation_e2e.rs
Removed temporary dhat profiling tests (diagnostic, not CI). Profiling results documented in #611. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Actionable comments posted: 4
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@crates/logfwd/tests/allocation_scaling.rs`:
- Around line 23-60: Extract the duplicated test helpers into a shared test
utility module (e.g., expand logfwd_test_utils) by moving the measure_pipeline
implementation and any helper functions it uses (such as test_meter and the YAML
config generation) into that module; update both allocation_scaling.rs and
allocation_e2e.rs to call logfwd_test_utils::measure_pipeline (or appropriately
named exported functions) and reuse Config::load_str/Pipeline::from_config usage
from the helper so tests import the common utility instead of duplicating the
same code.
- Around line 47-50: The test uses a fixed std::thread::spawn with
std::thread::sleep(Duration::from_secs(3)) before calling sd.cancel(), which
makes the cancellation timing hardware-dependent and flaky; replace the
sleep-based cancellation with a deterministic signal tied to pipeline completion
(for example wait on the pipeline's completion channel/receiver, join a worker
handle, or poll a shared processed_rows counter/Condvar) and invoke sd.cancel()
only after that completion condition is met; locate the spawned thread code
referencing std::thread::spawn, Duration::from_secs, and sd.cancel() in
allocation_scaling.rs and change it to wait on the real completion signal
instead of sleeping.
- Around line 56-59: The test truncates u64 counters to usize by casting
pipeline.metrics().batch_rows_total and .batches_total to usize; change the
function/test to return (u64, u64, u64) instead of (u64, usize, usize) and
remove the lossy casts so you return stats.bytes_allocated as u64,
pipeline.metrics().batch_rows_total.load(Ordering::Relaxed) and
pipeline.metrics().batches_total.load(Ordering::Relaxed) directly, and update
any callers of this test helper to accept the new (u64, u64, u64) tuple.
- Around line 87-90: The strict assert comparing per_row_500k and per_row_10k
can flake; update the assertion in the test to allow a small tolerance (e.g.
require per_row_500k < per_row_10k * 1.05) so minor CI variance won’t fail the
test, and adjust the failure message to reflect the tolerance and include the
actual values (referencing the variables per_row_500k and per_row_10k and the
measurement produced by measure_pipeline).
📒 Files selected for processing (2)
- crates/logfwd/tests/allocation_e2e.rs
- crates/logfwd/tests/allocation_scaling.rs
fn measure_pipeline(row_count: usize) -> (u64, usize, usize) {
    let dir = tempfile::tempdir().unwrap();
    let log_path = dir.path().join("test.log");
    logfwd_test_utils::generate_json_lines(&log_path, row_count, "scale-test");

    let yaml = format!(
        r#"
input:
  type: file
  path: {}
  format: json
output:
  type: "null"
"#,
        log_path.display()
    );

    let config = Config::load_str(&yaml).unwrap();
    let pipe_cfg = &config.pipelines["default"];
    let mut pipeline = Pipeline::from_config("default", pipe_cfg, &test_meter(), None).unwrap();
    pipeline.set_batch_timeout(Duration::from_millis(10));

    let shutdown = CancellationToken::new();
    let sd = shutdown.clone();
    std::thread::spawn(move || {
        std::thread::sleep(Duration::from_secs(3));
        sd.cancel();
    });

    let reg = Region::new(&GLOBAL);
    pipeline.run(&shutdown).unwrap();
    let stats = reg.change();

    let rows = pipeline.metrics().batch_rows_total.load(Ordering::Relaxed);
    let batches = pipeline.metrics().batches_total.load(Ordering::Relaxed);

    (stats.bytes_allocated as u64, rows as usize, batches as usize)
}
🧹 Nitpick | 🔵 Trivial
Consider extracting shared test utilities.
measure_pipeline, test_meter(), and the YAML config generation are duplicated across allocation_e2e.rs and this file. A logfwd_test_utils helper (or expanding the existing one) would reduce maintenance burden.
    let rows = pipeline.metrics().batch_rows_total.load(Ordering::Relaxed);
    let batches = pipeline.metrics().batches_total.load(Ordering::Relaxed);

    (stats.bytes_allocated as u64, rows as usize, batches as usize)
🧹 Nitpick | 🔵 Trivial
Truncation from u64 to usize on 32-bit targets.
batch_rows_total is u64 but returned as usize. On 32-bit platforms processing >4B rows this would truncate. Unlikely in tests, but the cast is lossy.
Suggested fix
- (stats.bytes_allocated as u64, rows as usize, batches as usize)
+ (stats.bytes_allocated as u64, rows, batches)
And update the return type to (u64, u64, u64) with corresponding changes in the caller.
SessionContext, built-in functions, and custom UDFs are now created once and reused across all batches. Only the `logs` MemTable is swapped per batch (deregister + register).

This eliminates per-batch overhead of:
- SessionContext construction (catalog, optimizer rules, built-in fns)
- 4-5 UDF registrations (int, float, regexp_extract, grok, geo_lookup)

Schema changes are handled automatically: MemTable is recreated with the batch's schema each call. The SessionContext itself is schema-agnostic.

Context is created lazily on first execute() so that set_geo_database() and add_enrichment_table() can be called after SqlTransform::new().

Fixes #465.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@crates/logfwd-transform/src/lib.rs`:
- Around line 511-515: The public doc comments for the SessionContext/UDF
caching are inaccurate: they state only the `logs` MemTable is swapped per batch
but `SqlTransform::execute()` also refreshes enrichment tables; update the doc
blocks (the ones describing SessionContext and UDF behavior near the current
comments and the duplicate block around lines 569–575) to accurately state that
while SessionContext, UDFs, and built-in functions are reused across batches,
the `logs` MemTable and all enrichment tables are refreshed
(deregistered+registered) per batch by `SqlTransform::execute()` so the cache
behavior is correctly described.
- Around line 525-526: The cached SessionContext field ctx prevents
ensure_context() from re-registering geo_lookup() after a geo DB change; update
SqlTransform::set_geo_database() to assign the new geo_database and reset ctx to
None so ensure_context() will rerun registration (i.e., set self.geo_database =
Some(db) and self.ctx = None). Also apply the same ctx-reset behavior in any
other setters that replace the geo DB (see analogous code around lines 645-667)
so the cached session is invalidated whenever geo_database changes.
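The caching and invalidation behaviour discussed in these two comments can be illustrated without DataFusion. In the sketch below `Context` and `Transform` are hypothetical stand-ins (not the crate's `SessionContext` or `SqlTransform`), and a `HashMap` plays the role of the table registry.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for an expensive query context (not DataFusion's API).
struct Context {
    geo_db: Option<String>,
    tables: HashMap<String, Vec<String>>,
}

struct Transform {
    geo_db: Option<String>,
    ctx: Option<Context>,
    contexts_built: usize,
}

impl Transform {
    fn new() -> Self {
        Self { geo_db: None, ctx: None, contexts_built: 0 }
    }

    /// Replacing the database drops the cached context, so the next
    /// execute() rebuilds it against the new database.
    fn set_geo_database(&mut self, db: &str) {
        self.geo_db = Some(db.to_string());
        self.ctx = None;
    }

    /// Builds the context on first use, then swaps only the per-batch table.
    /// Returns the database the context was built with and the batch length.
    fn execute(&mut self, batch: Vec<String>) -> (Option<String>, usize) {
        if self.ctx.is_none() {
            self.contexts_built += 1;
            self.ctx = Some(Context {
                geo_db: self.geo_db.clone(),
                tables: HashMap::new(),
            });
        }
        let ctx = self.ctx.as_mut().expect("context was just ensured");
        // insert() replaces the previous batch's table under the same name.
        ctx.tables.insert("logs".to_string(), batch);
        (ctx.geo_db.clone(), ctx.tables["logs"].len())
    }
}
```

Without the `self.ctx = None` line, a second `set_geo_database` call would leave `execute()` reading the database captured when the context was first built, which is the stale-cache case the second comment describes.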
📒 Files selected for processing (1)
crates/logfwd-transform/src/lib.rs
Fixes from code review:
- Build MemTable before deregister_table to avoid leaving context without a logs table on MemTable construction failure
- Add comment explaining enrichment table None case (deregister is correct: stale data is worse than a clear error)

New tests:
- test_cached_context_no_data_leakage: 3 batches with different data through a WHERE filter, verifying no cross-batch contamination
- test_cached_context_many_batches: 20 consecutive batches on the same SqlTransform, verifying the deregister/register cycle is stable

Also closes #498 (tokio runtime per batch), which is not a real issue: execute_blocking's Err branch is essential for tests/benchmarks, and production uses execute().await directly.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
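The "build before deregister" ordering is a general pattern: do the fallible step first, then swap. A minimal sketch follows, with a `HashMap` standing in for the table registry. `build_table` and its width check are hypothetical and only serve to produce a failure case.

```rust
use std::collections::HashMap;

type Table = Vec<Vec<i64>>;

/// Hypothetical fallible constructor: rejects rows of inconsistent width.
fn build_table(rows: Table) -> Result<Table, String> {
    let width = rows.first().map_or(0, Vec::len);
    if rows.iter().any(|r| r.len() != width) {
        return Err("rows have inconsistent widths".to_string());
    }
    Ok(rows)
}

/// Builds first, then swaps: on error the previously registered table is untouched.
fn swap_logs(tables: &mut HashMap<String, Table>, rows: Table) -> Result<(), String> {
    let table = build_table(rows)?; // may fail; nothing has been deregistered yet
    tables.remove("logs"); // "deregister"
    tables.insert("logs".to_string(), table); // "register"
    Ok(())
}
```

If the order were reversed, a construction failure would leave the registry with no `logs` entry at all, and the next query would fail for an unrelated-looking reason.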
…t caching

SessionContext fixes:
- Invalidate cached context when geo database changes (set_geo_database now sets ctx = None so UDF is re-registered with new db)
- Fix doc comment to mention enrichment table swapping, not just logs
- Build MemTable before deregister to avoid broken state on error

Test improvements:
- Remove clone noise from streaming scanner measurements (pre-build Bytes outside Region)
- Rename scanner_allocs test to reflect it tests both storage and streaming
- Derive polls_remaining from constants instead of magic number 45
- Handle zero-allocation case in OTLP encode test
- Fix doc comment claiming two-window but doing one aggregate
- Increase timeouts for CI safety (10s for e2e, 5s for scaling)
- Add 20% margin to scaling assertions for CI variance
- Run cargo fmt

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Replace fixed sleep timeouts (5-10s) with metrics-driven cancellation: poll batch_rows_total until all expected rows are processed, then cancel immediately. This makes tests deterministic and fast:
- Scaling test: 90s → 5s (3 measurements, each cancels after processing)
- E2E test: 10s → 0.6s (cancels as soon as 50K rows complete)

The pipeline's tailer thread stays alive after EOF (polling for new data), so time-based cancellation was the only exit path. The new approach polls metrics from a watchdog thread and cancels as soon as processing completes, with a 30s safety deadline for CI.

Also removes unused NullSink import.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
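The watchdog approach might look roughly like the following. This is a sketch under stated assumptions: an `AtomicBool` stands in for the real cancellation token, `run_until_cancelled` is a toy pipeline that idles after its input is exhausted, and all names are hypothetical.

```rust
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};

/// Why the watchdog cancelled: workload finished, or the safety deadline fired.
#[derive(Debug, PartialEq)]
enum Stop {
    Completed,
    TimedOut,
}

/// Polls `rows_done` and sets `cancel` once `expected` rows are seen or the
/// deadline passes. `cancel` is a stand-in for a cancellation token.
fn spawn_watchdog(
    rows_done: Arc<AtomicU64>,
    cancel: Arc<AtomicBool>,
    expected: u64,
    deadline: Duration,
) -> thread::JoinHandle<Stop> {
    thread::spawn(move || {
        let start = Instant::now();
        let outcome = loop {
            if rows_done.load(Ordering::Relaxed) >= expected {
                break Stop::Completed;
            }
            if start.elapsed() >= deadline {
                break Stop::TimedOut;
            }
            thread::sleep(Duration::from_millis(5));
        };
        cancel.store(true, Ordering::Release);
        outcome
    })
}

/// Toy pipeline: processes rows until cancelled, idling once input runs out,
/// like a tailer that keeps polling after EOF.
fn run_until_cancelled(rows_done: &AtomicU64, cancel: &AtomicBool, available: u64) {
    while !cancel.load(Ordering::Acquire) {
        if rows_done.load(Ordering::Relaxed) < available {
            rows_done.fetch_add(1, Ordering::Relaxed);
        } else {
            thread::sleep(Duration::from_millis(1));
        }
    }
}

/// Runs the toy pipeline under the watchdog and reports how it stopped.
fn measure(available: u64, expected: u64, deadline: Duration) -> (Stop, u64) {
    let rows_done = Arc::new(AtomicU64::new(0));
    let cancel = Arc::new(AtomicBool::new(false));
    let watchdog = spawn_watchdog(rows_done.clone(), cancel.clone(), expected, deadline);
    run_until_cancelled(&rows_done, &cancel, available);
    let stop = watchdog.join().expect("watchdog panicked");
    (stop, rows_done.load(Ordering::Relaxed))
}
```

Returning the stop reason alongside the row count lets a test fail on `Stop::TimedOut` instead of silently evaluating allocation thresholds against a partial run.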
Blocker fix: combine multiple #[test] functions into single test
functions per binary. stats_alloc's global allocator and dhat's global
profiler both cause measurement interference or panics when tests run
in parallel (cargo test's default).
Scanner tests: 3 tests → 1 (scanner_allocation_regression)
Output tests: 2 tests → 1 (output_encoding_allocation_regression)
Input tests: 2 tests → 1 (framed_input_allocation_behavior)
Each test function uses scoped blocks {} to isolate resources while
sharing the single global allocator/profiler instance.
Also:
- Restore enrichment table "not loaded yet" warning (behavioral change
from old code that was silently removed)
- Add doc comment explaining why add_enrichment_table doesn't
invalidate cached context (asymmetry with set_geo_database)
- Add 0/0 early-return guards to scanner leak tests
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Split combined allocation test functions back into separate #[test] functions with the #[serial] attribute. This gives:
- Individual test names in cargo test output
- Each test independently runnable and reportable
- Clear failure identification (which test failed, not which "Part N")

serial_test ensures tests sharing a global allocator (stats_alloc) or global profiler (dhat) run sequentially within each test binary.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@coderabbitai please do a full review
✅ Actions performed
Full review triggered.
Actionable comments posted: 4
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
crates/logfwd-transform/src/lib.rs (2)
630-650: ⚠️ Potential issue | 🟠 Major
Avoid replanning the query on the empty-result path.
Line 645 calls ctx.sql(sql) a second time just to recover the output schema. For filters that often produce zero-row batches, that reparses and replans the query on every execute, which gives back a noticeable chunk of the win from caching SessionContext. Capture df.schema() before collect() and reuse it when batches.len() == 0.
Proposed fix
     let df = ctx
         .sql(sql)
         .await
         .map_err(|e| format!("SQL execution error: {e}"))?;
+    let output_schema = Arc::clone(df.schema().inner());
     let batches = df
         .collect()
         .await
         .map_err(|e| format!("Failed to collect results: {e}"))?;

     // Concat all result batches into one.
     match batches.len() {
-        0 => {
-            let df2 = ctx
-                .sql(sql)
-                .await
-                .map_err(|e| format!("SQL schema error: {e}"))?;
-            let df_schema = df2.schema();
-            Ok(RecordBatch::new_empty(Arc::clone(df_schema.inner())))
-        }
+        0 => Ok(RecordBatch::new_empty(output_schema)),
         1 => Ok(batches.into_iter().next().expect("verified len==1")),
         _ => {
             let schema = batches[0].schema();
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/logfwd-transform/src/lib.rs` around lines 630 - 650, The code re-runs ctx.sql(sql) when batches.len() == 0 which reparses/replans the query; instead capture the DataFrame schema before calling collect() and reuse it for the empty-result path. Concretely: after let df = ctx.sql(&self.user_sql).await...?;, call let df_schema = df.schema() (or otherwise clone/arc it) before df.collect(), then when batches.len() == 0 construct the empty RecordBatch from df_schema (used in RecordBatch::new_empty(...) currently) rather than calling ctx.sql(...) again. Ensure you reference the existing symbols df, df.schema(), collect(), self.user_sql and RecordBatch::new_empty to locate and update the logic.
607-627: ⚠️ Potential issue | 🟠 Major
Missing enrichment snapshots will now spam stderr on every batch.
Because Line 612 forces every batch through the refresh path, a table with snapshot() == None will hit the existing eprintln! on every execute until it loads. That turns a transient startup condition into hot-path I/O and formatting work. Please log once on availability transitions, or move this warning out of execute().
As per coding guidelines, "Do not introduce allocations in hot paths without benchmarking."
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/logfwd-transform/src/lib.rs` around lines 607 - 627, The eprintln! in the execute() refresh loop causes repeated stderr allocations for tables whose snapshot() == None; update the logic around self.enrichment_tables/enrichment table handling (the loop that calls ctx.deregister_table, et.snapshot(), MemTable::try_new, ctx.register_table) to avoid hot-path formatting: add a lightweight state check (e.g., a Set or per-table boolean on the struct that tracks whether we already reported "not yet loaded") and only emit the warning once on a transition (loaded->unloaded or unloaded->loaded), or move the warning emission out of execute() to a non-hot path that runs on availability events; ensure the change references enrichment_tables, snapshot(), execute(), and the existing eprintln! call so the warning is only logged on state transitions and not on every batch.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@crates/logfwd-io/tests/allocation_churn.rs`:
- Around line 100-118: Before taking the heap snapshot, drop test fixtures that
remain live: after drop(framed) explicitly drop chunk and stats (the
Arc<ComponentStats>) — and if source is still in scope, drop it too — so that
dhat::HeapStats::get() only measures allocations from FramedInput; locate the
chunk, stats, source, framed (created via FramedInput::new) and the final_stats
= dhat::HeapStats::get() call and insert drop(chunk); drop(stats); (and
drop(source); if applicable) before calling dhat::HeapStats::get().
In `@crates/logfwd/tests/allocation_e2e.rs`:
- Around line 68-95: The test currently only asserts rows > 0 after
pipeline.run(), which lets the 30s safety timeout produce a partial sample;
change this to assert that the observed row count equals the intended
expected_rows so timeouts fail explicitly: after computing let rows =
pipeline.metrics().batch_rows_total.load(Ordering::Relaxed); replace or add the
assertion to assert_eq!(rows, expected_rows, "pipeline processed wrong number of
rows (timeout?)"); keep the existing batches > 0 check; this ensures the
postcondition for evaluating allocation thresholds is the full 50K-row workload
rather than a partial run.
In `@crates/logfwd/tests/allocation_scaling.rs`:
- Around line 115-120: The assertion currently allows per_row_500k to be up to
10% worse than per_row_10k because it tests per_row_500k < per_row_10k * 1.1;
change the check in the assertion that compares per_row_500k and per_row_10k to
require the 500K run to be at most 90% of the 10K baseline (use multiplier 0.9
instead of 1.1) so the test enforces “10% below baseline” as described; update
the assert condition and keep the failure message referencing per_row_500k and
per_row_10k unchanged or adjust wording to match the stricter comparison.
- Around line 51-82: The test currently treats a 30s safety timeout as a
"successful" sample by returning partial metrics; update the measurement to fail
instead by asserting that the processed rows equal the intended workload: after
calling pipeline.run(&shutdown).unwrap(), load rows via
pipeline.metrics().batch_rows_total (the existing rows variable) and assert rows
== row_count as u64 (or the expected variable used in measure_pipeline), causing
the test to panic on timeout; reference the measure_pipeline function / test
harness, the pipeline.run(&shutdown) call, and the
pipeline.metrics().batch_rows_total/batches_total reads to locate where to add
the assertion.
---
Outside diff comments:
In `@crates/logfwd-transform/src/lib.rs`:
- Around line 630-650: The code re-runs ctx.sql(sql) when batches.len() == 0
which reparses/replans the query; instead capture the DataFrame schema before
calling collect() and reuse it for the empty-result path. Concretely: after let
df = ctx.sql(&self.user_sql).await...?;, call let df_schema = df.schema() (or
otherwise clone/arc it) before df.collect(), then when batches.len() == 0
construct the empty RecordBatch from df_schema (used in
RecordBatch::new_empty(...) currently) rather than calling ctx.sql(...) again.
Ensure you reference the existing symbols df, df.schema(), collect(),
self.user_sql and RecordBatch::new_empty to locate and update the logic.
- Around line 607-627: The eprintln! in the execute() refresh loop causes
repeated stderr allocations for tables whose snapshot() == None; update the
logic around self.enrichment_tables/enrichment table handling (the loop that
calls ctx.deregister_table, et.snapshot(), MemTable::try_new,
ctx.register_table) to avoid hot-path formatting: add a lightweight state check
(e.g., a Set or per-table boolean on the struct that tracks whether we already
reported "not yet loaded") and only emit the warning once on a transition
(loaded->unloaded or unloaded->loaded), or move the warning emission out of
execute() to a non-hot path that runs on availability events; ensure the change
references enrichment_tables, snapshot(), execute(), and the existing eprintln!
call so the warning is only logged on state transitions and not on every batch.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 215c4e90-ccf2-4bda-abd7-54b83ebc79db
⛔ Files ignored due to path filters (1)
- `Cargo.lock` is excluded by `!**/*.lock`
📒 Files selected for processing (10)
- crates/logfwd-arrow/Cargo.toml
- crates/logfwd-arrow/tests/allocation_regression.rs
- crates/logfwd-io/Cargo.toml
- crates/logfwd-io/tests/allocation_churn.rs
- crates/logfwd-output/Cargo.toml
- crates/logfwd-output/tests/allocation_regression.rs
- crates/logfwd-transform/src/lib.rs
- crates/logfwd/Cargo.toml
- crates/logfwd/tests/allocation_e2e.rs
- crates/logfwd/tests/allocation_scaling.rs
// At 500K rows, per-row cost should be well below the 10K baseline.
// Use 10% margin — at 500K the fixed overhead is negligible.
assert!(
    per_row_500k < per_row_10k * 1.1,
    "per-row cost at 500K ({per_row_500k:.0}) should be below 10K baseline ({per_row_10k:.0})"
);
This threshold allows the large-run regression the test claims to forbid.
per_row_500k < per_row_10k * 1.1 still passes when the 500K run is 9% worse than the 10K baseline. That contradicts both the comment and the test name. If the requirement is “10% below baseline,” the multiplier needs to be 0.9.
Proposed fix
 assert!(
-    per_row_500k < per_row_10k * 1.1,
+    per_row_500k < per_row_10k * 0.9,
     "per-row cost at 500K ({per_row_500k:.0}) should be below 10K baseline ({per_row_10k:.0})"
 );
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
- // At 500K rows, per-row cost should be well below the 10K baseline.
- // Use 10% margin — at 500K the fixed overhead is negligible.
- assert!(
-     per_row_500k < per_row_10k * 1.1,
-     "per-row cost at 500K ({per_row_500k:.0}) should be below 10K baseline ({per_row_10k:.0})"
- );
+ // At 500K rows, per-row cost should be well below the 10K baseline.
+ // Use 10% margin — at 500K the fixed overhead is negligible.
+ assert!(
+     per_row_500k < per_row_10k * 0.9,
+     "per-row cost at 500K ({per_row_500k:.0}) should be below 10K baseline ({per_row_10k:.0})"
+ );
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@crates/logfwd/tests/allocation_scaling.rs` around lines 115 - 120, The
assertion currently allows per_row_500k to be up to 10% worse than per_row_10k
because it tests per_row_500k < per_row_10k * 1.1; change the check in the
assertion that compares per_row_500k and per_row_10k to require the 500K run to
be at most 90% of the 10K baseline (use multiplier 0.9 instead of 1.1) so the
test enforces “10% below baseline” as described; update the assert condition and
keep the failure message referencing per_row_500k and per_row_10k unchanged or
adjust wording to match the stricter comparison.
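The threshold arithmetic in the comment above can be checked with a small worked example. This is only a sketch: `below_baseline` is a hypothetical helper and the byte counts are made-up numbers, not measurements from this PR. It compares the original multiplier (1.1), a strict comparison (1.0), and the proposed multiplier (0.9).

```rust
/// Returns true when the 500K-row per-row cost is below `factor` times the
/// 10K-row baseline. `factor` is the multiplier under discussion.
fn below_baseline(per_row_500k: f64, per_row_10k: f64, factor: f64) -> bool {
    per_row_500k < per_row_10k * factor
}

fn main() {
    // Hypothetical measurements in bytes per row; not taken from the PR.
    let per_row_10k = 1000.0;
    let regressed_500k = 1090.0; // 9% worse than the baseline
    let improved_500k = 850.0; // 15% better than the baseline

    for factor in [1.1, 1.0, 0.9] {
        println!(
            "factor {factor}: regressed passes = {}, improved passes = {}",
            below_baseline(regressed_500k, per_row_10k, factor),
            below_baseline(improved_500k, per_row_10k, factor),
        );
    }
}
```

With these numbers, only the 1.1 multiplier lets the regressed run through, which is the behavior the review comment objects to.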
…ures before leak check
- Assert rows == expected after pipeline.run() in both e2e and scaling tests. If the safety timeout fires instead of data-driven cancellation, the test fails explicitly rather than producing misleading partial metrics.
- Scaling assertion: per_row_500k must be strictly less than per_row_10k (was < per_row_10k * 1.1, which allowed 500K to be WORSE than 10K, contradicting the test's purpose).
- Drop chunk and stats Arc before dhat::HeapStats::get() in leak test so we only measure FramedInput's residual allocations, not test fixtures.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Summary
Two changes: allocation regression test infrastructure and a performance fix
that caches the DataFusion SessionContext across batches.
Allocation regression tests
Per-component tests that catch memory leaks, buffer churn, and super-linear
allocation growth:
- storage_scanner_no_leak_across_batches
- streaming_scanner_no_leak_across_batches
- storage_scanner_allocs_are_linear_not_quadratic
- framed_input_no_buffer_churn
- framed_input_no_leak_across_polls
- write_row_json_stable_across_batches
- otlp_encode_stable_across_batches
- pipeline_allocations_stable_across_batches
- allocation_per_row_decreases_with_scale

Measured baseline: ~905 bytes/row asymptotic (500K rows) for ~120 bytes input JSON.
Uses `stats_alloc` and `dhat` in separate test binaries (global allocator constraint). Run allocation tests with `--test-threads=1`.
Cache DataFusion SessionContext (#465)
SessionContext, built-in functions, and custom UDFs are now created once and
reused across all batches. Only the `logs` MemTable is swapped per batch
(deregister + register).
Previously: every batch created a new SessionContext (catalog, optimizer rules,
hundreds of built-in functions) + registered 4-5 UDFs + created a new MemTable.
Now: context created lazily on first execute(). Per-batch cost is just
deregister + register of the `logs` table.
Safety: MemTable is built BEFORE deregister to avoid leaving the context without
a table on construction failure. Enrichment tables with no snapshot are
deregistered (stale data is worse than a clear error).
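The ordering described above (create the context lazily, build the new table first, then swap) can be sketched with standard-library stand-ins. This is not the DataFusion API: `Context`, `build_table`, `Transform`, and `contexts_created` are hypothetical names used only to illustrate the pattern, and a `HashMap` plays the role of the table registry.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a query context: a name -> table registry.
#[derive(Default)]
struct Context {
    tables: HashMap<String, Vec<String>>,
}

/// Hypothetical stand-in for table construction, which can fail.
fn build_table(rows: &[&str]) -> Result<Vec<String>, String> {
    if rows.is_empty() {
        return Err("cannot build a table from an empty batch".to_owned());
    }
    Ok(rows.iter().map(|r| r.to_string()).collect())
}

#[derive(Default)]
struct Transform {
    ctx: Option<Context>,    // created lazily on the first successful execute()
    contexts_created: usize, // instrumentation for this sketch only
}

impl Transform {
    fn execute(&mut self, rows: &[&str]) -> Result<usize, String> {
        // Build the replacement table FIRST. If this fails we return early
        // and the previously registered table is left untouched.
        let table = build_table(rows)?;

        // Create the context once and reuse it for every later batch.
        if self.ctx.is_none() {
            self.ctx = Some(Context::default());
            self.contexts_created += 1;
        }
        let ctx = self.ctx.as_mut().expect("context initialized above");

        // Swap: deregister the old `logs` table, then register the new one.
        ctx.tables.remove("logs");
        ctx.tables.insert("logs".to_owned(), table);
        Ok(ctx.tables["logs"].len())
    }
}

fn main() {
    let mut t = Transform::default();
    println!("{:?}", t.execute(&["a", "b"]));
    println!("{:?}", t.execute(&[])); // fails; the old table stays registered
    println!("contexts created: {}", t.contexts_created);
}
```

Because the fallible step runs before the registry is touched, a failed batch in this sketch never leaves the context without a `logs` table.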
Also closes #498
`execute_blocking`'s tokio runtime creation is NOT a per-batch issue in production: the pipeline calls `execute().await` directly. The `Err` branch (temporary runtime) is essential for the 65+ unit tests that use `#[test]`.
New dependencies (dev-only)
- `stats_alloc = "0.1"` on logfwd-arrow, logfwd-output, logfwd
- `dhat = "0.3"` on logfwd-io
Test plan
- `cargo test -p logfwd-transform`: 84 tests pass (65 + 18 + 1 doctest)
- `cargo test -p logfwd-arrow --test allocation_regression -- --test-threads=1`: 3 pass
- `cargo test -p logfwd-io --test allocation_churn -- --test-threads=1`: 2 pass
- `cargo test -p logfwd-output --test allocation_regression`: 2 pass
- `cargo test -p logfwd --test allocation_e2e`: 1 pass
- `cargo test -p logfwd --test allocation_scaling`: 1 pass
- `cargo clippy -p logfwd-transform -- -D warnings`: zero errors
🤖 Generated with Claude Code