Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion strings/base_collections_base.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
namespace winrt::impl
{
struct nop_lock_guard {};
struct [[maybe_unused]] nop_lock_guard {};

struct single_threaded_collection_base
{
Expand Down
287 changes: 146 additions & 141 deletions strings/base_coroutine_threadpool.h
Original file line number Diff line number Diff line change
Expand Up @@ -332,224 +332,229 @@ namespace winrt::impl
}
};

struct timespan_awaiter : cancellable_awaiter<timespan_awaiter>
template <typename Derived, typename Traits, typename Result = void>
struct threadpool_awaiter_base : cancellable_awaiter<threadpool_awaiter_base<Derived, Traits, Result>>
{
explicit timespan_awaiter(Windows::Foundation::TimeSpan duration) noexcept :
m_duration(duration)
{
}

#if defined(__GNUC__) && !defined(__clang__)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed this because the upstream bug has been fixed since GCC 12

// HACK: GCC seems to require a move when calling operator co_await
// on the return value of resume_after.
// This might be related to upstream bug:
// https://gcc.gnu.org/bugzilla/show_bug.cgi?id=99575
timespan_awaiter(timespan_awaiter &&other) noexcept :
m_timer{std::move(other.m_timer)},
m_duration{std::move(other.m_duration)},
m_handle{std::move(other.m_handle)},
m_state{other.m_state.load()}
{}
#endif

void enable_cancellation(cancellable_promise* promise)
{
promise->set_canceller([](void* context)
{
auto that = static_cast<timespan_awaiter*>(context);
if (that->m_state.exchange(state::canceled, std::memory_order_acquire) == state::pending)
auto that = static_cast<threadpool_awaiter_base*>(context);
if (that->m_state.exchange(state::canceled, std::memory_order_acquire) == state::suspended)
{
that->fire_immediately();
if (static_cast<Derived*>(that)->cancel(that->m_handle.get()))
{
static_cast<Derived*>(that)->fire_immediately(that->m_handle.get());
}
}
}, this);
}

bool await_ready() const noexcept
Result await_resume()
{
return m_duration.count() <= 0;
if (m_state.exchange(state::idle, std::memory_order_relaxed) == state::canceled)
{
throw hresult_canceled();
}

if constexpr (!std::is_same_v<void, Result>)
{
return static_cast<Derived*>(this)->get_result();
}
}

template <typename T>
void await_suspend(impl::coroutine_handle<T> handle)
bool await_suspend(impl::coroutine_handle<T> resume)
{
set_cancellable_promise_from_handle(handle);
handle_type<Traits> new_handle;
new_handle.attach(check_pointer(static_cast<Derived*>(this)->create_threadpool_handle()));

m_handle = handle;
create_threadpool_timer();
state expected = state::idle;
if (m_state.compare_exchange_strong(expected, state::pending, std::memory_order_release))
{
this->set_cancellable_promise_from_handle(resume);

m_resume = resume;
m_handle = std::move(new_handle);

if (m_state.load(std::memory_order_acquire) != state::canceled)
{
slim_lock_guard guard{m_mutex};

static_cast<Derived*>(this)->suspend_on_threadpool(m_handle.get());

expected = state::pending;
if (!m_state.compare_exchange_strong(expected, state::suspended, std::memory_order_release))
{
// handle the case of the cancelation occurring while we where suspending on the thread pool
if (static_cast<Derived*>(this)->cancel(m_handle.get()))
{
// we canceled before a callback was scheduled, so we can short-circuit
return false;
}
}

return true;
}
else
{
// short-circuit in case of an early cancelation
return false;
}
}
else
{
throw hresult_illegal_method_call();
}
}

void await_resume()
protected:
threadpool_awaiter_base() = default;

void resume()
{
if (m_state.exchange(state::idle, std::memory_order_relaxed) == state::canceled)
{
throw hresult_canceled();
// acquire the mutex to ensure await_suspend is finished executing
slim_lock_guard guard{m_mutex};
}

m_resume();
}

private:
void create_threadpool_timer()
enum class state { idle, pending, suspended, canceled };

handle_type<Traits> m_handle;
impl::coroutine_handle<> m_resume{ nullptr };
std::atomic<state> m_state{ state::idle };
slim_mutex m_mutex;
};

