Skip to content

Commit 0787f79

Browse files
committed
fix(cdk): propagate api_budget through CustomRequester HttpClient overrides
1 parent f5377a8 commit 0787f79

3 files changed

Lines changed: 164 additions & 1 deletion

File tree

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1865,6 +1865,7 @@ def create_custom_component(self, model: Any, config: Config, **kwargs: Any) ->
18651865
# or an explicit kwarg has already provided one. Without this, custom requesters
18661866
# silently lose the connector-level HTTPAPIBudget and any configured rate-limit
18671867
# policies have no effect at runtime.
1868+
injected_api_budget = False
18681869
if (
18691870
self._api_budget is not None
18701871
and "api_budget" in component_fields
@@ -1873,8 +1874,35 @@ def create_custom_component(self, model: Any, config: Config, **kwargs: Any) ->
18731874
and issubclass(custom_component_class, HttpRequester)
18741875
):
18751876
kwargs["api_budget"] = self._api_budget
1877+
injected_api_budget = True
18761878

1877-
return custom_component_class(**kwargs)
1879+
custom_component = custom_component_class(**kwargs)
1880+
if injected_api_budget and isinstance(custom_component, HttpRequester):
1881+
self._sync_injected_api_budget_with_http_client(custom_component)
1882+
1883+
return custom_component
1884+
1885+
@staticmethod
1886+
def _sync_injected_api_budget_with_http_client(custom_requester: HttpRequester) -> None:
1887+
"""
1888+
Custom requesters can replace `_http_client` in `__post_init__` without forwarding `api_budget`.
1889+
If the factory injected a manifest-level budget and the replacement client kept the default empty
1890+
budget, sync the active client back to the requester's injected budget.
1891+
"""
1892+
http_client = getattr(custom_requester, "_http_client", None)
1893+
http_client_api_budget = getattr(http_client, "_api_budget", None)
1894+
injected_api_budget = custom_requester.api_budget
1895+
1896+
if (
1897+
http_client is None
1898+
or http_client_api_budget is None
1899+
or injected_api_budget is None
1900+
or http_client_api_budget is injected_api_budget
1901+
):
1902+
return
1903+
1904+
if len(getattr(http_client_api_budget, "_policies", [])) == 0:
1905+
http_client._api_budget = injected_api_budget
18781906

