-
Notifications
You must be signed in to change notification settings - Fork 44
Expand file tree
/
Copy pathrecord_expander.py
More file actions
128 lines (106 loc) · 4.47 KB
/
record_expander.py
File metadata and controls
128 lines (106 loc) · 4.47 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import copy
from dataclasses import InitVar, dataclass
from enum import Enum
from typing import Any, Iterable, Mapping, MutableMapping, Sequence
import dpath
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.types import Config
class OnNoRecords(Enum):
    """What a RecordExpander does when expanding a record yields nothing.

    Members:
        skip: emit nothing for that parent record.
        emit_parent: emit the parent record unchanged instead.
    """

    skip = "skip"  # drop the parent: no output at all
    emit_parent = "emit_parent"  # fall back to passing the parent through as-is
@dataclass
class RecordExpander:
    """Expands records by extracting items from a nested array field.

    When configured, this component extracts items from a specified nested array path
    within each record and emits each item as a separate record. Set
    `remain_original_record: true` to embed the full parent record under
    `original_record` in each expanded item when you need downstream transformations
    to access parent context.

    The expand_records_from_field path supports wildcards (*) for matching multiple
    arrays. When wildcards are used, items from all matched arrays are extracted and
    emitted.

    Examples of instantiating this component:
    ```
    record_expander:
      type: RecordExpander
      expand_records_from_field:
        - "lines"
        - "data"
      remain_original_record: true
    ```
    ```
    record_expander:
      type: RecordExpander
      expand_records_from_field:
        - "sections"
        - "*"
        - "items"
      on_no_records: emit_parent
    ```

    Attributes:
        expand_records_from_field: Path to a nested array field within each record.
            Items from this array will be extracted and emitted as separate records.
            Supports wildcards (*). Each segment is an interpolated string evaluated
            against `config` for every record.
        remain_original_record: If True, each expanded record will include a deep copy
            of the original parent record in an "original_record" field. Items that are
            not mappings are wrapped as {"value": item, "original_record": ...}.
            Defaults to False.
        on_no_records: Behavior when expansion produces no records. "skip" (default)
            emits nothing. "emit_parent" emits the original parent record unchanged.
            Accepts an OnNoRecords member or its string value; None means "skip".
        config: The user-provided configuration as specified by the source's spec.
    """

    expand_records_from_field: Sequence[str]
    config: Config
    parameters: InitVar[Mapping[str, Any]]
    remain_original_record: bool = False
    on_no_records: OnNoRecords = OnNoRecords.skip

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        # Path segments may contain interpolation; compile them once here and
        # evaluate them against the config each time a record is expanded.
        self._expand_path: list[InterpolatedString] = [
            InterpolatedString.create(path, parameters=parameters)
            for path in self.expand_records_from_field
        ]
        # OnNoRecords is a plain Enum, so a raw string such as "emit_parent" would
        # never compare equal to OnNoRecords.emit_parent and would silently act
        # like "skip". Normalize once so expand_record can compare members.
        self.on_no_records = self._coerce_on_no_records(self.on_no_records)

    @staticmethod
    def _coerce_on_no_records(value: Any) -> OnNoRecords:
        """Normalize an `on_no_records` setting to an OnNoRecords member.

        Args:
            value: An OnNoRecords member, its string value, another enum whose
                `.value` is one of those strings, or None (treated as skip).

        Returns:
            The matching OnNoRecords member.

        Raises:
            ValueError: If the value does not name a known behavior.
        """
        if value is None:
            return OnNoRecords.skip
        if isinstance(value, OnNoRecords):
            return value
        raw = getattr(value, "value", value)
        try:
            return OnNoRecords(raw)
        except ValueError as err:
            valid = ", ".join(member.value for member in OnNoRecords)
            raise ValueError(
                f"Invalid on_no_records value {value!r}; expected one of: {valid}"
            ) from err

    def expand_record(self, record: MutableMapping[Any, Any]) -> Iterable[MutableMapping[Any, Any]]:
        """Expand a record by extracting items from a nested array field.

        Args:
            record: The parent record to expand.

        Yields:
            - `record` itself, unchanged, if it is not a mapping or no expansion
              path is configured.
            - One output per item of every list found at the expansion path (see
              `_build_expanded_record` for how each item is shaped).
            - `record` itself, unchanged, if nothing was expanded and
              on_no_records is emit_parent.
        """
        if not isinstance(record, Mapping) or not self._expand_path:
            # Expansion cannot proceed (non-mapping input) or is not configured.
            yield record
            return

        parent_record = record
        expand_path = [path.eval(self.config) for path in self._expand_path]

        expanded_any = False
        for item in self._iter_items(parent_record, expand_path):
            yield self._build_expanded_record(parent_record, item)
            expanded_any = True

        if not expanded_any and self.on_no_records is OnNoRecords.emit_parent:
            yield parent_record

    @staticmethod
    def _iter_items(parent_record: Mapping[Any, Any], expand_path: list[Any]) -> Iterable[Any]:
        """Yield every item of every list that `expand_path` matches in `parent_record`."""
        try:
            matches = dpath.values(parent_record, expand_path)
        except KeyError:
            matches = []
        for match in matches:
            # Only list matches are expanded; any other value at the path is ignored.
            if isinstance(match, list):
                yield from match

    def _build_expanded_record(self, parent_record: Mapping[Any, Any], item: Any) -> Any:
        """Shape one extracted item into an output record.

        Mapping items are shallow-copied into a dict, with parent context applied.
        Other items are yielded as-is, or wrapped as
        {"value": item, "original_record": <deep copy of parent>} when
        remain_original_record is set.
        """
        if isinstance(item, Mapping):
            # Copy so attaching parent context never mutates the item inside the parent.
            expanded_record = dict(item)
            self._apply_parent_context(parent_record, expanded_record)
            return expanded_record
        if self.remain_original_record:
            return {
                "value": item,
                "original_record": copy.deepcopy(parent_record),
            }
        return item

    def _apply_parent_context(
        self, parent_record: Mapping[str, Any], child_record: MutableMapping[str, Any]
    ) -> None:
        """Apply parent context to a child record, in place.

        When remain_original_record is set, stores a deep copy of the parent under
        "original_record", so each child holds its own independent snapshot.
        NOTE: the parent still contains the expanded array, so this copy costs
        O(parent size) per child.
        """
        if self.remain_original_record:
            child_record["original_record"] = copy.deepcopy(parent_record)