diff --git a/Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessagesSubscriptionTests.cs b/Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessagesSubscriptionTests.cs index a5eb43b1e..cee6f3163 100644 --- a/Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessagesSubscriptionTests.cs +++ b/Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessagesSubscriptionTests.cs @@ -196,7 +196,7 @@ protected async Task QueueClientPullsFiveMessagesAndTimesOutOnSixth(Task(TimeSpan.FromMilliseconds(250)); + var message = await workflow.Message(TimeSpan.FromMilliseconds(1_000)); messages.Add(message ?? "NULL"); } @@ -560,7 +560,7 @@ protected async Task QueueManagerFailsOnMessageDeserializationError(Task DateTime.UtcNow); + var flowsManager = new FlowsManager(functionStore); var flowState = flowsManager.CreateFlow(workflow.StoredId, flowTimeouts); var queueManager = new QueueManager( workflow.FlowId, @@ -623,7 +623,7 @@ protected async Task RegisteredTimeoutIsRemovedWhenPullingMessage(Task DateTime.UtcNow); + var flowsManager = new FlowsManager(functionStore); var flowState = flowsManager.CreateFlow(workflow.StoredId, minimumTimeout); var queueManager = new QueueManager( workflow.FlowId, @@ -684,7 +684,7 @@ protected async Task PullEnvelopeReturnsEnvelopeWithReceiverAndSender(Task DateTime.UtcNow); + var flowsManager = new FlowsManager(functionStore); var flowState = flowsManager.CreateFlow(workflow.StoredId, flowTimeouts); var queueManager = new QueueManager( workflow.FlowId, diff --git a/Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessagesTests.cs b/Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessagesTests.cs index 7ba8735ce..8896c754b 100644 --- a/Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessagesTests.cs +++ b/Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessagesTests.cs @@ -1,14 +1,11 @@ using System; using System.Collections.Generic; -using System.Linq; using System.Threading.Tasks; -using Cleipnir.ResilientFunctions.CoreRuntime; using Cleipnir.ResilientFunctions.CoreRuntime.Invocation; using Cleipnir.ResilientFunctions.CoreRuntime.Serialization; using Cleipnir.ResilientFunctions.Domain; -using Cleipnir.ResilientFunctions.Helpers; +using Cleipnir.ResilientFunctions.Domain.Exceptions; using Cleipnir.ResilientFunctions.Messaging; -using Cleipnir.ResilientFunctions.Queuing; using Cleipnir.ResilientFunctions.Storage; using Cleipnir.ResilientFunctions.Tests.Utils; using Shouldly; @@ -17,52 +14,35 @@ namespace Cleipnir.ResilientFunctions.Tests.Messaging.TestTemplates; public abstract class MessagesTests { + private static Settings CreateSettings( + Action unhandledExceptionHandler, + TimeSpan? messagesPullFrequency = null) => + new( + unhandledExceptionHandler, + watchdogCheckFrequency: TimeSpan.FromMilliseconds(100), + messagesPullFrequency: messagesPullFrequency ?? TimeSpan.FromMilliseconds(100) + ); + public abstract Task MessagesSunshineScenario(); protected async Task MessagesSunshineScenario(Task functionStoreTask) { var functionStore = await functionStoreTask; var unhandledExceptionCatcher = new UnhandledExceptionCatcher(); - var unhandledExceptionHandler = new UnhandledExceptionHandler(unhandledExceptionCatcher.Catch); using var functionsRegistry = new FunctionsRegistry( functionStore, - new Settings(unhandledExceptionCatcher.Catch) + CreateSettings(unhandledExceptionCatcher.Catch) ); - QueueClient? queueClient = null; var rFunc = functionsRegistry.RegisterFunc( nameof(MessagesSunshineScenario), - inner: async Task (string _, Workflow workflow) => - { - - var flowTimeouts = new FlowTimeouts(); - var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow); - var flowState = flowsManager.CreateFlow(workflow.StoredId, flowTimeouts); - var queueManager = new QueueManager( - workflow.FlowId, - workflow.StoredId, - functionStore.MessageStore, - DefaultSerializer.Instance, - workflow.Effect, - flowState, - unhandledExceptionHandler, - flowTimeouts, - () => DateTime.UtcNow, - SettingsWithDefaults.Default - ); - - queueClient = await queueManager.CreateQueueClient(); - var message = await queueClient.Pull(workflow, workflow.Effect.CreateNextImplicitId()); - - return message; - } + inner: async Task (string _, Workflow workflow) + => await workflow.Message() ); var scheduled = await rFunc.Schedule("instanceId", ""); var messageWriter = rFunc.MessageWriters.For("instanceId".ToFlowInstance()); await messageWriter.AppendMessage("hello world"); - await BusyWait.Until(() => queueClient is not null); - await queueClient!.FetchMessages(); // Immediately fetch the message var result = await scheduled.Completion(timeout: TimeSpan.FromSeconds(5)); result.ShouldBe("hello world"); @@ -75,38 +55,15 @@ protected async Task QueueClientReturnsNullAfterTimeout(Task fun { var functionStore = await functionStoreTask; var unhandledExceptionCatcher = new UnhandledExceptionCatcher(); - var unhandledExceptionHandler = new UnhandledExceptionHandler(unhandledExceptionCatcher.Catch); using var functionsRegistry = new FunctionsRegistry( functionStore, - new Settings(unhandledExceptionCatcher.Catch) + CreateSettings(unhandledExceptionCatcher.Catch) ); var rFunc = functionsRegistry.RegisterFunc( nameof(QueueClientReturnsNullAfterTimeout), - inner: async Task (string _, Workflow workflow) => - { - - var flowTimeouts = new FlowTimeouts(); - var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow); - var flowState = flowsManager.CreateFlow(workflow.StoredId, flowTimeouts); - var queueManager = new QueueManager( - workflow.FlowId, - workflow.StoredId, - functionStore.MessageStore, - DefaultSerializer.Instance, - workflow.Effect, - flowState, - unhandledExceptionHandler, - flowTimeouts, - () => DateTime.UtcNow, - SettingsWithDefaults.Default - ); - - var queueClient = await queueManager.CreateQueueClient(); - var message = await queueClient.Pull(workflow, workflow.Effect.CreateNextImplicitId(), TimeSpan.FromMilliseconds(100)); - - return message; - } + inner: async Task (string _, Workflow workflow) + => await workflow.Message(TimeSpan.FromMilliseconds(100)) ); var scheduled = await rFunc.Schedule("instanceId", ""); @@ -122,41 +79,19 @@ protected async Task MessagesFirstOfTypesReturnsNoneForFirstOfTypesOnTimeout(Tas { var functionStore = await functionStoreTask; var unhandledExceptionCatcher = new UnhandledExceptionCatcher(); - var unhandledExceptionHandler = new UnhandledExceptionHandler(unhandledExceptionCatcher.Catch); using var functionsRegistry = new FunctionsRegistry( functionStore, - new Settings(unhandledExceptionCatcher.Catch) + CreateSettings(unhandledExceptionCatcher.Catch) ); var rFunc = functionsRegistry.RegisterFunc( nameof(MessagesFirstOfTypesReturnsNoneForFirstOfTypesOnTimeout), inner: async Task (string _, Workflow workflow) => { - - var flowTimeouts = new FlowTimeouts(); - var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow); - var flowState = flowsManager.CreateFlow(workflow.StoredId, flowTimeouts); - var queueManager = new QueueManager( - workflow.FlowId, - workflow.StoredId, - functionStore.MessageStore, - DefaultSerializer.Instance, - workflow.Effect, - flowState, - unhandledExceptionHandler, - flowTimeouts, - () => DateTime.UtcNow, - SettingsWithDefaults.Default - ); - - var queueClient = await queueManager.CreateQueueClient(); - var message = await queueClient.Pull( - workflow, - workflow.Effect.CreateNextImplicitId(), - TimeSpan.Zero, - filter: m => m is string or int + var message = await workflow.Message( + filter: m => m is string or int, + waitFor: TimeSpan.Zero ); - return message == null ? "NONE" : message.ToString()!; } ); @@ -174,41 +109,17 @@ protected async Task MessagesFirstOfTypesReturnsFirstForFirstOfTypesOnFirst(Task { var functionStore = await functionStoreTask; var unhandledExceptionCatcher = new UnhandledExceptionCatcher(); - var unhandledExceptionHandler = new UnhandledExceptionHandler(unhandledExceptionCatcher.Catch); using var functionsRegistry = new FunctionsRegistry( functionStore, - new Settings(unhandledExceptionCatcher.Catch) + CreateSettings(unhandledExceptionCatcher.Catch) ); var rFunc = functionsRegistry.RegisterFunc( nameof(MessagesFirstOfTypesReturnsFirstForFirstOfTypesOnFirst), inner: async Task (string _, Workflow workflow) => { - - var flowTimeouts = new FlowTimeouts(); - var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow); - var flowState = flowsManager.CreateFlow(workflow.StoredId, flowTimeouts); - var queueManager = new QueueManager( - workflow.FlowId, - workflow.StoredId, - functionStore.MessageStore, - DefaultSerializer.Instance, - workflow.Effect, - flowState, - unhandledExceptionHandler, - flowTimeouts, - () => DateTime.UtcNow, - SettingsWithDefaults.Default - ); - - var queueClient = await queueManager.CreateQueueClient(); - var message = await queueClient.Pull( - workflow, - workflow.Effect.CreateNextImplicitId(), - filter: m => m is string or int - ); - - return message!.ToString()!; + var message = await workflow.Message(filter: m => m is string or int); + return message.ToString()!; } ); @@ -227,41 +138,15 @@ protected async Task MessagesFirstOfTypesReturnsSecondForFirstOfTypesOnSecond(Ta { var functionStore = await functionStoreTask; var unhandledExceptionCatcher = new UnhandledExceptionCatcher(); - var unhandledExceptionHandler = new UnhandledExceptionHandler(unhandledExceptionCatcher.Catch); using var functionsRegistry = new FunctionsRegistry( functionStore, - new Settings(unhandledExceptionCatcher.Catch) + CreateSettings(unhandledExceptionCatcher.Catch) ); var rFunc = functionsRegistry.RegisterFunc( nameof(MessagesFirstOfTypesReturnsSecondForFirstOfTypesOnSecond), - inner: async Task (string _, Workflow workflow) => - { - - var flowTimeouts = new FlowTimeouts(); - var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow); - var flowState = flowsManager.CreateFlow(workflow.StoredId, flowTimeouts); - var queueManager = new QueueManager( - workflow.FlowId, - workflow.StoredId, - functionStore.MessageStore, - DefaultSerializer.Instance, - workflow.Effect, - flowState, - unhandledExceptionHandler, - flowTimeouts, - () => DateTime.UtcNow, - SettingsWithDefaults.Default - ); - - var queueClient = await queueManager.CreateQueueClient(); - var message = await queueClient.Pull( - workflow, - workflow.Effect.CreateNextImplicitId() - ); - - return message; - } + inner: async Task (string _, Workflow workflow) + => await workflow.Message() ); var scheduled = await rFunc.Schedule("instanceId", ""); @@ -279,39 +164,17 @@ protected async Task SecondEventWithExistingIdempotencyKeyIsIgnored(Task> (string _, Workflow workflow) => { - storedId = workflow.StoredId; - - var flowTimeouts = new FlowTimeouts(); - var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow); - var flowState = flowsManager.CreateFlow(workflow.StoredId, flowTimeouts); - var queueManager = new QueueManager( - workflow.FlowId, - workflow.StoredId, - functionStore.MessageStore, - DefaultSerializer.Instance, - workflow.Effect, - flowState, - unhandledExceptionHandler, - flowTimeouts, - () => DateTime.UtcNow, - SettingsWithDefaults.Default - ); - - var queueClient = await queueManager.CreateQueueClient(); - var message1 = await queueClient.Pull(workflow, workflow.Effect.CreateNextImplicitId()); - var message2 = await queueClient.Pull(workflow, workflow.Effect.CreateNextImplicitId()); - + var message1 = await workflow.Message(); + var message2 = await workflow.Message(); return Tuple.Create(message1, message2); } ); @@ -335,40 +198,17 @@ protected async Task QueueClientCanPullMultipleMessages(Task fun { var functionStore = await functionStoreTask; var unhandledExceptionCatcher = new UnhandledExceptionCatcher(); - var unhandledExceptionHandler = new UnhandledExceptionHandler(unhandledExceptionCatcher.Catch); using var functionsRegistry = new FunctionsRegistry( functionStore, - new Settings(unhandledExceptionCatcher.Catch) + CreateSettings(unhandledExceptionCatcher.Catch) ); - StoredId? storedId = null; var rFunc = functionsRegistry.RegisterFunc( nameof(QueueClientCanPullMultipleMessages), inner: async Task (string _, Workflow workflow) => { - storedId = workflow.StoredId; - - var flowTimeouts = new FlowTimeouts(); - var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow); - var flowState = flowsManager.CreateFlow(workflow.StoredId, flowTimeouts); - var queueManager = new QueueManager( - workflow.FlowId, - workflow.StoredId, - functionStore.MessageStore, - DefaultSerializer.Instance, - workflow.Effect, - flowState, - unhandledExceptionHandler, - flowTimeouts, - () => DateTime.UtcNow, - SettingsWithDefaults.Default - ); - - var queueClient = await queueManager.CreateQueueClient(); - - var message1 = await queueClient.Pull(workflow, workflow.Effect.CreateNextImplicitId()); - var message2 = await queueClient.Pull(workflow, workflow.Effect.CreateNextImplicitId()); - + var message1 = await workflow.Message(); + var message2 = await workflow.Message(); return $"{message1},{message2}"; } ); @@ -393,48 +233,19 @@ protected async Task BatchedMessagesIsDeliveredToAwaitingFlows(Task(); var registration = registry.RegisterParamless( flowType, async Task (workflow) => { - - var flowTimeouts = new FlowTimeouts(); - var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow); - var flowState = flowsManager.CreateFlow(workflow.StoredId, flowTimeouts); - var queueManager = new QueueManager( - workflow.FlowId, - workflow.StoredId, - functionStore.MessageStore, - DefaultSerializer.Instance, - workflow.Effect, - flowState, - unhandledExceptionHandler, - flowTimeouts, - () => DateTime.UtcNow, - SettingsWithDefaults.Default with { MessagesDefaultMaxWaitForCompletion = TimeSpan.FromMinutes(1) } - ); - - var queueClient = await queueManager.CreateQueueClient(); - lock (queueClients) - queueClients[workflow.FlowId.Instance.Value] = queueClient; - - await queueClient.Pull(workflow, workflow.Effect.CreateNextImplicitId()); + await workflow.Message(); } ); await registration.Schedule("Instance#1"); await registration.Schedule("Instance#2"); - await BusyWait.Until(() => - { - lock (queueClients) - return queueClients.ContainsKey("Instance#1") && queueClients.ContainsKey("Instance#2"); - }); - await registration.SendMessages( [ new BatchedMessage("Instance#1", "hallo world", IdempotencyKey: "1"), @@ -442,15 +253,11 @@ await registration.SendMessages( ] ); - // Immediately fetch messages for both workflows - await queueClients["Instance#1"].FetchMessages(); - await queueClients["Instance#2"].FetchMessages(); - var controlPanel1 = await registration.ControlPanel("Instance#1").ShouldNotBeNullAsync(); var controlPanel2 = await registration.ControlPanel("Instance#2").ShouldNotBeNullAsync(); - await controlPanel1.WaitForCompletion(maxWait: TimeSpan.FromSeconds(2)); - await controlPanel2.WaitForCompletion(maxWait: TimeSpan.FromSeconds(2)); + await controlPanel1.WaitForCompletion(allowPostponeAndSuspended: true, maxWait: TimeSpan.FromSeconds(2)); + await controlPanel2.WaitForCompletion(allowPostponeAndSuspended: true, maxWait: TimeSpan.FromSeconds(2)); unhandledExceptionCatcher.ShouldNotHaveExceptions(); } @@ -460,35 +267,15 @@ protected async Task MultipleMessagesCanBeAppendedOneAfterTheOther(Task(); var registration = registry.RegisterParamless( flowType, async Task (workflow) => { - - var flowTimeouts = new FlowTimeouts(); - var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow); - var flowState = flowsManager.CreateFlow(workflow.StoredId, flowTimeouts); - var queueManager = new QueueManager( - workflow.FlowId, - workflow.StoredId, - functionStore.MessageStore, - DefaultSerializer.Instance, - workflow.Effect, - flowState, - unhandledExceptionHandler, - flowTimeouts, - () => DateTime.UtcNow, - SettingsWithDefaults.Default - ); - - var queueClient = await queueManager.CreateQueueClient(); - while (true) { - var message = await queueClient.Pull(workflow, workflow.Effect.CreateNextImplicitId()); + var message = await workflow.Message(); if (message is string s) await workflow.Effect.Capture(() => messages.Add(s)); else @@ -526,8 +313,7 @@ protected async Task PingPongMessagesCanBeExchangedMultipleTimes(Task { - - var flowTimeouts = new FlowTimeouts(); - var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow); - var flowState = flowsManager.CreateFlow(workflow.StoredId, flowTimeouts); - var queueManager = new QueueManager( - workflow.FlowId, - workflow.StoredId, - functionStore.MessageStore, - DefaultSerializer.Instance, - workflow.Effect, - flowState, - unhandledExceptionHandler, - flowTimeouts, - () => DateTime.UtcNow, - SettingsWithDefaults.Default - ); - - var queueClient = await queueManager.CreateQueueClient(); - for (var i = 0; i < 10; i++) { await pongRegistration.SendMessage("Pong", new Ping(i), idempotencyKey: $"Pong{i}"); - await queueClient.Pull(workflow, workflow.Effect.CreateNextImplicitId(), filter: pong => pong.Number == i); + await workflow.Message(filter: pong => pong.Number == i); } }); @@ -565,28 +332,9 @@ async Task (workflow) => "PongFlow", async Task (workflow) => { - - var flowTimeouts = new FlowTimeouts(); - var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow); - var flowState = flowsManager.CreateFlow(workflow.StoredId, flowTimeouts); - var queueManager = new QueueManager( - workflow.FlowId, - workflow.StoredId, - functionStore.MessageStore, - DefaultSerializer.Instance, - workflow.Effect, - flowState, - unhandledExceptionHandler, - flowTimeouts, - () => DateTime.UtcNow, - SettingsWithDefaults.Default - ); - - var queueClient = await queueManager.CreateQueueClient(); - for (var i = 0; i < 10; i++) { - await queueClient.Pull(workflow, workflow.Effect.CreateNextImplicitId(), filter: ping => ping.Number == i); + await workflow.Message(filter: ping => ping.Number == i); await pingRegistration.SendMessage("Ping", new Pong(i), idempotencyKey: $"Ping{i}"); } }); diff --git a/Core/Cleipnir.ResilientFunctions.Tests/TestSetup.cs b/Core/Cleipnir.ResilientFunctions.Tests/TestSetup.cs index 530b03abe..38707aef8 100644 --- a/Core/Cleipnir.ResilientFunctions.Tests/TestSetup.cs +++ b/Core/Cleipnir.ResilientFunctions.Tests/TestSetup.cs @@ -1,3 +1,3 @@ -using Microsoft.VisualStudio.TestTools.UnitTesting; +using Microsoft.VisualStudio.TestTools.UnitTesting; -[assembly: Parallelize(Workers = 0, Scope = ExecutionScope.MethodLevel)] \ No newline at end of file +[assembly: Parallelize(Workers = 0, Scope = ExecutionScope.MethodLevel)] diff --git a/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/FunctionTests/EffectTests.cs b/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/FunctionTests/EffectTests.cs index 61e3a7a22..6e526b4d0 100644 --- a/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/FunctionTests/EffectTests.cs +++ b/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/FunctionTests/EffectTests.cs @@ -332,7 +332,7 @@ await store.EffectsStore.GetEffectResults(storedId), session, clearChildren: true ); - var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(store, () => DateTime.UtcNow), storedId); + var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(store), storedId); effect.TryGet("alias", out _).ShouldBeFalse(); @@ -380,7 +380,7 @@ public async Task ExistingEffectsFuncIsOnlyInvokedAfterGettingValue(Task DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(store, () => DateTime.UtcNow), storedId); + var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(store), storedId); // Verify the effect is immediately available (eager loading) effect.TryGet("test_alias", out var result).ShouldBeTrue(); @@ -714,7 +714,7 @@ public async Task CaptureUsingAtLeastOnceWithoutFlushResiliencyDelaysFlush(Task< session, clearChildren: true ); - var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(store, () => DateTime.UtcNow), storedId); + var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(store), storedId); var result = await effect.Capture(() => "hello world", ResiliencyLevel.AtLeastOnceDelayFlush); result.ShouldBe("hello world"); @@ -1027,7 +1027,7 @@ await elms.CaptureEach( storedEffects.Any(e => e.Alias == "Before").ShouldBeTrue(); storedEffects.Any(e => e.Alias == "Loop").ShouldBeTrue(); storedEffects.Single(e => e.Alias == i.ToString()).EffectId.ShouldBe(new EffectId([1,i,0])); - storedEffects.Count.ShouldBe(4); + storedEffects.Count(e => e.Alias != null).ShouldBe(3); await messageWriter.AppendMessage(i.ToString()); } diff --git a/Core/Cleipnir.ResilientFunctions.Tests/UtilsTests/PrintEffectsTests.cs b/Core/Cleipnir.ResilientFunctions.Tests/UtilsTests/PrintEffectsTests.cs index 6107ec780..bb1ef9d23 100644 --- a/Core/Cleipnir.ResilientFunctions.Tests/UtilsTests/PrintEffectsTests.cs +++ b/Core/Cleipnir.ResilientFunctions.Tests/UtilsTests/PrintEffectsTests.cs @@ -43,7 +43,7 @@ public void PrintSingleCompletedEffect() clearChildren: true ); - var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(new InMemoryFunctionStore(), () => DateTime.UtcNow), storedId); + var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(new InMemoryFunctionStore()), storedId); var output = effect.ExecutionTree(); var expected = "└─ ✓ [1]\n"; @@ -75,7 +75,7 @@ public void PrintEffectWithAlias() clearChildren: true ); - var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(new InMemoryFunctionStore(), () => DateTime.UtcNow), storedId); + var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(new InMemoryFunctionStore()), storedId); var output = effect.ExecutionTree(); var expected = "└─ ✓ [1] my-effect\n"; @@ -111,7 +111,7 @@ public void PrintFailedEffect() clearChildren: true ); - var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(new InMemoryFunctionStore(), () => DateTime.UtcNow), storedId); + var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(new InMemoryFunctionStore()), storedId); var output = effect.ExecutionTree(); var expected = "└─ ✗ [1] failed-operation (System.InvalidOperationException)\n"; @@ -143,7 +143,7 @@ public void PrintStartedEffect() clearChildren: true ); - var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(new InMemoryFunctionStore(), () => DateTime.UtcNow), storedId); + var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(new InMemoryFunctionStore()), storedId); var output = effect.ExecutionTree(); var expected = "└─ ⋯ [1] in-progress\n"; @@ -189,7 +189,7 @@ public void PrintEffectHierarchy() clearChildren: true ); - var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(new InMemoryFunctionStore(), () => DateTime.UtcNow), storedId); + var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(new InMemoryFunctionStore()), storedId); var output = effect.ExecutionTree(); var expected = @@ -245,7 +245,7 @@ public void PrintDeepEffectHierarchy() clearChildren: true ); - var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(new InMemoryFunctionStore(), () => DateTime.UtcNow), storedId); + var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(new InMemoryFunctionStore()), storedId); var output = effect.ExecutionTree(); var expected = @@ -295,7 +295,7 @@ public void PrintMultipleRootEffects() clearChildren: true ); - var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(new InMemoryFunctionStore(), () => DateTime.UtcNow), storedId); + var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(new InMemoryFunctionStore()), storedId); var output = effect.ExecutionTree(); var expected = @@ -330,7 +330,7 @@ public void PrintComplexEffectTree() clearChildren: true ); - var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(new InMemoryFunctionStore(), () => DateTime.UtcNow), storedId); + var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(new InMemoryFunctionStore()), storedId); var output = effect.ExecutionTree(); var expected = @@ -365,7 +365,7 @@ public void PrintEffectTreeWithMissingIntermediateEffect() clearChildren: false ); - var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(new InMemoryFunctionStore(), () => DateTime.UtcNow), storedId); + var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(new InMemoryFunctionStore()), storedId); var output = effect.ExecutionTree(); var expected = @@ -398,7 +398,7 @@ public void PrintEffectTreeWithMultipleMissingAncestors() clearChildren: false ); - var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(new InMemoryFunctionStore(), () => DateTime.UtcNow), storedId); + var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(new InMemoryFunctionStore()), storedId); var output = effect.ExecutionTree(); var expected = diff --git a/Core/Cleipnir.ResilientFunctions/CoreRuntime/FlowState.cs b/Core/Cleipnir.ResilientFunctions/CoreRuntime/FlowState.cs index 51483e830..251b14a4e 100644 --- a/Core/Cleipnir.ResilientFunctions/CoreRuntime/FlowState.cs +++ b/Core/Cleipnir.ResilientFunctions/CoreRuntime/FlowState.cs @@ -49,7 +49,7 @@ public void SubflowWaiting() WaitingSubflows++; } - public bool ResumeSubflow() + public bool TryResumeSubflow() { lock (_lock) if (Suspended) diff --git a/Core/Cleipnir.ResilientFunctions/CoreRuntime/FlowTimeouts.cs b/Core/Cleipnir.ResilientFunctions/CoreRuntime/FlowTimeouts.cs index 4b3d960de..dc1f8e30b 100644 --- a/Core/Cleipnir.ResilientFunctions/CoreRuntime/FlowTimeouts.cs +++ b/Core/Cleipnir.ResilientFunctions/CoreRuntime/FlowTimeouts.cs @@ -2,47 +2,28 @@ using System.Collections.Generic; using System.Linq; using System.Threading; -using System.Threading.Tasks; using Cleipnir.ResilientFunctions.Domain; -using Cleipnir.ResilientFunctions.Domain.Exceptions.Commands; namespace Cleipnir.ResilientFunctions.CoreRuntime; public class FlowTimeouts { private readonly Lock _lock = new(); - private Dictionary> Timeouts { get; } = new(); + private Dictionary Timeouts { get; } = new(); public DateTime? MinimumTimeout { get { lock (_lock) - return GetMinimumTimeout(); + return Timeouts.Values.Count != 0 ? Timeouts.Values.Min() : (DateTime?)null; } } - private DateTime? GetMinimumTimeout() - => Timeouts.Values.Count != 0 ? Timeouts.Values.Min(t => t.Item1) : (DateTime?)null; - - public async Task AddTimeout(EffectId effectId, DateTime timeout, TimeSpan? maxWait = null) + public void AddTimeout(EffectId effectId, DateTime timeout) { - TaskCompletionSource tcs; lock (_lock) - { - tcs = new TaskCompletionSource(); - Timeouts[effectId] = Tuple.Create(timeout, tcs); - } - - if (maxWait == null || timeout <= DateTime.UtcNow) - { - await tcs.Task; - return; - } - - var completed = await Task.WhenAny(tcs.Task, Task.Delay(maxWait.Value)); - if (completed != tcs.Task) - throw new SuspendInvocationException(); + Timeouts[effectId] = timeout; } public void RemoveTimeout(EffectId effectId) @@ -50,21 +31,4 @@ public void RemoveTimeout(EffectId effectId) lock (_lock) Timeouts.Remove(effectId); } - - public bool HasExpiredTimeouts(DateTime now) - { - lock (_lock) - return GetMinimumTimeout() <= now; - } - - public void SignalExpiredTimeouts(DateTime now) - { - lock (_lock) - foreach (var (effectId, (timeout, tcs)) in Timeouts.ToList()) - if (timeout <= now) - { - Timeouts.Remove(effectId); - Task.Run(() => tcs.TrySetResult()); - } - } -} \ No newline at end of file +} diff --git a/Core/Cleipnir.ResilientFunctions/CoreRuntime/FlowsManager.cs b/Core/Cleipnir.ResilientFunctions/CoreRuntime/FlowsManager.cs index 500f95821..7fbd6ac48 100644 --- a/Core/Cleipnir.ResilientFunctions/CoreRuntime/FlowsManager.cs +++ b/Core/Cleipnir.ResilientFunctions/CoreRuntime/FlowsManager.cs @@ -1,4 +1,3 @@ -using System; using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; @@ -6,41 +5,17 @@ namespace Cleipnir.ResilientFunctions.CoreRuntime; -public class FlowsManager : IDisposable +public class FlowsManager { private readonly Dictionary _dict = new(); private readonly Lock _lock = new(); private readonly IFunctionStore _functionStore; - private readonly UtcNow _utcNow; - private volatile bool _disposed; - public FlowsManager(IFunctionStore functionStore, UtcNow utcNow) + public FlowsManager(IFunctionStore functionStore) { _functionStore = functionStore; - _utcNow = utcNow; - _ = Task.Run(TimeoutCheckLoop); } - private async Task TimeoutCheckLoop() - { - while (!_disposed) - { - var expiredStates = new List(); - var now = _utcNow(); - lock (_lock) - foreach (var (_, status) in _dict) - if (status.Timeouts.HasExpiredTimeouts(now)) - expiredStates.Add(status); - - foreach (var status in expiredStates) - status.Timeouts.SignalExpiredTimeouts(now); - - await Task.Delay(10); - } - } - - public void Dispose() => _disposed = true; - public FlowState CreateFlow(StoredId id, FlowTimeouts timeouts) { lock (_lock) @@ -77,4 +52,4 @@ public void CompleteThread(StoredId id) if (_dict.TryGetValue(id, out var flowState)) flowState.SubflowCompleted(); } -} \ No newline at end of file +} diff --git a/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Workflow.cs b/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Workflow.cs index aae91c5d7..05865568e 100644 --- a/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Workflow.cs +++ b/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Workflow.cs @@ -1,6 +1,7 @@ using System; using System.Threading.Tasks; using Cleipnir.ResilientFunctions.Domain; +using Cleipnir.ResilientFunctions.Domain.Exceptions.Commands; using Cleipnir.ResilientFunctions.Messaging; using Cleipnir.ResilientFunctions.Helpers; using Cleipnir.ResilientFunctions.Queuing; @@ -14,12 +15,12 @@ public class Workflow internal StoredId StoredId { get; } public Effect Effect { get; } - private QueueManager _queueManager; + private readonly QueueManager _queueManager; private readonly UtcNow _utcNow; private MessageWriter MessageWriter { get; } - public Workflow(FlowId flowId, StoredId storedId, Effect effect, QueueManager queueManager, UtcNow utcNow, MessageWriter messageWriter) + internal Workflow(FlowId flowId, StoredId storedId, Effect effect, QueueManager queueManager, UtcNow utcNow, MessageWriter messageWriter) { FlowId = flowId; StoredId = storedId; @@ -32,23 +33,25 @@ public Workflow(FlowId flowId, StoredId storedId, Effect effect, QueueManager qu public Task Delay(TimeSpan @for, bool suspend = true, string? alias = null) => Delay(until: _utcNow() + @for, suspend, alias); public Task Delay(DateTime until, bool suspend = true, string? alias = null) { - var effectId = Effect.TakeNextImplicitId(); - var timeoutId = EffectId.CreateWithCurrentContext(effectId); + var timeoutId = Effect.CreateNextImplicitId(); async Task Inner() { - var expiry = await Effect.CreateOrGet(timeoutId, until.ToUniversalTime().Ticks, alias, flush: false); - - if (expiry == -1) - { + var expiry = (await Effect.CreateOrGet(timeoutId, until.ToUniversalTime().Ticks, alias, flush: false)) + .ToDateTime(); + + var now = _utcNow(); + if (now > expiry) return; - } - - var maxWait = suspend ? TimeSpan.Zero : (TimeSpan?)null; - await Effect.FlowTimeouts.AddTimeout(timeoutId, expiry.ToDateTime(), maxWait); - await Effect.Upsert(timeoutId, -1L, alias, flush: false); - Effect.FlowTimeouts.RemoveTimeout(timeoutId); + Effect.FlowTimeouts.AddTimeout(timeoutId, expiry); + if (suspend) + throw new SuspendInvocationException(); + + //do in-memory wait + var delay = expiry - now; + await Task.Delay(delay); + Effect.FlowTimeouts.RemoveTimeout(timeoutId); } return Inner(); diff --git a/Core/Cleipnir.ResilientFunctions/Domain/Effect.cs b/Core/Cleipnir.ResilientFunctions/Domain/Effect.cs index 0161e75b4..8e466102d 100644 --- a/Core/Cleipnir.ResilientFunctions/Domain/Effect.cs +++ b/Core/Cleipnir.ResilientFunctions/Domain/Effect.cs @@ -16,8 +16,24 @@ public enum ResiliencyLevel AtLeastOnceDelayFlush } -public class Effect(EffectResults effectResults, UtcNow utcNow, FlowTimeouts flowTimeouts, FlowsManager flowsManager, StoredId storedId) +public class Effect { + private readonly EffectResults effectResults; + private readonly UtcNow utcNow; + private readonly FlowTimeouts flowTimeouts; + private readonly FlowsManager flowsManager; + private readonly StoredId storedId; + + internal Effect(EffectResults effectResults, UtcNow utcNow, FlowTimeouts flowTimeouts, FlowsManager flowsManager, StoredId storedId) + { + this.effectResults = effectResults; + this.utcNow = utcNow; + this.flowTimeouts = flowTimeouts; + this.flowsManager = flowsManager; + this.storedId = storedId; + } + + internal bool Contains(int id) => Contains(CreateEffectId(id)); internal bool Contains(EffectId effectId) => effectResults.Contains(effectId); diff --git a/Core/Cleipnir.ResilientFunctions/Domain/EffectPrinter.cs b/Core/Cleipnir.ResilientFunctions/Domain/EffectPrinter.cs index 97dba9fda..102f19e1b 100644 --- a/Core/Cleipnir.ResilientFunctions/Domain/EffectPrinter.cs +++ b/Core/Cleipnir.ResilientFunctions/Domain/EffectPrinter.cs @@ -5,7 +5,7 @@ namespace Cleipnir.ResilientFunctions.Domain; -public static class EffectPrinter +internal static class EffectPrinter { public static string Print(EffectResults effectResults) { diff --git a/Core/Cleipnir.ResilientFunctions/Domain/EffectResult.cs b/Core/Cleipnir.ResilientFunctions/Domain/EffectResult.cs index 65e2464db..5b5d352f1 100644 --- a/Core/Cleipnir.ResilientFunctions/Domain/EffectResult.cs +++ b/Core/Cleipnir.ResilientFunctions/Domain/EffectResult.cs @@ -1,3 +1,6 @@ namespace Cleipnir.ResilientFunctions.Domain; -public record EffectResult(EffectId Id, object? Value, string? Alias); \ No newline at end of file +public record EffectResult(EffectId Id, object? Value, string? Alias) +{ + public static EffectResult Create(EffectId id, object? value) => new(id, value, Alias: null); +}; \ No newline at end of file diff --git a/Core/Cleipnir.ResilientFunctions/Domain/EffectResults.cs b/Core/Cleipnir.ResilientFunctions/Domain/EffectResults.cs index 7fcae14ae..059ee1302 100644 --- a/Core/Cleipnir.ResilientFunctions/Domain/EffectResults.cs +++ b/Core/Cleipnir.ResilientFunctions/Domain/EffectResults.cs @@ -11,7 +11,7 @@ namespace Cleipnir.ResilientFunctions.Domain; -public class EffectResults +internal class EffectResults { private readonly FlowId _flowId; private readonly StoredId _storedId; diff --git a/Core/Cleipnir.ResilientFunctions/Domain/RetryPolicy.cs b/Core/Cleipnir.ResilientFunctions/Domain/RetryPolicy.cs index e3d9138c1..2ac3c2b9d 100644 --- a/Core/Cleipnir.ResilientFunctions/Domain/RetryPolicy.cs +++ b/Core/Cleipnir.ResilientFunctions/Domain/RetryPolicy.cs @@ -2,6 +2,7 @@ using System.IO.Pipelines; using System.Threading.Tasks; using Cleipnir.ResilientFunctions.CoreRuntime; +using Cleipnir.ResilientFunctions.Domain.Exceptions.Commands; using Cleipnir.ResilientFunctions.Helpers; namespace Cleipnir.ResilientFunctions.Domain; @@ -108,7 +109,10 @@ public async Task Invoke(Func> work, Effect effect, UtcNow utcNow, var hasDelayUntil = effect.TryGet(delayUntilId, out var delayUntilValue); var delayUntil = hasDelayUntil ? delayUntilValue.ToDateTime() : DateTime.MinValue; if (hasDelayUntil && delayUntil > utcNow()) - await flowTimeouts.AddTimeout(delayUntilId, delayUntil, maxWait: TimeSpan.Zero); + { + flowTimeouts.AddTimeout(delayUntilId, delayUntil); + throw new SuspendInvocationException(); + } var iterationId = effect.CreateEffectId(1); var iteration = await effect.CreateOrGet(iterationId, 0, alias: null, flush: false); @@ -146,9 +150,12 @@ await effect.Upserts( throw; if (delay >= suspendThreshold) - await flowTimeouts.AddTimeout(delayUntilId, delayUntil, maxWait: TimeSpan.Zero); - else - await Task.Delay(delay); + { + flowTimeouts.AddTimeout(delayUntilId, delayUntil); + throw new SuspendInvocationException(); + } + + await Task.Delay(delay); } } } diff --git a/Core/Cleipnir.ResilientFunctions/FunctionsRegistry.cs b/Core/Cleipnir.ResilientFunctions/FunctionsRegistry.cs index 3eff1acc3..1a6aaebcc 100644 --- a/Core/Cleipnir.ResilientFunctions/FunctionsRegistry.cs +++ b/Core/Cleipnir.ResilientFunctions/FunctionsRegistry.cs @@ -40,7 +40,7 @@ public FunctionsRegistry(IFunctionStore functionStore, Settings? settings = null _shutdownCoordinator = new ShutdownCoordinator(); _settings = SettingsWithDefaults.Default.Merge(settings); var utcNow = _settings.UtcNow; - _flowsManager = new FlowsManager(_functionStore, utcNow); + _flowsManager = new FlowsManager(_functionStore); ClusterInfo = new ClusterInfo(ReplicaId.NewId()); @@ -439,7 +439,6 @@ public ActionRegistration RegisterAction( public void Dispose() { _disposed = true; - _flowsManager.Dispose(); _shutdownCoordinator.SignalShutdown(); _replicaWatchdog.Dispose(); } diff --git a/Core/Cleipnir.ResilientFunctions/Queuing/FetchedMessages.cs b/Core/Cleipnir.ResilientFunctions/Queuing/FetchedMessages.cs index 4541b3577..098d5128a 100644 --- a/Core/Cleipnir.ResilientFunctions/Queuing/FetchedMessages.cs +++ b/Core/Cleipnir.ResilientFunctions/Queuing/FetchedMessages.cs @@ -6,6 +6,7 @@ using Cleipnir.ResilientFunctions.CoreRuntime; using Cleipnir.ResilientFunctions.CoreRuntime.Serialization; using Cleipnir.ResilientFunctions.Domain; +using Cleipnir.ResilientFunctions.Domain.Exceptions.Commands; using Cleipnir.ResilientFunctions.Messaging; using Cleipnir.ResilientFunctions.Storage; @@ -14,8 +15,7 @@ namespace Cleipnir.ResilientFunctions.Queuing; internal class FetchedMessages { private const int ReservedIdPrefix = -1; - private static readonly EffectId PendingDeletionsRoot = new([ReservedIdPrefix, 0]); - private static EffectId PendingDeletion(int index) => new([ReservedIdPrefix, 0, index]); + private static readonly EffectId DeliveredPositionsId = new([ReservedIdPrefix, 0]); private static readonly EffectId IdempotencyKeysRoot = new([ReservedIdPrefix, -1]); private readonly FlowId _flowId; @@ -26,16 +26,18 @@ internal class FetchedMessages private readonly FlowState _flowState; private readonly UnhandledExceptionHandler _unhandledExceptionHandler; private readonly FlowTimeouts _timeouts; + private readonly UtcNow _utcNow; private readonly SettingsWithDefaults _settings; private readonly IdempotencyKeys _idempotencyKeys; private readonly SemaphoreSlim _semaphore = new(1); private readonly Lock _lock = new(); private readonly List _toDeliver = new(); - private readonly HashSet _fetchedPositions = new(); private readonly List _subscriptions = new(); - private int _nextToRemoveIndex; private volatile Exception? _thrownException; + + private readonly HashSet _fetchedPositions = new(); + private readonly HashSet _deliveredPositions = new(); public Exception? ThrownException => _thrownException; @@ -61,6 +63,7 @@ public FetchedMessages( _flowState = flowState; _unhandledExceptionHandler = unhandledExceptionHandler; _timeouts = timeouts; + _utcNow = utcNow; _settings = settings; _idempotencyKeys = new IdempotencyKeys(IdempotencyKeysRoot, _effect, maxIdempotencyKeyCount, maxIdempotencyKeyTtl, utcNow); } @@ -69,8 +72,7 @@ public async Task Initialize() { _idempotencyKeys.Initialize(); - _nextToRemoveIndex = await _effect.CreateOrGet(PendingDeletionsRoot, 0, alias: null, flush: false); - var children = _effect.GetChildren(PendingDeletionsRoot); + var children = _effect.GetChildren(DeliveredPositionsId); var positions = new List(); foreach (var childId in children) { @@ -137,72 +139,57 @@ public async Task FetchOnce() } finally { - TryDispatch(); + DeliverMessages(); _semaphore.Release(); } } - public async Task WaitForMessageOrTimeout(EffectId timeoutId, MessagePredicate predicate, DateTime? timeout) + public async Task AddSubscription(EffectId id, MessagePredicate predicate, DateTime? timeout, Func> captureMessage) { - var subscription = new Subscription(predicate, timeout); - + if (timeout != null) + _timeouts.AddTimeout(id, timeout.Value); + + var utcNow = _utcNow(); + var waitBeforeNull = (timeout, _settings.MessagesDefaultMaxWaitForCompletion) switch + { + (null, { Ticks: 0 }) => utcNow, + (null, var w) => utcNow + w, + ({ } t, { Ticks: 0 }) => utcNow, + ({ } t, var w) => t < utcNow + w ? t : utcNow + w + }; + + var subscription = new Subscription(id, predicate, Timeout: waitBeforeNull, UserTimeout: timeout, captureMessage); lock (_lock) _subscriptions.Add(subscription); - TryDispatch(); - - var waitTask = timeout != null - ? _timeouts.AddTimeout(timeoutId, timeout.Value) - : Task.Delay(_settings.MessagesDefaultMaxWaitForCompletion); - _flowState.SubflowWaiting(); - await Task.WhenAny(subscription.Tcs.Task, waitTask); - var success = _flowState.ResumeSubflow(); + var result = await subscription.Tcs.Task; + var success = _flowState.TryResumeSubflow(); if (!success) await new TaskCompletionSource().Task; - lock (_lock) - { - var stillRegistered = _subscriptions.Remove(subscription); - if (stillRegistered) - subscription.Tcs.TrySetResult(null); - } - - var result = await subscription.Tcs.Task; if (result != null) - _timeouts.RemoveTimeout(timeoutId); + return result; + if (timeout != null && _utcNow() > timeout.Value) + return null; - return result; + throw new SuspendInvocationException(); } - + public async Task AfterFlush() { await _semaphore.WaitAsync(); try { - var children = _effect.GetChildren(PendingDeletionsRoot); - var nonDirtyChildren = new List(); - foreach (var childId in children) - if (!_effect.IsDirty(childId)) - nonDirtyChildren.Add(childId); - - if (nonDirtyChildren.Any()) - { - var positions = new List(); - foreach (var nonDirtyChild in nonDirtyChildren) - { - var position = _effect.Get(nonDirtyChild); - positions.Add(position); - } + if (!_effect.TryGet>(DeliveredPositionsId, out var deliveredPositions) || deliveredPositions is null) + return; - await _messageStore.DeleteMessages(_storedId, positions); - foreach (var nonDirtyChild in nonDirtyChildren) - await _effect.Clear(nonDirtyChild, flush: false); + if (deliveredPositions.Count == 0 || _effect.IsDirty(DeliveredPositionsId)) + return; - lock (_lock) - foreach (var position in positions) - _fetchedPositions.Remove(position); - } + await _messageStore.DeleteMessages(_storedId, deliveredPositions); + deliveredPositions.Clear(); + _effect.FlushlessUpsert(DeliveredPositionsId, deliveredPositions, alias: null); } catch (Exception exception) { @@ -214,7 +201,7 @@ public async Task AfterFlush() } } - private void TryDispatch() + private void DeliverMessages() { lock (_lock) for (var subscriptionIndex = 0; subscriptionIndex < _subscriptions.Count; subscriptionIndex++) @@ -225,12 +212,17 @@ private void TryDispatch() { var msg = _toDeliver[matchIndex]; _toDeliver.RemoveAt(matchIndex); - var positionToRemoveIndex = _nextToRemoveIndex++; - var toRemoveId = PendingDeletion(positionToRemoveIndex); - _effect.FlushlessUpsert(PendingDeletionsRoot, _nextToRemoveIndex, alias: null); + _deliveredPositions.Add(msg.Position); _subscriptions.RemoveAt(subscriptionIndex); - subscription.Tcs.TrySetResult(new MatchResult(msg, toRemoveId)); - TryDispatch(); + + _effect.FlushlessUpserts( + subscription.CaptureMessage(msg) + .Append(EffectResult.Create(DeliveredPositionsId, _deliveredPositions.ToList())) + ); + + _timeouts.RemoveTimeout(subscription.EffectId); + subscription.Tcs.TrySetResult(msg); + DeliverMessages(); return; } } @@ -246,6 +238,28 @@ private void FailAllSubscriptions(Exception exception) } } + public void FireTimeouts() + { + var now = _utcNow(); + lock (_lock) + for (var i = _subscriptions.Count - 1; i >= 0; i--) + { + var subscription = _subscriptions[i]; + if (subscription.Timeout is { } timeout && timeout <= now) + { + _subscriptions.RemoveAt(i); + + if (subscription.UserTimeout is { } userTimeout && userTimeout <= now) + { + _effect.FlushlessUpserts(subscription.CaptureMessage(null)); + _timeouts.RemoveTimeout(subscription.EffectId); + } + + subscription.Tcs.TrySetResult(null); + } + } + } + public record MessageData( Envelope Envelope, long Position, @@ -254,11 +268,10 @@ public record MessageData( string? Receiver, string? Sender ); + - public record MatchResult(MessageData Message, EffectId ToRemoveId); - - private record Subscription(MessagePredicate Predicate, DateTime? Timeout) + private record Subscription(EffectId EffectId, MessagePredicate Predicate, DateTime? Timeout, DateTime? UserTimeout, Func> CaptureMessage) { - public TaskCompletionSource Tcs { get; } = new(TaskCreationOptions.RunContinuationsAsynchronously); + public TaskCompletionSource Tcs { get; } = new(TaskCreationOptions.RunContinuationsAsynchronously); } } diff --git a/Core/Cleipnir.ResilientFunctions/Queuing/QueueBuilder.cs b/Core/Cleipnir.ResilientFunctions/Queuing/QueueBuilder.cs deleted file mode 100644 index 572f989cc..000000000 --- a/Core/Cleipnir.ResilientFunctions/Queuing/QueueBuilder.cs +++ /dev/null @@ -1,26 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Threading.Tasks; - -namespace Cleipnir.ResilientFunctions.Queuing; - -public class QueueBuilder(IEnumerable> predicates, QueueManager manager) -{ - internal bool CanHandle(object msg) - { - if (msg is not T) - return false; - - return predicates.All(f => f(msg)); - } - - public QueueBuilder Where(Func predicate) - => new( - predicates.Append(msg => msg is T && predicate((T)msg)), - manager - ); - - public QueueBuilder OfType() where TChild : T - => new QueueBuilder(predicates, manager); -} \ No newline at end of file diff --git a/Core/Cleipnir.ResilientFunctions/Queuing/QueueClient.cs b/Core/Cleipnir.ResilientFunctions/Queuing/QueueClient.cs index 7161f02aa..8b9339e6e 100644 --- a/Core/Cleipnir.ResilientFunctions/Queuing/QueueClient.cs +++ b/Core/Cleipnir.ResilientFunctions/Queuing/QueueClient.cs @@ -8,7 +8,7 @@ namespace Cleipnir.ResilientFunctions.Queuing; -public class QueueClient(QueueManager queueManager, ISerializer serializer, UtcNow utcNow) +internal class QueueClient(QueueManager queueManager, ISerializer serializer, UtcNow utcNow) { public Task FetchMessages() => queueManager.FetchMessagesOnce(); @@ -52,8 +52,7 @@ public Task PullEnvelope(Workflow workflow, EffectId parentId, Func timeout = new DateTime(timeoutTicks, DateTimeKind.Utc); } - - + if (!effect.Contains(messageId)) { var result = await queueManager.Subscribe( @@ -63,12 +62,19 @@ public Task PullEnvelope(Workflow workflow, EffectId parentId, Func messageId, messageTypeId, receiverId, - senderId + senderId, + captureMessage: msg => + msg == null + ? [EffectResult.Create(messageId, null)] + : + [ + EffectResult.Create(messageId, msg.MessageContentBytes), + EffectResult.Create(messageTypeId, msg.MessageTypeBytes), + EffectResult.Create(receiverId, msg.Receiver), + EffectResult.Create(senderId, msg.Sender), + ] ); - - if (result == null) - effect.FlushlessUpsert(messageId, null, alias: null); - + return result; } diff --git a/Core/Cleipnir.ResilientFunctions/Queuing/QueueManager.cs b/Core/Cleipnir.ResilientFunctions/Queuing/QueueManager.cs index dfd7468ff..a593246e1 100644 --- a/Core/Cleipnir.ResilientFunctions/Queuing/QueueManager.cs +++ b/Core/Cleipnir.ResilientFunctions/Queuing/QueueManager.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; using Cleipnir.ResilientFunctions.CoreRuntime; @@ -12,7 +13,7 @@ namespace Cleipnir.ResilientFunctions.Queuing; public delegate bool MessagePredicate(Envelope envelope); -public class QueueManager : IDisposable +internal class QueueManager : IDisposable { private readonly Effect _effect; private readonly FlowState _flowState; @@ -95,6 +96,7 @@ public Task FetchMessagesOnce() { if (_disposed) throw new ObjectDisposedException($"{nameof(QueueManager)} is disposed already"); + return _fetchedMessages.FetchOnce(); } @@ -107,31 +109,16 @@ public Task FetchMessagesOnce() EffectId messageId, EffectId messageTypeId, EffectId receiverId, - EffectId senderId) + EffectId senderId, + Func> captureMessage) { if (_fetchedMessages.ThrownException is { } pre) throw pre; + var task = _fetchedMessages.AddSubscription(messageId, predicate, timeout, captureMessage); await FetchMessagesOnce(); - var matched = await _fetchedMessages.WaitForMessageOrTimeout(timeoutId, predicate, timeout); - - if (_fetchedMessages.ThrownException is { } post) - throw post; - - if (matched == null) - return timeout != null ? null : throw new SuspendInvocationException(); - - _effect.FlushlessUpserts( - [ - new EffectResult(matched.ToRemoveId, matched.Message.Position, Alias: null), - new EffectResult(messageId, matched.Message.MessageContentBytes, Alias: null), - new EffectResult(messageTypeId, matched.Message.MessageTypeBytes, Alias: null), - new EffectResult(receiverId, matched.Message.Receiver, Alias: null), - new EffectResult(senderId, matched.Message.Sender, Alias: null), - ]); - - return matched.Message.Envelope; + return (await task)?.Envelope; } private async Task FetchLoop() @@ -139,6 +126,8 @@ private async Task FetchLoop() while (!_disposed && _fetchedMessages.ThrownException == null) { await FetchMessagesOnce(); + _fetchedMessages.FireTimeouts(); + await Task.WhenAny( _flowState.InterruptSignal.Wait(), Task.Delay(_settings.MessagesPullFrequency));