Skip to content

Commit e34b439

Browse files
committed
Add ResetInterrupted method to function stores
Add ResetInterrupted(IReadOnlyList<StoredId>) to IFunctionStore and all implementations to allow clearing the interrupted flag for a set of stored IDs.
1 parent f22b63c commit e34b439

10 files changed

Lines changed: 87 additions & 7 deletions

File tree

Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/WatchDogsTests/CrashableFunctionStore.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,8 @@ public Task<bool> Interrupt(StoredId storedId)
187187

188188
public Task Interrupt(IReadOnlyList<StoredId> storedIds) => _inner.Interrupt(storedIds);
189189

190+
public Task ResetInterrupted(IReadOnlyList<StoredId> storedIds) => _inner.ResetInterrupted(storedIds);
191+
190192
public Task<bool?> Interrupted(StoredId storedId)
191193
=> _crashed
192194
? Task.FromException<bool?>(new TimeoutException())

Core/Cleipnir.ResilientFunctions/Storage/IFunctionStore.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,8 @@ Task<bool> SuspendFunction(
104104

105105
Task<bool> Interrupt(StoredId storedId);
106106
Task Interrupt(IReadOnlyList<StoredId> storedIds);
107-
Task<bool?> Interrupted(StoredId storedId);
107+
Task ResetInterrupted(IReadOnlyList<StoredId> storedIds);
108+
Task<bool?> Interrupted(StoredId storedId);
108109

109110
Task<Status?> GetFunctionStatus(StoredId storedId);
110111
Task<IReadOnlyList<StatusAndId>> GetFunctionsStatus(IEnumerable<StoredId> storedIds);

Core/Cleipnir.ResilientFunctions/Storage/InMemoryFunctionStore.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -450,6 +450,18 @@ public async Task Interrupt(IReadOnlyList<StoredId> storedIds)
450450
await Interrupt(storedId);
451451
}
452452

