Skip to content

Commit 7c053cd

Browse files
committed
refined fetch table function
1 parent 86715bb commit 7c053cd

1 file changed

Lines changed: 56 additions & 123 deletions

File tree

src/webapp/databricks.py

Lines changed: 56 additions & 123 deletions
Original file line numberDiff line numberDiff line change
@@ -310,161 +310,94 @@ def delete_inst(self, inst_name: str) -> None:
310310
)
311311

312312
def fetch_table_data(
313-
self, catalog_name: str, inst_name: str, table_name: str, warehouse_id: str
313+
self,
314+
catalog_name: str,
315+
inst_name: str,
316+
table_name: str,
317+
warehouse_id: str,
314318
) -> List[Dict[str, Any]]:
319+
"""
320+
Execute SELECT * via Databricks SQL Statement Execution API using EXTERNAL_LINKS.
321+
Blocks server-side for up to 30s; if not SUCCEEDED, raises. Downloads presigned
322+
URLs in-memory and returns rows as List[Dict[str, Any]].
323+
"""
315324
w = WorkspaceClient(
316325
host=databricks_vars["DATABRICKS_HOST_URL"],
317326
google_service_account=gcs_vars["GCP_SERVICE_ACCOUNT_EMAIL"],
318327
)
328+
319329
schema = databricksify_inst_name(inst_name)
320330
table_fqn = f"`{catalog_name}`.`{schema}_silver`.`{table_name}`"
321331
sql = f"SELECT * FROM {table_fqn}"
322332

323-
# 1) Execute INLINE + poll until SUCCEEDED
333+
# Start with EXTERNAL_LINKS and let the server block up to 30s.
324334
resp = w.statement_execution.execute_statement(
325335
warehouse_id=warehouse_id,
326336
statement=sql,
327-
disposition=Disposition.INLINE,
337+
disposition=Disposition.EXTERNAL_LINKS,
328338
format=Format.JSON_ARRAY,
329339
wait_timeout="30s",
330340
on_wait_timeout=ExecuteStatementRequestOnWaitTimeout.CONTINUE,
331341
)
332342

333-
MAX_BYTES = 20 * 1024 * 1024 # 20 MiB
334-
POLL_INTERVAL_S = 1.0
335-
POLL_TIMEOUT_S = 300.0 # 5 minutes
336-
337-
start = time.time()
338-
while not resp.status or resp.status.state not in {
339-
"SUCCEEDED",
340-
"FAILED",
341-
"CANCELED",
342-
}:
343-
if time.time() - start > POLL_TIMEOUT_S:
344-
raise TimeoutError("Timed out waiting for statement to finish (INLINE)")
345-
time.sleep(POLL_INTERVAL_S)
346-
resp = w.statement_execution.get_statement(statement_id=resp.statement_id)
347-
if resp.status.state != "SUCCEEDED":
348-
msg = (
349-
resp.status.error.message
350-
if resp.status and resp.status.error
351-
else "no details"
352-
)
353-
raise ValueError(f"Statement ended in {resp.status.state}: {msg}")
354-
355-
if not (
356-
resp.manifest and resp.manifest.schema and resp.manifest.schema.columns
357-
):
358-
raise ValueError("Schema/columns missing.")
359-
cols = [c.name for c in resp.manifest.schema.columns]
343+
stmt_id = resp.statement_id
344+
if stmt_id is None:
345+
raise ValueError("Databricks returned a null statement_id")
360346

361-
# 2) Build INLINE records until ~20 MiB; if projected to exceed, switch to EXTERNAL_LINKS ---
362-
records: List[Dict[str, Any]] = []
363-
bytes_so_far, have_items = 0, False
347+
# No client-side polling; require SUCCEEDED within 30s.
348+
if not resp.status or resp.status.state != "SUCCEEDED":
349+
state = resp.status.state if resp.status else "UNKNOWN"
350+
msg = resp.status.error.message if (resp.status and resp.status.error) else "Query not finished within wait_timeout"
351+
raise TimeoutError(f"Statement {stmt_id} not finished (state={state}): {msg}")
364352

