Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1832,6 +1832,16 @@ definitions:
description: The headers to match.
type: object
additionalProperties: true
cost:
title: Cost
description: >
The cost of a request matching this matcher in the API's rate limit cost model.
When set, this value is passed as the weight when acquiring a call from the rate limiter,
enabling cost-based rate limiting where different endpoints consume different amounts
from a shared budget. If not set, each request counts as 1.
anyOf:
- type: integer
- type: string
additionalProperties: true
DefaultErrorHandler:
title: Default Error Handler
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,11 @@ class Config:
headers: Optional[Dict[str, Any]] = Field(
None, description="The headers to match.", title="Headers"
)
cost: Optional[Union[int, str]] = Field(
None,
description="The cost of a request matching this matcher in the API's rate limit cost model. When set, this value is passed as the weight when acquiring a call from the rate limiter, enabling cost-based rate limiting where different endpoints consume different amounts from a shared budget. If not set, each request counts as 1.",
title="Cost",
)


class DpathExtractor(BaseModel):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4387,12 +4387,19 @@ def create_rate(self, model: RateModel, config: Config, **kwargs: Any) -> Rate:
def create_http_request_matcher(
    self, model: HttpRequestRegexMatcherModel, config: Config, **kwargs: Any
) -> HttpRequestRegexMatcher:
    """Build an ``HttpRequestRegexMatcher`` from its declarative model.

    The optional ``cost`` of the model is resolved before the matcher is created:
    a string value is interpolated against ``config`` and converted to ``int``;
    an integer value is used as is; ``None`` is passed through unchanged.

    :param model: the parsed matcher definition
    :param config: the connector configuration used to interpolate a string ``cost``
    :param kwargs: accepted for signature compatibility; not used here
    :return: a matcher configured with the model's criteria and the resolved cost
    :raises ValueError: if a string ``cost`` does not resolve to an integer, or if the
        resolved cost is negative
    """
    cost = model.cost
    if isinstance(cost, str):
        evaluated = InterpolatedString.create(cost, parameters={}).eval(config)
        try:
            cost = int(evaluated)
        except (TypeError, ValueError) as exc:
            # Surface which field failed and what it resolved to, instead of a bare int() error.
            raise ValueError(
                f"HttpRequestRegexMatcher cost must resolve to an integer, "
                f"got {evaluated!r} from {model.cost!r}"
            ) from exc
    # The cost is later used as the weight of a rate limiter acquisition, so fail fast on
    # a negative value here rather than when the first request is made.
    if cost is not None and cost < 0:
        raise ValueError(f"HttpRequestRegexMatcher cost must not be negative, got {cost}")
    return HttpRequestRegexMatcher(
        method=model.method,
        url_base=model.url_base,
        url_path_pattern=model.url_path_pattern,
        params=model.params,
        headers=model.headers,
        cost=cost,
    )

def set_api_budget(self, component_definition: ComponentDefinition, config: Config) -> None:
Expand Down
33 changes: 31 additions & 2 deletions airbyte_cdk/sources/streams/call_rate.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,14 +166,19 @@ def __init__(
url_path_pattern: Optional[str] = None,
params: Optional[Mapping[str, Any]] = None,
headers: Optional[Mapping[str, Any]] = None,
cost: Optional[int] = None,
):
"""
:param method: HTTP method (e.g. "GET", "POST"); compared case-insensitively.
:param url_base: Base URL (scheme://host) that must match.
:param url_path_pattern: A regex pattern that will be applied to the path portion of the URL.
:param params: Dictionary of query parameters that must be present in the request.
:param headers: Dictionary of headers that must be present (header keys are compared case-insensitively).
:param cost: The cost (weight) of a request matching this matcher. If set, this value is used
as the weight when acquiring a call from the rate limiter, enabling cost-based rate limiting.
If not set, each request counts as 1.
"""
self._cost = cost
Comment thread
darynaishchenko marked this conversation as resolved.
Outdated
self._method = method.upper() if method else None

