Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4055,6 +4055,11 @@ definitions:
title: Value Type
description: The expected data type of the value. If omitted, the type will be inferred from the value provided.
"$ref": "#/definitions/ValueType"
create_or_update:
Comment thread
lazebnyi marked this conversation as resolved.
title: Create or Update
description: Determines whether to create a new path if it doesn't exist (true) or only update existing paths (false). When set to true, the resolver will create new paths in the stream template if they don't exist. When false (default), it will only update existing paths.
type: boolean
default: false
$parameters:
type: object
additionalProperties: true
Expand Down Expand Up @@ -4106,6 +4111,12 @@ definitions:
- ["data"]
- ["data", "streams"]
- ["data", "{{ parameters.name }}"]
default_values:
title: Default Values
description: A list of default values, each matching the structure expected from the parsed component value.
type: array
items:
type: object
$parameters:
type: object
additionalProperties: true
Expand All @@ -4117,7 +4128,11 @@ definitions:
type: string
enum: [ConfigComponentsResolver]
stream_config:
"$ref": "#/definitions/StreamConfig"
anyOf:
- type: array
items:
"$ref": "#/definitions/StreamConfig"
- "$ref": "#/definitions/StreamConfig"
components_mapping:
type: array
items:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,9 +300,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
}
)

stream_configs = self._stream_configs(self._source_config) + self._dynamic_stream_configs(
self._source_config, config
)
stream_configs = self._stream_configs(self._source_config) + self.dynamic_streams

api_budget_model = self._source_config.get("api_budget")
if api_budget_model:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1478,6 +1478,11 @@ class ComponentMappingDefinition(BaseModel):
description="The expected data type of the value. If omitted, the type will be inferred from the value provided.",
title="Value Type",
)
create_or_update: Optional[bool] = Field(
False,
description="Determines whether to create a new path if it doesn't exist (true) or only update existing paths (false). When set to true, the resolver will create new paths in the stream template if they don't exist. When false (default), it will only update existing paths.",
title="Create or Update",
)
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


Expand All @@ -1489,12 +1494,17 @@ class StreamConfig(BaseModel):
examples=[["data"], ["data", "streams"], ["data", "{{ parameters.name }}"]],
title="Configs Pointer",
)
default_values: Optional[List[Dict[str, Any]]] = Field(
None,
description="A list of default values, each matching the structure expected from the parsed component value.",
title="Default Values",
)
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class ConfigComponentsResolver(BaseModel):
type: Literal["ConfigComponentsResolver"]
stream_config: StreamConfig
stream_config: Union[List[StreamConfig], StreamConfig]
components_mapping: List[ComponentMappingDefinition]
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3754,6 +3754,7 @@ def create_components_mapping_definition(
field_path=field_path, # type: ignore[arg-type] # field_path can be str and InterpolatedString
value=interpolated_value,
value_type=ModelToComponentFactory._json_schema_type_name_to_type(model.value_type),
create_or_update=model.create_or_update,
parameters=model.parameters or {},
)

