Skip to content

Commit 6c74574

Browse files
author
Alex Wang
committed
feat: with retry helper
1 parent 85f2d24 commit 6c74574

5 files changed

Lines changed: 612 additions & 1 deletion

File tree

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
"""Demonstrates with_retry wrapping a wait_for_callback operation.
2+
3+
The callback may fail multiple times before succeeding. The with_retry helper
4+
retries the entire callback flow (including creating a new callback each attempt)
5+
with exponential backoff between attempts.
6+
"""
7+
8+
from typing import Any
9+
10+
from aws_durable_execution_sdk_python.config import Duration, WaitForCallbackConfig
11+
from aws_durable_execution_sdk_python.context import DurableContext
12+
from aws_durable_execution_sdk_python.execution import durable_execution
13+
from aws_durable_execution_sdk_python.retries import (
14+
RetryStrategyConfig,
15+
WithRetryConfig,
16+
with_retry,
17+
)
18+
19+
20+
@durable_execution
def handler(_event: Any, context: DurableContext) -> dict[str, Any]:
    """Handler demonstrating with_retry around a wait_for_callback.

    The external system may fail to process the callback multiple times.
    with_retry re-runs the whole callback flow on each retry, which creates
    a fresh callback (named after the attempt number) and waits again, with
    exponential backoff between attempts.

    Args:
        _event: Invocation payload; unused by this example.
        context: DurableContext the retry loop and callbacks run within.

    Returns:
        A dict with "success" set to True and "result" holding the value
        returned by the callback flow.
    """

    def retryable_callback_flow(ctx: DurableContext, attempt: int) -> str:
        """The retryable block: create a callback and wait for the result."""

        def submitter(callback_id: str, _callback_ctx) -> None:
            """Submit the callback ID to an external system."""
            # In real usage, this would send the callback_id to an external
            # system (e.g., via API call, SQS message, etc.)
            print(f"Attempt {attempt}: Submitted callback {callback_id}")

        # NOTE(review): heartbeat_timeout (60s) is larger than timeout (30s);
        # presumably the heartbeat limit can never fire first - confirm the
        # intended values.
        config = WaitForCallbackConfig(
            timeout=Duration.from_seconds(30),
            heartbeat_timeout=Duration.from_seconds(60),
        )

        # The attempt number is part of the operation name so every retry
        # waits on its own, distinctly named callback.
        return ctx.wait_for_callback(
            submitter, name=f"external-call-attempt-{attempt}", config=config
        )

    retry_config = WithRetryConfig(
        retry_strategy_config=RetryStrategyConfig(
            max_attempts=5,
            initial_delay=Duration.from_seconds(2),
            # A rate above 1 is required for the delay to grow between
            # attempts; 1.0 would give a constant delay, contradicting the
            # exponential backoff this example documents.
            backoff_rate=2.0,
        ),
    )

    result = with_retry(
        context,
        func=retryable_callback_flow,
        config=retry_config,
        name="callback-with-retry",
    )

    return {
        "success": True,
        "result": result,
    }
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
"""Tests for with_retry_callback example.
2+
3+
Demonstrates that with_retry retries the entire wait_for_callback flow
4+
when the callback fails. The external system fails 2 times before
5+
succeeding on the 3rd attempt.
6+
"""
7+
8+
import pytest
9+
from src.with_retry import with_retry_callback
10+
from test.conftest import deserialize_operation_payload
11+
12+
from aws_durable_execution_sdk_python.execution import InvocationStatus
13+
from aws_durable_execution_sdk_python.lambda_service import ErrorObject
14+
15+
16+
@pytest.mark.example
@pytest.mark.durable_execution(
    handler=with_retry_callback.handler,
    lambda_function_name="With Retry Callback",
)
def test_with_retry_callback_fails_twice_then_succeeds(durable_runner):
    """Verify with_retry re-runs the callback flow until it succeeds.

    The simulated external system reports a callback failure on attempts
    1 and 2, then reports success on attempt 3. The execution is expected
    to finish SUCCEEDED with the payload from the third callback.
    """
    # Failure message reported by the external system, keyed by attempt.
    failure_by_attempt = {
        1: "External system unavailable",
        2: "External system timeout",
    }
    succeeding_attempt = 3

    with durable_runner:
        execution_arn = durable_runner.run_async(input=None, timeout=60)

        # Attempts 1 and 2: wait for that attempt's callback, then fail it.
        for attempt_no, message in failure_by_attempt.items():
            failed_callback_id = durable_runner.wait_for_callback(
                execution_arn=execution_arn,
                name=f"external-call-attempt-{attempt_no} create callback id",
            )
            durable_runner.send_callback_failure(
                callback_id=failed_callback_id,
                error=ErrorObject.from_message(message),
            )

        # Attempt 3: the external system finally answers successfully.
        final_callback_id = durable_runner.wait_for_callback(
            execution_arn=execution_arn,
            name=f"external-call-attempt-{succeeding_attempt} create callback id",
        )
        durable_runner.send_callback_success(
            callback_id=final_callback_id,
            result=b"approval granted",
        )

        result = durable_runner.wait_for_result(execution_arn=execution_arn)

    assert result.status is InvocationStatus.SUCCEEDED

    expected_payload = {"success": True, "result": "approval granted"}
    assert deserialize_operation_payload(result.result) == expected_payload

