Skip to content

Commit d8fefad

Browse files
authored
Simplify FlowTimeouts and finish FetchedMessages subscription refactor (#147)
1 parent e2448e0 commit d8fefad

19 files changed

Lines changed: 218 additions & 521 deletions

File tree

Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessagesSubscriptionTests.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,7 @@ protected async Task QueueClientPullsFiveMessagesAndTimesOutOnSixth(Task<IFuncti
196196

197197
for (var i = 0; i < 6; i++)
198198
{
199-
var message = await workflow.Message<string>(TimeSpan.FromMilliseconds(250));
199+
var message = await workflow.Message<string>(TimeSpan.FromMilliseconds(1_000));
200200
messages.Add(message ?? "NULL");
201201
}
202202

@@ -560,7 +560,7 @@ protected async Task QueueManagerFailsOnMessageDeserializationError(Task<IFuncti
560560
{
561561

562562
var flowTimeouts = new FlowTimeouts();
563-
var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow);
563+
var flowsManager = new FlowsManager(functionStore);
564564
var flowState = flowsManager.CreateFlow(workflow.StoredId, flowTimeouts);
565565
var queueManager = new QueueManager(
566566
workflow.FlowId,
@@ -623,7 +623,7 @@ protected async Task RegisteredTimeoutIsRemovedWhenPullingMessage(Task<IFunction
623623
{
624624
storedId = workflow.StoredId;
625625
var minimumTimeout = new FlowTimeouts();
626-
var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow);
626+
var flowsManager = new FlowsManager(functionStore);
627627
var flowState = flowsManager.CreateFlow(workflow.StoredId, minimumTimeout);
628628
var queueManager = new QueueManager(
629629
workflow.FlowId,
@@ -684,7 +684,7 @@ protected async Task PullEnvelopeReturnsEnvelopeWithReceiverAndSender(Task<IFunc
684684
{
685685

686686
var flowTimeouts = new FlowTimeouts();
687-
var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow);
687+
var flowsManager = new FlowsManager(functionStore);
688688
var flowState = flowsManager.CreateFlow(workflow.StoredId, flowTimeouts);
689689
var queueManager = new QueueManager(
690690
workflow.FlowId,

Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessagesTests.cs

Lines changed: 41 additions & 293 deletions
Large diffs are not rendered by default.
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
1-
using Microsoft.VisualStudio.TestTools.UnitTesting;
1+
using Microsoft.VisualStudio.TestTools.UnitTesting;
22

3-
[assembly: Parallelize(Workers = 0, Scope = ExecutionScope.MethodLevel)]
3+
[assembly: Parallelize(Workers = 0, Scope = ExecutionScope.MethodLevel)]

Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/FunctionTests/EffectTests.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -332,7 +332,7 @@ await store.EffectsStore.GetEffectResults(storedId),
332332
session,
333333
clearChildren: true
334334
);
335-
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(store, () => DateTime.UtcNow), storedId);
335+
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(store), storedId);
336336

337337
effect.TryGet<int>("alias", out _).ShouldBeFalse();
338338

@@ -380,7 +380,7 @@ public async Task ExistingEffectsFuncIsOnlyInvokedAfterGettingValue(Task<IFuncti
380380
storageSession: null,
381381
clearChildren: true
382382
);
383-
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(store, () => DateTime.UtcNow), storedId);
383+
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(store), storedId);
384384

385385
// Verify the effect is immediately available (eager loading)
386386
effect.TryGet<int>("test_alias", out var result).ShouldBeTrue();
@@ -714,7 +714,7 @@ public async Task CaptureUsingAtLeastOnceWithoutFlushResiliencyDelaysFlush(Task<
714714
session,
715715
clearChildren: true
716716
);
717-
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(store, () => DateTime.UtcNow), storedId);
717+
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(store), storedId);
718718

719719
var result = await effect.Capture(() => "hello world", ResiliencyLevel.AtLeastOnceDelayFlush);
720720
result.ShouldBe("hello world");
@@ -1027,7 +1027,7 @@ await elms.CaptureEach(
10271027
storedEffects.Any(e => e.Alias == "Before").ShouldBeTrue();
10281028
storedEffects.Any(e => e.Alias == "Loop").ShouldBeTrue();
10291029
storedEffects.Single(e => e.Alias == i.ToString()).EffectId.ShouldBe(new EffectId([1,i,0]));
1030-
storedEffects.Count.ShouldBe(4);
1030+
storedEffects.Count(e => e.Alias != null).ShouldBe(3);
10311031

10321032
await messageWriter.AppendMessage(i.ToString());
10331033
}

Core/Cleipnir.ResilientFunctions.Tests/UtilsTests/PrintEffectsTests.cs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public void PrintSingleCompletedEffect()
4343
clearChildren: true
4444
);
4545

