diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 4809f5151..fdaf26bba 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -551,9 +551,13 @@ DynamicSchemaLoader, InlineSchemaLoader, JsonFileSchemaLoader, + SchemaLoader, SchemaTypeIdentifier, TypesMap, ) +from airbyte_cdk.sources.declarative.schema.caching_schema_loader_decorator import ( + CachingSchemaLoaderDecorator, +) from airbyte_cdk.sources.declarative.schema.composite_schema_loader import CompositeSchemaLoader from airbyte_cdk.sources.declarative.spec import ConfigMigration, Spec from airbyte_cdk.sources.declarative.stream_slicers import ( @@ -2095,13 +2099,7 @@ def create_default_stream( if isinstance(retriever, AsyncRetriever): stream_slicer = retriever.stream_slicer - schema_loader: Union[ - CompositeSchemaLoader, - DefaultSchemaLoader, - DynamicSchemaLoader, - InlineSchemaLoader, - JsonFileSchemaLoader, - ] + schema_loader: SchemaLoader if model.schema_loader and isinstance(model.schema_loader, list): nested_schema_loaders = [ self._create_component_from_model(model=nested_schema_loader, config=config) @@ -2120,6 +2118,7 @@ def create_default_stream( if "name" not in options: options["name"] = model.name schema_loader = DefaultSchemaLoader(config=config, parameters=options) + schema_loader = CachingSchemaLoaderDecorator(schema_loader) stream_name = model.name or "" return DefaultStream( diff --git a/airbyte_cdk/sources/declarative/schema/caching_schema_loader_decorator.py b/airbyte_cdk/sources/declarative/schema/caching_schema_loader_decorator.py new file mode 100644 index 000000000..3e8cda400 --- /dev/null +++ b/airbyte_cdk/sources/declarative/schema/caching_schema_loader_decorator.py @@ -0,0 +1,15 @@ +from typing import Any, Mapping, Optional + +from airbyte_cdk.sources.declarative.schema import SchemaLoader + + +class CachingSchemaLoaderDecorator(SchemaLoader): + def __init__(self, schema_loader: SchemaLoader): + self._decorated = schema_loader + self._loaded_schema: Optional[Mapping[str, Any]] = None + + def get_json_schema(self) -> Mapping[str, Any]: + if self._loaded_schema is None: + self._loaded_schema = self._decorated.get_json_schema() + + return self._loaded_schema # type: ignore # at that point, we assume the schema will be populated diff --git a/airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py b/airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py index 47c32d1cc..593b616ee 100644 --- a/airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py +++ b/airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py @@ -31,18 +31,6 @@ def get_total_records(self) -> int: return self.total_record_counter -class SchemaLoaderCachingDecorator(SchemaLoader): - def __init__(self, schema_loader: SchemaLoader): - self._decorated = schema_loader - self._loaded_schema: Optional[Mapping[str, Any]] = None - - def get_json_schema(self) -> Mapping[str, Any]: - if self._loaded_schema is None: - self._loaded_schema = self._decorated.get_json_schema() - - return self._loaded_schema # type: ignore # at that point, we assume the schema will be populated - - class DeclarativePartitionFactory: def __init__( self, @@ -58,7 +46,7 @@ def __init__( In order to avoid these problems, we will create one retriever per thread which should make the processing thread-safe. """ self._stream_name = stream_name - self._schema_loader = SchemaLoaderCachingDecorator(schema_loader) + self._schema_loader = schema_loader self._retriever = retriever self._message_repository = message_repository self._max_records_limit = max_records_limit diff --git a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py index 4463358c0..591d47ae6 100644 --- a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py +++ b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py @@ -157,13 +157,12 @@ from airbyte_cdk.sources.declarative.requesters.requester import HttpMethod from airbyte_cdk.sources.declarative.retrievers import AsyncRetriever, SimpleRetriever from airbyte_cdk.sources.declarative.schema import InlineSchemaLoader, JsonFileSchemaLoader +from airbyte_cdk.sources.declarative.schema.caching_schema_loader_decorator import ( + CachingSchemaLoaderDecorator, +) from airbyte_cdk.sources.declarative.schema.composite_schema_loader import CompositeSchemaLoader from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader from airbyte_cdk.sources.declarative.spec import Spec -from airbyte_cdk.sources.declarative.stream_slicers import StreamSlicerTestReadDecorator -from airbyte_cdk.sources.declarative.stream_slicers.declarative_partition_generator import ( - SchemaLoaderCachingDecorator, -) from airbyte_cdk.sources.declarative.transformations import AddFields, RemoveFields from airbyte_cdk.sources.declarative.transformations.add_fields import AddedFieldDefinition from airbyte_cdk.sources.declarative.transformations.keys_replace_transformation import ( @@ -5107,7 +5106,7 @@ def test_create_stream_with_multiple_schema_loaders(): def get_schema_loader(stream: DefaultStream): assert isinstance( stream._stream_partition_generator._partition_factory._schema_loader, - SchemaLoaderCachingDecorator, + CachingSchemaLoaderDecorator, ) return stream._stream_partition_generator._partition_factory._schema_loader._decorated diff --git a/unit_tests/sources/declarative/schema/test_caching_schema_loader_decorator.py b/unit_tests/sources/declarative/schema/test_caching_schema_loader_decorator.py new file mode 100644 index 000000000..9bc23f081 --- /dev/null +++ b/unit_tests/sources/declarative/schema/test_caching_schema_loader_decorator.py @@ -0,0 +1,19 @@ +from unittest import TestCase +from unittest.mock import Mock + +from airbyte_cdk.sources.declarative.schema import SchemaLoader +from airbyte_cdk.sources.declarative.schema.caching_schema_loader_decorator import ( + CachingSchemaLoaderDecorator, +) + + +class CachingSchemaLoaderDecoratorTest(TestCase): + def test_given_previous_calls_when_get_json_schema_then_return_cached_schema(self): + decorated = Mock(spec=SchemaLoader) + schema_loader = CachingSchemaLoaderDecorator(decorated) + + schema_loader.get_json_schema() + schema_loader.get_json_schema() + schema_loader.get_json_schema() + + assert decorated.get_json_schema.call_count == 1