Skip to content

Commit 32dfdbb

Browse files
committed
Replace per-operation thread spawning with shared thread pool
1 parent 9132ffc commit 32dfdbb

File tree

6 files changed

+650
-330
lines changed

6 files changed

+650
-330
lines changed
Lines changed: 213 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,213 @@
1+
//
2+
// Copyright (c) 2026 Steve Gerbino
3+
//
4+
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5+
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6+
//
7+
// Official repository: https://github.com/cppalliance/corosio
8+
//
9+
10+
#ifndef BOOST_COROSIO_DETAIL_THREAD_POOL_HPP
11+
#define BOOST_COROSIO_DETAIL_THREAD_POOL_HPP
12+
13+
#include <boost/corosio/detail/config.hpp>
14+
#include <boost/corosio/detail/intrusive.hpp>
15+
#include <boost/capy/ex/execution_context.hpp>
16+
17+
#include <condition_variable>
18+
#include <mutex>
19+
#include <stdexcept>
20+
#include <thread>
21+
#include <vector>
22+
23+
namespace boost::corosio::detail {
24+
25+
/** Base class for thread pool work items.
26+
27+
Derive from this to create work that can be posted to a
28+
@ref thread_pool. Uses static function pointer dispatch,
29+
consistent with the IOCP `op` pattern.
30+
31+
@par Example
32+
@code
33+
struct my_work : pool_work_item
34+
{
35+
int* result;
36+
static void execute( pool_work_item* w ) noexcept
37+
{
38+
auto* self = static_cast<my_work*>( w );
39+
*self->result = 42;
40+
}
41+
};
42+
43+
my_work w;
44+
w.func_ = &my_work::execute;
45+
w.result = &r;
46+
pool.post( &w );
47+
@endcode
48+
*/
49+
struct pool_work_item : intrusive_queue<pool_work_item>::node
50+
{
51+
/// Static dispatch function signature.
52+
using func_type = void (*)(pool_work_item*) noexcept;
53+
54+
/// Completion handler invoked by the worker thread.
55+
func_type func_ = nullptr;
56+
};
57+
58+
/** Shared thread pool for dispatching blocking operations.
59+
60+
Provides a fixed pool of reusable worker threads for operations
61+
that cannot be integrated with async I/O (e.g. blocking DNS
62+
calls). Registered as an `execution_context::service` so it
63+
is a singleton per io_context.
64+
65+
Threads are created eagerly in the constructor. The default
66+
thread count is 1.
67+
68+
@par Thread Safety
69+
All public member functions are thread-safe.
70+
71+
@par Shutdown
72+
Sets a shutdown flag, notifies all threads, and joins them.
73+
In-flight blocking calls complete naturally before the thread
74+
exits.
75+
*/
76+
class thread_pool final
77+
: public capy::execution_context::service
78+
{
79+
std::mutex mutex_;
80+
std::condition_variable cv_;
81+
intrusive_queue<pool_work_item> work_queue_;
82+
std::vector<std::thread> threads_;
83+
bool shutdown_ = false;
84+
85+
void worker_loop();
86+
87+
public:
88+
using key_type = thread_pool;
89+
90+
/** Construct the thread pool service.
91+
92+
Eagerly creates all worker threads.
93+
94+
@par Exception Safety
95+
Strong guarantee. If thread creation fails, all
96+
already-created threads are shut down and joined
97+
before the exception propagates.
98+
99+
@param ctx Reference to the owning execution_context.
100+
@param num_threads Number of worker threads. Must be
101+
at least 1.
102+
103+
@throws std::logic_error If `num_threads` is 0.
104+
*/
105+
explicit thread_pool(
106+
capy::execution_context& ctx,
107+
unsigned num_threads = 1)
108+
{
109+
(void)ctx;
110+
if (!num_threads)
111+
throw std::logic_error(
112+
"thread_pool requires at least 1 thread");
113+
threads_.reserve(num_threads);
114+
try
115+
{
116+
for (unsigned i = 0; i < num_threads; ++i)
117+
threads_.emplace_back([this] { worker_loop(); });
118+
}
119+
catch (...)
120+
{
121+
shutdown();
122+
throw;
123+
}
124+
}
125+
126+
~thread_pool() override = default;
127+
128+
thread_pool(thread_pool const&) = delete;
129+
thread_pool& operator=(thread_pool const&) = delete;
130+
131+
/** Enqueue a work item for execution on the thread pool.
132+
133+
Zero-allocation: the caller owns the work item's storage.
134+
135+
@param w The work item to execute. Must remain valid until
136+
its `func_` has been called.
137+
138+
@return `true` if the item was enqueued, `false` if the
139+
pool has already shut down.
140+
*/
141+
bool post(pool_work_item* w) noexcept;
142+
143+
/** Shut down the thread pool.
144+
145+
Signals all threads to exit after draining any
146+
remaining queued work, then joins them.
147+
*/
148+
void shutdown() override;
149+
};
150+
151+
inline void
152+
thread_pool::worker_loop()
153+
{
154+
for (;;)
155+
{
156+
pool_work_item* w;
157+
{
158+
std::unique_lock<std::mutex> lock(mutex_);
159+
cv_.wait(lock, [this] {
160+
return shutdown_ || !work_queue_.empty();
161+
});
162+
163+
w = work_queue_.pop();
164+
if (!w)
165+
{
166+
if (shutdown_)
167+
return;
168+
continue;
169+
}
170+
}
171+
w->func_(w);
172+
}
173+
}
174+
175+
inline bool
176+
thread_pool::post(pool_work_item* w) noexcept
177+
{
178+
{
179+
std::lock_guard<std::mutex> lock(mutex_);
180+
if (shutdown_)
181+
return false;
182+
work_queue_.push(w);
183+
}
184+
cv_.notify_one();
185+
return true;
186+
}
187+
188+
inline void
189+
thread_pool::shutdown()
190+
{
191+
{
192+
std::lock_guard<std::mutex> lock(mutex_);
193+
shutdown_ = true;
194+
}
195+
cv_.notify_all();
196+
197+
for (auto& t : threads_)
198+
{
199+
if (t.joinable())
200+
t.join();
201+
}
202+
threads_.clear();
203+
204+
{
205+
std::lock_guard<std::mutex> lock(mutex_);
206+
while (work_queue_.pop())
207+
;
208+
}
209+
}
210+
211+
} // namespace boost::corosio::detail
212+
213+
#endif // BOOST_COROSIO_DETAIL_THREAD_POOL_HPP

