Skip to content

Commit 8b40cfd

Browse files
authored
Use PositionStorageSession in SqlServerEffectsStore
1 parent a0e904b commit 8b40cfd

3 files changed

Lines changed: 110 additions & 128 deletions

File tree

Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlGenerator.cs

Lines changed: 59 additions & 116 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
using Cleipnir.ResilientFunctions.Helpers;
99
using Cleipnir.ResilientFunctions.Messaging;
1010
using Cleipnir.ResilientFunctions.Storage;
11+
using Cleipnir.ResilientFunctions.Storage.Session;
1112
using Cleipnir.ResilientFunctions.Storage.Utils;
1213
using Microsoft.Data.SqlClient;
1314

@@ -37,122 +38,61 @@ ELSE Expires
3738
return StoreCommand.Create(sql);
3839
}
3940

40-
public StoreCommand UpdateEffects(IReadOnlyList<StoredEffectChange> changes, string paramPrefix)
41+
public StoreCommand? UpdateEffects(StoredId storedId, IReadOnlyList<StoredEffectChange> changes, PositionsStorageSession session, string paramPrefix)
4142
{
42-
var storeCommands = new List<StoreCommand>(3);
43-
44-
//INSERTION
43+
var storeCommands = new List<StoreCommand>(2);
44+
45+
// DELETES
4546
{
46-
var inserts = changes
47-
.Where(c => c.Operation == CrudOperation.Insert)
48-
.Select(c => new
49-
{
50-
Id = c.StoredId.AsGuid,
51-
Position = c.EffectId.ToStoredEffectId().Value.ToLong(),
52-
WorkStatus = (int)c.StoredEffect!.WorkStatus,
53-
Result = c.StoredEffect!.Result,
54-
Exception = c.StoredEffect!.StoredException,
55-
EffectId = c.StoredEffect!.EffectId
56-
})
47+
var positionsToDelete = changes
48+
.Select(c =>
49+
session.Positions.ContainsKey(c.EffectId.Serialize())
50+
? session.Positions[c.EffectId.Serialize()]
51+
: -1)
52+
.Where(p => p != -1)
5753
.ToList();
58-
59-
var parameterValues = inserts
60-
.Select((_, i) => $"(@{paramPrefix}InsertionFlowId{i}, @{paramPrefix}InsertionPosition{i}, @{paramPrefix}InsertionEffectId{i}, @{paramPrefix}InsertionStatus{i}, @{paramPrefix}InsertionResult{i}, @{paramPrefix}InsertionException{i})")
61-
.StringJoin(", ");
62-
63-
var parameters = new List<ParameterValueAndName>();
64-
for (var i = 0; i < inserts.Count; i++)
54+
55+
if (positionsToDelete.Any())
6556
{
66-
var upsert = inserts[i];
67-
parameters.Add(new ParameterValueAndName($"@{paramPrefix}InsertionFlowId{i}", upsert.Id));
68-
parameters.Add(new ParameterValueAndName($"@{paramPrefix}InsertionPosition{i}", upsert.Position));
69-
parameters.Add(new ParameterValueAndName($"@{paramPrefix}InsertionEffectId{i}", upsert.EffectId.Serialize().Value));
70-
parameters.Add(new ParameterValueAndName($"@{paramPrefix}InsertionStatus{i}", upsert.WorkStatus));
71-
parameters.Add(new ParameterValueAndName($"@{paramPrefix}InsertionResult{i}",
72-
upsert.Result ?? (object)SqlBinary.Null));
73-
parameters.Add(new ParameterValueAndName($"@{paramPrefix}InsertionException{i}",
74-
JsonHelper.ToJson(upsert.Exception) ?? (object)DBNull.Value));
57+
var removeSql = @$"
58+
DELETE FROM {tablePrefix}_Effects
59+
WHERE Id = '{storedId.AsGuid}' AND Position IN ({positionsToDelete.Select(p => p.ToString()).StringJoin(separator: ", ")});";
60+
storeCommands.Add(StoreCommand.Create(removeSql));
7561
}
76-
77-
var insertSql = $@"
78-
INSERT INTO {tablePrefix}_Effects
79-
VALUES {parameterValues} ";
80-
81-
if (inserts.Any())
82-
storeCommands.Add(StoreCommand.Create(insertSql, parameters));
8362
}
8463

85-
//UPDATE
64+
// INSERT and UPDATES
8665
{
87-
var upserts = changes
88-
.Where(c => c.Operation == CrudOperation.Update)
89-
.Select(c => new
90-
{
91-
Id = c.StoredId.AsGuid,
92-
Position = c.EffectId.ToStoredEffectId().Value.ToLong(),
93-
WorkStatus = (int)c.StoredEffect!.WorkStatus,
94-
Result = c.StoredEffect!.Result,
95-
Exception = c.StoredEffect!.StoredException,
96-
EffectId = c.StoredEffect!.EffectId
97-
})
66+
var insertsAndUpdates = changes
67+
.Where(c => c.Operation == CrudOperation.Insert || c.Operation == CrudOperation.Update)
9868
.ToList();
9969

100-
var parameterValues = upserts
101-
.Select((_, i) =>
102-
$"(@{paramPrefix}Id{i}, @{paramPrefix}Position{i}, @{paramPrefix}EffectId{i}, @{paramPrefix}Status{i}, @{paramPrefix}Result{i}, @{paramPrefix}Exception{i})")
103-
.StringJoin(", ");
104-
105-
var setSql = $@"
106-
MERGE INTO {tablePrefix}_Effects
107-
USING (VALUES {parameterValues})
108-
AS source (Id, Position, EffectId, Status, Result, Exception)
109-
ON {tablePrefix}_Effects.Id = source.Id AND {tablePrefix}_Effects.Position = source.Position
110-
WHEN MATCHED THEN
111-
UPDATE SET Status = source.Status, Result = source.Result, Exception = source.Exception
112-
WHEN NOT MATCHED THEN
113-
INSERT (Id, Position, EffectId, Status, Result, Exception)
114-
VALUES (source.Id, source.Position, source.EffectId, source.Status, source.Result, source.Exception);";
115-
116-
var parameters = new List<ParameterValueAndName>();
117-
118-
for (var i = 0; i < upserts.Count; i++)
70+
for (var i = 0; i < insertsAndUpdates.Count; i++)
11971
{
120-
var upsert = upserts[i];
121-
parameters.Add(new ParameterValueAndName($"@{paramPrefix}Id{i}", upsert.Id));
122-
parameters.Add(new ParameterValueAndName($"@{paramPrefix}Position{i}", upsert.Position));
123-
parameters.Add(new ParameterValueAndName($"@{paramPrefix}EffectId{i}", upsert.EffectId.Serialize().Value));
124-
parameters.Add(new ParameterValueAndName($"@{paramPrefix}Status{i}", upsert.WorkStatus));
125-
parameters.Add(new ParameterValueAndName($"@{paramPrefix}Result{i}",
126-
upsert.Result ?? (object)SqlBinary.Null));
127-
parameters.Add(new ParameterValueAndName($"@{paramPrefix}Exception{i}",
128-
JsonHelper.ToJson(upsert.Exception) ?? (object)DBNull.Value));
72+
var (_, effectId, _, storedEffect) = insertsAndUpdates[i];
73+
var position = session.Add(effectId.Serialize());
74+
75+
var sql = $@"
76+
INSERT INTO {tablePrefix}_Effects
77+
(Id, Position, EffectId, Status, Result, Exception)
78+
VALUES
79+
(@{paramPrefix}Id{i}, @{paramPrefix}Position{i}, @{paramPrefix}EffectId{i}, @{paramPrefix}Status{i}, @{paramPrefix}Result{i}, @{paramPrefix}Exception{i});";
80+
81+
var command = StoreCommand.Create(sql);
82+
command.AddParameter($"@{paramPrefix}Id{i}", storedId.AsGuid);
83+
command.AddParameter($"@{paramPrefix}Position{i}", position);
84+
command.AddParameter($"@{paramPrefix}EffectId{i}", storedEffect!.EffectId.Serialize().Value);
85+
command.AddParameter($"@{paramPrefix}Status{i}", (int)storedEffect.WorkStatus);
86+
command.AddParameter($"@{paramPrefix}Result{i}", storedEffect.Result ?? (object)SqlBinary.Null);
87+
command.AddParameter($"@{paramPrefix}Exception{i}", JsonHelper.ToJson(storedEffect.StoredException) ?? (object)DBNull.Value);
88+
89+
storeCommands.Add(command);
12990
}
130-
131-
if (upserts.Any())
132-
storeCommands.Add(StoreCommand.Create(setSql, parameters));
133-
}
134-
135-
//DELETE
136-
{
137-
var removes = changes
138-
.Where(c => c.Operation == CrudOperation.Delete)
139-
.Select(c => new { Id = c.StoredId.AsGuid, Position = c.EffectId.ToStoredEffectId().Value.ToLong() })
140-
.GroupBy(c => c.Id)
141-
.ToList();
142-
var predicates = removes
143-
.Select(r =>
144-
$"(Id = '{r.Key}' AND Position IN ({r.Select(t => $"{t.Position}").StringJoin(", ")}))")
145-
.StringJoin($" OR {Environment.NewLine}");
146-
147-
var removeSql = @$"
148-
DELETE FROM {tablePrefix}_effects
149-
WHERE {predicates}";
150-
if (removes.Any())
151-
storeCommands.Add(StoreCommand.Create(removeSql));
152-
15391
}
15492

155-
return StoreCommand.Merge(storeCommands)!;
93+
return storeCommands.Count == 0
94+
? null
95+
: StoreCommand.Merge(storeCommands)!;
15696
}
15797