46-
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(new InMemoryFunctionStore(), () => DateTime.UtcNow), storedId);
46+
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(new InMemoryFunctionStore()), storedId);
4747
var output = effect.ExecutionTree();
4848

4949
var expected = "└─ ✓ [1]\n";
@@ -75,7 +75,7 @@ public void PrintEffectWithAlias()
7575
clearChildren: true
7676
);
7777

78-
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(new InMemoryFunctionStore(), () => DateTime.UtcNow), storedId);
78+
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(new InMemoryFunctionStore()), storedId);
7979
var output = effect.ExecutionTree();
8080

8181
var expected = "└─ ✓ [1] my-effect\n";
@@ -111,7 +111,7 @@ public void PrintFailedEffect()
111111
clearChildren: true
112112
);
113113

114-
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(new InMemoryFunctionStore(), () => DateTime.UtcNow), storedId);
114+
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(new InMemoryFunctionStore()), storedId);
115115
var output = effect.ExecutionTree();
116116

117117
var expected = "└─ ✗ [1] failed-operation (System.InvalidOperationException)\n";
@@ -143,7 +143,7 @@ public void PrintStartedEffect()
143143
clearChildren: true
144144
);
145145

146-
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(new InMemoryFunctionStore(), () => DateTime.UtcNow), storedId);
146+
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(new InMemoryFunctionStore()), storedId);
147147
var output = effect.ExecutionTree();
148148

149149
var expected = "└─ ⋯ [1] in-progress\n";
@@ -189,7 +189,7 @@ public void PrintEffectHierarchy()
189189
clearChildren: true
190190
);
191191

192-
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(new InMemoryFunctionStore(), () => DateTime.UtcNow), storedId);
192+
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(new InMemoryFunctionStore()), storedId);
193193
var output = effect.ExecutionTree();
194194

195195
var expected =
@@ -245,7 +245,7 @@ public void PrintDeepEffectHierarchy()
245245
clearChildren: true
246246
);
247247

248-
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(new InMemoryFunctionStore(), () => DateTime.UtcNow), storedId);
248+
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(new InMemoryFunctionStore()), storedId);
249249
var output = effect.ExecutionTree();
250250

251251
var expected =
@@ -295,7 +295,7 @@ public void PrintMultipleRootEffects()
295295
clearChildren: true
296296
);
297297

298-
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(new InMemoryFunctionStore(), () => DateTime.UtcNow), storedId);
298+
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(new InMemoryFunctionStore()), storedId);
299299
var output = effect.ExecutionTree();
300300

301301
var expected =
@@ -330,7 +330,7 @@ public void PrintComplexEffectTree()
330330
clearChildren: true
331331
);
332332

333-
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(new InMemoryFunctionStore(), () => DateTime.UtcNow), storedId);
333+
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(new InMemoryFunctionStore()), storedId);
334334
var output = effect.ExecutionTree();
335335

336336
var expected =
@@ -365,7 +365,7 @@ public void PrintEffectTreeWithMissingIntermediateEffect()
365365
clearChildren: false
366366
);
367367

368-
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(new InMemoryFunctionStore(), () => DateTime.UtcNow), storedId);
368+
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(new InMemoryFunctionStore()), storedId);
369369
var output = effect.ExecutionTree();
370370

371371
var expected =
@@ -398,7 +398,7 @@ public void PrintEffectTreeWithMultipleMissingAncestors()
398398
clearChildren: false
399399
);
400400

401-
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(new InMemoryFunctionStore(), () => DateTime.UtcNow), storedId);
401+
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(new InMemoryFunctionStore()), storedId);
402402
var output = effect.ExecutionTree();
403403

404404
var expected =

Core/Cleipnir.ResilientFunctions/CoreRuntime/FlowState.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ public void SubflowWaiting()
4949
WaitingSubflows++;
5050
}
5151

