Skip to content

Commit 5afe25e

Browse files
committed
Add GetCrashedReplicaMessages and SetReplica to IMessageStore
GetCrashedReplicaMessages returns the (flow, position) identifiers of undelivered messages owned by a replica that is no longer alive (not in the supplied live-replica set). SetReplica re-assigns the messages at the given positions to a new replica, guarded by an expected-replica check so a concurrent takeover is not clobbered. Implemented across the in-memory, PostgreSQL, MariaDB and SqlServer stores, with shared message-store tests covering crashed-replica fetching and guarded re-assignment.
1 parent 838dd6a commit 5afe25e

13 files changed

Lines changed: 337 additions & 1 deletion

File tree

Core/Cleipnir.ResilientFunctions.Tests/Messaging/InMemoryTests/MessageStoreTests.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,4 +117,12 @@ public override Task ConcurrentBatchedMessagesToSameStoredIdAreAllAdded()
117117
/// <summary>
/// Runs the shared template test against the store produced by this test project's FunctionStoreFactory.
/// </summary>
[TestMethod]
public override Task MessageReplicaIsTakenFromTargetFlowOwner()
{
    var functionStoreTask = FunctionStoreFactory.Create();
    return MessageReplicaIsTakenFromTargetFlowOwner(functionStoreTask);
}
120+
121+
/// <summary>
/// Runs the shared crashed-replica fetch test against the store produced by this test project's FunctionStoreFactory.
/// </summary>
[TestMethod]
public override Task CrashedReplicaMessagesAreFetched()
{
    var functionStoreTask = FunctionStoreFactory.Create();
    return CrashedReplicaMessagesAreFetched(functionStoreTask);
}
124+
125+
/// <summary>
/// Runs the shared replica re-assignment test against the store produced by this test project's FunctionStoreFactory.
/// </summary>
[TestMethod]
public override Task MessageReplicaCanBeReassigned()
{
    var functionStoreTask = FunctionStoreFactory.Create();
    return MessageReplicaCanBeReassigned(functionStoreTask);
}
120128
}

Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessageStoreTests.cs

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1203,4 +1203,70 @@ await messageStore.ReplaceMessage(
12031203
var afterReplaceIdle = (await messageStore.GetMessages(idleFlow)).ToList();
12041204
afterReplaceIdle.Single(m => m.Position == idleMessages[1].Position).Replica.ShouldBe(publisher);
12051205
}
1206+
1207+
public abstract Task CrashedReplicaMessagesAreFetched();

/// <summary>
/// Appends one message owned by a replica that is passed as live and two messages owned by
/// replicas that are not, then verifies that GetCrashedReplicaMessages reports exactly the
/// latter two as (flow, position) pairs.
/// </summary>
/// <param name="functionStoreTask">Task producing the function store under test.</param>
protected async Task CrashedReplicaMessagesAreFetched(Task<IFunctionStore> functionStoreTask)
{
    var messageStore = (await functionStoreTask).MessageStore;
    var stringType = typeof(string).SimpleQualifiedName().ToUtf8Bytes();

    // Builds a string message owned by the given replica; the requested position is always 0.
    StoredMessage CreateMessage(string content, ReplicaId owner)
        => new StoredMessage(content.ToJson().ToUtf8Bytes(), stringType, Position: 0, Replica: owner);

    var liveReplica = ReplicaId.NewId();
    var crashedReplica1 = ReplicaId.NewId();
    var crashedReplica2 = ReplicaId.NewId();

    var flow1 = TestStoredId.Create();
    var flow2 = TestStoredId.Create();

    // owned by a live replica -> not crashed
    await messageStore.AppendMessage(flow1, CreateMessage("a", liveReplica));
    // owned by crashed replicas -> crashed
    await messageStore.AppendMessage(flow1, CreateMessage("b", crashedReplica1));
    await messageStore.AppendMessage(flow2, CreateMessage("c", crashedReplica2));

    // Read the positions back from the store instead of assuming them.
    var flow1Messages = await messageStore.GetMessages(flow1);
    var flow2Messages = await messageStore.GetMessages(flow2);
    var bPosition = flow1Messages.Single(m => m.Replica == crashedReplica1).Position;
    var cPosition = flow2Messages.Single(m => m.Replica == crashedReplica2).Position;

    var crashed = await messageStore.GetCrashedReplicaMessages([liveReplica]);

    crashed.Count.ShouldBe(2);
    crashed.ShouldContain(t => t.Item1 == flow1 && t.Item2 == bPosition);
    crashed.ShouldContain(t => t.Item1 == flow2 && t.Item2 == cPosition);
}
1238+
1239+
public abstract Task MessageReplicaCanBeReassigned();

