Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ protected async Task QueueClientPullsFiveMessagesAndTimesOutOnSixth(Task<IFuncti

for (var i = 0; i < 6; i++)
{
var message = await workflow.Message<string>(TimeSpan.FromMilliseconds(250));
var message = await workflow.Message<string>(TimeSpan.FromMilliseconds(1_000));
messages.Add(message ?? "NULL");
}

Expand Down Expand Up @@ -560,7 +560,7 @@ protected async Task QueueManagerFailsOnMessageDeserializationError(Task<IFuncti
{

var flowTimeouts = new FlowTimeouts();
var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow);
var flowsManager = new FlowsManager(functionStore);
var flowState = flowsManager.CreateFlow(workflow.StoredId, flowTimeouts);
var queueManager = new QueueManager(
workflow.FlowId,
Expand Down Expand Up @@ -623,7 +623,7 @@ protected async Task RegisteredTimeoutIsRemovedWhenPullingMessage(Task<IFunction
{
storedId = workflow.StoredId;
var minimumTimeout = new FlowTimeouts();
var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow);
var flowsManager = new FlowsManager(functionStore);
var flowState = flowsManager.CreateFlow(workflow.StoredId, minimumTimeout);
var queueManager = new QueueManager(
workflow.FlowId,
Expand Down Expand Up @@ -684,7 +684,7 @@ protected async Task PullEnvelopeReturnsEnvelopeWithReceiverAndSender(Task<IFunc
{

var flowTimeouts = new FlowTimeouts();
var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow);
var flowsManager = new FlowsManager(functionStore);
var flowState = flowsManager.CreateFlow(workflow.StoredId, flowTimeouts);
var queueManager = new QueueManager(
workflow.FlowId,
Expand Down

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions Core/Cleipnir.ResilientFunctions.Tests/TestSetup.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Microsoft.VisualStudio.TestTools.UnitTesting;

[assembly: Parallelize(Workers = 0, Scope = ExecutionScope.MethodLevel)]
[assembly: Parallelize(Workers = 0, Scope = ExecutionScope.MethodLevel)]
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ await store.EffectsStore.GetEffectResults(storedId),
session,
clearChildren: true
);
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(store, () => DateTime.UtcNow), storedId);
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(store), storedId);

effect.TryGet<int>("alias", out _).ShouldBeFalse();

Expand Down Expand Up @@ -380,7 +380,7 @@ public async Task ExistingEffectsFuncIsOnlyInvokedAfterGettingValue(Task<IFuncti
storageSession: null,
clearChildren: true
);
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(store, () => DateTime.UtcNow), storedId);
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(store), storedId);

// Verify the effect is immediately available (eager loading)
effect.TryGet<int>("test_alias", out var result).ShouldBeTrue();
Expand Down Expand Up @@ -714,7 +714,7 @@ public async Task CaptureUsingAtLeastOnceWithoutFlushResiliencyDelaysFlush(Task<
session,
clearChildren: true
);
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(store, () => DateTime.UtcNow), storedId);
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(store), storedId);

var result = await effect.Capture(() => "hello world", ResiliencyLevel.AtLeastOnceDelayFlush);
result.ShouldBe("hello world");
Expand Down Expand Up @@ -1027,7 +1027,7 @@ await elms.CaptureEach(
storedEffects.Any(e => e.Alias == "Before").ShouldBeTrue();
storedEffects.Any(e => e.Alias == "Loop").ShouldBeTrue();
storedEffects.Single(e => e.Alias == i.ToString()).EffectId.ShouldBe(new EffectId([1,i,0]));
storedEffects.Count.ShouldBe(4);
storedEffects.Count(e => e.Alias != null).ShouldBe(3);

await messageWriter.AppendMessage(i.ToString());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public void PrintSingleCompletedEffect()
clearChildren: true
);

var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(new InMemoryFunctionStore(), () => DateTime.UtcNow), storedId);
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(new InMemoryFunctionStore()), storedId);
var output = effect.ExecutionTree();

var expected = "└─ ✓ [1]\n";
Expand Down Expand Up @@ -75,7 +75,7 @@ public void PrintEffectWithAlias()
clearChildren: true
);

var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(new InMemoryFunctionStore(), () => DateTime.UtcNow), storedId);
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(new InMemoryFunctionStore()), storedId);
var output = effect.ExecutionTree();

var expected = "└─ ✓ [1] my-effect\n";
Expand Down Expand Up @@ -111,7 +111,7 @@ public void PrintFailedEffect()
clearChildren: true
);

var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(new InMemoryFunctionStore(), () => DateTime.UtcNow), storedId);
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(new InMemoryFunctionStore()), storedId);
var output = effect.ExecutionTree();

var expected = "└─ ✗ [1] failed-operation (System.InvalidOperationException)\n";
Expand Down Expand Up @@ -143,7 +143,7 @@ public void PrintStartedEffect()
clearChildren: true
);

var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(new InMemoryFunctionStore(), () => DateTime.UtcNow), storedId);
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(new InMemoryFunctionStore()), storedId);
var output = effect.ExecutionTree();

