Skip to content
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
6694cd1
implement validators and transformer
pnilan May 6, 2025
4dbd363
create config transformations
pnilan May 6, 2025
03e776c
remove unnecessary validation strategies
pnilan May 7, 2025
41f376b
chore: format code
pnilan May 7, 2025
454cb78
add tests for dpath validator
pnilan May 7, 2025
54f9f9f
add predicate validator tests
pnilan May 7, 2025
7881f9f
add tests for RemapField
pnilan May 8, 2025
f8c252b
create tests for ValidateAdheresToSchema
pnilan May 8, 2025
c492b81
chore: type check
pnilan May 8, 2025
48e5ab0
chore: lint
pnilan May 8, 2025
3623325
add test for json strings
pnilan May 8, 2025
dad6100
fix errant inclusion
pnilan May 8, 2025
565b709
add json string parsing to ValidateAdheresToSchema
pnilan May 8, 2025
01415e2
chore: lint
pnilan May 8, 2025
7149962
Merge branch 'main' into pnilan/feat/implement-validators
pnilan May 8, 2025
953ad41
Merge branch 'main' into pnilan/feat/implement-validators
pnilan May 8, 2025
f72efc0
fix assertions
pnilan May 8, 2025
7927a14
remove re-raise
pnilan May 8, 2025
cf1b01c
update tests and error handling for dpath validator
pnilan May 9, 2025
4727b28
fix predicate validator test
pnilan May 9, 2025
711384c
implement config transformations: AddFields and RemoveFields`
pnilan May 13, 2025
fb7d1e9
fix module and classname conflicts
pnilan May 13, 2025
c64e588
chore: lint
pnilan May 13, 2025
01cf5a6
update remap to handle interpolated keys/values
pnilan May 13, 2025
a2dc105
chore: format
pnilan May 13, 2025
833d9e7
update transformations per comments
pnilan May 14, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#

from .remap_field import RemapField

__all__ = ["RemapField"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#

from abc import ABC, abstractmethod
from typing import Any, Dict


class ConfigTransformation(ABC):
"""
Implementations of this class define transformations that can be applied to source configurations.
"""

@abstractmethod
def transform(
self,
config: Dict[str, Any],
Comment thread
pnilan marked this conversation as resolved.
Outdated
) -> None:
"""
Transform a configuration by adding, deleting, or mutating fields directly from the config reference passed in argument.

:param config: The user-provided configuration to be transformed
"""
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#

from dataclasses import dataclass
from typing import Any, List, Mapping, MutableMapping, Union

from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.transformations.config_transformations.config_transformation import (
ConfigTransformation,
)


@dataclass
class RemapField(ConfigTransformation):
"""
Transformation that remaps a field's value to another value based on a static map.
"""

map: Mapping[str, Any]
Comment thread
pnilan marked this conversation as resolved.
field_path: List[Union[InterpolatedString, str]]

def __post_init__(self) -> None:
if not self.field_path:
raise Exception("field_path cannot be empty.")
self._field_path = [
InterpolatedString.create(path, parameters={}) for path in self.field_path
]
for path_index in range(len(self.field_path)):
if isinstance(self.field_path[path_index], str):
self._field_path[path_index] = InterpolatedString.create(
self.field_path[path_index], parameters={}
)

def transform(
self,
config: MutableMapping[str, Any],
) -> None:
"""
Transforms a config by remapping a field value based on the provided map.
If the original value is found in the map, it's replaced with the mapped value.
If the value is not in the map, the field remains unchanged.

:param config: The user-provided configuration to be transformed
"""
path_components = [path.eval(config) for path in self._field_path]

current = config
for i, component in enumerate(path_components[:-1]):
if component not in current:
return
current = current[component]

if not isinstance(current, Mapping):
return

field_name = path_components[-1]

if field_name in current and current[field_name] in self.map:
current[field_name] = self.map[current[field_name]]
19 changes: 19 additions & 0 deletions airbyte_cdk/sources/declarative/validators/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#

from airbyte_cdk.sources.declarative.validators.dpath_validator import DpathValidator
from airbyte_cdk.sources.declarative.validators.predicate_validator import PredicateValidator
from airbyte_cdk.sources.declarative.validators.validate_adheres_to_schema import (
ValidateAdheresToSchema,
)
from airbyte_cdk.sources.declarative.validators.validation_strategy import ValidationStrategy
from airbyte_cdk.sources.declarative.validators.validator import Validator

__all__ = [
"Validator",
"DpathValidator",
"ValidationStrategy",
"ValidateAdheresToSchema",
"PredicateValidator",
]
65 changes: 65 additions & 0 deletions airbyte_cdk/sources/declarative/validators/dpath_validator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#

from dataclasses import dataclass
from typing import Any, List, Union

import dpath.util

from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.validators.validation_strategy import ValidationStrategy
from airbyte_cdk.sources.declarative.validators.validator import Validator


@dataclass
class DpathValidator(Validator):
"""
Validator that extracts a value at a specific path in the input data
and applies a validation strategy to it.
"""

field_path: List[Union[InterpolatedString, str]]
strategy: ValidationStrategy

def __post_init__(self) -> None:
self._field_path = [
InterpolatedString.create(path, parameters={}) for path in self.field_path
]
for path_index in range(len(self.field_path)):
if isinstance(self.field_path[path_index], str):
self._field_path[path_index] = InterpolatedString.create(
self.field_path[path_index], parameters={}
)

def validate(self, input_data: dict[str, Any]) -> None:
"""
Extracts the value at the specified path and applies the validation strategy.

:param input_data: Dictionary containing the data to validate
:raises ValueError: If the path doesn't exist or validation fails
"""
path = [path.eval({}) for path in self._field_path]

if len(path) == 0:
raise ValueError("Field path is empty")

if "*" in path:
try:
values = dpath.values(input_data, path)
except KeyError as e:
raise KeyError(f"Error validating path '{self.field_path}': {e}")
for value in values:
try:
self.strategy.validate(value)
except Exception as e:
raise ValueError(f"Error validating value '{value}': {e}")
Comment thread
pnilan marked this conversation as resolved.
Outdated
else:
try:
value = dpath.get(input_data, path)
except KeyError as e:
raise KeyError(f"Error validating path '{self.field_path}': {e}")
try:
self.strategy.validate(value)
except Exception as e:
raise ValueError(f"Error validating value '{value}': {e}")
26 changes: 26 additions & 0 deletions airbyte_cdk/sources/declarative/validators/predicate_validator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#

from dataclasses import dataclass
from typing import Any

from airbyte_cdk.sources.declarative.validators.validation_strategy import ValidationStrategy


@dataclass
class PredicateValidator:
"""
Validator that applies a validation strategy to a value.
"""

value: Any
strategy: ValidationStrategy

def validate(self) -> None:
"""
Applies the validation strategy to the value.

:raises ValueError: If validation fails
"""
self.strategy.validate(self.value)
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#

from dataclasses import dataclass
import json
from typing import Any, Mapping

import jsonschema

from airbyte_cdk.sources.declarative.validators.validation_strategy import ValidationStrategy
Comment thread
pnilan marked this conversation as resolved.
Outdated


@dataclass
class ValidateAdheresToSchema(ValidationStrategy):
"""
Validates that a value adheres to a specified JSON schema.
"""

schema: Mapping[str, Any]

def validate(self, value: Any) -> None:
"""
Validates the value against the JSON schema.

:param value: The value to validate
:raises ValueError: If the value does not adhere to the schema
"""

if isinstance(value, str):
try:
value = json.loads(value)
except json.JSONDecodeError as e:
raise ValueError(f"Invalid JSON string: {value}") from e

try:
jsonschema.validate(instance=value, schema=self.schema)
except jsonschema.ValidationError as e:
raise ValueError(f"JSON schema validation error: {e.message}")
22 changes: 22 additions & 0 deletions airbyte_cdk/sources/declarative/validators/validation_strategy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#

from abc import ABC, abstractmethod
from typing import Any


class ValidationStrategy(ABC):
"""
Base class for validation strategies.
"""

@abstractmethod
def validate(self, value: Any) -> None:
"""
Validates a value according to a specific strategy.

:param value: The value to validate
:raises ValueError: If validation fails
"""
pass
18 changes: 18 additions & 0 deletions airbyte_cdk/sources/declarative/validators/validator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#

from abc import ABC, abstractmethod
from typing import Any


class Validator(ABC):
@abstractmethod
def validate(self, input_data: Any) -> None:
"""
Validates the input data.

:param input_data: The data to validate
:raises ValueError: If validation fails
"""
pass
Loading
Loading