/// <summary>
/// Appends two messages owned by one replica and a third owned by a different replica, calls
/// SetReplica with all three positions and the first replica as the expected owner, and verifies
/// that only the first two messages end up owned by the new replica.
/// </summary>
/// <param name="functionStoreTask">Task producing the function store under test.</param>
protected async Task MessageReplicaCanBeReassigned(Task<IFunctionStore> functionStoreTask)
{
    var messageStore = (await functionStoreTask).MessageStore;
    var stringType = typeof(string).SimpleQualifiedName().ToUtf8Bytes();

    // Builds a string message owned by the given replica; the requested position is always 0.
    StoredMessage CreateMessage(string content, ReplicaId owner)
        => new StoredMessage(content.ToJson().ToUtf8Bytes(), stringType, Position: 0, Replica: owner);

    var crashedReplica = ReplicaId.NewId();
    var newReplica = ReplicaId.NewId();
    var otherReplica = ReplicaId.NewId();

    var flow1 = TestStoredId.Create();
    var flow2 = TestStoredId.Create();

    await messageStore.AppendMessage(flow1, CreateMessage("a", crashedReplica));
    await messageStore.AppendMessage(flow2, CreateMessage("b", crashedReplica));
    // owned by a different replica -> must not be reassigned even though its position is included
    await messageStore.AppendMessage(flow2, CreateMessage("c", otherReplica));

    var flow1Before = await messageStore.GetMessages(flow1);
    var aPosition = flow1Before.Single().Position;
    var flow2Messages = await messageStore.GetMessages(flow2);
    var bPosition = flow2Messages.Single(m => m.Replica == crashedReplica).Position;
    var cPosition = flow2Messages.Single(m => m.Replica == otherReplica).Position;

    await messageStore.SetReplica([aPosition, bPosition, cPosition], newReplica, expectedReplica: crashedReplica);

    var afterFlow1 = await messageStore.GetMessages(flow1);
    var afterFlow2 = await messageStore.GetMessages(flow2);

    afterFlow1.Single().Replica.ShouldBe(newReplica);
    afterFlow2.Single(m => m.Position == bPosition).Replica.ShouldBe(newReplica);
    afterFlow2.Single(m => m.Position == cPosition).Replica.ShouldBe(otherReplica);
}
12061272
}

Core/Cleipnir.ResilientFunctions/Messaging/IMessageStore.cs

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
using System.Collections.Generic;
1+
using System;
2+
using System.Collections.Generic;
23
using System.Threading.Tasks;
34
using Cleipnir.ResilientFunctions.Domain;
45
using Cleipnir.ResilientFunctions.Storage;
@@ -31,4 +32,17 @@ public interface IMessageStore
3132
/// Used by the MessageWatchdog to push messages to live flows owned by this replica.
3233
/// </summary>
3334
Task<Dictionary<StoredId, List<StoredMessage>>> GetMessagesForReplica(ReplicaId replicaId);
35+
36+
/// <summary>
/// Finds the undelivered messages whose owning replica is absent from <paramref name="liveReplicas"/>,
/// i.e. messages stranded by a replica that is no longer alive. The result is meant to be fed to
/// <see cref="SetReplica"/> so the messages can be re-assigned to a live replica.
/// </summary>
/// <param name="liveReplicas">The replicas currently considered alive.</param>
/// <returns>One (flow, position) pair per stranded message.</returns>
Task<List<Tuple<StoredId, long>>> GetCrashedReplicaMessages(IEnumerable<ReplicaId> liveReplicas);
42+
43+
/// <summary>
/// Hands the messages at the given positions over to <paramref name="newReplica"/>.
/// A message is only re-assigned while it is still owned by <paramref name="expectedReplica"/>;
/// messages owned by any other replica are left untouched.
/// </summary>
/// <param name="positions">Positions of the messages to re-assign.</param>
/// <param name="newReplica">Replica that becomes the owner of the matching messages.</param>
/// <param name="expectedReplica">Replica that must currently own a message for it to be re-assigned.</param>
Task SetReplica(IEnumerable<long> positions, ReplicaId newReplica, ReplicaId expectedReplica);
3448
}

