Skip to content

Commit f88c50a

Browse files
committed
Add InterruptedWatchdog scaffolding
The watchdog polls GetInterruptedFunctions and calls FlowsManager.Interrupt with ids owned by this replica (i.e. present in FlowsManager's in-memory dict). FlowState.Interrupt and FlowsManager.Interrupt are stubbed for now: the existing implementations raced with SuspendFunction's Interrupted-flag check (clearing the flag mid-flight caused suspended-vs-postponed-status to be decided wrong, leaving flows stuck). Adds FilterOwned helper.
1 parent 39b52d8 commit f88c50a

4 files changed

Lines changed: 95 additions & 9 deletions

File tree

Core/Cleipnir.ResilientFunctions/CoreRuntime/FlowState.cs

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -62,13 +62,6 @@ public bool TryResumeSubflow()
6262

6363
public void Interrupt()
6464
{
65-
lock (_lock)
66-
if (Suspended)
67-
return;
68-
else
69-
WaitingSubflows = 0;
70-
71-
InterruptSignal.Fire();
7265
}
7366

7467
public bool Suspend()

Core/Cleipnir.ResilientFunctions/CoreRuntime/FlowsManager.cs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System.Collections.Generic;
2+
using System.Linq;
23
using System.Threading;
34
using System.Threading.Tasks;
45
using Cleipnir.ResilientFunctions.Storage;
@@ -23,14 +24,20 @@ public void RemoveFlow(StoredId id, FlowState flowState)
2324
_dict.Remove(id);
2425
}
2526

26-
public async Task Interrupt(IReadOnlyList<StoredId> ids)
27+
public IReadOnlyList<StoredId> FilterOwned(IEnumerable<StoredId> ids)
2728
{
28-
await functionStore.ResetInterrupted(ids);
29+
lock (_lock)
30+
return ids.Where(_dict.ContainsKey).ToList();
31+
}
2932

33+
public Task Interrupt(IReadOnlyList<StoredId> ids)
34+
{
3035
lock (_lock)
3136
foreach (var id in ids)
3237
if (_dict.TryGetValue(id, out var flowState))
3338
flowState.Interrupt();
39+
40+
return Task.CompletedTask;
3441
}
3542

3643
public void StartThread(StoredId id)
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
using System;
2+
using System.Threading.Tasks;
3+
using Cleipnir.ResilientFunctions.Domain;
4+
using Cleipnir.ResilientFunctions.Domain.Exceptions;
5+
using Cleipnir.ResilientFunctions.Helpers;
6+
using Cleipnir.ResilientFunctions.Storage;
7+
8+
namespace Cleipnir.ResilientFunctions.CoreRuntime.Watchdogs;
9+
10+
internal class InterruptedWatchdog
11+
{
12+
private readonly IFunctionStore _functionStore;
13+
private readonly FlowsManager _flowsManager;
14+
private readonly ShutdownCoordinator _shutdownCoordinator;
15+
private readonly UnhandledExceptionHandler _unhandledExceptionHandler;
16+
private readonly TimeSpan _checkFrequency;
17+
private readonly TimeSpan _delayStartUp;
18+
private readonly UtcNow _utcNow;
19+
20+
public InterruptedWatchdog(
21+
IFunctionStore functionStore,
22+
FlowsManager flowsManager,
23+
ShutdownCoordinator shutdownCoordinator,
24+
UnhandledExceptionHandler unhandledExceptionHandler,
25+
TimeSpan checkFrequency,
26+
TimeSpan delayStartUp,
27+
UtcNow utcNow)
28+
{
29+
_functionStore = functionStore;
30+
_flowsManager = flowsManager;
31+
_shutdownCoordinator = shutdownCoordinator;
32+
_unhandledExceptionHandler = unhandledExceptionHandler;
33+
_checkFrequency = checkFrequency;
34+
_delayStartUp = delayStartUp;
35+
_utcNow = utcNow;
36+
}
37+
38+
public async Task Start()
39+
{
40+
await Task.Delay(_delayStartUp);
41+
42+
Start:
43+
try
44+
{
45+
while (!_shutdownCoordinator.ShutdownInitiated)
46+
{
47+
var now = _utcNow();
48+
49+
var interrupted = await _functionStore.GetInterruptedFunctions();
50+
var owned = _flowsManager.FilterOwned(interrupted);
51+
if (owned.Count > 0)
52+
await _flowsManager.Interrupt(owned);
53+
54+
var timeElapsed = _utcNow() - now;
55+
var delay = (_checkFrequency - timeElapsed).RoundUpToZero();
56+
57+
await Task.Delay(delay);
58+
}
59+
}
60+
catch (Exception thrownException)
61+
{
62+
_unhandledExceptionHandler.Invoke(
63+
new FrameworkException(
64+
$"{nameof(InterruptedWatchdog)} execution failed - retrying in 5 seconds",
65+
innerException: thrownException
66+
)
67+
);
68+
69+
await Task.Delay(5_000);
70+
goto Start;
71+
}
72+
}
73+
}

Core/Cleipnir.ResilientFunctions/FunctionsRegistry.cs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ public class FunctionsRegistry : IDisposable
3131
private volatile bool _disposed;
3232
private readonly Lock _sync = new();
3333
private readonly ReplicaWatchdog _replicaWatchdog;
34+
private readonly InterruptedWatchdog _interruptedWatchdog;
3435
private readonly FlowsManager _flowsManager;
3536

3637
public FunctionsRegistry(IFunctionStore functionStore, Settings? settings = null)
@@ -61,10 +62,22 @@ public FunctionsRegistry(IFunctionStore functionStore, Settings? settings = null
6162
utcNow,
6263
_settings.UnhandledExceptionHandler
6364
);
65+
66+
_interruptedWatchdog = new InterruptedWatchdog(
67+
_functionStore,
68+
_flowsManager,
69+
_shutdownCoordinator,
70+
_settings.UnhandledExceptionHandler,
71+
_settings.WatchdogCheckFrequency,
72+
_settings.DelayStartup,
73+
utcNow
74+
);
75+
6476
if (_settings.EnableWatchdogs)
6577
{
6678
_replicaWatchdog.Initialize().GetAwaiter().GetResult();
6779
_ = _replicaWatchdog.Start();
80+
_ = Task.Run(_interruptedWatchdog.Start);
6881
}
6982
}
7083

0 commit comments

Comments
 (0)