Commit c724642

gopalldb and claude authored
[ES-1774740] Fix Thrift polling infinite loop on invalid operation handle (#1273)
## Summary

- **Fix infinite poll loop on invalid handle**: `checkOperationStatusForErrors()` now checks `TStatus.statusCode` for `INVALID_HANDLE_STATUS` during polling. Previously only `operationState` was checked; if the server returned `INVALID_HANDLE_STATUS` without setting `operationState` (e.g. after a driver restart), the polling loop ran indefinitely.
- **Add timeout and sleep to metadata polling loop**: `fetchMetadataResults()` previously had no timeout and no sleep between polls. It now uses a configurable `MetadataOperationTimeout` (default 300s) and sleeps between polls using the same interval as the SQL execution polling loop.
- **New connection property**: `MetadataOperationTimeout` (seconds, default 300, 0 = no timeout) controls the metadata polling timeout.

## Context

ES-1774740: After a Databricks cluster restart, the JDBC driver entered an infinite poll loop against the invalid operation handle. The root cause was that `GetOperationStatus` returned `INVALID_HANDLE_STATUS` in `TStatus.statusCode` but did not set `operationState`, so `shouldContinuePolling()` kept returning `true`. With the default `queryTimeout=0` (infinite), there was no safety net to break the loop.

The metadata polling loop (`fetchMetadataResults`) had a separate issue: no timeout handler and no sleep between polls, meaning it could hammer the server in a tight loop indefinitely.

SEA mode is not affected: it uses HTTP status codes (e.g. 404) for invalid statements, which propagate as uncaught `RuntimeException` rather than causing an infinite loop.

## Test plan

- [x] `testPollingThrowsOnInvalidHandleStatus`: SQL execution polling detects `INVALID_HANDLE_STATUS` and throws
- [x] `testMetadataPollingThrowsOnInvalidHandleStatus`: metadata polling detects `INVALID_HANDLE_STATUS` and throws
- [x] `testMetadataPollingTimesOut`: metadata polling respects timeout, cancels operation, and throws
- [x] `testMetadataPollingWithSleepBetweenPolls`: verifies sleep delay between metadata polls
- [x] All 47 tests in `DatabricksThriftAccessorTest` pass (43 existing + 4 new)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Signed-off-by: Gopal Lal <gopal.lal@databricks.com>
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
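
The failure mode described in the commit message can be illustrated with a small standalone sketch. It is not the driver's code: `PollingSketch`, `StatusResp`, and the two enums are hypothetical stand-ins for the Thrift types, and `shouldContinuePolling` is assumed to treat an unset `operationState` as "keep polling", as the message describes. The sketch shows the three guards working together: a status-code check, a timeout check (0 meaning no timeout), and a sleep between polls.

```java
import java.util.Iterator;
import java.util.List;

/** Hypothetical stand-ins for the Thrift types; not the driver's real classes. */
final class PollingSketch {
  enum StatusCode { SUCCESS_STATUS, INVALID_HANDLE_STATUS }
  enum OperationState { RUNNING, FINISHED }

  static final class StatusResp {
    final StatusCode statusCode;
    final OperationState operationState; // null models "operationState not set"

    StatusResp(StatusCode statusCode, OperationState operationState) {
      this.statusCode = statusCode;
      this.operationState = operationState;
    }
  }

  /** Assumed behaviour: an unset state looks the same as "still running". */
  static boolean shouldContinuePolling(StatusResp resp) {
    return resp == null
        || resp.operationState == null
        || resp.operationState == OperationState.RUNNING;
  }

  /** Checks the status code first, so an invalid handle fails fast. */
  static void checkStatusForErrors(StatusResp resp) {
    if (resp != null && resp.statusCode == StatusCode.INVALID_HANDLE_STATUS) {
      throw new IllegalStateException("status code: [" + resp.statusCode + "]");
    }
  }

  /** Polls until the operation finishes and returns the number of polls made. */
  static int poll(Iterator<StatusResp> server, int timeoutSeconds, long sleepMillis) {
    long startNanos = System.nanoTime();
    long timeoutNanos = timeoutSeconds * 1_000_000_000L;
    StatusResp resp = null;
    int polls = 0;
    while (shouldContinuePolling(resp)) {
      // A timeout of 0 disables the check, matching the documented property semantics.
      if (timeoutSeconds > 0 && System.nanoTime() - startNanos >= timeoutNanos) {
        throw new IllegalStateException("timed out after " + timeoutSeconds + " seconds");
      }
      resp = server.next();
      polls++;
      checkStatusForErrors(resp);
      if (!shouldContinuePolling(resp)) {
        break; // skip the sleep once the operation is done
      }
      try {
        Thread.sleep(sleepMillis);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException("interrupted while polling", e);
      }
    }
    return polls;
  }

  public static void main(String[] args) {
    Iterator<StatusResp> healthy =
        List.of(
                new StatusResp(StatusCode.SUCCESS_STATUS, OperationState.RUNNING),
                new StatusResp(StatusCode.SUCCESS_STATUS, OperationState.FINISHED))
            .iterator();
    System.out.println("polls: " + poll(healthy, 300, 10));
  }
}
```

Without `checkStatusForErrors`, a response carrying `INVALID_HANDLE_STATUS` and no state would keep `shouldContinuePolling` returning `true` on every iteration, which is the loop this commit closes.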
1 parent 7741c3c commit c724642

6 files changed

Lines changed: 194 additions & 3 deletions


NEXT_CHANGELOG.md

Lines changed: 1 addition & 0 deletions
@@ -13,6 +13,7 @@
 - PECOBLR-1121 Arrow patch to circumvent Arrow issues with JDK 16+.
 
 ### Fixed
+- Fixed Thrift polling infinite loop when server restarts invalidate operation handles, and added configurable timeout (`MetadataOperationTimeout`, default 300s) with sleep between polls for metadata operations.
 - Fixed `DatabricksParameterMetaData.countParameters` and `DatabricksStatement.trimCommentsAndWhitespaces` with a `SqlCommentParser` utility class.
 - Fixed `rollback()` to throw `SQLException` when called in auto-commit mode (no active transaction), aligning with JDBC spec. Previously it silently sent a ROLLBACK command to the server.
 - Fixed `fetchAutoCommitStateFromServer()` to accept both `"1"`/`"0"` and `"true"`/`"false"` responses from `SET AUTOCOMMIT` query, since different server implementations return different formats.

src/main/java/com/databricks/jdbc/api/impl/DatabricksConnectionContext.java

Lines changed: 10 additions & 0 deletions
@@ -1108,6 +1108,16 @@ public boolean treatMetadataCatalogNameAsPattern() {
     return getParameter(DatabricksJdbcUrlParams.TREAT_METADATA_CATALOG_NAME_AS_PATTERN).equals("1");
   }
 
+  @Override
+  public int getMetadataOperationTimeout() {
+    try {
+      return Integer.parseInt(getParameter(DatabricksJdbcUrlParams.METADATA_OPERATION_TIMEOUT));
+    } catch (NumberFormatException e) {
+      LOGGER.warn("Invalid value for MetadataOperationTimeout, using default of 300 seconds");
+      return 300;
+    }
+  }
+
   @Override
   public boolean getEnableMetricViewMetadata() {
     return getParameter(DatabricksJdbcUrlParams.ENABLE_METRIC_VIEW_METADATA).equals("1");
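
The parse-with-fallback behaviour of `getMetadataOperationTimeout()` can be tried in isolation. The sketch below is an illustration only: `TimeoutParamSketch` and `parseTimeoutSeconds` are hypothetical names, and the raw string stands in for whatever `getParameter(...)` returns in the driver.

```java
/** Hypothetical helper mirroring the parse-or-default pattern in the hunk above. */
final class TimeoutParamSketch {
  static final int DEFAULT_TIMEOUT_SECONDS = 300;

  /** Returns the parsed value, or the default when the input is not a valid integer. */
  static int parseTimeoutSeconds(String raw) {
    try {
      // Integer.parseInt rejects null, empty strings, and surrounding whitespace.
      return Integer.parseInt(raw);
    } catch (NumberFormatException e) {
      return DEFAULT_TIMEOUT_SECONDS;
    }
  }

  public static void main(String[] args) {
    System.out.println(parseTimeoutSeconds("60"));
    System.out.println(parseTimeoutSeconds("sixty"));
  }
}
```

Note that a negative value such as `"-5"` parses successfully and is returned unchanged. How the driver's `TimeoutHandler` treats negative timeouts is not shown in this diff.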

src/main/java/com/databricks/jdbc/api/internal/IDatabricksConnectionContext.java

Lines changed: 3 additions & 0 deletions
@@ -409,6 +409,9 @@ public interface IDatabricksConnectionContext {
 
   boolean treatMetadataCatalogNameAsPattern();
 
+  /** Returns the timeout in seconds for metadata polling operations. 0 means no timeout. */
+  int getMetadataOperationTimeout();
+
   /** Returns whether batched INSERT optimization is enabled */
   boolean isBatchedInsertsEnabled();
 

src/main/java/com/databricks/jdbc/common/DatabricksJdbcUrlParams.java

Lines changed: 4 additions & 0 deletions
@@ -176,6 +176,10 @@ public enum DatabricksJdbcUrlParams {
       "TreatMetadataCatalogNameAsPattern",
       "Treat catalog names as patterns in Thrift metadata RPCs. When disabled (default), wildcard characters in catalog names are escaped",
       "0"),
+  METADATA_OPERATION_TIMEOUT(
+      "MetadataOperationTimeout",
+      "Timeout in seconds for metadata polling operations (e.g. GetTables, GetColumns). 0 means no timeout",
+      "300"),
   ENABLE_BATCHED_INSERTS("EnableBatchedInserts", "Enable batched INSERT optimization", "0"),
   ENABLE_SQL_VALIDATION_FOR_IS_VALID(
       "EnableSQLValidationForIsValid",

src/main/java/com/databricks/jdbc/dbclient/impl/thrift/DatabricksThriftAccessor.java

Lines changed: 52 additions & 3 deletions
@@ -712,9 +712,42 @@ TFetchResultsResp fetchMetadataResults(TResp response, String contextDescription
         new TGetOperationStatusReq()
             .setOperationHandle(operationHandle)
             .setGetProgressUpdate(false);
+    TimeoutHandler metadataTimeoutHandler =
+        new TimeoutHandler(
+            connectionContext.getMetadataOperationTimeout(),
+            "Metadata operation for statement: " + statementId,
+            () -> {
+              try {
+                if (operationHandle != null) {
+                  LOGGER.debug("Canceling metadata operation due to timeout: {}", operationHandle);
+                  cancelOperation(new TCancelOperationReq().setOperationHandle(operationHandle));
+                }
+              } catch (Exception e) {
+                LOGGER.warn("Failed to cancel metadata operation on timeout: {}", e.getMessage());
+              }
+            },
+            DatabricksDriverErrorCode.OPERATION_TIMEOUT_ERROR);
     while (shouldContinuePolling(statusResp)) {
+      metadataTimeoutHandler.checkTimeout();
       statusResp = getThriftClient().GetOperationStatus(statusReq);
       checkOperationStatusForErrors(statusResp, statementId);
+      if (!shouldContinuePolling(statusResp)) {
+        break;
+      }
+      try {
+        TimeUnit.MILLISECONDS.sleep(asyncPollIntervalMillis);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        LOGGER.error(
+            "Metadata operation interrupted for statement [{}], canceling operation", statementId);
+        if (operationHandle != null) {
+          cancelOperation(new TCancelOperationReq().setOperationHandle(operationHandle));
+        }
+        throw new DatabricksSQLException(
+            "Metadata operation interrupted",
+            e,
+            DatabricksDriverErrorCode.THREAD_INTERRUPTED_ERROR);
+      }
     }
 
     if (hasResultDataInDirectResults(response)) {
@@ -758,9 +791,25 @@ private <T extends TBase<T, F>, F extends TFieldIdEnum> void checkResponseForErr
 
   private void checkOperationStatusForErrors(TGetOperationStatusResp statusResp, String statementId)
       throws SQLException {
-    if (statusResp != null
-        && statusResp.isSetOperationState()
-        && isErrorOperationState(statusResp.getOperationState())) {
+    if (statusResp == null) {
+      return;
+    }
+
+    // Check TStatus for INVALID_HANDLE_STATUS — this can happen when the server restarts
+    // and the operation handle becomes invalid. Without this check, the polling loop would
+    // continue indefinitely since operationState may not be set in the response.
+    if (statusResp.isSetStatus() && isErrorStatusCode(statusResp.getStatus())) {
+      String errorMsg =
+          String.format(
+              "Operation status check failed with status code: [%s] for statement [%s], "
+                  + "error: [%s]",
+              statusResp.getStatus().getStatusCode(), statementId, statusResp.getErrorMessage());
+      LOGGER.error(errorMsg);
+      throw new DatabricksSQLException(
+          errorMsg, statusResp.isSetSqlState() ? statusResp.getSqlState() : null);
+    }
+
+    if (statusResp.isSetOperationState() && isErrorOperationState(statusResp.getOperationState())) {
       String errorMsg =
           String.format(
               "Operation failed with error: [%s] for statement [%s], with response [%s]",
src/test/java/com/databricks/jdbc/dbclient/impl/thrift/DatabricksThriftAccessorTest.java

Lines changed: 124 additions & 0 deletions
@@ -940,6 +940,130 @@ void testFetchResultsWithCustomMaxRowsPerBlock()
     verify(thriftClient).FetchResults(expectedFetchRequest);
   }
 
+  @Test
+  void testPollingThrowsOnInvalidHandleStatus()
+      throws TException, SQLException, DatabricksValidationException {
+    setup(false);
+
+    TExecuteStatementReq request = new TExecuteStatementReq();
+    TExecuteStatementResp tExecuteStatementResp =
+        new TExecuteStatementResp()
+            .setOperationHandle(tOperationHandle)
+            .setStatus(new TStatus().setStatusCode(TStatusCode.SUCCESS_STATUS));
+    when(thriftClient.ExecuteStatement(request)).thenReturn(tExecuteStatementResp);
+
+    // Simulate server restart: GetOperationStatus returns INVALID_HANDLE_STATUS
+    // without setting operationState
+    TGetOperationStatusResp invalidHandleResp =
+        new TGetOperationStatusResp()
+            .setStatus(new TStatus().setStatusCode(TStatusCode.INVALID_HANDLE_STATUS));
+    when(thriftClient.GetOperationStatus(operationStatusReq)).thenReturn(invalidHandleResp);
+
+    Statement statement = mock(Statement.class);
+    when(parentStatement.getStatement()).thenReturn(statement);
+    when(statement.getQueryTimeout()).thenReturn(0);
+
+    DatabricksSQLException exception =
+        assertThrows(
+            DatabricksSQLException.class,
+            () -> accessor.execute(request, parentStatement, session, StatementType.SQL));
+    assertTrue(exception.getMessage().contains("INVALID_HANDLE_STATUS"));
+  }
+
+  @Test
+  void testMetadataPollingThrowsOnInvalidHandleStatus()
+      throws TException, SQLException, DatabricksValidationException {
+    setup(false);
+    lenient().when(connectionContext.getMetadataOperationTimeout()).thenReturn(300);
+
+    TGetSchemasReq request = new TGetSchemasReq();
+    TGetSchemasResp tGetSchemasResp =
+        new TGetSchemasResp()
+            .setOperationHandle(tOperationHandle)
+            .setStatus(new TStatus().setStatusCode(TStatusCode.SUCCESS_STATUS));
+    when(thriftClient.GetSchemas(request)).thenReturn(tGetSchemasResp);
+
+    // Simulate server restart: GetOperationStatus returns INVALID_HANDLE_STATUS
+    TGetOperationStatusResp invalidHandleResp =
+        new TGetOperationStatusResp()
+            .setStatus(new TStatus().setStatusCode(TStatusCode.INVALID_HANDLE_STATUS));
+    when(thriftClient.GetOperationStatus(operationStatusReq)).thenReturn(invalidHandleResp);
+
+    DatabricksSQLException exception =
+        assertThrows(DatabricksSQLException.class, () -> accessor.getThriftResponse(request));
+    assertTrue(exception.getMessage().contains("INVALID_HANDLE_STATUS"));
+  }
+
+  @Test
+  void testMetadataPollingTimesOut()
+      throws TException, SQLException, DatabricksValidationException {
+    // Set the async poll interval to 200ms for faster test
+    when(connectionContext.getAsyncExecPollInterval()).thenReturn(200);
+    // Set metadata timeout to 1 second
+    when(connectionContext.getMetadataOperationTimeout()).thenReturn(1);
+
+    accessor = spy(new DatabricksThriftAccessor(connectionContext));
+    doReturn(thriftClient).when(accessor).getThriftClient();
+
+    TGetTablesReq request = new TGetTablesReq();
+    TGetTablesResp tGetTablesResp =
+        new TGetTablesResp()
+            .setOperationHandle(tOperationHandle)
+            .setStatus(new TStatus().setStatusCode(TStatusCode.SUCCESS_STATUS));
+    when(thriftClient.GetTables(request)).thenReturn(tGetTablesResp);
+
+    // Simulate operation that stays running forever
+    when(thriftClient.GetOperationStatus(operationStatusReq))
+        .thenReturn(operationStatusRunningResp);
+
+    // Create cancel mock
+    TCancelOperationResp cancelResp =
+        new TCancelOperationResp()
+            .setStatus(new TStatus().setStatusCode(TStatusCode.SUCCESS_STATUS));
+    when(thriftClient.CancelOperation(any(TCancelOperationReq.class))).thenReturn(cancelResp);
+
+    // getThriftResponse wraps SQLException into DatabricksSQLException, so the timeout
+    // exception is wrapped. Verify that the root cause message contains the timeout info.
+    DatabricksSQLException exception =
+        assertThrows(DatabricksSQLException.class, () -> accessor.getThriftResponse(request));
+    assertTrue(exception.getMessage().contains("timed-out after 1 seconds"));
+
+    // Verify cancel was called
+    verify(thriftClient).CancelOperation(any(TCancelOperationReq.class));
+  }
+
+  @Test
+  void testMetadataPollingWithSleepBetweenPolls()
+      throws TException, SQLException, DatabricksValidationException {
+    // Set poll interval to 200ms
+    when(connectionContext.getAsyncExecPollInterval()).thenReturn(200);
+    when(connectionContext.getMetadataOperationTimeout()).thenReturn(300);
+
+    accessor = spy(new DatabricksThriftAccessor(connectionContext));
+    doReturn(thriftClient).when(accessor).getThriftClient();
+
+    TGetColumnsReq request = new TGetColumnsReq();
+    TGetColumnsResp tGetColumnsResp =
+        new TGetColumnsResp()
+            .setOperationHandle(tOperationHandle)
+            .setStatus(new TStatus().setStatusCode(TStatusCode.SUCCESS_STATUS));
+    when(thriftClient.GetColumns(request)).thenReturn(tGetColumnsResp);
+
+    // Simulate: first poll returns running, second returns finished
+    when(thriftClient.GetOperationStatus(operationStatusReq))
+        .thenReturn(operationStatusRunningResp)
+        .thenReturn(operationStatusFinishedResp);
+    when(thriftClient.FetchResults(getFetchResultsRequest(false))).thenReturn(fetchResultsResponse);
+
+    long startTime = System.currentTimeMillis();
+    TFetchResultsResp actualResponse = (TFetchResultsResp) accessor.getThriftResponse(request);
+    long elapsed = System.currentTimeMillis() - startTime;
+
+    assertEquals(actualResponse, fetchResultsResponse);
+    // Verify sleep happened — elapsed time should be at least ~200ms
+    assertTrue(elapsed >= 150, "Expected at least 150ms elapsed due to poll sleep, got " + elapsed);
+  }
+
   private TFetchResultsReq getFetchResultsRequest(boolean includeMetadata)
       throws DatabricksValidationException {
     TFetchResultsReq request =
