Skip to content

Commit 1878d1b

Browse files
feat(cdk): Add RecordExpander component for nested array extraction
Co-Authored-By: sophie.cui@airbyte.io <sophie.cui@airbyte.io>
1 parent 15542de commit 1878d1b

8 files changed

Lines changed: 1005 additions & 98 deletions

File tree

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1927,6 +1927,10 @@ definitions:
19271927
- ["data", "records"]
19281928
- ["data", "{{ parameters.name }}"]
19291929
- ["data", "*", "record"]
1930+
record_expander:
1931+
title: Record Expander
1932+
description: Optional component to expand records by extracting items from nested array fields.
1933+
"$ref": "#/definitions/RecordExpander"
19301934
$parameters:
19311935
type: object
19321936
additionalProperties: true
@@ -1943,6 +1947,86 @@ definitions:
19431947
$parameters:
19441948
type: object
19451949
additionalProperties: true
1950+
RecordExpander:
1951+
title: Record Expander
1952+
description: Expands records by extracting items from a nested array field. When configured, this component extracts items from a specified nested array path within each record and emits each item as a separate record. Optionally, the original parent record can be embedded in each expanded item for context preservation. Supports wildcards (*) for matching multiple arrays.
1953+
type: object
1954+
required:
1955+
- type
1956+
- expand_records_from_field
1957+
properties:
1958+
type:
1959+
type: string
1960+
enum: [RecordExpander]
1961+
expand_records_from_field:
1962+
title: Expand Records From Field
1963+
description: Path to a nested array field within each record. Items from this array will be extracted and emitted as separate records. Supports wildcards (*) for matching multiple arrays.
1964+
type: array
1965+
items:
1966+
type: string
1967+
interpolation_context:
1968+
- config
1969+
examples:
1970+
- ["lines", "data"]
1971+
- ["items"]
1972+
- ["nested", "array"]
1973+
- ["sections", "*", "items"]
1974+
remain_original_record:
1975+
title: Remain Original Record
1976+
description: If true, each expanded record will include the original parent record in an "original_record" field. Defaults to false.
1977+
type: boolean
1978+
default: false
1979+
on_no_records:
1980+
title: On No Records
1981+
description: Behavior when the expansion path is missing, not a list, or an empty list. "skip" (default) emits nothing. "emit_parent" emits the original parent record unchanged.
1982+
type: string
1983+
enum:
1984+
- skip
1985+
- emit_parent
1986+
default: skip
1987+
parent_fields_to_copy:
1988+
title: Parent Fields To Copy
1989+
description: List of parent field mappings to copy onto each expanded child record. Each mapping specifies a source path in the parent record and a target field name in the child record.
1990+
type: array
1991+
items:
1992+
"$ref": "#/definitions/ParentFieldMapping"
1993+
$parameters:
1994+
type: object
1995+
additionalProperties: true
1996+
ParentFieldMapping:
1997+
title: Parent Field Mapping
1998+
description: Defines a mapping from a parent record field to a child record field.
1999+
type: object
2000+
required:
2001+
- type
2002+
- source_field_path
2003+
- target_field
2004+
properties:
2005+
type:
2006+
type: string
2007+
enum: [ParentFieldMapping]
2008+
source_field_path:
2009+
title: Source Field Path
2010+
description: Path to the field in the parent record to copy.
2011+
type: array
2012+
items:
2013+
type: string
2014+
interpolation_context:
2015+
- config
2016+
examples:
2017+
- ["id"]
2018+
- ["created"]
2019+
- ["metadata", "timestamp"]
2020+
target_field:
2021+
title: Target Field
2022+
description: Name of the field in the child record where the value will be copied.
2023+
type: string
2024+
examples:
2025+
- "parent_id"
2026+
- "subscription_updated"
2027+
$parameters:
2028+
type: object
2029+
additionalProperties: true
19462030
ExponentialBackoffStrategy:
19472031
title: Exponential Backoff
19482032
description: Backoff strategy with an exponential backoff interval. The interval is defined as factor * 2^attempt_count.
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
#
2+
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
from airbyte_cdk.sources.declarative.expanders.record_expander import (
6+
ParentFieldMapping,
7+
RecordExpander,
8+
)
9+
10+
__all__ = ["ParentFieldMapping", "RecordExpander"]
Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
#
2+
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
from dataclasses import InitVar, dataclass, field
6+
from typing import Any, Iterable, Mapping, MutableMapping
7+
8+
import dpath
9+
10+
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
11+
from airbyte_cdk.sources.types import Config
12+
13+
14+
@dataclass
class ParentFieldMapping:
    """Describe how one field of a parent record is copied onto a child record.

    Attributes:
        source_field_path: Path segments locating the value inside the parent record.
            Each segment may be a plain string or an interpolated string.
        target_field: Key under which the value is stored on the child record.
        config: The user-provided configuration, used to evaluate interpolated segments.
    """

    source_field_path: list[str | InterpolatedString]
    target_field: str
    config: Config
    parameters: InitVar[Mapping[str, Any]]

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        # Wrap every path segment once so that it can be evaluated against the
        # config each time a field is copied.
        interpolated_segments = []
        for segment in self.source_field_path:
            interpolated_segments.append(InterpolatedString.create(segment, parameters=parameters))
        self._source_path = interpolated_segments

    def copy_field(
        self, parent_record: Mapping[str, Any], child_record: MutableMapping[str, Any]
    ) -> None:
        """Copy the mapped value from ``parent_record`` into ``child_record``.

        The child record is left untouched when the lookup raises ``KeyError``
        (i.e. the source path is not found in the parent record).
        """
        resolved_path = [segment.eval(self.config) for segment in self._source_path]
        try:
            child_record[self.target_field] = dpath.get(dict(parent_record), resolved_path)
        except KeyError:
            # Missing source field: nothing to copy for this mapping.
            return
