Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions airbyte_cdk/sources/streams/http/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,25 @@ class DefaultBackoffException(BaseBackoffException):

class RateLimitBackoffException(BaseBackoffException):
pass


class RetryRequestException(BaseBackoffException):
"""Unified retry signal raised by HttpClient when a request should be retried."""

def __init__(
self,
request: requests.PreparedRequest,
response: Optional[Union[requests.Response, Exception]],
error_message: str = "",
failure_type: Optional[FailureType] = None,
backoff_time: Optional[float] = None,
retry_endlessly: bool = False,
):
self.backoff_time = backoff_time
self.retry_endlessly = retry_endlessly
super().__init__(
request=request,
response=response,
error_message=error_message,
failure_type=failure_type,
)
130 changes: 65 additions & 65 deletions airbyte_cdk/sources/streams/http/http_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import logging
import os
import time
import urllib
from pathlib import Path
from typing import Any, Callable, Dict, List, Mapping, Optional, Tuple, Union
Expand Down Expand Up @@ -36,20 +37,12 @@
ResponseAction,
)
from airbyte_cdk.sources.streams.http.exceptions import (
BaseBackoffException,
DefaultBackoffException,
RateLimitBackoffException,
RequestBodyException,
UserDefinedBackoffException,
RetryRequestException,
)
from airbyte_cdk.sources.streams.http.pagination_reset_exception import (
PaginationResetRequiredException,
)
from airbyte_cdk.sources.streams.http.rate_limiting import (
http_client_default_backoff_handler,
rate_limit_default_backoff_handler,
user_defined_backoff_handler,
)
from airbyte_cdk.sources.utils.types import JsonType
from airbyte_cdk.utils.airbyte_secrets_utils import filter_secrets
from airbyte_cdk.utils.constants import ENV_REQUEST_CACHE_PATH
Expand Down Expand Up @@ -258,54 +251,76 @@ def _max_time(self) -> int:
else self._DEFAULT_MAX_TIME
)

def _compute_backoff(self, exc: RetryRequestException, attempt: int) -> float:
"""Compute the backoff duration in seconds for a retry attempt.

If the exception carries a user-defined `backoff_time`, that value plus
one second is returned (preserving the legacy +1 s behaviour). Otherwise
an exponential back-off with base 2 and no jitter is used:
``2 ** attempt`` seconds.
"""
if exc.backoff_time is not None:
return exc.backoff_time + 1 # extra second to cover fractions
return float(2**attempt)

def _send_with_retry(
self,
request: requests.PreparedRequest,
request_kwargs: Mapping[str, Any],
log_formatter: Optional[Callable[[requests.Response], Any]] = None,
exit_on_rate_limit: Optional[bool] = False,
) -> requests.Response:
"""
Sends a request with retry logic.

Args:
request (requests.PreparedRequest): The prepared HTTP request to send.
request_kwargs (Mapping[str, Any]): Additional keyword arguments for the request.
"""Send a request with an explicit retry loop.

