Skip to content

Commit 69cd63d

Browse files
tolik0, octavia-squidington-iii, devin-ai-integration[bot]
authored
feat: Add block_simultaneous_read with top-level stream_groups interface (#870)
Co-authored-by: octavia-squidington-iii <contact@airbyte.com> Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Co-authored-by: alfredo.garcia@airbyte.io
1 parent 007066b commit 69cd63d

File tree

10 files changed

+1569
-15
lines changed

10 files changed

+1569
-15
lines changed

airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py

Lines changed: 218 additions & 14 deletions
Large diffs are not rendered by default.

airbyte_cdk/sources/declarative/concurrent_declarative_source.py

Lines changed: 70 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,12 +76,19 @@
7676
from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import (
7777
ModelToComponentFactory,
7878
)
79+
from airbyte_cdk.sources.declarative.partition_routers.grouping_partition_router import (
80+
GroupingPartitionRouter,
81+
)
82+
from airbyte_cdk.sources.declarative.partition_routers.substream_partition_router import (
83+
SubstreamPartitionRouter,
84+
)
7985
from airbyte_cdk.sources.declarative.resolvers import COMPONENTS_RESOLVER_TYPE_MAPPING
8086
from airbyte_cdk.sources.declarative.spec.spec import Spec
8187
from airbyte_cdk.sources.declarative.types import Config, ConnectionDefinition
8288
from airbyte_cdk.sources.message.concurrent_repository import ConcurrentMessageRepository
8389
from airbyte_cdk.sources.message.repository import InMemoryMessageRepository
8490
from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
91+
from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream
8592
from airbyte_cdk.sources.streams.concurrent.partitions.types import QueueItem
8693
from airbyte_cdk.sources.utils.slice_logger import (
8794
AlwaysLogSliceLogger,
@@ -405,6 +412,8 @@ def streams(self, config: Mapping[str, Any]) -> List[AbstractStream]: # type: i
405412
if api_budget_model:
406413
self._constructor.set_api_budget(api_budget_model, self._config)
407414

415+
prepared_configs = self._initialize_cache_for_parent_streams(deepcopy(stream_configs))
416+
408417
source_streams = [
409418
self._constructor.create_component(
410419
(
@@ -416,10 +425,70 @@ def streams(self, config: Mapping[str, Any]) -> List[AbstractStream]: # type: i
416425
self._config,
417426
emit_connector_builder_messages=self._emit_connector_builder_messages,
418427
)
419-
for stream_config in self._initialize_cache_for_parent_streams(deepcopy(stream_configs))
428+
for stream_config in prepared_configs
420429
]
430+
431+
self._apply_stream_groups(source_streams)
432+
421433
return source_streams
422434

435+
def _apply_stream_groups(self, streams: List[AbstractStream]) -> None:
    """Set block_simultaneous_read on streams based on the manifest's stream_groups config.

    Reads ``stream_groups`` from the resolved manifest (``self._source_config``), maps
    every referenced stream name to its group name, and assigns that group name to each
    matching ``DefaultStream`` in ``streams``. Before any stream is mutated, validates
    that no stream shares a group with any of its ancestor (parent, grandparent, ...)
    streams, which would cause a deadlock.

    :param streams: stream instances created from the manifest; matching
        ``DefaultStream`` instances are mutated in place.
    :raises ValueError: if a stream and one of its ancestors are in the same group.
    """
    stream_groups = self._source_config.get("stream_groups", {})
    if not stream_groups:
        return

    # Build stream_name -> group_name mapping from the resolved manifest. Only dict
    # references carrying a non-empty "name" are considered; if a stream is listed in
    # several groups, the group iterated last wins.
    stream_name_to_group: Dict[str, str] = {}
    for group_name, group_config in stream_groups.items():
        for stream_ref in group_config.get("streams", []):
            if isinstance(stream_ref, dict):
                stream_name = stream_ref.get("name", "")
                if stream_name:
                    stream_name_to_group[stream_name] = group_name

    stream_name_to_instance: Dict[str, AbstractStream] = {s.name: s for s in streams}

    def _collect_all_ancestor_names(stream_name: str) -> Set[str]:
        """Collect the names of every ancestor of the named stream.

        Walks the parent instances held by each substream router, so ancestors are
        found even when an intermediate parent is not part of ``streams``. Each
        ancestor name is expanded once, which keeps diamond-shaped hierarchies cheap
        and guarantees termination if parents ever reference each other.
        """
        ancestors: Set[str] = set()
        pending: List[Any] = [stream_name_to_instance.get(stream_name)]
        while pending:
            inst = pending.pop()
            if not isinstance(inst, DefaultStream):
                continue
            router = inst.get_partition_router()
            if isinstance(router, GroupingPartitionRouter):
                router = router.underlying_partition_router
            if not isinstance(router, SubstreamPartitionRouter):
                continue
            for parent_config in router.parent_stream_configs:
                parent = parent_config.stream
                parent_name = parent.name
                if parent_name in ancestors:
                    continue  # already recorded and queued
                ancestors.add(parent_name)
                # Prefer the parent instance owned by the router; fall back to the
                # top-level instance with the same name when it is not a DefaultStream.
                if isinstance(parent, DefaultStream):
                    pending.append(parent)
                else:
                    pending.append(stream_name_to_instance.get(parent_name))
        return ancestors

    # Validate no stream shares a group with any of its ancestor streams. This runs as
    # a separate pass so that nothing is mutated when the configuration is invalid.
    for stream in streams:
        if not isinstance(stream, DefaultStream) or stream.name not in stream_name_to_group:
            continue
        group_name = stream_name_to_group[stream.name]
        for ancestor_name in _collect_all_ancestor_names(stream.name):
            if stream_name_to_group.get(ancestor_name) == group_name:
                raise ValueError(
                    f"Stream '{stream.name}' and its parent stream '{ancestor_name}' "
                    f"are both in group '{group_name}'. "
                    f"A child stream must not share a group with its parent to avoid deadlock."
                )

    # Apply group to matching stream instances
    for stream in streams:
        if isinstance(stream, DefaultStream) and stream.name in stream_name_to_group:
            stream.block_simultaneous_read = stream_name_to_group[stream.name]
491+
423492
@staticmethod
424493
def _initialize_cache_for_parent_streams(
425494
stream_configs: List[Dict[str, Any]],

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,15 @@ properties:
4545
"$ref": "#/definitions/ConcurrencyLevel"
4646
api_budget:
4747
"$ref": "#/definitions/HTTPAPIBudget"
48+
stream_groups:
49+
title: Stream Groups
50+
description: >
51+
Groups of streams that share a common resource and should not be read simultaneously.
52+
Each group defines a set of stream references and an action that controls how concurrent
53+
reads are managed. Only applies to ConcurrentDeclarativeSource.
54+
type: object
55+
additionalProperties:
56+
"$ref": "#/definitions/StreamGroup"
4857
max_concurrent_async_job_count:
4958
title: Maximum Concurrent Asynchronous Jobs
5059
description: Maximum number of concurrent asynchronous jobs to run. This property is only relevant for sources/streams that support asynchronous job execution through the AsyncRetriever (e.g. a report-based stream that initiates a job, polls the job status, and then fetches the job results). This is often set by the API's maximum number of concurrent jobs on the account level. Refer to the API's documentation for this information.
@@ -4200,6 +4209,43 @@ definitions:
42004209
- "$ref": "#/definitions/ConfigRemoveFields"
42014210
- "$ref": "#/definitions/CustomConfigTransformation"
42024211
default: []
4212+
StreamGroup:
4213+
title: Stream Group
4214+
description: >
4215+
A group of streams that share a common resource and should not be read simultaneously.
4216+
Streams in the same group will be blocked from concurrent reads based on the specified action.
4217+
type: object
4218+
required:
4219+
- streams
4220+
- action
4221+
properties:
4222+
streams:
4223+
title: Streams
4224+
description: >
4225+
List of references to streams that belong to this group.
4226+
type: array
4227+
items:
4228+
anyOf:
4229+
- "$ref": "#/definitions/DeclarativeStream"
4230+
action:
4231+
title: Action
4232+
description: The action to apply to streams in this group.
4233+
"$ref": "#/definitions/BlockSimultaneousSyncsAction"
4234+
BlockSimultaneousSyncsAction:
4235+
title: Block Simultaneous Syncs Action
4236+
description: >
4237+
Action that prevents streams in the same group from being read concurrently.
4238+
When applied to a stream group, streams with this action will be deferred if
4239+
another stream in the same group is currently active.
4240+
This is useful for APIs that don't allow concurrent access to the same
4241+
endpoint or session. Only applies to ConcurrentDeclarativeSource.
4242+
type: object
4243+
required:
4244+
- type
4245+
properties:
4246+
type:
4247+
type: string
4248+
enum: [BlockSimultaneousSyncsAction]
42034249
SubstreamPartitionRouter:
42044250
title: Substream Partition Router
42054251
description: Partition router that is used to retrieve records that have been partitioned according to records from the specified parent streams. An example of a parent stream is automobile brands and the substream would be the various car models associated with each branch.

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2399,6 +2399,11 @@ class Config:
23992399
spec: Optional[Spec] = None
24002400
concurrency_level: Optional[ConcurrencyLevel] = None
24012401
api_budget: Optional[HTTPAPIBudget] = None
2402+
stream_groups: Optional[Dict[str, StreamGroup]] = Field(
2403+
None,
2404+
description="Groups of streams that share a common resource and should not be read simultaneously. Each group defines a set of stream references and an action that controls how concurrent reads are managed. Only applies to ConcurrentDeclarativeSource.",
2405+
title="Stream Groups",
2406+
)
24022407
max_concurrent_async_job_count: Optional[Union[int, str]] = Field(
24032408
None,
24042409
description="Maximum number of concurrent asynchronous jobs to run. This property is only relevant for sources/streams that support asynchronous job execution through the AsyncRetriever (e.g. a report-based stream that initiates a job, polls the job status, and then fetches the job results). This is often set by the API's maximum number of concurrent jobs on the account level. Refer to the API's documentation for this information.",
@@ -2434,6 +2439,11 @@ class Config:
24342439
spec: Optional[Spec] = None
24352440
concurrency_level: Optional[ConcurrencyLevel] = None
24362441
api_budget: Optional[HTTPAPIBudget] = None
2442+
stream_groups: Optional[Dict[str, StreamGroup]] = Field(
2443+
None,
2444+
description="Groups of streams that share a common resource and should not be read simultaneously. Each group defines a set of stream references and an action that controls how concurrent reads are managed. Only applies to ConcurrentDeclarativeSource.",
2445+
title="Stream Groups",
2446+
)
24372447
max_concurrent_async_job_count: Optional[Union[int, str]] = Field(
24382448
None,
24392449
description="Maximum number of concurrent asynchronous jobs to run. This property is only relevant for sources/streams that support asynchronous job execution through the AsyncRetriever (e.g. a report-based stream that initiates a job, polls the job status, and then fetches the job results). This is often set by the API's maximum number of concurrent jobs on the account level. Refer to the API's documentation for this information.",
@@ -3101,6 +3111,23 @@ class AsyncRetriever(BaseModel):
31013111
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
31023112

31033113

3114+
class BlockSimultaneousSyncsAction(BaseModel):
    # Stream-group action model; its only field is the component type tag, which
    # accepts exactly the literal value "BlockSimultaneousSyncsAction".
    type: Literal["BlockSimultaneousSyncsAction"]
3116+
3117+
3118+
class StreamGroup(BaseModel):
    # Required: the stream references that make up this group.
    streams: List[str] = Field(
        ...,
        title="Streams",
        description='List of references to streams that belong to this group. Use JSON references to stream definitions (e.g., "#/definitions/my_stream").',
    )
    # Required: the action applied to every stream in the group.
    action: BlockSimultaneousSyncsAction = Field(
        ...,
        title="Action",
        description="The action to apply to streams in this group.",
    )
3129+
3130+
31043131
class SubstreamPartitionRouter(BaseModel):
31053132
type: Literal["SubstreamPartitionRouter"]
31063133
parent_stream_configs: List[ParentStreamConfig] = Field(

airbyte_cdk/sources/streams/concurrent/abstract_stream.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,21 @@ def cursor(self) -> Cursor:
8585
:return: The cursor associated with this stream.
8686
"""
8787

88+
@property
def block_simultaneous_read(self) -> str:
    """
    Name of the blocking group this stream belongs to.

    Subclasses override this to return a non-empty group name when the stream must not
    be read at the same time as other streams. With a non-empty name, partition
    generation for this stream is not started if:
    - another stream with the same group name is already active, or
    - any of its parent streams are in an active group.

    Grouping lets several streams that share a resource (e.g., an API endpoint or a
    session) avoid running concurrently even without a parent-child relationship.

    :return: Group name for blocking (non-empty string), or "" to allow concurrent reading
    """
    # Default: allow concurrent reading
    no_group = ""
    return no_group
102+
88103
@abstractmethod
89104
def check_availability(self) -> StreamAvailability:
90105
"""

airbyte_cdk/sources/streams/concurrent/adapters.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,11 @@ def cursor_field(self) -> Union[str, List[str]]:
196196
def cursor(self) -> Optional[Cursor]: # type: ignore[override] # StreamFaced expects to use only airbyte_cdk.sources.streams.concurrent.cursor.Cursor
197197
return self._cursor
198198

199+
@property
def block_simultaneous_read(self) -> str:
    """Blocking group name, delegated to the wrapped abstract stream."""
    wrapped_stream = self._abstract_stream
    return wrapped_stream.block_simultaneous_read
203+
199204
# FIXME the lru_cache seems to be mostly there because of typing issue
200205
@lru_cache(maxsize=None)
201206
def get_json_schema(self) -> Mapping[str, Any]:

airbyte_cdk/sources/streams/concurrent/default_stream.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,13 @@
66
from typing import Any, Callable, Iterable, List, Mapping, Optional, Union
77

88
from airbyte_cdk.models import AirbyteStream, SyncMode
9+
from airbyte_cdk.sources.declarative.incremental.concurrent_partition_cursor import (
10+
ConcurrentPerPartitionCursor,
11+
)
12+
from airbyte_cdk.sources.declarative.partition_routers.partition_router import PartitionRouter
13+
from airbyte_cdk.sources.declarative.stream_slicers.declarative_partition_generator import (
14+
StreamSlicerPartitionGenerator,
15+
)
916
from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
1017
from airbyte_cdk.sources.streams.concurrent.availability_strategy import StreamAvailability
1118
from airbyte_cdk.sources.streams.concurrent.cursor import Cursor, CursorField
@@ -26,6 +33,7 @@ def __init__(
2633
cursor: Cursor,
2734
namespace: Optional[str] = None,
2835
supports_file_transfer: bool = False,
36+
block_simultaneous_read: str = "",
2937
) -> None:
3038
self._stream_partition_generator = partition_generator
3139
self._name = name
@@ -36,6 +44,7 @@ def __init__(
3644
self._cursor = cursor
3745
self._namespace = namespace
3846
self._supports_file_transfer = supports_file_transfer
47+
self._block_simultaneous_read = block_simultaneous_read
3948

4049
def generate_partitions(self) -> Iterable[Partition]:
4150
yield from self._stream_partition_generator.generate()
@@ -94,6 +103,24 @@ def log_stream_sync_configuration(self) -> None:
94103
def cursor(self) -> Cursor:
95104
return self._cursor
96105

106+
@property
def block_simultaneous_read(self) -> str:
    """Blocking group name stored on this stream; an empty string means no blocking."""
    group_name = self._block_simultaneous_read
    return group_name

@block_simultaneous_read.setter
def block_simultaneous_read(self, value: str) -> None:
    """Replace the stored blocking group name with ``value``."""
    self._block_simultaneous_read = value
114+
115+
def get_partition_router(self) -> PartitionRouter | None:
    """Return the partition router for this stream, or None if not available.

    The router is looked up through the stream's partition generator:
    - if the generator is not a ``StreamSlicerPartitionGenerator`` there is no slicer
      to inspect and ``None`` is returned;
    - if the slicer is a ``ConcurrentPerPartitionCursor`` its wrapped router is returned;
    - if the slicer is itself a ``PartitionRouter`` it is returned directly, so streams
      whose router is not wrapped by a per-partition cursor are still reported.
      NOTE(review): presumably this is the case for substreams without an incremental
      cursor; confirm against the component factory.

    :return: the partition router, or ``None`` when none can be determined.
    """
    partition_generator = self._stream_partition_generator
    if not isinstance(partition_generator, StreamSlicerPartitionGenerator):
        return None
    stream_slicer = partition_generator._stream_slicer
    if isinstance(stream_slicer, ConcurrentPerPartitionCursor):
        return stream_slicer._partition_router
    if isinstance(stream_slicer, PartitionRouter):
        return stream_slicer
    return None
123+
97124
def check_availability(self) -> StreamAvailability:
98125
"""
99126
Check stream availability by attempting to read the first record of the stream.

unit_tests/sources/declarative/parsers/test_model_to_component_factory.py

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5281,6 +5281,62 @@ def test_catalog_defined_cursor_field_stream_missing():
52815281
assert stream._cursor_field.supports_catalog_defined_cursor_field == True
52825282

52835283

5284+
def test_block_simultaneous_read_from_stream_groups():
5285+
"""Test that factory-created streams default to empty block_simultaneous_read.
5286+
5287+
The factory no longer handles stream_groups — that's done by
5288+
ConcurrentDeclarativeSource._apply_stream_groups after stream creation.
5289+
This test verifies the factory creates streams without group info.
5290+
"""
5291+
content = """
5292+
definitions:
5293+
parent_stream:
5294+
type: DeclarativeStream
5295+
name: "parent"
5296+
primary_key: "id"
5297+
retriever:
5298+
type: SimpleRetriever
5299+
requester:
5300+
type: HttpRequester
5301+
url_base: "https://api.example.com"
5302+
path: "/parent"
5303+
http_method: "GET"
5304+
authenticator:
5305+
type: BearerAuthenticator
5306+
api_token: "{{ config['api_key'] }}"
5307+
record_selector:
5308+
type: RecordSelector
5309+
extractor:
5310+
type: DpathExtractor
5311+
field_path: []
5312+
schema_loader:
5313+
type: InlineSchemaLoader
5314+
schema:
5315+
type: object
5316+
properties:
5317+
id:
5318+
type: string
5319+
"""
5320+
5321+
config = {"api_key": "test_key"}
5322+
5323+
parsed_manifest = YamlDeclarativeSource._parse(content)
5324+
resolved_manifest = resolver.preprocess_manifest(parsed_manifest)
5325+
5326+
factory = ModelToComponentFactory()
5327+
5328+
parent_manifest = transformer.propagate_types_and_parameters(
5329+
"", resolved_manifest["definitions"]["parent_stream"], {}
5330+
)
5331+
parent_stream: DefaultStream = factory.create_component(
5332+
model_type=DeclarativeStreamModel, component_definition=parent_manifest, config=config
5333+
)
5334+
5335+
assert isinstance(parent_stream, DefaultStream)
5336+
assert parent_stream.name == "parent"
5337+
assert parent_stream.block_simultaneous_read == ""
5338+
5339+
52845340
def get_schema_loader(stream: DefaultStream):
52855341
assert isinstance(
52865342
stream._stream_partition_generator._partition_factory._schema_loader,

0 commit comments

Comments
 (0)