fix: improve file tailing reliability #805
1. Checkpoint validation (#656): compare restored offsets with file sizes, reset to 0 on rotation/truncation with a warning.
2. Exclusive lock (#737): acquire flock on {data_dir}/logfwd.lock at startup to prevent duplicate instances.
3. Glob warnings (#730): warn when configured glob matches no files.

Additional: replaced eprintln! with tracing macros in tail.rs, added unit tests for checkpoint validation and glob warnings, added integration test for exclusive lock.

Co-Authored-By: Jules <noreply@google.com>
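The checkpoint validation in item 1 can be illustrated with a minimal, std-only sketch. The names `OffsetCheck`, `check_offset`, and `validated_start_offset` are hypothetical and not taken from the PR, and `eprintln!` stands in for the `tracing` warning the PR describes.

```rust
use std::fs;
use std::io;
use std::path::Path;

/// Outcome of validating a restored checkpoint offset (hypothetical type).
#[derive(Debug, PartialEq, Eq)]
pub enum OffsetCheck {
    /// The offset lies within the current file; resume from it.
    Resume(u64),
    /// The offset is past the end of the file; restart from 0.
    Reset { stale_offset: u64, file_size: u64 },
}

/// Pure comparison: an offset larger than the file size means the file
/// was truncated or replaced after the checkpoint was written.
pub fn check_offset(restored_offset: u64, file_size: u64) -> OffsetCheck {
    if restored_offset > file_size {
        OffsetCheck::Reset { stale_offset: restored_offset, file_size }
    } else {
        OffsetCheck::Resume(restored_offset)
    }
}

/// Checks the offset against the file on disk and returns where to start.
pub fn validated_start_offset(path: &Path, restored_offset: u64) -> io::Result<u64> {
    let file_size = fs::metadata(path)?.len();
    Ok(match check_offset(restored_offset, file_size) {
        OffsetCheck::Resume(offset) => offset,
        OffsetCheck::Reset { stale_offset, file_size } => {
            // Assumption: the real code logs via `tracing::warn!`.
            eprintln!(
                "warning: checkpoint offset {stale_offset} exceeds size {file_size} of {}; resetting to 0",
                path.display()
            );
            0
        }
    })
}
```

A size comparison alone cannot detect a rotation where the new file has already grown past the stale offset; catching that case would need an additional identity check, which this sketch does not attempt.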
Walkthrough

This PR implements file-tailing reliability features and performs extensive code modernization. Key changes include: (1) checkpoint offset validation against current file size on startup, with automatic reset to offset 0 if stale; (2) exclusive lock file acquisition at startup to prevent duplicate instance processing; (3) glob pattern match warnings when configured file patterns yield no files; (4) column naming updates from

Possibly related PRs
Caution: Pre-merge checks failed. Please resolve all errors before merging. Addressing warnings is optional.
❌ Failed checks (2 errors, 3 warnings)
✅ Passed checks (2 passed)
Actionable comments posted: 12
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
crates/logfwd/tests/compliance.rs (1)
150-168: ⚠️ Potential issue | 🟠 Major
Source-specific verification can be bypassed when source_id column is missing.
With non-empty source_id, batch.column_by_name("source_id") returning None makes source_filter become None, so all rows from that batch are counted. That can produce false pass/fail in per-source compliance checks.
🔧 Proposed fix
- let source_filter: Option<Vec<bool>> = if !source_id.is_empty() {
-     batch.column_by_name("source_id").map(|col| {
+ let source_filter: Option<Vec<bool>> = if !source_id.is_empty() {
+     Some(if let Some(col) = batch.column_by_name("source_id") {
          (0..batch.num_rows())
              .map(|row| {
                  if col.is_null(row) {
                      return false;
                  }
                  let val = match col.data_type() {
                      arrow::datatypes::DataType::Utf8 => col.as_string::<i32>().value(row),
                      arrow::datatypes::DataType::Utf8View => col.as_string_view().value(row),
                      _ => "",
                  };
                  val == source_id
              })
              .collect()
-     })
+     } else {
+         vec![false; batch.num_rows()]
+     })
  } else {
      None
  };
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/logfwd/tests/compliance.rs` around lines 150 - 168, The current logic sets source_filter to None when batch.column_by_name("source_id") returns None, which lets rows pass when source_id is non-empty; change this so that when source_id is non-empty but the "source_id" column is missing you produce Some(Vec<bool>) of length batch.num_rows() with all false (i.e., no rows match) instead of None; update the branch around batch.column_by_name("source_id") used to build source_filter (and preserve the existing null handling and UTF8/Utf8View matching logic) so missing column yields a false mask rather than allowing all rows.
docs/CONFIG_REFERENCE.md (1)
137-143: ⚠️ Potential issue | 🟡 Minor
Add generator input type to documentation table.
The Generator variant exists in the InputType enum (implemented, used for benchmarking) but is missing from the Input types table in docs/CONFIG_REFERENCE.md. Add a row for generator with appropriate status and description.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@docs/CONFIG_REFERENCE.md` around lines 137 - 143, The Input types table in CONFIG_REFERENCE.md is missing the Generator variant from the InputType enum; add a new table row for `generator` describing its status (e.g., Implemented) and a concise description like "Produce synthetic logs for benchmarking/testing" so documentation matches the implemented InputType::Generator used for benchmarking. Ensure the row format matches the existing table style (Value | Status | Description) and use the same backtick formatting for `generator`.
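The compliance.rs finding in the outside-diff comments above comes down to one rule: when a source is requested but the column is absent, the mask should reject every row rather than disappear. A minimal sketch of that rule follows, using plain slices in place of Arrow arrays; `source_mask` and its parameters are hypothetical names, not code from the repository.

```rust
/// Builds a per-row match mask for one batch.
///
/// `column` is `None` when the batch has no `source_id` column; each cell
/// is `None` when the value in that row is null.
pub fn source_mask(
    source_id: &str,
    column: Option<&[Option<&str>]>,
    num_rows: usize,
) -> Option<Vec<bool>> {
    if source_id.is_empty() {
        // No per-source filtering was requested: count every row.
        return None;
    }
    Some(match column {
        // Null cells compare unequal to `Some(source_id)`, so they are rejected.
        Some(cells) => cells.iter().map(|cell| *cell == Some(source_id)).collect(),
        // Column missing: an all-false mask, so no row is counted.
        None => vec![false; num_rows],
    })
}
```

Returning `Some(vec![false; n])` instead of `None` keeps "no filter requested" and "filter requested but nothing can match" distinguishable for the caller.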
♻️ Duplicate comments (1)
crates/logfwd-io/tests/transport_e2e.rs (1)
159-159: ⚠️ Potential issue | 🟡 Minor
Same unnecessary change as Line 106.
Identical issue: writeln! and write! with explicit \n produce the same output. This change is unrelated to the PR scope and should be reverted.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/logfwd-io/tests/transport_e2e.rs` at line 159, The change replaces writeln! with write!(client, "...\n") in the test (the write!(client, "{{\"client\":{id},\"seq\":{i}}}\n").unwrap() call), which is unnecessary and out of scope; revert this to using writeln!(client, "{{\"client\":{id},\"seq\":{i}}}").unwrap() (restore the original writeln! invocation for the client write in the transport_e2e test) so behavior remains unchanged and the unrelated edit is removed.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@crates/logfwd-arrow/src/streaming_builder.rs`:
- Line 467: The change to the raw byte string delimiter on the variable `json`
in `streaming_builder.rs` is a cosmetic, unrelated edit; revert the line that
sets `let json = br#"not used directly"#;` back to the original `let json =
br"not used directly";` (or remove the unrelated test change entirely) so the PR
remains focused on tailing reliability; locate the `json` binding in the
`streaming_builder.rs` file (inside the Arrow builder code) and restore the
original delimiter form.
In `@crates/logfwd-core/Cargo.toml`:
- Line 16: Replace the direct pinned dev-dependency "arrow-json" in the
logfwd-core crate with a workspace-managed dependency: add an entry for
arrow-json (with the explicit version and default-features setting) under the
root [workspace.dependencies], then change the arrow-json line in the
logfwd-core Cargo.toml to reference the workspace (e.g., arrow-json = {
workspace = true }) so the crate uses the workspace-managed version instead of
pinning "54" locally.
In `@crates/logfwd-io/src/tail.rs`:
- Around line 247-249: The current logic only warns when the union of matches is
empty (using initial_paths.is_empty()), so change it to warn per-glob: iterate
over each pattern in patterns and call the existing warn_on_empty behavior for
each pattern when it yields no matches (use warn_on_empty = true), and remove
the aggregate initial_paths.is_empty() checks used at startup and in the rescan
path; ensure both the startup code and the rescan code invoke the per-pattern
warning path so each empty glob emits its own tracing::warn identifying the
failing pattern.
- Around line 1398-1417: The tests (e.g., test_glob_no_match_warning and the
related tests around FileTailer::new_with_globs) only assert tailer state and do
not verify that tracing::warn! was emitted; update the tests to install a test
tracing subscriber (or use the tracing_test helpers) that captures logs and
assert that a warning containing the expected message is emitted both at startup
and after rescan, and add a new mixed-pattern test that calls
FileTailer::new_with_globs with one matching glob and one non-matching glob to
assert the unmatched-glob warning is still logged; ensure you reference the
existing test names (test_glob_no_match_warning) and methods
(FileTailer::new_with_globs, tailer.poll) when adding assertions so tests fail
if the warnings are removed.
In `@crates/logfwd-io/tests/transport_e2e.rs`:
- Line 106: The change from writeln!(client, "{{\"seq\":{i}}}") to write!(client,
"{{\"seq\":{i}}}\n") is unnecessary and unrelated to the PR goal; revert to using
writeln!(client, "{{\"seq\":{i}}}") (or otherwise restore the original writeln!
invocation that writes the formatted string with newline) where the client is
written to in transport_e2e.rs so the test code remains semantically identical
and focused on the tailing/checkpoint behaviors.
In `@crates/logfwd-transform/Cargo.toml`:
- Around line 11-16: The crate is pinning datafusion locally in
crates/logfwd-transform/Cargo.toml; instead add a datafusion entry with the
required features under the root [workspace.dependencies] and then replace the
local datafusion declaration here with an inherited dependency (e.g., datafusion
= { workspace = true, default-features = false } or similar) so this crate
(crates/logfwd-transform) uses the workspace-managed datafusion with the same
feature set (string_expressions, unicode_expressions, regex_expressions,
recursive_protection).
In `@crates/logfwd-transform/src/udf/grok.rs`:
- Line 309: The test uses the fully-qualified
arrow::record_batch::RecordBatch::try_new call even though RecordBatch is
already imported; replace arrow::record_batch::RecordBatch::try_new(schema,
vec![msgs]).unwrap() with the imported RecordBatch::try_new(schema,
vec![msgs]).unwrap() (i.e., use the existing RecordBatch symbol) to remove the
redundant qualification.
In `@crates/logfwd/tests/allocation_scaling.rs`:
- Line 54: The fixed 30s deadline in allocation_scaling.rs (the line assigning
let deadline = std::time::Instant::now() + Duration::from_secs(30)) makes the
500K scaling test flaky; change the timeout to be row-count-aware or overridable
via env var: compute the deadline using a base-per-row multiplier (e.g.,
duration_per_row * expected_row_count) with a minimum floor, and fall back to an
environment variable (e.g., TEST_TIMEOUT_MS) if provided, then replace the
current Duration::from_secs(30) use in the test's deadline calculation so the
timeout scales for large workloads while remaining bounded for small tests.
In `@crates/logfwd/tests/test_lock.rs`:
- Around line 5-68: The test_exclusive_lock test is fragile and platform-unsafe:
add a #[cfg(unix)] attribute to the test to avoid running on Windows; replace
the fixed std::thread::sleep(Duration::from_secs(10)) used after spawning child1
with a polling loop that checks for the lock file (e.g.,
data_dir.join("logfwd.lock")) or a readiness signal before proceeding (poll at
short intervals with a bounded number of attempts); enforce a hard timeout for
the polling so the test fails fast if the first instance never acquires the lock
and ensure child1 is killed/reaped in all failure paths to avoid zombies (use
child1.kill()/child1.wait() in the timeout cleanup).
- Around line 28-38: Replace launching the test helper with `cargo run` by
invoking the pre-built binary via the CARGO_BIN_EXE mechanism: use
env!("CARGO_BIN_EXE_logfwd") as the program passed to Command::new (the code
creating `child1`), and pass the same args (e.g., "--config",
config_path.to_str().unwrap()) to avoid recompilation in CI; ensure your crate
exposes a binary target (add a [[bin]] entry or otherwise build the binary in
test setup) so env!("CARGO_BIN_EXE_logfwd") is available at test runtime.
In `@docs/CONFIG_REFERENCE.md`:
- Line 422: The docs entry for the `diagnostics` option is incomplete and
misleading; update the description to list all active endpoints implemented by
the diagnostics router (e.g., `/health`, `/ready`, `/api/stats` served by
serve_stats, `/api/config` served by serve_config, `/api/logs`, `/api/history`,
`/api/traces`, etc.) and note that `/metrics` currently returns 410 Gone with a
removed-message rather than providing metrics; ensure the text references the
diagnostics route dispatcher (diagnostics) so readers can find the true
behavior.
In `@README.md`:
- Around line 161-178: The README example uses singular keys `input`/`output`
but the config struct is defined as `PipelineConfig` in
crates/logfwd-config/src/lib.rs with plural fields `inputs` and `outputs`, so
update the README snippet to use `inputs:` and `outputs:` (and similarly for the
`debug` block) to match `PipelineConfig`'s field names and allow serde
deserialization to succeed; ensure the YAML mirrors the plural usage used in
book/src/config/reference.md to keep docs consistent.
---
Outside diff comments:
In `@crates/logfwd/tests/compliance.rs`:
- Around line 150-168: The current logic sets source_filter to None when
batch.column_by_name("source_id") returns None, which lets rows pass when
source_id is non-empty; change this so that when source_id is non-empty but the
"source_id" column is missing you produce Some(Vec<bool>) of length
batch.num_rows() with all false (i.e., no rows match) instead of None; update
the branch around batch.column_by_name("source_id") used to build source_filter
(and preserve the existing null handling and UTF8/Utf8View matching logic) so
missing column yields a false mask rather than allowing all rows.
In `@docs/CONFIG_REFERENCE.md`:
- Around line 137-143: The Input types table in CONFIG_REFERENCE.md is missing
the Generator variant from the InputType enum; add a new table row for
`generator` describing its status (e.g., Implemented) and a concise description
like "Produce synthetic logs for benchmarking/testing" so documentation matches
the implemented InputType::Generator used for benchmarking. Ensure the row
format matches the existing table style (Value | Status | Description) and use
the same backtick formatting for `generator`.
---
Duplicate comments:
In `@crates/logfwd-io/tests/transport_e2e.rs`:
- Line 159: The change replaces writeln! with write!(client, "...\n") in the
test (the write!(client, "{{\"client\":{id},\"seq\":{i}}}\n").unwrap() call),
which is unnecessary and out of scope; revert this to using writeln!(client,
"{{\"client\":{id},\"seq\":{i}}}").unwrap() (restore the original writeln!
invocation for the client write in the transport_e2e test) so behavior remains
unchanged and the unrelated edit is removed.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository YAML (base), Organization UI (inherited)
Review profile: ASSERTIVE
Plan: Pro
Run ID: cf1d383a-b4d9-4350-a814-113b942c9b3c
⛔ Files ignored due to path filters (1)
Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (55)
- Cargo.toml
- README.md
- book/src/README.md
- book/src/config/reference.md
- book/src/deployment/kubernetes.md
- crates/logfwd-arrow/src/streaming_builder.rs
- crates/logfwd-arrow/tests/allocation_regression.rs
- crates/logfwd-bench/benches/elasticsearch_arrow.rs
- crates/logfwd-bench/benches/pipeline.rs
- crates/logfwd-competitive-bench/src/main.rs
- crates/logfwd-competitive-bench/src/rate_bench.rs
- crates/logfwd-competitive-bench/src/runner.rs
- crates/logfwd-config/src/lib.rs
- crates/logfwd-core/Cargo.toml
- crates/logfwd-core/benches/scanner.rs
- crates/logfwd-core/examples/arrow_ipc_roundtrip.rs
- crates/logfwd-core/src/json_scanner.rs
- crates/logfwd-core/src/pipeline/lifecycle.rs
- crates/logfwd-core/tests/compliance_data.rs
- crates/logfwd-core/tests/scanner_conformance.rs
- crates/logfwd-io/src/diagnostics.rs
- crates/logfwd-io/src/enrichment.rs
- crates/logfwd-io/src/generator.rs
- crates/logfwd-io/src/metric_history.rs
- crates/logfwd-io/src/otlp_receiver.rs
- crates/logfwd-io/src/span_exporter.rs
- crates/logfwd-io/src/tail.rs
- crates/logfwd-io/src/tcp_input.rs
- crates/logfwd-io/tests/transport_e2e.rs
- crates/logfwd-output/src/elasticsearch.rs
- crates/logfwd-output/src/lib.rs
- crates/logfwd-output/src/loki.rs
- crates/logfwd-output/src/null.rs
- crates/logfwd-output/src/otlp_sink.rs
- crates/logfwd-output/tests/elasticsearch_arrow_ipc.rs
- crates/logfwd-transform/Cargo.toml
- crates/logfwd-transform/src/lib.rs
- crates/logfwd-transform/src/udf/grok.rs
- crates/logfwd/Cargo.toml
- crates/logfwd/src/main.rs
- crates/logfwd/src/pipeline.rs
- crates/logfwd/src/worker_pool.rs
- crates/logfwd/tests/allocation_e2e.rs
- crates/logfwd/tests/allocation_scaling.rs
- crates/logfwd/tests/compliance.rs
- crates/logfwd/tests/compliance_file.rs
- crates/logfwd/tests/integration.rs
- crates/logfwd/tests/test_lock.rs
- dev-docs/research/file-tailing-audit.md
- docs/COLUMN_NAMING.md
- docs/CONFIG_REFERENCE.md
- docs/DEPLOYMENT.md
- docs/TROUBLESHOOTING.md
- examples/elasticsearch/README.md
- examples/elasticsearch/with-auth-and-filter.yaml
💤 Files with no reviewable changes (1)
- dev-docs/research/file-tailing-audit.md
 #[test]
 fn test_basic_string_and_int() {
-    let json = br"not used directly";
+    let json = br#"not used directly"#;
🧹 Nitpick | 🔵 Trivial
Cosmetic test change unrelated to PR objectives.
This raw byte string delimiter change (br"..." → br#"..."#) is purely syntactic with zero functional impact. The PR focuses on file tailing reliability (checkpoint validation, exclusive lock, glob warnings), but this file is in the Arrow builder crate. Consider whether unrelated cleanup belongs in a focused reliability fix.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@crates/logfwd-arrow/src/streaming_builder.rs` at line 467, The change to the
raw byte string delimiter on the variable `json` in `streaming_builder.rs` is a
cosmetic, unrelated edit; revert the line that sets `let json = br#"not used
directly"#;` back to the original `let json = br"not used directly";` (or remove
the unrelated test change entirely) so the PR remains focused on tailing
reliability; locate the `json` binding in the `streaming_builder.rs` file
(inside the Arrow builder code) and restore the original delimiter form.
 logfwd-test-utils = { version = "0.1.0", path = "../logfwd-test-utils" }
 arrow = { workspace = true }
-arrow-json = { workspace = true }
+arrow-json = "54"
🧹 Nitpick | 🔵 Trivial
Dev-dependency bypasses workspace management.
arrow-json = "54" is pinned directly instead of using { workspace = true }. As per coding guidelines, all new dependencies must be in [workspace.dependencies] with explicit version. This creates version drift risk when arrow is updated workspace-wide.
Suggested fix
Add to root Cargo.toml under [workspace.dependencies]:
arrow-json = { version = "54", default-features = false }
Then here:
-arrow-json = "54"
+arrow-json = { workspace = true }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@crates/logfwd-core/Cargo.toml` at line 16, Replace the direct pinned
dev-dependency "arrow-json" in the logfwd-core crate with a workspace-managed
dependency: add an entry for arrow-json (with the explicit version and
default-features setting) under the root [workspace.dependencies], then change
the arrow-json line in the logfwd-core Cargo.toml to reference the workspace
(e.g., arrow-json = { workspace = true }) so the crate uses the
workspace-managed version instead of pinning "54" locally.
 if initial_paths.is_empty() && !patterns.is_empty() {
     tracing::warn!("glob patterns match no files at startup: {:?}", patterns);
 }
Warn per glob, not only when the whole pattern set is empty.
These branches only fire when the union of matches is empty. If one configured glob matches files and another matches none, the empty glob is silent, which misses the #730 behavior and hides which pattern is broken.
Suggested fix
-fn expand_glob_patterns(patterns: &[&str]) -> Vec<PathBuf> {
+fn expand_glob_patterns(patterns: &[&str], warn_on_empty: bool) -> Vec<PathBuf> {
     let mut paths = Vec::new();
     for pattern in patterns {
         match glob::glob(pattern) {
             Ok(entries) => {
-                for entry in entries.flatten() {
-                    paths.push(entry);
+                let mut matched = false;
+                for entry in entries {
+                    match entry {
+                        Ok(path) => {
+                            matched = true;
+                            paths.push(path);
+                        }
+                        Err(err) => {
+                            tracing::warn!("error expanding glob pattern {pattern:?}: {err}");
+                        }
+                    }
+                }
+                if warn_on_empty && !matched {
+                    tracing::warn!("glob pattern matched no files: {pattern:?}");
                 }
             }
             Err(e) => {
                 tracing::warn!("invalid glob pattern {pattern:?}: {e}");
             }
         }
     }
     paths
 }
Then call it with warn_on_empty = true from both startup and rescan paths, and drop the aggregate is_empty() checks.
Also applies to: 280-285
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@crates/logfwd-io/src/tail.rs` around lines 247 - 249, The current logic only
warns when the union of matches is empty (using initial_paths.is_empty()), so
change it to warn per-glob: iterate over each pattern in patterns and call the
existing warn_on_empty behavior for each pattern when it yields no matches (use
warn_on_empty = true), and remove the aggregate initial_paths.is_empty() checks
used at startup and in the rescan path; ensure both the startup code and the
rescan code invoke the per-pattern warning path so each empty glob emits its own
tracing::warn identifying the failing pattern.
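One way to make the per-glob behavior easy to unit test is to separate "which patterns matched nothing" from the act of logging. The sketch below is an assumption-laden illustration: `expand_with_report` is a hypothetical helper, and the `expand` closure stands in for a real glob expansion so the example needs no external crate.

```rust
use std::path::PathBuf;

/// Expands every pattern with `expand` and returns the matched paths
/// together with the patterns that produced no match at all.
pub fn expand_with_report<F>(patterns: &[&str], expand: F) -> (Vec<PathBuf>, Vec<String>)
where
    F: Fn(&str) -> Vec<PathBuf>,
{
    let mut paths = Vec::new();
    let mut unmatched = Vec::new();
    for &pattern in patterns {
        let matches = expand(pattern);
        if matches.is_empty() {
            // Recorded per pattern, so one healthy glob cannot hide a broken one.
            unmatched.push(pattern.to_string());
        }
        paths.extend(matches);
    }
    (paths, unmatched)
}
```

A caller could then emit one warning per entry in the second element, and a test can assert on that list directly for the mixed case (one matching glob, one non-matching glob) without capturing log output.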
#[test]
fn test_glob_no_match_warning() {
    let dir = tempfile::tempdir().unwrap();
    let pattern = format!("{}/*.nonexistent", dir.path().display());

    let config = TailConfig {
        poll_interval_ms: 10,
        glob_rescan_interval_ms: 50,
        ..Default::default()
    };

    // This should trigger the "match no files at startup" warning.
    let mut tailer = FileTailer::new_with_globs(&[&pattern], config).unwrap();
    assert_eq!(tailer.num_files(), 0);

    // Wait for rescan. This should trigger the "still match no files" warning.
    std::thread::sleep(Duration::from_millis(150));
    tailer.poll().unwrap();
    assert_eq!(tailer.num_files(), 0);
}
These tests don't actually verify the new warnings.
Both tests validate state, but neither asserts the tracing::warn! side effect. A future regression could drop the warning emission and still pass. For the glob case, please also add a mixed-pattern scenario so one unmatched glob is covered even when another glob has matches.
Also applies to: 1419-1457
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@crates/logfwd-io/src/tail.rs` around lines 1398 - 1417, The tests (e.g.,
test_glob_no_match_warning and the related tests around
FileTailer::new_with_globs) only assert tailer state and do not verify that
tracing::warn! was emitted; update the tests to install a test tracing
subscriber (or use the tracing_test helpers) that captures logs and assert that
a warning containing the expected message is emitted both at startup and after
rescan, and add a new mixed-pattern test that calls FileTailer::new_with_globs
with one matching glob and one non-matching glob to assert the unmatched-glob
warning is still logged; ensure you reference the existing test names
(test_glob_no_match_warning) and methods (FileTailer::new_with_globs,
tailer.poll) when adding assertions so tests fail if the warnings are removed.
 let mut client = TcpStream::connect(addr).unwrap();
 for i in 0..100 {
-    writeln!(client, "{{\"seq\":{i}}}").unwrap();
+    write!(client, "{{\"seq\":{i}}}\n").unwrap();
Unnecessary change unrelated to PR objectives.
writeln!(client, "{{\"seq\":{i}}}") and write!(client, "{{\"seq\":{i}}}\n") produce identical output—both write the formatted string followed by \n. This change adds no functional value and is unrelated to the file tailing reliability objectives (checkpoint validation, exclusive lock, glob warnings). Consider reverting.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@crates/logfwd-io/tests/transport_e2e.rs` at line 106, The change from
writeln!(client, "{{\"seq\":{i}}}") to write!(client, "{{\"seq\":{i}}}\n") is
unnecessary and unrelated to the PR goal; revert to using writeln!(client,
"{{\"seq\":{i}}}") (or otherwise restore the original writeln! invocation that
writes the formatted string with newline) where the client is written to in
transport_e2e.rs so the test code remains semantically identical and focused on
the tailing/checkpoint behaviors.
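The equivalence claimed in this comment can be checked with a small sketch that writes into an in-memory buffer instead of a `TcpStream`. The function names `with_writeln` and `with_write` are illustrative only.

```rust
use std::io::Write;

/// Formats one record with `writeln!`, which appends the newline itself.
pub fn with_writeln(i: u32) -> Vec<u8> {
    let mut buf = Vec::new();
    writeln!(buf, "{{\"seq\":{i}}}").unwrap();
    buf
}

/// Formats the same record with `write!` and an explicit trailing `\n`.
pub fn with_write(i: u32) -> Vec<u8> {
    let mut buf = Vec::new();
    write!(buf, "{{\"seq\":{i}}}\n").unwrap();
    buf
}
```

Both functions produce the same bytes for any `i`, which supports treating the change as purely stylistic.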
 // This makes the test data-driven, not time-driven.
 std::thread::spawn(move || {
-    let deadline = std::time::Instant::now() + Duration::from_secs(60);
+    let deadline = std::time::Instant::now() + Duration::from_secs(30);
30s fixed deadline can make the 500K scaling case flaky in slower CI environments.
This test hard-asserts full row completion; halving the safety window increases false failures. Prefer a row-count-aware timeout (or env-overridable timeout) to preserve reliability.
Suggested fix
-    let deadline = std::time::Instant::now() + Duration::from_secs(30);
+    // Scale safety timeout with workload size to reduce CI flakiness.
+    let timeout_secs = ((row_count as u64 / 100_000).max(1) * 30).min(180);
+    let deadline = std::time::Instant::now() + Duration::from_secs(timeout_secs);
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@crates/logfwd/tests/allocation_scaling.rs` at line 54, The fixed 30s deadline
in allocation_scaling.rs (the line assigning let deadline =
std::time::Instant::now() + Duration::from_secs(30)) makes the 500K scaling test
flaky; change the timeout to be row-count-aware or overridable via env var:
compute the deadline using a base-per-row multiplier (e.g., duration_per_row *
expected_row_count) with a minimum floor, and fall back to an environment
variable (e.g., TEST_TIMEOUT_MS) if provided, then replace the current
Duration::from_secs(30) use in the test's deadline calculation so the timeout
scales for large workloads while remaining bounded for small tests.
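The two options mentioned here (a row-count-aware timeout and an environment override) can be combined in one small pure function. This is a sketch under assumptions: `scaling_timeout` is a hypothetical helper, the scaling formula is the one from the suggested fix, and the override is passed in as a string so the function stays testable without touching the process environment.

```rust
use std::time::Duration;

/// Returns the safety timeout for a scaling test.
///
/// `override_ms` is the raw value of an optional override (for example the
/// contents of a `TEST_TIMEOUT_MS` environment variable); values that do not
/// parse as an integer are ignored.
pub fn scaling_timeout(row_count: usize, override_ms: Option<&str>) -> Duration {
    if let Some(ms) = override_ms.and_then(|raw| raw.trim().parse::<u64>().ok()) {
        return Duration::from_millis(ms);
    }
    // 30 s per 100K rows, with a floor of 30 s and a ceiling of 180 s.
    let secs = ((row_count as u64 / 100_000).max(1) * 30).min(180);
    Duration::from_secs(secs)
}
```

In a test this could be called as `scaling_timeout(row_count, std::env::var("TEST_TIMEOUT_MS").ok().as_deref())`. With the formula above, the 500K case gets 150 seconds rather than a fixed 30.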
#[test]
fn test_exclusive_lock() {
    let dir = tempdir().unwrap();
    let data_dir = dir.path().to_path_buf();

    let config_path = dir.path().join("config.yaml");
    std::fs::write(
        &config_path,
        format!(
            r#"
input:
  type: generator
output:
  type: "null"
storage:
  data_dir: {}
"#,
            data_dir.display()
        ),
    )
    .unwrap();

    // Start the first instance.
    let mut child1 = Command::new("cargo")
        .args([
            "run",
            "-p",
            "logfwd",
            "--",
            "--config",
            config_path.to_str().unwrap(),
        ])
        .spawn()
        .expect("failed to start first logfwd instance");

    // Wait a bit for it to start and acquire the lock.
    std::thread::sleep(Duration::from_secs(10));

    // Try to start a second instance.
    let output2 = Command::new("cargo")
        .args([
            "run",
            "-p",
            "logfwd",
            "--",
            "--config",
            config_path.to_str().unwrap(),
        ])
        .output()
        .expect("failed to start second logfwd instance");

    // Clean up first instance; wait after kill to reap the process and prevent zombies.
    let _ = child1.kill();
    let _ = child1.wait();

    let stderr = String::from_utf8_lossy(&output2.stderr);
    assert!(
        stderr.contains("another instance of logfwd is already running"),
        "expected error message about another instance, got stderr: {}\nstdout: {}",
        stderr,
        String::from_utf8_lossy(&output2.stdout)
    );
    assert_eq!(output2.status.code(), Some(2), "expected exit code 2");
}
Test has robustness concerns.
- Unix-only but not gated: The flock implementation is Unix-only, but this test lacks #[cfg(unix)]. It will fail on Windows.
- 10-second sleep is fragile: If the binary isn't cached, cargo run triggers a rebuild. Consider polling for lock file existence or a readiness signal.
- No test timeout: If first instance never acquires lock, the test hangs indefinitely.
Suggested improvements
+#[cfg(unix)]
 #[test]
 fn test_exclusive_lock() {
For robustness, consider polling:
// Poll for lock file instead of fixed sleep
let lock_path = data_dir.join("logfwd.lock");
for _ in 0..100 {
    if lock_path.exists() {
        break;
    }
    std::thread::sleep(Duration::from_millis(100));
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@crates/logfwd/tests/test_lock.rs` around lines 5 - 68, The
test_exclusive_lock test is fragile and platform-unsafe: add a #[cfg(unix)]
attribute to the test to avoid running on Windows; replace the fixed
std::thread::sleep(Duration::from_secs(10)) used after spawning child1 with a
polling loop that checks for the lock file (e.g., data_dir.join("logfwd.lock"))
or a readiness signal before proceeding (poll at short intervals with a bounded
number of attempts); enforce a hard timeout for the polling so the test fails
fast if the first instance never acquires the lock and ensure child1 is
killed/reaped in all failure paths to avoid zombies (use
child1.kill()/child1.wait() in the timeout cleanup).
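The bounded polling described in this comment could look roughly like the helper below. `wait_for_path` is a hypothetical name, and the sketch only checks that the path exists; it does not prove that the first instance actually holds the flock, so a readiness signal from the process would be a stronger check where one is available.

```rust
use std::path::Path;
use std::thread;
use std::time::{Duration, Instant};

/// Polls until `path` exists or `timeout` elapses.
/// Returns `true` if the path appeared in time, `false` on timeout.
pub fn wait_for_path(path: &Path, timeout: Duration, interval: Duration) -> bool {
    let deadline = Instant::now() + timeout;
    loop {
        if path.exists() {
            return true;
        }
        if Instant::now() >= deadline {
            return false;
        }
        thread::sleep(interval);
    }
}
```

In the lock test, a `false` result would be the point to call `child1.kill()` and `child1.wait()` before failing, so the first instance is reaped on the timeout path as well.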
let mut child1 = Command::new("cargo")
    .args([
        "run",
        "-p",
        "logfwd",
        "--",
        "--config",
        config_path.to_str().unwrap(),
    ])
    .spawn()
    .expect("failed to start first logfwd instance");
🧹 Nitpick | 🔵 Trivial
Consider using pre-built binary path.
Using cargo run may trigger recompilation. For faster, more reliable CI:
let binary = env!("CARGO_BIN_EXE_logfwd");
Command::new(binary)
    .args(["--config", config_path.to_str().unwrap()])
    // ...
This requires adding [[bin]] target or using assert_cmd crate patterns.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@crates/logfwd/tests/test_lock.rs` around lines 28 - 38, Replace launching the
test helper with `cargo run` by invoking the pre-built binary via the
CARGO_BIN_EXE mechanism: use env!("CARGO_BIN_EXE_logfwd") as the program passed
to Command::new (the code creating `child1`), and pass the same args (e.g.,
"--config", config_path.to_str().unwrap()) to avoid recompilation in CI; ensure
your crate exposes a binary target (add a [[bin]] entry or otherwise build the
binary in test setup) so env!("CARGO_BIN_EXE_logfwd") is available at test
runtime.
 | Field | Type | Default | Description |
 |-------|------|---------|-------------|
-| `diagnostics` | string | none | `host:port` to listen for HTTP diagnostics. See [Diagnostics API](#diagnostics-api) for available routes. |
+| `diagnostics` | string | none | `host:port` to listen for HTTP diagnostics. Exposes `/metrics` and `/api/pipelines`. |
Documentation inaccurately describes diagnostics endpoints.
The documentation claims diagnostics "exposes /metrics and /api/pipelines", but the code shows many additional active endpoints are served:
- /health, /ready (health checks)
- /api/stats (pipeline statistics - uptime, line counts, bytes, timing)
- /api/config (configuration YAML)
- /api/logs (logs endpoint)
- /api/history (history endpoint)
- /api/traces (traces endpoint)
Additionally, /metrics returns HTTP 410 Gone with a message stating the endpoint was removed, not working metrics.
📝 Evidence from diagnostics.rs route handler
The route dispatcher in crates/logfwd-io/src/diagnostics.rs:468-500 shows all active endpoints, and serve_stats (lines 555-625) and serve_config (lines 627-638) implementations confirm these endpoints are fully functional.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@docs/CONFIG_REFERENCE.md` at line 422, The docs entry for the `diagnostics`
option is incomplete and misleading; update the description to list all active
endpoints implemented by the diagnostics router (e.g., `/health`, `/ready`,
`/api/stats` served by serve_stats, `/api/config` served by serve_config,
`/api/logs`, `/api/history`, `/api/traces`, etc.) and note that `/metrics`
currently returns 410 Gone with a removed-message rather than providing metrics;
ensure the text references the diagnostics route dispatcher (diagnostics) so
readers can find the true behavior.
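The behavior described in this finding can be pictured with a small dispatcher. This is only a sketch under stated assumptions: the function `route`, the response bodies, and the 404 fallback are hypothetical and are not taken from `diagnostics.rs`; only the endpoint paths and the 410 status for `/metrics` come from the review comment.

```rust
/// Hypothetical sketch of a diagnostics route table.
/// Returns (HTTP status, placeholder body) for a request path.
fn route(path: &str) -> (u16, &'static str) {
    match path {
        // Health checks.
        "/health" | "/ready" => (200, "ok"),
        // Active API endpoints named in the review; bodies are placeholders.
        "/api/pipelines" | "/api/stats" | "/api/config" | "/api/logs"
        | "/api/history" | "/api/traces" => (200, "placeholder payload"),
        // Removed endpoint: answered with 410 Gone, not with metrics.
        "/metrics" => (410, "endpoint removed"),
        // Assumed fallback for unknown paths.
        _ => (404, "not found"),
    }
}

fn main() {
    for path in ["/health", "/api/stats", "/metrics", "/unknown"] {
        let (status, body) = route(path);
        println!("{path} -> {status} {body}");
    }
}
```

A documentation entry that matches such a table would list every arm that returns 200 and call out `/metrics` separately as removed.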
| input: | ||
| type: file | ||
| path: /var/log/pods/**/*.log | ||
| format: cri | ||
| transform: SELECT * FROM logs WHERE level_str = 'ERROR' | ||
| outputs: | ||
| output: | ||
| type: otlp | ||
| endpoint: http://otel-collector:4318 | ||
|
|
||
| debug: | ||
| inputs: | ||
| input: | ||
| type: file | ||
| path: /var/log/pods/**/*.log | ||
| format: cri | ||
| outputs: | ||
| output: | ||
| type: stdout | ||
| format: console | ||
| ``` |
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Verify PipelineConfig field names in the actual struct definition
rg -n -A5 'pub struct PipelineConfig' crates/logfwd-config/src/
# Check if there's any serde rename or alias for input/output
rg -n 'serde.*alias.*input|serde.*rename.*input' crates/logfwd-config/src/
Repository: strawgate/memagent
Length of output: 546
🏁 Script executed:
# Find the deserialize_one_or_many function definition
rg -n -B2 -A10 'fn deserialize_one_or_many' crates/logfwd-config/src/
Repository: strawgate/memagent
Length of output: 857
🏁 Script executed:
# Get the exact lines of PipelineConfig struct with all attributes
sed -n '264,270p' crates/logfwd-config/src/lib.rs
Repository: strawgate/memagent
Length of output: 382
🏁 Script executed:
# Check for any serde alias or rename on the inputs/outputs fields
rg -B5 -A1 'pub inputs:|pub outputs:' crates/logfwd-config/src/lib.rs
Repository: strawgate/memagent
Length of output: 483
Invalid YAML field names: input/output should be inputs/outputs.
The PipelineConfig struct in crates/logfwd-config/src/lib.rs defines plural field names (inputs, outputs) with no serde rename attributes. The example in README.md uses singular field names, which will fail to deserialize.
This creates inconsistency with book/src/config/reference.md (lines 40-63), which correctly uses plural forms.
Proposed fix
 pipelines:
   errors:
-    input:
+    inputs:
+      - type: file
-      type: file
-      path: /var/log/pods/**/*.log
-      format: cri
+        path: /var/log/pods/**/*.log
+        format: cri
     transform: SELECT * FROM logs WHERE level_str = 'ERROR'
-    output:
+    outputs:
+      - type: otlp
-      type: otlp
-      endpoint: http://otel-collector:4318
+        endpoint: http://otel-collector:4318
   debug:
-    input:
+    inputs:
+      - type: file
-      type: file
-      path: /var/log/pods/**/*.log
-      format: cri
-    output:
+        path: /var/log/pods/**/*.log
+        format: cri
+    outputs:
+      - type: stdout
-      type: stdout
-      format: console
+        format: console

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| input: | |
| type: file | |
| path: /var/log/pods/**/*.log | |
| format: cri | |
| transform: SELECT * FROM logs WHERE level_str = 'ERROR' | |
| outputs: | |
| output: | |
| type: otlp | |
| endpoint: http://otel-collector:4318 | |
| debug: | |
| inputs: | |
| input: | |
| type: file | |
| path: /var/log/pods/**/*.log | |
| format: cri | |
| outputs: | |
| output: | |
| type: stdout | |
| format: console | |
| ``` | |
| inputs: | |
| - type: file | |
| path: /var/log/pods/**/*.log | |
| format: cri | |
| transform: SELECT * FROM logs WHERE level_str = 'ERROR' | |
| outputs: | |
| - type: otlp | |
| endpoint: http://otel-collector:4318 | |
| debug: | |
| inputs: | |
| - type: file | |
| path: /var/log/pods/**/*.log | |
| format: cri | |
| outputs: | |
| - type: stdout | |
| format: console |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@README.md` around lines 161 - 178, The README example uses singular keys
`input`/`output` but the config struct is defined as `PipelineConfig` in
crates/logfwd-config/src/lib.rs with plural fields `inputs` and `outputs`, so
update the README snippet to use `inputs:` and `outputs:` (and similarly for the
`debug` block) to match `PipelineConfig`'s field names and allow serde
deserialization to succeed; ensure the YAML mirrors the plural usage used in
book/src/config/reference.md to keep docs consistent.
|
Superseded by focused PRs:
The 50+ unrelated file changes in this PR were from a rebase that pulled in master changes. |
Summary
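Addresses three key reliability issues in the file tailing path:
1. Checkpoint validation (#656): compare restored offsets with file sizes, reset to 0 on rotation/truncation with a warning.
2. Exclusive lock (#737): acquire `flock` on `{data_dir}/logfwd.lock` at startup to prevent duplicate instances.
3. Glob warnings (#730): warn when configured glob matches no files.

Additional: replaced `eprintln!` with tracing macros in `tail.rs`, added unit tests for checkpoint validation and glob warnings, and added an integration test for the exclusive lock.

Fixes #780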
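The checkpoint validation in this PR compares a restored offset with the current file size and resets to 0 when the offset no longer fits. A minimal sketch of that idea follows, assuming a hypothetical function `validate_offset`; it is not the code from `tail.rs`, and it uses `eprintln!` only to stay within the standard library, whereas the PR itself moves to tracing macros.

```rust
use std::fs;
use std::io;
use std::path::Path;

/// Hypothetical sketch: returns the offset to resume reading from.
/// If the checkpointed offset lies beyond the current end of the file,
/// the file is assumed to have been rotated or truncated and reading
/// restarts at 0.
fn validate_offset(path: &Path, checkpoint_offset: u64) -> io::Result<u64> {
    let size = fs::metadata(path)?.len();
    if checkpoint_offset > size {
        eprintln!(
            "warning: checkpoint offset {} exceeds size {} of {}; resetting to 0",
            checkpoint_offset,
            size,
            path.display()
        );
        Ok(0)
    } else {
        Ok(checkpoint_offset)
    }
}

fn main() -> io::Result<()> {
    // Write a 10-byte file to a process-unique path in the temp directory.
    let path = std::env::temp_dir().join(format!("checkpoint_demo_{}.log", std::process::id()));
    fs::write(&path, b"0123456789")?;

    // An offset inside the file is kept; one past the end is reset.
    println!("offset 4   -> {}", validate_offset(&path, 4)?);
    println!("offset 100 -> {}", validate_offset(&path, 100)?);

    fs::remove_file(&path)?;
    Ok(())
}
```

A size comparison alone cannot detect a rotation where the new file has already grown past the old offset; whether the PR handles that case is not stated here.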
Test plan
- `cargo test -p logfwd-io`: tail tests
- `cargo test -p logfwd --test test_lock`: exclusive lock integration test

🤖 Generated with Claude Code