Skip to content

Commit 93b2b2f

Browse files
committed
Symmetric-transfer thread_pool dispatch on worker threads
dispatch() previously always posted, forcing a queue round-trip even when the caller was already running on one of the pool's workers. It now returns c.h when the calling thread is a worker of the pool, per the Executor concept's inline-resume contract. A thread_local_ptr marker set via RAII in run() identifies pool workers; cross-pool and non-worker callers still post.
1 parent 1fafde7 commit 93b2b2f

3 files changed

Lines changed: 175 additions & 19 deletions

File tree

include/boost/capy/ex/thread_pool.hpp

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -196,22 +196,22 @@ class thread_pool::executor_type
196196

197197
/** Dispatch a continuation for execution.
198198
199-
Posts the continuation to the thread pool for execution on a
200-
worker thread and returns `std::noop_coroutine()`. Thread
201-
pools never execute inline because no single thread "owns"
202-
the pool.
203-
204-
@param c The continuation to execute. Must remain at a
205-
stable address until dequeued and resumed.
206-
207-
@return `std::noop_coroutine()` always.
199+
If the calling thread is a worker of this pool, returns
200+
`c.h` for symmetric transfer so the caller can resume the
201+
continuation inline. Otherwise, posts the continuation to
202+
the pool for execution on a worker thread and returns
203+
`std::noop_coroutine()`.
204+
205+
@param c The continuation to execute. On the post path,
206+
must remain at a stable address until dequeued
207+
and resumed.
208+
209+
@return `c.h` when the calling thread is a pool worker;
210+
`std::noop_coroutine()` otherwise.
208211
*/
212+
BOOST_CAPY_DECL
209213
std::coroutine_handle<>
210-
dispatch(continuation& c) const
211-
{
212-
post(c);
213-
return std::noop_coroutine();
214-
}
214+
dispatch(continuation& c) const;
215215

