Skip to content

Commit ed42701

Browse files
devin-ai-integration[bot]bot_apk
andcommitted
refactor(http-client): replace nested backoff decorators with explicit retry loop
- Add RetryRequestException to exceptions.py extending BaseBackoffException - Rewrite _send_with_retry as explicit while-loop with _compute_backoff helper - Simplify _handle_error_resolution to raise single RetryRequestException - Delete 3 internal backoff handlers from rate_limiting.py (keep default_backoff_handler) - Remove unused imports from http_client.py - Update test_http_client.py to use RetryRequestException - Preserve backward compatibility: all old exception classes remain importable Co-Authored-By: bot_apk <apk@cognition.ai>
1 parent 4aaafcf commit ed42701

4 files changed

Lines changed: 108 additions & 188 deletions

File tree

airbyte_cdk/sources/streams/http/exceptions.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,3 +69,25 @@ class DefaultBackoffException(BaseBackoffException):
6969

7070
class RateLimitBackoffException(BaseBackoffException):
7171
pass
72+
73+
74+
class RetryRequestException(BaseBackoffException):
75+
"""Unified retry signal raised by HttpClient when a request should be retried."""
76+
77+
def __init__(
78+
self,
79+
request: requests.PreparedRequest,
80+
response: Optional[Union[requests.Response, Exception]],
81+
error_message: str = "",
82+
failure_type: Optional[FailureType] = None,
83+
backoff_time: Optional[float] = None,
84+
retry_endlessly: bool = False,
85+
):
86+
self.backoff_time = backoff_time
87+
self.retry_endlessly = retry_endlessly
88+
super().__init__(
89+
request=request,
90+
response=response,
91+
error_message=error_message,
92+
failure_type=failure_type,
93+
)

airbyte_cdk/sources/streams/http/http_client.py

Lines changed: 65 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
import logging
66
import os
7+
import time
78
import urllib
89
from pathlib import Path
910
from typing import Any, Callable, Dict, List, Mapping, Optional, Tuple, Union
@@ -36,20 +37,12 @@
3637
ResponseAction,
3738
)
3839
from airbyte_cdk.sources.streams.http.exceptions import (
39-
BaseBackoffException,
40-
DefaultBackoffException,
41-
RateLimitBackoffException,
4240
RequestBodyException,
43-
UserDefinedBackoffException,
41+
RetryRequestException,
4442
)
4543
from airbyte_cdk.sources.streams.http.pagination_reset_exception import (
4644
PaginationResetRequiredException,
4745
)
48-
from airbyte_cdk.sources.streams.http.rate_limiting import (
49-
http_client_default_backoff_handler,
50-
rate_limit_default_backoff_handler,
51-
user_defined_backoff_handler,
52-
)
5346
from airbyte_cdk.sources.utils.types import JsonType
5447
from airbyte_cdk.utils.airbyte_secrets_utils import filter_secrets
5548
from airbyte_cdk.utils.constants import ENV_REQUEST_CACHE_PATH
@@ -258,54 +251,76 @@ def _max_time(self) -> int:
258251
else self._DEFAULT_MAX_TIME
259252
)
260253

