-
Notifications
You must be signed in to change notification settings - Fork 140
Expand file tree
/
Copy pathuc_volume_tests.py
More file actions
301 lines (238 loc) · 10.5 KB
/
uc_volume_tests.py
File metadata and controls
301 lines (238 loc) · 10.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
import os
import tempfile
import pytest
import databricks.sql as sql
from databricks.sql import Error
@pytest.fixture(scope="module", autouse=True)
def check_catalog_and_schema(catalog, schema):
    """Verify that a catalog and schema are present in the environment.

    The fixture only evaluates when the test _isn't skipped_.

    Args:
        catalog: value of the ``catalog`` fixture; ``None`` means it is missing.
        schema: value of the ``schema`` fixture; ``None`` means it is missing.

    Raises:
        ValueError: if either ``catalog`` or ``schema`` is ``None``.
    """
    if catalog is None or schema is None:
        # Interpolate the fixture arguments themselves: the previous message
        # referenced the undefined names `_catalog` / `_schema`, so building
        # the message raised NameError and hid the intended ValueError.
        raise ValueError(
            f"UC Volume tests require values for the `catalog` and `schema` environment variables. Found catalog {catalog} schema {schema}"
        )
class PySQLUCVolumeTestSuiteMixin:
    """Simple namespace for UC Volume tests.

    In addition to connection credentials (host, path, token) this suite requires env vars
    named catalog and schema.

    Every test opens its connection through ``self.connection(...)`` used as a
    context manager. This mixin does not define ``connection``, so the class it
    is mixed into has to supply it.
    """

    @staticmethod
    def _make_temp_file(contents=b"hello world!"):
        """Create a local temp file holding *contents* and return its path.

        The file descriptor returned by ``mkstemp`` is closed before returning,
        so the caller only has to delete the path when it is done.
        """
        fh, temp_path = tempfile.mkstemp()
        with open(fh, "wb") as fp:
            fp.write(contents)
        return temp_path

    @staticmethod
    def _remove_local_files(*paths):
        """Delete each local path in *paths*, ignoring files that cannot be removed.

        This runs from ``finally`` blocks, so it must never raise: an error here
        would replace the test failure that is actually being reported.
        """
        for path in paths:
            try:
                os.remove(path)
            except OSError:
                pass

    def test_uc_volume_life_cycle(self, catalog, schema):
        """PUT a file into the UC Volume
        GET the file from the UC Volume
        REMOVE the file from the UC Volume
        Try to GET the file again expecting to raise an exception
        """
        original_text = "hello world!".encode("utf-8")
        temp_path = self._make_temp_file(original_text)
        # Empty placeholder file that the GET below downloads into.
        new_temp_path = self._make_temp_file(b"")

        try:
            # PUT should succeed
            with self.connection(
                extra_params={"staging_allowed_local_path": temp_path}
            ) as conn:
                cursor = conn.cursor()
                query = f"PUT '{temp_path}' INTO '/Volumes/{catalog}/{schema}/e2etests/file1.csv' OVERWRITE"
                cursor.execute(query)

            # GET should succeed
            with self.connection(
                extra_params={"staging_allowed_local_path": new_temp_path}
            ) as conn:
                cursor = conn.cursor()
                query = f"GET '/Volumes/{catalog}/{schema}/e2etests/file1.csv' TO '{new_temp_path}'"
                cursor.execute(query)

            # Read back by path so we see whatever the GET left on disk.
            with open(new_temp_path, "rb") as fp:
                fetched_text = fp.read()

            assert fetched_text == original_text

            # REMOVE should succeed
            remove_query = f"REMOVE '/Volumes/{catalog}/{schema}/e2etests/file1.csv'"

            # Use minimal retry settings to fail fast
            extra_params = {
                "staging_allowed_local_path": "/",
                "_retry_stop_after_attempts_count": 1,
                "_retry_delay_max": 10,
            }
            with self.connection(extra_params=extra_params) as conn:
                cursor = conn.cursor()
                cursor.execute(remove_query)

                # GET after REMOVE should fail
                with pytest.raises(
                    Error, match="Staging operation over HTTP was unsuccessful: 404"
                ):
                    cursor = conn.cursor()
                    query = f"GET '/Volumes/{catalog}/{schema}/e2etests/file1.csv' TO '{new_temp_path}'"
                    cursor.execute(query)
        finally:
            # Clean up local files even when a step above fails.
            self._remove_local_files(temp_path, new_temp_path)

    def test_uc_volume_put_fails_without_staging_allowed_local_path(
        self, catalog, schema
    ):
        """PUT operations are not supported unless the connection was built with
        a parameter called staging_allowed_local_path
        """
        temp_path = self._make_temp_file()

        try:
            with pytest.raises(
                Error, match="You must provide at least one staging_allowed_local_path"
            ):
                with self.connection() as conn:
                    cursor = conn.cursor()
                    query = f"PUT '{temp_path}' INTO '/Volumes/{catalog}/{schema}/e2etests/file1.csv' OVERWRITE"
                    cursor.execute(query)
        finally:
            self._remove_local_files(temp_path)

    def test_uc_volume_put_fails_if_localFile_not_in_staging_allowed_local_path(
        self, catalog, schema
    ):
        """PUT must be rejected when the local file is not inside the configured
        staging_allowed_local_path.
        """
        temp_path = self._make_temp_file()

        # Add junk to base_path so the allowed directory is a sibling "temp"
        # directory that does not contain the file being uploaded.
        base_path = os.path.join(os.path.dirname(temp_path), "temp")

        try:
            with pytest.raises(
                Error,
                match="Local file operations are restricted to paths within the configured staging_allowed_local_path",
            ):
                with self.connection(
                    extra_params={"staging_allowed_local_path": base_path}
                ) as conn:
                    cursor = conn.cursor()
                    query = f"PUT '{temp_path}' INTO '/Volumes/{catalog}/{schema}/e2etests/file1.csv' OVERWRITE"
                    cursor.execute(query)
        finally:
            self._remove_local_files(temp_path)

    def test_uc_volume_put_fails_if_file_exists_and_overwrite_not_set(
        self, catalog, schema
    ):
        """PUT a file into the staging location twice. First command should succeed. Second should fail."""
        temp_path = self._make_temp_file()

        def perform_put():
            with self.connection(
                extra_params={"staging_allowed_local_path": temp_path}
            ) as conn:
                cursor = conn.cursor()
                query = f"PUT '{temp_path}' INTO '/Volumes/{catalog}/{schema}/e2etests/file1.csv'"
                cursor.execute(query)

        def perform_remove():
            # Deliberately best-effort: the remote file may legitimately not
            # exist, and a failed cleanup must not fail the test.
            try:
                remove_query = (
                    f"REMOVE '/Volumes/{catalog}/{schema}/e2etests/file1.csv'"
                )

                with self.connection(
                    extra_params={"staging_allowed_local_path": "/"}
                ) as conn:
                    cursor = conn.cursor()
                    cursor.execute(remove_query)
            except Exception:
                pass

        try:
            # Make sure file does not exist
            perform_remove()

            # Put the file
            perform_put()

            # Try to put it again
            with pytest.raises(
                sql.exc.ServerOperationError,
                match="FILE_IN_STAGING_PATH_ALREADY_EXISTS",
            ):
                perform_put()
        finally:
            # Clean up after ourselves, remotely and locally, pass or fail.
            perform_remove()
            self._remove_local_files(temp_path)

    def test_uc_volume_put_fails_if_absolute_localFile_not_in_staging_allowed_local_path(
        self, catalog, schema
    ):
        """
        This test confirms that staging_allowed_local_path and target_file are resolved into absolute paths.
        """

        # If these two paths are not resolved absolutely, they appear to share a common path of /var/www/html
        # after resolution their common path is only /var/www which should raise an exception
        # Because the common path must always be equal to staging_allowed_local_path
        staging_allowed_local_path = "/var/www/html"
        target_file = "/var/www/html/../html1/not_allowed.html"

        with pytest.raises(
            Error,
            match="Local file operations are restricted to paths within the configured staging_allowed_local_path",
        ):
            with self.connection(
                extra_params={"staging_allowed_local_path": staging_allowed_local_path}
            ) as conn:
                cursor = conn.cursor()
                query = f"PUT '{target_file}' INTO '/Volumes/{catalog}/{schema}/e2etests/file1.csv' OVERWRITE"
                cursor.execute(query)

    def test_uc_volume_empty_local_path_fails_to_parse_at_server(self, catalog, schema):
        """A PUT whose local file path is the empty string must raise an Error
        matching EMPTY_LOCAL_FILE_IN_STAGING_ACCESS_QUERY.
        """
        staging_allowed_local_path = "/var/www/html"
        target_file = ""

        with pytest.raises(Error, match="EMPTY_LOCAL_FILE_IN_STAGING_ACCESS_QUERY"):
            with self.connection(
                extra_params={"staging_allowed_local_path": staging_allowed_local_path}
            ) as conn:
                cursor = conn.cursor()
                query = f"PUT '{target_file}' INTO '/Volumes/{catalog}/{schema}/e2etests/file1.csv' OVERWRITE"
                cursor.execute(query)

    def test_uc_volume_invalid_volume_path_fails_at_server(self, catalog, schema):
        """A PUT into a volume path with a bogus leading segment must raise an
        Error matching "NOT_FOUND: Catalog".
        """
        staging_allowed_local_path = "/var/www/html"
        target_file = "index.html"

        with pytest.raises(Error, match="NOT_FOUND: Catalog"):
            with self.connection(
                extra_params={"staging_allowed_local_path": staging_allowed_local_path}
            ) as conn:
                cursor = conn.cursor()
                query = f"PUT '{target_file}' INTO '/Volumes/RANDOMSTRINGOFCHARACTERS/{catalog}/{schema}/e2etests/file1.csv' OVERWRITE"
                cursor.execute(query)

    def test_uc_volume_supports_multiple_staging_allowed_local_path_values(
        self, catalog, schema
    ):
        """staging_allowed_local_path may be either a path-like object or a list of path-like objects.

        This test confirms that two configured base paths:
        1 - doesn't raise an exception
        2 - allows uploads from both paths
        3 - doesn't allow uploads from a third path
        """

        def generate_file_and_path_and_queries():
            """
            1. Makes a temp file with some contents.
            2. Write a query to PUT it into a staging location
            3. Write a query to REMOVE it from that location (for cleanup)
            """
            temp_path = self._make_temp_file()
            # id() of the path string gives each upload its own remote name.
            remote_file = f"/Volumes/{catalog}/{schema}/e2etests/{id(temp_path)}.csv"
            put_query = f"PUT '{temp_path}' INTO '{remote_file}' OVERWRITE"
            remove_query = f"REMOVE '{remote_file}'"
            return temp_path, put_query, remove_query

        temp_path1, put_query1, remove_query1 = generate_file_and_path_and_queries()
        temp_path2, put_query2, remove_query2 = generate_file_and_path_and_queries()
        # The third file is never uploaded, so its REMOVE query is not needed.
        temp_path3, put_query3, _ = generate_file_and_path_and_queries()

        try:
            with self.connection(
                extra_params={"staging_allowed_local_path": [temp_path1, temp_path2]}
            ) as conn:
                cursor = conn.cursor()

                cursor.execute(put_query1)
                cursor.execute(put_query2)

                with pytest.raises(
                    Error,
                    match="Local file operations are restricted to paths within the configured staging_allowed_local_path",
                ):
                    cursor.execute(put_query3)

                # Then clean up the files we made
                cursor.execute(remove_query1)
                cursor.execute(remove_query2)
        finally:
            # Local temp files are removed whether or not the test passed.
            self._remove_local_files(temp_path1, temp_path2, temp_path3)