struct tp_timer_traits
{
using type = impl::ptp_timer;

static void close(type value) noexcept
{
m_timer.attach(check_pointer(WINRT_IMPL_CreateThreadpoolTimer(callback, this, nullptr)));
int64_t relative_count = -m_duration.count();
WINRT_IMPL_SetThreadpoolTimer(m_timer.get(), &relative_count, 0, 0);
WINRT_IMPL_CloseThreadpoolTimer(value);
}

state expected = state::idle;
if (!m_state.compare_exchange_strong(expected, state::pending, std::memory_order_release))
{
fire_immediately();
}
static constexpr type invalid() noexcept
{
return nullptr;
}
};

void fire_immediately() noexcept
struct timespan_awaiter : threadpool_awaiter_base<timespan_awaiter, tp_timer_traits>
{
explicit timespan_awaiter(Windows::Foundation::TimeSpan duration) noexcept :
m_duration(duration)
{
if (WINRT_IMPL_SetThreadpoolTimerEx(m_timer.get(), nullptr, 0, 0))
{
int64_t now = 0;
WINRT_IMPL_SetThreadpoolTimer(m_timer.get(), &now, 0, 0);
}
}

static void __stdcall callback(void*, void* context, void*) noexcept
bool await_ready() const noexcept
{
auto that = reinterpret_cast<timespan_awaiter*>(context);
that->m_handle();
return m_duration.count() <= 0;
}

struct timer_traits
impl::ptp_timer create_threadpool_handle() noexcept
{
using type = impl::ptp_timer;
return WINRT_IMPL_CreateThreadpoolTimer(callback, this, nullptr);
}

static void close(type value) noexcept
{
WINRT_IMPL_CloseThreadpoolTimer(value);
}
void suspend_on_threadpool(impl::ptp_timer handle) const noexcept
{
int64_t relative_count = -m_duration.count();
WINRT_IMPL_SetThreadpoolTimerEx(handle, &relative_count, 0, 0);
}

static constexpr type invalid() noexcept
{
return nullptr;
}
};
bool cancel(impl::ptp_timer handle) const noexcept
{
return WINRT_IMPL_SetThreadpoolTimerEx(handle, nullptr, 0, 0);
}

enum class state { idle, pending, canceled };
void fire_immediately(impl::ptp_timer handle) const noexcept
{
int64_t now = 0;
WINRT_IMPL_SetThreadpoolTimerEx(handle, &now, 0, 0);
}

private:
static void __stdcall callback(void*, void* context, void*) noexcept
{
auto that = reinterpret_cast<timespan_awaiter*>(context);
that->resume();
}

handle_type<timer_traits> m_timer;
Windows::Foundation::TimeSpan m_duration;
impl::coroutine_handle<> m_handle;
std::atomic<state> m_state{ state::idle };
};

struct signal_awaiter : cancellable_awaiter<signal_awaiter>
struct tp_wait_traits
{
signal_awaiter(void* handle, Windows::Foundation::TimeSpan timeout) noexcept :
m_timeout(timeout),
m_handle(handle)
{}
using type = impl::ptp_wait;

#if defined(__GNUC__) && !defined(__clang__)
// HACK: GCC seems to require a move when calling operator co_await
// on the return value of resume_on_signal.
// This might be related to upstream bug:
// https://gcc.gnu.org/bugzilla/show_bug.cgi?id=99575
signal_awaiter(signal_awaiter &&other) noexcept :
m_wait{std::move(other.m_wait)},
m_timeout{std::move(other.m_timeout)},
m_handle{std::move(other.m_handle)},
m_result{std::move(other.m_result)},
m_resume{std::move(other.m_resume)},
m_state{other.m_state.load()}
{}
#endif
static void close(type value) noexcept
{
WINRT_IMPL_CloseThreadpoolWait(value);
}

void enable_cancellation(cancellable_promise* promise)
static constexpr type invalid() noexcept
{
promise->set_canceller([](void* context)
{
auto that = static_cast<signal_awaiter*>(context);
if (that->m_state.exchange(state::canceled, std::memory_order_acquire) == state::pending)
{
that->fire_immediately();
}
}, this);
return nullptr;
}
};