Core/Cleipnir.ResilientFunctions/Storage/InMemoryFunctionStore.cs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -688,5 +688,33 @@ public Task<Dictionary<StoredId, List<StoredMessage>>> GetMessagesForReplica(Rep
688688
}
689689
}
690690

691+
/// <summary>
/// Returns the (flow, position) identifiers of all stored messages whose owning replica is not
/// contained in <paramref name="liveReplicas"/>. Within each flow the messages are visited in
/// ascending position order.
/// Messages without an owning replica are skipped, which matches the SQL-backed stores that
/// filter on <c>replica IS NOT NULL</c>.
/// </summary>
/// <param name="liveReplicas">The replicas currently considered alive.</param>
/// <returns>A completed task holding one (flow, position) pair per stranded message.</returns>
public Task<List<Tuple<StoredId, long>>> GetCrashedReplicaMessages(IEnumerable<ReplicaId> liveReplicas)
{
    // Materialized before taking the lock so a lazy caller-supplied sequence is not evaluated while holding it.
    var live = liveReplicas.ToHashSet();
    lock (_sync)
    {
        var result = new List<Tuple<StoredId, long>>();
        foreach (var (storedId, messages) in _messages)
        {
            foreach (var (position, message) in messages.OrderBy(kv => kv.Key))
            {
                // A message with no owner was not stranded by a crashed replica, so it is not reported.
                if (message.Replica is { } owner && !live.Contains(owner))
                    result.Add(Tuple.Create(storedId, position));
            }
        }

        return result.ToTask();
    }
}
705+
706+
/// <summary>
/// Re-assigns every stored message found at one of the given positions to
/// <paramref name="newReplica"/>, provided it is currently owned by <paramref name="expectedReplica"/>.
/// All flows are searched for each position.
/// </summary>
/// <param name="positions">Positions of the messages to re-assign.</param>
/// <param name="newReplica">Replica that becomes the owner of the matching messages.</param>
/// <param name="expectedReplica">Replica that must currently own a message for it to be re-assigned.</param>
/// <returns>A completed task.</returns>
public Task SetReplica(IEnumerable<long> positions, ReplicaId newReplica, ReplicaId expectedReplica)
{
    lock (_sync)
    {
        foreach (var position in positions)
        {
            foreach (var flowMessages in _messages.Values)
            {
                if (!flowMessages.TryGetValue(position, out var current))
                    continue;

                // Guard: leave messages that another replica has taken over in the meantime.
                if (current.Replica != expectedReplica)
                    continue;

                flowMessages[position] = current with { Replica = newReplica };
            }
        }
    }

    return Task.CompletedTask;
}
718+
691719
#endregion
692720
}

Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB.Tests/Messaging/MessageStoreTests.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,4 +112,12 @@ public override Task ConcurrentBatchedMessagesToSameStoredIdAreAllAdded()
112112
/// <summary>
/// Runs the shared template test against the store produced by this test project's FunctionStoreFactory.
/// </summary>
[TestMethod]
public override Task MessageReplicaIsTakenFromTargetFlowOwner()
{
    var functionStoreTask = FunctionStoreFactory.Create();
    return MessageReplicaIsTakenFromTargetFlowOwner(functionStoreTask);
}
115+
116+
/// <summary>
/// Runs the shared crashed-replica fetch test against the store produced by this test project's FunctionStoreFactory.
/// </summary>
[TestMethod]
public override Task CrashedReplicaMessagesAreFetched()
{
    var functionStoreTask = FunctionStoreFactory.Create();
    return CrashedReplicaMessagesAreFetched(functionStoreTask);
}
119+
120+
/// <summary>
/// Runs the shared replica re-assignment test against the store produced by this test project's FunctionStoreFactory.
/// </summary>
[TestMethod]
public override Task MessageReplicaCanBeReassigned()
{
    var functionStoreTask = FunctionStoreFactory.Create();
    return MessageReplicaCanBeReassigned(functionStoreTask);
}
115123
}

Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbMessageStore.cs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,30 @@ public async Task<Dictionary<StoredId, List<StoredMessage>>> GetMessagesForRepli
212212
return storedMessages;
213213
}
214214

