1010# limitations under the License.
1111
1212import logging
13+ import time
1314import uuid
1415from dataclasses import dataclass
1516from datetime import datetime
2526from dapr .ext .workflow ._durabletask .internal .grpc_interceptor import DefaultClientInterceptorImpl
2627from google .protobuf import wrappers_pb2
2728
29+
30+ class _TransientTimeout (Exception ):
31+ """Internal sentinel: the retry loop exhausted the user-provided timeout
32+ budget. Callers convert this to a public ``TimeoutError``."""
33+
34+
2835TInput = TypeVar ('TInput' )
2936TOutput = TypeVar ('TOutput' )
3037
@@ -217,32 +224,31 @@ def get_orchestration_state(
217224 return new_orchestration_state (req .instanceId , res )
218225
def wait_for_orchestration_start(
    self, instance_id: str, *, fetch_payloads: bool = False, timeout: Optional[int] = 0
) -> Optional[WorkflowState]:
    """Block until the given workflow instance has started.

    Args:
        instance_id: ID of the workflow instance to wait for.
        fetch_payloads: Forwarded as ``getInputsAndOutputs`` on the request.
        timeout: Maximum number of seconds to wait. ``0`` or ``None`` waits
            indefinitely.

    Returns:
        The state built by ``new_orchestration_state`` from the response.

    Raises:
        TimeoutError: If the retry helper reports that the wait timed out.
    """
    wait_description = 'indefinitely' if timeout in (0, None) else f'up to {timeout}s'
    self._logger.info(f"Waiting {wait_description} for instance '{instance_id}' to start.")

    request = pb.GetInstanceRequest(instanceId=instance_id, getInputsAndOutputs=fetch_payloads)

    def _poll_start(grpc_timeout):
        # Single long-poll attempt; the retry helper decides the per-call deadline.
        response: pb.GetInstanceResponse = self._stub.WaitForInstanceStart(
            request, timeout=grpc_timeout
        )
        return new_orchestration_state(request.instanceId, response)

    try:
        return self._call_with_transient_retry(instance_id, timeout, _poll_start)
    except _TransientTimeout:
        # Replace the internal sentinel with the built-in TimeoutError.
        raise TimeoutError('Timed-out waiting for the orchestration to start')
236242
237243 def wait_for_orchestration_completion (
238- self , instance_id : str , * , fetch_payloads : bool = True , timeout : int = 0
244+ self , instance_id : str , * , fetch_payloads : bool = True , timeout : Optional [ int ] = 0
239245 ) -> Optional [WorkflowState ]:
240246 req = pb .GetInstanceRequest (instanceId = instance_id , getInputsAndOutputs = fetch_payloads )
241- try :
242- grpc_timeout = None if timeout == 0 else timeout
243- self . _logger . info (
244- f"Waiting { 'indefinitely' if timeout == 0 else f'up to { timeout } s' } for instance ' { instance_id } ' to complete."
245- )
247+ self . _logger . info (
248+ f"Waiting { 'indefinitely' if timeout in ( 0 , None ) else f'up to { timeout } s' } for instance ' { instance_id } ' to complete."
249+ )
250+
251+ def _call ( grpc_timeout ):
246252 res : pb .GetInstanceResponse = self ._stub .WaitForInstanceCompletion (
247253 req , timeout = grpc_timeout
248254 )
@@ -262,14 +268,100 @@ def wait_for_orchestration_completion(
262268 self ._logger .info (f"Instance '{ instance_id } ' was terminated." )
263269 elif state .runtime_status == OrchestrationStatus .COMPLETED :
264270 self ._logger .info (f"Instance '{ instance_id } ' completed." )
265-
266271 return state
267- except grpc .RpcError as rpc_error :
268- if rpc_error .code () == grpc .StatusCode .DEADLINE_EXCEEDED : # type: ignore
269- # Replace gRPC error with the built-in TimeoutError
270- raise TimeoutError ('Timed-out waiting for the orchestration to complete' )
271- else :
272- raise
272+
273+ try :
274+ return self ._call_with_transient_retry (instance_id , timeout , _call )
275+ except _TransientTimeout :
276+ raise TimeoutError ('Timed-out waiting for the orchestration to complete' )
277+
# Transient gRPC codes that indicate the workflow runtime is temporarily
# unable to locate the workflow actor — typically immediately after a Dapr
# sidecar restart (e.g. recovery from chaos). The placement service has the
# actor registration, but the local daprd hasn't received the dissemination
# yet. Without retry, every poll fails with FAILED_PRECONDITION even though
# the workflow runtime state is intact. The retry helper treats only these
# codes as retryable; any other code is re-raised immediately.
_TRANSIENT_RPC_CODES = (
    grpc.StatusCode.FAILED_PRECONDITION,
    grpc.StatusCode.UNAVAILABLE,
)

# When the caller sets no timeout (timeout is 0 or None), bound how long we
# keep retrying *consecutive* transient errors, in seconds, so a
# permanently-unavailable sidecar surfaces the original error instead of
# hanging forever. This window comfortably covers placement re-dissemination
# after a restart; a slow-but-healthy workflow never enters this path (it
# just blocks in the long-poll), so its indefinite wait is preserved.
_MAX_TRANSIENT_RETRY_SECONDS = 30.0
296+
297+ def _call_with_transient_retry (self , instance_id , timeout , call_fn ):
298+ """Run a gRPC wait call, retrying transient errors until the user
299+ timeout deadline. Re-raises non-transient errors immediately.
300+ timeout in (0, None) means unbounded; transients are still retried,
301+ but only for up to ``_MAX_TRANSIENT_RETRY_SECONDS`` of continuous
302+ failures, after which the original transient error propagates.
303+
304+ The first call passes the caller's ``grpc_timeout`` (``None`` when
305+ unbounded) to ``call_fn`` so callers observe identical behavior to a
306+ non-retrying client when no transient occurs (preserves prior public
307+ behavior). On a retry, both the sleep
308+ and the per-call gRPC deadline are clamped to the remaining budget so
309+ the helper never sleeps past ``timeout`` or starts a gRPC call with
310+ no time left.
311+ """
312+ unbounded = timeout in (0 , None )
313+ deadline = None if unbounded else time .monotonic () + timeout
314+ grpc_timeout = None if unbounded else timeout
315+ backoff = 0.5
316+ transient_deadline = None # unbounded mode only; anchored on first transient
317+ while True :
318+ try :
319+ return call_fn (grpc_timeout )
320+ except grpc .RpcError as rpc_error :
321+ code = rpc_error .code () # type: ignore
322+ if code == grpc .StatusCode .DEADLINE_EXCEEDED :
323+ raise _TransientTimeout ()
324+ if code not in self ._TRANSIENT_RPC_CODES :
325+ raise
326+
327+ now = time .monotonic ()
328+
329+ # In unbounded mode the user budget can't end the loop, so cap
330+ # continuous transient retries and re-raise the original error
331+ # (matching pre-retry behavior) once the grace window elapses.
332+ if unbounded :
333+ if transient_deadline is None :
334+ transient_deadline = now + self ._MAX_TRANSIENT_RETRY_SECONDS
335+ elif now >= transient_deadline :
336+ raise
337+
338+ # Compute remaining budget once and reuse so the sleep and the
339+ # next per-call grpc_timeout agree on "how much time is left".
340+ if deadline is None :
341+ remaining = None
342+ else :
343+ remaining = deadline - now
344+ if remaining <= 0 :
345+ raise _TransientTimeout ()
346+
347+ sleep_for = min (backoff , 5.0 )
348+ if remaining is not None :
349+ sleep_for = min (sleep_for , remaining )
350+ if transient_deadline is not None :
351+ sleep_for = min (sleep_for , transient_deadline - now )
352+ self ._logger .warning (
353+ f"Transient gRPC error { code .name } waiting on instance '{ instance_id } '; "
354+ f'retrying in { sleep_for :.2f} s'
355+ )
356+ time .sleep (sleep_for )
357+ backoff = min (backoff * 2 , 5.0 )
358+
359+ if deadline is None :
360+ grpc_timeout = None
361+ else :
362+ grpc_timeout = deadline - time .monotonic ()
363+ if grpc_timeout <= 0 :
364+ raise _TransientTimeout ()
273365
274366 def raise_orchestration_event (
275367 self , instance_id : str , event_name : str , * , data : Optional [Any ] = None
0 commit comments