fix: OTLP wire format — raw zstd + correct resource attribute field tag#887
Conversation
Two P0 fixes in the OTLP encoding path: 1. Use raw zstd compression instead of ChunkCompressor (#455). ChunkCompressor prepends a 16-byte logfwd-internal header that OTLP receivers cannot parse. Now uses zstd::bulk::Compressor directly to produce standard zstd frames per HTTP Content-Encoding spec. 2. Parameterize protobuf field tag in encode_key_value_* functions (#642). Resource.attributes is field 1, LogRecord.attributes is field 6. Previously all call sites hardcoded field 6, causing resource attributes to be misplaced in the protobuf output. Fixes #455, Fixes #642 Part of work-unit #869 Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
WalkthroughThe PR replaces the OTLP sink's zstd compression mechanism from a custom logfwd wire format to raw zstd frames by adding the Possibly related PRs
Caution Pre-merge checks failedPlease resolve all errors before merging. Addressing warnings is optional.
❌ Failed checks (3 errors, 3 warnings)
✅ Passed checks (1 passed)
Comment |
There was a problem hiding this comment.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
crates/logfwd-output/src/otlp_sink.rs (2)
641-650:⚠️ Potential issue | 🟡 Minor
expect()on payload size could panic on oversized batches.If
payload.len() > u32::MAX, this panics. While 4 GiB payloads are unlikely, a defensive check returningio::Errorwould be safer in production.🛡️ Defensive alternative
fn write_grpc_frame(buf: &mut Vec<u8>, payload: &[u8], compressed: bool) { + let len = u32::try_from(payload.len()) + .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "gRPC payload exceeds 4 GiB"))?; buf.clear(); buf.push(u8::from(compressed)); - buf.extend_from_slice( - &u32::try_from(payload.len()) - .expect("gRPC message payload must be < 4 GiB") - .to_be_bytes(), - ); + buf.extend_from_slice(&len.to_be_bytes()); buf.extend_from_slice(payload); + Ok(()) }This requires updating the signature to return
io::Result<()>and propagating insend_batch.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/logfwd-output/src/otlp_sink.rs` around lines 641 - 650, The helper write_grpc_frame currently calls expect(...) and can panic for payloads > u32::MAX; change write_grpc_frame to return std::io::Result<()> instead of panicking, validate payload.len() <= u32::MAX, and return Err(std::io::Error::new(io::ErrorKind::InvalidData, "gRPC message payload too large")) when it exceeds; update callers (notably send_batch) to propagate the error (using ?), and adapt any call sites to handle or convert the io::Error as needed.
201-215:⚠️ Potential issue | 🟠 MajorHot-path allocation:
compress()allocates a newVecon every batch.
compressor.compress()creates a fresh buffer per send call, violating the "no per-record allocations" rule. The correct fix reusescompress_bufwithcompress_to_buffer()via aCursor, as shown incrates/logfwd-io/src/compress.rs.🔧 Correct fix: reuse buffer with Cursor
Compression::Zstd => { if let Some(ref mut compressor) = self.compressor { // Produce raw zstd frames — no logfwd-internal header. // OTLP receivers expect standard zstd per HTTP Content-Encoding. - self.compress_buf.clear(); - let compressed = compressor - .compress(&self.encoder_buf) - .map_err(io::Error::other)?; - self.compress_buf = compressed; + // Resize to worst-case bound to avoid reallocation. + let max_compressed = zstd::zstd_safe::compress_bound(self.encoder_buf.len()); + self.compress_buf.clear(); + self.compress_buf.resize(max_compressed, 0); + let mut cursor = std::io::Cursor::new(&mut self.compress_buf); + let compressed_len = compressor + .compress_to_buffer(&self.encoder_buf, &mut cursor) + .map_err(io::Error::other)?; + self.compress_buf.truncate(compressed_len); &self.compress_buf🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/logfwd-output/src/otlp_sink.rs` around lines 201 - 215, The current hot path calls compressor.compress(&self.encoder_buf) which allocates a new Vec each send; change it to reuse self.compress_buf by calling the non-allocating API (compress_to_buffer) with a Cursor over self.compress_buf: clear or truncate self.compress_buf, create a Cursor (std::io::Cursor) around it, call compressor.compress_to_buffer(&mut cursor, &self.encoder_buf) (as implemented in compress.rs), then use &self.compress_buf as the payload; keep the branch and symbol names (Compression::Zstd, self.compressor, self.compress_buf, self.encoder_buf) so only the allocation call is replaced with the Cursor-based compress_to_buffer approach.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Outside diff comments:
In `@crates/logfwd-output/src/otlp_sink.rs`:
- Around line 641-650: The helper write_grpc_frame currently calls expect(...)
and can panic for payloads > u32::MAX; change write_grpc_frame to return
std::io::Result<()> instead of panicking, validate payload.len() <= u32::MAX,
and return Err(std::io::Error::new(io::ErrorKind::InvalidData, "gRPC message
payload too large")) when it exceeds; update callers (notably send_batch) to
propagate the error (using ?), and adapt any call sites to handle or convert the
io::Error as needed.
- Around line 201-215: The current hot path calls
compressor.compress(&self.encoder_buf) which allocates a new Vec each send;
change it to reuse self.compress_buf by calling the non-allocating API
(compress_to_buffer) with a Cursor over self.compress_buf: clear or truncate
self.compress_buf, create a Cursor (std::io::Cursor) around it, call
compressor.compress_to_buffer(&mut cursor, &self.encoder_buf) (as implemented in
compress.rs), then use &self.compress_buf as the payload; keep the branch and
symbol names (Compression::Zstd, self.compressor, self.compress_buf,
self.encoder_buf) so only the allocation call is replaced with the Cursor-based
compress_to_buffer approach.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository YAML (base), Organization UI (inherited)
Review profile: ASSERTIVE
Plan: Pro
Run ID: b9fab4b7-891f-4a68-8f6f-0abb144a5237
⛔ Files ignored due to path filters (1)
Cargo.lockis excluded by!**/*.lock
📒 Files selected for processing (2)
crates/logfwd-output/Cargo.tomlcrates/logfwd-output/src/otlp_sink.rs
… panic Reuse self.compress_buf via compress_to_buffer() instead of allocating a new Vec on every batch in the hot path. Replace expect() panic in write_grpc_frame with io::Result propagation. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
… panic (#888) Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Summary
ChunkCompressor(which prepends a 16-byte logfwd-internal header) withzstd::bulk::Compressorso OTLP receivers get standard zstd frames per HTTPContent-Encodingspec.field_numberparameter toencode_key_value_{string,int,double,bool}. Resource attributes now use field 1 (Resource.attributes), LogRecord attributes use field 6 (LogRecord.attributes).Fixes #455, Fixes #642
Part of work-unit #869
Test plan
cargo test -p logfwd-output— 140 tests pass (including OTLP encoding tests)cargo test -p logfwd-core— 66 tests passcargo clippy -p logfwd-output -- -D warnings— cleancargo fmt— clean🤖 Generated with Claude Code