feat: transport hardening — SO_RCVBUF, keepalive, e2e tests, Kani proofs#612
Conversation
…, Kani proofs Follow-up to #570 — production hardening for TCP/UDP/OTLP transports. Hardening: - TCP input: SO_KEEPALIVE (60s idle, 10s interval) via socket2, per-client idle timeout (60s), 1024 connection limit with proper drain, ECONNABORTED/ECONNRESET handling (#578) - TCP sink: connect timeout (5s), write-with-retry reconnect - UDP input: SO_RCVBUF 8MB via socket2 to reduce kernel drops (#577) - UDP sink: MTU-safe datagram batching (1400B), flush drains buffer - OTLP receiver: 10MB body limit (413), bounded channel (4096), JSON content-type support, proper HTTP status codes - Generator: complexity modes, non-blocking rate limiter, varied data - NullSink: atomic batch/row counters - Reject CRI/auto format on TCP/UDP inputs (would panic) E2E integration tests (12): - TCP: single line, multiple lines, partial line across reads, multiple clients, disconnect, large message, rapid connect/disconnect - UDP: single datagram, multiple datagrams, large datagram, no newline - OTLP: protobuf encode → POST → decode → verify JSON Formal verification (4 Kani proofs): - hex_encode: output length and character validity (≤4 bytes) - json_string_escaping: no unescaped quotes/backslashes (≤8 bytes) - strip_ansi: preserves non-escape chars, removes all escapes (≤12 bytes) Property-based tests (2 proptests): - Simple generator JSON always valid (1000 events) - Complex generator JSON always valid (500 events, nested objects) Addresses: #577, #578. Related: #576, #579, #580. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
WalkthroughThis PR adds a Possibly related PRs
Comment |
There was a problem hiding this comment.
Actionable comments posted: 6
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
crates/logfwd/src/pipeline.rs (1)
602-622: 🧹 Nitpick | 🔵 TrivialAdd a config-level test for these new rejection paths.
This is now a transport invariant, but the tests in this file don’t pin the
Errcases yet. A smallfrom_config()test for TCP/UDP withformat: criandformat: autowould keep this from silently regressing.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/logfwd/src/pipeline.rs` around lines 602 - 622, Add unit tests that assert from_config() rejects CRI/auto formats for UDP and TCP inputs: write two small tests invoking the pipeline creation path (call the from_config() function or the public builder that processes InputType::Udp and InputType::Tcp) with a config object where listen/addr is set and cfg.format is set to Format::Cri and Format::Auto respectively, then assert the result is Err and that the error string contains the transport-specific rejection message (e.g., "CRI/auto format is not supported for UDP inputs" and "CRI/auto format is not supported for TCP inputs"); reference the InputType::Udp, InputType::Tcp, and from_config symbols so tests target the same code paths that perform the matches!(cfg.format, Some(Format::Cri | Format::Auto)) checks.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@crates/logfwd-io/Cargo.toml`:
- Around line 29-31: Remove the duplicate dev-dependencies for
opentelemetry-proto and prost from the dev-dependencies block (they are already
regular dependencies), leaving only proptest as a dev-only crate; update the
dev-dependencies section so tests rely on the existing regular dependencies
(opentelemetry-proto, prost) and keep proptest under dev-dependencies to avoid
version/feature drift.
In `@crates/logfwd-io/src/generator.rs`:
- Around line 349-356: The tests currently only validate JSON when an
InputEvent::Data is present, so update the handling around generator.poll() to
assert that exactly one Data event was produced and that the number of parsed
lines equals the expected total before attempting JSON parsing: capture the
result of generator.poll(), ensure there is Some(InputEvent::Data { bytes })
(fail the test if not), convert bytes to text and split/collect lines and assert
lines.len() == total (or compare to the `total` variable), then iterate over the
lines to serde_json::from_str and preserve the existing panic message using
offset and i for context; apply the same checks to the other block around the
second poll() usage (the block at 375-382).
In `@crates/logfwd-io/src/otlp_receiver.rs`:
- Around line 487-512: The test json_string_escaping_produces_valid_json
incorrectly assumes write_json_string_field handles all control characters;
either make the writer escape the full control range (0x00..=0x1f) in
write_json_string_field so all control bytes are emitted as valid JSON escapes,
or tighten the Kani harness to only feed bytes that the current writer supports
(i.e., constrain the input bytes in the test to exclude control codes other than
'\n','\r','\t' and only allow legal UTF‑8 sequences). Reference the
json_string_escaping_produces_valid_json test and the write_json_string_field
implementation and apply one of these two fixes so the proof matches the
writer's actual behavior.
In `@crates/logfwd-io/src/tcp_input.rs`:
- Around line 77-84: The cloned socket (sock2) is leaked by mem::forget, so
remove std::mem::forget(sock2) and stop creating an owned Socket from
stream.try_clone(); instead use socket2::SockRef::from(&stream) to configure
keepalive (call set_keepalive and set_tcp_keepalive on the SockRef) so no
duplicate fd is owned or leaked; alternatively if you must own a Socket, drop it
normally after configuring rather than calling mem::forget — update the code
around sock2, keepalive, and the set_* calls to use SockRef::from(&stream) and
remove mem::forget.
In `@crates/logfwd-io/src/udp_input.rs`:
- Around line 31-40: Resolve addr using to_socket_addrs() first, pick the first
resolved std::net::SocketAddr to determine whether to create an IPv4 or IPv6
socket instead of hard-coding Domain::IPV4; then create the socket via
Socket::new(Domain::*, Type::DGRAM, Some(Protocol::UDP)), call
set_recv_buffer_size(RECV_BUF_SIZE) and set_nonblocking(true) as before, and
bind the socket using the resolved SocketAddr converted into socket2::SockAddr;
update the code paths that currently call Socket::new(Domain::IPV4, ...) and
parse::<SocketAddr>() so they use the resolved address and appropriate Domain
before calling bind on sock2.
In `@crates/logfwd-io/tests/transport_e2e.rs`:
- Around line 18-35: The current helper uses a fixed 80ms sleep and a hard-coded
poll count (settle() + poll_until_bytes) which slows tests and is flaky; change
poll_until_bytes to take a deadline/duration (or timeout) and poll immediately
in a loop until data is observed or the deadline is reached, using
Instant::now() to compute the deadline, call input.poll().unwrap() on each
iteration, append any InputEvent::Data bytes, and only sleep a short backoff
(e.g., a few ms) between retries to avoid busy-waiting; remove the settle() call
or limit it to an initial tiny backoff so tests run fast on local machines and
remain robust on CI.
---
Outside diff comments:
In `@crates/logfwd/src/pipeline.rs`:
- Around line 602-622: Add unit tests that assert from_config() rejects CRI/auto
formats for UDP and TCP inputs: write two small tests invoking the pipeline
creation path (call the from_config() function or the public builder that
processes InputType::Udp and InputType::Tcp) with a config object where
listen/addr is set and cfg.format is set to Format::Cri and Format::Auto
respectively, then assert the result is Err and that the error string contains
the transport-specific rejection message (e.g., "CRI/auto format is not
supported for UDP inputs" and "CRI/auto format is not supported for TCP
inputs"); reference the InputType::Udp, InputType::Tcp, and from_config symbols
so tests target the same code paths that perform the matches!(cfg.format,
Some(Format::Cri | Format::Auto)) checks.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: c2d13cf1-7a35-4e29-b8a3-0815aaa71feb
⛔ Files ignored due to path filters (1)
Cargo.lockis excluded by!**/*.lock
📒 Files selected for processing (8)
crates/logfwd-io/Cargo.tomlcrates/logfwd-io/src/generator.rscrates/logfwd-io/src/otlp_receiver.rscrates/logfwd-io/src/stderr_capture.rscrates/logfwd-io/src/tcp_input.rscrates/logfwd-io/src/udp_input.rscrates/logfwd-io/tests/transport_e2e.rscrates/logfwd/src/pipeline.rs
| let sock2 = Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP))?; | ||
| // Tune kernel receive buffer to reduce packet loss under load. | ||
| let _ = sock2.set_recv_buffer_size(RECV_BUF_SIZE); // best-effort | ||
| sock2.set_nonblocking(true)?; | ||
| sock2.bind( | ||
| &addr | ||
| .parse::<std::net::SocketAddr>() | ||
| .map_err(io::Error::other)? | ||
| .into(), | ||
| )?; |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
# First, locate and examine the udp_input.rs file
fd -t f "udp_input.rs"Repository: strawgate/memagent
Length of output: 95
🏁 Script executed:
# Read the target file around lines 31-40
cat -n crates/logfwd-io/src/udp_input.rs | head -60Repository: strawgate/memagent
Length of output: 2564
🏁 Script executed:
# Check imports and overall structure
cat -n crates/logfwd-io/src/udp_input.rsRepository: strawgate/memagent
Length of output: 7353
Resolve the bind address before choosing the socket domain.
This hard-codes an IPv4 socket and also narrows addr from ToSocketAddrs to a literal SocketAddr. parse::<SocketAddr>() does not perform DNS resolution, so hostnames like localhost:514 will fail. IPv6 addresses like [::1]:514 will also fail because the domain is hard-coded. Resolve the address first via to_socket_addrs(), detect the appropriate domain from the resolved SocketAddr, then bind the socket2::Socket.
🐛 Suggested fix
-use std::net::UdpSocket;
+use std::net::{ToSocketAddrs, UdpSocket};
@@
pub fn new(name: impl Into<String>, addr: &str) -> io::Result<Self> {
+ let addr = addr
+ .to_socket_addrs()?
+ .next()
+ .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "no socket address resolved"))?;
+ let domain = if addr.is_ipv4() { Domain::IPV4 } else { Domain::IPV6 };
+
// Use socket2 to create the socket so we can tune SO_RCVBUF *before*
// any datagrams arrive.
- let sock2 = Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP))?;
+ let sock2 = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))?;
// Tune kernel receive buffer to reduce packet loss under load.
let _ = sock2.set_recv_buffer_size(RECV_BUF_SIZE); // best-effort
sock2.set_nonblocking(true)?;
- sock2.bind(
- &addr
- .parse::<std::net::SocketAddr>()
- .map_err(io::Error::other)?
- .into(),
- )?;
+ sock2.bind(&addr.into())?;
let socket: UdpSocket = sock2.into();🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@crates/logfwd-io/src/udp_input.rs` around lines 31 - 40, Resolve addr using
to_socket_addrs() first, pick the first resolved std::net::SocketAddr to
determine whether to create an IPv4 or IPv6 socket instead of hard-coding
Domain::IPV4; then create the socket via Socket::new(Domain::*, Type::DGRAM,
Some(Protocol::UDP)), call set_recv_buffer_size(RECV_BUF_SIZE) and
set_nonblocking(true) as before, and bind the socket using the resolved
SocketAddr converted into socket2::SockAddr; update the code paths that
currently call Socket::new(Domain::IPV4, ...) and parse::<SocketAddr>() so they
use the resolved address and appropriate Domain before calling bind on sock2.
…rage PR feedback fixes: - TCP: fixed fd leak — SockRef instead of try_clone/mem::forget (#612 Critical) - TCP: added MAX_LINE_LENGTH (1MB) — disconnects clients exceeding it - TCP: configurable idle timeout via with_idle_timeout() - UDP: IPv6 support in socket construction - UDP: SO_RCVBUF readback verification, drop counter (AtomicU64) - OTLP: JSON control char escaping (0x00-0x1f → \uXXXX, RFC 8259) - OTLP: updated Kani proof to verify control char escaping - Generator: fixed proptests to assert Data was produced (not silently pass) - Removed duplicate dev-deps (opentelemetry-proto, prost already in deps) - Fixed otlp_oversized_body test (accept connection reset for 11MB body) New tests (14 added, 186 total): - tcp_idle_timeout: verify eviction after configurable timeout - tcp_max_line_length: verify disconnect for >1MB without newline - tcp_connection_storm: 100 rapid connect/disconnect, no fd leak - udp_recv_buffer_size: verify SO_RCVBUF actually applied - udp_high_volume: 10K datagrams, assert ≥90% arrival - udp_empty_datagram: no panic on 0-byte datagram - udp_socket_is_nonblocking: verify WouldBlock on empty recv - udp_drops_detected_starts_at_zero: counter init check - otlp_concurrent_requests: 10 simultaneous POSTs - otlp_oversized_body: 11MB body rejected - otlp_wrong_content_type: graceful handling - json_escaping_control_chars: \x00-\x1f escaped as \uXXXX - generator proptest assertions strengthened Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Linux CI drops more UDP datagrams under load than macOS localhost. 50% is still a meaningful assertion — below that indicates a real problem (no recv buffer, wrong socket, etc). Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
Actionable comments posted: 4
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
crates/logfwd-io/src/otlp_receiver.rs (1)
462-469: 🧹 Nitpick | 🔵 Trivial
char::from_digit(...).unwrap_or('0')can never hit the fallback.
(b >> 4)and(b & 0xf)are always<= 15, sofrom_digit(..., 16)always returnsSome. Theunwrap_oris defensive but effectively dead code.If you want to remove the fallback and make the guarantee explicit:
- s.push(char::from_digit((b >> 4) as u32, 16).unwrap_or('0')); - s.push(char::from_digit((b & 0xf) as u32, 16).unwrap_or('0')); + // SAFETY: (b >> 4) and (b & 0xf) are always in 0..=15 + s.push(char::from_digit((b >> 4) as u32, 16).expect("nibble always valid")); + s.push(char::from_digit((b & 0xf) as u32, 16).expect("nibble always valid"));Or use the same
HEX_DIGITSlookup for consistency withwrite_json_string_field.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/logfwd-io/src/otlp_receiver.rs` around lines 462 - 469, The encode function uses char::from_digit(..., 16).unwrap_or('0') but the fallback is unreachable because (b >> 4) and (b & 0xf) are always in 0..=15; remove the dead unwrap_or and either call char::from_digit(...).unwrap() or, for consistency with write_json_string_field, replace the conversion with the HEX_DIGITS lookup used elsewhere (e.g. index into HEX_DIGITS with (b >> 4) and (b & 0xf)) to produce hex chars in the encode function.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@crates/logfwd-io/src/generator.rs`:
- Around line 436-443: Replace the silent catch-all match in the loop over
events with an explicit pattern check: instead of matching with a trailing `_ =>
{}` arm, use an `if let InputEvent::Data { bytes } = event { ... }` so only the
Data variant updates total_lines (or alternatively expand the match to handle
other InputEvent variants explicitly); update the loop that iterates over events
and references total_lines and InputEvent::Data to use this explicit pattern.
In `@crates/logfwd-io/src/tcp_input.rs`:
- Around line 138-155: Currently the code appends chunk into all_data before
enforcing MAX_LINE_LENGTH, so oversized final chunks are delivered; move the
length-check that uses client.bytes_since_newline and memchr::memrchr(b'\n',
chunk) to run before calling all_data.extend_from_slice(chunk) (and before
updating client.bytes_since_newline and client.last_data) so that if
bytes_since_newline + chunk_length (or the post-newline count) would exceed
MAX_LINE_LENGTH you mark alive[i] = false and break without appending; update
the logic around client.bytes_since_newline, client.last_data and any early
disconnect path to preserve correct state (use the same variables in the
referenced block: chunk, client, all_data, MAX_LINE_LENGTH, memchr::memrchr).
In `@crates/logfwd-io/src/udp_input.rs`:
- Around line 46-53: Move the call to set_recv_buffer_size before binding the
socket: after creating the socket with Socket::new (the sock2 variable) call
sock2.set_recv_buffer_size(RECV_BUF_SIZE) (best-effort) and then call
sock2.bind(&parsed_addr.into())?; afterward read back the actual buffer via
sock2.recv_buffer_size().unwrap_or(0). Remove or update the misleading comment
that says "Bind before setting buffer" so it reflects the correct order.
In `@crates/logfwd-io/tests/transport_e2e.rs`:
- Around line 460-467: The test currently only checks result.is_err() after
ureq::post(&url).header(...).send(&oversized); — change it to assert the
specific ureq error variant: call result.unwrap_err() and match against
ureq::Error::Status(413, _) to accept an HTTP 413 response, or
ureq::Error::Transport(_) (or other transport/connection error variants) to
accept a connection reset; fail the test if the error is any other variant.
Reference the result variable from the ureq::post(...).send(&oversized) call and
the ureq::Error::Status / ureq::Error::Transport variants when implementing the
match.
---
Outside diff comments:
In `@crates/logfwd-io/src/otlp_receiver.rs`:
- Around line 462-469: The encode function uses char::from_digit(...,
16).unwrap_or('0') but the fallback is unreachable because (b >> 4) and (b &
0xf) are always in 0..=15; remove the dead unwrap_or and either call
char::from_digit(...).unwrap() or, for consistency with write_json_string_field,
replace the conversion with the HEX_DIGITS lookup used elsewhere (e.g. index
into HEX_DIGITS with (b >> 4) and (b & 0xf)) to produce hex chars in the encode
function.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 21748055-0b1f-44f0-8daa-54731eb4a983
📒 Files selected for processing (6)
crates/logfwd-io/Cargo.tomlcrates/logfwd-io/src/generator.rscrates/logfwd-io/src/otlp_receiver.rscrates/logfwd-io/src/tcp_input.rscrates/logfwd-io/src/udp_input.rscrates/logfwd-io/tests/transport_e2e.rs
| for event in &events { | ||
| match event { | ||
| InputEvent::Data { bytes } => { | ||
| let text = String::from_utf8_lossy(bytes); | ||
| total_lines += text.trim().lines().count() as u64; | ||
| } | ||
| _ => {} | ||
| } |
There was a problem hiding this comment.
🧹 Nitpick | 🔵 Trivial
Prefer exhaustive match or if let over empty _ => {}.
The empty arm silently ignores any future InputEvent variants. Since this is test code and InputEvent is likely small and stable, this is low-risk, but an explicit if let is cleaner.
♻️ Suggested change
- for event in &events {
- match event {
- InputEvent::Data { bytes } => {
- let text = String::from_utf8_lossy(bytes);
- total_lines += text.trim().lines().count() as u64;
- }
- _ => {}
- }
- }
+ for event in &events {
+ if let InputEvent::Data { bytes } = event {
+ let text = String::from_utf8_lossy(bytes);
+ total_lines += text.trim().lines().count() as u64;
+ }
+ }📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| for event in &events { | |
| match event { | |
| InputEvent::Data { bytes } => { | |
| let text = String::from_utf8_lossy(bytes); | |
| total_lines += text.trim().lines().count() as u64; | |
| } | |
| _ => {} | |
| } | |
| for event in &events { | |
| if let InputEvent::Data { bytes } = event { | |
| let text = String::from_utf8_lossy(bytes); | |
| total_lines += text.trim().lines().count() as u64; | |
| } | |
| } |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@crates/logfwd-io/src/generator.rs` around lines 436 - 443, Replace the silent
catch-all match in the loop over events with an explicit pattern check: instead
of matching with a trailing `_ => {}` arm, use an `if let InputEvent::Data {
bytes } = event { ... }` so only the Data variant updates total_lines (or
alternatively expand the match to handle other InputEvent variants explicitly);
update the loop that iterates over events and references total_lines and
InputEvent::Data to use this explicit pattern.
There was a problem hiding this comment.
Valid point — will address when we add new InputEvent variants. Low risk since tests break loudly on new variants anyway.
There was a problem hiding this comment.
@strawgate — fair enough, the test-failure signal when a new variant arrives is a reasonable safety net. No action needed now.
🧠 Learnings used
Learnt from: CR
Repo: strawgate/memagent PR: 0
File: AGENTS.md:0-0
Timestamp: 2026-03-31T06:21:14.412Z
Learning: Applies to src/**/*.rs : Avoid `.unwrap()` in production code paths; use `?` or `.expect("reason")` instead
Learnt from: CR
Repo: strawgate/memagent PR: 0
File: AGENTS.md:0-0
Timestamp: 2026-03-31T06:21:14.412Z
Learning: Applies to src/logfwd-core/**/*.rs : No per-line heap allocations in hot paths (scanner, format parser, CRI parser); verify with `just ci`
Learnt from: CR
Repo: strawgate/memagent PR: 0
File: AGENTS.md:0-0
Timestamp: 2026-03-31T06:21:14.412Z
Learning: Applies to src/logfwd-output/**/*.rs : No per-line heap allocations in hot paths (OTLP encoder, compress); verify with `just ci`
Learnt from: CR
Repo: strawgate/memagent PR: 0
File: CODE_STYLE.md:0-0
Timestamp: 2026-04-01T03:02:58.925Z
Learning: Applies to **/*.rs : Use `#[non_exhaustive]` on all public enums so new variants aren't breaking
Learnt from: CR
Repo: strawgate/memagent PR: 0
File: CODE_STYLE.md:0-0
Timestamp: 2026-04-01T03:02:58.925Z
Learning: Applies to **/*.rs : No comments restating the code. Avoid noise like `// increment counter` above `counter += 1`
Learnt from: CR
Repo: strawgate/memagent PR: 0
File: CODE_STYLE.md:0-0
Timestamp: 2026-04-01T03:02:58.925Z
Learning: Applies to **/*.rs : No `.unwrap()` in production paths. Use `?`, `.expect("reason")`, or `unwrap_or`.
Learnt from: CR
Repo: strawgate/memagent PR: 0
File: CODE_STYLE.md:0-0
Timestamp: 2026-04-01T03:02:58.925Z
Learning: Applies to **/*.rs : No `#[should_panic]` — test the Result/Option return instead
Learnt from: CR
Repo: strawgate/memagent PR: 0
File: AGENTS.md:0-0
Timestamp: 2026-03-31T06:21:14.412Z
Learning: Applies to src/logfwd-arrow/**/*.rs : No per-line heap allocations in hot paths (builders, Arrow construction); verify with `just ci`
Learnt from: CR
Repo: strawgate/memagent PR: 0
File: AGENTS.md:0-0
Timestamp: 2026-03-31T06:21:14.412Z
Learning: Applies to src/logfwd-core/**/*.rs : Implement Kani proofs for pure logic in logfwd-core; three-layer verification approach: Kani (scalar) → proptest (SIMD ≡ scalar) → Kani (consumers)
Learnt from: CR
Repo: strawgate/memagent PR: 0
File: CODE_STYLE.md:0-0
Timestamp: 2026-04-01T03:02:58.925Z
Learning: Applies to **/*.rs : Use proptest for property-based testing of complex inputs
| // Bind before setting buffer — ensures the socket is valid. | ||
| sock2.bind(&parsed_addr.into())?; | ||
|
|
||
| // Tune kernel receive buffer to reduce packet loss under load. | ||
| let _ = sock2.set_recv_buffer_size(RECV_BUF_SIZE); // best-effort | ||
|
|
||
| // Read back actual buffer size — the OS may cap it. | ||
| let actual_recv_buf = sock2.recv_buffer_size().unwrap_or(0); |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Check if socket2 documentation mentions bind order for SO_RCVBUF
rg -i "recv.*buffer" --type rust -A 5 -B 5 | head -60Repository: strawgate/memagent
Length of output: 3135
🏁 Script executed:
#!/bin/bash
# Check socket2 crate for SO_RCVBUF timing and inspect full UdpInput::new() function
cat -n crates/logfwd-io/src/udp_input.rs | head -80Repository: strawgate/memagent
Length of output: 3409
🏁 Script executed:
#!/bin/bash
# Search for socket2 version and any comments about buffer order in Cargo.toml
rg "socket2" crates/logfwd-io/Cargo.toml -A 2 -B 2Repository: strawgate/memagent
Length of output: 190
🏁 Script executed:
#!/bin/bash
# Check socket2 crate source/docs for SO_RCVBUF timing constraints
# Also search for any issue or TODO comments about this choice
rg -i "so_rcvbuf|recv.*buffer|set_recv" crates/ --type rust -B 3 -C 2 | head -100Repository: strawgate/memagent
Length of output: 2723
🌐 Web query:
socket2 0.5 rust SO_RCVBUF bind order timing
💡 Result:
In socket2 0.5 (and later versions), you can and should set SO_RCVBUF using Socket::set_recv_buffer_size before calling bind. The socket2 API is designed for configuring socket options on the unbound Socket instance prior to binding, as shown in official examples: create Socket with Socket::new, set options like set_recv_buffer_size, then bind. This follows standard socket programming practice where setsockopt(SOL_SOCKET, SO_RCVBUF) is called after socket but before bind to ensure the receive buffer is properly sized from the start. Setting it after bind is possible but may have delayed or no effect until buffers are reallocated, per underlying system behavior.
Citations:
- 1: https://docs.rs/socket2/^0.5.9/
- 2: https://docs.rs/socket2/0.6.1/src/socket2/lib.rs.html#108
- 3: https://cseweb.ucsd.edu/classes/sp22/cse223B-a/tribbler/socket2/struct.SockRef.html
- 4: https://docs.rs/socket2/latest/socket2/struct.Socket.html
- 5: https://internals.rust-lang.org/t/allow-udpsocket-to-set-socket-options-before-bind/21076
- 6: https://docs.rs/socket2/latest/src/socket2/socket.rs.html
Move set_recv_buffer_size() before bind() — socket2 requires socket option configuration before binding.
The socket2 API is designed to configure options on the unbound socket, then bind. Setting SO_RCVBUF after bind may have delayed or no effect depending on platform. Reorder to set the buffer size before binding:
let sock2 = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))?;
let _ = sock2.set_recv_buffer_size(RECV_BUF_SIZE);
sock2.bind(&parsed_addr.into())?;
let actual_recv_buf = sock2.recv_buffer_size().unwrap_or(0);The comment "Bind before setting buffer — ensures the socket is valid" is incorrect; the socket is valid immediately after Socket::new().
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@crates/logfwd-io/src/udp_input.rs` around lines 46 - 53, Move the call to
set_recv_buffer_size before binding the socket: after creating the socket with
Socket::new (the sock2 variable) call sock2.set_recv_buffer_size(RECV_BUF_SIZE)
(best-effort) and then call sock2.bind(&parsed_addr.into())?; afterward read
back the actual buffer via sock2.recv_buffer_size().unwrap_or(0). Remove or
update the misleading comment that says "Bind before setting buffer" so it
reflects the correct order.
| let result = ureq::post(&url) | ||
| .header("Content-Type", "application/x-protobuf") | ||
| .send(&oversized); | ||
|
|
||
| // Server should reject the oversized body — either 413 status or | ||
| // connection reset (server closes before we finish sending 11MB). | ||
| assert!(result.is_err(), "expected rejection for 11MB body"); | ||
| } |
There was a problem hiding this comment.
🧹 Nitpick | 🔵 Trivial
Consider asserting the specific error type for oversized body.
is_err() passes if the connection resets OR returns 413. If you want to distinguish, check for ureq::Error::StatusCode(413) when the server responds vs connection errors when it closes early.
// Server should reject the oversized body — either 413 status or
// connection reset (server closes before we finish sending 11MB).
- assert!(result.is_err(), "expected rejection for 11MB body");
+ match result {
+ Err(ureq::Error::StatusCode(413)) => {} // explicit rejection
+ Err(_) => {} // connection reset or other error is also acceptable
+ Ok(resp) => panic!("expected rejection for 11MB body, got {}", resp.status()),
+ }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@crates/logfwd-io/tests/transport_e2e.rs` around lines 460 - 467, The test
currently only checks result.is_err() after
ureq::post(&url).header(...).send(&oversized); — change it to assert the
specific ureq error variant: call result.unwrap_err() and match against
ureq::Error::Status(413, _) to accept an HTTP 413 response, or
ureq::Error::Transport(_) (or other transport/connection error variants) to
accept a connection reset; fail the test if the error is any other variant.
Reference the result variable from the ureq::post(...).send(&oversized) call and
the ureq::Error::Status / ureq::Error::Transport variants when implementing the
match.
There was a problem hiding this comment.
Intentionally relaxed — ureq may return ConnectionReset (server closes before 11MB upload completes) or StatusCode(413) depending on timing. Both indicate rejection.
There was a problem hiding this comment.
@strawgate, understood — since both ConnectionReset and StatusCode(413) are valid rejection signals depending on whether the server closes before or after reading the full body, is_err() is the right level of assertion here. Makes sense to keep it relaxed.
🧠 Learnings used
Learnt from: CR
Repo: strawgate/memagent PR: 0
File: AGENTS.md:0-0
Timestamp: 2026-03-31T06:21:14.412Z
Learning: Applies to src/logfwd-output/**/*.rs : No per-line heap allocations in hot paths (OTLP encoder, compress); verify with `just ci`
Learnt from: CR
Repo: strawgate/memagent PR: 0
File: AGENTS.md:0-0
Timestamp: 2026-03-31T06:21:14.412Z
Learning: Applies to src/logfwd-core/**/*.rs : No per-line heap allocations in hot paths (scanner, format parser, CRI parser); verify with `just ci`
There was a problem hiding this comment.
Actionable comments posted: 2
♻️ Duplicate comments (1)
crates/logfwd-io/src/udp_input.rs (1)
35-50:⚠️ Potential issue | 🟠 Major
new()still narrows bind semantics and configuresSO_RCVBUFtoo late.
addr.parse()only accepts literalSocketAddrs, so hostnames likelocalhost:514now fail. This block also still binds before callingset_recv_buffer_size(), leaving the socket2 hardening incomplete. Resolve withto_socket_addrs(), derive the domain from the resolved address, setSO_RCVBUF, then bind.For Rust UDP sockets using `socket2`, should `Socket::set_recv_buffer_size()` be applied before `bind()`? Also, does `std::net::SocketAddr::from_str` accept hostnames like `localhost:514`, or is `ToSocketAddrs` required?🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/logfwd-io/src/udp_input.rs` around lines 35 - 50, The new() implementation currently uses addr.parse() (which rejects hostnames) and binds the socket before setting SO_RCVBUF; change it to resolve the provided addr with ToSocketAddrs (e.g., addr.to_socket_addrs()) and pick a concrete SocketAddr to derive Domain (use is_ipv4/is_ipv6 on that resolved addr), create the socket2::Socket with that Domain, call sock2.set_recv_buffer_size(RECV_BUF_SIZE) immediately after creating the socket (best-effort) and only then call sock2.bind(&resolved_addr.into()); update variable names like parsed_addr -> resolved_addr and keep using RECV_BUF_SIZE, set_recv_buffer_size, Socket::new, and bind to locate the fixes.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@crates/logfwd-io/src/udp_input.rs`:
- Around line 241-245: The test currently ignores send_to() errors but still
compares receiver success against a fixed 10_000 attempts; change the logic to
count only successful sends and derive the 50% threshold from that count.
Concretely: increment a sent_success (or sent_count) when
sender.send_to(msg.as_bytes(), addr) returns Ok, use sent_success instead of
10_000 when computing the required minimum (e.g. required = sent_success / 2),
and update the corresponding receiver assertion/check (the block that currently
compares received to 5_000/10_000) to compare received_count against
sent_success/2; apply this same change in the other mirrored section as well so
both places use successful-sends as the denominator.
- Around line 73-89: The UdpInput diagnostics (recv_buffer_size, drops_detected,
drops_counter) are never exposed because UdpInput is boxed as Box<dyn
InputSource> and InputSource lacks accessors; to fix, add diagnostic accessors
to the InputSource trait (e.g., methods like recv_buffer_size() -> usize and
drops_counter() -> Arc<AtomicU64> or a generic metrics() -> MetricsSnapshot) and
implement them on UdpInput (methods already present: recv_buffer_size,
drops_detected, drops_counter); then update the pipeline creation site that
boxes UdpInput (the code in pipeline.rs that constructs Box<dyn InputSource>) to
ensure the diagnostics propagate (or alternatively adapt pipeline.rs to extract
metrics from the concrete UdpInput before boxing and register them with the
pipeline metrics system), so these values are available to the production
abstraction and monitoring.
---
Duplicate comments:
In `@crates/logfwd-io/src/udp_input.rs`:
- Around line 35-50: The new() implementation currently uses addr.parse() (which
rejects hostnames) and binds the socket before setting SO_RCVBUF; change it to
resolve the provided addr with ToSocketAddrs (e.g., addr.to_socket_addrs()) and
pick a concrete SocketAddr to derive Domain (use is_ipv4/is_ipv6 on that
resolved addr), create the socket2::Socket with that Domain, call
sock2.set_recv_buffer_size(RECV_BUF_SIZE) immediately after creating the socket
(best-effort) and only then call sock2.bind(&resolved_addr.into()); update
variable names like parsed_addr -> resolved_addr and keep using RECV_BUF_SIZE,
set_recv_buffer_size, Socket::new, and bind to locate the fixes.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 2e192e0c-348c-4d90-8925-391db36ac0b5
📒 Files selected for processing (1)
crates/logfwd-io/src/udp_input.rs
| /// Returns the actual kernel receive buffer size (as reported by | ||
| /// `getsockopt`). Useful for diagnostics — compare with `RECV_BUF_SIZE` | ||
| /// to see if the OS capped the requested value. | ||
| pub fn recv_buffer_size(&self) -> usize { | ||
| self.actual_recv_buf | ||
| } | ||
|
|
||
| /// Returns the number of detected drop events (ENOBUFS / similar errors | ||
| /// observed during `recv`). This is a lower bound — the kernel may drop | ||
| /// packets silently without signalling ENOBUFS. | ||
| pub fn drops_detected(&self) -> u64 { | ||
| self.drops_detected.load(Ordering::Relaxed) | ||
| } | ||
|
|
||
| /// Returns a clone of the drops counter for external monitoring. | ||
| pub fn drops_counter(&self) -> Arc<AtomicU64> { | ||
| Arc::clone(&self.drops_detected) |
There was a problem hiding this comment.
🛠️ Refactor suggestion | 🟠 Major
These diagnostics do not reach the production abstraction.
Lines 607-612 in crates/logfwd/src/pipeline.rs immediately box UdpInput as Box<dyn InputSource>, and Lines 19-29 in crates/logfwd-io/src/input.rs expose no way to read recv_buffer_size() or drops_counter(). In practice, the new counters are test-only unless they are wired into the trait or exported through pipeline metrics.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@crates/logfwd-io/src/udp_input.rs` around lines 73 - 89, The UdpInput
diagnostics (recv_buffer_size, drops_detected, drops_counter) are never exposed
because UdpInput is boxed as Box<dyn InputSource> and InputSource lacks
accessors; to fix, add diagnostic accessors to the InputSource trait (e.g.,
methods like recv_buffer_size() -> usize and drops_counter() -> Arc<AtomicU64>
or a generic metrics() -> MetricsSnapshot) and implement them on UdpInput
(methods already present: recv_buffer_size, drops_detected, drops_counter); then
update the pipeline creation site that boxes UdpInput (the code in pipeline.rs
that constructs Box<dyn InputSource>) to ensure the diagnostics propagate (or
alternatively adapt pipeline.rs to extract metrics from the concrete UdpInput
before boxing and register them with the pipeline metrics system), so these
values are available to the production abstraction and monitoring.
There was a problem hiding this comment.
Valid — the diagnostics (drops_detected, recv_buffer_size) are on UdpInput but not exposed through the InputSource trait. Tracked in #574 (transport vs parsing separation) which will introduce richer transport metadata.
There was a problem hiding this comment.
@strawgate, understood — acknowledged that these diagnostics are intentionally scoped to UdpInput for now and will be wired into the production abstraction as part of the transport/parsing separation in #574. I'll keep that context in mind for future reviews.
🧠 Learnings used
Learnt from: CR
Repo: strawgate/memagent PR: 0
File: AGENTS.md:0-0
Timestamp: 2026-03-31T06:21:14.412Z
Learning: Applies to src/logfwd-output/**/*.rs : No per-line heap allocations in hot paths (OTLP encoder, compress); verify with `just ci`
Learnt from: CR
Repo: strawgate/memagent PR: 0
File: AGENTS.md:0-0
Timestamp: 2026-03-31T06:21:14.412Z
Learning: Applies to src/logfwd-core/**/*.rs : No per-line heap allocations in hot paths (scanner, format parser, CRI parser); verify with `just ci`
Learnt from: CR
Repo: strawgate/memagent PR: 0
File: AGENTS.md:0-0
Timestamp: 2026-03-31T06:21:14.412Z
Learning: Applies to src/logfwd-arrow/**/*.rs : No per-line heap allocations in hot paths (builders, Arrow construction); verify with `just ci`
… threshold TCP max_line_length: move extend_from_slice AFTER the length check so the offending chunk is not appended to the output buffer when a client exceeds MAX_LINE_LENGTH. UDP high_volume test: base the 50% threshold on successful sends (not total attempts) since send_to can fail transiently. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Linux CI kernel drops ~78% of 10K rapid-fire datagrams on localhost (only 2185 received). Fix: send 1000 in batches of 100 with 10ms pauses between batches to let the receiver drain. This tests the input's ability to handle volume without overwhelming the kernel. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
Actionable comments posted: 1
♻️ Duplicate comments (3)
crates/logfwd-io/src/udp_input.rs (3)
35-40:⚠️ Potential issue | 🟠 MajorResolve
addrbefore selecting the socket domain.
addr.parse::<SocketAddr>()rejects hostnames, solocalhost:514now fails even thoughUdpSocket::bind(addr)previously accepted it. Resolve withToSocketAddrs, then deriveDomainfrom the resolvedSocketAddr.Suggested fix
-use std::net::UdpSocket; +use std::net::{ToSocketAddrs, UdpSocket}; @@ - let parsed_addr: std::net::SocketAddr = addr.parse().map_err(io::Error::other)?; + let parsed_addr = addr + .to_socket_addrs()? + .next() + .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "no socket address resolved"))?;🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/logfwd-io/src/udp_input.rs` around lines 35 - 40, The code currently calls addr.parse() (parsed_addr) to choose Domain::IPV4/IPv6, which rejects hostnames like "localhost:514"; instead resolve the address using ToSocketAddrs (e.g., addr.to_socket_addrs()), take a resolved std::net::SocketAddr (first one), derive the Domain from that resolved SocketAddr (use is_ipv4()/is_ipv6() to pick Domain::IPV4 or Domain::IPV6), and then proceed to create/bind the socket (same place where UdpSocket::bind(addr) is used) so hostnames are accepted and domain selection matches the actual resolved address.
46-53:⚠️ Potential issue | 🟠 MajorSet
SO_RCVBUFbeforebind().Binding first leaves a startup window with the default kernel buffer, which undermines the hardening this PR is adding. Move
set_recv_buffer_size(RECV_BUF_SIZE)ahead ofbind()and update the comment.Suggested fix
- // Bind before setting buffer — ensures the socket is valid. - sock2.bind(&parsed_addr.into())?; - // Tune kernel receive buffer to reduce packet loss under load. let _ = sock2.set_recv_buffer_size(RECV_BUF_SIZE); // best-effort + sock2.bind(&parsed_addr.into())?;🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/logfwd-io/src/udp_input.rs` around lines 46 - 53, Move the best-effort kernel receive buffer tuning ahead of binding: call sock2.set_recv_buffer_size(RECV_BUF_SIZE) before sock2.bind(&parsed_addr.into()), and update the comment to say we set SO_RCVBUF prior to bind to avoid a startup window with the default buffer; keep the subsequent read-back of the actual buffer size via sock2.recv_buffer_size().unwrap_or(0) and preserve the best-effort (ignore/set to let it fail) behavior.
236-274:⚠️ Potential issue | 🟡 MinorMake
udp_high_volumefail when the sender queues nothing.If every
send_to()fails,sentstays0and the final assertion passes trivially. Add an explicitsent > 0check; the comment on Line 236 should also say50%, not90%.Suggested fix
- // Send 10000 datagrams rapidly, verify at least 90% arrive. + // Send 10000 datagrams rapidly, verify at least 50% of successful sends arrive. @@ - let threshold = sent / 2; + assert!(sent > 0, "sender failed to queue any datagrams"); + let threshold = sent / 2;🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/logfwd-io/src/udp_input.rs` around lines 236 - 274, The test currently computes sent and may pass trivially if sent == 0; add an explicit assertion that at least one datagram was successfully queued (assert!(sent > 0, "no datagrams were sent")) before computing the threshold and the final check, and update the nearby comment that currently says "verify at least 90% arrive" to reflect the actual policy ("accept ≥50% of what was sent") so it's consistent with computing threshold = sent / 2 and the final assert comparing received >= threshold; refer to the variables sent, received, threshold and the poll/send loop in this udp_high_volume test block.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@crates/logfwd-io/src/tcp_input.rs`:
- Around line 44-48: The pipeline currently always constructs TcpInput via
TcpInput::new(), which hardcodes DEFAULT_IDLE_TIMEOUT; change the pipeline code
that invokes TcpInput::new() to read an optional idle_timeout from the YAML
pipeline config and call TcpInput::with_idle_timeout(name, addr, idle_timeout)
when provided (falling back to TcpInput::new() or DEFAULT_IDLE_TIMEOUT when
absent). Update the pipeline config parsing to accept an optional idle_timeout
field (seconds or duration string), validate/parse it, and pass the parsed value
into the TcpInput construction so operators can tune the idle timeout.
---
Duplicate comments:
In `@crates/logfwd-io/src/udp_input.rs`:
- Around line 35-40: The code currently calls addr.parse() (parsed_addr) to
choose Domain::IPV4/IPv6, which rejects hostnames like "localhost:514"; instead
resolve the address using ToSocketAddrs (e.g., addr.to_socket_addrs()), take a
resolved std::net::SocketAddr (first one), derive the Domain from that resolved
SocketAddr (use is_ipv4()/is_ipv6() to pick Domain::IPV4 or Domain::IPV6), and
then proceed to create/bind the socket (same place where UdpSocket::bind(addr)
is used) so hostnames are accepted and domain selection matches the actual
resolved address.
- Around line 46-53: Move the best-effort kernel receive buffer tuning ahead of
binding: call sock2.set_recv_buffer_size(RECV_BUF_SIZE) before
sock2.bind(&parsed_addr.into()), and update the comment to say we set SO_RCVBUF
prior to bind to avoid a startup window with the default buffer; keep the
subsequent read-back of the actual buffer size via
sock2.recv_buffer_size().unwrap_or(0) and preserve the best-effort (ignore/set
to let it fail) behavior.
- Around line 236-274: The test currently computes sent and may pass trivially
if sent == 0; add an explicit assertion that at least one datagram was
successfully queued (assert!(sent > 0, "no datagrams were sent")) before
computing the threshold and the final check, and update the nearby comment that
currently says "verify at least 90% arrive" to reflect the actual policy
("accept ≥50% of what was sent") so it's consistent with computing threshold =
sent / 2 and the final assert comparing received >= threshold; refer to the
variables sent, received, threshold and the poll/send loop in this
udp_high_volume test block.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 91d2ce8c-16f9-40a6-8fba-e386916b080e
📒 Files selected for processing (2)
crates/logfwd-io/src/tcp_input.rscrates/logfwd-io/src/udp_input.rs
| impl TcpInput { | ||
| /// Bind to `addr` (e.g. "0.0.0.0:5140"). | ||
| /// Bind to `addr` (e.g. "0.0.0.0:5140") with the default idle timeout. | ||
| pub fn new(name: impl Into<String>, addr: &str) -> io::Result<Self> { | ||
| Self::with_idle_timeout(name, addr, DEFAULT_IDLE_TIMEOUT) | ||
| } |
There was a problem hiding this comment.
🧹 Nitpick | 🔵 Trivial
Consider exposing idle_timeout in pipeline configuration.
The with_idle_timeout constructor exists but the pipeline (per pipeline.rs:623) always uses new() with the 60-second default. Adding idle_timeout as an optional YAML config field would let operators tune this for their traffic patterns.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@crates/logfwd-io/src/tcp_input.rs` around lines 44 - 48, The pipeline
currently always constructs TcpInput via TcpInput::new(), which hardcodes
DEFAULT_IDLE_TIMEOUT; change the pipeline code that invokes TcpInput::new() to
read an optional idle_timeout from the YAML pipeline config and call
TcpInput::with_idle_timeout(name, addr, idle_timeout) when provided (falling
back to TcpInput::new() or DEFAULT_IDLE_TIMEOUT when absent). Update the
pipeline config parsing to accept an optional idle_timeout field (seconds or
duration string), validate/parse it, and pass the parsed value into the TcpInput
construction so operators can tune the idle timeout.
There was a problem hiding this comment.
Will expose in pipeline config as part of the FramedInput refactor (#574) where transport options become first-class config.
There was a problem hiding this comment.
@strawgate, makes sense — the FramedInput refactor in #574 is the right place for this since transport options will be first-class config there. I'll skip flagging this in future reviews of this PR.
There was a problem hiding this comment.
Actionable comments posted: 2
♻️ Duplicate comments (2)
crates/logfwd-io/src/udp_input.rs (2)
243-280:⚠️ Potential issue | 🟡 MinorUse an exact 50% check and fail if nothing was sent.
sent / 2rounds down, so odd values accept less than 50%, andsent == 0still lets the test pass without exercising the receiver.Suggested fix
- let threshold = sent / 2; - assert!( - received >= threshold, - "expected at least {threshold}/{sent} datagrams, got {received}" - ); + assert!(sent > 0, "sender failed to queue any datagrams"); + assert!( + received * 2 >= sent, + "expected at least 50% of successful sends, got {received}/{sent}" + );🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/logfwd-io/src/udp_input.rs` around lines 243 - 280, The test currently computes threshold as sent / 2 which floors odd counts and allows sent == 0 to pass; update the check to require at least 50% (use a ceiling for half, e.g. threshold = (sent + 1) / 2) and add an explicit assertion that sent > 0 to fail the test if nothing was sent (refer to the variables sent, threshold, received and the final assert block that compares received >= threshold).
35-50:⚠️ Potential issue | 🟠 MajorResolve
addrwithToSocketAddrsand applySO_RCVBUFbeforebind().Line 35 regresses
UdpSocket::bind(addr)behavior by accepting only literal socket addresses, so hostname configs likelocalhost:514now fail. Line 47 also binds beforeset_recv_buffer_size(), which leaves a startup window where the socket is already receiving with the default buffer size.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/logfwd-io/src/udp_input.rs` around lines 35 - 50, The code currently parses addr with addr.parse() (only literal addresses) and calls sock2.bind(...) before set_recv_buffer_size(); fix this by resolving addr via ToSocketAddrs (e.g., call addr.to_socket_addrs() and pick an appropriate SocketAddr or iterate candidates), use the resolved SocketAddr to decide Domain (based on the first/selected SocketAddr), create the Socket with Socket::new(domain, Type::DGRAM, Some(Protocol::UDP)), call sock2.set_recv_buffer_size(RECV_BUF_SIZE) before calling sock2.bind(&resolved_addr.into()), and handle the case of no resolved addresses by returning an io::Error; refer to parsed_addr, addr.parse(), Socket::new, set_recv_buffer_size, and bind in the updated flow.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@crates/logfwd-io/src/udp_input.rs`:
- Around line 123-130: The ENOBUFS/ENOMEM branch inside the receive/poll loop
(where drops_detected.fetch_add is called) currently just continues and can spin
forever; change it to avoid busy-waiting by giving control back to the poller or
adding a short backoff: e.g., break/return from the inner receive loop (so
poll() can yield) or perform a non-blocking yield/sleep/backoff
(tokio::task::yield_now()/sleep or std::thread::yield_now()/short sleep
depending on context) after incrementing drops_detected; ensure the code still
continues draining if appropriate but does not loop-hot on persistent
buffer-pressure errors.
- Around line 315-324: The test in udp_input.rs currently measures wall-clock
elapsed around input.socket.recv and asserts it returned quickly; drop the
timing logic and instead assert the recv result indicates nonblocking by
matching the error to WouldBlock (e.g., check result.is_err() and matches
io::ErrorKind::WouldBlock or equivalent). Remove the start/elapsed/elapsed
assertion and keep only the recv call and a clear assertion that the error kind
is WouldBlock to avoid flaky CI.
---
Duplicate comments:
In `@crates/logfwd-io/src/udp_input.rs`:
- Around line 243-280: The test currently computes threshold as sent / 2 which
floors odd counts and allows sent == 0 to pass; update the check to require at
least 50% (use a ceiling for half, e.g. threshold = (sent + 1) / 2) and add an
explicit assertion that sent > 0 to fail the test if nothing was sent (refer to
the variables sent, threshold, received and the final assert block that compares
received >= threshold).
- Around line 35-50: The code currently parses addr with addr.parse() (only
literal addresses) and calls sock2.bind(...) before set_recv_buffer_size(); fix
this by resolving addr via ToSocketAddrs (e.g., call addr.to_socket_addrs() and
pick an appropriate SocketAddr or iterate candidates), use the resolved
SocketAddr to decide Domain (based on the first/selected SocketAddr), create the
Socket with Socket::new(domain, Type::DGRAM, Some(Protocol::UDP)), call
sock2.set_recv_buffer_size(RECV_BUF_SIZE) before calling
sock2.bind(&resolved_addr.into()), and handle the case of no resolved addresses
by returning an io::Error; refer to parsed_addr, addr.parse(), Socket::new,
set_recv_buffer_size, and bind in the updated flow.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: bd53560e-cb1c-4d6b-b79e-3b24a6128c83
📒 Files selected for processing (1)
crates/logfwd-io/src/udp_input.rs
|
@coderabbitai please do a full review |
|
✅ Actions performedFull review triggered. |
There was a problem hiding this comment.
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
crates/logfwd-io/src/otlp_receiver.rs (1)
434-456:⚠️ Potential issue | 🟠 MajorEscape JSON keys too.
This only sanitizes
value.keyis still copied raw here, and raw inwrite_json_field()too. OTLP attribute/resource keys come from input, so a key containing"or control bytes will emit invalid NDJSON and drop the batch at scan time.🔐 Suggested direction
-fn write_json_string_field(out: &mut Vec<u8>, key: &str, value: &str) { - out.push(b'"'); - out.extend_from_slice(key.as_bytes()); - out.extend_from_slice(b"\":\""); - // ... - out.push(b'"'); +fn write_json_string_field(out: &mut Vec<u8>, key: &str, value: &str) { + write_json_string(out, key); + out.push(b':'); + write_json_string(out, value); }Then reuse the same
write_json_string()helper fromwrite_json_field()when emitting keys.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/logfwd-io/src/otlp_receiver.rs` around lines 434 - 456, The key is written raw in write_json_string_field (and similarly in write_json_field), letting unescaped quotes/control bytes produce invalid NDJSON; change the implementation to escape the key the same way as the value by calling the existing write_json_string helper (or reusing its logic) instead of directly extending key.as_bytes(), and then write the ":" and escaped value as before—i.e., emit an escaped string for the key, emit the colon separator, then emit the escaped value using the same write_json_string logic so both key and value use identical RFC8259 escaping.
♻️ Duplicate comments (3)
crates/logfwd-io/src/generator.rs (1)
355-356:⚠️ Potential issue | 🟡 MinorAssert the exact batch size in both properties.
line_count >= 1still passes ifpoll()truncates the batch. Herebatch_size == total, so the property should requireline_count == total as usizebefore validating JSON.♻️ Suggested change
- let line_count = text.trim().lines().count(); - assert!(line_count >= 1, "expected at least 1 JSON line, got 0 (offset={offset})"); + let line_count = text.trim().lines().count(); + assert_eq!( + line_count, + total as usize, + "expected {total} JSON lines, got {line_count} (offset={offset})" + );Apply the same change in the complex-mode property.
Also applies to: 388-389
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/logfwd-io/src/generator.rs` around lines 355 - 356, The property test currently only asserts "line_count >= 1" which allows truncated batches; change the assertion in generator.rs where line_count is computed from text (variable names: text, line_count) to require exact equality with the expected batch size (use "line_count == total as usize" when batch_size == total) before validating JSON, and apply the same exact-size assertion fix to the analogous complex-mode property (the other occurrence around the lines referenced) so both properties enforce the full batch returned by poll() rather than any non-empty subset.crates/logfwd-io/src/udp_input.rs (2)
123-130:⚠️ Potential issue | 🟠 MajorYield after a drop-signal error.
If the socket keeps returning
ENOBUFS/ENOMEM, this branch never makes progress andpoll()can spin hot forever. Increment the counter, then break or back off so the pipeline regains control.🐛 Suggested change
{ self.drops_detected.fetch_add(1, Ordering::Relaxed); - // Continue draining — there may be more datagrams queued. + break; }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/logfwd-io/src/udp_input.rs` around lines 123 - 130, The ENOBUFS/ENOMEM branch only increments self.drops_detected via self.drops_detected.fetch_add(1, Ordering::Relaxed) but then continues the read loop, which can cause poll() to spin; change this branch so after incrementing you break out of the receive/drain loop (or otherwise back off/yield control) so the task regains control — e.g., after the fetch_add return from the current poll iteration or call an async yield/backoff before continuing. Update the branch in udp_input.rs (the ENOBUFS/ENOMEM match arm) to both increment the counter and then break/return/yield so the pipeline can make progress.
35-47:⚠️ Potential issue | 🟠 MajorResolve
addrbefore creating the socket.
parse::<SocketAddr>()only accepts numeric socket literals. This regresseslisten: localhost:5140even though the oldUdpSocket::bindpath and the TCP input accept hostnames. Resolve withToSocketAddrs, then chooseDomain::IPV4/IPV6from the resolvedSocketAddr.Does Rust `std::net::SocketAddr::from_str` accept hostnames like `localhost:5140`, and is `ToSocketAddrs` the correct API to resolve them before binding a socket?🐛 Suggested change
-use std::net::UdpSocket; +use std::net::{ToSocketAddrs, UdpSocket}; @@ - let parsed_addr: std::net::SocketAddr = addr.parse().map_err(io::Error::other)?; + let parsed_addr = addr + .to_socket_addrs()? + .next() + .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "no socket address resolved"))?;🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/logfwd-io/src/udp_input.rs` around lines 35 - 47, Resolve the hostname before creating the socket: replace the direct addr.parse() call with resolving addr via ToSocketAddrs (e.g., addr.to_socket_addrs()) and pick a concrete std::net::SocketAddr (return an error if resolution yields none), then set Domain based on that resolved SocketAddr (use parsed_addr.is_ipv4()/is_ipv6() on the resolved addr), create the Socket::new with that domain and only then bind using sock2.bind(&resolved_addr.into()) to preserve hostname support (also ensure errors from resolution are mapped to io::Error).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@crates/logfwd-io/src/tcp_input.rs`:
- Around line 125-129: The idle-timeout check currently runs unconditionally and
can drop active clients if this thread is backpressured; instead, only decide
eviction when the socket actually reports WouldBlock. Move the check that
compares now.duration_since(client.last_data) > self.idle_timeout out of the
unconditional path and into the branch handling a read returning WouldBlock (the
same place you handle WouldBlock for the client socket), using the same alive[i]
= false logic; keep using client.last_data and self.idle_timeout so only sockets
with no pending data are evicted.
- Around line 142-155: The current logic using memchr::memrchr on chunk resets
bytes_since_newline to the tail after the last newline but fails to detect an
overflow in the prefix before that newline; fix by scanning chunk for newline
boundaries and checking each segment length against MAX_LINE_LENGTH: iterate
through newline positions in chunk (e.g., memchr::memchr or
memchr::memchr_iter), for each segment compute segment_len = bytes_since_newline
+ segment_bytes_before_newline and if it exceeds MAX_LINE_LENGTH set alive[i] =
false and break; on each newline reset bytes_since_newline = 0, and after
processing all newlines set client.bytes_since_newline to the remaining bytes
after the last newline before calling all_data.extend_from_slice(chunk).
In `@crates/logfwd-io/tests/transport_e2e.rs`:
- Around line 110-112: The test uses substring checks that miscount sequences
(e.g., `"seq":1` matching `"seq":10`); update the assertions inside poll_until
callbacks (the one using variables/data/input and the other occurrences at the
indicated ranges) to either split the captured buffer into individual lines and
check each line's JSON "seq" field exactly, or parse each line as JSON and
assert equality of the numeric seq values (e.g., collect seq numbers and compare
to the expected set); locate the checks in the poll_until closure(s) and replace
the String::contains logic with exact line-based or JSON-parsing membership
checks for `"seq"`.
---
Outside diff comments:
In `@crates/logfwd-io/src/otlp_receiver.rs`:
- Around line 434-456: The key is written raw in write_json_string_field (and
similarly in write_json_field), letting unescaped quotes/control bytes produce
invalid NDJSON; change the implementation to escape the key the same way as the
value by calling the existing write_json_string helper (or reusing its logic)
instead of directly extending key.as_bytes(), and then write the ":" and escaped
value as before—i.e., emit an escaped string for the key, emit the colon
separator, then emit the escaped value using the same write_json_string logic so
both key and value use identical RFC8259 escaping.
---
Duplicate comments:
In `@crates/logfwd-io/src/generator.rs`:
- Around line 355-356: The property test currently only asserts "line_count >=
1" which allows truncated batches; change the assertion in generator.rs where
line_count is computed from text (variable names: text, line_count) to require
exact equality with the expected batch size (use "line_count == total as usize"
when batch_size == total) before validating JSON, and apply the same exact-size
assertion fix to the analogous complex-mode property (the other occurrence
around the lines referenced) so both properties enforce the full batch returned
by poll() rather than any non-empty subset.
In `@crates/logfwd-io/src/udp_input.rs`:
- Around line 123-130: The ENOBUFS/ENOMEM branch only increments
self.drops_detected via self.drops_detected.fetch_add(1, Ordering::Relaxed) but
then continues the read loop, which can cause poll() to spin; change this branch
so after incrementing you break out of the receive/drain loop (or otherwise back
off/yield control) so the task regains control — e.g., after the fetch_add
return from the current poll iteration or call an async yield/backoff before
continuing. Update the branch in udp_input.rs (the ENOBUFS/ENOMEM match arm) to
both increment the counter and then break/return/yield so the pipeline can make
progress.
- Around line 35-47: Resolve the hostname before creating the socket: replace
the direct addr.parse() call with resolving addr via ToSocketAddrs (e.g.,
addr.to_socket_addrs()) and pick a concrete std::net::SocketAddr (return an
error if resolution yields none), then set Domain based on that resolved
SocketAddr (use parsed_addr.is_ipv4()/is_ipv6() on the resolved addr), create
the Socket::new with that domain and only then bind using
sock2.bind(&resolved_addr.into()) to preserve hostname support (also ensure
errors from resolution are mapped to io::Error).
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 833d9781-7805-4280-827e-5dd6b4484e67
⛔ Files ignored due to path filters (1)
Cargo.lockis excluded by!**/*.lock
📒 Files selected for processing (8)
crates/logfwd-io/Cargo.tomlcrates/logfwd-io/src/generator.rscrates/logfwd-io/src/otlp_receiver.rscrates/logfwd-io/src/stderr_capture.rscrates/logfwd-io/src/tcp_input.rscrates/logfwd-io/src/udp_input.rscrates/logfwd-io/tests/transport_e2e.rscrates/logfwd/src/pipeline.rs
| // Track bytes since last newline for max-line-length | ||
| // enforcement. If the sender exceeds the limit without | ||
| // a newline we disconnect them immediately — and do NOT | ||
| // append the offending chunk. | ||
| if let Some(last_nl) = memchr::memrchr(b'\n', chunk) { | ||
| client.bytes_since_newline = n - last_nl - 1; | ||
| } else { | ||
| client.bytes_since_newline += n; | ||
| } | ||
| if client.bytes_since_newline > MAX_LINE_LENGTH { | ||
| alive[i] = false; | ||
| break; | ||
| } | ||
| all_data.extend_from_slice(chunk); |
There was a problem hiding this comment.
The max-line guard misses overflow when the newline arrives in the same chunk.
If bytes_since_newline is already near MAX_LINE_LENGTH and this read contains ...A\n, the code resets from the last newline and never checks the bytes before the first newline in chunk. That lets a > MAX_LINE_LENGTH line slip through.
🐛 Suggested change
- if let Some(last_nl) = memchr::memrchr(b'\n', chunk) {
- client.bytes_since_newline = n - last_nl - 1;
- } else {
- client.bytes_since_newline += n;
- }
- if client.bytes_since_newline > MAX_LINE_LENGTH {
+ let overflow = if let Some(first_nl) = memchr::memchr(b'\n', chunk) {
+ client.bytes_since_newline + first_nl > MAX_LINE_LENGTH
+ } else {
+ client.bytes_since_newline + n > MAX_LINE_LENGTH
+ };
+ if overflow {
alive[i] = false;
break;
}
+ client.bytes_since_newline = memchr::memrchr(b'\n', chunk)
+ .map_or(client.bytes_since_newline + n, |last_nl| n - last_nl - 1);
all_data.extend_from_slice(chunk);🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@crates/logfwd-io/src/tcp_input.rs` around lines 142 - 155, The current logic
using memchr::memrchr on chunk resets bytes_since_newline to the tail after the
last newline but fails to detect an overflow in the prefix before that newline;
fix by scanning chunk for newline boundaries and checking each segment length
against MAX_LINE_LENGTH: iterate through newline positions in chunk (e.g.,
memchr::memchr or memchr::memchr_iter), for each segment compute segment_len =
bytes_since_newline + segment_bytes_before_newline and if it exceeds
MAX_LINE_LENGTH set alive[i] = false and break; on each newline reset
bytes_since_newline = 0, and after processing all newlines set
client.bytes_since_newline to the remaining bytes after the last newline before
calling all_data.extend_from_slice(chunk).
| let data = poll_until(&mut input, Duration::from_secs(5), |d| { | ||
| let t = String::from_utf8_lossy(d); | ||
| (0..100).all(|i| t.contains(&format!("\"seq\":{i}"))) |
There was a problem hiding this comment.
Match exact seq values, not substrings.
contains("\"seq\":1") also matches "seq":10, "seq":11, etc. These assertions can therefore overcount low-digit messages and pass with gaps. Split by line or parse the JSON before checking membership.
Also applies to: 117-120, 302-307, 313-320
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@crates/logfwd-io/tests/transport_e2e.rs` around lines 110 - 112, The test
uses substring checks that miscount sequences (e.g., `"seq":1` matching
`"seq":10`); update the assertions inside poll_until callbacks (the one using
variables/data/input and the other occurrences at the indicated ranges) to
either split the captured buffer into individual lines and check each line's
JSON "seq" field exactly, or parse each line as JSON and assert equality of the
numeric seq values (e.g., collect seq numbers and compare to the expected set);
locate the checks in the poll_until closure(s) and replace the String::contains
logic with exact line-based or JSON-parsing membership checks for `"seq"`.
UDP input:
- ENOBUFS/ENOMEM now breaks instead of spinning (no forward progress)
- Removed wall-clock assertion from nonblocking test (scheduler noise)
TCP input:
- Idle check moved AFTER reading — data may arrive between polls
- max_line_length checks accumulated length at first newline before
resetting, catching lines that grew over the limit across chunks
E2E tests:
- seq value matching uses closing brace ("seq":N}) to prevent
substring overcounting ("seq":1 matching "seq":10, "seq":100)
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Summary
Follow-up to #570 — production hardening for TCP/UDP/OTLP transports that landed after the squash merge.
Hardening
Testing
Addresses
Test plan
cargo fmt --check && cargo clippy -- -D warningsclean🤖 Generated with Claude Code