Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions NEXT_CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,28 @@

### Fixed

- Reclassify transient/mis-categorized server errors so callers can identify
retryable failures. The remap is applied at all Thrift error sites
(`checkResponseForErrors`, `executeAsync`, `verifySuccessStatus`, and the
polling status handler) so the same server failure surfaces with the same
SQL state regardless of which response carries it.
- Unity Catalog unavailability (`[UC_CLIENT_EXCEPTION]`, previously `XXUCC`)
and parquet read / connection-acquisition deadlines
(`[PARQUET_FAILED_READ_FOOTER]`, `DEADLINE_EXCEEDED: acquiring connection`)
are now reported with SQL state `08S01` (communication link failure).
- Server-side `java.util.ConcurrentModificationException` is now reported
with SQL state `40001` (serialization failure) instead of the misleading
`42000`. The remap only applies when the original SQL state is `42000` so
unrelated `42xxx` states (e.g. `42501` insufficient privilege) are
preserved.
Notes for callers and operators:
- Callers branching on the legacy `XXUCC`/`42000` states for these failures
must update to `08S01`/`40001`. The driver logs the original→remapped
state at `INFO` level for traceability.
- The driver's failure telemetry uses `sqlState` as the error-name field,
so dashboards/alerts keyed on `XXUCC` or `42000` for these specific
failure modes will need to be updated to the new states.

---
*Note: When making changes, please add your change under the appropriate section
with a brief description.*
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,12 @@ public final class DatabricksJdbcConstants {
/** Standard SQL state for communication link failure (SQLSTATE 08S01). */
public static final String COMMUNICATION_LINK_FAILURE_SQLSTATE = "08S01";

/**
* Standard SQL state for transaction rollback - serialization failure (SQLSTATE 40001). Used for
* concurrent-modification errors where the operation is potentially retryable.
*/
public static final String SERIALIZATION_FAILURE_SQLSTATE = "40001";

public static final int TEMPORARY_REDIRECT_STATUS_CODE = 307;
public static final String REDACTED_TOKEN = "****";
public static final String QUERY_TAGS = "query_tags";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,16 @@ public static void verifySuccessStatus(TStatus status, String errorContext, Stri
errorMessage, sqlState, null, DatabricksDriverErrorCode.OPERATION_TIMEOUT_ERROR);
}

throw new DatabricksHttpException(errorMessage, sqlState);
String remappedSqlState =
SqlStateClassifier.classifyTransientSqlState(status.getErrorMessage(), sqlState);
if (!Objects.equals(remappedSqlState, sqlState)) {
LOGGER.info(
"Remapped SQL state [{}] -> [{}] for transient error pattern in thrift status (context: {})",
sqlState,
remappedSqlState,
errorContext);
}
throw new DatabricksHttpException(errorMessage, remappedSqlState);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package com.databricks.jdbc.common.util;

import static com.databricks.jdbc.common.DatabricksJdbcConstants.COMMUNICATION_LINK_FAILURE_SQLSTATE;
import static com.databricks.jdbc.common.DatabricksJdbcConstants.SERIALIZATION_FAILURE_SQLSTATE;