254+
def _compute_backoff(self, exc: RetryRequestException, attempt: int) -> float:
255+
"""Compute the backoff duration in seconds for a retry attempt.
256+
257+
If the exception carries a user-defined `backoff_time`, that value plus
258+
one second is returned (preserving the legacy +1 s behaviour). Otherwise
259+
an exponential back-off with base 2 and no jitter is used:
260+
``2 ** attempt`` seconds.
261+
"""
262+
if exc.backoff_time is not None:
263+
return exc.backoff_time + 1 # extra second to cover fractions
264+
return float(2**attempt)
265+
261266
def _send_with_retry(
262267
self,
263268
request: requests.PreparedRequest,
264269
request_kwargs: Mapping[str, Any],
265270
log_formatter: Optional[Callable[[requests.Response], Any]] = None,
266271
exit_on_rate_limit: Optional[bool] = False,
267272
) -> requests.Response:
268-
"""
269-
Sends a request with retry logic.
270-
271-
Args:
272-
request (requests.PreparedRequest): The prepared HTTP request to send.
273-
request_kwargs (Mapping[str, Any]): Additional keyword arguments for the request.
273+
"""Send a request with an explicit retry loop.
274274
275-
Returns:
276-
requests.Response: The HTTP response received from the server after retries.
275+
Replaces the previous three-layer ``backoff`` decorator chain with a
276+
single ``while True`` loop that catches `RetryRequestException`,
277+
computes the appropriate back-off, and sleeps before retrying.
277278
"""
278-
279-
max_retries = self._max_retries
280-
max_tries = max(0, max_retries) + 1
279+
max_tries = max(0, self._max_retries) + 1
281280
max_time = self._max_time
281+
attempt = 0
282+
start_time = time.monotonic()
282283

283-
user_backoff_handler = user_defined_backoff_handler(max_tries=max_tries, max_time=max_time)(
284-
self._send
285-
)
286-
rate_limit_backoff_handler = rate_limit_default_backoff_handler(max_tries=max_tries)
287-
backoff_handler = http_client_default_backoff_handler(
288-
max_tries=max_tries, max_time=max_time
289-
)
290-
# backoff handlers wrap _send, so it will always return a response -- except when all retries are exhausted
291-
try:
292-
response = backoff_handler(rate_limit_backoff_handler(user_backoff_handler))(
293-
request,
294-
request_kwargs,
295-
log_formatter=log_formatter,
296-
exit_on_rate_limit=exit_on_rate_limit,
297-
) # type: ignore # mypy can't infer that backoff_handler wraps _send
298-
299-
return response
300-
except BaseBackoffException as e:
301-
self._logger.error(f"Retries exhausted with backoff exception.", exc_info=True)
302-
raise AirbyteTracedException(
303-
internal_message=f"Exhausted available request attempts. Exception: {e}",
304-
message=f"Exhausted available request attempts. Please see logs for more details. Exception: {e}",
305-
failure_type=e.failure_type or FailureType.system_error,
306-
exception=e,
307-
stream_descriptor=StreamDescriptor(name=self._name),
308-
)
284+
while True:
285+
try:
286+
return self._send(
287+
request,
288+
request_kwargs,
289+
log_formatter=log_formatter,
290+
exit_on_rate_limit=exit_on_rate_limit,
291+
)
292+
except RetryRequestException as exc:
293+
attempt += 1
294+
elapsed = time.monotonic() - start_time
295+
296+
# Determine whether we have exhausted retries.
297+
budget_exhausted = False
298+
if attempt >= max_tries:
299+
budget_exhausted = True
300+
elif elapsed >= max_time:
301+
budget_exhausted = True
302+
303+
if budget_exhausted:
304+
self._logger.error("Retries exhausted with backoff exception.", exc_info=True)
305+
raise AirbyteTracedException(
306+
internal_message=f"Exhausted available request attempts. Exception: {exc}",
307+
message=f"Exhausted available request attempts. Please see logs for more details. Exception: {exc}",
308+
failure_type=exc.failure_type or FailureType.system_error,
309+
exception=exc,
310+
stream_descriptor=StreamDescriptor(name=self._name),
311+
)
312+
313+
backoff_seconds = self._compute_backoff(exc, attempt)
314+
315+
if exc.response is not None and isinstance(exc.response, requests.Response):
316+
self._logger.info(
317+
f"Status code: {exc.response.status_code!r}, Response Content: {exc.response.content!r}"
318+
)
319+
self._logger.info(
320+
f"Caught retryable error '{exc!s}' after {attempt} tries. "
321+
f"Waiting {backoff_seconds} seconds then retrying..."
322+
)
323+
time.sleep(backoff_seconds)
309324

310325
def _send(
311326
self,
@@ -503,7 +518,7 @@ def _handle_error_resolution(
503518
ResponseAction.RATE_LIMITED,
504519
ResponseAction.REFRESH_TOKEN_THEN_RETRY,
505520
):
506-
user_defined_backoff_time = None
521+
user_defined_backoff_time: Optional[float] = None
507522
for backoff_strategy in self._backoff_strategies:
508523
backoff_time = backoff_strategy.backoff_time(
509524
response_or_exception=response if response is not None else exc,
@@ -522,28 +537,13 @@ def _handle_error_resolution(
522537
and not exit_on_rate_limit
523538
)
524539

525-
if user_defined_backoff_time:
526-
raise UserDefinedBackoffException(
527-
backoff=user_defined_backoff_time,
528-
request=request,
529-
response=(response if response is not None else exc),
530-
error_message=error_message,
531-
failure_type=error_resolution.failure_type,
532-
)
533-
534-
elif retry_endlessly:
535-
raise RateLimitBackoffException(
536-
request=request,
537-
response=(response if response is not None else exc),
538-
error_message=error_message,
539-
failure_type=error_resolution.failure_type,
540-
)
541-
542-
raise DefaultBackoffException(
540+
raise RetryRequestException(
543541
request=request,
544542
response=(response if response is not None else exc),
545543
error_message=error_message,
546544
failure_type=error_resolution.failure_type,
545+
backoff_time=user_defined_backoff_time,
546+
retry_endlessly=retry_endlessly,
547547
)
548548

549549
elif response:

airbyte_cdk/sources/streams/http/rate_limiting.py

Lines changed: 1 addition & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,12 @@
44

55
import logging
66
import sys
7-
import time
87
from typing import Any, Callable, Mapping, Optional
98

109
import backoff
1110
from requests import PreparedRequest, RequestException, Response, codes, exceptions
1211

13-
from .exceptions import (
14-
DefaultBackoffException,
15-
RateLimitBackoffException,
16-
UserDefinedBackoffException,
17-
)
12+
from .exceptions import DefaultBackoffException
1813

1914
TRANSIENT_EXCEPTIONS = (
2015
DefaultBackoffException,
@@ -69,98 +64,3 @@ def should_give_up(exc: Exception) -> bool:
6964
factor=factor,
7065
**kwargs,
7166
)
72-
73-
74-
def http_client_default_backoff_handler(
75-
max_tries: Optional[int], max_time: Optional[int] = None, **kwargs: Any
76-
) -> Callable[[SendRequestCallableType], SendRequestCallableType]:
77-
def log_retry_attempt(details: Mapping[str, Any]) -> None:
78-
_, exc, _ = sys.exc_info()
79-
if isinstance(exc, RequestException) and exc.response:
80-
logger.info(
81-
f"Status code: {exc.response.status_code!r}, Response Content: {exc.response.content!r}"
82-
)
83-
logger.info(
84-
f"Caught retryable error '{str(exc)}' after {details['tries']} tries. Waiting {details['wait']} seconds then retrying..."
85-
)
86-
87-
def should_give_up(exc: Exception) -> bool:
88-
# If made it here, the ResponseAction was RETRY and therefore should not give up
89-
return False
90-
91-
return backoff.on_exception( # type: ignore # Decorator function returns a function with a different signature than the input function, so mypy can't infer the type of the returned function
92-
backoff.expo,
93-
TRANSIENT_EXCEPTIONS,
94-
jitter=None,
95-
on_backoff=log_retry_attempt,
96-
giveup=should_give_up,
97-
max_tries=max_tries,
98-
max_time=max_time,
99-
**kwargs,
100-
)
101-
102-
103-
def user_defined_backoff_handler(
104-
max_tries: Optional[int], max_time: Optional[int] = None, **kwargs: Any
105-
) -> Callable[[SendRequestCallableType], SendRequestCallableType]:
106-
def sleep_on_ratelimit(details: Mapping[str, Any]) -> None:
107-
_, exc, _ = sys.exc_info()
108-
if isinstance(exc, UserDefinedBackoffException):
109-
retry_after = exc.backoff
110-
sleep_time = retry_after + 1 # extra second to cover any fractions of second
111-
if exc.response is not None:
112-
logger.info(
113-
f"UserDefinedBackoffException: Rate limit exceeded (HTTP {exc.response.status_code}). Retrying in {sleep_time} seconds."
114-
)
115-
else:
116-
logger.info(
117-
f"UserDefinedBackoffException: Rate limit exceeded. Retrying in {sleep_time} seconds."
118-
)
119-
time.sleep(sleep_time)
120-
121-
def log_give_up(details: Mapping[str, Any]) -> None:
122-
_, exc, _ = sys.exc_info()
123-
if isinstance(exc, RequestException):
124-
logger.error(
125-
f"Max retry limit reached after {details['elapsed']:.1f}s. Request: {exc.request}, Response: {exc.response}"
126-
)
127-
else:
128-
logger.error("Max retry limit reached for unknown request and response")
129-
130-
# Suppress the backoff library's default log that misleadingly reports interval (0s) instead of actual sleep time
131-
kwargs.pop("logger", None)
132-
133-
return backoff.on_exception( # type: ignore # Decorator function returns a function with a different signature than the input function, so mypy can't infer the type of the returned function
134-
backoff.constant,
135-
UserDefinedBackoffException,
136-
interval=0, # skip waiting, we'll wait in on_backoff handler
137-
on_backoff=sleep_on_ratelimit,
138-
on_giveup=log_give_up,
139-
jitter=None,
140-
max_tries=max_tries,
141-
max_time=max_time,
142-
logger=None,
143-
**kwargs,
144-
)
145-
146-
147-
def rate_limit_default_backoff_handler(
148-
**kwargs: Any,
149-
) -> Callable[[SendRequestCallableType], SendRequestCallableType]:
150-
def log_retry_attempt(details: Mapping[str, Any]) -> None:
151-
_, exc, _ = sys.exc_info()
152-
if isinstance(exc, RequestException) and exc.response:
153-
logger.info(
154-
f"Status code: {exc.response.status_code!r}, Response Content: {exc.response.content!r}"
155-
)
156-
logger.info(
157-
f"Caught retryable error '{str(exc)}' after {details['tries']} tries. Waiting {details['wait']} seconds then retrying..."
158-
)
159-
160-
return backoff.on_exception( # type: ignore # Decorator function returns a function with a different signature than the input function, so mypy can't infer the type of the returned function
161-
backoff.expo,
162-
RateLimitBackoffException,
163-
jitter=None,
164-
on_backoff=log_retry_attempt,
165-
**kwargs,
166-
)

0 commit comments

Comments
 (0)