Skip to content

Commit fdc272e

Browse files
committed
feat: added option for api auth
1 parent 715019b commit fdc272e

1 file changed

Lines changed: 42 additions & 25 deletions

File tree

src/webapp/databricks.py

Lines changed: 42 additions & 25 deletions
Original file line number | Diff line number | Diff line change
@@ -8,6 +8,7 @@
88
from .config import databricks_vars, gcs_vars
99
from .utilities import databricksify_inst_name, SchemaType
1010
from typing import List, Any
11+
from databricks.sdk.errors import DatabricksError
1112

1213
# List of data medallion levels
1314
MEDALLION_LEVELS = ["silver", "gold", "bronze"]
@@ -205,36 +206,52 @@ def fetch_table_data(
205206
Runs a simple SELECT * FROM <catalog>.<schema>.<table> LIMIT <limit>
206207
against the specified SQL warehouse, and returns a list of row‐dicts.
207208
"""
208-
w = WorkspaceClient(
209-
host=databricks_vars["DATABRICKS_HOST_URL"],
210-
google_service_account=gcs_vars["GCP_SERVICE_ACCOUNT_EMAIL"],
211-
)
212-
if not w:
213-
raise ValueError(
214-
"fetch_table_data(): could not initialize WorkspaceClient."
215-
)
216209

217-
fq_table = f"`{catalog_name}`.`{schema_name}`.`{table_name}`"
218-
sql = f"SELECT * FROM {fq_table} LIMIT {limit}"
210+
try:
211+
client = WorkspaceClient(
212+
host=databricks_vars["DATABRICKS_HOST_URL"],
213+
google_service_account=gcs_vars["GCP_SERVICE_ACCOUNT_EMAIL"],
214+
)
215+
except Exception as e:
216+
raise ValueError(f"Failed to initialize WorkspaceClient: {e}")
219217

220-
resp = w.statement_execution.execute_statement(
221-
warehouse_id=warehouse_id,
222-
statement=sql,
223-
format=Format.JSON_ARRAY,
224-
wait_timeout="30s",
225-
on_wait_timeout=ExecuteStatementRequestOnWaitTimeout.CANCEL,
226-
)
218+
# 2. Build SQL text
219+
fully_qualified = f"`{catalog_name}`.`{schema_name}`.`{table_name}`"
220+
sql_text = f"SELECT * FROM {fully_qualified} LIMIT {limit}"
227221

228-
if not resp or not getattr(resp, "status", None):
229-
raise ValueError("fetch_table_data(): invalid response or missing status")
222+
# 3. Execute with INLINE+JSON_ARRAY, wait up to 30s, then CANCEL if not done
223+
try:
224+
resp = client.statement_execution.execute_statement(
225+
warehouse_id=warehouse_id,
226+
statement=sql_text,
227+
disposition="INLINE", # INLINE disposition
228+
format=Format.JSON_ARRAY, # JSON_ARRAY format
229+
wait_timeout="30s", # up to 30 seconds
230+
on_wait_timeout=ExecuteStatementRequestOnWaitTimeout.CANCEL, # cancel if not done
231+
)
232+
except DatabricksError as e:
233+
raise ValueError(f"Databricks API call failed: {e}")
230234

231-
if resp.status.state != "SUCCEEDED":
232-
raise ValueError(
233-
f"fetch_table_data(): query failed with state {resp.status.state}"
235+
# 4. Check final state
236+
state = resp.status.state
237+
if state != "SUCCEEDED":
238+
# If there’s an error object, include its message
239+
err = resp.status.error
240+
msg = (
241+
err.message
242+
if (err is not None and err.message)
243+
else "No additional error info."
234244
)
245+
raise ValueError(f"Query did not succeed (state={state}): {msg}")
246+
247+
# 5. Ensure manifest and result are present
248+
if resp.manifest is None or resp.manifest.schema is None:
249+
raise ValueError("Query succeeded but schema manifest is missing.")
250+
if resp.result is None or resp.result.data_array is None:
251+
raise ValueError("Query succeeded but result data is missing.")
235252

236-
# Extract rows
253+
# 6. Extract column names and rows
237254
column_names = [col.name for col in resp.manifest.schema]
238-
rows = resp.result.data_array
255+
data_array = resp.result.data_array
239256

240-
return [dict(zip(column_names, row)) for row in rows]
257+
return [dict(zip(column_names, row)) for row in data_array]

0 commit comments

Comments
 (0)