diff --git a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py index 56cb83fcd..7e40dc9c1 100644 --- a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py +++ b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py @@ -384,10 +384,7 @@ def _read_pages( response = None try: - if ( - self.additional_query_properties - and self.additional_query_properties.property_chunking - ): + if self.additional_query_properties: for properties in self.additional_query_properties.get_request_property_chunks( stream_slice=stream_slice ): @@ -401,15 +398,19 @@ def _read_pages( ) for current_record in records_generator_fn(response): - merge_key = ( - self.additional_query_properties.property_chunking.get_merge_key( + if self.additional_query_properties.property_chunking: + merge_key = self.additional_query_properties.property_chunking.get_merge_key( current_record ) - ) - if merge_key: - _deep_merge(merged_records[merge_key], current_record) + if merge_key: + _deep_merge(merged_records[merge_key], current_record) + else: + # We should still emit records even if the record did not have a merge key + pagination_tracker.observe(current_record) + last_page_size += 1 + last_record = current_record + yield current_record else: - # We should still emit records even if the record did not have a merge key pagination_tracker.observe(current_record) last_page_size += 1 last_record = current_record diff --git a/unit_tests/sources/declarative/retrievers/test_simple_retriever.py b/unit_tests/sources/declarative/retrievers/test_simple_retriever.py index fda3a124b..63a375823 100644 --- a/unit_tests/sources/declarative/retrievers/test_simple_retriever.py +++ b/unit_tests/sources/declarative/retrievers/test_simple_retriever.py @@ -1118,6 +1118,82 @@ def test_simple_retriever_with_additional_query_properties(): assert actual_records == expected_records +def test_simple_retriever_with_additional_query_properties_but_without_property_chunking(): + stream_name = "stream_name" + expected_records = [ + Record( + data={"id": "a", "field": "value_first_page"}, + associated_slice=None, + stream_name=stream_name, + ), + Record( + data={"id": "b", "field": "value_second_page"}, + associated_slice=None, + stream_name=stream_name, + ), + ] + + stream_slice = StreamSlice(cursor_slice={}, partition={}) + + response = requests.Response() + response.status_code = 200 + response._content = json.dumps({"data": [{"whatever": 1}]}).encode("utf-8") + + requester = MagicMock() + requester.send_request.side_effect = [ + response, + response, + ] + + record_selector = MagicMock() + record_selector.select_records.side_effect = [ + [ + Record( + data={"id": "a", "field": "value_first_page"}, + associated_slice=None, + stream_name=stream_name, + ), + ], + [ + Record( + data={"id": "b", "field": "value_second_page"}, + associated_slice=None, + stream_name=stream_name, + ), + ], + ] + + query_properties = QueryProperties( + property_list=["first_name", "last_name", "nonary", "bracelet"], + always_include_properties=[], + property_chunking=None, + config=config, + parameters={}, + ) + + paginator = _mock_paginator() + paginator.next_page_token.side_effect = [{"next_page_token": 1}, None] + + retriever = SimpleRetriever( + name=stream_name, + primary_key=primary_key, + requester=requester, + record_selector=record_selector, + additional_query_properties=query_properties, + paginator=paginator, + parameters={}, + config={}, + ) + + actual_records = [ + r for r in retriever.read_records(records_schema={}, stream_slice=stream_slice) + ] + + assert len(actual_records) == 2 + assert actual_records == expected_records + assert requester.send_request.call_args_list[0].kwargs["stream_slice"].extra_fields + + def test_simple_retriever_with_additional_query_properties_single_chunk(): stream_name = "stream_name" expected_records = [