
feat(DpathExtractor): Add RecordExpander component for nested array extraction#859

Merged
Alfredo Garcia (agarctfi) merged 20 commits into main from devin/1764690419-dpath-extractor-expansion
Apr 6, 2026

Conversation

@devin-ai-integration
Contributor

@devin-ai-integration devin-ai-integration bot commented Dec 2, 2025

feat(cdk): Add RecordExpander component for nested array extraction

Summary

This PR adds a new RecordExpander component to the CDK that enables extracting items from nested array fields and emitting each item as a separate record. This is needed to fix the Stripe invoice_line_items and subscription_items stream issues where the events endpoint returns parent objects with nested arrays, but we need to emit each child item as a separate record.

Key changes:

  • New RecordExpander class in airbyte_cdk/sources/declarative/expanders/
  • New OnNoRecords enum for type-safe on_no_records behavior
  • Integration with DpathExtractor via optional record_expander parameter
  • Support for wildcard paths (e.g., ["sections", "*", "items"])
  • Optional remain_original_record flag to embed parent record context
  • on_no_records parameter: OnNoRecords.skip (default) or OnNoRecords.emit_parent
  • Schema updates and auto-generated models
  • 19 new unit tests covering all expansion scenarios

Example usage:

extractor:
  type: DpathExtractor
  field_path: ["data", "object"]
  record_expander:
    type: RecordExpander
    expand_records_from_field: ["items", "data"]
    on_no_records: skip

To copy specific parent fields into expanded child records, use existing RecordTransformations (e.g., AddFields) downstream of the extractor rather than configuring it on RecordExpander directly.

Updates since last revision

  • Removed ParentFieldMapping and parent_fields_to_copy: Parent field copying is now delegated to existing RecordTransformations (e.g., AddFields), keeping RecordExpander focused on expansion only
  • Added OnNoRecords enum: on_no_records is now a proper enum (OnNoRecords.skip, OnNoRecords.emit_parent) instead of a raw string, for type safety
  • Lint and format auto-fixes applied

Review & Testing Checklist for Human

This is a YELLOW risk PR (medium confidence). Please verify:

  • on_no_records behavior: Verify the emit_parent option correctly emits the parent record when expansion path is missing, empty, or non-array. The logic at the end of expand_record() in record_expander.py handles this.

  • End-to-end testing: This PR only includes unit tests. The real-world behavior needs to be verified with the Stripe connector in the companion PR (fix(source-stripe): Fix invoice_line_items and subscription_items incremental streams (do not merge) airbyte#70294). In particular, confirm that parent field copying via RecordTransformations works as expected now that ParentFieldMapping has been removed.

Recommended test plan:

  1. Run CDK tests: poetry run pytest unit_tests/sources/declarative/extractors/test_dpath_extractor.py -v
  2. Test with Stripe connector (separate PR) to verify end-to-end behavior for both subscription_items and invoice_line_items streams
  3. Try edge cases: empty arrays, missing paths, remain_original_record with on_no_records: emit_parent

Notes

Design

Design Document: RecordExpander Component

PR: #859
Related issues: oncall#8683 (invoice_line_items), oncall#10756 (subscription_items)
Companion PR: airbytehq/airbyte#70294 (Stripe connector)


1. Problem Statement

Several Stripe streams (invoice_line_items, subscription_items) use an events-based incremental sync that fetches parent objects (e.g., an Invoice) from the events endpoint. These parent objects contain nested arrays of child items (e.g., invoice.lines.data[]). The connector needs to emit each child item as a separate record, not the parent object.

Before this PR, the DpathExtractor could only extract records at a single path depth. It had no mechanism to "explode" a nested array within an extracted record into multiple output records. Connector developers had to write custom Python code to handle this pattern, which is common across many APIs.

2. Solution Overview

The PR introduces a new RecordExpander declarative component that plugs into the existing DpathExtractor. When configured, it takes each record extracted by DpathExtractor and expands it by pulling out items from a nested array field, emitting each item as a separate record.

Parent field copying (e.g., copying invoice_id from the parent into each child) is intentionally not part of RecordExpander. Instead, use existing RecordTransformations such as AddFields downstream of the extractor to enrich expanded records with parent context.

Data flow with RecordExpander:

API Response
    |
    v
DpathExtractor.extract_records()
    |  Extracts top-level records via field_path (e.g., "data.object")
    v
[Parent Record 1, Parent Record 2, ...]
    |
    v  (for each parent record)
RecordExpander.expand_record()
    |  Extracts child items from nested array (e.g., "items.data")
    v
[Child Item 1a, Child Item 1b, ..., Child Item 2a, ...]

Data flow without RecordExpander (existing behavior, unchanged):

API Response
    |
    v
DpathExtractor.extract_records()
    |  Extracts records via field_path
    v
[Record 1, Record 2, ...]  (emitted directly)

3. Component Architecture

3.1 New Classes

OnNoRecords (Enum)

Location: airbyte_cdk/sources/declarative/expanders/record_expander.py

Defines the behavior when record expansion produces no records:

  • skip — Emits nothing (default)
  • emit_parent — Emits the original parent record unchanged
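A minimal sketch of how such an enum might be declared (the member names and casing here are assumptions, not the verbatim CDK source):

```python
from enum import Enum

class OnNoRecords(Enum):
    """Behavior when record expansion produces no child records."""
    SKIP = "skip"                # emit nothing (default)
    EMIT_PARENT = "emit_parent"  # emit the original parent record unchanged
```

Because the members wrap the raw manifest strings, a factory can map a YAML value to the enum directly, e.g. `OnNoRecords("emit_parent")`.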

RecordExpander (dataclass)

Location: airbyte_cdk/sources/declarative/expanders/record_expander.py

The core component. Given a parent record, it navigates to a nested array field and yields each element as a separate record.

Attributes:

| Attribute | Type | Default | Description |
|---|---|---|---|
| expand_records_from_field | Sequence[str \| InterpolatedString] | (required) | DPath to the nested array to expand. Supports wildcards (*). |
| remain_original_record | bool | False | If true, embeds the parent record under an "original_record" key in each child. |
| on_no_records | OnNoRecords | OnNoRecords.skip | Behavior when expansion yields nothing. skip emits nothing; emit_parent emits the parent record as-is. |
| config | Config | (required) | Connector configuration for interpolation. |
| parameters | InitVar[Mapping] | (required) | Parameters for InterpolatedString resolution. |

Key method - expand_record(record):

expand_record(record) -> Iterable[MutableMapping]
  1. Evaluates the expand_records_from_field path (resolving interpolation).
  2. If the path contains a wildcard (*), uses dpath.values() to match multiple nested arrays.
  3. Otherwise, uses dpath.get() to retrieve the single nested value.
  4. For each item in the extracted array:
    • If the item is a dict, creates a shallow copy and applies parent context (via _apply_parent_context).
    • Otherwise, yields the item as-is.
  5. If no items were expanded and on_no_records == OnNoRecords.emit_parent, yields the original parent record.
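The steps above can be sketched in plain Python. This is a simplified stand-in: the real implementation navigates with dpath and resolves InterpolatedString path segments, both of which this sketch omits.

```python
from enum import Enum
from typing import Any, Iterable, Mapping

class OnNoRecords(Enum):
    SKIP = "skip"
    EMIT_PARENT = "emit_parent"

def _resolve(record: Any, path: list) -> list:
    """Walk `path` through nested containers; '*' fans out over all values."""
    nodes = [record]
    for segment in path:
        matched = []
        for node in nodes:
            if segment == "*":
                if isinstance(node, Mapping):
                    matched.extend(node.values())
                elif isinstance(node, list):
                    matched.extend(node)
            elif isinstance(node, Mapping) and segment in node:
                matched.append(node[segment])
        nodes = matched
    return nodes

def expand_record(parent: Mapping, path: list, *,
                  remain_original_record: bool = False,
                  on_no_records: OnNoRecords = OnNoRecords.SKIP) -> Iterable[Any]:
    expanded = False
    for match in _resolve(parent, path):
        if not isinstance(match, list):
            continue  # only list-valued matches are expanded
        for item in match:
            expanded = True
            if isinstance(item, dict):
                child = dict(item)  # shallow copy, per the PR's behavior
                if remain_original_record:
                    child["original_record"] = parent
                yield child
            else:
                yield item  # non-dict items pass through as-is
    if not expanded and on_no_records is OnNoRecords.EMIT_PARENT:
        yield parent
```

For example, `list(expand_record({"lines": {"data": [{"id": "a"}]}}, ["lines", "data"]))` yields the single child dict, while a missing path yields nothing under skip and the parent under emit_parent.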

3.2 Modified Classes

DpathExtractor

A new optional attribute record_expander: Optional[RecordExpander] = None is added. The extract_records() method is modified:

  • Without record_expander: Behavior is identical to before (no change).
  • With record_expander: After extracting each record from the response body, each record is passed through record_expander.expand_record(), and all expanded child records are yielded instead.
# Simplified logic in extract_records():
if isinstance(extracted, list):
    if not self.record_expander:
        yield from extracted                          # original behavior
    else:
        for record in extracted:
            yield from self.record_expander.expand_record(record)  # new behavior

ModelToComponentFactory

  • create_dpath_extractor(): Now checks for model.record_expander and instantiates a RecordExpander if present.
  • New create_record_expander() method: Converts the Pydantic model to a RecordExpander instance, mapping on_no_records string values to the OnNoRecords enum.

declarative_component_schema.yaml

New schema definitions added:

  • RecordExpander: Defines expand_records_from_field, remain_original_record, and on_no_records (enum: skip/emit_parent).
  • DpathExtractor updated: New optional record_expander property referencing RecordExpander.

Auto-generated Pydantic models

declarative_component_schema.py updated with generated RecordExpander and OnNoRecords model classes.

manifest_component_transformer.py

Updated to include propagation mapping for DpathExtractor.record_expander, ensuring parameters are correctly propagated during manifest resolution.

4. YAML Manifest Usage

Basic expansion

extractor:
  type: DpathExtractor
  field_path: ["data", "object"]
  record_expander:
    type: RecordExpander
    expand_records_from_field: ["lines", "data"]

This extracts response.data.object as the parent record, then expands parent.lines.data[] into individual records.

With wildcard path and parent embedding

record_expander:
  type: RecordExpander
  expand_records_from_field: ["sections", "*", "items"]
  remain_original_record: true
  on_no_records: emit_parent

Matches sections[0].items[], sections[1].items[], etc. Each child gets an "original_record" key containing the full parent. If no items are found, the parent is emitted unchanged.
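The fan-out behavior of the wildcard path can be illustrated with a small stand-in for dpath.values() (plain Python, not the CDK code; the record shape is illustrative):

```python
def wildcard_values(obj, path):
    """Return every value matched by a dpath-style path where '*' fans out."""
    nodes = [obj]
    for segment in path:
        matched = []
        for node in nodes:
            if segment == "*":
                if isinstance(node, dict):
                    matched.extend(node.values())
                elif isinstance(node, list):
                    matched.extend(node)
            elif isinstance(node, dict) and segment in node:
                matched.append(node[segment])
        nodes = matched
    return nodes

record = {"sections": [{"items": ["a"]}, {"items": ["b", "c"]}, {"title": "no items"}]}
matches = wildcard_values(record, ["sections", "*", "items"])
# Only list-valued matches are expanded; their items are flattened in order.
items = [item for match in matches if isinstance(match, list) for item in match]
```

Here the third section contributes nothing because it has no `items` key, so the flattened output is `["a", "b", "c"]`.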

5. Concrete Example: Stripe invoice_line_items

API response from Stripe events endpoint:

{
  "data": {
    "object": {
      "id": "in_123",
      "customer": "cus_456",
      "lines": {
        "data": [
          {"id": "il_aaa", "amount": 1000, "description": "Widget"},
          {"id": "il_bbb", "amount": 2000, "description": "Gadget"}
        ]
      }
    }
  }
}

Manifest configuration:

extractor:
  type: DpathExtractor
  field_path: ["data", "object"]
  record_expander:
    type: RecordExpander
    expand_records_from_field: ["lines", "data"]

Output records:

{"id": "il_aaa", "amount": 1000, "description": "Widget"}
{"id": "il_bbb", "amount": 2000, "description": "Gadget"}

To enrich these with the parent invoice_id, use a downstream AddFields transformation rather than configuring it on RecordExpander.
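In plain Python terms, that downstream enrichment amounts to something like the following. The function and field names are illustrative only; in a real manifest this is expressed declaratively with AddFields, not hand-written code.

```python
def add_parent_fields(child, parent, field_map):
    """Copy selected parent fields into a child record (AddFields-style)."""
    enriched = dict(child)
    for target_field, source_field in field_map.items():
        enriched[target_field] = parent[source_field]
    return enriched

parent = {"id": "in_123", "customer": "cus_456"}
child = {"id": "il_aaa", "amount": 1000, "description": "Widget"}
record = add_parent_fields(child, parent, {"invoice_id": "id"})
```

The original child is left untouched; only the returned record carries the copied `invoice_id`.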

6. Edge Cases and Behavior Matrix

| Scenario | on_no_records=skip | on_no_records=emit_parent |
|---|---|---|
| Nested array has items | Yields each item | Yields each item |
| Nested array is empty ([]) | Yields nothing | Yields parent record |
| Nested path doesn't exist | Yields nothing | Yields parent record |
| Nested value is not an array | Yields nothing | Yields parent record |
| Wildcard matches multiple arrays | Yields all items from all arrays | Same, or parent if total is 0 |
| expand_records_from_field is empty/None | Yields original record unchanged | Yields original record unchanged |

7. Design Decisions and Trade-offs

Why a separate RecordExpander class instead of inline logic in DpathExtractor?

Separation of concerns. DpathExtractor handles response-level extraction (navigating JSON to find records). RecordExpander handles record-level transformation (flattening nested arrays). This keeps each class focused and testable independently, and allows RecordExpander to potentially be reused with other extractor types in the future.

Why dpath for nested access?

The CDK already uses dpath extensively for path-based JSON navigation. Reusing it maintains consistency and avoids introducing new dependencies. The wildcard (*) support comes for free from dpath.values().

Why on_no_records instead of always emitting parent?

Different use cases need different behavior. For Stripe invoice_line_items, if an invoice event has no line items, we want to skip it entirely (skip). For other APIs, the parent record itself might be the meaningful output when no children exist (emit_parent).

Why no parent_fields_to_copy on RecordExpander?

Parent field copying was originally part of this component but was removed to keep RecordExpander focused solely on expansion. Copying parent fields into child records is already supported by existing RecordTransformations (e.g., AddFields), which is the idiomatic CDK pattern for record enrichment. This avoids duplicating transformation logic and keeps the component composable.

Why an OnNoRecords enum instead of raw strings?

Using an enum provides type safety, IDE autocompletion, and prevents invalid values at construction time rather than at runtime.

Shallow copy behavior

dict(item) creates a shallow copy when yielding expanded child records. This means nested mutable objects are shared between the parent and child records. This is acceptable because:

  1. Records are generally treated as read-only after extraction.
  2. A deep copy would be expensive for large records.
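The sharing described above is easy to demonstrate (illustrative record shapes, not Stripe data):

```python
item = {"id": "il_aaa", "metadata": {"plan": "basic"}}
child = dict(item)  # shallow copy: top-level keys are independent...
child["id"] = "il_changed"
assert item["id"] == "il_aaa"  # original top-level value untouched

# ...but nested mutable objects are shared between the two records
child["metadata"]["plan"] = "premium"
assert item["metadata"]["plan"] == "premium"  # mutation visible through both
```

Using copy.deepcopy instead would break the sharing, at the cost of copying the whole subtree for every expanded child.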

8. Files Changed

| File | Change Type | Description |
|---|---|---|
| airbyte_cdk/sources/declarative/expanders/record_expander.py | New | Core RecordExpander class and OnNoRecords enum |
| airbyte_cdk/sources/declarative/expanders/__init__.py | New | Module exports |
| airbyte_cdk/sources/declarative/extractors/dpath_extractor.py | Modified | Added optional record_expander parameter and expansion logic |
| airbyte_cdk/sources/declarative/declarative_component_schema.yaml | Modified | Schema definitions for new components |
| airbyte_cdk/sources/declarative/models/declarative_component_schema.py | Modified | Auto-generated Pydantic models |
| airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py | Modified | Factory method for RecordExpander with OnNoRecords enum mapping |
| airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py | Modified | Parameter propagation mapping |
| unit_tests/sources/declarative/extractors/test_dpath_extractor.py | Modified | 19 new unit tests |

9. Testing

19 unit tests covering:

  • Basic expansion from nested arrays
  • Wildcard path matching
  • remain_original_record flag
  • on_no_records with both OnNoRecords.skip and OnNoRecords.emit_parent
  • Missing/empty/non-array nested paths
  • Non-dict items in nested arrays
  • Multiple parent records
  • Integration with DpathExtractor.extract_records()

End-to-end validation with the Stripe connector is covered by the companion PR (airbyte/airbyte#70294).

Link to Devin session: https://app.devin.ai/sessions/08169cc37f9342acb410071ab8306f05
Requested by: Alfredo Garcia (@agarctfi)

…thExtractor

- Add optional expand_records_from_field parameter to extract items from nested arrays
- Add optional remain_original_record parameter to preserve parent record context
- Implement _expand_record method to handle array expansion logic
- Add comprehensive unit tests covering all edge cases
- Maintain backward compatibility with existing functionality

Co-Authored-By: unknown <>
@devin-ai-integration
Contributor Author

Original prompt from API User
Comment from @DanyloGL: /ai-triage

IMPORTANT: The user will expect a response posted back to the PR. You should post exactly one comment back to the respective issue PR. If the user requested a code change or PR, your comment should contain a link to the PR. Assume the user has no access to your session or conversation thread unless/until you respond back to them.

Issue #8683 by @jnr0790: Python L3: Stripe - Missing data in `invoice_line_items` stream

Issue URL: https://github.com/airbytehq/oncall/issues/8683

Please use playbook macro: !issue_triage

PLAYBOOK_md:
# `/ai-triage` Slash Command Playbook

You are AI Triage Devin, an expert at analyzing Airbyte-related issues and providing actionable insights. You are responding to a GitHub slash command request. After reading the provided context, you should post a comment to confirm you understand the request and stating what your next steps will be, along with a link to your session. Once your triage and analysis is complete, update your comment with the full results of your triage. Collapse all of your comments under expandable sections.

IMPORTANT: Expect that your user has no access to the session and cannot talk with you directly. Do not wait for feedback or confirmation on any action.

## Context

You are analyzing the issue provided to you above. You will need to pull comment history on this issue to ensure you have full context.

## Your Task: Static Analysis and Triage

1. **Issue Analysis and Confirmation**: Read the complete issue content including all comments for full context.
   - **Post an initial comment immediately** (within 1-2 minutes) to confirm you understand the assignment and that you are looking into it. Include your session URL.
   - If you are missing any critical information or context (e.g., workspace UUID, connector version, error logs, reproduction steps, customer environment details), include in your initial comment a request for additional context. (Do not block waiting for a... (9078 chars truncated...)

@devin-ai-integration
Contributor Author

🤖 Devin AI Engineer

I'll be helping with this pull request! Here's what you should know:

✅ I will automatically:

  • Address comments on this PR. Add '(aside)' to your comment to have me ignore it.
  • Look at CI failures and help fix them

Note: I can only respond to comments from users who have write access to this repository.

⚙️ Control Options:

  • Disable automatic comment and CI monitoring

@github-actions

github-actions bot commented Dec 2, 2025

👋 Greetings, Airbyte Team Member!

Here are some helpful tips and reminders for your convenience.

Testing This CDK Version

You can test this version of the CDK using the following:

# Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@devin/1764690419-dpath-extractor-expansion#egg=airbyte-python-cdk[dev]' --help

# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch devin/1764690419-dpath-extractor-expansion

Helpful Resources

PR Slash Commands

Airbyte Maintainers can execute the following slash commands on your PR:

  • /autofix - Fixes most formatting and linting issues
  • /poetry-lock - Updates poetry.lock file
  • /test - Runs connector tests with the updated CDK
  • /prerelease - Triggers a prerelease publish with default arguments
  • /poe build - Regenerate git-committed build artifacts, such as the pydantic models which are generated from the manifest JSON schema in YAML.
  • /poe <command> - Runs any poe command in the CDK environment


@github-actions github-actions bot added the enhancement New feature or request label Dec 2, 2025
@github-actions

github-actions bot commented Dec 2, 2025

PyTest Results (Fast)

3,997 tests (+184) in 1 suite, 1 file: 3,986 ✅ passed (+185), 11 💤 skipped (−1), 0 ❌ failed. Duration: 7m 46s (+1m 15s).

Results for commit 939d374. ± Comparison against base commit 80b7668.

♻️ This comment has been updated with latest results.

@github-actions

github-actions bot commented Dec 2, 2025

PyTest Results (Full)

4,000 tests (+184) in 1 suite, 1 file: 3,988 ✅ passed (+184), 12 💤 skipped (±0), 0 ❌ failed. Duration: 11m 23s (+27s).

Results for commit 939d374. ± Comparison against base commit 80b7668.

♻️ This comment has been updated with latest results.

- Create new RecordExpander class in airbyte_cdk/sources/declarative/expanders/
- Move expand_records_from_field and remain_original_record parameters from DpathExtractor to RecordExpander
- Update DpathExtractor to accept optional record_expander attribute
- Register RecordExpander in manifest component transformer
- Update unit tests to use new RecordExpander class structure
- All 24 tests passing, MyPy and Ruff checks passing

This refactoring improves separation of concerns by isolating record expansion logic into a dedicated component.

Co-Authored-By: unknown <>
- Add RecordExpander definition to declarative_component_schema.yaml
- Add record_expander property to DpathExtractor schema
- Update create_dpath_extractor in model_to_component_factory.py to handle record_expander
- Auto-generate models from schema using poetry run poe build
- All 24 tests passing

This completes the schema registration for RecordExpander component, enabling
YAML manifests to properly instantiate RecordExpander when used with DpathExtractor.

Co-Authored-By: unknown <>
@devin-ai-integration devin-ai-integration bot changed the title feat: Add expand_records_from_field and remain_original_record to DpathExtractor (do not merge) feat: Add RecordExpander component for nested array extraction Dec 2, 2025
Apply cleaner logic using 'yield from' consistently:
- When extracted is a list without record_expander, use 'yield from extracted'
- Check 'if not self.record_expander' instead of nested if/else
- Remove unnecessary 'yield from []' for empty case

All 24 tests passing. Suggested by @DanyloGL.

Co-Authored-By: unknown <>
Changes:
- Add back 'else: yield from []' in DpathExtractor for explicit empty case
- Update RecordExpander to return nothing when expand_records_from_field path doesn't exist or isn't a list
- Update unit tests to expect no records instead of original record when expansion fails

This makes RecordExpander stricter: it only emits records when successfully expanding a list.
For Stripe invoice_line_items, this ensures we only emit line items, not invoice objects.

All 24 tests passing. Requested by @DanyloGL.

Co-Authored-By: unknown <>
Changes:
1. Remove TypeError from exception handler (only catch KeyError per dpath.get docs)
2. Add wildcard (*) support to RecordExpander for matching multiple arrays
3. Update docstring and schema to document wildcard support
4. Add 5 new unit tests for wildcard expansion scenarios
5. Regenerate models from updated schema

When wildcards are used, RecordExpander:
- Uses dpath.values() to find all matches
- Filters for list-valued matches only
- Expands items from all matched lists
- Returns nothing if no list matches found

All 29 tests passing. Requested by @DanyloGL.

Co-Authored-By: unknown <>
MyPy was complaining that dpath.values() and dpath.get() return 'object' type.
Added cast(Iterable[Any], ...) for dpath.values() and cast(Any, ...) for dpath.get()
to satisfy MyPy type checking while maintaining runtime behavior.

All 29 tests passing. MyPy check now passes.

Co-Authored-By: unknown <>
Unified the wildcard and non-wildcard branches by collecting all arrays
to process into a single list, then using one common loop for expansion.
This eliminates the duplicated item iteration and record expansion logic.

All 29 tests passing. MyPy check passes.

Co-Authored-By: unknown <>
@devin-ai-integration devin-ai-integration bot changed the title feat: Add RecordExpander component for nested array extraction feat(cdk): Add RecordExpander component for nested array extraction Dec 2, 2025
Changes per Daryna's feedback:
1. Removed isinstance(m, list) filter - now checking inside loop
2. Renamed 'matches' to 'extracted'
3. Removed type casts - using 'extracted: Any' instead
4. Renamed 'nested_array' to 'record' (loop var), using 'parent_record' for original
5. Removed 'if not nested_array:' check (redundant with for loop)

All 29 tests passing. MyPy check passes.

Co-Authored-By: unknown <>
- Add on_no_records parameter with 'skip' (default) and 'emit_parent' options
- Add parent_fields_to_copy parameter to copy specific parent fields to child records
- Add ParentFieldMapping class to define source/target field mappings
- Update schema YAML with new properties and ParentFieldMapping definition
- Regenerate models from schema
- Add comprehensive unit tests for new features

Co-Authored-By: unknown <>
@sophiecuiy sophiecuiy marked this pull request as ready for review February 4, 2026 20:51
Copilot AI review requested due to automatic review settings February 4, 2026 20:51
Contributor

Copilot AI left a comment


Pull request overview

This PR introduces a new RecordExpander component to the CDK that enables extracting items from nested array fields and emitting each item as a separate record. This addresses the Stripe connector's need to handle invoice_line_items and subscription_items streams where parent objects contain nested arrays that need to be flattened.

Changes:

  • Added RecordExpander class with support for wildcard paths, optional parent record embedding, configurable empty-array behavior, and selective parent field copying
  • Integrated RecordExpander with DpathExtractor via optional record_expander parameter
  • Updated schema definitions and component factory to support the new component
  • Added 40 comprehensive unit tests covering expansion scenarios, wildcard paths, and edge cases

Reviewed changes

Copilot reviewed 8 out of 8 changed files in this pull request and generated 4 comments.

| File | Description |
|---|---|
| airbyte_cdk/sources/declarative/expanders/record_expander.py | New core implementation of RecordExpander and ParentFieldMapping classes |
| airbyte_cdk/sources/declarative/expanders/__init__.py | Module initialization exporting new expander components |
| airbyte_cdk/sources/declarative/extractors/dpath_extractor.py | Integration of RecordExpander into DpathExtractor with optional expansion logic |
| airbyte_cdk/sources/declarative/models/declarative_component_schema.py | Auto-generated Pydantic models for new components and parameters |
| airbyte_cdk/sources/declarative/declarative_component_schema.yaml | Schema definitions for RecordExpander and ParentFieldMapping |
| airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py | Factory method updates to instantiate RecordExpander from models |
| airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py | Component transformer mapping for DpathExtractor.record_expander |
| unit_tests/sources/declarative/extractors/test_dpath_extractor.py | 40 new unit tests for expansion, on_no_records, parent_fields_to_copy, and combined features |


@sophiecuiy
Contributor

sophiecuiy commented Feb 5, 2026

/prerelease

Prerelease Job Info

This job triggers the publish workflow with default arguments to create a prerelease.

Prerelease job started... Check job output.

✅ Prerelease workflow triggered successfully.

View the publish workflow run: https://github.com/airbytehq/airbyte-python-cdk/actions/runs/21726348626

@devin-ai-integration
Contributor Author

❌ Cannot revive Devin session - the session is too old. Please start a new session instead.

@agarctfi
Contributor

Alfredo Garcia (agarctfi) commented Feb 6, 2026

/autofix

Auto-Fix Job Info

This job attempts to auto-fix any linting or formatting issues. If any fixes are made,
those changes will be automatically committed and pushed back to the PR.

Note: This job can only be run by maintainers. On PRs from forks, this command requires
that the PR author has enabled the Allow edits from maintainers option.

PR auto-fix job started... Check job output.

✅ Changes applied successfully.

@agarctfi
Contributor

Alfredo Garcia (agarctfi) commented Feb 6, 2026

/autofix

Auto-Fix Job Info

This job attempts to auto-fix any linting or formating issues. If any fixes are made,
those changes will be automatically committed and pushed back to the PR.

Note: This job can only be run by maintainers. On PRs from forks, this command requires
that the PR author has enabled the Allow edits from maintainers option.

PR auto-fix job started... Check job output.

🟦 Job completed successfully (no changes).

…ecordExpander

Co-Authored-By: alfredo.garcia@airbyte.io <freddy.garcia7.fg@gmail.com>
@agarctfi
Contributor

Alfredo Garcia (agarctfi) commented Mar 27, 2026

/autofix


PR auto-fix job started... Check job output.

✅ Changes applied successfully.

@pnilan
Contributor

Aside: Alfredo Garcia (@agarctfi), here are some findings from reviewing this PR:

Issues worth addressing

  1. Shallow copy + remain_original_record mutation risk — _apply_parent_context sets child_record["original_record"] = parent_record, which is a direct reference to the mutable parent dict. Since the stated pattern is to use downstream AddFields transformations (which mutate records in-place), modifications to one child's original_record would affect all siblings. Consider using copy.deepcopy(parent_record) here, or at minimum document the caveat.

  2. Non-dict record passed to expand_record will crash — In dpath_extractor.py, the elif extracted: branch now routes single non-dict values through record_expander.expand_record(). Inside expand_record, dpath.get(parent_record, expand_path) will raise on a non-dict input. This path is unlikely with typical usage but is a real bug if hit.

Nits / suggestions

  1. Duplicated wildcard/non-wildcard code paths — The wildcard and non-wildcard branches in expand_record() are nearly identical. dpath.values() on a non-wildcard path returns a single-element list, so you could unify them.

  2. expanders/ package placement — This component is tightly coupled to DpathExtractor and has no standalone use case. It would be more natural under extractors/ rather than a new top-level expanders/ package for a single class.

  3. Non-dict items + remain_original_record=True — When the nested array contains non-dict items (e.g., primitives), they're yielded without parent context even when remain_original_record=True. This combo is untested and the behavior may be surprising to users.

  4. Redundant hasattr check in factory — create_dpath_extractor uses hasattr(model, "record_expander"), but since the Pydantic model defines record_expander with a default of None, hasattr always returns True. Other factory methods just check if model.<field>: directly.

  5. _expand_path typed as list | None but never None — In __post_init__, _expand_path is always set to a list (possibly empty). The Optional in the type annotation is misleading.

  6. No interpolation test coverage — The schema declares interpolation_context: [config] for expand_records_from_field, but no test exercises interpolated path segments (e.g., {{ config['field_name'] }}).

  7. InterpolatedString in dataclass type signature but schema only allows str — The dataclass declares Sequence[str | InterpolatedString] for expand_records_from_field, but the YAML schema and Pydantic model only accept List[str]. The InterpolatedString.create() wrapping makes it work, but the type signature is misleading.

Contributor

@pnilan Patrick Nilan (pnilan) left a comment


See previous comment

@agarctfi Alfredo Garcia (agarctfi) changed the title feat(cdk): Add RecordExpander component for nested array extraction feat(DpathExtractor): Add RecordExpander component for nested array extraction Mar 31, 2026
@agarctfi
Contributor

Patrick Nilan (@pnilan) Thanks! Can you check the latest commit?

@agarctfi
Contributor

/ai-review

@agarctfi Alfredo Garcia (agarctfi) merged commit 5b14f41 into main Apr 6, 2026
28 of 29 checks passed
@agarctfi Alfredo Garcia (agarctfi) deleted the devin/1764690419-dpath-extractor-expansion branch April 6, 2026 14:04

Labels

enhancement New feature or request
