Skip to content
Closed
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1741,6 +1741,29 @@ definitions:
$parameters:
type: object
additionalProperties: true
# Declarative component schema for KeyValueExtractor: a record extractor that
# pairs the output of a keys extractor with the output of a values extractor.
KeyValueExtractor:
  title: Key Value Extractor
  description: Record extractor that combines keys and values from two separate extractors.
  type: object
  required:
    - type
    - keys_extractor
    - values_extractor
  properties:
    type:
      type: string
      enum: [ KeyValueExtractor ]
    # Extractor whose output supplies the record field names.
    keys_extractor:
      anyOf:
        - "$ref": "#/definitions/DpathExtractor"
        - "$ref": "#/definitions/CustomRecordExtractor"
    # Extractor whose output supplies the record field values.
    values_extractor:
      anyOf:
        - "$ref": "#/definitions/DpathExtractor"
        - "$ref": "#/definitions/CustomRecordExtractor"
    $parameters:
      type: object
      additionalProperties: true
DpathExtractor:
title: Dpath Extractor
description: Record extractor that searches a decoded response over a path defined as an array of fields.
Expand Down Expand Up @@ -3315,6 +3338,7 @@ definitions:
extractor:
anyOf:
- "$ref": "#/definitions/DpathExtractor"
- "$ref": "#/definitions/KeyValueExtractor"
- "$ref": "#/definitions/CustomRecordExtractor"
record_filter:
title: Record Filter
Expand Down
2 changes: 2 additions & 0 deletions airbyte_cdk/sources/declarative/extractors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from airbyte_cdk.sources.declarative.extractors.dpath_extractor import DpathExtractor
from airbyte_cdk.sources.declarative.extractors.http_selector import HttpSelector
from airbyte_cdk.sources.declarative.extractors.key_value_extractor import KeyValueExtractor
from airbyte_cdk.sources.declarative.extractors.record_filter import RecordFilter
from airbyte_cdk.sources.declarative.extractors.record_selector import RecordSelector
from airbyte_cdk.sources.declarative.extractors.response_to_file_extractor import (
Expand All @@ -18,4 +19,5 @@
"RecordFilter",
"RecordSelector",
"ResponseToFileExtractor",
"KeyValueExtractor",
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit, can we alphabetize this

]
42 changes: 42 additions & 0 deletions airbyte_cdk/sources/declarative/extractors/key_value_extractor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#

from dataclasses import dataclass
from itertools import islice
from typing import Any, Iterable, MutableMapping

import requests

from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor


@dataclass
class KeyValueExtractor(RecordExtractor):
    """
    Extractor that combines keys and values from two separate extractors.

    The `keys_extractor` and `values_extractor` extract records independently
    from the response. The flat stream of values is consumed in chunks of
    ``len(keys)`` and each chunk is zipped against the keys, so multiple
    records can be produced from a single response.

    Example:
        keys_extractor -> yields: ["name", "age"]
        values_extractor -> yields: ["Alice", 30, "Bob", 40]
        result: [{"name": "Alice", "age": 30}, {"name": "Bob", "age": 40}]

    Note:
        If the number of values is not a multiple of the number of keys, the
        final record only contains the keys that have a matching value
        (``zip`` stops at the shorter input).
    """

    # Extractor producing the field names (dictionary keys) for each record.
    keys_extractor: RecordExtractor
    # Extractor producing the flat stream of field values, consumed in
    # chunks of len(keys).
    values_extractor: RecordExtractor

    def extract_records(self, response: requests.Response) -> Iterable[MutableMapping[Any, Any]]:
        """
        Yield one mapping per chunk of values, pairing each key with its value.

        :param response: the decoded HTTP response to extract keys/values from
        :return: an iterable of dictionaries built by zipping the keys with
                 successive key-sized chunks of the value stream
        """
        keys = list(self.keys_extractor.extract_records(response))
        values = self.values_extractor.extract_records(response)

        # Guard against an empty key set explicitly: islice(values, 0) would
        # always produce an empty chunk, so there is nothing to emit.
        if not keys:
            return

        # Consume the value stream in key-sized chunks; the assignment
        # expression ends the loop once the stream is exhausted, avoiding
        # the `while True` + `break` pattern.
        while chunk := list(islice(values, len(keys))):
            yield dict(zip(keys, chunk))
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.

