Skip to content

Commit f353484

Browse files
authored
fix(retry): enforce overall deadline budget across attempts
- Without a shared deadline, callers with timeout=10 and max_attempts=3 could wait up to 3*(10+backoff) seconds in total.
- Adds total_timeout to RetryConfig so that the wall-clock budget is enforced across all attempts: backoff sleeps are clipped to the remaining time, and the loop exits immediately once the budget is exhausted.

Closes #58
1 parent 1aa2fc3 commit f353484

2 files changed

Lines changed: 100 additions & 2 deletions

File tree

sdk/src/opendecree/_retry.py

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,10 @@ class RetryConfig:
2828
max_backoff: Maximum backoff duration in seconds.
2929
multiplier: Backoff multiplier between attempts.
3030
retryable_codes: gRPC status codes that trigger a retry.
31+
total_timeout: Overall wall-clock budget in seconds shared across all
32+
attempts. When set, backoff sleeps are clipped to the remaining
33+
budget and no further attempt is made once the budget is exhausted.
34+
None means no global limit (original behavior).
3135
"""
3236

3337
max_attempts: int = 3
@@ -40,6 +44,7 @@ class RetryConfig:
4044
grpc.StatusCode.DEADLINE_EXCEEDED,
4145
)
4246
)
47+
total_timeout: float | None = None
4348

4449

4550
def write_safe_config(base: RetryConfig | None) -> RetryConfig | None:
@@ -60,6 +65,7 @@ def write_safe_config(base: RetryConfig | None) -> RetryConfig | None:
6065
max_backoff=base.max_backoff,
6166
multiplier=base.multiplier,
6267
retryable_codes=safe_codes,
68+
total_timeout=base.total_timeout,
6369
)
6470

6571

@@ -68,10 +74,13 @@ def with_retry(config: RetryConfig | None, fn: Callable[[], T]) -> T:
6874
if config is None:
6975
return fn()
7076

77+
deadline = time.monotonic() + config.total_timeout if config.total_timeout is not None else None
7178
last_err: Exception | None = None
7279
backoff = config.initial_backoff
7380

7481
for attempt in range(config.max_attempts):
82+
if deadline is not None and time.monotonic() >= deadline:
83+
break
7584
try:
7685
return fn()
7786
except grpc.RpcError as e:
@@ -80,7 +89,13 @@ def with_retry(config: RetryConfig | None, fn: Callable[[], T]) -> T:
8089
raise
8190
last_err = e
8291
jitter = random.uniform(0.5, 1.5)
83-
time.sleep(backoff * jitter)
92+
sleep_time = backoff * jitter
93+
if deadline is not None:
94+
remaining = deadline - time.monotonic()
95+
if remaining <= 0:
96+
raise
97+
sleep_time = min(sleep_time, remaining)
98+
time.sleep(sleep_time)
8499
backoff = min(backoff * config.multiplier, config.max_backoff)
85100

86101
raise last_err # type: ignore[misc] # pragma: no cover
@@ -91,10 +106,13 @@ async def async_with_retry(config: RetryConfig | None, fn: Callable[[], Awaitabl
91106
if config is None:
92107
return await fn()
93108

109+
deadline = time.monotonic() + config.total_timeout if config.total_timeout is not None else None
94110
last_err: Exception | None = None
95111
backoff = config.initial_backoff
96112

97113
for attempt in range(config.max_attempts):
114+
if deadline is not None and time.monotonic() >= deadline:
115+
break
98116
try:
99117
return await fn()
100118
except grpc.aio.AioRpcError as e:
@@ -103,7 +121,13 @@ async def async_with_retry(config: RetryConfig | None, fn: Callable[[], Awaitabl
103121
raise
104122
last_err = e
105123
jitter = random.uniform(0.5, 1.5)
106-
await asyncio.sleep(backoff * jitter)
124+
sleep_time = backoff * jitter
125+
if deadline is not None:
126+
remaining = deadline - time.monotonic()
127+
if remaining <= 0:
128+
raise
129+
sleep_time = min(sleep_time, remaining)
130+
await asyncio.sleep(sleep_time)
107131
backoff = min(backoff * config.multiplier, config.max_backoff)
108132

109133
raise last_err # type: ignore[misc] # pragma: no cover

sdk/tests/test_retry.py

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,3 +138,77 @@ async def fn() -> str:
138138
with patch("opendecree._retry.asyncio.sleep", new_callable=AsyncMock):
139139
with pytest.raises(grpc.aio.AioRpcError):
140140
await async_with_retry(RetryConfig(max_attempts=2), fn)
141+
142+
143+
# --- Deadline budget ---
144+
145+
146+
def test_deadline_clips_sleep():
147+
"""Sleep is clipped to remaining budget so total wall time stays bounded."""
148+
err = FakeRpcError(grpc.StatusCode.UNAVAILABLE)
149+
fn = MagicMock(side_effect=[err, "ok"])
150+
slept: list[float] = []
151+
152+
with patch("opendecree._retry.time.sleep", side_effect=lambda s: slept.append(s)):
153+
# monotonic: [deadline=0.0, loop-top-0=0.0, remaining=0.05, loop-top-1=0.05]
154+
with patch("opendecree._retry.time.monotonic", side_effect=[0.0, 0.0, 0.05, 0.05]):
155+
result = with_retry(RetryConfig(max_attempts=3, total_timeout=0.1), fn)
156+
157+
assert result == "ok"
158+
assert slept[0] <= 0.05 + 1e-9 # clipped to remaining budget
159+
160+
161+
def test_deadline_exhausted_raises_immediately():
162+
"""When budget is gone after a failure, raises without sleeping."""
163+
err = FakeRpcError(grpc.StatusCode.UNAVAILABLE)
164+
fn = MagicMock(side_effect=err)
165+
slept: list[float] = []
166+
167+
with patch("opendecree._retry.time.sleep", side_effect=lambda s: slept.append(s)):
168+
# monotonic calls: [deadline_start=0.0, loop-top-0=0.0, remaining-check=0.2 (over budget)]
169+
with patch("opendecree._retry.time.monotonic", side_effect=[0.0, 0.0, 0.2]):
170+
with pytest.raises(grpc.RpcError):
171+
with_retry(RetryConfig(max_attempts=3, total_timeout=0.1), fn)
172+
173+
assert slept == []
174+
175+
176+
def test_deadline_already_passed_before_second_attempt():
177+
"""Loop-top deadline check stops further attempts once budget is exhausted."""
178+
err = FakeRpcError(grpc.StatusCode.UNAVAILABLE)
179+
fn = MagicMock(side_effect=[err, "ok"])
180+
181+
with patch("opendecree._retry.time.sleep"):
182+
# monotonic: [deadline=0.0, loop-top-0=0.0, remaining=0.05 (ok), loop-top-1=0.2 (over)]
183+
with patch("opendecree._retry.time.monotonic", side_effect=[0.0, 0.0, 0.05, 0.2]):
184+
with pytest.raises(grpc.RpcError):
185+
with_retry(RetryConfig(max_attempts=3, total_timeout=0.1), fn)
186+
187+
assert fn.call_count == 1
188+
189+
190+
def test_write_safe_config_preserves_total_timeout():
191+
base = RetryConfig(total_timeout=30.0)
192+
safe = write_safe_config(base)
193+
assert safe is not None
194+
assert safe.total_timeout == 30.0
195+
196+
197+
@pytest.mark.asyncio
198+
async def test_async_deadline_exhausted_raises_immediately():
199+
err = FakeRpcError(grpc.StatusCode.UNAVAILABLE)
200+
slept: list[float] = []
201+
202+
async def fn() -> str:
203+
raise err
204+
205+
async def fake_sleep(s: float) -> None:
206+
slept.append(s)
207+
208+
with patch("opendecree._retry.asyncio.sleep", side_effect=fake_sleep):
209+
# monotonic: [deadline_start=0.0, loop-top-0=0.0, remaining-check=0.2 (over budget)]
210+
with patch("opendecree._retry.time.monotonic", side_effect=[0.0, 0.0, 0.2]):
211+
with pytest.raises(grpc.aio.AioRpcError):
212+
await async_with_retry(RetryConfig(max_attempts=3, total_timeout=0.1), fn)
213+
214+
assert slept == []

0 commit comments

Comments
 (0)