Skip to content

Commit 05f401d

Browse files
committed
FlowState: add Status, thread invoker Task, simplify resume/interrupt
- FlowState constructor takes a completion Task and hooks ContinueWith to flip Status to Completed once the invoker's TaskCompletionSource resolves. Setter guards Completed as terminal. - Implement Waiting() as Subflows == WaitingSubflows. - Replace TryResumeSubflow() (bool) with ResumeSubflow() returning ForeverTask.Instance when suspended or Task.CompletedTask otherwise; collapses the QueueManager call site to a single await. - Drop WaitingSubflows = 0 in Interrupt(): it desynchronised the counter (later ResumeSubflow calls drove it negative, blocking future Suspend()). - Invoker creates the TCS before PrepareFor[Re]Invocation and threads tcs.Task through CreateFlowState so FlowState can observe completion. Scaffolding for centralized suspension via FlowsManager; nothing calls Suspend() yet, so behaviour is unchanged.
1 parent 65920aa commit 05f401d

7 files changed

Lines changed: 89 additions & 42 deletions

File tree

Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessagesSubscriptionTests.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -561,7 +561,7 @@ protected async Task QueueManagerFailsOnMessageDeserializationError(Task<IFuncti
561561

562562
var flowTimeouts = new FlowTimeouts();
563563
var flowsManager = new FlowsManager();
564-
var flowState = flowsManager.CreateFlow(workflow.StoredId, flowTimeouts);
564+
var flowState = flowsManager.CreateFlowState(workflow.StoredId, flowTimeouts, completed: ForeverTask.Instance);
565565
var queueManager = new QueueManager(
566566
workflow.FlowId,
567567
workflow.StoredId,
@@ -624,7 +624,7 @@ protected async Task RegisteredTimeoutIsRemovedWhenPullingMessage(Task<IFunction
624624
storedId = workflow.StoredId;
625625
var minimumTimeout = new FlowTimeouts();
626626
var flowsManager = new FlowsManager();
627-
var flowState = flowsManager.CreateFlow(workflow.StoredId, minimumTimeout);
627+
var flowState = flowsManager.CreateFlowState(workflow.StoredId, minimumTimeout, completed: ForeverTask.Instance);
628628
var queueManager = new QueueManager(
629629
workflow.FlowId,
630630
workflow.StoredId,
@@ -685,7 +685,7 @@ protected async Task PullEnvelopeReturnsEnvelopeWithReceiverAndSender(Task<IFunc
685685

686686
var flowTimeouts = new FlowTimeouts();
687687
var flowsManager = new FlowsManager();
688-
var flowState = flowsManager.CreateFlow(workflow.StoredId, flowTimeouts);
688+
var flowState = flowsManager.CreateFlowState(workflow.StoredId, flowTimeouts, completed: ForeverTask.Instance);
689689
var queueManager = new QueueManager(
690690
workflow.FlowId,
691691
workflow.StoredId,

Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/FunctionTests/EffectTests.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -332,7 +332,7 @@ await store.EffectsStore.GetEffectResults(storedId),
332332
session,
333333
clearChildren: true
334334
);
335-
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts()));
335+
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts(), completed: ForeverTask.Instance));
336336

337337
effect.TryGet<int>("alias", out _).ShouldBeFalse();
338338

@@ -380,7 +380,7 @@ public async Task ExistingEffectsFuncIsOnlyInvokedAfterGettingValue(Task<IFuncti
380380
storageSession: null,
381381
clearChildren: true
382382
);
383-
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts()));
383+
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts(), completed: ForeverTask.Instance));
384384

385385
// Verify the effect is immediately available (eager loading)
386386
effect.TryGet<int>("test_alias", out var result).ShouldBeTrue();
@@ -714,7 +714,7 @@ public async Task CaptureUsingAtLeastOnceWithoutFlushResiliencyDelaysFlush(Task<
714714
session,
715715
clearChildren: true
716716
);
717-
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts()));
717+
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts(), completed: ForeverTask.Instance));
718718

719719
var result = await effect.Capture(() => "hello world", ResiliencyLevel.AtLeastOnceDelayFlush);
720720
result.ShouldBe("hello world");

Core/Cleipnir.ResilientFunctions.Tests/UtilsTests/PrintEffectsTests.cs

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
using Cleipnir.ResilientFunctions.CoreRuntime;
55
using Cleipnir.ResilientFunctions.CoreRuntime.Serialization;
66
using Cleipnir.ResilientFunctions.Domain;
7+
using Cleipnir.ResilientFunctions.Helpers;
78
using Cleipnir.ResilientFunctions.Storage;
89
using Cleipnir.ResilientFunctions.Tests.Utils;
910
using Microsoft.VisualStudio.TestTools.UnitTesting;
@@ -43,7 +44,7 @@ public void PrintSingleCompletedEffect()
4344
clearChildren: true
4445
);
4546

