Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ await store.EffectsStore.GetEffectResults(storedId),
session,
clearChildren: true
);
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(), storedId);
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts()));

effect.TryGet<int>("alias", out _).ShouldBeFalse();

Expand Down Expand Up @@ -380,7 +380,7 @@ public async Task ExistingEffectsFuncIsOnlyInvokedAfterGettingValue(Task<IFuncti
storageSession: null,
clearChildren: true
);
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(), storedId);
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts()));

// Verify the effect is immediately available (eager loading)
effect.TryGet<int>("test_alias", out var result).ShouldBeTrue();
Expand Down Expand Up @@ -714,7 +714,7 @@ public async Task CaptureUsingAtLeastOnceWithoutFlushResiliencyDelaysFlush(Task<
session,
clearChildren: true
);
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(), storedId);
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts()));

var result = await effect.Capture(() => "hello world", ResiliencyLevel.AtLeastOnceDelayFlush);
result.ShouldBe("hello world");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public void PrintSingleCompletedEffect()
clearChildren: true
);

var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(), storedId);
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts()));
var output = effect.ExecutionTree();

var expected = "└─ ✓ [1]\n";
Expand Down Expand Up @@ -75,7 +75,7 @@ public void PrintEffectWithAlias()
clearChildren: true
);

var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(), storedId);
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts()));
var output = effect.ExecutionTree();

var expected = "└─ ✓ [1] my-effect\n";
Expand Down Expand Up @@ -111,7 +111,7 @@ public void PrintFailedEffect()
clearChildren: true
);

var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(), storedId);
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts()));
var output = effect.ExecutionTree();

var expected = "└─ ✗ [1] failed-operation (System.InvalidOperationException)\n";
Expand Down Expand Up @@ -143,7 +143,7 @@ public void PrintStartedEffect()
clearChildren: true
);

var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(), storedId);
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts()));
var output = effect.ExecutionTree();

var expected = "└─ ⋯ [1] in-progress\n";
Expand Down Expand Up @@ -189,7 +189,7 @@ public void PrintEffectHierarchy()
clearChildren: true
);

var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(), storedId);
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts()));
var output = effect.ExecutionTree();

var expected =
Expand Down Expand Up @@ -245,7 +245,7 @@ public void PrintDeepEffectHierarchy()
clearChildren: true
);

var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(), storedId);
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts()));
var output = effect.ExecutionTree();

var expected =
Expand Down Expand Up @@ -295,7 +295,7 @@ public void PrintMultipleRootEffects()
clearChildren: true
);

var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(), storedId);
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts()));
var output = effect.ExecutionTree();

var expected =
Expand Down Expand Up @@ -330,7 +330,7 @@ public void PrintComplexEffectTree()
clearChildren: true
);

var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(), storedId);
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts()));
var output = effect.ExecutionTree();

var expected =
Expand Down Expand Up @@ -365,7 +365,7 @@ public void PrintEffectTreeWithMissingIntermediateEffect()
clearChildren: false
);

var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(), storedId);
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts()));
var output = effect.ExecutionTree();

var expected =
Expand Down Expand Up @@ -398,7 +398,7 @@ public void PrintEffectTreeWithMultipleMissingAncestors()
clearChildren: false
);

var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(), storedId);
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts()));
var output = effect.ExecutionTree();

var expected =
Expand Down
13 changes: 0 additions & 13 deletions Core/Cleipnir.ResilientFunctions/CoreRuntime/FlowsManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,4 @@ public void Interrupt(IReadOnlyList<StoredId> ids)
flowState.Interrupt();
}

public void StartThread(StoredId id)
{
lock (_lock)
if (_dict.TryGetValue(id, out var flowState))
flowState.SubflowStarted();
}

