feat(cdk): add KeyValueExtractor #552
Serhii Lazebnyi (lazebnyi) wants to merge 10 commits into main from lazebnyi/add-key-value-extractor
Conversation
/autofix
📝 Walkthrough

This update introduces a new `KeyValueExtractor` component.

Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant User
    participant DeclarativeSource
    participant Retriever
    participant KeyValueExtractor
    participant KeysExtractor
    participant ValuesExtractor
    User->>DeclarativeSource: read()
    DeclarativeSource->>Retriever: retrieve response
    Retriever->>KeyValueExtractor: extract_records(response)
    KeyValueExtractor->>KeysExtractor: extract_records(response)
    KeysExtractor-->>KeyValueExtractor: keys[]
    KeyValueExtractor->>ValuesExtractor: extract_records(response)
    ValuesExtractor-->>KeyValueExtractor: values[]
    KeyValueExtractor-->>Retriever: yield [{key: value, ...}]
    Retriever-->>DeclarativeSource: records
    DeclarativeSource-->>User: records
```
Would you like to see a diagram comparing the old and new extractor flows, or is this high-level overview sufficient for your needs? Wdyt?
Actionable comments posted: 9
🧹 Nitpick comments (9)
airbyte_cdk/sources/declarative/extractors/key_value_extractor.py (1)
34-42: Consider handling mismatched key-value counts.

The implementation is clean, but what happens if there are fewer values than keys in a chunk? The current implementation will create a dictionary with missing keys.
Would you consider adding handling for this case? Perhaps:

```diff
 while True:
     chunk = list(islice(values, len(keys)))
     if not chunk:
         break
-    yield dict(zip(keys, chunk))
+    # Ensure all keys have values, even if there are fewer values than keys
+    result = {}
+    for i, key in enumerate(keys):
+        result[key] = chunk[i] if i < len(chunk) else None
+    yield result
```

Or alternatively, if it's preferable to keep only the keys that have values:

```diff
 while True:
     chunk = list(islice(values, len(keys)))
     if not chunk:
         break
-    yield dict(zip(keys, chunk))
+    # Only include keys that have corresponding values
+    yield dict(zip(keys[:len(chunk)], chunk))
```

What do you think?
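To make the trade-off concrete, here is a small standalone sketch (the helper name is hypothetical, not the CDK code) of the current chunking loop, showing how a trailing short chunk produces a record with missing keys:

```python
from itertools import islice

def chunked_records(keys, values):
    # Consume the values iterator in chunks of len(keys) and zip each
    # chunk against the keys, mirroring the loop under review.
    values = iter(values)
    while True:
        chunk = list(islice(values, len(keys)))
        if not chunk:
            break
        yield dict(zip(keys, chunk))

# A short final chunk silently drops the trailing keys:
print(list(chunked_records(["name", "age"], ["Alice", 30, "Bob"])))
# → [{'name': 'Alice', 'age': 30}, {'name': 'Bob'}]
```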
airbyte_cdk/sources/declarative/declarative_component_schema.yaml (5)
1744-1769: Normalize enum formatting and enrich descriptions?

The `enum` for `KeyValueExtractor` is defined as `[ KeyValueExtractor ]` (with extra whitespace), and the `description` and field descriptions are placeholders. Could we trim the whitespace (`[KeyValueExtractor]`) and provide more meaningful descriptions (e.g., explain that it extracts keys and values from records using the provided extractors)? wdyt?
1769-1791: Normalize enum formatting and enrich descriptions?

Similarly, the `CombinedExtractor` uses `[ CombinedExtractor ]` with extraneous spaces and has placeholder descriptions. Would you consider adjusting the enum to `[CombinedExtractor]` and expanding the `description` (e.g., "Combines multiple record extractors to merge results into a single list of records")? wdyt?
2369-2375: Provide a meaningful description?

The `schema_filter` property currently has `description: placeholder`. Can we clarify its purpose (e.g., "Filters extracted schema properties based on a predicate before schema transformation") to improve schema docs? wdyt?
3372-3374: Add examples for new extractor types?

We've extended the `extractor` in `RecordSelector` to include `CombinedExtractor` and `KeyValueExtractor`, but there are no examples demonstrating their usage. Could we add sample entries under `examples` to illustrate these new extractor types? wdyt?
4122-4127: Add metadata and verify code support for multiple configs?

The `stream_config` property has no `title` or `description`, and now allows both a single `StreamConfig` and an array. Have we updated the resolver implementation to handle both formats, and could we add descriptive metadata for clarity? wdyt?

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)
1-4: Check copyright year

The copyright year is set to 2025, which is in the future. This is likely an oversight.

```diff
-# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
+# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
```

What do you think?
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (2)
1481-1482: Should this field really be Optional?

Declaring the property as `Optional[bool] = False` means `None` is still a valid value, yet the default suggests you only expect a boolean. Would switching to a plain `bool = False`, or at least documenting the meaning of `None`, make the intent clearer, wdyt?
2436-2438: `schema_filter` placeholder lacks guidance.

The new `schema_filter` option is great 👍. Could you add at least a one-line purpose sentence (e.g. "Filter dynamic schema records before type-inference is applied") to help spec readers, wdyt?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Cache: Disabled due to data retention organization setting
Knowledge Base: Disabled due to data retention organization setting
📒 Files selected for processing (10)
- airbyte_cdk/sources/declarative/declarative_component_schema.yaml (6 hunks)
- airbyte_cdk/sources/declarative/extractors/__init__.py (2 hunks)
- airbyte_cdk/sources/declarative/extractors/combined_extractor.py (1 hunks)
- airbyte_cdk/sources/declarative/extractors/key_value_extractor.py (1 hunks)
- airbyte_cdk/sources/declarative/manifest_declarative_source.py (1 hunks)
- airbyte_cdk/sources/declarative/models/declarative_component_schema.py (7 hunks)
- airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (9 hunks)
- airbyte_cdk/sources/declarative/resolvers/components_resolver.py (2 hunks)
- airbyte_cdk/sources/declarative/resolvers/config_components_resolver.py (6 hunks)
- airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py (4 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (4)
airbyte_cdk/sources/declarative/extractors/__init__.py (4)
- airbyte_cdk/sources/declarative/extractors/combined_extractor.py (1): CombinedExtractor (19-44)
- airbyte_cdk/sources/declarative/models/declarative_component_schema.py (3): CombinedExtractor (1858-1863), DpathExtractor (696-709), KeyValueExtractor (1847-1855)
- airbyte_cdk/sources/declarative/extractors/http_selector.py (1): HttpSelector (13-37)
- airbyte_cdk/sources/declarative/extractors/key_value_extractor.py (1): KeyValueExtractor (15-42)

airbyte_cdk/sources/declarative/extractors/key_value_extractor.py (3)
- airbyte_cdk/sources/declarative/extractors/record_extractor.py (1): RecordExtractor (12-27)
- airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1): KeyValueExtractor (1847-1855)
- airbyte_cdk/sources/declarative/extractors/combined_extractor.py (1): extract_records (37-44)

airbyte_cdk/sources/declarative/resolvers/config_components_resolver.py (1)
- airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1): StreamConfig (1485-1494)

airbyte_cdk/sources/declarative/models/declarative_component_schema.py (5)
- airbyte_cdk/sources/declarative/resolvers/config_components_resolver.py (2): ConfigComponentsResolver (42-171), StreamConfig (25-37)
- airbyte_cdk/sources/declarative/extractors/key_value_extractor.py (1): KeyValueExtractor (15-42)
- airbyte_cdk/sources/declarative/extractors/dpath_extractor.py (1): DpathExtractor (18-86)
- unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (1): CustomRecordExtractor (3745-3750)
- airbyte_cdk/sources/declarative/extractors/combined_extractor.py (1): CombinedExtractor (19-44)
🪛 GitHub Actions: Linters
airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py
[error] 182-182: Argument 1 to "filter_records" of "RecordFilter" has incompatible type "ItemsView[str, Any]"; expected "Iterable[Mapping[str, Any]]" [arg-type]
[error] 183-183: Invalid index type "int" for "Mapping[str, Any]"; expected type "str" [index]
airbyte_cdk/sources/declarative/resolvers/config_components_resolver.py
[error] 97-97: Function is missing a return type annotation [no-untyped-def]
[error] 98-98: Function is missing a type annotation [no-untyped-def]
[error] 103-103: Function is missing a type annotation [no-untyped-def]
[error] 106-106: Function is missing a return type annotation [no-untyped-def]
[error] 108-108: Call to untyped function "resolve_path" in typed context [no-untyped-call]
[error] 110-110: Call to untyped function "normalize_configs" in typed context [no-untyped-call]
[error] 112-112: Unsupported left operand type for + ("object") [operator]
[error] 113-113: Need type annotation for "item" [var-annotated]
[error] 113-113: Argument 1 to "enumerate" has incompatible type "object"; expected "Iterable[Never]" [arg-type]
[error] 115-115: Function is missing a type annotation [no-untyped-def]
[error] 124-124: Call to untyped function "prepare_streams" in typed context [no-untyped-call]
[error] 125-125: Call to untyped function "merge_combination" in typed context [no-untyped-call]
airbyte_cdk/sources/declarative/models/declarative_component_schema.py
[error] 1493-1493: Missing type parameters for generic type "List" [type-arg]
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
[error] 2476-2476: Argument 1 to "_create_component_from_model" of "ModelToComponentFactory" has incompatible type "RecordFilter | CustomRecordFilter | None"; expected "BaseModel" [arg-type]
⏰ Context from checks skipped due to timeout of 90000ms (4)
- GitHub Check: Check: 'source-amplitude' (skip=false)
- GitHub Check: Check: 'source-shopify' (skip=false)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
🔇 Additional comments (26)
airbyte_cdk/sources/declarative/extractors/__init__.py (2)
5-8: New extractors properly imported!

Both `CombinedExtractor` and `KeyValueExtractor` are properly imported. This makes sense as they complement the existing extractor functionality.

23-24: Exports properly added to `__all__`

The new extractor classes are correctly exposed in the `__all__` list, ensuring they're accessible when the package is imported.

airbyte_cdk/sources/declarative/resolvers/components_resolver.py (2)
25-25: New create_or_update flag added correctly

Adding the optional boolean flag with a default value of `False` to `ComponentMappingDefinition` is a backward-compatible change. This flag enables conditional creation or updating of component mappings, which enhances flexibility. The default value ensures existing code won't break.

38-38: Mirrored create_or_update flag in ResolvedComponentMappingDefinition

The same flag is consistently added to `ResolvedComponentMappingDefinition` with the same default value, maintaining consistency between the two related dataclasses.

airbyte_cdk/sources/declarative/extractors/combined_extractor.py (1)
37-44: Good implementation of extract_records method

The implementation correctly collects record iterables from all extractors, zips them together to align corresponding records, and merges each group into a single dictionary to yield. This is an elegant way to combine records from multiple sources.
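As a minimal standalone sketch of that zip-and-merge pattern (the function name is hypothetical and this is illustrative only, not the CDK implementation):

```python
from typing import Any, Dict, Iterable, List

def combine_extracted(record_streams: List[Iterable[Dict[str, Any]]]) -> Iterable[Dict[str, Any]]:
    # zip(*...) aligns the i-th record from every extractor; later
    # extractors win on key collisions because of dict.update ordering.
    for aligned in zip(*record_streams):
        merged: Dict[str, Any] = {}
        for record in aligned:
            merged.update(record)
        yield merged

streams = [
    [{"id": 1}, {"id": 2}],
    [{"name": "Alice"}, {"name": "Bob"}],
]
print(list(combine_extracted(streams)))
# → [{'id': 1, 'name': 'Alice'}, {'id': 2, 'name': 'Bob'}]
```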
airbyte_cdk/sources/declarative/manifest_declarative_source.py (1)
265-265: Clean refactoring to use dynamic_streams property

This change simplifies the code by using the `dynamic_streams` property instead of directly calling `_dynamic_stream_configs`. It improves readability while maintaining the same behavior.

airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py (3)
13-13: Good addition of the RecordFilter import.

The new import aligns perfectly with the schema filtering capability you're adding.

130-130: Love the optional schema filtering feature.

This is a nice extension of the DynamicSchemaLoader functionality. The optional nature means existing code won't break while providing new filtering capabilities.

156-156: LGTM: Filtering before transformation.

Applying the filter before transformations is the right approach, as it reduces the work transformations need to do.
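A toy illustration of why the ordering matters (all names here are hypothetical, not the DynamicSchemaLoader API): filtering first means the transformations never run over properties that would be discarded anyway.

```python
def load_properties(properties, schema_filter=None, transformations=()):
    # Apply the optional filter first, then run each transformation
    # only over the properties that survived the filter.
    if schema_filter is not None:
        properties = [p for p in properties if schema_filter(p)]
    for transform in transformations:
        properties = [transform(p) for p in properties]
    return properties

props = [
    {"name": "id", "deprecated": False},
    {"name": "legacy_field", "deprecated": True},
]
kept = load_properties(props, schema_filter=lambda p: not p["deprecated"])
print(kept)
# → [{'name': 'id', 'deprecated': False}]
```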
airbyte_cdk/sources/declarative/extractors/key_value_extractor.py (2)
1-12: LGTM: Clean imports and structure.

The imports are clean and appropriate for this class. Using `islice` is a good choice for taking chunks of an iterator.

14-33: Nice declarative class with clear documentation.

The class definition and docstring are excellent. The example is particularly helpful for understanding the extractor's purpose.
airbyte_cdk/sources/declarative/resolvers/config_components_resolver.py (6)
32-32: Adding default_values to StreamConfig makes sense.

This allows providing fallback configurations, which is a nice convenience.

53-53: Good change to support multiple stream configs.

Changing from a single stream_config to a list of stream_configs provides more flexibility.

87-87: Adding create_or_update property passes through important behavior.

Preserving this flag in the resolved component is important for proper handling later.

154-155: LGTM: Parsing YAML values is a nice enhancement.

This is a good improvement: converting string YAML to native Python types before setting makes configurations more flexible.

157-158: Good handling for create_or_update paths.

Using `dpath.new` when `dpath.set` fails and `create_or_update` is true provides a nice way to handle missing paths.
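The set-or-create fallback can be pictured with a dependency-free sketch (a hypothetical helper; the actual code uses the `dpath` library's set/new calls rather than this hand-rolled walk):

```python
def set_path(mapping, path, value, create_or_update=False):
    # Walk to the parent of the target key, creating intermediate
    # dicts only when create_or_update is enabled.
    node = mapping
    for key in path[:-1]:
        if key not in node:
            if not create_or_update:
                raise KeyError(f"missing path segment: {key}")
            node[key] = {}
        node = node[key]
    node[path[-1]] = value

config = {}
set_path(config, ["credentials", "api_key"], "***", create_or_update=True)
print(config)
# → {'credentials': {'api_key': '***'}}
```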
162-171: LGTM: Clean YAML parsing helper.

The static method for YAML parsing is well-implemented with proper error handling.
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (9)
86-86: New extractors added to imports - looks good!

The imports for the new `CombinedExtractor` and `KeyValueExtractor` along with their corresponding models have been correctly added to the file.

Also applies to: 88-88, 147-149, 312-314
652-654: Constructor mappings added for new extractors - LGTM!

You've properly registered the new extractors in the PYDANTIC_MODEL_TO_CONSTRUCTOR dictionary so they can be instantiated from the models.

2231-2246: KeyValueExtractor factory method implementation looks good

The implementation correctly creates a KeyValueExtractor by instantiating both the keys and values extractors from their models.

2247-2260: CombinedExtractor factory method implementation looks good

The implementation correctly creates extractors from the provided list and instantiates the CombinedExtractor with them.

2482-2482: New schema_filter parameter added to DynamicSchemaLoader initialization - LGTM

The schema_filter parameter is now correctly passed to the DynamicSchemaLoader constructor.

3644-3644: Added create_or_update flag to ComponentMappingDefinition

The new create_or_update parameter enhances flexibility when dealing with component mappings.

3691-3691: Added default_values to StreamConfig

The default_values parameter adds support for default stream values, which is a good addition.

3698-3707: Improved handling of multiple stream configs

The code now properly handles both single and multiple stream configs by normalizing to a list and creating multiple stream config components.
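Normalizing a single-or-list field is a small pattern; a hedged sketch of what "normalizing to a list" means here (the helper name is invented, not the factory's actual code):

```python
from typing import Any, List, Mapping, Union

def normalize_stream_configs(
    stream_config: Union[Mapping[str, Any], List[Mapping[str, Any]]],
) -> List[Mapping[str, Any]]:
    # Wrap a single mapping in a list so downstream code always iterates a list.
    return list(stream_config) if isinstance(stream_config, list) else [stream_config]

print(normalize_stream_configs({"configs_pointer": ["data"]}))
# → [{'configs_pointer': ['data']}]
print(normalize_stream_configs([{"a": 1}, {"b": 2}]))
# → [{'a': 1}, {'b': 2}]
```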
3721-3721: ConfigComponentsResolver now takes a list of stream_configs

This change allows the resolver to handle multiple stream configs, which provides more flexibility.
/autofix
…hq/airbyte-python-cdk into lazebnyi/add-key-value-extractor
Actionable comments posted: 1
🧹 Nitpick comments (5)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)
2226-2241: Implementation looks clean - any thoughts on error handling edge cases?

The implementation correctly creates the two sub-extractors and returns a new KeyValueExtractor instance. The default JsonDecoder is a sensible choice.

Should we add any validation to handle potential mismatch in the number of keys vs values that are extracted? Or is this handled in the KeyValueExtractor implementation itself? wdyt?
unit_tests/sources/declarative/extractors/test_key_value_extractor.py (4)
1-1: Fix Ruff formatting issues

The pipeline is showing a Ruff formatting issue for this file. Running `ruff format` would fix these formatting issues automatically.

Would you like me to create an automated fix for this, or would you prefer to run the formatter yourself?
🧰 Tools
🪛 GitHub Actions: Linters
[error] 1-1: Ruff formatting check failed. File would be reformatted. Run 'ruff format' to fix code style issues.
97-132: Consider adding tests for edge cases

The test effectively validates the happy path for the `KeyValueExtractor`, but there are no tests for edge cases like mismatched array lengths or empty arrays.

Would adding tests for these scenarios be valuable? Perhaps something like:

```python
def test_key_value_extractor_edge_cases():
    source = ConcurrentDeclarativeSource(
        source_config=_MANIFEST, config=_CONFIG, catalog=None, state=None
    )
    # Configure catalog same as in main test
    actual_catalog = source.discover(logger=source.logger, config=_CONFIG)
    configured_streams = [
        to_configured_stream(stream, primary_key=stream.source_defined_primary_key)
        for stream in actual_catalog.streams
    ]
    configured_catalog = to_configured_catalog(configured_streams)

    # Case 1: Mismatched array lengths
    with HttpMocker() as http_mocker:
        http_mocker.get(
            HttpRequest(url="https://api.test.com/items"),
            HttpResponse(
                body=json.dumps(
                    {
                        "dimensions": {
                            "names": ["customer_segment", "traffic_source"],
                            "values": ["enterprise"],  # One fewer value than keys
                        }
                    }
                )
            ),
        )
        # Test how the extractor handles this case
        # ...
```

What do you think?
97-101: Add docstring to test function

Adding a docstring to explain what this test validates would improve readability and documentation.

Maybe something like:

```diff
 def test_key_value_extractor():
+    """
+    Test that the KeyValueExtractor correctly combines keys and values from separate arrays in the API response
+    into a structured record with key-value pairs.
+    """
     source = ConcurrentDeclarativeSource(
```

wdyt?
47-93: Minor inconsistency in stream names

The check component references "Rates" but the actual stream is named "test_stream".

This inconsistency doesn't affect the test since we're not testing the check method, but for clarity should the check stream name match the actual stream name? wdyt?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Cache: Disabled due to data retention organization setting
Knowledge Base: Disabled due to data retention organization setting
📒 Files selected for processing (5)
- airbyte_cdk/sources/declarative/declarative_component_schema.yaml (2 hunks)
- airbyte_cdk/sources/declarative/extractors/__init__.py (2 hunks)
- airbyte_cdk/sources/declarative/models/declarative_component_schema.py (3 hunks)
- airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (4 hunks)
- unit_tests/sources/declarative/extractors/test_key_value_extractor.py (1 hunks)
✅ Files skipped from review due to trivial changes (1)
- airbyte_cdk/sources/declarative/extractors/__init__.py
🚧 Files skipped from review as they are similar to previous changes (2)
- airbyte_cdk/sources/declarative/declarative_component_schema.yaml
- airbyte_cdk/sources/declarative/models/declarative_component_schema.py
🧰 Additional context used
🪛 GitHub Actions: Linters
unit_tests/sources/declarative/extractors/test_key_value_extractor.py
[error] 1-1: Ruff formatting check failed. File would be reformatted. Run 'ruff format' to fix code style issues.
⏰ Context from checks skipped due to timeout of 90000ms (5)
- GitHub Check: Check: 'source-amplitude' (skip=false)
- GitHub Check: Check: 'source-shopify' (skip=false)
- GitHub Check: Pytest (Fast)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
🔇 Additional comments (6)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (3)
87-87: Looks good! Adding KeyValueExtractor import

Clean addition to the imports from the extractors module.

308-310: Looks good! Adding KeyValueExtractorModel import

Clean import of the Pydantic model for the new extractor.

648-648: Looks good! Registering the KeyValueExtractor factory method

Properly registered the KeyValueExtractorModel to its factory method in the _init_mappings dictionary.
unit_tests/sources/declarative/extractors/test_key_value_extractor.py (3)
16-29: LGTM! Helper function looks good

This helper function is well-designed with good default parameters and proper return type annotation.

32-35: LGTM! Clean catalog conversion helper

Simple and effective helper function for converting streams to a catalog.

43-94: LGTM! Well-structured manifest for testing KeyValueExtractor

The manifest properly demonstrates how to use the new `KeyValueExtractor` with two DpathExtractors to extract keys and values from parallel arrays.
/autofix
Brian Lai (brianjlai)
left a comment
Some general comments and little things. I think the main thing I want to understand better before approving is how this is supposed to work when we have multiple elements to emit, depending on what `values` contains.
Also, just as a general thought, my worry is that parsing over the values array seems pretty unique so I was not sure if there was a common pattern shared by multiple connectors and if this was more suitable as a custom component.
```python
    "RecordFilter",
    "RecordSelector",
    "ResponseToFileExtractor",
    "KeyValueExtractor",
```
Nit, can we alphabetize this
```python
while True:
    chunk = list(islice(values, len(keys)))
    if not chunk:
        break
```
It feels a little more idiomatic to avoid using a permanent True + break statement. Can we instead use a has_more_chunks variable to manage when to stop iterating.
```python
has_more_chunks = True
while has_more_chunks:
    ...
    if len(chunk) == 0:  # As far as I understand reading the code, we stop if there are no values in the current chunk
        has_more_chunks = False
    else:
        yield dict(zip(keys, chunk))
```
```python
def extract_records(self, response: requests.Response) -> Iterable[MutableMapping[Any, Any]]:
    keys = list(self.keys_extractor.extract_records(response))
    values = self.values_extractor.extract_records(response)
```
What is still a little unclear to me is what the expected behavior for this should be beyond the simplest case of there are 2 field keys and 2 values in the list.
What happens in the following:
- `values` has a length of 3, like `["Alice", 30, "what about this"]`? Is this an error state? Because right now, this would yield a second record of `{"name": "what about this"}` without the second property.
- Are we always expecting `values` to be a single continuous list `["Alice", 30, "Thomas", 40, "Alex", 35]`? Or do we expect it to be grouped into multiple lists of lists: `[["Alice", 30], ["Thomas", 40], ["Alex", 35]]`?
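To make the two interpretations the reviewer asks about concrete, here is a small side-by-side sketch (hypothetical data, not taken from the PR's tests):

```python
keys = ["name", "age"]

# Interpretation 1: a single flat list consumed in len(keys)-sized chunks.
flat_values = ["Alice", 30, "Thomas", 40, "Alex", 35]
records_flat = [
    dict(zip(keys, flat_values[i:i + len(keys)]))
    for i in range(0, len(flat_values), len(keys))
]
print(records_flat)
# → [{'name': 'Alice', 'age': 30}, {'name': 'Thomas', 'age': 40}, {'name': 'Alex', 'age': 35}]

# Interpretation 2: values already grouped into one list per record.
grouped_values = [["Alice", 30], ["Thomas", 40], ["Alex", 35]]
records_grouped = [dict(zip(keys, group)) for group in grouped_values]
print(records_grouped == records_flat)
# → True
```

Both shapes yield the same records when every group is complete; they only diverge for uneven lengths, which is exactly the edge case under discussion.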
```python
def test_key_value_extractor():
```
Let's add some additional scenarios beyond just the single element case to make sure this is working and demonstrate what the intended behavior should be for multiple elements.
```python
    """

keys_extractor: RecordExtractor
values_extractor: RecordExtractor
```
For consistency, we should probably also take in the config and parameters objects, since right now we are also passing them in via the factory in `model_to_component_factory.py`'s `create_key_value_extractor()`.
A custom component will be used for the migration instead of this implementation.
What

Resolved: https://github.com/airbytehq/airbyte-internal-issues/issues/13047

Adds a `KeyValueExtractor` to support extracting structured records from APIs that return related keys and values in parallel arrays. This is useful for handling responses where field names and their corresponding values are returned separately, e.g., `["name", "age"]` and `["Alice", 30]` -> `{ "name": "Alice", "age": 30 }`.

How

- New `KeyValueExtractor` class.
- Accepts `keys_extractor` and `values_extractor`.
- Builds records via `dict(zip(keys, values))`.
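For readers skimming the description, the core transformation reduces to the following (the response shape is borrowed from the PR's unit test; the specific field paths are assumptions for illustration):

```python
import json

# Simulated API response where keys and values arrive in parallel arrays.
response_body = json.dumps({
    "dimensions": {
        "names": ["customer_segment", "traffic_source"],
        "values": ["enterprise", "organic"],
    }
})

payload = json.loads(response_body)
keys = payload["dimensions"]["names"]     # what a keys_extractor would yield
values = payload["dimensions"]["values"]  # what a values_extractor would yield
record = dict(zip(keys, values))
print(record)
# → {'customer_segment': 'enterprise', 'traffic_source': 'organic'}
```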