Skip to content

Commit 7a12e52

Browse files
test(cdk): add test for close_partition called on all partitions even when no records
Co-Authored-By: Daryna Ishchenko <darina.ishchenko17@gmail.com>
1 parent 17430ec commit 7a12e52

1 file changed

Lines changed: 67 additions & 0 deletions

File tree

unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1172,3 +1172,70 @@ def test_substream_partition_router_handles_empty_parent_partitions():
11721172

11731173
slices = list(partition_router.stream_slices())
11741174
assert slices == []
1175+
1176+
1177+
def test_substream_partition_router_closes_all_partitions_even_when_no_records():
1178+
"""
1179+
Test that cursor.close_partition() is called for all parent stream partitions,
1180+
even when a partition produces no parent records.
1181+
This validates that partition lifecycle is properly managed regardless of record count.
1182+
"""
1183+
mock_slices = [
1184+
StreamSlice(partition={"slice": "first"}, cursor_slice={}),
1185+
StreamSlice(partition={"slice": "second"}, cursor_slice={}),
1186+
StreamSlice(partition={"slice": "third"}, cursor_slice={}),
1187+
]
1188+
1189+
partition_1 = InMemoryPartition(
1190+
"partition_1",
1191+
"first_stream",
1192+
mock_slices[0],
1193+
_build_records_for_slice([{"id": "record_1"}], mock_slices[0]),
1194+
)
1195+
partition_2 = InMemoryPartition(
1196+
"partition_2",
1197+
"first_stream",
1198+
mock_slices[1],
1199+
[],
1200+
)
1201+
partition_3 = InMemoryPartition(
1202+
"partition_3",
1203+
"first_stream",
1204+
mock_slices[2],
1205+
_build_records_for_slice([{"id": "record_3"}], mock_slices[2]),
1206+
)
1207+
1208+
mock_cursor = Mock()
1209+
mock_cursor.stream_slices.return_value = []
1210+
1211+
partition_router = SubstreamPartitionRouter(
1212+
parent_stream_configs=[
1213+
ParentStreamConfig(
1214+
stream=MockStream(
1215+
[partition_1, partition_2, partition_3],
1216+
"first_stream",
1217+
cursor=mock_cursor,
1218+
),
1219+
parent_key="id",
1220+
partition_field="partition_field",
1221+
parameters={},
1222+
config={},
1223+
)
1224+
],
1225+
parameters={},
1226+
config={},
1227+
)
1228+
1229+
slices = list(partition_router.stream_slices())
1230+
1231+
assert slices == [
1232+
{"partition_field": "record_1", "parent_slice": {"slice": "first"}},
1233+
{"partition_field": "record_3", "parent_slice": {"slice": "third"}},
1234+
]
1235+
1236+
assert mock_cursor.close_partition.call_count == 3
1237+
1238+
close_partition_calls = mock_cursor.close_partition.call_args_list
1239+
assert close_partition_calls[0][0][0] == partition_1
1240+
assert close_partition_calls[1][0][0] == partition_2
1241+
assert close_partition_calls[2][0][0] == partition_3

0 commit comments

Comments
 (0)