/**
* Reclassifies SQL states for known transient or mis-categorized server errors so callers can
* programmatically identify retryable failures.
*
* <p>Patterns handled:
*
* <ul>
* <li>Unity Catalog unavailability ({@code [UC_CLIENT_EXCEPTION]}) → {@code 08S01} (communication
* link failure, retryable).
* <li>Connection-acquisition / parquet read deadlines ({@code [PARQUET_FAILED_READ_FOOTER]},
* {@code DEADLINE_EXCEEDED: acquiring connection}) → {@code 08S01}.
* <li>Server-side {@code java.util.ConcurrentModificationException} mis-mapped to {@code 42000}
* (syntax/access violation) → {@code 40001} (serialization failure, retryable). Only applied
* when the original SQL state is {@code 42000} so unrelated {@code 42xxx} states are
* preserved.
* </ul>
*
* <p>Patterns are anchored on stable server-emitted tokens (bracketed Spark error classes,
* fully-qualified Java exception names) rather than English prose, so user-supplied SQL string
* literals cannot trigger a false remap and server message rewording does not silently regress the
* classifier.
*/
public final class SqlStateClassifier {

private static final String SYNTAX_OR_ACCESS_VIOLATION_SQLSTATE = "42000";

private SqlStateClassifier() {}

/**
* Returns a remapped SQL state if {@code errorMessage} matches a known transient pattern, or
* {@code originalSqlState} otherwise. Pure function — callers should log the remap separately
* with their own context (statement ID, response).
*/
public static String classifyTransientSqlState(String errorMessage, String originalSqlState) {
if (errorMessage == null) {
return originalSqlState;
}
// Bracketed Spark error classes are the outer cause; check before nested-Java-exception
// patterns like ConcurrentModificationException, which may appear inside a UC/Parquet
// wrapping.
if (errorMessage.contains("[UC_CLIENT_EXCEPTION]")
|| errorMessage.contains("[PARQUET_FAILED_READ_FOOTER]")
|| errorMessage.contains("DEADLINE_EXCEEDED: acquiring connection")) {
return COMMUNICATION_LINK_FAILURE_SQLSTATE;
}
if (SYNTAX_OR_ACCESS_VIOLATION_SQLSTATE.equals(originalSqlState)
&& errorMessage.contains("java.util.ConcurrentModificationException")) {
return SERIALIZATION_FAILURE_SQLSTATE;
}
return originalSqlState;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import static com.databricks.jdbc.common.EnvironmentVariables.DEFAULT_RESULT_ROW_LIMIT;
import static com.databricks.jdbc.common.util.DatabricksTypeUtil.DECIMAL;
import static com.databricks.jdbc.common.util.DatabricksTypeUtil.getDecimalTypeString;
import static com.databricks.jdbc.common.util.SqlStateClassifier.classifyTransientSqlState;
import static com.databricks.jdbc.dbclient.impl.sqlexec.PathConstants.*;
import static com.databricks.jdbc.model.telemetry.enums.DatabricksDriverErrorCode.TEMPORARY_REDIRECT_EXCEPTION;

Expand Down Expand Up @@ -766,8 +767,16 @@ void handleFailedExecution(
throw new DatabricksTimeoutException(
errorMessage, null, DatabricksDriverErrorCode.OPERATION_TIMEOUT_ERROR);
}
String remappedSqlState = classifyTransientSqlState(errorMessage, sqlState);
if (!Objects.equals(remappedSqlState, sqlState)) {
LOGGER.info(
"Remapped SQL state [{}] -> [{}] for transient error pattern in SEA statement [{}]",
sqlState,
remappedSqlState,
statementId);
}
throw new DatabricksSQLException(
errorMessage, sqlState, DatabricksDriverErrorCode.EXECUTE_STATEMENT_FAILED);
errorMessage, remappedSqlState, DatabricksDriverErrorCode.EXECUTE_STATEMENT_FAILED);
}

