Skip to content

Commit 3d2a701

Browse files
committed
coroutine stack: use boost::context::protected_fixedsize_stack
1 parent bd60381 commit 3d2a701

3 files changed

Lines changed: 39 additions & 81 deletions

File tree

src/mongo/db/kill_sessions_local.cpp

Lines changed: 28 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@
4444
#include "mongo/util/log.h"
4545

4646
#include <boost/context/continuation_fcontext.hpp>
47+
#include <boost/context/protected_fixedsize_stack.hpp>
48+
4749

4850
namespace mongo {
4951
namespace {
@@ -71,6 +73,15 @@ SessionKiller::Result killSessionsLocal(OperationContext* opCtx,
7173
return {std::vector<HostAndPort>{}};
7274
}
7375

76+
struct CoroCtx {
77+
static constexpr size_t kCoroStackSize = 3200 * 1024;
78+
boost::context::protected_fixedsize_stack salloc{kCoroStackSize};
79+
boost::context::continuation source;
80+
std::function<void()> yieldFunc;
81+
std::function<void()> resumeFunc;
82+
std::function<void()> longResumeFunc;
83+
};
84+
7485
void killAllExpiredTransactions(OperationContext* opCtx) {
7586
RecoveryUnit* ru = opCtx->releaseRecoveryUnit();
7687
WriteUnitOfWork::RecoveryUnitState ruState = opCtx->getRecoveryUnitState();
@@ -84,38 +95,40 @@ void killAllExpiredTransactions(OperationContext* opCtx) {
8495
std::mutex mux;
8596
std::condition_variable cv;
8697

98+
std::shared_ptr<CoroCtx> coroCtx = std::make_shared<CoroCtx>();
99+
87100
auto client = Client::releaseCurrent();
88101
dassert(client->coroutineFunctors() == CoroutineFunctors::Unavailable);
89102

90-
std::function<void()> yieldFunc, resumeFunc, longResumeFunc;
91103
client->setCoroutineFunctors(CoroutineFunctors{
92-
&yieldFunc,
93-
&resumeFunc,
94-
&longResumeFunc,
104+
&coroCtx->yieldFunc,
105+
&coroCtx->resumeFunc,
106+
&coroCtx->longResumeFunc,
95107
nullptr,
96108
});
97109
transport::ServiceExecutor* serviceExecutor =
98110
getGlobalServiceContext()->getServiceEntryPoint()->getServiceExecutor();
99111

100-
boost::context::continuation source;
101-
std::function<void()> resumeTask = [&source, &client] {
102-
log() << "abortArbitraryTransactionIfExpired call resume";
112+
std::function<void()> resumeTask = [&source = coroCtx->source, &client] {
113+
log() << "abortArbitraryTransactionIfExpired call resume.";
103114
Client::setCurrent(std::move(client));
104115
source = source.resume();
105116
};
106-
resumeFunc =
117+
118+
coroCtx->resumeFunc =
107119
serviceExecutor->coroutineResumeFunctor(session->ThreadGroupId(), resumeTask);
108-
longResumeFunc =
120+
coroCtx->longResumeFunc =
109121
serviceExecutor->coroutineLongResumeFunctor(session->ThreadGroupId(), resumeTask);
110122

111-
auto task = [&finished, &mux, &cv, &source, &yieldFunc, opCtx, session, &client] {
123+
auto task = [&finished, &mux, &cv, coroCtx, opCtx, session, &client] {
112124
Client::setCurrent(std::move(client));
113-
114-
source = boost::context::callcc(
115-
[&finished, &mux, &cv, &yieldFunc, opCtx, session, &client](
125+
coroCtx->source = boost::context::callcc(
126+
std::allocator_arg,
127+
coroCtx->salloc,
128+
[&finished, &mux, &cv, coroCtx, opCtx, session, &client](
116129
boost::context::continuation&& sink) {
117-
yieldFunc = [&sink, &client]() {
118-
log() << "abortArbitraryTransactionIfExpired call yield";
130+
coroCtx->yieldFunc = [&sink, &client]() {
131+
log() << "abortArbitraryTransactionIfExpired call yield.";
119132
client = Client::releaseCurrent();
120133
sink = sink.resume();
121134
};

src/mongo/transport/service_state_machine.cpp

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -545,22 +545,22 @@ void ServiceStateMachine::_runNextInGuard(ThreadGuard guard) {
545545
_coroMigrateThreadGroup = std::bind(
546546
&ServiceStateMachine::_migrateThreadGroup, this, std::placeholders::_1);
547547

548-
boost::context::stack_context sc = _coroStackContext();
549-
boost::context::preallocated prealloc(sc.sp, sc.size, sc);
548+
std::weak_ptr<ServiceStateMachine> wssm = weak_from_this();
550549
_source = boost::context::callcc(
551550
std::allocator_arg,
552-
prealloc,
553-
NoopAllocator(),
554-
[ssm = shared_from_this(),
555-
&guard](boost::context::continuation&& sink) {
551+
_salloc,
552+
[wssm, &guard](boost::context::continuation&& sink) {
553+
auto ssm = wssm.lock();
554+
if (!ssm) {
555+
return std::move(sink);
556+
}
556557
ssm->_coroYield = [ssm = ssm.get(), &sink]() {
557558
MONGO_LOG(3) << "call yield";
558559
ssm->_dbClient = Client::releaseCurrent();
559-
ssm->_abortIfStackOverflow();
560560
sink = sink.resume();
561561
};
562562
ssm->_processMessage(std::move(guard));
563-
ssm->_abortIfStackOverflow();
563+
564564
return std::move(sink);
565565
});
566566

@@ -578,15 +578,13 @@ void ServiceStateMachine::_runNextInGuard(ThreadGuard guard) {
578578
// _coroYield = [this, &sink]() {
579579
// MONGO_LOG(1) << "call yield";
580580
// _dbClient = Client::releaseCurrent();
581-
// _abortIfStackOverflow();
582581
// sink = sink.resume();
583582
// };
584583
// _processMessage(std::move(guard));
585584
// return std::move(sink);
586585
// });
587586
} else if (_coroStatus == CoroStatus::OnGoing) {
588587
MONGO_LOG(1) << "coroutine ongoing";
589-
_abortIfStackOverflow();
590588
_source = _source.resume();
591589
}
592590
}
@@ -618,7 +616,6 @@ void ServiceStateMachine::_runResumeProcess() {
618616
MONGO_LOG(3) << "ServiceStateMachine::_resumeRun";
619617
if (_coroStatus == CoroStatus::OnGoing) {
620618
MONGO_LOG(3) << "coroutine ongoing";
621-
_abortIfStackOverflow();
622619
_source = _source.resume();
623620
}
624621
}

src/mongo/transport/service_state_machine.h

Lines changed: 3 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
#include <atomic>
3232
#include <boost/context/continuation.hpp>
3333
#include <boost/context/continuation_fcontext.hpp>
34-
#include <boost/context/stack_context.hpp>
34+
#include <boost/context/protected_fixedsize_stack.hpp>
3535
#include <functional>
3636

3737
#include "boost/optional/optional.hpp"
@@ -261,64 +261,12 @@ class ServiceStateMachine : public std::enable_shared_from_this<ServiceStateMach
261261
#endif
262262
std::string _oldThreadName;
263263

264-
// Coroutine design
265-
class NoopAllocator {
266-
public:
267-
NoopAllocator() = default;
268-
269-
boost::context::stack_context allocate() {
270-
boost::context::stack_context sc;
271-
return sc;
272-
}
273-
274-
void deallocate(boost::context::stack_context& sc) {
275-
// no-op
276-
}
277-
};
278-
279-
static constexpr size_t kCoroStackSize = 3200 * 1024;
280-
static constexpr char kCanaryByte = 0xAB;
281-
static constexpr size_t kCanarySize = 16;
282-
static constexpr char kCanaryBytes[kCanarySize] = {
283-
kCanaryByte,
284-
kCanaryByte,
285-
kCanaryByte,
286-
kCanaryByte,
287-
kCanaryByte,
288-
kCanaryByte,
289-
kCanaryByte,
290-
kCanaryByte,
291-
kCanaryByte,
292-
kCanaryByte,
293-
kCanaryByte,
294-
kCanaryByte,
295-
kCanaryByte,
296-
kCanaryByte,
297-
kCanaryByte,
298-
kCanaryByte,
299-
};
300-
301-
boost::context::stack_context _coroStackContext() {
302-
boost::context::stack_context sc;
303-
sc.size = kCoroStackSize;
304-
// Because stack grows downwards from high address?
305-
sc.sp = _coroStack + kCoroStackSize;
306-
// Set canary bytes at the end of the stack to detect stack overflow.
307-
std::memset(_coroStack, kCanaryByte, kCanarySize);
308-
return sc;
309-
}
310-
311-
void _abortIfStackOverflow() {
312-
if (std::memcmp(_coroStack, kCanaryBytes, kCanarySize) != 0) {
313-
std::abort();
314-
}
315-
}
316-
317264
void _migrateThreadGroup(uint16_t threadGroupId);
318265

319266

267+
static constexpr size_t kCoroStackSize = 3200 * 1024;
268+
boost::context::protected_fixedsize_stack _salloc{kCoroStackSize};
320269
boost::context::continuation _source;
321-
char _coroStack[kCoroStackSize];
322270

323271
enum class CoroStatus { Empty = 0, OnGoing, Finished };
324272
CoroStatus _coroStatus{CoroStatus::Empty};

0 commit comments

Comments
 (0)