Skip to content

Commit a97e9af

Browse files
authored
feat: Support per-attempt URIs and typed unrecoverable status errors in the SSE client (#296)
## What this adds - **`uriProvider`**: an optional callback on `SSEClient` that is invoked before every connection attempt (including automatic reconnects) and whose result is used in place of the fixed URI. This is core to FDv2: streaming reconnects must carry the current `basis` selector as a query parameter, which advances as payloads are received. Without it, a reconnect re-establishes with a stale basis and the server replays changes the SDK already has. Supported on all platforms — the html implementation builds a fresh `EventSource` per attempt (its error handling restarts with its own backoff rather than relying on native reconnection), so it consults the provider the same way the HTTP implementation does. - **`UnrecoverableStatusError`**: unrecoverable HTTP status codes (anything other than 200, 400, 408, 429, or 5xx) were previously reported as a `ClientException` with the status embedded in the message text. They are now a typed error carrying the status code and response headers, so consumers can distinguish terminal failures from transient ones and read service directives delivered on error responses (e.g. the FDv2 fallback directive). The client's behavior is otherwise unchanged: it still stops reconnecting and reports the error on the stream. ## Platform limitation, now documented When running in the browser, the native `EventSource` exposes no HTTP status codes or response headers, so the html implementation is **incapable of reporting terminal errors**: every failure is treated as recoverable and retried with backoff indefinitely, and `UnrecoverableStatusError` is never produced on web. Consumers that need to react to unrecoverable statuses (e.g. invalid credentials) must detect them through a transport that can observe HTTP responses, such as a polling request. This is now stated on `SSEClient`, `HtmlSseClient`, and `UnrecoverableStatusError`. Both API changes are additive; existing consumers are unaffected. ## Testing New unit tests cover the per-attempt URI resolution (fixed URI without a provider, provider invoked per attempt) and the typed error's status code and headers. The html implementation was additionally compile-checked for the web target. SDK-2186 <!-- CURSOR_SUMMARY --> --- > [!NOTE] > **Low Risk** > Additive API and error-type change in the SSE client package; existing callers without `uriProvider` behave the same aside from a more specific stream error type on HTTP. > > **Overview** > Adds optional **`uriProvider`** on `SSEClient` so each connect and reconnect uses a freshly resolved URI (via `StateValues.connectUri` on HTTP and a new `EventSource` on web), enabling query params such as an advancing FDv2 `basis` selector. > > Non-retryable HTTP failures on the HTTP transport are now surfaced as exported **`UnrecoverableStatusError`** (status + headers) instead of a `ClientException` with the code in the message; reconnect behavior is unchanged. Docs call out that browser `EventSource` cannot observe HTTP responses, so web never emits this error and keeps retrying with backoff. > > Unit tests cover fixed vs per-attempt URIs and typed 401 errors with response headers. > > <sup>Reviewed by [Cursor Bugbot](https://cursor.com/bugbot) for commit 023ad08. Bugbot is set up for automated code reviews on this repo. Configure [here](https://www.cursor.com/dashboard/bugbot).</sup> <!-- /CURSOR_SUMMARY -->
1 parent ce4dd4d commit a97e9af

9 files changed

Lines changed: 164 additions & 24 deletions

packages/event_source_client/lib/launchdarkly_event_source_client.dart

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import 'src/sse_client_stub.dart'
1212
if (dart.library.js_interop) 'src/sse_client_html.dart';
1313
import 'src/test_sse_client.dart';
1414

15+
export 'src/errors.dart' show UnrecoverableStatusError;
1516
export 'src/events.dart' show Event, MessageEvent, OpenEvent;
1617
export 'src/test_sse_client.dart' show TestSseClient;
1718
export 'src/logging.dart'
@@ -67,6 +68,12 @@ enum SseHttpMethod {
6768
/// In certain cases, unrecoverable errors will be reported on the [stream] at
6869
/// which point the stream will be done.
6970
///
71+
/// On `html` platforms the client is incapable of reporting unrecoverable
72+
/// errors: the browser's native `EventSource` exposes no HTTP status codes
73+
/// or response headers, so every failure is treated as recoverable and
74+
/// retried with backoff indefinitely, and no error is ever reported on the
75+
/// [stream].
76+
///
7077
/// The [SSEClient] will make best effort to maintain the streaming connection.
7178
abstract class SSEClient {
7279
static const defaultHeaders = <String, String>{
@@ -106,8 +113,10 @@ abstract class SSEClient {
106113
/// Factory constructor to return the platform implementation.
107114
///
108115
/// On all platforms, the [uri] and [eventTypes] arguments are required.
109-
/// On majority of platforms, the optional arguments are used.
110-
/// On web, the optional arguments are not used.
116+
/// On majority of platforms, the optional arguments are used. On web,
117+
/// the [headers], [connectTimeout], [readTimeout], [body], and
118+
/// [httpMethod] arguments are not used; the standard `EventSource`
119+
/// does not support them.
111120
///
112121
/// The [uri] specifies where to connect. The [eventTypes] determines which
113122
/// event types will be emitted. For non-web platforms, pass in [headers] to
@@ -126,19 +135,27 @@ abstract class SSEClient {
126135
///
127136
/// An optional [logger] for controlling logging output from the SSE client.
128137
/// If not provided, a [NoOpLogger] will be used.
138+
///
139+
/// An optional [uriProvider]. When provided, it is invoked before every
140+
/// connection attempt -- the first connect and each automatic
141+
/// reconnect -- and its result is used for that attempt; [uri] is used
142+
/// only when no provider is given. This allows query parameters to
143+
/// vary between attempts (e.g. a state selector that advances as data
144+
/// is received). Supported on all platforms.
129145
factory SSEClient(Uri uri, Set<String> eventTypes,
130146
{Map<String, String> headers = defaultHeaders,
131147
Duration connectTimeout = defaultConnectTimeout,
132148
Duration readTimeout = defaultReadTimeout,
133149
String? body,
134150
SseHttpMethod httpMethod = SseHttpMethod.get,
135-
EventSourceLogger? logger}) {
151+
EventSourceLogger? logger,
152+
Uri Function()? uriProvider}) {
136153
// merge headers so consumer gets reasonable defaults
137154
var mergedHeaders = <String, String>{};
138155
mergedHeaders.addAll(defaultHeaders);
139156
mergedHeaders.addAll(headers);
140157
return getSSEClient(uri, eventTypes, mergedHeaders, connectTimeout,
141-
readTimeout, body, httpMethod.toString(), logger);
158+
readTimeout, body, httpMethod.toString(), logger, uriProvider);
142159
}
143160

144161
/// Get an SSE client for use in unit tests.
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/// Reported on the event stream when the server responds with an HTTP
2+
/// status code that the client will not retry (anything other than 200,
3+
/// 400, 408, 429, or 5xx). After reporting this error the client stops
4+
/// reconnecting until connection desire changes.
5+
///
6+
/// Only produced by implementations whose transport can observe HTTP
7+
/// responses. The browser's native `EventSource` cannot, so on `html`
8+
/// platforms this error is never reported and the client retries every
9+
/// failure indefinitely.
10+
final class UnrecoverableStatusError implements Exception {
11+
/// The HTTP status code of the response.
12+
final int statusCode;
13+
14+
/// The response headers, when available. May carry service directives
15+
/// (e.g. protocol fallback instructions) even on error responses.
16+
final Map<String, String> headers;
17+
18+
const UnrecoverableStatusError(this.statusCode,
19+
[this.headers = const <String, String>{}]);
20+
21+
@override
22+
String toString() => 'UnrecoverableStatusError(statusCode: $statusCode)';
23+
}

packages/event_source_client/lib/src/sse_client_html.dart

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,14 @@ import 'backoff.dart';
99
import 'events.dart' as ld_message_event;
1010

1111
/// An [SSEClient] that uses the [web.EventSource] available on most browsers for web platform support.
12+
///
13+
/// The native `EventSource` API does not expose HTTP status codes or
14+
/// response headers, so this implementation is incapable of reporting
15+
/// terminal errors: every failure is treated as recoverable and retried
16+
/// with backoff indefinitely, and no error is ever reported on the
17+
/// [stream]. Consumers that need to react to unrecoverable statuses
18+
/// (e.g. invalid credentials) must detect them through a transport that
19+
/// can observe HTTP responses, such as a polling request.
1220
class HtmlSseClient implements SSEClient {
1321
/// The underlying eventsource
1422
web.EventSource? _eventSource;
@@ -21,15 +29,22 @@ class HtmlSseClient implements SSEClient {
2129
Backoff _backoff = Backoff(math.Random());
2230

2331
final Uri _uri;
32+
final Uri Function()? _uriProvider;
2433
final Set<String> _eventTypes;
2534

2635
int? _activeSince;
2736
Timer? _retryTimer;
2837

29-
/// Creates an instance of an SSEClient that will connect in the future
30-
/// to the [uri] provided.
31-
HtmlSseClient(Uri uri, Set<String> eventTypes, EventSourceLogger? logger)
38+
/// Creates an instance of an SSEClient that will connect in the future.
39+
///
40+
/// Every connection attempt -- the first connect and each reconnect --
41+
/// constructs a fresh `EventSource` from the [uriProvider] result when
42+
/// a provider is given. The fixed [uri] is used only when no provider
43+
/// is given.
44+
HtmlSseClient(Uri uri, Set<String> eventTypes, EventSourceLogger? logger,
45+
{Uri Function()? uriProvider})
3246
: _uri = uri,
47+
_uriProvider = uriProvider,
3348
_eventTypes = eventTypes {
3449
_logger = logger ?? NoOpLogger();
3550
_messageEventsController =
@@ -56,7 +71,8 @@ class HtmlSseClient implements SSEClient {
5671
}
5772

5873
void _setupConnection() {
59-
_eventSource = web.EventSource(_uri.toString());
74+
final connectUri = _uriProvider?.call() ?? _uri;
75+
_eventSource = web.EventSource(connectUri.toString());
6076

6177
for (var eventType in _eventTypes) {
6278
_eventSource?.addEventListener(eventType, _handleMessageEvent.toJS);
@@ -130,6 +146,7 @@ SSEClient getSSEClient(
130146
Duration readTimeout,
131147
String? body,
132148
String method,
133-
EventSourceLogger? logger) =>
149+
EventSourceLogger? logger,
150+
Uri Function()? uriProvider) =>
134151
// dropping unsupported configuration options
135-
HtmlSseClient(uri, eventTypes, logger);
152+
HtmlSseClient(uri, eventTypes, logger, uriProvider: uriProvider);

packages/event_source_client/lib/src/sse_client_http.dart

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,8 @@ class HttpSseClient implements SSEClient {
3939
Duration readTimeout,
4040
String? body,
4141
String httpMethod,
42-
EventSourceLogger? logger)
42+
EventSourceLogger? logger,
43+
{Uri Function()? uriProvider})
4344
: this.internal(
4445
uri,
4546
eventTypes,
@@ -51,7 +52,8 @@ class HttpSseClient implements SSEClient {
5152
math.Random(),
5253
body,
5354
httpMethod,
54-
logger);
55+
logger,
56+
uriProvider: uriProvider);
5557

5658
/// An internal constructor for injecting necessary dependencies for testing.
5759
HttpSseClient.internal(
@@ -65,7 +67,8 @@ class HttpSseClient implements SSEClient {
6567
math.Random random,
6668
String? body,
6769
String httpMethod,
68-
EventSourceLogger? logger) {
70+
EventSourceLogger? logger,
71+
{Uri Function()? uriProvider}) {
6972
_logger = logger ?? NoOpLogger();
7073
_messageEventsController = StreamController<Event>.broadcast(
7174
// this is triggered when first listener subscribes
@@ -88,7 +91,8 @@ class HttpSseClient implements SSEClient {
8891
body,
8992
httpMethod,
9093
_resetRequest.stream,
91-
logger ?? NoOpLogger()));
94+
logger ?? NoOpLogger(),
95+
uriProvider: uriProvider));
9296
}
9397

9498
/// Subscribe to this [stream] to receive events and sometimes errors. The first
@@ -134,6 +138,8 @@ SSEClient getSSEClient(
134138
Duration readTimeout,
135139
String? body,
136140
String method,
137-
EventSourceLogger? logger) =>
141+
EventSourceLogger? logger,
142+
Uri Function()? uriProvider) =>
138143
HttpSseClient(uri, eventTypes, headers, connectTimeout, readTimeout, body,
139-
method, logger);
144+
method, logger,
145+
uriProvider: uriProvider);

packages/event_source_client/lib/src/sse_client_stub.dart

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ SSEClient getSSEClient(
99
Duration readTimeout,
1010
String? body,
1111
String method,
12-
EventSourceLogger? logger) =>
12+
EventSourceLogger? logger,
13+
Uri Function()? uriProvider) =>
1314
throw UnsupportedError(
1415
'LaunchDarkly SSE Client is not supported on this platform.');

packages/event_source_client/lib/src/state_connecting.dart

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import 'dart:async';
33
import 'package:http/http.dart' as http;
44

55
import 'error_utils.dart';
6+
import 'errors.dart';
67
import 'http_consts.dart';
78
import 'state_backoff.dart';
89
import 'state_connected.dart';
@@ -35,7 +36,7 @@ class StateConnecting {
3536
/// to connect. The returned function will run the next state.
3637
static Future<Function> _tryGetConnected(
3738
StateValues svo, http.Client client) async {
38-
final request = http.Request(svo.httpMethod, svo.uri);
39+
final request = http.Request(svo.httpMethod, svo.connectUri);
3940
request.headers.addAll(svo.headers);
4041
if (svo.body != null) {
4142
request.body = svo.body!;
@@ -50,7 +51,7 @@ class StateConnecting {
5051
}
5152

5253
try {
53-
svo.logger.debug('Sending HTTP request to ${svo.uri}');
54+
svo.logger.debug('Sending HTTP request to ${request.url}');
5455
final response = await client.send(request).timeout(svo.connectTimeout);
5556

5657
// anything besides OK is bad, but some may be recoverable with a retry.
@@ -61,8 +62,8 @@ class StateConnecting {
6162
// looks like the error wasn't recoverable, go to idle and wait
6263
// for something to change
6364
return () => StateIdle.run(svo,
64-
errorCause: http.ClientException(
65-
'Got unrecoverable status code ${response.statusCode}'));
65+
errorCause: UnrecoverableStatusError(
66+
response.statusCode, response.headers));
6667
}
6768

6869
// the error is recoverable, backoff then we'll try again

packages/event_source_client/lib/src/state_value_object.dart

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,12 @@ typedef ClientFactory = http.Client Function();
1919
class StateValues {
2020
// Non-transient configuration data
2121
final Uri uri;
22+
23+
/// When provided, produces the URI for every connection attempt -- the
24+
/// first connect and each reconnect -- in place of [uri], so callers
25+
/// can vary query parameters between attempts (e.g. a state selector
26+
/// that advances as data is received).
27+
final Uri Function()? uriProvider;
2228
final Set<String> eventTypes;
2329
final Map<String, String> headers;
2430
final Duration connectTimeout;
@@ -50,6 +56,9 @@ class StateValues {
5056
/// Headers received from the connection.
5157
Map<String, String>? connectHeaders;
5258

59+
/// The URI to use for the next connection attempt.
60+
Uri get connectUri => uriProvider?.call() ?? uri;
61+
5362
/// Creates a [_StateValues] instance. Used by the state machine.
5463
StateValues(
5564
this.uri,
@@ -65,6 +74,7 @@ class StateValues {
6574
this.body,
6675
this.httpMethod,
6776
this.resetRequest,
68-
this.logger)
77+
this.logger,
78+
{this.uriProvider})
6979
: backoff = Backoff(random);
7080
}

packages/event_source_client/test/test_utils.dart

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ class TestUtils {
2020

2121
static StateValues makeMockStateValues(
2222
{Uri? uri,
23+
Uri Function()? uriProvider,
2324
Set<String>? eventTypes,
2425
Map<String, String>? headers,
2526
Duration? connectTimeout,
@@ -45,15 +46,18 @@ class TestUtils {
4546
null,
4647
'GET',
4748
resetStream ?? StreamController<void>.broadcast().stream,
48-
logger ?? NoOpLogger());
49+
logger ?? NoOpLogger(),
50+
uriProvider: uriProvider);
4951
}
5052

5153
static MockClient makeMockHttpClient(
5254
{int httpStatusCode = HttpStatusCodes.okStatus,
5355
Map<String, String> headers = defaultHeaders,
54-
bool blocking = false}) {
56+
bool blocking = false,
57+
void Function(BaseRequest)? onRequest}) {
5558
return MockClient.streaming((request, bodyStream) async {
5659
return bodyStream.bytesToString().then((bodyString) async {
60+
onRequest?.call(request);
5761
if (blocking) {
5862
await Completer().future; // blocks indefinitely
5963
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
// ignore_for_file: close_sinks
2+
3+
import 'dart:async';
4+
5+
import 'package:http/http.dart';
6+
import 'package:launchdarkly_event_source_client/launchdarkly_event_source_client.dart';
7+
import 'package:launchdarkly_event_source_client/src/state_connecting.dart';
8+
import 'package:test/test.dart';
9+
10+
import 'test_utils.dart';
11+
12+
void main() {
13+
test('uses the fixed uri when no provider is given', () async {
14+
final requestedPaths = <String>[];
15+
final svo = TestUtils.makeMockStateValues(
16+
uri: Uri.parse('/fixed'),
17+
clientFactory: () => TestUtils.makeMockHttpClient(
18+
onRequest: (BaseRequest request) =>
19+
requestedPaths.add(request.url.path)));
20+
21+
await StateConnecting.run(svo);
22+
23+
expect(requestedPaths, equals(['/fixed']));
24+
});
25+
26+
test('invokes the uriProvider for each connection attempt', () async {
27+
final requestedPaths = <String>[];
28+
var attempt = 0;
29+
final svo = TestUtils.makeMockStateValues(
30+
uri: Uri.parse('/unused'),
31+
uriProvider: () => Uri.parse('/attempt-${++attempt}'),
32+
clientFactory: () => TestUtils.makeMockHttpClient(
33+
onRequest: (BaseRequest request) =>
34+
requestedPaths.add(request.url.path)));
35+
36+
await StateConnecting.run(svo);
37+
await StateConnecting.run(svo);
38+
39+
expect(requestedPaths, equals(['/attempt-1', '/attempt-2']));
40+
});
41+
42+
test(
43+
'reports UnrecoverableStatusError with the status code and headers '
44+
'for non-retryable status codes', () async {
45+
final eventsController = StreamController<Event>.broadcast();
46+
final svo = TestUtils.makeMockStateValues(
47+
eventSink: eventsController,
48+
clientFactory: () => TestUtils.makeMockHttpClient(
49+
httpStatusCode: 401, headers: {'x-ld-fd-fallback': 'true'}));
50+
51+
final expectation = expectLater(
52+
eventsController.stream,
53+
emitsError(isA<UnrecoverableStatusError>()
54+
.having((error) => error.statusCode, 'statusCode', 401)
55+
.having((error) => error.headers['x-ld-fd-fallback'],
56+
'fallback header', 'true')));
57+
58+
await StateConnecting.run(svo);
59+
await expectation;
60+
});
61+
}

0 commit comments

Comments
 (0)