private ExecuteStatementResponse wrapGetStatementResponse(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import static com.databricks.jdbc.common.DatabricksJdbcConstants.QUERY_EXECUTION_TIMEOUT_SQLSTATE;
import static com.databricks.jdbc.common.EnvironmentVariables.*;
import static com.databricks.jdbc.common.util.DatabricksThriftUtil.*;
import static com.databricks.jdbc.common.util.SqlStateClassifier.classifyTransientSqlState;

import com.databricks.jdbc.api.impl.*;
import com.databricks.jdbc.api.internal.IDatabricksConnectionContext;
Expand All @@ -29,6 +30,7 @@
import com.databricks.sdk.service.sql.StatementState;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import org.apache.http.HttpException;
import org.apache.thrift.TBase;
Expand Down Expand Up @@ -379,7 +381,16 @@ DatabricksResultSet executeAsync(
"Received error response {} from Thrift Server for request {}",
response,
request.toString());
throw new DatabricksSQLException(response.status.errorMessage, response.status.sqlState);
String originalSqlState = response.status.sqlState;
String remappedSqlState =
classifyTransientSqlState(response.status.errorMessage, originalSqlState);
if (!Objects.equals(remappedSqlState, originalSqlState)) {
LOGGER.info(
"Remapped SQL state [{}] -> [{}] for transient error pattern in async execute response",
originalSqlState,
remappedSqlState);
}
throw new DatabricksSQLException(response.status.errorMessage, remappedSqlState);
}
} catch (DatabricksSQLException | TException e) {

Expand Down Expand Up @@ -819,7 +830,16 @@ private <T extends TBase<T, F>, F extends TFieldIdEnum> void checkResponseForErr
if (!response.isSet(operationHandleField) || isErrorStatusCode(status)) {
// if the operationHandle has not been set, it is an error from the server.
LOGGER.error("Error thrift response {}", response);
throw new DatabricksSQLException(status.getErrorMessage(), status.getSqlState());
String originalSqlState = status.getSqlState();
String remappedSqlState =
classifyTransientSqlState(status.getErrorMessage(), originalSqlState);
if (!Objects.equals(remappedSqlState, originalSqlState)) {
LOGGER.info(
"Remapped SQL state [{}] -> [{}] for transient error pattern in thrift response",
originalSqlState,
remappedSqlState);
}
throw new DatabricksSQLException(status.getErrorMessage(), remappedSqlState);
}
}

Expand All @@ -840,8 +860,16 @@ private void checkOperationStatusForErrors(TGetOperationStatusResp statusResp, S
+ "error: [%s]",
statusResp.getStatus().getStatusCode(), statementId, serverError);
LOGGER.error(errorMsg);
throw new DatabricksSQLException(
errorMsg, statusResp.isSetSqlState() ? statusResp.getSqlState() : null);
String originalSqlState = statusResp.isSetSqlState() ? statusResp.getSqlState() : null;
String remappedSqlState = classifyTransientSqlState(serverError, originalSqlState);
if (!Objects.equals(remappedSqlState, originalSqlState)) {
LOGGER.info(
"Remapped SQL state [{}] -> [{}] for transient error pattern in statement [{}]",
originalSqlState,
remappedSqlState,
statementId);
}
throw new DatabricksSQLException(errorMsg, remappedSqlState);
}

if (statusResp.isSetOperationState()
Expand All @@ -864,7 +892,15 @@ private void checkOperationStatusForErrors(TGetOperationStatusResp statusResp, S
errorMsg, null, DatabricksDriverErrorCode.OPERATION_TIMEOUT_ERROR);
}

throw new DatabricksSQLException(errorMsg, sqlState);
String remappedSqlState = classifyTransientSqlState(serverError, sqlState);
if (!Objects.equals(remappedSqlState, sqlState)) {
LOGGER.info(
"Remapped SQL state [{}] -> [{}] for transient error pattern in statement [{}]",
sqlState,
remappedSqlState,
statementId);
}
throw new DatabricksSQLException(errorMsg, remappedSqlState);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package com.databricks.jdbc.common.util;

import static com.databricks.jdbc.common.util.SqlStateClassifier.classifyTransientSqlState;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;

import org.junit.jupiter.api.Test;

