Skip to content

Commit 7b46ac6

Browse files
authored
feat: Translate FDv2 payloads at the data source layer (#309)
BEGIN_COMMIT_OVERRIDE feat: Translate FDv2 payloads at the data source layer feat: Add FDv2 streaming source factories and query-parameter authentication END_COMMIT_OVERRIDE ## What this changes Moves flag-eval translation out of the apply path and into the data source layer, and introduces a typed `ChangeSet` distinct from the wire-level `Payload`. Previously the streaming and polling sources emitted a `ChangeSetResult` carrying the raw wire `Payload` (a list of `Update`s with unparsed object maps), and the flag-eval objects were converted to typed descriptors later, at apply time in `DataSourceEventHandler.handlePayload`. A payload that parsed at the protocol level but whose flag objects were malformed therefore failed *after* acquisition — surfacing as an invalid-message that drove a connection restart, on a fixed interval, without ever arming the orchestrator's fallback timer. Now: - `ChangeSet` carries typed `Map<String, ItemDescriptor>` updates plus the type and selector. `translatePayload` converts a wire `Payload` into a `ChangeSet`, throwing if any flag-eval object cannot be parsed. - The streaming and polling sources translate at acquisition. A translation failure becomes an **interrupted** source result, exactly like a malformed protocol body — so the orchestrator's fallback timer governs recovery (a source stuck on invalid data falls back after the timeout instead of retrying forever). Streaming discards the partial handler state and keeps the SSE connection; the server's next valid payload is processed normally. - `ChangeSetResult` carries the `ChangeSet`, and `handlePayload` applies it directly with no conversion and no failure path of its own. - The cache initializer builds its `ChangeSet` straight from the already-typed cached evaluation results, dropping a JSON round-trip that previously serialized typed results only to re-parse them. This is foundational for the FDv2 data system: the streaming-source and orchestrator PRs build on `ChangeSet`. Nothing constructs the FDv2 sources in production yet, so behavior is unchanged for shipping code. ## Testing Source-layer tests cover a protocol-valid payload whose flag data cannot be parsed: polling returns interrupted, and streaming returns interrupted then recovers on the next valid payload. The flag manager / event handler tests apply typed change sets directly (full replace, partial without per-item version comparison, none). Cache initializer tests assert the evaluation result is carried through without re-parsing. Full `common_client` suite passes. SDK-2186 <!-- CURSOR_SUMMARY --> --- > [!NOTE] > **Medium Risk** > Touches FDv2 acquisition, URL/auth composition, and error-classification paths that govern fallback and flag updates; changes are largely behind not-yet-production FDv2 wiring but alter recovery semantics when invalid flag data arrives. > > **Overview** > FDv2 now separates wire **`Payload`** from typed **`ChangeSet`** (`Map<String, ItemDescriptor>`). **`translatePayload`** runs in polling/streaming (and cache builds descriptors directly), so **`ChangeSetResult`** and **`handlePayload`** only apply already-typed updates. > > **Malformed flag-eval data** is surfaced as **`interrupted`** at the source layer (like other transient parse errors) instead of failing later in the event handler—so the orchestrator’s fallback timer can kick in rather than endless connection retries. > > Adds shared **`buildFDv2Uri`** (relay query params, `basis`, `withReasons`) for polling and streaming; **`SourceFactoryContext`** carries credential-driven **`auth`** query params. **Streaming synchronizer factories** are implemented (SSE client, per-reconnect URI/`basis`, legacy **`ping`** → one-shot poll, env ID fallback). Streaming also maps **`UnrecoverableStatusError`** to terminal errors with optional FDv1 fallback. > > <sup>Reviewed by [Cursor Bugbot](https://cursor.com/bugbot) for commit 38646ce. Bugbot is set up for automated code reviews on this repo. Configure [here](https://www.cursor.com/dashboard/bugbot).</sup> <!-- /CURSOR_SUMMARY -->
1 parent b20c35e commit 7b46ac6

22 files changed

Lines changed: 655 additions & 173 deletions

packages/common_client/lib/src/data_sources/data_source_event_handler.dart

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import '../flag_manager/flag_manager.dart';
66
import '../item_descriptor.dart';
77
import 'data_source_status.dart';
88
import 'data_source_status_manager.dart';
9-
import 'fdv2/flag_eval_mapper.dart';
109
import 'fdv2/payload.dart';
1110

1211
enum MessageStatus { messageHandled, invalidMessage, unhandledVerb }
@@ -98,24 +97,23 @@ final class DataSourceEventHandler {
9897
}
9998
}
10099