# Normalize the url_base if provided: remove trailing slash.
Expand Down Expand Up @@ -242,11 +247,16 @@ def __call__(self, request: Any) -> bool:

return True

@property
def cost(self) -> Optional[int]:
    """Weight charged for a request that matches this matcher.

    :return: the value stored in ``_cost`` by the constructor, or ``None`` when no cost was configured
    """
    return self._cost

def __str__(self) -> str:
    """Return a one-line description of this matcher's configuration, including its cost."""
    # The path pattern is optional: show its ``.pattern`` text when set, otherwise None.
    regex = self._url_path_pattern.pattern if self._url_path_pattern else None
    return (
        f"HttpRequestRegexMatcher(method={self._method}, url_base={self._url_base}, "
        f"url_path_pattern={regex}, params={self._params}, headers={self._headers}, cost={self._cost})"
    )


Expand All @@ -265,6 +275,24 @@ def matches(self, request: Any) -> bool:
return True
return any(matcher(request) for matcher in self._matchers)

def get_cost(self, request: Any) -> int:
    """Return the weight to charge for ``request`` under this policy.

    Matchers are checked in order. The result is the cost of the first matcher that
    matches the request, is an ``HttpRequestRegexMatcher`` and has a cost configured.
    Matchers that match but carry no cost are skipped. When no matcher qualifies
    (including when there are no matchers at all), the weight is 1.

    :param request: a request object
    :return: the cost/weight for this request
    """
    for candidate in self._matchers:
        if not candidate(request):
            continue
        if not isinstance(candidate, HttpRequestRegexMatcher):
            continue
        weight = candidate.cost
        if weight is not None:
            return weight
    return 1
Comment thread
coderabbitai[bot] marked this conversation as resolved.


