Skip to content

Commit 0956ce9

Browse files
authored
fix: Expand data system config to allow for list of synchronizers (#343)
1 parent 709c4a2 commit 0956ce9

7 files changed

Lines changed: 164 additions & 152 deletions

File tree

.github/workflows/common_ci.yml

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -66,7 +66,7 @@ jobs:
6666
enable_persistence_tests: 'true'
6767
test_service_port: ${{ env.TEST_SERVICE_PORT }}
6868
token: ${{ secrets.GITHUB_TOKEN }}
69-
version: v3.0.0-alpha.1
69+
version: v3.0.0-alpha.3
7070

7171
- name: Upload test service logs
7272
uses: actions/upload-artifact@v4

internal/datasystem/fdv2_datasystem.go

Lines changed: 98 additions & 82 deletions
Original file line number | Diff line number | Diff line change
@@ -43,8 +43,13 @@ type FDv2 struct {
4343
// List of initializers that are capable of obtaining an initial payload of data.
4444
initializers []subsystems.DataInitializer
4545

46-
// The primary synchronizer responsible for keeping data up-to-date.
47-
primarySyncBuilder func() (subsystems.DataSynchronizer, error)
46+
// Mutable list of synchronizer builders. Items are removed when they permanently fail.
47+
// When reverting to FDv1, this list is replaced with a single FDv1 synchronizer.
48+
synchronizerBuilders []func() (subsystems.DataSynchronizer, error)
49+
currentSyncIndex int
50+
51+
// FDv1 fallback builder, used only when a synchronizer requests revert to FDv1
52+
fdv1FallbackBuilder func() (subsystems.DataSynchronizer, error)
4853

4954
// Boolean used to track whether the datasystem was originally configured
5055
// with some sort of valid data source.
@@ -53,12 +58,6 @@ type FDv2 struct {
5358
// they permanently fail.
5459
configuredWithDataSources bool
5560

56-
// The secondary synchronizer, in case the primary is unavailable.
57-
secondarySyncBuilder func() (subsystems.DataSynchronizer, error)
58-
59-
// The fdv1 fallback synchronizer, in case we have to fall back to fdv1.
60-
fdv1SyncBuilder func() (subsystems.DataSynchronizer, error)
61-
6261
// Whether the SDK should make use of persistent store/initializers/synchronizers or not.
6362
disabled bool
6463

@@ -139,10 +138,11 @@ func NewFDv2(disabled bool, cfgBuilder subsystems.ComponentConfigurer[subsystems
139138
}
140139

141140
fdv2.initializers = cfg.Initializers
142-
fdv2.primarySyncBuilder = cfg.Synchronizers.PrimaryBuilder
143-
fdv2.secondarySyncBuilder = cfg.Synchronizers.SecondaryBuilder
144-
fdv2.fdv1SyncBuilder = cfg.Synchronizers.FDv1FallbackBuilder
141+
fdv2.synchronizerBuilders = cfg.Synchronizers.SynchronizerBuilders
142+
fdv2.currentSyncIndex = 0
143+
fdv2.fdv1FallbackBuilder = cfg.Synchronizers.FDv1FallbackBuilder
145144
fdv2.disabled = disabled
145+
146146
fdv2.fallbackCond = func(status interfaces.DataSourceStatus) bool {
147147
interruptedAtRuntime := status.State == interfaces.DataSourceStateInterrupted &&
148148
time.Since(status.StateSince) > 1*time.Minute
@@ -162,7 +162,7 @@ func NewFDv2(disabled bool, cfgBuilder subsystems.ComponentConfigurer[subsystems
162162
return interruptedAtRuntime || healthyForTooLong || cannotInitialize
163163
}
164164

165-
fdv2.configuredWithDataSources = len(fdv2.initializers) > 0 || fdv2.primarySyncBuilder != nil
165+
fdv2.configuredWithDataSources = len(fdv2.initializers) > 0 || len(fdv2.synchronizerBuilders) > 0
166166

167167
if cfg.Store != nil && !disabled {
168168
// If there's a persistent Store, we should provide a status monitor and inform Store that it's present.
@@ -263,9 +263,8 @@ func (f *FDv2) runInitializers(ctx context.Context, closeWhenReady chan struct{}
263263
}
264264

265265
func (f *FDv2) runSynchronizers(ctx context.Context, closeWhenReady chan struct{}) {
266-
// If the SDK was configured with no synchronizer, then (assuming no initializer succeeded, which would have
267-
// already closed the channel), we should close it now so that MakeClient unblocks.
268-
if f.primarySyncBuilder == nil {
266+
// If no synchronizers configured, close ready channel and return
267+
if len(f.synchronizerBuilders) == 0 {
269268
f.readyOnce.Do(func() {
270269
close(closeWhenReady)
271270
})
@@ -279,82 +278,73 @@ func (f *FDv2) runSynchronizers(ctx context.Context, closeWhenReady chan struct{
279278
})
280279

281280
for {
282-
primarySync, err := f.primarySyncBuilder()
283-
if err != nil {
284-
f.loggers.Errorf("Failed to build the primary synchronizer: %v", err)
285-
return
286-
}
287-
288-
f.loggers.Debugf("Primary synchronizer %s is starting", primarySync.Name())
289-
resultChan := primarySync.Sync(f.store)
290-
removeSync, fallbackv1, err := f.consumeSynchronizerResults(ctx, resultChan, f.fallbackCond, closeWhenReady)
291-
292-
if err := primarySync.Close(); err != nil {
293-
f.loggers.Errorf("Primary synchronizer %s failed to gracefully close: %v", primarySync.Name(), err)
294-
}
295-
if errors.Is(err, context.Canceled) {
281+
// Check if we've run out of synchronizers
282+
if len(f.synchronizerBuilders) == 0 {
283+
f.loggers.Warn("No more synchronizers available")
284+
f.UpdateStatus(interfaces.DataSourceStateOff, f.getStatus().LastError)
296285
return
297286
}
298287

299-
if removeSync {
300-
f.primarySyncBuilder = f.secondarySyncBuilder
301-
f.secondarySyncBuilder = nil
302-
303-
if fallbackv1 {
304-
f.primarySyncBuilder = f.fdv1SyncBuilder
305-
}
306-
307-
if f.primarySyncBuilder == nil {
308-
f.loggers.Debugf("No more synchronizers available, closing the channel")
309-
f.UpdateStatus(interfaces.DataSourceStateOff, f.getStatus().LastError)
310-
f.readyOnce.Do(func() {
311-
close(closeWhenReady)
312-
})
313-
return
314-
}
315-
} else {
316-
f.loggers.Debugf("Fallback condition met")
317-
}
318-
319-
if f.secondarySyncBuilder == nil {
320-
continue
288+
// Wrap currentSyncIndex back to 0 when it runs past the end of the list (e.g. after falling back from the last synchronizer)
289+
if f.currentSyncIndex >= len(f.synchronizerBuilders) {
290+
f.currentSyncIndex = 0
321291
}
322292

323-
secondarySync, err := f.secondarySyncBuilder()
293+
// Build synchronizer
294+
sync, err := f.synchronizerBuilders[f.currentSyncIndex]()
324295
if err != nil {
325-
f.loggers.Errorf("Failed to build the secondary synchronizer: %v", err)
326-
return
296+
f.loggers.Errorf("Failed to build synchronizer at index %d: %v", f.currentSyncIndex, err)
297+
// Remove the failed builder from the list
298+
f.synchronizerBuilders = append(
299+
f.synchronizerBuilders[:f.currentSyncIndex],
300+
f.synchronizerBuilders[f.currentSyncIndex+1:]...)
301+
// Don't increment currentSyncIndex - it now points to the next synchronizer
302+
continue
327303
}
328-
f.loggers.Debugf("Secondary synchronizer %s is starting", secondarySync.Name())
329304

330-
resultChan = secondarySync.Sync(f.store)
331-
removeSync, fallbackv1, err = f.consumeSynchronizerResults(ctx, resultChan, f.recoveryCond, closeWhenReady)
305+
f.loggers.Infof("Synchronizer at index %d (%s) is starting", f.currentSyncIndex, sync.Name())
306+
resultChan := sync.Sync(f.store)
307+
action, err := f.consumeSynchronizerResults(ctx, resultChan, closeWhenReady)
332308

333-
if err := secondarySync.Close(); err != nil {
334-
f.loggers.Errorf("Secondary synchronizer %s failed to gracefully close: %v", secondarySync.Name(), err)
309+
if err := sync.Close(); err != nil {
310+
f.loggers.Errorf("Synchronizer %s failed to close: %v", sync.Name(), err)
335311
}
312+
336313
if errors.Is(err, context.Canceled) {
337314
return
338315
}
339316

340-
if removeSync {
341-
f.secondarySyncBuilder = nil
342-
343-
if fallbackv1 {
344-
f.primarySyncBuilder = f.fdv1SyncBuilder
345-
346-
if f.primarySyncBuilder == nil {
347-
f.loggers.Debugf("No more synchronizers available, closing the channel")
348-
f.UpdateStatus(interfaces.DataSourceStateOff, f.getStatus().LastError)
349-
f.readyOnce.Do(func() {
350-
close(closeWhenReady)
351-
})
352-
return
353-
}
317+
// Handle action based on conditions
318+
switch action {
319+
case syncFDv1:
320+
if f.fdv1FallbackBuilder != nil {
321+
f.loggers.Warn("Reverting to FDv1 protocol")
322+
// Replace entire list with single FDv1 synchronizer
323+
f.synchronizerBuilders = []func() (subsystems.DataSynchronizer, error){f.fdv1FallbackBuilder}
324+
f.currentSyncIndex = 0
325+
continue
354326
}
327+
f.loggers.Warn("Synchronizer requested FDv1 fallback but none configured")
328+
f.UpdateStatus(interfaces.DataSourceStateOff, f.getStatus().LastError)
329+
return
330+
case syncRemove:
331+
f.loggers.Warnf("Permanently removing synchronizer at index %d", f.currentSyncIndex)
332+
f.synchronizerBuilders = append(
333+
f.synchronizerBuilders[:f.currentSyncIndex],
334+
f.synchronizerBuilders[f.currentSyncIndex+1:]...)
335+
// Don't increment currentSyncIndex - it now points to the next synchronizer
336+
continue
337+
case syncRecover:
338+
// Recovery: jump back to index 0
339+
f.loggers.Info("Recovery condition met, returning to first synchronizer")
340+
f.currentSyncIndex = 0
341+
case syncFallback:
342+
// Fallback: move to next index
343+
f.loggers.Info("Fallback condition met, trying next synchronizer")
344+
f.currentSyncIndex++
355345
}
356346

357-
f.loggers.Debugf("Recovery condition met")
347+
// Check for cancellation before next iteration
358348
select {
359349
case <-ctx.Done():
360350
return
@@ -364,25 +354,33 @@ func (f *FDv2) runSynchronizers(ctx context.Context, closeWhenReady chan struct{
364354
})
365355
}
366356

357+
type syncAction int
358+
359+
const (
360+
syncFallback syncAction = iota
361+
syncRecover
362+
syncRemove
363+
syncFDv1
364+
)
365+
367366
func (f *FDv2) consumeSynchronizerResults(
368367
ctx context.Context,
369368
resultChan <-chan subsystems.DataSynchronizerResult,
370-
cond func(status interfaces.DataSourceStatus) bool,
371369
closeWhenReady chan<- struct{},
372-
) (bool, bool, error) {
370+
) (action syncAction, err error) {
373371
ticker := time.NewTicker(10 * time.Second)
374372
defer ticker.Stop()
375373

376374
for {
377375
select {
378376
case <-ctx.Done():
379-
return false, false, ctx.Err()
377+
return syncFallback, ctx.Err()
380378
case result, ok := <-resultChan:
381379
// The status channel being closed means that we won't be receiving
382380
// any more information from that synchronizer and we should
383381
// probably fall back.
384382
if !ok {
385-
return false, false, nil
383+
return syncFallback, nil
386384
}
387385

388386
if result.EnvironmentID.IsDefined() {
@@ -404,15 +402,33 @@ func (f *FDv2) consumeSynchronizerResults(
404402
f.UpdateStatus(result.State, result.Error)
405403
case interfaces.DataSourceStateOff:
406404
f.UpdateStatus(interfaces.DataSourceStateInterrupted, result.Error)
407-
return true, result.RevertToFDv1, nil
405+
if result.RevertToFDv1 {
406+
return syncFDv1, nil
407+
}
408+
return syncRemove, nil
408409
}
409410
case <-ticker.C:
411+
// If there's only one synchronizer, don't check conditions
412+
if len(f.synchronizerBuilders) == 1 {
413+
continue
414+
}
415+
410416
status := f.getStatus()
411417
f.loggers.Debugf("Data source status used to evaluate condition: %s", status.String())
412-
if cond(status) {
413-
return false, false, nil
418+
419+
// Check fallback condition first (things are bad)
420+
if f.fallbackCond(status) {
421+
f.loggers.Debugf("Fallback condition met")
422+
return syncFallback, nil
423+
}
424+
425+
// If not at index 0, also check recovery condition (things are good)
426+
if f.currentSyncIndex > 0 && f.recoveryCond(status) {
427+
f.loggers.Debugf("Recovery condition met")
428+
return syncRecover, nil
414429
}
415-
f.loggers.Debugf("Condition check succeeded, continue with current synchronizer")
430+
431+
f.loggers.Debugf("No condition met, continue with current synchronizer")
416432
}
417433
}
418434
}

ldclient_end_to_end_fdv2_test.go

Lines changed: 11 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -200,7 +200,12 @@ func TestFDV2ShutdownDownIfBothSynchronizersFail(t *testing.T) {
200200
expectedStreamError := "Error in stream connection (giving up permanently): HTTP error 401 (invalid SDK key)"
201201
expectedPollError := "Error on polling request (giving up permanently): HTTP error 401 (invalid SDK key)"
202202
assert.Equal(t, []string{expectedStreamError, expectedPollError}, logCapture.GetOutput(ldlog.Error))
203-
assert.Equal(t, []string{initializationFailedErrorMessage}, logCapture.GetOutput(ldlog.Warn))
203+
assert.Equal(t, []string{
204+
"Permanently removing synchronizer at index 0",
205+
"Permanently removing synchronizer at index 0",
206+
"No more synchronizers available",
207+
initializationFailedErrorMessage,
208+
}, logCapture.GetOutput(ldlog.Warn))
204209
})
205210
}
206211

@@ -280,7 +285,11 @@ func TestFDV2PollingSynchronizerFailsToStartWith401Error(t *testing.T) {
280285

281286
expectedError := "Error on polling request (giving up permanently): HTTP error 401 (invalid SDK key)"
282287
assert.Equal(t, []string{expectedError}, logCapture.GetOutput(ldlog.Error))
283-
assert.Equal(t, []string{initializationFailedErrorMessage}, logCapture.GetOutput(ldlog.Warn))
288+
assert.Equal(t, []string{
289+
"Permanently removing synchronizer at index 0",
290+
"No more synchronizers available",
291+
initializationFailedErrorMessage,
292+
}, logCapture.GetOutput(ldlog.Warn))
284293
})
285294
}
286295

@@ -383,7 +392,6 @@ func TestFDV2FileInitializerWillDeferToFirstSynchronizer(t *testing.T) {
383392
).
384393
Synchronizers(
385394
ldcomponents.StreamingDataSourceV2().BaseURI(server.URL),
386-
nil,
387395
),
388396
}
389397

0 commit comments

Comments
 (0)