Skip to content

Commit c1722c3

Browse files
devin-ai-integration[bot]bot_apk
andcommitted
fix: resolve FixedWindowCallRatePolicy deadlock on 429 without ratelimit-reset header
Four interacting bugs caused connectors using FixedWindowCallRatePolicy to deadlock when a 429 response arrived from an API that lacks a ratelimit-reset header: 1. next_reset_ts initialized 10 days in the future instead of now + period 2. get_reset_ts_from_response() never fell back to retry-after header 3. _update_current_window() only advanced by one period instead of catching up 4. _do_acquire() had no upper bound on sleep duration Fixes: - Initialize next_reset_ts to now + period in model_to_component_factory.py - Fall back to retry-after header in get_reset_ts_from_response() - Use while loop in _update_current_window() to advance past all elapsed periods - Cap maximum sleep in _do_acquire() to 600 seconds with a warning log Co-Authored-By: bot_apk <apk@cognition.ai>
1 parent 4aaafcf commit c1722c3

3 files changed

Lines changed: 166 additions & 5 deletions

File tree

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4340,11 +4340,10 @@ def create_fixed_window_call_rate_policy(
43404340
for matcher in model.matchers
43414341
]
43424342

4343-
# Set the initial reset timestamp to 10 days from now.
4344-
# This value will be updated by the first request.
4343+
period = parse_duration(model.period)
43454344
return FixedWindowCallRatePolicy(
4346-
next_reset_ts=datetime.datetime.now() + datetime.timedelta(days=10),
4347-
period=parse_duration(model.period),
4345+
next_reset_ts=datetime.datetime.now() + period,
4346+
period=period,
43484347
call_limit=model.call_limit,
43494348
matchers=matchers,
43504349
)

