Skip to content

Commit 54e46c2

Browse files
committed
Use PositionStorageSession in MariaDbEffectsStore
1 parent 8b40cfd commit 54e46c2

3 files changed

Lines changed: 123 additions & 77 deletions

File tree

Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbEffectsStore.cs

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,16 +40,23 @@ public async Task SetEffectResults(StoredId storedId, IReadOnlyList<StoredEffect
4040
if (changes.Count == 0)
4141
return;
4242

43+
var positionsSession = session as PositionsStorageSession ?? await CreateSession(storedId);
4344
await using var conn = await CreateConnection();
4445
await using var command = sqlGenerator
45-
.UpdateEffects(changes)!
46-
.ToSqlCommand(conn);
46+
.UpdateEffects(storedId, changes, positionsSession)
47+
?.ToSqlCommand(conn);
4748

48-
await command.ExecuteNonQueryAsync();
49+
if (command != null)
50+
await command.ExecuteNonQueryAsync();
4951
}
5052

5153
public async Task<Dictionary<StoredId, List<StoredEffect>>> GetEffectResults(IEnumerable<StoredId> storedIds)
54+
=> (await GetEffectResultsWithPositions(storedIds))
55+
.ToDictionary(kv => kv.Key, kv => kv.Value.Select(s => s.Effect).ToList());
56+
57+
private async Task<Dictionary<StoredId, List<StoredEffectWithPosition>>> GetEffectResultsWithPositions(IEnumerable<StoredId> storedIds)
5258
{
59+
storedIds = storedIds.ToList();
5360
await using var conn = await CreateConnection();
5461
await using var command = sqlGenerator.GetEffects(storedIds).ToSqlCommand(conn);
5562
await using var reader = await command.ExecuteReaderAsync();
@@ -80,4 +87,26 @@ private async Task<MySqlConnection> CreateConnection()
8087
await conn.OpenAsync();
8188
return conn;
8289
}
90+
91+
private async Task<PositionsStorageSession> CreateSession(StoredId storedId)
92+
=> await CreateSessions([storedId]).SelectAsync(d => d[storedId]);
93+
private async Task<Dictionary<StoredId, PositionsStorageSession>> CreateSessions(IEnumerable<StoredId> storedIds)
94+
=> CreateSessions(await GetEffectResultsWithPositions(storedIds));
95+
96+
private Dictionary<StoredId, PositionsStorageSession> CreateSessions(Dictionary<StoredId, List<StoredEffectWithPosition>> effects)
97+
{
98+
var dictionary = new Dictionary<StoredId, PositionsStorageSession>();
99+
foreach (var storedId in effects.Keys)
100+
{
101+
var session = new PositionsStorageSession();
102+
dictionary[storedId] = session;
103+
foreach (var (effect, position) in effects[storedId].OrderBy(e => e.Position))
104+
{
105+
session.MaxPosition = position;
106+
session.Positions[effect.EffectId.Serialize()] = position;
107+
}
108+
}
109+
110+
return dictionary;
111+
}
83112
}

Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbFunctionStore.cs

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ public async Task TruncateTables()
128128
.CreateFunction(storedId, humanInstanceId, param, leaseExpiration, postponeUntil, timestamp, parent, owner, ignoreDuplicate: true)
129129
.ToSqlCommand(conn);
130130
var affectedRows = await command.ExecuteNonQueryAsync();
131-
return affectedRows == 1 ? new EmptyStorageSession() : null;
131+
return affectedRows == 1 ? new PositionsStorageSession() : null;
132132
}
133133
else
134134
{
@@ -141,10 +141,13 @@ public async Task TruncateTables()
141141
storeCommand = storeCommand.Merge(messagesCommand);
142142
}
143143

144+
var session = new PositionsStorageSession();
144145
if (effects?.Any() ?? false)
145146
{
146147
var effectsCommand = _sqlGenerator.UpdateEffects(
147-
effects.Select(e => new StoredEffectChange(storedId, e.EffectId, CrudOperation.Insert, e)).ToList()
148+
storedId,
149+
changes: effects.Select(e => new StoredEffectChange(storedId, e.EffectId, CrudOperation.Insert, e)).ToList(),
150+
session
148151
);
149152
storeCommand = storeCommand.Merge(effectsCommand);
150153
}
@@ -155,7 +158,7 @@ public async Task TruncateTables()
155158
try
156159
{
157160
await command.ExecuteNonQueryAsync();
158-
return new EmptyStorageSession();
161+
return session;
159162
}
160163
catch (MySqlException ex) when (ex.Number == 1062)
161164
{
@@ -199,26 +202,35 @@ INSERT IGNORE INTO {_tablePrefix}
199202
var restartCommand = _sqlGenerator.RestartExecution(storedId, replicaId);
200203
var effectsCommand = _sqlGenerator.GetEffects(storedId);
201204
var messagesCommand = _sqlGenerator.GetMessages(storedId, skip: 0);
202-
205+
203206
await using var conn = await CreateOpenConnection(_connectionString);
204207
await using var command = StoreCommand
205208
.Merge(restartCommand, effectsCommand, messagesCommand)
206209
.ToSqlCommand(conn);
207-
210+
208211
var reader = await command.ExecuteReaderAsync();
209212
if (reader.RecordsAffected != 1)
210213
return null;
211-
214+
212215
var sf = await _sqlGenerator.ReadToStoredFunction(storedId, reader);
213216
if (sf?.OwnerId != replicaId)
214217
return null;
215218
await reader.NextResultAsync();
216-
217-
var effects = await _sqlGenerator.ReadEffects(reader);
219+
220+
var effectsWithPositions = await _sqlGenerator.ReadEffectsWithPositions(reader);
221+
var effects = effectsWithPositions.Select(e => e.Effect).ToList();
218222
await reader.NextResultAsync();
219-
223+
220224
var messages = await _sqlGenerator.ReadMessages(reader);
221-
return new StoredFlowWithEffectsAndMessages(sf, effects, messages, new EmptyStorageSession());
225+
226+
var session = new PositionsStorageSession();
227+
foreach (var (effect, position) in effectsWithPositions.OrderBy(e => e.Position))
228+
{
229+
session.MaxPosition = position;
230+
session.Positions[effect.EffectId.Serialize()] = position;
231+
}
232+
233+
return new StoredFlowWithEffectsAndMessages(sf, effects, messages, session);
222234
}
223235

