Skip to content

Commit e58655d

Browse files
committed
fix(ext-workflow): retry transient gRPC errors in wait_for_orchestration_*
wait_for_orchestration_start and wait_for_orchestration_completion call the workflow runtime through the local Dapr sidecar. Immediately after a sidecar restart (placement re-dissemination not yet applied, actor registration still propagating, etc.), the sidecar can return FAILED_PRECONDITION or UNAVAILABLE for an instance whose persistent state is intact. The previous implementation surfaced these as a hard error to the caller, so a client polling a long-running workflow would fail permanently even though the workflow itself was recoverable. Wrap both wait methods in a single _call_with_transient_retry helper: - Retry FAILED_PRECONDITION and UNAVAILABLE with exponential backoff (0.5s, doubling, capped at 5s). - Respect the caller's timeout. timeout in (0, None) means unbounded. The first call passes the user's timeout verbatim so behavior on a healthy runtime is unchanged. On retry, the per-call gRPC deadline is the remaining budget against a monotonic deadline anchored to the start of the loop. - DEADLINE_EXCEEDED and budget exhaustion both surface as the public TimeoutError (preserved through a private _TransientTimeout sentinel). - Non-transient RpcErrors propagate immediately, unchanged. Behavior on a healthy runtime is unchanged: the first call succeeds and no retry loop runs. Signed-off-by: Javier Aliaga <javier@diagrid.io>
1 parent 8571c3e commit e58655d

1 file changed

Lines changed: 74 additions & 23 deletions

File tree

  • ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask

ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/client.py

Lines changed: 74 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,18 @@
1010
# limitations under the License.
1111

1212
import logging
13+
import time
1314
import uuid
1415
from dataclasses import dataclass
1516
from datetime import datetime
1617
from enum import Enum
1718
from typing import Any, Optional, Sequence, TypeVar, Union
1819

20+
21+
class _TransientTimeout(Exception):
22+
"""Internal sentinel: the retry loop exhausted the user-provided timeout
23+
budget. Callers convert this to a public ``TimeoutError``."""
24+
1925
import dapr.ext.workflow._durabletask.internal.helpers as helpers
2026
import dapr.ext.workflow._durabletask.internal.orchestrator_service_pb2_grpc as stubs
2127
import dapr.ext.workflow._durabletask.internal.protos as pb
@@ -220,29 +226,28 @@ def wait_for_orchestration_start(
220226
self, instance_id: str, *, fetch_payloads: bool = False, timeout: int = 0
221227
) -> Optional[WorkflowState]:
222228
req = pb.GetInstanceRequest(instanceId=instance_id, getInputsAndOutputs=fetch_payloads)
223-
try:
224-
grpc_timeout = None if timeout == 0 else timeout
225-
self._logger.info(
226-
f"Waiting {'indefinitely' if timeout == 0 else f'up to {timeout}s'} for instance '{instance_id}' to start."
227-
)
229+
self._logger.info(
230+
f"Waiting {'indefinitely' if timeout == 0 else f'up to {timeout}s'} for instance '{instance_id}' to start."
231+
)
232+
233+
def _call(grpc_timeout):
228234
res: pb.GetInstanceResponse = self._stub.WaitForInstanceStart(req, timeout=grpc_timeout)
229235
return new_orchestration_state(req.instanceId, res)
230-
except grpc.RpcError as rpc_error:
231-
if rpc_error.code() == grpc.StatusCode.DEADLINE_EXCEEDED: # type: ignore
232-
# Replace gRPC error with the built-in TimeoutError
233-
raise TimeoutError('Timed-out waiting for the orchestration to start')
234-
else:
235-
raise
236+
237+
try:
238+
return self._call_with_transient_retry(instance_id, timeout, _call)
239+
except _TransientTimeout:
240+
raise TimeoutError('Timed-out waiting for the orchestration to start')
236241

