Commit f3923fa

Proactive server-side statement close when results consumed (#1444)
## Summary

Proactively closes the server-side operation when all results have been consumed or the ResultSet is explicitly closed, while keeping the client-side Statement open for reuse. This reduces server resource usage and warehouse cost for slow consumers who don't explicitly close statements.

### When server operation is closed:

1. `next()` returns false: all rows consumed
2. `ResultSet.close()` called explicitly

### What stays open:

- Client-side Statement: fully reusable for re-execution
- `Statement.close()` becomes a no-op for the server RPC (operation already closed) and just cleans up client state

### New flag: `serverOperationClosed`

- Set after a successful proactive `closeStatement` RPC
- Prevents duplicate RPCs when `Statement.close()` is called later
- Reset on re-execution (`resetForNewExecution()`)
- Skipped when `directResultsReceived` is already true (server already closed)
- Best-effort: server errors during proactive close are swallowed (logged at WARN); the flag stays false so `Statement.close()` retries the RPC

## Test plan

- [x] `testCloseServerOperation_closesServerAndSkipsRpcOnStatementClose`: proactive close sends RPC, Statement.close() skips it
- [x] `testCloseServerOperation_idempotent`: double call sends only one RPC
- [x] `testCloseServerOperation_skippedForDirectResults`: no-op when server already closed
- [x] `testCloseServerOperation_skippedWhenNoStatementId`: no-op when no statement ID
- [x] `testCloseServerOperation_resetsAfterFlagClear`: flag prevents all subsequent RPCs
- [x] `testCloseServerOperation_errorSwallowed`: server errors don't propagate
- [x] Full suite: 193 tests pass (DatabricksStatementTest + DatabricksResultSetTest + DatabricksConnectionTest)

This pull request was AI-assisted by Isaac.

---------

Signed-off-by: Gopal Lal <gopal.lal@databricks.com>
1 parent 9a366d3 commit f3923fa
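
The flag behaviour described in the summary can be modelled without the driver. The following is a minimal sketch, not the driver's implementation: `CountingClient` and `SketchStatement` are hypothetical stand-ins invented for illustration, and `execute` here only resets state (it does not close a previous operation as the real `resetForNewExecution()` does). It shows the two properties the test plan checks: the proactive close is idempotent, and a failed proactive close leaves the flag unset so that `close()` retries the RPC.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for the driver's client; counts closeStatement RPCs. */
class CountingClient {
  final AtomicInteger closeRpcs = new AtomicInteger();
  boolean failNext = false; // when true, the next call simulates a transient server error

  void closeStatement(String statementId) {
    if (failNext) {
      failNext = false;
      throw new IllegalStateException("simulated transient failure for " + statementId);
    }
    closeRpcs.incrementAndGet();
  }
}

/** Simplified model of the statement-side flag handling (not the driver class). */
class SketchStatement {
  private final CountingClient client;
  private String statementId;
  private volatile boolean serverOperationClosed = false;
  private boolean directResultsReceived = false;

  SketchStatement(CountingClient client) {
    this.client = client;
  }

  /** Starts a new execution; flags are reset so the new operation can be closed later. */
  void execute(String newStatementId) {
    serverOperationClosed = false;
    directResultsReceived = false;
    statementId = newStatementId;
  }

  /** Best-effort proactive close: idempotent, and marks closed only on success. */
  void closeServerOperation() {
    if (serverOperationClosed || directResultsReceived || statementId == null) {
      return;
    }
    try {
      client.closeStatement(statementId);
      serverOperationClosed = true;
    } catch (RuntimeException e) {
      // Swallowed: the flag stays false, so close() will retry the RPC.
    }
  }

  /** Models Statement.close(): sends the RPC only if the operation is still open. */
  void close() {
    if (statementId != null && !directResultsReceived && !serverOperationClosed) {
      client.closeStatement(statementId);
    }
  }
}
```

Calling `closeServerOperation()` twice and then `close()` results in a single `closeStatement` call in this model; if the proactive call fails, the later `close()` sends it instead.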

5 files changed

Lines changed: 376 additions & 53 deletions

NEXT_CHANGELOG.md

Lines changed: 1 addition & 0 deletions
@@ -43,6 +43,7 @@ upgrading. These changes do not affect metadata on All-Purpose Clusters.
 - `EnableGeoSpatialSupport` no longer requires `EnableComplexDatatypeSupport=1`. Geospatial types (GEOMETRY, GEOGRAPHY) can now be enabled independently of complex type support (ARRAY, MAP, STRUCT).
 - Arrow schema deserialization failures (Thrift metadata path) now surface a dedicated driver error code `ARROW_SCHEMA_PARSING_ERROR` (vendor code `22000`) and a proper SQLSTATE `22000` (Data Exception) on the thrown `SQLException`, instead of the generic `RESULT_SET_ERROR` (1004) and the enum name as SQLSTATE. The exception message is unchanged.
 - When a Statement is re-executed, the previous server-side operation is now explicitly closed before starting the new execution, preventing orphaned server-side operations when Statements are reused.
+- Server-side operations are now closed proactively when `ResultSet.close()` is called, improving resource utilization. The client-side Statement remains open and reusable for re-execution. As a result, `getExecutionResult()` after result consumption returns the cached ResultSet instead of making a server RPC.

 ### Fixed
 - Fixed `DatabaseMetaData.getTables()` in Thrift mode returning rows when called with an empty `types` array. Per JDBC spec, empty types means "no types selected" and now correctly returns zero rows (matching SEA mode).

src/main/java/com/databricks/jdbc/api/impl/DatabricksResultSet.java

Lines changed: 9 additions & 0 deletions
@@ -288,13 +288,22 @@ public boolean next() throws SQLException {

   @Override
   public void close() throws DatabricksSQLException {
+    // Proactively close server operation when ResultSet is closed explicitly.
+    closeServerOperation();
     isClosed = true;
     this.executionResult.close();
     if (parentStatement != null) {
       parentStatement.handleResultSetClose(this);
     }
   }

+  /** Proactively closes the server-side operation via the parent statement. */
+  private void closeServerOperation() {
+    if (parentStatement != null) {
+      parentStatement.closeServerOperation();
+    }
+  }
+
   private static TelemetryCollector resolveTelemetryCollector(
       IDatabricksStatementInternal parentStatement) {
     try {

src/main/java/com/databricks/jdbc/api/impl/DatabricksStatement.java

Lines changed: 93 additions & 51 deletions
@@ -59,6 +59,9 @@ public class DatabricksStatement implements IDatabricksStatement, IDatabricksSta
   // closed the operation — no further RPCs for this statement ID are possible. The JDBC Statement
   // itself remains open for re-execution. Reset on each new execution. Volatile because cancel()
   // can be called from a different thread (JDBC spec requirement).
+  private volatile boolean serverOperationClosed = false; // Client proactively closed the server
+  // operation after all results were consumed or ResultSet was closed. Prevents duplicate
+  // closeStatement RPC when Statement.close() is called later. Reset on each new execution.
   protected Boolean shouldReturnResultSet =
       null; // Cached result of shouldReturnResultSetWithConfig()

@@ -149,20 +152,26 @@ public void close(boolean removeFromSession) throws DatabricksSQLException {
         LOGGER.warn(warningMsg);
         warnings = WarningUtil.addWarning(warnings, warningMsg);
       } else {
-        // Skip server-side close if the server already closed the operation (direct results).
-        // The operation handle is gone on the server side, so closeStatement would fail.
-        if (!directResultsReceived) {
-          this.connection.getSession().getDatabricksClient().closeStatement(statementId);
-        } else {
-          LOGGER.debug(
-              "Statement {} closed locally (direct results — server operation already closed, "
-                  + "skipping closeStatement RPC)",
-              statementId);
-        }
+        // Close ResultSet first — this triggers proactive server close via
+        // closeServerOperation() and sets serverOperationClosed=true, preventing
+        // a duplicate closeStatement RPC below.
         if (resultSet != null) {
           this.resultSet.close();
           this.resultSet = null;
         }
+        // Skip server-side close if operation was already closed:
+        // - directResultsReceived: server closed it (inline results)
+        // - serverOperationClosed: client proactively closed it (results consumed or RS closed)
+        if (!directResultsReceived && !serverOperationClosed) {
+          this.connection.getSession().getDatabricksClient().closeStatement(statementId);
+        } else {
+          LOGGER.debug(
+              "Statement {} closed locally (server operation already closed — "
+                  + "directResults={}, proactivelyClosed={}, skipping closeStatement RPC)",
+              statementId,
+              directResultsReceived,
+              serverOperationClosed);
+        }
       }
     } finally {
       // Always run cleanup even if resultSet.close() or closeStatement() throws.
@@ -246,12 +255,11 @@ public void cancel() throws SQLException {
     LOGGER.debug("public void cancel()");
     checkIfClosed();

-    if (statementId != null && !directResultsReceived) {
+    if (statementId != null && !directResultsReceived && !serverOperationClosed) {
       this.connection.getSession().getDatabricksClient().cancelStatement(statementId);
       DatabricksThreadContextHolder.clearStatementInfo();
-    } else if (directResultsReceived) {
-      String warningMsg =
-          "Statement's server operation was already closed (direct results); cancel has no effect.";
+    } else if (directResultsReceived || serverOperationClosed) {
+      String warningMsg = "Statement's server operation was already closed; cancel has no effect.";
       LOGGER.debug(warningMsg);
       warnings = WarningUtil.addWarning(warnings, warningMsg);
     } else {
@@ -694,17 +702,18 @@ public ResultSet getExecutionResult() throws SQLException {
           "No execution available for statement", DatabricksDriverErrorCode.INPUT_VALIDATION_ERROR);
     }

-    // For direct results, the server already closed the operation — making an RPC
-    // would return "not found". Return the cached result set instead.
-    if (directResultsReceived) {
+    // For direct results or proactively closed operations, the server operation is gone —
+    // making an RPC would return "not found". Return the cached result set instead.
+    if (directResultsReceived || serverOperationClosed) {
       if (resultSet != null) {
         LOGGER.debug(
-            "Returning cached result for statement {} (direct results received)", statementId);
+            "Returning cached result for statement {} (server operation already closed)",
+            statementId);
         return resultSet;
       }
       throw new DatabricksSQLException(
-          "Direct results were received but no result set is available. "
-              + "The server closed the operation and no further results can be fetched.",
+          "Server operation was already closed and no result set is available. "
+              + "No further results can be fetched.",
           DatabricksDriverErrorCode.INVALID_STATE);
     }

@@ -953,6 +962,38 @@ public void markDirectResultsReceived() {
     this.directResultsReceived = true;
   }

+  /**
+   * Proactively closes the server-side operation to release server resources while keeping the
+   * client-side Statement open for reuse. Called when all results have been consumed (next()
+   * returns false) or when ResultSet.close() is called.
+   *
+   * <p>After this call, {@link #close(boolean)} will skip the closeStatement RPC since the server
+   * operation is already closed. The Statement can still be re-executed.
+   */
+  @Override
+  public void closeServerOperation() {
+    if (serverOperationClosed || directResultsReceived || statementId == null || isClosed) {
+      return;
+    }
+    try {
+      this.connection.getSession().getDatabricksClient().closeStatement(statementId);
+      // Only mark closed on success — if the RPC fails (transient network error),
+      // Statement.close() should retry the closeStatement RPC rather than skip it,
+      // to avoid leaving the server operation alive until session timeout.
+      this.serverOperationClosed = true;
+      LOGGER.debug(
+          "Proactively closed server operation for statement {} (results consumed)", statementId);
+    } catch (SQLException | RuntimeException e) {
+      // Best-effort — don't fail the user's close for a server cleanup failure.
+      // serverOperationClosed stays false so Statement.close() will retry the RPC.
+      LOGGER.warn(
+          "Failed to proactively close server operation for statement {}: {}",
+          statementId,
+          e.getMessage(),
+          e);
+    }
+  }
+
   /**
    * Resets statement state before a new execution (sync or async). Closes the previous server-side
    * operation handle (if still open) and the local ResultSet, clears flags, and nulls the
@@ -962,39 +1003,36 @@ private void resetForNewExecution() {
     noMoreResults = false;
     updateCount = -1;

-    // Close the previous server-side operation if it exists. This prevents resource
-    // leaks when a Statement is re-executed (e.g., PreparedStatement in a loop).
-    // This matches the behavior of pgJDBC, MySQL Connector/J, Trino JDBC, and
-    // Databricks Python SQL Connector — all close the previous operation on re-execute.
-    //
-    // Note on directResultsReceived: we check the flag value from the PREVIOUS execution
-    // here. The flag is reset to false below, after this close attempt.
-    //
-    // Note on latency: this close is synchronous (adds one RPC round-trip before the next
-    // execution). This is consistent with pgJDBC's closeForNextExecution() which is also
-    // synchronous. The correctness benefit (no orphaned server operations) outweighs the
-    // latency cost for typical usage patterns.
-    //
-    // Skip if: (1) no previous execution (statementId==null), or
-    // (2) server already closed the operation (direct results).
-    if (statementId != null && !directResultsReceived) {
-      try {
-        connection.getSession().getDatabricksClient().closeStatement(statementId);
-      } catch (Exception e) {
-        // Don't block re-execution if closing the previous operation fails.
-        // This covers: network errors, operation already expired/evicted on server,
-        // and transport-level errors (e.g., unexpected server responses).
-        // The new execution will create a fresh operation with a new statementId.
-        LOGGER.debug(
-            "Failed to close previous server operation {} during re-execution: {}",
-            statementId,
-            e.getMessage());
-      }
+    // Close the previous server-side operation if still open. Fire-and-forget on a
+    // daemon thread so the new execution is not blocked by the close RPC latency.
+    // If the user closed the ResultSet before re-executing (best practice), the
+    // proactive close already set serverOperationClosed=true and this is a no-op.
+    if (statementId != null && !directResultsReceived && !serverOperationClosed) {
+      final StatementId prevStatementId = statementId;
+      final IDatabricksClient prevClient = connection.getSession().getDatabricksClient();
+      Thread closeThread =
+          new Thread(
+              () -> {
+                try {
+                  prevClient.closeStatement(prevStatementId);
+                  LOGGER.debug(
+                      "Closed previous server operation {} during re-execution", prevStatementId);
+                } catch (SQLException | RuntimeException e) {
+                  LOGGER.warn(
+                      "Failed to close previous server operation {} during re-execution: {}",
+                      prevStatementId,
+                      e.getMessage(),
+                      e);
+                }
+              });
+      closeThread.setDaemon(true);
+      closeThread.setName("close-stmt-" + prevStatementId);
+      closeThread.start();
+      serverOperationClosed = true;
     }

-    directResultsReceived = false;
-
-    // Per JDBC spec, re-executing a Statement implicitly closes the current ResultSet.
+    // Close the previous ResultSet. closeServerOperation() inside resultSet.close()
+    // is a no-op since serverOperationClosed was set above.
     if (resultSet != null) {
       try {
         resultSet.close();
@@ -1004,6 +1042,10 @@ private void resetForNewExecution() {
       resultSet = null;
     }

+    // Reset flags AFTER closing old ResultSet
+    directResultsReceived = false;
+    serverOperationClosed = false;
+
     // Null out statementId so that if the new execution fails before setStatementId(),
     // close() takes the statementId==null branch instead of sending closeStatement(stale-id)
     statementId = null;
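
The re-execution path in `resetForNewExecution()` hands the close RPC to a daemon thread so the new execution does not wait on it. A minimal sketch of that fire-and-forget pattern in isolation follows. It assumes a hypothetical `CloseRpc` callback in place of the driver's client and logs to `System.err` instead of the driver's logger; `FireAndForgetClose` and `awaitFinished` are names made up for this example.

```java
/** Sketch of a fire-and-forget close on a daemon thread (names are hypothetical). */
class FireAndForgetClose {

  /** Stand-in for the client's closeStatement RPC. */
  interface CloseRpc {
    void close(String statementId) throws Exception;
  }

  /**
   * Starts the close on a named daemon thread and returns immediately. Failures are
   * reported but never propagate to the caller.
   */
  static Thread closeInBackground(String prevStatementId, CloseRpc rpc) {
    Thread closeThread =
        new Thread(
            () -> {
              try {
                rpc.close(prevStatementId);
              } catch (Exception e) {
                System.err.println(
                    "Failed to close previous operation " + prevStatementId + ": " + e.getMessage());
              }
            });
    closeThread.setDaemon(true); // must be set before start()
    closeThread.setName("close-stmt-" + prevStatementId);
    closeThread.start();
    return closeThread;
  }

  /** Helper for demos and tests: waits up to millis and reports whether the thread ended. */
  static boolean awaitFinished(Thread t, long millis) {
    try {
      t.join(millis);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return !t.isAlive();
  }
}
```

A daemon thread does not keep the JVM alive, so a close that is still in flight at shutdown may never reach the server. That trade-off fits a best-effort cleanup; whether it is acceptable depends on how long the server keeps an unclosed operation around.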

src/main/java/com/databricks/jdbc/api/internal/IDatabricksStatementInternal.java

Lines changed: 11 additions & 0 deletions
@@ -37,4 +37,15 @@ public interface IDatabricksStatementInternal {
   default void markDirectResultsReceived() {
     // no-op by default
   }
+
+  /**
+   * Proactively closes the server-side operation to release server resources while keeping the
+   * client-side Statement open for reuse. Default no-op for implementations that don't support
+   * proactive close.
+   */
+  default void closeServerOperation() {
+    // no-op by default
+  }
+
+  long getLargeMaxRows() throws DatabricksSQLException;
 }
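
Declaring `closeServerOperation()` as a default method lets existing implementations of the interface compile unchanged while the result set calls the hook unconditionally. The sketch below illustrates that arrangement with hypothetical types (`StatementHookSketch`, `LegacyStatementSketch`, `ProactiveStatementSketch`, `ResultSetSketch`); none of them are driver classes.

```java
/** Hypothetical interface with a default no-op hook, mirroring the pattern in the diff. */
interface StatementHookSketch {
  default void closeServerOperation() {
    // no-op by default
  }
}

/** An implementation that predates the hook and does not override it. */
class LegacyStatementSketch implements StatementHookSketch {}

/** An implementation that opts in to proactive close. */
class ProactiveStatementSketch implements StatementHookSketch {
  int closeCalls = 0;

  @Override
  public void closeServerOperation() {
    closeCalls++;
  }
}

/** Result set that notifies its parent statement (if any) when it is closed. */
class ResultSetSketch {
  private final StatementHookSketch parentStatement;
  boolean closed = false;

  ResultSetSketch(StatementHookSketch parentStatement) {
    this.parentStatement = parentStatement;
  }

  void close() {
    // Null check: a result set may exist without a parent statement.
    if (parentStatement != null) {
      parentStatement.closeServerOperation();
    }
    closed = true;
  }
}
```

Closing a `ResultSetSketch` backed by `ProactiveStatementSketch` invokes the override once, while a legacy or missing parent results in a plain local close.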
