Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
6694cd1
implement validators and transformer
pnilan May 6, 2025
4dbd363
create config transformations
pnilan May 6, 2025
03e776c
remove unnecessary validation strategies
pnilan May 7, 2025
41f376b
chore: format code
pnilan May 7, 2025
454cb78
add tests for dpath validator
pnilan May 7, 2025
54f9f9f
add predicate validator tests
pnilan May 7, 2025
7881f9f
add tests for RemapField
pnilan May 8, 2025
f8c252b
create tests for ValidateAdheresToSchema
pnilan May 8, 2025
c492b81
chore: type check
pnilan May 8, 2025
48e5ab0
chore: lint
pnilan May 8, 2025
3623325
add test for json strings
pnilan May 8, 2025
dad6100
fix errant inclusion
pnilan May 8, 2025
565b709
add json string parsing to ValidateAdheresToSchema
pnilan May 8, 2025
01415e2
chore: lint
pnilan May 8, 2025
7149962
Merge branch 'main' into pnilan/feat/implement-validators
pnilan May 8, 2025
953ad41
Merge branch 'main' into pnilan/feat/implement-validators
pnilan May 8, 2025
f72efc0
fix assertions
pnilan May 8, 2025
7927a14
remove re-raise
pnilan May 8, 2025
cf1b01c
update tests and error handling for dpath validator
pnilan May 9, 2025
4727b28
fix predicate validator test
pnilan May 9, 2025
711384c
implement config transformations: AddFields and RemoveFields`
pnilan May 13, 2025
fb7d1e9
fix module and classname conflicts
pnilan May 13, 2025
c64e588
chore: lint
pnilan May 13, 2025
01cf5a6
update remap to handle interpolated keys/values
pnilan May 13, 2025
a2dc105
chore: format
pnilan May 13, 2025
833d9e7
update transformations per comments
pnilan May 14, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#

from .add_fields import ConfigAddFields
from .remap_field import ConfigRemapField
from .remove_fields import ConfigRemoveFields

__all__ = ["ConfigRemapField", "ConfigAddFields", "ConfigRemoveFields"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#

from dataclasses import dataclass, field
from typing import Any, List, MutableMapping, Optional, Type, Union

import dpath

from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.transformations.config_transformations.config_transformation import (
ConfigTransformation,
)
from airbyte_cdk.sources.types import FieldPointer


@dataclass(frozen=True)
class AddedFieldDefinition:
"""Defines the field to add on a config"""

path: FieldPointer
value: Union[InterpolatedString, str]
value_type: Optional[Type[Any]] = None


@dataclass(frozen=True)
class ParsedAddFieldDefinition:
"""Defines the field to add on a config"""

path: FieldPointer
value: InterpolatedString
value_type: Optional[Type[Any]] = None


@dataclass
class ConfigAddFields(ConfigTransformation):
"""
Transformation which adds fields to a config. The path of the added field can be nested. Adding nested fields will create all
necessary parent objects (like mkdir -p).

This transformation has access to the config being transformed.

Examples of instantiating this transformation via YAML:
- type: ConfigAddFields
fields:
# hardcoded constant
- path: ["path"]
value: "static_value"

# nested path
- path: ["path", "to", "field"]
value: "static"

# from config
- path: ["derived_field"]
value: "{{ config.original_field }}"

# by supplying any valid Jinja template directive or expression
- path: ["two_times_two"]
value: "{{ 2 * 2 }}"

Attributes:
fields (List[AddedFieldDefinition]): A list of transformations (path and corresponding value) that will be added to the config
"""

fields: List[AddedFieldDefinition]
condition: str = ""
_parsed_fields: List[ParsedAddFieldDefinition] = field(
init=False, repr=False, default_factory=list
)

def __post_init__(self) -> None:
self._filter_interpolator = InterpolatedBoolean(condition=self.condition, parameters={})

for add_field in self.fields:
if len(add_field.path) < 1:
raise ValueError(
f"Expected a non-zero-length path for the AddFields transformation {add_field}"
)

if not isinstance(add_field.value, InterpolatedString):
if not isinstance(add_field.value, str):
raise ValueError(
f"Expected a string value for the AddFields transformation: {add_field}"
)
else:
self._parsed_fields.append(
ParsedAddFieldDefinition(
add_field.path,
InterpolatedString.create(add_field.value, parameters={}),
value_type=add_field.value_type,
)
)
else:
self._parsed_fields.append(
ParsedAddFieldDefinition(
add_field.path,
add_field.value,
value_type=add_field.value_type,
)
)

def transform(
self,
config: MutableMapping[str, Any],
) -> None:
"""
Transforms a config by adding fields based on the provided field definitions.

:param config: The user-provided configuration to be transformed
"""
for parsed_field in self._parsed_fields:
valid_types = (parsed_field.value_type,) if parsed_field.value_type else None
value = parsed_field.value.eval(config, valid_types=valid_types)
if not self.condition or self._filter_interpolator.eval(
config, value=value, path=parsed_field.path
):
dpath.new(config, parsed_field.path, value)
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#

from abc import ABC, abstractmethod
from typing import Any, MutableMapping


class ConfigTransformation(ABC):
"""
Implementations of this class define transformations that can be applied to source configurations.
"""

@abstractmethod
def transform(
self,
config: MutableMapping[str, Any],
) -> None:
"""
Transform a configuration by adding, deleting, or mutating fields directly from the config reference passed in argument.

:param config: The user-provided configuration to be transformed
"""
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#

from dataclasses import dataclass, field
from typing import Any, List, Mapping, MutableMapping, Union

from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean
from airbyte_cdk.sources.declarative.interpolation.interpolated_mapping import InterpolatedMapping
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.transformations.config_transformations.config_transformation import (
ConfigTransformation,
)


@dataclass
class ConfigRemapField(ConfigTransformation):
"""
Transformation that remaps a field's value to another value based on a static map.
"""

map: Mapping[str, Any]
Comment thread
pnilan marked this conversation as resolved.
field_path: List[Union[InterpolatedString, str]]
config: Mapping[str, Any] = field(default_factory=dict)

def __post_init__(self) -> None:
if not self.field_path:
raise Exception("field_path cannot be empty.")
self._field_path = [
InterpolatedString.create(path, parameters={}) for path in self.field_path
]
for path_index in range(len(self.field_path)):
if isinstance(self.field_path[path_index], str):
self._field_path[path_index] = InterpolatedString.create(
self.field_path[path_index], parameters={}
)
self._map = InterpolatedMapping(self.map, parameters={})

def transform(
self,
config: MutableMapping[str, Any],
) -> None:
"""
Transforms a config by remapping a field value based on the provided map.
If the original value is found in the map, it's replaced with the mapped value.
If the value is not in the map, the field remains unchanged.

:param config: The user-provided configuration to be transformed
"""
path_components = [path.eval(config) for path in self._field_path]

current = config
for i, component in enumerate(path_components[:-1]):
if component not in current:
return
current = current[component]

if not isinstance(current, MutableMapping):
return

field_name = path_components[-1]

mapping = self._map.eval(config=self.config)

if field_name in current and current[field_name] in mapping:
current[field_name] = mapping[current[field_name]]
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#
from dataclasses import dataclass
from typing import Any, List, MutableMapping

import dpath
import dpath.exceptions

from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean
from airbyte_cdk.sources.declarative.transformations.config_transformations.config_transformation import (
ConfigTransformation,
)
from airbyte_cdk.sources.types import FieldPointer


@dataclass
class ConfigRemoveFields(ConfigTransformation):
"""
A transformation which removes fields from a config. The fields removed are designated using FieldPointers.
During transformation, if a field or any of its parents does not exist in the config, no error is thrown.

If an input field pointer references an item in a list (e.g: ["k", 0] in the object {"k": ["a", "b", "c"]}) then
the object at that index is set to None rather than being entirely removed from the list.

It's possible to remove objects nested in lists e.g: removing [".", 0, "k"] from {".": [{"k": "V"}]} results in {".": [{}]}

Usage syntax:

```yaml
config_transformations:
- type: RemoveFields
field_pointers:
- ["path", "to", "field1"]
- ["path2"]
condition: "{{ config.some_flag }}" # Optional condition
```

Attributes:
field_pointers (List[FieldPointer]): pointers to the fields that should be removed
condition (str): Optional condition that determines if the fields should be removed
"""

field_pointers: List[FieldPointer]
condition: str = ""

def __post_init__(self) -> None:
self._filter_interpolator = InterpolatedBoolean(condition=self.condition, parameters={})

def transform(
self,
config: MutableMapping[str, Any],
) -> None:
"""
Transforms a config by removing fields based on the provided field pointers.

:param config: The user-provided configuration to be transformed
"""
if self.condition and not self._filter_interpolator.eval(config):
return

for pointer in self.field_pointers:
try:
dpath.delete(config, pointer)
except dpath.exceptions.PathNotFound:
pass
19 changes: 19 additions & 0 deletions airbyte_cdk/sources/declarative/validators/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#

from airbyte_cdk.sources.declarative.validators.dpath_validator import DpathValidator
from airbyte_cdk.sources.declarative.validators.predicate_validator import PredicateValidator
from airbyte_cdk.sources.declarative.validators.validate_adheres_to_schema import (
ValidateAdheresToSchema,
)
from airbyte_cdk.sources.declarative.validators.validation_strategy import ValidationStrategy
from airbyte_cdk.sources.declarative.validators.validator import Validator

__all__ = [
"Validator",
"DpathValidator",
"ValidationStrategy",
"ValidateAdheresToSchema",
"PredicateValidator",
]
59 changes: 59 additions & 0 deletions airbyte_cdk/sources/declarative/validators/dpath_validator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#

from dataclasses import dataclass
from typing import Any, List, Union

import dpath.util

from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.validators.validation_strategy import ValidationStrategy
from airbyte_cdk.sources.declarative.validators.validator import Validator


@dataclass
class DpathValidator(Validator):
"""
Validator that extracts a value at a specific path in the input data
and applies a validation strategy to it.
"""

field_path: List[Union[InterpolatedString, str]]
strategy: ValidationStrategy

def __post_init__(self) -> None:
self._field_path = [
InterpolatedString.create(path, parameters={}) for path in self.field_path
]
for path_index in range(len(self.field_path)):
if isinstance(self.field_path[path_index], str):
self._field_path[path_index] = InterpolatedString.create(
self.field_path[path_index], parameters={}
)

def validate(self, input_data: dict[str, Any]) -> None:
"""
Extracts the value at the specified path and applies the validation strategy.

:param input_data: Dictionary containing the data to validate
:raises ValueError: If the path doesn't exist or validation fails
"""
path = [path.eval({}) for path in self._field_path]

if len(path) == 0:
raise ValueError("Field path is empty")

if "*" in path:
try:
values = dpath.values(input_data, path)
for value in values:
self.strategy.validate(value)
except KeyError as e:
raise ValueError(f"Error validating path '{self.field_path}': {e}")
else:
try:
value = dpath.get(input_data, path)
self.strategy.validate(value)
except KeyError as e:
raise ValueError(f"Error validating path '{self.field_path}': {e}")
26 changes: 26 additions & 0 deletions airbyte_cdk/sources/declarative/validators/predicate_validator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#

from dataclasses import dataclass
from typing import Any

from airbyte_cdk.sources.declarative.validators.validation_strategy import ValidationStrategy


@dataclass
class PredicateValidator:
"""
Validator that applies a validation strategy to a value.
"""

value: Any
strategy: ValidationStrategy

def validate(self) -> None:
"""
Applies the validation strategy to the value.

:raises ValueError: If validation fails
"""
self.strategy.validate(self.value)
Loading
Loading