18791907
@staticmethod
18801908
def _get_class_from_fully_qualified_class_name(

unit_tests/sources/declarative/parsers/test_model_to_component_factory.py

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4368,6 +4368,98 @@ def test_api_budget_propagated_to_custom_requester_subclass_of_http_requester():
43684368
assert custom_requester._http_client._api_budget is custom_requester.api_budget
43694369

43704370

4371+
def test_api_budget_propagated_to_custom_requester_that_replaces_http_client():
4372+
"""Injected api budgets must survive requesters that replace `_http_client` in `__post_init__`.
4373+
4374+
Some connector requesters call `super().__post_init__()` and then swap in a custom `HttpClient`
4375+
implementation without forwarding `api_budget`. The budget should still be applied to the active
4376+
client so manifest-level rate limits continue to work.
4377+
"""
4378+
manifest_api_budget = {
4379+
"type": "HTTPAPIBudget",
4380+
"policies": [
4381+
{
4382+
"type": "MovingWindowCallRatePolicy",
4383+
"rates": [
4384+
{
4385+
"type": "Rate",
4386+
"limit": 60,
4387+
"interval": "PT1M",
4388+
}
4389+
],
4390+
"matchers": [],
4391+
}
4392+
],
4393+
}
4394+
4395+
custom_requester_definition = {
4396+
"type": "CustomRequester",
4397+
"class_name": "unit_tests.sources.declarative.parsers.testing_components.TestingRequesterWithReplacedHttpClient",
4398+
"url_base": "https://example.org",
4399+
"path": "/v1/data",
4400+
"http_method": "GET",
4401+
}
4402+
4403+
config: Mapping[str, Any] = {}
4404+
local_factory = ModelToComponentFactory()
4405+
local_factory.set_api_budget(manifest_api_budget, config)
4406+
4407+
custom_requester = local_factory.create_component(
4408+
model_type=CustomRequesterModel,
4409+
component_definition=custom_requester_definition,
4410+
config=config,
4411+
name="custom_stream",
4412+
)
4413+
4414+
assert isinstance(custom_requester, HttpRequester)
4415+
assert custom_requester.api_budget is not None
4416+
assert custom_requester._http_client._api_budget is custom_requester.api_budget
4417+
4418+
4419+
def test_api_budget_not_overwriting_non_empty_budget_on_replaced_http_client():
4420+
"""A requester that intentionally installs its own budget should keep it."""
4421+
manifest_api_budget = {
4422+
"type": "HTTPAPIBudget",
4423+
"policies": [
4424+
{
4425+
"type": "MovingWindowCallRatePolicy",
4426+
"rates": [
4427+
{
4428+
"type": "Rate",
4429+
"limit": 60,
4430+
"interval": "PT1M",
4431+
}
4432+
],
4433+
"matchers": [],
4434+
}
4435+
],
4436+
}
4437+
4438+
custom_requester_definition = {
4439+
"type": "CustomRequester",
4440+
"class_name": "unit_tests.sources.declarative.parsers.testing_components.TestingRequesterWithReplacedHttpClientAndOwnBudget",
4441+
"url_base": "https://example.org",
4442+
"path": "/v1/data",
4443+
"http_method": "GET",
4444+
}
4445+
4446+
config: Mapping[str, Any] = {}
4447+
local_factory = ModelToComponentFactory()
4448+
local_factory.set_api_budget(manifest_api_budget, config)
4449+
4450+
custom_requester = local_factory.create_component(
4451+
model_type=CustomRequesterModel,
4452+
component_definition=custom_requester_definition,
4453+
config=config,
4454+
name="custom_stream",
4455+
)
4456+
4457+
assert isinstance(custom_requester, HttpRequester)
4458+
assert custom_requester.api_budget is not None
4459+
assert custom_requester._http_client._api_budget is not custom_requester.api_budget
4460+
assert len(custom_requester._http_client._api_budget._policies) == 1
4461+
4462+
43714463
def test_api_budget_not_propagated_to_non_http_requester_custom_components():
43724464
"""Custom components that do NOT subclass `HttpRequester` must not receive `api_budget`.
43734465

unit_tests/sources/declarative/parsers/testing_components.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#
44

55
from dataclasses import dataclass, field
6+
from datetime import timedelta
67
from typing import Any, ClassVar, List, Mapping, Optional
78

89
from airbyte_cdk.sources.declarative.extractors import DpathExtractor
@@ -21,6 +22,8 @@
2122
RequestInput,
2223
)
2324
from airbyte_cdk.sources.declarative.retrievers import SimpleRetriever
25+
from airbyte_cdk.sources.streams.call_rate import APIBudget, MovingWindowCallRatePolicy, Rate
26+
from airbyte_cdk.sources.streams.http import HttpClient
2427

2528

2629
@dataclass
@@ -114,3 +117,43 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
114117
parameters=parameters or {},
115118
)
116119
super().__post_init__(parameters)
120+
121+
122+
@dataclass
123+
class TestingRequesterWithReplacedHttpClient(HttpRequester):
124+
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
125+
super().__post_init__(parameters)
126+
self._http_client = HttpClient(
127+
name=self.name,
128+
logger=self.logger,
129+
error_handler=self.error_handler,
130+
authenticator=self._authenticator,
131+
use_cache=self.use_cache,
132+
backoff_strategy=None,
133+
disable_retries=self.disable_retries,
134+
message_repository=self.message_repository,
135+
)
136+
137+
138+
@dataclass
139+
class TestingRequesterWithReplacedHttpClientAndOwnBudget(HttpRequester):
140+
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
141+
super().__post_init__(parameters)
142+
self._http_client = HttpClient(
143+
name=self.name,
144+
logger=self.logger,
145+
error_handler=self.error_handler,
146+
api_budget=APIBudget(
147+
policies=[
148+
MovingWindowCallRatePolicy(
149+
rates=[Rate(limit=1, interval=timedelta(seconds=30))],
150+
matchers=[],
151+
)
152+
]
153+
),
154+
authenticator=self._authenticator,
155+
use_cache=self.use_cache,
156+
backoff_strategy=None,
157+
disable_retries=self.disable_retries,
158+
message_repository=self.message_repository,
159+
)

0 commit comments

Comments
 (0)