Skip to content

Commit fd640a3

Browse files
committed
Fail Suspend/Postpone when function is currently interrupted
Previously, SuspendFunction and PostponeFunction silently absorbed the interrupted flag at the storage layer: they converted the requested state into Postponed(expires=0), cleared the flag, and returned true. The runtime never saw that an interrupt had been observed.

Now both methods return false when the row has interrupted=true, letting the caller treat the interrupt as a real signal:

- SuspendFunction: gains `AND interrupted = FALSE` in its WHERE clause.
- PostponeFunction: same, but gated on a new failIfInterrupted parameter (default: true), so that InvocationHelper.Reschedule can still pass false to consume the interrupt and queue the function for immediate re-pickup.
- InvocationHelper.PersistResult: the Postpone case now returns Reschedule on false (was: Success on both branches, swallowing failure).

Replaces 4 obsolete tests that pinned the old silent-convert behaviour with 3 new tests covering the new contract across all 4 stores.
1 parent 2053d22 commit fd640a3

16 files changed

Lines changed: 252 additions & 209 deletions

File tree

Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/StoreTests.cs

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -127,12 +127,8 @@ public override Task EpochIsNotIncrementedOnSuspension()
127127
=> EpochIsNotIncrementedOnSuspension(FunctionStoreFactory.Create());
128128

129129
[TestMethod]
130-
public override Task SuspensionDoesNotSucceedOnExpectedMessagesCountMismatchButPostponesFunction()
131-
=> SuspensionDoesNotSucceedOnExpectedMessagesCountMismatchButPostponesFunction(FunctionStoreFactory.Create());
132-
133-
[TestMethod]
134-
public override Task FunctionIsStillExecutingOnSuspensionAndInterruptCountMismatch()
135-
=> FunctionIsStillExecutingOnSuspensionAndInterruptCountMismatch(FunctionStoreFactory.Create());
130+
public override Task SuspendingInterruptedFunctionReturnsFalse()
131+
=> SuspendingInterruptedFunctionReturnsFalse(FunctionStoreFactory.Create());
136132

137133
[TestMethod]
138134
public override Task InterruptCountCanBeIncrementedForExecutingFunction()
@@ -187,16 +183,16 @@ public override Task MultipleFunctionsStatusCanBeFetched()
187183
=> MultipleFunctionsStatusCanBeFetched(FunctionStoreFactory.Create());
188184

189185
[TestMethod]
190-
public override Task InterruptedFunctionIsNotPostponedToZeroWhenInterrupted()
191-
=> InterruptedFunctionIsNotPostponedToZeroWhenInterrupted(FunctionStoreFactory.Create());
186+
public override Task PostponingInterruptedFunctionReturnsFalse()
187+
=> PostponingInterruptedFunctionReturnsFalse(FunctionStoreFactory.Create());
192188

193189
[TestMethod]
194190
public override Task InterruptNothingWorks()
195191
=> InterruptNothingWorks(FunctionStoreFactory.Create());
196192

197193
[TestMethod]
198-
public override Task InterruptedFunctionIsPostponedWhenIgnoringInterruptedFunction()
199-
=> InterruptedFunctionIsPostponedWhenIgnoringInterruptedFunction(FunctionStoreFactory.Create());
194+
public override Task PostponingInterruptedFunctionWithFailIfInterruptedFalseSucceedsAndClearsFlag()
195+
=> PostponingInterruptedFunctionWithFailIfInterruptedFalseSucceedsAndClearsFlag(FunctionStoreFactory.Create());
200196

201197
[TestMethod]
202198
public override Task FunctionCanBeCreatedWithMessagesAndEffects()

Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/StoreTests.cs

Lines changed: 27 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -927,14 +927,14 @@ await store.SuspendFunction(
927927
storedFunction.ShouldNotBeNull();
928928
}
929929

