Skip to content

Commit 834f253

Browse files
committed
PostgreSqlEffectsStore uses PositionsStorageSession
1 parent bcfccfd commit 834f253

17 files changed

Lines changed: 148 additions & 104 deletions

File tree

Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/EffectIdTests.cs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ public void EffectIdWithStateCanBeDeserialized()
2222
public void EffectIdWithContextCanBeDeserialized()
2323
{
2424
var parentEffect = new EffectId("SomeParentId", EffectType.Effect, Context: "ESomeParentContext");
25-
var effectId = new EffectId("SomeValue", EffectType.State, Context: parentEffect.Serialize());
25+
var effectId = new EffectId("SomeValue", EffectType.State, Context: parentEffect.Serialize().Value);
2626
var serializedId = effectId.Serialize();
2727
var deserializedId = EffectId.Deserialize(serializedId);
2828

@@ -33,7 +33,7 @@ public void EffectIdWithContextCanBeDeserialized()
3333
public void EffectIdWithContextAndEscapedCharactersCanBeDeserialized()
3434
{
3535
var parentEffect = new EffectId("SomeParentId", EffectType.Effect, Context: "");
36-
var effectId = new EffectId("Some.Value\\WithBackSlash", EffectType.State, Context: parentEffect.Serialize());
36+
var effectId = new EffectId("Some.Value\\WithBackSlash", EffectType.State, Context: parentEffect.Serialize().Value);
3737
var serializedId = effectId.Serialize();
3838
var deserializedId = EffectId.Deserialize(serializedId);
3939

@@ -45,7 +45,7 @@ public void EffectIdWithBackslashIsSerializedCorrectly()
4545
{
4646
var effectId = new EffectId("\\", EffectType.State, Context: "");
4747
var serializedId = effectId.Serialize();
48-
serializedId.ShouldBe("S\\\\");
48+
serializedId.Value.ShouldBe("S\\\\");
4949
var deserializedId = EffectId.Deserialize(serializedId);
5050
deserializedId.ShouldBe(effectId);
5151
}
@@ -55,7 +55,7 @@ public void EffectIdWithDotIsSerializedCorrectly()
5555
{
5656
var effectId = new EffectId(".", EffectType.State, Context: "");
5757
var serializedId = effectId.Serialize();
58-
serializedId.ShouldBe("S\\.");
58+
serializedId.Value.ShouldBe("S\\.");
5959
var deserializedId = EffectId.Deserialize(serializedId);
6060
deserializedId.ShouldBe(effectId);
6161
}
@@ -73,11 +73,11 @@ public void EffectIdWithoutStateCanBeDeserialized()
7373
[TestMethod]
7474
public void StoredEffectIdIsBasedOnSerializedEffectIdValue()
7575
{
76-
var effectId = new EffectId("SomeId", EffectType.Effect, Context: new EffectId("SomeParentId", EffectType.Effect, Context: "ESomeParentContext").Serialize());
76+
var effectId = new EffectId("SomeId", EffectType.Effect, Context: new EffectId("SomeParentId", EffectType.Effect, Context: "ESomeParentContext").Serialize().Value);
7777
var serializedEffectId = effectId.Serialize();
7878

7979
var storedEffectId = effectId.ToStoredEffectId();
80-
storedEffectId.Value.ShouldBe(StoredIdFactory.FromString(serializedEffectId));
80+
storedEffectId.Value.ShouldBe(StoredIdFactory.FromString(serializedEffectId.Value));
8181
}
8282

8383
[TestMethod]

Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/EffectStoreTests.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,4 +50,8 @@ public override Task UpsertEmptyCollectionOfEffectsDoesNotThrowException()
5050
[TestMethod]
5151
public override Task EffectsForDifferentIdsCanBeFetched()
5252
=> EffectsForDifferentIdsCanBeFetched(FunctionStoreFactory.Create().SelectAsync(fs => fs.EffectsStore));
53+
54+
[TestMethod]
55+
public override Task OverwriteExistingEffectWorks()
56+
=> OverwriteExistingEffectWorks(FunctionStoreFactory.Create());
5357
}

Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/EffectStoreTests.cs

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -397,4 +397,43 @@ protected async Task EffectsForDifferentIdsCanBeFetched(Task<IEffectsStore> stor
397397
resultsId2.Any(r => r.EffectId == storedEffect1.EffectId).ShouldBeTrue();
398398
resultsId2.Any(r => r.EffectId == storedEffect2.EffectId).ShouldBeTrue();
399399
}
400+
401+
public abstract Task OverwriteExistingEffectWorks();
402+
protected async Task OverwriteExistingEffectWorks(Task<IFunctionStore> storeTask)
403+
{
404+
var store = await storeTask;
405+
var effectStore = store.EffectsStore;
406+
var storedId = TestStoredId.Create();
407+
var storageSession = await store.CreateFunction(
408+
storedId,
409+
"SomeInstanceId",
410+
param: null,
411+
leaseExpiration: 0,
412+
postponeUntil: null,
413+
timestamp: 0,
414+
parent: null,
415+
owner: null
416+
);
417+
418+
var storedEffect1 = new StoredEffect(
419+
"EffectId1".ToEffectId(),
420+
WorkStatus.Started,
421+
Result: null,
422+
StoredException: null
423+
);
424+
var storedEffect2 = new StoredEffect(
425+
"EffectId1".ToEffectId(),
426+
WorkStatus.Completed,
427+
Result: null,
428+
StoredException: null
429+
);
430+
431+
await effectStore.SetEffectResult(storedId, storedEffect1.ToStoredChange(storedId, Insert), storageSession);
432+
await effectStore.SetEffectResult(storedId, storedEffect2.ToStoredChange(storedId, Update), storageSession);
433+
434+
var storedEffects = await effectStore.GetEffectResults(storedId);
435+
var storedEffect = storedEffects.Single();
436+
storedEffect.EffectId.ShouldBe("EffectId1".ToEffectId());
437+
storedEffect.WorkStatus.ShouldBe(WorkStatus.Completed);
438+
}
400439
}

Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/InvocationHelper.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -394,7 +394,7 @@ public Messages CreateMessages(
394394
effect,
395395
UtcNow,
396396
flowMinimumTimeout,
397-
publishTimeoutEvent: timeoutEvent => messageWriter.AppendMessage(timeoutEvent, idempotencyKey: timeoutEvent.TimeoutId.Serialize()),
397+
publishTimeoutEvent: timeoutEvent => messageWriter.AppendMessage(timeoutEvent, idempotencyKey: timeoutEvent.TimeoutId.Serialize().Value),
398398
unhandledExceptionHandler,
399399
flowId
400400
);

