Skip to content

Commit a9e7699

Browse files
committed
Add timeout combinator, delay awaitable, and timer_service (#63)
Introduce a first-class timeout(awaitable, duration) function that races an IoAwaitable against a deadline, built on when_any + delay. Components: - timer_service: execution_context::service with a background jthread managing a min-heap of deadlines. Lazy-created via use_service. cancel() blocks until any in-progress callback completes. - delay: standalone IoAwaitable that suspends for a duration using timer_service. Follows the claimed_/stop_callback pattern from async_event for safe cancellation. Destructor cleans up both the stop callback and timer to prevent use-after-free if a coroutine is destroyed while suspended. - timeout: coroutine wrapper that returns the task result on success, or produces error::timeout on deadline expiry. For io_result types, the error code is set directly; for other types, a system_error is thrown. - error::timeout and cond::timeout added to error/condition enums.
1 parent ee8c629 commit a9e7699

File tree

13 files changed

+1483
-2
lines changed

13 files changed

+1483
-2
lines changed

include/boost/capy.hpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,10 @@
2525
#include <boost/capy/task.hpp>
2626

2727
// Algorithms
28+
#include <boost/capy/delay.hpp>
2829
#include <boost/capy/read.hpp>
2930
#include <boost/capy/read_until.hpp>
31+
#include <boost/capy/timeout.hpp>
3032
#include <boost/capy/when_all.hpp>
3133
#include <boost/capy/when_any.hpp>
3234
#include <boost/capy/write.hpp>

include/boost/capy/cond.hpp

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,14 @@ enum class cond
6969
An `error_code` compares equal to `not_found` when a
7070
lookup operation failed to find the requested item.
7171
*/
72-
not_found = 4
72+
not_found = 4,
73+
74+
/** Operation timed out condition.
75+
76+
An `error_code` compares equal to `timeout` when an
77+
operation exceeded its allowed duration.
78+
*/
79+
timeout = 5
7380
};
7481

7582
} // capy

include/boost/capy/delay.hpp

