Skip to content

Commit f9afb96

Browse files
committed
Added StoreCanHandleMultipleEffectsWithSameIdOnDifferentSessions-test and fixed InMemoryEffectsStore
1 parent 54e46c2 commit f9afb96

12 files changed

Lines changed: 143 additions & 33 deletions

File tree

Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/EffectStoreTests.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,4 +54,8 @@ public override Task EffectsForDifferentIdsCanBeFetched()
5454
[TestMethod]
5555
public override Task OverwriteExistingEffectWorks()
5656
=> OverwriteExistingEffectWorks(FunctionStoreFactory.Create());
57+
58+
[TestMethod]
59+
public override Task StoreCanHandleMultipleEffectsWithSameIdOnDifferentSessions()
60+
=> StoreCanHandleMultipleEffectsWithSameIdOnDifferentSessions(FunctionStoreFactory.Create());
5761
}

Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/EffectStoreTests.cs

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
using System.Linq;
1+
using System;
2+
using System.Linq;
23
using System.Threading.Tasks;
34
using Cleipnir.ResilientFunctions.Domain;
45
using Cleipnir.ResilientFunctions.Helpers;
@@ -436,4 +437,55 @@ protected async Task OverwriteExistingEffectWorks(Task<IFunctionStore> storeTask
436437
storedEffect.EffectId.ShouldBe("EffectId1".ToEffectId());
437438
storedEffect.WorkStatus.ShouldBe(WorkStatus.Completed);
438439
}
440+
441+
public abstract Task StoreCanHandleMultipleEffectsWithSameIdOnDifferentSessions();
442+
protected async Task StoreCanHandleMultipleEffectsWithSameIdOnDifferentSessions(Task<IFunctionStore> storeTask)
443+
{
444+
var store = await storeTask;
445+
var effectStore = store.EffectsStore;
446+
var storedId = TestStoredId.Create();
447+
var storageSession1 = await store.CreateFunction(
448+
storedId,
449+
"SomeInstanceId",
450+
param: null,
451+
leaseExpiration: 0,
452+
postponeUntil: null,
453+
timestamp: 0,
454+
parent: null,
455+
owner: null
456+
);
457+
458+
var storageSession2 = await store.RestartExecution(
459+
storedId,
460+
owner: ReplicaId.Empty
461+
).SelectAsync(s => s!.StorageSession);
462+
463+
var storedEffect1 = new StoredEffect(
464+
"EffectId1".ToEffectId(),
465+
WorkStatus.Started,
466+
Result: null,
467+
StoredException: null
468+
);
469+
var storedEffect2 = new StoredEffect(
470+
"EffectId1".ToEffectId(),
471+
WorkStatus.Completed,
472+
Result: null,
473+
StoredException: null
474+
);
475+
476+
await effectStore.SetEffectResult(storedId, storedEffect1.ToStoredChange(storedId, Insert), storageSession1);
477+
try
478+
{
479+
await effectStore.SetEffectResult(storedId, storedEffect2.ToStoredChange(storedId, Update), storageSession2);
480+
}
481+
catch (Exception)
482+
{
483+
// ignored
484+
}
485+
486+
var storedEffects = await effectStore.GetEffectResults(storedId);
487+
var storedEffect = storedEffects.Single();
488+
storedEffect.EffectId.ShouldBe("EffectId1".ToEffectId());
489+
storedEffect.WorkStatus.ShouldBe(WorkStatus.Started);
490+
}
439491
}

Core/Cleipnir.ResilientFunctions/Domain/EffectId.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,4 +74,5 @@ public record SerializedEffectId(string Value);
7474
public static class EffectIdExtensions
7575
{
7676
public static EffectId ToEffectId(this string value, EffectType? effectType = null, string? context = null) => new(value, effectType ?? EffectType.Effect, context ?? "");
77+
public static SerializedEffectId ToSerializedEffectId(this string value) => new(value);
7778
}

Core/Cleipnir.ResilientFunctions/Storage/InMemoryEffectsStore.cs

Lines changed: 30 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ namespace Cleipnir.ResilientFunctions.Storage;
99

