@@ -12,6 +12,7 @@ import std;
1212#include < concepts>
1313#include < coroutine>
1414#include < exception>
15+ #include < thread>
1516#include < tuple>
1617#include < type_traits>
1718#include < utility>
@@ -53,13 +54,15 @@ class sender_awaitable {
5354 ::beman::execution::detail::single_sender_value_type<Sndr, ::beman::execution::env_of_t <Promise>>;
5455 using result_type = ::std::conditional_t <::std::is_void_v<value_type>, unit, value_type>;
5556 using variant_type = ::std::variant<::std::monostate, result_type, ::std::exception_ptr>;
56- using data_type = ::std::tuple<variant_type, ::std::atomic<bool >, ::std::coroutine_handle<Promise>>;
57+ using data_type = ::std::tuple<variant_type, ::std::atomic<::std::thread::id >, ::std::coroutine_handle<Promise>>;
5758
5859 struct awaitable_receiver {
5960 using receiver_concept = ::beman::execution::receiver_t ;
6061
6162 void resume () {
62- if (::std::get<1 >(*result_ptr_).exchange (true , std::memory_order_acq_rel)) {
63+ std::thread::id id (::std::this_thread::get_id ());
64+ if (not ::std::get<1 >(*result_ptr_)
65+ .compare_exchange_strong (id, ::std::thread::id{}, std::memory_order_acq_rel)) {
6366 ::std::get<2 >(*result_ptr_).resume ();
6467 }
6568 }
@@ -82,7 +85,9 @@ class sender_awaitable {
8285 }
8386
8487 void set_stopped () && noexcept {
85- if (::std::get<1 >(*result_ptr_).exchange (true , ::std::memory_order_acq_rel)) {
88+ std::thread::id id (::std::this_thread::get_id ());
89+ if (not ::std::get<1 >(*result_ptr_)
90+ .compare_exchange_strong (id, ::std::thread::id{}, ::std::memory_order_acq_rel)) {
8691 static_cast <::std::coroutine_handle<>>(::std::get<2 >(*result_ptr_).promise ().unhandled_stopped ())
8792 .resume ();
8893 }
@@ -102,14 +107,16 @@ class sender_awaitable {
102107
103108 public:
104109 sender_awaitable (Sndr&& sndr, Promise& p)
105- : result{::std::monostate{}, false , ::std::coroutine_handle<Promise>::from_promise (p)},
110+ : result{::std::monostate{}, :: std::this_thread::get_id () , ::std::coroutine_handle<Promise>::from_promise (p)},
106111 state{::beman::execution::connect (::std::forward<Sndr>(sndr),
107112 sender_awaitable::awaitable_receiver{::std::addressof (result)})} {}
108113
109114 static constexpr bool await_ready () noexcept { return false ; }
110115 ::std::coroutine_handle<> await_suspend (::std::coroutine_handle<Promise> handle) noexcept {
111116 ::beman::execution::start (state);
112- if (::std::get<1 >(this ->result ).exchange (true , std::memory_order_acq_rel)) {
117+ ::std::thread::id id (::std::this_thread::get_id ());
118+ if (not ::std::get<1 >(this ->result )
119+ .compare_exchange_strong (id, ::std::thread::id{}, ::std::memory_order_acq_rel)) {
113120 if (::std::holds_alternative<::std::monostate>(::std::get<0 >(this ->result ))) {
114121 return ::std::get<2 >(this ->result ).promise ().unhandled_stopped ();
115122 }
0 commit comments