Skip to content

Commit 30fe861

Browse files
authored
Merge branch 'main' into devin/1773523110-fix-serialization-fallback-complex-types
2 parents e3973ed + 007066b commit 30fe861

5 files changed

Lines changed: 177 additions & 2 deletions

File tree

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1832,6 +1832,15 @@ definitions:
18321832
description: The headers to match.
18331833
type: object
18341834
additionalProperties: true
1835+
weight:
1836+
title: Weight
1837+
description: >
1838+
The weight of a request matching this matcher when acquiring a call from the rate limiter.
1839+
Different endpoints can consume different amounts from a shared budget by specifying
1840+
different weights. If not set, each request counts as 1.
1841+
anyOf:
1842+
- type: integer
1843+
- type: string
18351844
additionalProperties: true
18361845
DefaultErrorHandler:
18371846
title: Default Error Handler

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -486,6 +486,11 @@ class Config:
486486
headers: Optional[Dict[str, Any]] = Field(
487487
None, description="The headers to match.", title="Headers"
488488
)
489+
weight: Optional[Union[int, str]] = Field(
490+
None,
491+
description="The weight of a request matching this matcher when acquiring a call from the rate limiter. Different endpoints can consume different amounts from a shared budget by specifying different weights. If not set, each request counts as 1.",
492+
title="Weight",
493+
)
489494

490495

491496
class DpathExtractor(BaseModel):

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4387,12 +4387,21 @@ def create_rate(self, model: RateModel, config: Config, **kwargs: Any) -> Rate:
43874387
def create_http_request_matcher(
    self, model: HttpRequestRegexMatcherModel, config: Config, **kwargs: Any
) -> HttpRequestRegexMatcher:
    """Build an ``HttpRequestRegexMatcher`` from its declarative model.

    :param model: parsed matcher definition; ``model.weight`` may be ``None``, an integer,
        or a string that is evaluated through ``InterpolatedString`` against ``config``.
    :param config: connector configuration used when evaluating a string weight.
    :param kwargs: accepted for signature compatibility; not used here.
    :return: a matcher carrying the method/URL/params/headers criteria and the resolved weight.
    :raises ValueError: if the resolved weight cannot be converted to an integer, or is below 1.
    """
    weight = model.weight
    if isinstance(weight, str):
        # A string weight is treated as a template and resolved against the config first.
        weight = InterpolatedString.create(weight, parameters={}).eval(config)
    if weight is not None:
        try:
            weight = int(weight)
        except (TypeError, ValueError) as exc:
            # Surface a message that names the field instead of a bare int() failure.
            raise ValueError(f"weight must be an integer, got {weight!r}") from exc
        if weight < 1:
            raise ValueError(f"weight must be >= 1, got {weight}")
    return HttpRequestRegexMatcher(
        method=model.method,
        url_base=model.url_base,
        url_path_pattern=model.url_path_pattern,
        params=model.params,
        headers=model.headers,
        weight=weight,
    )
43974406

43984407
def set_api_budget(self, component_definition: ComponentDefinition, config: Config) -> None:

airbyte_cdk/sources/streams/call_rate.py

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -166,14 +166,22 @@ def __init__(
166166
url_path_pattern: Optional[str] = None,
167167
params: Optional[Mapping[str, Any]] = None,
168168
headers: Optional[Mapping[str, Any]] = None,
169+
weight: Optional[int] = None,
169170
):
170171
"""
171172
:param method: HTTP method (e.g. "GET", "POST"); compared case-insensitively.
172173
:param url_base: Base URL (scheme://host) that must match.
173174
:param url_path_pattern: A regex pattern that will be applied to the path portion of the URL.
174175
:param params: Dictionary of query parameters that must be present in the request.
175176
:param headers: Dictionary of headers that must be present (header keys are compared case-insensitively).
177+
:param weight: The weight of a request matching this matcher. If set, this value is used
178+
when acquiring a call from the rate limiter, enabling cost-based rate limiting
179+
where different endpoints consume different amounts from a shared budget.
180+
If not set, each request counts as 1.
176181
"""
182+
if weight is not None and weight < 1:
183+
raise ValueError(f"weight must be >= 1, got {weight}")
184+
self._weight = weight
177185
self._method = method.upper() if method else None
178186

179187
# Normalize the url_base if provided: remove trailing slash.
@@ -242,11 +250,16 @@ def __call__(self, request: Any) -> bool:
242250

243251
return True
244252

253+
@property
def weight(self) -> Optional[int]:
    """Return the weight configured for requests matched by this matcher.

    :return: the stored weight, or ``None`` when no weight was set.
    """
    return self._weight
