Skip to content

Commit 211783a

Browse files
feat: Add FDv2 Streaming Synchronizer (#106)
<!-- CURSOR_SUMMARY --> > [!NOTE] > **Adds FDv2 streaming support and refines polling behavior** > > - New `StreamingSynchronizerImpl` maintains an SSE connection via `EventSource`, parses FDv2 events, emits `CHANGE_SET`/status results, handles recoverable/non-recoverable errors, restarts on `GOODBYE`, and lazily starts on `next()`; supports selector and `filter` query params > - Polling updates: map specific `INTERNAL_ERROR` types (`MISSING_PAYLOAD`, `JSON_ERROR`) to `INVALID_DATA`; `PollingSynchronizerImpl` now completes a shutdown future on terminal errors to stop further polling > - API docs clarified for `Initializer` and `Synchronizer` (state diagrams; `next()` usage on shutdown/terminal) > - Extensive tests added for streaming behaviors (events, reconnection, errors, selector/filter) and for polling internal-error kinds and shutdown/continuation > > <sup>Written by [Cursor Bugbot](https://cursor.com/dashboard?tab=bugbot) for commit d4dff45. This will update automatically on new commits. Configure [here](https://cursor.com/dashboard?tab=bugbot).</sup> <!-- /CURSOR_SUMMARY --> --------- Co-authored-by: Todd Anderson <tanderson@launchdarkly.com>
1 parent c987dc4 commit 211783a

8 files changed

Lines changed: 1497 additions & 15 deletions

File tree

lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/PollingBase.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,8 +124,20 @@ protected CompletableFuture<FDv2SourceResult> poll(Selector selector, boolean on
124124
case NONE:
125125
break;
126126
case INTERNAL_ERROR: {
127+
FDv2ProtocolHandler.FDv2ActionInternalError internalErrorAction = (FDv2ProtocolHandler.FDv2ActionInternalError) res;
128+
DataSourceStatusProvider.ErrorKind kind = DataSourceStatusProvider.ErrorKind.UNKNOWN;
129+
switch (internalErrorAction.getErrorType()) {
130+
case MISSING_PAYLOAD:
131+
case JSON_ERROR:
132+
kind = DataSourceStatusProvider.ErrorKind.INVALID_DATA;
133+
break;
134+
case UNKNOWN_EVENT:
135+
case IMPLEMENTATION_ERROR:
136+
case PROTOCOL_ERROR:
137+
break;
138+
}
127139
DataSourceStatusProvider.ErrorInfo info = new DataSourceStatusProvider.ErrorInfo(
128-
DataSourceStatusProvider.ErrorKind.UNKNOWN,
140+
kind,
129141
0,
130142
"Internal error occurred during polling",
131143
new Date().toInstant());

lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/PollingSynchronizerImpl.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,12 @@ public PollingSynchronizerImpl(
3838
private void doPoll() {
3939
try {
4040
FDv2SourceResult res = poll(selectorSource.getSelector(), false).get();
41-
switch(res.getResultType()) {
41+
boolean shouldShutdown = false;
42+
switch (res.getResultType()) {
4243
case CHANGE_SET:
4344
break;
4445
case STATUS:
45-
switch(res.getStatus().getState()) {
46+
switch (res.getStatus().getState()) {
4647
case INTERRUPTED:
4748
break;
4849
case SHUTDOWN:
@@ -54,6 +55,7 @@ private void doPoll() {
5455
task.cancel(true);
5556
}
5657
internalShutdown();
58+
shouldShutdown = true;
5759
break;
5860
case GOODBYE:
5961
// We don't need to take any action, as the connection for the poll
@@ -63,7 +65,11 @@ private void doPoll() {
6365
}
6466
break;
6567
}
66-
resultQueue.put(res);
68+
if (shouldShutdown) {
69+
shutdownFuture.complete(res);
70+
} else {
71+
resultQueue.put(res);
72+
}
6773
} catch (InterruptedException | ExecutionException e) {
6874
// TODO: Determine if handling is needed.
6975
}

0 commit comments

Comments
 (0)