Skip to content

Commit 59e524b

Browse files
authored
Add statement ID and session ID wherever possible (#844)
* Add statement ID and session ID wherever possible * Add null check tests
1 parent 3957211 commit 59e524b

8 files changed

Lines changed: 81 additions & 20 deletions

File tree

src/main/java/com/databricks/jdbc/api/impl/arrow/ChunkDownloadTask.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
import com.databricks.jdbc.api.internal.IDatabricksConnectionContext;
66
import com.databricks.jdbc.common.util.DatabricksThreadContextHolder;
77
import com.databricks.jdbc.dbclient.IDatabricksHttpClient;
8-
import com.databricks.jdbc.dbclient.impl.common.StatementId;
98
import com.databricks.jdbc.exception.DatabricksParsingException;
109
import com.databricks.jdbc.exception.DatabricksSQLException;
1110
import com.databricks.jdbc.log.JdbcLogger;
@@ -26,7 +25,7 @@ class ChunkDownloadTask implements DatabricksCallableTask {
2625
private final IDatabricksHttpClient httpClient;
2726
private final ChunkDownloadCallback chunkDownloader;
2827
private final IDatabricksConnectionContext connectionContext;
29-
private final StatementId statementId;
28+
private final String statementId;
3029
private final ChunkLinkDownloadService linkDownloadService;
3130
Throwable uncaughtException = null;
3231

src/main/java/com/databricks/jdbc/common/util/DatabricksThreadContextHolder.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,15 @@
44
import com.databricks.jdbc.common.StatementType;
55
import com.databricks.jdbc.dbclient.impl.common.StatementId;
66

7+
/* TODO : eliminate the use of thread local completely. Currently, we are limiting the usage of this for non-critical flows such as telemetry.*/
78
public class DatabricksThreadContextHolder {
89
private static final ThreadLocal<IDatabricksConnectionContext> localConnectionContext =
910
new ThreadLocal<>();
10-
private static final ThreadLocal<StatementId> localStatementId = new ThreadLocal<>();
11+
private static final ThreadLocal<String> localStatementId = new ThreadLocal<>();
1112
private static final ThreadLocal<Long> localChunkId = new ThreadLocal<>();
1213
private static final ThreadLocal<Integer> localRetryCount = new ThreadLocal<>();
1314
private static final ThreadLocal<StatementType> localStatementType = new ThreadLocal<>();
15+
private static final ThreadLocal<String> localSessionId = new ThreadLocal<>();
1416

1517
public static void setConnectionContext(IDatabricksConnectionContext context) {
1618
localConnectionContext.set(context);
@@ -21,13 +23,28 @@ public static IDatabricksConnectionContext getConnectionContext() {
2123
}
2224

2325
public static void setStatementId(StatementId statementId) {
26+
if (statementId != null) {
27+
localStatementId.set(
28+
statementId.toSQLExecStatementId()); // This is because only GUID is relevant for tracking
29+
}
30+
}
31+
32+
public static void setStatementId(String statementId) {
2433
localStatementId.set(statementId);
2534
}
2635

27-
public static StatementId getStatementId() {
36+
public static String getStatementId() {
2837
return localStatementId.get();
2938
}
3039

40+
public static void setSessionId(String sessionId) {
41+
localSessionId.set(sessionId);
42+
}
43+
44+
public static String getSessionId() {
45+
return localSessionId.get();
46+
}
47+
3148
public static void setStatementType(StatementType statementType) {
3249
localStatementType.set(statementType);
3350
}

src/main/java/com/databricks/jdbc/dbclient/impl/sqlexec/DatabricksSdkClient.java

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ public ImmutableSessionInfo createSession(
128128
LOGGER.error(errorMessage, e);
129129
throw new DatabricksSQLException(errorMessage, e, DatabricksDriverErrorCode.SDK_CLIENT_ERROR);
130130
}
131+
DatabricksThreadContextHolder.setSessionId(createSessionResponse.getSessionId());
131132
return ImmutableSessionInfo.builder()
132133
.computeResource(warehouse)
133134
.sessionId(createSessionResponse.getSessionId())
@@ -137,6 +138,7 @@ public ImmutableSessionInfo createSession(
137138
@Override
138139
public void deleteSession(ImmutableSessionInfo sessionInfo) throws DatabricksSQLException {
139140
LOGGER.debug("public void deleteSession(String sessionId = {})", sessionInfo.sessionId());
141+
DatabricksThreadContextHolder.setSessionId(sessionInfo.sessionId());
140142
DeleteSessionRequest request =
141143
new DeleteSessionRequest()
142144
.setSessionId(sessionInfo.sessionId())
@@ -164,11 +166,14 @@ public DatabricksResultSet executeStatement(
164166
IDatabricksStatementInternal parentStatement)
165167
throws SQLException {
166168
LOGGER.debug(
167-
"public DatabricksResultSet executeStatement(String sql = {}, compute resource = {}, Map<Integer, ImmutableSqlParameter> parameters = {}, StatementType statementType = {}, IDatabricksSession session)",
169+
"public DatabricksResultSet executeStatement(String sql = {}, compute resource = {}, Map<Integer, ImmutableSqlParameter> parameters = {}, StatementType statementType = {}, IDatabricksSession session = {}, parentStatement = {})",
168170
sql,
169171
computeResource.toString(),
170172
parameters,
171-
statementType);
173+
statementType,
174+
session,
175+
parentStatement);
176+
DatabricksThreadContextHolder.setSessionId(session.getSessionId());
172177
long pollCount = 0;
173178
long executionStartTime = Instant.now().toEpochMilli();
174179
DatabricksThreadContextHolder.setStatementType(statementType);
@@ -207,6 +212,7 @@ public DatabricksResultSet executeStatement(
207212
computeResource,
208213
statementId);
209214
StatementId typedStatementId = new StatementId(statementId);
215+
DatabricksThreadContextHolder.setStatementId(typedStatementId);
210216
if (parentStatement != null) {
211217
parentStatement.setStatementId(typedStatementId);
212218
}
@@ -279,9 +285,12 @@ public DatabricksResultSet executeStatementAsync(
279285
IDatabricksStatementInternal parentStatement)
280286
throws SQLException {
281287
LOGGER.debug(
282-
"public DatabricksResultSet executeStatementAsync(String sql = {}, compute resource = {}, Map<Integer, ImmutableSqlParameter> parameters, IDatabricksSession session)",
288+
"public DatabricksResultSet executeStatementAsync(String sql = {}, compute resource = {}, Map<Integer, ImmutableSqlParameter> parameters, IDatabricksSession session = {}, IDatabricksStatementInternal parentStatement = {})",
283289
sql,
284-
computeResource.toString());
290+
computeResource.toString(),
291+
session,
292+
parentStatement);
293+
DatabricksThreadContextHolder.setSessionId(session.getSessionId());
285294
ExecuteStatementRequest request =
286295
getRequest(
287296
StatementType.SQL,
@@ -307,6 +316,7 @@ public DatabricksResultSet executeStatementAsync(
307316
handleFailedExecution(response, "", sql);
308317
}
309318
StatementId typedStatementId = new StatementId(statementId);
319+
DatabricksThreadContextHolder.setStatementId(typedStatementId);
310320
if (parentStatement != null) {
311321
parentStatement.setStatementId(typedStatementId);
312322
}
@@ -328,6 +338,8 @@ public DatabricksResultSet getStatementResult(
328338
IDatabricksSession session,
329339
IDatabricksStatementInternal parentStatement)
330340
throws DatabricksSQLException {
341+
DatabricksThreadContextHolder.setStatementId(typedStatementId);
342+
DatabricksThreadContextHolder.setSessionId(session.getSessionId());
331343
String statementId = typedStatementId.toSQLExecStatementId();
332344
GetStatementRequest request = new GetStatementRequest().setStatementId(statementId);
333345
String getStatusPath = String.format(STATEMENT_PATH_WITH_ID, statementId);
@@ -354,6 +366,7 @@ public DatabricksResultSet getStatementResult(
354366
@Override
355367
public void closeStatement(StatementId typedStatementId) throws DatabricksSQLException {
356368
String statementId = typedStatementId.toSQLExecStatementId();
369+
DatabricksThreadContextHolder.setStatementId(typedStatementId);
357370
LOGGER.debug(String.format("public void closeStatement(String statementId = {})", statementId));
358371
CloseStatementRequest request = new CloseStatementRequest().setStatementId(statementId);
359372
String path = String.format(STATEMENT_PATH_WITH_ID, request.getStatementId());
@@ -371,6 +384,7 @@ public void closeStatement(StatementId typedStatementId) throws DatabricksSQLExc
371384
@Override
372385
public void cancelStatement(StatementId typedStatementId) throws DatabricksSQLException {
373386
String statementId = typedStatementId.toSQLExecStatementId();
387+
DatabricksThreadContextHolder.setStatementId(typedStatementId);
374388
LOGGER.debug("public void cancelStatement(String statementId = {})", statementId);
375389
CancelStatementRequest request = new CancelStatementRequest().setStatementId(statementId);
376390
String path = String.format(CANCEL_STATEMENT_PATH_WITH_ID, request.getStatementId());
@@ -388,6 +402,7 @@ public void cancelStatement(StatementId typedStatementId) throws DatabricksSQLEx
388402
@Override
389403
public Collection<ExternalLink> getResultChunks(StatementId typedStatementId, long chunkIndex)
390404
throws DatabricksSQLException {
405+
DatabricksThreadContextHolder.setStatementId(typedStatementId);
391406
String statementId = typedStatementId.toSQLExecStatementId();
392407
LOGGER.debug(
393408
"public Optional<ExternalLink> getResultChunk(String statementId = {}, long chunkIndex = {})",

src/main/java/com/databricks/jdbc/dbclient/impl/thrift/DatabricksThriftAccessor.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import com.databricks.jdbc.api.internal.IDatabricksStatementInternal;
1010
import com.databricks.jdbc.common.DatabricksClientConfiguratorManager;
1111
import com.databricks.jdbc.common.StatementType;
12+
import com.databricks.jdbc.common.util.DatabricksThreadContextHolder;
1213
import com.databricks.jdbc.common.util.DriverUtil;
1314
import com.databricks.jdbc.common.util.ProtocolFeatureUtil;
1415
import com.databricks.jdbc.dbclient.impl.common.StatementId;
@@ -221,6 +222,7 @@ DatabricksResultSet execute(
221222
checkResponseForErrors(response);
222223

223224
StatementId statementId = new StatementId(response.getOperationHandle().operationId);
225+
DatabricksThreadContextHolder.setStatementId(statementId);
224226
if (parentStatement != null) {
225227
parentStatement.setStatementId(statementId);
226228
}
@@ -322,6 +324,7 @@ DatabricksResultSet executeAsync(
322324
}
323325
}
324326
StatementId statementId = new StatementId(response.getOperationHandle().operationId);
327+
DatabricksThreadContextHolder.setStatementId(statementId);
325328
if (parentStatement != null) {
326329
parentStatement.setStatementId(statementId);
327330
}

src/main/java/com/databricks/jdbc/dbclient/impl/thrift/DatabricksThriftServiceClient.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ public ImmutableSessionInfo createSession(
110110
}
111111

112112
String sessionId = byteBufferToString(response.sessionHandle.getSessionId().guid);
113+
DatabricksThreadContextHolder.setSessionId(sessionId);
113114
LOGGER.debug("Session created with ID {}", sessionId);
114115
return ImmutableSessionInfo.builder()
115116
.sessionId(sessionId)
@@ -123,6 +124,7 @@ public void deleteSession(ImmutableSessionInfo sessionInfo) throws DatabricksSQL
123124
LOGGER.debug(
124125
String.format(
125126
"public void deleteSession(Session session = {%s}))", sessionInfo.toString()));
127+
DatabricksThreadContextHolder.setSessionId(sessionInfo.sessionId());
126128
TCloseSessionReq closeSessionReq =
127129
new TCloseSessionReq().setSessionHandle(sessionInfo.sessionHandle());
128130
TCloseSessionResp response =
@@ -191,7 +193,7 @@ private TExecuteStatementReq getRequest(
191193
IDatabricksStatementInternal parentStatement,
192194
boolean runAsync)
193195
throws SQLException {
194-
196+
DatabricksThreadContextHolder.setSessionId(session.getSessionId());
195197
TSparkArrowTypes arrowNativeTypes = new TSparkArrowTypes().setTimestampAsArrow(true);
196198

197199
// Convert the parameters to a list of TSparkParameter objects.
@@ -247,6 +249,7 @@ public void closeStatement(StatementId statementId) throws DatabricksSQLExceptio
247249
String.format(
248250
"public void closeStatement(String statementId = {%s}) using Thrift client",
249251
statementId));
252+
DatabricksThreadContextHolder.setStatementId(statementId);
250253
TCloseOperationReq request =
251254
new TCloseOperationReq().setOperationHandle(getOperationHandle(statementId));
252255
TCloseOperationResp resp = thriftAccessor.closeOperation(request);
@@ -259,6 +262,7 @@ public void cancelStatement(StatementId statementId) throws DatabricksSQLExcepti
259262
String.format(
260263
"public void cancelStatement(String statementId = {%s}) using Thrift client",
261264
statementId));
265+
DatabricksThreadContextHolder.setStatementId(statementId);
262266
TCancelOperationReq request =
263267
new TCancelOperationReq().setOperationHandle(getOperationHandle(statementId));
264268
TCancelOperationResp resp = thriftAccessor.cancelOperation(request);
@@ -275,6 +279,8 @@ public DatabricksResultSet getStatementResult(
275279
String.format(
276280
"public DatabricksResultSet getStatementResult(String statementId = {%s}) using Thrift client",
277281
statementId));
282+
DatabricksThreadContextHolder.setStatementId(statementId);
283+
DatabricksThreadContextHolder.setSessionId(session.getSessionId());
278284
return thriftAccessor.getStatementResult(
279285
getOperationHandle(statementId), parentStatement, session);
280286
}
@@ -287,6 +293,7 @@ public Collection<ExternalLink> getResultChunks(StatementId statementId, long ch
287293
"public Optional<ExternalLink> getResultChunk(String statementId = {%s}, long chunkIndex = {%s}) using Thrift client",
288294
statementId, chunkIndex);
289295
LOGGER.debug(context);
296+
DatabricksThreadContextHolder.setStatementId(statementId);
290297
TFetchResultsResp fetchResultsResp;
291298
List<ExternalLink> externalLinks = new ArrayList<>();
292299
AtomicInteger index = new AtomicInteger(0);
@@ -318,6 +325,7 @@ public DatabricksResultSet listCatalogs(IDatabricksSession session) throws SQLEx
318325
String context =
319326
String.format("Fetching catalogs using Thrift client. Session {%s}", session.toString());
320327
LOGGER.debug(context);
328+
DatabricksThreadContextHolder.setSessionId(session.getSessionId());
321329
TGetCatalogsReq request =
322330
new TGetCatalogsReq()
323331
.setSessionHandle(Objects.requireNonNull(session.getSessionInfo()).sessionHandle());
@@ -337,6 +345,7 @@ public DatabricksResultSet listSchemas(
337345
"Fetching schemas using Thrift client. Session {%s}, catalog {%s}, schemaNamePattern {%s}",
338346
session.toString(), catalog, schemaNamePattern);
339347
LOGGER.debug(context);
348+
DatabricksThreadContextHolder.setSessionId(session.getSessionId());
340349
TGetSchemasReq request =
341350
new TGetSchemasReq()
342351
.setSessionHandle(Objects.requireNonNull(session.getSessionInfo()).sessionHandle())
@@ -365,6 +374,7 @@ public DatabricksResultSet listTables(
365374
"Fetching tables using Thrift client. Session {%s}, catalog {%s}, schemaNamePattern {%s}, tableNamePattern {%s}",
366375
session.toString(), catalog, schemaNamePattern, tableNamePattern);
367376
LOGGER.debug(context);
377+
DatabricksThreadContextHolder.setSessionId(session.getSessionId());
368378
TGetTablesReq request =
369379
new TGetTablesReq()
370380
.setSessionHandle(Objects.requireNonNull(session.getSessionInfo()).sessionHandle())
@@ -387,6 +397,7 @@ public DatabricksResultSet listTableTypes(IDatabricksSession session) {
387397
LOGGER.debug(
388398
String.format(
389399
"Fetching table types using Thrift client. Session {%s}", session.toString()));
400+
DatabricksThreadContextHolder.setSessionId(session.getSessionId());
390401
return metadataResultSetBuilder.getTableTypesResult();
391402
}
392403

@@ -403,6 +414,7 @@ public DatabricksResultSet listColumns(
403414
"Fetching columns using Thrift client. Session {%s}, catalog {%s}, schemaNamePattern {%s}, tableNamePattern {%s}, columnNamePattern {%s}",
404415
session.toString(), catalog, schemaNamePattern, tableNamePattern, columnNamePattern);
405416
LOGGER.debug(context);
417+
DatabricksThreadContextHolder.setSessionId(session.getSessionId());
406418
TGetColumnsReq request =
407419
new TGetColumnsReq()
408420
.setSessionHandle(Objects.requireNonNull(session.getSessionInfo()).sessionHandle())
@@ -429,6 +441,7 @@ public DatabricksResultSet listFunctions(
429441
String.format(
430442
"Fetching functions using Thrift client. Session {%s}, catalog {%s}, schemaNamePattern {%s}, functionNamePattern {%s}.",
431443
session.toString(), catalog, schemaNamePattern, functionNamePattern);
444+
DatabricksThreadContextHolder.setSessionId(session.getSessionId());
432445
LOGGER.debug(context);
433446
TGetFunctionsReq request =
434447
new TGetFunctionsReq()
@@ -452,6 +465,7 @@ public DatabricksResultSet listPrimaryKeys(
452465
"Fetching primary keys using Thrift client. session {%s}, catalog {%s}, schema {%s}, table {%s}",
453466
session.toString(), catalog, schema, table);
454467
LOGGER.debug(context);
468+
DatabricksThreadContextHolder.setSessionId(session.getSessionId());
455469
TGetPrimaryKeysReq request =
456470
new TGetPrimaryKeysReq()
457471
.setSessionHandle(Objects.requireNonNull(session.getSessionInfo()).sessionHandle())
@@ -474,6 +488,7 @@ public DatabricksResultSet listImportedKeys(
474488
"Fetching imported keys using Thrift client for session {%s}, catalog {%s}, schema {%s}, table {%s}",
475489
session.toString(), catalog, schema, table);
476490
LOGGER.debug(context);
491+
DatabricksThreadContextHolder.setSessionId(session.getSessionId());
477492
// GetImportedKeys is implemented using GetCrossReferences
478493
// When only foreign table name is provided, we get imported keys
479494
TGetCrossReferenceReq request =

src/main/java/com/databricks/jdbc/telemetry/TelemetryHelper.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
import com.databricks.jdbc.common.util.DatabricksThreadContextHolder;
99
import com.databricks.jdbc.common.util.DriverUtil;
1010
import com.databricks.jdbc.common.util.StringUtil;
11-
import com.databricks.jdbc.dbclient.impl.common.StatementId;
1211
import com.databricks.jdbc.exception.DatabricksParsingException;
1312
import com.databricks.jdbc.log.JdbcLogger;
1413
import com.databricks.jdbc.log.JdbcLoggerFactory;
@@ -64,7 +63,6 @@ public static boolean isTelemetryAllowedForConnection(IDatabricksConnectionConte
6463
.isFeatureEnabled(TELEMETRY_FEATURE_FLAG_NAME);
6564
}
6665

67-
// TODO : add an export even before connection context is built
6866
public static void exportInitialTelemetryLog(IDatabricksConnectionContext connectionContext) {
6967
if (connectionContext == null) {
7068
return;
@@ -123,26 +121,27 @@ public static void exportLatencyLog(long executionTime) {
123121
DatabricksThreadContextHolder.getConnectionContext(),
124122
executionTime,
125123
executionEvent,
126-
DatabricksThreadContextHolder.getStatementId());
124+
DatabricksThreadContextHolder.getStatementId(),
125+
DatabricksThreadContextHolder.getSessionId());
127126
}
128127

129128
@VisibleForTesting
130129
static void exportLatencyLog(
131130
IDatabricksConnectionContext connectionContext,
132131
long latencyMilliseconds,
133132
SqlExecutionEvent executionEvent,
134-
StatementId statementId) {
133+
String statementId,
134+
String sessionId) {
135135
// Though we already handle null connectionContext in the downstream implementation,
136136
// we are adding this check for extra sanity
137137
if (connectionContext != null) {
138138
TelemetryEvent telemetryEvent =
139139
new TelemetryEvent()
140140
.setLatency(latencyMilliseconds)
141141
.setSqlOperation(executionEvent)
142-
.setDriverConnectionParameters(getDriverConnectionParameter(connectionContext));
143-
if (statementId != null) {
144-
telemetryEvent.setSqlStatementId(statementId.toString());
145-
}
142+
.setDriverConnectionParameters(getDriverConnectionParameter(connectionContext))
143+
.setSqlStatementId(statementId)
144+
.setSessionId(sessionId);
146145
TelemetryFrontendLog telemetryFrontendLog =
147146
new TelemetryFrontendLog()
148147
.setFrontendLogEventId(getEventUUID())

src/test/java/com/databricks/jdbc/TestConstants.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ public class TestConstants {
3232
public static final String TEST_FOREIGN_TABLE = "foreignTable";
3333
public static final String TEST_FUNCTION_PATTERN = "functionPattern";
3434
public static final String TEST_STRING = "test";
35+
public static final String TEST_STRING_2 = "test2";
3536
public static final String TEST_USER = "testUser";
3637
public static final String TEST_PASSWORD = "testPassword";
3738
public static final StatementId TEST_STATEMENT_ID = new StatementId("statement_id");

0 commit comments

Comments
 (0)