airbyte_cdk/sources/streams/call_rate.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -415,7 +415,7 @@ def update(
415415

416416
def _update_current_window(self) -> None:
417417
now = datetime.datetime.now()
418-
if now > self._next_reset_ts:
418+
while now > self._next_reset_ts:
419419
logger.debug("started new window, %s calls available now", self._call_limit)
420420
self._next_reset_ts = self._next_reset_ts + self._offset
421421
self._calls_num = 0
@@ -646,6 +646,14 @@ def _do_acquire(
646646
f"Policy {policy} reached call limit for endpoint {endpoint} ({exc.rate}). "
647647
f"Sleeping for {time_to_wait} on attempt {attempt}."
648648
)
649+
max_sleep = timedelta(seconds=600)
650+
if time_to_wait > max_sleep:
651+
logger.warning(
652+
"Rate limit wait time %s exceeds maximum of %s. Capping to maximum.",
653+
time_to_wait,
654+
max_sleep,
655+
)
656+
time_to_wait = max_sleep
649657
time.sleep(time_to_wait.total_seconds())
650658
else:
651659
logger.debug(
@@ -700,6 +708,12 @@ def get_reset_ts_from_response(
700708
return datetime.datetime.fromtimestamp(
701709
int(response.headers[self._ratelimit_reset_header])
702710
)
711+
retry_after = response.headers.get("retry-after")
712+
if retry_after is not None:
713+
try:
714+
return datetime.datetime.now() + datetime.timedelta(seconds=int(retry_after))
715+
except (ValueError, OverflowError):
716+
logger.warning("Could not parse retry-after header value: %s", retry_after)
703717
return None
704718

705719
def get_calls_left_from_response(self, response: requests.Response) -> Optional[int]:

unit_tests/sources/streams/test_call_rate.py

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import time
77
from datetime import datetime, timedelta
88
from typing import Any, Iterable, Mapping, Optional
9+
from unittest.mock import patch
910

1011
import pytest
1112
import requests
@@ -16,6 +17,7 @@
1617
APIBudget,
1718
CallRateLimitHit,
1819
FixedWindowCallRatePolicy,
20+
HttpAPIBudget,
1921
HttpRequestMatcher,
2022
HttpRequestRegexMatcher,
2123
MovingWindowCallRatePolicy,
@@ -562,3 +564,149 @@ def test_combined_criteria(self):
562564
assert not matcher(req_bad_path)
563565
assert not matcher(req_bad_param)
564566
assert not matcher(req_bad_header)
567+
568+
569+
def test_fixed_window_update_current_window_advances_past_multiple_periods():
570+
"""_update_current_window should advance past all elapsed periods, not just one."""
571+
now = datetime.now()
572+
# Set next_reset_ts to 5 periods in the past
573+
period = timedelta(minutes=1)
574+
past_reset = now - (period * 5)
575+
policy = FixedWindowCallRatePolicy(
576+
next_reset_ts=past_reset,
577+
period=period,
578+
call_limit=10,
579+
matchers=[],
580+
)
581+
# Trigger window update via try_acquire
582+
policy.try_acquire("request", weight=1)
583+
# After advancing, next_reset_ts should be in the future
584+
assert policy._next_reset_ts > now, (
585+
"next_reset_ts should have advanced past now after multiple elapsed periods"
586+
)
587+
588+
589+
def test_fixed_window_deadlock_scenario_429_without_ratelimit_reset():
590+
"""Reproduce the deadlock: 429 with no ratelimit-reset header should not cause extreme wait times.
591+
592+
The original bug chain:
593+
1. FixedWindowCallRatePolicy created with next_reset_ts = now + 10 days
594+
2. 429 arrives without ratelimit-reset header
595+
3. available_calls set to 0, next_reset_ts unchanged
596+
4. try_acquire raises CallRateLimitHit with time_to_wait ≈ 10 days
597+
"""
598+
now = datetime.now()
599+
period = timedelta(hours=1)
600+
policy = FixedWindowCallRatePolicy(
601+
next_reset_ts=now + period,
602+
period=period,
603+
call_limit=10,
604+
matchers=[],
605+
)
606+
607+
budget = HttpAPIBudget(
608+
policies=[policy],
609+
status_codes_for_ratelimit_hit=[429],
610+
)
611+
612+
# Simulate a 429 response without ratelimit-reset but with retry-after
613+
mock_response = requests.Response()
614+
mock_response.status_code = 429
615+
mock_response.headers["retry-after"] = "60"
616+
# No ratelimit-reset header
617+
618+
mock_request = Request("GET", "http://example.com/api")
619+
budget.update_from_response(mock_request, mock_response)
620+
621+
# After update, available_calls should be 0 and reset_ts should be ~60s from now
622+
# The policy should NOT have a 10-day wait
623+
with pytest.raises(CallRateLimitHit) as exc_info:
624+
policy.try_acquire("request", weight=1)
625+
626+
# The wait time should be roughly 1 hour (the period), not 10 days
627+
assert exc_info.value.time_to_wait < timedelta(hours=2), (
628+
f"Wait time {exc_info.value.time_to_wait} is too large, likely the old 10-day bug"
629+
)
630+
631+
632+
def test_http_api_budget_get_reset_ts_from_retry_after_header():
633+
"""get_reset_ts_from_response should fall back to retry-after when ratelimit-reset is absent."""
634+
budget = HttpAPIBudget(policies=[])
635+
636+
mock_response = requests.Response()
637+
mock_response.status_code = 429
638+
mock_response.headers["retry-after"] = "120"
639+
640+
now = datetime.now()
641+
result = budget.get_reset_ts_from_response(mock_response)
642+
assert result is not None
643+
# Should be approximately 120 seconds from now
644+
expected = now + timedelta(seconds=120)
645+
assert abs((result - expected).total_seconds()) < 5, (
646+
f"Expected reset_ts ~{expected}, got {result}"
647+
)
648+
649+
650+
def test_http_api_budget_get_reset_ts_prefers_ratelimit_reset_over_retry_after():
651+
"""ratelimit-reset header should be preferred over retry-after."""
652+
budget = HttpAPIBudget(policies=[])
653+
654+
mock_response = requests.Response()
655+
mock_response.status_code = 200
656+
future_ts = int((datetime.now() + timedelta(hours=1)).timestamp())
657+
mock_response.headers["ratelimit-reset"] = str(future_ts)
658+
mock_response.headers["retry-after"] = "30"
659+
660+
result = budget.get_reset_ts_from_response(mock_response)
661+
assert result is not None
662+
# Should use ratelimit-reset (1 hour from now), not retry-after (30s)
663+
assert result > datetime.now() + timedelta(minutes=30)
664+
665+
666+
def test_http_api_budget_get_reset_ts_invalid_retry_after():
667+
"""Invalid retry-after header value should return None gracefully."""
668+
budget = HttpAPIBudget(policies=[])
669+
670+
mock_response = requests.Response()
671+
mock_response.status_code = 429
672+
mock_response.headers["retry-after"] = "not-a-number"
673+
674+
result = budget.get_reset_ts_from_response(mock_response)
675+
assert result is None
676+
677+
678+
def test_http_api_budget_get_reset_ts_no_headers():
679+
"""No rate limit headers at all should return None."""
680+
budget = HttpAPIBudget(policies=[])
681+
682+
mock_response = requests.Response()
683+
mock_response.status_code = 429
684+
685+
result = budget.get_reset_ts_from_response(mock_response)
686+
assert result is None
687+
688+
689+
def test_do_acquire_caps_sleep_duration():
690+
"""_do_acquire should cap sleep time to 600 seconds maximum."""
691+
# Use call_limit=1 so try_acquire doesn't reject weight=1, then exhaust budget
692+
policy = FixedWindowCallRatePolicy(
693+
next_reset_ts=datetime.now() + timedelta(days=10),
694+
period=timedelta(days=10),
695+
call_limit=1,
696+
matchers=[],
697+
)
698+
# Exhaust the budget so next call triggers CallRateLimitHit with ~10 day wait
699+
policy.try_acquire("warmup", weight=1)
700+
701+
budget = APIBudget(policies=[policy], maximum_attempts_to_acquire=2)
702+
703+
with patch("airbyte_cdk.sources.streams.call_rate.time.sleep") as mock_sleep:
704+
mock_sleep.side_effect = [None, None]
705+
with pytest.raises(CallRateLimitHit):
706+
budget.acquire_call(Request("GET", "http://example.com"), block=True)
707+
708+
# Sleep should have been called with at most 600 seconds
709+
assert mock_sleep.call_count > 0, "Expected sleep to be called at least once"
710+
for call_args in mock_sleep.call_args_list:
711+
sleep_seconds = call_args[0][0]
712+
assert sleep_seconds <= 600, f"Sleep duration {sleep_seconds}s exceeds the 600s cap"

0 commit comments

Comments
 (0)