|
9 | 9 | from airbyte_cdk.sources.declarative.incremental.concurrent_partition_cursor import ( |
10 | 10 | ConcurrentPerPartitionCursor, |
11 | 11 | ) |
| 12 | +from airbyte_cdk.sources.declarative.partition_routers.grouping_partition_router import ( |
| 13 | + GroupingPartitionRouter, |
| 14 | +) |
12 | 15 | from airbyte_cdk.sources.declarative.partition_routers.partition_router import PartitionRouter |
13 | 16 | from airbyte_cdk.sources.declarative.stream_slicers.declarative_partition_generator import ( |
14 | 17 | StreamSlicerPartitionGenerator, |
@@ -113,13 +116,20 @@ def block_simultaneous_read(self, value: str) -> None: |
113 | 116 | self._block_simultaneous_read = value |
114 | 117 |
|
115 | 118 | def get_partition_router(self) -> PartitionRouter | None: |
116 | | - """Return the partition router for this stream, or None if not available.""" |
| 119 | + """Return the partition router for this stream, or None if not available. |
| 120 | +
|
| 121 | + If the router is a GroupingPartitionRouter, unwraps it to return the |
| 122 | + underlying router so callers can inspect parent stream relationships. |
| 123 | + """ |
117 | 124 | if not isinstance(self._stream_partition_generator, StreamSlicerPartitionGenerator): |
118 | 125 | return None |
119 | 126 | stream_slicer = self._stream_partition_generator._stream_slicer |
120 | 127 | if not isinstance(stream_slicer, ConcurrentPerPartitionCursor): |
121 | 128 | return None |
122 | | - return stream_slicer._partition_router |
| 129 | + router = stream_slicer._partition_router |
| 130 | + if isinstance(router, GroupingPartitionRouter): |
| 131 | + return router.underlying_partition_router |
| 132 | + return router |
123 | 133 |
|
124 | 134 | def check_availability(self) -> StreamAvailability: |
125 | 135 | """ |
|
0 commit comments