Skip to content

Commit ead1747

Browse files
committed
refactor(RecordExpander): Remove ParentFieldMapping to delegate to RecordTransformations
1 parent f6cf99c commit ead1747

6 files changed

Lines changed: 9 additions & 305 deletions

File tree

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 0 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1969,46 +1969,6 @@ definitions:
19691969
- skip
19701970
- emit_parent
19711971
default: skip
1972-
parent_fields_to_copy:
1973-
title: Parent Fields To Copy
1974-
description: List of parent field mappings to copy onto each expanded child record. Each mapping specifies a source path in the parent record and a target field name in the child record.
1975-
type: array
1976-
items:
1977-
"$ref": "#/definitions/ParentFieldMapping"
1978-
$parameters:
1979-
type: object
1980-
additionalProperties: true
1981-
ParentFieldMapping:
1982-
title: Parent Field Mapping
1983-
description: Defines a mapping from a parent record field to a child record field.
1984-
type: object
1985-
required:
1986-
- type
1987-
- source_field_path
1988-
- target_field
1989-
properties:
1990-
type:
1991-
type: string
1992-
enum: [ParentFieldMapping]
1993-
source_field_path:
1994-
title: Source Field Path
1995-
description: Path to the field in the parent record to copy.
1996-
type: array
1997-
items:
1998-
type: string
1999-
interpolation_context:
2000-
- config
2001-
examples:
2002-
- ["id"]
2003-
- ["created"]
2004-
- ["metadata", "timestamp"]
2005-
target_field:
2006-
title: Target Field
2007-
description: Name of the field in the child record where the value will be copied.
2008-
type: string
2009-
examples:
2010-
- "parent_id"
2011-
- "subscription_updated"
20121972
$parameters:
20131973
type: object
20141974
additionalProperties: true

airbyte_cdk/sources/declarative/expanders/__init__.py

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,6 @@
22
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
33
#
44

5-
from airbyte_cdk.sources.declarative.expanders.record_expander import (
6-
ParentFieldMapping,
7-
RecordExpander,
8-
)
5+
from airbyte_cdk.sources.declarative.expanders.record_expander import RecordExpander
96

10-
__all__ = ["ParentFieldMapping", "RecordExpander"]
7+
__all__ = ["RecordExpander"]

airbyte_cdk/sources/declarative/expanders/record_expander.py

Lines changed: 5 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -2,49 +2,23 @@
22
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
33
#
44

5-
from dataclasses import InitVar, dataclass, field
6-
from typing import Any, Iterable, Mapping, MutableMapping, Sequence
5+
from dataclasses import InitVar, dataclass
6+
from typing import Any, Iterable, Mapping, MutableMapping
77

88
import dpath
99

1010
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
1111
from airbyte_cdk.sources.types import Config
1212

1313

14-
@dataclass
15-
class ParentFieldMapping:
16-
"""Defines a mapping from a parent record field to a child record field."""
17-
18-
source_field_path: Sequence[str | InterpolatedString]
19-
target_field: str
20-
config: Config
21-
parameters: InitVar[Mapping[str, Any]]
22-
23-
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
24-
self._source_path = [
25-
InterpolatedString.create(path, parameters=parameters)
26-
for path in self.source_field_path
27-
]
28-
29-
def copy_field(
30-
self, parent_record: Mapping[str, Any], child_record: MutableMapping[str, Any]
31-
) -> None:
32-
"""Copy a field from parent record to child record."""
33-
source_path = [path.eval(self.config) for path in self._source_path]
34-
try:
35-
value = dpath.get(dict(parent_record), source_path)
36-
child_record[self.target_field] = value
37-
except KeyError:
38-
pass
39-
40-
4114
@dataclass
4215
class RecordExpander:
4316
"""Expands records by extracting items from a nested array field.
4417
4518
When configured, this component extracts items from a specified nested array path
46-
within each record and emits each item as a separate record. Optionally, the original
47-
parent record can be embedded in each expanded item for context preservation.
19+
within each record and emits each item as a separate record. Set `remain_original_record: true`
20+
to embed the full parent record under `original_record` in each expanded item when you need
21+
downstream transformations to access parent context.
4822
4923
The expand_records_from_field path supports wildcards (*) for matching multiple arrays.
5024
When wildcards are used, items from all matched arrays are extracted and emitted.
@@ -67,10 +41,6 @@ class RecordExpander:
6741
- "*"
6842
- "items"
6943
on_no_records: emit_parent
70-
parent_fields_to_copy:
71-
- type: ParentFieldMapping
72-
source_field_path: ["id"]
73-
target_field: "parent_id"
7444
```
7545
7646
Attributes:
@@ -81,8 +51,6 @@ class RecordExpander:
8151
parent record in an "original_record" field. Defaults to False.
8252
on_no_records: Behavior when expansion produces no records. "skip" (default)
8353
emits nothing. "emit_parent" emits the original parent record unchanged.
84-
parent_fields_to_copy: List of field mappings to copy from parent to each
85-
expanded child record.
8654
config: The user-provided configuration as specified by the source's spec.
8755
"""
8856