39+
40+
41+
@dataclass
class RecordExpander:
    """Expands records by extracting items from a nested array field.

    When configured, this component extracts items from a specified nested array path
    within each record and emits each item as a separate record. Optionally, the original
    parent record can be embedded in each expanded item for context preservation.

    The expand_records_from_field path supports wildcards (*) for matching multiple arrays.
    When wildcards are used, items from all matched arrays are extracted and emitted.

    Parent context (``original_record`` and ``parent_fields_to_copy``) is only applied to
    items that are dicts; each dict item is shallow-copied before being enriched. Items of
    any other type are emitted as-is.

    Examples of instantiating this component:
    ```
      record_expander:
        type: RecordExpander
        expand_records_from_field:
          - "lines"
          - "data"
        remain_original_record: true
    ```

    ```
      record_expander:
        type: RecordExpander
        expand_records_from_field:
          - "sections"
          - "*"
          - "items"
        on_no_records: emit_parent
        parent_fields_to_copy:
          - type: ParentFieldMapping
            source_field_path: ["id"]
            target_field: "parent_id"
    ```

    Attributes:
        expand_records_from_field: Path to a nested array field within each record.
            Items from this array will be extracted and emitted as separate records.
            Supports wildcards (*).
        remain_original_record: If True, each expanded record will include the original
            parent record in an "original_record" field. Defaults to False.
        on_no_records: Behavior when expansion produces no records. "skip" (default)
            emits nothing. "emit_parent" emits the original parent record unchanged.
        parent_fields_to_copy: List of field mappings to copy from parent to each
            expanded child record.
        config: The user-provided configuration as specified by the source's spec.
    """

    expand_records_from_field: list[str | InterpolatedString]
    config: Config
    parameters: InitVar[Mapping[str, Any]]
    remain_original_record: bool = False
    on_no_records: str = "skip"
    parent_fields_to_copy: list[ParentFieldMapping] = field(default_factory=list)

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        # Always a list (possibly empty); an empty path disables expansion.
        self._expand_path: list[InterpolatedString] = [
            InterpolatedString.create(path, parameters=parameters)
            for path in self.expand_records_from_field
        ]

    def expand_record(self, record: MutableMapping[Any, Any]) -> Iterable[MutableMapping[Any, Any]]:
        """Expand a record by extracting items from a nested array field.

        Args:
            record: The parent record to expand.

        Yields:
            One entry per item found in the list(s) located at the expansion path.
            If the expansion path is empty, the record itself is yielded unchanged.
            If nothing was expanded and ``on_no_records`` is "emit_parent", the
            parent record is yielded instead.
        """
        if not self._expand_path:
            yield record
            return

        expand_path = [segment.eval(self.config) for segment in self._expand_path]
        expanded_any = False

        for candidate in self._find_candidates(record, expand_path):
            # Only list values are expanded; any other match is ignored.
            if not isinstance(candidate, list):
                continue
            for item in candidate:
                expanded_any = True
                yield self._build_child(record, item)

        if not expanded_any and self.on_no_records == "emit_parent":
            yield record

    def _find_candidates(self, record: MutableMapping[Any, Any], expand_path: list[Any]) -> Any:
        """Return the values located at ``expand_path`` as an iterable of matches."""
        if "*" in expand_path:
            # A wildcard segment may match several arrays; collect all of them.
            matches: Any = dpath.values(record, expand_path)
            return matches
        try:
            return [dpath.get(record, expand_path)]
        except KeyError:
            # Path not present in this record: nothing to expand.
            return []

    def _build_child(self, parent_record: MutableMapping[Any, Any], item: Any) -> Any:
        """Return the record to emit for a single item of an expanded array."""
        if not isinstance(item, dict):
            # Non-dict items cannot carry parent context; emit them untouched.
            return item
        # Shallow copy so that the parent's nested array is not mutated.
        expanded_record = dict(item)
        self._apply_parent_context(parent_record, expanded_record)
        return expanded_record

    def _apply_parent_context(
        self, parent_record: Mapping[str, Any], child_record: MutableMapping[str, Any]
    ) -> None:
        """Apply parent context to a child record.

        Embeds the parent under "original_record" when ``remain_original_record`` is
        set, then applies every mapping in ``parent_fields_to_copy``.
        """
        if self.remain_original_record:
            child_record["original_record"] = parent_record

        for field_mapping in self.parent_fields_to_copy:
            field_mapping.copy_field(parent_record, child_record)

