88#include < concepts>
99#include < coroutine>
1010#include < cstdint>
11+ #include < memory>
1112#include < optional>
1213#include < tuple>
1314#include < type_traits>
1415#include < utility>
1516
17+ #include " runtime-light/coroutine/async-stack.h"
18+ #include " runtime-light/coroutine/coroutine-state.h"
1619#include " runtime-light/coroutine/shared-task.h"
1720#include " runtime-light/coroutine/task.h"
1821#include " runtime-light/k2-platform/k2-api.h"
@@ -55,9 +58,24 @@ class fork_id_watcher_t {
5558 }
5659};
5760
61+ class async_stack_watcher_t {
62+ kphp::coro::async_stack_root* const async_stack_root;
63+ kphp::coro::async_stack_frame* const suspended_async_stack_frame;
64+
65+ protected:
66+ void await_resume () const noexcept {
67+ async_stack_root->top_async_stack_frame = suspended_async_stack_frame;
68+ }
69+
70+ public:
71+ async_stack_watcher_t () noexcept
72+ : async_stack_root(std::addressof(CoroutineInstanceState::get().coroutine_stack_root)),
73+ suspended_async_stack_frame (async_stack_root->top_async_stack_frame) {}
74+ };
75+
5876} // namespace awaitable_impl_
5977
60- class wait_for_update_t : public awaitable_impl_ ::fork_id_watcher_t {
78+ class wait_for_update_t : awaitable_impl_::fork_id_watcher_t , awaitable_impl_:: async_stack_watcher_t {
6179 uint64_t stream_d;
6280 SuspendToken suspend_token;
6381 awaitable_impl_::state state{awaitable_impl_::state::init};
@@ -96,6 +114,7 @@ class wait_for_update_t : public awaitable_impl_::fork_id_watcher_t {
96114
97115 constexpr void await_resume () noexcept {
98116 state = awaitable_impl_::state::end;
117+ async_stack_watcher_t::await_resume ();
99118 fork_id_watcher_t::await_resume ();
100119 }
101120
@@ -111,7 +130,7 @@ class wait_for_update_t : public awaitable_impl_::fork_id_watcher_t {
111130
112131// ================================================================================================
113132
114- class wait_for_incoming_stream_t : awaitable_impl_::fork_id_watcher_t {
133+ class wait_for_incoming_stream_t : awaitable_impl_::fork_id_watcher_t , awaitable_impl_:: async_stack_watcher_t {
115134 SuspendToken suspend_token{std::noop_coroutine (), WaitEvent::IncomingStream{}};
116135 awaitable_impl_::state state{awaitable_impl_::state::init};
117136
@@ -146,6 +165,7 @@ class wait_for_incoming_stream_t : awaitable_impl_::fork_id_watcher_t {
146165
147166 uint64_t await_resume () noexcept {
148167 state = awaitable_impl_::state::end;
168+ async_stack_watcher_t::await_resume ();
149169 fork_id_watcher_t::await_resume ();
150170 const auto incoming_stream_d{InstanceState::get ().take_incoming_stream ()};
151171 kphp::log::assertion (incoming_stream_d != k2::INVALID_PLATFORM_DESCRIPTOR);
@@ -164,7 +184,7 @@ class wait_for_incoming_stream_t : awaitable_impl_::fork_id_watcher_t {
164184
165185// ================================================================================================
166186
167- class wait_for_reschedule_t : awaitable_impl_::fork_id_watcher_t {
187+ class wait_for_reschedule_t : awaitable_impl_::fork_id_watcher_t , awaitable_impl_:: async_stack_watcher_t {
168188 SuspendToken suspend_token{std::noop_coroutine (), WaitEvent::Rechedule{}};
169189 awaitable_impl_::state state{awaitable_impl_::state::init};
170190
@@ -198,6 +218,7 @@ class wait_for_reschedule_t : awaitable_impl_::fork_id_watcher_t {
198218
199219 constexpr void await_resume () noexcept {
200220 state = awaitable_impl_::state::end;
221+ async_stack_watcher_t::await_resume ();
201222 fork_id_watcher_t::await_resume ();
202223 }
203224
@@ -213,7 +234,7 @@ class wait_for_reschedule_t : awaitable_impl_::fork_id_watcher_t {
213234
214235// ================================================================================================
215236
216- class wait_for_timer_t : awaitable_impl_::fork_id_watcher_t {
237+ class wait_for_timer_t : awaitable_impl_::fork_id_watcher_t , awaitable_impl_:: async_stack_watcher_t {
217238 std::chrono::nanoseconds duration;
218239 uint64_t timer_d{k2::INVALID_PLATFORM_DESCRIPTOR};
219240 SuspendToken suspend_token{std::noop_coroutine (), WaitEvent::Rechedule{}};
@@ -259,6 +280,7 @@ class wait_for_timer_t : awaitable_impl_::fork_id_watcher_t {
259280
260281 constexpr void await_resume () noexcept {
261282 state = awaitable_impl_::state::end;
283+ async_stack_watcher_t::await_resume ();
262284 fork_id_watcher_t::await_resume ();
263285 }
264286
@@ -275,7 +297,7 @@ class wait_for_timer_t : awaitable_impl_::fork_id_watcher_t {
275297// ================================================================================================
276298
277299template <typename T>
278- class start_fork_t : awaitable_impl_::fork_id_watcher_t {
300+ class start_fork_t : awaitable_impl_::fork_id_watcher_t , awaitable_impl_:: async_stack_watcher_t {
279301 ForkInstanceState& fork_instance_st{ForkInstanceState::get ()};
280302
281303 int64_t fork_id{};
@@ -304,7 +326,8 @@ class start_fork_t : awaitable_impl_::fork_id_watcher_t {
304326 return fork_awaiter.await_ready ();
305327 }
306328
307- std::coroutine_handle<> await_suspend (std::coroutine_handle<> current_coro) noexcept {
329+ template <typename promise_t >
330+ std::coroutine_handle<> await_suspend (std::coroutine_handle<promise_t > current_coro) noexcept {
308331 state = awaitable_impl_::state::suspend;
309332 fork_instance_st.current_id = fork_id;
310333 if (fork_awaiter.await_suspend (current_coro)) [[unlikely]] {
@@ -318,6 +341,7 @@ class start_fork_t : awaitable_impl_::fork_id_watcher_t {
318341
319342 int64_t await_resume () noexcept {
320343 state = awaitable_impl_::state::end;
344+ async_stack_watcher_t::await_resume ();
321345 fork_id_watcher_t::await_resume ();
322346 fork_awaiter.await_resume ();
323347 return fork_id;
@@ -330,7 +354,7 @@ class start_fork_t : awaitable_impl_::fork_id_watcher_t {
330354// The main difference between them is that wait_fork_t can use shared_task::when_ready as an awaiter,
331355// whereas wait_fork_result_t can use shared_task::operator co_await.
332356template <typename T>
333- class wait_fork_t : awaitable_impl_::fork_id_watcher_t {
357+ class wait_fork_t : awaitable_impl_::fork_id_watcher_t , awaitable_impl_:: async_stack_watcher_t {
334358 kphp::coro::shared_task<T> fork_task;
335359 std::remove_cvref_t <decltype (std::declval<kphp::coro::shared_task<T>>().operator co_await ())> fork_awaiter;
336360 awaitable_impl_::state state{awaitable_impl_::state::init};
@@ -364,13 +388,15 @@ class wait_fork_t : awaitable_impl_::fork_id_watcher_t {
364388 return state == awaitable_impl_::state::ready;
365389 }
366390
367- constexpr bool await_suspend (std::coroutine_handle<> coro) noexcept {
391+ template <typename promise_t >
392+ constexpr bool await_suspend (std::coroutine_handle<promise_t > coro) noexcept {
368393 state = awaitable_impl_::state::suspend;
369394 return fork_awaiter.await_suspend (coro);
370395 }
371396
372397 await_resume_t await_resume () noexcept {
373398 state = awaitable_impl_::state::end;
399+ async_stack_watcher_t::await_resume ();
374400 fork_id_watcher_t::await_resume ();
375401 if constexpr (std::is_void_v<await_resume_t >) {
376402 fork_awaiter.await_resume ();
@@ -431,7 +457,8 @@ class wait_with_timeout_t {
431457 // 3. await_suspend returns std::coroutine_handle<>.
432458 // we must guarantee that 'co_await wait_with_timeout_t{awaitable, timeout}' behaves like 'co_await awaitable' except
433459 // it may cancel 'co_await awaitable' if the timeout has elapsed.
434- await_suspend_return_t await_suspend (std::coroutine_handle<> coro) noexcept {
460+ template <typename promise_t >
461+ await_suspend_return_t await_suspend (std::coroutine_handle<promise_t > coro) noexcept {
435462 // as we don't rely on coroutine scheduler implementation, let's always suspend awaitable first. in case of some smart scheduler
436463 // it won't have any effect, but it will have an effect if our scheduler is quite simple.
437464 state = awaitable_impl_::state::suspend;
0 commit comments