# Schema for the KeyValueExtractor component: a record extractor that is
# configured with two nested extractors, one for keys and one for values.
KeyValueExtractor:
  title: Key Value Extractor
  description: Record extractor that combines keys and values from two separate extractors.
  type: object
  # All three fields are mandatory; only $parameters is optional.
  required:
    - type
    - keys_extractor
    - values_extractor
  properties:
    # Discriminator: must be the literal string "KeyValueExtractor".
    type:
      type: string
      enum: [ KeyValueExtractor ]
    # Nested extractor for the keys. Only DpathExtractor or CustomRecordExtractor
    # is accepted here, so a KeyValueExtractor cannot be nested inside itself.
    keys_extractor:
      anyOf:
        - "$ref": "#/definitions/DpathExtractor"
        - "$ref": "#/definitions/CustomRecordExtractor"
    # Nested extractor for the values; same allowed component types as keys_extractor.
    values_extractor:
      anyOf:
        - "$ref": "#/definitions/DpathExtractor"
        - "$ref": "#/definitions/CustomRecordExtractor"
    # Free-form object; arbitrary additional properties are allowed.
    $parameters:
      type: object
      additionalProperties: true
@dataclass
class KeyValueExtractor(RecordExtractor):
    """
    Extractor that combines keys and values from two separate extractors.

    The `keys_extractor` and `values_extractor` extract records independently
    from the same response. The keys are materialized once; the values are then
    consumed in consecutive chunks of `len(keys)` items, and each chunk is zipped
    with the keys to form one key-value mapping (one output record per chunk).

    Notes:
        * If `keys_extractor` yields nothing, no records are emitted.
        * If the number of values is not a multiple of the number of keys, the
          final record only contains the leading keys for which a value exists.

    Example:
      keys_extractor -> yields: ["name", "age"]
      values_extractor -> yields: ["Alice", 30, "Bob", 25]
      result: { "name": "Alice", "age": 30 }, { "name": "Bob", "age": 25 }
    """

    keys_extractor: RecordExtractor
    values_extractor: RecordExtractor

    def extract_records(self, response: requests.Response) -> Iterable[MutableMapping[Any, Any]]:
        """
        Yield one mapping per chunk of `len(keys)` values extracted from `response`.

        :param response: the HTTP response handed to both nested extractors
        :return: an iterable of dictionaries built by zipping the keys with each chunk of values
        """
        keys = list(self.keys_extractor.extract_records(response))
        # `extract_records` only promises an Iterable. If a nested extractor returns a
        # re-iterable container (e.g. a list), `islice` would restart from the first
        # element on every call and this loop would never terminate. Taking a single
        # iterator up front guarantees each value is consumed exactly once.
        values = iter(self.values_extractor.extract_records(response))

        while True:
            chunk = list(islice(values, len(keys)))
            if not chunk:
                # Values exhausted (or there are no keys to pair them with).
                break
            # `zip` stops at the shorter input, so a short final chunk yields a partial record.
            yield dict(zip(keys, chunk))
# Declarative model for the `KeyValueExtractor` manifest component.
# NOTE: this module is marked as generated by datamodel-codegen from
# declarative_component_schema.yaml, so changes belong in the YAML schema.
class KeyValueExtractor(BaseModel):
    # Discriminator literal identifying this component type.
    type: Literal["KeyValueExtractor"]
    # Nested extractor models; each is either a DpathExtractor or a CustomRecordExtractor.
    keys_extractor: Union[DpathExtractor, CustomRecordExtractor]
    values_extractor: Union[DpathExtractor, CustomRecordExtractor]
    # Optional free-form mapping, read from the `$parameters` key of the manifest.
    parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
def create_key_value_extractor(
    self,
    model: KeyValueExtractorModel,
    config: Config,
    decoder: Optional[Decoder] = JsonDecoder(parameters={}),
    **kwargs: Any,
) -> KeyValueExtractor:
    """
    Build a `KeyValueExtractor` component from its declarative model.

    Both nested extractor models are instantiated through the factory with the
    same `decoder` and `config`, keys first and values second.

    :param model: parsed `KeyValueExtractor` model holding the two nested extractor models
    :param config: connector configuration forwarded to the nested extractors
    :param decoder: decoder forwarded to the nested extractors
    :param kwargs: accepted for signature compatibility; not used here
    :return: the assembled `KeyValueExtractor`
    """

    def _build_nested(extractor_model: Any) -> Any:
        # Single place where the shared decoder/config are threaded through.
        return self._create_component_from_model(
            model=extractor_model, decoder=decoder, config=config
        )

    return KeyValueExtractor(
        keys_extractor=_build_nested(model.keys_extractor),
        values_extractor=_build_nested(model.values_extractor),
    )
def test_key_value_extractor():
    """Read the manifest stream end to end and check keys and values are zipped into one record."""
    response_body = json.dumps(
        {
            "dimensions": {
                "names": ["customer_segment", "traffic_source"],
                "values": ["enterprise", "organic_search"],
            }
        }
    )

    source = ConcurrentDeclarativeSource(
        source_config=_MANIFEST, config=_CONFIG, catalog=None, state=None
    )

    # Configure every discovered stream, reusing its source-defined primary key.
    discovered_catalog = source.discover(logger=source.logger, config=_CONFIG)
    configured_catalog = to_configured_catalog(
        [
            to_configured_stream(
                discovered_stream, primary_key=discovered_stream.source_defined_primary_key
            )
            for discovered_stream in discovered_catalog.streams
        ]
    )

    with HttpMocker() as http_mocker:
        http_mocker.get(
            HttpRequest(url="https://api.test.com/items"),
            HttpResponse(body=response_body),
        )

        # Keep only RECORD messages; the read must be consumed while the mock is active.
        records = []
        for message in source.read(MagicMock(), _CONFIG, configured_catalog):
            if message.type == Type.RECORD:
                records.append(message.record)

    assert len(records) == 1
    assert records[0].data == {"customer_segment": "enterprise", "traffic_source": "organic_search"}