Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,13 @@
T = TypeVar("T")


def iterate_with_last_flag(generator: Iterable[T]) -> Iterable[tuple[T, bool]]:
def iterate_with_last_flag(generator: Iterable[T]) -> Iterable[tuple[T | None, bool]]:
iterator = iter(generator)

try:
current = next(iterator)
except StopIteration:
yield None, True
return # Return an empty iterator

for next_item in iterator:
Expand Down Expand Up @@ -206,60 +207,66 @@ def stream_slices(self) -> Iterable[StreamSlice]:
for partition, is_last_slice in iterate_with_last_flag(
parent_stream.generate_partitions()
):
if partition is None:
break
for parent_record, is_last_record_in_slice in iterate_with_last_flag(
partition.read()
):
# In the previous CDK implementation, state management was done internally by the stream.
# However, this could cause issues when doing availability check for example as the availability
# check would progress the state so state management was moved outside of the read method.
# Hence, we need to call the cursor here.
# Note that we call observe and close_partition before emitting the associated record as the
# ConcurrentPerPartitionCursor will associate a record with the state of the stream after the
# record was consumed.
parent_stream.cursor.observe(parent_record)
parent_partition = (
parent_record.associated_slice.partition
if parent_record.associated_slice
else {}
)
record_data = parent_record.data

