Skip to content

Commit 2053d22

Browse files
authored
Add InterruptedWatchdog scaffolding (#148)
1 parent 39b52d8 commit 2053d22

12 files changed

Lines changed: 160 additions & 118 deletions

File tree

Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/StoreTests.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -251,12 +251,12 @@ public override Task GetInterruptedFunctionsReturnsEmptyListWhenNoneFunctionsAre
251251
=> GetInterruptedFunctionsReturnsEmptyListWhenNoneFunctionsAreInterrupted(FunctionStoreFactory.Create());
252252

253253
[TestMethod]
254-
public override Task GetInterruptedFunctionsReturnsEmptyListWhenQueriedIdsDoNotExist()
255-
=> GetInterruptedFunctionsReturnsEmptyListWhenQueriedIdsDoNotExist(FunctionStoreFactory.Create());
254+
public override Task GetInterruptedFunctionsReturnsIdOnceWhenInterruptedMultipleTimes()
255+
=> GetInterruptedFunctionsReturnsIdOnceWhenInterruptedMultipleTimes(FunctionStoreFactory.Create());
256256

257257
[TestMethod]
258-
public override Task GetInterruptedFunctionsOnlyReturnsMatchingInterruptedFunctions()
259-
=> GetInterruptedFunctionsOnlyReturnsMatchingInterruptedFunctions(FunctionStoreFactory.Create());
258+
public override Task GetInterruptedFunctionsIncludesPostponedInterruptedFunction()
259+
=> GetInterruptedFunctionsIncludesPostponedInterruptedFunction(FunctionStoreFactory.Create());
260260

261261
[TestMethod]
262262
public override Task GetResultsReturnsResultsForExistingFunctions()

Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessagesSubscriptionTests.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -560,7 +560,7 @@ protected async Task QueueManagerFailsOnMessageDeserializationError(Task<IFuncti
560560
{
561561

562562
var flowTimeouts = new FlowTimeouts();
563-
var flowsManager = new FlowsManager(functionStore);
563+
var flowsManager = new FlowsManager();
564564
var flowState = flowsManager.CreateFlow(workflow.StoredId, flowTimeouts);
565565
var queueManager = new QueueManager(
566566
workflow.FlowId,
@@ -623,7 +623,7 @@ protected async Task RegisteredTimeoutIsRemovedWhenPullingMessage(Task<IFunction
623623
{
624624
storedId = workflow.StoredId;
625625
var minimumTimeout = new FlowTimeouts();
626-
var flowsManager = new FlowsManager(functionStore);
626+
var flowsManager = new FlowsManager();
627627
var flowState = flowsManager.CreateFlow(workflow.StoredId, minimumTimeout);
628628
var queueManager = new QueueManager(
629629
workflow.FlowId,
@@ -684,7 +684,7 @@ protected async Task PullEnvelopeReturnsEnvelopeWithReceiverAndSender(Task<IFunc
684684
{
685685

686686
var flowTimeouts = new FlowTimeouts();
687-
var flowsManager = new FlowsManager(functionStore);
687+
var flowsManager = new FlowsManager();
688688
var flowState = flowsManager.CreateFlow(workflow.StoredId, flowTimeouts);
689689
var queueManager = new QueueManager(
690690
workflow.FlowId,

Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/FunctionTests/EffectTests.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -332,7 +332,7 @@ await store.EffectsStore.GetEffectResults(storedId),
332332
session,
333333
clearChildren: true
334334
);
335-
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(store), storedId);
335+
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(), storedId);
336336

337337
effect.TryGet<int>("alias", out _).ShouldBeFalse();
338338

@@ -380,7 +380,7 @@ public async Task ExistingEffectsFuncIsOnlyInvokedAfterGettingValue(Task<IFuncti
380380
storageSession: null,
381381
clearChildren: true
382382
);
383-
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(store), storedId);
383+
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(), storedId);
384384

385385
// Verify the effect is immediately available (eager loading)
386386
effect.TryGet<int>("test_alias", out var result).ShouldBeTrue();
@@ -714,7 +714,7 @@ public async Task CaptureUsingAtLeastOnceWithoutFlushResiliencyDelaysFlush(Task<
714714
session,
715715
clearChildren: true
716716
);
717-
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(store), storedId);
717+
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(), storedId);
718718

719719
var result = await effect.Capture(() => "hello world", ResiliencyLevel.AtLeastOnceDelayFlush);
720720
result.ShouldBe("hello world");

Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/StoreTests.cs

Lines changed: 24 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -2107,30 +2107,15 @@ protected async Task GetInterruptedFunctionsReturnsEmptyListWhenNoneFunctionsAre
21072107
interruptedFunctions.Count.ShouldBe(0);
21082108
}
21092109

2110-
public abstract Task GetInterruptedFunctionsReturnsEmptyListWhenQueriedIdsDoNotExist();
2111-
protected async Task GetInterruptedFunctionsReturnsEmptyListWhenQueriedIdsDoNotExist(Task<IFunctionStore> storeTask)
2110+
public abstract Task GetInterruptedFunctionsReturnsIdOnceWhenInterruptedMultipleTimes();
2111+
protected async Task GetInterruptedFunctionsReturnsIdOnceWhenInterruptedMultipleTimes(Task<IFunctionStore> storeTask)
21122112
{
21132113
var store = await storeTask;
2114-
var functionId1 = TestStoredId.Create();
2115-
var functionId2 = StoredId.Create(functionId1.Type, Guid.NewGuid().ToString());
2116-
var nonExistentId1 = StoredId.Create(functionId1.Type, Guid.NewGuid().ToString());
2117-
var nonExistentId2 = StoredId.Create(functionId1.Type, Guid.NewGuid().ToString());
2114+
var functionId = TestStoredId.Create();
21182115

21192116
var session = await store.CreateFunction(
2120-
functionId1,
2121-
"humanInstanceId1",
2122-
param: Test.SimpleStoredParameter,
2123-
leaseExpiration: DateTime.UtcNow.Ticks,
2124-
postponeUntil: null,
2125-
timestamp: DateTime.UtcNow.Ticks,
2126-
parent: null,
2127-
owner: null
2128-
);
2129-
session.ShouldBeNull();
2130-
2131-
session = await store.CreateFunction(
2132-
functionId2,
2133-
"humanInstanceId2",
2117+
functionId,
2118+
"humanInstanceId",
21342119
param: Test.SimpleStoredParameter,
21352120
leaseExpiration: DateTime.UtcNow.Ticks,
21362121
postponeUntil: null,
@@ -2140,31 +2125,25 @@ protected async Task GetInterruptedFunctionsReturnsEmptyListWhenQueriedIdsDoNotE
21402125
);
21412126
session.ShouldBeNull();
21422127

2143-
await store.Interrupt(functionId1).ShouldBeTrueAsync();
2144-
await store.Interrupt(functionId2).ShouldBeTrueAsync();
2128+
await store.Interrupt(functionId).ShouldBeTrueAsync();
2129+
await store.Interrupt(functionId).ShouldBeTrueAsync();
2130+
await store.Interrupt(functionId).ShouldBeTrueAsync();
21452131

2146-
// No functions are interrupted despite existing
21472132
var interruptedFunctions = await store.GetInterruptedFunctions();
2148-
2149-
// Should return the 2 interrupted functions
2150-
interruptedFunctions.Count.ShouldBe(2);
2151-
interruptedFunctions.Any(id => id == functionId1).ShouldBeTrue();
2152-
interruptedFunctions.Any(id => id == functionId2).ShouldBeTrue();
2133+
interruptedFunctions.Count.ShouldBe(1);
2134+
interruptedFunctions.Single().ShouldBe(functionId);
21532135
}
21542136

2155-
public abstract Task GetInterruptedFunctionsOnlyReturnsMatchingInterruptedFunctions();
2156-
protected async Task GetInterruptedFunctionsOnlyReturnsMatchingInterruptedFunctions(Task<IFunctionStore> storeTask)
2137+
public abstract Task GetInterruptedFunctionsIncludesPostponedInterruptedFunction();
2138+
protected async Task GetInterruptedFunctionsIncludesPostponedInterruptedFunction(Task<IFunctionStore> storeTask)
21572139
{
21582140
var store = await storeTask;
2159-
var functionId1 = TestStoredId.Create();
2160-
var functionId2 = StoredId.Create(functionId1.Type, Guid.NewGuid().ToString());
2161-
var functionId3 = StoredId.Create(functionId1.Type, Guid.NewGuid().ToString());
2162-
var functionId4 = StoredId.Create(functionId1.Type, Guid.NewGuid().ToString());
2141+
var executingId = TestStoredId.Create();
2142+
var postponedId = StoredId.Create(executingId.Type, Guid.NewGuid().ToString());
21632143

2164-
// Create 4 functions
21652144
var session = await store.CreateFunction(
2166-
functionId1,
2167-
"humanInstanceId1",
2145+
executingId,
2146+
"humanInstanceId-exec",
21682147
param: Test.SimpleStoredParameter,
21692148
leaseExpiration: DateTime.UtcNow.Ticks,
21702149
postponeUntil: null,
@@ -2175,56 +2154,24 @@ protected async Task GetInterruptedFunctionsOnlyReturnsMatchingInterruptedFuncti
21752154
session.ShouldBeNull();
21762155

21772156
session = await store.CreateFunction(
2178-
functionId2,
2179-
"humanInstanceId2",
2180-
param: Test.SimpleStoredParameter,
2181-
leaseExpiration: DateTime.UtcNow.Ticks,
2182-
postponeUntil: null,
2183-
timestamp: DateTime.UtcNow.Ticks,
2184-
parent: null,
2185-
owner: null
2186-
);
2187-
session.ShouldBeNull();
2188-
2189-
session = await store.CreateFunction(
2190-
functionId3,
2191-
"humanInstanceId3",
2192-
param: Test.SimpleStoredParameter,
2193-
leaseExpiration: DateTime.UtcNow.Ticks,
2194-
postponeUntil: null,
2195-
timestamp: DateTime.UtcNow.Ticks,
2196-
parent: null,
2197-
owner: null
2198-
);
2199-
session.ShouldBeNull();
2200-
2201-
session = await store.CreateFunction(
2202-
functionId4,
2203-
"humanInstanceId4",
2157+
postponedId,
2158+
"humanInstanceId-postponed",
22042159
param: Test.SimpleStoredParameter,
22052160
leaseExpiration: DateTime.UtcNow.Ticks,
2206-
postponeUntil: null,
2161+
postponeUntil: DateTime.UtcNow.Ticks,
22072162
timestamp: DateTime.UtcNow.Ticks,
22082163
parent: null,
22092164
owner: null
22102165
);
22112166
session.ShouldBeNull();
22122167

2213-
// Interrupt all 4 functions
2214-
await store.Interrupt(functionId1).ShouldBeTrueAsync();
2215-
await store.Interrupt(functionId2).ShouldBeTrueAsync();
2216-
await store.Interrupt(functionId3).ShouldBeTrueAsync();
2217-
await store.Interrupt(functionId4).ShouldBeTrueAsync();
2168+
await store.Interrupt(executingId).ShouldBeTrueAsync();
2169+
await store.Interrupt(postponedId).ShouldBeTrueAsync();
22182170

2219-
// Get all interrupted functions
22202171
var interruptedFunctions = await store.GetInterruptedFunctions();
2221-
2222-
// Should return all 4 interrupted functions
2223-
interruptedFunctions.Count.ShouldBe(4);
2224-
interruptedFunctions.Any(id => id == functionId1).ShouldBeTrue();
2225-
interruptedFunctions.Any(id => id == functionId2).ShouldBeTrue();
2226-
interruptedFunctions.Any(id => id == functionId3).ShouldBeTrue();
2227-
interruptedFunctions.Any(id => id == functionId4).ShouldBeTrue();
2172+
interruptedFunctions.Count.ShouldBe(2);
2173+
interruptedFunctions.Any(id => id == executingId).ShouldBeTrue();
2174+
interruptedFunctions.Any(id => id == postponedId).ShouldBeTrue();
22282175
}
22292176

22302177
public abstract Task GetResultsReturnsResultsForExistingFunctions();

Core/Cleipnir.ResilientFunctions.Tests/UtilsTests/PrintEffectsTests.cs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public void PrintSingleCompletedEffect()
4343
clearChildren: true
4444
);
4545

46-
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(new InMemoryFunctionStore()), storedId);
46+
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(), storedId);
4747
var output = effect.ExecutionTree();
4848