Lines changed: 222 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,222 @@
1+
//
2+
// Copyright (c) 2026 Michael Vandeberg
3+
//
4+
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5+
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6+
//
7+
// Official repository: https://github.com/cppalliance/capy
8+
//
9+
10+
#ifndef BOOST_CAPY_DELAY_HPP
11+
#define BOOST_CAPY_DELAY_HPP
12+
13+
#include <boost/capy/detail/config.hpp>
14+
#include <boost/capy/ex/executor_ref.hpp>
15+
#include <boost/capy/ex/io_env.hpp>
16+
#include <boost/capy/ex/detail/timer_service.hpp>
17+
18+
#include <atomic>
19+
#include <chrono>
20+
#include <coroutine>
21+
#include <new>
22+
#include <stop_token>
23+
#include <utility>
24+
25+
namespace boost {
26+
namespace capy {
27+
28+
/** IoAwaitable returned by @ref delay.
29+
30+
Suspends the calling coroutine until the deadline elapses
31+
or the environment's stop token is activated, whichever
32+
comes first. Resumption is always posted through the
33+
executor, never inline on the timer thread.
34+
35+
Not intended to be named directly; use the @ref delay
36+
factory function instead.
37+
38+
@par Cancellation
39+
40+
If `stop_requested()` is true before suspension, the
41+
coroutine resumes immediately without scheduling a timer.
42+
If stop is requested while suspended, the stop callback
43+
claims the resume and posts it through the executor; the
44+
pending timer is cancelled on the next `await_resume` or
45+
destructor call.
46+
47+
@par Thread Safety
48+
49+
A single `delay_awaitable` must not be awaited concurrently.
50+
Multiple independent `delay()` calls on the same
51+
execution_context are safe and share one timer thread.
52+
53+
@see delay, timeout
54+
*/
55+
class delay_awaitable
56+
{
57+
std::chrono::nanoseconds dur_;
58+
59+
detail::timer_service* ts_ = nullptr;
60+
detail::timer_service::timer_id tid_ = 0;
61+
62+
// Declared before stop_cb_buf_: the callback
63+
// accesses these members, so they must still be
64+
// alive if the stop_cb_ destructor blocks.
65+
std::atomic<bool> claimed_{false};
66+
bool canceled_ = false;
67+
bool stop_cb_active_ = false;
68+
69+
struct cancel_fn
70+
{
71+
delay_awaitable* self_;
72+
executor_ref ex_;
73+
std::coroutine_handle<> h_;
74+
75+
void operator()() const noexcept
76+
{
77+
if(!self_->claimed_.exchange(
78+
true, std::memory_order_acq_rel))
79+
{
80+
self_->canceled_ = true;
81+
ex_.post(h_);
82+
}
83+
}
84+
};
85+
86+
using stop_cb_t = std::stop_callback<cancel_fn>;
87+
88+
// Aligned storage for the stop callback.
89+
// Declared last: its destructor may block while
90+
// the callback accesses the members above.
91+
#ifdef _MSC_VER
92+
# pragma warning(push)
93+
# pragma warning(disable: 4324)
94+
#endif
95+
alignas(stop_cb_t)
96+
unsigned char stop_cb_buf_[sizeof(stop_cb_t)];
97+
#ifdef _MSC_VER
98+
# pragma warning(pop)
99+
#endif
100+
101+
stop_cb_t& stop_cb_() noexcept
102+
{
103+
return *reinterpret_cast<stop_cb_t*>(stop_cb_buf_);
104+
}
105+
106+
public:
107+
explicit delay_awaitable(std::chrono::nanoseconds dur) noexcept
108+
: dur_(dur)
109+
{
110+
}
111+
112+
/// @pre The stop callback must not be active
113+
/// (i.e. the object has not yet been awaited).
114+
delay_awaitable(delay_awaitable&& o) noexcept
115+
: dur_(o.dur_)
116+
, ts_(o.ts_)
117+
, tid_(o.tid_)
118+
, claimed_(o.claimed_.load(std::memory_order_relaxed))
119+
, canceled_(o.canceled_)
120+
, stop_cb_active_(std::exchange(o.stop_cb_active_, false))
121+
{
122+
}
123+
124+
~delay_awaitable()
125+
{
126+
if(stop_cb_active_)
127+
stop_cb_().~stop_cb_t();
128+
if(ts_)
129+
ts_->cancel(tid_);
130+
}
131+
132+
delay_awaitable(delay_awaitable const&) = delete;
133+
delay_awaitable& operator=(delay_awaitable const&) = delete;
134+
delay_awaitable& operator=(delay_awaitable&&) = delete;
135+
136+
bool await_ready() const noexcept
137+
{
138+
return dur_.count() <= 0;
139+
}
140+
141+
std::coroutine_handle<>
142+
await_suspend(
143+
std::coroutine_handle<> h,
144+
io_env const* env) noexcept
145+
{
146+
// Already stopped: resume immediately
147+
if(env->stop_token.stop_requested())
148+
{
149+
canceled_ = true;
150+
return h;
151+
}
152+
153+
ts_ = &env->executor.context().use_service<detail::timer_service>();
154+
155+
// Schedule timer (won't fire inline since deadline is in the future)
156+
tid_ = ts_->schedule_after(dur_,
157+
[this, h, ex = env->executor]()
158+
{
159+
if(!claimed_.exchange(
160+
true, std::memory_order_acq_rel))
161+
{
162+
ex.post(h);
163+
}
164+
});
165+
166+
// Register stop callback (may fire inline)
167+
::new(stop_cb_buf_) stop_cb_t(
168+
env->stop_token,
169+
cancel_fn{this, env->executor, h});
170+
stop_cb_active_ = true;
171+
172+
return std::noop_coroutine();
173+
}
174+
175+
void await_resume() noexcept
176+
{
177+
if(stop_cb_active_)
178+
{
179+
stop_cb_().~stop_cb_t();
180+
stop_cb_active_ = false;
181+
}
182+
if(ts_)
183+
ts_->cancel(tid_);
184+
}
185+
};
186+
187+
/** Suspend the current coroutine for a duration.
188+
189+
Returns an IoAwaitable that completes at or after the
190+
specified duration, or earlier if the environment's stop
191+
token is activated. Completion is always normal (void
192+
return); no exception is thrown on cancellation.
193+
194+
Zero or negative durations complete synchronously without
195+
scheduling a timer.
196+
197+
@par Example
198+
@code
199+
co_await delay(std::chrono::milliseconds(100));
200+
@endcode
201+
202+
@param dur The duration to wait.
203+
204+
@return A @ref delay_awaitable whose `await_resume`
205+
returns `void`.
206+
207+
@throws Nothing.
208+
209+
@see timeout, delay_awaitable
210+
*/
211+
template<typename Rep, typename Period>
212+
delay_awaitable
213+
delay(std::chrono::duration<Rep, Period> dur) noexcept
214+
{
215+
return delay_awaitable{
216+
std::chrono::duration_cast<std::chrono::nanoseconds>(dur)};
217+
}
218+
219+
} // capy
220+
} // boost
221+
222+
#endif

