Commit 71b26be

fix(ext-workflow): retry transient gRPC errors in wait_for_orchestration (#1069)
* fix(ext-workflow): retry transient gRPC errors in wait_for_orchestration_*

wait_for_orchestration_start and wait_for_orchestration_completion call the workflow runtime through the local Dapr sidecar. Immediately after a sidecar restart (placement re-dissemination not yet applied, actor registration still propagating, etc.), the sidecar can return FAILED_PRECONDITION or UNAVAILABLE for an instance whose persistent state is intact. The previous implementation surfaced these as a hard error to the caller, so a client polling a long-running workflow would fail permanently even though the workflow itself was recoverable.

Apply the same fix to both the sync and async clients:

- TaskHubGrpcClient (sync) and AsyncTaskHubGrpcClient (async) both route their wait methods through a _call_with_transient_retry helper. The async variant uses asyncio.sleep; otherwise identical.
- Retry FAILED_PRECONDITION and UNAVAILABLE with capped exponential backoff (0.5s, doubling, cap 5s).
- Respect the caller's timeout. timeout in (0, None) means unbounded. The first call passes the user's timeout verbatim so behavior on a healthy runtime is unchanged. On retry, both the sleep and the per-call gRPC deadline are clamped to the remaining budget against a monotonic deadline anchored to the start of the loop; neither one can overshoot the user-provided timeout.
- DEADLINE_EXCEEDED and budget exhaustion both surface as the public TimeoutError (preserved through a private _TransientTimeout sentinel; moved below the import block to satisfy E402).
- Non-transient RpcErrors propagate immediately, unchanged.

Behavior on a healthy runtime is unchanged: the first call succeeds and no retry loop runs.

Adds tests covering the retry behaviors: retry-then-succeed for both transient codes, exhaustion surfacing as TimeoutError, and non-transient codes propagating without retry.

Signed-off-by: Javier Aliaga <javier@diagrid.io>

* fix(ext-workflow): bound transient retries and address review feedback

Cap continuous transient-error retries in unbounded mode (timeout=0/None) at 30s via _MAX_TRANSIENT_RETRY_SECONDS, then re-raise the original RpcError. This preserves the pre-retry contract: timeout=0 still waits indefinitely for a healthy workflow and never raises TimeoutError, but a permanently-unavailable sidecar now surfaces the original error instead of retrying forever.

Also address review feedback:

- Type wait_for_orchestration_* timeout as Optional[int] (None is a supported, tested input meaning unbounded).
- Fix sync "up to Nones" log message to treat None as indefinite, matching the async client.
- Correct the retry-helper docstring: the first call passes grpc_timeout (None when unbounded), not the timeout value verbatim.

Add a test covering unbounded-mode transient exhaustion surfacing as the original RpcError (not TimeoutError, not a hang).

Signed-off-by: Javier Aliaga <javier@diagrid.io>

---------

Signed-off-by: Javier Aliaga <javier@diagrid.io>
Co-authored-by: Sam <sam@diagrid.io>
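
The backoff and budget rules described in the commit message can be sketched as a pure function. This is an illustration, not code from the commit: `sleep_schedule` and the constant names are hypothetical, and the sketch assumes every call fails instantly with a transient error, so only the sleeps consume time.

```python
from typing import List, Optional

INITIAL_BACKOFF = 0.5   # first sleep, in seconds
MAX_BACKOFF = 5.0       # cap on any single sleep
UNBOUNDED_GRACE = 30.0  # retry window when the caller sets no timeout


def sleep_schedule(timeout: Optional[float]) -> List[float]:
    """Return the sleeps taken before giving up.

    Assumes each call fails immediately with a transient error, so the
    budget is spent only by sleeping (a simplification for illustration).
    """
    # timeout of 0 or None means "unbounded": retries stop after the grace window.
    remaining = UNBOUNDED_GRACE if timeout in (0, None) else float(timeout)
    backoff = INITIAL_BACKOFF
    sleeps: List[float] = []
    while remaining > 0:
        # Clamp the sleep to both the backoff cap and the remaining budget.
        sleep_for = min(backoff, MAX_BACKOFF, remaining)
        sleeps.append(sleep_for)
        remaining -= sleep_for
        backoff = min(backoff * 2, MAX_BACKOFF)
    return sleeps


if __name__ == "__main__":
    print(sleep_schedule(10))
    print(sleep_schedule(None))
```

Under those assumptions, `sleep_schedule(10)` yields `[0.5, 1.0, 2.0, 4.0, 2.5]`: the last sleep is clamped so the total never exceeds the 10-second budget. What differs between the two modes is the outcome once the budget is spent: a bounded wait ends in `TimeoutError`, while an unbounded wait re-raises the original gRPC error.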
1 parent 2d1ec77 commit 71b26be

3 files changed

Lines changed: 337 additions & 50 deletions

ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/aio/client.py

Lines changed: 100 additions & 25 deletions
@@ -9,7 +9,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import asyncio
 import logging
+import time
 import uuid
 from datetime import datetime
 from typing import Any, Optional, Sequence, Union
@@ -33,6 +35,7 @@
     TOutput,
     WorkflowIdReusePolicy,
     WorkflowState,
+    _TransientTimeout,
     new_orchestration_state,
 )
 from google.protobuf import wrappers_pb2
@@ -120,34 +123,33 @@ async def get_orchestration_state(
         return new_orchestration_state(req.instanceId, res)
 
     async def wait_for_orchestration_start(
-        self, instance_id: str, *, fetch_payloads: bool = False, timeout: int = 0
+        self, instance_id: str, *, fetch_payloads: bool = False, timeout: Optional[int] = 0
     ) -> Optional[WorkflowState]:
         req = pb.GetInstanceRequest(instanceId=instance_id, getInputsAndOutputs=fetch_payloads)
-        try:
-            grpc_timeout = None if timeout == 0 else timeout
-            self._logger.info(
-                f"Waiting {'indefinitely' if timeout == 0 else f'up to {timeout}s'} for instance '{instance_id}' to start."
-            )
+        self._logger.info(
+            f"Waiting {'indefinitely' if timeout in (0, None) else f'up to {timeout}s'} for instance '{instance_id}' to start."
+        )
+
+        async def _call(grpc_timeout):
             res: pb.GetInstanceResponse = await self._stub.WaitForInstanceStart(
                 req, timeout=grpc_timeout
             )
             return new_orchestration_state(req.instanceId, res)
-        except grpc.RpcError as rpc_error:
-            if rpc_error.code() == grpc.StatusCode.DEADLINE_EXCEEDED:  # type: ignore
-                # Replace gRPC error with the built-in TimeoutError
-                raise TimeoutError('Timed-out waiting for the orchestration to start')
-            else:
-                raise
+
+        try:
+            return await self._call_with_transient_retry(instance_id, timeout, _call)
+        except _TransientTimeout:
+            raise TimeoutError('Timed-out waiting for the orchestration to start')
 
     async def wait_for_orchestration_completion(
-        self, instance_id: str, *, fetch_payloads: bool = True, timeout: int = 0
+        self, instance_id: str, *, fetch_payloads: bool = True, timeout: Optional[int] = 0
     ) -> Optional[WorkflowState]:
         req = pb.GetInstanceRequest(instanceId=instance_id, getInputsAndOutputs=fetch_payloads)
-        try:
-            grpc_timeout = None if timeout == 0 else timeout
-            self._logger.info(
-                f"Waiting {'indefinitely' if timeout == 0 else f'up to {timeout}s'} for instance '{instance_id}' to complete."
-            )
+        self._logger.info(
+            f"Waiting {'indefinitely' if timeout in (0, None) else f'up to {timeout}s'} for instance '{instance_id}' to complete."
+        )
+
+        async def _call(grpc_timeout):
             res: pb.GetInstanceResponse = await self._stub.WaitForInstanceCompletion(
                 req, timeout=grpc_timeout
             )
@@ -167,14 +169,87 @@ async def wait_for_orchestration_completion(
                 self._logger.info(f"Instance '{instance_id}' was terminated.")
             elif state.runtime_status == OrchestrationStatus.COMPLETED:
                 self._logger.info(f"Instance '{instance_id}' completed.")
-
             return state
-        except grpc.RpcError as rpc_error:
-            if rpc_error.code() == grpc.StatusCode.DEADLINE_EXCEEDED:  # type: ignore
-                # Replace gRPC error with the built-in TimeoutError
-                raise TimeoutError('Timed-out waiting for the orchestration to complete')
-            else:
-                raise
+
+        try:
+            return await self._call_with_transient_retry(instance_id, timeout, _call)
+        except _TransientTimeout:
+            raise TimeoutError('Timed-out waiting for the orchestration to complete')
+
+    # Transient gRPC codes that indicate the workflow runtime is temporarily
+    # unable to locate the workflow actor — typically immediately after a Dapr
+    # sidecar restart (e.g. recovery from chaos). The placement service has the
+    # actor registration, but local daprd hasn't received the dissemination yet.
+    # Without retry, every poll fails permanently with FAILED_PRECONDITION even
+    # though the workflow runtime state is intact.
+    _TRANSIENT_RPC_CODES = (
+        grpc.StatusCode.FAILED_PRECONDITION,
+        grpc.StatusCode.UNAVAILABLE,
+    )
+
+    # See TaskHubGrpcClient._MAX_TRANSIENT_RETRY_SECONDS — same grace window for
+    # unbounded (timeout=0) callers so a down sidecar surfaces the original
+    # error instead of retrying forever.
+    _MAX_TRANSIENT_RETRY_SECONDS = 30.0
+
+    async def _call_with_transient_retry(self, instance_id, timeout, call_fn):
+        """Async mirror of TaskHubGrpcClient._call_with_transient_retry.
+        Retries FAILED_PRECONDITION/UNAVAILABLE with capped exponential
+        backoff while clamping sleep and per-call gRPC timeout to the
+        remaining budget. The first call uses the caller's timeout unchanged
+        (``None`` when unbounded) so callers observe identical behavior on a
+        healthy runtime. In unbounded
+        mode, continuous transient retries are capped at
+        ``_MAX_TRANSIENT_RETRY_SECONDS`` before the original error propagates.
+        """
+        unbounded = timeout in (0, None)
+        deadline = None if unbounded else time.monotonic() + timeout
+        grpc_timeout = None if unbounded else timeout
+        backoff = 0.5
+        transient_deadline = None  # unbounded mode only; anchored on first transient
+        while True:
+            try:
+                return await call_fn(grpc_timeout)
+            except grpc.RpcError as rpc_error:
+                code = rpc_error.code()  # type: ignore
+                if code == grpc.StatusCode.DEADLINE_EXCEEDED:
+                    raise _TransientTimeout()
+                if code not in self._TRANSIENT_RPC_CODES:
+                    raise
+
+                now = time.monotonic()
+
+                if unbounded:
+                    if transient_deadline is None:
+                        transient_deadline = now + self._MAX_TRANSIENT_RETRY_SECONDS
+                    elif now >= transient_deadline:
+                        raise
+
+                if deadline is None:
+                    remaining = None
+                else:
+                    remaining = deadline - now
+                    if remaining <= 0:
+                        raise _TransientTimeout()
+
+                sleep_for = min(backoff, 5.0)
+                if remaining is not None:
+                    sleep_for = min(sleep_for, remaining)
+                if transient_deadline is not None:
+                    sleep_for = min(sleep_for, transient_deadline - now)
+                self._logger.warning(
+                    f"Transient gRPC error {code.name} waiting on instance '{instance_id}'; "
+                    f'retrying in {sleep_for:.2f}s'
+                )
+                await asyncio.sleep(sleep_for)
+                backoff = min(backoff * 2, 5.0)
+
+                if deadline is None:
+                    grpc_timeout = None
+                else:
+                    grpc_timeout = deadline - time.monotonic()
+                    if grpc_timeout <= 0:
+                        raise _TransientTimeout()
 
     async def raise_orchestration_event(
         self, instance_id: str, event_name: str, *, data: Optional[Any] = None

ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/client.py

Lines changed: 117 additions & 25 deletions
@@ -10,6 +10,7 @@
 # limitations under the License.
 
 import logging
+import time
 import uuid
 from dataclasses import dataclass
 from datetime import datetime
@@ -25,6 +26,12 @@
 from dapr.ext.workflow._durabletask.internal.grpc_interceptor import DefaultClientInterceptorImpl
 from google.protobuf import wrappers_pb2
 
+
+class _TransientTimeout(Exception):
+    """Internal sentinel: the retry loop exhausted the user-provided timeout
+    budget. Callers convert this to a public ``TimeoutError``."""
+
+
 TInput = TypeVar('TInput')
 TOutput = TypeVar('TOutput')
 
@@ -217,32 +224,31 @@ def get_orchestration_state(
         return new_orchestration_state(req.instanceId, res)
 
     def wait_for_orchestration_start(
-        self, instance_id: str, *, fetch_payloads: bool = False, timeout: int = 0
+        self, instance_id: str, *, fetch_payloads: bool = False, timeout: Optional[int] = 0
     ) -> Optional[WorkflowState]:
         req = pb.GetInstanceRequest(instanceId=instance_id, getInputsAndOutputs=fetch_payloads)
-        try:
-            grpc_timeout = None if timeout == 0 else timeout
-            self._logger.info(
-                f"Waiting {'indefinitely' if timeout == 0 else f'up to {timeout}s'} for instance '{instance_id}' to start."
-            )
+        self._logger.info(
+            f"Waiting {'indefinitely' if timeout in (0, None) else f'up to {timeout}s'} for instance '{instance_id}' to start."
+        )
+
+        def _call(grpc_timeout):
             res: pb.GetInstanceResponse = self._stub.WaitForInstanceStart(req, timeout=grpc_timeout)
             return new_orchestration_state(req.instanceId, res)
-        except grpc.RpcError as rpc_error:
-            if rpc_error.code() == grpc.StatusCode.DEADLINE_EXCEEDED:  # type: ignore
-                # Replace gRPC error with the built-in TimeoutError
-                raise TimeoutError('Timed-out waiting for the orchestration to start')
-            else:
-                raise
+
+        try:
+            return self._call_with_transient_retry(instance_id, timeout, _call)
+        except _TransientTimeout:
+            raise TimeoutError('Timed-out waiting for the orchestration to start')
 
     def wait_for_orchestration_completion(
-        self, instance_id: str, *, fetch_payloads: bool = True, timeout: int = 0
+        self, instance_id: str, *, fetch_payloads: bool = True, timeout: Optional[int] = 0
     ) -> Optional[WorkflowState]:
         req = pb.GetInstanceRequest(instanceId=instance_id, getInputsAndOutputs=fetch_payloads)
-        try:
-            grpc_timeout = None if timeout == 0 else timeout
-            self._logger.info(
-                f"Waiting {'indefinitely' if timeout == 0 else f'up to {timeout}s'} for instance '{instance_id}' to complete."
-            )
+        self._logger.info(
+            f"Waiting {'indefinitely' if timeout in (0, None) else f'up to {timeout}s'} for instance '{instance_id}' to complete."
+        )
+
+        def _call(grpc_timeout):
             res: pb.GetInstanceResponse = self._stub.WaitForInstanceCompletion(
                 req, timeout=grpc_timeout
             )
@@ -262,14 +268,100 @@ def wait_for_orchestration_completion(
                 self._logger.info(f"Instance '{instance_id}' was terminated.")
             elif state.runtime_status == OrchestrationStatus.COMPLETED:
                 self._logger.info(f"Instance '{instance_id}' completed.")
-
             return state
-        except grpc.RpcError as rpc_error:
-            if rpc_error.code() == grpc.StatusCode.DEADLINE_EXCEEDED:  # type: ignore
-                # Replace gRPC error with the built-in TimeoutError
-                raise TimeoutError('Timed-out waiting for the orchestration to complete')
-            else:
-                raise
+
+        try:
+            return self._call_with_transient_retry(instance_id, timeout, _call)
+        except _TransientTimeout:
+            raise TimeoutError('Timed-out waiting for the orchestration to complete')
+
+    # Transient gRPC codes that indicate the workflow runtime is temporarily
+    # unable to locate the workflow actor — typically immediately after a Dapr
+    # sidecar restart (e.g. recovery from chaos). The placement service has the
+    # actor registration, but local daprd hasn't received the dissemination yet.
+    # Without retry, every poll fails permanently with FAILED_PRECONDITION even
+    # though the workflow runtime state is intact.
+    _TRANSIENT_RPC_CODES = (
+        grpc.StatusCode.FAILED_PRECONDITION,
+        grpc.StatusCode.UNAVAILABLE,
+    )
+
+    # When the caller sets no timeout (timeout=0), bound how long we keep
+    # retrying *consecutive* transient errors so a permanently-unavailable
+    # sidecar surfaces the original error instead of hanging forever. This
+    # window comfortably covers placement re-dissemination after a restart;
+    # a slow-but-healthy workflow never enters this path (it just blocks in
+    # the long-poll), so its indefinite wait is preserved.
+    _MAX_TRANSIENT_RETRY_SECONDS = 30.0
+
+    def _call_with_transient_retry(self, instance_id, timeout, call_fn):
+        """Run a gRPC wait call, retrying transient errors until the user
+        timeout deadline. Re-raises non-transient errors immediately.
+        timeout in (0, None) means unbounded; transients are still retried,
+        but only for up to ``_MAX_TRANSIENT_RETRY_SECONDS`` of continuous
+        failures, after which the original transient error propagates.
+
+        The first call passes the caller's ``grpc_timeout`` (``None`` when
+        unbounded) to ``call_fn`` so callers observe identical behavior to a
+        non-retrying client when no transient occurs (preserves prior public
+        behavior). On a retry, both the sleep
+        and the per-call gRPC deadline are clamped to the remaining budget so
+        the helper never sleeps past ``timeout`` or starts a gRPC call with
+        no time left.
+        """
+        unbounded = timeout in (0, None)
+        deadline = None if unbounded else time.monotonic() + timeout
+        grpc_timeout = None if unbounded else timeout
+        backoff = 0.5
+        transient_deadline = None  # unbounded mode only; anchored on first transient
+        while True:
+            try:
+                return call_fn(grpc_timeout)
+            except grpc.RpcError as rpc_error:
+                code = rpc_error.code()  # type: ignore
+                if code == grpc.StatusCode.DEADLINE_EXCEEDED:
+                    raise _TransientTimeout()
+                if code not in self._TRANSIENT_RPC_CODES:
+                    raise
+
+                now = time.monotonic()
+
+                # In unbounded mode the user budget can't end the loop, so cap
+                # continuous transient retries and re-raise the original error
+                # (matching pre-retry behavior) once the grace window elapses.
+                if unbounded:
+                    if transient_deadline is None:
+                        transient_deadline = now + self._MAX_TRANSIENT_RETRY_SECONDS
+                    elif now >= transient_deadline:
+                        raise
+
+                # Compute remaining budget once and reuse so the sleep and the
+                # next per-call grpc_timeout agree on "how much time is left".
+                if deadline is None:
+                    remaining = None
+                else:
+                    remaining = deadline - now
+                    if remaining <= 0:
+                        raise _TransientTimeout()
+
+                sleep_for = min(backoff, 5.0)
+                if remaining is not None:
+                    sleep_for = min(sleep_for, remaining)
+                if transient_deadline is not None:
+                    sleep_for = min(sleep_for, transient_deadline - now)
+                self._logger.warning(
+                    f"Transient gRPC error {code.name} waiting on instance '{instance_id}'; "
+                    f'retrying in {sleep_for:.2f}s'
+                )
+                time.sleep(sleep_for)
+                backoff = min(backoff * 2, 5.0)
+
+                if deadline is None:
+                    grpc_timeout = None
+                else:
+                    grpc_timeout = deadline - time.monotonic()
+                    if grpc_timeout <= 0:
+                        raise _TransientTimeout()
 
     def raise_orchestration_event(
         self, instance_id: str, event_name: str, *, data: Optional[Any] = None