-
Notifications
You must be signed in to change notification settings - Fork 45
Expand file tree
/
Copy pathdefault_schema_loader.py
More file actions
47 lines (37 loc) · 1.79 KB
/
default_schema_loader.py
File metadata and controls
47 lines (37 loc) · 1.79 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import logging
from dataclasses import InitVar, dataclass
from typing import Any, Mapping
from airbyte_cdk.sources.declarative.schema.json_file_schema_loader import JsonFileSchemaLoader
from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader
from airbyte_cdk.sources.types import Config
@dataclass
class DefaultSchemaLoader(SchemaLoader):
    """
    Schema loader that delegates to a JsonFileSchemaLoader and falls back to an empty schema
    when that loader fails, e.g. for streams that have not defined their schema file yet.

    Attributes:
        config (Config): The user-provided configuration as specified by the source's spec
        parameters (Mapping[str, Any]): Additional arguments handed to the underlying JsonFileSchemaLoader
            (e.g. for string interpolation if needed); its "name" entry, when present, is used as the
            stream name in the fallback log message
    """

    config: Config
    parameters: InitVar[Mapping[str, Any]]

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        """
        Keeps a private reference to the init-only parameters and builds the delegate file-based loader.

        :param parameters: The init-only parameters mapping supplied at construction time
        """
        # `parameters` is an InitVar, so it is only available here; keep it around for the fallback log message.
        self._parameters = parameters
        self.default_loader = JsonFileSchemaLoader(config=self.config, parameters=parameters)

    def get_json_schema(self) -> Mapping[str, Any]:
        """
        Asks the default file-based loader for the schema and falls back to the empty schema if it cannot be loaded.

        :return: The schema returned by the default loader, or an empty mapping when that loader raises
            OSError or ValueError
        """
        try:
            schema = self.default_loader.get_json_schema()
        except (OSError, ValueError):
            # A slight hack: the stream name is not directly available here. The default filepath is built on the
            # assumption that the runtime parameters store the stream name under 'name', so the same key is read.
            stream_name = self._parameters.get("name", "")
            message = f"Could not find schema for stream {stream_name}, defaulting to the empty schema"
            logging.info(message)
            return {}
        return schema