try:
partition_value = dpath.get(
record_data, # type: ignore [arg-type]
parent_field,
emit_slice = parent_record is not None
if parent_record is not None:
# In the previous CDK implementation, state management was done internally by the stream.
# However, this could cause issues when doing availability check for example as the availability
# check would progress the state so state management was moved outside of the read method.
# Hence, we need to call the cursor here.
# Note that we call observe and close_partition before emitting the associated record as the
# ConcurrentPerPartitionCursor will associate a record with the state of the stream after the
# record was consumed.
parent_stream.cursor.observe(parent_record)
parent_partition = (
parent_record.associated_slice.partition
if parent_record.associated_slice
else {}
)
except KeyError:
# FIXME a log here would go a long way for debugging
continue

# Add extra fields
extracted_extra_fields = self._extract_extra_fields(
record_data, extra_fields
)

if parent_stream_config.lazy_read_pointer:
extracted_extra_fields = {
"child_response": self._extract_child_response(
record_data,
parent_stream_config.lazy_read_pointer, # type: ignore[arg-type] # lazy_read_pointer type handled in __post_init__ of parent_stream_config
),
**extracted_extra_fields,
}
record_data = parent_record.data

try:
partition_value = dpath.get(
record_data, # type: ignore [arg-type]
parent_field,
)
except KeyError:
# FIXME a log here would go a long way for debugging
emit_slice = False

if emit_slice:
# Add extra fields
extracted_extra_fields = self._extract_extra_fields(
record_data, extra_fields
)

if parent_stream_config.lazy_read_pointer:
extracted_extra_fields = {
"child_response": self._extract_child_response(
record_data,
parent_stream_config.lazy_read_pointer, # type: ignore[arg-type] # lazy_read_pointer type handled in __post_init__ of parent_stream_config
),
**extracted_extra_fields,
}

if is_last_record_in_slice:
parent_stream.cursor.close_partition(partition)
if is_last_slice:
parent_stream.cursor.ensure_at_least_one_state_emitted()

yield StreamSlice(
partition={
partition_field: partition_value,
"parent_slice": parent_partition or {},
},
cursor_slice={},
extra_fields=extracted_extra_fields,
)
if emit_slice:
yield StreamSlice(
partition={
partition_field: partition_value,
"parent_slice": parent_partition or {},
},
cursor_slice={},
extra_fields=extracted_extra_fields,
)

yield from []

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from airbyte_cdk.sources.declarative.partition_routers.substream_partition_router import (
ParentStreamConfig,
SubstreamPartitionRouter,
iterate_with_last_flag,
)
from airbyte_cdk.sources.declarative.requesters.request_option import (
RequestOption,
Expand Down Expand Up @@ -611,7 +612,8 @@ def test_request_option(
),
{
"first_stream": {
"lookback_window": 0,
"lookback_window": 1,
"state": {"cursor": "2022-01-01"},
"states": [
{"cursor": {"cursor": "2021-01-02"}, "partition": {"slice": "first"}},
{"cursor": {"cursor": "2022-01-01"}, "partition": {"slice": "second"}},
Expand Down Expand Up @@ -1079,3 +1081,225 @@ def test_cartesian_product_stream_slicer_warning_log_message(
assert warning_message in logged_warnings
else:
assert warning_message not in logged_warnings


@pytest.mark.parametrize(
    "input_iterable,expected_output",
    [
        pytest.param([], [(None, True)], id="empty_generator_yields_none_sentinel"),
        pytest.param([1], [(1, True)], id="single_item"),
        pytest.param([1, 2], [(1, False), (2, True)], id="two_items"),
        pytest.param([1, 2, 3], [(1, False), (2, False), (3, True)], id="three_items"),
        pytest.param(["a", "b"], [("a", False), ("b", True)], id="string_items"),
    ],
)
def test_iterate_with_last_flag(input_iterable, expected_output):
    """Each item is paired with a flag that is True only for the last one;
    an empty iterable yields the (None, True) sentinel pair."""
    assert list(iterate_with_last_flag(input_iterable)) == expected_output


def test_substream_partition_router_no_cursor_update_when_partition_has_no_records():
    """
    An empty parent partition must not contribute any stream slices.

    Regression test for SubstreamPartitionRouter updating the cursor value
    when no records were read in a partition: only the records from the
    populated first partition should be turned into slices.
    """
    first_slice = StreamSlice(partition={"slice": "first"}, cursor_slice={})
    second_slice = StreamSlice(partition={"slice": "second"}, cursor_slice={})

    # One partition with two records, and one that yields no records at all.
    populated_partition = InMemoryPartition(
        "partition_1",
        "first_stream",
        first_slice,
        _build_records_for_slice([{"id": "record_1"}, {"id": "record_2"}], first_slice),
    )
    empty_partition = InMemoryPartition(
        "partition_2",
        "first_stream",
        second_slice,
        [],
    )

    parent_config = ParentStreamConfig(
        stream=MockStream(
            [populated_partition, empty_partition],
            "first_stream",
        ),
        parent_key="id",
        partition_field="partition_field",
        parameters={},
        config={},
    )
    partition_router = SubstreamPartitionRouter(
        parent_stream_configs=[parent_config],
        parameters={},
        config={},
    )

    # Only the populated partition's records become slices; the empty
    # partition contributes nothing.
    assert list(partition_router.stream_slices()) == [
        {"partition_field": "record_1", "parent_slice": {"slice": "first"}},
        {"partition_field": "record_2", "parent_slice": {"slice": "first"}},
    ]


def test_substream_partition_router_handles_empty_parent_partitions():
    """
    A parent stream that generates no partitions at all (empty generator)
    must produce no slices and must not raise when stream_slices() is consumed.
    """
    router = SubstreamPartitionRouter(
        parent_stream_configs=[
            ParentStreamConfig(
                stream=MockStream([], "first_stream"),  # no partitions at all
                parent_key="id",
                partition_field="partition_field",
                parameters={},
                config={},
            )
        ],
        parameters={},
        config={},
    )

    assert list(router.stream_slices()) == []


def test_substream_partition_router_closes_all_partitions_even_when_no_records():
    """
    cursor.close_partition() must be called once per parent partition, in
    generation order, even for a partition that produced no parent records.
    The record-less partition simply contributes no slices.
    """
    slice_first = StreamSlice(partition={"slice": "first"}, cursor_slice={})
    slice_second = StreamSlice(partition={"slice": "second"}, cursor_slice={})
    slice_third = StreamSlice(partition={"slice": "third"}, cursor_slice={})

    # Middle partition is intentionally empty.
    partitions = [
        InMemoryPartition(
            "partition_1",
            "first_stream",
            slice_first,
            _build_records_for_slice([{"id": "record_1"}], slice_first),
        ),
        InMemoryPartition("partition_2", "first_stream", slice_second, []),
        InMemoryPartition(
            "partition_3",
            "first_stream",
            slice_third,
            _build_records_for_slice([{"id": "record_3"}], slice_third),
        ),
    ]

    cursor = Mock()
    cursor.stream_slices.return_value = []

    router = SubstreamPartitionRouter(
        parent_stream_configs=[
            ParentStreamConfig(
                stream=MockStream(partitions, "first_stream", cursor=cursor),
                parent_key="id",
                partition_field="partition_field",
                parameters={},
                config={},
            )
        ],
        parameters={},
        config={},
    )

    emitted = list(router.stream_slices())

    # The empty middle partition yields no slices...
    assert emitted == [
        {"partition_field": "record_1", "parent_slice": {"slice": "first"}},
        {"partition_field": "record_3", "parent_slice": {"slice": "third"}},
    ]

    # ...but every partition is still closed, in order.
    closed = [call[0][0] for call in cursor.close_partition.call_args_list]
    assert closed == partitions


def test_substream_partition_router_closes_partition_even_when_parent_key_missing():
    """
    A KeyError while extracting parent_key must not prevent the partition
    from being closed: the offending record is skipped (no slice emitted),
    but cursor.close_partition() still runs for that partition.
    """
    slice_first = StreamSlice(partition={"slice": "first"}, cursor_slice={})
    slice_second = StreamSlice(partition={"slice": "second"}, cursor_slice={})

    # First partition: record carries the expected "id" parent key.
    good_partition = InMemoryPartition(
        "partition_1",
        "first_stream",
        slice_first,
        _build_records_for_slice([{"id": "record_1"}], slice_first),
    )
    # Second partition: record lacks "id", so parent_key extraction raises KeyError.
    bad_partition = InMemoryPartition(
        "partition_2",
        "first_stream",
        slice_second,
        _build_records_for_slice([{"other_field": "value"}], slice_second),
    )

    cursor = Mock()
    cursor.stream_slices.return_value = []

    router = SubstreamPartitionRouter(
        parent_stream_configs=[
            ParentStreamConfig(
                stream=MockStream(
                    [good_partition, bad_partition],
                    "first_stream",
                    cursor=cursor,
                ),
                parent_key="id",
                partition_field="partition_field",
                parameters={},
                config={},
            )
        ],
        parameters={},
        config={},
    )

    # Only the record that has the "id" key produces a slice.
    assert list(router.stream_slices()) == [
        {"partition_field": "record_1", "parent_slice": {"slice": "first"}},
    ]

    # Both partitions are closed, despite the KeyError in the second one.
    closed = [call[0][0] for call in cursor.close_partition.call_args_list]
    assert closed == [good_partition, bad_partition]