Skip to content

Commit c4a190d

Browse files
committed
Merge remote-tracking branch 'upstream/main' into design/heartbeat-keep-alive
2 parents 75be9b7 + 722d80a commit c4a190d

3 files changed

Lines changed: 173 additions & 8 deletions

File tree

NEXT_CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
### Updated
1919
- `EnableGeoSpatialSupport` no longer requires `EnableComplexDatatypeSupport=1`. Geospatial types (GEOMETRY, GEOGRAPHY) can now be enabled independently of complex type support (ARRAY, MAP, STRUCT).
2020
- Arrow schema deserialization failures (Thrift metadata path) now surface a dedicated driver error code `ARROW_SCHEMA_PARSING_ERROR` (vendor code `22000`) and a proper SQLSTATE `22000` (Data Exception) on the thrown `SQLException`, instead of the generic `RESULT_SET_ERROR` (1004) and the enum name as SQLSTATE. The exception message is unchanged.
21+
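- When a Statement is re-executed, the previous server-side operation is now explicitly closed before the new execution starts, preventing orphaned server-side operations when Statements are reused. The close is skipped when the server has already closed the operation (direct results), and a failure to close is logged without blocking the re-execution.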
2122

2223
### Fixed
2324
- Fixed `?` characters inside SQL comments, string literals, and quoted identifiers being incorrectly counted as parameter placeholders when `supportManyParameters=1`. `SQLInterpolator` now uses `SqlCommentParser` to locate only real placeholders. Fixes #1331.

src/main/java/com/databricks/jdbc/api/impl/DatabricksStatement.java

Lines changed: 29 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -980,12 +980,35 @@ private void resetForNewExecution() {
980980
noMoreResults = false;
981981
updateCount = -1;
982982

983-
// Per JDBC spec, re-execution does not explicitly close the previous server-side
984-
// operation handle. The server manages operation handle lifecycle — handles are
985-
// cleaned up when the session closes or the server evicts idle operations.
986-
// Attempting to close handles here would corrupt Thrift HTTP transport connections
987-
// when the server returns unexpected responses (e.g., WireMock 404 in tests).
988-
// For direct results, the server already closed the handle.
983+
// Close the previous server-side operation if it exists. This prevents resource
984+
// leaks when a Statement is re-executed (e.g., PreparedStatement in a loop).
985+
// This matches the behavior of pgJDBC, MySQL Connector/J, Trino JDBC, and
986+
// the Databricks Python SQL Connector — all of which close the previous operation on re-execution.
987+
//
988+
// Note on directResultsReceived: we check the flag value from the PREVIOUS execution
989+
// here. The flag is reset to false below, after this close attempt.
990+
//
991+
// Note on latency: this close is synchronous (adds one RPC round-trip before the next
992+
// execution). This is consistent with pgJDBC's closeForNextExecution() which is also
993+
// synchronous. The correctness benefit (no orphaned server operations) outweighs the
994+
// latency cost for typical usage patterns.
995+
//
996+
// Skip if: (1) no previous execution (statementId==null), or
997+
// (2) server already closed the operation (direct results).
998+
if (statementId != null && !directResultsReceived) {
999+
try {
1000+
connection.getSession().getDatabricksClient().closeStatement(statementId);
1001+
} catch (Exception e) {
1002+
// Don't block re-execution if closing the previous operation fails.
1003+
// This covers: network errors, operation already expired/evicted on server,
1004+
// and transport-level errors (e.g., unexpected server responses).
1005+
// The new execution will create a fresh operation with a new statementId.
1006+
LOGGER.debug(
1007+
"Failed to close previous server operation {} during re-execution: {}",
1008+
statementId,
1009+
e.getMessage());
1010+
}
1011+
}
9891012

9901013
// Stop heartbeat for the previous execution before clearing state.
9911014
// Without this, the old heartbeat (keyed by old statementId) would fail and self-terminate

src/test/java/com/databricks/jdbc/api/impl/DatabricksStatementTest.java

Lines changed: 143 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1028,15 +1028,156 @@ public void testReExecutionClosesPreviousResultSetWithoutServerHandleClose() thr
10281028
// First execution
10291029
statement.executeQuery(STATEMENT);
10301030

1031-
// Second execution — previous ResultSet is closed per JDBC spec.
1032-
// Server handle is NOT explicitly closed (server manages handle lifecycle).
1031+
// Second execution — previous ResultSet closed. No server close because
1032+
// statementId is null in this mock setup (mock doesn't call setStatementId).
10331033
statement.executeQuery(STATEMENT);
10341034

10351035
verify(firstResult, times(1)).close();
10361036
verify(client, never()).closeStatement(any(StatementId.class));
10371037
assertEquals(secondResult, statement.getResultSet());
10381038
}
10391039

1040+
@Test
1041+
public void testReExecutionClosesServerOperation() throws Exception {
1042+
IDatabricksConnectionContext connectionContext =
1043+
DatabricksConnectionContext.parse(JDBC_URL, new Properties());
1044+
DatabricksConnection connection = new DatabricksConnection(connectionContext, client);
1045+
DatabricksStatement statement = new DatabricksStatement(connection);
1046+
1047+
DatabricksResultSet firstResult = mock(DatabricksResultSet.class);
1048+
DatabricksResultSet secondResult = mock(DatabricksResultSet.class);
1049+
1050+
when(client.executeStatement(
1051+
eq(STATEMENT),
1052+
eq(new Warehouse(WAREHOUSE_ID)),
1053+
eq(new HashMap<>()),
1054+
eq(StatementType.QUERY),
1055+
any(IDatabricksSession.class),
1056+
eq(statement),
1057+
any()))
1058+
.thenReturn(firstResult)
1059+
.thenReturn(secondResult);
1060+
1061+
// First execution
1062+
statement.executeQuery(STATEMENT);
1063+
// Simulate server setting the statementId (normally done inside executeStatement)
1064+
StatementId firstStatementId = new StatementId("first-stmt-id");
1065+
statement.setStatementId(firstStatementId);
1066+
1067+
// Second execution — should close the first server operation
1068+
statement.executeQuery(STATEMENT);
1069+
1070+
verify(client, times(1)).closeStatement(eq(firstStatementId));
1071+
verify(firstResult, times(1)).close();
1072+
assertEquals(secondResult, statement.getResultSet());
1073+
}
1074+
1075+
@Test
1076+
public void testReExecutionSkipsServerCloseForDirectResults() throws Exception {
1077+
IDatabricksConnectionContext connectionContext =
1078+
DatabricksConnectionContext.parse(JDBC_URL, new Properties());
1079+
DatabricksConnection connection = new DatabricksConnection(connectionContext, client);
1080+
DatabricksStatement statement = new DatabricksStatement(connection);
1081+
1082+
DatabricksResultSet firstResult = mock(DatabricksResultSet.class);
1083+
DatabricksResultSet secondResult = mock(DatabricksResultSet.class);
1084+
1085+
when(client.executeStatement(
1086+
eq(STATEMENT),
1087+
eq(new Warehouse(WAREHOUSE_ID)),
1088+
eq(new HashMap<>()),
1089+
eq(StatementType.QUERY),
1090+
any(IDatabricksSession.class),
1091+
eq(statement),
1092+
any()))
1093+
.thenReturn(firstResult)
1094+
.thenReturn(secondResult);
1095+
1096+
// First execution with direct results (server already closed the operation)
1097+
statement.executeQuery(STATEMENT);
1098+
statement.setStatementId(new StatementId("direct-stmt-id"));
1099+
statement.markDirectResultsReceived();
1100+
1101+
// Second execution — should NOT close server operation (already closed by server)
1102+
statement.executeQuery(STATEMENT);
1103+
1104+
verify(client, never()).closeStatement(any(StatementId.class));
1105+
verify(firstResult, times(1)).close();
1106+
}
1107+
1108+
@Test
1109+
public void testReExecutionHandlesCloseFailureGracefully() throws Exception {
1110+
IDatabricksConnectionContext connectionContext =
1111+
DatabricksConnectionContext.parse(JDBC_URL, new Properties());
1112+
DatabricksConnection connection = new DatabricksConnection(connectionContext, client);
1113+
DatabricksStatement statement = new DatabricksStatement(connection);
1114+
1115+
DatabricksResultSet firstResult = mock(DatabricksResultSet.class);
1116+
DatabricksResultSet secondResult = mock(DatabricksResultSet.class);
1117+
StatementId firstStatementId = new StatementId("failing-stmt-id");
1118+
1119+
// closeStatement throws (e.g., operation already expired on server)
1120+
doThrow(new DatabricksSQLException("Operation not found", "HY000"))
1121+
.when(client)
1122+
.closeStatement(eq(firstStatementId));
1123+
1124+
when(client.executeStatement(
1125+
eq(STATEMENT),
1126+
eq(new Warehouse(WAREHOUSE_ID)),
1127+
eq(new HashMap<>()),
1128+
eq(StatementType.QUERY),
1129+
any(IDatabricksSession.class),
1130+
eq(statement),
1131+
any()))
1132+
.thenReturn(firstResult)
1133+
.thenReturn(secondResult);
1134+
1135+
statement.executeQuery(STATEMENT);
1136+
statement.setStatementId(firstStatementId);
1137+
1138+
// Re-execution should succeed even though closing previous operation failed
1139+
assertDoesNotThrow(() -> statement.executeQuery(STATEMENT));
1140+
assertEquals(secondResult, statement.getResultSet());
1141+
}
1142+
1143+
@Test
1144+
public void testReExecutionHandlesTransportErrorGracefully() throws Exception {
1145+
IDatabricksConnectionContext connectionContext =
1146+
DatabricksConnectionContext.parse(JDBC_URL, new Properties());
1147+
DatabricksConnection connection = new DatabricksConnection(connectionContext, client);
1148+
DatabricksStatement statement = new DatabricksStatement(connection);
1149+
1150+
DatabricksResultSet firstResult = mock(DatabricksResultSet.class);
1151+
DatabricksResultSet secondResult = mock(DatabricksResultSet.class);
1152+
StatementId firstStatementId = new StatementId("transport-error-stmt-id");
1153+
1154+
// closeStatement throws a transport-level error (e.g., unexpected server response,
1155+
// corrupted framed transport). This is the more severe failure mode — not just "not found"
1156+
// but a low-level I/O error that could corrupt shared transport state.
1157+
doThrow(new RuntimeException("HTTP request failed by code: 500, unexpected response"))
1158+
.when(client)
1159+
.closeStatement(eq(firstStatementId));
1160+
1161+
when(client.executeStatement(
1162+
eq(STATEMENT),
1163+
eq(new Warehouse(WAREHOUSE_ID)),
1164+
eq(new HashMap<>()),
1165+
eq(StatementType.QUERY),
1166+
any(IDatabricksSession.class),
1167+
eq(statement),
1168+
any()))
1169+
.thenReturn(firstResult)
1170+
.thenReturn(secondResult);
1171+
1172+
statement.executeQuery(STATEMENT);
1173+
statement.setStatementId(firstStatementId);
1174+
1175+
// Re-execution must succeed even with transport-level close failure.
1176+
// The new execution creates a fresh server operation with a new statementId.
1177+
assertDoesNotThrow(() -> statement.executeQuery(STATEMENT));
1178+
assertEquals(secondResult, statement.getResultSet());
1179+
}
1180+
10401181
@Test
10411182
public void testAsyncExecutionResetsStateFromPreviousSyncExecution() throws Exception {
10421183
IDatabricksConnectionContext connectionContext =

0 commit comments

Comments
 (0)