-
Notifications
You must be signed in to change notification settings - Fork 44
feat(cdk): add KeyValueExtractor #552
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 8 commits
f081466
50f729a
ae16e6a
3c87c74
4378007
c2c1db3
4cd1e0c
ef6bd7a
58b98ee
15c0ddc
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,42 @@ | ||
| # | ||
| # Copyright (c) 2025 Airbyte, Inc., all rights reserved. | ||
| # | ||
|
|
||
| from dataclasses import dataclass | ||
| from itertools import islice | ||
| from typing import Any, Iterable, MutableMapping | ||
|
|
||
| import requests | ||
|
|
||
| from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor | ||
|
|
||
|
|
||
@dataclass
class KeyValueExtractor(RecordExtractor):
    """
    Extractor that combines keys and values from two separate extractors.

    The `keys_extractor` and `values_extractor` extract records independently
    from the same response. The full list of keys is paired, in order, with
    consecutive groups of values of the same length; every group becomes one
    output mapping.

    Example (single group):
      keys_extractor -> yields: ["name", "age"]
      values_extractor -> yields: ["Alice", 30]
      result: { "name": "Alice", "age": 30 }

    Example (several groups):
      keys_extractor -> yields: ["name", "age"]
      values_extractor -> yields: ["Alice", 30, "Bob", 41]
      result: { "name": "Alice", "age": 30 }, { "name": "Bob", "age": 41 }

    Edge cases:
      * No keys: nothing is yielded.
      * No values: nothing is yielded.
      * The number of values is not a multiple of the number of keys: the last
        group is short, and because `zip` stops at the shorter input the final
        mapping only contains the leading keys.
    """

    # NOTE(review): a reviewer asked, for consistency with other components,
    # that this class accept additional constructor inputs; the remark is
    # truncated in the source, so confirm which ones before extending it.
    keys_extractor: RecordExtractor
    values_extractor: RecordExtractor

    def extract_records(self, response: requests.Response) -> Iterable[MutableMapping[Any, Any]]:
        """
        Yield one mapping per group of `len(keys)` consecutive values.

        :param response: the HTTP response handed to both nested extractors
        :return: an iterable of mappings built by zipping the keys with each group of values
        """
        keys = list(self.keys_extractor.extract_records(response))
        # Wrap in iter() so the values are consumed progressively. Without it, a
        # re-iterable result (e.g. a list) would make every islice() call start
        # over from the first element and the loop would never terminate.
        values = iter(self.values_extractor.extract_records(response))

        group_size = len(keys)
        if group_size == 0:
            # Without keys there is nothing to build a mapping from.
            return

        # Two-argument iter(): keep pulling groups until an empty one signals
        # that the values are exhausted (replaces the `while True` / `break`).
        for group in iter(lambda: list(islice(values, group_size)), []):
            yield dict(zip(keys, group))
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,132 @@ | ||
| # | ||
| # Copyright (c) 2025 Airbyte, Inc., all rights reserved. | ||
| # | ||
|
|
||
| import json | ||
| from unittest.mock import MagicMock | ||
|
|
||
| from airbyte_cdk.models import (ConfiguredAirbyteCatalog, | ||
| ConfiguredAirbyteStream, DestinationSyncMode, | ||
| Type) | ||
| from airbyte_cdk.sources.declarative.concurrent_declarative_source import \ | ||
| ConcurrentDeclarativeSource | ||
| from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse | ||
|
|
||
|
|
||
def to_configured_stream(
    stream,
    sync_mode=None,
    destination_sync_mode=DestinationSyncMode.append,
    cursor_field=None,
    primary_key=None,
) -> ConfiguredAirbyteStream:
    """Build a ``ConfiguredAirbyteStream`` from a stream and its sync settings.

    Every argument is forwarded unchanged to the ``ConfiguredAirbyteStream``
    constructor under the keyword of the same name.
    """
    stream_settings = {
        "stream": stream,
        "sync_mode": sync_mode,
        "destination_sync_mode": destination_sync_mode,
        "cursor_field": cursor_field,
        "primary_key": primary_key,
    }
    return ConfiguredAirbyteStream(**stream_settings)
