Skip to content

Commit 2248697

Browse files
committed
Effect: hold FlowState directly instead of FlowsManager + StoredId
RunParallelle now calls flowState.SubflowStarted/Completed directly, skipping the per-call dictionary lookup and lock in FlowsManager. FlowsManager.StartThread/CompleteThread are removed as unused. Reordered Invoker.PrepareFor(Re)Invocation so CreateFlow runs before CreateEffect, since Effect now requires the FlowState at construction.
1 parent b34b72b commit 2248697

6 files changed

Lines changed: 24 additions & 39 deletions

File tree

Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/FunctionTests/EffectTests.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -332,7 +332,7 @@ await store.EffectsStore.GetEffectResults(storedId),
332332
session,
333333
clearChildren: true
334334
);
335-
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(), storedId);
335+
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts()));
336336

337337
effect.TryGet<int>("alias", out _).ShouldBeFalse();
338338

@@ -380,7 +380,7 @@ public async Task ExistingEffectsFuncIsOnlyInvokedAfterGettingValue(Task<IFuncti
380380
storageSession: null,
381381
clearChildren: true
382382
);
383-
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(), storedId);
383+
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts()));
384384

385385
// Verify the effect is immediately available (eager loading)
386386
effect.TryGet<int>("test_alias", out var result).ShouldBeTrue();
@@ -714,7 +714,7 @@ public async Task CaptureUsingAtLeastOnceWithoutFlushResiliencyDelaysFlush(Task<
714714
session,
715715
clearChildren: true
716716
);
717-
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(), storedId);
717+
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts()));
718718

719719
var result = await effect.Capture(() => "hello world", ResiliencyLevel.AtLeastOnceDelayFlush);
720720
result.ShouldBe("hello world");

Core/Cleipnir.ResilientFunctions.Tests/UtilsTests/PrintEffectsTests.cs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public void PrintSingleCompletedEffect()
4343
clearChildren: true
4444
);
4545

46-
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(), storedId);
46+
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts()));
4747
var output = effect.ExecutionTree();
4848

4949
var expected = "└─ ✓ [1]\n";
@@ -75,7 +75,7 @@ public void PrintEffectWithAlias()
7575
clearChildren: true
7676
);
7777

78-
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(), storedId);
78+
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts()));
7979
var output = effect.ExecutionTree();
8080

8181
var expected = "└─ ✓ [1] my-effect\n";
@@ -111,7 +111,7 @@ public void PrintFailedEffect()
111111
clearChildren: true
112112
);
113113

114-
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(), storedId);
114+
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts()));
115115
var output = effect.ExecutionTree();
116116

117117
var expected = "└─ ✗ [1] failed-operation (System.InvalidOperationException)\n";
@@ -143,7 +143,7 @@ public void PrintStartedEffect()
143143
clearChildren: true
144144
);
145145

146-
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(), storedId);
146+
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts()));
147147
var output = effect.ExecutionTree();
148148

149149
var expected = "└─ ⋯ [1] in-progress\n";
@@ -189,7 +189,7 @@ public void PrintEffectHierarchy()
189189
clearChildren: true
190190
);
191191

192-
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(), storedId);
192+
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts()));
193193
var output = effect.ExecutionTree();
194194

195195
var expected =
@@ -245,7 +245,7 @@ public void PrintDeepEffectHierarchy()
245245
clearChildren: true
246246
);
247247

248-
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(), storedId);
248+
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts()));
249249
var output = effect.ExecutionTree();
250250

251251
var expected =
@@ -295,7 +295,7 @@ public void PrintMultipleRootEffects()
295295
clearChildren: true
296296
);
297297

298-
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(), storedId);
298+
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts()));
299299
var output = effect.ExecutionTree();
300300

301301
var expected =
@@ -330,7 +330,7 @@ public void PrintComplexEffectTree()
330330
clearChildren: true
331331
);
332332

333-
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(), storedId);
333+
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts()));
334334
var output = effect.ExecutionTree();
335335

336336
var expected =
@@ -365,7 +365,7 @@ public void PrintEffectTreeWithMissingIntermediateEffect()
365365
clearChildren: false
366366
);
367367

368-
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(), storedId);
368+
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts()));
369369
var output = effect.ExecutionTree();
370370

371371
var expected =
@@ -398,7 +398,7 @@ public void PrintEffectTreeWithMultipleMissingAncestors()
398398
clearChildren: false
399399
);
400400

401-
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(), storedId);
401+
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowState(storedId, subflows: 1, waitingSubflows: 0, new FlowTimeouts()));
402402
var output = effect.ExecutionTree();
403403

404404
var expected =

Core/Cleipnir.ResilientFunctions/CoreRuntime/FlowsManager.cs

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -38,17 +38,4 @@ public void Interrupt(IReadOnlyList<StoredId> ids)
3838
flowState.Interrupt();
3939
}
4040