365-
def add_row(rd: Dict[str, Any]) -> bool:
366-
nonlocal bytes_so_far, have_items
367-
b = json.dumps(rd, ensure_ascii=False, separators=(",", ":")).encode(
368-
"utf-8"
369-
)
370-
projected = bytes_so_far + (1 if have_items else 0) + len(b) + 2
371-
if projected > MAX_BYTES:
372-
return False
373-
records.append(rd)
374-
bytes_so_far += (1 if have_items else 0) + len(b)
375-
have_items = True
376-
return True
377-
378-
# Consume INLINE chunks
379-
def consume_inline_chunk(chunk_obj) -> bool:
380-
if getattr(chunk_obj, "truncated", False):
381-
raise ValueError("Server truncated INLINE result.")
382-
arr = getattr(chunk_obj, "data_array", None) or []
383-
for row in arr:
384-
if not add_row(dict(zip(cols, row))):
385-
return False
386-
return True
387-
388-
first = resp.result
389-
if first and not consume_inline_chunk(first):
390-
inline_over_limit = True
391-
else:
392-
inline_over_limit = False
393-
next_idx = getattr(first, "next_chunk_index", None) if first else None
394-
while next_idx is not None:
395-
chunk = w.statement_execution.get_statement_result_chunk_n(
396-
statement_id=resp.statement_id, chunk_index=next_idx
397-
)
398-
if not consume_inline_chunk(chunk):
399-
inline_over_limit = True
400-
break
401-
next_idx = getattr(chunk, "next_chunk_index", None)
402-
403-
if not inline_over_limit:
404-
return records # INLINE fit under 20 MiB
353+
# Columns (ensure List[str] for type-checkers)
354+
if not (resp.manifest and resp.manifest.schema and resp.manifest.schema.columns):
355+
raise ValueError("Schema/columns missing (EXTERNAL_LINKS).")
356+
cols: List[str] = []
357+
for c in resp.manifest.schema.columns:
358+
if c.name is None:
359+
raise ValueError("Encountered a column without a name.")
360+
cols.append(c.name)
405361

406-
# 3) Re-execute with EXTERNAL_LINKS, then download each presigned URL (no auth header) ---
407-
resp = w.statement_execution.execute_statement(
408-
warehouse_id=warehouse_id,
409-
statement=sql,
410-
disposition=Disposition.EXTERNAL_LINKS,
411-
format=Format.JSON_ARRAY,
412-
wait_timeout="30s",
413-
on_wait_timeout=ExecuteStatementRequestOnWaitTimeout.CONTINUE,
414-
)
415-
start = time.time()
416-
while not resp.status or resp.status.state not in {
417-
"SUCCEEDED",
418-
"FAILED",
419-
"CANCELED",
420-
}:
421-
if time.time() - start > POLL_TIMEOUT_S:
422-
raise TimeoutError(
423-
"Timed out waiting for statement to finish (EXTERNAL_LINKS)"
424-
)
425-
time.sleep(POLL_INTERVAL_S)
426-
resp = w.statement_execution.get_statement(statement_id=resp.statement_id)
427-
if resp.status.state != "SUCCEEDED":
428-
msg = (
429-
resp.status.error.message
430-
if resp.status and resp.status.error
431-
else "no details"
432-
)
433-
raise ValueError(
434-
f"Statement (EXTERNAL_LINKS) ended in {resp.status.state}: {msg}"
435-
)
362+
records: List[Dict[str, Any]] = []
436363

437-
if not (
438-
resp.manifest and resp.manifest.schema and resp.manifest.schema.columns
439-
):
440-
raise ValueError("Schema/columns missing (EXTERNAL_LINKS).")
441-
cols = [c.name for c in resp.manifest.schema.columns]
442-
443-
def consume_external_result(result_obj):
444-
links = getattr(result_obj, "external_links", None) or []
445-
for link in links:
446-
url = (
447-
link.external_link
448-
if hasattr(link, "external_link")
449-
else link.get("external_link")
450-
)
364+
# Helper: consume one chunk-like object (first result or subsequent chunk)
365+
def _consume_chunk(chunk_obj: Any) -> int | None:
366+
links = getattr(chunk_obj, "external_links", None) or []
367+
for link_obj in links:
368+
url = getattr(link_obj, "external_link", None)
369+
if url is None and isinstance(link_obj, dict):
370+
url = link_obj.get("external_link")
371+
if not url:
372+
continue
373+
# IMPORTANT: do not send Databricks auth header to presigned URLs.
451374
r = requests.get(url, timeout=120)
452375
r.raise_for_status()
453-
for row in r.json():
376+
rows = r.json()
377+
if not isinstance(rows, list):
378+
raise ValueError("Unexpected external link payload (expected JSON array).")
379+
for row in rows:
380+
if not isinstance(row, list):
381+
raise ValueError("Unexpected row shape (expected list).")
454382
records.append(dict(zip(cols, row)))
455-
return getattr(result_obj, "next_chunk_index", None)
383+
return getattr(chunk_obj, "next_chunk_index", None)
456384

457-
records.clear()
458-
next_idx = consume_external_result(resp.result)
385+
# First batch is in resp.result
386+
if not resp.result:
387+
return records
388+
next_idx = _consume_chunk(resp.result)
459389

390+
# Remaining batches by chunk index
460391
while next_idx is not None:
461392
chunk = w.statement_execution.get_statement_result_chunk_n(
462-
statement_id=resp.statement_id, chunk_index=next_idx
393+
statement_id=stmt_id,
394+
chunk_index=next_idx,
463395
)
464-
next_idx = consume_external_result(chunk)
396+
next_idx = _consume_chunk(chunk)
465397

466398
return records
467399

400+
468401
def get_key_for_file(
469402
self, mapping: Dict[str, Any], file_name: str
470403
) -> Optional[str]:

0 commit comments

Comments
 (0)