Skip to content

Commit 13c6f47

Browse files
fix(cdk): improve rate limiting error messages to suggest reducing concurrency or workers
Co-Authored-By: sophie.cui@airbyte.io <sophie.cui@airbyte.io>
1 parent 3e65ad5 commit 13c6f47

5 files changed

Lines changed: 17 additions & 7 deletions

File tree

airbyte_cdk/sources/streams/http/error_handlers/default_error_mapping.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@
6161
429: ErrorResolution(
6262
response_action=ResponseAction.RATE_LIMITED,
6363
failure_type=FailureType.transient_error,
64-
error_message="HTTP Status Code: 429. Error: Too many requests.",
64+
error_message="Rate limit exceeded. Try decreasing concurrency or the number of workers to stay within API rate limits.",
6565
),
6666
500: ErrorResolution(
6767
response_action=ResponseAction.RETRY,

airbyte_cdk/sources/streams/http/http_client.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -293,10 +293,19 @@ def _send_with_retry(
293293

294294
return response
295295
except BaseBackoffException as e:
296-
self._logger.error(f"Retries exhausted with backoff exception.", exc_info=True)
296+
if isinstance(e, RateLimitBackoffException):
297+
self._logger.error("Rate limit retries exhausted.", exc_info=True)
298+
raise AirbyteTracedException(
299+
internal_message=f"Rate limit retries exhausted. Exception: {e}",
300+
message="Rate limit exceeded and retries exhausted. Try decreasing concurrency or the number of workers to stay within API rate limits.",
301+
failure_type=e.failure_type or FailureType.transient_error,
302+
exception=e,
303+
stream_descriptor=StreamDescriptor(name=self._name),
304+
)
305+
self._logger.error("Retries exhausted with backoff exception.", exc_info=True)
297306
raise AirbyteTracedException(
298307
internal_message=f"Exhausted available request attempts. Exception: {e}",
299-
message=f"Exhausted available request attempts. Please see logs for more details. Exception: {e}",
308+
message=f"Exhausted available request attempts. Exception: {e}",
300309
failure_type=e.failure_type or FailureType.system_error,
301310
exception=e,
302311
stream_descriptor=StreamDescriptor(name=self._name),

airbyte_cdk/sources/streams/http/rate_limiting.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,8 @@ def log_retry_attempt(details: Mapping[str, Any]) -> None:
146146
f"Status code: {exc.response.status_code!r}, Response Content: {exc.response.content!r}"
147147
)
148148
logger.info(
149-
f"Caught retryable error '{str(exc)}' after {details['tries']} tries. Waiting {details['wait']} seconds then retrying..."
149+
f"Rate limit hit after {details['tries']} tries. Waiting {details['wait']} seconds then retrying. "
150+
f"Try decreasing concurrency or the number of workers to stay within API rate limits."
150151
)
151152

152153
return backoff.on_exception( # type: ignore # Decorator function returns a function with a different signature than the input function, so mypy can't infer the type of the returned function

unit_tests/sources/declarative/requesters/error_handlers/test_http_response_filter.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@
5959
ErrorResolution(
6060
response_action=ResponseAction.RETRY,
6161
failure_type=FailureType.transient_error,
62-
error_message="HTTP Status Code: 429. Error: Too many requests.",
62+
error_message="Rate limit exceeded. Try decreasing concurrency or the number of workers to stay within API rate limits.",
6363
),
6464
id="test_http_code_matches_retry_action",
6565
),

unit_tests/sources/streams/http/test_http.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,7 @@ def get_error_handler(self) -> Optional[ErrorHandler]:
222222
send_mock = mocker.patch.object(requests.Session, "send", return_value=req)
223223

224224
with pytest.raises(
225-
AirbyteTracedException, match="Exception: HTTP Status Code: 429. Error: Too many requests."
225+
AirbyteTracedException, match="Rate limit exceeded. Try decreasing concurrency or the number of workers to stay within API rate limits."
226226
):
227227
list(stream.read_records(SyncMode.full_refresh))
228228
if retries <= 0:
@@ -316,7 +316,7 @@ def test_raise_on_http_errors_off_429(mocker):
316316
mocker.patch.object(requests.Session, "send", return_value=req)
317317
with pytest.raises(
318318
AirbyteTracedException,
319-
match="Exhausted available request attempts. Please see logs for more details. Exception: HTTP Status Code: 429. Error: Too many requests.",
319+
match="Rate limit exceeded. Try decreasing concurrency or the number of workers to stay within API rate limits.",
320320
):
321321
stream.exit_on_rate_limit = True
322322
list(stream.read_records(SyncMode.full_refresh))

0 commit comments

Comments
 (0)