Skip to content

Commit b784181

Browse files
committed
Short-circuit PrepareForInvocation when flow already persisted
When PersistFunctionInStore returns persisted=false the caller only needs the flag to take the early-return branch, so skip constructing effect, correlations, semaphores, flow state, queue manager and workflow. Also completes the CreateFlowState+AddFlow -> CreateFlow migration across the remaining Invoker call sites and the Messaging test templates.
1 parent 074891e commit b784181

4 files changed

Lines changed: 30 additions & 39 deletions

File tree

Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessagesSubscriptionTests.cs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -561,7 +561,7 @@ protected async Task QueueManagerFailsOnMessageDeserializationError(Task<IFuncti
561561

562562
var flowTimeouts = new FlowTimeouts();
563563
var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow);
564-
var flowState = flowsManager.CreateFlowState(workflow.StoredId, flowTimeouts);
564+
var flowState = flowsManager.CreateFlow(workflow.StoredId, flowTimeouts);
565565
var queueManager = new QueueManager(
566566
workflow.FlowId,
567567
workflow.StoredId,
@@ -575,7 +575,6 @@ protected async Task QueueManagerFailsOnMessageDeserializationError(Task<IFuncti
575575
SettingsWithDefaults.Default
576576
);
577577

578-
flowsManager.AddFlow(flowState);
579578
var queueClient = await queueManager.CreateQueueClient();
580579

581580
var message = await queueClient.Pull<GoodMessage>(
@@ -625,7 +624,7 @@ protected async Task RegisteredTimeoutIsRemovedWhenPullingMessage(Task<IFunction
625624
storedId = workflow.StoredId;
626625
var minimumTimeout = new FlowTimeouts();
627626
var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow);
628-
var flowState = flowsManager.CreateFlowState(workflow.StoredId, minimumTimeout);
627+
var flowState = flowsManager.CreateFlow(workflow.StoredId, minimumTimeout);
629628
var queueManager = new QueueManager(
630629
workflow.FlowId,
631630
workflow.StoredId,
@@ -639,7 +638,6 @@ protected async Task RegisteredTimeoutIsRemovedWhenPullingMessage(Task<IFunction
639638
SettingsWithDefaults.Default
640639
);
641640

642-
flowsManager.AddFlow(flowState);
643641

644642
var queueClient = await queueManager.CreateQueueClient();
645643

@@ -687,7 +685,7 @@ protected async Task PullEnvelopeReturnsEnvelopeWithReceiverAndSender(Task<IFunc
687685

688686
var flowTimeouts = new FlowTimeouts();
689687
var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow);
690-
var flowState = flowsManager.CreateFlowState(workflow.StoredId, flowTimeouts);
688+
var flowState = flowsManager.CreateFlow(workflow.StoredId, flowTimeouts);
691689
var queueManager = new QueueManager(
692690
workflow.FlowId,
693691
workflow.StoredId,
@@ -701,7 +699,6 @@ protected async Task PullEnvelopeReturnsEnvelopeWithReceiverAndSender(Task<IFunc
701699
SettingsWithDefaults.Default
702700
);
703701

704-
flowsManager.AddFlow(flowState);
705702
var queueClient = await queueManager.CreateQueueClient();
706703

707704
// Pull envelope for specific receiver

Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessagesTests.cs

Lines changed: 11 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ protected async Task MessagesSunshineScenario(Task<IFunctionStore> functionStore
3636

3737
var flowTimeouts = new FlowTimeouts();
3838
var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow);
39-
var flowState = flowsManager.CreateFlowState(workflow.StoredId, flowTimeouts);
39+
var flowState = flowsManager.CreateFlow(workflow.StoredId, flowTimeouts);
4040
var queueManager = new QueueManager(
4141
workflow.FlowId,
4242
workflow.StoredId,
@@ -50,7 +50,6 @@ protected async Task MessagesSunshineScenario(Task<IFunctionStore> functionStore
5050
SettingsWithDefaults.Default
5151
);
5252

53-
flowsManager.AddFlow(flowState);
5453
queueClient = await queueManager.CreateQueueClient();
5554
var message = await queueClient.Pull<string>(workflow, workflow.Effect.CreateNextImplicitId());
5655

@@ -89,7 +88,7 @@ protected async Task QueueClientReturnsNullAfterTimeout(Task<IFunctionStore> fun
8988

9089
var flowTimeouts = new FlowTimeouts();
9190
var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow);
92-
var flowState = flowsManager.CreateFlowState(workflow.StoredId, flowTimeouts);
91+
var flowState = flowsManager.CreateFlow(workflow.StoredId, flowTimeouts);
9392
var queueManager = new QueueManager(
9493
workflow.FlowId,
9594
workflow.StoredId,
@@ -103,7 +102,6 @@ protected async Task QueueClientReturnsNullAfterTimeout(Task<IFunctionStore> fun
103102
SettingsWithDefaults.Default
104103
);
105104

106-
flowsManager.AddFlow(flowState);
107105
var queueClient = await queueManager.CreateQueueClient();
108106
var message = await queueClient.Pull<string>(workflow, workflow.Effect.CreateNextImplicitId(), TimeSpan.FromMilliseconds(100));
109107

@@ -137,7 +135,7 @@ protected async Task MessagesFirstOfTypesReturnsNoneForFirstOfTypesOnTimeout(Tas
137135

138136
var flowTimeouts = new FlowTimeouts();
139137
var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow);
140-
var flowState = flowsManager.CreateFlowState(workflow.StoredId, flowTimeouts);
138+
var flowState = flowsManager.CreateFlow(workflow.StoredId, flowTimeouts);
141139
var queueManager = new QueueManager(
142140
workflow.FlowId,
143141
workflow.StoredId,
@@ -151,7 +149,6 @@ protected async Task MessagesFirstOfTypesReturnsNoneForFirstOfTypesOnTimeout(Tas
151149
SettingsWithDefaults.Default
152150
);
153151

154-
flowsManager.AddFlow(flowState);
155152
var queueClient = await queueManager.CreateQueueClient();
156153
var message = await queueClient.Pull<object>(
157154
workflow,
@@ -190,7 +187,7 @@ protected async Task MessagesFirstOfTypesReturnsFirstForFirstOfTypesOnFirst(Task
190187

191188
var flowTimeouts = new FlowTimeouts();
192189
var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow);
193-
var flowState = flowsManager.CreateFlowState(workflow.StoredId, flowTimeouts);
190+
var flowState = flowsManager.CreateFlow(workflow.StoredId, flowTimeouts);
194191
var queueManager = new QueueManager(
195192
workflow.FlowId,
196193
workflow.StoredId,
@@ -204,7 +201,6 @@ protected async Task MessagesFirstOfTypesReturnsFirstForFirstOfTypesOnFirst(Task
204201
SettingsWithDefaults.Default
205202
);
206203

207-
flowsManager.AddFlow(flowState);
208204
var queueClient = await queueManager.CreateQueueClient();
209205
var message = await queueClient.Pull<object>(
210206
workflow,
@@ -244,7 +240,7 @@ protected async Task MessagesFirstOfTypesReturnsSecondForFirstOfTypesOnSecond(Ta
244240

245241
var flowTimeouts = new FlowTimeouts();
246242
var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow);
247-
var flowState = flowsManager.CreateFlowState(workflow.StoredId, flowTimeouts);
243+
var flowState = flowsManager.CreateFlow(workflow.StoredId, flowTimeouts);
248244
var queueManager = new QueueManager(
249245
workflow.FlowId,
250246
workflow.StoredId,
@@ -258,7 +254,6 @@ protected async Task MessagesFirstOfTypesReturnsSecondForFirstOfTypesOnSecond(Ta
258254
SettingsWithDefaults.Default
259255
);
260256

261-
flowsManager.AddFlow(flowState);
262257
var queueClient = await queueManager.CreateQueueClient();
263258
var message = await queueClient.Pull<string>(
264259
workflow,
@@ -299,7 +294,7 @@ protected async Task SecondEventWithExistingIdempotencyKeyIsIgnored(Task<IFuncti
299294

300295
var flowTimeouts = new FlowTimeouts();
301296
var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow);
302-
var flowState = flowsManager.CreateFlowState(workflow.StoredId, flowTimeouts);
297+
var flowState = flowsManager.CreateFlow(workflow.StoredId, flowTimeouts);
303298
var queueManager = new QueueManager(
304299
workflow.FlowId,
305300
workflow.StoredId,
@@ -313,7 +308,6 @@ protected async Task SecondEventWithExistingIdempotencyKeyIsIgnored(Task<IFuncti
313308
SettingsWithDefaults.Default
314309
);
315310

316-
flowsManager.AddFlow(flowState);
317311
var queueClient = await queueManager.CreateQueueClient();
318312
var message1 = await queueClient.Pull<string>(workflow, workflow.Effect.CreateNextImplicitId());
319313
var message2 = await queueClient.Pull<string>(workflow, workflow.Effect.CreateNextImplicitId());
@@ -356,7 +350,7 @@ protected async Task QueueClientCanPullMultipleMessages(Task<IFunctionStore> fun
356350

357351
var flowTimeouts = new FlowTimeouts();
358352
var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow);
359-
var flowState = flowsManager.CreateFlowState(workflow.StoredId, flowTimeouts);
353+
var flowState = flowsManager.CreateFlow(workflow.StoredId, flowTimeouts);
360354
var queueManager = new QueueManager(
361355
workflow.FlowId,
362356
workflow.StoredId,
@@ -370,7 +364,6 @@ protected async Task QueueClientCanPullMultipleMessages(Task<IFunctionStore> fun
370364
SettingsWithDefaults.Default
371365
);
372366

373-
flowsManager.AddFlow(flowState);
374367
var queueClient = await queueManager.CreateQueueClient();
375368

376369
var message1 = await queueClient.Pull<string>(workflow, workflow.Effect.CreateNextImplicitId());
@@ -411,7 +404,7 @@ async Task (workflow) =>
411404

412405
var flowTimeouts = new FlowTimeouts();
413406
var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow);
414-
var flowState = flowsManager.CreateFlowState(workflow.StoredId, flowTimeouts);
407+
var flowState = flowsManager.CreateFlow(workflow.StoredId, flowTimeouts);
415408
var queueManager = new QueueManager(
416409
workflow.FlowId,
417410
workflow.StoredId,
@@ -425,7 +418,6 @@ async Task (workflow) =>
425418
SettingsWithDefaults.Default with { MessagesDefaultMaxWaitForCompletion = TimeSpan.FromMinutes(1) }
426419
);
427420

428-
flowsManager.AddFlow(flowState);
429421
var queueClient = await queueManager.CreateQueueClient();
430422
lock (queueClients)
431423
queueClients[workflow.FlowId.Instance.Value] = queueClient;
@@ -478,7 +470,7 @@ async Task (workflow) =>
478470

479471
var flowTimeouts = new FlowTimeouts();
480472
var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow);
481-
var flowState = flowsManager.CreateFlowState(workflow.StoredId, flowTimeouts);
473+
var flowState = flowsManager.CreateFlow(workflow.StoredId, flowTimeouts);
482474
var queueManager = new QueueManager(
483475
workflow.FlowId,
484476
workflow.StoredId,
@@ -492,7 +484,6 @@ async Task (workflow) =>
492484
SettingsWithDefaults.Default
493485
);
494486

495-
flowsManager.AddFlow(flowState);
496487
var queueClient = await queueManager.CreateQueueClient();
497488

498489
while (true)
@@ -547,7 +538,7 @@ async Task (workflow) =>
547538

548539
var flowTimeouts = new FlowTimeouts();
549540
var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow);
550-
var flowState = flowsManager.CreateFlowState(workflow.StoredId, flowTimeouts);
541+
var flowState = flowsManager.CreateFlow(workflow.StoredId, flowTimeouts);
551542
var queueManager = new QueueManager(
552543
workflow.FlowId,
553544
workflow.StoredId,
@@ -561,7 +552,6 @@ async Task (workflow) =>
561552
SettingsWithDefaults.Default
562553
);
563554

564-
flowsManager.AddFlow(flowState);
565555
var queueClient = await queueManager.CreateQueueClient();
566556

567557
for (var i = 0; i < 10; i++)
@@ -578,7 +568,7 @@ async Task (workflow) =>
578568

579569
var flowTimeouts = new FlowTimeouts();
580570
var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow);
581-
var flowState = flowsManager.CreateFlowState(workflow.StoredId, flowTimeouts);
571+
var flowState = flowsManager.CreateFlow(workflow.StoredId, flowTimeouts);
582572
var queueManager = new QueueManager(
583573
workflow.FlowId,
584574
workflow.StoredId,
@@ -592,7 +582,6 @@ async Task (workflow) =>
592582
SettingsWithDefaults.Default
593583
);
594584

595-
flowsManager.AddFlow(flowState);
596585
var queueClient = await queueManager.CreateQueueClient();
597586

598587
for (var i = 0; i < 10; i++)

Core/Cleipnir.ResilientFunctions/CoreRuntime/FlowsManager.cs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,13 +43,10 @@ private async Task TimeoutCheckLoop()
4343

4444
public void Dispose() => _disposed = true;
4545

46-
public FlowState CreateFlowState(StoredId id, FlowTimeouts timeouts)
47-
=> new(id, subflows: 1, waitingSubflows: 0, timeouts);
48-
49-
public void AddFlow(FlowState flowState)
46+
public FlowState CreateFlow(StoredId id, FlowTimeouts timeouts)
5047
{
5148
lock (_lock)
52-
_dict[flowState.Id] = flowState;
49+
return _dict[id] = new FlowState(id, subflows: 1, waitingSubflows: 0, timeouts);;
5350
}
5451

5552
public void RemoveFlow(StoredId id, FlowState flowState)

Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Invoker.cs

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -57,12 +57,12 @@ public async Task<InnerScheduled<TReturn>> ScheduleInvoke(FlowInstance flowInsta
5757
var (created, workflow, disposables, queueManager, flowState, timeouts, storageSession) = await PrepareForInvocation(flowId, storedId, param, parentWorkflow?.StoredId, initialState);
5858
await (parentWorkflow?.Effect.Upsert(scheduledAlreadyParentId!, true, alias: null, flush: false) ?? Task.CompletedTask);
5959

60-
CurrentFlow._workflow.Value = workflow;
6160
if (!created)
6261
return _invocationHelper.CreateInnerScheduled([flowId], parentWorkflow, detach);
6362

63+
CurrentFlow._workflow.Value = workflow;
64+
6465
var tcs = new TaskCompletionSource<TReturn>();
65-
_flowsManager.AddFlow(flowState);
6666
_ = flowState.SuspendedTask.ContinueWith(_ => tcs.TrySetException(new InvocationSuspendedException(flowId)));
6767
_ = Task.Run(async () =>
6868
{
@@ -158,7 +158,6 @@ public async Task<InnerScheduled<TReturn>> ScheduleRestart(StoredId storedId)
158158
var flowId = new FlowId(_flowType, humanInstanceId);
159159

160160
var tcs = new TaskCompletionSource<TReturn>();
161-
_flowsManager.AddFlow(flowState);
162161
_ = flowState.SuspendedTask.ContinueWith(_ => tcs.TrySetException(new InvocationSuspendedException(flowId)));
163162
_ = Task.Run(async () =>
164163
{
@@ -213,7 +212,6 @@ internal async Task ScheduleRestart(StoredId storedId, RestartedFunction rf, Act
213212
var flowId = new FlowId(_flowType, humanInstanceId);
214213

215214
var tcs = new TaskCompletionSource<TReturn>();
216-
_flowsManager.AddFlow(flowState);
217215
_ = flowState.SuspendedTask.ContinueWith(_ => tcs.TrySetException(new InvocationSuspendedException(flowId)));
218216
_ = Task.Run(async () =>
219217
{
@@ -264,6 +262,16 @@ await _invocationHelper.PersistFunctionInStore(
264262
disposables.Add(isWorkflowRunningDisposable);
265263
success = persisted;
266264

265+
if (!persisted)
266+
return new PreparedInvocation(
267+
Persisted: false,
268+
Workflow: null!,
269+
Disposable.Combine(disposables),
270+
QueueManager: null!,
271+
FlowState: null!,
272+
FlowTimeouts: null!
273+
);
274+
267275
var flowTimeouts = new FlowTimeouts();
268276

269277
var effect = _invocationHelper.CreateEffect(
@@ -278,7 +286,7 @@ await _invocationHelper.PersistFunctionInStore(
278286
var correlations = _invocationHelper.CreateCorrelations(flowId);
279287
var semaphores = _invocationHelper.CreateSemaphores(storedId, effect, _flowsManager);
280288

281-
var flowState = _flowsManager.CreateFlowState(storedId, flowTimeouts);
289+
var flowState = _flowsManager.CreateFlow(storedId, flowTimeouts);
282290
var queueManager = _invocationHelper.CreateQueueManager(flowId, storedId, effect, flowState, flowTimeouts, _unhandledExceptionHandler);
283291
disposables.Add(queueManager);
284292
var messageWriter = _invocationHelper.CreateMessageWriter(storedId);
@@ -331,7 +339,7 @@ private async Task<PreparedReInvocation> PrepareForReInvocation(StoredId storedI
331339

332340
var correlations = _invocationHelper.CreateCorrelations(flowId);
333341
var semaphores = _invocationHelper.CreateSemaphores(storedId, effect, _flowsManager);
334-
var flowState = _flowsManager.CreateFlowState(storedId, flowTimeouts);
342+
var flowState = _flowsManager.CreateFlow(storedId, flowTimeouts);
335343
var queueManager = _invocationHelper.CreateQueueManager(flowId, storedId, effect, flowState, flowTimeouts, _unhandledExceptionHandler);
336344
disposables.Add(queueManager);
337345
var messageWriter = _invocationHelper.CreateMessageWriter(storedId);

0 commit comments

Comments
 (0)