diff --git a/.claude/settings.local.json b/.claude/settings.local.json index b9ba1db78..d3b9349f0 100644 --- a/.claude/settings.local.json +++ b/.claude/settings.local.json @@ -22,7 +22,9 @@ "Bash(git commit -m \"$\\(cat <<''EOF''\nRemoved unused QueueFlag and associated tests\n\nCo-Authored-By: Claude Opus 4.5 \nEOF\n\\)\")", "Bash(git -C /Users/stidsborg/Repos/Cleipnir.ResilientFunctions diff)", "Bash(git -C /Users/stidsborg/Repos/Cleipnir.ResilientFunctions log --oneline -5)", - "Bash(git commit:*)" + "Bash(git commit:*)", + "Bash(git checkout:*)", + "Bash(git push:*)" ], "deny": [], "ask": [] diff --git a/Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessagesSubscriptionTests.cs b/Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessagesSubscriptionTests.cs index d6f646c5a..5102650c2 100644 --- a/Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessagesSubscriptionTests.cs +++ b/Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessagesSubscriptionTests.cs @@ -561,20 +561,20 @@ protected async Task QueueManagerFailsOnMessageDeserializationError(Task DateTime.UtcNow); + var flowState = flowsManager.CreateFlow(workflow.StoredId, flowTimeouts); var queueManager = new QueueManager( workflow.FlowId, workflow.StoredId, functionStore.MessageStore, exceptionThrowingSerializer, workflow.Effect, + flowState, unhandledExceptionHandler, flowTimeouts, () => DateTime.UtcNow, - SettingsWithDefaults.Default, - flowsManager + SettingsWithDefaults.Default ); - flowsManager.AddFlow(workflow.StoredId, queueManager,flowTimeouts); var queueClient = await queueManager.CreateQueueClient(); var message = await queueClient.Pull( @@ -624,20 +624,20 @@ protected async Task RegisteredTimeoutIsRemovedWhenPullingMessage(Task DateTime.UtcNow); + var flowState = flowsManager.CreateFlow(workflow.StoredId, minimumTimeout); var queueManager = new QueueManager( workflow.FlowId, workflow.StoredId, functionStore.MessageStore, DefaultSerializer.Instance, workflow.Effect, + flowState, unhandledExceptionHandler, minimumTimeout, () => DateTime.UtcNow, - SettingsWithDefaults.Default, - flowsManager + SettingsWithDefaults.Default ); - flowsManager.AddFlow(workflow.StoredId, queueManager,minimumTimeout); var queueClient = await queueManager.CreateQueueClient(); @@ -685,20 +685,20 @@ protected async Task PullEnvelopeReturnsEnvelopeWithReceiverAndSender(Task DateTime.UtcNow); + var flowState = flowsManager.CreateFlow(workflow.StoredId, flowTimeouts); var queueManager = new QueueManager( workflow.FlowId, workflow.StoredId, functionStore.MessageStore, DefaultSerializer.Instance, workflow.Effect, + flowState, unhandledExceptionHandler, flowTimeouts, () => DateTime.UtcNow, - SettingsWithDefaults.Default, - flowsManager + SettingsWithDefaults.Default ); - flowsManager.AddFlow(workflow.StoredId, queueManager,flowTimeouts); var queueClient = await queueManager.CreateQueueClient(); // Pull envelope for specific receiver diff --git a/Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessagesTests.cs b/Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessagesTests.cs index daad1c5a7..35c29e7cd 100644 --- a/Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessagesTests.cs +++ b/Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessagesTests.cs @@ -36,20 +36,20 @@ protected async Task MessagesSunshineScenario(Task functionStore var flowTimeouts = new FlowTimeouts(); var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow); + var flowState = flowsManager.CreateFlow(workflow.StoredId, flowTimeouts); var queueManager = new QueueManager( workflow.FlowId, workflow.StoredId, functionStore.MessageStore, DefaultSerializer.Instance, workflow.Effect, + flowState, unhandledExceptionHandler, flowTimeouts, () => DateTime.UtcNow, - SettingsWithDefaults.Default, - flowsManager + SettingsWithDefaults.Default ); - flowsManager.AddFlow(workflow.StoredId, queueManager,flowTimeouts); queueClient = await queueManager.CreateQueueClient(); var message = await queueClient.Pull(workflow, workflow.Effect.CreateNextImplicitId()); @@ -88,20 +88,20 @@ protected async Task QueueClientReturnsNullAfterTimeout(Task fun var flowTimeouts = new FlowTimeouts(); var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow); + var flowState = flowsManager.CreateFlow(workflow.StoredId, flowTimeouts); var queueManager = new QueueManager( workflow.FlowId, workflow.StoredId, functionStore.MessageStore, DefaultSerializer.Instance, workflow.Effect, + flowState, unhandledExceptionHandler, flowTimeouts, () => DateTime.UtcNow, - SettingsWithDefaults.Default, - flowsManager + SettingsWithDefaults.Default ); - flowsManager.AddFlow(workflow.StoredId, queueManager,flowTimeouts); var queueClient = await queueManager.CreateQueueClient(); var message = await queueClient.Pull(workflow, workflow.Effect.CreateNextImplicitId(), TimeSpan.FromMilliseconds(100)); @@ -135,20 +135,20 @@ protected async Task MessagesFirstOfTypesReturnsNoneForFirstOfTypesOnTimeout(Tas var flowTimeouts = new FlowTimeouts(); var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow); + var flowState = flowsManager.CreateFlow(workflow.StoredId, flowTimeouts); var queueManager = new QueueManager( workflow.FlowId, workflow.StoredId, functionStore.MessageStore, DefaultSerializer.Instance, workflow.Effect, + flowState, unhandledExceptionHandler, flowTimeouts, () => DateTime.UtcNow, - SettingsWithDefaults.Default, - flowsManager + SettingsWithDefaults.Default ); - flowsManager.AddFlow(workflow.StoredId, queueManager,flowTimeouts); var queueClient = await queueManager.CreateQueueClient(); var message = await queueClient.Pull( workflow, @@ -187,20 +187,20 @@ protected async Task MessagesFirstOfTypesReturnsFirstForFirstOfTypesOnFirst(Task var flowTimeouts = new FlowTimeouts(); var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow); + var flowState = flowsManager.CreateFlow(workflow.StoredId, flowTimeouts); var queueManager = new QueueManager( workflow.FlowId, workflow.StoredId, functionStore.MessageStore, DefaultSerializer.Instance, workflow.Effect, + flowState, unhandledExceptionHandler, flowTimeouts, () => DateTime.UtcNow, - SettingsWithDefaults.Default, - flowsManager + SettingsWithDefaults.Default ); - flowsManager.AddFlow(workflow.StoredId, queueManager,flowTimeouts); var queueClient = await queueManager.CreateQueueClient(); var message = await queueClient.Pull( workflow, @@ -240,20 +240,20 @@ protected async Task MessagesFirstOfTypesReturnsSecondForFirstOfTypesOnSecond(Ta var flowTimeouts = new FlowTimeouts(); var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow); + var flowState = flowsManager.CreateFlow(workflow.StoredId, flowTimeouts); var queueManager = new QueueManager( workflow.FlowId, workflow.StoredId, functionStore.MessageStore, DefaultSerializer.Instance, workflow.Effect, + flowState, unhandledExceptionHandler, flowTimeouts, () => DateTime.UtcNow, - SettingsWithDefaults.Default, - flowsManager + SettingsWithDefaults.Default ); - flowsManager.AddFlow(workflow.StoredId, queueManager,flowTimeouts); var queueClient = await queueManager.CreateQueueClient(); var message = await queueClient.Pull( workflow, @@ -294,20 +294,20 @@ protected async Task SecondEventWithExistingIdempotencyKeyIsIgnored(Task DateTime.UtcNow); + var flowState = flowsManager.CreateFlow(workflow.StoredId, flowTimeouts); var queueManager = new QueueManager( workflow.FlowId, workflow.StoredId, functionStore.MessageStore, DefaultSerializer.Instance, workflow.Effect, + flowState, unhandledExceptionHandler, flowTimeouts, () => DateTime.UtcNow, - SettingsWithDefaults.Default, - flowsManager + SettingsWithDefaults.Default ); - flowsManager.AddFlow(workflow.StoredId, queueManager,flowTimeouts); var queueClient = await queueManager.CreateQueueClient(); var message1 = await queueClient.Pull(workflow, workflow.Effect.CreateNextImplicitId()); var message2 = await queueClient.Pull(workflow, workflow.Effect.CreateNextImplicitId()); @@ -350,20 +350,20 @@ protected async Task QueueClientCanPullMultipleMessages(Task fun var flowTimeouts = new FlowTimeouts(); var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow); + var flowState = flowsManager.CreateFlow(workflow.StoredId, flowTimeouts); var queueManager = new QueueManager( workflow.FlowId, workflow.StoredId, functionStore.MessageStore, DefaultSerializer.Instance, workflow.Effect, + flowState, unhandledExceptionHandler, flowTimeouts, () => DateTime.UtcNow, - SettingsWithDefaults.Default, - flowsManager + SettingsWithDefaults.Default ); - flowsManager.AddFlow(workflow.StoredId, queueManager,flowTimeouts); var queueClient = await queueManager.CreateQueueClient(); var message1 = await queueClient.Pull(workflow, workflow.Effect.CreateNextImplicitId()); @@ -404,20 +404,20 @@ async Task (workflow) => var flowTimeouts = new FlowTimeouts(); var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow); + var flowState = flowsManager.CreateFlow(workflow.StoredId, flowTimeouts); var queueManager = new QueueManager( workflow.FlowId, workflow.StoredId, functionStore.MessageStore, DefaultSerializer.Instance, workflow.Effect, + flowState, unhandledExceptionHandler, flowTimeouts, () => DateTime.UtcNow, - SettingsWithDefaults.Default with { MessagesDefaultMaxWaitForCompletion = TimeSpan.FromMinutes(1) }, - flowsManager + SettingsWithDefaults.Default with { MessagesDefaultMaxWaitForCompletion = TimeSpan.FromMinutes(1) } ); - flowsManager.AddFlow(workflow.StoredId, queueManager,flowTimeouts); var queueClient = await queueManager.CreateQueueClient(); lock (queueClients) queueClients[workflow.FlowId.Instance.Value] = queueClient; @@ -470,20 +470,20 @@ async Task (workflow) => var flowTimeouts = new FlowTimeouts(); var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow); + var flowState = flowsManager.CreateFlow(workflow.StoredId, flowTimeouts); var queueManager = new QueueManager( workflow.FlowId, workflow.StoredId, functionStore.MessageStore, DefaultSerializer.Instance, workflow.Effect, + flowState, unhandledExceptionHandler, flowTimeouts, () => DateTime.UtcNow, - SettingsWithDefaults.Default, - flowsManager + SettingsWithDefaults.Default ); - flowsManager.AddFlow(workflow.StoredId, queueManager,flowTimeouts); var queueClient = await queueManager.CreateQueueClient(); while (true) @@ -538,20 +538,20 @@ async Task (workflow) => var flowTimeouts = new FlowTimeouts(); var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow); + var flowState = flowsManager.CreateFlow(workflow.StoredId, flowTimeouts); var queueManager = new QueueManager( workflow.FlowId, workflow.StoredId, functionStore.MessageStore, DefaultSerializer.Instance, workflow.Effect, + flowState, unhandledExceptionHandler, flowTimeouts, () => DateTime.UtcNow, - SettingsWithDefaults.Default, - flowsManager + SettingsWithDefaults.Default ); - flowsManager.AddFlow(workflow.StoredId, queueManager,flowTimeouts); var queueClient = await queueManager.CreateQueueClient(); for (var i = 0; i < 10; i++) @@ -568,20 +568,20 @@ async Task (workflow) => var flowTimeouts = new FlowTimeouts(); var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow); + var flowState = flowsManager.CreateFlow(workflow.StoredId, flowTimeouts); var queueManager = new QueueManager( workflow.FlowId, workflow.StoredId, functionStore.MessageStore, DefaultSerializer.Instance, workflow.Effect, + flowState, unhandledExceptionHandler, flowTimeouts, () => DateTime.UtcNow, - SettingsWithDefaults.Default, - flowsManager + SettingsWithDefaults.Default ); - flowsManager.AddFlow(workflow.StoredId, queueManager,flowTimeouts); var queueClient = await queueManager.CreateQueueClient(); for (var i = 0; i < 10; i++) diff --git a/Core/Cleipnir.ResilientFunctions/CoreRuntime/FlowState.cs b/Core/Cleipnir.ResilientFunctions/CoreRuntime/FlowState.cs index 066c45523..51483e830 100644 --- a/Core/Cleipnir.ResilientFunctions/CoreRuntime/FlowState.cs +++ b/Core/Cleipnir.ResilientFunctions/CoreRuntime/FlowState.cs @@ -1,7 +1,6 @@ using System.Threading; using System.Threading.Tasks; using Cleipnir.ResilientFunctions.Helpers; -using Cleipnir.ResilientFunctions.Queuing; using Cleipnir.ResilientFunctions.Storage; namespace Cleipnir.ResilientFunctions.CoreRuntime; @@ -12,9 +11,8 @@ public class FlowState private readonly TaskCompletionSource _suspendedTcs = new(); public StoredId Id { get; } - public QueueManager QueueManager { get; } - public int Threads { get; private set; } - public int WaitingThreads { get; private set; } + public int Subflows { get; private set; } + public int WaitingSubflows { get; private set; } public FlowTimeouts Timeouts { get; } public AsyncSignal InterruptSignal { get; } = new(); public bool Suspended { get; private set; } @@ -22,44 +20,42 @@ public class FlowState public FlowState( StoredId id, - QueueManager queueManager, - int threads, - int waitingThreads, + int subflows, + int waitingSubflows, FlowTimeouts timeouts) { Id = id; - QueueManager = queueManager; - Threads = threads; - WaitingThreads = waitingThreads; + Subflows = subflows; + WaitingSubflows = waitingSubflows; Timeouts = timeouts; SuspendedTask = _suspendedTcs.Task; } - public void NewThreadStarted() + public void SubflowStarted() { lock (_lock) - Threads++; + Subflows++; } - public void ThreadCompleted() + public void SubflowCompleted() { lock (_lock) - Threads--; + Subflows--; } - public void ThreadSuspended() + public void SubflowWaiting() { lock (_lock) - WaitingThreads++; + WaitingSubflows++; } - public bool ResumeThread() + public bool ResumeSubflow() { lock (_lock) if (Suspended) return false; else - WaitingThreads--; + WaitingSubflows--; return true; } @@ -67,12 +63,10 @@ public bool ResumeThread() public void Interrupt() { lock (_lock) - { if (Suspended) return; - - WaitingThreads = 0; - } + else + WaitingSubflows = 0; InterruptSignal.Fire(); } @@ -80,7 +74,7 @@ public void Interrupt() public bool Suspend() { lock (_lock) - if (Threads == WaitingThreads) + if (Subflows == WaitingSubflows) Suspended = true; else return false; diff --git a/Core/Cleipnir.ResilientFunctions/CoreRuntime/FlowsManager.cs b/Core/Cleipnir.ResilientFunctions/CoreRuntime/FlowsManager.cs index bba1b18b8..9cb3482b6 100644 --- a/Core/Cleipnir.ResilientFunctions/CoreRuntime/FlowsManager.cs +++ b/Core/Cleipnir.ResilientFunctions/CoreRuntime/FlowsManager.cs @@ -4,7 +4,6 @@ using System.Threading; using System.Threading.Tasks; using Cleipnir.ResilientFunctions.Domain.Exceptions.Commands; -using Cleipnir.ResilientFunctions.Queuing; using Cleipnir.ResilientFunctions.Storage; namespace Cleipnir.ResilientFunctions.CoreRuntime; @@ -44,10 +43,10 @@ private async Task TimeoutCheckLoop() public void Dispose() => _disposed = true; - public FlowState AddFlow(StoredId id, QueueManager queueManager, FlowTimeouts timeouts) + public FlowState CreateFlow(StoredId id, FlowTimeouts timeouts) { lock (_lock) - return _dict[id] = new FlowState(id, queueManager, threads: 1, waitingThreads: 0, timeouts); + return _dict[id] = new FlowState(id, subflows: 1, waitingSubflows: 0, timeouts);; } public void RemoveFlow(StoredId id, FlowState flowState) @@ -60,61 +59,25 @@ public void RemoveFlow(StoredId id, FlowState flowState) public async Task Interrupt(IReadOnlyList ids) { await _functionStore.ResetInterrupted(ids); - + lock (_lock) foreach (var id in ids) - { - if (!_dict.TryGetValue(id, out var flowState)) - continue; - - flowState.Interrupt(); - Task.Run(() => flowState.QueueManager.FetchMessagesOnce()); - } + if (_dict.TryGetValue(id, out var flowState)) + flowState.Interrupt(); } public void StartThread(StoredId id) { lock (_lock) if (_dict.TryGetValue(id, out var flowState)) - flowState.NewThreadStarted(); + flowState.SubflowStarted(); } public void CompleteThread(StoredId id) { lock (_lock) if (_dict.TryGetValue(id, out var flowState)) - flowState.ThreadCompleted(); - } - - public bool ThreadResumed(StoredId id) - { - lock (_lock) - if (_dict.TryGetValue(id, out var flowState)) - return flowState.ResumeThread(); - - return false; - } - - public void SuspendThread(StoredId id) - { - lock (_lock) - if (_dict.TryGetValue(id, out var flowState)) - flowState.ThreadSuspended(); - } - - public Task GetInterruptedSignal(StoredId id) - { - lock (_lock) - return _dict.TryGetValue(id, out var flowState) - ? flowState.InterruptSignal.Wait() - : Task.CompletedTask; - } - - public void SignalInterrupt(StoredId id) - { - lock (_lock) - if (_dict.TryGetValue(id, out var flowState)) - flowState.InterruptSignal.Fire(); + flowState.SubflowCompleted(); } [DoesNotReturn] diff --git a/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/InvocationHelper.cs b/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/InvocationHelper.cs index 5970a5cab..ef0a08026 100644 --- a/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/InvocationHelper.cs +++ b/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/InvocationHelper.cs @@ -433,8 +433,8 @@ public async Task CreateExistingSemaphores(FlowId flowId) public DistributedSemaphores CreateSemaphores(StoredId storedId, Effect effect, FlowsManager flowsManager) => new(effect, _functionStore.SemaphoreStore, storedId, Interrupt, flowsManager); - public QueueManager CreateQueueManager(FlowId flowId, StoredId storedId, Effect effect, FlowTimeouts timeouts, UnhandledExceptionHandler unhandledExceptionHandler, FlowsManager flowsManager) - => new(flowId, storedId, _functionStore.MessageStore, Serializer, effect, unhandledExceptionHandler, timeouts, UtcNow, _settings, flowsManager); + public QueueManager CreateQueueManager(FlowId flowId, StoredId storedId, Effect effect, FlowState flowState, FlowTimeouts timeouts, UnhandledExceptionHandler unhandledExceptionHandler) + => new(flowId, storedId, _functionStore.MessageStore, Serializer, effect, flowState, unhandledExceptionHandler, timeouts, UtcNow, _settings); public StoredId MapToStoredId(FlowId flowId) => StoredId.Create(_storedType, flowId.Instance.Value); diff --git a/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Invoker.cs b/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Invoker.cs index 4fb5d13d7..10dc56060 100644 --- a/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Invoker.cs +++ b/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Invoker.cs @@ -54,19 +54,15 @@ public async Task> ScheduleInvoke(FlowInstance flowInsta if (parentWorkflow.Effect.Contains(scheduledAlreadyParentId!)) return _invocationHelper.CreateInnerScheduled([flowId], parentWorkflow, detach); - var (created, workflow, disposables, queueManager, timeouts, storageSession) = await PrepareForInvocation(flowId, storedId, param, parentWorkflow?.StoredId, initialState); + var (created, workflow, disposables, queueManager, flowState, timeouts, storageSession) = await PrepareForInvocation(flowId, storedId, param, parentWorkflow?.StoredId, initialState); await (parentWorkflow?.Effect.Upsert(scheduledAlreadyParentId!, true, alias: null, flush: false) ?? Task.CompletedTask); - CurrentFlow._workflow.Value = workflow; if (!created) return _invocationHelper.CreateInnerScheduled([flowId], parentWorkflow, detach); + CurrentFlow._workflow.Value = workflow; + var tcs = new TaskCompletionSource(); - var flowState = _flowsManager.AddFlow( - storedId, - queueManager, - timeouts - ); _ = flowState.SuspendedTask.ContinueWith(_ => tcs.TrySetException(new InvocationSuspendedException(flowId))); _ = Task.Run(async () => { @@ -158,15 +154,10 @@ public Task> BulkSchedule(IEnumerable> public async Task> ScheduleRestart(StoredId storedId) { - var (inner, param, humanInstanceId, workflow, disposables, queueManager, timeouts, parent, storageSession) = await PrepareForReInvocation(storedId); + var (inner, param, humanInstanceId, workflow, disposables, queueManager, flowState, timeouts, parent, storageSession) = await PrepareForReInvocation(storedId); var flowId = new FlowId(_flowType, humanInstanceId); var tcs = new TaskCompletionSource(); - var flowState = _flowsManager.AddFlow( - storedId, - queueManager, - timeouts - ); _ = flowState.SuspendedTask.ContinueWith(_ => tcs.TrySetException(new InvocationSuspendedException(flowId))); _ = Task.Run(async () => { @@ -217,15 +208,10 @@ public async Task> ScheduleRestart(StoredId storedId) internal async Task ScheduleRestart(StoredId storedId, RestartedFunction rf, Action onCompletion) { - var (inner, param, humanInstanceId, workflow, disposables, queueManager, timeouts, parent, storageSession) = await PrepareForReInvocation(storedId, rf); + var (inner, param, humanInstanceId, workflow, disposables, queueManager, flowState, timeouts, parent, storageSession) = await PrepareForReInvocation(storedId, rf); var flowId = new FlowId(_flowType, humanInstanceId); var tcs = new TaskCompletionSource(); - var flowState = _flowsManager.AddFlow( - storedId, - queueManager, - timeouts - ); _ = flowState.SuspendedTask.ContinueWith(_ => tcs.TrySetException(new InvocationSuspendedException(flowId))); _ = Task.Run(async () => { @@ -276,6 +262,16 @@ await _invocationHelper.PersistFunctionInStore( disposables.Add(isWorkflowRunningDisposable); success = persisted; + if (!persisted) + return new PreparedInvocation( + Persisted: false, + Workflow: null!, + Disposable.Combine(disposables), + QueueManager: null!, + FlowState: null!, + FlowTimeouts: null! + ); + var flowTimeouts = new FlowTimeouts(); var effect = _invocationHelper.CreateEffect( @@ -289,7 +285,9 @@ await _invocationHelper.PersistFunctionInStore( var correlations = _invocationHelper.CreateCorrelations(flowId); var semaphores = _invocationHelper.CreateSemaphores(storedId, effect, _flowsManager); - var queueManager = _invocationHelper.CreateQueueManager(flowId, storedId, effect, flowTimeouts, _unhandledExceptionHandler, _flowsManager); + + var flowState = _flowsManager.CreateFlow(storedId, flowTimeouts); + var queueManager = _invocationHelper.CreateQueueManager(flowId, storedId, effect, flowState, flowTimeouts, _unhandledExceptionHandler); disposables.Add(queueManager); var messageWriter = _invocationHelper.CreateMessageWriter(storedId); var workflow = new Workflow(flowId, storedId, effect, _utilities, correlations, semaphores, queueManager, _invocationHelper.UtcNow, messageWriter, _flowsManager); @@ -299,6 +297,7 @@ await _invocationHelper.PersistFunctionInStore( workflow, Disposable.Combine(disposables), queueManager, + flowState, flowTimeouts ); } @@ -312,7 +311,7 @@ await _invocationHelper.PersistFunctionInStore( if (!success) Disposable.Combine(disposables).Dispose(); } } - private record PreparedInvocation(bool Persisted, Workflow Workflow, IDisposable Disposables, QueueManager QueueManager, FlowTimeouts FlowTimeouts, IStorageSession? StorageSession = null); + private record PreparedInvocation(bool Persisted, Workflow Workflow, IDisposable Disposables, QueueManager QueueManager, FlowState FlowState, FlowTimeouts FlowTimeouts, IStorageSession? StorageSession = null); private async Task PrepareForReInvocation(StoredId storedId) { @@ -340,7 +339,8 @@ private async Task PrepareForReInvocation(StoredId storedI var correlations = _invocationHelper.CreateCorrelations(flowId); var semaphores = _invocationHelper.CreateSemaphores(storedId, effect, _flowsManager); - var queueManager = _invocationHelper.CreateQueueManager(flowId, storedId, effect, flowTimeouts, _unhandledExceptionHandler, _flowsManager); + var flowState = _flowsManager.CreateFlow(storedId, flowTimeouts); + var queueManager = _invocationHelper.CreateQueueManager(flowId, storedId, effect, flowState, flowTimeouts, _unhandledExceptionHandler); disposables.Add(queueManager); var messageWriter = _invocationHelper.CreateMessageWriter(storedId); @@ -364,6 +364,7 @@ private async Task PrepareForReInvocation(StoredId storedI workflow, Disposable.Combine(disposables), queueManager, + flowState, flowTimeouts, parent, storageSession @@ -383,6 +384,7 @@ private record PreparedReInvocation( Workflow Workflow, IDisposable Disposables, QueueManager QueueManager, + FlowState FlowState, FlowTimeouts FlowTimeouts, StoredId? Parent, IStorageSession? StorageSession diff --git a/Core/Cleipnir.ResilientFunctions/Queuing/QueueManager.cs b/Core/Cleipnir.ResilientFunctions/Queuing/QueueManager.cs index 3802a1261..c7349e20c 100644 --- a/Core/Cleipnir.ResilientFunctions/Queuing/QueueManager.cs +++ b/Core/Cleipnir.ResilientFunctions/Queuing/QueueManager.cs @@ -6,6 +6,8 @@ using Cleipnir.ResilientFunctions.CoreRuntime; using Cleipnir.ResilientFunctions.CoreRuntime.Serialization; using Cleipnir.ResilientFunctions.Domain; +using Cleipnir.ResilientFunctions.Domain.Exceptions.Commands; +using Cleipnir.ResilientFunctions.Helpers; using Cleipnir.ResilientFunctions.Messaging; using Cleipnir.ResilientFunctions.Storage; @@ -19,11 +21,11 @@ public class QueueManager( IMessageStore messageStore, ISerializer serializer, Effect effect, + FlowState flowState, UnhandledExceptionHandler unhandledExceptionHandler, FlowTimeouts timeouts, UtcNow utcNow, SettingsWithDefaults settings, - FlowsManager flowsManager, int maxIdempotencyKeyCount = 100, TimeSpan? maxIdempotencyKeyTtl = null) : IDisposable @@ -37,6 +39,7 @@ public class QueueManager( private IdempotencyKeys? _idempotencyKeys; private int _nextToRemoveIndex = 0; + private readonly AsyncSignal _interruptSignal = new(); private readonly SemaphoreSlim _semaphoreSlim = new SemaphoreSlim(1); private bool _initialized = false; private volatile bool _disposed; @@ -81,6 +84,16 @@ private async Task Initialize() { _semaphoreSlim.Release(); } + + _ = Task.Run(async () => + { + while (!_disposed) + { + await flowState.InterruptSignal.Wait(); + if (!_disposed) + await FetchMessagesOnce(); + } + }); await FetchMessagesOnce(); _ = Task.Run(FetchMessages); @@ -151,14 +164,14 @@ public async Task FetchMessagesOnce() } finally { - flowsManager.SignalInterrupt(storedId); + _interruptSignal.Fire(); _semaphoreSlim.Release(); } } private (MessageData? Matched, int PositionToRemoveIndex, Task PulseTask) TryTakeMessage(MessagePredicate predicate) { - var interruptedSignal = flowsManager.GetInterruptedSignal(storedId); + var interruptedSignal = _interruptSignal.Wait(); lock (_lock) { @@ -271,21 +284,18 @@ public async Task AfterFlush() timeouts.RemoveTimeout(timeoutId); return matched.Envelope; } - - flowsManager.SuspendThread(storedId); + + flowState.SubflowWaiting(); await Task.WhenAny(interruptSignal, timeoutTask, maxWaitTask); - var success = flowsManager.ThreadResumed(storedId); + var success = flowState.ResumeSubflow(); if (!success) await new TaskCompletionSource().Task; - + if (timeoutTask.IsCompleted) return null; if (maxWaitTask.IsCompleted) - { - await flowsManager.Suspend(storedId); - return null; - } + throw new SuspendInvocationException(); } }