Refactor to unify OutputSink and Sink traits by removing sync outputs#857
Conversation
… Sink as canonical Agent-Logs-Url: https://github.com/strawgate/memagent/sessions/363be27f-1dd8-4e98-a8fb-5422c2fb2a80 Co-authored-by: strawgate <6384545+strawgate@users.noreply.github.com>
…update all call sites Agent-Logs-Url: https://github.com/strawgate/memagent/sessions/363be27f-1dd8-4e98-a8fb-5422c2fb2a80 Co-authored-by: strawgate <6384545+strawgate@users.noreply.github.com>
Agent-Logs-Url: https://github.com/strawgate/memagent/sessions/363be27f-1dd8-4e98-a8fb-5422c2fb2a80 Co-authored-by: strawgate <6384545+strawgate@users.noreply.github.com>
|
@coderabbitai review |
✅ Actions performedReview triggered.
|
|
Important Review skippedBot user detected. To trigger a single review, invoke the ⚙️ Run configurationConfiguration used: Repository YAML (base), Organization UI (inherited) Review profile: ASSERTIVE Plan: Pro Run ID: You can disable this status message by setting the Use the checkbox below for a quick retry:
WalkthroughThis PR transitions the logfwd output subsystem from a synchronous Possibly related PRs
Caution Pre-merge checks failedPlease resolve all errors before merging. Addressing warnings is optional.
❌ Failed checks (2 errors, 2 warnings)
✅ Passed checks (1 passed)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@crates/logfwd-output/src/sink.rs`:
- Around line 172-173: Add a /// doc comment for the public constructor
OnceAsyncFactory::new describing what the factory does, its parameters (name:
String and sink: Box<dyn Sink>), and the returned type/behavior (creates a
OnceAsyncFactory that will forward to the provided sink once asynchronously).
Follow the project's CONFIG_REFERENCE.md doc style used elsewhere in this module
(brief description line, param annotations for `name` and `sink`, and any
panics/notes if applicable) and place the comment directly above the impl/new
signature to match the rest of the public output API.
In `@crates/logfwd-test-utils/Cargo.toml`:
- Line 13: The crate currently pins tokio locally (tokio = { version = "1",
features = ["rt", "time", "macros"] }); instead declare tokio in the root
[workspace.dependencies] with the explicit version (e.g., tokio = "1") and then
change this crate's dependency to reference the workspace (tokio = { workspace =
true, features = ["rt","time","macros"] }) so the workspace policy is followed;
update the workspace root Cargo.toml and replace the local tokio entry in this
crate's Cargo.toml accordingly.
In `@crates/logfwd/src/pipeline.rs`:
- Around line 287-290: The single-use async sink created in with_sink currently
uses a 30s idle timeout which lets the sole worker exit and causes subsequent
create() to hit the "already consumed" error; change the timeout passed to
OutputWorkerPool::new in with_sink to the same non-expiring value used for
single-use sinks in from_config (e.g., use Duration::MAX or the same sentinel
timeout constant) so the single worker will not idle-expire and the
OnceAsyncFactory won't be re-created and rejected.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository YAML (base), Organization UI (inherited)
Review profile: ASSERTIVE
Plan: Pro
Run ID: 00adaf6b-f995-4ca8-b7c9-8ba398c6c09e
⛔ Files ignored due to path filters (1)
Cargo.lockis excluded by!**/*.lock
📒 Files selected for processing (16)
crates/logfwd-bench/benches/pipeline.rscrates/logfwd-output/src/fanout.rscrates/logfwd-output/src/json_lines.rscrates/logfwd-output/src/lib.rscrates/logfwd-output/src/loki.rscrates/logfwd-output/src/null.rscrates/logfwd-output/src/otlp_sink.rscrates/logfwd-output/src/parquet.rscrates/logfwd-output/src/sink.rscrates/logfwd-output/src/stdout.rscrates/logfwd-output/src/tcp_sink.rscrates/logfwd-output/src/udp_sink.rscrates/logfwd-test-utils/Cargo.tomlcrates/logfwd-test-utils/src/sinks.rscrates/logfwd/src/pipeline.rscrates/logfwd/tests/compliance.rs
| impl OnceAsyncFactory { | ||
| pub fn new(name: String, sink: Box<dyn Sink>) -> Self { |
There was a problem hiding this comment.
Document OnceAsyncFactory::new.
Lines 172-173 add a new public constructor without a /// doc comment. Please document it like the rest of the public output API.
Suggested fix
impl OnceAsyncFactory {
+ /// Create a single-use factory around one pre-built async sink.
pub fn new(name: String, sink: Box<dyn Sink>) -> Self {
OnceAsyncFactory {
name,
inner: Mutex::new(Some(sink)),
}As per coding guidelines, "All new public items must have /// doc comments matching CONFIG_REFERENCE.md where applicable".
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| impl OnceAsyncFactory { | |
| pub fn new(name: String, sink: Box<dyn Sink>) -> Self { | |
| impl OnceAsyncFactory { | |
| /// Create a single-use factory around one pre-built async sink. | |
| pub fn new(name: String, sink: Box<dyn Sink>) -> Self { |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@crates/logfwd-output/src/sink.rs` around lines 172 - 173, Add a /// doc
comment for the public constructor OnceAsyncFactory::new describing what the
factory does, its parameters (name: String and sink: Box<dyn Sink>), and the
returned type/behavior (creates a OnceAsyncFactory that will forward to the
provided sink once asynchronously). Follow the project's CONFIG_REFERENCE.md doc
style used elsewhere in this module (brief description line, param annotations
for `name` and `sink`, and any panics/notes if applicable) and place the comment
directly above the impl/new signature to match the rest of the public output
API.
- Add doc comment to OnceAsyncFactory::new() explaining single-use semantics - Move tokio to workspace dependencies and reference from logfwd-test-utils - Change Duration::from_secs(30) to Duration::MAX in Pipeline::with_sink() to prevent premature worker expiration for single-use sinks Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Resolve conflict in sink.rs: keep both the Kani proofs added on master and the #[allow(deprecated)] annotation from the PR branch. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
OutputSinktrait with#[deprecated]attribute inlib.rs#[allow(deprecated)]to all existingimpl OutputSinkblocks (production sinks, test helpers, FanOut, etc.)#[allow(deprecated)]to all references toOutputSinkin type positions (OnceFactory,build_output_sink,FanOut::new, etc.)OnceAsyncFactory— wraps a singleBox<dyn Sink>into aSinkFactorywithout going through sync adapterOnceAsyncFactorySinkas canonicallogfwd-test-utils/sinks.rsto implement asyncSinktrait (DevNullSink, SlowSink, FrozenSink, FailingSink, CountingSink)Pipeline::with_sinkmethod acceptingBox<dyn Sink>directlyPipeline::with_outputwith_sinkinstead ofwith_outputSink#[allow(deprecated)]cargo clippy -- -D warningspassescargo fmt --checkpassescargo check --workspace --testspassescargo testnot yet confirmed (was running when session ended)