Skip to content

Commit d6c6330

Browse files
Support activity retry options
1 parent f289808 commit d6c6330

9 files changed

Lines changed: 200 additions & 18 deletions

File tree

README.md

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,29 @@ For a fuller deployable example, see
5454
[`examples/order_processing`](examples/order_processing), which runs a
5555
multi-activity order workflow against a local server with Docker Compose.
5656

57+
## Activity retries and timeouts
58+
59+
Configure per-call activity retries and deadlines from workflow code:
60+
61+
```python
62+
from durable_workflow import ActivityRetryPolicy
63+
64+
result = yield ctx.schedule_activity(
65+
"charge-card",
66+
[order],
67+
retry_policy=ActivityRetryPolicy(
68+
max_attempts=4,
69+
initial_interval_seconds=1,
70+
backoff_coefficient=2,
71+
maximum_interval_seconds=30,
72+
non_retryable_error_types=["ValidationError"],
73+
),
74+
start_to_close_timeout=120,
75+
schedule_to_close_timeout=300,
76+
heartbeat_timeout=15,
77+
)
78+
```
79+
5780
## Features
5881

5982
- **Async-first**: Built on `httpx` and `asyncio`

docs/index.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,10 @@ pip install 'durable-workflow[prometheus]'
1818

1919
- **[Client](reference/client.md)** — start workflows, signal, query, update, wait for results, manage schedules.
2020
- **[Worker](reference/worker.md)** — poll the server for workflow and activity tasks, dispatch to registered handlers.
21-
- **[Workflow](reference/workflow.md)** — workflow-side primitives: `ContinueAsNew`, `StartChildWorkflow`, and the workflow decorator.
21+
- **[Workflow](reference/workflow.md)** — workflow-side primitives: `ActivityRetryPolicy`, `ContinueAsNew`, `StartChildWorkflow`, and the workflow decorator.
2222
- **[Activity](reference/activity.md)** — activity decorator and execution context.
2323
- **[Errors](reference/errors.md)** — typed exceptions raised by the client and worker.
24-
- **[Retry policy](reference/retry_policy.md)** — retry configuration for activities and workflows.
24+
- **[Retry policy](reference/retry_policy.md)** — HTTP transport retry configuration for the client.
2525
- **[Metrics](reference/metrics.md)** — pluggable recorders, including a Prometheus adapter.
2626
- **[Serializer](reference/serializer.md)** — payload encoding and decoding helpers.
2727
- **[Sync helpers](reference/sync.md)** — blocking wrappers around the async client for scripts and tests.

src/durable_workflow/__init__.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,14 +46,16 @@
4646
NoopMetrics,
4747
PrometheusMetrics,
4848
)
49+
from .retry_policy import RetryPolicy, TransportRetryPolicy
4950
from .worker import Worker
50-
from .workflow import ContinueAsNew, StartChildWorkflow
51+
from .workflow import ActivityRetryPolicy, ContinueAsNew, StartChildWorkflow
5152

5253
__all__ = [
5354
"__version__",
5455
"ActivityCancelled",
5556
"ActivityContext",
5657
"ActivityInfo",
58+
"ActivityRetryPolicy",
5759
"ChildWorkflowFailed",
5860
"Client",
5961
"ContinueAsNew",
@@ -83,7 +85,9 @@
8385
"NoopMetrics",
8486
"QueryFailed",
8587
"PrometheusMetrics",
88+
"RetryPolicy",
8689
"ServerError",
90+
"TransportRetryPolicy",
8791
"Unauthorized",
8892
"UpdateRejected",
8993
"WorkflowAlreadyStarted",

src/durable_workflow/client.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
_raise_for_status,
3636
)
3737
from .metrics import CLIENT_REQUEST_DURATION_SECONDS, CLIENT_REQUESTS, NOOP_METRICS, MetricsRecorder
38-
from .retry_policy import RetryPolicy
38+
from .retry_policy import TransportRetryPolicy
3939