216216
/** Post a continuation to the thread pool.
217217

src/ex/thread_pool.cpp

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
#include <boost/capy/ex/thread_pool.hpp>
1212
#include <boost/capy/continuation.hpp>
13+
#include <boost/capy/detail/thread_local_ptr.hpp>
1314
#include <boost/capy/ex/frame_allocator.hpp>
1415
#include <boost/capy/test/thread_name.hpp>
1516
#include <algorithm>
@@ -51,6 +52,12 @@ namespace capy {
5152

5253
class thread_pool::impl
5354
{
55+
// Identifies the pool owning the current worker thread, or
56+
// nullptr if the calling thread is not a pool worker. Checked
57+
// by dispatch() to decide between symmetric transfer (inline
58+
// resume) and post.
59+
static inline detail::thread_local_ptr<impl const> current_;
60+
5461
// Intrusive queue of continuations via continuation::next.
5562
// No per-post allocation: the continuation is owned by the caller.
5663
continuation* head_ = nullptr;
@@ -96,6 +103,12 @@ class thread_pool::impl
96103
public:
97104
~impl() = default;
98105

106+
bool
107+
running_in_this_thread() const noexcept
108+
{
109+
return current_.get() == this;
110+
}
111+
99112
// Destroy abandoned coroutine frames. Must be called
100113
// before execution_context::shutdown()/destroy() so
101114
// that suspended-frame destructors (e.g. delay_awaitable
@@ -213,6 +226,14 @@ class thread_pool::impl
213226
std::snprintf(name, sizeof(name), "%s%zu", thread_name_prefix_, index);
214227
set_current_thread_name(name);
215228

229+
// Mark this thread as a worker of this pool so dispatch()
230+
// can symmetric-transfer when called from within pool work.
231+
struct scoped_pool
232+
{
233+
scoped_pool(impl const* p) noexcept { current_.set(p); }
234+
~scoped_pool() noexcept { current_.set(nullptr); }
235+
} guard(this);
236+
216237
for(;;)
217238
{
218239
continuation* c = nullptr;
@@ -297,5 +318,15 @@ post(continuation& c) const
297318
pool_->impl_->post(c);
298319
}
299320

321+
// If the caller is already running on one of this pool's
// workers, hand back c.h so the caller can symmetric-transfer
// into the continuation. Any other caller gets the continuation
// posted to the pool and receives a no-op handle.
std::coroutine_handle<>
thread_pool::executor_type::
dispatch(continuation& c) const
{
    auto& self = *pool_->impl_;
    if(! self.running_in_this_thread())
    {
        self.post(c);
        return std::noop_coroutine();
    }
    return c.h;
}
330+
300331
} // capy
301332
} // boost

test/unit/ex/thread_pool.cpp

Lines changed: 130 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,46 @@ struct test_service : execution_context::service
4949
void shutdown() override {}
5050
};
5151

52+
// Probe coroutine starts suspended; resuming it completes and
53+
// auto-destroys the frame (suspend_never final). If never
54+
// resumed, probe_coro's dtor destroys it.
55+
struct probe_coro
56+
{
57+
struct promise_type
58+
{
59+
probe_coro
60+
get_return_object() noexcept
61+
{
62+
return probe_coro{
63+
std::coroutine_handle<promise_type>::from_promise(*this)};
64+
}
65+
std::suspend_always initial_suspend() noexcept { return {}; }
66+
std::suspend_never final_suspend() noexcept { return {}; }
67+
void return_void() noexcept {}
68+
void unhandled_exception() { std::terminate(); }
69+
};
70+
71+
std::coroutine_handle<promise_type> h_;
72+
73+
~probe_coro() { if(h_) h_.destroy(); }
74+
75+
probe_coro(probe_coro&& other) noexcept
76+
: h_(other.h_) { other.h_ = nullptr; }
77+
78+
std::coroutine_handle<void> handle() const noexcept { return h_; }
79+
void release() noexcept { h_ = nullptr; }
80+
81+
private:
82+
explicit probe_coro(std::coroutine_handle<promise_type> h)
83+
: h_(h) {}
84+
};
85+
86+
inline probe_coro
87+
make_probe()
88+
{
89+
co_return;
90+
}
91+
5292
#if defined(BOOST_CAPY_TEST_CAN_GET_THREAD_NAME)
5393
// Result storage for thread name check
5494
struct name_check_result
@@ -189,12 +229,95 @@ struct thread_pool_test
189229
void
190230
testDispatch()
191231
{
192-
continuation c{std::noop_coroutine()};
193-
thread_pool pool(1);
194-
auto ex = pool.get_executor();
232+
// From outside any pool, dispatch() posts.
233+
auto probe = make_probe();
234+
auto probe_h = probe.handle();
235+
auto* target = new continuation{probe_h};
236+
237+
std::coroutine_handle<> returned;
238+
{
239+
thread_pool pool(1);
240+
auto ex = pool.get_executor();
241+
returned = ex.dispatch(*target);
242+
}
243+
244+
BOOST_TEST(returned != probe_h);
245+
if(returned != probe_h)
246+
probe.release();
247+
delete target;
248+
}
249+
250+
void
251+
testDispatchSymmetricTransfer()
252+
{
253+
// From a worker thread of the same pool, dispatch()
254+
// returns c.h for symmetric transfer and does not
255+
// enqueue the continuation.
256+
auto probe = make_probe();
257+
auto probe_h = probe.handle();
258+
259+
// Heap-allocated so target outlives the pool if a buggy
260+
// implementation erroneously posts it.
261+
auto* target = new continuation{probe_h};
262+
263+
std::atomic<bool> done{false};
264+
std::coroutine_handle<> returned;
265+
266+
{
267+
thread_pool pool(1);
268+
auto ex = pool.get_executor();
269+
270+
run_async(ex, [&]{
271+
returned = ex.dispatch(*target);
272+
done.store(true);
273+
})(void_task());
274+
275+
BOOST_TEST(wait_for([&]{ return done.load(); }));
276+
}
277+
278+
// On symmetric transfer the returned handle equals the
279+
// target's handle and the probe is never enqueued.
280+
BOOST_TEST(returned == probe_h);
281+
282+
// If the dispatch posted (buggy), the pool destructor's
283+
// drain_abandoned already destroyed probe_h; release so
284+
// the probe_coro dtor does not double-destroy.
285+
if(returned != probe_h)
286+
probe.release();
287+
delete target;
288+
}
289+
290+
void
291+
testDispatchCrossPool()
292+
{
293+
// Worker threads of pool A are not workers of pool B:
294+
// dispatch() on B from an A worker must post, not
295+
// symmetric-transfer.
296+
auto probe = make_probe();
297+
auto probe_h = probe.handle();
298+
auto* target = new continuation{probe_h};
299+
300+
std::atomic<bool> done{false};
301+
std::coroutine_handle<> returned;
302+
303+
{
304+
thread_pool pool_a(1);
305+
thread_pool pool_b(1);
306+
auto ex_a = pool_a.get_executor();
307+
auto ex_b = pool_b.get_executor();
308+
309+
run_async(ex_a, [&]{
310+
returned = ex_b.dispatch(*target);
311+
done.store(true);
312+
})(void_task());
313+
314+
BOOST_TEST(wait_for([&]{ return done.load(); }));
315+
}
195316

196-
// dispatch() always posts for thread_pool (returns void)
197-
ex.dispatch(c);
317+
BOOST_TEST(returned != probe_h);
318+
if(returned != probe_h)
319+
probe.release();
320+
delete target;
198321
}
199322

200323
void
@@ -584,6 +707,8 @@ struct thread_pool_test
584707
testPostWork();
585708
testWorkCounting();
586709
testDispatch();
710+
testDispatchSymmetricTransfer();
711+
testDispatchCrossPool();
587712
testServiceManagement();
588713
testMakeService();
589714
testConcurrentPost();

0 commit comments

Comments
 (0)