Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
2ae8596
feat: make queue and reconciliation leader-aware
kimpenhaus May 30, 2026
8fa468a
chore: allow inheritance for LeaderElectiontype.Custom
kimpenhaus Jun 2, 2026
73621d0
Merge branch 'main' into bugfix/clear-queue-when-no-leadership
kimpenhaus Jun 2, 2026
61b583e
chore: allow ReconcileSingle to be overridden
kimpenhaus Jun 2, 2026
5dab455
Merge branch 'main' into bugfix/clear-queue-when-no-leadership
kimpenhaus Jun 3, 2026
47c6934
Merge branch 'main' into bugfix/clear-queue-when-no-leadership
kimpenhaus Jun 12, 2026
e2d3810
Merge branch 'main' into bugfix/clear-queue-when-no-leadership
kimpenhaus Jun 12, 2026
ed53402
Merge branch 'main' into bugfix/clear-queue-when-no-leadership
kimpenhaus Jun 18, 2026
b526621
Merge branch 'main' into bugfix/clear-queue-when-no-leadership
kimpenhaus Jun 18, 2026
9d8484d
feat: add operator registration validation framework
kimpenhaus Jun 20, 2026
1b3a8a1
fix(operator)!: clear queue and gate intake on leadership loss
kimpenhaus Jun 20, 2026
6d1a78a
feat(operator): add logging for queue intake suspension and leadershi…
kimpenhaus Jun 21, 2026
d1f53b6
feat(operator): handle queue drops on leadership loss and ensure dedu…
kimpenhaus Jun 21, 2026
c4085b7
feat(operator): ensure idempotent queue processing loops and proper d…
kimpenhaus Jun 21, 2026
72d561c
fix(operator): prevent ObjectDisposedException in queue loops on lead…
kimpenhaus Jun 21, 2026
1104206
docs(operator): clarify behavior of queue suspension and reconciliati…
kimpenhaus Jun 21, 2026
9c25269
Merge branch 'main' into bugfix/clear-queue-when-no-leadership
kimpenhaus Jun 21, 2026
5dffd3e
fix(operator): ensure graceful shutdown of queue processing loops and…
kimpenhaus Jun 21, 2026
74d4def
refactor(operator): implement `RestartableHostedService` for enhanced…
kimpenhaus Jun 21, 2026
dcd1f0e
docs(operator): describe behavior of queue-consumer services based on…
kimpenhaus Jun 21, 2026
49597fa
refactor(operator): centralize leadership event handling with `Leader…
kimpenhaus Jun 21, 2026
afb40a6
fix(operator): ensure proper loop restart and reconciliation metrics …
kimpenhaus Jun 21, 2026
6cae0b9
fix(operator): enhance cache tagging, loop restart logic, and backoff…
kimpenhaus Jun 22, 2026
487b7b7
fix(operator): enforce FusionCache tagging and enhance leadership fau…
kimpenhaus Jun 22, 2026
e4691ee
test(operator): add tests for backoff escalation, reset logic, and le…
kimpenhaus Jun 22, 2026
5d4f222
refactor(operator): simplify comments in `OperatorRegistrationValidat…
kimpenhaus Jun 22, 2026
9a28669
refactor(operator): remove redundant comments in `OperatorBuilder` re…
kimpenhaus Jun 22, 2026
66c6e64
fix(operator): propagate `CancellationToken` in watcher error handlin…
kimpenhaus Jun 22, 2026
82642b3
refactor(operator): remove unnecessary fields, destructor, and redund…
kimpenhaus Jun 22, 2026
d1382a2
refactor(operator): simplify comment for `TimedEntityQueue.ReadyCount`
kimpenhaus Jun 22, 2026
d24d462
Merge branch 'main' into bugfix/clear-queue-when-no-leadership
kimpenhaus Jun 22, 2026
608ce0f
refactor(operator): improve argument clarity in `TimedEntityQueue.Add…
kimpenhaus Jun 22, 2026
5f97f4a
test(operator): remove redundant comments in `LeaderAwareResourceWatc…
kimpenhaus Jun 22, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
181 changes: 166 additions & 15 deletions docs/docs/operator/advanced-configuration.mdx

Large diffs are not rendered by default.