4040
PROTOCOL_VERSION = "1.0"
4141
CONTROL_PLANE_VERSION = "2"
@@ -330,15 +330,15 @@ def __init__(
330330
worker_token: str | None = None,
331331
namespace: str = "default",
332332
timeout: float = 60.0,
333-
retry_policy: RetryPolicy | None = None,
333+
retry_policy: TransportRetryPolicy | None = None,
334334
metrics: MetricsRecorder | None = None,
335335
) -> None:
336336
self.base_url = base_url.rstrip("/")
337337
self.token = token
338338
self.control_token = control_token
339339
self.worker_token = worker_token
340340
self.namespace = namespace
341-
self.retry_policy = retry_policy or RetryPolicy()
341+
self.retry_policy = retry_policy or TransportRetryPolicy()
342342
self.metrics = metrics or NOOP_METRICS
343343
self._http = httpx.AsyncClient(base_url=self.base_url, timeout=timeout)
344344

src/durable_workflow/retry_policy.py

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,11 @@
22
33
.. warning::
44
5-
This :class:`RetryPolicy` covers **only client-side HTTP retries** for
5+
:class:`TransportRetryPolicy` covers **only client-side HTTP retries** for
66
transient transport errors (connection failures, timeouts, 5xx responses,
77
429 rate-limiting). It is **not** the activity retry policy. Activity-level
8-
retry and timeout configuration is tracked in
9-
https://github.com/zorporation/durable-workflow/issues/392 and will land on
8+
retry and timeout configuration lives on
9+
:class:`durable_workflow.workflow.ActivityRetryPolicy` and is passed to
1010
``ctx.schedule_activity(..., retry_policy=...)``.
1111
"""
1212

@@ -24,9 +24,9 @@
2424

2525

2626
@dataclass
27-
class RetryPolicy:
27+
class TransportRetryPolicy:
2828
"""
29-
Retry policy for transient server errors.
29+
Retry policy for transient HTTP transport errors.
3030
3131
Retries requests that fail with transient errors (connection errors,
3232
timeouts, 5xx server errors, 429 rate limit). Does not retry client
@@ -95,3 +95,9 @@ async def execute(self, fn: Callable[[], Awaitable[T]]) -> T:
9595
if last_exc:
9696
raise last_exc
9797
raise RuntimeError("retry loop exhausted with no exception")
98+
99+
100+
# Backward-compatible alias for earlier 0.x releases. Prefer
101+
# TransportRetryPolicy in new code so it is not confused with workflow-level
102+
# activity retry policy.
103+
RetryPolicy = TransportRetryPolicy

src/durable_workflow/sync.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
WorkflowList,
1919
)
2020
from .metrics import MetricsRecorder
21-
from .retry_policy import RetryPolicy
21+
from .retry_policy import TransportRetryPolicy
2222

2323