15898
private string? _getEffectsSql;
@@ -173,20 +113,23 @@ public StoreCommand GetEffects(StoredId storedId, string paramPrefix = "")
173113
}
174114

175115
public async Task<IReadOnlyList<StoredEffect>> ReadEffects(SqlDataReader reader)
116+
=> (await ReadEffectsWithPositions(reader)).Select(e => e.Effect).ToList();
117+
118+
public async Task<IReadOnlyList<StoredEffectWithPosition>> ReadEffectsWithPositions(SqlDataReader reader)
176119
{
177-
var storedEffects = new List<StoredEffect>();
120+
var storedEffects = new List<StoredEffectWithPosition>();
178121
while (reader.HasRows && await reader.ReadAsync())
179122
{
180123
var position = reader.GetInt64(0);
181124
var effectId = reader.GetString(1);
182-
183-
var status = (WorkStatus) reader.GetInt32(2);
184-
var result = reader.IsDBNull(3) ? null : (byte[]) reader.GetValue(3);
125+
126+
var status = (WorkStatus)reader.GetInt32(2);
127+
var result = reader.IsDBNull(3) ? null : (byte[])reader.GetValue(3);
185128
var exception = reader.IsDBNull(4) ? null : reader.GetString(4);
186129

187130
var storedException = exception == null ? null : JsonSerializer.Deserialize<StoredException>(exception);
188131
var storedEffect = new StoredEffect(EffectId.Deserialize(effectId), status, result, storedException);
189-
storedEffects.Add(storedEffect);
132+
storedEffects.Add(new StoredEffectWithPosition(storedEffect, position));
190133
}
191134

192135
return storedEffects;
@@ -203,26 +146,26 @@ public StoreCommand GetEffects(IEnumerable<StoredId> storedIds)
203146
return command;
204147
}
205148

