Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -561,7 +561,7 @@ protected async Task QueueManagerFailsOnMessageDeserializationError(Task<IFuncti

var flowTimeouts = new FlowTimeouts();
var flowsManager = new FlowsManager();
var flowState = flowsManager.CreateFlow(workflow.StoredId, flowTimeouts);
var flowState = flowsManager.CreateFlowState(workflow.StoredId, flowTimeouts, completed: ForeverTask.Instance);
var queueManager = new QueueManager(
workflow.FlowId,
workflow.StoredId,
Expand Down Expand Up @@ -624,7 +624,7 @@ protected async Task RegisteredTimeoutIsRemovedWhenPullingMessage(Task<IFunction
storedId = workflow.StoredId;
var minimumTimeout = new FlowTimeouts();
var flowsManager = new FlowsManager();
var flowState = flowsManager.CreateFlow(workflow.StoredId, minimumTimeout);
var flowState = flowsManager.CreateFlowState(workflow.StoredId, minimumTimeout, completed: ForeverTask.Instance);
var queueManager = new QueueManager(
workflow.FlowId,
workflow.StoredId,
Expand Down Expand Up @@ -685,7 +685,7 @@ protected async Task PullEnvelopeReturnsEnvelopeWithReceiverAndSender(Task<IFunc

var flowTimeouts = new FlowTimeouts();
var flowsManager = new FlowsManager();
var flowState = flowsManager.CreateFlow(workflow.StoredId, flowTimeouts);
var flowState = flowsManager.CreateFlowState(workflow.StoredId, flowTimeouts, completed: ForeverTask.Instance);
var queueManager = new QueueManager(
workflow.FlowId,
workflow.StoredId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ await store.EffectsStore.GetEffectResults(storedId),
session,
clearChildren: true
);
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts()));
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts(), completed: ForeverTask.Instance));

effect.TryGet<int>("alias", out _).ShouldBeFalse();

Expand Down Expand Up @@ -380,7 +380,7 @@ public async Task ExistingEffectsFuncIsOnlyInvokedAfterGettingValue(Task<IFuncti
storageSession: null,
clearChildren: true
);
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts()));
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts(), completed: ForeverTask.Instance));

// Verify the effect is immediately available (eager loading)
effect.TryGet<int>("test_alias", out var result).ShouldBeTrue();
Expand Down Expand Up @@ -714,7 +714,7 @@ public async Task CaptureUsingAtLeastOnceWithoutFlushResiliencyDelaysFlush(Task<
session,
clearChildren: true
);
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts()));
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts(), completed: ForeverTask.Instance));

var result = await effect.Capture(() => "hello world", ResiliencyLevel.AtLeastOnceDelayFlush);
result.ShouldBe("hello world");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using Cleipnir.ResilientFunctions.CoreRuntime;
using Cleipnir.ResilientFunctions.CoreRuntime.Serialization;
using Cleipnir.ResilientFunctions.Domain;
using Cleipnir.ResilientFunctions.Helpers;
using Cleipnir.ResilientFunctions.Storage;
using Cleipnir.ResilientFunctions.Tests.Utils;
using Microsoft.VisualStudio.TestTools.UnitTesting;
Expand Down Expand Up @@ -43,7 +44,7 @@ public void PrintSingleCompletedEffect()
clearChildren: true
);

var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts()));
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts(), completed: ForeverTask.Instance));
var output = effect.ExecutionTree();

var expected = "└─ ✓ [1]\n";
Expand Down Expand Up @@ -75,7 +76,7 @@ public void PrintEffectWithAlias()
clearChildren: true
);

var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts()));
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts(), completed: ForeverTask.Instance));
var output = effect.ExecutionTree();

var expected = "└─ ✓ [1] my-effect\n";
Expand Down Expand Up @@ -111,7 +112,7 @@ public void PrintFailedEffect()
clearChildren: true
);

var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts()));
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts(), completed: ForeverTask.Instance));
var output = effect.ExecutionTree();

var expected = "└─ ✗ [1] failed-operation (System.InvalidOperationException)\n";
Expand Down Expand Up @@ -143,7 +144,7 @@ public void PrintStartedEffect()
clearChildren: true
);

var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts()));
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts(), completed: ForeverTask.Instance));
var output = effect.ExecutionTree();

var expected = "└─ ⋯ [1] in-progress\n";
Expand Down Expand Up @@ -189,7 +190,7 @@ public void PrintEffectHierarchy()
clearChildren: true
);

var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts()));
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts(), completed: ForeverTask.Instance));
var output = effect.ExecutionTree();

var expected =
Expand Down Expand Up @@ -245,7 +246,7 @@ public void PrintDeepEffectHierarchy()
clearChildren: true
);

var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts()));
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts(), completed: ForeverTask.Instance));
var output = effect.ExecutionTree();

var expected =
Expand Down Expand Up @@ -295,7 +296,7 @@ public void PrintMultipleRootEffects()
clearChildren: true
);

var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts()));
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts(), completed: ForeverTask.Instance));
var output = effect.ExecutionTree();