215+
/// <summary>
/// Runs the generated crashed-replica query on a fresh connection and returns the
/// (flow, position) pairs read from the result set.
/// </summary>
/// <param name="liveReplicas">The replicas currently considered alive.</param>
/// <returns>One (flow, position) pair per row returned by the query.</returns>
public async Task<List<Tuple<StoredId, long>>> GetCrashedReplicaMessages(IEnumerable<ReplicaId> liveReplicas)
{
    await using var conn = await DatabaseHelper.CreateOpenConnection(_connectionString);

    var storeCommand = _sqlGenerator.GetCrashedReplicaMessages(liveReplicas);
    await using var command = storeCommand.ToSqlCommand(conn);
    await using var reader = await command.ExecuteReaderAsync();

    var crashedMessages = await _sqlGenerator.ReadStoredIdAndPositions(reader);
    return crashedMessages;
}
225+
226+
/// <summary>
/// Executes the generated guarded re-assignment UPDATE for the given positions.
/// Does nothing, and opens no connection, when <paramref name="positions"/> is empty.
/// </summary>
/// <param name="positions">Positions of the messages to re-assign.</param>
/// <param name="newReplica">Replica that becomes the owner of the matching messages.</param>
/// <param name="expectedReplica">Replica that must currently own a message for it to be re-assigned.</param>
public async Task SetReplica(IEnumerable<long> positions, ReplicaId newReplica, ReplicaId expectedReplica)
{
    var toReassign = positions.ToList();
    if (toReassign.Count > 0)
    {
        await using var conn = await DatabaseHelper.CreateOpenConnection(_connectionString);

        var storeCommand = _sqlGenerator.SetReplica(toReassign, newReplica, expectedReplica);
        await using var command = storeCommand.ToSqlCommand(conn);
        await command.ExecuteNonQueryAsync();
    }
}
238+
215239
public static StoredMessage ConvertToStoredMessage(byte[] content, long position, string? replica)
216240
{
217241
var arrs = BinaryPacker.Split(content, expectedPieces: 5);

Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/SqlGenerator.cs

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
using System;
12
using System.Runtime.Serialization;
23
using System.Text.Json;
34
using Cleipnir.ResilientFunctions.Domain;
@@ -671,6 +672,52 @@ public StoreCommand GetMessagesForReplica(ReplicaId replicaId)
671672
return StoreCommand.Create(sql, values: [ replicaId.AsGuid.ToString("N") ]);
672673
}
673674