52-
public bool ResumeSubflow()
52+
public bool TryResumeSubflow()
5353
{
5454
lock (_lock)
5555
if (Suspended)

Core/Cleipnir.ResilientFunctions/CoreRuntime/FlowTimeouts.cs

Lines changed: 5 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -2,69 +2,33 @@
22
using System.Collections.Generic;
33
using System.Linq;
44
using System.Threading;
5-
using System.Threading.Tasks;
65
using Cleipnir.ResilientFunctions.Domain;
7-
using Cleipnir.ResilientFunctions.Domain.Exceptions.Commands;
86

97
namespace Cleipnir.ResilientFunctions.CoreRuntime;
108

119
public class FlowTimeouts
1210
{
1311
private readonly Lock _lock = new();
14-
private Dictionary<EffectId, Tuple<DateTime, TaskCompletionSource>> Timeouts { get; } = new();
12+
private Dictionary<EffectId, DateTime> Timeouts { get; } = new();
1513

1614
public DateTime? MinimumTimeout
1715
{
1816
get
1917
{
2018
lock (_lock)
21-
return GetMinimumTimeout();
19+
return Timeouts.Values.Count != 0 ? Timeouts.Values.Min() : (DateTime?)null;
2220
}
2321
}
2422

25-
private DateTime? GetMinimumTimeout()
26-
=> Timeouts.Values.Count != 0 ? Timeouts.Values.Min(t => t.Item1) : (DateTime?)null;
27-
28-
public async Task AddTimeout(EffectId effectId, DateTime timeout, TimeSpan? maxWait = null)
23+
public void AddTimeout(EffectId effectId, DateTime timeout)
2924
{
30-
TaskCompletionSource tcs;
3125
lock (_lock)
32-
{
33-
tcs = new TaskCompletionSource();
34-
Timeouts[effectId] = Tuple.Create(timeout, tcs);
35-
}
36-
37-
if (maxWait == null || timeout <= DateTime.UtcNow)
38-
{
39-
await tcs.Task;
40-
return;
41-
}
42-
43-
var completed = await Task.WhenAny(tcs.Task, Task.Delay(maxWait.Value));
44-
if (completed != tcs.Task)
45-
throw new SuspendInvocationException();
26+
Timeouts[effectId] = timeout;
4627
}
4728

4829
public void RemoveTimeout(EffectId effectId)
4930
{
5031
lock (_lock)
5132
Timeouts.Remove(effectId);
5233
}
53-
54-
public bool HasExpiredTimeouts(DateTime now)
55-
{
56-
lock (_lock)
57-
return GetMinimumTimeout() <= now;
58-
}
59-
60-
public void SignalExpiredTimeouts(DateTime now)
61-
{
62-
lock (_lock)
63-
foreach (var (effectId, (timeout, tcs)) in Timeouts.ToList())
64-
if (timeout <= now)
65-
{
66-
Timeouts.Remove(effectId);
67-
Task.Run(() => tcs.TrySetResult());
68-
}
69-
}
70-
}
34+
}

Core/Cleipnir.ResilientFunctions/CoreRuntime/FlowsManager.cs

Lines changed: 3 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,46 +1,21 @@
1-
using System;
21
using System.Collections.Generic;
32
using System.Threading;
43
using System.Threading.Tasks;
54
using Cleipnir.ResilientFunctions.Storage;
65

76
namespace Cleipnir.ResilientFunctions.CoreRuntime;
87

9-
public class FlowsManager : IDisposable
8+
public class FlowsManager
109
{
1110
private readonly Dictionary<StoredId, FlowState> _dict = new();
1211
private readonly Lock _lock = new();
1312
private readonly IFunctionStore _functionStore;
14-
private readonly UtcNow _utcNow;
15-
private volatile bool _disposed;
1613

17-
public FlowsManager(IFunctionStore functionStore, UtcNow utcNow)
14+
public FlowsManager(IFunctionStore functionStore)
1815
{
1916
_functionStore = functionStore;
20-
_utcNow = utcNow;
21-
_ = Task.Run(TimeoutCheckLoop);
2217
}
2318

24-
private async Task TimeoutCheckLoop()
25-
{
26-
while (!_disposed)
27-
{
28-
var expiredStates = new List<FlowState>();
29-
var now = _utcNow();
30-
lock (_lock)
31-
foreach (var (_, status) in _dict)
32-
if (status.Timeouts.HasExpiredTimeouts(now))
33-
expiredStates.Add(status);
34-
35-
foreach (var status in expiredStates)
36-
status.Timeouts.SignalExpiredTimeouts(now);
37-
38-
await Task.Delay(10);
39-
}
40-
}
41-
42-
public void Dispose() => _disposed = true;
43-
4419
public FlowState CreateFlow(StoredId id, FlowTimeouts timeouts)
4520
{
4621
lock (_lock)
@@ -77,4 +52,4 @@ public void CompleteThread(StoredId id)
7752
if (_dict.TryGetValue(id, out var flowState))
7853
flowState.SubflowCompleted();
7954
}
80-
}
55+
}

Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Workflow.cs

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
using System;
22
using System.Threading.Tasks;
33
using Cleipnir.ResilientFunctions.Domain;
4+
using Cleipnir.ResilientFunctions.Domain.Exceptions.Commands;
45
using Cleipnir.ResilientFunctions.Messaging;
56
using Cleipnir.ResilientFunctions.Helpers;
67
using Cleipnir.ResilientFunctions.Queuing;
@@ -14,12 +15,12 @@ public class Workflow
1415
internal StoredId StoredId { get; }
1516
public Effect Effect { get; }
1617

