Skip to content

Commit eb9a35f

Browse files
authored
feat: Add the FDv2 data source orchestrator (#307)
> Stacked on #309 (FDv2 source-layer translation); that lands first. ## What this adds The FDv2 orchestrator: the loop that runs the initializer chain to bring the SDK to a usable state, then drives the synchronizer tier with fallback and recovery transitions. Nothing constructs it in production yet (the data system wires it up in a later PR), so behavior is unchanged. ### The orchestration model The orchestrator is subscription-driven rather than future-racing. Each synchronizer run holds exactly two subscriptions — one on the synchronizer's results, one on the merged condition stream — and a single outcome completer decides what happens next: advance to the next synchronizer (fallback fired or terminal error), recycle the current one (goodbye / invalid data), recover to the primary (recovery fired), or stop (shutdown). Everything a run allocates is subscription-scoped and released in its `finally`, so a healthy synchronizer that streams change sets indefinitely holds constant memory: there is no per-result allocation, and no listener is ever attached to a long-lived future (future listeners are only released on completion, which is the leak class this design avoids). Other behaviors: - Initializer results seed the selector; the first applied payload's selector is carried across source instances so reconnects resume with deltas. - A server FDv1-fallback directive engages the FDv1 fallback tier when one is configured (the FDv1 slots start blocked). - Exhausting every source without ever receiving data halts the data system with a shutdown status, so a pending identify fails rather than hanging; exhaustion after data was received logs and stops quietly. - A configurable recycle delay separates goodbye-driven reconnects so a flapping server cannot drive a tight reconnect loop. ### `PayloadEvent` Applied payloads are emitted as `PayloadEvent`s on the data source event stream — the already-parsed updates, rather than the FDv1 JSON string forms. Since `DataSourceEvent` is sealed, the FDv1 consumers (`DataSourceManager`, the polling and streaming data sources) gain no-op cases; FDv1 sources never produce payload events, and routing `PayloadEvent` into the event handler is the data system PR's job. ## Testing Orchestrator tests drive full lifecycles with fake synchronizers and `fake_async`: initializer chain ordering and seeding, fallback after the interrupted timeout, recovery back to the primary, goodbye recycling with the delay, FDv1 fallback engagement, halt-on-exhaustion before first data, shutdown, and a memory-bound test asserting a healthy synchronizer streaming results holds constant memory across thousands of results. Full package suite passes. SDK-2186 <!-- CURSOR_SUMMARY --> --- > [!NOTE] > **Medium Risk** > Touches core flag acquisition orchestration and identify/shutdown semantics, but production behavior is unchanged until the orchestrator is wired in. > > **Overview** > Adds the **FDv2 data source orchestrator** (`FDv2DataSourceOrchestrator`), which implements `DataSource` but is not wired into production yet. It runs the initializer chain until the SDK has basis data, then drives synchronizer tiers with **fallback**, **recovery**, **goodbye/restart recycling**, and optional **FDv1 fallback** engagement. > > Applied FDv2 change sets are surfaced as a new sealed event type **`PayloadEvent`** (typed `ChangeSet`, not FDv1 JSON). FDv1 paths (`DataSourceManager`, polling, streaming) add **no-op** `PayloadEvent` branches so exhaustive switches compile; routing payloads into flag handling is left for a follow-up PR. > > The orchestrator uses **subscription-driven** synchronizer runs (fixed listeners + outcome completer) to avoid per-result memory growth, reports recoverable errors via `DataSourceStatusManager`, and emits **shutdown** `StatusEvent`s only when sources exhaust or fail before any data (so identify does not hang). Extensive orchestrator tests cover lifecycles, halt/recovery edge cases, and a memory soak. > > <sup>Reviewed by [Cursor Bugbot](https://cursor.com/bugbot) for commit 63851e1. Bugbot is set up for automated code reviews on this repo. Configure [here](https://www.cursor.com/dashboard/bugbot).</sup> <!-- /CURSOR_SUMMARY -->
1 parent 7b46ac6 commit eb9a35f

8 files changed

Lines changed: 1046 additions & 0 deletions

File tree

packages/common_client/lib/src/data_sources/data_source.dart

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import 'data_source_status.dart';
2+
import 'fdv2/payload.dart';
23

34
sealed class DataSourceEvent {}
45

@@ -10,6 +11,16 @@ final class DataEvent implements DataSourceEvent {
1011
DataEvent(this.type, this.data, {this.environmentId});
1112
}
1213

14+
/// An FDv2 change set produced by the data source orchestrator. Carries
15+
/// typed flag descriptors translated at acquisition time, not the FDv1
16+
/// JSON string forms.
17+
final class PayloadEvent implements DataSourceEvent {
18+
final ChangeSet changeSet;
19+
final String? environmentId;
20+
21+
PayloadEvent(this.changeSet, {this.environmentId});
22+
}
23+
1324
final class StatusEvent implements DataSourceEvent {
1425
ErrorKind kind;
1526
num? statusCode;

packages/common_client/lib/src/data_sources/data_source_manager.dart

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,10 @@ final class DataSourceManager {
159159
// Only need to complete this the first time.
160160
_identifyCompleter = null;
161161
return handled;
162+
case PayloadEvent():
163+
// The FDv1 data sources this manager runs never produce FDv2
164+
// payload events.
165+
return MessageStatus.messageHandled;
162166
case StatusEvent():
163167
if (_identifyCompleter != null && !_identifyCompleter!.isCompleted) {
164168
_identifyCompleter!.completeError(Exception(event.message));

0 commit comments

Comments
 (0)