1010
public class InMemoryEffectsStore : IEffectsStore
1111
{
12-
private readonly Dictionary<StoredId, Dictionary<StoredEffectId, StoredEffect>> _effects = new();
12+
private readonly Dictionary<StoredId, Dictionary<long, StoredEffect>> _effects = new();
1313
private readonly Lock _sync = new();
1414

1515
public Task Initialize() => Task.CompletedTask;
@@ -24,19 +24,28 @@ public Task Truncate()
2424

2525
public Task SetEffectResults(StoredId storedId, IReadOnlyList<StoredEffectChange> changes, IStorageSession? session)
2626
{
27+
var positionsSession = session as PositionsStorageSession ?? CreateStorageSession(storedId);
28+
2729
lock (_sync)
2830
{
29-
foreach (var storedEffect in changes.Where(c => c.Operation != CrudOperation.Delete).Select(c => c.StoredEffect!))
31+
if (!_effects.ContainsKey(storedId))
32+
_effects[storedId] = new Dictionary<long, StoredEffect>();
33+
34+
foreach (var change in changes)
3035
{
31-
if (!_effects.ContainsKey(storedId))
32-
_effects[storedId] = new Dictionary<StoredEffectId, StoredEffect>();
33-
34-
_effects[storedId][storedEffect.StoredEffectId] = storedEffect;
36+
var position = positionsSession.Get(change.EffectId.Serialize());
37+
if (position != null)
38+
{
39+
positionsSession.Remove(change.EffectId.Serialize());
40+
_effects[storedId].Remove(position.Value);
41+
}
3542
}
3643

37-
foreach (var effectId in changes.Where(c => c.Operation == CrudOperation.Delete).Select(c => c.EffectId))
38-
if (_effects.ContainsKey(storedId))
39-
_effects[storedId].Remove(effectId.ToStoredEffectId());
44+
foreach (var change in changes.Where(c => c.Operation != CrudOperation.Delete))
45+
{
46+
var position = positionsSession.Add(change.EffectId.Serialize());
47+
_effects[storedId].Add(position, change.StoredEffect!);
48+
}
4049
}
4150

4251
return Task.CompletedTask;
@@ -54,20 +63,26 @@ public Task<Dictionary<StoredId, List<StoredEffect>>> GetEffectResults(IEnumerab
5463
return dict.ToTask();
5564
}
5665

57-
public Task DeleteEffectResult(StoredId storedId, StoredEffectId effectId)
66+
public Task Remove(StoredId storedId)
5867
{
5968
lock (_sync)
60-
if (_effects.ContainsKey(storedId))
61-
_effects[storedId].Remove(effectId);
69+
_effects.Remove(storedId);
6270

6371
return Task.CompletedTask;
6472
}
6573

66-
public Task Remove(StoredId storedId)
74+
internal PositionsStorageSession CreateStorageSession(StoredId storedId)
6775
{
76+
var session = new PositionsStorageSession();
6877
lock (_sync)
69-
_effects.Remove(storedId);
78+
{
79+
if (!_effects.ContainsKey(storedId))
80+
return session;
81+
82+
foreach(var (position, effect) in _effects[storedId])
83+
session.Set(effect.EffectId.Serialize(), position);
84+
}
7085

71-
return Task.CompletedTask;
86+
return session;
7287
}
7388
}

Core/Cleipnir.ResilientFunctions/Storage/InMemoryFunctionStore.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,9 @@ public InMemoryFunctionStore()
8484
.GetAwaiter()
8585
.GetResult();
8686

87-
return Task.FromResult<IStorageSession?>(new EmptyStorageSession());
87+
var session = _effectsStore.CreateStorageSession(storedId);
88+
89+
return Task.FromResult<IStorageSession?>(session);
8890
}
8991
}
9092

@@ -138,7 +140,7 @@ public Task BulkScheduleFunctions(IEnumerable<IdWithParam> functionsWithParam, S
138140
sf,
139141
effects,
140142
messages,
141-
new EmptyStorageSession()
143+
_effectsStore.CreateStorageSession(storedId)
142144
);
143145
}
144146

Core/Cleipnir.ResilientFunctions/Storage/Session/EmptyStorageSession.cs

Lines changed: 0 additions & 6 deletions
This file was deleted.

Core/Cleipnir.ResilientFunctions/Storage/Session/PositionsStorageSession.cs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
using System;
12
using System.Collections.Generic;
23
using Cleipnir.ResilientFunctions.Domain;
34

@@ -14,4 +15,14 @@ public long Add(SerializedEffectId id)
1415
Positions[id] = position;
1516
return position;
1617
}
18+
19+
public void Set(SerializedEffectId id, long position)
20+
{
21+
MaxPosition = Math.Max(MaxPosition, position);
22+
Positions[id] = position;
23+
}
24+
25+
public void Remove(SerializedEffectId id) => Positions.Remove(id);
26+
27+
public long? Get(SerializedEffectId id) => Positions.TryGetValue(id, out var position) ? position : null;
1728
}

Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB.Tests/EffectsStoreTests.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,4 +53,8 @@ public override Task EffectsForDifferentIdsCanBeFetched()
5353
[TestMethod]
5454
public override Task OverwriteExistingEffectWorks()
5555
=> OverwriteExistingEffectWorks(FunctionStoreFactory.Create());
56+
57+
[TestMethod]
58+
public override Task StoreCanHandleMultipleEffectsWithSameIdOnDifferentSessions()
59+
=> StoreCanHandleMultipleEffectsWithSameIdOnDifferentSessions(FunctionStoreFactory.Create());
5660
}

Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL.Tests/EffectStoreTests.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,4 +54,8 @@ public override Task EffectsForDifferentIdsCanBeFetched()
5454
[TestMethod]
5555
public override Task OverwriteExistingEffectWorks()
5656
=> OverwriteExistingEffectWorks(FunctionStoreFactory.Create());
57+
58+
[TestMethod]
59+
public override Task StoreCanHandleMultipleEffectsWithSameIdOnDifferentSessions()
60+
=> StoreCanHandleMultipleEffectsWithSameIdOnDifferentSessions(FunctionStoreFactory.Create());
5761
}

Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlFunctionStore.cs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -247,11 +247,12 @@ INSERT INTO {_tableName}
247247
if (sf?.OwnerId != replicaId)
248248
return null;
249249
await reader.NextResultAsync();
250-
var effects = await _sqlGenerator.ReadEffects(reader);
251-
await reader.NextResultAsync();
250+
var (effects, session) = await _sqlGenerator.ReadEffects(reader);
252251

252+
await reader.NextResultAsync();
253253
var messages = await _sqlGenerator.ReadMessages(reader);
254-
return new StoredFlowWithEffectsAndMessages(sf, effects, messages, new EmptyStorageSession());
254+
255+
return new StoredFlowWithEffectsAndMessages(sf, effects, messages, session);
255256
}
256257

257258
private string? _getExpiredFunctionsSql;

0 commit comments

Comments
 (0)