Commit 0303f66

refactor: Implement cancellation for Delay() timers. (#524)
This implements CancellationSource / CancellationToken, based on the similar concepts in C++20 and other languages. It also updates the spots where we race timers to cancel any timers that aren't finished, so that they don't pile up internally.

This is mostly Claude-generated code, but I have read through it and verified it is correct.

There are alternatives to this model for simpler cases, but I like that this implementation means we could eventually use it for other cases (like cancelling http requests), and since all the things racing share a cancellation source, they can all be cancelled together.

This PR does not implement cancellation for HTTP requests yet, since that is much more complicated, and because they are less likely to pile up during normal operation.

<!-- CURSOR_SUMMARY -->
---

> [!NOTE]
> **Medium Risk**
> Introduces new cross-thread cancellation primitives and changes timer scheduling/cancellation behavior, which can affect concurrency, ordering, and potential deadlocks/leaks if incorrect. Scope is limited to async utilities and FDv2 polling timeouts/delays, with added tests to validate the new behavior.
>
> **Overview**
> Adds a new `CancellationSource`/`CancellationToken`/`CancellationCallback` primitive (C++20 `stop_*`-style) to support coordinated cancellation with RAII callback registration and destructor synchronization.
>
> Extends `async::Delay` to accept an optional `CancellationToken` and cancels the underlying ASIO timer when triggered, while also hardening `Continuation` construction to avoid self-recursion. Updates FDv2 polling (`polling_synchronizer`) to create a shared cancellation source for each `WhenAny` race and cancel the non-winning delay/timeout timers to prevent timer buildup, and adds comprehensive unit tests for cancellation and timer interaction.
>
> <sup>Reviewed by [Cursor Bugbot](https://cursor.com/bugbot) for commit a187254. Bugbot is set up for automated code reviews on this repo. Configure [here](https://www.cursor.com/dashboard/bugbot).</sup>
<!-- /CURSOR_SUMMARY -->
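The remark that everything racing shares one cancellation source, and can therefore be cancelled together, can be sketched in a few lines. This is not the code from this PR: `SketchState`, `CancelLosers`, and `LateRegistrationRunsImmediately` are hypothetical names, `std::function` stands in for the project's `Continuation`, and deregistration is left out entirely.

```cpp
#include <functional>
#include <memory>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical, simplified stand-in for the shared cancellation state.
// Unlike the real CancellationState, it has no deregistration support.
class SketchState {
public:
    // Stores cb, or runs it immediately (outside the lock) if Cancel()
    // has already happened.
    void Register(std::function<void()> cb) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            if (!cancelled_) {
                callbacks_.push_back(std::move(cb));
                return;
            }
        }
        cb();
    }

    // Runs every stored callback once, in registration order. Later calls
    // are no-ops.
    void Cancel() {
        std::vector<std::function<void()>> pending;
        {
            std::lock_guard<std::mutex> lock(mutex_);
            if (cancelled_) {
                return;
            }
            cancelled_ = true;
            pending.swap(callbacks_);
        }
        for (auto& cb : pending) {
            cb();
        }
    }

private:
    std::mutex mutex_;
    bool cancelled_ = false;
    std::vector<std::function<void()>> callbacks_;
};

// Simulates `count` racing operations that all registered against the same
// state, then cancels them together. Returns how many were notified.
int CancelLosers(int count) {
    auto state = std::make_shared<SketchState>();
    int notified = 0;
    for (int i = 0; i < count; ++i) {
        state->Register([&notified] { ++notified; });
    }
    state->Cancel();
    state->Cancel();  // Second call does nothing.
    return notified;
}

// Registering after cancellation runs the callback right away.
bool LateRegistrationRunsImmediately() {
    SketchState state;
    state.Cancel();
    bool ran = false;
    state.Register([&ran] { ran = true; });
    return ran;
}
```

One `Cancel()` call reaches every registered operation, which is the property the description relies on when it cancels the non-winning timers of a race.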
1 parent 1dd9793 commit 0303f66

6 files changed

Lines changed: 560 additions & 19 deletions

Lines changed: 242 additions & 0 deletions
@@ -0,0 +1,242 @@
+#pragma once
+
+#include <launchdarkly/async/promise.hpp>
+
+#include <condition_variable>
+#include <map>
+#include <memory>
+#include <mutex>
+#include <thread>
+
+// This file implements a cancellation primitive modelled after C++20's
+// std::stop_source / std::stop_token / std::stop_callback design: a source
+// owns the ability to trigger cancellation, lightweight tokens (derived from
+// the source) can be freely passed around, and CancellationCallback provides
+// RAII registration of a callback tied to a token.
+
+namespace launchdarkly::async {
+
+// CancellationState is the shared state between a CancellationSource and all
+// CancellationTokens and CancellationCallbacks derived from it. This is an
+// internal class; use CancellationSource, CancellationToken, and
+// CancellationCallback instead.
+class CancellationState {
+   public:
+    using CallbackId = std::size_t;
+
+    // Sentinel returned by Register() when the state was already cancelled;
+    // the callback is invoked immediately in that case. Deregister() is a
+    // no-op for this value.
+    static constexpr CallbackId kAlreadyCancelled = 0;
+
+    CancellationState() = default;
+    ~CancellationState() = default;
+    CancellationState(CancellationState const&) = delete;
+    CancellationState& operator=(CancellationState const&) = delete;
+    CancellationState(CancellationState&&) = delete;
+    CancellationState& operator=(CancellationState&&) = delete;
+
+    // Registers a callback and returns its ID. If Cancel() has already been
+    // called, invokes cb immediately (outside the lock) and returns
+    // kAlreadyCancelled.
+    CallbackId Register(Continuation<void()> cb) {
+        std::unique_lock lock(mutex_);
+        if (cancelled_) {
+            lock.unlock();
+            cb();
+            return kAlreadyCancelled;
+        }
+        CallbackId id = next_id_++;
+        callbacks_.emplace(id, std::move(cb));
+        return id;
+    }
+
+    // Deregisters the callback with the given ID. If the callback is currently
+    // executing on another thread, blocks until execution completes. This
+    // mirrors the synchronization guarantee of C++20's stop_callback
+    // destructor: after Deregister returns, the callback is guaranteed to have
+    // either never run or fully completed. No-op if id is kAlreadyCancelled or
+    // the callback has already run.
+    void Deregister(CallbackId id) {
+        if (id == kAlreadyCancelled) {
+            return;
+        }
+
+        std::unique_lock lock(mutex_);
+
+        // The callback is still pending: remove it before it can run.
+        if (callbacks_.erase(id)) {
+            return;
+        }
+
+        // The callback has already run, or was never registered.
+        if (executing_id_ != id) {
+            return;
+        }
+
+        // The callback is executing on this thread (re-entrant call from
+        // within the callback itself): return without waiting to avoid
+        // deadlock.
+        if (executing_thread_ == std::this_thread::get_id()) {
+            return;
+        }
+
+        // The callback is executing on another thread. Wait for it to
+        // finish. executing_id_ is set while the state lock is held — before
+        // unlocking for invocation — so there is no window where the callback
+        // is running but executing_id_ is not yet set.
+        executing_done_.wait(lock, [this, id] { return executing_id_ != id; });
+    }
+
+    // Invokes all registered callbacks in registration order, then clears the
+    // pending list. Callbacks are executed one at a time with the lock
+    // released during each invocation to prevent deadlocks. No-op if called
+    // more than once.
+    void Cancel() {
+        std::unique_lock lock(mutex_);
+        if (cancelled_) {
+            return;
+        }
+        cancelled_ = true;
+
+        while (!callbacks_.empty()) {
+            // Extract the next entry while still holding the lock, then set
+            // executing_id_ before releasing. This ensures Deregister can
+            // never observe a window where the callback is running but
+            // executing_id_ is unset.
+            auto node = callbacks_.extract(callbacks_.begin());
+            executing_id_ = node.key();
+            executing_thread_ = std::this_thread::get_id();
+
+            lock.unlock();
+            node.mapped()();
+            lock.lock();
+
+            executing_id_ = kAlreadyCancelled;
+            lock.unlock();
+            executing_done_.notify_all();
+            lock.lock();
+        }
+    }
+
+    // Returns true if Cancel() has been called.
+    bool IsCancelled() const {
+        std::lock_guard lock(mutex_);
+        return cancelled_;
+    }
+
+   private:
+    mutable std::mutex mutex_;
+    bool cancelled_ = false;
+    CallbackId next_id_ = 1;  // Real IDs start at 1; 0 is kAlreadyCancelled.
+    std::map<CallbackId, Continuation<void()>> callbacks_;
+
+    // Tracks which callback (if any) is currently being invoked by Cancel(),
+    // and on which thread, to support the blocking destructor in Deregister.
+    CallbackId executing_id_ = kAlreadyCancelled;
+    std::thread::id executing_thread_;
+    std::condition_variable executing_done_;
+};
+
+class CancellationToken;
+
+// CancellationSource is the write end of a cancellation pair: call Cancel()
+// to signal all operations holding tokens derived from this source.
+//
+// CancellationSource is copyable; copies share the same underlying
+// CancellationState, matching the behaviour of C++20's stop_source.
+class CancellationSource {
+   public:
+    CancellationSource() : state_(std::make_shared<CancellationState>()) {}
+
+    ~CancellationSource() = default;
+    CancellationSource(CancellationSource const&) = default;
+    CancellationSource& operator=(CancellationSource const&) = default;
+    CancellationSource(CancellationSource&&) = default;
+    CancellationSource& operator=(CancellationSource&&) = default;
+
+    // Invokes all registered callbacks in registration order. No-op if called
+    // more than once.
+    void Cancel() { state_->Cancel(); }
+
+    // Returns true if Cancel() has been called.
+    bool IsCancelled() const { return state_->IsCancelled(); }
+
+    // Returns a token referring to this source's cancellation state. The
+    // token may be freely copied and passed to any number of
+    // CancellationCallbacks.
+    CancellationToken GetToken() const;
+
+   private:
+    std::shared_ptr<CancellationState> state_;
+};
+
+// CancellationToken is the read end of a cancellation pair. Tokens are
+// obtained from CancellationSource::GetToken() and passed to
+// CancellationCallback constructors to register callbacks.
+//
+// A default-constructed token has no associated state: any
+// CancellationCallback constructed from it is never invoked.
+//
+// CancellationToken is cheap to copy; all copies share the same underlying
+// CancellationState.
+class CancellationToken {
+   public:
+    CancellationToken() = default;
+
+    explicit CancellationToken(std::shared_ptr<CancellationState> state)
+        : state_(std::move(state)) {}
+
+    // Returns true if the associated source has been cancelled, or false if
+    // there is no associated source.
+    bool IsCancelled() const { return state_ && state_->IsCancelled(); }
+
+   private:
+    std::shared_ptr<CancellationState> state_;
+
+    friend class CancellationCallback;
+};
+
+inline CancellationToken CancellationSource::GetToken() const {
+    return CancellationToken(state_);
+}
+
+// CancellationCallback registers a callback to be invoked when the associated
+// CancellationSource is cancelled. The callback is invoked on whichever thread
+// calls CancellationSource::Cancel().
+//
+// The design follows C++20's std::stop_callback:
+//
+// - Constructing a CancellationCallback registers the callback. If the
+//   source was already cancelled, the callback is invoked immediately in the
+//   constructor.
+// - Destroying a CancellationCallback deregisters the callback. If the
+//   callback is currently executing on another thread, the destructor blocks
+//   until execution completes, preventing use-after-free of anything captured
+//   by the callback.
+// - CancellationCallback is non-copyable and non-movable, matching C++20's
+//   stop_callback.
+class CancellationCallback {
+   public:
+    CancellationCallback(CancellationToken token, Continuation<void()> cb)
+        : state_(token.state_),
+          id_(state_ ? state_->Register(std::move(cb))
+                     : CancellationState::kAlreadyCancelled) {}
+
+    ~CancellationCallback() {
+        if (state_) {
+            state_->Deregister(id_);
+        }
+    }
+
+    CancellationCallback(CancellationCallback const&) = delete;
+    CancellationCallback& operator=(CancellationCallback const&) = delete;
+    CancellationCallback(CancellationCallback&&) = delete;
+    CancellationCallback& operator=(CancellationCallback&&) = delete;
+
+   private:
+    std::shared_ptr<CancellationState> state_;
+    CancellationState::CallbackId id_;
+};
+
+}  // namespace launchdarkly::async
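`Cancel()` drains the pending callbacks by repeatedly calling `extract(begin())` on a `std::map` keyed by a monotonically increasing ID, which is what yields registration order. A minimal standalone sketch of that idiom follows; `DrainInIdOrder` is an illustrative name and `std::string` is a stand-in for the stored callbacks.

```cpp
#include <cstddef>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Removes entries one at a time from the front of the map. Because std::map
// iterates in ascending key order, entries come out in ID order, and
// extract() hands over ownership of each element without copying it.
std::vector<std::string> DrainInIdOrder(
    std::map<std::size_t, std::string> pending) {
    std::vector<std::string> order;
    while (!pending.empty()) {
        auto node = pending.extract(pending.begin());
        order.push_back(std::move(node.mapped()));
    }
    return order;
}
```

Extracting a node rather than copying the mapped value also works for move-only mapped types, which matters when the stored callable need not be copy-constructible.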

libs/internal/include/launchdarkly/async/promise.hpp

Lines changed: 3 additions & 1 deletion
@@ -54,7 +54,9 @@ class Continuation<R(Args...)> {
     // function pointer, or other callable; it need not be copy-constructible.
     // F&& is a forwarding reference: accepts any callable by move or copy,
     // then moves it into Impl<F> so Continuation itself owns the callable.
-    template <typename F>
+    template <typename F,
+              typename = std::enable_if_t<
+                  !std::is_same_v<std::decay_t<F>, Continuation>>>
     Continuation(F&& f)
         : impl_(std::make_unique<Impl<std::decay_t<F>>>(std::forward<F>(f))) {}
     Continuation(Continuation&&) = default;
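The commit summary describes this hunk as hardening `Continuation` construction to avoid self-recursion. The underlying C++ behavior is that an unconstrained forwarding constructor is a better match than the copy or move constructor when the argument is a non-const lvalue of the same type, so the wrapper would try to wrap itself. The sketch below shows the same constraint on a cut-down type; `Callable` and `MoveAndCall` are hypothetical and are not the project's `Continuation`.

```cpp
#include <memory>
#include <type_traits>
#include <utility>

// Hypothetical, cut-down analogue of a type-erased int() callable.
class Callable {
    struct Base {
        virtual ~Base() = default;
        virtual int Call() = 0;
    };

    template <typename F>
    struct Impl : Base {
        explicit Impl(F fn) : f(std::move(fn)) {}
        int Call() override { return f(); }
        F f;
    };

public:
    // The enable_if removes this overload when F decays to Callable itself,
    // so Callable& and Callable&& arguments are left to the special members.
    template <typename F,
              typename = std::enable_if_t<
                  !std::is_same_v<std::decay_t<F>, Callable>>>
    Callable(F&& f)
        : impl_(std::make_unique<Impl<std::decay_t<F>>>(std::forward<F>(f))) {}

    Callable(Callable&&) = default;

    int operator()() { return impl_->Call(); }

private:
    std::unique_ptr<Base> impl_;
};

// A non-const lvalue no longer selects the forwarding constructor, and the
// copy constructor is implicitly deleted, so this construction is rejected
// at compile time instead of recursing.
static_assert(!std::is_constructible_v<Callable, Callable&>);
// Moving still goes through the defaulted move constructor.
static_assert(std::is_constructible_v<Callable, Callable&&>);

// Wraps a lambda, moves the wrapper, and invokes the moved-to object.
int MoveAndCall() {
    Callable original([] { return 42; });
    Callable moved(std::move(original));
    return moved();
}
```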

libs/internal/include/launchdarkly/async/timer.hpp

Lines changed: 43 additions & 3 deletions
@@ -1,9 +1,11 @@
 #pragma once

+#include <launchdarkly/async/cancellation.hpp>
 #include <launchdarkly/async/promise.hpp>

 #include <boost/asio/any_io_executor.hpp>
 #include <boost/asio/error.hpp>
+#include <boost/asio/post.hpp>
 #include <boost/asio/steady_timer.hpp>
 #include <boost/system/error_code.hpp>

@@ -15,17 +17,55 @@ namespace launchdarkly::async {
 // Returns a Future<bool> that resolves once the given duration elapses.
 // The future resolves with true if the timer fired normally, or false if
 // the timer was cancelled before it expired.
+//
+// If a CancellationToken is provided, cancelling the associated
+// CancellationSource cancels the timer, resolving the future with false.
 template <typename Rep, typename Period>
 Future<bool> Delay(boost::asio::any_io_executor executor,
-                   std::chrono::duration<Rep, Period> duration) {
+                   std::chrono::duration<Rep, Period> duration,
+                   CancellationToken token = {}) {
     auto timer = std::make_shared<boost::asio::steady_timer>(executor);
     timer->expires_after(duration);
     Promise<bool> promise;
     auto future = promise.GetFuture();
-    timer->async_wait([p = std::move(promise),
-                       timer](boost::system::error_code code) mutable {
+
+    // This code is tricky because there are a few constraints that conflict.
+    // 1. We need to make sure timer->cancel isn't called _before_
+    //    timer->async_wait, or else it'll just be ignored.
+    // 2. The cancellation_callback has to be created _before_
+    //    timer->async_wait, because it has to be captured by async_wait's
+    //    handler, because it is an RAII type, and once it is destroyed, it
+    //    deregisters itself. It has to stay alive as long as the timer needs to
+    //    be cancellable.
+
+    Promise<std::monostate> timer_started_promise;
+    Future<std::monostate> timer_started_future =
+        timer_started_promise.GetFuture();
+
+    auto cancel_timer = [timer, executor,
+                         timer_started_future =
+                             std::move(timer_started_future)]() mutable {
+        timer_started_future.Then(
+            [timer](auto const&) -> std::monostate {
+                timer->cancel();
+                return {};
+            },
+            [executor](Continuation<void()> f) {
+                boost::asio::post(executor, std::move(f));
+            });
+    };
+
+    auto cancellation_callback =
+        std::make_shared<CancellationCallback>(std::move(token), cancel_timer);
+
+    timer->async_wait([p = std::move(promise), timer, cancellation_callback](
+                          boost::system::error_code code) mutable {
+        cancellation_callback.reset();
         p.Resolve(code != boost::asio::error::operation_aborted);
     });
+
+    timer_started_promise.Resolve({});
+
     return future;
 }

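The ordering problem described in the comment inside `Delay` (a cancel that arrives before `async_wait` is ignored, yet the callback must be registered before `async_wait`) is resolved by deferring the cancel until the wait has been armed. The single-threaded sketch below models only that idea. `FakeTimer` and `StartGate` are hypothetical stand-ins, not Boost.Asio or project types; the real code uses a `Promise`/`Future` pair resolved after `async_wait` and posts the cancel onto the executor.

```cpp
#include <functional>
#include <utility>
#include <vector>

// Hypothetical stand-in for a timer whose Cancel() only affects a wait that
// is already pending, mirroring the constraint named in the comment above.
class FakeTimer {
public:
    void AsyncWait(std::function<void(bool)> handler) {
        handler_ = std::move(handler);
    }

    // Completes the pending wait with fired == false. Returns the number of
    // waits cancelled (0 if none was pending).
    int Cancel() {
        if (!handler_) {
            return 0;
        }
        auto handler = std::move(handler_);
        handler_ = nullptr;
        handler(false);
        return 1;
    }

private:
    std::function<void(bool)> handler_;
};

// One-shot gate: actions submitted before Open() are held and run by Open();
// actions submitted afterwards run immediately.
class StartGate {
public:
    void RunWhenOpen(std::function<void()> action) {
        if (open_) {
            action();
        } else {
            deferred_.push_back(std::move(action));
        }
    }

    void Open() {
        open_ = true;
        auto pending = std::move(deferred_);
        deferred_.clear();
        for (auto& action : pending) {
            action();
        }
    }

private:
    bool open_ = false;
    std::vector<std::function<void()>> deferred_;
};

// Cancellation is requested before the wait is armed, but the gate delays it
// until after AsyncWait, so the wait is still cancelled.
bool EarlyCancelStillLands() {
    FakeTimer timer;
    StartGate started;
    bool cancelled = false;
    started.RunWhenOpen([&] { timer.Cancel(); });
    timer.AsyncWait([&](bool fired) { cancelled = !fired; });
    started.Open();
    return cancelled;
}

// Without the gate, the early Cancel() finds nothing to cancel and the wait
// that is armed afterwards stays pending.
bool UngatedEarlyCancelIsLost() {
    FakeTimer timer;
    bool cancelled = false;
    int hit = timer.Cancel();
    timer.AsyncWait([&](bool fired) { cancelled = !fired; });
    return hit == 0 && !cancelled;
}
```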