Skip to content

Commit cb75ced

Browse files
brianstrauch and claude committed
Retry poll rejected while stream is draining for continue-as-new
The poll update's validator rejected new polls during detach-for-CAN with an untyped RuntimeError, which subscribe() did not classify and therefore re-raised — ending the subscription with an error during a routine rollover. Give the validator the well-known StreamDraining ApplicationError type and have subscribe() back off and retry on it, so the poll lands on the successor run once the rollover completes.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent 9eda6e7 commit cb75ced

3 files changed

Lines changed: 131 additions & 2 deletions

File tree

temporalio/contrib/workflow_streams/_client.py

Lines changed: 8 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -556,6 +556,14 @@ async def subscribe(
556556
# base_offset).
557557
offset = 0
558558
continue
559+
if cause_type == "StreamDraining":
560+
# Workflow is detaching for continue-as-new. Back off and
561+
# retry; the poll lands on the successor run once the
562+
# rollover completes.
563+
cooldown_secs = poll_cooldown.total_seconds()
564+
if cooldown_secs > 0:
565+
await asyncio.sleep(cooldown_secs)
566+
continue
559567
if cause_type == "AcceptedUpdateCompletedWorkflow":
560568
# Workflow returned (or continued-as-new) before
561569
# this poll's update completed. Either follow the

temporalio/contrib/workflow_streams/_stream.py

Lines changed: 11 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -460,9 +460,18 @@ async def _on_poll(self, payload: PollInput) -> PollResult:
460460
)
461461

462462
def _validate_poll(self, _payload: PollInput) -> None:
463-
"""Reject new polls when pollers are detached for continue-as-new."""
463+
"""Reject new polls when pollers are detached for continue-as-new.
464+
465+
Uses the well-known ``StreamDraining`` type so a subscriber recognizes
466+
the rollover-in-progress and retries until its poll lands on the
467+
successor run, rather than surfacing the rejection as an error.
468+
"""
464469
if self._detaching:
465-
raise RuntimeError("Workflow pollers are detached for continue-as-new")
470+
raise ApplicationError(
471+
"Workflow pollers are detached for continue-as-new",
472+
type="StreamDraining",
473+
non_retryable=True,
474+
)
466475

467476
def _on_offset(self) -> int:
468477
"""Return the current global offset (base_offset + log length)."""

tests/contrib/workflow_streams/test_workflow_streams.py

Lines changed: 112 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -2066,6 +2066,118 @@ async def test_follow_continue_as_new_describes_polled_run(client: Client) -> No
20662066
await new_handle.signal(ContinueAsNewHelperWorkflow.close)
20672067

20682068

2069+
@workflow.defn
2070+
class DrainingGateWorkflow:
2071+
"""CAN workflow that detaches pollers and then *holds* in the draining state
2072+
until released, so a subscriber deterministically hits the draining poll
2073+
rejection before the rollover completes."""
2074+
2075+
@workflow.init
2076+
def __init__(self, input: CANWorkflowInputTyped) -> None:
2077+
self.stream = WorkflowStream(prior_state=input.stream_state)
2078+
self._should_continue = False
2079+
self._release = False
2080+
self._closed = False
2081+
2082+
@workflow.signal
2083+
def close(self) -> None:
2084+
self._closed = True
2085+
2086+
@workflow.signal
2087+
def trigger_continue(self) -> None:
2088+
self._should_continue = True
2089+
2090+
@workflow.signal
2091+
def release(self) -> None:
2092+
self._release = True
2093+
2094+
@workflow.run
2095+
async def run(self, _input: CANWorkflowInputTyped) -> None:
2096+
del _input
2097+
await workflow.wait_condition(lambda: self._should_continue or self._closed)
2098+
if self._closed:
2099+
return
2100+
# Detach but stay open until released, so new polls are rejected with
2101+
# StreamDraining for a deterministic window.
2102+
self.stream.detach_pollers()
2103+
await workflow.wait_condition(lambda: self._release)
2104+
await workflow.wait_condition(workflow.all_handlers_finished)
2105+
workflow.continue_as_new(
2106+
args=[CANWorkflowInputTyped(stream_state=self.stream.get_state())]
2107+
)
2108+
2109+
2110+
@pytest.mark.asyncio
2111+
async def test_subscribe_retries_while_draining(client: Client) -> None:
2112+
"""A poll rejected because the stream is draining for continue-as-new must
2113+
be retried, not surfaced as an error: the subscription stays alive through
2114+
the rollover and resumes on the successor run."""
2115+
async with new_worker(client, DrainingGateWorkflow) as worker:
2116+
handle = await client.start_workflow(
2117+
DrainingGateWorkflow.run,
2118+
CANWorkflowInputTyped(),
2119+
id=f"workflow-stream-draining-{uuid.uuid4()}",
2120+
task_queue=worker.task_queue,
2121+
)
2122+
await handle.signal(
2123+
"__temporal_workflow_stream_publish",
2124+
PublishInput(
2125+
items=[PublishEntry(topic="events", data=_wire_bytes(b"item-0"))],
2126+
publisher_id="pub",
2127+
sequence=1,
2128+
),
2129+
)
2130+
2131+
stream = WorkflowStreamClient.create(client, handle.id)
2132+
received: list[WorkflowStreamItem] = []
2133+
2134+
async def consume() -> None:
2135+
async for item in stream.subscribe(
2136+
from_offset=0, poll_cooldown=timedelta(0), result_type=bytes
2137+
):
2138+
received.append(item)
2139+
2140+
async def received_count() -> int:
2141+
return len(received)
2142+
2143+
task = asyncio.create_task(consume())
2144+
new_handle = client.get_workflow_handle(handle.id)
2145+
try:
2146+
await assert_eq_eventually(1, received_count)
2147+
2148+
# Detach; the subscriber's polls are now rejected with StreamDraining.
2149+
await handle.signal(DrainingGateWorkflow.trigger_continue)
2150+
# The subscription must keep retrying, not error out.
2151+
await asyncio.sleep(1.0)
2152+
assert not task.done(), "draining rejection must not end the subscription"
2153+
2154+
# Release: the workflow continues-as-new; the subscription resumes on
2155+
# the successor run and receives an item published there.
2156+
await handle.signal(DrainingGateWorkflow.release)
2157+
await assert_eq_eventually(
2158+
True, lambda: _is_different_run(handle, new_handle)
2159+
)
2160+
await new_handle.signal(
2161+
"__temporal_workflow_stream_publish",
2162+
PublishInput(
2163+
items=[PublishEntry(topic="events", data=_wire_bytes(b"item-1"))],
2164+
publisher_id="pub",
2165+
sequence=2,
2166+
),
2167+
)
2168+
2169+
await assert_eq_eventually(2, received_count)
2170+
assert [i.data for i in received] == [b"item-0", b"item-1"]
2171+
assert [i.offset for i in received] == [0, 1]
2172+
finally:
2173+
task.cancel()
2174+
try:
2175+
await task
2176+
except asyncio.CancelledError:
2177+
pass
2178+
await new_handle.signal(DrainingGateWorkflow.close)
2179+
2180+
20692181
# ---------------------------------------------------------------------------
20702182
# Cross-workflow workflow stream (Scenario 1)
20712183
# ---------------------------------------------------------------------------

0 commit comments

Comments
 (0)