5 changes: 3 additions & 2 deletions docs/docs/operator/building-blocks/controllers.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,9 @@ public class V1DemoEntityController(
V1DemoEntity entity,
CancellationToken cancellationToken)
{
// Schedule a follow-up reconciliation in 30 seconds without returning a result
queue(entity, ReconciliationType.Modified, ReconciliationTriggerSource.Operator,
// Schedule a follow-up reconciliation in 30 seconds without returning a result.
// The delegate returns Task<bool>; await it (false means the enqueue was dropped).
await queue(entity, ReconciliationType.Modified, ReconciliationTriggerSource.Operator,
TimeSpan.FromSeconds(30), retryCount: 0, cancellationToken);

return ReconciliationResult<V1DemoEntity>.Success(entity);
Expand Down
12 changes: 12 additions & 0 deletions docs/docs/operator/caching.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,18 @@ provided that configures the **active** cache (whichever strategy is selected).
caches individually, register named FusionCache instances directly on the DI container using the
FusionCache builder API.

:::warning[Tagging must stay enabled]
The resource watcher tags every deduplication entry with its entity type so that a leadership-aware
watcher can, on leadership loss, drop only that type's entries via FusionCache's `RemoveByTag` (without
wiping the dedup tokens of other entity types that share the same named cache). FusionCache tagging is
enabled by default — if you supply a custom cache configuration, do **not** disable it
(`FusionCacheOptions.DisableTagging`), otherwise the tagged writes throw at runtime.

If you enable `OperatorSettings.ValidateRegistrations`, this is checked at startup: a cache with tagging
disabled aborts host startup with a clear `InvalidRegistrationException` instead of failing later at
runtime.
:::

**Example: L2 cache for the default `ByGeneration` strategy**

```csharp
Expand Down
2 changes: 0 additions & 2 deletions examples/OtelOperator/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@
using KubeOps.Operator.Web.Builder;

using OpenTelemetry;
using OpenTelemetry.Metrics;
using OpenTelemetry.Trace;

const string operatorName = "otel-operator";

Expand Down
14 changes: 14 additions & 0 deletions src/KubeOps.Abstractions/Builder/OperatorSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -128,4 +128,18 @@ public sealed record OperatorSettings
/// scrape the metrics, register an OpenTelemetry exporter for the meter named <see cref="Name"/>.
/// </remarks>
public required bool EnableMetrics { get; init; }

/// <summary>
/// Indicates whether the operator validates, on host startup, that its dependency injection
/// registrations are complete and consistent with the configured
/// <see cref="LeaderElectionType"/> and <see cref="QueueStrategy"/>. Disabled by default.
/// </summary>
/// <remarks>
/// When enabled, the operator verifies — for every managed entity — that the components implied by
/// the configuration are registered. If anything is missing, host startup aborts with an
/// <c>InvalidRegistrationException</c> listing the gaps. This catches registration mistakes
/// (for example a forgotten watcher or queue consumer in a manually wired setup) that would
/// otherwise let the operator start without processing any resources.
/// </remarks>
public bool ValidateRegistrations { get; init; }
}
8 changes: 8 additions & 0 deletions src/KubeOps.Abstractions/Builder/OperatorSettingsBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,13 @@ public sealed partial class OperatorSettingsBuilder
/// </summary>
public bool EnableMetrics { get; set; } = true;

/// <summary>
/// Indicates whether the operator validates, on host startup, that its dependency injection
/// registrations are complete and consistent with the configuration. Disabled by default. See
/// <see cref="OperatorSettings.ValidateRegistrations"/> for details.
/// </summary>
public bool ValidateRegistrations { get; set; }

/// <summary>
/// Produces an immutable <see cref="OperatorSettings"/> record from the current configuration.
/// </summary>
Expand All @@ -132,6 +139,7 @@ public sealed partial class OperatorSettingsBuilder
ReconcileStrategy = ReconcileStrategy,
ParallelReconciliation = ParallelReconciliation.Build(),
EnableMetrics = EnableMetrics,
ValidateRegistrations = ValidateRegistrations,
};

[GeneratedRegex(@"(\W|_)", RegexOptions.CultureInvariant)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,21 @@ public static OperatorSettingsBuilder WithMetrics(
return builder;
}

/// <summary>
/// Sets whether the operator validates, on host startup, that its dependency injection registrations
/// are complete and consistent with the configuration. Disabled by default. See
/// <see cref="OperatorSettings.ValidateRegistrations"/> for details.
/// </summary>
/// <param name="builder">The builder to configure.</param>
/// <param name="value"><c>true</c> to enable validation (default); <c>false</c> to disable.</param>
/// <returns>The same <paramref name="builder"/> instance for chaining.</returns>
public static OperatorSettingsBuilder WithRegistrationValidation(
this OperatorSettingsBuilder builder, bool value = true)
{
builder.ValidateRegistrations = value;
return builder;
}

/// <summary>Configures parallel reconciliation settings inline via a delegate.</summary>
/// <param name="builder">The builder to configure.</param>
/// <param name="configure">An action that configures the <see cref="ParallelReconciliationSettingsBuilder"/>.</param>
Expand Down
8 changes: 7 additions & 1 deletion src/KubeOps.Abstractions/Reconciliation/Queue/EntityQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ namespace KubeOps.Abstractions.Reconciliation.Queue;
/// <param name="cancellationToken">
/// A token to monitor for cancellation requests while waiting for the queue duration to elapse.
/// </param>
/// <returns>
/// A task whose result is <see langword="true"/> if the entity was scheduled, or <see langword="false"/> if it
/// was dropped (for example because <paramref name="cancellationToken"/> was already cancelled, or the queue's
/// intake is suspended on a leadership transition). Await it to observe failures of a custom queue and to react
/// to a dropped enqueue.
/// </returns>
/// <remarks>
/// <para>
/// This delegate is injected into controllers and other components via dependency injection to enable
Expand All @@ -62,6 +68,6 @@ namespace KubeOps.Abstractions.Reconciliation.Queue;
/// <seealso cref="ReconciliationType"/>
/// <seealso cref="ReconciliationTriggerSource"/>
/// <seealso cref="IEntityQueueFactory"/>
public delegate void EntityQueue<in TEntity>(
public delegate Task<bool> EntityQueue<in TEntity>(
TEntity entity, ReconciliationType type, ReconciliationTriggerSource reconciliationTriggerSource, TimeSpan queueIn, int retryCount, CancellationToken cancellationToken)
where TEntity : IKubernetesObject<V1ObjectMeta>;
4 changes: 1 addition & 3 deletions src/KubeOps.Operator/Builder/CacheExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,7 @@ internal static class CacheExtensions
/// <returns>The modified service collection with resource watcher caching configured.</returns>
internal static IServiceCollection WithResourceWatcherEntityCaching(this IServiceCollection services, OperatorSettings settings)
{
var cacheName = settings.ReconcileStrategy == ReconcileStrategy.ByResourceVersion
? CacheConstants.CacheNames.ResourceWatcherByResourceVersion
: CacheConstants.CacheNames.ResourceWatcher;
var cacheName = CacheConstants.ResourceWatcherCacheNameFor(settings.ReconcileStrategy);

var builder = services.AddFusionCache(cacheName);

Expand Down
34 changes: 33 additions & 1 deletion src/KubeOps.Operator/Builder/OperatorBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ namespace KubeOps.Operator.Builder;

internal sealed class OperatorBuilder : IOperatorBuilder
{
private OperatorRegistrationRegistry? _registrationRegistry;

public OperatorBuilder(IServiceCollection services, OperatorSettings settings)
{
Settings = settings;
Expand All @@ -51,6 +53,8 @@ public IOperatorBuilder AddController<TImplementation, TEntity>()
Services.TryAddScoped<IEntityController<TEntity>, TImplementation>();
Services.TryAddSingleton<IReconciler<TEntity>, Reconciler<TEntity>>();

RegisterRegistrationValidation(typeof(TEntity));

// Queue
Services.TryAddTransient<IEntityQueueFactory, EntityQueueFactory>();
Services.TryAddTransient<EntityQueue<TEntity>>(services =>
Expand All @@ -59,7 +63,16 @@ public IOperatorBuilder AddController<TImplementation, TEntity>()
if (Settings.QueueStrategy == QueueStrategy.InMemory)
{
Services.TryAddSingleton<ITimedEntityQueue<TEntity>, TimedEntityQueue<TEntity>>();
Services.AddHostedService<EntityQueueBackgroundService<TEntity>>();

switch (Settings.LeaderElectionType)
{
case LeaderElectionType.None:
Services.AddHostedService<EntityQueueBackgroundService<TEntity>>();
break;
case LeaderElectionType.Single:
Services.AddHostedService<LeaderAwareEntityQueueBackgroundService<TEntity>>();
break;
}
}

// Leader Election
Expand Down Expand Up @@ -108,6 +121,8 @@ public IOperatorBuilder AddFinalizer<TImplementation, TEntity>(string identifier
services.GetRequiredService<IEventFinalizerAttacherFactory>()
.Create<TImplementation, TEntity>(identifier));

RegisterRegistrationValidation(typeof(TEntity));

return this;
}

Expand Down Expand Up @@ -165,4 +180,21 @@ private void AddOperatorBase()
Services.AddLeaderElection();
}
}

private void RegisterRegistrationValidation(Type entityType)
{
if (!Settings.ValidateRegistrations)
{
return;
}

if (_registrationRegistry is null)
{
_registrationRegistry = new(Services);
Services.AddSingleton(_registrationRegistry);
Services.AddHostedService<OperatorRegistrationValidator>();
}

_registrationRegistry.Add(entityType);
}
}
34 changes: 34 additions & 0 deletions src/KubeOps.Operator/Builder/OperatorRegistrationRegistry.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.

using Microsoft.Extensions.DependencyInjection;

namespace KubeOps.Operator.Builder;

/// <summary>
/// Bridges build-time information to the <see cref="OperatorRegistrationValidator"/> running at host
/// startup: it tracks the entity types managed by the operator and exposes the service collection so
/// the validator can inspect the registrations.
/// </summary>
/// <param name="services">The service collection the operator is registered into.</param>
internal sealed class OperatorRegistrationRegistry(IServiceCollection services)
{
private readonly HashSet<Type> _managedEntities = [];

/// <summary>
/// Gets the entity types that are managed by the operator (i.e. that have a controller registered).
/// </summary>
public IReadOnlyCollection<Type> ManagedEntities => _managedEntities;

/// <summary>
/// Gets the service collection the operator is registered into.
/// </summary>
public IServiceCollection Services => services;

/// <summary>
/// Registers an entity type as managed by the operator.
/// </summary>
/// <param name="entityType">The entity type to register.</param>
public void Add(Type entityType) => _managedEntities.Add(entityType);
}
Loading
Loading