41-
public void StartThread(StoredId id)
42-
{
43-
lock (_lock)
44-
if (_dict.TryGetValue(id, out var flowState))
45-
flowState.SubflowStarted();
46-
}
47-
48-
public void CompleteThread(StoredId id)
49-
{
50-
lock (_lock)
51-
if (_dict.TryGetValue(id, out var flowState))
52-
flowState.SubflowCompleted();
53-
}
5441
}

Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/InvocationHelper.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -385,7 +385,7 @@ await _functionStore.BulkScheduleFunctions(
385385
public MessageWriter CreateMessageWriter(StoredId storedId)
386386
=> new MessageWriter(storedId, _functionStore.MessageStore, Serializer);
387387

388-
public Effect CreateEffect(StoredId storedId, FlowId flowId, IReadOnlyList<StoredEffect> storedEffects, FlowTimeouts flowTimeouts, IStorageSession? storageSession, FlowsManager flowsManager)
388+
public Effect CreateEffect(StoredId storedId, FlowId flowId, IReadOnlyList<StoredEffect> storedEffects, FlowTimeouts flowTimeouts, IStorageSession? storageSession, FlowState flowState)
389389
{
390390
var effectsStore = _functionStore.EffectsStore;
391391

@@ -399,7 +399,7 @@ public Effect CreateEffect(StoredId storedId, FlowId flowId, IReadOnlyList<Store
399399
_clearChildren
400400
);
401401

402-
var effect = new Effect(effectResults, UtcNow, flowTimeouts, flowsManager, storedId);
402+
var effect = new Effect(effectResults, UtcNow, flowTimeouts, flowState);
403403
return effect;
404404
}
405405

Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Invoker.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -270,17 +270,17 @@ await _invocationHelper.PersistFunctionInStore(
270270
);
271271

272272
var flowTimeouts = new FlowTimeouts();
273+
var flowState = _flowsManager.CreateFlow(storedId, flowTimeouts);
273274

274275
var effect = _invocationHelper.CreateEffect(
275276
storedId,
276277
flowId,
277278
initialState == null ? [] : _invocationHelper.MapInitialEffects(initialState.Effects, flowId),
278279
flowTimeouts,
279280
storageSession,
280-
_flowsManager
281+
flowState
281282
);
282283

283-
var flowState = _flowsManager.CreateFlow(storedId, flowTimeouts);
284284
var queueManager = _invocationHelper.CreateQueueManager(flowId, storedId, effect, flowState, flowTimeouts, _unhandledExceptionHandler);
285285
disposables.Add(queueManager);
286286
var messageWriter = _invocationHelper.CreateMessageWriter(storedId);
@@ -328,10 +328,10 @@ private async Task<PreparedReInvocation> PrepareForReInvocation(StoredId storedI
328328
disposables.Add(isWorkflowRunningDisposable);
329329

330330
var flowTimeouts = new FlowTimeouts();
331+
var flowState = _flowsManager.CreateFlow(storedId, flowTimeouts);
331332

332-
var effect = _invocationHelper.CreateEffect(storedId, flowId, effects, flowTimeouts, storageSession, _flowsManager);
333+
var effect = _invocationHelper.CreateEffect(storedId, flowId, effects, flowTimeouts, storageSession, flowState);
333334

334-
var flowState = _flowsManager.CreateFlow(storedId, flowTimeouts);
335335
var queueManager = _invocationHelper.CreateQueueManager(flowId, storedId, effect, flowState, flowTimeouts, _unhandledExceptionHandler);
336336
disposables.Add(queueManager);
337337
var messageWriter = _invocationHelper.CreateMessageWriter(storedId);

Core/Cleipnir.ResilientFunctions/Domain/Effect.cs

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,14 @@ public class Effect
2121
private readonly EffectResults effectResults;
2222
private readonly UtcNow utcNow;
2323
private readonly FlowTimeouts flowTimeouts;
24-
private readonly FlowsManager flowsManager;
25-
private readonly StoredId storedId;
24+
private readonly FlowState flowState;
2625

27-
internal Effect(EffectResults effectResults, UtcNow utcNow, FlowTimeouts flowTimeouts, FlowsManager flowsManager, StoredId storedId)
26+
internal Effect(EffectResults effectResults, UtcNow utcNow, FlowTimeouts flowTimeouts, FlowState flowState)
2827
{
2928
this.effectResults = effectResults;
3029
this.utcNow = utcNow;
3130
this.flowTimeouts = flowTimeouts;
32-
this.flowsManager = flowsManager;
33-
this.storedId = storedId;
31+
this.flowState = flowState;
3432
}
3533

3634

@@ -297,11 +295,11 @@ public async Task<TSeed> AggregateEach<T, TSeed>(
297295

298296
public Task<T> RunParallelle<T>(Func<Task<T>> work)
299297
{
300-
flowsManager.StartThread(storedId);
298+
flowState.SubflowStarted();
301299
var task = Capture(() => Task.Run(work));
302300
return task.ContinueWith(t =>
303301
{
304-
flowsManager.CompleteThread(storedId);
302+
flowState.SubflowCompleted();
305303
return t.GetAwaiter().GetResult();
306304
});
307305
}

0 commit comments

Comments
 (0)