Skip to content

Commit 94c4b82

Browse files
feat: validate no parent-child streams share a group to prevent deadlock
`_apply_stream_groups` now checks that no stream shares a group with any of its parent streams (parents are discovered via `get_partition_router`). It raises `ValueError` at config time if such a deadlock-causing configuration is detected.

Co-Authored-By: unknown <>
1 parent d09ee9b commit 94c4b82

2 files changed

Lines changed: 139 additions & 1 deletion

File tree

airbyte_cdk/sources/declarative/concurrent_declarative_source.py

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -430,8 +430,13 @@ def _apply_stream_groups(self, streams: List[AbstractStream]) -> None:
430430
"""Set block_simultaneous_read on streams based on the manifest's stream_groups config.
431431
432432
Iterates over the resolved manifest's stream_groups and matches group membership
433-
against actual created stream instances by name.
433+
against actual created stream instances by name. Validates that no stream shares a
434+
group with any of its parent streams, which would cause a deadlock.
434435
"""
436+
from airbyte_cdk.sources.declarative.partition_routers.substream_partition_router import (
437+
SubstreamPartitionRouter,
438+
)
439+
435440
stream_groups = self._source_config.get("stream_groups", {})
436441
if not stream_groups:
437442
return
@@ -445,6 +450,23 @@ def _apply_stream_groups(self, streams: List[AbstractStream]) -> None:
445450
if stream_name:
446451
stream_name_to_group[stream_name] = group_name
447452

453+
# Validate no stream shares a group with its parent streams
454+
for stream in streams:
455+
if not isinstance(stream, DefaultStream) or stream.name not in stream_name_to_group:
456+
continue
457+
partition_router = stream.get_partition_router()
458+
if not isinstance(partition_router, SubstreamPartitionRouter):
459+
continue
460+
group_name = stream_name_to_group[stream.name]
461+
for parent_config in partition_router.parent_stream_configs:
462+
parent_name = parent_config.stream.name
463+
if stream_name_to_group.get(parent_name) == group_name:
464+
raise ValueError(
465+
f"Stream '{stream.name}' and its parent stream '{parent_name}' "
466+
f"are both in group '{group_name}'. "
467+
f"A child stream must not share a group with its parent to avoid deadlock."
468+
)
469+
448470
# Apply group to matching stream instances
449471
for stream in streams:
450472
if isinstance(stream, DefaultStream) and stream.name in stream_name_to_group:

unit_tests/sources/declarative/test_concurrent_declarative_source.py

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5171,6 +5171,75 @@ def _make_default_stream(name: str) -> DefaultStream:
51715171
)
51725172

51735173

