Skip to content

Commit 9029049

Browse files
test: add missing unit tests for GroupingPartitionRouter, active_groups check, and get_partition_router
Co-Authored-By: unknown <>
1 parent d01ee31 commit 9029049

File tree

2 files changed

+229
-0
lines changed

2 files changed

+229
-0
lines changed

unit_tests/sources/declarative/test_concurrent_declarative_source.py

Lines changed: 144 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -5351,3 +5351,147 @@ def test_apply_stream_groups_allows_parent_child_in_different_groups():
53515351

53525352
assert parent.block_simultaneous_read == "group_a"
53535353
assert child.block_simultaneous_read == "group_b"
5354+
5355+
5356+
def _make_child_stream_with_grouping_router(
    child_name: str, parent_stream: DefaultStream
) -> DefaultStream:
    """Build a child ``DefaultStream`` sliced through a ``GroupingPartitionRouter``.

    The grouping router (``group_size=10``) wraps a ``SubstreamPartitionRouter`` whose single
    parent config points at ``parent_stream``. That router is handed to a
    ``ConcurrentPerPartitionCursor``, which in turn is the stream slicer of the
    ``StreamSlicerPartitionGenerator`` backing the returned stream.

    Args:
        child_name: Name given to the returned stream (and to its logger and cursors).
        parent_stream: Stream referenced by the substream router's parent config.

    Returns:
        A ``DefaultStream`` named ``child_name`` with an empty schema, no primary key,
        no cursor field and a ``FinalStateCursor``.
    """
    from airbyte_cdk.sources.declarative.incremental.concurrent_partition_cursor import (
        ConcurrentCursorFactory,
        ConcurrentPerPartitionCursor,
    )
    from airbyte_cdk.sources.declarative.partition_routers.grouping_partition_router import (
        GroupingPartitionRouter,
    )
    from airbyte_cdk.sources.declarative.partition_routers.substream_partition_router import (
        ParentStreamConfig,
        SubstreamPartitionRouter,
    )
    from airbyte_cdk.sources.declarative.stream_slicers.declarative_partition_generator import (
        DeclarativePartitionFactory,
        StreamSlicerPartitionGenerator,
    )
    from airbyte_cdk.sources.streams.concurrent.cursor import FinalStateCursor
    from airbyte_cdk.sources.streams.concurrent.state_converters.datetime_stream_state_converter import (
        EpochValueConcurrentStreamStateConverter,
    )

    parent_config = ParentStreamConfig(
        stream=parent_stream,
        parent_key="id",
        partition_field="parent_id",
        config={},
        parameters={},
    )
    wrapped_router = GroupingPartitionRouter(
        group_size=10,
        underlying_partition_router=SubstreamPartitionRouter(
            parent_stream_configs=[parent_config],
            config={},
            parameters={},
        ),
        config={},
    )

    # One repository instance is shared by the per-partition cursor and the final-state cursor.
    shared_repository = InMemoryMessageRepository()

    slicer = ConcurrentPerPartitionCursor(
        cursor_factory=ConcurrentCursorFactory(lambda *args, **kwargs: Mock()),
        partition_router=wrapped_router,
        stream_name=child_name,
        stream_namespace=None,
        stream_state={},
        message_repository=shared_repository,
        connector_state_manager=Mock(),
        connector_state_converter=EpochValueConcurrentStreamStateConverter(),
        cursor_field=Mock(cursor_field_key="updated_at"),
    )

    generator = StreamSlicerPartitionGenerator(
        partition_factory=Mock(spec=DeclarativePartitionFactory),
        stream_slicer=slicer,
    )

    return DefaultStream(
        partition_generator=generator,
        name=child_name,
        json_schema={},
        primary_key=[],
        cursor_field=None,
        logger=logging.getLogger(f"test.{child_name}"),
        cursor=FinalStateCursor(
            stream_name=child_name,
            stream_namespace=None,
            message_repository=shared_repository,
        ),
    )
5434+
5435+
5436+
def test_apply_stream_groups_raises_on_parent_child_in_same_group_with_grouping_router():
    """Parent and GroupingPartitionRouter-wrapped child in one group must raise ValueError."""
    parent = _make_default_stream("parent_stream")
    child = _make_child_stream_with_grouping_router("child_stream", parent)

    # Both streams are listed in the same blocking group.
    grouped_streams = [
        {"name": "parent_stream", "type": "DeclarativeStream"},
        {"name": "child_stream", "type": "DeclarativeStream"},
    ]
    group_definition = {
        "streams": grouped_streams,
        "action": {"type": "BlockSimultaneousSyncsAction"},
    }

    source = Mock()
    source._source_config = {"stream_groups": {"my_group": group_definition}}

    expected_message = "child stream must not share a group with its parent"
    with pytest.raises(ValueError, match=expected_message):
        ConcurrentDeclarativeSource._apply_stream_groups(source, [parent, child])
