diff --git a/data_store_service_client.cpp b/data_store_service_client.cpp index e91f310..c749395 100644 --- a/data_store_service_client.cpp +++ b/data_store_service_client.cpp @@ -130,6 +130,29 @@ void DataStoreServiceClient::ScheduleTimerTasks() assert(false); } +/** + * @brief Batch-writes a set of flush tasks into KV tables. + * + * Processes the provided flush tasks grouped by table and partition, serializes + * each record (object tables use raw encoded blobs; non-object tables encode + * tx-records with unpack info), and issues batched PUT/DELETE operations via + * BatchWriteRecords. Batches are emitted per KV-partition and sized according + * to SyncPutAllData::max_flying_write_count; the method blocks as necessary to + * respect the global in-flight write limit and waits for all dispatched + * requests to complete before returning. + * + * The function distinguishes hash- and range-partitioned tables, computes + * per-partition batches, and updates per-record timestamps/TTLs and operation + * types. Partial batches are flushed at partition boundaries. On any remote or + * batch-level error the function logs the failure and returns false. + * + * @param flush_task Mapping from KV table name to a vector of flush task + * entries containing the records to write. Each entry's + * data_sync_vec_ provides the sequence of records for that + * flush task. + * @return true if all batches completed successfully; false if any batch + * reported an error. + */ bool DataStoreServiceClient::PutAll( std::unordered_map>> @@ -1868,6 +1891,26 @@ void DataStoreServiceClient::DecodeArchiveValue( value_offset = pos; } +/** + * @brief Writes multiple MVCC archive records to the MVCC archive KV table in partitioned batches. + * + * Groups archive entries from the provided flush tasks by archive partition, serializes keys + * and values into batch write requests, and dispatches those requests (possibly concurrently) + * to the KV layer. Batches are split to respect MAX_WRITE_BATCH_SIZE and an internal limit on + * in-flight write requests; the method waits for all dispatched batches for each partition to + * complete before returning. + * + * Side effects: + * - Commits serialized archive records to kv_mvcc_archive_name with a default TTL of 1 day. + * - Converts per-record commit timestamps to big-endian form as part of key encoding (the + * in-memory commit_ts field of those records is mutated during processing). + * + * @param flush_task Map from KV table name to a vector of FlushTaskEntry pointers whose + * archive vectors contain the FlushRecord entries to write. Only entries + * with non-empty archive vectors are processed. + * @return true if all batches for all partitions completed successfully; false if any batch + * failed (an error will be logged). + */ bool DataStoreServiceClient::PutArchivesAll( std::unordered_map>> diff --git a/data_store_service_client_closure.h b/data_store_service_client_closure.h index d0d8ddd..77af7b5 100644 --- a/data_store_service_client_closure.h +++ b/data_store_service_client_closure.h @@ -1,4 +1,3 @@ - /** * Copyright (C) 2025 EloqData Inc. * @@ -35,7 +34,97 @@ #include "data_store_service_scanner.h" #include "eloq_data_store_service/object_pool.h" -namespace EloqDS +/** + * Callback type invoked on completion of a datastore operation. + * + * Parameters: + * - data: user-provided context pointer passed through the async call. + * - closure: protobuf closure associated with the RPC (may be nullptr for local paths). + * - client: reference to the DataStoreServiceClient that executed the operation. + * - result: operation result detail (error code/message and any operation-specific fields). + */ + + /** + * Synchronization helper used to wait for an asynchronous datastore operation to complete. + * + * Provides a mutex/condition variable pair and a CommonResult to store the outcome. + * Typical usage: Reset() before issuing the async operation, Notify() from the async + * completion callback, and Wait() from the waiting thread. HasError() reports whether + * the stored result represents an error other than NO_ERROR or KEY_NOT_FOUND. + */ + + /** + * Aggregation and flow-control helper for coordinating many concurrent put-all writes. + * + * - unfinished_request_cnt_: signed count of outstanding write requests (must be signed). + * - all_request_started_: set to true once all requests have been launched. + * - max_flying_write_count: upper bound on concurrent in-flight writes (32). + * + * Finish(res) will merge the first non-NO_ERROR result into `result_`, decrement the + * unfinished request count, and notify a waiter when either: + * - all requests have been started and the unfinished count reaches zero, or + * - the unfinished count falls to (max_flying_write_count - 1), enabling flow control + * to allow launching further requests while keeping in-flight writes bounded. + */ + + /** + * Generic synchronous callback adapter invoked by closures to signal completion. + * + * Parameters: + * - data: user-provided context pointer passed through the async call. + * - closure: protobuf closure associated with the RPC (may be nullptr for local paths). + * - client: reference to the DataStoreServiceClient that executed the operation. + * - result: operation result detail (error code/message and any operation-specific fields). + */ + + /** + * Shared helper used when reading archived records concurrently. + * + * Holds references to external synchronization primitives and counters: + * - mtx_, cv_: external mutex and condition variable used to guard flying_read_cnt_. + * - flying_read_cnt_: reference to the shared in-flight read counter. + * - error_code_: reference to an integer used to capture the first observed error. + * + * Also stores the most recent read result (partition_id_, key_str_, value_str_, ts_, ttl_). + * Thread-safe: methods that mutate or read shared resources acquire the provided mutex. + */ + + /** + * Callback invoked for batch archive reads to aggregate or forward results. + * + * Parameters: + * - data: user-provided context pointer passed through the async call. + * - closure: protobuf closure associated with the RPC (may be nullptr for local paths). + * - client: reference to the DataStoreServiceClient that executed the operation. + * - result: operation result detail (error code/message and any operation-specific fields). + */ + + /** + * Callback invoked to load a range slice (archive or otherwise). + * + * Parameters: + * - data: user-provided context pointer passed through the async call. + * - closure: protobuf closure associated with the RPC (may be nullptr for local paths). + * - client: reference to the DataStoreServiceClient that executed the operation. + * - result: operation result detail (error code/message and any operation-specific fields). + */ + + /** + * Closure implementing a datastore Read operation supporting both local and remote paths. + * + * Use Reset(...) to configure a read (table, partition, key, client, and callback), then: + * - PrepareRequest(is_local): prepare an RPC request if remote, or clear local result for local reads. + * - Run(): executed when an RPC completes (or when local processing is finished). Run() + * handles RPC failures with retry logic, translates NOT_OWNER into sharding handling + * and potential retry, and finally invokes the user callback with the CommonResult. + * + * Accessors provide access to the brpc::Controller, request/response objects, table/partition/key, + * and local-result fields (value, ts, ttl, result). Value accessors return either the local + * in-memory values or the response's values depending on the request mode. + * + * Note: retry behavior is governed by the associated DataStoreServiceClient retry_limit_. + */ + namespace EloqDS { typedef void (*DataStoreCallback)(void *data, ::google::protobuf::Closure *closure,