|
| 1 | +# |
| 2 | +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. |
| 3 | +# |
| 4 | + |
| 5 | +from dataclasses import dataclass |
| 6 | +from typing import Any, List, Mapping, Union |
| 7 | + |
| 8 | +from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString |
| 9 | +from airbyte_cdk.sources.declarative.transformations.config_transformations.config_transformation import ConfigTransformation |
| 10 | + |
| 11 | + |
| 12 | +@dataclass |
| 13 | +class RemapField(ConfigTransformation): |
| 14 | + """ |
| 15 | + Transformation that remaps a field's value to another value based on a static map. |
| 16 | + """ |
| 17 | + |
| 18 | + map: Mapping[str, Any] |
| 19 | + field_path: List[Union[InterpolatedString, str]] |
| 20 | + |
| 21 | + def __post_init__(self) -> None: |
| 22 | + self._field_path = [ |
| 23 | + InterpolatedString.create(path, parameters={}) for path in self.field_path |
| 24 | + ] |
| 25 | + for path_index in range(len(self.field_path)): |
| 26 | + if isinstance(self.field_path[path_index], str): |
| 27 | + self._field_path[path_index] = InterpolatedString.create( |
| 28 | + self.field_path[path_index], parameters={} |
| 29 | + ) |
| 30 | + |
| 31 | + def transform( |
| 32 | + self, |
| 33 | + config: Mapping[str, Any], |
| 34 | + ) -> None: |
| 35 | + """ |
| 36 | + Transforms a config by remapping a field value based on the provided map. |
| 37 | + If the original value is found in the map, it's replaced with the mapped value. |
| 38 | + If the value is not in the map, the field remains unchanged. |
| 39 | +
|
| 40 | + :param config: The user-provided configuration to be transformed |
| 41 | + """ |
| 42 | + path_components = [path.eval(config) for path in self._field_path] |
| 43 | + |
| 44 | + current = config |
| 45 | + for i, component in enumerate(path_components[:-1]): |
| 46 | + if component not in current: |
| 47 | + return |
| 48 | + current = current[component] |
| 49 | + |
| 50 | + if not isinstance(current, Mapping): |
| 51 | + return |
| 52 | + |
| 53 | + field_name = path_components[-1] |
| 54 | + |
| 55 | + if field_name in current and current[field_name] in self.map: |
| 56 | + current[field_name] = self.map[current[field_name]] |
0 commit comments