Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1421,12 +1421,22 @@ definitions:
default: ""
schema_loader:
title: Schema Loader
description: Component used to retrieve the schema for the current stream.
description:
One or many schema loaders can be used to retrieve the schema for the current stream. When
multiple schema loaders are defined, schema properties will be merged together. Schema
loaders defined first take precedence in the event of a conflict.
anyOf:
- "$ref": "#/definitions/InlineSchemaLoader"
- "$ref": "#/definitions/DynamicSchemaLoader"
- "$ref": "#/definitions/JsonFileSchemaLoader"
- "$ref": "#/definitions/CustomSchemaLoader"
- type: array
items:
anyOf:
- "$ref": "#/definitions/InlineSchemaLoader"
- "$ref": "#/definitions/DynamicSchemaLoader"
- "$ref": "#/definitions/JsonFileSchemaLoader"
- "$ref": "#/definitions/CustomSchemaLoader"
# TODO we have moved the transformation to the RecordSelector level in the code but kept this here for
# compatibility reasons. We should eventually move this to align with the code.
transformations:
Expand Down Expand Up @@ -4362,4 +4372,4 @@ interpolation:
regex: The regular expression to search for. It must include a capture group.
return_type: str
examples:
- '{{ "goodbye, cruel world" | regex_search("goodbye,\s(.*)$") }} -> "cruel world"'
- '{{ "goodbye, cruel world" | regex_search("goodbye,\s(.*)$") }} -> "cruel world"'
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.

# generated by datamodel-codegen:
# filename: declarative_component_schema.yaml

Expand Down Expand Up @@ -2151,6 +2153,14 @@ class Config:
DynamicSchemaLoader,
JsonFileSchemaLoader,
CustomSchemaLoader,
List[
Union[
InlineSchemaLoader,
DynamicSchemaLoader,
JsonFileSchemaLoader,
CustomSchemaLoader,
]
],
]
] = Field(
None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,7 @@
SchemaTypeIdentifier,
TypesMap,
)
from airbyte_cdk.sources.declarative.schema.composite_schema_loader import CompositeSchemaLoader
from airbyte_cdk.sources.declarative.spec import Spec
from airbyte_cdk.sources.declarative.stream_slicers import StreamSlicer
from airbyte_cdk.sources.declarative.transformations import (
Expand Down Expand Up @@ -1914,9 +1915,25 @@ def create_declarative_stream(
else:
state_transformations = []

if model.schema_loader:
schema_loader: Union[
CompositeSchemaLoader,
DefaultSchemaLoader,
DynamicSchemaLoader,
InlineSchemaLoader,
JsonFileSchemaLoader,
]
if model.schema_loader and isinstance(model.schema_loader, list):
nested_schema_loaders = [
self._create_component_from_model(model=nested_schema_loader, config=config)
for nested_schema_loader in model.schema_loader
]
schema_loader = CompositeSchemaLoader(
schema_loaders=nested_schema_loaders, parameters={}
)
elif model.schema_loader:
schema_loader = self._create_component_from_model(
model=model.schema_loader, config=config
model=model.schema_loader, # type: ignore # If defined, schema_loader is guaranteed not to be a list and will be one of the existing base models
config=config,
)
else:
options = model.parameters or {}
Expand Down
31 changes: 31 additions & 0 deletions airbyte_cdk/sources/declarative/schema/composite_schema_loader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#

from dataclasses import InitVar, dataclass
from typing import Any, Dict, List, Mapping

from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader


@dataclass
class CompositeSchemaLoader(SchemaLoader):
    """
    Schema loader that merges the schemas of several schema loaders into a single schema.

    Only the top-level "properties" of each nested schema are merged. When the same property
    is defined by more than one loader, the definition coming from the loader listed first is
    kept, so schema loaders with a higher priority should be defined first.

    Attributes:
        schema_loaders: Nested loaders to combine, ordered from highest to lowest priority.
        parameters: Init-only value required by the constructor; it is not read in this class.
    """

    schema_loaders: List[SchemaLoader]
    parameters: InitVar[Mapping[str, Any]]

    def get_json_schema(self) -> Mapping[str, Any]:
        """
        Build the combined JSON schema from every nested schema loader.

        Returns:
            A draft-07 object schema whose "properties" is the union of the properties
            returned by each nested loader. With no nested loaders, or when none of them
            define properties, "properties" is an empty mapping.
        """
        combined_properties: Dict[str, Any] = {}
        for schema_loader in self.schema_loaders:
            # A nested loader may return a schema without "properties" (e.g. an empty `{}`
            # schema); treat that as contributing nothing instead of raising KeyError.
            schema_properties = schema_loader.get_json_schema().get("properties") or {}
            # Already-collected entries are unpacked last so that loaders defined earlier
            # keep precedence over later ones on conflicting property names.
            combined_properties = {**schema_properties, **combined_properties}
        return {
            "$schema": "http://json-schema.org/draft-07/schema#",
            "type": ["null", "object"],
            "properties": combined_properties,
        }
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,8 @@
SimpleRetriever,
SimpleRetrieverTestReadDecorator,
)
from airbyte_cdk.sources.declarative.schema import JsonFileSchemaLoader
from airbyte_cdk.sources.declarative.schema import InlineSchemaLoader, JsonFileSchemaLoader
from airbyte_cdk.sources.declarative.schema.composite_schema_loader import CompositeSchemaLoader
from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader
from airbyte_cdk.sources.declarative.spec import Spec
from airbyte_cdk.sources.declarative.transformations import AddFields, RemoveFields
Expand Down Expand Up @@ -4561,3 +4562,71 @@ def test_create_property_chunking_invalid_property_limit_type():
component_definition=property_chunking_model,
config={},
)