101-
/// Applies an FDv2 payload to the flag store.
100+
/// Applies an FDv2 change set to the flag store.
102101
///
103-
/// Full payloads replace the stored flags, partial payloads apply each
104-
/// update, and a payload of type none confirms the SDK is up to date
105-
/// without changing data. All three mark the data source valid.
106-
Future<MessageStatus> handlePayload(LDContext context, Payload payload,
102+
/// Full change sets replace the stored flags, partial change sets apply
103+
/// each update, and a change set of type none confirms the SDK is up to
104+
/// date without changing data. All three mark the data source valid.
105+
Future<MessageStatus> handlePayload(LDContext context, ChangeSet changeSet,
107106
{String? environmentId}) async {
108107
try {
109-
final updates = mapUpdatesToItemDescriptors(payload.updates);
110-
await _flagManager.applyChanges(context, updates, payload.type,
108+
await _flagManager.applyChanges(
109+
context, changeSet.updates, changeSet.type,
111110
environmentId: environmentId);
112111
_statusManager.setValid();
113112
return MessageStatus.messageHandled;
114113
} catch (err) {
115-
_logger.error('FDv2 payload contained invalid flag data: '
116-
'${err.runtimeType}');
114+
_logger.error('Failed to apply an FDv2 change set: ${err.runtimeType}');
117115
_statusManager.setErrorByKind(
118-
ErrorKind.invalidData, 'FDv2 payload contained invalid data');
116+
ErrorKind.invalidData, 'Failed to apply an FDv2 change set');
119117
return MessageStatus.invalidMessage;
120118
}
121119
}

packages/common_client/lib/src/data_sources/fdv2/cache_initializer.dart

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
import 'package:launchdarkly_dart_common/launchdarkly_dart_common.dart';
22

3-
import 'flag_eval_mapper.dart';
3+
import '../../item_descriptor.dart';
44
import 'payload.dart';
5-
import 'selector.dart';
65
import 'source.dart';
76
import 'source_result.dart';
87

@@ -67,20 +66,15 @@ final class CacheInitializer implements Initializer {
6766
return _miss();
6867
}
6968

70-
final updates = <Update>[];
69+
final updates = <String, ItemDescriptor>{};
7170
cached.flags.forEach((key, evalResult) {
72-
updates.add(Update(
73-
kind: flagEvalKind,
74-
key: key,
75-
version: evalResult.version,
76-
object: LDEvaluationResultSerialization.toJson(evalResult),
77-
));
71+
updates[key] =
72+
ItemDescriptor(version: evalResult.version, flag: evalResult);
7873
});
7974

8075
return ChangeSetResult(
81-
payload: Payload(
76+
changeSet: ChangeSet(
8277
type: PayloadType.full,
83-
selector: Selector.empty,
8478
updates: updates,
8579
),
8680
environmentId: cached.environmentId,
@@ -95,9 +89,9 @@ final class CacheInitializer implements Initializer {
9589
}
9690

9791
ChangeSetResult _miss() => ChangeSetResult(
98-
payload: const Payload(
92+
changeSet: const ChangeSet(
9993
type: PayloadType.none,
100-
updates: [],
94+
updates: {},
10195
),
10296
freshness: _now(),
10397
persist: false,

packages/common_client/lib/src/data_sources/fdv2/endpoints.dart

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
import 'package:launchdarkly_dart_common/launchdarkly_dart_common.dart';
2+
3+
import 'selector.dart';
4+
15
/// FDv2 endpoint paths.
26
///
37
/// These paths are uniform across mobile and browser SDKs; FDv2 does
@@ -20,3 +24,39 @@ abstract final class FDv2Endpoints {
2024
static String streamingGet(String encodedContext) =>
2125
'$streaming/$encodedContext';
2226
}
27+
28+
/// Builds an FDv2 request URI: appends [addedPath] to [baseUri]'s path and
29+
/// merges the [withReasons], [basis], and [additionalQueryParameters]
30+
/// query parameters onto the base URL's own.
31+
///
32+
/// Composes against the parsed [baseUri] so a custom URL carrying its own
33+
/// query parameters (e.g. a relay proxy with a token) is preserved --
34+
/// including repeated keys, via `queryParametersAll`, which a plain
35+
/// `queryParameters` map would collapse to the last value. String
36+
/// concatenation against the base would instead land the appended path
37+
/// inside the query component.
38+
///
39+
/// Shared by the polling requestor and the streaming source so the two
40+
/// transports build URLs identically.
41+
Uri buildFDv2Uri({
42+
required Uri baseUri,
43+
required String addedPath,
44+
required bool withReasons,
45+
required Selector basis,
46+
Map<String, String> additionalQueryParameters = const {},
47+
}) {
48+
final mergedPath = appendPath(baseUri.path, addedPath);
49+
final mergedQuery = <String, dynamic>{}
50+
..addAll(baseUri.queryParametersAll)
51+
..addAll(additionalQueryParameters);
52+
if (withReasons) {
53+
mergedQuery['withReasons'] = 'true';
54+
}
55+
if (basis.state case final state? when state.isNotEmpty) {
56+
mergedQuery['basis'] = state;
57+
}
58+
return baseUri.replace(
59+
path: mergedPath,
60+
queryParameters: mergedQuery.isEmpty ? null : mergedQuery,
61+
);
62+
}

packages/common_client/lib/src/data_sources/fdv2/entry_factories.dart

Lines changed: 120 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,14 @@ import 'dart:convert';
22

33
import 'package:launchdarkly_dart_common/launchdarkly_dart_common.dart'
44
hide ServiceEndpoints;
5+
import 'package:launchdarkly_event_source_client/launchdarkly_event_source_client.dart';
56

7+
import '../../config/defaults/default_config.dart';
68
import '../../config/service_endpoints.dart';
9+
import '../streaming_data_source.dart' show LDLoggerToEventSourceAdapter;
710
import 'cache_initializer.dart' as cache_src;
11+
import 'endpoints.dart';
12+
import 'protocol_types.dart';
813
import 'source_factory_context.dart';
914
import 'mode_definition.dart' as mode;
1015
import 'polling_base.dart';
@@ -13,6 +18,8 @@ import 'polling_synchronizer.dart';
1318
import 'requestor.dart';
1419
import 'selector.dart';
1520
import 'source.dart';
21+
import 'streaming_base.dart';
22+
import 'streaming_synchronizer.dart';
1623

1724
/// Merges per-entry [mode.EndpointConfig] overrides into [base].
1825
ServiceEndpoints mergeServiceEndpoints(
@@ -50,6 +57,7 @@ FDv2PollingBase _buildPollingBase({
5057
contextJson: ctx.contextJson,
5158
usePost: usePost,
5259
withReasons: ctx.withReasons,
60+
additionalQueryParameters: ctx.additionalQueryParameters,
5361
httpProperties: ctx.httpProperties,
5462
httpClientFactory: ctx.httpClientFactory ?? _defaultHttpClientFactory,
5563
);
@@ -63,6 +71,69 @@ HttpClient _defaultHttpClientFactory(HttpProperties httpProperties) {
6371
return HttpClient(httpProperties: httpProperties);
6472
}
6573

74+
/// Constructs the [SSEClient] used by a streaming source. [uriProvider]
75+
/// is re-invoked on every connection attempt so the `basis` query
76+
/// parameter reflects the current selector. Tests inject a fake.
77+
typedef FDv2SseClientFactory = SSEClient Function({
78+
required Uri Function() uriProvider,
79+
required HttpProperties httpProperties,
80+
required String? body,
81+
required SseHttpMethod method,
82+
required EventSourceLogger logger,
83+
});
84+
85+
/// FDv2 event names subscribed on the streaming connection. Includes the
86+
/// legacy `ping` bridge event.
87+
const Set<String> _fdv2StreamEventNames = {
88+
FDv2EventTypes.serverIntent,
89+
FDv2EventTypes.putObject,
90+
FDv2EventTypes.deleteObject,
91+
FDv2EventTypes.payloadTransferred,
92+
FDv2EventTypes.goodbye,
93+
FDv2EventTypes.error,
94+
FDv2EventTypes.heartbeat,
95+
'ping',
96+
};
97+
98+
/// Constructs the production [SSEClient]. The default for the factory
99+
/// builders; tests inject a fake through the same parameter.
100+
SSEClient defaultSseClientFactory({
101+
required Uri Function() uriProvider,
102+
required HttpProperties httpProperties,
103+
required String? body,
104+
required SseHttpMethod method,
105+
required EventSourceLogger logger,
106+
}) {
107+
return SSEClient(uriProvider(), _fdv2StreamEventNames,
108+
headers: httpProperties.baseHeaders,
109+
body: body,
110+
httpMethod: method,
111+
logger: logger,
112+
uriProvider: uriProvider);
113+
}
114+
115+
/// Builds the streaming URI for the current state. Invoked per
116+
/// connection attempt so the `basis` parameter tracks the selector.
117+
Uri _buildStreamingUri({
118+
required ServiceEndpoints endpoints,
119+
required String contextEncoded,
120+
required bool usePost,
121+
required bool withReasons,
122+
required Selector basis,
123+
Map<String, String> additionalQueryParameters = const {},
124+
}) {
125+
final addedPath = usePost
126+
? FDv2Endpoints.streaming
127+
: FDv2Endpoints.streamingGet(contextEncoded);
128+
return buildFDv2Uri(
129+
baseUri: Uri.parse(endpoints.streaming),
130+
addedPath: addedPath,
131+
withReasons: withReasons,
132+
basis: basis,
133+
additionalQueryParameters: additionalQueryParameters,
134+
);
135+
}
136+
66137
/// A factory for creating [Initializer] instances.
67138
final class InitializerFactory {
68139
/// True for cache initializers.
@@ -130,12 +201,11 @@ InitializerFactory createInitializerFactoryFromEntry(
130201
}
131202

132203
/// Builds a [SynchronizerFactory] for a single [mode.SynchronizerEntry].
133-
///
134-
/// Throws [UnsupportedError] for unsupported entry types.
135204
SynchronizerFactory createSynchronizerFactoryFromEntry(
136205
mode.SynchronizerEntry entry,
137-
SourceFactoryContext ctx,
138-
) {
206+
SourceFactoryContext ctx, {
207+
FDv2SseClientFactory sseClientFactory = defaultSseClientFactory,
208+
}) {
139209
switch (entry) {
140210
case final mode.PollingSynchronizer e:
141211
final interval = e.pollInterval ?? ctx.defaultPollingInterval;
@@ -155,9 +225,47 @@ SynchronizerFactory createSynchronizerFactoryFromEntry(
155225
);
156226
},
157227
);
158-
case mode.StreamingSynchronizer():
159-
throw UnsupportedError(
160-
'FDv2 StreamingSynchronizer factories are not implemented yet',
228+
case final mode.StreamingSynchronizer e:
229+
return SynchronizerFactory(
230+
create: (SelectorGetter selectorGetter) {
231+
final endpointsResolved =
232+
mergeServiceEndpoints(ctx.serviceEndpoints, e.endpoints);
233+
Uri uriProvider() => _buildStreamingUri(
234+
endpoints: endpointsResolved,
235+
contextEncoded: base64UrlEncode(utf8.encode(ctx.contextJson)),
236+
usePost: e.usePost,
237+
withReasons: ctx.withReasons,
238+
basis: selectorGetter(),
239+
additionalQueryParameters: ctx.additionalQueryParameters,
240+
);
241+
final sseClient = sseClientFactory(
242+
uriProvider: uriProvider,
243+
httpProperties: ctx.httpProperties,
244+
body: e.usePost ? ctx.contextJson : null,
245+
method: e.usePost ? SseHttpMethod.post : SseHttpMethod.get,
246+
logger: LDLoggerToEventSourceAdapter(ctx.logger),
247+
);
248+
249+
// Legacy ping events trigger a one-shot poll.
250+
final pingPollingBase = _buildPollingBase(
251+
endpoints: e.endpoints,
252+
usePost: e.usePost,
253+
ctx: ctx,
254+
);
255+
256+
return FDv2StreamingSynchronizer(
257+
base: FDv2StreamingBase(
258+
sseClient: sseClient,
259+
pingHandler: () =>
260+
pingPollingBase.pollOnce(basis: selectorGetter()),
261+
logger: ctx.logger,
262+
// Used when the transport exposes no response headers (the
263+
// browser EventSource) to read x-ld-envid from.
264+
defaultEnvironmentId: DefaultConfig.credentialConfig
265+
.environmentIdFallback(ctx.credential),
266+
),
267+
);
268+
},
161269
);
162270
}
163271
}
@@ -173,9 +281,11 @@ List<InitializerFactory> buildInitializerFactories(
173281
/// One factory per entry, in list order.
174282
List<SynchronizerFactory> buildSynchronizerFactories(
175283
List<mode.SynchronizerEntry> entries,
176-
SourceFactoryContext ctx,
177-
) {
284+
SourceFactoryContext ctx, {
285+
FDv2SseClientFactory sseClientFactory = defaultSseClientFactory,
286+
}) {
178287
return entries
179-
.map((e) => createSynchronizerFactoryFromEntry(e, ctx))
288+
.map((e) => createSynchronizerFactoryFromEntry(e, ctx,
289+
sseClientFactory: sseClientFactory))
180290
.toList();
181291
}

packages/common_client/lib/src/data_sources/fdv2/flag_eval_mapper.dart

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,19 @@ import 'payload.dart';
66
/// The object kind for client-side flag evaluation results.
77
const String flagEvalKind = 'flag-eval';
88

9+
/// Translates a wire-level [Payload] into a typed [ChangeSet] ready for
10+
/// the flag store.
11+
///
12+
/// Throws if any flag-eval object cannot be parsed, so the data source
13+
/// layer can report it as a data source error and recover the connection.
14+
ChangeSet translatePayload(Payload payload) {
15+
return ChangeSet(
16+
selector: payload.selector,
17+
type: payload.type,
18+
updates: mapUpdatesToItemDescriptors(payload.updates),
19+
);
20+
}
21+
922
/// Converts FDv2 [Update] objects to a map of [ItemDescriptor]s suitable
1023
/// for the flag store.
1124
///

packages/common_client/lib/src/data_sources/fdv2/payload.dart

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import 'package:launchdarkly_dart_common/launchdarkly_dart_common.dart';
22

3+
import '../../item_descriptor.dart';
34
import 'selector.dart';
45

56
/// The type of payload transfer.
@@ -95,3 +96,35 @@ final class Payload {
9596
String toString() => 'Payload(selector: $selector, type: $type, '
9697
'updates: ${updates.length})';
9798
}
99+
100+
/// A [Payload] translated into typed flag descriptors, ready to apply to
101+
/// the flag store.
102+
///
103+
/// The wire-level [Payload] carries raw [Update] objects; producing a
104+
/// [ChangeSet] converts each flag-eval object into an [ItemDescriptor].
105+
/// The data source layer performs this conversion (see `translatePayload`),
106+
/// so a malformed object is reported as a data source error where the
107+
/// connection can recover.
108+
final class ChangeSet {
109+
/// The selector for this change set, carried over from the originating
110+
/// [Payload]. [Selector.empty] when the source provided none (cached
111+
/// data or an intent of none).
112+
final Selector selector;
113+
114+
/// Whether this is a full, partial, or no-op change set.
115+
final PayloadType type;
116+
117+
/// The typed updates, keyed by flag key. A tombstone (deletion) is an
118+
/// [ItemDescriptor] with a null flag.
119+
final Map<String, ItemDescriptor> updates;
120+
121+
const ChangeSet({
122+
this.selector = Selector.empty,
123+
required this.type,
124+
required this.updates,
125+
});
126+
127+
@override
128+
String toString() => 'ChangeSet(selector: $selector, type: $type, '
129+
'updates: ${updates.length})';
130+
}

0 commit comments

Comments
 (0)