453+
public Task ResetInterrupted(IReadOnlyList<StoredId> storedIds)
454+
{
455+
lock (_sync)
456+
{
457+
foreach (var storedId in storedIds)
458+
if (_states.TryGetValue(storedId, out var state))
459+
state.Interrupted = false;
460+
}
461+
462+
return Task.CompletedTask;
463+
}
464+
453465
public Task<bool?> Interrupted(StoredId storedId)
454466
{
455467
lock (_sync)

Samples/Sample.ConsoleApp/Utils/CrashableFunctionStore.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,11 @@ public Task Interrupt(IReadOnlyList<StoredId> storedIds)
170170
? Task.FromException(new TimeoutException())
171171
: _inner.Interrupt(storedIds);
172172

173+
public Task ResetInterrupted(IReadOnlyList<StoredId> storedIds)
174+
=> _crashed
175+
? Task.FromException(new TimeoutException())
176+
: _inner.ResetInterrupted(storedIds);
177+
173178
public Task<bool?> Interrupted(StoredId storedId)
174179
=> _crashed
175180
? Task.FromException<bool?>(new TimeoutException())

Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbFunctionStore.cs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -670,12 +670,22 @@ public async Task Interrupt(IReadOnlyList<StoredId> storedIds)
670670
{
671671
if (storedIds.Count == 0)
672672
return;
673-
673+
674674
await using var conn = await CreateOpenConnection(_connectionString);
675675
await using var cmd = _sqlGenerator.Interrupt(storedIds).ToSqlCommand(conn);
676676
await cmd.ExecuteNonQueryAsync();
677677
}
678678

679+
public async Task ResetInterrupted(IReadOnlyList<StoredId> storedIds)
680+
{
681+
if (storedIds.Count == 0)
682+
return;
683+
684+
await using var conn = await CreateOpenConnection(_connectionString);
685+
await using var cmd = _sqlGenerator.ResetInterrupted(storedIds).ToSqlCommand(conn);
686+
await cmd.ExecuteNonQueryAsync();
687+
}
688+
679689
private string? _setParametersSql;
680690
public async Task<bool> SetParameters(
681691
StoredId storedId,

Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/SqlGenerator.cs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,17 @@ ELSE expires
3333

3434
return StoreCommand.Create(sql);
3535
}
36-
36+
37+
public StoreCommand ResetInterrupted(IEnumerable<StoredId> storedIds)
38+
{
39+
var sql = @$"
40+
UPDATE {tablePrefix}
41+
SET interrupted = FALSE
42+
WHERE Id IN ({storedIds.Select(id => $"'{id.AsGuid:N}'").StringJoin(", ")});";
43+
44+
return StoreCommand.Create(sql);
45+
}
46+
3747
private string? _getEffectResultsSql;
3848
public StoreCommand GetEffects(StoredId storedId)
3949
{

Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlFunctionStore.cs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -668,12 +668,22 @@ public async Task Interrupt(IReadOnlyList<StoredId> storedIds)
668668
{
669669
if (storedIds.Count == 0)
670670
return;
671-
671+
672672
await using var conn = await CreateConnection();
673673
await using var command = _sqlGenerator.Interrupt(storedIds).ToNpgsqlCommand(conn);
674674
await command.ExecuteNonQueryAsync();
675675
}
676676

677+
public async Task ResetInterrupted(IReadOnlyList<StoredId> storedIds)
678+
{
679+
if (storedIds.Count == 0)
680+
return;
681+
682+
await using var conn = await CreateConnection();
683+
await using var command = _sqlGenerator.ResetInterrupted(storedIds).ToNpgsqlCommand(conn);
684+
await command.ExecuteNonQueryAsync();
685+
}
686+
677687
private string? _interruptedSql;
678688
public async Task<bool?> Interrupted(StoredId storedId)
679689
{

Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/SqlGenerator.cs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,17 @@ ELSE expires
3737

3838
return StoreCommand.Create(sql, values: [ storedIds.Select(id => id.AsGuid).ToArray() ]);
3939
}
40-
40+
41+
public StoreCommand ResetInterrupted(IEnumerable<StoredId> storedIds)
42+
{
43+
var sql = @$"
44+
UPDATE {tablePrefix}
45+
SET interrupted = FALSE
46+
WHERE Id = ANY($1)";
47+
48+
return StoreCommand.Create(sql, values: [ storedIds.Select(id => id.AsGuid).ToArray() ]);
49+
}
50+
4151
private string? _getEffectResultsSql;
4252
public StoreCommand GetEffects(StoredId storedId)
4353
{

Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlGenerator.cs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,17 @@ ELSE Expires
3838

3939
return StoreCommand.Create(sql);
4040
}
41-
41+
42+
public StoreCommand ResetInterrupted(IEnumerable<StoredId> storedIds)
43+
{
44+
var sql = @$"
45+
UPDATE {tablePrefix}
46+
SET Interrupted = 0
47+
WHERE Id IN ({storedIds.Select(id => $"'{id.AsGuid}'").StringJoin(", ")});";
48+
49+
return StoreCommand.Create(sql);
50+
}
51+
4252
public StoreCommand InsertEffects(StoredId storedId, IReadOnlyList<StoredEffectChange> changes, SnapshotStorageSession session, string paramPrefix)
4353
{
4454
foreach (var change in changes)

Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerFunctionStore.cs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -730,12 +730,22 @@ public async Task Interrupt(IReadOnlyList<StoredId> storedIds)
730730
{
731731
if (storedIds.Count == 0)
732732
return;
733-
733+
734734
await using var conn = await _connFunc();
735735
await using var cmd = _sqlGenerator.Interrupt(storedIds).ToSqlCommand(conn);
736736
await cmd.ExecuteNonQueryAsync();
737737
}
738738

739+
public async Task ResetInterrupted(IReadOnlyList<StoredId> storedIds)
740+
{
741+
if (storedIds.Count == 0)
742+
return;
743+
744+
await using var conn = await _connFunc();
745+
await using var cmd = _sqlGenerator.ResetInterrupted(storedIds).ToSqlCommand(conn);
746+
await cmd.ExecuteNonQueryAsync();
747+
}
748+
739749
private string? _setParametersSql;
740750
public async Task<bool> SetParameters(
741751
StoredId storedId,

0 commit comments

Comments
 (0)