# generated by datamodel-codegen:
# filename: declarative_component_schema.yaml

Expand Down Expand Up @@ -1844,6 +1842,13 @@ class DefaultPaginator(BaseModel):
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


# Auto-generated Pydantic model mirroring the `KeyValueExtractor` definition in
# declarative_component_schema.yaml (emitted by datamodel-codegen — regenerate
# rather than editing by hand).
class KeyValueExtractor(BaseModel):
    # Discriminator; always the literal string "KeyValueExtractor".
    type: Literal["KeyValueExtractor"]
    # Extractor model whose output supplies the record field names.
    keys_extractor: Union[DpathExtractor, CustomRecordExtractor]
    # Extractor model whose output supplies the record field values.
    values_extractor: Union[DpathExtractor, CustomRecordExtractor]
    # Interpolation context exposed via the `$parameters` alias in the manifest.
    parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")

Comment thread
lazebnyi marked this conversation as resolved.

class SessionTokenRequestApiKeyAuthenticator(BaseModel):
type: Literal["ApiKey"]
inject_into: RequestOption = Field(
Expand Down Expand Up @@ -1881,7 +1886,7 @@ class ListPartitionRouter(BaseModel):

class RecordSelector(BaseModel):
type: Literal["RecordSelector"]
extractor: Union[DpathExtractor, CustomRecordExtractor]
extractor: Union[DpathExtractor, KeyValueExtractor, CustomRecordExtractor]
record_filter: Optional[Union[RecordFilter, CustomRecordFilter]] = Field(
None,
description="Responsible for filtering records to be emitted by the Source.",
Expand Down Expand Up @@ -2164,7 +2169,7 @@ class Config:
]
] = Field(
None,
description="Component used to retrieve the schema for the current stream.",
description="One or many schema loaders can be used to retrieve the schema for the current stream. When multiple schema loaders are defined, schema properties will be merged together. Schema loaders defined first taking precedence in the event of a conflict.",
title="Schema Loader",
)
transformations: Optional[
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
)
from airbyte_cdk.sources.declarative.extractors import (
DpathExtractor,
KeyValueExtractor,
RecordFilter,
RecordSelector,
ResponseToFileExtractor,
Expand Down Expand Up @@ -304,6 +305,9 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
KeysToSnakeCase as KeysToSnakeCaseModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
KeyValueExtractor as KeyValueExtractorModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
LegacySessionTokenAuthenticator as LegacySessionTokenAuthenticatorModel,
)
Expand Down Expand Up @@ -641,6 +645,7 @@ def _init_mappings(self) -> None:
DefaultErrorHandlerModel: self.create_default_error_handler,
DefaultPaginatorModel: self.create_default_paginator,
DpathExtractorModel: self.create_dpath_extractor,
KeyValueExtractorModel: self.create_key_value_extractor,
ResponseToFileExtractorModel: self.create_response_to_file_extractor,
ExponentialBackoffStrategyModel: self.create_exponential_backoff_strategy,
SessionTokenAuthenticatorModel: self.create_session_token_authenticator,
Expand Down Expand Up @@ -2218,6 +2223,22 @@ def create_dpath_extractor(
parameters=model.parameters or {},
)

def create_key_value_extractor(
    self,
    model: KeyValueExtractorModel,
    config: Config,
    decoder: Optional[Decoder] = JsonDecoder(parameters={}),
    **kwargs: Any,
) -> KeyValueExtractor:
    """
    Build a KeyValueExtractor from its declarative model.

    Both child extractor models are materialized with the same decoder and
    config before being handed to the runtime component.
    """

    def build(child_model: Any) -> Any:
        # Materialize one child extractor model into its runtime component.
        return self._create_component_from_model(
            model=child_model, decoder=decoder, config=config
        )

    return KeyValueExtractor(
        keys_extractor=build(model.keys_extractor),
        values_extractor=build(model.values_extractor),
    )

