-
Notifications
You must be signed in to change notification settings - Fork 40
Proactive server-side statement close when results consumed #1444
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
0243311
863e9ee
cf2f6b3
a709568
113d579
3e88c36
4a06513
376b38a
27d1d6a
cfd2db3
bbcf049
a239225
5f852d2
17fb42e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -59,6 +59,9 @@ public class DatabricksStatement implements IDatabricksStatement, IDatabricksSta | |
| // closed the operation — no further RPCs for this statement ID are possible. The JDBC Statement | ||
| // itself remains open for re-execution. Reset on each new execution. Volatile because cancel() | ||
| // can be called from a different thread (JDBC spec requirement). | ||
| private volatile boolean serverOperationClosed = false; // Client proactively closed the server | ||
| // operation after all results were consumed or ResultSet was closed. Prevents duplicate | ||
| // closeStatement RPC when Statement.close() is called later. Reset on each new execution. | ||
| protected Boolean shouldReturnResultSet = | ||
| null; // Cached result of shouldReturnResultSetWithConfig() | ||
|
|
||
|
|
@@ -149,20 +152,26 @@ public void close(boolean removeFromSession) throws DatabricksSQLException { | |
| LOGGER.warn(warningMsg); | ||
| warnings = WarningUtil.addWarning(warnings, warningMsg); | ||
| } else { | ||
| // Skip server-side close if the server already closed the operation (direct results). | ||
| // The operation handle is gone on the server side, so closeStatement would fail. | ||
| if (!directResultsReceived) { | ||
| this.connection.getSession().getDatabricksClient().closeStatement(statementId); | ||
| } else { | ||
| LOGGER.debug( | ||
| "Statement {} closed locally (direct results — server operation already closed, " | ||
| + "skipping closeStatement RPC)", | ||
| statementId); | ||
| } | ||
| // Close ResultSet first — this triggers proactive server close via | ||
| // closeServerOperation() and sets serverOperationClosed=true, preventing | ||
| // a duplicate closeStatement RPC below. | ||
| if (resultSet != null) { | ||
| this.resultSet.close(); | ||
| this.resultSet = null; | ||
| } | ||
| // Skip server-side close if operation was already closed: | ||
| // - directResultsReceived: server closed it (inline results) | ||
| // - serverOperationClosed: client proactively closed it (results consumed or RS closed) | ||
| if (!directResultsReceived && !serverOperationClosed) { | ||
| this.connection.getSession().getDatabricksClient().closeStatement(statementId); | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🔴 CRITICAL — Double closeStatement RPC regression (flagged by language + devils-advocate reviewers, verified by orchestrator) Trace when user calls
The duplicate is silently swallowed by Suggested fix: set Also add a regression test that calls
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed. Reordered Added |
||
| } else { | ||
| LOGGER.debug( | ||
| "Statement {} closed locally (server operation already closed — " | ||
| + "directResults={}, proactivelyClosed={}, skipping closeStatement RPC)", | ||
| statementId, | ||
| directResultsReceived, | ||
| serverOperationClosed); | ||
| } | ||
| } | ||
| } finally { | ||
| // Always run cleanup even if resultSet.close() or closeStatement() throws. | ||
|
|
@@ -246,12 +255,11 @@ public void cancel() throws SQLException { | |
| LOGGER.debug("public void cancel()"); | ||
| checkIfClosed(); | ||
|
|
||
| if (statementId != null && !directResultsReceived) { | ||
| if (statementId != null && !directResultsReceived && !serverOperationClosed) { | ||
| this.connection.getSession().getDatabricksClient().cancelStatement(statementId); | ||
| DatabricksThreadContextHolder.clearStatementInfo(); | ||
| } else if (directResultsReceived) { | ||
| String warningMsg = | ||
| "Statement's server operation was already closed (direct results); cancel has no effect."; | ||
| } else if (directResultsReceived || serverOperationClosed) { | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🔵 LOW — Pre-PR
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This is consistent with JDBC spec — cancelling a completed statement is a no-op. The warning provides signal. Not adding to changelog since it's spec-compliant behavior. |
||
| String warningMsg = "Statement's server operation was already closed; cancel has no effect."; | ||
| LOGGER.debug(warningMsg); | ||
| warnings = WarningUtil.addWarning(warnings, warningMsg); | ||
| } else { | ||
|
|
@@ -694,17 +702,18 @@ public ResultSet getExecutionResult() throws SQLException { | |
| "No execution available for statement", DatabricksDriverErrorCode.INPUT_VALIDATION_ERROR); | ||
| } | ||
|
|
||
| // For direct results, the server already closed the operation — making an RPC | ||
| // would return "not found". Return the cached result set instead. | ||
| if (directResultsReceived) { | ||
| // For direct results or proactively closed operations, the server operation is gone — | ||
| // making an RPC would return "not found". Return the cached result set instead. | ||
| if (directResultsReceived || serverOperationClosed) { | ||
| if (resultSet != null) { | ||
| LOGGER.debug( | ||
| "Returning cached result for statement {} (direct results received)", statementId); | ||
| "Returning cached result for statement {} (server operation already closed)", | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🟡 MEDIUM — Behavior change not documented (flagged by devils-advocate, architecture) Pre-PR: a second This is arguably an improvement (the previous behavior was murky), but it IS a behavior change visible to any code that called Also: the error message at line 712-714 was changed from "Direct results were received but no result set is available" to the more generic "Server operation was already closed and no result set is available." Operator debugging loses the directResults-vs-proactive-close distinction. Consider including the specific cause in the error.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Updated NEXT_CHANGELOG.md to document this: 'getExecutionResult() after result consumption returns the cached ResultSet instead of making a server RPC.' |
||
| statementId); | ||
| return resultSet; | ||
| } | ||
| throw new DatabricksSQLException( | ||
| "Direct results were received but no result set is available. " | ||
| + "The server closed the operation and no further results can be fetched.", | ||
| "Server operation was already closed and no result set is available. " | ||
| + "No further results can be fetched.", | ||
| DatabricksDriverErrorCode.INVALID_STATE); | ||
| } | ||
|
|
||
|
|
@@ -953,6 +962,38 @@ public void markDirectResultsReceived() { | |
| this.directResultsReceived = true; | ||
| } | ||
|
|
||
| /** | ||
| * Proactively closes the server-side operation to release server resources while keeping the | ||
| * client-side Statement open for reuse. Called when all results have been consumed (next() | ||
| * returns false) or when ResultSet.close() is called. | ||
| * | ||
| * <p>After this call, {@link #close(boolean)} will skip the closeStatement RPC since the server | ||
| * operation is already closed. The Statement can still be re-executed. | ||
| */ | ||
| @Override | ||
| public void closeServerOperation() { | ||
| if (serverOperationClosed || directResultsReceived || statementId == null || isClosed) { | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🟡 MEDIUM — Concurrency: volatile flag is necessary but not sufficient (flagged by devils-advocate, language; security marks server-idempotent) The
Server-side these are idempotent, but the test Fix: gate the RPC with
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Deferred. The check-then-act is benign — double closeStatement is idempotent on the server (returns success or 404). CAS would add complexity for no user-visible benefit. |
||
| return; | ||
| } | ||
| try { | ||
| this.connection.getSession().getDatabricksClient().closeStatement(statementId); | ||
| // Only mark closed on success — if the RPC fails (transient network error), | ||
| // Statement.close() should retry the closeStatement RPC rather than skip it, | ||
| // to avoid leaving the server operation alive until session timeout. | ||
| this.serverOperationClosed = true; | ||
| LOGGER.debug( | ||
| "Proactively closed server operation for statement {} (results consumed)", statementId); | ||
| } catch (SQLException | RuntimeException e) { | ||
| // Best-effort — don't fail the user's close for a server cleanup failure. | ||
| // serverOperationClosed stays false so Statement.close() will retry the RPC. | ||
| LOGGER.warn( | ||
| "Failed to proactively close server operation for statement {}: {}", | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🟠 HIGH — Operational visibility & rollout safety (flagged by ops, devils-advocate, language)
Fix:
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Addressed. Upgraded failed proactive close logging from DEBUG to WARN so operators get signal. Also narrowed catch to |
||
| statementId, | ||
| e.getMessage(), | ||
| e); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Resets statement state before a new execution (sync or async). Closes the previous server-side | ||
| * operation handle (if still open) and the local ResultSet, clears flags, and nulls the | ||
|
|
@@ -962,39 +1003,36 @@ private void resetForNewExecution() { | |
| noMoreResults = false; | ||
| updateCount = -1; | ||
|
|
||
| // Close the previous server-side operation if it exists. This prevents resource | ||
| // leaks when a Statement is re-executed (e.g., PreparedStatement in a loop). | ||
| // This matches the behavior of pgJDBC, MySQL Connector/J, Trino JDBC, and | ||
| // Databricks Python SQL Connector — all close the previous operation on re-execute. | ||
| // | ||
| // Note on directResultsReceived: we check the flag value from the PREVIOUS execution | ||
| // here. The flag is reset to false below, after this close attempt. | ||
| // | ||
| // Note on latency: this close is synchronous (adds one RPC round-trip before the next | ||
| // execution). This is consistent with pgJDBC's closeForNextExecution() which is also | ||
| // synchronous. The correctness benefit (no orphaned server operations) outweighs the | ||
| // latency cost for typical usage patterns. | ||
| // | ||
| // Skip if: (1) no previous execution (statementId==null), or | ||
| // (2) server already closed the operation (direct results). | ||
| if (statementId != null && !directResultsReceived) { | ||
| try { | ||
| connection.getSession().getDatabricksClient().closeStatement(statementId); | ||
| } catch (Exception e) { | ||
| // Don't block re-execution if closing the previous operation fails. | ||
| // This covers: network errors, operation already expired/evicted on server, | ||
| // and transport-level errors (e.g., unexpected server responses). | ||
| // The new execution will create a fresh operation with a new statementId. | ||
| LOGGER.debug( | ||
| "Failed to close previous server operation {} during re-execution: {}", | ||
| statementId, | ||
| e.getMessage()); | ||
| } | ||
| // Close the previous server-side operation if still open. Fire-and-forget on a | ||
| // daemon thread so the new execution is not blocked by the close RPC latency. | ||
| // If the user closed the ResultSet before re-executing (best practice), the | ||
| // proactive close already set serverOperationClosed=true and this is a no-op. | ||
| if (statementId != null && !directResultsReceived && !serverOperationClosed) { | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🟠 HIGH — The fix for F2 (removing the inline close from if (statementId != null && !directResultsReceived && !serverOperationClosed) {
try {
connection.getSession().getDatabricksClient().closeStatement(statementId);
serverOperationClosed = true;
} catch (Exception e) { ... }
}Concerns:
Suggested fix:
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good catch on concern 3 — applied the same F3/F10 treatment here: narrowed to
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. All four concerns addressed in commits 5f852d2 and 17fb42e:
|
||
| final StatementId prevStatementId = statementId; | ||
| final IDatabricksClient prevClient = connection.getSession().getDatabricksClient(); | ||
| Thread closeThread = | ||
| new Thread( | ||
| () -> { | ||
| try { | ||
| prevClient.closeStatement(prevStatementId); | ||
| LOGGER.debug( | ||
| "Closed previous server operation {} during re-execution", prevStatementId); | ||
| } catch (SQLException | RuntimeException e) { | ||
| LOGGER.warn( | ||
| "Failed to close previous server operation {} during re-execution: {}", | ||
| prevStatementId, | ||
| e.getMessage(), | ||
| e); | ||
| } | ||
| }); | ||
| closeThread.setDaemon(true); | ||
| closeThread.setName("close-stmt-" + prevStatementId); | ||
| closeThread.start(); | ||
| serverOperationClosed = true; | ||
| } | ||
|
|
||
| directResultsReceived = false; | ||
|
|
||
| // Per JDBC spec, re-executing a Statement implicitly closes the current ResultSet. | ||
| // Close the previous ResultSet. closeServerOperation() inside resultSet.close() | ||
| // is a no-op since serverOperationClosed was set above. | ||
| if (resultSet != null) { | ||
| try { | ||
| resultSet.close(); | ||
|
|
@@ -1004,6 +1042,10 @@ private void resetForNewExecution() { | |
| resultSet = null; | ||
| } | ||
|
|
||
| // Reset flags AFTER closing old ResultSet | ||
| directResultsReceived = false; | ||
| serverOperationClosed = false; | ||
|
|
||
| // Null out statementId so that if the new execution fails before setStatementId(), | ||
| // close() takes the statementId==null branch instead of sending closeStatement(stale-id) | ||
| statementId = null; | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🟡 MEDIUM — Redundant flags (flagged by maintainability + architecture)
Both `directResultsReceived` and `serverOperationClosed` are checked together in every predicate now: `close()` (line 158), `cancel()` (line 255), `getExecutionResult()` (line 704), and `closeServerOperation()` itself (line 971). They both express "the server operation handle is gone." Risk: a future code path checks only one of them. First step (low-risk) — extract a single predicate (for example, a private helper returning `directResultsReceived || serverOperationClosed`) and use it at every call site.
Better follow-up — collapse to a single `serverOperationLive` flag and store the reason (`SERVER_INLINE` vs `CLIENT_PROACTIVE`) as a separate enum if needed for telemetry.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Acknowledged. The two flags do model related but distinct facts: `directResultsReceived` means the server closed it (inline results), while `serverOperationClosed` means the client closed it. They can be collapsed into a predicate, but that's a follow-up refactor — keeping them separate for now since the semantics are different.