Skip to content

Commit 2b2e9da

Browse files
committed
fix: stop accumulating WhenAny continuations on the Conditions aggregate
1 parent c5fd239 commit 2b2e9da

4 files changed

Lines changed: 99 additions & 14 deletions

File tree

libs/server-sdk/src/data_systems/fdv2/conditions.cpp

Lines changed: 63 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
#include <launchdarkly/async/timer.hpp>
44

5+
#include <algorithm>
56
#include <utility>
67
#include <variant>
78

@@ -133,15 +134,73 @@ async::Future<IFDv2Condition::Type> MakeAggregateFuture(
133134
} // namespace
134135

135136
Conditions::Conditions(std::vector<std::unique_ptr<IFDv2Condition>> conditions)
136-
: conditions_(std::move(conditions)),
137-
future_(MakeAggregateFuture(conditions_)) {}
137+
: conditions_(std::move(conditions)), state_(std::make_shared<State>()) {
138+
MakeAggregateFuture(conditions_)
139+
.Then(
140+
[state =
141+
state_](IFDv2Condition::Type const& type) -> std::monostate {
142+
std::vector<PendingEntry> drained;
143+
{
144+
std::lock_guard lock(state->mutex);
145+
state->aggregate_result = type;
146+
drained = std::move(state->pending);
147+
}
148+
for (auto& entry : drained) {
149+
entry.promise.Resolve(type);
150+
}
151+
return {};
152+
},
153+
async::kInlineExecutor);
154+
}
138155

139156
Conditions::~Conditions() {
140157
Close();
141158
}
142159