class SqlStateClassifierTest {

@Test
void unityCatalogBracketedTokenRemapsTo08S01() {
String ucMessage =
"Error running query: [UC_CLIENT_EXCEPTION] "
+ "com.databricks.sql.managedcatalog.UnityCatalogClientException: "
+ "[UC_CLIENT_EXCEPTION] Failed to contact the Unity Catalog server. "
+ "HTTP/1.1 504 Gateway Timeout, DEADLINE_EXCEEDED";
assertEquals("08S01", classifyTransientSqlState(ucMessage, "XXUCC"));
}

@Test
void parquetReadFooterBracketedTokenRemapsTo08S01() {
String message =
"Error running query: [PARQUET_FAILED_READ_FOOTER] "
+ "com.databricks.sql.io.parquet.ParquetFailedReadFooterException: "
+ "DEADLINE_EXCEEDED: acquiring connection";
assertEquals("08S01", classifyTransientSqlState(message, null));
}

@Test
void deadlineExceededAcquiringConnectionRemapsTo08S01() {
assertEquals(
"08S01",
classifyTransientSqlState("DEADLINE_EXCEEDED: acquiring connection from pool", null));
}

@Test
void concurrentModificationGatedOn42000() {
String message =
"Error running query: java.util.ConcurrentModificationException: "
+ "mutation occurred during iteration";
assertEquals("40001", classifyTransientSqlState(message, "42000"));
}

@Test
void concurrentModificationDoesNotRemapWhenOriginalStateIsNot42000() {
String message =
"Error running query: java.util.ConcurrentModificationException: "
+ "mutation occurred during iteration";
assertEquals("42501", classifyTransientSqlState(message, "42501"));
assertEquals("XXUCC", classifyTransientSqlState(message, "XXUCC"));
assertNull(classifyTransientSqlState(message, null));
}

@Test
void bareConcurrentModificationExceptionWithoutFqnDoesNotRemap() {
assertEquals(
"42000",
classifyTransientSqlState(
"SELECT 'ConcurrentModificationException' FROM nonexistent_tbl", "42000"));
}

@Test
void unbracketedUcProseDoesNotTriggerRemap() {
assertEquals(
"42S02",
classifyTransientSqlState(
"User-supplied literal: Failed to contact the Unity Catalog server", "42S02"));
}

@Test
void unrelatedErrorPreservesOriginalState() {
assertEquals(
"42S02", classifyTransientSqlState("Table or view not found: foo.bar.baz", "42S02"));
}

@Test
void nullMessagePreservesOriginalState() {
assertNull(classifyTransientSqlState(null, null));
assertEquals("42S02", classifyTransientSqlState(null, "42S02"));
}

@Test
void emptyOriginalStateIsPreservedWhenUnrelated() {
assertEquals("", classifyTransientSqlState("Some unrelated error", ""));
}

@Test
void caseSensitivityIsExplicit() {
assertEquals(
"42S02",
classifyTransientSqlState("Lowercase [uc_client_exception] should not match", "42S02"));
}

@Test
void ucCheckRunsBeforeCmeWhenOriginalStateIs42000AndBothSubstringsPresent() {
String message =
"[UC_CLIENT_EXCEPTION] catalog server: caused by "
+ "java.util.ConcurrentModificationException: nested";
assertEquals("08S01", classifyTransientSqlState(message, "42000"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,38 @@ public void testHandleFailedExecution_FailedState_ThrowsWithoutHY008() throws Ex
assertTrue(exception.getMessage().contains("execution failed"));
}

@Test
public void testHandleFailedExecution_unityCatalogError_remapsToCommunicationLinkFailure()
throws Exception {
IDatabricksConnectionContext connectionContext =
DatabricksConnectionContext.parse(JDBC_URL, new Properties());
DatabricksSdkClient databricksSdkClient =
new DatabricksSdkClient(connectionContext, statementExecutionService, apiClient);

StatementStatus failedStatus =
new StatementStatus()
.setState(StatementState.FAILED)
.setSqlState("XXUCC")
.setError(
new ServiceError()
.setMessage(
"[UC_CLIENT_EXCEPTION] Failed to contact the Unity Catalog server. "
+ "HTTP/1.1 504 Gateway Timeout, DEADLINE_EXCEEDED"));
ExecuteStatementResponse response =
new ExecuteStatementResponse()
.setStatementId(STATEMENT_ID.toSQLExecStatementId())
.setStatus(failedStatus);

DatabricksSQLException exception =
assertThrows(
DatabricksSQLException.class,
() ->
databricksSdkClient.handleFailedExecution(
response, STATEMENT_ID.toSQLExecStatementId(), STATEMENT));

assertEquals("08S01", exception.getSQLState(), "Expected XXUCC to be remapped to 08S01");
}

@Test
public void testGetStatementResult_CancelledState_ThrowsWithHY008() throws Exception {
IDatabricksConnectionContext connectionContext =
Expand Down
Loading
Loading