4949
var expected = "└─ ✓ [1]\n";
@@ -75,7 +75,7 @@ public void PrintEffectWithAlias()
7575
clearChildren: true
7676
);
7777

78-
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(new InMemoryFunctionStore()), storedId);
78+
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(), storedId);
7979
var output = effect.ExecutionTree();
8080

8181
var expected = "└─ ✓ [1] my-effect\n";
@@ -111,7 +111,7 @@ public void PrintFailedEffect()
111111
clearChildren: true
112112
);
113113

114-
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(new InMemoryFunctionStore()), storedId);
114+
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(), storedId);
115115
var output = effect.ExecutionTree();
116116

117117
var expected = "└─ ✗ [1] failed-operation (System.InvalidOperationException)\n";
@@ -143,7 +143,7 @@ public void PrintStartedEffect()
143143
clearChildren: true
144144
);
145145

146-
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(new InMemoryFunctionStore()), storedId);
146+
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(), storedId);
147147
var output = effect.ExecutionTree();
148148

149149
var expected = "└─ ⋯ [1] in-progress\n";
@@ -189,7 +189,7 @@ public void PrintEffectHierarchy()
189189
clearChildren: true
190190
);
191191

192-
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(new InMemoryFunctionStore()), storedId);
192+
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(), storedId);
193193
var output = effect.ExecutionTree();
194194

