From eaba03e0c9b5ba9a73a6b962049a5fcd9d6aaf13 Mon Sep 17 00:00:00 2001 From: stidsborg Date: Sun, 12 Apr 2026 10:40:39 +0200 Subject: [PATCH 1/3] Use FlowState directly in QueueManager QueueManager now holds a direct FlowState reference (attached by FlowsManager.AddFlow) instead of routing per-flow calls through FlowsManager and a dictionary lookup by StoredId. Removes the corresponding per-id dispatch methods from FlowsManager. Also includes an in-flight Threads -> Subflows rename on FlowState. --- .claude/settings.local.json | 3 +- .../MessagesSubscriptionTests.cs | 9 ++-- .../Messaging/TestTemplates/MessagesTests.cs | 33 +++++---------- .../CoreRuntime/FlowState.cs | 36 ++++++++-------- .../CoreRuntime/FlowsManager.cs | 41 ++++--------------- .../Invocation/InvocationHelper.cs | 4 +- .../CoreRuntime/Invocation/Invoker.cs | 4 +- .../Queuing/QueueManager.cs | 20 ++++----- 8 files changed, 54 insertions(+), 96 deletions(-) diff --git a/.claude/settings.local.json b/.claude/settings.local.json index b9ba1db78..ffcecbd79 100644 --- a/.claude/settings.local.json +++ b/.claude/settings.local.json @@ -22,7 +22,8 @@ "Bash(git commit -m \"$\\(cat <<''EOF''\nRemoved unused QueueFlag and associated tests\n\nCo-Authored-By: Claude Opus 4.5 \nEOF\n\\)\")", "Bash(git -C /Users/stidsborg/Repos/Cleipnir.ResilientFunctions diff)", "Bash(git -C /Users/stidsborg/Repos/Cleipnir.ResilientFunctions log --oneline -5)", - "Bash(git commit:*)" + "Bash(git commit:*)", + "Bash(git checkout:*)" ], "deny": [], "ask": [] diff --git a/Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessagesSubscriptionTests.cs b/Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessagesSubscriptionTests.cs index d6f646c5a..d5d00322f 100644 --- a/Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessagesSubscriptionTests.cs +++ b/Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessagesSubscriptionTests.cs @@ -570,8 +570,7 @@ protected async Task QueueManagerFailsOnMessageDeserializationError(Task DateTime.UtcNow, - SettingsWithDefaults.Default, - flowsManager + SettingsWithDefaults.Default ); flowsManager.AddFlow(workflow.StoredId, queueManager,flowTimeouts); @@ -633,8 +632,7 @@ protected async Task RegisteredTimeoutIsRemovedWhenPullingMessage(Task DateTime.UtcNow, - SettingsWithDefaults.Default, - flowsManager + SettingsWithDefaults.Default ); flowsManager.AddFlow(workflow.StoredId, queueManager,minimumTimeout); @@ -694,8 +692,7 @@ protected async Task PullEnvelopeReturnsEnvelopeWithReceiverAndSender(Task DateTime.UtcNow, - SettingsWithDefaults.Default, - flowsManager + SettingsWithDefaults.Default ); flowsManager.AddFlow(workflow.StoredId, queueManager,flowTimeouts); diff --git a/Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessagesTests.cs b/Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessagesTests.cs index daad1c5a7..2557c8f05 100644 --- a/Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessagesTests.cs +++ b/Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessagesTests.cs @@ -45,8 +45,7 @@ protected async Task MessagesSunshineScenario(Task functionStore unhandledExceptionHandler, flowTimeouts, () => DateTime.UtcNow, - SettingsWithDefaults.Default, - flowsManager + SettingsWithDefaults.Default ); flowsManager.AddFlow(workflow.StoredId, queueManager,flowTimeouts); @@ -97,8 +96,7 @@ protected async Task QueueClientReturnsNullAfterTimeout(Task fun unhandledExceptionHandler, flowTimeouts, () => DateTime.UtcNow, - SettingsWithDefaults.Default, - flowsManager + SettingsWithDefaults.Default ); flowsManager.AddFlow(workflow.StoredId, queueManager,flowTimeouts); @@ -144,8 +142,7 @@ protected async Task MessagesFirstOfTypesReturnsNoneForFirstOfTypesOnTimeout(Tas unhandledExceptionHandler, flowTimeouts, () => DateTime.UtcNow, - SettingsWithDefaults.Default, - flowsManager + SettingsWithDefaults.Default ); flowsManager.AddFlow(workflow.StoredId, queueManager,flowTimeouts); @@ -196,8 +193,7 @@ protected async Task MessagesFirstOfTypesReturnsFirstForFirstOfTypesOnFirst(Task unhandledExceptionHandler, flowTimeouts, () => DateTime.UtcNow, - SettingsWithDefaults.Default, - flowsManager + SettingsWithDefaults.Default ); flowsManager.AddFlow(workflow.StoredId, queueManager,flowTimeouts); @@ -249,8 +245,7 @@ protected async Task MessagesFirstOfTypesReturnsSecondForFirstOfTypesOnSecond(Ta unhandledExceptionHandler, flowTimeouts, () => DateTime.UtcNow, - SettingsWithDefaults.Default, - flowsManager + SettingsWithDefaults.Default ); flowsManager.AddFlow(workflow.StoredId, queueManager,flowTimeouts); @@ -303,8 +298,7 @@ protected async Task SecondEventWithExistingIdempotencyKeyIsIgnored(Task DateTime.UtcNow, - SettingsWithDefaults.Default, - flowsManager + SettingsWithDefaults.Default ); flowsManager.AddFlow(workflow.StoredId, queueManager,flowTimeouts); @@ -359,8 +353,7 @@ protected async Task QueueClientCanPullMultipleMessages(Task fun unhandledExceptionHandler, flowTimeouts, () => DateTime.UtcNow, - SettingsWithDefaults.Default, - flowsManager + SettingsWithDefaults.Default ); flowsManager.AddFlow(workflow.StoredId, queueManager,flowTimeouts); @@ -413,8 +406,7 @@ async Task (workflow) => unhandledExceptionHandler, flowTimeouts, () => DateTime.UtcNow, - SettingsWithDefaults.Default with { MessagesDefaultMaxWaitForCompletion = TimeSpan.FromMinutes(1) }, - flowsManager + SettingsWithDefaults.Default with { MessagesDefaultMaxWaitForCompletion = TimeSpan.FromMinutes(1) } ); flowsManager.AddFlow(workflow.StoredId, queueManager,flowTimeouts); @@ -479,8 +471,7 @@ async Task (workflow) => unhandledExceptionHandler, flowTimeouts, () => DateTime.UtcNow, - SettingsWithDefaults.Default, - flowsManager + SettingsWithDefaults.Default ); flowsManager.AddFlow(workflow.StoredId, queueManager,flowTimeouts); @@ -547,8 +538,7 @@ async Task (workflow) => unhandledExceptionHandler, flowTimeouts, () => DateTime.UtcNow, - SettingsWithDefaults.Default, - flowsManager + SettingsWithDefaults.Default ); flowsManager.AddFlow(workflow.StoredId, queueManager,flowTimeouts); @@ -577,8 +567,7 @@ async Task (workflow) => unhandledExceptionHandler, flowTimeouts, () => DateTime.UtcNow, - SettingsWithDefaults.Default, - flowsManager + SettingsWithDefaults.Default ); flowsManager.AddFlow(workflow.StoredId, queueManager,flowTimeouts); diff --git a/Core/Cleipnir.ResilientFunctions/CoreRuntime/FlowState.cs b/Core/Cleipnir.ResilientFunctions/CoreRuntime/FlowState.cs index 066c45523..a0205de79 100644 --- a/Core/Cleipnir.ResilientFunctions/CoreRuntime/FlowState.cs +++ b/Core/Cleipnir.ResilientFunctions/CoreRuntime/FlowState.cs @@ -13,8 +13,8 @@ public class FlowState public StoredId Id { get; } public QueueManager QueueManager { get; } - public int Threads { get; private set; } - public int WaitingThreads { get; private set; } + public int Subflows { get; private set; } + public int WaitingSubflows { get; private set; } public FlowTimeouts Timeouts { get; } public AsyncSignal InterruptSignal { get; } = new(); public bool Suspended { get; private set; } @@ -23,43 +23,43 @@ public class FlowState public FlowState( StoredId id, QueueManager queueManager, - int threads, - int waitingThreads, + int subflows, + int waitingSubflows, FlowTimeouts timeouts) { Id = id; QueueManager = queueManager; - Threads = threads; - WaitingThreads = waitingThreads; + Subflows = subflows; + WaitingSubflows = waitingSubflows; Timeouts = timeouts; SuspendedTask = _suspendedTcs.Task; } - public void NewThreadStarted() + public void SubflowStarted() { lock (_lock) - Threads++; + Subflows++; } - public void ThreadCompleted() + public void SubflowCompleted() { lock (_lock) - Threads--; + Subflows--; } - public void ThreadSuspended() + public void SubflowWaiting() { lock (_lock) - WaitingThreads++; + WaitingSubflows++; } - public bool ResumeThread() + public bool ResumeSubflow() { lock (_lock) if (Suspended) return false; else - WaitingThreads--; + WaitingSubflows--; return true; } @@ -67,12 +67,10 @@ public bool ResumeThread() public void Interrupt() { lock (_lock) - { if (Suspended) return; - - WaitingThreads = 0; - } + else + WaitingSubflows = 0; InterruptSignal.Fire(); } @@ -80,7 +78,7 @@ public void Interrupt() public bool Suspend() { lock (_lock) - if (Threads == WaitingThreads) + if (Subflows == WaitingSubflows) Suspended = true; else return false; diff --git a/Core/Cleipnir.ResilientFunctions/CoreRuntime/FlowsManager.cs b/Core/Cleipnir.ResilientFunctions/CoreRuntime/FlowsManager.cs index bba1b18b8..c6d850c77 100644 --- a/Core/Cleipnir.ResilientFunctions/CoreRuntime/FlowsManager.cs +++ b/Core/Cleipnir.ResilientFunctions/CoreRuntime/FlowsManager.cs @@ -47,7 +47,11 @@ private async Task TimeoutCheckLoop() public FlowState AddFlow(StoredId id, QueueManager queueManager, FlowTimeouts timeouts) { lock (_lock) - return _dict[id] = new FlowState(id, queueManager, threads: 1, waitingThreads: 0, timeouts); + { + var flowState = new FlowState(id, queueManager, subflows: 1, waitingSubflows: 0, timeouts); + queueManager.AttachFlowState(flowState); + return _dict[id] = flowState; + } } public void RemoveFlow(StoredId id, FlowState flowState) @@ -76,45 +80,14 @@ public void StartThread(StoredId id) { lock (_lock) if (_dict.TryGetValue(id, out var flowState)) - flowState.NewThreadStarted(); + flowState.SubflowStarted(); } public void CompleteThread(StoredId id) { lock (_lock) if (_dict.TryGetValue(id, out var flowState)) - flowState.ThreadCompleted(); - } - - public bool ThreadResumed(StoredId id) - { - lock (_lock) - if (_dict.TryGetValue(id, out var flowState)) - return flowState.ResumeThread(); - - return false; - } - - public void SuspendThread(StoredId id) - { - lock (_lock) - if (_dict.TryGetValue(id, out var flowState)) - flowState.ThreadSuspended(); - } - - public Task GetInterruptedSignal(StoredId id) - { - lock (_lock) - return _dict.TryGetValue(id, out var flowState) - ? flowState.InterruptSignal.Wait() - : Task.CompletedTask; - } - - public void SignalInterrupt(StoredId id) - { - lock (_lock) - if (_dict.TryGetValue(id, out var flowState)) - flowState.InterruptSignal.Fire(); + flowState.SubflowCompleted(); } [DoesNotReturn] diff --git a/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/InvocationHelper.cs b/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/InvocationHelper.cs index 5970a5cab..5baee51e2 100644 --- a/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/InvocationHelper.cs +++ b/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/InvocationHelper.cs @@ -433,8 +433,8 @@ public async Task CreateExistingSemaphores(FlowId flowId) public DistributedSemaphores CreateSemaphores(StoredId storedId, Effect effect, FlowsManager flowsManager) => new(effect, _functionStore.SemaphoreStore, storedId, Interrupt, flowsManager); - public QueueManager CreateQueueManager(FlowId flowId, StoredId storedId, Effect effect, FlowTimeouts timeouts, UnhandledExceptionHandler unhandledExceptionHandler, FlowsManager flowsManager) - => new(flowId, storedId, _functionStore.MessageStore, Serializer, effect, unhandledExceptionHandler, timeouts, UtcNow, _settings, flowsManager); + public QueueManager CreateQueueManager(FlowId flowId, StoredId storedId, Effect effect, FlowTimeouts timeouts, UnhandledExceptionHandler unhandledExceptionHandler) + => new(flowId, storedId, _functionStore.MessageStore, Serializer, effect, unhandledExceptionHandler, timeouts, UtcNow, _settings); public StoredId MapToStoredId(FlowId flowId) => StoredId.Create(_storedType, flowId.Instance.Value); diff --git a/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Invoker.cs b/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Invoker.cs index 4fb5d13d7..09f6efef0 100644 --- a/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Invoker.cs +++ b/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Invoker.cs @@ -289,7 +289,7 @@ await _invocationHelper.PersistFunctionInStore( var correlations = _invocationHelper.CreateCorrelations(flowId); var semaphores = _invocationHelper.CreateSemaphores(storedId, effect, _flowsManager); - var queueManager = _invocationHelper.CreateQueueManager(flowId, storedId, effect, flowTimeouts, _unhandledExceptionHandler, _flowsManager); + var queueManager = _invocationHelper.CreateQueueManager(flowId, storedId, effect, flowTimeouts, _unhandledExceptionHandler); disposables.Add(queueManager); var messageWriter = _invocationHelper.CreateMessageWriter(storedId); var workflow = new Workflow(flowId, storedId, effect, _utilities, correlations, semaphores, queueManager, _invocationHelper.UtcNow, messageWriter, _flowsManager); @@ -340,7 +340,7 @@ private async Task PrepareForReInvocation(StoredId storedI var correlations = _invocationHelper.CreateCorrelations(flowId); var semaphores = _invocationHelper.CreateSemaphores(storedId, effect, _flowsManager); - var queueManager = _invocationHelper.CreateQueueManager(flowId, storedId, effect, flowTimeouts, _unhandledExceptionHandler, _flowsManager); + var queueManager = _invocationHelper.CreateQueueManager(flowId, storedId, effect, flowTimeouts, _unhandledExceptionHandler); disposables.Add(queueManager); var messageWriter = _invocationHelper.CreateMessageWriter(storedId); diff --git a/Core/Cleipnir.ResilientFunctions/Queuing/QueueManager.cs b/Core/Cleipnir.ResilientFunctions/Queuing/QueueManager.cs index 3802a1261..caf4d7b4a 100644 --- a/Core/Cleipnir.ResilientFunctions/Queuing/QueueManager.cs +++ b/Core/Cleipnir.ResilientFunctions/Queuing/QueueManager.cs @@ -6,6 +6,7 @@ using Cleipnir.ResilientFunctions.CoreRuntime; using Cleipnir.ResilientFunctions.CoreRuntime.Serialization; using Cleipnir.ResilientFunctions.Domain; +using Cleipnir.ResilientFunctions.Domain.Exceptions.Commands; using Cleipnir.ResilientFunctions.Messaging; using Cleipnir.ResilientFunctions.Storage; @@ -23,12 +24,14 @@ public class QueueManager( FlowTimeouts timeouts, UtcNow utcNow, SettingsWithDefaults settings, - FlowsManager flowsManager, int maxIdempotencyKeyCount = 100, TimeSpan? maxIdempotencyKeyTtl = null) : IDisposable { private readonly Lock _lock = new(); + private FlowState _flowState = null!; + + public void AttachFlowState(FlowState flowState) => _flowState = flowState; private readonly EffectId _toRemoveNextId = new([-1, 0]); private readonly EffectId _idempotencyKeysId = new([-1, -1]); @@ -151,14 +154,14 @@ public async Task FetchMessagesOnce() } finally { - flowsManager.SignalInterrupt(storedId); + _flowState.InterruptSignal.Fire(); _semaphoreSlim.Release(); } } private (MessageData? Matched, int PositionToRemoveIndex, Task PulseTask) TryTakeMessage(MessagePredicate predicate) { - var interruptedSignal = flowsManager.GetInterruptedSignal(storedId); + var interruptedSignal = _flowState.InterruptSignal.Wait(); lock (_lock) { @@ -272,20 +275,17 @@ public async Task AfterFlush() return matched.Envelope; } - flowsManager.SuspendThread(storedId); + _flowState.SubflowWaiting(); await Task.WhenAny(interruptSignal, timeoutTask, maxWaitTask); - var success = flowsManager.ThreadResumed(storedId); + var success = _flowState.ResumeSubflow(); if (!success) await new TaskCompletionSource().Task; - + if (timeoutTask.IsCompleted) return null; if (maxWaitTask.IsCompleted) - { - await flowsManager.Suspend(storedId); - return null; - } + throw new SuspendInvocationException(); } } From 074891ec7c46231505c28008fdbc8cbfd22f0ea2 Mon Sep 17 00:00:00 2001 From: stidsborg Date: Sun, 12 Apr 2026 11:16:36 +0200 Subject: [PATCH 2/3] Inject FlowState into QueueManager via constructor Break the FlowState <-> QueueManager construction cycle by dropping QueueManager from FlowState. FlowsManager now exposes CreateFlowState so callers can build FlowState first, pass it into QueueManager via ctor, and then register it with AddFlow(flowState). Separates the two interrupt signals: - QueueManager-local _interruptSignal fires when new messages arrive (woken by Subscribe). - flowState.InterruptSignal fires on external interrupt and is picked up by QueueManager's Initialize loop to trigger FetchMessagesOnce. FlowsManager.Interrupt no longer reaches into QueueManager; it just fires flowState.Interrupt(). --- .claude/settings.local.json | 3 +- .../MessagesSubscriptionTests.cs | 12 +++-- .../Messaging/TestTemplates/MessagesTests.cs | 44 ++++++++++++++----- .../CoreRuntime/FlowState.cs | 4 -- .../CoreRuntime/FlowsManager.cs | 23 ++++------ .../Invocation/InvocationHelper.cs | 4 +- .../CoreRuntime/Invocation/Invoker.cs | 36 +++++++-------- .../Queuing/QueueManager.cs | 26 +++++++---- 8 files changed, 87 insertions(+), 65 deletions(-) diff --git a/.claude/settings.local.json b/.claude/settings.local.json index ffcecbd79..d3b9349f0 100644 --- a/.claude/settings.local.json +++ b/.claude/settings.local.json @@ -23,7 +23,8 @@ "Bash(git -C /Users/stidsborg/Repos/Cleipnir.ResilientFunctions diff)", "Bash(git -C /Users/stidsborg/Repos/Cleipnir.ResilientFunctions log --oneline -5)", "Bash(git commit:*)", - "Bash(git checkout:*)" + "Bash(git checkout:*)", + "Bash(git push:*)" ], "deny": [], "ask": [] diff --git a/Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessagesSubscriptionTests.cs b/Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessagesSubscriptionTests.cs index d5d00322f..b4d71eff1 100644 --- a/Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessagesSubscriptionTests.cs +++ b/Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessagesSubscriptionTests.cs @@ -561,19 +561,21 @@ protected async Task QueueManagerFailsOnMessageDeserializationError(Task DateTime.UtcNow); + var flowState = flowsManager.CreateFlowState(workflow.StoredId, flowTimeouts); var queueManager = new QueueManager( workflow.FlowId, workflow.StoredId, functionStore.MessageStore, exceptionThrowingSerializer, workflow.Effect, + flowState, unhandledExceptionHandler, flowTimeouts, () => DateTime.UtcNow, SettingsWithDefaults.Default ); - flowsManager.AddFlow(workflow.StoredId, queueManager,flowTimeouts); + flowsManager.AddFlow(flowState); var queueClient = await queueManager.CreateQueueClient(); var message = await queueClient.Pull( @@ -623,19 +625,21 @@ protected async Task RegisteredTimeoutIsRemovedWhenPullingMessage(Task DateTime.UtcNow); + var flowState = flowsManager.CreateFlowState(workflow.StoredId, minimumTimeout); var queueManager = new QueueManager( workflow.FlowId, workflow.StoredId, functionStore.MessageStore, DefaultSerializer.Instance, workflow.Effect, + flowState, unhandledExceptionHandler, minimumTimeout, () => DateTime.UtcNow, SettingsWithDefaults.Default ); - flowsManager.AddFlow(workflow.StoredId, queueManager,minimumTimeout); + flowsManager.AddFlow(flowState); var queueClient = await queueManager.CreateQueueClient(); @@ -683,19 +687,21 @@ protected async Task PullEnvelopeReturnsEnvelopeWithReceiverAndSender(Task DateTime.UtcNow); + var flowState = flowsManager.CreateFlowState(workflow.StoredId, flowTimeouts); var queueManager = new QueueManager( workflow.FlowId, workflow.StoredId, functionStore.MessageStore, DefaultSerializer.Instance, workflow.Effect, + flowState, unhandledExceptionHandler, flowTimeouts, () => DateTime.UtcNow, SettingsWithDefaults.Default ); - flowsManager.AddFlow(workflow.StoredId, queueManager,flowTimeouts); + flowsManager.AddFlow(flowState); var queueClient = await queueManager.CreateQueueClient(); // Pull envelope for specific receiver diff --git a/Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessagesTests.cs b/Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessagesTests.cs index 2557c8f05..5c17cc8e1 100644 --- a/Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessagesTests.cs +++ b/Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessagesTests.cs @@ -36,19 +36,21 @@ protected async Task MessagesSunshineScenario(Task functionStore var flowTimeouts = new FlowTimeouts(); var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow); + var flowState = flowsManager.CreateFlowState(workflow.StoredId, flowTimeouts); var queueManager = new QueueManager( workflow.FlowId, workflow.StoredId, functionStore.MessageStore, DefaultSerializer.Instance, workflow.Effect, + flowState, unhandledExceptionHandler, flowTimeouts, () => DateTime.UtcNow, SettingsWithDefaults.Default ); - flowsManager.AddFlow(workflow.StoredId, queueManager,flowTimeouts); + flowsManager.AddFlow(flowState); queueClient = await queueManager.CreateQueueClient(); var message = await queueClient.Pull(workflow, workflow.Effect.CreateNextImplicitId()); @@ -87,19 +89,21 @@ protected async Task QueueClientReturnsNullAfterTimeout(Task fun var flowTimeouts = new FlowTimeouts(); var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow); + var flowState = flowsManager.CreateFlowState(workflow.StoredId, flowTimeouts); var queueManager = new QueueManager( workflow.FlowId, workflow.StoredId, functionStore.MessageStore, DefaultSerializer.Instance, workflow.Effect, + flowState, unhandledExceptionHandler, flowTimeouts, () => DateTime.UtcNow, SettingsWithDefaults.Default ); - flowsManager.AddFlow(workflow.StoredId, queueManager,flowTimeouts); + flowsManager.AddFlow(flowState); var queueClient = await queueManager.CreateQueueClient(); var message = await queueClient.Pull(workflow, workflow.Effect.CreateNextImplicitId(), TimeSpan.FromMilliseconds(100)); @@ -133,19 +137,21 @@ protected async Task MessagesFirstOfTypesReturnsNoneForFirstOfTypesOnTimeout(Tas var flowTimeouts = new FlowTimeouts(); var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow); + var flowState = flowsManager.CreateFlowState(workflow.StoredId, flowTimeouts); var queueManager = new QueueManager( workflow.FlowId, workflow.StoredId, functionStore.MessageStore, DefaultSerializer.Instance, workflow.Effect, + flowState, unhandledExceptionHandler, flowTimeouts, () => DateTime.UtcNow, SettingsWithDefaults.Default ); - flowsManager.AddFlow(workflow.StoredId, queueManager,flowTimeouts); + flowsManager.AddFlow(flowState); var queueClient = await queueManager.CreateQueueClient(); var message = await queueClient.Pull( workflow, @@ -184,19 +190,21 @@ protected async Task MessagesFirstOfTypesReturnsFirstForFirstOfTypesOnFirst(Task var flowTimeouts = new FlowTimeouts(); var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow); + var flowState = flowsManager.CreateFlowState(workflow.StoredId, flowTimeouts); var queueManager = new QueueManager( workflow.FlowId, workflow.StoredId, functionStore.MessageStore, DefaultSerializer.Instance, workflow.Effect, + flowState, unhandledExceptionHandler, flowTimeouts, () => DateTime.UtcNow, SettingsWithDefaults.Default ); - flowsManager.AddFlow(workflow.StoredId, queueManager,flowTimeouts); + flowsManager.AddFlow(flowState); var queueClient = await queueManager.CreateQueueClient(); var message = await queueClient.Pull( workflow, @@ -236,19 +244,21 @@ protected async Task MessagesFirstOfTypesReturnsSecondForFirstOfTypesOnSecond(Ta var flowTimeouts = new FlowTimeouts(); var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow); + var flowState = flowsManager.CreateFlowState(workflow.StoredId, flowTimeouts); var queueManager = new QueueManager( workflow.FlowId, workflow.StoredId, functionStore.MessageStore, DefaultSerializer.Instance, workflow.Effect, + flowState, unhandledExceptionHandler, flowTimeouts, () => DateTime.UtcNow, SettingsWithDefaults.Default ); - flowsManager.AddFlow(workflow.StoredId, queueManager,flowTimeouts); + flowsManager.AddFlow(flowState); var queueClient = await queueManager.CreateQueueClient(); var message = await queueClient.Pull( workflow, @@ -289,19 +299,21 @@ protected async Task SecondEventWithExistingIdempotencyKeyIsIgnored(Task DateTime.UtcNow); + var flowState = flowsManager.CreateFlowState(workflow.StoredId, flowTimeouts); var queueManager = new QueueManager( workflow.FlowId, workflow.StoredId, functionStore.MessageStore, DefaultSerializer.Instance, workflow.Effect, + flowState, unhandledExceptionHandler, flowTimeouts, () => DateTime.UtcNow, SettingsWithDefaults.Default ); - flowsManager.AddFlow(workflow.StoredId, queueManager,flowTimeouts); + flowsManager.AddFlow(flowState); var queueClient = await queueManager.CreateQueueClient(); var message1 = await queueClient.Pull(workflow, workflow.Effect.CreateNextImplicitId()); var message2 = await queueClient.Pull(workflow, workflow.Effect.CreateNextImplicitId()); @@ -344,19 +356,21 @@ protected async Task QueueClientCanPullMultipleMessages(Task fun var flowTimeouts = new FlowTimeouts(); var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow); + var flowState = flowsManager.CreateFlowState(workflow.StoredId, flowTimeouts); var queueManager = new QueueManager( workflow.FlowId, workflow.StoredId, functionStore.MessageStore, DefaultSerializer.Instance, workflow.Effect, + flowState, unhandledExceptionHandler, flowTimeouts, () => DateTime.UtcNow, SettingsWithDefaults.Default ); - flowsManager.AddFlow(workflow.StoredId, queueManager,flowTimeouts); + flowsManager.AddFlow(flowState); var queueClient = await queueManager.CreateQueueClient(); var message1 = await queueClient.Pull(workflow, workflow.Effect.CreateNextImplicitId()); @@ -397,19 +411,21 @@ async Task (workflow) => var flowTimeouts = new FlowTimeouts(); var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow); + var flowState = flowsManager.CreateFlowState(workflow.StoredId, flowTimeouts); var queueManager = new QueueManager( workflow.FlowId, workflow.StoredId, functionStore.MessageStore, DefaultSerializer.Instance, workflow.Effect, + flowState, unhandledExceptionHandler, flowTimeouts, () => DateTime.UtcNow, SettingsWithDefaults.Default with { MessagesDefaultMaxWaitForCompletion = TimeSpan.FromMinutes(1) } ); - flowsManager.AddFlow(workflow.StoredId, queueManager,flowTimeouts); + flowsManager.AddFlow(flowState); var queueClient = await queueManager.CreateQueueClient(); lock (queueClients) queueClients[workflow.FlowId.Instance.Value] = queueClient; @@ -462,19 +478,21 @@ async Task (workflow) => var flowTimeouts = new FlowTimeouts(); var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow); + var flowState = flowsManager.CreateFlowState(workflow.StoredId, flowTimeouts); var queueManager = new QueueManager( workflow.FlowId, workflow.StoredId, functionStore.MessageStore, DefaultSerializer.Instance, workflow.Effect, + flowState, unhandledExceptionHandler, flowTimeouts, () => DateTime.UtcNow, SettingsWithDefaults.Default ); - flowsManager.AddFlow(workflow.StoredId, queueManager,flowTimeouts); + flowsManager.AddFlow(flowState); var queueClient = await queueManager.CreateQueueClient(); while (true) @@ -529,19 +547,21 @@ async Task (workflow) => var flowTimeouts = new FlowTimeouts(); var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow); + var flowState = flowsManager.CreateFlowState(workflow.StoredId, flowTimeouts); var queueManager = new QueueManager( workflow.FlowId, workflow.StoredId, functionStore.MessageStore, DefaultSerializer.Instance, workflow.Effect, + flowState, unhandledExceptionHandler, flowTimeouts, () => DateTime.UtcNow, SettingsWithDefaults.Default ); - flowsManager.AddFlow(workflow.StoredId, queueManager,flowTimeouts); + flowsManager.AddFlow(flowState); var queueClient = await queueManager.CreateQueueClient(); for (var i = 0; i < 10; i++) @@ -558,19 +578,21 @@ async Task (workflow) => var flowTimeouts = new FlowTimeouts(); var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow); + var flowState = flowsManager.CreateFlowState(workflow.StoredId, flowTimeouts); var queueManager = new QueueManager( workflow.FlowId, workflow.StoredId, functionStore.MessageStore, DefaultSerializer.Instance, workflow.Effect, + flowState, unhandledExceptionHandler, flowTimeouts, () => DateTime.UtcNow, SettingsWithDefaults.Default ); - flowsManager.AddFlow(workflow.StoredId, queueManager,flowTimeouts); + flowsManager.AddFlow(flowState); var queueClient = await queueManager.CreateQueueClient(); for (var i = 0; i < 10; i++) diff --git a/Core/Cleipnir.ResilientFunctions/CoreRuntime/FlowState.cs b/Core/Cleipnir.ResilientFunctions/CoreRuntime/FlowState.cs index a0205de79..51483e830 100644 --- a/Core/Cleipnir.ResilientFunctions/CoreRuntime/FlowState.cs +++ b/Core/Cleipnir.ResilientFunctions/CoreRuntime/FlowState.cs @@ -1,7 +1,6 @@ using System.Threading; using System.Threading.Tasks; using Cleipnir.ResilientFunctions.Helpers; -using Cleipnir.ResilientFunctions.Queuing; using Cleipnir.ResilientFunctions.Storage; namespace Cleipnir.ResilientFunctions.CoreRuntime; @@ -12,7 +11,6 @@ public class FlowState private readonly TaskCompletionSource _suspendedTcs = new(); public StoredId Id { get; } - public QueueManager QueueManager { get; } public int Subflows { get; private set; } public int WaitingSubflows { get; private set; } public FlowTimeouts Timeouts { get; } @@ -22,13 +20,11 @@ public class FlowState public FlowState( StoredId id, - QueueManager queueManager, int subflows, int waitingSubflows, FlowTimeouts timeouts) { Id = id; - QueueManager = queueManager; Subflows = subflows; WaitingSubflows = waitingSubflows; Timeouts = timeouts; diff --git a/Core/Cleipnir.ResilientFunctions/CoreRuntime/FlowsManager.cs b/Core/Cleipnir.ResilientFunctions/CoreRuntime/FlowsManager.cs index c6d850c77..1b5b26156 100644 --- a/Core/Cleipnir.ResilientFunctions/CoreRuntime/FlowsManager.cs +++ b/Core/Cleipnir.ResilientFunctions/CoreRuntime/FlowsManager.cs @@ -4,7 +4,6 @@ using System.Threading; using System.Threading.Tasks; using Cleipnir.ResilientFunctions.Domain.Exceptions.Commands; -using Cleipnir.ResilientFunctions.Queuing; using Cleipnir.ResilientFunctions.Storage; namespace Cleipnir.ResilientFunctions.CoreRuntime; @@ -44,14 +43,13 @@ private async Task TimeoutCheckLoop() public void Dispose() => _disposed = true; - public FlowState AddFlow(StoredId id, QueueManager queueManager, FlowTimeouts timeouts) + public FlowState CreateFlowState(StoredId id, FlowTimeouts timeouts) + => new(id, subflows: 1, waitingSubflows: 0, timeouts); + + public void AddFlow(FlowState flowState) { lock (_lock) - { - var flowState = new FlowState(id, queueManager, subflows: 1, waitingSubflows: 0, timeouts); - queueManager.AttachFlowState(flowState); - return _dict[id] = flowState; - } + _dict[flowState.Id] = flowState; } public void RemoveFlow(StoredId id, FlowState flowState) @@ -64,16 +62,11 @@ public void RemoveFlow(StoredId id, FlowState flowState) public async Task Interrupt(IReadOnlyList ids) { await _functionStore.ResetInterrupted(ids); - + lock (_lock) foreach (var id in ids) - { - if (!_dict.TryGetValue(id, out var flowState)) - continue; - - flowState.Interrupt(); - Task.Run(() => flowState.QueueManager.FetchMessagesOnce()); - } + if (_dict.TryGetValue(id, out var flowState)) + flowState.Interrupt(); } public void StartThread(StoredId id) diff --git a/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/InvocationHelper.cs b/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/InvocationHelper.cs index 5baee51e2..ef0a08026 100644 --- a/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/InvocationHelper.cs +++ b/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/InvocationHelper.cs @@ -433,8 +433,8 @@ public async Task CreateExistingSemaphores(FlowId flowId) public DistributedSemaphores CreateSemaphores(StoredId storedId, Effect effect, FlowsManager flowsManager) => new(effect, _functionStore.SemaphoreStore, storedId, Interrupt, flowsManager); - public QueueManager CreateQueueManager(FlowId flowId, StoredId storedId, Effect effect, FlowTimeouts timeouts, UnhandledExceptionHandler unhandledExceptionHandler) - => new(flowId, storedId, _functionStore.MessageStore, Serializer, effect, unhandledExceptionHandler, timeouts, UtcNow, _settings); + public QueueManager CreateQueueManager(FlowId flowId, StoredId storedId, Effect effect, FlowState flowState, FlowTimeouts timeouts, UnhandledExceptionHandler unhandledExceptionHandler) + => new(flowId, storedId, _functionStore.MessageStore, Serializer, effect, flowState, unhandledExceptionHandler, timeouts, UtcNow, _settings); public StoredId MapToStoredId(FlowId flowId) => StoredId.Create(_storedType, flowId.Instance.Value); diff --git a/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Invoker.cs b/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Invoker.cs index 09f6efef0..d6578f3ce 100644 --- a/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Invoker.cs +++ b/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Invoker.cs @@ -54,7 +54,7 @@ public async Task> ScheduleInvoke(FlowInstance flowInsta if (parentWorkflow.Effect.Contains(scheduledAlreadyParentId!)) return _invocationHelper.CreateInnerScheduled([flowId], parentWorkflow, detach); - var (created, workflow, disposables, queueManager, timeouts, storageSession) = await PrepareForInvocation(flowId, storedId, param, parentWorkflow?.StoredId, initialState); + var (created, workflow, disposables, queueManager, flowState, timeouts, storageSession) = await PrepareForInvocation(flowId, storedId, param, parentWorkflow?.StoredId, initialState); await (parentWorkflow?.Effect.Upsert(scheduledAlreadyParentId!, true, alias: null, flush: false) ?? Task.CompletedTask); CurrentFlow._workflow.Value = workflow; @@ -62,11 +62,7 @@ public async Task> ScheduleInvoke(FlowInstance flowInsta return _invocationHelper.CreateInnerScheduled([flowId], parentWorkflow, detach); var tcs = new TaskCompletionSource(); - var flowState = _flowsManager.AddFlow( - storedId, - queueManager, - timeouts - ); + _flowsManager.AddFlow(flowState); _ = flowState.SuspendedTask.ContinueWith(_ => tcs.TrySetException(new InvocationSuspendedException(flowId))); _ = Task.Run(async () => { @@ -158,15 +154,11 @@ public Task> BulkSchedule(IEnumerable> public async Task> ScheduleRestart(StoredId storedId) { - var (inner, param, humanInstanceId, workflow, disposables, queueManager, timeouts, parent, storageSession) = await PrepareForReInvocation(storedId); + var (inner, param, humanInstanceId, workflow, disposables, queueManager, flowState, timeouts, parent, storageSession) = await PrepareForReInvocation(storedId); var flowId = new FlowId(_flowType, humanInstanceId); var tcs = new TaskCompletionSource(); - var flowState = _flowsManager.AddFlow( - storedId, - queueManager, - timeouts - ); + _flowsManager.AddFlow(flowState); _ = flowState.SuspendedTask.ContinueWith(_ => tcs.TrySetException(new InvocationSuspendedException(flowId))); _ = Task.Run(async () => { @@ -217,15 +209,11 @@ public async Task> ScheduleRestart(StoredId storedId) internal async Task ScheduleRestart(StoredId storedId, RestartedFunction rf, Action onCompletion) { - var (inner, param, humanInstanceId, workflow, disposables, queueManager, timeouts, parent, storageSession) = await PrepareForReInvocation(storedId, rf); + var (inner, param, humanInstanceId, workflow, disposables, queueManager, flowState, timeouts, parent, storageSession) = await PrepareForReInvocation(storedId, rf); var flowId = new FlowId(_flowType, humanInstanceId); var tcs = new TaskCompletionSource(); - var flowState = _flowsManager.AddFlow( - storedId, - queueManager, - timeouts - ); + _flowsManager.AddFlow(flowState); _ = flowState.SuspendedTask.ContinueWith(_ => tcs.TrySetException(new InvocationSuspendedException(flowId))); _ = Task.Run(async () => { @@ -289,7 +277,9 @@ await _invocationHelper.PersistFunctionInStore( var correlations = _invocationHelper.CreateCorrelations(flowId); var semaphores = _invocationHelper.CreateSemaphores(storedId, effect, _flowsManager); - var queueManager = _invocationHelper.CreateQueueManager(flowId, storedId, effect, flowTimeouts, _unhandledExceptionHandler); + + var flowState = _flowsManager.CreateFlowState(storedId, flowTimeouts); + var queueManager = _invocationHelper.CreateQueueManager(flowId, storedId, effect, flowState, flowTimeouts, _unhandledExceptionHandler); disposables.Add(queueManager); var messageWriter = _invocationHelper.CreateMessageWriter(storedId); var workflow = new Workflow(flowId, storedId, effect, _utilities, correlations, semaphores, queueManager, _invocationHelper.UtcNow, messageWriter, _flowsManager); @@ -299,6 +289,7 @@ await _invocationHelper.PersistFunctionInStore( workflow, Disposable.Combine(disposables), queueManager, + flowState, flowTimeouts ); } @@ -312,7 +303,7 @@ await _invocationHelper.PersistFunctionInStore( if (!success) Disposable.Combine(disposables).Dispose(); } } - private record PreparedInvocation(bool Persisted, Workflow Workflow, IDisposable Disposables, QueueManager QueueManager, FlowTimeouts FlowTimeouts, IStorageSession? StorageSession = null); + private record PreparedInvocation(bool Persisted, Workflow Workflow, IDisposable Disposables, QueueManager QueueManager, FlowState FlowState, FlowTimeouts FlowTimeouts, IStorageSession? StorageSession = null); private async Task PrepareForReInvocation(StoredId storedId) { @@ -340,7 +331,8 @@ private async Task PrepareForReInvocation(StoredId storedI var correlations = _invocationHelper.CreateCorrelations(flowId); var semaphores = _invocationHelper.CreateSemaphores(storedId, effect, _flowsManager); - var queueManager = _invocationHelper.CreateQueueManager(flowId, storedId, effect, flowTimeouts, _unhandledExceptionHandler); + var flowState = _flowsManager.CreateFlowState(storedId, flowTimeouts); + var queueManager = _invocationHelper.CreateQueueManager(flowId, storedId, effect, flowState, flowTimeouts, _unhandledExceptionHandler); disposables.Add(queueManager); var messageWriter = _invocationHelper.CreateMessageWriter(storedId); @@ -364,6 +356,7 @@ private async Task PrepareForReInvocation(StoredId storedI workflow, Disposable.Combine(disposables), queueManager, + flowState, flowTimeouts, parent, storageSession @@ -383,6 +376,7 @@ private record PreparedReInvocation( Workflow Workflow, IDisposable Disposables, QueueManager QueueManager, + FlowState FlowState, FlowTimeouts FlowTimeouts, StoredId? Parent, IStorageSession? StorageSession diff --git a/Core/Cleipnir.ResilientFunctions/Queuing/QueueManager.cs b/Core/Cleipnir.ResilientFunctions/Queuing/QueueManager.cs index caf4d7b4a..c7349e20c 100644 --- a/Core/Cleipnir.ResilientFunctions/Queuing/QueueManager.cs +++ b/Core/Cleipnir.ResilientFunctions/Queuing/QueueManager.cs @@ -7,6 +7,7 @@ using Cleipnir.ResilientFunctions.CoreRuntime.Serialization; using Cleipnir.ResilientFunctions.Domain; using Cleipnir.ResilientFunctions.Domain.Exceptions.Commands; +using Cleipnir.ResilientFunctions.Helpers; using Cleipnir.ResilientFunctions.Messaging; using Cleipnir.ResilientFunctions.Storage; @@ -20,6 +21,7 @@ public class QueueManager( IMessageStore messageStore, ISerializer serializer, Effect effect, + FlowState flowState, UnhandledExceptionHandler unhandledExceptionHandler, FlowTimeouts timeouts, UtcNow utcNow, @@ -29,9 +31,6 @@ public class QueueManager( : IDisposable { private readonly Lock _lock = new(); - private FlowState _flowState = null!; - - public void AttachFlowState(FlowState flowState) => _flowState = flowState; private readonly EffectId _toRemoveNextId = new([-1, 0]); private readonly EffectId _idempotencyKeysId = new([-1, -1]); @@ -40,6 +39,7 @@ public class QueueManager( private IdempotencyKeys? _idempotencyKeys; private int _nextToRemoveIndex = 0; + private readonly AsyncSignal _interruptSignal = new(); private readonly SemaphoreSlim _semaphoreSlim = new SemaphoreSlim(1); private bool _initialized = false; private volatile bool _disposed; @@ -84,6 +84,16 @@ private async Task Initialize() { _semaphoreSlim.Release(); } + + _ = Task.Run(async () => + { + while (!_disposed) + { + await flowState.InterruptSignal.Wait(); + if (!_disposed) + await FetchMessagesOnce(); + } + }); await FetchMessagesOnce(); _ = Task.Run(FetchMessages); @@ -154,14 +164,14 @@ public async Task FetchMessagesOnce() } finally { - _flowState.InterruptSignal.Fire(); + _interruptSignal.Fire(); _semaphoreSlim.Release(); } } private (MessageData? Matched, int PositionToRemoveIndex, Task PulseTask) TryTakeMessage(MessagePredicate predicate) { - var interruptedSignal = _flowState.InterruptSignal.Wait(); + var interruptedSignal = _interruptSignal.Wait(); lock (_lock) { @@ -274,10 +284,10 @@ public async Task AfterFlush() timeouts.RemoveTimeout(timeoutId); return matched.Envelope; } - - _flowState.SubflowWaiting(); + + flowState.SubflowWaiting(); await Task.WhenAny(interruptSignal, timeoutTask, maxWaitTask); - var success = _flowState.ResumeSubflow(); + var success = flowState.ResumeSubflow(); if (!success) await new TaskCompletionSource().Task; From b784181768afea0f686efd55b0322592f5199d4b Mon Sep 17 00:00:00 2001 From: stidsborg Date: Sat, 18 Apr 2026 10:19:54 +0200 Subject: [PATCH 3/3] Short-circuit PrepareForInvocation when flow already persisted When PersistFunctionInStore returns persisted=false the caller only needs the flag to take the early-return branch, so skip constructing effect, correlations, semaphores, flow state, queue manager and workflow. Also completes the CreateFlowState+AddFlow -> CreateFlow migration across the remaining Invoker call sites and the Messaging test templates. --- .../MessagesSubscriptionTests.cs | 9 ++--- .../Messaging/TestTemplates/MessagesTests.cs | 33 +++++++------------ .../CoreRuntime/FlowsManager.cs | 7 ++-- .../CoreRuntime/Invocation/Invoker.cs | 20 +++++++---- 4 files changed, 30 insertions(+), 39 deletions(-) diff --git a/Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessagesSubscriptionTests.cs b/Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessagesSubscriptionTests.cs index b4d71eff1..5102650c2 100644 --- a/Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessagesSubscriptionTests.cs +++ b/Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessagesSubscriptionTests.cs @@ -561,7 +561,7 @@ protected async Task QueueManagerFailsOnMessageDeserializationError(Task DateTime.UtcNow); - var flowState = flowsManager.CreateFlowState(workflow.StoredId, flowTimeouts); + var flowState = flowsManager.CreateFlow(workflow.StoredId, flowTimeouts); var queueManager = new QueueManager( workflow.FlowId, workflow.StoredId, @@ -575,7 +575,6 @@ protected async Task QueueManagerFailsOnMessageDeserializationError(Task( @@ -625,7 +624,7 @@ protected async Task RegisteredTimeoutIsRemovedWhenPullingMessage(Task DateTime.UtcNow); - var flowState = flowsManager.CreateFlowState(workflow.StoredId, minimumTimeout); + var flowState = flowsManager.CreateFlow(workflow.StoredId, minimumTimeout); var queueManager = new QueueManager( workflow.FlowId, workflow.StoredId, @@ -639,7 +638,6 @@ protected async Task RegisteredTimeoutIsRemovedWhenPullingMessage(Task DateTime.UtcNow); - var flowState = flowsManager.CreateFlowState(workflow.StoredId, flowTimeouts); + var flowState = flowsManager.CreateFlow(workflow.StoredId, flowTimeouts); var queueManager = new QueueManager( workflow.FlowId, workflow.StoredId, @@ -701,7 +699,6 @@ protected async Task PullEnvelopeReturnsEnvelopeWithReceiverAndSender(Task functionStore var flowTimeouts = new FlowTimeouts(); var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow); - var flowState = flowsManager.CreateFlowState(workflow.StoredId, flowTimeouts); + var flowState = flowsManager.CreateFlow(workflow.StoredId, flowTimeouts); var queueManager = new QueueManager( workflow.FlowId, workflow.StoredId, @@ -50,7 +50,6 @@ protected async Task MessagesSunshineScenario(Task functionStore SettingsWithDefaults.Default ); - flowsManager.AddFlow(flowState); queueClient = await queueManager.CreateQueueClient(); var message = await queueClient.Pull(workflow, workflow.Effect.CreateNextImplicitId()); @@ -89,7 +88,7 @@ protected async Task QueueClientReturnsNullAfterTimeout(Task fun var flowTimeouts = new FlowTimeouts(); var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow); - var flowState = flowsManager.CreateFlowState(workflow.StoredId, flowTimeouts); + var flowState = flowsManager.CreateFlow(workflow.StoredId, flowTimeouts); var queueManager = new QueueManager( workflow.FlowId, workflow.StoredId, @@ -103,7 +102,6 @@ protected async Task QueueClientReturnsNullAfterTimeout(Task fun SettingsWithDefaults.Default ); - flowsManager.AddFlow(flowState); var queueClient = await queueManager.CreateQueueClient(); var message = await queueClient.Pull(workflow, workflow.Effect.CreateNextImplicitId(), TimeSpan.FromMilliseconds(100)); @@ -137,7 +135,7 @@ protected async Task MessagesFirstOfTypesReturnsNoneForFirstOfTypesOnTimeout(Tas var flowTimeouts = new FlowTimeouts(); var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow); - var flowState = flowsManager.CreateFlowState(workflow.StoredId, flowTimeouts); + var flowState = flowsManager.CreateFlow(workflow.StoredId, flowTimeouts); var queueManager = new QueueManager( workflow.FlowId, workflow.StoredId, @@ -151,7 +149,6 @@ protected async Task MessagesFirstOfTypesReturnsNoneForFirstOfTypesOnTimeout(Tas SettingsWithDefaults.Default ); - flowsManager.AddFlow(flowState); var queueClient = await queueManager.CreateQueueClient(); var message = await queueClient.Pull( workflow, @@ -190,7 +187,7 @@ protected async Task MessagesFirstOfTypesReturnsFirstForFirstOfTypesOnFirst(Task var flowTimeouts = new FlowTimeouts(); var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow); - var flowState = flowsManager.CreateFlowState(workflow.StoredId, flowTimeouts); + var flowState = flowsManager.CreateFlow(workflow.StoredId, flowTimeouts); var queueManager = new QueueManager( workflow.FlowId, workflow.StoredId, @@ -204,7 +201,6 @@ protected async Task MessagesFirstOfTypesReturnsFirstForFirstOfTypesOnFirst(Task SettingsWithDefaults.Default ); - flowsManager.AddFlow(flowState); var queueClient = await queueManager.CreateQueueClient(); var message = await queueClient.Pull( workflow, @@ -244,7 +240,7 @@ protected async Task MessagesFirstOfTypesReturnsSecondForFirstOfTypesOnSecond(Ta var flowTimeouts = new FlowTimeouts(); var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow); - var flowState = flowsManager.CreateFlowState(workflow.StoredId, flowTimeouts); + var flowState = flowsManager.CreateFlow(workflow.StoredId, flowTimeouts); var queueManager = new QueueManager( workflow.FlowId, workflow.StoredId, @@ -258,7 +254,6 @@ protected async Task MessagesFirstOfTypesReturnsSecondForFirstOfTypesOnSecond(Ta SettingsWithDefaults.Default ); - flowsManager.AddFlow(flowState); var queueClient = await queueManager.CreateQueueClient(); var message = await queueClient.Pull( workflow, @@ -299,7 +294,7 @@ protected async Task SecondEventWithExistingIdempotencyKeyIsIgnored(Task DateTime.UtcNow); - var flowState = flowsManager.CreateFlowState(workflow.StoredId, flowTimeouts); + var flowState = flowsManager.CreateFlow(workflow.StoredId, flowTimeouts); var queueManager = new QueueManager( workflow.FlowId, workflow.StoredId, @@ -313,7 +308,6 @@ protected async Task SecondEventWithExistingIdempotencyKeyIsIgnored(Task(workflow, workflow.Effect.CreateNextImplicitId()); var message2 = await queueClient.Pull(workflow, workflow.Effect.CreateNextImplicitId()); @@ -356,7 +350,7 @@ protected async Task QueueClientCanPullMultipleMessages(Task fun var flowTimeouts = new FlowTimeouts(); var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow); - var flowState = flowsManager.CreateFlowState(workflow.StoredId, flowTimeouts); + var flowState = flowsManager.CreateFlow(workflow.StoredId, flowTimeouts); var queueManager = new QueueManager( workflow.FlowId, workflow.StoredId, @@ -370,7 +364,6 @@ protected async Task QueueClientCanPullMultipleMessages(Task fun SettingsWithDefaults.Default ); - flowsManager.AddFlow(flowState); var queueClient = await queueManager.CreateQueueClient(); var message1 = await queueClient.Pull(workflow, workflow.Effect.CreateNextImplicitId()); @@ -411,7 +404,7 @@ async Task (workflow) => var flowTimeouts = new FlowTimeouts(); var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow); - var flowState = flowsManager.CreateFlowState(workflow.StoredId, flowTimeouts); + var flowState = flowsManager.CreateFlow(workflow.StoredId, flowTimeouts); var queueManager = new QueueManager( workflow.FlowId, workflow.StoredId, @@ -425,7 +418,6 @@ async Task (workflow) => SettingsWithDefaults.Default with { MessagesDefaultMaxWaitForCompletion = TimeSpan.FromMinutes(1) } ); - flowsManager.AddFlow(flowState); var queueClient = await queueManager.CreateQueueClient(); lock (queueClients) queueClients[workflow.FlowId.Instance.Value] = queueClient; @@ -478,7 +470,7 @@ async Task (workflow) => var flowTimeouts = new FlowTimeouts(); var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow); - var flowState = flowsManager.CreateFlowState(workflow.StoredId, flowTimeouts); + var flowState = flowsManager.CreateFlow(workflow.StoredId, flowTimeouts); var queueManager = new QueueManager( workflow.FlowId, workflow.StoredId, @@ -492,7 +484,6 @@ async Task (workflow) => SettingsWithDefaults.Default ); - flowsManager.AddFlow(flowState); var queueClient = await queueManager.CreateQueueClient(); while (true) @@ -547,7 +538,7 @@ async Task (workflow) => var flowTimeouts = new FlowTimeouts(); var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow); - var flowState = flowsManager.CreateFlowState(workflow.StoredId, flowTimeouts); + var flowState = flowsManager.CreateFlow(workflow.StoredId, flowTimeouts); var queueManager = new QueueManager( workflow.FlowId, workflow.StoredId, @@ -561,7 +552,6 @@ async Task (workflow) => SettingsWithDefaults.Default ); - flowsManager.AddFlow(flowState); var queueClient = await queueManager.CreateQueueClient(); for (var i = 0; i < 10; i++) @@ -578,7 +568,7 @@ async Task (workflow) => var flowTimeouts = new FlowTimeouts(); var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow); - var flowState = flowsManager.CreateFlowState(workflow.StoredId, flowTimeouts); + var flowState = flowsManager.CreateFlow(workflow.StoredId, flowTimeouts); var queueManager = new QueueManager( workflow.FlowId, workflow.StoredId, @@ -592,7 +582,6 @@ async Task (workflow) => SettingsWithDefaults.Default ); - flowsManager.AddFlow(flowState); var queueClient = await queueManager.CreateQueueClient(); for (var i = 0; i < 10; i++) diff --git a/Core/Cleipnir.ResilientFunctions/CoreRuntime/FlowsManager.cs b/Core/Cleipnir.ResilientFunctions/CoreRuntime/FlowsManager.cs index 1b5b26156..9cb3482b6 100644 --- a/Core/Cleipnir.ResilientFunctions/CoreRuntime/FlowsManager.cs +++ b/Core/Cleipnir.ResilientFunctions/CoreRuntime/FlowsManager.cs @@ -43,13 +43,10 @@ private async Task TimeoutCheckLoop() public void Dispose() => _disposed = true; - public FlowState CreateFlowState(StoredId id, FlowTimeouts timeouts) - => new(id, subflows: 1, waitingSubflows: 0, timeouts); - - public void AddFlow(FlowState flowState) + public FlowState CreateFlow(StoredId id, FlowTimeouts timeouts) { lock (_lock) - _dict[flowState.Id] = flowState; + return _dict[id] = new FlowState(id, subflows: 1, waitingSubflows: 0, timeouts);; } public void RemoveFlow(StoredId id, FlowState flowState) diff --git a/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Invoker.cs b/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Invoker.cs index d6578f3ce..10dc56060 100644 --- a/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Invoker.cs +++ b/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Invoker.cs @@ -57,12 +57,12 @@ public async Task> ScheduleInvoke(FlowInstance flowInsta var (created, workflow, disposables, queueManager, flowState, timeouts, storageSession) = await PrepareForInvocation(flowId, storedId, param, parentWorkflow?.StoredId, initialState); await (parentWorkflow?.Effect.Upsert(scheduledAlreadyParentId!, true, alias: null, flush: false) ?? Task.CompletedTask); - CurrentFlow._workflow.Value = workflow; if (!created) return _invocationHelper.CreateInnerScheduled([flowId], parentWorkflow, detach); + CurrentFlow._workflow.Value = workflow; + var tcs = new TaskCompletionSource(); - _flowsManager.AddFlow(flowState); _ = flowState.SuspendedTask.ContinueWith(_ => tcs.TrySetException(new InvocationSuspendedException(flowId))); _ = Task.Run(async () => { @@ -158,7 +158,6 @@ public async Task> ScheduleRestart(StoredId storedId) var flowId = new FlowId(_flowType, humanInstanceId); var tcs = new TaskCompletionSource(); - _flowsManager.AddFlow(flowState); _ = flowState.SuspendedTask.ContinueWith(_ => tcs.TrySetException(new InvocationSuspendedException(flowId))); _ = Task.Run(async () => { @@ -213,7 +212,6 @@ internal async Task ScheduleRestart(StoredId storedId, RestartedFunction rf, Act var flowId = new FlowId(_flowType, humanInstanceId); var tcs = new TaskCompletionSource(); - _flowsManager.AddFlow(flowState); _ = flowState.SuspendedTask.ContinueWith(_ => tcs.TrySetException(new InvocationSuspendedException(flowId))); _ = Task.Run(async () => { @@ -264,6 +262,16 @@ await _invocationHelper.PersistFunctionInStore( disposables.Add(isWorkflowRunningDisposable); success = persisted; + if (!persisted) + return new PreparedInvocation( + Persisted: false, + Workflow: null!, + Disposable.Combine(disposables), + QueueManager: null!, + FlowState: null!, + FlowTimeouts: null! + ); + var flowTimeouts = new FlowTimeouts(); var effect = _invocationHelper.CreateEffect( @@ -278,7 +286,7 @@ await _invocationHelper.PersistFunctionInStore( var correlations = _invocationHelper.CreateCorrelations(flowId); var semaphores = _invocationHelper.CreateSemaphores(storedId, effect, _flowsManager); - var flowState = _flowsManager.CreateFlowState(storedId, flowTimeouts); + var flowState = _flowsManager.CreateFlow(storedId, flowTimeouts); var queueManager = _invocationHelper.CreateQueueManager(flowId, storedId, effect, flowState, flowTimeouts, _unhandledExceptionHandler); disposables.Add(queueManager); var messageWriter = _invocationHelper.CreateMessageWriter(storedId); @@ -331,7 +339,7 @@ private async Task PrepareForReInvocation(StoredId storedI var correlations = _invocationHelper.CreateCorrelations(flowId); var semaphores = _invocationHelper.CreateSemaphores(storedId, effect, _flowsManager); - var flowState = _flowsManager.CreateFlowState(storedId, flowTimeouts); + var flowState = _flowsManager.CreateFlow(storedId, flowTimeouts); var queueManager = _invocationHelper.CreateQueueManager(flowId, storedId, effect, flowState, flowTimeouts, _unhandledExceptionHandler); disposables.Add(queueManager); var messageWriter = _invocationHelper.CreateMessageWriter(storedId);