99import com .launchdarkly .sdk .server .subsystems .DataSource ;
1010import com .launchdarkly .sdk .server .subsystems .DataSourceUpdateSinkV2 ;
1111
12- import java .io .Closeable ;
1312import java .io .IOException ;
1413import java .util .ArrayList ;
1514import java .util .Collections ;
@@ -35,7 +34,7 @@ class FDv2DataSource implements DataSource {
3534 private static final long defaultRecoveryTimeout = 5 * 60 ;
3635
3736 private final List <DataSourceFactory <Initializer >> initializers ;
38- private final List < SynchronizerFactoryWithState > synchronizers ;
37+ private final SynchronizerStateManager synchronizerStateManager ;
3938
4039 private final List <ConditionFactory > conditionFactories ;
4140
@@ -44,13 +43,6 @@ class FDv2DataSource implements DataSource {
4443 private final CompletableFuture <Boolean > startFuture = new CompletableFuture <>();
4544 private final AtomicBoolean started = new AtomicBoolean (false );
4645
47- /**
48- * Lock for active sources and shutdown state.
49- */
50- private final Object activeSourceLock = new Object ();
51- private Closeable activeSource ;
52- private boolean isShutdown = false ;
53-
5446 private final int threadPriority ;
5547
5648 private final LDLogger logger ;
@@ -67,7 +59,15 @@ public FDv2DataSource(
6759 LDLogger logger ,
6860 ScheduledExecutorService sharedExecutor
6961 ) {
70- this (initializers , synchronizers , dataSourceUpdates , threadPriority , logger , sharedExecutor , defaultFallbackTimeout , defaultRecoveryTimeout );
62+ this (initializers ,
63+ synchronizers ,
64+ dataSourceUpdates ,
65+ threadPriority ,
66+ logger ,
67+ sharedExecutor ,
68+ defaultFallbackTimeout ,
69+ defaultRecoveryTimeout
70+ );
7171 }
7272
7373
@@ -82,10 +82,11 @@ public FDv2DataSource(
8282 long recoveryTimeout
8383 ) {
8484 this .initializers = initializers ;
85- this . synchronizers = synchronizers
85+ List < SynchronizerFactoryWithState > synchronizerFactories = synchronizers
8686 .stream ()
8787 .map (SynchronizerFactoryWithState ::new )
8888 .collect (Collectors .toList ());
89+ this .synchronizerStateManager = new SynchronizerStateManager (synchronizerFactories );
8990 this .dataSourceUpdates = dataSourceUpdates ;
9091 this .threadPriority = threadPriority ;
9192 this .logger = logger ;
@@ -110,62 +111,13 @@ private void run() {
110111 runThread .start ();
111112 }
112113
113- /**
114- * We start at -1, so finding the next synchronizer can non-conditionally increment the index.
115- */
116- private int sourceIndex = -1 ;
117-
118- /**
119- * Reset the source index to -1, indicating that we should start from the first synchronizer when looking for
120- * the next one to use. This is used when recovering from a non-primary synchronizer.
121- */
122- private void resetSynchronizerSourceIndex () {
123- synchronized (activeSourceLock ) {
124- sourceIndex = -1 ;
125- }
126- }
127-
128- /**
129- * Get the next synchronizer to use. This operates based on tracking the index of the currently active synchronizer,
130- * which will loop through all available synchronizers handling interruptions. Then a non-prime synchronizer recovers
131- * the source index will be reset, and we start at the beginning.
132- * <p>
133- * Any given synchronizer can be marked as blocked, in which case that synchronizer is not eligible to be used again.
134- * Synchronizers that are not blocked are available, and this function will only return available synchronizers.
135- * @return the next synchronizer factory to use, or null if there are no more available synchronizers.
136- */
137- private SynchronizerFactoryWithState getNextAvailableSynchronizer () {
138- synchronized (synchronizers ) {
139- SynchronizerFactoryWithState factory = null ;
140-
141- // There is at least one available factory.
142- if (synchronizers .stream ().anyMatch (s -> s .getState () == SynchronizerFactoryWithState .State .Available )) {
143- // Look for the next synchronizer starting at the position after the current one. (avoiding just re-using the same synchronizer.)
144- while (factory == null ) {
145- sourceIndex ++;
146- // We aren't using module here because we want to keep the stored index within range instead
147- // of increasing indefinitely.
148- if (sourceIndex >= synchronizers .size ()) {
149- sourceIndex = 0 ;
150- }
151- SynchronizerFactoryWithState candidate = synchronizers .get (sourceIndex );
152- if (candidate .getState () == SynchronizerFactoryWithState .State .Available ) {
153- factory = candidate ;
154- }
155-
156- }
157- }
158-
159- return factory ;
160- }
161- }
162114
163115 private void runInitializers () {
164116 boolean anyDataReceived = false ;
165117 for (DataSourceFactory <Initializer > factory : initializers ) {
166118 try {
167119 Initializer initializer = factory .build ();
168- if (setActiveSource (initializer )) return ;
120+ if (synchronizerStateManager . setActiveSource (initializer )) return ;
169121 FDv2SourceResult result = initializer .run ().get ();
170122 switch (result .getResultType ()) {
171123 case CHANGE_SET :
@@ -202,24 +154,9 @@ private void runInitializers() {
202154 * @return a list of conditions to apply to the synchronizer
203155 */
204156 private List <Condition > getConditions () {
205- boolean isPrimeSynchronizer = false ;
206- int availableSynchronizers = 0 ;
207- boolean firstAvailableSynchronizer = true ;
208-
209- synchronized (activeSourceLock ) {
210- for (int index = 0 ; index < synchronizers .size (); index ++) {
157+ int availableSynchronizers = synchronizerStateManager .getAvailableSynchronizerCount ();
158+ boolean isPrimeSynchronizer = synchronizerStateManager .isPrimeSynchronizer ();
211159
212- if (synchronizers .get (index ).getState () == SynchronizerFactoryWithState .State .Available ) {
213- if (firstAvailableSynchronizer && sourceIndex == index ) {
214- // This is the first synchronizer that is available, and it also is the current one.
215- isPrimeSynchronizer = true ;
216- }
217- // Subsequently encountered synchronizers that are available are not the first one.
218- firstAvailableSynchronizer = false ;
219- availableSynchronizers ++;
220- }
221- }
222- }
223160 if (availableSynchronizers == 1 ) {
224161 // If there is only 1 synchronizer, then we cannot fall back or recover, so we don't need any conditions.
225162 return Collections .emptyList ();
@@ -235,24 +172,27 @@ private List<Condition> getConditions() {
235172 }
236173
237174 private boolean runSynchronizers () {
238- SynchronizerFactoryWithState availableSynchronizer = getNextAvailableSynchronizer ();
175+ SynchronizerFactoryWithState availableSynchronizer = synchronizerStateManager . getNextAvailableSynchronizer ();
239176 while (availableSynchronizer != null ) {
240177 Synchronizer synchronizer = availableSynchronizer .build ();
241178
242179 // Returns true if shutdown.
243- if (setActiveSource (synchronizer )) return false ;
180+ if (synchronizerStateManager . setActiveSource (synchronizer )) return false ;
244181
245182 try {
246183 boolean running = true ;
247184 // Conditions run once for the life of the synchronizer.
248185 List <Condition > conditions = getConditions ();
249- CompletableFuture <Object > conditionFutures = CompletableFuture .anyOf (
186+
187+ // The conditionsFuture will complete if any condition is met. Meeting any condition means we will
188+ // switch to a different synchronizer.
189+ CompletableFuture <Object > conditionsFuture = CompletableFuture .anyOf (
250190 conditions .stream ().map (Condition ::execute ).toArray (CompletableFuture []::new ));
251191
252192 while (running ) {
253193 CompletableFuture <FDv2SourceResult > nextResultFuture = synchronizer .next ();
254194
255- Object res = CompletableFuture .anyOf (conditionFutures , nextResultFuture ).get ();
195+ Object res = CompletableFuture .anyOf (conditionsFuture , nextResultFuture ).get ();
256196
257197 if (res instanceof Condition ) {
258198 Condition c = (Condition ) res ;
@@ -265,7 +205,7 @@ private boolean runSynchronizers() {
265205 case RECOVERY :
266206 // For recovery, we will start at the first available synchronizer.
267207 // So we reset the source index, and finding the source will start at the beginning.
268- resetSynchronizerSourceIndex ();
208+ synchronizerStateManager . resetSourceIndex ();
269209 break ;
270210 }
271211 // A running synchronizer will only have fallback and recovery conditions that it can act on.
@@ -308,37 +248,16 @@ private boolean runSynchronizers() {
308248 // We have been requested to fall back to FDv1. We handle whatever message was associated,
309249 // close the synchronizer, and then fallback.
310250 if (result .isFdv1Fallback ()) {
311- safeClose (synchronizer );
251+ // When falling back to FDv1, we are done with any FDv2 synchronizers.
252+ synchronizerStateManager .shutdown ();
312253 return true ;
313254 }
314255 }
315256 } catch (ExecutionException | InterruptedException | CancellationException e ) {
316257 // TODO: Log.
317258 // Move to next synchronizer.
318259 }
319- availableSynchronizer = getNextAvailableSynchronizer ();
320- }
321- return false ;
322- }
323-
324- private void safeClose (Closeable synchronizer ) {
325- try {
326- synchronizer .close ();
327- } catch (IOException e ) {
328- // Ignore close exceptions.
329- }
330- }
331-
332- private boolean setActiveSource (Closeable synchronizer ) {
333- synchronized (activeSourceLock ) {
334- if (activeSource != null ) {
335- safeClose (activeSource );
336- }
337- if (isShutdown ) {
338- safeClose (synchronizer );
339- return true ;
340- }
341- activeSource = synchronizer ;
260+ availableSynchronizer = synchronizerStateManager .getNextAvailableSynchronizer ();
342261 }
343262 return false ;
344263 }
@@ -361,17 +280,12 @@ public boolean isInitialized() {
361280 }
362281
363282 @ Override
364- public void close () throws IOException {
283+ public void close () {
365284 // If there is an active source, we will shut it down, and that will result in the loop handling that source
366285 // exiting.
367286 // If we do not have an active source, then the loop will check isShutdown when attempting to set one. When
368287 // it detects shutdown, it will exit the loop.
369- synchronized (activeSourceLock ) {
370- isShutdown = true ;
371- if (activeSource != null ) {
372- activeSource .close ();
373- }
374- }
288+ synchronizerStateManager .shutdown ();
375289
376290 // If this is already set, then this has no impact.
377291 startFuture .complete (false );
0 commit comments