Skip to content

Commit 43d899e

Browse files
authored
feat: Add FDv2 source manager (#298)
## What this adds The source manager tracks which FDv2 data source is active and handles source transitions for the orchestrator (a later PR). Nothing consumes it yet, so behavior is unchanged. It manages two tiers with different lifecycles: - **Initializers** run once each, in order, and exhaust: each call to `nextInitializer` closes the previous active source and produces the next one until the list runs out. The orchestrator uses this to bring the SDK to a usable state from cache or a poll. - **Synchronizer slots** cycle rather than exhaust: `nextAvailableSynchronizer` scans forward from the current position with wrap-around, skipping blocked slots. A slot is blocked after a terminal error (`blockCurrentSynchronizer`), the scan position resets to the primary for recovery (`resetSynchronizerIndex`), and `recreateCurrentSynchronizer` produces a fresh instance of the current slot without advancing — used when a source must drop its connection and re-establish it (goodbye, restart). `isPrimarySynchronizer` and `availableSynchronizerCount` feed the fallback/recovery condition selection (#297). **FDv1 fallback slots** start blocked and are only activated by a server fallback directive: `engageFdv1Fallback` makes the FDv1 slots available and blocks the FDv2 tier, disabling it rather than removing it so the configuration survives for a later recovery to FDv2. Every transition closes the previously active source, and `close` shuts the manager down so no further sources can be created. ## Testing Unit tests cover initializer ordering and exhaustion, synchronizer cycling with wrap-around, blocked-slot skipping, the all-blocked case, recovery reset, in-place re-creation, FDv1 fallback engagement (FDv2 tier disabled, FDv1 tier activated), and that close prevents further source creation and closes the active source. SDK-2186 <!-- CURSOR_SUMMARY --> --- > [!NOTE] > **Low Risk** > New isolated module with tests only; no integration into existing data source paths until a follow-up PR. > > **Overview** > Adds a new FDv2 **`SourceManager`** (plus **`SynchronizerSlot`** / slot state) that will drive orchestrator source transitions. It is **not wired in yet**, so runtime behavior is unchanged. > > **Initializers** are advanced sequentially via `nextInitializer()` (each step closes the prior active source) until the factory list is exhausted. **Synchronizers** live in slots that **rotate** with `nextAvailableSynchronizer()` (forward scan with wrap, skip blocked slots). Recovery helpers include `blockCurrentSynchronizer()`, `resetSynchronizerIndex()`, and `recreateCurrentSynchronizer()` for reconnect without changing slot. **`engageFdv1Fallback()`** blocks FDv2 slots and unblocks FDv1 fallback slots (which start blocked). Exposes `isPrimarySynchronizer`, `availableSynchronizerCount`, and `hasFdv1Fallback` for later fallback logic. > > Unit tests cover ordering, cycling, blocking, FDv1 engagement, recreate, and shutdown. > > <sup>Reviewed by [Cursor Bugbot](https://cursor.com/bugbot) for commit 1716d97. Bugbot is set up for automated code reviews on this repo. Configure [here](https://www.cursor.com/dashboard/bugbot).</sup> <!-- /CURSOR_SUMMARY -->
1 parent d3e6d39 commit 43d899e

2 files changed

Lines changed: 356 additions & 0 deletions

File tree

Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
import 'entry_factories.dart';
2+
import 'source.dart';
3+
4+
/// State of a synchronizer slot.
5+
enum SynchronizerSlotState {
6+
/// Can be selected for use.
7+
available,
8+
9+
/// Cannot be selected (terminal error, or an FDv1 fallback slot that has
10+
/// not been activated).
11+
blocked,
12+
}
13+
14+
/// A slot in the synchronizer list, wrapping a factory with state.
15+
final class SynchronizerSlot {
16+
final SynchronizerFactory factory;
17+
18+
/// True when this slot is the FDv1 fallback adapter. FDv1 slots start
19+
/// blocked and are only activated by a server fallback directive.
20+
final bool isFdv1Fallback;
21+
22+
SynchronizerSlotState state;
23+
24+
SynchronizerSlot({
25+
required this.factory,
26+
this.isFdv1Fallback = false,
27+
SynchronizerSlotState? initialState,
28+
}) : state = initialState ??
29+
(isFdv1Fallback
30+
? SynchronizerSlotState.blocked
31+
: SynchronizerSlotState.available);
32+
}
33+
34+
/// Manages the state of initializers and synchronizers, tracks which
35+
/// source is active, and handles source transitions for the orchestrator.
36+
final class SourceManager {
37+
final List<InitializerFactory> _initializerFactories;
38+
final List<SynchronizerSlot> _synchronizerSlots;
39+
final SelectorGetter _selectorGetter;
40+
41+
Initializer? _activeInitializer;
42+
Synchronizer? _activeSynchronizer;
43+
int _initializerIndex = -1;
44+
int _synchronizerIndex = -1;
45+
bool _shutdown = false;
46+
47+
SourceManager({
48+
required List<InitializerFactory> initializerFactories,
49+
required List<SynchronizerSlot> synchronizerSlots,
50+
required SelectorGetter selectorGetter,
51+
}) : _initializerFactories = initializerFactories,
52+
_synchronizerSlots = synchronizerSlots,
53+
_selectorGetter = selectorGetter;
54+
55+
bool get isShutdown => _shutdown;
56+
57+
void _closeActiveSource() {
58+
_activeInitializer?.close();
59+
_activeInitializer = null;
60+
_activeSynchronizer?.close();
61+
_activeSynchronizer = null;
62+
}
63+
64+
int _firstAvailableIndex() => _synchronizerSlots
65+
.indexWhere((slot) => slot.state == SynchronizerSlotState.available);
66+
67+
/// Get the next initializer and set it as the active source. Closes the
68+
/// previous active source. Returns null when all initializers are
69+
/// exhausted.
70+
Initializer? nextInitializer() {
71+
if (_shutdown) return null;
72+
73+
_initializerIndex += 1;
74+
if (_initializerIndex >= _initializerFactories.length) {
75+
return null;
76+
}
77+
78+
_closeActiveSource();
79+
final initializer =
80+
_initializerFactories[_initializerIndex].create(_selectorGetter);
81+
_activeInitializer = initializer;
82+
return initializer;
83+
}
84+
85+
/// Get the next available (non-blocked) synchronizer and set it as the
86+
/// active source, scanning forward from the current position and
87+
/// wrapping around. Closes the previous active source. Returns null
88+
/// when no synchronizer is available.
89+
Synchronizer? nextAvailableSynchronizer() {
90+
if (_shutdown || _synchronizerSlots.isEmpty) return null;
91+
92+
var visited = 0;
93+
while (visited < _synchronizerSlots.length) {
94+
_synchronizerIndex += 1;
95+
if (_synchronizerIndex >= _synchronizerSlots.length) {
96+
_synchronizerIndex = 0;
97+
}
98+
99+
final candidate = _synchronizerSlots[_synchronizerIndex];
100+
if (candidate.state == SynchronizerSlotState.available) {
101+
_closeActiveSource();
102+
final synchronizer = candidate.factory.create(_selectorGetter);
103+
_activeSynchronizer = synchronizer;
104+
return synchronizer;
105+
}
106+
visited += 1;
107+
}
108+
109+
return null;
110+
}
111+
112+
/// Close the active synchronizer and create a fresh instance from the
113+
/// current slot, without advancing the scan position. Used when a
114+
/// source must drop its connection and re-establish it (goodbye,
115+
/// invalid data). Returns null if the current slot is not available
116+
/// or no synchronizer has been started yet.
117+
Synchronizer? recreateCurrentSynchronizer() {
118+
if (_shutdown ||
119+
_synchronizerIndex < 0 ||
120+
_synchronizerIndex >= _synchronizerSlots.length) {
121+
return null;
122+
}
123+
final slot = _synchronizerSlots[_synchronizerIndex];
124+
if (slot.state != SynchronizerSlotState.available) {
125+
return null;
126+
}
127+
_closeActiveSource();
128+
final synchronizer = slot.factory.create(_selectorGetter);
129+
_activeSynchronizer = synchronizer;
130+
return synchronizer;
131+
}
132+
133+
/// Mark the current synchronizer as blocked (e.g. after a terminal
134+
/// error).
135+
void blockCurrentSynchronizer() {
136+
if (_synchronizerIndex >= 0 &&
137+
_synchronizerIndex < _synchronizerSlots.length) {
138+
_synchronizerSlots[_synchronizerIndex].state =
139+
SynchronizerSlotState.blocked;
140+
}
141+
}
142+
143+
/// Reset the synchronizer scan position so the next call to
144+
/// [nextAvailableSynchronizer] starts from the beginning.
145+
void resetSynchronizerIndex() {
146+
_synchronizerIndex = -1;
147+
}
148+
149+
/// Block all non-FDv1 synchronizers and unblock FDv1 synchronizers.
150+
void engageFdv1Fallback() {
151+
for (final slot in _synchronizerSlots) {
152+
slot.state = slot.isFdv1Fallback
153+
? SynchronizerSlotState.available
154+
: SynchronizerSlotState.blocked;
155+
}
156+
_synchronizerIndex = -1;
157+
}
158+
159+
/// True if the current synchronizer is the first available (primary).
160+
bool get isPrimarySynchronizer =>
161+
_synchronizerIndex == _firstAvailableIndex();
162+
163+
/// Count of synchronizers in the available state.
164+
int get availableSynchronizerCount => _synchronizerSlots
165+
.where((slot) => slot.state == SynchronizerSlotState.available)
166+
.length;
167+
168+
/// True if any synchronizer slot is marked as an FDv1 fallback.
169+
bool get hasFdv1Fallback =>
170+
_synchronizerSlots.any((slot) => slot.isFdv1Fallback);
171+
172+
/// Close the active source and mark the manager as shut down.
173+
void close() {
174+
_shutdown = true;
175+
_closeActiveSource();
176+
}
177+
}
Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
import 'dart:async';
2+
3+
import 'package:launchdarkly_common_client/src/data_sources/fdv2/entry_factories.dart';
4+
import 'package:launchdarkly_common_client/src/data_sources/fdv2/selector.dart';
5+
import 'package:launchdarkly_common_client/src/data_sources/fdv2/source.dart';
6+
import 'package:launchdarkly_common_client/src/data_sources/fdv2/source_manager.dart';
7+
import 'package:launchdarkly_common_client/src/data_sources/fdv2/source_result.dart';
8+
import 'package:test/test.dart';
9+
10+
final class RecordingInitializer implements Initializer {
11+
bool closed = false;
12+
13+
@override
14+
Future<FDv2SourceResult> run() async =>
15+
FDv2SourceResults.shutdown(message: 'unused');
16+
17+
@override
18+
void close() {
19+
closed = true;
20+
}
21+
}
22+
23+
final class RecordingSynchronizer implements Synchronizer {
24+
final int slotIndex;
25+
bool closed = false;
26+
27+
RecordingSynchronizer(this.slotIndex);
28+
29+
@override
30+
Stream<FDv2SourceResult> get results => const Stream.empty();
31+
32+
@override
33+
void close() {
34+
closed = true;
35+
}
36+
}
37+
38+
SynchronizerSlot _slot(int index, List<RecordingSynchronizer> created,
39+
{bool isFdv1Fallback = false}) {
40+
return SynchronizerSlot(
41+
isFdv1Fallback: isFdv1Fallback,
42+
factory: SynchronizerFactory(create: (_) {
43+
final synchronizer = RecordingSynchronizer(index);
44+
created.add(synchronizer);
45+
return synchronizer;
46+
}),
47+
);
48+
}
49+
50+
SourceManager _manager(
51+
{List<InitializerFactory> initializers = const [],
52+
List<SynchronizerSlot> slots = const []}) {
53+
return SourceManager(
54+
initializerFactories: initializers,
55+
synchronizerSlots: slots,
56+
selectorGetter: () => Selector.empty,
57+
);
58+
}
59+
60+
void main() {
61+
test('initializers are produced once each, in order, and exhaust', () {
62+
final created = <RecordingInitializer>[];
63+
InitializerFactory factory() => InitializerFactory(create: (_) {
64+
final initializer = RecordingInitializer();
65+
created.add(initializer);
66+
return initializer;
67+
});
68+
69+
final manager = _manager(initializers: [factory(), factory()]);
70+
71+
expect(manager.nextInitializer(), isNotNull);
72+
expect(manager.nextInitializer(), isNotNull);
73+
expect(manager.nextInitializer(), isNull);
74+
expect(created, hasLength(2));
75+
expect(created.first.closed, isTrue,
76+
reason: 'starting the second source closes the first');
77+
});
78+
79+
test('synchronizers cycle through available slots and wrap around', () {
80+
final created = <RecordingSynchronizer>[];
81+
final manager = _manager(slots: [_slot(0, created), _slot(1, created)]);
82+
83+
expect(manager.nextAvailableSynchronizer(), isNotNull);
84+
expect(created.last.slotIndex, 0);
85+
expect(manager.isPrimarySynchronizer, isTrue);
86+
87+
expect(manager.nextAvailableSynchronizer(), isNotNull);
88+
expect(created.last.slotIndex, 1);
89+
expect(manager.isPrimarySynchronizer, isFalse);
90+
91+
expect(manager.nextAvailableSynchronizer(), isNotNull);
92+
expect(created.last.slotIndex, 0, reason: 'cycling wraps to the start');
93+
});
94+
95+
test('a blocked slot is skipped', () {
96+
final created = <RecordingSynchronizer>[];
97+
final manager = _manager(slots: [_slot(0, created), _slot(1, created)]);
98+
99+
manager.nextAvailableSynchronizer();
100+
manager.blockCurrentSynchronizer();
101+
expect(manager.availableSynchronizerCount, 1);
102+
103+
manager.nextAvailableSynchronizer();
104+
expect(created.last.slotIndex, 1);
105+
manager.nextAvailableSynchronizer();
106+
expect(created.last.slotIndex, 1,
107+
reason: 'only the unblocked slot remains in rotation');
108+
});
109+
110+
test('all slots blocked yields null', () {
111+
final created = <RecordingSynchronizer>[];
112+
final manager = _manager(slots: [_slot(0, created)]);
113+
114+
manager.nextAvailableSynchronizer();
115+
manager.blockCurrentSynchronizer();
116+
expect(manager.nextAvailableSynchronizer(), isNull);
117+
});
118+
119+
test('resetSynchronizerIndex returns rotation to the first slot', () {
120+
final created = <RecordingSynchronizer>[];
121+
final manager = _manager(slots: [_slot(0, created), _slot(1, created)]);
122+
123+
manager.nextAvailableSynchronizer();
124+
manager.nextAvailableSynchronizer();
125+
manager.resetSynchronizerIndex();
126+
manager.nextAvailableSynchronizer();
127+
expect(created.last.slotIndex, 0);
128+
});
129+
130+
test(
131+
'recreateCurrentSynchronizer produces a fresh instance of the same '
132+
'slot', () {
133+
final created = <RecordingSynchronizer>[];
134+
final manager = _manager(slots: [_slot(0, created), _slot(1, created)]);
135+
136+
manager.nextAvailableSynchronizer();
137+
final recreated = manager.recreateCurrentSynchronizer();
138+
expect(recreated, isNotNull);
139+
expect(created, hasLength(2));
140+
expect(created.last.slotIndex, 0);
141+
expect(created.first.closed, isTrue);
142+
expect(manager.isPrimarySynchronizer, isTrue);
143+
});
144+
145+
test('FDv1 fallback slots start blocked and engage on fallback', () {
146+
final fdv2Created = <RecordingSynchronizer>[];
147+
final fdv1Created = <RecordingSynchronizer>[];
148+
final manager = _manager(slots: [
149+
_slot(0, fdv2Created),
150+
_slot(1, fdv1Created, isFdv1Fallback: true),
151+
]);
152+
153+
expect(manager.hasFdv1Fallback, isTrue);
154+
expect(manager.availableSynchronizerCount, 1);
155+
156+
manager.nextAvailableSynchronizer();
157+
expect(fdv2Created, hasLength(1));
158+
159+
manager.engageFdv1Fallback();
160+
expect(manager.availableSynchronizerCount, 1);
161+
162+
manager.nextAvailableSynchronizer();
163+
expect(fdv1Created, hasLength(1));
164+
expect(fdv2Created, hasLength(1),
165+
reason: 'the FDv2 tier is disabled after fallback');
166+
});
167+
168+
test('close prevents further source creation', () {
169+
final created = <RecordingSynchronizer>[];
170+
final manager = _manager(slots: [_slot(0, created)]);
171+
172+
manager.nextAvailableSynchronizer();
173+
manager.close();
174+
expect(manager.isShutdown, isTrue);
175+
expect(created.single.closed, isTrue);
176+
expect(manager.nextAvailableSynchronizer(), isNull);
177+
expect(manager.recreateCurrentSynchronizer(), isNull);
178+
});
179+
}

0 commit comments

Comments
 (0)