From 3281cdb6b8d1a95f84ceed9fba3ff20521f4bd00 Mon Sep 17 00:00:00 2001 From: "maxime.c" Date: Fri, 24 Oct 2025 10:18:55 -0400 Subject: [PATCH 1/2] move schema loader decorator to model_to_component_factory --- .../parsers/model_to_component_factory.py | 12 ++++-------- .../schema/caching_schema_loader_decorator.py | 15 +++++++++++++++ .../declarative_partition_generator.py | 14 +------------- .../test_caching_schema_loader_decorator.py | 17 +++++++++++++++++ 4 files changed, 37 insertions(+), 21 deletions(-) create mode 100644 airbyte_cdk/sources/declarative/schema/caching_schema_loader_decorator.py create mode 100644 unit_tests/sources/declarative/schema/test_caching_schema_loader_decorator.py diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 4809f5151..cd6d6ad34 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -552,8 +552,9 @@ InlineSchemaLoader, JsonFileSchemaLoader, SchemaTypeIdentifier, - TypesMap, + TypesMap, SchemaLoader, ) +from airbyte_cdk.sources.declarative.schema.caching_schema_loader_decorator import CachingSchemaLoaderDecorator from airbyte_cdk.sources.declarative.schema.composite_schema_loader import CompositeSchemaLoader from airbyte_cdk.sources.declarative.spec import ConfigMigration, Spec from airbyte_cdk.sources.declarative.stream_slicers import ( @@ -2095,13 +2096,7 @@ def create_default_stream( if isinstance(retriever, AsyncRetriever): stream_slicer = retriever.stream_slicer - schema_loader: Union[ - CompositeSchemaLoader, - DefaultSchemaLoader, - DynamicSchemaLoader, - InlineSchemaLoader, - JsonFileSchemaLoader, - ] + schema_loader: SchemaLoader if model.schema_loader and isinstance(model.schema_loader, list): nested_schema_loaders = [ self._create_component_from_model(model=nested_schema_loader, config=config) @@ -2120,6 +2115,7 @@ def create_default_stream( if "name" not in options: options["name"] = model.name schema_loader = DefaultSchemaLoader(config=config, parameters=options) + schema_loader = CachingSchemaLoaderDecorator(schema_loader) stream_name = model.name or "" return DefaultStream( diff --git a/airbyte_cdk/sources/declarative/schema/caching_schema_loader_decorator.py b/airbyte_cdk/sources/declarative/schema/caching_schema_loader_decorator.py new file mode 100644 index 000000000..3e8cda400 --- /dev/null +++ b/airbyte_cdk/sources/declarative/schema/caching_schema_loader_decorator.py @@ -0,0 +1,15 @@ +from typing import Any, Mapping, Optional + +from airbyte_cdk.sources.declarative.schema import SchemaLoader + + +class CachingSchemaLoaderDecorator(SchemaLoader): + def __init__(self, schema_loader: SchemaLoader): + self._decorated = schema_loader + self._loaded_schema: Optional[Mapping[str, Any]] = None + + def get_json_schema(self) -> Mapping[str, Any]: + if self._loaded_schema is None: + self._loaded_schema = self._decorated.get_json_schema() + + return self._loaded_schema # type: ignore # at that point, we assume the schema will be populated diff --git a/airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py b/airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py index 47c32d1cc..593b616ee 100644 --- a/airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py +++ b/airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py @@ -31,18 +31,6 @@ def get_total_records(self) -> int: return self.total_record_counter -class SchemaLoaderCachingDecorator(SchemaLoader): - def __init__(self, schema_loader: SchemaLoader): - self._decorated = schema_loader - self._loaded_schema: Optional[Mapping[str, Any]] = None - - def get_json_schema(self) -> Mapping[str, Any]: - if self._loaded_schema is None: - self._loaded_schema = self._decorated.get_json_schema() - - return self._loaded_schema # type: ignore # at that point, we assume the schema will be populated - - class DeclarativePartitionFactory: def __init__( self, @@ -58,7 +46,7 @@ def __init__( In order to avoid these problems, we will create one retriever per thread which should make the processing thread-safe. """ self._stream_name = stream_name - self._schema_loader = SchemaLoaderCachingDecorator(schema_loader) + self._schema_loader = schema_loader self._retriever = retriever self._message_repository = message_repository self._max_records_limit = max_records_limit diff --git a/unit_tests/sources/declarative/schema/test_caching_schema_loader_decorator.py b/unit_tests/sources/declarative/schema/test_caching_schema_loader_decorator.py new file mode 100644 index 000000000..75303091a --- /dev/null +++ b/unit_tests/sources/declarative/schema/test_caching_schema_loader_decorator.py @@ -0,0 +1,17 @@ +from unittest import TestCase +from unittest.mock import Mock + +from airbyte_cdk.sources.declarative.schema import SchemaLoader +from airbyte_cdk.sources.declarative.schema.caching_schema_loader_decorator import CachingSchemaLoaderDecorator + + +class CachingSchemaLoaderDecoratorTest(TestCase): + def test_given_previous_calls_when_get_json_schema_then_return_cached_schema(self): + decorated = Mock(spec=SchemaLoader) + schema_loader = CachingSchemaLoaderDecorator(decorated) + + schema_loader.get_json_schema() + schema_loader.get_json_schema() + schema_loader.get_json_schema() + + assert decorated.get_json_schema.call_count == 1 From d4fb78432955eb08109d421a1eb2c558dc698641 Mon Sep 17 00:00:00 2001 From: "maxime.c" Date: Fri, 24 Oct 2025 10:25:19 -0400 Subject: [PATCH 2/2] fix test + format --- .../declarative/parsers/model_to_component_factory.py | 7 +++++-- .../parsers/test_model_to_component_factory.py | 9 ++++----- .../schema/test_caching_schema_loader_decorator.py | 4 +++- 3 files changed, 12 insertions(+), 8 deletions(-) diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index cd6d6ad34..fdaf26bba 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -551,10 +551,13 @@ DynamicSchemaLoader, InlineSchemaLoader, JsonFileSchemaLoader, + SchemaLoader, SchemaTypeIdentifier, - TypesMap, SchemaLoader, + TypesMap, +) +from airbyte_cdk.sources.declarative.schema.caching_schema_loader_decorator import ( + CachingSchemaLoaderDecorator, ) -from airbyte_cdk.sources.declarative.schema.caching_schema_loader_decorator import CachingSchemaLoaderDecorator from airbyte_cdk.sources.declarative.schema.composite_schema_loader import CompositeSchemaLoader from airbyte_cdk.sources.declarative.spec import ConfigMigration, Spec from airbyte_cdk.sources.declarative.stream_slicers import ( diff --git a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py index 4463358c0..591d47ae6 100644 --- a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py +++ b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py @@ -157,13 +157,12 @@ from airbyte_cdk.sources.declarative.requesters.requester import HttpMethod from airbyte_cdk.sources.declarative.retrievers import AsyncRetriever, SimpleRetriever from airbyte_cdk.sources.declarative.schema import InlineSchemaLoader, JsonFileSchemaLoader +from airbyte_cdk.sources.declarative.schema.caching_schema_loader_decorator import ( + CachingSchemaLoaderDecorator, +) from airbyte_cdk.sources.declarative.schema.composite_schema_loader import CompositeSchemaLoader from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader from airbyte_cdk.sources.declarative.spec import Spec -from airbyte_cdk.sources.declarative.stream_slicers import StreamSlicerTestReadDecorator -from airbyte_cdk.sources.declarative.stream_slicers.declarative_partition_generator import ( - SchemaLoaderCachingDecorator, -) from airbyte_cdk.sources.declarative.transformations import AddFields, RemoveFields from airbyte_cdk.sources.declarative.transformations.add_fields import AddedFieldDefinition from airbyte_cdk.sources.declarative.transformations.keys_replace_transformation import ( @@ -5107,7 +5106,7 @@ def test_create_stream_with_multiple_schema_loaders(): def get_schema_loader(stream: DefaultStream): assert isinstance( stream._stream_partition_generator._partition_factory._schema_loader, - SchemaLoaderCachingDecorator, + CachingSchemaLoaderDecorator, ) return stream._stream_partition_generator._partition_factory._schema_loader._decorated diff --git a/unit_tests/sources/declarative/schema/test_caching_schema_loader_decorator.py b/unit_tests/sources/declarative/schema/test_caching_schema_loader_decorator.py index 75303091a..9bc23f081 100644 --- a/unit_tests/sources/declarative/schema/test_caching_schema_loader_decorator.py +++ b/unit_tests/sources/declarative/schema/test_caching_schema_loader_decorator.py @@ -2,7 +2,9 @@ from unittest.mock import Mock from airbyte_cdk.sources.declarative.schema import SchemaLoader -from airbyte_cdk.sources.declarative.schema.caching_schema_loader_decorator import CachingSchemaLoaderDecorator +from airbyte_cdk.sources.declarative.schema.caching_schema_loader_decorator import ( + CachingSchemaLoaderDecorator, +) class CachingSchemaLoaderDecoratorTest(TestCase):