Skip to content

Commit 1c35867

Browse files
authored
feat: honor the FDv1 fallback directive on success, error, and goodbye (#312)
Stacked on #311 (the `SseHttpError` surfacing this depends on). ## What Honors the FDv1 fallback directive across every way the server can deliver it: - **Successful connection:** the directive is emitted with the basis change set, so the streamed payload is applied *before* the SDK falls back — previously the basis was dropped the moment the header was seen. - **Any error response carrying the header** (`SseHttpError`, recoverable or not): the streaming source closes the connection — which stops the client's own retry — and routes to the fallback tier. - **`goodbye` event with `protocolFallbackTTL`:** treated as an in-band fallback directive, for transports that cannot read response headers. A single helper parses the directive (presence + TTL) from response headers, used for both the successful and error paths; the goodbye path reads its TTL in-band. The streaming source maps `SseHttpError` by its `recoverable` flag: recoverable → interrupted (the client retries), unrecoverable → terminal. The FDv1 streaming source ignores recoverable errors so a transient 5xx no longer shuts it down. When a fallback tier is configured the orchestrator engages it. When none is configured, the SDK stays interrupted and retries FDv2 after the directive's TTL (default 1 hour; a TTL of `0` means remain paused with no retry) rather than halting or reconnecting immediately. Source results carry the fallback TTL. ## Tests - Orchestrator: apply-then-engage from a directive-bearing change set, and the three no-fallback cases (finite TTL retries, absent TTL defers, zero TTL pauses). - `streaming_base`: defer-on-success, the TTL header, the goodbye/TTL directive, and the `SseHttpError` paths (recoverable → interrupted/stays open, unrecoverable → terminal/closes, directive → terminal+TTL regardless of recoverability). `protocol_handler` covers `protocolFallbackTTL` parsing. - v3 contract harness `fdv1-fallback` suite passes end-to-end (816 total, 792 ran, exit 0), with no regression. The contract-test-service capability + `fdv1Fallback` config wiring that exercises this in the harness lives on the e2e branch and will land with the v3 contract-tests PR. <!-- CURSOR_SUMMARY --> --- > [!NOTE] > **Medium Risk** > Changes core FDv2 connection lifecycle, tier selection, and flag delivery ordering when servers send fallback signals; behavior shifts are broad but covered by new unit/contract tests. > > **Overview** > Implements end-to-end handling when the server asks the SDK to fall back from FDv2 to FDv1, including **TTL-aware retry** when no FDv1 tier is configured. > > **Directive parsing and propagation:** Adds shared `readFallbackDirective` for `x-ld-fd-fallback` / `x-ld-fd-fallback-ttl`, threads `fdv1FallbackTtl` through `FDv2SourceResult`, and parses in-band `protocolFallbackTTL` on goodbye events. > > **Streaming / polling behavior:** On a **successful** stream open, the directive is **deferred** and emitted with the next change set so the basis payload is applied before fallback (replacing immediate terminal error on connect). **HTTP errors** with the header, and **goodbye** with TTL or a pending header, surface as terminal fallback results so the orchestrator does not recycle past the directive. Polling mirrors goodbye/header TTL stamping. FDv2 streaming maps `SseHttpError` by recoverability; legacy FDv1 streaming **ignores recoverable** SSE HTTP errors so transient 5xx no longer shuts the source down. > > **Orchestrator:** Classifies directives into engage FDv1 tier, defer retry (no tier: wait TTL—default 1h, zero = pause indefinitely), or none; schedules recycle via `_pendingRetryDelay` and interruptible `_delay` on stop. > > <sup>Reviewed by [Cursor Bugbot](https://cursor.com/bugbot) for commit 74e7c4b. Bugbot is set up for automated code reviews on this repo. Configure [here](https://www.cursor.com/dashboard/bugbot).</sup> <!-- /CURSOR_SUMMARY -->
1 parent 0707b60 commit 1c35867

12 files changed

Lines changed: 688 additions & 92 deletions
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/// The FDv1 fallback directive parsed from a connection's response
2+
/// headers. Its presence means the server asked the SDK to fall back;
3+
/// [ttl] is how long to remain on the fallback before retrying FDv2
4+
/// (null when the server gave no TTL; [Duration.zero] means indefinitely).
5+
final class FallbackDirective {
6+
final Duration? ttl;
7+
const FallbackDirective(this.ttl);
8+
}
9+
10+
/// Reads the FDv1 fallback directive from response headers, or null when
11+
/// the `x-ld-fd-fallback` header is not `"true"`. The single place that
12+
/// interprets these headers, shared by the streaming and polling sources.
13+
FallbackDirective? readFallbackDirective(Map<String, String> headers) {
14+
if (headers['x-ld-fd-fallback']?.toLowerCase() != 'true') {
15+
return null;
16+
}
17+
final raw = headers['x-ld-fd-fallback-ttl'];
18+
final seconds = raw == null ? null : int.tryParse(raw);
19+
return FallbackDirective(seconds == null ? null : Duration(seconds: seconds));
20+
}

packages/common_client/lib/src/data_sources/fdv2/orchestrator.dart

Lines changed: 110 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,20 @@ enum _SynchronizerOutcome {
3434
stop,
3535
}
3636

37+
/// How the orchestrator responds to server-directed actions.
38+
enum _DirectiveAction {
39+
/// No directive present; carry on normally.
40+
none,
41+
42+
/// An FDv1 fallback tier exists and was engaged; advance to it.
43+
engageFdv1Fallback,
44+
45+
/// The directive was received but no FDv1 fallback tier is configured.
46+
/// The SDK stays interrupted and retries FDv2 after the directive's TTL
47+
/// rather than removing the source.
48+
deferRetry,
49+
}
50+
3751
/// The FDv2 data source orchestrator.
3852
///
3953
/// Runs the initializer chain to bring the SDK to a usable state, then
@@ -64,6 +78,18 @@ final class FDv2DataSourceOrchestrator implements DataSource {
6478
bool _emittedPayload = false;
6579
bool _initialized = false;
6680

81+
/// Default wait before retrying FDv2 after a fallback directive arrives
82+
/// with no FDv1 fallback tier configured and no server-provided TTL.
83+
static const Duration _defaultFallbackRetryInterval = Duration(hours: 1);
84+
85+
/// Overrides the next recycle delay when set (consumed once), used to
86+
/// honor a fallback directive's retry TTL.
87+
Duration? _pendingRetryDelay;
88+
89+
/// Completes when [stop] is called so a long interruptible wait (e.g. a
90+
/// fallback retry delay) wakes promptly on shutdown.
91+
final Completer<void> _stopCompleter = Completer<void>();
92+
6793
/// True when the only sources are cache initializers (no synchronizers).
6894
/// Such a system must still reach a usable state on a cache miss, so an
6995
/// empty payload is emitted when no data was produced.
@@ -117,6 +143,7 @@ final class FDv2DataSourceOrchestrator implements DataSource {
117143
void stop() {
118144
if (_closed) return;
119145
_closed = true;
146+
if (!_stopCompleter.isCompleted) _stopCompleter.complete();
120147
_sourceManager.close();
121148
// Wake the active synchronizer run so it can observe the closed
122149
// state.
@@ -190,22 +217,52 @@ final class FDv2DataSourceOrchestrator implements DataSource {
190217
_controller.add(InitializedEvent());
191218
}
192219

193-
/// True when the source indicated an FDv1 fallback directive and a
194-
/// fallback tier exists to engage. The directive is ignored when the
220+
/// Classifies a server-directed FDv1 fallback and, for
221+
/// [_DirectiveAction.engageFdv1Fallback], makes the source manager switch
222+
/// tiers. The directive is ignored ([_DirectiveAction.none]) when the
195223
/// FDv1 fallback synchronizer is already the active source, so it cannot
196-
/// loop (the fallback source also never re-asserts the directive).
197-
bool _handleFdv1Fallback(FDv2SourceResult result) {
198-
if (result.fdv1Fallback &&
199-
_sourceManager.hasFdv1FallbackConfigured &&
200-
!_sourceManager.isCurrentSynchronizerFdv1Fallback) {
224+
/// loop (the fallback source also never re-asserts the directive). When
225+
/// there is no FDv1 fallback tier we defer retrying FDv2
226+
/// ([_DirectiveAction.deferRetry]).
227+
_DirectiveAction _processDirective(FDv2SourceResult result) {
228+
if (!result.fdv1Fallback ||
229+
_sourceManager.isCurrentSynchronizerFdv1Fallback) {
230+
return _DirectiveAction.none;
231+
}
232+
if (_sourceManager.hasFdv1FallbackConfigured) {
201233
_logger.warn('Server directed fallback to FDv1; engaging the FDv1 '
202234
'fallback synchronizer.');
203235
_sourceManager.engageFdv1Fallback();
204-
return true;
236+
return _DirectiveAction.engageFdv1Fallback;
237+
}
238+
return _DirectiveAction.deferRetry;
239+
}
240+
241+
/// Honors a fallback directive that has no FDv1 tier to engage: the SDK
242+
/// stays interrupted and schedules an FDv2 retry after the directive's
243+
/// TTL. A TTL of zero means remain indefinitely (no retry); an absent
244+
/// TTL uses [_defaultFallbackRetryInterval].
245+
void _scheduleFallbackRetry(
246+
Duration? ttl, void Function(_SynchronizerOutcome) resolve) {
247+
if (ttl == Duration.zero) {
248+
_logger.warn('Server directed FDv1 fallback with no fallback tier '
249+
'configured and an indefinite TTL; FDv2 updates are paused.');
250+
resolve(_SynchronizerOutcome.stop);
251+
return;
205252
}
206-
return false;
253+
final delay = ttl ?? _defaultFallbackRetryInterval;
254+
_logger.warn('Server directed FDv1 fallback but no fallback tier is '
255+
'configured; staying interrupted and retrying FDv2 in '
256+
'${delay.inSeconds}s.');
257+
_pendingRetryDelay = delay;
258+
resolve(_SynchronizerOutcome.recycle);
207259
}
208260

261+
/// Waits [d], returning early if the orchestrator is stopped so a long
262+
/// fallback-retry delay does not pin the run open past shutdown.
263+
Future<void> _delay(Duration d) =>
264+
Future.any([Future<void>.delayed(d), _stopCompleter.future]);
265+
209266
Future<void> _runInitializers() async {
210267
var errorDuringInit = false;
211268
var dataReceived = false;
@@ -225,19 +282,26 @@ final class FDv2DataSourceOrchestrator implements DataSource {
225282
_emitPayload(result);
226283
dataReceived = true;
227284

228-
if (_handleFdv1Fallback(result)) {
229-
// Data was received but the server directed FDv1 fallback;
230-
// move on to synchronizers where the fallback tier runs and
231-
// its first change set completes initialization.
232-
return;
233-
}
234-
235-
if (result.changeSet.selector.isNotEmpty) {
236-
// A selector means a complete, server-versioned payload:
237-
// initialization is done. A selector-less payload (e.g. cache)
238-
// is applied, but we keep initializing toward network data.
239-
_emitInitialized();
240-
return;
285+
switch (_processDirective(result)) {
286+
case _DirectiveAction.engageFdv1Fallback:
287+
// Data received but the server directed FDv1 fallback;
288+
// move on to synchronizers where the fallback tier runs
289+
// and its first change set completes initialization.
290+
return;
291+
case _DirectiveAction.deferRetry:
292+
// An initializer is one-shot and cannot retry itself;
293+
// let the chain exhaust and leave retry timing to the
294+
// synchronizer tier.
295+
break;
296+
case _DirectiveAction.none:
297+
if (result.changeSet.selector.isNotEmpty) {
298+
// A selector means a complete, server-versioned payload:
299+
// initialization is done. A selector-less payload (e.g.
300+
// cache) is applied, but we keep initializing toward
301+
// network data.
302+
_emitInitialized();
303+
return;
304+
}
241305
}
242306
}
243307
case StatusResult():
@@ -253,7 +317,8 @@ final class FDv2DataSourceOrchestrator implements DataSource {
253317
case SourceState.goodbye:
254318
break;
255319
}
256-
if (_handleFdv1Fallback(result)) {
320+
if (_processDirective(result) ==
321+
_DirectiveAction.engageFdv1Fallback) {
257322
return;
258323
}
259324
}
@@ -298,8 +363,10 @@ final class FDv2DataSourceOrchestrator implements DataSource {
298363
while (!_closed) {
299364
if (recycleCurrent) {
300365
recycleCurrent = false;
301-
if (_recycleDelay > Duration.zero) {
302-
await Future<void>.delayed(_recycleDelay);
366+
final delay = _pendingRetryDelay ?? _recycleDelay;
367+
_pendingRetryDelay = null;
368+
if (delay > Duration.zero) {
369+
await _delay(delay);
303370
if (_closed) return;
304371
}
305372
synchronizer = _sourceManager.recreateCurrentSynchronizer();
@@ -393,12 +460,15 @@ final class FDv2DataSourceOrchestrator implements DataSource {
393460
_logger.warn('Synchronizer terminal error: '
394461
'${result.message ?? 'unknown error'}');
395462
_reportTransientError(result);
396-
if (_handleFdv1Fallback(result)) {
397-
resolve(_SynchronizerOutcome.advance);
398-
return;
463+
switch (_processDirective(result)) {
464+
case _DirectiveAction.engageFdv1Fallback:
465+
resolve(_SynchronizerOutcome.advance);
466+
case _DirectiveAction.deferRetry:
467+
_scheduleFallbackRetry(result.fdv1FallbackTtl, resolve);
468+
case _DirectiveAction.none:
469+
_sourceManager.blockCurrentSynchronizer();
470+
resolve(_SynchronizerOutcome.advance);
399471
}
400-
_sourceManager.blockCurrentSynchronizer();
401-
resolve(_SynchronizerOutcome.advance);
402472
return;
403473
case SourceState.shutdown:
404474
// A synchronizer that shuts itself down before the system
@@ -421,8 +491,16 @@ final class FDv2DataSourceOrchestrator implements DataSource {
421491
}
422492
}
423493

424-
if (_handleFdv1Fallback(result)) {
425-
resolve(_SynchronizerOutcome.advance);
494+
switch (_processDirective(result)) {
495+
case _DirectiveAction.engageFdv1Fallback:
496+
// A change set carrying the directive (e.g. a successful stream
497+
// that delivered its basis then asked us to fall back) has been
498+
// applied above; now advance to the fallback tier.
499+
resolve(_SynchronizerOutcome.advance);
500+
case _DirectiveAction.deferRetry:
501+
_scheduleFallbackRetry(result.fdv1FallbackTtl, resolve);
502+
case _DirectiveAction.none:
503+
break;
426504
}
427505
}, onDone: () {
428506
if (outcome.isCompleted || _closed) return;

packages/common_client/lib/src/data_sources/fdv2/polling_base.dart

Lines changed: 37 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import 'dart:convert';
44
import 'package:http/http.dart' as http;
55
import 'package:launchdarkly_dart_common/launchdarkly_dart_common.dart';
66

7+
import 'fallback_directive.dart';
78
import 'flag_eval_mapper.dart';
89
import 'payload.dart';
910
import 'protocol_handler.dart';
@@ -27,12 +28,14 @@ import 'source_result.dart';
2728
/// through an [FDv2ProtocolHandler]. The first emitted action
2829
/// determines the result.
2930
///
30-
/// `x-ld-fd-fallback: true` is treated as an annotation on whatever
31-
/// result the response would otherwise produce: the body is still
32-
/// parsed and used, the 304 is still treated as no-op, errors are
33-
/// still classified by status code, and `fdv1Fallback: true` is
34-
/// stamped on the resulting [FDv2SourceResult]. The orchestrator can
35-
/// consume the data and transition to FDv1 in the same step.
31+
/// The `x-ld-fd-fallback` directive (read via [readFallbackDirective],
32+
/// shared with the streaming source) is treated as an annotation on
33+
/// whatever result the response would otherwise produce: the body is
34+
/// still parsed and used, the 304 is still treated as no-op, errors are
35+
/// still classified by status code, and `fdv1Fallback: true` plus the
36+
/// fallback TTL are stamped on the resulting [FDv2SourceResult]. The
37+
/// orchestrator can consume the data and transition to FDv1 in the same
38+
/// step.
3639
final class FDv2PollingBase {
3740
final LDLogger _logger;
3841
final FDv2Requestor _requestor;
@@ -65,9 +68,9 @@ final class FDv2PollingBase {
6568
}
6669

6770
FDv2SourceResult _processResponse(RequestorResponse response) {
68-
// Match `x-ld-fd-fallback` case-insensitively.
69-
final fdv1Fallback =
70-
response.headers['x-ld-fd-fallback']?.toLowerCase() == 'true';
71+
final directive = readFallbackDirective(response.headers);
72+
final fdv1Fallback = directive != null;
73+
final fdv1FallbackTtl = directive?.ttl;
7174
final environmentId = response.headers['x-ld-envid'];
7275

7376
// 304 Not Modified means the SDK's cached data is confirmed current.
@@ -78,6 +81,7 @@ final class FDv2PollingBase {
7881
freshness: _now(),
7982
persist: true,
8083
fdv1Fallback: fdv1Fallback,
84+
fdv1FallbackTtl: fdv1FallbackTtl,
8185
);
8286
}
8387

@@ -89,27 +93,31 @@ final class FDv2PollingBase {
8993
statusCode: response.status,
9094
message: message,
9195
fdv1Fallback: fdv1Fallback,
96+
fdv1FallbackTtl: fdv1FallbackTtl,
9297
);
9398
}
9499
_logger.error('$message; will not retry');
95100
return FDv2SourceResults.terminalError(
96101
statusCode: response.status,
97102
message: message,
98103
fdv1Fallback: fdv1Fallback,
104+
fdv1FallbackTtl: fdv1FallbackTtl,
99105
);
100106
}
101107

102108
return _parseBody(
103109
response,
104110
environmentId: environmentId,
105111
fdv1Fallback: fdv1Fallback,
112+
fdv1FallbackTtl: fdv1FallbackTtl,
106113
);
107114
}
108115

109116
FDv2SourceResult _parseBody(
110117
RequestorResponse response, {
111118
String? environmentId,
112119
required bool fdv1Fallback,
120+
required Duration? fdv1FallbackTtl,
113121
}) {
114122
// The whole parse path is wrapped: jsonDecode plus the structural
115123
// casts inside FDv2EventsCollection.fromJson and the per-event
@@ -122,6 +130,7 @@ final class FDv2PollingBase {
122130
statusCode: response.status,
123131
message: 'Polling response was not a JSON object',
124132
fdv1Fallback: fdv1Fallback,
133+
fdv1FallbackTtl: fdv1FallbackTtl,
125134
);
126135
}
127136

@@ -145,21 +154,33 @@ final class FDv2PollingBase {
145154
freshness: _now(),
146155
persist: true,
147156
fdv1Fallback: fdv1Fallback,
157+
fdv1FallbackTtl: fdv1FallbackTtl,
148158
);
149-
case ActionGoodbye(:final reason):
150-
return FDv2SourceResults.goodbyeResult(
151-
message: reason,
152-
fdv1Fallback: fdv1Fallback,
153-
);
159+
case ActionGoodbye(:final reason, :final protocolFallbackTtl):
160+
// A goodbye can itself carry a fallback directive: in-band via
161+
// protocolFallbackTtl, or via the response header. The
162+
// orchestrator's goodbye path recycles without consulting the
163+
// directive, so surface it as a terminal fallback result (as the
164+
// streaming source does) rather than an ordinary goodbye.
165+
if (protocolFallbackTtl != null || fdv1Fallback) {
166+
return FDv2SourceResults.terminalError(
167+
message: reason,
168+
fdv1Fallback: true,
169+
fdv1FallbackTtl: protocolFallbackTtl ?? fdv1FallbackTtl,
170+
);
171+
}
172+
return FDv2SourceResults.goodbyeResult(message: reason);
154173
case ActionServerError(:final reason):
155174
return FDv2SourceResults.interrupted(
156175
message: reason,
157176
fdv1Fallback: fdv1Fallback,
177+
fdv1FallbackTtl: fdv1FallbackTtl,
158178
);
159179
case ActionError(:final message):
160180
return FDv2SourceResults.interrupted(
161181
message: message,
162182
fdv1Fallback: fdv1Fallback,
183+
fdv1FallbackTtl: fdv1FallbackTtl,
163184
);
164185
case ActionNone():
165186
// Continue accumulating events until a payload-transferred or
@@ -175,6 +196,7 @@ final class FDv2PollingBase {
175196
statusCode: response.status,
176197
message: 'Polling response did not include a complete payload',
177198
fdv1Fallback: fdv1Fallback,
199+
fdv1FallbackTtl: fdv1FallbackTtl,
178200
);
179201
} catch (err, stack) {
180202
// Log only the type at error level (not the message — `jsonDecode`
@@ -187,6 +209,7 @@ final class FDv2PollingBase {
187209
statusCode: response.status,
188210
message: 'Polling response body was malformed',
189211
fdv1Fallback: fdv1Fallback,
212+
fdv1FallbackTtl: fdv1FallbackTtl,
190213
);
191214
}
192215
}

0 commit comments

Comments
 (0)