Skip to content

Commit 6e759c2

Browse files
committed
Fix coroutine frame leaks and IOCP shutdown hang (#159)
Coroutine frames were leaked during shutdown because destroy handlers skipped h.destroy(). post_handler::destroy() and completion_op::destroy() now destroy the coroutine frame in all backends. timer_service::shutdown() destroys frames instead of nulling handles. Fix IOCP shutdown hang caused by service ordering: execution_context creates timer_service from win_scheduler's constructor before prepending win_scheduler to the service list, so shutdown() calls the scheduler first. Its work-counting drain loop spun forever because timer heap waiters hadn't been released. Fix by calling timer_svc_->shutdown() early, matching Asio's pattern. Restructure the IOCP drain loop to match Asio: drain completed_ops first, poll GQCS only when empty, decrement outstanding_work before each destroy. Remove dead shutdown_ flag from all backends. Remove force-reset of outstanding_work_ from non-IOCP backends. Add bounded-destruction depth guard to IOCP post_handler. Widen timing budget in testExpiresAtCancelsWaiter for Windows CI. Add regression tests for shutdown: posted coroutine frames, timer waiters, timer heap drain, abrupt stop with pending ops, and IOCP-specific completion draining.
1 parent 6eaf03d commit 6e759c2

File tree

8 files changed

+468
-54
lines changed

8 files changed

+468
-54
lines changed

include/boost/corosio/detail/timer_service.hpp

Lines changed: 40 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
#include <mutex>
3131
#include <optional>
3232
#include <stop_token>
33+
#include <utility>
3334
#include <vector>
3435

3536
namespace boost::corosio::detail {
@@ -198,8 +199,7 @@ struct BOOST_COROSIO_SYMBOL_VISIBLE waiter_node
198199
completion_op() noexcept : scheduler_op(&do_complete) {}
199200

200201
void operator()() override;
201-
// No-op — lifetime owned by waiter_node, not the scheduler queue
202-
void destroy() override {}
202+
void destroy() override;
203203
};
204204

205205
// Per-waiter stop_token cancellation
@@ -328,15 +328,22 @@ timer_service::shutdown()
328328
{
329329
timer_service_invalidate_cache();
330330

331-
// Cancel waiting timers still in the heap
331+
// Cancel waiting timers still in the heap.
332+
// Each waiter called work_started() in implementation::wait().
333+
// On IOCP the scheduler shutdown loop exits when outstanding_work_
334+
// reaches zero, so we must call work_finished() here to balance it.
335+
// On other backends this is harmless (their drain loops exit when
336+
// the queue is empty, not based on outstanding_work_).
332337
for (auto& entry : heap_)
333338
{
334339
auto* impl = entry.timer_;
335340
while (auto* w = impl->waiters_.pop_front())
336341
{
337342
w->stop_cb_.reset();
338-
w->h_ = {};
343+
auto h = std::exchange(w->h_, {});
339344
sched_->work_finished();
345+
if (h)
346+
h.destroy();
340347
delete w;
341348
}
342349
impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
@@ -722,10 +729,12 @@ waiter_node::canceller::operator()() const
722729

723730
inline void
724731
waiter_node::completion_op::do_complete(
725-
void* owner, scheduler_op* base, std::uint32_t, std::uint32_t)
732+
[[maybe_unused]] void* owner, scheduler_op* base, std::uint32_t, std::uint32_t)
726733
{
727-
if (!owner)
728-
return;
734+
// owner is always non-null here. The destroy path (owner == nullptr)
735+
// is unreachable because completion_op overrides destroy() directly,
736+
// bypassing scheduler_op::destroy() which would call func_(nullptr, ...).
737+
BOOST_COROSIO_ASSERT(owner);
729738
static_cast<completion_op*>(base)->operator()();
730739
}
731740

@@ -748,6 +757,30 @@ waiter_node::completion_op::operator()()
748757
sched.work_finished();
749758
}
750759

760+
inline void
761+
waiter_node::completion_op::destroy()
762+
{
763+
// Called during scheduler shutdown drain when this completion_op is
764+
// in the scheduler's ready queue (posted by cancel_timer() or
765+
// process_expired()). Balances the work_started() from
766+
// implementation::wait(). The scheduler drain loop separately
767+
// balances the work_started() from post(). On IOCP both decrements
768+
// are required for outstanding_work_ to reach zero; on other
769+
// backends this is harmless.
770+
//
771+
// This override also prevents scheduler_op::destroy() from calling
772+
// do_complete(nullptr, ...). See also: timer_service::shutdown()
773+
// which drains waiters still in the timer heap (the other path).
774+
auto* w = waiter_;
775+
w->stop_cb_.reset();
776+
auto h = std::exchange(w->h_, {});
777+
auto& sched = w->svc_->get_scheduler();
778+
delete w;
779+
sched.work_finished();
780+
if (h)
781+
h.destroy();
782+
}
783+
751784
inline std::coroutine_handle<>
752785
timer_service::implementation::wait(
753786
std::coroutine_handle<> h,

include/boost/corosio/native/detail/epoll/epoll_scheduler.hpp

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -288,7 +288,6 @@ class BOOST_COROSIO_DECL epoll_scheduler final
288288
mutable op_queue completed_ops_;
289289
mutable std::atomic<long> outstanding_work_;
290290
bool stopped_;
291-
bool shutdown_;
292291

293292
// True while a thread is blocked in epoll_wait. Used by
294293
// wake_one_thread_and_unlock and work_finished to know when
@@ -612,7 +611,6 @@ inline epoll_scheduler::epoll_scheduler(capy::execution_context& ctx, int)
612611
, timer_fd_(-1)
613612
, outstanding_work_(0)
614613
, stopped_(false)
615-
, shutdown_(false)
616614
, task_running_{false}
617615
, task_interrupted_(false)
618616
, state_(0)
@@ -696,7 +694,6 @@ epoll_scheduler::shutdown()
696694
{
697695
{
698696
std::unique_lock lock(mutex_);
699-
shutdown_ = true;
700697

701698
while (auto* h = completed_ops_.pop())
702699
{
@@ -710,8 +707,6 @@ epoll_scheduler::shutdown()
710707
signal_all(lock);
711708
}
712709

713-
outstanding_work_.store(0, std::memory_order_release);
714-
715710
if (event_fd_ >= 0)
716711
interrupt_reactor();
717712
}
@@ -736,7 +731,9 @@ epoll_scheduler::post(std::coroutine_handle<> h) const
736731

737732
void destroy() override
738733
{
734+
auto h = h_;
739735
delete this;
736+
h.destroy();
740737
}
741738
};
742739

include/boost/corosio/native/detail/iocp/win_scheduler.hpp

Lines changed: 57 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,6 @@ class BOOST_COROSIO_DECL win_scheduler final
100100
void* iocp_;
101101
mutable long outstanding_work_;
102102
mutable long stopped_;
103-
long shutdown_;
104103
long stop_event_posted_;
105104
mutable long dispatch_required_;
106105

@@ -162,7 +161,6 @@ inline win_scheduler::win_scheduler(
162161
: iocp_(nullptr)
163162
, outstanding_work_(0)
164163
, stopped_(0)
165-
, shutdown_(0)
166164
, stop_event_posted_(0)
167165
, dispatch_required_(0)
168166
{
@@ -194,51 +192,59 @@ inline win_scheduler::~win_scheduler()
194192
inline void
195193
win_scheduler::shutdown()
196194
{
197-
::InterlockedExchange(&shutdown_, 1);
198-
199195
if (timers_)
200196
timers_->stop();
201197

202-
for (;;)
198+
// Drain timer heap before the work-counting loop. The timer_service
199+
// was registered after this scheduler (nested make_service from our
200+
// constructor), so execution_context::shutdown() calls us first.
201+
// Asio avoids this by owning timer queues directly inside the
202+
// scheduler; we bridge the gap by shutting down the timer service
203+
// early. The subsequent call from execution_context is a no-op.
204+
if (timer_svc_)
205+
timer_svc_->shutdown();
206+
207+
while (::InterlockedExchangeAdd(&outstanding_work_, 0) > 0)
203208
{
204209
op_queue ops;
205210
{
206211
std::lock_guard<win_mutex> lock(dispatch_mutex_);
207212
ops.splice(completed_ops_);
208213
}
209214

210-
bool drained_any = false;
211-
212-
while (auto* h = ops.pop())
215+
if (!ops.empty())
213216
{
214-
h->destroy();
215-
drained_any = true;
216-
}
217-
218-
DWORD bytes;
219-
ULONG_PTR key;
220-
LPOVERLAPPED overlapped;
221-
::GetQueuedCompletionStatus(iocp_, &bytes, &key, &overlapped, 0);
222-
if (overlapped)
223-
{
224-
if (key == key_posted)
217+
while (auto* h = ops.pop())
225218
{
226-
auto* op = reinterpret_cast<scheduler_op*>(overlapped);
227-
op->destroy();
219+
::InterlockedDecrement(&outstanding_work_);
220+
h->destroy();
228221
}
229-
else
222+
}
223+
else
224+
{
225+
DWORD bytes;
226+
ULONG_PTR key;
227+
LPOVERLAPPED overlapped;
228+
::GetQueuedCompletionStatus(
229+
iocp_, &bytes, &key, &overlapped,
230+
iocp::max_gqcs_timeout);
231+
if (overlapped)
230232
{
231-
auto* op = overlapped_to_op(overlapped);
232-
op->destroy();
233+
::InterlockedDecrement(&outstanding_work_);
234+
if (key == key_posted)
235+
{
236+
auto* op =
237+
reinterpret_cast<scheduler_op*>(overlapped);
238+
op->destroy();
239+
}
240+
else
241+
{
242+
auto* op = overlapped_to_op(overlapped);
243+
op->destroy();
244+
}
233245
}
234-
drained_any = true;
235246
}
236-
237-
if (!drained_any)
238-
break;
239247
}
240-
241-
::InterlockedExchange(&outstanding_work_, 0);
242248
}
243249

244250
inline void
@@ -254,7 +260,28 @@ win_scheduler::post(std::coroutine_handle<> h) const
254260
auto* self = static_cast<post_handler*>(base);
255261
if (!owner)
256262
{
263+
// Shutdown path: destroy the coroutine frame synchronously.
264+
//
265+
// Bounded destruction invariant: the chain triggered by
266+
// coro.destroy() is at most two levels deep:
267+
// 1. task frame destroyed → ~io_awaitable_promise_base()
268+
// destroys stored continuation (if != noop_coroutine)
269+
// 2. continuation (trampoline) destroyed → final_suspend
270+
// returns suspend_never, no further continuation
271+
//
272+
// If a future refactor adds deeper continuation chains,
273+
// this would reintroduce re-entrant stack overflow risk.
274+
#ifndef NDEBUG
275+
static thread_local int destroy_depth = 0;
276+
++destroy_depth;
277+
BOOST_COROSIO_ASSERT(destroy_depth <= 2);
278+
#endif
279+
auto coro = self->h_;
257280
delete self;
281+
coro.destroy();
282+
#ifndef NDEBUG
283+
--destroy_depth;
284+
#endif
258285
return;
259286
}
260287
auto coro = self->h_;

include/boost/corosio/native/detail/kqueue/kqueue_scheduler.hpp

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -363,7 +363,6 @@ class BOOST_COROSIO_DECL kqueue_scheduler final
363363
mutable op_queue completed_ops_;
364364
mutable std::atomic<std::int64_t> outstanding_work_{0};
365365
std::atomic<bool> stopped_{false};
366-
bool shutdown_ = false;
367366

368367
// True while a thread is blocked in kevent(). Used by
369368
// wake_one_thread_and_unlock and work_finished to know when
@@ -709,7 +708,6 @@ inline kqueue_scheduler::kqueue_scheduler(capy::execution_context& ctx, int)
709708
: kq_fd_(-1)
710709
, outstanding_work_(0)
711710
, stopped_(false)
712-
, shutdown_(false)
713711
, task_running_(false)
714712
, task_interrupted_(false)
715713
, state_(0)
@@ -764,7 +762,6 @@ kqueue_scheduler::shutdown()
764762
{
765763
{
766764
std::unique_lock lock(mutex_);
767-
shutdown_ = true;
768765

769766
while (auto* h = completed_ops_.pop())
770767
{
@@ -778,8 +775,6 @@ kqueue_scheduler::shutdown()
778775
signal_all(lock);
779776
}
780777

781-
outstanding_work_.store(0, std::memory_order_release);
782-
783778
if (kq_fd_ >= 0)
784779
interrupt_reactor();
785780
}
@@ -808,7 +803,9 @@ kqueue_scheduler::post(std::coroutine_handle<> h) const
808803

809804
void destroy() override
810805
{
806+
auto h = h_;
811807
delete this;
808+
h.destroy();
812809
}
813810
};
814811

include/boost/corosio/native/detail/select/select_scheduler.hpp

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,6 @@ class BOOST_COROSIO_DECL select_scheduler final
156156
mutable op_queue completed_ops_;
157157
mutable std::atomic<long> outstanding_work_;
158158
std::atomic<bool> stopped_;
159-
bool shutdown_;
160159

161160
// Per-fd state for tracking registered operations
162161
struct fd_state
@@ -259,7 +258,6 @@ inline select_scheduler::select_scheduler(capy::execution_context& ctx, int)
259258
: pipe_fds_{-1, -1}
260259
, outstanding_work_(0)
261260
, stopped_(false)
262-
, shutdown_(false)
263261
, max_fd_(-1)
264262
, reactor_running_(false)
265263
, reactor_interrupted_(false)
@@ -325,7 +323,6 @@ select_scheduler::shutdown()
325323
{
326324
{
327325
std::unique_lock lock(mutex_);
328-
shutdown_ = true;
329326

330327
while (auto* h = completed_ops_.pop())
331328
{
@@ -337,8 +334,6 @@ select_scheduler::shutdown()
337334
}
338335
}
339336

340-
outstanding_work_.store(0, std::memory_order_release);
341-
342337
if (pipe_fds_[1] >= 0)
343338
interrupt_reactor();
344339

@@ -365,7 +360,9 @@ select_scheduler::post(std::coroutine_handle<> h) const
365360

366361
void destroy() override
367362
{
363+
auto h = h_;
368364
delete this;
365+
h.destroy();
369366
}
370367
};
371368

0 commit comments

Comments
 (0)