Skip to content

Commit e9045a5

Browse files
committed
Add chunk latency
1 parent 9112ded commit e9045a5

7 files changed

Lines changed: 196 additions & 19 deletions

File tree

src/main/java/com/databricks/jdbc/api/impl/DatabricksConnection.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.databricks.jdbc.log.JdbcLoggerFactory;
2424
import com.databricks.jdbc.model.telemetry.enums.DatabricksDriverErrorCode;
2525
import com.databricks.jdbc.telemetry.TelemetryClientFactory;
26+
import com.databricks.jdbc.telemetry.latency.ChunkLatencyHandler;
2627
import com.google.common.annotations.VisibleForTesting;
2728
import java.sql.*;
2829
import java.util.*;
@@ -150,6 +151,7 @@ public void close() throws DatabricksSQLException {
150151
LOGGER.debug("public void close()");
151152
for (IDatabricksStatementInternal statement : statementSet) {
152153
statement.close(false);
154+
ChunkLatencyHandler.getInstance().clearStatement(statement.getStatementId());
153155
statementSet.remove(statement);
154156
}
155157
this.session.close();

src/main/java/com/databricks/jdbc/api/impl/arrow/ChunkDownloadTask.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,14 @@
11
package com.databricks.jdbc.api.impl.arrow;
22

3-
import static com.databricks.jdbc.telemetry.TelemetryHelper.exportLatencyLog;
4-
53
import com.databricks.jdbc.api.internal.IDatabricksConnectionContext;
64
import com.databricks.jdbc.common.util.DatabricksThreadContextHolder;
75
import com.databricks.jdbc.dbclient.IDatabricksHttpClient;
8-
import com.databricks.jdbc.exception.DatabricksParsingException;
96
import com.databricks.jdbc.exception.DatabricksSQLException;
107
import com.databricks.jdbc.log.JdbcLogger;
118
import com.databricks.jdbc.log.JdbcLoggerFactory;
129
import com.databricks.jdbc.model.core.ExternalLink;
1310
import com.databricks.jdbc.model.telemetry.enums.DatabricksDriverErrorCode;
11+
import com.databricks.jdbc.telemetry.latency.ChunkLatencyHandler;
1412
import java.io.IOException;
1513
import java.util.Arrays;
1614
import java.util.concurrent.ExecutionException;
@@ -67,7 +65,13 @@ public Void call() throws DatabricksSQLException, ExecutionException, Interrupte
6765

6866
chunk.downloadData(httpClient, chunkDownloader.getCompressionCodec());
6967
downloadSuccessful = true;
70-
} catch (DatabricksParsingException | IOException e) {
68+
69+
// Record chunk download latency on successful download
70+
long downloadLatency = System.currentTimeMillis() - startTime;
71+
ChunkLatencyHandler.getInstance()
72+
.recordChunkDownloadLatency(statementId, chunk.getChunkIndex(), downloadLatency);
73+
74+
} catch (IOException | DatabricksSQLException e) {
7175
retries++;
7276
if (retries >= MAX_RETRIES) {
7377
LOGGER.error(
@@ -110,7 +114,6 @@ public Void call() throws DatabricksSQLException, ExecutionException, Interrupte
110114
chunk.setStatus(ArrowResultChunk.ChunkStatus.DOWNLOAD_FAILED);
111115
}
112116

113-
exportLatencyLog(System.currentTimeMillis() - startTime);
114117
chunkDownloader.downloadProcessed(chunk.getChunkIndex());
115118
DatabricksThreadContextHolder.clearAllContext();
116119
}

src/main/java/com/databricks/jdbc/api/impl/arrow/ChunkLinkDownloadService.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import com.databricks.jdbc.log.JdbcLogger;
1111
import com.databricks.jdbc.log.JdbcLoggerFactory;
1212
import com.databricks.jdbc.model.core.ExternalLink;
13+
import com.databricks.jdbc.telemetry.latency.ChunkLatencyHandler;
1314
import java.time.Instant;
1415
import java.util.Collection;
1516
import java.util.Map;
@@ -103,6 +104,7 @@ public ChunkLinkDownloadService(
103104
this.isDownloadInProgress = new AtomicBoolean(false);
104105
this.isDownloadChainStarted = new AtomicBoolean(false);
105106
this.isShutdown = false;
107+
ChunkLatencyHandler.getInstance().initializeStatement(statementId, totalChunks);
106108

107109
this.chunkIndexToLinkFuture = new ConcurrentHashMap<>();
108110
for (long i = 0; i < totalChunks; i++) {

src/main/java/com/databricks/jdbc/model/telemetry/DriverConnectionParameters.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -121,9 +121,6 @@ public class DriverConnectionParameters {
121121
@JsonProperty("enable_sea_hybrid_results")
122122
boolean enableSeaHybridResults;
123123

124-
@JsonProperty("enable_complex_datatype_support")
125-
boolean enableComplexSupport;
126-
127124
@JsonProperty("allow_self_signed_support")
128125
boolean allowSelfSignedSupport;
129126

@@ -331,11 +328,6 @@ public DriverConnectionParameters setEnableSeaHybridResults(boolean enableSeaHyb
331328
return this;
332329
}
333330

334-
public DriverConnectionParameters setEnableComplexSupport(boolean enableComplexSupport) {
335-
this.enableComplexSupport = enableComplexSupport;
336-
return this;
337-
}
338-
339331
public DriverConnectionParameters setAllowSelfSignedSupport(boolean allowSelfSignedSupport) {
340332
this.allowSelfSignedSupport = allowSelfSignedSupport;
341333
return this;
@@ -397,7 +389,6 @@ public String toString() {
397389
.add("nonProxyHosts", nonProxyHosts)
398390
.add("httpConnectionPoolSize", httpConnectionPoolSize)
399391
.add("enableSeaHybridResults", enableSeaHybridResults)
400-
.add("enableComplexSupport", enableComplexSupport)
401392
.add("allowSelfSignedSupport", allowSelfSignedSupport)
402393
.add("useSystemTrustStore", useSystemTrustStore)
403394
.add("rowsFetchedPerBlock", rowsFetchedPerBlock)

src/main/java/com/databricks/jdbc/model/telemetry/latency/ChunkDetails.java

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,20 @@ public class ChunkDetails {
1212
private Long slowestChunkLatencyMillis;
1313

1414
@JsonProperty("total_chunks_present")
15-
private Integer totalChunksPresent;
15+
private Long totalChunksPresent;
1616

1717
@JsonProperty("total_chunks_iterated")
18-
private Integer totalChunksIterated;
18+
private Long totalChunksIterated;
1919

2020
@JsonProperty("sum_chunks_download_time_millis")
2121
private Long sumChunksDownloadTimeMillis;
2222

23+
public ChunkDetails(long totalChunks) {
24+
this.totalChunksIterated = 0L;
25+
this.sumChunksDownloadTimeMillis = 0L;
26+
this.totalChunksPresent = totalChunks;
27+
}
28+
2329
public ChunkDetails setInitialChunkLatencyMillis(Long initialChunkLatencyMillis) {
2430
this.initialChunkLatencyMillis = initialChunkLatencyMillis;
2531
return this;
@@ -30,12 +36,12 @@ public ChunkDetails setSlowestChunkLatencyMillis(Long slowestChunkLatencyMillis)
3036
return this;
3137
}
3238

33-
public ChunkDetails setTotalChunksPresent(Integer totalChunksPresent) {
39+
public ChunkDetails setTotalChunksPresent(Long totalChunksPresent) {
3440
this.totalChunksPresent = totalChunksPresent;
3541
return this;
3642
}
3743

38-
public ChunkDetails setTotalChunksIterated(Integer totalChunksIterated) {
44+
public ChunkDetails setTotalChunksIterated(Long totalChunksIterated) {
3945
this.totalChunksIterated = totalChunksIterated;
4046
return this;
4147
}
@@ -45,6 +51,22 @@ public ChunkDetails setSumChunksDownloadTimeMillis(Long sumChunksDownloadTimeMil
4551
return this;
4652
}
4753

54+
public Long getSlowestChunkLatencyMillis() {
55+
return slowestChunkLatencyMillis;
56+
}
57+
58+
public Long getTotalChunksPresent() {
59+
return totalChunksPresent;
60+
}
61+
62+
public Long getTotalChunksIterated() {
63+
return totalChunksIterated;
64+
}
65+
66+
public Long getSumChunksDownloadTimeMillis() {
67+
return sumChunksDownloadTimeMillis;
68+
}
69+
4870
@Override
4971
public String toString() {
5072
return new ToStringer(ChunkDetails.class)

src/main/java/com/databricks/jdbc/telemetry/TelemetryHelper.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,6 @@ private static DriverConnectionParameters buildDriverConnectionParameters(
231231
.setNonProxyHosts(StringUtil.split(connectionContext.getNonProxyHosts()))
232232
.setHttpConnectionPoolSize(connectionContext.getHttpConnectionPoolSize())
233233
.setEnableSeaHybridResults(connectionContext.isSqlExecHybridResultsEnabled())
234-
.setEnableComplexSupport(connectionContext.isComplexDatatypeSupportEnabled())
235234
.setAllowSelfSignedSupport(connectionContext.allowSelfSignedCerts())
236235
.setUseSystemTrustStore(connectionContext.useSystemTrustStore())
237236
.setRowsFetchedPerBlock(connectionContext.getRowsFetchedPerBlock())
Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
package com.databricks.jdbc.telemetry.latency;
2+
3+
import com.databricks.jdbc.dbclient.impl.common.StatementId;
4+
import com.databricks.jdbc.log.JdbcLogger;
5+
import com.databricks.jdbc.log.JdbcLoggerFactory;
6+
import com.databricks.jdbc.model.telemetry.latency.ChunkDetails;
7+
import java.util.concurrent.ConcurrentHashMap;
8+
9+
/**
10+
* Handler for tracking chunk-related latency metrics for Databricks JDBC driver. This class manages
11+
* per-statement chunk details and provides logic for data collection.
12+
*/
13+
public class ChunkLatencyHandler {
14+
15+
private static final JdbcLogger LOGGER = JdbcLoggerFactory.getLogger(ChunkLatencyHandler.class);
16+
17+
// Singleton instance for global access
18+
private static final ChunkLatencyHandler INSTANCE = new ChunkLatencyHandler();
19+
20+
// Per-statement latency tracking using ChunkDetails
21+
private final ConcurrentHashMap<String, ChunkDetails> statementTrackers =
22+
new ConcurrentHashMap<>();
23+
24+
private ChunkLatencyHandler() {
25+
// Private constructor for singleton
26+
}
27+
28+
public static ChunkLatencyHandler getInstance() {
29+
return INSTANCE;
30+
}
31+
32+
/**
33+
* Initialize tracking for a statement with the total number of chunks.
34+
*
35+
* @param statementId the statement ID
36+
* @param totalChunks the total number of chunks in the result set
37+
*/
38+
public void initializeStatement(StatementId statementId, long totalChunks) {
39+
if (statementId == null) {
40+
LOGGER.trace("Statement ID is null, skipping initialization");
41+
return;
42+
}
43+
statementTrackers.put(statementId.toString(), new ChunkDetails(totalChunks));
44+
LOGGER.trace(
45+
"Initialized chunk tracking for statement {} with {} total chunks",
46+
statementId.toString(),
47+
totalChunks);
48+
}
49+
50+
/**
51+
* Records the latency for downloading a chunk and updates metrics.
52+
*
53+
* @param statementId the statement ID
54+
* @param chunkIndex the index of the chunk being downloaded
55+
* @param latencyMillis the time taken to download the chunk in milliseconds
56+
*/
57+
public void recordChunkDownloadLatency(String statementId, long chunkIndex, long latencyMillis) {
58+
if (statementId == null) {
59+
LOGGER.trace("Statement ID is null, skipping chunk latency recording");
60+
return;
61+
}
62+
63+
ChunkDetails chunkDetails =
64+
statementTrackers.computeIfAbsent(statementId, k -> new ChunkDetails(0));
65+
66+
// Record initial chunk latency (first chunk downloaded)
67+
if (chunkIndex == 0) {
68+
chunkDetails.setInitialChunkLatencyMillis(latencyMillis);
69+
}
70+
71+
// Update slowest chunk latency
72+
Long currentSlowest = chunkDetails.getSlowestChunkLatencyMillis();
73+
if (currentSlowest == null || latencyMillis > currentSlowest) {
74+
chunkDetails.setSlowestChunkLatencyMillis(latencyMillis);
75+
}
76+
77+
// Add to sum of all chunk download times
78+
Long currentSum = chunkDetails.getSumChunksDownloadTimeMillis();
79+
if (currentSum == null) {
80+
currentSum = 0L;
81+
}
82+
chunkDetails.setSumChunksDownloadTimeMillis(currentSum + latencyMillis);
83+
84+
LOGGER.trace(
85+
"Recorded chunk {} latency: {}ms for statement {}", chunkIndex, latencyMillis, statementId);
86+
}
87+
88+
/**
89+
* Records when a chunk is iterated/consumed by the result set.
90+
*
91+
* @param statementId the statement ID
92+
* @param chunkIndex the index of the chunk being iterated
93+
*/
94+
public void recordChunkIteration(String statementId, long chunkIndex) {
95+
if (statementId == null) {
96+
return;
97+
}
98+
99+
ChunkDetails chunkDetails = statementTrackers.get(statementId);
100+
if (chunkDetails != null) {
101+
Long currentIterated = chunkDetails.getTotalChunksIterated();
102+
if (currentIterated == null) {
103+
currentIterated = 0L;
104+
}
105+
chunkDetails.setTotalChunksIterated(currentIterated + 1);
106+
LOGGER.trace("Recorded chunk {} iteration for statement {}", chunkIndex, statementId);
107+
}
108+
}
109+
110+
/**
111+
* Gets the collected chunk details for a statement without removing the tracker.
112+
*
113+
* @param statementId the statement ID
114+
* @return the ChunkDetails object or null if no tracker found
115+
*/
116+
public ChunkDetails getChunkDetails(String statementId) {
117+
if (statementId == null) {
118+
return null;
119+
}
120+
121+
return statementTrackers.get(statementId);
122+
}
123+
124+
/**
125+
* Gets the collected chunk details and removes the tracker from memory (cleanup).
126+
*
127+
* @param statementId the statement ID
128+
* @return the ChunkDetails object or null if no tracker found
129+
*/
130+
public ChunkDetails getChunkDetailsAndCleanup(String statementId) {
131+
if (statementId == null) {
132+
return null;
133+
}
134+
135+
ChunkDetails chunkDetails = statementTrackers.remove(statementId);
136+
if (chunkDetails == null) {
137+
LOGGER.trace("No chunk latency telemetry found for statement {}", statementId);
138+
return null;
139+
}
140+
return chunkDetails;
141+
}
142+
143+
/**
144+
* Clears tracking data for a statement (useful for cleanup).
145+
*
146+
* @param statementId the statement ID to clear tracking for
147+
*/
148+
public void clearStatement(StatementId statementId) {
149+
if (statementId == null) {
150+
LOGGER.trace("Statement ID is null, skipping cleanup");
151+
return;
152+
}
153+
154+
String statementIdStr = statementId.toString();
155+
statementTrackers.remove(statementIdStr);
156+
LOGGER.trace("Cleared tracking for statement {}", statementIdStr);
157+
}
158+
}

0 commit comments

Comments
 (0)