Core/Cleipnir.ResilientFunctions/Domain/Effect.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ public Task<T[]> WhenAll<T>(params Task<T>[] tasks)
150150
internal string TakeNextImplicitId() => EffectContext.CurrentContext.NextImplicitId();
151151

152152
internal EffectId CreateEffectId(string id, EffectType? type = null)
153-
=> id.ToEffectId(type, context: EffectContext.CurrentContext.Parent?.Serialize());
153+
=> id.ToEffectId(type, context: EffectContext.CurrentContext.Parent?.Serialize().Value);
154154

155155
public Task Flush() => effectResults.Flush();
156156
}

Core/Cleipnir.ResilientFunctions/Domain/EffectId.cs

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,15 @@ namespace Cleipnir.ResilientFunctions.Domain;
55

66
public record EffectId(string Id, EffectType Type, string Context)
77
{
8-
public string Serialize()
8+
public SerializedEffectId Serialize()
99
{
1010
var (id, type, context) = this;
1111
if (id.All(c => c != '.' && c != '\\'))
12-
return context == ""
13-
? $"{(char)type}{id}"
14-
: $"{context}.{(char)type}{id}";
12+
return new SerializedEffectId(
13+
context == ""
14+
? $"{(char)type}{id}"
15+
: $"{context}.{(char)type}{id}"
16+
);
1517

1618
var escapedIdList = new List<char>(id.Length * 2);
1719
foreach (var idChar in id)
@@ -31,11 +33,14 @@ public string Serialize()
3133
}
3234

3335
var escapedId = new string(escapedIdList.ToArray());
34-
return context == ""
35-
? $"{(char)type}{escapedId}"
36-
: $"{context}.{(char)type}{escapedId}";
36+
return new SerializedEffectId(
37+
context == ""
38+
? $"{(char)type}{escapedId}"
39+
: $"{context}.{(char)type}{escapedId}"
40+
);
3741
}
3842