5456+
5457+
5458+
@pytest.mark.parametrize(
    "stream_factory,expected_type",
    [
        pytest.param(
            lambda: _make_default_stream("plain_stream"),
            type(None),
            id="no_partition_router_returns_none",
        ),
        pytest.param(
            lambda: _make_child_stream_with_parent("child", _make_default_stream("parent")),
            "SubstreamPartitionRouter",
            id="substream_returns_substream_router",
        ),
        pytest.param(
            lambda: _make_child_stream_with_grouping_router(
                "child", _make_default_stream("parent")
            ),
            "GroupingPartitionRouter",
            id="grouping_returns_grouping_router",
        ),
    ],
)
def test_get_partition_router(stream_factory, expected_type):
    """Test DefaultStream.get_partition_router returns the correct router type.

    Args:
        stream_factory: Zero-argument callable building the stream under test.
        expected_type: ``type(None)`` when no router is expected, otherwise the class name
            of the expected router as a string. Names are used because the router classes
            are imported inside this function and are not available to the decorator.
    """
    from airbyte_cdk.sources.declarative.partition_routers.grouping_partition_router import (
        GroupingPartitionRouter,
    )
    from airbyte_cdk.sources.declarative.partition_routers.substream_partition_router import (
        SubstreamPartitionRouter,
    )

    router = stream_factory().get_partition_router()

    if expected_type is type(None):
        assert router is None
        return

    router_classes = {
        "SubstreamPartitionRouter": SubstreamPartitionRouter,
        "GroupingPartitionRouter": GroupingPartitionRouter,
    }
    # Guard against a new parametrize case silently passing without any assertion.
    assert expected_type in router_classes, f"Unhandled expected_type: {expected_type!r}"
    assert isinstance(router, router_classes[expected_type])

unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py

Lines changed: 85 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -28,6 +28,9 @@
2828
)
2929
from airbyte_cdk.sources.concurrent_source.stream_thread_exception import StreamThreadException
3030
from airbyte_cdk.sources.concurrent_source.thread_pool_manager import ThreadPoolManager
31+
from airbyte_cdk.sources.declarative.partition_routers.grouping_partition_router import (
32+
GroupingPartitionRouter,
33+
)
3134
from airbyte_cdk.sources.declarative.partition_routers.substream_partition_router import (
3235
SubstreamPartitionRouter,
3336
)
@@ -1440,3 +1443,85 @@ def test_is_done_raises_when_partition_generation_queue_not_empty():
14401443

14411444
with pytest.raises(AirbyteTracedException, match="remained in the partition generation queue"):
14421445
handler.is_done()
1446+
1447+
1448+
def test_is_done_raises_when_active_groups_not_empty():
    """is_done must raise AirbyteTracedException when a group is still active once streams are done."""
    repository = Mock(spec=MessageRepository)
    repository.consume_queue.return_value = []

    stuck = Mock(spec=AbstractStream)
    stuck.name = "stuck_stream"
    stuck.block_simultaneous_read = "my_group"
    stuck.as_airbyte_stream.return_value = AirbyteStream(
        name="stuck_stream",
        json_schema={},
        supported_sync_modes=[SyncMode.full_refresh],
    )

    handler = ConcurrentReadProcessor(
        [stuck],
        Mock(spec=PartitionEnqueuer),
        Mock(spec=ThreadPoolManager),
        Mock(spec=logging.Logger),
        Mock(spec=SliceLogger),
        repository,
        Mock(spec=PartitionReader),
    )

    # Simulate a bug: the stream is recorded as done, nothing is left to start,
    # yet its group is still registered as active.
    handler._streams_done.add("stuck_stream")
    handler._stream_instances_to_start_partition_generation.clear()
    handler._active_groups["my_group"] = {"stuck_stream"}

    expected_message = "still active after all streams were marked done"
    with pytest.raises(AirbyteTracedException, match=expected_message):
        handler.is_done()
1486+
1487+
1488+
def test_collect_parent_stream_names_unwraps_grouping_partition_router():
    """_collect_all_parent_stream_names must look inside a GroupingPartitionRouter for parents."""
    repository = Mock(spec=MessageRepository)
    repository.consume_queue.return_value = []

    parent = Mock(spec=AbstractStream)
    parent.name = "parent"
    parent.block_simultaneous_read = ""

    # Substream router whose only parent config references the parent stream.
    substream_router = Mock(spec=SubstreamPartitionRouter)
    substream_router.parent_stream_configs = [Mock(stream=parent)]

    # Grouping router wrapping the substream router above.
    grouping_router = Mock(spec=GroupingPartitionRouter)
    grouping_router.underlying_partition_router = substream_router

    # The child exposes the grouping router as its partition router.
    child = Mock(spec=DefaultStream)
    child.name = "child"
    child.block_simultaneous_read = ""
    child.get_partition_router.return_value = grouping_router

    handler = ConcurrentReadProcessor(
        [parent, child],
        Mock(spec=PartitionEnqueuer),
        Mock(spec=ThreadPoolManager),
        Mock(spec=logging.Logger),
        Mock(spec=SliceLogger),
        repository,
        Mock(spec=PartitionReader),
    )

    assert handler._collect_all_parent_stream_names("child") == {"parent"}

0 commit comments

Comments
 (0)