diff --git a/.github/actions/contract-tests/action.yml b/.github/actions/contract-tests/action.yml index 9ccaad11..a529c099 100644 --- a/.github/actions/contract-tests/action.yml +++ b/.github/actions/contract-tests/action.yml @@ -12,7 +12,7 @@ inputs: required: false default: '' run_fdv2_tests: - description: 'Whether to run contract tests from the v3.0.0-alpha.3 tag' + description: 'Whether to run contract tests from the v3.0.0-alpha.6 tag' required: false default: 'false' @@ -64,7 +64,7 @@ runs: shell: bash run: dotnet ${{ inputs.service_dll_file }} > test-service.log 2>&1 & disown - - name: Clone and run contract tests from v3.0.0-alpha.3 tag + - name: Clone and run contract tests from v3.0.0-alpha.6 tag if: inputs.run_fdv2_tests == 'true' shell: bash run: | @@ -72,8 +72,17 @@ runs: git clone https://github.com/launchdarkly/sdk-test-harness.git /tmp/sdk-test-harness cp $(dirname ./${{ inputs.service_project_file }})/test-supressions-fdv2.txt /tmp/sdk-test-harness/testharness-suppressions-fdv2.txt cd /tmp/sdk-test-harness - git checkout v3.0.0-alpha.3 + git checkout v3.0.0-alpha.6 go build -o test-harness . ./test-harness -url http://localhost:8000 -debug -status-timeout=360 --skip-from=testharness-suppressions-fdv2.txt --stop-service-at-end env: GITHUB_TOKEN: ${{ inputs.token }} + + - name: Upload test service log on failure + if: failure() && inputs.run_fdv2_tests == 'true' + uses: actions/upload-artifact@v4 + with: + name: contract-test-service-log-${{ runner.os }}-${{ runner.arch }} + path: test-service.log + if-no-files-found: warn + retention-days: 7 diff --git a/pkgs/sdk/server/contract-tests/Representations.cs b/pkgs/sdk/server/contract-tests/Representations.cs index 5030b315..b898c414 100644 --- a/pkgs/sdk/server/contract-tests/Representations.cs +++ b/pkgs/sdk/server/contract-tests/Representations.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Text.Json.Serialization; using LaunchDarkly.Sdk; // Note, in order for System.Text.Json serialization/deserialization to work correctly, the members of @@ -153,6 +154,12 @@ public class SdkConfigDataSystemParams public int? StoreMode { get; set; } public SdkConfigDataInitializerParams[] Initializers { get; set; } public SdkConfigDataSynchronizerParams[] Synchronizers { get; set; } + // FDv1Fallback configures the SDK's FDv1 Fallback Synchronizer, which is engaged only when + // the LaunchDarkly server returns the FDv1 fallback directive. It is distinct from the FDv2 + // Primary/Fallback Synchronizers above. The harness sends this as "fdv1Fallback" (lowercase + // 'd'); we override the default CamelCase mapping (which would produce "fDv1Fallback"). + [JsonPropertyName("fdv1Fallback")] + public SdkConfigPollingParams FDv1Fallback { get; set; } public string PayloadFilter { get; set; } } diff --git a/pkgs/sdk/server/contract-tests/SdkClientEntity.cs b/pkgs/sdk/server/contract-tests/SdkClientEntity.cs index 2b8631fe..4e5ffc7b 100644 --- a/pkgs/sdk/server/contract-tests/SdkClientEntity.cs +++ b/pkgs/sdk/server/contract-tests/SdkClientEntity.cs @@ -503,47 +503,29 @@ private static Configuration BuildSdkConfig(SdkConfigParams sdkParams, ILogAdapt if (synchronizers.Count > 0) { dataSystemBuilder.Synchronizers(synchronizers.ToArray()); - - // Find the best synchronizer to use for FDv1 fallback configuration - // Prefer polling synchronizers since FDv1 fallback is polling-based - SdkConfigDataSynchronizerParams synchronizerForFallback = null; - - // First, try to find a polling synchronizer - foreach (var syncParams in sdkParams.DataSystem.Synchronizers) - { - if (syncParams.Polling != null) - { - synchronizerForFallback = syncParams; - break; - } - } - - // If no polling synchronizer found, use the first synchronizer (could be streaming) - if (synchronizerForFallback == null && sdkParams.DataSystem.Synchronizers.Length > 0) - { - synchronizerForFallback = sdkParams.DataSystem.Synchronizers[0]; - } - - if (synchronizerForFallback != null) - { - // Only configure global polling endpoints if we have a polling synchronizer with a custom base URI - // This ensures the FDv1 fallback synchronizer uses the same base URI without overwriting - // existing polling endpoint configuration - if (synchronizerForFallback.Polling != null && - synchronizerForFallback.Polling.BaseUri != null) - { - endpoints.Polling(synchronizerForFallback.Polling.BaseUri); - } - - var fdv1Fallback = CreateFDv1FallbackSynchronizer(synchronizerForFallback); - if (fdv1Fallback != null) - { - dataSystemBuilder.FDv1FallbackSynchronizer(fdv1Fallback); - } - } } } + // Configure the FDv1 Fallback Synchronizer directly from dataSystem.fdv1Fallback, + // separate from the FDv2 Primary/Fallback synchronizer chain. This is engaged only + // in response to a server-directed FDv1 Fallback Directive. + if (sdkParams.DataSystem.FDv1Fallback != null) + { + if (sdkParams.DataSystem.FDv1Fallback.BaseUri != null) + { + endpoints.Polling(sdkParams.DataSystem.FDv1Fallback.BaseUri); + } + + var fdv1FallbackBuilder = DataSystemComponents.FDv1Polling(); + if (sdkParams.DataSystem.FDv1Fallback.PollIntervalMs.HasValue) + { + fdv1FallbackBuilder.PollInterval( + TimeSpan.FromMilliseconds(sdkParams.DataSystem.FDv1Fallback.PollIntervalMs.Value)); + } + + dataSystemBuilder.FDv1FallbackSynchronizer(fdv1FallbackBuilder); + } + builder.DataSystem(dataSystemBuilder); } @@ -595,33 +577,6 @@ private static IComponentConfigurer CreateSynchronizer( return null; } - private static IComponentConfigurer CreateFDv1FallbackSynchronizer( - SdkConfigDataSynchronizerParams synchronizer) - { - // FDv1 fallback synchronizer is always polling-based - var fdv1PollingBuilder = DataSystemComponents.FDv1Polling(); - - // Configure polling interval if the synchronizer has polling configuration - if (synchronizer.Polling != null) - { - if (synchronizer.Polling.PollIntervalMs.HasValue) - { - fdv1PollingBuilder.PollInterval(TimeSpan.FromMilliseconds(synchronizer.Polling.PollIntervalMs.Value)); - } - // Note: FDv1 polling doesn't support ServiceEndpointsOverride, so base URI - // will use the global service endpoints configuration - } - else if (synchronizer.Streaming != null) - { - // For streaming synchronizers, we still create a polling fallback - // Use default polling interval since streaming doesn't have a poll interval - // Note: FDv1 polling doesn't support ServiceEndpointsOverride, so base URI - // will use the global service endpoints configuration - } - - return fdv1PollingBuilder; - } - private MigrationVariationResponse DoMigrationVariation(MigrationVariationParams migrationVariation) { var defaultStage = MigrationStageExtensions.FromDataModelString(migrationVariation.DefaultStage); diff --git a/pkgs/sdk/server/contract-tests/TestService.cs b/pkgs/sdk/server/contract-tests/TestService.cs index 922a2676..13f8df7a 100644 --- a/pkgs/sdk/server/contract-tests/TestService.cs +++ b/pkgs/sdk/server/contract-tests/TestService.cs @@ -38,7 +38,8 @@ public class Webapp "inline-context-all", "anonymous-redaction", "evaluation-hooks", - "client-prereq-events" + "client-prereq-events", + "fdv1-fallback" }; public readonly Handler Handler; diff --git a/pkgs/sdk/server/src/Internal/DataSources/CompositeDataSource/CompositeEntryKind.cs b/pkgs/sdk/server/src/Internal/DataSources/CompositeDataSource/CompositeEntryKind.cs new file mode 100644 index 00000000..27d5ad64 --- /dev/null +++ b/pkgs/sdk/server/src/Internal/DataSources/CompositeDataSource/CompositeEntryKind.cs @@ -0,0 +1,23 @@ +namespace LaunchDarkly.Sdk.Server.Internal.DataSources +{ + /// + /// Categorizes an entry in a composite source's list. Appliers use this to express + /// "block every FDv2 entry" via + /// without having to count positions or know which phase they were attached at. + /// + internal enum CompositeEntryKind + { + /// + /// Default kind. Entries that participate in the FDv2 protocol -- initializers and + /// FDv2 synchronizers in the outer composite, and any entry that is not the + /// FDv1 fallback synchronizer. + /// + FDv2, + + /// + /// The FDv1 fallback synchronizer entry. Used by the FDv1 fallback applier to + /// distinguish the fallback target from the FDv2 entries it should block. + /// + FDv1Fallback + } +} diff --git a/pkgs/sdk/server/src/Internal/DataSources/CompositeDataSource/CompositeSource.cs b/pkgs/sdk/server/src/Internal/DataSources/CompositeDataSource/CompositeSource.cs index 1ea5447e..69b869b5 100644 --- a/pkgs/sdk/server/src/Internal/DataSources/CompositeDataSource/CompositeSource.cs +++ b/pkgs/sdk/server/src/Internal/DataSources/CompositeDataSource/CompositeSource.cs @@ -30,27 +30,31 @@ internal sealed class CompositeSource : IDataSource, ICompositeSourceActionable private readonly IDataSourceUpdatesV2 _originalUpdateSink; private readonly IDataSourceUpdatesV2 _sanitizedUpdateSink; - private readonly SourcesList<(SourceFactory Factory, ActionApplierFactory ActionApplierFactory)> _sourcesList; + private readonly SourcesList<(SourceFactory Factory, ActionApplierFactory ActionApplierFactory, CompositeEntryKind Kind)> _sourcesList; private readonly DisableableDataSourceUpdatesTracker _disableableTracker; // Tracks the entry from the sources list that was used to create the current // data source instance. This allows operations such as blacklist to remove // the correct factory/action-applier-factory tuple from the list. - private (SourceFactory Factory, ActionApplierFactory ActionApplierFactory) _currentEntry; + private (SourceFactory Factory, ActionApplierFactory ActionApplierFactory, CompositeEntryKind Kind) _currentEntry; private IDataSource _currentDataSource; /// - /// Creates a new . + /// Creates a new . Each entry carries an explicit + /// so appliers can express "block every FDv2 entry" + /// via without needing to know list + /// positions. For inner sub-composites whose entries never participate in cross-kind + /// blocking, callers should pass uniformly. /// /// description of the composite source for logging purposes /// the sink that receives updates from the active source - /// the ordered list of source factories and their associated action applier factories + /// the ordered list of source factories, action applier factories, and entry kinds /// the logger instance to use /// whether to loop off the end of the list back to the start when fallback occurs public CompositeSource( string compositeDescription, IDataSourceUpdatesV2 updatesSink, - IList<(SourceFactory Factory, ActionApplierFactory ActionApplierFactory)> factoryTuples, + IList<(SourceFactory Factory, ActionApplierFactory ActionApplierFactory, CompositeEntryKind Kind)> factoryTuples, Logger logger, bool circular = true) { @@ -72,7 +76,7 @@ public CompositeSource( // this tracker is used to disconnect the current source from the updates sink when it is no longer needed. _disableableTracker = new DisableableDataSourceUpdatesTracker(); - _sourcesList = new SourcesList<(SourceFactory SourceFactory, ActionApplierFactory ActionApplierFactory)>( + _sourcesList = new SourcesList<(SourceFactory Factory, ActionApplierFactory ActionApplierFactory, CompositeEntryKind Kind)>( circular: circular, initialList: factoryTuples ); @@ -451,6 +455,38 @@ public void BlockCurrent() }); } + public void BlockAll(Predicate kindMatches) + { + if (kindMatches is null) throw new ArgumentNullException(nameof(kindMatches)); + if (_disposed) + { + return; + } + + EnqueueAction(() => + { + lock (_lock) + { + var removed = _sourcesList.RemoveAll(entry => kindMatches(entry.Kind)); + if (removed == 0) + { + return; + } + + // If the current entry was removed, clear the reference so a subsequent + // BlockCurrent doesn't accidentally remove a remaining entry. The current + // data source is left running -- callers are expected to follow BlockAll + // with DisposeCurrent / GoToNext when they want it torn down. + if (_currentEntry != default && kindMatches(_currentEntry.Kind)) + { + _currentEntry = default; + } + + _log.Debug("{0} blocked {1} entries by kind predicate.", _compositeDescription, removed); + } + }); + } + private void logTransition(String previousDescription, String currentDescription) { if (previousDescription != null && currentDescription != null) { _log.Debug("{0} transitioned from {1} to {2}.", _compositeDescription, previousDescription, currentDescription); diff --git a/pkgs/sdk/server/src/Internal/DataSources/CompositeDataSource/ICompositeSourceActionable.cs b/pkgs/sdk/server/src/Internal/DataSources/CompositeDataSource/ICompositeSourceActionable.cs index 5410f111..daabaf8a 100644 --- a/pkgs/sdk/server/src/Internal/DataSources/CompositeDataSource/ICompositeSourceActionable.cs +++ b/pkgs/sdk/server/src/Internal/DataSources/CompositeDataSource/ICompositeSourceActionable.cs @@ -1,3 +1,4 @@ +using System; using System.Threading.Tasks; namespace LaunchDarkly.Sdk.Server.Internal.DataSources @@ -34,11 +35,20 @@ internal interface ICompositeSourceActionable bool IsAtFirst(); /// - /// Blocks the the current source's factory. This prevents the current source's factory from being used again. + /// Blocks the the current source's factory. This prevents the current source's factory from being used again. /// Note that this does not tear down the current data source, it just prevents its factory from being used again. /// void BlockCurrent(); + + /// + /// Removes every entry whose kind matches the predicate from the source list, regardless + /// of its position. The current data source is not disposed -- callers that want the + /// current source torn down should follow this with or + /// . If the current entry was removed, the internal "current entry" + /// reference is cleared so subsequent block operations don't accidentally remove a + /// remaining entry. + /// + /// predicate selecting which kinds to remove + void BlockAll(Predicate kindMatches); } } - - diff --git a/pkgs/sdk/server/src/Internal/DataSources/CompositeDataSource/SourcesList.cs b/pkgs/sdk/server/src/Internal/DataSources/CompositeDataSource/SourcesList.cs index 50f66dc0..e698ed28 100644 --- a/pkgs/sdk/server/src/Internal/DataSources/CompositeDataSource/SourcesList.cs +++ b/pkgs/sdk/server/src/Internal/DataSources/CompositeDataSource/SourcesList.cs @@ -112,6 +112,38 @@ public int IndexOf(T element) return _list.IndexOf(element); } + /// + /// Removes every element that matches the predicate, adjusting the head position so it + /// continues to point at whichever element followed the previous head. Returns the number + /// of elements removed. + /// + public int RemoveAll(Predicate match) + { + if (match is null) return 0; + // Walk back-to-front so a removal never shifts an unvisited index. Adjust _pos as we go + // by mirroring SourcesList.Remove's "if removed index is before head, head moves left". + var removed = 0; + for (var i = _list.Count - 1; i >= 0; i--) + { + if (!match(_list[i])) continue; + _list.RemoveAt(i); + removed++; + if (i < _pos) + { + _pos -= 1; + } + } + if (_list.Count == 0) + { + _pos = 0; + } + else if (_circular && _pos > _list.Count - 1) + { + _pos = 0; + } + return removed; + } + /// /// Reset the head position to the start of the list. /// diff --git a/pkgs/sdk/server/src/Internal/DataSystem/FDv2DataSystem.cs b/pkgs/sdk/server/src/Internal/DataSystem/FDv2DataSystem.cs index ab1114f7..3d4deaf2 100644 --- a/pkgs/sdk/server/src/Internal/DataSystem/FDv2DataSystem.cs +++ b/pkgs/sdk/server/src/Internal/DataSystem/FDv2DataSystem.cs @@ -79,14 +79,20 @@ public static FDv2DataSystem Create(Logger logger, Configuration configuration, var contextWithSelectorSource = clientContext.WithSelectorSource(new SelectorSourceFacade(writeThroughStore)); + // FDv1 fallback synchronizer is optional; only build a list entry when one is + // configured. An always-present list entry that captured a null configurer would + // throw NRE the moment the action applier advanced to the FDv1 fallback entry. + var fdv1FallbackFactories = dataSystemConfiguration.FDv1FallbackSynchronizer == null + ? new List() + : new List + { + FactoryWithContext(clientContext)(dataSystemConfiguration.FDv1FallbackSynchronizer) + }; var compositeDataSource = configuration.Offline ? Components.ExternalUpdatesOnly.Build(contextWithSelectorSource) : FDv2DataSource.CreateFDv2DataSource( dataSourceUpdates, dataSystemConfiguration.Initializers.Select(FactoryWithContext(contextWithSelectorSource)).ToList(), dataSystemConfiguration.Synchronizers.Select(FactoryWithContext(contextWithSelectorSource)).ToList(), - new List - { - FactoryWithContext(clientContext)(dataSystemConfiguration.FDv1FallbackSynchronizer) - }, + fdv1FallbackFactories, logger ); diff --git a/pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2ChangeSetTranslator.cs b/pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2ChangeSetTranslator.cs index cfd40ced..4e5da698 100644 --- a/pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2ChangeSetTranslator.cs +++ b/pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2ChangeSetTranslator.cs @@ -18,12 +18,14 @@ internal static class FDv2ChangeSetTranslator /// The FDv2 changeset to convert. /// Logger for diagnostic messages. /// The environment ID to include in the changeset. + /// Whether to mark the changeset as carrying the FDv1 fallback directive. /// A DataStoreTypes.ChangeSet containing the converted data. /// Thrown when the changeset type is unknown. public static DataStoreTypes.ChangeSet ToChangeSet( FDv2ChangeSet changeset, Logger log, - string environmentId = null) + string environmentId = null, + bool fdv1Fallback = false) { DataStoreTypes.ChangeSetType changeSetType; switch (changeset.Type) @@ -102,7 +104,8 @@ internal static class FDv2ChangeSetTranslator changeSetType, changeset.Selector, dataBuilder.ToImmutable(), - environmentId); + environmentId, + fdv1Fallback); } /// diff --git a/pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2DataSource.InitializationTracker.cs b/pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2DataSource.InitializationTracker.cs index 349ec0f7..7ca36ab6 100644 --- a/pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2DataSource.InitializationTracker.cs +++ b/pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2DataSource.InitializationTracker.cs @@ -29,6 +29,7 @@ private class InitializationTracker : IDisposable private bool _initializersRemain; private bool _synchronizersRemain; + private readonly bool _hasFdv1Fallback; private enum State { @@ -96,8 +97,10 @@ private enum Action FallingBack, } - public InitializationTracker(bool hasInitializers, bool hasSynchronizers) + public InitializationTracker(bool hasInitializers, bool hasSynchronizers, bool hasFdv1Fallback) { + _hasFdv1Fallback = hasFdv1Fallback; + if (!(hasInitializers || hasSynchronizers)) { // If we have no data sources, then we are immediately initialized. @@ -115,6 +118,16 @@ public InitializationTracker(bool hasInitializers, bool hasSynchronizers) } } + /// + /// Resolves the target state for a transition that would otherwise enter + /// . When no FDv1 fallback synchronizer is configured, + /// there is no entry left to drive the tracker out of + /// (the outer composite's exhaustion-driven Off goes directly to the external sink, + /// bypassing the tracker's observers), so we transition straight to + /// and complete the task with false. + /// + private State FallingBackOrFailed() => _hasFdv1Fallback ? State.FallingBack : State.Failed; + public Task Task => _taskCompletionSource.Task; private void HandleRemainingSources() @@ -143,7 +156,7 @@ private void DetermineState(Action action) _state = State.Data; break; case Action.FallingBack: - _state = State.FallingBack; + _state = FallingBackOrFailed(); break; case Action.InitializersExhausted: _initializersRemain = false; @@ -196,7 +209,7 @@ private void DetermineState(Action action) break; case Action.FallingBack: - _state = State.FallingBack; + _state = FallingBackOrFailed(); break; case Action.DataReceived: case Action.SelectorReceived: @@ -248,6 +261,20 @@ public void Apply(DataStoreTypes.ChangeSet change public void UpdateStatus(DataSourceState newState, DataSourceStatus.ErrorInfo? newError, DataSourceCategory category) { + // The FDv1 fallback directive is the SDK's signal that the initializer or + // synchronizer phase is being terminated and the FDv1 fallback synchronizer is + // taking over. This can ride on Interrupted (recoverable error or success + + // header) or Off (unrecoverable error + header) -- transition to FallingBack + // regardless of which state we observe so the tracker can complete via the + // fallback path. + var fdv1FallbackSignaled = newError.HasValue && newError.Value.FDv1Fallback; + if (fdv1FallbackSignaled + && (category == DataSourceCategory.Initializers + || category == DataSourceCategory.Synchronizers)) + { + DetermineState(Action.FallingBack); + } + switch (category) { case DataSourceCategory.Initializers when newState == DataSourceState.Off: @@ -257,14 +284,6 @@ public void UpdateStatus(DataSourceState newState, DataSourceStatus.ErrorInfo? n } case DataSourceCategory.Synchronizers when newState == DataSourceState.Off: { - // Currently, FDv1 fallback happens in the synchronizers group. If something from that group - // reports Off, with a reason indicating that it should fallback to v1, then we can - // transition to that state. - if (newError.HasValue && newError.Value.FDv1Fallback) - { - DetermineState(Action.FallingBack); - } - DetermineState(Action.SynchronizersExhausted); break; diff --git a/pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2DataSource.cs b/pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2DataSource.cs index b88dc61e..94575c09 100644 --- a/pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2DataSource.cs +++ b/pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2DataSource.cs @@ -10,7 +10,7 @@ namespace LaunchDarkly.Sdk.Server.Internal.FDv2DataSources { - using FactoryList = List<(SourceFactory Factory, ActionApplierFactory ActionApplierFactory)>; + using FactoryList = List<(SourceFactory Factory, ActionApplierFactory ActionApplierFactory, CompositeEntryKind Kind)>; internal static partial class FDv2DataSource @@ -32,19 +32,26 @@ public static IDataSource CreateFDv2DataSource( Logger logger) { var sublogger = logger.SubLogger(LogNames.FDv2DataSourceSubLog); - - // Here we make a combined composite source, with the initializer source first which switches or falls back to the - // synchronizer source when the initializer succeeds or when the initializer source reports Off (all initializers failed) + + // The flow-control appliers (fast-fallback, blacklist, timed-fallback-and-recovery) + // each watch for the FDv1 directive on their inputs and bail when it is set, so the + // FDv1 fallback applier owns the transition uncontested. The directive arrives either + // on UpdateStatus (errorInfo.FDv1Fallback) or on Apply (changeSet.FDv1Fallback) when + // the response was a successful FDv2 payload that also carried the directive header. ActionApplierFactory blacklistWhenSuccessOrOff = (actionable) => new ActionApplierBlacklistWhenSuccessOrOff(actionable); ActionApplierFactory fastFallbackApplierFactory = (actionable) => new ActionApplierFastFallback(actionable); ActionApplierFactory timedFallbackAndRecoveryApplierFactory = (actionable) => new ActionApplierTimedFallbackAndRecovery(actionable, sublogger); - ActionApplierFactory fdv1FallbackApplierFactory = (actionable) => new FDv1FallbackActionApplier(actionable); + // The FDv1 fallback applier is phase-agnostic: when the directive is observed it + // calls BlockAll(FDv2) to remove every FDv2 entry from the outer list, then GoToNext + // lands on the FDv1 fallback entry (or on exhaustion when none was configured). + ActionApplierFactory fdv1FallbackApplierFactory = + (actionable) => new FDv1FallbackActionApplier(actionable); var initializationTracker = - new InitializationTracker(Any(initializers), Any(synchronizers)); + new InitializationTracker(Any(initializers), Any(synchronizers), Any(fdv1Synchronizers)); var initializationObserver = new InitializationObserver(initializationTracker, DataSourceCategory.Initializers); var synchronizationObserver = @@ -65,14 +72,18 @@ public static IDataSource CreateFDv2DataSource( for (int i = 0; i < initializers.Count; i++) { initializerFactory.Add((initializers[i], - fastFallbackApplierFactory)); + fastFallbackApplierFactory, + CompositeEntryKind.FDv2)); } // The common data source updates implements both IDataSourceUpdates and IDataSourceUpdatesV2. return new CompositeSource("Initializers", sink, initializerFactory, sublogger, circular: false); }, (actionable) => new CompositeObserver( - initializationObserver, blacklistWhenSuccessOrOff(actionable)) + initializationObserver, + fdv1FallbackApplierFactory(actionable), + blacklistWhenSuccessOrOff(actionable)), + CompositeEntryKind.FDv2 )); } @@ -88,21 +99,14 @@ public static IDataSource CreateFDv2DataSource( for (int i = 0; i < synchronizers.Count; i++) { synchronizersFactoryTuples.Add((synchronizers[i], - timedFallbackAndRecoveryApplierFactory)); + timedFallbackAndRecoveryApplierFactory, + CompositeEntryKind.FDv2)); } return new CompositeSource("Synchronizers", sink, synchronizersFactoryTuples, sublogger); }, - (actionable) => - { - // Only attach FDv1 fallback applier if FDv1 synchronizers are actually provided - if (fdv1Synchronizers != null && fdv1Synchronizers.Count > 0) - { - return new CompositeObserver(synchronizationObserver, fdv1FallbackApplierFactory(actionable)); - } - - return synchronizationObserver; - } + (actionable) => new CompositeObserver(synchronizationObserver, fdv1FallbackApplierFactory(actionable)), + CompositeEntryKind.FDv2 )); } @@ -117,12 +121,17 @@ public static IDataSource CreateFDv2DataSource( new FactoryList(); for (int i = 0; i < fdv1Synchronizers.Count; i++) { + // The Kind is irrelevant inside this inner sub-composite (its + // appliers never call BlockAll); FDv2 is the inert default. fdv1SynchronizersFactoryTuples.Add((fdv1Synchronizers[i], - timedFallbackAndRecoveryApplierFactory)); // fdv1 synchronizers behave same as synchronizers + timedFallbackAndRecoveryApplierFactory, + CompositeEntryKind.FDv2)); } return new CompositeSource("FDv1FallbackSynchronizers", sink, fdv1SynchronizersFactoryTuples, sublogger); - }, (applier) => fallbackSynchronizationObserver + }, + (applier) => fallbackSynchronizationObserver, + CompositeEntryKind.FDv1Fallback )); } @@ -144,6 +153,10 @@ public ActionApplierFastFallback(ICompositeSourceActionable actionable) public void UpdateStatus(DataSourceState newState, DataSourceStatus.ErrorInfo? newError) { + // When the FDv1 directive rides on the status, the FDv1 fallback applier owns the + // transition. Bail so we don't double-advance the inner initializers list. + if (newError?.FDv1Fallback == true) return; + // when an initializer has an issue, fall back if (newState == DataSourceState.Interrupted || newState == DataSourceState.Off || newError != null) { @@ -163,6 +176,9 @@ public void Apply(ChangeSet changeSet) return; } + // Empty selector with directive: still defer to the FDv1 fallback applier. + if (changeSet.FDv1Fallback) return; + _actionable.DisposeCurrent(); _actionable.GoToNext(); _actionable.StartCurrent(); @@ -201,6 +217,15 @@ internal ActionApplierTimedFallbackAndRecovery(ICompositeSourceActionable action public void UpdateStatus(DataSourceState newState, DataSourceStatus.ErrorInfo? newError) { + // When the FDv1 directive rides on the status, the FDv1 fallback applier owns the + // transition. Bail before scheduling timers or advancing within the inner sync + // list -- otherwise we'd briefly start the next sync before the outer composite + // disposes us. + if (newError?.FDv1Fallback == true) + { + return; + } + lock (_lock) { // If there's a pending fallback task and status is not Interrupted, cancel it @@ -364,7 +389,11 @@ public void Apply(ChangeSet changeSet) /// Action applier that blacklists the current datasource when init occurs or when Off status is seen, /// then disposes the current datasource, goes to the next datasource, and starts it. /// - private class ActionApplierBlacklistWhenSuccessOrOff : IDataSourceObserver + /// + /// When the FDv1 directive rides on the input (changeset or error info), this applier + /// bails so the FDv1 fallback applier can drive the transition uncontested. + /// + internal class ActionApplierBlacklistWhenSuccessOrOff : IDataSourceObserver { private readonly ICompositeSourceActionable _actionable; @@ -375,24 +404,26 @@ public ActionApplierBlacklistWhenSuccessOrOff(ICompositeSourceActionable actiona public void UpdateStatus(DataSourceState newState, DataSourceStatus.ErrorInfo? newError) { + if (newState != DataSourceState.Off) return; + if (newError?.FDv1Fallback == true) return; + // When Off status is seen, blacklist current, dispose current, go to next, and start current - if (newState == DataSourceState.Off) - { - _actionable.BlockCurrent(); - _actionable.DisposeCurrent(); - _actionable.GoToNext(); - _actionable.StartCurrent(); - } + _actionable.BlockCurrent(); + _actionable.DisposeCurrent(); + _actionable.GoToNext(); + _actionable.StartCurrent(); } public void Apply(ChangeSet changeSet) { // If this change has a selector, then we know we can move out of the current phase. // This doesn't look at the type of the changeset (Full, Partial, None), because having - // a selector means that we have some payload. + // a selector means that we have some payload. // From a forward development perspective this could be because we had a local stale selector which was // persisted in some way, and we are getting up to date via an initializer. if (changeSet.Selector.IsEmpty) return; + if (changeSet.FDv1Fallback) return; + _actionable.BlockCurrent(); _actionable.DisposeCurrent(); _actionable.GoToNext(); @@ -400,9 +431,28 @@ public void Apply(ChangeSet changeSet) } } - private class FDv1FallbackActionApplier : IDataSourceObserver + /// + /// Action applier that observes an FDv1 fallback signal and advances the outer composite + /// to the FDv1 fallback synchronizer entry. The directive may arrive either on + /// (errorInfo.FDv1Fallback) or on + /// (changeSet.FDv1Fallback) when a successful payload also carried the directive header. + /// + /// + /// Phase-agnostic: when attached to either the initializers entry or the synchronizers + /// entry of the outer FDv2 composite, + /// removes every FDv2 entry in one shot, leaving the FDv1 fallback entry (if configured) + /// as the next stop. When no FDv1 fallback entry was configured the outer list is exhausted + /// and the composite halts the data system. + /// + /// The single-shot _triggered flag makes this applier idempotent in case the directive + /// arrives more than once on the same propagation chain (for example, on a successful FDv2 + /// response the source emits Apply with the flag and then UpdateStatus(Off, FDv1Fallback) + /// during shutdown). + /// + internal class FDv1FallbackActionApplier : IDataSourceObserver { private readonly ICompositeSourceActionable _actionable; + private int _triggered; public FDv1FallbackActionApplier(ICompositeSourceActionable actionable) { @@ -411,18 +461,27 @@ public FDv1FallbackActionApplier(ICompositeSourceActionable actionable) public void UpdateStatus(DataSourceState newState, DataSourceStatus.ErrorInfo? newError) { - if (newError != null && newError.Value.FDv1Fallback) - { - _actionable.BlockCurrent(); // blacklist the synchronizers altogether - _actionable.DisposeCurrent(); // dispose the synchronizers - _actionable.GoToNext(); // go to the FDv1 fallback synchronizer - _actionable.StartCurrent(); // start the FDv1 fallback synchronizer - } + if (newError?.FDv1Fallback != true) return; + Trigger(); } public void Apply(ChangeSet changeSet) { - // this FDv1 fallback action applier doesn't care about apply, it only looks for the FDv1Fallback flag in the errors + if (!changeSet.FDv1Fallback) return; + Trigger(); + } + + private void Trigger() + { + if (Interlocked.CompareExchange(ref _triggered, 1, 0) != 0) return; + + _actionable.BlockAll(kind => kind == CompositeEntryKind.FDv2); + // DisposeCurrent is defensive: a well-behaved source will have already shut + // itself down by the time the directive is observed, but DisposeCurrent is + // idempotent and guards against sources that don't. + _actionable.DisposeCurrent(); + _actionable.GoToNext(); + _actionable.StartCurrent(); } } diff --git a/pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2PollingDataSource.cs b/pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2PollingDataSource.cs index b8f92834..2a45241a 100644 --- a/pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2PollingDataSource.cs +++ b/pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2PollingDataSource.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Generic; using System.Linq; using System.Text.Json; using System.Threading; @@ -99,13 +100,35 @@ private async Task UpdateTaskAsync() ?.FirstOrDefault(); } - ProcessPollingResponse(response.Value); + var fdv1Fallback = HasFDv1FallbackHeader(response.Value.Headers); + ProcessPollingResponse(response.Value, fdv1Fallback); + + // On a successful response carrying the directive, the changeset above already + // rode the FDv1Fallback flag through to the appliers, which trigger the fallback + // transition. Tear down the polling loop here so we stop hitting the server. + if (fdv1Fallback) + { + _log.Info("LaunchDarkly polling response indicates fallback to FDv1"); + var fallbackError = new DataSourceStatus.ErrorInfo + { + Kind = DataSourceStatus.ErrorKind.Unknown, + Time = DateTime.Now, + FDv1Fallback = true + }; + // Resolve _initTask so the polling source's Start() task doesn't leak when + // ProcessChangeSet failed (e.g. transient data store error): Shutdown below + // permanently cancels the poll loop, so the task would otherwise never + // complete. TrySet is a no-op if ProcessChangeSet already set it to true. + _initTask.TrySetResult(false); + Shutdown(fallbackError); + return; + } } catch (UnsuccessfulResponseException ex) { var recoverable = HttpErrors.IsRecoverable(ex.StatusCode); var errorInfo = DataSourceStatus.ErrorInfo.FromHttpError(ex.StatusCode, recoverable); - + // Check for LD fallback header if (ex.Headers != null) { @@ -157,7 +180,7 @@ private async Task UpdateTaskAsync() } } - private void ProcessPollingResponse(FDv2PollingResponse response) + private void ProcessPollingResponse(FDv2PollingResponse response, bool fdv1Fallback) { lock (_protocolLock) { @@ -166,7 +189,7 @@ private void ProcessPollingResponse(FDv2PollingResponse response) foreach (var evt in response.Events) { var action = _protocolHandler.HandleEvent(evt); - ProcessProtocolAction(action); + ProcessProtocolAction(action, fdv1Fallback); } } } @@ -185,12 +208,12 @@ private void HandleJsonError(string message) _dataSourceUpdates.UpdateStatus(DataSourceState.Interrupted, errorInfo); } - private void ProcessProtocolAction(IFDv2ProtocolAction action) + private void ProcessProtocolAction(IFDv2ProtocolAction action, bool fdv1Fallback) { switch (action) { case FDv2ActionChangeset changesetAction: - ProcessChangeSet(changesetAction.Changeset); + ProcessChangeSet(changesetAction.Changeset, fdv1Fallback); break; case FDv2ActionError errorAction: _log.Error("FDv2 error event: {0} - {1}", errorAction.Id, errorAction.Reason); @@ -218,12 +241,12 @@ private void ProcessProtocolAction(IFDv2ProtocolAction action) } } - private void ProcessChangeSet(FDv2ChangeSet fdv2ChangeSet) + private void ProcessChangeSet(FDv2ChangeSet fdv2ChangeSet, bool fdv1Fallback) { if (!(_dataSourceUpdates is ITransactionalDataSourceUpdates transactionalDataSourceUpdates)) throw new InvalidOperationException("Cannot apply updates to non-transactional data source"); - var dataStoreChangeSet = FDv2ChangeSetTranslator.ToChangeSet(fdv2ChangeSet, _log, _environmentId); + var dataStoreChangeSet = FDv2ChangeSetTranslator.ToChangeSet(fdv2ChangeSet, _log, _environmentId, fdv1Fallback); // If the update fails, then we wait until the next poll and try again. // This is different from a streaming data source, which will need to re-start to get an initial @@ -266,5 +289,15 @@ private void Shutdown(DataSourceStatus.ErrorInfo? errorInfo) _canceler?.Cancel(); _dataSourceUpdates.UpdateStatus(DataSourceState.Off, errorInfo); } + + internal static bool HasFDv1FallbackHeader( + IEnumerable>> headers) + { + if (headers == null) return false; + return headers + .Where(h => string.Equals(h.Key, "x-ld-fd-fallback", StringComparison.OrdinalIgnoreCase)) + .SelectMany(h => h.Value) + .Any(v => string.Equals(v, "true", StringComparison.OrdinalIgnoreCase)); + } } } diff --git a/pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2StreamingDataSource.cs b/pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2StreamingDataSource.cs index d791055b..4615e145 100644 --- a/pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2StreamingDataSource.cs +++ b/pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2StreamingDataSource.cs @@ -46,6 +46,11 @@ internal delegate IEventSource EventSourceCreator(Uri streamUri, private volatile string _environmentId; + // _fdv1FallbackRequested is set to true when the streaming response carries the + // x-ld-fd-fallback: true header. Once any payload is successfully applied while this is set, + // the SDK shuts the stream down and signals the FDv1 fallback action applier. + private volatile bool _fdv1FallbackRequested; + /// /// When the store enters a failed state, and we don't have "data source monitoring", we want to log /// a message that we are restarting the event source. We don't want to log this message on multiple @@ -218,13 +223,31 @@ private void OnMessage(object sender, MessageReceivedEventArgs e) case FDv2ActionChangeset changeAction: { var changeset = changeAction.Changeset; + var fdv1Fallback = _fdv1FallbackRequested; var storeError = !_transactionalDataSourceUpdates.Apply( - FDv2ChangeSetTranslator.ToChangeSet(changeAction.Changeset, _log, _environmentId)); + FDv2ChangeSetTranslator.ToChangeSet(changeAction.Changeset, _log, _environmentId, fdv1Fallback)); if (!storeError) { _lastStoreUpdateFailed.GetAndSet(false); MaybeMarkInitialized(); + + // On a successful response carrying the FDv1 directive, the changeset + // above already rode the FDv1Fallback flag through to the appliers, which + // trigger the fallback transition. Tear down the stream here so we stop + // reconnecting. + if (fdv1Fallback) + { + _log.Info("LaunchDarkly streaming response indicates fallback to FDv1"); + var fallbackError = new DataSourceStatus.ErrorInfo + { + Kind = DataSourceStatus.ErrorKind.Unknown, + Time = DateTime.Now, + FDv1Fallback = true + }; + Shutdown(fallbackError); + return; + } } else { @@ -341,6 +364,12 @@ private void OnOpen(object sender, StateChangedEventArgs e) _environmentId = e.Headers?.FirstOrDefault((item) => item.Key.ToLower() == HeaderConstants.EnvironmentId).Value ?.FirstOrDefault(); + + // Capture the FDv1 fallback header from the connection-open response. The SDK applies + // any payload that arrives on this stream and then shuts the stream down so the action + // applier can switch to the FDv1 fallback synchronizer. + _fdv1FallbackRequested = FDv2PollingDataSource.HasFDv1FallbackHeader(e.Headers); + _log.Debug("EventSource Opened"); RecordStreamInit(false); } diff --git a/pkgs/sdk/server/src/Subsystems/DataStoreTypes.cs b/pkgs/sdk/server/src/Subsystems/DataStoreTypes.cs index 3b943c87..70450eac 100644 --- a/pkgs/sdk/server/src/Subsystems/DataStoreTypes.cs +++ b/pkgs/sdk/server/src/Subsystems/DataStoreTypes.cs @@ -436,13 +436,23 @@ public struct ChangeSet /// public IEnumerable>> Data { get; } + /// + /// Whether the response that produced this changeset also carried the FDv1 fallback + /// directive. When true, the data system applies the payload but should hand control + /// to the FDv1 fallback synchronizer instead of continuing the FDv2 protocol. Defaults + /// to false. + /// + public bool FDv1Fallback { get; } + internal ChangeSet(ChangeSetType type, Selector selector, - IEnumerable>> data, string environmentId) + IEnumerable>> data, string environmentId, + bool fdv1Fallback = false) { Type = type; Selector = selector; Data = data; EnvironmentId = environmentId; + FDv1Fallback = fdv1Fallback; } } } diff --git a/pkgs/sdk/server/test/Internal/DataSources/CompositeDataSource/CompositeSourceTest.cs b/pkgs/sdk/server/test/Internal/DataSources/CompositeDataSource/CompositeSourceTest.cs index 9545d6c7..1ecebf6c 100644 --- a/pkgs/sdk/server/test/Internal/DataSources/CompositeDataSource/CompositeSourceTest.cs +++ b/pkgs/sdk/server/test/Internal/DataSources/CompositeDataSource/CompositeSourceTest.cs @@ -65,11 +65,11 @@ public async Task CanFallbackOnInterrupted() }; // Create CompositeSource with three factory tuples - var factoryTuples = new List<(SourceFactory Factory, ActionApplierFactory ActionApplierFactory)> + var factoryTuples = new List<(SourceFactory Factory, ActionApplierFactory ActionApplierFactory, CompositeEntryKind Kind)> { - (firstSourceFactory, actionApplierFactory), - (secondSourceFactory, actionApplierFactory), - (thirdSourceFactory, actionApplierFactory) + (firstSourceFactory, actionApplierFactory, CompositeEntryKind.FDv2), + (secondSourceFactory, actionApplierFactory, CompositeEntryKind.FDv2), + (thirdSourceFactory, actionApplierFactory, CompositeEntryKind.FDv2) }; var compositeSource = new CompositeSource("test-composite", capturingSink, factoryTuples, TestLogger); @@ -147,10 +147,10 @@ public async Task BlacklistsDataSourceFactoryAfterOffState() }; // Create CompositeSource with two factory tuples - var factoryTuples = new List<(SourceFactory Factory, ActionApplierFactory ActionApplierFactory)> + var factoryTuples = new List<(SourceFactory Factory, ActionApplierFactory ActionApplierFactory, CompositeEntryKind Kind)> { - (firstSourceFactory, firstActionApplierFactory), - (secondSourceFactory, secondActionApplierFactory) + (firstSourceFactory, firstActionApplierFactory, CompositeEntryKind.FDv2), + (secondSourceFactory, secondActionApplierFactory, CompositeEntryKind.FDv2) }; var compositeSource = new CompositeSource("test-composite", capturingSink, factoryTuples, TestLogger, circular: true); @@ -261,10 +261,10 @@ public async Task DisabledDataSourceCannotTriggerActions() }; // Create CompositeSource with two factory tuples - var factoryTuples = new List<(SourceFactory Factory, ActionApplierFactory ActionApplierFactory)> + var factoryTuples = new List<(SourceFactory Factory, ActionApplierFactory ActionApplierFactory, CompositeEntryKind Kind)> { - (firstSourceFactory, firstActionApplierFactory), - (secondSourceFactory, secondActionApplierFactory) + (firstSourceFactory, firstActionApplierFactory, CompositeEntryKind.FDv2), + (secondSourceFactory, secondActionApplierFactory, CompositeEntryKind.FDv2) }; var compositeSource = new CompositeSource("test-composite", capturingSink, factoryTuples, TestLogger, circular: true); @@ -339,9 +339,9 @@ public async Task DisposeReportsOffState() ActionApplierFactory actionApplierFactory = (actionable) => { return new MockActionApplier(actionable); }; // Create CompositeSource with one factory tuple - var factoryTuples = new List<(SourceFactory Factory, ActionApplierFactory ActionApplierFactory)> + var factoryTuples = new List<(SourceFactory Factory, ActionApplierFactory ActionApplierFactory, CompositeEntryKind Kind)> { - (sourceFactory, actionApplierFactory) + (sourceFactory, actionApplierFactory, CompositeEntryKind.FDv2) }; var compositeSource = new CompositeSource("test-composite", capturingSink, factoryTuples, TestLogger); @@ -416,11 +416,11 @@ public async Task AllThreeSourcesFailReportsOffWithExhaustedMessage() }; // Create CompositeSource with three factory tuples - var factoryTuples = new List<(SourceFactory Factory, ActionApplierFactory ActionApplierFactory)> + var factoryTuples = new List<(SourceFactory Factory, ActionApplierFactory ActionApplierFactory, CompositeEntryKind Kind)> { - (firstSourceFactory, actionApplierFactory), - (secondSourceFactory, actionApplierFactory), - (thirdSourceFactory, actionApplierFactory) + (firstSourceFactory, actionApplierFactory, CompositeEntryKind.FDv2), + (secondSourceFactory, actionApplierFactory, CompositeEntryKind.FDv2), + (thirdSourceFactory, actionApplierFactory, CompositeEntryKind.FDv2) }; var compositeSource = new CompositeSource("test-composite", capturingSink, factoryTuples, TestLogger); @@ -471,7 +471,7 @@ public async Task NoSourcesProvidedReportsOffWithExhaustedMessage() var capturingSink = new CapturingDataSourceUpdatesWithHeaders(); // Create CompositeSource with empty factory tuples list - var factoryTuples = new List<(SourceFactory Factory, ActionApplierFactory ActionApplierFactory)>(); + var factoryTuples = new List<(SourceFactory Factory, ActionApplierFactory ActionApplierFactory, CompositeEntryKind Kind)>(); var compositeSource = new CompositeSource("test-composite", capturingSink, factoryTuples, TestLogger); diff --git a/pkgs/sdk/server/test/Internal/DataSources/CompositeDataSource/SourcesListTest.cs b/pkgs/sdk/server/test/Internal/DataSources/CompositeDataSource/SourcesListTest.cs index a17462b7..ce2ea494 100644 --- a/pkgs/sdk/server/test/Internal/DataSources/CompositeDataSource/SourcesListTest.cs +++ b/pkgs/sdk/server/test/Internal/DataSources/CompositeDataSource/SourcesListTest.cs @@ -184,6 +184,121 @@ public void NonCircularListReturnsNullAfterConsumingAllElements() Assert.Null(underTest.Next()); Assert.Null(underTest.Next()); } + + [Fact] + public void RemoveAllNullPredicateRemovesNothing() + { + var underTest = new SourcesList(false, new[] { "1", "2", "3" }); + Assert.Equal(0, underTest.RemoveAll(null)); + Assert.Equal(3, underTest.Length); + Assert.Equal("1", underTest.Next()); + } + + [Fact] + public void RemoveAllOnEmptyListIsNoOp() + { + var underTest = new SourcesList(false); + Assert.Equal(0, underTest.RemoveAll(_ => true)); + Assert.Equal(0, underTest.Length); + Assert.Null(underTest.Next()); + } + + [Fact] + public void RemoveAllPredicateMatchesNoneLeavesListIntact() + { + var underTest = new SourcesList(false, new[] { "1", "2", "3" }); + Assert.Equal("1", underTest.Next()); // head -> 1 (pos = 1 after Next) + + Assert.Equal(0, underTest.RemoveAll(s => s == "missing")); + Assert.Equal(3, underTest.Length); + + // Head is unchanged: next yields the previous next ("2"). + Assert.Equal("2", underTest.Next()); + Assert.Equal("3", underTest.Next()); + } + + [Fact] + public void RemoveAllPredicateMatchesEverythingClearsListAndResetsHead() + { + var underTest = new SourcesList(true, new[] { "1", "2", "3" }); + Assert.Equal("1", underTest.Next()); + Assert.Equal("2", underTest.Next()); + + Assert.Equal(3, underTest.RemoveAll(_ => true)); + Assert.Equal(0, underTest.Length); + Assert.Equal(0, underTest.Pos); + Assert.Null(underTest.Next()); + Assert.Null(underTest.Next()); + } + + [Fact] + public void RemoveAllPredicateMatchesOnlyEntriesBeforeHeadAdjustsPos() + { + // After consuming "1" and "2", head is at index 2 ("3"). Remove "1" and "2": list + // becomes ["3"] and head must point at "3" (pos = 0) so the next Next() returns "3". + var underTest = new SourcesList(false, new[] { "1", "2", "3" }); + Assert.Equal("1", underTest.Next()); + Assert.Equal("2", underTest.Next()); + + Assert.Equal(2, underTest.RemoveAll(s => s == "1" || s == "2")); + Assert.Equal(1, underTest.Length); + Assert.Equal(0, underTest.Pos); + Assert.Equal("3", underTest.Next()); + } + + [Fact] + public void RemoveAllPredicateMatchesOnlyEntriesAfterHeadLeavesPosUnchanged() + { + // Head at index 1 ("2"). Removing entries that all live at indices > pos must not + // shift pos -- the next Next() should still return "2". + var underTest = new SourcesList(false, new[] { "1", "2", "3", "4" }); + Assert.Equal("1", underTest.Next()); // pos = 1 + + Assert.Equal(2, underTest.RemoveAll(s => s == "3" || s == "4")); + Assert.Equal(2, underTest.Length); + Assert.Equal(1, underTest.Pos); + Assert.Equal("2", underTest.Next()); + } + + [Fact] + public void RemoveAllPredicateMatchesHeadButNotPosIndex() + { + // Head at index 1 ("2"). Removing "2" itself: pos was 1; removed index 1 is NOT + // before pos, so pos stays at 1, but the list shrinks so pos lands on the element + // that used to be at index 2 ("3"). Subsequent Next() returns "3". + var underTest = new SourcesList(false, new[] { "1", "2", "3" }); + Assert.Equal("1", underTest.Next()); // pos = 1, head -> "2" + + Assert.Equal(1, underTest.RemoveAll(s => s == "2")); + Assert.Equal(2, underTest.Length); + Assert.Equal("3", underTest.Next()); + } + + [Fact] + public void RemoveAllCircularResetsPosWhenListShrinksBelowOldPos() + { + // Circular list, all positions consumed; pos has wrapped. After RemoveAll leaves a + // single element, pos must be reset to 0 so Next() returns it. + var underTest = new SourcesList(true, new[] { "1", "2", "3", "4" }); + Assert.Equal("1", underTest.Next()); + Assert.Equal("2", underTest.Next()); + Assert.Equal("3", underTest.Next()); + Assert.Equal("4", underTest.Next()); // circular: pos wraps back to 0 + + Assert.Equal(3, underTest.RemoveAll(s => s != "2")); + Assert.Equal(1, underTest.Length); + Assert.Equal("2", underTest.Next()); + } + + [Fact] + public void RemoveAllReturnsCountOfMatchedEntries() + { + var underTest = new SourcesList(false, new[] { "a", "b", "a", "c", "a" }); + Assert.Equal(3, underTest.RemoveAll(s => s == "a")); + Assert.Equal(2, underTest.Length); + Assert.Equal("b", underTest.Next()); + Assert.Equal("c", underTest.Next()); + } } } diff --git a/pkgs/sdk/server/test/Internal/FDv2DataSources/FDv2DataSourceTest.cs b/pkgs/sdk/server/test/Internal/FDv2DataSources/FDv2DataSourceTest.cs index 16fdfe47..1f73862b 100644 --- a/pkgs/sdk/server/test/Internal/FDv2DataSources/FDv2DataSourceTest.cs +++ b/pkgs/sdk/server/test/Internal/FDv2DataSources/FDv2DataSourceTest.cs @@ -1418,32 +1418,45 @@ private class MockCompositeSourceActionable : ICompositeSourceActionable public bool GoToFirstCalled { get; private set; } public bool StartCurrentCalled { get; private set; } public bool BlockCurrentCalled { get; private set; } + public bool BlockAllCalled { get; private set; } + public int GoToNextCallCount { get; private set; } + public int BlockCurrentCallCount { get; private set; } + public int BlockAllCallCount { get; private set; } + public Predicate LastBlockAllPredicate { get; private set; } + public List CallSequence { get; } = new List(); private bool _isAtFirst = false; public void DisposeCurrent() { DisposeCurrentCalled = true; + CallSequence.Add(nameof(DisposeCurrent)); } public void GoToNext() { GoToNextCalled = true; + GoToNextCallCount++; + CallSequence.Add(nameof(GoToNext)); } public void GoToFirst() { GoToFirstCalled = true; + CallSequence.Add(nameof(GoToFirst)); } public Task StartCurrent() { StartCurrentCalled = true; + CallSequence.Add(nameof(StartCurrent)); return Task.FromResult(true); } public void BlockCurrent() { BlockCurrentCalled = true; + BlockCurrentCallCount++; + CallSequence.Add(nameof(BlockCurrent)); } public bool IsAtFirst() @@ -1456,6 +1469,14 @@ public void SetIsAtFirst(bool value) _isAtFirst = value; } + public void BlockAll(Predicate kindMatches) + { + BlockAllCalled = true; + BlockAllCallCount++; + LastBlockAllPredicate = kindMatches; + CallSequence.Add(nameof(BlockAll)); + } + public void Reset() { DisposeCurrentCalled = false; @@ -1463,9 +1484,748 @@ public void Reset() GoToFirstCalled = false; StartCurrentCalled = false; BlockCurrentCalled = false; + BlockAllCalled = false; + GoToNextCallCount = 0; + BlockCurrentCallCount = 0; + BlockAllCallCount = 0; + LastBlockAllPredicate = null; + CallSequence.Clear(); _isAtFirst = false; } } + + [Fact] + public void FDv1FallbackActionApplierIgnoresStatusWithoutFDv1Fallback() + { + var mockActionable = new MockCompositeSourceActionable(); + var applier = new FDv2DataSource.FDv1FallbackActionApplier(mockActionable); + + // Status updates without an FDv1Fallback error must not advance the composite. + applier.UpdateStatus(DataSourceState.Off, null); + applier.UpdateStatus(DataSourceState.Interrupted, + new DataSourceStatus.ErrorInfo { Kind = DataSourceStatus.ErrorKind.NetworkError }); + applier.UpdateStatus(DataSourceState.Valid, + new DataSourceStatus.ErrorInfo { FDv1Fallback = false }); + + Assert.False(mockActionable.BlockCurrentCalled); + Assert.False(mockActionable.GoToNextCalled); + Assert.False(mockActionable.StartCurrentCalled); + } + + [Fact] + public void FDv1FallbackActionApplierUsesBlockAllAndAdvances() + { + // The applier blocks every FDv2 entry from the outer list (so the FDv1 fallback + // entry is the next stop), then advances to it. No skip-counting -- the entry list + // mutation handles both initializer-phase and synchronizer-phase wirings uniformly. + var mockActionable = new MockCompositeSourceActionable(); + var applier = new FDv2DataSource.FDv1FallbackActionApplier(mockActionable); + + applier.UpdateStatus( + DataSourceState.Off, + new DataSourceStatus.ErrorInfo { FDv1Fallback = true }); + + Assert.Equal( + new List { "BlockAll", "DisposeCurrent", "GoToNext", "StartCurrent" }, + mockActionable.CallSequence); + Assert.NotNull(mockActionable.LastBlockAllPredicate); + Assert.True(mockActionable.LastBlockAllPredicate(CompositeEntryKind.FDv2)); + Assert.False(mockActionable.LastBlockAllPredicate(CompositeEntryKind.FDv1Fallback)); + } + + [Fact] + public void FDv1FallbackActionApplierTriggersOnApplyCarryingDirective() + { + // Successful FDv2 responses that also carry the directive ride the flag on the + // ChangeSet rather than UpdateStatus. The applier must trigger from either path. + var mockActionable = new MockCompositeSourceActionable(); + var applier = new FDv2DataSource.FDv1FallbackActionApplier(mockActionable); + + applier.Apply(new ChangeSet( + ChangeSetType.Full, + Selector.Make(1, "init-state"), + new Dictionary>(), + null, + fdv1Fallback: true)); + + Assert.Equal( + new List { "BlockAll", "DisposeCurrent", "GoToNext", "StartCurrent" }, + mockActionable.CallSequence); + } + + [Fact] + public void FDv1FallbackActionApplierIsIdempotent() + { + // On a successful response the source emits Apply(directive) and then + // UpdateStatus(Off, FDv1Fallback) during shutdown. The applier must trigger only + // once; the second event is a no-op so we don't double-advance off the FDv1 entry. + var mockActionable = new MockCompositeSourceActionable(); + var applier = new FDv2DataSource.FDv1FallbackActionApplier(mockActionable); + + applier.Apply(new ChangeSet( + ChangeSetType.Full, + Selector.Make(1, "init-state"), + new Dictionary>(), + null, + fdv1Fallback: true)); + applier.UpdateStatus( + DataSourceState.Off, + new DataSourceStatus.ErrorInfo { FDv1Fallback = true }); + + Assert.Equal(1, mockActionable.BlockAllCallCount); + Assert.Equal(1, mockActionable.GoToNextCallCount); + } + + // End-to-end test: synchronizer reports Off+FDv1Fallback (mirrors the FDv2 streaming + // source's Shutdown path on a 403 + x-ld-fd-fallback header). The SDK must engage the + // FDv1 fallback synchronizer and reach Initialized via that path. The harness suite + // "directive on streaming error engages FDv1 fallback" exercises this. + [Fact] + public async Task SynchronizerOffWithFDv1FallbackErrorEngagesFDv1FallbackSynchronizer() + { + var capturingSink = new CapturingDataSourceUpdatesWithHeaders(); + var fdv1Data = new FullDataSet( + new Dictionary>()); + + // Synchronizer reports Off (unrecoverable HTTP error) with the FDv1 fallback flag set + // -- the same error the FDv2 streaming source emits for a 403+directive response. + var synchronizerStartCount = 0; + SourceFactory synchronizerFactory = (updatesSink) => + { + synchronizerStartCount++; + return new MockDataSourceWithInit(async () => + { + updatesSink.UpdateStatus( + DataSourceState.Off, + new DataSourceStatus.ErrorInfo + { + Kind = DataSourceStatus.ErrorKind.ErrorResponse, + StatusCode = 403, + FDv1Fallback = true, + Recoverable = false, + Time = DateTime.Now + }); + await Task.Yield(); + }); + }; + + // FDv1 fallback synchronizer applies a payload and reaches Valid -- the SDK must + // reach Initialized via this path (not via the FDv2 sync, which failed). + var fdv1Started = false; + SourceFactory fdv1Factory = (updatesSink) => + new MockDataSourceWithInit(async () => + { + fdv1Started = true; + updatesSink.Apply(new ChangeSet( + ChangeSetType.Full, Selector.Empty, fdv1Data.Data, null)); + updatesSink.UpdateStatus(DataSourceState.Valid, null); + await Task.Yield(); + }); + + var dataSource = FDv2DataSource.CreateFDv2DataSource( + capturingSink, + new List(), + new List { synchronizerFactory }, + new List { fdv1Factory }, + TestLogger); + + try + { + var startResult = await dataSource.Start(); + Assert.True(startResult, "Start should complete via the FDv1 fallback path"); + Assert.True(fdv1Started, "FDv1 fallback synchronizer should have been started"); + + // The Apply from FDv1 reached the data store -- this is the load-bearing + // observation that proves the FDv1 fallback engaged. + var changeSet = capturingSink.Applies.ExpectValue(TimeSpan.FromSeconds(1)); + Assert.Equal(ChangeSetType.Full, changeSet.Type); + } + finally + { + dataSource.Dispose(); + } + } + + // End-to-end test: synchronizer applies a payload first, then reports Off+FDv1Fallback + // (mirrors the streaming success path with x-ld-fd-fallback header). The SDK must apply + // the initial payload AND then engage FDv1 fallback. Harness suite "directive on + // streaming success applies payload then engages FDv1" exercises this. + [Fact] + public async Task SynchronizerSuccessThenFDv1FallbackEngagesFDv1FallbackSynchronizer() + { + var capturingSink = new CapturingDataSourceUpdatesWithHeaders(); + var syncData = new FullDataSet( + new Dictionary>()); + var fdv1Data = new FullDataSet( + new Dictionary>()); + + SourceFactory synchronizerFactory = (updatesSink) => + new MockDataSourceWithInit(async () => + { + // Apply a payload (the SDK's first observable side effect). + updatesSink.Apply(new ChangeSet( + ChangeSetType.Full, + Selector.Make(1, "synchronizer-state"), + syncData.Data, + null)); + // Then signal Off+FDv1Fallback. The action applier must engage FDv1 even + // though the data system is already Initialized via the payload above. + updatesSink.UpdateStatus( + DataSourceState.Off, + new DataSourceStatus.ErrorInfo + { + Kind = DataSourceStatus.ErrorKind.Unknown, + FDv1Fallback = true, + Time = DateTime.Now + }); + await Task.Yield(); + }); + + var fdv1Started = false; + SourceFactory fdv1Factory = (updatesSink) => + new MockDataSourceWithInit(async () => + { + fdv1Started = true; + updatesSink.Apply(new ChangeSet( + ChangeSetType.Full, + Selector.Make(2, "fdv1-state"), + fdv1Data.Data, + null)); + await Task.Yield(); + }); + + var dataSource = FDv2DataSource.CreateFDv2DataSource( + capturingSink, + new List(), + new List { synchronizerFactory }, + new List { fdv1Factory }, + TestLogger); + + try + { + var startResult = await dataSource.Start(); + Assert.True(startResult); + + // Two Applies: first from the synchronizer's initial payload, then from FDv1. + var firstChangeSet = capturingSink.Applies.ExpectValue(TimeSpan.FromSeconds(1)); + Assert.Equal("synchronizer-state", firstChangeSet.Selector.State); + var secondChangeSet = capturingSink.Applies.ExpectValue(TimeSpan.FromSeconds(2)); + Assert.Equal("fdv1-state", secondChangeSet.Selector.State); + + Assert.True(fdv1Started, "FDv1 fallback synchronizer should have been started"); + } + finally + { + dataSource.Dispose(); + } + } + + // End-to-end test: an initializer reports Off+FDv1Fallback. The SDK must skip the FDv2 + // synchronizer chain entirely and engage the FDv1 fallback synchronizer. Harness suite + // "directive on polling initializer skips FDv2 synchronizers" exercises this. + [Fact] + public async Task InitializerOffWithFDv1FallbackErrorSkipsSynchronizersAndEngagesFDv1() + { + var capturingSink = new CapturingDataSourceUpdatesWithHeaders(); + var fdv1Data = new FullDataSet( + new Dictionary>()); + + SourceFactory initializerFactory = (updatesSink) => + new MockDataSourceWithInit(async () => + { + updatesSink.UpdateStatus( + DataSourceState.Off, + new DataSourceStatus.ErrorInfo + { + Kind = DataSourceStatus.ErrorKind.ErrorResponse, + StatusCode = 403, + FDv1Fallback = true, + Recoverable = false, + Time = DateTime.Now + }); + await Task.Yield(); + }); + + // Synchronizer must NOT have its underlying data source's Start called. The composite + // may call the factory while walking past the entry, but the action applier must + // not start it. + var synchronizerStarted = false; + SourceFactory synchronizerFactory = (updatesSink) => + new MockDataSourceWithInit(async () => + { + synchronizerStarted = true; + updatesSink.UpdateStatus(DataSourceState.Valid, null); + await Task.Yield(); + }); + + var fdv1Started = false; + SourceFactory fdv1Factory = (updatesSink) => + new MockDataSourceWithInit(async () => + { + fdv1Started = true; + updatesSink.Apply(new ChangeSet( + ChangeSetType.Full, Selector.Empty, fdv1Data.Data, null)); + updatesSink.UpdateStatus(DataSourceState.Valid, null); + await Task.Yield(); + }); + + var dataSource = FDv2DataSource.CreateFDv2DataSource( + capturingSink, + new List { initializerFactory }, + new List { synchronizerFactory }, + new List { fdv1Factory }, + TestLogger); + + try + { + var startResult = await dataSource.Start(); + Assert.True(startResult, "Start should complete via the FDv1 fallback path"); + + var changeSet = capturingSink.Applies.ExpectValue(TimeSpan.FromSeconds(2)); + Assert.Equal(ChangeSetType.Full, changeSet.Type); + + Assert.True(fdv1Started, "FDv1 fallback synchronizer should have been started"); + Assert.False(synchronizerStarted, + "FDv2 synchronizer must not be started when the initializer signals FDv1 fallback"); + } + finally + { + dataSource.Dispose(); + } + } + + // End-to-end test: synchronizer reports the FDv1 fallback directive but no FDv1 + // fallback is configured. The SDK must HALT (Requirement 1.6.3(4)) -- which in + // practice means the synchronizer's underlying data source must be disposed so it + // stops trying to reconnect. The data system reaches Off via outer composite + // exhaustion. Mirrors the harness suite "directive without FDv1 fallback configured + // halts the data system". + [Fact] + public void SynchronizerFDv1FallbackWithoutFallbackConfiguredHaltsDataSystem() + { + var capturingSink = new CapturingDataSourceUpdatesWithHeaders(); + + var synchronizerDisposed = false; + SourceFactory synchronizerFactory = (updatesSink) => + new HaltMockDataSource( + onStart: async () => + { + // 500 + directive: a recoverable status that would normally drive retries. + // The directive must take precedence and halt those retries. + updatesSink.UpdateStatus( + DataSourceState.Interrupted, + new DataSourceStatus.ErrorInfo + { + Kind = DataSourceStatus.ErrorKind.ErrorResponse, + StatusCode = 500, + FDv1Fallback = true, + Recoverable = true, + Time = DateTime.Now + }); + await Task.Yield(); + }, + onDispose: () => synchronizerDisposed = true); + + // No FDv1 fallback configured (empty list). + var dataSource = FDv2DataSource.CreateFDv2DataSource( + capturingSink, + new List(), + new List { synchronizerFactory }, + new List(), + TestLogger); + + try + { + var startTask = dataSource.Start(); + + // The data system must transition to Off via outer composite exhaustion. Wait a + // few status updates -- intermediate Interrupted statuses come first. + var statuses = capturingSink.WaitForStatusUpdates(2, TimeSpan.FromSeconds(5)); + Assert.Contains(statuses, s => s.State == DataSourceState.Off); + + // Start() must resolve (with false) rather than hang. Without the + // InitializationTracker's no-fallback shortcut, the tracker would stay in + // FallingBack forever because the outer composite's exhaustion-driven Off goes + // directly to the external sink, bypassing the tracker's observers. + Assert.True(startTask.Wait(TimeSpan.FromSeconds(5)), + "Start() must resolve when FDv1 directive arrives but no fallback is configured"); + Assert.False(startTask.Result, + "Start() should resolve with false when no FDv1 fallback is configured to take over"); + + // Critically, the synchronizer must have been disposed -- this is what stops the + // streaming source from reconnecting in the harness scenario. + Assert.True(synchronizerDisposed, + "synchronizer source must be disposed so the streaming source stops reconnecting"); + } + finally + { + dataSource.Dispose(); + } + } + + // Mock data source that reports its disposal -- used to verify that the FDv1 fallback + // applier disposes the current synchronizer when no fallback is configured. + private class HaltMockDataSource : IDataSource + { + private readonly Func _onStart; + private readonly Action _onDispose; + private bool _initialized; + + public HaltMockDataSource(Func onStart, Action onDispose) + { + _onStart = onStart; + _onDispose = onDispose; + } + + public async Task Start() + { + await _onStart(); + _initialized = true; + return true; + } + + public bool Initialized => _initialized; + + public void Dispose() => _onDispose(); + } + + // End-to-end test using the REAL FDv2StreamingDataSource (with a mock IEventSource so we + // can drive synthetic SSE events). This catches harness-style regressions in the + // streaming source's interaction with the FDv2DataSource composite that the simpler + // mock-based tests above cannot reach. Mirrors the harness suite "directive on streaming + // success applies payload then engages FDv1". + [Fact] + public async Task RealStreamingSourceWithFallbackHeaderEngagesFDv1FallbackAfterApply() + { + var capturingSink = new CapturingDataSourceUpdatesWithHeaders(); + var fdv1Data = new FullDataSet( + new Dictionary>()); + + // The streaming factory is invoked asynchronously by the OUTER composite's queue + // processor. Use a TCS to know when the streaming source has been constructed and + // its event handlers wired to the mock event source -- otherwise the test thread can + // race ahead and trigger the mock before the streaming source is listening. + var streamingMock = new IntegrationMockEventSource(); + var streamingReady = new TaskCompletionSource( + TaskCreationOptions.RunContinuationsAsynchronously); + SourceFactory streamingFactory = (updatesSink) => + { + var context = BasicContext.WithDataSourceUpdates( + new LaunchDarkly.Sdk.Server.Internal.DataSystem.DataSourceUpdatesV2ToV1Adapter(updatesSink)); + var source = new FDv2StreamingDataSource( + context, + context.DataSourceUpdates, + new Uri("http://example.com"), + TimeSpan.FromMilliseconds(10), + () => Selector.Empty, + (uri, http) => streamingMock); + streamingReady.TrySetResult(true); + return source; + }; + + var fdv1Started = false; + SourceFactory fdv1Factory = (updatesSink) => + new MockDataSourceWithInit(async () => + { + fdv1Started = true; + updatesSink.Apply(new ChangeSet( + ChangeSetType.Full, + Selector.Make(2, "fdv1-state"), + fdv1Data.Data, + null)); + updatesSink.UpdateStatus(DataSourceState.Valid, null); + await Task.Yield(); + }); + + var dataSource = FDv2DataSource.CreateFDv2DataSource( + capturingSink, + new List(), + new List { streamingFactory }, + new List { fdv1Factory }, + TestLogger); + + try + { + var startTask = dataSource.Start(); + + // Wait for the streaming factory to wire up before triggering events on the mock. + Assert.True(await Task.WhenAny(streamingReady.Task, Task.Delay(TimeSpan.FromSeconds(2))) + == streamingReady.Task, + "streaming source should be constructed within 2s of dataSource.Start()"); + + // Drive the mock event source with the headers + a complete xfer-full payload. + var headers = new List>> + { + new KeyValuePair>("x-ld-fd-fallback", new[] { "true" }) + }; + streamingMock.TriggerOpen(headers); + streamingMock.TriggerMessage(new LaunchDarkly.EventSource.MessageReceivedEventArgs( + new LaunchDarkly.EventSource.MessageEvent("server-intent", + @"{""payloads"":[{""id"":""p1"",""target"":1,""intentCode"":""xfer-full"",""reason"":""r""}]}", + null))); + streamingMock.TriggerMessage(new LaunchDarkly.EventSource.MessageReceivedEventArgs( + new LaunchDarkly.EventSource.MessageEvent("payload-transferred", + @"{""state"":""(p:p1:1)"",""version"":1}", + null))); + + var startResult = await startTask; + Assert.True(startResult, "Start should complete via the streaming Apply"); + + // Two Applies expected: streaming xfer-full (non-empty selector), then FDv1. + var firstApply = capturingSink.Applies.ExpectValue(TimeSpan.FromSeconds(1)); + Assert.Equal(ChangeSetType.Full, firstApply.Type); + var secondApply = capturingSink.Applies.ExpectValue(TimeSpan.FromSeconds(2)); + Assert.Equal("fdv1-state", secondApply.Selector.State); + + Assert.True(fdv1Started, + "FDv1 fallback synchronizer should be engaged when streaming success carries the FDv1 directive"); + Assert.True(streamingMock.IsClosed, + "Streaming event source should be closed after the directive is applied"); + } + finally + { + dataSource.Dispose(); + } + } + + // Regression test for the Bugbot-flagged double-advance bug. The polling source on a + // successful response with the x-ld-fd-fallback header now rides the directive on the + // ChangeSet (see FDv2PollingDataSource), so the blacklist applier and the FDv1 fallback + // applier both observe it inline: the blacklist bails and the FDv1 fallback applier + // calls BlockAll(FDv2) before advancing. The outer composite must NOT start the FDv2 + // synchronizer and MUST start the FDv1 fallback synchronizer. + [Fact] + public async Task InitializerSuccessWithFDv1FallbackDirectiveDoesNotOverAdvance() + { + var capturingSink = new CapturingDataSourceUpdatesWithHeaders(); + var initializerData = new FullDataSet( + new Dictionary>()); + var fdv1Data = new FullDataSet( + new Dictionary>()); + + // The polling-style initializer: applies a non-empty changeset with the FDv1 + // directive flag set, then reports Off + FDv1Fallback during shutdown. This mirrors + // FDv2PollingDataSource.UpdateTaskAsync when a successful 200 response carries the + // x-ld-fd-fallback header. The flag on the changeset is what makes the blacklist + // applier bail synchronously; without it the test would exercise the old race-prone + // path instead of the new bail path. + SourceFactory pollingInitializerFactory = (updatesSink) => + new MockDataSourceWithInit(async () => + { + updatesSink.Apply(new ChangeSet( + ChangeSetType.Full, + Selector.Make(1, "init-state"), + initializerData.Data, + null, + fdv1Fallback: true)); + updatesSink.UpdateStatus( + DataSourceState.Off, + new DataSourceStatus.ErrorInfo + { + Kind = DataSourceStatus.ErrorKind.Unknown, + Time = DateTime.Now, + FDv1Fallback = true + }); + await Task.Yield(); + }); + + // The FDv2 synchronizer must never be constructed -- if the bug is present and the + // outer composite double-advances after the initializer's Apply, we would land on + // (and start) this synchronizer. + var streamingSynchronizerStarted = false; + SourceFactory streamingSynchronizerFactory = (updatesSink) => + new MockDataSourceWithInit(async () => + { + streamingSynchronizerStarted = true; + await Task.Yield(); + }); + + var fdv1Started = false; + SourceFactory fdv1Factory = (updatesSink) => + new MockDataSourceWithInit(async () => + { + fdv1Started = true; + updatesSink.Apply(new ChangeSet( + ChangeSetType.Full, + Selector.Make(2, "fdv1-state"), + fdv1Data.Data, + null)); + updatesSink.UpdateStatus(DataSourceState.Valid, null); + await Task.Yield(); + }); + + var dataSource = FDv2DataSource.CreateFDv2DataSource( + capturingSink, + new List { pollingInitializerFactory }, + new List { streamingSynchronizerFactory }, + new List { fdv1Factory }, + TestLogger); + + try + { + var startTask = dataSource.Start(); + + // The initializer's Apply rides the FDv1Fallback flag on the ChangeSet, so the + // blacklist applier sees it inline and bails. The FDv1 fallback applier triggers + // off the same Apply, calls BlockAll(FDv2) to remove the synchronizers entry, + // then advances to the FDv1 fallback entry uncontested. + var startResult = await startTask; + Assert.True(startResult, + "Start should complete via the FDv1 fallback synchronizer's Apply"); + + // Two Applies expected on the outer sink: the initializer's data, then the FDv1 + // fallback synchronizer's data. + var firstApply = capturingSink.Applies.ExpectValue(TimeSpan.FromSeconds(2)); + Assert.Equal("init-state", firstApply.Selector.State); + var secondApply = capturingSink.Applies.ExpectValue(TimeSpan.FromSeconds(2)); + Assert.Equal("fdv1-state", secondApply.Selector.State); + + Assert.True(fdv1Started, + "FDv1 fallback synchronizer must be engaged when the initializer's success " + + "response carries the FDv1 directive"); + Assert.False(streamingSynchronizerStarted, + "FDv2 streaming synchronizer must NOT be started: the FDv1 directive should " + + "skip past the synchronizers entry. If the outer composite double-advances " + + "(blacklist applier's Apply-driven advancement is not deferred), this entry " + + "would be reached and started before the FDv1 fallback entry."); + } + finally + { + dataSource.Dispose(); + } + } + + // End-to-end test using the REAL FDv2StreamingDataSource: 403 error response with the + // x-ld-fd-fallback header. Mirrors the harness suite "directive on streaming error + // engages FDv1 fallback". + [Fact] + public async Task RealStreamingSource403WithFallbackHeaderEngagesFDv1Fallback() + { + var capturingSink = new CapturingDataSourceUpdatesWithHeaders(); + var fdv1Data = new FullDataSet( + new Dictionary>()); + + var streamingMock = new IntegrationMockEventSource(); + var streamingReady = new TaskCompletionSource( + TaskCreationOptions.RunContinuationsAsynchronously); + SourceFactory streamingFactory = (updatesSink) => + { + var context = BasicContext.WithDataSourceUpdates( + new LaunchDarkly.Sdk.Server.Internal.DataSystem.DataSourceUpdatesV2ToV1Adapter(updatesSink)); + var source = new FDv2StreamingDataSource( + context, + context.DataSourceUpdates, + new Uri("http://example.com"), + TimeSpan.FromMilliseconds(10), + () => Selector.Empty, + (uri, http) => streamingMock); + streamingReady.TrySetResult(true); + return source; + }; + + var fdv1Started = false; + SourceFactory fdv1Factory = (updatesSink) => + new MockDataSourceWithInit(async () => + { + fdv1Started = true; + updatesSink.Apply(new ChangeSet( + ChangeSetType.Full, + Selector.Empty, + fdv1Data.Data, + null)); + updatesSink.UpdateStatus(DataSourceState.Valid, null); + await Task.Yield(); + }); + + var dataSource = FDv2DataSource.CreateFDv2DataSource( + capturingSink, + new List(), + new List { streamingFactory }, + new List { fdv1Factory }, + TestLogger); + + try + { + var startTask = dataSource.Start(); + + Assert.True(await Task.WhenAny(streamingReady.Task, Task.Delay(TimeSpan.FromSeconds(2))) + == streamingReady.Task, + "streaming source should be constructed within 2s of dataSource.Start()"); + + // The streaming source receives a 403 with the FDv1 fallback header. This is the + // exact exception the EventSource library throws on a non-2xx response carrying + // the directive (see EventSourceServiceUnsuccessfulResponseException in 5.3.0+). + var headers = new List>> + { + new KeyValuePair>("x-ld-fd-fallback", new[] { "true" }) + }; + var ex = new LaunchDarkly.EventSource.EventSourceServiceUnsuccessfulResponseException( + 403, headers); + streamingMock.TriggerError(ex); + + var startResult = await startTask; + Assert.True(startResult, + "Start should complete: tracker reaches Initialized via FDv1 fallback Apply"); + + var changeSet = capturingSink.Applies.ExpectValue(TimeSpan.FromSeconds(2)); + Assert.Equal(ChangeSetType.Full, changeSet.Type); + + Assert.True(fdv1Started, + "FDv1 fallback synchronizer should be engaged on a 403 + directive response"); + } + finally + { + dataSource.Dispose(); + } + } + + // Mock EventSource for integration tests -- duplicates the helper from + // FDv2StreamingDataSourceTest to keep the integration test contained in this file. + // Suppress unused-event warnings: we only use Opened/MessageReceived/Error here. +#pragma warning disable 67 + private class IntegrationMockEventSource : LaunchDarkly.EventSource.IEventSource + { + public event EventHandler Opened; + public event EventHandler Closed; + public event EventHandler MessageReceived; + public event EventHandler Error; + public event EventHandler CommentReceived; + + public bool IsClosed { get; private set; } + public LaunchDarkly.EventSource.ReadyState ReadyState { get; private set; } = + LaunchDarkly.EventSource.ReadyState.Closed; + + public Task StartAsync() + { + ReadyState = LaunchDarkly.EventSource.ReadyState.Open; + return Task.CompletedTask; + } + + public void Close() => IsClosed = true; + public void Restart(bool forceNewConnection = false) { } + + public void TriggerOpen(IEnumerable>> headers = null) => + Opened?.Invoke(this, new LaunchDarkly.EventSource.StateChangedEventArgs( + LaunchDarkly.EventSource.ReadyState.Open, headers)); + + public void TriggerMessage(LaunchDarkly.EventSource.MessageReceivedEventArgs args) => + MessageReceived?.Invoke(this, args); + + public void TriggerError(Exception exception) => + Error?.Invoke(this, new LaunchDarkly.EventSource.ExceptionEventArgs(exception)); + } +#pragma warning restore 67 + + private class CapturingObserver : IDataSourceObserver + { + public int UpdateStatusCallCount { get; private set; } + public int ApplyCallCount { get; private set; } + + public void UpdateStatus(DataSourceState newState, DataSourceStatus.ErrorInfo? newError) + => UpdateStatusCallCount++; + + public void Apply(ChangeSet changeSet) => ApplyCallCount++; + } } } diff --git a/pkgs/sdk/server/test/Internal/FDv2DataSources/FDv2PollingDataSourceTest.cs b/pkgs/sdk/server/test/Internal/FDv2DataSources/FDv2PollingDataSourceTest.cs index 12f16dbf..2e37787d 100644 --- a/pkgs/sdk/server/test/Internal/FDv2DataSources/FDv2PollingDataSourceTest.cs +++ b/pkgs/sdk/server/test/Internal/FDv2DataSources/FDv2PollingDataSourceTest.cs @@ -977,6 +977,71 @@ public async Task UnrecoverableHttpErrorWithFallbackHeaderFalseDoesNotSetFDv1Fal } } + [Fact] + public async Task SuccessfulResponseWithFallbackHeaderAppliesPayloadThenSignalsFDv1Fallback() + { + // The server can include x-ld-fd-fallback: true on a successful 200 response. The SDK + // must apply the payload first (so evaluations see the freshest data the server is + // willing to give us) and then emit Off + FDv1Fallback so the action applier hands off + // to the FDv1 fallback synchronizer. + var headers = new List>> + { + new KeyValuePair>( + "x-ld-fd-fallback", new[] { "true" }) + }; + + _mockRequestor.Setup(r => r.PollingRequestAsync(It.IsAny())) + .ReturnsAsync(CreatePollingResponseWithHeaders( + new[] { CreateServerIntentEvent("none", "test-payload", 1) }, + headers)); + + using (var dataSource = MakeDataSource()) + { + var startTask = dataSource.Start(); + + // The payload must be applied -- the SDK's first observable side effect is the + // ChangeSet reaching the data store. + var changeSet = _updateSink.Applies.ExpectValue(); + Assert.Equal(ChangeSetType.None, changeSet.Type); + + // After the payload, the data source emits Off with FDv1Fallback=true so the action + // applier can swap to FDv1. + var status = _updateSink.StatusUpdates.ExpectValue(); + Assert.Equal(DataSourceState.Off, status.State); + Assert.NotNull(status.LastError); + Assert.True( + status.LastError.Value.FDv1Fallback, + "FDv1Fallback should be set when the success response carried the fallback header"); + + // Init task completes successfully because the payload was applied. + var result = await startTask; + Assert.True(result); + } + } + + [Fact] + public async Task SuccessfulResponseWithoutFallbackHeaderDoesNotSignalFDv1Fallback() + { + // Sanity check: a successful response with no fallback header must NOT trigger fallback. + _mockRequestor.Setup(r => r.PollingRequestAsync(It.IsAny())) + .ReturnsAsync(CreatePollingResponse( + CreateServerIntentEvent("none", "test-payload", 1))); + + using (var dataSource = MakeDataSource()) + { + var startTask = dataSource.Start(); + var result = await startTask; + Assert.True(result); + + var changeSet = _updateSink.Applies.ExpectValue(); + Assert.Equal(ChangeSetType.None, changeSet.Type); + + // No status update should appear -- the data source remains active and would simply + // poll again on the next tick without falling back. + _updateSink.StatusUpdates.ExpectNoValue(TimeSpan.FromMilliseconds(50)); + } + } + [Fact] public async Task UnrecoverableHttpError403ReportsOffStatusWithRecoverableFalse() { diff --git a/pkgs/sdk/server/test/Internal/FDv2DataSources/FDv2StreamingDataSourceTest.cs b/pkgs/sdk/server/test/Internal/FDv2DataSources/FDv2StreamingDataSourceTest.cs index ba56a176..1adaa80e 100644 --- a/pkgs/sdk/server/test/Internal/FDv2DataSources/FDv2StreamingDataSourceTest.cs +++ b/pkgs/sdk/server/test/Internal/FDv2DataSources/FDv2StreamingDataSourceTest.cs @@ -710,6 +710,77 @@ public async Task UnrecoverableHttpErrorWithFallbackHeaderFalseDoesNotSetFDv1Fal } } + [Fact] + public async Task SuccessfulStreamWithFallbackHeaderAppliesPayloadThenSignalsFDv1Fallback() + { + // The server can ride x-ld-fd-fallback: true on a successful streaming connection. The + // SDK must apply any payload that arrives on the stream and then shut the stream down + // with FDv1Fallback=true so the action applier hands off to the FDv1 fallback + // synchronizer. Without this behavior the stream would stay open against FDv2 forever. + using (var dataSource = MakeDataSource()) + { + var startTask = dataSource.Start(); + + var headers = new List>> + { + new KeyValuePair>("x-ld-fd-fallback", new[] { "true" }) + }; + _mockEventSource.TriggerOpen(headers); + + // Send a complete xfer-full payload so the protocol applies a ChangeSet. + _mockEventSource.TriggerMessage(CreateMessageEvent("server-intent", + CreateServerIntentJson("xfer-full", "p1", 1))); + _mockEventSource.TriggerMessage(CreateMessageEvent("payload-transferred", + CreatePayloadTransferredJson("(p:p1:1)", 1))); + + // The payload must be applied first. + var changeSet = _updateSink.Applies.ExpectValue(); + Assert.Equal(ChangeSetType.Full, changeSet.Type); + + // Then the data source emits Off with FDv1Fallback=true so the action applier can + // swap to FDv1. + var status = _updateSink.StatusUpdates.ExpectValue(); + Assert.Equal(DataSourceState.Off, status.State); + Assert.NotNull(status.LastError); + Assert.True( + status.LastError.Value.FDv1Fallback, + "FDv1Fallback should be set when the success response carried the fallback header"); + + // Init task completes successfully because the payload was applied. + var result = await startTask; + Assert.True(result); + + // The event source is closed so the stream stops consuming events from FDv2. + Assert.True(_mockEventSource.IsClosed); + } + } + + [Fact] + public async Task SuccessfulStreamWithoutFallbackHeaderDoesNotSignalFDv1Fallback() + { + // Sanity check: a normal successful streaming response must not produce a fallback + // signal nor close the stream. + using (var dataSource = MakeDataSource()) + { + var startTask = dataSource.Start(); + + _mockEventSource.TriggerOpen(); + _mockEventSource.TriggerMessage(CreateMessageEvent("server-intent", + CreateServerIntentJson("xfer-full", "p1", 1))); + _mockEventSource.TriggerMessage(CreateMessageEvent("payload-transferred", + CreatePayloadTransferredJson("(p:p1:1)", 1))); + + var changeSet = _updateSink.Applies.ExpectValue(); + Assert.Equal(ChangeSetType.Full, changeSet.Type); + + var result = await startTask; + Assert.True(result); + + _updateSink.StatusUpdates.ExpectNoValue(TimeSpan.FromMilliseconds(50)); + Assert.False(_mockEventSource.IsClosed); + } + } + [Fact] public async Task UnrecoverableHttpError403ReportsOffStatusWithRecoverableFalse() {