struct signal_awaiter : threadpool_awaiter_base<signal_awaiter, tp_wait_traits, bool>
{
signal_awaiter(void* handle, Windows::Foundation::TimeSpan timeout) noexcept :
m_timeout(timeout),
m_handle(handle)
{}

bool await_ready() const noexcept
{
return WINRT_IMPL_WaitForSingleObject(m_handle, 0) == 0;
}

template <typename T>
void await_suspend(impl::coroutine_handle<T> resume)
bool get_result() const noexcept
{
set_cancellable_promise_from_handle(resume);

m_resume = resume;
create_threadpool_wait();
return m_result == 0;
}

bool await_resume()
impl::ptp_wait create_threadpool_handle() noexcept
{
if (m_state.exchange(state::idle, std::memory_order_relaxed) == state::canceled)
{
throw hresult_canceled();
}
return m_result == 0;
return WINRT_IMPL_CreateThreadpoolWait(callback, this, nullptr);
}

private:

void create_threadpool_wait()
void suspend_on_threadpool(impl::ptp_wait handle) const noexcept
{
m_wait.attach(check_pointer(WINRT_IMPL_CreateThreadpoolWait(callback, this, nullptr)));
int64_t relative_count = -m_timeout.count();
int64_t* file_time = relative_count != 0 ? &relative_count : nullptr;
WINRT_IMPL_SetThreadpoolWait(m_wait.get(), m_handle, file_time);
WINRT_IMPL_SetThreadpoolWaitEx(handle, m_handle, file_time, nullptr);
}

state expected = state::idle;
if (!m_state.compare_exchange_strong(expected, state::pending, std::memory_order_release))
{
fire_immediately();
}
bool cancel(impl::ptp_wait handle) const noexcept
{
return WINRT_IMPL_SetThreadpoolWaitEx(handle, nullptr, nullptr, nullptr);
}

void fire_immediately() noexcept
void fire_immediately(impl::ptp_wait handle) const noexcept
{
if (WINRT_IMPL_SetThreadpoolWaitEx(m_wait.get(), nullptr, nullptr, nullptr))
{
int64_t now = 0;
WINRT_IMPL_SetThreadpoolWait(m_wait.get(), WINRT_IMPL_GetCurrentProcess(), &now);
}
int64_t now = 0;
WINRT_IMPL_SetThreadpoolWaitEx(handle, WINRT_IMPL_GetCurrentProcess(), &now, nullptr);
}

private:
static void __stdcall callback(void*, void* context, void*, uint32_t result) noexcept
{
auto that = static_cast<signal_awaiter*>(context);
that->m_result = result;
that->m_resume();
that->resume();
}

struct wait_traits
{
using type = impl::ptp_wait;

static void close(type value) noexcept
{
WINRT_IMPL_CloseThreadpoolWait(value);
}

static constexpr type invalid() noexcept
{
return nullptr;
}
};

enum class state { idle, pending, canceled };

handle_type<wait_traits> m_wait;
Windows::Foundation::TimeSpan m_timeout;
void* m_handle;
uint32_t m_result{};
impl::coroutine_handle<> m_resume{ nullptr };
std::atomic<state> m_state{ state::idle };
};
}

Expand Down
Loading