Apply the API budget before stream configs are built.

In both ManifestDeclarativeSource.streams() and ConcurrentDeclarativeSource.streams(),
the `stream_configs = ... + self.dynamic_streams` assignment is moved below the
`set_api_budget(...)` call, and a regression test asserting that call order is
added for each source.

diff --git a/airbyte_cdk/legacy/sources/declarative/manifest_declarative_source.py b/airbyte_cdk/legacy/sources/declarative/manifest_declarative_source.py
index 818ce0179..dea33879a 100644
--- a/airbyte_cdk/legacy/sources/declarative/manifest_declarative_source.py
+++ b/airbyte_cdk/legacy/sources/declarative/manifest_declarative_source.py
@@ -314,14 +314,14 @@ def streams(self, config: Mapping[str, Any]) -> List[Union[Stream, AbstractStrea
             }
         )
 
-        stream_configs = (
-            self._stream_configs(self._source_config, config=config) + self.dynamic_streams
-        )
-
         api_budget_model = self._source_config.get("api_budget")
         if api_budget_model:
             self._constructor.set_api_budget(api_budget_model, config)
 
+        stream_configs = (
+            self._stream_configs(self._source_config, config=config) + self.dynamic_streams
+        )
+
         source_streams = [
             self._constructor.create_component(
                 (
diff --git a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py
index 292615692..fb9c0ea0b 100644
--- a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py
+++ b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py
@@ -406,12 +406,12 @@ def streams(self, config: Mapping[str, Any]) -> List[AbstractStream]: # type: i
         if self._spec_component:
             self._spec_component.validate_config(self._config)
 
-        stream_configs = self._stream_configs(self._source_config) + self.dynamic_streams
-
         api_budget_model = self._source_config.get("api_budget")
         if api_budget_model:
             self._constructor.set_api_budget(api_budget_model, self._config)
 
+        stream_configs = self._stream_configs(self._source_config) + self.dynamic_streams
+
         prepared_configs = self._initialize_cache_for_parent_streams(deepcopy(stream_configs))
 
         source_streams = [
diff --git a/unit_tests/legacy/sources/declarative/test_manifest_declarative_source.py b/unit_tests/legacy/sources/declarative/test_manifest_declarative_source.py
index 8bc130a35..f49e8598a 100644
--- a/unit_tests/legacy/sources/declarative/test_manifest_declarative_source.py
+++ b/unit_tests/legacy/sources/declarative/test_manifest_declarative_source.py
@@ -2348,3 +2348,81 @@ def test_given_invalid_config_streams_validates_config_and_raises():
 
     with pytest.raises(ValueError):
         source.streams(input_config)
+
+
+def test_api_budget_is_set_before_dynamic_streams_evaluated():
+    """Verify that set_api_budget is called before dynamic_streams is accessed in streams().
+
+    This is a regression test for https://github.com/airbytehq/oncall/issues/11954
+    where dynamic stream discovery HTTP requests bypassed the configured rate limiter
+    because set_api_budget was called after self.dynamic_streams was evaluated.
+    """
+    manifest = {
+        "version": "3.8.2",
+        "type": "DeclarativeSource",
+        "check": {"type": "CheckStream", "stream_names": ["lists"]},
+        "streams": [
+            {
+                "type": "DeclarativeStream",
+                "name": "lists",
+                "schema_loader": {
+                    "type": "InlineSchemaLoader",
+                    "schema": {"type": "object"},
+                },
+                "retriever": {
+                    "type": "SimpleRetriever",
+                    "requester": {
+                        "type": "HttpRequester",
+                        "url_base": "https://example.org",
+                        "path": "/test",
+                        "authenticator": {"type": "NoAuth"},
+                    },
+                    "record_selector": {
+                        "type": "RecordSelector",
+                        "extractor": {"type": "DpathExtractor", "field_path": []},
+                    },
+                },
+            }
+        ],
+    }
+
+    source = ManifestDeclarativeSource(source_config=manifest)
+
+    call_order: list[str] = []
+    original_set_api_budget = source._constructor.set_api_budget
+
+    def tracking_set_api_budget(*args, **kwargs):
+        call_order.append("set_api_budget")
+        return original_set_api_budget(*args, **kwargs)
+
+    original_dynamic_stream_configs = source._dynamic_stream_configs
+
+    def tracking_dynamic_stream_configs(*args, **kwargs):
+        call_order.append("dynamic_stream_configs")
+        return original_dynamic_stream_configs(*args, **kwargs)
+
+    # Add an api_budget to the source config so set_api_budget is actually called
+    source._source_config["api_budget"] = {
+        "type": "HTTPAPIBudget",
+        "policies": [
+            {
+                "type": "MovingWindowCallRatePolicy",
+                "rates": [{"type": "Rate", "limit": 5, "interval": "PT1S"}],
+                "matchers": [],
+            }
+        ],
+    }
+
+    with (
+        patch.object(source._constructor, "set_api_budget", side_effect=tracking_set_api_budget),
+        patch.object(
+            source, "_dynamic_stream_configs", side_effect=tracking_dynamic_stream_configs
+        ),
+    ):
+        source.streams(config={})
+
+    assert "set_api_budget" in call_order, "set_api_budget was never called"
+    assert "dynamic_stream_configs" in call_order, "dynamic_stream_configs was never called"
+    assert call_order.index("set_api_budget") < call_order.index("dynamic_stream_configs"), (
+        f"set_api_budget must be called before dynamic_stream_configs, but call order was: {call_order}"
+    )
diff --git a/unit_tests/sources/declarative/test_concurrent_declarative_source.py b/unit_tests/sources/declarative/test_concurrent_declarative_source.py
index bf1f61610..de86268e7 100644
--- a/unit_tests/sources/declarative/test_concurrent_declarative_source.py
+++ b/unit_tests/sources/declarative/test_concurrent_declarative_source.py
@@ -5518,3 +5518,54 @@ def test_get_partition_router(stream_factory, expected_type):
         assert isinstance(router, SubstreamPartitionRouter)
     elif expected_type == "GroupingPartitionRouter":
         assert isinstance(router, GroupingPartitionRouter)
+
+
+def test_api_budget_is_set_before_dynamic_streams_evaluated():
+    """Verify that set_api_budget is called before dynamic_streams is accessed in streams().
+
+    This is a regression test for https://github.com/airbytehq/oncall/issues/11954
+    where dynamic stream discovery HTTP requests bypassed the configured rate limiter
+    because set_api_budget was called after self.dynamic_streams was evaluated.
+    """
+    source = ConcurrentDeclarativeSource(
+        source_config=_MANIFEST, config=_CONFIG, catalog=None, state=None
+    )
+
+    call_order: list[str] = []
+    original_set_api_budget = source._constructor.set_api_budget
+
+    def tracking_set_api_budget(*args, **kwargs):
+        call_order.append("set_api_budget")
+        return original_set_api_budget(*args, **kwargs)
+
+    original_dynamic_stream_configs = source._dynamic_stream_configs
+
+    def tracking_dynamic_stream_configs(*args, **kwargs):
+        call_order.append("dynamic_stream_configs")
+        return original_dynamic_stream_configs(*args, **kwargs)
+
+    # Add an api_budget to the source config so set_api_budget is actually called
+    source._source_config["api_budget"] = {
+        "type": "HTTPAPIBudget",
+        "policies": [
+            {
+                "type": "MovingWindowCallRatePolicy",
+                "rates": [{"type": "Rate", "limit": 5, "interval": "PT1S"}],
+                "matchers": [],
+            }
+        ],
+    }
+
+    with (
+        patch.object(source._constructor, "set_api_budget", side_effect=tracking_set_api_budget),
+        patch.object(
+            source, "_dynamic_stream_configs", side_effect=tracking_dynamic_stream_configs
+        ),
+    ):
+        source.streams(config=_CONFIG)
+
+    assert "set_api_budget" in call_order, "set_api_budget was never called"
+    assert "dynamic_stream_configs" in call_order, "dynamic_stream_configs was never called"
+    assert call_order.index("set_api_budget") < call_order.index("dynamic_stream_configs"), (
+        f"set_api_budget must be called before dynamic_stream_configs, but call order was: {call_order}"
+    )