Skip to content

Commit 151de33

Browse files
committed
Simplify GetMessagesForReplica to List<StoredMessages> and add ignorePositions
Return a List<StoredMessages> (each entry is a StoredId together with its messages) instead of a Dictionary<StoredId, List<StoredMessage>>, and accept an ignorePositions parameter so that already-pushed messages are not re-delivered. MessageWatchdog (version 1) keeps an ever-growing set of pushed positions and passes it on every tick; this will be refined once the QueueManager reports the positions it has persisted into its effects. Each store filters out the ignore set via a stable, parameterized query to avoid plan-cache churn: Postgres '!= ALL($2)', SQL Server STRING_SPLIT, MariaDB FIND_IN_SET. Add a cross-store test covering both empty and non-empty ignore sets.
1 parent 5698d89 commit 151de33

16 files changed

Lines changed: 116 additions & 36 deletions

File tree

Core/Cleipnir.ResilientFunctions.Tests/Messaging/InMemoryTests/MessageStoreTests.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,4 +125,8 @@ public override Task CrashedReplicaMessagesAreFetched()
125125
[TestMethod]
126126
public override Task MessageReplicaCanBeReassigned()
127127
=> MessageReplicaCanBeReassigned(FunctionStoreFactory.Create());
128+
129+
[TestMethod]
130+
public override Task GetMessagesForReplicaExcludesIgnoredPositions()
131+
=> GetMessagesForReplicaExcludesIgnoredPositions(FunctionStoreFactory.Create());
128132
}

Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessageStoreTests.cs

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1269,4 +1269,46 @@ protected async Task MessageReplicaCanBeReassigned(Task<IFunctionStore> function
12691269
afterFlow2.Single(m => m.Position == bPosition).Replica.ShouldBe(newReplica);
12701270
afterFlow2.Single(m => m.Position == cPosition).Replica.ShouldBe(otherReplica);
12711271
}
1272+
1273+
public abstract Task GetMessagesForReplicaExcludesIgnoredPositions();
1274+
protected async Task GetMessagesForReplicaExcludesIgnoredPositions(Task<IFunctionStore> functionStoreTask)
1275+
{
1276+
var functionStore = await functionStoreTask;
1277+
var messageStore = functionStore.MessageStore;
1278+
var stringType = typeof(string).SimpleQualifiedName().ToUtf8Bytes();
1279+
1280+
var replica = ReplicaId.NewId();
1281+
var otherReplica = ReplicaId.NewId();
1282+
1283+
var flow1 = TestStoredId.Create();
1284+
var flow2 = TestStoredId.Create();
1285+
1286+
// idle flows -> the stored replica falls back to the publishing replica we provide
1287+
await messageStore.AppendMessage(flow1, new StoredMessage("a".ToJson().ToUtf8Bytes(), stringType, Position: 0, Replica: replica));
1288+
await messageStore.AppendMessage(flow1, new StoredMessage("b".ToJson().ToUtf8Bytes(), stringType, Position: 0, Replica: replica));
1289+
await messageStore.AppendMessage(flow2, new StoredMessage("c".ToJson().ToUtf8Bytes(), stringType, Position: 0, Replica: replica));
1290+
// owned by a different replica -> must never be returned
1291+
await messageStore.AppendMessage(flow2, new StoredMessage("d".ToJson().ToUtf8Bytes(), stringType, Position: 0, Replica: otherReplica));
1292+
1293+
// no positions ignored -> all of the replica's messages, grouped by flow and ordered by position
1294+
var all = await messageStore.GetMessagesForReplica(replica, ignorePositions: []);
1295+
all.Count.ShouldBe(2);
1296+
var flow1Group = all.Single(g => g.StoredId == flow1);
1297+
var flow2Group = all.Single(g => g.StoredId == flow2);
1298+
flow1Group.Messages.Select(m => (string) m.DefaultDeserialize()).ShouldBe(["a", "b"]);
1299+
flow2Group.Messages.Select(m => (string) m.DefaultDeserialize()).ShouldBe(["c"]); // "d" belongs to otherReplica
1300+
1301+
var aPosition = flow1Group.Messages.Single(m => (string) m.DefaultDeserialize() == "a").Position;
1302+
var bPosition = flow1Group.Messages.Single(m => (string) m.DefaultDeserialize() == "b").Position;
1303+
1304+
// ignoring "b" leaves only "a" in flow1; flow2 is unaffected
1305+
var ignoredB = await messageStore.GetMessagesForReplica(replica, ignorePositions: [bPosition]);
1306+
ignoredB.Single(g => g.StoredId == flow1).Messages.Select(m => (string) m.DefaultDeserialize()).ShouldBe(["a"]);
1307+
ignoredB.Single(g => g.StoredId == flow2).Messages.Select(m => (string) m.DefaultDeserialize()).ShouldBe(["c"]);
1308+
1309+
// ignoring every position of a flow drops the whole group
1310+
var ignoredFlow1 = await messageStore.GetMessagesForReplica(replica, ignorePositions: [aPosition, bPosition]);
1311+
ignoredFlow1.ShouldAllBe(g => g.StoredId != flow1);
1312+
ignoredFlow1.Single(g => g.StoredId == flow2).Messages.Single().DefaultDeserialize().ShouldBe("c");
1313+
}
12721314
}

