-
Notifications
You must be signed in to change notification settings - Fork 45
Expand file tree
/
Copy pathsource_base.py
More file actions
167 lines (150 loc) · 6.05 KB
/
source_base.py
File metadata and controls
167 lines (150 loc) · 6.05 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""Base class for source test suites."""
import copy
from dataclasses import asdict

from airbyte_cdk.models import (
    AirbyteMessage,
    AirbyteStream,
    ConfiguredAirbyteCatalog,
    ConfiguredAirbyteStream,
    DestinationSyncMode,
    SyncMode,
    Type,
)
from airbyte_cdk.test import entrypoint_wrapper
from airbyte_cdk.test.standard_tests._job_runner import run_test_job
from airbyte_cdk.test.standard_tests.connector_base import (
    ConnectorTestSuiteBase,
)
from airbyte_cdk.test.standard_tests.models import (
    ConnectorTestScenario,
)


class SourceTestSuiteBase(ConnectorTestSuiteBase):
    """Base class for source test suites.

    This class provides a base set of functionality for testing source connectors, and it
    inherits all generic connector tests from the `ConnectorTestSuiteBase` class.
    """

    def test_check(
        self,
        scenario: ConnectorTestScenario,
    ) -> None:
        """Run standard `check` tests on the connector.

        Assert that the connector returns a single CONNECTION_STATUS message.
        This test is designed to validate the connector's ability to establish a connection
        and return its status with the expected message type.

        Args:
            scenario: The test scenario used to create the connector and run the job.

        Raises:
            AssertionError: If the number of CONNECTION_STATUS messages is not exactly one.
        """
        result: entrypoint_wrapper.EntrypointOutput = run_test_job(
            self.create_connector(scenario),
            "check",
            test_scenario=scenario,
        )
        # Read the private message list once, so the lint suppression sits on the
        # line that actually performs the access and covers both uses below.
        all_messages: list[AirbyteMessage] = result._messages  # noqa: SLF001  # Non-public API
        conn_status_messages: list[AirbyteMessage] = [
            msg for msg in all_messages if msg.type == Type.CONNECTION_STATUS
        ]
        num_status_messages = len(conn_status_messages)
        assert num_status_messages == 1, (
            f"Expected exactly one CONNECTION_STATUS message. Got {num_status_messages}: \n"
            + "\n".join(str(m) for m in all_messages)
        )

    def test_discover(
        self,
        scenario: ConnectorTestScenario,
    ) -> None:
        """Standard test for `discover`.

        This method makes no assertions of its own; any validation of the outcome
        is left to `run_test_job`.

        Args:
            scenario: The test scenario used to create the connector and run the job.
        """
        run_test_job(
            self.create_connector(scenario),
            "discover",
            test_scenario=scenario,
        )

    def test_spec(self) -> None:
        """Standard test for `spec`.

        This test does not require a `scenario` input, since `spec`
        does not require any inputs.

        We assume `spec` should always succeed and it should always generate
        a valid `SPEC` message.

        Note: the parsing of messages by type also implicitly validates that
        the generated `SPEC` message is valid JSON.

        Raises:
            AssertionError: If the job reports any errors, or if the number of
                spec messages is not exactly one.
        """
        result = run_test_job(
            verb="spec",
            test_scenario=None,
            connector=self.create_connector(scenario=None),
        )
        if result.errors:
            raise AssertionError(
                f"Expected no errors but got {len(result.errors)}: \n"
                + "\n".join(str(e) for e in result.errors)
            )

        # Errors were ruled out above, so the message only needs the actual count.
        num_spec_messages = len(result.spec_messages)
        assert num_spec_messages == 1, (
            f"Expected exactly 1 spec message but got {num_spec_messages}"
        )

    def test_basic_read(
        self,
        scenario: ConnectorTestScenario,
    ) -> None:
        """Run standard `read` test on the connector.

        This test is designed to validate the connector's ability to read data
        from the source and return records. It first runs a `discover` job to
        obtain the catalog of streams, and then it runs a `read` job to fetch
        records from those streams.

        Args:
            scenario: The test scenario used to create the connector and run the jobs.

        Raises:
            AssertionError: If the scenario expects an exception but `discover`
                reported no errors, or if the `read` job returned no records.
        """
        discover_result = run_test_job(
            self.create_connector(scenario),
            "discover",
            test_scenario=scenario,
        )
        if scenario.expect_exception:
            # For scenarios that expect a failure, the test ends after `discover`;
            # no `read` job is attempted.
            assert discover_result.errors, "Expected exception but got none."
            return

        # Configure every discovered stream with the same sync settings.
        configured_catalog = ConfiguredAirbyteCatalog(
            streams=[
                ConfiguredAirbyteStream(
                    stream=stream,
                    sync_mode=SyncMode.full_refresh,
                    destination_sync_mode=DestinationSyncMode.append_dedup,
                )
                for stream in discover_result.catalog.catalog.streams  # type: ignore [reportOptionalMemberAccess, union-attr]
            ]
        )
        result = run_test_job(
            self.create_connector(scenario),
            "read",
            test_scenario=scenario,
            catalog=configured_catalog,
        )

        if not result.records:
            raise AssertionError("Expected records but got none.")  # noqa: TRY003

    def test_fail_read_with_bad_catalog(
        self,
        scenario: ConnectorTestScenario,
    ) -> None:
        """Standard test for `read` when passed a bad catalog file.

        Builds a configured catalog containing a single stream whose sync modes
        are the invalid value "INVALID", runs a `read` job with it, and asserts
        that the job reports both errors and trace messages.

        Args:
            scenario: The test scenario used to create the connector and run the job.
                It is not modified; a shallow copy carries the "failed" expectation.

        Raises:
            AssertionError: If the `read` job reports no errors or no trace messages.
        """
        invalid_configured_catalog = ConfiguredAirbyteCatalog(
            streams=[
                # Create ConfiguredAirbyteStream which is deliberately invalid
                # with regard to the Airbyte Protocol.
                # This should cause the connector to fail.
                ConfiguredAirbyteStream(
                    stream=AirbyteStream(
                        name="__AIRBYTE__stream_that_does_not_exist",
                        json_schema={
                            "type": "object",
                            "properties": {"f1": {"type": "string"}},
                        },
                        supported_sync_modes=[SyncMode.full_refresh],
                    ),
                    sync_mode="INVALID",  # type: ignore [reportArgumentType]
                    destination_sync_mode="INVALID",  # type: ignore [reportArgumentType]
                )
            ]
        )
        # Expect a "failed" status for this run, since the catalog is deliberately
        # invalid. Work on a shallow copy so the changed expectation does not leak
        # into other tests that receive the same scenario object.
        failing_scenario = copy.copy(scenario)
        failing_scenario.status = "failed"
        result: entrypoint_wrapper.EntrypointOutput = run_test_job(
            self.create_connector(failing_scenario),
            "read",
            test_scenario=failing_scenario,
            catalog=asdict(invalid_configured_catalog),
        )
        assert result.errors, "Expected errors but got none."
        assert result.trace_messages, "Expected trace messages but got none."