
Send out batchwrite req concurrenctly and wait all at once #83

Merged
liunyl merged 9 commits into main from batch_write
Sep 18, 2025
Conversation

@liunyl
Contributor

@liunyl liunyl commented Sep 17, 2025

Summary by CodeRabbit

  • New Features

    • Per-table, per-partition batched writes with bounded concurrent in-flight gating and continuation callbacks.
    • Archive support: key/value encoding, fetch/read/upsert, base-to-archive copy, and snapshot-for-backup flows.
    • Expanded async APIs for cluster/table/database discovery, statistics, range management, and backup snapshot creation (new public snapshot API).
  • Improvements

    • Connection retry, leader/locality awareness, threading/pooling scaffolding, and coordinated per-partition error handling.
    • Default store file amplify factor reduced (4 → 2) for storage tuning.

@coderabbitai
Contributor

coderabbitai bot commented Sep 17, 2025

Walkthrough

Adds per-partition concurrency primitives and coordinated batched write paths (PutAll/PutArchivesAll), archive key/value encoding/decoding and MVCC flows, extensive async closures/callbacks for reads/writes/backups, SetupConfig/Connect/init helpers, catalog/table/database/statistics APIs, snapshot-for-backup orchestration, and lifecycle/locality helpers.

Changes

Cohort / File(s) Summary of changes
Client core & lifecycle
data_store_service_client.cpp
Adds SetupConfig, Connect (with retry), ScheduleTimerTasks (placeholder), InitPreBuiltTables, lifecycle hooks (OnLeaderStart, OnStartFollowing, OnShutdown), locality helpers (IsLocalShard/IsLocalPartition), channel accessors and many public helper APIs.
Partitioned batching & sync primitives
data_store_service_client_closure.h, data_store_service_client.h, data_store_service_client.cpp
Introduces PartitionFlushState, PartitionBatchRequest, PartitionCallbackData, SyncConcurrentRequest / SyncPutAllData coordination types, PreparePartitionBatches / PrepareRangePartitionBatches methods, friend PartitionBatchCallback, and related forward declarations and typedefs for partition-aware batching.
PutAll / PutArchivesAll / Copy base→archive
data_store_service_client.cpp, data_store_service_client_closure.cpp, data_store_service_client_closure.h
Reworks PutAll into per-table/per-partition batches with a global coordinator and per-partition callbacks; adds gated in‑flight write control (SyncConcurrentRequest / max_flying_write_count), PartitionBatchCallback chaining, PutArchivesAll refactor, and CopyBaseToArchive coordination.
Archive key/value and MVCC encoding
data_store_service_client.cpp
Adds HashArchiveKey, EncodeArchiveKey (variants), DecodeArchiveKey, EncodeArchiveValue (unpack/encoded sizes), DecodeArchiveValue, and MVCC/archive batching/encoding helpers and decoding utilities.
Batch write scaffolding & serialization
data_store_service_client.cpp, data_store_service_client_closure.h
Adds BatchWriteRecords / BatchWriteRecordsInternal and closures, SerializeTxRecord overloads, DeserializeTxRecordStr, FlushDataClosure and ReadClosure declarations, and helpers for assembling write batches.
Catalog / tables / database / stats APIs
data_store_service_client.cpp, data_store_service_client_closure.h
Adds UpsertTable, FetchTableCatalog/FetchTable, DiscoverAllTableNames, UpsertDatabase/DropDatabase/FetchDatabase, FetchCurrentTableStatistics/FetchTableStatistics, UpsertTableStatistics, Init/Delete table ranges/statistics and related RPC callback scaffolding.
Range operations & slices
data_store_service_client.cpp, data_store_service_client_closure.h
Adds EncodeRangeValue, EncodeRangeSliceKey, UpdateEncodedRangeSliceKey, UpdateRangeSlices, UpsertRanges, FetchTableRanges, FetchRangeSlices and associated callbacks/closures for range management.
Fetch/backup & snapshot orchestration
data_store_service_client.cpp, data_store_service_client_closure.h, eloq_data_store_service/*
Adds FetchArchives/FetchVisibleArchive, FetchSnapshot helpers, CreateSnapshotForBackup / CreateSnapshotForBackupInternal, and callback/data structs to orchestrate per-node snapshot RPCs and aggregate results; adds no-op CreateSnapshotForBackup stub on EloqStoreDataStore and related header.
Closures, callbacks, and async coordination
data_store_service_client_closure.h, data_store_service_client_closure.cpp
Expands DataStoreCallback typedef, SyncCallbackData, many callback-data structs (fetch/archive/backup/db/table/stats), renames SyncPutAllCallback→SyncConcurrentRequestCallback, adds PartitionBatchCallback and PartitionFlushState::GetNextBatch, and enhances async coordination primitives.
Config tweak
eloq_data_store_service/eloq_store_config.cpp
Default flag eloq_store_file_amplify_factor changed from 4 → 2.

Sequence Diagram(s)

sequenceDiagram
  autonumber
  participant Caller
  participant DSSC as DataStoreServiceClient
  participant Global as SyncConcurrentRequest
  participant Part as PartitionFlushState
  participant Node as DataStore Node

  note right of DSSC: Bounded concurrent per‑partition PutAll
  Caller->>DSSC: PutAll(flush_tasks)
  DSSC->>Global: Init(total_partitions, max_flying)
  loop prepare partitions
    DSSC->>Part: PreparePartitionBatches(...)
    DSSC->>Global: Register(partition_state)
  end
  par bounded concurrent processing
    Global->>Node: BatchWriteRecords(batch) callback-> PartitionBatchCallback
    Node-->>DSSC: callback(result)
    alt batch error
      PartitionBatchCallback->>Part: MarkFailed()
      PartitionBatchCallback->>Global: OnPartitionCompleted(partition_id, failed)
    else success & more batches
      PartitionBatchCallback->>Node: BatchWriteRecords(next_batch)
    else success & no more
      PartitionBatchCallback->>Global: OnPartitionCompleted(partition_id, success)
    end
  end
  Global->>DSSC: WaitAll() -> aggregated result
  DSSC-->>Caller: Success/Failure
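The bounded-concurrency coordination in the diagram above can be sketched as a small gate type. This is a hypothetical, simplified analog of the PR's `SyncConcurrentRequest` (names and the use of `std::mutex` instead of `bthread::Mutex` are illustrative): dispatchers claim a slot before sending, completion callbacks release it and latch the first failure, and the caller waits for everything at the end.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>

// Hypothetical simplified analog of SyncConcurrentRequest: callers
// Acquire() before dispatching a request, the completion callback
// calls Finish(), and the coordinator calls WaitAll() at the end.
class BoundedInFlight
{
public:
    explicit BoundedInFlight(int max_in_flight) : max_in_flight_(max_in_flight) {}

    // Block until an in-flight slot is free, then claim it (pre-increment).
    void Acquire()
    {
        std::unique_lock<std::mutex> lk(mux_);
        cv_.wait(lk, [this] { return in_flight_ < max_in_flight_; });
        ++in_flight_;
    }

    // Called from the request callback: release the slot, latch first error.
    void Finish(bool ok)
    {
        std::unique_lock<std::mutex> lk(mux_);
        if (!ok && !failed_) { failed_ = true; }
        --in_flight_;
        cv_.notify_all();
    }

    // Wait until every dispatched request has completed; true iff all succeeded.
    bool WaitAll()
    {
        std::unique_lock<std::mutex> lk(mux_);
        cv_.wait(lk, [this] { return in_flight_ == 0; });
        return !failed_;
    }

private:
    std::mutex mux_;
    std::condition_variable cv_;
    int in_flight_ = 0;
    int max_in_flight_;
    bool failed_ = false;
};
```

Because `Acquire()` blocks while `in_flight_ == max_in_flight_`, at most `max_in_flight` requests are ever outstanding, which is the gating behavior the diagram's "bounded concurrent processing" block describes.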
sequenceDiagram
  autonumber
  participant Admin
  participant DSSC as DataStoreServiceClient
  participant ClMgr as ClusterManager
  participant Node

  note right of DSSC: Snapshot for backup across nodes
  Admin->>DSSC: CreateSnapshotForBackup(name, ts)
  DSSC->>ClMgr: Resolve shards/partitions
  par per node
    DSSC->>Node: RequestSnapshot(timeout)
    Node-->>DSSC: Snapshot files/result
  end
  DSSC-->>Admin: Aggregate snapshot files/status
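The "Aggregate snapshot files/status" step in the backup diagram can be sketched as a pure merge over per-node replies. The types below are hypothetical stand-ins, not the PR's actual callback structs; the point is the policy: the overall snapshot succeeds only if every node produced its files.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <vector>

// Hypothetical per-node reply: success flag plus the node's snapshot files.
struct NodeSnapshotReply
{
    bool ok = false;
    std::vector<std::string> files;
};

// Merge per-node replies into one file list; succeed only if every
// node (and at least one node) replied with a snapshot.
bool AggregateSnapshotReplies(const std::map<std::string, NodeSnapshotReply> &replies,
                              std::vector<std::string> &all_files)
{
    bool all_ok = !replies.empty();
    for (const auto &[node, reply] : replies)
    {
        if (!reply.ok) { all_ok = false; continue; }
        all_files.insert(all_files.end(), reply.files.begin(), reply.files.end());
    }
    return all_ok;
}
```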

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Poem

I hop through batches, stitch each part,
Partitions hum like woven art.
Archives tucked in careful rows,
Snapshots twinkle, backup grows.
I thump — new pipelines spring to heart. 🐇✨

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
  • Docstring Coverage (⚠️ Warning): Docstring coverage is 20.00%, below the required threshold of 80.00%. Resolution: run @coderabbitai generate docstrings to improve coverage.
✅ Passed checks (2 passed)
  • Description Check (✅ Passed): Check skipped; CodeRabbit’s high-level summary is enabled.
  • Title Check (✅ Passed): The title accurately captures the primary change in the PR, sending batch-write requests concurrently and waiting for all to finish, which aligns with the added per-partition batching and SyncConcurrentRequest/PartitionBatchCallback logic described in the changes. It is concise and focused on the main behavior change, though it contains minor typos and an abbreviation ("concurrenctly", "req") that slightly reduce clarity.
✨ Finishing touches
  • 📝 Generate Docstrings
  • 🧪 Generate unit tests
      • Create PR with unit tests
      • Post copyable unit tests in a comment
      • Commit unit tests in branch batch_write

Tip

👮 Agentic pre-merge checks are now available in preview!

Pro plan users can now enable pre-merge checks in their settings to enforce checklists before merging PRs.

  • Built-in checks – Quickly apply ready-made checks to enforce title conventions, require pull request descriptions that follow templates, validate linked issues for compliance, and more.
  • Custom agentic checks – Define your own rules using CodeRabbit’s advanced agentic capabilities to enforce organization-specific policies and workflows. For example, you can instruct CodeRabbit’s agent to verify that API documentation is updated whenever API schema files are modified in a PR. Note: Up to 5 custom checks are currently allowed during the preview period. Pricing for this feature will be announced in a few weeks.

Please see the documentation for more information.

Example:

reviews:
  pre_merge_checks:
    custom_checks:
      - name: "Undocumented Breaking Changes"
        mode: "warning"
        instructions: |
          Pass/fail criteria: All breaking changes to public APIs, CLI flags, environment variables, configuration keys, database schemas, or HTTP/GraphQL endpoints must be documented in the "Breaking Change" section of the PR description and in CHANGELOG.md. Exclude purely internal or private changes (e.g., code not exported from package entry points or explicitly marked as internal).

Please share your feedback with us on this Discord post.


Comment @coderabbitai help to get the list of available commands and usage tips.

@liunyl liunyl requested a review from lzxddz September 17, 2025 08:17
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
data_store_service_client_closure.h (1)

118-133: Underflow risk if callers don’t pre-increment; tighten invariants and only latch first error.

Given the new batching model, unfinished_request_cnt_ can go negative if callers increment it late (see cpp). Add a guard and only capture the first non‑OK error.

Apply:

 void Finish(const remote::CommonResult &res)
 {
   std::unique_lock<bthread::Mutex> lk(mux_);
-  if (result_.error_code() == remote::DataStoreError::NO_ERROR)
-  {
-      result_.set_error_code(res.error_code());
-      result_.set_error_msg(res.error_msg());
-  }
+  // only latch the first non-OK error
+  if (res.error_code() != remote::DataStoreError::NO_ERROR &&
+      result_.error_code() == remote::DataStoreError::NO_ERROR) {
+      result_.set_error_code(res.error_code());
+      result_.set_error_msg(res.error_msg());
+  }
 
-  --unfinished_request_cnt_;
+  // defensive: catch protocol misuse in debug
+  assert(unfinished_request_cnt_ > 0 && "unfinished_request_cnt_ underflow");
+  --unfinished_request_cnt_;
   if ((all_request_started_ && unfinished_request_cnt_ == 0) ||
       unfinished_request_cnt_ == max_flying_write_count - 1)
   {
     cv_.notify_one();
   }
 }
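The "only latch the first non-OK error" rule suggested above can be isolated from the mutex and counter logic. This is a minimal sketch with a hypothetical stand-in for `remote::CommonResult`: once an error is latched, later errors and later successes must both leave it untouched.

```cpp
#include <cassert>
#include <string>

// Hypothetical stand-in for remote::CommonResult, just enough to
// demonstrate first-error latching. 0 plays the role of NO_ERROR.
struct Result
{
    int error_code = 0;
    std::string error_msg;
};

// Latch only the first non-OK result into the aggregate; ignore
// subsequent successes and subsequent errors.
void LatchFirstError(Result &agg, const Result &res)
{
    if (res.error_code != 0 && agg.error_code == 0)
    {
        agg = res;
    }
}
```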
data_store_service_client.cpp (1)

309-347: Batch-splitting threshold uses in-flight limit; should use byte budget.

write_batch_size is bytes. Comparing it to max_flying_write_count (a request-count cap) is a units bug. Use MAX_WRITE_BATCH_SIZE for split.

Apply:

-                if (write_batch_size >= SyncPutAllData::max_flying_write_count)
+                if (write_batch_size >= MAX_WRITE_BATCH_SIZE)

and similarly in the range-partitioned loop:

-                    if (write_batch_size >=
-                        SyncPutAllData::max_flying_write_count)
+                    if (write_batch_size >= MAX_WRITE_BATCH_SIZE)

Also applies to: 405-446

🧹 Nitpick comments (1)
data_store_service_client_closure.h (1)

102-103: Good cap, but consider configurability.

max_flying_write_count = 32 is reasonable. Consider making it configurable (env/flag) for different deployments, defaulting to 32.
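One minimal way to make the cap configurable, sketched below with a hypothetical environment variable name (the project may prefer a gflag next to eloq_store_file_amplify_factor instead): parse the override defensively and fall back to the current default of 32 on anything invalid.

```cpp
#include <cassert>
#include <cstdint>
#include <cstdlib>

// Read the in-flight write cap from the environment (variable name
// MAX_FLYING_WRITE_COUNT is hypothetical), defaulting to 32.
uint32_t FlyingWriteCountFromEnv()
{
    const char *val = std::getenv("MAX_FLYING_WRITE_COUNT");
    if (val == nullptr) { return 32; }
    char *end = nullptr;
    long parsed = std::strtol(val, &end, 10);
    // Reject garbage, non-positive, or implausibly large values.
    if (end == val || *end != '\0' || parsed <= 0 || parsed > 4096) { return 32; }
    return static_cast<uint32_t>(parsed);
}
```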

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 6572fb1 and 797284c.

📒 Files selected for processing (2)
  • data_store_service_client.cpp (10 hunks)
  • data_store_service_client_closure.h (2 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
data_store_service_client.cpp (1)
data_store_service_client_closure.cpp (2)
  • SyncPutAllCallback (399-406)
  • SyncPutAllCallback (399-402)
🔇 Additional comments (1)
data_store_service_client.cpp (1)

2561-2564: Longer timeout for backup ops: LGTM.

@liunyl
Contributor Author

liunyl commented Sep 17, 2025

@coderabbitai generate docstrings

@coderabbitai
Contributor

coderabbitai bot commented Sep 17, 2025

✅ Actions performed

Docstrings generation started.

coderabbitai bot added a commit that referenced this pull request Sep 17, 2025
Docstrings generation was requested by @liunyl.

* #83 (comment)

The following files were modified:

* `data_store_service_client.cpp`
* `data_store_service_client_closure.h`
@coderabbitai
Contributor

coderabbitai bot commented Sep 17, 2025

Note

Generated docstrings for this pull request at #84

liunyl and others added 3 commits September 17, 2025 08:32
Docstrings generation was requested by @liunyl.

* #83 (comment)

The following files were modified:

* `data_store_service_client.cpp`
* `data_store_service_client_closure.h`

Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (5)
data_store_service_client_closure.h (1)

2634-2654: Base Clear not called in derived Clear.

CreateSnapshotForBackupCallbackData::Clear() resets its own fields but never calls SyncCallbackData::Clear(), leaving stale finished_/result_ across reuse from the pool. Call base Clear first.

Apply:

 void Clear() override
 {
+    SyncCallbackData::Clear();
     backup_name_ = "";
     backup_ts_ = 0;
     backup_files_ = nullptr;
 }
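The pooled-object pitfall behind this fix is easy to reproduce in miniature. The types below are hypothetical stand-ins for `SyncCallbackData`/`CreateSnapshotForBackupCallbackData`: if the derived `Clear()` skips the base `Clear()`, base-class state survives into the next borrower of the pooled object.

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// Hypothetical base callback data with state that must be reset
// before the object is handed out from the pool again.
struct BaseCallbackData
{
    virtual ~BaseCallbackData() = default;
    virtual void Clear()
    {
        result_.clear();
        finished_ = false;
    }
    std::string result_;
    bool finished_ = false;
};

struct SnapshotCallbackData : BaseCallbackData
{
    void Clear() override
    {
        BaseCallbackData::Clear();  // the fix: reset base state first
        backup_name_.clear();
        backup_ts_ = 0;
    }
    std::string backup_name_;
    uint64_t backup_ts_ = 0;
};
```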
data_store_service_client.cpp (4)

356-392: Pre‑increment unfinished_request_cnt_ before dispatch; current ordering can underflow and disables gating.

In PutAll, you increment after calling BatchWriteRecords, so a fast callback may decrement first (going negative) and the dispatcher never blocks at the intended limit. Pre‑increment under lock and gate before each dispatch; remove post‑dispatch increments.

Apply across all four call sites (hash and range, regular and “last batch”):

-                if (write_batch_size >= MAX_WRITE_BATCH_SIZE)
-                {
-                    BatchWriteRecords(kv_table_name,
-                                      part_it->first,
-                                      std::move(key_parts),
-                                      std::move(record_parts),
-                                      std::move(records_ts),
-                                      std::move(records_ttl),
-                                      std::move(op_types),
-                                      true,
-                                      sync_putall,
-                                      SyncPutAllCallback,
-                                      parts_cnt_per_key,
-                                      parts_cnt_per_record);
-                    // Wait for in-flight requests to decrease if limit reached
-                    {
-                        std::unique_lock<bthread::Mutex> lk(sync_putall->mux_);
-                        sync_putall->unfinished_request_cnt_++;
-                        while (sync_putall->unfinished_request_cnt_ >=
-                               SyncPutAllData::max_flying_write_count)
-                        {
-                            sync_putall->cv_.wait(lk);
-                        }
-                    }
+                if (write_batch_size >= MAX_WRITE_BATCH_SIZE)
+                {
+                    // Pre-account and gate
+                    {
+                        std::unique_lock<bthread::Mutex> lk(sync_putall->mux_);
+                        while (sync_putall->unfinished_request_cnt_ >= SyncPutAllData::max_flying_write_count) {
+                            sync_putall->cv_.wait(lk);
+                        }
+                        ++sync_putall->unfinished_request_cnt_;
+                    }
+                    BatchWriteRecords(kv_table_name,
+                                      part_it->first,
+                                      std::move(key_parts),
+                                      std::move(record_parts),
+                                      std::move(records_ts),
+                                      std::move(records_ttl),
+                                      std::move(op_types),
+                                      true,
+                                      sync_putall,
+                                      SyncPutAllCallback,
+                                      parts_cnt_per_key,
+                                      parts_cnt_per_record);
                     key_parts.clear();
@@
-            if (key_parts.size() > 0)
-            {
-                BatchWriteRecords(kv_table_name,
-                                  part_it->first,
-                                  std::move(key_parts),
-                                  std::move(record_parts),
-                                  std::move(records_ts),
-                                  std::move(records_ttl),
-                                  std::move(op_types),
-                                  true,
-                                  sync_putall,
-                                  SyncPutAllCallback,
-                                  parts_cnt_per_key,
-                                  parts_cnt_per_record);
-                {
-                    std::unique_lock<bthread::Mutex> lk(sync_putall->mux_);
-                    sync_putall->unfinished_request_cnt_++;
-                }
+            if (key_parts.size() > 0)
+            {
+                // Pre-account and gate
+                {
+                    std::unique_lock<bthread::Mutex> lk(sync_putall->mux_);
+                    while (sync_putall->unfinished_request_cnt_ >= SyncPutAllData::max_flying_write_count) {
+                        sync_putall->cv_.wait(lk);
+                    }
+                    ++sync_putall->unfinished_request_cnt_;
+                }
+                BatchWriteRecords(kv_table_name,
+                                  part_it->first,
+                                  std::move(key_parts),
+                                  std::move(record_parts),
+                                  std::move(records_ts),
+                                  std::move(records_ttl),
+                                  std::move(op_types),
+                                  true,
+                                  sync_putall,
+                                  SyncPutAllCallback,
+                                  parts_cnt_per_key,
+                                  parts_cnt_per_record);
                 key_parts.clear();
@@
-                    if (write_batch_size >= MAX_WRITE_BATCH_SIZE)
-                    {
-                        BatchWriteRecords(kv_table_name,
-                                          part_it->first,
-                                          std::move(key_parts),
-                                          std::move(record_parts),
-                                          std::move(records_ts),
-                                          std::move(records_ttl),
-                                          std::move(op_types),
-                                          true,
-                                          sync_putall,
-                                          SyncPutAllCallback,
-                                          parts_cnt_per_key,
-                                          parts_cnt_per_record);
+                    if (write_batch_size >= MAX_WRITE_BATCH_SIZE)
+                    {
+                        // Pre-account and gate
+                        {
+                            std::unique_lock<bthread::Mutex> lk(sync_putall->mux_);
+                            while (sync_putall->unfinished_request_cnt_ >= SyncPutAllData::max_flying_write_count) {
+                                sync_putall->cv_.wait(lk);
+                            }
+                            ++sync_putall->unfinished_request_cnt_;
+                        }
+                        BatchWriteRecords(kv_table_name,
+                                          part_it->first,
+                                          std::move(key_parts),
+                                          std::move(record_parts),
+                                          std::move(records_ts),
+                                          std::move(records_ttl),
+                                          std::move(op_types),
+                                          true,
+                                          sync_putall,
+                                          SyncPutAllCallback,
+                                          parts_cnt_per_key,
+                                          parts_cnt_per_record);
                         record_tmp_mem_area.clear();
@@
-                if (key_parts.size() > 0)
-                {
-                    BatchWriteRecords(kv_table_name,
-                                      part_it->first,
-                                      std::move(key_parts),
-                                      std::move(record_parts),
-                                      std::move(records_ts),
-                                      std::move(records_ttl),
-                                      std::move(op_types),
-                                      true,
-                                      sync_putall,
-                                      SyncPutAllCallback,
-                                      parts_cnt_per_key,
-                                      parts_cnt_per_record);
+                if (key_parts.size() > 0)
+                {
+                    // Pre-account and gate
+                    {
+                        std::unique_lock<bthread::Mutex> lk(sync_putall->mux_);
+                        while (sync_putall->unfinished_request_cnt_ >= SyncPutAllData::max_flying_write_count) {
+                            sync_putall->cv_.wait(lk);
+                        }
+                        ++sync_putall->unfinished_request_cnt_;
+                    }
+                    BatchWriteRecords(kv_table_name,
+                                      part_it->first,
+                                      std::move(key_parts),
+                                      std::move(record_parts),
+                                      std::move(records_ts),
+                                      std::move(records_ttl),
+                                      std::move(op_types),
+                                      true,
+                                      sync_putall,
+                                      SyncPutAllCallback,
+                                      parts_cnt_per_key,
+                                      parts_cnt_per_record);
                     record_tmp_mem_area.clear();
-                    {
-                        std::unique_lock<bthread::Mutex> lk(sync_putall->mux_);
-                        sync_putall->unfinished_request_cnt_++;
-                    }

Also applies to: 409-427, 455-493, 505-530
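The ordering bug can be shown without any real RPC machinery. The sketch below is single-threaded and uses illustrative names (the real code guards the counter with `bthread::Mutex`): `send` invokes the completion callback immediately, which is the fastest possible "async" reply. With post-dispatch accounting the counter goes negative; with pre-accounting it never does.

```cpp
#include <cassert>
#include <functional>

// Tracks in-flight requests and the lowest value ever observed, so a
// negative excursion (underflow) is visible after the fact.
struct Counter
{
    int in_flight = 0;
    int min_seen = 0;
    void dec()
    {
        --in_flight;
        if (in_flight < min_seen) { min_seen = in_flight; }
    }
};

using Send = std::function<void(std::function<void()>)>;

// Buggy ordering: dispatch first, increment after. If the callback
// fires before the increment, the counter dips below zero.
void DispatchPostIncrement(Counter &c, const Send &send)
{
    send([&c] { c.dec(); });  // callback may run before the line below
    ++c.in_flight;
}

// Fixed ordering: claim the slot before dispatching, as the review
// suggests ("pre-account and gate").
void DispatchPreIncrement(Counter &c, const Send &send)
{
    ++c.in_flight;
    send([&c] { c.dec(); });
}
```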


2391-2421: Same pre‑increment issue for the final batch in PutArchivesAll.

The “last batch” path increments after dispatch. Pre‑increment under lock and gate before calling BatchWriteRecords; remove the post‑dispatch increment.

-        if (keys.size() > 0)
-        {
-            BatchWriteRecords(kv_mvcc_archive_name,
+        if (keys.size() > 0)
+        {
+            // Pre-account and gate
+            {
+                std::unique_lock<bthread::Mutex> lk(sync_putall->mux_);
+                while (sync_putall->unfinished_request_cnt_ >= SyncPutAllData::max_flying_write_count) {
+                    sync_putall->cv_.wait(lk);
+                }
+                ++sync_putall->unfinished_request_cnt_;
+            }
+            BatchWriteRecords(kv_mvcc_archive_name,
                               partition_id,
                               std::move(keys),
                               std::move(records),
                               std::move(records_ts),
                               std::move(records_ttl),
                               std::move(op_types),
                               true,
                               sync_putall,
                               SyncPutAllCallback,
                               parts_cnt_per_key,
                               parts_cnt_per_record);
@@
-            write_batch_size = 0;
-            {
-                std::unique_lock<bthread::Mutex> lk(sync_putall->mux_);
-                sync_putall->unfinished_request_cnt_++;
-            }
+            write_batch_size = 0;

2060-2136: Archive key format is fragile (delimiter collisions) and uses unaligned reads; switch to length‑prefixed binary.

  • Keys/values are arbitrary bytes, so using the KEY_SEPARATOR byte sequence as a delimiter is ambiguous; rfind cannot help because commit_ts is raw bytes.
  • DecodeArchiveKey uses reinterpret_cast to read uint64_t, which is UB on unaligned addresses.

Encode with 32‑bit BE lengths for table_name and key, then 8‑byte BE commit_ts; update both overloads to emit length‑prefixed parts and decode via memcpy.

Apply:

-std::string DataStoreServiceClient::EncodeArchiveKey(std::string_view table_name, std::string_view key, uint64_t be_commit_ts)
+std::string DataStoreServiceClient::EncodeArchiveKey(std::string_view table_name, std::string_view key, uint64_t be_commit_ts)
 {
-    std::string archive_key;
-    archive_key.reserve(table_name.size() + key.size() + KEY_SEPARATOR.size());
-    archive_key.append(table_name);
-    archive_key.append(KEY_SEPARATOR);
-    archive_key.append(key);
-    archive_key.append(KEY_SEPARATOR);
-    archive_key.append(reinterpret_cast<const char *>(&be_commit_ts), sizeof(uint64_t));
-    return archive_key;
+    uint32_t tn_len_be = EloqShare::host_to_big_endian(static_cast<uint32_t>(table_name.size()));
+    uint32_t key_len_be = EloqShare::host_to_big_endian(static_cast<uint32_t>(key.size()));
+    std::string out;
+    out.reserve(sizeof(tn_len_be) + table_name.size() + sizeof(key_len_be) + key.size() + sizeof(uint64_t));
+    out.append(reinterpret_cast<const char*>(&tn_len_be), sizeof(tn_len_be));
+    out.append(table_name.data(), table_name.size());
+    out.append(reinterpret_cast<const char*>(&key_len_be), sizeof(key_len_be));
+    out.append(key.data(), key.size());
+    out.append(reinterpret_cast<const char*>(&be_commit_ts), sizeof(uint64_t));
+    return out;
 }
@@
-void DataStoreServiceClient::EncodeArchiveKey(std::string_view table_name,
-                                              std::string_view key,
-                                              uint64_t &be_commit_ts,
-                                              std::vector<std::string_view> &keys,
-                                              uint64_t &write_batch_size)
+void DataStoreServiceClient::EncodeArchiveKey(std::string_view table_name,
+                                              std::string_view key,
+                                              uint64_t &be_commit_ts,
+                                              std::vector<std::string_view> &keys,
+                                              uint64_t &write_batch_size)
 {
-    keys.emplace_back(table_name);
-    write_batch_size += table_name.size();
-    keys.emplace_back(KEY_SEPARATOR);
-    write_batch_size += KEY_SEPARATOR.size();
-    keys.emplace_back(key);
-    write_batch_size += key.size();
-    keys.emplace_back(KEY_SEPARATOR);
-    write_batch_size += KEY_SEPARATOR.size();
-    keys.emplace_back(reinterpret_cast<const char *>(&be_commit_ts), sizeof(uint64_t));
-    write_batch_size += sizeof(uint64_t);
+    uint32_t tn_len_be = EloqShare::host_to_big_endian(static_cast<uint32_t>(table_name.size()));
+    uint32_t key_len_be = EloqShare::host_to_big_endian(static_cast<uint32_t>(key.size()));
+    keys.emplace_back(reinterpret_cast<const char*>(&tn_len_be), sizeof(uint32_t));
+    write_batch_size += sizeof(uint32_t);
+    keys.emplace_back(table_name);
+    write_batch_size += table_name.size();
+    keys.emplace_back(reinterpret_cast<const char*>(&key_len_be), sizeof(uint32_t));
+    write_batch_size += sizeof(uint32_t);
+    keys.emplace_back(key);
+    write_batch_size += key.size();
+    keys.emplace_back(reinterpret_cast<const char *>(&be_commit_ts), sizeof(uint64_t));
+    write_batch_size += sizeof(uint64_t);
 }
@@
-bool DataStoreServiceClient::DecodeArchiveKey(const std::string &archive_key,
+bool DataStoreServiceClient::DecodeArchiveKey(const std::string &archive_key,
                                               std::string &table_name,
                                               txservice::TxKey &key,
                                               uint64_t &be_commit_ts)
 {
-    // Find the first separator
-    size_t first_sep = archive_key.find(KEY_SEPARATOR);
-    if (first_sep == std::string::npos) { return false; }
-    // Extract table_name
-    table_name = archive_key.substr(0, first_sep);
-    // Find the second separator
-    size_t second_sep = archive_key.find(KEY_SEPARATOR, first_sep + KEY_SEPARATOR.size());
-    if (second_sep == std::string::npos) { return false; }
-    // Extract key
-    size_t key_start = first_sep + KEY_SEPARATOR.size();
-    size_t key_length = second_sep - key_start;
-    key = txservice::TxKeyFactory::CreateTxKey(archive_key.data() + key_start, key_length);
-    // Extract commit_ts
-    size_t ts_pos = second_sep + KEY_SEPARATOR.size();
-    if (ts_pos + sizeof(uint64_t) > archive_key.size()) { return false; }
-    be_commit_ts = *reinterpret_cast<const uint64_t *>(archive_key.data() + ts_pos);
+    const char* p = archive_key.data();
+    size_t n = archive_key.size();
+    if (n < sizeof(uint32_t) + sizeof(uint32_t) + sizeof(uint64_t)) return false;
+    uint32_t tn_len_be = 0; std::memcpy(&tn_len_be, p, sizeof(uint32_t)); p += sizeof(uint32_t); n -= sizeof(uint32_t);
+    uint32_t tn_len = EloqShare::big_endian_to_host(tn_len_be);
+    if (n < tn_len + sizeof(uint32_t) + sizeof(uint64_t)) return false;
+    table_name.assign(p, tn_len); p += tn_len; n -= tn_len;
+    uint32_t key_len_be = 0; std::memcpy(&key_len_be, p, sizeof(uint32_t)); p += sizeof(uint32_t); n -= sizeof(uint32_t);
+    uint32_t key_len = EloqShare::big_endian_to_host(key_len_be);
+    if (n < key_len + sizeof(uint64_t)) return false;
+    key = txservice::TxKeyFactory::CreateTxKey(p, key_len); p += key_len; n -= key_len;
+    std::memcpy(&be_commit_ts, p, sizeof(uint64_t));
     return true;
 }

Note: EloqShare::big_endian_to_host must exist; otherwise add the inverse helper alongside host_to_big_endian.
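The proposed layout round-trips cleanly even when keys contain the separator bytes or embedded NULs. Here is a self-contained sketch of the same [u32 BE len][table_name][u32 BE len][key][u64 BE commit_ts] format; the helper names are illustrative, and the real code would use EloqShare's endian helpers and TxKeyFactory instead of plain std::string outputs.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <string_view>

// Append a 32-/64-bit value in big-endian byte order.
static void append_u32_be(std::string &out, uint32_t v)
{
    for (int shift = 24; shift >= 0; shift -= 8)
        out.push_back(static_cast<char>((v >> shift) & 0xFF));
}

static void append_u64_be(std::string &out, uint64_t v)
{
    for (int shift = 56; shift >= 0; shift -= 8)
        out.push_back(static_cast<char>((v >> shift) & 0xFF));
}

// Encode: [u32 BE table len][table][u32 BE key len][key][u64 BE commit_ts].
std::string EncodeArchiveKeySketch(std::string_view table, std::string_view key, uint64_t commit_ts)
{
    std::string out;
    out.reserve(4 + table.size() + 4 + key.size() + 8);
    append_u32_be(out, static_cast<uint32_t>(table.size()));
    out.append(table.data(), table.size());
    append_u32_be(out, static_cast<uint32_t>(key.size()));
    out.append(key.data(), key.size());
    append_u64_be(out, commit_ts);
    return out;
}

// Decode with bounds checks and no unaligned loads (byte-wise reads).
bool DecodeArchiveKeySketch(const std::string &in, std::string &table, std::string &key, uint64_t &commit_ts)
{
    const unsigned char *p = reinterpret_cast<const unsigned char *>(in.data());
    size_t n = in.size();
    if (n < 4 + 4 + 8) return false;
    uint32_t tn_len = (uint32_t(p[0]) << 24) | (uint32_t(p[1]) << 16) | (uint32_t(p[2]) << 8) | p[3];
    p += 4; n -= 4;
    if (n < tn_len + 4u + 8u) return false;
    table.assign(reinterpret_cast<const char *>(p), tn_len);
    p += tn_len; n -= tn_len;
    uint32_t key_len = (uint32_t(p[0]) << 24) | (uint32_t(p[1]) << 16) | (uint32_t(p[2]) << 8) | p[3];
    p += 4; n -= 4;
    if (n < key_len + 8u) return false;
    key.assign(reinterpret_cast<const char *>(p), key_len);
    p += key_len;
    commit_ts = 0;
    for (int i = 0; i < 8; ++i) commit_ts = (commit_ts << 8) | p[i];
    return true;
}
```

Because lengths are explicit, no byte value in the table name or key can be confused with a delimiter, and big-endian ordering keeps keys sorted by (table, key, commit_ts) under plain lexicographic comparison.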


3844-3889: Avoid assert(false) on missing channel in production path.

BatchWriteRecordsInternal() remote branch asserts and then continues. This can crash debug builds and is redundant. Set NETWORK_ERROR and Run() without assert.

-        if (!channel)
-        {
-            // TODO(lzx): retry..
-            assert(false);
-            closure->result_.set_error_code(
-                remote::DataStoreError::NETWORK_ERROR);
-            closure->Run();
-            return;
-        }
+        if (!channel)
+        {
+            closure->result_.set_error_code(remote::DataStoreError::NETWORK_ERROR);
+            closure->Run();
+            return;
+        }
🧹 Nitpick comments (3)
data_store_service_client_closure.h (2)

217-315: key_str_ is a string_view; verify the buffer it references outlives aggregation.

ReadBaseForArchiveCallbackData::AddResult stores a std::string_view to the source key. Ensure the source buffer (from ReadClosure->Key()) is backed by stable memory (e.g., the original TxKey in flush_task) until after results are consumed. If not, copy the key.


137-178: Verified: no negative-counter path — document pre-increment invariant (or add helper)

All BatchWriteRecords call sites increment sync_putall->unfinished_request_cnt_ while holding sync_putall->mux_ before dispatch (examples: data_store_service_client.cpp:374,425,486,528,2310,2419). Finish() decrements under the same mutex, so the counter cannot go negative. Add an explicit sentence to data_store_service_client_closure.h (near the SyncPutAllData definition, ~lines 124–127) requiring callers to increment unfinished_request_cnt_ while holding mux_ before sending the request — or provide a helper that atomically gates+increments before dispatch.
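The gate-and-increment helper suggested above could be sketched as follows (a sketch only, with hypothetical names, using std:: primitives in place of the bthread equivalents the client actually uses):

```cpp
#include <condition_variable>
#include <mutex>

// Sketch: blocks until an in-flight slot is free, then reserves it
// under the same mutex, so the counter can never exceed the cap or
// go negative.
struct InFlightGate
{
    static constexpr size_t max_flying_write_count = 8;

    std::mutex mux_;
    std::condition_variable cv_;
    size_t unfinished_request_cnt_{0};

    // Call before dispatching a request.
    void Acquire()
    {
        std::unique_lock<std::mutex> lk(mux_);
        cv_.wait(lk, [this] {
            return unfinished_request_cnt_ < max_flying_write_count;
        });
        ++unfinished_request_cnt_;
    }

    // Call from the completion callback (i.e. Finish()).
    void Release()
    {
        {
            std::lock_guard<std::mutex> lk(mux_);
            --unfinished_request_cnt_;
        }
        cv_.notify_one();
    }
};
```

Callers would wrap every dispatch in `Acquire()`/`Release()` instead of touching the counter directly, which makes the pre-increment invariant impossible to violate.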

data_store_service_client.cpp (1)

2983-3001: Missing error propagation when no channel for a shard in CreateSnapshotForBackupInternal.

Currently the code logs and skips the shard, so the final result may still be NO_ERROR. Consider setting a failure in closure->Result() and completing, or tracking per-shard errors.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 797284c and 13cae07.

📒 Files selected for processing (2)
  • data_store_service_client.cpp (41 hunks)
  • data_store_service_client_closure.h (13 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
data_store_service_client_closure.h (2)
eloq_data_store_service/object_pool.h (1)
  • Poolable (39-185)
data_store_service_client_closure.cpp (36)
  • SyncCallback (43-53)
  • SyncCallback (43-46)
  • SyncBatchReadForArchiveCallback (55-107)
  • SyncBatchReadForArchiveCallback (55-58)
  • LoadRangeSliceCallback (1004-1102)
  • LoadRangeSliceCallback (1004-1007)
  • FetchRecordCallback (109-221)
  • FetchRecordCallback (109-112)
  • FetchSnapshotCallback (223-309)
  • FetchSnapshotCallback (223-226)
  • AsyncDropTableCallback (311-325)
  • AsyncDropTableCallback (311-314)
  • FetchTableCatalogCallback (327-360)
  • FetchTableCatalogCallback (327-330)
  • SyncPutAllCallback (399-406)
  • SyncPutAllCallback (399-402)
  • FetchDatabaseCallback (408-436)
  • FetchDatabaseCallback (408-411)
  • FetchAllDatabaseCallback (438-499)
  • FetchAllDatabaseCallback (438-441)
  • DiscoverAllTableNamesCallback (501-566)
  • DiscoverAllTableNamesCallback (501-504)
  • FetchTableRangesCallback (568-695)
  • FetchTableRangesCallback (568-571)
  • FetchRangeSlicesCallback (697-875)
  • FetchRangeSlicesCallback (697-700)
  • FetchCurrentTableStatsCallback (877-903)
  • FetchCurrentTableStatsCallback (877-880)
  • FetchTableStatsCallback (905-1002)
  • FetchTableStatsCallback (905-908)
  • FetchArchivesCallback (1104-1163)
  • FetchArchivesCallback (1104-1107)
  • FetchRecordArchivesCallback (1165-1293)
  • FetchRecordArchivesCallback (1165-1168)
  • FetchSnapshotArchiveCallback (1295-1356)
  • FetchSnapshotArchiveCallback (1295-1298)
data_store_service_client.cpp (3)
data_store_service_client.h (2)
  • ScheduleTimerTasks (109-130)
  • NeedCopyRange (340-354)
rocksdb_handler.cpp (20)
  • ScheduleTimerTasks (254-259)
  • ScheduleTimerTasks (254-254)
  • lk (1869-1869)
  • lk (1875-1875)
  • lk (1896-1896)
  • lk (1900-1900)
  • lk (2034-2034)
  • lk (3097-3097)
  • lk (3102-3102)
  • lk (3115-3115)
  • NeedCopyRange (1470-1473)
  • NeedCopyRange (1470-1470)
  • RestoreTxCache (1688-1856)
  • RestoreTxCache (1688-1689)
  • OnLeaderStart (1858-1882)
  • OnLeaderStart (1858-1858)
  • OnStartFollowing (1884-1905)
  • OnStartFollowing (1884-1884)
  • OnShutdown (1907-1973)
  • OnShutdown (1907-1907)
data_store_service_client_closure.cpp (2)
  • SyncPutAllCallback (399-406)
  • SyncPutAllCallback (399-402)

Comment on lines 2512 to +2536
void DiscoverAllTableNamesCallback(void *data,
::google::protobuf::Closure *closure,
DataStoreServiceClient &client,
const remote::CommonResult &result);

/**
* Callback for fetching table ranges.
*
* Handles the completion of table range fetch operations and processes the result.
*/
void FetchTableRangesCallback(void *data,
::google::protobuf::Closure *closure,
DataStoreServiceClient &client,
const remote::CommonResult &result);

/**
* Callback for fetching range slices.
*
* Handles the completion of range slice fetch operations and processes the result.
*/
void FetchRangeSlicesCallback(void *data,
::google::protobuf::Closure *closure,
DataStoreServiceClient &client,
const remote::CommonResult &result);
/**

⚠️ Potential issue

🧩 Analysis chain

Callback implementations use unaligned reinterpret_cast loads; switch to memcpy.

Implementations for FetchRangeSlicesCallback/FetchTableRangesCallback/FetchTableStatsCallback decode integers via
reinterpret_cast<const T*>(buf), which is UB on unaligned addresses. Use std::memcpy to a local T and advance the cursor.

Locate sites:


🏁 Script executed:

#!/bin/bash
rg -nP --type=cpp 'reinterpret_cast<const (u?int(32|64)|uint8)_t \*\>' data_store_service_client_closure.cpp -C2

Length of output: 2475


Use std::memcpy for unaligned integer reads in callbacks

Callback code (FetchRangeSlicesCallback / FetchTableRangesCallback / FetchTableStatsCallback) reads integers directly from char buffers via reinterpret_cast<const T*>(buf) — this is UB on unaligned addresses. Copy into a local variable with std::memcpy and advance the cursor instead.

Affected locations (data_store_service_client_closure.cpp): lines 615, 617, 619, 741, 743, 745, 747, 810, 826, 834, 949, 957, 968.
Replace patterns like:
*(reinterpret_cast<const uint32_t *>(buf))
with:
uint32_t v; std::memcpy(&v, buf, sizeof(v)); buf += sizeof(v);
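A self-contained sketch of the memcpy-based reader, exercised against a deliberately misaligned buffer:

```cpp
#include <cstdint>
#include <cstring>

// Sketch: reads a T from an arbitrarily aligned buffer and advances
// the cursor. memcpy is well-defined at any alignment, unlike
// dereferencing a reinterpret_cast'ed pointer.
template <typename T>
inline T ReadUnaligned(const char *&buf)
{
    T v;
    std::memcpy(&v, buf, sizeof(T));
    buf += sizeof(T);
    return v;
}
```

Typical use: copy a `uint32_t` into `raw + 1` (an odd address), point a cursor there, and `ReadUnaligned<uint32_t>(cursor)` returns the value and leaves the cursor four bytes further on.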

🤖 Prompt for AI Agents
In data_store_service_client_closure.cpp around the listed offsets (lines ~615,
617, 619, 741, 743, 745, 747, 810, 826, 834, 949, 957, 968) replace all direct
unaligned reads that use constructs like *(reinterpret_cast<const
uint32_t*>(buf)) (or other integer types) with safe memcpy-based reads: declare
a local integer of the correct fixed-width type, call std::memcpy(&local, buf,
sizeof(local)), then advance buf by sizeof(local) before using the value; ensure
you include <cstring> if not present and use the exact integer widths
(uint16_t/uint32_t/uint64_t) matching the original reads.

@liunyl liunyl linked an issue Sep 17, 2025 that may be closed by this pull request
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 7

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (3)
data_store_service_client.cpp (3)

2062-2066: Type mismatch: vector<uint64_t> with size_t references

record_tmp_mem_area is std::vector<uint64_t>, but you bind size_t& to its elements. This won’t compile on many platforms and is semantically inconsistent.

Make types consistent:

-        std::vector<uint64_t> record_tmp_mem_area;
+        std::vector<size_t> record_tmp_mem_area;
         record_tmp_mem_area.resize(archive_ptrs.size() * 2);
...
-            size_t &unpack_info_size = record_tmp_mem_area[i * 2];
-            size_t &encode_blob_size = record_tmp_mem_area[i * 2 + 1];
+            size_t &unpack_info_size = record_tmp_mem_area[i * 2];
+            size_t &encode_blob_size = record_tmp_mem_area[i * 2 + 1];

And in EncodeArchiveValue, use sizeof(size_t) for the size fields (see next comment).

Also applies to: 2158-2161


2175-2206: Last batch increments after dispatch; can exceed cap

Account the last batch under the cap before sending it.

Apply:

-        if (keys.size() > 0)
+        if (keys.size() > 0)
         {
+            // Gate and account
+            {
+                std::unique_lock<bthread::Mutex> lk(sync_putall->mux_);
+                while (sync_putall->unfinished_request_cnt_ >=
+                       SyncPutAllData::max_flying_write_count) {
+                    sync_putall->cv_.wait(lk);
+                }
+                ++sync_putall->unfinished_request_cnt_;
+            }
             BatchWriteRecords(kv_mvcc_archive_name,
                               partition_id,
                               std::move(keys),
                               std::move(records),
                               std::move(records_ts),
                               std::move(records_ttl),
                               std::move(op_types),
                               true,
                               sync_putall,
                               SyncPutAllCallback,
                               parts_cnt_per_key,
                               parts_cnt_per_record);
             ...
-            {
-                std::unique_lock<bthread::Mutex> lk(sync_putall->mux_);
-                sync_putall->unfinished_request_cnt_++;
-            }

407-447: PartitionBatchCallback must decrement global in‑flight counter and gate next dispatch

SyncPutAllData (data_store_service_client_closure.h:143–181) provides unfinished_request_cnt_ and Finish() that decrements it. PartitionBatchCallback (data_store_service_client_closure.cpp:408–447) does not update the global in‑flight counter or gate before launching the next batch, so the global cap is ineffective. Fix: call global_coordinator->Finish(result) when any batch completes (success or error), and before sending the next BatchWriteRecords wait on global_coordinator->mux_ while unfinished_request_cnt_ >= SyncPutAllData::max_flying_write_count, then ++unfinished_request_cnt_ when dispatching (mirror the pattern at data_store_service_client.cpp:2088–2220).

♻️ Duplicate comments (2)
data_store_service_client_closure.cpp (1)

23-31: UB: unaligned reinterpret_cast loads; replace with memcpy-based reads.

These direct loads invoke undefined behavior on unaligned buffers and can break on non-x86 or with sanitizers.

Apply a safe reader and use it at all sites:

+#include <cstring>
+
+namespace {
+template <typename T>
+inline T ReadUnaligned(const char *&buf) {
+  T v;
+  std::memcpy(&v, buf, sizeof(T));
+  buf += sizeof(T);
+  return v;
+}
+}

Examples:

- int32_t partition_id = *(reinterpret_cast<const int32_t *>(buf));
- buf += sizeof(partition_id);
- uint64_t range_version = *(reinterpret_cast<const uint64_t *>(buf));
- buf += sizeof(range_version);
- uint64_t slice_version = *(reinterpret_cast<const uint64_t *>(buf));
- buf += sizeof(slice_version);
+ int32_t partition_id = ReadUnaligned<int32_t>(buf);
+ uint64_t range_version = ReadUnaligned<uint64_t>(buf);
+ uint64_t slice_version = ReadUnaligned<uint64_t>(buf);
- uint32_t segment_cnt = *(reinterpret_cast<const uint32_t *>(buf));
+ uint32_t segment_cnt = ReadUnaligned<uint32_t>(buf);
- uint64_t slice_version = *(reinterpret_cast<const uint64_t *>(buf));
+ uint64_t slice_version = ReadUnaligned<uint64_t>(buf);
- uint32_t key_len = *(reinterpret_cast<const uint32_t *>(buf));
- buf += sizeof(uint32_t);
+ uint32_t key_len = ReadUnaligned<uint32_t>(buf);
- uint32_t slice_size = *(reinterpret_cast<const uint32_t *>(buf));
- buf += sizeof(uint32_t);
+ uint32_t slice_size = ReadUnaligned<uint32_t>(buf);
- uint8_t indextype_val = *(reinterpret_cast<const uint8_t *>(value_buf));
- offset += sizeof(uint8_t);
+ uint8_t indextype_val = *(reinterpret_cast<const uint8_t *>(value_buf + offset));
+ offset += sizeof(uint8_t);
- uint64_t records_cnt = *(reinterpret_cast<const uint64_t *>(value_buf + offset));
+ const char *cursor = value_buf + offset;
+ uint64_t records_cnt = ReadUnaligned<uint64_t>(cursor);
+ offset = static_cast<size_t>(cursor - value_buf);

Apply similarly to all integer reads in these functions.

Also applies to: 646-663, 777-790, 868-878, 990-1016

data_store_service_client.cpp (1)

284-315: Missing global in‑flight gating; can blast thousands of concurrent RPCs

Docstring promises honoring SyncPutAllData::max_flying_write_count, but first batches are dispatched for all partitions without accounting/limits.

Gate and pre‑increment before each dispatch:

             // Start the first batch for this partition
             PartitionBatchRequest first_batch;
             if (partition_state->GetNextBatch(first_batch)) {
+                // Gate global concurrency and account this batch
+                {
+                    std::unique_lock<bthread::Mutex> lk(sync_putall->mux_);
+                    while (sync_putall->unfinished_request_cnt_ >= SyncPutAllData::max_flying_write_count) {
+                        sync_putall->cv_.wait(lk);
+                    }
+                    ++sync_putall->unfinished_request_cnt_;
+                }
                 BatchWriteRecords(
                     callback_data->table_name,
                     partition_state->partition_id,
                     std::move(first_batch.key_parts),
                     std::move(first_batch.record_parts),
                     std::move(first_batch.records_ts),
                     std::move(first_batch.records_ttl),
                     std::move(first_batch.op_types),
                     true, // skip_wal
                     callback_data,
                     PartitionBatchCallback,
                     first_batch.parts_cnt_per_key,
                     first_batch.parts_cnt_per_record);

And decrement in PartitionBatchCallback after each batch completes (see separate comment). Also update the docstring if you intentionally removed gating.

🧹 Nitpick comments (7)
data_store_service_client_closure.h (1)

241-305: has_inflight_request is never maintained; HasMoreBatches() is misleading.

The flag is never set/reset in callbacks, so HasMoreBatches() can return false positives/negatives.

Set/unset the flag when sending/completing a batch, or remove the flag and simplify HasMoreBatches():

-  bool HasMoreBatches() const {
-      std::unique_lock<bthread::Mutex> lk(mux);
-      return !pending_batches.empty() || has_inflight_request;
-  }
+  bool HasMoreBatches() const {
+      std::unique_lock<bthread::Mutex> lk(mux);
+      return !pending_batches.empty();
+  }

And in PartitionBatchCallback set has_inflight_request accordingly (see cpp diff).
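One way to maintain the flag consistently (a sketch with hypothetical names, using std::mutex where the real PartitionFlushState uses bthread::Mutex):

```cpp
#include <deque>
#include <mutex>

// Sketch: the inflight flag is set when a batch is handed out for
// dispatch and cleared when its callback completes, so
// HasMoreBatches() stays accurate throughout the chain.
struct PartitionFlushStateSketch
{
    mutable std::mutex mux;
    std::deque<int> pending_batches;  // placeholder batch payloads
    bool has_inflight_request{false};

    bool TryDispatchNext(int &batch)
    {
        std::lock_guard<std::mutex> lk(mux);
        if (pending_batches.empty() || has_inflight_request)
            return false;
        batch = pending_batches.front();
        pending_batches.pop_front();
        has_inflight_request = true;  // set on send
        return true;
    }

    void OnBatchCompleted()
    {
        std::lock_guard<std::mutex> lk(mux);
        has_inflight_request = false;  // clear on completion
    }

    bool HasMoreBatches() const
    {
        std::lock_guard<std::mutex> lk(mux);
        return !pending_batches.empty() || has_inflight_request;
    }
};
```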

data_store_service_client_closure.cpp (2)

1615-1620: Make RPC timeout configurable.

Hardcoding 5s may not fit all deployments. Consider plumbed config or client-level default with override.

- cntl_.set_timeout_ms(5000);
+ cntl_.set_timeout_ms(ds_service_client_->batch_write_timeout_ms());

And add a sane default in DataStoreServiceClient with config hook.


1149-1159: Avoid abort/assert on decode failures in production paths.

These hard fails can take down the process. Prefer error propagation with context.

Return DATA_STORE_ERR and include key/table in error_msg; optionally count a metric.

Also applies to: 1392-1401

data_store_service_client.cpp (4)

1937-1984: EncodeArchiveValue writes size fields with sizeof(uint64_t); use sizeof(size_t) to match producer

The callers provide size_t fields; writing 8 bytes unconditionally can misencode on non‑LP64. Align with SerializeTxRecord.

Apply:

-        record_parts.emplace_back(
-            std::string_view(reinterpret_cast<const char *>(&unpack_info_size),
-                             sizeof(uint64_t)));
-        write_batch_size += sizeof(uint64_t);
+        record_parts.emplace_back(std::string_view(
+            reinterpret_cast<const char *>(&unpack_info_size),
+            sizeof(size_t)));
+        write_batch_size += sizeof(size_t);
...
-        record_parts.emplace_back(
-            std::string_view(reinterpret_cast<const char *>(&encoded_blob_size),
-                             sizeof(uint64_t)));
-        write_batch_size += sizeof(uint64_t);
+        record_parts.emplace_back(std::string_view(
+            reinterpret_cast<const char *>(&encoded_blob_size),
+            sizeof(size_t)));
+        write_batch_size += sizeof(size_t);

152-156: Don’t assert in a public override; return gracefully or noop

ScheduleTimerTasks aborts the process if called. Prefer a no‑op or TODO with feature flag.

Apply:

-    LOG(ERROR) << "ScheduleTimerTasks not implemented";
-    assert(false);
+    DLOG(INFO) << "ScheduleTimerTasks noop (not implemented)";

1869-1881: Docstring mismatch with actual key format

Comment shows “log:item:{table_name}:{key}:{commit_ts}”, but implementation uses backslash separators and binary commit_ts. Update docs for accuracy.


1138-1153: Nit: “small endian” → “little‑endian”

Fix terminology in comment.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 13cae07 and 383e392.

📒 Files selected for processing (4)
  • data_store_service_client.cpp (39 hunks)
  • data_store_service_client.h (3 hunks)
  • data_store_service_client_closure.cpp (1 hunks)
  • data_store_service_client_closure.h (12 hunks)
🧰 Additional context used
🧬 Code graph analysis (4)
data_store_service_client_closure.cpp (1)
data_store_service_client.cpp (1)
  • callback_data (2440-2446)
data_store_service_client.h (2)
data_store_service_client.cpp (4)
  • PreparePartitionBatches (4113-4265)
  • PreparePartitionBatches (4113-4120)
  • PrepareRangePartitionBatches (4267-4364)
  • PrepareRangePartitionBatches (4267-4274)
data_store_service_client_closure.cpp (2)
  • PartitionBatchCallback (408-448)
  • PartitionBatchCallback (408-411)
data_store_service_client.cpp (2)
data_store_service_client.h (2)
  • ScheduleTimerTasks (113-134)
  • NeedCopyRange (344-358)
data_store_service_client_closure.cpp (2)
  • PartitionBatchCallback (408-448)
  • PartitionBatchCallback (408-411)
data_store_service_client_closure.h (2)
data_store_service_client.h (1)
  • DataStoreServiceClient (62-616)
data_store_service_client_closure.cpp (2)
  • PartitionBatchCallback (408-448)
  • PartitionBatchCallback (408-411)
🔇 Additional comments (3)
data_store_service_client_closure.h (2)

143-166: Unfinished in-flight counter is not integrated with new partition completion path.

OnPartitionCompleted() uses a separate completion condition but does not interact with unfinished_request_cnt_. If send-side gating relies on unfinished_request_cnt_ to respect max_flying_write_count, callbacks must decrement/increment it accordingly.

Please confirm that every launched batch increments unfinished_request_cnt_ and that each callback decrements it via Finish(...). The cpp callback currently doesn’t call Finish. The cpp patch wires this up.

Also applies to: 185-205


310-317: Verify lifetime of PartitionCallbackData pointers.

partition_state is a raw pointer while SyncPutAllData holds partition_states_ as unique_ptr. Ensure PartitionCallbackData outlives all chained callbacks and is not destroyed until OnPartitionCompleted has fired and no further RPCs can reference it.

Would you like me to scan the call sites to confirm allocation and lifetime semantics?

data_store_service_client.h (1)

663-667: Friend for PartitionBatchCallback is fine.

Declaration matches implementation needs and keeps callback access scoped.

Comment on lines 191 to 196
auto &table_name = entries.front()->data_sync_task_->table_name_;

// Group records by partition
std::unordered_map<uint32_t, std::vector<std::pair<size_t, size_t>>>
hash_partitions_map;
std::unordered_map<uint32_t, std::vector<size_t>> range_partitions_map;

⚠️ Potential issue

Guard against empty entries before using entries.front()

Dereferencing entries.front() when entries is empty is UB. Skip empty vectors.

Apply:

-        auto &table_name = entries.front()->data_sync_task_->table_name_;
+        if (entries.empty()) {
+            DLOG(INFO) << "PutAll: skip empty entries for table " << kv_table_name;
+            continue;
+        }
+        auto &table_name = entries.front()->data_sync_task_->table_name_;
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change

-auto &table_name = entries.front()->data_sync_task_->table_name_;
+if (entries.empty()) {
+    DLOG(INFO) << "PutAll: skip empty entries for table " << kv_table_name;
+    continue;
+}
+auto &table_name = entries.front()->data_sync_task_->table_name_;

 // Group records by partition
 std::unordered_map<uint32_t, std::vector<std::pair<size_t, size_t>>>
     hash_partitions_map;
 std::unordered_map<uint32_t, std::vector<size_t>> range_partitions_map;
🤖 Prompt for AI Agents
In data_store_service_client.cpp around lines 191 to 196, the code dereferences
entries.front() without checking if entries is empty which causes undefined
behavior; add a guard at the top of this block to check if entries.empty() and
either return/continue or otherwise skip processing empty vectors, e.g. handle
the empty case before accessing entries.front(), or throw/log an error and exit
the function; ensure subsequent code that assumes a valid front() is only run
when entries is non-empty.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (4)
data_store_service_client_closure.cpp (1)

1185-1206: Limit handling is wrong; may over/under-fetch archives.

You compare the current batch size to limit_. Use the accumulated count and cap the next batch size accordingly.

-    if (items_size < scan_next_closure->BatchSize() ||
-        items_size >= fetch_data->limit_)
+    const bool reached_limit =
+        fetch_data->archive_values_.size() >= fetch_data->limit_;
+    if (items_size < scan_next_closure->BatchSize() || reached_limit)
     {
         // has no more data, notify
         fetch_data->Result().set_error_code(remote::DataStoreError::NO_ERROR);
         fetch_data->Notify();
     }
     else
     {
+        // Respect remaining limit when continuing.
+        const size_t remaining =
+            fetch_data->limit_ - fetch_data->archive_values_.size();
+        const uint32_t next_batch =
+            static_cast<uint32_t>(std::min(remaining,
+                                           static_cast<size_t>(fetch_data->batch_size_)));
         client.ScanNext(fetch_data->kv_table_name_,
                         fetch_data->partition_id_,
                         fetch_data->start_key_,
                         fetch_data->end_key_,
                         scan_next_closure->SessionId(),
                         false,
                         false,
                         fetch_data->scan_forward_,
-                        fetch_data->batch_size_,
+                        next_batch,
                         nullptr,
                         fetch_data,
                         &FetchArchivesCallback);
     }
data_store_service_client.cpp (3)

3873-3880: Bug: result of InitTableLastRangePartitionId ignored.

The boolean chaining is not assigned back to ok; failures are silently ignored.

Apply:

-            bool ok = InitTableRanges(tablename, table_version);
-            ok &&InitTableLastRangePartitionId(tablename);
+            bool ok = InitTableRanges(tablename, table_version);
+            ok = ok && InitTableLastRangePartitionId(tablename);

1926-1981: Archive key decoding is fragile; KEY_SEPARATOR collides with binary keys.

Key is arbitrary binary; using KEY_SEPARATOR as a delimiter breaks parsing and risks mis-splitting. Prefer length‑prefixing or a fixed layout: [len(table)][table][len(key)][key][be_ts].

Suggested change (API-safe):

  • Change EncodeArchiveKey/DecodeArchiveKey to length‑prefix table and key; keep be_ts fixed 8 bytes.
  • If changing the format isn’t possible now, at least validate that key/table never contain KEY_SEPARATOR, and document that constraint.
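The length-prefixed option could be sketched as follows (helper names here are hypothetical; a real encoder would reuse the existing big-endian utilities):

```cpp
#include <cstdint>
#include <cstring>
#include <string>
#include <string_view>

// Sketch of a collision-free archive key layout:
// [u32 len(table)][table][u32 len(key)][key][u64 be_ts].
// Lengths and ts are big-endian so encoded keys sort by raw bytes.
inline void AppendU32BE(std::string &out, uint32_t v)
{
    char b[4] = {static_cast<char>(v >> 24), static_cast<char>(v >> 16),
                 static_cast<char>(v >> 8), static_cast<char>(v)};
    out.append(b, 4);
}

inline void AppendU64BE(std::string &out, uint64_t v)
{
    for (int shift = 56; shift >= 0; shift -= 8)
        out.push_back(static_cast<char>(v >> shift));
}

inline std::string EncodeArchiveKeyLP(std::string_view table,
                                      std::string_view key,
                                      uint64_t commit_ts)
{
    std::string out;
    out.reserve(8 + table.size() + key.size() + 8);
    AppendU32BE(out, static_cast<uint32_t>(table.size()));
    out.append(table);
    AppendU32BE(out, static_cast<uint32_t>(key.size()));
    out.append(key);
    AppendU64BE(out, commit_ts);
    return out;
}

inline bool DecodeArchiveKeyLP(std::string_view in,
                               std::string &table,
                               std::string &key,
                               uint64_t &commit_ts)
{
    auto read_u32 = [&](uint32_t &v) -> bool {
        if (in.size() < 4) return false;
        v = 0;
        for (int i = 0; i < 4; ++i)
            v = (v << 8) | static_cast<unsigned char>(in[i]);
        in.remove_prefix(4);
        return true;
    };
    uint32_t tn = 0, kn = 0;
    if (!read_u32(tn) || in.size() < tn) return false;
    table.assign(in.data(), tn);
    in.remove_prefix(tn);
    if (!read_u32(kn) || in.size() < kn) return false;
    key.assign(in.data(), kn);
    in.remove_prefix(kn);
    if (in.size() < 8) return false;
    commit_ts = 0;
    for (int i = 0; i < 8; ++i)
        commit_ts = (commit_ts << 8) | static_cast<unsigned char>(in[i]);
    return true;
}
```

Because the layout never scans for a separator, keys and table names containing any byte value (including the separator bytes) round-trip safely.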

2249-2279: Fix last‑batch in‑flight accounting: pre‑increment under lock before dispatch; remove post‑dispatch increment.

Current code dispatches the last batch, then increments unfinished_request_cnt_. Callback may run first, causing underflow/early completion.

Apply:

-        if (keys.size() > 0)
+        if (keys.size() > 0)
         {
+            // Gate and account one in-flight request
+            {
+                std::unique_lock<bthread::Mutex> lk(sync_concurrent->mux_);
+                while (sync_concurrent->unfinished_request_cnt_ >=
+                       SyncConcurrentRequest::max_flying_write_count) {
+                    sync_concurrent->cv_.wait(lk);
+                }
+                ++sync_concurrent->unfinished_request_cnt_;
+            }
             BatchWriteRecords(kv_mvcc_archive_name,
                               partition_id,
                               std::move(keys),
                               std::move(records),
                               std::move(records_ts),
                               std::move(records_ttl),
                               std::move(op_types),
                               true,
                               sync_concurrent,
                               SyncConcurrentRequestCallback,
                               parts_cnt_per_key,
                               parts_cnt_per_record);
             keys.clear();
             records.clear();
             records_ts.clear();
             records_ttl.clear();
             op_types.clear();
@@
-            write_batch_size = 0;
-            {
-                std::unique_lock<bthread::Mutex> lk(sync_concurrent->mux_);
-                sync_concurrent->unfinished_request_cnt_++;
-            }
+            write_batch_size = 0;
         }
♻️ Duplicate comments (2)
data_store_service_client_closure.h (1)

324-341: Constructor forgets to initialize record_tmp_mem_area (the moved-in backing storage is dropped, leaving the string_views dangling).

The member isn’t initialized, so the intended backing storage isn’t preserved.

-    PartitionBatchRequest(std::vector<std::string_view> &&keys,
-                          std::vector<std::string_view> &&records,
-                          std::vector<uint64_t> &&ts,
-                          std::vector<uint64_t> &&ttl,
-                          std::vector<size_t> &&record_tmp_mem_area,
-                          std::vector<WriteOpType> &&ops,
+    PartitionBatchRequest(std::vector<std::string_view> &&keys,
+                          std::vector<std::string_view> &&records,
+                          std::vector<uint64_t> &&ts,
+                          std::vector<uint64_t> &&ttl,
+                          std::vector<size_t> &&tmp_mem_area,
+                          std::vector<WriteOpType> &&ops,
                           uint16_t key_parts_count,
                           uint16_t record_parts_count)
         : key_parts(std::move(keys)),
           record_parts(std::move(records)),
           records_ts(std::move(ts)),
           records_ttl(std::move(ttl)),
+          record_tmp_mem_area(std::move(tmp_mem_area)),
           op_types(std::move(ops)),
           parts_cnt_per_key(key_parts_count),
           parts_cnt_per_record(record_parts_count)
     {
     }
data_store_service_client.cpp (1)

201-205: Guard against empty entries before dereferencing entries.front().

entries may be empty; dereferencing front() is UB and can crash.

Apply:

-    for (auto &[kv_table_name, entries] : flush_task)
+    for (auto &[kv_table_name, entries] : flush_task)
     {
-        auto &table_name = entries.front()->data_sync_task_->table_name_;
+        if (entries.empty()) {
+            DLOG(INFO) << "PutAll: skip empty entries for table " << kv_table_name;
+            continue;
+        }
+        auto &table_name = entries.front()->data_sync_task_->table_name_;
🧹 Nitpick comments (9)
data_store_service_client_closure.cpp (1)

177-181: Avoid hard abort on decode error; propagate an error instead.

std::abort() will crash the process. Prefer signaling failure via SetFinish(...) and returning.

-                LOG(ERROR) << "====fetch record===decode error==" << " key: "
-                           << read_closure->Key()
-                           << " status: " << (int) fetch_cc->rec_status_;
-                std::abort();
+                LOG(ERROR) << "decode error for key="
+                           << std::string(read_closure->Key());
+                fetch_cc->SetFinish(
+                    static_cast<int>(txservice::CcErrorCode::DATA_STORE_ERR));
+                return;
data_store_service_client_closure.h (3)

470-481: Potential dangling key string_view in archive read aggregator.

key_str_ stores a std::string_view to ReadClosure::Key(). After the closure is freed, the view can dangle. Store an owned copy.

-    std::string_view key_str_;
+    std::string key_str_;
@@
-    void AddResult(uint32_t partition_id,
+    void AddResult(uint32_t partition_id,
                    const std::string_view key,
                    std::string &&value,
                    uint64_t ts,
                    uint64_t ttl)
     {
         partition_id_ = partition_id;
-        key_str_ = key;
+        key_str_.assign(key.data(), key.size());
         value_str_ = std::move(value);
         ts_ = ts;
         ttl_ = ttl;
     }

Also applies to: 503-508


208-209: Unused constant.

SyncPutAllData::max_flying_write_count isn’t referenced. Remove or wire it into the gating logic to avoid confusion.


58-62: Duplicate DataStoreCallback typedef across headers.

DataStoreServiceClient.h already declares this alias. Prefer a single source (include that header) to avoid drift.

Also applies to: 34-36

data_store_service_client.cpp (5)

206-211: Remove unused partition_record_cnt bookkeeping.

partition_record_cnt is populated but never used; drop it to avoid confusion.

Apply:

-        std::unordered_map<uint32_t, size_t> partition_record_cnt;
@@
-                    partition_record_cnt.try_emplace(kv_partition_id, 0);
-                    partition_record_cnt[kv_partition_id]++;
+                    /* no-op */
@@
-                partition_record_cnt.try_emplace(parition_id, 0);
-                partition_record_cnt[parition_id] += batch.size();
+                /* no-op */

Also applies to: 236-238, 249-251


1141-1171: Doc says little‑endian, code writes host‑endian.

If portability matters, explicitly encode LE via helpers; otherwise update comment.

Apply one:

  • Code: use explicit little-endian writes.
  • Or Comment: s/Uses little-endian encoding/Uses host-endian encoding/.

1997-2044: Inconsistent size header width for archive value.

You write sizeof(uint64_t) for sizes while other serializers use sizeof(size_t). Align to one to avoid decode mismatches on non‑LP64 targets.

Apply:

-        record_parts.emplace_back(
-            std::string_view(reinterpret_cast<const char *>(&unpack_info_size),
-                             sizeof(uint64_t)));
+        record_parts.emplace_back(std::string_view(
+            reinterpret_cast<const char *>(&unpack_info_size),
+            sizeof(unpack_info_size)));
@@
-        record_parts.emplace_back(
-            std::string_view(reinterpret_cast<const char *>(&encoded_blob_size),
-                             sizeof(uint64_t)));
+        record_parts.emplace_back(std::string_view(
+            reinterpret_cast<const char *>(&encoded_blob_size),
+            sizeof(encoded_blob_size)));

Or add static_assert(sizeof(size_t) == 8) with a clear comment.
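If the fixed 64-bit route is taken, pinning the wire width can be as simple as widening explicitly before serializing. A sketch with illustrative names (byte order is a separate concern from width):

```cpp
#include <cstdint>
#include <string>

// Serializers below assume 8-byte size fields on the wire.
static_assert(sizeof(uint64_t) == 8, "wire format assumes 8-byte size fields");

// Widens a size_t to a fixed 64-bit field before appending, so 32-bit and
// 64-bit builds emit the same number of bytes.
inline void AppendSizeField(std::string &out, size_t n)
{
    const uint64_t wire = static_cast<uint64_t>(n);
    out.append(reinterpret_cast<const char *>(&wire), sizeof(wire));
}
```

The explicit cast also documents intent at the call site, unlike `sizeof(size_t)` whose width silently tracks the target ABI.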


2321-2371: Minor: shadowing flying_cnt within loop hampers readability.

The inner size_t flying_cnt shadows the outer variable. Rename the inner to curr_flying or similar.

-                size_t flying_cnt = callback_data->AddFlyingReadCount();
+                size_t curr_flying = callback_data->AddFlyingReadCount();
@@
-                if (flying_cnt >= MAX_FLYING_READ_COUNT)
+                if (curr_flying >= MAX_FLYING_READ_COUNT)

157-161: Avoid assert(false) in production paths.

ScheduleTimerTasks crashes the process if called. Prefer returning an error or a TODO log without aborting.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 383e392 and 5c44575.

📒 Files selected for processing (3)
  • data_store_service_client.cpp (42 hunks)
  • data_store_service_client_closure.cpp (4 hunks)
  • data_store_service_client_closure.h (13 hunks)
🧰 Additional context used
🧠 Learnings (2)
📓 Common learnings
Learnt from: liunyl
PR: eloqdata/store_handler#83
File: data_store_service_client_closure.cpp:408-448
Timestamp: 2025-09-17T11:08:35.299Z
Learning: In the DataStoreServiceClient batch write system, the global coordinator (SyncPutAllData) manages concurrency at the partition level, not at the individual batch level. The global unfinished_request_cnt_ is only decremented when an entire partition is fully completed or failed, not after each individual batch within a partition.

Applied to files:

  • data_store_service_client_closure.cpp
  • data_store_service_client_closure.h
  • data_store_service_client.cpp
🧬 Code graph analysis (2)
data_store_service_client_closure.h (3)
data_store_service_client.h (2)
  • EloqDS (40-702)
  • DataStoreServiceClient (62-616)
eloq_data_store_service/object_pool.h (2)
  • EloqDS (36-186)
  • Poolable (39-185)
data_store_service_client_closure.cpp (11)
  • lk (1436-1436)
  • GetNextBatch (1434-1444)
  • GetNextBatch (1434-1434)
  • SyncCallback (44-54)
  • SyncCallback (44-47)
  • SyncBatchReadForArchiveCallback (56-108)
  • SyncBatchReadForArchiveCallback (56-59)
  • SyncConcurrentRequestCallback (400-407)
  • SyncConcurrentRequestCallback (400-403)
  • PartitionBatchCallback (409-450)
  • PartitionBatchCallback (409-412)
data_store_service_client.cpp (3)
data_store_service_client.h (1)
  • ScheduleTimerTasks (113-134)
eloq_data_store_service/data_store_service.cpp (11)
  • BatchWriteRecords (768-814)
  • BatchWriteRecords (768-772)
  • BatchWriteRecords (1081-1143)
  • BatchWriteRecords (1081-1093)
  • lk (81-81)
  • lk (95-95)
  • lk (106-106)
  • lk (124-124)
  • lk (152-152)
  • lk (158-158)
  • lk (164-164)
data_store_service_client_closure.cpp (5)
  • PartitionBatchCallback (409-450)
  • PartitionBatchCallback (409-412)
  • lk (1436-1436)
  • SyncConcurrentRequestCallback (400-407)
  • SyncConcurrentRequestCallback (400-403)
🔇 Additional comments (10)
data_store_service_client_closure.cpp (2)

400-407: LGTM: callback just forwards to coordinator.

Signature and delegation align with SyncConcurrentRequest.


1434-1445: LGTM: thread-safe dequeue.

Move/pop under bthread::Mutex is correct.

data_store_service_client_closure.h (1)

266-306: Resolved — concurrency gating is correct; no change needed.

Increments of unfinished_request_cnt_ happen under mux_ (data_store_service_client.cpp:2167–2174, 2276–2278), the wait uses while (unfinished_request_cnt_ >= max_flying_write_count) (2168–2171), all_request_started_ is set under mux_ before waiting for completion (2283–2285), and Finish() decrements + notifies appropriately (data_store_service_client_closure.h:292–296).
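The gating pattern being confirmed here can be sketched with std:: primitives standing in for their bthread equivalents (all names illustrative, not the project's types):

```cpp
#include <condition_variable>
#include <mutex>

// Bounds the number of in-flight requests: producers block when the cap is
// reached; completion callbacks release a slot and wake a waiting producer.
struct FlightGate
{
    std::mutex mux;
    std::condition_variable cv;
    int unfinished = 0;
    const int max_flying;

    explicit FlightGate(int max) : max_flying(max) {}

    // Producer side: wait until a slot is free, then claim it under the lock.
    void Acquire()
    {
        std::unique_lock<std::mutex> lk(mux);
        cv.wait(lk, [this] { return unfinished < max_flying; });
        ++unfinished;
    }

    // Callback side: release the slot and notify one blocked producer.
    void Finish()
    {
        std::lock_guard<std::mutex> lk(mux);
        --unfinished;
        cv.notify_one();
    }
};
```

The key property matching the review note: the increment happens under the same mutex as the wait predicate, so the count can never overshoot the cap between check and dispatch.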

data_store_service_client.cpp (7)

312-339: Confirm intentional unbounded partition concurrency.

You launch the first batch for every partition without a global cap. If the number of partitions is large, this can overwhelm the system. If that’s intentional per design (“partition-level” concurrency), acknowledge and consider a global cap in SyncPutAllData later.


341-349: Wait loop is fine.

Simple completed_partitions_ vs total_partitions_ gating looks correct.


2165-2174: Good: pre‑increment under lock before mid‑batch dispatch.

This correctly gates concurrent requests.


2283-2289: Set all_request_started_ only after all pre‑increments are done.

Your fix above ensures this remains correct. Good.


4192-4343: Good: batches now carry owned size buffer; avoids dangling string_view.

Moving record_tmp_mem_area into PartitionBatchRequest fixes lifetime issues.


4345-4442: Same positive fix for range partitions.

Ownership model mirrors the hash‑partition case correctly.


263-285: PartitionFlushState objects are returned to the pool — resolved.

SyncPutAllData's cleanup iterates partition_states_, calls partition_state->Clear(); partition_state->Free(); and then partition_states_.clear(), so the PartitionFlushState objects pushed into sync_putall->partition_states_ are returned to the pool.

Comment on lines +418 to +444
// Check if the batch failed
if (result.error_code() != remote::DataStoreError::NO_ERROR)
{
partition_state->MarkFailed(result);
// Notify the global coordinator that this partition failed
global_coordinator->OnPartitionCompleted();
return;
}

// Try to get the next batch for this partition
PartitionBatchRequest next_batch;
if (partition_state->GetNextBatch(next_batch))
{
// Send the next batch
client.BatchWriteRecords(callback_data->table_name,
partition_state->partition_id,
std::move(next_batch.key_parts),
std::move(next_batch.record_parts),
std::move(next_batch.records_ts),
std::move(next_batch.records_ttl),
std::move(next_batch.op_types),
true, // skip_wal
callback_data,
PartitionBatchCallback,
next_batch.parts_cnt_per_key,
next_batch.parts_cnt_per_record);
}

⚠️ Potential issue


Fix dangling string_view: keep batch backing alive across async RPC.

record_parts are string_view referencing PartitionBatchRequest::record_tmp_mem_area; next_batch is local and gets destroyed on return, causing a UAF in the async RPC closure—move the batch into callback_data->inflight_batch and reset it on RPC completion or on failure.

File: data_store_service_client_closure.cpp
Lines: 418-444

@@
-    // Check if the batch failed
+    // Release previous batch's backing storage (its RPC just completed).
+    callback_data->inflight_batch.reset();
+
+    // Check if the batch failed
     if (result.error_code() != remote::DataStoreError::NO_ERROR)
     {
         partition_state->MarkFailed(result);
+        // Free any inflight batch storage
+        callback_data->inflight_batch.reset();
         // Notify the global coordinator that this partition failed
         global_coordinator->OnPartitionCompleted();
         return;
     }
@@
-    PartitionBatchRequest next_batch;
-    if (partition_state->GetNextBatch(next_batch))
+    PartitionBatchRequest next_batch;
+    if (partition_state->GetNextBatch(next_batch))
     {
-        // Send the next batch
-        client.BatchWriteRecords(callback_data->table_name,
-                                 partition_state->partition_id,
-                                 std::move(next_batch.key_parts),
-                                 std::move(next_batch.record_parts),
-                                 std::move(next_batch.records_ts),
-                                 std::move(next_batch.records_ttl),
-                                 std::move(next_batch.op_types),
-                                 true,  // skip_wal
-                                 callback_data,
-                                 PartitionBatchCallback,
-                                 next_batch.parts_cnt_per_key,
-                                 next_batch.parts_cnt_per_record);
+        // Keep backing storage alive for the duration of the async RPC.
+        callback_data->inflight_batch =
+            std::make_unique<PartitionBatchRequest>(std::move(next_batch));
+
+        // Send the next batch (views moved; backing stays in inflight_batch).
+        client.BatchWriteRecords(callback_data->table_name,
+                                 partition_state->partition_id,
+                                 std::move(callback_data->inflight_batch->key_parts),
+                                 std::move(callback_data->inflight_batch->record_parts),
+                                 std::move(callback_data->inflight_batch->records_ts),
+                                 std::move(callback_data->inflight_batch->records_ttl),
+                                 std::move(callback_data->inflight_batch->op_types),
+                                 true,  // skip_wal
+                                 callback_data,
+                                 PartitionBatchCallback,
+                                 callback_data->inflight_batch->parts_cnt_per_key,
+                                 callback_data->inflight_batch->parts_cnt_per_record);
     }
     else
     {
         // Notify the global coordinator that this partition completed
         global_coordinator->OnPartitionCompleted();
     }

Also applies to: 445-449, 414-417, 421-424

🤖 Prompt for AI Agents
In data_store_service_client_closure.cpp around lines 414-449, the local
PartitionBatchRequest next_batch contains record_parts that are string_views
into its record_tmp_mem_area which is destroyed when next_batch goes out of
scope, causing a UAF in the async RPC closure; fix by moving the
PartitionBatchRequest into callback_data->inflight_batch (assign with std::move)
before starting the async BatchWriteRecords so the backing memory remains alive
for the callback, update the BatchWriteRecords call to use
callback_data->inflight_batch.* members (or pass
std::move(callback_data->inflight_batch.whatever) as needed), and ensure
callback_data->inflight_batch is cleared/reset on RPC completion and also when
marking the partition failed (covering the other occurrences at 414-417,
421-424, and 445-449).

Comment on lines +345 to +371
struct PartitionCallbackData : public Poolable
{
PartitionFlushState *partition_state;
SyncPutAllData *global_coordinator;
std::string_view table_name;

PartitionCallbackData()
: partition_state(nullptr), global_coordinator(nullptr), table_name("")
{
}

void Reset(PartitionFlushState *ps,
SyncPutAllData *gc,
const std::string_view tn)
{
partition_state = ps;
global_coordinator = gc;
table_name = tn;
}

void Clear() override
{
partition_state = nullptr;
global_coordinator = nullptr;
table_name = "";
}
};

⚠️ Potential issue

Add inflight_batch to keep backing storage alive across async calls.

Hold the current PartitionBatchRequest until the RPC completes; reset it on the next callback or on failure.

 struct PartitionCallbackData : public Poolable
 {
     PartitionFlushState *partition_state;
     SyncPutAllData *global_coordinator;
     std::string_view table_name;
+    // Backing for string_view parts of the in-flight batch.
+    std::unique_ptr<PartitionBatchRequest> inflight_batch;

@@
     void Clear() override
     {
         partition_state = nullptr;
         global_coordinator = nullptr;
         table_name = "";
+        inflight_batch.reset();
     }
 };

Pair with the cpp change in PartitionBatchCallback to reset/assign this pointer.

🤖 Prompt for AI Agents
In data_store_service_client_closure.h around lines 345 to 371, the
PartitionCallbackData struct needs an additional member to keep the current
PartitionBatchRequest alive across async RPC calls; add a member like
PartitionBatchRequest* inflight_batch (or appropriate smart pointer), initialize
it in the constructor (nullptr), set it in Reset(ps,gc,tn, ...) or overload
Reset to accept the inflight pointer, and clear/reset it to nullptr in Clear();
this ensures the backing storage is retained until the RPC completes and can be
reset/assigned by PartitionBatchCallback as described in the paired C++ changes.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 4

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
data_store_service_client_closure.h (1)

24-37: Add missing standard headers (<functional>, <cassert>) to fix compile errors and tighten contracts.

std::function is used (Fetch*CallbackData), and assert is called in multiple closures but headers aren’t included here. Avoid relying on transitive includes.

Apply this diff:

 #include <brpc/channel.h>
 #include <bthread/condition_variable.h>
 #include <bthread/mutex.h>

 #include <memory>
 #include <queue>
+#include <functional>
+#include <cassert>
 #include <string>
 #include <utility>
 #include <vector>
♻️ Duplicate comments (2)
data_store_service_client.cpp (1)

201-206: Blocker: entries.front() UB on empty vector.

entries can be empty; front() would crash.

 for (auto &[kv_table_name, entries] : flush_task)
 {
-    auto &table_name = entries.front()->data_sync_task_->table_name_;
+    if (entries.empty()) {
+        DLOG(INFO) << "PutAll: skip empty entries for table " << kv_table_name;
+        continue;
+    }
+    auto &table_name = entries.front()->data_sync_task_->table_name_;
data_store_service_client_closure.h (1)

363-367: inflight_batch solves prior dangling string_view risks.

Holding the batch by value in PartitionCallbackData keeps backing storage alive across async RPCs and addresses the earlier UAF risk.

🧹 Nitpick comments (14)
data_store_service_client_closure.cpp (1)

177-179: Avoid logging raw binary keys; redact or bound output.

Logging read_closure->Key() directly can dump arbitrary/binary data and PII into logs. Log length/hash/hex-truncated form instead.
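A minimal sketch of such redaction (HexPreview is an illustrative helper, not an existing API):

```cpp
#include <string>
#include <string_view>

// Renders a possibly-binary key as its length plus at most max_bytes of hex,
// so logs never contain raw key bytes.
inline std::string HexPreview(std::string_view key, size_t max_bytes = 8)
{
    static const char *digits = "0123456789abcdef";
    std::string out = "len=" + std::to_string(key.size()) + " hex=";
    const size_t n = key.size() < max_bytes ? key.size() : max_bytes;
    for (size_t i = 0; i < n; ++i)
    {
        unsigned char c = static_cast<unsigned char>(key[i]);
        out.push_back(digits[c >> 4]);
        out.push_back(digits[c & 0xF]);
    }
    if (key.size() > max_bytes)
    {
        out += "...";
    }
    return out;
}
```

Usage would be `LOG(ERROR) << "read failed, key " << HexPreview(read_closure->Key());` in place of streaming the key directly.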

data_store_service_client.cpp (5)

150-162: Don’t assert in shipped code paths.

ScheduleTimerTasks() asserts; prefer a stub that returns or a feature flag to avoid crashing if accidentally called.

 void DataStoreServiceClient::ScheduleTimerTasks()
 {
-    LOG(ERROR) << "ScheduleTimerTasks not implemented";
-    assert(false);
+    LOG(WARNING) << "ScheduleTimerTasks not implemented";
 }

2219-2225: Confirm in-place BE mutation of commit_ts_ is safe.

You mutate ckpt_rec.commit_ts_ to big‑endian after pushing it into records_ts. Ensure no later code reads commit_ts_ assuming host‑endian, and that reuse of the same FlushRecord won’t observe the mutated value.
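A sketch of the non-mutating alternative: compute the byte-swapped copy into a separate variable and serialize that, leaving `commit_ts_` host-endian for any later reader. `ByteSwap64` is an illustrative stand-in; on little-endian hosts the swapped value carries the big-endian byte layout:

```cpp
#include <cstdint>

// Reverses the byte order of a 64-bit value, written out longhand.
inline uint64_t ByteSwap64(uint64_t v)
{
    uint64_t out = 0;
    for (int i = 0; i < 8; ++i)
    {
        out = (out << 8) | ((v >> (8 * i)) & 0xFF);
    }
    return out;
}
```

Pushing `ByteSwap64(ckpt_rec.commit_ts_)` into a separate serialization buffer keeps the record immutable and sidesteps the reuse question entirely.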


1997-2044: Size field width mismatch vs. base serialization.

Archive encoding uses sizeof(uint64_t) for size fields; base SerializeTxRecord uses sizeof(size_t). Ensure the server’s decoder expects 8‑byte sizes for archives; otherwise align both.


3814-3819: Use size_t for encoded_blob_size pointer (minor correctness/readability).

Pointer type should match the stored element type.

-    uint64_t *encoded_blob_size = &record_tmp_mem_area.back();
+    size_t *encoded_blob_size = &record_tmp_mem_area.back();
     record_parts.emplace_back(std::string_view(
         reinterpret_cast<const char *>(encoded_blob_size), sizeof(size_t)));

1926-1941: Docstring format mismatch with actual key layout.

Comment says log:item:{table_name}:{key}:{commit_ts}, but implementation uses backslash separators and binary ts. Update doc to avoid confusion.

data_store_service_client_closure.h (8)

155-163: Make Clear() symmetrical to Reset() and release queue capacity.

Clear() doesn’t reset failed or result. Also, popping elements doesn’t free capacity. Prefer swapping with an empty queue.

 void Clear() override
 {
     partition_id = 0;
-    while (!pending_batches.empty())
-    {
-        pending_batches.pop();
-    }
+    std::queue<PartitionBatchRequest>().swap(pending_batches);
+    failed = false;
+    result.Clear();
 }

358-385: Keep inflight batch memory tidy between uses.

Clear() should also clear inflight_batch to release any retained capacity before the object returns to the pool.

 void Clear() override
 {
     partition_state = nullptr;
     global_coordinator = nullptr;
     table_name = "";
+    inflight_batch.Clear();
 }

884-899: Duplicate cntl_.Reset() calls in Clear() methods.

FlushDataClosure::Clear, DeleteRangeClosure::Clear, and DropTableClosure::Clear call cntl_.Reset() twice; keep a single call.

-    cntl_.Reset();
-    request_.Clear();
-    response_.Clear();
-    cntl_.Reset();
+    cntl_.Reset();
+    request_.Clear();
+    response_.Clear();

Also applies to: 1096-1110, 1315-1330


1694-1697: Avoid asserts for runtime validation in request building.

These invariants can be violated by upstream input; asserting will crash debug builds and be a no-op in release. Prefer explicit checks that set an error and short‑circuit.

-        assert(record_ts_.size() * parts_cnt_per_key_ == key_parts_.size());
-        assert(record_ts_.size() * parts_cnt_per_record_ ==
-               record_parts_.size());
+        if (record_ts_.size() * parts_cnt_per_key_ != key_parts_.size() ||
+            record_ts_.size() * parts_cnt_per_record_ != record_parts_.size()) {
+            result_.set_error_code(EloqDS::remote::DataStoreError::INVALID_ARGUMENT);
+            result_.set_error_msg("Mismatched parts/records sizes");
+            (*callback_)(callback_data_, this, *ds_service_client_, result_);
+            return;
+        }

425-522: Minor: signal on every decrement could reduce tail latency.

DecreaseFlyingReadCount() only notifies when it hits zero. If producers block when the count exceeds a threshold, consider notifying on each decrement.


1010-1022: Consistent RPC timeouts.

BatchWriteRecordsClosure sets a 5s timeout; others rely on defaults. Consider consistent, configurable timeouts across all closures.

Also applies to: 1437-1449, 1871-1904


308-354: If you keep record_tmp_mem_area, document its invariants.

Document that it backs record_parts slices and must not be resized while in-flight. Consider renaming to record_tmp_storage for clarity.


308-354: Optional: provide noexcept where applicable.

Trivial ctors/dtors and accessors can be marked noexcept to aid codegen.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 5c44575 and 479388b.

📒 Files selected for processing (3)
  • data_store_service_client.cpp (42 hunks)
  • data_store_service_client_closure.cpp (4 hunks)
  • data_store_service_client_closure.h (13 hunks)
🧰 Additional context used
🧠 Learnings (2)
📓 Common learnings
Learnt from: liunyl
PR: eloqdata/store_handler#83
File: data_store_service_client_closure.cpp:408-448
Timestamp: 2025-09-17T11:08:35.322Z
Learning: In the DataStoreServiceClient batch write system, the global coordinator (SyncPutAllData) manages concurrency at the partition level, not at the individual batch level. The global unfinished_request_cnt_ is only decremented when an entire partition is fully completed or failed, not after each individual batch within a partition.

Applied to files:

  • data_store_service_client_closure.cpp
  • data_store_service_client_closure.h
  • data_store_service_client.cpp
🧬 Code graph analysis (2)
data_store_service_client_closure.h (3)
data_store_service_client.h (1)
  • DataStoreServiceClient (62-616)
eloq_data_store_service/object_pool.h (1)
  • Poolable (39-185)
data_store_service_client_closure.cpp (11)
  • lk (1436-1436)
  • GetNextBatch (1434-1444)
  • GetNextBatch (1434-1434)
  • SyncCallback (44-54)
  • SyncCallback (44-47)
  • SyncBatchReadForArchiveCallback (56-108)
  • SyncBatchReadForArchiveCallback (56-59)
  • SyncConcurrentRequestCallback (400-407)
  • SyncConcurrentRequestCallback (400-403)
  • PartitionBatchCallback (409-450)
  • PartitionBatchCallback (409-412)
data_store_service_client.cpp (3)
data_store_service_client.h (1)
  • ScheduleTimerTasks (113-134)
eloq_data_store_service/data_store_service.cpp (11)
  • BatchWriteRecords (768-814)
  • BatchWriteRecords (768-772)
  • BatchWriteRecords (1081-1143)
  • BatchWriteRecords (1081-1093)
  • lk (81-81)
  • lk (95-95)
  • lk (106-106)
  • lk (124-124)
  • lk (152-152)
  • lk (158-158)
  • lk (164-164)
data_store_service_client_closure.cpp (5)
  • PartitionBatchCallback (409-450)
  • PartitionBatchCallback (409-412)
  • lk (1436-1436)
  • SyncConcurrentRequestCallback (400-407)
  • SyncConcurrentRequestCallback (400-403)
🔇 Additional comments (14)
data_store_service_client_closure.cpp (3)

400-406: LGTM: minimal concurrent request callback.

Casting to SyncConcurrentRequest and delegating to Finish(result) is correct and keeps the accounting centralized.


409-450: Good per-partition chaining; backing storage survives RPC.

Using callback_data->inflight_batch as the sink for GetNextBatch ensures record_tmp_mem_area remains alive while record_parts views reside in the in-flight RPC. This respects partition-level coordination.

Please confirm BatchWriteRecords constructs its request buffers before invoking this callback (so overwriting inflight_batch at the start of the next callback can’t race with an outstanding send).


1434-1445: LGTM: safe move-out from queue with locking.

GetNextBatch moves the batch under mutex and pops it—simple and safe.

data_store_service_client.cpp (2)

68-71: LGTM: pools for new concurrency primitives.

Thread‑local pools for SyncConcurrentRequest, PartitionFlushState, and PartitionCallbackData fit the hot-path usage.


309-349: Fix leak: free PartitionCallbackData on early return; PartitionFlushState is already reclaimed by SyncPutAllData::Clear()

  • PartitionFlushState objects are already cleared/freed in SyncPutAllData::Clear(), invoked by the PoolableGuard (sync_putall_guard); manually freeing partition_states_ here is redundant/incorrect.
  • Critical: callback_data_list is not freed when PutAll returns early on partition failure — this leaks PartitionCallbackData. Free them before returning (or use RAII).

Location: data_store_service_client.cpp — partition error check (around lines 351–358).

Suggested change (free callback_data_list before the early return):

if (partition_state->IsFailed())
{
    LOG(ERROR) << ...;
    for (auto *cb : callback_data_list) { cb->Clear(); cb->Free(); }
    callback_data_list.clear();
    return false;
}

Likely an incorrect or invalid review comment.
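The RAII alternative mentioned above can be sketched against a stand-in for the pool interface (FakePoolable and PoolGuard are illustrative, not the project's types):

```cpp
#include <vector>

// Stand-in for a Poolable object with the Clear()+Free() return contract.
struct FakePoolable
{
    bool cleared = false;
    bool freed = false;
    void Clear() { cleared = true; }
    void Free() { freed = true; }
};

// Returns every pooled object on scope exit, covering early returns and
// error paths without repeating the cleanup loop at each return site.
struct PoolGuard
{
    std::vector<FakePoolable *> &objs;
    explicit PoolGuard(std::vector<FakePoolable *> &o) : objs(o) {}
    ~PoolGuard()
    {
        for (auto *p : objs)
        {
            p->Clear();
            p->Free();
        }
        objs.clear();
    }
};
```

With a guard over `callback_data_list`, the early `return false` on partition failure no longer needs its own cleanup block.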

data_store_service_client_closure.h (9)

206-245: Partition-level completion tracking matches the intended design.

OnPartitionCompleted() increments once per partition and signals when all partitions finish, aligning with the per‑partition concurrency model.

Please confirm that callers only invoke OnPartitionCompleted() from (a) terminal failure of a partition or (b) after the last batch drains, never per batch. This matches the retrieved learning for this system.


283-298: Recheck wake-up condition in SyncConcurrentRequest::Finish for producer throttling.

Notifying when unfinished_request_cnt_ == max_flying_write_count - 1 assumes the producer waits on “>= max.” If the producer waits on “== max,” it’s correct; if it waits on “>= max_flying_write_count,” also fine. Just ensure the wait predicate matches this notify threshold.

If needed, I can scan the codebase to confirm the wait predicate and propose a precise predicate/notify pair.


2497-2512: Callback renaming looks good; types line up.

SyncConcurrentRequestCallback supersedes the old naming and correctly updates to take SyncConcurrentRequest*.


400-404: Function declarations: interface surface LGTM.

Signatures and parameter contracts are consistent and readable.

Also applies to: 535-539, 552-556, 2486-2490, 2660-2664, 2733-2737, 2744-2748, 2755-2759, 2765-2769, 2776-2780, 2825-2829, 2836-2840, 2847-2851, 2890-2894


577-869: ReadClosure: lifetime of string_view parameters.

table_name_ and key_ are string_views; ensure the passed buffers outlive the async flow.

I can scan call sites to confirm they reference stable storage (e.g., table registry strings).


1761-1790: BatchWriteRecordsClosure fields and flow look solid.

Move semantics and retry handling are correct; request assembly respects parts counts.


2465-2484: FetchTable/Database callback data: pointer ownership clear and Clear/Reset symmetry good.

No issues.

Also applies to: 2570-2574


279-306: Signed counter: good call-out.

unfinished_request_cnt_ is signed; avoids underflow on decrement.


358-385: Partitioned PutAll flow aligns with retrieved learning.

Per-partition serialization with global fan-out and a single completion signal per partition is implemented correctly via PartitionBatchCallback and SyncPutAllData.

Please confirm tests cover:

  • partition failure mid-stream triggers exactly one OnPartitionCompleted();
  • last-batch success path signals completion;
  • no double-finishes on retry paths.

Also applies to: 206-245, 408-449

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
data_store_service_client.cpp (2)

1942-1981: Unaligned read via reinterpret_cast in DecodeArchiveKey; use memcpy.

Reading uint64_t from std::string with reinterpret_cast is UB on some arch. Copy to a local first.

Apply:

-    be_commit_ts =
-        *reinterpret_cast<const uint64_t *>(archive_key.data() + ts_pos);
+    std::memcpy(&be_commit_ts, archive_key.data() + ts_pos, sizeof(be_commit_ts));
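The memcpy idiom generalizes to a small helper; modern compilers lower it to a single load where the target allows (LoadU64 is illustrative, not from this codebase):

```cpp
#include <cstdint>
#include <cstring>

// Reads a uint64_t from an arbitrary, possibly unaligned buffer position.
// memcpy is the well-defined way to type-pun bytes in C++.
inline uint64_t LoadU64(const char *p)
{
    uint64_t v;
    std::memcpy(&v, p, sizeof(v));
    return v;
}
```

The decode site becomes `be_commit_ts = LoadU64(archive_key.data() + ts_pos);`, which is both safe at any offset and easier to audit than a scattered cast.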

2024-2043: Size-field width mismatch (uint64_t vs size_t) in EncodeArchiveValue.

You store size_t but take 8 bytes (sizeof(uint64_t)), which will corrupt on 32‑bit builds. Use sizeof(size_t) for both size fields to match the backing buffer type, or standardize both to a fixed 64‑bit field end‑to‑end.

Apply:

-        record_parts.emplace_back(
-            std::string_view(reinterpret_cast<const char *>(&unpack_info_size),
-                             sizeof(uint64_t)));
+        record_parts.emplace_back(
+            std::string_view(reinterpret_cast<const char *>(&unpack_info_size),
+                             sizeof(size_t)));
...
-        record_parts.emplace_back(
-            std::string_view(reinterpret_cast<const char *>(&encoded_blob_size),
-                             sizeof(uint64_t)));
+        record_parts.emplace_back(
+            std::string_view(reinterpret_cast<const char *>(&encoded_blob_size),
+                             sizeof(size_t)));

If you prefer fixed width, change the temp buffer to std::vector<uint64_t> and pass sizeof(uint64_t) consistently in both write and read paths.

♻️ Duplicate comments (2)
data_store_service_client.cpp (2)

2249-2280: Final-batch in-flight gating increments after dispatch; pre-increment under lock.

Increment unfinished_request_cnt_ and gate before sending the last batch to avoid overshooting the cap.

Apply:

-        if (keys.size() > 0)
-        {
-            BatchWriteRecords(...,
-                              sync_concurrent,
-                              SyncConcurrentRequestCallback,
-                              parts_cnt_per_key,
-                              parts_cnt_per_record);
+        if (keys.size() > 0)
+        {
+            // Gate and account one in-flight request
+            {
+                std::unique_lock<bthread::Mutex> lk(sync_concurrent->mux_);
+                while (sync_concurrent->unfinished_request_cnt_ >=
+                       SyncConcurrentRequest::max_flying_write_count) {
+                    sync_concurrent->cv_.wait(lk);
+                }
+                ++sync_concurrent->unfinished_request_cnt_;
+            }
+            BatchWriteRecords(...,
+                              sync_concurrent,
+                              SyncConcurrentRequestCallback,
+                              parts_cnt_per_key,
+                              parts_cnt_per_record);
             ...
-            {
-                std::unique_lock<bthread::Mutex> lk(sync_concurrent->mux_);
-                sync_concurrent->unfinished_request_cnt_++;
-            }

200-206: Guard against empty entries before using entries.front().

entries.front() is UB when entries is empty.

Apply:

-        auto &table_name = entries.front()->data_sync_task_->table_name_;
+        if (entries.empty()) {
+            DLOG(INFO) << "PutAll: skip empty entries for table " << kv_table_name;
+            continue;
+        }
+        auto &table_name = entries.front()->data_sync_task_->table_name_;
🧹 Nitpick comments (4)
data_store_service_client_closure.h (1)

392-398: Release inflight batch memory on Clear().

Call inflight_batch.Clear() to drop reserved capacity between uses.

Apply:

     void Clear() override
     {
         partition_state = nullptr;
         global_coordinator = nullptr;
         table_name = "";
+        inflight_batch.Clear();
     }
data_store_service_client.cpp (3)

3822-3833: Unaligned/aliasing read of bool in DeserializeTxRecordStr.

Avoid reinterpret_cast on potentially unaligned record.data()+offset. Use std::memcpy.

Apply:

-    is_deleted = *reinterpret_cast<const bool *>(record.data() + offset);
+    std::memcpy(&is_deleted, record.data() + offset, sizeof(is_deleted));

1888-1924: Archive key assembly: separator + BE timestamp is fine; consider escaping if keys can contain the separator.

Minor note only.


151-162: Unimplemented ScheduleTimerTasks(): assert(false).

Consider returning an error early instead of asserting in production builds.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 479388b and 3be1e0b.

📒 Files selected for processing (2)
  • data_store_service_client.cpp (42 hunks)
  • data_store_service_client_closure.h (13 hunks)
🧰 Additional context used
🧠 Learnings (2)
📓 Common learnings
Learnt from: liunyl
PR: eloqdata/store_handler#83
File: data_store_service_client_closure.cpp:408-448
Timestamp: 2025-09-17T11:08:35.322Z
Learning: In the DataStoreServiceClient batch write system, the global coordinator (SyncPutAllData) manages concurrency at the partition level, not at the individual batch level. The global unfinished_request_cnt_ is only decremented when an entire partition is fully completed or failed, not after each individual batch within a partition.
📚 Learning: 2025-09-17T11:08:35.322Z
Learnt from: liunyl
PR: eloqdata/store_handler#83
File: data_store_service_client_closure.cpp:408-448
Timestamp: 2025-09-17T11:08:35.322Z
Learning: In the DataStoreServiceClient batch write system, the global coordinator (SyncPutAllData) manages concurrency at the partition level, not at the individual batch level. The global unfinished_request_cnt_ is only decremented when an entire partition is fully completed or failed, not after each individual batch within a partition.

Applied to files:

  • data_store_service_client_closure.h
  • data_store_service_client.cpp
🧬 Code graph analysis (2)
data_store_service_client_closure.h (3)
data_store_service_client.h (1)
  • DataStoreServiceClient (62-616)
eloq_data_store_service/object_pool.h (1)
  • Poolable (39-185)
data_store_service_client_closure.cpp (10)
  • GetNextBatch (1434-1444)
  • GetNextBatch (1434-1434)
  • SyncCallback (44-54)
  • SyncCallback (44-47)
  • SyncBatchReadForArchiveCallback (56-108)
  • SyncBatchReadForArchiveCallback (56-59)
  • SyncConcurrentRequestCallback (400-407)
  • SyncConcurrentRequestCallback (400-403)
  • PartitionBatchCallback (409-450)
  • PartitionBatchCallback (409-412)
data_store_service_client.cpp (5)
data_store_service_client.h (1)
  • ScheduleTimerTasks (113-134)
eloq_data_store_service/data_store_service.cpp (11)
  • BatchWriteRecords (768-814)
  • BatchWriteRecords (768-772)
  • BatchWriteRecords (1081-1143)
  • BatchWriteRecords (1081-1093)
  • lk (81-81)
  • lk (95-95)
  • lk (106-106)
  • lk (124-124)
  • lk (152-152)
  • lk (158-158)
  • lk (164-164)
eloq_data_store_service/eloq_store_data_store.cpp (2)
  • BatchWriteRecords (164-231)
  • BatchWriteRecords (164-164)
data_store_service_client_closure.cpp (5)
  • PartitionBatchCallback (409-450)
  • PartitionBatchCallback (409-412)
  • lk (1436-1436)
  • SyncConcurrentRequestCallback (400-407)
  • SyncConcurrentRequestCallback (400-403)
data_store_service_client_closure.h (10)
  • string_view (311-366)
  • string_view (745-748)
  • string_view (755-758)
  • string_view (780-785)
  • string_view (1257-1260)
  • string_view (1482-1485)
  • string_view (2013-2016)
  • string_view (2018-2021)
  • string_view (2116-2121)
  • string_view (2330-2333)
🔇 Additional comments (10)
data_store_service_client_closure.h (1)

267-299: Notify gating waiters only when capacity frees up.

The clause unfinished_request_cnt_ == max_flying_write_count - 1 in Finish() is fine, just ensure all dispatch sites wait while >= limit and pre-increment under the same lock (see paired cpp comment).

data_store_service_client.cpp (9)

311-349: Confirm concurrency policy: one in-flight per partition, unlimited partitions.

This sends one batch per partition concurrently (no global cap). Verify this is intentional for large partition counts (e.g., 1k+), or add an optional global limiter.

Would you like a patch that gates the number of concurrently active partitions (e.g., semaphore with cap)?


4192-4331: Batch allocator capacity logic prevents reallocation of size-backing buffer.

Good: record_tmp_mem_area.capacity() check prevents invalidating record_parts string_views. Keep it as is after the type/order fix above.


614-747: Table statistics segmentation logic is sound; per-segment RPC dispatch is correct.

The MAX_WRITE_BATCH_SIZE guard and per-segment write loop look good.


2819-2865: Remote snapshot path handles missing channel by skipping shard.

LGTM. Timeout and retry handling are appropriate.


88-96: Destructor: notifying shutdown CV is safe and idempotent.

Looks good.


3670-3754: Local vs RPC write path split is clean; asserts on part counts help catch misuse.

No issues.


49-71: Thread‑local pools new additions look correct.

Pools for new closure/callback types are fine.


98-121: Connect() retry loop is reasonable.

Fine for now.


1396-1441: UpsertRanges: flush after per‑range writes is correct; error logging is clear.

LGTM.

Comment on lines 34 to 36
#include "data_store_service_client.h"
#include "data_store_service_scanner.h"
#include "eloq_data_store_service/data_store_service.h"
#include "eloq_data_store_service/object_pool.h"

⚠️ Potential issue

Break potential include cycle; rely on forward decl only.

#include "data_store_service_client.h" plus a forward declaration of DataStoreServiceClient risks circular inclusion. Drop the include here and keep only the forward decl.

Apply:

-#include "data_store_service_client.h"
+#if 0
+#include "data_store_service_client.h"  // avoid circular include; forward-declare below instead
+#endif

Also applies to: 38-42

🤖 Prompt for AI Agents
In data_store_service_client_closure.h around lines 34-36 (and similarly lines
38-42), remove the #include "data_store_service_client.h" and replace its usage
with a forward declaration of class DataStoreServiceClient; keep the other
includes intact (eloq_data_store_service/data_store_service.h and
object_pool.h). Ensure any pointer/reference members or function signatures in
this header refer to DataStoreServiceClient by forward declaration only, and
move the include of the concrete header into the corresponding .cpp file where
full definition is required.

Comment on lines +128 to +184
/**
 * @brief Per-partition state management for concurrent flushing
 */
struct PartitionFlushState : public Poolable
{
    int32_t partition_id;
    std::queue<PartitionBatchRequest> pending_batches;
    bool failed = false;
    remote::CommonResult result;
    mutable bthread::Mutex mux;

    PartitionFlushState() : partition_id(0)
    {
        result.Clear();
    }

    void Reset(int32_t pid)
    {
        partition_id = pid;
        while (!pending_batches.empty())
        {
            pending_batches.pop();
        }
        failed = false;
        result.Clear();
    }

    void Clear() override
    {
        partition_id = 0;
        while (!pending_batches.empty())
        {
            pending_batches.pop();
        }
    }

    bool IsFailed() const
    {
        std::unique_lock<bthread::Mutex> lk(mux);
        return failed;
    }

    void MarkFailed(const remote::CommonResult &error)
    {
        std::unique_lock<bthread::Mutex> lk(mux);
        failed = true;
        result.set_error_code(error.error_code());
        result.set_error_msg(error.error_msg());
    }

    bool GetNextBatch(PartitionBatchRequest &batch);

    void AddBatch(PartitionBatchRequest &&batch)
    {
        std::unique_lock<bthread::Mutex> lk(mux);
        pending_batches.push(std::move(batch));
    }
};

⚠️ Potential issue

Compile blocker: PartitionFlushState uses PartitionBatchRequest before it is defined.

std::queue<PartitionBatchRequest> requires a complete type; current order won’t compile. Move PartitionBatchRequest above PartitionFlushState (preferred), or change the queue to hold std::unique_ptr<PartitionBatchRequest>.

Preferred fix (reorder):

-// PartitionFlushState (current position)
+// ---- Move PartitionBatchRequest above PartitionFlushState ----
+struct PartitionBatchRequest
+{
+    std::vector<std::string_view> key_parts;
+    std::vector<std::string_view> record_parts;
+    std::vector<uint64_t> records_ts;
+    std::vector<uint64_t> records_ttl;
+    std::vector<size_t> record_tmp_mem_area;
+    std::vector<WriteOpType> op_types;
+    uint16_t parts_cnt_per_key;
+    uint16_t parts_cnt_per_record;
+    PartitionBatchRequest() = default;
+    PartitionBatchRequest(std::vector<std::string_view> &&keys,
+                          std::vector<std::string_view> &&records,
+                          std::vector<uint64_t> &&ts,
+                          std::vector<uint64_t> &&ttl,
+                          std::vector<size_t> &&record_tmp_mem_area,
+                          std::vector<WriteOpType> &&ops,
+                          uint16_t key_parts_count,
+                          uint16_t record_parts_count)
+        : key_parts(std::move(keys)),
+          record_parts(std::move(records)),
+          records_ts(std::move(ts)),
+          records_ttl(std::move(ttl)),
+          record_tmp_mem_area(std::move(record_tmp_mem_area)),
+          op_types(std::move(ops)),
+          parts_cnt_per_key(key_parts_count),
+          parts_cnt_per_record(record_parts_count) {}
+    void Clear() {
+        key_parts.clear(); record_parts.clear(); records_ts.clear();
+        records_ttl.clear(); record_tmp_mem_area.clear(); op_types.clear();
+        parts_cnt_per_key = 1; parts_cnt_per_record = 1;
+    }
+    void Reset(uint16_t key_parts_count, uint16_t record_parts_count, size_t record_cnt) {
+        Clear();
+        parts_cnt_per_key = key_parts_count;
+        parts_cnt_per_record = record_parts_count;
+        key_parts.reserve(key_parts_count * record_cnt);
+        record_parts.reserve(record_parts_count * record_cnt);
+        records_ts.reserve(record_cnt);
+        records_ttl.reserve(record_cnt);
+        record_tmp_mem_area.reserve(record_cnt * 2);
+        op_types.reserve(record_cnt);
+    }
+};
+
+// PartitionFlushState (move below)
 struct PartitionFlushState : public Poolable
 {
     int32_t partition_id;
     std::queue<PartitionBatchRequest> pending_batches;
     bool failed = false;
     remote::CommonResult result;
     mutable bthread::Mutex mux;
     ...
 };

Alternative (keep order):

-    std::queue<PartitionBatchRequest> pending_batches;
+    std::queue<std::unique_ptr<PartitionBatchRequest>> pending_batches;

…and adjust GetNextBatch/AddBatch signatures and call sites.

Also applies to: 311-367


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🧹 Nitpick comments (1)
eloq_data_store_service/eloq_store_config.cpp (1)

88-89: Default halved; add input validation and document bounds.

Changing the default to 2 is fine, but guard against 0 and reflect the minimum in the flag help.

Apply:

-DEFINE_uint32(eloq_store_file_amplify_factor,
-              2,
-              "EloqStore file amplify factor.");
+DEFINE_uint32(eloq_store_file_amplify_factor,
+              2,
+              "EloqStore file amplify factor (min 1).");

And after assigning file_amplify_factor:

@@
     eloqstore_configs_.file_amplify_factor =
         !CheckCommandLineFlagIsDefault("eloq_store_file_amplify_factor")
             ? FLAGS_eloq_store_file_amplify_factor
             : config_reader.GetInteger("store",
                                        "eloq_store_file_amplify_factor",
                                        FLAGS_eloq_store_file_amplify_factor);
+    if (eloqstore_configs_.file_amplify_factor == 0) {
+        LOG(WARNING) << "eloq_store_file_amplify_factor must be >= 1; clamping to 1";
+        eloqstore_configs_.file_amplify_factor = 1;
+    }
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 3be1e0b and 031ae08.

📒 Files selected for processing (3)
  • eloq_data_store_service/eloq_store_config.cpp (1 hunks)
  • eloq_data_store_service/eloq_store_data_store.cpp (1 hunks)
  • eloq_data_store_service/eloq_store_data_store.h (1 hunks)
🧰 Additional context used
🧠 Learnings (1)
📓 Common learnings
Learnt from: liunyl
PR: eloqdata/store_handler#83
File: data_store_service_client_closure.cpp:408-448
Timestamp: 2025-09-17T11:08:35.322Z
Learning: In the DataStoreServiceClient batch write system, the global coordinator (SyncPutAllData) manages concurrency at the partition level, not at the individual batch level. The global unfinished_request_cnt_ is only decremented when an entire partition is fully completed or failed, not after each individual batch within a partition.
🧬 Code graph analysis (2)
eloq_data_store_service/eloq_store_data_store.h (4)
eloq_data_store_service/eloq_store_data_store.cpp (2)
  • CreateSnapshotForBackup (498-502)
  • CreateSnapshotForBackup (498-499)
eloq_data_store_service/rocksdb_cloud_data_store.cpp (2)
  • CreateSnapshotForBackup (626-692)
  • CreateSnapshotForBackup (626-627)
data_store_service_client.cpp (2)
  • CreateSnapshotForBackup (2777-2807)
  • CreateSnapshotForBackup (2777-2780)
eloq_data_store_service/data_store_service.cpp (4)
  • CreateSnapshotForBackup (1145-1174)
  • CreateSnapshotForBackup (1145-1149)
  • CreateSnapshotForBackup (1176-1205)
  • CreateSnapshotForBackup (1176-1182)
eloq_data_store_service/eloq_store_data_store.cpp (2)
eloq_data_store_service/rocksdb_cloud_data_store.cpp (2)
  • CreateSnapshotForBackup (626-692)
  • CreateSnapshotForBackup (626-627)
eloq_data_store_service/data_store_service.cpp (4)
  • CreateSnapshotForBackup (1145-1174)
  • CreateSnapshotForBackup (1145-1149)
  • CreateSnapshotForBackup (1176-1205)
  • CreateSnapshotForBackup (1176-1182)

Comment on lines +498 to +503
void EloqStoreDataStore::CreateSnapshotForBackup(
    CreateSnapshotForBackupRequest *req)
{
    return;
}


⚠️ Potential issue

Bug: request never finished in CreateSnapshotForBackup.

This no‑op doesn’t call SetFinish, leaking the pooled request and blocking the caller.

Apply:

 void EloqStoreDataStore::CreateSnapshotForBackup(
     CreateSnapshotForBackupRequest *req)
 {
-    return;
+    PoolableGuard req_guard(req);
+    req->SetFinish(::EloqDS::remote::DataStoreError::CREATE_SNAPSHOT_ERROR,
+                   "CreateSnapshotForBackup is not supported by EloqStoreDataStore");
 }
🤖 Prompt for AI Agents
In eloq_data_store_service/eloq_store_data_store.cpp around lines 498-503, the
CreateSnapshotForBackup method currently returns without finishing the pooled
request which leaks the request and blocks the caller; modify the method to call
the request completion API (e.g., req->SetFinish(...) or the appropriate
req->Finish/Complete method) with a success Status when snapshot creation
succeeds or with an error Status when it fails, ensuring the method always
finishes the request before returning so the pooled request is released.

Comment on lines +255 to +256
void CreateSnapshotForBackup(CreateSnapshotForBackupRequest *req) override;


⚠️ Potential issue

Do not leave CreateSnapshotForBackup as a no‑op; callers will hang.

DataStoreService expects the DS implementation to call req->SetFinish(...). The current no‑op in the .cpp never completes the request, which can deadlock client waiters.

🤖 Prompt for AI Agents
In eloq_data_store_service/eloq_store_data_store.h around lines 255-256,
CreateSnapshotForBackup is declared but the current implementation left it as a
no-op causing callers to hang; implement the method to perform the snapshot work
(sync or async) and always complete the incoming request by invoking
req->SetFinish(...) with the appropriate result/status when done or on error; if
snapshot work is asynchronous, capture the req pointer and call
req->SetFinish(...) from the completion callback (ensuring thread-safety), and
if synchronous, call req->SetFinish(...) before returning so no caller remains
blocked.



Development

Successfully merging this pull request may close these issues.

PutAll should write data into different partitions concurrently.
