Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2318,6 +2318,12 @@ definitions:
- "$ref": "#/definitions/AsyncRetriever"
- "$ref": "#/definitions/CustomRetriever"
- "$ref": "#/definitions/SimpleRetriever"
schema_filter:
title: Schema Filter
description: Responsible for filtering fields to be added to json schema.
anyOf:
- "$ref": "#/definitions/RecordFilter"
- "$ref": "#/definitions/CustomRecordFilter"
schema_transformations:
title: Schema Transformations
description: A list of transformations to be applied to the schema.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2164,7 +2164,7 @@ class Config:
]
] = Field(
None,
description="Component used to retrieve the schema for the current stream.",
description="One or many schema loaders can be used to retrieve the schema for the current stream. When multiple schema loaders are defined, schema properties will be merged together. Schema loaders defined first taking precedence in the event of a conflict.",
title="Schema Loader",
)
transformations: Optional[
Expand Down Expand Up @@ -2414,6 +2414,9 @@ class DynamicSchemaLoader(BaseModel):
description="Component used to coordinate how records are extracted across stream slices and request pages.",
title="Retriever",
)
schema_filter: Optional[Union[RecordFilter, CustomRecordFilter]] = Field(
None, description="placeholder", title="Schema Filter"
)
schema_transformations: Optional[
List[
Union[
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2432,10 +2432,14 @@ def create_dynamic_schema_loader(
schema_type_identifier = self._create_component_from_model(
model.schema_type_identifier, config=config, parameters=model.parameters or {}
)
schema_filter = self._create_component_from_model(
model.schema_filter, config=config, parameters=model.parameters or {}
)
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated
return DynamicSchemaLoader(
retriever=retriever,
config=config,
schema_transformations=schema_transformations,
schema_filter=schema_filter,
schema_type_identifier=schema_type_identifier,
parameters=model.parameters or {},
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ class ComponentMappingDefinition:
value: Union["InterpolatedString", str]
value_type: Optional[Type[Any]]
parameters: InitVar[Mapping[str, Any]]
create_or_update: Optional[bool] = False


@dataclass(frozen=True)
Expand All @@ -34,6 +35,7 @@ class ResolvedComponentMappingDefinition:
value: "InterpolatedString"
value_type: Optional[Type[Any]]
parameters: InitVar[Mapping[str, Any]]
create_or_update: Optional[bool] = False


@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
Expand Down
23 changes: 19 additions & 4 deletions airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import dpath
from typing_extensions import deprecated

from airbyte_cdk.sources.declarative.extractors.record_filter import RecordFilter
from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
Expand Down Expand Up @@ -126,6 +127,7 @@ class DynamicSchemaLoader(SchemaLoader):
parameters: InitVar[Mapping[str, Any]]
schema_type_identifier: SchemaTypeIdentifier
schema_transformations: List[RecordTransformation] = field(default_factory=lambda: [])
schema_filter: Optional[RecordFilter] = None

def get_json_schema(self) -> Mapping[str, Any]:
"""
Expand All @@ -151,20 +153,18 @@ def get_json_schema(self) -> Mapping[str, Any]:
)
properties[key] = value

transformed_properties = self._transform(properties, {})
filtered_transformed_properties = self._transform(self._filter(properties))

return {
"$schema": "https://json-schema.org/draft-07/schema#",
"type": "object",
"additionalProperties": True,
"properties": transformed_properties,
"properties": filtered_transformed_properties,
}

def _transform(
self,
properties: Mapping[str, Any],
stream_state: StreamState,
stream_slice: Optional[StreamSlice] = None,
) -> Mapping[str, Any]:
for transformation in self.schema_transformations:
transformation.transform(
Expand All @@ -173,6 +173,21 @@ def _transform(
)
return properties

def _filter(
self,
properties: Mapping[str, Any],
) -> Mapping[str, Any]:
if not self.schema_filter:
return properties

filtered_properties = {}
for item in self.schema_filter.filter_records(
({k: v} for k, v in properties.items()),
{},
):
filtered_properties.update(item)
return filtered_properties
Comment thread
coderabbitai[bot] marked this conversation as resolved.

def _get_key(
self,
raw_schema: MutableMapping[str, Any],
Expand Down
Loading