143-
async::Future<IFDv2Condition::Type> Conditions::GetFuture() const {
144-
return future_;
160+
async::Future<IFDv2Condition::Type> Conditions::GetFuture(
161+
async::CancellationToken token) {
162+
async::Promise<IFDv2Condition::Type> promise;
163+
auto future = promise.GetFuture();
164+
std::int64_t id;
165+
166+
{
167+
std::lock_guard lock(state_->mutex);
168+
if (state_->aggregate_result) {
169+
promise.Resolve(*state_->aggregate_result);
170+
return future;
171+
}
172+
id = state_->next_id++;
173+
state_->pending.push_back({id, std::move(promise), nullptr});
174+
}
175+
176+
// Construct cb outside the lock: if the token is already cancelled, the
177+
// callback fires synchronously inside the ctor and needs to acquire
178+
// state_->mutex.
179+
auto cb = std::make_unique<async::CancellationCallback>(
180+
token, [state = state_, id]() {
181+
std::lock_guard lock(state->mutex);
182+
state->pending.erase(
183+
std::remove_if(
184+
state->pending.begin(), state->pending.end(),
185+
[id](PendingEntry const& e) { return e.id == id; }),
186+
state->pending.end());
187+
});
188+
189+
// Re-find the entry under the lock and attach cb. Between the push above
190+
// and here, the aggregate may have fired and drained the vector, or cb's
191+
// callback may have fired synchronously during construction and erased
192+
// the entry. If gone, drop cb.
193+
{
194+
std::lock_guard lock(state_->mutex);
195+
auto it =
196+
std::find_if(state_->pending.begin(), state_->pending.end(),
197+
[id](PendingEntry const& e) { return e.id == id; });
198+
if (it != state_->pending.end()) {
199+
it->cancel_cb = std::move(cb);
200+
}
201+
}
202+
203+
return future;
145204
}
146205

147206
void Conditions::Inform(FDv2SourceResult const& result) {

libs/server-sdk/src/data_systems/fdv2/conditions.hpp

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,11 @@
88
#include <boost/asio/any_io_executor.hpp>
99

1010
#include <chrono>
11+
#include <cstdint>
1112
#include <memory>
1213
#include <mutex>
1314
#include <optional>
15+
#include <vector>
1416

1517
namespace launchdarkly::server_side::data_systems {
1618

@@ -136,7 +138,7 @@ class RecoveryConditionFactory final
136138
* Aggregates a set of conditions into a single Future that resolves with the
137139
* type of the first condition to fire. Inform() and Close() forward to every
138140
* underlying condition. If constructed with no conditions, GetFuture()
139-
* returns a Future that never resolves.
141+
* returns a Future that never resolves (until Close).
140142
*
141143
* Thread-safe: GetFuture, Inform, and Close may be called from any thread.
142144
*/
@@ -153,16 +155,35 @@ class Conditions final {
153155
Conditions& operator=(Conditions const&) = delete;
154156
Conditions& operator=(Conditions&&) = delete;
155157

158+
/**
159+
* Returns a fresh Future that resolves with the type of the first
160+
* condition to fire. The caller must cancel the source corresponding to
161+
* `token` once the result is no longer needed, so that the per-call
162+
* Promise (and its registered continuations) can be released.
163+
*/
156164
[[nodiscard]] async::Future<data_interfaces::IFDv2Condition::Type>
157-
GetFuture() const;
165+
GetFuture(async::CancellationToken token);
158166

159167
void Inform(data_interfaces::FDv2SourceResult const& result);
160168

161169
void Close();
162170

163171
private:
172+
struct PendingEntry {
173+
std::int64_t id;
174+
async::Promise<data_interfaces::IFDv2Condition::Type> promise;
175+
std::unique_ptr<async::CancellationCallback> cancel_cb;
176+
};
177+
178+
struct State {
179+
std::mutex mutex;
180+
std::int64_t next_id = 0;
181+
std::vector<PendingEntry> pending;
182+
std::optional<data_interfaces::IFDv2Condition::Type> aggregate_result;
183+
};
184+
164185
std::vector<std::unique_ptr<data_interfaces::IFDv2Condition>> conditions_;
165-
async::Future<data_interfaces::IFDv2Condition::Type> future_;
186+
std::shared_ptr<State> const state_;
166187
};
167188

168189
} // namespace launchdarkly::server_side::data_systems

libs/server-sdk/src/data_systems/fdv2/fdv2_data_system.cpp

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -274,11 +274,13 @@ void FDv2DataSystem::RunSynchronizerNext() {
274274
return;
275275
}
276276
auto next_future = active_synchronizer_->Next(selector_);
277-
auto cond_future = active_conditions_->GetFuture();
277+
auto cond_cancel = std::make_shared<async::CancellationSource>();
278+
auto cond_future = active_conditions_->GetFuture(cond_cancel->GetToken());
278279
async::WhenAny(cond_future, next_future)
279280
.Then(
280-
[this, cond_future,
281-
next_future](std::size_t const& idx) -> std::monostate {
281+
[this, cond_future, next_future,
282+
cond_cancel](std::size_t const& idx) -> std::monostate {
283+
cond_cancel->Cancel();
282284
if (idx == 0) {
283285
OnConditionFired(*cond_future.GetResult());
284286
} else {

libs/server-sdk/tests/conditions_test.cpp

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ using namespace launchdarkly::server_side::data_interfaces;
1212
using namespace launchdarkly::server_side::data_systems;
1313
using namespace std::chrono_literals;
1414

15+
using launchdarkly::async::CancellationToken;
16+
1517
namespace {
1618

1719
// Holds an io_context running on a worker thread; the executor produced by
@@ -162,7 +164,7 @@ TEST(RecoveryConditionTest, CloseCancelsActiveTimerAndResolvesWithCancelled) {
162164
TEST(ConditionsTest, EmptyAggregateNeverResolves) {
163165
Conditions conditions({});
164166

165-
auto result = conditions.GetFuture().WaitForResult(50ms);
167+
auto result = conditions.GetFuture(CancellationToken{}).WaitForResult(50ms);
166168

167169
EXPECT_FALSE(result.has_value());
168170
}
@@ -178,7 +180,7 @@ TEST(ConditionsTest, AggregateResolvesWithTypeOfFirstFiringCondition) {
178180
/*timeout=*/100ms));
179181
Conditions conditions(std::move(conds));
180182

181-
auto result = conditions.GetFuture().WaitForResult(1s);
183+
auto result = conditions.GetFuture(CancellationToken{}).WaitForResult(1s);
182184

183185
ASSERT_TRUE(result.has_value());
184186
EXPECT_EQ(IFDv2Condition::Type::kRecovery, *result);
@@ -202,7 +204,7 @@ TEST(ConditionsTest, InformForwardsToAllUnderlyingConditions) {
202204
/*status_code=*/0, "boom", std::chrono::system_clock::now()},
203205
}});
204206

205-
auto result = conditions.GetFuture().WaitForResult(1s);
207+
auto result = conditions.GetFuture(CancellationToken{}).WaitForResult(1s);
206208

207209
ASSERT_TRUE(result.has_value());
208210
EXPECT_EQ(IFDv2Condition::Type::kFallback, *result);
@@ -220,7 +222,8 @@ TEST(ConditionsTest, CloseForwardsToAllUnderlyingConditions) {
220222

221223
conditions.Close();
222224

223-
auto result = conditions.GetFuture().WaitForResult(200ms);
225+
auto result =
226+
conditions.GetFuture(CancellationToken{}).WaitForResult(200ms);
224227
ASSERT_TRUE(result.has_value());
225228
EXPECT_EQ(IFDv2Condition::Type::kCancelled, *result);
226229
}

0 commit comments

Comments
 (0)