Skip to content

Commit 09f45af

Browse files
committed
fix(ext-workflow): retry transient gRPC errors in wait_for_orchestration_*
wait_for_orchestration_start and wait_for_orchestration_completion call the workflow runtime through the local Dapr sidecar. Immediately after a sidecar restart (placement re-dissemination not yet applied, actor registration still propagating, etc.), the sidecar can return FAILED_PRECONDITION or UNAVAILABLE for an instance whose persistent state is intact. The previous implementation surfaced these as a hard error to the caller, so a client polling a long-running workflow would fail permanently even though the workflow itself was recoverable.

Apply the same fix to both the sync and async clients:

- TaskHubGrpcClient (sync) and AsyncTaskHubGrpcClient (async) both route their wait methods through a _call_with_transient_retry helper. The async variant uses asyncio.sleep; otherwise the two are identical.
- Retry FAILED_PRECONDITION and UNAVAILABLE with capped exponential backoff (0.5s, doubling, cap 5s).
- Respect the caller's timeout. timeout in (0, None) means unbounded. The first call passes the user's timeout verbatim so behavior on a healthy runtime is unchanged. On retry, both the sleep and the per-call gRPC deadline are clamped to the remaining budget against a monotonic deadline anchored to the start of the loop — neither one can overshoot the user-provided timeout.
- DEADLINE_EXCEEDED and budget exhaustion both surface as the public TimeoutError (preserved through a private _TransientTimeout sentinel; moved below the import block to satisfy E402).
- Non-transient RpcErrors propagate immediately, unchanged.

Behavior on a healthy runtime is unchanged: the first call succeeds and no retry loop runs.

Adds tests covering the retry behaviors: retry-then-succeed for both transient codes, exhaustion surfacing as TimeoutError, and non-transient codes propagating without retry.

Signed-off-by: Javier Aliaga <javier@diagrid.io>
1 parent ad59aa8 commit 09f45af

3 files changed

Lines changed: 260 additions & 46 deletions

File tree

ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/aio/client.py

Lines changed: 79 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,9 @@
99
# See the License for the specific language governing permissions and
1010
# limitations under the License.
1111

12+
import asyncio
1213
import logging
14+
import time
1315
import uuid
1416
from datetime import datetime
1517
from typing import Any, Optional, Sequence, Union
@@ -33,6 +35,7 @@
3335
TOutput,
3436
WorkflowIdReusePolicy,
3537
WorkflowState,
38+
_TransientTimeout,
3639
new_orchestration_state,
3740
)
3841
from google.protobuf import wrappers_pb2
@@ -123,31 +126,30 @@ async def wait_for_orchestration_start(
123126
self, instance_id: str, *, fetch_payloads: bool = False, timeout: int = 0
124127
) -> Optional[WorkflowState]:
125128
req = pb.GetInstanceRequest(instanceId=instance_id, getInputsAndOutputs=fetch_payloads)
126-
try:
127-
grpc_timeout = None if timeout == 0 else timeout
128-
self._logger.info(
129-
f"Waiting {'indefinitely' if timeout == 0 else f'up to {timeout}s'} for instance '{instance_id}' to start."
130-
)
129+
self._logger.info(
130+
f"Waiting {'indefinitely' if timeout in (0, None) else f'up to {timeout}s'} for instance '{instance_id}' to start."
131+
)
132+
133+
async def _call(grpc_timeout):
131134
res: pb.GetInstanceResponse = await self._stub.WaitForInstanceStart(
132135
req, timeout=grpc_timeout
133136
)
134137
return new_orchestration_state(req.instanceId, res)
135-
except grpc.RpcError as rpc_error:
136-
if rpc_error.code() == grpc.StatusCode.DEADLINE_EXCEEDED: # type: ignore
137-
# Replace gRPC error with the built-in TimeoutError
138-
raise TimeoutError('Timed-out waiting for the orchestration to start')
139-
else:
140-
raise
138+
139+
try:
140+
return await self._call_with_transient_retry(instance_id, timeout, _call)
141+
except _TransientTimeout:
142+
raise TimeoutError('Timed-out waiting for the orchestration to start')
141143

