From eeef9c47fed957787209765129daba233ebe629f Mon Sep 17 00:00:00 2001 From: Kiril Angov Date: Mon, 11 May 2026 09:34:43 -0400 Subject: [PATCH 1/5] Prime global auto-mirror sources on startup Remember unrestricted auto-mirror sources across reconnects and discover them once from the cluster so namespace events can reconcile before watch replay catches up. --- .../Configuration/ReflectorOptions.cs | 2 +- .../Mirroring/ConfigMapMirror.cs | 17 ++- .../Core/MirroringPropertiesExtensions.cs | 12 ++ .../Mirroring/Core/ResourceMirror.cs | 134 +++++++++++++++++- .../LabelSelectorMatchTests.cs | 39 +++++ 5 files changed, 195 insertions(+), 9 deletions(-) diff --git a/src/ES.Kubernetes.Reflector/Configuration/ReflectorOptions.cs b/src/ES.Kubernetes.Reflector/Configuration/ReflectorOptions.cs index 68a05a1a..baa9727a 100644 --- a/src/ES.Kubernetes.Reflector/Configuration/ReflectorOptions.cs +++ b/src/ES.Kubernetes.Reflector/Configuration/ReflectorOptions.cs @@ -3,4 +3,4 @@ public class ReflectorOptions { public WatcherOptions? Watcher { get; set; } -} \ No newline at end of file +} diff --git a/src/ES.Kubernetes.Reflector/Mirroring/ConfigMapMirror.cs b/src/ES.Kubernetes.Reflector/Mirroring/ConfigMapMirror.cs index 018eb531..ac5a2052 100644 --- a/src/ES.Kubernetes.Reflector/Mirroring/ConfigMapMirror.cs +++ b/src/ES.Kubernetes.Reflector/Mirroring/ConfigMapMirror.cs @@ -9,6 +9,21 @@ namespace ES.Kubernetes.Reflector.Mirroring; public class ConfigMapMirror(ILogger logger, IKubernetes kubernetes) : ResourceMirror(logger, kubernetes) { + protected override async Task DiscoverGlobalAutoMirrorSourcesFromClusterAsync( + CancellationToken cancellationToken) + { + string? continueParameter = null; + do + { + var list = await Kubernetes.CoreV1.ListConfigMapForAllNamespacesAsync( + continueParameter: continueParameter, + cancellationToken: cancellationToken); + foreach (var cm in list.Items) + RefreshRememberedGlobalAutoMirrorFromResource(cm); + continueParameter = list.Metadata?.ContinueProperty; + } while (!string.IsNullOrEmpty(continueParameter)); + } + protected override async Task OnResourceWithNameList(string itemRefName, CancellationToken cancellationToken) => [ @@ -53,4 +68,4 @@ protected override async Task OnResourceGet(NamespacedName refId, CancellationToken cancellationToken) => await Kubernetes.CoreV1.ReadNamespacedConfigMapAsync(refId.Name, refId.Namespace, cancellationToken: cancellationToken); -} \ No newline at end of file +} diff --git a/src/ES.Kubernetes.Reflector/Mirroring/Core/MirroringPropertiesExtensions.cs b/src/ES.Kubernetes.Reflector/Mirroring/Core/MirroringPropertiesExtensions.cs index 404ecdae..e5e9687c 100644 --- a/src/ES.Kubernetes.Reflector/Mirroring/Core/MirroringPropertiesExtensions.cs +++ b/src/ES.Kubernetes.Reflector/Mirroring/Core/MirroringPropertiesExtensions.cs @@ -101,6 +101,18 @@ public static bool CanBeReflectedToNamespace(this MirroringProperties properties public static bool CanBeReflectedToNamespace(this MirroringProperties properties, V1Namespace ns) => properties.Allowed && MatchNamespace(properties.AllowedNamespaces, properties.AllowedNamespacesSelector, ns); + /// + /// True when this source is configured to auto-reflect into every namespace: reflection is + /// allowed and enabled with no namespace name patterns or label selectors constraining either + /// the allowed set or the auto-reflection set. + /// + public static bool ReflectsAutoToAllNamespaces(this MirroringProperties properties) => + properties is { Allowed: true, AutoEnabled: true } && + string.IsNullOrEmpty(properties.AllowedNamespaces) && + string.IsNullOrEmpty(properties.AllowedNamespacesSelector) && + string.IsNullOrEmpty(properties.AutoNamespaces) && + string.IsNullOrEmpty(properties.AutoNamespacesSelector); + /// /// Checks if the source properties allow auto-reflection to the given namespace (by name only). /// Use the overload accepting V1Namespace for label selector support. diff --git a/src/ES.Kubernetes.Reflector/Mirroring/Core/ResourceMirror.cs b/src/ES.Kubernetes.Reflector/Mirroring/Core/ResourceMirror.cs index c6d80e7f..24b6f907 100644 --- a/src/ES.Kubernetes.Reflector/Mirroring/Core/ResourceMirror.cs +++ b/src/ES.Kubernetes.Reflector/Mirroring/Core/ResourceMirror.cs @@ -1,5 +1,7 @@ using System.Collections.Concurrent; +using System.Linq; using System.Net; +using System.Threading; using ES.FX.Additions.KubernetesClient.Models; using ES.FX.Additions.KubernetesClient.Models.Extensions; using ES.FX.Additions.Newtonsoft.Json.Serialization; @@ -26,6 +28,11 @@ public abstract class ResourceMirror(ILogger logger, IKubernetes kube private readonly ConcurrentDictionary _notFoundCache = new(); private readonly ConcurrentDictionary _propertiesCache = new(); private readonly ConcurrentDictionary _lastWarnedSelectorErrors = new(); + private readonly ConcurrentDictionary _rememberedGlobalAutoMirrorSources = new(); + private readonly SemaphoreSlim _clusterDiscoveryLock = new(1, 1); + private readonly SemaphoreSlim _eagerAutoMirrorPrimeLock = new(1, 1); + private int _clusterDiscoveryCompleted; + protected readonly IKubernetes Kubernetes = kubernetes; protected readonly ILogger Logger = logger; @@ -49,9 +56,16 @@ public Task Handle(WatcherClosed notification, CancellationToken cancellationTok // Clear all resource caches but preserve _namespaceCache — it is owned by the // NamespaceWatcher and must survive resource watcher restarts so label-selector // checks remain functional during the replay that rebuilds resource state. - Logger.LogDebug("Cleared sources for {Type} resources", typeof(TResource).Name); - - _autoSources.Clear(); + // + // Preserve _rememberedGlobalAutoMirrorSources for the same reason: after a reconnect the + // watch replay may lag behind namespace events; sources that previously reflected auto into + // all namespaces are primed from the API using this set (see EnsureEagerAutoMirrorSourcesPrimed). + // Cold-start cluster discovery does not need to run again for the lifetime of the process. + // + // Preserve _autoSources across reconnect so namespace reconciliation still knows which sources + // to GET while replay lags (scoped auto-mirror sources are not in the global remembered set). + Logger.LogDebug( + "Cleared resource caches for {Type} resources (auto-source keys preserved)", typeof(TResource).Name); _notFoundCache.Clear(); _propertiesCache.Clear(); _autoReflectionCache.Clear(); @@ -103,6 +117,7 @@ public async Task Handle(WatcherEvent notification, CancellationToken cancellati } _autoSources.Remove(objNsName, out _); + _rememberedGlobalAutoMirrorSources.TryRemove(objNsName, out _); _directReflectionCache.Remove(objNsName, out _); _autoReflectionCache.Remove(objNsName, out _); } @@ -135,10 +150,18 @@ public async Task Handle(WatcherEvent notification, CancellationToken cancellati //Cache the namespace for label selector lookups _namespaceCache.AddOrUpdate(ns.Name(), ns, (_, _) => ns); - //Update all auto-sources - foreach (var sourceNsName in _autoSources.Keys) + await EnsureEagerAutoMirrorSourcesPrimed(cancellationToken); + + //Update all auto-sources (including remembered global-auto sources while replay lags) + foreach (var sourceNsName in AutoMirrorSourceCandidates().ToArray()) { - if (!_propertiesCache.TryGetValue(sourceNsName, out var properties)) continue; + if (!_propertiesCache.TryGetValue(sourceNsName, out var properties)) + { + var obj = await TryResourceGet(sourceNsName, cancellationToken); + if (obj is null) continue; + await HandleUpsert(obj, cancellationToken); + if (!_propertiesCache.TryGetValue(sourceNsName, out properties)) continue; + } var autoReflections = _autoReflectionCache.GetOrAdd(sourceNsName, []); var reflectionNsName = sourceNsName with { Namespace = ns.Name() }; @@ -196,7 +219,7 @@ await ResourceReflect( _namespaceCache.TryRemove(ns.Name(), out _); //Remove any auto-reflections targeting this namespace - foreach (var sourceNsName in _autoSources.Keys) + foreach (var sourceNsName in AutoMirrorSourceCandidates().ToArray()) { var autoReflections = _autoReflectionCache.GetOrAdd(sourceNsName, []); var reflectionNsName = sourceNsName with { Namespace = ns.Name() }; @@ -268,6 +291,8 @@ private async Task HandleUpsert(TResource obj, CancellationToken cancellationTok //Update the status of an auto-source _autoSources.AddOrUpdate(objNsName, isAutoSource, (_, _) => isAutoSource); + UpdateRememberedGlobalAutoMirror(objNsName, objProperties); + //If not allowed or auto is disabled, remove the cache for auto-reflections if (!isAutoSource) _autoReflectionCache.Remove(objNsName, out _); @@ -624,6 +649,101 @@ private async Task ResourceReflect(NamespacedName sourceNsName, NamespacedName r protected abstract Task OnResourceWithNameList(string itemRefName, CancellationToken cancellationToken); + private IEnumerable AutoMirrorSourceCandidates() => + _autoSources.Keys.Union(_rememberedGlobalAutoMirrorSources.Keys); + + private void UpdateRememberedGlobalAutoMirror(NamespacedName sourceNsName, MirroringProperties properties) + { + if (properties.ReflectsAutoToAllNamespaces()) + _rememberedGlobalAutoMirrorSources.TryAdd(sourceNsName, 0); + else + _rememberedGlobalAutoMirrorSources.TryRemove(sourceNsName, out _); + } + + /// + /// Updates the remembered global-auto source set from a live API object (cluster discovery or watch). + /// + protected void RefreshRememberedGlobalAutoMirrorFromResource(TResource obj) + { + UpdateRememberedGlobalAutoMirror(obj.NamespacedName(), obj.GetMirroringProperties()); + } + + /// + /// One-time LIST of the cluster so global auto-mirror sources are known before the informer + /// replay catches up (fresh pod or race). Default: no-op for tests or specialized mirrors. + /// + protected virtual Task DiscoverGlobalAutoMirrorSourcesFromClusterAsync(CancellationToken cancellationToken) => + Task.CompletedTask; + + private async Task EnsureClusterGlobalSourcesDiscoveredOnceAsync(CancellationToken cancellationToken) + { + if (Volatile.Read(ref _clusterDiscoveryCompleted) != 0) + return; + + await _clusterDiscoveryLock.WaitAsync(cancellationToken); + try + { + if (Volatile.Read(ref _clusterDiscoveryCompleted) != 0) + return; + + await DiscoverGlobalAutoMirrorSourcesFromClusterAsync(cancellationToken); + Volatile.Write(ref _clusterDiscoveryCompleted, 1); + Logger.LogDebug( + "Completed one-time cluster scan for global auto-mirror {ResourceKind} sources ({Count} candidates)", + typeof(TResource).Name, + _rememberedGlobalAutoMirrorSources.Count); + } + finally + { + _clusterDiscoveryLock.Release(); + } + } + + /// + /// Ensures global auto-mirror sources are discoverable (cold start) and loaded into the cache + /// before namespace-driven reconciliation when the watch has not replayed yet. + /// + private async Task EnsureEagerAutoMirrorSourcesPrimed(CancellationToken cancellationToken) + { + await EnsureClusterGlobalSourcesDiscoveredOnceAsync(cancellationToken); + + if (_rememberedGlobalAutoMirrorSources.IsEmpty) + return; + + var anyMissing = false; + foreach (var nsName in _rememberedGlobalAutoMirrorSources.Keys) + if (!_propertiesCache.ContainsKey(nsName)) + { + anyMissing = true; + break; + } + + if (!anyMissing) + return; + + await _eagerAutoMirrorPrimeLock.WaitAsync(cancellationToken); + try + { + foreach (var nsName in _rememberedGlobalAutoMirrorSources.Keys) + { + if (_propertiesCache.ContainsKey(nsName)) + continue; + var obj = await TryResourceGet(nsName, cancellationToken); + if (obj is null) + { + Logger.LogDebug("Eager auto-mirror source {Source} not available yet", nsName); + continue; + } + + await HandleUpsert(obj, cancellationToken); + } + } + finally + { + _eagerAutoMirrorPrimeLock.Release(); + } + } + private async Task TryResourceGet(NamespacedName resourceNsName, CancellationToken cancellationToken) { diff --git a/tests/ES.Kubernetes.Reflector.Tests/LabelSelectorMatchTests.cs b/tests/ES.Kubernetes.Reflector.Tests/LabelSelectorMatchTests.cs index 1a33253c..3dc173a4 100644 --- a/tests/ES.Kubernetes.Reflector.Tests/LabelSelectorMatchTests.cs +++ b/tests/ES.Kubernetes.Reflector.Tests/LabelSelectorMatchTests.cs @@ -297,4 +297,43 @@ public void GetLabelSelectorErrors_EmptySelectors_ReturnsEmpty() }; Assert.Empty(props.GetLabelSelectorErrors()); } + + [Fact] + public void ReflectsAutoToAllNamespaces_TrueWhenUnrestricted() + { + var props = new MirroringProperties + { + Allowed = true, + AutoEnabled = true, + AllowedNamespaces = string.Empty, + AllowedNamespacesSelector = string.Empty, + AutoNamespaces = string.Empty, + AutoNamespacesSelector = string.Empty + }; + Assert.True(props.ReflectsAutoToAllNamespaces()); + } + + [Fact] + public void ReflectsAutoToAllNamespaces_FalseWhenAutoScopedByName() + { + var props = new MirroringProperties + { + Allowed = true, + AutoEnabled = true, + AutoNamespaces = "^staging$" + }; + Assert.False(props.ReflectsAutoToAllNamespaces()); + } + + [Fact] + public void ReflectsAutoToAllNamespaces_FalseWhenAllowedScopedBySelector() + { + var props = new MirroringProperties + { + Allowed = true, + AutoEnabled = true, + AllowedNamespacesSelector = "env=prod" + }; + Assert.False(props.ReflectsAutoToAllNamespaces()); + } } From c1502c67051693f70e4a4cb5b93c1d7ec6edac6a Mon Sep 17 00:00:00 2001 From: Kiril Angov Date: Mon, 11 May 2026 09:43:25 -0400 Subject: [PATCH 2/5] Discover global secret mirror sources from cluster --- .../Mirroring/SecretMirror.cs | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/src/ES.Kubernetes.Reflector/Mirroring/SecretMirror.cs b/src/ES.Kubernetes.Reflector/Mirroring/SecretMirror.cs index bf95e3bf..f350f184 100644 --- a/src/ES.Kubernetes.Reflector/Mirroring/SecretMirror.cs +++ b/src/ES.Kubernetes.Reflector/Mirroring/SecretMirror.cs @@ -9,6 +9,21 @@ namespace ES.Kubernetes.Reflector.Mirroring; public class SecretMirror(ILogger logger, IKubernetes kubernetesClient) : ResourceMirror(logger, kubernetesClient) { + protected override async Task DiscoverGlobalAutoMirrorSourcesFromClusterAsync( + CancellationToken cancellationToken) + { + string? continueParameter = null; + do + { + var list = await Kubernetes.CoreV1.ListSecretForAllNamespacesAsync( + continueParameter: continueParameter, + cancellationToken: cancellationToken); + foreach (var secret in list.Items) + RefreshRememberedGlobalAutoMirrorFromResource(secret); + continueParameter = list.Metadata?.ContinueProperty; + } while (!string.IsNullOrEmpty(continueParameter)); + } + protected override async Task OnResourceWithNameList(string itemRefName, CancellationToken cancellationToken) => [ From a26d7ef79f88481eebb3126636ba903956a8ce34 Mon Sep 17 00:00:00 2001 From: Kiril Angov Date: Tue, 26 May 2026 10:20:17 -0400 Subject: [PATCH 3/5] Generalize remembered auto-mirror sources Track all auto-mirror sources instead of only unrestricted ones during discovery and eager priming, and add a helper for identifying auto-mirror sources from mirroring properties. --- .../Mirroring/ConfigMapMirror.cs | 4 +- .../Core/MirroringPropertiesExtensions.cs | 6 ++ .../Mirroring/Core/ResourceMirror.cs | 70 ++++++++----------- .../Mirroring/SecretMirror.cs | 4 +- .../LabelSelectorMatchTests.cs | 13 ++++ 5 files changed, 54 insertions(+), 43 deletions(-) diff --git a/src/ES.Kubernetes.Reflector/Mirroring/ConfigMapMirror.cs b/src/ES.Kubernetes.Reflector/Mirroring/ConfigMapMirror.cs index ac5a2052..952d9a1e 100644 --- a/src/ES.Kubernetes.Reflector/Mirroring/ConfigMapMirror.cs +++ b/src/ES.Kubernetes.Reflector/Mirroring/ConfigMapMirror.cs @@ -9,7 +9,7 @@ namespace ES.Kubernetes.Reflector.Mirroring; public class ConfigMapMirror(ILogger logger, IKubernetes kubernetes) : ResourceMirror(logger, kubernetes) { - protected override async Task DiscoverGlobalAutoMirrorSourcesFromClusterAsync( + protected override async Task DiscoverAutoMirrorSourcesFromClusterAsync( CancellationToken cancellationToken) { string? continueParameter = null; @@ -19,7 +19,7 @@ protected override async Task DiscoverGlobalAutoMirrorSourcesFromClusterAsync( continueParameter: continueParameter, cancellationToken: cancellationToken); foreach (var cm in list.Items) - RefreshRememberedGlobalAutoMirrorFromResource(cm); + RefreshRememberedAutoMirrorFromResource(cm); continueParameter = list.Metadata?.ContinueProperty; } while (!string.IsNullOrEmpty(continueParameter)); } diff --git a/src/ES.Kubernetes.Reflector/Mirroring/Core/MirroringPropertiesExtensions.cs b/src/ES.Kubernetes.Reflector/Mirroring/Core/MirroringPropertiesExtensions.cs index e5e9687c..8deca9b4 100644 --- a/src/ES.Kubernetes.Reflector/Mirroring/Core/MirroringPropertiesExtensions.cs +++ b/src/ES.Kubernetes.Reflector/Mirroring/Core/MirroringPropertiesExtensions.cs @@ -101,6 +101,12 @@ public static bool CanBeReflectedToNamespace(this MirroringProperties properties public static bool CanBeReflectedToNamespace(this MirroringProperties properties, V1Namespace ns) => properties.Allowed && MatchNamespace(properties.AllowedNamespaces, properties.AllowedNamespacesSelector, ns); + /// + /// True when this object is an auto-mirror source (reflection allowed and auto-reflection enabled). + /// + public static bool IsAutoMirrorSource(this MirroringProperties properties) => + properties is { Allowed: true, AutoEnabled: true }; + /// /// True when this source is configured to auto-reflect into every namespace: reflection is /// allowed and enabled with no namespace name patterns or label selectors constraining either diff --git a/src/ES.Kubernetes.Reflector/Mirroring/Core/ResourceMirror.cs b/src/ES.Kubernetes.Reflector/Mirroring/Core/ResourceMirror.cs index 24b6f907..a9189514 100644 --- a/src/ES.Kubernetes.Reflector/Mirroring/Core/ResourceMirror.cs +++ b/src/ES.Kubernetes.Reflector/Mirroring/Core/ResourceMirror.cs @@ -28,7 +28,7 @@ public abstract class ResourceMirror(ILogger logger, IKubernetes kube private readonly ConcurrentDictionary _notFoundCache = new(); private readonly ConcurrentDictionary _propertiesCache = new(); private readonly ConcurrentDictionary _lastWarnedSelectorErrors = new(); - private readonly ConcurrentDictionary _rememberedGlobalAutoMirrorSources = new(); + private readonly ConcurrentDictionary _rememberedAutoMirrorSources = new(); private readonly SemaphoreSlim _clusterDiscoveryLock = new(1, 1); private readonly SemaphoreSlim _eagerAutoMirrorPrimeLock = new(1, 1); private int _clusterDiscoveryCompleted; @@ -57,13 +57,12 @@ public Task Handle(WatcherClosed notification, CancellationToken cancellationTok // NamespaceWatcher and must survive resource watcher restarts so label-selector // checks remain functional during the replay that rebuilds resource state. // - // Preserve _rememberedGlobalAutoMirrorSources for the same reason: after a reconnect the - // watch replay may lag behind namespace events; sources that previously reflected auto into - // all namespaces are primed from the API using this set (see EnsureEagerAutoMirrorSourcesPrimed). - // Cold-start cluster discovery does not need to run again for the lifetime of the process. + // Preserve _rememberedAutoMirrorSources for the same reason: after a reconnect the watch replay + // may lag behind namespace events; auto-mirror sources are primed from the API using this set + // (see EnsureEagerAutoMirrorSourcesPrimed). Cold-start cluster discovery does not run again. // // Preserve _autoSources across reconnect so namespace reconciliation still knows which sources - // to GET while replay lags (scoped auto-mirror sources are not in the global remembered set). + // to GET while replay lags. Logger.LogDebug( "Cleared resource caches for {Type} resources (auto-source keys preserved)", typeof(TResource).Name); _notFoundCache.Clear(); @@ -117,7 +116,7 @@ public async Task Handle(WatcherEvent notification, CancellationToken cancellati } _autoSources.Remove(objNsName, out _); - _rememberedGlobalAutoMirrorSources.TryRemove(objNsName, out _); + _rememberedAutoMirrorSources.TryRemove(objNsName, out _); _directReflectionCache.Remove(objNsName, out _); _autoReflectionCache.Remove(objNsName, out _); } @@ -152,7 +151,7 @@ public async Task Handle(WatcherEvent notification, CancellationToken cancellati await EnsureEagerAutoMirrorSourcesPrimed(cancellationToken); - //Update all auto-sources (including remembered global-auto sources while replay lags) + //Update all auto-sources (including remembered auto-mirror sources while replay lags) foreach (var sourceNsName in AutoMirrorSourceCandidates().ToArray()) { if (!_propertiesCache.TryGetValue(sourceNsName, out var properties)) @@ -286,12 +285,12 @@ private async Task HandleUpsert(TResource obj, CancellationToken cancellationTok } - var isAutoSource = objProperties is { Allowed: true, AutoEnabled: true }; + var isAutoSource = objProperties.IsAutoMirrorSource(); //Update the status of an auto-source _autoSources.AddOrUpdate(objNsName, isAutoSource, (_, _) => isAutoSource); - UpdateRememberedGlobalAutoMirror(objNsName, objProperties); + UpdateRememberedAutoMirror(objNsName, objProperties); //If not allowed or auto is disabled, remove the cache for auto-reflections if (!isAutoSource) _autoReflectionCache.Remove(objNsName, out _); @@ -650,32 +649,32 @@ protected abstract Task OnResourceWithNameList(string itemRefName, CancellationToken cancellationToken); private IEnumerable AutoMirrorSourceCandidates() => - _autoSources.Keys.Union(_rememberedGlobalAutoMirrorSources.Keys); + _autoSources.Keys.Union(_rememberedAutoMirrorSources.Keys); - private void UpdateRememberedGlobalAutoMirror(NamespacedName sourceNsName, MirroringProperties properties) + private void UpdateRememberedAutoMirror(NamespacedName sourceNsName, MirroringProperties properties) { - if (properties.ReflectsAutoToAllNamespaces()) - _rememberedGlobalAutoMirrorSources.TryAdd(sourceNsName, 0); + if (properties.IsAutoMirrorSource()) + _rememberedAutoMirrorSources.TryAdd(sourceNsName, 0); else - _rememberedGlobalAutoMirrorSources.TryRemove(sourceNsName, out _); + _rememberedAutoMirrorSources.TryRemove(sourceNsName, out _); } /// - /// Updates the remembered global-auto source set from a live API object (cluster discovery or watch). + /// Updates the remembered auto-mirror source set from a live API object (cluster discovery or watch). /// - protected void RefreshRememberedGlobalAutoMirrorFromResource(TResource obj) + protected void RefreshRememberedAutoMirrorFromResource(TResource obj) { - UpdateRememberedGlobalAutoMirror(obj.NamespacedName(), obj.GetMirroringProperties()); + UpdateRememberedAutoMirror(obj.NamespacedName(), obj.GetMirroringProperties()); } /// - /// One-time LIST of the cluster so global auto-mirror sources are known before the informer - /// replay catches up (fresh pod or race). Default: no-op for tests or specialized mirrors. + /// One-time LIST of the cluster so auto-mirror sources are known before the informer replay + /// catches up (fresh pod or race). Default: no-op for tests or specialized mirrors. /// - protected virtual Task DiscoverGlobalAutoMirrorSourcesFromClusterAsync(CancellationToken cancellationToken) => + protected virtual Task DiscoverAutoMirrorSourcesFromClusterAsync(CancellationToken cancellationToken) => Task.CompletedTask; - private async Task EnsureClusterGlobalSourcesDiscoveredOnceAsync(CancellationToken cancellationToken) + private async Task EnsureClusterAutoSourcesDiscoveredOnceAsync(CancellationToken cancellationToken) { if (Volatile.Read(ref _clusterDiscoveryCompleted) != 0) return; @@ -686,12 +685,12 @@ private async Task EnsureClusterGlobalSourcesDiscoveredOnceAsync(CancellationTok if (Volatile.Read(ref _clusterDiscoveryCompleted) != 0) return; - await DiscoverGlobalAutoMirrorSourcesFromClusterAsync(cancellationToken); + await DiscoverAutoMirrorSourcesFromClusterAsync(cancellationToken); Volatile.Write(ref _clusterDiscoveryCompleted, 1); Logger.LogDebug( - "Completed one-time cluster scan for global auto-mirror {ResourceKind} sources ({Count} candidates)", + "Completed one-time cluster scan for auto-mirror {ResourceKind} sources ({Count} remembered)", typeof(TResource).Name, - _rememberedGlobalAutoMirrorSources.Count); + _rememberedAutoMirrorSources.Count); } finally { @@ -700,31 +699,24 @@ private async Task EnsureClusterGlobalSourcesDiscoveredOnceAsync(CancellationTok } /// - /// Ensures global auto-mirror sources are discoverable (cold start) and loaded into the cache - /// before namespace-driven reconciliation when the watch has not replayed yet. + /// Ensures auto-mirror sources are discoverable (cold start) and loaded into the cache before + /// namespace-driven reconciliation when the watch has not replayed yet. /// private async Task EnsureEagerAutoMirrorSourcesPrimed(CancellationToken cancellationToken) { - await EnsureClusterGlobalSourcesDiscoveredOnceAsync(cancellationToken); + await EnsureClusterAutoSourcesDiscoveredOnceAsync(cancellationToken); - if (_rememberedGlobalAutoMirrorSources.IsEmpty) + var candidates = AutoMirrorSourceCandidates().ToArray(); + if (candidates.Length == 0) return; - var anyMissing = false; - foreach (var nsName in _rememberedGlobalAutoMirrorSources.Keys) - if (!_propertiesCache.ContainsKey(nsName)) - { - anyMissing = true; - break; - } - - if (!anyMissing) + if (candidates.All(ns => _propertiesCache.ContainsKey(ns))) return; await _eagerAutoMirrorPrimeLock.WaitAsync(cancellationToken); try { - foreach (var nsName in _rememberedGlobalAutoMirrorSources.Keys) + foreach (var nsName in candidates) { if (_propertiesCache.ContainsKey(nsName)) continue; diff --git a/src/ES.Kubernetes.Reflector/Mirroring/SecretMirror.cs b/src/ES.Kubernetes.Reflector/Mirroring/SecretMirror.cs index f350f184..ab6d6b90 100644 --- a/src/ES.Kubernetes.Reflector/Mirroring/SecretMirror.cs +++ b/src/ES.Kubernetes.Reflector/Mirroring/SecretMirror.cs @@ -9,7 +9,7 @@ namespace ES.Kubernetes.Reflector.Mirroring; public class SecretMirror(ILogger logger, IKubernetes kubernetesClient) : ResourceMirror(logger, kubernetesClient) { - protected override async Task DiscoverGlobalAutoMirrorSourcesFromClusterAsync( + protected override async Task DiscoverAutoMirrorSourcesFromClusterAsync( CancellationToken cancellationToken) { string? continueParameter = null; @@ -19,7 +19,7 @@ protected override async Task DiscoverGlobalAutoMirrorSourcesFromClusterAsync( continueParameter: continueParameter, cancellationToken: cancellationToken); foreach (var secret in list.Items) - RefreshRememberedGlobalAutoMirrorFromResource(secret); + RefreshRememberedAutoMirrorFromResource(secret); continueParameter = list.Metadata?.ContinueProperty; } while (!string.IsNullOrEmpty(continueParameter)); } diff --git a/tests/ES.Kubernetes.Reflector.Tests/LabelSelectorMatchTests.cs b/tests/ES.Kubernetes.Reflector.Tests/LabelSelectorMatchTests.cs index 3dc173a4..42fe771d 100644 --- a/tests/ES.Kubernetes.Reflector.Tests/LabelSelectorMatchTests.cs +++ b/tests/ES.Kubernetes.Reflector.Tests/LabelSelectorMatchTests.cs @@ -298,6 +298,19 @@ public void GetLabelSelectorErrors_EmptySelectors_ReturnsEmpty() Assert.Empty(props.GetLabelSelectorErrors()); } + [Fact] + public void IsAutoMirrorSource_TrueWhenAllowedAndAutoEnabled() + { + var props = new MirroringProperties + { + Allowed = true, + AutoEnabled = true, + AllowedNamespaces = "^staging$" + }; + Assert.True(props.IsAutoMirrorSource()); + Assert.False(props.ReflectsAutoToAllNamespaces()); + } + [Fact] public void ReflectsAutoToAllNamespaces_TrueWhenUnrestricted() { From 0bd1c82a16a8f8190141c797e978e0b8daafa05c Mon Sep 17 00:00:00 2001 From: Kiril Angov Date: Tue, 26 May 2026 10:22:00 -0400 Subject: [PATCH 4/5] Remove unused auto-mirror all-namespaces helper Add tests that cover disabled and disallowed auto-mirror sources instead --- .../Core/MirroringPropertiesExtensions.cs | 12 ------ .../Mirroring/Core/ResourceMirror.cs | 2 +- .../LabelSelectorMatchTests.cs | 38 +++---------------- 3 files changed, 7 insertions(+), 45 deletions(-) diff --git a/src/ES.Kubernetes.Reflector/Mirroring/Core/MirroringPropertiesExtensions.cs b/src/ES.Kubernetes.Reflector/Mirroring/Core/MirroringPropertiesExtensions.cs index 8deca9b4..a2d48b0e 100644 --- a/src/ES.Kubernetes.Reflector/Mirroring/Core/MirroringPropertiesExtensions.cs +++ b/src/ES.Kubernetes.Reflector/Mirroring/Core/MirroringPropertiesExtensions.cs @@ -107,18 +107,6 @@ public static bool CanBeReflectedToNamespace(this MirroringProperties properties public static bool IsAutoMirrorSource(this MirroringProperties properties) => properties is { Allowed: true, AutoEnabled: true }; - /// - /// True when this source is configured to auto-reflect into every namespace: reflection is - /// allowed and enabled with no namespace name patterns or label selectors constraining either - /// the allowed set or the auto-reflection set. - /// - public static bool ReflectsAutoToAllNamespaces(this MirroringProperties properties) => - properties is { Allowed: true, AutoEnabled: true } && - string.IsNullOrEmpty(properties.AllowedNamespaces) && - string.IsNullOrEmpty(properties.AllowedNamespacesSelector) && - string.IsNullOrEmpty(properties.AutoNamespaces) && - string.IsNullOrEmpty(properties.AutoNamespacesSelector); - /// /// Checks if the source properties allow auto-reflection to the given namespace (by name only). /// Use the overload accepting V1Namespace for label selector support. diff --git a/src/ES.Kubernetes.Reflector/Mirroring/Core/ResourceMirror.cs b/src/ES.Kubernetes.Reflector/Mirroring/Core/ResourceMirror.cs index a9189514..af169380 100644 --- a/src/ES.Kubernetes.Reflector/Mirroring/Core/ResourceMirror.cs +++ b/src/ES.Kubernetes.Reflector/Mirroring/Core/ResourceMirror.cs @@ -812,4 +812,4 @@ private bool CanBeAutoReflectedToNamespaceCached(MirroringProperties properties, return properties.CanBeAutoReflectedToNamespace(ns); } -} \ No newline at end of file +} diff --git a/tests/ES.Kubernetes.Reflector.Tests/LabelSelectorMatchTests.cs b/tests/ES.Kubernetes.Reflector.Tests/LabelSelectorMatchTests.cs index 42fe771d..1b1e37a3 100644 --- a/tests/ES.Kubernetes.Reflector.Tests/LabelSelectorMatchTests.cs +++ b/tests/ES.Kubernetes.Reflector.Tests/LabelSelectorMatchTests.cs @@ -308,45 +308,19 @@ public void IsAutoMirrorSource_TrueWhenAllowedAndAutoEnabled() AllowedNamespaces = "^staging$" }; Assert.True(props.IsAutoMirrorSource()); - Assert.False(props.ReflectsAutoToAllNamespaces()); } [Fact] - public void ReflectsAutoToAllNamespaces_TrueWhenUnrestricted() + public void IsAutoMirrorSource_FalseWhenAutoDisabled() { - var props = new MirroringProperties - { - Allowed = true, - AutoEnabled = true, - AllowedNamespaces = string.Empty, - AllowedNamespacesSelector = string.Empty, - AutoNamespaces = string.Empty, - AutoNamespacesSelector = string.Empty - }; - Assert.True(props.ReflectsAutoToAllNamespaces()); + var props = new MirroringProperties { Allowed = true, AutoEnabled = false }; + Assert.False(props.IsAutoMirrorSource()); } [Fact] - public void ReflectsAutoToAllNamespaces_FalseWhenAutoScopedByName() + public void IsAutoMirrorSource_FalseWhenNotAllowed() { - var props = new MirroringProperties - { - Allowed = true, - AutoEnabled = true, - AutoNamespaces = "^staging$" - }; - Assert.False(props.ReflectsAutoToAllNamespaces()); - } - - [Fact] - public void ReflectsAutoToAllNamespaces_FalseWhenAllowedScopedBySelector() - { - var props = new MirroringProperties - { - Allowed = true, - AutoEnabled = true, - AllowedNamespacesSelector = "env=prod" - }; - Assert.False(props.ReflectsAutoToAllNamespaces()); + var props = new MirroringProperties { Allowed = false, AutoEnabled = true }; + Assert.False(props.IsAutoMirrorSource()); } } From b0fe89d944558cf9e0d80590cb3fc44c64eb2b21 Mon Sep 17 00:00:00 2001 From: Kiril Angov Date: Tue, 26 May 2026 10:26:31 -0400 Subject: [PATCH 5/5] Use TryRemove for concurrent mirror caches --- .../Mirroring/Core/ResourceMirror.cs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/ES.Kubernetes.Reflector/Mirroring/Core/ResourceMirror.cs b/src/ES.Kubernetes.Reflector/Mirroring/Core/ResourceMirror.cs index af169380..1752a7d9 100644 --- a/src/ES.Kubernetes.Reflector/Mirroring/Core/ResourceMirror.cs +++ b/src/ES.Kubernetes.Reflector/Mirroring/Core/ResourceMirror.cs @@ -90,7 +90,7 @@ public async Task Handle(WatcherEvent notification, CancellationToken cancellati //Remove from the not found, since it exists - _notFoundCache.Remove(obj.NamespacedName(), out _); + _notFoundCache.TryRemove(obj.NamespacedName(), out _); switch (notification.EventType) { @@ -100,7 +100,7 @@ public async Task Handle(WatcherEvent notification, CancellationToken cancellati break; case WatchEventType.Deleted: { - _propertiesCache.Remove(objNsName, out _); + _propertiesCache.TryRemove(objNsName, out _); _lastWarnedSelectorErrors.TryRemove(objNsName, out _); var properties = obj.GetMirroringProperties(); @@ -115,10 +115,10 @@ public async Task Handle(WatcherEvent notification, CancellationToken cancellati await OnResourceDelete(reflectionNsName); } - _autoSources.Remove(objNsName, out _); + _autoSources.TryRemove(objNsName, out _); _rememberedAutoMirrorSources.TryRemove(objNsName, out _); - _directReflectionCache.Remove(objNsName, out _); - _autoReflectionCache.Remove(objNsName, out _); + _directReflectionCache.TryRemove(objNsName, out _); + _autoReflectionCache.TryRemove(objNsName, out _); } else { @@ -293,12 +293,12 @@ private async Task HandleUpsert(TResource obj, CancellationToken cancellationTok UpdateRememberedAutoMirror(objNsName, objProperties); //If not allowed or auto is disabled, remove the cache for auto-reflections - if (!isAutoSource) _autoReflectionCache.Remove(objNsName, out _); + if (!isAutoSource) _autoReflectionCache.TryRemove(objNsName, out _); //If reflection is disabled, remove the reflections cache and stop reflecting if (!objProperties.Allowed) { - _directReflectionCache.Remove(objNsName, out _); + _directReflectionCache.TryRemove(objNsName, out _); return; }