
feat(cdk): add KeyValueExtractor #552

Closed
Serhii Lazebnyi (lazebnyi) wants to merge 10 commits into main from lazebnyi/add-key-value-extractor

Conversation

@lazebnyi
Contributor

@lazebnyi Serhii Lazebnyi (lazebnyi) commented May 17, 2025

What

Resolved: https://github.com/airbytehq/airbyte-internal-issues/issues/13047

Adds a KeyValueExtractor to support extracting structured records from APIs that return related keys and values in parallel arrays. This is useful for handling responses where field names and their corresponding values are returned separately — e.g., keys ["name", "age"] and values ["Alice", 30] -> { "name": "Alice", "age": 30 }.

How

  • Introduced a KeyValueExtractor class.
  • It takes two sub-extractors: keys_extractor and values_extractor.
  • Both extractors run on the same response.
  • Their outputs are zipped together into dictionaries using dict(zip(keys, values)).
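The steps above can be sketched as a standalone snippet (simplified: in the real CDK class, the keys and values arrive via the two sub-extractors' `extract_records` calls, and the function name here is illustrative):

```python
from itertools import islice
from typing import Any, Iterable, Iterator, Mapping


def key_value_records(
    keys: Iterable[Any], values: Iterable[Any]
) -> Iterator[Mapping[str, Any]]:
    """Pair a fixed list of keys with successive same-sized chunks of values."""
    key_list = list(keys)
    values_iter = iter(values)
    while True:
        # Take the next len(keys)-sized chunk of values
        chunk = list(islice(values_iter, len(key_list)))
        if not chunk:
            break
        yield dict(zip(key_list, chunk))


records = list(key_value_records(["name", "age"], ["Alice", 30, "Bob", 25]))
# -> [{"name": "Alice", "age": 30}, {"name": "Bob", "age": 25}]
```

Note that when the values array is a multiple of the key count, each chunk becomes one record, which is why a single response can yield several records.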

Summary by CodeRabbit

  • New Features
    • Introduced a new record extraction method that combines separate key and value sources into structured records within declarative sources.
  • Documentation
    • Clarified the behavior of combining multiple schema loaders, including property merging and precedence rules.
  • Tests
    • Added unit tests to verify correct extraction and mapping of keys and values using the new method.

@lazebnyi
Contributor Author

Serhii Lazebnyi (lazebnyi) commented May 17, 2025

/autofix

Auto-Fix Job Info

This job attempts to auto-fix any linting or formatting issues. If any fixes are made,
those changes will be automatically committed and pushed back to the PR.

Note: This job can only be run by maintainers. On PRs from forks, this command requires
that the PR author has enabled the Allow edits from maintainers option.

PR auto-fix job started... Check job output.

✅ Changes applied successfully.

@github-actions bot added the enhancement (New feature or request) label on May 17, 2025
@coderabbitai
Contributor

coderabbitai bot commented May 17, 2025

📝 Walkthrough

Walkthrough

This update introduces a new KeyValueExtractor component for declarative sources, enabling the extraction of keys and values from separate locations in API responses and combining them into records. The schema, model, factory, and public API are updated to support this extractor. A dedicated unit test validates its functionality within the declarative framework.

Changes

File(s) Change Summary
airbyte_cdk/sources/declarative/declarative_component_schema.yaml Added KeyValueExtractor definition with required type, keys_extractor, and values_extractor fields; updated RecordSelector's extractor property to support KeyValueExtractor.
airbyte_cdk/sources/declarative/extractors/key_value_extractor.py Introduced KeyValueExtractor class inheriting from RecordExtractor, combining keys and values from two extractors into dictionary records.
airbyte_cdk/sources/declarative/extractors/__init__.py Imported and exported KeyValueExtractor in the extractors package.
airbyte_cdk/sources/declarative/models/declarative_component_schema.py Added KeyValueExtractor Pydantic model; updated RecordSelector to accept KeyValueExtractor; clarified schema_loader documentation.
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py Supported instantiation of KeyValueExtractor in ModelToComponentFactory via a new mapping and creation method.
unit_tests/sources/declarative/extractors/test_key_value_extractor.py Added unit test for KeyValueExtractor, verifying correct mapping of keys and values from a simulated API response in a declarative source context.

Sequence Diagram(s)

sequenceDiagram
    participant User
    participant DeclarativeSource
    participant Retriever
    participant KeyValueExtractor
    participant KeysExtractor
    participant ValuesExtractor

    User->>DeclarativeSource: read()
    DeclarativeSource->>Retriever: retrieve response
    Retriever->>KeyValueExtractor: extract_records(response)
    KeyValueExtractor->>KeysExtractor: extract_records(response)
    KeysExtractor-->>KeyValueExtractor: keys[]
    KeyValueExtractor->>ValuesExtractor: extract_records(response)
    ValuesExtractor-->>KeyValueExtractor: values[]
    KeyValueExtractor-->>Retriever: yield [{key: value, ...}]
    Retriever-->>DeclarativeSource: records
    DeclarativeSource-->>User: records

Would you like to see a diagram comparing the old and new extractor flows, or is this high-level overview sufficient for your needs? Wdyt?



📜 Recent review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Cache: Disabled due to data retention organization setting
Knowledge Base: Disabled due to data retention organization setting

📥 Commits

Reviewing files that changed from the base of the PR and between ef6bd7a and 15c0ddc.

📒 Files selected for processing (1)
  • unit_tests/sources/declarative/extractors/test_key_value_extractor.py (1 hunks)
✅ Files skipped from review due to trivial changes (1)
  • unit_tests/sources/declarative/extractors/test_key_value_extractor.py
⏰ Context from checks skipped due to timeout of 90000ms (9)
  • GitHub Check: Check: 'source-pokeapi' (skip=false)
  • GitHub Check: Check: 'source-shopify' (skip=false)
  • GitHub Check: Check: 'source-amplitude' (skip=false)
  • GitHub Check: Check: 'source-hardcoded-records' (skip=false)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: SDM Docker Image Build
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Analyze (python)

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 9

🧹 Nitpick comments (9)
airbyte_cdk/sources/declarative/extractors/key_value_extractor.py (1)

34-42: Consider handling mismatched key-value counts.

The implementation is clean, but what happens if there are fewer values than keys in a chunk? The current implementation will create a dictionary with missing keys.

Would you consider adding handling for this case? Perhaps:

while True:
    chunk = list(islice(values, len(keys)))
    if not chunk:
        break
-   yield dict(zip(keys, chunk))
+   # Ensure all keys have values, even if there are fewer values than keys
+   result = {}
+   for i, key in enumerate(keys):
+       result[key] = chunk[i] if i < len(chunk) else None
+   yield result

Or alternatively, if it's preferable to keep only the keys that have values:

while True:
    chunk = list(islice(values, len(keys)))
    if not chunk:
        break
-   yield dict(zip(keys, chunk))
+   # Only include keys that have corresponding values
+   yield dict(zip(keys[:len(chunk)], chunk))

What do you think?
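The two alternatives in the suggestion above boil down to padding with `itertools.zip_longest` versus truncating with plain `zip`. A minimal sketch (the key and value contents are illustrative, not taken from the PR):

```python
from itertools import islice, zip_longest

keys = ["customer_segment", "traffic_source"]
values = iter(["enterprise"])  # one value fewer than there are keys

chunk = list(islice(values, len(keys)))  # -> ["enterprise"]

# Option A: pad missing values with None so every key is present
padded = dict(zip_longest(keys, chunk))
# -> {"customer_segment": "enterprise", "traffic_source": None}

# Option B: keep only the keys that have a corresponding value
truncated = dict(zip(keys, chunk))
# -> {"customer_segment": "enterprise"}
```

Plain `zip` stops at the shorter iterable, so the unmodified implementation silently drops trailing keys; which option is correct depends on whether downstream consumers expect every schema field to be present.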

airbyte_cdk/sources/declarative/declarative_component_schema.yaml (5)

1744-1769: Normalize enum formatting and enrich descriptions?

The enum for KeyValueExtractor is defined as [ KeyValueExtractor ] (with extra whitespace) and the description and field descriptions are placeholders. Could we trim the whitespace ([KeyValueExtractor]) and provide more meaningful descriptions (e.g., explain that it extracts keys and values from records using the provided extractors)? wdyt?


1769-1791: Normalize enum formatting and enrich descriptions?

Similarly, the CombinedExtractor uses [ CombinedExtractor ] with extraneous spaces and has placeholder descriptions. Would you consider adjusting the enum to [CombinedExtractor] and expanding the description (e.g., “Combines multiple record extractors to merge results into a single list of records”)? wdyt?


2369-2375: Provide a meaningful description?

The schema_filter property currently has description: placeholder. Can we clarify its purpose (e.g., “Filters extracted schema properties based on a predicate before schema transformation”) to improve schema docs? wdyt?


3372-3374: Add examples for new extractor types?

We’ve extended the extractor in RecordSelector to include CombinedExtractor and KeyValueExtractor but there are no examples demonstrating their usage. Could we add sample entries under examples to illustrate these new extractor types? wdyt?


4122-4127: Add metadata and verify code support for multiple configs?

The stream_config property has no title or description, and now allows both a single StreamConfig and an array. Have we updated the resolver implementation to handle both formats, and could we add descriptive metadata for clarity? wdyt?

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)

1-4: Check copyright year

The copyright year is set to 2025, which is in the future. This is likely an oversight.

-# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
+# Copyright (c) 2024 Airbyte, Inc., all rights reserved.

What do you think?

airbyte_cdk/sources/declarative/models/declarative_component_schema.py (2)

1481-1482: Should this field really be Optional?

Declaring the property as Optional[bool] = False means None is still a valid value, yet the default suggests you only expect a boolean. Would switching to a plain bool = False, or at least documenting the meaning of None, make the intent clearer, wdyt?


2436-2438: schema_filter placeholder lacks guidance.

The new schema_filter option is great 👍. Could you add at least a one-line purpose sentence (e.g. “Filter dynamic schema records before type-inference is applied”) to help spec readers, wdyt?

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Cache: Disabled due to data retention organization setting
Knowledge Base: Disabled due to data retention organization setting

📥 Commits

Reviewing files that changed from the base of the PR and between fa8d54d and 50f729a.

📒 Files selected for processing (10)
  • airbyte_cdk/sources/declarative/declarative_component_schema.yaml (6 hunks)
  • airbyte_cdk/sources/declarative/extractors/__init__.py (2 hunks)
  • airbyte_cdk/sources/declarative/extractors/combined_extractor.py (1 hunks)
  • airbyte_cdk/sources/declarative/extractors/key_value_extractor.py (1 hunks)
  • airbyte_cdk/sources/declarative/manifest_declarative_source.py (1 hunks)
  • airbyte_cdk/sources/declarative/models/declarative_component_schema.py (7 hunks)
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (9 hunks)
  • airbyte_cdk/sources/declarative/resolvers/components_resolver.py (2 hunks)
  • airbyte_cdk/sources/declarative/resolvers/config_components_resolver.py (6 hunks)
  • airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py (4 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (4)
airbyte_cdk/sources/declarative/extractors/__init__.py (4)
airbyte_cdk/sources/declarative/extractors/combined_extractor.py (1)
  • CombinedExtractor (19-44)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (3)
  • CombinedExtractor (1858-1863)
  • DpathExtractor (696-709)
  • KeyValueExtractor (1847-1855)
airbyte_cdk/sources/declarative/extractors/http_selector.py (1)
  • HttpSelector (13-37)
airbyte_cdk/sources/declarative/extractors/key_value_extractor.py (1)
  • KeyValueExtractor (15-42)
airbyte_cdk/sources/declarative/extractors/key_value_extractor.py (3)
airbyte_cdk/sources/declarative/extractors/record_extractor.py (1)
  • RecordExtractor (12-27)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1)
  • KeyValueExtractor (1847-1855)
airbyte_cdk/sources/declarative/extractors/combined_extractor.py (1)
  • extract_records (37-44)
airbyte_cdk/sources/declarative/resolvers/config_components_resolver.py (1)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1)
  • StreamConfig (1485-1494)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (5)
airbyte_cdk/sources/declarative/resolvers/config_components_resolver.py (2)
  • ConfigComponentsResolver (42-171)
  • StreamConfig (25-37)
airbyte_cdk/sources/declarative/extractors/key_value_extractor.py (1)
  • KeyValueExtractor (15-42)
airbyte_cdk/sources/declarative/extractors/dpath_extractor.py (1)
  • DpathExtractor (18-86)
unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (1)
  • CustomRecordExtractor (3745-3750)
airbyte_cdk/sources/declarative/extractors/combined_extractor.py (1)
  • CombinedExtractor (19-44)
🪛 GitHub Actions: Linters
airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py

[error] 182-182: Argument 1 to "filter_records" of "RecordFilter" has incompatible type "ItemsView[str, Any]"; expected "Iterable[Mapping[str, Any]]" [arg-type]


[error] 183-183: Invalid index type "int" for "Mapping[str, Any]"; expected type "str" [index]

airbyte_cdk/sources/declarative/resolvers/config_components_resolver.py

[error] 97-97: Function is missing a return type annotation [no-untyped-def]


[error] 98-98: Function is missing a type annotation [no-untyped-def]


[error] 103-103: Function is missing a type annotation [no-untyped-def]


[error] 106-106: Function is missing a return type annotation [no-untyped-def]


[error] 108-108: Call to untyped function "resolve_path" in typed context [no-untyped-call]


[error] 110-110: Call to untyped function "normalize_configs" in typed context [no-untyped-call]


[error] 112-112: Unsupported left operand type for + ("object") [operator]


[error] 113-113: Need type annotation for "item" [var-annotated]


[error] 113-113: Argument 1 to "enumerate" has incompatible type "object"; expected "Iterable[Never]" [arg-type]


[error] 115-115: Function is missing a type annotation [no-untyped-def]


[error] 124-124: Call to untyped function "prepare_streams" in typed context [no-untyped-call]


[error] 125-125: Call to untyped function "merge_combination" in typed context [no-untyped-call]

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

[error] 1493-1493: Missing type parameters for generic type "List" [type-arg]

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

[error] 2476-2476: Argument 1 to "_create_component_from_model" of "ModelToComponentFactory" has incompatible type "RecordFilter | CustomRecordFilter | None"; expected "BaseModel" [arg-type]

⏰ Context from checks skipped due to timeout of 90000ms (4)
  • GitHub Check: Check: 'source-amplitude' (skip=false)
  • GitHub Check: Check: 'source-shopify' (skip=false)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
🔇 Additional comments (26)
airbyte_cdk/sources/declarative/extractors/__init__.py (2)

5-8: New extractors properly imported!

Both CombinedExtractor and KeyValueExtractor are properly imported. This makes sense as they complement the existing extractor functionality.


23-24: Exports properly added to all

The new extractor classes are correctly exposed in the __all__ list, ensuring they're accessible when the package is imported.

airbyte_cdk/sources/declarative/resolvers/components_resolver.py (2)

25-25: New create_or_update flag added correctly

Adding the optional boolean flag with a default value of False to ComponentMappingDefinition is a backward-compatible change. This flag enables conditional creation or updating of component mappings, which enhances flexibility. The default value ensures existing code won't break.


38-38: Mirrored create_or_update flag in ResolvedComponentMappingDefinition

The same flag is consistently added to ResolvedComponentMappingDefinition with the same default value, maintaining consistency between the two related dataclasses.

airbyte_cdk/sources/declarative/extractors/combined_extractor.py (1)

37-44: Good implementation of extract_records method

The implementation correctly collects record iterables from all extractors, zips them together to align corresponding records, and merges them into single dictionaries to yield. This is an elegant way to combine records from multiple sources.
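That zip-and-merge behavior can be sketched as follows (simplified: the real method iterates the `extract_records` output of each configured extractor, and the function name here is illustrative):

```python
from typing import Any, Iterable, Iterator, Mapping


def merge_zipped(streams: Iterable[Iterable[Mapping[str, Any]]]) -> Iterator[dict]:
    """Align records positionally across streams and merge each group into one dict."""
    for group in zip(*streams):
        merged: dict = {}
        for record in group:
            merged.update(record)  # later extractors overwrite on key collisions
        yield merged


result = list(
    merge_zipped(
        [
            [{"id": 1}, {"id": 2}],
            [{"name": "Alice"}, {"name": "Bob"}],
        ]
    )
)
# -> [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}]
```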

airbyte_cdk/sources/declarative/manifest_declarative_source.py (1)

265-265: Clean refactoring to use dynamic_streams property

This change simplifies the code by using the dynamic_streams property instead of directly calling _dynamic_stream_configs. It improves readability while maintaining the same behavior.

airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py (3)

13-13: Good addition of the RecordFilter import.

The new import aligns perfectly with the schema filtering capability you're adding.


130-130: Love the optional schema filtering feature.

This is a nice extension of the DynamicSchemaLoader functionality. The optional nature means existing code won't break while providing new filtering capabilities.


156-156: LGTM: Filtering before transformation.

Applying the filter before transformations is the right approach, as it reduces the work transformations need to do.

airbyte_cdk/sources/declarative/extractors/key_value_extractor.py (2)

1-12: LGTM: Clean imports and structure.

The imports are clean and appropriate for this class. Using islice is a good choice for taking chunks of an iterator.


14-33: Nice declarative class with clear documentation.

The class definition and docstring are excellent. The example is particularly helpful for understanding the extractor's purpose.

airbyte_cdk/sources/declarative/resolvers/config_components_resolver.py (6)

32-32: Adding default_values to StreamConfig makes sense.

This allows providing fallback configurations, which is a nice convenience.


53-53: Good change to support multiple stream configs.

Changing from a single stream_config to a list of stream_configs provides more flexibility.


87-87: Adding create_or_update property passes through important behavior.

Preserving this flag in the resolved component is important for proper handling later.


154-155: LGTM: Parsing YAML values is a nice enhancement.

This is a good improvement - converting string YAML to native Python types before setting makes configurations more flexible.


157-158: Good handling for create_or_update paths.

Using dpath.new when dpath.set fails and create_or_update is true provides a nice way to handle missing paths.


162-171: LGTM: Clean YAML parsing helper.

The static method for YAML parsing is well-implemented with proper error handling.

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (9)

86-86: New extractors added to imports - looks good!

The imports for the new CombinedExtractor and KeyValueExtractor along with their corresponding models have been correctly added to the file.

Also applies to: 88-88, 147-149, 312-314


652-654: Constructor mappings added for new extractors - LGTM!

You've properly registered the new extractors in the PYDANTIC_MODEL_TO_CONSTRUCTOR dictionary so they can be instantiated from the models.


2231-2246: KeyValueExtractor factory method implementation looks good

The implementation correctly creates a KeyValueExtractor by instantiating both the keys and values extractors from their models.


2247-2260: CombinedExtractor factory method implementation looks good

The implementation correctly creates extractors from the provided list and instantiates the CombinedExtractor with them.


2482-2482: New schema_filter parameter added to DynamicSchemaLoader initialization - LGTM

The schema_filter parameter is now correctly passed to the DynamicSchemaLoader constructor.


3644-3644: Added create_or_update flag to ComponentMappingDefinition

The new create_or_update parameter enhances flexibility when dealing with component mappings.


3691-3691: Added default_values to StreamConfig

The default_values parameter adds support for default stream values, which is a good addition.


3698-3707: Improved handling of multiple stream configs

The code now properly handles both single and multiple stream configs by normalizing to a list and creating multiple stream config components.


3721-3721: ConfigComponentsResolver now takes a list of stream_configs

This change allows the resolver to handle multiple stream configs, which provides more flexibility.

Outdated comment threads: airbyte_cdk/sources/declarative/extractors/combined_extractor.py, airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py, airbyte_cdk/sources/declarative/resolvers/config_components_resolver.py, airbyte_cdk/sources/declarative/declarative_component_schema.yaml (2 threads), airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py, airbyte_cdk/sources/declarative/models/declarative_component_schema.py
@lazebnyi
Contributor Author

Serhii Lazebnyi (lazebnyi) commented May 17, 2025

/autofix

Auto-Fix Job Info

This job attempts to auto-fix any linting or formatting issues. If any fixes are made,
those changes will be automatically committed and pushed back to the PR.

Note: This job can only be run by maintainers. On PRs from forks, this command requires
that the PR author has enabled the Allow edits from maintainers option.

PR auto-fix job started... Check job output.

✅ Changes applied successfully.

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🧹 Nitpick comments (5)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)

2226-2241: Implementation looks clean - any thoughts on error handling edge cases?

The implementation correctly creates the two sub-extractors and returns a new KeyValueExtractor instance. The default JsonDecoder is a sensible choice.

Should we add any validation to handle potential mismatch in the number of keys vs values that are extracted? Or is this handled in the KeyValueExtractor implementation itself? wdyt?

unit_tests/sources/declarative/extractors/test_key_value_extractor.py (4)

1-1: Fix Ruff formatting issues

The pipeline is showing a Ruff formatting issue for this file. Running ruff format would fix these formatting issues automatically.

Would you like me to create an automated fix for this, or would you prefer to run the formatter yourself?

🧰 Tools
🪛 GitHub Actions: Linters

[error] 1-1: Ruff formatting check failed. File would be reformatted. Run 'ruff format' to fix code style issues.


97-132: Consider adding tests for edge cases

The test effectively validates the happy path for the KeyValueExtractor, but there are no tests for edge cases like mismatched array lengths or empty arrays.

Would adding tests for these scenarios be valuable? Perhaps something like:

def test_key_value_extractor_edge_cases():
    source = ConcurrentDeclarativeSource(
        source_config=_MANIFEST, config=_CONFIG, catalog=None, state=None
    )
    
    # Configure catalog same as in main test
    actual_catalog = source.discover(logger=source.logger, config=_CONFIG)
    configured_streams = [
        to_configured_stream(stream, primary_key=stream.source_defined_primary_key)
        for stream in actual_catalog.streams
    ]
    configured_catalog = to_configured_catalog(configured_streams)
    
    # Case 1: Mismatched array lengths
    with HttpMocker() as http_mocker:
        http_mocker.get(
            HttpRequest(url="https://api.test.com/items"),
            HttpResponse(
                body=json.dumps(
                    {
                        "dimensions": {
                            "names": ["customer_segment", "traffic_source"],
                            "values": ["enterprise"]  # One fewer value than keys
                        }
                    }
                )
            ),
        )
        
        # Test how the extractor handles this case
        # ...

What do you think?


97-101: Add docstring to test function

Adding a docstring to explain what this test validates would improve readability and documentation.

Maybe something like:

def test_key_value_extractor():
+    """
+    Test that the KeyValueExtractor correctly combines keys and values from separate arrays in the API response
+    into a structured record with key-value pairs.
+    """
    source = ConcurrentDeclarativeSource(

wdyt?


47-93: Minor inconsistency in stream names

The check component references "Rates" but the actual stream is named "test_stream".

This inconsistency doesn't affect the test since we're not testing the check method, but for clarity should the check stream name match the actual stream name? wdyt?

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Cache: Disabled due to data retention organization setting
Knowledge Base: Disabled due to data retention organization setting

📥 Commits

Reviewing files that changed from the base of the PR and between 50f729a and ef6bd7a.

📒 Files selected for processing (5)
  • airbyte_cdk/sources/declarative/declarative_component_schema.yaml (2 hunks)
  • airbyte_cdk/sources/declarative/extractors/__init__.py (2 hunks)
  • airbyte_cdk/sources/declarative/models/declarative_component_schema.py (3 hunks)
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (4 hunks)
  • unit_tests/sources/declarative/extractors/test_key_value_extractor.py (1 hunks)
✅ Files skipped from review due to trivial changes (1)
  • airbyte_cdk/sources/declarative/extractors/__init__.py
🚧 Files skipped from review as they are similar to previous changes (2)
  • airbyte_cdk/sources/declarative/declarative_component_schema.yaml
  • airbyte_cdk/sources/declarative/models/declarative_component_schema.py
🧰 Additional context used
🪛 GitHub Actions: Linters
unit_tests/sources/declarative/extractors/test_key_value_extractor.py

[error] 1-1: Ruff formatting check failed. File would be reformatted. Run 'ruff format' to fix code style issues.

⏰ Context from checks skipped due to timeout of 90000ms (5)
  • GitHub Check: Check: 'source-amplitude' (skip=false)
  • GitHub Check: Check: 'source-shopify' (skip=false)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
🔇 Additional comments (6)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (3)

87-87: Looks good! Adding KeyValueExtractor import

Clean addition to the imports from the extractors module.


308-310: Looks good! Adding KeyValueExtractorModel import

Clean import of the Pydantic model for the new extractor.


648-648: Looks good! Registering the KeyValueExtractor factory method

Properly registered the KeyValueExtractorModel to its factory method in the _init_mappings dictionary.

unit_tests/sources/declarative/extractors/test_key_value_extractor.py (3)

16-29: LGTM! Helper function looks good

This helper function is well-designed with good default parameters and proper return type annotation.


32-35: LGTM! Clean catalog conversion helper

Simple and effective helper function for converting streams to a catalog.


43-94: LGTM! Well-structured manifest for testing KeyValueExtractor

The manifest properly demonstrates how to use the new KeyValueExtractor with two DpathExtractors to extract keys and values from parallel arrays.

@lazebnyi
Contributor Author

Serhii Lazebnyi (lazebnyi) commented May 19, 2025

/autofix

Auto-Fix Job Info

This job attempts to auto-fix any linting or formatting issues. If any fixes are made,
those changes will be automatically committed and pushed back to the PR.

Note: This job can only be run by maintainers. On PRs from forks, this command requires
that the PR author has enabled the Allow edits from maintainers option.

PR auto-fix job started... Check job output.

✅ Changes applied successfully.

Contributor

@brianjlai Brian Lai (brianjlai) left a comment


Some general comments and little things. The main thing I want to understand better before approving is how this is supposed to work when there are multiple records to emit, depending on the shape of values.

Also, just as a general thought, my worry is that parsing over the values array seems pretty unique, so I am not sure whether there is a common pattern shared by multiple connectors or whether this is more suitable as a custom component.

"RecordFilter",
"RecordSelector",
"ResponseToFileExtractor",
"KeyValueExtractor",

Nit: can we alphabetize this?

while True:
    chunk = list(islice(values, len(keys)))
    if not chunk:
        break

It feels a little more idiomatic to avoid a permanent `while True` plus `break`. Can we instead use a has_more_chunks variable to manage when to stop iterating?

has_more_chunks = True
while has_more_chunks:
    ...
    if len(chunk) == 0:  # As far as I understand the code, we stop once there are no values left in the current chunk
        has_more_chunks = False
    else:
        yield dict(zip(keys, chunk))
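Putting the suggestion together, here is a minimal runnable sketch of the chunking behavior under discussion. This is plain Python: the `zip_key_value_chunks` name and the omission of the CDK `RecordExtractor` wiring are simplifications for illustration, not the actual implementation.

```python
from itertools import islice
from typing import Any, Dict, Iterable, Iterator, List


def zip_key_value_chunks(keys: List[str], values: Iterable[Any]) -> Iterator[Dict[str, Any]]:
    """Yield one record per chunk of len(keys) values, pairing keys positionally."""
    values_iter = iter(values)
    has_more_chunks = True
    while has_more_chunks:
        # Pull the next len(keys) values off the iterator.
        chunk = list(islice(values_iter, len(keys)))
        if len(chunk) == 0:
            has_more_chunks = False
        else:
            yield dict(zip(keys, chunk))


records = list(zip_key_value_chunks(["name", "age"], ["Alice", 30, "Thomas", 40]))
# records == [{"name": "Alice", "age": 30}, {"name": "Thomas", "age": 40}]
```

Note the flag is set to False on the empty chunk; setting it to True there would loop forever.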


def extract_records(self, response: requests.Response) -> Iterable[MutableMapping[Any, Any]]:
    keys = list(self.keys_extractor.extract_records(response))
    values = self.values_extractor.extract_records(response)

What is still a little unclear to me is what the expected behavior should be beyond the simplest case of 2 field keys and 2 values in the list.

What happens in the following:

  • values has a length of 3, like ["Alice", 30, "what about this"]? Is this an error state? Right now this would yield a second record of {"name": "what about this"} without the second property.
  • Are we expecting values to always be a single continuous list ["Alice", 30, "Thomas", 40, "Alex", 35]? Or do we expect it to be grouped into multiple lists of lists: [["Alice", 30], ["Thomas", 40], ["Alex", 35]]?
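For reference, dict(zip(...)) silently truncates to the shorter input, which is exactly what produces the partial trailing record in the first bullet. A plain-Python illustration, independent of the CDK classes:

```python
keys = ["name", "age"]
values = ["Alice", 30, "what about this"]

# Chunk values into groups of len(keys); the last chunk comes up short.
chunks = [values[i : i + len(keys)] for i in range(0, len(values), len(keys))]
records = [dict(zip(keys, chunk)) for chunk in chunks]

print(records)
# [{'name': 'Alice', 'age': 30}, {'name': 'what about this'}]
```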

}


def test_key_value_extractor():

Let's add some additional scenarios beyond just the single element case to make sure this is working and demonstrate what the intended behavior should be for multiple elements.
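A sketch of the kind of table-driven scenarios that could be added. `pair_keys_with_values` is a hypothetical stand-in for the extractor's pairing step so the cases run on their own; the real test would go through the manifest as the existing test does.

```python
def pair_keys_with_values(keys, values):
    # Hypothetical stand-in: one record per chunk of len(keys) values.
    return [dict(zip(keys, values[i : i + len(keys)])) for i in range(0, len(values), len(keys))]


cases = [
    # (keys, values, expected_records)
    (["name", "age"], ["Alice", 30], [{"name": "Alice", "age": 30}]),
    (["name", "age"], ["Alice", 30, "Thomas", 40],
     [{"name": "Alice", "age": 30}, {"name": "Thomas", "age": 40}]),
    # Odd-length values: the last record is partial; worth deciding whether this should raise instead.
    (["name", "age"], ["Alice", 30, "dangling"],
     [{"name": "Alice", "age": 30}, {"name": "dangling"}]),
    (["name", "age"], [], []),
]

for keys, values, expected in cases:
    assert pair_keys_with_values(keys, values) == expected
```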

"""

keys_extractor: RecordExtractor
values_extractor: RecordExtractor

For consistency, we should probably also take in the config and parameters objects, since right now we are already passing them in via create_key_value_extractor() in model_to_component_factory.py.
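A sketch of what that could look like, modeled on how other declarative components are usually shaped. The `RecordExtractor` stub and the exact field names here are assumptions for illustration, not the actual CDK signatures.

```python
from dataclasses import InitVar, dataclass, field
from typing import Any, Mapping


class RecordExtractor:  # stub standing in for airbyte_cdk's RecordExtractor
    pass


@dataclass
class KeyValueExtractor(RecordExtractor):
    keys_extractor: RecordExtractor
    values_extractor: RecordExtractor
    config: Mapping[str, Any] = field(default_factory=dict)  # assumed field, per the review comment
    parameters: InitVar[Mapping[str, Any]] = None

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        # Many declarative components stash parameters for later interpolation; mirrored here.
        self._parameters = parameters or {}


extractor = KeyValueExtractor(
    RecordExtractor(), RecordExtractor(), config={"api_key": "..."}, parameters={"name": "users"}
)
```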

@lazebnyi
Contributor Author

A custom component will be used for the migration instead of this implementation.


Labels

enhancement New feature or request
