Skip to content

Commit 5f1d352

Browse files
brianstrauchclaude
andauthored
Fix continue-as-new handling in workflow_streams subscribe (#1581)
* Fix continue-as-new detection in workflow_streams subscribe _follow_continue_as_new described the workflow with no run id, which returns the current run. After a continue-as-new that is the new RUNNING run, never CONTINUED_AS_NEW (which only sits on the old, closed run), so the check never fired and subscribe() stopped during a rollover instead of following the stream. Capture the run id each poll's update is admitted to (start_update with WaitForStage ACCEPTED, read workflow_run_id, then await result) and describe that specific run on failure. A rolled-over run reports CONTINUED_AS_NEW, a terminal run reports a terminal status, and a still-RUNNING run is a transient error that should surface. This also avoids mistaking an unrelated new execution that reused the workflow id for a successor. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> * Retry poll rejected while stream is draining for continue-as-new The poll update's validator rejected new polls during detach-for-CAN with an untyped RuntimeError, which subscribe() did not classify and re-raised — ending the subscription with an error during a routine rollover. Give the validator the well-known StreamDraining ApplicationError type and have subscribe() back off and retry on it, so the poll lands on the successor run once the rollover completes. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> * Add CODEOWNERS entries for workflow_streams * Address review feedback on workflow_streams continue-as-new fix - Hoist StreamDraining/TruncatedOffset error types to constants in _types.py so the raise sites (_stream.py) and handling sites (_client.py) cannot diverge. - Annotate _describe_polled_run return type. - Drop dead non_retryable=True flags: all three ApplicationErrors are raised from update validators/handlers, where retry policy does not apply, so the flag was a no-op. - Document poll_cooldown semantics and warn against timedelta(0). - Add test verifying the subscribe flow captures the polled run id. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent b0b934f commit 5f1d352

5 files changed

Lines changed: 295 additions & 16 deletions

File tree

.github/CODEOWNERS

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,15 @@
1010
# other than the SDK team. For each one, we add the owning team,
1111
# as well as @temporalio/sdk, so the SDK team can continue to
1212
# manage repo-wide concerns.
13-
/temporalio/contrib/common/ @temporalio/ai-sdk @temporalio/sdk
1413
/temporalio/contrib/google_adk_agents/ @temporalio/ai-sdk @temporalio/sdk
1514
/temporalio/contrib/langgraph/ @temporalio/ai-sdk @temporalio/sdk
1615
/temporalio/contrib/langsmith/ @temporalio/ai-sdk @temporalio/sdk
1716
/temporalio/contrib/openai_agents/ @temporalio/ai-sdk @temporalio/sdk
1817
/temporalio/contrib/strands/ @temporalio/ai-sdk @temporalio/sdk
18+
/temporalio/contrib/workflow_streams/ @temporalio/ai-sdk @temporalio/sdk
1919
/tests/contrib/google_adk_agents/ @temporalio/ai-sdk @temporalio/sdk
2020
/tests/contrib/langgraph/ @temporalio/ai-sdk @temporalio/sdk
2121
/tests/contrib/langsmith/ @temporalio/ai-sdk @temporalio/sdk
2222
/tests/contrib/openai_agents/ @temporalio/ai-sdk @temporalio/sdk
2323
/tests/contrib/strands/ @temporalio/ai-sdk @temporalio/sdk
24+
/tests/contrib/workflow_streams/ @temporalio/ai-sdk @temporalio/sdk

temporalio/contrib/workflow_streams/_client.py

Lines changed: 52 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -31,16 +31,20 @@
3131
from temporalio.api.common.v1 import Payload
3232
from temporalio.client import (
3333
Client,
34+
WorkflowExecutionDescription,
3435
WorkflowExecutionStatus,
3536
WorkflowHandle,
3637
WorkflowUpdateFailedError,
3738
WorkflowUpdateRPCTimeoutOrCancelledError,
39+
WorkflowUpdateStage,
3840
)
3941
from temporalio.converter import DataConverter, PayloadConverter
4042
from temporalio.service import RPCError, RPCStatusCode
4143

4244
from ._topic_handle import TopicHandle
4345
from ._types import (
46+
STREAM_DRAINING_ERROR_TYPE,
47+
TRUNCATED_OFFSET_ERROR_TYPE,
4448
PollInput,
4549
PollResult,
4650
PublishEntry,
@@ -127,6 +131,10 @@ def __init__(
127131
self._pending_seq: int = 0
128132
self._pending_since: float | None = None
129133
self._topic_types: dict[str, type[Any]] = {}
134+
# Run id the most recent poll's update was admitted to. Captured before
135+
# waiting for the outcome so a mid-poll continue-as-new can be detected by
136+
# describing that specific run. None until the first poll is admitted.
137+
self._polled_run_id: str | None = None
130138

131139
@classmethod
132140
def create(
@@ -504,9 +512,11 @@ async def subscribe(
504512
``Payload`` — useful for heterogeneous topics where
505513
the caller dispatches on ``Payload.metadata`` or wants
506514
to forward the bytes without decoding.
507-
poll_cooldown: Minimum interval between polls to avoid
508-
overwhelming the workflow when items arrive faster
509-
than the poll round-trip. Defaults to 100ms.
515+
poll_cooldown: Minimum interval between polls when caught
516+
up (backlogs always drain at full speed). Defaults to
517+
100ms. Avoid ``timedelta(0)``: an idle subscriber
518+
busy-loops, and each poll grows workflow history toward
519+
its limit. Use 0 only in tests.
510520
511521
Yields:
512522
:class:`WorkflowStreamItem` for each matching item.
@@ -528,22 +538,37 @@ async def subscribe(
528538
offset = from_offset
529539
while True:
530540
try:
531-
result: PollResult = await self._handle.execute_update(
541+
# Wait only for ACCEPTED so the handle (and the run id it was
542+
# admitted to) is available before we block on the outcome; if
543+
# the run continues-as-new mid-poll, result() fails but we still
544+
# know which run to inspect.
545+
handle = await self._handle.start_update(
532546
"__temporal_workflow_stream_poll",
533547
PollInput(topics=topic_filter, from_offset=offset),
548+
wait_for_stage=WorkflowUpdateStage.ACCEPTED,
534549
result_type=PollResult,
535550
)
551+
self._polled_run_id = handle.workflow_run_id
552+
result: PollResult = await handle.result()
536553
except asyncio.CancelledError:
537554
return
538555
except WorkflowUpdateFailedError as e:
539556
cause_type = getattr(e.cause, "type", None)
540-
if cause_type == "TruncatedOffset":
557+
if cause_type == TRUNCATED_OFFSET_ERROR_TYPE:
541558
# Subscriber fell behind truncation. Retry from
542559
# offset 0 which the stream treats as "from the
543560
# beginning of whatever exists" (i.e., from
544561
# base_offset).
545562
offset = 0
546563
continue
564+
if cause_type == STREAM_DRAINING_ERROR_TYPE:
565+
# Workflow is detaching for continue-as-new. Back off and
566+
# retry; the poll lands on the successor run once the
567+
# rollover completes.
568+
cooldown_secs = poll_cooldown.total_seconds()
569+
if cooldown_secs > 0:
570+
await asyncio.sleep(cooldown_secs)
571+
continue
547572
if cause_type == "AcceptedUpdateCompletedWorkflow":
548573
# Workflow returned (or continued-as-new) before
549574
# this poll's update completed. Either follow the
@@ -586,15 +611,32 @@ async def subscribe(
586611
if not result.more_ready and cooldown_secs > 0:
587612
await asyncio.sleep(cooldown_secs)
588613

614+
async def _describe_polled_run(self) -> WorkflowExecutionDescription:
615+
"""Describe the specific run the most recent poll was admitted to.
616+
617+
Describing that run (rather than the latest) is what lets a
618+
continue-as-new be detected: a rolled-over run is closed with status
619+
CONTINUED_AS_NEW, whereas the latest run would report RUNNING. Falls
620+
back to the latest run when no run id has been captured yet, or when no
621+
client is available to target a specific run.
622+
"""
623+
if self._client is not None:
624+
return await self._client.get_workflow_handle(
625+
self._workflow_id, run_id=self._polled_run_id
626+
).describe()
627+
return await self._handle.describe()
628+
589629
async def _follow_continue_as_new(self) -> bool:
590-
"""Check if the workflow continued-as-new and re-target the handle.
630+
"""Check if the polled run continued-as-new and re-target the handle.
591631
592-
Returns True if the handle was updated (caller should retry).
632+
Returns True if the handle was updated (caller should retry). The
633+
successor run id is not needed — re-targeting to an unpinned handle
634+
makes the next poll address the latest (successor) run.
593635
"""
594636
if self._client is None:
595637
return False
596638
try:
597-
desc = await self._handle.describe()
639+
desc = await self._describe_polled_run()
598640
except Exception:
599641
return False
600642
if desc.status == WorkflowExecutionStatus.CONTINUED_AS_NEW:
@@ -603,14 +645,14 @@ async def _follow_continue_as_new(self) -> bool:
603645
return False
604646

605647
async def _workflow_in_terminal_state(self) -> bool:
606-
"""Return True if the workflow has reached a terminal state.
648+
"""Return True if the polled run has reached a terminal state.
607649
608650
Used by ``subscribe()`` to distinguish "workflow finished —
609651
stream is done" from "wrong workflow id" when a poll RPC
610652
returns NOT_FOUND.
611653
"""
612654
try:
613-
desc = await self._handle.describe()
655+
desc = await self._describe_polled_run()
614656
except Exception:
615657
return False
616658
return desc.status in (

temporalio/contrib/workflow_streams/_stream.py

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@
3737

3838
from ._topic_handle import WorkflowTopicHandle
3939
from ._types import (
40+
STREAM_DRAINING_ERROR_TYPE,
41+
TRUNCATED_OFFSET_ERROR_TYPE,
4042
PollInput,
4143
PollResult,
4244
PublisherState,
@@ -357,7 +359,6 @@ def truncate(self, up_to_offset: int) -> None:
357359
f"Cannot truncate to offset {up_to_offset}: "
358360
f"valid range is [{self._base_offset}, {self._base_offset + len(self._log)})",
359361
type="TruncateOutOfRange",
360-
non_retryable=True,
361362
)
362363
self._log = self._log[log_index:]
363364
self._base_offset = up_to_offset
@@ -419,8 +420,7 @@ async def _on_poll(self, payload: PollInput) -> PollResult:
419420
raise ApplicationError(
420421
f"Requested offset {payload.from_offset} has been truncated. "
421422
f"Current base offset is {self._base_offset}.",
422-
type="TruncatedOffset",
423-
non_retryable=True,
423+
type=TRUNCATED_OFFSET_ERROR_TYPE,
424424
)
425425
all_new = self._log[log_offset:]
426426
if payload.topics:
@@ -460,9 +460,17 @@ async def _on_poll(self, payload: PollInput) -> PollResult:
460460
)
461461

462462
def _validate_poll(self, _payload: PollInput) -> None:
463-
"""Reject new polls when pollers are detached for continue-as-new."""
463+
"""Reject new polls when pollers are detached for continue-as-new.
464+
465+
Uses the well-known ``StreamDraining`` type so a subscriber recognizes
466+
the rollover-in-progress and retries until its poll lands on the
467+
successor run, rather than surfacing the rejection as an error.
468+
"""
464469
if self._detaching:
465-
raise RuntimeError("Workflow pollers are detached for continue-as-new")
470+
raise ApplicationError(
471+
"Workflow pollers are detached for continue-as-new",
472+
type=STREAM_DRAINING_ERROR_TYPE,
473+
)
466474

467475
def _on_offset(self) -> int:
468476
"""Return the current global offset (base_offset + log length)."""

temporalio/contrib/workflow_streams/_types.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,13 @@
2626

2727
T = TypeVar("T")
2828

29+
# Well-known ``ApplicationError.type`` values the stream workflow uses to reject
30+
# polls, and which ``WorkflowStreamClient.subscribe`` recognizes to drive retry
31+
# behavior. Defined here so the raise sites (``_stream.py``) and the handling
32+
# sites (``_client.py``) cannot diverge.
33+
STREAM_DRAINING_ERROR_TYPE = "StreamDraining"
34+
TRUNCATED_OFFSET_ERROR_TYPE = "TruncatedOffset"
35+
2936

3037
# basedpyright flags _-prefixed module-level functions as unused even when
3138
# sibling modules import them (_stream.py, _client.py). Vanilla pyright does

0 commit comments

Comments
 (0)