-
Notifications
You must be signed in to change notification settings - Fork 44
Expand file tree
/
Copy pathsource_base.py
More file actions
195 lines (175 loc) · 7.3 KB
/
source_base.py
File metadata and controls
195 lines (175 loc) · 7.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""Base class for source test suites."""
from dataclasses import asdict
from typing import TYPE_CHECKING
import pytest
from airbyte_cdk.models import (
from airbyte_cdk.models import (
AirbyteMessage,
AirbyteStream,
ConfiguredAirbyteCatalog,
ConfiguredAirbyteStream,
DestinationSyncMode,
SyncMode,
Type,
)
from airbyte_cdk.test.models import (
ConnectorTestScenario,
)
from airbyte_cdk.test.standard_tests._job_runner import run_test_job
from airbyte_cdk.test.standard_tests.connector_base import (
ConnectorTestSuiteBase,
)
if TYPE_CHECKING:
from airbyte_cdk.test import entrypoint_wrapper
class SourceTestSuiteBase(ConnectorTestSuiteBase):
    """Base class for source test suites.

    This class provides a base set of functionality for testing source connectors, and it
    inherits all generic connector tests from the `ConnectorTestSuiteBase` class.

    Every test here builds a connector via `self.create_connector(...)` and executes a
    single verb (`spec`, `check`, `discover`, `read`) through `run_test_job`, resolving
    files relative to `self.get_connector_root_dir()`.
    """

    def test_check(
        self,
        scenario: ConnectorTestScenario,
    ) -> None:
        """Run standard `check` tests on the connector.

        Assert that the connector returns a single CONNECTION_STATUS message.
        This test is designed to validate the connector's ability to establish a connection
        and return its status with the expected message type.

        Args:
            scenario: The test scenario supplying config and expected outcome.

        Raises:
            AssertionError: If the number of CONNECTION_STATUS messages is not exactly one.
        """
        result: entrypoint_wrapper.EntrypointOutput = run_test_job(
            self.create_connector(scenario),
            "check",
            test_scenario=scenario,
            connector_root=self.get_connector_root_dir(),
        )
        num_status_messages = len(result.connection_status_messages)
        # On failure, dump every emitted message to make the mismatch debuggable.
        assert num_status_messages == 1, (
            f"Expected exactly one CONNECTION_STATUS message. Got {num_status_messages}: \n"
            + "\n".join([str(m) for m in result.get_message_iterator()])
        )

    def test_discover(
        self,
        scenario: ConnectorTestScenario,
    ) -> None:
        """Standard test for `discover`.

        Scenarios that are expected to raise are skipped, since `discover` success
        is not meaningful for them.

        Args:
            scenario: The test scenario supplying config and expected outcome.
        """
        if scenario.expected_outcome.expect_exception():
            pytest.skip(
                "Skipping `discover` test because the scenario is expected to raise an exception."
            )
        run_test_job(
            self.create_connector(scenario),
            "discover",
            connector_root=self.get_connector_root_dir(),
            test_scenario=scenario,
        )

    def test_spec(self) -> None:
        """Standard test for `spec`.

        This test does not require a `scenario` input, since `spec`
        does not require any inputs.

        We assume `spec` should always succeed and it should always generate
        a valid `SPEC` message.

        Note: the parsing of messages by type also implicitly validates that
        the generated `SPEC` message is valid JSON.

        Raises:
            AssertionError: If the connector does not emit exactly one SPEC message.
        """
        result = run_test_job(
            verb="spec",
            test_scenario=None,
            connector=self.create_connector(scenario=None),
            connector_root=self.get_connector_root_dir(),
        )
        # If an error occurs, it will be raised above.
        num_spec_messages = len(result.spec_messages)
        # Use a real f-string so the actual count is reported; errors are appended
        # to the same message instead of forming a tuple.
        assert num_spec_messages == 1, (
            f"Expected exactly 1 spec message but got {num_spec_messages}. "
            f"Errors: {result.errors}"
        )

    def test_basic_read(
        self,
        scenario: ConnectorTestScenario,
    ) -> None:
        """Run standard `read` test on the connector.

        This test is designed to validate the connector's ability to read data
        from the source and return records. It first runs a `check` job, then a
        `discover` job to obtain the catalog of streams, and then it runs a `read`
        job (full refresh on every discovered stream) to fetch records.

        If the scenario expects an exception and either `check` or `discover`
        already produced errors, the test returns early.

        Args:
            scenario: The test scenario supplying config and expected outcome.

        Raises:
            AssertionError: If an error was expected but `read` produced none, or if
                success was expected but `read` produced no records.
        """
        check_result: entrypoint_wrapper.EntrypointOutput = run_test_job(
            self.create_connector(scenario),
            "check",
            test_scenario=scenario,
            connector_root=self.get_connector_root_dir(),
        )
        if scenario.expected_outcome.expect_exception() and check_result.errors:
            # Expected failure and we got it. Return early.
            return

        # Discover is run without the scenario's expected outcome attached.
        discover_result = run_test_job(
            self.create_connector(scenario),
            "discover",
            connector_root=self.get_connector_root_dir(),
            test_scenario=scenario.without_expected_outcome(),
        )
        if scenario.expected_outcome.expect_exception() and discover_result.errors:
            # Failed as expected; we're done.
            return

        # Configure every discovered stream for a full-refresh read.
        configured_catalog = ConfiguredAirbyteCatalog(
            streams=[
                ConfiguredAirbyteStream(
                    stream=stream,
                    sync_mode=SyncMode.full_refresh,
                    destination_sync_mode=DestinationSyncMode.append_dedup,
                )
                for stream in discover_result.catalog.catalog.streams  # type: ignore [reportOptionalMemberAccess, union-attr]
            ]
        )
        result = run_test_job(
            self.create_connector(scenario),
            "read",
            test_scenario=scenario,
            connector_root=self.get_connector_root_dir(),
            catalog=configured_catalog,
        )

        if scenario.expected_outcome.expect_exception() and not result.errors:
            # By now we should have raised an exception.
            raise AssertionError("Expected an error but got none.")

        if scenario.expected_outcome.expect_success() and not result.records:
            raise AssertionError("Expected records but got none.")

    def test_fail_read_with_bad_catalog(
        self,
        scenario: ConnectorTestScenario,
    ) -> None:
        """Standard test for `read` when passed a bad catalog file.

        Builds a catalog whose single stream does not exist and whose sync modes
        are invalid, then asserts that `read` reports errors and emits trace messages.

        Args:
            scenario: The test scenario whose config is reused for the failing read.

        Raises:
            AssertionError: If the read produces no errors or no trace messages.
        """
        # Recreate the scenario with the same config but set the status to "failed".
        # Resolve the config relative to the suite's connector root, consistent with
        # every other test in this class.
        scenario = ConnectorTestScenario(
            config_dict=scenario.get_config_dict(
                connector_root=self.get_connector_root_dir(),
                empty_if_missing=False,
            ),
            status="failed",
        )
        invalid_configured_catalog = ConfiguredAirbyteCatalog(
            streams=[
                # Create ConfiguredAirbyteStream which is deliberately invalid
                # with regard to the Airbyte Protocol.
                # This should cause the connector to fail.
                ConfiguredAirbyteStream(
                    stream=AirbyteStream(
                        name="__AIRBYTE__stream_that_does_not_exist",
                        json_schema={
                            "type": "object",
                            "properties": {"f1": {"type": "string"}},
                        },
                        supported_sync_modes=[SyncMode.full_refresh],
                    ),
                    sync_mode="INVALID",  # type: ignore [reportArgumentType]
                    destination_sync_mode="INVALID",  # type: ignore [reportArgumentType]
                ),
            ],
        )
        # The catalog is passed as a plain dict (via `asdict`) rather than the model
        # object used in `test_basic_read`.
        result: entrypoint_wrapper.EntrypointOutput = run_test_job(
            self.create_connector(scenario),
            "read",
            connector_root=self.get_connector_root_dir(),
            test_scenario=scenario.with_expecting_failure(),  # Expect failure due to bad catalog
            catalog=asdict(invalid_configured_catalog),
        )
        assert result.errors, "Expected errors but got none."
        assert result.trace_messages, "Expected trace messages but got none."