var expected = "└─ ⋯ [1] in-progress\n";
Expand Down Expand Up @@ -189,7 +189,7 @@ public void PrintEffectHierarchy()
clearChildren: true
);

var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(new InMemoryFunctionStore(), () => DateTime.UtcNow), storedId);
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(new InMemoryFunctionStore()), storedId);
var output = effect.ExecutionTree();

var expected =
Expand Down Expand Up @@ -245,7 +245,7 @@ public void PrintDeepEffectHierarchy()
clearChildren: true
);

var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(new InMemoryFunctionStore(), () => DateTime.UtcNow), storedId);
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(new InMemoryFunctionStore()), storedId);
var output = effect.ExecutionTree();

var expected =
Expand Down Expand Up @@ -295,7 +295,7 @@ public void PrintMultipleRootEffects()
clearChildren: true
);

var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(new InMemoryFunctionStore(), () => DateTime.UtcNow), storedId);
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(new InMemoryFunctionStore()), storedId);
var output = effect.ExecutionTree();

var expected =
Expand Down Expand Up @@ -330,7 +330,7 @@ public void PrintComplexEffectTree()
clearChildren: true
);

var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(new InMemoryFunctionStore(), () => DateTime.UtcNow), storedId);
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(new InMemoryFunctionStore()), storedId);
var output = effect.ExecutionTree();

var expected =
Expand Down Expand Up @@ -365,7 +365,7 @@ public void PrintEffectTreeWithMissingIntermediateEffect()
clearChildren: false
);

var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(new InMemoryFunctionStore(), () => DateTime.UtcNow), storedId);
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(new InMemoryFunctionStore()), storedId);
var output = effect.ExecutionTree();

var expected =
Expand Down Expand Up @@ -398,7 +398,7 @@ public void PrintEffectTreeWithMultipleMissingAncestors()
clearChildren: false
);

var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(new InMemoryFunctionStore(), () => DateTime.UtcNow), storedId);
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(new InMemoryFunctionStore()), storedId);
var output = effect.ExecutionTree();

var expected =
Expand Down
2 changes: 1 addition & 1 deletion Core/Cleipnir.ResilientFunctions/CoreRuntime/FlowState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public void SubflowWaiting()
WaitingSubflows++;
}

public bool ResumeSubflow()
public bool TryResumeSubflow()
{
lock (_lock)
if (Suspended)
Expand Down
46 changes: 5 additions & 41 deletions Core/Cleipnir.ResilientFunctions/CoreRuntime/FlowTimeouts.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,69 +2,33 @@
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Cleipnir.ResilientFunctions.Domain;
using Cleipnir.ResilientFunctions.Domain.Exceptions.Commands;

namespace Cleipnir.ResilientFunctions.CoreRuntime;

public class FlowTimeouts
{
private readonly Lock _lock = new();
private Dictionary<EffectId, Tuple<DateTime, TaskCompletionSource>> Timeouts { get; } = new();
private Dictionary<EffectId, DateTime> Timeouts { get; } = new();

public DateTime? MinimumTimeout
{
get
{
lock (_lock)
return GetMinimumTimeout();
return Timeouts.Values.Count != 0 ? Timeouts.Values.Min() : (DateTime?)null;
}
}

private DateTime? GetMinimumTimeout()
=> Timeouts.Values.Count != 0 ? Timeouts.Values.Min(t => t.Item1) : (DateTime?)null;

public async Task AddTimeout(EffectId effectId, DateTime timeout, TimeSpan? maxWait = null)
public void AddTimeout(EffectId effectId, DateTime timeout)
{
TaskCompletionSource tcs;
lock (_lock)
{
tcs = new TaskCompletionSource();
Timeouts[effectId] = Tuple.Create(timeout, tcs);
}

if (maxWait == null || timeout <= DateTime.UtcNow)
{
await tcs.Task;
return;
}

var completed = await Task.WhenAny(tcs.Task, Task.Delay(maxWait.Value));
if (completed != tcs.Task)
throw new SuspendInvocationException();
Timeouts[effectId] = timeout;
}

public void RemoveTimeout(EffectId effectId)
{
lock (_lock)
Timeouts.Remove(effectId);
}

public bool HasExpiredTimeouts(DateTime now)
{
lock (_lock)
return GetMinimumTimeout() <= now;
}

public void SignalExpiredTimeouts(DateTime now)
{
lock (_lock)
foreach (var (effectId, (timeout, tcs)) in Timeouts.ToList())
if (timeout <= now)
{
Timeouts.Remove(effectId);
Task.Run(() => tcs.TrySetResult());
}
}
}
}
31 changes: 3 additions & 28 deletions Core/Cleipnir.ResilientFunctions/CoreRuntime/FlowsManager.cs
Original file line number Diff line number Diff line change
@@ -1,46 +1,21 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Cleipnir.ResilientFunctions.Storage;

namespace Cleipnir.ResilientFunctions.CoreRuntime;