def test_create_stream_with_multiple_schema_loaders():
    """
    Check that a stream whose `schema_loader` is declared as a list is built with a
    CompositeSchemaLoader that holds one nested loader per list entry.
    """
    # NOTE(review): the nesting indentation of this YAML manifest appears to be lost in this
    # view of the file; confirm the structure against the repository before relying on it.
    content = """
retriever:
requester:
type: "HttpRequester"
path: "example"
record_selector:
extractor:
field_path: []
stream_A:
type: DeclarativeStream
name: "A"
primary_key: "id"
schema_loader:
- type: InlineSchemaLoader
schema:
"#/schemas/first_schema"
- type: InlineSchemaLoader
schema:
"#/schemas/second_schema"
$parameters:
retriever: "#/retriever"
url_base: "https://airbyte.io"
schemas:
first_schema:
$schema: "http://json-schema.org/draft-07/schema"
type:
- "null"
- object
additionalProperties: true
properties:
id:
description: The user ID
type:
- "null"
- string
second_schema:
$schema: "http://json-schema.org/draft-07/schema"
type:
- "null"
- object
additionalProperties: true
properties:
name:
description: The user name
type:
- "null"
- string
"""
    parsed_manifest = YamlDeclarativeSource._parse(content)
    # Presumably resolves the "#/..." references used in the manifest — TODO confirm
    # against the resolver implementation.
    resolved_manifest = resolver.preprocess_manifest(parsed_manifest)
    # NOTE(review): despite its name, this variable holds the processed `stream_A`
    # definition, not a partition router.
    partition_router_manifest = transformer.propagate_types_and_parameters(
        "", resolved_manifest["stream_A"], {}
    )

    declarative_stream = factory.create_component(
        model_type=DeclarativeStreamModel,
        component_definition=partition_router_manifest,
        config=input_config,
    )

    # The two list entries should be wrapped in a single composite loader, each entry
    # becoming its own InlineSchemaLoader.
    schema_loader = declarative_stream.schema_loader
    assert isinstance(schema_loader, CompositeSchemaLoader)
    assert len(schema_loader.schema_loaders) == 2
    assert isinstance(schema_loader.schema_loaders[0], InlineSchemaLoader)
    assert isinstance(schema_loader.schema_loaders[1], InlineSchemaLoader)
Loading
Loading