Skip to content

Commit 917e5ad

Browse files
author
Alex Wang
committed
feat: Add with_retry helper
1 parent 0e30701 commit 917e5ad

6 files changed

Lines changed: 627 additions & 1 deletion

File tree

packages/aws-durable-execution-sdk-python-examples/examples-catalog.json

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -602,6 +602,17 @@
602602
"ExecutionTimeout": 300
603603
},
604604
"path": "./src/parallel/parallel_with_named_branches.py"
605+
},
606+
{
607+
"name": "With Retry Callback",
608+
"description": "Demonstrates with_retry wrapping a wait_for_callback operation with exponential backoff",
609+
"handler": "with_retry_callback.handler",
610+
"integration": true,
611+
"durableConfig": {
612+
"RetentionPeriodInDays": 7,
613+
"ExecutionTimeout": 300
614+
},
615+
"path": "./src/with_retry/with_retry_callback.py"
605616
}
606617
]
607618
}
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
"""Demonstrates with_retry wrapping a wait_for_callback operation.
2+
3+
The callback may fail multiple times before succeeding. The with_retry helper
4+
retries the entire callback flow (including creating a new callback each attempt)
5+
with exponential backoff between attempts.
6+
"""
7+
8+
from typing import Any
9+
10+
from aws_durable_execution_sdk_python.config import Duration, WaitForCallbackConfig
11+
from aws_durable_execution_sdk_python.context import DurableContext
12+
from aws_durable_execution_sdk_python.execution import durable_execution
13+
from aws_durable_execution_sdk_python.retries import (
14+
RetryStrategyConfig,
15+
WithRetryConfig,
16+
create_retry_strategy,
17+
with_retry,
18+
)
19+
20+
21+
@durable_execution
22+
def handler(_event: Any, context: DurableContext) -> dict[str, Any]:
23+
"""Handler demonstrating with_retry around a wait_for_callback.
24+
25+
The external system may fail to process the callback multiple times.
26+
with_retry will re-create the callback and wait again on each retry,
27+
with exponential backoff between attempts.
28+
"""
29+
30+
def retryable_callback_flow(ctx: DurableContext, attempt: int) -> str:
31+
"""The retryable block: create a callback and wait for the result."""
32+
33+
def submitter(callback_id: str, _callback_ctx) -> None:
34+
"""Submit the callback ID to an external system."""
35+
# In real usage, this would send the callback_id to an external
36+
# system (e.g., via API call, SQS message, etc.)
37+
pass
38+
39+
config = WaitForCallbackConfig(
40+
timeout=Duration.from_seconds(30),
41+
heartbeat_timeout=Duration.from_seconds(60),
42+
)
43+
44+
return ctx.wait_for_callback(
45+
submitter, name=f"external-call-attempt-{attempt}", config=config
46+
)
47+
48+
retry_config = WithRetryConfig(
49+
retry_strategy=create_retry_strategy(
50+
RetryStrategyConfig(
51+
max_attempts=5,
52+
initial_delay=Duration.from_seconds(2),
53+
backoff_rate=1.0,
54+
)
55+
),
56+
)
57+
58+
result = with_retry(
59+
context,
60+
func=retryable_callback_flow,
61+
config=retry_config,
62+
name="callback-with-retry",
63+
)
64+
65+
return {
66+
"success": True,
67+
"result": result,
68+
}
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
"""Tests for with_retry_callback example.
2+
3+
Demonstrates that with_retry retries the entire wait_for_callback flow
4+
when the callback fails. The external system fails 2 times before
5+
succeeding on the 3rd attempt.
6+
"""
7+
8+
import pytest
9+
from src.with_retry import with_retry_callback
10+
from test.conftest import deserialize_operation_payload
11+
12+
from aws_durable_execution_sdk_python.execution import InvocationStatus
13+
from aws_durable_execution_sdk_python.lambda_service import ErrorObject
14+
15+
16+
@pytest.mark.example
17+
@pytest.mark.durable_execution(
18+
handler=with_retry_callback.handler,
19+
lambda_function_name="With Retry Callback",
20+
)
21+
def test_with_retry_callback_fails_twice_then_succeeds(durable_runner):
22+
"""Test that with_retry retries the callback flow after failures.
23+
24+
The external system sends callback failure 2 times, then succeeds
25+
on the 3rd attempt. with_retry handles the failures and retries
26+
the entire wait_for_callback block.
27+
"""
28+
with durable_runner:
29+
execution_arn = durable_runner.run_async(input=None, timeout=60)
30+
31+
# Attempt 1: external system fails
32+
callback_id_1 = durable_runner.wait_for_callback(
33+
execution_arn=execution_arn,
34+
name="external-call-attempt-1 create callback id",
35+
)
36+
durable_runner.send_callback_failure(
37+
callback_id=callback_id_1,
38+
error=ErrorObject.from_message("External system unavailable"),
39+
)
40+
41+
# Attempt 2: external system fails again
42+
callback_id_2 = durable_runner.wait_for_callback(
43+
execution_arn=execution_arn,
44+
name="external-call-attempt-2 create callback id",
45+
)
46+
durable_runner.send_callback_failure(
47+
callback_id=callback_id_2,
48+
error=ErrorObject.from_message("External system timeout"),
49+
)
50+
51+
# Attempt 3: external system succeeds
52+
callback_id_3 = durable_runner.wait_for_callback(
53+
execution_arn=execution_arn,
54+
name="external-call-attempt-3 create callback id",
55+
)
56+
durable_runner.send_callback_success(
57+
callback_id=callback_id_3,
58+
result="approval granted".encode(),
59+
)
60+
61+
result = durable_runner.wait_for_result(execution_arn=execution_arn)
62+
63+
assert result.status is InvocationStatus.SUCCEEDED
64+
65+
result_data = deserialize_operation_payload(result.result)
66+
assert result_data == {
67+
"success": True,
68+
"result": "approval granted",
69+
}