195195
var expected =
@@ -245,7 +245,7 @@ public void PrintDeepEffectHierarchy()
245245
clearChildren: true
246246
);
247247

248-
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(new InMemoryFunctionStore()), storedId);
248+
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(), storedId);
249249
var output = effect.ExecutionTree();
250250

251251
var expected =
@@ -295,7 +295,7 @@ public void PrintMultipleRootEffects()
295295
clearChildren: true
296296
);
297297

298-
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(new InMemoryFunctionStore()), storedId);
298+
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(), storedId);
299299
var output = effect.ExecutionTree();
300300

301301
var expected =
@@ -330,7 +330,7 @@ public void PrintComplexEffectTree()
330330
clearChildren: true
331331
);
332332

333-
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(new InMemoryFunctionStore()), storedId);
333+
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(), storedId);
334334
var output = effect.ExecutionTree();
335335

336336
var expected =
@@ -365,7 +365,7 @@ public void PrintEffectTreeWithMissingIntermediateEffect()
365365
clearChildren: false
366366
);
367367

368-
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(new InMemoryFunctionStore()), storedId);
368+
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(), storedId);
369369
var output = effect.ExecutionTree();
370370

371371
var expected =
@@ -398,7 +398,7 @@ public void PrintEffectTreeWithMultipleMissingAncestors()
398398
clearChildren: false
399399
);
400400

