Fix race condition in thrift client #999
Changes from all commits
fc65c7c
37e3b18
fae8c12
6867875
d83d672
fd547c4
6c9c2a0
479fd66
0aecd99
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -15,10 +15,7 @@ | |
| import com.databricks.jdbc.dbclient.impl.common.StatementId; | ||
| import com.databricks.jdbc.dbclient.impl.common.TimeoutHandler; | ||
| import com.databricks.jdbc.dbclient.impl.http.DatabricksHttpClientFactory; | ||
| import com.databricks.jdbc.exception.DatabricksHttpException; | ||
| import com.databricks.jdbc.exception.DatabricksParsingException; | ||
| import com.databricks.jdbc.exception.DatabricksSQLException; | ||
| import com.databricks.jdbc.exception.DatabricksSQLFeatureNotSupportedException; | ||
| import com.databricks.jdbc.exception.*; | ||
| import com.databricks.jdbc.log.JdbcLogger; | ||
| import com.databricks.jdbc.log.JdbcLoggerFactory; | ||
| import com.databricks.jdbc.model.client.thrift.generated.*; | ||
|
|
@@ -27,7 +24,6 @@ | |
| import com.databricks.jdbc.telemetry.latency.TelemetryCollector; | ||
| import com.databricks.sdk.core.DatabricksConfig; | ||
| import com.databricks.sdk.service.sql.StatementState; | ||
| import com.google.common.annotations.VisibleForTesting; | ||
| import java.sql.SQLException; | ||
| import java.util.Arrays; | ||
| import java.util.concurrent.TimeUnit; | ||
|
|
@@ -51,13 +47,15 @@ final class DatabricksThriftAccessor { | |
| TExecuteStatementResp._Fields.OPERATION_HANDLE.getThriftFieldId(); | ||
| private static final short statusFieldId = | ||
| TExecuteStatementResp._Fields.STATUS.getThriftFieldId(); | ||
| private final ThreadLocal<TCLIService.Client> thriftClient; | ||
| private final DatabricksConfig databricksConfig; | ||
| private final boolean enableDirectResults; | ||
| private final int asyncPollIntervalMillis; | ||
| private final int maxRowsPerBlock; | ||
| private final String connectionUuid; | ||
| private final String endpointUrl; | ||
| private final IDatabricksConnectionContext connectionContext; | ||
| private TProtocolVersion serverProtocolVersion = JDBC_THRIFT_VERSION; | ||
| private ThreadLocal<TCLIService.Client> FAKE_SHARED_CLIENT; | ||
|
Collaborator
nit: don't use caps for non-final variables
||
|
|
||
| DatabricksThriftAccessor(IDatabricksConnectionContext connectionContext) | ||
| throws DatabricksParsingException { | ||
|
|
@@ -66,35 +64,17 @@ final class DatabricksThriftAccessor { | |
| DatabricksClientConfiguratorManager.getInstance() | ||
| .getConfigurator(connectionContext) | ||
| .getDatabricksConfig(); | ||
| String endPointUrl = connectionContext.getEndpointURL(); | ||
| this.endpointUrl = connectionContext.getEndpointURL(); | ||
| this.asyncPollIntervalMillis = connectionContext.getAsyncExecPollInterval(); | ||
| this.maxRowsPerBlock = connectionContext.getRowsFetchedPerBlock(); | ||
| this.connectionUuid = connectionContext.getConnectionUuid(); | ||
|
|
||
| if (!DriverUtil.isRunningAgainstFake()) { | ||
| // Create a new thrift client for each thread as client state is not thread safe. Note that | ||
| // the underlying protocol uses the same http client which is thread safe | ||
| this.thriftClient = | ||
|
Collaborator
We don't need this logic any more?
Collaborator
Samikshya explained this to me offline. @jayantsing-db, can you also take a look, since you worked on this previously?
||
| ThreadLocal.withInitial( | ||
| () -> createThriftClient(endPointUrl, databricksConfig, connectionContext)); | ||
| } else { | ||
| TCLIService.Client client = | ||
| createThriftClient(endPointUrl, databricksConfig, connectionContext); | ||
| this.thriftClient = ThreadLocal.withInitial(() -> client); | ||
| this.connectionContext = connectionContext; | ||
| if (DriverUtil.isRunningAgainstFake()) { | ||
| TCLIService.Client client = newThriftClient(); | ||
| this.FAKE_SHARED_CLIENT = ThreadLocal.withInitial(() -> client); | ||
| } | ||
| } | ||
|
|
||
| @VisibleForTesting | ||
| DatabricksThriftAccessor( | ||
| TCLIService.Client client, IDatabricksConnectionContext connectionContext) { | ||
| this.databricksConfig = null; | ||
| this.thriftClient = ThreadLocal.withInitial(() -> client); | ||
| this.enableDirectResults = connectionContext.getDirectResultMode(); | ||
| this.asyncPollIntervalMillis = connectionContext.getAsyncExecPollInterval(); | ||
| this.maxRowsPerBlock = connectionContext.getRowsFetchedPerBlock(); | ||
| this.connectionUuid = connectionContext.getConnectionUuid(); | ||
| } | ||
|
|
||
| @SuppressWarnings("rawtypes") | ||
| TBase getThriftResponse(TBase request) throws DatabricksSQLException { | ||
| LOGGER.debug("Fetching thrift response for request {}", request.toString()); | ||
|
|
@@ -491,10 +471,6 @@ DatabricksResultSet getStatementResult( | |
| executionStatus, statementId, resultSet, StatementType.SQL, parentStatement, session); | ||
| } | ||
|
|
||
| TCLIService.Client getThriftClient() { | ||
| return thriftClient.get(); | ||
| } | ||
|
|
||
| DatabricksConfig getDatabricksConfig() { | ||
| return databricksConfig; | ||
| } | ||
|
|
@@ -602,25 +578,22 @@ private TFetchResultsResp listColumns(TGetColumnsReq request) | |
| return fetchMetadataResults(response, response.toString()); | ||
| } | ||
|
|
||
| /** | ||
| * Creates a new thrift client for the given endpoint URL and authentication headers. | ||
| * | ||
| * @param endPointUrl endpoint URL | ||
| * @param databricksConfig SDK config object required for authentication headers | ||
| */ | ||
| private TCLIService.Client createThriftClient( | ||
| String endPointUrl, | ||
| DatabricksConfig databricksConfig, | ||
| IDatabricksConnectionContext connectionContext) { | ||
| /** Creates a new thrift client for the given endpoint URL and authentication headers. */ | ||
| TCLIService.Client getThriftClient() { | ||
| if (DriverUtil.isRunningAgainstFake()) { | ||
| return FAKE_SHARED_CLIENT.get(); | ||
| } | ||
| return newThriftClient(); | ||
| } | ||
|
|
||
| private TCLIService.Client newThriftClient() { | ||
| DatabricksHttpTTransport transport = | ||
| new DatabricksHttpTTransport( | ||
| DatabricksHttpClientFactory.getInstance().getClient(connectionContext), | ||
| endPointUrl, | ||
| endpointUrl, | ||
| databricksConfig, | ||
| connectionContext); | ||
| TBinaryProtocol protocol = new TBinaryProtocol(transport); | ||
|
|
||
| return new TCLIService.Client(protocol); | ||
| return new TCLIService.Client(new TBinaryProtocol(transport)); | ||
| } | ||
|
|
||
| /** | ||
|
|
||
If I am understanding correctly, a JDBC connection will now have a single thrift client. So, when a JDBC connection is used in different threads concurrently, they will all use the same thrift client. If we don't put a sync mechanism in place, different threads using the same thrift client will fail with:
out of sequence response
This section of the code, with all good intent, tries to put a sync mechanism in place. But I am afraid this might not be sufficient.
So the thrift calls happen in this order sequentially for a thread:
The current sync mechanism only protects transport.write and transport.read. And things could fail in exciting ways because the whole chain is not isolated/protected.
Happy to chat if this does not make sense at all.
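To make the concern about isolating the whole chain a bit more concrete, here is a minimal, self-contained sketch. `FakeTransport`, `RoundTripClient`, and `RoundTripDemo` are hypothetical names invented for illustration; they are not classes from this driver or from Thrift. The fake transport simply hands responses back in the order requests were written, which is an assumption standing in for a shared connection. Guarding `write` and `read` individually still lets another thread slip in between them, whereas holding one lock across the full round trip keeps each request paired with its own response.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical shared transport: responses come back in the order requests were written. */
final class FakeTransport {
  private final Queue<String> pending = new ArrayDeque<>();

  // Each operation is thread safe on its own, like a transport that guards only write and read.
  synchronized void write(String request) {
    pending.add("resp:" + request);
  }

  synchronized String read() {
    return pending.poll();
  }
}

/** Hypothetical client that shares one transport across all callers. */
final class RoundTripClient {
  private final FakeTransport transport = new FakeTransport();
  private final ReentrantLock callLock = new ReentrantLock();

  // Not isolated: another thread can write or read between these two steps,
  // so this caller may receive a response that belongs to a different request.
  String callUnsafe(String request) {
    transport.write(request);
    return transport.read();
  }

  // Isolated: one lock is held across the whole write-then-read round trip.
  String callIsolated(String request) {
    callLock.lock();
    try {
      transport.write(request);
      return transport.read();
    } finally {
      callLock.unlock();
    }
  }
}

final class RoundTripDemo {
  /** Runs isolated calls from several threads and counts responses that do not match their request. */
  static int countMismatches(int threads, int callsPerThread) {
    RoundTripClient client = new RoundTripClient();
    AtomicInteger mismatches = new AtomicInteger();
    Thread[] workers = new Thread[threads];
    for (int t = 0; t < threads; t++) {
      final int id = t;
      workers[t] = new Thread(() -> {
        for (int i = 0; i < callsPerThread; i++) {
          String request = id + "-" + i;
          String response = client.callIsolated(request);
          if (!("resp:" + request).equals(response)) {
            mismatches.incrementAndGet();
          }
        }
      });
      workers[t].start();
    }
    try {
      for (Thread worker : workers) {
        worker.join();
      }
    } catch (InterruptedException e) {
      throw new IllegalStateException(e);
    }
    return mismatches.get();
  }
}
```

Swapping `callIsolated` for `callUnsafe` in the loop may or may not produce mismatches on a given run, which is the usual problem with this class of bug: a clean run does not prove the chain is protected.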
Ideally, JDBC is a sync protocol and client apps are not advised to use the same connection in different threads, but there is no strict enforcement. Also, I wrote a basic test on top of these changes that uses a JDBC connection across many threads and executes statements. I didn't encounter any errors (but I believe, as with other threading issues, it was only by chance that the threads didn't step on each other).
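For reference, a multi-threaded test of this kind can be made somewhat more likely to surface interleavings by releasing all workers at the same instant. The sketch below is an assumption about how such a harness could look, not the test that was actually run; `SharedConnectionStress` and its `run` method are hypothetical names. The `task` would, in a real test, execute a statement on the shared `java.sql.Connection` and return whether the result was the expected one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical harness: runs one task concurrently from many threads and counts failures. */
final class SharedConnectionStress {
  /**
   * Runs {@code task} {@code iterations} times on each of {@code threads} threads.
   * An iteration counts as failed if the task returns false or throws.
   */
  static int run(int threads, int iterations, Callable<Boolean> task) {
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    CountDownLatch startGate = new CountDownLatch(1);
    Callable<Integer> worker = () -> {
      startGate.await(); // block until every worker has been submitted
      int failures = 0;
      for (int i = 0; i < iterations; i++) {
        try {
          if (!task.call()) {
            failures++;
          }
        } catch (Exception e) {
          failures++;
        }
      }
      return failures;
    };
    List<Future<Integer>> results = new ArrayList<>();
    try {
      for (int t = 0; t < threads; t++) {
        results.add(pool.submit(worker));
      }
      startGate.countDown(); // release all workers together to maximize overlap
      int total = 0;
      for (Future<Integer> result : results) {
        total += result.get();
      }
      return total;
    } catch (InterruptedException | ExecutionException e) {
      throw new IllegalStateException(e);
    } finally {
      pool.shutdownNow();
    }
  }
}
```

A zero failure count from a harness like this is still only weak evidence, since the scheduler may simply not have produced the problematic interleaving.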
For this change, my goal was specifically to address the getFunctions issue raised in this repo. I may not be fully following the broader concern you mentioned — could you please share a concrete test case where the current fix fails? That would really help me understand it better.