Skip to content

Commit 2ca9ad7

Browse files
feat: Add on_no_records and parent_fields_to_copy to RecordExpander
- Add on_no_records parameter with 'skip' (default) and 'emit_parent' options - Add parent_fields_to_copy parameter to copy specific parent fields to child records - Add ParentFieldMapping class to define source/target field mappings - Update schema YAML with new properties and ParentFieldMapping definition - Regenerate models from schema - Add comprehensive unit tests for new features Co-Authored-By: unknown <>
1 parent 5b0c0d5 commit 2ca9ad7

5 files changed

Lines changed: 473 additions & 74 deletions

File tree

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1961,6 +1961,54 @@ definitions:
19611961
description: If true, each expanded record will include the original parent record in an "original_record" field. Defaults to false.
19621962
type: boolean
19631963
default: false
1964+
on_no_records:
1965+
title: On No Records
1966+
description: Behavior when the expansion path is missing, not a list, or an empty list. "skip" (default) emits nothing. "emit_parent" emits the original parent record unchanged.
1967+
type: string
1968+
enum:
1969+
- skip
1970+
- emit_parent
1971+
default: skip
1972+
parent_fields_to_copy:
1973+
title: Parent Fields To Copy
1974+
description: List of parent field mappings to copy onto each expanded child record. Each mapping specifies a source path in the parent record and a target field name in the child record.
1975+
type: array
1976+
items:
1977+
"$ref": "#/definitions/ParentFieldMapping"
1978+
$parameters:
1979+
type: object
1980+
additionalProperties: true
1981+
ParentFieldMapping:
1982+
title: Parent Field Mapping
1983+
description: Defines a mapping from a parent record field to a child record field.
1984+
type: object
1985+
required:
1986+
- type
1987+
- source_field_path
1988+
- target_field
1989+
properties:
1990+
type:
1991+
type: string
1992+
enum: [ParentFieldMapping]
1993+
source_field_path:
1994+
title: Source Field Path
1995+
description: Path to the field in the parent record to copy.
1996+
type: array
1997+
items:
1998+
type: string
1999+
interpolation_context:
2000+
- config
2001+
examples:
2002+
- ["id"]
2003+
- ["created"]
2004+
- ["metadata", "timestamp"]
2005+
target_field:
2006+
title: Target Field
2007+
description: Name of the field in the child record where the value will be copied.
2008+
type: string
2009+
examples:
2010+
- "parent_id"
2011+
- "subscription_updated"
19642012
$parameters:
19652013
type: object
19662014
additionalProperties: true

airbyte_cdk/sources/declarative/expanders/__init__.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@
22
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
33
#
44

5-
from airbyte_cdk.sources.declarative.expanders.record_expander import RecordExpander
5+
from airbyte_cdk.sources.declarative.expanders.record_expander import (
6+
ParentFieldMapping,
7+
RecordExpander,
8+
)
69

7-
__all__ = ["RecordExpander"]
10+
__all__ = ["ParentFieldMapping", "RecordExpander"]

airbyte_cdk/sources/declarative/expanders/record_expander.py

Lines changed: 78 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -2,19 +2,45 @@
22
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
33
#
44

5-
from dataclasses import InitVar, dataclass
6-
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union
5+
from dataclasses import InitVar, dataclass, field
6+
from typing import Any, Iterable, Mapping, MutableMapping
77

88
import dpath
99

1010
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
1111
from airbyte_cdk.sources.types import Config
1212

1313

