Skip to content

Commit 8f0b997

Browse files
feat: add rate limit logging to HttpClient and WaitTimeFromHeaderBackoffStrategy
Co-Authored-By: alexandre@airbyte.io <alexandre@airbyte.io>
1 parent 57c70ba commit 8f0b997

File tree

2 files changed

+25
-0
lines changed

2 files changed

+25
-0
lines changed

airbyte_cdk/sources/declarative/requesters/error_handlers/backoff_strategies/wait_time_from_header_backoff_strategy.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
33
#
44

5+
import logging
56
import re
67
from dataclasses import InitVar, dataclass
78
from typing import Any, Mapping, Optional, Union
@@ -19,6 +20,8 @@
1920
from airbyte_cdk.sources.types import Config
2021
from airbyte_cdk.utils import AirbyteTracedException
2122

23+
logger = logging.getLogger("airbyte")
24+
2225

2326
@dataclass
2427
class WaitTimeFromHeaderBackoffStrategy(BackoffStrategy):
@@ -57,6 +60,13 @@ def backoff_time(
5760
header_value = None
5861
if isinstance(response_or_exception, requests.Response):
5962
header_value = get_numeric_value_from_header(response_or_exception, header, regex)
63+
if header_value is not None:
64+
logger.info(
65+
"Rate limit header '%s' detected with value: %s (status code: %d)",
66+
header,
67+
header_value,
68+
response_or_exception.status_code,
69+
)
6070
if (
6171
self.max_waiting_time_in_seconds
6272
and header_value

airbyte_cdk/sources/streams/http/http_client.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -440,6 +440,12 @@ def _handle_error_resolution(
440440

441441
# Emit stream status RUNNING with the reason RATE_LIMITED to log that the rate limit has been reached
442442
if error_resolution.response_action == ResponseAction.RATE_LIMITED:
443+
self._logger.info(
444+
"Rate limited: emitting RATE_LIMITED stream status for stream '%s' (status code: %s, url: %s)",
445+
self._name,
446+
response.status_code if response is not None else "N/A",
447+
request.url,
448+
)
443449
# TODO: Update to handle with message repository when concurrent message repository is ready
444450
reasons = [AirbyteStreamStatusReason(type=AirbyteStreamStatusReasonType.RATE_LIMITED)]
445451
message = orjson.dumps(
@@ -524,6 +530,15 @@ def _handle_error_resolution(
524530
if backoff_time:
525531
user_defined_backoff_time = backoff_time
526532
break
533+
if user_defined_backoff_time is not None:
534+
self._logger.info(
535+
"Rate limit backoff: waiting %.2f seconds before retry (attempt %d, status code: %s, action: %s, url: %s)",
536+
user_defined_backoff_time,
537+
self._request_attempt_count[request],
538+
response.status_code if response is not None else "N/A",
539+
error_resolution.response_action.value,
540+
request.url,
541+
)
527542
error_message = (
528543
error_resolution.error_message
529544
or f"Request to {request.url} failed with failure type {error_resolution.failure_type}, response action {error_resolution.response_action}."

0 commit comments

Comments
 (0)