packages/aws-durable-execution-sdk-python/src/aws_durable_execution_sdk_python/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525

2626
# Core decorator - used in every durable function
2727
from aws_durable_execution_sdk_python.execution import durable_execution
28+
from aws_durable_execution_sdk_python.retries import WithRetryConfig, with_retry
2829

2930
# Essential context types - passed to user functions
3031
from aws_durable_execution_sdk_python.types import StepContext
@@ -38,10 +39,12 @@
3839
"ParallelBranch",
3940
"StepContext",
4041
"ValidationError",
42+
"WithRetryConfig",
4143
"__version__",
4244
"durable_execution",
4345
"durable_parallel_branch",
4446
"durable_step",
4547
"durable_wait_for_callback",
4648
"durable_with_child_context",
49+
"with_retry",
4750
]

packages/aws-durable-execution-sdk-python/src/aws_durable_execution_sdk_python/retries.py

Lines changed: 104 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,20 @@
55
import math
66
import re
77
from dataclasses import dataclass, field
8-
from typing import TYPE_CHECKING
8+
from typing import TYPE_CHECKING, Generic, TypeVar
99

1010
from aws_durable_execution_sdk_python.config import Duration, JitterStrategy
11+
from aws_durable_execution_sdk_python.exceptions import SuspendExecution
12+
1113

1214
if TYPE_CHECKING:
1315
from collections.abc import Callable
1416

17+
from aws_durable_execution_sdk_python.config import ChildConfig
18+
from aws_durable_execution_sdk_python.types import DurableContext
19+
20+
T = TypeVar("T")
21+
1522
Numeric = int | float
1623

