Skip to content

Commit 5f8bf79

Browse files
authored
fix: stop accumulating WhenAny continuations on the Conditions aggregate (#562)
The FDv2 data system's `Conditions::GetFuture()` previously returned the same shared future to every iteration of `RunSynchronizerNext`, so each iteration's `WhenAny(cond_future, next_future).Then(...)` appended a callback to the shared future's `continuations_` vector. On a healthy primary streaming changesets without ever firing fallback/recovery, those continuations accumulated for the synchronizer's lifetime. - `Conditions::GetFuture(token)` now returns a fresh Future per call. - The caller cancels its `CancellationSource` after the WhenAny resolves. - The cancellation callback erases the per-call Promise from a pending list inside `Conditions`. - A single permanent listener on the underlying aggregate drains the pending list and resolves each Promise when any condition fires. Analogous to launchdarkly/java-core#163. <!-- CURSOR_SUMMARY --> --- > [!NOTE] > **Medium Risk** > Touches FDv2 orchestration and async cancellation ordering (locks around `CancellationCallback`); behavior change is localized but long-running streaming paths depend on correct cleanup. > > **Overview** > Fixes unbounded growth of `WhenAny` continuation callbacks during long-lived primary streaming, where each `RunSynchronizerNext` loop reused one shared `Conditions` future. > > **`Conditions::GetFuture`** now takes a `CancellationToken` and returns a **new** future per call. A single internal aggregate listener still watches the first condition to fire; when it resolves, it fans out to all pending per-call promises. **`Close`** also drains any still-pending waiters with `kCancelled`. > > Callers (notably **`FDv2DataSystem::RunSynchronizerNext`**) create a per-iteration `CancellationSource`, pass its token into `GetFuture`, and **cancel after `WhenAny` completes** so the pending entry and its promise/continuations can be dropped. Token cancellation removes the waiter from the pending list if the race finishes early. > > Tests updated to pass a `CancellationToken` into `GetFuture`. > > <sup>Reviewed by [Cursor Bugbot](https://cursor.com/bugbot) for commit af09001. Bugbot is set up for automated code reviews on this repo. Configure [here](https://www.cursor.com/dashboard/bugbot).</sup> <!-- /CURSOR_SUMMARY -->
1 parent 85afca7 commit 5f8bf79

4 files changed

Lines changed: 119 additions & 14 deletions

File tree

libs/server-sdk/src/data_systems/fdv2/conditions.cpp

Lines changed: 83 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
#include <launchdarkly/async/timer.hpp>
44

5+
#include <algorithm>
56
#include <utility>
67
#include <variant>
78

@@ -133,15 +134,81 @@ async::Future<IFDv2Condition::Type> MakeAggregateFuture(
133134
} // namespace
134135

135136
Conditions::Conditions(std::vector<std::unique_ptr<IFDv2Condition>> conditions)
136-
: conditions_(std::move(conditions)),
137-
future_(MakeAggregateFuture(conditions_)) {}
137+
: conditions_(std::move(conditions)), state_(std::make_shared<State>()) {
138+
MakeAggregateFuture(conditions_)
139+
.Then(
140+
[weak_state = std::weak_ptr<State>(state_)](
141+
IFDv2Condition::Type const& type) -> std::monostate {
142+
auto state = weak_state.lock();
143+
if (!state) {
144+
return {};
145+
}
146+
std::vector<PendingEntry> drained;
147+
{
148+
std::lock_guard lock(state->mutex);
149+
state->aggregate_result = type;
150+
drained = std::move(state->pending);
151+
}
152+
for (auto& entry : drained) {
153+
entry.promise.Resolve(type);
154+
}
155+
return {};
156+
},
157+
async::kInlineExecutor);
158+
}
138159

139160
Conditions::~Conditions() {
140161
Close();
141162
}
142163

143-
async::Future<IFDv2Condition::Type> Conditions::GetFuture() const {
144-
return future_;
164+
async::Future<IFDv2Condition::Type> Conditions::GetFuture(
165+
async::CancellationToken token) {
166+
async::Promise<IFDv2Condition::Type> promise;
167+
auto future = promise.GetFuture();
168+
std::int64_t id;
169+
170+
{
171+
std::lock_guard lock(state_->mutex);
172+
if (state_->aggregate_result) {
173+
promise.Resolve(*state_->aggregate_result);
174+
return future;
175+
}
176+
id = state_->next_id++;
177+
state_->pending.push_back({id, std::move(promise), nullptr});
178+
}
179+
180+
// Construct cb outside the lock: if the token is already cancelled, the
181+
// callback fires synchronously inside the ctor and needs to acquire
182+
// state_->mutex.
183+
auto cb = std::make_unique<async::CancellationCallback>(
184+
token, [weak_state = std::weak_ptr<State>(state_), id]() {
185+
auto state = weak_state.lock();
186+
if (!state) {
187+
return;
188+
}
189+
std::lock_guard lock(state->mutex);
190+
state->pending.erase(
191+
std::remove_if(
192+
state->pending.begin(), state->pending.end(),
193+
[id](PendingEntry const& e) { return e.id == id; }),
194+
state->pending.end());
195+
});
196+
197+
// Re-find the entry under the lock and attach cb. Between the push above
198+
// and here, the aggregate may have fired and drained the vector, or cb's
199+
// callback may have fired synchronously during construction and erased
200+
// the entry. If gone, drop cb.
201+
{
202+
std::lock_guard lock(state_->mutex);
203+
auto it =
204+
std::find_if(state_->pending.begin(), state_->pending.end(),
205+
[id](PendingEntry const& e) { return e.id == id; });
206+
if (it != state_->pending.end()) {
207+
it->cancel_cb = std::move(cb);
208+
}
209+
}
210+
211+
return future;
145212
}
146213

147214
void Conditions::Inform(FDv2SourceResult const& result) {
@@ -154,6 +221,18 @@ void Conditions::Close() {
154221
for (auto const& condition : conditions_) {
155222
condition->Close();
156223
}
224+
// Resolve any promises still pending.
225+
std::vector<PendingEntry> drained;
226+
{
227+
std::lock_guard lock(state_->mutex);
228+
if (!state_->aggregate_result) {
229+
state_->aggregate_result = IFDv2Condition::Type::kCancelled;
230+
}
231+
drained = std::move(state_->pending);
232+
}
233+
for (auto& entry : drained) {
234+
entry.promise.Resolve(IFDv2Condition::Type::kCancelled);
235+
}
157236
}
158237

159238
} // namespace launchdarkly::server_side::data_systems

libs/server-sdk/src/data_systems/fdv2/conditions.hpp

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,11 @@
88
#include <boost/asio/any_io_executor.hpp>
99

1010
#include <chrono>
11+
#include <cstdint>
1112
#include <memory>
1213
#include <mutex>
1314
#include <optional>
15+
#include <vector>
1416

1517
namespace launchdarkly::server_side::data_systems {
1618

@@ -136,7 +138,7 @@ class RecoveryConditionFactory final
136138
* Aggregates a set of conditions into a single Future that resolves with the
137139
* type of the first condition to fire. Inform() and Close() forward to every
138140
* underlying condition. If constructed with no conditions, GetFuture()
139-
* returns a Future that never resolves.
141+
* returns a Future that never resolves (until Close).
140142
*
141143
* Thread-safe: GetFuture, Inform, and Close may be called from any thread.
142144
*/
@@ -153,16 +155,35 @@ class Conditions final {
153155
Conditions& operator=(Conditions const&) = delete;
154156
Conditions& operator=(Conditions&&) = delete;
155157

158+
/**
159+
* Returns a fresh Future that resolves with the type of the first
160+
* condition to fire. The caller must cancel the source corresponding to
161+
* `token` once the result is no longer needed, so that the per-call
162+
* Promise (and its registered continuations) can be released.
163+
*/
156164
[[nodiscard]] async::Future<data_interfaces::IFDv2Condition::Type>
157-
GetFuture() const;
165+
GetFuture(async::CancellationToken token);
158166

159167
void Inform(data_interfaces::FDv2SourceResult const& result);
160168

161169
void Close();
162170

163171
private:
172+
struct PendingEntry {
173+
std::int64_t id;
174+
async::Promise<data_interfaces::IFDv2Condition::Type> promise;
175+
std::unique_ptr<async::CancellationCallback> cancel_cb;
176+
};
177+
178+
struct State {
179+
std::mutex mutex;
180+
std::int64_t next_id = 0;
181+
std::vector<PendingEntry> pending;
182+
std::optional<data_interfaces::IFDv2Condition::Type> aggregate_result;
183+
};
184+
164185
std::vector<std::unique_ptr<data_interfaces::IFDv2Condition>> conditions_;
165-
async::Future<data_interfaces::IFDv2Condition::Type> future_;
186+
std::shared_ptr<State> const state_;
166187
};
167188

168189
} // namespace launchdarkly::server_side::data_systems

libs/server-sdk/src/data_systems/fdv2/fdv2_data_system.cpp

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -274,11 +274,13 @@ void FDv2DataSystem::RunSynchronizerNext() {
274274
return;
275275
}
276276
auto next_future = active_synchronizer_->Next(selector_);
277-
auto cond_future = active_conditions_->GetFuture();
277+
auto cond_cancel = std::make_shared<async::CancellationSource>();
278+
auto cond_future = active_conditions_->GetFuture(cond_cancel->GetToken());
278279
async::WhenAny(cond_future, next_future)
279280
.Then(
280-
[this, cond_future,
281-
next_future](std::size_t const& idx) -> std::monostate {
281+
[this, cond_future, next_future,
282+
cond_cancel](std::size_t const& idx) -> std::monostate {
283+
cond_cancel->Cancel();
282284
if (idx == 0) {
283285
OnConditionFired(*cond_future.GetResult());
284286
} else {

libs/server-sdk/tests/conditions_test.cpp

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ using namespace launchdarkly::server_side::data_interfaces;
1212
using namespace launchdarkly::server_side::data_systems;
1313
using namespace std::chrono_literals;
1414

15+
using launchdarkly::async::CancellationToken;
16+
1517
namespace {
1618

1719
// Holds an io_context running on a worker thread; the executor produced by
@@ -162,7 +164,7 @@ TEST(RecoveryConditionTest, CloseCancelsActiveTimerAndResolvesWithCancelled) {
162164
TEST(ConditionsTest, EmptyAggregateNeverResolves) {
163165
Conditions conditions({});
164166

165-
auto result = conditions.GetFuture().WaitForResult(50ms);
167+
auto result = conditions.GetFuture(CancellationToken{}).WaitForResult(50ms);
166168

167169
EXPECT_FALSE(result.has_value());
168170
}
@@ -178,7 +180,7 @@ TEST(ConditionsTest, AggregateResolvesWithTypeOfFirstFiringCondition) {
178180
/*timeout=*/100ms));
179181
Conditions conditions(std::move(conds));
180182

181-
auto result = conditions.GetFuture().WaitForResult(1s);
183+
auto result = conditions.GetFuture(CancellationToken{}).WaitForResult(1s);
182184

183185
ASSERT_TRUE(result.has_value());
184186
EXPECT_EQ(IFDv2Condition::Type::kRecovery, *result);
@@ -202,7 +204,7 @@ TEST(ConditionsTest, InformForwardsToAllUnderlyingConditions) {
202204
/*status_code=*/0, "boom", std::chrono::system_clock::now()},
203205
}});
204206

205-
auto result = conditions.GetFuture().WaitForResult(1s);
207+
auto result = conditions.GetFuture(CancellationToken{}).WaitForResult(1s);
206208

207209
ASSERT_TRUE(result.has_value());
208210
EXPECT_EQ(IFDv2Condition::Type::kFallback, *result);
@@ -220,7 +222,8 @@ TEST(ConditionsTest, CloseForwardsToAllUnderlyingConditions) {
220222

221223
conditions.Close();
222224

223-
auto result = conditions.GetFuture().WaitForResult(200ms);
225+
auto result =
226+
conditions.GetFuture(CancellationToken{}).WaitForResult(200ms);
224227
ASSERT_TRUE(result.has_value());
225228
EXPECT_EQ(IFDv2Condition::Type::kCancelled, *result);
226229
}

0 commit comments

Comments
 (0)