-
Notifications
You must be signed in to change notification settings - Fork 140
Expand file tree
/
Copy pathdatabricks_client.py
More file actions
345 lines (284 loc) · 11.9 KB
/
databricks_client.py
File metadata and controls
345 lines (284 loc) · 11.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
"""
Abstract client interface for interacting with Databricks SQL services.

Defines ``DatabricksClient``, an abstract base class. Implementations of this
class are responsible for:
- Managing connections to Databricks SQL services
- Executing SQL queries and commands
- Retrieving query results
- Fetching metadata about catalogs, schemas, tables, and columns
"""
from abc import ABC, abstractmethod
from typing import Dict, Tuple, List, Optional, Any, Union, TYPE_CHECKING
if TYPE_CHECKING:
from databricks.sql.client import Cursor
from databricks.sql.thrift_api.TCLIService import ttypes
from databricks.sql.backend.types import SessionId, CommandId, CommandState
# Forward reference for type hints
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from databricks.sql.result_set import ResultSet
class DatabricksClient(ABC):
    """
    Abstract base class for clients that talk to Databricks SQL services.

    Every method below is abstract, so this class cannot be instantiated until
    a subclass implements all of them. The interface is grouped into:

    - connection and session management (``open_session``, ``close_session``)
    - query execution and command management (``execute_command``,
      ``cancel_command``, ``close_command``, ``get_query_state``,
      ``get_execution_result``)
    - metadata operations (``get_catalogs``, ``get_schemas``, ``get_tables``,
      ``get_columns``)
    - configuration (the read-only ``max_download_threads`` property)
    """

    # Project types (SessionId, CommandId, CommandState, Cursor, ResultSet) are
    # written as string annotations so that defining this class never evaluates
    # them at runtime; this keeps the class importable even if those names are
    # only imported under TYPE_CHECKING.

    # == Connection and Session Management ==

    @abstractmethod
    def open_session(
        self,
        session_configuration: Optional[Dict[str, Any]],
        catalog: Optional[str],
        schema: Optional[str],
    ) -> "SessionId":
        """
        Opens a new session with the Databricks SQL service.

        This method establishes a new session with the server and returns a session
        identifier that can be used for subsequent operations.

        Args:
            session_configuration: Optional dictionary of configuration parameters for the session
            catalog: Optional catalog name to use as the initial catalog for the session
            schema: Optional schema name to use as the initial schema for the session

        Returns:
            SessionId: A session identifier object that can be used for subsequent operations

        Raises:
            Error: If the session configuration is invalid
            OperationalError: If there's an error establishing the session
            InvalidServerResponseError: If the server response is invalid or unexpected
        """

    @abstractmethod
    def close_session(self, session_id: "SessionId") -> None:
        """
        Closes an existing session with the Databricks SQL service.

        This method terminates the session identified by the given session ID and
        releases any resources associated with it.

        Args:
            session_id: The session identifier returned by open_session()

        Raises:
            ValueError: If the session ID is invalid
            OperationalError: If there's an error closing the session
        """

    # == Query Execution, Command Management ==

    @abstractmethod
    def execute_command(
        self,
        operation: str,
        session_id: "SessionId",
        max_rows: int,
        max_bytes: int,
        lz4_compression: bool,
        cursor: "Cursor",
        use_cloud_fetch: bool,
        parameters: List,
        async_op: bool,
        enforce_embedded_schema_correctness: bool,
    ) -> Optional["ResultSet"]:
        """
        Executes a SQL command or query within the specified session.

        This method sends a SQL command to the server for execution and handles
        the response. It can operate in both synchronous and asynchronous modes.

        Args:
            operation: The SQL command or query to execute
            session_id: The session identifier in which to execute the command
            max_rows: Maximum number of rows to fetch in a single fetch batch
            max_bytes: Maximum number of bytes to fetch in a single fetch batch
            lz4_compression: Whether to use LZ4 compression for result data
            cursor: The cursor object that will handle the results
            use_cloud_fetch: Whether to use cloud fetch for retrieving large result sets
            parameters: List of parameters to bind to the query
            async_op: Whether to execute the command asynchronously
            enforce_embedded_schema_correctness: Whether to enforce schema correctness

        Returns:
            If async_op is False, returns a ResultSet object containing the
            query results and metadata. If async_op is True, returns None and the
            results must be fetched later using get_execution_result().

        Raises:
            ValueError: If the session ID is invalid
            OperationalError: If there's an error executing the command
            ServerOperationError: If the server encounters an error during execution
        """

    @abstractmethod
    def cancel_command(self, command_id: "CommandId") -> None:
        """
        Cancels a running command or query.

        This method attempts to cancel a command that is currently being executed.
        It can be called from a different thread than the one executing the command.

        Args:
            command_id: The command identifier to cancel

        Raises:
            ValueError: If the command ID is invalid
            OperationalError: If there's an error canceling the command
        """

    @abstractmethod
    def close_command(self, command_id: "CommandId") -> None:
        """
        Closes a command and releases associated resources.

        This method informs the server that the client is done with the command
        and any resources associated with it can be released.

        Args:
            command_id: The command identifier to close

        Raises:
            ValueError: If the command ID is invalid
            OperationalError: If there's an error closing the command
        """

    @abstractmethod
    def get_query_state(self, command_id: "CommandId") -> "CommandState":
        """
        Gets the current state of a query or command.

        This method retrieves the current execution state of a command from the server.

        Args:
            command_id: The command identifier to check

        Returns:
            CommandState: The current state of the command

        Raises:
            ValueError: If the command ID is invalid
            OperationalError: If there's an error retrieving the state
            ServerOperationError: If the command is in an error state
            DatabaseError: If the command has been closed unexpectedly
        """

    @abstractmethod
    def get_execution_result(
        self,
        command_id: "CommandId",
        cursor: "Cursor",
    ) -> "ResultSet":
        """
        Retrieves the results of a previously executed command.

        This method fetches the results of a command that was executed asynchronously
        or retrieves additional results from a command that has more rows available.

        Args:
            command_id: The command identifier for which to retrieve results
            cursor: The cursor object that will handle the results

        Returns:
            ResultSet: An object containing the query results and metadata

        Raises:
            ValueError: If the command ID is invalid
            OperationalError: If there's an error retrieving the results
        """

    # == Metadata Operations ==

    @abstractmethod
    def get_catalogs(
        self,
        session_id: "SessionId",
        max_rows: int,
        max_bytes: int,
        cursor: "Cursor",
    ) -> "ResultSet":
        """
        Retrieves a list of available catalogs.

        This method fetches metadata about all catalogs available in the current
        session's context.

        Args:
            session_id: The session identifier
            max_rows: Maximum number of rows to fetch in a single batch
            max_bytes: Maximum number of bytes to fetch in a single batch
            cursor: The cursor object that will handle the results

        Returns:
            ResultSet: An object containing the catalog metadata

        Raises:
            ValueError: If the session ID is invalid
            OperationalError: If there's an error retrieving the catalogs
        """

    @abstractmethod
    def get_schemas(
        self,
        session_id: "SessionId",
        max_rows: int,
        max_bytes: int,
        cursor: "Cursor",
        catalog_name: Optional[str] = None,
        schema_name: Optional[str] = None,
    ) -> "ResultSet":
        """
        Retrieves a list of schemas, optionally filtered by catalog and schema name patterns.

        This method fetches metadata about schemas available in the specified catalog
        or all catalogs if no catalog is specified.

        Args:
            session_id: The session identifier
            max_rows: Maximum number of rows to fetch in a single batch
            max_bytes: Maximum number of bytes to fetch in a single batch
            cursor: The cursor object that will handle the results
            catalog_name: Optional catalog name pattern to filter by
            schema_name: Optional schema name pattern to filter by

        Returns:
            ResultSet: An object containing the schema metadata

        Raises:
            ValueError: If the session ID is invalid
            OperationalError: If there's an error retrieving the schemas
        """

    @abstractmethod
    def get_tables(
        self,
        session_id: "SessionId",
        max_rows: int,
        max_bytes: int,
        cursor: "Cursor",
        catalog_name: Optional[str] = None,
        schema_name: Optional[str] = None,
        table_name: Optional[str] = None,
        table_types: Optional[List[str]] = None,
    ) -> "ResultSet":
        """
        Retrieves a list of tables, optionally filtered by catalog, schema, table name, and table types.

        This method fetches metadata about tables available in the specified catalog
        and schema, or all catalogs and schemas if not specified.

        Args:
            session_id: The session identifier
            max_rows: Maximum number of rows to fetch in a single batch
            max_bytes: Maximum number of bytes to fetch in a single batch
            cursor: The cursor object that will handle the results
            catalog_name: Optional catalog name pattern to filter by
            schema_name: Optional schema name pattern to filter by
            table_name: Optional table name pattern to filter by
            table_types: Optional list of table types to filter by (e.g., ['TABLE', 'VIEW'])

        Returns:
            ResultSet: An object containing the table metadata

        Raises:
            ValueError: If the session ID is invalid
            OperationalError: If there's an error retrieving the tables
        """

    @abstractmethod
    def get_columns(
        self,
        session_id: "SessionId",
        max_rows: int,
        max_bytes: int,
        cursor: "Cursor",
        catalog_name: Optional[str] = None,
        schema_name: Optional[str] = None,
        table_name: Optional[str] = None,
        column_name: Optional[str] = None,
    ) -> "ResultSet":
        """
        Retrieves a list of columns, optionally filtered by catalog, schema, table, and column name patterns.

        This method fetches metadata about columns available in the specified table,
        or all tables if not specified.

        Args:
            session_id: The session identifier
            max_rows: Maximum number of rows to fetch in a single batch
            max_bytes: Maximum number of bytes to fetch in a single batch
            cursor: The cursor object that will handle the results
            catalog_name: Optional catalog name pattern to filter by
            schema_name: Optional schema name pattern to filter by
            table_name: Optional table name pattern to filter by
            column_name: Optional column name pattern to filter by

        Returns:
            ResultSet: An object containing the column metadata

        Raises:
            ValueError: If the session ID is invalid
            OperationalError: If there's an error retrieving the columns
        """

    # == Configuration ==

    # @property must wrap @abstractmethod (not the other way round) so the
    # abstract flag is visible on the resulting property object.
    @property
    @abstractmethod
    def max_download_threads(self) -> int:
        """
        Gets the maximum number of download threads for cloud fetch operations.

        Returns:
            int: The maximum number of download threads
        """