Commit 17f7b4e

feat: Add FDv2 synchronizer fallback and recovery conditions (#297)
## What this adds

Timed conditions that the FDv2 data source orchestrator (a later PR) observes alongside the active synchronizer's results to drive tier transitions:

- **Fallback condition**: starts its timer when the synchronizer reports an interrupted status and cancels it when a change set arrives. If it fires (120 seconds by default), the orchestrator moves to the next available synchronizer. Terminal statuses (shutdown, terminal error, goodbye) do not arm the timer, because the orchestrator reacts to those immediately rather than waiting out a fallback period. Repeated interruptions do not extend the deadline; the period counts from the first interruption.
- **Recovery condition**: starts when observed and ignores results. If it fires (300 seconds by default), the orchestrator returns to the primary synchronizer.

`getConditions` selects which conditions apply: none when only one synchronizer is available (nowhere to fall back to), fallback only for the primary, and both for a non-primary synchronizer. `ConditionGroup` merges its members and emits the first to fire. The timeout defaults match the other client-side FDv2 implementations.

## Why streams instead of futures

The Java and C++ implementations model conditions as one-shot futures, and both had the same leak (fixed in java-core by `c27bf26` / launchdarkly/java-core#163): a future's listeners can never be detached, only released by completion, so an orchestration loop that races a long-lived pending future per result accumulates an irremovable listener garland per change set, unbounded on a healthy primary. Dart futures have the identical property (measured: ~559 B retained per race against a pending future), but Dart also has the primitive those platforms lack: cancellable stream subscriptions.

Each condition therefore exposes a single-subscription `Stream<ConditionType>` that emits **at most once** and then closes (closing without emitting if the condition is closed first). Lifetimes are scoped to the subscription: self-starting timers begin when the stream is listened to, and cancelling the subscription closes the condition and releases its timers. One bounded edge remains by design: informing a never-listened group can arm a fallback timer until its timeout elapses or `close()` is called. This edge is documented on the API. The orchestrator PR consumes these with one subscription per synchronizer run, closes in a `finally`, and includes a soak test asserting bounded memory across a sustained stream of results.

## Testing

Tests run inside a `package:fake_async` zone, advancing time with `elapse` and asserting timer state through `pendingTimers`, with no timer injection in the production code. They cover:

- timer start/cancel behavior for both condition kinds
- terminal statuses not arming the fallback timer
- the fallback deadline not extending on repeated interruptions
- at-most-once emission, including when two member timers contend in the same instant
- close semantics
- subscription cancellation releasing condition and group timers
- group merging and inform broadcast
- the `getConditions` selection rules

SDK-2186

<!-- CURSOR_SUMMARY -->
---

> [!NOTE]
> **Medium Risk**
> New timing logic will drive synchronizer failover when wired into the orchestrator; behavior is isolated in new modules with thorough tests and no production integration in this PR.
>
> **Overview**
> Adds **FDv2 synchronizer tier-transition conditions** for a future orchestrator: timed signals that emit **fallback** (switch to the next synchronizer after sustained interruption) or **recovery** (return to primary after running on a backup).
>
> **Fallback** arms on `interrupted`, cancels on a change set, ignores terminal statuses, and does not reset the deadline on repeated interruptions (default 120s). **Recovery** starts when the condition is observed and ignores results (default 300s). Conditions expose **single-subscription streams** (at-most-one emit) so subscriptions can be cancelled without the listener-retention issues of racing futures.
>
> `ConditionGroup` merges members (first fire wins), broadcasts `inform` to all members, and `getConditions` picks an empty group, fallback-only for primary, or fallback+recovery for non-primary when multiple synchronizers exist.
>
> Adds **`fake_async`** tests for timer behavior, close/cancel semantics, group contention, and `getConditions` rules.
>
> <sup>Reviewed by [Cursor Bugbot](https://cursor.com/bugbot) for commit 4fc80f6. Bugbot is set up for automated code reviews on this repo. Configure [here](https://www.cursor.com/dashboard/bugbot).</sup>
<!-- /CURSOR_SUMMARY -->
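The consuming side described under "Why streams instead of futures" (one subscription per synchronizer run, released in a `finally`) might look roughly like the sketch below. It is not the orchestrator from the later PR: `runOnce`, `RunEnd`, and `emitOnceAfter` are hypothetical names, and `emitOnceAfter` is only a stand-in for a condition's `events` stream, built on `dart:async` alone.

```dart
import 'dart:async';

/// Why a run of the hypothetical loop below ended.
enum RunEnd { conditionFired, resultsDone }

/// Hypothetical stand-in for a condition: a single-subscription stream that
/// emits [value] once after [timeout] and cancels its timer on cancel.
Stream<T> emitOnceAfter<T>(Duration timeout, T value) {
  Timer? timer;
  late final StreamController<T> controller;
  controller = StreamController<T>(
    // The timer starts only when the stream is listened to.
    onListen: () {
      timer = Timer(timeout, () {
        controller.add(value);
        controller.close();
      });
    },
    onCancel: () => timer?.cancel(),
  );
  return controller.stream;
}

/// Forwards [results] to [onResult] until the results end or [condition]
/// emits. Each stream gets one subscription for the whole run, rather than
/// one future listener per result.
Future<RunEnd> runOnce<R, C>(
  Stream<R> results,
  Stream<C> condition,
  void Function(R result) onResult,
) async {
  final end = Completer<RunEnd>();
  void finish(RunEnd reason) {
    if (!end.isCompleted) end.complete(reason);
  }

  final conditionSub = condition.listen((_) => finish(RunEnd.conditionFired));
  final resultSub =
      results.listen(onResult, onDone: () => finish(RunEnd.resultsDone));
  try {
    return await end.future;
  } finally {
    // Cancelling detaches both listeners; the stand-in also drops its timer.
    await resultSub.cancel();
    await conditionSub.cancel();
  }
}

Future<void> main() async {
  final seen = <int>[];
  final reason = await runOnce(
    Stream.fromIterable([1, 2, 3]),
    emitOnceAfter(const Duration(seconds: 5), 'fallback'),
    seen.add,
  );
  print('$reason $seen'); // RunEnd.resultsDone [1, 2, 3]
}
```

Because the condition's subscription is cancelled in `finally`, the five-second timer does not outlive the run, which is the property the commit message contrasts with listeners on a pending future.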
1 parent cb6a5c0 commit 17f7b4e

2 files changed

Lines changed: 597 additions & 0 deletions

Lines changed: 256 additions & 0 deletions
@@ -0,0 +1,256 @@
import 'dart:async';

import 'source_result.dart';

/// Default time a synchronizer may remain interrupted before the
/// orchestrator falls back to the next available synchronizer.
const Duration defaultFallbackTimeout = Duration(seconds: 120);

/// Default time a non-primary synchronizer runs before the orchestrator
/// attempts to recover back to the primary synchronizer.
const Duration defaultRecoveryTimeout = Duration(seconds: 300);

/// The kind of condition that fired, determining the orchestrator's
/// response.
enum ConditionType {
  /// Move to the next available synchronizer.
  fallback,

  /// Reset to the primary synchronizer.
  recovery,
}

/// A timed condition observed alongside the active synchronizer's
/// results. When the condition fires, its [events] stream emits a
/// [ConditionType] that the orchestration loop uses to decide what to
/// do.
///
/// Conditions are streams rather than futures so a consumer can detach:
/// cancelling a stream subscription releases the consumer's listener,
/// whereas a listener on a never-completing future can never be
/// removed and would be retained for the condition's whole lifetime.
abstract interface class Condition {
  /// Single-subscription stream that emits at most one [ConditionType]
  /// when the condition fires and then closes. Closes without emitting
  /// if the condition is closed first.
  ///
  /// The condition's lifetime is scoped to the subscription: timers that
  /// start on their own start when the stream is listened to, and
  /// cancelling the subscription closes the condition. A condition that
  /// is informed but never listened to can hold a timer until its
  /// timeout elapses; call [close] to release it early.
  Stream<ConditionType> get events;

  /// Inform the condition about a synchronizer result. Some conditions
  /// use this to start or cancel their timers.
  void inform(FDv2SourceResult result);

  /// Cancel any pending timers and close [events]. Idempotent.
  void close();
}

final class _TimedCondition implements Condition {
  final Duration _timeout;
  final ConditionType _type;
  final void Function(FDv2SourceResult result,
      {required void Function() start,
      required void Function() cancel})? _informHandler;

  late final StreamController<ConditionType> _controller =
      StreamController<ConditionType>(
    onListen: _onListen,
    onCancel: close,
  );
  Timer? _timer;
  bool _closed = false;

  _TimedCondition({
    required Duration timeout,
    required ConditionType type,
    void Function(FDv2SourceResult result,
            {required void Function() start, required void Function() cancel})?
        informHandler,
  })  : assert(timeout > Duration.zero),
        _timeout = timeout,
        _type = type,
        _informHandler = informHandler;

  void _onListen() {
    // Without an inform handler the timer starts as soon as the
    // condition is observed (recovery behavior). With one, the handler
    // decides when to start it.
    if (_informHandler == null) {
      _startTimer();
    }
  }

  void _startTimer() {
    if (_timer != null || _closed) return;
    _timer = Timer(_timeout, () {
      if (_closed) return;
      _closed = true;
      // The controller buffers the event if the subscription has not
      // started yet, so firing before listen is not lost.
      _controller.add(_type);
      _controller.close();
    });
  }

  void _cancelTimer() {
    _timer?.cancel();
    _timer = null;
  }

  @override
  Stream<ConditionType> get events => _controller.stream;

  @override
  void inform(FDv2SourceResult result) {
    if (_closed) return;
    _informHandler?.call(result, start: _startTimer, cancel: _cancelTimer);
  }

  @override
  void close() {
    if (_closed) return;
    _closed = true;
    _cancelTimer();
    _controller.close();
  }
}

/// Creates a fallback condition. The condition starts its timer when an
/// interrupted status is received and cancels it when a change set is
/// received. If the timer fires, the condition emits
/// [ConditionType.fallback].
///
/// Terminal statuses (shutdown, terminal error, goodbye) do not arm the
/// timer: the orchestrator reacts to those immediately, out of band,
/// rather than waiting out a fallback period. The timer is also not
/// re-armed by repeated interruptions; the fallback period counts from
/// the first interruption.
Condition createFallbackCondition(Duration timeout) {
  return _TimedCondition(
    timeout: timeout,
    type: ConditionType.fallback,
    informHandler: (result, {required start, required cancel}) {
      switch (result) {
        case ChangeSetResult():
          cancel();
        case StatusResult(state: SourceState.interrupted):
          start();
        case StatusResult():
          break;
      }
    },
  );
}

/// Creates a recovery condition. The timer starts immediately and the
/// condition emits [ConditionType.recovery] when it fires. Results do
/// not affect it.
Condition createRecoveryCondition(Duration timeout) {
  return _TimedCondition(
    timeout: timeout,
    type: ConditionType.recovery,
  );
}

/// A group of conditions managed together. The group merges the member
/// streams and broadcasts results to all of them.
final class ConditionGroup {
  final List<Condition> _conditions;
  final List<StreamSubscription<ConditionType>> _subscriptions = [];

  late final StreamController<ConditionType> _controller =
      StreamController<ConditionType>(
    onListen: _subscribe,
    onCancel: close,
  );
  bool _fired = false;
  bool _closed = false;

  ConditionGroup(List<Condition> conditions) : _conditions = conditions;

  /// Single-subscription stream that emits at most one [ConditionType]
  /// (the first member condition to fire) and then closes. Closes
  /// without emitting if the group is closed first; a group with no
  /// member conditions never emits.
  ///
  /// The group's lifetime is scoped to the subscription: member timers
  /// that start on their own start when the stream is listened to, and
  /// cancelling the subscription closes the group and its members. A
  /// group that is informed but never listened to can hold a timer
  /// until its timeout elapses; call [close] to release it early.
  Stream<ConditionType> get events => _controller.stream;

  void _subscribe() {
    if (_closed) return;
    for (final condition in _conditions) {
      _subscriptions.add(condition.events.listen((type) {
        // First member to fire wins; member timers firing in the same
        // event-loop turn cannot produce a second emission.
        if (_fired || _closed) return;
        _fired = true;
        _controller.add(type);
        _finish();
      }));
    }
  }

  /// Broadcast a result to all conditions.
  void inform(FDv2SourceResult result) {
    if (_closed) return;
    for (final condition in _conditions) {
      condition.inform(result);
    }
  }

  /// Close all conditions and the merged stream. Idempotent.
  void close() {
    if (_closed) return;
    _finish();
  }

  void _finish() {
    _closed = true;
    for (final subscription in _subscriptions) {
      subscription.cancel();
    }
    _subscriptions.clear();
    for (final condition in _conditions) {
      condition.close();
    }
    if (!_controller.isClosed) {
      _controller.close();
    }
  }
}

/// Determines which conditions apply to the active synchronizer.
///
/// - With at most one available synchronizer there is nowhere to fall
///   back to, so no conditions are created.
/// - The primary (first available) synchronizer gets only a fallback
///   condition.
/// - A non-primary synchronizer gets both fallback and recovery
///   conditions.
ConditionGroup getConditions({
  required int availableSynchronizerCount,
  required bool isPrimary,
  Duration fallbackTimeout = defaultFallbackTimeout,
  Duration recoveryTimeout = defaultRecoveryTimeout,
}) {
  if (availableSynchronizerCount <= 1) {
    return ConditionGroup(const []);
  }

  if (isPrimary) {
    return ConditionGroup([createFallbackCondition(fallbackTimeout)]);
  }

  return ConditionGroup([
    createFallbackCondition(fallbackTimeout),
    createRecoveryCondition(recoveryTimeout),
  ]);
}
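
The fallback timing rules documented on `createFallbackCondition` (arm on the first interruption, leave the deadline alone on repeats, disarm on a change set) can be tried out with a small stand-in. This is only a sketch: `FallbackDeadline` is a hypothetical class, not part of this commit. It takes plain method calls instead of `FDv2SourceResult` values, and the 100 ms timeout is chosen purely to keep the demo short.

```dart
import 'dart:async';

/// Hypothetical stand-in for the fallback timing rules; not the commit's class.
class FallbackDeadline {
  FallbackDeadline(this.timeout, this.onFire);

  final Duration timeout;
  final void Function() onFire;
  Timer? _timer;

  /// True while a deadline is pending.
  bool get isArmed => _timer?.isActive ?? false;

  /// Arms the deadline on the first interruption; later ones leave it alone.
  void interrupted() {
    _timer ??= Timer(timeout, onFire);
  }

  /// A change set clears the deadline, so a later interruption starts afresh.
  void changeSet() {
    _timer?.cancel();
    _timer = null;
  }
}

Future<void> main() async {
  final clock = Stopwatch()..start();
  final firedAt = Completer<Duration>();
  final deadline = FallbackDeadline(
    const Duration(milliseconds: 100),
    () => firedAt.complete(clock.elapsed),
  );

  deadline.interrupted(); // arms the deadline
  deadline.changeSet(); // data arrived: disarmed again
  print(deadline.isArmed); // false

  deadline.interrupted(); // arms a fresh deadline
  await Future<void>.delayed(const Duration(milliseconds: 60));
  deadline.interrupted(); // repeated interruption: deadline unchanged

  final elapsed = await firedAt.future;
  // Should fire roughly 100 ms after the arming interruption. If repeats
  // extended the deadline, it could not fire before about 160 ms.
  print(elapsed.inMilliseconds);
}
```

Unlike the stand-in, the commit's `_TimedCondition` also tracks a closed state and reports firing through its `events` stream rather than a callback.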