43+
public static EffectId Deserialize(SerializedEffectId serialized) => Deserialize(serialized.Value);
3944
public static EffectId Deserialize(string serialized)
4045
{
4146
int pos;
@@ -61,9 +66,11 @@ public static EffectId CreateWithRootContext(string id, EffectType effectType =
6166
=> new(id, effectType, Context: "");
6267

6368
public static EffectId CreateWithCurrentContext(string id, EffectType effectType)
64-
=> new(id, effectType, EffectContext.CurrentContext.Parent?.Serialize() ?? "");
69+
=> new(id, effectType, EffectContext.CurrentContext.Parent?.Serialize().Value ?? "");
6570
}
6671

72+
public record SerializedEffectId(string Value);
73+
6774
public static class EffectIdExtensions
6875
{
6976
public static EffectId ToEffectId(this string value, EffectType? effectType = null, string? context = null) => new(value, effectType ?? EffectType.Effect, context ?? "");

Core/Cleipnir.ResilientFunctions/Domain/EffectResults.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ public async Task InnerCapture(string id, EffectType effectType, Func<Task> work
153153
{
154154
await InitializeIfRequired();
155155

156-
var effectId = id.ToEffectId(effectType, context: effectContext.Parent?.Serialize());
156+
var effectId = id.ToEffectId(effectType, context: effectContext.Parent?.Serialize().Value);
157157
EffectContext.SetParent(effectId);
158158

159159
lock (_sync)
@@ -226,7 +226,7 @@ public async Task<T> InnerCapture<T>(string id, EffectType effectType, Func<Task
226226
{
227227
await InitializeIfRequired();
228228

229-
var effectId = id.ToEffectId(effectType, context: effectContext.Parent?.Serialize());
229+
var effectId = id.ToEffectId(effectType, context: effectContext.Parent?.Serialize().Value);
230230
EffectContext.SetParent(effectId);
231231

232232
lock (_sync)

Core/Cleipnir.ResilientFunctions/Storage/Session/PositionsStorageSession.cs

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,13 @@ namespace Cleipnir.ResilientFunctions.Storage.Session;
55

66
public class PositionsStorageSession : IStorageSession
77
{
8-
public Dictionary<string, long> Positions { get; } = new();
9-
public long MaxPosition { get; set; }
8+
public Dictionary<SerializedEffectId, long> Positions { get; } = new();
9+
public long MaxPosition { get; set; } = -1;
1010

11-
public void Add(EffectId effectId)
11+
public long Add(SerializedEffectId id)
1212
{
13-
var serializedEffectId = effectId.Serialize();
14-
if (Positions.ContainsKey(serializedEffectId))
15-
return;
16-
17-
var maxPosition = MaxPosition;
18-
MaxPosition++;
19-
Positions[serializedEffectId] = maxPosition;
13+
var position = ++MaxPosition;
14+
Positions[id] = position;
15+
return position;
2016
}
2117
}

Core/Cleipnir.ResilientFunctions/Storage/Types.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ public record StatusAndId(StoredId StoredId, Status Status, long Expiry);
108108
public record StoredEffectId(Guid Value)
109109
{
110110
public static StoredEffectId Create(EffectId effectId)
111-
=> new(StoredIdFactory.FromString(effectId.Serialize()));
111+
=> new(StoredIdFactory.FromString(effectId.Serialize().Value));
112112
}
113113

114114
public static class StoredEffectIdExtensions

Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB.Tests/EffectsStoreTests.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,4 +49,8 @@ public override Task UpsertEmptyCollectionOfEffectsDoesNotThrowException()
4949
[TestMethod]
5050
public override Task EffectsForDifferentIdsCanBeFetched()
5151
=> EffectsForDifferentIdsCanBeFetched(FunctionStoreFactory.Create().SelectAsync(fs => fs.EffectsStore));
52+
53+
[TestMethod]
54+
public override Task OverwriteExistingEffectWorks()
55+
=> OverwriteExistingEffectWorks(FunctionStoreFactory.Create());
5256
}

0 commit comments

Comments
 (0)