@staticmethod
def create_response_to_file_extractor(
model: ResponseToFileExtractorModel,
Expand Down
132 changes: 132 additions & 0 deletions unit_tests/sources/declarative/extractors/test_key_value_extractor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#

import json
from unittest.mock import MagicMock

from airbyte_cdk.models import (ConfiguredAirbyteCatalog,
ConfiguredAirbyteStream, DestinationSyncMode,
Type)
from airbyte_cdk.sources.declarative.concurrent_declarative_source import \
ConcurrentDeclarativeSource
from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse


def to_configured_stream(
    stream,
    sync_mode=None,
    destination_sync_mode=DestinationSyncMode.append,
    cursor_field=None,
    primary_key=None,
) -> ConfiguredAirbyteStream:
    """Wrap a discovered stream in a ConfiguredAirbyteStream with the given sync options."""
    options = {
        "stream": stream,
        "sync_mode": sync_mode,
        "destination_sync_mode": destination_sync_mode,
        "cursor_field": cursor_field,
        "primary_key": primary_key,
    }
    return ConfiguredAirbyteStream(**options)


def to_configured_catalog(
    configured_streams,
) -> ConfiguredAirbyteCatalog:
    """Bundle the configured streams into a ConfiguredAirbyteCatalog."""
    catalog = ConfiguredAirbyteCatalog(streams=configured_streams)
    return catalog


# Minimal connector config for the test source; only a start date is supplied.
_CONFIG = {
    "start_date": "2024-07-01T00:00:00.000Z",
}
Comment thread
lazebnyi marked this conversation as resolved.


# Declarative manifest exercising KeyValueExtractor: record keys are read from
# `dimensions.names` and record values from `dimensions.values` in the response.
_MANIFEST = {
    "version": "6.7.0",
    "type": "DeclarativeSource",
    "check": {"type": "CheckStream", "stream_names": ["Rates"]},
    "streams": [
        {
            "type": "DeclarativeStream",
            "name": "test_stream",
            "primary_key": [],
            "schema_loader": {
                "type": "InlineSchemaLoader",
                "schema": {
                    "$schema": "http://json-schema.org/schema#",
                    "properties": {
                        "ABC": {"type": "number"},
                        "AED": {"type": "number"},
                    },
                    "type": "object",
                },
            },
            "retriever": {
                "type": "SimpleRetriever",
                "requester": {
                    "type": "HttpRequester",
                    "url_base": "https://api.test.com",
                    "path": "/items",
                    "http_method": "GET",
                    "authenticator": {
                        "type": "ApiKeyAuthenticator",
                        "header": "apikey",
                        "api_token": "{{ config['api_key'] }}",
                    },
                },
                "record_selector": {
                    "type": "RecordSelector",
                    "extractor": {
                        "type": "KeyValueExtractor",
                        "keys_extractor": {
                            "type": "DpathExtractor",
                            "field_path": ["dimensions", "names"],
                        },
                        "values_extractor": {
                            "type": "DpathExtractor",
                            "field_path": ["dimensions", "values"],
                        },
                    },
                },
                "paginator": {"type": "NoPagination"},
            },
        }
    ],
}


def test_key_value_extractor():
    """
    Read through a declarative source whose record selector uses KeyValueExtractor.

    The mocked response carries two keys and four values, so the extractor is
    expected to chunk the flat value list into two complete records — covering
    both the single-chunk and multi-chunk behavior.
    """
    source = ConcurrentDeclarativeSource(
        source_config=_MANIFEST, config=_CONFIG, catalog=None, state=None
    )

    actual_catalog = source.discover(logger=source.logger, config=_CONFIG)

    configured_streams = [
        to_configured_stream(stream, primary_key=stream.source_defined_primary_key)
        for stream in actual_catalog.streams
    ]
    configured_catalog = to_configured_catalog(configured_streams)

    with HttpMocker() as http_mocker:
        http_mocker.get(
            HttpRequest(url="https://api.test.com/items"),
            HttpResponse(
                body=json.dumps(
                    {
                        "dimensions": {
                            "names": ["customer_segment", "traffic_source"],
                            # Four values against two keys: the extractor should
                            # zip them into two separate records.
                            "values": [
                                "enterprise",
                                "organic_search",
                                "smb",
                                "paid_search",
                            ],
                        }
                    }
                )
            ),
        )

        records = [
            message.record
            for message in source.read(MagicMock(), _CONFIG, configured_catalog)
            if message.type == Type.RECORD
        ]

    assert len(records) == 2
    assert records[0].data == {
        "customer_segment": "enterprise",
        "traffic_source": "organic_search",
    }
    assert records[1].data == {
        "customer_segment": "smb",
        "traffic_source": "paid_search",
    }
Loading