public void CompleteThread(StoredId id)
{
lock (_lock)
if (_dict.TryGetValue(id, out var flowState))
flowState.SubflowCompleted();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ await _functionStore.BulkScheduleFunctions(
public MessageWriter CreateMessageWriter(StoredId storedId)
=> new MessageWriter(storedId, _functionStore.MessageStore, Serializer);

public Effect CreateEffect(StoredId storedId, FlowId flowId, IReadOnlyList<StoredEffect> storedEffects, FlowTimeouts flowTimeouts, IStorageSession? storageSession, FlowsManager flowsManager)
public Effect CreateEffect(StoredId storedId, FlowId flowId, IReadOnlyList<StoredEffect> storedEffects, FlowTimeouts flowTimeouts, IStorageSession? storageSession, FlowState flowState)
{
var effectsStore = _functionStore.EffectsStore;

Expand All @@ -399,7 +399,7 @@ public Effect CreateEffect(StoredId storedId, FlowId flowId, IReadOnlyList<Store
_clearChildren
);

var effect = new Effect(effectResults, UtcNow, flowTimeouts, flowsManager, storedId);
var effect = new Effect(effectResults, UtcNow, flowTimeouts, flowState);
return effect;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,17 +270,17 @@ await _invocationHelper.PersistFunctionInStore(
);

var flowTimeouts = new FlowTimeouts();
var flowState = _flowsManager.CreateFlow(storedId, flowTimeouts);

var effect = _invocationHelper.CreateEffect(
storedId,
flowId,
initialState == null ? [] : _invocationHelper.MapInitialEffects(initialState.Effects, flowId),
flowTimeouts,
storageSession,
_flowsManager
flowState
);

var flowState = _flowsManager.CreateFlow(storedId, flowTimeouts);
var queueManager = _invocationHelper.CreateQueueManager(flowId, storedId, effect, flowState, flowTimeouts, _unhandledExceptionHandler);
disposables.Add(queueManager);
var messageWriter = _invocationHelper.CreateMessageWriter(storedId);
Expand Down Expand Up @@ -328,10 +328,10 @@ private async Task<PreparedReInvocation> PrepareForReInvocation(StoredId storedI
disposables.Add(isWorkflowRunningDisposable);

var flowTimeouts = new FlowTimeouts();
var flowState = _flowsManager.CreateFlow(storedId, flowTimeouts);

var effect = _invocationHelper.CreateEffect(storedId, flowId, effects, flowTimeouts, storageSession, _flowsManager);
var effect = _invocationHelper.CreateEffect(storedId, flowId, effects, flowTimeouts, storageSession, flowState);

var flowState = _flowsManager.CreateFlow(storedId, flowTimeouts);
var queueManager = _invocationHelper.CreateQueueManager(flowId, storedId, effect, flowState, flowTimeouts, _unhandledExceptionHandler);
disposables.Add(queueManager);
var messageWriter = _invocationHelper.CreateMessageWriter(storedId);
Expand Down
12 changes: 5 additions & 7 deletions Core/Cleipnir.ResilientFunctions/Domain/Effect.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,14 @@ public class Effect
private readonly EffectResults effectResults;
private readonly UtcNow utcNow;
private readonly FlowTimeouts flowTimeouts;
private readonly FlowsManager flowsManager;
private readonly StoredId storedId;
private readonly FlowState flowState;

internal Effect(EffectResults effectResults, UtcNow utcNow, FlowTimeouts flowTimeouts, FlowsManager flowsManager, StoredId storedId)
internal Effect(EffectResults effectResults, UtcNow utcNow, FlowTimeouts flowTimeouts, FlowState flowState)
{
this.effectResults = effectResults;
this.utcNow = utcNow;
this.flowTimeouts = flowTimeouts;
this.flowsManager = flowsManager;
this.storedId = storedId;
this.flowState = flowState;
}


Expand Down Expand Up @@ -297,11 +295,11 @@ public async Task<TSeed> AggregateEach<T, TSeed>(

public Task<T> RunParallelle<T>(Func<Task<T>> work)
{
flowsManager.StartThread(storedId);
flowState.SubflowStarted();
var task = Capture(() => Task.Run(work));
return task.ContinueWith(t =>
{
flowsManager.CompleteThread(storedId);
flowState.SubflowCompleted();
return t.GetAwaiter().GetResult();
});
}
Expand Down