Skip to content

Commit 3191bd1

Browse files
sophiecuiyclaude
andcommitted
fix: correct backoff timing, wire retry_endlessly, add backoff cap
- Fix exponential backoff sequence: 2^(attempt-1) to match old backoff.expo (1, 2, 4, 8s) instead of 2^attempt (2, 4, 8, 16s) - Wire retry_endlessly into the retry loop so rate-limited requests without exit_on_rate_limit genuinely retry past the budget - Guard retry_endlessly with user_defined_backoff_time is None to preserve the old mutually-exclusive branching (custom backoff always bounded, endless only for rate limits without a strategy) - Add _MAX_BACKOFF_SECONDS (300s) cap on exponential backoff - Split test_backoff_strategy_endless into two tests that verify bounded vs endless behavior independently Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent ed42701 commit 3191bd1

2 files changed

Lines changed: 58 additions & 23 deletions

File tree

airbyte_cdk/sources/streams/http/http_client.py

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -251,17 +251,20 @@ def _max_time(self) -> int:
251251
else self._DEFAULT_MAX_TIME
252252
)
253253

254+
_MAX_BACKOFF_SECONDS: float = 300 # 5-minute ceiling for exponential backoff
255+
254256
def _compute_backoff(self, exc: RetryRequestException, attempt: int) -> float:
255257
"""Compute the backoff duration in seconds for a retry attempt.
256258
257-
If the exception carries a user-defined `backoff_time`, that value plus
259+
If the exception carries a user-defined ``backoff_time``, that value plus
258260
one second is returned (preserving the legacy +1 s behaviour). Otherwise
259-
an exponential back-off with base 2 and no jitter is used:
260-
``2 ** attempt`` seconds.
261+
an exponential back-off of ``2 ** (attempt - 1)`` seconds is used (matching
262+
the previous ``backoff.expo`` with base=2, factor=1), capped at
263+
``_MAX_BACKOFF_SECONDS``.
261264
"""
262265
if exc.backoff_time is not None:
263266
return exc.backoff_time + 1 # extra second to cover fractions
264-
return float(2**attempt)
267+
return min(float(2 ** (attempt - 1)), self._MAX_BACKOFF_SECONDS)
265268

266269
def _send_with_retry(
267270
self,
@@ -293,12 +296,11 @@ def _send_with_retry(
293296
attempt += 1
294297
elapsed = time.monotonic() - start_time
295298

296-
# Determine whether we have exhausted retries.
297-
budget_exhausted = False
298-
if attempt >= max_tries:
299-
budget_exhausted = True
300-
elif elapsed >= max_time:
301-
budget_exhausted = True
299+
# Rate-limited requests retry indefinitely unless exit_on_rate_limit was set.
300+
# All other retryable errors are bounded by max_tries / max_time.
301+
budget_exhausted = not exc.retry_endlessly and (
302+
attempt >= max_tries or elapsed >= max_time
303+
)
302304

303305
if budget_exhausted:
304306
self._logger.error("Retries exhausted with backoff exception.", exc_info=True)
@@ -532,9 +534,13 @@ def _handle_error_resolution(
532534
or f"Request to {request.url} failed with failure type {error_resolution.failure_type}, response action {error_resolution.response_action}."
533535
)
534536

537+
# Only retry endlessly when rate-limited AND no custom backoff strategy matched.
538+
# When a strategy provides a specific backoff_time, retries are always bounded
539+
# by max_tries/max_time (matching the old mutually-exclusive branching).
535540
retry_endlessly = (
536541
error_resolution.response_action == ResponseAction.RATE_LIMITED
537542
and not exit_on_rate_limit
543+
and user_defined_backoff_time is None
538544
)
539545

540546
raise RetryRequestException(

unit_tests/sources/streams/http/test_http_client.py

Lines changed: 42 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -689,14 +689,9 @@ def backoff_time(self, *args, **kwargs):
689689
assert len(trace_messages) == mocked_send.call_count
690690

691691

692-
@pytest.mark.parametrize(
693-
"exit_on_rate_limit, expected_call_count, expected_error",
694-
[[True, 6, RetryRequestException], [False, 6, RetryRequestException]],
695-
)
696692
@pytest.mark.usefixtures("mock_sleep")
697-
def test_backoff_strategy_endless(
698-
exit_on_rate_limit: bool, expected_call_count: int, expected_error: Exception
699-
):
693+
def test_backoff_strategy_rate_limited_with_exit_on_rate_limit():
694+
"""When exit_on_rate_limit=True, 429 responses exhaust max_tries then raise."""
700695
http_client = HttpClient(
701696
name="test", logger=MagicMock(), error_handler=HttpStatusErrorHandler(logger=MagicMock())
702697
)
@@ -705,18 +700,47 @@ def test_backoff_strategy_endless(
705700
mocked_response.status_code = 429
706701
mocked_response.headers = {}
707702
mocked_response.ok = False
708-
session_send = MagicMock(spec=requests.Session.send)
709-
session_send.return_value = mocked_response
710703

711704
with patch.object(requests.Session, "send", return_value=mocked_response) as mocked_send:
712-
with pytest.raises(AirbyteTracedException) as e:
705+
with pytest.raises(AirbyteTracedException):
713706
http_client.send_request(
714707
http_method="get",
715708
url="https://test_base_url.com/v1/endpoint",
716709
request_kwargs={},
717-
exit_on_rate_limit=exit_on_rate_limit,
710+
exit_on_rate_limit=True,
718711
)
719-
assert mocked_send.call_count == expected_call_count
712+
assert mocked_send.call_count == 6 # 1 initial + 5 retries
713+
714+
715+
@pytest.mark.usefixtures("mock_sleep")
716+
def test_backoff_strategy_rate_limited_retries_endlessly():
717+
"""When exit_on_rate_limit=False, 429 responses retry past max_tries until success."""
718+
http_client = HttpClient(
719+
name="test", logger=MagicMock(), error_handler=HttpStatusErrorHandler(logger=MagicMock())
720+
)
721+
722+
rate_limited_response = MagicMock(spec=requests.Response)
723+
rate_limited_response.status_code = 429
724+
rate_limited_response.headers = {}
725+
rate_limited_response.ok = False
726+
727+
success_response = MagicMock(spec=requests.Response)
728+
success_response.status_code = 200
729+
success_response.headers = {}
730+
success_response.ok = True
731+
732+
# Fail 10 times (well past max_tries=6), then succeed
733+
side_effects = [rate_limited_response] * 10 + [success_response]
734+
735+
with patch.object(requests.Session, "send", side_effect=side_effects) as mocked_send:
736+
_, response = http_client.send_request(
737+
http_method="get",
738+
url="https://test_base_url.com/v1/endpoint",
739+
request_kwargs={},
740+
exit_on_rate_limit=False,
741+
)
742+
assert response.status_code == 200
743+
assert mocked_send.call_count == 11 # 10 rate-limited + 1 success
720744

721745

722746
def test_given_different_headers_then_response_is_not_cached(requests_mock):
@@ -834,7 +858,12 @@ def backoff_time(self, response_or_exception, attempt_count):
834858
)
835859

836860
with pytest.raises(AirbyteTracedException) as e:
837-
http_client.send_request(http_method="get", url="https://airbyte.io/", request_kwargs={})
861+
http_client.send_request(
862+
http_method="get",
863+
url="https://airbyte.io/",
864+
request_kwargs={},
865+
exit_on_rate_limit=True, # ensure rate-limited retries are bounded so the test terminates
866+
)
838867
assert e.value.failure_type == expected_failure_type
839868

840869

0 commit comments

Comments
 (0)