Skip to content

Commit 3281cdb

Browse files
author
maxime.c
committed
move schema loader decorator to model_to_component_factory
1 parent 26a9b98 commit 3281cdb

4 files changed

Lines changed: 37 additions & 21 deletions

File tree

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -552,8 +552,9 @@
552552
InlineSchemaLoader,
553553
JsonFileSchemaLoader,
554554
SchemaTypeIdentifier,
555-
TypesMap,
555+
TypesMap, SchemaLoader,
556556
)
557+
from airbyte_cdk.sources.declarative.schema.caching_schema_loader_decorator import CachingSchemaLoaderDecorator
557558
from airbyte_cdk.sources.declarative.schema.composite_schema_loader import CompositeSchemaLoader
558559
from airbyte_cdk.sources.declarative.spec import ConfigMigration, Spec
559560
from airbyte_cdk.sources.declarative.stream_slicers import (
@@ -2095,13 +2096,7 @@ def create_default_stream(
20952096
if isinstance(retriever, AsyncRetriever):
20962097
stream_slicer = retriever.stream_slicer
20972098

2098-
schema_loader: Union[
2099-
CompositeSchemaLoader,
2100-
DefaultSchemaLoader,
2101-
DynamicSchemaLoader,
2102-
InlineSchemaLoader,
2103-
JsonFileSchemaLoader,
2104-
]
2099+
schema_loader: SchemaLoader
21052100
if model.schema_loader and isinstance(model.schema_loader, list):
21062101
nested_schema_loaders = [
21072102
self._create_component_from_model(model=nested_schema_loader, config=config)
@@ -2120,6 +2115,7 @@ def create_default_stream(
21202115
if "name" not in options:
21212116
options["name"] = model.name
21222117
schema_loader = DefaultSchemaLoader(config=config, parameters=options)
2118+
schema_loader = CachingSchemaLoaderDecorator(schema_loader)
21232119

21242120
stream_name = model.name or ""
21252121
return DefaultStream(
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
from typing import Any, Mapping, Optional
2+
3+
from airbyte_cdk.sources.declarative.schema import SchemaLoader
4+
5+
6+
class CachingSchemaLoaderDecorator(SchemaLoader):
    """Schema loader that wraps another loader and remembers the schema it returned.

    The wrapped loader is only queried while no schema has been stored yet;
    once a non-None schema has been obtained, it is returned on every later call.
    """

    def __init__(self, schema_loader: SchemaLoader):
        """Store the loader to delegate to and start with an empty cache.

        :param schema_loader: the loader whose ``get_json_schema`` result is cached.
        """
        self._loaded_schema: Optional[Mapping[str, Any]] = None
        self._decorated = schema_loader

    def get_json_schema(self) -> Mapping[str, Any]:
        """Return the cached schema, asking the wrapped loader only if nothing is cached yet."""
        cached = self._loaded_schema
        if cached is not None:
            return cached

        # Nothing stored so far: delegate once and keep the result for later calls.
        fresh = self._decorated.get_json_schema()
        self._loaded_schema = fresh
        return fresh

airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -31,18 +31,6 @@ def get_total_records(self) -> int:
3131
return self.total_record_counter
3232

3333

34-
class SchemaLoaderCachingDecorator(SchemaLoader):
35-
def __init__(self, schema_loader: SchemaLoader):
36-
self._decorated = schema_loader
37-
self._loaded_schema: Optional[Mapping[str, Any]] = None
38-
39-
def get_json_schema(self) -> Mapping[str, Any]:
40-
if self._loaded_schema is None:
41-
self._loaded_schema = self._decorated.get_json_schema()
42-
43-
return self._loaded_schema # type: ignore # at that point, we assume the schema will be populated
44-
45-
4634
class DeclarativePartitionFactory:
4735
def __init__(
4836
self,
@@ -58,7 +46,7 @@ def __init__(
5846
In order to avoid these problems, we will create one retriever per thread which should make the processing thread-safe.
5947
"""
6048
self._stream_name = stream_name
61-
self._schema_loader = SchemaLoaderCachingDecorator(schema_loader)
49+
self._schema_loader = schema_loader
6250
self._retriever = retriever
6351
self._message_repository = message_repository
6452
self._max_records_limit = max_records_limit
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
from unittest import TestCase
2+
from unittest.mock import Mock
3+
4+
from airbyte_cdk.sources.declarative.schema import SchemaLoader
5+
from airbyte_cdk.sources.declarative.schema.caching_schema_loader_decorator import CachingSchemaLoaderDecorator
6+
7+
8+
class CachingSchemaLoaderDecoratorTest(TestCase):
    """Checks that the caching decorator reaches the wrapped loader only once."""

    def test_given_previous_calls_when_get_json_schema_then_return_cached_schema(self):
        # A spec'd mock stands in for the wrapped loader so its calls can be counted.
        wrapped_loader = Mock(spec=SchemaLoader)
        caching_loader = CachingSchemaLoaderDecorator(wrapped_loader)

        # Ask for the schema three times through the decorator.
        for _ in range(3):
            caching_loader.get_json_schema()

        # Only the first request should have been forwarded to the wrapped loader.
        assert wrapped_loader.get_json_schema.call_count == 1

0 commit comments

Comments
 (0)