Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2307,6 +2307,33 @@ definitions:
$parameters:
type: object
additionalProperties: true
KeyTransformation:
title: Transformation to apply for extracted object keys by Dpath Flatten Fields
type: object
required:
- type
properties:
type:
type: string
enum: [ KeyTransformation ]
prefix:
title: Key Prefix
description: Prefix to add for object keys. If not provided original keys remain unchanged.
type: string
examples:
- flattened_
suffix:
title: Key Suffix
description: Suffix to add for object keys. If not provided original keys remain unchanged.
type: string
examples:
- _flattened
custom:
Comment thread
darynaishchenko marked this conversation as resolved.
Outdated
title: Custom Transformation for keys
description: Custom transformation. Can be used with {{ key }} as a original value for key name. If not provided original keys remain unchanged.
type: string
examples:
- flattened_{{ key }}
DpathFlattenFields:
title: Dpath Flatten Fields
description: A transformation that flatten field values to the to top of the record.
Expand Down Expand Up @@ -2335,6 +2362,11 @@ definitions:
title: Replace Origin Record
description: Whether to replace the origin record or not. Default is False.
type: boolean
key_transformation:
title: Key transformation
description: Transformation for object keys. If not provided, original key will be used.
type: object
"$ref": "#/definitions/KeyTransformation"
$parameters:
type: object
additionalProperties: true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -879,6 +879,33 @@ class FlattenFields(BaseModel):
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class KeyTransformation(BaseModel):
prefix: Optional[Union[str, None]] = Field(
None,
description="Prefix to add for object keys. If not provided original keys remain unchanged.",
examples=[
"flattened_",
],
title="Key Prefix",
)
suffix: Optional[Union[str, None]] = Field(
None,
description="Suffix to add for object keys. If not provided original keys remain unchanged.",
examples=[
"_flattened",
],
title="Key Suffix",
)
custom: Optional[Union[str, None]] = Field(
None,
description="Custom transformation. Can be used with {{ key }} as a original value for key name. If not provided original keys remain unchanged.",
examples=[
"flattened_{{ key }}",
],
title="Custom Transformation for keys",
)


class DpathFlattenFields(BaseModel):
type: Literal["DpathFlattenFields"]
field_path: List[str] = Field(
Expand All @@ -897,6 +924,11 @@ class DpathFlattenFields(BaseModel):
description="Whether to replace the origin record or not. Default is False.",
title="Replace Origin Record",
)
key_transformation: Optional[Union[KeyTransformation, None]] = Field(
None,
description="Transformation for object keys. If not provided, original key will be used.",
title="Key transformation",
)
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,7 @@
from airbyte_cdk.sources.declarative.transformations.add_fields import AddedFieldDefinition
from airbyte_cdk.sources.declarative.transformations.dpath_flatten_fields import (
DpathFlattenFields,
KeyTransformation,
)
from airbyte_cdk.sources.declarative.transformations.flatten_fields import (
FlattenFields,
Expand Down Expand Up @@ -790,13 +791,23 @@ def create_dpath_flatten_fields(
self, model: DpathFlattenFieldsModel, config: Config, **kwargs: Any
) -> DpathFlattenFields:
model_field_path: List[Union[InterpolatedString, str]] = [x for x in model.field_path]
key_transformation = (
KeyTransformation(
prefix=model.key_transformation.prefix,
suffix=model.key_transformation.suffix,
custom=model.key_transformation.custom,
)
if model.key_transformation is not None
else None
)
return DpathFlattenFields(
config=config,
field_path=model_field_path,
delete_origin_value=model.delete_origin_value
if model.delete_origin_value is not None
else False,
replace_record=model.replace_record if model.replace_record is not None else False,
key_transformation=key_transformation,
parameters=model.parameters or {},
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,13 @@
from airbyte_cdk.sources.types import Config, StreamSlice, StreamState


@dataclass(frozen=True)
class KeyTransformation:
prefix: Union[InterpolatedString, str, None] = None
Comment thread
darynaishchenko marked this conversation as resolved.
Outdated
suffix: Union[InterpolatedString, str, None] = None
custom: Union[InterpolatedString, str, None] = None

Comment thread
darynaishchenko marked this conversation as resolved.

@dataclass
class DpathFlattenFields(RecordTransformation):
"""
Expand All @@ -16,6 +23,7 @@ class DpathFlattenFields(RecordTransformation):
field_path: List[Union[InterpolatedString, str]] path to the field to flatten.
delete_origin_value: bool = False whether to delete origin field or keep it. Default is False.
replace_record: bool = False whether to replace origin record or not. Default is False.
key_transformation: KeyTransformation = None how to transform extracted object keys

"""

Expand All @@ -24,17 +32,43 @@ class DpathFlattenFields(RecordTransformation):
parameters: InitVar[Mapping[str, Any]]
delete_origin_value: bool = False
replace_record: bool = False
key_transformation: Union[KeyTransformation, None] = None
Comment thread
darynaishchenko marked this conversation as resolved.
Outdated

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self._parameters = parameters
self._field_path = [
InterpolatedString.create(path, parameters=parameters) for path in self.field_path
InterpolatedString.create(path, parameters=self._parameters) for path in self.field_path
]
for path_index in range(len(self.field_path)):
if isinstance(self.field_path[path_index], str):
self._field_path[path_index] = InterpolatedString.create(
self.field_path[path_index], parameters=parameters
self.field_path[path_index], parameters=self._parameters
)

def _apply_key_transformation(self, extracted: Mapping[str, Any]) -> Mapping[str, Any]:
if self.key_transformation:
if self.key_transformation.prefix:
prefix = InterpolatedString.create(
self.key_transformation.prefix, parameters=self._parameters
).eval(config=self.config)
extracted = {f"{prefix}{key}": value for key, value in extracted.items()}

if self.key_transformation.suffix:
suffix = InterpolatedString.create(
Comment thread
darynaishchenko marked this conversation as resolved.
Outdated
self.key_transformation.suffix, parameters=self._parameters
).eval(config=self.config)
extracted = {f"{key}{suffix}": value for key, value in extracted.items()}

if self.key_transformation.custom:
updated_extracted = {}
for key, value in extracted.items():
updated_key = InterpolatedString.create(
self.key_transformation.custom, parameters=self._parameters
).eval(key=key, config=self.config)
updated_extracted[updated_key] = value
extracted = updated_extracted
return extracted
Comment thread
darynaishchenko marked this conversation as resolved.

def transform(
self,
record: Dict[str, Any],
Expand All @@ -50,6 +84,8 @@ def transform(
extracted = dpath.get(record, path, default=[])

if isinstance(extracted, dict):
extracted = self._apply_key_transformation(extracted)

if self.replace_record and extracted:
dpath.delete(record, "**")
record.update(extracted)
Expand Down
Loading
Loading