257+
245258
def __str__(self) -> str:
246259
regex = self._url_path_pattern.pattern if self._url_path_pattern else None
247260
return (
248261
f"HttpRequestRegexMatcher(method={self._method}, url_base={self._url_base}, "
249-
f"url_path_pattern={regex}, params={self._params}, headers={self._headers})"
262+
f"url_path_pattern={regex}, params={self._params}, headers={self._headers}, weight={self._weight})"
250263
)
251264

252265

@@ -265,6 +278,22 @@ def matches(self, request: Any) -> bool:
265278
return True
266279
return any(matcher(request) for matcher in self._matchers)
267280

281+
def get_weight(self, request: Any) -> int:
    """Resolve how much of the budget a request consumes.

    Only the first matcher that accepts the request is consulted; later matchers are
    never evaluated once one has matched.

    :param request: a request object
    :return: the weight of the first matching matcher when it is an
        ``HttpRequestRegexMatcher`` with a weight set; otherwise 1 (including when
        no matcher matches at all)
    """
    first_match = next(
        (candidate for candidate in self._matchers if candidate(request)),
        None,
    )
    if first_match is None:
        return 1
    if not isinstance(first_match, HttpRequestRegexMatcher):
        return 1
    configured = first_match.weight
    return 1 if configured is None else configured
296+
268297

