Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -551,9 +551,13 @@
DynamicSchemaLoader,
InlineSchemaLoader,
JsonFileSchemaLoader,
SchemaLoader,
SchemaTypeIdentifier,
TypesMap,
)
from airbyte_cdk.sources.declarative.schema.caching_schema_loader_decorator import (
CachingSchemaLoaderDecorator,
)
from airbyte_cdk.sources.declarative.schema.composite_schema_loader import CompositeSchemaLoader
from airbyte_cdk.sources.declarative.spec import ConfigMigration, Spec
from airbyte_cdk.sources.declarative.stream_slicers import (
Expand Down Expand Up @@ -2095,13 +2099,7 @@ def create_default_stream(
if isinstance(retriever, AsyncRetriever):
stream_slicer = retriever.stream_slicer

schema_loader: Union[
CompositeSchemaLoader,
DefaultSchemaLoader,
DynamicSchemaLoader,
InlineSchemaLoader,
JsonFileSchemaLoader,
]
schema_loader: SchemaLoader
if model.schema_loader and isinstance(model.schema_loader, list):
nested_schema_loaders = [
self._create_component_from_model(model=nested_schema_loader, config=config)
Expand All @@ -2120,6 +2118,7 @@ def create_default_stream(
if "name" not in options:
options["name"] = model.name
schema_loader = DefaultSchemaLoader(config=config, parameters=options)
schema_loader = CachingSchemaLoaderDecorator(schema_loader)

stream_name = model.name or ""
return DefaultStream(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from typing import Any, Mapping, Optional

from airbyte_cdk.sources.declarative.schema import SchemaLoader


class CachingSchemaLoaderDecorator(SchemaLoader):
    """Schema loader wrapper that remembers the schema returned by the loader it wraps.

    The wrapped loader's `get_json_schema` is invoked on the first call; as long as
    it returned a non-None schema, later calls reuse that stored result.
    """

    def __init__(self, schema_loader: SchemaLoader):
        """Store the loader to delegate to and start with an empty cache.

        Args:
            schema_loader: The loader whose `get_json_schema` result is cached.
        """
        self._decorated = schema_loader
        self._loaded_schema: Optional[Mapping[str, Any]] = None

    def get_json_schema(self) -> Mapping[str, Any]:
        """Return the schema, delegating to the wrapped loader only if nothing is cached yet.

        Returns:
            The schema produced by the wrapped loader.
        """
        cached = self._loaded_schema
        if cached is not None:
            return cached

        # Nothing cached yet: ask the wrapped loader and keep its answer for later calls.
        cached = self._decorated.get_json_schema()
        self._loaded_schema = cached
        return cached
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,6 @@ def get_total_records(self) -> int:
return self.total_record_counter


class SchemaLoaderCachingDecorator(SchemaLoader):
    """Decorator around a SchemaLoader that keeps the first schema it loads.

    Once the decorated loader has produced a non-None schema, that value is
    returned for every later `get_json_schema` call without delegating again.
    """

    def __init__(self, schema_loader: SchemaLoader):
        """Keep a reference to the decorated loader; no schema is loaded yet.

        Args:
            schema_loader: The loader that actually produces the schema.
        """
        self._loaded_schema: Optional[Mapping[str, Any]] = None
        self._decorated = schema_loader

    def get_json_schema(self) -> Mapping[str, Any]:
        """Return the stored schema, loading it from the decorated loader when absent.

        Returns:
            The schema obtained from the decorated loader.
        """
        schema = self._loaded_schema
        if schema is None:
            # Load once and store the result on the instance in the same step.
            schema = self._loaded_schema = self._decorated.get_json_schema()
        return schema


class DeclarativePartitionFactory:
def __init__(
self,
Expand All @@ -58,7 +46,7 @@ def __init__(
In order to avoid these problems, we will create one retriever per thread which should make the processing thread-safe.
"""
self._stream_name = stream_name
self._schema_loader = SchemaLoaderCachingDecorator(schema_loader)
self._schema_loader = schema_loader
self._retriever = retriever
self._message_repository = message_repository
self._max_records_limit = max_records_limit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,13 +157,12 @@
from airbyte_cdk.sources.declarative.requesters.requester import HttpMethod
from airbyte_cdk.sources.declarative.retrievers import AsyncRetriever, SimpleRetriever
from airbyte_cdk.sources.declarative.schema import InlineSchemaLoader, JsonFileSchemaLoader
from airbyte_cdk.sources.declarative.schema.caching_schema_loader_decorator import (
CachingSchemaLoaderDecorator,
)
from airbyte_cdk.sources.declarative.schema.composite_schema_loader import CompositeSchemaLoader
from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader
from airbyte_cdk.sources.declarative.spec import Spec
from airbyte_cdk.sources.declarative.stream_slicers import StreamSlicerTestReadDecorator
from airbyte_cdk.sources.declarative.stream_slicers.declarative_partition_generator import (
SchemaLoaderCachingDecorator,
)
from airbyte_cdk.sources.declarative.transformations import AddFields, RemoveFields
from airbyte_cdk.sources.declarative.transformations.add_fields import AddedFieldDefinition
from airbyte_cdk.sources.declarative.transformations.keys_replace_transformation import (
Expand Down Expand Up @@ -5107,7 +5106,7 @@ def test_create_stream_with_multiple_schema_loaders():
def get_schema_loader(stream: DefaultStream):
assert isinstance(
stream._stream_partition_generator._partition_factory._schema_loader,
SchemaLoaderCachingDecorator,
CachingSchemaLoaderDecorator,
)
return stream._stream_partition_generator._partition_factory._schema_loader._decorated

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from unittest import TestCase
from unittest.mock import Mock

from airbyte_cdk.sources.declarative.schema import SchemaLoader
from airbyte_cdk.sources.declarative.schema.caching_schema_loader_decorator import (
CachingSchemaLoaderDecorator,
)


class CachingSchemaLoaderDecoratorTest(TestCase):
    """Unit tests for CachingSchemaLoaderDecorator."""

    def test_given_previous_calls_when_get_json_schema_then_return_cached_schema(self):
        """Repeated calls query the decorated loader once and all return its schema."""
        expected_schema = {"type": "object", "properties": {}}
        decorated = Mock(spec=SchemaLoader)
        decorated.get_json_schema.return_value = expected_schema
        schema_loader = CachingSchemaLoaderDecorator(decorated)

        schemas = [schema_loader.get_json_schema() for _ in range(3)]

        # The wrapped loader must only be queried by the first call...
        assert decorated.get_json_schema.call_count == 1
        # ...and every call, cached or not, must hand back that very schema object.
        assert all(schema is expected_schema for schema in schemas)
Loading