Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
d077d99
feat: Add expand_records_from_field and remain_original_record to Dpa…
devin-ai-integration[bot] Dec 2, 2025
81d4630
style: Fix ruff formatting in dpath_extractor.py
devin-ai-integration[bot] Dec 2, 2025
c7ac5f2
style: Fix ruff formatting in test_dpath_extractor.py
devin-ai-integration[bot] Dec 2, 2025
24c8ac9
fix: Add type annotation for _expand_path to fix MyPy error
devin-ai-integration[bot] Dec 2, 2025
91690f4
refactor: Extract record expansion logic into RecordExpander class
devin-ai-integration[bot] Dec 2, 2025
c035138
feat: Add RecordExpander to declarative component schema
devin-ai-integration[bot] Dec 2, 2025
b04e174
refactor: Clean up DpathExtractor extract_records logic
devin-ai-integration[bot] Dec 2, 2025
c8a2643
fix: Update RecordExpander to return nothing when path doesn't exist
devin-ai-integration[bot] Dec 2, 2025
c6a9d05
feat: Add wildcard support to RecordExpander and remove TypeError
devin-ai-integration[bot] Dec 2, 2025
c6448e5
fix: Add type casts for dpath.values and dpath.get to fix MyPy errors
devin-ai-integration[bot] Dec 2, 2025
6afe474
refactor: Eliminate code duplication in expand_record method
devin-ai-integration[bot] Dec 2, 2025
5b0c0d5
refactor: Simplify expand_record per code review feedback
devin-ai-integration[bot] Dec 2, 2025
2ca9ad7
feat: Add on_no_records and parent_fields_to_copy to RecordExpander
devin-ai-integration[bot] Feb 3, 2026
1aadd2f
Add missing import
agarctfi Feb 6, 2026
1f31837
Auto-fix lint and format issues
Feb 6, 2026
f6cf99c
fix: Use Sequence instead of list for covariant type annotations in R…
devin-ai-integration[bot] Feb 6, 2026
ead1747
refactor(RecordExpander): Remove ParentFieldMapping to delegate to Re…
agarctfi Mar 27, 2026
967dffd
refactor(RecordExpander): Add enum for on_no_records
agarctfi Mar 27, 2026
5e3b13b
Auto-fix lint and format issues
Mar 27, 2026
939d374
fix: harden record expander and clean up stripe subscription_items ex…
agarctfi Mar 31, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1912,6 +1912,10 @@ definitions:
- ["data", "records"]
- ["data", "{{ parameters.name }}"]
- ["data", "*", "record"]
record_expander:
title: Record Expander
description: Optional component to expand records by extracting items from nested array fields.
"$ref": "#/definitions/RecordExpander"
$parameters:
type: object
additionalProperties: true
Expand All @@ -1928,6 +1932,46 @@ definitions:
$parameters:
type: object
additionalProperties: true
# Schema for the RecordExpander component: it turns each record into one record per
# item found in a nested array field of that record.
RecordExpander:
title: Record Expander
description: Expands records by extracting items from a nested array field. When configured, this component extracts items from a specified nested array path within each record and emits each item as a separate record. Optionally, the original parent record can be embedded in each expanded item for context preservation. Supports wildcards (*) for matching multiple arrays.
type: object
# Only these two keys are mandatory; the other properties below are optional.
required:
- type
- expand_records_from_field
properties:
type:
type: string
enum: [RecordExpander]
# The path is a list of string segments; a "*" segment acts as a wildcard (see the last example).
expand_records_from_field:
title: Expand Records From Field
description: Path to a nested array field within each record. Items from this array will be extracted and emitted as separate records. Supports wildcards (*) for matching multiple arrays.
type: array
items:
type: string
interpolation_context:
- config
examples:
- ["lines", "data"]
- ["items"]
- ["nested", "array"]
- ["sections", "*", "items"]
# Opt-in flag: when true, the parent record is attached to every expanded record under "original_record".
remain_original_record:
title: Remain Original Record
description: If true, each expanded record will include the original parent record in an "original_record" field. Defaults to false.
type: boolean
default: false
# The allowed values match the members of the OnNoRecords enum in expanders/record_expander.py.
on_no_records:
title: On No Records
description: Behavior when the expansion path is missing, not a list, or an empty list. "skip" (default) emits nothing. "emit_parent" emits the original parent record unchanged.
type: string
enum:
- skip
- emit_parent
default: skip
$parameters:
type: object
additionalProperties: true
ExponentialBackoffStrategy:
title: Exponential Backoff
description: Backoff strategy with an exponential backoff interval. The interval is defined as factor * 2^attempt_count.
Expand Down
7 changes: 7 additions & 0 deletions airbyte_cdk/sources/declarative/expanders/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from airbyte_cdk.sources.declarative.expanders.record_expander import OnNoRecords, RecordExpander

__all__ = ["OnNoRecords", "RecordExpander"]
128 changes: 128 additions & 0 deletions airbyte_cdk/sources/declarative/expanders/record_expander.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import copy
from dataclasses import InitVar, dataclass
from enum import Enum
from typing import Any, Iterable, Mapping, MutableMapping, Sequence

import dpath

from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.types import Config


class OnNoRecords(Enum):
    """Policy applied by the record expander when an expansion emits no records.

    The member values are the literal strings accepted for the ``on_no_records``
    option of the ``RecordExpander`` component.
    """

    # Emit nothing for the parent record.
    skip = "skip"
    # Emit the parent record itself, unchanged.
    emit_parent = "emit_parent"