930-
public abstract Task SuspensionDoesNotSucceedOnExpectedMessagesCountMismatchButPostponesFunction();
931-
protected async Task SuspensionDoesNotSucceedOnExpectedMessagesCountMismatchButPostponesFunction(Task<IFunctionStore> storeTask)
930+
public abstract Task SuspendingInterruptedFunctionReturnsFalse();
931+
protected async Task SuspendingInterruptedFunctionReturnsFalse(Task<IFunctionStore> storeTask)
932932
{
933933
var functionId = TestStoredId.Create();
934-
934+
935935
var store = await storeTask;
936936
await store.CreateFunction(
937-
functionId,
937+
functionId,
938938
"humanInstanceId",
939939
param: Test.SimpleStoredParameter,
940940
leaseExpiration: DateTime.UtcNow.Ticks,
@@ -944,67 +944,20 @@ await store.CreateFunction(
944944
owner: ReplicaId.Empty
945945
).ShouldNotBeNullAsync();
946946

947-
await store.MessageStore.AppendMessage(
948-
functionId,
949-
new StoredMessage("some message".ToJson().ToUtf8Bytes(), typeof(string).SimpleQualifiedName().ToUtf8Bytes(), Position: 0)
950-
);
947+
await store.Interrupt(functionId).ShouldBeTrueAsync();
951948

952-
await store.Interrupt(functionId);
953-
954949
await store.SuspendFunction(
955950
functionId,
956951
timestamp: DateTime.UtcNow.Ticks,
957952
expectedReplica: ReplicaId.Empty,
958953
effects: null,
959954
messages: null,
960955
storageSession: null
961-
).ShouldBeTrueAsync();
962-
963-
var storedFunction = await store.GetFunction(functionId);
964-
storedFunction.ShouldNotBeNull();
965-
storedFunction.Status.ShouldBe(Status.Postponed);
966-
storedFunction.Expires.ShouldBe(0);
967-
}
968-
969-
public abstract Task FunctionIsStillExecutingOnSuspensionAndInterruptCountMismatch();
970-
protected async Task FunctionIsStillExecutingOnSuspensionAndInterruptCountMismatch(Task<IFunctionStore> storeTask)
971-
{
972-
var functionId = TestStoredId.Create();
973-
974-
var store = await storeTask;
975-
await store.CreateFunction(
976-
functionId,
977-
"humanInstanceId",
978-
param: Test.SimpleStoredParameter,
979-
leaseExpiration: DateTime.UtcNow.Ticks,
980-
postponeUntil: null,
981-
timestamp: DateTime.UtcNow.Ticks,
982-
parent: null,
983-
owner: ReplicaId.Empty
984-
).ShouldNotBeNullAsync();
985-
986-
await store.MessageStore.AppendMessage(
987-
functionId,
988-
new StoredMessage("hello world".ToJson().ToUtf8Bytes(), typeof(string).SimpleQualifiedName().ToUtf8Bytes(), Position: 0)
989-
);
956+
).ShouldBeFalseAsync();
990957

991-
await store.Interrupt(functionId);
992-
993-
var success = await store.SuspendFunction(
994-
functionId,
995-
timestamp: DateTime.UtcNow.Ticks,
996-
expectedReplica: ReplicaId.Empty,
997-
effects: null,
998-
messages: null,
999-
storageSession: null
1000-
);
1001-
1002-
success.ShouldBeTrue();
1003-
1004-
var storedFunction = await store.GetFunction(functionId);
1005-
storedFunction.ShouldNotBeNull();
1006-
storedFunction.Status.ShouldBe(Status.Postponed);
1007-
storedFunction.Expires.ShouldBe(0);
958+
var storedFunction = await store.GetFunction(functionId).ShouldNotBeNullAsync();
959+
storedFunction.Status.ShouldBe(Status.Executing);
960+
(await store.GetInterruptedFunctions()).Any(id => id == functionId).ShouldBeTrue();
1008961
}
1009962

1010963
public abstract Task InterruptCountCanBeIncrementedForExecutingFunction();
@@ -1411,8 +1364,8 @@ await store.SucceedFunction(
14111364
statusAndEpoch2.Status.ShouldBe(Status.Succeeded);
14121365
}
14131366

1414-
public abstract Task InterruptedFunctionIsNotPostponedToZeroWhenInterrupted();
1415-
protected async Task InterruptedFunctionIsNotPostponedToZeroWhenInterrupted(Task<IFunctionStore> storeTask)
1367+
public abstract Task PostponingInterruptedFunctionReturnsFalse();
1368+
protected async Task PostponingInterruptedFunctionReturnsFalse(Task<IFunctionStore> storeTask)
14161369
{
14171370
var storedId = TestStoredId.Create();
14181371
var store = await storeTask;
@@ -1430,20 +1383,19 @@ await store.CreateFunction(
14301383

14311384
await store.Interrupt([storedId]);
14321385

1433-
var success = await store.PostponeFunction(
1386+
await store.PostponeFunction(
14341387
storedId,
1435-
postponeUntil: 0,
1436-
timestamp: 100,
1388+
postponeUntil: DateTime.UtcNow.Ticks + 100_000,
1389+
timestamp: DateTime.UtcNow.Ticks,
14371390
expectedReplica: ReplicaId.Empty,
14381391
effects: null,
14391392
messages: null,
14401393
storageSession: null
1441-
);
1442-
success.ShouldBeTrue();
1394+
).ShouldBeFalseAsync();
14431395

14441396
var sf = await store.GetFunction(storedId).ShouldNotBeNullAsync();
1445-
sf.Status.ShouldBe(Status.Postponed);
1446-
sf.Expires.ShouldBe(0);
1397+
sf.Status.ShouldBe(Status.Executing);
1398+
(await store.GetInterruptedFunctions()).Any(id => id == storedId).ShouldBeTrue();
14471399
}
14481400

14491401
public abstract Task InterruptNothingWorks();
@@ -1452,9 +1404,9 @@ protected async Task InterruptNothingWorks(Task<IFunctionStore> storeTask)
14521404
var store = await storeTask;
14531405
await store.Interrupt(storedIds: []);
14541406
}
1455-
1456-
public abstract Task InterruptedFunctionIsPostponedWhenIgnoringInterruptedFunction();
1457-
protected async Task InterruptedFunctionIsPostponedWhenIgnoringInterruptedFunction(Task<IFunctionStore> storeTask)
1407+
1408+
public abstract Task PostponingInterruptedFunctionWithFailIfInterruptedFalseSucceedsAndClearsFlag();
1409+
protected async Task PostponingInterruptedFunctionWithFailIfInterruptedFalseSucceedsAndClearsFlag(Task<IFunctionStore> storeTask)
14581410
{
14591411
var storedId = TestStoredId.Create();
14601412
var store = await storeTask;
@@ -1472,19 +1424,21 @@ await store.CreateFunction(
14721424

14731425
await store.Interrupt([storedId]);
14741426

1475-
var success = await store.PostponeFunction(
1427+
await store.PostponeFunction(
14761428
storedId,
14771429
postponeUntil: 0,
1478-
timestamp: 0,
1430+
timestamp: DateTime.UtcNow.Ticks,
14791431
expectedReplica: ReplicaId.Empty,
14801432
effects: null,
14811433
messages: null,
1482-
storageSession: null
1483-
);
1484-
success.ShouldBeTrue();
1434+
storageSession: null,
1435+
failIfInterrupted: false
1436+
).ShouldBeTrueAsync();
14851437

14861438
var sf = await store.GetFunction(storedId).ShouldNotBeNullAsync();
14871439
sf.Status.ShouldBe(Status.Postponed);
1440+
sf.Expires.ShouldBe(0);
1441+
(await store.GetInterruptedFunctions()).Any(id => id == storedId).ShouldBeFalse();
14881442
}
14891443

14901444
public abstract Task FunctionCanBeCreatedWithMessagesAndEffects();

Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/WatchDogsTests/CrashableFunctionStore.cs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -131,17 +131,18 @@ public async Task<bool> PostponeFunction(
131131
ReplicaId expectedReplica,
132132
IReadOnlyList<StoredEffect>? effects,
133133
IReadOnlyList<StoredMessage>? messages,
134-
IStorageSession? storageSession
134+
IStorageSession? storageSession,
135+
bool failIfInterrupted = true
135136
)
136137
{
137138
if (_crashed)
138139
throw new TimeoutException();
139140

140-
var result = await _inner.PostponeFunction(storedId, postponeUntil, timestamp, expectedReplica, effects, messages, storageSession);
141+
var result = await _inner.PostponeFunction(storedId, postponeUntil, timestamp, expectedReplica, effects, messages, storageSession, failIfInterrupted);
141142
AfterPostponeFunctionFlag.Raise();
142143

143144
return result;
144-
}
145+
}
145146