46-
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts()));
47+
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts(), completed: ForeverTask.Instance));
4748
var output = effect.ExecutionTree();
4849

4950
var expected = "└─ ✓ [1]\n";
@@ -75,7 +76,7 @@ public void PrintEffectWithAlias()
7576
clearChildren: true
7677
);
7778

78-
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts()));
79+
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts(), completed: ForeverTask.Instance));
7980
var output = effect.ExecutionTree();
8081

8182
var expected = "└─ ✓ [1] my-effect\n";
@@ -111,7 +112,7 @@ public void PrintFailedEffect()
111112
clearChildren: true
112113
);
113114

114-
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts()));
115+
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts(), completed: ForeverTask.Instance));
115116
var output = effect.ExecutionTree();
116117

117118
var expected = "└─ ✗ [1] failed-operation (System.InvalidOperationException)\n";
@@ -143,7 +144,7 @@ public void PrintStartedEffect()
143144
clearChildren: true
144145
);
145146

146-
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts()));
147+
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts(), completed: ForeverTask.Instance));
147148
var output = effect.ExecutionTree();
148149

149150
var expected = "└─ ⋯ [1] in-progress\n";
@@ -189,7 +190,7 @@ public void PrintEffectHierarchy()
189190
clearChildren: true
190191
);
191192

192-
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts()));
193+
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts(), completed: ForeverTask.Instance));
193194
var output = effect.ExecutionTree();
194195

195196
var expected =
@@ -245,7 +246,7 @@ public void PrintDeepEffectHierarchy()
245246
clearChildren: true
246247
);
247248

248-
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts()));
249+
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts(), completed: ForeverTask.Instance));
249250
var output = effect.ExecutionTree();
250251

251252
var expected =
@@ -295,7 +296,7 @@ public void PrintMultipleRootEffects()
295296
clearChildren: true
296297
);
297298

298-
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts()));
299+
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts(), completed: ForeverTask.Instance));
299300
var output = effect.ExecutionTree();
300301

301302
var expected =
@@ -330,7 +331,7 @@ public void PrintComplexEffectTree()
330331
clearChildren: true
331332
);
332333

333-
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts()));
334+
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts(), completed: ForeverTask.Instance));
334335
var output = effect.ExecutionTree();
335336

336337
var expected =
@@ -365,7 +366,7 @@ public void PrintEffectTreeWithMissingIntermediateEffect()
365366
clearChildren: false
366367
);
367368

368-
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts()));
369+
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts(), completed: ForeverTask.Instance));
369370
var output = effect.ExecutionTree();
370371

371372
var expected =
@@ -398,7 +399,7 @@ public void PrintEffectTreeWithMultipleMissingAncestors()
398399
clearChildren: false
399400
);
400401

401-
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts()));
402+
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts(), completed: ForeverTask.Instance));
402403
var output = effect.ExecutionTree();
403404

404405
var expected =

Core/Cleipnir.ResilientFunctions/CoreRuntime/FlowState.cs

Lines changed: 38 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,18 @@
11
using System.Threading;
22
using System.Threading.Tasks;
3+
using Cleipnir.ResilientFunctions.Helpers;
34
using Cleipnir.ResilientFunctions.Queuing;
45
using Cleipnir.ResilientFunctions.Storage;
56

67
namespace Cleipnir.ResilientFunctions.CoreRuntime;
78

9+
public enum FlowStatus
10+
{
11+
Running = 0,
12+
Waiting = 1,
13+
Completed = 2
14+
}
15+
816
public class FlowState
917
{
1018
private readonly Lock _lock = new();
@@ -18,17 +26,42 @@ public class FlowState
1826
public bool Suspended { get; private set; }
1927
public Task SuspendedTask { get; }
2028

29+
private FlowStatus _status = FlowStatus.Running;
30+
public FlowStatus Status
31+
{
32+
get
33+
{
34+
lock (_lock)
35+
return _status;
36+
}
37+
set
38+
{
39+
lock (_lock)
40+
if (_status != FlowStatus.Completed)
41+
_status = value;
42+
}
43+
}
44+
2145
public FlowState(
2246
StoredId id,
2347
int subflows,
2448
int waitingSubflows,
25-
FlowTimeouts timeouts)
49+
FlowTimeouts timeouts,
50+
Task completed)
2651
{
2752
Id = id;
2853
Subflows = subflows;
2954
WaitingSubflows = waitingSubflows;
3055
Timeouts = timeouts;
3156
SuspendedTask = _suspendedTcs.Task;
57+
58+
_ = completed.ContinueWith(_ => Status = FlowStatus.Completed);
59+
}
60+
61+
public bool Waiting()
62+
{
63+
lock (_lock)
64+
return Subflows == WaitingSubflows;
3265
}
3366