|
|
||
|
|
||
def to_configured_catalog(
    configured_streams,
) -> ConfiguredAirbyteCatalog:
    """Wrap the given configured streams in a ``ConfiguredAirbyteCatalog``."""
    catalog = ConfiguredAirbyteCatalog(streams=configured_streams)
    return catalog
|
|
||
|
|
||
| _CONFIG = { | ||
| "start_date": "2024-07-01T00:00:00.000Z", | ||
| } | ||
|
lazebnyi marked this conversation as resolved.
|
||
|
|
||
|
|
||
| _MANIFEST = { | ||
| "version": "6.7.0", | ||
| "type": "DeclarativeSource", | ||
| "check": {"type": "CheckStream", "stream_names": ["Rates"]}, | ||
| "streams": [ | ||
| { | ||
| "type": "DeclarativeStream", | ||
| "name": "test_stream", | ||
| "primary_key": [], | ||
| "schema_loader": { | ||
| "type": "InlineSchemaLoader", | ||
| "schema": { | ||
| "$schema": "http://json-schema.org/schema#", | ||
| "properties": { | ||
| "ABC": {"type": "number"}, | ||
| "AED": {"type": "number"}, | ||
| }, | ||
| "type": "object", | ||
| }, | ||
| }, | ||
| "retriever": { | ||
| "type": "SimpleRetriever", | ||
| "requester": { | ||
| "type": "HttpRequester", | ||
| "url_base": "https://api.test.com", | ||
| "path": "/items", | ||
| "http_method": "GET", | ||
| "authenticator": { | ||
| "type": "ApiKeyAuthenticator", | ||
| "header": "apikey", | ||
| "api_token": "{{ config['api_key'] }}", | ||
| }, | ||
| }, | ||
| "record_selector": { | ||
| "type": "RecordSelector", | ||
| "extractor": { | ||
| "type": "KeyValueExtractor", | ||
| "keys_extractor": { | ||
| "type": "DpathExtractor", | ||
| "field_path": ["dimensions", "names"], | ||
| }, | ||
| "values_extractor": { | ||
| "type": "DpathExtractor", | ||
| "field_path": ["dimensions", "values"], | ||
| }, | ||
| }, | ||
| }, | ||
| "paginator": {"type": "NoPagination"}, | ||
| }, | ||
| } | ||
| ], | ||
| } | ||
|
|
||
|
|
||
def test_key_value_extractor():
    """Read the manifest's stream against a mocked response and check the zipped record."""
    # TODO(review): add scenarios beyond this single-group case (e.g. several
    # groups of values) to demonstrate the intended behavior for multiple elements.
    declarative_source = ConcurrentDeclarativeSource(
        source_config=_MANIFEST, config=_CONFIG, catalog=None, state=None
    )

    discovered_catalog = declarative_source.discover(
        logger=declarative_source.logger, config=_CONFIG
    )

    # Configure every discovered stream, reusing its source-defined primary key.
    configured_catalog = to_configured_catalog(
        [
            to_configured_stream(
                discovered_stream,
                primary_key=discovered_stream.source_defined_primary_key,
            )
            for discovered_stream in discovered_catalog.streams
        ]
    )

    response_payload = {
        "dimensions": {
            "names": ["customer_segment", "traffic_source"],
            "values": ["enterprise", "organic_search"],
        }
    }

    with HttpMocker() as http_mocker:
        http_mocker.get(
            HttpRequest(url="https://api.test.com/items"),
            HttpResponse(body=json.dumps(response_payload)),
        )

        # Keep only the record messages produced by the read.
        emitted_records = []
        for message in declarative_source.read(MagicMock(), _CONFIG, configured_catalog):
            if message.type == Type.RECORD:
                emitted_records.append(message.record)

        assert len(emitted_records) == 1
        assert emitted_records[0].data == {
            "customer_segment": "enterprise",
            "traffic_source": "organic_search",
        }
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: can we alphabetize this?