forked from databricks/databricks-sql-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathstreaming_put_tests.py
More file actions
65 lines (52 loc) · 2.48 KB
/
streaming_put_tests.py
File metadata and controls
65 lines (52 loc) · 2.48 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
#!/usr/bin/env python3
"""
E2E tests for streaming PUT operations.
"""
import io
import logging
import pytest
# Module-level logger named after this module; _cleanup_test_file uses it to
# report whether the REMOVE statement succeeded or failed.
logger = logging.getLogger(__name__)
class PySQLStreamingPutTestSuiteMixin:
    """End-to-end checks for PUT statements fed from an in-memory stream.

    This is a mixin: every method goes through ``self.connection()``, which
    is not defined here and must be supplied by the class it is mixed into.
    """

    def test_streaming_put_basic(self, catalog, schema):
        """Upload a small payload via streaming PUT and confirm LIST reports it.

        Args:
            catalog: Catalog name interpolated into the volume path
                (presumably a pytest fixture -- confirm against the host suite).
            schema: Schema name interpolated into the volume path
                (presumably a pytest fixture -- confirm against the host suite).
        """
        payload = b"Hello, streaming world! This is test data."
        volume_dir = f"/Volumes/{catalog}/{schema}/e2etests/"
        file_path = f"{volume_dir}streaming_put_test.txt"
        put_statement = f"PUT '__input_stream__' INTO '{file_path}'"
        list_statement = f"LIST '{volume_dir}'"

        try:
            with self.connection() as conn, conn.cursor() as cursor:
                # Attempt to remove the target first so the upload starts clean.
                self._cleanup_test_file(file_path)

                with io.BytesIO(payload) as stream:
                    cursor.execute(put_statement, input_stream=stream)

                # The first column of every LIST row is compared with the
                # full path that was just uploaded.
                cursor.execute(list_statement)
                listed_paths = [entry[0] for entry in cursor.fetchall()]
                assert file_path in listed_paths, f"File {file_path} not found in {listed_paths}"
        finally:
            # Runs whether or not the assertions above passed.
            self._cleanup_test_file(file_path)

    def test_streaming_put_missing_stream(self, catalog, schema):
        """Check that a streaming PUT issued without ``input_stream`` raises.

        Args:
            catalog: Catalog name interpolated into the target path.
            schema: Schema name interpolated into the target path.
        """
        with self.connection() as conn, conn.cursor() as cursor:
            # Any exception type satisfies this check.
            with pytest.raises(Exception):
                # Deliberately no input_stream keyword argument here.
                statement = (
                    f"PUT '__input_stream__' INTO '/Volumes/{catalog}/{schema}/e2etests/test.txt'"
                )
                cursor.execute(statement)

    def _cleanup_test_file(self, file_path):
        """Issue a best-effort REMOVE for ``file_path``.

        Opens its own connection with ``staging_allowed_local_path`` set to
        ``"/"``. Any exception is caught and logged at error level instead of
        being propagated, so callers never see a cleanup failure.

        Args:
            file_path: Path interpolated into the REMOVE statement.
        """
        connection_overrides = {"staging_allowed_local_path": "/"}
        try:
            with self.connection(extra_params=connection_overrides) as conn, conn.cursor() as cursor:
                cursor.execute(f"REMOVE '{file_path}'")
                logger.info("Successfully cleaned up test file: %s", file_path)
        except Exception as exc:
            logger.error("Cleanup failed for %s: %s", file_path, exc)