class UnlimitedCallRatePolicy(BaseCallRatePolicy):
"""
Expand Down Expand Up @@ -596,7 +624,8 @@ def _do_acquire(
# sometimes we spend all budget before a second attempt, so we have a few more attempts
for attempt in range(1, self._maximum_attempts_to_acquire):
try:
policy.try_acquire(request, weight=1)
weight = policy.get_cost(request) if isinstance(policy, BaseCallRatePolicy) else 1
policy.try_acquire(request, weight=weight)
Comment thread
coderabbitai[bot] marked this conversation as resolved.
return
except CallRateLimitHit as exc:
last_exception = exc
Expand Down
95 changes: 95 additions & 0 deletions unit_tests/sources/streams/test_call_rate.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,10 +140,13 @@ def test_http_request_matching(mocker):
users_policy.matches.side_effect = HttpRequestMatcher(
url="http://domain/api/users", method="GET"
)
users_policy.get_cost.return_value = 1
groups_policy.matches.side_effect = HttpRequestMatcher(
url="http://domain/api/groups", method="POST"
)
groups_policy.get_cost.return_value = 1
root_policy.matches.side_effect = HttpRequestMatcher(method="GET")
root_policy.get_cost.return_value = 1
api_budget = APIBudget(
policies=[
users_policy,
Expand Down Expand Up @@ -360,6 +363,98 @@ def test_with_cache(self, mocker, requests_mock):
assert MovingWindowCallRatePolicy.try_acquire.call_count == 1


class TestCostBasedRateLimiting:
    """Cost-based rate limiting: endpoints may draw different amounts from one shared budget."""

    def test_matcher_cost_default_none(self):
        """A matcher built without a cost reports ``None``."""
        assert HttpRequestRegexMatcher(url_path_pattern=r"/api/test").cost is None

    def test_matcher_cost_is_stored(self):
        """A matcher built with a cost reports that same value."""
        assert HttpRequestRegexMatcher(url_path_pattern=r"/api/test", cost=60).cost == 60

    def test_policy_get_cost_returns_matcher_cost(self):
        """``get_cost`` yields the cost of the matcher that matches the request."""
        expensive_matcher = HttpRequestRegexMatcher(url_path_pattern=r"/api/expensive", cost=120)
        rate_policy = MovingWindowCallRatePolicy(
            matchers=[expensive_matcher],
            rates=[Rate(1000, timedelta(hours=1))],
        )
        request = Request("GET", "https://example.com/api/expensive")
        assert rate_policy.get_cost(request) == 120

    def test_policy_get_cost_defaults_to_1(self):
        """``get_cost`` yields 1 when the matching matcher has no cost."""
        rate_policy = MovingWindowCallRatePolicy(
            matchers=[HttpRequestRegexMatcher(url_path_pattern=r"/api/default")],
            rates=[Rate(1000, timedelta(hours=1))],
        )
        request = Request("GET", "https://example.com/api/default")
        assert rate_policy.get_cost(request) == 1

    def test_policy_get_cost_no_matching_matcher(self):
        """``get_cost`` yields 1 when the request matches none of the matchers."""
        rate_policy = MovingWindowCallRatePolicy(
            matchers=[HttpRequestRegexMatcher(url_path_pattern=r"/api/other", cost=50)],
            rates=[Rate(1000, timedelta(hours=1))],
        )
        request = Request("GET", "https://example.com/api/unmatched")
        assert rate_policy.get_cost(request) == 1

    def test_api_budget_uses_cost_as_weight(self):
        """``APIBudget`` charges the matcher's cost as the weight of each acquisition."""
        heavy_url = "https://example.com/api/heavy"
        rate_policy = MovingWindowCallRatePolicy(
            matchers=[HttpRequestRegexMatcher(url_path_pattern=r"/api/heavy", cost=10)],
            rates=[Rate(100, timedelta(hours=1))],
        )
        api_budget = APIBudget(policies=[rate_policy])

        # Ten calls at cost 10 use up the whole budget of 100.
        for _ in range(10):
            api_budget.acquire_call(Request("GET", heavy_url), block=False)

        # An eleventh call would bring the total to 110, which is over the limit.
        with pytest.raises(CallRateLimitHit):
            api_budget.acquire_call(Request("GET", heavy_url), block=False)

    def test_cost_1_backward_compatible(self):
        """Without a cost, every call weighs 1, as with the previous fixed weight."""
        normal_url = "https://example.com/api/normal"
        rate_policy = MovingWindowCallRatePolicy(
            matchers=[HttpRequestRegexMatcher(url_path_pattern=r"/api/normal")],
            rates=[Rate(5, timedelta(hours=1))],
        )
        api_budget = APIBudget(policies=[rate_policy])

        for _ in range(5):
            api_budget.acquire_call(Request("GET", normal_url), block=False)

        with pytest.raises(CallRateLimitHit):
            api_budget.acquire_call(Request("GET", normal_url), block=False)

    def test_shared_budget_different_costs(self):
        """Matchers with different costs in one policy all draw from the same budget."""
        cheap_url = "https://example.com/api/cheap"
        expensive_url = "https://example.com/api/expensive"
        # A single policy covers both endpoints, each with its own cost.
        shared_policy = MovingWindowCallRatePolicy(
            matchers=[
                HttpRequestRegexMatcher(url_path_pattern=r"/api/cheap", cost=1),
                HttpRequestRegexMatcher(url_path_pattern=r"/api/expensive", cost=10),
            ],
            rates=[Rate(20, timedelta(hours=1))],
        )
        api_budget = APIBudget(policies=[shared_policy])

        # One expensive call (10) plus ten cheap calls (1 each) totals 20.
        api_budget.acquire_call(Request("GET", expensive_url), block=False)
        for _ in range(10):
            api_budget.acquire_call(Request("GET", cheap_url), block=False)

        # The budget is fully used (20 of 20), so the next call must be rejected.
        with pytest.raises(CallRateLimitHit):
            api_budget.acquire_call(Request("GET", cheap_url), block=False)


class TestHttpRequestRegexMatcher:
"""
Tests for the new regex-based logic:
Expand Down
Loading