224236
private string? _getExpiredFunctionsSql;

Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/SqlGenerator.cs

Lines changed: 69 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
using Cleipnir.ResilientFunctions.Helpers;
44
using Cleipnir.ResilientFunctions.Messaging;
55
using Cleipnir.ResilientFunctions.Storage;
6+
using Cleipnir.ResilientFunctions.Storage.Session;
67
using Cleipnir.ResilientFunctions.Storage.Utils;
78
using MySqlConnector;
89

@@ -51,8 +52,11 @@ public StoreCommand GetEffects(StoredId storedId)
5152
}
5253

5354
public async Task<IReadOnlyList<StoredEffect>> ReadEffects(MySqlDataReader reader)
55+
=> (await ReadEffectsWithPositions(reader)).Select(e => e.Effect).ToList();
56+
57+
public async Task<IReadOnlyList<StoredEffectWithPosition>> ReadEffectsWithPositions(MySqlDataReader reader)
5458
{
55-
var functions = new List<StoredEffect>();
59+
var functions = new List<StoredEffectWithPosition>();
5660
while (await reader.ReadAsync())
5761
{
5862
var position = reader.GetInt64(0);
@@ -61,11 +65,14 @@ public async Task<IReadOnlyList<StoredEffect>> ReadEffects(MySqlDataReader reade
6165
var exception = reader.IsDBNull(3) ? null : reader.GetString(3);
6266
var effectId = reader.GetString(4);
6367
functions.Add(
64-
new StoredEffect(
65-
EffectId.Deserialize(effectId),
66-
status,
67-
result,
68-
JsonHelper.FromJson<StoredException>(exception)
68+
new StoredEffectWithPosition(
69+
new StoredEffect(
70+
EffectId.Deserialize(effectId),
71+
status,
72+
result,
73+
JsonHelper.FromJson<StoredException>(exception)
74+
),
75+
position
6976
)
7077
);
7178
}
@@ -84,12 +91,12 @@ public StoreCommand GetEffects(IEnumerable<StoredId> storedIds)
8491
return command;
8592
}
8693

87-
public async Task<Dictionary<StoredId, List<StoredEffect>>> ReadEffectsForMultipleStoredIds(MySqlDataReader reader, IEnumerable<StoredId> storedIds)
94+
public async Task<Dictionary<StoredId, List<StoredEffectWithPosition>>> ReadEffectsForMultipleStoredIds(MySqlDataReader reader, IEnumerable<StoredId> storedIds)
8895
{
89-
var storedEffects = new Dictionary<StoredId, List<StoredEffect>>();
96+
var storedEffects = new Dictionary<StoredId, List<StoredEffectWithPosition>>();
9097
foreach (var storedId in storedIds)
91-
storedEffects[storedId] = new List<StoredEffect>();
92-
98+
storedEffects[storedId] = new List<StoredEffectWithPosition>();
99+
93100
while (await reader.ReadAsync())
94101
{
95102
var id = reader.GetString(0).ToGuid().ToStoredId();
@@ -100,77 +107,75 @@ public async Task<Dictionary<StoredId, List<StoredEffect>>> ReadEffectsForMultip
100107
var effectId = reader.GetString(5);
101108

102109
storedEffects[id].Add(
103-
new StoredEffect(
104-
EffectId.Deserialize(effectId),
105-
status,
106-
result,
107-
JsonHelper.FromJson<StoredException>(exception)
110+
new StoredEffectWithPosition(
111+
new StoredEffect(
112+
EffectId.Deserialize(effectId),
113+
status,
114+
result,
115+
JsonHelper.FromJson<StoredException>(exception)
116+
),
117+
position
108118
)
109119
);
110120
}
111121

112122
return storedEffects;
113123
}
114124

115-
public StoreCommand UpdateEffects(IReadOnlyList<StoredEffectChange> changes)
125+
public StoreCommand? UpdateEffects(StoredId storedId, IReadOnlyList<StoredEffectChange> changes, PositionsStorageSession session)
116126
{
117-
var upsertCommand = default(StoreCommand);
118-
119-
if (changes.Any(c => c.Operation == CrudOperation.Update || c.Operation == CrudOperation.Insert))
127+
var storeCommands = new List<StoreCommand>(2);
128+
129+
// DELETES
120130
{
121-
var upserts = changes
122-
.Where(c => c.Operation == CrudOperation.Update || c.Operation == CrudOperation.Insert)
123-
.Select(c => new
124-
{
125-
Instance = c.StoredId.AsGuid,
126-
Position = c.EffectId.ToStoredEffectId().Value.ToLong(),
127-
WorkStatus = (int)c.StoredEffect!.WorkStatus,
128-
Result = c.StoredEffect!.Result,
129-
Exception = c.StoredEffect!.StoredException,
130-
EffectId = c.StoredEffect!.EffectId
131-
})
131+
var positionsToDelete = changes
132+
.Select(c =>
133+
session.Positions.ContainsKey(c.EffectId.Serialize())
134+
? session.Positions[c.EffectId.Serialize()]
135+
: -1)
136+
.Where(p => p != -1)
132137
.ToList();
133-
134-
var setSql = $@"
135-
INSERT INTO {tablePrefix}_effects
136-
(id, position, status, result, exception, effect_id)
137-
VALUES
138-
{"(?, ?, ?, ?, ?, ?)".Replicate(upserts.Count).StringJoin(", ")}
139-
ON DUPLICATE KEY UPDATE
140-
status = VALUES(status), result = VALUES(result), exception = VALUES(exception);";
141138

142-
upsertCommand = StoreCommand.Create(setSql);
143-
foreach (var upsert in upserts)
139+
if (positionsToDelete.Any())
144140
{
145-
upsertCommand.AddParameter(upsert.Instance.ToString("N"));
146-
upsertCommand.AddParameter(upsert.Position);
147-
upsertCommand.AddParameter(upsert.WorkStatus);
148-
upsertCommand.AddParameter(upsert.Result ?? (object) DBNull.Value);
149-
upsertCommand.AddParameter(JsonHelper.ToJson(upsert.Exception) ?? (object) DBNull.Value);
150-
upsertCommand.AddParameter(upsert.EffectId.Serialize().Value);
151-
}
141+
var removeSql = @$"
142+
DELETE FROM {tablePrefix}_effects
143+
WHERE id = '{storedId.AsGuid:N}' AND position IN ({positionsToDelete.Select(p => p.ToString()).StringJoin(separator: ", ")});";
144+
storeCommands.Add(StoreCommand.Create(removeSql));
145+
}
152146
}
153147

154-
var removeCommand = default(StoreCommand);
155-
if (changes.Any(c => c.Operation == CrudOperation.Delete))
148+
// INSERT and UPDATES
156149
{
157-
var removes = changes
158-
.Where(c => c.Operation == CrudOperation.Delete)
159-
.Select(c => new { Id = c.StoredId.AsGuid, Position = c.EffectId.ToStoredEffectId().Value.ToLong() })
160-
.GroupBy(a => a.Id, a => a.Position)
150+
var insertsAndUpdates = changes
151+
.Where(c => c.Operation == CrudOperation.Insert || c.Operation == CrudOperation.Update)
161152
.ToList();
162-
var predicates = removes
163-
.Select(r =>
164-
$"(id = '{r.Key:N}' AND position IN ({r.Select(id => $"{id}").StringJoin(", ")}))")
165-
.StringJoin($" OR {Environment.NewLine}");
166-
var removeSql = @$"
167-
DELETE FROM {tablePrefix}_effects
168-
WHERE {predicates}";
169-
170-
removeCommand = StoreCommand.Create(removeSql);
153+
154+
foreach (var (_, effectId, _, storedEffect) in insertsAndUpdates)
155+
{
156+
var position = session.Add(effectId.Serialize());
157+
158+
var sql = $@"
159+
INSERT INTO {tablePrefix}_effects
160+
(id, position, status, result, exception, effect_id)
161+
VALUES
162+
(?, ?, ?, ?, ?, ?);";
163+
164+
var command = StoreCommand.Create(sql);
165+
command.AddParameter(storedId.AsGuid.ToString("N"));
166+
command.AddParameter(position);
167+
command.AddParameter((int)storedEffect!.WorkStatus);
168+
command.AddParameter(storedEffect.Result ?? (object)DBNull.Value);
169+
command.AddParameter(JsonHelper.ToJson(storedEffect.StoredException) ?? (object)DBNull.Value);
170+
command.AddParameter(storedEffect.EffectId.Serialize().Value);
171+
172+
storeCommands.Add(command);
173+
}
171174
}
172-
173-
return StoreCommand.Merge([upsertCommand, removeCommand])!;
175+
176+
return storeCommands.Count == 0
177+
? null
178+
: StoreCommand.Merge(storeCommands)!;
174179
}
175180

176181
private string? _createFunctionSql;

0 commit comments

Comments
 (0)