206-
public async Task<Dictionary<StoredId, List<StoredEffect>>> ReadEffectsForMultipleStoredIds(SqlDataReader reader, IEnumerable<StoredId> storedIds)
149+
public async Task<Dictionary<StoredId, List<StoredEffectWithPosition>>> ReadEffectsForMultipleStoredIds(SqlDataReader reader, IEnumerable<StoredId> storedIds)
207150
{
208-
var storedEffects = new Dictionary<StoredId, List<StoredEffect>>();
151+
var storedEffects = new Dictionary<StoredId, List<StoredEffectWithPosition>>();
209152
foreach (var storedId in storedIds)
210-
storedEffects[storedId] = new List<StoredEffect>();
211-
153+
storedEffects[storedId] = new List<StoredEffectWithPosition>();
154+
212155
while (reader.HasRows && await reader.ReadAsync())
213156
{
214157
var storedId = reader.GetGuid(0).ToStoredId();
215158
var position = reader.GetInt64(1);
216159
var effectId = reader.GetString(2);
217-
218-
var status = (WorkStatus) reader.GetInt32(3);
219-
var result = reader.IsDBNull(4) ? null : (byte[]) reader.GetValue(4);
160+
161+
var status = (WorkStatus)reader.GetInt32(3);
162+
var result = reader.IsDBNull(4) ? null : (byte[])reader.GetValue(4);
220163
var exception = reader.IsDBNull(5) ? null : reader.GetString(5);
221164

222165
var storedException = exception == null ? null : JsonSerializer.Deserialize<StoredException>(exception);
223166
var storedEffect = new StoredEffect(EffectId.Deserialize(effectId), status, result, storedException);
224-
225-
storedEffects[storedId].Add(storedEffect);
167+
168+
storedEffects[storedId].Add(new StoredEffectWithPosition(storedEffect, position));
226169
}
227170

228171
return storedEffects;

Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerEffectsStore.cs

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -52,19 +52,26 @@ public async Task SetEffectResults(StoredId storedId, IReadOnlyList<StoredEffect
5252
if (changes.Count == 0)
5353
return;
5454

55+
var positionsSession = session as PositionsStorageSession ?? await CreateSession(storedId);
5556
await using var conn = await CreateConnection();
5657
await using var command = sqlGenerator
57-
.UpdateEffects(changes, paramPrefix: "")
58-
.ToSqlCommand(conn);
58+
.UpdateEffects(storedId, changes, positionsSession, paramPrefix: "")
59+
?.ToSqlCommand(conn);
5960

60-
await command.ExecuteNonQueryAsync();
61+
if (command != null)
62+
await command.ExecuteNonQueryAsync();
6163
}
6264

6365
public async Task<Dictionary<StoredId, List<StoredEffect>>> GetEffectResults(IEnumerable<StoredId> storedIds)
66+
=> (await GetEffectResultsWithPositions(storedIds))
67+
.ToDictionary(kv => kv.Key, kv => kv.Value.Select(s => s.Effect).ToList());
68+
69+
private async Task<Dictionary<StoredId, List<StoredEffectWithPosition>>> GetEffectResultsWithPositions(IEnumerable<StoredId> storedIds)
6470
{
71+
storedIds = storedIds.ToList();
6572
await using var conn = await CreateConnection();
6673
await using var command = sqlGenerator.GetEffects(storedIds).ToSqlCommand(conn);
67-
74+
6875
await using var reader = await command.ExecuteReaderAsync();
6976
var effects = await sqlGenerator.ReadEffectsForMultipleStoredIds(reader, storedIds);
7077
return effects;
@@ -90,4 +97,26 @@ private async Task<SqlConnection> CreateConnection()
9097
await connection.OpenAsync();
9198
return connection;
9299
}
100+
101+
private async Task<PositionsStorageSession> CreateSession(StoredId storedId)
102+
=> await CreateSessions([storedId]).SelectAsync(d => d[storedId]);
103+
private async Task<Dictionary<StoredId, PositionsStorageSession>> CreateSessions(IEnumerable<StoredId> storedIds)
104+
=> CreateSessions(await GetEffectResultsWithPositions(storedIds));
105+
106+
private Dictionary<StoredId, PositionsStorageSession> CreateSessions(Dictionary<StoredId, List<StoredEffectWithPosition>> effects)
107+
{
108+
var dictionary = new Dictionary<StoredId, PositionsStorageSession>();
109+
foreach (var storedId in effects.Keys)
110+
{
111+
var session = new PositionsStorageSession();
112+
dictionary[storedId] = session;
113+
foreach (var (effect, position) in effects[storedId].OrderBy(e => e.Position))
114+
{
115+
session.MaxPosition = position;
116+
session.Positions[effect.EffectId.Serialize()] = position;
117+
}
118+
}
119+
120+
return dictionary;
121+
}
93122
}

Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerFunctionStore.cs

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -173,24 +173,26 @@ public async Task TruncateTables()
173173
storeCommand = storeCommand.Merge(messagesCommand);
174174
}
175175