@@ -91,7 +59,6 @@ class RecordExpander:
9159
parameters: InitVar[Mapping[str, Any]]
9260
remain_original_record: bool = False
9361
on_no_records: str = "skip"
94-
parent_fields_to_copy: list[ParentFieldMapping] = field(default_factory=list)
9562

9663
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
9764
self._expand_path: list[InterpolatedString] | None = [
@@ -148,6 +115,3 @@ def _apply_parent_context(
148115
"""Apply parent context to a child record."""
149116
if self.remain_original_record:
150117
child_record["original_record"] = parent_record
151-
152-
for field_mapping in self.parent_fields_to_copy:
153-
field_mapping.copy_field(parent_record, child_record)

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 0 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -492,23 +492,6 @@ class OnNoRecords(Enum):
492492
emit_parent = "emit_parent"
493493

494494

495-
class ParentFieldMapping(BaseModel):
496-
type: Literal["ParentFieldMapping"]
497-
source_field_path: List[str] = Field(
498-
...,
499-
description="Path to the field in the parent record to copy.",
500-
examples=[["id"], ["created"], ["metadata", "timestamp"]],
501-
title="Source Field Path",
502-
)
503-
target_field: str = Field(
504-
...,
505-
description="Name of the field in the child record where the value will be copied.",
506-
examples=["parent_id", "subscription_updated"],
507-
title="Target Field",
508-
)
509-
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
510-
511-
512495
class ExponentialBackoffStrategy(BaseModel):
513496
type: Literal["ExponentialBackoffStrategy"]
514497
factor: Optional[Union[float, str]] = Field(
@@ -2063,11 +2046,6 @@ class RecordExpander(BaseModel):
20632046
description='Behavior when the expansion path is missing, not a list, or an empty list. "skip" (default) emits nothing. "emit_parent" emits the original parent record unchanged.',
20642047
title="On No Records",
20652048
)
2066-
parent_fields_to_copy: Optional[List[ParentFieldMapping]] = Field(
2067-
None,
2068-
description="List of parent field mappings to copy onto each expanded child record. Each mapping specifies a source path in the parent record and a target field name in the child record.",
2069-
title="Parent Fields To Copy",
2070-
)
20712049
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
20722050

20732051

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 1 addition & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@
104104
RecordSelector,
105105
ResponseToFileExtractor,
106106
)
107+
from airbyte_cdk.sources.declarative.expanders.record_expander import RecordExpander
107108
from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor
108109
from airbyte_cdk.sources.declarative.extractors.record_filter import (
109110
ClientSideIncrementalRecordFilterDecorator,
@@ -374,9 +375,6 @@
374375
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
375376
ParametrizedComponentsResolver as ParametrizedComponentsResolverModel,
376377
)
377-
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
378-
ParentFieldMapping as ParentFieldMappingModel,
379-
)
380378
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
381379
ParentStreamConfig as ParentStreamConfigModel,
382380
)
@@ -2347,25 +2345,12 @@ def create_record_expander(
23472345
config: Config,
23482346
**kwargs: Any,
23492347
) -> RecordExpander:
2350-
parent_field_mappings: list[ParentFieldMapping] = []
2351-
if model.parent_fields_to_copy:
2352-
for mapping_model in model.parent_fields_to_copy:
2353-
parent_field_mappings.append(
2354-
ParentFieldMapping(
2355-
source_field_path=mapping_model.source_field_path,
2356-
target_field=mapping_model.target_field,
2357-
config=config,
2358-
parameters=mapping_model.parameters or {},
2359-
)
2360-
)
2361-
23622348
return RecordExpander(
23632349
expand_records_from_field=model.expand_records_from_field,
23642350
config=config,
23652351
parameters=model.parameters or {},
23662352
remain_original_record=model.remain_original_record or False,
23672353
on_no_records=model.on_no_records.value if model.on_no_records else "skip",
2368-
parent_fields_to_copy=parent_field_mappings,
23692354
)
23702355

23712356
@staticmethod

0 commit comments

Comments
 (0)