-
Notifications
You must be signed in to change notification settings - Fork 45
Expand file tree
/
Copy pathtest_file_stream.py
More file actions
246 lines (215 loc) · 9.55 KB
/
test_file_stream.py
File metadata and controls
246 lines (215 loc) · 9.55 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
import json
import re
from pathlib import Path
from typing import Any, Dict, List, Optional
from unittest import TestCase
from unittest.mock import Mock, patch
from airbyte_cdk.models import AirbyteStateMessage, ConfiguredAirbyteCatalog, Status
from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource
from airbyte_cdk.test.catalog_builder import CatalogBuilder, ConfiguredAirbyteStreamBuilder
from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput
from airbyte_cdk.test.entrypoint_wrapper import discover as entrypoint_discover
from airbyte_cdk.test.entrypoint_wrapper import read as entrypoint_read
from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse
from airbyte_cdk.test.mock_http.response_builder import find_binary_response, find_template
from airbyte_cdk.test.state_builder import StateBuilder
class ConfigBuilder:
def build(self) -> Dict[str, Any]:
return {
"subdomain": "d3v-airbyte",
"start_date": "2023-01-01T00:00:00Z",
"credentials": {
"credentials": "api_token",
"email": "integration-test@airbyte.io",
"api_token": "fake_token",
},
}
def _source(
catalog: ConfiguredAirbyteCatalog,
config: Dict[str, Any],
state: Optional[List[AirbyteStateMessage]] = None,
yaml_file: Optional[str] = None,
emit_connector_builder_messages: Optional[bool] = False
) -> YamlDeclarativeSource:
if not yaml_file:
yaml_file = "file_stream_manifest.yaml"
return YamlDeclarativeSource(
path_to_yaml=str(Path(__file__).parent / yaml_file),
catalog=catalog,
config=config,
state=state,
emit_connector_builder_messages=emit_connector_builder_messages
)
def read(
config_builder: ConfigBuilder,
catalog: ConfiguredAirbyteCatalog,
state_builder: Optional[StateBuilder] = None,
expecting_exception: bool = False,
yaml_file: Optional[str] = None,
emit_connector_builder_messages: Optional[bool] = False
) -> EntrypointOutput:
config = config_builder.build()
state = state_builder.build() if state_builder else StateBuilder().build()
return entrypoint_read(
_source(catalog, config, state, yaml_file, emit_connector_builder_messages), config, catalog, state, expecting_exception
)
def discover(config_builder: ConfigBuilder, expecting_exception: bool = False) -> EntrypointOutput:
config = config_builder.build()
return entrypoint_discover(
_source(CatalogBuilder().build(), config), config, expecting_exception
)
SERVER_URL = "https://d3v-airbyte.zendesk.com"
STREAM_URL = f"{SERVER_URL}/api/v2/help_center/incremental/articles?start_time=1672531200"
STREAM_ATTACHMENTS_URL = (
f"{SERVER_URL}/api/v2/help_center/articles/12138789487375/attachments?per_page=100&=1672531200"
)
STREAM_ATTACHMENT_CONTENT_URL = f"{SERVER_URL}/hc/article_attachments/12138758717583"
class FileStreamTest(TestCase):
def _config(self) -> ConfigBuilder:
return ConfigBuilder()
def test_check(self) -> None:
with HttpMocker() as http_mocker:
http_mocker.get(
HttpRequest(url=STREAM_URL),
HttpResponse(json.dumps(find_template("file_api/articles", __file__)), 200),
)
source = _source(
CatalogBuilder()
.with_stream(ConfiguredAirbyteStreamBuilder().with_name("articles"))
.build(),
self._config().build(),
)
check_result = source.check(Mock(), self._config().build())
assert check_result.status == Status.SUCCEEDED
def test_get_articles(self) -> None:
with HttpMocker() as http_mocker:
http_mocker.get(
HttpRequest(url=STREAM_URL),
HttpResponse(json.dumps(find_template("file_api/articles", __file__)), 200),
)
output = read(
self._config(),
CatalogBuilder()
.with_stream(ConfiguredAirbyteStreamBuilder().with_name("articles"))
.build(),
)
assert output.records
def test_get_article_attachments(self) -> None:
with HttpMocker() as http_mocker:
http_mocker.get(
HttpRequest(url=STREAM_URL),
HttpResponse(json.dumps(find_template("file_api/articles", __file__)), 200),
)
http_mocker.get(
HttpRequest(url=STREAM_ATTACHMENTS_URL),
HttpResponse(
json.dumps(find_template("file_api/article_attachments", __file__)), 200
),
)
http_mocker.get(
HttpRequest(url=STREAM_ATTACHMENT_CONTENT_URL),
HttpResponse(
find_binary_response("file_api/article_attachment_content.png", __file__), 200
),
)
output = read(
self._config(),
CatalogBuilder()
.with_stream(ConfiguredAirbyteStreamBuilder().with_name("article_attachments"))
.build(),
)
assert output.records
file_reference = output.records[0].record.file_reference
assert file_reference
assert file_reference.staging_file_url
assert re.match(
r"^.*/article_attachments/[0-9a-fA-F-]{36}$", file_reference.staging_file_url
)
assert file_reference.source_file_relative_path
assert re.match(
r"^article_attachments/[0-9a-fA-F-]{36}$", file_reference.source_file_relative_path
)
assert file_reference.file_size_bytes
def test_get_article_attachments_with_filename_extractor(self) -> None:
with HttpMocker() as http_mocker:
http_mocker.get(
HttpRequest(url=STREAM_URL),
HttpResponse(json.dumps(find_template("file_api/articles", __file__)), 200),
)
http_mocker.get(
HttpRequest(url=STREAM_ATTACHMENTS_URL),
HttpResponse(
json.dumps(find_template("file_api/article_attachments", __file__)), 200
),
)
http_mocker.get(
HttpRequest(url=STREAM_ATTACHMENT_CONTENT_URL),
HttpResponse(
find_binary_response("file_api/article_attachment_content.png", __file__), 200
),
)
output = read(
self._config(),
CatalogBuilder()
.with_stream(ConfiguredAirbyteStreamBuilder().with_name("article_attachments"))
.build(),
yaml_file="test_file_stream_with_filename_extractor.yaml",
)
assert output.records
file_reference = output.records[0].record.file_reference
assert file_reference
assert (
file_reference.staging_file_url
== "/tmp/airbyte-file-transfer/article_attachments/12138758717583/some_image_name.png"
)
assert file_reference.source_file_relative_path
assert not re.match(
r"^article_attachments/[0-9a-fA-F-]{36}$", file_reference.source_file_relative_path
)
assert file_reference.file_size_bytes
def test_get_article_attachments_messages_for_connector_builder(self) -> None:
with HttpMocker() as http_mocker:
http_mocker.get(
HttpRequest(url=STREAM_URL),
HttpResponse(json.dumps(find_template("file_api/articles", __file__)), 200),
)
http_mocker.get(
HttpRequest(url=STREAM_ATTACHMENTS_URL),
HttpResponse(
json.dumps(find_template("file_api/article_attachments", __file__)), 200
),
)
http_mocker.get(
HttpRequest(url=STREAM_ATTACHMENT_CONTENT_URL),
HttpResponse(
find_binary_response("file_api/article_attachment_content.png", __file__), 200
),
)
output = read(
self._config(),
CatalogBuilder()
.with_stream(ConfiguredAirbyteStreamBuilder().with_name("article_attachments"))
.build(),
yaml_file="test_file_stream_with_filename_extractor.yaml",
emit_connector_builder_messages=True,
)
assert len(output.records) == 1
file_reference = output.records[0].record.file_reference
assert file_reference
assert file_reference.staging_file_url
assert file_reference.source_file_relative_path
# because we didn't write the file, the size is 0
assert file_reference.file_size_bytes == 0
# Assert file reference fields are copied to record data
record_data = output.records[0].record.data
assert record_data["staging_file_url"] == file_reference.staging_file_url
assert record_data["source_file_relative_path"] == file_reference.source_file_relative_path
assert record_data["file_size_bytes"] == file_reference.file_size_bytes
def test_discover_article_attachments(self) -> None:
output = discover(self._config())
article_attachments_stream = next(
filter(
lambda stream: stream.name == "article_attachments", output.catalog.catalog.streams
)
)
assert article_attachments_stream.is_file_based