Skip to content

Commit cd8b10e

Browse files
committed
Merge branch 'main' into bundled-optional-exts
Signed-off-by: Sergio Herrera <627709+seherv@users.noreply.github.com>
2 parents 88cdef6 + 71b26be commit cd8b10e

5 files changed

Lines changed: 449 additions & 162 deletions

File tree

dapr/ext/workflow/_durabletask/aio/client.py

Lines changed: 100 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,9 @@
99
# See the License for the specific language governing permissions and
1010
# limitations under the License.
1111

12+
import asyncio
1213
import logging
14+
import time
1315
import uuid
1416
from datetime import datetime
1517
from typing import Any, Optional, Sequence, Union
@@ -35,6 +37,7 @@
3537
TOutput,
3638
WorkflowIdReusePolicy,
3739
WorkflowState,
40+
_TransientTimeout,
3841
new_orchestration_state,
3942
)
4043

@@ -121,34 +124,33 @@ async def get_orchestration_state(
121124
return new_orchestration_state(req.instanceId, res)
122125

123126
async def wait_for_orchestration_start(
124-
self, instance_id: str, *, fetch_payloads: bool = False, timeout: int = 0
127+
self, instance_id: str, *, fetch_payloads: bool = False, timeout: Optional[int] = 0
125128
) -> Optional[WorkflowState]:
126129
req = pb.GetInstanceRequest(instanceId=instance_id, getInputsAndOutputs=fetch_payloads)
127-
try:
128-
grpc_timeout = None if timeout == 0 else timeout
129-
self._logger.info(
130-
f"Waiting {'indefinitely' if timeout == 0 else f'up to {timeout}s'} for instance '{instance_id}' to start."
131-
)
130+
self._logger.info(
131+
f"Waiting {'indefinitely' if timeout in (0, None) else f'up to {timeout}s'} for instance '{instance_id}' to start."
132+
)
133+
134+
async def _call(grpc_timeout):
132135
res: pb.GetInstanceResponse = await self._stub.WaitForInstanceStart(
133136
req, timeout=grpc_timeout
134137
)
135138
return new_orchestration_state(req.instanceId, res)
136-
except grpc.RpcError as rpc_error:
137-
if rpc_error.code() == grpc.StatusCode.DEADLINE_EXCEEDED: # type: ignore
138-
# Replace gRPC error with the built-in TimeoutError
139-
raise TimeoutError('Timed-out waiting for the orchestration to start')
140-
else:
141-
raise
139+
140+
try:
141+
return await self._call_with_transient_retry(instance_id, timeout, _call)
142+
except _TransientTimeout:
143+
raise TimeoutError('Timed-out waiting for the orchestration to start')
142144

143145
async def wait_for_orchestration_completion(
144-
self, instance_id: str, *, fetch_payloads: bool = True, timeout: int = 0
146+
self, instance_id: str, *, fetch_payloads: bool = True, timeout: Optional[int] = 0
145147
) -> Optional[WorkflowState]:
146148
req = pb.GetInstanceRequest(instanceId=instance_id, getInputsAndOutputs=fetch_payloads)
147-
try:
148-
grpc_timeout = None if timeout == 0 else timeout
149-
self._logger.info(
150-
f"Waiting {'indefinitely' if timeout == 0 else f'up to {timeout}s'} for instance '{instance_id}' to complete."
151-
)
149+
self._logger.info(
150+
f"Waiting {'indefinitely' if timeout in (0, None) else f'up to {timeout}s'} for instance '{instance_id}' to complete."
151+
)
152+
153+
async def _call(grpc_timeout):
152154
res: pb.GetInstanceResponse = await self._stub.WaitForInstanceCompletion(
153155
req, timeout=grpc_timeout
154156
)
@@ -168,14 +170,87 @@ async def wait_for_orchestration_completion(
168170
self._logger.info(f"Instance '{instance_id}' was terminated.")
169171
elif state.runtime_status == OrchestrationStatus.COMPLETED:
170172
self._logger.info(f"Instance '{instance_id}' completed.")
171-
172173
return state
173-
except grpc.RpcError as rpc_error:
174-
if rpc_error.code() == grpc.StatusCode.DEADLINE_EXCEEDED: # type: ignore
175-
# Replace gRPC error with the built-in TimeoutError
176-
raise TimeoutError('Timed-out waiting for the orchestration to complete')
177-
else:
178-
raise
174+
175+
try:
176+
return await self._call_with_transient_retry(instance_id, timeout, _call)
177+
except _TransientTimeout:
178+
raise TimeoutError('Timed-out waiting for the orchestration to complete')
179+
180+
# Transient gRPC codes that indicate the workflow runtime is temporarily
181+
# unable to locate the workflow actor — typically immediately after a Dapr
182+
# sidecar restart (e.g. recovery from chaos). The placement service has the
183+
# actor registration, but local daprd hasn't received the dissemination yet.
184+
# Without retry, every poll fails permanently with FAILED_PRECONDITION even
185+
# though the workflow runtime state is intact.
186+
_TRANSIENT_RPC_CODES = (
187+
grpc.StatusCode.FAILED_PRECONDITION,
188+
grpc.StatusCode.UNAVAILABLE,
189+
)
190+
191+
# See TaskHubGrpcClient._MAX_TRANSIENT_RETRY_SECONDS — same grace window for
192+
# unbounded (timeout=0) callers so a down sidecar surfaces the original
193+
# error instead of retrying forever.
194+
_MAX_TRANSIENT_RETRY_SECONDS = 30.0
195+
196+
async def _call_with_transient_retry(self, instance_id, timeout, call_fn):
197+
"""Async mirror of TaskHubGrpcClient._call_with_transient_retry.
198+
Retries FAILED_PRECONDITION/UNAVAILABLE with capped exponential
199+
backoff while clamping sleep and per-call gRPC timeout to the
200+
remaining budget. The first call uses the caller's timeout unchanged
201+
(``None`` when unbounded) so callers observe identical behavior on a
202+
healthy runtime. In unbounded
203+
mode, continuous transient retries are capped at
204+
``_MAX_TRANSIENT_RETRY_SECONDS`` before the original error propagates.
205+
"""
206+
unbounded = timeout in (0, None)
207+
deadline = None if unbounded else time.monotonic() + timeout
208+
grpc_timeout = None if unbounded else timeout
209+
backoff = 0.5
210+
transient_deadline = None # unbounded mode only; anchored on first transient
211+
while True:
212+
try:
213+
return await call_fn(grpc_timeout)
214+
except grpc.RpcError as rpc_error:
215+
code = rpc_error.code() # type: ignore
216+
if code == grpc.StatusCode.DEADLINE_EXCEEDED:
217+
raise _TransientTimeout()
218+
if code not in self._TRANSIENT_RPC_CODES:
219+
raise
220+
221+
now = time.monotonic()
222+
223+
if unbounded:
224+
if transient_deadline is None:
225+
transient_deadline = now + self._MAX_TRANSIENT_RETRY_SECONDS
226+
elif now >= transient_deadline:
227+
raise
228+
229+
if deadline is None:
230+
remaining = None
231+
else:
232+
remaining = deadline - now
233+
if remaining <= 0:
234+
raise _TransientTimeout()
235+
236+
sleep_for = min(backoff, 5.0)
237+
if remaining is not None:
238+
sleep_for = min(sleep_for, remaining)
239+
if transient_deadline is not None:
240+
sleep_for = min(sleep_for, transient_deadline - now)
241+
self._logger.warning(
242+
f"Transient gRPC error {code.name} waiting on instance '{instance_id}'; "
243+
f'retrying in {sleep_for:.2f}s'
244+
)
245+
await asyncio.sleep(sleep_for)
246+
backoff = min(backoff * 2, 5.0)
247+
248+
if deadline is None:
249+
grpc_timeout = None
250+
else:
251+
grpc_timeout = deadline - time.monotonic()
252+
if grpc_timeout <= 0:
253+
raise _TransientTimeout()
179254

180255
async def raise_orchestration_event(
181256
self, instance_id: str, event_name: str, *, data: Optional[Any] = None

dapr/ext/workflow/_durabletask/client.py

Lines changed: 117 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
# limitations under the License.
1111

1212
import logging
13+
import time
1314
import uuid
1415
from dataclasses import dataclass
1516
from datetime import datetime
@@ -26,6 +27,12 @@
2627
from dapr.ext.workflow._durabletask import task
2728
from dapr.ext.workflow._durabletask.internal.grpc_interceptor import DefaultClientInterceptorImpl
2829

30+
31+
class _TransientTimeout(Exception):
32+
"""Internal sentinel: the retry loop exhausted the user-provided timeout
33+
budget. Callers convert this to a public ``TimeoutError``."""
34+
35+
2936
TInput = TypeVar('TInput')
3037
TOutput = TypeVar('TOutput')
3138

@@ -218,32 +225,31 @@ def get_orchestration_state(
218225
return new_orchestration_state(req.instanceId, res)
219226

220227
def wait_for_orchestration_start(
221-
self, instance_id: str, *, fetch_payloads: bool = False, timeout: int = 0
228+
self, instance_id: str, *, fetch_payloads: bool = False, timeout: Optional[int] = 0
222229
) -> Optional[WorkflowState]:
223230
req = pb.GetInstanceRequest(instanceId=instance_id, getInputsAndOutputs=fetch_payloads)
224-
try:
225-
grpc_timeout = None if timeout == 0 else timeout
226-
self._logger.info(
227-
f"Waiting {'indefinitely' if timeout == 0 else f'up to {timeout}s'} for instance '{instance_id}' to start."
228-
)
231+
self._logger.info(
232+
f"Waiting {'indefinitely' if timeout in (0, None) else f'up to {timeout}s'} for instance '{instance_id}' to start."
233+
)
234+
235+
def _call(grpc_timeout):
229236
res: pb.GetInstanceResponse = self._stub.WaitForInstanceStart(req, timeout=grpc_timeout)
230237
return new_orchestration_state(req.instanceId, res)
231-
except grpc.RpcError as rpc_error:
232-
if rpc_error.code() == grpc.StatusCode.DEADLINE_EXCEEDED: # type: ignore
233-
# Replace gRPC error with the built-in TimeoutError
234-
raise TimeoutError('Timed-out waiting for the orchestration to start')
235-
else:
236-
raise
238+
239+
try:
240+
return self._call_with_transient_retry(instance_id, timeout, _call)
241+
except _TransientTimeout:
242+
raise TimeoutError('Timed-out waiting for the orchestration to start')
237243

238244
def wait_for_orchestration_completion(
239-
self, instance_id: str, *, fetch_payloads: bool = True, timeout: int = 0
245+
self, instance_id: str, *, fetch_payloads: bool = True, timeout: Optional[int] = 0
240246
) -> Optional[WorkflowState]:
241247
req = pb.GetInstanceRequest(instanceId=instance_id, getInputsAndOutputs=fetch_payloads)
242-
try:
243-
grpc_timeout = None if timeout == 0 else timeout
244-
self._logger.info(
245-
f"Waiting {'indefinitely' if timeout == 0 else f'up to {timeout}s'} for instance '{instance_id}' to complete."
246-
)
248+
self._logger.info(
249+
f"Waiting {'indefinitely' if timeout in (0, None) else f'up to {timeout}s'} for instance '{instance_id}' to complete."
250+
)
251+
252+
def _call(grpc_timeout):
247253
res: pb.GetInstanceResponse = self._stub.WaitForInstanceCompletion(
248254
req, timeout=grpc_timeout
249255
)
@@ -263,14 +269,100 @@ def wait_for_orchestration_completion(
263269
self._logger.info(f"Instance '{instance_id}' was terminated.")
264270
elif state.runtime_status == OrchestrationStatus.COMPLETED:
265271
self._logger.info(f"Instance '{instance_id}' completed.")
266-
267272
return state
268-
except grpc.RpcError as rpc_error:
269-
if rpc_error.code() == grpc.StatusCode.DEADLINE_EXCEEDED: # type: ignore
270-
# Replace gRPC error with the built-in TimeoutError
271-
raise TimeoutError('Timed-out waiting for the orchestration to complete')
272-
else:
273-
raise
273+
274+
try:
275+
return self._call_with_transient_retry(instance_id, timeout, _call)
276+
except _TransientTimeout:
277+
raise TimeoutError('Timed-out waiting for the orchestration to complete')
278+
279+
# Transient gRPC codes that indicate the workflow runtime is temporarily
280+
# unable to locate the workflow actor — typically immediately after a Dapr
281+
# sidecar restart (e.g. recovery from chaos). The placement service has the
282+
# actor registration, but local daprd hasn't received the dissemination yet.
283+
# Without retry, every poll fails permanently with FAILED_PRECONDITION even
284+
# though the workflow runtime state is intact.
285+
_TRANSIENT_RPC_CODES = (
286+
grpc.StatusCode.FAILED_PRECONDITION,
287+
grpc.StatusCode.UNAVAILABLE,
288+
)
289+
290+
# When the caller sets no timeout (timeout=0), bound how long we keep
291+
# retrying *consecutive* transient errors so a permanently-unavailable
292+
# sidecar surfaces the original error instead of hanging forever. This
293+
# window comfortably covers placement re-dissemination after a restart;
294+
# a slow-but-healthy workflow never enters this path (it just blocks in
295+
# the long-poll), so its indefinite wait is preserved.
296+
_MAX_TRANSIENT_RETRY_SECONDS = 30.0
297+
298+
def _call_with_transient_retry(self, instance_id, timeout, call_fn):
299+
"""Run a gRPC wait call, retrying transient errors until the user
300+
timeout deadline. Re-raises non-transient errors immediately.
301+
timeout in (0, None) means unbounded; transients are still retried,
302+
but only for up to ``_MAX_TRANSIENT_RETRY_SECONDS`` of continuous
303+
failures, after which the original transient error propagates.
304+
305+
The first call passes the caller's ``grpc_timeout`` (``None`` when
306+
unbounded) to ``call_fn`` so callers observe identical behavior to a
307+
non-retrying client when no transient occurs (preserves prior public
308+
behavior). On a retry, both the sleep
309+
and the per-call gRPC deadline are clamped to the remaining budget so
310+
the helper never sleeps past ``timeout`` or starts a gRPC call with
311+
no time left.
312+
"""
313+
unbounded = timeout in (0, None)
314+
deadline = None if unbounded else time.monotonic() + timeout
315+
grpc_timeout = None if unbounded else timeout
316+
backoff = 0.5
317+
transient_deadline = None # unbounded mode only; anchored on first transient
318+
while True:
319+
try:
320+
return call_fn(grpc_timeout)
321+
except grpc.RpcError as rpc_error:
322+
code = rpc_error.code() # type: ignore
323+
if code == grpc.StatusCode.DEADLINE_EXCEEDED:
324+
raise _TransientTimeout()
325+
if code not in self._TRANSIENT_RPC_CODES:
326+
raise
327+
328+
now = time.monotonic()
329+
330+
# In unbounded mode the user budget can't end the loop, so cap
331+
# continuous transient retries and re-raise the original error
332+
# (matching pre-retry behavior) once the grace window elapses.
333+
if unbounded:
334+
if transient_deadline is None:
335+
transient_deadline = now + self._MAX_TRANSIENT_RETRY_SECONDS
336+
elif now >= transient_deadline:
337+
raise
338+
339+
# Compute remaining budget once and reuse so the sleep and the
340+
# next per-call grpc_timeout agree on "how much time is left".
341+
if deadline is None:
342+
remaining = None
343+
else:
344+
remaining = deadline - now
345+
if remaining <= 0:
346+
raise _TransientTimeout()
347+
348+
sleep_for = min(backoff, 5.0)
349+
if remaining is not None:
350+
sleep_for = min(sleep_for, remaining)
351+
if transient_deadline is not None:
352+
sleep_for = min(sleep_for, transient_deadline - now)
353+
self._logger.warning(
354+
f"Transient gRPC error {code.name} waiting on instance '{instance_id}'; "
355+
f'retrying in {sleep_for:.2f}s'
356+
)
357+
time.sleep(sleep_for)
358+
backoff = min(backoff * 2, 5.0)
359+
360+
if deadline is None:
361+
grpc_timeout = None
362+
else:
363+
grpc_timeout = deadline - time.monotonic()
364+
if grpc_timeout <= 0:
365+
raise _TransientTimeout()
274366

275367
def raise_orchestration_event(
276368
self, instance_id: str, event_name: str, *, data: Optional[Any] = None

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ dev = [
101101
"grpcio-tools==1.76.0",
102102
"types-python-dateutil~=2.9.0",
103103
"types-grpcio-status~=1.0.0",
104-
"coverage~=7.13.4",
104+
"coverage>=7.13.4,<7.15.0",
105105
"wheel~=0.46.3",
106106
"opentelemetry-sdk>=1.40,<1.42",
107107
"opentelemetry-instrumentation-grpc~=0.61b0",

0 commit comments

Comments
 (0)