Skip to content

Commit e9144e2

Browse files
fix(cdk): fix SubstreamPartitionRouter updating cursor value when no records were read in partition (#889)
Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
1 parent 3eb9ce8 commit e9144e2

File tree

2 files changed

+277
-46
lines changed

2 files changed

+277
-46
lines changed

airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py

Lines changed: 52 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -39,12 +39,13 @@
3939
T = TypeVar("T")
4040

4141

42-
def iterate_with_last_flag(generator: Iterable[T]) -> Iterable[tuple[T, bool]]:
42+
def iterate_with_last_flag(generator: Iterable[T]) -> Iterable[tuple[T | None, bool]]:
4343
iterator = iter(generator)
4444

4545
try:
4646
current = next(iterator)
4747
except StopIteration:
48+
yield None, True
4849
return # Return an empty iterator
4950

5051
for next_item in iterator:
@@ -206,60 +207,66 @@ def stream_slices(self) -> Iterable[StreamSlice]:
206207
for partition, is_last_slice in iterate_with_last_flag(
207208
parent_stream.generate_partitions()
208209
):
210+
if partition is None:
211+
break
209212
for parent_record, is_last_record_in_slice in iterate_with_last_flag(
210213
partition.read()
211214
):
212-
# In the previous CDK implementation, state management was done internally by the stream.
213-
# However, this could cause issues when doing availability check for example as the availability
214-
# check would progress the state so state management was moved outside of the read method.
215-
# Hence, we need to call the cursor here.
216-
# Note that we call observe and close_partition before emitting the associated record as the
217-
# ConcurrentPerPartitionCursor will associate a record with the state of the stream after the
218-
# record was consumed.
219-
parent_stream.cursor.observe(parent_record)
220-
parent_partition = (
221-
parent_record.associated_slice.partition
222-
if parent_record.associated_slice
223-
else {}
224-
)
225-
record_data = parent_record.data
226-
227-
try:
228-
partition_value = dpath.get(
229-
record_data, # type: ignore [arg-type]
230-
parent_field,
215+
emit_slice = parent_record is not None
216+
if parent_record is not None:
217+
# In the previous CDK implementation, state management was done internally by the stream.
218+
# However, this could cause issues when doing availability check for example as the availability
219+
# check would progress the state so state management was moved outside of the read method.
220+
# Hence, we need to call the cursor here.
221+
# Note that we call observe and close_partition before emitting the associated record as the
222+
# ConcurrentPerPartitionCursor will associate a record with the state of the stream after the
223+
# record was consumed.
224+
parent_stream.cursor.observe(parent_record)
225+
parent_partition = (
226+
parent_record.associated_slice.partition
227+
if parent_record.associated_slice
228+
else {}
231229
)
232-
except KeyError:
233-
# FIXME a log here would go a long way for debugging
234-
continue
235-
236-
# Add extra fields
237-
extracted_extra_fields = self._extract_extra_fields(
238-
record_data, extra_fields
239-
)
240-
241-
if parent_stream_config.lazy_read_pointer:
242-
extracted_extra_fields = {
243-
"child_response": self._extract_child_response(
244-
record_data,
245-
parent_stream_config.lazy_read_pointer, # type: ignore[arg-type] # lazy_read_pointer type handled in __post_init__ of parent_stream_config
246-
),
247-
**extracted_extra_fields,
248-
}
230+
record_data = parent_record.data
231+
232+
try:
233+
partition_value = dpath.get(
234+
record_data, # type: ignore [arg-type]
235+
parent_field,
236+
)
237+
except KeyError:
238+
# FIXME a log here would go a long way for debugging
239+
emit_slice = False
240+
241+
if emit_slice:
242+
# Add extra fields
243+
extracted_extra_fields = self._extract_extra_fields(
244+
record_data, extra_fields
245+
)
246+
247+
if parent_stream_config.lazy_read_pointer:
248+
extracted_extra_fields = {
249+
"child_response": self._extract_child_response(
250+
record_data,
251+
parent_stream_config.lazy_read_pointer, # type: ignore[arg-type] # lazy_read_pointer type handled in __post_init__ of parent_stream_config
252+
),
253+
**extracted_extra_fields,
254+
}
249255

250256
if is_last_record_in_slice:
251257
parent_stream.cursor.close_partition(partition)
252258
if is_last_slice:
253259
parent_stream.cursor.ensure_at_least_one_state_emitted()
254260

255-
yield StreamSlice(
256-
partition={
257-
partition_field: partition_value,
258-
"parent_slice": parent_partition or {},
259-
},
260-
cursor_slice={},
261-
extra_fields=extracted_extra_fields,
262-
)
261+
if emit_slice:
262+
yield StreamSlice(
263+
partition={
264+
partition_field: partition_value,
265+
"parent_slice": parent_partition or {},
266+
},
267+
cursor_slice={},
268+
extra_fields=extracted_extra_fields,
269+
)
263270

264271
yield from []
265272

unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py

Lines changed: 225 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
from airbyte_cdk.sources.declarative.partition_routers.substream_partition_router import (
2121
ParentStreamConfig,
2222
SubstreamPartitionRouter,
23+
iterate_with_last_flag,
2324
)
2425
from airbyte_cdk.sources.declarative.requesters.request_option import (
2526
RequestOption,
@@ -611,7 +612,8 @@ def test_request_option(
611612
),
612613
{
613614
"first_stream": {
614-
"lookback_window": 0,
615+
"lookback_window": 1,
616+
"state": {"cursor": "2022-01-01"},
615617
"states": [
616618
{"cursor": {"cursor": "2021-01-02"}, "partition": {"slice": "first"}},
617619
{"cursor": {"cursor": "2022-01-01"}, "partition": {"slice": "second"}},
@@ -1079,3 +1081,225 @@ def test_cartesian_product_stream_slicer_warning_log_message(
10791081
assert warning_message in logged_warnings
10801082
else:
10811083
assert warning_message not in logged_warnings
1084+
1085+
1086+
@pytest.mark.parametrize(
1087+
"input_iterable,expected_output",
1088+
[
1089+
pytest.param([], [(None, True)], id="empty_generator_yields_none_sentinel"),
1090+
pytest.param([1], [(1, True)], id="single_item"),
1091+
pytest.param([1, 2], [(1, False), (2, True)], id="two_items"),
1092+
pytest.param([1, 2, 3], [(1, False), (2, False), (3, True)], id="three_items"),
1093+
pytest.param(["a", "b"], [("a", False), ("b", True)], id="string_items"),
1094+
],
1095+
)
1096+
def test_iterate_with_last_flag(input_iterable, expected_output):
1097+
result = list(iterate_with_last_flag(input_iterable))
1098+
assert result == expected_output
1099+
1100+
1101+
def test_substream_partition_router_no_cursor_update_when_partition_has_no_records():
1102+
"""
1103+
Test that when a partition has no records, the cursor is still properly closed
1104+
but no slices are yielded for that partition.
1105+
This tests the fix for SubstreamPartitionRouter updating cursor value when no records
1106+
were read in partition.
1107+
"""
1108+
mock_slices = [
1109+
StreamSlice(partition={"slice": "first"}, cursor_slice={}),
1110+
StreamSlice(partition={"slice": "second"}, cursor_slice={}),
1111+
]
1112+
1113+
partition_router = SubstreamPartitionRouter(
1114+
parent_stream_configs=[
1115+
ParentStreamConfig(
1116+
stream=MockStream(
1117+
[
1118+
InMemoryPartition(
1119+
"partition_1",
1120+
"first_stream",
1121+
mock_slices[0],
1122+
_build_records_for_slice(
1123+
[{"id": "record_1"}, {"id": "record_2"}], mock_slices[0]
1124+
),
1125+
),
1126+
InMemoryPartition(
1127+
"partition_2",
1128+
"first_stream",
1129+
mock_slices[1],
1130+
[],
1131+
),
1132+
],
1133+
"first_stream",
1134+
),
1135+
parent_key="id",
1136+
partition_field="partition_field",
1137+
parameters={},
1138+
config={},
1139+
)
1140+
],
1141+
parameters={},
1142+
config={},
1143+
)
1144+
1145+
slices = list(partition_router.stream_slices())
1146+
assert slices == [
1147+
{"partition_field": "record_1", "parent_slice": {"slice": "first"}},
1148+
{"partition_field": "record_2", "parent_slice": {"slice": "first"}},
1149+
]
1150+
1151+
1152+
def test_substream_partition_router_handles_empty_parent_partitions():
1153+
"""
1154+
Test that when a parent stream generates no partitions (empty generator),
1155+
the stream_slices method returns early without errors.
1156+
"""
1157+
partition_router = SubstreamPartitionRouter(
1158+
parent_stream_configs=[
1159+
ParentStreamConfig(
1160+
stream=MockStream(
1161+
[],
1162+
"first_stream",
1163+
),
1164+
parent_key="id",
1165+
partition_field="partition_field",
1166+
parameters={},
1167+
config={},
1168+
)
1169+
],
1170+
parameters={},
1171+
config={},
1172+
)
1173+
1174+
slices = list(partition_router.stream_slices())
1175+
assert slices == []
1176+
1177+
1178+
def test_substream_partition_router_closes_all_partitions_even_when_no_records():
1179+
"""
1180+
Test that cursor.close_partition() is called for all parent stream partitions,
1181+
even when a partition produces no parent records.
1182+
This validates that partition lifecycle is properly managed regardless of record count.
1183+
"""
1184+
mock_slices = [
1185+
StreamSlice(partition={"slice": "first"}, cursor_slice={}),
1186+
StreamSlice(partition={"slice": "second"}, cursor_slice={}),
1187+
StreamSlice(partition={"slice": "third"}, cursor_slice={}),
1188+
]
1189+
1190+
partition_1 = InMemoryPartition(
1191+
"partition_1",
1192+
"first_stream",
1193+
mock_slices[0],
1194+
_build_records_for_slice([{"id": "record_1"}], mock_slices[0]),
1195+
)
1196+
partition_2 = InMemoryPartition(
1197+
"partition_2",
1198+
"first_stream",
1199+
mock_slices[1],
1200+
[],
1201+
)
1202+
partition_3 = InMemoryPartition(
1203+
"partition_3",
1204+
"first_stream",
1205+
mock_slices[2],
1206+
_build_records_for_slice([{"id": "record_3"}], mock_slices[2]),
1207+
)
1208+
1209+
mock_cursor = Mock()
1210+
mock_cursor.stream_slices.return_value = []
1211+
1212+
partition_router = SubstreamPartitionRouter(
1213+
parent_stream_configs=[
1214+
ParentStreamConfig(
1215+
stream=MockStream(
1216+
[partition_1, partition_2, partition_3],
1217+
"first_stream",
1218+
cursor=mock_cursor,
1219+
),
1220+
parent_key="id",
1221+
partition_field="partition_field",
1222+
parameters={},
1223+
config={},
1224+
)
1225+
],
1226+
parameters={},
1227+
config={},
1228+
)
1229+
1230+
slices = list(partition_router.stream_slices())
1231+
1232+
assert slices == [
1233+
{"partition_field": "record_1", "parent_slice": {"slice": "first"}},
1234+
{"partition_field": "record_3", "parent_slice": {"slice": "third"}},
1235+
]
1236+
1237+
assert mock_cursor.close_partition.call_count == 3
1238+
1239+
close_partition_calls = mock_cursor.close_partition.call_args_list
1240+
assert close_partition_calls[0][0][0] == partition_1
1241+
assert close_partition_calls[1][0][0] == partition_2
1242+
assert close_partition_calls[2][0][0] == partition_3
1243+
1244+
1245+
def test_substream_partition_router_closes_partition_even_when_parent_key_missing():
1246+
"""
1247+
Test that cursor.close_partition() is called even when the parent_key extraction
1248+
fails with a KeyError. This ensures partition lifecycle is properly managed
1249+
regardless of whether the slice can be emitted.
1250+
"""
1251+
mock_slices = [
1252+
StreamSlice(partition={"slice": "first"}, cursor_slice={}),
1253+
StreamSlice(partition={"slice": "second"}, cursor_slice={}),
1254+
]
1255+
1256+
# First partition has a record with the expected "id" key
1257+
partition_1 = InMemoryPartition(
1258+
"partition_1",
1259+
"first_stream",
1260+
mock_slices[0],
1261+
_build_records_for_slice([{"id": "record_1"}], mock_slices[0]),
1262+
)
1263+
# Second partition has a record missing the "id" key (will cause KeyError)
1264+
partition_2 = InMemoryPartition(
1265+
"partition_2",
1266+
"first_stream",
1267+
mock_slices[1],
1268+
_build_records_for_slice([{"other_field": "value"}], mock_slices[1]),
1269+
)
1270+
1271+
mock_cursor = Mock()
1272+
mock_cursor.stream_slices.return_value = []
1273+
1274+
partition_router = SubstreamPartitionRouter(
1275+
parent_stream_configs=[
1276+
ParentStreamConfig(
1277+
stream=MockStream(
1278+
[partition_1, partition_2],
1279+
"first_stream",
1280+
cursor=mock_cursor,
1281+
),
1282+
parent_key="id",
1283+
partition_field="partition_field",
1284+
parameters={},
1285+
config={},
1286+
)
1287+
],
1288+
parameters={},
1289+
config={},
1290+
)
1291+
1292+
slices = list(partition_router.stream_slices())
1293+
1294+
# Only the first partition's record should produce a slice
1295+
# The second partition's record is missing the "id" key, so no slice is emitted
1296+
assert slices == [
1297+
{"partition_field": "record_1", "parent_slice": {"slice": "first"}},
1298+
]
1299+
1300+
# Both partitions should be closed, even though the second one had a KeyError
1301+
assert mock_cursor.close_partition.call_count == 2
1302+
1303+
close_partition_calls = mock_cursor.close_partition.call_args_list
1304+
assert close_partition_calls[0][0][0] == partition_1
1305+
assert close_partition_calls[1][0][0] == partition_2

0 commit comments

Comments (0)