This repository was archived by the owner on Feb 23, 2026. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 98
Expand file tree
/
Copy pathretry_base.py
More file actions
370 lines (311 loc) · 12.6 KB
/
retry_base.py
File metadata and controls
370 lines (311 loc) · 12.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
# Copyright 2017 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Shared classes and functions for retrying requests.
:class:`_BaseRetry` is the base class for :class:`Retry`,
:class:`AsyncRetry`, :class:`StreamingRetry`, and :class:`AsyncStreamingRetry`.
"""
from __future__ import annotations
import logging
import random
import time
from enum import Enum
from typing import Any, Callable, Optional, Iterator, TYPE_CHECKING
import requests.exceptions
from google.api_core import exceptions
from google.auth import exceptions as auth_exceptions
if TYPE_CHECKING:
import sys
if sys.version_info >= (3, 11):
from typing import Self
else:
from typing_extensions import Self
_DEFAULT_INITIAL_DELAY = 1.0 # seconds
_DEFAULT_MAXIMUM_DELAY = 60.0 # seconds
_DEFAULT_DELAY_MULTIPLIER = 2.0
_DEFAULT_DEADLINE = 60.0 * 2.0 # seconds
_LOGGER = logging.getLogger("google.api_core.retry")
def if_exception_type(
*exception_types: type[Exception],
) -> Callable[[Exception], bool]:
"""Creates a predicate to check if the exception is of a given type.
Args:
exception_types (Sequence[:func:`type`]): The exception types to check
for.
Returns:
Callable[Exception]: A predicate that returns True if the provided
exception is of the given type(s).
"""
def if_exception_type_predicate(exception: Exception) -> bool:
"""Bound predicate for checking an exception type."""
return isinstance(exception, exception_types)
return if_exception_type_predicate
# pylint: disable=invalid-name
# Pylint sees this as a constant, but it is also an alias that should be
# considered a function.
if_transient_error = if_exception_type(
exceptions.InternalServerError,
exceptions.TooManyRequests,
exceptions.ServiceUnavailable,
requests.exceptions.ConnectionError,
requests.exceptions.ChunkedEncodingError,
auth_exceptions.TransportError,
)
"""A predicate that checks if an exception is a transient API error.
The following server errors are considered transient:
- :class:`google.api_core.exceptions.InternalServerError` - HTTP 500, gRPC
``INTERNAL(13)`` and its subclasses.
- :class:`google.api_core.exceptions.TooManyRequests` - HTTP 429
- :class:`google.api_core.exceptions.ServiceUnavailable` - HTTP 503
- :class:`requests.exceptions.ConnectionError`
- :class:`requests.exceptions.ChunkedEncodingError` - The server declared
chunked encoding but sent an invalid chunk.
- :class:`google.auth.exceptions.TransportError` - Used to indicate an
error occurred during an HTTP request.
"""
# pylint: enable=invalid-name
def exponential_sleep_generator(
initial: float, maximum: float, multiplier: float = _DEFAULT_DELAY_MULTIPLIER
):
"""Generates sleep intervals based on the exponential back-off algorithm.
This implements the `Truncated Exponential Back-off`_ algorithm.
.. _Truncated Exponential Back-off:
https://cloud.google.com/storage/docs/exponential-backoff
Args:
initial (float): The minimum amount of time to delay. This must
be greater than 0.
maximum (float): The maximum amount of time to delay.
multiplier (float): The multiplier applied to the delay.
Yields:
float: successive sleep intervals.
"""
max_delay = min(initial, maximum)
while True:
yield random.uniform(0.0, max_delay)
max_delay = min(max_delay * multiplier, maximum)
class RetryFailureReason(Enum):
"""
The cause of a failed retry, used when building exceptions
"""
TIMEOUT = 0
NON_RETRYABLE_ERROR = 1
def build_retry_error(
exc_list: list[Exception],
reason: RetryFailureReason,
timeout_val: float | None,
**kwargs: Any,
) -> tuple[Exception, Exception | None]:
"""
Default exception_factory implementation.
Returns a RetryError if the failure is due to a timeout, otherwise
returns the last exception encountered.
Args:
- exc_list: list of exceptions that occurred during the retry
- reason: reason for the retry failure.
Can be TIMEOUT or NON_RETRYABLE_ERROR
- timeout_val: the original timeout value for the retry (in seconds), for use in the exception message
Returns:
- tuple: a tuple of the exception to be raised, and the cause exception if any
"""
if reason == RetryFailureReason.TIMEOUT:
# return RetryError with the most recent exception as the cause
src_exc = exc_list[-1] if exc_list else None
timeout_val_str = f"of {timeout_val:0.1f}s " if timeout_val is not None else ""
return (
exceptions.RetryError(
f"Timeout {timeout_val_str}exceeded",
src_exc,
),
src_exc,
)
elif exc_list:
# return most recent exception encountered
return exc_list[-1], None
else:
# no exceptions were given in exc_list. Raise generic RetryError
return exceptions.RetryError("Unknown error", None), None
def _retry_error_helper(
exc: Exception,
deadline: float | None,
sleep_iterator: Iterator[float],
error_list: list[Exception],
predicate_fn: Callable[[Exception], bool],
on_error_fn: Callable[[Exception], None] | None,
exc_factory_fn: Callable[
[list[Exception], RetryFailureReason, float | None],
tuple[Exception, Exception | None],
],
original_timeout: float | None,
) -> float:
"""
Shared logic for handling an error for all retry implementations
- Raises an error on timeout or non-retryable error
- Calls on_error_fn if provided
- Logs the error
Args:
- exc: the exception that was raised
- deadline: the deadline for the retry, calculated as a diff from time.monotonic()
- sleep_iterator: iterator to draw the next backoff value from
- error_list: the list of exceptions that have been raised so far
- predicate_fn: takes `exc` and returns true if the operation should be retried
- on_error_fn: callback to execute when a retryable error occurs
- exc_factory_fn: callback used to build the exception to be raised on terminal failure
- original_timeout_val: the original timeout value for the retry (in seconds),
to be passed to the exception factory for building an error message
Returns:
- the sleep value chosen before the next attempt
"""
error_list.append(exc)
if not predicate_fn(exc):
final_exc, source_exc = exc_factory_fn(
error_list,
RetryFailureReason.NON_RETRYABLE_ERROR,
original_timeout,
)
raise final_exc from source_exc
if on_error_fn is not None:
on_error_fn(exc)
# next_sleep is fetched after the on_error callback, to allow clients
# to update sleep_iterator values dynamically in response to errors
try:
next_sleep = next(sleep_iterator)
except StopIteration:
raise ValueError("Sleep generator stopped yielding sleep values.") from exc
if deadline is not None and time.monotonic() + next_sleep > deadline:
final_exc, source_exc = exc_factory_fn(
error_list,
RetryFailureReason.TIMEOUT,
original_timeout,
)
raise final_exc from source_exc
_LOGGER.debug(
"Retrying due to {}, sleeping {:.1f}s ...".format(error_list[-1], next_sleep)
)
return next_sleep
class _BaseRetry(object):
"""
Base class for retry configuration objects. This class is intended to capture retry
and backoff configuration that is common to both synchronous and asynchronous retries,
for both unary and streaming RPCs. It is not intended to be instantiated directly,
but rather to be subclassed by the various retry configuration classes.
"""
def __init__(
self,
predicate: Callable[[Exception], bool] = if_transient_error,
initial: float = _DEFAULT_INITIAL_DELAY,
maximum: float = _DEFAULT_MAXIMUM_DELAY,
multiplier: float = _DEFAULT_DELAY_MULTIPLIER,
timeout: Optional[float] = _DEFAULT_DEADLINE,
on_error: Optional[Callable[[Exception], Any]] = None,
**kwargs: Any,
) -> None:
self._predicate = predicate
self._initial = initial
self._multiplier = multiplier
self._maximum = maximum
self._timeout = kwargs.get("deadline", timeout)
self._deadline = self._timeout
self._on_error = on_error
def __call__(self, *args, **kwargs) -> Any:
raise NotImplementedError("Not implemented in base class")
@property
def deadline(self) -> float | None:
"""
DEPRECATED: use ``timeout`` instead. Refer to the ``Retry`` class
documentation for details.
"""
return self._timeout
@property
def timeout(self) -> float | None:
return self._timeout
def with_deadline(self, deadline: float | None) -> Self:
"""Return a copy of this retry with the given timeout.
DEPRECATED: use :meth:`with_timeout` instead. Refer to the ``Retry`` class
documentation for details.
Args:
deadline (float|None): How long to keep retrying, in seconds. If None,
no timeout is enforced.
Returns:
Retry: A new retry instance with the given timeout.
"""
return self.with_timeout(deadline)
def with_timeout(self, timeout: float | None) -> Self:
"""Return a copy of this retry with the given timeout.
Args:
timeout (float): How long to keep retrying, in seconds. If None,
no timeout will be enforced.
Returns:
Retry: A new retry instance with the given timeout.
"""
return type(self)(
predicate=self._predicate,
initial=self._initial,
maximum=self._maximum,
multiplier=self._multiplier,
timeout=timeout,
on_error=self._on_error,
)
def with_predicate(self, predicate: Callable[[Exception], bool]) -> Self:
"""Return a copy of this retry with the given predicate.
Args:
predicate (Callable[Exception]): A callable that should return
``True`` if the given exception is retryable.
Returns:
Retry: A new retry instance with the given predicate.
"""
return type(self)(
predicate=predicate,
initial=self._initial,
maximum=self._maximum,
multiplier=self._multiplier,
timeout=self._timeout,
on_error=self._on_error,
)
def with_delay(
self,
initial: Optional[float] = None,
maximum: Optional[float] = None,
multiplier: Optional[float] = None,
) -> Self:
"""Return a copy of this retry with the given delay options.
Args:
initial (float): The minimum amount of time to delay (in seconds). This must
be greater than 0. If None, the current value is used.
maximum (float): The maximum amount of time to delay (in seconds). If None, the
current value is used.
multiplier (float): The multiplier applied to the delay. If None, the current
value is used.
Returns:
Retry: A new retry instance with the given delay options.
"""
return type(self)(
predicate=self._predicate,
initial=initial if initial is not None else self._initial,
maximum=maximum if maximum is not None else self._maximum,
multiplier=multiplier if multiplier is not None else self._multiplier,
timeout=self._timeout,
on_error=self._on_error,
)
def __str__(self) -> str:
return (
"<{} predicate={}, initial={:.1f}, maximum={:.1f}, "
"multiplier={:.1f}, timeout={}, on_error={}>".format(
type(self).__name__,
self._predicate,
self._initial,
self._maximum,
self._multiplier,
self._timeout, # timeout can be None, thus no {:.1f}
self._on_error,
)
)