
Commit 5b14f41

devin-ai-integration[bot], agarctfi, and octavia-squidington-iii authored
feat(DpathExtractor): Add RecordExpander component for nested array extraction (#859)
Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
Co-authored-by: Alfredo Garcia <alfredo.garcia@hallmark.edu>
Co-authored-by: octavia-squidington-iii <contact@airbyte.com>
Co-authored-by: alfredo.garcia@airbyte.io
1 parent 0494fd4 commit 5b14f41

8 files changed

+691
-40
lines changed


airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 44 additions & 0 deletions
@@ -1945,6 +1945,10 @@ definitions:
           - ["data", "records"]
           - ["data", "{{ parameters.name }}"]
           - ["data", "*", "record"]
+      record_expander:
+        title: Record Expander
+        description: Optional component to expand records by extracting items from nested array fields.
+        "$ref": "#/definitions/RecordExpander"
       $parameters:
         type: object
         additionalProperties: true
@@ -1961,6 +1965,46 @@ definitions:
       $parameters:
         type: object
         additionalProperties: true
+  RecordExpander:
+    title: Record Expander
+    description: Expands records by extracting items from a nested array field. When configured, this component extracts items from a specified nested array path within each record and emits each item as a separate record. Optionally, the original parent record can be embedded in each expanded item for context preservation. Supports wildcards (*) for matching multiple arrays.
+    type: object
+    required:
+      - type
+      - expand_records_from_field
+    properties:
+      type:
+        type: string
+        enum: [RecordExpander]
+      expand_records_from_field:
+        title: Expand Records From Field
+        description: Path to a nested array field within each record. Items from this array will be extracted and emitted as separate records. Supports wildcards (*) for matching multiple arrays.
+        type: array
+        items:
+          type: string
+        interpolation_context:
+          - config
+        examples:
+          - ["lines", "data"]
+          - ["items"]
+          - ["nested", "array"]
+          - ["sections", "*", "items"]
+      remain_original_record:
+        title: Remain Original Record
+        description: If true, each expanded record will include the original parent record in an "original_record" field. Defaults to false.
+        type: boolean
+        default: false
+      on_no_records:
+        title: On No Records
+        description: Behavior when the expansion path is missing, not a list, or an empty list. "skip" (default) emits nothing. "emit_parent" emits the original parent record unchanged.
+        type: string
+        enum:
+          - skip
+          - emit_parent
+        default: skip
+      $parameters:
+        type: object
+        additionalProperties: true
   ExponentialBackoffStrategy:
     title: Exponential Backoff
     description: Backoff strategy with an exponential backoff interval. The interval is defined as factor * 2^attempt_count.
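The constraints that the `RecordExpander` schema above places on a manifest fragment can be sketched in plain Python. This is only an illustration: `check_record_expander` is a hypothetical helper, not part of the CDK, and it mirrors just the `required`, `enum`, and `default` entries shown in this diff rather than running a real JSON Schema validator.

```python
from typing import Any, Dict, Mapping

# Values taken from the `on_no_records` enum in the schema above.
ALLOWED_ON_NO_RECORDS = {"skip", "emit_parent"}


def check_record_expander(component: Mapping[str, Any]) -> Dict[str, Any]:
    """Hypothetical checker that mirrors the RecordExpander schema in this diff."""
    # `type` and `expand_records_from_field` are the two required properties.
    for key in ("type", "expand_records_from_field"):
        if key not in component:
            raise ValueError(f"missing required property: {key}")
    if component["type"] != "RecordExpander":
        raise ValueError("type must be 'RecordExpander'")

    path = component["expand_records_from_field"]
    if not isinstance(path, list) or not all(isinstance(p, str) for p in path):
        raise ValueError("expand_records_from_field must be a list of strings")

    # Fill in the schema defaults for the optional properties.
    resolved = {
        "type": "RecordExpander",
        "expand_records_from_field": list(path),
        "remain_original_record": component.get("remain_original_record", False),
        "on_no_records": component.get("on_no_records", "skip"),
    }
    if not isinstance(resolved["remain_original_record"], bool):
        raise ValueError("remain_original_record must be a boolean")
    if resolved["on_no_records"] not in ALLOWED_ON_NO_RECORDS:
        raise ValueError("on_no_records must be 'skip' or 'emit_parent'")
    return resolved
```

Passing `{"type": "RecordExpander", "expand_records_from_field": ["sections", "*", "items"]}` returns the fragment with both optional properties set to their defaults, while a fragment without `expand_records_from_field` is rejected.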
Lines changed: 7 additions & 0 deletions
@@ -0,0 +1,7 @@
+#
+# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
+#
+
+from airbyte_cdk.sources.declarative.expanders.record_expander import OnNoRecords, RecordExpander
+
+__all__ = ["OnNoRecords", "RecordExpander"]
Lines changed: 128 additions & 0 deletions
@@ -0,0 +1,128 @@
+#
+# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
+#
+
+import copy
+from dataclasses import InitVar, dataclass
+from enum import Enum
+from typing import Any, Iterable, Mapping, MutableMapping, Sequence
+
+import dpath
+
+from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
+from airbyte_cdk.sources.types import Config
+
+
+class OnNoRecords(Enum):
+    """
+    Behavior when record expansion produces no records.
+    """
+
+    skip = "skip"
+    emit_parent = "emit_parent"
+
+
+@dataclass
+class RecordExpander:
+    """Expands records by extracting items from a nested array field.
+
+    When configured, this component extracts items from a specified nested array path
+    within each record and emits each item as a separate record. Set `remain_original_record: true`
+    to embed the full parent record under `original_record` in each expanded item when you need
+    downstream transformations to access parent context.
+
+    The expand_records_from_field path supports wildcards (*) for matching multiple arrays.
+    When wildcards are used, items from all matched arrays are extracted and emitted.
+
+    Examples of instantiating this component:
+    ```
+      record_expander:
+        type: RecordExpander
+        expand_records_from_field:
+          - "lines"
+          - "data"
+        remain_original_record: true
+    ```
+
+    ```
+      record_expander:
+        type: RecordExpander
+        expand_records_from_field:
+          - "sections"
+          - "*"
+          - "items"
+        on_no_records: emit_parent
+    ```
+
+    Attributes:
+        expand_records_from_field: Path to a nested array field within each record.
+            Items from this array will be extracted and emitted as separate records.
+            Supports wildcards (*).
+        remain_original_record: If True, each expanded record will include the original
+            parent record in an "original_record" field. Defaults to False.
+        on_no_records: Behavior when expansion produces no records. "skip" (default)
+            emits nothing. "emit_parent" emits the original parent record unchanged.
+        config: The user-provided configuration as specified by the source's spec.
+    """
+
+    expand_records_from_field: Sequence[str]
+    config: Config
+    parameters: InitVar[Mapping[str, Any]]
+    remain_original_record: bool = False
+    on_no_records: OnNoRecords = OnNoRecords.skip
+
+    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
+        self._expand_path: list[InterpolatedString] = [
+            InterpolatedString.create(path, parameters=parameters)
+            for path in self.expand_records_from_field
+        ]
+
+    def expand_record(self, record: MutableMapping[Any, Any]) -> Iterable[MutableMapping[Any, Any]]:
+        """Expand a record by extracting items from a nested array field."""
+        if not isinstance(record, Mapping):
+            # If the input isn't a mapping, expansion can't proceed; yield as-is.
+            yield record
+            return
+
+        if not self._expand_path:
+            yield record
+            return
+
+        parent_record = record
+        expand_path = [path.eval(self.config) for path in self._expand_path]
+        expanded_any = False
+
+        try:
+            extracted_values = dpath.values(parent_record, expand_path)
+        except KeyError:
+            extracted_values = []
+
+        for extracted in extracted_values:
+            if not isinstance(extracted, list):
+                continue
+            items = extracted
+            for item in items:
+                if isinstance(item, dict):
+                    expanded_record = dict(item)
+                    self._apply_parent_context(parent_record, expanded_record)
+                    yield expanded_record
+                    expanded_any = True
+                else:
+                    if self.remain_original_record:
+                        yield {
+                            "value": item,
+                            "original_record": copy.deepcopy(parent_record),
+                        }
+                    else:
+                        yield item
+                    expanded_any = True
+
+        if not expanded_any and self.on_no_records == OnNoRecords.emit_parent:
+            yield parent_record
+
+    def _apply_parent_context(
+        self, parent_record: Mapping[str, Any], child_record: MutableMapping[str, Any]
+    ) -> None:
+        """Apply parent context to a child record."""
+        if self.remain_original_record:
+            child_record["original_record"] = copy.deepcopy(parent_record)
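To make the expansion rules in `expand_record` easier to follow, here is a dependency-free sketch of the same control flow. It is an approximation, not the CDK code: `resolve` and `expand` are hypothetical names, `resolve` stands in for `dpath.values` and supports only literal keys plus `*` (real dpath globbing is richer), and a boolean `emit_parent` argument replaces the `OnNoRecords` enum.

```python
import copy
from typing import Any, Iterator, List, Mapping, Sequence


def resolve(node: Any, path: Sequence[str]) -> List[Any]:
    """Return every value reached by `path`; "*" matches all keys or list items."""
    if not path:
        return [node]
    head, rest = path[0], path[1:]
    if head == "*":
        if isinstance(node, Mapping):
            children = list(node.values())
        elif isinstance(node, list):
            children = list(node)
        else:
            children = []
    elif isinstance(node, Mapping) and head in node:
        children = [node[head]]
    else:
        children = []  # a missing key simply produces no matches
    matches: List[Any] = []
    for child in children:
        matches.extend(resolve(child, rest))
    return matches


def expand(
    record: Mapping[str, Any],
    path: Sequence[str],
    remain_original_record: bool = False,
    emit_parent: bool = False,
) -> Iterator[Any]:
    """Emit one output per item of every list matched by `path`."""
    expanded_any = False
    for value in resolve(record, path):
        if not isinstance(value, list):
            continue  # only arrays are expanded
        for item in value:
            if isinstance(item, dict):
                child = dict(item)  # shallow copy so the parent is not mutated
                if remain_original_record:
                    child["original_record"] = copy.deepcopy(record)
                yield child
            elif remain_original_record:
                # Scalars are wrapped so the parent can travel with them.
                yield {"value": item, "original_record": copy.deepcopy(record)}
            else:
                yield item
            expanded_any = True
    if not expanded_any and emit_parent:
        yield record  # nothing expanded: fall back to the parent itself
```

With a record such as `{"id": 1, "sections": [{"items": [{"a": 1}]}, {"items": [{"a": 2}]}]}` and the path `["sections", "*", "items"]`, the sketch emits one record per item across both sections. A record with no matching array yields nothing unless `emit_parent=True`, which corresponds to `on_no_records: emit_parent` in the manifest.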

airbyte_cdk/sources/declarative/extractors/dpath_extractor.py

Lines changed: 32 additions & 3 deletions
@@ -3,12 +3,13 @@
 #
 
 from dataclasses import InitVar, dataclass, field
-from typing import Any, Iterable, List, Mapping, MutableMapping, Union
+from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union
 
 import dpath
 import requests
 
 from airbyte_cdk.sources.declarative.decoders import Decoder, JsonDecoder
+from airbyte_cdk.sources.declarative.expanders.record_expander import RecordExpander
 from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor
 from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
 from airbyte_cdk.sources.types import Config
@@ -24,6 +25,11 @@ class DpathExtractor(RecordExtractor):
     If the field path points to an empty object, an empty array is returned.
     If the field path points to a non-existing path, an empty array is returned.
 
+    Optionally, records can be expanded by providing a RecordExpander component.
+    When record_expander is configured, each extracted record is passed through the
+    expander which extracts items from nested array fields and emits each item as a
+    separate record.
+
     Examples of instantiating this transform:
     ```
       extractor:
@@ -47,16 +53,32 @@ class DpathExtractor(RecordExtractor):
         field_path: []
     ```
 
+    ```
+      extractor:
+        type: DpathExtractor
+        field_path:
+          - "data"
+          - "object"
+        record_expander:
+          type: RecordExpander
+          expand_records_from_field:
+            - "lines"
+            - "data"
+          remain_original_record: true
+    ```
+
     Attributes:
         field_path (Union[InterpolatedString, str]): Path to the field that should be extracted
         config (Config): The user-provided configuration as specified by the source's spec
         decoder (Decoder): The decoder responsible to transfom the response in a Mapping
+        record_expander (Optional[RecordExpander]): Optional component to expand records by extracting items from nested array fields
     """
 
     field_path: List[Union[InterpolatedString, str]]
     config: Config
     parameters: InitVar[Mapping[str, Any]]
     decoder: Decoder = field(default_factory=lambda: JsonDecoder(parameters={}))
+    record_expander: Optional[RecordExpander] = None
 
     def __post_init__(self, parameters: Mapping[str, Any]) -> None:
         self._field_path = [
@@ -79,8 +101,15 @@ def extract_records(self, response: requests.Response) -> Iterable[MutableMappin
                 else:
                     extracted = dpath.get(body, path, default=[])  # type: ignore # extracted will be a MutableMapping, given input data structure
             if isinstance(extracted, list):
-                yield from extracted
+                if not self.record_expander:
+                    yield from extracted
+                else:
+                    for record in extracted:
+                        yield from self.record_expander.expand_record(record)
             elif extracted:
-                yield extracted
+                if self.record_expander:
+                    yield from self.record_expander.expand_record(extracted)
+                else:
+                    yield extracted
             else:
                 yield from []
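The delegation added to `extract_records` can be illustrated without the CDK. In the sketch below, `get_path`, `extract_records`, and `expand_lines` are hypothetical stand-ins: `get_path` follows literal keys only (no wildcards, unlike dpath), the decoded response body is passed in directly, and treating an empty `field_path` as "use the whole body" is an assumption based on the `field_path: []` example in the docstring.

```python
from typing import Any, Callable, Iterable, Iterator, Mapping, Optional, Sequence

# An expander is modelled as any callable that maps one record to many.
Expander = Callable[[Any], Iterable[Any]]


def get_path(body: Any, field_path: Sequence[str]) -> Any:
    """Follow literal keys; return [] when any key is missing."""
    node = body
    for key in field_path:
        if not isinstance(node, Mapping) or key not in node:
            return []
        node = node[key]
    return node


def extract_records(
    body: Any, field_path: Sequence[str], expander: Optional[Expander] = None
) -> Iterator[Any]:
    """Yield extracted records, routing each one through `expander` if given."""
    extracted = get_path(body, field_path) if field_path else body
    if isinstance(extracted, list):
        for record in extracted:
            if expander is None:
                yield record
            else:
                yield from expander(record)
    elif extracted:
        # A single non-empty object is treated as one record.
        if expander is None:
            yield extracted
        else:
            yield from expander(extracted)


def expand_lines(record: Mapping[str, Any]) -> Iterator[Mapping[str, Any]]:
    """Hypothetical expander: one output per entry of record["lines"]["data"]."""
    for line in record.get("lines", {}).get("data", []):
        yield {**line, "original_record": record}
```

Calling `extract_records(body, ["data", "object"], expand_lines)` on a body shaped like the docstring example (an object with a nested `lines.data` array) emits one record per line item, each carrying the parent under `original_record`. Without an expander the same call emits the parent object once, which matches the pre-change behavior.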
