Skip to content

Commit 809ffc5

Browse files
committed
Take IReadOnlySet<ReplicaId> for GetCrashedReplicaMessages liveReplicas
A set is the natural type for the membership test and matches how live replicas are already represented elsewhere (ReplicaWatchdog builds a HashSet). The in-memory store now uses the set's Contains directly instead of copying into a local HashSet.
1 parent fb7fc06 commit 809ffc5

9 files changed

Lines changed: 10 additions & 11 deletions

File tree

Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessageStoreTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1229,7 +1229,7 @@ protected async Task CrashedReplicaMessagesAreFetched(Task<IFunctionStore> funct
12291229
var bPosition = flow1Messages.Single(m => m.Replica == crashedReplica1).Position;
12301230
var cPosition = flow2Messages.Single(m => m.Replica == crashedReplica2).Position;
12311231

1232-
var crashed = await messageStore.GetCrashedReplicaMessages([liveReplica]);
1232+
var crashed = await messageStore.GetCrashedReplicaMessages(new HashSet<ReplicaId> { liveReplica });
12331233

12341234
crashed.Count.ShouldBe(2);
12351235
crashed.ShouldContain(t => t.StoredId == flow1 && t.Position == bPosition);

Core/Cleipnir.ResilientFunctions/Messaging/IMessageStore.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public interface IMessageStore
3737
/// longer alive (its replica is not contained in <paramref name="liveReplicas"/>).
3838
/// Used to detect messages stranded by crashed replicas so they can be re-assigned to a live replica via <see cref="SetReplica"/>.
3939
/// </summary>
40-
Task<List<StoredIdAndPosition>> GetCrashedReplicaMessages(IEnumerable<ReplicaId> liveReplicas);
40+
Task<List<StoredIdAndPosition>> GetCrashedReplicaMessages(IReadOnlySet<ReplicaId> liveReplicas);
4141

4242
/// <summary>
4343
/// Re-assigns the messages at the provided positions to <paramref name="newReplica"/>,

Core/Cleipnir.ResilientFunctions/Storage/InMemoryFunctionStore.cs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -688,15 +688,14 @@ public Task<Dictionary<StoredId, List<StoredMessage>>> GetMessagesForReplica(Rep
688688
}
689689
}
690690

691-
public Task<List<StoredIdAndPosition>> GetCrashedReplicaMessages(IEnumerable<ReplicaId> liveReplicas)
691+
public Task<List<StoredIdAndPosition>> GetCrashedReplicaMessages(IReadOnlySet<ReplicaId> liveReplicas)
692692
{
693-
var live = liveReplicas.ToHashSet();
694693
lock (_sync)
695694
{
696695
var result = new List<StoredIdAndPosition>();
697696
foreach (var (storedId, messages) in _messages)
698697
foreach (var (position, message) in messages.OrderBy(kv => kv.Key))
699-
if (!live.Contains(message.Replica))
698+
if (!liveReplicas.Contains(message.Replica))
700699
result.Add(new StoredIdAndPosition(storedId, position));
701700

702701
return result.ToTask();

Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbMessageStore.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,7 @@ public async Task<Dictionary<StoredId, List<StoredMessage>>> GetMessagesForRepli
212212
return storedMessages;
213213
}
214214

215-
public async Task<List<StoredIdAndPosition>> GetCrashedReplicaMessages(IEnumerable<ReplicaId> liveReplicas)
215+
public async Task<List<StoredIdAndPosition>> GetCrashedReplicaMessages(IReadOnlySet<ReplicaId> liveReplicas)
216216
{
217217
await using var conn = await DatabaseHelper.CreateOpenConnection(_connectionString);
218218
await using var command = _sqlGenerator

Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/SqlGenerator.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -671,7 +671,7 @@ public StoreCommand GetMessagesForReplica(ReplicaId replicaId)
671671
return StoreCommand.Create(sql, values: [ replicaId.AsGuid.ToString("N") ]);
672672
}
673673

674-
public StoreCommand GetCrashedReplicaMessages(IEnumerable<ReplicaId> liveReplicas)
674+
public StoreCommand GetCrashedReplicaMessages(IReadOnlySet<ReplicaId> liveReplicas)
675675
{
676676
var replicas = liveReplicas.Select(r => $"'{r.AsGuid:N}'").ToList();
677677
var sql = @$"

Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlMessageStore.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,7 @@ public async Task<Dictionary<StoredId, List<StoredMessage>>> GetMessagesForRepli
211211
return storedMessages;
212212
}
213213

214-
public async Task<List<StoredIdAndPosition>> GetCrashedReplicaMessages(IEnumerable<ReplicaId> liveReplicas)
214+
public async Task<List<StoredIdAndPosition>> GetCrashedReplicaMessages(IReadOnlySet<ReplicaId> liveReplicas)
215215
{
216216
await using var conn = await CreateConnection();
217217
await using var command = sqlGenerator.GetCrashedReplicaMessages(liveReplicas).ToNpgsqlCommand(conn);

Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/SqlGenerator.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -595,7 +595,7 @@ public StoreCommand GetMessagesForReplica(ReplicaId replicaId)
595595
return StoreCommand.Create(sql, values: [ replicaId.AsGuid ]);
596596
}
597597

598-
public StoreCommand GetCrashedReplicaMessages(IEnumerable<ReplicaId> liveReplicas)
598+
public StoreCommand GetCrashedReplicaMessages(IReadOnlySet<ReplicaId> liveReplicas)
599599
{
600600
var sql = @$"
601601
SELECT id, position

Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlGenerator.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -675,7 +675,7 @@ public StoreCommand GetMessagesForReplica(ReplicaId replicaId)
675675
return command;
676676
}
677677

678-
public StoreCommand GetCrashedReplicaMessages(IEnumerable<ReplicaId> liveReplicas)
678+
public StoreCommand GetCrashedReplicaMessages(IReadOnlySet<ReplicaId> liveReplicas)
679679
{
680680
var replicas = liveReplicas.ToList();
681681
var sql = @$"

Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerMessageStore.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,7 @@ public async Task<Dictionary<StoredId, List<StoredMessage>>> GetMessagesForRepli
221221
return storedMessages;
222222
}
223223

224-
public async Task<List<StoredIdAndPosition>> GetCrashedReplicaMessages(IEnumerable<ReplicaId> liveReplicas)
224+
public async Task<List<StoredIdAndPosition>> GetCrashedReplicaMessages(IReadOnlySet<ReplicaId> liveReplicas)
225225
{
226226
await using var conn = await CreateConnection();
227227
await using var cmd = _sqlGenerator.GetCrashedReplicaMessages(liveReplicas).ToSqlCommand(conn);

0 commit comments

Comments
 (0)