Skip to content

Commit 756a966

Browse files
fix: make deadlock validation check all ancestors, not just direct parents
Co-Authored-By: unknown <>
1 parent 9029049 commit 756a966

File tree

2 files changed

+46
-10
lines changed

2 files changed

+46
-10
lines changed

airbyte_cdk/sources/declarative/concurrent_declarative_source.py

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -452,21 +452,34 @@ def _apply_stream_groups(self, streams: List[AbstractStream]) -> None:
452452
if stream_name:
453453
stream_name_to_group[stream_name] = group_name
454454

455-
# Validate no stream shares a group with its parent streams
455+
# Validate no stream shares a group with any of its ancestor streams
456+
stream_name_to_instance: Dict[str, AbstractStream] = {s.name: s for s in streams}
457+
458+
def _collect_all_ancestor_names(stream_name: str) -> Set[str]:
459+
"""Recursively collect all ancestor stream names."""
460+
ancestors: Set[str] = set()
461+
inst = stream_name_to_instance.get(stream_name)
462+
if not isinstance(inst, DefaultStream):
463+
return ancestors
464+
router = inst.get_partition_router()
465+
if isinstance(router, GroupingPartitionRouter):
466+
router = router.underlying_partition_router
467+
if not isinstance(router, SubstreamPartitionRouter):
468+
return ancestors
469+
for parent_config in router.parent_stream_configs:
470+
parent_name = parent_config.stream.name
471+
ancestors.add(parent_name)
472+
ancestors.update(_collect_all_ancestor_names(parent_name))
473+
return ancestors
474+
456475
for stream in streams:
457476
if not isinstance(stream, DefaultStream) or stream.name not in stream_name_to_group:
458477
continue
459-
partition_router = stream.get_partition_router()
460-
if isinstance(partition_router, GroupingPartitionRouter):
461-
partition_router = partition_router.underlying_partition_router
462-
if not isinstance(partition_router, SubstreamPartitionRouter):
463-
continue
464478
group_name = stream_name_to_group[stream.name]
465-
for parent_config in partition_router.parent_stream_configs:
466-
parent_name = parent_config.stream.name
467-
if stream_name_to_group.get(parent_name) == group_name:
479+
for ancestor_name in _collect_all_ancestor_names(stream.name):
480+
if stream_name_to_group.get(ancestor_name) == group_name:
468481
raise ValueError(
469-
f"Stream '{stream.name}' and its parent stream '{parent_name}' "
482+
f"Stream '{stream.name}' and its parent stream '{ancestor_name}' "
470483
f"are both in group '{group_name}'. "
471484
f"A child stream must not share a group with its parent to avoid deadlock."
472485
)

unit_tests/sources/declarative/test_concurrent_declarative_source.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5433,6 +5433,29 @@ def _make_child_stream_with_grouping_router(
54335433
)
54345434

54355435

5436+
def test_apply_stream_groups_raises_on_grandparent_child_in_same_group():
5437+
"""Test _apply_stream_groups detects deadlock when a grandchild and grandparent share a group."""
5438+
grandparent = _make_default_stream("grandparent_stream")
5439+
parent = _make_child_stream_with_parent("parent_stream", grandparent)
5440+
child = _make_child_stream_with_parent("child_stream", parent)
5441+
5442+
source = Mock()
5443+
source._source_config = {
5444+
"stream_groups": {
5445+
"my_group": {
5446+
"streams": [
5447+
{"name": "grandparent_stream", "type": "DeclarativeStream"},
5448+
{"name": "child_stream", "type": "DeclarativeStream"},
5449+
],
5450+
"action": {"type": "BlockSimultaneousSyncsAction"},
5451+
}
5452+
}
5453+
}
5454+
5455+
with pytest.raises(ValueError, match="child stream must not share a group with its parent"):
5456+
ConcurrentDeclarativeSource._apply_stream_groups(source, [grandparent, parent, child])
5457+
5458+
54365459
def test_apply_stream_groups_raises_on_parent_child_in_same_group_with_grouping_router():
54375460
"""Test _apply_stream_groups detects deadlock when GroupingPartitionRouter wraps SubstreamPartitionRouter."""
54385461
parent = _make_default_stream("parent_stream")

0 commit comments

Comments
 (0)