put slices in different ranges into a single batch write #156

Merged
liunyl merged 1 commit into main from upsert_ranges on Nov 24, 2025

Conversation

liunyl (Contributor) commented Nov 24, 2025

Summary by CodeRabbit

  • Performance
    • Cross-plan batching for range-slice writes: segments from multiple plans are merged into larger batched writes, reducing RPC calls and improving throughput and latency.
  • Behavior
    • Write dispatch now sends larger, consolidated batches with per-batch concurrency control, decreasing overhead for multi-range updates.


liunyl (Contributor, Author) commented Nov 24, 2025

@CodeRabbit review

coderabbitai bot (Contributor) commented Nov 24, 2025

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

coderabbitai bot (Contributor) commented Nov 24, 2025

Warning

Rate limit exceeded

@liunyl has exceeded the limit for the number of commits or files that can be reviewed per hour. Please wait 0 minutes and 55 seconds before requesting another review.


📥 Commits

Reviewing files that changed from the base of the PR and between 79c1b96 and 5f1b729.

📒 Files selected for processing (2)
  • data_store_service_client.cpp (5 hunks)
  • data_store_service_client.h (1 hunks)

Walkthrough

DispatchRangeSliceBatches now accepts a vector of RangeSliceBatchPlan objects and merges segments across all plans into batched BatchWriteRecords calls, dispatching when batch size thresholds or finalization conditions are met; callers were updated to pass vectors instead of single plans.

Changes

Cohort / File(s) — Summary

  • Method signature update — data_store_service_client.h: Updated the DispatchRangeSliceBatches parameter from a single RangeSliceBatchPlan reference to const std::vector<RangeSliceBatchPlan> &plans, changing the API to accept multiple plans.
  • Cross-plan batching implementation & call-site updates — data_store_service_client.cpp: Reworked DispatchRangeSliceBatches to pre-reserve capacity, iterate over multiple plans, accumulate segments across plans into batched key/record vectors, dispatch BatchWriteRecords when reaching MAX_WRITE_BATCH_SIZE or at the end, and added a per-batch concurrency wait around unfinished_request_cnt_. Updated UpdateRangeSlices and UpsertRanges to build and pass std::vector<RangeSliceBatchPlan> instead of calling per-plan.

Sequence Diagram(s)

sequenceDiagram
    participant Caller
    participant Dispatch
    participant Accumulator
    participant RPC
    Note over Caller,Dispatch: New flow — multi-plan aggregation
    Caller->>Dispatch: call(plans: vector<RangeSliceBatchPlan>)
    loop for each plan in plans
        Dispatch->>Accumulator: append plan.segments
        alt batch size >= MAX_WRITE_BATCH_SIZE
            Accumulator->>RPC: BatchWriteRecords(merged batch)
            RPC-->>Dispatch: ack / async completion
        end
    end
    alt remaining segments
        Accumulator->>RPC: BatchWriteRecords(final batch)
        RPC-->>Dispatch: ack / async completion
    end
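The accumulate-and-flush flow in the diagram above can be reduced to a minimal sketch. All types and the overhead constant here are illustrative stand-ins, not the client's real API; the actual BatchWriteRecords RPC is replaced by a dispatch counter so the control flow is visible:

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <string_view>
#include <vector>

// Hypothetical minimal stand-in for the real plan type.
struct RangeSliceBatchPlan
{
    std::vector<std::string> segment_keys;
    std::vector<std::string> segment_records;
};

// Accumulate segments across all plans and flush whenever the running
// byte count (payload plus a fixed per-segment overhead) reaches the cap.
size_t DispatchRangeSliceBatches(const std::vector<RangeSliceBatchPlan> &plans,
                                 size_t max_write_batch_size)
{
    constexpr size_t overhead_per_segment = 20;  // ts + ttl + op-type estimate
    std::vector<std::string_view> keys;
    std::vector<std::string_view> records;
    size_t write_batch_size = 0;
    size_t dispatched_batches = 0;

    auto flush = [&]()
    {
        if (!keys.empty())
        {
            // The real implementation calls BatchWriteRecords(...) here.
            ++dispatched_batches;
            keys.clear();
            records.clear();
            write_batch_size = 0;
        }
    };

    for (const auto &plan : plans)
    {
        for (size_t i = 0; i < plan.segment_keys.size(); ++i)
        {
            // string_views point into the plans' owned storage, so the
            // caller must keep plans alive until all writes complete.
            keys.emplace_back(plan.segment_keys[i]);
            records.emplace_back(plan.segment_records[i]);
            write_batch_size += plan.segment_keys[i].size() +
                                plan.segment_records[i].size() +
                                overhead_per_segment;
            if (write_batch_size >= max_write_batch_size)
            {
                flush();
            }
        }
    }
    flush();  // final partial batch spanning the tail of the last plans
    return dispatched_batches;
}
```

Because batches are cut purely on accumulated size, a single batch can span the boundary between two plans — which is exactly the cross-range merging this PR adds.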

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20–30 minutes

  • Inspect batching accumulation correctness (cross-plan merging), reserve/size calculations, and boundary conditions when batching across plan boundaries.
  • Verify concurrency handling around unfinished_request_cnt_ and that waits/retries don't deadlock.
  • Confirm updated call sites (UpdateRangeSlices, UpsertRanges) construct vectors correctly and preserve previous semantics.

Suggested reviewers

  • lzxddz

Poem

🐰 I hopped through plans, one, two, and three,
Merged all the slices to sail batch-free,
Fewer calls in a single sprint,
My paws stacked segments—what a hint!
🥕👏

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
  • Docstring Coverage — ⚠️ Warning: Docstring coverage is 75.00%, below the required threshold of 80.00%. Resolution: run @coderabbitai generate docstrings to improve docstring coverage.

✅ Passed checks (2 passed)
  • Description Check — ✅ Passed: Check skipped because CodeRabbit's high-level summary is enabled.
  • Title Check — ✅ Passed: The title accurately describes the main change: enabling cross-plan batch aggregation to combine slices from multiple ranges into a single batch write operation.


coderabbitai bot left a comment

Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
data_store_service_client.cpp (1)

1693-1707: Use-after-free risk: passing a temporary std::vector<RangeSliceBatchPlan> into DispatchRangeSliceBatches.

Here:

auto slice_plan =
    PrepareRangeSliceBatches(table_name, version, slices, partition_id);

SyncConcurrentRequest *slice_sync_concurrent =
    sync_concurrent_request_pool_.NextObject();
PoolableGuard slice_guard(slice_sync_concurrent);
slice_sync_concurrent->Reset();
DispatchRangeSliceBatches(kv_range_slices_table_name,
                          KvPartitionIdOf(table_name),
                          version,
                          std::vector<RangeSliceBatchPlan>{std::move(slice_plan)},
                          slice_sync_concurrent);

DispatchRangeSliceBatches builds std::vector<std::string_view> pointing into RangeSliceBatchPlan::segment_keys/segment_records. Because you construct a temporary std::vector<RangeSliceBatchPlan> and pass it by const reference, that temporary (and the RangeSliceBatchPlan it contains) is destroyed immediately after DispatchRangeSliceBatches returns, while the async BatchWriteRecords closures are still in flight. The string_views in those closures then dangle → undefined behavior / use-after-free.

UpsertRanges avoids this by keeping slice_plans alive until after waiting for unfinished_request_cnt_ == 0. UpdateRangeSlices needs the same pattern.

Suggested fix: materialize a local std::vector<RangeSliceBatchPlan> that outlives the async writes and pass it by reference, without moving it into a temporary:

@@ bool DataStoreServiceClient::UpdateRangeSlices(
-    // 1- Prepare slice batches
-    auto slice_plan =
-        PrepareRangeSliceBatches(table_name, version, slices, partition_id);
+    // 1- Prepare slice batches
+    auto slice_plan =
+        PrepareRangeSliceBatches(table_name, version, slices, partition_id);
+    // Wrap in a local vector so the plan's owned buffers stay alive
+    // until all async writes complete.
+    std::vector<RangeSliceBatchPlan> slice_plans;
+    slice_plans.emplace_back(std::move(slice_plan));
@@
-    slice_sync_concurrent->Reset();
-    DispatchRangeSliceBatches(kv_range_slices_table_name,
-                              KvPartitionIdOf(table_name),
-                              version,
-                              std::vector<RangeSliceBatchPlan>{std::move(slice_plan)},
-                              slice_sync_concurrent);
+    slice_sync_concurrent->Reset();
+    DispatchRangeSliceBatches(kv_range_slices_table_name,
+                              KvPartitionIdOf(table_name),
+                              version,
+                              slice_plans,
+                              slice_sync_concurrent);

This mirrors the safe lifetime pattern used in UpsertRanges and ensures all segment_keys / segment_records storage remains valid until after the wait loop completes.
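The safe lifetime pattern the review describes can be shown in a self-contained sketch: the owning container must outlive any asynchronous consumer of string_views into it, and the wait (here a future's get(), standing in for the unfinished_request_cnt_ loop) must happen before the owner goes out of scope. All names are illustrative, not the client's real API:

```cpp
#include <future>
#include <string>
#include <string_view>
#include <vector>

// Sketch: "plans" stands in for the local slice_plans vector; the async
// task stands in for an in-flight BatchWriteRecords closure capturing a
// string_view into the plans' owned storage.
std::string RunBatchedWrite()
{
    std::vector<std::string> plans;          // owner of the key bytes
    plans.emplace_back("segment-key-0");

    std::string_view view{plans.front()};    // non-owning view into plans

    std::future<std::string> inflight =
        std::async(std::launch::async,
                   [view]() { return std::string{view}; });

    // Wait for the async write BEFORE plans is destroyed — the analogue
    // of looping on unfinished_request_cnt_ under the condition variable.
    std::string result = inflight.get();
    return result;                           // plans destroyed only now
}
```

Passing a temporary vector instead would destroy the owner at the end of the full expression, leaving `view` dangling while the task still runs — the exact hazard flagged above.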

🧹 Nitpick comments (1)
data_store_service_client.cpp (1)

1374-1509: Cross-plan DispatchRangeSliceBatches batching and concurrency logic look sound.

The function correctly:

  • Accumulates total segment count to reserve vector capacity once.
  • Uses write_batch_size plus an overhead estimate to respect MAX_WRITE_BATCH_SIZE.
  • Applies the same SyncConcurrentRequest pattern used elsewhere: it blocks when unfinished_request_cnt_ reaches max_flying_write_count, increments the counter per BatchWriteRecords call, and leaves coordination to the shared callback + the caller’s all_request_started_ / wait loop.
  • Reuses the batch vectors after each dispatch via clear() + reserve(total_segments), which is cheap once capacity is established.

Given that it only captures std::string_views into the RangeSliceBatchPlan storage, this function assumes callers keep all plans[i].segment_keys/segment_records alive until all async writes complete; that contract is honored in UpsertRanges but currently violated in UpdateRangeSlices (see separate comment).

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a322a76 and 737969f.

📒 Files selected for processing (2)
  • data_store_service_client.cpp (4 hunks)
  • data_store_service_client.h (1 hunks)
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-09-17T11:08:35.322Z
Learnt from: liunyl
Repo: eloqdata/store_handler PR: 83
File: data_store_service_client_closure.cpp:408-448
Timestamp: 2025-09-17T11:08:35.322Z
Learning: In the DataStoreServiceClient batch write system, the global coordinator (SyncPutAllData) manages concurrency at the partition level, not at the individual batch level. The global unfinished_request_cnt_ is only decremented when an entire partition is fully completed or failed, not after each individual batch within a partition.

Applied to files:

  • data_store_service_client.cpp
🧬 Code graph analysis (1)
data_store_service_client.cpp (5)
eloq_data_store_service/data_store_service_config.cpp (2)
  • GetShardIdByPartitionId (816-821)
  • GetShardIdByPartitionId (816-817)
data_store_service_client_closure.cpp (2)
  • SyncConcurrentRequestCallback (484-491)
  • SyncConcurrentRequestCallback (484-487)
eloq_data_store_service/data_store_service.cpp (4)
  • BatchWriteRecords (900-948)
  • BatchWriteRecords (900-904)
  • BatchWriteRecords (1181-1246)
  • BatchWriteRecords (1181-1194)
eloq_data_store_service/eloq_store_data_store.cpp (2)
  • BatchWriteRecords (166-229)
  • BatchWriteRecords (166-166)
eloq_data_store_service/rocksdb_data_store_common.cpp (2)
  • BatchWriteRecords (499-661)
  • BatchWriteRecords (499-500)
🔇 Additional comments (2)
data_store_service_client.h (1)

590-600: Multi-plan DispatchRangeSliceBatches signature looks consistent.

The updated declaration to accept const std::vector<RangeSliceBatchPlan> &plans matches the implementation and enables the intended cross-plan batching. Just note that callers must ensure RangeSliceBatchPlan instances outlive the async writes (see cpp comments).

data_store_service_client.cpp (1)

1819-1832: UpsertRanges multi-plan dispatch wiring is correct and lifetime-safe.

slice_plans is built once, passed by const reference to DispatchRangeSliceBatches, and kept alive until after:

while (slice_sync_concurrent->unfinished_request_cnt_ != 0) {
    slice_sync_concurrent->cv_.wait(lk);
}

returns, so the string_views inside BatchWriteRecords closures always reference valid segment_keys / segment_records. Using a shared SyncConcurrentRequest for all ranges also correctly enforces the global max-flying-write cap across all slice batches.
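The wait pattern quoted above can be sketched in isolation. The struct below is a hypothetical stand-in for SyncConcurrentRequest (the real class presumably carries more state), with simulated writes decrementing the counter and notifying the waiter:

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Minimal stand-in: a counter of in-flight writes plus the cv used to
// block the caller until every write has completed.
struct SyncConcurrentRequest
{
    std::mutex mux_;
    std::condition_variable cv_;
    int unfinished_request_cnt_ = 0;
};

int RunWrites(int n)
{
    SyncConcurrentRequest sync;
    sync.unfinished_request_cnt_ = n;
    int completed = 0;

    std::vector<std::thread> workers;
    for (int i = 0; i < n; ++i)
    {
        workers.emplace_back(
            [&sync, &completed]()
            {
                std::lock_guard<std::mutex> lg(sync.mux_);
                ++completed;                    // one "write" finished
                --sync.unfinished_request_cnt_;
                sync.cv_.notify_one();
            });
    }

    // The caller's wait loop: only after this returns is it safe to
    // release the buffers the writes were reading from.
    {
        std::unique_lock<std::mutex> lk(sync.mux_);
        while (sync.unfinished_request_cnt_ != 0)
        {
            sync.cv_.wait(lk);
        }
    }
    for (auto &t : workers)
    {
        t.join();
    }
    return completed;
}
```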

coderabbitai bot left a comment

Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
data_store_service_client.cpp (1)

1693-1707: Avoid reading from moved‑from slice_plan in UpdateRangeSlices

Here:

auto slice_plan =
    PrepareRangeSliceBatches(table_name, version, slices, partition_id);
// ...
std::vector<RangeSliceBatchPlan> slice_plans;
slice_plans.emplace_back(std::move(slice_plan));
// ...
EnqueueRangeMetadataRecord(...,
                           slice_plan.segment_cnt,
                           meta_acc);

slice_plan has been moved into slice_plans, so relying on slice_plan.segment_cnt afterward is confusing and (in strict C++ terms) relies on unspecified moved-from state, even though current implementations will effectively copy the uint32_t.

Use the moved‑to object (or cache the count before moving) instead. For example:

-    std::vector<RangeSliceBatchPlan> slice_plans;
-    slice_plans.emplace_back(std::move(slice_plan));
+    std::vector<RangeSliceBatchPlan> slice_plans;
+    slice_plans.emplace_back(std::move(slice_plan));
+    const uint32_t segment_cnt = slice_plans.back().segment_cnt;

    DispatchRangeSliceBatches(kv_range_slices_table_name,
                              KvPartitionIdOf(table_name),
                              version,
                              slice_plans,
                              slice_sync_concurrent);
    // ...
    EnqueueRangeMetadataRecord(catalog_factory,
                               table_name,
                               range_start_key,
                               partition_id,
                               range_version,
                               version,
-                              slice_plan.segment_cnt,
+                              segment_cnt,
                               meta_acc);

This keeps the intent clear and avoids depending on moved‑from object state.

Also applies to: 1728-1737
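The cache-from-the-moved-to-object discipline suggested above can be shown in a self-contained sketch (the struct is a hypothetical stand-in for the real RangeSliceBatchPlan):

```cpp
#include <cstdint>
#include <string>
#include <utility>
#include <vector>

// Hypothetical minimal stand-in for the real plan type.
struct RangeSliceBatchPlan
{
    uint32_t segment_cnt = 0;
    std::vector<std::string> segment_keys;
};

uint32_t SafeSegmentCount()
{
    RangeSliceBatchPlan slice_plan;
    slice_plan.segment_cnt = 2;
    slice_plan.segment_keys = {"k0", "k1"};

    std::vector<RangeSliceBatchPlan> slice_plans;
    slice_plans.emplace_back(std::move(slice_plan));
    // slice_plan is now moved-from; read the count from the moved-to
    // object instead of relying on moved-from state.
    const uint32_t segment_cnt = slice_plans.back().segment_cnt;
    return segment_cnt;
}
```

Caching the count into a named local before the move (as in the diff above) works equally well; either way, nothing reads `slice_plan` after `std::move`.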

🧹 Nitpick comments (2)
data_store_service_client.h (1)

596-600: Private API change to accept multiple RangeSliceBatchPlan objects looks consistent

The switch to const std::vector<RangeSliceBatchPlan> &plans matches the new multi-plan batching in UpdateRangeSlices and UpsertRanges, and since this is a private helper there’s no external ABI risk. You might optionally add a brief note in the comment (in the .cpp) that callers must ensure all plans target the same KV table/partition/version, as currently enforced only by convention, not assertions.

data_store_service_client.cpp (1)

1374-1510: DispatchRangeSliceBatches batching and concurrency look correct; consider a couple of small cleanups

The new implementation correctly:

  • Aggregates segments from all plans into batches bounded by MAX_WRITE_BATCH_SIZE (based on key+record size plus per-record overhead).
  • Uses SyncConcurrentRequest’s unfinished_request_cnt_ and condition variable in the same pattern as PutArchivesAll/DispatchRangeMetadataBatches, so global in‑flight write limits remain honored.
  • Holds plans (and their backing std::strings) alive in the caller until all RPCs complete, so the std::string_view keys/records passed to BatchWriteRecords remain valid.

Two minor polish points you might consider:

  • Instead of the magic constexpr size_t overhead_per_segment = 20;, compute it from the actual field sizes for clarity and future safety, e.g.:
constexpr size_t overhead_per_segment =
    sizeof(uint64_t)   // records_ts
  + sizeof(uint64_t)   // records_ttl
  + sizeof(WriteOpType);
  • After each dispatch you clear() and then reserve(total_segments) again. Since capacity is already at least total_segments after the first reserve, the subsequent reserve calls are effectively no‑ops; you could drop them for readability, or (if you ever want tighter bounds) reserve based on an estimated max segments per batch instead. This is cosmetic, not correctness-related.

Overall, the logic and concurrency behavior look solid.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 737969f and 79c1b96.

📒 Files selected for processing (2)
  • data_store_service_client.cpp (4 hunks)
  • data_store_service_client.h (1 hunks)
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-09-17T11:08:35.322Z
Learnt from: liunyl
Repo: eloqdata/store_handler PR: 83
File: data_store_service_client_closure.cpp:408-448
Timestamp: 2025-09-17T11:08:35.322Z
Learning: In the DataStoreServiceClient batch write system, the global coordinator (SyncPutAllData) manages concurrency at the partition level, not at the individual batch level. The global unfinished_request_cnt_ is only decremented when an entire partition is fully completed or failed, not after each individual batch within a partition.

Applied to files:

  • data_store_service_client.cpp
🧬 Code graph analysis (1)
data_store_service_client.cpp (4)
data_store_service_client_closure.cpp (2)
  • SyncConcurrentRequestCallback (484-491)
  • SyncConcurrentRequestCallback (484-487)
eloq_data_store_service/data_store_service.cpp (4)
  • BatchWriteRecords (900-948)
  • BatchWriteRecords (900-904)
  • BatchWriteRecords (1181-1246)
  • BatchWriteRecords (1181-1194)
eloq_data_store_service/eloq_store_data_store.cpp (2)
  • BatchWriteRecords (166-229)
  • BatchWriteRecords (166-166)
data_store_service_client.h (1)
  • KvPartitionIdOf (700-705)
🔇 Additional comments (1)
data_store_service_client.cpp (1)

1828-1833: Multi-plan DispatchRangeSliceBatches call in UpsertRanges is consistent and improves batching

The new single call:

DispatchRangeSliceBatches(kv_range_slices_table_name,
                          kv_partition_id,
                          version,
                          slice_plans,
                          slice_sync_concurrent);

correctly reuses the shared slice_sync_concurrent, respects the table-wide kv_partition_id, and allows segments from different ranges to be merged into fewer BatchWriteRecords RPCs without changing the existing wait/finish pattern below. This matches the intended PR goal of cross-range batching.

@liunyl liunyl merged commit 50a1320 into main Nov 24, 2025
1 check passed
@liunyl liunyl deleted the upsert_ranges branch November 24, 2025 08:52