Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@
public class ReflectorOptions
{
public WatcherOptions? Watcher { get; set; }
}
}
17 changes: 16 additions & 1 deletion src/ES.Kubernetes.Reflector/Mirroring/ConfigMapMirror.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,21 @@ namespace ES.Kubernetes.Reflector.Mirroring;
public class ConfigMapMirror(ILogger<ConfigMapMirror> logger, IKubernetes kubernetes)
: ResourceMirror<V1ConfigMap>(logger, kubernetes)
{
protected override async Task DiscoverAutoMirrorSourcesFromClusterAsync(
CancellationToken cancellationToken)
{
string? continueParameter = null;
do
{
var list = await Kubernetes.CoreV1.ListConfigMapForAllNamespacesAsync(
continueParameter: continueParameter,
cancellationToken: cancellationToken);
foreach (var cm in list.Items)
RefreshRememberedAutoMirrorFromResource(cm);
continueParameter = list.Metadata?.ContinueProperty;
} while (!string.IsNullOrEmpty(continueParameter));
}

protected override async Task<V1ConfigMap[]> OnResourceWithNameList(string itemRefName,
CancellationToken cancellationToken) =>
[
Expand Down Expand Up @@ -53,4 +68,4 @@ protected override async Task<V1ConfigMap> OnResourceGet(NamespacedName refId,
CancellationToken cancellationToken) =>
await Kubernetes.CoreV1.ReadNamespacedConfigMapAsync(refId.Name, refId.Namespace,
cancellationToken: cancellationToken);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,12 @@ public static bool CanBeReflectedToNamespace(this MirroringProperties properties
public static bool CanBeReflectedToNamespace(this MirroringProperties properties, V1Namespace ns) =>
properties.Allowed && MatchNamespace(properties.AllowedNamespaces, properties.AllowedNamespacesSelector, ns);

/// <summary>
/// True when this object is an auto-mirror source (reflection allowed and auto-reflection enabled).
/// </summary>
public static bool IsAutoMirrorSource(this MirroringProperties properties) =>
properties is { Allowed: true, AutoEnabled: true };

/// <summary>
/// Checks if the source properties allow auto-reflection to the given namespace (by name only).
/// Use the overload accepting V1Namespace for label selector support.
Expand Down
144 changes: 128 additions & 16 deletions src/ES.Kubernetes.Reflector/Mirroring/Core/ResourceMirror.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
using System.Collections.Concurrent;
using System.Linq;
using System.Net;
using System.Threading;
using ES.FX.Additions.KubernetesClient.Models;
using ES.FX.Additions.KubernetesClient.Models.Extensions;
using ES.FX.Additions.Newtonsoft.Json.Serialization;
Expand All @@ -26,6 +28,11 @@ public abstract class ResourceMirror<TResource>(ILogger logger, IKubernetes kube
private readonly ConcurrentDictionary<NamespacedName, bool> _notFoundCache = new();
private readonly ConcurrentDictionary<NamespacedName, MirroringProperties> _propertiesCache = new();
private readonly ConcurrentDictionary<NamespacedName, string> _lastWarnedSelectorErrors = new();
private readonly ConcurrentDictionary<NamespacedName, byte> _rememberedAutoMirrorSources = new();
private readonly SemaphoreSlim _clusterDiscoveryLock = new(1, 1);
private readonly SemaphoreSlim _eagerAutoMirrorPrimeLock = new(1, 1);
private int _clusterDiscoveryCompleted;

protected readonly IKubernetes Kubernetes = kubernetes;
protected readonly ILogger Logger = logger;

Expand All @@ -49,9 +56,15 @@ public Task Handle(WatcherClosed notification, CancellationToken cancellationTok
// Clear all resource caches but preserve _namespaceCache — it is owned by the
// NamespaceWatcher and must survive resource watcher restarts so label-selector
// checks remain functional during the replay that rebuilds resource state.
Logger.LogDebug("Cleared sources for {Type} resources", typeof(TResource).Name);

_autoSources.Clear();
//
// Preserve _rememberedAutoMirrorSources for the same reason: after a reconnect the watch replay
// may lag behind namespace events; auto-mirror sources are primed from the API using this set
// (see EnsureEagerAutoMirrorSourcesPrimed). Cold-start cluster discovery does not run again.
//
// Preserve _autoSources across reconnect so namespace reconciliation still knows which sources
// to GET while replay lags.
Logger.LogDebug(
"Cleared resource caches for {Type} resources (auto-source keys preserved)", typeof(TResource).Name);
_notFoundCache.Clear();
_propertiesCache.Clear();
_autoReflectionCache.Clear();
Expand All @@ -77,7 +90,7 @@ public async Task Handle(WatcherEvent notification, CancellationToken cancellati


//Remove from the not found, since it exists
_notFoundCache.Remove(obj.NamespacedName(), out _);
_notFoundCache.TryRemove(obj.NamespacedName(), out _);

switch (notification.EventType)
{
Expand All @@ -87,7 +100,7 @@ public async Task Handle(WatcherEvent notification, CancellationToken cancellati
break;
case WatchEventType.Deleted:
{
_propertiesCache.Remove(objNsName, out _);
_propertiesCache.TryRemove(objNsName, out _);
_lastWarnedSelectorErrors.TryRemove(objNsName, out _);
var properties = obj.GetMirroringProperties();

Expand All @@ -102,9 +115,10 @@ public async Task Handle(WatcherEvent notification, CancellationToken cancellati
await OnResourceDelete(reflectionNsName);
}

_autoSources.Remove(objNsName, out _);
_directReflectionCache.Remove(objNsName, out _);
_autoReflectionCache.Remove(objNsName, out _);
_autoSources.TryRemove(objNsName, out _);
_rememberedAutoMirrorSources.TryRemove(objNsName, out _);
_directReflectionCache.TryRemove(objNsName, out _);
_autoReflectionCache.TryRemove(objNsName, out _);
}
else
{
Expand Down Expand Up @@ -135,10 +149,18 @@ public async Task Handle(WatcherEvent notification, CancellationToken cancellati
//Cache the namespace for label selector lookups
_namespaceCache.AddOrUpdate(ns.Name(), ns, (_, _) => ns);

//Update all auto-sources
foreach (var sourceNsName in _autoSources.Keys)
await EnsureEagerAutoMirrorSourcesPrimed(cancellationToken);

//Update all auto-sources (including remembered auto-mirror sources while replay lags)
foreach (var sourceNsName in AutoMirrorSourceCandidates().ToArray())
{
if (!_propertiesCache.TryGetValue(sourceNsName, out var properties)) continue;
if (!_propertiesCache.TryGetValue(sourceNsName, out var properties))
{
var obj = await TryResourceGet(sourceNsName, cancellationToken);
if (obj is null) continue;
await HandleUpsert(obj, cancellationToken);
if (!_propertiesCache.TryGetValue(sourceNsName, out properties)) continue;
}

var autoReflections = _autoReflectionCache.GetOrAdd(sourceNsName, []);
var reflectionNsName = sourceNsName with { Namespace = ns.Name() };
Expand Down Expand Up @@ -196,7 +218,7 @@ await ResourceReflect(
_namespaceCache.TryRemove(ns.Name(), out _);

//Remove any auto-reflections targeting this namespace
foreach (var sourceNsName in _autoSources.Keys)
foreach (var sourceNsName in AutoMirrorSourceCandidates().ToArray())
{
var autoReflections = _autoReflectionCache.GetOrAdd(sourceNsName, []);
var reflectionNsName = sourceNsName with { Namespace = ns.Name() };
Expand Down Expand Up @@ -263,18 +285,20 @@ private async Task HandleUpsert(TResource obj, CancellationToken cancellationTok
}


var isAutoSource = objProperties is { Allowed: true, AutoEnabled: true };
var isAutoSource = objProperties.IsAutoMirrorSource();

//Update the status of an auto-source
_autoSources.AddOrUpdate(objNsName, isAutoSource, (_, _) => isAutoSource);

UpdateRememberedAutoMirror(objNsName, objProperties);

//If not allowed or auto is disabled, remove the cache for auto-reflections
if (!isAutoSource) _autoReflectionCache.Remove(objNsName, out _);
if (!isAutoSource) _autoReflectionCache.TryRemove(objNsName, out _);

//If reflection is disabled, remove the reflections cache and stop reflecting
if (!objProperties.Allowed)
{
_directReflectionCache.Remove(objNsName, out _);
_directReflectionCache.TryRemove(objNsName, out _);
return;
}

Expand Down Expand Up @@ -624,6 +648,94 @@ private async Task ResourceReflect(NamespacedName sourceNsName, NamespacedName r
protected abstract Task<TResource[]> OnResourceWithNameList(string itemRefName,
CancellationToken cancellationToken);

private IEnumerable<NamespacedName> AutoMirrorSourceCandidates() =>
_autoSources.Keys.Union(_rememberedAutoMirrorSources.Keys);

private void UpdateRememberedAutoMirror(NamespacedName sourceNsName, MirroringProperties properties)
{
if (properties.IsAutoMirrorSource())
_rememberedAutoMirrorSources.TryAdd(sourceNsName, 0);
else
_rememberedAutoMirrorSources.TryRemove(sourceNsName, out _);
}

/// <summary>
/// Updates the remembered auto-mirror source set from a live API object (cluster discovery or watch).
/// </summary>
protected void RefreshRememberedAutoMirrorFromResource(TResource obj)
{
UpdateRememberedAutoMirror(obj.NamespacedName(), obj.GetMirroringProperties());
}

/// <summary>
/// One-time LIST of the cluster so auto-mirror sources are known before the informer replay
/// catches up (fresh pod or race). Default: no-op for tests or specialized mirrors.
/// </summary>
protected virtual Task DiscoverAutoMirrorSourcesFromClusterAsync(CancellationToken cancellationToken) =>
Task.CompletedTask;

private async Task EnsureClusterAutoSourcesDiscoveredOnceAsync(CancellationToken cancellationToken)
{
if (Volatile.Read(ref _clusterDiscoveryCompleted) != 0)
return;

await _clusterDiscoveryLock.WaitAsync(cancellationToken);
try
{
if (Volatile.Read(ref _clusterDiscoveryCompleted) != 0)
return;

await DiscoverAutoMirrorSourcesFromClusterAsync(cancellationToken);
Volatile.Write(ref _clusterDiscoveryCompleted, 1);
Logger.LogDebug(
"Completed one-time cluster scan for auto-mirror {ResourceKind} sources ({Count} remembered)",
typeof(TResource).Name,
_rememberedAutoMirrorSources.Count);
}
finally
{
_clusterDiscoveryLock.Release();
}
}

/// <summary>
/// Ensures auto-mirror sources are discoverable (cold start) and loaded into the cache before
/// namespace-driven reconciliation when the watch has not replayed yet.
/// </summary>
private async Task EnsureEagerAutoMirrorSourcesPrimed(CancellationToken cancellationToken)
{
await EnsureClusterAutoSourcesDiscoveredOnceAsync(cancellationToken);

var candidates = AutoMirrorSourceCandidates().ToArray();
if (candidates.Length == 0)
return;

if (candidates.All(ns => _propertiesCache.ContainsKey(ns)))
return;

await _eagerAutoMirrorPrimeLock.WaitAsync(cancellationToken);
try
{
foreach (var nsName in candidates)
{
if (_propertiesCache.ContainsKey(nsName))
continue;
var obj = await TryResourceGet(nsName, cancellationToken);
if (obj is null)
{
Logger.LogDebug("Eager auto-mirror source {Source} not available yet", nsName);
continue;
}

await HandleUpsert(obj, cancellationToken);
}
}
finally
{
_eagerAutoMirrorPrimeLock.Release();
}
}

private async Task<TResource?> TryResourceGet(NamespacedName resourceNsName,
CancellationToken cancellationToken)
{
Expand Down Expand Up @@ -700,4 +812,4 @@ private bool CanBeAutoReflectedToNamespaceCached(MirroringProperties properties,

return properties.CanBeAutoReflectedToNamespace(ns);
}
}
}
15 changes: 15 additions & 0 deletions src/ES.Kubernetes.Reflector/Mirroring/SecretMirror.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,21 @@ namespace ES.Kubernetes.Reflector.Mirroring;
public class SecretMirror(ILogger<SecretMirror> logger, IKubernetes kubernetesClient)
: ResourceMirror<V1Secret>(logger, kubernetesClient)
{
protected override async Task DiscoverAutoMirrorSourcesFromClusterAsync(
CancellationToken cancellationToken)
{
string? continueParameter = null;
do
{
var list = await Kubernetes.CoreV1.ListSecretForAllNamespacesAsync(
continueParameter: continueParameter,
cancellationToken: cancellationToken);
foreach (var secret in list.Items)
RefreshRememberedAutoMirrorFromResource(secret);
continueParameter = list.Metadata?.ContinueProperty;
} while (!string.IsNullOrEmpty(continueParameter));
}

protected override async Task<V1Secret[]> OnResourceWithNameList(string itemRefName,
CancellationToken cancellationToken) =>
[
Expand Down
26 changes: 26 additions & 0 deletions tests/ES.Kubernetes.Reflector.Tests/LabelSelectorMatchTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -297,4 +297,30 @@ public void GetLabelSelectorErrors_EmptySelectors_ReturnsEmpty()
};
Assert.Empty(props.GetLabelSelectorErrors());
}

[Fact]
public void IsAutoMirrorSource_TrueWhenAllowedAndAutoEnabled()
{
var props = new MirroringProperties
{
Allowed = true,
AutoEnabled = true,
AllowedNamespaces = "^staging$"
};
Assert.True(props.IsAutoMirrorSource());
}

[Fact]
public void IsAutoMirrorSource_FalseWhenAutoDisabled()
{
var props = new MirroringProperties { Allowed = true, AutoEnabled = false };
Assert.False(props.IsAutoMirrorSource());
}

[Fact]
public void IsAutoMirrorSource_FalseWhenNotAllowed()
{
var props = new MirroringProperties { Allowed = false, AutoEnabled = true };
Assert.False(props.IsAutoMirrorSource());
}
}