401-
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(new InMemoryFunctionStore()), storedId);
401+
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(), storedId);
402402
var output = effect.ExecutionTree();
403403

404404
var expected =

Core/Cleipnir.ResilientFunctions/CoreRuntime/FlowsManager.cs

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
using System.Collections.Generic;
2+
using System.Linq;
23
using System.Threading;
34
using System.Threading.Tasks;
45
using Cleipnir.ResilientFunctions.Storage;
56

67
namespace Cleipnir.ResilientFunctions.CoreRuntime;
78

8-
public class FlowsManager(IFunctionStore functionStore)
9+
public class FlowsManager
910
{
1011
private readonly Dictionary<StoredId, FlowState> _dict = new();
1112
private readonly Lock _lock = new();
@@ -23,14 +24,22 @@ public void RemoveFlow(StoredId id, FlowState flowState)
2324
_dict.Remove(id);
2425
}
2526

26-
public async Task Interrupt(IReadOnlyList<StoredId> ids)
27+
public IReadOnlyList<StoredId> FilterOwned(IEnumerable<StoredId> ids)
2728
{
28-
await functionStore.ResetInterrupted(ids);
29-
3029
lock (_lock)
31-
foreach (var id in ids)
32-
if (_dict.TryGetValue(id, out var flowState))
33-
flowState.Interrupt();
30+
return ids.Where(_dict.ContainsKey).ToList();
31+
}
32+
33+
public Task Interrupt(IReadOnlyList<StoredId> ids)
34+
{
35+
/*
36+
* lock (_lock)
37+
foreach (var id in ids)
38+
if (_dict.TryGetValue(id, out var flowState))
39+
flowState.Interrupt();
40+
41+
*/
42+
return Task.CompletedTask;
3443
}
3544

3645
public void StartThread(StoredId id)

0 commit comments

Comments
 (0)