diff --git a/src/ES.Kubernetes.Reflector/Configuration/ReflectorOptions.cs b/src/ES.Kubernetes.Reflector/Configuration/ReflectorOptions.cs index 68a05a1a..baa9727a 100644 --- a/src/ES.Kubernetes.Reflector/Configuration/ReflectorOptions.cs +++ b/src/ES.Kubernetes.Reflector/Configuration/ReflectorOptions.cs @@ -3,4 +3,4 @@ public class ReflectorOptions { public WatcherOptions? Watcher { get; set; } -} \ No newline at end of file +} diff --git a/src/ES.Kubernetes.Reflector/Mirroring/ConfigMapMirror.cs b/src/ES.Kubernetes.Reflector/Mirroring/ConfigMapMirror.cs index 018eb531..952d9a1e 100644 --- a/src/ES.Kubernetes.Reflector/Mirroring/ConfigMapMirror.cs +++ b/src/ES.Kubernetes.Reflector/Mirroring/ConfigMapMirror.cs @@ -9,6 +9,21 @@ namespace ES.Kubernetes.Reflector.Mirroring; public class ConfigMapMirror(ILogger logger, IKubernetes kubernetes) : ResourceMirror(logger, kubernetes) { + protected override async Task DiscoverAutoMirrorSourcesFromClusterAsync( + CancellationToken cancellationToken) + { + string? continueParameter = null; + do + { + var list = await Kubernetes.CoreV1.ListConfigMapForAllNamespacesAsync( + continueParameter: continueParameter, + cancellationToken: cancellationToken); + foreach (var cm in list.Items) + RefreshRememberedAutoMirrorFromResource(cm); + continueParameter = list.Metadata?.ContinueProperty; + } while (!string.IsNullOrEmpty(continueParameter)); + } + protected override async Task OnResourceWithNameList(string itemRefName, CancellationToken cancellationToken) => [ @@ -53,4 +68,4 @@ protected override async Task OnResourceGet(NamespacedName refId, CancellationToken cancellationToken) => await Kubernetes.CoreV1.ReadNamespacedConfigMapAsync(refId.Name, refId.Namespace, cancellationToken: cancellationToken); -} \ No newline at end of file +} diff --git a/src/ES.Kubernetes.Reflector/Mirroring/Core/MirroringPropertiesExtensions.cs b/src/ES.Kubernetes.Reflector/Mirroring/Core/MirroringPropertiesExtensions.cs index 404ecdae..a2d48b0e 100644 --- a/src/ES.Kubernetes.Reflector/Mirroring/Core/MirroringPropertiesExtensions.cs +++ b/src/ES.Kubernetes.Reflector/Mirroring/Core/MirroringPropertiesExtensions.cs @@ -101,6 +101,12 @@ public static bool CanBeReflectedToNamespace(this MirroringProperties properties public static bool CanBeReflectedToNamespace(this MirroringProperties properties, V1Namespace ns) => properties.Allowed && MatchNamespace(properties.AllowedNamespaces, properties.AllowedNamespacesSelector, ns); + /// + /// True when this object is an auto-mirror source (reflection allowed and auto-reflection enabled). + /// + public static bool IsAutoMirrorSource(this MirroringProperties properties) => + properties is { Allowed: true, AutoEnabled: true }; + /// /// Checks if the source properties allow auto-reflection to the given namespace (by name only). /// Use the overload accepting V1Namespace for label selector support. diff --git a/src/ES.Kubernetes.Reflector/Mirroring/Core/ResourceMirror.cs b/src/ES.Kubernetes.Reflector/Mirroring/Core/ResourceMirror.cs index c6d80e7f..1752a7d9 100644 --- a/src/ES.Kubernetes.Reflector/Mirroring/Core/ResourceMirror.cs +++ b/src/ES.Kubernetes.Reflector/Mirroring/Core/ResourceMirror.cs @@ -1,5 +1,7 @@ using System.Collections.Concurrent; +using System.Linq; using System.Net; +using System.Threading; using ES.FX.Additions.KubernetesClient.Models; using ES.FX.Additions.KubernetesClient.Models.Extensions; using ES.FX.Additions.Newtonsoft.Json.Serialization; @@ -26,6 +28,11 @@ public abstract class ResourceMirror(ILogger logger, IKubernetes kube private readonly ConcurrentDictionary _notFoundCache = new(); private readonly ConcurrentDictionary _propertiesCache = new(); private readonly ConcurrentDictionary _lastWarnedSelectorErrors = new(); + private readonly ConcurrentDictionary _rememberedAutoMirrorSources = new(); + private readonly SemaphoreSlim _clusterDiscoveryLock = new(1, 1); + private readonly SemaphoreSlim _eagerAutoMirrorPrimeLock = new(1, 1); + private int _clusterDiscoveryCompleted; + protected readonly IKubernetes Kubernetes = kubernetes; protected readonly ILogger Logger = logger; @@ -49,9 +56,15 @@ public Task Handle(WatcherClosed notification, CancellationToken cancellationTok // Clear all resource caches but preserve _namespaceCache — it is owned by the // NamespaceWatcher and must survive resource watcher restarts so label-selector // checks remain functional during the replay that rebuilds resource state. - Logger.LogDebug("Cleared sources for {Type} resources", typeof(TResource).Name); - - _autoSources.Clear(); + // + // Preserve _rememberedAutoMirrorSources for the same reason: after a reconnect the watch replay + // may lag behind namespace events; auto-mirror sources are primed from the API using this set + // (see EnsureEagerAutoMirrorSourcesPrimed). Cold-start cluster discovery does not run again. + // + // Preserve _autoSources across reconnect so namespace reconciliation still knows which sources + // to GET while replay lags. + Logger.LogDebug( + "Cleared resource caches for {Type} resources (auto-source keys preserved)", typeof(TResource).Name); _notFoundCache.Clear(); _propertiesCache.Clear(); _autoReflectionCache.Clear(); @@ -77,7 +90,7 @@ public async Task Handle(WatcherEvent notification, CancellationToken cancellati //Remove from the not found, since it exists - _notFoundCache.Remove(obj.NamespacedName(), out _); + _notFoundCache.TryRemove(obj.NamespacedName(), out _); switch (notification.EventType) { @@ -87,7 +100,7 @@ public async Task Handle(WatcherEvent notification, CancellationToken cancellati break; case WatchEventType.Deleted: { - _propertiesCache.Remove(objNsName, out _); + _propertiesCache.TryRemove(objNsName, out _); _lastWarnedSelectorErrors.TryRemove(objNsName, out _); var properties = obj.GetMirroringProperties(); @@ -102,9 +115,10 @@ public async Task Handle(WatcherEvent notification, CancellationToken cancellati await OnResourceDelete(reflectionNsName); } - _autoSources.Remove(objNsName, out _); - _directReflectionCache.Remove(objNsName, out _); - _autoReflectionCache.Remove(objNsName, out _); + _autoSources.TryRemove(objNsName, out _); + _rememberedAutoMirrorSources.TryRemove(objNsName, out _); + _directReflectionCache.TryRemove(objNsName, out _); + _autoReflectionCache.TryRemove(objNsName, out _); } else { @@ -135,10 +149,18 @@ public async Task Handle(WatcherEvent notification, CancellationToken cancellati //Cache the namespace for label selector lookups _namespaceCache.AddOrUpdate(ns.Name(), ns, (_, _) => ns); - //Update all auto-sources - foreach (var sourceNsName in _autoSources.Keys) + await EnsureEagerAutoMirrorSourcesPrimed(cancellationToken); + + //Update all auto-sources (including remembered auto-mirror sources while replay lags) + foreach (var sourceNsName in AutoMirrorSourceCandidates().ToArray()) { - if (!_propertiesCache.TryGetValue(sourceNsName, out var properties)) continue; + if (!_propertiesCache.TryGetValue(sourceNsName, out var properties)) + { + var obj = await TryResourceGet(sourceNsName, cancellationToken); + if (obj is null) continue; + await HandleUpsert(obj, cancellationToken); + if (!_propertiesCache.TryGetValue(sourceNsName, out properties)) continue; + } var autoReflections = _autoReflectionCache.GetOrAdd(sourceNsName, []); var reflectionNsName = sourceNsName with { Namespace = ns.Name() }; @@ -196,7 +218,7 @@ await ResourceReflect( _namespaceCache.TryRemove(ns.Name(), out _); //Remove any auto-reflections targeting this namespace - foreach (var sourceNsName in _autoSources.Keys) + foreach (var sourceNsName in AutoMirrorSourceCandidates().ToArray()) { var autoReflections = _autoReflectionCache.GetOrAdd(sourceNsName, []); var reflectionNsName = sourceNsName with { Namespace = ns.Name() }; @@ -263,18 +285,20 @@ private async Task HandleUpsert(TResource obj, CancellationToken cancellationTok } - var isAutoSource = objProperties is { Allowed: true, AutoEnabled: true }; + var isAutoSource = objProperties.IsAutoMirrorSource(); //Update the status of an auto-source _autoSources.AddOrUpdate(objNsName, isAutoSource, (_, _) => isAutoSource); + UpdateRememberedAutoMirror(objNsName, objProperties); + //If not allowed or auto is disabled, remove the cache for auto-reflections - if (!isAutoSource) _autoReflectionCache.Remove(objNsName, out _); + if (!isAutoSource) _autoReflectionCache.TryRemove(objNsName, out _); //If reflection is disabled, remove the reflections cache and stop reflecting if (!objProperties.Allowed) { - _directReflectionCache.Remove(objNsName, out _); + _directReflectionCache.TryRemove(objNsName, out _); return; } @@ -624,6 +648,94 @@ private async Task ResourceReflect(NamespacedName sourceNsName, NamespacedName r protected abstract Task OnResourceWithNameList(string itemRefName, CancellationToken cancellationToken); + private IEnumerable AutoMirrorSourceCandidates() => + _autoSources.Keys.Union(_rememberedAutoMirrorSources.Keys); + + private void UpdateRememberedAutoMirror(NamespacedName sourceNsName, MirroringProperties properties) + { + if (properties.IsAutoMirrorSource()) + _rememberedAutoMirrorSources.TryAdd(sourceNsName, 0); + else + _rememberedAutoMirrorSources.TryRemove(sourceNsName, out _); + } + + /// + /// Updates the remembered auto-mirror source set from a live API object (cluster discovery or watch). + /// + protected void RefreshRememberedAutoMirrorFromResource(TResource obj) + { + UpdateRememberedAutoMirror(obj.NamespacedName(), obj.GetMirroringProperties()); + } + + /// + /// One-time LIST of the cluster so auto-mirror sources are known before the informer replay + /// catches up (fresh pod or race). Default: no-op for tests or specialized mirrors. + /// + protected virtual Task DiscoverAutoMirrorSourcesFromClusterAsync(CancellationToken cancellationToken) => + Task.CompletedTask; + + private async Task EnsureClusterAutoSourcesDiscoveredOnceAsync(CancellationToken cancellationToken) + { + if (Volatile.Read(ref _clusterDiscoveryCompleted) != 0) + return; + + await _clusterDiscoveryLock.WaitAsync(cancellationToken); + try + { + if (Volatile.Read(ref _clusterDiscoveryCompleted) != 0) + return; + + await DiscoverAutoMirrorSourcesFromClusterAsync(cancellationToken); + Volatile.Write(ref _clusterDiscoveryCompleted, 1); + Logger.LogDebug( + "Completed one-time cluster scan for auto-mirror {ResourceKind} sources ({Count} remembered)", + typeof(TResource).Name, + _rememberedAutoMirrorSources.Count); + } + finally + { + _clusterDiscoveryLock.Release(); + } + } + + /// + /// Ensures auto-mirror sources are discoverable (cold start) and loaded into the cache before + /// namespace-driven reconciliation when the watch has not replayed yet. + /// + private async Task EnsureEagerAutoMirrorSourcesPrimed(CancellationToken cancellationToken) + { + await EnsureClusterAutoSourcesDiscoveredOnceAsync(cancellationToken); + + var candidates = AutoMirrorSourceCandidates().ToArray(); + if (candidates.Length == 0) + return; + + if (candidates.All(ns => _propertiesCache.ContainsKey(ns))) + return; + + await _eagerAutoMirrorPrimeLock.WaitAsync(cancellationToken); + try + { + foreach (var nsName in candidates) + { + if (_propertiesCache.ContainsKey(nsName)) + continue; + var obj = await TryResourceGet(nsName, cancellationToken); + if (obj is null) + { + Logger.LogDebug("Eager auto-mirror source {Source} not available yet", nsName); + continue; + } + + await HandleUpsert(obj, cancellationToken); + } + } + finally + { + _eagerAutoMirrorPrimeLock.Release(); + } + } + private async Task TryResourceGet(NamespacedName resourceNsName, CancellationToken cancellationToken) { @@ -700,4 +812,4 @@ private bool CanBeAutoReflectedToNamespaceCached(MirroringProperties properties, return properties.CanBeAutoReflectedToNamespace(ns); } -} \ No newline at end of file +} diff --git a/src/ES.Kubernetes.Reflector/Mirroring/SecretMirror.cs b/src/ES.Kubernetes.Reflector/Mirroring/SecretMirror.cs index bf95e3bf..ab6d6b90 100644 --- a/src/ES.Kubernetes.Reflector/Mirroring/SecretMirror.cs +++ b/src/ES.Kubernetes.Reflector/Mirroring/SecretMirror.cs @@ -9,6 +9,21 @@ namespace ES.Kubernetes.Reflector.Mirroring; public class SecretMirror(ILogger logger, IKubernetes kubernetesClient) : ResourceMirror(logger, kubernetesClient) { + protected override async Task DiscoverAutoMirrorSourcesFromClusterAsync( + CancellationToken cancellationToken) + { + string? continueParameter = null; + do + { + var list = await Kubernetes.CoreV1.ListSecretForAllNamespacesAsync( + continueParameter: continueParameter, + cancellationToken: cancellationToken); + foreach (var secret in list.Items) + RefreshRememberedAutoMirrorFromResource(secret); + continueParameter = list.Metadata?.ContinueProperty; + } while (!string.IsNullOrEmpty(continueParameter)); + } + protected override async Task OnResourceWithNameList(string itemRefName, CancellationToken cancellationToken) => [ diff --git a/tests/ES.Kubernetes.Reflector.Tests/LabelSelectorMatchTests.cs b/tests/ES.Kubernetes.Reflector.Tests/LabelSelectorMatchTests.cs index 1a33253c..1b1e37a3 100644 --- a/tests/ES.Kubernetes.Reflector.Tests/LabelSelectorMatchTests.cs +++ b/tests/ES.Kubernetes.Reflector.Tests/LabelSelectorMatchTests.cs @@ -297,4 +297,30 @@ public void GetLabelSelectorErrors_EmptySelectors_ReturnsEmpty() }; Assert.Empty(props.GetLabelSelectorErrors()); } + + [Fact] + public void IsAutoMirrorSource_TrueWhenAllowedAndAutoEnabled() + { + var props = new MirroringProperties + { + Allowed = true, + AutoEnabled = true, + AllowedNamespaces = "^staging$" + }; + Assert.True(props.IsAutoMirrorSource()); + } + + [Fact] + public void IsAutoMirrorSource_FalseWhenAutoDisabled() + { + var props = new MirroringProperties { Allowed = true, AutoEnabled = false }; + Assert.False(props.IsAutoMirrorSource()); + } + + [Fact] + public void IsAutoMirrorSource_FalseWhenNotAllowed() + { + var props = new MirroringProperties { Allowed = false, AutoEnabled = true }; + Assert.False(props.IsAutoMirrorSource()); + } }