Skip to content

Commit 5f1398f

Browse files
authored
chore: fixed timeout retry strategy (#504)
* chore: adding fixed timeout retry strategy * chore: grab client timeout out of client call details * chore: update synch retry interceptor * chore: linting * chore: remove unnecessary decoding logic and leave a comment * chore: add comment about client call details value
1 parent 1230510 commit 5f1398f

5 files changed

Lines changed: 157 additions & 4 deletions

File tree

src/momento/internal/aio/_retry_interceptor.py

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import asyncio
44
import logging
5+
from datetime import datetime, timedelta
56
from typing import Callable
67

78
import grpc
@@ -34,20 +35,44 @@ async def intercept_unary_unary(
3435
client_call_details: grpc.aio._interceptor.ClientCallDetails,
3536
request: grpc.aio._typing.RequestType,
3637
) -> grpc.aio._call.UnaryUnaryCall | grpc.aio._typing.ResponseType:
38+
call = None
3739
attempt_number = 1
40+
# The overall deadline is calculated from the timeout set on the client call details.
41+
# That value is set in our gRPC configurations and, while typed as optional, will never be None here.
42+
overall_deadline = datetime.now() + timedelta(seconds=client_call_details.timeout or 0.0)
43+
# variable to capture the penultimate call to a deadline-aware retry strategy, which
44+
# will hold the call object before a terminal DEADLINE_EXCEEDED response is returned
45+
last_call = None
46+
3847
while True:
48+
if attempt_number > 1:
49+
retry_deadline = self._retry_strategy.calculate_retry_deadline(overall_deadline)
50+
if retry_deadline is not None:
51+
client_call_details = grpc.aio._interceptor.ClientCallDetails(
52+
client_call_details.method,
53+
retry_deadline,
54+
client_call_details.metadata,
55+
client_call_details.credentials,
56+
client_call_details.wait_for_ready,
57+
)
58+
last_call = call
59+
3960
call = await continuation(client_call_details, request)
4061
response_code = await call.code()
4162

4263
if response_code == grpc.StatusCode.OK:
4364
return call
4465

4566
retryTime = self._retry_strategy.determine_when_to_retry(
46-
RetryableProps(response_code, client_call_details.method.decode("utf-8"), attempt_number)
67+
# Note: the async interceptor gets `client_call_details.method` as a binary string that needs to be decoded
68+
# but the sync interceptor gets it as a string.
69+
RetryableProps(
70+
response_code, client_call_details.method.decode("utf-8"), attempt_number, overall_deadline
71+
)
4772
)
4873

4974
if retryTime is None:
50-
return call
75+
return last_call or call
5176

5277
attempt_number += 1
5378
await asyncio.sleep(retryTime)

src/momento/internal/synchronous/_retry_interceptor.py

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import logging
44
import time
5+
from datetime import datetime, timedelta
56
from typing import Callable, TypeVar
67

78
import grpc
@@ -35,20 +36,42 @@ def intercept_unary_unary(
3536
client_call_details: grpc.ClientCallDetails,
3637
request: RequestType,
3738
) -> InterceptorCall | ResponseType:
39+
call = None
3840
attempt_number = 1
41+
# The overall deadline is calculated from the timeout set on the client call details.
42+
# That value is set in our gRPC configurations and, while typed as optional, will never be None here.
43+
overall_deadline = datetime.now() + timedelta(seconds=client_call_details.timeout or 0.0)
44+
# variable to capture the penultimate call to a deadline-aware retry strategy, which
45+
# will hold the call object before a terminal DEADLINE_EXCEEDED response is returned
46+
last_call = None
47+
3948
while True:
49+
if attempt_number > 1:
50+
retry_deadline = self._retry_strategy.calculate_retry_deadline(overall_deadline)
51+
if retry_deadline is not None:
52+
client_call_details = grpc.aio._interceptor.ClientCallDetails(
53+
client_call_details.method,
54+
retry_deadline,
55+
client_call_details.metadata,
56+
client_call_details.credentials,
57+
client_call_details.wait_for_ready,
58+
)
59+
last_call = call
60+
4061
call = continuation(client_call_details, request)
4162
response_code = call.code() # type: ignore[attr-defined] # noqa: F401
4263

4364
if response_code == grpc.StatusCode.OK:
4465
return call
4566

4667
retryTime = self._retry_strategy.determine_when_to_retry(
47-
RetryableProps(response_code, client_call_details.method, attempt_number)
68+
# Note: the async interceptor gets `client_call_details.method` as a binary string that needs to be decoded
69+
# but the sync interceptor gets it as a string.
70+
RetryableProps(response_code, client_call_details.method, attempt_number, overall_deadline)
4871
)
4972

5073
if retryTime is None:
51-
return call
74+
return last_call or call
5275

5376
attempt_number += 1
5477
time.sleep(retryTime)
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
import logging
2+
import random
3+
from datetime import datetime, timedelta
4+
from typing import Optional
5+
6+
import grpc
7+
8+
from .default_eligibility_strategy import DefaultEligibilityStrategy
9+
from .eligibility_strategy import EligibilityStrategy
10+
from .retry_strategy import RetryStrategy
11+
from .retryable_props import RetryableProps
12+
13+
logger = logging.getLogger("fixed-timeout-retry-strategy")
14+
15+
16+
class FixedTimeoutRetryStrategy(RetryStrategy):
17+
def __init__(
18+
self,
19+
*,
20+
retry_timeout_millis: int,
21+
retry_delay_interval_millis: int,
22+
eligibility_strategy: DefaultEligibilityStrategy = DefaultEligibilityStrategy(),
23+
):
24+
self._eligibility_strategy: EligibilityStrategy = eligibility_strategy
25+
self._retry_timeout_millis: int = retry_timeout_millis
26+
self._retry_delay_interval_millis: int = retry_delay_interval_millis
27+
28+
def determine_when_to_retry(self, props: RetryableProps) -> Optional[float]:
29+
"""Determines whether a grpc call can be retried and how long to wait before that retry.
30+
31+
Args:
32+
props (RetryableProps): Information about the grpc call, its last invocation, and how many times the call
33+
has been made.
34+
35+
:Returns
36+
The time in seconds before the next retry should occur or None if no retry should be attempted.
37+
"""
38+
logger.debug(
39+
"Determining whether request is eligible for retry; status code: %s, request type: %s, attemptNumber: %d",
40+
props.grpc_status, # type: ignore[misc]
41+
props.grpc_method,
42+
props.attempt_number,
43+
)
44+
45+
if props.overall_deadline is None:
46+
logger.debug("Overall deadline is None; not retrying.")
47+
return None
48+
49+
# If a retry attempt's timeout has passed but the client's overall timeout has not yet passed,
50+
# we should reset the deadline and retry.
51+
if (
52+
props.attempt_number > 0
53+
and props.grpc_status == grpc.StatusCode.DEADLINE_EXCEEDED # type: ignore[misc]
54+
and props.overall_deadline > datetime.now()
55+
):
56+
return self.get_jitter_in_millis(props)
57+
58+
if self._eligibility_strategy.is_eligible_for_retry(props) is False:
59+
logger.debug(
60+
"Request path: %s; retryable status code: %s. Request is not retryable.",
61+
props.grpc_method,
62+
props.grpc_status, # type: ignore[misc]
63+
)
64+
return None
65+
66+
return self.get_jitter_in_millis(props)
67+
68+
def get_jitter_in_millis(self, props: RetryableProps) -> float:
69+
timeout_with_jitter = self.add_jitter(self._retry_delay_interval_millis)
70+
logger.debug(
71+
"Determined request is retryable; retrying after %d ms: [method: %s, status: %s, attempt: %d]",
72+
timeout_with_jitter,
73+
props.grpc_method,
74+
props.grpc_status, # type: ignore[misc]
75+
props.attempt_number,
76+
)
77+
return timeout_with_jitter / 1000.0
78+
79+
def add_jitter(self, base_delay: int) -> int:
80+
return int((0.2 * random.random() + 0.9) * float(base_delay))
81+
82+
def calculate_retry_deadline(self, overall_deadline: datetime) -> Optional[float]:
83+
"""Calculates the deadline for a retry attempt using the retry timeout, but clips it to the overall deadline if the overall deadline is sooner.
84+
85+
Args:
86+
overall_deadline (datetime): The overall deadline for the operation.
87+
88+
Returns:
89+
float: The calculated retry deadline.
90+
"""
91+
logger.debug(
92+
f"Calculating retry deadline:\nnow: {datetime.now()}\noverall deadline: {overall_deadline}\n"
93+
+ f"retry timeout millis: {self._retry_timeout_millis}"
94+
)
95+
if datetime.now() + timedelta(milliseconds=self._retry_timeout_millis) > overall_deadline:
96+
return (overall_deadline - datetime.now()).total_seconds() * 1000
97+
return self._retry_timeout_millis

src/momento/retry/retry_strategy.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
from abc import ABC, abstractmethod
2+
from datetime import datetime
23
from typing import Optional
34

45
from .retryable_props import RetryableProps
@@ -8,3 +9,7 @@ class RetryStrategy(ABC):
89
@abstractmethod
910
def determine_when_to_retry(self, props: RetryableProps) -> Optional[float]:
1011
pass
12+
13+
# Currently used only by the FixedTimeoutRetryStrategy
14+
def calculate_retry_deadline(self, overall_deadline: datetime) -> Optional[float]:
15+
return None

src/momento/retry/retryable_props.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
from dataclasses import dataclass
2+
from datetime import datetime
3+
from typing import Optional
24

35
import grpc
46

@@ -9,3 +11,4 @@ class RetryableProps:
911
grpc_status: grpc.StatusCode
1012
grpc_method: str
1113
attempt_number: int
14+
overall_deadline: Optional[datetime] = None

0 commit comments

Comments
 (0)