Skip to content

Commit b5e5d64

Browse files
committed
feat: add POSIX scheduler for non-Windows platforms
- Add posix_scheduler.hpp with class declaration using std::mutex/condition_variable - Add posix_scheduler.cpp implementing run/post/poll methods with thread-safe work tracking - Update io_context.cpp to use posix_scheduler instead of reactive_scheduler on non-Windows - Fix resolver_impl access by making it public (fixes build error)
1 parent b42dd24 commit b5e5d64

4 files changed

Lines changed: 424 additions & 3 deletions

File tree

include/boost/corosio/resolver.hpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -316,7 +316,7 @@ class resolver : public io_object
316316
BOOST_COROSIO_DECL
317317
void cancel();
318318

319-
private:
319+
public:
320320
struct resolver_impl : io_object_impl
321321
{
322322
virtual void resolve(
@@ -330,6 +330,7 @@ class resolver : public io_object
330330
resolver_results*) = 0;
331331
};
332332

333+
private:
333334
inline resolver_impl& get() const noexcept
334335
{
335336
return *static_cast<resolver_impl*>(impl_);

src/src/detail/posix_scheduler.cpp

Lines changed: 329 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,329 @@
1+
//
2+
// Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
3+
//
4+
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5+
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6+
//
7+
// Official repository: https://github.com/cppalliance/corosio
8+
//
9+
10+
#ifndef _WIN32
11+
12+
#include "src/detail/posix_scheduler.hpp"
13+
14+
#include <boost/capy/core/thread_local_ptr.hpp>
15+
16+
#include <limits>
17+
18+
namespace boost {
19+
namespace corosio {
20+
namespace detail {
21+
22+
namespace {
23+
24+
struct scheduler_context
25+
{
26+
posix_scheduler const* key;
27+
scheduler_context* next;
28+
};
29+
30+
capy::thread_local_ptr<scheduler_context> context_stack;
31+
32+
struct thread_context_guard
33+
{
34+
scheduler_context frame_;
35+
36+
explicit thread_context_guard(
37+
posix_scheduler const* ctx) noexcept
38+
: frame_{ctx, context_stack.get()}
39+
{
40+
context_stack.set(&frame_);
41+
}
42+
43+
~thread_context_guard() noexcept
44+
{
45+
context_stack.set(frame_.next);
46+
}
47+
};
48+
49+
} // namespace
50+
51+
posix_scheduler::
52+
posix_scheduler(
53+
capy::execution_context&,
54+
int)
55+
: outstanding_work_(0)
56+
, stopped_(false)
57+
, shutdown_(false)
58+
{
59+
}
60+
61+
posix_scheduler::
62+
~posix_scheduler()
63+
{
64+
}
65+
66+
void
67+
posix_scheduler::
68+
shutdown()
69+
{
70+
std::unique_lock lock(mutex_);
71+
shutdown_ = true;
72+
73+
// Drain all outstanding operations without invoking handlers
74+
while (outstanding_work_.load(std::memory_order_acquire) > 0)
75+
{
76+
while (auto* h = completed_ops_.pop())
77+
{
78+
outstanding_work_.fetch_sub(1, std::memory_order_relaxed);
79+
lock.unlock();
80+
h->destroy();
81+
lock.lock();
82+
}
83+
84+
// If work count still positive but queue empty,
85+
// wait briefly for more completions
86+
if (outstanding_work_.load(std::memory_order_acquire) > 0 &&
87+
completed_ops_.empty())
88+
{
89+
lock.unlock();
90+
std::this_thread::yield();
91+
lock.lock();
92+
}
93+
}
94+
}
95+
96+
void
97+
posix_scheduler::
98+
post(capy::any_coro h) const
99+
{
100+
struct post_handler
101+
: capy::execution_context::handler
102+
{
103+
capy::any_coro h_;
104+
105+
explicit
106+
post_handler(capy::any_coro h)
107+
: h_(h)
108+
{
109+
}
110+
111+
void operator()() override
112+
{
113+
auto h = h_;
114+
delete this;
115+
h.resume();
116+
}
117+
118+
void destroy() override
119+
{
120+
delete this;
121+
}
122+
};
123+
124+
auto* ph = new post_handler(h);
125+
outstanding_work_.fetch_add(1, std::memory_order_relaxed);
126+
127+
{
128+
std::lock_guard lock(mutex_);
129+
completed_ops_.push(ph);
130+
}
131+
wakeup_.notify_one();
132+
}
133+
134+
void
135+
posix_scheduler::
136+
post(capy::execution_context::handler* h) const
137+
{
138+
outstanding_work_.fetch_add(1, std::memory_order_relaxed);
139+
140+
{
141+
std::lock_guard lock(mutex_);
142+
completed_ops_.push(h);
143+
}
144+
wakeup_.notify_one();
145+
}
146+
147+
void
148+
posix_scheduler::
149+
on_work_started() noexcept
150+
{
151+
outstanding_work_.fetch_add(1, std::memory_order_relaxed);
152+
}
153+
154+
void
155+
posix_scheduler::
156+
on_work_finished() noexcept
157+
{
158+
if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
159+
stop();
160+
}
161+
162+
bool
163+
posix_scheduler::
164+
running_in_this_thread() const noexcept
165+
{
166+
for (auto* c = context_stack.get(); c != nullptr; c = c->next)
167+
if (c->key == this)
168+
return true;
169+
return false;
170+
}
171+
172+
void
173+
posix_scheduler::
174+
stop()
175+
{
176+
bool expected = false;
177+
if (stopped_.compare_exchange_strong(expected, true,
178+
std::memory_order_release, std::memory_order_relaxed))
179+
{
180+
std::lock_guard lock(mutex_);
181+
wakeup_.notify_all();
182+
}
183+
}
184+
185+
bool
186+
posix_scheduler::
187+
stopped() const noexcept
188+
{
189+
return stopped_.load(std::memory_order_acquire);
190+
}
191+
192+
void
193+
posix_scheduler::
194+
restart()
195+
{
196+
stopped_.store(false, std::memory_order_release);
197+
}
198+
199+
std::size_t
200+
posix_scheduler::
201+
run()
202+
{
203+
if (outstanding_work_.load(std::memory_order_acquire) == 0)
204+
{
205+
stop();
206+
return 0;
207+
}
208+
209+
thread_context_guard ctx(this);
210+
211+
std::size_t n = 0;
212+
while (do_one(-1))
213+
if (n != (std::numeric_limits<std::size_t>::max)())
214+
++n;
215+
return n;
216+
}
217+
218+
std::size_t
219+
posix_scheduler::
220+
run_one()
221+
{
222+
if (outstanding_work_.load(std::memory_order_acquire) == 0)
223+
{
224+
stop();
225+
return 0;
226+
}
227+
228+
thread_context_guard ctx(this);
229+
return do_one(-1);
230+
}
231+
232+
std::size_t
233+
posix_scheduler::
234+
wait_one(long usec)
235+
{
236+
if (outstanding_work_.load(std::memory_order_acquire) == 0)
237+
{
238+
stop();
239+
return 0;
240+
}
241+
242+
thread_context_guard ctx(this);
243+
return do_one(usec);
244+
}
245+
246+
std::size_t
247+
posix_scheduler::
248+
poll()
249+
{
250+
if (outstanding_work_.load(std::memory_order_acquire) == 0)
251+
{
252+
stop();
253+
return 0;
254+
}
255+
256+
thread_context_guard ctx(this);
257+
258+
std::size_t n = 0;
259+
while (do_one(0))
260+
if (n != (std::numeric_limits<std::size_t>::max)())
261+
++n;
262+
return n;
263+
}
264+
265+
std::size_t
266+
posix_scheduler::
267+
poll_one()
268+
{
269+
if (outstanding_work_.load(std::memory_order_acquire) == 0)
270+
{
271+
stop();
272+
return 0;
273+
}
274+
275+
thread_context_guard ctx(this);
276+
return do_one(0);
277+
}
278+
279+
// RAII guard - work_finished called even if handler throws
280+
struct work_guard
281+
{
282+
posix_scheduler* self;
283+
~work_guard() { self->on_work_finished(); }
284+
};
285+
286+
std::size_t
287+
posix_scheduler::
288+
do_one(long timeout_us)
289+
{
290+
std::unique_lock lock(mutex_);
291+
292+
// Check for available work or wait
293+
if (timeout_us < 0)
294+
{
295+
// Infinite wait
296+
wakeup_.wait(lock, [this] {
297+
return stopped_.load(std::memory_order_acquire) ||
298+
!completed_ops_.empty();
299+
});
300+
}
301+
else if (timeout_us > 0)
302+
{
303+
// Timed wait
304+
wakeup_.wait_for(lock, std::chrono::microseconds(timeout_us), [this] {
305+
return stopped_.load(std::memory_order_acquire) ||
306+
!completed_ops_.empty();
307+
});
308+
}
309+
// timeout_us == 0: poll, no wait
310+
311+
if (stopped_.load(std::memory_order_acquire))
312+
return 0;
313+
314+
auto* h = completed_ops_.pop();
315+
if (!h)
316+
return 0;
317+
318+
lock.unlock();
319+
320+
work_guard g{this};
321+
(*h)();
322+
return 1;
323+
}
324+
325+
} // namespace detail
326+
} // namespace corosio
327+
} // namespace boost
328+
329+
#endif

0 commit comments

Comments
 (0)