Skip to content

Commit f5377a8

Browse files
fix(cdk): propagate api_budget to CustomRequester subclasses of HttpRequester
1 parent 1256a1f commit f5377a8

2 files changed

Lines changed: 106 additions & 0 deletions

File tree

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1859,6 +1859,21 @@ def create_custom_component(self, model: Any, config: Config, **kwargs: Any) ->
18591859
for class_field in component_fields.keys()
18601860
if class_field in model_args
18611861
}
1862+
1863+
# Propagate the top-level api_budget to custom components that are subclasses of
1864+
# HttpRequester (and therefore accept an `api_budget` field), unless the manifest
1865+
# or an explicit kwarg has already provided one. Without this, custom requesters
1866+
# silently lose the connector-level HTTPAPIBudget and any configured rate-limit
1867+
# policies have no effect at runtime.
1868+
if (
1869+
self._api_budget is not None
1870+
and "api_budget" in component_fields
1871+
and kwargs.get("api_budget") is None
1872+
and isinstance(custom_component_class, type)
1873+
and issubclass(custom_component_class, HttpRequester)
1874+
):
1875+
kwargs["api_budget"] = self._api_budget
1876+
18621877
return custom_component_class(**kwargs)
18631878

18641879
@staticmethod

unit_tests/sources/declarative/parsers/test_model_to_component_factory.py

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@
7070
from airbyte_cdk.sources.declarative.models import (
7171
CustomRecordExtractor as CustomRecordExtractorModel,
7272
)
73+
from airbyte_cdk.sources.declarative.models import CustomRequester as CustomRequesterModel
7374
from airbyte_cdk.sources.declarative.models import CustomSchemaLoader as CustomSchemaLoaderModel
7475
from airbyte_cdk.sources.declarative.models import DatetimeBasedCursor as DatetimeBasedCursorModel
7576
from airbyte_cdk.sources.declarative.models import DeclarativeStream as DeclarativeStreamModel
@@ -4312,6 +4313,96 @@ def test_api_budget_fixed_window_policy():
43124313
assert matcher._url_path_pattern.pattern == "/v2/data"
43134314

43144315

4316+
def test_api_budget_propagated_to_custom_requester_subclass_of_http_requester():
4317+
"""Top-level `api_budget` must be forwarded to custom components that subclass `HttpRequester`.
4318+
4319+
Without this propagation, connectors using a `CustomRequester` (i.e., a Python subclass of
4320+
`HttpRequester`) silently lose the manifest-level rate-limit policies because
4321+
`create_custom_component` does not forward `self._api_budget` the way
4322+
`create_http_requester` does. See airbytehq/oncall#12011 for the reproducer.
4323+
"""
4324+
manifest_api_budget = {
4325+
"type": "HTTPAPIBudget",
4326+
"policies": [
4327+
{
4328+
"type": "MovingWindowCallRatePolicy",
4329+
"rates": [
4330+
{
4331+
"type": "Rate",
4332+
"limit": 60,
4333+
"interval": "PT1M",
4334+
}
4335+
],
4336+
"matchers": [],
4337+
}
4338+
],
4339+
}
4340+
4341+
custom_requester_definition = {
4342+
"type": "CustomRequester",
4343+
"class_name": "unit_tests.sources.declarative.parsers.testing_components.TestingRequester",
4344+
"url_base": "https://example.org",
4345+
"path": "/v1/data",
4346+
"http_method": "GET",
4347+
}
4348+
4349+
config: Mapping[str, Any] = {}
4350+
local_factory = ModelToComponentFactory()
4351+
local_factory.set_api_budget(manifest_api_budget, config)
4352+
4353+
custom_requester = local_factory.create_component(
4354+
model_type=CustomRequesterModel,
4355+
component_definition=custom_requester_definition,
4356+
config=config,
4357+
name="custom_stream",
4358+
)
4359+
4360+
assert isinstance(custom_requester, HttpRequester)
4361+
assert custom_requester.api_budget is not None, (
4362+
"Manifest-level api_budget was not propagated to the CustomRequester instance"
4363+
)
4364+
assert len(custom_requester.api_budget._policies) == 1
4365+
policy = custom_requester.api_budget._policies[0]
4366+
assert isinstance(policy, MovingWindowCallRatePolicy)
4367+
# Also verify the underlying HttpClient received the same budget
4368+
assert custom_requester._http_client._api_budget is custom_requester.api_budget
4369+
4370+
4371+
def test_api_budget_not_propagated_to_non_http_requester_custom_components():
4372+
"""Custom components that do NOT subclass `HttpRequester` must not receive `api_budget`.
4373+
4374+
This guards against accidentally injecting an `api_budget` kwarg into arbitrary custom
4375+
components (e.g., custom error handlers, partition routers) whose constructors would
4376+
reject the unexpected keyword.
4377+
"""
4378+
manifest_api_budget = {
4379+
"type": "HTTPAPIBudget",
4380+
"policies": [
4381+
{
4382+
"type": "MovingWindowCallRatePolicy",
4383+
"rates": [{"type": "Rate", "limit": 1, "interval": "PT60S"}],
4384+
"matchers": [],
4385+
}
4386+
],
4387+
}
4388+
4389+
custom_error_handler_definition = {
4390+
"type": "CustomErrorHandler",
4391+
"class_name": "unit_tests.sources.declarative.parsers.testing_components.TestingSomeComponent",
4392+
"basic_field": "expected",
4393+
}
4394+
4395+
config: Mapping[str, Any] = {}
4396+
local_factory = ModelToComponentFactory()
4397+
local_factory.set_api_budget(manifest_api_budget, config)
4398+
4399+
# Must not raise TypeError about an unexpected "api_budget" kwarg.
4400+
custom_component = local_factory.create_component(
4401+
CustomErrorHandlerModel, custom_error_handler_definition, config
4402+
)
4403+
assert custom_component.basic_field == "expected"
4404+
4405+
43154406
def test_create_grouping_partition_router_with_underlying_router():
43164407
content = """
43174408
schema_loader:

0 commit comments

Comments
 (0)