Skip to content

Commit d09ee9b

Browse files
refactor: add get_partition_router() helper to DefaultStream
Replace the hasattr chain in ConcurrentReadProcessor._collect_all_parent_stream_names with DefaultStream.get_partition_router(), which safely traverses the internal _stream_partition_generator -> _stream_slicer -> _partition_router chain using isinstance checks.

Co-Authored-By: unknown <>
1 parent 7ba206f commit d09ee9b

2 files changed

Lines changed: 33 additions & 26 deletions

File tree

airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py

Lines changed: 13 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -371,42 +371,30 @@ def _is_stream_done(self, stream_name: str) -> bool:
371371
return stream_name in self._streams_done
372372

373373
def _collect_all_parent_stream_names(self, stream_name: str) -> Set[str]:
374-
"""
375-
Recursively collect all parent stream names for a given stream.
376-
For example, if we have: epics -> issues -> comments
377-
Then for comments, this returns {issues, epics}
374+
"""Recursively collect all parent stream names for a given stream.
378375
379-
:param stream_name: The stream to collect parents for
380-
:return: Set of all parent stream names (recursively)
376+
For example, if we have: epics -> issues -> comments
377+
Then for comments, this returns {issues, epics}.
381378
"""
379+
from airbyte_cdk.sources.declarative.partition_routers.substream_partition_router import (
380+
SubstreamPartitionRouter,
381+
)
382+
from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream
383+
382384
parent_names: Set[str] = set()
383385
stream = self._stream_name_to_instance.get(stream_name)
384386

385387
if not stream:
386388
return parent_names
387389

388-
# Get partition router if it exists (this is where parent streams are defined)
389-
partition_router = None
390-
391-
# Try DefaultStream path first (_stream_partition_generator._stream_slicer._partition_router)
392-
if (
393-
hasattr(stream, "_stream_partition_generator")
394-
and hasattr(stream._stream_partition_generator, "_stream_slicer")
395-
and hasattr(stream._stream_partition_generator._stream_slicer, "_partition_router")
396-
):
397-
partition_router = stream._stream_partition_generator._stream_slicer._partition_router
398-
# Fallback to legacy path (retriever.partition_router) for backward compatibility and test mocks
399-
elif hasattr(stream, "retriever") and hasattr(stream.retriever, "partition_router"):
400-
partition_router = stream.retriever.partition_router
390+
partition_router = (
391+
stream.get_partition_router() if isinstance(stream, DefaultStream) else None
392+
)
401393

402-
# SubstreamPartitionRouter has parent_stream_configs
403-
if partition_router and hasattr(partition_router, "parent_stream_configs"):
394+
if isinstance(partition_router, SubstreamPartitionRouter):
404395
for parent_config in partition_router.parent_stream_configs:
405-
parent_stream = parent_config.stream
406-
parent_name = parent_stream.name
396+
parent_name = parent_config.stream.name
407397
parent_names.add(parent_name)
408-
409-
# Recursively collect grandparents, great-grandparents, etc.
410398
parent_names.update(self._collect_all_parent_stream_names(parent_name))
411399

412400
return parent_names

airbyte_cdk/sources/streams/concurrent/default_stream.py

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,10 @@
33
#
44

55
from logging import Logger
6-
from typing import Any, Callable, Iterable, List, Mapping, Optional, Union
6+
from typing import TYPE_CHECKING, Any, Callable, Iterable, List, Mapping, Optional, Union
7+
8+
if TYPE_CHECKING:
9+
from airbyte_cdk.sources.declarative.partition_routers.partition_router import PartitionRouter
710

811
from airbyte_cdk.models import AirbyteStream, SyncMode
912
from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
@@ -105,6 +108,22 @@ def block_simultaneous_read(self) -> str:
105108
def block_simultaneous_read(self, value: str) -> None:
106109
self._block_simultaneous_read = value
107110

111+
def get_partition_router(self) -> "PartitionRouter | None":
112+
"""Return the partition router for this stream, or None if not available."""
113+
from airbyte_cdk.sources.declarative.incremental.concurrent_partition_cursor import (
114+
ConcurrentPerPartitionCursor,
115+
)
116+
from airbyte_cdk.sources.declarative.stream_slicers.declarative_partition_generator import (
117+
StreamSlicerPartitionGenerator,
118+
)
119+
120+
if not isinstance(self._stream_partition_generator, StreamSlicerPartitionGenerator):
121+
return None
122+
stream_slicer = self._stream_partition_generator._stream_slicer
123+
if not isinstance(stream_slicer, ConcurrentPerPartitionCursor):
124+
return None
125+
return stream_slicer._partition_router
126+
108127
def check_availability(self) -> StreamAvailability:
109128
"""
110129
Check stream availability by attempting to read the first record of the stream.

0 commit comments

Comments
 (0)