269298
class UnlimitedCallRatePolicy(BaseCallRatePolicy):
270299
"""
@@ -420,6 +449,11 @@ def __init__(self, rates: list[Rate], matchers: list[RequestMatcher]):
420449
def try_acquire(self, request: Any, weight: int) -> None:
421450
if not self.matches(request):
422451
raise ValueError("Request does not match the policy")
452+
lowest_limit = min(rate.limit for rate in self._bucket.rates)
453+
if weight > lowest_limit:
454+
raise ValueError(
455+
f"Weight can not exceed the lowest configured rate limit ({lowest_limit})"
456+
)
423457

424458
try:
425459
self._limiter.try_acquire(request, weight=weight)
@@ -596,7 +630,8 @@ def _do_acquire(
596630
# sometimes we spend all budget before a second attempt, so we have a few more attempts
597631
for attempt in range(1, self._maximum_attempts_to_acquire):
598632
try:
599-
policy.try_acquire(request, weight=1)
633+
weight = policy.get_weight(request) if isinstance(policy, BaseCallRatePolicy) else 1
634+
policy.try_acquire(request, weight=weight)
600635
return
601636
except CallRateLimitHit as exc:
602637
last_exception = exc

unit_tests/sources/streams/test_call_rate.py

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,10 +140,13 @@ def test_http_request_matching(mocker):
140140
users_policy.matches.side_effect = HttpRequestMatcher(
141141
url="http://domain/api/users", method="GET"
142142
)
143+
users_policy.get_weight.return_value = 1
143144
groups_policy.matches.side_effect = HttpRequestMatcher(
144145
url="http://domain/api/groups", method="POST"
145146
)
147+
groups_policy.get_weight.return_value = 1
146148
root_policy.matches.side_effect = HttpRequestMatcher(method="GET")
149+
root_policy.get_weight.return_value = 1
147150
api_budget = APIBudget(
148151
policies=[
149152
users_policy,
@@ -360,6 +363,120 @@ def test_with_cache(self, mocker, requests_mock):
360363
assert MovingWindowCallRatePolicy.try_acquire.call_count == 1
361364

362365

366+
class TestWeightBasedRateLimiting:
    """Weight-aware rate limiting: endpoints may draw different amounts from one shared budget."""

    def test_matcher_weight_default_none(self):
        """A matcher built without a weight exposes ``None``."""
        assert HttpRequestRegexMatcher(url_path_pattern=r"/api/test").weight is None

    def test_matcher_weight_is_stored(self):
        """A weight given to the constructor is readable back unchanged."""
        configured = HttpRequestRegexMatcher(url_path_pattern=r"/api/test", weight=60)
        assert configured.weight == 60

    def test_matcher_rejects_zero_weight(self):
        """Constructing a matcher with weight=0 raises ValueError."""
        with pytest.raises(ValueError, match="weight must be >= 1"):
            HttpRequestRegexMatcher(url_path_pattern=r"/api/test", weight=0)

    def test_matcher_rejects_negative_weight(self):
        """Constructing a matcher with a negative weight raises ValueError."""
        with pytest.raises(ValueError, match="weight must be >= 1"):
            HttpRequestRegexMatcher(url_path_pattern=r"/api/test", weight=-5)

    def test_policy_get_weight_returns_matcher_weight(self):
        """get_weight reports the weight of the matcher that accepts the request."""
        expensive_matcher = HttpRequestRegexMatcher(url_path_pattern=r"/api/expensive", weight=120)
        call_policy = MovingWindowCallRatePolicy(
            rates=[Rate(1000, timedelta(hours=1))],
            matchers=[expensive_matcher],
        )
        assert call_policy.get_weight(Request("GET", "https://example.com/api/expensive")) == 120

    def test_policy_get_weight_defaults_to_1(self):
        """get_weight falls back to 1 when the matching matcher carries no weight."""
        unweighted_matcher = HttpRequestRegexMatcher(url_path_pattern=r"/api/default")
        call_policy = MovingWindowCallRatePolicy(
            rates=[Rate(1000, timedelta(hours=1))],
            matchers=[unweighted_matcher],
        )
        assert call_policy.get_weight(Request("GET", "https://example.com/api/default")) == 1

    def test_policy_get_weight_no_matching_matcher(self):
        """get_weight falls back to 1 when no matcher accepts the request."""
        other_matcher = HttpRequestRegexMatcher(url_path_pattern=r"/api/other", weight=50)
        call_policy = MovingWindowCallRatePolicy(
            rates=[Rate(1000, timedelta(hours=1))],
            matchers=[other_matcher],
        )
        assert call_policy.get_weight(Request("GET", "https://example.com/api/unmatched")) == 1

    def test_api_budget_uses_weight(self):
        """APIBudget charges the matcher's weight for every acquired call."""
        call_policy = MovingWindowCallRatePolicy(
            rates=[Rate(100, timedelta(hours=1))],
            matchers=[HttpRequestRegexMatcher(url_path_pattern=r"/api/heavy", weight=10)],
        )
        api_budget = APIBudget(policies=[call_policy])

        # Ten calls at weight 10 consume the whole budget of 100.
        for _ in range(10):
            api_budget.acquire_call(Request("GET", "https://example.com/api/heavy"), block=False)

        # An eleventh call would bring the total to 110, above the limit of 100.
        with pytest.raises(CallRateLimitHit):
            api_budget.acquire_call(Request("GET", "https://example.com/api/heavy"), block=False)

    def test_weight_1_backward_compatible(self):
        """Without a configured weight every call costs 1, as with the former hardcoded value."""
        call_policy = MovingWindowCallRatePolicy(
            rates=[Rate(5, timedelta(hours=1))],
            matchers=[HttpRequestRegexMatcher(url_path_pattern=r"/api/normal")],
        )
        api_budget = APIBudget(policies=[call_policy])

        for _ in range(5):
            api_budget.acquire_call(Request("GET", "https://example.com/api/normal"), block=False)

        with pytest.raises(CallRateLimitHit):
            api_budget.acquire_call(Request("GET", "https://example.com/api/normal"), block=False)

    def test_shared_budget_different_weights(self):
        """Matchers with different weights inside one policy draw from the same budget."""
        cheap_matcher = HttpRequestRegexMatcher(url_path_pattern=r"/api/cheap", weight=1)
        expensive_matcher = HttpRequestRegexMatcher(url_path_pattern=r"/api/expensive", weight=10)
        # A single policy covers both endpoints.
        shared_policy = MovingWindowCallRatePolicy(
            rates=[Rate(20, timedelta(hours=1))],
            matchers=[cheap_matcher, expensive_matcher],
        )
        api_budget = APIBudget(policies=[shared_policy])

        # One expensive call (10) plus ten cheap calls (1 each) add up to 20.
        api_budget.acquire_call(Request("GET", "https://example.com/api/expensive"), block=False)
        for _ in range(10):
            api_budget.acquire_call(Request("GET", "https://example.com/api/cheap"), block=False)

        # The budget now sits at 20/20, so even a cheap call must be refused.
        with pytest.raises(CallRateLimitHit):
            api_budget.acquire_call(Request("GET", "https://example.com/api/cheap"), block=False)

    def test_moving_window_rejects_weight_exceeding_limit(self):
        """try_acquire raises ValueError when the weight is above the lowest configured limit."""
        call_policy = MovingWindowCallRatePolicy(
            rates=[Rate(10, timedelta(hours=1)), Rate(100, timedelta(days=1))],
            matchers=[HttpRequestRegexMatcher(url_path_pattern=r"/api/heavy", weight=50)],
        )
        heavy_request = Request("GET", "https://example.com/api/heavy")
        with pytest.raises(
            ValueError, match="Weight can not exceed the lowest configured rate limit"
        ):
            call_policy.try_acquire(heavy_request, weight=50)
479+
363480
class TestHttpRequestRegexMatcher:
364481
"""
365482
Tests for the new regex-based logic:

0 commit comments

Comments
 (0)