include/boost/capy/error.hpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,10 @@ enum class error
4343
stream_truncated,
4444

4545
/// Requested item was not found. Compare with `cond::not_found`.
46-
not_found
46+
not_found,
47+
48+
/// Operation timed out. Compare with `cond::timeout`.
49+
timeout
4750
};
4851

4952
} // capy
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
//
2+
// Copyright (c) 2026 Michael Vandeberg
3+
//
4+
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5+
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6+
//
7+
// Official repository: https://github.com/cppalliance/capy
8+
//
9+
10+
#ifndef BOOST_CAPY_EX_TIMER_SERVICE_HPP
11+
#define BOOST_CAPY_EX_TIMER_SERVICE_HPP
12+
13+
#include <boost/capy/detail/config.hpp>
14+
#include <boost/capy/ex/execution_context.hpp>
15+
16+
#include <chrono>
17+
#include <cstdint>
18+
#include <functional>
19+
#include <mutex>
20+
#include <condition_variable>
21+
#include <queue>
22+
#include <thread>
23+
#include <unordered_set>
24+
#include <vector>
25+
26+
namespace boost {
27+
namespace capy {
28+
namespace detail {
29+
30+
/* Shared timer thread for an execution_context.
31+
32+
One background std::jthread per execution_context. All timeouts
33+
scheduled through this context share the same thread, which sleeps
34+
on a condition variable until the next deadline.
35+
36+
The timer thread never touches coroutine frames or executors
37+
directly — callbacks are responsible for posting work through
38+
the appropriate executor.
39+
*/
40+
41+
class BOOST_CAPY_DECL
42+
timer_service
43+
: public execution_context::service
44+
{
45+
public:
46+
using timer_id = std::uint64_t;
47+
48+
explicit timer_service(execution_context& ctx);
49+
50+
/** Schedule a callback to fire after a duration.
51+
52+
The callback is invoked on the timer service's background
53+
thread. It must not block for extended periods.
54+
55+
@return An id that can be passed to cancel().
56+
*/
57+
template<typename Rep, typename Period>
58+
timer_id schedule_after(
59+
std::chrono::duration<Rep, Period> dur,
60+
std::function<void()> cb)
61+
{
62+
auto deadline = std::chrono::steady_clock::now() + dur;
63+
return schedule_at(deadline, std::move(cb));
64+
}
65+
66+
/** Cancel a pending timer.
67+
68+
After this function returns, the callback is guaranteed
69+
not to be running and will never be invoked. If the
70+
callback is currently executing on the timer thread,
71+
this call blocks until it completes.
72+
73+
Safe to call with any id, including ids that have
74+
already fired, been cancelled, or were never issued.
75+
*/
76+
void cancel(timer_id id);
77+
78+
protected:
79+
void shutdown() override;
80+
81+
private:
82+
struct entry
83+
{
84+
std::chrono::steady_clock::time_point deadline;
85+
timer_id id;
86+
std::function<void()> callback;
87+
88+
bool operator>(entry const& o) const noexcept
89+
{
90+
return deadline > o.deadline;
91+
}
92+
};
93+
94+
timer_id schedule_at(
95+
std::chrono::steady_clock::time_point deadline,
96+
std::function<void()> cb);
97+
98+
void run();
99+
100+
// warning C4251: std types need to have dll-interface
101+
#ifdef _MSC_VER
102+
# pragma warning(push)
103+
# pragma warning(disable: 4251)
104+
#endif
105+
std::mutex mutex_;
106+
std::condition_variable cv_;
107+
std::condition_variable cancel_cv_;
108+
std::priority_queue<
109+
entry,
110+
std::vector<entry>,
111+
std::greater<>> queue_;
112+
std::unordered_set<timer_id> active_ids_;
113+
timer_id next_id_ = 0;
114+
timer_id executing_id_ = 0;
115+
bool stopped_ = false;
116+
std::jthread thread_;
117+
#ifdef _MSC_VER
118+
# pragma warning(pop)
119+
#endif
120+
};
121+
122+
} // detail
123+
} // capy
124+
} // boost
125+
126+
#endif

0 commit comments

Comments
 (0)