Skip to content

Commit 8645de4

Browse files
committed
Add thread_pool::join() and fix stop_callback TLS poisoning (#163)
Add join() to thread_pool with real work counting via on_work_started/on_work_finished, matching asio::thread_pool semantics. join() blocks until outstanding work drains, then joins worker threads. stop() interrupts a blocking join(). Fix a deterministic use-after-free where std::stop_callback resumed coroutines inline on the wrong thread, poisoning that thread's TLS frame allocator with a pool-owned resource. Add resume_via_post callable for stop_callbacks to post resumption through the executor. Fix stop_only_awaitable test helper to use the post-back pattern. As defense-in-depth, work::destroy() now destroys abandoned coroutine handles instead of leaking them. Document the stop callback contract on io_env: awaitables must not resume coroutine handles directly in a stop_callback.
1 parent ca16652 commit 8645de4

8 files changed

Lines changed: 641 additions & 39 deletions

File tree

doc/modules/ROOT/pages/4.coroutines/4d.io-awaitable.adoc

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,54 @@ The key points:
166166
2. Use the executor to dispatch completion
167167
3. Respect the stop token for cancellation
168168

169+
=== Stop Callbacks Must Post, Not Resume
170+
171+
When implementing a stoppable awaitable, you may register a `std::stop_callback` to wake the coroutine when cancellation is requested. The callback fires synchronously on whatever thread calls `request_stop()`, which is typically *not* the executor's thread.
172+
173+
[WARNING]
174+
====
175+
*Never resume a coroutine handle directly from a stop_callback.* Doing so executes the coroutine on the wrong thread, corrupting the thread-local frame allocator. This causes use-after-free on the next coroutine allocation—potentially in completely unrelated code.
176+
====
177+
178+
Use `stop_resume_callback` and `io_env::post_resume` from `<boost/capy/ex/io_env.hpp>` to post the resume through the executor:
179+
180+
[source,cpp]
181+
----
182+
#include <boost/capy/ex/io_env.hpp>
183+
184+
struct stoppable_awaitable
185+
{
186+
std::optional<stop_resume_callback> stop_cb_;
187+
188+
bool await_ready() { return false; }
189+
190+
std::coroutine_handle<> await_suspend(
191+
std::coroutine_handle<> h, io_env const* env)
192+
{
193+
if (env->stop_token.stop_requested())
194+
return h; // Already cancelled, resume immediately
195+
196+
// Post through executor when stop is requested
197+
stop_cb_.emplace(env->stop_token, env->post_resume(h));
198+
199+
start_async_operation();
200+
return std::noop_coroutine();
201+
}
202+
203+
void await_resume() { /* ... */ }
204+
};
205+
----
206+
207+
The incorrect pattern—which compiles and appears to work but causes memory corruption—looks like this:
208+
209+
[source,cpp]
210+
----
211+
// WRONG: resumes coroutine on the calling thread
212+
stop_cb_.emplace(env->stop_token, h); // h is a raw coroutine_handle
213+
----
214+
215+
See xref:4.coroutines/4e.cancellation.adoc#stoppable-awaitables[Implementing Stoppable Awaitables] for a complete example.
216+
169217
== Reference
170218

171219
[cols="1,3"]

doc/modules/ROOT/pages/4.coroutines/4e.cancellation.adoc

Lines changed: 64 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -292,7 +292,70 @@ Capy's I/O operations (provided by Corosio) respect stop tokens at the OS level:
292292

293293
When you request stop, pending I/O operations are cancelled at the OS level, providing immediate response rather than waiting for the operation to complete naturally.
294294

295-
== Part 8: Patterns
295+
[[stoppable-awaitables]]
296+
== Part 8: Implementing Stoppable Awaitables
297+
298+
The examples above show *polling* for cancellation with `token.stop_requested()`. For awaitables that suspend indefinitely—waiting for I/O, a lock, or an external event—you need a `std::stop_callback` to wake the coroutine when cancellation arrives.
299+
300+
=== The Dangerous Pattern
301+
302+
A `std::stop_callback` fires synchronously on whatever thread calls `request_stop()`. If the callback resumes the coroutine directly, the coroutine runs on the wrong thread:
303+
304+
[source,cpp]
305+
----
306+
// WRONG — causes use-after-free
307+
std::optional<std::stop_callback<std::coroutine_handle<>>> stop_cb;
308+
309+
std::coroutine_handle<> await_suspend(
310+
std::coroutine_handle<> h, io_env const* env)
311+
{
312+
stop_cb.emplace(env->stop_token, h); // Resumes inline!
313+
return std::noop_coroutine();
314+
}
315+
----
316+
317+
When an external thread calls `request_stop()`, `h.resume()` executes the coroutine on that thread. The coroutine machinery sets the thread-local frame allocator to the executor's allocator—poisoning the calling thread's TLS. When the executor's pool destructs, the TLS pointer becomes dangling. The next coroutine allocation on that thread dereferences freed memory.
318+
319+
This bug is deterministic, not a race condition. It manifests as a heap-use-after-free in *unrelated* code—wherever the next coroutine frame happens to be allocated on the poisoned thread.
320+
321+
=== The Correct Pattern: resume_via_post
322+
323+
Use `stop_resume_callback` and `io_env::post_resume` to post the resume through the executor, ensuring the coroutine runs on the correct thread:
324+
325+
[source,cpp]
326+
----
327+
#include <boost/capy/ex/io_env.hpp>
328+
329+
struct my_stoppable_awaitable
330+
{
331+
std::optional<stop_resume_callback> stop_cb_;
332+
// ... other members for the async operation ...
333+
334+
bool await_ready() { return false; }
335+
336+
std::coroutine_handle<> await_suspend(
337+
std::coroutine_handle<> h, io_env const* env)
338+
{
339+
if (env->stop_token.stop_requested())
340+
return h; // Already cancelled
341+
342+
stop_cb_.emplace(env->stop_token, env->post_resume(h));
343+
344+
start_async_operation(h, env);
345+
return std::noop_coroutine();
346+
}
347+
348+
void await_resume() { /* check result or throw */ }
349+
};
350+
----
351+
352+
`stop_resume_callback` is a type alias for `std::stop_callback<resume_via_post>`. `post_resume(h)` creates a `resume_via_post` callable that posts the coroutine handle through this environment's executor.
353+
354+
When `request_stop()` fires the callback, the coroutine handle is posted to the executor's queue instead of resumed inline. The executor's worker thread picks it up and resumes it in the correct execution context.
355+
356+
NOTE: Capy's built-in I/O awaitables (via Corosio) already use the post-back pattern internally. This guidance applies when writing your own custom awaitables.
357+
358+
== Part 9: Patterns
296359

297360
=== Timeout Pattern
298361

doc/unlisted/execution-thread-pool.adoc

Lines changed: 59 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -92,28 +92,68 @@ run_async(pool.get_executor())(compute(), [](int result) {
9292

9393
== Lifetime and Shutdown
9494

95-
The pool destructor waits for all work to complete:
95+
=== Waiting for Work: join()
96+
97+
Call `join()` to block until all outstanding work completes:
98+
99+
[source,cpp]
100+
----
101+
thread_pool pool(4);
102+
103+
for (auto& item : batch)
104+
run_async(pool.get_executor())(process(item));
105+
106+
pool.join(); // Blocks until all tasks finish
107+
// Pool is now stopped; worker threads are joined
108+
----
109+
110+
`join()` releases the pool's internal work guard and blocks until the
111+
outstanding work count (tracked by `on_work_started()` / `on_work_finished()`)
112+
reaches zero. After all work completes, the worker threads are joined.
113+
114+
The pool cannot be reused after `join()`. Calling `join()` more than once
115+
is safe (subsequent calls are no-ops).
116+
117+
=== Immediate Stop: stop()
118+
119+
Call `stop()` to abandon remaining work:
120+
121+
[source,cpp]
122+
----
123+
pool.stop(); // Workers exit after current item; queued work is abandoned
124+
pool.join(); // Wait for threads to finish
125+
----
126+
127+
If `join()` is blocking on another thread, calling `stop()` causes it to
128+
stop waiting for outstanding work. The `join()` call still waits for worker
129+
threads to finish their current item and exit before returning.
130+
131+
=== Destructor Behavior
132+
133+
The destructor calls `stop()` then `join()`:
96134

97135
[source,cpp]
98136
----
99137
{
100138
thread_pool pool(4);
101139
run_async(pool.get_executor())(long_running_task());
102-
// Destructor blocks until long_running_task completes
140+
// Destructor: stop() -> join() -> shutdown services -> destroy services
141+
// Queued work that hasn't started is abandoned
103142
}
104143
----
105144

106-
This ensures orderly shutdown without orphaned coroutines.
145+
To wait for all work to complete before shutdown, call `join()` explicitly
146+
before the pool goes out of scope.
107147

108148
=== Destruction Order
109149

110150
When a pool is destroyed:
111151

112-
1. Threads are signaled to stop accepting new work
113-
2. Pending work continues to completion
152+
1. Workers are signaled to stop (pending work is abandoned)
153+
2. Worker threads are joined
114154
3. Services are shut down (in reverse order of creation)
115155
4. Services are destroyed
116-
5. Threads are joined
156+
5. Remaining queued work items are destroyed
117157

118158
== Executor Operations
119159

@@ -161,7 +201,9 @@ Since callers are never "inside" the thread pool's execution context,
161201

162202
== Work Tracking
163203

164-
Work tracking keeps the pool alive while operations are outstanding:
204+
Work tracking keeps the pool alive while operations are outstanding.
205+
When `join()` has been called, the pool waits until the outstanding work
206+
count reaches zero before stopping the worker threads.
165207

166208
[source,cpp]
167209
----
@@ -179,13 +221,14 @@ The `work_guard` RAII wrapper simplifies this:
179221
{
180222
work_guard guard(ex);
181223
// Work count incremented
182-
224+
183225
// ... do work ...
184-
226+
185227
} // Work count decremented
186228
----
187229

188-
`run_async` handles work tracking automatically.
230+
`run_async` handles work tracking automatically — each launched task
231+
holds a `work_guard` for its lifetime.
189232

190233
== Services
191234

@@ -323,6 +366,12 @@ void process_batch()
323366
| `get_executor()`
324367
| Get an executor for the pool
325368

369+
| `join()`
370+
| Wait for all outstanding work to complete
371+
372+
| `stop()`
373+
| Immediately stop the pool, abandoning queued work
374+
326375
| Services
327376
| Polymorphic components owned by the pool
328377
|===

include/boost/capy/ex/io_env.hpp

Lines changed: 79 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,39 @@
1313
#include <boost/capy/detail/config.hpp>
1414
#include <boost/capy/ex/executor_ref.hpp>
1515

16+
#include <coroutine>
1617
#include <memory_resource>
1718
#include <stop_token>
1819

1920
namespace boost {
2021
namespace capy {
2122

23+
/** Callable that posts a coroutine handle to an executor.
24+
25+
Use this as the callback type for `std::stop_callback` instead
26+
of a raw `std::coroutine_handle<>`. Raw handles resume the
27+
coroutine inline on whatever thread calls `request_stop()`,
28+
which bypasses the executor and corrupts the thread-local
29+
frame allocator.
30+
31+
Prefer @ref io_env::post_resume and the @ref stop_resume_callback
32+
alias to construct these—see examples there.
33+
34+
@see io_env::post_resume, stop_resume_callback
35+
*/
36+
struct resume_via_post
37+
{
38+
executor_ref ex;
39+
std::coroutine_handle<> h;
40+
41+
// post() must not throw; stop_callback requires a
42+
// non-throwing invocable.
43+
void operator()() const noexcept
44+
{
45+
ex.post(h);
46+
}
47+
};
48+
2249
/** Execution environment for IoAwaitables.
2350
2451
This struct bundles the execution context passed through
@@ -33,11 +60,27 @@ namespace capy {
3360
chain. Awaitables receive `io_env const*` in `await_suspend`
3461
and should store it directly, never copy the pointed-to object.
3562
63+
@par Stop Callback Contract
64+
65+
Awaitables that register a `std::stop_callback` **must not**
66+
resume the coroutine handle directly. The callback fires
67+
synchronously on the thread that calls `request_stop()`, which
68+
may not be an executor-managed thread. Resuming inline poisons
69+
that thread's TLS frame allocator with the pool's allocator,
70+
causing use-after-free on the next coroutine allocation.
71+
72+
Use @ref io_env::post_resume and @ref stop_resume_callback:
73+
@code
74+
std::optional<stop_resume_callback> stop_cb_;
75+
// In await_suspend:
76+
stop_cb_.emplace(env->stop_token, env->post_resume(h));
77+
@endcode
78+
3679
@par Thread Safety
3780
The referenced executor and allocator must remain valid
3881
for the lifetime of any coroutine using this environment.
3982
40-
@see IoAwaitable, IoRunnable
83+
@see IoAwaitable, IoRunnable, resume_via_post
4184
*/
4285
struct io_env
4386
{
@@ -52,8 +95,43 @@ struct io_env
5295
When null, the default allocator is used.
5396
*/
5497
std::pmr::memory_resource* frame_allocator = nullptr;
98+
99+
/** Create a resume_via_post callable for this environment.
100+
101+
Convenience method for registering @ref stop_resume_callback
102+
instances. Equivalent to `resume_via_post{executor, h}`.
103+
104+
@par Example
105+
@code
106+
stop_cb_.emplace(env->stop_token, env->post_resume(h));
107+
@endcode
108+
109+
@param h The coroutine handle to post on cancellation.
110+
111+
@return A @ref resume_via_post callable that holds a
112+
non-owning @ref executor_ref and the coroutine handle.
113+
The callable must not outlive the executor it references.
114+
115+
@see resume_via_post, stop_resume_callback
116+
*/
117+
resume_via_post
118+
post_resume(std::coroutine_handle<> h) const noexcept
119+
{
120+
return resume_via_post{executor, h};
121+
}
55122
};
56123

124+
/** Type alias for a stop callback that posts through the executor.
125+
126+
Use this to declare the stop callback member in your awaitable:
127+
@code
128+
std::optional<stop_resume_callback> stop_cb_;
129+
@endcode
130+
131+
@see resume_via_post, io_env::post_resume
132+
*/
133+
using stop_resume_callback = std::stop_callback<resume_via_post>;
134+
57135
} // capy
58136
} // boost
59137

0 commit comments

Comments
 (0)