Merged
1 change: 1 addition & 0 deletions NEXT_CHANGELOG.md
@@ -7,6 +7,7 @@
### Updated

### Fixed
- Fixed state leaking issue in thrift client.
- Fixed timestamp values returning only milliseconds instead of the full nanosecond precision.
---
*Note: When making changes, please add your change under the appropriate section with a brief description.*
@@ -87,7 +87,9 @@ public int read(byte[] buf, int off, int len) throws TTransportException {

@Override
public void write(byte[] buf, int off, int len) {
requestBuffer.write(buf, off, len);
synchronized (requestBuffer) {
requestBuffer.write(buf, off, len);
}
}

@Override
@@ -115,9 +117,13 @@ public void flush() throws TTransportException {
LOGGER.debug("Thrift tracing header: " + traceHeader);
request.addHeader(TracingUtil.TRACE_HEADER, traceHeader);
}

byte[] requestPayload;
synchronized (requestBuffer) {
Collaborator

@jayantsing-db jayantsing-db Sep 25, 2025

If I understand correctly, a JDBC connection will now have a single thrift client. So when a JDBC connection is used by several threads concurrently, they will all share that thrift client. Without a sync mechanism around the whole call, threads sharing the same thrift client will fail because:

  • thread A can increment the thrift client's seq number from x to x + 1
  • thread B can then receive the response for seq number x, see that the seq number does not match, and throw an exception: out of sequence response

This section of the code, with all good intentions, tries to put a sync mechanism in place. But I am afraid it might not be sufficient.

For a given thread, the thrift calls happen sequentially in this order:

TCLIService.Client.ExecuteStatement -> seqid ++ -> transport.write -> transport.flush -> <wait for the http response> -> transport.read (when reading, it checks the seqid against the one present in the message)

The current sync mechanism only protects transport.write and transport.read. Things could still fail in exciting ways because the whole chain is not isolated/protected.

Happy to chat if this does not make sense.
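
To make the ordering concern concrete, here is a minimal, hypothetical sketch. It is a toy model, not the driver's or Thrift's actual code: `ToyClient`, `send`, `receive`, and `call` are invented names. `send` stands in for seqid++ plus transport.write/flush, and `receive` stands in for transport.read with its seqid check. It assumes the response echoes the request's seqid and is compared with the client's current counter.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Toy model only; every name here is hypothetical and not part of the driver. */
public class SeqIdRaceSketch {

  /** Stand-in for a stateful client with a single sequence counter. */
  static final class ToyClient {
    private int seqId = 0;

    /** Models seqid++ followed by write/flush; returns the seqid carried by the request. */
    synchronized int send() {
      return ++seqId;
    }

    /** Models read: the response's seqid must equal the client's current seqid. */
    synchronized boolean receive(int responseSeqId) {
      return responseSeqId == seqId;
    }

    /** Holds one lock across the whole chain, so no other call can interleave. */
    synchronized boolean call() {
      int sent = send();
      return receive(sent);
    }
  }

  /** Replays the problematic interleaving step by step, without relying on timing. */
  static boolean interleavedCallSucceeds() {
    ToyClient client = new ToyClient();
    int sentByB = client.send(); // thread B sends; seqid becomes 1
    client.send();               // thread A sends; seqid becomes 2
    return client.receive(sentByB); // thread B reads a response for seqid 1
  }

  /** Runs many concurrent whole-chain calls and counts seqid mismatches. */
  static int failuresWithChainLock(int threads, int callsPerThread)
      throws InterruptedException {
    ToyClient client = new ToyClient();
    AtomicInteger failures = new AtomicInteger();
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    for (int t = 0; t < threads; t++) {
      pool.submit(() -> {
        for (int i = 0; i < callsPerThread; i++) {
          if (!client.call()) {
            failures.incrementAndGet();
          }
        }
      });
    }
    pool.shutdown();
    pool.awaitTermination(30, TimeUnit.SECONDS);
    return failures.get();
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println("interleaved call succeeds: " + interleavedCallSucceeds());
    System.out.println("failures with chain lock: " + failuresWithChainLock(8, 1000));
  }
}
```

In this model the interleaved case reports a mismatch even though `send` and `receive` are each synchronized, while holding one lock across the whole call never does. Whether the real driver can hit this interleaving depends on whether a client instance is actually shared between threads.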

Collaborator

Ideally, JDBC is a synchronous protocol, and client apps are advised not to use the same connection from different threads, but there is no strict enforcement. Also, I wrote a basic test on top of these changes that shares a JDBC connection across many threads and executes statements. I didn't encounter any errors (but I believe, as with other threading issues, it was only by chance that the threads didn't step on each other).

Collaborator Author

For this change, my goal was specifically to address the getFunctions issue raised in this repo. I may not be fully following the broader concern you mentioned. Could you please share a concrete test case where the current fix fails? That would really help me understand it better.

requestPayload = requestBuffer.toByteArray();
requestBuffer.reset();
}
// Set the request entity
request.setEntity(new ByteArrayEntity(requestBuffer.toByteArray()));
request.setEntity(new ByteArrayEntity(requestPayload));

// Execute the request and handle the response
long httpRequestStartTime = System.currentTimeMillis();
@@ -145,9 +151,6 @@ public void flush() throws TTransportException {
LOGGER.error(e, errorMessage);
throw new TTransportException(TTransportException.UNKNOWN, errorMessage, e);
}

// Reset the request buffer
requestBuffer.reset();
}

@Override
@@ -15,10 +15,7 @@
import com.databricks.jdbc.dbclient.impl.common.StatementId;
import com.databricks.jdbc.dbclient.impl.common.TimeoutHandler;
import com.databricks.jdbc.dbclient.impl.http.DatabricksHttpClientFactory;
import com.databricks.jdbc.exception.DatabricksHttpException;
import com.databricks.jdbc.exception.DatabricksParsingException;
import com.databricks.jdbc.exception.DatabricksSQLException;
import com.databricks.jdbc.exception.DatabricksSQLFeatureNotSupportedException;
import com.databricks.jdbc.exception.*;
import com.databricks.jdbc.log.JdbcLogger;
import com.databricks.jdbc.log.JdbcLoggerFactory;
import com.databricks.jdbc.model.client.thrift.generated.*;
@@ -27,7 +24,6 @@
import com.databricks.jdbc.telemetry.latency.TelemetryCollector;
import com.databricks.sdk.core.DatabricksConfig;
import com.databricks.sdk.service.sql.StatementState;
import com.google.common.annotations.VisibleForTesting;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
@@ -51,13 +47,15 @@ final class DatabricksThriftAccessor {
TExecuteStatementResp._Fields.OPERATION_HANDLE.getThriftFieldId();
private static final short statusFieldId =
TExecuteStatementResp._Fields.STATUS.getThriftFieldId();
private final ThreadLocal<TCLIService.Client> thriftClient;
private final DatabricksConfig databricksConfig;
private final boolean enableDirectResults;
private final int asyncPollIntervalMillis;
private final int maxRowsPerBlock;
private final String connectionUuid;
private final String endpointUrl;
private final IDatabricksConnectionContext connectionContext;
private TProtocolVersion serverProtocolVersion = JDBC_THRIFT_VERSION;
private ThreadLocal<TCLIService.Client> FAKE_SHARED_CLIENT;
Collaborator

nit: don't use all caps for non-final variables


DatabricksThriftAccessor(IDatabricksConnectionContext connectionContext)
throws DatabricksParsingException {
@@ -66,35 +64,17 @@ final class DatabricksThriftAccessor {
DatabricksClientConfiguratorManager.getInstance()
.getConfigurator(connectionContext)
.getDatabricksConfig();
String endPointUrl = connectionContext.getEndpointURL();
this.endpointUrl = connectionContext.getEndpointURL();
this.asyncPollIntervalMillis = connectionContext.getAsyncExecPollInterval();
this.maxRowsPerBlock = connectionContext.getRowsFetchedPerBlock();
this.connectionUuid = connectionContext.getConnectionUuid();

if (!DriverUtil.isRunningAgainstFake()) {
// Create a new thrift client for each thread as client state is not thread safe. Note that
// the underlying protocol uses the same http client which is thread safe
this.thriftClient =
Collaborator

Do we no longer need this logic?

Collaborator

Samikshya explained this to me offline. @jayantsing-db, can you also take a look, since you worked on this previously?
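
For readers following this thread, a minimal sketch of the two `ThreadLocal` patterns visible in this hunk may help. It is a hypothetical toy: `ToyClient` and the helper methods are invented names, not driver code. `ThreadLocal.withInitial` with a factory gives each thread its own instance, whereas wrapping one pre-built instance in the initializer hands the same object to every thread.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

/** Toy illustration only; ToyClient is a hypothetical stand-in for a stateful client. */
public class ThreadLocalClientSketch {

  static final class ToyClient {}

  /** Counts the distinct instances seen when each of `threads` threads asks once. */
  static int distinctInstances(Supplier<ToyClient> source, int threads)
      throws InterruptedException {
    // ToyClient does not override equals/hashCode, so the set compares by identity.
    Set<ToyClient> seen = ConcurrentHashMap.newKeySet();
    Thread[] workers = new Thread[threads];
    for (int i = 0; i < threads; i++) {
      workers[i] = new Thread(() -> seen.add(source.get()));
      workers[i].start();
    }
    for (Thread worker : workers) {
      worker.join();
    }
    return seen.size();
  }

  /** The initializer runs once per thread, so each thread builds its own client. */
  static int perThreadCount(int threads) throws InterruptedException {
    ThreadLocal<ToyClient> perThread = ThreadLocal.withInitial(ToyClient::new);
    return distinctInstances(perThread::get, threads);
  }

  /** The initializer returns a captured instance, so all threads share one client. */
  static int sharedCount(int threads) throws InterruptedException {
    ToyClient shared = new ToyClient();
    ThreadLocal<ToyClient> wrapped = ThreadLocal.withInitial(() -> shared);
    return distinctInstances(wrapped::get, threads);
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println("per-thread instances: " + perThreadCount(4));
    System.out.println("shared instances: " + sharedCount(4));
  }
}
```

In this sketch only the factory form isolates client state per thread. The captured-instance form behaves like a plain shared field.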

ThreadLocal.withInitial(
() -> createThriftClient(endPointUrl, databricksConfig, connectionContext));
} else {
TCLIService.Client client =
createThriftClient(endPointUrl, databricksConfig, connectionContext);
this.thriftClient = ThreadLocal.withInitial(() -> client);
this.connectionContext = connectionContext;
if (DriverUtil.isRunningAgainstFake()) {
TCLIService.Client client = newThriftClient();
this.FAKE_SHARED_CLIENT = ThreadLocal.withInitial(() -> client);
}
}

@VisibleForTesting
DatabricksThriftAccessor(
TCLIService.Client client, IDatabricksConnectionContext connectionContext) {
this.databricksConfig = null;
this.thriftClient = ThreadLocal.withInitial(() -> client);
this.enableDirectResults = connectionContext.getDirectResultMode();
this.asyncPollIntervalMillis = connectionContext.getAsyncExecPollInterval();
this.maxRowsPerBlock = connectionContext.getRowsFetchedPerBlock();
this.connectionUuid = connectionContext.getConnectionUuid();
}

@SuppressWarnings("rawtypes")
TBase getThriftResponse(TBase request) throws DatabricksSQLException {
LOGGER.debug("Fetching thrift response for request {}", request.toString());
@@ -491,10 +471,6 @@ DatabricksResultSet getStatementResult(
executionStatus, statementId, resultSet, StatementType.SQL, parentStatement, session);
}

TCLIService.Client getThriftClient() {
return thriftClient.get();
}

DatabricksConfig getDatabricksConfig() {
return databricksConfig;
}
@@ -602,25 +578,22 @@ private TFetchResultsResp listColumns(TGetColumnsReq request)
return fetchMetadataResults(response, response.toString());
}

/**
* Creates a new thrift client for the given endpoint URL and authentication headers.
*
* @param endPointUrl endpoint URL
* @param databricksConfig SDK config object required for authentication headers
*/
private TCLIService.Client createThriftClient(
String endPointUrl,
DatabricksConfig databricksConfig,
IDatabricksConnectionContext connectionContext) {
/** Creates a new thrift client for the given endpoint URL and authentication headers. */
TCLIService.Client getThriftClient() {
if (DriverUtil.isRunningAgainstFake()) {
return FAKE_SHARED_CLIENT.get();
}
return newThriftClient();
}

private TCLIService.Client newThriftClient() {
DatabricksHttpTTransport transport =
new DatabricksHttpTTransport(
DatabricksHttpClientFactory.getInstance().getClient(connectionContext),
endPointUrl,
endpointUrl,
databricksConfig,
connectionContext);
TBinaryProtocol protocol = new TBinaryProtocol(transport);

return new TCLIService.Client(protocol);
return new TCLIService.Client(new TBinaryProtocol(transport));
}

/**