Simplify FlowTimeouts and finish FetchedMessages subscription refactor#147
Merged
Conversation
Reworks the subscription flow so messages are pushed to waiting subscriptions instead of subscriptions polling, and tracks delivered positions as a single list effect rather than per-message child effects. - AddSubscription replaces WaitForMessageOrTimeout: subscriptions hold their own captureMessage callback, and the timeout / null-result paths persist results via that callback. - DeliverMessages pushes matched messages to subscriptions inline. - FireTimeouts persists captureMessage(null) and clears the FlowTimeouts entry only when a user-specified timeout fires, so replay short-circuits to null while purely-internal maxWait expiries leave room to re-subscribe. - DeliveredPositionsId tracks consumed positions as a single list and AfterFlush empties it once messages are deleted from the store. - EffectPrinter / EffectResults visibility narrowed to internal, and EffectResult.Create added for the new call sites. - FlowState.ResumeSubflow renamed to TryResumeSubflow. - Workflow.Delay rewritten around the new EffectId helpers and the clock-based completion check. - EffectLoopingWorks now counts aliased effects (3 user effects) rather than total count; the previous "4" included a queue-internal placeholder that this refactor no longer creates eagerly.
The maxWait AddTimeout overload only existed for two RetryPolicy call sites that both passed TimeSpan.Zero, where the path reduced to "register the timeout and throw SuspendInvocationException." That control-flow belongs to the caller, not to FlowTimeouts. - FlowTimeouts.AddTimeout(EffectId, DateTime) is now the only overload; storage is Dictionary<EffectId, DateTime>. SignalExpiredTimeouts / HasExpiredTimeouts / the TCS dance are gone — nothing awaited those TCSes once the maxWait overload was removed. - FlowsManager loses TimeoutCheckLoop, IDisposable, and the UtcNow constructor argument; per-invocation cleanup of FlowTimeouts entries already happens implicitly via Invoker.RemoveFlow. - RetryPolicy call sites now AddTimeout + throw SuspendInvocationException directly. - FunctionsRegistry drops the utcNow argument to FlowsManager and the matching Dispose call. - Test files updated to the new FlowsManager(IFunctionStore) signature.
MSTest parallelizes test methods on the threadpool (Workers = 0, MethodLevel), and many tests also spawn background Task.Run loops (FetchLoop polling every 250ms, watchdogs polling every 1s). With a cold threadpool, those continuations queue behind the parallel test methods, and a test like MessagesTests.QueueClientReturnsNullAfterTimeout — which expects Pull to return null shortly after a 100ms timeout fires and waits up to 5s for the flow to complete — can hit its Completion timeout before the FetchLoop ever runs FireTimeouts. ThreadPool.SetMinThreads(128, 128) keeps enough threads warm that background loops don't get starved by parallel test methods. Run-suite stability across 5 full in-memory test runs went from "1-2 flakes most runs" to no flakes observed.
Replaces the nested ternary with a switch over the four (timeout, MessagesDefaultMaxWaitForCompletion) combinations. Behavior is preserved: when MessagesDefaultMaxWaitForCompletion is zero the subscription is set to fire at utcNow (suspend immediately) regardless of whether the caller supplied a user timeout; otherwise the wait is capped at min(timeout, utcNow + maxWait).
Introduces a private CreateSettings helper that defaults WatchdogCheckFrequency and MessagesPullFrequency to 100ms (down from the framework defaults of 1s and 250ms). Replaces all bare new Settings(catch) call sites with the helper. Reduces flakiness in timeout-based tests (e.g. QueueClientReturnsNullAfterTimeout): with the 1s default watchdog frequency, a postponed flow could take >4s to be picked up under CI load, eating into the test's 5s Completion budget. With a 100ms watchdog the same flow completes in ~330ms locally. The PingPongMessagesCanBeExchangedMultipleTimes site keeps its explicit 10ms pull-frequency override via the helper's optional parameter.
…low.Message The 9 remaining tests in MessagesTests constructed their own FlowTimeouts, FlowsManager, FlowState, and QueueManager inside the flow body, then called QueueClient.Pull directly. This bypassed the framework's wiring -- in particular, the test's local FlowTimeouts never propagated to workflow.Effect.FlowTimeouts, so when a Pull suspended the flow it was marked Suspended (not Postponed) and the PostponedWatchdog skipped it. The tests "worked" only when the FetchLoop happened to fire after the user timeout had elapsed, which caused intermittent failures on the CI pipeline. All 9 tests now call workflow.Message<T>(...) directly. This uses the properly-wired QueueManager that ships with Workflow, where FlowTimeouts is shared between Effect, FlowState, and QueueManager. Suspended->Postponed->watchdog-pickup works as designed. Side effects: - queueClient/queueClients captures + BusyWait.Until + external FetchMessages calls are no longer needed; the FetchLoop runs at 100ms (via CreateSettings) and AppendMessage's store-level Interrupt flips the flow to Postponed for the watchdog. - Dropped unused imports (CoreRuntime, Queuing, Helpers, Linq). - Net change: -257 / +17 lines. Full in-memory suite (451 tests) passes; in-memory MessagesTests finish in ~1s (vs ~3s previously).
…pletion The old hand-rolled QueueManager in this test used MessagesDefaultMaxWaitForCompletion = TimeSpan.FromMinutes(1), which made the in-memory wait long enough that the flow never actually suspended -- so its WaitForCompletion (defaulting to allowPostponeAndSuspended: false) was fine. After switching to workflow.Message<string>(), the framework's QueueManager runs with MaxWait = TimeSpan.Zero, so the flow does transition through Postponed (Expires=0 once AppendMessage's store-level Interrupt fires) before the watchdog re-invokes and the flow Succeeds. WaitForCompletion was polling during that brief Postponed window and throwing InvocationPostponedException. Matches the pattern already used by PingPong and MultipleMessages.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Three loosely-related cleanups (in three commits):
FetchedMessages: completes the push-based subscription model. Subscriptions hold their own
captureMessagecallback;DeliverMessagespushes to matching subscriptions inline;FireTimeoutspersistscaptureMessage(null)+ clears theFlowTimeoutsentry only when a user-specified timeout fires (purely-internalmaxWaitexpiries leave room for re-subscription on replay). Delivered positions tracked as a single list effect rather than per-message child effects.QueueBuilderdeleted,EffectPrinter/EffectResultsnarrowed to internal,FlowState.ResumeSubflow→TryResumeSubflow,Workflow.Delayrewritten around clock-based completion.FlowTimeouts: drops the
maxWaitoverload andTimeoutCheckLoop. Both call sites inRetryPolicywere passingTimeSpan.Zeroand that path reduced to "register timeout + throwSuspendInvocationException" — control flow that belongs to the caller. With the overload gone, the TCS-signalling infrastructure was dead (nobody awaited the TCSes), so the 10ms poll loop inFlowsManagergoes too. Per-invocation cleanup ofFlowTimeoutsentries already happens viaInvoker.RemoveFlow.FlowsManagerlosesIDisposableand itsUtcNowarg;FunctionsRegistrydrops the correspondingDisposecall.Tests/TestSetup: pre-warms the threadpool to 128 min threads. MSTest parallelizes test methods on the threadpool (
Workers = 0,MethodLevel), and many tests also spawn backgroundTask.Runloops (FetchLoop @ 250ms, watchdogs @ 1s). With a cold threadpool, those continuations queued behind the parallel test methods, causingMessagesTests.QueueClientReturnsNullAfterTimeout(5s Completion budget, 100ms Pull timeout) to occasionally hit its Completion timeout before the FetchLoop ever ran. Full in-memory suite went from "1-2 flakes most runs" to clean across 4 consecutive runs after the fix (the 5th was killed mid-run by my own cleanup, not a failure).Test plan