Skip to content

Commit 84e9c05

Browse files
authored
feat: Add support for FDv1 fallback. (#116)
BEGIN_COMMIT_OVERRIDE chore: Add support for FDv1 fallback. chore: Implement FDv2 Data Source Status. chore: Extend FDv2 Data source logging. END_COMMIT_OVERRIDE I would like to still add more detail to logging, but I added the basics in this PR. <!-- CURSOR_SUMMARY --> --- > [!NOTE] > **Medium Risk** > Touches core data acquisition/failover and status-reporting paths; mis-handling could cause unexpected source switching, missed updates, or incorrect SDK health state during outages/closures. > > **Overview** > Adds **FDv1 fallback** support to the FDv2 data system: when an FDv2 synchronizer emits the “fallback to FDv1” signal, the SDK can switch to a configured FDv1 polling data source (built from existing `DataSource` components) via a new `DataSourceSynchronizerAdapter` and a new `fDv1FallbackSynchronizer` configuration path. > > Refactors FDv2 source selection by replacing `SynchronizerStateManager` with `SourceManager` (now handling both initializers and synchronizers, blocking/unblocking, and active-source cleanup), and implements/expands FDv2 **data source status + logging** so initializer/synchronizer failures and exhaustion produce `INTERRUPTED`/`OFF`/`VALID` updates while `close()` cleanly reports `OFF` without spurious errors. > > Contract tests are updated to configure FDv1 fallback and the prior FDv2 fallback suppression is removed; unit tests are added/updated extensively to cover adapter lifecycle/thread cancellation, fallback behavior, and new status transitions. > > <sup>Written by [Cursor Bugbot](https://cursor.com/dashboard?tab=bugbot) for commit dfe0534. This will update automatically on new commits. Configure [here](https://cursor.com/dashboard?tab=bugbot).</sup> <!-- /CURSOR_SUMMARY -->
1 parent 902b1a6 commit 84e9c05

13 files changed

Lines changed: 2236 additions & 727 deletions

lib/sdk/server/contract-tests/service/src/main/java/sdktest/SdkClientEntity.java

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import com.launchdarkly.sdk.server.FlagsStateOption;
1212
import com.launchdarkly.sdk.server.LDClient;
1313
import com.launchdarkly.sdk.server.LDConfig;
14+
import com.launchdarkly.sdk.server.interfaces.ServiceEndpoints;
1415
import com.launchdarkly.sdk.server.migrations.Migration;
1516
import com.launchdarkly.sdk.server.migrations.MigrationBuilder;
1617
import com.launchdarkly.sdk.server.migrations.MigrationExecution;
@@ -25,12 +26,15 @@
2526
import com.launchdarkly.sdk.server.integrations.HooksConfigurationBuilder;
2627
import com.launchdarkly.sdk.server.integrations.ServiceEndpointsBuilder;
2728
import com.launchdarkly.sdk.server.integrations.StreamingDataSourceBuilder;
29+
import com.launchdarkly.sdk.server.integrations.PollingDataSourceBuilder;
2830
import com.launchdarkly.sdk.server.integrations.DataSystemBuilder;
2931
import com.launchdarkly.sdk.server.DataSystemComponents;
3032
import com.launchdarkly.sdk.server.integrations.FDv2PollingInitializerBuilder;
3133
import com.launchdarkly.sdk.server.integrations.FDv2PollingSynchronizerBuilder;
3234
import com.launchdarkly.sdk.server.integrations.FDv2StreamingSynchronizerBuilder;
3335
import com.launchdarkly.sdk.server.interfaces.BigSegmentStoreStatusProvider;
36+
import com.launchdarkly.sdk.server.subsystems.ComponentConfigurer;
37+
import com.launchdarkly.sdk.server.subsystems.DataSource;
3438
import com.launchdarkly.sdk.server.subsystems.DataSourceBuilder;
3539
import com.launchdarkly.sdk.server.datasources.Initializer;
3640
import com.launchdarkly.sdk.server.datasources.Synchronizer;
@@ -563,6 +567,22 @@ private LDConfig buildSdkConfig(SdkConfigParams params, String tag) {
563567
}
564568
}
565569

570+
// Configure FDv1 fallback synchronizer
571+
SdkConfigSynchronizerParams fallbackSynchronizer =
572+
selectFallbackSynchronizer(params.dataSystem);
573+
if (fallbackSynchronizer != null) {
574+
// Set global polling endpoints if the fallback synchronizer has polling with custom base URI
575+
if (fallbackSynchronizer.polling != null &&
576+
fallbackSynchronizer.polling.baseUri != null) {
577+
endpoints.polling(fallbackSynchronizer.polling.baseUri);
578+
}
579+
580+
// Create and configure FDv1 fallback
581+
ComponentConfigurer<DataSource> fdv1Fallback =
582+
createFDv1FallbackSynchronizer(fallbackSynchronizer);
583+
dataSystemBuilder.fDv1FallbackSynchronizer(fdv1Fallback);
584+
}
585+
566586
builder.dataSystem(dataSystemBuilder);
567587
}
568588

@@ -601,4 +621,59 @@ private DataSourceBuilder<Synchronizer> createSynchronizer(
601621
}
602622
return null;
603623
}
624+
625+
/**
626+
* Selects the best synchronizer configuration to use for FDv1 fallback.
627+
* Prefers polling synchronizers, falls back to primary synchronizer.
628+
*/
629+
private static SdkConfigSynchronizerParams selectFallbackSynchronizer(
630+
SdkConfigDataSystemParams dataSystemParams) {
631+
632+
// Prefer secondary polling synchronizer
633+
if (dataSystemParams.synchronizers != null &&
634+
dataSystemParams.synchronizers.secondary != null &&
635+
dataSystemParams.synchronizers.secondary.polling != null) {
636+
return dataSystemParams.synchronizers.secondary;
637+
}
638+
639+
// Fall back to primary polling synchronizer
640+
if (dataSystemParams.synchronizers != null &&
641+
dataSystemParams.synchronizers.primary != null &&
642+
dataSystemParams.synchronizers.primary.polling != null) {
643+
return dataSystemParams.synchronizers.primary;
644+
}
645+
646+
// Fall back to primary synchronizer (even if streaming)
647+
if (dataSystemParams.synchronizers != null &&
648+
dataSystemParams.synchronizers.primary != null) {
649+
return dataSystemParams.synchronizers.primary;
650+
}
651+
652+
return null;
653+
}
654+
655+
/**
656+
* Creates the FDv1 fallback synchronizer based on the selected synchronizer config.
657+
* FDv1 fallback is always polling-based and uses the global service endpoints configuration.
658+
*/
659+
private static ComponentConfigurer<DataSource> createFDv1FallbackSynchronizer(
660+
SdkConfigSynchronizerParams synchronizer) {
661+
662+
// FDv1 fallback is always polling-based
663+
PollingDataSourceBuilder fdv1Polling = Components.pollingDataSource();
664+
665+
// Configure polling interval if the synchronizer has polling configuration
666+
if (synchronizer.polling != null) {
667+
if (synchronizer.polling.pollIntervalMs != null) {
668+
fdv1Polling.pollInterval(Duration.ofMillis(synchronizer.polling.pollIntervalMs));
669+
}
670+
// Note: FDv1 polling doesn't support per-source service endpoints override,
671+
// so it will use the global service endpoints configuration (which is set
672+
// by the caller before this method is invoked)
673+
}
674+
// If streaming synchronizer, use default polling interval
675+
// (no additional configuration needed)
676+
677+
return fdv1Polling;
678+
}
604679
}
Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +0,0 @@
1-
streaming/validation/unrecognized data that can be safely ignored/unknown event name with JSON body
2-
streaming/validation/unrecognized data that can be safely ignored/unknown event name with non-JSON body
3-
streaming/validation/unrecognized data that can be safely ignored/patch event with unrecognized path kind
4-
streaming/fdv2/fallback to FDv1 handling
Lines changed: 186 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
package com.launchdarkly.sdk.server;
2+
3+
import com.launchdarkly.sdk.internal.fdv2.sources.Selector;
4+
import com.launchdarkly.sdk.server.datasources.FDv2SourceResult;
5+
import com.launchdarkly.sdk.server.datasources.Synchronizer;
6+
import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider;
7+
import com.launchdarkly.sdk.server.interfaces.DataStoreStatusProvider;
8+
import com.launchdarkly.sdk.server.subsystems.DataSource;
9+
import com.launchdarkly.sdk.server.subsystems.DataSourceUpdateSink;
10+
import com.launchdarkly.sdk.server.subsystems.DataStoreTypes;
11+
12+
import java.io.IOException;
13+
import java.time.Instant;
14+
import java.util.AbstractMap;
15+
import java.util.Collections;
16+
import java.util.Map;
17+
import java.util.concurrent.CancellationException;
18+
import java.util.concurrent.CompletableFuture;
19+
import java.util.concurrent.ExecutionException;
20+
import java.util.concurrent.Future;
21+
22+
/**
23+
* Adapter that wraps a DataSource (FDv1 protocol) and exposes it as a Synchronizer (FDv2 protocol).
24+
* <p>
25+
* This adapter bridges the push-based DataSource interface with the pull-based Synchronizer interface
26+
* by listening to updates through a custom DataSourceUpdateSink and queueing them as FDv2SourceResult objects.
27+
* <p>
28+
* The adapter is constructed with a factory function that receives the listening update sink and
29+
* creates the DataSource. This ensures the DataSource uses the adapter's internal sink without exposing it.
30+
*/
31+
class DataSourceSynchronizerAdapter implements Synchronizer {
32+
private final DataSource dataSource;
33+
private final IterableAsyncQueue<FDv2SourceResult> resultQueue = new IterableAsyncQueue<>();
34+
private final CompletableFuture<FDv2SourceResult> shutdownFuture = new CompletableFuture<>();
35+
private final Object startLock = new Object();
36+
private boolean started = false;
37+
private boolean closed = false;
38+
private Future<Void> startFuture;
39+
40+
/**
41+
* Functional interface for creating a DataSource with a given update sink.
42+
*/
43+
@FunctionalInterface
44+
public interface DataSourceFactory {
45+
DataSource create(DataSourceUpdateSink updateSink);
46+
}
47+
48+
/**
49+
* Creates a new adapter that wraps a DataSource.
50+
*
51+
* @param dataSourceFactory factory that creates the DataSource with the provided update sink
52+
*/
53+
public DataSourceSynchronizerAdapter(DataSourceFactory dataSourceFactory) {
54+
ConvertingUpdateSink convertingSink = new ConvertingUpdateSink(resultQueue);
55+
this.dataSource = dataSourceFactory.create(convertingSink);
56+
}
57+
58+
@Override
59+
public CompletableFuture<FDv2SourceResult> next() {
60+
synchronized (startLock) {
61+
if (!started && !closed) {
62+
started = true;
63+
startFuture = dataSource.start();
64+
65+
// Monitor the start future for errors
66+
// The data source will emit updates through the listening sink
67+
Thread monitorThread = new Thread(() -> {
68+
try {
69+
startFuture.get();
70+
} catch (ExecutionException e) {
71+
// Initialization failed - emit an interrupted status
72+
DataSourceStatusProvider.ErrorInfo errorInfo = new DataSourceStatusProvider.ErrorInfo(
73+
DataSourceStatusProvider.ErrorKind.UNKNOWN,
74+
0,
75+
e.getCause() != null ? e.getCause().toString() : e.toString(),
76+
Instant.now()
77+
);
78+
resultQueue.put(FDv2SourceResult.interrupted(errorInfo, false));
79+
} catch (CancellationException e) {
80+
// Start future was canceled (during close) - exit cleanly
81+
} catch (InterruptedException e) {
82+
Thread.currentThread().interrupt();
83+
}
84+
});
85+
monitorThread.setName("LaunchDarkly-SDK-Server-DataSourceAdapter-Monitor");
86+
monitorThread.setDaemon(true);
87+
monitorThread.start();
88+
}
89+
}
90+
91+
return CompletableFuture.anyOf(shutdownFuture, resultQueue.take())
92+
.thenApply(result -> (FDv2SourceResult) result);
93+
}
94+
95+
@Override
96+
public void close() {
97+
synchronized (startLock) {
98+
if (closed) {
99+
return;
100+
}
101+
closed = true;
102+
}
103+
104+
try {
105+
dataSource.close();
106+
} catch (IOException e) {
107+
// Ignore as we are shutting down.
108+
}
109+
shutdownFuture.complete(FDv2SourceResult.shutdown());
110+
if(startFuture != null) {
111+
// If the start future is done, this has no effect.
112+
// If it is not, then this will unblock the code waiting on start.
113+
startFuture.cancel(true);
114+
}
115+
}
116+
117+
/**
118+
* A DataSourceUpdateSink that converts DataSource updates into FDv2SourceResult objects.
119+
* This sink doesn't delegate to any other sink - it exists solely to convert FDv1 updates to FDv2 results.
120+
*/
121+
private static class ConvertingUpdateSink implements DataSourceUpdateSink {
122+
private final IterableAsyncQueue<FDv2SourceResult> resultQueue;
123+
124+
public ConvertingUpdateSink(IterableAsyncQueue<FDv2SourceResult> resultQueue) {
125+
this.resultQueue = resultQueue;
126+
}
127+
128+
@Override
129+
public boolean init(DataStoreTypes.FullDataSet<DataStoreTypes.ItemDescriptor> allData) {
130+
// Convert the full data set into a ChangeSet and emit it
131+
DataStoreTypes.ChangeSet<DataStoreTypes.ItemDescriptor> changeSet =
132+
new DataStoreTypes.ChangeSet<>(
133+
DataStoreTypes.ChangeSetType.Full,
134+
Selector.EMPTY,
135+
allData.getData(),
136+
null);
137+
resultQueue.put(FDv2SourceResult.changeSet(changeSet, false));
138+
return true;
139+
}
140+
141+
@Override
142+
public boolean upsert(DataStoreTypes.DataKind kind, String key, DataStoreTypes.ItemDescriptor item) {
143+
// Convert the upsert into a ChangeSet with a single item and emit it
144+
DataStoreTypes.KeyedItems<DataStoreTypes.ItemDescriptor> items =
145+
new DataStoreTypes.KeyedItems<>(Collections.<Map.Entry<String, DataStoreTypes.ItemDescriptor>>singletonList(
146+
new AbstractMap.SimpleEntry<>(key, item)));
147+
Iterable<Map.Entry<DataStoreTypes.DataKind, DataStoreTypes.KeyedItems<DataStoreTypes.ItemDescriptor>>> data =
148+
Collections.<Map.Entry<DataStoreTypes.DataKind, DataStoreTypes.KeyedItems<DataStoreTypes.ItemDescriptor>>>singletonList(
149+
new AbstractMap.SimpleEntry<>(kind, items));
150+
151+
DataStoreTypes.ChangeSet<DataStoreTypes.ItemDescriptor> changeSet =
152+
new DataStoreTypes.ChangeSet<>(
153+
DataStoreTypes.ChangeSetType.Partial,
154+
Selector.EMPTY,
155+
data,
156+
null);
157+
resultQueue.put(FDv2SourceResult.changeSet(changeSet, false));
158+
return true;
159+
}
160+
161+
@Override
162+
public DataStoreStatusProvider getDataStoreStatusProvider() {
163+
// This adapter doesn't use a data store
164+
return null;
165+
}
166+
167+
@Override
168+
public void updateStatus(DataSourceStatusProvider.State newState, DataSourceStatusProvider.ErrorInfo newError) {
169+
// Convert state changes to FDv2SourceResult status events
170+
switch (newState) {
171+
case INTERRUPTED:
172+
resultQueue.put(FDv2SourceResult.interrupted(newError, false));
173+
break;
174+
case OFF:
175+
if (newError != null) {
176+
resultQueue.put(FDv2SourceResult.terminalError(newError, false));
177+
}
178+
break;
179+
case VALID:
180+
case INITIALIZING:
181+
// These states don't map to FDv2SourceResult status events
182+
break;
183+
}
184+
}
185+
}
186+
}

0 commit comments

Comments
 (0)