Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .claude/settings.local.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
"Bash(git commit -m \"$\\(cat <<''EOF''\nRemoved unused QueueFlag and associated tests\n\nCo-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>\nEOF\n\\)\")",
"Bash(git -C /Users/stidsborg/Repos/Cleipnir.ResilientFunctions diff)",
"Bash(git -C /Users/stidsborg/Repos/Cleipnir.ResilientFunctions log --oneline -5)",
"Bash(git commit:*)"
"Bash(git commit:*)",
"Bash(git checkout:*)",
"Bash(git push:*)"
],
"deny": [],
"ask": []
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -561,20 +561,20 @@ protected async Task QueueManagerFailsOnMessageDeserializationError(Task<IFuncti

var flowTimeouts = new FlowTimeouts();
var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow);
var flowState = flowsManager.CreateFlow(workflow.StoredId, flowTimeouts);
var queueManager = new QueueManager(
workflow.FlowId,
workflow.StoredId,
functionStore.MessageStore,
exceptionThrowingSerializer,
workflow.Effect,
flowState,
unhandledExceptionHandler,
flowTimeouts,
() => DateTime.UtcNow,
SettingsWithDefaults.Default,
flowsManager
SettingsWithDefaults.Default
);

flowsManager.AddFlow(workflow.StoredId, queueManager,flowTimeouts);
var queueClient = await queueManager.CreateQueueClient();