14+
@dataclass
15+
class ParentFieldMapping:
16+
"""Defines a mapping from a parent record field to a child record field."""
17+
18+
source_field_path: list[str | InterpolatedString]
19+
target_field: str
20+
config: Config
21+
parameters: InitVar[Mapping[str, Any]]
22+
23+
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
24+
self._source_path = [
25+
InterpolatedString.create(path, parameters=parameters)
26+
for path in self.source_field_path
27+
]
28+
29+
def copy_field(
30+
self, parent_record: Mapping[str, Any], child_record: MutableMapping[str, Any]
31+
) -> None:
32+
"""Copy a field from parent record to child record."""
33+
source_path = [path.eval(self.config) for path in self._source_path]
34+
try:
35+
value = dpath.get(dict(parent_record), source_path)
36+
child_record[self.target_field] = value
37+
except KeyError:
38+
pass
39+
40+
1441
@dataclass
1542
class RecordExpander:
16-
"""
17-
Expands records by extracting items from a nested array field.
43+
"""Expands records by extracting items from a nested array field.
1844
1945
When configured, this component extracts items from a specified nested array path
2046
within each record and emits each item as a separate record. Optionally, the original
@@ -40,22 +66,35 @@ class RecordExpander:
4066
- "sections"
4167
- "*"
4268
- "items"
43-
remain_original_record: false
69+
on_no_records: emit_parent
70+
parent_fields_to_copy:
71+
- type: ParentFieldMapping
72+
source_field_path: ["id"]
73+
target_field: "parent_id"
4474
```
4575
4676
Attributes:
47-
expand_records_from_field (List[Union[InterpolatedString, str]]): Path to a nested array field within each record. Items from this array will be extracted and emitted as separate records. Supports wildcards (*).
48-
remain_original_record (bool): If True, each expanded record will include the original parent record in an "original_record" field. Defaults to False.
49-
config (Config): The user-provided configuration as specified by the source's spec
77+
expand_records_from_field: Path to a nested array field within each record.
78+
Items from this array will be extracted and emitted as separate records.
79+
Supports wildcards (*).
80+
remain_original_record: If True, each expanded record will include the original
81+
parent record in an "original_record" field. Defaults to False.
82+
on_no_records: Behavior when expansion produces no records. "skip" (default)
83+
emits nothing. "emit_parent" emits the original parent record unchanged.
84+
parent_fields_to_copy: List of field mappings to copy from parent to each
85+
expanded child record.
86+
config: The user-provided configuration as specified by the source's spec.
5087
"""
5188

52-
expand_records_from_field: List[Union[InterpolatedString, str]]
89+
expand_records_from_field: list[str | InterpolatedString]
5390
config: Config
5491
parameters: InitVar[Mapping[str, Any]]
5592
remain_original_record: bool = False
93+
on_no_records: str = "skip"
94+
parent_fields_to_copy: list[ParentFieldMapping] = field(default_factory=list)
5695

5796
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
58-
self._expand_path: Optional[List[InterpolatedString]] = [
97+
self._expand_path: list[InterpolatedString] | None = [
5998
InterpolatedString.create(path, parameters=parameters)
6099
for path in self.expand_records_from_field
61100
]
@@ -68,6 +107,7 @@ def expand_record(self, record: MutableMapping[Any, Any]) -> Iterable[MutableMap
68107

69108
parent_record = record
70109
expand_path = [path.eval(self.config) for path in self._expand_path]
110+
expanded_any = False
71111

72112
if "*" in expand_path:
73113
extracted: Any = dpath.values(parent_record, expand_path)
@@ -76,23 +116,38 @@ def expand_record(self, record: MutableMapping[Any, Any]) -> Iterable[MutableMap
76116
for item in record:
77117
if isinstance(item, dict):
78118
expanded_record = dict(item)
79-
if self.remain_original_record:
80-
expanded_record["original_record"] = parent_record
119+
self._apply_parent_context(parent_record, expanded_record)
81120
yield expanded_record
121+
expanded_any = True
82122
else:
83123
yield item
124+
expanded_any = True
84125
else:
85126
try:
86127
extracted = dpath.get(parent_record, expand_path)
87128
except KeyError:
88-
return
89-
if not isinstance(extracted, list):
90-
return
91-
for item in extracted:
92-
if isinstance(item, dict):
93-
expanded_record = dict(item)
94-
if self.remain_original_record:
95-
expanded_record["original_record"] = parent_record
96-
yield expanded_record
97-
else:
98-
yield item
129+
extracted = None
130+
131+
if isinstance(extracted, list):
132+
for item in extracted:
133+
if isinstance(item, dict):
134+
expanded_record = dict(item)
135+
self._apply_parent_context(parent_record, expanded_record)
136+
yield expanded_record
137+
expanded_any = True
138+
else:
139+
yield item
140+
expanded_any = True
141+
142+
if not expanded_any and self.on_no_records == "emit_parent":
143+
yield parent_record
144+
145+
def _apply_parent_context(
146+
self, parent_record: Mapping[str, Any], child_record: MutableMapping[str, Any]
147+
) -> None:
148+
"""Apply parent context to a child record."""
149+
if self.remain_original_record:
150+
child_record["original_record"] = parent_record
151+
152+
for field_mapping in self.parent_fields_to_copy:
153+
field_mapping.copy_field(parent_record, child_record)

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 80 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -487,23 +487,24 @@ class ResponseToFileExtractor(BaseModel):
487487
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
488488

489489

490-
class RecordExpander(BaseModel):
491-
type: Literal["RecordExpander"]
492-
expand_records_from_field: List[str] = Field(
490+
class OnNoRecords(Enum):
491+
skip = "skip"
492+
emit_parent = "emit_parent"
493+
494+
495+
class ParentFieldMapping(BaseModel):
496+
type: Literal["ParentFieldMapping"]
497+
source_field_path: List[str] = Field(
493498
...,
494-
description="Path to a nested array field within each record. Items from this array will be extracted and emitted as separate records. Supports wildcards (*) for matching multiple arrays.",
495-
examples=[
496-
["lines", "data"],
497-
["items"],
498-
["nested", "array"],
499-
["sections", "*", "items"],
500-
],
501-
title="Expand Records From Field",
499+
description="Path to the field in the parent record to copy.",
500+
examples=[["id"], ["created"], ["metadata", "timestamp"]],
501+
title="Source Field Path",
502502
)
503-
remain_original_record: Optional[bool] = Field(
504-
False,
505-
description='If true, each expanded record will include the original parent record in an "original_record" field. Defaults to false.',
506-
title="Remain Original Record",
503+
target_field: str = Field(
504+
...,
505+
description="Name of the field in the child record where the value will be copied.",
506+
examples=["parent_id", "subscription_updated"],
507+
title="Target Field",
507508
)
508509
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
509510

@@ -2039,23 +2040,33 @@ class DefaultPaginator(BaseModel):
20392040
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
20402041

20412042

2042-
class DpathExtractor(BaseModel):
2043-
type: Literal["DpathExtractor"]
2044-
field_path: List[str] = Field(
2043+
class RecordExpander(BaseModel):
2044+
type: Literal["RecordExpander"]
2045+
expand_records_from_field: List[str] = Field(
20452046
...,
2046-
description='List of potentially nested fields describing the full path of the field to extract. Use "*" to extract all values from an array. See more info in the [docs](https://docs.airbyte.com/connector-development/config-based/understanding-the-yaml-file/record-selector).',
2047+
description="Path to a nested array field within each record. Items from this array will be extracted and emitted as separate records. Supports wildcards (*) for matching multiple arrays.",
20472048
examples=[
2048-
["data"],
2049-
["data", "records"],
2050-
["data", "{{ parameters.name }}"],
2051-
["data", "*", "record"],
2049+
["lines", "data"],
2050+
["items"],
2051+
["nested", "array"],
2052+
["sections", "*", "items"],
20522053
],
2053-
title="Field Path",
2054+
title="Expand Records From Field",
20542055
)
2055-
record_expander: Optional[RecordExpander] = Field(
2056+
remain_original_record: Optional[bool] = Field(
2057+
False,
2058+
description='If true, each expanded record will include the original parent record in an "original_record" field. Defaults to false.',
2059+
title="Remain Original Record",
2060+
)
2061+
on_no_records: Optional[OnNoRecords] = Field(
2062+
OnNoRecords.skip,
2063+
description='Behavior when the expansion path is missing, not a list, or an empty list. "skip" (default) emits nothing. "emit_parent" emits the original parent record unchanged.',
2064+
title="On No Records",
2065+
)
2066+
parent_fields_to_copy: Optional[List[ParentFieldMapping]] = Field(
20562067
None,
2057-
description="Optional component to expand records by extracting items from nested array fields.",
2058-
title="Record Expander",
2068+
description="List of parent field mappings to copy onto each expanded child record. Each mapping specifies a source path in the parent record and a target field name in the child record.",
2069+
title="Parent Fields To Copy",
20592070
)
20602071
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
20612072

@@ -2118,27 +2129,6 @@ class ListPartitionRouter(BaseModel):
21182129
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
21192130

21202131

2121-
class RecordSelector(BaseModel):
2122-
type: Literal["RecordSelector"]
2123-
extractor: Union[DpathExtractor, CustomRecordExtractor]
2124-
record_filter: Optional[Union[RecordFilter, CustomRecordFilter]] = Field(
2125-
None,
2126-
description="Responsible for filtering records to be emitted by the Source.",
2127-
title="Record Filter",
2128-
)
2129-
schema_normalization: Optional[Union[SchemaNormalization, CustomSchemaNormalization]] = Field(
2130-
None,
2131-
description="Responsible for normalization according to the schema.",
2132-
title="Schema Normalization",
2133-
)
2134-
transform_before_filtering: Optional[bool] = Field(
2135-
None,
2136-
description="If true, transformation will be applied before record filtering.",
2137-
title="Transform Before Filtering",
2138-
)
2139-
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
2140-
2141-
21422132
class PaginationReset(BaseModel):
21432133
type: Literal["PaginationReset"]
21442134
action: Action1
@@ -2259,6 +2249,27 @@ class Config:
22592249
)
22602250

22612251

2252+
class DpathExtractor(BaseModel):
2253+
type: Literal["DpathExtractor"]
2254+
field_path: List[str] = Field(
2255+
...,
2256+
description='List of potentially nested fields describing the full path of the field to extract. Use "*" to extract all values from an array. See more info in the [docs](https://docs.airbyte.com/connector-development/config-based/understanding-the-yaml-file/record-selector).',
2257+
examples=[
2258+
["data"],
2259+
["data", "records"],
2260+
["data", "{{ parameters.name }}"],
2261+
["data", "*", "record"],
2262+
],
2263+
title="Field Path",
2264+
)
2265+
record_expander: Optional[RecordExpander] = Field(
2266+
None,
2267+
description="Optional component to expand records by extracting items from nested array fields.",
2268+
title="Record Expander",
2269+
)
2270+
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
2271+
2272+
22622273
class ZipfileDecoder(BaseModel):
22632274
class Config:
22642275
extra = Extra.allow
@@ -2271,6 +2282,27 @@ class Config:
22712282
)
22722283

22732284

2285+
class RecordSelector(BaseModel):
2286+
type: Literal["RecordSelector"]
2287+
extractor: Union[DpathExtractor, CustomRecordExtractor]
2288+
record_filter: Optional[Union[RecordFilter, CustomRecordFilter]] = Field(
2289+
None,
2290+
description="Responsible for filtering records to be emitted by the Source.",
2291+
title="Record Filter",
2292+
)
2293+
schema_normalization: Optional[Union[SchemaNormalization, CustomSchemaNormalization]] = Field(
2294+
None,
2295+
description="Responsible for normalization according to the schema.",
2296+
title="Schema Normalization",
2297+
)
2298+
transform_before_filtering: Optional[bool] = Field(
2299+
None,
2300+
description="If true, transformation will be applied before record filtering.",
2301+
title="Transform Before Filtering",
2302+
)
2303+
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
2304+
2305+
22742306
class ConfigMigration(BaseModel):
22752307
type: Literal["ConfigMigration"]
22762308
description: Optional[str] = Field(

0 commit comments

Comments
 (0)