include/boost/corosio/native/detail/iocp/win_resolver.hpp

Lines changed: 31 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
#endif
2424

2525
#include <boost/corosio/detail/scheduler.hpp>
26+
#include <boost/corosio/detail/thread_pool.hpp>
2627
#include <boost/corosio/endpoint.hpp>
2728
#include <boost/corosio/resolver.hpp>
2829
#include <boost/corosio/resolver_results.hpp>
@@ -42,12 +43,9 @@
4243
#include <WS2tcpip.h>
4344

4445
#include <atomic>
45-
#include <condition_variable>
4646
#include <cstring>
4747
#include <memory>
4848
#include <string>
49-
#include <thread>
50-
#include <unordered_map>
5149

5250
// MinGW may not have GetAddrInfoExCancel declared
5351
#if defined(__MINGW32__) || defined(__MINGW64__)
@@ -72,30 +70,28 @@ extern "C"
7270
Reverse Resolution (GetNameInfoW)
7371
---------------------------------
7472
Unlike GetAddrInfoExW, GetNameInfoW has no async variant. Reverse
75-
resolution spawns a detached worker thread that calls GetNameInfoW
76-
and posts the result to the scheduler upon completion.
73+
resolution dispatches the blocking call to the shared
74+
resolver_thread_pool service.
7775
7876
Class Hierarchy
7977
---------------
8078
- win_resolver_service (execution_context::service)
8179
- Owns all win_resolver instances via shared_ptr
8280
- Coordinates with win_scheduler for work tracking
83-
- Tracks active worker threads for safe shutdown
8481
- win_resolver (one per resolver object)
8582
- Contains embedded resolve_op and reverse_resolve_op
8683
- Inherits from enable_shared_from_this for thread safety
8784
- resolve_op (overlapped_op subclass)
8885
- OVERLAPPED base enables IOCP integration
8986
- Static completion() callback invoked by Windows
9087
- reverse_resolve_op (overlapped_op subclass)
91-
- Used by worker thread for reverse resolution
88+
- Used by pool thread for reverse resolution
9289
93-
Shutdown Synchronization
94-
------------------------
95-
The service uses condition_variable_any and win_mutex to track active
96-
worker threads. During shutdown(), the service waits for all threads
97-
to complete before destroying resources. Worker threads always post
98-
their completions so the scheduler can properly drain them via destroy().
90+
Shutdown
91+
--------
92+
The resolver service cancels all resolvers and clears the impl map.
93+
The thread pool service shuts down separately via execution_context
94+
service ordering, joining all worker threads.
9995
10096
Cancellation
10197
------------
@@ -128,17 +124,14 @@ extern "C"
128124
129125
Reverse Resolution (GetNameInfoW)
130126
---------------------------------
131-
Unlike GetAddrInfoExW, GetNameInfoW has no async variant. We use a worker
132-
thread approach similar to POSIX:
133-
1. reverse_resolve() spawns a detached worker thread
134-
2. Worker calls GetNameInfoW() (blocking)
135-
3. Worker converts wide results to UTF-8 via WideCharToMultiByte
136-
4. Worker posts completion to scheduler
127+
Unlike GetAddrInfoExW, GetNameInfoW has no async variant. The blocking
128+
call is dispatched to the shared resolver_thread_pool:
129+
1. reverse_resolve() posts work to the thread pool
130+
2. Pool thread calls GetNameInfoW() (blocking)
131+
3. Pool thread converts wide results to UTF-8 via WideCharToMultiByte
132+
4. Pool thread posts completion to scheduler
137133
5. op_() resumes the coroutine with results
138134
139-
Thread tracking (thread_started/thread_finished) ensures safe shutdown
140-
by waiting for all worker threads before destroying the service.
141-
142135
String Conversion
143136
-----------------
144137
Windows APIs require wide strings. We use MultiByteToWideChar for
@@ -250,6 +243,16 @@ class win_resolver final
250243
friend struct resolve_op;
251244

252245
public:
246+
/// Embedded pool work item for thread pool dispatch.
247+
struct pool_op : pool_work_item
248+
{
249+
/// Resolver that owns this work item.
250+
win_resolver* resolver_ = nullptr;
251+
252+
/// Prevent impl destruction while work is in flight.
253+
std::shared_ptr<win_resolver> ref_;
254+
};
255+
253256
explicit win_resolver(win_resolver_service& svc) noexcept;
254257

255258
std::coroutine_handle<> resolve(
@@ -276,6 +279,12 @@ class win_resolver final
276279
resolve_op op_;
277280
reverse_resolve_op reverse_op_;
278281

282+
/// Pool work item for reverse resolution.
283+
pool_op reverse_pool_op_;
284+
285+
/// Execute blocking `GetNameInfoW()` on a pool thread.
286+
static void do_reverse_resolve_work(pool_work_item*) noexcept;
287+
279288
private:
280289
win_resolver_service& svc_;
281290
};

0 commit comments

Comments
 (0)