Merged
43 changes: 43 additions & 0 deletions data_store_service_client.cpp
@@ -130,6 +130,29 @@ void DataStoreServiceClient::ScheduleTimerTasks()
assert(false);
}

/**
* @brief Batch-writes a set of flush tasks into KV tables.
*
* Processes the provided flush tasks grouped by table and partition, serializes
* each record (object tables use raw encoded blobs; non-object tables encode
* tx-records with unpack info), and issues batched PUT/DELETE operations via
* BatchWriteRecords. Batches are emitted per KV-partition and sized according
* to SyncPutAllData::max_flying_write_count; the method blocks as necessary to
* respect the global in-flight write limit and waits for all dispatched
* requests to complete before returning.
*
* The function distinguishes hash- and range-partitioned tables, computes
* per-partition batches, and updates per-record timestamps/TTLs and operation
* types. Partial batches are flushed at partition boundaries. On any remote or
* batch-level error the function logs the failure and returns false.
*
* @param flush_task Mapping from KV table name to a vector of flush task
* entries containing the records to write. Each entry's
* data_sync_vec_ provides the sequence of records for that
* flush task.
* @return true if all batches completed successfully; false if any batch
* reported an error.
*/
bool DataStoreServiceClient::PutAll(
std::unordered_map<std::string_view,
std::vector<std::unique_ptr<txservice::FlushTaskEntry>>>
@@ -1868,6 +1891,26 @@ void DataStoreServiceClient::DecodeArchiveValue(
value_offset = pos;
}

/**
* @brief Writes multiple MVCC archive records to the MVCC archive KV table in partitioned batches.
*
* Groups archive entries from the provided flush tasks by archive partition, serializes keys
* and values into batch write requests, and dispatches those requests (possibly concurrently)
* to the KV layer. Batches are split to respect MAX_WRITE_BATCH_SIZE and an internal limit on
* in-flight write requests; the method waits for all dispatched batches for each partition to
* complete before returning.
*
* Side effects:
* - Commits serialized archive records to kv_mvcc_archive_name with a default TTL of 1 day.
* - Converts per-record commit timestamps to big-endian form as part of key encoding (the
* in-memory commit_ts field of those records is mutated during processing).
*
* @param flush_task Map from KV table name to a vector of FlushTaskEntry pointers whose
* archive vectors contain the FlushRecord entries to write. Only entries
* with non-empty archive vectors are processed.
* @return true if all batches for all partitions completed successfully; false if any batch
* failed (an error will be logged).
*/
bool DataStoreServiceClient::PutArchivesAll(
std::unordered_map<std::string_view,
std::vector<std::unique_ptr<txservice::FlushTaskEntry>>>
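The PutArchivesAll comment above mentions that commit timestamps are converted to big-endian form as part of key encoding. A minimal sketch of why that matters follows: with a big-endian layout, byte-wise key comparison agrees with numeric timestamp order. The helper names `EncodeBigEndian64` and `DecodeBigEndian64` are hypothetical and not taken from this PR, and unlike the documented behavior this sketch does not mutate the record's in-memory `commit_ts`.

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// Hypothetical helper (an assumption, not the PR's code): encodes a 64-bit
// timestamp as 8 big-endian bytes so that lexicographic comparison of the
// encoded keys matches numeric comparison of the timestamps.
inline std::string EncodeBigEndian64(uint64_t ts)
{
    std::string out(8, '\0');
    for (int i = 7; i >= 0; --i)
    {
        out[i] = static_cast<char>(ts & 0xFF);  // least significant byte last
        ts >>= 8;
    }
    return out;
}

// Hypothetical inverse of EncodeBigEndian64.
inline uint64_t DecodeBigEndian64(const std::string &bytes)
{
    assert(bytes.size() == 8);
    uint64_t ts = 0;
    for (unsigned char c : bytes)
    {
        ts = (ts << 8) | c;  // most significant byte first
    }
    return ts;
}
```

Because `std::string` compares bytes as unsigned values, `EncodeBigEndian64(255) < EncodeBigEndian64(256)` holds, which would not be guaranteed with a little-endian layout.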
93 changes: 91 additions & 2 deletions data_store_service_client_closure.h
@@ -1,4 +1,3 @@

/**
* Copyright (C) 2025 EloqData Inc.
*
@@ -35,7 +34,97 @@
#include "data_store_service_scanner.h"
#include "eloq_data_store_service/object_pool.h"

namespace EloqDS
/**
* Callback type invoked on completion of a datastore operation.
*
* Parameters:
* - data: user-provided context pointer passed through the async call.
* - closure: protobuf closure associated with the RPC (may be nullptr for local paths).
* - client: reference to the DataStoreServiceClient that executed the operation.
* - result: operation result detail (error code/message and any operation-specific fields).
*/

/**
* Synchronization helper used to wait for an asynchronous datastore operation to complete.
*
* Provides a mutex/condition variable pair and a CommonResult to store the outcome.
* Typical usage: Reset() before issuing the async operation, Notify() from the async
* completion callback, and Wait() from the waiting thread. HasError() reports whether
* the stored result represents an error other than NO_ERROR or KEY_NOT_FOUND.
*/
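The Reset/Notify/Wait pattern described above can be sketched as follows. This is an illustration under stated assumptions, not the class from this PR: `SketchWaiter` and `SketchError` are hypothetical names, and the real helper stores a CommonResult rather than a bare enum.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>

// Hypothetical stand-in for the datastore error codes (assumption).
enum class SketchError { kNoError, kKeyNotFound, kNetworkError };

// Hypothetical waiter: one thread calls Wait(), the completion callback
// calls Notify() with the outcome.
class SketchWaiter
{
public:
    // Clear state before issuing a new asynchronous operation.
    void Reset()
    {
        std::lock_guard<std::mutex> lk(mtx_);
        finished_ = false;
        error_ = SketchError::kNoError;
    }

    // Called from the completion callback.
    void Notify(SketchError err)
    {
        {
            std::lock_guard<std::mutex> lk(mtx_);
            error_ = err;
            finished_ = true;
        }
        cv_.notify_one();
    }

    // Blocks until Notify() has been called since the last Reset().
    void Wait()
    {
        std::unique_lock<std::mutex> lk(mtx_);
        cv_.wait(lk, [this] { return finished_; });
        assert(finished_);
    }

    // A missing key is treated as a normal outcome, not an error.
    bool HasError() const
    {
        std::lock_guard<std::mutex> lk(mtx_);
        return error_ != SketchError::kNoError &&
               error_ != SketchError::kKeyNotFound;
    }

private:
    mutable std::mutex mtx_;
    std::condition_variable cv_;
    bool finished_{false};
    SketchError error_{SketchError::kNoError};
};
```

Waiting on a predicate (`finished_`) rather than on the bare condition variable keeps the sketch safe against spurious wakeups and against a Notify() that arrives before Wait() is entered.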

/**
* Aggregation and flow-control helper for coordinating many concurrent put-all writes.
*
* - unfinished_request_cnt_: signed count of outstanding write requests (must be signed).
* - all_request_started_: set to true once all requests have been launched.
* - max_flying_write_count: upper bound on concurrent in-flight writes (32).
*
* Finish(res) records the first non-NO_ERROR result in `result_`, decrements the
* unfinished request count, and notifies a waiter when either:
* - all requests have been started and the unfinished count reaches zero, or
* - the unfinished count falls to (max_flying_write_count - 1), enabling flow control
* to allow launching further requests while keeping in-flight writes bounded.
*/
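The two notification conditions described for Finish() can be sketched as below. This is a simplified illustration, not the PR's implementation: `SketchPutAllSync`, `BeginRequest()`, and `WaitAll()` are hypothetical names, a bool stands in for the result type, and the sketch assumes the counter is incremented before each request is dispatched.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstdint>
#include <mutex>

// Hypothetical flow-control helper modeled on the description above.
class SketchPutAllSync
{
public:
    // Upper bound on in-flight writes (the comment above documents 32).
    static constexpr int32_t kMaxFlying = 32;

    // Call before dispatching a write; blocks while the limit is reached.
    void BeginRequest()
    {
        std::unique_lock<std::mutex> lk(mtx_);
        assert(!all_started_);
        cv_.wait(lk, [this] { return unfinished_ < kMaxFlying; });
        ++unfinished_;
    }

    // Call from each write's completion callback.
    void Finish(bool ok)
    {
        std::lock_guard<std::mutex> lk(mtx_);
        if (!ok)
        {
            ok_ = false;  // any failure is sticky; the documented helper keeps the first error
        }
        --unfinished_;
        // Wake the dispatcher when everything is done, or when one slot
        // below the in-flight limit has just been freed.
        if ((all_started_ && unfinished_ == 0) ||
            unfinished_ == kMaxFlying - 1)
        {
            cv_.notify_one();
        }
    }

    // Marks all requests as started and waits for them to complete.
    void WaitAll()
    {
        std::unique_lock<std::mutex> lk(mtx_);
        all_started_ = true;
        cv_.wait(lk, [this] { return unfinished_ == 0; });
    }

    bool Ok() const
    {
        std::lock_guard<std::mutex> lk(mtx_);
        return ok_;
    }

    int32_t Unfinished() const
    {
        std::lock_guard<std::mutex> lk(mtx_);
        return unfinished_;
    }

private:
    mutable std::mutex mtx_;
    std::condition_variable cv_;
    int32_t unfinished_{0};  // signed, matching the documented field
    bool all_started_{false};
    bool ok_{true};
};
```

Notifying only at these two thresholds avoids waking the dispatching thread on every completion; it is woken when it can either launch another write or return.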

/**
* Generic synchronous callback adapter invoked by closures to signal completion.
*
* Parameters:
* - data: user-provided context pointer passed through the async call.
* - closure: protobuf closure associated with the RPC (may be nullptr for local paths).
* - client: reference to the DataStoreServiceClient that executed the operation.
* - result: operation result detail (error code/message and any operation-specific fields).
*/

/**
* Shared helper used when reading archived records concurrently.
*
* Holds references to external synchronization primitives and counters:
* - mtx_, cv_: external mutex and condition variable used to guard flying_read_cnt_.
* - flying_read_cnt_: reference to the shared in-flight read counter.
* - error_code_: reference to an integer used to capture the first observed error.
*
* Also stores the most recent read result (partition_id_, key_str_, value_str_, ts_, ttl_).
* Thread-safe: methods that mutate or read shared resources acquire the provided mutex.
*/

/**
* Callback invoked for batch archive reads to aggregate or forward results.
*
* Parameters:
* - data: user-provided context pointer passed through the async call.
* - closure: protobuf closure associated with the RPC (may be nullptr for local paths).
* - client: reference to the DataStoreServiceClient that executed the operation.
* - result: operation result detail (error code/message and any operation-specific fields).
*/

/**
* Callback invoked to load a range slice (archive or otherwise).
*
* Parameters:
* - data: user-provided context pointer passed through the async call.
* - closure: protobuf closure associated with the RPC (may be nullptr for local paths).
* - client: reference to the DataStoreServiceClient that executed the operation.
* - result: operation result detail (error code/message and any operation-specific fields).
*/

/**
* Closure implementing a datastore Read operation supporting both local and remote paths.
*
* Use Reset(...) to configure a read (table, partition, key, client, and callback), then:
* - PrepareRequest(is_local): prepare an RPC request if remote, or clear local result for local reads.
* - Run(): executed when an RPC completes (or when local processing is finished). Run()
* handles RPC failures with retry logic, translates NOT_OWNER into sharding handling
* and potential retry, and finally invokes the user callback with the CommonResult.
*
* Accessors provide access to the brpc::Controller, request/response objects, table/partition/key,
* and local-result fields (value, ts, ttl, result). Value accessors return either the local
* in-memory values or the response's values depending on the request mode.
*
* Note: retry behavior is governed by the associated DataStoreServiceClient retry_limit_.
*/
namespace EloqDS
{
typedef void (*DataStoreCallback)(void *data,
::google::protobuf::Closure *closure,