Skip to content

Interrupt-driven message fetch; merge FetchedMessages into QueueManager#150

Merged
stidsborg merged 2 commits into
mainfrom
interrupt-driven-fetch
May 24, 2026
Merged

Interrupt-driven message fetch; merge FetchedMessages into QueueManager#150
stidsborg merged 2 commits into
mainfrom
interrupt-driven-fetch

Conversation

@stidsborg
Copy link
Copy Markdown
Owner

Summary

  • Interrupt-driven fetch. Replaces the per-flow FetchLoop polling task with an event-driven model. InterruptedWatchdogFlowsManager.InterruptQueueManager.InterruptFetchAndNotify. Cuts a background Task.Run per flow and removes the MessagesPullFrequency latency floor on delivery.
  • Per-subscription timeouts. Each Subscribe schedules its own Task.Delay with a CancellationTokenSource, cancelled when the message arrives. Replaces the centralized FireTimeouts scan. The continuation only runs OnlyOnRanToCompletion and guards on _subscriptions.Remove(...) to prevent overwriting a delivered message with a null capture.
  • Merge FetchedMessages into QueueManager. The split was an artifact of the polling architecture. A single class better describes "the message queue for a single flow." MessageData is now QueueManager.MessageData (public nested record used by QueueClient's capture callback).
  • Init ordering. _effect.RegisterQueueManager(this) and _flowState.QueueManager = this are now set after initialization completes, so the InterruptedWatchdog can't reach FetchAndNotify while idempotency-key state and delivered-position cleanup are still being set up.

Test plan

  • All 451 in-memory tests pass locally (dotnet test ./Core/Cleipnir.ResilientFunctions.Tests --filter "FullyQualifiedName~InMemoryTests")
  • Database-backed test suites (Postgres / MariaDB / SQL Server)

stidsborg added 2 commits May 24, 2026 10:17
Wires up InterruptedWatchdog -> FlowsManager.Interrupt -> QueueManager.Interrupt
so message fetching becomes event-driven instead of relying on a per-flow
FetchLoop background task. Each Subscribe schedules its own per-subscription
Task.Delay for timeout/suspend handling, replacing the centralized
FireTimeouts scan.

Also merges FetchedMessages into QueueManager - the split was an artifact of
the previous polling architecture and a single class better describes "the
message queue for a single flow."
@stidsborg stidsborg merged commit 7cb6833 into main May 24, 2026
8 checks passed
@stidsborg stidsborg deleted the interrupt-driven-fetch branch May 24, 2026 08:24
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant