Skip to content

Commit 937ffbd

Browse files
committed
perf: add connection pooling and parallel shard execution (opt-in, benefits large result sets)
1 parent a69ded3 commit 937ffbd

File tree

10 files changed

+387
-16
lines changed

10 files changed

+387
-16
lines changed
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
#ifndef SQL_ENGINE_CONNECTION_POOL_H
2+
#define SQL_ENGINE_CONNECTION_POOL_H
3+
4+
// Thread-safe connection pool for MySQL backends.
5+
//
6+
// Each backend name maps to a stack of idle MYSQL* handles. Threads check out
7+
// a handle (creating one on demand), use it, and check it back in. The pool
8+
// grows dynamically -- there is no hard cap -- so N concurrent calls to the
9+
// same backend will use N connections.
10+
11+
#include "sql_engine/backend_config.h"
12+
#include <mysql/mysql.h>
13+
#include <mutex>
14+
#include <unordered_map>
15+
#include <vector>
16+
#include <string>
17+
#include <stdexcept>
18+
19+
namespace sql_engine {
20+
21+
class ConnectionPool {
22+
public:
23+
ConnectionPool() = default;
24+
25+
~ConnectionPool() {
26+
std::lock_guard<std::mutex> lk(mu_);
27+
for (auto& kv : idle_) {
28+
for (MYSQL* c : kv.second) {
29+
if (c) mysql_close(c);
30+
}
31+
}
32+
idle_.clear();
33+
}
34+
35+
// Register a backend. Must be called before any checkout for that name.
36+
void add_backend(const BackendConfig& config) {
37+
std::lock_guard<std::mutex> lk(mu_);
38+
configs_[config.name] = config;
39+
}
40+
41+
// Obtain a connection for the named backend. Creates a new one if none is
42+
// idle. The caller MUST call checkin() when done.
43+
MYSQL* checkout(const std::string& backend) {
44+
std::lock_guard<std::mutex> lk(mu_);
45+
auto& stack = idle_[backend];
46+
if (!stack.empty()) {
47+
MYSQL* c = stack.back();
48+
stack.pop_back();
49+
// Quick liveness check (non-blocking).
50+
if (mysql_ping(c) != 0) {
51+
mysql_close(c);
52+
return create_connection(backend);
53+
}
54+
return c;
55+
}
56+
return create_connection(backend);
57+
}
58+
59+
// Return a connection to the pool for reuse.
60+
void checkin(const std::string& backend, MYSQL* conn) {
61+
if (!conn) return;
62+
std::lock_guard<std::mutex> lk(mu_);
63+
idle_[backend].push_back(conn);
64+
}
65+
66+
private:
67+
std::mutex mu_;
68+
std::unordered_map<std::string, BackendConfig> configs_;
69+
std::unordered_map<std::string, std::vector<MYSQL*>> idle_;
70+
71+
// Must be called with mu_ held.
72+
MYSQL* create_connection(const std::string& backend) {
73+
auto it = configs_.find(backend);
74+
if (it == configs_.end()) {
75+
throw std::runtime_error("ConnectionPool: unknown backend: " + backend);
76+
}
77+
const BackendConfig& cfg = it->second;
78+
MYSQL* c = mysql_init(nullptr);
79+
if (!c) throw std::runtime_error("mysql_init failed for " + backend);
80+
81+
unsigned int timeout = 5;
82+
mysql_options(c, MYSQL_OPT_CONNECT_TIMEOUT, &timeout);
83+
84+
if (!mysql_real_connect(c, cfg.host.c_str(), cfg.user.c_str(),
85+
cfg.password.c_str(), cfg.database.c_str(),
86+
cfg.port, nullptr, 0)) {
87+
std::string err = mysql_error(c);
88+
mysql_close(c);
89+
throw std::runtime_error("ConnectionPool connect failed for " + backend + ": " + err);
90+
}
91+
mysql_set_character_set(c, "utf8mb4");
92+
return c;
93+
}
94+
};
95+
96+
} // namespace sql_engine
97+
98+
#endif // SQL_ENGINE_CONNECTION_POOL_H

