fix: delete sync ElasticsearchSink, port bench + tests to async factory #793
Note: Reviews paused. It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review.
Walkthrough
Replaces the synchronous ureq-based Elasticsearch sink with an async pipeline using Tokio and reqwest, removes the old …
Caution: Pre-merge checks failed. Please resolve all errors before merging. Addressing warnings is optional.
❌ Failed checks (2 errors, 2 warnings)
✅ Passed checks (1 passed)
Actionable comments posted: 8
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@crates/logfwd-bench/benches/elasticsearch_arrow.rs`:
- Around line 140-164: The benchmark currently adds elapsed time to
total_duration immediately after getting Ok(response), which excludes the
expensive response.bytes() and Arrow decoding; move the Instant::now()/start and
elapsed calculation so that timing wraps the entire request+read+decode path:
start the timer before client.post(...).send().await (or immediately before
awaiting response) and compute elapsed after response.bytes() and after
iterating/decoding via StreamReader::try_new and batch processing
(functions/variables: Instant::now, rt.block_on, client.post(...).send(),
response.bytes(), StreamReader::try_new, total_duration, total_rows). Apply the
same change to the other identical block (lines ~190-210) and ensure decode/read
errors are still observed (don’t short-circuit timing on decode failures) so
failures count toward measured latency.
- Around line 142-147: The benchmark currently requests the wrong Arrow media
type and stops timing too early; update the client.post Accept header in the
elasticsearch_arrow benchmark to use the production media type
"application/vnd.apache.arrow.stream" (matching the production query_arrow path)
and move the elapsed timing so it starts just before awaiting
client.post(...).send().await and stops only after the response body is fully
downloaded and the Arrow stream has been parsed/decoded (i.e., include the
body.read/bytes collection and Arrow decoding steps inside the timed section) so
the measured latency includes transfer and parsing; locate the request call
using client.post(...) with ES_ENDPOINT and query_bytes to modify the header and
timing boundaries.
In `@crates/logfwd-bench/src/es_throughput.rs`:
- Around line 228-269: The benchmark progression reuses the same ES index across
multiple run_scenario calls, causing later scenarios to be affected by prior
data; modify run_scenario (or its caller) to ensure each scenario uses an
isolated index by either creating a unique index name per scenario (e.g., append
scenario name, workers, batch size, compress flag) or deleting/clearing the
index between runs; update the call sites in the workers==0 block and the custom
branch so each run_scenario invocation is given a distinct index identifier or
performs an index-cleanup step before starting to guarantee independent
workloads.
- Around line 129-140: The benchmark is incorrectly using the synthetic raw
estimate (rows * 300) for total_wire_bytes; update the send path in the block
that calls sink.send_batch(&result, &meta) to capture and use the actual
encoded/transmitted byte count returned or exposed by the sink (e.g., use any
length/bytes field or return value from TcpSink/OtlpSink or a method like
sink.last_wire_bytes or send_batch_result.wire_bytes) and increment
total_wire_bytes with that real value instead of raw, while continuing to use
total_raw_bytes for the synthetic unencoded estimate and preserving updates to
total_events and total_batches.
In `@crates/logfwd-output/src/elasticsearch.rs`:
- Line 33: Make the batch_buf field private instead of pub to avoid leaking
internal serialization state: remove the pub modifier from batch_buf and add a
/// doc comment per CONFIG_REFERENCE.md for the containing struct; update any
external access to use controlled APIs (e.g., provide a read-only accessor that
returns &[u8] or expose behavior via serialize_batch and send_batch) and ensure
serialize_batch and send_batch operate on the now-private batch_buf; adjust
callers to use the new accessor or methods rather than referencing batch_buf
directly.
- Around line 184-196: The query_arrow implementation parses the response body
as Arrow IPC without checking HTTP status; update query_arrow to first inspect
response.status().is_success() (similar to submit_bulk) and only attempt Arrow
parsing on success, otherwise read the response body or text and return an
io::Error::new containing the HTTP status and the error body/text to surface the
real HTTP error; locate the response handling in function query_arrow and mirror
the status-check/early-error-return pattern used in submit_bulk.
In `@crates/logfwd-output/tests/elasticsearch_arrow_ipc.rs`:
- Around line 41-62: The test currently uses a local helper function named
query_arrow to query ES but never exercises the
ElasticsearchAsyncSink::query_arrow method; update the test to call
sink.query_arrow(&query) wherever the local helper is invoked (e.g., the calls
around sink.send_batch()) so the sink's configured headers and endpoint are
used, and remove or repurpose the local query_arrow helper (only keep it if you
need a header-less baseline) to avoid duplicate logic.
In `@examples/elasticsearch/ARROW_IPC.md`:
- Around line 64-83: The example calls query_arrow on sink returned from
SinkFactory::create(), but SinkFactory::create() returns Box<dyn Sink> and
query_arrow is defined only on the concrete ElasticsearchAsyncSink and is async;
update the docs to show a working pattern: either obtain the concrete async sink
from the factory (e.g., create or downcast to ElasticsearchAsyncSink) and await
its async query_arrow call, or change the factory API to return the concrete
type or add query_arrow to the Sink trait; specifically reference
ElasticsearchSinkFactory, SinkFactory::create,
ElasticsearchAsyncSink::query_arrow and the sink variable, and make clear that
query_arrow is async and must be awaited.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository YAML (base), Organization UI (inherited)
Review profile: ASSERTIVE
Plan: Pro
Run ID: 67265670-2fa0-4465-b48d-0ee42d031a93
⛔ Files ignored due to path filters (5)
- Cargo.lock is excluded by !**/*.lock
- crates/logfwd-output/src/snapshots/logfwd_output__elasticsearch__snapshot_tests__all_null_fields.snap is excluded by !**/*.snap
- crates/logfwd-output/src/snapshots/logfwd_output__elasticsearch__snapshot_tests__basic_multi_type.snap is excluded by !**/*.snap
- crates/logfwd-output/src/snapshots/logfwd_output__elasticsearch__snapshot_tests__nullable_columns.snap is excluded by !**/*.snap
- crates/logfwd-output/src/snapshots/logfwd_output__elasticsearch__snapshot_tests__special_char_strings.snap is excluded by !**/*.snap
📒 Files selected for processing (9)
- .gitignore
- crates/logfwd-bench/Cargo.toml
- crates/logfwd-bench/benches/elasticsearch_arrow.rs
- crates/logfwd-bench/src/es_throughput.rs
- crates/logfwd-output/src/elasticsearch.rs
- crates/logfwd-output/src/lib.rs
- crates/logfwd-output/tests/elasticsearch_arrow_ipc.rs
- crates/logfwd-output/tests/elasticsearch_integration.rs
- examples/elasticsearch/ARROW_IPC.md
    let rows = result.num_rows() as u64;
    let raw = rows as usize * 300; // approximate bytes per row

    if let Err(e) = rt.block_on(sink.send_batch(&result, &meta)) {
        eprintln!("[worker {worker_id}] send_batch error: {e}");
        total_errors.fetch_add(1, Ordering::Relaxed);
    } else {
        total_raw_bytes.fetch_add(raw as u64, Ordering::Relaxed);
        total_wire_bytes.fetch_add(raw as u64, Ordering::Relaxed);
        total_events.fetch_add(rows, Ordering::Relaxed);
        total_batches.fetch_add(1, Ordering::Relaxed);
    }
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
set -euo pipefail
rg -n "enum SendResult|pub enum SendResult|send_batch\\(" crates/logfwd-output/src crates/logfwd-bench/src/es_throughput.rs
Repository: strawgate/memagent
Length of output: 3523
🏁 Script executed:
cat -n crates/logfwd-output/src/sink.rs | sed -n '20,50p'
Repository: strawgate/memagent
Length of output: 1380
🏁 Script executed:
cat -n crates/logfwd-bench/src/es_throughput.rs | sed -n '115,150p'
Repository: strawgate/memagent
Length of output: 1459
🏁 Script executed:
cat -n crates/logfwd-output/src/lib.rs | sed -n '75,90p'
Repository: strawgate/memagent
Length of output: 702
🏁 Script executed:
cat -n crates/logfwd-output/src/otlp_sink.rs | sed -n '170,200p'
Repository: strawgate/memagent
Length of output: 1439
🏁 Script executed:
cat -n crates/logfwd-output/src/tcp_sink.rs | sed -n '90,130p'
Repository: strawgate/memagent
Length of output: 1211
🏁 Script executed:
head -50 crates/logfwd-bench/src/es_throughput.rs
Repository: strawgate/memagent
Length of output: 1876
Fix byte measurement in benchmark counters.
total_wire_bytes reuses the same synthetic rows * 300 estimate as total_raw_bytes instead of measuring actual encoded length. Sinks like TcpSink and OtlpSink track actual bytes transmitted (with compression), but the benchmark discards this data. Compression ratios and throughput metrics are therefore not based on real wire usage.
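One way to obtain a real wire-byte figure is to count bytes at the point where they are written. The following is only a minimal sketch under stated assumptions: `CountingWriter`, `send_and_measure`, and `record` are hypothetical names, the "sink" is an in-memory `Vec<u8>`, and no compression is applied. It illustrates keeping the raw and wire counters separate rather than how the real sinks expose their byte counts.

```rust
use std::io::{self, Write};
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical wrapper that counts every byte written through it.
struct CountingWriter<W: Write> {
    inner: W,
    written: u64,
}

impl<W: Write> CountingWriter<W> {
    fn new(inner: W) -> Self {
        Self { inner, written: 0 }
    }
}

impl<W: Write> Write for CountingWriter<W> {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let n = self.inner.write(buf)?;
        self.written += n as u64; // count only what was actually accepted
        Ok(n)
    }

    fn flush(&mut self) -> io::Result<()> {
        self.inner.flush()
    }
}

/// Hypothetical stand-in for a sink send: frames the payload with a
/// bulk-style action line and reports how many bytes went "on the wire".
fn send_and_measure(payload: &[u8]) -> io::Result<u64> {
    let mut w = CountingWriter::new(Vec::new());
    w.write_all(b"{\"index\":{}}\n")?;
    w.write_all(payload)?;
    w.flush()?;
    Ok(w.written)
}

/// Updates the two counters from different sources: the exact input
/// length for raw bytes, the measured count for wire bytes.
fn record(payload: &[u8], total_raw: &AtomicU64, total_wire: &AtomicU64) -> io::Result<()> {
    let wire = send_and_measure(payload)?;
    total_raw.fetch_add(payload.len() as u64, Ordering::Relaxed);
    total_wire.fetch_add(wire, Ordering::Relaxed);
    Ok(())
}
```

With a compressing writer in place of the `Vec<u8>`, the same wrapper would sit below the compressor so that `written` reflects post-compression size.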
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@crates/logfwd-bench/src/es_throughput.rs` around lines 129 - 140, The
benchmark is incorrectly using the synthetic raw estimate (rows * 300) for
total_wire_bytes; update the send path in the block that calls
sink.send_batch(&result, &meta) to capture and use the actual
encoded/transmitted byte count returned or exposed by the sink (e.g., use any
length/bytes field or return value from TcpSink/OtlpSink or a method like
sink.last_wire_bytes or send_batch_result.wire_bytes) and increment
total_wire_bytes with that real value instead of raw, while continuing to use
total_raw_bytes for the synthetic unencoded estimate and preserving updates to
total_events and total_batches.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
crates/logfwd-output/src/elasticsearch.rs (1)
38-43: ⚠️ Potential issue | 🟠 Major
Expose a concrete construction path if `query_arrow()` is meant to be public.
`ElasticsearchAsyncSink::new()` is `pub(crate)`, and `ElasticsearchSinkFactory::create()` erases the concrete type to `Box<dyn Sink>`. External callers therefore cannot obtain an `ElasticsearchAsyncSink` to invoke the new method, which is why downstream benches/tests are re-implementing the `_query` path instead of exercising this one. Either add a public concrete constructor/factory method or keep `query_arrow()` internal.
Also applies to: 172-203, 359-367
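The type-erasure problem can be shown in isolation. This is a hedged sketch, not the crate's code: `Sink`, `EsSink`, `query_url`, and `create` are hypothetical stand-ins for the real trait, sink, inherent method, and factory. It shows that an inherent method is reachable only when a caller can construct the concrete type.

```rust
/// Hypothetical stand-in for the crate's `Sink` trait.
trait Sink {
    fn name(&self) -> &str;
}

/// Hypothetical concrete sink with an extra, non-trait method.
pub struct EsSink {
    endpoint: String,
}

impl EsSink {
    /// Public constructor: external callers can obtain the concrete type.
    pub fn new(endpoint: &str) -> Self {
        Self {
            endpoint: endpoint.to_string(),
        }
    }

    /// Inherent method: callable on `EsSink`, not through `dyn Sink`.
    pub fn query_url(&self) -> String {
        format!("{}/_query", self.endpoint)
    }
}

impl Sink for EsSink {
    fn name(&self) -> &str {
        "elasticsearch"
    }
}

/// Factory that erases the concrete type, as the review describes.
fn create(endpoint: &str) -> Box<dyn Sink> {
    Box::new(EsSink::new(endpoint))
}
```

Calling `create(...).query_url()` would not compile, because `dyn Sink` exposes only trait methods. If `EsSink::new` were `pub(crate)`, code outside the crate would have no route to `query_url` at all.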
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/logfwd-output/src/elasticsearch.rs` around lines 38 - 43, The test/bench reimplements the _query path because ElasticsearchAsyncSink::new is pub(crate) while ElasticsearchSinkFactory::create erases to Box<dyn Sink>, preventing external callers from constructing an ElasticsearchAsyncSink to call query_arrow(); either make a public concrete constructor (e.g., change ElasticsearchAsyncSink::new visibility to pub or add a pub fn new_public(...) that delegates to the existing new) so external code can obtain an ElasticsearchAsyncSink and call query_arrow(), or alternatively keep query_arrow() internal and ensure all query logic is exercised via the Sink trait implementation returned by ElasticsearchSinkFactory::create; update the visibility of ElasticsearchAsyncSink::new or provide a new public factory accordingly and adjust callers/tests to use that public constructor.
♻️ Duplicate comments (4)
crates/logfwd-bench/src/es_throughput.rs (2)
106-107: ⚠️ Potential issue | 🟠 Major
The reported byte metrics are not measuring bytes.
`raw payload` uses a fixed `rows * 300` estimate even though `line_buf.len()` already gives the exact input size, and `wire bytes` reuses the same estimate, so the printed MB/s and compression ratio are fabricated. Either plumb actual post-serialization/post-compression sizes out of the sink or rename/drop the metric.
Also applies to: 128-136
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/logfwd-bench/src/es_throughput.rs` around lines 106 - 107, The reported byte metrics are wrong because the code uses a fixed estimate (rows * 300) instead of real sizes; change the metrics to use the actual buffer lengths: use line_buf.len() (from gen_json_lines) as the raw payload byte count, and replace the reused estimate for "wire bytes" with the actual post-serialization/compression size returned from the sink (or from the buffer produced by the HTTP/client layer) — if the sink API does not expose a byte count, either plumb that size out of the sink (e.g., return bytes_written from the send function used by es_throughput.rs) or remove/rename the MB/s and compression-ratio calculations; apply the same fix for the related metrics in the block around lines 128-136.
151-158: ⚠️ Potential issue | 🟠 Major
Isolate indices per scenario in the default progression.
All of the 1-index scenarios reuse the same `ES_INDEX`, so later runs execute against a larger and more merge-heavy index than earlier ones. That confounds the worker/batch/compression comparison this benchmark is trying to make.
Also applies to: 240-247
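A per-scenario index name can be derived from the scenario's own parameters. The sketch below is illustrative only: the `Scenario` struct, its fields, and `scenario_index` are hypothetical and not taken from `es_throughput.rs`. The name is lowercased because Elasticsearch index names must be lowercase.

```rust
/// Hypothetical scenario parameters; field names are illustrative only.
struct Scenario<'a> {
    name: &'a str,
    workers: usize,
    batch_size: usize,
    compress: bool,
}

/// Builds an index name that is unique per parameter combination, so one
/// scenario never writes into an index already grown by another.
fn scenario_index(index_base: &str, s: &Scenario) -> String {
    // Lowercase the name and replace anything non-alphanumeric with '-'.
    let slug: String = s
        .name
        .chars()
        .map(|c| {
            if c.is_ascii_alphanumeric() {
                c.to_ascii_lowercase()
            } else {
                '-'
            }
        })
        .collect();
    let mode = if s.compress { "gz" } else { "raw" };
    format!("{index_base}-{slug}-w{}-b{}-{mode}", s.workers, s.batch_size)
}
```

Deleting the index between runs is the alternative the review mentions. Distinct names have the side benefit of leaving each scenario's data in place for inspection afterwards.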
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/logfwd-bench/src/es_throughput.rs` around lines 151 - 158, The code currently sets indices = vec![index_base] when num_indices <= 1 which causes all 1-index scenarios to reuse the same ES_INDEX; instead always generate per-scenario index names so each run gets its own index (e.g., use the same generation logic for the single-index case: map over 0..num_indices.max(1) and format!("{index_base}-{i}") or when num_indices == 1 produce format!("{index_base}-0")), updating the block that builds indices (variables/functions: index_base, indices, es_index()) and apply the same change to the other identical construction elsewhere in the file so every scenario uses isolated indices rather than a shared base name.
crates/logfwd-bench/benches/elasticsearch_arrow.rs (2)
191-209: ⚠️ Potential issue | 🟠 Major
Keep JSON timing around body read and parse.
`total_duration` is updated before `response.bytes()` and JSON decoding. Arrow now includes transfer/decode cost, JSON still does not, so the Arrow-vs-JSON comparison remains optimistic on the JSON side. Non-2xx / parse failures are also currently counted as zero-row samples instead of benchmark failures.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/logfwd-bench/benches/elasticsearch_arrow.rs` around lines 191 - 209, The timing currently stops at start.elapsed() before reading the response body and JSON decode, which makes JSON measurements optimistic and treats non-2xx or parse failures as zero-row successes; move the elapsed/timing update to after awaiting response.bytes() and after successful serde_json::from_slice::<serde_json::Value>(), so total_duration += start.elapsed() only when body read and parse complete, check response.status() (via response.status().is_success()) and treat non-2xx or JSON parse errors as benchmark failures (return Err or propagate the error) instead of adding zero rows, and only increment total_rows when json.get("values").and_then(|v| v.as_array()) succeeds. Use the existing symbols client.post(...)._query, rt.block_on, response.bytes(), serde_json::from_slice, total_duration, total_rows, and start.elapsed() to locate the change.
142-160: ⚠️ Potential issue | 🟠 Major
Mirror the production Arrow query path here.
This helper has already drifted from `ElasticsearchAsyncSink::query_arrow()`: it requests `application/vnd.elasticsearch+arrow+stream` instead of `application/vnd.apache.arrow.stream`, and non-2xx / Arrow parse failures currently collapse into zero-row iterations. That means the benchmark can silently measure an error path under a different response contract.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/logfwd-bench/benches/elasticsearch_arrow.rs` around lines 142 - 160, The benchmark HTTP request diverged from production: change the Accept header value in the client request to "application/vnd.apache.arrow.stream" to match ElasticsearchAsyncSink::query_arrow(), and stop treating non-2xx responses or Arrow parse errors as silent zero-row results — after receiving response, assert response.status().is_success() (or propagate/log the error with response.text()/bytes), and when creating StreamReader or iterating batch_result, propagate or surface errors instead of ignoring them so parsing failures fail the benchmark rather than count as zero rows; update the code paths around response.bytes(), StreamReader::try_new, and the for batch_result loop accordingly.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@crates/logfwd-bench/benches/elasticsearch_arrow.rs`:
- Around line 103-113: The helper function index_batch (in
crates/logfwd-bench/benches/elasticsearch_arrow.rs) currently treats any non-IO
error from sink.send_batch as success; change index_batch to inspect the
SendResult returned by rt.block_on(sink.send_batch(batch, &metadata)) and only
treat SendResult::Ok as success, mapping SendResult::RetryAfter and
SendResult::Rejected to a failure (return an Err(std::io::Error) with a clear
message or converted error) so the benchmark fails/records ingestion problems
instead of proceeding on partial/failed sends. Ensure you match on the
SendResult enum returned by the Sink::send_batch implementation and return an
Err for non-Ok variants while preserving existing IO error propagation.
In `@crates/logfwd-bench/src/es_throughput.rs`:
- Around line 131-138: The code treats any Ok from
rt.block_on(sink.send_batch(&result, &meta)) as a success; change the handling
to match the SendResult variants so only Ok(SendResult::Ok) increments
total_raw_bytes, total_wire_bytes, total_events and total_batches, while
Ok(SendResult::RetryAfter(_)) and Ok(SendResult::Rejected(_)) should not
increment these counters (and can increment a separate retry/rejected counter or
fall through to counting as non-success), and Err(e) should continue to
increment total_errors; locate the call to sink.send_batch and replace the if
let Err(...) check with a match on the result to distinguish SendResult::Ok,
SendResult::RetryAfter, SendResult::Rejected, and Err.
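The four-way match requested above might look like the following sketch. The `SendResult` enum here is a hypothetical mirror of the crate's type (the payload types of `RetryAfter` and `Rejected` are assumptions), and `Counters` and `record` are illustrative names, not existing benchmark code.

```rust
use std::io;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;

/// Hypothetical mirror of the crate's `SendResult`; payload types are assumed.
enum SendResult {
    Ok,
    RetryAfter(Duration),
    Rejected(String),
}

/// Illustrative benchmark counters, one per outcome class.
#[derive(Default)]
struct Counters {
    batches: AtomicU64,
    events: AtomicU64,
    retries: AtomicU64,
    rejected: AtomicU64,
    errors: AtomicU64,
}

/// Counts a batch as delivered only when the sink reports `SendResult::Ok`.
fn record(outcome: io::Result<SendResult>, rows: u64, c: &Counters) {
    match outcome {
        Ok(SendResult::Ok) => {
            c.batches.fetch_add(1, Ordering::Relaxed);
            c.events.fetch_add(rows, Ordering::Relaxed);
        }
        // The sink asked us to back off: nothing was ingested.
        Ok(SendResult::RetryAfter(_)) => {
            c.retries.fetch_add(1, Ordering::Relaxed);
        }
        // The sink permanently refused the batch.
        Ok(SendResult::Rejected(_)) => {
            c.rejected.fetch_add(1, Ordering::Relaxed);
        }
        // Transport-level failure.
        Err(_) => {
            c.errors.fetch_add(1, Ordering::Relaxed);
        }
    }
}
```

Keeping retries and rejections in their own counters lets the benchmark report them instead of folding them into either throughput or transport errors.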
---
Outside diff comments:
In `@crates/logfwd-output/src/elasticsearch.rs`:
- Around line 38-43: The test/bench reimplements the _query path because
ElasticsearchAsyncSink::new is pub(crate) while ElasticsearchSinkFactory::create
erases to Box<dyn Sink>, preventing external callers from constructing an
ElasticsearchAsyncSink to call query_arrow(); either make a public concrete
constructor (e.g., change ElasticsearchAsyncSink::new visibility to pub or add a
pub fn new_public(...) that delegates to the existing new) so external code can
obtain an ElasticsearchAsyncSink and call query_arrow(), or alternatively keep
query_arrow() internal and ensure all query logic is exercised via the Sink
trait implementation returned by ElasticsearchSinkFactory::create; update the
visibility of ElasticsearchAsyncSink::new or provide a new public factory
accordingly and adjust callers/tests to use that public constructor.
---
Duplicate comments:
In `@crates/logfwd-bench/benches/elasticsearch_arrow.rs`:
- Around line 191-209: The timing currently stops at start.elapsed() before
reading the response body and JSON decode, which makes JSON measurements
optimistic and treats non-2xx or parse failures as zero-row successes; move the
elapsed/timing update to after awaiting response.bytes() and after successful
serde_json::from_slice::<serde_json::Value>(), so total_duration +=
start.elapsed() only when body read and parse complete, check response.status()
(via response.status().is_success()) and treat non-2xx or JSON parse errors as
benchmark failures (return Err or propagate the error) instead of adding zero
rows, and only increment total_rows when json.get("values").and_then(|v|
v.as_array()) succeeds. Use the existing symbols client.post(...)._query,
rt.block_on, response.bytes(), serde_json::from_slice, total_duration,
total_rows, and start.elapsed() to locate the change.
- Around line 142-160: The benchmark HTTP request diverged from production:
change the Accept header value in the client request to
"application/vnd.apache.arrow.stream" to match
ElasticsearchAsyncSink::query_arrow(), and stop treating non-2xx responses or
Arrow parse errors as silent zero-row results — after receiving response, assert
response.status().is_success() (or propagate/log the error with
response.text()/bytes), and when creating StreamReader or iterating
batch_result, propagate or surface errors instead of ignoring them so parsing
failures fail the benchmark rather than count as zero rows; update the code
paths around response.bytes(), StreamReader::try_new, and the for batch_result
loop accordingly.
In `@crates/logfwd-bench/src/es_throughput.rs`:
- Around line 106-107: The reported byte metrics are wrong because the code uses
a fixed estimate (rows * 300) instead of real sizes; change the metrics to use
the actual buffer lengths: use line_buf.len() (from gen_json_lines) as the raw
payload byte count, and replace the reused estimate for "wire bytes" with the
actual post-serialization/compression size returned from the sink (or from the
buffer produced by the HTTP/client layer) — if the sink API does not expose a
byte count, either plumb that size out of the sink (e.g., return bytes_written
from the send function used by es_throughput.rs) or remove/rename the MB/s and
compression-ratio calculations; apply the same fix for the related metrics in
the block around lines 128-136.
- Around line 151-158: The code currently sets indices = vec![index_base] when
num_indices <= 1 which causes all 1-index scenarios to reuse the same ES_INDEX;
instead always generate per-scenario index names so each run gets its own index
(e.g., use the same generation logic for the single-index case: map over
0..num_indices.max(1) and format!("{index_base}-{i}") or when num_indices == 1
produce format!("{index_base}-0")), updating the block that builds indices
(variables/functions: index_base, indices, es_index()) and apply the same change
to the other identical construction elsewhere in the file so every scenario uses
isolated indices rather than a shared base name.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository YAML (base), Organization UI (inherited)
Review profile: ASSERTIVE
Plan: Pro
Run ID: 43513e7c-22d2-4e6d-bfa0-e0471dad5f66
📒 Files selected for processing (4)
- crates/logfwd-bench/benches/elasticsearch_arrow.rs
- crates/logfwd-bench/src/es_throughput.rs
- crates/logfwd-output/src/elasticsearch.rs
- crates/logfwd-output/tests/elasticsearch_arrow_ipc.rs
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
crates/logfwd-config/src/lib.rs (1)
500-542: ⚠️ Potential issue | 🟡 Minor
Remove dead `FileOut` validation arm in second match block.
`FileOut` is rejected as unimplemented at lines 491–497, so the dedicated validation arm at lines 523–529 is unreachable. Remove it to eliminate dead code from this hot validation path.
♻️ Proposed cleanup
     match output.output_type {
         OutputType::Otlp | OutputType::Http | OutputType::Elasticsearch | OutputType::Loki => {
             if output.endpoint.is_none() {
                 return Err(ConfigError::Validation(format!(
                     "pipeline '{name}' output '{label}': {} output requires 'endpoint'",
                     output_type_name(&output.output_type),
                 )));
             }
             if let Some(ep) = &output.endpoint
                 && let Err(msg) = validate_endpoint_url(ep)
             {
                 return Err(ConfigError::Validation(format!(
                     "pipeline '{name}' output '{label}': {msg}",
                 )));
             }
             if output.output_type == OutputType::Otlp
                 && output.compression.as_deref() == Some("gzip")
             {
                 return Err(ConfigError::Validation(format!(
                     "pipeline '{name}' output '{label}': otlp output does not support 'gzip' compression yet"
                 )));
             }
         }
    -    OutputType::FileOut => {
    -        if output.path.is_none() {
    -            return Err(ConfigError::Validation(format!(
    -                "pipeline '{name}' output '{label}': {} output requires 'path'",
    -                output_type_name(&output.output_type),
    -            )));
    -        }
    -    }
         OutputType::Stdout | OutputType::Null => {}
         OutputType::TcpOut | OutputType::UdpOut => {
             if output.endpoint.is_none() {
                 return Err(ConfigError::Validation(format!(
                     "pipeline '{name}' output '{label}': {} output requires 'endpoint'",
                     output_type_name(&output.output_type),
                 )));
             }
         }
         // FileOut and Parquet are already rejected above.
         OutputType::FileOut | OutputType::Parquet => {
             unreachable!("placeholder types are rejected before this match")
         }
     }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/logfwd-config/src/lib.rs` around lines 500 - 542, The match on output.output_type contains a dead arm for OutputType::FileOut that is unreachable because FileOut is already rejected earlier; remove the dedicated OutputType::FileOut validation arm (the branch that checks output.path) from the second match so the match only keeps the required branches (Otlp/Http/Elasticsearch/Loki, Stdout/Null, TcpOut/UdpOut and the final placeholder arm OutputType::FileOut | OutputType::Parquet if desired), leaving the earlier rejection code for FileOut in place; locate the branch by finding the match on output.output_type and the OutputType::FileOut arm that references output.path and delete that arm.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@crates/logfwd-output/src/elasticsearch.rs`:
- Around line 232-250: The current branch in send_batch_recursive returns Err
when splitting reaches MAX_SPLIT_DEPTH and still gets a 413, causing callers to
treat it as an error instead of a deterministic rejection; change that terminal
case to return Ok(super::sink::SendResult::Rejected) (consistent with do_send’s
4xx handling) rather than Err(e). Locate the match arm in send_batch_recursive
that checks Err(e) if e.kind() == io::ErrorKind::InvalidInput && depth <
MAX_SPLIT_DEPTH and modify the fall-through/terminal path (including the
analogous code around lines 295-301) so that when splitting can no longer make
progress you convert the final 413 outcome into Ok(SendResult::Rejected) instead
of propagating Err. Ensure other non-413 Err branches still return Err(e).
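The intended control flow can be sketched synchronously. Everything here is a simplification under stated assumptions: the real `send_batch_recursive` is async and returns an `io::Result`, whereas this sketch uses a hypothetical `do_send` that rejects any slice longer than `limit` (standing in for a 413), an assumed `MAX_SPLIT_DEPTH` value, and a reduced two-variant `SendResult`.

```rust
/// Assumed recursion cap; the real constant's value is not shown in the review.
const MAX_SPLIT_DEPTH: usize = 4;

/// Reduced, hypothetical version of the crate's `SendResult`.
#[derive(Debug, PartialEq)]
enum SendResult {
    Ok,
    Rejected(String),
}

/// Hypothetical transport: accepts at most `limit` rows per request and
/// otherwise fails, standing in for an HTTP 413 response.
fn do_send(rows: &[u32], limit: usize) -> Result<(), ()> {
    if rows.len() <= limit {
        Ok(())
    } else {
        Err(())
    }
}

/// Splits the batch in half on "too large" until it fits; once it can no
/// longer be split, reports a deterministic rejection, not a retryable error.
fn send_recursive(rows: &[u32], limit: usize, depth: usize) -> SendResult {
    match do_send(rows, limit) {
        Ok(()) => SendResult::Ok,
        // Still splittable: send each half independently.
        Err(()) if rows.len() > 1 && depth < MAX_SPLIT_DEPTH => {
            let (left, right) = rows.split_at(rows.len() / 2);
            match send_recursive(left, limit, depth + 1) {
                SendResult::Ok => send_recursive(right, limit, depth + 1),
                rejected => rejected,
            }
        }
        // Terminal case: a single row, or the depth cap was reached.
        Err(()) => SendResult::Rejected("413: batch cannot be split further".to_string()),
    }
}
```

Because the terminal arm yields `Rejected`, a caller that retries only on errors will not loop on a row that can never fit.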
---
Outside diff comments:
In `@crates/logfwd-config/src/lib.rs`:
- Around line 500-542: The match on output.output_type contains a dead arm for
OutputType::FileOut that is unreachable because FileOut is already rejected
earlier; remove the dedicated OutputType::FileOut validation arm (the branch
that checks output.path) from the second match so the match only keeps the
required branches (Otlp/Http/Elasticsearch/Loki, Stdout/Null, TcpOut/UdpOut and
the final placeholder arm OutputType::FileOut | OutputType::Parquet if desired),
leaving the earlier rejection code for FileOut in place; locate the branch by
finding the match on output.output_type and the OutputType::FileOut arm that
references output.path and delete that arm.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository YAML (base), Organization UI (inherited)
Review profile: ASSERTIVE
Plan: Pro
Run ID: 4d3f1bc0-f7d1-4d66-937e-ef71b0a8ee7d
📒 Files selected for processing (2)
- crates/logfwd-config/src/lib.rs
- crates/logfwd-output/src/elasticsearch.rs
♻️ Duplicate comments (1)
crates/logfwd-output/src/elasticsearch.rs (1)
251-259: ⚠️ Potential issue | 🟠 Major
Terminal 413 still surfaces as `Err` instead of `Rejected`.
When splitting is exhausted (`n == 1` or `depth >= MAX_SPLIT_DEPTH`), a 413 falls through to `Err(e) => Err(e)`. This can cause infinite retries since the caller treats `Err` as transient. Other 4xx errors return `Rejected` in `do_send` (lines 332-336); terminal 413 should do the same.
Proposed fix
     Err(e) if e.kind() == io::ErrorKind::InvalidInput && n > 1 && depth < MAX_SPLIT_DEPTH => {
         self.send_split_halves(batch, metadata, depth).await
     }
    +// Terminal 413: cannot split further → treat as rejection
    +Err(e) if e.kind() == io::ErrorKind::InvalidInput => {
    +    Ok(super::sink::SendResult::Rejected(
    +        "ES 413: batch cannot be split further".to_string(),
    +    ))
    +}
     Err(e) => Err(e),
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/logfwd-output/src/elasticsearch.rs` around lines 251 - 259, The match arm currently returning Err(e) when encountering io::ErrorKind::InvalidInput (HTTP 413) after splitting is exhausted should instead return Rejected so the caller doesn't treat it as transient; in the method where the match lives (the branch that calls self.send_split_halves(batch, metadata, depth)), update the fallback Err(e) => Err(e) arm to detect when e.kind() == io::ErrorKind::InvalidInput and (n == 1 || depth >= MAX_SPLIT_DEPTH) and return Ok(SendResult::Rejected(...)) (or the crate's Rejected variant used by do_send) with an appropriate rejection reason, otherwise keep returning Err(e); ensure the change aligns with do_send's handling of other 4xx rejections so terminal 413s are treated the same as other Rejected cases.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Duplicate comments:
In `@crates/logfwd-output/src/elasticsearch.rs`:
- Around line 251-259: The match arm currently returning Err(e) when
encountering io::ErrorKind::InvalidInput (HTTP 413) after splitting is exhausted
should instead return Rejected so the caller doesn't treat it as transient; in
the method where the match lives (the branch that calls
self.send_split_halves(batch, metadata, depth)), update the fallback Err(e) =>
Err(e) arm to detect when e.kind() == io::ErrorKind::InvalidInput and (n == 1 ||
depth >= MAX_SPLIT_DEPTH) and return Ok(SendResult::Rejected(...)) (or the
crate's Rejected variant used by do_send) with an appropriate rejection reason,
otherwise keep returning Err(e); ensure the change aligns with do_send's
handling of other 4xx rejections so terminal 413s are treated the same as other
Rejected cases.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository YAML (base), Organization UI (inherited)
Review profile: ASSERTIVE
Plan: Pro
Run ID: 51e6d4d9-4113-4caf-85ed-9906a2da16ca
📒 Files selected for processing (1)
crates/logfwd-output/src/elasticsearch.rs
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (3)
crates/logfwd-io/src/tail.rs (3)
552-565: 🧹 Nitpick | 🔵 Trivial

Consider pre-allocating `result` to reduce reallocations.

With 256 KB `read_buf_size` and 4 MiB `max_read_bytes`, the loop may iterate ~16 times. Starting `result` at capacity 0 causes multiple reallocations. Since `current_size - tailed.offset` is already known, a capacity hint is straightforward.

Proposed change

```diff
 // Read available bytes, up to max_read_bytes per poll to prevent
 // unbounded batch sizes downstream.
 let max_read = self.config.max_read_bytes;
+let bytes_available = (current_size - tailed.offset) as usize;
+let capacity_hint = bytes_available.min(max_read);
-let mut result = Vec::new();
+let mut result = Vec::with_capacity(capacity_hint);
 loop {
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/logfwd-io/src/tail.rs` around lines 552 - 565, Pre-allocate the Vec used for reads to avoid repeated reallocations: compute the expected read capacity as the minimum of self.config.max_read_bytes and the remaining bytes (current_size - tailed.offset) converted to usize, handle potential conversion/overflow, and create result with Vec::with_capacity(expected_capacity) (or call result.reserve_exact) before the loop that reads into self.read_buf; keep the existing loop logic and only change the initialization of result (symbols: result, self.config.max_read_bytes, tailed.offset, self.read_buf / read_buf_size).
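The prompt above asks for a capacity hint that handles conversion and overflow. One way to do that is sketched here. `capacity_hint` and `read_capped` are hypothetical free functions written for illustration, not methods of the real `FileTailer`, and an in-memory `Cursor` stands in for the tailed file.

```rust
use std::io::{self, Cursor, Read};

/// Capacity hint: the smaller of the bytes remaining and the per-poll cap.
fn capacity_hint(current_size: u64, offset: u64, max_read_bytes: usize) -> usize {
    // saturating_sub avoids underflow if the file was truncated below `offset`.
    let remaining = current_size.saturating_sub(offset);
    // Clamp in u64 first, so the final cast cannot exceed `max_read_bytes`.
    remaining.min(max_read_bytes as u64) as usize
}

/// Read until EOF or until at least `max_read_bytes` have been collected.
/// The result may overshoot the cap by less than one chunk.
fn read_capped<R: Read>(
    src: &mut R,
    hint: usize,
    max_read_bytes: usize,
    chunk: &mut [u8],
) -> io::Result<Vec<u8>> {
    let mut result = Vec::with_capacity(hint);
    while result.len() < max_read_bytes {
        let n = src.read(chunk)?;
        if n == 0 {
            break; // EOF
        }
        result.extend_from_slice(&chunk[..n]);
    }
    Ok(result)
}

fn main() -> io::Result<()> {
    let data = vec![b'x'; 1024 * 1024];
    let max_read = 256 * 1024;
    let hint = capacity_hint(data.len() as u64, 0, max_read);
    let mut src = Cursor::new(data);
    let mut chunk = vec![0u8; 64 * 1024];
    let first = read_capped(&mut src, hint, max_read, &mut chunk)?;
    println!("hint = {}, first poll read {} bytes", hint, first.len());
    Ok(())
}
```

Clamping before the cast means the conversion to `usize` is lossless even on 32-bit targets, which a bare `as usize` on the raw difference does not guarantee.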
625-1401: 🧹 Nitpick | 🔵 Trivial

Consider adding a test for the `max_read_bytes` limit.

Existing tests don't verify that oversized files are read across multiple polls. A test creating a file larger than `max_read_bytes`, then asserting multiple `poll()` calls are needed to drain it, would guard against regressions.

Example test sketch

```rust
#[test]
fn test_max_read_bytes_caps_per_poll() {
    let dir = tempfile::tempdir().unwrap();
    let log_path = dir.path().join("big.log");

    // Write ~1 MiB of data.
    let data_size = 1024 * 1024;
    {
        let mut f = File::create(&log_path).unwrap();
        f.write_all(&vec![b'x'; data_size]).unwrap();
    }

    let config = TailConfig {
        start_from_end: false,
        poll_interval_ms: 10,
        max_read_bytes: 256 * 1024, // 256 KiB limit
        ..Default::default()
    };
    let mut tailer = FileTailer::new(std::slice::from_ref(&log_path), config).unwrap();

    // First poll should return at most ~256 KiB.
    std::thread::sleep(Duration::from_millis(50));
    let events = tailer.poll().unwrap();
    let first_chunk: Vec<u8> = events
        .iter()
        .filter_map(|e| match e {
            TailEvent::Data { bytes, .. } => Some(bytes.clone()),
            _ => None,
        })
        .flatten()
        .collect();
    assert!(first_chunk.len() <= 256 * 1024 + 256 * 1024, "should cap near max_read_bytes");
    assert!(first_chunk.len() < data_size, "should not read entire file in one poll");

    // Subsequent polls should drain the rest.
    let mut total = first_chunk.len();
    for _ in 0..10 {
        std::thread::sleep(Duration::from_millis(20));
        let events = tailer.poll().unwrap();
        let chunk: Vec<u8> = events
            .iter()
            .filter_map(|e| match e {
                TailEvent::Data { bytes, .. } => Some(bytes.clone()),
                _ => None,
            })
            .flatten()
            .collect();
        total += chunk.len();
        if total >= data_size {
            break;
        }
    }
    assert_eq!(total, data_size, "should eventually read all data");
}
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/logfwd-io/src/tail.rs` around lines 625 - 1401, Add a new unit test (e.g. test_max_read_bytes_caps_per_poll) to the existing tests module that creates a large file (~1 MiB), constructs a TailConfig with start_from_end=false and max_read_bytes set to a smaller value (e.g. 256 KiB), creates a FileTailer via FileTailer::new, then call tailer.poll() multiple times (with small sleeps) collecting TailEvent::Data bytes and assert that the first poll returns no more than the configured max_read_bytes (allowing small framing overhead) and that subsequent polls eventually drain the whole file; use TailEvent::Data matching to extract bytes and ensure total bytes read equals the original file size.
527-528: ⚠️ Potential issue | 🟡 Minor

Doc comment now inaccurate.

The comment says "Read ALL available new data" but the function now respects `max_read_bytes`. Should clarify that reads are capped and remaining data deferred to next poll.

Suggested fix

```diff
-/// Read ALL available new data from a file. Drains until read() returns 0.
-/// Returns None if no new data.
+/// Read new data from a file, up to `max_read_bytes` per call.
+/// Remaining data is read on subsequent polls. Returns None if no new data.
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/logfwd-io/src/tail.rs` around lines 527 - 528, Update the doc comment that currently states "Read ALL available new data from a file" to clarify that the function reads at most max_read_bytes per call and will return partial data if more is available, deferring remaining bytes to the next poll; keep the note that it returns None if no new data. Mention the max_read_bytes parameter and the polling/next-poll behavior so callers understand the capped-read semantics.
📒 Files selected for processing (1)
crates/logfwd-io/src/tail.rs
Force-pushed from 5cf4644 to 5c04e0d.
Adds a new benchmark binary measuring end-to-end Elasticsearch output throughput with pprof flamegraph support. Credentials are read from ES_ENDPOINT / ES_API_KEY env vars (no hardcoded secrets).

Adds HANDOFF.md documenting findings from the CPU profiling session and detailed next steps for deleting the dead sync ElasticsearchSink and rewriting the bench to use the async production path.

Profiling findings:
- ~1,900 evt/s baseline (network-bound, GCP us-central1 cluster)
- ~15.9% CPU in DNS re-resolution per request (ureq, no connection pooling)
- Bench was using sync sink (dead code path in production)
- Production ElasticsearchSinkFactory has pooling, gzip, 4 workers

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
The sync ElasticsearchSink (ureq, no connection pooling, no gzip) was dead code; the production path uses ElasticsearchSinkFactory (reqwest, connection pooling, gzip, up to 4 workers).

This commit:
- Deletes ElasticsearchSink (~300 lines) and all ureq usage in ES code
- Ports es-throughput bench to use ElasticsearchSinkFactory (async)
- Ports elasticsearch_arrow bench to use reqwest + tokio
- Ports integration tests and Arrow IPC tests to async factory
- Adds query_arrow() to ElasticsearchAsyncSink for ES|QL Arrow queries
- Accepts new snapshots (now include injected @timestamp)
- Adds .env to .gitignore, es-throughput accepts ES_URL env var

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
The ts_str format started with a spurious `"` that doubled the closing quote of the previous field value, producing `gateway"","@timestamp"`. Removed the extra quote so the injected comma-separated field is valid. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Workers round-robin across indices (e.g. logfwd-bench-0, logfwd-bench-1) to let serverless ES distribute shard writes.

New CLI arg:

    ./es-throughput [duration] [workers] [batch] [compress] [indices]
    ./es-throughput 300 16 5000 1 4  # 5min, 16 workers, 4 indices

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
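The round-robin assignment can be illustrated with a short sketch. `index_for_worker` is a hypothetical helper, and mapping by worker ID (rather than per request) is an assumption; the commit message only states that workers round-robin across indices named like `logfwd-bench-0`.

```rust
/// Map a worker to an index name by cycling through `index_count` indices.
/// Hypothetical helper for illustration; not the bench's actual code.
fn index_for_worker(prefix: &str, worker_id: usize, index_count: usize) -> String {
    // Treat 0 as 1 so the modulo below can never divide by zero.
    let n = index_count.max(1);
    format!("{}-{}", prefix, worker_id % n)
}

fn main() {
    // 16 workers spread over 4 indices, as in the usage example above.
    for worker_id in 0..16 {
        println!(
            "worker {:2} -> {}",
            worker_id,
            index_for_worker("logfwd-bench", worker_id, 4)
        );
    }
}
```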
- query_arrow: check HTTP status before parsing Arrow IPC response
- batch_buf: pub → pub(crate) to avoid leaking internal state
- Benchmark: stop clock after full response read + decode, not after send()
- Test helper: add HTTP status check matching sink implementation

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Serverless ES tiers have lower payload limits than self-hosted. When a _bulk request returns 413, the sink now splits the Arrow RecordBatch in half and retries each half, recursing up to 4 levels deep (16 sub-batches). This lets logfwd auto-adapt to any payload size limit without manual batch_target_bytes tuning. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
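The reactive split-on-413 behaviour can be sketched as follows. This is an illustration under assumptions, not the sink's implementation: a plain slice stands in for the Arrow RecordBatch, `SendError` and `send_with_split` are hypothetical names, and the `send` closure simulates a server that rejects oversized requests.

```rust
/// Hypothetical error type: `TooLarge` models an HTTP 413 response.
#[derive(Debug, PartialEq)]
enum SendError {
    TooLarge,
    Rejected(String),
}

/// Four levels of halving yields at most 2^4 = 16 sub-batches.
const MAX_SPLIT_DEPTH: usize = 4;

/// Send `rows`; on `TooLarge`, halve and retry each half recursively.
/// Returns the number of requests that succeeded.
fn send_with_split<T, F>(rows: &[T], depth: usize, send: &mut F) -> Result<usize, SendError>
where
    F: FnMut(&[T]) -> Result<(), SendError>,
{
    match send(rows) {
        Ok(()) => Ok(1),
        Err(SendError::TooLarge) if rows.len() > 1 && depth < MAX_SPLIT_DEPTH => {
            let (left, right) = rows.split_at(rows.len() / 2);
            let a = send_with_split(left, depth + 1, send)?;
            let b = send_with_split(right, depth + 1, send)?;
            Ok(a + b)
        }
        // Splitting is exhausted: report a permanent rejection.
        Err(SendError::TooLarge) => Err(SendError::Rejected(
            "payload too large and cannot be split further".to_string(),
        )),
        Err(other) => Err(other),
    }
}

fn main() {
    let rows: Vec<u32> = (0..1000).collect();
    // Simulated server: accepts at most 100 rows per request.
    let mut send = |chunk: &[u32]| {
        if chunk.len() <= 100 {
            Ok(())
        } else {
            Err(SendError::TooLarge)
        }
    };
    println!("{:?}", send_with_split(&rows, 0, &mut send));
}
```

Each level repeats the failed request before splitting, so a batch far over the limit costs several round trips.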
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Instead of only reacting to 413 responses, the sink now checks the serialized payload size before sending. If it exceeds max_bulk_bytes (default 5 MiB), the Arrow RecordBatch is split in half and each half is sent as a separate _bulk request. Recursive up to depth 6 (64 sub-batches). The 413 fallback is kept as a safety net. This prevents the 60s worker timeout that occurred when a single large batch took too long to split-and-retry reactively. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
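A sketch of the proactive size check, under stated assumptions: `plan_chunks` and the `size_of` estimator are hypothetical, a slice of per-row byte counts stands in for the RecordBatch, and the real sink presumably measures the serialized `_bulk` payload instead. Only the 5 MiB default and the depth limit of 6 come from the commit message.

```rust
/// Default from the commit message: 5 MiB per _bulk request.
const DEFAULT_MAX_BULK_BYTES: usize = 5 * 1024 * 1024;
/// Depth 6 yields at most 2^6 = 64 sub-batches.
const MAX_SPLIT_DEPTH: usize = 6;

/// Recursively halve `rows` until each chunk's estimated size fits within
/// `max_bytes`, a chunk has one row, or the depth limit is reached.
/// Chunks that still do not fit are emitted anyway, leaving them to a
/// reactive fallback such as 413 handling.
fn plan_chunks<'a, T, F>(
    rows: &'a [T],
    max_bytes: usize,
    depth: usize,
    size_of: &F,
    out: &mut Vec<&'a [T]>,
) where
    F: Fn(&[T]) -> usize,
{
    let fits = size_of(rows) <= max_bytes;
    if fits || rows.len() <= 1 || depth >= MAX_SPLIT_DEPTH {
        out.push(rows);
        return;
    }
    let (left, right) = rows.split_at(rows.len() / 2);
    plan_chunks(left, max_bytes, depth + 1, size_of, out);
    plan_chunks(right, max_bytes, depth + 1, size_of, out);
}

fn main() {
    // 20,000 rows of 1 KiB each: about 19.5 MiB in total.
    let row_sizes = vec![1024usize; 20_000];
    let size_of = |chunk: &[usize]| chunk.iter().sum::<usize>();
    let mut chunks = Vec::new();
    plan_chunks(&row_sizes, DEFAULT_MAX_BULK_BYTES, 0, &size_of, &mut chunks);
    for (i, chunk) in chunks.iter().enumerate() {
        println!("chunk {}: {} rows, {} bytes", i, chunk.len(), size_of(chunk));
    }
}
```

Planning the split before any request is sent avoids the repeated failed round trips of the purely reactive approach.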
Force-pushed from 5c04e0d to 3a0e3ee.
Summary
- Delete `ElasticsearchSink` (sync/ureq, ~300 lines); production uses `ElasticsearchSinkFactory` (async/reqwest)
- Port `es-throughput` bench binary to use async factory (reqwest, connection pooling, gzip)
- Port `elasticsearch_arrow` bench to use reqwest + tokio instead of ureq
- Add `query_arrow()` to `ElasticsearchAsyncSink` for ES|QL Arrow IPC queries
- Add `.env` to `.gitignore`; `es-throughput` accepts `ES_URL` env var

Supersedes #788, which only had the sync bench + HANDOFF.md.
Test plan
- `cargo test -p logfwd-output`: all passing (20 lib + 3 integration)
- `cargo test -p logfwd --lib`: 51 passing
- `cargo check --all-targets`: clean
- `es-throughput` against real ES cluster with `ES_URL` + `ES_API_KEY`

🤖 Generated with Claude Code