From c1090802b5abf8aafead57c0492790524cf8727a Mon Sep 17 00:00:00 2001 From: Charles d'Avernas Date: Mon, 18 Aug 2025 14:40:37 +0200 Subject: [PATCH 1/2] feat(Core): Updated the spec of the `Operator` resource by adding a new `Cleanup` property, used to configure the clean-up of managed workflow instances feat(Operator): Updated the `WorkflowInstanceController` to run a background loop to clean-up outlived workflow instances Signed-off-by: Charles d'Avernas --- .../Resources/OperatorCleanupOptions.cs | 35 ++++++++++++ .../Synapse.Core/Resources/OperatorSpec.cs | 6 ++ .../Resources/RunnerConfiguration.cs | 1 + .../Services/WorkflowInstanceController.cs | 55 ++++++++++++++++++- .../Services/WorkflowInstanceHandler.cs | 1 - 5 files changed, 96 insertions(+), 2 deletions(-) create mode 100644 src/core/Synapse.Core/Resources/OperatorCleanupOptions.cs diff --git a/src/core/Synapse.Core/Resources/OperatorCleanupOptions.cs b/src/core/Synapse.Core/Resources/OperatorCleanupOptions.cs new file mode 100644 index 000000000..0bbb2f5fa --- /dev/null +++ b/src/core/Synapse.Core/Resources/OperatorCleanupOptions.cs @@ -0,0 +1,35 @@ +// Copyright © 2024-Present The Synapse Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"), +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +namespace Synapse.Resources; + +/// <summary> +/// Represents the options used to configure the cleanup behavior of a Synapse Operator +/// </summary> +[DataContract] +public record OperatorCleanupOptions +{ + + /// <summary> + /// Gets/sets the time to live for completed workflow instances. Defaults to 7 days. If null, the operator will not delete completed workflow instances. + /// </summary> + [DataMember(Order = 1, Name = "ttl"), JsonPropertyOrder(1), JsonPropertyName("ttl"), YamlMember(Order = 1, Alias = "ttl")] + public virtual TimeSpan? Ttl { get; set; } = TimeSpan.FromDays(7); + + /// <summary> + /// Gets/sets the interval at which the operator sweeps for completed workflow instances to delete. Defaults to 5 minutes. + /// </summary> + [DataMember(Order = 2, Name = "interval"), JsonPropertyOrder(2), JsonPropertyName("interval"), YamlMember(Order = 2, Alias = "interval")] + public virtual TimeSpan Interval { get; set; } = TimeSpan.FromMinutes(5); + +} diff --git a/src/core/Synapse.Core/Resources/OperatorSpec.cs b/src/core/Synapse.Core/Resources/OperatorSpec.cs index fc420dbf3..3474c5b4c 100644 --- a/src/core/Synapse.Core/Resources/OperatorSpec.cs +++ b/src/core/Synapse.Core/Resources/OperatorSpec.cs @@ -36,4 +36,10 @@ public record OperatorSpec [DataMember(Order = 2, Name = "selector"), JsonPropertyOrder(2), JsonPropertyName("selector"), YamlMember(Order = 2, Alias = "selector")] public virtual IDictionary? Selector { get; set; } + /// <summary> + /// Gets/sets options controlling retention of completed workflow instances and cleanup sweep behavior. + /// </summary> + [DataMember(Order = 3, Name = "cleanup"), JsonPropertyOrder(3), JsonPropertyName("cleanup"), YamlMember(Order = 3, Alias = "cleanup")] + public virtual OperatorCleanupOptions? 
Cleanup { get; set; } = new(); + } diff --git a/src/core/Synapse.Core/Resources/RunnerConfiguration.cs b/src/core/Synapse.Core/Resources/RunnerConfiguration.cs index f50562648..bc537acfb 100644 --- a/src/core/Synapse.Core/Resources/RunnerConfiguration.cs +++ b/src/core/Synapse.Core/Resources/RunnerConfiguration.cs @@ -55,4 +55,5 @@ public record RunnerConfiguration /// [DataMember(Order = 5, Name = "publishLifecycleEvents"), JsonPropertyOrder(5), JsonPropertyName("publishLifecycleEvents"), YamlMember(Order = 5, Alias = "publishLifecycleEvents")] public virtual bool? PublishLifecycleEvents { get; set; } = true; + } \ No newline at end of file diff --git a/src/operator/Synapse.Operator/Services/WorkflowInstanceController.cs b/src/operator/Synapse.Operator/Services/WorkflowInstanceController.cs index b4c289039..aa9c49340 100644 --- a/src/operator/Synapse.Operator/Services/WorkflowInstanceController.cs +++ b/src/operator/Synapse.Operator/Services/WorkflowInstanceController.cs @@ -11,7 +11,6 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-using Neuroglia.Data.Infrastructure.ResourceOriented; using Neuroglia.Data.Infrastructure.Services; namespace Synapse.Operator.Services; @@ -61,6 +60,7 @@ public override async Task StartAsync(CancellationToken cancellationToken) await base.StartAsync(cancellationToken).ConfigureAwait(false); this.Operator!.Select(b => b.Resource.Spec.Selector).SubscribeAsync(this.OnResourceSelectorChangedAsync, cancellationToken: cancellationToken); await this.OnResourceSelectorChangedAsync(this.Operator!.Resource.Spec.Selector).ConfigureAwait(false); + if (this.Operator?.Resource?.Spec?.Cleanup != null)_ = Task.Run(() => this.CleanupAsync(), CancellationTokenSource.Token); } /// @@ -70,6 +70,59 @@ protected override Task ReconcileAsync(CancellationToken cancellationToken = def return base.ReconcileAsync(cancellationToken); } + /// <summary>Runs until cancellation, periodically deleting inoperative, unhandled workflow instances that ended (or, lacking an end time, were created) at or before the configured cleanup TTL cutoff</summary> + protected virtual async Task CleanupAsync() + { + while (!CancellationTokenSource.IsCancellationRequested) + { + try + { + if (this.Operator?.Resource?.Spec?.Cleanup is not { } cleanup) break; // capture the options once so a concurrent update cannot null them out mid-sweep + var cutoff = DateTimeOffset.UtcNow - cleanup.Ttl; // null when no TTL is configured, in which case the comparison below never matches + var deleted = 0; + var selectors = this.Options.LabelSelectors; + await foreach (var instance in this.Repository.GetAllAsync(labelSelectors: selectors, cancellationToken: CancellationTokenSource.Token)) + { + if (Handlers.ContainsKey(instance.GetQualifiedName())) continue; + if (instance.IsOperative) continue; + var finishedAt = instance.Status?.EndedAt ?? 
instance.Metadata.CreationTimestamp; + if (finishedAt <= cutoff) + { + try { await this.TryReleaseAsync(instance, CancellationTokenSource.Token).ConfigureAwait(false); } catch (Exception ex) { this.Logger.LogDebug(ex, "Failed to release expired workflow instance {instance} before deletion", instance.GetQualifiedName()); } + try + { + await this.Repository.RemoveAsync(instance.GetName(), instance.GetNamespace(), false, CancellationTokenSource.Token).ConfigureAwait(false); + deleted++; + } + catch (Exception ex) + { + Logger.LogWarning(ex, "Failed to delete expired workflow instance {instance}", instance.GetQualifiedName()); + } + } + } + if (deleted > 0) this.Logger.LogInformation("Deleted {count} expired workflow instance(s)", deleted); + } + catch (OperationCanceledException) when (CancellationTokenSource.Token.IsCancellationRequested) + { + break; + } + catch (Exception ex) + { + this.Logger.LogError(ex, "Instance cleanup sweep failed"); + try { await Task.Delay(TimeSpan.FromSeconds(5), CancellationTokenSource.Token).ConfigureAwait(false); } catch { } + } + try + { + var delay = (this.Operator?.Resource?.Spec?.Cleanup?.Interval is { } interval && interval > TimeSpan.Zero) ? interval : TimeSpan.FromMinutes(5); // Task.Delay rejects negative delays and a zero delay would spin, so fall back to the default + await Task.Delay(delay, CancellationTokenSource.Token).ConfigureAwait(false); + } + catch (OperationCanceledException) when (CancellationTokenSource.IsCancellationRequested) + { + break; + } + } + } + /// /// Creates a new for the specified workflow /// diff --git a/src/operator/Synapse.Operator/Services/WorkflowInstanceHandler.cs b/src/operator/Synapse.Operator/Services/WorkflowInstanceHandler.cs index d0de68eb7..a556834fb 100644 --- a/src/operator/Synapse.Operator/Services/WorkflowInstanceHandler.cs +++ b/src/operator/Synapse.Operator/Services/WorkflowInstanceHandler.cs @@ -12,7 +12,6 @@ // limitations under the License. 
using Neuroglia.Data.Infrastructure.Services; -using Neuroglia.Mediation; using System.Text; namespace Synapse.Operator.Services; From 828ed722d0d86dea6a533710940137ba4e4bafa2 Mon Sep 17 00:00:00 2001 From: Charles d'Avernas Date: Mon, 18 Aug 2025 14:40:58 +0200 Subject: [PATCH 2/2] Include changes that were left out Signed-off-by: Charles d'Avernas --- .../Synapse.Operator/Services/WorkflowInstanceController.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/operator/Synapse.Operator/Services/WorkflowInstanceController.cs b/src/operator/Synapse.Operator/Services/WorkflowInstanceController.cs index aa9c49340..8f8b8b834 100644 --- a/src/operator/Synapse.Operator/Services/WorkflowInstanceController.cs +++ b/src/operator/Synapse.Operator/Services/WorkflowInstanceController.cs @@ -60,7 +60,7 @@ public override async Task StartAsync(CancellationToken cancellationToken) await base.StartAsync(cancellationToken).ConfigureAwait(false); this.Operator!.Select(b => b.Resource.Spec.Selector).SubscribeAsync(this.OnResourceSelectorChangedAsync, cancellationToken: cancellationToken); await this.OnResourceSelectorChangedAsync(this.Operator!.Resource.Spec.Selector).ConfigureAwait(false); - if (this.Operator?.Resource?.Spec?.Cleanup != null)_ = Task.Run(() => this.CleanupAsync(), CancellationTokenSource.Token); + if (this.Operator?.Resource?.Spec?.Cleanup != null)_ = Task.Run(this.CleanupAsync, CancellationTokenSource.Token); } ///