-
Notifications
You must be signed in to change notification settings - Fork 20
Expand file tree
/
Copy pathretries.py
More file actions
227 lines (176 loc) · 6.61 KB
/
retries.py
File metadata and controls
227 lines (176 loc) · 6.61 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
"""Code generated by Speakeasy (https://speakeasy.com). DO NOT EDIT."""
import asyncio
import random
import time
from typing import List
import httpx
class BackoffStrategy:
initial_interval: int
max_interval: int
exponent: float
max_elapsed_time: int
def __init__(
self,
initial_interval: int,
max_interval: int,
exponent: float,
max_elapsed_time: int,
):
self.initial_interval = initial_interval
self.max_interval = max_interval
self.exponent = exponent
self.max_elapsed_time = max_elapsed_time
class RetryConfig:
strategy: str
backoff: BackoffStrategy
retry_connection_errors: bool
def __init__(
self, strategy: str, backoff: BackoffStrategy, retry_connection_errors: bool
):
self.strategy = strategy
self.backoff = backoff
self.retry_connection_errors = retry_connection_errors
class Retries:
config: RetryConfig
status_codes: List[str]
def __init__(self, config: RetryConfig, status_codes: List[str]):
self.config = config
self.status_codes = status_codes
class TemporaryError(Exception):
response: httpx.Response
def __init__(self, response: httpx.Response):
self.response = response
class PermanentError(Exception):
inner: Exception
def __init__(self, inner: Exception):
self.inner = inner
def retry(func, retries: Retries):
if retries.config.strategy == "backoff":
def do_request() -> httpx.Response:
res: httpx.Response
try:
res = func()
for code in retries.status_codes:
if "X" in code.upper():
code_range = int(code[0])
status_major = res.status_code / 100
if code_range <= status_major < code_range + 1:
raise TemporaryError(res)
else:
parsed_code = int(code)
if res.status_code == parsed_code:
raise TemporaryError(res)
except httpx.ConnectError as exception:
if retries.config.retry_connection_errors:
raise
raise PermanentError(exception) from exception
except httpx.RemoteProtocolError as exception:
if retries.config.retry_connection_errors:
raise
raise PermanentError(exception) from exception
except httpx.TimeoutException as exception:
if retries.config.retry_connection_errors:
raise
raise PermanentError(exception) from exception
except TemporaryError:
raise
except Exception as exception:
raise PermanentError(exception) from exception
return res
return retry_with_backoff(
do_request,
retries.config.backoff.initial_interval,
retries.config.backoff.max_interval,
retries.config.backoff.exponent,
retries.config.backoff.max_elapsed_time,
)
return func()
async def retry_async(func, retries: Retries):
if retries.config.strategy == "backoff":
async def do_request() -> httpx.Response:
res: httpx.Response
try:
res = await func()
for code in retries.status_codes:
if "X" in code.upper():
code_range = int(code[0])
status_major = res.status_code / 100
if code_range <= status_major < code_range + 1:
raise TemporaryError(res)
else:
parsed_code = int(code)
if res.status_code == parsed_code:
raise TemporaryError(res)
except httpx.ConnectError as exception:
if retries.config.retry_connection_errors:
raise
raise PermanentError(exception) from exception
except httpx.RemoteProtocolError as exception:
if retries.config.retry_connection_errors:
raise
raise PermanentError(exception) from exception
except httpx.TimeoutException as exception:
if retries.config.retry_connection_errors:
raise
raise PermanentError(exception) from exception
except TemporaryError:
raise
except Exception as exception:
raise PermanentError(exception) from exception
return res
return await retry_with_backoff_async(
do_request,
retries.config.backoff.initial_interval,
retries.config.backoff.max_interval,
retries.config.backoff.exponent,
retries.config.backoff.max_elapsed_time,
)
return await func()
def retry_with_backoff(
func,
initial_interval=500,
max_interval=60000,
exponent=1.5,
max_elapsed_time=3600000,
):
start = round(time.time() * 1000)
retries = 0
while True:
try:
return func()
except PermanentError as exception:
raise exception.inner
except Exception as exception: # pylint: disable=broad-exception-caught
now = round(time.time() * 1000)
if now - start > max_elapsed_time:
if isinstance(exception, TemporaryError):
return exception.response
raise
sleep = (initial_interval / 1000) * exponent**retries + random.uniform(0, 1)
sleep = min(sleep, max_interval / 1000)
time.sleep(sleep)
retries += 1
async def retry_with_backoff_async(
func,
initial_interval=500,
max_interval=60000,
exponent=1.5,
max_elapsed_time=3600000,
):
start = round(time.time() * 1000)
retries = 0
while True:
try:
return await func()
except PermanentError as exception:
raise exception.inner
except Exception as exception: # pylint: disable=broad-exception-caught
now = round(time.time() * 1000)
if now - start > max_elapsed_time:
if isinstance(exception, TemporaryError):
return exception.response
raise
sleep = (initial_interval / 1000) * exponent**retries + random.uniform(0, 1)
sleep = min(sleep, max_interval / 1000)
await asyncio.sleep(sleep)
retries += 1