Skip to content

Commit 7cb6833

Browse files
authored
Interrupt-driven message fetch; merge FetchedMessages into QueueManager (#150)
1 parent 2053d22 commit 7cb6833

9 files changed

Lines changed: 253 additions & 342 deletions

File tree

.gitignore

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -385,4 +385,7 @@ FodyWeavers.xsd
385385

386386
# JetBrains Rider
387387
.idea/
388-
*.sln.iml
388+
*.sln.iml
389+
390+
# macOS
391+
.DS_Store

Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessagesTests.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -256,8 +256,8 @@ await registration.SendMessages(
256256
var controlPanel1 = await registration.ControlPanel("Instance#1").ShouldNotBeNullAsync();
257257
var controlPanel2 = await registration.ControlPanel("Instance#2").ShouldNotBeNullAsync();
258258

259-
await controlPanel1.WaitForCompletion(allowPostponeAndSuspended: true, maxWait: TimeSpan.FromSeconds(2));
260-
await controlPanel2.WaitForCompletion(allowPostponeAndSuspended: true, maxWait: TimeSpan.FromSeconds(2));
259+
await controlPanel1.WaitForCompletion(allowPostponeAndSuspended: true, maxWait: TimeSpan.FromSeconds(5));
260+
await controlPanel2.WaitForCompletion(allowPostponeAndSuspended: true, maxWait: TimeSpan.FromSeconds(5));
261261

262262
unhandledExceptionCatcher.ShouldNotHaveExceptions();
263263
}

Core/Cleipnir.ResilientFunctions/CoreRuntime/FlowState.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
using System.Threading;
22
using System.Threading.Tasks;
3-
using Cleipnir.ResilientFunctions.Helpers;
3+
using Cleipnir.ResilientFunctions.Queuing;
44
using Cleipnir.ResilientFunctions.Storage;
55

66
namespace Cleipnir.ResilientFunctions.CoreRuntime;
@@ -14,7 +14,7 @@ public class FlowState
1414
public int Subflows { get; private set; }
1515
public int WaitingSubflows { get; private set; }
1616
public FlowTimeouts Timeouts { get; }
17-
public AsyncSignal InterruptSignal { get; } = new();
17+
internal QueueManager? QueueManager { get; set; }
1818
public bool Suspended { get; private set; }
1919
public Task SuspendedTask { get; }
2020

@@ -67,8 +67,8 @@ public void Interrupt()
6767
return;
6868
else
6969
WaitingSubflows = 0;
70-
71-
InterruptSignal.Fire();
70+
71+
QueueManager?.Interrupt();
7272
}
7373

7474
public bool Suspend()

Core/Cleipnir.ResilientFunctions/CoreRuntime/FlowsManager.cs

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ public class FlowsManager
1414
public FlowState CreateFlow(StoredId id, FlowTimeouts timeouts)
1515
{
1616
lock (_lock)
17-
return _dict[id] = new FlowState(id, subflows: 1, waitingSubflows: 0, timeouts);;
17+
return _dict[id] = new FlowState(id, subflows: 1, waitingSubflows: 0, timeouts);
1818
}
1919

2020
public void RemoveFlow(StoredId id, FlowState flowState)
@@ -30,16 +30,12 @@ public IReadOnlyList<StoredId> FilterOwned(IEnumerable<StoredId> ids)
3030
return ids.Where(_dict.ContainsKey).ToList();
3131
}
3232

33-
public Task Interrupt(IReadOnlyList<StoredId> ids)
33+
public void Interrupt(IReadOnlyList<StoredId> ids)
3434
{
35-
/*
36-
* lock (_lock)
37-
foreach (var id in ids)
38-
if (_dict.TryGetValue(id, out var flowState))
39-
flowState.Interrupt();
40-
41-
*/
42-
return Task.CompletedTask;
35+
lock (_lock)
36+
foreach (var id in ids)
37+
if (_dict.TryGetValue(id, out var flowState))
38+
flowState.Interrupt();
4339
}
4440

4541
public void StartThread(StoredId id)

Core/Cleipnir.ResilientFunctions/CoreRuntime/Watchdogs/InterruptedWatchdog.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,10 @@ public async Task Start()
4949
var interrupted = await _functionStore.GetInterruptedFunctions();
5050
var owned = _flowsManager.FilterOwned(interrupted);
5151
if (owned.Count > 0)
52-
await _flowsManager.Interrupt(owned);
52+
{
53+
await _functionStore.ResetInterrupted(owned);
54+
_flowsManager.Interrupt(owned);
55+
}
5356

5457
var timeElapsed = _utcNow() - now;
5558
var delay = (_checkFrequency - timeElapsed).RoundUpToZero();

Core/Cleipnir.ResilientFunctions/FunctionsRegistry.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ public FunctionsRegistry(IFunctionStore functionStore, Settings? settings = null
7676
if (_settings.EnableWatchdogs)
7777
{
7878
_replicaWatchdog.Initialize().GetAwaiter().GetResult();
79-
_ = _replicaWatchdog.Start();
79+
_ = _replicaWatchdog.Start();
8080
_ = Task.Run(_interruptedWatchdog.Start);
8181
}
8282
}

Core/Cleipnir.ResilientFunctions/Queuing/FetchedMessages.cs

Lines changed: 0 additions & 277 deletions
This file was deleted.

Core/Cleipnir.ResilientFunctions/Queuing/QueueClient.cs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ public Task<Envelope> PullEnvelope<T>(Workflow workflow, EffectId parentId, Func
5555

5656
if (!effect.Contains(messageId))
5757
{
58-
var result = await queueManager.Subscribe(
58+
return await queueManager.Subscribe(
5959
envelope => filter?.Invoke(envelope) ?? true,
6060
timeout,
6161
timeoutId,
@@ -74,8 +74,6 @@ public Task<Envelope> PullEnvelope<T>(Workflow workflow, EffectId parentId, Func
7474
EffectResult.Create(senderId, msg.Sender),
7575
]
7676
);
77-
78-
return result;
7977
}
8078

8179
if (!effect.TryGet<byte[]>(messageTypeId, out var typeNameBytes))

0 commit comments

Comments
 (0)