Skip to content

Commit b66c2be

Browse files
brianstrauch and claude committed
Address review feedback on workflow_streams continue-as-new fix
- Hoist StreamDraining/TruncatedOffset error types to constants in _types.py so the raise sites (_stream.py) and handling sites (_client.py) cannot diverge.
- Annotate _describe_polled_run return type.
- Drop dead non_retryable=True flags: all three ApplicationErrors are raised from update validators/handlers, where retry policy does not apply, so the flag was a no-op.
- Document poll_cooldown semantics and warn against timedelta(0).
- Add test verifying the subscribe flow captures the polled run id.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent fe5f03d commit b66c2be

4 files changed

Lines changed: 73 additions & 11 deletions

File tree

temporalio/contrib/workflow_streams/_client.py

Lines changed: 11 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -31,6 +31,7 @@
3131
from temporalio.api.common.v1 import Payload
3232
from temporalio.client import (
3333
Client,
34+
WorkflowExecutionDescription,
3435
WorkflowExecutionStatus,
3536
WorkflowHandle,
3637
WorkflowUpdateFailedError,
@@ -42,6 +43,8 @@
4243

4344
from ._topic_handle import TopicHandle
4445
from ._types import (
46+
STREAM_DRAINING_ERROR_TYPE,
47+
TRUNCATED_OFFSET_ERROR_TYPE,
4548
PollInput,
4649
PollResult,
4750
PublishEntry,
@@ -509,9 +512,11 @@ async def subscribe(
509512
``Payload`` — useful for heterogeneous topics where
510513
the caller dispatches on ``Payload.metadata`` or wants
511514
to forward the bytes without decoding.
512-
poll_cooldown: Minimum interval between polls to avoid
513-
overwhelming the workflow when items arrive faster
514-
than the poll round-trip. Defaults to 100ms.
515+
poll_cooldown: Minimum interval between polls when caught
516+
up (backlogs always drain at full speed). Defaults to
517+
100ms. Avoid ``timedelta(0)``: an idle subscriber
518+
busy-loops, and each poll grows workflow history toward
519+
its limit. Use 0 only in tests.
515520
516521
Yields:
517522
:class:`WorkflowStreamItem` for each matching item.
@@ -549,14 +554,14 @@ async def subscribe(
549554
return
550555
except WorkflowUpdateFailedError as e:
551556
cause_type = getattr(e.cause, "type", None)
552-
if cause_type == "TruncatedOffset":
557+
if cause_type == TRUNCATED_OFFSET_ERROR_TYPE:
553558
# Subscriber fell behind truncation. Retry from
554559
# offset 0 which the stream treats as "from the
555560
# beginning of whatever exists" (i.e., from
556561
# base_offset).
557562
offset = 0
558563
continue
559-
if cause_type == "StreamDraining":
564+
if cause_type == STREAM_DRAINING_ERROR_TYPE:
560565
# Workflow is detaching for continue-as-new. Back off and
561566
# retry; the poll lands on the successor run once the
562567
# rollover completes.
@@ -606,7 +611,7 @@ async def subscribe(
606611
if not result.more_ready and cooldown_secs > 0:
607612
await asyncio.sleep(cooldown_secs)
608613

609-
async def _describe_polled_run(self):
614+
async def _describe_polled_run(self) -> WorkflowExecutionDescription:
610615
"""Describe the specific run the most recent poll was admitted to.
611616
612617
Describing that run (rather than the latest) is what lets a

temporalio/contrib/workflow_streams/_stream.py

Lines changed: 4 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -37,6 +37,8 @@
3737

3838
from ._topic_handle import WorkflowTopicHandle
3939
from ._types import (
40+
STREAM_DRAINING_ERROR_TYPE,
41+
TRUNCATED_OFFSET_ERROR_TYPE,
4042
PollInput,
4143
PollResult,
4244
PublisherState,
@@ -357,7 +359,6 @@ def truncate(self, up_to_offset: int) -> None:
357359
f"Cannot truncate to offset {up_to_offset}: "
358360
f"valid range is [{self._base_offset}, {self._base_offset + len(self._log)})",
359361
type="TruncateOutOfRange",
360-
non_retryable=True,
361362
)
362363
self._log = self._log[log_index:]
363364
self._base_offset = up_to_offset
@@ -419,8 +420,7 @@ async def _on_poll(self, payload: PollInput) -> PollResult:
419420
raise ApplicationError(
420421
f"Requested offset {payload.from_offset} has been truncated. "
421422
f"Current base offset is {self._base_offset}.",
422-
type="TruncatedOffset",
423-
non_retryable=True,
423+
type=TRUNCATED_OFFSET_ERROR_TYPE,
424424
)
425425
all_new = self._log[log_offset:]
426426
if payload.topics:
@@ -469,8 +469,7 @@ def _validate_poll(self, _payload: PollInput) -> None:
469469
if self._detaching:
470470
raise ApplicationError(
471471
"Workflow pollers are detached for continue-as-new",
472-
type="StreamDraining",
473-
non_retryable=True,
472+
type=STREAM_DRAINING_ERROR_TYPE,
474473
)
475474

476475
def _on_offset(self) -> int:

temporalio/contrib/workflow_streams/_types.py

Lines changed: 7 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -26,6 +26,13 @@
2626

2727
T = TypeVar("T")
2828

29+
# Well-known ``ApplicationError.type`` values the stream workflow uses to reject
30+
# polls, and which ``WorkflowStreamClient.subscribe`` recognizes to drive retry
31+
# behavior. Defined here so the raise sites (``_stream.py``) and the handling
32+
# sites (``_client.py``) cannot diverge.
33+
STREAM_DRAINING_ERROR_TYPE = "StreamDraining"
34+
TRUNCATED_OFFSET_ERROR_TYPE = "TruncatedOffset"
35+
2936

3037
# basedpyright flags _-prefixed module-level functions as unused even when
3138
# sibling modules import them (_stream.py, _client.py). Vanilla pyright does

tests/contrib/workflow_streams/test_workflow_streams.py

Lines changed: 51 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -2066,6 +2066,57 @@ async def test_follow_continue_as_new_describes_polled_run(client: Client) -> No
20662066
await new_handle.signal(ContinueAsNewHelperWorkflow.close)
20672067

20682068

2069+
@pytest.mark.asyncio
2070+
async def test_subscribe_captures_polled_run_id(client: Client) -> None:
2071+
"""The ``subscribe`` loop must record the run each poll was admitted to.
2072+
2073+
``_follow_continue_as_new`` relies on ``_polled_run_id`` to describe the
2074+
*polled* run rather than the latest one (see
2075+
``test_follow_continue_as_new_describes_polled_run``). This verifies the
2076+
real subscribe flow populates that field, rather than setting it by hand.
2077+
"""
2078+
async with new_worker(client, BasicWorkflowStreamWorkflow) as worker:
2079+
handle = await client.start_workflow(
2080+
BasicWorkflowStreamWorkflow.run,
2081+
id=f"workflow-stream-polled-run-{uuid.uuid4()}",
2082+
task_queue=worker.task_queue,
2083+
)
2084+
await handle.signal(
2085+
"__temporal_workflow_stream_publish",
2086+
PublishInput(
2087+
items=[PublishEntry(topic="events", data=_wire_bytes(b"item-0"))],
2088+
publisher_id="pub",
2089+
sequence=1,
2090+
),
2091+
)
2092+
2093+
stream = WorkflowStreamClient.create(client, handle.id)
2094+
received: list[WorkflowStreamItem] = []
2095+
2096+
async def consume() -> None:
2097+
async for item in stream.subscribe(
2098+
from_offset=0, poll_cooldown=timedelta(0), result_type=bytes
2099+
):
2100+
received.append(item)
2101+
2102+
async def received_count() -> int:
2103+
return len(received)
2104+
2105+
task = asyncio.create_task(consume())
2106+
try:
2107+
await assert_eq_eventually(1, received_count)
2108+
# The poll was admitted to the run we started; subscribe must have
2109+
# captured exactly that run id, not left it unset.
2110+
assert stream._polled_run_id == handle.result_run_id
2111+
finally:
2112+
task.cancel()
2113+
try:
2114+
await task
2115+
except asyncio.CancelledError:
2116+
pass
2117+
await handle.signal(BasicWorkflowStreamWorkflow.close)
2118+
2119+
20692120
@workflow.defn
20702121
class DrainingGateWorkflow:
20712122
"""CAN workflow that detaches pollers and then *holds* in the draining state

0 commit comments

Comments
 (0)