Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ await store.PostponeFunction(
expectedReplica: ReplicaId.Empty,
effects: null,
messages: null,
failIfInterrupted: true,
storageSession: null
).ShouldBeTrueAsync();

Expand Down Expand Up @@ -154,6 +155,7 @@ await store.PostponeFunction(
expectedReplica: ReplicaId.Empty,
effects: null,
messages: null,
failIfInterrupted: true,
storageSession: null
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ await store.PostponeFunction(
Guid.Empty.ToReplicaId(),
effects: null,
messages: null,
failIfInterrupted: true,
storageSession: null
).ShouldBeTrueAsync();

Expand Down
16 changes: 6 additions & 10 deletions Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/StoreTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -127,12 +127,8 @@ public override Task EpochIsNotIncrementedOnSuspension()
=> EpochIsNotIncrementedOnSuspension(FunctionStoreFactory.Create());

[TestMethod]
public override Task SuspensionDoesNotSucceedOnExpectedMessagesCountMismatchButPostponesFunction()
=> SuspensionDoesNotSucceedOnExpectedMessagesCountMismatchButPostponesFunction(FunctionStoreFactory.Create());

[TestMethod]
public override Task FunctionIsStillExecutingOnSuspensionAndInterruptCountMismatch()
=> FunctionIsStillExecutingOnSuspensionAndInterruptCountMismatch(FunctionStoreFactory.Create());
public override Task SuspendingInterruptedFunctionReturnsFalse()
=> SuspendingInterruptedFunctionReturnsFalse(FunctionStoreFactory.Create());

[TestMethod]
public override Task InterruptCountCanBeIncrementedForExecutingFunction()
Expand Down Expand Up @@ -187,16 +183,16 @@ public override Task MultipleFunctionsStatusCanBeFetched()
=> MultipleFunctionsStatusCanBeFetched(FunctionStoreFactory.Create());

[TestMethod]
public override Task InterruptedFunctionIsNotPostponedToZeroWhenInterrupted()
=> InterruptedFunctionIsNotPostponedToZeroWhenInterrupted(FunctionStoreFactory.Create());
public override Task PostponingInterruptedFunctionReturnsFalse()
=> PostponingInterruptedFunctionReturnsFalse(FunctionStoreFactory.Create());

[TestMethod]
public override Task InterruptNothingWorks()
=> InterruptNothingWorks(FunctionStoreFactory.Create());

[TestMethod]
public override Task InterruptedFunctionIsPostponedWhenIgnoringInterruptedFunction()
=> InterruptedFunctionIsPostponedWhenIgnoringInterruptedFunction(FunctionStoreFactory.Create());
public override Task PostponingInterruptedFunctionWithFailIfInterruptedFalseSucceedsAndClearsFlag()
=> PostponingInterruptedFunctionWithFailIfInterruptedFalseSucceedsAndClearsFlag(FunctionStoreFactory.Create());

[TestMethod]
public override Task FunctionCanBeCreatedWithMessagesAndEffects()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,7 @@ await store.PostponeFunction(
expectedReplica: ReplicaId.Empty,
effects: null,
messages: null,
failIfInterrupted: true,
storageSession: null
).ShouldBeTrueAsync();

Expand Down
104 changes: 32 additions & 72 deletions Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/StoreTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ await store.PostponeFunction(
expectedReplica: ReplicaId.Empty,
effects: null,
messages: null,
failIfInterrupted: true,
storageSession: null
).ShouldBeTrueAsync();

Expand Down Expand Up @@ -281,6 +282,7 @@ await store.PostponeFunction(
expectedReplica: ReplicaId.Empty,
effects: null,
messages: null,
failIfInterrupted: true,
storageSession: null
).ShouldBeTrueAsync();

Expand Down Expand Up @@ -318,6 +320,7 @@ await store.PostponeFunction(
expectedReplica: ReplicaId.NewId(),
effects: null,
messages: null,
failIfInterrupted: true,
storageSession: null
).ShouldBeFalseAsync();

Expand Down Expand Up @@ -859,6 +862,7 @@ await store.PostponeFunction(
expectedReplica: ReplicaId.Empty,
effects: null,
messages: null,
failIfInterrupted: true,
storageSession: null
);

Expand Down Expand Up @@ -927,14 +931,14 @@ await store.SuspendFunction(
storedFunction.ShouldNotBeNull();
}

public abstract Task SuspensionDoesNotSucceedOnExpectedMessagesCountMismatchButPostponesFunction();
protected async Task SuspensionDoesNotSucceedOnExpectedMessagesCountMismatchButPostponesFunction(Task<IFunctionStore> storeTask)
public abstract Task SuspendingInterruptedFunctionReturnsFalse();
protected async Task SuspendingInterruptedFunctionReturnsFalse(Task<IFunctionStore> storeTask)
{
var functionId = TestStoredId.Create();

var store = await storeTask;
await store.CreateFunction(
functionId,
functionId,
"humanInstanceId",
param: Test.SimpleStoredParameter,
leaseExpiration: DateTime.UtcNow.Ticks,
Expand All @@ -944,67 +948,20 @@ await store.CreateFunction(
owner: ReplicaId.Empty
).ShouldNotBeNullAsync();

await store.MessageStore.AppendMessage(
functionId,
new StoredMessage("some message".ToJson().ToUtf8Bytes(), typeof(string).SimpleQualifiedName().ToUtf8Bytes(), Position: 0)
);
await store.Interrupt(functionId).ShouldBeTrueAsync();

await store.Interrupt(functionId);

await store.SuspendFunction(
functionId,
timestamp: DateTime.UtcNow.Ticks,
expectedReplica: ReplicaId.Empty,
effects: null,
messages: null,
storageSession: null
).ShouldBeTrueAsync();

var storedFunction = await store.GetFunction(functionId);
storedFunction.ShouldNotBeNull();
storedFunction.Status.ShouldBe(Status.Postponed);
storedFunction.Expires.ShouldBe(0);
}

public abstract Task FunctionIsStillExecutingOnSuspensionAndInterruptCountMismatch();
protected async Task FunctionIsStillExecutingOnSuspensionAndInterruptCountMismatch(Task<IFunctionStore> storeTask)
{
var functionId = TestStoredId.Create();

var store = await storeTask;
await store.CreateFunction(
functionId,
"humanInstanceId",
param: Test.SimpleStoredParameter,
leaseExpiration: DateTime.UtcNow.Ticks,
postponeUntil: null,
timestamp: DateTime.UtcNow.Ticks,
parent: null,
owner: ReplicaId.Empty
).ShouldNotBeNullAsync();

await store.MessageStore.AppendMessage(
functionId,
new StoredMessage("hello world".ToJson().ToUtf8Bytes(), typeof(string).SimpleQualifiedName().ToUtf8Bytes(), Position: 0)
);
).ShouldBeFalseAsync();

await store.Interrupt(functionId);

var success = await store.SuspendFunction(
functionId,
timestamp: DateTime.UtcNow.Ticks,
expectedReplica: ReplicaId.Empty,
effects: null,
messages: null,
storageSession: null
);

success.ShouldBeTrue();

var storedFunction = await store.GetFunction(functionId);
storedFunction.ShouldNotBeNull();
storedFunction.Status.ShouldBe(Status.Postponed);
storedFunction.Expires.ShouldBe(0);
var storedFunction = await store.GetFunction(functionId).ShouldNotBeNullAsync();
storedFunction.Status.ShouldBe(Status.Executing);
(await store.GetInterruptedFunctions()).Any(id => id == functionId).ShouldBeTrue();
}

public abstract Task InterruptCountCanBeIncrementedForExecutingFunction();
Expand Down Expand Up @@ -1411,8 +1368,8 @@ await store.SucceedFunction(
statusAndEpoch2.Status.ShouldBe(Status.Succeeded);
}

public abstract Task InterruptedFunctionIsNotPostponedToZeroWhenInterrupted();
protected async Task InterruptedFunctionIsNotPostponedToZeroWhenInterrupted(Task<IFunctionStore> storeTask)
public abstract Task PostponingInterruptedFunctionReturnsFalse();
protected async Task PostponingInterruptedFunctionReturnsFalse(Task<IFunctionStore> storeTask)
{
var storedId = TestStoredId.Create();
var store = await storeTask;
Expand All @@ -1430,20 +1387,20 @@ await store.CreateFunction(

await store.Interrupt([storedId]);

var success = await store.PostponeFunction(
await store.PostponeFunction(
storedId,
postponeUntil: 0,
timestamp: 100,
postponeUntil: DateTime.UtcNow.Ticks + 100_000,
timestamp: DateTime.UtcNow.Ticks,
expectedReplica: ReplicaId.Empty,
effects: null,
messages: null,
failIfInterrupted: true,
storageSession: null
);
success.ShouldBeTrue();
).ShouldBeFalseAsync();

var sf = await store.GetFunction(storedId).ShouldNotBeNullAsync();
sf.Status.ShouldBe(Status.Postponed);
sf.Expires.ShouldBe(0);
sf.Status.ShouldBe(Status.Executing);
(await store.GetInterruptedFunctions()).Any(id => id == storedId).ShouldBeTrue();
}

public abstract Task InterruptNothingWorks();
Expand All @@ -1452,9 +1409,9 @@ protected async Task InterruptNothingWorks(Task<IFunctionStore> storeTask)
var store = await storeTask;
await store.Interrupt(storedIds: []);
}
public abstract Task InterruptedFunctionIsPostponedWhenIgnoringInterruptedFunction();
protected async Task InterruptedFunctionIsPostponedWhenIgnoringInterruptedFunction(Task<IFunctionStore> storeTask)

public abstract Task PostponingInterruptedFunctionWithFailIfInterruptedFalseSucceedsAndClearsFlag();
protected async Task PostponingInterruptedFunctionWithFailIfInterruptedFalseSucceedsAndClearsFlag(Task<IFunctionStore> storeTask)
{
var storedId = TestStoredId.Create();
var store = await storeTask;
Expand All @@ -1472,19 +1429,21 @@ await store.CreateFunction(

await store.Interrupt([storedId]);

var success = await store.PostponeFunction(
await store.PostponeFunction(
storedId,
postponeUntil: 0,
timestamp: 0,
timestamp: DateTime.UtcNow.Ticks,
expectedReplica: ReplicaId.Empty,
effects: null,
messages: null,
failIfInterrupted: false,
storageSession: null
);
success.ShouldBeTrue();
).ShouldBeTrueAsync();

var sf = await store.GetFunction(storedId).ShouldNotBeNullAsync();
sf.Status.ShouldBe(Status.Postponed);
sf.Expires.ShouldBe(0);
(await store.GetInterruptedFunctions()).Any(id => id == storedId).ShouldBeFalse();
}

public abstract Task FunctionCanBeCreatedWithMessagesAndEffects();
Expand Down Expand Up @@ -1948,6 +1907,7 @@ await store.PostponeFunction(
expectedReplica: ReplicaId.Empty,
effects: null,
messages: null,
failIfInterrupted: true,
storageSession: null
).ShouldBeTrueAsync();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,17 +131,18 @@ public async Task<bool> PostponeFunction(
ReplicaId expectedReplica,
IReadOnlyList<StoredEffect>? effects,
IReadOnlyList<StoredMessage>? messages,
bool failIfInterrupted,
IStorageSession? storageSession
)
{
if (_crashed)
throw new TimeoutException();

var result = await _inner.PostponeFunction(storedId, postponeUntil, timestamp, expectedReplica, effects, messages, storageSession);
var result = await _inner.PostponeFunction(storedId, postponeUntil, timestamp, expectedReplica, effects, messages, failIfInterrupted, storageSession);
AfterPostponeFunctionFlag.Raise();

return result;
}
}

public Task<bool> FailFunction(
StoredId storedId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,9 @@ public async Task<PersistResultOutcome> PersistResult(StoredId storedId, Result<
_replicaId,
effects: null,
messages: null,
failIfInterrupted: true,
storageSession
) ? PersistResultOutcome.Success : PersistResultOutcome.Success;
) ? PersistResultOutcome.Success : PersistResultOutcome.Reschedule;
case Outcome.Fail:
return await _functionStore.FailFunction(
storedId,
Expand Down Expand Up @@ -508,6 +509,7 @@ public async Task<bool> Reschedule(StoredId id, TParam param)
_replicaId,
effects: null,
messages: null,
failIfInterrupted: false,
storageSession: null
);
}
Expand Down
1 change: 1 addition & 0 deletions Core/Cleipnir.ResilientFunctions/Storage/IFunctionStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ Task<bool> PostponeFunction(
ReplicaId expectedReplica,
IReadOnlyList<StoredEffect>? effects,
IReadOnlyList<StoredMessage>? messages,
bool failIfInterrupted,
IStorageSession? storageSession
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@ public Task<bool> PostponeFunction(
ReplicaId? expectedReplica,
IReadOnlyList<StoredEffect>? effects,
IReadOnlyList<StoredMessage>? messages,
bool failIfInterrupted,
IStorageSession? storageSession)
{
lock (_sync)
Expand All @@ -306,9 +307,10 @@ public Task<bool> PostponeFunction(

var state = _states[storedId];
if (state.Owner != expectedReplica) return false.ToTask();
if (failIfInterrupted && state.Interrupted) return false.ToTask();

state.Status = Status.Postponed;
state.Expires = state.Interrupted ? 0 : postponeUntil;
state.Expires = postponeUntil;
state.Interrupted = false;
state.Timestamp = timestamp;
state.Owner = null;
Expand Down Expand Up @@ -358,12 +360,13 @@ public Task<bool> SuspendFunction(
var state = _states[storedId];
if (state.Owner != expectedReplica)
return false.ToTask();
if (state.Interrupted)
return false.ToTask();

state.Status = state.Interrupted ? Status.Postponed : Status.Suspended;
state.Status = Status.Suspended;
state.Expires = 0;
state.Timestamp = timestamp;
state.Owner = null;
state.Interrupted = false;

return true.ToTask();
}
Expand Down
3 changes: 2 additions & 1 deletion Samples/Sample.ConsoleApp/Utils/CrashableFunctionStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,11 @@ public Task<bool> PostponeFunction(
ReplicaId expectedReplica,
IReadOnlyList<StoredEffect>? effects,
IReadOnlyList<StoredMessage>? messages,
bool failIfInterrupted,
IStorageSession? storageSession
) => _crashed
? Task.FromException<bool>(new TimeoutException())
: _inner.PostponeFunction(storedId, postponeUntil, timestamp, expectedReplica, effects, messages, storageSession);
: _inner.PostponeFunction(storedId, postponeUntil, timestamp, expectedReplica, effects, messages, failIfInterrupted, storageSession);

public Task<bool> FailFunction(
StoredId storedId,
Expand Down
Loading