237242
def wait_for_orchestration_completion(
238243
self, instance_id: str, *, fetch_payloads: bool = True, timeout: int = 0
239244
) -> Optional[WorkflowState]:
240245
req = pb.GetInstanceRequest(instanceId=instance_id, getInputsAndOutputs=fetch_payloads)
241-
try:
242-
grpc_timeout = None if timeout == 0 else timeout
243-
self._logger.info(
244-
f"Waiting {'indefinitely' if timeout == 0 else f'up to {timeout}s'} for instance '{instance_id}' to complete."
245-
)
246+
self._logger.info(
247+
f"Waiting {'indefinitely' if timeout == 0 else f'up to {timeout}s'} for instance '{instance_id}' to complete."
248+
)
249+
250+
def _call(grpc_timeout):
246251
res: pb.GetInstanceResponse = self._stub.WaitForInstanceCompletion(
247252
req, timeout=grpc_timeout
248253
)
@@ -262,14 +267,60 @@ def wait_for_orchestration_completion(
262267
self._logger.info(f"Instance '{instance_id}' was terminated.")
263268
elif state.runtime_status == OrchestrationStatus.COMPLETED:
264269
self._logger.info(f"Instance '{instance_id}' completed.")
265-
266270
return state
267-
except grpc.RpcError as rpc_error:
268-
if rpc_error.code() == grpc.StatusCode.DEADLINE_EXCEEDED: # type: ignore
269-
# Replace gRPC error with the built-in TimeoutError
270-
raise TimeoutError('Timed-out waiting for the orchestration to complete')
271-
else:
272-
raise
271+
272+
try:
273+
return self._call_with_transient_retry(instance_id, timeout, _call)
274+
except _TransientTimeout:
275+
raise TimeoutError('Timed-out waiting for the orchestration to complete')
276+
277+
# Transient gRPC codes that indicate the workflow runtime is temporarily
278+
# unable to locate the workflow actor — typically immediately after a Dapr
279+
# sidecar restart (e.g. recovery from chaos). The placement service has the
280+
# actor registration, but local daprd hasn't received the dissemination yet.
281+
# Without retry, every poll fails permanently with FAILED_PRECONDITION even
282+
# though the workflow runtime state is intact.
283+
_TRANSIENT_RPC_CODES = (
284+
grpc.StatusCode.FAILED_PRECONDITION,
285+
grpc.StatusCode.UNAVAILABLE,
286+
)
287+
288+
def _call_with_transient_retry(self, instance_id, timeout, call_fn):
289+
"""Run a gRPC wait call, retrying transient errors until the user
290+
timeout deadline. Re-raises non-transient errors immediately.
291+
timeout in (0, None) means unbounded; we still retry transients with
292+
backoff.
293+
294+
The first call passes ``timeout`` verbatim to ``call_fn`` so callers
295+
observe identical behavior to a non-retrying client when no transient
296+
occurs (preserves prior public behavior). On a retry, the per-call
297+
gRPC deadline is the remaining budget against a monotonic deadline
298+
anchored to the start of the loop.
299+
"""
300+
unbounded = timeout in (0, None)
301+
deadline = None if unbounded else time.monotonic() + timeout
302+
grpc_timeout = None if unbounded else timeout
303+
backoff = 0.5
304+
while True:
305+
try:
306+
return call_fn(grpc_timeout)
307+
except grpc.RpcError as rpc_error:
308+
code = rpc_error.code() # type: ignore
309+
if code == grpc.StatusCode.DEADLINE_EXCEEDED:
310+
raise _TransientTimeout()
311+
if code not in self._TRANSIENT_RPC_CODES:
312+
raise
313+
if deadline is not None and time.monotonic() >= deadline:
314+
raise _TransientTimeout()
315+
self._logger.warning(
316+
f"Transient gRPC error {code.name} waiting on instance '{instance_id}'; "
317+
f"retrying in {backoff:.1f}s"
318+
)
319+
time.sleep(min(backoff, 5.0))
320+
backoff = min(backoff * 2, 5.0)
321+
grpc_timeout = (
322+
None if deadline is None else max(0.1, deadline - time.monotonic())
323+
)
273324

274325
def raise_orchestration_event(
275326
self, instance_id: str, event_name: str, *, data: Optional[Any] = None

0 commit comments

Comments
 (0)