3467
public void SubflowStarted()
@@ -49,25 +82,20 @@ public void SubflowWaiting()
4982
WaitingSubflows++;
5083
}
5184

52-
public bool TryResumeSubflow()
85+
public Task ResumeSubflow()
5386
{
5487
lock (_lock)
5588
if (Suspended)
56-
return false;
89+
return ForeverTask.Instance;
5790
else
5891
WaitingSubflows--;
5992

60-
return true;
93+
return Task.CompletedTask;
6194
}
6295

6396
public void Interrupt()
6497
{
65-
lock (_lock)
66-
if (Suspended)
67-
return;
68-
else
69-
WaitingSubflows = 0;
70-
98+
if (Suspended) return;
7199
QueueManager?.Interrupt();
72100
}
73101

Core/Cleipnir.ResilientFunctions/CoreRuntime/FlowsManager.cs

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,10 @@ public class FlowsManager
1111
private readonly Dictionary<StoredId, FlowState> _dict = new();
1212
private readonly Lock _lock = new();
1313

14-
public FlowState CreateFlow(StoredId id, FlowTimeouts timeouts)
14+
public FlowState CreateFlowState(StoredId id, FlowTimeouts timeouts, Task completed)
1515
{
1616
lock (_lock)
17-
return _dict[id] = new FlowState(id, subflows: 1, waitingSubflows: 0, timeouts);
17+
return _dict[id] = new FlowState(id, subflows: 1, waitingSubflows: 0, timeouts, completed);
1818
}
1919

2020
public void RemoveFlow(StoredId id, FlowState flowState)
@@ -38,4 +38,23 @@ public void Interrupt(IReadOnlyList<StoredId> ids)
3838
flowState.Interrupt();
3939
}
4040

