Skip to content

Commit 58ffd0d

Browse files
fix(cdk): use skip_slice flag to ensure partition closure even on KeyError
- Replace 'continue' with 'skip_slice' flag when dpath.get fails with KeyError - This ensures close_partition() and ensure_at_least_one_state_emitted() are always called, even when partition value extraction fails - Add test for partition closure when parent_key is missing from record Co-Authored-By: Daryna Ishchenko <darina.ishchenko17@gmail.com>
1 parent d6c9701 commit 58ffd0d

2 files changed

Lines changed: 80 additions & 14 deletions

File tree

airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,7 @@ def stream_slices(self) -> Iterable[StreamSlice]:
212212
for parent_record, is_last_record_in_slice in iterate_with_last_flag(
213213
partition.read()
214214
):
215+
skip_slice = True
215216
if parent_record is not None:
216217
# In the previous CDK implementation, state management was done internally by the stream.
217218
# However, this could cause issues when doing availability check for example as the availability
@@ -235,28 +236,30 @@ def stream_slices(self) -> Iterable[StreamSlice]:
235236
)
236237
except KeyError:
237238
# FIXME a log here would go a long way for debugging
238-
continue
239+
pass
240+
else:
241+
skip_slice = False
239242

240-
# Add extra fields
241-
extracted_extra_fields = self._extract_extra_fields(
242-
record_data, extra_fields
243-
)
243+
# Add extra fields
244+
extracted_extra_fields = self._extract_extra_fields(
245+
record_data, extra_fields
246+
)
244247

245-
if parent_stream_config.lazy_read_pointer:
246-
extracted_extra_fields = {
247-
"child_response": self._extract_child_response(
248-
record_data,
249-
parent_stream_config.lazy_read_pointer, # type: ignore[arg-type] # lazy_read_pointer type handeled in __post_init__ of parent_stream_config
250-
),
251-
**extracted_extra_fields,
252-
}
248+
if parent_stream_config.lazy_read_pointer:
249+
extracted_extra_fields = {
250+
"child_response": self._extract_child_response(
251+
record_data,
252+
parent_stream_config.lazy_read_pointer, # type: ignore[arg-type] # lazy_read_pointer type handeled in __post_init__ of parent_stream_config
253+
),
254+
**extracted_extra_fields,
255+
}
253256

254257
if is_last_record_in_slice:
255258
parent_stream.cursor.close_partition(partition)
256259
if is_last_slice:
257260
parent_stream.cursor.ensure_at_least_one_state_emitted()
258261

259-
if parent_record is not None:
262+
if not skip_slice:
260263
yield StreamSlice(
261264
partition={
262265
partition_field: partition_value,

unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1240,3 +1240,66 @@ def test_substream_partition_router_closes_all_partitions_even_when_no_records()
12401240
assert close_partition_calls[0][0][0] == partition_1
12411241
assert close_partition_calls[1][0][0] == partition_2
12421242
assert close_partition_calls[2][0][0] == partition_3
1243+
1244+
1245+
def test_substream_partition_router_closes_partition_even_when_parent_key_missing():
1246+
"""
1247+
Test that cursor.close_partition() is called even when the parent_key extraction
1248+
fails with a KeyError. This ensures partition lifecycle is properly managed
1249+
regardless of whether the slice can be emitted.
1250+
"""
1251+
mock_slices = [
1252+
StreamSlice(partition={"slice": "first"}, cursor_slice={}),
1253+
StreamSlice(partition={"slice": "second"}, cursor_slice={}),
1254+
]
1255+
1256+
# First partition has a record with the expected "id" key
1257+
partition_1 = InMemoryPartition(
1258+
"partition_1",
1259+
"first_stream",
1260+
mock_slices[0],
1261+
_build_records_for_slice([{"id": "record_1"}], mock_slices[0]),
1262+
)
1263+
# Second partition has a record missing the "id" key (will cause KeyError)
1264+
partition_2 = InMemoryPartition(
1265+
"partition_2",
1266+
"first_stream",
1267+
mock_slices[1],
1268+
_build_records_for_slice([{"other_field": "value"}], mock_slices[1]),
1269+
)
1270+
1271+
mock_cursor = Mock()
1272+
mock_cursor.stream_slices.return_value = []
1273+
1274+
partition_router = SubstreamPartitionRouter(
1275+
parent_stream_configs=[
1276+
ParentStreamConfig(
1277+
stream=MockStream(
1278+
[partition_1, partition_2],
1279+
"first_stream",
1280+
cursor=mock_cursor,
1281+
),
1282+
parent_key="id",
1283+
partition_field="partition_field",
1284+
parameters={},
1285+
config={},
1286+
)
1287+
],
1288+
parameters={},
1289+
config={},
1290+
)
1291+
1292+
slices = list(partition_router.stream_slices())
1293+
1294+
# Only the first partition's record should produce a slice
1295+
# The second partition's record is missing the "id" key, so no slice is emitted
1296+
assert slices == [
1297+
{"partition_field": "record_1", "parent_slice": {"slice": "first"}},
1298+
]
1299+
1300+
# Both partitions should be closed, even though the second one had a KeyError
1301+
assert mock_cursor.close_partition.call_count == 2
1302+
1303+
close_partition_calls = mock_cursor.close_partition.call_args_list
1304+
assert close_partition_calls[0][0][0] == partition_1
1305+
assert close_partition_calls[1][0][0] == partition_2

0 commit comments

Comments
 (0)