Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -314,14 +314,14 @@ def streams(self, config: Mapping[str, Any]) -> List[Union[Stream, AbstractStrea
}
)

stream_configs = (
self._stream_configs(self._source_config, config=config) + self.dynamic_streams
)

api_budget_model = self._source_config.get("api_budget")
if api_budget_model:
self._constructor.set_api_budget(api_budget_model, config)

stream_configs = (
self._stream_configs(self._source_config, config=config) + self.dynamic_streams
)

source_streams = [
self._constructor.create_component(
(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -406,12 +406,12 @@ def streams(self, config: Mapping[str, Any]) -> List[AbstractStream]: # type: i
if self._spec_component:
self._spec_component.validate_config(self._config)

stream_configs = self._stream_configs(self._source_config) + self.dynamic_streams

api_budget_model = self._source_config.get("api_budget")
if api_budget_model:
self._constructor.set_api_budget(api_budget_model, self._config)

stream_configs = self._stream_configs(self._source_config) + self.dynamic_streams

prepared_configs = self._initialize_cache_for_parent_streams(deepcopy(stream_configs))

source_streams = [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2348,3 +2348,81 @@ def test_given_invalid_config_streams_validates_config_and_raises():

with pytest.raises(ValueError):
source.streams(input_config)


def test_api_budget_is_set_before_dynamic_streams_evaluated():
    """Check that streams() configures the API budget before resolving dynamic streams.

    Regression test for https://github.com/airbytehq/oncall/issues/11954: HTTP
    requests made during dynamic stream discovery bypassed the configured rate
    limiter because set_api_budget ran only after self.dynamic_streams had
    already been evaluated.
    """
    lists_stream = {
        "type": "DeclarativeStream",
        "name": "lists",
        "schema_loader": {
            "type": "InlineSchemaLoader",
            "schema": {"type": "object"},
        },
        "retriever": {
            "type": "SimpleRetriever",
            "requester": {
                "type": "HttpRequester",
                "url_base": "https://example.org",
                "path": "/test",
                "authenticator": {"type": "NoAuth"},
            },
            "record_selector": {
                "type": "RecordSelector",
                "extractor": {"type": "DpathExtractor", "field_path": []},
            },
        },
    }
    manifest = {
        "version": "3.8.2",
        "type": "DeclarativeSource",
        "check": {"type": "CheckStream", "stream_names": ["lists"]},
        "streams": [lists_stream],
    }

    source = ManifestDeclarativeSource(source_config=manifest)

    call_order: list[str] = []

    def record_then_delegate(label, delegate):
        """Build a side effect that logs `label` and then forwards to `delegate`."""

        def wrapper(*args, **kwargs):
            call_order.append(label)
            return delegate(*args, **kwargs)

        return wrapper

    # Capture the real implementations before they are patched so that the
    # wrappers still run the original logic.
    budget_spy = record_then_delegate("set_api_budget", source._constructor.set_api_budget)
    discovery_spy = record_then_delegate(
        "dynamic_stream_configs", source._dynamic_stream_configs
    )

    # Without an api_budget entry in the source config, streams() skips
    # set_api_budget entirely and there would be no ordering to observe.
    rate_limit_policy = {
        "type": "MovingWindowCallRatePolicy",
        "rates": [{"type": "Rate", "limit": 5, "interval": "PT1S"}],
        "matchers": [],
    }
    source._source_config["api_budget"] = {
        "type": "HTTPAPIBudget",
        "policies": [rate_limit_policy],
    }

    with patch.object(source._constructor, "set_api_budget", side_effect=budget_spy):
        with patch.object(source, "_dynamic_stream_configs", side_effect=discovery_spy):
            source.streams(config={})

    assert "set_api_budget" in call_order, "set_api_budget was never called"
    assert "dynamic_stream_configs" in call_order, "dynamic_stream_configs was never called"

    budget_position = call_order.index("set_api_budget")
    discovery_position = call_order.index("dynamic_stream_configs")
    assert budget_position < discovery_position, (
        f"set_api_budget must be called before dynamic_stream_configs, but call order was: {call_order}"
    )
Original file line number Diff line number Diff line change
Expand Up @@ -5518,3 +5518,54 @@ def test_get_partition_router(stream_factory, expected_type):
assert isinstance(router, SubstreamPartitionRouter)
elif expected_type == "GroupingPartitionRouter":
assert isinstance(router, GroupingPartitionRouter)


def test_api_budget_is_set_before_dynamic_streams_evaluated():
    """Check that streams() configures the API budget before resolving dynamic streams.

    Regression test for https://github.com/airbytehq/oncall/issues/11954: HTTP
    requests made during dynamic stream discovery bypassed the configured rate
    limiter because set_api_budget ran only after self.dynamic_streams had
    already been evaluated.
    """
    source = ConcurrentDeclarativeSource(
        source_config=_MANIFEST, config=_CONFIG, catalog=None, state=None
    )

    call_order: list[str] = []

    def record_then_delegate(label, delegate):
        """Build a side effect that logs `label` and then forwards to `delegate`."""

        def wrapper(*args, **kwargs):
            call_order.append(label)
            return delegate(*args, **kwargs)

        return wrapper

    # Capture the real implementations before they are patched so that the
    # wrappers still run the original logic.
    budget_spy = record_then_delegate("set_api_budget", source._constructor.set_api_budget)
    discovery_spy = record_then_delegate(
        "dynamic_stream_configs", source._dynamic_stream_configs
    )

    # Without an api_budget entry in the source config, streams() skips
    # set_api_budget entirely and there would be no ordering to observe.
    rate_limit_policy = {
        "type": "MovingWindowCallRatePolicy",
        "rates": [{"type": "Rate", "limit": 5, "interval": "PT1S"}],
        "matchers": [],
    }
    source._source_config["api_budget"] = {
        "type": "HTTPAPIBudget",
        "policies": [rate_limit_policy],
    }

    with patch.object(source._constructor, "set_api_budget", side_effect=budget_spy):
        with patch.object(source, "_dynamic_stream_configs", side_effect=discovery_spy):
            source.streams(config=_CONFIG)

    assert "set_api_budget" in call_order, "set_api_budget was never called"
    assert "dynamic_stream_configs" in call_order, "dynamic_stream_configs was never called"

    budget_position = call_order.index("set_api_budget")
    discovery_position = call_order.index("dynamic_stream_configs")
    assert budget_position < discovery_position, (
        f"set_api_budget must be called before dynamic_stream_configs, but call order was: {call_order}"
    )
Loading