675+
/// <summary>
/// Builds the query selecting (id, position) of every message that has an owning replica which is
/// not in <paramref name="liveReplicas"/>, ordered by position.
/// The live replica ids are bound as positional <c>?</c> parameters rather than being spliced into
/// the SQL text, in line with how <c>SetReplica</c> binds its values.
/// </summary>
/// <param name="liveReplicas">
/// The replicas currently considered alive. When empty, no NOT IN filter is emitted and every
/// message with a non-null replica is selected.
/// </param>
/// <returns>The command with one bound parameter per live replica.</returns>
public StoreCommand GetCrashedReplicaMessages(IEnumerable<ReplicaId> liveReplicas)
{
    var replicas = liveReplicas.Select(r => r.AsGuid.ToString("N")).ToList();

    // "NOT IN ()" is not valid SQL, so the clause is left out entirely when there are no live replicas.
    var notInClause = replicas.Count == 0
        ? ""
        : $" AND replica NOT IN ({string.Join(", ", replicas.Select(_ => "?"))})";
    var sql = @$"
            SELECT id, position
            FROM {tablePrefix}_messages
            WHERE replica IS NOT NULL{notInClause}
            ORDER BY position;";

    var command = StoreCommand.Create(sql);
    // Parameters are positional, so they are added in placeholder order.
    foreach (var replica in replicas)
        command.AddParameter(replica);

    return command;
}
689+
690+
/// <summary>
/// Reads every remaining row from <paramref name="reader"/> into a (flow, position) pair:
/// column 0 is read as a string and converted to a stored id, column 1 is read as the position.
/// </summary>
/// <param name="reader">Reader positioned before the first row of an (id, position) result set.</param>
/// <returns>The pairs in the order the rows were returned.</returns>
public async Task<List<Tuple<StoredId, long>>> ReadStoredIdAndPositions(MySqlDataReader reader)
{
    var rows = new List<Tuple<StoredId, long>>();
    while (await reader.ReadAsync())
    {
        rows.Add(
            new Tuple<StoredId, long>(
                reader.GetString(0).ToGuid().ToStoredId(),
                reader.GetInt64(1)
            )
        );
    }

    return rows;
}
702+
703+
/// <summary>
/// Builds the UPDATE that sets the replica of the messages at the given positions to
/// <paramref name="newReplica"/>, restricted to rows whose replica equals
/// <paramref name="expectedReplica"/>. All values are bound as positional <c>?</c> parameters.
/// </summary>
/// <param name="positions">Positions of the messages to re-assign; one placeholder is emitted per position.</param>
/// <param name="newReplica">Replica that becomes the owner of the matching messages.</param>
/// <param name="expectedReplica">Replica that must currently own a message for it to be re-assigned.</param>
/// <returns>The command with its parameters added in placeholder order.</returns>
public StoreCommand SetReplica(IEnumerable<long> positions, ReplicaId newReplica, ReplicaId expectedReplica)
{
    var targets = positions.ToList();
    var placeholders = string.Join(", ", targets.Select(_ => "?"));

    var sql = @$"
            UPDATE {tablePrefix}_messages
            SET replica = ?
            WHERE position IN ({placeholders}) AND replica = ?";

    var command = StoreCommand.Create(sql);

    // Positional parameters: new replica, then each position, then the expected replica.
    command.AddParameter(newReplica.AsGuid.ToString("N"));
    foreach (var target in targets)
    {
        command.AddParameter(target);
    }
    command.AddParameter(expectedReplica.AsGuid.ToString("N"));

    return command;
}
720+
674721
public StoreCommand GetMessages(IEnumerable<StoredId> storedIds)
675722
{
676723
var sql = @$"

Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL.Tests/Messaging/MessageStoreTests.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,4 +113,12 @@ public override Task ConcurrentBatchedMessagesToSameStoredIdAreAllAdded()
113113
/// <summary>
/// Runs the shared template test against the store produced by this test project's FunctionStoreFactory.
/// </summary>
[TestMethod]
public override Task MessageReplicaIsTakenFromTargetFlowOwner()
{
    var functionStoreTask = FunctionStoreFactory.Create();
    return MessageReplicaIsTakenFromTargetFlowOwner(functionStoreTask);
}
116+
117+
/// <summary>
/// Runs the shared crashed-replica fetch test against the store produced by this test project's FunctionStoreFactory.
/// </summary>
[TestMethod]
public override Task CrashedReplicaMessagesAreFetched()
{
    var functionStoreTask = FunctionStoreFactory.Create();
    return CrashedReplicaMessagesAreFetched(functionStoreTask);
}
120+
121+
/// <summary>
/// Runs the shared replica re-assignment test against the store produced by this test project's FunctionStoreFactory.
/// </summary>
[TestMethod]
public override Task MessageReplicaCanBeReassigned()
{
    var functionStoreTask = FunctionStoreFactory.Create();
    return MessageReplicaCanBeReassigned(functionStoreTask);
}
116124
}

Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlMessageStore.cs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,26 @@ public async Task<Dictionary<StoredId, List<StoredMessage>>> GetMessagesForRepli
211211
return storedMessages;
212212
}
213213

