diff --git a/Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessagesSubscriptionTests.cs b/Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessagesSubscriptionTests.cs index e015e8bb3..c356f275f 100644 --- a/Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessagesSubscriptionTests.cs +++ b/Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessagesSubscriptionTests.cs @@ -561,7 +561,7 @@ protected async Task QueueManagerFailsOnMessageDeserializationError(Task DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts())); + var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts(), completed: ForeverTask.Instance)); effect.TryGet("alias", out _).ShouldBeFalse(); @@ -380,7 +380,7 @@ public async Task ExistingEffectsFuncIsOnlyInvokedAfterGettingValue(Task DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts())); + var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts(), completed: ForeverTask.Instance)); // Verify the effect is immediately available (eager loading) effect.TryGet("test_alias", out var result).ShouldBeTrue(); @@ -714,7 +714,7 @@ public async Task CaptureUsingAtLeastOnceWithoutFlushResiliencyDelaysFlush(Task< session, clearChildren: true ); - var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts())); + var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts(), completed: ForeverTask.Instance)); var result = await effect.Capture(() => "hello world", ResiliencyLevel.AtLeastOnceDelayFlush); result.ShouldBe("hello world"); diff --git a/Core/Cleipnir.ResilientFunctions.Tests/UtilsTests/PrintEffectsTests.cs b/Core/Cleipnir.ResilientFunctions.Tests/UtilsTests/PrintEffectsTests.cs index 403c0a8d5..bdcc9d09d 100644 --- a/Core/Cleipnir.ResilientFunctions.Tests/UtilsTests/PrintEffectsTests.cs +++ b/Core/Cleipnir.ResilientFunctions.Tests/UtilsTests/PrintEffectsTests.cs @@ -4,6 +4,7 @@ using Cleipnir.ResilientFunctions.CoreRuntime; using Cleipnir.ResilientFunctions.CoreRuntime.Serialization; using Cleipnir.ResilientFunctions.Domain; +using Cleipnir.ResilientFunctions.Helpers; using Cleipnir.ResilientFunctions.Storage; using Cleipnir.ResilientFunctions.Tests.Utils; using Microsoft.VisualStudio.TestTools.UnitTesting; @@ -43,7 +44,7 @@ public void PrintSingleCompletedEffect() clearChildren: true ); - var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts())); + var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts(), completed: ForeverTask.Instance)); var output = effect.ExecutionTree(); var expected = "└─ ✓ [1]\n"; @@ -75,7 +76,7 @@ public void PrintEffectWithAlias() clearChildren: true ); - var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts())); + var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts(), completed: ForeverTask.Instance)); var output = effect.ExecutionTree(); var expected = "└─ ✓ [1] my-effect\n"; @@ -111,7 +112,7 @@ public void PrintFailedEffect() clearChildren: true ); - var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts())); + var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts(), completed: ForeverTask.Instance)); var output = effect.ExecutionTree(); var expected = "└─ ✗ [1] failed-operation (System.InvalidOperationException)\n"; @@ -143,7 +144,7 @@ public void PrintStartedEffect() clearChildren: true ); - var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts())); + var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts(), completed: ForeverTask.Instance)); var output = effect.ExecutionTree(); var expected = "└─ ⋯ [1] in-progress\n"; @@ -189,7 +190,7 @@ public void PrintEffectHierarchy() clearChildren: true ); - var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts())); + var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts(), completed: ForeverTask.Instance)); var output = effect.ExecutionTree(); var expected = @@ -245,7 +246,7 @@ public void PrintDeepEffectHierarchy() clearChildren: true ); - var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts())); + var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts(), completed: ForeverTask.Instance)); var output = effect.ExecutionTree(); var expected = @@ -295,7 +296,7 @@ public void PrintMultipleRootEffects() clearChildren: true ); - var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts())); + var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts(), completed: ForeverTask.Instance)); var output = effect.ExecutionTree(); var expected = @@ -330,7 +331,7 @@ public void PrintComplexEffectTree() clearChildren: true ); - var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts())); + var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts(), completed: ForeverTask.Instance)); var output = effect.ExecutionTree(); var expected = @@ -365,7 +366,7 @@ public void PrintEffectTreeWithMissingIntermediateEffect() clearChildren: false ); - var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts())); + var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts(), completed: ForeverTask.Instance)); var output = effect.ExecutionTree(); var expected = @@ -398,7 +399,7 @@ public void PrintEffectTreeWithMultipleMissingAncestors() clearChildren: false ); - var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts())); + var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts(), completed: ForeverTask.Instance)); var output = effect.ExecutionTree(); var expected = diff --git a/Core/Cleipnir.ResilientFunctions/CoreRuntime/FlowState.cs b/Core/Cleipnir.ResilientFunctions/CoreRuntime/FlowState.cs index b9fd4d41d..b001c93a4 100644 --- a/Core/Cleipnir.ResilientFunctions/CoreRuntime/FlowState.cs +++ b/Core/Cleipnir.ResilientFunctions/CoreRuntime/FlowState.cs @@ -1,10 +1,18 @@ using System.Threading; using System.Threading.Tasks; +using Cleipnir.ResilientFunctions.Helpers; using Cleipnir.ResilientFunctions.Queuing; using Cleipnir.ResilientFunctions.Storage; namespace Cleipnir.ResilientFunctions.CoreRuntime; +public enum FlowStatus +{ + Running = 0, + Waiting = 1, + Completed = 2 +} + public class FlowState { private readonly Lock _lock = new(); @@ -18,17 +26,42 @@ public class FlowState public bool Suspended { get; private set; } public Task SuspendedTask { get; } + private FlowStatus _status = FlowStatus.Running; + public FlowStatus Status + { + get + { + lock (_lock) + return _status; + } + set + { + lock (_lock) + if (_status != FlowStatus.Completed) + _status = value; + } + } + public FlowState( StoredId id, int subflows, int waitingSubflows, - FlowTimeouts timeouts) + FlowTimeouts timeouts, + Task completed) { Id = id; Subflows = subflows; WaitingSubflows = waitingSubflows; Timeouts = timeouts; SuspendedTask = _suspendedTcs.Task; + + _ = completed.ContinueWith(_ => Status = FlowStatus.Completed); + } + + public bool Waiting() + { + lock (_lock) + return Subflows == WaitingSubflows; } public void SubflowStarted() @@ -49,25 +82,20 @@ public void SubflowWaiting() WaitingSubflows++; } - public bool TryResumeSubflow() + public Task ResumeSubflow() { lock (_lock) if (Suspended) - return false; + return ForeverTask.Instance; else WaitingSubflows--; - return true; + return Task.CompletedTask; } public void Interrupt() { - lock (_lock) - if (Suspended) - return; - else - WaitingSubflows = 0; - + if (Suspended) return; QueueManager?.Interrupt(); } diff --git a/Core/Cleipnir.ResilientFunctions/CoreRuntime/FlowsManager.cs b/Core/Cleipnir.ResilientFunctions/CoreRuntime/FlowsManager.cs index 361d8343c..870d79a52 100644 --- a/Core/Cleipnir.ResilientFunctions/CoreRuntime/FlowsManager.cs +++ b/Core/Cleipnir.ResilientFunctions/CoreRuntime/FlowsManager.cs @@ -11,10 +11,10 @@ public class FlowsManager private readonly Dictionary _dict = new(); private readonly Lock _lock = new(); - public FlowState CreateFlow(StoredId id, FlowTimeouts timeouts) + public FlowState CreateFlowState(StoredId id, FlowTimeouts timeouts, Task completed) { lock (_lock) - return _dict[id] = new FlowState(id, subflows: 1, waitingSubflows: 0, timeouts); + return _dict[id] = new FlowState(id, subflows: 1, waitingSubflows: 0, timeouts, completed); } public void RemoveFlow(StoredId id, FlowState flowState) @@ -38,4 +38,23 @@ public void Interrupt(IReadOnlyList ids) flowState.Interrupt(); } + /* + public async Task CheckForSuspension() + { + while (true) + { + var waitingFlows = new List(); + lock (_dict) + { + waitingFlows = _dict.Values.Where(s => s.) + foreach (var flowState in _dict.Values) + { + flowState. + } + } + + await Task.Delay(250); + } + }*/ + } diff --git a/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Invoker.cs b/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Invoker.cs index b9aab539a..e861ea04a 100644 --- a/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Invoker.cs +++ b/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Invoker.cs @@ -51,7 +51,8 @@ public async Task> ScheduleInvoke(FlowInstance flowInsta if (parentWorkflow.Effect.Contains(scheduledAlreadyParentId!)) return _invocationHelper.CreateInnerScheduled([flowId], parentWorkflow, detach); - var (created, workflow, disposables, queueManager, flowState, timeouts, storageSession) = await PrepareForInvocation(flowId, storedId, param, parentWorkflow?.StoredId, initialState); + var tcs = new TaskCompletionSource(); + var (created, workflow, disposables, queueManager, flowState, timeouts, storageSession) = await PrepareForInvocation(flowId, storedId, param, parentWorkflow?.StoredId, initialState, tcs.Task); await (parentWorkflow?.Effect.Upsert(scheduledAlreadyParentId!, true, alias: null, flush: false) ?? Task.CompletedTask); if (!created) @@ -59,7 +60,6 @@ public async Task> ScheduleInvoke(FlowInstance flowInsta CurrentFlow._workflow.Value = workflow; - var tcs = new TaskCompletionSource(); _ = flowState.SuspendedTask.ContinueWith(_ => tcs.TrySetException(new InvocationSuspendedException(flowId))); _ = Task.Run(async () => { @@ -151,10 +151,10 @@ public Task> BulkSchedule(IEnumerable> public async Task> ScheduleRestart(StoredId storedId) { - var (inner, param, humanInstanceId, workflow, disposables, queueManager, flowState, timeouts, parent, storageSession) = await PrepareForReInvocation(storedId); + var tcs = new TaskCompletionSource(); + var (inner, param, humanInstanceId, workflow, disposables, queueManager, flowState, timeouts, parent, storageSession) = await PrepareForReInvocation(storedId, tcs.Task); var flowId = new FlowId(_flowType, humanInstanceId); - var tcs = new TaskCompletionSource(); _ = flowState.SuspendedTask.ContinueWith(_ => tcs.TrySetException(new InvocationSuspendedException(flowId))); _ = Task.Run(async () => { @@ -205,10 +205,10 @@ public async Task> ScheduleRestart(StoredId storedId) internal async Task ScheduleRestart(StoredId storedId, RestartedFunction rf, Action onCompletion) { - var (inner, param, humanInstanceId, workflow, disposables, queueManager, flowState, timeouts, parent, storageSession) = await PrepareForReInvocation(storedId, rf); + var tcs = new TaskCompletionSource(); + var (inner, param, humanInstanceId, workflow, disposables, queueManager, flowState, timeouts, parent, storageSession) = await PrepareForReInvocation(storedId, rf, tcs.Task); var flowId = new FlowId(_flowType, humanInstanceId); - var tcs = new TaskCompletionSource(); _ = flowState.SuspendedTask.ContinueWith(_ => tcs.TrySetException(new InvocationSuspendedException(flowId))); _ = Task.Run(async () => { @@ -237,7 +237,7 @@ internal async Task ScheduleRestart(StoredId storedId, RestartedFunction rf, Act }); } - private async Task PrepareForInvocation(FlowId flowId, StoredId storedId, TParam param, StoredId? parent, InitialState? initialState) + private async Task PrepareForInvocation(FlowId flowId, StoredId storedId, TParam param, StoredId? parent, InitialState? initialState, Task completed) { var disposables = new List(capacity: 3); var success = false; @@ -270,7 +270,7 @@ await _invocationHelper.PersistFunctionInStore( ); var flowTimeouts = new FlowTimeouts(); - var flowState = _flowsManager.CreateFlow(storedId, flowTimeouts); + var flowState = _flowsManager.CreateFlowState(storedId, flowTimeouts, completed); var effect = _invocationHelper.CreateEffect( storedId, @@ -307,16 +307,16 @@ await _invocationHelper.PersistFunctionInStore( } private record PreparedInvocation(bool Persisted, Workflow Workflow, IDisposable Disposables, QueueManager QueueManager, FlowState FlowState, FlowTimeouts FlowTimeouts, IStorageSession? StorageSession = null); - private async Task PrepareForReInvocation(StoredId storedId) + private async Task PrepareForReInvocation(StoredId storedId, Task completed) { var restartedFunction = await _invocationHelper.RestartFunction(storedId); if (restartedFunction == null) throw UnexpectedStateException.ConcurrentModification(storedId); - return await PrepareForReInvocation(storedId, restartedFunction); + return await PrepareForReInvocation(storedId, restartedFunction, completed); } - private async Task PrepareForReInvocation(StoredId storedId, RestartedFunction restartedFunction) + private async Task PrepareForReInvocation(StoredId storedId, RestartedFunction restartedFunction, Task completed) { var disposables = new List(capacity: 3); try @@ -328,7 +328,7 @@ private async Task PrepareForReInvocation(StoredId storedI disposables.Add(isWorkflowRunningDisposable); var flowTimeouts = new FlowTimeouts(); - var flowState = _flowsManager.CreateFlow(storedId, flowTimeouts); + var flowState = _flowsManager.CreateFlowState(storedId, flowTimeouts, completed); var effect = _invocationHelper.CreateEffect(storedId, flowId, effects, flowTimeouts, storageSession, flowState); diff --git a/Core/Cleipnir.ResilientFunctions/Queuing/QueueManager.cs b/Core/Cleipnir.ResilientFunctions/Queuing/QueueManager.cs index d355d481d..8752a479d 100644 --- a/Core/Cleipnir.ResilientFunctions/Queuing/QueueManager.cs +++ b/Core/Cleipnir.ResilientFunctions/Queuing/QueueManager.cs @@ -152,8 +152,7 @@ internal void Interrupt() } finally { - if (!_flowState.TryResumeSubflow()) - await ForeverTask.Instance; + await _flowState.ResumeSubflow(); await delayCts.CancelAsync(); delayCts.Dispose();