5174+
def _make_child_stream_with_parent(child_name: str, parent_stream: DefaultStream) -> DefaultStream:
    """Build a DefaultStream named ``child_name`` that is partitioned by ``parent_stream``.

    The returned stream's partition generator is a StreamSlicerPartitionGenerator whose
    stream slicer is a ConcurrentPerPartitionCursor. That cursor is given a
    SubstreamPartitionRouter holding a single ParentStreamConfig that references
    ``parent_stream``. The stream's own cursor is a FinalStateCursor.

    Args:
        child_name: Name assigned to the created stream (also used for its logger and cursors).
        parent_stream: Stream referenced by the router's only parent stream config.

    Returns:
        The newly constructed child DefaultStream.
    """
    from airbyte_cdk.sources.declarative.incremental.concurrent_partition_cursor import (
        ConcurrentCursorFactory,
        ConcurrentPerPartitionCursor,
    )
    from airbyte_cdk.sources.declarative.partition_routers.substream_partition_router import (
        ParentStreamConfig,
        SubstreamPartitionRouter,
    )
    from airbyte_cdk.sources.declarative.stream_slicers.declarative_partition_generator import (
        DeclarativePartitionFactory,
        StreamSlicerPartitionGenerator,
    )
    from airbyte_cdk.sources.streams.concurrent.cursor import FinalStateCursor
    from airbyte_cdk.sources.streams.concurrent.state_converters.datetime_stream_state_converter import (
        EpochValueConcurrentStreamStateConverter,
    )

    # Router with exactly one parent: the stream handed in by the caller.
    parent_link = ParentStreamConfig(
        stream=parent_stream,
        parent_key="id",
        partition_field="parent_id",
        config={},
        parameters={},
    )
    router = SubstreamPartitionRouter(
        parent_stream_configs=[parent_link],
        config={},
        parameters={},
    )

    # The same repository instance is shared by the slicer and the final-state cursor.
    repository = InMemoryMessageRepository()

    slicer = ConcurrentPerPartitionCursor(
        cursor_factory=ConcurrentCursorFactory(lambda *args, **kwargs: Mock()),
        partition_router=router,
        stream_name=child_name,
        stream_namespace=None,
        stream_state={},
        message_repository=repository,
        connector_state_manager=Mock(),
        connector_state_converter=EpochValueConcurrentStreamStateConverter(),
        cursor_field=Mock(cursor_field_key="updated_at"),
    )

    generator = StreamSlicerPartitionGenerator(
        partition_factory=Mock(spec=DeclarativePartitionFactory),
        stream_slicer=slicer,
    )

    final_state_cursor = FinalStateCursor(
        stream_name=child_name, stream_namespace=None, message_repository=repository
    )

    return DefaultStream(
        partition_generator=generator,
        name=child_name,
        json_schema={},
        primary_key=[],
        cursor_field=None,
        logger=logging.getLogger(f"test.{child_name}"),
        cursor=final_state_cursor,
    )
5241+
5242+
51745243
@pytest.mark.parametrize(
51755244
"source_config,stream_names,expected_groups",
51765245
[
@@ -5235,3 +5304,50 @@ def test_apply_stream_groups(source_config, stream_names, expected_groups):
52355304

52365305
for stream in streams:
52375306
assert stream.block_simultaneous_read == expected_groups[stream.name]
5307+
5308+
5309+
def test_apply_stream_groups_raises_on_parent_child_in_same_group():
    """_apply_stream_groups must raise ValueError when a child and its parent share one group."""
    parent = _make_default_stream("parent_stream")
    child = _make_child_stream_with_parent("child_stream", parent)

    # One group listing both the parent and the child stream.
    shared_group = {
        "streams": [
            {"name": member, "type": "DeclarativeStream"}
            for member in ("parent_stream", "child_stream")
        ],
        "action": {"type": "BlockSimultaneousSyncsAction"},
    }

    # A Mock stands in for the source; only _source_config is set on it here.
    source = Mock()
    source._source_config = {"stream_groups": {"my_group": shared_group}}

    expected_message = "child stream must not share a group with its parent"
    with pytest.raises(ValueError, match=expected_message):
        ConcurrentDeclarativeSource._apply_stream_groups(source, [parent, child])
5329+
5330+
5331+
def test_apply_stream_groups_allows_parent_child_in_different_groups():
    """_apply_stream_groups must accept a child and its parent that sit in separate groups."""
    parent = _make_default_stream("parent_stream")
    child = _make_child_stream_with_parent("child_stream", parent)

    # Each group contains exactly one stream, so parent and child never share a group.
    group_members = {"group_a": "parent_stream", "group_b": "child_stream"}

    # A Mock stands in for the source; only _source_config is set on it here.
    source = Mock()
    source._source_config = {
        "stream_groups": {
            group: {
                "streams": [{"name": member, "type": "DeclarativeStream"}],
                "action": {"type": "BlockSimultaneousSyncsAction"},
            }
            for group, member in group_members.items()
        }
    }

    ConcurrentDeclarativeSource._apply_stream_groups(source, [parent, child])

    # Each stream is tagged with the name of the group it was listed in.
    assert parent.block_simultaneous_read == "group_a"
    assert child.block_simultaneous_read == "group_b"

0 commit comments

Comments
 (0)