
parallelize write requests in upsert ranges #150

Merged
liunyl merged 2 commits into main from upsert_ranges
Nov 21, 2025

Conversation

@liunyl
Contributor

@liunyl liunyl commented Nov 21, 2025

Summary by CodeRabbit

  • Refactor
    • Reworked range update pipeline into a planning + batched-dispatch flow with coordinated metadata batching to improve throughput and concurrency control.
    • Updated upsert flow to prepare, batch and concurrently flush range data and metadata, reducing per-item overhead and improving reliability and end-to-end synchronization of range writes.


@coderabbitai
Contributor

coderabbitai bot commented Nov 21, 2025

Walkthrough

Plans and batches range-slice writes and range metadata separately. Adds RangeSliceBatchPlan, RangeMetadataAccumulator, Prepare/Dispatch helpers, and refactors UpdateRangeSlices/UpsertRanges to prepare plans, enqueue metadata, dispatch slice and metadata batches concurrently, and synchronize completion with per-path error propagation.

Changes

Cohort / File(s) Summary
Range batching scaffolding (API/Types)
data_store_service_client.h
Adds RangeSliceBatchPlan, RangeMetadataRecord, RangeMetadataAccumulator, forward-decl SyncConcurrentRequest, and declarations for PrepareRangeSliceBatches, DispatchRangeSliceBatches, EnqueueRangeMetadataRecord, DispatchRangeMetadataBatches (with configurable max batch size).
Range batching implementation (client)
data_store_service_client.cpp
Implements plan construction for range slices, accumulates range metadata into per-table/partition accumulator, dispatches metadata and slice batches with per-batch concurrency control, tightens synchronization, and refactors UpdateRangeSlices/UpsertRanges to use the new planning + batched-dispatch pipeline.

Sequence Diagram(s)

sequenceDiagram
  autonumber
  actor Caller
  participant DSC as DataStoreServiceClient
  participant Plan as RangeSliceBatchPlan
  participant MetaAcc as RangeMetadataAccumulator
  participant SCR as SyncConcurrentRequest
  participant KV as KV Backend

  Caller->>DSC: UpsertRanges(ranges, version...)
  rect rgba(230,245,255,0.6)
    Note over DSC: Planning & accumulation
    DSC->>DSC: PrepareRangeSliceBatches(...) → Plan
    DSC->>DSC: EnqueueRangeMetadataRecord(...) → MetaAcc
  end

  par Dispatch in parallel
    rect rgba(245,255,230,0.6)
      Note over DSC,SCR: Dispatch metadata batches
      DSC->>SCR: DispatchRangeMetadataBatches(kv_table, MetaAcc, SCR)
      SCR->>KV: write metadata batches
      KV-->>SCR: batch results
    end
  and
    rect rgba(245,255,230,0.6)
      Note over DSC,SCR: Dispatch slice batches
      DSC->>SCR: DispatchRangeSliceBatches(kv_table, partition, version, Plan, SCR)
      SCR->>KV: write slice batches
      KV-->>SCR: batch results
    end
  end

  rect rgba(240,240,255,0.6)
    Note over DSC,SCR: Synchronize & validate
    DSC-->>DSC: Wait for both SCR results
    alt any error
      DSC-->>Caller: propagate error / abort
    else success
      DSC-->>Caller: flush range data / return success
    end
  end

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

  • Inspect new public types and header-level declarations for ABI and usage.
  • Validate concurrency model and error propagation in DispatchRangeSliceBatches and DispatchRangeMetadataBatches.
  • Review refactor sites (UpdateRangeSlices, UpsertRanges) for correct plan assembly, accumulator usage, and final flush semantics.

Suggested reviewers

  • lzxddz

Poem

I stack my carrots into neat little bands,
Batching slices with careful hands.
Metadata hops into its row,
Concurrently we flourish, steady we grow.
Hooray — the meadow writes and stands. 🥕✨

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
  • Docstring Coverage: ⚠️ Warning. Docstring coverage is 37.50%, below the required threshold of 80.00%. Run @coderabbitai generate docstrings to improve coverage.
✅ Passed checks (2 passed)
  • Description Check: ✅ Passed. Check skipped because CodeRabbit's high-level summary is enabled.
  • Title Check: ✅ Passed. The title 'parallelize write requests in upsert ranges' directly reflects the main change: batching and parallel dispatch of range slice and metadata writes in the UpsertRanges flow.


Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
data_store_service_client.cpp (1)

1552-1635: Wait for slice writes before publishing range metadata

DispatchRangeMetadataBatches is triggered while slice batches are still outstanding. Because BatchWriteRecords completes asynchronously, the metadata PUT into table_ranges can commit before the corresponding slice records land in table_range_slices. Readers like FetchRangeSlices will observe the new segment_cnt and range_version immediately and begin fetching segments that do not exist yet, breaking range reconstruction. Please defer the metadata dispatch (or at least the wait on its SyncConcurrentRequest) until after slice_sync_concurrent has drained so metadata only becomes visible once all slice data is durable.
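The ordering this comment asks for can be reduced to a small self-contained sketch. The names here (WriteSliceBatch, WriteMetadata, UpsertWithOrdering) are illustrative placeholders, not the client's actual API; a real fix would wait on the slice path's SyncConcurrentRequest rather than futures, but the constraint is the same: drain every slice write before issuing the metadata write.

```cpp
#include <atomic>
#include <cassert>
#include <future>
#include <string>
#include <vector>

// Illustrative sketch: metadata becomes visible only after every slice
// batch is durable. All names are hypothetical stand-ins.
std::atomic<int> slices_written{0};
bool metadata_saw_all_slices = false;

void WriteSliceBatch(const std::string & /*batch*/)
{
    slices_written.fetch_add(1);  // stand-in for an async KV write completing
}

void WriteMetadata(int expected_slice_batches)
{
    // Record whether all slice batches had landed when metadata was written.
    metadata_saw_all_slices =
        (slices_written.load() == expected_slice_batches);
}

void UpsertWithOrdering(const std::vector<std::string> &slice_batches)
{
    std::vector<std::future<void>> inflight;
    for (const auto &b : slice_batches)
    {
        inflight.emplace_back(
            std::async(std::launch::async, WriteSliceBatch, b));
    }
    for (auto &f : inflight)
    {
        f.get();  // drain all slice writes first (the requested fix)
    }
    WriteMetadata(static_cast<int>(slice_batches.size()));
}
```

With this ordering a reader that observes the new segment_cnt can never race ahead of the slice rows it references.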

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 9e6949e and 3c5d93b.

📒 Files selected for processing (2)
  • data_store_service_client.cpp (5 hunks)
  • data_store_service_client.h (3 hunks)
🧰 Additional context used
🧠 Learnings (2)
📓 Common learnings
Learnt from: liunyl
Repo: eloqdata/store_handler PR: 83
File: data_store_service_client_closure.cpp:408-448
Timestamp: 2025-09-17T11:08:35.322Z
Learning: In the DataStoreServiceClient batch write system, the global coordinator (SyncPutAllData) manages concurrency at the partition level, not at the individual batch level. The global unfinished_request_cnt_ is only decremented when an entire partition is fully completed or failed, not after each individual batch within a partition.
📚 Learning: 2025-09-17T11:08:35.322Z
Learnt from: liunyl
Repo: eloqdata/store_handler PR: 83
File: data_store_service_client_closure.cpp:408-448
Timestamp: 2025-09-17T11:08:35.322Z
Learning: In the DataStoreServiceClient batch write system, the global coordinator (SyncPutAllData) manages concurrency at the partition level, not at the individual batch level. The global unfinished_request_cnt_ is only decremented when an entire partition is fully completed or failed, not after each individual batch within a partition.

Applied to files:

  • data_store_service_client.h
  • data_store_service_client.cpp
🧬 Code graph analysis (2)
data_store_service_client.h (2)
data_store_service_client_closure.h (15)
  • string (810-815)
  • string (2153-2158)
  • vector (773-776)
  • vector (1083-1086)
  • vector (1126-1133)
  • vector (2233-2236)
  • string_view (311-368)
  • string_view (763-766)
  • string_view (798-803)
  • string_view (1296-1299)
  • string_view (1532-1535)
  • string_view (2113-2116)
  • string_view (2118-2121)
  • string_view (2221-2226)
  • string_view (2432-2435)
data_store_service_client.cpp (8)
  • PrepareRangeSliceBatches (1262-1334)
  • PrepareRangeSliceBatches (1262-1266)
  • DispatchRangeSliceBatches (1336-1383)
  • DispatchRangeSliceBatches (1336-1341)
  • EnqueueRangeMetadataRecord (1385-1413)
  • EnqueueRangeMetadataRecord (1385-1393)
  • DispatchRangeMetadataBatches (1415-1532)
  • DispatchRangeMetadataBatches (1415-1419)
data_store_service_client.cpp (3)
eloq_data_store_service/data_store_service.cpp (4)
  • BatchWriteRecords (881-929)
  • BatchWriteRecords (881-885)
  • BatchWriteRecords (1144-1208)
  • BatchWriteRecords (1144-1156)
data_store_service_client_closure.cpp (2)
  • SyncConcurrentRequestCallback (487-494)
  • SyncConcurrentRequestCallback (487-490)
data_store_service_client.h (1)
  • KvPartitionIdOf (697-705)

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

🧹 Nitpick comments (3)
data_store_service_client.cpp (3)

1282-1283: Use a named constant for segment size limit.

The hardcoded 1024 * 1024 (1 MB) should be a named constant like MAX_RANGE_SLICE_SEGMENT_SIZE for maintainability. Additionally, the capacity estimation at lines 1275-1276 (slices.size() / 10 + 1) is a rough heuristic that could lead to vector reallocations if many slices result in more segments. Consider basing the estimate on total slice data size if available.

+    static constexpr size_t MAX_RANGE_SLICE_SEGMENT_SIZE = 1024 * 1024;  // 1 MB
+
     std::string segment_key =
         EncodeRangeSliceKey(table_name, partition_id, plan.segment_cnt);
     std::string segment_record;
     size_t batch_size = segment_key.size() + sizeof(uint64_t);
-    size_t max_segment_size = 1024 * 1024;  // 1 MB
+    size_t max_segment_size = MAX_RANGE_SLICE_SEGMENT_SIZE;

1415-1429: Clarify the role of the kv_table_name parameter.

The logic at lines 1426-1428 suggests kv_table_name is an optional override, but the usage pattern is unclear:

std::string_view target_table_name = kv_table_name.empty() ? kv_table_name_str : kv_table_name;

Since the accumulator already groups records by (table_name, partition_id), this parameter appears redundant. Consider either removing it or documenting when and why it should override the accumulator's table name.


1449-1455: Batch size limit can be exceeded for oversized records.

The overhead estimate (20 bytes) is approximate, and the batching logic allows a single record to form its own batch even if it exceeds max_batch_size. While this prevents records from being dropped, it could result in batches larger than 64 MB being sent. If the downstream BatchWriteRecords or storage layer enforces strict size limits, add validation or consider splitting oversized records.
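The batching rule described here (a record joins the current batch unless it would exceed the cap; an oversized record still forms a batch by itself, so nothing is dropped but a batch may exceed the limit) can be sketched as follows. BatchRecords and the 20-byte overhead constant mirror the review's description and are not the actual client code.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Approximate per-record overhead, as described in the review comment.
constexpr std::size_t kPerRecordOverhead = 20;

// Group records into batches of at most max_batch_size bytes. A record
// larger than the cap is emitted as a singleton batch rather than dropped,
// which is exactly the "limit can be exceeded" behavior flagged above.
std::vector<std::vector<std::string>> BatchRecords(
    std::vector<std::string> records, std::size_t max_batch_size)
{
    std::vector<std::vector<std::string>> batches;
    std::vector<std::string> current;
    std::size_t current_size = 0;
    for (auto &rec : records)
    {
        std::size_t rec_size = rec.size() + kPerRecordOverhead;
        if (!current.empty() && current_size + rec_size > max_batch_size)
        {
            batches.push_back(std::move(current));
            current.clear();
            current_size = 0;
        }
        current_size += rec_size;
        current.push_back(std::move(rec));
    }
    if (!current.empty())
    {
        batches.push_back(std::move(current));
    }
    return batches;
}
```

If the storage layer enforces a hard cap, the singleton-batch branch is where a validation error or record split would have to go.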

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 3c5d93b and 7dfd6c3.

📒 Files selected for processing (1)
  • data_store_service_client.cpp (5 hunks)
🧰 Additional context used
🧠 Learnings (2)
📓 Common learnings
Learnt from: liunyl
Repo: eloqdata/store_handler PR: 83
File: data_store_service_client_closure.cpp:408-448
Timestamp: 2025-09-17T11:08:35.322Z
Learning: In the DataStoreServiceClient batch write system, the global coordinator (SyncPutAllData) manages concurrency at the partition level, not at the individual batch level. The global unfinished_request_cnt_ is only decremented when an entire partition is fully completed or failed, not after each individual batch within a partition.
📚 Learning: 2025-09-17T11:08:35.322Z
Learnt from: liunyl
Repo: eloqdata/store_handler PR: 83
File: data_store_service_client_closure.cpp:408-448
Timestamp: 2025-09-17T11:08:35.322Z
Learning: In the DataStoreServiceClient batch write system, the global coordinator (SyncPutAllData) manages concurrency at the partition level, not at the individual batch level. The global unfinished_request_cnt_ is only decremented when an entire partition is fully completed or failed, not after each individual batch within a partition.

Applied to files:

  • data_store_service_client.cpp
🧬 Code graph analysis (1)
data_store_service_client.cpp (6)
dynamo_handler.cpp (4)
  • table_name (2824-2824)
  • table_name (4408-4409)
  • UpdateRangeSlices (3285-3296)
  • UpdateRangeSlices (3285-3291)
eloq_data_store_service/data_store_service.cpp (4)
  • BatchWriteRecords (881-929)
  • BatchWriteRecords (881-885)
  • BatchWriteRecords (1144-1208)
  • BatchWriteRecords (1144-1156)
data_store_service_client_closure.cpp (2)
  • SyncConcurrentRequestCallback (487-494)
  • SyncConcurrentRequestCallback (487-490)
data_store_service_client.h (1)
  • KvPartitionIdOf (697-705)
rocksdb_handler.cpp (2)
  • UpdateRangeSlices (1443-1455)
  • UpdateRangeSlices (1443-1449)
bigtable_handler.cpp (2)
  • UpdateRangeSlices (1409-1469)
  • UpdateRangeSlices (1409-1415)
🔇 Additional comments (3)
data_store_service_client.cpp (3)

1563-1637: LGTM: Ordering issue from previous review is resolved.

The refactored UpdateRangeSlices now correctly dispatches and waits for all slice batches to complete (lines 1572-1594) before enqueuing and dispatching metadata batches (lines 1598-1634). This ensures range metadata in table_ranges is only visible after the associated slice rows in table_range_slices have been successfully written, preventing consumers from failing to load referenced segments.

Based on past review comments, which flagged metadata-before-slices ordering as a critical issue.


1681-1689: Verify the range_version parameter usage.

Line 1685 passes version as the range_version parameter with the comment "using version for now." If range_version should semantically differ from the general version (e.g., to track per-range schema changes independently), this may require a follow-up. Confirm whether using the same value for both is correct or if separate versioning is needed.


1668-1771: LGTM: Parallelization correctly sequences slice and metadata writes.

UpsertRanges now batches slice preparation and metadata accumulation across all ranges (lines 1668-1689), dispatches slice batches concurrently (lines 1691-1705), waits for slice completion (lines 1707-1722), and only then dispatches metadata batches (lines 1724-1752). The flush at lines 1754-1768 ensures persistence. This ordering is correct and aligns with the fix for the previous ordering issue.

Based on past review comments and learnings about partition-level concurrency management.

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
data_store_service_client.cpp (1)

1292-1363: Fix segment size accounting in PrepareRangeSliceBatches

The batch_size accounting is incorrect and allows segments to exceed max_segment_size:

  1. Missing slice accounting: The code appends sizeof(uint32_t) (slice_size field) + slice_size bytes of data, but batch_size only tracks sizeof(uint32_t) (key_size field) + key_size bytes. The slice bytes are completely unaccounted for.

  2. Post-rollover drift: After flushing a segment at line 1330, batch_size is reset but the loop immediately continues, appending the current slice without re-applying its full size contribution. This compounds over multiple slices.

  3. Empty ranges edge case: When slices.empty(), the code still emits one segment containing only the version header (8 bytes), setting segment_cnt = 1. If the format previously signaled empty ranges with segment_cnt == 0 and no rows in table_range_slices, this subtly changes encoding semantics.

The suggested fix correctly computes record_total_size (all bytes per slice) once and applies it consistently:

@@
-    for (size_t i = 0; i < slices.size(); ++i)
+    for (size_t i = 0; i < slices.size(); ++i)
     {
         txservice::TxKey slice_start_key = slices[i]->StartTxKey();
@@
-        uint32_t key_size = static_cast<uint32_t>(slice_start_key.Size());
-        batch_size += sizeof(uint32_t);
-        batch_size += key_size;
-
-        if (batch_size >= max_segment_size)
+        uint32_t key_size = static_cast<uint32_t>(slice_start_key.Size());
+        uint32_t slice_size = static_cast<uint32_t>(slices[i]->Size());
+
+        // key_size field + key bytes + slice_size field + slice bytes
+        size_t record_total_size =
+            sizeof(uint32_t) + key_size + sizeof(uint32_t) + slice_size;
+
+        // If adding this record would overflow the segment and we already
+        // have at least one record in it, flush current segment first.
+        if (batch_size + record_total_size >= max_segment_size &&
+            !segment_record.empty())
         {
             plan.segment_keys.emplace_back(std::move(segment_key));
             plan.segment_records.emplace_back(std::move(segment_record));
@@
-            segment_key =
-                EncodeRangeSliceKey(table_name, partition_id, plan.segment_cnt);
-            batch_size = segment_key.size();
-
-            segment_record.clear();
-            segment_record.reserve(max_segment_size - segment_key.size());
-            segment_record.append(reinterpret_cast<const char *>(&version),
-                                  sizeof(uint64_t));
-            batch_size += sizeof(uint64_t);
+            segment_key = EncodeRangeSliceKey(
+                table_name, partition_id, plan.segment_cnt);
+            segment_record.clear();
+            segment_record.reserve(max_segment_size - segment_key.size());
+            batch_size = segment_key.size();
+            segment_record.append(reinterpret_cast<const char *>(&version),
+                                  sizeof(uint64_t));
+            batch_size += sizeof(uint64_t);
         }
 
         segment_record.append(reinterpret_cast<const char *>(&key_size),
                               sizeof(uint32_t));
         segment_record.append(slice_start_key.Data(), key_size);
-        uint32_t slice_size = static_cast<uint32_t>(slices[i]->Size());
         segment_record.append(reinterpret_cast<const char *>(&slice_size),
                               sizeof(uint32_t));
+        batch_size += record_total_size;
     }

Also confirm that readers tolerate empty ranges encoded as a single version-only segment (if behavior changed from previously skipping empty ranges entirely).

♻️ Duplicate comments (1)
data_store_service_client.cpp (1)

1569-1669: UpdateRangeSlices now enforces slices-before-metadata ordering

The refactored UpdateRangeSlices:

  • builds a RangeSliceBatchPlan,
  • dispatches all slice batches to kv_range_slices_table_name under a SyncConcurrentRequest,
  • waits for all slice writes to complete and checks their result, then
  • enqueues and dispatches the single range-metadata record and waits for it to finish.

This guarantees that range metadata in table_ranges is only written after all referenced slice segments are durably recorded, addressing the earlier “metadata visible before slices” issue from the previous review. The separation of slice_sync_concurrent and meta_sync_concurrent also keeps error reporting clear per path.

🧹 Nitpick comments (4)
data_store_service_client.cpp (2)

1417-1445: Metadata accumulator key naming is slightly misleading

EnqueueRangeMetadataRecord stores entries into RangeMetadataAccumulator::records_by_table_partition keyed by (kv_table_name, kv_partition_id), but here kv_table_name is actually the base table name (used only to derive the partition id), while the actual KV table written later is kv_range_table_name passed to DispatchRangeMetadataBatches.

This is fine functionally—the partition id is what matters—but the name kv_table_name on this path is a bit confusing. If you touch this again, consider a more explicit name like source_table_name or at least updating the comment on records_by_table_partition to reflect that the first component isn’t necessarily the real KV table name written.


1447-1567: DispatchRangeMetadataBatches batching strategy is correct; small optional tweaks

The batching logic correctly:

  • groups by (kv_table_name, kv_partition_id) so each batch is single‑partition,
  • caps batch size using an approximate per-record overhead and max_batch_size, and
  • uses SyncConcurrentRequest with the same flow-control pattern as slices.

Two minor, deferrable refinements you might consider:

  1. Avoid redundant reserve after std::move
    After moving the vectors into BatchWriteRecords, clear() followed by reserve(records_vec.size()) on each rollover is usually unnecessary; the moved-from vector typically retains its capacity. You can just clear() and rely on the existing capacity for the next batch to reduce allocator churn.

  2. Allow caller‑specific table names more explicitly
    Currently kv_table_name parameter overrides kv_table_name_str when non-empty. If you anticipate reusing RangeMetadataAccumulator in contexts where the map’s first key really does match the destination KV table, it may be clearer to either:

    • drop the kv_table_name parameter and always trust the map key, or
    • assert that kv_table_name.empty() || kv_table_name == kv_table_name_str to guard against accidental mismatches.

Neither affects correctness today, so this can stay as-is if you prefer.

data_store_service_client.h (2)

62-76: RangeSliceBatchPlan helper is reasonable; Clear enables reuse

The RangeSliceBatchPlan struct cleanly encapsulates per-plan state (segment_cnt plus owned key/record buffers). The Clear() method resets all members and allows potential reuse from callers if you ever choose to store these in a pool or a vector. Current .cpp usage is consistent with this layout.


78-96: RangeMetadataRecord/RangeMetadataAccumulator align with dispatch implementation

The accumulator design (map from (string,int32_t) to vector<RangeMetadataRecord>) matches how DispatchRangeMetadataBatches groups and dispatches writes. Clear() is simple and adequate.

One minor nit: given the actual usage in .cpp, the comment

 // Key: (kv_table_name, kv_partition_id) as string pair

can be slightly misleading because the first component is the logical table name used to derive kv_partition_id, not always the concrete destination KV table name; but this is only documentation and doesn’t affect behavior.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 7dfd6c3 and 8cdbbaf.

📒 Files selected for processing (2)
  • data_store_service_client.cpp (6 hunks)
  • data_store_service_client.h (3 hunks)
🧰 Additional context used
🧠 Learnings (2)
📓 Common learnings
Learnt from: liunyl
Repo: eloqdata/store_handler PR: 83
File: data_store_service_client_closure.cpp:408-448
Timestamp: 2025-09-17T11:08:35.322Z
Learning: In the DataStoreServiceClient batch write system, the global coordinator (SyncPutAllData) manages concurrency at the partition level, not at the individual batch level. The global unfinished_request_cnt_ is only decremented when an entire partition is fully completed or failed, not after each individual batch within a partition.
📚 Learning: 2025-09-17T11:08:35.322Z
Learnt from: liunyl
Repo: eloqdata/store_handler PR: 83
File: data_store_service_client_closure.cpp:408-448
Timestamp: 2025-09-17T11:08:35.322Z
Learning: In the DataStoreServiceClient batch write system, the global coordinator (SyncPutAllData) manages concurrency at the partition level, not at the individual batch level. The global unfinished_request_cnt_ is only decremented when an entire partition is fully completed or failed, not after each individual batch within a partition.

Applied to files:

  • data_store_service_client.cpp
  • data_store_service_client.h
🧬 Code graph analysis (2)
data_store_service_client.cpp (5)
eloq_data_store_service/data_store_service_config.cpp (2)
  • GetShardIdByPartitionId (816-821)
  • GetShardIdByPartitionId (816-817)
data_store_service_client_closure.cpp (3)
  • lk (1556-1556)
  • SyncConcurrentRequestCallback (484-491)
  • SyncConcurrentRequestCallback (484-487)
eloq_data_store_service/data_store_service.cpp (19)
  • lk (91-91)
  • lk (105-105)
  • lk (116-116)
  • lk (134-134)
  • lk (162-162)
  • lk (168-168)
  • lk (174-174)
  • lk (186-186)
  • lk (193-193)
  • lk (204-204)
  • lk (248-248)
  • lk (1670-1670)
  • lk (1705-1705)
  • lk (2224-2224)
  • lk (2284-2284)
  • BatchWriteRecords (900-948)
  • BatchWriteRecords (900-904)
  • BatchWriteRecords (1181-1246)
  • BatchWriteRecords (1181-1194)
eloq_data_store_service/rocksdb_data_store_common.cpp (3)
  • BatchWriteRecords (499-661)
  • BatchWriteRecords (499-500)
  • key (852-852)
data_store_service_client.h (1)
  • KvPartitionIdOf (700-705)
data_store_service_client.h (1)
data_store_service_client.cpp (8)
  • PrepareRangeSliceBatches (1292-1364)
  • PrepareRangeSliceBatches (1292-1296)
  • DispatchRangeSliceBatches (1366-1415)
  • DispatchRangeSliceBatches (1366-1371)
  • EnqueueRangeMetadataRecord (1417-1445)
  • EnqueueRangeMetadataRecord (1417-1425)
  • DispatchRangeMetadataBatches (1447-1567)
  • DispatchRangeMetadataBatches (1447-1451)
🔇 Additional comments (4)
data_store_service_client.cpp (2)

1366-1415: concurrency and lifetime in DispatchRangeSliceBatches look sound

The pattern of incrementing unfinished_request_cnt_ under the mutex before each BatchWriteRecords and then waiting on SyncConcurrentRequest in the caller until it drops to zero ensures a bounded number of in‑flight slice batches and avoids premature scope exit while plan.segment_* storage is still referenced via std::string_view. This is consistent with existing SyncConcurrentRequest usage such as PutArchivesAll and keeps slice writes partition‑local with global concurrency control.


1686-1785: multi-range UpsertRanges orchestration looks correct; double-check range_version semantics

The new UpsertRanges flow:

  1. Validates table_name and returns early on empty range_info.
  2. First pass:
    • builds a RangeSliceBatchPlan per range, and
    • enqueues corresponding metadata records into a shared RangeMetadataAccumulator.
  3. Uses a single SyncConcurrentRequest to dispatch all slice batches for all ranges to kv_range_slices_table_name, waits for completion, and aborts on any slice error.
  4. Uses another SyncConcurrentRequest to dispatch all accumulated metadata batches to kv_range_table_name, waits for completion, and aborts on error.
  5. Flushes kv_range_table_name via FlushData only after both paths succeed.

This preserves the invariant that table_ranges metadata is published only after all slice rows for all affected ranges are durably written, while still parallelizing slice writes across ranges. Lifetime of the slice_plans and meta_acc buffers is also safe because you wait on both SyncConcurrentRequests before leaving the function.

One thing worth double-checking: in the per-range enqueue, you pass

EnqueueRangeMetadataRecord(
    catalog_factory,
    table_name,
    range.start_key_,
    range.partition_id_,
    version,  // range_version (using version for now)
    version,
    slice_plans.back().segment_cnt,
    meta_acc);

If SplitRangeInfo conceptually carries a distinct range_version separate from the global version argument, you might want to thread that through and pass it instead of reusing version twice; otherwise the on-disk metadata will not distinguish between range-version bumps and global version bumps. If no such separate range-version exists in your model, this is fine as-is.

data_store_service_client.h (2)

50-51: forward declaration of SyncConcurrentRequest is appropriate

Adding struct SyncConcurrentRequest; here cleanly breaks the dependency on the closure header while still allowing the new helpers to accept a pointer. Signatures in the .cpp match this declaration.


587-620: new private helpers are well-scoped and match definitions

The declarations for:

  • PrepareRangeSliceBatches(...)
  • DispatchRangeSliceBatches(...)
  • EnqueueRangeMetadataRecord(...)
  • DispatchRangeMetadataBatches(...)

match the cpp definitions and keep the range batching concerns encapsulated inside DataStoreServiceClient. Defaulting max_batch_size to 64 * 1024 * 1024 in the header keeps the behavior discoverable at the call site.

No interface issues spotted here.

@liunyl liunyl self-assigned this Nov 21, 2025
@liunyl liunyl merged commit c70ebbf into main Nov 21, 2025
1 check passed
@liunyl liunyl deleted the upsert_ranges branch November 21, 2025 10:11