var message = await queueClient.Pull<GoodMessage>(
Expand Down Expand Up @@ -624,20 +624,20 @@ protected async Task RegisteredTimeoutIsRemovedWhenPullingMessage(Task<IFunction
storedId = workflow.StoredId;
var minimumTimeout = new FlowTimeouts();
var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow);
var flowState = flowsManager.CreateFlow(workflow.StoredId, minimumTimeout);
var queueManager = new QueueManager(
workflow.FlowId,
workflow.StoredId,
functionStore.MessageStore,
DefaultSerializer.Instance,
workflow.Effect,
flowState,
unhandledExceptionHandler,
minimumTimeout,
() => DateTime.UtcNow,
SettingsWithDefaults.Default,
flowsManager
SettingsWithDefaults.Default
);

flowsManager.AddFlow(workflow.StoredId, queueManager,minimumTimeout);

var queueClient = await queueManager.CreateQueueClient();

Expand Down Expand Up @@ -685,20 +685,20 @@ protected async Task PullEnvelopeReturnsEnvelopeWithReceiverAndSender(Task<IFunc

var flowTimeouts = new FlowTimeouts();
var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow);
var flowState = flowsManager.CreateFlow(workflow.StoredId, flowTimeouts);
var queueManager = new QueueManager(
workflow.FlowId,
workflow.StoredId,
functionStore.MessageStore,
DefaultSerializer.Instance,
workflow.Effect,
flowState,
unhandledExceptionHandler,
flowTimeouts,
() => DateTime.UtcNow,
SettingsWithDefaults.Default,
flowsManager
SettingsWithDefaults.Default
);

flowsManager.AddFlow(workflow.StoredId, queueManager,flowTimeouts);
var queueClient = await queueManager.CreateQueueClient();

// Pull envelope for specific receiver
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,20 +36,20 @@ protected async Task MessagesSunshineScenario(Task<IFunctionStore> functionStore

var flowTimeouts = new FlowTimeouts();
var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow);
var flowState = flowsManager.CreateFlow(workflow.StoredId, flowTimeouts);
var queueManager = new QueueManager(
workflow.FlowId,
workflow.StoredId,
functionStore.MessageStore,
DefaultSerializer.Instance,
workflow.Effect,
flowState,
unhandledExceptionHandler,
flowTimeouts,
() => DateTime.UtcNow,
SettingsWithDefaults.Default,
flowsManager
SettingsWithDefaults.Default
);

flowsManager.AddFlow(workflow.StoredId, queueManager,flowTimeouts);
queueClient = await queueManager.CreateQueueClient();
var message = await queueClient.Pull<string>(workflow, workflow.Effect.CreateNextImplicitId());

Expand Down Expand Up @@ -88,20 +88,20 @@ protected async Task QueueClientReturnsNullAfterTimeout(Task<IFunctionStore> fun

var flowTimeouts = new FlowTimeouts();
var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow);
var flowState = flowsManager.CreateFlow(workflow.StoredId, flowTimeouts);
var queueManager = new QueueManager(
workflow.FlowId,
workflow.StoredId,
functionStore.MessageStore,
DefaultSerializer.Instance,
workflow.Effect,
flowState,
unhandledExceptionHandler,
flowTimeouts,
() => DateTime.UtcNow,
SettingsWithDefaults.Default,
flowsManager
SettingsWithDefaults.Default
);

flowsManager.AddFlow(workflow.StoredId, queueManager,flowTimeouts);
var queueClient = await queueManager.CreateQueueClient();
var message = await queueClient.Pull<string>(workflow, workflow.Effect.CreateNextImplicitId(), TimeSpan.FromMilliseconds(100));

Expand Down Expand Up @@ -135,20 +135,20 @@ protected async Task MessagesFirstOfTypesReturnsNoneForFirstOfTypesOnTimeout(Tas

var flowTimeouts = new FlowTimeouts();
var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow);
var flowState = flowsManager.CreateFlow(workflow.StoredId, flowTimeouts);
var queueManager = new QueueManager(
workflow.FlowId,
workflow.StoredId,
functionStore.MessageStore,
DefaultSerializer.Instance,
workflow.Effect,
flowState,
unhandledExceptionHandler,
flowTimeouts,
() => DateTime.UtcNow,
SettingsWithDefaults.Default,
flowsManager
SettingsWithDefaults.Default
);

flowsManager.AddFlow(workflow.StoredId, queueManager,flowTimeouts);
var queueClient = await queueManager.CreateQueueClient();
var message = await queueClient.Pull<object>(
workflow,
Expand Down Expand Up @@ -187,20 +187,20 @@ protected async Task MessagesFirstOfTypesReturnsFirstForFirstOfTypesOnFirst(Task

var flowTimeouts = new FlowTimeouts();
var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow);
var flowState = flowsManager.CreateFlow(workflow.StoredId, flowTimeouts);
var queueManager = new QueueManager(
workflow.FlowId,
workflow.StoredId,
functionStore.MessageStore,
DefaultSerializer.Instance,
workflow.Effect,
flowState,
unhandledExceptionHandler,
flowTimeouts,
() => DateTime.UtcNow,
SettingsWithDefaults.Default,
flowsManager
SettingsWithDefaults.Default
);

flowsManager.AddFlow(workflow.StoredId, queueManager,flowTimeouts);
var queueClient = await queueManager.CreateQueueClient();
var message = await queueClient.Pull<object>(
workflow,
Expand Down Expand Up @@ -240,20 +240,20 @@ protected async Task MessagesFirstOfTypesReturnsSecondForFirstOfTypesOnSecond(Ta

var flowTimeouts = new FlowTimeouts();
var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow);
var flowState = flowsManager.CreateFlow(workflow.StoredId, flowTimeouts);
var queueManager = new QueueManager(
workflow.FlowId,
workflow.StoredId,
functionStore.MessageStore,
DefaultSerializer.Instance,
workflow.Effect,
flowState,
unhandledExceptionHandler,
flowTimeouts,
() => DateTime.UtcNow,
SettingsWithDefaults.Default,
flowsManager
SettingsWithDefaults.Default
);

flowsManager.AddFlow(workflow.StoredId, queueManager,flowTimeouts);
var queueClient = await queueManager.CreateQueueClient();
var message = await queueClient.Pull<string>(
workflow,
Expand Down Expand Up @@ -294,20 +294,20 @@ protected async Task SecondEventWithExistingIdempotencyKeyIsIgnored(Task<IFuncti

var flowTimeouts = new FlowTimeouts();
var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow);
var flowState = flowsManager.CreateFlow(workflow.StoredId, flowTimeouts);
var queueManager = new QueueManager(
workflow.FlowId,
workflow.StoredId,
functionStore.MessageStore,
DefaultSerializer.Instance,
workflow.Effect,
flowState,
unhandledExceptionHandler,
flowTimeouts,
() => DateTime.UtcNow,
SettingsWithDefaults.Default,
flowsManager
SettingsWithDefaults.Default
);

flowsManager.AddFlow(workflow.StoredId, queueManager,flowTimeouts);
var queueClient = await queueManager.CreateQueueClient();
var message1 = await queueClient.Pull<string>(workflow, workflow.Effect.CreateNextImplicitId());
var message2 = await queueClient.Pull<string>(workflow, workflow.Effect.CreateNextImplicitId());
Expand Down Expand Up @@ -350,20 +350,20 @@ protected async Task QueueClientCanPullMultipleMessages(Task<IFunctionStore> fun

var flowTimeouts = new FlowTimeouts();
var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow);
var flowState = flowsManager.CreateFlow(workflow.StoredId, flowTimeouts);
var queueManager = new QueueManager(
workflow.FlowId,
workflow.StoredId,
functionStore.MessageStore,
DefaultSerializer.Instance,
workflow.Effect,
flowState,
unhandledExceptionHandler,
flowTimeouts,
() => DateTime.UtcNow,
SettingsWithDefaults.Default,
flowsManager
SettingsWithDefaults.Default
);

flowsManager.AddFlow(workflow.StoredId, queueManager,flowTimeouts);
var queueClient = await queueManager.CreateQueueClient();

var message1 = await queueClient.Pull<string>(workflow, workflow.Effect.CreateNextImplicitId());
Expand Down Expand Up @@ -404,20 +404,20 @@ async Task (workflow) =>

var flowTimeouts = new FlowTimeouts();
var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow);
var flowState = flowsManager.CreateFlow(workflow.StoredId, flowTimeouts);
var queueManager = new QueueManager(
workflow.FlowId,
workflow.StoredId,
functionStore.MessageStore,
DefaultSerializer.Instance,
workflow.Effect,
flowState,
unhandledExceptionHandler,
flowTimeouts,
() => DateTime.UtcNow,
SettingsWithDefaults.Default with { MessagesDefaultMaxWaitForCompletion = TimeSpan.FromMinutes(1) },
flowsManager
SettingsWithDefaults.Default with { MessagesDefaultMaxWaitForCompletion = TimeSpan.FromMinutes(1) }
);

flowsManager.AddFlow(workflow.StoredId, queueManager,flowTimeouts);
var queueClient = await queueManager.CreateQueueClient();
lock (queueClients)
queueClients[workflow.FlowId.Instance.Value] = queueClient;
Expand Down Expand Up @@ -470,20 +470,20 @@ async Task (workflow) =>

var flowTimeouts = new FlowTimeouts();
var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow);
var flowState = flowsManager.CreateFlow(workflow.StoredId, flowTimeouts);
var queueManager = new QueueManager(
workflow.FlowId,
workflow.StoredId,
functionStore.MessageStore,
DefaultSerializer.Instance,
workflow.Effect,
flowState,
unhandledExceptionHandler,
flowTimeouts,
() => DateTime.UtcNow,
SettingsWithDefaults.Default,
flowsManager
SettingsWithDefaults.Default
);

flowsManager.AddFlow(workflow.StoredId, queueManager,flowTimeouts);
var queueClient = await queueManager.CreateQueueClient();

while (true)
Expand Down Expand Up @@ -538,20 +538,20 @@ async Task (workflow) =>

var flowTimeouts = new FlowTimeouts();
var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow);
var flowState = flowsManager.CreateFlow(workflow.StoredId, flowTimeouts);
var queueManager = new QueueManager(
workflow.FlowId,
workflow.StoredId,
functionStore.MessageStore,
DefaultSerializer.Instance,
workflow.Effect,
flowState,
unhandledExceptionHandler,
flowTimeouts,
() => DateTime.UtcNow,
SettingsWithDefaults.Default,
flowsManager
SettingsWithDefaults.Default
);

flowsManager.AddFlow(workflow.StoredId, queueManager,flowTimeouts);
var queueClient = await queueManager.CreateQueueClient();

for (var i = 0; i < 10; i++)
Expand All @@ -568,20 +568,20 @@ async Task (workflow) =>

var flowTimeouts = new FlowTimeouts();
var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow);
var flowState = flowsManager.CreateFlow(workflow.StoredId, flowTimeouts);
var queueManager = new QueueManager(
workflow.FlowId,
workflow.StoredId,
functionStore.MessageStore,
DefaultSerializer.Instance,
workflow.Effect,
flowState,
unhandledExceptionHandler,
flowTimeouts,
() => DateTime.UtcNow,
SettingsWithDefaults.Default,
flowsManager
SettingsWithDefaults.Default
);

flowsManager.AddFlow(workflow.StoredId, queueManager,flowTimeouts);
var queueClient = await queueManager.CreateQueueClient();

for (var i = 0; i < 10; i++)
Expand Down
Loading
Loading