var expected =
Expand Down Expand Up @@ -330,7 +331,7 @@ public void PrintComplexEffectTree()
clearChildren: true
);

var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts()));
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts(), completed: ForeverTask.Instance));
var output = effect.ExecutionTree();

var expected =
Expand Down Expand Up @@ -365,7 +366,7 @@ public void PrintEffectTreeWithMissingIntermediateEffect()
clearChildren: false
);

var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts()));
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts(), completed: ForeverTask.Instance));
var output = effect.ExecutionTree();

var expected =
Expand Down Expand Up @@ -398,7 +399,7 @@ public void PrintEffectTreeWithMultipleMissingAncestors()
clearChildren: false
);

var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts()));
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts(), completed: ForeverTask.Instance));
var output = effect.ExecutionTree();

var expected =
Expand Down
48 changes: 38 additions & 10 deletions Core/Cleipnir.ResilientFunctions/CoreRuntime/FlowState.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
using System.Threading;
using System.Threading.Tasks;
using Cleipnir.ResilientFunctions.Helpers;
using Cleipnir.ResilientFunctions.Queuing;
using Cleipnir.ResilientFunctions.Storage;

namespace Cleipnir.ResilientFunctions.CoreRuntime;

public enum FlowStatus
{
Running = 0,
Waiting = 1,
Completed = 2
}

public class FlowState
{
private readonly Lock _lock = new();
Expand All @@ -18,17 +26,42 @@ public class FlowState
public bool Suspended { get; private set; }
public Task SuspendedTask { get; }

private FlowStatus _status = FlowStatus.Running;
public FlowStatus Status
{
get
{
lock (_lock)
return _status;
}
set
{
lock (_lock)
if (_status != FlowStatus.Completed)
_status = value;
}
}

public FlowState(
StoredId id,
int subflows,
int waitingSubflows,
FlowTimeouts timeouts)
FlowTimeouts timeouts,
Task completed)
{
Id = id;
Subflows = subflows;
WaitingSubflows = waitingSubflows;
Timeouts = timeouts;
SuspendedTask = _suspendedTcs.Task;

_ = completed.ContinueWith(_ => Status = FlowStatus.Completed);
}

public bool Waiting()
{
lock (_lock)
return Subflows == WaitingSubflows;
}

public void SubflowStarted()
Expand All @@ -49,25 +82,20 @@ public void SubflowWaiting()
WaitingSubflows++;
}

public bool TryResumeSubflow()
public Task ResumeSubflow()
{
lock (_lock)
if (Suspended)
return false;
return ForeverTask.Instance;
else
WaitingSubflows--;

return true;
return Task.CompletedTask;
}

public void Interrupt()
{
lock (_lock)
if (Suspended)
return;
else
WaitingSubflows = 0;

if (Suspended) return;
QueueManager?.Interrupt();
}

Expand Down
23 changes: 21 additions & 2 deletions Core/Cleipnir.ResilientFunctions/CoreRuntime/FlowsManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ public class FlowsManager
private readonly Dictionary<StoredId, FlowState> _dict = new();
private readonly Lock _lock = new();

public FlowState CreateFlow(StoredId id, FlowTimeouts timeouts)
public FlowState CreateFlowState(StoredId id, FlowTimeouts timeouts, Task completed)
{
lock (_lock)
return _dict[id] = new FlowState(id, subflows: 1, waitingSubflows: 0, timeouts);
return _dict[id] = new FlowState(id, subflows: 1, waitingSubflows: 0, timeouts, completed);
}

public void RemoveFlow(StoredId id, FlowState flowState)
Expand All @@ -38,4 +38,23 @@ public void Interrupt(IReadOnlyList<StoredId> ids)
flowState.Interrupt();
}