142144
async def wait_for_orchestration_completion(
143145
self, instance_id: str, *, fetch_payloads: bool = True, timeout: int = 0
144146
) -> Optional[WorkflowState]:
145147
req = pb.GetInstanceRequest(instanceId=instance_id, getInputsAndOutputs=fetch_payloads)
146-
try:
147-
grpc_timeout = None if timeout == 0 else timeout
148-
self._logger.info(
149-
f"Waiting {'indefinitely' if timeout == 0 else f'up to {timeout}s'} for instance '{instance_id}' to complete."
150-
)
148+
self._logger.info(
149+
f"Waiting {'indefinitely' if timeout in (0, None) else f'up to {timeout}s'} for instance '{instance_id}' to complete."
150+
)
151+
152+
async def _call(grpc_timeout):
151153
res: pb.GetInstanceResponse = await self._stub.WaitForInstanceCompletion(
152154
req, timeout=grpc_timeout
153155
)
@@ -167,14 +169,68 @@ async def wait_for_orchestration_completion(
167169
self._logger.info(f"Instance '{instance_id}' was terminated.")
168170
elif state.runtime_status == OrchestrationStatus.COMPLETED:
169171
self._logger.info(f"Instance '{instance_id}' completed.")
170-
171172
return state
172-
except grpc.RpcError as rpc_error:
173-
if rpc_error.code() == grpc.StatusCode.DEADLINE_EXCEEDED: # type: ignore
174-
# Replace gRPC error with the built-in TimeoutError
175-
raise TimeoutError('Timed-out waiting for the orchestration to complete')
176-
else:
177-
raise
173+
174+
try:
175+
return await self._call_with_transient_retry(instance_id, timeout, _call)
176+
except _TransientTimeout:
177+
raise TimeoutError('Timed-out waiting for the orchestration to complete')
178+
179+
# Transient gRPC codes that indicate the workflow runtime is temporarily
180+
# unable to locate the workflow actor — typically immediately after a Dapr
181+
# sidecar restart (e.g. recovery from chaos). The placement service has the
182+
# actor registration, but local daprd hasn't received the dissemination yet.
183+
# Without retry, every poll fails permanently with FAILED_PRECONDITION even
184+
# though the workflow runtime state is intact.
185+
_TRANSIENT_RPC_CODES = (
186+
grpc.StatusCode.FAILED_PRECONDITION,
187+
grpc.StatusCode.UNAVAILABLE,
188+
)
189+
190+
async def _call_with_transient_retry(self, instance_id, timeout, call_fn):
191+
"""Async mirror of TaskHubGrpcClient._call_with_transient_retry.
192+
Retries FAILED_PRECONDITION/UNAVAILABLE with capped exponential
193+
backoff while clamping sleep and per-call gRPC timeout to the
194+
remaining budget. The first call passes ``timeout`` verbatim so
195+
callers observe identical behavior on a healthy runtime.
196+
"""
197+
unbounded = timeout in (0, None)
198+
deadline = None if unbounded else time.monotonic() + timeout
199+
grpc_timeout = None if unbounded else timeout
200+
backoff = 0.5
201+
while True:
202+
try:
203+
return await call_fn(grpc_timeout)
204+
except grpc.RpcError as rpc_error:
205+
code = rpc_error.code() # type: ignore
206+
if code == grpc.StatusCode.DEADLINE_EXCEEDED:
207+
raise _TransientTimeout()
208+
if code not in self._TRANSIENT_RPC_CODES:
209+
raise
210+
211+
if deadline is None:
212+
remaining = None
213+
else:
214+
remaining = deadline - time.monotonic()
215+
if remaining <= 0:
216+
raise _TransientTimeout()
217+
218+
sleep_for = min(backoff, 5.0)
219+
if remaining is not None:
220+
sleep_for = min(sleep_for, remaining)
221+
self._logger.warning(
222+
f"Transient gRPC error {code.name} waiting on instance '{instance_id}'; "
223+
f'retrying in {sleep_for:.2f}s'
224+
)
225+
await asyncio.sleep(sleep_for)
226+
backoff = min(backoff * 2, 5.0)
227+
228+
if deadline is None:
229+
grpc_timeout = None
230+
else:
231+
grpc_timeout = deadline - time.monotonic()
232+
if grpc_timeout <= 0:
233+
raise _TransientTimeout()
178234

179235
async def raise_orchestration_event(
180236
self, instance_id: str, event_name: str, *, data: Optional[Any] = None

ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/client.py

Lines changed: 91 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
# limitations under the License.
1111

1212
import logging
13+
import time
1314
import uuid
1415
from dataclasses import dataclass
1516
from datetime import datetime
@@ -25,6 +26,12 @@
2526
from dapr.ext.workflow._durabletask.internal.grpc_interceptor import DefaultClientInterceptorImpl
2627
from google.protobuf import wrappers_pb2
2728

29+
30+
class _TransientTimeout(Exception):
31+
"""Internal sentinel: the retry loop exhausted the user-provided timeout
32+
budget. Callers convert this to a public ``TimeoutError``."""
33+
34+
2835
TInput = TypeVar('TInput')
2936
TOutput = TypeVar('TOutput')
3037

@@ -220,29 +227,28 @@ def wait_for_orchestration_start(
220227
self, instance_id: str, *, fetch_payloads: bool = False, timeout: int = 0
221228
) -> Optional[WorkflowState]:
222229
req = pb.GetInstanceRequest(instanceId=instance_id, getInputsAndOutputs=fetch_payloads)
223-
try:
224-
grpc_timeout = None if timeout == 0 else timeout
225-
self._logger.info(
226-
f"Waiting {'indefinitely' if timeout == 0 else f'up to {timeout}s'} for instance '{instance_id}' to start."
227-
)
230+
self._logger.info(
231+
f"Waiting {'indefinitely' if timeout == 0 else f'up to {timeout}s'} for instance '{instance_id}' to start."
232+
)
233+
234+
def _call(grpc_timeout):
228235
res: pb.GetInstanceResponse = self._stub.WaitForInstanceStart(req, timeout=grpc_timeout)
229236
return new_orchestration_state(req.instanceId, res)
230-
except grpc.RpcError as rpc_error:
231-
if rpc_error.code() == grpc.StatusCode.DEADLINE_EXCEEDED: # type: ignore
232-
# Replace gRPC error with the built-in TimeoutError
233-
raise TimeoutError('Timed-out waiting for the orchestration to start')
234-
else:
235-
raise
237+
238+
try:
239+
return self._call_with_transient_retry(instance_id, timeout, _call)
240+
except _TransientTimeout:
241+
raise TimeoutError('Timed-out waiting for the orchestration to start')
236242

237243
def wait_for_orchestration_completion(
238244
self, instance_id: str, *, fetch_payloads: bool = True, timeout: int = 0
239245
) -> Optional[WorkflowState]:
240246
req = pb.GetInstanceRequest(instanceId=instance_id, getInputsAndOutputs=fetch_payloads)
241-
try:
242-
grpc_timeout = None if timeout == 0 else timeout
243-
self._logger.info(
244-
f"Waiting {'indefinitely' if timeout == 0 else f'up to {timeout}s'} for instance '{instance_id}' to complete."
245-
)
247+
self._logger.info(
248+
f"Waiting {'indefinitely' if timeout == 0 else f'up to {timeout}s'} for instance '{instance_id}' to complete."
249+
)
250+
251+
def _call(grpc_timeout):
246252
res: pb.GetInstanceResponse = self._stub.WaitForInstanceCompletion(
247253
req, timeout=grpc_timeout
248254
)
@@ -262,14 +268,76 @@ def wait_for_orchestration_completion(
262268
self._logger.info(f"Instance '{instance_id}' was terminated.")
263269
elif state.runtime_status == OrchestrationStatus.COMPLETED:
264270
self._logger.info(f"Instance '{instance_id}' completed.")
265-
266271
return state
267-
except grpc.RpcError as rpc_error:
268-
if rpc_error.code() == grpc.StatusCode.DEADLINE_EXCEEDED: # type: ignore
269-
# Replace gRPC error with the built-in TimeoutError
270-
raise TimeoutError('Timed-out waiting for the orchestration to complete')
271-
else:
272-
raise
272+
273+
try:
274+
return self._call_with_transient_retry(instance_id, timeout, _call)
275+
except _TransientTimeout:
276+
raise TimeoutError('Timed-out waiting for the orchestration to complete')
277+
278+
# Transient gRPC codes that indicate the workflow runtime is temporarily
279+
# unable to locate the workflow actor — typically immediately after a Dapr
280+
# sidecar restart (e.g. recovery from chaos). The placement service has the
281+
# actor registration, but local daprd hasn't received the dissemination yet.
282+
# Without retry, every poll fails permanently with FAILED_PRECONDITION even
283+
# though the workflow runtime state is intact.
284+
_TRANSIENT_RPC_CODES = (
285+
grpc.StatusCode.FAILED_PRECONDITION,
286+
grpc.StatusCode.UNAVAILABLE,
287+
)
288+
289+
def _call_with_transient_retry(self, instance_id, timeout, call_fn):
290+
"""Run a gRPC wait call, retrying transient errors until the user
291+
timeout deadline. Re-raises non-transient errors immediately.
292+
timeout in (0, None) means unbounded; we still retry transients with
293+
backoff.
294+
295+
The first call passes ``timeout`` verbatim to ``call_fn`` so callers
296+
observe identical behavior to a non-retrying client when no transient
297+
occurs (preserves prior public behavior). On a retry, both the sleep
298+
and the per-call gRPC deadline are clamped to the remaining budget so
299+
the helper never sleeps past ``timeout`` or starts a gRPC call with
300+
no time left.
301+
"""
302+
unbounded = timeout in (0, None)
303+
deadline = None if unbounded else time.monotonic() + timeout
304+
grpc_timeout = None if unbounded else timeout
305+
backoff = 0.5
306+
while True:
307+
try:
308+
return call_fn(grpc_timeout)
309+
except grpc.RpcError as rpc_error:
310+
code = rpc_error.code() # type: ignore
311+
if code == grpc.StatusCode.DEADLINE_EXCEEDED:
312+
raise _TransientTimeout()
313+
if code not in self._TRANSIENT_RPC_CODES:
314+
raise
315+
316+
# Compute remaining budget once and reuse so the sleep and the
317+
# next per-call grpc_timeout agree on "how much time is left".
318+
if deadline is None:
319+
remaining = None
320+
else:
321+
remaining = deadline - time.monotonic()
322+
if remaining <= 0:
323+
raise _TransientTimeout()
324+
325+
sleep_for = min(backoff, 5.0)
326+
if remaining is not None:
327+
sleep_for = min(sleep_for, remaining)
328+
self._logger.warning(
329+
f"Transient gRPC error {code.name} waiting on instance '{instance_id}'; "
330+
f'retrying in {sleep_for:.2f}s'
331+
)
332+
time.sleep(sleep_for)
333+
backoff = min(backoff * 2, 5.0)
334+
335+
if deadline is None:
336+
grpc_timeout = None
337+
else:
338+
grpc_timeout = deadline - time.monotonic()
339+
if grpc_timeout <= 0:
340+
raise _TransientTimeout()
273341

274342
def raise_orchestration_event(
275343
self, instance_id: str, event_name: str, *, data: Optional[Any] = None

ext/dapr-ext-workflow/tests/durabletask/test_orchestration_wait.py

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
1+
import time
12
from unittest.mock import Mock
23

4+
import grpc
35
import pytest
46
from dapr.ext.workflow._durabletask.client import TaskHubGrpcClient
57

@@ -66,3 +68,91 @@ def test_wait_for_orchestration_completion_timeout(timeout):
6668
assert kwargs.get('timeout') is None
6769
else:
6870
assert kwargs.get('timeout') == timeout
71+
72+
73+
def _make_rpc_error(code: grpc.StatusCode) -> grpc.RpcError:
74+
err = grpc.RpcError()
75+
err.code = lambda: code # type: ignore[method-assign]
76+
err.details = lambda: f'simulated {code.name}' # type: ignore[method-assign]
77+
return err
78+
79+
80+
@pytest.mark.parametrize(
81+
'transient_code', [grpc.StatusCode.FAILED_PRECONDITION, grpc.StatusCode.UNAVAILABLE]
82+
)
83+
def test_wait_for_orchestration_start_retries_transient_then_succeeds(transient_code, monkeypatch):
84+
"""Transient gRPC error on the first call → backoff → next call succeeds."""
85+
instance_id = 'test-instance'
86+
87+
from dapr.ext.workflow._durabletask.internal.protos import (
88+
ORCHESTRATION_STATUS_RUNNING,
89+
GetInstanceResponse,
90+
WorkflowState,
91+
)
92+
93+
response = GetInstanceResponse()
94+
state = WorkflowState()
95+
state.instanceId = instance_id
96+
state.workflowStatus = ORCHESTRATION_STATUS_RUNNING
97+
response.workflowState.CopyFrom(state)
98+
99+
sleeps = []
100+
monkeypatch.setattr(
101+
'dapr.ext.workflow._durabletask.client.time.sleep', lambda s: sleeps.append(s)
102+
)
103+
104+
calls = {'n': 0}
105+
106+
def fake_call(*args, **kwargs):
107+
calls['n'] += 1
108+
if calls['n'] == 1:
109+
raise _make_rpc_error(transient_code)
110+
return response
111+
112+
c = TaskHubGrpcClient()
113+
c._stub = Mock()
114+
c._stub.WaitForInstanceStart.side_effect = fake_call
115+
116+
# The point of this test is the retry behavior, not the response payload —
117+
# the second call returns successfully (no exception), the first transient
118+
# is absorbed, and exactly one backoff sleep happens between them.
119+
c.wait_for_orchestration_start(instance_id, timeout=10)
120+
assert calls['n'] == 2
121+
assert len(sleeps) == 1 and sleeps[0] > 0
122+
123+
124+
def test_wait_for_orchestration_start_transient_exhaustion_raises_timeout(monkeypatch):
125+
"""Transient gRPC errors keep returning until the user budget runs out
126+
→ public TimeoutError, not the raw RpcError."""
127+
instance_id = 'test-instance'
128+
129+
# Advance monotonic time on every call so the deadline is reached quickly.
130+
fake_time = [0.0]
131+
132+
def fake_monotonic():
133+
fake_time[0] += 0.6 # 0.0, 0.6, 1.2, ...
134+
return fake_time[0]
135+
136+
monkeypatch.setattr('dapr.ext.workflow._durabletask.client.time.monotonic', fake_monotonic)
137+
monkeypatch.setattr('dapr.ext.workflow._durabletask.client.time.sleep', lambda s: None)
138+
139+
c = TaskHubGrpcClient()
140+
c._stub = Mock()
141+
c._stub.WaitForInstanceStart.side_effect = _make_rpc_error(grpc.StatusCode.UNAVAILABLE)
142+
143+
with pytest.raises(TimeoutError):
144+
c.wait_for_orchestration_start(instance_id, timeout=1)
145+
146+
147+
def test_wait_for_orchestration_start_non_transient_propagates(monkeypatch):
148+
"""Non-transient gRPC errors must NOT be retried — propagate directly."""
149+
instance_id = 'test-instance'
150+
monkeypatch.setattr(time, 'sleep', lambda s: None)
151+
152+
c = TaskHubGrpcClient()
153+
c._stub = Mock()
154+
c._stub.WaitForInstanceStart.side_effect = _make_rpc_error(grpc.StatusCode.PERMISSION_DENIED)
155+
156+
with pytest.raises(grpc.RpcError):
157+
c.wait_for_orchestration_start(instance_id, timeout=10)
158+
assert c._stub.WaitForInstanceStart.call_count == 1

0 commit comments

Comments
 (0)