diff --git a/airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py b/airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py index e41a0d9a1..5d2524b55 100644 --- a/airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py +++ b/airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py @@ -39,12 +39,13 @@ T = TypeVar("T") -def iterate_with_last_flag(generator: Iterable[T]) -> Iterable[tuple[T, bool]]: +def iterate_with_last_flag(generator: Iterable[T]) -> Iterable[tuple[T | None, bool]]: iterator = iter(generator) try: current = next(iterator) except StopIteration: + yield None, True return # Return an empty iterator for next_item in iterator: @@ -206,60 +207,66 @@ def stream_slices(self) -> Iterable[StreamSlice]: for partition, is_last_slice in iterate_with_last_flag( parent_stream.generate_partitions() ): + if partition is None: + break for parent_record, is_last_record_in_slice in iterate_with_last_flag( partition.read() ): - # In the previous CDK implementation, state management was done internally by the stream. - # However, this could cause issues when doing availability check for example as the availability - # check would progress the state so state management was moved outside of the read method. - # Hence, we need to call the cursor here. - # Note that we call observe and close_partition before emitting the associated record as the - # ConcurrentPerPartitionCursor will associate a record with the state of the stream after the - # record was consumed. 
- parent_stream.cursor.observe(parent_record) - parent_partition = ( - parent_record.associated_slice.partition - if parent_record.associated_slice - else {} - ) - record_data = parent_record.data - - try: - partition_value = dpath.get( - record_data, # type: ignore [arg-type] - parent_field, + emit_slice = parent_record is not None + if parent_record is not None: + # In the previous CDK implementation, state management was done internally by the stream. + # However, this could cause issues when doing availability check for example as the availability + # check would progress the state so state management was moved outside of the read method. + # Hence, we need to call the cursor here. + # Note that we call observe and close_partition before emitting the associated record as the + # ConcurrentPerPartitionCursor will associate a record with the state of the stream after the + # record was consumed. + parent_stream.cursor.observe(parent_record) + parent_partition = ( + parent_record.associated_slice.partition + if parent_record.associated_slice + else {} ) - except KeyError: - # FIXME a log here would go a long way for debugging - continue - - # Add extra fields - extracted_extra_fields = self._extract_extra_fields( - record_data, extra_fields - ) - - if parent_stream_config.lazy_read_pointer: - extracted_extra_fields = { - "child_response": self._extract_child_response( - record_data, - parent_stream_config.lazy_read_pointer, # type: ignore[arg-type] # lazy_read_pointer type handeled in __post_init__ of parent_stream_config - ), - **extracted_extra_fields, - } + record_data = parent_record.data + + try: + partition_value = dpath.get( + record_data, # type: ignore [arg-type] + parent_field, + ) + except KeyError: + # FIXME a log here would go a long way for debugging + emit_slice = False + + if emit_slice: + # Add extra fields + extracted_extra_fields = self._extract_extra_fields( + record_data, extra_fields + ) + + if parent_stream_config.lazy_read_pointer: + 
extracted_extra_fields = { + "child_response": self._extract_child_response( + record_data, + parent_stream_config.lazy_read_pointer, # type: ignore[arg-type] # lazy_read_pointer type handled in __post_init__ of parent_stream_config + ), + **extracted_extra_fields, + } if is_last_record_in_slice: parent_stream.cursor.close_partition(partition) if is_last_slice: parent_stream.cursor.ensure_at_least_one_state_emitted() - yield StreamSlice( - partition={ - partition_field: partition_value, - "parent_slice": parent_partition or {}, - }, - cursor_slice={}, - extra_fields=extracted_extra_fields, - ) + if emit_slice: + yield StreamSlice( + partition={ + partition_field: partition_value, + "parent_slice": parent_partition or {}, + }, + cursor_slice={}, + extra_fields=extracted_extra_fields, + ) yield from [] diff --git a/unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py b/unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py index feab9bbc0..b69849ebe 100644 --- a/unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py +++ b/unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py @@ -20,6 +20,7 @@ from airbyte_cdk.sources.declarative.partition_routers.substream_partition_router import ( ParentStreamConfig, SubstreamPartitionRouter, + iterate_with_last_flag, ) from airbyte_cdk.sources.declarative.requesters.request_option import ( RequestOption, @@ -611,7 +612,8 @@ def test_request_option( ), { "first_stream": { - "lookback_window": 0, + "lookback_window": 1, + "state": {"cursor": "2022-01-01"}, "states": [ {"cursor": {"cursor": "2021-01-02"}, "partition": {"slice": "first"}}, {"cursor": {"cursor": "2022-01-01"}, "partition": {"slice": "second"}}, @@ -1079,3 +1081,225 @@ def test_cartesian_product_stream_slicer_warning_log_message( assert warning_message in logged_warnings else: assert warning_message not in logged_warnings + + +@pytest.mark.parametrize( 
+ "input_iterable,expected_output", + [ + pytest.param([], [(None, True)], id="empty_generator_yields_none_sentinel"), + pytest.param([1], [(1, True)], id="single_item"), + pytest.param([1, 2], [(1, False), (2, True)], id="two_items"), + pytest.param([1, 2, 3], [(1, False), (2, False), (3, True)], id="three_items"), + pytest.param(["a", "b"], [("a", False), ("b", True)], id="string_items"), + ], +) +def test_iterate_with_last_flag(input_iterable, expected_output): + result = list(iterate_with_last_flag(input_iterable)) + assert result == expected_output + + +def test_substream_partition_router_no_cursor_update_when_partition_has_no_records(): + """ + Test that when a partition has no records, the cursor is still properly closed + but no slices are yielded for that partition. + This tests the fix for SubstreamPartitionRouter updating cursor value when no records + were read in partition. + """ + mock_slices = [ + StreamSlice(partition={"slice": "first"}, cursor_slice={}), + StreamSlice(partition={"slice": "second"}, cursor_slice={}), + ] + + partition_router = SubstreamPartitionRouter( + parent_stream_configs=[ + ParentStreamConfig( + stream=MockStream( + [ + InMemoryPartition( + "partition_1", + "first_stream", + mock_slices[0], + _build_records_for_slice( + [{"id": "record_1"}, {"id": "record_2"}], mock_slices[0] + ), + ), + InMemoryPartition( + "partition_2", + "first_stream", + mock_slices[1], + [], + ), + ], + "first_stream", + ), + parent_key="id", + partition_field="partition_field", + parameters={}, + config={}, + ) + ], + parameters={}, + config={}, + ) + + slices = list(partition_router.stream_slices()) + assert slices == [ + {"partition_field": "record_1", "parent_slice": {"slice": "first"}}, + {"partition_field": "record_2", "parent_slice": {"slice": "first"}}, + ] + + +def test_substream_partition_router_handles_empty_parent_partitions(): + """ + Test that when a parent stream generates no partitions (empty generator), + the stream_slices method returns 
early without errors. + """ + partition_router = SubstreamPartitionRouter( + parent_stream_configs=[ + ParentStreamConfig( + stream=MockStream( + [], + "first_stream", + ), + parent_key="id", + partition_field="partition_field", + parameters={}, + config={}, + ) + ], + parameters={}, + config={}, + ) + + slices = list(partition_router.stream_slices()) + assert slices == [] + + +def test_substream_partition_router_closes_all_partitions_even_when_no_records(): + """ + Test that cursor.close_partition() is called for all parent stream partitions, + even when a partition produces no parent records. + This validates that partition lifecycle is properly managed regardless of record count. + """ + mock_slices = [ + StreamSlice(partition={"slice": "first"}, cursor_slice={}), + StreamSlice(partition={"slice": "second"}, cursor_slice={}), + StreamSlice(partition={"slice": "third"}, cursor_slice={}), + ] + + partition_1 = InMemoryPartition( + "partition_1", + "first_stream", + mock_slices[0], + _build_records_for_slice([{"id": "record_1"}], mock_slices[0]), + ) + partition_2 = InMemoryPartition( + "partition_2", + "first_stream", + mock_slices[1], + [], + ) + partition_3 = InMemoryPartition( + "partition_3", + "first_stream", + mock_slices[2], + _build_records_for_slice([{"id": "record_3"}], mock_slices[2]), + ) + + mock_cursor = Mock() + mock_cursor.stream_slices.return_value = [] + + partition_router = SubstreamPartitionRouter( + parent_stream_configs=[ + ParentStreamConfig( + stream=MockStream( + [partition_1, partition_2, partition_3], + "first_stream", + cursor=mock_cursor, + ), + parent_key="id", + partition_field="partition_field", + parameters={}, + config={}, + ) + ], + parameters={}, + config={}, + ) + + slices = list(partition_router.stream_slices()) + + assert slices == [ + {"partition_field": "record_1", "parent_slice": {"slice": "first"}}, + {"partition_field": "record_3", "parent_slice": {"slice": "third"}}, + ] + + assert mock_cursor.close_partition.call_count 
== 3 + + close_partition_calls = mock_cursor.close_partition.call_args_list + assert close_partition_calls[0][0][0] == partition_1 + assert close_partition_calls[1][0][0] == partition_2 + assert close_partition_calls[2][0][0] == partition_3 + + +def test_substream_partition_router_closes_partition_even_when_parent_key_missing(): + """ + Test that cursor.close_partition() is called even when the parent_key extraction + fails with a KeyError. This ensures partition lifecycle is properly managed + regardless of whether the slice can be emitted. + """ + mock_slices = [ + StreamSlice(partition={"slice": "first"}, cursor_slice={}), + StreamSlice(partition={"slice": "second"}, cursor_slice={}), + ] + + # First partition has a record with the expected "id" key + partition_1 = InMemoryPartition( + "partition_1", + "first_stream", + mock_slices[0], + _build_records_for_slice([{"id": "record_1"}], mock_slices[0]), + ) + # Second partition has a record missing the "id" key (will cause KeyError) + partition_2 = InMemoryPartition( + "partition_2", + "first_stream", + mock_slices[1], + _build_records_for_slice([{"other_field": "value"}], mock_slices[1]), + ) + + mock_cursor = Mock() + mock_cursor.stream_slices.return_value = [] + + partition_router = SubstreamPartitionRouter( + parent_stream_configs=[ + ParentStreamConfig( + stream=MockStream( + [partition_1, partition_2], + "first_stream", + cursor=mock_cursor, + ), + parent_key="id", + partition_field="partition_field", + parameters={}, + config={}, + ) + ], + parameters={}, + config={}, + ) + + slices = list(partition_router.stream_slices()) + + # Only the first partition's record should produce a slice + # The second partition's record is missing the "id" key, so no slice is emitted + assert slices == [ + {"partition_field": "record_1", "parent_slice": {"slice": "first"}}, + ] + + # Both partitions should be closed, even though the second one had a KeyError + assert mock_cursor.close_partition.call_count == 2 + + 
close_partition_calls = mock_cursor.close_partition.call_args_list + assert close_partition_calls[0][0][0] == partition_1 + assert close_partition_calls[1][0][0] == partition_2