diff --git a/NEXT_CHANGELOG.md b/NEXT_CHANGELOG.md index e7b2d83d4..2c54a3ca8 100644 --- a/NEXT_CHANGELOG.md +++ b/NEXT_CHANGELOG.md @@ -43,6 +43,7 @@ upgrading. These changes do not affect metadata on All-Purpose Clusters. - `EnableGeoSpatialSupport` no longer requires `EnableComplexDatatypeSupport=1`. Geospatial types (GEOMETRY, GEOGRAPHY) can now be enabled independently of complex type support (ARRAY, MAP, STRUCT). - Arrow schema deserialization failures (Thrift metadata path) now surface a dedicated driver error code `ARROW_SCHEMA_PARSING_ERROR` (vendor code `22000`) and a proper SQLSTATE `22000` (Data Exception) on the thrown `SQLException`, instead of the generic `RESULT_SET_ERROR` (1004) and the enum name as SQLSTATE. The exception message is unchanged. - When a Statement is re-executed, the previous server-side operation is now explicitly closed before starting the new execution, preventing orphaned server-side operations when Statements are reused. +- Server-side operations are now closed proactively when `ResultSet.close()` is called, improving resource utilization. The client-side Statement remains open and reusable for re-execution. As a result, `getExecutionResult()` after result consumption returns the cached ResultSet instead of making a server RPC. ### Fixed - Fixed `DatabaseMetaData.getTables()` in Thrift mode returning rows when called with an empty `types` array. Per JDBC spec, empty types means "no types selected" and now correctly returns zero rows (matching SEA mode). diff --git a/src/main/java/com/databricks/jdbc/api/impl/DatabricksResultSet.java b/src/main/java/com/databricks/jdbc/api/impl/DatabricksResultSet.java index a83956528..db436247c 100644 --- a/src/main/java/com/databricks/jdbc/api/impl/DatabricksResultSet.java +++ b/src/main/java/com/databricks/jdbc/api/impl/DatabricksResultSet.java @@ -288,6 +288,8 @@ public boolean next() throws SQLException { @Override public void close() throws DatabricksSQLException { + // Proactively close server operation when ResultSet is closed explicitly. + closeServerOperation(); isClosed = true; this.executionResult.close(); if (parentStatement != null) { @@ -295,6 +297,13 @@ public void close() throws DatabricksSQLException { } } + /** Proactively closes the server-side operation via the parent statement. */ + private void closeServerOperation() { + if (parentStatement != null) { + parentStatement.closeServerOperation(); + } + } + private static TelemetryCollector resolveTelemetryCollector( IDatabricksStatementInternal parentStatement) { try { diff --git a/src/main/java/com/databricks/jdbc/api/impl/DatabricksStatement.java b/src/main/java/com/databricks/jdbc/api/impl/DatabricksStatement.java index 0afe1c26f..42c10b18e 100644 --- a/src/main/java/com/databricks/jdbc/api/impl/DatabricksStatement.java +++ b/src/main/java/com/databricks/jdbc/api/impl/DatabricksStatement.java @@ -59,6 +59,9 @@ public class DatabricksStatement implements IDatabricksStatement, IDatabricksSta // closed the operation — no further RPCs for this statement ID are possible. The JDBC Statement // itself remains open for re-execution. Reset on each new execution. Volatile because cancel() // can be called from a different thread (JDBC spec requirement). + private volatile boolean serverOperationClosed = false; // Client proactively closed the server + // operation after all results were consumed or ResultSet was closed. Prevents duplicate + // closeStatement RPC when Statement.close() is called later. Reset on each new execution. protected Boolean shouldReturnResultSet = null; // Cached result of shouldReturnResultSetWithConfig() @@ -149,20 +152,26 @@ public void close(boolean removeFromSession) throws DatabricksSQLException { LOGGER.warn(warningMsg); warnings = WarningUtil.addWarning(warnings, warningMsg); } else { - // Skip server-side close if the server already closed the operation (direct results). - // The operation handle is gone on the server side, so closeStatement would fail. - if (!directResultsReceived) { - this.connection.getSession().getDatabricksClient().closeStatement(statementId); - } else { - LOGGER.debug( - "Statement {} closed locally (direct results — server operation already closed, " - + "skipping closeStatement RPC)", - statementId); - } + // Close ResultSet first — this triggers proactive server close via + // closeServerOperation() and sets serverOperationClosed=true, preventing + // a duplicate closeStatement RPC below. if (resultSet != null) { this.resultSet.close(); this.resultSet = null; } + // Skip server-side close if operation was already closed: + // - directResultsReceived: server closed it (inline results) + // - serverOperationClosed: client proactively closed it (results consumed or RS closed) + if (!directResultsReceived && !serverOperationClosed) { + this.connection.getSession().getDatabricksClient().closeStatement(statementId); + } else { + LOGGER.debug( + "Statement {} closed locally (server operation already closed — " + + "directResults={}, proactivelyClosed={}, skipping closeStatement RPC)", + statementId, + directResultsReceived, + serverOperationClosed); + } } } finally { // Always run cleanup even if resultSet.close() or closeStatement() throws. @@ -246,12 +255,11 @@ public void cancel() throws SQLException { LOGGER.debug("public void cancel()"); checkIfClosed(); - if (statementId != null && !directResultsReceived) { + if (statementId != null && !directResultsReceived && !serverOperationClosed) { this.connection.getSession().getDatabricksClient().cancelStatement(statementId); DatabricksThreadContextHolder.clearStatementInfo(); - } else if (directResultsReceived) { - String warningMsg = - "Statement's server operation was already closed (direct results); cancel has no effect."; + } else if (directResultsReceived || serverOperationClosed) { + String warningMsg = "Statement's server operation was already closed; cancel has no effect."; LOGGER.debug(warningMsg); warnings = WarningUtil.addWarning(warnings, warningMsg); } else { @@ -694,17 +702,18 @@ public ResultSet getExecutionResult() throws SQLException { "No execution available for statement", DatabricksDriverErrorCode.INPUT_VALIDATION_ERROR); } - // For direct results, the server already closed the operation — making an RPC - // would return "not found". Return the cached result set instead. - if (directResultsReceived) { + // For direct results or proactively closed operations, the server operation is gone — + // making an RPC would return "not found". Return the cached result set instead. + if (directResultsReceived || serverOperationClosed) { if (resultSet != null) { LOGGER.debug( - "Returning cached result for statement {} (direct results received)", statementId); + "Returning cached result for statement {} (server operation already closed)", + statementId); return resultSet; } throw new DatabricksSQLException( - "Direct results were received but no result set is available. " - + "The server closed the operation and no further results can be fetched.", + "Server operation was already closed and no result set is available. " + + "No further results can be fetched.", DatabricksDriverErrorCode.INVALID_STATE); } @@ -953,6 +962,38 @@ public void markDirectResultsReceived() { this.directResultsReceived = true; } + /** + * Proactively closes the server-side operation to release server resources while keeping the + * client-side Statement open for reuse. Called when all results have been consumed (next() + * returns false) or when ResultSet.close() is called. + * + *