Expand Down Expand Up @@ -3800,16 +3801,24 @@ def create_stream_config(

return StreamConfig(
configs_pointer=model_configs_pointer,
default_values=model.default_values,
parameters=model.parameters or {},
)

def create_config_components_resolver(
self, model: ConfigComponentsResolverModel, config: Config
) -> Any:
stream_config = self._create_component_from_model(
model.stream_config, config=config, parameters=model.parameters or {}
model_stream_configs = (
model.stream_config if isinstance(model.stream_config, list) else [model.stream_config]
)

stream_configs = [
self._create_component_from_model(
stream_config, config=config, parameters=model.parameters or {}
)
for stream_config in model_stream_configs
]

components_mapping = [
self._create_component_from_model(
model=components_mapping_definition_model,
Expand All @@ -3822,7 +3831,7 @@ def create_config_components_resolver(
]

return ConfigComponentsResolver(
stream_config=stream_config,
stream_configs=stream_configs,
config=config,
components_mapping=components_mapping,
parameters=model.parameters or {},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ class ComponentMappingDefinition:
value: Union["InterpolatedString", str]
value_type: Optional[Type[Any]]
parameters: InitVar[Mapping[str, Any]]
create_or_update: Optional[bool] = False


@dataclass(frozen=True)
Expand All @@ -34,6 +35,7 @@ class ResolvedComponentMappingDefinition:
value: "InterpolatedString"
value_type: Optional[Type[Any]]
parameters: InitVar[Mapping[str, Any]]
create_or_update: Optional[bool] = False


@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@

from copy import deepcopy
from dataclasses import InitVar, dataclass, field
from typing import Any, Dict, Iterable, List, Mapping, Union
from itertools import product
from typing import Any, Dict, Iterable, List, Mapping, Optional, Tuple, Union

import dpath
import yaml
from typing_extensions import deprecated
from yaml.parser import ParserError

from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
from airbyte_cdk.sources.declarative.resolvers.components_resolver import (
Expand All @@ -28,6 +31,7 @@ class StreamConfig:

configs_pointer: List[Union[InterpolatedString, str]]
parameters: InitVar[Mapping[str, Any]]
default_values: Optional[List[Any]] = None

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self.configs_pointer = [
Expand All @@ -48,7 +52,7 @@ class ConfigComponentsResolver(ComponentsResolver):
parameters (InitVar[Mapping[str, Any]]): Additional parameters for interpolation.
"""

stream_config: StreamConfig
stream_configs: List[StreamConfig]
config: Config
components_mapping: List[ComponentMappingDefinition]
parameters: InitVar[Mapping[str, Any]]
Expand Down Expand Up @@ -82,6 +86,7 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
field_path=field_path,
value=interpolated_value,
value_type=component_mapping.value_type,
create_or_update=component_mapping.create_or_update,
parameters=parameters,
)
)
Expand All @@ -90,18 +95,45 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
f"Expected a string or InterpolatedString for value in mapping: {component_mapping}"
)

@staticmethod
def _merge_combination(combo: Iterable[Tuple[int, Any]]) -> Dict[str, Any]:
"""Collapse a combination of ``(idx, elem)`` into one config dict."""
result: Dict[str, Any] = {}
for config_index, (elem_index, elem) in enumerate(combo):
if isinstance(elem, dict):
result.update(elem)
else:
# keep non-dict values under an artificial name
result.setdefault(f"source_config_{config_index}", (elem_index, elem))
return result

@property
def _stream_config(self) -> Iterable[Mapping[str, Any]]:
path = [
node.eval(self.config) if not isinstance(node, str) else node
for node in self.stream_config.configs_pointer
def _stream_config(self) -> List[Dict[str, Any]]:
"""
Build every unique stream-configuration combination defined by
each ``StreamConfig`` and any ``default_values``.
"""
all_indexed_streams = []
for stream_config in self.stream_configs:
path = [
node.eval(self.config) if not isinstance(node, str) else node
for node in stream_config.configs_pointer
]
stream_configs_raw = dpath.get(dict(self.config), path, default=[])
stream_configs = (
list(stream_configs_raw)
if isinstance(stream_configs_raw, list)
else [stream_configs_raw]
)

if stream_config.default_values:
stream_configs.extend(stream_config.default_values)

all_indexed_streams.append([(i, item) for i, item in enumerate(stream_configs)])
return [
self._merge_combination(combo) # type: ignore[arg-type]
for combo in product(*all_indexed_streams)
]
stream_config = dpath.get(dict(self.config), path, default=[])

if not isinstance(stream_config, list):
stream_config = [stream_config]

return stream_config

def resolve_components(
self, stream_template_config: Dict[str, Any]
Expand Down Expand Up @@ -130,7 +162,27 @@ def resolve_components(
)

path = [path.eval(self.config, **kwargs) for path in resolved_component.field_path]
parsed_value = self._parse_yaml_if_possible(value)
updated = dpath.set(updated_config, path, parsed_value)

dpath.set(updated_config, path, value)
if parsed_value and not updated and resolved_component.create_or_update:
dpath.new(updated_config, path, parsed_value)

yield updated_config

@staticmethod
def _parse_yaml_if_possible(value: Any) -> Any:
"""
Try to turn value into a Python object by YAML-parsing it.

* If value is a `str` and can be parsed by `yaml.safe_load`,
return the parsed result.
* If parsing fails (`yaml.parser.ParserError`) – or value is not
a string at all – return the original value unchanged.
"""
if isinstance(value, str):
try:
return yaml.safe_load(value)
except ParserError: # "{{ record[0] in ['cohortActiveUsers'] }}" # not valid YAML
return value
Comment thread
lazebnyi marked this conversation as resolved.
return value
Loading
Loading