|
20 | 20 | from airbyte_cdk.sources.declarative.partition_routers.substream_partition_router import ( |
21 | 21 | ParentStreamConfig, |
22 | 22 | SubstreamPartitionRouter, |
| 23 | + iterate_with_last_flag, |
23 | 24 | ) |
24 | 25 | from airbyte_cdk.sources.declarative.requesters.request_option import ( |
25 | 26 | RequestOption, |
@@ -1079,3 +1080,95 @@ def test_cartesian_product_stream_slicer_warning_log_message( |
1079 | 1080 | assert warning_message in logged_warnings |
1080 | 1081 | else: |
1081 | 1082 | assert warning_message not in logged_warnings |
| 1083 | + |
| 1084 | + |
| 1085 | +@pytest.mark.parametrize( |
| 1086 | + "input_iterable,expected_output", |
| 1087 | + [ |
| 1088 | + pytest.param([], [(None, True)], id="empty_generator_yields_none_sentinel"), |
| 1089 | + pytest.param([1], [(1, True)], id="single_item"), |
| 1090 | + pytest.param([1, 2], [(1, False), (2, True)], id="two_items"), |
| 1091 | + pytest.param([1, 2, 3], [(1, False), (2, False), (3, True)], id="three_items"), |
| 1092 | + pytest.param(["a", "b"], [("a", False), ("b", True)], id="string_items"), |
| 1093 | + ], |
| 1094 | +) |
| 1095 | +def test_iterate_with_last_flag(input_iterable, expected_output): |
| 1096 | + result = list(iterate_with_last_flag(input_iterable)) |
| 1097 | + assert result == expected_output |
| 1098 | + |
| 1099 | + |
| 1100 | +def test_substream_partition_router_no_cursor_update_when_partition_has_no_records(): |
| 1101 | + """ |
| 1102 | + Test that when a partition has no records, the cursor is still properly closed |
| 1103 | + but no slices are yielded for that partition. |
| 1104 | + This tests the fix for SubstreamPartitionRouter updating cursor value when no records |
| 1105 | + were read in partition. |
| 1106 | + """ |
| 1107 | + mock_slices = [ |
| 1108 | + StreamSlice(partition={"slice": "first"}, cursor_slice={}), |
| 1109 | + StreamSlice(partition={"slice": "second"}, cursor_slice={}), |
| 1110 | + ] |
| 1111 | + |
| 1112 | + partition_router = SubstreamPartitionRouter( |
| 1113 | + parent_stream_configs=[ |
| 1114 | + ParentStreamConfig( |
| 1115 | + stream=MockStream( |
| 1116 | + [ |
| 1117 | + InMemoryPartition( |
| 1118 | + "partition_1", |
| 1119 | + "first_stream", |
| 1120 | + mock_slices[0], |
| 1121 | + _build_records_for_slice( |
| 1122 | + [{"id": "record_1"}, {"id": "record_2"}], mock_slices[0] |
| 1123 | + ), |
| 1124 | + ), |
| 1125 | + InMemoryPartition( |
| 1126 | + "partition_2", |
| 1127 | + "first_stream", |
| 1128 | + mock_slices[1], |
| 1129 | + [], |
| 1130 | + ), |
| 1131 | + ], |
| 1132 | + "first_stream", |
| 1133 | + ), |
| 1134 | + parent_key="id", |
| 1135 | + partition_field="partition_field", |
| 1136 | + parameters={}, |
| 1137 | + config={}, |
| 1138 | + ) |
| 1139 | + ], |
| 1140 | + parameters={}, |
| 1141 | + config={}, |
| 1142 | + ) |
| 1143 | + |
| 1144 | + slices = list(partition_router.stream_slices()) |
| 1145 | + assert slices == [ |
| 1146 | + {"partition_field": "record_1", "parent_slice": {"slice": "first"}}, |
| 1147 | + {"partition_field": "record_2", "parent_slice": {"slice": "first"}}, |
| 1148 | + ] |
| 1149 | + |
| 1150 | + |
| 1151 | +def test_substream_partition_router_handles_empty_parent_partitions(): |
| 1152 | + """ |
| 1153 | + Test that when a parent stream generates no partitions (empty generator), |
| 1154 | + the stream_slices method returns early without errors. |
| 1155 | + """ |
| 1156 | + partition_router = SubstreamPartitionRouter( |
| 1157 | + parent_stream_configs=[ |
| 1158 | + ParentStreamConfig( |
| 1159 | + stream=MockStream( |
| 1160 | + [], |
| 1161 | + "first_stream", |
| 1162 | + ), |
| 1163 | + parent_key="id", |
| 1164 | + partition_field="partition_field", |
| 1165 | + parameters={}, |
| 1166 | + config={}, |
| 1167 | + ) |
| 1168 | + ], |
| 1169 | + parameters={}, |
| 1170 | + config={}, |
| 1171 | + ) |
| 1172 | + |
| 1173 | + slices = list(partition_router.stream_slices()) |
| 1174 | + assert slices == [] |
0 commit comments