-
Notifications
You must be signed in to change notification settings - Fork 45
Expand file tree
/
Copy pathdocker_base.py
More file actions
371 lines (327 loc) · 13.7 KB
/
docker_base.py
File metadata and controls
371 lines (327 loc) · 13.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""Base class for connector test suites."""
from __future__ import annotations
import inspect
import shutil
import sys
import tempfile
import warnings
from dataclasses import asdict
from pathlib import Path
from subprocess import CompletedProcess, SubprocessError
from typing import Literal, cast
import orjson
import pytest
import yaml
from boltons.typeutils import classproperty
from airbyte_cdk.models import (
AirbyteCatalog,
ConfiguredAirbyteCatalog,
ConfiguredAirbyteStream,
DestinationSyncMode,
SyncMode,
)
from airbyte_cdk.models.connector_metadata import MetadataFile
from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput
from airbyte_cdk.test.models import ConnectorTestScenario
from airbyte_cdk.utils.connector_paths import (
ACCEPTANCE_TEST_CONFIG,
find_connector_root,
)
from airbyte_cdk.utils.docker import (
build_connector_image,
run_docker_airbyte_command,
run_docker_command,
)
class DockerConnectorTestSuite:
    """Base class for connector test suites that exercise the connector's Docker image.

    The class offers helpers for locating the connector on disk and for loading
    acceptance-test scenarios, plus pytest tests that build (or reuse) the connector
    image and run the `spec`, `check`, `discover` and `read` commands in a container.
    """

    @classmethod
    def get_test_class_dir(cls) -> Path:
        """Get the directory of the file that defines this class."""
        module = sys.modules[cls.__module__]
        # `inspect.getfile` gives the module's file; its parent is the directory we want.
        return Path(inspect.getfile(module)).parent

    @classmethod
    def get_connector_root_dir(cls) -> Path:
        """Get the root directory of the connector.

        Both the test class directory and the current working directory are handed to
        `find_connector_root` as candidate locations.
        """
        return find_connector_root([cls.get_test_class_dir(), Path.cwd()])

    @classproperty
    def connector_name(cls) -> str:
        """Get the name of the connector, i.e. the name of its root directory."""
        return cls.get_connector_root_dir().absolute().name

    @classmethod
    def is_destination_connector(cls) -> bool:
        """Check if the connector is a destination (its name starts with `destination-`)."""
        return cast(str, cls.connector_name).startswith("destination-")

    @classproperty
    def acceptance_test_config_path(cls) -> Path:
        """Get the path to the acceptance test config file.

        Raises:
            FileNotFoundError: If the file does not exist under the connector root.
        """
        result = cls.get_connector_root_dir() / ACCEPTANCE_TEST_CONFIG
        if result.exists():
            return result

        raise FileNotFoundError(f"Acceptance test config file not found at: {str(result)}")

    @classmethod
    def get_scenarios(
        cls,
    ) -> list[ConnectorTestScenario]:
        """Load the unique test scenarios declared in the acceptance test config.

        Only the `connection` and `spec` categories are read. Entries without a
        `config_path`, or whose `config_path` contains `iam_role`, are ignored, and
        only the first scenario for each `config_path` is kept.

        This has to be a separate function because pytest does not allow
        parametrization of fixtures with arguments from the test class itself.

        Returns:
            The scenarios in declaration order, or an empty list (with a `UserWarning`)
            when the acceptance test config file does not exist.

        Raises:
            ValueError: If the config file has no `acceptance_tests` section.
        """
        categories = ["connection", "spec"]
        try:
            config_path = cls.acceptance_test_config_path
        except FileNotFoundError as e:
            # Destinations sometimes do not have an acceptance tests file.
            warnings.warn(
                f"Acceptance test config file not found: {e!s}. No scenarios will be loaded.",
                category=UserWarning,
                stacklevel=1,
            )
            return []

        # Reuse the path resolved above instead of re-evaluating the class property.
        all_tests_config = yaml.safe_load(config_path.read_text(encoding="utf-8"))

        # `safe_load` returns None for an empty document, so check the type first.
        if not isinstance(all_tests_config, dict) or "acceptance_tests" not in all_tests_config:
            raise ValueError(
                f"Acceptance tests config not found in {config_path}."
                f" Found only: {str(all_tests_config)}."
            )

        acceptance_tests = all_tests_config["acceptance_tests"] or {}
        test_scenarios: list[ConnectorTestScenario] = []
        seen_config_paths = set()

        for category in categories:
            # A category (or its `tests` key) may be absent or explicitly null.
            category_config = acceptance_tests.get(category) or {}
            for test in category_config.get("tests") or []:
                test_config_path = test.get("config_path")
                if test_config_path is None:
                    # Skip tests without a config_path
                    continue

                if "iam_role" in test_config_path:
                    # We skip iam_role tests for now, as they are not supported in the test suite.
                    continue

                scenario = ConnectorTestScenario.model_validate(test)

                if scenario.config_path:
                    if scenario.config_path in seen_config_paths:
                        # Skip duplicate scenarios based on config_path
                        continue
                    seen_config_paths.add(scenario.config_path)

                test_scenarios.append(scenario)

        return test_scenarios

    @staticmethod
    def _resolve_connector_image(
        connector_root: Path,
        metadata: MetadataFile,
        connector_image_override: str | None,
    ) -> str:
        """Return the image to test: the override when given, else a `dev-latest` build."""
        if connector_image_override:
            return connector_image_override

        return build_connector_image(
            connector_name=connector_root.absolute().name,
            connector_directory=connector_root,
            metadata=metadata,
            tag="dev-latest",
            no_verify=False,
        )

    @pytest.mark.skipif(
        shutil.which("docker") is None,
        reason="docker CLI not found in PATH, skipping docker image tests",
    )
    @pytest.mark.image_tests
    def test_docker_image_build_and_spec(
        self,
        connector_image_override: str | None,
    ) -> None:
        """Run `docker_image` acceptance tests.

        Builds the connector image (unless an override image is supplied) and runs the
        `spec` command inside the container, failing on any reported error.
        """
        connector_root = self.get_connector_root_dir().absolute()
        metadata = MetadataFile.from_file(connector_root / "metadata.yaml")
        connector_image = self._resolve_connector_image(
            connector_root=connector_root,
            metadata=metadata,
            connector_image_override=connector_image_override,
        )

        _ = run_docker_airbyte_command(
            [
                "docker",
                "run",
                "--rm",
                connector_image,
                "spec",
            ],
            raise_if_errors=True,
        )

    @pytest.mark.skipif(
        shutil.which("docker") is None,
        reason="docker CLI not found in PATH, skipping docker image tests",
    )
    @pytest.mark.image_tests
    def test_docker_image_build_and_check(
        self,
        scenario: ConnectorTestScenario,
        connector_image_override: str | None,
    ) -> None:
        """Run `docker_image` acceptance tests.

        This test builds the connector image and runs the `check` command inside the container.

        Note:
          - It is expected for docker image caches to be reused between test runs.
          - In the rare case that image caches need to be cleared, please clear
            the local docker image cache using `docker image prune -a` command.
        """
        if scenario.expected_outcome.expect_exception():
            pytest.skip("Skipping test_docker_image_build_and_check (expected to fail).")

        connector_root = self.get_connector_root_dir()
        metadata = MetadataFile.from_file(connector_root / "metadata.yaml")
        connector_image = self._resolve_connector_image(
            connector_root=connector_root,
            metadata=metadata,
            connector_image_override=connector_image_override,
        )

        container_config_path = "/secrets/config.json"
        with scenario.with_temp_config_file(
            connector_root=connector_root,
        ) as temp_config_file:
            _ = run_docker_airbyte_command(
                [
                    "docker",
                    "run",
                    "--rm",
                    "-v",
                    f"{temp_config_file}:{container_config_path}",
                    connector_image,
                    "check",
                    "--config",
                    container_config_path,
                ],
                raise_if_errors=True,
            )

    @pytest.mark.skipif(
        shutil.which("docker") is None,
        reason="docker CLI not found in PATH, skipping docker image tests",
    )
    @pytest.mark.image_tests
    def test_docker_image_build_and_read(
        self,
        scenario: ConnectorTestScenario,
        connector_image_override: str | None,
        read_from_streams: Literal["all", "none", "default"] | list[str],
        read_scenarios: Literal["all", "none", "default"] | list[str],
    ) -> None:
        """Read from the connector's Docker image.

        This test builds the connector image and runs the `read` command inside the container.

        Note:
          - It is expected for docker image caches to be reused between test runs.
          - In the rare case that image caches need to be cleared, please clear
            the local docker image cache using `docker image prune -a` command.
          - If the --connector-image arg is provided, it will be used instead of building the image.
        """
        if self.is_destination_connector():
            pytest.skip("Skipping read test for destination connector.")

        if scenario.expected_outcome.expect_exception():
            pytest.skip("Skipping (expected to fail).")

        if read_from_streams == "none":
            pytest.skip("Skipping read test (`--read-from-streams=false`).")

        if read_scenarios == "none":
            pytest.skip("Skipping (`--read-scenarios=none`).")

        default_scenario_ids = ["config", "valid_config", "default"]
        if read_scenarios == "default":
            if scenario.id not in default_scenario_ids:
                pytest.skip(
                    f"Skipping read test for scenario '{scenario.id}' "
                    f"(not in default scenarios list '{default_scenario_ids}')."
                )
        elif read_scenarios != "all" and scenario.id not in read_scenarios:
            # An explicit scenario list was given and this scenario is not on it:
            # skip it rather than failing the test, as the message states.
            pytest.skip(
                f"Skipping read test for scenario '{scenario.id}' "
                f"(not in --read-scenarios={read_scenarios})."
            )

        connector_root = self.get_connector_root_dir()
        connector_name = connector_root.absolute().name
        metadata = MetadataFile.from_file(connector_root / "metadata.yaml")
        connector_image = self._resolve_connector_image(
            connector_root=connector_root,
            metadata=metadata,
            connector_image_override=connector_image_override,
        )

        container_config_path = "/secrets/config.json"
        container_catalog_path = "/secrets/catalog.json"

        with (
            scenario.with_temp_config_file(
                connector_root=connector_root,
            ) as temp_config_file,
            tempfile.TemporaryDirectory(
                prefix=f"{connector_name}-test",
                ignore_cleanup_errors=True,
            ) as temp_dir_str,
        ):
            temp_dir = Path(temp_dir_str)

            # Step 1: discover the available streams.
            discover_result = run_docker_airbyte_command(
                [
                    "docker",
                    "run",
                    "--rm",
                    "-v",
                    f"{temp_config_file}:{container_config_path}",
                    connector_image,
                    "discover",
                    "--config",
                    container_config_path,
                ],
                raise_if_errors=True,
            )

            catalog_message = discover_result.catalog  # Get catalog message
            assert catalog_message.catalog is not None, "Catalog message missing catalog."
            discovered_catalog: AirbyteCatalog = catalog_message.catalog
            if not discovered_catalog.streams:
                raise ValueError(
                    f"Discovered catalog for connector '{connector_name}' is empty. "
                    "Please check the connector's discover implementation."
                )

            # Step 2: narrow the discovered streams down to the ones to read.
            selected_streams = {stream.name for stream in discovered_catalog.streams}
            if read_from_streams == "default" and metadata.data.suggestedStreams:
                # Keep only the intersection of discovered and suggested streams.
                selected_streams &= set(metadata.data.suggestedStreams.streams or [])

            if isinstance(read_from_streams, list):
                # If `read_from_streams` is a list, we filter the discovered streams.
                selected_streams &= set(read_from_streams)

            configured_catalog: ConfiguredAirbyteCatalog = ConfiguredAirbyteCatalog(
                streams=[
                    ConfiguredAirbyteStream(
                        stream=stream,
                        sync_mode=(
                            stream.supported_sync_modes[0]
                            if stream.supported_sync_modes
                            else SyncMode.full_refresh
                        ),
                        destination_sync_mode=DestinationSyncMode.append,
                    )
                    for stream in discovered_catalog.streams
                    if stream.name in selected_streams
                ]
            )

            # Step 3: write the catalog to disk so it can be mounted into the container.
            # `orjson.dumps` already returns UTF-8 bytes; writing them directly avoids a
            # decode/encode round trip through the platform's default text encoding.
            configured_catalog_path = temp_dir / "catalog.json"
            configured_catalog_path.write_bytes(orjson.dumps(asdict(configured_catalog)))

            # Step 4: run `read` with the config and catalog mounted.
            _ = run_docker_airbyte_command(
                [
                    "docker",
                    "run",
                    "--rm",
                    "-v",
                    f"{temp_config_file}:{container_config_path}",
                    "-v",
                    f"{configured_catalog_path}:{container_catalog_path}",
                    connector_image,
                    "read",
                    "--config",
                    container_config_path,
                    "--catalog",
                    container_catalog_path,
                ],
                raise_if_errors=True,
            )