Skip to content

Commit f63311d

Browse files
committed
fix(ext-workflow): bound transient retries and address review feedback
Cap continuous transient-error retries in unbounded mode (timeout=0/None) at 30s via _MAX_TRANSIENT_RETRY_SECONDS, then re-raise the original RpcError. This preserves the pre-retry contract: timeout=0 still waits indefinitely for a healthy workflow and never raises TimeoutError, but a permanently-unavailable sidecar now surfaces the original error instead of retrying forever. Also address review feedback: - Type wait_for_orchestration_* timeout as Optional[int] (None is a supported, tested input meaning unbounded). - Fix sync "up to Nones" log message to treat None as indefinite, matching the async client. - Correct the retry-helper docstring: the first call passes grpc_timeout (None when unbounded), not the timeout value verbatim. Add a test covering unbounded-mode transient exhaustion surfacing as the original RpcError (not TimeoutError, not a hang). Signed-off-by: Javier Aliaga <javier@diagrid.io>
1 parent 09f45af commit f63311d

3 files changed

Lines changed: 89 additions & 16 deletions

File tree

ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/aio/client.py

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ async def get_orchestration_state(
123123
return new_orchestration_state(req.instanceId, res)
124124

125125
async def wait_for_orchestration_start(
126-
self, instance_id: str, *, fetch_payloads: bool = False, timeout: int = 0
126+
self, instance_id: str, *, fetch_payloads: bool = False, timeout: Optional[int] = 0
127127
) -> Optional[WorkflowState]:
128128
req = pb.GetInstanceRequest(instanceId=instance_id, getInputsAndOutputs=fetch_payloads)
129129
self._logger.info(
@@ -142,7 +142,7 @@ async def _call(grpc_timeout):
142142
raise TimeoutError('Timed-out waiting for the orchestration to start')
143143

144144
async def wait_for_orchestration_completion(
145-
self, instance_id: str, *, fetch_payloads: bool = True, timeout: int = 0
145+
self, instance_id: str, *, fetch_payloads: bool = True, timeout: Optional[int] = 0
146146
) -> Optional[WorkflowState]:
147147
req = pb.GetInstanceRequest(instanceId=instance_id, getInputsAndOutputs=fetch_payloads)
148148
self._logger.info(
@@ -187,17 +187,26 @@ async def _call(grpc_timeout):
187187
grpc.StatusCode.UNAVAILABLE,
188188
)
189189

190+
# See TaskHubGrpcClient._MAX_TRANSIENT_RETRY_SECONDS — same grace window for
191+
# unbounded (timeout=0) callers so a down sidecar surfaces the original
192+
# error instead of retrying forever.
193+
_MAX_TRANSIENT_RETRY_SECONDS = 30.0
194+
190195
async def _call_with_transient_retry(self, instance_id, timeout, call_fn):
191196
"""Async mirror of TaskHubGrpcClient._call_with_transient_retry.
192197
Retries FAILED_PRECONDITION/UNAVAILABLE with capped exponential
193198
backoff while clamping sleep and per-call gRPC timeout to the
194-
remaining budget. The first call passes ``timeout`` verbatim so
195-
callers observe identical behavior on a healthy runtime.
199+
remaining budget. The first call uses the caller's timeout unchanged
200+
(``None`` when unbounded) so callers observe identical behavior on a
201+
healthy runtime. In unbounded
202+
mode, continuous transient retries are capped at
203+
``_MAX_TRANSIENT_RETRY_SECONDS`` before the original error propagates.
196204
"""
197205
unbounded = timeout in (0, None)
198206
deadline = None if unbounded else time.monotonic() + timeout
199207
grpc_timeout = None if unbounded else timeout
200208
backoff = 0.5
209+
transient_deadline = None # unbounded mode only; anchored on first transient
201210
while True:
202211
try:
203212
return await call_fn(grpc_timeout)
@@ -208,16 +217,26 @@ async def _call_with_transient_retry(self, instance_id, timeout, call_fn):
208217
if code not in self._TRANSIENT_RPC_CODES:
209218
raise
210219

220+
now = time.monotonic()
221+
222+
if unbounded:
223+
if transient_deadline is None:
224+
transient_deadline = now + self._MAX_TRANSIENT_RETRY_SECONDS
225+
elif now >= transient_deadline:
226+
raise
227+
211228
if deadline is None:
212229
remaining = None
213230
else:
214-
remaining = deadline - time.monotonic()
231+
remaining = deadline - now
215232
if remaining <= 0:
216233
raise _TransientTimeout()
217234

218235
sleep_for = min(backoff, 5.0)
219236
if remaining is not None:
220237
sleep_for = min(sleep_for, remaining)
238+
if transient_deadline is not None:
239+
sleep_for = min(sleep_for, transient_deadline - now)
221240
self._logger.warning(
222241
f"Transient gRPC error {code.name} waiting on instance '{instance_id}'; "
223242
f'retrying in {sleep_for:.2f}s'

ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/client.py

Lines changed: 35 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -224,11 +224,11 @@ def get_orchestration_state(
224224
return new_orchestration_state(req.instanceId, res)
225225

226226
def wait_for_orchestration_start(
227-
self, instance_id: str, *, fetch_payloads: bool = False, timeout: int = 0
227+
self, instance_id: str, *, fetch_payloads: bool = False, timeout: Optional[int] = 0
228228
) -> Optional[WorkflowState]:
229229
req = pb.GetInstanceRequest(instanceId=instance_id, getInputsAndOutputs=fetch_payloads)
230230
self._logger.info(
231-
f"Waiting {'indefinitely' if timeout == 0 else f'up to {timeout}s'} for instance '{instance_id}' to start."
231+
f"Waiting {'indefinitely' if timeout in (0, None) else f'up to {timeout}s'} for instance '{instance_id}' to start."
232232
)
233233

234234
def _call(grpc_timeout):
@@ -241,11 +241,11 @@ def _call(grpc_timeout):
241241
raise TimeoutError('Timed-out waiting for the orchestration to start')
242242

243243
def wait_for_orchestration_completion(
244-
self, instance_id: str, *, fetch_payloads: bool = True, timeout: int = 0
244+
self, instance_id: str, *, fetch_payloads: bool = True, timeout: Optional[int] = 0
245245
) -> Optional[WorkflowState]:
246246
req = pb.GetInstanceRequest(instanceId=instance_id, getInputsAndOutputs=fetch_payloads)
247247
self._logger.info(
248-
f"Waiting {'indefinitely' if timeout == 0 else f'up to {timeout}s'} for instance '{instance_id}' to complete."
248+
f"Waiting {'indefinitely' if timeout in (0, None) else f'up to {timeout}s'} for instance '{instance_id}' to complete."
249249
)
250250

251251
def _call(grpc_timeout):
@@ -286,15 +286,25 @@ def _call(grpc_timeout):
286286
grpc.StatusCode.UNAVAILABLE,
287287
)
288288

289+
# When the caller sets no timeout (timeout=0), bound how long we keep
290+
# retrying *consecutive* transient errors so a permanently-unavailable
291+
# sidecar surfaces the original error instead of hanging forever. This
292+
# window comfortably covers placement re-dissemination after a restart;
293+
# a slow-but-healthy workflow never enters this path (it just blocks in
294+
# the long-poll), so its indefinite wait is preserved.
295+
_MAX_TRANSIENT_RETRY_SECONDS = 30.0
296+
289297
def _call_with_transient_retry(self, instance_id, timeout, call_fn):
290298
"""Run a gRPC wait call, retrying transient errors until the user
291299
timeout deadline. Re-raises non-transient errors immediately.
292-
timeout in (0, None) means unbounded; we still retry transients with
293-
backoff.
294-
295-
The first call passes ``timeout`` verbatim to ``call_fn`` so callers
296-
observe identical behavior to a non-retrying client when no transient
297-
occurs (preserves prior public behavior). On a retry, both the sleep
300+
timeout in (0, None) means unbounded; transients are still retried,
301+
but only for up to ``_MAX_TRANSIENT_RETRY_SECONDS`` of continuous
302+
failures, after which the original transient error propagates.
303+
304+
The first call passes the caller's ``grpc_timeout`` (``None`` when
305+
unbounded) to ``call_fn`` so callers observe identical behavior to a
306+
non-retrying client when no transient occurs (preserves prior public
307+
behavior). On a retry, both the sleep
298308
and the per-call gRPC deadline are clamped to the remaining budget so
299309
the helper never sleeps past ``timeout`` or starts a gRPC call with
300310
no time left.
@@ -303,6 +313,7 @@ def _call_with_transient_retry(self, instance_id, timeout, call_fn):
303313
deadline = None if unbounded else time.monotonic() + timeout
304314
grpc_timeout = None if unbounded else timeout
305315
backoff = 0.5
316+
transient_deadline = None # unbounded mode only; anchored on first transient
306317
while True:
307318
try:
308319
return call_fn(grpc_timeout)
@@ -313,18 +324,31 @@ def _call_with_transient_retry(self, instance_id, timeout, call_fn):
313324
if code not in self._TRANSIENT_RPC_CODES:
314325
raise
315326

327+
now = time.monotonic()
328+
329+
# In unbounded mode the user budget can't end the loop, so cap
330+
# continuous transient retries and re-raise the original error
331+
# (matching pre-retry behavior) once the grace window elapses.
332+
if unbounded:
333+
if transient_deadline is None:
334+
transient_deadline = now + self._MAX_TRANSIENT_RETRY_SECONDS
335+
elif now >= transient_deadline:
336+
raise
337+
316338
# Compute remaining budget once and reuse so the sleep and the
317339
# next per-call grpc_timeout agree on "how much time is left".
318340
if deadline is None:
319341
remaining = None
320342
else:
321-
remaining = deadline - time.monotonic()
343+
remaining = deadline - now
322344
if remaining <= 0:
323345
raise _TransientTimeout()
324346

325347
sleep_for = min(backoff, 5.0)
326348
if remaining is not None:
327349
sleep_for = min(sleep_for, remaining)
350+
if transient_deadline is not None:
351+
sleep_for = min(sleep_for, transient_deadline - now)
328352
self._logger.warning(
329353
f"Transient gRPC error {code.name} waiting on instance '{instance_id}'; "
330354
f'retrying in {sleep_for:.2f}s'

ext/dapr-ext-workflow/tests/durabletask/test_orchestration_wait.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,3 +156,33 @@ def test_wait_for_orchestration_start_non_transient_propagates(monkeypatch):
156156
with pytest.raises(grpc.RpcError):
157157
c.wait_for_orchestration_start(instance_id, timeout=10)
158158
assert c._stub.WaitForInstanceStart.call_count == 1
159+
160+
161+
def test_wait_for_orchestration_start_unbounded_transient_gives_up_with_rpc_error(monkeypatch):
162+
"""With timeout=0 (unbounded), persistent transient errors are retried only
163+
for the grace window, then the original RpcError propagates — NOT a hang and
164+
NOT a TimeoutError, preserving the pre-retry contract that timeout=0 surfaces
165+
the gRPC error rather than TimeoutError."""
166+
instance_id = 'test-instance'
167+
168+
# Advance well past _MAX_TRANSIENT_RETRY_SECONDS on each transient so the
169+
# grace window is exhausted within a couple of retries.
170+
fake_time = [0.0]
171+
172+
def fake_monotonic():
173+
fake_time[0] += 20.0 # 20, 40, 60, ... — anchors at 20, deadline 50
174+
return fake_time[0]
175+
176+
monkeypatch.setattr('dapr.ext.workflow._durabletask.client.time.monotonic', fake_monotonic)
177+
monkeypatch.setattr('dapr.ext.workflow._durabletask.client.time.sleep', lambda s: None)
178+
179+
c = TaskHubGrpcClient()
180+
c._stub = Mock()
181+
c._stub.WaitForInstanceStart.side_effect = _make_rpc_error(grpc.StatusCode.UNAVAILABLE)
182+
183+
with pytest.raises(grpc.RpcError) as exc_info:
184+
c.wait_for_orchestration_start(instance_id, timeout=0)
185+
assert not isinstance(exc_info.value, TimeoutError)
186+
# Retried at least once before giving up (proves it didn't fail-fast like the
187+
# non-transient path, and didn't loop forever).
188+
assert c._stub.WaitForInstanceStart.call_count >= 2

0 commit comments

Comments
 (0)