Skip to content

Commit b856e7c

Browse files
authored
Merge branch 'main' into devin/1774478445-memory-failfast
2 parents c819363 + 69cd63d commit b856e7c

13 files changed

Lines changed: 1746 additions & 17 deletions

airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py

Lines changed: 218 additions & 14 deletions
Large diffs are not rendered by default.

airbyte_cdk/sources/declarative/concurrent_declarative_source.py

Lines changed: 70 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,12 +76,19 @@
7676
from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import (
7777
ModelToComponentFactory,
7878
)
79+
from airbyte_cdk.sources.declarative.partition_routers.grouping_partition_router import (
80+
GroupingPartitionRouter,
81+
)
82+
from airbyte_cdk.sources.declarative.partition_routers.substream_partition_router import (
83+
SubstreamPartitionRouter,
84+
)
7985
from airbyte_cdk.sources.declarative.resolvers import COMPONENTS_RESOLVER_TYPE_MAPPING
8086
from airbyte_cdk.sources.declarative.spec.spec import Spec
8187
from airbyte_cdk.sources.declarative.types import Config, ConnectionDefinition
8288
from airbyte_cdk.sources.message.concurrent_repository import ConcurrentMessageRepository
8389
from airbyte_cdk.sources.message.repository import InMemoryMessageRepository
8490
from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
91+
from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream
8592
from airbyte_cdk.sources.streams.concurrent.partitions.types import QueueItem
8693
from airbyte_cdk.sources.utils.slice_logger import (
8794
AlwaysLogSliceLogger,
@@ -405,6 +412,8 @@ def streams(self, config: Mapping[str, Any]) -> List[AbstractStream]: # type: i
405412
if api_budget_model:
406413
self._constructor.set_api_budget(api_budget_model, self._config)
407414

415+
prepared_configs = self._initialize_cache_for_parent_streams(deepcopy(stream_configs))
416+
408417
source_streams = [
409418
self._constructor.create_component(
410419
(
@@ -416,10 +425,70 @@ def streams(self, config: Mapping[str, Any]) -> List[AbstractStream]: # type: i
416425
self._config,
417426
emit_connector_builder_messages=self._emit_connector_builder_messages,
418427
)
419-
for stream_config in self._initialize_cache_for_parent_streams(deepcopy(stream_configs))
428+
for stream_config in prepared_configs
420429
]
430+
431+
self._apply_stream_groups(source_streams)
432+
421433
return source_streams
422434

435+
def _apply_stream_groups(self, streams: List[AbstractStream]) -> None:
436+
"""Set block_simultaneous_read on streams based on the manifest's stream_groups config.
437+
438+
Iterates over the resolved manifest's stream_groups and matches group membership
439+
against actual created stream instances by name. Validates that no stream shares a
440+
group with any of its parent streams, which would cause a deadlock.
441+
"""
442+
stream_groups = self._source_config.get("stream_groups", {})
443+
if not stream_groups:
444+
return
445+
446+
# Build stream_name -> group_name mapping from the resolved manifest
447+
stream_name_to_group: Dict[str, str] = {}
448+
for group_name, group_config in stream_groups.items():
449+
for stream_ref in group_config.get("streams", []):
450+
if isinstance(stream_ref, dict):
451+
stream_name = stream_ref.get("name", "")
452+
if stream_name:
453+
stream_name_to_group[stream_name] = group_name
454+
455+
# Validate no stream shares a group with any of its ancestor streams
456+
stream_name_to_instance: Dict[str, AbstractStream] = {s.name: s for s in streams}
457+
458+
def _collect_all_ancestor_names(stream_name: str) -> Set[str]:
459+
"""Recursively collect all ancestor stream names."""
460+
ancestors: Set[str] = set()
461+
inst = stream_name_to_instance.get(stream_name)
462+
if not isinstance(inst, DefaultStream):
463+
return ancestors
464+
router = inst.get_partition_router()
465+
if isinstance(router, GroupingPartitionRouter):
466+
router = router.underlying_partition_router
467+
if not isinstance(router, SubstreamPartitionRouter):
468+
return ancestors
469+
for parent_config in router.parent_stream_configs:
470+
parent_name = parent_config.stream.name
471+
ancestors.add(parent_name)
472+
ancestors.update(_collect_all_ancestor_names(parent_name))
473+
return ancestors
474+
475+
for stream in streams:
476+
if not isinstance(stream, DefaultStream) or stream.name not in stream_name_to_group:
477+
continue
478+
group_name = stream_name_to_group[stream.name]
479+
for ancestor_name in _collect_all_ancestor_names(stream.name):
480+
if stream_name_to_group.get(ancestor_name) == group_name:
481+
raise ValueError(
482+
f"Stream '{stream.name}' and its parent stream '{ancestor_name}' "
483+
f"are both in group '{group_name}'. "
484+
f"A child stream must not share a group with its parent to avoid deadlock."
485+
)
486+
487+
# Apply group to matching stream instances
488+
for stream in streams:
489+
if isinstance(stream, DefaultStream) and stream.name in stream_name_to_group:
490+
stream.block_simultaneous_read = stream_name_to_group[stream.name]
491+
423492
@staticmethod
424493
def _initialize_cache_for_parent_streams(
425494
stream_configs: List[Dict[str, Any]],

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,15 @@ properties:
4545
"$ref": "#/definitions/ConcurrencyLevel"
4646
api_budget:
4747
"$ref": "#/definitions/HTTPAPIBudget"
48+
stream_groups:
49+
title: Stream Groups
50+
description: >
51+
Groups of streams that share a common resource and should not be read simultaneously.
52+
Each group defines a set of stream references and an action that controls how concurrent
53+
reads are managed. Only applies to ConcurrentDeclarativeSource.
54+
type: object
55+
additionalProperties:
56+
"$ref": "#/definitions/StreamGroup"
4857
max_concurrent_async_job_count:
4958
title: Maximum Concurrent Asynchronous Jobs
5059
description: Maximum number of concurrent asynchronous jobs to run. This property is only relevant for sources/streams that support asynchronous job execution through the AsyncRetriever (e.g. a report-based stream that initiates a job, polls the job status, and then fetches the job results). This is often set by the API's maximum number of concurrent jobs on the account level. Refer to the API's documentation for this information.
@@ -1832,6 +1841,15 @@ definitions:
18321841
description: The headers to match.
18331842
type: object
18341843
additionalProperties: true
1844+
weight:
1845+
title: Weight
1846+
description: >
1847+
The weight of a request matching this matcher when acquiring a call from the rate limiter.
1848+
Different endpoints can consume different amounts from a shared budget by specifying
1849+
different weights. If not set, each request counts as 1.
1850+
anyOf:
1851+
- type: integer
1852+
- type: string
18351853
additionalProperties: true
18361854
DefaultErrorHandler:
18371855
title: Default Error Handler
@@ -4191,6 +4209,43 @@ definitions:
41914209
- "$ref": "#/definitions/ConfigRemoveFields"
41924210
- "$ref": "#/definitions/CustomConfigTransformation"
41934211
default: []
4212+
StreamGroup:
4213+
title: Stream Group
4214+
description: >
4215+
A group of streams that share a common resource and should not be read simultaneously.
4216+
Streams in the same group will be blocked from concurrent reads based on the specified action.
4217+
type: object
4218+
required:
4219+
- streams
4220+
- action
4221+
properties:
4222+
streams:
4223+
title: Streams
4224+
description: >
4225+
List of references to streams that belong to this group.
4226+
type: array
4227+
items:
4228+
anyOf:
4229+
- "$ref": "#/definitions/DeclarativeStream"
4230+
action:
4231+
title: Action
4232+
description: The action to apply to streams in this group.
4233+
"$ref": "#/definitions/BlockSimultaneousSyncsAction"
4234+
BlockSimultaneousSyncsAction:
4235+
title: Block Simultaneous Syncs Action
4236+
description: >
4237+
Action that prevents streams in the same group from being read concurrently.
4238+
When applied to a stream group, streams with this action will be deferred if
4239+
another stream in the same group is currently active.
4240+
This is useful for APIs that don't allow concurrent access to the same
4241+
endpoint or session. Only applies to ConcurrentDeclarativeSource.
4242+
type: object
4243+
required:
4244+
- type
4245+
properties:
4246+
type:
4247+
type: string
4248+
enum: [BlockSimultaneousSyncsAction]
41944249
SubstreamPartitionRouter:
41954250
title: Substream Partition Router
41964251
description: Partition router that is used to retrieve records that have been partitioned according to records from the specified parent streams. An example of a parent stream is automobile brands and the substream would be the various car models associated with each branch.

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -486,6 +486,11 @@ class Config:
486486
headers: Optional[Dict[str, Any]] = Field(
487487
None, description="The headers to match.", title="Headers"
488488
)
489+
weight: Optional[Union[int, str]] = Field(
490+
None,
491+
description="The weight of a request matching this matcher when acquiring a call from the rate limiter. Different endpoints can consume different amounts from a shared budget by specifying different weights. If not set, each request counts as 1.",
492+
title="Weight",
493+
)
489494

490495

491496
class DpathExtractor(BaseModel):
@@ -2394,6 +2399,11 @@ class Config:
23942399
spec: Optional[Spec] = None
23952400
concurrency_level: Optional[ConcurrencyLevel] = None
23962401
api_budget: Optional[HTTPAPIBudget] = None
2402+
stream_groups: Optional[Dict[str, StreamGroup]] = Field(
2403+
None,
2404+
description="Groups of streams that share a common resource and should not be read simultaneously. Each group defines a set of stream references and an action that controls how concurrent reads are managed. Only applies to ConcurrentDeclarativeSource.",
2405+
title="Stream Groups",
2406+
)
23972407
max_concurrent_async_job_count: Optional[Union[int, str]] = Field(
23982408
None,
23992409
description="Maximum number of concurrent asynchronous jobs to run. This property is only relevant for sources/streams that support asynchronous job execution through the AsyncRetriever (e.g. a report-based stream that initiates a job, polls the job status, and then fetches the job results). This is often set by the API's maximum number of concurrent jobs on the account level. Refer to the API's documentation for this information.",
@@ -2429,6 +2439,11 @@ class Config:
24292439
spec: Optional[Spec] = None
24302440
concurrency_level: Optional[ConcurrencyLevel] = None
24312441
api_budget: Optional[HTTPAPIBudget] = None
2442+
stream_groups: Optional[Dict[str, StreamGroup]] = Field(
2443+
None,
2444+
description="Groups of streams that share a common resource and should not be read simultaneously. Each group defines a set of stream references and an action that controls how concurrent reads are managed. Only applies to ConcurrentDeclarativeSource.",
2445+
title="Stream Groups",
2446+
)
24322447
max_concurrent_async_job_count: Optional[Union[int, str]] = Field(
24332448
None,
24342449
description="Maximum number of concurrent asynchronous jobs to run. This property is only relevant for sources/streams that support asynchronous job execution through the AsyncRetriever (e.g. a report-based stream that initiates a job, polls the job status, and then fetches the job results). This is often set by the API's maximum number of concurrent jobs on the account level. Refer to the API's documentation for this information.",
@@ -3096,6 +3111,23 @@ class AsyncRetriever(BaseModel):
30963111
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
30973112

30983113

3114+
class BlockSimultaneousSyncsAction(BaseModel):
3115+
type: Literal["BlockSimultaneousSyncsAction"]
3116+
3117+
3118+
class StreamGroup(BaseModel):
3119+
streams: List[str] = Field(
3120+
...,
3121+
description='List of references to streams that belong to this group. Use JSON references to stream definitions (e.g., "#/definitions/my_stream").',
3122+
title="Streams",
3123+
)
3124+
action: BlockSimultaneousSyncsAction = Field(
3125+
...,
3126+
description="The action to apply to streams in this group.",
3127+
title="Action",
3128+
)
3129+
3130+
30993131
class SubstreamPartitionRouter(BaseModel):
31003132
type: Literal["SubstreamPartitionRouter"]
31013133
parent_stream_configs: List[ParentStreamConfig] = Field(

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4387,12 +4387,21 @@ def create_rate(self, model: RateModel, config: Config, **kwargs: Any) -> Rate:
43874387
def create_http_request_matcher(
43884388
self, model: HttpRequestRegexMatcherModel, config: Config, **kwargs: Any
43894389
) -> HttpRequestRegexMatcher:
4390+
weight = model.weight
4391+
if weight is not None:
4392+
if isinstance(weight, str):
4393+
weight = int(InterpolatedString.create(weight, parameters={}).eval(config))
4394+
else:
4395+
weight = int(weight)
4396+
if weight < 1:
4397+
raise ValueError(f"weight must be >= 1, got {weight}")
43904398
return HttpRequestRegexMatcher(
43914399
method=model.method,
43924400
url_base=model.url_base,
43934401
url_path_pattern=model.url_path_pattern,
43944402
params=model.params,
43954403
headers=model.headers,
4404+
weight=weight,
43964405
)
43974406

43984407
def set_api_budget(self, component_definition: ComponentDefinition, config: Config) -> None:

airbyte_cdk/sources/streams/call_rate.py

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -166,14 +166,22 @@ def __init__(
166166
url_path_pattern: Optional[str] = None,
167167
params: Optional[Mapping[str, Any]] = None,
168168
headers: Optional[Mapping[str, Any]] = None,
169+
weight: Optional[int] = None,
169170
):
170171
"""
171172
:param method: HTTP method (e.g. "GET", "POST"); compared case-insensitively.
172173
:param url_base: Base URL (scheme://host) that must match.
173174
:param url_path_pattern: A regex pattern that will be applied to the path portion of the URL.
174175
:param params: Dictionary of query parameters that must be present in the request.
175176
:param headers: Dictionary of headers that must be present (header keys are compared case-insensitively).
177+
:param weight: The weight of a request matching this matcher. If set, this value is used
178+
when acquiring a call from the rate limiter, enabling cost-based rate limiting
179+
where different endpoints consume different amounts from a shared budget.
180+
If not set, each request counts as 1.
176181
"""
182+
if weight is not None and weight < 1:
183+
raise ValueError(f"weight must be >= 1, got {weight}")
184+
self._weight = weight
177185
self._method = method.upper() if method else None
178186

179187
# Normalize the url_base if provided: remove trailing slash.
@@ -242,11 +250,16 @@ def __call__(self, request: Any) -> bool:
242250

243251
return True
244252

253+
@property
254+
def weight(self) -> Optional[int]:
255+
"""The weight of a request matching this matcher, or None if not set."""
256+
return self._weight
257+
245258
def __str__(self) -> str:
246259
regex = self._url_path_pattern.pattern if self._url_path_pattern else None
247260
return (
248261
f"HttpRequestRegexMatcher(method={self._method}, url_base={self._url_base}, "
249-
f"url_path_pattern={regex}, params={self._params}, headers={self._headers})"
262+
f"url_path_pattern={regex}, params={self._params}, headers={self._headers}, weight={self._weight})"
250263
)
251264

252265

@@ -265,6 +278,22 @@ def matches(self, request: Any) -> bool:
265278
return True
266279
return any(matcher(request) for matcher in self._matchers)
267280

281+
def get_weight(self, request: Any) -> int:
282+
"""Get the weight for a request based on the first matching matcher.
283+
284+
If a matcher has a weight configured, that weight is used.
285+
Otherwise, defaults to 1.
286+
287+
:param request: a request object
288+
:return: the weight for this request
289+
"""
290+
for matcher in self._matchers:
291+
if matcher(request):
292+
if isinstance(matcher, HttpRequestRegexMatcher) and matcher.weight is not None:
293+
return matcher.weight
294+
return 1
295+
return 1
296+
268297

269298
class UnlimitedCallRatePolicy(BaseCallRatePolicy):
270299
"""
@@ -420,6 +449,11 @@ def __init__(self, rates: list[Rate], matchers: list[RequestMatcher]):
420449
def try_acquire(self, request: Any, weight: int) -> None:
421450
if not self.matches(request):
422451
raise ValueError("Request does not match the policy")
452+
lowest_limit = min(rate.limit for rate in self._bucket.rates)
453+
if weight > lowest_limit:
454+
raise ValueError(
455+
f"Weight can not exceed the lowest configured rate limit ({lowest_limit})"
456+
)
423457

424458
try:
425459
self._limiter.try_acquire(request, weight=weight)
@@ -596,7 +630,8 @@ def _do_acquire(
596630
# sometimes we spend all budget before a second attempt, so we have a few more attempts
597631
for attempt in range(1, self._maximum_attempts_to_acquire):
598632
try:
599-
policy.try_acquire(request, weight=1)
633+
weight = policy.get_weight(request) if isinstance(policy, BaseCallRatePolicy) else 1
634+
policy.try_acquire(request, weight=weight)
600635
return
601636
except CallRateLimitHit as exc:
602637
last_exception = exc

airbyte_cdk/sources/streams/concurrent/abstract_stream.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,21 @@ def cursor(self) -> Cursor:
8585
:return: The cursor associated with this stream.
8686
"""
8787

88+
@property
89+
def block_simultaneous_read(self) -> str:
90+
"""
91+
Override to return a non-empty group name if this stream should block simultaneous reads.
92+
When a non-empty string is returned, prevents starting partition generation for this stream if:
93+
- Another stream with the same group name is already active
94+
- Any of its parent streams are in an active group
95+
96+
This allows grouping multiple streams that share the same resource (e.g., API endpoint or session)
97+
to prevent them from running concurrently, even if they don't have a parent-child relationship.
98+
99+
:return: Group name for blocking (non-empty string), or "" to allow concurrent reading
100+
"""
101+
return "" # Default: allow concurrent reading
102+
88103
@abstractmethod
89104
def check_availability(self) -> StreamAvailability:
90105
"""

airbyte_cdk/sources/streams/concurrent/adapters.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,11 @@ def cursor_field(self) -> Union[str, List[str]]:
196196
def cursor(self) -> Optional[Cursor]: # type: ignore[override] # StreamFaced expects to use only airbyte_cdk.sources.streams.concurrent.cursor.Cursor
197197
return self._cursor
198198

199+
@property
200+
def block_simultaneous_read(self) -> str:
201+
"""Returns the blocking group name from the underlying stream"""
202+
return self._abstract_stream.block_simultaneous_read
203+
199204
# FIXME the lru_cache seems to be mostly there because of typing issue
200205
@lru_cache(maxsize=None)
201206
def get_json_schema(self) -> Mapping[str, Any]:

0 commit comments

Comments
 (0)