-
Notifications
You must be signed in to change notification settings - Fork 45
Expand file tree
/
Copy pathdeclarative_sources.py
More file actions
94 lines (74 loc) · 3.13 KB
/
declarative_sources.py
File metadata and controls
94 lines (74 loc) · 3.13 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
import os
from hashlib import md5
from pathlib import Path
from typing import Any, cast
import yaml
from boltons.typeutils import classproperty
from airbyte_cdk.sources.declarative.concurrent_declarative_source import (
ConcurrentDeclarativeSource,
)
from airbyte_cdk.test.standard_tests._job_runner import IConnector
from airbyte_cdk.test.standard_tests.models import ConnectorTestScenario
from airbyte_cdk.test.standard_tests.source_base import SourceTestSuiteBase
from airbyte_cdk.utils.connector_paths import MANIFEST_YAML
def md5_checksum(file_path: Path) -> str:
"""Helper function to calculate the MD5 checksum of a file.
This is used to calculate the checksum of the `components.py` file, if it exists.
"""
with open(file_path, "rb") as file:
return md5(file.read()).hexdigest()
class DeclarativeSourceTestSuite(SourceTestSuiteBase):
"""Declarative source test suite.
This inherits from the Python-based source test suite and implements the
`create_connector` method to create a declarative source object instead of
requiring a custom Python source object.
The class also automatically locates the `manifest.yaml` file and the
`components.py` file (if it exists) in the connector's directory.
"""
connector: type[IConnector] | None = None
@classproperty
def manifest_yaml_path(cls) -> Path:
"""Get the path to the manifest.yaml file."""
result = cls.get_connector_root_dir() / MANIFEST_YAML
if result.exists():
return result
raise FileNotFoundError(
f"Manifest YAML file not found at {result}. "
"Please ensure that the test suite is run in the correct directory.",
)
@classproperty
def components_py_path(cls) -> Path | None:
"""Get the path to the `components.py` file, if one exists.
If not `components.py` file exists, return None.
"""
result = cls.get_connector_root_dir() / "components.py"
if result.exists():
return result
return None
@classmethod
def create_connector(
cls,
scenario: ConnectorTestScenario,
) -> IConnector:
"""Create a connector scenario for the test suite.
This overrides `create_connector` from the create a declarative source object
instead of requiring a custom python source object.
Subclasses should not need to override this method.
"""
config: dict[str, Any] = scenario.get_config_dict()
manifest_dict = yaml.safe_load(cls.manifest_yaml_path.read_text())
if cls.components_py_path and cls.components_py_path.exists():
os.environ["AIRBYTE_ENABLE_UNSAFE_CODE"] = "true"
config["__injected_components_py"] = cls.components_py_path.read_text()
config["__injected_components_py_checksums"] = {
"md5": md5_checksum(cls.components_py_path),
}
return cast(
IConnector,
ConcurrentDeclarativeSource(
config=config,
catalog=None,
state=None,
source_config=manifest_dict,
),
)