41+
/*
42+
public async Task CheckForSuspension()
43+
{
44+
while (true)
45+
{
46+
var waitingFlows = new List<FlowState>();
47+
lock (_dict)
48+
{
49+
waitingFlows = _dict.Values.Where(s => s.)
50+
foreach (var flowState in _dict.Values)
51+
{
52+
flowState.
53+
}
54+
}
55+
56+
await Task.Delay(250);
57+
}
58+
}*/
59+
4160
}

Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Invoker.cs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -51,15 +51,15 @@ public async Task<InnerScheduled<TReturn>> ScheduleInvoke(FlowInstance flowInsta
5151
if (parentWorkflow.Effect.Contains(scheduledAlreadyParentId!))
5252
return _invocationHelper.CreateInnerScheduled([flowId], parentWorkflow, detach);
5353

54-
var (created, workflow, disposables, queueManager, flowState, timeouts, storageSession) = await PrepareForInvocation(flowId, storedId, param, parentWorkflow?.StoredId, initialState);
54+
var tcs = new TaskCompletionSource<TReturn>();
55+
var (created, workflow, disposables, queueManager, flowState, timeouts, storageSession) = await PrepareForInvocation(flowId, storedId, param, parentWorkflow?.StoredId, initialState, tcs.Task);
5556
await (parentWorkflow?.Effect.Upsert(scheduledAlreadyParentId!, true, alias: null, flush: false) ?? Task.CompletedTask);
5657

5758
if (!created)
5859
return _invocationHelper.CreateInnerScheduled([flowId], parentWorkflow, detach);
5960

6061
CurrentFlow._workflow.Value = workflow;
6162

62-
var tcs = new TaskCompletionSource<TReturn>();
6363
_ = flowState.SuspendedTask.ContinueWith(_ => tcs.TrySetException(new InvocationSuspendedException(flowId)));
6464
_ = Task.Run(async () =>
6565
{
@@ -151,10 +151,10 @@ public Task<InnerScheduled<TReturn>> BulkSchedule(IEnumerable<BulkWork<TParam>>
151151

152152
public async Task<InnerScheduled<TReturn>> ScheduleRestart(StoredId storedId)
153153
{
154-
var (inner, param, humanInstanceId, workflow, disposables, queueManager, flowState, timeouts, parent, storageSession) = await PrepareForReInvocation(storedId);
154+
var tcs = new TaskCompletionSource<TReturn>();
155+
var (inner, param, humanInstanceId, workflow, disposables, queueManager, flowState, timeouts, parent, storageSession) = await PrepareForReInvocation(storedId, tcs.Task);
155156
var flowId = new FlowId(_flowType, humanInstanceId);
156157

157-
var tcs = new TaskCompletionSource<TReturn>();
158158
_ = flowState.SuspendedTask.ContinueWith(_ => tcs.TrySetException(new InvocationSuspendedException(flowId)));
159159
_ = Task.Run(async () =>
160160
{
@@ -205,10 +205,10 @@ public async Task<InnerScheduled<TReturn>> ScheduleRestart(StoredId storedId)
205205

206206
internal async Task ScheduleRestart(StoredId storedId, RestartedFunction rf, Action onCompletion)
207207
{
208-
var (inner, param, humanInstanceId, workflow, disposables, queueManager, flowState, timeouts, parent, storageSession) = await PrepareForReInvocation(storedId, rf);
208+
var tcs = new TaskCompletionSource<TReturn>();
209+
var (inner, param, humanInstanceId, workflow, disposables, queueManager, flowState, timeouts, parent, storageSession) = await PrepareForReInvocation(storedId, rf, tcs.Task);
209210
var flowId = new FlowId(_flowType, humanInstanceId);
210211

211-
var tcs = new TaskCompletionSource<TReturn>();
212212
_ = flowState.SuspendedTask.ContinueWith(_ => tcs.TrySetException(new InvocationSuspendedException(flowId)));
213213
_ = Task.Run(async () =>
214214
{
@@ -237,7 +237,7 @@ internal async Task ScheduleRestart(StoredId storedId, RestartedFunction rf, Act
237237
});
238238
}
239239

240-
private async Task<PreparedInvocation> PrepareForInvocation(FlowId flowId, StoredId storedId, TParam param, StoredId? parent, InitialState? initialState)
240+
private async Task<PreparedInvocation> PrepareForInvocation(FlowId flowId, StoredId storedId, TParam param, StoredId? parent, InitialState? initialState, Task completed)
241241
{
242242
var disposables = new List<IDisposable>(capacity: 3);
243243
var success = false;
@@ -270,7 +270,7 @@ await _invocationHelper.PersistFunctionInStore(
270270
);
271271

272272
var flowTimeouts = new FlowTimeouts();
273-
var flowState = _flowsManager.CreateFlow(storedId, flowTimeouts);
273+
var flowState = _flowsManager.CreateFlowState(storedId, flowTimeouts, completed);
274274

275275
var effect = _invocationHelper.CreateEffect(
276276
storedId,
@@ -307,16 +307,16 @@ await _invocationHelper.PersistFunctionInStore(
307307
}
308308
private record PreparedInvocation(bool Persisted, Workflow Workflow, IDisposable Disposables, QueueManager QueueManager, FlowState FlowState, FlowTimeouts FlowTimeouts, IStorageSession? StorageSession = null);
309309

310-
private async Task<PreparedReInvocation> PrepareForReInvocation(StoredId storedId)
310+
private async Task<PreparedReInvocation> PrepareForReInvocation(StoredId storedId, Task completed)
311311
{
312312
var restartedFunction = await _invocationHelper.RestartFunction(storedId);
313313
if (restartedFunction == null)
314314
throw UnexpectedStateException.ConcurrentModification(storedId);
315315

316-
return await PrepareForReInvocation(storedId, restartedFunction);
316+
return await PrepareForReInvocation(storedId, restartedFunction, completed);
317317
}
318318

319-
private async Task<PreparedReInvocation> PrepareForReInvocation(StoredId storedId, RestartedFunction restartedFunction)
319+
private async Task<PreparedReInvocation> PrepareForReInvocation(StoredId storedId, RestartedFunction restartedFunction, Task completed)
320320
{
321321
var disposables = new List<IDisposable>(capacity: 3);
322322
try
@@ -328,7 +328,7 @@ private async Task<PreparedReInvocation> PrepareForReInvocation(StoredId storedI
328328
disposables.Add(isWorkflowRunningDisposable);
329329

330330
var flowTimeouts = new FlowTimeouts();
331-
var flowState = _flowsManager.CreateFlow(storedId, flowTimeouts);
331+
var flowState = _flowsManager.CreateFlowState(storedId, flowTimeouts, completed);
332332

333333
var effect = _invocationHelper.CreateEffect(storedId, flowId, effects, flowTimeouts, storageSession, flowState);
334334

Core/Cleipnir.ResilientFunctions/Queuing/QueueManager.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -152,8 +152,7 @@ internal void Interrupt()
152152
}
153153
finally
154154
{
155-
if (!_flowState.TryResumeSubflow())
156-
await ForeverTask.Instance;
155+
await _flowState.ResumeSubflow();
157156

158157
await delayCts.CancelAsync();
159158
delayCts.Dispose();

0 commit comments

Comments
 (0)