-
Notifications
You must be signed in to change notification settings - Fork 749
Expand file tree
/
Copy pathutils.py
More file actions
120 lines (102 loc) · 3.9 KB
/
utils.py
File metadata and controls
120 lines (102 loc) · 3.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
from __future__ import annotations
import asyncio
import inspect
import logging
import sys
import time
from typing import TYPE_CHECKING, TypeVar, cast, overload
import pytest
if TYPE_CHECKING:
from collections.abc import Awaitable, Callable
# Module-level logger named after this module; `call_with_exp_backoff` logs each retry through it.
logger = logging.getLogger(__name__)
# Generic type variable for the value produced by the callables passed to the helpers in this module.
T = TypeVar('T')
# Decorator: resolves to `pytest.mark.run_alone` when `sys.platform == 'darwin'` (macOS), and to an identity
# function that returns its argument unchanged on every other platform.
# NOTE(review): how the `run_alone` marker is handled is defined outside this file - confirm in the pytest config.
run_alone_on_mac = pytest.mark.run_alone if sys.platform == 'darwin' else lambda x: x
async def _maybe_await(value: Awaitable[T] | T) -> T:
"""Await `value` if it is awaitable, otherwise return it unchanged.
Lets `call_with_exp_backoff` and `poll_until_condition` accept both sync and async callables.
"""
if inspect.isawaitable(value):
return await cast('Awaitable[T]', value)
return cast('T', value)
@overload
async def call_with_exp_backoff(
    fn: Callable[[], Awaitable[T]],
    condition: Callable[[T], bool] = ...,
    *,
    max_retries: int = ...,
    base_delay: float = ...,
) -> T: ...


@overload
async def call_with_exp_backoff(
    fn: Callable[[], T],
    condition: Callable[[T], bool] = ...,
    *,
    max_retries: int = ...,
    base_delay: float = ...,
) -> T: ...


async def call_with_exp_backoff(
    fn: Callable[[], Awaitable[T] | T],
    condition: Callable[[T], bool] = bool,
    *,
    max_retries: int = 5,
    base_delay: float = 1.0,
) -> T:
    """Invoke `fn` repeatedly, backing off exponentially, until `condition` accepts its result.

    `fn` is called once up front. While `condition(result)` is falsy and retries remain, the helper sleeps
    for `base_delay * 2 ** attempt` seconds (`attempt` starts at 0) and calls `fn` again, at most
    `max_retries` additional times. The result of the final call is returned without being checked again,
    whether or not the condition ever held, leaving the actual assertion to the caller.

    Intended for eventually-consistent state whose expected value may need a moment to show up. In contrast
    to `poll_until_condition`, the pause doubles after every attempt instead of staying fixed.

    Args:
        fn: Zero-argument callable, sync or async, producing the value to check.
        condition: Predicate applied to each result; defaults to `bool`, i.e. a truthiness check.
        max_retries: Maximum number of retries after the initial call; `0` means `fn` runs exactly once.
        base_delay: Delay in seconds before the first retry; doubled for each subsequent one.

    Returns:
        The most recent result of `fn`.
    """
    outcome = await _maybe_await(fn())

    for retry_index in range(max_retries):
        satisfied = condition(outcome)
        if satisfied:
            break

        pause = base_delay * 2**retry_index
        logger.info(
            'Condition not met for %r, retrying in %ss (attempt %d/%d).',
            outcome,
            pause,
            retry_index + 1,
            max_retries,
        )
        await asyncio.sleep(pause)
        outcome = await _maybe_await(fn())

    # Reached either via `break` (condition met) or after the last retry (result deliberately unchecked).
    return outcome
@overload
async def poll_until_condition(
    fn: Callable[[], Awaitable[T]],
    condition: Callable[[T], bool] = ...,
    *,
    timeout: float = ...,
    poll_interval: float = ...,
) -> T: ...


@overload
async def poll_until_condition(
    fn: Callable[[], T],
    condition: Callable[[T], bool] = ...,
    *,
    timeout: float = ...,
    poll_interval: float = ...,
) -> T: ...


async def poll_until_condition(
    fn: Callable[[], Awaitable[T] | T],
    condition: Callable[[T], bool] = bool,
    *,
    timeout: float = 5,
    poll_interval: float = 0.05,
) -> T:
    """Repeatedly call `fn` at a fixed interval until `condition` accepts its result or time runs out.

    `fn` is called immediately, then again every `poll_interval` seconds for as long as `condition(result)`
    is falsy and fewer than `timeout` seconds have passed. The final sleep is shortened so that it never
    extends past the deadline. The latest result is returned whether or not the condition was satisfied,
    leaving the actual assertion to the caller.

    Prefer this over a hard-coded `asyncio.sleep` when waiting for state that settles after a variable
    amount of time (e.g. autoscaling concurrency). In contrast to `call_with_exp_backoff`, the gap between
    polls does not grow.

    Args:
        fn: Zero-argument callable, sync or async, producing the value to check.
        condition: Predicate applied to each result; defaults to `bool`, i.e. a truthiness check.
        timeout: Maximum number of seconds to keep polling, measured with `time.monotonic`.
        poll_interval: Number of seconds to sleep between consecutive polls.

    Returns:
        The most recent result of `fn`.
    """
    cutoff = time.monotonic() + timeout
    latest = await _maybe_await(fn())

    while True:
        if condition(latest):
            return latest

        time_left = cutoff - time.monotonic()
        if time_left <= 0:
            # Deadline reached: hand back whatever was observed last.
            return latest

        # Never sleep past the deadline.
        await asyncio.sleep(min(poll_interval, time_left))
        latest = await _maybe_await(fn())