
Commit 85f8ba3

fix(kernel): stop premature sync statement close (H4); bump KERNEL_REV to batch 2 (#830)

* fix(kernel): stop premature sync statement close (H4); bump KERNEL_REV to batch 2

Connector half of the kernel batch-2 fixes (kernel PR #123). Bumps KERNEL_REV to pick up the batch-2 kernel surface.

H4 — don't close the kernel Statement at sync execute-return: execute_command's finally used to `stmt.close()` immediately after `stmt.execute()` succeeded. For a large CloudFetch result with paginated chunk links (all_fetched=false), the kernel fetches later links lazily (get_result_chunks against the LIVE statement) during consumption, so a premature CloseStatement broke those fetches. The kernel now auto-closes the server statement when its result stream drains (ExecutedStatement::next_batch end-of-stream), with the executed-handle Drop as the backstop for partial/abandoned reads. So the connector now flips close_stmt=False on a successful execute and only closes on the error path (no executed handle was produced).

The other batch-2 fixes (cancelled-class -> OperationalError, U2M refresh fail-fast, metadata statement close-on-drop, per-binding OAuth client_id) are entirely kernel-side and need no connector code beyond the KERNEL_REV bump.

Tests: unit (sync execute does-not-close on success / does-close on failure) + e2e (large multi-chunk result drains without premature close + cursor reuse; server cancel maps to OperationalError not ProgrammingError). All e2e verified live against dogfood.

Co-authored-by: Isaac
Signed-off-by: Vikrant Puppala <vikrant.puppala@databricks.com>

* chore: re-pin KERNEL_REV to #123 HEAD after cancelled-test fix

#123 picked up a follow-up commit fixing the wiremock cancelled-state assertions (ErrorCode::Cancelled). Bump the placeholder pin so the connector CI builds the corrected kernel. Still to be re-pinned to the squash-merge SHA before #830 merges.

Co-authored-by: Isaac
Signed-off-by: Vikrant Puppala <vikrant.puppala@databricks.com>

* chore: re-pin KERNEL_REV to merged kernel #123 (f4ee6fe)

Kernel batch-2 (#123) is merged to kernel main (f4ee6fe, current main HEAD). Re-pin from the orphaned branch HEAD (4f7fbe7) to the merged SHA — reachable from main, no orphan-SHA risk. Verified against a wheel built from f4ee6fe: connector unit (102) + kernel e2e (H4 large-result drain + reuse, server-cancel -> OperationalError, staging fail-loud, diagnostic-info) all pass against the real merged kernel.

Co-authored-by: Isaac
Signed-off-by: Vikrant Puppala <vikrant.puppala@databricks.com>

* docs: drop internal 'H4' audit shorthand from comments/docstrings

Address review: 'H4' is internal audit shorthand, meaningless in the public connector codebase. Reword the affected comment + two test docstrings to describe the behavior directly (premature sync CloseStatement broke lazy CloudFetch chunk-link fetches). No code change.

Co-authored-by: Isaac
Signed-off-by: Vikrant Puppala <vikrant.puppala@databricks.com>

---------

Signed-off-by: Vikrant Puppala <vikrant.puppala@databricks.com>
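The failure mode described in the commit message can be illustrated with a small, self-contained sketch. Everything here is hypothetical (`FakeStatement`, `ClosedError`, `stream_batches` are illustrative names, not kernel or connector APIs); it only models the idea that chunks are fetched lazily from a live statement, and that the stream itself closes the statement once it drains.

```python
class ClosedError(RuntimeError):
    """Raised when a chunk is requested from a closed statement."""


class FakeStatement:
    """Hypothetical stand-in for a server statement with paginated chunks."""

    def __init__(self, chunks):
        self._chunks = chunks
        self.closed = False

    def get_chunk(self, index):
        # Chunks are only reachable while the statement is still live.
        if self.closed:
            raise ClosedError(f"chunk {index} requested after close")
        return self._chunks[index]

    def close(self):
        self.closed = True


def stream_batches(stmt, n_chunks):
    """Yield chunks lazily and close the statement when the stream ends."""
    try:
        for i in range(n_chunks):
            yield stmt.get_chunk(i)
    finally:
        # Runs at end-of-stream, and also if the generator is closed or
        # collected early (the backstop for partial/abandoned reads).
        stmt.close()


# Old behaviour: closing right after execute breaks the lazy fetch.
early = FakeStatement([[1, 2], [3, 4]])
early_stream = stream_batches(early, 2)
early.close()
try:
    next(early_stream)
    premature_close_failed = False
except ClosedError:
    premature_close_failed = True

# New behaviour: leave the statement open; draining the stream closes it.
live = FakeStatement([[1, 2], [3, 4]])
drained = list(stream_batches(live, 2))
```

In this model the caller never calls `close()` on the success path; ownership of the close moves to whoever consumes the stream, which is the same shift the commit describes between connector and kernel.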
1 parent a3e882e commit 85f8ba3

3 files changed

Lines changed: 142 additions & 5 deletions


src/databricks/sql/backend/kernel/client.py

Lines changed: 17 additions & 4 deletions
@@ -473,16 +473,29 @@ def execute_command(
                 # Canceller is best-effort; never block execute on it.
                 pass
             executed = stmt.execute()
+            # Execute succeeded: the kernel now owns the statement
+            # lifecycle. It auto-closes the server statement when the
+            # result stream is fully drained (``ExecutedStatement::
+            # next_batch`` end-of-stream), with the executed handle's
+            # ``Drop`` as the backstop for partial/abandoned reads.
+            # So we must NOT close ``stmt`` here: a premature
+            # ``CloseStatement`` at execute-return broke lazy
+            # CloudFetch chunk-link fetches (``get_result_chunks``
+            # against the live statement) for large paginated-link
+            # results. Closing here is left ONLY for the error path
+            # below, where no executed handle / result set was
+            # produced to reap it.
+            close_stmt = False
         except Exception as exc:
             raise _wrap_kernel_exception("execute_command", exc) from exc
         finally:
             with self._sync_cancellers_lock:
                 self._sync_cancellers.pop(id(cursor), None)
             if close_stmt:
-                # Sync path: ``Statement`` is a lifecycle owner separate
-                # from the executed handle. Drop it here so the parent
-                # doesn't outlive its caller. Swallow close errors —
-                # they're not actionable.
+                # Reached only when ``stmt.execute()`` did not succeed
+                # (or async, which flipped the flag earlier): no executed
+                # handle owns the statement, so close it here to avoid a
+                # leak. Swallow close errors — not actionable.
                 try:
                     stmt.close()
                 except Exception:
tests/e2e/test_kernel_backend.py

Lines changed: 57 additions & 1 deletion
@@ -24,7 +24,12 @@
 import pytest
 
 import databricks.sql as sql
-from databricks.sql.exc import DatabaseError, NotSupportedError, ServerOperationError
+from databricks.sql.exc import (
+    DatabaseError,
+    NotSupportedError,
+    OperationalError,
+    ServerOperationError,
+)
 
 # Skip the whole module unless the kernel wheel is genuinely installed.
 # ``pytest.importorskip`` alone isn't enough: the kernel unit tests inject a
@@ -563,3 +568,54 @@ def cancel_after_delay():
     finally:
         t.join()
         cur.close()
+
+
+# ── Batch 2 ────────────────────────────────────────────────────────
+
+
+def test_large_result_drains_without_premature_close(conn):
+    """A large multi-chunk result fully drains even though the connector
+    no longer closes the statement at execute-return — the kernel
+    auto-closes on drain. Guards the regression where a premature
+    CloseStatement broke lazy CloudFetch chunk-link fetches."""
+    n = 5_000_000
+    with conn.cursor() as cur:
+        cur.execute(f"SELECT id, cast(id AS string) s FROM range({n})")
+        rows = cur.fetchall()
+        assert len(rows) == n
+        # Cursor is reusable after the auto-close fired on the prior result.
+        cur.execute("SELECT 42 AS n")
+        assert cur.fetchall()[0][0] == 42
+
+
+def test_server_cancel_maps_to_operational_error(conn):
+    """A server-side cancel surfaces as OperationalError (cancelled
+    class), not ProgrammingError. We trigger it via a cross-thread
+    cancel of a running query; the raised exception must be in the
+    OperationalError family, not ProgrammingError."""
+    import threading
+    import time
+
+    from databricks.sql.exc import ProgrammingError
+
+    cur = conn.cursor()
+
+    def cancel_after_delay():
+        time.sleep(15.0)
+        cur.cancel()
+
+    t = threading.Thread(target=cancel_after_delay)
+    t.start()
+    try:
+        with pytest.raises(Exception) as exc_info:
+            cur.execute(
+                "SELECT count(*) FROM range(0, 1000000000000) "
+                "WHERE pow(rand(), 2) < 0.5 AND sqrt(id) > 1"
+            )
+        # The cancellation must not masquerade as a caller-argument
+        # (ProgrammingError) error. It should be operational.
+        assert not isinstance(exc_info.value, ProgrammingError)
+        assert isinstance(exc_info.value, (OperationalError, DatabaseError))
+    finally:
+        t.join()
+        cur.close()
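The cross-thread cancel test above needs a live warehouse, but its shape can be sketched offline. In the following, `FakeCursor` and the two exception classes are hypothetical stand-ins defined locally (they are not the `databricks.sql.exc` classes), and the delay is shortened so the example finishes quickly.

```python
import threading


class OperationalError(Exception):
    """Hypothetical stand-in for an operational (cancelled-class) error."""


class ProgrammingError(Exception):
    """Hypothetical stand-in for a caller-argument error."""


class FakeCursor:
    """Models a blocking execute() that another thread can cancel."""

    def __init__(self):
        self._cancelled = threading.Event()

    def cancel(self):
        self._cancelled.set()

    def execute(self, timeout=5.0):
        # Block until cancelled, or give up after `timeout` seconds.
        if self._cancelled.wait(timeout):
            raise OperationalError("statement cancelled")
        return "finished"


cur = FakeCursor()
timer = threading.Timer(0.05, cur.cancel)  # cancel from another thread
timer.start()
try:
    cur.execute()
    caught = None
except Exception as exc:
    caught = exc
finally:
    timer.join()
```

As in the e2e test, the check of interest is the class of the raised exception: a cancellation should land in the operational family and not be reported as a programming error.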

tests/unit/test_kernel_client.py

Lines changed: 68 additions & 0 deletions
@@ -572,6 +572,74 @@ def fake_execute():
     assert id(cursor) not in c._sync_cancellers
 
 
+def test_sync_execute_does_not_close_statement_on_success():
+    """On a successful sync execute(), the connector must NOT close the
+    parent kernel Statement — the kernel now auto-closes the server
+    statement when the result stream drains (with the executed handle's
+    Drop as backstop). A premature close() here broke lazy CloudFetch
+    chunk-link fetches for large paginated-link results."""
+    c = _make_client()
+    c._kernel_session = MagicMock()
+    cursor = MagicMock()
+    cursor.arraysize = 100
+    cursor.buffer_size_bytes = 1024
+
+    stmt = MagicMock()
+    stmt.execute.return_value = MagicMock(
+        statement_id="stmt-id",
+        arrow_schema=MagicMock(return_value=pa.schema([("x", pa.int64())])),
+    )
+    c._kernel_session.statement.return_value = stmt
+
+    c.execute_command(
+        operation="SELECT 1",
+        session_id=MagicMock(),
+        max_rows=1,
+        max_bytes=1,
+        lz4_compression=False,
+        cursor=cursor,
+        use_cloud_fetch=False,
+        parameters=[],
+        async_op=False,
+        enforce_embedded_schema_correctness=False,
+    )
+
+    # The kernel owns the statement lifecycle post-execute; connector
+    # leaves it alone (kernel auto-close-on-drain + Drop backstop).
+    stmt.close.assert_not_called()
+
+
+def test_sync_execute_closes_statement_on_failure():
+    """On the error path (execute raised, no executed handle / result
+    set produced), the connector still closes the parent Statement so
+    it isn't leaked."""
+    c = _make_client()
+    c._kernel_session = MagicMock()
+    cursor = MagicMock()
+    cursor.arraysize = 100
+    cursor.buffer_size_bytes = 1024
+
+    stmt = MagicMock()
+    stmt.execute.side_effect = RuntimeError("boom")
+    c._kernel_session.statement.return_value = stmt
+
+    with pytest.raises(Exception):
+        c.execute_command(
+            operation="SELECT 1",
+            session_id=MagicMock(),
+            max_rows=1,
+            max_bytes=1,
+            lz4_compression=False,
+            cursor=cursor,
+            use_cloud_fetch=False,
+            parameters=[],
+            async_op=False,
+            enforce_embedded_schema_correctness=False,
+        )
+
+    stmt.close.assert_called_once_with()
+
+
 def test_get_columns_accepts_none_catalog():
     """The kernel's `list_columns` honours `catalog=None` by issuing
     `SHOW COLUMNS IN ALL CATALOGS` server-side. The connector should