airbyte_cdk/sources/declarative/extractors/dpath_extractor.py

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,13 @@
33
#
44

55
from dataclasses import InitVar, dataclass, field
6-
from typing import Any, Iterable, List, Mapping, MutableMapping, Union
6+
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union
77

88
import dpath
99
import requests
1010

1111
from airbyte_cdk.sources.declarative.decoders import Decoder, JsonDecoder
12+
from airbyte_cdk.sources.declarative.expanders.record_expander import RecordExpander
1213
from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor
1314
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
1415
from airbyte_cdk.sources.types import Config
@@ -24,6 +25,11 @@ class DpathExtractor(RecordExtractor):
2425
If the field path points to an empty object, an empty array is returned.
2526
If the field path points to a non-existing path, an empty array is returned.
2627
28+
Optionally, records can be expanded by providing a RecordExpander component.
29+
When record_expander is configured, each extracted record is passed through the
30+
expander which extracts items from nested array fields and emits each item as a
31+
separate record.
32+
2733
Examples of instantiating this transform:
2834
```
2935
extractor:
@@ -47,16 +53,32 @@ class DpathExtractor(RecordExtractor):
4753
field_path: []
4854
```
4955
56+
```
57+
extractor:
58+
type: DpathExtractor
59+
field_path:
60+
- "data"
61+
- "object"
62+
record_expander:
63+
type: RecordExpander
64+
expand_records_from_field:
65+
- "lines"
66+
- "data"
67+
remain_original_record: true
68+
```
69+
5070
Attributes:
5171
field_path (Union[InterpolatedString, str]): Path to the field that should be extracted
5272
config (Config): The user-provided configuration as specified by the source's spec
5373
decoder (Decoder): The decoder responsible for transforming the response into a Mapping
74+
record_expander (Optional[RecordExpander]): Optional component to expand records by extracting items from nested array fields
5475
"""
5576

5677
field_path: List[Union[InterpolatedString, str]]
5778
config: Config
5879
parameters: InitVar[Mapping[str, Any]]
5980
decoder: Decoder = field(default_factory=lambda: JsonDecoder(parameters={}))
81+
record_expander: Optional[RecordExpander] = None
6082

6183
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
6284
self._field_path = [
@@ -79,8 +101,15 @@ def extract_records(self, response: requests.Response) -> Iterable[MutableMappin
79101
else:
80102
extracted = dpath.get(body, path, default=[]) # type: ignore # extracted will be a MutableMapping, given input data structure
81103
if isinstance(extracted, list):
82-
yield from extracted
104+
if not self.record_expander:
105+
yield from extracted
106+
else:
107+
for record in extracted:
108+
yield from self.record_expander.expand_record(record)
83109
elif extracted:
84-
yield extracted
110+
if self.record_expander:
111+
yield from self.record_expander.expand_record(extracted)
112+
else:
113+
yield extracted
85114
else:
86115
yield from []

0 commit comments

Comments
 (0)