Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 69 additions & 25 deletions src/mongo/db/modules/eloq/src/eloq_kv_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ EloqKVEngine::EloqKVEngine(const std::string& path) : _dbPath(path) {
if (bootstrap) {
std::vector<txservice::NodeConfig> soloConfig;
soloConfig.emplace_back(
0, eloqGlobalOptions.localAddr.host(), eloqGlobalOptions.localAddr.port());
0, eloqGlobalOptions.localAddr.host(), eloqGlobalOptions.localAddr.port(), true);
ngConfigs.try_emplace(0, std::move(soloConfig));
} else {
if (!txservice::ReadClusterConfigFile(clusterConfigPath, ngConfigs, clusterConfigVersion)) {
Expand Down Expand Up @@ -400,9 +400,48 @@ EloqKVEngine::EloqKVEngine(const std::string& path) : _dbPath(path) {
uasserted(ErrorCodes::InternalError, "Current node does not belong to any node group.");
}

bool isSingleNode = eloqGlobalOptions.ipList.find(',') == eloqGlobalOptions.ipList.npos;
if (bootstrap) {
// For bootstrap mode, we need to fetch all node groups to init data store shards.
std::unordered_map<uint32_t, std::vector<txservice::NodeConfig>> tmpNgConfigs;
uint64_t clusterVersion = 2;
if (!txservice::ReadClusterConfigFile(clusterConfigPath, tmpNgConfigs, clusterVersion)) {
bool parse_res = txservice::ParseNgConfig(eloqGlobalOptions.ipList,
"",
"",
tmpNgConfigs,
eloqGlobalOptions.nodeGroupReplicaNum,
0);
if (!parse_res) {
error() << "Failed to extract cluster configs from ip_port_list.";
uasserted(ErrorCodes::InvalidOptions,
"Failed to extract cluster configs from ip_port_list.");
}
}

initDataStoreService(isSingleNode, nodeId, nativeNgId, ngConfigs);
bool found = false;
uint32_t dssNodeId = UINT32_MAX;
// check whether this node is in cluster.
for (auto& pair : ngConfigs) {
auto& ngNodes = pair.second;
for (auto& ngNode : ngNodes) {
if (ngNode.host_name_ == eloqGlobalOptions.localAddr.host() &&
ngNode.port_ == eloqGlobalOptions.localAddr.port()) {
dssNodeId = ngNode.node_id_;
found = true;
break;
}
}
}

if (!found) {
error() << "Current node does not belong to any node group.";
uasserted(ErrorCodes::InternalError, "Current node does not belong to any node group.");
}

initDataStoreService(dssNodeId, tmpNgConfigs);
} else {
initDataStoreService(nodeId, ngConfigs);
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

std::vector<std::string> txlogIPs;
std::vector<uint16_t> txlogPorts;
Expand Down Expand Up @@ -610,29 +649,18 @@ EloqKVEngine::EloqKVEngine(const std::string& path) : _dbPath(path) {
}

void EloqKVEngine::initDataStoreService(
bool isSingleNode,
uint32_t node_id,
uint32_t native_ng_id,
const std::unordered_map<uint32_t, std::vector<txservice::NodeConfig>>& ngConfigs) {
auto localIp = eloqGlobalOptions.localAddr.host();
auto localPort = eloqGlobalOptions.localAddr.port();
txservice::CatalogFactory* catalog_factories[3] = {nullptr, nullptr, &_catalogFactory};

bool opt_bootstrap = serverGlobalParams.bootstrap;
bool isSingleNode = (ngConfigs.size() == 1 && ngConfigs.begin()->second.size() == 1);
std::string ds_peer_node = eloqGlobalOptions.dssPeerNode;

std::string dss_config_file_path = "";
EloqDS::DataStoreServiceClusterManager ds_config;
uint32_t dss_leader_id = EloqDS::UNKNOWN_DSS_LEADER_NODE_ID;

// use tx node id as the dss node id
// since they are deployed together
uint32_t dss_node_id = node_id;
if (opt_bootstrap || isSingleNode) {
dss_leader_id = node_id;
}

if (!ds_peer_node.empty()) {
auto localIp = eloqGlobalOptions.localAddr.host();
auto localPort = eloqGlobalOptions.localAddr.port();
ds_config.SetThisNode(localIp, EloqDS::DataStoreServiceClient::TxPort2DssPort(localPort));
// Fetch ds topology from peer node
if (!EloqDS::DataStoreService::FetchConfigFromPeer(ds_peer_node, ds_config)) {
Expand All @@ -642,13 +670,16 @@ void EloqKVEngine::initDataStoreService(
ds_peer_node);
}
} else {
if (ngConfigs.size() > 1) {
error() << "DSS peer node must be provided in multi-node deployment.";
uasserted(ErrorCodes::InternalError, "DataStoreService initialization failed");
std::unordered_map<uint32_t, uint32_t> ng_leaders;
if (opt_bootstrap || isSingleNode) {
// For bootstrap, start all data store shards in current node.
for (auto& ng : ngConfigs) {
ng_leaders.emplace(ng.first, node_id);
}
}

EloqDS::DataStoreServiceClient::TxConfigsToDssClusterConfig(
dss_node_id, native_ng_id, ngConfigs, dss_leader_id, ds_config);
node_id, ngConfigs, ng_leaders, ds_config);
}

#if defined(DATA_STORE_TYPE_ELOQDSS_ROCKSDB_CLOUD_S3) || \
Expand Down Expand Up @@ -710,17 +741,17 @@ void EloqKVEngine::initDataStoreService(
Eloq::dataStoreService = std::make_unique<EloqDS::DataStoreService>(
ds_config, dss_config_file_path, _dbPath + "/DSMigrateLog", std::move(ds_factory));


// setup local data store service, the data store will start data store if needed.
bool ret = true;
#if defined(DATA_STORE_TYPE_ELOQDSS_ROCKSDB)
// For non shared storage like rocksdb,
// we always set create_if_missing to true
// since non conflicts will happen under
// multi-node deployment.
ret = Eloq::dataStoreService->StartService(true, dss_leader_id, dss_node_id);
ret = Eloq::dataStoreService->StartService(true);
#else
ret = Eloq::dataStoreService->StartService(
(opt_bootstrap || isSingleNode), dss_leader_id, dss_node_id);
ret = Eloq::dataStoreService->StartService((opt_bootstrap || isSingleNode));
#endif
if (!ret) {
error() << "Failed to start data store service";
Expand All @@ -729,7 +760,15 @@ void EloqKVEngine::initDataStoreService(

// setup data store service client
Eloq::storeHandler = std::make_unique<EloqDS::DataStoreServiceClient>(
catalog_factories, ds_config, Eloq::dataStoreService.get());
#if defined(DATA_STORE_TYPE_ELOQDSS_ROCKSDB)
true,
#else
(opt_bootstrap || isSingleNode),
#endif
catalog_factories,
ds_config,
ds_peer_node.empty(),
Eloq::dataStoreService.get());

if (!Eloq::storeHandler->Connect()) {
error() << "!!!!!!!! Failed to connect ELOQ_DS server, EloqDB "
Expand Down Expand Up @@ -1333,6 +1372,11 @@ void MongoSystemHandler::ReloadCache(std::function<void(bool)> done) {
mongo::Status status = mongo::Status::OK();

auto serviceContext = mongo::getGlobalServiceContext();
if (serviceContext == nullptr || !serviceContext->isStartupComplete()) {
done(true);
return true;
}

auto client = mongo::getGlobalServiceContext()->makeClient("eloq_table_schema");
auto opCtx = serviceContext->makeOperationContext(client.get());
auto const globalAuthzManager = mongo::AuthorizationManager::get(serviceContext);
Expand Down
2 changes: 0 additions & 2 deletions src/mongo/db/modules/eloq/src/eloq_kv_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,7 @@ class EloqKVEngine final : public KVEngine {
public:
explicit EloqKVEngine(const std::string& path);
void initDataStoreService(
bool isSingleNode,
uint32_t nodeId,
uint32_t nativeNgId,
const std::unordered_map<uint32_t, std::vector<txservice::NodeConfig>>& ng_configs);

~EloqKVEngine() override;
Expand Down
2 changes: 1 addition & 1 deletion src/mongo/db/modules/eloq/store_handler
Submodule store_handler updated 32 files
+395 −278 data_store_service_client.cpp
+59 −73 data_store_service_client.h
+28 −19 data_store_service_client_closure.cpp
+73 −52 data_store_service_client_closure.h
+5 −1 data_store_service_scanner.cpp
+8 −7 data_store_service_scanner.h
+5 −5 eloq_data_store_service/CMakeLists.txt
+1 −0 eloq_data_store_service/build_eloq_store.cmake
+48 −0 eloq_data_store_service/data_store_factory.h
+805 −239 eloq_data_store_service/data_store_service.cpp
+135 −45 eloq_data_store_service/data_store_service.h
+14 −0 eloq_data_store_service/data_store_service_config.cpp
+16 −1 eloq_data_store_service/data_store_service_config.h
+5 −0 eloq_data_store_service/data_store_service_util.h
+24 −5 eloq_data_store_service/ds_request.proto
+9 −0 eloq_data_store_service/eloq_store_config.cpp
+45 −0 eloq_data_store_service/eloq_store_data_store_factory.h
+198 −69 eloq_data_store_service/internal_request.h
+2 −2 eloq_data_store_service/main.cpp
+114 −16 eloq_data_store_service/rocksdb_cloud_data_store.cpp
+14 −0 eloq_data_store_service/rocksdb_cloud_data_store.h
+54 −1 eloq_data_store_service/rocksdb_cloud_data_store_factory.h
+2 −8 eloq_data_store_service/rocksdb_data_store.cpp
+22 −27 eloq_data_store_service/rocksdb_data_store_common.cpp
+2 −4 eloq_data_store_service/rocksdb_data_store_common.h
+40 −0 eloq_data_store_service/rocksdb_data_store_factory.h
+163 −0 eloq_data_store_service/s3_file_downloader.cpp
+68 −0 eloq_data_store_service/s3_file_downloader.h
+51 −35 eloq_data_store_service/thread_worker_pool.cpp
+1 −0 eloq_data_store_service/thread_worker_pool.h
+3 −2 rocksdb_handler.cpp
+3 −2 rocksdb_handler.h
6 changes: 6 additions & 0 deletions src/mongo/db/service_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,12 @@ void ServiceContext::notifyStartupComplete() {
_startupCompleteCondVar.notify_all();
}

bool ServiceContext::isStartupComplete() {
stdx::unique_lock lk(_mutex);
return _startupComplete;
}


namespace {

/**
Expand Down
2 changes: 2 additions & 0 deletions src/mongo/db/service_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,8 @@ class ServiceContext final : public Decorable<ServiceContext> {
*/
void notifyStartupComplete();

bool isStartupComplete();

/**
* Set the OpObserver.
*/
Expand Down