17-
private QueueManager _queueManager;
18+
private readonly QueueManager _queueManager;
1819
private readonly UtcNow _utcNow;
1920
private MessageWriter MessageWriter { get; }
2021

2122

22-
public Workflow(FlowId flowId, StoredId storedId, Effect effect, QueueManager queueManager, UtcNow utcNow, MessageWriter messageWriter)
23+
internal Workflow(FlowId flowId, StoredId storedId, Effect effect, QueueManager queueManager, UtcNow utcNow, MessageWriter messageWriter)
2324
{
2425
FlowId = flowId;
2526
StoredId = storedId;
@@ -32,23 +33,25 @@ public Workflow(FlowId flowId, StoredId storedId, Effect effect, QueueManager qu
3233
public Task Delay(TimeSpan @for, bool suspend = true, string? alias = null) => Delay(until: _utcNow() + @for, suspend, alias);
3334
public Task Delay(DateTime until, bool suspend = true, string? alias = null)
3435
{
35-
var effectId = Effect.TakeNextImplicitId();
36-
var timeoutId = EffectId.CreateWithCurrentContext(effectId);
36+
var timeoutId = Effect.CreateNextImplicitId();
3737

3838
async Task Inner()
3939
{
40-
var expiry = await Effect.CreateOrGet(timeoutId, until.ToUniversalTime().Ticks, alias, flush: false);
41-
42-
if (expiry == -1)
43-
{
40+
var expiry = (await Effect.CreateOrGet(timeoutId, until.ToUniversalTime().Ticks, alias, flush: false))
41+
.ToDateTime();
42+
43+
var now = _utcNow();
44+
if (now > expiry)
4445
return;
45-
}
46-
47-
var maxWait = suspend ? TimeSpan.Zero : (TimeSpan?)null;
48-
await Effect.FlowTimeouts.AddTimeout(timeoutId, expiry.ToDateTime(), maxWait);
4946

50-
await Effect.Upsert(timeoutId, -1L, alias, flush: false);
51-
Effect.FlowTimeouts.RemoveTimeout(timeoutId);
47+
Effect.FlowTimeouts.AddTimeout(timeoutId, expiry);
48+
if (suspend)
49+
throw new SuspendInvocationException();
50+
51+
//do in-memory wait
52+
var delay = expiry - now;
53+
await Task.Delay(delay);
54+
Effect.FlowTimeouts.RemoveTimeout(timeoutId);
5255
}
5356

5457
return Inner();

Core/Cleipnir.ResilientFunctions/Domain/Effect.cs

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,24 @@ public enum ResiliencyLevel
1616
AtLeastOnceDelayFlush
1717
}
1818

19-
public class Effect(EffectResults effectResults, UtcNow utcNow, FlowTimeouts flowTimeouts, FlowsManager flowsManager, StoredId storedId)
19+
public class Effect
2020
{
21+
private readonly EffectResults effectResults;
22+
private readonly UtcNow utcNow;
23+
private readonly FlowTimeouts flowTimeouts;
24+
private readonly FlowsManager flowsManager;
25+
private readonly StoredId storedId;
26+
27+
internal Effect(EffectResults effectResults, UtcNow utcNow, FlowTimeouts flowTimeouts, FlowsManager flowsManager, StoredId storedId)
28+
{
29+
this.effectResults = effectResults;
30+
this.utcNow = utcNow;
31+
this.flowTimeouts = flowTimeouts;
32+
this.flowsManager = flowsManager;
33+
this.storedId = storedId;
34+
}
35+
36+
2137
internal bool Contains(int id) => Contains(CreateEffectId(id));
2238
internal bool Contains(EffectId effectId) => effectResults.Contains(effectId);
2339

0 commit comments

Comments
 (0)