refactor(logfwd-io): split FileTailer + Bytes buffer ownership#939
Conversation
|
Warning Rate limit exceeded
Your organization is not enrolled in usage-based pricing. Contact your admin to enable usage-based pricing to continue reviews beyond the rate limit, or try again in 3 minutes and 55 seconds. ⌛ How to resolve this issue?After the wait time has elapsed, a review can be triggered using the We recommend that you space out your commits to avoid hitting the rate limit. 🚦 How do rate limits work?CodeRabbit enforces hourly rate limits for each developer per organization. Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout. Please see our FAQ for further information. ℹ️ Review info⚙️ Run configurationConfiguration used: Repository YAML (base), Organization UI (inherited) Review profile: ASSERTIVE Plan: Pro Run ID: 📒 Files selected for processing (3)
WalkthroughFileTailer was refactored into two internal components: FileDiscovery (handles notify watcher registration, event draining, glob rescans, and detection of new/rotated paths) and FileReader (manages open file descriptors, byte reading, truncation/copytruncate handling, rotation/deletion draining, LRU eviction, and offset/checkpoint operations). FileTailer::poll() now drains notifications, optionally triggers glob rescans per TailConfig, runs discovery detection, then invokes FileReader to read tailed files, remove deleted entries, and evict to TailConfig::max_open_files. Public FileTailer methods remain unchanged; internal state moved into FileDiscovery/FileReader. Possibly related PRs
🚥 Pre-merge checks | ✅ 3 | ❌ 2❌ Failed checks (2 warnings)
✅ Passed checks (3 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. Comment |
ApprovabilityVerdict: Needs human review 1 blocking correctness issue found. This refactor splits FileTailer into two internal components (FileDiscovery + FileReader) to separate concerns. While structurally the changes appear to preserve existing behavior, there are unresolved review comments identifying a potential data loss bug in set_offset_by_source (missing file size validation) and an inconsistent source_id attribution in drain_file that should be addressed before merging. You can customize Macroscope's approvability policy. Learn more. |
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@crates/logfwd-io/src/tail.rs`:
- Line 291: Add a brief comment immediately above the let watch_paths =
self.watch_paths.clone(); line explaining that the clone is done to avoid a
borrow-checker conflict when iterating while mutating reader.files, that it
occurs on the poll hot path but is necessary given current ownership, and that
its allocation cost has been considered (or benchmark justification noted);
reference the watch_paths.clone() use and the mutable iteration of reader.files
so future readers understand why the allocation is acceptable.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository YAML (base), Organization UI (inherited)
Review profile: ASSERTIVE
Plan: Pro
Run ID: e68191b2-f801-463c-98a6-c9fad8ab4249
📒 Files selected for processing (1)
crates/logfwd-io/src/tail.rs
| reader: &mut FileReader, | ||
| events: &mut Vec<TailEvent>, | ||
| ) { | ||
| let watch_paths = self.watch_paths.clone(); |
There was a problem hiding this comment.
🧹 Nitpick | 🔵 Trivial
Hot-path allocation: watch_paths.clone() runs every poll cycle.
Per coding guidelines, allocations in hot-path loops require benchmark justification. The clone is necessary here due to borrow-checker constraints (iterating while mutating reader.files), but a brief comment explaining why this is acceptable would satisfy the guideline.
📝 Suggested comment
+ // Clone watch_paths to release `&self` borrow before mutating reader.
+ // Allocation is O(num_watched_paths) per poll; acceptable for typical file counts (<1000).
let watch_paths = self.watch_paths.clone();📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| let watch_paths = self.watch_paths.clone(); | |
| // Clone watch_paths to release `&self` borrow before mutating reader. | |
| // Allocation is O(num_watched_paths) per poll; acceptable for typical file counts (<1000). | |
| let watch_paths = self.watch_paths.clone(); |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@crates/logfwd-io/src/tail.rs` at line 291, Add a brief comment immediately
above the let watch_paths = self.watch_paths.clone(); line explaining that the
clone is done to avoid a borrow-checker conflict when iterating while mutating
reader.files, that it occurs on the poll hot path but is necessary given current
ownership, and that its allocation cost has been considered (or benchmark
justification noted); reference the watch_paths.clone() use and the mutable
iteration of reader.files so future readers understand why the allocation is
acceptable.
Reflect the current state after #939: FileTailer is now composed of FileDiscovery + FileReader, all stage boundaries carry Bytes, and the buffer lifecycle section shows the honest copy count (3 remaining, down from 5) with the target from #608 (2 for passthrough, 1 unavoidable for CRI). Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
Actionable comments posted: 4
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@dev-docs/ARCHITECTURE.md`:
- Around line 40-52: Documentation and code disagree: FileReader still uses
Vec<u8> and InputEvent::Data carries Vec<u8> but docs describe BytesMut →
freeze() → Bytes; update the implementation so FileReader's buffer (read_buf) is
a BytesMut, have FileReader::read_new_data produce and return a Bytes (call
freeze() / into()) instead of Vec<u8>, and change InputEvent::Data to store
Bytes rather than Vec<u8> so the mpsc channel carries Bytes across threads;
update all call sites of read_new_data, InputEvent::Data construction, and any
type annotations to use Bytes/BytesMut (and ensure ownership/refcounting is
preserved when sending across the channel).
- Around line 57-58: The docs incorrectly state that FramedInput works with
Bytes; update the architecture docs to reflect that FramedInput::poll and
related structures use Vec<u8> (not Bytes). Specifically mention the actual
buffer types: FramedInput::out_buf: Vec<u8>, FramedInput::spare_buf: Vec<u8>,
SourceState.remainder: Vec<u8>, and InputEvent::Data { bytes: Vec<u8> }, and
change the diagram/text "newline-delimited JSON as Bytes" to "newline-delimited
JSON as Vec<u8>" (or equivalent wording) so the documentation matches the
implementation.
- Line 56: Update the fenced code block containing "Bytes → FramedInput::poll()
→ newline-delimited JSON as Bytes" to include a language specifier for syntax
highlighting (e.g., change the opening ``` to ```text); locate the fence in
ARCHITECTURE.md where that exact line appears and add the language identifier so
the block reads ```text followed by the line and closing ``` to enable proper
rendering.
- Around line 274-281: The ARCHITECTURE.md section claiming "BytesMut → freeze()
→ Bytes" and zero-copy behavior is inaccurate because the code still uses
Vec<u8> (e.g., InputEvent::Data is Vec<u8>, FramedInput fields are Vec<u8> and
there is no freeze()/Bytes transition); update the documentation to describe the
current Vec<u8> pipeline (replace BytesMut/Bytes and zero-copy claims with the
actual steps: Vec<u8> storage in InputEvent::Data, FramedInput using Vec<u8>,
copy points such as extend_from_slice and any copies in
FormatProcessor/pipeline) OR alternatively implement Phase 2 fully by converting
InputEvent::Data to Bytes, changing FramedInput fields to BytesMut and adding
the freeze() transition and zero-copy handoff; choose one approach and make the
docs and PR summary consistent with the chosen implementation.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository YAML (base), Organization UI (inherited)
Review profile: ASSERTIVE
Plan: Pro
Run ID: 55685c8c-d08c-411e-b438-8e47f5651cd3
📒 Files selected for processing (1)
dev-docs/ARCHITECTURE.md
|
|
||
| ### 2. Framing: raw bytes → complete lines | ||
|
|
||
| ``` |
There was a problem hiding this comment.
Add language specifier to fenced code block.
The code block lacks a language identifier for syntax highlighting.
📝 Proposed fix
-```
+```text
Bytes → FramedInput::poll() → newline-delimited JSON as Bytes
</details>
<details>
<summary>🧰 Tools</summary>
<details>
<summary>🪛 markdownlint-cli2 (0.22.0)</summary>
[warning] 56-56: Fenced code blocks should have a language specified
(MD040, fenced-code-language)
</details>
</details>
<details>
<summary>🤖 Prompt for AI Agents</summary>
Verify each finding against the current code and only fix it if needed.
In @dev-docs/ARCHITECTURE.md at line 56, Update the fenced code block containing
"Bytes → FramedInput::poll() → newline-delimited JSON as Bytes" to include a
language specifier for syntax highlighting (e.g., change the opening ``` to
add the language identifier so the block reads ```text followed by the line and
closing ``` to enable proper rendering.
60b6820 to
399532a
Compare
Split the monolithic FileTailer into three components: - FileDiscovery: notify watcher, glob patterns, rotation detection, deleted-file cleanup - FileReader: open file descriptors, read buffer, read_new_data(), drain_file(), evict_lru(), offset queries - FileTailer: thin compositor with poll() orchestrating both No public API changes. All master features preserved: source_id on TailEvent variants, EvictedFile with identity verification (#817), evicted offsets in file_offsets()/file_paths() (#697), drain with source_id propagation. Also fixes pre-existing lint warnings: - clippy::needless_range_loop in star_schema scatter loop - unused_qualifications for UInt32Array in otap_receiver tests Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Reflect the current state after the FileTailer split: FileTailer is now composed of FileDiscovery + FileReader. Update buffer lifecycle section to show current copy count and target architecture from #608. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
399532a to
aa73abb
Compare
| /// Drain remaining data from a single file, emitting Data/Truncated events. | ||
| /// | ||
| /// Factored from the repeated drain pattern used during rotation and | ||
| /// deletion cleanup. Preserves source_id on all emitted events. | ||
| fn drain_file( | ||
| &mut self, | ||
| path: &Path, | ||
| source_id: Option<SourceId>, | ||
| events: &mut Vec<TailEvent>, | ||
| ) { | ||
| match self.read_new_data(path) { | ||
| Ok(ReadResult::Data(data)) => { | ||
| events.push(TailEvent::Data { | ||
| path: path.to_path_buf(), | ||
| bytes: data, | ||
| source_id, | ||
| }); | ||
| } | ||
| Ok(ReadResult::TruncatedThenData(data)) => { | ||
| events.push(TailEvent::Truncated { | ||
| path: path.to_path_buf(), | ||
| source_id, | ||
| }); | ||
| events.push(TailEvent::Data { | ||
| path: path.to_path_buf(), | ||
| bytes: data, | ||
| source_id, | ||
| }); | ||
| let _ = self.files.remove(path); | ||
| if let Err(e) = self.open_file_at(path, false) { | ||
| tracing::warn!(path = %path.display(), error = %e, "tail.open_after_rotation_failed"); | ||
| } | ||
| } else if is_new { | ||
| if let Err(e) = self.open_file_at(path, false) { | ||
| tracing::warn!(path = %path.display(), error = %e, "tail.open_new_file_failed"); | ||
| } | ||
| } |
There was a problem hiding this comment.
🟢 Low src/tail.rs:560
When drain_file handles TruncatedThenData, it emits the Data event with the pre-read source_id rather than the post-read identity. Since read_new_data updates tailed.identity.fingerprint during truncation detection (lines 503-504), the data belongs to the new identity, but the event is tagged with the old one. This inconsistency with read_all (which uses a fresh source_id for the Data event at lines 640-645) means the final drained data after truncation is incorrectly attributed.
Ok(ReadResult::TruncatedThenData(data)) => {
events.push(TailEvent::Truncated {
path: path.to_path_buf(),
source_id,
});
+ let new_source_id = self.source_id_for_path(path);
events.push(TailEvent::Data {
path: path.to_path_buf(),
bytes: data,
- source_id,
+ source_id: new_source_id,
});🚀 Reply "fix it for me" or copy this AI Prompt for your agent:
In file crates/logfwd-io/src/tail.rs around lines 560-588:
When `drain_file` handles `TruncatedThenData`, it emits the `Data` event with the pre-read `source_id` rather than the post-read identity. Since `read_new_data` updates `tailed.identity.fingerprint` during truncation detection (lines 503-504), the data belongs to the new identity, but the event is tagged with the old one. This inconsistency with `read_all` (which uses a fresh `source_id` for the `Data` event at lines 640-645) means the final drained data after truncation is incorrectly attributed.
Evidence trail:
- crates/logfwd-io/src/tail.rs lines 564-597: `drain_file` function uses same `source_id` parameter for both Truncated and Data events in TruncatedThenData case
- crates/logfwd-io/src/tail.rs lines 503-504: `read_new_data` updates `tailed.identity.fingerprint` during truncation
- crates/logfwd-io/src/tail.rs lines 632-645: `read_all` uses `pre_read_source_id` for Truncated but calls `self.source_id_for_path(&path)` for fresh source_id on Data event
- crates/logfwd-io/src/tail.rs lines 761-766: `source_id_for_path` returns source_id based on current tailed.identity
- crates/logfwd-io/src/tail.rs lines 340-346, 386-388: callers of `drain_file` capture source_id before calling it
There was a problem hiding this comment.
🟡 Medium
set_offset_by_source seeks to the saved offset without validating it against the current file size, unlike set_offset which resets to 0 when the offset exceeds file size (lines 718-729). When a file is truncated between runs, this causes all data between 0 and the stale offset to be skipped — the same data loss issue #656 fixed in set_offset. Consider adding the same file size validation before seeking.
fn set_offset_by_source(&mut self, source_id: SourceId, offset: u64) -> io::Result<()> {
for tailed in self.files.values_mut() {
if tailed.identity.source_id() == source_id {
- tailed.offset = offset;
- tailed.file.seek(SeekFrom::Start(offset))?;
+ let file_size = tailed.file.metadata()?.len();
+ let safe_offset = if offset > file_size {
+ 0
+ } else {
+ offset
+ };
+ tailed.offset = safe_offset;
+ tailed.file.seek(SeekFrom::Start(safe_offset))?;
return Ok(());
}
}
Ok(()) // source not found — file may not exist yet
}🚀 Reply "fix it for me" or copy this AI Prompt for your agent:
In file crates/logfwd-io/src/tail.rs around lines 736-750:
`set_offset_by_source` seeks to the saved offset without validating it against the current file size, unlike `set_offset` which resets to 0 when the offset exceeds file size (lines 718-729). When a file is truncated between runs, this causes all data between 0 and the stale offset to be skipped — the same data loss issue #656 fixed in `set_offset`. Consider adding the same file size validation before seeking.
Evidence trail:
crates/logfwd-io/src/tail.rs lines 710-750 at REVIEWED_COMMIT:
- `set_offset` (lines 716-733) validates offset against file size with `let file_size = tailed.file.metadata()?.len()` and `if offset > file_size` check
- `set_offset_by_source` (lines 740-750) directly sets offset and seeks without any file size validation
- Comment on `set_offset` explicitly references #656 as the fix for this truncation scenario
The #939 merge changed logfwd-io to produce Bytes but pipeline.rs still used Vec<u8> — causing a wasteful Bytes→Vec→Bytes round-trip at the pipeline boundary. Changes: - Add bytes dependency to logfwd crate - ChannelMsg::Data carries Bytes instead of Vec<u8> - InputState.buf is BytesMut with split().freeze() on send - scan_buf is BytesMut with split().freeze() in flush_batch - Scanner receives Bytes directly (removes .into() conversion) Generator benchmark (3 runs each, drop warmup): master: 1,658K / 1,755K lines/sec refactor: 2,779K / 2,782K lines/sec (+60%) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Fix test assertions to use Bytes::from_static instead of vec![] - Correct ARCHITECTURE.md "Buffer lifecycle" section to reflect actual state: tailer/input/framed still use Vec<u8>, only pipeline.rs uses BytesMut/Bytes. Previous version overstated what #939 delivered. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
The #939 merge changed logfwd-io to produce Bytes but pipeline.rs still used Vec<u8> — causing a wasteful Bytes→Vec→Bytes round-trip at the pipeline boundary. Changes: - Add bytes dependency to logfwd crate - ChannelMsg::Data carries Bytes instead of Vec<u8> - InputState.buf is BytesMut with split().freeze() on send - scan_buf is BytesMut with split().freeze() in flush_batch - Scanner receives Bytes directly (removes .into() conversion) Generator benchmark (3 runs each, drop warmup): master: 1,658K / 1,755K lines/sec refactor: 2,779K / 2,782K lines/sec (+60%) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Fix test assertions to use Bytes::from_static instead of vec![] - Correct ARCHITECTURE.md "Buffer lifecycle" section to reflect actual state: tailer/input/framed still use Vec<u8>, only pipeline.rs uses BytesMut/Bytes. Previous version overstated what #939 delivered. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Summary
Two phases of the buffer ownership refactor (#608) in a single coherent change:
Phase 1 — Split FileTailer into
FileDiscovery(path watching, glob evaluation, rotation detection, LRU eviction) andFileReader(open file descriptors, byte reading, offset tracking).FileTailercomposes both withpoll()as a clean orchestrator. No public API changes.Phase 2 — Replace
Vec<u8>withbytes::Bytesat every stage boundary from tailer to scanner.TailEvent,InputEvent,ChannelMsgall carryBytes.FileReader::read_new_data()usesBytesMut+freeze(). Pipeline accumulation buffers useBytesMutwithsplit().freeze(). All input sources (tcp, udp, generator, otlp_receiver) produceBytes.logfwd-coreuntouched — scanner takes&[u8]viaBytes::Deref.What this eliminates
Per #608's copy chain (5 copies disk→scanner):
BytesMut+freeze()extend_from_slicestillprocess_linesstill copiesBytesMutreuses capacitysplit().freeze()avoids allocThis PR takes us from 5 copies to ~3. The remaining copies (2 and 3) require FramedInput to use
Bytes::slice()for zero-copy newline framing — that's the follow-up work described in #608.What this does NOT do (follow-up work)
Bytes::slice()instead ofextend_from_sliceintoout_buf. Eliminates copies 2+3 for passthrough format. (perf: eliminate per-poll allocations in input→scanner data path #608)poll() → Vec<InputEvent>with streamingpoll_next() → Option<InputChunk>. (Phase 4 in plan)Why Bytes, not Vec ownership
Vec<u8>has single-owner move semantics but cannot be cheaply sub-sliced — every boundary requiresextend_from_slice.Bytesis immutable shared ownership (Arc<[u8]>internally) which enablesslice()for zero-copy framing. TheBytesMut → freeze() → Bytestransition is one-way and permanent — no mutation is possible after freeze. This is the same safety model as&mut T → &T, just with owned types that work across thread/async boundaries.The Arrow layer already requires
Bytes—StreamingBuilderstores the input buffer asBytesand ArrowStringViewArrayreferences offsets into it. The RecordBatch must keep the buffer alive through SQL transform and output serialization.Throughput impact
Test plan
cargo test -p logfwd-io— 184 tests pass (166 unit + 2 allocation + 16 integration)cargo test -p logfwd --lib— all tests passjust bench-self 15— 2.45M lines/sec (was 1.06M)just bench-otlp— full OTLP round-trip benchmarkjust profile-otlp-local🤖 Generated with Claude Code
Note
Split
FileTailerintoFileDiscoveryandFileReaderstructs in logfwd-ioRefactors tail.rs by decomposing the monolithic
FileTailerinto two focused structs:FileDiscovery(owns the filesystem watcher, glob patterns, and rotation/deletion detection) andFileReader(owns open file descriptors, I/O buffers, LRU eviction, and offset management).FileTaileris retained as a thin orchestrator that composes both structs and preserves the existing public API via delegation. Updates ARCHITECTURE.md to reflect the new layering. Behavioral Change:FileTailer::pollnow delegates each phase to the sub-structs in sequence; external API and event semantics are unchanged, but internal field paths in tests shift fromtailer.watch_pathstotailer.discovery.watch_paths.Macroscope summarized aa73abb.