include/sql_engine/operators/merge_aggregate_op.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,9 +152,9 @@ class MergeAggregateOperator : public Operator {
152152
std::vector<Operator*> children_;
153153
uint16_t group_key_count_;
154154
std::vector<uint8_t> merge_ops_;
155-
bool parallel_open_;
156155
uint16_t merge_op_count_;
157156
sql_parser::Arena& arena_;
157+
bool parallel_open_;
158158

159159
struct GroupState {
160160
std::vector<Value> group_values;

include/sql_engine/operators/set_op_op.h

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,17 +6,30 @@
66
#include <unordered_set>
77
#include <string>
88
#include <vector>
9+
#include <future>
910

1011
namespace sql_engine {
1112

1213
class SetOpOperator : public Operator {
1314
public:
14-
SetOpOperator(Operator* left, Operator* right, uint8_t op, bool all)
15-
: left_(left), right_(right), op_(op), all_(all) {}
15+
SetOpOperator(Operator* left, Operator* right, uint8_t op, bool all,
16+
bool parallel_open = false)
17+
: left_(left), right_(right), op_(op), all_(all),
18+
parallel_open_(parallel_open) {}
1619

1720
void open() override {
18-
left_->open();
19-
right_->open();
21+
if (parallel_open_) {
22+
// Parallel open (#26): launch left and right opens concurrently.
23+
// This is beneficial when both children are RemoteScan operators
24+
// (each performing a network round-trip in open()).
25+
auto fl = std::async(std::launch::async, [this]{ left_->open(); });
26+
auto fr = std::async(std::launch::async, [this]{ right_->open(); });
27+
fl.get();
28+
fr.get();
29+
} else {
30+
left_->open();
31+
right_->open();
32+
}
2033
reading_left_ = true;
2134
seen_.clear();
2235

@@ -99,6 +112,7 @@ class SetOpOperator : public Operator {
99112
Operator* right_;
100113
uint8_t op_;
101114
bool all_;
115+
bool parallel_open_;
102116
bool reading_left_ = true;
103117
std::unordered_set<std::string> seen_;
104118
std::unordered_set<std::string> right_set_;

include/sql_engine/plan_executor.h

Lines changed: 43 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,13 @@ class PlanExecutor {
8080
remote_executor_ = exec;
8181
}
8282

83+
// Enable parallel opening of RemoteScan children in merge/set operators.
84+
// Only safe when the RemoteExecutor is thread-safe (e.g. ThreadSafeMultiRemoteExecutor
85+
// with connection pooling). Disabled by default for backward compatibility.
86+
void set_parallel_open(bool enabled) {
87+
parallel_open_enabled_ = enabled;
88+
}
89+
8390
// Access the subquery executor (for operators that need it)
8491
SubqueryExecutor<D>* subquery_executor() { return &subquery_exec_; }
8592

@@ -294,6 +301,7 @@ class PlanExecutor {
294301
std::unordered_map<std::string, MutableDataSource*> mutable_sources_;
295302
std::vector<std::unique_ptr<Operator>> operators_;
296303
RemoteExecutor* remote_executor_ = nullptr;
304+
bool parallel_open_enabled_ = false;
297305
DistributeFn distribute_fn_;
298306
SubqueryExecutor<D> subquery_exec_;
299307
sql_parser::Arena subquery_plan_arena_{65536, 1048576};
@@ -1011,8 +1019,12 @@ class PlanExecutor {
10111019
Operator* right = build_operator(node->right);
10121020
if (!left || !right) return nullptr;
10131021

1022+
// Enable parallel open when both children are remote scans and executor is thread-safe
1023+
bool parallel = parallel_open_enabled_ &&
1024+
(node->left && node->left->type == PlanNodeType::REMOTE_SCAN &&
1025+
node->right && node->right->type == PlanNodeType::REMOTE_SCAN);
10141026
auto op = std::make_unique<SetOpOperator>(
1015-
left, right, node->set_op.op, node->set_op.all);
1027+
left, right, node->set_op.op, node->set_op.all, parallel);
10161028
Operator* ptr = op.get();
10171029
operators_.push_back(std::move(op));
10181030
return ptr;
@@ -1037,12 +1049,16 @@ class PlanExecutor {
10371049
}
10381050
if (children.empty()) return nullptr;
10391051

1052+
// Enable parallel open when children are RemoteScans and executor is thread-safe
1053+
bool parallel = parallel_open_enabled_ &&
1054+
(children.size() > 1) && has_remote_scan_children(node);
10401055
auto op = std::make_unique<MergeAggregateOperator>(
10411056
std::move(children),
10421057
node->merge_aggregate.group_key_count,
10431058
node->merge_aggregate.merge_ops,
10441059
node->merge_aggregate.merge_op_count,
1045-
arena_);
1060+
arena_,
1061+
parallel);
10461062
Operator* ptr = op.get();
10471063
operators_.push_back(std::move(op));
10481064
return ptr;
@@ -1074,16 +1090,40 @@ class PlanExecutor {
10741090
sort_dirs.push_back(node->merge_sort.directions[i]);
10751091
}
10761092

1093+
// Enable parallel open when children are RemoteScans and executor is thread-safe
1094+
bool parallel = parallel_open_enabled_ &&
1095+
(children.size() > 1) && has_remote_scan_children_merge_sort(node);
10771096
auto op = std::make_unique<MergeSortOperator>(
10781097
std::move(children),
10791098
sort_col_indices.data(),
10801099
sort_dirs.data(),
1081-
node->merge_sort.key_count);
1100+
node->merge_sort.key_count,
1101+
parallel);
10821102
Operator* ptr = op.get();
10831103
operators_.push_back(std::move(op));
10841104
return ptr;
10851105
}
10861106

1107+
// Check whether all children of a MERGE_AGGREGATE node are REMOTE_SCAN.
1108+
static bool has_remote_scan_children(const PlanNode* node) {
1109+
if (!node || node->type != PlanNodeType::MERGE_AGGREGATE) return false;
1110+
for (uint16_t i = 0; i < node->merge_aggregate.child_count; ++i) {
1111+
if (node->merge_aggregate.children[i]->type != PlanNodeType::REMOTE_SCAN)
1112+
return false;
1113+
}
1114+
return node->merge_aggregate.child_count > 0;
1115+
}
1116+
1117+
// Check whether all children of a MERGE_SORT node are REMOTE_SCAN.
1118+
static bool has_remote_scan_children_merge_sort(const PlanNode* node) {
1119+
if (!node || node->type != PlanNodeType::MERGE_SORT) return false;
1120+
for (uint16_t i = 0; i < node->merge_sort.child_count; ++i) {
1121+
if (node->merge_sort.children[i]->type != PlanNodeType::REMOTE_SCAN)
1122+
return false;
1123+
}
1124+
return node->merge_sort.child_count > 0;
1125+
}
1126+
10871127
uint16_t resolve_column_index(const sql_parser::AstNode* key, const TableInfo* table) {
10881128
if (!key || !table) return 0;
10891129
sql_parser::StringRef col_name;

include/sql_engine/session.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,12 @@ class Session {
5454
remote_executor_ = exec;
5555
}
5656

57+
// Enable parallel opening of RemoteScan children. Only safe when the
58+
// RemoteExecutor is thread-safe (e.g. ThreadSafeMultiRemoteExecutor).
59+
void set_parallel_open(bool enabled) {
60+
parallel_open_enabled_ = enabled;
61+
}
62+
5763
void set_shard_map(const ShardMap* sm) {
5864
shard_map_ = sm;
5965
}
@@ -193,6 +199,7 @@ class Session {
193199
Optimizer<D> optimizer_;
194200
RemoteExecutor* remote_executor_ = nullptr;
195201
const ShardMap* shard_map_ = nullptr;
202+
bool parallel_open_enabled_ = false;
196203
std::unordered_map<std::string, DataSource*> sources_;
197204
std::unordered_map<std::string, MutableDataSource*> mutable_sources_;
198205

@@ -203,6 +210,8 @@ class Session {
203210
executor.add_mutable_data_source(kv.first.c_str(), kv.second);
204211
if (remote_executor_)
205212
executor.set_remote_executor(remote_executor_);
213+
if (parallel_open_enabled_)
214+
executor.set_parallel_open(true);
206215
// If sharding is configured, provide a distribute callback so that
207216
// subqueries also go through the distributed planner.
208217
if (shard_map_ && remote_executor_) {

0 commit comments

Comments
 (0)