214+
/// <summary>
/// Runs the generated crashed-replica query on a fresh connection and returns the
/// (flow, position) pairs read from the result set.
/// </summary>
/// <param name="liveReplicas">The replicas currently considered alive.</param>
/// <returns>One (flow, position) pair per row returned by the query.</returns>
public async Task<List<Tuple<StoredId, long>>> GetCrashedReplicaMessages(IEnumerable<ReplicaId> liveReplicas)
{
    await using var conn = await CreateConnection();

    var storeCommand = sqlGenerator.GetCrashedReplicaMessages(liveReplicas);
    await using var command = storeCommand.ToNpgsqlCommand(conn);
    await using var reader = await command.ExecuteReaderAsync();

    var crashedMessages = await sqlGenerator.ReadStoredIdAndPositions(reader);
    return crashedMessages;
}
222+
223+
/// <summary>
/// Executes the generated guarded re-assignment UPDATE for the given positions.
/// Does nothing, and opens no connection, when <paramref name="positions"/> is empty.
/// </summary>
/// <param name="positions">Positions of the messages to re-assign.</param>
/// <param name="newReplica">Replica that becomes the owner of the matching messages.</param>
/// <param name="expectedReplica">Replica that must currently own a message for it to be re-assigned.</param>
public async Task SetReplica(IEnumerable<long> positions, ReplicaId newReplica, ReplicaId expectedReplica)
{
    var toReassign = positions.ToArray();
    if (toReassign.Length > 0)
    {
        await using var conn = await CreateConnection();

        var storeCommand = sqlGenerator.SetReplica(toReassign, newReplica, expectedReplica);
        await using var command = storeCommand.ToNpgsqlCommand(conn);
        await command.ExecuteNonQueryAsync();
    }
}
233+
214234
public static StoredMessage ConvertToStoredMessage(byte[] content, long position, Guid? replica)
215235
{
216236
var arrs = BinaryPacker.Split(content, expectedPieces: 5);

Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/SqlGenerator.cs

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -594,6 +594,43 @@ public StoreCommand GetMessagesForReplica(ReplicaId replicaId)
594594

595595
return StoreCommand.Create(sql, values: [ replicaId.AsGuid ]);
596596
}
597+
598+
/// <summary>
/// Builds the query selecting (id, position) of every message that has an owning replica which
/// differs from all of the supplied live replicas, ordered by position. The live replica ids are
/// bound as a single array parameter (<c>$1</c>).
/// </summary>
/// <param name="liveReplicas">The replicas currently considered alive.</param>
/// <returns>The command with the live replica ids bound as one array value.</returns>
public StoreCommand GetCrashedReplicaMessages(IEnumerable<ReplicaId> liveReplicas)
{
    var liveReplicaIds = liveReplicas.Select(r => r.AsGuid).ToArray();

    var sql = @$"
            SELECT id, position
            FROM {tablePrefix}_messages
            WHERE replica IS NOT NULL AND replica != ALL($1)
            ORDER BY position;";

    return StoreCommand.Create(sql, values: [ liveReplicaIds ]);
}
608+
609+
/// <summary>
/// Reads every remaining row from <paramref name="reader"/> into a (flow, position) pair:
/// column 0 is read as a Guid and converted to a stored id, column 1 is read as the position.
/// </summary>
/// <param name="reader">Reader positioned before the first row of an (id, position) result set.</param>
/// <returns>The pairs in the order the rows were returned.</returns>
public async Task<List<Tuple<StoredId, long>>> ReadStoredIdAndPositions(NpgsqlDataReader reader)
{
    var rows = new List<Tuple<StoredId, long>>();
    while (await reader.ReadAsync())
    {
        rows.Add(
            new Tuple<StoredId, long>(
                reader.GetGuid(0).ToStoredId(),
                reader.GetInt64(1)
            )
        );
    }

    return rows;
}
621+
622+
/// <summary>
/// Builds the UPDATE that sets the replica of the messages at the given positions to
/// <paramref name="newReplica"/>, restricted to rows whose replica equals
/// <paramref name="expectedReplica"/>. The positions are bound as a single array parameter.
/// </summary>
/// <param name="positions">Positions of the messages to re-assign (bound as <c>$2</c>).</param>
/// <param name="newReplica">Replica that becomes the owner of the matching messages (bound as <c>$1</c>).</param>
/// <param name="expectedReplica">Replica that must currently own a message for it to be re-assigned (bound as <c>$3</c>).</param>
/// <returns>The command with its three values bound in parameter order.</returns>
public StoreCommand SetReplica(IEnumerable<long> positions, ReplicaId newReplica, ReplicaId expectedReplica)
{
    var sql = @$"
            UPDATE {tablePrefix}_messages
            SET replica = $1
            WHERE position = ANY($2) AND replica = $3";

    var newOwner = newReplica.AsGuid;
    var targetPositions = positions.ToArray();
    var expectedOwner = expectedReplica.AsGuid;

    return StoreCommand.Create(sql, values: [ newOwner, targetPositions, expectedOwner ]);
}
597634

598635
public async Task<Dictionary<StoredId, IReadOnlyList<StoredMessage>>> ReadMessagesForMultipleStores(NpgsqlDataReader reader)
599636
{

0 commit comments

Comments
 (0)