fix: process batch RPC request in parallel#7093
Conversation
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
WalkthroughAdds ParallelBatchLayer middleware that executes JSON-RPC batch entries concurrently using ordered futures and a size-limited BatchResponseBuilder, exposes a max_response_body_size configuration, and wires the layer into the RPC server stack after MetricsLayer using a shared ServerConfig. ChangesParallel Batch Processing
Sequence DiagramsequenceDiagram
participant Client
participant ParallelBatchService
participant InnerService
participant BatchResponseBuilder
Client->>ParallelBatchService: send batch (array of entries)
ParallelBatchService->>InnerService: spawn per-entry tasks (bounded by semaphore)
InnerService-->>ParallelBatchService: per-entry MethodResponse | None (notification)
ParallelBatchService->>BatchResponseBuilder: append call responses in original order
ParallelBatchService-->>Client: return batch response or notification-only response
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Possibly related PRs
Suggested reviewers
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
✨ Simplify code
Comment |
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/rpc/parallel_batch_layer.rs`:
- Around line 58-85: The loop in parallel_batch_layer::batch schedules every
BatchEntry immediately via tasks (FuturesOrdered) which can create unbounded
in-flight work; add a configurable concurrency cap (e.g., a semaphore or a
bounded buffer) referenced from the struct (introduce a field like
concurrency_limit or semaphore) and acquire a permit before spawning each task
for self.service.call(...) or service.notification(...), releasing the permit
when the spawned future completes, so you still push the task into tasks
(FuturesOrdered) for response assembly but never exceed the configured in-flight
count; ensure Err branch (MethodResponse::error(...)) still gets pushed
immediately without consuming a permit if you prefer, or respect the same cap to
be consistent.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 0f6fea72-6328-4972-8676-b80d11aa4797
📒 Files selected for processing (2)
src/rpc/mod.rssrc/rpc/parallel_batch_layer.rs
51d0b77 to
e9030aa
Compare
e9030aa to
df7758b
Compare
There was a problem hiding this comment.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/rpc/parallel_batch_layer.rs (1)
85-123:⚠️ Potential issue | 🟠 Major | 🏗️ Heavy liftAvoid returning on
batch_rp.append(...)error whiletasks(FuturesOrdered) still owns in-flight RPC futures.
The earlyreturn errdropstasks, and dropping aFuturesOrderedcancels/drops all futures it still manages; since call/notification futures are running under the concurrency semaphore, some entries may be aborted mid-flight and only the overflow error is returned to the client. Drain/poll the remainingtasksuntil empty (ignoring/short-circuiting furtherappends as needed) before returning the error.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/rpc/parallel_batch_layer.rs` around lines 85 - 123, The loop currently returns early when batch_rp.append(...) fails, which drops tasks (the FuturesOrdered) and cancels in-flight call/notification futures; instead, capture the append error into a local Option<ErrType> (or a flag) when MethodResponse append fails, but do not return immediately—continue polling/draining tasks (the variable tasks) until it is exhausted so semaphore-backed futures (service.call/service.notification) can complete or be polled to completion, and after the while loop returns the captured error if any; update the async block containing tasks.next().await to implement this drain-and-lazy-error-return behavior while still creating the error MethodResponse via MethodResponse::error(...) for Err(BatchEntry) cases.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/rpc/parallel_batch_layer.rs`:
- Around line 67-80: The semaphore is being created inside batch() so each
request gets a fresh limiter; move the limiter out to be a shared field on the
long-lived type (e.g., add a field like semaphore: Arc<Semaphore> on
ParallelBatchLayer or ParallelBatchService and initialize it once using
MAX_CONCURRENCY / MAX_CONCURRENCY_ENV during construction) and then clone that
Arc inside batch() instead of creating a new Semaphore there; alternatively, if
the intent is per-batch limiting, rename the local variable and
MAX_CONCURRENCY_ENV to make that explicit (e.g., per-batch max) so callers know
it is not global.
---
Outside diff comments:
In `@src/rpc/parallel_batch_layer.rs`:
- Around line 85-123: The loop currently returns early when batch_rp.append(...)
fails, which drops tasks (the FuturesOrdered) and cancels in-flight
call/notification futures; instead, capture the append error into a local
Option<ErrType> (or a flag) when MethodResponse append fails, but do not return
immediately—continue polling/draining tasks (the variable tasks) until it is
exhausted so semaphore-backed futures (service.call/service.notification) can
complete or be polled to completion, and after the while loop returns the
captured error if any; update the async block containing tasks.next().await to
implement this drain-and-lazy-error-return behavior while still creating the
error MethodResponse via MethodResponse::error(...) for Err(BatchEntry) cases.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: a192deef-805b-4c77-97cf-25c1bd52f98b
📒 Files selected for processing (3)
CHANGELOG.mddocs/docs/users/reference/env_variables.mdsrc/rpc/parallel_batch_layer.rs
✅ Files skipped from review due to trivial changes (2)
- docs/docs/users/reference/env_variables.md
- CHANGELOG.md
Codecov Report❌ Patch coverage is
Additional details and impacted files
... and 24 files with indirect coverage changes Continue to review full report in Codecov by Sentry.
🚀 New features to boost your workflow:
|
87347a6 to
d274cea
Compare
Summary of changes
Changes introduced in this pull request:
Reference issue to close (if applicable)
Closes #7092
Other information and links
Change checklist
Outside contributions
Summary by CodeRabbit
New Features
Refactor
Documentation
Chores