public class FlowsManager : IDisposable
public class FlowsManager
{
private readonly Dictionary<StoredId, FlowState> _dict = new();
private readonly Lock _lock = new();
private readonly IFunctionStore _functionStore;
private readonly UtcNow _utcNow;
private volatile bool _disposed;

public FlowsManager(IFunctionStore functionStore, UtcNow utcNow)
public FlowsManager(IFunctionStore functionStore)
{
_functionStore = functionStore;
_utcNow = utcNow;
_ = Task.Run(TimeoutCheckLoop);
}

private async Task TimeoutCheckLoop()
{
while (!_disposed)
{
var expiredStates = new List<FlowState>();
var now = _utcNow();
lock (_lock)
foreach (var (_, status) in _dict)
if (status.Timeouts.HasExpiredTimeouts(now))
expiredStates.Add(status);

foreach (var status in expiredStates)
status.Timeouts.SignalExpiredTimeouts(now);

await Task.Delay(10);
}
}

public void Dispose() => _disposed = true;

public FlowState CreateFlow(StoredId id, FlowTimeouts timeouts)
{
lock (_lock)
Expand Down Expand Up @@ -77,4 +52,4 @@ public void CompleteThread(StoredId id)
if (_dict.TryGetValue(id, out var flowState))
flowState.SubflowCompleted();
}
}
}
31 changes: 17 additions & 14 deletions Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Workflow.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Threading.Tasks;
using Cleipnir.ResilientFunctions.Domain;
using Cleipnir.ResilientFunctions.Domain.Exceptions.Commands;
using Cleipnir.ResilientFunctions.Messaging;
using Cleipnir.ResilientFunctions.Helpers;
using Cleipnir.ResilientFunctions.Queuing;
Expand All @@ -14,12 +15,12 @@ public class Workflow
internal StoredId StoredId { get; }
public Effect Effect { get; }

private QueueManager _queueManager;
private readonly QueueManager _queueManager;
private readonly UtcNow _utcNow;
private MessageWriter MessageWriter { get; }


public Workflow(FlowId flowId, StoredId storedId, Effect effect, QueueManager queueManager, UtcNow utcNow, MessageWriter messageWriter)
internal Workflow(FlowId flowId, StoredId storedId, Effect effect, QueueManager queueManager, UtcNow utcNow, MessageWriter messageWriter)
{
FlowId = flowId;
StoredId = storedId;
Expand All @@ -32,23 +33,25 @@ public Workflow(FlowId flowId, StoredId storedId, Effect effect, QueueManager qu
public Task Delay(TimeSpan @for, bool suspend = true, string? alias = null) => Delay(until: _utcNow() + @for, suspend, alias);
public Task Delay(DateTime until, bool suspend = true, string? alias = null)
{
var effectId = Effect.TakeNextImplicitId();
var timeoutId = EffectId.CreateWithCurrentContext(effectId);
var timeoutId = Effect.CreateNextImplicitId();

async Task Inner()
{
var expiry = await Effect.CreateOrGet(timeoutId, until.ToUniversalTime().Ticks, alias, flush: false);

if (expiry == -1)
{
var expiry = (await Effect.CreateOrGet(timeoutId, until.ToUniversalTime().Ticks, alias, flush: false))
.ToDateTime();

var now = _utcNow();
if (now > expiry)
return;
}

var maxWait = suspend ? TimeSpan.Zero : (TimeSpan?)null;
await Effect.FlowTimeouts.AddTimeout(timeoutId, expiry.ToDateTime(), maxWait);

await Effect.Upsert(timeoutId, -1L, alias, flush: false);
Effect.FlowTimeouts.RemoveTimeout(timeoutId);
Effect.FlowTimeouts.AddTimeout(timeoutId, expiry);
if (suspend)
throw new SuspendInvocationException();

//do in-memory wait
var delay = expiry - now;
await Task.Delay(delay);
Effect.FlowTimeouts.RemoveTimeout(timeoutId);
}

return Inner();
Expand Down
18 changes: 17 additions & 1 deletion Core/Cleipnir.ResilientFunctions/Domain/Effect.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,24 @@ public enum ResiliencyLevel
AtLeastOnceDelayFlush
}

public class Effect(EffectResults effectResults, UtcNow utcNow, FlowTimeouts flowTimeouts, FlowsManager flowsManager, StoredId storedId)
public class Effect
{
private readonly EffectResults effectResults;
private readonly UtcNow utcNow;
private readonly FlowTimeouts flowTimeouts;
private readonly FlowsManager flowsManager;
private readonly StoredId storedId;

internal Effect(EffectResults effectResults, UtcNow utcNow, FlowTimeouts flowTimeouts, FlowsManager flowsManager, StoredId storedId)
{
this.effectResults = effectResults;
this.utcNow = utcNow;
this.flowTimeouts = flowTimeouts;
this.flowsManager = flowsManager;
this.storedId = storedId;
}


internal bool Contains(int id) => Contains(CreateEffectId(id));
internal bool Contains(EffectId effectId) => effectResults.Contains(effectId);

Expand Down
Loading