176+
var session = new PositionsStorageSession();
176177
if (effects?.Any() ?? false)
177178
{
178179
var effectsCommand = _sqlGenerator.UpdateEffects(
179-
effects.Select(e => new StoredEffectChange(storedId, e.EffectId, CrudOperation.Insert, e)).ToList(),
180+
storedId,
181+
changes: effects.Select(e => new StoredEffectChange(storedId, e.EffectId, CrudOperation.Insert, e)).ToList(),
182+
session,
180183
paramPrefix: "Effect"
181184
);
182185
storeCommand = storeCommand.Merge(effectsCommand);
183186
}
184-
187+
185188
await using var command = storeCommand.ToSqlCommand(conn);
186189
await command.ExecuteNonQueryAsync();
190+
return session;
187191
}
188192
catch (SqlException sqlException) when (sqlException.Number == SqlError.UNIQUENESS_VIOLATION)
189193
{
190194
return null;
191195
}
192-
193-
return new EmptyStorageSession();
194196
}
195197

196198
private string? _bulkScheduleFunctionsSql;
@@ -256,19 +258,27 @@ WHEN NOT MATCHED THEN
256258
await using var command = StoreCommand
257259
.Merge(restartCommand, effectsCommand, messagesCommand)
258260
.ToSqlCommand(conn);
259-
261+
260262
await using var reader = await command.ExecuteReaderAsync();
261263
var sf = _sqlGenerator.ReadToStoredFlow(storedId, reader);
262264
if (sf?.OwnerId != replicaId)
263265
return null;
264-
266+
265267
await reader.NextResultAsync();
266-
var effects = await _sqlGenerator.ReadEffects(reader);
268+
var effectsWithPositions = await _sqlGenerator.ReadEffectsWithPositions(reader);
269+
var effects = effectsWithPositions.Select(e => e.Effect).ToList();
267270

268271
await reader.NextResultAsync();
269272
var messages = await _sqlGenerator.ReadMessages(reader);
270273

271-
return new StoredFlowWithEffectsAndMessages(sf, effects, messages, new EmptyStorageSession());
274+
var session = new PositionsStorageSession();
275+
foreach (var (effect, position) in effectsWithPositions.OrderBy(e => e.Position))
276+
{
277+
session.MaxPosition = position;
278+
session.Positions[effect.EffectId.Serialize()] = position;
279+
}
280+
281+
return new StoredFlowWithEffectsAndMessages(sf, effects, messages, session);
272282
}
273283

274284
private string? _getExpiredFunctionsSql;

0 commit comments

Comments
 (0)