Move the CreateTxKey and CreateTxRecord function into CatalogFactory to support multi table engines #85

Merged
lzxddz merged 4 commits into main from create_tx_key
Sep 24, 2025

@lzxddz lzxddz commented Sep 17, 2025

Related PR:
eloqdata/tx-log-protos#8
eloqdata/log_service#24
https://github.com/eloqdata/eloq_log_service/pull/19
eloqdata/tx_service#137
#85

eloqdata/eloqsql#111
eloqdata/eloqdoc#188
eloqdata/eloqkv#172

Summary by CodeRabbit

  • New Features

    • Catalog-aware operations across client and scanners for consistent key/record creation and serialization.
    • New API to retrieve the appropriate per-engine catalog factory.
  • Refactor

    • Public APIs updated to accept catalog factory parameters and revised serialization signatures.
    • Scanner and tuple initialization now use per-tuple catalog-created key/record instances.
    • Removed legacy archive-key decoding and unified infinity-key handling.

coderabbitai bot commented Sep 17, 2025

Walkthrough

Replaces direct TxKey/TxRecord factory usage with a per-engine CatalogFactory across the client, closures, scanners, and tuples. Updates the EncodeRangeKey and SerializeTxRecord signatures to take a catalog factory and per-record size outputs, removes DecodeArchiveKey, and changes the ScanHeapTuple and scanner constructors to accept factory-created keys and records.

Changes

| Cohort | File(s) | Summary |
| --- | --- | --- |
| Client header: CatalogFactory integration | data_store_service_client.h | Adds constructor taking an array of 3 CatalogFactory*, introduces internal range/hash factories and catalog_factory_array_, adds GetCatalogFactory(TableEngine), changes EncodeRangeKey to take a CatalogFactory*, and removes DecodeArchiveKey. |
| Client impl: encoding & serialization updates | data_store_service_client.cpp | Propagates CatalogFactory to encoding/serialization paths, switches key/record creation to catalog_factory->CreateTxKey/CreateTxRecord, centralizes NegInf handling via catalog_factory->PackedNegativeInfinity()/NegativeInfKey, updates EncodeRangeKey and two SerializeTxRecord overloads to new signatures (per-record size outputs), removes DecodeArchiveKey, and adds runtime null-asserts for catalog_factory. |
| Closure callbacks: per-engine factory usage | data_store_service_client_closure.cpp | Callbacks now call client.GetCatalogFactory(...) and use returned catalog_factory for CreateTxKey/CreateTxRecord/NegativeInfKey instead of static factories; adds null checks. |
| Scanner: constructor and tuple building | data_store_service_scanner.h, data_store_service_scanner.cpp | DataStoreServiceHashPartitionScanner constructor now accepts/stores const CatalogFactory*; scanner creates per-tuple TxKey/TxRecord from the factory; start-key infinity handled via KeyType checks. |
| Scan tuple construction update | kv_store.h | ScanHeapTuple<txservice::TxKey, txservice::TxRecord> constructor changed to accept std::unique_ptr key and record (created by CatalogFactory) and initialize state; removed default in-constructor factory construction. |
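
The shape of the change summarized above can be sketched in a few lines. This is a minimal illustration, not the project's code: `TxKey`, `TxRecord`, `CatalogFactory`, and `ScanTuple` here are simplified stand-ins, and the two engines (`SqlFactory`, `KvFactory`) and the `Engine()` accessor are hypothetical names invented for the example.

```cpp
#include <cassert>
#include <cstdint>
#include <memory>
#include <string>
#include <utility>

// Hypothetical stand-ins for txservice::TxKey / TxRecord / CatalogFactory.
struct TxKey {
    virtual ~TxKey() = default;
    virtual std::string Engine() const = 0;  // illustration only
};
struct TxRecord {
    virtual ~TxRecord() = default;
};
struct CatalogFactory {
    virtual ~CatalogFactory() = default;
    virtual std::unique_ptr<TxKey> CreateTxKey() const = 0;
    virtual std::unique_ptr<TxRecord> CreateTxRecord() const = 0;
};

// Two illustrative engines, each producing its own key type.
struct SqlKey : TxKey {
    std::string Engine() const override { return "sql"; }
};
struct KvKey : TxKey {
    std::string Engine() const override { return "kv"; }
};
struct PlainRecord : TxRecord {};

struct SqlFactory : CatalogFactory {
    std::unique_ptr<TxKey> CreateTxKey() const override {
        return std::make_unique<SqlKey>();
    }
    std::unique_ptr<TxRecord> CreateTxRecord() const override {
        return std::make_unique<PlainRecord>();
    }
};
struct KvFactory : CatalogFactory {
    std::unique_ptr<TxKey> CreateTxKey() const override {
        return std::make_unique<KvKey>();
    }
    std::unique_ptr<TxRecord> CreateTxRecord() const override {
        return std::make_unique<PlainRecord>();
    }
};

// The tuple no longer builds its own key/record. It takes ownership of
// whatever the engine's factory produced and initializes its other state.
struct ScanTuple {
    ScanTuple(std::unique_ptr<TxKey> key, std::unique_ptr<TxRecord> rec)
        : key_(std::move(key)),
          rec_(std::move(rec)),
          version_ts_(0),
          deleted_(false) {}

    std::unique_ptr<TxKey> key_;
    std::unique_ptr<TxRecord> rec_;
    std::uint64_t version_ts_;
    bool deleted_;
};

// A scanner-side helper: the factory decides the concrete types.
ScanTuple MakeTuple(const CatalogFactory &factory) {
    return ScanTuple(factory.CreateTxKey(), factory.CreateTxRecord());
}
```

Calling `MakeTuple` with a different factory yields a tuple whose key has that engine's type, which is the property the scanner and closure changes rely on.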

Sequence Diagram(s)

sequenceDiagram
  autonumber
  participant Client as DataStoreServiceClient
  participant Catalog as CatalogFactory
  participant Serializer as SerializeTxRecord
  participant Batch as WriteBatch

  rect rgba(229,240,255,0.5)
    note over Client: Put/Write flow now catalog-aware
    Client->>Catalog: GetCatalogFactory(table_engine)
    Client->>Catalog: CreateTxKey(...) / CreateTxRecord(...)
    Client->>Serializer: SerializeTxRecord(rec, &unpack_info_size, &encoded_blob_size, parts, &wb_size)
    Serializer-->>Client: parts + sizes
    Client->>Batch: Append(parts)
  end
sequenceDiagram
  autonumber
  participant Scanner as HashPartitionScanner
  participant Catalog as CatalogFactory
  participant Tuple as ScanHeapTuple
  participant Store as DataStore

  rect rgba(235,255,235,0.5)
    note over Scanner: Scan initialization builds tuples via factory
    Scanner->>Catalog: CreateTxKey(...) / CreateTxRecord(...)
    Scanner->>Tuple: Construct(shard_id, key, record)
    Scanner->>Store: FetchNext(batch)
    Store-->>Scanner: KV pairs -> fill Tuple
  end

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Suggested reviewers

  • liunyl

Poem

A rabbit found a factory gate,
Engines hum and keys create.
Records tucked in neat byte stacks,
Hopping through ranges, leaving tracks.
Cheerful hops — catalog's update! 🥕

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
| --- | --- | --- | --- |
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 31.82% which is insufficient. The required threshold is 80.00%. | You can run @coderabbitai generate docstrings to improve docstring coverage. |

✅ Passed checks (2 passed)

| Check name | Status | Explanation |
| --- | --- | --- |
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
| Title Check | ✅ Passed | The title accurately and concisely describes the primary change in the PR: moving CreateTxKey and CreateTxRecord into CatalogFactory to enable support for multiple table engines, which matches the provided diffs and PR objectives that propagate catalog_factory usage and update related APIs and constructors. |

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 5

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
data_store_service_client.cpp (1)

3421-3442: Safety: replace remaining sizeof(size_t) uses in on‑wire serialization with fixed-width uint64_t

Remaining on‑wire uses of sizeof(size_t) were found — update writers and readers to a fixed 64‑bit size and adjust casts/consumers.

  • store_util.h: lines ~122–126 (vec size/read uses sizeof(size_t))
  • store_util.cpp: lines ~82 and ~105–109 (vector length read/write)
  • data_store_service_client.cpp: lines ~3432 and ~3438 (unpack_info_size / encoded_blob_size written with sizeof(size_t))
  • bigtable_handler.cpp: lines ~936 and ~952 (appending unpack_info_size / encoded_blob_len with sizeof(size_t))

Action: replace sizeof(size_t) with sizeof(uint64_t), change related reinterpret_casts to uint64_t, and update all readers to consume uint64_t; add wire‑compatibility tests.
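
As a rough sketch of what a fixed-width size field looks like, the helpers below always write and read exactly 8 bytes, whatever the platform's `size_t` width is. `AppendSize` and `ReadSize` are hypothetical names for this example, and the bytes are still host-endian, so this addresses width only, not byte order.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <string>

// Append a length as exactly 8 bytes, independent of sizeof(size_t).
// Byte order is the host's; this sketch fixes the width only.
void AppendSize(std::string &out, std::size_t n) {
    const std::uint64_t fixed = static_cast<std::uint64_t>(n);
    out.append(reinterpret_cast<const char *>(&fixed), sizeof(fixed));
}

// Read an 8-byte length back. Returns false if the buffer is too short,
// so a truncated value is reported instead of read past the end.
bool ReadSize(const std::string &in, std::size_t offset, std::uint64_t &n) {
    if (offset > in.size() || in.size() - offset < sizeof(std::uint64_t)) {
        return false;
    }
    std::memcpy(&n, in.data() + offset, sizeof(n));
    return true;
}
```

Readers and writers must change together: a reader that still consumes `sizeof(size_t)` bytes would misparse the field on any platform where `size_t` is not 64 bits wide.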

🧹 Nitpick comments (8)
data_store_service_client.h (1)

628-633: Centralize engine count and mapping

Hard-coded size 5 will drift. Define a single source of truth (constexpr size or an Engine→index helper) shared with TxService.
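
One way to get a single source of truth is to derive the array size from the enum and route every lookup through one helper. The sketch below assumes a hypothetical `Engine` enum with five real engines after `None`; the actual `TableEngine` values and ordering may differ, and `Factory` is a placeholder type.

```cpp
#include <array>
#include <cassert>
#include <cstddef>

// Hypothetical engine enum; the real TableEngine may be laid out differently.
enum class Engine : int {
    None = 0,
    First,
    Second,
    Third,
    InternalRange,
    InternalHash,  // keep last: the slot count is derived from it
};

// Number of factory slots, derived from the enum instead of a literal 5.
constexpr std::size_t kFactorySlots =
    static_cast<std::size_t>(Engine::InternalHash);

struct Factory {
    int id;
};

using FactoryArray = std::array<const Factory *, kFactorySlots>;

// Maps an engine to its slot. Returns false for None or unknown values.
inline bool EngineToIndex(Engine e, std::size_t &idx) {
    const int raw = static_cast<int>(e);
    if (raw < 1 || raw > static_cast<int>(kFactorySlots)) {
        return false;
    }
    idx = static_cast<std::size_t>(raw - 1);
    return true;
}

// Bounds-checked lookup: nullptr instead of an out-of-range access.
inline const Factory *Lookup(const FactoryArray &factories, Engine e) {
    std::size_t idx = 0;
    return EngineToIndex(e, idx) ? factories[idx] : nullptr;
}
```

Adding an engine then means extending the enum; the array type and the range check follow automatically.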

data_store_service_client_closure.cpp (2)

1046-1056: Prefer static_cast over reinterpret_cast for downcast

record is a TxRecord that is actually a TxObject; reinterpret_cast is unnecessary and unsafe if hierarchy changes.

Apply:

-        txservice::TxObject *tx_object =
-            reinterpret_cast<txservice::TxObject *>(record.get());
+        txservice::TxObject *tx_object =
+            static_cast<txservice::TxObject *>(record.get());
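
To illustrate why the cast kind matters once a hierarchy changes, the sketch below uses a hypothetical class with two polymorphic bases. `Tag`, `Record`, and `Object` are invented for the example and are not the txservice types. With a single base both casts usually give the same address, which is why the existing code works today.

```cpp
#include <cassert>

// Hypothetical hierarchy: Record is the second base of Object, so on
// mainstream ABIs its subobject sits at a non-zero offset inside Object.
struct Tag {
    int tag = 1;
    virtual ~Tag() = default;
};
struct Record {
    int version = 2;
    virtual ~Record() = default;
};
struct Object : Tag, Record {
    int payload = 3;
};

// static_cast knows the hierarchy and adjusts the pointer back.
bool StaticCastRecovers(Object &obj) {
    Record *rec = &obj;  // implicit upcast adjusts the address
    return static_cast<Object *>(rec) == &obj;
}

// reinterpret_cast only relabels the address, so the result still points
// at the Record subobject. The pointer is compared here, never dereferenced.
bool ReinterpretCastRecovers(Object &obj) {
    Record *rec = &obj;
    return static_cast<void *>(reinterpret_cast<Object *>(rec)) ==
           static_cast<void *>(&obj);
}
```

On common compilers `StaticCastRecovers` returns true and `ReinterpretCastRecovers` returns false; the exact layout is ABI-dependent, which is part of the argument for not relying on it.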

730-733: Typos in log messages (“retring”)

Fix spelling to “retrying” to aid log searchability.

-            LOG(ERROR) << "Fetch range slices failed: Partial result, keep "
-                          "retring";
+            LOG(ERROR) << "Fetch range slices failed: Partial result, keep "
+                          "retrying";

Same change for the later occurrence.

Also applies to: 801-804

data_store_service_scanner.cpp (1)

209-226: Ensure catalog_factory_ is not null

Add a runtime guard to fail fast if wiring is wrong.

     : DataStoreServiceScanner(client,
                               kv_info->GetKvTableName(table_name),
                               inclusive,  // inclusive_start
                               false,      // inclusive_end
                               "",         // end_key
                               ScanForward,
                               batch_size),
       catalog_factory_(catalog_factory),
@@
 {
     assert(client_ != nullptr);
+    assert(catalog_factory_ != nullptr);
data_store_service_client.cpp (4)

144-144: Reset per‑table scratch state to avoid cross‑table coupling.

record_tmp_mem_area_idx and non_object_table_rec_cnt accumulate across tables; record_tmp_mem_area is resized per table based on a cumulative counter. This is fragile and wastes memory.

Minimal containment:

 for (auto &[kv_table_name, entries] : flush_task)
 {
+    // reset per-table scratch state
+    non_object_table_rec_cnt = 0;
+    record_tmp_mem_area_idx = 0;
+    record_tmp_mem_area.clear();

Optionally move size_t non_object_table_rec_cnt inside the loop for clearer ownership.

Also applies to: 234-256, 295-299
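
A simplified sketch of the per-table reset follows. `FlushTask` and `PeakScratchSize` are hypothetical, and the real flush path tracks more state; the point is only that the scratch buffer is sized by the current table, not by a running total.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <string>
#include <vector>

// Hypothetical flush input: table name -> per-record payload sizes.
using FlushTask = std::map<std::string, std::vector<std::uint64_t>>;

// Processes each table with scratch state reset at the top of the loop and
// returns the largest scratch size any single table needed.
std::size_t PeakScratchSize(const FlushTask &task) {
    std::size_t peak = 0;
    std::vector<std::uint64_t> scratch;
    for (const auto &entry : task) {
        const std::vector<std::uint64_t> &records = entry.second;
        // Reset per-table state: nothing carries over from the last table.
        scratch.clear();
        scratch.reserve(records.size() * 2);
        for (std::uint64_t size : records) {
            scratch.push_back(size);  // stand-in for unpack_info_size
            scratch.push_back(0);     // stand-in for encoded_blob_size
        }
        peak = std::max(peak, scratch.size());
    }
    return peak;
}
```

With tables of 3 and 1 records the peak is 6 slots; a cumulative counter would have sized the second table's buffer for all 4 records.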


297-299: Nit: typo in comment (“temporay”).

Change to “temporary”.

-        // the temporay store of unpack_info_size and encoded_blob_size
+        // the temporary store of unpack_info_size and encoded_blob_size

1792-1840: Remove duplication: reuse SerializeTxRecord in EncodeArchiveValue.

EncodeArchiveValue reimplements the same 5‑part layout. Prefer delegating to SerializeTxRecord(is_deleted, value, ...) to avoid divergence.

-    if (is_deleted) {
-      ... // emit deleted marker + 4 empties
-    } else {
-      ... // emit not_deleted + sizes + data
-    }
+    SerializeTxRecord(is_deleted,
+                      value,
+                      unpack_info_size,
+                      encoded_blob_size,
+                      record_parts,
+                      write_batch_size);

3389-3418: Minor: streamline bool emission.

You can drop the explicit std::string_view wrapper and let emplace_back construct the view in place. Keep not_deleted static: record_parts stores a view of it, so the flag must outlive the call. Not critical.

-  static const bool not_deleted = false;
-  ...
-  record_parts.emplace_back(std::string_view(
-      reinterpret_cast<const char *>(&not_deleted), sizeof(bool)));
+  static const bool not_deleted = false;
+  ...
+  record_parts.emplace_back(reinterpret_cast<const char *>(&not_deleted),
+                            sizeof(bool));
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 6572fb1 and 9bfc88d.

📒 Files selected for processing (6)
  • data_store_service_client.cpp (23 hunks)
  • data_store_service_client.h (6 hunks)
  • data_store_service_client_closure.cpp (6 hunks)
  • data_store_service_scanner.cpp (4 hunks)
  • data_store_service_scanner.h (2 hunks)
  • kv_store.h (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
data_store_service_scanner.h (1)
data_store_service_client.h (1)
  • CatalogFactory (611-615)
data_store_service_client.h (1)
data_store_service_client.cpp (8)
  • EncodeRangeKey (1111-1132)
  • EncodeRangeKey (1111-1114)
  • SerializeTxRecord (3375-3386)
  • SerializeTxRecord (3375-3376)
  • SerializeTxRecord (3388-3419)
  • SerializeTxRecord (3388-3394)
  • SerializeTxRecord (3421-3442)
  • SerializeTxRecord (3421-3426)
🔇 Additional comments (18)
data_store_service_client.h (1)

245-247: Passing CatalogFactory into EncodeRangeKey looks good

API aligns with factory-driven key creation.

kv_store.h (1)

65-72: Constructor change LGTM; initializes version/deleted

Good move to pass prebuilt key/rec. Consider mirroring this initialization in the primary template to avoid uninitialized fields there.

data_store_service_client_closure.cpp (5)

598-601: Factory retrieval for engine: OK

Null-checked via assert; consistent with new client API.


633-643: Negative-infinity handling: OK

Using catalog_factory->CreateTxKey/NegativeInfKey is consistent and avoids hard-coded sentinels.

Also applies to: 667-671


826-829: Slice key deserialization via factory: OK

Correctly constructs per-key instances and pushes to slice_info_.

Also applies to: 835-848


971-974: Stats sample keys via factory: OK

Maintains engine-specific key layout.

Also applies to: 981-985


1038-1040: Factory retrieval before LoadRangeSlice: OK

Asserts will catch miswired engines.

data_store_service_scanner.cpp (2)

235-237: Start-key ±∞ normalization: OK

Clearing start_key_ for sentinel keys matches KV scan semantics.


448-452: Scan tuple construction via factory: OK

Keeps engine-specific record type; sets packed key afterward.

data_store_service_scanner.h (2)

319-343: Member addition LGTM

Storing catalog_factory_ at scanner scope is appropriate.


286-298: Constructor API change — callers updated

All instantiations pass the new catalog_factory parameter: std::make_unique<DataStoreServiceHashPartitionScanner>(this, catalog_factory, key_schema, ...) at data_store_service_client.cpp:987 and std::make_unique<DataStoreServiceHashPartitionScanner>(this, catalog_factory, key_schema, ...) at data_store_service_client.cpp:1007; no other callers found.

data_store_service_client.cpp (7)

1112-1124: LGTM: EncodeRangeKey now uses CatalogFactory for −∞.

Change is correct and aligns multi‑engine behavior.


983-1010: LGTM: Scanner constructors updated to receive CatalogFactory.

Passing the factory into DataStoreServiceHashPartitionScanner looks right. Ensure scanner stores the pointer safely.

Also applies to: 1006-1016


1212-1214: LGTM: Replace −∞ slice key via factory.

Using PackedNegativeInfinity()->GetShallowCopy() avoids engine‑specific branching.


2135-2145: LGTM: CopyBaseToArchive creates keys/records via CatalogFactory.

Factory usage looks consistent with the PR goal.


221-228: Accounting: update write_batch_size for all serialized parts.

Double‑check that SerializeTxRecord(...) is the only place adding size for its 5 parts. With the fixes above, write_batch_size math will remain consistent.


3025-3027: LGTM: InitTableRanges builds key with factory −∞.

Aligned with new API.


895-902: LGTM: Broad factory adoption (CreateTxKey/Record, PackedNegativeInfinity).

These call‑site updates look correct and fulfill the PR objective.

Also applies to: 920-927, 1189-1191, 1303-1305, 2071-2073, 2274-2276, 2301-2302, 3010-3015

Comment on lines 389 to 405
* @param unpack_info_size temporary storage of unpack_info_size
* @param encoded_blob_size temporary storage of encoded_blob_size
* @param record_parts output record parts
* @param write_batch_size output write batch size
*/
static void SerializeTxRecord(bool is_deleted,
const txservice::TxRecord *rec,
std::vector<uint64_t> &record_tmp_mem_area,
uint64_t &unpack_info_size,
uint64_t &encoded_blob_size,
std::vector<std::string_view> &record_parts,
size_t &write_batch_size);

static void SerializeTxRecord(const txservice::TxRecord *rec,
std::vector<uint64_t> &record_tmp_mem_area,
uint64_t &unpack_info_size,
uint64_t &encoded_blob_size,
std::vector<std::string_view> &record_parts,
size_t &write_batch_size);

⚠️ Potential issue

Wire-format size fields: type/width mismatch with implementation

The header exposes uint64_t sizes, but the implementation writes sizeof(size_t) bytes. On a platform where size_t is not 64 bits wide the two disagree, which breaks cross-architecture compatibility of the wire format.

Prefer fixed-width 64-bit on the wire. Update cpp:

-    unpack_info_size = rec->UnpackInfoSize();
-    record_parts.emplace_back(std::string_view(
-        reinterpret_cast<const char *>(&unpack_info_size), sizeof(size_t)));
+    unpack_info_size = rec->UnpackInfoSize();
+    record_parts.emplace_back(std::string_view(
+        reinterpret_cast<const char *>(&unpack_info_size), sizeof(uint64_t)));
@@
-    encoded_blob_size = rec->EncodedBlobSize();
-    record_parts.emplace_back(std::string_view(
-        reinterpret_cast<const char *>(&encoded_blob_size), sizeof(size_t)));
+    encoded_blob_size = rec->EncodedBlobSize();
+    record_parts.emplace_back(std::string_view(
+        reinterpret_cast<const char *>(&encoded_blob_size), sizeof(uint64_t)));

If cross-language consumers exist, consider big-endian encoding for these fields, mirroring be_commit_ts.

Committable suggestion skipped: line range outside the PR's diff.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
data_store_service_client.cpp (1)

898-921: Null start_key crashes DeleteOutOfRangeData. Handle nullptr.

The API doc says start_key can be nullptr to indicate -∞ but code dereferences it.

Apply:

-    auto catalog_factory = GetCatalogFactory(table_name.Engine());
-    assert(catalog_factory != nullptr);
-
-    if (start_key->Type() == txservice::KeyType::NegativeInf)
+    auto catalog_factory = GetCatalogFactory(table_name.Engine());
+    if (!catalog_factory) { return false; }
+
+    if (start_key == nullptr ||
+        start_key->Type() == txservice::KeyType::NegativeInf)
     {
         const txservice::TxKey *neg_key =
             catalog_factory->PackedNegativeInfinity();
         start_key_str = std::string(neg_key->Data(), neg_key->Size());
     }
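
The null-or-negative-infinity rule can be isolated into a small helper. In this sketch `Key`, `KeyType`, and `EncodeStartKey` are hypothetical simplifications, and the packed negative-infinity bytes are passed in by the caller because their real encoding comes from the engine's factory.

```cpp
#include <cassert>
#include <string>

// Hypothetical, simplified key representation.
enum class KeyType { Normal, NegativeInf };

struct Key {
    KeyType type;
    std::string bytes;
};

// Treats a null start key the same as an explicit negative-infinity key,
// matching the documented meaning of nullptr. The null check comes first,
// so start_key is never dereferenced when it is null.
std::string EncodeStartKey(const Key *start_key,
                           const std::string &packed_neg_inf) {
    if (start_key == nullptr || start_key->type == KeyType::NegativeInf) {
        return packed_neg_inf;
    }
    return start_key->bytes;
}
```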
♻️ Duplicate comments (5)
data_store_service_client.cpp (3)

1959-2006: Blocker: size/width mismatch (size_t vs uint64_t) and header/impl signature drift.

  • EncodeArchiveValue takes size_t& but writes 8 bytes (uint64_t). UB on 32‑bit; inconsistent on‑wire format.
  • PutArchivesAll uses std::vector<size_t> temp area; serialize uses 8‑byte widths.
  • SerializeTxRecord overloads in the .cpp use std::vector<size_t> and sizeof(size_t), but the header declares std::vector<uint64_t>. This won’t link/compile and corrupts the protocol.

Unify on uint64_t for all serialized sizes and match header.

Apply:

@@ void DataStoreServiceClient::EncodeArchiveValue(
-    size_t &unpack_info_size,
-    size_t &encoded_blob_size,
+    uint64_t &unpack_info_size,
+    uint64_t &encoded_blob_size,
@@
-        record_parts.emplace_back(
-            std::string_view(reinterpret_cast<const char *>(&unpack_info_size),
-                             sizeof(uint64_t)));
+        record_parts.emplace_back(std::string_view(
+            reinterpret_cast<const char *>(&unpack_info_size),
+            sizeof(uint64_t)));
@@
-        record_parts.emplace_back(
-            std::string_view(reinterpret_cast<const char *>(&encoded_blob_size),
-                             sizeof(uint64_t)));
+        record_parts.emplace_back(std::string_view(
+            reinterpret_cast<const char *>(&encoded_blob_size),
+            sizeof(uint64_t)));
@@ bool DataStoreServiceClient::PutArchivesAll(...)
-        std::vector<size_t> record_tmp_mem_area;
+        std::vector<uint64_t> record_tmp_mem_area;
@@
-            size_t &unpack_info_size = record_tmp_mem_area[i * 2];
-            size_t &encode_blob_size = record_tmp_mem_area[i * 2 + 1];
+            uint64_t &unpack_info_size = record_tmp_mem_area[i * 2];
+            uint64_t &encode_blob_size = record_tmp_mem_area[i * 2 + 1];
             if (rec != nullptr)
             {
-                unpack_info_size = rec->UnpackInfoSize();
-                encode_blob_size = rec->EncodedBlobSize();
+                unpack_info_size = static_cast<uint64_t>(rec->UnpackInfoSize());
+                encode_blob_size = static_cast<uint64_t>(rec->EncodedBlobSize());
             }
@@ void DataStoreServiceClient::SerializeTxRecord(
-    bool is_deleted,
-    const txservice::TxRecord *rec,
-    std::vector<size_t> &record_tmp_mem_area,
+    bool is_deleted,
+    const txservice::TxRecord *rec,
+    std::vector<uint64_t> &record_tmp_mem_area,
     std::vector<std::string_view> &record_parts,
     size_t &write_batch_size)
@@
-        record_parts.emplace_back(std::string_view(
-            reinterpret_cast<const char *>(&not_deleted), sizeof(bool)));
+        record_parts.emplace_back(std::string_view(
+            reinterpret_cast<const char *>(&not_deleted), sizeof(bool)));
@@ void DataStoreServiceClient::SerializeTxRecord(
-    const txservice::TxRecord *rec,
-    std::vector<size_t> &record_tmp_mem_area,
+    const txservice::TxRecord *rec,
+    std::vector<uint64_t> &record_tmp_mem_area,
     std::vector<std::string_view> &record_parts,
     size_t &write_batch_size)
 {
-    record_tmp_mem_area.emplace_back(rec->UnpackInfoSize());
-    size_t *unpack_info_size = &record_tmp_mem_area.back();
-    record_parts.emplace_back(std::string_view(
-        reinterpret_cast<const char *>(unpack_info_size), sizeof(size_t)));
-    write_batch_size += sizeof(size_t);
+    record_tmp_mem_area.emplace_back(
+        static_cast<uint64_t>(rec->UnpackInfoSize()));
+    uint64_t *unpack_info_size = &record_tmp_mem_area.back();
+    record_parts.emplace_back(std::string_view(
+        reinterpret_cast<const char *>(unpack_info_size), sizeof(uint64_t)));
+    write_batch_size += sizeof(uint64_t);
     record_parts.emplace_back(rec->UnpackInfoData(), rec->UnpackInfoSize());
     write_batch_size += rec->UnpackInfoSize();
-    record_tmp_mem_area.emplace_back(rec->EncodedBlobSize());
-    uint64_t *encoded_blob_size = &record_tmp_mem_area.back();
-    record_parts.emplace_back(std::string_view(
-        reinterpret_cast<const char *>(encoded_blob_size), sizeof(size_t)));
-    write_batch_size += sizeof(size_t);
+    record_tmp_mem_area.emplace_back(
+        static_cast<uint64_t>(rec->EncodedBlobSize()));
+    uint64_t *encoded_blob_size = &record_tmp_mem_area.back();
+    record_parts.emplace_back(std::string_view(
+        reinterpret_cast<const char *>(encoded_blob_size), sizeof(uint64_t)));
+    write_batch_size += sizeof(uint64_t);

Additionally update the header declarations of EncodeArchiveValue to use uint64_t&, and ensure all callers pass uint64_t (not size_t).

Also applies to: 2097-2100, 2193-2207, 3740-3767, 3769-3791


432-437: Header alignment needed: EncodeArchiveValue should use uint64_t sizes.

Ensure the header signature matches the .cpp fix to avoid ABI drift and on‑wire corruption.

Would you like me to push a header patch as well?


872-876: Replace assert with runtime guard for GetCatalogFactory across call sites.

Asserts compile out in release; null deref will crash. Handle null with an error path.

Apply (example for three sites; mirror the pattern elsewhere):

@@ void DataStoreServiceClient::FetchRangeSlices(txservice::FetchRangeSlicesReq *fetch_cc)
-    auto catalog_factory = GetCatalogFactory(fetch_cc->table_name_.Engine());
-    assert(catalog_factory != nullptr);
+    auto catalog_factory = GetCatalogFactory(fetch_cc->table_name_.Engine());
+    if (!catalog_factory) {
+        fetch_cc->SetFinish(txservice::CcErrorCode::DATA_STORE_ERR);
+        return;
+    }
@@ bool DataStoreServiceClient::LoadRangeSlice(...)
-    auto catalog_factory = GetCatalogFactory(table_name.Engine());
-    assert(catalog_factory != nullptr);
+    auto catalog_factory = GetCatalogFactory(table_name.Engine());
+    if (!catalog_factory) {
+        return txservice::store::DataStoreHandler::DataStoreOpStatus::Error;
+    }
@@ std::unique_ptr<txservice::store::DataStoreScanner> DataStoreServiceClient::ScanForward(...)
-    auto *catalog_factory = GetCatalogFactory(table_name.Engine());
+    auto *catalog_factory = GetCatalogFactory(table_name.Engine());
+    if (!catalog_factory) {
+        LOG(ERROR) << "ScanForward: null CatalogFactory for engine "
+                   << static_cast<int>(table_name.Engine());
+        return nullptr;
+    }

Also applies to: 908-915, 1004-1011, 1030-1031, 1062-1070, 1263-1265, 1377-1379, 2512-2514, 3362-3367, 3377-3379
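
The difference between the two guard styles is that `assert` disappears when `NDEBUG` is defined, while an explicit check stays in every build. A minimal sketch of the runtime-guard pattern, with a hypothetical `Factory` and `Status` standing in for the real types and error codes:

```cpp
#include <cassert>
#include <iostream>

// Hypothetical stand-ins for the factory and the caller's status type.
struct Factory {
    int Create() const { return 7; }
};

enum class Status { Ok, Error };

// Runtime guard: the null case is reported to the caller in every build,
// including release builds where assert() expands to nothing.
Status UseFactory(const Factory *factory, int &out) {
    if (factory == nullptr) {
        std::cerr << "UseFactory: null factory\n";
        return Status::Error;
    }
    out = factory->Create();
    return Status::Ok;
}
```

Each call site would return whatever error its own signature allows (an error code, `false`, or `nullptr`), as the diff above does.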

data_store_service_client.h (2)

69-77: Constructor dereferences catalog_factory before any null check.

catalog_factory and its 3 elements can be null; dereferencing in the initializer list is UB. Initialize catalog_factory_array_ in the body after validating inputs.

Apply:

-    DataStoreServiceClient(
-        txservice::CatalogFactory *catalog_factory[3],
+    DataStoreServiceClient(
+        txservice::CatalogFactory *catalog_factory[3],
         const DataStoreServiceClusterManager &cluster_manager,
         DataStoreService *data_store_service = nullptr)
-        : ds_serv_shutdown_indicator_(false),
-          catalog_factory_array_{catalog_factory[0],
-                                 catalog_factory[1],
-                                 catalog_factory[2],
-                                 &range_catalog_factory_,
-                                 &hash_catalog_factory_},
+        : ds_serv_shutdown_indicator_(false),
           cluster_manager_(cluster_manager),
           data_store_service_(data_store_service),
           flying_remote_fetch_count_(0)
     {
+        assert(catalog_factory != nullptr);
+        for (int i = 0; i < 3; ++i) { assert(catalog_factory[i] != nullptr); }
+        catalog_factory_array_[0] = catalog_factory[0];
+        catalog_factory_array_[1] = catalog_factory[1];
+        catalog_factory_array_[2] = catalog_factory[2];
+        catalog_factory_array_[3] = &range_catalog_factory_;
+        catalog_factory_array_[4] = &hash_catalog_factory_;

631-635: Fragile engine→index math can OOB; add bounds check.

Casting enum to int and subtracting 1 is brittle; validate range.

Apply:

-    {
-        return catalog_factory_array_.at(static_cast<int>(table_engine) - 1);
-    }
+    {
+        int idx = static_cast<int>(table_engine) - 1;
+        if (idx < 0 || idx >= static_cast<int>(catalog_factory_array_.size()))
+        {
+            return nullptr;
+        }
+        return catalog_factory_array_[static_cast<size_t>(idx)];
+    }
🧹 Nitpick comments (10)
data_store_service_client.h (3)

36-36: Avoid heavy headers in public header; move to .cc.

Including eloq_basic_catalog_factory.h here increases rebuild times. Forward‑declare CatalogFactory derivatives in the header and include the concrete header in the .cpp instead.


249-251: Public API: EncodeRangeKey now depends on a non‑null factory. Document and enforce.

Add a precondition comment and a defensive DCHECK/return empty on null to avoid crashes at call sites.


648-653: Centralize engine→factory mapping.

The TODO suggests a global mapping. Expose a single helper (e.g., txservice::GetCatalogFactoryFor(engine)) to keep Server/Handler consistent.

data_store_service_client.cpp (7)

1206-1215: Endianness claim vs implementation.

Comments say “little-endian” but code appends host-endian integers. If nodes can be big‑endian, persist explicit LE via host_to_little_endian before append.

Would you like me to patch this to use explicit LE helpers throughout key/value encoders?

Also applies to: 1228-1235
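
For reference, an explicitly little-endian encoder does not need any platform helper; shifting bytes out one at a time gives the same result on any host. `AppendLE64` and `ReadLE64` are hypothetical names for this sketch and are not helpers from the codebase.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>

// Appends v as 8 bytes, least significant byte first, on any host.
void AppendLE64(std::string &out, std::uint64_t v) {
    for (int i = 0; i < 8; ++i) {
        out.push_back(static_cast<char>((v >> (8 * i)) & 0xFF));
    }
}

// Reads 8 little-endian bytes starting at offset.
// Precondition: in.size() >= offset + 8.
std::uint64_t ReadLE64(const std::string &in, std::size_t offset) {
    std::uint64_t v = 0;
    for (int i = 0; i < 8; ++i) {
        const unsigned char byte = static_cast<unsigned char>(in[offset + i]);
        v |= static_cast<std::uint64_t>(byte) << (8 * i);
    }
    return v;
}
```

Note that switching an existing store from host-endian to explicit little-endian is only a no-op on little-endian hosts; data written by a big-endian node would need migration.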


1133-1146: Precondition check for catalog_factory in EncodeRangeKey.

Add a DCHECK and treat NegativeInf only if factory present; otherwise fall back to appending provided key bytes.

Apply:

 std::string DataStoreServiceClient::EncodeRangeKey(
     const txservice::CatalogFactory *catalog_factory,
     const txservice::TableName &table_name,
     const txservice::TxKey &range_start_key)
 {
+    assert(catalog_factory != nullptr);

3362-3367: InitTableRanges: avoid assert; return error when factory is null.

Return false on null factory so caller can surface a proper error.

Apply:

-    auto catalog_factory = GetCatalogFactory(table_name.Engine());
-    assert(catalog_factory != nullptr);
+    auto catalog_factory = GetCatalogFactory(table_name.Engine());
+    if (!catalog_factory) { return false; }

Also applies to: 3377-3379


2394-2405: Safety: SetVersionedPayload path assumes non‑None engines.

The assert allows Sequences or any non‑None engine; if None slips through, it’s UB later. Consider explicit error handling/log instead of assert.


3641-3672: Validate parts_cnt_per_key/record at runtime.

The asserts can be compiled out; add CHECKs or early error if counts don’t divide sizes.


1959-2006: Naming nit: value vs rec.

EncodeArchiveValue’s “value” param is a TxRecord; consider renaming to rec for consistency with SerializeTxRecord.


2097-2105: Reserve record_tmp_mem_area capacity to avoid reallocation invalidating string_views.

You already rely on stable addresses; ensure capacity is sufficient up front.

Apply:

-        std::vector<uint64_t> record_tmp_mem_area;
-        record_tmp_mem_area.resize(archive_ptrs.size() * 2);
+        std::vector<uint64_t> record_tmp_mem_area;
+        record_tmp_mem_area.reserve(archive_ptrs.size() * 2);
+        record_tmp_mem_area.resize(archive_ptrs.size() * 2);
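
Since resize() already allocates the full buffer, the reserve() call matters mainly for code paths that grow the vector with emplace_back while earlier views still point into it. The sketch below checks the guarantee being relied on: after reserve(n), appending up to n elements does not reallocate, so saved pointers stay valid. `AddressesStayStable` is a hypothetical helper and the raw pointers stand in for the string_views stored in record_parts.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Appends two size slots per record and checks that every pointer saved
// along the way still refers into the vector's current buffer.
bool AddressesStayStable(std::size_t n_records) {
    std::vector<std::uint64_t> scratch;
    scratch.reserve(n_records * 2);  // one allocation up front
    const std::uint64_t *base = scratch.data();

    std::vector<const std::uint64_t *> views;  // stand-ins for string_views
    for (std::size_t i = 0; i < n_records; ++i) {
        scratch.emplace_back(i);      // e.g. unpack_info_size
        views.push_back(&scratch.back());
        scratch.emplace_back(i + 1);  // e.g. encoded_blob_size
        views.push_back(&scratch.back());
    }

    // No reallocation happened, so slot k is still at base + k.
    for (std::size_t k = 0; k < views.size(); ++k) {
        if (views[k] != base + k) {
            return false;
        }
    }
    return scratch.data() == base;
}
```

Without the reserve(), any emplace_back that exceeds the current capacity may move the buffer and leave the earlier views dangling.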
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 9bfc88d and d7c0d95.

📒 Files selected for processing (6)
  • data_store_service_client.cpp (17 hunks)
  • data_store_service_client.h (5 hunks)
  • data_store_service_client_closure.cpp (6 hunks)
  • data_store_service_scanner.cpp (4 hunks)
  • data_store_service_scanner.h (2 hunks)
  • kv_store.h (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (4)
  • kv_store.h
  • data_store_service_client_closure.cpp
  • data_store_service_scanner.cpp
  • data_store_service_scanner.h
🧰 Additional context used
🧬 Code graph analysis (1)
data_store_service_client.h (1)
data_store_service_client.cpp (2)
  • EncodeRangeKey (1132-1153)
  • EncodeRangeKey (1132-1135)

fetch_cc->kv_start_key_ = EncodeRangeKey(fetch_cc->table_name_, start_key);

auto catalog_factory = GetCatalogFactory(fetch_cc->table_name_.Engine());
assert(catalog_factory != nullptr);

🛠️ Refactor suggestion

🧩 Analysis chain

Follow‑ups: replace all remaining assert(catalog_factory != nullptr).

Search and patch all instances across the repo.


🏁 Script executed:

#!/bin/bash
# Find suspect asserts on CatalogFactory and size_t-based serialization uses
rg -n -C1 -e 'assert\s*\(\s*catalog_factory\s*!=\s*nullptr\s*\)' --type=cpp
rg -n -C2 -e 'std::vector<\s*size_t\s*>\s*record_tmp_mem_area' --type=cpp
rg -n -C2 -e 'EncodeArchiveValue\s*\(.*size_t\s*&\s*unpack_info_size' --type=cpp
rg -n -C2 -e 'SerializeTxRecord\s*\(.*std::vector<\s*size_t' --type=cpp

Length of output: 1031


🏁 Script executed:

#!/bin/bash
# Re-search for all instances of assert(catalog_factory != nullptr) with correct regex
rg -n -C1 'assert\(catalog_factory != nullptr\)' --type=cpp

Length of output: 127


🏁 Script executed:

#!/bin/bash
set -euo pipefail
echo "Searching for exact 'assert(catalog_factory != nullptr)' and common variants (fixed-string search)..."
find . -type f \( -name "*.cpp" -o -name "*.cc" -o -name "*.c" -o -name "*.h" -o -name "*.hpp" \) -print0 \
  | xargs -0 grep -n -F 'assert(catalog_factory != nullptr)' || true

find . -type f \( -name "*.cpp" -o -name "*.cc" -o -name "*.c" -o -name "*.h" -o -name "*.hpp" \) -print0 \
  | xargs -0 grep -n -F 'assert (catalog_factory != nullptr)' || true

echo "Searching for any 'catalog_factory != nullptr' usages..."
find . -type f \( -name "*.cpp" -o -name "*.cc" -o -name "*.c" -o -name "*.h" -o -name "*.hpp" \) -print0 \
  | xargs -0 grep -n -F 'catalog_factory != nullptr' || true

echo "Listing all occurrences of the identifier 'catalog_factory' for manual inspection..."
find . -type f \( -name "*.cpp" -o -name "*.cc" -o -name "*.c" -o -name "*.h" -o -name "*.hpp" \) -print0 \
  | xargs -0 grep -n --line-number 'catalog_factory' || true

Length of output: 6307


Replace all remaining assert(catalog_factory != nullptr) with explicit runtime null-checks and proper error handling.

  • Rationale: asserts may be disabled in release builds; dereferencing a null catalog_factory causes UB/crashes.
  • Representative locations to fix: data_store_service_client.cpp (lines 873, 909, 1063, 1264, 2295, 2513, 3363), data_store_service_client_closure.cpp (lines 644, 872, 1017, 1083). Search for the literal "assert(catalog_factory != nullptr)" to find all occurrences and replace with an early-return/exception/log plus safe handling before any dereference.
🤖 Prompt for AI Agents
In data_store_service_client.cpp around line 873, replace the
assert(catalog_factory != nullptr) with an explicit runtime null-check: if
catalog_factory is nullptr, log an error with contextual information and
return/propagate an appropriate error value or throw an exception consistent
with the surrounding function's error-handling convention (or otherwise perform
an early return to avoid dereferencing). Ensure you do not dereference
catalog_factory afterward and mirror the same pattern at the other listed
occurrences so behavior is consistent across the codebase.
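
As a rough illustration of the pattern described above, the sketch below replaces a debug-only assert with an ordinary branch. The `CatalogFactory` stub, its `EngineName` method, and the `Status` enum are hypothetical stand-ins and not the txservice types.

```cpp
#include <cassert>
#include <string>

// Hypothetical stand-in; the real CatalogFactory lives in txservice.
struct CatalogFactory
{
    std::string EngineName() const { return "demo"; }
};

// Hypothetical status type used only for this sketch.
enum class Status { Ok, Error };

// With NDEBUG defined, assert(factory != nullptr) compiles to nothing, so the
// check has to be an ordinary branch to survive in release builds.
Status DescribeEngine(const CatalogFactory *factory, std::string &out)
{
    if (factory == nullptr)
    {
        return Status::Error;  // report the failure and never dereference
    }
    out = factory->EngineName();
    return Status::Ok;
}
```

Which error value to return depends on the enclosing function's signature, as the prompt above notes.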

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
data_store_service_client.cpp (1)

3740-3791: Blocker: SerializeTxRecord signature mismatch with header; on-wire width bug

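Definitions use std::vector<size_t>, while the header declares std::vector<uint64_t>. On platforms where size_t and uint64_t are distinct types this will not compile, and the code also writes sizeof(size_t)-wide fields, so the on-wire width depends on the platform. Unify to uint64_t and write fixed 8-byte sizes.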

Apply:

 void DataStoreServiceClient::SerializeTxRecord(
     bool is_deleted,
     const txservice::TxRecord *rec,
-    std::vector<size_t> &record_tmp_mem_area,
+    std::vector<uint64_t> &record_tmp_mem_area,
     std::vector<std::string_view> &record_parts,
     size_t &write_batch_size)
 {
@@
-void DataStoreServiceClient::SerializeTxRecord(
+void DataStoreServiceClient::SerializeTxRecord(
     const txservice::TxRecord *rec,
-    std::vector<size_t> &record_tmp_mem_area,
+    std::vector<uint64_t> &record_tmp_mem_area,
     std::vector<std::string_view> &record_parts,
     size_t &write_batch_size)
 {
-    record_tmp_mem_area.emplace_back(rec->UnpackInfoSize());
-    size_t *unpack_info_size = &record_tmp_mem_area.back();
+    record_tmp_mem_area.emplace_back(
+        static_cast<uint64_t>(rec->UnpackInfoSize()));
+    uint64_t *unpack_info_size = &record_tmp_mem_area.back();
     record_parts.emplace_back(std::string_view(
-        reinterpret_cast<const char *>(unpack_info_size), sizeof(size_t)));
-    write_batch_size += sizeof(size_t);
+        reinterpret_cast<const char *>(unpack_info_size), sizeof(uint64_t)));
+    write_batch_size += sizeof(uint64_t);
     record_parts.emplace_back(rec->UnpackInfoData(), rec->UnpackInfoSize());
     write_batch_size += rec->UnpackInfoSize();
-    record_tmp_mem_area.emplace_back(rec->EncodedBlobSize());
-    uint64_t *encoded_blob_size = &record_tmp_mem_area.back();
+    record_tmp_mem_area.emplace_back(
+        static_cast<uint64_t>(rec->EncodedBlobSize()));
+    uint64_t *encoded_blob_size = &record_tmp_mem_area.back();
     record_parts.emplace_back(std::string_view(
-        reinterpret_cast<const char *>(encoded_blob_size), sizeof(size_t)));
-    write_batch_size += sizeof(size_t);
+        reinterpret_cast<const char *>(encoded_blob_size), sizeof(uint64_t)));
+    write_batch_size += sizeof(uint64_t);
     record_parts.emplace_back(rec->EncodedBlobData(), rec->EncodedBlobSize());
     write_batch_size += rec->EncodedBlobSize();
 }
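
The fixed-width idea in the diff above can be sketched in isolation as follows. `AppendLengthPrefixed` and `Flatten` are hypothetical helpers, not functions from this repository, and the requirement that the caller reserve capacity is an assumption of this sketch (growing the vector would invalidate earlier views).

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <string>
#include <string_view>
#include <vector>

// Appends an 8-byte length prefix followed by the payload as two parts.
// `sizes` owns the integer that backs the prefix view, so the caller is
// assumed to have reserved enough capacity: a reallocation would leave
// earlier string_views dangling.
void AppendLengthPrefixed(std::string_view payload,
                          std::vector<std::uint64_t> &sizes,
                          std::vector<std::string_view> &parts,
                          std::size_t &batch_size)
{
    assert(sizes.size() < sizes.capacity());
    sizes.emplace_back(static_cast<std::uint64_t>(payload.size()));
    const std::uint64_t *len = &sizes.back();
    parts.emplace_back(reinterpret_cast<const char *>(len),
                       sizeof(std::uint64_t));
    parts.emplace_back(payload);
    batch_size += sizeof(std::uint64_t) + payload.size();
}

// Concatenates the parts, roughly as a write batch would.
std::string Flatten(const std::vector<std::string_view> &parts)
{
    std::string out;
    for (std::string_view part : parts)
    {
        out.append(part);
    }
    return out;
}
```

The prefix is still written in host byte order here; only its width is fixed.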
🧹 Nitpick comments (5)
kv_store.h (3)

65-72: Guard against null unique_ptrs in specialized constructor

Enforce non-null key/rec to prevent UB later.

Apply:

-    explicit ScanHeapTuple(uint32_t shard_id,
-                           std::unique_ptr<txservice::TxKey> key,
-                           std::unique_ptr<txservice::TxRecord> rec)
-        : key_(std::move(key)),
-          rec_(std::move(rec)),
+    explicit ScanHeapTuple(uint32_t shard_id,
+                           std::unique_ptr<txservice::TxKey> key,
+                           std::unique_ptr<txservice::TxRecord> rec)
+        : key_(std::move(key)),
+          rec_(std::move(rec)),
           version_ts_(0),
           deleted_(false),
           sid_(shard_id)
     {
+        assert(key_ && rec_);
     }

45-50: Initialize all fields in the generic template ctor

version_ts_ and deleted_ are left uninitialized for generic KeyT/ValueT. Initialize to match the specialized behavior.

Apply:

-    ScanHeapTuple(uint32_t shard_id)
-        : key_(std::make_unique<KeyT>()),
-          rec_(std::make_unique<ValueT>()),
-          sid_(shard_id)
+    ScanHeapTuple(uint32_t shard_id)
+        : key_(std::make_unique<KeyT>()),
+          rec_(std::make_unique<ValueT>()),
+          version_ts_(0),
+          deleted_(false),
+          sid_(shard_id)
     {
     }
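
An alternative to listing the fields in every constructor, sketched here under the assumption that the class can use default member initializers, is to give `version_ts_` and `deleted_` in-class defaults. `HeapTuple` is a simplified hypothetical type; only the member names are taken from the diff above.

```cpp
#include <cassert>
#include <cstdint>
#include <memory>

// Hypothetical simplified tuple; member names mirror the review's diff.
template <typename KeyT, typename ValueT>
struct HeapTuple
{
    explicit HeapTuple(std::uint32_t shard_id)
        : key_(std::make_unique<KeyT>()),
          rec_(std::make_unique<ValueT>()),
          sid_(shard_id)
    {
    }

    std::unique_ptr<KeyT> key_;
    std::unique_ptr<ValueT> rec_;
    std::uint64_t version_ts_{0};  // in-class default applies to every ctor
    bool deleted_{false};
    std::uint32_t sid_;
};
```

With in-class defaults, a constructor added later cannot forget these two fields.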

160-169: Missing include for std::string_view

Add <string_view> to ensure the overload is declared portably.

data_store_service_client.cpp (2)

1959-2006: Unify serialized size fields to fixed 8 bytes (uint64_t)

EncodeArchiveValue takes size_t refs but writes 8 bytes. Align types to uint64_t to avoid cross-arch corruption.

Proposed changes (header + impl):

  • In data_store_service_client.h: change EncodeArchiveValue signature to use uint64_t& for unpack_info_size and encoded_blob_size.
  • In this function, keep sizeof(uint64_t) writes (already correct).
  • Update all callers to pass uint64_t.

If you prefer a minimal change now, keep size_t parameters but cast to uint64_t locals before writing.


1132-1153: Clarify endianness in EncodeRangeValue/Key comments vs implementation

Comments claim LE/BE but code appends host-endian. Either update comments or encode explicitly (host_to_big_endian) for on-wire stability.
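
If explicit encoding is the chosen route, one portable way to do it is with shifts rather than a host-order copy. The helper names below are hypothetical and are not the `host_to_big_endian` routine mentioned above.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>

// Appends `v` most-significant byte first, regardless of host endianness.
void AppendBigEndian32(std::string &out, std::uint32_t v)
{
    for (int shift = 24; shift >= 0; shift -= 8)
    {
        out.push_back(static_cast<char>((v >> shift) & 0xFF));
    }
}

// Reads back a 32-bit big-endian value starting at `pos`.
// The caller must ensure that in.size() >= pos + 4.
std::uint32_t ReadBigEndian32(const std::string &in, std::size_t pos)
{
    std::uint32_t v = 0;
    for (std::size_t i = 0; i < 4; ++i)
    {
        v = (v << 8) | static_cast<unsigned char>(in[pos + i]);
    }
    return v;
}
```

Big-endian integers also sort in numeric order under bytewise comparison, which tends to matter for range keys.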

📜 Review details

📥 Commits

Reviewing files that changed from the base of the PR and between d7c0d95 and 1eb8bb2.

📒 Files selected for processing (6)
  • data_store_service_client.cpp (17 hunks)
  • data_store_service_client.h (5 hunks)
  • data_store_service_client_closure.cpp (6 hunks)
  • data_store_service_scanner.cpp (4 hunks)
  • data_store_service_scanner.h (2 hunks)
  • kv_store.h (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • data_store_service_scanner.cpp
  • data_store_service_scanner.h
🧰 Additional context used
🧬 Code graph analysis (1)
data_store_service_client.h (1)
eloq_data_store_service/data_store_service_config.h (1)
  • DataStoreServiceClusterManager (314-510)
🔇 Additional comments (4)
data_store_service_client_closure.cpp (1)

677-688: LGTM: migration to catalog_factory for key/record and -inf handling

CreateTxKey/NegativeInfKey usage is consistent and aligns with multi‑engine support.

Please ensure all other call sites have been similarly migrated.

Also applies to: 710-715, 1025-1029, 1090-1094

data_store_service_client.h (3)

249-251: API change: EncodeRangeKey now requires CatalogFactory

Callers must provide a factory; ensure all call sites updated.


69-78: Move catalog_factory_array_ initialization into ctor body with validation

The current member initializer indexes the user-provided array pointer before any validation; a null catalog_factory leads to UB. Initialize in the constructor body after the checks.

Apply:

-    DataStoreServiceClient(
-        txservice::CatalogFactory *catalog_factory[3],
+    DataStoreServiceClient(
+        txservice::CatalogFactory *catalog_factory[3],
         const DataStoreServiceClusterManager &cluster_manager,
         DataStoreService *data_store_service = nullptr)
-        : ds_serv_shutdown_indicator_(false),
-          catalog_factory_array_{catalog_factory[0],
-                                 catalog_factory[1],
-                                 catalog_factory[2],
-                                 &range_catalog_factory_,
-                                 &hash_catalog_factory_},
+        : ds_serv_shutdown_indicator_(false),
           cluster_manager_(cluster_manager),
           data_store_service_(data_store_service),
           flying_remote_fetch_count_(0)
     {
+        assert(catalog_factory != nullptr);
+        for (int i = 0; i < 3; ++i) { assert(catalog_factory[i] != nullptr); }
+        catalog_factory_array_[0] = catalog_factory[0];
+        catalog_factory_array_[1] = catalog_factory[1];
+        catalog_factory_array_[2] = catalog_factory[2];
+        catalog_factory_array_[3] = &range_catalog_factory_;
+        catalog_factory_array_[4] = &hash_catalog_factory_;

631-635: Bounds-check engine→factory mapping; return nullptr on unknown engine

Casting the enum and subtracting 1 yields an out-of-range index for invalid or zero engines; .at() then throws std::out_of_range, which terminates the process if uncaught.

Apply:

-    {
-        return catalog_factory_array_.at(static_cast<int>(table_engine) - 1);
-    }
+    {
+        int idx = static_cast<int>(table_engine) - 1;
+        if (idx < 0 || idx >= static_cast<int>(catalog_factory_array_.size()))
+        {
+            return nullptr;
+        }
+        return catalog_factory_array_[static_cast<size_t>(idx)];
+    }
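
A self-contained version of this lookup might look like the sketch below. The `TableEngine` values and the `FactoryRegistry` class are hypothetical; the real enum in txservice may use different values and the client holds five factories rather than three.

```cpp
#include <array>
#include <cassert>
#include <cstddef>

struct CatalogFactory {};

// Hypothetical engine enum; the actual txservice::TableEngine may differ.
enum class TableEngine : int { None = 0, EngineA = 1, EngineB = 2, EngineC = 3 };

class FactoryRegistry
{
public:
    explicit FactoryRegistry(const std::array<CatalogFactory *, 3> &factories)
        : factories_(factories)
    {
    }

    // Returns nullptr for engines that have no slot instead of throwing.
    CatalogFactory *Get(TableEngine engine) const
    {
        const int idx = static_cast<int>(engine) - 1;
        if (idx < 0 || idx >= static_cast<int>(factories_.size()))
        {
            return nullptr;
        }
        return factories_[static_cast<std::size_t>(idx)];
    }

private:
    std::array<CatalogFactory *, 3> factories_;
};
```

Returning nullptr only helps if every caller checks for it, which is why this change pairs with the runtime checks suggested elsewhere in the review.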

Comment on lines +642 to +645
auto catalog_factory =
client.GetCatalogFactory(fetch_range_cc->table_name_.Engine());
assert(catalog_factory != nullptr);

🛠️ Refactor suggestion

Replace asserts on catalog_factory with runtime checks and fail fast

Asserts are compiled out in release builds (NDEBUG), so a null factory would be dereferenced and crash. Handle nullptr explicitly and set the error/finish state appropriately.

Apply pattern (example 1: FetchTableRangesCallback):

-        auto catalog_factory =
-            client.GetCatalogFactory(fetch_range_cc->table_name_.Engine());
-        assert(catalog_factory != nullptr);
+        auto catalog_factory =
+            client.GetCatalogFactory(fetch_range_cc->table_name_.Engine());
+        if (catalog_factory == nullptr)
+        {
+            LOG(ERROR) << "No CatalogFactory for engine="
+                       << static_cast<int>(fetch_range_cc->table_name_.Engine());
+            fetch_range_cc->SetFinish(
+                static_cast<int>(txservice::CcErrorCode::DATA_STORE_ERR));
+            return;
+        }

Example 2 (FetchRangeSlicesCallback):

-            auto catalog_factory =
-                client.GetCatalogFactory(fetch_req->table_name_.Engine());
-            assert(catalog_factory != nullptr);
+            auto catalog_factory =
+                client.GetCatalogFactory(fetch_req->table_name_.Engine());
+            if (catalog_factory == nullptr)
+            {
+                LOG(ERROR) << "No CatalogFactory for engine="
+                           << static_cast<int>(fetch_req->table_name_.Engine());
+                fetch_req->SetFinish(txservice::CcErrorCode::DATA_STORE_ERR);
+                txservice::Sharder::Instance().UnpinNodeGroupData(ng_id);
+                return;
+            }

Example 3 (FetchTableStatsCallback):

-            auto catalog_factory =
-                client.GetCatalogFactory(fetch_cc->CatalogName().Engine());
-            assert(catalog_factory != nullptr);
+            auto catalog_factory =
+                client.GetCatalogFactory(fetch_cc->CatalogName().Engine());
+            if (catalog_factory == nullptr)
+            {
+                LOG(ERROR) << "No CatalogFactory for engine="
+                           << static_cast<int>(fetch_cc->CatalogName().Engine());
+                fetch_cc->SetFinish(
+                    static_cast<int>(txservice::CcErrorCode::DATA_STORE_ERR));
+                return;
+            }

Example 4 (LoadRangeSliceCallback):

-    auto catalog_factory = client.GetCatalogFactory(table_name.Engine());
-    assert(catalog_factory != nullptr);
+    auto catalog_factory = client.GetCatalogFactory(table_name.Engine());
+    if (catalog_factory == nullptr)
+    {
+        LOG(ERROR) << "No CatalogFactory for engine="
+                   << static_cast<int>(table_name.Engine());
+        fill_store_slice_req->SetKvFinish(false);
+        txservice::Sharder::Instance().UnpinNodeGroupData(
+            fill_store_slice_req->NodeGroup());
+        return;
+    }

Also applies to: 870-873, 1015-1018, 1082-1084
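
The common shape of these callback fixes is that the early-return path has to finish the request and release anything already pinned. A minimal sketch, with hypothetical `FetchRequest` and `PinCounter` types standing in for the fetch request and `Sharder::UnpinNodeGroupData`:

```cpp
#include <cassert>

struct CatalogFactory {};

// Hypothetical request type, standing in for the fetch request objects.
struct FetchRequest
{
    bool finished = false;
    int error = 0;
    void SetFinish(int err)
    {
        finished = true;
        error = err;
    }
};

// Hypothetical pin bookkeeping, standing in for Sharder::UnpinNodeGroupData.
struct PinCounter
{
    int pinned = 0;
    void Unpin() { --pinned; }
};

constexpr int kNoError = 0;
constexpr int kDataStoreErr = 1;  // placeholder for CcErrorCode::DATA_STORE_ERR

void OnFetchDone(FetchRequest &req, PinCounter &pins,
                 const CatalogFactory *factory)
{
    if (factory == nullptr)
    {
        // Error path: finish the request and release the pin before returning.
        req.SetFinish(kDataStoreErr);
        pins.Unpin();
        return;
    }
    // ... decode the response with `factory` here ...
    req.SetFinish(kNoError);
    pins.Unpin();
}
```

A scope guard could release the pin on every path, at the cost of departing from the style of the surrounding callbacks.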

Comment on lines +872 to 876
auto catalog_factory = GetCatalogFactory(fetch_cc->table_name_.Engine());
assert(catalog_factory != nullptr);
fetch_cc->kv_start_key_ =
EncodeRangeKey(catalog_factory, fetch_cc->table_name_, start_key);

🛠️ Refactor suggestion

Handle missing CatalogFactory at runtime instead of assert

Convert asserts to defensive checks and return appropriate errors to avoid crashes in release builds.

Apply pattern (examples):

FetchRangeSlices (void):

-    auto catalog_factory = GetCatalogFactory(fetch_cc->table_name_.Engine());
-    assert(catalog_factory != nullptr);
+    auto catalog_factory = GetCatalogFactory(fetch_cc->table_name_.Engine());
+    if (catalog_factory == nullptr)
+    {
+        LOG(ERROR) << "No CatalogFactory for engine="
+                   << static_cast<int>(fetch_cc->table_name_.Engine());
+        fetch_cc->SetFinish(txservice::CcErrorCode::DATA_STORE_ERR);
+        return;
+    }

DeleteOutOfRangeData (bool):

-    auto catalog_factory = GetCatalogFactory(table_name.Engine());
-    assert(catalog_factory != nullptr);
+    auto catalog_factory = GetCatalogFactory(table_name.Engine());
+    if (catalog_factory == nullptr)
+    {
+        LOG(ERROR) << "No CatalogFactory for engine="
+                   << static_cast<int>(table_name.Engine());
+        return false;
+    }

LoadRangeSlice (returns DataStoreOpStatus):

-    auto catalog_factory = GetCatalogFactory(table_name.Engine());
-    assert(catalog_factory != nullptr);
+    auto catalog_factory = GetCatalogFactory(table_name.Engine());
+    if (catalog_factory == nullptr)
+    {
+        LOG(ERROR) << "No CatalogFactory for engine="
+                   << static_cast<int>(table_name.Engine());
+        return txservice::store::DataStoreHandler::DataStoreOpStatus::Error;
+    }

UpdateRangeSlices (bool):

-    auto catalog_factory = GetCatalogFactory(table_name.Engine());
-    assert(catalog_factory != nullptr);
+    auto catalog_factory = GetCatalogFactory(table_name.Engine());
+    if (catalog_factory == nullptr)
+    {
+        LOG(ERROR) << "No CatalogFactory for engine="
+                   << static_cast<int>(table_name.Engine());
+        return false;
+    }

FetchArchives (bool):

-    auto *catalog_factory = GetCatalogFactory(table_name.Engine());
-    assert(catalog_factory != nullptr);
+    auto *catalog_factory = GetCatalogFactory(table_name.Engine());
+    if (catalog_factory == nullptr)
+    {
+        LOG(ERROR) << "No CatalogFactory for engine="
+                   << static_cast<int>(table_name.Engine());
+        return false;
+    }

InitTableRanges (bool):

-    auto catalog_factory = GetCatalogFactory(table_name.Engine());
-    assert(catalog_factory != nullptr);
+    auto catalog_factory = GetCatalogFactory(table_name.Engine());
+    if (catalog_factory == nullptr)
+    {
+        LOG(ERROR) << "No CatalogFactory for engine="
+                   << static_cast<int>(table_name.Engine());
+        return false;
+    }

Also applies to: 908-911, 1062-1065, 1263-1265, 2512-2514, 3362-3367

🤖 Prompt for AI Agents
In data_store_service_client.cpp around lines 872-876, replace the
assert(catalog_factory != nullptr) with a defensive nullptr check: if
GetCatalogFactory(...) returns nullptr, log an error with context (table
name/engine), and return an appropriate failure from the enclosing function (for
void functions: log and return; for bool functions: log and return false; for
functions returning DataStoreOpStatus: log and return a suitable error status).
Apply the same defensive pattern to the other occurrences listed (908-911,
1062-1065, 1263-1265, 2512-2514, 3362-3367), ensuring you choose the correct
return value/type for each function signature and avoid program termination in
release builds.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 3

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (6)
data_store_service_client.cpp (6)

908-920: Fix nullptr deref: start_key can be null (NegativeInf sentinel)

Function comment says start_key nullptr indicates -inf, but code dereferences it unconditionally.

Apply:

-    if (start_key->Type() == txservice::KeyType::NegativeInf)
+    if (start_key == nullptr || start_key->Type() == txservice::KeyType::NegativeInf)
     {
         const txservice::TxKey *neg_key =
             catalog_factory->PackedNegativeInfinity();
         start_key_str = std::string(neg_key->Data(), neg_key->Size());
     }
     else
     {
         start_key_str = std::string(start_key->Data(), start_key->Size());
     }
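
The same null-or-sentinel check can be exercised in isolation. The `Key` struct and `EncodeStartKey` below are hypothetical simplifications; the real code works with `txservice::TxKey` and `PackedNegativeInfinity()`.

```cpp
#include <cassert>
#include <string>

enum class KeyType { Normal, NegativeInf };

// Hypothetical minimal key; txservice::TxKey has a richer interface.
struct Key
{
    KeyType type;
    std::string bytes;
};

// Returns the encoded start key, substituting the packed -inf sentinel when
// the caller passes nullptr or an explicit NegativeInf key.
std::string EncodeStartKey(const Key *start_key,
                           const std::string &packed_neg_inf)
{
    if (start_key == nullptr || start_key->type == KeyType::NegativeInf)
    {
        return packed_neg_inf;
    }
    return start_key->bytes;
}
```

The short-circuit `||` is what makes this safe: the type is only read once the pointer is known to be non-null.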

1959-1966: Unify serialized size fields to uint64_t (wire-format correctness)

Parameters are size_t&, but you serialize as 8 bytes; on 32‑bit this reads past the variable. Use uint64_t consistently.

Apply:

-void DataStoreServiceClient::EncodeArchiveValue(
+void DataStoreServiceClient::EncodeArchiveValue(
     bool is_deleted,
     const txservice::TxRecord *value,
-    size_t &unpack_info_size,
-    size_t &encoded_blob_size,
+    uint64_t &unpack_info_size,
+    uint64_t &encoded_blob_size,
     std::vector<std::string_view> &record_parts,
     size_t &write_batch_size)

Also update the header declaration accordingly (see separate note below).


2096-2100: Use uint64_t backing for serialized size placeholders

Avoid width mismatch; these buffers back string_views of 8 bytes.

Apply:

-        std::vector<size_t> record_tmp_mem_area;
+        std::vector<uint64_t> record_tmp_mem_area;

3741-3767: Signature/type mismatch with header; fix to uint64_t and 8‑byte writes (part 1)

The header declares std::vector<uint64_t>&, while the implementation uses std::vector<size_t>&. On platforms where the two are distinct types this will not compile, and it also risks width bugs.

Apply:

 void DataStoreServiceClient::SerializeTxRecord(
     bool is_deleted,
     const txservice::TxRecord *rec,
-    std::vector<size_t> &record_tmp_mem_area,
+    std::vector<uint64_t> &record_tmp_mem_area,
     std::vector<std::string_view> &record_parts,
     size_t &write_batch_size)

3769-3791: Signature/type mismatch with header; fix to uint64_t and 8‑byte writes (part 2)

Align types and ensure 8‑byte sizes on wire.

Apply:

 void DataStoreServiceClient::SerializeTxRecord(
     const txservice::TxRecord *rec,
-    std::vector<size_t> &record_tmp_mem_area,
+    std::vector<uint64_t> &record_tmp_mem_area,
     std::vector<std::string_view> &record_parts,
     size_t &write_batch_size)
 {
   // ...
-  record_tmp_mem_area.emplace_back(rec->UnpackInfoSize());
-  size_t *unpack_info_size = &record_tmp_mem_area.back();
-  record_parts.emplace_back(std::string_view(
-      reinterpret_cast<const char *>(unpack_info_size), sizeof(size_t)));
-  write_batch_size += sizeof(size_t);
+  record_tmp_mem_area.emplace_back(static_cast<uint64_t>(rec->UnpackInfoSize()));
+  uint64_t *unpack_info_size = &record_tmp_mem_area.back();
+  record_parts.emplace_back(std::string_view(
+      reinterpret_cast<const char *>(unpack_info_size), sizeof(uint64_t)));
+  write_batch_size += sizeof(uint64_t);
   record_parts.emplace_back(rec->UnpackInfoData(), rec->UnpackInfoSize());
   write_batch_size += rec->UnpackInfoSize();
-  record_tmp_mem_area.emplace_back(rec->EncodedBlobSize());
-  uint64_t *encoded_blob_size = &record_tmp_mem_area.back();
-  record_parts.emplace_back(std::string_view(
-      reinterpret_cast<const char *>(encoded_blob_size), sizeof(size_t)));
-  write_batch_size += sizeof(size_t);
+  record_tmp_mem_area.emplace_back(static_cast<uint64_t>(rec->EncodedBlobSize()));
+  uint64_t *encoded_blob_size = &record_tmp_mem_area.back();
+  record_parts.emplace_back(std::string_view(
+      reinterpret_cast<const char *>(encoded_blob_size), sizeof(uint64_t)));
+  write_batch_size += sizeof(uint64_t);

1004-1045: Header sync required for size-type changes

Update the corresponding declarations in data_store_service_client.h to match the uint64_t changes:

Add/modify in header:

-    static void EncodeArchiveValue(bool is_deleted,
-                                   const txservice::TxRecord *value,
-                                   size_t &unpack_info_size,
-                                   size_t &encoded_blob_size,
+    static void EncodeArchiveValue(bool is_deleted,
+                                   const txservice::TxRecord *value,
+                                   uint64_t &unpack_info_size,
+                                   uint64_t &encoded_blob_size,
                                    std::vector<std::string_view> &record_parts,
                                    size_t &write_batch_size);
@@
-    static void SerializeTxRecord(bool is_deleted,
-                                  const txservice::TxRecord *rec,
-                                  std::vector<size_t> &record_tmp_mem_area,
+    static void SerializeTxRecord(bool is_deleted,
+                                  const txservice::TxRecord *rec,
+                                  std::vector<uint64_t> &record_tmp_mem_area,
                                   std::vector<std::string_view> &record_parts,
                                   size_t &write_batch_size);
@@
-    static void SerializeTxRecord(const txservice::TxRecord *rec,
-                                  std::vector<size_t> &record_tmp_mem_area,
+    static void SerializeTxRecord(const txservice::TxRecord *rec,
+                                  std::vector<uint64_t> &record_tmp_mem_area,
                                   std::vector<std::string_view> &record_parts,
                                   size_t &write_batch_size);
🧹 Nitpick comments (2)
data_store_service_scanner.cpp (2)

205-226: Validate catalog_factory before use

Avoid null deref on catalog_factory_. Fail fast if null.

Apply:

 DataStoreServiceHashPartitionScanner(
     DataStoreServiceClient *client,
     const txservice::CatalogFactory *catalog_factory,
@@
       initialized_(false)
 {
     assert(client_ != nullptr);
+    if (catalog_factory_ == nullptr)
+    {
+        LOG(ERROR) << "CatalogFactory is null";
+        return;
+    }

448-452: Optional: avoid double-work when constructing key

You can construct key directly from bytes via factory to skip SetPackedKey.

Example:

-    ScanHeapTuple<txservice::TxKey, txservice::TxRecord> scan_tuple(
-        part_id,
-        std::make_unique<txservice::TxKey>(catalog_factory_->CreateTxKey()),
-        catalog_factory_->CreateTxRecord());
-    scan_tuple.key_->SetPackedKey(part_scan_tuple.key_.data(),
-                                  part_scan_tuple.key_.size());
+    ScanHeapTuple<txservice::TxKey, txservice::TxRecord> scan_tuple(
+        part_id,
+        std::make_unique<txservice::TxKey>(catalog_factory_->CreateTxKey(
+            part_scan_tuple.key_.data(), part_scan_tuple.key_.size())),
+        catalog_factory_->CreateTxRecord());
📜 Review details

📥 Commits

Reviewing files that changed from the base of the PR and between 1eb8bb2 and 2a0d157.

📒 Files selected for processing (6)
  • data_store_service_client.cpp (17 hunks)
  • data_store_service_client.h (5 hunks)
  • data_store_service_client_closure.cpp (6 hunks)
  • data_store_service_scanner.cpp (4 hunks)
  • data_store_service_scanner.h (2 hunks)
  • kv_store.h (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • data_store_service_scanner.h
🧰 Additional context used
🧬 Code graph analysis (1)
data_store_service_client.h (1)
data_store_service_client.cpp (2)
  • EncodeRangeKey (1132-1153)
  • EncodeRangeKey (1132-1135)
🔇 Additional comments (7)
data_store_service_client_closure.cpp (1)

642-645: Replace asserts on catalog_factory with runtime checks and fail fast

Asserts are compiled out in release builds (NDEBUG), so a null factory would be dereferenced and crash. Handle nullptr with explicit error propagation.

Apply (1) FetchTableRangesCallback:

-        auto catalog_factory =
-            client.GetCatalogFactory(fetch_range_cc->table_name_.Engine());
-        assert(catalog_factory != nullptr);
+        auto catalog_factory =
+            client.GetCatalogFactory(fetch_range_cc->table_name_.Engine());
+        if (catalog_factory == nullptr)
+        {
+            LOG(ERROR) << "No CatalogFactory for engine="
+                       << static_cast<int>(fetch_range_cc->table_name_.Engine());
+            fetch_range_cc->SetFinish(
+                static_cast<int>(txservice::CcErrorCode::DATA_STORE_ERR));
+            return;
+        }

Apply (2) FetchRangeSlicesCallback:

-            auto catalog_factory =
-                client.GetCatalogFactory(fetch_req->table_name_.Engine());
-            assert(catalog_factory != nullptr);
+            auto catalog_factory =
+                client.GetCatalogFactory(fetch_req->table_name_.Engine());
+            if (catalog_factory == nullptr)
+            {
+                LOG(ERROR) << "No CatalogFactory for engine="
+                           << static_cast<int>(fetch_req->table_name_.Engine());
+                fetch_req->SetFinish(txservice::CcErrorCode::DATA_STORE_ERR);
+                txservice::Sharder::Instance().UnpinNodeGroupData(ng_id);
+                return;
+            }

Apply (3) FetchTableStatsCallback:

-            auto catalog_factory =
-                client.GetCatalogFactory(fetch_cc->CatalogName().Engine());
-            assert(catalog_factory != nullptr);
+            auto catalog_factory =
+                client.GetCatalogFactory(fetch_cc->CatalogName().Engine());
+            if (catalog_factory == nullptr)
+            {
+                LOG(ERROR) << "No CatalogFactory for engine="
+                           << static_cast<int>(fetch_cc->CatalogName().Engine());
+                fetch_cc->SetFinish(
+                    static_cast<int>(txservice::CcErrorCode::DATA_STORE_ERR));
+                return;
+            }

Apply (4) LoadRangeSliceCallback:

-    auto catalog_factory = client.GetCatalogFactory(table_name.Engine());
-    assert(catalog_factory != nullptr);
+    auto catalog_factory = client.GetCatalogFactory(table_name.Engine());
+    if (catalog_factory == nullptr)
+    {
+        LOG(ERROR) << "No CatalogFactory for engine="
+                   << static_cast<int>(table_name.Engine());
+        fill_store_slice_req->SetKvFinish(false);
+        txservice::Sharder::Instance().UnpinNodeGroupData(
+            fill_store_slice_req->NodeGroup());
+        return;
+    }

Also applies to: 870-873, 1015-1018, 1082-1084

data_store_service_client.h (1)

631-635: Harden GetCatalogFactory against OOB/unknown engines

Casting enum to int and subtracting 1 is brittle. Add bounds checks and return nullptr if out of range.

Apply:

-    {
-        return catalog_factory_array_.at(static_cast<int>(table_engine) - 1);
-    }
+    {
+        int idx = static_cast<int>(table_engine) - 1;
+        if (idx < 0 || idx >= static_cast<int>(catalog_factory_array_.size()))
+        {
+            return nullptr;
+        }
+        return catalog_factory_array_[static_cast<size_t>(idx)];
+    }
data_store_service_client.cpp (5)

872-876: Replace assert with runtime check (FetchRangeSlices)

Prevent crashes in release builds.

Apply:

-    auto catalog_factory = GetCatalogFactory(fetch_cc->table_name_.Engine());
-    assert(catalog_factory != nullptr);
+    auto catalog_factory = GetCatalogFactory(fetch_cc->table_name_.Engine());
+    if (catalog_factory == nullptr)
+    {
+        fetch_cc->SetFinish(txservice::CcErrorCode::DATA_STORE_ERR);
+        return txservice::store::DataStoreHandler::DataStoreOpStatus::Error;
+    }
     fetch_cc->kv_start_key_ =
         EncodeRangeKey(catalog_factory, fetch_cc->table_name_, start_key);

1062-1064: Replace assert with runtime check (LoadRangeSlice)

Avoid null deref.

Apply:

-    auto catalog_factory = GetCatalogFactory(table_name.Engine());
-    assert(catalog_factory != nullptr);
+    auto catalog_factory = GetCatalogFactory(table_name.Engine());
+    if (catalog_factory == nullptr)
+    {
+        return txservice::store::DataStoreHandler::DataStoreOpStatus::Error;
+    }

1263-1265: Replace assert with runtime check (UpdateRangeSlices)

Apply:

-    auto catalog_factory = GetCatalogFactory(table_name.Engine());
-    assert(catalog_factory != nullptr);
+    auto catalog_factory = GetCatalogFactory(table_name.Engine());
+    if (catalog_factory == nullptr)
+    {
+        LOG(ERROR) << "No CatalogFactory for engine="
+                   << static_cast<int>(table_name.Engine());
+        return false;
+    }

2294-2296: Replace assert with runtime check (CopyBaseToArchive)

Apply:

-        auto *catalog_factory = GetCatalogFactory(table_name.Engine());
-        assert(catalog_factory != nullptr);
+        auto *catalog_factory = GetCatalogFactory(table_name.Engine());
+        if (catalog_factory == nullptr)
+        {
+            LOG(ERROR) << "No CatalogFactory for engine="
+                       << static_cast<int>(table_name.Engine());
+            return false;
+        }

3362-3367: Replace assert with runtime check (InitTableRanges)

Apply:

-    auto catalog_factory = GetCatalogFactory(table_name.Engine());
-    assert(catalog_factory != nullptr);
+    auto catalog_factory = GetCatalogFactory(table_name.Engine());
+    if (catalog_factory == nullptr)
+    {
+        LOG(ERROR) << "No CatalogFactory for engine="
+                   << static_cast<int>(table_name.Engine());
+        return false;
+    }

Comment on lines +1004 to 1011
auto *catalog_factory = GetCatalogFactory(table_name.Engine());
if (scan_forward)
{
auto scanner =
std::make_unique<DataStoreServiceHashPartitionScanner<true>>(
this,
catalog_factory,
key_schema,

🛠️ Refactor suggestion

Guard scanner construction when factory is missing

Return nullptr if no CatalogFactory is available.

Apply:

-    auto *catalog_factory = GetCatalogFactory(table_name.Engine());
+    auto *catalog_factory = GetCatalogFactory(table_name.Engine());
+    if (catalog_factory == nullptr)
+    {
+        LOG(ERROR) << "No CatalogFactory for engine="
+                   << static_cast<int>(table_name.Engine());
+        return nullptr;
+    }

Also applies to: 1028-1031

🤖 Prompt for AI Agents
In data_store_service_client.cpp around lines 1004-1011 (and similarly at
1028-1031), the code constructs a scanner unconditionally using catalog_factory;
if GetCatalogFactory returns nullptr this will lead to incorrect behavior or
crashes — check whether catalog_factory is null and if so return nullptr
immediately instead of constructing the scanner; apply the same null-check and
early-return pattern to the second construction site at 1028-1031.
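
A minimal sketch of the early-return guard described above, assuming the creation function returns a `std::unique_ptr`. `Scanner` and `MakeScanner` are hypothetical and much simpler than `DataStoreServiceHashPartitionScanner`.

```cpp
#include <cassert>
#include <memory>

struct CatalogFactory {};

// Hypothetical scanner that merely remembers the factory it was given.
struct Scanner
{
    explicit Scanner(const CatalogFactory *f) : factory(f) {}
    const CatalogFactory *factory;
};

// Returns an empty pointer instead of building a scanner around a null factory.
std::unique_ptr<Scanner> MakeScanner(const CatalogFactory *factory)
{
    if (factory == nullptr)
    {
        return nullptr;
    }
    return std::make_unique<Scanner>(factory);
}
```

Callers then have to treat an empty pointer as "no scanner available" rather than dereferencing the result unconditionally.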

Comment on lines +69 to 78
    txservice::CatalogFactory *catalog_factory[3],
    const DataStoreServiceClusterManager &cluster_manager,
    DataStoreService *data_store_service = nullptr)
    : ds_serv_shutdown_indicator_(false),
      catalog_factory_array_{catalog_factory[0],
                             catalog_factory[1],
                             catalog_factory[2],
                             &range_catalog_factory_,
                             &hash_catalog_factory_},
      cluster_manager_(cluster_manager),

🛠️ Refactor suggestion

Constructor dereferences possibly null input; initialize safely and validate

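The initializer list indexes catalog_factory[0..2] before any check. Because an array parameter decays to a pointer, a null argument would be dereferenced during member initialization. Initialize the first three slots with nullptr and populate them inside the constructor body after validation.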

Apply:

-        : ds_serv_shutdown_indicator_(false),
-          catalog_factory_array_{catalog_factory[0],
-                                 catalog_factory[1],
-                                 catalog_factory[2],
-                                 &range_catalog_factory_,
-                                 &hash_catalog_factory_},
+        : ds_serv_shutdown_indicator_(false),
+          catalog_factory_array_{nullptr,
+                                 nullptr,
+                                 nullptr,
+                                 &range_catalog_factory_,
+                                 &hash_catalog_factory_},
           cluster_manager_(cluster_manager),
           data_store_service_(data_store_service),
           flying_remote_fetch_count_(0)
     {

And inside the ctor body (after existing code), populate with checks:

// Populate factory array safely
if (catalog_factory == nullptr)
{
    LOG(ERROR) << "Null CatalogFactory array";
}
else
{
    for (int i = 0; i < 3; ++i)
    {
        if (catalog_factory[i] == nullptr)
        {
            LOG(ERROR) << "Null CatalogFactory at idx=" << i;
        }
        catalog_factory_array_[i] = catalog_factory[i];
    }
}
🤖 Prompt for AI Agents
In data_store_service_client.h around lines 69 to 78, the constructor
initializer list dereferences catalog_factory[0..2] unsafely; change the
initializer to set the catalog_factory_array_ entries to nullptr (or default)
instead of dereferencing the input pointer, then in the constructor body
validate catalog_factory for null, log an error if the pointer itself is null,
otherwise loop i=0..2 and check each catalog_factory[i] for null, log per-index
errors and assign catalog_factory_array_[i] = catalog_factory[i]; ensure no
dereferences occur in the initializer list and all assignments happen after the
null checks in the ctor body.
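
A compact sketch of this constructor shape, under stated assumptions: `Client`, `Factory`, and the accessor `At` are hypothetical names, and the five-slot layout (three caller-supplied factories followed by two built-in ones) follows the initializer list quoted above.

```cpp
#include <array>
#include <cstddef>

struct Factory {};  // hypothetical stand-in for CatalogFactory

class Client
{
public:
    // `external[3]` decays to `Factory **`, so the pointer itself may be null.
    explicit Client(Factory *external[3])
        : factories_{nullptr, nullptr, nullptr, &range_factory_, &hash_factory_}
    {
        if (external == nullptr)
        {
            return;  // external slots stay null; built-in slots remain valid
        }
        for (std::size_t i = 0; i < 3; ++i)
        {
            factories_[i] = external[i];  // individual entries may still be null
        }
    }

    // Bounds-checked accessor; returns nullptr for empty or invalid slots.
    Factory *At(std::size_t i) const
    {
        return i < factories_.size() ? factories_[i] : nullptr;
    }

private:
    Factory range_factory_;
    Factory hash_factory_;
    std::array<Factory *, 5> factories_;
};
```

Nothing is read through the input pointer until the body has checked it, so a null argument leaves the object in a well-defined state instead of triggering undefined behavior.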

Comment on lines +65 to 74
explicit ScanHeapTuple(uint32_t shard_id,
                       std::unique_ptr<txservice::TxKey> key,
                       std::unique_ptr<txservice::TxRecord> rec)
    : key_(std::move(key)),
      rec_(std::move(rec)),
      version_ts_(0),
      deleted_(false),
      sid_(shard_id)
{
}

🛠️ Refactor suggestion

Enforce non-null key/record and keep fields initialized

Guard against null key and record pointers. This specialization already initializes version_ts_ and deleted_, whereas the generic template currently leaves both uninitialized.

Apply:

     explicit ScanHeapTuple(uint32_t shard_id,
                            std::unique_ptr<txservice::TxKey> key,
                            std::unique_ptr<txservice::TxRecord> rec)
         : key_(std::move(key)),
           rec_(std::move(rec)),
           version_ts_(0),
           deleted_(false),
           sid_(shard_id)
     {
+        assert(key_ && rec_);
     }

Follow-up: Consider initializing version_ts_/deleted_ in the primary template’s ctor as well for consistency.

🤖 Prompt for AI Agents
In kv_store.h around lines 65 to 74, the ScanHeapTuple constructor accepts
unique_ptrs for key and rec but does not guard against nullptr and currently
initializes version_ts_ and deleted_ here while the generic template leaves them
uninitialized; modify this ctor to validate key and rec (e.g., add DCHECK/ASSERT
or throw std::invalid_argument if key==nullptr or rec==nullptr) before moving
them, and explicitly initialize version_ts_ (e.g., to 0) and deleted_ (e.g., to
false) in the member initializer list; as a follow-up, apply the same explicit
initialization of version_ts_ and deleted_ in the primary template’s constructor
for consistency.
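
A sketch of the exception-based variant mentioned above, using hypothetical `Key`, `Record`, and `Tuple` types in place of the real ones. It checks the members rather than the parameters because the parameters are already empty once the initializer list has moved from them. Default member initializers are one possible way to keep version_ts_ and deleted_ initialized in every constructor.

```cpp
#include <cstdint>
#include <memory>
#include <stdexcept>
#include <utility>

struct Key {};     // hypothetical stand-in for txservice::TxKey
struct Record {};  // hypothetical stand-in for txservice::TxRecord

class Tuple
{
public:
    Tuple(std::uint32_t shard_id,
          std::unique_ptr<Key> key,
          std::unique_ptr<Record> rec)
        : key_(std::move(key)), rec_(std::move(rec)), sid_(shard_id)
    {
        // Check the members: the parameters are empty after the moves above.
        if (!key_ || !rec_)
        {
            throw std::invalid_argument(
                "Tuple requires a non-null key and record");
        }
    }

    std::uint64_t VersionTs() const { return version_ts_; }
    bool Deleted() const { return deleted_; }
    std::uint32_t ShardId() const { return sid_; }

private:
    std::unique_ptr<Key> key_;
    std::unique_ptr<Record> rec_;
    // Default member initializers apply to every constructor.
    std::uint64_t version_ts_{0};
    bool deleted_{false};
    std::uint32_t sid_;
};
```

Unlike assert, the exception is still raised in builds that define NDEBUG. Whether that cost is acceptable on a scan hot path is a design decision for the maintainers.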

@lzxddz lzxddz merged commit b3e8d0e into main Sep 24, 2025
1 check passed
@lzxddz lzxddz deleted the create_tx_key branch September 24, 2025 03:49
Development

Successfully merging this pull request may close these issues.

Change the way of using "CreateTxKey" and "CreateTxRecord" to support multi-table-engine.