Skip to content

Commit 0874f12

Browse files
feat: assert partition generation queue is empty when all streams are done
Adds a safety check in is_done() that raises AirbyteTracedException (system_error) if streams remain in the partition generation queue after all streams are marked done. Also moves inline imports to module level and updates test mocks to use DefaultStream with get_partition_router(). Co-Authored-By: unknown <>
1 parent 94c4b82 commit 0874f12

2 files changed

Lines changed: 65 additions & 13 deletions

File tree

airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,12 @@
1313
)
1414
from airbyte_cdk.sources.concurrent_source.stream_thread_exception import StreamThreadException
1515
from airbyte_cdk.sources.concurrent_source.thread_pool_manager import ThreadPoolManager
16+
from airbyte_cdk.sources.declarative.partition_routers.substream_partition_router import (
17+
SubstreamPartitionRouter,
18+
)
1619
from airbyte_cdk.sources.message import MessageRepository
1720
from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
21+
from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream
1822
from airbyte_cdk.sources.streams.concurrent.partition_enqueuer import PartitionEnqueuer
1923
from airbyte_cdk.sources.streams.concurrent.partition_reader import PartitionReader
2024
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
@@ -354,6 +358,15 @@ def is_done(self) -> bool:
354358
for stream_name in self._stream_name_to_instance.keys()
355359
]
356360
)
361+
if is_done and self._stream_instances_to_start_partition_generation:
362+
stuck_stream_names = [
363+
s.name for s in self._stream_instances_to_start_partition_generation
364+
]
365+
raise AirbyteTracedException(
366+
message="Partition generation queue is not empty after all streams completed.",
367+
internal_message=f"Streams {stuck_stream_names} remained in the partition generation queue after all streams were marked done.",
368+
failure_type=FailureType.system_error,
369+
)
357370
if is_done and self._exceptions_per_stream_name:
358371
error_message = generate_failed_streams_error_message(self._exceptions_per_stream_name)
359372
self._logger.info(error_message)
@@ -376,11 +389,6 @@ def _collect_all_parent_stream_names(self, stream_name: str) -> Set[str]:
376389
For example, if we have: epics -> issues -> comments
377390
Then for comments, this returns {issues, epics}.
378391
"""
379-
from airbyte_cdk.sources.declarative.partition_routers.substream_partition_router import (
380-
SubstreamPartitionRouter,
381-
)
382-
from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream
383-
384392
parent_names: Set[str] = set()
385393
stream = self._stream_name_to_instance.get(stream_name)
386394

unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py

Lines changed: 52 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,12 @@
2828
)
2929
from airbyte_cdk.sources.concurrent_source.stream_thread_exception import StreamThreadException
3030
from airbyte_cdk.sources.concurrent_source.thread_pool_manager import ThreadPoolManager
31+
from airbyte_cdk.sources.declarative.partition_routers.substream_partition_router import (
32+
SubstreamPartitionRouter,
33+
)
3134
from airbyte_cdk.sources.message import LogMessage, MessageRepository
3235
from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
36+
from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream
3337
from airbyte_cdk.sources.streams.concurrent.partition_enqueuer import PartitionEnqueuer
3438
from airbyte_cdk.sources.streams.concurrent.partition_reader import PartitionReader
3539
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
@@ -822,18 +826,22 @@ def _create_mock_stream(self, name: str, block_simultaneous_read: str = ""):
822826
def _create_mock_stream_with_parent(
823827
self, name: str, parent_stream, block_simultaneous_read: str = ""
824828
):
825-
"""Helper to create a mock stream with a parent stream"""
826-
stream = self._create_mock_stream(name, block_simultaneous_read)
829+
"""Helper to create a mock stream with a parent stream."""
830+
stream = Mock(spec=DefaultStream)
831+
stream.name = name
832+
stream.block_simultaneous_read = block_simultaneous_read
833+
stream.as_airbyte_stream.return_value = AirbyteStream(
834+
name=name,
835+
json_schema={},
836+
supported_sync_modes=[SyncMode.full_refresh],
837+
)
838+
stream.cursor.ensure_at_least_one_state_emitted = Mock()
827839

828-
# Mock the retriever and partition router for parent relationship
829-
mock_retriever = Mock()
830-
mock_partition_router = Mock()
840+
mock_partition_router = Mock(spec=SubstreamPartitionRouter)
831841
mock_parent_config = Mock()
832842
mock_parent_config.stream = parent_stream
833-
834843
mock_partition_router.parent_stream_configs = [mock_parent_config]
835-
mock_retriever.partition_router = mock_partition_router
836-
stream.retriever = mock_retriever
844+
stream.get_partition_router.return_value = mock_partition_router
837845

838846
return stream
839847

@@ -1396,3 +1404,39 @@ def test_child_starts_after_parent_completes_via_partition_complete_sentinel(sel
13961404
]
13971405
assert len(started_messages) == 1
13981406
assert started_messages[0].trace.stream_status.stream_descriptor.name == "child"
1407+
1408+
1409+
def test_is_done_raises_when_partition_generation_queue_not_empty():
1410+
"""Test is_done raises AirbyteTracedException if streams remain in the partition generation queue."""
1411+
partition_enqueuer = Mock(spec=PartitionEnqueuer)
1412+
thread_pool_manager = Mock(spec=ThreadPoolManager)
1413+
logger = Mock(spec=logging.Logger)
1414+
slice_logger = Mock(spec=SliceLogger)
1415+
message_repository = Mock(spec=MessageRepository)
1416+
message_repository.consume_queue.return_value = []
1417+
partition_reader = Mock(spec=PartitionReader)
1418+
1419+
stream = Mock(spec=AbstractStream)
1420+
stream.name = "stuck_stream"
1421+
stream.block_simultaneous_read = ""
1422+
stream.as_airbyte_stream.return_value = AirbyteStream(
1423+
name="stuck_stream",
1424+
json_schema={},
1425+
supported_sync_modes=[SyncMode.full_refresh],
1426+
)
1427+
1428+
handler = ConcurrentReadProcessor(
1429+
[stream],
1430+
partition_enqueuer,
1431+
thread_pool_manager,
1432+
logger,
1433+
slice_logger,
1434+
message_repository,
1435+
partition_reader,
1436+
)
1437+
1438+
# Artificially mark the stream as done without removing it from the partition generation queue
1439+
handler._streams_done.add("stuck_stream")
1440+
1441+
with pytest.raises(AirbyteTracedException, match="remained in the partition generation queue"):
1442+
handler.is_done()

0 commit comments

Comments
 (0)