Skip to content

Commit 5911051

Browse files
fix: handle GroupingPartitionRouter at call sites instead of in get_partition_router()
Co-Authored-By: unknown <>
1 parent 1fffc69 commit 5911051

3 files changed

Lines changed: 12 additions & 12 deletions

File tree

airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@
1313
)
1414
from airbyte_cdk.sources.concurrent_source.stream_thread_exception import StreamThreadException
1515
from airbyte_cdk.sources.concurrent_source.thread_pool_manager import ThreadPoolManager
16+
from airbyte_cdk.sources.declarative.partition_routers.grouping_partition_router import (
17+
GroupingPartitionRouter,
18+
)
1619
from airbyte_cdk.sources.declarative.partition_routers.substream_partition_router import (
1720
SubstreamPartitionRouter,
1821
)
@@ -398,6 +401,8 @@ def _collect_all_parent_stream_names(self, stream_name: str) -> Set[str]:
398401
partition_router = (
399402
stream.get_partition_router() if isinstance(stream, DefaultStream) else None
400403
)
404+
if isinstance(partition_router, GroupingPartitionRouter):
405+
partition_router = partition_router.underlying_partition_router
401406

402407
if isinstance(partition_router, SubstreamPartitionRouter):
403408
for parent_config in partition_router.parent_stream_configs:

airbyte_cdk/sources/declarative/concurrent_declarative_source.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,9 @@
7676
from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import (
7777
ModelToComponentFactory,
7878
)
79+
from airbyte_cdk.sources.declarative.partition_routers.grouping_partition_router import (
80+
GroupingPartitionRouter,
81+
)
7982
from airbyte_cdk.sources.declarative.partition_routers.substream_partition_router import (
8083
SubstreamPartitionRouter,
8184
)
@@ -454,6 +457,8 @@ def _apply_stream_groups(self, streams: List[AbstractStream]) -> None:
454457
if not isinstance(stream, DefaultStream) or stream.name not in stream_name_to_group:
455458
continue
456459
partition_router = stream.get_partition_router()
460+
if isinstance(partition_router, GroupingPartitionRouter):
461+
partition_router = partition_router.underlying_partition_router
457462
if not isinstance(partition_router, SubstreamPartitionRouter):
458463
continue
459464
group_name = stream_name_to_group[stream.name]

airbyte_cdk/sources/streams/concurrent/default_stream.py

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,6 @@
99
from airbyte_cdk.sources.declarative.incremental.concurrent_partition_cursor import (
1010
ConcurrentPerPartitionCursor,
1111
)
12-
from airbyte_cdk.sources.declarative.partition_routers.grouping_partition_router import (
13-
GroupingPartitionRouter,
14-
)
1512
from airbyte_cdk.sources.declarative.partition_routers.partition_router import PartitionRouter
1613
from airbyte_cdk.sources.declarative.stream_slicers.declarative_partition_generator import (
1714
StreamSlicerPartitionGenerator,
@@ -116,20 +113,13 @@ def block_simultaneous_read(self, value: str) -> None:
116113
self._block_simultaneous_read = value
117114

118115
def get_partition_router(self) -> PartitionRouter | None:
119-
"""Return the partition router for this stream, or None if not available.
120-
121-
If the router is a GroupingPartitionRouter, unwraps it to return the
122-
underlying router so callers can inspect parent stream relationships.
123-
"""
116+
"""Return the partition router for this stream, or None if not available."""
124117
if not isinstance(self._stream_partition_generator, StreamSlicerPartitionGenerator):
125118
return None
126119
stream_slicer = self._stream_partition_generator._stream_slicer
127120
if not isinstance(stream_slicer, ConcurrentPerPartitionCursor):
128121
return None
129-
router = stream_slicer._partition_router
130-
if isinstance(router, GroupingPartitionRouter):
131-
return router.underlying_partition_router
132-
return router
122+
return stream_slicer._partition_router
133123

134124
def check_availability(self) -> StreamAvailability:
135125
"""

0 commit comments

Comments
 (0)