From e2de3d4f29d3d24cfce54c61518fa2947981bc4e Mon Sep 17 00:00:00 2001 From: "maxime.c" Date: Wed, 15 Oct 2025 17:53:17 -0400 Subject: [PATCH 1/4] fix hubspot property chunking --- .../sources/declarative/retrievers/simple_retriever.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py index 56cb83fcd..e184bd32f 100644 --- a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py +++ b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py @@ -384,10 +384,7 @@ def _read_pages( response = None try: - if ( - self.additional_query_properties - and self.additional_query_properties.property_chunking - ): + if self.additional_query_properties: for properties in self.additional_query_properties.get_request_property_chunks( stream_slice=stream_slice ): From 0bf488eba3c3e961b054ae321042d228c48ab000 Mon Sep 17 00:00:00 2001 From: "maxime.c" Date: Wed, 15 Oct 2025 18:05:29 -0400 Subject: [PATCH 2/4] emit record when not chunking --- .../retrievers/simple_retriever.py | 20 ++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py index e184bd32f..79de4a69b 100644 --- a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py +++ b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py @@ -398,15 +398,21 @@ def _read_pages( ) for current_record in records_generator_fn(response): - merge_key = ( - self.additional_query_properties.property_chunking.get_merge_key( - current_record + if self.additional_query_properties.property_chunking: + merge_key = ( + self.additional_query_properties.property_chunking.get_merge_key( + current_record + ) ) - ) - if merge_key: - _deep_merge(merged_records[merge_key], current_record) + if merge_key: + _deep_merge(merged_records[merge_key], current_record) + else: + # We should still emit records even if the record did not have a merge key + pagination_tracker.observe(current_record) + last_page_size += 1 + last_record = current_record + yield current_record else: - # We should still emit records even if the record did not have a merge key pagination_tracker.observe(current_record) last_page_size += 1 last_record = current_record From 69a490ba89bb62e580e0336bc08d6b792ba1d691 Mon Sep 17 00:00:00 2001 From: octavia-squidington-iii Date: Wed, 15 Oct 2025 22:07:42 +0000 Subject: [PATCH 3/4] Auto-fix lint and format issues --- .../sources/declarative/retrievers/simple_retriever.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py index 79de4a69b..7e40dc9c1 100644 --- a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py +++ b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py @@ -399,10 +399,8 @@ def _read_pages( for current_record in records_generator_fn(response): if self.additional_query_properties.property_chunking: - merge_key = ( - self.additional_query_properties.property_chunking.get_merge_key( - current_record - ) + merge_key = self.additional_query_properties.property_chunking.get_merge_key( + current_record ) if merge_key: _deep_merge(merged_records[merge_key], current_record) From 5816cd8280a2fd9dcf71afd6f8b9a62740c00284 Mon Sep 17 00:00:00 2001 From: "maxime.c" Date: Wed, 15 Oct 2025 18:46:52 -0400 Subject: [PATCH 4/4] adding test for query properties without propertychunking --- .../retrievers/test_simple_retriever.py | 76 +++++++++++++++++++ 1 file changed, 76 insertions(+) diff --git a/unit_tests/sources/declarative/retrievers/test_simple_retriever.py b/unit_tests/sources/declarative/retrievers/test_simple_retriever.py index fda3a124b..63a375823 100644 --- a/unit_tests/sources/declarative/retrievers/test_simple_retriever.py +++ b/unit_tests/sources/declarative/retrievers/test_simple_retriever.py @@ -1118,6 +1118,82 @@ def test_simple_retriever_with_additional_query_properties(): assert actual_records == expected_records +def test_simple_retriever_with_additional_query_properties_but_without_property_chunking(): + stream_name = "stream_name" + expected_records = [ + Record( + data={"id": "a", "field": "value_first_page"}, + associated_slice=None, + stream_name=stream_name, + ), + Record( + data={"id": "b", "field": "value_second_page"}, + associated_slice=None, + stream_name=stream_name, + ), + ] + + stream_slice = StreamSlice(cursor_slice={}, partition={}) + + response = requests.Response() + response.status_code = 200 + response._content = json.dumps({"data": [{"whatever": 1}]}).encode("utf-8") + + requester = MagicMock() + requester.send_request.side_effect = [ + response, + response, + ] + + record_selector = MagicMock() + record_selector.select_records.side_effect = [ + [ + Record( + data={"id": "a", "field": "value_first_page"}, + associated_slice=None, + stream_name=stream_name, + ), + ], + [ + Record( + data={"id": "b", "field": "value_second_page"}, + associated_slice=None, + stream_name=stream_name, + ), + ], + ] + + query_properties = QueryProperties( + property_list=["first_name", "last_name", "nonary", "bracelet"], + always_include_properties=[], + property_chunking=None, + config=config, + parameters={}, + ) + + paginator = _mock_paginator() + paginator.next_page_token.side_effect = [{"next_page_token": 1}, None] + + retriever = SimpleRetriever( + name=stream_name, + primary_key=primary_key, + requester=requester, + record_selector=record_selector, + additional_query_properties=query_properties, + paginator=paginator, + parameters={}, + config={}, + ) + + actual_records = [ + r for r in retriever.read_records(records_schema={}, stream_slice=stream_slice) + ] + + assert len(actual_records) == 2 + assert actual_records == expected_records + assert requester.send_request.call_args_list[0].kwargs["stream_slice"].extra_fields + + def test_simple_retriever_with_additional_query_properties_single_chunk(): stream_name = "stream_name" expected_records = [