Core/Cleipnir.ResilientFunctions/CoreRuntime/FlowsManager.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ public void Interrupt(IReadOnlyList<StoredId> ids)
4242
flowState.Interrupt();
4343
}
4444

45-
public Task Push(IReadOnlyDictionary<StoredId, List<StoredMessage>> messagesByFlow)
45+
public Task Push(IReadOnlyList<StoredMessages> messagesByFlow)
4646
{
4747
List<Task> tasks = new();
4848
lock (_lock)

Core/Cleipnir.ResilientFunctions/CoreRuntime/Watchdogs/MessageWatchdog.cs

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
using System;
2+
using System.Collections.Generic;
3+
using System.Linq;
24
using System.Threading.Tasks;
35
using Cleipnir.ResilientFunctions.Domain;
46
using Cleipnir.ResilientFunctions.Domain.Exceptions;
@@ -18,6 +20,11 @@ internal class MessageWatchdog
1820
private readonly TimeSpan _delayStartUp;
1921
private readonly UtcNow _utcNow;
2022

23+
// Positions already pushed to this replica's flows. Passed as ignore-set so messages are not re-delivered
24+
// on subsequent ticks. Version 1: ever-growing - to be refined once the QueueManager reports the positions
25+
// it has persisted into its effects.
26+
private readonly HashSet<long> _pushedPositions = new();
27+
2128
public MessageWatchdog(
2229
IMessageStore messageStore,
2330
FlowsManager flowsManager,
@@ -51,9 +58,15 @@ public async Task Start()
5158

5259
// Messages destined for flows currently owned by this replica (replica = COALESCE(owner, publisher)).
5360
// FlowsManager.Push delivers only to live flows; entries for non-live flows are ignored.
54-
var messagesByFlow = await _messageStore.GetMessagesForReplica(_clusterInfo.ReplicaId);
55-
if (messagesByFlow.Count > 0)
56-
await _flowsManager.Push(messagesByFlow);
61+
var messageGroups = await _messageStore.GetMessagesForReplica(_clusterInfo.ReplicaId, _pushedPositions.ToList());
62+
if (messageGroups.Count > 0)
63+
{
64+
foreach (var group in messageGroups)
65+
foreach (var message in group.Messages)
66+
_pushedPositions.Add(message.Position);
67+
68+
await _flowsManager.Push(messageGroups);
69+
}
5770

5871
var timeElapsed = _utcNow() - now;
5972
var delay = (_checkFrequency - timeElapsed).RoundUpToZero();

Core/Cleipnir.ResilientFunctions/Messaging/IMessageStore.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,11 @@ public interface IMessageStore
2828

2929
/// <summary>
3030
/// Returns the undelivered messages whose replica equals the provided replica, grouped by target flow.
31+
/// Messages at any of the <paramref name="ignorePositions"/> are excluded - the MessageWatchdog passes the
32+
/// positions it has already pushed so they are not re-delivered on subsequent ticks.
3133
/// Used by the MessageWatchdog to push messages to live flows owned by this replica.
3234
/// </summary>
33-
Task<Dictionary<StoredId, List<StoredMessage>>> GetMessagesForReplica(ReplicaId replicaId);
35+
Task<List<StoredMessages>> GetMessagesForReplica(ReplicaId replicaId, IReadOnlyList<long> ignorePositions);
3436

3537
/// <summary>
3638
/// Returns the (flow, position) identifiers of the undelivered messages owned by a replica that is no

Core/Cleipnir.ResilientFunctions/Messaging/StoredMessage.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System;
2+
using System.Collections.Generic;
23
using System.Text.Json;
34
using Cleipnir.ResilientFunctions.Domain;
45
using Cleipnir.ResilientFunctions.Helpers;
@@ -12,6 +13,7 @@ public record StoredMessage(byte[] MessageContent, byte[] MessageType, long Posi
1213
}
1314

1415
public record StoredIdAndMessage(StoredId StoredId, StoredMessage StoredMessage);
16+
public record StoredMessages(StoredId StoredId, List<StoredMessage> Messages);
1517
public static class StoredIdAndMessageExtensions
1618
{
1719
public static StoredIdAndMessage ToStoredIdAndMessage(this StoredMessage storedMessage, StoredId storedId)

Core/Cleipnir.ResilientFunctions/Storage/InMemoryFunctionStore.cs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -668,23 +668,24 @@ public async Task<Dictionary<StoredId, List<StoredMessage>>> GetMessages(IEnumer
668668
return dict;
669669
}
670670

671-
public Task<Dictionary<StoredId, List<StoredMessage>>> GetMessagesForReplica(ReplicaId replicaId)
671+
public Task<List<StoredMessages>> GetMessagesForReplica(ReplicaId replicaId, IReadOnlyList<long> ignorePositions)
672672
{
673673
lock (_sync)
674674
{
675-
var dict = new Dictionary<StoredId, List<StoredMessage>>();
675+
var ignore = ignorePositions.ToHashSet();
676+
var result = new List<StoredMessages>();
676677
foreach (var (storedId, messages) in _messages)
677678
{
678679
var list = messages
679680
.OrderBy(kv => kv.Key)
680-
.Where(kv => kv.Value.Replica == replicaId)
681+
.Where(kv => kv.Value.Replica == replicaId && !ignore.Contains(kv.Key))
681682
.Select(kv => kv.Value with { Position = kv.Key })
682683
.ToList();
683684
if (list.Count > 0)
684-
dict[storedId] = list;
685+
result.Add(new StoredMessages(storedId, list));
685686
}
686687

687-
return dict.ToTask();
688+
return result.ToTask();
688689
}
689690
}
690691

Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB.Tests/Messaging/MessageStoreTests.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,4 +120,8 @@ public override Task CrashedReplicaMessagesAreFetched()
120120
[TestMethod]
121121
public override Task MessageReplicaCanBeReassigned()
122122
=> MessageReplicaCanBeReassigned(FunctionStoreFactory.Create());
123+
124+
[TestMethod]
125+
public override Task GetMessagesForReplicaExcludesIgnoredPositions()
126+
=> GetMessagesForReplicaExcludesIgnoredPositions(FunctionStoreFactory.Create());
123127
}

Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbMessageStore.cs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -192,22 +192,23 @@ public async Task<Dictionary<StoredId, List<StoredMessage>>> GetMessages(IEnumer
192192
return storedMessages;
193193
}
194194

195-
public async Task<Dictionary<StoredId, List<StoredMessage>>> GetMessagesForReplica(ReplicaId replicaId)
195+
public async Task<List<StoredMessages>> GetMessagesForReplica(ReplicaId replicaId, IReadOnlyList<long> ignorePositions)
196196
{
197197
await using var conn = await DatabaseHelper.CreateOpenConnection(_connectionString);
198198
await using var command = _sqlGenerator
199-
.GetMessagesForReplica(replicaId)
199+
.GetMessagesForReplica(replicaId, ignorePositions)
200200
.ToSqlCommand(conn);
201201

202202
await using var reader = await command.ExecuteReaderAsync();
203203

204204
var messages = await _sqlGenerator.ReadStoredIdsMessages(reader);
205-
var storedMessages = new Dictionary<StoredId, List<StoredMessage>>();
205+
var storedMessages = new List<StoredMessages>();
206206

207207
foreach (var id in messages.Keys)
208-
storedMessages[id] = messages[id]
209-
.Select(m => ConvertToStoredMessage(m.content, m.position, m.replica))
210-
.ToList();
208+
storedMessages.Add(new StoredMessages(
209+
id,
210+
messages[id].Select(m => ConvertToStoredMessage(m.content, m.position, m.replica)).ToList()
211+
));
211212

212213
return storedMessages;
213214
}

Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/SqlGenerator.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -660,15 +660,15 @@ public StoreCommand GetMessages(StoredId storedId, IReadOnlyList<long> skipPosit
660660
return messages;
661661
}
662662

663-
public StoreCommand GetMessagesForReplica(ReplicaId replicaId)
663+
public StoreCommand GetMessagesForReplica(ReplicaId replicaId, IReadOnlyList<long> ignorePositions)
664664
{
665665
var sql = @$"
666666
SELECT id, position, content, replica
667667
FROM {tablePrefix}_messages
668-
WHERE replica = ?
668+
WHERE replica = ? AND FIND_IN_SET(position, ?) = 0
669669
ORDER BY position;";
670670

671-
return StoreCommand.Create(sql, values: [ replicaId.AsGuid.ToString("N") ]);
671+
return StoreCommand.Create(sql, values: [ replicaId.AsGuid.ToString("N"), string.Join(",", ignorePositions) ]);
672672
}
673673

674674
public StoreCommand GetCrashedReplicaMessages(IReadOnlySet<ReplicaId> liveReplicas)

0 commit comments

Comments
 (0)