146147
public Task<bool> FailFunction(
147148
StoredId storedId,

Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/InvocationHelper.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ public async Task<PersistResultOutcome> PersistResult(StoredId storedId, Result<
138138
effects: null,
139139
messages: null,
140140
storageSession
141-
) ? PersistResultOutcome.Success : PersistResultOutcome.Success;
141+
) ? PersistResultOutcome.Success : PersistResultOutcome.Reschedule;
142142
case Outcome.Fail:
143143
return await _functionStore.FailFunction(
144144
storedId,
@@ -508,7 +508,8 @@ public async Task<bool> Reschedule(StoredId id, TParam param)
508508
_replicaId,
509509
effects: null,
510510
messages: null,
511-
storageSession: null
511+
storageSession: null,
512+
failIfInterrupted: false
512513
);
513514
}
514515
}

Core/Cleipnir.ResilientFunctions/Storage/IFunctionStore.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,8 @@ Task<bool> PostponeFunction(
7373
ReplicaId expectedReplica,
7474
IReadOnlyList<StoredEffect>? effects,
7575
IReadOnlyList<StoredMessage>? messages,
76-
IStorageSession? storageSession
76+
IStorageSession? storageSession,
77+
bool failIfInterrupted = true
7778
);
7879

7980
Task<bool> FailFunction(

Core/Cleipnir.ResilientFunctions/Storage/InMemoryFunctionStore.cs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -298,17 +298,19 @@ public Task<bool> PostponeFunction(
298298
ReplicaId? expectedReplica,
299299
IReadOnlyList<StoredEffect>? effects,
300300
IReadOnlyList<StoredMessage>? messages,
301-
IStorageSession? storageSession)
301+
IStorageSession? storageSession,
302+
bool failIfInterrupted = true)
302303
{
303304
lock (_sync)
304305
{
305306
if (!_states.ContainsKey(storedId)) return false.ToTask();
306307

307308
var state = _states[storedId];
308309
if (state.Owner != expectedReplica) return false.ToTask();
310+
if (failIfInterrupted && state.Interrupted) return false.ToTask();
309311

310312
state.Status = Status.Postponed;
311-
state.Expires = state.Interrupted ? 0 : postponeUntil;
313+
state.Expires = postponeUntil;
312314
state.Interrupted = false;
313315
state.Timestamp = timestamp;
314316
state.Owner = null;
@@ -358,12 +360,13 @@ public Task<bool> SuspendFunction(
358360
var state = _states[storedId];
359361
if (state.Owner != expectedReplica)
360362
return false.ToTask();
363+
if (state.Interrupted)
364+
return false.ToTask();
361365

362-
state.Status = state.Interrupted ? Status.Postponed : Status.Suspended;
366+
state.Status = Status.Suspended;
363367
state.Expires = 0;
364368
state.Timestamp = timestamp;
365369
state.Owner = null;
366-
state.Interrupted = false;
367370

368371
return true.ToTask();
369372
}

Samples/Sample.ConsoleApp/Utils/CrashableFunctionStore.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,10 +118,11 @@ public Task<bool> PostponeFunction(
118118
ReplicaId expectedReplica,
119119
IReadOnlyList<StoredEffect>? effects,
120120
IReadOnlyList<StoredMessage>? messages,
121-
IStorageSession? storageSession
121+
IStorageSession? storageSession,
122+
bool failIfInterrupted = true
122123
) => _crashed
123124
? Task.FromException<bool>(new TimeoutException())
124-
: _inner.PostponeFunction(storedId, postponeUntil, timestamp, expectedReplica, effects, messages, storageSession);
125+
: _inner.PostponeFunction(storedId, postponeUntil, timestamp, expectedReplica, effects, messages, storageSession, failIfInterrupted);
125126

126127
public Task<bool> FailFunction(
127128
StoredId storedId,

Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB.Tests/StoreTests.cs

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -116,12 +116,8 @@ public override Task EpochIsNotIncrementedOnSuspension()
116116
=> EpochIsNotIncrementedOnSuspension(FunctionStoreFactory.Create());
117117

118118
[TestMethod]
119-
public override Task SuspensionDoesNotSucceedOnExpectedMessagesCountMismatchButPostponesFunction()
120-
=> SuspensionDoesNotSucceedOnExpectedMessagesCountMismatchButPostponesFunction(FunctionStoreFactory.Create());
121-
122-
[TestMethod]
123-
public override Task FunctionIsStillExecutingOnSuspensionAndInterruptCountMismatch()
124-
=> FunctionIsStillExecutingOnSuspensionAndInterruptCountMismatch(FunctionStoreFactory.Create());
119+
public override Task SuspendingInterruptedFunctionReturnsFalse()
120+
=> SuspendingInterruptedFunctionReturnsFalse(FunctionStoreFactory.Create());
125121

126122
[TestMethod]
127123
public override Task InterruptCountCanBeIncrementedForExecutingFunction()
@@ -176,16 +172,16 @@ public override Task MultipleFunctionsStatusCanBeFetched()
176172
=> MultipleFunctionsStatusCanBeFetched(FunctionStoreFactory.Create());
177173

178174
[TestMethod]
179-
public override Task InterruptedFunctionIsNotPostponedToZeroWhenInterrupted()
180-
=> InterruptedFunctionIsNotPostponedToZeroWhenInterrupted(FunctionStoreFactory.Create());
181-
175+
public override Task PostponingInterruptedFunctionReturnsFalse()
176+
=> PostponingInterruptedFunctionReturnsFalse(FunctionStoreFactory.Create());
177+
182178
[TestMethod]
183179
public override Task InterruptNothingWorks()
184180
=> InterruptNothingWorks(FunctionStoreFactory.Create());
185181

186182
[TestMethod]
187-
public override Task InterruptedFunctionIsPostponedWhenIgnoringInterruptedFunction()
188-
=> InterruptedFunctionIsPostponedWhenIgnoringInterruptedFunction(FunctionStoreFactory.Create());
183+
public override Task PostponingInterruptedFunctionWithFailIfInterruptedFalseSucceedsAndClearsFlag()
184+
=> PostponingInterruptedFunctionWithFailIfInterruptedFalseSucceedsAndClearsFlag(FunctionStoreFactory.Create());
189185

190186
[TestMethod]
191187
public override Task FunctionCanBeCreatedWithMessagesAndEffects()

Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbFunctionStore.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -509,15 +509,16 @@ public async Task<bool> PostponeFunction(
509509
ReplicaId expectedReplica,
510510
IReadOnlyList<StoredEffect>? effects,
511511
IReadOnlyList<StoredMessage>? messages,
512-
IStorageSession? storageSession)
512+
IStorageSession? storageSession,
513+
bool failIfInterrupted = true)
513514
{
514515
byte[]? effectsBytes = null;
515516
if (storageSession is SnapshotStorageSession session && session.Effects.Count > 0)
516517
effectsBytes = session.Serialize();
517518

518519
await using var conn = await CreateOpenConnection(_connectionString);
519520
await using var command = _sqlGenerator
520-
.PostponeFunction(storedId, postponeUntil, timestamp, expectedReplica, effectsBytes)
521+
.PostponeFunction(storedId, postponeUntil, timestamp, expectedReplica, failIfInterrupted, effectsBytes)
521522
.ToSqlCommand(conn);
522523

523524
var affectedRows = await command.ExecuteNonQueryAsync();

0 commit comments

Comments
 (0)