From a3b35631a845c0cdee8828729f34207944d545ab Mon Sep 17 00:00:00 2001 From: brianjlai Date: Mon, 13 Oct 2025 21:41:46 -0700 Subject: [PATCH 1/6] add configured catalog to SimpleRetriever and only fetch properties that are selected in the json_schema --- .../concurrent_declarative_source.py | 1 + .../parsers/model_to_component_factory.py | 18 +++ .../query_properties/property_chunking.py | 9 +- .../query_properties/query_properties.py | 30 +++- .../retrievers/simple_retriever.py | 9 +- .../test_model_to_component_factory.py | 31 +++- .../query_properties/test_query_properties.py | 148 +++++++++++++++++- 7 files changed, 230 insertions(+), 16 deletions(-) diff --git a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py index ef666dc51..781bb64d1 100644 --- a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py +++ b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py @@ -169,6 +169,7 @@ def __init__( component_factory = ModelToComponentFactory( emit_connector_builder_messages=emit_connector_builder_messages, message_repository=ConcurrentMessageRepository(queue, message_repository), + configured_catalog=catalog, connector_state_manager=self._connector_state_manager, max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"), limit_pages_fetched_per_slice=limits.max_pages_per_slice if limits else None, diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index f58aa30e5..8344ec1bc 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -26,6 +26,7 @@ get_type_hints, ) +from airbyte_protocol_dataclasses.models import ConfiguredAirbyteStream from isodate import parse_duration from pydantic.v1 import BaseModel from requests import Response @@ -42,6 +43,7 @@ AirbyteStateMessage, AirbyteStateType, AirbyteStreamState, + ConfiguredAirbyteCatalog, FailureType, Level, StreamDescriptor, @@ -668,6 +670,7 @@ def __init__( message_repository: Optional[MessageRepository] = None, connector_state_manager: Optional[ConnectorStateManager] = None, max_concurrent_async_job_count: Optional[int] = None, + configured_catalog: Optional[ConfiguredAirbyteCatalog] = None, ): self._init_mappings() self._limit_pages_fetched_per_slice = limit_pages_fetched_per_slice @@ -678,6 +681,9 @@ def __init__( self._message_repository = message_repository or InMemoryMessageRepository( self._evaluate_log_level(emit_connector_builder_messages) ) + self._stream_name_to_configured_stream = self._create_stream_name_to_configured_stream( + configured_catalog + ) self._connector_state_manager = connector_state_manager or ConnectorStateManager() self._api_budget: Optional[Union[APIBudget, HttpAPIBudget]] = None self._job_tracker: JobTracker = JobTracker(max_concurrent_async_job_count or 1) @@ -796,6 +802,14 @@ def _init_mappings(self) -> None: # Needed for the case where we need to perform a second parse on the fields of a custom component self.TYPE_NAME_TO_MODEL = {cls.__name__: cls for cls in self.PYDANTIC_MODEL_TO_CONSTRUCTOR} + @staticmethod + def _create_stream_name_to_configured_stream( + configured_catalog: Optional[ConfiguredAirbyteCatalog], + ) -> Mapping[str, ConfiguredAirbyteStream]: + if configured_catalog is None: + return {} + return {stream.stream.name: stream for stream in configured_catalog.streams} + def create_component( self, model_type: Type[BaseModel], @@ -3302,6 +3316,8 @@ def _get_url(req: Requester) -> str: model.ignore_stream_slicer_parameters_on_paginated_requests or False ) + configured_stream = self._stream_name_to_configured_stream.get(name) + if ( model.partition_router and isinstance(model.partition_router, SubstreamPartitionRouterModel) @@ -3337,6 +3353,7 @@ def _get_url(req: Requester) -> str: request_option_provider=request_options_provider, cursor=None, config=config, + configured_airbyte_stream=configured_stream, ignore_stream_slicer_parameters_on_paginated_requests=ignore_stream_slicer_parameters_on_paginated_requests, parameters=model.parameters or {}, ) @@ -3358,6 +3375,7 @@ def _get_url(req: Requester) -> str: request_option_provider=request_options_provider, cursor=None, config=config, + configured_airbyte_stream=configured_stream, ignore_stream_slicer_parameters_on_paginated_requests=ignore_stream_slicer_parameters_on_paginated_requests, additional_query_properties=query_properties, log_formatter=self._get_log_formatter(log_formatter, name), diff --git a/airbyte_cdk/sources/declarative/requesters/query_properties/property_chunking.py b/airbyte_cdk/sources/declarative/requesters/query_properties/property_chunking.py index 06ade8253..0a3400df8 100644 --- a/airbyte_cdk/sources/declarative/requesters/query_properties/property_chunking.py +++ b/airbyte_cdk/sources/declarative/requesters/query_properties/property_chunking.py @@ -2,7 +2,7 @@ from dataclasses import InitVar, dataclass from enum import Enum -from typing import Any, Iterable, List, Mapping, Optional +from typing import Any, Iterable, List, Mapping, Optional, Set from airbyte_cdk.sources.declarative.requesters.query_properties.strategies import GroupByKey from airbyte_cdk.sources.declarative.requesters.query_properties.strategies.merge_strategy import ( @@ -40,7 +40,10 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None: ) def get_request_property_chunks( - self, property_fields: Iterable[str], always_include_properties: Optional[List[str]] + self, + property_fields: Iterable[str], + always_include_properties: Optional[List[str]], + configured_properties: Optional[Set[str]], ) -> Iterable[List[str]]: if not self.property_limit: single_property_chunk = list(property_fields) @@ -53,6 +56,8 @@ def get_request_property_chunks( for property_field in property_fields: # If property_limit_type is not defined, we default to property_count which is just an incrementing count # todo: Add ability to specify parameter delimiter representation and take into account in property_field_size + if configured_properties is not None and property_field not in configured_properties: + continue property_field_size = ( len(property_field) + 3 # The +3 represents the extra characters for encoding the delimiter in between properties diff --git a/airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py b/airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py index 37b26d171..edcee66da 100644 --- a/airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py +++ b/airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py @@ -1,8 +1,9 @@ # Copyright (c) 2025 Airbyte, Inc., all rights reserved. from dataclasses import InitVar, dataclass -from typing import Any, Iterable, List, Mapping, Optional, Union +from typing import Any, Iterable, List, Mapping, Optional, Set, Union +from airbyte_cdk.models import ConfiguredAirbyteStream from airbyte_cdk.sources.declarative.requesters.query_properties import ( PropertiesFromEndpoint, PropertyChunking, @@ -26,13 +27,17 @@ class QueryProperties: parameters: InitVar[Mapping[str, Any]] def get_request_property_chunks( - self, stream_slice: Optional[StreamSlice] = None + self, + stream_slice: Optional[StreamSlice] = None, + configured_stream: Optional[ConfiguredAirbyteStream] = None, ) -> Iterable[List[str]]: """ Uses the defined property_list to fetch the total set of properties dynamically or from a static list and based on the resulting properties, performs property chunking if applicable. :param stream_slice: The StreamSlice of the current partition being processed during the sync. This is included because subcomponents of QueryProperties can make use of interpolation of the top-level StreamSlice object + :param configured_stream: The customer configured stream being synced which is needed to identify which + record fields to query for and emit. """ fields: Union[Iterable[str], List[str]] if isinstance(self.property_list, PropertiesFromEndpoint): @@ -40,9 +45,26 @@ def get_request_property_chunks( else: fields = self.property_list if self.property_list else [] + configured_properties = self._get_configured_properties(configured_stream) + if self.property_chunking: yield from self.property_chunking.get_request_property_chunks( - property_fields=fields, always_include_properties=self.always_include_properties + property_fields=fields, + always_include_properties=self.always_include_properties, + configured_properties=configured_properties, ) else: - yield list(fields) + # A schema might have no extra properties enabled which is valid and represented by an empty set + if configured_properties is not None: + yield from [[field for field in fields if field in configured_properties]] + else: + yield list(fields) + + @staticmethod + def _get_configured_properties( + configured_stream: Optional[ConfiguredAirbyteStream] = None, + ) -> Optional[Set[str]]: + if configured_stream: + # todo double check that configured catalog only contains enabled fields + return set(configured_stream.stream.json_schema.get("properties", {}).keys()) + return None diff --git a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py index 56cb83fcd..cec37065a 100644 --- a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py +++ b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py @@ -25,7 +25,10 @@ from airbyte_cdk.legacy.sources.declarative.incremental import ResumableFullRefreshCursor from airbyte_cdk.legacy.sources.declarative.incremental.declarative_cursor import DeclarativeCursor -from airbyte_cdk.models import AirbyteMessage +from airbyte_cdk.models import ( + AirbyteMessage, + ConfiguredAirbyteStream, +) from airbyte_cdk.sources.declarative.extractors.http_selector import HttpSelector from airbyte_cdk.sources.declarative.interpolation import InterpolatedString from airbyte_cdk.sources.declarative.partition_routers.single_partition_router import ( @@ -97,6 +100,7 @@ class SimpleRetriever(Retriever): cursor: Optional[DeclarativeCursor] = None ignore_stream_slicer_parameters_on_paginated_requests: bool = False additional_query_properties: Optional[QueryProperties] = None + configured_airbyte_stream: Optional[ConfiguredAirbyteStream] = None log_formatter: Optional[Callable[[requests.Response], Any]] = None pagination_tracker_factory: Callable[[], PaginationTracker] = field( default_factory=lambda: lambda: PaginationTracker() @@ -389,7 +393,8 @@ def _read_pages( and self.additional_query_properties.property_chunking ): for properties in self.additional_query_properties.get_request_property_chunks( - stream_slice=stream_slice + stream_slice=stream_slice, + configured_stream=self.configured_airbyte_stream, ): stream_slice = StreamSlice( partition=stream_slice.partition or {}, diff --git a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py index 903eeb14c..27e55446a 100644 --- a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py +++ b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py @@ -12,6 +12,13 @@ import freezegun import pytest import requests +from airbyte_protocol_dataclasses.models.airbyte_protocol import ( + AirbyteStream, + ConfiguredAirbyteCatalog, + ConfiguredAirbyteStream, + DestinationSyncMode, + SyncMode, +) from freezegun.api import FakeDatetime from pydantic.v1 import ValidationError @@ -230,6 +237,28 @@ def test_create_component_type_mismatch(): factory.create_component(CheckStreamModel, manifest["check"], {}) +def test_create_component_with_configured_catalog(): + configured_catalog = ConfiguredAirbyteCatalog( + streams=[ + ConfiguredAirbyteStream( + stream=AirbyteStream( + name="test", + json_schema={"type": "object", "properties": {"id": {"type": "string"}}}, + supported_sync_modes=[SyncMode.full_refresh], + ), + sync_mode=SyncMode.full_refresh, + destination_sync_mode=DestinationSyncMode.overwrite, + ) + ] + ) + + factory_with_catalog = ModelToComponentFactory(configured_catalog=configured_catalog) + + assert factory_with_catalog._stream_name_to_configured_stream == { + "test": configured_catalog.streams[0] + } + + def test_full_config_stream(): content = """ decoder: @@ -1217,7 +1246,7 @@ def test_stream_with_custom_requester_and_query_properties(requests_mock): http_method: "GET" request_parameters: not_query: 1 - query: + query: type: QueryProperties property_list: - id diff --git a/unit_tests/sources/declarative/requesters/query_properties/test_query_properties.py b/unit_tests/sources/declarative/requesters/query_properties/test_query_properties.py index e67383951..26fc91f49 100644 --- a/unit_tests/sources/declarative/requesters/query_properties/test_query_properties.py +++ b/unit_tests/sources/declarative/requesters/query_properties/test_query_properties.py @@ -1,7 +1,15 @@ # Copyright (c) 2025 Airbyte, Inc., all rights reserved. +from typing import List from unittest.mock import Mock +from airbyte_protocol_dataclasses.models import ( + AirbyteStream, + ConfiguredAirbyteStream, + DestinationSyncMode, + SyncMode, +) + from airbyte_cdk.sources.declarative.requesters.query_properties import ( PropertiesFromEndpoint, QueryProperties, @@ -16,7 +24,28 @@ CONFIG = {} +def _create_configured_airbyte_stream(configured_properties: List[str]): + return ConfiguredAirbyteStream( + sync_mode=SyncMode.full_refresh, + destination_sync_mode=DestinationSyncMode.append, + stream=AirbyteStream( + name="nonary_game_01", + namespace=None, + json_schema={ + "properties": { + property: {"type": ["null", "string"]} for property in configured_properties + } + }, + supported_sync_modes=[SyncMode.full_refresh], + ), + ) + + def test_get_request_property_chunks_static_list_with_chunking(): + configured_airbyte_stream = _create_configured_airbyte_stream( + ["santa", "clover", "junpei", "june"] + ) + stream_slice = StreamSlice(cursor_slice={}, partition={}) query_properties = QueryProperties( @@ -43,15 +72,32 @@ def test_get_request_property_chunks_static_list_with_chunking(): parameters={}, ) - property_chunks = list(query_properties.get_request_property_chunks(stream_slice=stream_slice)) + property_chunks = list( + query_properties.get_request_property_chunks( + stream_slice=stream_slice, configured_stream=configured_airbyte_stream + ) + ) - assert len(property_chunks) == 3 - assert property_chunks[0] == ["ace", "snake", "santa"] - assert property_chunks[1] == ["clover", "junpei", "june"] - assert property_chunks[2] == ["seven", "lotus", "nine"] + assert len(property_chunks) == 2 + assert property_chunks[0] == ["santa", "clover", "junpei"] + assert property_chunks[1] == ["june"] def test_get_request_property_chunks_static_list_with_always_include_properties(): + configured_airbyte_stream = _create_configured_airbyte_stream( + [ + "ace", + "snake", + "santa", + "clover", + "junpei", + "june", + "seven", + "lotus", + "nine", + ] + ) + stream_slice = StreamSlice(cursor_slice={}, partition={}) query_properties = QueryProperties( @@ -78,7 +124,11 @@ def test_get_request_property_chunks_static_list_with_always_include_properties( parameters={}, ) - property_chunks = list(query_properties.get_request_property_chunks(stream_slice=stream_slice)) + property_chunks = list( + query_properties.get_request_property_chunks( + stream_slice=stream_slice, configured_stream=configured_airbyte_stream + ) + ) assert len(property_chunks) == 3 assert property_chunks[0] == ["zero", "ace", "snake", "santa"] @@ -87,6 +137,10 @@ def test_get_request_property_chunks_static_list_with_always_include_properties( def test_get_request_property_chunks_dynamic_endpoint(): + configured_airbyte_stream = _create_configured_airbyte_stream( + ["alice", "clover", "dio", "k", "luna", "phi", "quark", "sigma", "tenmyouji"] + ) + stream_slice = StreamSlice(cursor_slice={}, partition={}) properties_from_endpoint_mock = Mock(spec=PropertiesFromEndpoint) @@ -108,8 +162,88 @@ def test_get_request_property_chunks_dynamic_endpoint(): parameters={}, ) - property_chunks = list(query_properties.get_request_property_chunks(stream_slice=stream_slice)) + property_chunks = list( + query_properties.get_request_property_chunks( + stream_slice=stream_slice, configured_stream=configured_airbyte_stream + ) + ) assert len(property_chunks) == 2 assert property_chunks[0] == ["alice", "clover", "dio", "k", "luna"] assert property_chunks[1] == ["phi", "quark", "sigma", "tenmyouji"] + + +def test_get_request_property_chunks_with_configured_catalog_static_list(): + stream_slice = StreamSlice(cursor_slice={}, partition={}) + # Simulate configured_airbyte_stream whose json_schema only enables 'luna', 'phi', 'sigma' + configured_airbyte_stream = _create_configured_airbyte_stream( + ["santa", "clover", "junpei", "june"] + ) + + query_properties = QueryProperties( + property_list=[ + "ace", + "snake", + "santa", + "clover", + "junpei", + "june", + "seven", + "lotus", + "nine", + ], + always_include_properties=None, + property_chunking=PropertyChunking( + property_limit_type=PropertyLimitType.property_count, + property_limit=3, + record_merge_strategy=GroupByKey(key="id", config=CONFIG, parameters={}), + config=CONFIG, + parameters={}, + ), + config=CONFIG, + parameters={}, + ) + + property_chunks = list( + query_properties.get_request_property_chunks( + stream_slice=stream_slice, configured_stream=configured_airbyte_stream + ) + ) + + assert len(property_chunks) == 2 + assert property_chunks[0] == ["santa", "clover", "junpei"] + assert property_chunks[1] == ["june"] + + +def test_get_request_property_chunks_with_configured_catalog_dynamic_endpoint(): + configured_airbyte_stream = _create_configured_airbyte_stream(["luna", "phi", "sigma"]) + + stream_slice = StreamSlice(cursor_slice={}, partition={}) + + properties_from_endpoint_mock = Mock(spec=PropertiesFromEndpoint) + properties_from_endpoint_mock.get_properties_from_endpoint.return_value = iter( + ["alice", "clover", "dio", "k", "luna", "phi", "quark", "sigma", "tenmyouji"] + ) + + query_properties = QueryProperties( + property_list=properties_from_endpoint_mock, + always_include_properties=None, + property_chunking=PropertyChunking( + property_limit_type=PropertyLimitType.property_count, + property_limit=5, + record_merge_strategy=GroupByKey(key="id", config=CONFIG, parameters={}), + config=CONFIG, + parameters={}, + ), + config=CONFIG, + parameters={}, + ) + + property_chunks = list( + query_properties.get_request_property_chunks( + stream_slice=stream_slice, configured_stream=configured_airbyte_stream + ) + ) + + assert len(property_chunks) == 1 + assert property_chunks[0] == ["luna", "phi", "sigma"] From de0b73f287643e7ebe4346b150fc41548099a5ad Mon Sep 17 00:00:00 2001 From: brianjlai Date: Tue, 14 Oct 2025 14:37:05 -0700 Subject: [PATCH 2/6] fix tests and clean up a few parts of the code --- .../parsers/model_to_component_factory.py | 8 +-- .../query_properties/query_properties.py | 13 +++-- .../test_property_chunking.py | 7 ++- .../test_property_chunking.py | 53 ++++++++++++++++++- 4 files changed, 72 insertions(+), 9 deletions(-) diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 8344ec1bc..1a5304ae4 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -806,9 +806,11 @@ def _init_mappings(self) -> None: def _create_stream_name_to_configured_stream( configured_catalog: Optional[ConfiguredAirbyteCatalog], ) -> Mapping[str, ConfiguredAirbyteStream]: - if configured_catalog is None: - return {} - return {stream.stream.name: stream for stream in configured_catalog.streams} + return ( + {stream.stream.name: stream for stream in configured_catalog.streams} + if configured_catalog + else {} + ) def create_component( self, diff --git a/airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py b/airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py index edcee66da..4f17b5172 100644 --- a/airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py +++ b/airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py @@ -39,14 +39,14 @@ def get_request_property_chunks( :param configured_stream: The customer configured stream being synced which is needed to identify which record fields to query for and emit. """ + configured_properties = self._get_configured_properties(configured_stream) + fields: Union[Iterable[str], List[str]] if isinstance(self.property_list, PropertiesFromEndpoint): fields = self.property_list.get_properties_from_endpoint(stream_slice=stream_slice) else: fields = self.property_list if self.property_list else [] - configured_properties = self._get_configured_properties(configured_stream) - if self.property_chunking: yield from self.property_chunking.get_request_property_chunks( property_fields=fields, @@ -54,7 +54,6 @@ def get_request_property_chunks( configured_properties=configured_properties, ) else: - # A schema might have no extra properties enabled which is valid and represented by an empty set if configured_properties is not None: yield from [[field for field in fields if field in configured_properties]] else: @@ -64,7 +63,13 @@ def get_request_property_chunks( def _get_configured_properties( configured_stream: Optional[ConfiguredAirbyteStream] = None, ) -> Optional[Set[str]]: + """ + Returns the set of properties that have been selected for the configured stream. The intent being that + we should only query for selected properties not all since disabled properties are discarded. + + When configured_stream is None, then there was no incoming catalog and all fields should be retrieved. + This is different from the empty set where the json_schema was empty and no schema fields were selected. + """ if configured_stream: - # todo double check that configured catalog only contains enabled fields return set(configured_stream.stream.json_schema.get("properties", {}).keys()) return None diff --git a/unit_tests/connector_builder/test_property_chunking.py b/unit_tests/connector_builder/test_property_chunking.py index ccae4a336..88346b78d 100644 --- a/unit_tests/connector_builder/test_property_chunking.py +++ b/unit_tests/connector_builder/test_property_chunking.py @@ -186,7 +186,12 @@ "json_schema": { "$schema": "http://json-schema.org/draft-07/schema#", "type": "object", - "properties": {}, + "properties": { + "one": {"type": ["null", "string"]}, + "two": {"type": ["null", "string"]}, + "three": {"type": ["null", "string"]}, + "four": {"type": ["null", "string"]}, + }, }, "supported_sync_modes": ["full_refresh"], "source_defined_cursor": False, diff --git a/unit_tests/sources/declarative/requesters/query_properties/test_property_chunking.py b/unit_tests/sources/declarative/requesters/query_properties/test_property_chunking.py index 671cc26bc..c20658e14 100644 --- a/unit_tests/sources/declarative/requesters/query_properties/test_property_chunking.py +++ b/unit_tests/sources/declarative/requesters/query_properties/test_property_chunking.py @@ -72,6 +72,7 @@ def test_get_request_property_chunks( property_limit, expected_property_chunks, ): + configured_properties = set(property_fields) property_fields = iter(property_fields) property_chunking = PropertyChunking( property_limit_type=property_limit_type, @@ -83,7 +84,9 @@ def test_get_request_property_chunks( property_chunks = list( property_chunking.get_request_property_chunks( - property_fields=property_fields, always_include_properties=always_include_properties + property_fields=property_fields, + always_include_properties=always_include_properties, + configured_properties=configured_properties, ) ) @@ -92,6 +95,54 @@ def test_get_request_property_chunks( assert property_chunks[i] == expected_property_chunk +def test_get_request_property_chunks_empty_configured_properties(): + expected_property_chunks = [["white", "lotus"]] + + always_include_properties = ["white", "lotus"] + property_fields = ["maui", "taormina", "koh_samui", "saint_jean_cap_ferrat"] + configured_properties = set() + property_chunking = PropertyChunking( + property_limit_type=PropertyLimitType.property_count, + property_limit=3, + record_merge_strategy=GroupByKey(key="id", config=CONFIG, parameters={}), + config=CONFIG, + parameters={}, + ) + property_chunks = list( + property_chunking.get_request_property_chunks( + property_fields=property_fields, + always_include_properties=always_include_properties, + configured_properties=configured_properties, + ) + ) + assert property_chunks == expected_property_chunks + + +def test_get_request_property_chunks_none_configured_properties(): + expected_property_chunks = [ + ["white", "lotus", "maui", "taormina"], + ["white", "lotus", "koh_samui", "saint_jean_cap_ferrat"], + ] + + always_include_properties = ["white", "lotus"] + property_fields = ["maui", "taormina", "koh_samui", "saint_jean_cap_ferrat"] + property_chunking = PropertyChunking( + property_limit_type=PropertyLimitType.property_count, + property_limit=2, + record_merge_strategy=GroupByKey(key="id", config=CONFIG, parameters={}), + config=CONFIG, + parameters={}, + ) + property_chunks = list( + property_chunking.get_request_property_chunks( + property_fields=property_fields, + always_include_properties=always_include_properties, + configured_properties=None, + ) + ) + assert property_chunks == expected_property_chunks + + def test_get_merge_key(): record = Record(stream_name="test", data={"id": "0"}) property_chunking = PropertyChunking( From 15d8769eb5c29ea4e4f9cf5f2f73d3e34a0082d8 Mon Sep 17 00:00:00 2001 From: brianjlai Date: Sun, 19 Oct 2025 20:28:04 -0700 Subject: [PATCH 3/6] introduce a JsonSchemaPropertySelector component to allow for more flexibility to modify selected properties --- .../declarative_component_schema.yaml | 36 +++++- .../models/declarative_component_schema.py | 30 +++++ .../parsers/model_to_component_factory.py | 50 +++++++- .../json_schema_property_selector.py | 38 ++++++ .../property_selector/__init__.py | 10 ++ .../json_schema_property_selector.py | 52 ++++++++ .../property_selector/property_selector.py | 24 ++++ .../query_properties/query_properties.py | 22 +--- .../retrievers/simple_retriever.py | 7 +- .../test_model_to_component_factory.py | 19 +++ .../property_selector/__init__.py | 1 + .../test_json_schema_property_selector.py | 94 ++++++++++++++ .../query_properties/test_query_properties.py | 119 +++++++++++++----- 13 files changed, 443 insertions(+), 59 deletions(-) create mode 100644 airbyte_cdk/sources/declarative/requesters/query_properties/json_schema_property_selector.py create mode 100644 airbyte_cdk/sources/declarative/requesters/query_properties/property_selector/__init__.py create mode 100644 airbyte_cdk/sources/declarative/requesters/query_properties/property_selector/json_schema_property_selector.py create mode 100644 airbyte_cdk/sources/declarative/requesters/query_properties/property_selector/property_selector.py create mode 100644 unit_tests/sources/declarative/requesters/query_properties/property_selector/__init__.py create mode 100644 unit_tests/sources/declarative/requesters/query_properties/property_selector/test_json_schema_property_selector.py diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index a77bc34ea..f834ca20e 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -2520,6 +2520,34 @@ definitions: type: type: string enum: [JsonlDecoder] + JsonSchemaPropertySelector: + title: Json Schema Property Selector + description: When configured, the JSON schema supplied in the catalog containing which columns are selected for the current stream will be used to reduce which query properties will be included in the outbound API request. This can improve the performance of API requests, especially for those requiring multiple requests to get a complete record. + type: object + required: + - type + properties: + type: + type: string + enum: [JsonSchemaPropertySelector] + transformations: + title: Transformations + description: A list of transformations to be applied on the customer configured schema that will be used to filter out unselected fields when specifying query properties for API requests. + linkable: true + type: array + items: + anyOf: + - "$ref": "#/definitions/AddFields" + - "$ref": "#/definitions/RemoveFields" + - "$ref": "#/definitions/KeysToLower" + - "$ref": "#/definitions/KeysToSnakeCase" + - "$ref": "#/definitions/FlattenFields" + - "$ref": "#/definitions/DpathFlattenFields" + - "$ref": "#/definitions/KeysReplace" + - "$ref": "#/definitions/CustomTransformation" + $parameters: + type: object + additionalProperties: true KeysToLower: title: Keys to Lower Case description: A transformation that renames all keys to lower case. @@ -3410,6 +3438,10 @@ definitions: title: Property Chunking description: Defines how query properties will be grouped into smaller sets for APIs with limitations on the number of properties fetched per API request. "$ref": "#/definitions/PropertyChunking" + property_selector: + title: Property Selector + description: Defines where to look for and which query properties that should be sent in outbound API requests. For example, you can specify that only the selected columns of a stream should be in the request. + "$ref": "#/definitions/JsonSchemaPropertySelector" $parameters: type: object additionalProperties: true @@ -3746,7 +3778,7 @@ definitions: properties: type: type: string - enum: [ PaginationReset ] + enum: [PaginationReset] action: type: string enum: @@ -3763,7 +3795,7 @@ definitions: properties: type: type: string - enum: [ PaginationResetLimits ] + enum: [PaginationResetLimits] number_of_records: type: integer GzipDecoder: diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 915f942d0..35186ef71 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -1,3 +1,5 @@ +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. + # generated by datamodel-codegen: # filename: declarative_component_schema.yaml @@ -2029,6 +2031,29 @@ class SessionTokenRequestApiKeyAuthenticator(BaseModel): ) +class JsonSchemaPropertySelector(BaseModel): + type: Literal["JsonSchemaPropertySelector"] + transformations: Optional[ + List[ + Union[ + AddFields, + RemoveFields, + KeysToLower, + KeysToSnakeCase, + FlattenFields, + DpathFlattenFields, + KeysReplace, + CustomTransformation, + ] + ] + ] = Field( + None, + description="A list of transformations to be applied on the customer configured schema that will be used to filter out unselected fields when specifying query properties for API requests.", + title="Transformations", + ) + parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + + class ListPartitionRouter(BaseModel): type: Literal["ListPartitionRouter"] cursor_field: str = Field( @@ -2799,6 +2824,11 @@ class QueryProperties(BaseModel): description="Defines how query properties will be grouped into smaller sets for APIs with limitations on the number of properties fetched per API request.", title="Property Chunking", ) + property_selector: Optional[JsonSchemaPropertySelector] = Field( + None, + description="Defines where to look for and which query properties that should be sent in outbound API requests. For example, you can specify that only the selected columns of a stream should be in the request.", + title="Property Selector", + ) parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 1a5304ae4..799a9c0bb 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -316,6 +316,9 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( JsonlDecoder as JsonlDecoderModel, ) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + JsonSchemaPropertySelector as JsonSchemaPropertySelectorModel, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( JwtAuthenticator as JwtAuthenticatorModel, ) @@ -503,6 +506,9 @@ from airbyte_cdk.sources.declarative.requesters.query_properties.property_chunking import ( PropertyLimitType, ) +from airbyte_cdk.sources.declarative.requesters.query_properties.property_selector import ( + JsonSchemaPropertySelector, +) from airbyte_cdk.sources.declarative.requesters.query_properties.strategies import ( GroupByKey, ) @@ -740,6 +746,7 @@ def _init_mappings(self) -> None: InlineSchemaLoaderModel: self.create_inline_schema_loader, JsonDecoderModel: self.create_json_decoder, JsonlDecoderModel: self.create_jsonl_decoder, + JsonSchemaPropertySelectorModel: self.create_json_schema_property_selector, GzipDecoderModel: self.create_gzip_decoder, KeysToLowerModel: self.create_keys_to_lower_transformation, KeysToSnakeCaseModel: self.create_keys_to_snake_transformation, @@ -3003,7 +3010,7 @@ def create_property_chunking( ) def create_query_properties( - self, model: QueryPropertiesModel, config: Config, **kwargs: Any + self, model: QueryPropertiesModel, config: Config, *, stream_name: str, **kwargs: Any ) -> QueryProperties: if isinstance(model.property_list, list): property_list = model.property_list @@ -3020,10 +3027,43 @@ def create_query_properties( else None ) + property_selector = ( + self._create_component_from_model( + model=model.property_selector, config=config, stream_name=stream_name, **kwargs + ) + if model.property_selector + else None + ) + return QueryProperties( property_list=property_list, always_include_properties=model.always_include_properties, property_chunking=property_chunking, + property_selector=property_selector, + config=config, + parameters=model.parameters or {}, + ) + + def create_json_schema_property_selector( + self, + model: JsonSchemaPropertySelectorModel, + config: Config, + *, + stream_name: str, + **kwargs: Any, + ) -> JsonSchemaPropertySelector: + configured_stream = self._stream_name_to_configured_stream.get(stream_name) + + transformations = [] + if model.transformations: + for transformation_model in model.transformations: + transformations.append( + self._create_component_from_model(model=transformation_model, config=config) + ) + + return JsonSchemaPropertySelector( + configured_stream=configured_stream, + properties_transformations=transformations, config=config, parameters=model.parameters or {}, ) @@ -3251,7 +3291,7 @@ def _get_url(req: Requester) -> str: if len(query_properties_definitions) == 1: query_properties = self._create_component_from_model( - model=query_properties_definitions[0], config=config + model=query_properties_definitions[0], stream_name=name, config=config ) # Removes QueryProperties components from the interpolated mappings because it has been designed @@ -3277,11 +3317,13 @@ def _get_url(req: Requester) -> str: query_properties = self.create_query_properties( model=query_properties_definition, + stream_name=name, config=config, ) elif hasattr(model.requester, "query_properties") and model.requester.query_properties: query_properties = self.create_query_properties( model=model.requester.query_properties, + stream_name=name, config=config, ) @@ -3318,8 +3360,6 @@ def _get_url(req: Requester) -> str: model.ignore_stream_slicer_parameters_on_paginated_requests or False ) - configured_stream = self._stream_name_to_configured_stream.get(name) - if ( model.partition_router and isinstance(model.partition_router, SubstreamPartitionRouterModel) @@ -3355,7 +3395,6 @@ def _get_url(req: Requester) -> str: request_option_provider=request_options_provider, cursor=None, config=config, - configured_airbyte_stream=configured_stream, ignore_stream_slicer_parameters_on_paginated_requests=ignore_stream_slicer_parameters_on_paginated_requests, parameters=model.parameters or {}, ) @@ -3377,7 +3416,6 @@ def _get_url(req: Requester) -> str: request_option_provider=request_options_provider, cursor=None, config=config, - configured_airbyte_stream=configured_stream, ignore_stream_slicer_parameters_on_paginated_requests=ignore_stream_slicer_parameters_on_paginated_requests, additional_query_properties=query_properties, log_formatter=self._get_log_formatter(log_formatter, name), diff --git a/airbyte_cdk/sources/declarative/requesters/query_properties/json_schema_property_selector.py b/airbyte_cdk/sources/declarative/requesters/query_properties/json_schema_property_selector.py new file mode 100644 index 000000000..c25d81bba --- /dev/null +++ b/airbyte_cdk/sources/declarative/requesters/query_properties/json_schema_property_selector.py @@ -0,0 +1,38 @@ +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. + +from dataclasses import InitVar, dataclass, field +from typing import Any, List, Mapping, Optional, Set + +from airbyte_protocol_dataclasses.models import ConfiguredAirbyteStream + +from airbyte_cdk.sources.declarative.transformations import RecordTransformation +from airbyte_cdk.sources.types import Config + + +@dataclass +class JsonSchemaPropertySelector: + """ + A class that contains a list of transformations to apply to properties. + """ + + configured_stream: ConfiguredAirbyteStream + config: Config + parameters: InitVar[Mapping[str, Any]] + properties_transformations: List[RecordTransformation] = field(default_factory=lambda: []) + + def __post_init__(self, parameters: Mapping[str, Any]) -> None: + self._parameters = parameters + + def select(self) -> Set[str]: + properties = set() + for schema_property in self.configured_stream.stream.json_schema.get( + "properties", {} + ).keys(): + if self.properties_transformations: + for transformation in self.properties_transformations: + transformation.transform( + schema_property, # type: ignore # record has type Mapping[str, Any], but Dict[str, Any] expected + config=self.config, + ) + properties.add(schema_property) + return properties diff --git a/airbyte_cdk/sources/declarative/requesters/query_properties/property_selector/__init__.py b/airbyte_cdk/sources/declarative/requesters/query_properties/property_selector/__init__.py new file mode 100644 index 000000000..e9dbbd8d9 --- /dev/null +++ b/airbyte_cdk/sources/declarative/requesters/query_properties/property_selector/__init__.py @@ -0,0 +1,10 @@ +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. + +from airbyte_cdk.sources.declarative.requesters.query_properties.property_selector.json_schema_property_selector import ( + JsonSchemaPropertySelector, +) +from airbyte_cdk.sources.declarative.requesters.query_properties.property_selector.property_selector import ( + PropertySelector, +) + +__all__ = ["JsonSchemaPropertySelector", "PropertySelector"] diff --git a/airbyte_cdk/sources/declarative/requesters/query_properties/property_selector/json_schema_property_selector.py b/airbyte_cdk/sources/declarative/requesters/query_properties/property_selector/json_schema_property_selector.py new file mode 100644 index 000000000..0186d599e --- /dev/null +++ b/airbyte_cdk/sources/declarative/requesters/query_properties/property_selector/json_schema_property_selector.py @@ -0,0 +1,52 @@ +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. + +from dataclasses import InitVar, dataclass, field +from typing import Any, List, Mapping, Optional, Set + +from airbyte_protocol_dataclasses.models import ConfiguredAirbyteStream + +from airbyte_cdk.sources.declarative.requesters.query_properties.property_selector.property_selector import ( + PropertySelector, +) +from airbyte_cdk.sources.declarative.transformations import RecordTransformation +from airbyte_cdk.sources.types import Config + + +@dataclass +class JsonSchemaPropertySelector(PropertySelector): + """ + A class that contains a list of transformations to apply to properties. + """ + + config: Config + parameters: InitVar[Mapping[str, Any]] + # For other non-read operations, there is no configured catalog and therefore no schema selection + configured_stream: Optional[ConfiguredAirbyteStream] = None + properties_transformations: List[RecordTransformation] = field(default_factory=lambda: []) + + def __post_init__(self, parameters: Mapping[str, Any]) -> None: + self._parameters = parameters + + def select(self) -> Optional[Set[str]]: + """ + Returns the set of properties that have been selected for the configured stream. The intent being that + we should only query for selected properties not all since disabled properties are discarded. + + When configured_stream is None, then there was no incoming catalog and all fields should be retrieved. + This is different from the empty set where the json_schema was empty and no schema fields were selected. + """ + + # For CHECK/DISCOVER operations, there is no catalog and therefor no configured stream or selected + # columns. In this case we return None which is interpreted by the QueryProperties component to not + # perform any filtering of schema properties and fetch all of them + if self.configured_stream is None: + return None + + schema_properties = self.configured_stream.stream.json_schema.get("properties", {}) + if self.properties_transformations: + for transformation in self.properties_transformations: + transformation.transform( + schema_properties, # type: ignore # record has type Mapping[str, Any], but Dict[str, Any] expected + config=self.config, + ) + return set(schema_properties.keys()) diff --git a/airbyte_cdk/sources/declarative/requesters/query_properties/property_selector/property_selector.py b/airbyte_cdk/sources/declarative/requesters/query_properties/property_selector/property_selector.py new file mode 100644 index 000000000..bd1d05c75 --- /dev/null +++ b/airbyte_cdk/sources/declarative/requesters/query_properties/property_selector/property_selector.py @@ -0,0 +1,24 @@ +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. + +from abc import ABC, abstractmethod +from dataclasses import dataclass +from typing import Optional, Set + + +@dataclass +class PropertySelector(ABC): + """ + Describes the interface for selecting and transforming properties from a configured stream's schema + to determine which properties should be queried from the API. + """ + + @abstractmethod + def select(self) -> Optional[Set[str]]: + """ + Selects and returns the set of properties that should be queried from the API based on the + configured stream's schema and any applicable transformations. + + Returns: + Set[str]: The set of property names to query + """ + pass diff --git a/airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py b/airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py index 4f17b5172..f671c2ade 100644 --- a/airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py +++ b/airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py @@ -8,6 +8,9 @@ PropertiesFromEndpoint, PropertyChunking, ) +from airbyte_cdk.sources.declarative.requesters.query_properties.property_selector import ( + PropertySelector, +) from airbyte_cdk.sources.types import Config, StreamSlice @@ -23,13 +26,13 @@ class QueryProperties: property_list: Optional[Union[List[str], PropertiesFromEndpoint]] always_include_properties: Optional[List[str]] property_chunking: Optional[PropertyChunking] + property_selector: Optional[PropertySelector] config: Config parameters: InitVar[Mapping[str, Any]] def get_request_property_chunks( self, stream_slice: Optional[StreamSlice] = None, - configured_stream: Optional[ConfiguredAirbyteStream] = None, ) -> Iterable[List[str]]: """ Uses the defined property_list to fetch the total set of properties dynamically or from a static list @@ -39,7 +42,7 @@ def get_request_property_chunks( :param configured_stream: The customer configured stream being synced which is needed to identify which record fields to query for and emit. """ - configured_properties = self._get_configured_properties(configured_stream) + configured_properties = self.property_selector.select() if self.property_selector else None fields: Union[Iterable[str], List[str]] if isinstance(self.property_list, PropertiesFromEndpoint): @@ -58,18 +61,3 @@ def get_request_property_chunks( yield from [[field for field in fields if field in configured_properties]] else: yield list(fields) - - @staticmethod - def _get_configured_properties( - configured_stream: Optional[ConfiguredAirbyteStream] = None, - ) -> Optional[Set[str]]: - """ - Returns the set of properties that have been selected for the configured stream. The intent being that - we should only query for selected properties not all since disabled properties are discarded. - - When configured_stream is None, then there was no incoming catalog and all fields should be retrieved. - This is different from the empty set where the json_schema was empty and no schema fields were selected. - """ - if configured_stream: - return set(configured_stream.stream.json_schema.get("properties", {}).keys()) - return None diff --git a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py index cec37065a..10b472caf 100644 --- a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py +++ b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py @@ -25,10 +25,7 @@ from airbyte_cdk.legacy.sources.declarative.incremental import ResumableFullRefreshCursor from airbyte_cdk.legacy.sources.declarative.incremental.declarative_cursor import DeclarativeCursor -from airbyte_cdk.models import ( - AirbyteMessage, - ConfiguredAirbyteStream, -) +from airbyte_cdk.models import AirbyteMessage from airbyte_cdk.sources.declarative.extractors.http_selector import HttpSelector from airbyte_cdk.sources.declarative.interpolation import InterpolatedString from airbyte_cdk.sources.declarative.partition_routers.single_partition_router import ( @@ -100,7 +97,6 @@ class SimpleRetriever(Retriever): cursor: Optional[DeclarativeCursor] = None ignore_stream_slicer_parameters_on_paginated_requests: bool = False additional_query_properties: Optional[QueryProperties] = None - configured_airbyte_stream: Optional[ConfiguredAirbyteStream] = None log_formatter: Optional[Callable[[requests.Response], Any]] = None pagination_tracker_factory: Callable[[], PaginationTracker] = field( default_factory=lambda: lambda: PaginationTracker() @@ -394,7 +390,6 @@ def _read_pages( ): for properties in self.additional_query_properties.get_request_property_chunks( stream_slice=stream_slice, - configured_stream=self.configured_airbyte_stream, ): stream_slice = StreamSlice( partition=stream_slice.partition or {}, diff --git a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py index 27e55446a..4463358c0 100644 --- a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py +++ b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py @@ -140,6 +140,9 @@ from airbyte_cdk.sources.declarative.requesters.query_properties.property_chunking import ( PropertyLimitType, ) +from airbyte_cdk.sources.declarative.requesters.query_properties.property_selector import ( + JsonSchemaPropertySelector, +) from airbyte_cdk.sources.declarative.requesters.query_properties.strategies import GroupByKey from airbyte_cdk.sources.declarative.requesters.request_option import ( RequestOption, @@ -163,6 +166,9 @@ ) from airbyte_cdk.sources.declarative.transformations import AddFields, RemoveFields from airbyte_cdk.sources.declarative.transformations.add_fields import AddedFieldDefinition +from airbyte_cdk.sources.declarative.transformations.keys_replace_transformation import ( + KeysReplaceTransformation, +) from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource from airbyte_cdk.sources.message.repository import StateFilteringMessageRepository from airbyte_cdk.sources.streams.call_rate import MovingWindowCallRatePolicy @@ -4510,6 +4516,12 @@ def test_simple_retriever_with_query_properties(): record_merge_strategy: type: GroupByKeyMergeStrategy key: ["id"] + property_selector: + type: JsonSchemaPropertySelector + transformations: + - type: KeysReplace + old: "properties_" + new: "" analytics_stream: type: DeclarativeStream incremental_sync: @@ -4551,6 +4563,13 @@ def test_simple_retriever_with_query_properties(): ] assert query_properties.always_include_properties == ["id"] + property_selector = query_properties.property_selector + assert isinstance(property_selector, JsonSchemaPropertySelector) + assert len(property_selector.properties_transformations) == 1 + assert property_selector.properties_transformations == [ + KeysReplaceTransformation(old="properties_", new="", parameters={}) + ] + property_chunking = retriever.additional_query_properties.property_chunking assert isinstance(property_chunking, PropertyChunking) assert property_chunking.property_limit_type == PropertyLimitType.property_count diff --git a/unit_tests/sources/declarative/requesters/query_properties/property_selector/__init__.py b/unit_tests/sources/declarative/requesters/query_properties/property_selector/__init__.py new file mode 100644 index 000000000..58b636bf9 --- /dev/null +++ b/unit_tests/sources/declarative/requesters/query_properties/property_selector/__init__.py @@ -0,0 +1 @@ +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. diff --git a/unit_tests/sources/declarative/requesters/query_properties/property_selector/test_json_schema_property_selector.py b/unit_tests/sources/declarative/requesters/query_properties/property_selector/test_json_schema_property_selector.py new file mode 100644 index 000000000..12f1d8d93 --- /dev/null +++ b/unit_tests/sources/declarative/requesters/query_properties/property_selector/test_json_schema_property_selector.py @@ -0,0 +1,94 @@ +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. + +from typing import List, Optional, Set + +import pytest +from airbyte_protocol_dataclasses.models import ( + AirbyteStream, + ConfiguredAirbyteStream, + DestinationSyncMode, + SyncMode, +) + +from airbyte_cdk.sources.declarative.requesters.query_properties.property_selector import ( + JsonSchemaPropertySelector, +) +from airbyte_cdk.sources.declarative.transformations import RecordTransformation, RemoveFields +from airbyte_cdk.sources.declarative.transformations.keys_replace_transformation import ( + KeysReplaceTransformation, +) + +CONFIG = {} + + +def _create_configured_airbyte_stream(configured_properties: List[str]): + return ConfiguredAirbyteStream( + sync_mode=SyncMode.full_refresh, + destination_sync_mode=DestinationSyncMode.append, + stream=AirbyteStream( + name="players", + namespace=None, + json_schema={ + "properties": { + property: {"type": ["null", "string"]} for property in configured_properties + } + }, + supported_sync_modes=[SyncMode.full_refresh], + ), + ) + + +@pytest.mark.parametrize( + "configured_stream, transformations, expected_properties", + [ + pytest.param( + # Test case 1: configured stream with 5 fields, RemoveFields and KeysReplaceTransformation + _create_configured_airbyte_stream( + ["id", "first_name", "last_name", "number", "team", "properties_statistics"] + ), + [ + RemoveFields(field_pointers=[["id"]], parameters={}), + KeysReplaceTransformation(old="properties_", new="", parameters={}), + ], + {"first_name", "last_name", "number", "team", "statistics"}, + id="test_select_properties_with_transformations", + ), + pytest.param( + None, + [], + None, + id="configured_stream_is_none", + ), + pytest.param( + # Test case 3: configured stream has no properties + ConfiguredAirbyteStream( + sync_mode=SyncMode.full_refresh, + destination_sync_mode=DestinationSyncMode.append, + stream=AirbyteStream( + name="players", + namespace=None, + json_schema={}, + supported_sync_modes=[SyncMode.full_refresh], + ), + ), + [], + set(), + id="configured_stream_no_properties_key_in_json_schema", + ), + ], +) +def test_select_properties( + configured_stream: Optional[ConfiguredAirbyteStream], + transformations: List[RecordTransformation], + expected_properties: Optional[Set[str]], +): + json_schema_property_selector = JsonSchemaPropertySelector( + configured_stream=configured_stream, + properties_transformations=transformations, + config=CONFIG, + parameters={}, + ) + + selected_properties = json_schema_property_selector.select() + + assert selected_properties == expected_properties diff --git a/unit_tests/sources/declarative/requesters/query_properties/test_query_properties.py b/unit_tests/sources/declarative/requesters/query_properties/test_query_properties.py index 26fc91f49..58e38b183 100644 --- a/unit_tests/sources/declarative/requesters/query_properties/test_query_properties.py +++ b/unit_tests/sources/declarative/requesters/query_properties/test_query_properties.py @@ -18,7 +18,11 @@ PropertyChunking, PropertyLimitType, ) +from airbyte_cdk.sources.declarative.requesters.query_properties.property_selector import ( + JsonSchemaPropertySelector, +) from airbyte_cdk.sources.declarative.requesters.query_properties.strategies import GroupByKey +from airbyte_cdk.sources.declarative.transformations import RemoveFields from airbyte_cdk.sources.types import StreamSlice CONFIG = {} @@ -41,9 +45,9 @@ def _create_configured_airbyte_stream(configured_properties: List[str]): ) -def test_get_request_property_chunks_static_list_with_chunking(): +def test_get_request_property_chunks_static_list_with_chunking_property_selection(): configured_airbyte_stream = _create_configured_airbyte_stream( - ["santa", "clover", "junpei", "june"] + ["santa", "clover", "junpei", "june", "remove_me"] ) stream_slice = StreamSlice(cursor_slice={}, partition={}) @@ -59,6 +63,7 @@ def test_get_request_property_chunks_static_list_with_chunking(): "seven", "lotus", "nine", + "remove_me", ], always_include_properties=None, property_chunking=PropertyChunking( @@ -68,15 +73,19 @@ def test_get_request_property_chunks_static_list_with_chunking(): config=CONFIG, parameters={}, ), + property_selector=JsonSchemaPropertySelector( + configured_stream=configured_airbyte_stream, + properties_transformations=[ + RemoveFields(field_pointers=[["remove_me"]], parameters={}), + ], + config=CONFIG, + parameters={}, + ), config=CONFIG, parameters={}, ) - property_chunks = list( - query_properties.get_request_property_chunks( - stream_slice=stream_slice, configured_stream=configured_airbyte_stream - ) - ) + property_chunks = list(query_properties.get_request_property_chunks(stream_slice=stream_slice)) assert len(property_chunks) == 2 assert property_chunks[0] == ["santa", "clover", "junpei"] @@ -120,15 +129,19 @@ def test_get_request_property_chunks_static_list_with_always_include_properties( config=CONFIG, parameters={}, ), + property_selector=JsonSchemaPropertySelector( + configured_stream=configured_airbyte_stream, + properties_transformations=[ + RemoveFields(field_pointers=[["remove_me"]], parameters={}), + ], + config=CONFIG, + parameters={}, + ), config=CONFIG, parameters={}, ) - property_chunks = list( - query_properties.get_request_property_chunks( - stream_slice=stream_slice, configured_stream=configured_airbyte_stream - ) - ) + property_chunks = list(query_properties.get_request_property_chunks(stream_slice=stream_slice)) assert len(property_chunks) == 3 assert property_chunks[0] == ["zero", "ace", "snake", "santa"] @@ -158,15 +171,19 @@ def test_get_request_property_chunks_dynamic_endpoint(): config=CONFIG, parameters={}, ), + property_selector=JsonSchemaPropertySelector( + configured_stream=configured_airbyte_stream, + properties_transformations=[ + RemoveFields(field_pointers=[["remove_me"]], parameters={}), + ], + config=CONFIG, + parameters={}, + ), config=CONFIG, parameters={}, ) - property_chunks = list( - query_properties.get_request_property_chunks( - stream_slice=stream_slice, configured_stream=configured_airbyte_stream - ) - ) + property_chunks = list(query_properties.get_request_property_chunks(stream_slice=stream_slice)) assert len(property_chunks) == 2 assert property_chunks[0] == ["alice", "clover", "dio", "k", "luna"] @@ -200,15 +217,19 @@ def test_get_request_property_chunks_with_configured_catalog_static_list(): config=CONFIG, parameters={}, ), + property_selector=JsonSchemaPropertySelector( + configured_stream=configured_airbyte_stream, + properties_transformations=[ + RemoveFields(field_pointers=[["remove_me"]], parameters={}), + ], + config=CONFIG, + parameters={}, + ), config=CONFIG, parameters={}, ) - property_chunks = list( - query_properties.get_request_property_chunks( - stream_slice=stream_slice, configured_stream=configured_airbyte_stream - ) - ) + property_chunks = list(query_properties.get_request_property_chunks(stream_slice=stream_slice)) assert len(property_chunks) == 2 assert property_chunks[0] == ["santa", "clover", "junpei"] @@ -216,7 +237,9 @@ def test_get_request_property_chunks_with_configured_catalog_static_list(): def test_get_request_property_chunks_with_configured_catalog_dynamic_endpoint(): - configured_airbyte_stream = _create_configured_airbyte_stream(["luna", "phi", "sigma"]) + configured_airbyte_stream = _create_configured_airbyte_stream( + ["luna", "phi", "sigma", "remove_me"] + ) stream_slice = StreamSlice(cursor_slice={}, partition={}) @@ -235,15 +258,55 @@ def test_get_request_property_chunks_with_configured_catalog_dynamic_endpoint(): config=CONFIG, parameters={}, ), + property_selector=JsonSchemaPropertySelector( + configured_stream=configured_airbyte_stream, + properties_transformations=[ + RemoveFields(field_pointers=[["remove_me"]], parameters={}), + ], + config=CONFIG, + parameters={}, + ), config=CONFIG, parameters={}, ) - property_chunks = list( - query_properties.get_request_property_chunks( - stream_slice=stream_slice, configured_stream=configured_airbyte_stream - ) - ) + property_chunks = list(query_properties.get_request_property_chunks(stream_slice=stream_slice)) assert len(property_chunks) == 1 assert property_chunks[0] == ["luna", "phi", "sigma"] + + +def test_get_request_property_no_property_selection(): + stream_slice = StreamSlice(cursor_slice={}, partition={}) + + query_properties = QueryProperties( + property_list=[ + "ace", + "snake", + "santa", + "clover", + "junpei", + "june", + "seven", + "lotus", + "nine", + ], + always_include_properties=None, + property_chunking=PropertyChunking( + property_limit_type=PropertyLimitType.property_count, + property_limit=3, + record_merge_strategy=GroupByKey(key="id", config=CONFIG, parameters={}), + config=CONFIG, + parameters={}, + ), + property_selector=None, + config=CONFIG, + parameters={}, + ) + + property_chunks = list(query_properties.get_request_property_chunks(stream_slice=stream_slice)) + + assert len(property_chunks) == 3 + assert property_chunks[0] == ["ace", "snake", "santa"] + assert property_chunks[1] == ["clover", "junpei", "june"] + assert property_chunks[2] == ["seven", "lotus", "nine"] From 4b872d621209cee717dd520e24a230052da650d8 Mon Sep 17 00:00:00 2001 From: brianjlai Date: Sun, 19 Oct 2025 20:41:51 -0700 Subject: [PATCH 4/6] fix tests --- .../sources/declarative/retrievers/test_simple_retriever.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/unit_tests/sources/declarative/retrievers/test_simple_retriever.py b/unit_tests/sources/declarative/retrievers/test_simple_retriever.py index 63a375823..5caec9a34 100644 --- a/unit_tests/sources/declarative/retrievers/test_simple_retriever.py +++ b/unit_tests/sources/declarative/retrievers/test_simple_retriever.py @@ -1096,6 +1096,7 @@ def test_simple_retriever_with_additional_query_properties(): config=config, parameters={}, ), + property_selector=None, config=config, parameters={}, ) @@ -1167,6 +1168,7 @@ def test_simple_retriever_with_additional_query_properties_but_without_property_ property_list=["first_name", "last_name", "nonary", "bracelet"], always_include_properties=[], property_chunking=None, + property_selector=None, config=config, parameters={}, ) @@ -1349,6 +1351,7 @@ def test_simple_retriever_with_additional_query_properties_single_chunk(): config=config, parameters={}, ), + property_selector=None, config=config, parameters={}, ) @@ -1508,6 +1511,7 @@ def test_simple_retriever_still_emit_records_if_no_merge_key(): config=config, parameters={}, ), + property_selector=None, config=config, parameters={}, ) From 19f4cf7fca9e1e7183e6461cefaa45a50af62ef7 Mon Sep 17 00:00:00 2001 From: brianjlai Date: Wed, 22 Oct 2025 16:37:05 -0700 Subject: [PATCH 5/6] pr feedback and bug fix so we always include properties if no property chunking --- .../json_schema_property_selector.py | 38 ------- .../json_schema_property_selector.py | 2 +- .../query_properties/query_properties.py | 13 ++- .../query_properties/test_query_properties.py | 99 +++++++++++++++++++ 4 files changed, 111 insertions(+), 41 deletions(-) delete mode 100644 airbyte_cdk/sources/declarative/requesters/query_properties/json_schema_property_selector.py diff --git a/airbyte_cdk/sources/declarative/requesters/query_properties/json_schema_property_selector.py b/airbyte_cdk/sources/declarative/requesters/query_properties/json_schema_property_selector.py deleted file mode 100644 index c25d81bba..000000000 --- a/airbyte_cdk/sources/declarative/requesters/query_properties/json_schema_property_selector.py +++ /dev/null @@ -1,38 +0,0 @@ -# Copyright (c) 2025 Airbyte, Inc., all rights reserved. - -from dataclasses import InitVar, dataclass, field -from typing import Any, List, Mapping, Optional, Set - -from airbyte_protocol_dataclasses.models import ConfiguredAirbyteStream - -from airbyte_cdk.sources.declarative.transformations import RecordTransformation -from airbyte_cdk.sources.types import Config - - -@dataclass -class JsonSchemaPropertySelector: - """ - A class that contains a list of transformations to apply to properties. - """ - - configured_stream: ConfiguredAirbyteStream - config: Config - parameters: InitVar[Mapping[str, Any]] - properties_transformations: List[RecordTransformation] = field(default_factory=lambda: []) - - def __post_init__(self, parameters: Mapping[str, Any]) -> None: - self._parameters = parameters - - def select(self) -> Set[str]: - properties = set() - for schema_property in self.configured_stream.stream.json_schema.get( - "properties", {} - ).keys(): - if self.properties_transformations: - for transformation in self.properties_transformations: - transformation.transform( - schema_property, # type: ignore # record has type Mapping[str, Any], but Dict[str, Any] expected - config=self.config, - ) - properties.add(schema_property) - return properties diff --git a/airbyte_cdk/sources/declarative/requesters/query_properties/property_selector/json_schema_property_selector.py b/airbyte_cdk/sources/declarative/requesters/query_properties/property_selector/json_schema_property_selector.py index 0186d599e..c60906e9d 100644 --- a/airbyte_cdk/sources/declarative/requesters/query_properties/property_selector/json_schema_property_selector.py +++ b/airbyte_cdk/sources/declarative/requesters/query_properties/property_selector/json_schema_property_selector.py @@ -36,7 +36,7 @@ def select(self) -> Optional[Set[str]]: This is different from the empty set where the json_schema was empty and no schema fields were selected. """ - # For CHECK/DISCOVER operations, there is no catalog and therefor no configured stream or selected + # For CHECK/DISCOVER operations, there is no catalog and therefore no configured stream or selected # columns. In this case we return None which is interpreted by the QueryProperties component to not # perform any filtering of schema properties and fetch all of them if self.configured_stream is None: diff --git a/airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py b/airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py index f671c2ade..424caad77 100644 --- a/airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py +++ b/airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py @@ -58,6 +58,15 @@ def get_request_property_chunks( ) else: if configured_properties is not None: - yield from [[field for field in fields if field in configured_properties]] + all_fields = ( + [field for field in fields if field in configured_properties] + if configured_properties is not None + else list(fields) + ) else: - yield list(fields) + all_fields = list(fields) + + if self.always_include_properties: + all_fields = list(self.always_include_properties) + all_fields + + yield all_fields diff --git a/unit_tests/sources/declarative/requesters/query_properties/test_query_properties.py b/unit_tests/sources/declarative/requesters/query_properties/test_query_properties.py index 58e38b183..dc4721b07 100644 --- a/unit_tests/sources/declarative/requesters/query_properties/test_query_properties.py +++ b/unit_tests/sources/declarative/requesters/query_properties/test_query_properties.py @@ -149,6 +149,105 @@ def test_get_request_property_chunks_static_list_with_always_include_properties( assert property_chunks[2] == ["zero", "seven", "lotus", "nine"] +def test_get_request_no_property_chunking_selected_properties_always_include_properties(): + configured_airbyte_stream = _create_configured_airbyte_stream( + ["santa", "clover", "junpei", "june", "remove_me"] + ) + + stream_slice = StreamSlice(cursor_slice={}, partition={}) + + query_properties = QueryProperties( + property_list=[ + "ace", + "snake", + "santa", + "clover", + "junpei", + "june", + "seven", + "lotus", + "nine", + ], + always_include_properties=["zero"], + property_chunking=None, + property_selector=JsonSchemaPropertySelector( + configured_stream=configured_airbyte_stream, + properties_transformations=[ + RemoveFields(field_pointers=[["remove_me"]], parameters={}), + ], + config=CONFIG, + parameters={}, + ), + config=CONFIG, + parameters={}, + ) + + property_chunks = list(query_properties.get_request_property_chunks(stream_slice=stream_slice)) + + assert len(property_chunks) == 1 + assert property_chunks[0] == ["zero", "santa", "clover", "junpei", "june"] + + +def test_get_request_no_property_chunking_always_include_properties(): + configured_airbyte_stream = _create_configured_airbyte_stream( + [ + "ace", + "snake", + "santa", + "clover", + "junpei", + "june", + "seven", + "lotus", + "nine", + ] + ) + + stream_slice = StreamSlice(cursor_slice={}, partition={}) + + query_properties = QueryProperties( + property_list=[ + "ace", + "snake", + "santa", + "clover", + "junpei", + "june", + "seven", + "lotus", + "nine", + ], + always_include_properties=["zero"], + property_chunking=None, + property_selector=JsonSchemaPropertySelector( + configured_stream=configured_airbyte_stream, + properties_transformations=[ + RemoveFields(field_pointers=[["remove_me"]], parameters={}), + ], + config=CONFIG, + parameters={}, + ), + config=CONFIG, + parameters={}, + ) + + property_chunks = list(query_properties.get_request_property_chunks(stream_slice=stream_slice)) + + assert len(property_chunks) == 1 + assert property_chunks[0] == [ + "zero", + "ace", + "snake", + "santa", + "clover", + "junpei", + "june", + "seven", + "lotus", + "nine", + ] + + def test_get_request_property_chunks_dynamic_endpoint(): configured_airbyte_stream = _create_configured_airbyte_stream( ["alice", "clover", "dio", "k", "luna", "phi", "quark", "sigma", "tenmyouji"] From afc67fd69eb5191bde257b801afff8d1210e831d Mon Sep 17 00:00:00 2001 From: brianjlai Date: Thu, 23 Oct 2025 12:01:36 -0700 Subject: [PATCH 6/6] don't modify the original schema when we run transforms --- .../property_selector/json_schema_property_selector.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/airbyte_cdk/sources/declarative/requesters/query_properties/property_selector/json_schema_property_selector.py b/airbyte_cdk/sources/declarative/requesters/query_properties/property_selector/json_schema_property_selector.py index c60906e9d..821a4586e 100644 --- a/airbyte_cdk/sources/declarative/requesters/query_properties/property_selector/json_schema_property_selector.py +++ b/airbyte_cdk/sources/declarative/requesters/query_properties/property_selector/json_schema_property_selector.py @@ -1,5 +1,5 @@ # Copyright (c) 2025 Airbyte, Inc., all rights reserved. - +import copy from dataclasses import InitVar, dataclass, field from typing import Any, List, Mapping, Optional, Set @@ -42,11 +42,13 @@ def select(self) -> Optional[Set[str]]: if self.configured_stream is None: return None - schema_properties = self.configured_stream.stream.json_schema.get("properties", {}) + schema_properties = copy.deepcopy( + self.configured_stream.stream.json_schema.get("properties", {}) + ) if self.properties_transformations: for transformation in self.properties_transformations: transformation.transform( - schema_properties, # type: ignore # record has type Mapping[str, Any], but Dict[str, Any] expected + record=schema_properties, config=self.config, ) return set(schema_properties.keys())