@dataclass
class RecordExpander:
    """Split one record into several by emitting the items of a nested array field.

    For every record handed to `expand_record`, the lists found at
    `expand_records_from_field` are located and each of their items is emitted as
    its own record. The path may contain `*` wildcards; in that case the items of
    every matched list are emitted. With `remain_original_record: true` a deep copy
    of the parent record is attached to each emitted item under `original_record`,
    so downstream transformations can still read parent fields.

    Example manifests:
    ```
      record_expander:
        type: RecordExpander
        expand_records_from_field:
          - "lines"
          - "data"
        remain_original_record: true
    ```

    ```
      record_expander:
        type: RecordExpander
        expand_records_from_field:
          - "sections"
          - "*"
          - "items"
        on_no_records: emit_parent
    ```

    Attributes:
        expand_records_from_field: Path segments leading to the nested array inside
            each record. Supports wildcards (*).
        config: The user-provided configuration as specified by the source's spec;
            each path segment is evaluated against it.
        parameters: Init-only mapping passed to the interpolated path segments.
        remain_original_record: When True, every emitted record carries a deep copy of
            its parent under the "original_record" key. Defaults to False.
        on_no_records: What to do when nothing was emitted for a record. "skip"
            (default) emits nothing; "emit_parent" emits the parent record unchanged.
    """

    expand_records_from_field: Sequence[str]
    config: Config
    parameters: InitVar[Mapping[str, Any]]
    remain_original_record: bool = False
    on_no_records: OnNoRecords = OnNoRecords.skip

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        # Wrap every path segment once, so each record only has to evaluate it.
        self._expand_path: list[InterpolatedString] = [
            InterpolatedString.create(segment, parameters=parameters)
            for segment in self.expand_records_from_field
        ]

    def expand_record(self, record: MutableMapping[Any, Any]) -> Iterable[MutableMapping[Any, Any]]:
        """Yield the records obtained by expanding `record` along the configured path.

        A non-mapping input, or an expander configured with an empty path, is yielded
        back untouched. Otherwise every item of every list matched by the path is
        yielded: dict items as shallow copies (with parent context applied), other
        items either as-is or wrapped as `{"value": item, "original_record": ...}` when
        `remain_original_record` is set. If nothing was yielded and `on_no_records` is
        `emit_parent`, the parent record itself is yielded.
        """
        # Expansion needs a mapping and a non-empty path; otherwise pass the input through.
        if not isinstance(record, Mapping) or not self._expand_path:
            yield record
            return

        resolved_path = [segment.eval(self.config) for segment in self._expand_path]

        try:
            matches = dpath.values(record, resolved_path)
        except KeyError:
            matches = []

        emitted = False
        for match in matches:
            # Only list values can be expanded; any other match is ignored.
            if not isinstance(match, list):
                continue
            for element in match:
                if isinstance(element, dict):
                    child = dict(element)
                    self._apply_parent_context(record, child)
                    yield child
                elif self.remain_original_record:
                    yield {
                        "value": element,
                        "original_record": copy.deepcopy(record),
                    }
                else:
                    yield element
                emitted = True

        if emitted:
            return
        if self.on_no_records == OnNoRecords.emit_parent:
            yield record

    def _apply_parent_context(
        self, parent_record: Mapping[str, Any], child_record: MutableMapping[str, Any]
    ) -> None:
        """Attach a deep copy of the parent to `child_record` when configured to do so."""
        if not self.remain_original_record:
            return
        child_record["original_record"] = copy.deepcopy(parent_record)
35 changes: 32 additions & 3 deletions airbyte_cdk/sources/declarative/extractors/dpath_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@
#

from dataclasses import InitVar, dataclass, field
from typing import Any, Iterable, List, Mapping, MutableMapping, Union
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union

import dpath
import requests

from airbyte_cdk.sources.declarative.decoders import Decoder, JsonDecoder
from airbyte_cdk.sources.declarative.expanders.record_expander import RecordExpander
from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.types import Config
Expand All @@ -24,6 +25,11 @@ class DpathExtractor(RecordExtractor):
If the field path points to an empty object, an empty array is returned.
If the field path points to a non-existing path, an empty array is returned.

Optionally, records can be expanded by providing a RecordExpander component.
When record_expander is configured, each extracted record is passed through the
expander which extracts items from nested array fields and emits each item as a
separate record.

Examples of instantiating this transform:
```
extractor:
Expand All @@ -47,16 +53,32 @@ class DpathExtractor(RecordExtractor):
field_path: []
```

```
extractor:
type: DpathExtractor
field_path:
- "data"
- "object"
record_expander:
type: RecordExpander
expand_records_from_field:
- "lines"
- "data"
remain_original_record: true
```

Attributes:
field_path (Union[InterpolatedString, str]): Path to the field that should be extracted
config (Config): The user-provided configuration as specified by the source's spec
decoder (Decoder): The decoder responsible for transforming the response into a Mapping
record_expander (Optional[RecordExpander]): Optional component to expand records by extracting items from nested array fields
"""

field_path: List[Union[InterpolatedString, str]]
config: Config
parameters: InitVar[Mapping[str, Any]]
decoder: Decoder = field(default_factory=lambda: JsonDecoder(parameters={}))
record_expander: Optional[RecordExpander] = None

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self._field_path = [
Expand All @@ -79,8 +101,15 @@ def extract_records(self, response: requests.Response) -> Iterable[MutableMappin
else:
extracted = dpath.get(body, path, default=[]) # type: ignore # extracted will be a MutableMapping, given input data structure
if isinstance(extracted, list):
yield from extracted
if not self.record_expander:
yield from extracted
else:
for record in extracted:
yield from self.record_expander.expand_record(record)
elif extracted:
yield extracted
if self.record_expander:
yield from self.record_expander.expand_record(extracted)
else:
yield extracted
else:
yield from []
Loading
Loading