/*
public async Task CheckForSuspension()
{
while (true)
{
var waitingFlows = new List<FlowState>();
lock (_dict)
{
waitingFlows = _dict.Values.Where(s => s.)
foreach (var flowState in _dict.Values)
{
flowState.
}
}

await Task.Delay(250);
}
}*/

}
24 changes: 12 additions & 12 deletions Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Invoker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,15 @@ public async Task<InnerScheduled<TReturn>> ScheduleInvoke(FlowInstance flowInsta
if (parentWorkflow.Effect.Contains(scheduledAlreadyParentId!))
return _invocationHelper.CreateInnerScheduled([flowId], parentWorkflow, detach);

var (created, workflow, disposables, queueManager, flowState, timeouts, storageSession) = await PrepareForInvocation(flowId, storedId, param, parentWorkflow?.StoredId, initialState);
var tcs = new TaskCompletionSource<TReturn>();
var (created, workflow, disposables, queueManager, flowState, timeouts, storageSession) = await PrepareForInvocation(flowId, storedId, param, parentWorkflow?.StoredId, initialState, tcs.Task);
await (parentWorkflow?.Effect.Upsert(scheduledAlreadyParentId!, true, alias: null, flush: false) ?? Task.CompletedTask);

if (!created)
return _invocationHelper.CreateInnerScheduled([flowId], parentWorkflow, detach);

CurrentFlow._workflow.Value = workflow;

var tcs = new TaskCompletionSource<TReturn>();
_ = flowState.SuspendedTask.ContinueWith(_ => tcs.TrySetException(new InvocationSuspendedException(flowId)));
_ = Task.Run(async () =>
{
Expand Down Expand Up @@ -151,10 +151,10 @@ public Task<InnerScheduled<TReturn>> BulkSchedule(IEnumerable<BulkWork<TParam>>

public async Task<InnerScheduled<TReturn>> ScheduleRestart(StoredId storedId)
{
var (inner, param, humanInstanceId, workflow, disposables, queueManager, flowState, timeouts, parent, storageSession) = await PrepareForReInvocation(storedId);
var tcs = new TaskCompletionSource<TReturn>();
var (inner, param, humanInstanceId, workflow, disposables, queueManager, flowState, timeouts, parent, storageSession) = await PrepareForReInvocation(storedId, tcs.Task);
var flowId = new FlowId(_flowType, humanInstanceId);

var tcs = new TaskCompletionSource<TReturn>();
_ = flowState.SuspendedTask.ContinueWith(_ => tcs.TrySetException(new InvocationSuspendedException(flowId)));
_ = Task.Run(async () =>
{
Expand Down Expand Up @@ -205,10 +205,10 @@ public async Task<InnerScheduled<TReturn>> ScheduleRestart(StoredId storedId)

internal async Task ScheduleRestart(StoredId storedId, RestartedFunction rf, Action onCompletion)
{
var (inner, param, humanInstanceId, workflow, disposables, queueManager, flowState, timeouts, parent, storageSession) = await PrepareForReInvocation(storedId, rf);
var tcs = new TaskCompletionSource<TReturn>();
var (inner, param, humanInstanceId, workflow, disposables, queueManager, flowState, timeouts, parent, storageSession) = await PrepareForReInvocation(storedId, rf, tcs.Task);
var flowId = new FlowId(_flowType, humanInstanceId);

var tcs = new TaskCompletionSource<TReturn>();
_ = flowState.SuspendedTask.ContinueWith(_ => tcs.TrySetException(new InvocationSuspendedException(flowId)));
_ = Task.Run(async () =>
{
Expand Down Expand Up @@ -237,7 +237,7 @@ internal async Task ScheduleRestart(StoredId storedId, RestartedFunction rf, Act
});
}

private async Task<PreparedInvocation> PrepareForInvocation(FlowId flowId, StoredId storedId, TParam param, StoredId? parent, InitialState? initialState)
private async Task<PreparedInvocation> PrepareForInvocation(FlowId flowId, StoredId storedId, TParam param, StoredId? parent, InitialState? initialState, Task completed)
{
var disposables = new List<IDisposable>(capacity: 3);
var success = false;
Expand Down Expand Up @@ -270,7 +270,7 @@ await _invocationHelper.PersistFunctionInStore(
);

var flowTimeouts = new FlowTimeouts();
var flowState = _flowsManager.CreateFlow(storedId, flowTimeouts);
var flowState = _flowsManager.CreateFlowState(storedId, flowTimeouts, completed);

var effect = _invocationHelper.CreateEffect(
storedId,
Expand Down Expand Up @@ -307,16 +307,16 @@ await _invocationHelper.PersistFunctionInStore(
}
private record PreparedInvocation(bool Persisted, Workflow Workflow, IDisposable Disposables, QueueManager QueueManager, FlowState FlowState, FlowTimeouts FlowTimeouts, IStorageSession? StorageSession = null);

private async Task<PreparedReInvocation> PrepareForReInvocation(StoredId storedId)
private async Task<PreparedReInvocation> PrepareForReInvocation(StoredId storedId, Task completed)
{
var restartedFunction = await _invocationHelper.RestartFunction(storedId);
if (restartedFunction == null)
throw UnexpectedStateException.ConcurrentModification(storedId);

return await PrepareForReInvocation(storedId, restartedFunction);
return await PrepareForReInvocation(storedId, restartedFunction, completed);
}

private async Task<PreparedReInvocation> PrepareForReInvocation(StoredId storedId, RestartedFunction restartedFunction)
private async Task<PreparedReInvocation> PrepareForReInvocation(StoredId storedId, RestartedFunction restartedFunction, Task completed)
{
var disposables = new List<IDisposable>(capacity: 3);
try
Expand All @@ -328,7 +328,7 @@ private async Task<PreparedReInvocation> PrepareForReInvocation(StoredId storedI
disposables.Add(isWorkflowRunningDisposable);

var flowTimeouts = new FlowTimeouts();
var flowState = _flowsManager.CreateFlow(storedId, flowTimeouts);
var flowState = _flowsManager.CreateFlowState(storedId, flowTimeouts, completed);

var effect = _invocationHelper.CreateEffect(storedId, flowId, effects, flowTimeouts, storageSession, flowState);

Expand Down
3 changes: 1 addition & 2 deletions Core/Cleipnir.ResilientFunctions/Queuing/QueueManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,7 @@ internal void Interrupt()
}
finally
{
if (!_flowState.TryResumeSubflow())
await ForeverTask.Instance;
await _flowState.ResumeSubflow();

await delayCts.CancelAsync();
delayCts.Dispose();
Expand Down