src/aws_durable_execution_sdk_python/__init__.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,19 +24,25 @@
2424
# Core decorator - used in every durable function
2525
from aws_durable_execution_sdk_python.execution import durable_execution
2626

27+
# Retry helpers
28+
from aws_durable_execution_sdk_python.retries import WithRetryConfig, with_retry
29+
2730
# Essential context types - passed to user functions
2831
from aws_durable_execution_sdk_python.types import StepContext
2932

33+
3034
__all__ = [
3135
"BatchResult",
3236
"DurableContext",
3337
"DurableExecutionsError",
3438
"InvocationError",
3539
"StepContext",
3640
"ValidationError",
41+
"WithRetryConfig",
3742
"__version__",
3843
"durable_execution",
3944
"durable_step",
4045
"durable_wait_for_callback",
4146
"durable_with_child_context",
47+
"with_retry",
4248
]

src/aws_durable_execution_sdk_python/retries.py

Lines changed: 97 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,20 @@
55
import math
66
import re
77
from dataclasses import dataclass, field
8-
from typing import TYPE_CHECKING
8+
from typing import TYPE_CHECKING, TypeVar
99

1010
from aws_durable_execution_sdk_python.config import Duration, JitterStrategy
11+
from aws_durable_execution_sdk_python.exceptions import SuspendExecution
12+
1113

1214
if TYPE_CHECKING:
1315
from collections.abc import Callable
1416

17+
from aws_durable_execution_sdk_python.config import ChildConfig
18+
from aws_durable_execution_sdk_python.types import DurableContext
19+
20+
T = TypeVar("T")
21+
1522
Numeric = int | float
1623

1724
# Default pattern that matches all error messages
@@ -172,3 +179,92 @@ def critical(cls) -> Callable[[Exception, int], RetryDecision]:
172179
jitter_strategy=JitterStrategy.NONE,
173180
)
174181
)
182+
183+
184+
@dataclass(frozen=True)
class WithRetryConfig:
    """Immutable settings consumed by with_retry.

    Bundles a RetryStrategyConfig (the same configuration type used for
    step retries) with the options that decide how the retry loop itself
    is executed.

    Attributes:
        retry_strategy_config: Retry policy (max_attempts, initial_delay,
            backoff_rate, jitter, error filtering) from which the retry
            strategy is built.
        wrap_with_run_in_child_context: When True (the default), the retry
            loop runs inside a child context; when False it runs directly
            on the context passed to with_retry.
        child_context_config: Optional ChildConfig handed to
            run_in_child_context. Only used when
            wrap_with_run_in_child_context is True.
    """

    # Required: there is no default retry policy.
    retry_strategy_config: RetryStrategyConfig
    # Isolation via a child context is on by default.
    wrap_with_run_in_child_context: bool = True
    # None means no ChildConfig is supplied to run_in_child_context.
    child_context_config: ChildConfig | None = None
205+
206+
207+
def with_retry(
    context: DurableContext,
    func: Callable[[DurableContext, int], T],
    config: WithRetryConfig,
    name: str | None = None,
) -> T:
    """Retry a block of durable logic with configurable backoff.

    Semantically a run_in_child_context with a retry policy wrapped around
    it: on failure the whole function body is re-run from the beginning,
    after waiting for the delay chosen by the retry strategy.

    Unlike context.step(), which retries a single atomic operation,
    with_retry retries an entire function body that may contain multiple
    durable operations (steps, waits, invokes, callbacks, etc.).

    Uses the existing RetryStrategyConfig (via WithRetryConfig), so retry
    configuration is consistent across the SDK.

    Args:
        context: The DurableContext to execute within.
        func: A callable that accepts (DurableContext, attempt) and returns
            T. ``attempt`` starts at 1 and increases by one per retry.
        config: WithRetryConfig containing a RetryStrategyConfig plus
            execution-mode options.
        name: Optional name for the child context and backoff waits.
            When provided, backoff waits are named
            "{name}-backoff-{attempt}"; otherwise they are unnamed.

    Returns:
        The result of func on successful execution.

    Raises:
        Exception: The exception from the last failed attempt, re-raised
            unchanged once the retry strategy returns should_retry=False
            (for example when attempts are exhausted).
        SuspendExecution: Re-raised immediately (SDK control flow).
    """
    retry_strategy = create_retry_strategy(config.retry_strategy_config)

    def run_loop(ctx: DurableContext) -> T:
        attempt = 0
        while True:
            attempt += 1
            try:
                return func(ctx, attempt)
            except SuspendExecution:
                raise  # SDK control flow - never intercept
            except Exception as err:
                decision = retry_strategy(err, attempt)
                if not decision.should_retry:
                    # Bare raise keeps the original exception and traceback.
                    raise
                # name is optional, so wait_name may be None. It must not be
                # concatenated into a string: doing so raised TypeError and
                # replaced the real error instead of retrying.
                wait_name = f"{name}-backoff-{attempt}" if name else None
                ctx.wait(duration=decision.delay, name=wait_name)

    if config.wrap_with_run_in_child_context:
        return context.run_in_child_context(
            run_loop,
            name=name,
            config=config.child_context_config,
        )
    # No isolation requested: run the loop directly on the caller's context.
    return run_loop(context)

0 commit comments

Comments
 (0)