Skip to content

Commit f1f84c9

Browse files
feat: add cost-based rate limiting support to HttpRequestRegexMatcher
Add optional 'cost' field to HttpRequestRegexMatcher that allows specifying per-request cost/weight in rate limiting policies. This enables cost-based rate limiting where different endpoints consume different amounts from a shared budget (e.g., Amplitude's Dashboard REST API). Changes: - YAML schema: add 'cost' field (int or string) to HttpRequestRegexMatcher - Python model: add 'cost' field to HttpRequestRegexMatcher model - call_rate.py: store cost on matcher, add get_cost() to BaseCallRatePolicy, update APIBudget._do_acquire() to use cost as weight instead of hardcoded 1 - model_to_component_factory.py: wire up cost field with interpolation support - Tests: 8 new tests for cost-based rate limiting behavior Backward compatible: cost defaults to None (treated as 1). Co-Authored-By: Daryna Ishchenko <darina.ishchenko17@gmail.com>
1 parent 0e57414 commit f1f84c9

5 files changed

Lines changed: 144 additions & 2 deletions

File tree

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1832,6 +1832,16 @@ definitions:
18321832
description: The headers to match.
18331833
type: object
18341834
additionalProperties: true
1835+
cost:
1836+
title: Cost
1837+
description: >
1838+
The cost of a request matching this matcher in the API's rate limit cost model.
1839+
When set, this value is passed as the weight when acquiring a call from the rate limiter,
1840+
enabling cost-based rate limiting where different endpoints consume different amounts
1841+
from a shared budget. If not set, each request counts as 1.
1842+
anyOf:
1843+
- type: integer
1844+
- type: string
18351845
additionalProperties: true
18361846
DefaultErrorHandler:
18371847
title: Default Error Handler

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -486,6 +486,11 @@ class Config:
486486
headers: Optional[Dict[str, Any]] = Field(
487487
None, description="The headers to match.", title="Headers"
488488
)
489+
cost: Optional[Union[int, str]] = Field(
490+
None,
491+
description="The cost of a request matching this matcher in the API's rate limit cost model. When set, this value is passed as the weight when acquiring a call from the rate limiter, enabling cost-based rate limiting where different endpoints consume different amounts from a shared budget. If not set, each request counts as 1.",
492+
title="Cost",
493+
)
489494

490495

491496
class DpathExtractor(BaseModel):

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4387,12 +4387,19 @@ def create_rate(self, model: RateModel, config: Config, **kwargs: Any) -> Rate:
43874387
def create_http_request_matcher(
43884388
self, model: HttpRequestRegexMatcherModel, config: Config, **kwargs: Any
43894389
) -> HttpRequestRegexMatcher:
4390+
cost = model.cost
4391+
if cost is not None:
4392+
if isinstance(cost, str):
4393+
cost = int(InterpolatedString.create(cost, parameters={}).eval(config))
4394+
else:
4395+
cost = int(cost)
43904396
return HttpRequestRegexMatcher(
43914397
method=model.method,
43924398
url_base=model.url_base,
43934399
url_path_pattern=model.url_path_pattern,
43944400
params=model.params,
43954401
headers=model.headers,
4402+
cost=cost,
43964403
)
43974404

43984405
def set_api_budget(self, component_definition: ComponentDefinition, config: Config) -> None:

airbyte_cdk/sources/streams/call_rate.py

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -166,14 +166,19 @@ def __init__(
166166
url_path_pattern: Optional[str] = None,
167167
params: Optional[Mapping[str, Any]] = None,
168168
headers: Optional[Mapping[str, Any]] = None,
169+
cost: Optional[int] = None,
169170
):
170171
"""
171172
:param method: HTTP method (e.g. "GET", "POST"); compared case-insensitively.
172173
:param url_base: Base URL (scheme://host) that must match.
173174
:param url_path_pattern: A regex pattern that will be applied to the path portion of the URL.
174175
:param params: Dictionary of query parameters that must be present in the request.
175176
:param headers: Dictionary of headers that must be present (header keys are compared case-insensitively).
177+
:param cost: The cost (weight) of a request matching this matcher. If set, this value is used
178+
as the weight when acquiring a call from the rate limiter, enabling cost-based rate limiting.
179+
If not set, each request counts as 1.
176180
"""
181+
self._cost = cost
177182
self._method = method.upper() if method else None
178183

179184
# Normalize the url_base if provided: remove trailing slash.
@@ -242,11 +247,16 @@ def __call__(self, request: Any) -> bool:
242247

243248
return True
244249

250+
@property
251+
def cost(self) -> Optional[int]:
252+
"""The cost (weight) of a request matching this matcher, or None if not set."""
253+
return self._cost
254+
245255
def __str__(self) -> str:
246256
regex = self._url_path_pattern.pattern if self._url_path_pattern else None
247257
return (
248258
f"HttpRequestRegexMatcher(method={self._method}, url_base={self._url_base}, "
249-
f"url_path_pattern={regex}, params={self._params}, headers={self._headers})"
259+
f"url_path_pattern={regex}, params={self._params}, headers={self._headers}, cost={self._cost})"
250260
)
251261

252262

@@ -265,6 +275,20 @@ def matches(self, request: Any) -> bool:
265275
return True
266276
return any(matcher(request) for matcher in self._matchers)
267277

278+
def get_cost(self, request: Any) -> int:
279+
"""Get the cost (weight) for a request based on the first matching matcher.
280+
281+
If a matcher has a cost configured, that cost is used as the weight.
282+
Otherwise, defaults to 1.
283+
284+
:param request: a request object
285+
:return: the cost/weight for this request
286+
"""
287+
for matcher in self._matchers:
288+
if matcher(request) and isinstance(matcher, HttpRequestRegexMatcher) and matcher.cost is not None:
289+
return matcher.cost
290+
return 1
291+
268292

269293
class UnlimitedCallRatePolicy(BaseCallRatePolicy):
270294
"""
@@ -596,7 +620,8 @@ def _do_acquire(
596620
# sometimes we spend all budget before a second attempt, so we have a few more attempts
597621
for attempt in range(1, self._maximum_attempts_to_acquire):
598622
try:
599-
policy.try_acquire(request, weight=1)
623+
weight = policy.get_cost(request) if isinstance(policy, BaseCallRatePolicy) else 1
624+
policy.try_acquire(request, weight=weight)
600625
return
601626
except CallRateLimitHit as exc:
602627
last_exception = exc

unit_tests/sources/streams/test_call_rate.py

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,10 +140,13 @@ def test_http_request_matching(mocker):
140140
users_policy.matches.side_effect = HttpRequestMatcher(
141141
url="http://domain/api/users", method="GET"
142142
)
143+
users_policy.get_cost.return_value = 1
143144
groups_policy.matches.side_effect = HttpRequestMatcher(
144145
url="http://domain/api/groups", method="POST"
145146
)
147+
groups_policy.get_cost.return_value = 1
146148
root_policy.matches.side_effect = HttpRequestMatcher(method="GET")
149+
root_policy.get_cost.return_value = 1
147150
api_budget = APIBudget(
148151
policies=[
149152
users_policy,
@@ -360,6 +363,98 @@ def test_with_cache(self, mocker, requests_mock):
360363
assert MovingWindowCallRatePolicy.try_acquire.call_count == 1
361364

362365

366+
class TestCostBasedRateLimiting:
367+
"""Tests for cost-based rate limiting where different endpoints consume different amounts from a shared budget."""
368+
369+
def test_matcher_cost_default_none(self):
370+
"""HttpRequestRegexMatcher cost defaults to None when not specified."""
371+
matcher = HttpRequestRegexMatcher(url_path_pattern=r"/api/test")
372+
assert matcher.cost is None
373+
374+
def test_matcher_cost_is_stored(self):
375+
"""HttpRequestRegexMatcher stores the cost value when provided."""
376+
matcher = HttpRequestRegexMatcher(url_path_pattern=r"/api/test", cost=60)
377+
assert matcher.cost == 60
378+
379+
def test_policy_get_cost_returns_matcher_cost(self):
380+
"""BaseCallRatePolicy.get_cost returns cost from the matching matcher."""
381+
policy = MovingWindowCallRatePolicy(
382+
matchers=[HttpRequestRegexMatcher(url_path_pattern=r"/api/expensive", cost=120)],
383+
rates=[Rate(1000, timedelta(hours=1))],
384+
)
385+
req = Request("GET", "https://example.com/api/expensive")
386+
assert policy.get_cost(req) == 120
387+
388+
def test_policy_get_cost_defaults_to_1(self):
389+
"""BaseCallRatePolicy.get_cost returns 1 when no matcher has a cost set."""
390+
policy = MovingWindowCallRatePolicy(
391+
matchers=[HttpRequestRegexMatcher(url_path_pattern=r"/api/default")],
392+
rates=[Rate(1000, timedelta(hours=1))],
393+
)
394+
req = Request("GET", "https://example.com/api/default")
395+
assert policy.get_cost(req) == 1
396+
397+
def test_policy_get_cost_no_matching_matcher(self):
398+
"""BaseCallRatePolicy.get_cost returns 1 when no matcher matches the request."""
399+
policy = MovingWindowCallRatePolicy(
400+
matchers=[HttpRequestRegexMatcher(url_path_pattern=r"/api/other", cost=50)],
401+
rates=[Rate(1000, timedelta(hours=1))],
402+
)
403+
req = Request("GET", "https://example.com/api/unmatched")
404+
assert policy.get_cost(req) == 1
405+
406+
def test_api_budget_uses_cost_as_weight(self):
407+
"""APIBudget._do_acquire passes the matcher's cost as weight to try_acquire."""
408+
policy = MovingWindowCallRatePolicy(
409+
matchers=[HttpRequestRegexMatcher(url_path_pattern=r"/api/heavy", cost=10)],
410+
rates=[Rate(100, timedelta(hours=1))],
411+
)
412+
budget = APIBudget(policies=[policy])
413+
414+
# Make requests — each costs 10 from the budget of 100
415+
for i in range(10):
416+
budget.acquire_call(Request("GET", "https://example.com/api/heavy"), block=False)
417+
418+
# The 11th request should exceed the budget (10 * 10 = 100, one more = 110 > 100)
419+
with pytest.raises(CallRateLimitHit):
420+
budget.acquire_call(Request("GET", "https://example.com/api/heavy"), block=False)
421+
422+
def test_cost_1_backward_compatible(self):
423+
"""When cost is not set, behavior is identical to the old hardcoded weight=1."""
424+
policy = MovingWindowCallRatePolicy(
425+
matchers=[HttpRequestRegexMatcher(url_path_pattern=r"/api/normal")],
426+
rates=[Rate(5, timedelta(hours=1))],
427+
)
428+
budget = APIBudget(policies=[policy])
429+
430+
for i in range(5):
431+
budget.acquire_call(Request("GET", "https://example.com/api/normal"), block=False)
432+
433+
with pytest.raises(CallRateLimitHit):
434+
budget.acquire_call(Request("GET", "https://example.com/api/normal"), block=False)
435+
436+
def test_shared_budget_different_costs(self):
437+
"""Multiple matchers with different costs sharing one policy correctly consume the shared budget."""
438+
# Shared policy matches both endpoints via regex
439+
policy = MovingWindowCallRatePolicy(
440+
matchers=[
441+
HttpRequestRegexMatcher(url_path_pattern=r"/api/cheap", cost=1),
442+
HttpRequestRegexMatcher(url_path_pattern=r"/api/expensive", cost=10),
443+
],
444+
rates=[Rate(20, timedelta(hours=1))],
445+
)
446+
budget = APIBudget(policies=[policy])
447+
448+
# Make 1 expensive request (costs 10) and 10 cheap requests (cost 1 each) = total 20
449+
budget.acquire_call(Request("GET", "https://example.com/api/expensive"), block=False)
450+
for i in range(10):
451+
budget.acquire_call(Request("GET", "https://example.com/api/cheap"), block=False)
452+
453+
# Budget is now at 20/20 — any further request should fail
454+
with pytest.raises(CallRateLimitHit):
455+
budget.acquire_call(Request("GET", "https://example.com/api/cheap"), block=False)
456+
457+
363458
class TestHttpRequestRegexMatcher:
364459
"""
365460
Tests for the new regex-based logic:

0 commit comments

Comments
 (0)