Skip to content

Commit bcfccfd

Browse files
committed
wip
1 parent 0bbdeed commit bcfccfd

4 files changed

Lines changed: 59 additions & 9 deletions

File tree

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
using System.Collections.Generic;
2+
using Cleipnir.ResilientFunctions.Domain;
3+
4+
namespace Cleipnir.ResilientFunctions.Storage.Session;
5+
6+
public class PositionsStorageSession : IStorageSession
7+
{
8+
public Dictionary<string, long> Positions { get; } = new();
9+
public long MaxPosition { get; set; }
10+
11+
public void Add(EffectId effectId)
12+
{
13+
var serializedEffectId = effectId.Serialize();
14+
if (Positions.ContainsKey(serializedEffectId))
15+
return;
16+
17+
var maxPosition = MaxPosition;
18+
MaxPosition++;
19+
Positions[serializedEffectId] = maxPosition;
20+
}
21+
}

Core/Cleipnir.ResilientFunctions/Storage/Types.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,8 +162,9 @@ public static StoredEffect CreateState(StoredState storedState)
162162
StoredException: null
163163
);
164164
}
165-
166165
};
166+
public record StoredEffectWithPosition(StoredEffect Effect, long Position);
167+
167168
public record StoredState(StateId StateId, byte[] StateJson);
168169

169170
public record IdWithParam(StoredId StoredId, string HumanInstanceId, byte[]? Param);

Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlEffectsStore.cs

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,8 @@ public async Task SetEffectResults(StoredId storedId, IReadOnlyList<StoredEffect
4545
if (changes.Count == 0)
4646
return;
4747

48-
await using var batch = sqlGenerator.UpdateEffects(changes).ToNpgsqlBatch();
48+
var positionsSession = session as PositionsStorageSession;
49+
await using var batch = sqlGenerator.UpdateEffects(changes, positionsSession).ToNpgsqlBatch();
4950
await using var conn = await CreateConnection();
5051
batch.WithConnection(conn);
5152

@@ -59,7 +60,7 @@ public async Task<Dictionary<StoredId, List<StoredEffect>>> GetEffectResults(IEn
5960

6061
await using var reader = await command.ExecuteReaderAsync();
6162
var effects = await sqlGenerator.ReadEffectsForIds(reader, storedIds);
62-
return effects;
63+
return effects.ToDictionary(kv => kv.Key, kv => kv.Value.Select(s => s.Effect).ToList());
6364
}
6465

6566
private string? _removeSql;
@@ -85,4 +86,29 @@ private async Task<NpgsqlConnection> CreateConnection()
8586
await conn.OpenAsync();
8687
return conn;
8788
}
89+
90+
private async Task<PositionsStorageSession> CreateSession(StoredId storedId)
91+
=> await CreateSessions([storedId]).SelectAsync(d => d[storedId]);
92+
private async Task<Dictionary<StoredId, PositionsStorageSession>> CreateSessions(IEnumerable<StoredId> storedIds)
93+
{
94+
sqlGenerator.GetEffects(storedIds);
95+
96+
}
97+
98+
private Dictionary<StoredId, PositionsStorageSession> CreateSessions(Dictionary<StoredId, List<StoredEffectWithPosition>> effects)
99+
{
100+
var dictionary = new Dictionary<StoredId, PositionsStorageSession>();
101+
foreach (var storedId in effects.Keys)
102+
{
103+
var session = new PositionsStorageSession();
104+
dictionary[storedId] = session;
105+
foreach (var (effect, position) in effects[storedId].OrderBy(e => e.Position))
106+
{
107+
session.MaxPosition = position;
108+
session.Positions[effect.EffectId.Serialize()] = position;
109+
}
110+
}
111+
112+
return dictionary;
113+
}
88114
}

Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/SqlGenerator.cs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
using Cleipnir.ResilientFunctions.Helpers;
88
using Cleipnir.ResilientFunctions.Messaging;
99
using Cleipnir.ResilientFunctions.Storage;
10+
using Cleipnir.ResilientFunctions.Storage.Session;
1011
using Cleipnir.ResilientFunctions.Storage.Utils;
1112
using Npgsql;
1213

@@ -76,11 +77,11 @@ public async Task<IReadOnlyList<StoredEffect>> ReadEffects(NpgsqlDataReader read
7677

7778
return functions;
7879
}
79-
public async Task<Dictionary<StoredId, List<StoredEffect>>> ReadEffectsForIds(NpgsqlDataReader reader, IEnumerable<StoredId> storedIds)
80+
public async Task<Dictionary<StoredId, List<StoredEffectWithPosition>>> ReadEffectsForIds(NpgsqlDataReader reader, IEnumerable<StoredId> storedIds)
8081
{
81-
var effects = new Dictionary<StoredId, List<StoredEffect>>();
82+
var effects = new Dictionary<StoredId, List<StoredEffectWithPosition>>();
8283
foreach (var storedId in storedIds)
83-
effects[storedId] = new List<StoredEffect>();
84+
effects[storedId] = new List<StoredEffectWithPosition>();
8485

8586
while (await reader.ReadAsync())
8687
{
@@ -92,16 +93,16 @@ public async Task<Dictionary<StoredId, List<StoredEffect>>> ReadEffectsForIds(Np
9293
var effectId = reader.GetString(5);
9394

9495
var se = new StoredEffect(EffectId.Deserialize(effectId), status, result, JsonHelper.FromJson<StoredException>(exception));
95-
effects[id].Add(se);
96+
effects[id].Add(new StoredEffectWithPosition(se, position));
9697
}
9798

9899
return effects;
99100
}
100101

101-
public IEnumerable<StoreCommand> UpdateEffects(IReadOnlyList<StoredEffectChange> changes)
102+
public IEnumerable<StoreCommand> UpdateEffects(IReadOnlyList<StoredEffectChange> changes, PositionsStorageSession session)
102103
{
103104
var commands = new List<StoreCommand>(changes.Count);
104-
105+
105106
// INSERT
106107
{
107108
var sql= $@"
@@ -112,6 +113,7 @@ INSERT INTO {tablePrefix}_effects
112113

113114
foreach (var (storedId, _, _, storedEffect) in changes.Where(s => s.Operation == CrudOperation.Insert))
114115
{
116+
115117
var command = StoreCommand.Create(sql);
116118
command.AddParameter(storedId.AsGuid);
117119
command.AddParameter(storedEffect!.StoredEffectId.Value.ToLong());

0 commit comments

Comments
 (0)