Skip to content

Commit 64efedf

Browse files
Refactor RetryStrategy to be async
This refactors retry strategies to be async. This is needed for when strategies need to internally wait or if they need to protect access to a shared resource.
1 parent 7878b12 commit 64efedf

14 files changed

Lines changed: 473 additions & 436 deletions

File tree

codegen/core/src/main/java/software/amazon/smithy/python/codegen/ClientGenerator.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ private void generateService(PythonWriter writer) {
7171
}
7272

7373
writer.addDependency(SmithyPythonDependency.SMITHY_CORE);
74-
writer.addImport("smithy_core.retries", "RetryStrategyResolver");
74+
writer.addImport("smithy_core.aio.retries", "RetryStrategyResolver");
7575
writer.write("""
7676
def __init__(self, config: $1T | None = None, plugins: list[$2T] | None = None):
7777
$3C
@@ -215,7 +215,7 @@ private void writeSharedOperationInit(
215215
writer.addImport("smithy_core.aio.client", "RequestPipeline");
216216
writer.addImport("smithy_core.exceptions", "ExpectationNotMetError");
217217
writer.addImport("smithy_core.retries", "RetryStrategyOptions");
218-
writer.addImport("smithy_core.interfaces.retries", "RetryStrategy");
218+
writer.addImport("smithy_core.aio.interfaces.retries", "RetryStrategy");
219219
writer.addStdlibImport("copy", "deepcopy");
220220

221221
writer.write("""

codegen/core/src/main/java/software/amazon/smithy/python/codegen/HttpProtocolTestGenerator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@ private void generateRequestTest(OperationShape operation, HttpRequestTestCase t
181181
} else {
182182
path = "";
183183
}
184-
writer.addImport("smithy_core.retries", "SimpleRetryStrategy");
184+
writer.addImport("smithy_core.aio.retries", "SimpleRetryStrategy");
185185
writeClientBlock(context.symbolProvider().toSymbol(service), testCase, Optional.of(() -> {
186186
writer.write("""
187187
config = $T(

codegen/core/src/main/java/software/amazon/smithy/python/codegen/generators/ConfigGenerator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public final class ConfigGenerator implements Runnable {
5858
.name("RetryStrategy | RetryStrategyOptions")
5959
.addReference(Symbol.builder()
6060
.name("RetryStrategy")
61-
.namespace("smithy_core.interfaces.retries", ".")
61+
.namespace("smithy_core.aio.interfaces.retries", ".")
6262
.addDependency(SmithyPythonDependency.SMITHY_CORE)
6363
.build())
6464
.addReference(Symbol.builder()

packages/smithy-aws-core/src/smithy_aws_core/identity/imds.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,9 @@
99

1010
from smithy_core import URI
1111
from smithy_core.aio.interfaces.identity import IdentityResolver
12+
from smithy_core.aio.interfaces.retries import RetryStrategy
13+
from smithy_core.aio.retries import SimpleRetryStrategy
1214
from smithy_core.exceptions import SmithyIdentityError
13-
from smithy_core.interfaces.retries import RetryStrategy
14-
from smithy_core.retries import SimpleRetryStrategy
1515
from smithy_http import Field, Fields
1616
from smithy_http.aio import HTTPRequest
1717
from smithy_http.aio.interfaces import HTTPClient

packages/smithy-aws-core/tests/unit/identity/test_imds.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
TokenCache,
1717
)
1818
from smithy_core import URI
19-
from smithy_core.retries import SimpleRetryStrategy
19+
from smithy_core.aio.retries import SimpleRetryStrategy
2020
from smithy_http.aio import HTTPRequest
2121

2222

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
{
2+
"type": "breaking",
3+
"description": "Refactored retry strategies to be async, allowing them to wait internally or use async synchronization primitives if necessary."
4+
}

packages/smithy-core/src/smithy_core/aio/client.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
)
2323
from ..interfaces import Endpoint, TypedProperties
2424
from ..interfaces.auth import AuthOption, AuthSchemeResolver
25-
from ..interfaces.retries import RetryStrategy
2625
from ..schemas import APIOperation
2726
from ..serializers import SerializeableShape
2827
from ..shapes import ShapeID
@@ -37,6 +36,7 @@
3736
)
3837
from .interfaces.auth import AuthScheme
3938
from .interfaces.eventstream import EventReceiver
39+
from .interfaces.retries import RetryStrategy
4040
from .utils import seek
4141

4242
if TYPE_CHECKING:
@@ -330,7 +330,7 @@ async def _retry[I: SerializeableShape, O: DeserializeableShape](
330330
return await self._handle_attempt(call, request_context, request_future)
331331

332332
retry_strategy = call.retry_strategy
333-
retry_token = retry_strategy.acquire_initial_retry_token(
333+
retry_token = await retry_strategy.acquire_initial_retry_token(
334334
token_scope=call.retry_scope
335335
)
336336

@@ -349,7 +349,7 @@ async def _retry[I: SerializeableShape, O: DeserializeableShape](
349349

350350
if isinstance(output_context.response, Exception):
351351
try:
352-
retry_token = retry_strategy.refresh_retry_token_for_retry(
352+
retry_token = await retry_strategy.refresh_retry_token_for_retry(
353353
token_to_renew=retry_token,
354354
error=output_context.response,
355355
)
@@ -364,7 +364,7 @@ async def _retry[I: SerializeableShape, O: DeserializeableShape](
364364

365365
await seek(request_context.transport_request.body, 0)
366366
else:
367-
retry_strategy.record_success(token=retry_token)
367+
await retry_strategy.record_success(token=retry_token)
368368
return output_context
369369

370370
async def _handle_attempt[I: SerializeableShape, O: DeserializeableShape](
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
# SPDX-License-Identifier: Apache-2.0
3+
from typing import Protocol, runtime_checkable
4+
5+
from ...interfaces.retries import RetryBackoffStrategy, RetryToken
6+
7+
8+
@runtime_checkable
9+
class RetryStrategy(Protocol):
10+
"""Issuer of :py:class:`RetryToken`s."""
11+
12+
backoff_strategy: RetryBackoffStrategy
13+
"""The strategy used by returned tokens to compute delay duration values."""
14+
15+
max_attempts: int
16+
"""Upper limit on total attempt count (initial attempt plus retries)."""
17+
18+
async def acquire_initial_retry_token(
19+
self, *, token_scope: str | None = None
20+
) -> RetryToken:
21+
"""Create a base retry token for the start of a request.
22+
23+
:param token_scope: An arbitrary string accepted by the retry strategy to
24+
separate tokens into scopes.
25+
:returns: A retry token, to be used for determining the retry delay, refreshing
26+
the token after a failure, and recording success after success.
27+
:raises RetryError: If the retry strategy has no available tokens.
28+
"""
29+
...
30+
31+
async def refresh_retry_token_for_retry(
32+
self, *, token_to_renew: RetryToken, error: Exception
33+
) -> RetryToken:
34+
"""Replace an existing retry token from a failed attempt with a new token.
35+
36+
After a failed operation call, this method is called to exchange a retry token
37+
that was previously obtained by calling :py:func:`acquire_initial_retry_token`
38+
or this method with a new retry token for the next attempt. This method can
39+
either choose to allow another retry and send a new or updated token, or reject
40+
the retry attempt and raise the error.
41+
42+
:param token_to_renew: The token used for the previous failed attempt.
43+
:param error: The error that triggered the need for a retry.
44+
:raises RetryError: If no further retry attempts are allowed.
45+
"""
46+
...
47+
48+
async def record_success(self, *, token: RetryToken) -> None:
49+
"""Return token after successful completion of an operation.
50+
51+
Upon successful completion of the operation, a user calls this function to
52+
record that the operation was successful.
53+
54+
:param token: The token used for the previous successful attempt.
55+
"""
56+
...
Lines changed: 227 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,227 @@
1+
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
# SPDX-License-Identifier: Apache-2.0
3+
from functools import lru_cache
4+
from typing import Any, Literal
5+
6+
from ..exceptions import RetryError
7+
from ..interfaces import retries as retries_interface
8+
from ..retries import (
9+
ExponentialBackoffJitterType,
10+
ExponentialRetryBackoffStrategy,
11+
RetryStrategyOptions,
12+
SimpleRetryToken,
13+
StandardRetryQuota,
14+
StandardRetryToken,
15+
)
16+
from .interfaces.retries import RetryStrategy
17+
18+
RetryStrategyType = Literal["simple", "standard"]
19+
20+
21+
class RetryStrategyResolver:
22+
"""Retry strategy resolver that caches retry strategies based on configuration options.
23+
24+
This resolver caches retry strategy instances based on their configuration to reuse existing
25+
instances of RetryStrategy with the same settings. Uses LRU cache for thread-safe caching.
26+
"""
27+
28+
async def resolve_retry_strategy(
29+
self, *, retry_strategy: RetryStrategy | RetryStrategyOptions | None
30+
) -> RetryStrategy:
31+
"""Resolve a retry strategy from the provided options, using cache when possible.
32+
33+
:param retry_strategy: An explicitly configured retry strategy or options for creating one.
34+
"""
35+
if isinstance(retry_strategy, RetryStrategy):
36+
return retry_strategy
37+
elif retry_strategy is None:
38+
retry_strategy = RetryStrategyOptions()
39+
elif not isinstance(retry_strategy, RetryStrategyOptions): # type: ignore[reportUnnecessaryIsInstance]
40+
raise TypeError(
41+
f"retry_strategy must be RetryStrategy, RetryStrategyOptions, or None, "
42+
f"got {type(retry_strategy).__name__}"
43+
)
44+
return self._create_retry_strategy(
45+
retry_strategy.retry_mode, retry_strategy.max_attempts
46+
)
47+
48+
@lru_cache
49+
def _create_retry_strategy(
50+
self, retry_mode: RetryStrategyType, max_attempts: int | None
51+
) -> RetryStrategy:
52+
kwargs = {"max_attempts": max_attempts}
53+
filtered_kwargs: dict[str, Any] = {
54+
k: v for k, v in kwargs.items() if v is not None
55+
}
56+
match retry_mode:
57+
case "simple":
58+
return SimpleRetryStrategy(**filtered_kwargs)
59+
case "standard":
60+
return StandardRetryStrategy(**filtered_kwargs)
61+
case _:
62+
raise ValueError(f"Unknown retry mode: {retry_mode}")
63+
64+
65+
class SimpleRetryStrategy:
66+
def __init__(
67+
self,
68+
*,
69+
backoff_strategy: retries_interface.RetryBackoffStrategy | None = None,
70+
max_attempts: int = 5,
71+
):
72+
"""Retry strategy that simply invokes the given backoff strategy.
73+
74+
:param backoff_strategy: The backoff strategy used by returned tokens to compute
75+
the retry delay. Defaults to :py:class:`ExponentialRetryBackoffStrategy`.
76+
77+
:param max_attempts: Upper limit on total number of attempts made, including
78+
initial attempt and retries.
79+
"""
80+
self.backoff_strategy = backoff_strategy or ExponentialRetryBackoffStrategy()
81+
self.max_attempts = max_attempts
82+
83+
async def acquire_initial_retry_token(
84+
self, *, token_scope: str | None = None
85+
) -> SimpleRetryToken:
86+
"""Create a base retry token for the start of a request.
87+
88+
:param token_scope: This argument is ignored by this retry strategy.
89+
"""
90+
retry_delay = self.backoff_strategy.compute_next_backoff_delay(0)
91+
return SimpleRetryToken(retry_count=0, retry_delay=retry_delay)
92+
93+
async def refresh_retry_token_for_retry(
94+
self,
95+
*,
96+
token_to_renew: retries_interface.RetryToken,
97+
error: Exception,
98+
) -> SimpleRetryToken:
99+
"""Replace an existing retry token from a failed attempt with a new token.
100+
101+
This retry strategy always returns a token until the attempt count stored in
102+
the new token exceeds the ``max_attempts`` value.
103+
104+
:param token_to_renew: The token used for the previous failed attempt.
105+
:param error: The error that triggered the need for a retry.
106+
:raises RetryError: If no further retry attempts are allowed.
107+
"""
108+
if isinstance(error, retries_interface.ErrorRetryInfo) and error.is_retry_safe:
109+
retry_count = token_to_renew.retry_count + 1
110+
if retry_count >= self.max_attempts:
111+
raise RetryError(
112+
f"Reached maximum number of allowed attempts: {self.max_attempts}"
113+
) from error
114+
retry_delay = self.backoff_strategy.compute_next_backoff_delay(retry_count)
115+
return SimpleRetryToken(retry_count=retry_count, retry_delay=retry_delay)
116+
else:
117+
raise RetryError(f"Error is not retryable: {error}") from error
118+
119+
async def record_success(self, *, token: retries_interface.RetryToken) -> None:
120+
"""Not used by this retry strategy."""
121+
122+
def __deepcopy__(self, memo: Any) -> "SimpleRetryStrategy":
123+
return self
124+
125+
126+
class StandardRetryStrategy:
127+
def __init__(
128+
self,
129+
*,
130+
backoff_strategy: retries_interface.RetryBackoffStrategy | None = None,
131+
max_attempts: int = 3,
132+
retry_quota: StandardRetryQuota | None = None,
133+
):
134+
"""Standard retry strategy using truncated binary exponential backoff
135+
with full jitter.
136+
137+
:param backoff_strategy: The backoff strategy used by returned tokens to compute
138+
the retry delay. Defaults to :py:class:`ExponentialRetryBackoffStrategy`.
139+
140+
:param max_attempts: Upper limit on total number of attempts made, including
141+
initial attempt and retries.
142+
143+
:param retry_quota: The retry quota to use for managing retry capacity. Defaults
144+
to a new :py:class:`StandardRetryQuota` instance.
145+
"""
146+
if max_attempts < 0:
147+
raise ValueError(
148+
f"max_attempts must be a non-negative integer, got {max_attempts}"
149+
)
150+
151+
self.backoff_strategy = backoff_strategy or ExponentialRetryBackoffStrategy(
152+
backoff_scale_value=1,
153+
max_backoff=20,
154+
jitter_type=ExponentialBackoffJitterType.FULL,
155+
)
156+
self.max_attempts = max_attempts
157+
self._retry_quota = retry_quota or StandardRetryQuota()
158+
159+
async def acquire_initial_retry_token(
160+
self, *, token_scope: str | None = None
161+
) -> StandardRetryToken:
162+
"""Create a base retry token for the start of a request.
163+
164+
:param token_scope: This argument is ignored by this retry strategy.
165+
"""
166+
retry_delay = self.backoff_strategy.compute_next_backoff_delay(0)
167+
return StandardRetryToken(retry_count=0, retry_delay=retry_delay)
168+
169+
async def refresh_retry_token_for_retry(
170+
self,
171+
*,
172+
token_to_renew: retries_interface.RetryToken,
173+
error: Exception,
174+
) -> StandardRetryToken:
175+
"""Replace an existing retry token from a failed attempt with a new token.
176+
177+
This retry strategy always returns a token until the attempt count stored in
178+
the new token exceeds the ``max_attempts`` value.
179+
180+
:param token_to_renew: The token used for the previous failed attempt.
181+
:param error: The error that triggered the need for a retry.
182+
:raises RetryError: If no further retry attempts are allowed.
183+
"""
184+
if not isinstance(token_to_renew, StandardRetryToken):
185+
raise TypeError(
186+
f"StandardRetryStrategy requires StandardRetryToken, got {type(token_to_renew).__name__}"
187+
)
188+
189+
if isinstance(error, retries_interface.ErrorRetryInfo) and error.is_retry_safe:
190+
retry_count = token_to_renew.retry_count + 1
191+
if retry_count >= self.max_attempts:
192+
raise RetryError(
193+
f"Reached maximum number of allowed attempts: {self.max_attempts}"
194+
) from error
195+
196+
# Acquire additional quota for this retry attempt
197+
# (may raise a RetryError if none is available)
198+
quota_acquired = self._retry_quota.acquire(error=error)
199+
200+
if error.retry_after is not None:
201+
retry_delay = error.retry_after
202+
else:
203+
retry_delay = self.backoff_strategy.compute_next_backoff_delay(
204+
retry_count
205+
)
206+
207+
return StandardRetryToken(
208+
retry_count=retry_count,
209+
retry_delay=retry_delay,
210+
quota_acquired=quota_acquired,
211+
)
212+
else:
213+
raise RetryError(f"Error is not retryable: {error}") from error
214+
215+
async def record_success(self, *, token: retries_interface.RetryToken) -> None:
216+
"""Release retry quota back based on the amount consumed by the last retry.
217+
218+
:param token: The token used for the previous successful attempt.
219+
"""
220+
if not isinstance(token, StandardRetryToken):
221+
raise TypeError(
222+
f"StandardRetryStrategy requires StandardRetryToken, got {type(token).__name__}"
223+
)
224+
self._retry_quota.release(release_amount=token.quota_acquired)
225+
226+
def __deepcopy__(self, memo: Any) -> "StandardRetryStrategy":
227+
return self

0 commit comments

Comments
 (0)