After this call, {@link #close(boolean)} will skip the closeStatement RPC since the server + * operation is already closed. The Statement can still be re-executed. + */ + @Override + public void closeServerOperation() { + if (serverOperationClosed || directResultsReceived || statementId == null || isClosed) { + return; + } + try { + this.connection.getSession().getDatabricksClient().closeStatement(statementId); + // Only mark closed on success — if the RPC fails (transient network error), + // Statement.close() should retry the closeStatement RPC rather than skip it, + // to avoid leaving the server operation alive until session timeout. + this.serverOperationClosed = true; + LOGGER.debug( + "Proactively closed server operation for statement {} (results consumed)", statementId); + } catch (SQLException | RuntimeException e) { + // Best-effort — don't fail the user's close for a server cleanup failure. + // serverOperationClosed stays false so Statement.close() will retry the RPC. + LOGGER.warn( + "Failed to proactively close server operation for statement {}: {}", + statementId, + e.getMessage(), + e); + } + } + /** * Resets statement state before a new execution (sync or async). Closes the previous server-side * operation handle (if still open) and the local ResultSet, clears flags, and nulls the @@ -962,39 +1003,36 @@ private void resetForNewExecution() { noMoreResults = false; updateCount = -1; - // Close the previous server-side operation if it exists. This prevents resource - // leaks when a Statement is re-executed (e.g., PreparedStatement in a loop). - // This matches the behavior of pgJDBC, MySQL Connector/J, Trino JDBC, and - // Databricks Python SQL Connector — all close the previous operation on re-execute. - // - // Note on directResultsReceived: we check the flag value from the PREVIOUS execution - // here. The flag is reset to false below, after this close attempt. - // - // Note on latency: this close is synchronous (adds one RPC round-trip before the next - // execution). This is consistent with pgJDBC's closeForNextExecution() which is also - // synchronous. The correctness benefit (no orphaned server operations) outweighs the - // latency cost for typical usage patterns. - // - // Skip if: (1) no previous execution (statementId==null), or - // (2) server already closed the operation (direct results). - if (statementId != null && !directResultsReceived) { - try { - connection.getSession().getDatabricksClient().closeStatement(statementId); - } catch (Exception e) { - // Don't block re-execution if closing the previous operation fails. - // This covers: network errors, operation already expired/evicted on server, - // and transport-level errors (e.g., unexpected server responses). - // The new execution will create a fresh operation with a new statementId. - LOGGER.debug( - "Failed to close previous server operation {} during re-execution: {}", - statementId, - e.getMessage()); - } + // Close the previous server-side operation if still open. Fire-and-forget on a + // daemon thread so the new execution is not blocked by the close RPC latency. + // If the user closed the ResultSet before re-executing (best practice), the + // proactive close already set serverOperationClosed=true and this is a no-op. + if (statementId != null && !directResultsReceived && !serverOperationClosed) { + final StatementId prevStatementId = statementId; + final IDatabricksClient prevClient = connection.getSession().getDatabricksClient(); + Thread closeThread = + new Thread( + () -> { + try { + prevClient.closeStatement(prevStatementId); + LOGGER.debug( + "Closed previous server operation {} during re-execution", prevStatementId); + } catch (SQLException | RuntimeException e) { + LOGGER.warn( + "Failed to close previous server operation {} during re-execution: {}", + prevStatementId, + e.getMessage(), + e); + } + }); + closeThread.setDaemon(true); + closeThread.setName("close-stmt-" + prevStatementId); + closeThread.start(); + serverOperationClosed = true; } - directResultsReceived = false; - - // Per JDBC spec, re-executing a Statement implicitly closes the current ResultSet. + // Close the previous ResultSet. closeServerOperation() inside resultSet.close() + // is a no-op since serverOperationClosed was set above. if (resultSet != null) { try { resultSet.close(); @@ -1004,6 +1042,10 @@ private void resetForNewExecution() { resultSet = null; } + // Reset flags AFTER closing old ResultSet + directResultsReceived = false; + serverOperationClosed = false; + // Null out statementId so that if the new execution fails before setStatementId(), // close() takes the statementId==null branch instead of sending closeStatement(stale-id) statementId = null; diff --git a/src/main/java/com/databricks/jdbc/api/internal/IDatabricksStatementInternal.java b/src/main/java/com/databricks/jdbc/api/internal/IDatabricksStatementInternal.java index ccb6f035e..965a971e7 100644 --- a/src/main/java/com/databricks/jdbc/api/internal/IDatabricksStatementInternal.java +++ b/src/main/java/com/databricks/jdbc/api/internal/IDatabricksStatementInternal.java @@ -37,4 +37,15 @@ public interface IDatabricksStatementInternal { default void markDirectResultsReceived() { // no-op by default } + + /** + * Proactively closes the server-side operation to release server resources while keeping the + * client-side Statement open for reuse. Default no-op for implementations that don't support + * proactive close. + */ + default void closeServerOperation() { + // no-op by default + } + + long getLargeMaxRows() throws DatabricksSQLException; } diff --git a/src/test/java/com/databricks/jdbc/api/impl/DatabricksStatementTest.java b/src/test/java/com/databricks/jdbc/api/impl/DatabricksStatementTest.java index 1b56abedd..a4bab6411 100644 --- a/src/test/java/com/databricks/jdbc/api/impl/DatabricksStatementTest.java +++ b/src/test/java/com/databricks/jdbc/api/impl/DatabricksStatementTest.java @@ -16,6 +16,7 @@ import com.databricks.jdbc.exception.DatabricksSQLException; import com.databricks.jdbc.exception.DatabricksSQLFeatureNotSupportedException; import com.databricks.jdbc.model.core.StatementStatus; +import com.databricks.jdbc.model.telemetry.enums.DatabricksDriverErrorCode; import com.databricks.sdk.service.sql.StatementState; import java.io.InputStream; import java.sql.*; @@ -1064,10 +1065,10 @@ public void testReExecutionClosesServerOperation() throws Exception { StatementId firstStatementId = new StatementId("first-stmt-id"); statement.setStatementId(firstStatementId); - // Second execution — should close the first server operation + // Second execution — should close the first server operation asynchronously statement.executeQuery(STATEMENT); - verify(client, times(1)).closeStatement(eq(firstStatementId)); + verify(client, timeout(5000).times(1)).closeStatement(eq(firstStatementId)); verify(firstResult, times(1)).close(); assertEquals(secondResult, statement.getResultSet()); } @@ -1138,6 +1139,8 @@ public void testReExecutionHandlesCloseFailureGracefully() throws Exception { // Re-execution should succeed even though closing previous operation failed assertDoesNotThrow(() -> statement.executeQuery(STATEMENT)); assertEquals(secondResult, statement.getResultSet()); + // Verify the async close was attempted + verify(client, timeout(5000).times(1)).closeStatement(eq(firstStatementId)); } @Test @@ -1176,6 +1179,8 @@ public void testReExecutionHandlesTransportErrorGracefully() throws Exception { // The new execution creates a fresh server operation with a new statementId. assertDoesNotThrow(() -> statement.executeQuery(STATEMENT)); assertEquals(secondResult, statement.getResultSet()); + // Verify the async close was attempted + verify(client, timeout(5000).times(1)).closeStatement(eq(firstStatementId)); } @Test @@ -1823,4 +1828,259 @@ public void testGetResultSet_WithNonRowcountQueryPrefixes_ReturnsResultSet() thr statement.close(); } + + // ========================================================================= + // Proactive server operation close + // ========================================================================= + + @Test + public void testCloseServerOperation_closesServerAndSkipsRpcOnStatementClose() throws Exception { + IDatabricksConnectionContext connectionContext = + DatabricksConnectionContext.parse(JDBC_URL, new Properties()); + DatabricksConnection connection = new DatabricksConnection(connectionContext, client); + DatabricksStatement statement = new DatabricksStatement(connection); + statement.setStatementId(STATEMENT_ID); + + // Proactively close the server operation (simulates next() returning false) + statement.closeServerOperation(); + + // Statement should still be open + assertFalse(statement.isClosed()); + + // closeStatement should have been called once (proactive close) + verify(client, times(1)).closeStatement(STATEMENT_ID); + + // Now close the statement — should NOT call closeStatement again + statement.close(); + verify(client, times(1)).closeStatement(STATEMENT_ID); // still 1, not 2 + } + + @Test + public void testCloseServerOperation_idempotent() throws Exception { + IDatabricksConnectionContext connectionContext = + DatabricksConnectionContext.parse(JDBC_URL, new Properties()); + DatabricksConnection connection = new DatabricksConnection(connectionContext, client); + DatabricksStatement statement = new DatabricksStatement(connection); + statement.setStatementId(STATEMENT_ID); + + // Call twice — should only close once + statement.closeServerOperation(); + statement.closeServerOperation(); + + verify(client, times(1)).closeStatement(STATEMENT_ID); + } + + @Test + public void testCloseServerOperation_skippedForDirectResults() throws Exception { + IDatabricksConnectionContext connectionContext = + DatabricksConnectionContext.parse(JDBC_URL, new Properties()); + DatabricksConnection connection = new DatabricksConnection(connectionContext, client); + DatabricksStatement statement = new DatabricksStatement(connection); + statement.setStatementId(STATEMENT_ID); + statement.markDirectResultsReceived(); + + // Should be a no-op since directResults already closed the server operation + statement.closeServerOperation(); + + verify(client, never()).closeStatement(any(StatementId.class)); + } + + @Test + public void testCloseServerOperation_skippedWhenNoStatementId() throws Exception { + IDatabricksConnectionContext connectionContext = + DatabricksConnectionContext.parse(JDBC_URL, new Properties()); + DatabricksConnection connection = new DatabricksConnection(connectionContext, client); + DatabricksStatement statement = new DatabricksStatement(connection); + // statementId is null — should be a no-op + + statement.closeServerOperation(); + + verify(client, never()).closeStatement(any(StatementId.class)); + } + + @Test + public void testCloseServerOperation_resetsAfterFlagClear() throws Exception { + IDatabricksConnectionContext connectionContext = + DatabricksConnectionContext.parse(JDBC_URL, new Properties()); + DatabricksConnection connection = new DatabricksConnection(connectionContext, client); + DatabricksStatement statement = new DatabricksStatement(connection); + statement.setStatementId(STATEMENT_ID); + + // First proactive close + statement.closeServerOperation(); + verify(client, times(1)).closeStatement(STATEMENT_ID); + + // Second call is no-op (flag set) + statement.closeServerOperation(); + verify(client, times(1)).closeStatement(STATEMENT_ID); + + // Statement.close() is also no-op for server RPC (flag set) + statement.close(); + verify(client, times(1)).closeStatement(STATEMENT_ID); + } + + @Test + public void testCloseServerOperation_errorSwallowed() throws Exception { + IDatabricksConnectionContext connectionContext = + DatabricksConnectionContext.parse(JDBC_URL, new Properties()); + DatabricksConnection connection = new DatabricksConnection(connectionContext, client); + DatabricksStatement statement = new DatabricksStatement(connection); + statement.setStatementId(STATEMENT_ID); + + // Server close fails — should not throw + doThrow(new DatabricksSQLException("Server error", DatabricksDriverErrorCode.SDK_CLIENT_ERROR)) + .when(client) + .closeStatement(STATEMENT_ID); + + assertDoesNotThrow(() -> statement.closeServerOperation()); + + // Flag should NOT be set on failure — Statement.close() should retry + verify(client, times(1)).closeStatement(STATEMENT_ID); + // The second closeServerOperation call should retry since flag wasn't set + reset(client); // clear the throw stub + statement.closeServerOperation(); + verify(client, times(1)).closeStatement(STATEMENT_ID); + } + + @Test + public void testCloseServerOperation_cancelSkipsAfterProactiveClose() throws Exception { + IDatabricksConnectionContext connectionContext = + DatabricksConnectionContext.parse(JDBC_URL, new Properties()); + DatabricksConnection connection = new DatabricksConnection(connectionContext, client); + DatabricksStatement statement = new DatabricksStatement(connection); + statement.setStatementId(STATEMENT_ID); + + // Proactively close server operation + statement.closeServerOperation(); + verify(client, times(1)).closeStatement(STATEMENT_ID); + + // cancel() should be a no-op — server operation already closed + statement.cancel(); + verify(client, never()).cancelStatement(any(StatementId.class)); + } + + @Test + public void testCloseServerOperation_getExecutionResultReturnsCachedAfterClose() + throws Exception { + IDatabricksConnectionContext connectionContext = + DatabricksConnectionContext.parse(JDBC_URL, new Properties()); + DatabricksConnection connection = new DatabricksConnection(connectionContext, client); + DatabricksStatement statement = new DatabricksStatement(connection); + statement.setStatementId(STATEMENT_ID); + // Set resultSet field via reflection to simulate executeQuery having run + java.lang.reflect.Field rsField = DatabricksStatement.class.getDeclaredField("resultSet"); + rsField.setAccessible(true); + rsField.set(statement, resultSet); + + // Proactively close server operation + statement.closeServerOperation(); + + // getExecutionResult() should return cached result, not make RPC + ResultSet cached = statement.getExecutionResult(); + assertNotNull(cached); + assertEquals(resultSet, cached); + verify(client, never()) + .getStatementResult(any(StatementId.class), any(IDatabricksSession.class), any()); + } + + @Test + public void testStatementReusableAfterProactiveClose() throws Exception { + IDatabricksConnectionContext connectionContext = + DatabricksConnectionContext.parse(JDBC_URL, new Properties()); + DatabricksConnection connection = new DatabricksConnection(connectionContext, client); + DatabricksStatement statement = new DatabricksStatement(connection); + + when(client.executeStatement( + eq(STATEMENT), + eq(new Warehouse(WAREHOUSE_ID)), + eq(new HashMap<>()), + eq(StatementType.QUERY), + any(IDatabricksSession.class), + eq(statement), + any())) + .thenReturn(resultSet); + + // First execution + statement.executeQuery(STATEMENT); + statement.closeServerOperation(); + assertFalse(statement.isClosed(), "Statement should stay open after proactive close"); + + // Re-execute — should work, flag reset by resetForNewExecution + statement.executeQuery(STATEMENT); + assertFalse(statement.isClosed()); + + // Both executions should have called executeStatement + verify(client, times(2)) + .executeStatement( + eq(STATEMENT), + eq(new Warehouse(WAREHOUSE_ID)), + eq(new HashMap<>()), + eq(StatementType.QUERY), + any(IDatabricksSession.class), + eq(statement), + any()); + } + + @Test + public void testResultSetClose_triggersProactiveServerClose() throws Exception { + // Verify that ResultSet.close() triggers closeServerOperation on the parent Statement, + // and that Statement.close() then skips the duplicate RPC. + IDatabricksConnectionContext connectionContext = + DatabricksConnectionContext.parse(JDBC_URL, new Properties()); + DatabricksConnection connection = new DatabricksConnection(connectionContext, client); + DatabricksStatement statement = new DatabricksStatement(connection); + + IExecutionResult execResult = mock(IExecutionResult.class); + DatabricksResultSetMetaData rsMeta = mock(DatabricksResultSetMetaData.class); + DatabricksResultSet resultSet = + new DatabricksResultSet( + new StatementStatus().setState(StatementState.SUCCEEDED), + STATEMENT_ID, + StatementType.QUERY, + statement, + execResult, + rsMeta, + false); + statement.setStatementId(STATEMENT_ID); + statement.resultSet = resultSet; + + // Close ResultSet — should trigger proactive server close + resultSet.close(); + verify(client, times(1)).closeStatement(STATEMENT_ID); + + // Close Statement — should skip RPC since server operation already closed + statement.close(); + verify(client, times(1)).closeStatement(STATEMENT_ID); // still 1, not 2 + } + + @Test + public void testStatementClose_noDoubleRpc_whenResultSetNotClosed() throws Exception { + // Verify that Statement.close() with a non-closed ResultSet fires exactly one RPC: + // ResultSet.close() inside Statement.close() triggers proactive close, and the + // subsequent check sees serverOperationClosed=true and skips. + IDatabricksConnectionContext connectionContext = + DatabricksConnectionContext.parse(JDBC_URL, new Properties()); + DatabricksConnection connection = new DatabricksConnection(connectionContext, client); + DatabricksStatement statement = new DatabricksStatement(connection); + + IExecutionResult execResult = mock(IExecutionResult.class); + DatabricksResultSetMetaData rsMeta = mock(DatabricksResultSetMetaData.class); + DatabricksResultSet resultSet = + new DatabricksResultSet( + new StatementStatus().setState(StatementState.SUCCEEDED), + STATEMENT_ID, + StatementType.QUERY, + statement, + execResult, + rsMeta, + false); + statement.setStatementId(STATEMENT_ID); + statement.resultSet = resultSet; + + // Close Statement directly (without closing ResultSet first) + statement.close(); + + // Should fire exactly one closeStatement RPC, not two + verify(client, times(1)).closeStatement(STATEMENT_ID); + } }