From 1b5458bb5cd5e97470535d39436639fce888fcad Mon Sep 17 00:00:00 2001 From: Matthew Keeler Date: Thu, 30 Apr 2026 12:15:47 -0400 Subject: [PATCH 01/14] fix: Honor x-ld-fd-fallback header in FDv2 initializer phase and on successful responses The .NET SDK previously only inspected the x-ld-fd-fallback header on FDv2 synchronizer error responses. Two gaps: - The header was ignored on successful (200) responses. The server can include it alongside a valid payload, expecting the SDK to apply the payload and then switch to FDv1. - The header was ignored entirely during the initializer phase. An initializer that received the directive would just fail through to the next initializer or to the FDv2 synchronizer chain rather than switching directly to the FDv1 fallback synchronizer. Polling and streaming data sources now check the response/connection headers on the success path and emit Off+FDv1Fallback after applying the payload. The FDv2 composite attaches the FDv1 fallback applier to the initializers entry as well as the synchronizers entry, with extra-skip semantics so an initializer-phase fallback bypasses the synchronizer chain entirely. A shared latch coordinates the FDv1 fallback applier with the surrounding fallback/recovery appliers so exhaustion signals from disposed entries cannot block the fallback synchronizer that just started. --- .../FDv2DataSource.InitializationTracker.cs | 2 +- .../FDv2DataSources/FDv2DataSource.cs | 130 ++++++++++++++++-- .../FDv2DataSources/FDv2PollingDataSource.cs | 29 +++- .../FDv2StreamingDataSource.cs | 29 ++++ 4 files changed, 175 insertions(+), 15 deletions(-) diff --git a/pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2DataSource.InitializationTracker.cs b/pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2DataSource.InitializationTracker.cs index 349ec0f7..a9c0e806 100644 --- a/pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2DataSource.InitializationTracker.cs +++ b/pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2DataSource.InitializationTracker.cs @@ -264,7 +264,7 @@ public void UpdateStatus(DataSourceState newState, DataSourceStatus.ErrorInfo? n { DetermineState(Action.FallingBack); } - + DetermineState(Action.SynchronizersExhausted); break; diff --git a/pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2DataSource.cs b/pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2DataSource.cs index b88dc61e..504b30d0 100644 --- a/pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2DataSource.cs +++ b/pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2DataSource.cs @@ -35,13 +35,29 @@ public static IDataSource CreateFDv2DataSource( // Here we make a combined composite source, with the initializer source first which switches or falls back to the // synchronizer source when the initializer succeeds or when the initializer source reports Off (all initializers failed) + // Shared latch: once the FDv1 fallback applier has fired for any entry, all other + // appliers attached at this level become no-ops. This prevents the existing + // initializer-exhaustion or synchronizer-exhaustion appliers from firing afterwards + // and blocking the FDv1 fallback entry that was just selected. + var fdv1FallbackTriggered = new FDv1FallbackLatch(); + ActionApplierFactory blacklistWhenSuccessOrOff = - (actionable) => new ActionApplierBlacklistWhenSuccessOrOff(actionable); + (actionable) => new GatedObserver( + new ActionApplierBlacklistWhenSuccessOrOff(actionable), fdv1FallbackTriggered); ActionApplierFactory fastFallbackApplierFactory = (actionable) => new ActionApplierFastFallback(actionable); ActionApplierFactory timedFallbackAndRecoveryApplierFactory = - (actionable) => new ActionApplierTimedFallbackAndRecovery(actionable, sublogger); - - ActionApplierFactory fdv1FallbackApplierFactory = (actionable) => new FDv1FallbackActionApplier(actionable); + (actionable) => new GatedObserver( + new ActionApplierTimedFallbackAndRecovery(actionable, sublogger), fdv1FallbackTriggered); + + // From the synchronizers entry, the FDv1 fallback entry is the next entry in the + // outer list, so no extra entries to skip. + ActionApplierFactory fdv1FallbackApplierFactory = + (actionable) => new FDv1FallbackActionApplier(actionable, fdv1FallbackTriggered); + // From the initializers entry, the FDv1 fallback entry is two ahead when synchronizers + // are configured (skip past synchronizers), or one ahead when they are not. + var initializerFdv1FallbackExtraSkips = (synchronizers != null && synchronizers.Count > 0) ? 1 : 0; + ActionApplierFactory initializerFdv1FallbackApplierFactory = + (actionable) => new FDv1FallbackActionApplier(actionable, fdv1FallbackTriggered, initializerFdv1FallbackExtraSkips); var initializationTracker = new InitializationTracker(Any(initializers), Any(synchronizers)); @@ -71,8 +87,22 @@ public static IDataSource CreateFDv2DataSource( // The common data source updates implements both IDataSourceUpdates and IDataSourceUpdatesV2. return new CompositeSource("Initializers", sink, initializerFactory, sublogger, circular: false); }, - (actionable) => new CompositeObserver( - initializationObserver, blacklistWhenSuccessOrOff(actionable)) + (actionable) => + { + // Honor the FDv1 fallback directive in the initializer phase too. When the + // server returns x-ld-fd-fallback during init, skip the FDv2 synchronizer + // chain entirely and switch directly to the FDv1 fallback synchronizer. + if (fdv1Synchronizers != null && fdv1Synchronizers.Count > 0) + { + return new CompositeObserver( + initializationObserver, + blacklistWhenSuccessOrOff(actionable), + initializerFdv1FallbackApplierFactory(actionable)); + } + + return new CompositeObserver( + initializationObserver, blacklistWhenSuccessOrOff(actionable)); + } )); } @@ -400,24 +430,98 @@ public void Apply(ChangeSet changeSet) } } - private class FDv1FallbackActionApplier : IDataSourceObserver + /// + /// Single-shot latch shared between the FDv1 fallback applier and the surrounding + /// fallback/recovery appliers. Once the FDv1 fallback directive is observed at any entry, + /// other appliers stop reacting -- they would otherwise observe the now-unwanted Off + /// signals from previously running data sources and try to advance the composite again. + /// + internal sealed class FDv1FallbackLatch + { + private int _triggered; + + /// + /// Returns whether the latch has already been triggered. + /// + public bool IsTriggered => System.Threading.Volatile.Read(ref _triggered) != 0; + + /// + /// Atomically sets the latch. Returns true if this call was the one that set it. + /// + public bool TryTrigger() => System.Threading.Interlocked.CompareExchange(ref _triggered, 1, 0) == 0; + } + + /// + /// Wraps another observer and suppresses both Apply and UpdateStatus calls once the FDv1 + /// fallback latch has been triggered. The latch is set by + /// when it observes the FDv1 fallback directive. + /// + internal sealed class GatedObserver : IDataSourceObserver + { + private readonly IDataSourceObserver _inner; + private readonly FDv1FallbackLatch _latch; + + public GatedObserver(IDataSourceObserver inner, FDv1FallbackLatch latch) + { + _inner = inner ?? throw new ArgumentNullException(nameof(inner)); + _latch = latch ?? throw new ArgumentNullException(nameof(latch)); + } + + public void UpdateStatus(DataSourceState newState, DataSourceStatus.ErrorInfo? newError) + { + if (_latch.IsTriggered) return; + _inner.UpdateStatus(newState, newError); + } + + public void Apply(ChangeSet changeSet) + { + if (_latch.IsTriggered) return; + _inner.Apply(changeSet); + } + } + + /// + /// Action applier that observes an FDv1 fallback signal and advances the outer composite + /// to the FDv1 fallback synchronizer entry, blocking the current entry and any number of + /// intermediate entries that should also be skipped. + /// + /// + /// When attached to the synchronizers entry of the outer FDv2 composite, the FDv1 fallback + /// entry is the next one in the list, so extraEntriesToSkip is 0. When attached to + /// the initializers entry and the synchronizers entry is also configured, we have to skip + /// past it, so extraEntriesToSkip is 1. + /// + internal class FDv1FallbackActionApplier : IDataSourceObserver { private readonly ICompositeSourceActionable _actionable; + private readonly FDv1FallbackLatch _latch; + private readonly int _extraEntriesToSkip; - public FDv1FallbackActionApplier(ICompositeSourceActionable actionable) + public FDv1FallbackActionApplier(ICompositeSourceActionable actionable, FDv1FallbackLatch latch = null, int extraEntriesToSkip = 0) { _actionable = actionable ?? throw new ArgumentNullException(nameof(actionable)); + if (extraEntriesToSkip < 0) + { + throw new ArgumentOutOfRangeException(nameof(extraEntriesToSkip)); + } + _latch = latch ?? new FDv1FallbackLatch(); + _extraEntriesToSkip = extraEntriesToSkip; } public void UpdateStatus(DataSourceState newState, DataSourceStatus.ErrorInfo? newError) { - if (newError != null && newError.Value.FDv1Fallback) + if (newError == null || !newError.Value.FDv1Fallback) return; + if (!_latch.TryTrigger()) return; + + _actionable.BlockCurrent(); // blacklist the current entry + _actionable.DisposeCurrent(); // dispose the current data source + for (var i = 0; i < _extraEntriesToSkip; i++) { - _actionable.BlockCurrent(); // blacklist the synchronizers altogether - _actionable.DisposeCurrent(); // dispose the synchronizers - _actionable.GoToNext(); // go to the FDv1 fallback synchronizer - _actionable.StartCurrent(); // start the FDv1 fallback synchronizer + _actionable.GoToNext(); // advance to the entry we are skipping past + _actionable.BlockCurrent(); // remove that entry from the list too } + _actionable.GoToNext(); // go to the FDv1 fallback synchronizer entry + _actionable.StartCurrent(); // start the FDv1 fallback synchronizer } public void Apply(ChangeSet changeSet) diff --git a/pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2PollingDataSource.cs b/pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2PollingDataSource.cs index b8f92834..789a5207 100644 --- a/pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2PollingDataSource.cs +++ b/pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2PollingDataSource.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Generic; using System.Linq; using System.Text.Json; using System.Threading; @@ -100,12 +101,28 @@ private async Task UpdateTaskAsync() } ProcessPollingResponse(response.Value); + + // The server can ask the SDK to fall back to FDv1 even on a successful response. + // Apply the payload first (above), then signal the fallback so the action applier + // can switch to the FDv1 fallback synchronizer. + if (HasFDv1FallbackHeader(response.Value.Headers)) + { + _log.Info("LaunchDarkly polling response indicates fallback to FDv1"); + var fallbackError = new DataSourceStatus.ErrorInfo + { + Kind = DataSourceStatus.ErrorKind.Unknown, + Time = DateTime.Now, + FDv1Fallback = true + }; + Shutdown(fallbackError); + return; + } } catch (UnsuccessfulResponseException ex) { var recoverable = HttpErrors.IsRecoverable(ex.StatusCode); var errorInfo = DataSourceStatus.ErrorInfo.FromHttpError(ex.StatusCode, recoverable); - + // Check for LD fallback header if (ex.Headers != null) { @@ -266,5 +283,15 @@ private void Shutdown(DataSourceStatus.ErrorInfo? errorInfo) _canceler?.Cancel(); _dataSourceUpdates.UpdateStatus(DataSourceState.Off, errorInfo); } + + internal static bool HasFDv1FallbackHeader( + IEnumerable>> headers) + { + if (headers == null) return false; + return headers + .Where(h => string.Equals(h.Key, "x-ld-fd-fallback", StringComparison.OrdinalIgnoreCase)) + .SelectMany(h => h.Value) + .Any(v => string.Equals(v, "true", StringComparison.OrdinalIgnoreCase)); + } } } diff --git a/pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2StreamingDataSource.cs b/pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2StreamingDataSource.cs index d791055b..5f14d2b4 100644 --- a/pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2StreamingDataSource.cs +++ b/pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2StreamingDataSource.cs @@ -46,6 +46,11 @@ internal delegate IEventSource EventSourceCreator(Uri streamUri, private volatile string _environmentId; + // _fdv1FallbackRequested is set to true when the streaming response carries the + // x-ld-fd-fallback: true header. Once any payload is successfully applied while this is set, + // the SDK shuts the stream down and signals the FDv1 fallback action applier. + private volatile bool _fdv1FallbackRequested; + /// /// When the store enters a failed state, and we don't have "data source monitoring", we want to log /// a message that we are restarting the event source. We don't want to log this message on multiple @@ -225,6 +230,24 @@ private void OnMessage(object sender, MessageReceivedEventArgs e) { _lastStoreUpdateFailed.GetAndSet(false); MaybeMarkInitialized(); + + // The server may include the FDv1 fallback header on a successful + // streaming response. Apply the payload first (above), then shut the + // stream down with FDv1Fallback=true so the action applier swaps to the + // FDv1 fallback synchronizer. + if (_fdv1FallbackRequested) + { + _log.Info("LaunchDarkly streaming response indicates fallback to FDv1"); + var fallbackError = new DataSourceStatus.ErrorInfo + { + Kind = DataSourceStatus.ErrorKind.Unknown, + Time = DateTime.Now, + FDv1Fallback = true + }; + _initTask.TrySetResult(true); + Shutdown(fallbackError); + return; + } } else { @@ -341,6 +364,12 @@ private void OnOpen(object sender, StateChangedEventArgs e) _environmentId = e.Headers?.FirstOrDefault((item) => item.Key.ToLower() == HeaderConstants.EnvironmentId).Value ?.FirstOrDefault(); + + // Capture the FDv1 fallback header from the connection-open response. The SDK applies + // any payload that arrives on this stream and then shuts the stream down so the action + // applier can switch to the FDv1 fallback synchronizer. + _fdv1FallbackRequested = FDv2PollingDataSource.HasFDv1FallbackHeader(e.Headers); + _log.Debug("EventSource Opened"); RecordStreamInit(false); } From 67dad4699cebaa7a69ccbe34eb560eb000bd5bb5 Mon Sep 17 00:00:00 2001 From: Matthew Keeler Date: Thu, 30 Apr 2026 12:16:03 -0400 Subject: [PATCH 02/14] test: Wire FDv1 Fallback Directive contract-test capability and add unit coverage The contract-test harness now treats the FDv1 Fallback Synchronizer as a distinct field on the data system (dataSystem.fdv1Fallback) rather than deriving it from the FDv2 Primary/Fallback synchronizer chain, and gates the directive subtests on a new fdv1-fallback capability. Wire the test service to match: - declare the fdv1-fallback capability - accept the new fdv1Fallback config field as SdkConfigPollingParams - build the FDv1 Fallback Synchronizer from that field directly, instead of coercing it out of the last polling synchronizer - bump the FDv2 contract-tests pin from v3.0.0-alpha.3 to v3.0.0-alpha.6 Add unit-level coverage for the new SDK behavior: - FDv2PollingDataSourceTest: success+fallback header path - FDv2StreamingDataSourceTest: success+fallback header path - FDv2DataSourceTest: FDv1FallbackActionApplier extra-skip behavior and the shared GatedObserver latch. --- .github/actions/contract-tests/action.yml | 6 +- .../server/contract-tests/Representations.cs | 4 + .../server/contract-tests/SdkClientEntity.cs | 85 ++++----------- pkgs/sdk/server/contract-tests/TestService.cs | 3 +- .../FDv2DataSources/FDv2DataSourceTest.cs | 103 ++++++++++++++++++ .../FDv2PollingDataSourceTest.cs | 65 +++++++++++ .../FDv2StreamingDataSourceTest.cs | 71 ++++++++++++ 7 files changed, 268 insertions(+), 69 deletions(-) diff --git a/.github/actions/contract-tests/action.yml b/.github/actions/contract-tests/action.yml index 9ccaad11..832b7721 100644 --- a/.github/actions/contract-tests/action.yml +++ b/.github/actions/contract-tests/action.yml @@ -12,7 +12,7 @@ inputs: required: false default: '' run_fdv2_tests: - description: 'Whether to run contract tests from the v3.0.0-alpha.3 tag' + description: 'Whether to run contract tests from the v3.0.0-alpha.6 tag' required: false default: 'false' @@ -64,7 +64,7 @@ runs: shell: bash run: dotnet ${{ inputs.service_dll_file }} > test-service.log 2>&1 & disown - - name: Clone and run contract tests from v3.0.0-alpha.3 tag + - name: Clone and run contract tests from v3.0.0-alpha.6 tag if: inputs.run_fdv2_tests == 'true' shell: bash run: | @@ -72,7 +72,7 @@ runs: git clone https://github.com/launchdarkly/sdk-test-harness.git /tmp/sdk-test-harness cp $(dirname ./${{ inputs.service_project_file }})/test-supressions-fdv2.txt /tmp/sdk-test-harness/testharness-suppressions-fdv2.txt cd /tmp/sdk-test-harness - git checkout v3.0.0-alpha.3 + git checkout v3.0.0-alpha.6 go build -o test-harness . ./test-harness -url http://localhost:8000 -debug -status-timeout=360 --skip-from=testharness-suppressions-fdv2.txt --stop-service-at-end env: diff --git a/pkgs/sdk/server/contract-tests/Representations.cs b/pkgs/sdk/server/contract-tests/Representations.cs index 5030b315..c9879c4c 100644 --- a/pkgs/sdk/server/contract-tests/Representations.cs +++ b/pkgs/sdk/server/contract-tests/Representations.cs @@ -153,6 +153,10 @@ public class SdkConfigDataSystemParams public int? StoreMode { get; set; } public SdkConfigDataInitializerParams[] Initializers { get; set; } public SdkConfigDataSynchronizerParams[] Synchronizers { get; set; } + // FDv1Fallback configures the SDK's FDv1 Fallback Synchronizer, which is engaged only when + // the LaunchDarkly server returns the FDv1 fallback directive. It is distinct from the FDv2 + // Primary/Fallback Synchronizers above. + public SdkConfigPollingParams FDv1Fallback { get; set; } public string PayloadFilter { get; set; } } diff --git a/pkgs/sdk/server/contract-tests/SdkClientEntity.cs b/pkgs/sdk/server/contract-tests/SdkClientEntity.cs index 2b8631fe..4e5ffc7b 100644 --- a/pkgs/sdk/server/contract-tests/SdkClientEntity.cs +++ b/pkgs/sdk/server/contract-tests/SdkClientEntity.cs @@ -503,47 +503,29 @@ private static Configuration BuildSdkConfig(SdkConfigParams sdkParams, ILogAdapt if (synchronizers.Count > 0) { dataSystemBuilder.Synchronizers(synchronizers.ToArray()); - - // Find the best synchronizer to use for FDv1 fallback configuration - // Prefer polling synchronizers since FDv1 fallback is polling-based - SdkConfigDataSynchronizerParams synchronizerForFallback = null; - - // First, try to find a polling synchronizer - foreach (var syncParams in sdkParams.DataSystem.Synchronizers) - { - if (syncParams.Polling != null) - { - synchronizerForFallback = syncParams; - break; - } - } - - // If no polling synchronizer found, use the first synchronizer (could be streaming) - if (synchronizerForFallback == null && sdkParams.DataSystem.Synchronizers.Length > 0) - { - synchronizerForFallback = sdkParams.DataSystem.Synchronizers[0]; - } - - if (synchronizerForFallback != null) - { - // Only configure global polling endpoints if we have a polling synchronizer with a custom base URI - // This ensures the FDv1 fallback synchronizer uses the same base URI without overwriting - // existing polling endpoint configuration - if (synchronizerForFallback.Polling != null && - synchronizerForFallback.Polling.BaseUri != null) - { - endpoints.Polling(synchronizerForFallback.Polling.BaseUri); - } - - var fdv1Fallback = CreateFDv1FallbackSynchronizer(synchronizerForFallback); - if (fdv1Fallback != null) - { - dataSystemBuilder.FDv1FallbackSynchronizer(fdv1Fallback); - } - } } } + // Configure the FDv1 Fallback Synchronizer directly from dataSystem.fdv1Fallback, + // separate from the FDv2 Primary/Fallback synchronizer chain. This is engaged only + // in response to a server-directed FDv1 Fallback Directive. + if (sdkParams.DataSystem.FDv1Fallback != null) + { + if (sdkParams.DataSystem.FDv1Fallback.BaseUri != null) + { + endpoints.Polling(sdkParams.DataSystem.FDv1Fallback.BaseUri); + } + + var fdv1FallbackBuilder = DataSystemComponents.FDv1Polling(); + if (sdkParams.DataSystem.FDv1Fallback.PollIntervalMs.HasValue) + { + fdv1FallbackBuilder.PollInterval( + TimeSpan.FromMilliseconds(sdkParams.DataSystem.FDv1Fallback.PollIntervalMs.Value)); + } + + dataSystemBuilder.FDv1FallbackSynchronizer(fdv1FallbackBuilder); + } + builder.DataSystem(dataSystemBuilder); } @@ -595,33 +577,6 @@ private static IComponentConfigurer CreateSynchronizer( return null; } - private static IComponentConfigurer CreateFDv1FallbackSynchronizer( - SdkConfigDataSynchronizerParams synchronizer) - { - // FDv1 fallback synchronizer is always polling-based - var fdv1PollingBuilder = DataSystemComponents.FDv1Polling(); - - // Configure polling interval if the synchronizer has polling configuration - if (synchronizer.Polling != null) - { - if (synchronizer.Polling.PollIntervalMs.HasValue) - { - fdv1PollingBuilder.PollInterval(TimeSpan.FromMilliseconds(synchronizer.Polling.PollIntervalMs.Value)); - } - // Note: FDv1 polling doesn't support ServiceEndpointsOverride, so base URI - // will use the global service endpoints configuration - } - else if (synchronizer.Streaming != null) - { - // For streaming synchronizers, we still create a polling fallback - // Use default polling interval since streaming doesn't have a poll interval - // Note: FDv1 polling doesn't support ServiceEndpointsOverride, so base URI - // will use the global service endpoints configuration - } - - return fdv1PollingBuilder; - } - private MigrationVariationResponse DoMigrationVariation(MigrationVariationParams migrationVariation) { var defaultStage = MigrationStageExtensions.FromDataModelString(migrationVariation.DefaultStage); diff --git a/pkgs/sdk/server/contract-tests/TestService.cs b/pkgs/sdk/server/contract-tests/TestService.cs index 922a2676..13f8df7a 100644 --- a/pkgs/sdk/server/contract-tests/TestService.cs +++ b/pkgs/sdk/server/contract-tests/TestService.cs @@ -38,7 +38,8 @@ public class Webapp "inline-context-all", "anonymous-redaction", "evaluation-hooks", - "client-prereq-events" + "client-prereq-events", + "fdv1-fallback" }; public readonly Handler Handler; diff --git a/pkgs/sdk/server/test/Internal/FDv2DataSources/FDv2DataSourceTest.cs b/pkgs/sdk/server/test/Internal/FDv2DataSources/FDv2DataSourceTest.cs index 16fdfe47..db02db72 100644 --- a/pkgs/sdk/server/test/Internal/FDv2DataSources/FDv2DataSourceTest.cs +++ b/pkgs/sdk/server/test/Internal/FDv2DataSources/FDv2DataSourceTest.cs @@ -1418,32 +1418,42 @@ private class MockCompositeSourceActionable : ICompositeSourceActionable public bool GoToFirstCalled { get; private set; } public bool StartCurrentCalled { get; private set; } public bool BlockCurrentCalled { get; private set; } + public int GoToNextCallCount { get; private set; } + public int BlockCurrentCallCount { get; private set; } + public List CallSequence { get; } = new List(); private bool _isAtFirst = false; public void DisposeCurrent() { DisposeCurrentCalled = true; + CallSequence.Add(nameof(DisposeCurrent)); } public void GoToNext() { GoToNextCalled = true; + GoToNextCallCount++; + CallSequence.Add(nameof(GoToNext)); } public void GoToFirst() { GoToFirstCalled = true; + CallSequence.Add(nameof(GoToFirst)); } public Task StartCurrent() { StartCurrentCalled = true; + CallSequence.Add(nameof(StartCurrent)); return Task.FromResult(true); } public void BlockCurrent() { BlockCurrentCalled = true; + BlockCurrentCallCount++; + CallSequence.Add(nameof(BlockCurrent)); } public bool IsAtFirst() @@ -1463,9 +1473,102 @@ public void Reset() GoToFirstCalled = false; StartCurrentCalled = false; BlockCurrentCalled = false; + GoToNextCallCount = 0; + BlockCurrentCallCount = 0; + CallSequence.Clear(); _isAtFirst = false; } } + + [Fact] + public void FDv1FallbackActionApplierIgnoresStatusWithoutFDv1Fallback() + { + var mockActionable = new MockCompositeSourceActionable(); + var applier = new FDv2DataSource.FDv1FallbackActionApplier(mockActionable); + + // Status updates without an FDv1Fallback error must not advance the composite. + applier.UpdateStatus(DataSourceState.Off, null); + applier.UpdateStatus(DataSourceState.Interrupted, + new DataSourceStatus.ErrorInfo { Kind = DataSourceStatus.ErrorKind.NetworkError }); + applier.UpdateStatus(DataSourceState.Valid, + new DataSourceStatus.ErrorInfo { FDv1Fallback = false }); + + Assert.False(mockActionable.BlockCurrentCalled); + Assert.False(mockActionable.GoToNextCalled); + Assert.False(mockActionable.StartCurrentCalled); + } + + [Fact] + public void FDv1FallbackActionApplierWithDefaultSkipMovesToNextEntry() + { + // Default behavior (synchronizers entry): block current, dispose, go to next, start. + var mockActionable = new MockCompositeSourceActionable(); + var applier = new FDv2DataSource.FDv1FallbackActionApplier(mockActionable); + + applier.UpdateStatus( + DataSourceState.Off, + new DataSourceStatus.ErrorInfo { FDv1Fallback = true }); + + Assert.Equal( + new List { "BlockCurrent", "DisposeCurrent", "GoToNext", "StartCurrent" }, + mockActionable.CallSequence); + } + + [Fact] + public void FDv1FallbackActionApplierWithExtraSkipsAdvancesPastIntermediateEntries() + { + // From the initializers entry with synchronizers configured, the applier must skip + // past the synchronizers entry to land on the FDv1 fallback entry. + var mockActionable = new MockCompositeSourceActionable(); + var applier = new FDv2DataSource.FDv1FallbackActionApplier(mockActionable, extraEntriesToSkip: 1); + + applier.UpdateStatus( + DataSourceState.Interrupted, + new DataSourceStatus.ErrorInfo { FDv1Fallback = true }); + + Assert.Equal( + new List { "BlockCurrent", "DisposeCurrent", "GoToNext", "BlockCurrent", "GoToNext", "StartCurrent" }, + mockActionable.CallSequence); + Assert.Equal(2, mockActionable.GoToNextCallCount); + Assert.Equal(2, mockActionable.BlockCurrentCallCount); + } + + [Fact] + public void GatedObserverSuppressesEventsAfterLatchTriggered() + { + // The latch coordinates the FDv1 fallback applier with the surrounding fallback / + // recovery appliers: once one entry's applier has triggered the fallback, others must + // not respond to the now-stale Off signals from disposed entries. + var inner = new CapturingObserver(); + var latch = new FDv2DataSource.FDv1FallbackLatch(); + var gated = new FDv2DataSource.GatedObserver(inner, latch); + + // Before triggering: events flow through. + gated.UpdateStatus(DataSourceState.Off, new DataSourceStatus.ErrorInfo()); + Assert.Equal(1, inner.UpdateStatusCallCount); + + // Trigger the latch (e.g. FDv1FallbackActionApplier observed the directive). + Assert.True(latch.TryTrigger()); + + // After triggering: events are suppressed. + gated.UpdateStatus(DataSourceState.Off, new DataSourceStatus.ErrorInfo()); + gated.Apply(new ChangeSet( + ChangeSetType.Full, Selector.Empty, + new Dictionary>(), null)); + Assert.Equal(1, inner.UpdateStatusCallCount); + Assert.Equal(0, inner.ApplyCallCount); + } + + private class CapturingObserver : IDataSourceObserver + { + public int UpdateStatusCallCount { get; private set; } + public int ApplyCallCount { get; private set; } + + public void UpdateStatus(DataSourceState newState, DataSourceStatus.ErrorInfo? newError) + => UpdateStatusCallCount++; + + public void Apply(ChangeSet changeSet) => ApplyCallCount++; + } } } diff --git a/pkgs/sdk/server/test/Internal/FDv2DataSources/FDv2PollingDataSourceTest.cs b/pkgs/sdk/server/test/Internal/FDv2DataSources/FDv2PollingDataSourceTest.cs index 12f16dbf..2e37787d 100644 --- a/pkgs/sdk/server/test/Internal/FDv2DataSources/FDv2PollingDataSourceTest.cs +++ b/pkgs/sdk/server/test/Internal/FDv2DataSources/FDv2PollingDataSourceTest.cs @@ -977,6 +977,71 @@ public async Task UnrecoverableHttpErrorWithFallbackHeaderFalseDoesNotSetFDv1Fal } } + [Fact] + public async Task SuccessfulResponseWithFallbackHeaderAppliesPayloadThenSignalsFDv1Fallback() + { + // The server can include x-ld-fd-fallback: true on a successful 200 response. The SDK + // must apply the payload first (so evaluations see the freshest data the server is + // willing to give us) and then emit Off + FDv1Fallback so the action applier hands off + // to the FDv1 fallback synchronizer. + var headers = new List>> + { + new KeyValuePair>( + "x-ld-fd-fallback", new[] { "true" }) + }; + + _mockRequestor.Setup(r => r.PollingRequestAsync(It.IsAny())) + .ReturnsAsync(CreatePollingResponseWithHeaders( + new[] { CreateServerIntentEvent("none", "test-payload", 1) }, + headers)); + + using (var dataSource = MakeDataSource()) + { + var startTask = dataSource.Start(); + + // The payload must be applied -- the SDK's first observable side effect is the + // ChangeSet reaching the data store. + var changeSet = _updateSink.Applies.ExpectValue(); + Assert.Equal(ChangeSetType.None, changeSet.Type); + + // After the payload, the data source emits Off with FDv1Fallback=true so the action + // applier can swap to FDv1. + var status = _updateSink.StatusUpdates.ExpectValue(); + Assert.Equal(DataSourceState.Off, status.State); + Assert.NotNull(status.LastError); + Assert.True( + status.LastError.Value.FDv1Fallback, + "FDv1Fallback should be set when the success response carried the fallback header"); + + // Init task completes successfully because the payload was applied. + var result = await startTask; + Assert.True(result); + } + } + + [Fact] + public async Task SuccessfulResponseWithoutFallbackHeaderDoesNotSignalFDv1Fallback() + { + // Sanity check: a successful response with no fallback header must NOT trigger fallback. + _mockRequestor.Setup(r => r.PollingRequestAsync(It.IsAny())) + .ReturnsAsync(CreatePollingResponse( + CreateServerIntentEvent("none", "test-payload", 1))); + + using (var dataSource = MakeDataSource()) + { + var startTask = dataSource.Start(); + var result = await startTask; + Assert.True(result); + + var changeSet = _updateSink.Applies.ExpectValue(); + Assert.Equal(ChangeSetType.None, changeSet.Type); + + // No status update should appear -- the data source remains active and would simply + // poll again on the next tick without falling back. + _updateSink.StatusUpdates.ExpectNoValue(TimeSpan.FromMilliseconds(50)); + } + } + [Fact] public async Task UnrecoverableHttpError403ReportsOffStatusWithRecoverableFalse() { diff --git a/pkgs/sdk/server/test/Internal/FDv2DataSources/FDv2StreamingDataSourceTest.cs b/pkgs/sdk/server/test/Internal/FDv2DataSources/FDv2StreamingDataSourceTest.cs index ba56a176..1adaa80e 100644 --- a/pkgs/sdk/server/test/Internal/FDv2DataSources/FDv2StreamingDataSourceTest.cs +++ b/pkgs/sdk/server/test/Internal/FDv2DataSources/FDv2StreamingDataSourceTest.cs @@ -710,6 +710,77 @@ public async Task UnrecoverableHttpErrorWithFallbackHeaderFalseDoesNotSetFDv1Fal } } + [Fact] + public async Task SuccessfulStreamWithFallbackHeaderAppliesPayloadThenSignalsFDv1Fallback() + { + // The server can ride x-ld-fd-fallback: true on a successful streaming connection. The + // SDK must apply any payload that arrives on the stream and then shut the stream down + // with FDv1Fallback=true so the action applier hands off to the FDv1 fallback + // synchronizer. Without this behavior the stream would stay open against FDv2 forever. + using (var dataSource = MakeDataSource()) + { + var startTask = dataSource.Start(); + + var headers = new List>> + { + new KeyValuePair>("x-ld-fd-fallback", new[] { "true" }) + }; + _mockEventSource.TriggerOpen(headers); + + // Send a complete xfer-full payload so the protocol applies a ChangeSet. + _mockEventSource.TriggerMessage(CreateMessageEvent("server-intent", + CreateServerIntentJson("xfer-full", "p1", 1))); + _mockEventSource.TriggerMessage(CreateMessageEvent("payload-transferred", + CreatePayloadTransferredJson("(p:p1:1)", 1))); + + // The payload must be applied first. + var changeSet = _updateSink.Applies.ExpectValue(); + Assert.Equal(ChangeSetType.Full, changeSet.Type); + + // Then the data source emits Off with FDv1Fallback=true so the action applier can + // swap to FDv1. + var status = _updateSink.StatusUpdates.ExpectValue(); + Assert.Equal(DataSourceState.Off, status.State); + Assert.NotNull(status.LastError); + Assert.True( + status.LastError.Value.FDv1Fallback, + "FDv1Fallback should be set when the success response carried the fallback header"); + + // Init task completes successfully because the payload was applied. + var result = await startTask; + Assert.True(result); + + // The event source is closed so the stream stops consuming events from FDv2. + Assert.True(_mockEventSource.IsClosed); + } + } + + [Fact] + public async Task SuccessfulStreamWithoutFallbackHeaderDoesNotSignalFDv1Fallback() + { + // Sanity check: a normal successful streaming response must not produce a fallback + // signal nor close the stream. + using (var dataSource = MakeDataSource()) + { + var startTask = dataSource.Start(); + + _mockEventSource.TriggerOpen(); + _mockEventSource.TriggerMessage(CreateMessageEvent("server-intent", + CreateServerIntentJson("xfer-full", "p1", 1))); + _mockEventSource.TriggerMessage(CreateMessageEvent("payload-transferred", + CreatePayloadTransferredJson("(p:p1:1)", 1))); + + var changeSet = _updateSink.Applies.ExpectValue(); + Assert.Equal(ChangeSetType.Full, changeSet.Type); + + var result = await startTask; + Assert.True(result); + + _updateSink.StatusUpdates.ExpectNoValue(TimeSpan.FromMilliseconds(50)); + Assert.False(_mockEventSource.IsClosed); + } + } + [Fact] public async Task UnrecoverableHttpError403ReportsOffStatusWithRecoverableFalse() { From 2a2bf1f074fe045343392b6f5e305b3abaf4752d Mon Sep 17 00:00:00 2001 From: Matthew Keeler Date: Thu, 30 Apr 2026 12:51:55 -0400 Subject: [PATCH 03/14] fix: Trigger FallingBack tracker action when FDv1 fallback signaled regardless of state The InitializationTracker only transitioned to FallingBack when it observed an underlying source's raw Off state with FDv1Fallback=true. But by the time the outer composite's secondaries see status updates from an inner composite, the inner CompositeSource's sanitizer has mapped Off to Interrupted -- so the Off case never matched, and the tracker stayed at NoData (or Data, if a fallback synchronizer's Apply with empty selector arrived). Either way, the tracker.Task never completed, so client initialization timed out at StartWaitTime and the contract-test service threw "Client initialization failed" for the streaming- error and polling-initializer FDv1 fallback directive scenarios. Move the FDv1Fallback check ahead of the state switch so it fires for any state (Interrupted on a recoverable error or sanitized Off, Off on an unrecoverable error, Valid + header on a successful directive) when the signal comes from either the initializer or synchronizer category. Apply the same handling to the initializer-phase fallback now that it is honored. Add three end-to-end tests against a real FDv2DataSource composite that reproduce each failing harness scenario: synchronizer Off+FDv1Fallback, synchronizer success-then-Off+FDv1Fallback, and initializer Off+FDv1Fallback. With the fix removed the first two hang at MakeCustomClient's StartWaitTime (matching the harness failure) and the action applier sequence is unverified; with the fix they all pass and the FDv1 fallback synchronizer is engaged. --- .../FDv2DataSource.InitializationTracker.cs | 22 +- .../FDv2DataSources/FDv2DataSourceTest.cs | 218 ++++++++++++++++++ 2 files changed, 232 insertions(+), 8 deletions(-) diff --git a/pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2DataSource.InitializationTracker.cs b/pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2DataSource.InitializationTracker.cs index a9c0e806..924ad132 100644 --- a/pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2DataSource.InitializationTracker.cs +++ b/pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2DataSource.InitializationTracker.cs @@ -248,6 +248,20 @@ public void Apply(DataStoreTypes.ChangeSet change public void UpdateStatus(DataSourceState newState, DataSourceStatus.ErrorInfo? newError, DataSourceCategory category) { + // The FDv1 fallback directive is the SDK's signal that the initializer or + // synchronizer phase is being terminated and the FDv1 fallback synchronizer is + // taking over. This can ride on Interrupted (recoverable error or success + + // header) or Off (unrecoverable error + header) -- transition to FallingBack + // regardless of which state we observe so the tracker can complete via the + // fallback path. + var fdv1FallbackSignaled = newError.HasValue && newError.Value.FDv1Fallback; + if (fdv1FallbackSignaled + && (category == DataSourceCategory.Initializers + || category == DataSourceCategory.Synchronizers)) + { + DetermineState(Action.FallingBack); + } + switch (category) { case DataSourceCategory.Initializers when newState == DataSourceState.Off: @@ -257,14 +271,6 @@ public void UpdateStatus(DataSourceState newState, DataSourceStatus.ErrorInfo? n } case DataSourceCategory.Synchronizers when newState == DataSourceState.Off: { - // Currently, FDv1 fallback happens in the synchronizers group. If something from that group - // reports Off, with a reason indicating that it should fallback to v1, then we can - // transition to that state. - if (newError.HasValue && newError.Value.FDv1Fallback) - { - DetermineState(Action.FallingBack); - } - DetermineState(Action.SynchronizersExhausted); break; diff --git a/pkgs/sdk/server/test/Internal/FDv2DataSources/FDv2DataSourceTest.cs b/pkgs/sdk/server/test/Internal/FDv2DataSources/FDv2DataSourceTest.cs index db02db72..94753e45 100644 --- a/pkgs/sdk/server/test/Internal/FDv2DataSources/FDv2DataSourceTest.cs +++ b/pkgs/sdk/server/test/Internal/FDv2DataSources/FDv2DataSourceTest.cs @@ -1559,6 +1559,224 @@ public void GatedObserverSuppressesEventsAfterLatchTriggered() Assert.Equal(0, inner.ApplyCallCount); } + // End-to-end test: synchronizer reports Off+FDv1Fallback (mirrors the FDv2 streaming + // source's Shutdown path on a 403 + x-ld-fd-fallback header). The SDK must engage the + // FDv1 fallback synchronizer and reach Initialized via that path. The harness suite + // "directive on streaming error engages FDv1 fallback" exercises this. + [Fact] + public async Task SynchronizerOffWithFDv1FallbackErrorEngagesFDv1FallbackSynchronizer() + { + var capturingSink = new CapturingDataSourceUpdatesWithHeaders(); + var fdv1Data = new FullDataSet( + new Dictionary>()); + + // Synchronizer reports Off (unrecoverable HTTP error) with the FDv1 fallback flag set + // -- the same error the FDv2 streaming source emits for a 403+directive response. + var synchronizerStartCount = 0; + SourceFactory synchronizerFactory = (updatesSink) => + { + synchronizerStartCount++; + return new MockDataSourceWithInit(async () => + { + updatesSink.UpdateStatus( + DataSourceState.Off, + new DataSourceStatus.ErrorInfo + { + Kind = DataSourceStatus.ErrorKind.ErrorResponse, + StatusCode = 403, + FDv1Fallback = true, + Recoverable = false, + Time = DateTime.Now + }); + await Task.Yield(); + }); + }; + + // FDv1 fallback synchronizer applies a payload and reaches Valid -- the SDK must + // reach Initialized via this path (not via the FDv2 sync, which failed). + var fdv1Started = false; + SourceFactory fdv1Factory = (updatesSink) => + new MockDataSourceWithInit(async () => + { + fdv1Started = true; + updatesSink.Apply(new ChangeSet( + ChangeSetType.Full, Selector.Empty, fdv1Data.Data, null)); + updatesSink.UpdateStatus(DataSourceState.Valid, null); + await Task.Yield(); + }); + + var dataSource = FDv2DataSource.CreateFDv2DataSource( + capturingSink, + new List(), + new List { synchronizerFactory }, + new List { fdv1Factory }, + TestLogger); + + try + { + var startResult = await dataSource.Start(); + Assert.True(startResult, "Start should complete via the FDv1 fallback path"); + Assert.True(fdv1Started, "FDv1 fallback synchronizer should have been started"); + + // The Apply from FDv1 reached the data store -- this is the load-bearing + // observation that proves the FDv1 fallback engaged. + var changeSet = capturingSink.Applies.ExpectValue(TimeSpan.FromSeconds(1)); + Assert.Equal(ChangeSetType.Full, changeSet.Type); + } + finally + { + dataSource.Dispose(); + } + } + + // End-to-end test: synchronizer applies a payload first, then reports Off+FDv1Fallback + // (mirrors the streaming success path with x-ld-fd-fallback header). The SDK must apply + // the initial payload AND then engage FDv1 fallback. Harness suite "directive on + // streaming success applies payload then engages FDv1" exercises this. + [Fact] + public async Task SynchronizerSuccessThenFDv1FallbackEngagesFDv1FallbackSynchronizer() + { + var capturingSink = new CapturingDataSourceUpdatesWithHeaders(); + var syncData = new FullDataSet( + new Dictionary>()); + var fdv1Data = new FullDataSet( + new Dictionary>()); + + SourceFactory synchronizerFactory = (updatesSink) => + new MockDataSourceWithInit(async () => + { + // Apply a payload (the SDK's first observable side effect). + updatesSink.Apply(new ChangeSet( + ChangeSetType.Full, + Selector.Make(1, "synchronizer-state"), + syncData.Data, + null)); + // Then signal Off+FDv1Fallback. The action applier must engage FDv1 even + // though the data system is already Initialized via the payload above. + updatesSink.UpdateStatus( + DataSourceState.Off, + new DataSourceStatus.ErrorInfo + { + Kind = DataSourceStatus.ErrorKind.Unknown, + FDv1Fallback = true, + Time = DateTime.Now + }); + await Task.Yield(); + }); + + var fdv1Started = false; + SourceFactory fdv1Factory = (updatesSink) => + new MockDataSourceWithInit(async () => + { + fdv1Started = true; + updatesSink.Apply(new ChangeSet( + ChangeSetType.Full, + Selector.Make(2, "fdv1-state"), + fdv1Data.Data, + null)); + await Task.Yield(); + }); + + var dataSource = FDv2DataSource.CreateFDv2DataSource( + capturingSink, + new List(), + new List { synchronizerFactory }, + new List { fdv1Factory }, + TestLogger); + + try + { + var startResult = await dataSource.Start(); + Assert.True(startResult); + + // Two Applies: first from the synchronizer's initial payload, then from FDv1. + var firstChangeSet = capturingSink.Applies.ExpectValue(TimeSpan.FromSeconds(1)); + Assert.Equal("synchronizer-state", firstChangeSet.Selector.State); + var secondChangeSet = capturingSink.Applies.ExpectValue(TimeSpan.FromSeconds(2)); + Assert.Equal("fdv1-state", secondChangeSet.Selector.State); + + Assert.True(fdv1Started, "FDv1 fallback synchronizer should have been started"); + } + finally + { + dataSource.Dispose(); + } + } + + // End-to-end test: an initializer reports Off+FDv1Fallback. The SDK must skip the FDv2 + // synchronizer chain entirely and engage the FDv1 fallback synchronizer. Harness suite + // "directive on polling initializer skips FDv2 synchronizers" exercises this. + [Fact] + public async Task InitializerOffWithFDv1FallbackErrorSkipsSynchronizersAndEngagesFDv1() + { + var capturingSink = new CapturingDataSourceUpdatesWithHeaders(); + var fdv1Data = new FullDataSet( + new Dictionary>()); + + SourceFactory initializerFactory = (updatesSink) => + new MockDataSourceWithInit(async () => + { + updatesSink.UpdateStatus( + DataSourceState.Off, + new DataSourceStatus.ErrorInfo + { + Kind = DataSourceStatus.ErrorKind.ErrorResponse, + StatusCode = 403, + FDv1Fallback = true, + Recoverable = false, + Time = DateTime.Now + }); + await Task.Yield(); + }); + + // Synchronizer must NOT have its underlying data source's Start called. The composite + // may call the factory while walking past the entry, but the action applier must + // not start it. + var synchronizerStarted = false; + SourceFactory synchronizerFactory = (updatesSink) => + new MockDataSourceWithInit(async () => + { + synchronizerStarted = true; + updatesSink.UpdateStatus(DataSourceState.Valid, null); + await Task.Yield(); + }); + + var fdv1Started = false; + SourceFactory fdv1Factory = (updatesSink) => + new MockDataSourceWithInit(async () => + { + fdv1Started = true; + updatesSink.Apply(new ChangeSet( + ChangeSetType.Full, Selector.Empty, fdv1Data.Data, null)); + updatesSink.UpdateStatus(DataSourceState.Valid, null); + await Task.Yield(); + }); + + var dataSource = FDv2DataSource.CreateFDv2DataSource( + capturingSink, + new List { initializerFactory }, + new List { synchronizerFactory }, + new List { fdv1Factory }, + TestLogger); + + try + { + var startResult = await dataSource.Start(); + Assert.True(startResult, "Start should complete via the FDv1 fallback path"); + + var changeSet = capturingSink.Applies.ExpectValue(TimeSpan.FromSeconds(2)); + Assert.Equal(ChangeSetType.Full, changeSet.Type); + + Assert.True(fdv1Started, "FDv1 fallback synchronizer should have been started"); + Assert.False(synchronizerStarted, + "FDv2 synchronizer must not be started when the initializer signals FDv1 fallback"); + } + finally + { + dataSource.Dispose(); + } + } + private class CapturingObserver : IDataSourceObserver { public int UpdateStatusCallCount { get; private set; } From 7a60a5753030c5f5caa03cbc6f63f686131a9de2 Mon Sep 17 00:00:00 2001 From: Matthew Keeler Date: Thu, 30 Apr 2026 13:08:28 -0400 Subject: [PATCH 04/14] test: Add end-to-end FDv1 fallback test using real FDv2 streaming source Adds RealStreamingSourceWithFallbackHeaderEngagesFDv1FallbackAfterApply, which wires a real FDv2StreamingDataSource (with a mock IEventSource) into the FDv2DataSource composite alongside a mock FDv1 fallback synchronizer. Drives synthetic SSE events that mirror the harness suite "directive on streaming success applies payload then engages FDv1": a server-intent + put-object + payload-transferred sequence with the x-ld-fd-fallback header on the connection-open response. The earlier mock-only tests asserted the action applier sequence in isolation but did not exercise the streaming source's MaybeMarkInitialized + Shutdown path inside the composite. This test bridges that gap. --- .../FDv2DataSources/FDv2DataSourceTest.cs | 133 ++++++++++++++++++ 1 file changed, 133 insertions(+) diff --git a/pkgs/sdk/server/test/Internal/FDv2DataSources/FDv2DataSourceTest.cs b/pkgs/sdk/server/test/Internal/FDv2DataSources/FDv2DataSourceTest.cs index 94753e45..0659a389 100644 --- a/pkgs/sdk/server/test/Internal/FDv2DataSources/FDv2DataSourceTest.cs +++ b/pkgs/sdk/server/test/Internal/FDv2DataSources/FDv2DataSourceTest.cs @@ -1777,6 +1777,139 @@ public async Task InitializerOffWithFDv1FallbackErrorSkipsSynchronizersAndEngage } } + // End-to-end test using the REAL FDv2StreamingDataSource (with a mock IEventSource so we + // can drive synthetic SSE events). This catches harness-style regressions in the + // streaming source's interaction with the FDv2DataSource composite that the simpler + // mock-based tests above cannot reach. Mirrors the harness suite "directive on streaming + // success applies payload then engages FDv1". + [Fact] + public async Task RealStreamingSourceWithFallbackHeaderEngagesFDv1FallbackAfterApply() + { + var capturingSink = new CapturingDataSourceUpdatesWithHeaders(); + var fdv1Data = new FullDataSet( + new Dictionary>()); + + // The streaming factory is invoked asynchronously by the OUTER composite's queue + // processor. Use a TCS to know when the streaming source has been constructed and + // its event handlers wired to the mock event source -- otherwise the test thread can + // race ahead and trigger the mock before the streaming source is listening. + var streamingMock = new IntegrationMockEventSource(); + var streamingReady = new TaskCompletionSource( + TaskCreationOptions.RunContinuationsAsynchronously); + SourceFactory streamingFactory = (updatesSink) => + { + var context = BasicContext.WithDataSourceUpdates( + new LaunchDarkly.Sdk.Server.Internal.DataSystem.DataSourceUpdatesV2ToV1Adapter(updatesSink)); + var source = new FDv2StreamingDataSource( + context, + context.DataSourceUpdates, + new Uri("http://example.com"), + TimeSpan.FromMilliseconds(10), + () => Selector.Empty, + (uri, http) => streamingMock); + streamingReady.TrySetResult(true); + return source; + }; + + var fdv1Started = false; + SourceFactory fdv1Factory = (updatesSink) => + new MockDataSourceWithInit(async () => + { + fdv1Started = true; + updatesSink.Apply(new ChangeSet( + ChangeSetType.Full, + Selector.Make(2, "fdv1-state"), + fdv1Data.Data, + null)); + updatesSink.UpdateStatus(DataSourceState.Valid, null); + await Task.Yield(); + }); + + var dataSource = FDv2DataSource.CreateFDv2DataSource( + capturingSink, + new List(), + new List { streamingFactory }, + new List { fdv1Factory }, + TestLogger); + + try + { + var startTask = dataSource.Start(); + + // Wait for the streaming factory to wire up before triggering events on the mock. + Assert.True(await Task.WhenAny(streamingReady.Task, Task.Delay(TimeSpan.FromSeconds(2))) + == streamingReady.Task, + "streaming source should be constructed within 2s of dataSource.Start()"); + + // Drive the mock event source with the headers + a complete xfer-full payload. + var headers = new List>> + { + new KeyValuePair>("x-ld-fd-fallback", new[] { "true" }) + }; + streamingMock.TriggerOpen(headers); + streamingMock.TriggerMessage(new LaunchDarkly.EventSource.MessageReceivedEventArgs( + new LaunchDarkly.EventSource.MessageEvent("server-intent", + @"{""payloads"":[{""id"":""p1"",""target"":1,""intentCode"":""xfer-full"",""reason"":""r""}]}", + null))); + streamingMock.TriggerMessage(new LaunchDarkly.EventSource.MessageReceivedEventArgs( + new LaunchDarkly.EventSource.MessageEvent("payload-transferred", + @"{""state"":""(p:p1:1)"",""version"":1}", + null))); + + var startResult = await startTask; + Assert.True(startResult, "Start should complete via the streaming Apply"); + + // Two Applies expected: streaming xfer-full (non-empty selector), then FDv1. + var firstApply = capturingSink.Applies.ExpectValue(TimeSpan.FromSeconds(1)); + Assert.Equal(ChangeSetType.Full, firstApply.Type); + var secondApply = capturingSink.Applies.ExpectValue(TimeSpan.FromSeconds(2)); + Assert.Equal("fdv1-state", secondApply.Selector.State); + + Assert.True(fdv1Started, + "FDv1 fallback synchronizer should be engaged when streaming success carries the FDv1 directive"); + Assert.True(streamingMock.IsClosed, + "Streaming event source should be closed after the directive is applied"); + } + finally + { + dataSource.Dispose(); + } + } + + // Mock EventSource for integration tests -- duplicates the helper from + // FDv2StreamingDataSourceTest to keep the integration test contained in this file. + // Suppress unused-event warnings: we only use Opened/MessageReceived/Error here. +#pragma warning disable 67 + private class IntegrationMockEventSource : LaunchDarkly.EventSource.IEventSource + { + public event EventHandler Opened; + public event EventHandler Closed; + public event EventHandler MessageReceived; + public event EventHandler Error; + public event EventHandler CommentReceived; + + public bool IsClosed { get; private set; } + public LaunchDarkly.EventSource.ReadyState ReadyState { get; private set; } = + LaunchDarkly.EventSource.ReadyState.Closed; + + public Task StartAsync() + { + ReadyState = LaunchDarkly.EventSource.ReadyState.Open; + return Task.CompletedTask; + } + + public void Close() => IsClosed = true; + public void Restart(bool forceNewConnection = false) { } + + public void TriggerOpen(IEnumerable>> headers = null) => + Opened?.Invoke(this, new LaunchDarkly.EventSource.StateChangedEventArgs( + LaunchDarkly.EventSource.ReadyState.Open, headers)); + + public void TriggerMessage(LaunchDarkly.EventSource.MessageReceivedEventArgs args) => + MessageReceived?.Invoke(this, args); + } +#pragma warning restore 67 + private class CapturingObserver : IDataSourceObserver { public int UpdateStatusCallCount { get; private set; } From 3bc179ef3acc4736f4f2119120f36f6a586e0a0c Mon Sep 17 00:00:00 2001 From: Matthew Keeler Date: Thu, 30 Apr 2026 13:19:20 -0400 Subject: [PATCH 05/14] test: Add real-streaming 403 + FDv1 directive end-to-end test Mirrors the harness suite "directive on streaming error engages FDv1 fallback" using a real FDv2StreamingDataSource driven by a mock IEventSource that fires an EventSourceServiceUnsuccessfulResponseException with the x-ld-fd-fallback header. Without the InitializationTracker fix this test would hang at MakeCustomClient's StartWaitTime; with the fix it passes in ~25ms. Also adds TriggerError to the integration mock event source so the test can simulate the EventSource library's error path. --- .../FDv2DataSources/FDv2DataSourceTest.cs | 87 +++++++++++++++++++ 1 file changed, 87 insertions(+) diff --git a/pkgs/sdk/server/test/Internal/FDv2DataSources/FDv2DataSourceTest.cs b/pkgs/sdk/server/test/Internal/FDv2DataSources/FDv2DataSourceTest.cs index 0659a389..b161ad3c 100644 --- a/pkgs/sdk/server/test/Internal/FDv2DataSources/FDv2DataSourceTest.cs +++ b/pkgs/sdk/server/test/Internal/FDv2DataSources/FDv2DataSourceTest.cs @@ -1876,6 +1876,90 @@ public async Task RealStreamingSourceWithFallbackHeaderEngagesFDv1FallbackAfterA } } + // End-to-end test using the REAL FDv2StreamingDataSource: 403 error response with the + // x-ld-fd-fallback header. Mirrors the harness suite "directive on streaming error + // engages FDv1 fallback". + [Fact] + public async Task RealStreamingSource403WithFallbackHeaderEngagesFDv1Fallback() + { + var capturingSink = new CapturingDataSourceUpdatesWithHeaders(); + var fdv1Data = new FullDataSet( + new Dictionary>()); + + var streamingMock = new IntegrationMockEventSource(); + var streamingReady = new TaskCompletionSource( + TaskCreationOptions.RunContinuationsAsynchronously); + SourceFactory streamingFactory = (updatesSink) => + { + var context = BasicContext.WithDataSourceUpdates( + new LaunchDarkly.Sdk.Server.Internal.DataSystem.DataSourceUpdatesV2ToV1Adapter(updatesSink)); + var source = new FDv2StreamingDataSource( + context, + context.DataSourceUpdates, + new Uri("http://example.com"), + TimeSpan.FromMilliseconds(10), + () => Selector.Empty, + (uri, http) => streamingMock); + streamingReady.TrySetResult(true); + return source; + }; + + var fdv1Started = false; + SourceFactory fdv1Factory = (updatesSink) => + new MockDataSourceWithInit(async () => + { + fdv1Started = true; + updatesSink.Apply(new ChangeSet( + ChangeSetType.Full, + Selector.Empty, + fdv1Data.Data, + null)); + updatesSink.UpdateStatus(DataSourceState.Valid, null); + await Task.Yield(); + }); + + var dataSource = FDv2DataSource.CreateFDv2DataSource( + capturingSink, + new List(), + new List { streamingFactory }, + new List { fdv1Factory }, + TestLogger); + + try + { + var startTask = dataSource.Start(); + + Assert.True(await Task.WhenAny(streamingReady.Task, Task.Delay(TimeSpan.FromSeconds(2))) + == streamingReady.Task, + "streaming source should be constructed within 2s of dataSource.Start()"); + + // The streaming source receives a 403 with the FDv1 fallback header. This is the + // exact exception the EventSource library throws on a non-2xx response carrying + // the directive (see EventSourceServiceUnsuccessfulResponseException in 5.3.0+). + var headers = new List>> + { + new KeyValuePair>("x-ld-fd-fallback", new[] { "true" }) + }; + var ex = new LaunchDarkly.EventSource.EventSourceServiceUnsuccessfulResponseException( + 403, headers); + streamingMock.TriggerError(ex); + + var startResult = await startTask; + Assert.True(startResult, + "Start should complete: tracker reaches Initialized via FDv1 fallback Apply"); + + var changeSet = capturingSink.Applies.ExpectValue(TimeSpan.FromSeconds(2)); + Assert.Equal(ChangeSetType.Full, changeSet.Type); + + Assert.True(fdv1Started, + "FDv1 fallback synchronizer should be engaged on a 403 + directive response"); + } + finally + { + dataSource.Dispose(); + } + } + // Mock EventSource for integration tests -- duplicates the helper from // FDv2StreamingDataSourceTest to keep the integration test contained in this file. // Suppress unused-event warnings: we only use Opened/MessageReceived/Error here. @@ -1907,6 +1991,9 @@ public void TriggerOpen(IEnumerable>> h public void TriggerMessage(LaunchDarkly.EventSource.MessageReceivedEventArgs args) => MessageReceived?.Invoke(this, args); + + public void TriggerError(Exception exception) => + Error?.Invoke(this, new LaunchDarkly.EventSource.ExceptionEventArgs(exception)); } #pragma warning restore 67 From 19c77273c56ccda45728311d742be4fdc9d2a76b Mon Sep 17 00:00:00 2001 From: Matthew Keeler Date: Thu, 30 Apr 2026 13:20:10 -0400 Subject: [PATCH 06/14] ci: Upload contract-test service log on FDv2 contract-test failure When the harness reports a failure (e.g. "Client initialization failed" from the test service), the contract-test logs are the only way to see why the SDK didn't reach the expected state. Capture them as a workflow artifact so subsequent debugging doesn't require re-running locally. --- .github/actions/contract-tests/action.yml | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/.github/actions/contract-tests/action.yml b/.github/actions/contract-tests/action.yml index 832b7721..a529c099 100644 --- a/.github/actions/contract-tests/action.yml +++ b/.github/actions/contract-tests/action.yml @@ -77,3 +77,12 @@ runs: ./test-harness -url http://localhost:8000 -debug -status-timeout=360 --skip-from=testharness-suppressions-fdv2.txt --stop-service-at-end env: GITHUB_TOKEN: ${{ inputs.token }} + + - name: Upload test service log on failure + if: failure() && inputs.run_fdv2_tests == 'true' + uses: actions/upload-artifact@v4 + with: + name: contract-test-service-log-${{ runner.os }}-${{ runner.arch }} + path: test-service.log + if-no-files-found: warn + retention-days: 7 From ecb4768089cb3139f928d37072b27459203ed77d Mon Sep 17 00:00:00 2001 From: Matthew Keeler Date: Thu, 30 Apr 2026 13:48:46 -0400 Subject: [PATCH 07/14] test: Map dataSystem.fdv1Fallback JSON field with explicit JsonPropertyName The contract-test service uses System.Text.Json with PropertyNamingPolicy = CamelCase. CamelCase converts the C# property name FDv1Fallback to "fDv1Fallback" (only the first character lowercased), but the harness sends the field as "fdv1Fallback" (lowercase 'd' too). The mismatch made sdkParams.DataSystem.FDv1Fallback always null, which meant dataSystemBuilder.FDv1FallbackSynchronizer(...) was never called, which meant dataSystemConfiguration.FDv1FallbackSynchronizer was null, which caused the OUTER composite's FDv1 fallback factory to throw NRE the moment the action applier tried to invoke it. Add an explicit [JsonPropertyName("fdv1Fallback")] attribute so the deserializer accepts the harness's wire format. Also remove the temporary debug logging from CompositeSource and SdkClientEntity that was used to identify the bug. --- pkgs/sdk/server/contract-tests/Representations.cs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pkgs/sdk/server/contract-tests/Representations.cs b/pkgs/sdk/server/contract-tests/Representations.cs index c9879c4c..b898c414 100644 --- a/pkgs/sdk/server/contract-tests/Representations.cs +++ b/pkgs/sdk/server/contract-tests/Representations.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Text.Json.Serialization; using LaunchDarkly.Sdk; // Note, in order for System.Text.Json serialization/deserialization to work correctly, the members of @@ -155,7 +156,9 @@ public class SdkConfigDataSystemParams public SdkConfigDataSynchronizerParams[] Synchronizers { get; set; } // FDv1Fallback configures the SDK's FDv1 Fallback Synchronizer, which is engaged only when // the LaunchDarkly server returns the FDv1 fallback directive. It is distinct from the FDv2 - // Primary/Fallback Synchronizers above. + // Primary/Fallback Synchronizers above. The harness sends this as "fdv1Fallback" (lowercase + // 'd'); we override the default CamelCase mapping (which would produce "fDv1Fallback"). + [JsonPropertyName("fdv1Fallback")] public SdkConfigPollingParams FDv1Fallback { get; set; } public string PayloadFilter { get; set; } } From 4b3559ea1da44d091c79de611c4f46eeb5902776 Mon Sep 17 00:00:00 2001 From: Matthew Keeler Date: Thu, 30 Apr 2026 13:50:57 -0400 Subject: [PATCH 08/14] fix: Skip building FDv1 fallback factory list when no FDv1 synchronizer is configured FDv2DataSystem.Create unconditionally constructed a one-entry list with FactoryWithContext(clientContext)(dataSystemConfiguration.FDv1FallbackSynchronizer). When FDv1FallbackSynchronizer was null this still produced a non-null SourceFactory delegate that captured the null configurer. The OUTER composite would then add a phantom FDv1 fallback entry; the moment an action applier advanced to it the entry's factory invocation would NullReferenceException inside FactoryWithContext. Build the list as empty when no FDv1 fallback is configured, so the OUTER composite simply has no FDv1 fallback entry and the data system halts cleanly per the spec. --- .../src/Internal/DataSystem/FDv2DataSystem.cs | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/pkgs/sdk/server/src/Internal/DataSystem/FDv2DataSystem.cs b/pkgs/sdk/server/src/Internal/DataSystem/FDv2DataSystem.cs index ab1114f7..3d4deaf2 100644 --- a/pkgs/sdk/server/src/Internal/DataSystem/FDv2DataSystem.cs +++ b/pkgs/sdk/server/src/Internal/DataSystem/FDv2DataSystem.cs @@ -79,14 +79,20 @@ public static FDv2DataSystem Create(Logger logger, Configuration configuration, var contextWithSelectorSource = clientContext.WithSelectorSource(new SelectorSourceFacade(writeThroughStore)); + // FDv1 fallback synchronizer is optional; only build a list entry when one is + // configured. An always-present list entry that captured a null configurer would + // throw NRE the moment the action applier advanced to the FDv1 fallback entry. + var fdv1FallbackFactories = dataSystemConfiguration.FDv1FallbackSynchronizer == null + ? new List() + : new List + { + FactoryWithContext(clientContext)(dataSystemConfiguration.FDv1FallbackSynchronizer) + }; var compositeDataSource = configuration.Offline ? Components.ExternalUpdatesOnly.Build(contextWithSelectorSource) : FDv2DataSource.CreateFDv2DataSource( dataSourceUpdates, dataSystemConfiguration.Initializers.Select(FactoryWithContext(contextWithSelectorSource)).ToList(), dataSystemConfiguration.Synchronizers.Select(FactoryWithContext(contextWithSelectorSource)).ToList(), - new List - { - FactoryWithContext(clientContext)(dataSystemConfiguration.FDv1FallbackSynchronizer) - }, + fdv1FallbackFactories, logger ); From 0edfb9dbea248585d63d5939b020c194228a468a Mon Sep 17 00:00:00 2001 From: Matthew Keeler Date: Thu, 30 Apr 2026 14:05:49 -0400 Subject: [PATCH 09/14] fix: Attach FDv1 fallback applier unconditionally so the directive halts the data system without a fallback The previous attempt to guard FDv2DataSystem.Create against a null FDv1FallbackSynchronizer (commit dec9450e) regressed Requirement 1.6.3(4): when no FDv1 fallback is configured and the directive arrives, the SDK must halt -- it must not keep retrying the failing synchronizer. Skipping the FDv1 fallback factory list also stripped the fdv1FallbackApplier from the synchronizer entry's outer-level observer, so the directive was no longer observed: nothing disposed the streaming source, and the EventSource library kept reconnecting after every 500. Attach the fdv1FallbackApplier (and the initializer variant) to the outer composite unconditionally. When an FDv1 fallback entry is in the outer list the applier advances to it; when one is not, the applier's BlockCurrent + DisposeCurrent + GoToNext sequence exhausts the outer list -- which disposes the current synchronizer (stopping reconnects) and transitions the outer composite to Off via InternalDispose. Add an end-to-end test (SynchronizerFDv1FallbackWithoutFallbackConfiguredHaltsDataSystem) that mirrors the harness "directive without FDv1 fallback configured halts the data system" scenario: a synchronizer reports Interrupted+FDv1Fallback (500 + directive), no FDv1 fallback configured, the underlying source must be disposed, and the data system must reach Off. --- .../FDv2DataSources/FDv2DataSource.cs | 34 ++++---- .../FDv2DataSources/FDv2DataSourceTest.cs | 86 +++++++++++++++++++ 2 files changed, 101 insertions(+), 19 deletions(-) diff --git a/pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2DataSource.cs b/pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2DataSource.cs index 504b30d0..5f022beb 100644 --- a/pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2DataSource.cs +++ b/pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2DataSource.cs @@ -89,19 +89,15 @@ public static IDataSource CreateFDv2DataSource( }, (actionable) => { - // Honor the FDv1 fallback directive in the initializer phase too. When the - // server returns x-ld-fd-fallback during init, skip the FDv2 synchronizer - // chain entirely and switch directly to the FDv1 fallback synchronizer. - if (fdv1Synchronizers != null && fdv1Synchronizers.Count > 0) - { - return new CompositeObserver( - initializationObserver, - blacklistWhenSuccessOrOff(actionable), - initializerFdv1FallbackApplierFactory(actionable)); - } - + // Honor the FDv1 fallback directive in the initializer phase too. The + // applier is attached unconditionally: when an FDv1 fallback entry is + // configured the applier advances to it; when one is not, the applier's + // BlockCurrent + DisposeCurrent + GoToNext sequence exhausts the outer + // list and halts the data system per Requirement 1.6.3(4). return new CompositeObserver( - initializationObserver, blacklistWhenSuccessOrOff(actionable)); + initializationObserver, + blacklistWhenSuccessOrOff(actionable), + initializerFdv1FallbackApplierFactory(actionable)); } )); } @@ -125,13 +121,13 @@ public static IDataSource CreateFDv2DataSource( }, (actionable) => { - // Only attach FDv1 fallback applier if FDv1 synchronizers are actually provided - if (fdv1Synchronizers != null && fdv1Synchronizers.Count > 0) - { - return new CompositeObserver(synchronizationObserver, fdv1FallbackApplierFactory(actionable)); - } - - return synchronizationObserver; + // The FDv1 fallback applier is attached unconditionally: when an FDv1 + // fallback entry exists in the outer list the applier advances to it; + // when one does not, BlockCurrent + DisposeCurrent + GoToNext exhausts + // the outer list and halts the data system per Requirement 1.6.3(4). + // Disposing the current synchronizer is what stops the streaming source + // from reconnecting. + return new CompositeObserver(synchronizationObserver, fdv1FallbackApplierFactory(actionable)); } )); } diff --git a/pkgs/sdk/server/test/Internal/FDv2DataSources/FDv2DataSourceTest.cs b/pkgs/sdk/server/test/Internal/FDv2DataSources/FDv2DataSourceTest.cs index b161ad3c..ae075091 100644 --- a/pkgs/sdk/server/test/Internal/FDv2DataSources/FDv2DataSourceTest.cs +++ b/pkgs/sdk/server/test/Internal/FDv2DataSources/FDv2DataSourceTest.cs @@ -1777,6 +1777,92 @@ public async Task InitializerOffWithFDv1FallbackErrorSkipsSynchronizersAndEngage } } + // End-to-end test: synchronizer reports the FDv1 fallback directive but no FDv1 + // fallback is configured. The SDK must HALT (Requirement 1.6.3(4)) -- which in + // practice means the synchronizer's underlying data source must be disposed so it + // stops trying to reconnect. The data system reaches Off via outer composite + // exhaustion. Mirrors the harness suite "directive without FDv1 fallback configured + // halts the data system". + [Fact] + public void SynchronizerFDv1FallbackWithoutFallbackConfiguredHaltsDataSystem() + { + var capturingSink = new CapturingDataSourceUpdatesWithHeaders(); + + var synchronizerDisposed = false; + SourceFactory synchronizerFactory = (updatesSink) => + new HaltMockDataSource( + onStart: async () => + { + // 500 + directive: a recoverable status that would normally drive retries. + // The directive must take precedence and halt those retries. + updatesSink.UpdateStatus( + DataSourceState.Interrupted, + new DataSourceStatus.ErrorInfo + { + Kind = DataSourceStatus.ErrorKind.ErrorResponse, + StatusCode = 500, + FDv1Fallback = true, + Recoverable = true, + Time = DateTime.Now + }); + await Task.Yield(); + }, + onDispose: () => synchronizerDisposed = true); + + // No FDv1 fallback configured (empty list). + var dataSource = FDv2DataSource.CreateFDv2DataSource( + capturingSink, + new List(), + new List { synchronizerFactory }, + new List(), + TestLogger); + + try + { + _ = dataSource.Start(); + + // The data system must transition to Off via outer composite exhaustion. Wait a + // few status updates -- intermediate Interrupted statuses come first. + var statuses = capturingSink.WaitForStatusUpdates(2, TimeSpan.FromSeconds(5)); + Assert.Contains(statuses, s => s.State == DataSourceState.Off); + + // Critically, the synchronizer must have been disposed -- this is what stops the + // streaming source from reconnecting in the harness scenario. + Assert.True(synchronizerDisposed, + "synchronizer source must be disposed so the streaming source stops reconnecting"); + } + finally + { + dataSource.Dispose(); + } + } + + // Mock data source that reports its disposal -- used to verify that the FDv1 fallback + // applier disposes the current synchronizer when no fallback is configured. + private class HaltMockDataSource : IDataSource + { + private readonly Func _onStart; + private readonly Action _onDispose; + private bool _initialized; + + public HaltMockDataSource(Func onStart, Action onDispose) + { + _onStart = onStart; + _onDispose = onDispose; + } + + public async Task Start() + { + await _onStart(); + _initialized = true; + return true; + } + + public bool Initialized => _initialized; + + public void Dispose() => _onDispose(); + } + // End-to-end test using the REAL FDv2StreamingDataSource (with a mock IEventSource so we // can drive synthetic SSE events). This catches harness-style regressions in the // streaming source's interaction with the FDv2DataSource composite that the simpler From 1fe05373cc965d54f36a96b5b01d5a297ce7eb11 Mon Sep 17 00:00:00 2001 From: Matthew Keeler Date: Tue, 5 May 2026 10:21:32 -0400 Subject: [PATCH 10/14] fix: defer blacklist applier's Apply-driven advancement to coordinate with FDv1 fallback latch When an FDv2 initializer applies a non-empty changeset (success path) and the response also carries the x-ld-fd-fallback header, the source emits Apply followed by UpdateStatus(Off, FDv1Fallback) on the same propagation chain. Previously, ActionApplierBlacklistWhenSuccessOrOff's Apply path would synchronously enqueue advancement onto the outer composite, and the FDv1FallbackActionApplier's UpdateStatus path would independently enqueue its own advancement. The shared latch could not prevent the blacklist applier because it was only triggered during the later UpdateStatus call -- by then, the blacklist applier had already enqueued its actions. The result was the outer composite double-advancing past the FDv1 fallback entry. The fix: - Add ICompositeSourceActionable.EnqueueAction so observers can defer work onto the composite's serialized queue. - Pass the FDv1FallbackLatch into the blacklist applier. Its UpdateStatus path now consults the latch synchronously (sanitizer maps Off->Interrupted before reaching outer secondaries, so this branch is largely defensive). Its Apply path defers advancement onto the queue with a bounded re-enqueue, giving the source thread time to complete UpdateStatus and set the latch before the deferred action checks it. - Reorder the initializer entry's CompositeObserver so the FDv1 fallback applier runs before the blacklist applier, ensuring the latch is set as early as possible in any UpdateStatus propagation. Adds a composite-level regression test that drives the exact scenario end-to-end against a real FDv2DataSource composite, asserting the FDv1 fallback synchronizer is engaged and the FDv2 streaming synchronizer is never started. --- .../CompositeDataSource/CompositeSource.cs | 10 ++ .../ICompositeSourceActionable.cs | 15 ++- .../FDv2DataSources/FDv2DataSource.cs | 105 +++++++++++++--- .../FDv2DataSources/FDv2DataSourceTest.cs | 115 ++++++++++++++++++ 4 files changed, 226 insertions(+), 19 deletions(-) diff --git a/pkgs/sdk/server/src/Internal/DataSources/CompositeDataSource/CompositeSource.cs b/pkgs/sdk/server/src/Internal/DataSources/CompositeDataSource/CompositeSource.cs index 1ea5447e..2003f5a2 100644 --- a/pkgs/sdk/server/src/Internal/DataSources/CompositeDataSource/CompositeSource.cs +++ b/pkgs/sdk/server/src/Internal/DataSources/CompositeDataSource/CompositeSource.cs @@ -451,6 +451,16 @@ public void BlockCurrent() }); } + // Explicit interface implementation forwards to the private EnqueueAction so callers using the + // ICompositeSourceActionable abstraction can defer arbitrary work onto the same serialized + // action queue. This allows observers to interleave their advancement decisions with other + // queued actions, while keeping the underlying queue plumbing private. + void ICompositeSourceActionable.EnqueueAction(Action action) + { + if (action == null) return; + EnqueueAction(action); + } + private void logTransition(String previousDescription, String currentDescription) { if (previousDescription != null && currentDescription != null) { _log.Debug("{0} transitioned from {1} to {2}.", _compositeDescription, previousDescription, currentDescription); diff --git a/pkgs/sdk/server/src/Internal/DataSources/CompositeDataSource/ICompositeSourceActionable.cs b/pkgs/sdk/server/src/Internal/DataSources/CompositeDataSource/ICompositeSourceActionable.cs index 5410f111..4e918963 100644 --- a/pkgs/sdk/server/src/Internal/DataSources/CompositeDataSource/ICompositeSourceActionable.cs +++ b/pkgs/sdk/server/src/Internal/DataSources/CompositeDataSource/ICompositeSourceActionable.cs @@ -1,3 +1,4 @@ +using System; using System.Threading.Tasks; namespace LaunchDarkly.Sdk.Server.Internal.DataSources @@ -34,10 +35,22 @@ internal interface ICompositeSourceActionable bool IsAtFirst(); /// - /// Blocks the the current source's factory. This prevents the current source's factory from being used again. + /// Blocks the the current source's factory. This prevents the current source's factory from being used again. /// Note that this does not tear down the current data source, it just prevents its factory from being used again. /// void BlockCurrent(); + + /// + /// Enqueues an arbitrary action to run on the composite's serialized action queue. + /// + /// + /// The action runs at queue-processing time, after any actions enqueued earlier on the + /// same queue. Observers can use this to defer advancement decisions until after the + /// current synchronous propagation chain has completed -- for example, an applier that + /// wants to check a latch that is set by a sibling applier in the same propagation. + /// + /// the action to enqueue + void EnqueueAction(Action action); } } diff --git a/pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2DataSource.cs b/pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2DataSource.cs index 5f022beb..d6af7906 100644 --- a/pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2DataSource.cs +++ b/pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2DataSource.cs @@ -42,8 +42,7 @@ public static IDataSource CreateFDv2DataSource( var fdv1FallbackTriggered = new FDv1FallbackLatch(); ActionApplierFactory blacklistWhenSuccessOrOff = - (actionable) => new GatedObserver( - new ActionApplierBlacklistWhenSuccessOrOff(actionable), fdv1FallbackTriggered); + (actionable) => new ActionApplierBlacklistWhenSuccessOrOff(actionable, fdv1FallbackTriggered); ActionApplierFactory fastFallbackApplierFactory = (actionable) => new ActionApplierFastFallback(actionable); ActionApplierFactory timedFallbackAndRecoveryApplierFactory = (actionable) => new GatedObserver( @@ -94,10 +93,19 @@ public static IDataSource CreateFDv2DataSource( // configured the applier advances to it; when one is not, the applier's // BlockCurrent + DisposeCurrent + GoToNext sequence exhausts the outer // list and halts the data system per Requirement 1.6.3(4). + // + // Observer order is significant: the FDv1 fallback applier MUST run before + // the blacklist applier so that it has a chance to set the FDv1 fallback + // latch when the source emits UpdateStatus(Off, FDv1Fallback). The + // blacklist applier's UpdateStatus path consults the latch and no-ops when + // it is set, preventing it from concurrently advancing the outer composite + // toward the FDv2 synchronizer entry. (See ActionApplierBlacklistWhenSuccessOrOff + // for the corresponding Apply-path handling, which defers advancement onto + // the actionable's queue so the latch can be checked at execution time.) return new CompositeObserver( initializationObserver, - blacklistWhenSuccessOrOff(actionable), - initializerFdv1FallbackApplierFactory(actionable)); + initializerFdv1FallbackApplierFactory(actionable), + blacklistWhenSuccessOrOff(actionable)); } )); } @@ -127,6 +135,10 @@ public static IDataSource CreateFDv2DataSource( // the outer list and halts the data system per Requirement 1.6.3(4). // Disposing the current synchronizer is what stops the streaming source // from reconnecting. + // + // Note: the synchronizers entry attaches only the FDv1 fallback applier + // (no blacklist applier), so observer ordering relative to blacklist is + // not a concern here. return new CompositeObserver(synchronizationObserver, fdv1FallbackApplierFactory(actionable)); } )); @@ -390,39 +402,96 @@ public void Apply(ChangeSet changeSet) /// Action applier that blacklists the current datasource when init occurs or when Off status is seen, /// then disposes the current datasource, goes to the next datasource, and starts it. /// - private class ActionApplierBlacklistWhenSuccessOrOff : IDataSourceObserver + /// + /// When the optional is supplied, this applier defers the + /// Apply-driven advancement onto the actionable's serialized queue and re-checks the latch + /// at queue-processing time. This is necessary because, on a successful FDv2 initializer + /// response that also carries the FDv1 fallback directive, the polling/streaming source + /// fires Apply (this applier) and then UpdateStatus(Off, FDv1Fallback) (the FDv1 fallback + /// applier) on the same propagation chain. Without the deferral, both appliers enqueue + /// independent advancement sequences and the outer composite double-advances past the + /// FDv1 fallback entry. By deferring and consulting the latch when the queued action runs, + /// the FDv1 fallback applier's UpdateStatus has had a chance to set the latch and we + /// no-op, leaving the FDv1 fallback applier as the sole driver of the transition. + /// + internal class ActionApplierBlacklistWhenSuccessOrOff : IDataSourceObserver { private readonly ICompositeSourceActionable _actionable; + private readonly FDv1FallbackLatch _latch; public ActionApplierBlacklistWhenSuccessOrOff(ICompositeSourceActionable actionable) + : this(actionable, null) { } + + public ActionApplierBlacklistWhenSuccessOrOff(ICompositeSourceActionable actionable, FDv1FallbackLatch latch) { _actionable = actionable ?? throw new ArgumentNullException(nameof(actionable)); + _latch = latch; } public void UpdateStatus(DataSourceState newState, DataSourceStatus.ErrorInfo? newError) { - // When Off status is seen, blacklist current, dispose current, go to next, and start current - if (newState == DataSourceState.Off) - { - _actionable.BlockCurrent(); - _actionable.DisposeCurrent(); - _actionable.GoToNext(); - _actionable.StartCurrent(); - } + // When Off status is seen, blacklist current, dispose current, go to next, and start current. + // Skip when the FDv1 fallback latch is set: the FDv1 fallback applier will perform + // the advancement to the FDv1 fallback entry, and we must not double-advance. + if (newState != DataSourceState.Off) return; + if (_latch != null && _latch.IsTriggered) return; + + _actionable.BlockCurrent(); + _actionable.DisposeCurrent(); + _actionable.GoToNext(); + _actionable.StartCurrent(); } public void Apply(ChangeSet changeSet) { // If this change has a selector, then we know we can move out of the current phase. // This doesn't look at the type of the changeset (Full, Partial, None), because having - // a selector means that we have some payload. + // a selector means that we have some payload. // From a forward development perspective this could be because we had a local stale selector which was // persisted in some way, and we are getting up to date via an initializer. if (changeSet.Selector.IsEmpty) return; - _actionable.BlockCurrent(); - _actionable.DisposeCurrent(); - _actionable.GoToNext(); - _actionable.StartCurrent(); + + if (_latch == null) + { + // No FDv1 fallback coordination -- run advancement inline as before. + _actionable.BlockCurrent(); + _actionable.DisposeCurrent(); + _actionable.GoToNext(); + _actionable.StartCurrent(); + return; + } + + // Defer advancement onto the actionable's serialized queue. Any sibling applier + // observing the same propagation chain (e.g. FDv1FallbackActionApplier reacting to + // a subsequent UpdateStatus call from the same source) gets a chance to run and + // set the latch before the queued action executes. At execution time, if the latch + // has been triggered we skip our advancement and let the FDv1 fallback applier own + // the transition. + // + // Bounded re-enqueue: the first time the deferred action runs, if the latch is not + // yet set, we re-enqueue ourselves once before checking again. This closes the + // narrow timing window in which the queue's background processor could pick up the + // deferred action before the source thread has finished its synchronous propagation + // (and called UpdateStatus + set the latch). After the second pass, if the latch is + // still unset, no FDv1 fallback signal is coming on this propagation, and we + // proceed with the normal advancement. + var retried = false; + Action deferred = null; + deferred = () => + { + if (_latch.IsTriggered) return; + if (!retried) + { + retried = true; + _actionable.EnqueueAction(deferred); + return; + } + _actionable.BlockCurrent(); + _actionable.DisposeCurrent(); + _actionable.GoToNext(); + _actionable.StartCurrent(); + }; + _actionable.EnqueueAction(deferred); } } diff --git a/pkgs/sdk/server/test/Internal/FDv2DataSources/FDv2DataSourceTest.cs b/pkgs/sdk/server/test/Internal/FDv2DataSources/FDv2DataSourceTest.cs index ae075091..8c425dcd 100644 --- a/pkgs/sdk/server/test/Internal/FDv2DataSources/FDv2DataSourceTest.cs +++ b/pkgs/sdk/server/test/Internal/FDv2DataSources/FDv2DataSourceTest.cs @@ -1466,6 +1466,14 @@ public void SetIsAtFirst(bool value) _isAtFirst = value; } + public void EnqueueAction(Action action) + { + // For unit-test mocks we run enqueued actions inline so existing tests that + // assert observable side effects of the action sequence continue to work. + CallSequence.Add(nameof(EnqueueAction)); + action?.Invoke(); + } + public void Reset() { DisposeCurrentCalled = false; @@ -1962,6 +1970,113 @@ public async Task RealStreamingSourceWithFallbackHeaderEngagesFDv1FallbackAfterA } } + // Regression test for the Bugbot-flagged double-advance bug: when an initializer applies + // a non-empty changeset (success path) and then reports Off + FDv1Fallback on the same + // propagation chain (mirrors FDv2PollingDataSource handling a 200 response that carries + // the x-ld-fd-fallback header), the outer composite must NOT start the FDv2 synchronizer + // and MUST start the FDv1 fallback synchronizer. The pre-fix behavior had both + // ActionApplierBlacklistWhenSuccessOrOff (via Apply) and FDv1FallbackActionApplier (via + // UpdateStatus) independently enqueue advancement on the outer composite, over-advancing + // past the FDv1 fallback entry. + [Fact] + public async Task InitializerSuccessWithFDv1FallbackDirectiveDoesNotOverAdvance() + { + var capturingSink = new CapturingDataSourceUpdatesWithHeaders(); + var initializerData = new FullDataSet( + new Dictionary>()); + var fdv1Data = new FullDataSet( + new Dictionary>()); + + // The polling-style initializer: applies a non-empty changeset (selector populated) + // and then reports Off + FDv1Fallback on the same Start() invocation. This mirrors + // FDv2PollingDataSource.UpdateTaskAsync when a successful 200 response carries the + // x-ld-fd-fallback header. + SourceFactory pollingInitializerFactory = (updatesSink) => + new MockDataSourceWithInit(async () => + { + updatesSink.Apply(new ChangeSet( + ChangeSetType.Full, + Selector.Make(1, "init-state"), + initializerData.Data, + null)); + updatesSink.UpdateStatus( + DataSourceState.Off, + new DataSourceStatus.ErrorInfo + { + Kind = DataSourceStatus.ErrorKind.Unknown, + Time = DateTime.Now, + FDv1Fallback = true + }); + await Task.Yield(); + }); + + // The FDv2 synchronizer must never be constructed -- if the bug is present and the + // outer composite double-advances after the initializer's Apply, we would land on + // (and start) this synchronizer. + var streamingSynchronizerStarted = false; + SourceFactory streamingSynchronizerFactory = (updatesSink) => + new MockDataSourceWithInit(async () => + { + streamingSynchronizerStarted = true; + await Task.Yield(); + }); + + var fdv1Started = false; + SourceFactory fdv1Factory = (updatesSink) => + new MockDataSourceWithInit(async () => + { + fdv1Started = true; + updatesSink.Apply(new ChangeSet( + ChangeSetType.Full, + Selector.Make(2, "fdv1-state"), + fdv1Data.Data, + null)); + updatesSink.UpdateStatus(DataSourceState.Valid, null); + await Task.Yield(); + }); + + var dataSource = FDv2DataSource.CreateFDv2DataSource( + capturingSink, + new List { pollingInitializerFactory }, + new List { streamingSynchronizerFactory }, + new List { fdv1Factory }, + TestLogger); + + try + { + var startTask = dataSource.Start(); + + // The initializer's Apply propagates first; then UpdateStatus(Off, FDv1Fallback) + // sets the latch. The blacklist applier defers its Apply-driven advancement onto + // the actionable's queue; by the time the queued action runs, the latch is set + // and the deferred advancement is suppressed. Only the FDv1 fallback applier + // drives the transition to the FDv1 fallback entry. + var startResult = await startTask; + Assert.True(startResult, + "Start should complete via the FDv1 fallback synchronizer's Apply"); + + // Two Applies expected on the outer sink: the initializer's data, then the FDv1 + // fallback synchronizer's data. + var firstApply = capturingSink.Applies.ExpectValue(TimeSpan.FromSeconds(2)); + Assert.Equal("init-state", firstApply.Selector.State); + var secondApply = capturingSink.Applies.ExpectValue(TimeSpan.FromSeconds(2)); + Assert.Equal("fdv1-state", secondApply.Selector.State); + + Assert.True(fdv1Started, + "FDv1 fallback synchronizer must be engaged when the initializer's success " + + "response carries the FDv1 directive"); + Assert.False(streamingSynchronizerStarted, + "FDv2 streaming synchronizer must NOT be started: the FDv1 directive should " + + "skip past the synchronizers entry. If the outer composite double-advances " + + "(blacklist applier's Apply-driven advancement is not deferred), this entry " + + "would be reached and started before the FDv1 fallback entry."); + } + finally + { + dataSource.Dispose(); + } + } + // End-to-end test using the REAL FDv2StreamingDataSource: 403 error response with the // x-ld-fd-fallback header. Mirrors the harness suite "directive on streaming error // engages FDv1 fallback". From c235747755a9f1d3e8c091cab3b967c576401d95 Mon Sep 17 00:00:00 2001 From: Matthew Keeler Date: Thu, 7 May 2026 11:19:47 -0400 Subject: [PATCH 11/14] refactor: Replace FDv1 fallback latch with BlockAll and ChangeSet directive flag Address review feedback on the cross-applier latch + GatedObserver + deferred-Apply re-enqueue machinery used to coordinate FDv1 fallback. Tag each outer composite entry with a CompositeEntryKind (FDv2 / FDv1Fallback) and add ICompositeSourceActionable.BlockAll(predicate), which removes every matching entry from the source list. The FDv1 fallback applier now calls BlockAll(FDv2) + GoToNext + StartCurrent, which is phase-agnostic and replaces the per-phase _extraEntriesToSkip counting and the initializer-only applier-factory variant. Add a FDv1Fallback flag to ChangeSet so the polling and streaming sources can ride the directive on Apply when a successful response carries the x-ld-fd-fallback header. With the flag inline on the event, the blacklist applier sees it synchronously and bails, and the FDv1 fallback applier triggers off the same Apply -- the success-path race that the latch was solving no longer exists. Each flow-control applier (FastFallback, TimedFallbackAndRecovery, BlacklistWhenSuccessOrOff) now checks newError?.FDv1Fallback and changeSet.FDv1Fallback inline and bails so the FDv1 fallback applier owns the transition uncontested. Drop FDv1FallbackLatch, GatedObserver, the cross-applier latch wiring, the initializer-only fdv1FallbackApplierFactory variant, _extraEntriesToSkip, and the deferred-Apply bounded-re-enqueue branch in ActionApplierBlacklistWhenSuccessOrOff. EnqueueAction is removed from the public actionable interface (it is no longer needed by any applier). Tests: replace FDv1FallbackActionApplierWith*Skip*, GatedObserverSuppressesEventsAfterLatchTriggered with UsesBlockAllAndAdvances, TriggersOnApplyCarryingDirective, and IsIdempotent. The end-to-end Bugbot regression test InitializerSuccessWithFDv1FallbackDirectiveDoesNotOverAdvance still passes against the simpler design. --- .../CompositeDataSource/CompositeEntryKind.cs | 23 ++ .../CompositeDataSource/CompositeSource.cs | 72 ++++- .../ICompositeSourceActionable.cs | 19 +- .../CompositeDataSource/SourcesList.cs | 32 +++ .../FDv2ChangeSetTranslator.cs | 7 +- .../FDv2DataSources/FDv2DataSource.cs | 269 +++++------------- .../FDv2DataSources/FDv2PollingDataSource.cs | 23 +- .../FDv2StreamingDataSource.cs | 13 +- .../server/src/Subsystems/DataStoreTypes.cs | 12 +- .../FDv2DataSources/FDv2DataSourceTest.cs | 112 ++++---- 10 files changed, 293 insertions(+), 289 deletions(-) create mode 100644 pkgs/sdk/server/src/Internal/DataSources/CompositeDataSource/CompositeEntryKind.cs diff --git a/pkgs/sdk/server/src/Internal/DataSources/CompositeDataSource/CompositeEntryKind.cs b/pkgs/sdk/server/src/Internal/DataSources/CompositeDataSource/CompositeEntryKind.cs new file mode 100644 index 00000000..27d5ad64 --- /dev/null +++ b/pkgs/sdk/server/src/Internal/DataSources/CompositeDataSource/CompositeEntryKind.cs @@ -0,0 +1,23 @@ +namespace LaunchDarkly.Sdk.Server.Internal.DataSources +{ + /// + /// Categorizes an entry in a composite source's list. Appliers use this to express + /// "block every FDv2 entry" via + /// without having to count positions or know which phase they were attached at. + /// + internal enum CompositeEntryKind + { + /// + /// Default kind. Entries that participate in the FDv2 protocol -- initializers and + /// FDv2 synchronizers in the outer composite, and any entry that is not the + /// FDv1 fallback synchronizer. + /// + FDv2, + + /// + /// The FDv1 fallback synchronizer entry. Used by the FDv1 fallback applier to + /// distinguish the fallback target from the FDv2 entries it should block. + /// + FDv1Fallback + } +} diff --git a/pkgs/sdk/server/src/Internal/DataSources/CompositeDataSource/CompositeSource.cs b/pkgs/sdk/server/src/Internal/DataSources/CompositeDataSource/CompositeSource.cs index 2003f5a2..6f61667b 100644 --- a/pkgs/sdk/server/src/Internal/DataSources/CompositeDataSource/CompositeSource.cs +++ b/pkgs/sdk/server/src/Internal/DataSources/CompositeDataSource/CompositeSource.cs @@ -30,17 +30,18 @@ internal sealed class CompositeSource : IDataSource, ICompositeSourceActionable private readonly IDataSourceUpdatesV2 _originalUpdateSink; private readonly IDataSourceUpdatesV2 _sanitizedUpdateSink; - private readonly SourcesList<(SourceFactory Factory, ActionApplierFactory ActionApplierFactory)> _sourcesList; + private readonly SourcesList<(SourceFactory Factory, ActionApplierFactory ActionApplierFactory, CompositeEntryKind Kind)> _sourcesList; private readonly DisableableDataSourceUpdatesTracker _disableableTracker; // Tracks the entry from the sources list that was used to create the current // data source instance. This allows operations such as blacklist to remove // the correct factory/action-applier-factory tuple from the list. - private (SourceFactory Factory, ActionApplierFactory ActionApplierFactory) _currentEntry; + private (SourceFactory Factory, ActionApplierFactory ActionApplierFactory, CompositeEntryKind Kind) _currentEntry; private IDataSource _currentDataSource; /// - /// Creates a new . + /// Creates a new . Every entry is treated as kind + /// . /// /// description of the composite source for logging purposes /// the sink that receives updates from the active source @@ -53,6 +54,21 @@ public CompositeSource( IList<(SourceFactory Factory, ActionApplierFactory ActionApplierFactory)> factoryTuples, Logger logger, bool circular = true) + : this(compositeDescription, updatesSink, WithDefaultKind(factoryTuples), logger, circular) + { + } + + /// + /// Creates a new with explicit per-entry kinds. The kind is + /// surfaced to so appliers can express + /// "block every FDv2 entry" without needing to know list positions. + /// + public CompositeSource( + string compositeDescription, + IDataSourceUpdatesV2 updatesSink, + IList<(SourceFactory Factory, ActionApplierFactory ActionApplierFactory, CompositeEntryKind Kind)> factoryTuples, + Logger logger, + bool circular = true) { if (updatesSink is null) { @@ -72,12 +88,24 @@ public CompositeSource( // this tracker is used to disconnect the current source from the updates sink when it is no longer needed. _disableableTracker = new DisableableDataSourceUpdatesTracker(); - _sourcesList = new SourcesList<(SourceFactory SourceFactory, ActionApplierFactory ActionApplierFactory)>( + _sourcesList = new SourcesList<(SourceFactory Factory, ActionApplierFactory ActionApplierFactory, CompositeEntryKind Kind)>( circular: circular, initialList: factoryTuples ); } + private static IList<(SourceFactory Factory, ActionApplierFactory ActionApplierFactory, CompositeEntryKind Kind)> + WithDefaultKind(IList<(SourceFactory Factory, ActionApplierFactory ActionApplierFactory)> tuples) + { + if (tuples is null) return null; + var result = new List<(SourceFactory, ActionApplierFactory, CompositeEntryKind)>(tuples.Count); + foreach (var t in tuples) + { + result.Add((t.Factory, t.ActionApplierFactory, CompositeEntryKind.FDv2)); + } + return result; + } + /// /// Returns a string representation of this data source for informational purposes. /// @@ -451,14 +479,36 @@ public void BlockCurrent() }); } - // Explicit interface implementation forwards to the private EnqueueAction so callers using the - // ICompositeSourceActionable abstraction can defer arbitrary work onto the same serialized - // action queue. This allows observers to interleave their advancement decisions with other - // queued actions, while keeping the underlying queue plumbing private. - void ICompositeSourceActionable.EnqueueAction(Action action) + public void BlockAll(Predicate kindMatches) { - if (action == null) return; - EnqueueAction(action); + if (kindMatches is null) throw new ArgumentNullException(nameof(kindMatches)); + if (_disposed) + { + return; + } + + EnqueueAction(() => + { + lock (_lock) + { + var removed = _sourcesList.RemoveAll(entry => kindMatches(entry.Kind)); + if (removed == 0) + { + return; + } + + // If the current entry was removed, clear the reference so a subsequent + // BlockCurrent doesn't accidentally remove a remaining entry. The current + // data source is left running -- callers are expected to follow BlockAll + // with DisposeCurrent / GoToNext when they want it torn down. + if (_currentEntry != default && kindMatches(_currentEntry.Kind)) + { + _currentEntry = default; + } + + _log.Debug("{0} blocked {1} entries by kind predicate.", _compositeDescription, removed); + } + }); } private void logTransition(String previousDescription, String currentDescription) { diff --git a/pkgs/sdk/server/src/Internal/DataSources/CompositeDataSource/ICompositeSourceActionable.cs b/pkgs/sdk/server/src/Internal/DataSources/CompositeDataSource/ICompositeSourceActionable.cs index 4e918963..daabaf8a 100644 --- a/pkgs/sdk/server/src/Internal/DataSources/CompositeDataSource/ICompositeSourceActionable.cs +++ b/pkgs/sdk/server/src/Internal/DataSources/CompositeDataSource/ICompositeSourceActionable.cs @@ -41,17 +41,14 @@ internal interface ICompositeSourceActionable void BlockCurrent(); /// - /// Enqueues an arbitrary action to run on the composite's serialized action queue. + /// Removes every entry whose kind matches the predicate from the source list, regardless + /// of its position. The current data source is not disposed -- callers that want the + /// current source torn down should follow this with or + /// . If the current entry was removed, the internal "current entry" + /// reference is cleared so subsequent block operations don't accidentally remove a + /// remaining entry. /// - /// - /// The action runs at queue-processing time, after any actions enqueued earlier on the - /// same queue. Observers can use this to defer advancement decisions until after the - /// current synchronous propagation chain has completed -- for example, an applier that - /// wants to check a latch that is set by a sibling applier in the same propagation. - /// - /// the action to enqueue - void EnqueueAction(Action action); + /// predicate selecting which kinds to remove + void BlockAll(Predicate kindMatches); } } - - diff --git a/pkgs/sdk/server/src/Internal/DataSources/CompositeDataSource/SourcesList.cs b/pkgs/sdk/server/src/Internal/DataSources/CompositeDataSource/SourcesList.cs index 50f66dc0..ef7289a2 100644 --- a/pkgs/sdk/server/src/Internal/DataSources/CompositeDataSource/SourcesList.cs +++ b/pkgs/sdk/server/src/Internal/DataSources/CompositeDataSource/SourcesList.cs @@ -112,6 +112,38 @@ public int IndexOf(T element) return _list.IndexOf(element); } + /// + /// Removes every element that matches the predicate, adjusting the head position so it + /// continues to point at whichever element followed the previous head. Returns the number + /// of elements removed. + /// + public int RemoveAll(Predicate match) + { + if (match is null) throw new ArgumentNullException(nameof(match)); + // Walk back-to-front so a removal never shifts an unvisited index. Adjust _pos as we go + // by mirroring SourcesList.Remove's "if removed index is before head, head moves left". + var removed = 0; + for (var i = _list.Count - 1; i >= 0; i--) + { + if (!match(_list[i])) continue; + _list.RemoveAt(i); + removed++; + if (i < _pos) + { + _pos -= 1; + } + } + if (_list.Count == 0) + { + _pos = 0; + } + else if (_circular && _pos > _list.Count - 1) + { + _pos = 0; + } + return removed; + } + /// /// Reset the head position to the start of the list. /// diff --git a/pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2ChangeSetTranslator.cs b/pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2ChangeSetTranslator.cs index cfd40ced..4e5da698 100644 --- a/pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2ChangeSetTranslator.cs +++ b/pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2ChangeSetTranslator.cs @@ -18,12 +18,14 @@ internal static class FDv2ChangeSetTranslator /// The FDv2 changeset to convert. /// Logger for diagnostic messages. /// The environment ID to include in the changeset. + /// Whether to mark the changeset as carrying the FDv1 fallback directive. /// A DataStoreTypes.ChangeSet containing the converted data. /// Thrown when the changeset type is unknown. public static DataStoreTypes.ChangeSet ToChangeSet( FDv2ChangeSet changeset, Logger log, - string environmentId = null) + string environmentId = null, + bool fdv1Fallback = false) { DataStoreTypes.ChangeSetType changeSetType; switch (changeset.Type) @@ -102,7 +104,8 @@ internal static class FDv2ChangeSetTranslator changeSetType, changeset.Selector, dataBuilder.ToImmutable(), - environmentId); + environmentId, + fdv1Fallback); } /// diff --git a/pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2DataSource.cs b/pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2DataSource.cs index d6af7906..cb9527d3 100644 --- a/pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2DataSource.cs +++ b/pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2DataSource.cs @@ -11,6 +11,7 @@ namespace LaunchDarkly.Sdk.Server.Internal.FDv2DataSources { using FactoryList = List<(SourceFactory Factory, ActionApplierFactory ActionApplierFactory)>; + using OuterFactoryList = List<(SourceFactory Factory, ActionApplierFactory ActionApplierFactory, CompositeEntryKind Kind)>; internal static partial class FDv2DataSource @@ -32,31 +33,23 @@ public static IDataSource CreateFDv2DataSource( Logger logger) { var sublogger = logger.SubLogger(LogNames.FDv2DataSourceSubLog); - - // Here we make a combined composite source, with the initializer source first which switches or falls back to the - // synchronizer source when the initializer succeeds or when the initializer source reports Off (all initializers failed) - // Shared latch: once the FDv1 fallback applier has fired for any entry, all other - // appliers attached at this level become no-ops. This prevents the existing - // initializer-exhaustion or synchronizer-exhaustion appliers from firing afterwards - // and blocking the FDv1 fallback entry that was just selected. - var fdv1FallbackTriggered = new FDv1FallbackLatch(); + // The flow-control appliers (fast-fallback, blacklist, timed-fallback-and-recovery) + // each watch for the FDv1 directive on their inputs and bail when it is set, so the + // FDv1 fallback applier owns the transition uncontested. The directive arrives either + // on UpdateStatus (errorInfo.FDv1Fallback) or on Apply (changeSet.FDv1Fallback) when + // the response was a successful FDv2 payload that also carried the directive header. ActionApplierFactory blacklistWhenSuccessOrOff = - (actionable) => new ActionApplierBlacklistWhenSuccessOrOff(actionable, fdv1FallbackTriggered); + (actionable) => new ActionApplierBlacklistWhenSuccessOrOff(actionable); ActionApplierFactory fastFallbackApplierFactory = (actionable) => new ActionApplierFastFallback(actionable); ActionApplierFactory timedFallbackAndRecoveryApplierFactory = - (actionable) => new GatedObserver( - new ActionApplierTimedFallbackAndRecovery(actionable, sublogger), fdv1FallbackTriggered); + (actionable) => new ActionApplierTimedFallbackAndRecovery(actionable, sublogger); - // From the synchronizers entry, the FDv1 fallback entry is the next entry in the - // outer list, so no extra entries to skip. + // The FDv1 fallback applier is phase-agnostic: when the directive is observed it + // calls BlockAll(FDv2) to remove every FDv2 entry from the outer list, then GoToNext + // lands on the FDv1 fallback entry (or on exhaustion when none was configured). ActionApplierFactory fdv1FallbackApplierFactory = - (actionable) => new FDv1FallbackActionApplier(actionable, fdv1FallbackTriggered); - // From the initializers entry, the FDv1 fallback entry is two ahead when synchronizers - // are configured (skip past synchronizers), or one ahead when they are not. - var initializerFdv1FallbackExtraSkips = (synchronizers != null && synchronizers.Count > 0) ? 1 : 0; - ActionApplierFactory initializerFdv1FallbackApplierFactory = - (actionable) => new FDv1FallbackActionApplier(actionable, fdv1FallbackTriggered, initializerFdv1FallbackExtraSkips); + (actionable) => new FDv1FallbackActionApplier(actionable); var initializationTracker = new InitializationTracker(Any(initializers), Any(synchronizers)); @@ -67,7 +60,7 @@ public static IDataSource CreateFDv2DataSource( var fallbackSynchronizationObserver = new InitializationObserver(initializationTracker, DataSourceCategory.FallbackSynchronizers); - var underlyingComposites = new FactoryList(); + var underlyingComposites = new OuterFactoryList(); // Only create the initializers composite if initializers are provided if (initializers != null && initializers.Count > 0) @@ -86,27 +79,11 @@ public static IDataSource CreateFDv2DataSource( // The common data source updates implements both IDataSourceUpdates and IDataSourceUpdatesV2. return new CompositeSource("Initializers", sink, initializerFactory, sublogger, circular: false); }, - (actionable) => - { - // Honor the FDv1 fallback directive in the initializer phase too. The - // applier is attached unconditionally: when an FDv1 fallback entry is - // configured the applier advances to it; when one is not, the applier's - // BlockCurrent + DisposeCurrent + GoToNext sequence exhausts the outer - // list and halts the data system per Requirement 1.6.3(4). - // - // Observer order is significant: the FDv1 fallback applier MUST run before - // the blacklist applier so that it has a chance to set the FDv1 fallback - // latch when the source emits UpdateStatus(Off, FDv1Fallback). The - // blacklist applier's UpdateStatus path consults the latch and no-ops when - // it is set, preventing it from concurrently advancing the outer composite - // toward the FDv2 synchronizer entry. (See ActionApplierBlacklistWhenSuccessOrOff - // for the corresponding Apply-path handling, which defers advancement onto - // the actionable's queue so the latch can be checked at execution time.) - return new CompositeObserver( - initializationObserver, - initializerFdv1FallbackApplierFactory(actionable), - blacklistWhenSuccessOrOff(actionable)); - } + (actionable) => new CompositeObserver( + initializationObserver, + fdv1FallbackApplierFactory(actionable), + blacklistWhenSuccessOrOff(actionable)), + CompositeEntryKind.FDv2 )); } @@ -127,20 +104,8 @@ public static IDataSource CreateFDv2DataSource( return new CompositeSource("Synchronizers", sink, synchronizersFactoryTuples, sublogger); }, - (actionable) => - { - // The FDv1 fallback applier is attached unconditionally: when an FDv1 - // fallback entry exists in the outer list the applier advances to it; - // when one does not, BlockCurrent + DisposeCurrent + GoToNext exhausts - // the outer list and halts the data system per Requirement 1.6.3(4). - // Disposing the current synchronizer is what stops the streaming source - // from reconnecting. - // - // Note: the synchronizers entry attaches only the FDv1 fallback applier - // (no blacklist applier), so observer ordering relative to blacklist is - // not a concern here. - return new CompositeObserver(synchronizationObserver, fdv1FallbackApplierFactory(actionable)); - } + (actionable) => new CompositeObserver(synchronizationObserver, fdv1FallbackApplierFactory(actionable)), + CompositeEntryKind.FDv2 )); } @@ -160,7 +125,9 @@ public static IDataSource CreateFDv2DataSource( } return new CompositeSource("FDv1FallbackSynchronizers", sink, fdv1SynchronizersFactoryTuples, sublogger); - }, (applier) => fallbackSynchronizationObserver + }, + (applier) => fallbackSynchronizationObserver, + CompositeEntryKind.FDv1Fallback )); } @@ -182,6 +149,10 @@ public ActionApplierFastFallback(ICompositeSourceActionable actionable) public void UpdateStatus(DataSourceState newState, DataSourceStatus.ErrorInfo? newError) { + // When the FDv1 directive rides on the status, the FDv1 fallback applier owns the + // transition. Bail so we don't double-advance the inner initializers list. + if (newError?.FDv1Fallback == true) return; + // when an initializer has an issue, fall back if (newState == DataSourceState.Interrupted || newState == DataSourceState.Off || newError != null) { @@ -201,6 +172,9 @@ public void Apply(ChangeSet changeSet) return; } + // Empty selector with directive: still defer to the FDv1 fallback applier. + if (changeSet.FDv1Fallback) return; + _actionable.DisposeCurrent(); _actionable.GoToNext(); _actionable.StartCurrent(); @@ -239,6 +213,15 @@ internal ActionApplierTimedFallbackAndRecovery(ICompositeSourceActionable action public void UpdateStatus(DataSourceState newState, DataSourceStatus.ErrorInfo? newError) { + // When the FDv1 directive rides on the status, the FDv1 fallback applier owns the + // transition. Bail before scheduling timers or advancing within the inner sync + // list -- otherwise we'd briefly start the next sync before the outer composite + // disposes us. + if (newError?.FDv1Fallback == true) + { + return; + } + lock (_lock) { // If there's a pending fallback task and status is not Interrupted, cancel it @@ -403,38 +386,22 @@ public void Apply(ChangeSet changeSet) /// then disposes the current datasource, goes to the next datasource, and starts it. /// /// - /// When the optional is supplied, this applier defers the - /// Apply-driven advancement onto the actionable's serialized queue and re-checks the latch - /// at queue-processing time. This is necessary because, on a successful FDv2 initializer - /// response that also carries the FDv1 fallback directive, the polling/streaming source - /// fires Apply (this applier) and then UpdateStatus(Off, FDv1Fallback) (the FDv1 fallback - /// applier) on the same propagation chain. Without the deferral, both appliers enqueue - /// independent advancement sequences and the outer composite double-advances past the - /// FDv1 fallback entry. By deferring and consulting the latch when the queued action runs, - /// the FDv1 fallback applier's UpdateStatus has had a chance to set the latch and we - /// no-op, leaving the FDv1 fallback applier as the sole driver of the transition. + /// When the FDv1 directive rides on the input (changeset or error info), this applier + /// bails so the FDv1 fallback applier can drive the transition uncontested. /// internal class ActionApplierBlacklistWhenSuccessOrOff : IDataSourceObserver { private readonly ICompositeSourceActionable _actionable; - private readonly FDv1FallbackLatch _latch; public ActionApplierBlacklistWhenSuccessOrOff(ICompositeSourceActionable actionable) - : this(actionable, null) { } - - public ActionApplierBlacklistWhenSuccessOrOff(ICompositeSourceActionable actionable, FDv1FallbackLatch latch) { _actionable = actionable ?? throw new ArgumentNullException(nameof(actionable)); - _latch = latch; } public void UpdateStatus(DataSourceState newState, DataSourceStatus.ErrorInfo? newError) { - // When Off status is seen, blacklist current, dispose current, go to next, and start current. - // Skip when the FDv1 fallback latch is set: the FDv1 fallback applier will perform - // the advancement to the FDv1 fallback entry, and we must not double-advance. if (newState != DataSourceState.Off) return; - if (_latch != null && _latch.IsTriggered) return; + if (newError?.FDv1Fallback == true) return; _actionable.BlockCurrent(); _actionable.DisposeCurrent(); @@ -450,148 +417,62 @@ public void Apply(ChangeSet changeSet) // From a forward development perspective this could be because we had a local stale selector which was // persisted in some way, and we are getting up to date via an initializer. if (changeSet.Selector.IsEmpty) return; + if (changeSet.FDv1Fallback) return; - if (_latch == null) - { - // No FDv1 fallback coordination -- run advancement inline as before. - _actionable.BlockCurrent(); - _actionable.DisposeCurrent(); - _actionable.GoToNext(); - _actionable.StartCurrent(); - return; - } - - // Defer advancement onto the actionable's serialized queue. Any sibling applier - // observing the same propagation chain (e.g. FDv1FallbackActionApplier reacting to - // a subsequent UpdateStatus call from the same source) gets a chance to run and - // set the latch before the queued action executes. At execution time, if the latch - // has been triggered we skip our advancement and let the FDv1 fallback applier own - // the transition. - // - // Bounded re-enqueue: the first time the deferred action runs, if the latch is not - // yet set, we re-enqueue ourselves once before checking again. This closes the - // narrow timing window in which the queue's background processor could pick up the - // deferred action before the source thread has finished its synchronous propagation - // (and called UpdateStatus + set the latch). After the second pass, if the latch is - // still unset, no FDv1 fallback signal is coming on this propagation, and we - // proceed with the normal advancement. - var retried = false; - Action deferred = null; - deferred = () => - { - if (_latch.IsTriggered) return; - if (!retried) - { - retried = true; - _actionable.EnqueueAction(deferred); - return; - } - _actionable.BlockCurrent(); - _actionable.DisposeCurrent(); - _actionable.GoToNext(); - _actionable.StartCurrent(); - }; - _actionable.EnqueueAction(deferred); - } - } - - /// - /// Single-shot latch shared between the FDv1 fallback applier and the surrounding - /// fallback/recovery appliers. Once the FDv1 fallback directive is observed at any entry, - /// other appliers stop reacting -- they would otherwise observe the now-unwanted Off - /// signals from previously running data sources and try to advance the composite again. - /// - internal sealed class FDv1FallbackLatch - { - private int _triggered; - - /// - /// Returns whether the latch has already been triggered. - /// - public bool IsTriggered => System.Threading.Volatile.Read(ref _triggered) != 0; - - /// - /// Atomically sets the latch. Returns true if this call was the one that set it. - /// - public bool TryTrigger() => System.Threading.Interlocked.CompareExchange(ref _triggered, 1, 0) == 0; - } - - /// - /// Wraps another observer and suppresses both Apply and UpdateStatus calls once the FDv1 - /// fallback latch has been triggered. The latch is set by - /// when it observes the FDv1 fallback directive. - /// - internal sealed class GatedObserver : IDataSourceObserver - { - private readonly IDataSourceObserver _inner; - private readonly FDv1FallbackLatch _latch; - - public GatedObserver(IDataSourceObserver inner, FDv1FallbackLatch latch) - { - _inner = inner ?? throw new ArgumentNullException(nameof(inner)); - _latch = latch ?? throw new ArgumentNullException(nameof(latch)); - } - - public void UpdateStatus(DataSourceState newState, DataSourceStatus.ErrorInfo? newError) - { - if (_latch.IsTriggered) return; - _inner.UpdateStatus(newState, newError); - } - - public void Apply(ChangeSet changeSet) - { - if (_latch.IsTriggered) return; - _inner.Apply(changeSet); + _actionable.BlockCurrent(); + _actionable.DisposeCurrent(); + _actionable.GoToNext(); + _actionable.StartCurrent(); } } /// /// Action applier that observes an FDv1 fallback signal and advances the outer composite - /// to the FDv1 fallback synchronizer entry, blocking the current entry and any number of - /// intermediate entries that should also be skipped. + /// to the FDv1 fallback synchronizer entry. The directive may arrive either on + /// (errorInfo.FDv1Fallback) or on + /// (changeSet.FDv1Fallback) when a successful payload also carried the directive header. /// /// - /// When attached to the synchronizers entry of the outer FDv2 composite, the FDv1 fallback - /// entry is the next one in the list, so extraEntriesToSkip is 0. When attached to - /// the initializers entry and the synchronizers entry is also configured, we have to skip - /// past it, so extraEntriesToSkip is 1. + /// Phase-agnostic: when attached to either the initializers entry or the synchronizers + /// entry of the outer FDv2 composite, + /// removes every FDv2 entry in one shot, leaving the FDv1 fallback entry (if configured) + /// as the next stop. When no FDv1 fallback entry was configured the outer list is exhausted + /// and the composite halts the data system. + /// + /// The single-shot _triggered flag makes this applier idempotent in case the directive + /// arrives more than once on the same propagation chain (for example, on a successful FDv2 + /// response the source emits Apply with the flag and then UpdateStatus(Off, FDv1Fallback) + /// during shutdown). /// internal class FDv1FallbackActionApplier : IDataSourceObserver { private readonly ICompositeSourceActionable _actionable; - private readonly FDv1FallbackLatch _latch; - private readonly int _extraEntriesToSkip; + private int _triggered; - public FDv1FallbackActionApplier(ICompositeSourceActionable actionable, FDv1FallbackLatch latch = null, int extraEntriesToSkip = 0) + public FDv1FallbackActionApplier(ICompositeSourceActionable actionable) { _actionable = actionable ?? throw new ArgumentNullException(nameof(actionable)); - if (extraEntriesToSkip < 0) - { - throw new ArgumentOutOfRangeException(nameof(extraEntriesToSkip)); - } - _latch = latch ?? new FDv1FallbackLatch(); - _extraEntriesToSkip = extraEntriesToSkip; } public void UpdateStatus(DataSourceState newState, DataSourceStatus.ErrorInfo? newError) { - if (newError == null || !newError.Value.FDv1Fallback) return; - if (!_latch.TryTrigger()) return; - - _actionable.BlockCurrent(); // blacklist the current entry - _actionable.DisposeCurrent(); // dispose the current data source - for (var i = 0; i < _extraEntriesToSkip; i++) - { - _actionable.GoToNext(); // advance to the entry we are skipping past - _actionable.BlockCurrent(); // remove that entry from the list too - } - _actionable.GoToNext(); // go to the FDv1 fallback synchronizer entry - _actionable.StartCurrent(); // start the FDv1 fallback synchronizer + if (newError?.FDv1Fallback != true) return; + Trigger(); } public void Apply(ChangeSet changeSet) { - // this FDv1 fallback action applier doesn't care about apply, it only looks for the FDv1Fallback flag in the errors + if (!changeSet.FDv1Fallback) return; + Trigger(); + } + + private void Trigger() + { + if (Interlocked.CompareExchange(ref _triggered, 1, 0) != 0) return; + + _actionable.BlockAll(kind => kind == CompositeEntryKind.FDv2); + _actionable.GoToNext(); + _actionable.StartCurrent(); } } diff --git a/pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2PollingDataSource.cs b/pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2PollingDataSource.cs index 789a5207..ae24b4ba 100644 --- a/pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2PollingDataSource.cs +++ b/pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2PollingDataSource.cs @@ -100,12 +100,13 @@ private async Task UpdateTaskAsync() ?.FirstOrDefault(); } - ProcessPollingResponse(response.Value); + var fdv1Fallback = HasFDv1FallbackHeader(response.Value.Headers); + ProcessPollingResponse(response.Value, fdv1Fallback); - // The server can ask the SDK to fall back to FDv1 even on a successful response. - // Apply the payload first (above), then signal the fallback so the action applier - // can switch to the FDv1 fallback synchronizer. - if (HasFDv1FallbackHeader(response.Value.Headers)) + // On a successful response carrying the directive, the changeset above already + // rode the FDv1Fallback flag through to the appliers, which trigger the fallback + // transition. Tear down the polling loop here so we stop hitting the server. + if (fdv1Fallback) { _log.Info("LaunchDarkly polling response indicates fallback to FDv1"); var fallbackError = new DataSourceStatus.ErrorInfo @@ -174,7 +175,7 @@ private async Task UpdateTaskAsync() } } - private void ProcessPollingResponse(FDv2PollingResponse response) + private void ProcessPollingResponse(FDv2PollingResponse response, bool fdv1Fallback) { lock (_protocolLock) { @@ -183,7 +184,7 @@ private void ProcessPollingResponse(FDv2PollingResponse response) foreach (var evt in response.Events) { var action = _protocolHandler.HandleEvent(evt); - ProcessProtocolAction(action); + ProcessProtocolAction(action, fdv1Fallback); } } } @@ -202,12 +203,12 @@ private void HandleJsonError(string message) _dataSourceUpdates.UpdateStatus(DataSourceState.Interrupted, errorInfo); } - private void ProcessProtocolAction(IFDv2ProtocolAction action) + private void ProcessProtocolAction(IFDv2ProtocolAction action, bool fdv1Fallback) { switch (action) { case FDv2ActionChangeset changesetAction: - ProcessChangeSet(changesetAction.Changeset); + ProcessChangeSet(changesetAction.Changeset, fdv1Fallback); break; case FDv2ActionError errorAction: _log.Error("FDv2 error event: {0} - {1}", errorAction.Id, errorAction.Reason); @@ -235,12 +236,12 @@ private void ProcessProtocolAction(IFDv2ProtocolAction action) } } - private void ProcessChangeSet(FDv2ChangeSet fdv2ChangeSet) + private void ProcessChangeSet(FDv2ChangeSet fdv2ChangeSet, bool fdv1Fallback) { if (!(_dataSourceUpdates is ITransactionalDataSourceUpdates transactionalDataSourceUpdates)) throw new InvalidOperationException("Cannot apply updates to non-transactional data source"); - var dataStoreChangeSet = FDv2ChangeSetTranslator.ToChangeSet(fdv2ChangeSet, _log, _environmentId); + var dataStoreChangeSet = FDv2ChangeSetTranslator.ToChangeSet(fdv2ChangeSet, _log, _environmentId, fdv1Fallback); // If the update fails, then we wait until the next poll and try again. // This is different from a streaming data source, which will need to re-start to get an initial diff --git a/pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2StreamingDataSource.cs b/pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2StreamingDataSource.cs index 5f14d2b4..816c7389 100644 --- a/pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2StreamingDataSource.cs +++ b/pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2StreamingDataSource.cs @@ -223,19 +223,20 @@ private void OnMessage(object sender, MessageReceivedEventArgs e) case FDv2ActionChangeset changeAction: { var changeset = changeAction.Changeset; + var fdv1Fallback = _fdv1FallbackRequested; var storeError = !_transactionalDataSourceUpdates.Apply( - FDv2ChangeSetTranslator.ToChangeSet(changeAction.Changeset, _log, _environmentId)); + FDv2ChangeSetTranslator.ToChangeSet(changeAction.Changeset, _log, _environmentId, fdv1Fallback)); if (!storeError) { _lastStoreUpdateFailed.GetAndSet(false); MaybeMarkInitialized(); - // The server may include the FDv1 fallback header on a successful - // streaming response. Apply the payload first (above), then shut the - // stream down with FDv1Fallback=true so the action applier swaps to the - // FDv1 fallback synchronizer. - if (_fdv1FallbackRequested) + // On a successful response carrying the FDv1 directive, the changeset + // above already rode the FDv1Fallback flag through to the appliers, which + // trigger the fallback transition. Tear down the stream here so we stop + // reconnecting. + if (fdv1Fallback) { _log.Info("LaunchDarkly streaming response indicates fallback to FDv1"); var fallbackError = new DataSourceStatus.ErrorInfo diff --git a/pkgs/sdk/server/src/Subsystems/DataStoreTypes.cs b/pkgs/sdk/server/src/Subsystems/DataStoreTypes.cs index 3b943c87..70450eac 100644 --- a/pkgs/sdk/server/src/Subsystems/DataStoreTypes.cs +++ b/pkgs/sdk/server/src/Subsystems/DataStoreTypes.cs @@ -436,13 +436,23 @@ public struct ChangeSet /// public IEnumerable>> Data { get; } + /// + /// Whether the response that produced this changeset also carried the FDv1 fallback + /// directive. When true, the data system applies the payload but should hand control + /// to the FDv1 fallback synchronizer instead of continuing the FDv2 protocol. Defaults + /// to false. + /// + public bool FDv1Fallback { get; } + internal ChangeSet(ChangeSetType type, Selector selector, - IEnumerable>> data, string environmentId) + IEnumerable>> data, string environmentId, + bool fdv1Fallback = false) { Type = type; Selector = selector; Data = data; EnvironmentId = environmentId; + FDv1Fallback = fdv1Fallback; } } } diff --git a/pkgs/sdk/server/test/Internal/FDv2DataSources/FDv2DataSourceTest.cs b/pkgs/sdk/server/test/Internal/FDv2DataSources/FDv2DataSourceTest.cs index 8c425dcd..0ea9d7c0 100644 --- a/pkgs/sdk/server/test/Internal/FDv2DataSources/FDv2DataSourceTest.cs +++ b/pkgs/sdk/server/test/Internal/FDv2DataSources/FDv2DataSourceTest.cs @@ -1418,8 +1418,11 @@ private class MockCompositeSourceActionable : ICompositeSourceActionable public bool GoToFirstCalled { get; private set; } public bool StartCurrentCalled { get; private set; } public bool BlockCurrentCalled { get; private set; } + public bool BlockAllCalled { get; private set; } public int GoToNextCallCount { get; private set; } public int BlockCurrentCallCount { get; private set; } + public int BlockAllCallCount { get; private set; } + public Predicate LastBlockAllPredicate { get; private set; } public List CallSequence { get; } = new List(); private bool _isAtFirst = false; @@ -1466,12 +1469,12 @@ public void SetIsAtFirst(bool value) _isAtFirst = value; } - public void EnqueueAction(Action action) + public void BlockAll(Predicate kindMatches) { - // For unit-test mocks we run enqueued actions inline so existing tests that - // assert observable side effects of the action sequence continue to work. - CallSequence.Add(nameof(EnqueueAction)); - action?.Invoke(); + BlockAllCalled = true; + BlockAllCallCount++; + LastBlockAllPredicate = kindMatches; + CallSequence.Add(nameof(BlockAll)); } public void Reset() @@ -1481,8 +1484,11 @@ public void Reset() GoToFirstCalled = false; StartCurrentCalled = false; BlockCurrentCalled = false; + BlockAllCalled = false; GoToNextCallCount = 0; BlockCurrentCallCount = 0; + BlockAllCallCount = 0; + LastBlockAllPredicate = null; CallSequence.Clear(); _isAtFirst = false; } @@ -1507,9 +1513,11 @@ public void FDv1FallbackActionApplierIgnoresStatusWithoutFDv1Fallback() } [Fact] - public void FDv1FallbackActionApplierWithDefaultSkipMovesToNextEntry() + public void FDv1FallbackActionApplierUsesBlockAllAndAdvances() { - // Default behavior (synchronizers entry): block current, dispose, go to next, start. + // The applier blocks every FDv2 entry from the outer list (so the FDv1 fallback + // entry is the next stop), then advances to it. No skip-counting -- the entry list + // mutation handles both initializer-phase and synchronizer-phase wirings uniformly. var mockActionable = new MockCompositeSourceActionable(); var applier = new FDv2DataSource.FDv1FallbackActionApplier(mockActionable); @@ -1518,53 +1526,54 @@ public void FDv1FallbackActionApplierWithDefaultSkipMovesToNextEntry() new DataSourceStatus.ErrorInfo { FDv1Fallback = true }); Assert.Equal( - new List { "BlockCurrent", "DisposeCurrent", "GoToNext", "StartCurrent" }, + new List { "BlockAll", "GoToNext", "StartCurrent" }, mockActionable.CallSequence); + Assert.NotNull(mockActionable.LastBlockAllPredicate); + Assert.True(mockActionable.LastBlockAllPredicate(CompositeEntryKind.FDv2)); + Assert.False(mockActionable.LastBlockAllPredicate(CompositeEntryKind.FDv1Fallback)); } [Fact] - public void FDv1FallbackActionApplierWithExtraSkipsAdvancesPastIntermediateEntries() + public void FDv1FallbackActionApplierTriggersOnApplyCarryingDirective() { - // From the initializers entry with synchronizers configured, the applier must skip - // past the synchronizers entry to land on the FDv1 fallback entry. + // Successful FDv2 responses that also carry the directive ride the flag on the + // ChangeSet rather than UpdateStatus. The applier must trigger from either path. var mockActionable = new MockCompositeSourceActionable(); - var applier = new FDv2DataSource.FDv1FallbackActionApplier(mockActionable, extraEntriesToSkip: 1); + var applier = new FDv2DataSource.FDv1FallbackActionApplier(mockActionable); - applier.UpdateStatus( - DataSourceState.Interrupted, - new DataSourceStatus.ErrorInfo { FDv1Fallback = true }); + applier.Apply(new ChangeSet( + ChangeSetType.Full, + Selector.Make(1, "init-state"), + new Dictionary>(), + null, + fdv1Fallback: true)); Assert.Equal( - new List { "BlockCurrent", "DisposeCurrent", "GoToNext", "BlockCurrent", "GoToNext", "StartCurrent" }, + new List { "BlockAll", "GoToNext", "StartCurrent" }, mockActionable.CallSequence); - Assert.Equal(2, mockActionable.GoToNextCallCount); - Assert.Equal(2, mockActionable.BlockCurrentCallCount); } [Fact] - public void GatedObserverSuppressesEventsAfterLatchTriggered() + public void FDv1FallbackActionApplierIsIdempotent() { - // The latch coordinates the FDv1 fallback applier with the surrounding fallback / - // recovery appliers: once one entry's applier has triggered the fallback, others must - // not respond to the now-stale Off signals from disposed entries. - var inner = new CapturingObserver(); - var latch = new FDv2DataSource.FDv1FallbackLatch(); - var gated = new FDv2DataSource.GatedObserver(inner, latch); - - // Before triggering: events flow through. - gated.UpdateStatus(DataSourceState.Off, new DataSourceStatus.ErrorInfo()); - Assert.Equal(1, inner.UpdateStatusCallCount); - - // Trigger the latch (e.g. FDv1FallbackActionApplier observed the directive). - Assert.True(latch.TryTrigger()); - - // After triggering: events are suppressed. - gated.UpdateStatus(DataSourceState.Off, new DataSourceStatus.ErrorInfo()); - gated.Apply(new ChangeSet( - ChangeSetType.Full, Selector.Empty, - new Dictionary>(), null)); - Assert.Equal(1, inner.UpdateStatusCallCount); - Assert.Equal(0, inner.ApplyCallCount); + // On a successful response the source emits Apply(directive) and then + // UpdateStatus(Off, FDv1Fallback) during shutdown. The applier must trigger only + // once; the second event is a no-op so we don't double-advance off the FDv1 entry. + var mockActionable = new MockCompositeSourceActionable(); + var applier = new FDv2DataSource.FDv1FallbackActionApplier(mockActionable); + + applier.Apply(new ChangeSet( + ChangeSetType.Full, + Selector.Make(1, "init-state"), + new Dictionary>(), + null, + fdv1Fallback: true)); + applier.UpdateStatus( + DataSourceState.Off, + new DataSourceStatus.ErrorInfo { FDv1Fallback = true }); + + Assert.Equal(1, mockActionable.BlockAllCallCount); + Assert.Equal(1, mockActionable.GoToNextCallCount); } // End-to-end test: synchronizer reports Off+FDv1Fallback (mirrors the FDv2 streaming @@ -1970,14 +1979,12 @@ public async Task RealStreamingSourceWithFallbackHeaderEngagesFDv1FallbackAfterA } } - // Regression test for the Bugbot-flagged double-advance bug: when an initializer applies - // a non-empty changeset (success path) and then reports Off + FDv1Fallback on the same - // propagation chain (mirrors FDv2PollingDataSource handling a 200 response that carries - // the x-ld-fd-fallback header), the outer composite must NOT start the FDv2 synchronizer - // and MUST start the FDv1 fallback synchronizer. The pre-fix behavior had both - // ActionApplierBlacklistWhenSuccessOrOff (via Apply) and FDv1FallbackActionApplier (via - // UpdateStatus) independently enqueue advancement on the outer composite, over-advancing - // past the FDv1 fallback entry. + // Regression test for the Bugbot-flagged double-advance bug. The polling source on a + // successful response with the x-ld-fd-fallback header now rides the directive on the + // ChangeSet (see FDv2PollingDataSource), so the blacklist applier and the FDv1 fallback + // applier both observe it inline: the blacklist bails and the FDv1 fallback applier + // calls BlockAll(FDv2) before advancing. The outer composite must NOT start the FDv2 + // synchronizer and MUST start the FDv1 fallback synchronizer. [Fact] public async Task InitializerSuccessWithFDv1FallbackDirectiveDoesNotOverAdvance() { @@ -2046,11 +2053,10 @@ public async Task InitializerSuccessWithFDv1FallbackDirectiveDoesNotOverAdvance( { var startTask = dataSource.Start(); - // The initializer's Apply propagates first; then UpdateStatus(Off, FDv1Fallback) - // sets the latch. The blacklist applier defers its Apply-driven advancement onto - // the actionable's queue; by the time the queued action runs, the latch is set - // and the deferred advancement is suppressed. Only the FDv1 fallback applier - // drives the transition to the FDv1 fallback entry. + // The initializer's Apply rides the FDv1Fallback flag on the ChangeSet, so the + // blacklist applier sees it inline and bails. The FDv1 fallback applier triggers + // off the same Apply, calls BlockAll(FDv2) to remove the synchronizers entry, + // then advances to the FDv1 fallback entry uncontested. var startResult = await startTask; Assert.True(startResult, "Start should complete via the FDv1 fallback synchronizer's Apply"); From e86a90006e32a86df28f2f3bb3db8c71ce7c65a4 Mon Sep 17 00:00:00 2001 From: Matthew Keeler Date: Fri, 8 May 2026 12:22:14 -0400 Subject: [PATCH 12/14] fix: Address review feedback from second review round on FDv1 fallback refactor Tighten the CompositeSource API and apply the small comment / defensive cleanup items called out by tanderson-ld and Cursor on the prior commit. Drop the 2-tuple CompositeSource constructor and the WithDefaultKind helper. CompositeSource is internal-only, so all callers now pass a 3-tuple list with explicit CompositeEntryKind. Inner sub-composites in FDv2DataSource pass FDv2 uniformly (the kind is inert for inner composites whose appliers never call BlockAll). Update the six test declarations in CompositeSourceTest.cs to match. Soften SourcesList.RemoveAll(null) to return 0 rather than throw -- a null matcher matches nothing, so there is nothing to remove. Add 9 unit tests for RemoveAll covering the head-position edge cases: matches none, matches all, matches only entries before / after / at the head, single-element list, empty list, and a circular wrap case. Restore the deleted "When Off status is seen, blacklist current..." comment in ActionApplierBlacklistWhenSuccessOrOff.UpdateStatus. In FDv1FallbackActionApplier.Trigger, call DisposeCurrent() between BlockAll and GoToNext as a defensive guard against a source that does not shut itself down. DisposeCurrent is idempotent so this is safe. Update the matching unit-test CallSequence assertions. In FDv2StreamingDataSource, drop the redundant _initTask.TrySetResult inside the FDv1 fallback branch -- MaybeMarkInitialized() on the preceding line already covers it. Update the InitializerSuccess... regression test so the mock initializer sets fdv1Fallback: true on its ChangeSet, mirroring what FDv2PollingDataSource does in production. Without this, the test exercised the pre-refactor race-prone path rather than the new inline-bail path. --- .../CompositeDataSource/CompositeSource.cs | 36 +----- .../CompositeDataSource/SourcesList.cs | 2 +- .../FDv2DataSources/FDv2DataSource.cs | 21 +++- .../FDv2StreamingDataSource.cs | 1 - .../CompositeSourceTest.cs | 34 +++--- .../CompositeDataSource/SourcesListTest.cs | 115 ++++++++++++++++++ .../FDv2DataSources/FDv2DataSourceTest.cs | 15 ++- 7 files changed, 163 insertions(+), 61 deletions(-) diff --git a/pkgs/sdk/server/src/Internal/DataSources/CompositeDataSource/CompositeSource.cs b/pkgs/sdk/server/src/Internal/DataSources/CompositeDataSource/CompositeSource.cs index 6f61667b..69b869b5 100644 --- a/pkgs/sdk/server/src/Internal/DataSources/CompositeDataSource/CompositeSource.cs +++ b/pkgs/sdk/server/src/Internal/DataSources/CompositeDataSource/CompositeSource.cs @@ -40,29 +40,17 @@ internal sealed class CompositeSource : IDataSource, ICompositeSourceActionable private IDataSource _currentDataSource; /// - /// Creates a new . Every entry is treated as kind - /// . + /// Creates a new . Each entry carries an explicit + /// so appliers can express "block every FDv2 entry" + /// via without needing to know list + /// positions. For inner sub-composites whose entries never participate in cross-kind + /// blocking, callers should pass uniformly. /// /// description of the composite source for logging purposes /// the sink that receives updates from the active source - /// the ordered list of source factories and their associated action applier factories + /// the ordered list of source factories, action applier factories, and entry kinds /// the logger instance to use /// whether to loop off the end of the list back to the start when fallback occurs - public CompositeSource( - string compositeDescription, - IDataSourceUpdatesV2 updatesSink, - IList<(SourceFactory Factory, ActionApplierFactory ActionApplierFactory)> factoryTuples, - Logger logger, - bool circular = true) - : this(compositeDescription, updatesSink, WithDefaultKind(factoryTuples), logger, circular) - { - } - - /// - /// Creates a new with explicit per-entry kinds. The kind is - /// surfaced to so appliers can express - /// "block every FDv2 entry" without needing to know list positions. - /// public CompositeSource( string compositeDescription, IDataSourceUpdatesV2 updatesSink, @@ -94,18 +82,6 @@ public CompositeSource( ); } - private static IList<(SourceFactory Factory, ActionApplierFactory ActionApplierFactory, CompositeEntryKind Kind)> - WithDefaultKind(IList<(SourceFactory Factory, ActionApplierFactory ActionApplierFactory)> tuples) - { - if (tuples is null) return null; - var result = new List<(SourceFactory, ActionApplierFactory, CompositeEntryKind)>(tuples.Count); - foreach (var t in tuples) - { - result.Add((t.Factory, t.ActionApplierFactory, CompositeEntryKind.FDv2)); - } - return result; - } - /// /// Returns a string representation of this data source for informational purposes. /// diff --git a/pkgs/sdk/server/src/Internal/DataSources/CompositeDataSource/SourcesList.cs b/pkgs/sdk/server/src/Internal/DataSources/CompositeDataSource/SourcesList.cs index ef7289a2..e698ed28 100644 --- a/pkgs/sdk/server/src/Internal/DataSources/CompositeDataSource/SourcesList.cs +++ b/pkgs/sdk/server/src/Internal/DataSources/CompositeDataSource/SourcesList.cs @@ -119,7 +119,7 @@ public int IndexOf(T element) /// public int RemoveAll(Predicate match) { - if (match is null) throw new ArgumentNullException(nameof(match)); + if (match is null) return 0; // Walk back-to-front so a removal never shifts an unvisited index. Adjust _pos as we go // by mirroring SourcesList.Remove's "if removed index is before head, head moves left". var removed = 0; diff --git a/pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2DataSource.cs b/pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2DataSource.cs index cb9527d3..a1d1d47d 100644 --- a/pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2DataSource.cs +++ b/pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2DataSource.cs @@ -10,8 +10,7 @@ namespace LaunchDarkly.Sdk.Server.Internal.FDv2DataSources { - using FactoryList = List<(SourceFactory Factory, ActionApplierFactory ActionApplierFactory)>; - using OuterFactoryList = List<(SourceFactory Factory, ActionApplierFactory ActionApplierFactory, CompositeEntryKind Kind)>; + using FactoryList = List<(SourceFactory Factory, ActionApplierFactory ActionApplierFactory, CompositeEntryKind Kind)>; internal static partial class FDv2DataSource @@ -60,7 +59,7 @@ public static IDataSource CreateFDv2DataSource( var fallbackSynchronizationObserver = new InitializationObserver(initializationTracker, DataSourceCategory.FallbackSynchronizers); - var underlyingComposites = new OuterFactoryList(); + var underlyingComposites = new FactoryList(); // Only create the initializers composite if initializers are provided if (initializers != null && initializers.Count > 0) @@ -73,7 +72,8 @@ public static IDataSource CreateFDv2DataSource( for (int i = 0; i < initializers.Count; i++) { initializerFactory.Add((initializers[i], - fastFallbackApplierFactory)); + fastFallbackApplierFactory, + CompositeEntryKind.FDv2)); } // The common data source updates implements both IDataSourceUpdates and IDataSourceUpdatesV2. @@ -99,7 +99,8 @@ public static IDataSource CreateFDv2DataSource( for (int i = 0; i < synchronizers.Count; i++) { synchronizersFactoryTuples.Add((synchronizers[i], - timedFallbackAndRecoveryApplierFactory)); + timedFallbackAndRecoveryApplierFactory, + CompositeEntryKind.FDv2)); } return new CompositeSource("Synchronizers", sink, synchronizersFactoryTuples, sublogger); @@ -120,8 +121,11 @@ public static IDataSource CreateFDv2DataSource( new FactoryList(); for (int i = 0; i < fdv1Synchronizers.Count; i++) { + // The Kind is irrelevant inside this inner sub-composite (its + // appliers never call BlockAll); FDv2 is the inert default. fdv1SynchronizersFactoryTuples.Add((fdv1Synchronizers[i], - timedFallbackAndRecoveryApplierFactory)); // fdv1 synchronizers behave same as synchronizers + timedFallbackAndRecoveryApplierFactory, + CompositeEntryKind.FDv2)); } return new CompositeSource("FDv1FallbackSynchronizers", sink, fdv1SynchronizersFactoryTuples, sublogger); @@ -403,6 +407,7 @@ public void UpdateStatus(DataSourceState newState, DataSourceStatus.ErrorInfo? n if (newState != DataSourceState.Off) return; if (newError?.FDv1Fallback == true) return; + // When Off status is seen, blacklist current, dispose current, go to next, and start current _actionable.BlockCurrent(); _actionable.DisposeCurrent(); _actionable.GoToNext(); @@ -471,6 +476,10 @@ private void Trigger() if (Interlocked.CompareExchange(ref _triggered, 1, 0) != 0) return; _actionable.BlockAll(kind => kind == CompositeEntryKind.FDv2); + // DisposeCurrent is defensive: a well-behaved source will have already shut + // itself down by the time the directive is observed, but DisposeCurrent is + // idempotent and guards against sources that don't. + _actionable.DisposeCurrent(); _actionable.GoToNext(); _actionable.StartCurrent(); } diff --git a/pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2StreamingDataSource.cs b/pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2StreamingDataSource.cs index 816c7389..4615e145 100644 --- a/pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2StreamingDataSource.cs +++ b/pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2StreamingDataSource.cs @@ -245,7 +245,6 @@ private void OnMessage(object sender, MessageReceivedEventArgs e) Time = DateTime.Now, FDv1Fallback = true }; - _initTask.TrySetResult(true); Shutdown(fallbackError); return; } diff --git a/pkgs/sdk/server/test/Internal/DataSources/CompositeDataSource/CompositeSourceTest.cs b/pkgs/sdk/server/test/Internal/DataSources/CompositeDataSource/CompositeSourceTest.cs index 9545d6c7..1ecebf6c 100644 --- a/pkgs/sdk/server/test/Internal/DataSources/CompositeDataSource/CompositeSourceTest.cs +++ b/pkgs/sdk/server/test/Internal/DataSources/CompositeDataSource/CompositeSourceTest.cs @@ -65,11 +65,11 @@ public async Task CanFallbackOnInterrupted() }; // Create CompositeSource with three factory tuples - var factoryTuples = new List<(SourceFactory Factory, ActionApplierFactory ActionApplierFactory)> + var factoryTuples = new List<(SourceFactory Factory, ActionApplierFactory ActionApplierFactory, CompositeEntryKind Kind)> { - (firstSourceFactory, actionApplierFactory), - (secondSourceFactory, actionApplierFactory), - (thirdSourceFactory, actionApplierFactory) + (firstSourceFactory, actionApplierFactory, CompositeEntryKind.FDv2), + (secondSourceFactory, actionApplierFactory, CompositeEntryKind.FDv2), + (thirdSourceFactory, actionApplierFactory, CompositeEntryKind.FDv2) }; var compositeSource = new CompositeSource("test-composite", capturingSink, factoryTuples, TestLogger); @@ -147,10 +147,10 @@ public async Task BlacklistsDataSourceFactoryAfterOffState() }; // Create CompositeSource with two factory tuples - var factoryTuples = new List<(SourceFactory Factory, ActionApplierFactory ActionApplierFactory)> + var factoryTuples = new List<(SourceFactory Factory, ActionApplierFactory ActionApplierFactory, CompositeEntryKind Kind)> { - (firstSourceFactory, firstActionApplierFactory), - (secondSourceFactory, secondActionApplierFactory) + (firstSourceFactory, firstActionApplierFactory, CompositeEntryKind.FDv2), + (secondSourceFactory, secondActionApplierFactory, CompositeEntryKind.FDv2) }; var compositeSource = new CompositeSource("test-composite", capturingSink, factoryTuples, TestLogger, circular: true); @@ -261,10 +261,10 @@ public async Task DisabledDataSourceCannotTriggerActions() }; // Create CompositeSource with two factory tuples - var factoryTuples = new List<(SourceFactory Factory, ActionApplierFactory ActionApplierFactory)> + var factoryTuples = new List<(SourceFactory Factory, ActionApplierFactory ActionApplierFactory, CompositeEntryKind Kind)> { - (firstSourceFactory, firstActionApplierFactory), - (secondSourceFactory, secondActionApplierFactory) + (firstSourceFactory, firstActionApplierFactory, CompositeEntryKind.FDv2), + (secondSourceFactory, secondActionApplierFactory, CompositeEntryKind.FDv2) }; var compositeSource = new CompositeSource("test-composite", capturingSink, factoryTuples, TestLogger, circular: true); @@ -339,9 +339,9 @@ public async Task DisposeReportsOffState() ActionApplierFactory actionApplierFactory = (actionable) => { return new MockActionApplier(actionable); }; // Create CompositeSource with one factory tuple - var factoryTuples = new List<(SourceFactory Factory, ActionApplierFactory ActionApplierFactory)> + var factoryTuples = new List<(SourceFactory Factory, ActionApplierFactory ActionApplierFactory, CompositeEntryKind Kind)> { - (sourceFactory, actionApplierFactory) + (sourceFactory, actionApplierFactory, CompositeEntryKind.FDv2) }; var compositeSource = new CompositeSource("test-composite", capturingSink, factoryTuples, TestLogger); @@ -416,11 +416,11 @@ public async Task AllThreeSourcesFailReportsOffWithExhaustedMessage() }; // Create CompositeSource with three factory tuples - var factoryTuples = new List<(SourceFactory Factory, ActionApplierFactory ActionApplierFactory)> + var factoryTuples = new List<(SourceFactory Factory, ActionApplierFactory ActionApplierFactory, CompositeEntryKind Kind)> { - (firstSourceFactory, actionApplierFactory), - (secondSourceFactory, actionApplierFactory), - (thirdSourceFactory, actionApplierFactory) + (firstSourceFactory, actionApplierFactory, CompositeEntryKind.FDv2), + (secondSourceFactory, actionApplierFactory, CompositeEntryKind.FDv2), + (thirdSourceFactory, actionApplierFactory, CompositeEntryKind.FDv2) }; var compositeSource = new CompositeSource("test-composite", capturingSink, factoryTuples, TestLogger); @@ -471,7 +471,7 @@ public async Task NoSourcesProvidedReportsOffWithExhaustedMessage() var capturingSink = new CapturingDataSourceUpdatesWithHeaders(); // Create CompositeSource with empty factory tuples list - var factoryTuples = new List<(SourceFactory Factory, ActionApplierFactory ActionApplierFactory)>(); + var factoryTuples = new List<(SourceFactory Factory, ActionApplierFactory ActionApplierFactory, CompositeEntryKind Kind)>(); var compositeSource = new CompositeSource("test-composite", capturingSink, factoryTuples, TestLogger); diff --git a/pkgs/sdk/server/test/Internal/DataSources/CompositeDataSource/SourcesListTest.cs b/pkgs/sdk/server/test/Internal/DataSources/CompositeDataSource/SourcesListTest.cs index a17462b7..ce2ea494 100644 --- a/pkgs/sdk/server/test/Internal/DataSources/CompositeDataSource/SourcesListTest.cs +++ b/pkgs/sdk/server/test/Internal/DataSources/CompositeDataSource/SourcesListTest.cs @@ -184,6 +184,121 @@ public void NonCircularListReturnsNullAfterConsumingAllElements() Assert.Null(underTest.Next()); Assert.Null(underTest.Next()); } + + [Fact] + public void RemoveAllNullPredicateRemovesNothing() + { + var underTest = new SourcesList(false, new[] { "1", "2", "3" }); + Assert.Equal(0, underTest.RemoveAll(null)); + Assert.Equal(3, underTest.Length); + Assert.Equal("1", underTest.Next()); + } + + [Fact] + public void RemoveAllOnEmptyListIsNoOp() + { + var underTest = new SourcesList(false); + Assert.Equal(0, underTest.RemoveAll(_ => true)); + Assert.Equal(0, underTest.Length); + Assert.Null(underTest.Next()); + } + + [Fact] + public void RemoveAllPredicateMatchesNoneLeavesListIntact() + { + var underTest = new SourcesList(false, new[] { "1", "2", "3" }); + Assert.Equal("1", underTest.Next()); // head -> 1 (pos = 1 after Next) + + Assert.Equal(0, underTest.RemoveAll(s => s == "missing")); + Assert.Equal(3, underTest.Length); + + // Head is unchanged: next yields the previous next ("2"). + Assert.Equal("2", underTest.Next()); + Assert.Equal("3", underTest.Next()); + } + + [Fact] + public void RemoveAllPredicateMatchesEverythingClearsListAndResetsHead() + { + var underTest = new SourcesList(true, new[] { "1", "2", "3" }); + Assert.Equal("1", underTest.Next()); + Assert.Equal("2", underTest.Next()); + + Assert.Equal(3, underTest.RemoveAll(_ => true)); + Assert.Equal(0, underTest.Length); + Assert.Equal(0, underTest.Pos); + Assert.Null(underTest.Next()); + Assert.Null(underTest.Next()); + } + + [Fact] + public void RemoveAllPredicateMatchesOnlyEntriesBeforeHeadAdjustsPos() + { + // After consuming "1" and "2", head is at index 2 ("3"). Remove "1" and "2": list + // becomes ["3"] and head must point at "3" (pos = 0) so the next Next() returns "3". + var underTest = new SourcesList(false, new[] { "1", "2", "3" }); + Assert.Equal("1", underTest.Next()); + Assert.Equal("2", underTest.Next()); + + Assert.Equal(2, underTest.RemoveAll(s => s == "1" || s == "2")); + Assert.Equal(1, underTest.Length); + Assert.Equal(0, underTest.Pos); + Assert.Equal("3", underTest.Next()); + } + + [Fact] + public void RemoveAllPredicateMatchesOnlyEntriesAfterHeadLeavesPosUnchanged() + { + // Head at index 1 ("2"). Removing entries that all live at indices > pos must not + // shift pos -- the next Next() should still return "2". + var underTest = new SourcesList(false, new[] { "1", "2", "3", "4" }); + Assert.Equal("1", underTest.Next()); // pos = 1 + + Assert.Equal(2, underTest.RemoveAll(s => s == "3" || s == "4")); + Assert.Equal(2, underTest.Length); + Assert.Equal(1, underTest.Pos); + Assert.Equal("2", underTest.Next()); + } + + [Fact] + public void RemoveAllPredicateMatchesHeadButNotPosIndex() + { + // Head at index 1 ("2"). Removing "2" itself: pos was 1; removed index 1 is NOT + // before pos, so pos stays at 1, but the list shrinks so pos lands on the element + // that used to be at index 2 ("3"). Subsequent Next() returns "3". + var underTest = new SourcesList(false, new[] { "1", "2", "3" }); + Assert.Equal("1", underTest.Next()); // pos = 1, head -> "2" + + Assert.Equal(1, underTest.RemoveAll(s => s == "2")); + Assert.Equal(2, underTest.Length); + Assert.Equal("3", underTest.Next()); + } + + [Fact] + public void RemoveAllCircularResetsPosWhenListShrinksBelowOldPos() + { + // Circular list, all positions consumed; pos has wrapped. After RemoveAll leaves a + // single element, pos must be reset to 0 so Next() returns it. + var underTest = new SourcesList(true, new[] { "1", "2", "3", "4" }); + Assert.Equal("1", underTest.Next()); + Assert.Equal("2", underTest.Next()); + Assert.Equal("3", underTest.Next()); + Assert.Equal("4", underTest.Next()); // circular: pos wraps back to 0 + + Assert.Equal(3, underTest.RemoveAll(s => s != "2")); + Assert.Equal(1, underTest.Length); + Assert.Equal("2", underTest.Next()); + } + + [Fact] + public void RemoveAllReturnsCountOfMatchedEntries() + { + var underTest = new SourcesList(false, new[] { "a", "b", "a", "c", "a" }); + Assert.Equal(3, underTest.RemoveAll(s => s == "a")); + Assert.Equal(2, underTest.Length); + Assert.Equal("b", underTest.Next()); + Assert.Equal("c", underTest.Next()); + } } } diff --git a/pkgs/sdk/server/test/Internal/FDv2DataSources/FDv2DataSourceTest.cs b/pkgs/sdk/server/test/Internal/FDv2DataSources/FDv2DataSourceTest.cs index 0ea9d7c0..931a306a 100644 --- a/pkgs/sdk/server/test/Internal/FDv2DataSources/FDv2DataSourceTest.cs +++ b/pkgs/sdk/server/test/Internal/FDv2DataSources/FDv2DataSourceTest.cs @@ -1526,7 +1526,7 @@ public void FDv1FallbackActionApplierUsesBlockAllAndAdvances() new DataSourceStatus.ErrorInfo { FDv1Fallback = true }); Assert.Equal( - new List { "BlockAll", "GoToNext", "StartCurrent" }, + new List { "BlockAll", "DisposeCurrent", "GoToNext", "StartCurrent" }, mockActionable.CallSequence); Assert.NotNull(mockActionable.LastBlockAllPredicate); Assert.True(mockActionable.LastBlockAllPredicate(CompositeEntryKind.FDv2)); @@ -1549,7 +1549,7 @@ public void FDv1FallbackActionApplierTriggersOnApplyCarryingDirective() fdv1Fallback: true)); Assert.Equal( - new List { "BlockAll", "GoToNext", "StartCurrent" }, + new List { "BlockAll", "DisposeCurrent", "GoToNext", "StartCurrent" }, mockActionable.CallSequence); } @@ -1994,10 +1994,12 @@ public async Task InitializerSuccessWithFDv1FallbackDirectiveDoesNotOverAdvance( var fdv1Data = new FullDataSet( new Dictionary>()); - // The polling-style initializer: applies a non-empty changeset (selector populated) - // and then reports Off + FDv1Fallback on the same Start() invocation. This mirrors + // The polling-style initializer: applies a non-empty changeset with the FDv1 + // directive flag set, then reports Off + FDv1Fallback during shutdown. This mirrors // FDv2PollingDataSource.UpdateTaskAsync when a successful 200 response carries the - // x-ld-fd-fallback header. + // x-ld-fd-fallback header. The flag on the changeset is what makes the blacklist + // applier bail synchronously; without it the test would exercise the old race-prone + // path instead of the new bail path. SourceFactory pollingInitializerFactory = (updatesSink) => new MockDataSourceWithInit(async () => { @@ -2005,7 +2007,8 @@ public async Task InitializerSuccessWithFDv1FallbackDirectiveDoesNotOverAdvance( ChangeSetType.Full, Selector.Make(1, "init-state"), initializerData.Data, - null)); + null, + fdv1Fallback: true)); updatesSink.UpdateStatus( DataSourceState.Off, new DataSourceStatus.ErrorInfo From d9a0697f1202948ca04b3b657c04cd8200593c0d Mon Sep 17 00:00:00 2001 From: Matthew Keeler Date: Mon, 11 May 2026 10:31:24 -0400 Subject: [PATCH 13/14] fix: Fail tracker when FDv1 directive fires but no fallback is configured The FDv1 fallback applier is now attached to both the initializers and synchronizers entries unconditionally. When the directive fires and no FDv1 fallback synchronizer was configured, BlockAll(FDv2) exhausts the outer composite. Exhaustion-driven Off goes directly to the external sink, bypassing the InitializationTracker's observers, so the tracker permanently stays in FallingBack and dataSource.Start() never resolves. Pass hasFdv1Fallback to InitializationTracker. When a transition would enter FallingBack without a fallback configured, transition to Failed instead so Start() resolves with false (matching the pre-refactor behavior where the applier was conditionally attached and the error propagated through SynchronizersExhausted -> Failed). Strengthen SynchronizerFDv1FallbackWithoutFallbackConfiguredHaltsDataSystem to assert that Start() resolves within 5s with false -- without the fix the test hangs and times out. --- .../FDv2DataSource.InitializationTracker.cs | 19 ++++++++++++++++--- .../FDv2DataSources/FDv2DataSource.cs | 2 +- .../FDv2DataSources/FDv2DataSourceTest.cs | 11 ++++++++++- 3 files changed, 27 insertions(+), 5 deletions(-) diff --git a/pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2DataSource.InitializationTracker.cs b/pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2DataSource.InitializationTracker.cs index 924ad132..7ca36ab6 100644 --- a/pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2DataSource.InitializationTracker.cs +++ b/pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2DataSource.InitializationTracker.cs @@ -29,6 +29,7 @@ private class InitializationTracker : IDisposable private bool _initializersRemain; private bool _synchronizersRemain; + private readonly bool _hasFdv1Fallback; private enum State { @@ -96,8 +97,10 @@ private enum Action FallingBack, } - public InitializationTracker(bool hasInitializers, bool hasSynchronizers) + public InitializationTracker(bool hasInitializers, bool hasSynchronizers, bool hasFdv1Fallback) { + _hasFdv1Fallback = hasFdv1Fallback; + if (!(hasInitializers || hasSynchronizers)) { // If we have no data sources, then we are immediately initialized. @@ -115,6 +118,16 @@ public InitializationTracker(bool hasInitializers, bool hasSynchronizers) } } + /// + /// Resolves the target state for a transition that would otherwise enter + /// . When no FDv1 fallback synchronizer is configured, + /// there is no entry left to drive the tracker out of + /// (the outer composite's exhaustion-driven Off goes directly to the external sink, + /// bypassing the tracker's observers), so we transition straight to + /// and complete the task with false. + /// + private State FallingBackOrFailed() => _hasFdv1Fallback ? State.FallingBack : State.Failed; + public Task Task => _taskCompletionSource.Task; private void HandleRemainingSources() @@ -143,7 +156,7 @@ private void DetermineState(Action action) _state = State.Data; break; case Action.FallingBack: - _state = State.FallingBack; + _state = FallingBackOrFailed(); break; case Action.InitializersExhausted: _initializersRemain = false; @@ -196,7 +209,7 @@ private void DetermineState(Action action) break; case Action.FallingBack: - _state = State.FallingBack; + _state = FallingBackOrFailed(); break; case Action.DataReceived: case Action.SelectorReceived: diff --git a/pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2DataSource.cs b/pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2DataSource.cs index a1d1d47d..94575c09 100644 --- a/pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2DataSource.cs +++ b/pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2DataSource.cs @@ -51,7 +51,7 @@ public static IDataSource CreateFDv2DataSource( (actionable) => new FDv1FallbackActionApplier(actionable); var initializationTracker = - new InitializationTracker(Any(initializers), Any(synchronizers)); + new InitializationTracker(Any(initializers), Any(synchronizers), Any(fdv1Synchronizers)); var initializationObserver = new InitializationObserver(initializationTracker, DataSourceCategory.Initializers); var synchronizationObserver = diff --git a/pkgs/sdk/server/test/Internal/FDv2DataSources/FDv2DataSourceTest.cs b/pkgs/sdk/server/test/Internal/FDv2DataSources/FDv2DataSourceTest.cs index 931a306a..1f73862b 100644 --- a/pkgs/sdk/server/test/Internal/FDv2DataSources/FDv2DataSourceTest.cs +++ b/pkgs/sdk/server/test/Internal/FDv2DataSources/FDv2DataSourceTest.cs @@ -1836,13 +1836,22 @@ public void SynchronizerFDv1FallbackWithoutFallbackConfiguredHaltsDataSystem() try { - _ = dataSource.Start(); + var startTask = dataSource.Start(); // The data system must transition to Off via outer composite exhaustion. Wait a // few status updates -- intermediate Interrupted statuses come first. var statuses = capturingSink.WaitForStatusUpdates(2, TimeSpan.FromSeconds(5)); Assert.Contains(statuses, s => s.State == DataSourceState.Off); + // Start() must resolve (with false) rather than hang. Without the + // InitializationTracker's no-fallback shortcut, the tracker would stay in + // FallingBack forever because the outer composite's exhaustion-driven Off goes + // directly to the external sink, bypassing the tracker's observers. + Assert.True(startTask.Wait(TimeSpan.FromSeconds(5)), + "Start() must resolve when FDv1 directive arrives but no fallback is configured"); + Assert.False(startTask.Result, + "Start() should resolve with false when no FDv1 fallback is configured to take over"); + // Critically, the synchronizer must have been disposed -- this is what stops the // streaming source from reconnecting in the harness scenario. Assert.True(synchronizerDisposed, From 752da6a509808322c280d15447a47ff104e669bb Mon Sep 17 00:00:00 2001 From: Matthew Keeler Date: Mon, 11 May 2026 15:48:22 -0400 Subject: [PATCH 14/14] fix: Resolve polling source _initTask when shutting down for FDv1 fallback If ProcessChangeSet fails (data store rejects the changeset) on a response carrying the FDv1 directive, control falls through to the fdv1Fallback branch in UpdateTaskAsync. Shutdown permanently cancels the poll loop, but _initTask is never resolved -- the polling source's Start() task hangs forever (a resource leak; not user-visible because CompletingDataSource.Start() returns the tracker's task rather than the underlying source's). Add _initTask.TrySetResult(false) before Shutdown. TrySet is a no-op when ProcessChangeSet already set it to true on the success path, so this only affects the store-failure path. --- .../src/Internal/FDv2DataSources/FDv2PollingDataSource.cs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2PollingDataSource.cs b/pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2PollingDataSource.cs index ae24b4ba..2a45241a 100644 --- a/pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2PollingDataSource.cs +++ b/pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2PollingDataSource.cs @@ -115,6 +115,11 @@ private async Task UpdateTaskAsync() Time = DateTime.Now, FDv1Fallback = true }; + // Resolve _initTask so the polling source's Start() task doesn't leak when + // ProcessChangeSet failed (e.g. transient data store error): Shutdown below + // permanently cancels the poll loop, so the task would otherwise never + // complete. TrySet is a no-op if ProcessChangeSet already set it to true. + _initTask.TrySetResult(false); Shutdown(fallbackError); return; }