Skip to content

Commit 661cb5c

Browse files
devin-ai-integration[bot]bot_apk
andcommitted
fix(cdk): reorder api_budget initialization before dynamic stream discovery
Move set_api_budget() call before self.dynamic_streams evaluation in both ConcurrentDeclarativeSource.streams() and ManifestDeclarativeSource.streams(). Previously, dynamic stream discovery HTTP requests bypassed the configured rate limiter because set_api_budget() was called after self.dynamic_streams was evaluated. This caused connectors like source-airtable to hit 429 rate limit errors during connection checks and syncs. Resolves airbytehq/oncall#11954 Co-Authored-By: bot_apk <apk@cognition.ai>
1 parent 4aaafcf commit 661cb5c

4 files changed

Lines changed: 135 additions & 6 deletions

File tree

airbyte_cdk/legacy/sources/declarative/manifest_declarative_source.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -314,14 +314,14 @@ def streams(self, config: Mapping[str, Any]) -> List[Union[Stream, AbstractStrea
314314
}
315315
)
316316

317-
stream_configs = (
318-
self._stream_configs(self._source_config, config=config) + self.dynamic_streams
319-
)
320-
321317
api_budget_model = self._source_config.get("api_budget")
322318
if api_budget_model:
323319
self._constructor.set_api_budget(api_budget_model, config)
324320

