Skip to content

Commit b4a598d

Browse files
ericnieblermaikel
andauthored
fix deadlock when awaiting a sender completes inline with set_stopped (#2038)
* fix potential deadlock in `stdexec::task` * Encode cross-thread completion in ref count * stress test task scheduler affinity --------- Co-authored-by: Maikel Nadolski <maikel.nadolski@gmail.com>
1 parent 931f256 commit b4a598d

12 files changed

Lines changed: 377 additions & 386 deletions

include/exec/at_coroutine_exit.hpp

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,13 @@
1818

1919
// The original idea is taken from libunifex and adapted to stdexec.
2020

21-
#include <exception>
22-
2321
#include "../stdexec/execution.hpp"
2422

2523
#include "any_sender_of.hpp"
2624

25+
#include <exception>
26+
#include <tuple>
27+
2728
namespace experimental::execution
2829
{
2930
namespace __at_coro_exit
@@ -79,8 +80,8 @@ namespace experimental::execution
7980

8081
template <receiver _Receiver>
8182
requires sender_to<_Sender, __receiver<_Receiver>>
82-
auto
83-
connect(_Receiver __rcvr) && noexcept -> connect_result_t<_Sender, __receiver<_Receiver>>
83+
auto connect(_Receiver __rcvr) && noexcept //
84+
-> connect_result_t<_Sender, __receiver<_Receiver>>
8485
{
8586
return STDEXEC::connect(static_cast<_Sender&&>(__sender_),
8687
__receiver<_Receiver>{static_cast<_Receiver&&>(__rcvr)});
@@ -137,6 +138,12 @@ namespace experimental::execution
137138
: __coro_(std::exchange(__that.__coro_, {}))
138139
{}
139140

141+
~__task()
142+
{
143+
if (__coro_)
144+
__coro_.destroy();
145+
}
146+
140147
[[nodiscard]]
141148
static constexpr auto await_ready() noexcept -> bool
142149
{
@@ -148,7 +155,7 @@ namespace experimental::execution
148155
//! coroutine exit; i.e., the coroutine that is co_await-ing the result of calling
149156
//! at_coroutine_exit.
150157
template <__has_continuation _Promise>
151-
auto await_suspend(__std::coroutine_handle<_Promise> __parent) noexcept -> bool
158+
auto await_suspend(__std::coroutine_handle<_Promise> __parent) -> bool
152159
{
153160
// Set the cleanup task's scheduler to the parent coroutine's scheduler.
154161
__coro_.promise().__scheduler_ = get_start_scheduler(get_env(__parent.promise()));
@@ -163,11 +170,13 @@ namespace experimental::execution
163170

164171
auto await_resume() noexcept -> std::tuple<_Ts&...>
165172
{
173+
// Release the cleanup coroutine. It is now responsible for destroying itself in
174+
// its final suspend.
166175
return std::exchange(__coro_, {}).promise().__args_;
167176
}
168177

169178
private:
170-
struct __final_awaitable
179+
struct __final_awaiter
171180
{
172181
static constexpr auto await_ready() noexcept -> bool
173182
{
@@ -183,7 +192,7 @@ namespace experimental::execution
183192
return STDEXEC_CORO_DESTROY_AND_CONTINUE(__h, __coro);
184193
}
185194

186-
void await_resume() const noexcept {}
195+
static constexpr void await_resume() noexcept {}
187196
};
188197

189198
struct __env
@@ -211,7 +220,7 @@ namespace experimental::execution
211220
}
212221

213222
[[nodiscard]]
214-
auto final_suspend() noexcept -> __final_awaitable
223+
auto final_suspend() noexcept -> __final_awaiter
215224
{
216225
return {};
217226
}

include/exec/task.hpp

Lines changed: 40 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
#include "../stdexec/__detail/__meta.hpp"
2424
#include "../stdexec/__detail/__optional.hpp"
2525
#include "../stdexec/__detail/__variant.hpp"
26+
#include "../stdexec/coroutine.hpp"
2627
#include "../stdexec/execution.hpp"
2728
#include "../stdexec/functional.hpp"
2829

@@ -42,10 +43,42 @@ namespace experimental::execution
4243

4344
// The required set_value_t() scheduler-sender completion signature is added in
4445
// any_receiver_ref::any_sender::any_scheduler.
45-
using __any_scheduler_completions =
46+
using __any_scheduler_completions_t =
4647
completion_signatures<set_value_t(), set_error_t(std::exception_ptr), set_stopped_t()>;
4748

48-
using __any_scheduler = any_scheduler<any_sender<any_receiver<__any_scheduler_completions>>>;
49+
using __any_scheduler_impl_t =
50+
any_scheduler<any_sender<any_receiver<__any_scheduler_completions_t>>>;
51+
52+
// A scheduler concept that does not check for copyability since that creates a cycle
53+
// in the type system.
54+
template <class _Scheduler>
55+
concept __semi_scheduler = requires(_Scheduler& __sched) {
56+
typename _Scheduler::scheduler_concept;
57+
requires __std::derived_from<typename _Scheduler::scheduler_concept, scheduler_tag>;
58+
{ schedule(__sched) } -> sender;
59+
};
60+
61+
struct __any_scheduler
62+
{
63+
using scheduler_concept = scheduler_t;
64+
65+
template <__not_same_as<__any_scheduler> _Scheduler>
66+
requires __semi_scheduler<_Scheduler>
67+
constexpr __any_scheduler(_Scheduler __sched) noexcept
68+
: __impl_(std::forward<_Scheduler>(__sched))
69+
{}
70+
71+
bool operator==(__any_scheduler const & __other) const noexcept = default;
72+
73+
[[nodiscard]]
74+
auto schedule() const
75+
{
76+
return __impl_.schedule();
77+
}
78+
79+
private:
80+
__any_scheduler_impl_t __impl_;
81+
};
4982

5083
static_assert(scheduler<__any_scheduler>);
5184

@@ -358,13 +391,13 @@ namespace experimental::execution
358391
{
359392
// Resuming the continuation of the parent coroutine will cause it to continue
360393
// executing on the new scheduler.
361-
__parent_.resume();
394+
STDEXEC::__coroutine_resume_nothrow(__parent_);
362395
}
363396

364397
void set_error(std::exception_ptr __eptr) noexcept
365398
{
366399
__eptr_ = std::move(__eptr);
367-
__parent_.resume();
400+
STDEXEC::__coroutine_resume_nothrow(__parent_);
368401
}
369402

370403
void set_stopped() noexcept
@@ -373,7 +406,7 @@ namespace experimental::execution
373406
// a promise that can handle the stopped signal. The coroutine referred to by
374407
// __continuation_ will never be resumed.
375408
__std::coroutine_handle<> __unwind = __parent_.promise().unhandled_stopped();
376-
__unwind.resume();
409+
STDEXEC::__coroutine_resume_nothrow(__unwind);
377410
}
378411

379412
[[nodiscard]]
@@ -505,7 +538,7 @@ namespace experimental::execution
505538
using __scheduler_t =
506539
__call_result_or_t<get_start_scheduler_t, STDEXEC::inline_scheduler, _Context>;
507540

508-
struct __final_awaitable
541+
struct __final_awaiter
509542
{
510543
static constexpr auto await_ready() noexcept -> bool
511544
{
@@ -535,7 +568,7 @@ namespace experimental::execution
535568
return {};
536569
}
537570

538-
constexpr auto final_suspend() noexcept -> __final_awaitable
571+
constexpr auto final_suspend() noexcept -> __final_awaiter
539572
{
540573
return {};
541574
}

0 commit comments

Comments
 (0)