Skip to content

Commit bf0393c

Browse files
committed
Keep the latencyDetails for a later pr, reverting to keep this PR clean
1 parent afbf8f8 commit bf0393c

4 files changed

Lines changed: 188 additions & 369 deletions

File tree

src/main/java/com/databricks/jdbc/dbclient/impl/thrift/DatabricksThriftAccessor.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -754,6 +754,15 @@ private TGetOperationStatusResp getOperationStatus(
754754
long operationStatusStartTime = System.nanoTime();
755755
TGetOperationStatusResp operationStatus = getThriftClient().GetOperationStatus(statusReq);
756756
long operationStatusEndTime = System.nanoTime();
757+
long operationStatusLatencyMillis =
758+
(operationStatusEndTime - operationStatusStartTime) / 1_000_000;
759+
LOGGER.debug(
760+
"Statement [{}] Thrift operation status latency: {}ms",
761+
statementId,
762+
operationStatusLatencyMillis);
763+
764+
// TODO: Export operation status latency to telemetry
765+
757766
return operationStatus;
758767
}
759768
}
Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
package com.databricks.jdbc.telemetry.latency;
2+
3+
import com.databricks.jdbc.dbclient.impl.common.StatementId;
4+
import com.databricks.jdbc.log.JdbcLogger;
5+
import com.databricks.jdbc.log.JdbcLoggerFactory;
6+
import com.databricks.jdbc.model.telemetry.latency.ChunkDetails;
7+
import java.util.Collections;
8+
import java.util.Map;
9+
import java.util.concurrent.ConcurrentHashMap;
10+
11+
/**
12+
* Handler for tracking chunk-related latency metrics for Databricks JDBC driver. This class manages
13+
* per-statement chunk details and provides logic for data collection.
14+
*/
15+
public class ChunkLatencyHandler {
16+
17+
private static final JdbcLogger LOGGER = JdbcLoggerFactory.getLogger(ChunkLatencyHandler.class);
18+
19+
// Singleton instance for global access
20+
private static final ChunkLatencyHandler INSTANCE = new ChunkLatencyHandler();
21+
22+
// Per-statement latency tracking using ChunkDetails
23+
private final ConcurrentHashMap<String, ChunkDetails> statementTrackers =
24+
new ConcurrentHashMap<>();
25+
26+
private ChunkLatencyHandler() {
27+
// Private constructor for singleton
28+
}
29+
30+
public static ChunkLatencyHandler getInstance() {
31+
return INSTANCE;
32+
}
33+
34+
/**
35+
* Initialize tracking for a statement with the total number of chunks.
36+
*
37+
* @param statementId the statement ID
38+
* @param totalChunks the total number of chunks in the result set
39+
*/
40+
public void initializeStatement(StatementId statementId, long totalChunks) {
41+
if (statementId == null) {
42+
LOGGER.trace("Statement ID is null, skipping initialization");
43+
return;
44+
}
45+
statementTrackers.put(statementId.toString(), new ChunkDetails(totalChunks));
46+
LOGGER.trace(
47+
"Initialized chunk tracking for statement {} with {} total chunks",
48+
statementId.toString(),
49+
totalChunks);
50+
}
51+
52+
/**
53+
* Records the latency for downloading a chunk and updates metrics.
54+
*
55+
* @param statementId the statement ID
56+
* @param chunkIndex the index of the chunk being downloaded
57+
* @param latencyMillis the time taken to download the chunk in milliseconds
58+
*/
59+
public void recordChunkDownloadLatency(String statementId, long chunkIndex, long latencyMillis) {
60+
if (statementId == null) {
61+
LOGGER.trace("Statement ID is null, skipping chunk latency recording");
62+
return;
63+
}
64+
65+
ChunkDetails chunkDetails =
66+
statementTrackers.computeIfAbsent(statementId, k -> new ChunkDetails(0));
67+
68+
// Record initial chunk latency (first chunk downloaded)
69+
if (chunkIndex == 0) {
70+
chunkDetails.setInitialChunkLatencyMillis(latencyMillis);
71+
}
72+
73+
// Update slowest chunk latency
74+
Long currentSlowest = chunkDetails.getSlowestChunkLatencyMillis();
75+
if (currentSlowest == null || latencyMillis > currentSlowest) {
76+
chunkDetails.setSlowestChunkLatencyMillis(latencyMillis);
77+
}
78+
79+
// Add to sum of all chunk download times
80+
Long currentSum = chunkDetails.getSumChunksDownloadTimeMillis();
81+
if (currentSum == null) {
82+
currentSum = 0L;
83+
}
84+
chunkDetails.setSumChunksDownloadTimeMillis(currentSum + latencyMillis);
85+
86+
LOGGER.trace(
87+
"Recorded chunk {} latency: {}ms for statement {}", chunkIndex, latencyMillis, statementId);
88+
}
89+
90+
/**
91+
* Records when a chunk is iterated/consumed by the result set.
92+
*
93+
* @param statementId the statement ID
94+
* @param chunkIndex the index of the chunk being iterated
95+
*/
96+
public void recordChunkIteration(String statementId, long chunkIndex) {
97+
if (statementId == null) {
98+
return;
99+
}
100+
101+
ChunkDetails chunkDetails = statementTrackers.get(statementId);
102+
if (chunkDetails != null) {
103+
Long currentIterated = chunkDetails.getTotalChunksIterated();
104+
if (currentIterated == null) {
105+
currentIterated = 0L;
106+
}
107+
chunkDetails.setTotalChunksIterated(currentIterated + 1);
108+
LOGGER.trace("Recorded chunk {} iteration for statement {}", chunkIndex, statementId);
109+
}
110+
}
111+
112+
/**
113+
* Gets the collected chunk details for a statement without removing the tracker.
114+
*
115+
* @param statementId the statement ID
116+
* @return the ChunkDetails object or null if no tracker found
117+
*/
118+
public ChunkDetails getChunkDetails(String statementId) {
119+
if (statementId == null) {
120+
return null;
121+
}
122+
123+
return statementTrackers.get(statementId);
124+
}
125+
126+
/**
127+
* Gets the collected chunk details and removes the tracker from memory (cleanup).
128+
*
129+
* @param statementId the statement ID
130+
* @return the ChunkDetails object or null if no tracker found
131+
*/
132+
public ChunkDetails getChunkDetailsAndCleanup(String statementId) {
133+
if (statementId == null) {
134+
return null;
135+
}
136+
137+
ChunkDetails chunkDetails = statementTrackers.remove(statementId);
138+
if (chunkDetails == null) {
139+
LOGGER.trace("No chunk latency telemetry found for statement {}", statementId);
140+
return null;
141+
}
142+
return chunkDetails;
143+
}
144+
145+
/**
146+
* Clears tracking data for a statement (useful for cleanup).
147+
*
148+
* @param statementId the statement ID to clear tracking for
149+
*/
150+
public void clearStatement(StatementId statementId) {
151+
if (statementId == null) {
152+
LOGGER.trace("Statement ID is null, skipping cleanup");
153+
return;
154+
}
155+
156+
String statementIdStr = statementId.toString();
157+
statementTrackers.remove(statementIdStr);
158+
LOGGER.trace("Cleared tracking for statement {}", statementIdStr);
159+
}
160+
161+
/**
162+
* Gets all pending chunk details and clears the trackers. This method is called when the
163+
* connection/client is being closed.
164+
*
165+
* @return a map of statement ID to ChunkDetails for all pending statements
166+
*/
167+
public Map<String, ChunkDetails> getAllPendingChunkDetails() {
168+
if (statementTrackers.isEmpty()) {
169+
return Collections.emptyMap();
170+
}
171+
172+
LOGGER.trace(
173+
"Retrieved {} pending chunk details for telemetry export", statementTrackers.size());
174+
175+
Map<String, ChunkDetails> pendingDetails = new ConcurrentHashMap<>(statementTrackers);
176+
statementTrackers.clear();
177+
return pendingDetails;
178+
}
179+
}

src/main/java/com/databricks/jdbc/telemetry/latency/StatementLatencyDetails.java

Lines changed: 0 additions & 69 deletions
This file was deleted.

0 commit comments

Comments
 (0)