2424
def _run(coro: Any) -> Any:
@@ -152,7 +152,7 @@ def __init__(
152152
worker_token: str | None = None,
153153
namespace: str = "default",
154154
timeout: float = 60.0,
155-
retry_policy: RetryPolicy | None = None,
155+
retry_policy: TransportRetryPolicy | None = None,
156156
metrics: MetricsRecorder | None = None,
157157
) -> None:
158158
self._async = AsyncClient(

src/durable_workflow/workflow.py

Lines changed: 97 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,10 @@
1919
import contextlib
2020
import hashlib
2121
import logging
22+
import math
2223
import random
2324
import uuid
24-
from collections.abc import Callable, Iterable
25+
from collections.abc import Callable, Iterable, Mapping
2526
from dataclasses import dataclass, field
2627
from datetime import datetime, timezone
2728
from typing import Any
@@ -49,23 +50,97 @@ def registry() -> dict[str, type]:
4950

5051

5152
# ── Commands yielded from a workflow ──────────────────────────────────
53+
@dataclass
54+
class ActivityRetryPolicy:
55+
"""Retry policy applied to one scheduled activity call.
56+
57+
The policy is snapshotted onto the durable activity execution when the
58+
workflow task completes, so later code deploys do not change the retry
59+
budget for an already-scheduled activity.
60+
"""
61+
62+
max_attempts: int = 3
63+
initial_interval_seconds: float = 1.0
64+
backoff_coefficient: float = 2.0
65+
maximum_interval_seconds: float | None = None
66+
non_retryable_error_types: list[str] = field(default_factory=list)
67+
backoff_seconds: list[int] | None = None
68+
69+
def to_dict(self) -> dict[str, Any]:
70+
"""Return the server command shape for this activity retry policy."""
71+
if self.max_attempts < 1:
72+
raise ValueError("max_attempts must be >= 1")
73+
if self.initial_interval_seconds < 0:
74+
raise ValueError("initial_interval_seconds must be >= 0")
75+
if self.backoff_coefficient < 1:
76+
raise ValueError("backoff_coefficient must be >= 1")
77+
if self.maximum_interval_seconds is not None and self.maximum_interval_seconds < 0:
78+
raise ValueError("maximum_interval_seconds must be >= 0")
79+
80+
return {
81+
"max_attempts": self.max_attempts,
82+
"backoff_seconds": self._backoff_seconds(),
83+
"non_retryable_error_types": [
84+
value.strip()
85+
for value in self.non_retryable_error_types
86+
if isinstance(value, str) and value.strip()
87+
],
88+
}
89+
90+
def _backoff_seconds(self) -> list[int]:
91+
if self.backoff_seconds is not None:
92+
return [max(0, int(seconds)) for seconds in self.backoff_seconds]
93+
94+
seconds: list[int] = []
95+
current = self.initial_interval_seconds
96+
maximum = self.maximum_interval_seconds
97+
for _ in range(max(0, self.max_attempts - 1)):
98+
value = current if maximum is None else min(current, maximum)
99+
seconds.append(max(0, int(math.ceil(value))))
100+
current *= self.backoff_coefficient
101+
return seconds
102+
103+
104+
ActivityRetryPolicyInput = ActivityRetryPolicy | Mapping[str, Any]
105+
106+
52107
@dataclass
53108
class ScheduleActivity:
54109
"""Command requesting an activity task."""
55110

56111
activity_type: str
57112
arguments: list[Any]
58113
queue: str | None = None
114+
retry_policy: ActivityRetryPolicyInput | None = None
115+
start_to_close_timeout: int | None = None
116+
schedule_to_start_timeout: int | None = None
117+
schedule_to_close_timeout: int | None = None
118+
heartbeat_timeout: int | None = None
59119

60120
def to_server_command(
61121
self, task_queue: str, *, payload_codec: str = serializer.AVRO_CODEC
62122
) -> dict[str, Any]:
63-
return {
123+
command: dict[str, Any] = {
64124
"type": "schedule_activity",
65125
"activity_type": self.activity_type,
66126
"arguments": serializer.envelope(self.arguments, codec=payload_codec),
67127
"queue": self.queue or task_queue,
68128
}
129+
if self.retry_policy is not None:
130+
command["retry_policy"] = (
131+
self.retry_policy.to_dict()
132+
if isinstance(self.retry_policy, ActivityRetryPolicy)
133+
else dict(self.retry_policy)
134+
)
135+
if self.start_to_close_timeout is not None:
136+
command["start_to_close_timeout"] = self.start_to_close_timeout
137+
if self.schedule_to_start_timeout is not None:
138+
command["schedule_to_start_timeout"] = self.schedule_to_start_timeout
139+
if self.schedule_to_close_timeout is not None:
140+
command["schedule_to_close_timeout"] = self.schedule_to_close_timeout
141+
if self.heartbeat_timeout is not None:
142+
command["heartbeat_timeout"] = self.heartbeat_timeout
143+
return command
69144

70145

71146
@dataclass
@@ -266,9 +341,27 @@ def __init__(self, *, run_id: str = "", current_time: datetime | None = None) ->
266341
self.logger = _ReplayLogger(_REPLAY_LOGGER)
267342

268343
def schedule_activity(
269-
self, activity_type: str, arguments: list[Any], *, queue: str | None = None
344+
self,
345+
activity_type: str,
346+
arguments: list[Any],
347+
*,
348+
queue: str | None = None,
349+
retry_policy: ActivityRetryPolicyInput | None = None,
350+
start_to_close_timeout: int | None = None,
351+
schedule_to_start_timeout: int | None = None,
352+
schedule_to_close_timeout: int | None = None,
353+
heartbeat_timeout: int | None = None,
270354
) -> ScheduleActivity:
271-
return ScheduleActivity(activity_type=activity_type, arguments=list(arguments), queue=queue)
355+
return ScheduleActivity(
356+
activity_type=activity_type,
357+
arguments=list(arguments),
358+
queue=queue,
359+
retry_policy=retry_policy,
360+
start_to_close_timeout=start_to_close_timeout,
361+
schedule_to_start_timeout=schedule_to_start_timeout,
362+
schedule_to_close_timeout=schedule_to_close_timeout,
363+
heartbeat_timeout=heartbeat_timeout,
364+
)
272365

273366
def start_timer(self, seconds: int) -> StartTimer:
274367
return StartTimer(delay_seconds=seconds)

tests/test_replay.py

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from durable_workflow import serializer, workflow
44
from durable_workflow.errors import ChildWorkflowFailed
55
from durable_workflow.workflow import (
6+
ActivityRetryPolicy,
67
CompleteWorkflow,
78
ContinueAsNew,
89
FailWorkflow,
@@ -111,6 +112,38 @@ def test_server_command_uses_payload_codec(self) -> None:
111112
assert server_cmd["arguments"]["codec"] == "json"
112113
assert serializer.decode(server_cmd["arguments"]["blob"], codec="json") == ["world"]
113114

115+
def test_schedule_activity_server_command_includes_retry_policy_and_timeouts(self) -> None:
116+
cmd = ScheduleActivity(
117+
activity_type="charge-card",
118+
arguments=[{"order_id": "o-1"}],
119+
queue="payments",
120+
retry_policy=ActivityRetryPolicy(
121+
max_attempts=4,
122+
initial_interval_seconds=1,
123+
backoff_coefficient=3,
124+
maximum_interval_seconds=10,
125+
non_retryable_error_types=["ValidationError"],
126+
),
127+
start_to_close_timeout=120,
128+
schedule_to_start_timeout=10,
129+
schedule_to_close_timeout=300,
130+
heartbeat_timeout=15,
131+
)
132+
133+
server_cmd = cmd.to_server_command("default-queue")
134+
135+
assert server_cmd["type"] == "schedule_activity"
136+
assert server_cmd["queue"] == "payments"
137+
assert server_cmd["retry_policy"] == {
138+
"max_attempts": 4,
139+
"backoff_seconds": [1, 3, 9],
140+
"non_retryable_error_types": ["ValidationError"],
141+
}
142+
assert server_cmd["start_to_close_timeout"] == 120
143+
assert server_cmd["schedule_to_start_timeout"] == 10
144+
assert server_cmd["schedule_to_close_timeout"] == 300
145+
assert server_cmd["heartbeat_timeout"] == 15
146+
114147

115148
class TestTwoActivities:
116149
def test_first_schedules(self) -> None:
@@ -368,6 +401,26 @@ def test_logger_active_when_not_replaying(self) -> None:
368401
finally:
369402
logger.removeHandler(handler)
370403

404+
def test_schedule_activity_accepts_retry_policy_and_timeouts(self) -> None:
405+
ctx = WorkflowContext(run_id="r1")
406+
policy = ActivityRetryPolicy(max_attempts=2, backoff_seconds=[7])
407+
408+
cmd = ctx.schedule_activity(
409+
"charge-card",
410+
[{"order_id": "o-1"}],
411+
retry_policy=policy,
412+
start_to_close_timeout=120,
413+
schedule_to_start_timeout=10,
414+
schedule_to_close_timeout=300,
415+
heartbeat_timeout=15,
416+
)
417+
418+
assert cmd.retry_policy is policy
419+
assert cmd.start_to_close_timeout == 120
420+
assert cmd.schedule_to_start_timeout == 10
421+
assert cmd.schedule_to_close_timeout == 300
422+
assert cmd.heartbeat_timeout == 15
423+
371424

372425
class TestReplayWithRunId:
373426
def test_run_id_passed_to_context(self) -> None:

tests/test_retry_policy.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,13 @@
33
import httpx
44
import pytest
55

6-
from durable_workflow.retry_policy import RetryPolicy
6+
from durable_workflow.retry_policy import RetryPolicy, TransportRetryPolicy
77

88

99
class TestRetryPolicy:
10+
def test_retry_policy_alias_kept_for_backward_compatibility(self) -> None:
11+
assert RetryPolicy is TransportRetryPolicy
12+
1013
def test_should_retry_connection_error(self) -> None:
1114
policy = RetryPolicy(max_attempts=3)
1215
exc = httpx.ConnectError("connection failed")

0 commit comments

Comments
 (0)