Skip to content

Commit c27bf26

Browse files
authored
fix: stop FDv2DataSource.Conditions from leaking on healthy primary (#163)
## Summary `FDv2DataSource.Conditions.getFuture()` returned the same shared `CompletableFuture<Object>` instance to every caller. The run loop does `CompletableFuture.anyOf(getFuture(), synchronizer.next()).get()` on every iteration, which attaches a new `OrRelay` `Completion` to the shared future's `stack` each time. `CompletableFuture` has no deregister path for the loser of an `anyOf` race, so those `Completion` nodes stay on the stack until the shared future itself completes. On a healthy primary streaming ChangeSets without ever firing fallback/recovery, the shared future never completes — the `stack` grows monotonically for the synchronizer's entire tenure (effectively the SDK's uptime on a stable server). **Per-iteration cost: ~200 B** (OrRelay + anyOf result CF + chain references). **At 10 ChangeSets/sec sustained: ~150 MB/day per active synchronizer.** ## The fix A single permanent `whenComplete` listener on the underlying aggregate fans out completion to every fresh future handed out by `getFuture()`. Pending fresh futures are tracked via `WeakReference`, so a fresh future whose only strong references were the caller's local variables (typical lifetime: one loop iteration) becomes garbage-collectable once that iteration ends. Pending entries whose referent has been collected are pruned opportunistically on each `getFuture()` call and on `close()`. `Conditions` is now package-private (was `private`) so direct unit tests can reach it. A test-only `pendingSize()` helper is added. ## Test plan Adds `FDv2DataSourceConditionsAggregateTest` with five tests: - **`getFutureReturnsDistinctInstancesPerCall`** — bug-prover. Fails on the pre-fix shared-instance behavior, passes after the fix. - **`getFutureReturnsDistinctInstancesEvenWithNoConditions`** — bug-prover. Covers the empty-conditions case (single-synchronizer configuration), which is exactly where per-iteration accumulation would be most damaging. - **`allFreshFuturesCompleteWhenAggregateFires`** — verifies fan-out via the single permanent listener actually delivers to multiple fresh futures handed out before the aggregate fires. - **`getFutureAfterAggregateFiresReturnsCompletedFuture`** — verifies the fast path: callers arriving after completion get an already-completed future synchronously. - **`pendingListDoesNotGrowUnboundedlyWhenFreshFuturesAreDropped`** — 10k-iteration soak test that simulates the run-loop pattern (race a fresh future against a fast-resolving sibling, drop the result) and asserts the pending list stays bounded via GC + opportunistic pruning. Caveat in the test docstring about `System.gc()` not being guaranteed — if it ever flakes on CI we can migrate to `-XX:+UseSerialGC` or relax the ceiling. Verified bug-proving discipline: the two distinctness tests fail on the pre-fix shared-instance behavior and pass after the fix. The full server-sdk test suite (1857 tests across 109 classes) is clean. ## Context This was identified during a multi-agent review of the analogous cpp-sdks PR (launchdarkly/cpp-sdks#531), which mirrors this Java implementation's `Conditions` design. The cpp version has the same structural leak; this Java fix shape is what was prototyped there. Filing here first since the runtime impact on a long-running JVM-based server SDK is more pronounced. <!-- CURSOR_SUMMARY --> --- > [!NOTE] > **Medium Risk** > Touches the FDv2 synchronizer condition-aggregation logic used in the main run loop; mistakes could cause missed fallback/recovery signals or incorrect exceptional completion behavior, though changes are localized and covered by new unit tests. > > **Overview** > Prevents a long-lived memory leak in `FDv2DataSource.Conditions` by changing `getFuture()` to return a *fresh* `CompletableFuture` per call until the underlying condition aggregate completes, rather than returning the same shared pending future each iteration. > > Adds a single `whenComplete` fan-out from the aggregate to complete all outstanding per-call futures (and to propagate exceptional completion), tracks pending futures in a `WeakHashMap`-backed set for GC cleanup, and makes `Conditions` package-private to allow direct testing. > > Introduces `FDv2DataSourceConditionsAggregateTest` to assert per-call distinctness, correct completion fan-out, and correct behavior on exceptional and post-completion paths. > > <sup>Reviewed by [Cursor Bugbot](https://cursor.com/bugbot) for commit a6c7c36. Bugbot is set up for automated code reviews on this repo. Configure [here](https://www.cursor.com/dashboard/bugbot).</sup> <!-- /CURSOR_SUMMARY -->
1 parent cac1568 commit c27bf26

2 files changed

Lines changed: 331 additions & 5 deletions

File tree

lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2DataSource.java

Lines changed: 93 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
import java.util.Collections;
1616
import java.util.Date;
1717
import java.util.List;
18+
import java.util.Set;
19+
import java.util.WeakHashMap;
1820
import java.util.concurrent.*;
1921
import java.util.concurrent.atomic.AtomicBoolean;
2022
import java.util.stream.Collectors;
@@ -591,21 +593,102 @@ private void maybeReportUnexpectedExhaustion(String message) {
591593

592594
/**
593595
* Helper class to manage the lifecycle of conditions with automatic cleanup.
596+
*
597+
* <p>Before the aggregate completes, {@link #getFuture()} returns a
598+
* <em>fresh</em> {@link CompletableFuture} per call. This matters because
599+
* the run loop calls {@code CompletableFuture.anyOf(getFuture(),
600+
* synchronizerNext)} on every iteration: if {@code getFuture()} returned
601+
* the shared underlying aggregate while it was still pending, each
602+
* {@code anyOf} call would permanently attach an {@code OrRelay}
603+
* {@code Completion} to its {@code stack}. On a healthy primary
604+
* synchronizer that streams ChangeSets without ever arming the fallback
605+
* timer, the aggregate never completes, so those Completion nodes would
606+
* accumulate monotonically for the synchronizer's full tenure -- a real
607+
* memory leak proportional to event rate.
608+
*
609+
* <p>After the aggregate completes, {@link #getFuture()} returns the
610+
* aggregate directly: any continuation registered on an already-completed
611+
* CompletableFuture fires synchronously at registration time and is
612+
* removed from the stack immediately by {@code cleanStack}, so the same
613+
* accumulation cannot happen.
614+
*
615+
* <p>Fresh pre-completion futures are tracked in a {@link WeakHashMap}-backed
616+
* set, so a fresh future whose only strong references were in the caller's
617+
* loop iteration becomes garbage-collectable -- and automatically removed
618+
* from {@code pending} -- once that iteration ends.
619+
*
620+
* <p>Package-private (rather than private) so that direct unit tests can
621+
* exercise the API surface and assert per-call distinctness.
594622
*/
595-
private static class Conditions implements AutoCloseable {
623+
static class Conditions implements AutoCloseable {
596624
private final List<Condition> conditions;
597-
private final CompletableFuture<Object> conditionsFuture;
625+
private final CompletableFuture<Object> aggregate;
626+
private final Object lock = new Object();
627+
628+
/**
629+
* Tracks futures previously returned by {@link #getFuture()} that have
630+
* not yet been completed. Held weakly via {@link WeakHashMap} so that
631+
* fresh futures abandoned by the caller (the typical end-of-iteration
632+
* case) become GC-collectable. Set to {@code null} once the aggregate
633+
* has fired and the entries have been drained. Mutated only under
634+
* {@code lock}.
635+
*/
636+
private Set<CompletableFuture<Object>> pending =
637+
Collections.newSetFromMap(new WeakHashMap<>());
598638

599639
public Conditions(List<Condition> conditions) {
600640
this.conditions = conditions;
601-
this.conditionsFuture = conditions.isEmpty()
641+
this.aggregate = conditions.isEmpty()
602642
? new CompletableFuture<>() // Never completes if no conditions
603643
: CompletableFuture.anyOf(
604-
conditions.stream().map(Condition::execute).toArray(CompletableFuture[]::new));
644+
conditions.stream().map(Condition::execute).toArray(CompletableFuture[]::new));
645+
646+
// Single permanent listener. This is the only Completion node ever
647+
// attached to aggregate.stack while the aggregate is still pending
648+
// -- subsequent pre-completion getFuture() calls do not touch the
649+
// aggregate at all.
650+
this.aggregate.whenComplete((result, throwable) -> {
651+
List<CompletableFuture<Object>> snapshot;
652+
synchronized (lock) {
653+
if (pending == null) {
654+
return;
655+
}
656+
// Copy under the lock: the ArrayList holds strong
657+
// references so entries that survived GC to this point
658+
// stay alive until we complete them below.
659+
snapshot = new ArrayList<>(pending);
660+
pending = null;
661+
}
662+
for (CompletableFuture<Object> cf : snapshot) {
663+
if (throwable != null) {
664+
cf.completeExceptionally(throwable);
665+
} else {
666+
cf.complete(result);
667+
}
668+
}
669+
});
605670
}
606671

672+
/**
673+
* Returns a future that will complete when the underlying aggregate
674+
* condition fires. Pre-completion, this is a fresh future per call;
675+
* post-completion, this is the aggregate itself (already done).
676+
*/
607677
public CompletableFuture<Object> getFuture() {
608-
return conditionsFuture;
678+
if (aggregate.isDone()) {
679+
return aggregate;
680+
}
681+
682+
CompletableFuture<Object> fresh = new CompletableFuture<>();
683+
synchronized (lock) {
684+
if (pending == null) {
685+
// Raced with aggregate completion between isDone() and
686+
// the lock acquisition; aggregate is now done.
687+
return aggregate;
688+
}
689+
pending.add(fresh);
690+
}
691+
return fresh;
609692
}
610693

611694
public void inform(FDv2SourceResult result) {
@@ -615,6 +698,11 @@ public void inform(FDv2SourceResult result) {
615698
@Override
616699
public void close() {
617700
conditions.forEach(Condition::close);
701+
synchronized (lock) {
702+
if (pending != null) {
703+
pending.clear();
704+
}
705+
}
618706
}
619707
}
620708
}
Lines changed: 238 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,238 @@
1+
package com.launchdarkly.sdk.server;
2+
3+
import com.launchdarkly.sdk.server.FDv2DataSourceConditions.Condition;
4+
import com.launchdarkly.sdk.server.FDv2DataSourceConditions.Condition.ConditionType;
5+
import com.launchdarkly.sdk.server.FDv2DataSourceConditions.FallbackCondition;
6+
import com.launchdarkly.sdk.server.FDv2DataSourceConditions.RecoveryCondition;
7+
import com.launchdarkly.sdk.server.datasources.FDv2SourceResult;
8+
import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider;
9+
10+
import org.junit.After;
11+
import org.junit.Before;
12+
import org.junit.Test;
13+
14+
import java.time.Instant;
15+
import java.util.Collections;
16+
import java.util.concurrent.CompletableFuture;
17+
import java.util.concurrent.ExecutionException;
18+
import java.util.concurrent.Executors;
19+
import java.util.concurrent.ScheduledExecutorService;
20+
import java.util.concurrent.TimeUnit;
21+
22+
import static org.hamcrest.MatcherAssert.assertThat;
23+
import static org.hamcrest.Matchers.not;
24+
import static org.hamcrest.Matchers.sameInstance;
25+
import static org.junit.Assert.assertNotNull;
26+
import static org.junit.Assert.assertTrue;
27+
28+
/**
29+
* Direct tests for {@link FDv2DataSource.Conditions}.
30+
*
31+
* <p>The Conditions class is the aggregator that races fallback/recovery
32+
* condition futures against synchronizer.next() in the FDv2DataSource run
33+
* loop. Each iteration of that loop calls getFuture() and passes the result to
34+
* CompletableFuture.anyOf(...) -- so getFuture() must not return a shared
35+
* instance, or every anyOf call permanently attaches a Completion node to the
36+
* shared instance's stack, leaking memory proportional to event rate during
37+
* the synchronizer's tenure on a healthy primary.
38+
*/
39+
public class FDv2DataSourceConditionsAggregateTest {
40+
private ScheduledExecutorService executor;
41+
42+
@Before
43+
public void setUp() {
44+
executor = Executors.newScheduledThreadPool(1);
45+
}
46+
47+
@After
48+
public void tearDown() {
49+
executor.shutdownNow();
50+
}
51+
52+
/**
53+
* Bug-proving test: getFuture() must return a fresh instance per call.
54+
*
55+
* <p>If it returns the same instance (as it did before the fix), the run
56+
* loop's per-iteration {@code anyOf(getFuture(), syncNext)} attaches a new
57+
* OrRelay Completion to the shared future's stack every iteration, with no
58+
* deregister path -- a monotonic leak for a non-firing aggregate.
59+
*/
60+
@Test
61+
public void getFutureReturnsDistinctInstancesPerCall() {
62+
Condition fallback = new FallbackCondition(executor, 60);
63+
try (FDv2DataSource.Conditions conditions =
64+
new FDv2DataSource.Conditions(Collections.singletonList(fallback))) {
65+
CompletableFuture<Object> f1 = conditions.getFuture();
66+
CompletableFuture<Object> f2 = conditions.getFuture();
67+
CompletableFuture<Object> f3 = conditions.getFuture();
68+
assertThat(f1, not(sameInstance(f2)));
69+
assertThat(f2, not(sameInstance(f3)));
70+
assertThat(f1, not(sameInstance(f3)));
71+
}
72+
}
73+
74+
/**
75+
* Even with no underlying conditions (a single-synchronizer configuration),
76+
* getFuture() must return fresh instances. The aggregate never completes
77+
* in this case, which is exactly the scenario where any per-iteration
78+
* accumulation would be most damaging.
79+
*/
80+
@Test
81+
public void getFutureReturnsDistinctInstancesEvenWithNoConditions() {
82+
try (FDv2DataSource.Conditions conditions =
83+
new FDv2DataSource.Conditions(Collections.emptyList())) {
84+
CompletableFuture<Object> f1 = conditions.getFuture();
85+
CompletableFuture<Object> f2 = conditions.getFuture();
86+
assertThat(f1, not(sameInstance(f2)));
87+
}
88+
}
89+
90+
/**
91+
* Every fresh future returned by getFuture() must complete when the
92+
* underlying aggregate fires. The fan-out via the single permanent listener
93+
* is what makes the fresh-per-call pattern work; verify it actually
94+
* delivers.
95+
*/
96+
@Test
97+
public void allFreshFuturesCompleteWhenAggregateFires() throws Exception {
98+
// 0-second timeout -> fires on first INTERRUPTED inform.
99+
Condition fallback = new FallbackCondition(executor, 0);
100+
try (FDv2DataSource.Conditions conditions =
101+
new FDv2DataSource.Conditions(Collections.singletonList(fallback))) {
102+
CompletableFuture<Object> f1 = conditions.getFuture();
103+
CompletableFuture<Object> f2 = conditions.getFuture();
104+
CompletableFuture<Object> f3 = conditions.getFuture();
105+
106+
conditions.inform(makeInterruptedResult());
107+
108+
Object r1 = f1.get(2, TimeUnit.SECONDS);
109+
Object r2 = f2.get(2, TimeUnit.SECONDS);
110+
Object r3 = f3.get(2, TimeUnit.SECONDS);
111+
112+
assertNotNull(r1);
113+
assertNotNull(r2);
114+
assertNotNull(r3);
115+
assertTrue(r1 instanceof Condition);
116+
assertTrue(r2 instanceof Condition);
117+
assertTrue(r3 instanceof Condition);
118+
}
119+
}
120+
121+
/**
122+
* Bug-proving test for the null-sentinel issue caught by Cursor Bugbot:
123+
* if the underlying aggregate completes <em>exceptionally</em>, every
124+
* future returned by getFuture() -- both those handed out before the
125+
* exception and those requested after -- must complete exceptionally too.
126+
*
127+
* <p>Prior to this fix, the "fired" state was tracked via a {@code null}
128+
* sentinel on a {@code completedValue} field, which also stayed
129+
* {@code null} on exceptional completion. A subsequent getFuture() call
130+
* would then return {@code CompletableFuture.completedFuture(null)} --
131+
* silently converting an exceptional completion into a normal
132+
* {@code null} completion. The run loop's downstream
133+
* {@code res.getClass().getName()} would then throw NPE.
134+
*/
135+
@Test
136+
public void getFutureFailsExceptionallyWhenAggregateFailsExceptionally()
137+
throws Exception {
138+
ManualCondition manualCondition = new ManualCondition();
139+
try (FDv2DataSource.Conditions conditions =
140+
new FDv2DataSource.Conditions(Collections.singletonList(manualCondition))) {
141+
// Future requested BEFORE the exceptional completion.
142+
CompletableFuture<Object> before = conditions.getFuture();
143+
144+
RuntimeException boom = new RuntimeException("simulated condition failure");
145+
manualCondition.future.completeExceptionally(boom);
146+
147+
// Future requested AFTER the exceptional completion (exercises the
148+
// fast path through makeCompletedFuture). This is the case bugbot
149+
// caught: pre-fix, it returned completedFuture(null).
150+
CompletableFuture<Object> after = conditions.getFuture();
151+
152+
assertThrowsExecutionExceptionWithCause(before, boom);
153+
assertThrowsExecutionExceptionWithCause(after, boom);
154+
}
155+
}
156+
157+
/**
158+
* getFuture() called after the aggregate has already fired returns an
159+
* already-completed future synchronously (the fast path).
160+
*/
161+
@Test
162+
public void getFutureAfterAggregateFiresReturnsCompletedFuture() throws Exception {
163+
// RecoveryCondition arms its timer in the constructor and fires after
164+
// the configured timeout. With timeout=0 it fires near-immediately.
165+
Condition recovery = new RecoveryCondition(executor, 0);
166+
try (FDv2DataSource.Conditions conditions =
167+
new FDv2DataSource.Conditions(Collections.singletonList(recovery))) {
168+
// Drain a future to confirm the aggregate has fired.
169+
conditions.getFuture().get(2, TimeUnit.SECONDS);
170+
171+
CompletableFuture<Object> postFire = conditions.getFuture();
172+
assertTrue("post-fire getFuture() should be already complete", postFire.isDone());
173+
assertNotNull(postFire.get(0, TimeUnit.SECONDS));
174+
}
175+
}
176+
177+
private static FDv2SourceResult makeInterruptedResult() {
178+
return FDv2SourceResult.interrupted(
179+
new DataSourceStatusProvider.ErrorInfo(
180+
DataSourceStatusProvider.ErrorKind.NETWORK_ERROR,
181+
0,
182+
"simulated",
183+
Instant.now()),
184+
false);
185+
}
186+
187+
/**
188+
* Asserts that {@code future.get()} throws {@link ExecutionException}
189+
* wrapping the expected cause. {@code CompletableFuture#get} surfaces
190+
* exceptional completion as ExecutionException with the original
191+
* exception as its cause.
192+
*/
193+
private static void assertThrowsExecutionExceptionWithCause(
194+
CompletableFuture<Object> future,
195+
Throwable expectedCause) throws Exception {
196+
try {
197+
future.get(2, TimeUnit.SECONDS);
198+
throw new AssertionError("expected ExecutionException, got normal completion");
199+
} catch (ExecutionException ee) {
200+
if (ee.getCause() != expectedCause) {
201+
throw new AssertionError(
202+
"expected cause to be " + expectedCause + " but was " + ee.getCause(), ee);
203+
}
204+
}
205+
}
206+
207+
/**
208+
* Test-only Condition with an externally-controllable future. The
209+
* existing FallbackCondition/RecoveryCondition only resolve normally
210+
* (with {@code this}); to exercise the exceptional path through the
211+
* aggregate's whenComplete listener we need a Condition we can fail
212+
* directly.
213+
*/
214+
private static final class ManualCondition
215+
implements Condition {
216+
final CompletableFuture<Condition> future = new CompletableFuture<>();
217+
218+
@Override
219+
public CompletableFuture<Condition> execute() {
220+
return future;
221+
}
222+
223+
@Override
224+
public void inform(FDv2SourceResult sourceResult) {
225+
// Manually controlled; no auto-trigger from inform.
226+
}
227+
228+
@Override
229+
public void close() {
230+
// No timer to cancel.
231+
}
232+
233+
@Override
234+
public ConditionType getType() {
235+
return ConditionType.FALLBACK;
236+
}
237+
}
238+
}

0 commit comments

Comments
 (0)