Follow-up: unify receiver HTTP body helpers and harden health transitions #1606
Walkthrough
This change extracts common HTTP request-handling utilities into a new shared …
Caution: Pre-merge checks failed
Please resolve all errors before merging. Addressing warnings is optional.
❌ Failed checks (2 errors, 1 warning)
✅ Passed checks (2 passed)
Approvability
Verdict: Approved
This PR consolidates duplicate HTTP body helper functions into a shared module and hardens atomic health state transitions with proper CAS loops. These are mechanical refactoring and correctness improvements with no new features or significant behavioral changes.
Pull request overview
Follow-up refactor to reduce duplicated HTTP body handling across receivers and to make OTAP/Arrow receiver health updates race-free under concurrent request handling.
Changes:
- Added `receiver_http` module with shared `Content-Length` parsing and bounded body reading (with `Content-Length` preallocation hints).
- Routed OTLP, OTAP, Arrow IPC, and generic `http_input` request body reads through the shared helper.
- Hardened OTAP and Arrow IPC receiver health transitions using atomic CAS loops to avoid lost updates under concurrency.
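To illustrate what "bounded body reading" means here, the following is a minimal std-only sketch, not the code from this PR. It assumes the body arrives as a sequence of byte chunks; `collect_bounded` and `PayloadTooLarge` are hypothetical names standing in for the real async helper and the HTTP 413 status.

```rust
/// Hypothetical error type standing in for an HTTP 413 response.
#[derive(Debug, PartialEq, Eq)]
pub struct PayloadTooLarge;

/// Collects chunks into one buffer, failing as soon as the running total
/// would exceed `max_body_size`.
pub fn collect_bounded<I, C>(chunks: I, max_body_size: usize) -> Result<Vec<u8>, PayloadTooLarge>
where
    I: IntoIterator<Item = C>,
    C: AsRef<[u8]>,
{
    let mut out = Vec::new();
    for chunk in chunks {
        let chunk = chunk.as_ref();
        // `out.len() <= max_body_size` always holds, so this cannot underflow.
        // Checking before copying keeps the buffer from growing past the limit.
        if chunk.len() > max_body_size - out.len() {
            return Err(PayloadTooLarge);
        }
        out.extend_from_slice(chunk);
    }
    Ok(out)
}
```

The limit is enforced per chunk rather than once at the end, so an oversized body is rejected without first being buffered in full.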
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| crates/logfwd-io/src/receiver_http.rs | New shared helpers for parsing Content-Length and reading bounded request bodies. |
| crates/logfwd-io/src/otlp_receiver.rs | Uses shared HTTP helper to remove local duplicated body logic. |
| crates/logfwd-io/src/otap_receiver.rs | Uses shared HTTP helper; updates receiver health via CAS loop to avoid lost transitions. |
| crates/logfwd-io/src/arrow_ipc_receiver.rs | Uses shared HTTP helper; updates receiver health via CAS loop to avoid lost transitions. |
| crates/logfwd-io/src/http_input.rs | Uses shared HTTP helper to remove local duplicated body logic. |
| crates/logfwd-io/src/lib.rs | Registers the new receiver_http module. |
    let mut body = body;
    let mut out = Vec::with_capacity(
        content_length_hint
            .map(|hint| hint.min(max_body_size as u64) as usize)
            .unwrap_or_default(),
    );
read_limited_body preallocates the output buffer up to min(Content-Length, max_body_size). For receivers with configurable max_body_size (e.g., http_input), a large configured limit plus a large Content-Length can force an immediate large allocation per request, increasing memory pressure for slowloris-style traffic and high concurrency. Consider capping the preallocation to a smaller fixed upper bound (while still enforcing max_body_size during reads), or reserving incrementally as frames arrive rather than allocating the full hinted size up front.
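One way the suggested cap could look is sketched below. This is an assumption-laden illustration, not the fix adopted in the PR: `initial_capacity` and `MAX_PREALLOC_BYTES` are hypothetical names, and the 64 KiB ceiling is an arbitrary example value.

```rust
use std::convert::TryFrom;

/// Hypothetical ceiling on up-front allocation; the value is an assumption.
pub const MAX_PREALLOC_BYTES: usize = 64 * 1024;

/// Picks the initial buffer capacity for a request body.
/// The hint only sizes the first allocation; the read loop must still
/// enforce `max_body_size` on the bytes that actually arrive.
pub fn initial_capacity(content_length_hint: Option<u64>, max_body_size: usize) -> usize {
    let hint = content_length_hint
        // A hint that does not fit in `usize` is treated as "as large as possible".
        .map(|h| usize::try_from(h).unwrap_or(usize::MAX))
        .unwrap_or(0);
    hint.min(max_body_size).min(MAX_PREALLOC_BYTES)
}
```

With this shape, a request declaring a very large `Content-Length` under a large configured limit reserves at most the fixed ceiling up front, and the `Vec` grows from there only as data is received.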
    pub(crate) fn declared_content_length(headers: &HeaderMap) -> Option<u64> {
        headers
            .get(CONTENT_LENGTH)
            .and_then(|value| value.to_str().ok())
            .and_then(|value| value.parse::<u64>().ok())
    }

    pub(crate) async fn read_limited_body(
        body: Body,
        max_body_size: usize,
        content_length_hint: Option<u64>,
    ) -> Result<Vec<u8>, StatusCode> {
        if content_length_hint.is_some_and(|hint| hint > max_body_size as u64) {
            return Err(StatusCode::PAYLOAD_TOO_LARGE);
        }
New shared helpers declared_content_length / read_limited_body are now a single point of correctness for multiple receivers, but this new module doesn’t have direct unit tests. Adding focused tests (e.g., invalid/absent Content-Length, Content-Length > max triggers 413, streaming body that exceeds max_body_size triggers 413, and non-data frames are skipped) would help prevent regressions across OTLP/OTAP/Arrow/http_input.
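As a rough idea of the parsing cases such tests could cover, here is a std-only sketch. `parse_content_length` is a hypothetical stand-in that takes the raw header bytes instead of a `HeaderMap`, and it uses UTF-8 validation as an approximation of the header-value string check, so it only mirrors the parse logic and not the real helper's signature.

```rust
/// Hypothetical std-only stand-in for `declared_content_length`:
/// takes the raw header value (if present) instead of a `HeaderMap`.
pub fn parse_content_length(raw: Option<&[u8]>) -> Option<u64> {
    raw.and_then(|bytes| std::str::from_utf8(bytes).ok())
        .and_then(|text| text.parse::<u64>().ok())
}

#[cfg(test)]
mod tests {
    use super::parse_content_length;

    #[test]
    fn absent_header_yields_none() {
        assert_eq!(parse_content_length(None), None);
    }

    #[test]
    fn valid_decimal_is_parsed() {
        assert_eq!(parse_content_length(Some("1024".as_bytes())), Some(1024));
    }

    #[test]
    fn malformed_values_yield_none() {
        // Non-numeric, negative, non-UTF-8, and out-of-range (u64::MAX + 1) inputs.
        assert_eq!(parse_content_length(Some("abc".as_bytes())), None);
        assert_eq!(parse_content_length(Some("-1".as_bytes())), None);
        assert_eq!(parse_content_length(Some(&[0xff, 0xfe][..])), None);
        assert_eq!(parse_content_length(Some("18446744073709551616".as_bytes())), None);
    }
}
```

Tests for the streaming path (a body that exceeds `max_body_size` mid-read, or non-data frames) would need the real `Body` type and an async test runtime, which this sketch deliberately leaves out.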
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
crates/logfwd-io/src/otlp_receiver.rs (1)
373-402: ⚠️ Potential issue | 🟡 Minor
Inconsistent health update pattern vs. other receivers.
PR objective states "harden OTAP/Arrow receiver health transitions using atomic CAS loops." The OTAP and Arrow receivers now use `compare_exchange_weak` loops in `store_health_event`, but OTLP receiver still uses plain `store(..., Ordering::Relaxed)` for health updates.
This creates an inconsistency: under concurrent request handling, OTLP receiver health updates can exhibit the same load/store races that the CAS loops were introduced to prevent in the other receivers.
Consider unifying by either:
- Adding a shared `store_health_event` to `receiver_health.rs` and using it here, or
- Documenting why OTLP doesn't need the same treatment
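For readers unfamiliar with the pattern, a shared helper of this kind might look roughly like the sketch below. Everything here is assumed for illustration: the `u8` encoding, the `next_health` transition rule (a sticky `FAILED` state), and the `Relaxed` orderings are hypothetical, since the PR excerpt does not show the real `ComponentHealth` type or the actual body of `store_health_event`.

```rust
use std::sync::atomic::{AtomicU8, Ordering};

// Hypothetical encoding; the real `ComponentHealth` type is not shown in this PR excerpt.
pub const HEALTHY: u8 = 0;
pub const DEGRADED: u8 = 1;
pub const FAILED: u8 = 2;

/// Hypothetical transition rule: `FAILED` is sticky, anything else takes the event.
fn next_health(current: u8, event: u8) -> u8 {
    if current == FAILED {
        FAILED
    } else {
        event
    }
}

/// Applies `event` to `health` with a compare-and-swap loop, so the new value
/// is always derived from the value that was actually replaced.
pub fn store_health_event(health: &AtomicU8, event: u8) -> u8 {
    let mut current = health.load(Ordering::Relaxed);
    loop {
        let next = next_health(current, event);
        match health.compare_exchange_weak(current, next, Ordering::Relaxed, Ordering::Relaxed) {
            Ok(_) => return next,
            // Another thread changed the value, or the weak CAS failed spuriously:
            // recompute from the freshly observed value and retry.
            Err(observed) => current = observed,
        }
    }
}
```

A plain `load` followed by `store` can overwrite a concurrent update that landed between the two calls; the loop above retries instead. The standard library's `AtomicU8::fetch_update` wraps the same retry loop and could be used in place of the hand-written one.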
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/logfwd-io/src/otlp_receiver.rs` around lines 373 - 402, The OTLP receiver is still using direct atomic store calls (state.health.store(..., Ordering::Relaxed)) for health transitions inside the send_result match, which is inconsistent with the CAS-based updates used by OTAP/Arrow; update the OTLP handling to use the same CAS-loop helper (store_health_event) from receiver_health.rs (or implement the same compare_exchange_weak loop inline) when setting ComponentHealth::Healthy, Degraded and Failed, and preserve the existing is_running check before marking Failed so health updates are performed via the compare-and-swap loop to avoid load/store races; locate uses in otlp_receiver.rs around the send_result match and replace each state.health.store(...) with a call to store_health_event(state, ComponentHealth::X) (or replicate the identical compare_exchange_weak loop logic referencing ComponentHealth and state.health) so behavior matches other receivers.
📒 Files selected for processing (6)
- crates/logfwd-io/src/arrow_ipc_receiver.rs
- crates/logfwd-io/src/http_input.rs
- crates/logfwd-io/src/lib.rs
- crates/logfwd-io/src/otap_receiver.rs
- crates/logfwd-io/src/otlp_receiver.rs
- crates/logfwd-io/src/receiver_http.rs
Summary
- … `receiver_http` for `Content-Length` parsing and bounded body reads
- … `Content-Length` hints (while still enforcing max body size)

Context
This is a follow-up to the axum receiver migration work to address review feedback and tighten concurrency behavior.
Validation
- `just lint`
- `cargo test -p logfwd-io otlp_receiver::tests:: -- --nocapture`
- `cargo test -p logfwd-io otap_receiver::tests:: -- --nocapture`
- `cargo test -p logfwd-io arrow_ipc_receiver::tests:: -- --nocapture`
- `cargo test -p logfwd-io http_input::tests:: -- --nocapture`

Note
Unify HTTP body helpers and fix concurrent health state transitions in receivers
- Extracts `declared_content_length` and `read_limited_body` helpers into a new `receiver_http` module, replacing duplicated local implementations in the Arrow IPC, OTAP, OTLP, and HTTP input receivers.
- `read_limited_body` accepts an optional content-length hint and returns 413 immediately (before reading any body data) when the declared `Content-Length` exceeds the configured limit.
- Fixes `store_health_event` across the Arrow IPC and OTAP receivers by replacing a non-atomic load/store sequence with a `compare_exchange_weak` CAS loop, preventing lost updates under concurrent modifications.

Macroscope summarized cd86b64.