Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,225 changes: 917 additions & 308 deletions data_store_service_client.cpp

Large diffs are not rendered by default.

29 changes: 29 additions & 0 deletions data_store_service_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@

namespace EloqDS
{
// Forward declarations for types defined in closure header
struct PartitionFlushState;
struct PartitionBatchRequest;
struct PartitionCallbackData;
class DataStoreServiceClient;
class BatchWriteRecordsClosure;
class ReadClosure;
Expand Down Expand Up @@ -489,6 +493,27 @@ class DataStoreServiceClient : public txservice::store::DataStoreHandler

void BatchWriteRecordsInternal(BatchWriteRecordsClosure *closure);

/**
* Helper methods for concurrent PutAll implementation
*/
void PreparePartitionBatches(
PartitionFlushState& partition_state,
const std::vector<std::pair<size_t, size_t>>& flush_recs,
const std::vector<std::unique_ptr<txservice::FlushTaskEntry>>& entries,
const txservice::TableName& table_name,
uint16_t parts_cnt_per_key,
uint16_t parts_cnt_per_record,
uint64_t now);

void PrepareRangePartitionBatches(
PartitionFlushState& partition_state,
const std::vector<size_t>& flush_recs,
const std::vector<std::unique_ptr<txservice::FlushTaskEntry>>& entries,
const txservice::TableName& table_name,
uint16_t parts_cnt_per_key,
uint16_t parts_cnt_per_record,
uint64_t now);

/**
* Delete range and flush data are not frequent calls, all calls are sent
* with rpc.
Expand Down Expand Up @@ -635,6 +660,10 @@ class DataStoreServiceClient : public txservice::store::DataStoreHandler
friend class DropTableClosure;
friend class ScanNextClosure;
friend class CreateSnapshotForBackupClosure;
friend void PartitionBatchCallback(void *data,
::google::protobuf::Closure *closure,
DataStoreServiceClient &client,
const remote::CommonResult &result);
friend class SinglePartitionScanner;
friend void FetchAllDatabaseCallback(void *data,
::google::protobuf::Closure *closure,
Expand Down
72 changes: 64 additions & 8 deletions data_store_service_client_closure.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@
*
*/

#include "data_store_service_client_closure.h"

#include <memory>
#include <string>
#include <utility>

#include "data_store_service_client_closure.h"
#include "store_util.h" // host_to_big_endian
#include "tx_service/include/cc/cc_request.h"
#include "tx_service/include/cc/local_cc_shards.h"
Expand Down Expand Up @@ -173,8 +174,8 @@ void FetchRecordCallback(void *data,
if (!DataStoreServiceClient::DeserializeTxRecordStr(
val, is_deleted, offset))
{
LOG(ERROR) << "====fetch record===decode error=="
<< " key: " << read_closure->Key()
LOG(ERROR) << "====fetch record===decode error==" << " key: "
<< read_closure->Key()
<< " status: " << (int) fetch_cc->rec_status_;
std::abort();
}
Expand Down Expand Up @@ -396,15 +397,58 @@ void FetchTableCallback(void *data,
fetch_table_data->Notify();
}

void SyncPutAllCallback(void *data,
::google::protobuf::Closure *closure,
DataStoreServiceClient &client,
const remote::CommonResult &result)
void SyncConcurrentRequestCallback(void *data,
::google::protobuf::Closure *closure,
DataStoreServiceClient &client,
const remote::CommonResult &result)
{
auto *callback_data = reinterpret_cast<SyncPutAllData *>(data);
auto *callback_data = reinterpret_cast<SyncConcurrentRequest *>(data);
callback_data->Finish(result);
}

void PartitionBatchCallback(void *data,
::google::protobuf::Closure *closure,
DataStoreServiceClient &client,
const remote::CommonResult &result)
{
auto *callback_data = reinterpret_cast<PartitionCallbackData *>(data);
auto *partition_state = callback_data->partition_state;
auto *global_coordinator = callback_data->global_coordinator;

// Check if the batch failed
if (result.error_code() != remote::DataStoreError::NO_ERROR)
{
partition_state->MarkFailed(result);
// Notify the global coordinator that this partition failed
global_coordinator->OnPartitionCompleted();
return;
}

// Try to get the next batch for this partition
PartitionBatchRequest &next_batch = callback_data->inflight_batch;
if (partition_state->GetNextBatch(next_batch))
{
// Send the next batch
client.BatchWriteRecords(callback_data->table_name,
partition_state->partition_id,
std::move(next_batch.key_parts),
std::move(next_batch.record_parts),
std::move(next_batch.records_ts),
std::move(next_batch.records_ttl),
std::move(next_batch.op_types),
true, // skip_wal
callback_data,
PartitionBatchCallback,
next_batch.parts_cnt_per_key,
next_batch.parts_cnt_per_record);
}
else
{
// Notify the global coordinator that this partition completed
global_coordinator->OnPartitionCompleted();
}
}

void FetchDatabaseCallback(void *data,
::google::protobuf::Closure *closure,
DataStoreServiceClient &client,
Expand Down Expand Up @@ -1386,4 +1430,16 @@ void CreateSnapshotForBackupCallback(void *data,

backup_callback_data->Notify();
}

bool PartitionFlushState::GetNextBatch(PartitionBatchRequest &batch)
{
std::unique_lock<bthread::Mutex> lk(mux);
if (pending_batches.empty())
{
return false;
}
batch = std::move(pending_batches.front());
pending_batches.pop();
return true;
}
} // namespace EloqDS
Loading