321+
stream_configs = (
322+
self._stream_configs(self._source_config, config=config) + self.dynamic_streams
323+
)
324+
325325
source_streams = [
326326
self._constructor.create_component(
327327
(

airbyte_cdk/sources/declarative/concurrent_declarative_source.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -406,12 +406,12 @@ def streams(self, config: Mapping[str, Any]) -> List[AbstractStream]: # type: i
406406
if self._spec_component:
407407
self._spec_component.validate_config(self._config)
408408

409-
stream_configs = self._stream_configs(self._source_config) + self.dynamic_streams
410-
411409
api_budget_model = self._source_config.get("api_budget")
412410
if api_budget_model:
413411
self._constructor.set_api_budget(api_budget_model, self._config)
414412

413+
stream_configs = self._stream_configs(self._source_config) + self.dynamic_streams
414+
415415
prepared_configs = self._initialize_cache_for_parent_streams(deepcopy(stream_configs))
416416

417417
source_streams = [

unit_tests/legacy/sources/declarative/test_manifest_declarative_source.py

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2348,3 +2348,81 @@ def test_given_invalid_config_streams_validates_config_and_raises():
23482348

23492349
with pytest.raises(ValueError):
23502350
source.streams(input_config)
2351+
2352+
2353+
def test_api_budget_is_set_before_dynamic_streams_evaluated():
2354+
"""Verify that set_api_budget is called before dynamic_streams is accessed in streams().
2355+
2356+
This is a regression test for https://github.com/airbytehq/oncall/issues/11954
2357+
where dynamic stream discovery HTTP requests bypassed the configured rate limiter
2358+
because set_api_budget was called after self.dynamic_streams was evaluated.
2359+
"""
2360+
manifest = {
2361+
"version": "3.8.2",
2362+
"type": "DeclarativeSource",
2363+
"check": {"type": "CheckStream", "stream_names": ["lists"]},
2364+
"streams": [
2365+
{
2366+
"type": "DeclarativeStream",
2367+
"name": "lists",
2368+
"schema_loader": {
2369+
"type": "InlineSchemaLoader",
2370+
"schema": {"type": "object"},
2371+
},
2372+
"retriever": {
2373+
"type": "SimpleRetriever",
2374+
"requester": {
2375+
"type": "HttpRequester",
2376+
"url_base": "https://example.org",
2377+
"path": "/test",
2378+
"authenticator": {"type": "NoAuth"},
2379+
},
2380+
"record_selector": {
2381+
"type": "RecordSelector",
2382+
"extractor": {"type": "DpathExtractor", "field_path": []},
2383+
},
2384+
},
2385+
}
2386+
],
2387+
}
2388+
2389+
source = ManifestDeclarativeSource(source_config=manifest)
2390+
2391+
call_order: list[str] = []
2392+
original_set_api_budget = source._constructor.set_api_budget
2393+
2394+
def tracking_set_api_budget(*args, **kwargs):
2395+
call_order.append("set_api_budget")
2396+
return original_set_api_budget(*args, **kwargs)
2397+
2398+
original_dynamic_stream_configs = source._dynamic_stream_configs
2399+
2400+
def tracking_dynamic_stream_configs(*args, **kwargs):
2401+
call_order.append("dynamic_stream_configs")
2402+
return original_dynamic_stream_configs(*args, **kwargs)
2403+
2404+
# Add an api_budget to the source config so set_api_budget is actually called
2405+
source._source_config["api_budget"] = {
2406+
"type": "HTTPAPIBudget",
2407+
"policies": [
2408+
{
2409+
"type": "MovingWindowCallRatePolicy",
2410+
"rates": [{"type": "Rate", "limit": 5, "interval": "PT1S"}],
2411+
"matchers": [],
2412+
}
2413+
],
2414+
}
2415+
2416+
with (
2417+
patch.object(source._constructor, "set_api_budget", side_effect=tracking_set_api_budget),
2418+
patch.object(
2419+
source, "_dynamic_stream_configs", side_effect=tracking_dynamic_stream_configs
2420+
),
2421+
):
2422+
source.streams(config={})
2423+
2424+
assert "set_api_budget" in call_order, "set_api_budget was never called"
2425+
assert "dynamic_stream_configs" in call_order, "dynamic_stream_configs was never called"
2426+
assert call_order.index("set_api_budget") < call_order.index("dynamic_stream_configs"), (
2427+
f"set_api_budget must be called before dynamic_stream_configs, but call order was: {call_order}"
2428+
)

unit_tests/sources/declarative/test_concurrent_declarative_source.py

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5518,3 +5518,54 @@ def test_get_partition_router(stream_factory, expected_type):
55185518
assert isinstance(router, SubstreamPartitionRouter)
55195519
elif expected_type == "GroupingPartitionRouter":
55205520
assert isinstance(router, GroupingPartitionRouter)
5521+
5522+
5523+
def test_api_budget_is_set_before_dynamic_streams_evaluated():
5524+
"""Verify that set_api_budget is called before dynamic_streams is accessed in streams().
5525+
5526+
This is a regression test for https://github.com/airbytehq/oncall/issues/11954
5527+
where dynamic stream discovery HTTP requests bypassed the configured rate limiter
5528+
because set_api_budget was called after self.dynamic_streams was evaluated.
5529+
"""
5530+
source = ConcurrentDeclarativeSource(
5531+
source_config=_MANIFEST, config=_CONFIG, catalog=None, state=None
5532+
)
5533+
5534+
call_order: list[str] = []
5535+
original_set_api_budget = source._constructor.set_api_budget
5536+
5537+
def tracking_set_api_budget(*args, **kwargs):
5538+
call_order.append("set_api_budget")
5539+
return original_set_api_budget(*args, **kwargs)
5540+
5541+
original_dynamic_stream_configs = source._dynamic_stream_configs
5542+
5543+
def tracking_dynamic_stream_configs(*args, **kwargs):
5544+
call_order.append("dynamic_stream_configs")
5545+
return original_dynamic_stream_configs(*args, **kwargs)
5546+
5547+
# Add an api_budget to the source config so set_api_budget is actually called
5548+
source._source_config["api_budget"] = {
5549+
"type": "HTTPAPIBudget",
5550+
"policies": [
5551+
{
5552+
"type": "MovingWindowCallRatePolicy",
5553+
"rates": [{"type": "Rate", "limit": 5, "interval": "PT1S"}],
5554+
"matchers": [],
5555+
}
5556+
],
5557+
}
5558+
5559+
with (
5560+
patch.object(source._constructor, "set_api_budget", side_effect=tracking_set_api_budget),
5561+
patch.object(
5562+
source, "_dynamic_stream_configs", side_effect=tracking_dynamic_stream_configs
5563+
),
5564+
):
5565+
source.streams(config=_CONFIG)
5566+
5567+
assert "set_api_budget" in call_order, "set_api_budget was never called"
5568+
assert "dynamic_stream_configs" in call_order, "dynamic_stream_configs was never called"
5569+
assert call_order.index("set_api_budget") < call_order.index("dynamic_stream_configs"), (
5570+
f"set_api_budget must be called before dynamic_stream_configs, but call order was: {call_order}"
5571+
)

0 commit comments

Comments
 (0)