Simplify FlowTimeouts and finish FetchedMessages subscription refactor #147

Merged: stidsborg merged 10 commits into main from flowtimeouts-cleanup on May 16, 2026

@stidsborg
Owner

Summary

Three loosely-related cleanups (in three commits):

  • FetchedMessages: completes the push-based subscription model. Subscriptions hold their own captureMessage callback, and DeliverMessages pushes to matching subscriptions inline. FireTimeouts persists captureMessage(null) and clears the FlowTimeouts entry only when a user-specified timeout fires (purely-internal maxWait expiries leave room for re-subscription on replay). Delivered positions are tracked as a single list effect rather than per-message child effects. Also: QueueBuilder deleted, EffectPrinter / EffectResults narrowed to internal, FlowState.ResumeSubflow renamed to TryResumeSubflow, and Workflow.Delay rewritten around clock-based completion.

  • FlowTimeouts: drops the maxWait overload and TimeoutCheckLoop. Both call sites in RetryPolicy were passing TimeSpan.Zero and that path reduced to "register timeout + throw SuspendInvocationException" — control flow that belongs to the caller. With the overload gone, the TCS-signalling infrastructure was dead (nobody awaited the TCSes), so the 10ms poll loop in FlowsManager goes too. Per-invocation cleanup of FlowTimeouts entries already happens via Invoker.RemoveFlow. FlowsManager loses IDisposable and its UtcNow arg; FunctionsRegistry drops the corresponding Dispose call.

  • Tests/TestSetup: pre-warms the threadpool to 128 min threads. MSTest parallelizes test methods on the threadpool (Workers = 0, MethodLevel), and many tests also spawn background Task.Run loops (FetchLoop @ 250ms, watchdogs @ 1s). With a cold threadpool, those continuations queued behind the parallel test methods, causing MessagesTests.QueueClientReturnsNullAfterTimeout (5s Completion budget, 100ms Pull timeout) to occasionally hit its Completion timeout before the FetchLoop ever ran. Full in-memory suite went from "1-2 flakes most runs" to clean across 4 consecutive runs after the fix (the 5th was killed mid-run by my own cleanup, not a failure).

Test plan

  • `dotnet build Cleipnir.ResilientFunctions.sln` — clean
  • In-memory test suite — 451/451 passing (4 consecutive full-suite runs)
  • Each previously-failing test passes in isolation:
    • `MessagesSubscriptionTests.RegisteredTimeoutIsRemovedWhenPullingMessage`
    • `MessagesSubscriptionTests.QueueClientReturnsNullAfterTimeout`
    • `InMemoryTests.RFunctionTests.EffectTests.EffectLoopingWorks`
  • DB-backed suites (PostgreSQL/SqlServer/MariaDB) — not run locally; please verify in CI

stidsborg added 10 commits May 15, 2026 13:27
Reworks the subscription flow so messages are pushed to waiting
subscriptions instead of subscriptions polling, and tracks delivered
positions as a single list effect rather than per-message child effects.

- AddSubscription replaces WaitForMessageOrTimeout: subscriptions hold
  their own captureMessage callback, and the timeout / null-result paths
  persist results via that callback.
- DeliverMessages pushes matched messages to subscriptions inline.
- FireTimeouts persists captureMessage(null) and clears the FlowTimeouts
  entry only when a user-specified timeout fires, so replay short-circuits
  to null while purely-internal maxWait expiries leave room to re-subscribe.
- DeliveredPositionsId tracks consumed positions as a single list and
  AfterFlush empties it once messages are deleted from the store.
- EffectPrinter / EffectResults visibility narrowed to internal, and
  EffectResult.Create added for the new call sites.
- FlowState.ResumeSubflow renamed to TryResumeSubflow.
- Workflow.Delay rewritten around the new EffectId helpers and the
  clock-based completion check.
- EffectLoopingWorks now counts aliased effects (3 user effects) rather
  than total count; the previous "4" included a queue-internal placeholder
  that this refactor no longer creates eagerly.
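The push model described in this commit can be sketched roughly as follows. This is a minimal illustration, not the library's code: `Subscription`, `SubscriptionHub`, the string message type, and the two separate deadline fields are hypothetical stand-ins, and the delivered-positions list effect is not modeled.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

// Hypothetical stand-in: a subscription owns its captureMessage callback.
public sealed record Subscription(
    Func<string, bool> Filter,
    DateTime? UserTimeout,        // null means the caller gave no timeout
    DateTime InternalExpiry,      // assumed internal maxWait-style deadline
    Action<string?> CaptureMessage);

public sealed class SubscriptionHub
{
    private readonly List<Subscription> _subscriptions = new();

    public void AddSubscription(Subscription subscription) =>
        _subscriptions.Add(subscription);

    // Push each message to the first matching subscription, inline.
    public void DeliverMessages(IEnumerable<string> messages)
    {
        foreach (var message in messages)
        {
            var index = _subscriptions.FindIndex(sub => sub.Filter(message));
            if (index < 0) continue;          // nobody is waiting for this one

            var match = _subscriptions[index];
            _subscriptions.RemoveAt(index);
            match.CaptureMessage(message);
        }
    }

    public void FireTimeouts(DateTime utcNow)
    {
        for (var i = _subscriptions.Count - 1; i >= 0; i--)
        {
            var sub = _subscriptions[i];
            if (sub.UserTimeout is { } timeout && timeout <= utcNow)
            {
                // User-specified timeout: capture null so replay short-circuits.
                _subscriptions.RemoveAt(i);
                sub.CaptureMessage(null);
            }
            else if (sub.InternalExpiry <= utcNow)
            {
                // Internal expiry only: capture nothing, so the flow can
                // re-subscribe when it is replayed.
                _subscriptions.RemoveAt(i);
            }
        }
    }
}
```

The point of the split in `FireTimeouts` is that only a user-visible timeout produces a persisted result; an internal expiry just drops the in-memory subscription.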

The maxWait AddTimeout overload only existed for two RetryPolicy call
sites that both passed TimeSpan.Zero, where the path reduced to
"register the timeout and throw SuspendInvocationException." That
control flow belongs to the caller, not to FlowTimeouts.

- FlowTimeouts.AddTimeout(EffectId, DateTime) is now the only overload;
  storage is Dictionary<EffectId, DateTime>. SignalExpiredTimeouts /
  HasExpiredTimeouts / the TCS dance are gone — nothing awaited those
  TCSes once the maxWait overload was removed.
- FlowsManager loses TimeoutCheckLoop, IDisposable, and the UtcNow
  constructor argument; per-invocation cleanup of FlowTimeouts entries
  already happens implicitly via Invoker.RemoveFlow.
- RetryPolicy call sites now AddTimeout + throw SuspendInvocationException
  directly.
- FunctionsRegistry drops the utcNow argument to FlowsManager and the
  matching Dispose call.
- Test files updated to the new FlowsManager(IFunctionStore) signature.
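A rough sketch of the simplified shape, under stated assumptions: the types below are hypothetical stand-ins that reuse the names from this commit message, and `EarliestExpiry`, `RetryCallSite`, and `PostponeUntil` are invented for illustration. Only the `AddTimeout(EffectId, DateTime)` signature, the `Dictionary<EffectId, DateTime>` storage, and the "AddTimeout, then throw" pattern come from the description above.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-ins for the library types named in the commit.
public readonly record struct EffectId(string Value);

public sealed class SuspendInvocationException : Exception { }

public sealed class FlowTimeouts
{
    private readonly Dictionary<EffectId, DateTime> _timeouts = new();
    private readonly object _sync = new();

    // The only overload: register (or overwrite) an absolute expiry.
    public void AddTimeout(EffectId id, DateTime expiresAt)
    {
        lock (_sync) _timeouts[id] = expiresAt;
    }

    // Assumed helper: the earliest registered expiry, or null if none.
    public DateTime? EarliestExpiry()
    {
        lock (_sync)
            return _timeouts.Count == 0 ? null : _timeouts.Values.Min();
    }
}

public static class RetryCallSite
{
    // The caller owns the control flow: register the timeout, then suspend.
    public static void PostponeUntil(FlowTimeouts timeouts, EffectId id, DateTime retryAt)
    {
        timeouts.AddTimeout(id, retryAt);
        throw new SuspendInvocationException();
    }
}
```

With no TaskCompletionSource per entry there is nothing to signal, which is why a polling loop over the dictionary has no remaining purpose.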

MSTest parallelizes test methods on the threadpool (Workers = 0,
MethodLevel), and many tests also spawn background Task.Run loops
(FetchLoop polling every 250ms, watchdogs polling every 1s). With a
cold threadpool, those continuations queue behind the parallel test
methods, and a test like MessagesTests.QueueClientReturnsNullAfterTimeout
— which expects Pull to return null shortly after a 100ms timeout fires
and waits up to 5s for the flow to complete — can hit its Completion
timeout before the FetchLoop ever runs FireTimeouts.

ThreadPool.SetMinThreads(128, 128) keeps enough threads warm that
background loops don't get starved by parallel test methods. Run-suite
stability across 5 full in-memory test runs went from "1-2 flakes most
runs" to no flakes observed.
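For illustration, the pre-warm step could look like the sketch below. `ThreadPoolWarmup` and `Raise` are hypothetical names; in an MSTest project the call would typically sit in an `[AssemblyInitialize]` method, which is omitted here so the snippet has no test-framework dependency.

```csharp
using System;
using System.Threading;

public static class ThreadPoolWarmup
{
    // Raises the minimum worker and IO-completion thread counts to at least
    // `minThreads`, never lowering a minimum that is already higher.
    // Returns false if the runtime rejects the requested values.
    public static bool Raise(int minThreads)
    {
        ThreadPool.GetMinThreads(out var workers, out var completionPorts);
        return ThreadPool.SetMinThreads(
            Math.Max(workers, minThreads),
            Math.Max(completionPorts, minThreads));
    }
}
```

Calling `ThreadPoolWarmup.Raise(128)` once before any test runs mirrors the `ThreadPool.SetMinThreads(128, 128)` call in this commit; the `Math.Max` guard is an addition of this sketch.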

Replaces the nested ternary with a switch over the four
(timeout, MessagesDefaultMaxWaitForCompletion) combinations. Behavior
is preserved: when MessagesDefaultMaxWaitForCompletion is zero the
subscription is set to fire at utcNow (suspend immediately) regardless
of whether the caller supplied a user timeout; otherwise the wait is
capped at min(timeout, utcNow + maxWait).
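The four combinations can be written as a tuple switch along these lines. This is a sketch of the rule as stated, not the actual method: `SubscriptionDeadline.Compute` and its parameter names are hypothetical, and the no-timeout case with a non-zero maxWait is assumed to resolve to `utcNow + maxWait`.

```csharp
using System;

public static class SubscriptionDeadline
{
    // timeout: optional user-supplied absolute deadline.
    // maxWait: stands in for MessagesDefaultMaxWaitForCompletion.
    public static DateTime Compute(DateTime? timeout, TimeSpan maxWait, DateTime utcNow) =>
        (timeout, maxWait == TimeSpan.Zero) switch
        {
            // maxWait is zero: suspend immediately, with or without a timeout.
            (null, true) => utcNow,
            (not null, true) => utcNow,
            // maxWait is non-zero, no user timeout: wait up to maxWait (assumed).
            (null, false) => utcNow + maxWait,
            // Both present: whichever comes first.
            ({ } t, false) => t < utcNow + maxWait ? t : utcNow + maxWait,
        };
}
```

Spelling out all four arms, rather than collapsing the first two into `(_, true)`, keeps the one-to-one correspondence with the combinations the commit message enumerates.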

Introduces a private CreateSettings helper that defaults
WatchdogCheckFrequency and MessagesPullFrequency to 100ms (down from
the framework defaults of 1s and 250ms). Replaces all bare
new Settings(catch) call sites with the helper.

Reduces flakiness in timeout-based tests (e.g.
QueueClientReturnsNullAfterTimeout): with the 1s default watchdog
frequency, a postponed flow could take >4s to be picked up under CI
load, eating into the test's 5s Completion budget. With a 100ms
watchdog the same flow completes in ~330ms locally.

The PingPongMessagesCanBeExchangedMultipleTimes site keeps its
explicit 10ms pull-frequency override via the helper's optional
parameter.
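A helper of this kind might look like the sketch below. The `Settings` record here is a hypothetical stand-in with only the three fields relevant to this commit, and the parameter names are assumptions; the real Settings type and the actual helper signature may differ.

```csharp
#nullable enable
using System;

// Hypothetical stand-in for the framework's Settings type.
public sealed record Settings(
    Action<Exception>? UnhandledExceptionHandler = null,
    TimeSpan? WatchdogCheckFrequency = null,
    TimeSpan? MessagesPullFrequency = null);

public static class TestSettings
{
    private static readonly TimeSpan FastPoll = TimeSpan.FromMilliseconds(100);

    // Defaults both polling intervals to 100ms; a test may still override
    // the pull frequency through the optional parameter.
    public static Settings CreateSettings(
        Action<Exception>? onUnhandledException = null,
        TimeSpan? messagesPullFrequency = null) =>
        new(
            onUnhandledException,
            WatchdogCheckFrequency: FastPoll,
            MessagesPullFrequency: messagesPullFrequency ?? FastPoll);
}
```

A test that needs a tighter loop, such as the 10ms override mentioned above, would call `TestSettings.CreateSettings(handler, TimeSpan.FromMilliseconds(10))`.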
…low.Message

The 9 remaining tests in MessagesTests constructed their own
FlowTimeouts, FlowsManager, FlowState, and QueueManager inside the
flow body, then called QueueClient.Pull directly. This bypassed the
framework's wiring -- in particular, the test's local FlowTimeouts
never propagated to workflow.Effect.FlowTimeouts, so when a Pull
suspended the flow it was marked Suspended (not Postponed) and the
PostponedWatchdog skipped it. The tests "worked" only when the
FetchLoop happened to fire after the user timeout had elapsed, which
caused intermittent failures on the CI pipeline.

All 9 tests now call workflow.Message<T>(...) directly. This uses the
properly-wired QueueManager that ships with Workflow, where
FlowTimeouts is shared between Effect, FlowState, and QueueManager.
Suspended->Postponed->watchdog-pickup works as designed.

Side effects:
- queueClient/queueClients captures + BusyWait.Until + external
  FetchMessages calls are no longer needed; the FetchLoop runs at
  100ms (via CreateSettings) and AppendMessage's store-level Interrupt
  flips the flow to Postponed for the watchdog.
- Dropped unused imports (CoreRuntime, Queuing, Helpers, Linq).
- Net change: -257 / +17 lines.

Full in-memory suite (451 tests) passes; in-memory MessagesTests
finish in ~1s (vs ~3s previously).
…pletion

The old hand-rolled QueueManager in this test used
MessagesDefaultMaxWaitForCompletion = TimeSpan.FromMinutes(1), which
made the in-memory wait long enough that the flow never actually
suspended -- so its WaitForCompletion (defaulting to
allowPostponeAndSuspended: false) was fine.

After switching to workflow.Message<string>(), the framework's
QueueManager runs with MaxWait = TimeSpan.Zero, so the flow does
transition through Postponed (Expires=0 once AppendMessage's
store-level Interrupt fires) before the watchdog re-invokes and the
flow Succeeds. WaitForCompletion was polling during that brief
Postponed window and throwing InvocationPostponedException.

Matches the pattern already used by PingPong and MultipleMessages.
@stidsborg stidsborg merged commit d8fefad into main May 16, 2026
8 checks passed
@stidsborg stidsborg deleted the flowtimeouts-cleanup branch May 16, 2026 08:26