Returns:
requests.Response: The HTTP response received from the server after retries.
Replaces the previous three-layer ``backoff`` decorator chain with a
single ``while True`` loop that catches `RetryRequestException`,
computes the appropriate back-off, and sleeps before retrying.
"""

max_retries = self._max_retries
max_tries = max(0, max_retries) + 1
max_tries = max(0, self._max_retries) + 1
max_time = self._max_time
attempt = 0
start_time = time.monotonic()

user_backoff_handler = user_defined_backoff_handler(max_tries=max_tries, max_time=max_time)(
self._send
)
rate_limit_backoff_handler = rate_limit_default_backoff_handler(max_tries=max_tries)
backoff_handler = http_client_default_backoff_handler(
max_tries=max_tries, max_time=max_time
)
# backoff handlers wrap _send, so it will always return a response -- except when all retries are exhausted
try:
response = backoff_handler(rate_limit_backoff_handler(user_backoff_handler))(
request,
request_kwargs,
log_formatter=log_formatter,
exit_on_rate_limit=exit_on_rate_limit,
) # type: ignore # mypy can't infer that backoff_handler wraps _send

return response
except BaseBackoffException as e:
self._logger.error(f"Retries exhausted with backoff exception.", exc_info=True)
raise AirbyteTracedException(
internal_message=f"Exhausted available request attempts. Exception: {e}",
message=f"Exhausted available request attempts. Please see logs for more details. Exception: {e}",
failure_type=e.failure_type or FailureType.system_error,
exception=e,
stream_descriptor=StreamDescriptor(name=self._name),
)
while True:
try:
return self._send(
request,
request_kwargs,
log_formatter=log_formatter,
exit_on_rate_limit=exit_on_rate_limit,
)
except RetryRequestException as exc:
attempt += 1
elapsed = time.monotonic() - start_time

# Determine whether we have exhausted retries.
budget_exhausted = False
if attempt >= max_tries:
budget_exhausted = True
elif elapsed >= max_time:
budget_exhausted = True

if budget_exhausted:
self._logger.error("Retries exhausted with backoff exception.", exc_info=True)
raise AirbyteTracedException(
internal_message=f"Exhausted available request attempts. Exception: {exc}",
message=f"Exhausted available request attempts. Please see logs for more details. Exception: {exc}",
failure_type=exc.failure_type or FailureType.system_error,
exception=exc,
stream_descriptor=StreamDescriptor(name=self._name),
)

backoff_seconds = self._compute_backoff(exc, attempt)

if exc.response is not None and isinstance(exc.response, requests.Response):
self._logger.info(
f"Status code: {exc.response.status_code!r}, Response Content: {exc.response.content!r}"
)
self._logger.info(
f"Caught retryable error '{exc!s}' after {attempt} tries. "
f"Waiting {backoff_seconds} seconds then retrying..."
)
time.sleep(backoff_seconds)

def _send(
self,
Expand Down Expand Up @@ -503,7 +518,7 @@ def _handle_error_resolution(
ResponseAction.RATE_LIMITED,
ResponseAction.REFRESH_TOKEN_THEN_RETRY,
):
user_defined_backoff_time = None
user_defined_backoff_time: Optional[float] = None
for backoff_strategy in self._backoff_strategies:
backoff_time = backoff_strategy.backoff_time(
response_or_exception=response if response is not None else exc,
Expand All @@ -522,28 +537,13 @@ def _handle_error_resolution(
and not exit_on_rate_limit
)

if user_defined_backoff_time:
raise UserDefinedBackoffException(
backoff=user_defined_backoff_time,
request=request,
response=(response if response is not None else exc),
error_message=error_message,
failure_type=error_resolution.failure_type,
)

elif retry_endlessly:
raise RateLimitBackoffException(
request=request,
response=(response if response is not None else exc),
error_message=error_message,
failure_type=error_resolution.failure_type,
)

raise DefaultBackoffException(
raise RetryRequestException(
request=request,
response=(response if response is not None else exc),
error_message=error_message,
failure_type=error_resolution.failure_type,
backoff_time=user_defined_backoff_time,
retry_endlessly=retry_endlessly,
)

elif response:
Expand Down
102 changes: 1 addition & 101 deletions airbyte_cdk/sources/streams/http/rate_limiting.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,12 @@

import logging
import sys
import time
from typing import Any, Callable, Mapping, Optional

import backoff
from requests import PreparedRequest, RequestException, Response, codes, exceptions

from .exceptions import (
DefaultBackoffException,
RateLimitBackoffException,
UserDefinedBackoffException,
)
from .exceptions import DefaultBackoffException

TRANSIENT_EXCEPTIONS = (
DefaultBackoffException,
Expand Down Expand Up @@ -69,98 +64,3 @@ def should_give_up(exc: Exception) -> bool:
factor=factor,
**kwargs,
)


def http_client_default_backoff_handler(
max_tries: Optional[int], max_time: Optional[int] = None, **kwargs: Any
) -> Callable[[SendRequestCallableType], SendRequestCallableType]:
def log_retry_attempt(details: Mapping[str, Any]) -> None:
_, exc, _ = sys.exc_info()
if isinstance(exc, RequestException) and exc.response:
logger.info(
f"Status code: {exc.response.status_code!r}, Response Content: {exc.response.content!r}"
)
logger.info(
f"Caught retryable error '{str(exc)}' after {details['tries']} tries. Waiting {details['wait']} seconds then retrying..."
)

def should_give_up(exc: Exception) -> bool:
# If made it here, the ResponseAction was RETRY and therefore should not give up
return False

return backoff.on_exception( # type: ignore # Decorator function returns a function with a different signature than the input function, so mypy can't infer the type of the returned function
backoff.expo,
TRANSIENT_EXCEPTIONS,
jitter=None,
on_backoff=log_retry_attempt,
giveup=should_give_up,
max_tries=max_tries,
max_time=max_time,
**kwargs,
)


def user_defined_backoff_handler(
max_tries: Optional[int], max_time: Optional[int] = None, **kwargs: Any
) -> Callable[[SendRequestCallableType], SendRequestCallableType]:
def sleep_on_ratelimit(details: Mapping[str, Any]) -> None:
_, exc, _ = sys.exc_info()
if isinstance(exc, UserDefinedBackoffException):
retry_after = exc.backoff
sleep_time = retry_after + 1 # extra second to cover any fractions of second
if exc.response is not None:
logger.info(
f"UserDefinedBackoffException: Rate limit exceeded (HTTP {exc.response.status_code}). Retrying in {sleep_time} seconds."
)
else:
logger.info(
f"UserDefinedBackoffException: Rate limit exceeded. Retrying in {sleep_time} seconds."
)
time.sleep(sleep_time)

def log_give_up(details: Mapping[str, Any]) -> None:
_, exc, _ = sys.exc_info()
if isinstance(exc, RequestException):
logger.error(
f"Max retry limit reached after {details['elapsed']:.1f}s. Request: {exc.request}, Response: {exc.response}"
)
else:
logger.error("Max retry limit reached for unknown request and response")

# Suppress the backoff library's default log that misleadingly reports interval (0s) instead of actual sleep time
kwargs.pop("logger", None)

return backoff.on_exception( # type: ignore # Decorator function returns a function with a different signature than the input function, so mypy can't infer the type of the returned function
backoff.constant,
UserDefinedBackoffException,
interval=0, # skip waiting, we'll wait in on_backoff handler
on_backoff=sleep_on_ratelimit,
on_giveup=log_give_up,
jitter=None,
max_tries=max_tries,
max_time=max_time,
logger=None,
**kwargs,
)


def rate_limit_default_backoff_handler(
**kwargs: Any,
) -> Callable[[SendRequestCallableType], SendRequestCallableType]:
def log_retry_attempt(details: Mapping[str, Any]) -> None:
_, exc, _ = sys.exc_info()
if isinstance(exc, RequestException) and exc.response:
logger.info(
f"Status code: {exc.response.status_code!r}, Response Content: {exc.response.content!r}"
)
logger.info(
f"Caught retryable error '{str(exc)}' after {details['tries']} tries. Waiting {details['wait']} seconds then retrying..."
)

return backoff.on_exception( # type: ignore # Decorator function returns a function with a different signature than the input function, so mypy can't infer the type of the returned function
backoff.expo,
RateLimitBackoffException,
jitter=None,
on_backoff=log_retry_attempt,
**kwargs,
)
Loading
Loading