1724
# Default pattern that matches all error messages
@@ -172,3 +179,99 @@ def critical(cls) -> Callable[[Exception, int], RetryDecision]:
172179
jitter_strategy=JitterStrategy.NONE,
173180
)
174181
)
182+
183+
184+
@dataclass(frozen=True)
185+
class WithRetryConfig(Generic[T]):
186+
"""Configuration for with_retry.
187+
188+
Holds a retry strategy callable (same type used by StepConfig) and
189+
adds execution-mode options specific to with_retry.
190+
191+
Attributes:
192+
retry_strategy: A callable that decides whether to retry and with
193+
what delay. Accepts (Exception, int) and returns RetryDecision.
194+
Use create_retry_strategy(RetryStrategyConfig(...)) to build one,
195+
or provide a custom callable. If None, the default retry strategy
196+
(RetryStrategyConfig defaults) is used.
197+
wrap_with_run_in_child_context: Whether to wrap the retry loop in
198+
a child context for isolation. Default True. When True, final
199+
failure is rethrown as CallableRuntimeError with the original
200+
exception on `cause`. When False, the original error is
201+
rethrown unchanged.
202+
child_context_config: Optional ChildConfig forwarded to
203+
run_in_child_context when wrapping is enabled. Ignored when
204+
wrap_with_run_in_child_context is False.
205+
"""
206+
207+
retry_strategy: Callable[[Exception, int], RetryDecision] | None = None
208+
wrap_with_run_in_child_context: bool = True
209+
child_context_config: ChildConfig[T] | None = None
210+
211+
212+
def with_retry(
213+
context: DurableContext,
214+
func: Callable[[DurableContext, int], T],
215+
config: WithRetryConfig[T],
216+
name: str | None = None,
217+
) -> T:
218+
"""Retry a block of durable logic with configurable backoff.
219+
220+
Semantically a run_in_child_context with a retry policy wrapped around
221+
it — on failure the whole function body is re-run from the beginning
222+
with configurable backoff.
223+
224+
Unlike context.step() which retries a single atomic operation,
225+
with_retry retries an entire function body that may contain multiple
226+
durable operations (steps, waits, invokes, callbacks, etc.).
227+
228+
Args:
229+
context: The DurableContext to execute within.
230+
func: A callable that accepts (DurableContext, attempt: int) and
231+
returns T. The function body may contain multiple durable
232+
operations.
233+
config: WithRetryConfig containing a retry strategy callable plus
234+
execution-mode options.
235+
name: Optional name for the child context and backoff waits.
236+
When provided, backoff waits are named
237+
"{name}-backoff-{attempt}".
238+
239+
Returns:
240+
The result of func on successful execution.
241+
242+
Raises:
243+
The exception from the last failed attempt when retries are
244+
exhausted or the retry strategy returns should_retry=False.
245+
When wrap_with_run_in_child_context is True (default),
246+
ChildOperationExecutor.process wraps non-InvocationError /
247+
SuspendExecution exceptions as CallableRuntimeError with the
248+
original error in cause.
249+
When wrap_with_run_in_child_context is False, the original
250+
exception propagates unchanged.
251+
SuspendExecution: Re-raised immediately (SDK control flow).
252+
"""
253+
retry_strategy = config.retry_strategy or create_retry_strategy()
254+
255+
def run_loop(ctx: DurableContext) -> T:
256+
attempt = 0
257+
while True:
258+
attempt += 1
259+
try:
260+
return func(ctx, attempt)
261+
except SuspendExecution:
262+
raise # SDK control flow - never intercept
263+
except Exception as err:
264+
decision = retry_strategy(err, attempt)
265+
if not decision.should_retry:
266+
raise
267+
wait_name = f"{name}-backoff-{attempt}" if name else None
268+
ctx.wait(duration=decision.delay, name=wait_name)
269+
270+
if config.wrap_with_run_in_child_context:
271+
return context.run_in_child_context(
272+
run_loop,
273+
name=name,
274+
config=config.child_context_config,
275+
)
276+
else:
277+
return run_loop(context)

0 commit comments

Comments
 (0)