Skip to content

FlowState: add Status, thread invoker Task, simplify resume/interrupt#154

Open
stidsborg wants to merge 1 commit into
mainfrom
flowstate-status-and-task
Open

FlowState: add Status, thread invoker Task, simplify resume/interrupt#154
stidsborg wants to merge 1 commit into
mainfrom
flowstate-status-and-task

Conversation

@stidsborg
Copy link
Copy Markdown
Owner

Summary

Scaffolding for centralised suspension via FlowsManager. No behaviour change yet — nothing calls FlowState.Suspend() in this PR.

  • FlowState constructor now takes a completion Task and hooks a ContinueWith that flips Status to Completed when the invoker's TaskCompletionSource resolves. Setter guards Completed as terminal.
  • Waiting() returns Subflows == WaitingSubflows.
  • TryResumeSubflow() (returning bool) becomes ResumeSubflow() returning ForeverTask.Instance when suspended or Task.CompletedTask otherwise. Call site in QueueManager.Subscribe collapses to a single await.
  • Interrupt() no longer zeros WaitingSubflows: that desynchronised the counter — subsequent ResumeSubflow calls drove it negative, which silently prevented future Suspend() from ever triggering. Now it just delegates to QueueManager.Interrupt() when not suspended.
  • Invoker creates its TaskCompletionSource before PrepareFor[Re]Invocation and threads tcs.Task through CreateFlowState, so FlowState can observe the invocation's terminal state directly.

Test files that hand-roll FlowState/CreateFlowState (PrintEffectsTests, EffectTests, MessagesSubscriptionTests) now pass ForeverTask.Instance for the completion Task.

Follow-ups (not in this PR)

  • Implement CheckForSuspension loop in FlowsManager and start it from FunctionsRegistry.
  • Make Suspend() fault live QueueManager subscriptions with SuspendInvocationException so the user-function task exits and the existing PersistResult path runs.
  • Decouple MessagesDefaultMaxWaitForCompletion from per-subscription auto-suspension in QueueManager.Subscribe.

Test plan

  • In-memory suite: 451/451 pass
  • PostgreSQL suite: 377/377 pass
  • SqlServer / MariaDB: single-test runs pass for the central test paths; full-suite hang on PingPongMessagesCanBeExchangedMultipleTimes is pre-existing (reproduced on main baseline without this branch)

- FlowState constructor takes a completion Task and hooks ContinueWith to
  flip Status to Completed once the invoker's TaskCompletionSource resolves.
  Setter guards Completed as terminal.
- Implement Waiting() as Subflows == WaitingSubflows.
- Replace TryResumeSubflow() (bool) with ResumeSubflow() returning
  ForeverTask.Instance when suspended or Task.CompletedTask otherwise;
  collapses the QueueManager call site to a single await.
- Drop WaitingSubflows = 0 in Interrupt(): it desynchronised the counter
  (later ResumeSubflow calls drove it negative, blocking future Suspend()).
- Invoker creates the TCS before PrepareFor[Re]Invocation and threads
  tcs.Task through CreateFlowState so FlowState can observe completion.

Scaffolding for centralized suspension via FlowsManager; nothing calls
Suspend() yet, so behaviour is unchanged.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant