Skip to content

Commit ba4c0dc

Browse files
authored
[PECOBLR-591] Add chunk latency telemetry (#877)
* Add chunk latency * Add chunk latency logs * Add tests
1 parent 9bf1d27 commit ba4c0dc

13 files changed

Lines changed: 709 additions & 33 deletions

File tree

src/main/java/com/databricks/jdbc/api/impl/DatabricksConnection.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.databricks.jdbc.log.JdbcLoggerFactory;
2424
import com.databricks.jdbc.model.telemetry.enums.DatabricksDriverErrorCode;
2525
import com.databricks.jdbc.telemetry.TelemetryClientFactory;
26+
import com.databricks.jdbc.telemetry.latency.ChunkLatencyHandler;
2627
import com.google.common.annotations.VisibleForTesting;
2728
import java.sql.*;
2829
import java.util.*;
@@ -150,6 +151,7 @@ public void close() throws DatabricksSQLException {
150151
LOGGER.debug("public void close()");
151152
for (IDatabricksStatementInternal statement : statementSet) {
152153
statement.close(false);
154+
ChunkLatencyHandler.getInstance().clearStatement(statement.getStatementId());
153155
statementSet.remove(statement);
154156
}
155157
this.session.close();

src/main/java/com/databricks/jdbc/api/impl/arrow/ChunkDownloadTask.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,14 @@
11
package com.databricks.jdbc.api.impl.arrow;
22

3-
import static com.databricks.jdbc.telemetry.TelemetryHelper.exportLatencyLog;
4-
53
import com.databricks.jdbc.api.internal.IDatabricksConnectionContext;
64
import com.databricks.jdbc.common.util.DatabricksThreadContextHolder;
75
import com.databricks.jdbc.dbclient.IDatabricksHttpClient;
8-
import com.databricks.jdbc.exception.DatabricksParsingException;
96
import com.databricks.jdbc.exception.DatabricksSQLException;
107
import com.databricks.jdbc.log.JdbcLogger;
118
import com.databricks.jdbc.log.JdbcLoggerFactory;
129
import com.databricks.jdbc.model.core.ExternalLink;
1310
import com.databricks.jdbc.model.telemetry.enums.DatabricksDriverErrorCode;
11+
import com.databricks.jdbc.telemetry.latency.ChunkLatencyHandler;
1412
import java.io.IOException;
1513
import java.util.Arrays;
1614
import java.util.concurrent.ExecutionException;
@@ -49,7 +47,6 @@ public Void call() throws DatabricksSQLException, ExecutionException, Interrupte
4947
boolean downloadSuccessful = false;
5048

5149
// Sets context in the newly spawned thread
52-
DatabricksThreadContextHolder.setChunkId(chunk.getChunkIndex());
5350
DatabricksThreadContextHolder.setConnectionContext(this.connectionContext);
5451
DatabricksThreadContextHolder.setStatementId(this.statementId);
5552

@@ -67,7 +64,13 @@ public Void call() throws DatabricksSQLException, ExecutionException, Interrupte
6764

6865
chunk.downloadData(httpClient, chunkDownloader.getCompressionCodec());
6966
downloadSuccessful = true;
70-
} catch (DatabricksParsingException | IOException e) {
67+
68+
// Record chunk download latency on successful download
69+
long downloadLatency = System.currentTimeMillis() - startTime;
70+
ChunkLatencyHandler.getInstance()
71+
.recordChunkDownloadLatency(statementId, chunk.getChunkIndex(), downloadLatency);
72+
73+
} catch (IOException | DatabricksSQLException e) {
7174
retries++;
7275
if (retries >= MAX_RETRIES) {
7376
LOGGER.error(
@@ -80,7 +83,9 @@ public Void call() throws DatabricksSQLException, ExecutionException, Interrupte
8083
throw new DatabricksSQLException(
8184
"Failed to download chunk after multiple attempts",
8285
e,
83-
DatabricksDriverErrorCode.CHUNK_DOWNLOAD_ERROR);
86+
statementId,
87+
chunk.getChunkIndex(),
88+
DatabricksDriverErrorCode.CHUNK_DOWNLOAD_ERROR.name());
8489
} else {
8590
LOGGER.warn(
8691
String.format(
@@ -110,7 +115,6 @@ public Void call() throws DatabricksSQLException, ExecutionException, Interrupte
110115
chunk.setStatus(ArrowResultChunk.ChunkStatus.DOWNLOAD_FAILED);
111116
}
112117

113-
exportLatencyLog(System.currentTimeMillis() - startTime);
114118
chunkDownloader.downloadProcessed(chunk.getChunkIndex());
115119
DatabricksThreadContextHolder.clearAllContext();
116120
}

src/main/java/com/databricks/jdbc/api/impl/arrow/ChunkLinkDownloadService.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import com.databricks.jdbc.log.JdbcLogger;
1111
import com.databricks.jdbc.log.JdbcLoggerFactory;
1212
import com.databricks.jdbc.model.core.ExternalLink;
13+
import com.databricks.jdbc.telemetry.latency.ChunkLatencyHandler;
1314
import java.time.Instant;
1415
import java.util.Collection;
1516
import java.util.Map;
@@ -103,6 +104,7 @@ public ChunkLinkDownloadService(
103104
this.isDownloadInProgress = new AtomicBoolean(false);
104105
this.isDownloadChainStarted = new AtomicBoolean(false);
105106
this.isShutdown = false;
107+
ChunkLatencyHandler.getInstance().initializeStatement(statementId, totalChunks);
106108

107109
this.chunkIndexToLinkFuture = new ConcurrentHashMap<>();
108110
for (long i = 0; i < totalChunks; i++) {

src/main/java/com/databricks/jdbc/common/util/DatabricksThreadContextHolder.java

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ public class DatabricksThreadContextHolder {
99
private static final ThreadLocal<IDatabricksConnectionContext> localConnectionContext =
1010
new ThreadLocal<>();
1111
private static final ThreadLocal<String> localStatementId = new ThreadLocal<>();
12-
private static final ThreadLocal<Long> localChunkId = new ThreadLocal<>();
1312
private static final ThreadLocal<Integer> localRetryCount = new ThreadLocal<>();
1413
private static final ThreadLocal<StatementType> localStatementType = new ThreadLocal<>();
1514
private static final ThreadLocal<String> localSessionId = new ThreadLocal<>();
@@ -61,21 +60,12 @@ public static StatementType getStatementType() {
6160
return localStatementType.get();
6261
}
6362

64-
public static void setChunkId(Long chunkId) {
65-
localChunkId.set(chunkId);
66-
}
67-
68-
public static Long getChunkId() {
69-
return localChunkId.get();
70-
}
71-
7263
public static void clearConnectionContext() {
7364
localConnectionContext.remove();
7465
}
7566

7667
public static void clearStatementInfo() {
7768
localStatementId.remove();
78-
localChunkId.remove();
7969
localStatementType.remove();
8070
localRetryCount.remove();
8171
}

src/main/java/com/databricks/jdbc/exception/DatabricksSQLException.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,18 @@ public DatabricksSQLException(String reason, Throwable cause, String sqlState) {
2525
reason);
2626
}
2727

28+
// This constructor is used to export chunk download failure logs
29+
public DatabricksSQLException(
30+
String reason, Throwable cause, String statementId, Long chunkIndex, String sqlState) {
31+
super(reason, sqlState, cause);
32+
exportFailureLog(
33+
DatabricksThreadContextHolder.getConnectionContext(),
34+
DatabricksDriverErrorCode.CONNECTION_ERROR.name(),
35+
reason,
36+
chunkIndex,
37+
statementId);
38+
}
39+
2840
public DatabricksSQLException(String reason, String sqlState) {
2941
super(reason, sqlState);
3042
exportFailureLog(DatabricksThreadContextHolder.getConnectionContext(), sqlState, reason);

src/main/java/com/databricks/jdbc/model/telemetry/DriverConnectionParameters.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -121,9 +121,6 @@ public class DriverConnectionParameters {
121121
@JsonProperty("enable_sea_hybrid_results")
122122
boolean enableSeaHybridResults;
123123

124-
@JsonProperty("enable_complex_datatype_support")
125-
boolean enableComplexSupport;
126-
127124
@JsonProperty("allow_self_signed_support")
128125
boolean allowSelfSignedSupport;
129126

@@ -331,11 +328,6 @@ public DriverConnectionParameters setEnableSeaHybridResults(boolean enableSeaHyb
331328
return this;
332329
}
333330

334-
public DriverConnectionParameters setEnableComplexSupport(boolean enableComplexSupport) {
335-
this.enableComplexSupport = enableComplexSupport;
336-
return this;
337-
}
338-
339331
public DriverConnectionParameters setAllowSelfSignedSupport(boolean allowSelfSignedSupport) {
340332
this.allowSelfSignedSupport = allowSelfSignedSupport;
341333
return this;
@@ -397,7 +389,6 @@ public String toString() {
397389
.add("nonProxyHosts", nonProxyHosts)
398390
.add("httpConnectionPoolSize", httpConnectionPoolSize)
399391
.add("enableSeaHybridResults", enableSeaHybridResults)
400-
.add("enableComplexSupport", enableComplexSupport)
401392
.add("allowSelfSignedSupport", allowSelfSignedSupport)
402393
.add("useSystemTrustStore", useSystemTrustStore)
403394
.add("rowsFetchedPerBlock", rowsFetchedPerBlock)

src/main/java/com/databricks/jdbc/model/telemetry/latency/ChunkDetails.java

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,20 @@ public class ChunkDetails {
1212
private Long slowestChunkLatencyMillis;
1313

1414
@JsonProperty("total_chunks_present")
15-
private Integer totalChunksPresent;
15+
private Long totalChunksPresent;
1616

1717
@JsonProperty("total_chunks_iterated")
18-
private Integer totalChunksIterated;
18+
private Long totalChunksIterated;
1919

2020
@JsonProperty("sum_chunks_download_time_millis")
2121
private Long sumChunksDownloadTimeMillis;
2222

23+
public ChunkDetails(long totalChunks) {
24+
this.totalChunksIterated = 0L;
25+
this.sumChunksDownloadTimeMillis = 0L;
26+
this.totalChunksPresent = totalChunks;
27+
}
28+
2329
public ChunkDetails setInitialChunkLatencyMillis(Long initialChunkLatencyMillis) {
2430
this.initialChunkLatencyMillis = initialChunkLatencyMillis;
2531
return this;
@@ -30,12 +36,12 @@ public ChunkDetails setSlowestChunkLatencyMillis(Long slowestChunkLatencyMillis)
3036
return this;
3137
}
3238

33-
public ChunkDetails setTotalChunksPresent(Integer totalChunksPresent) {
39+
public ChunkDetails setTotalChunksPresent(Long totalChunksPresent) {
3440
this.totalChunksPresent = totalChunksPresent;
3541
return this;
3642
}
3743

38-
public ChunkDetails setTotalChunksIterated(Integer totalChunksIterated) {
44+
public ChunkDetails setTotalChunksIterated(Long totalChunksIterated) {
3945
this.totalChunksIterated = totalChunksIterated;
4046
return this;
4147
}
@@ -45,6 +51,26 @@ public ChunkDetails setSumChunksDownloadTimeMillis(Long sumChunksDownloadTimeMil
4551
return this;
4652
}
4753

54+
public Long getInitialChunkLatencyMillis() {
55+
return initialChunkLatencyMillis;
56+
}
57+
58+
public Long getSlowestChunkLatencyMillis() {
59+
return slowestChunkLatencyMillis;
60+
}
61+
62+
public Long getTotalChunksPresent() {
63+
return totalChunksPresent;
64+
}
65+
66+
public Long getTotalChunksIterated() {
67+
return totalChunksIterated;
68+
}
69+
70+
public Long getSumChunksDownloadTimeMillis() {
71+
return sumChunksDownloadTimeMillis;
72+
}
73+
4874
@Override
4975
public String toString() {
5076
return new ToStringer(ChunkDetails.class)

src/main/java/com/databricks/jdbc/telemetry/ITelemetryClient.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,6 @@ public interface ITelemetryClient {
66
void exportEvent(TelemetryFrontendLog event);
77

88
void close();
9+
10+
void closeStatement(String statementId);
911
}

src/main/java/com/databricks/jdbc/telemetry/NoopTelemetryClient.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,4 +25,9 @@ public void exportEvent(TelemetryFrontendLog event) {
2525
public void close() {
2626
// do nothing
2727
}
28+
29+
@Override
30+
public void closeStatement(String statementId) {
31+
// do nothing
32+
}
2833
}

src/main/java/com/databricks/jdbc/telemetry/TelemetryClient.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
import com.databricks.jdbc.api.internal.IDatabricksConnectionContext;
44
import com.databricks.jdbc.model.telemetry.TelemetryFrontendLog;
5+
import com.databricks.jdbc.model.telemetry.latency.ChunkDetails;
6+
import com.databricks.jdbc.telemetry.latency.ChunkLatencyHandler;
57
import com.databricks.sdk.core.DatabricksConfig;
68
import java.util.LinkedList;
79
import java.util.List;
@@ -51,6 +53,23 @@ public void exportEvent(TelemetryFrontendLog event) {
5153

5254
@Override
5355
public void close() {
56+
// Export any pending chunk latency telemetry before flushing
57+
ChunkLatencyHandler.getInstance()
58+
.getAllPendingChunkDetails()
59+
.forEach(
60+
(statementId, chunkDetails) -> {
61+
TelemetryHelper.exportChunkLatencyTelemetry(chunkDetails, statementId);
62+
});
63+
flush();
64+
}
65+
66+
@Override
67+
public void closeStatement(String statementId) {
68+
ChunkDetails chunkDetails =
69+
ChunkLatencyHandler.getInstance().getChunkDetailsAndCleanup(statementId);
70+
if (chunkDetails != null) {
71+
TelemetryHelper.exportChunkLatencyTelemetry(chunkDetails, statementId);
72+
}
5473
flush();
5574
}
5675

0 commit comments

Comments
 (0)