forked from databricks/databricks-jdbc
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathTelemetryClient.java
More file actions
109 lines (97 loc) · 3.63 KB
/
TelemetryClient.java
File metadata and controls
109 lines (97 loc) · 3.63 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
package com.databricks.jdbc.telemetry;
import com.databricks.jdbc.api.internal.IDatabricksConnectionContext;
import com.databricks.jdbc.model.telemetry.TelemetryFrontendLog;
import com.databricks.jdbc.model.telemetry.latency.ChunkDetails;
import com.databricks.jdbc.telemetry.latency.ChunkLatencyHandler;
import com.databricks.sdk.core.DatabricksConfig;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class TelemetryClient implements ITelemetryClient {
private final IDatabricksConnectionContext context;
private final DatabricksConfig databricksConfig;
private final int eventsBatchSize;
private final boolean isAuthEnabled;
private final ExecutorService executorService;
private final ScheduledExecutorService scheduledExecutorService;
private List<TelemetryFrontendLog> eventsBatch;
public TelemetryClient(
IDatabricksConnectionContext connectionContext,
ExecutorService executorService,
DatabricksConfig config) {
this.eventsBatch = new LinkedList<>();
this.eventsBatchSize = connectionContext.getTelemetryBatchSize();
this.isAuthEnabled = true;
this.context = connectionContext;
this.databricksConfig = config;
this.executorService = executorService;
this.scheduledExecutorService =
java.util.concurrent.Executors.newSingleThreadScheduledExecutor();
schedulePeriodicFlush();
}
public TelemetryClient(
IDatabricksConnectionContext connectionContext, ExecutorService executorService) {
this.eventsBatch = new LinkedList<>();
this.eventsBatchSize = connectionContext.getTelemetryBatchSize();
this.isAuthEnabled = false;
this.context = connectionContext;
this.databricksConfig = null;
this.executorService = executorService;
this.scheduledExecutorService =
java.util.concurrent.Executors.newSingleThreadScheduledExecutor();
schedulePeriodicFlush();
}
private void schedulePeriodicFlush() {
// Ensure minimum 1 second interval to avoid over-calling flush
int intervalMillis = Math.max(1000, context.getTelemetryFlushIntervalInMilliseconds());
scheduledExecutorService.scheduleAtFixedRate(
this::flush, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
}
@Override
public void exportEvent(TelemetryFrontendLog event) {
synchronized (this) {
eventsBatch.add(event);
}
if (eventsBatch.size() == eventsBatchSize) {
flush();
}
}
@Override
public void close() {
// Export any pending chunk latency telemetry before flushing
ChunkLatencyHandler.getInstance()
.getAllPendingChunkDetails()
.forEach(
(statementId, chunkDetails) -> {
TelemetryHelper.exportChunkLatencyTelemetry(chunkDetails, statementId);
});
flush();
scheduledExecutorService.shutdown();
}
@Override
public void closeStatement(String statementId) {
ChunkDetails chunkDetails =
ChunkLatencyHandler.getInstance().getChunkDetailsAndCleanup(statementId);
if (chunkDetails != null) {
TelemetryHelper.exportChunkLatencyTelemetry(chunkDetails, statementId);
}
flush();
}
private void flush() {
synchronized (this) {
if (!eventsBatch.isEmpty()) {
List<TelemetryFrontendLog> logsToBeFlushed = eventsBatch;
executorService.submit(
new TelemetryPushTask(logsToBeFlushed, isAuthEnabled, context, databricksConfig));
eventsBatch = new LinkedList<>();
}
}
}
int getCurrentSize() {
synchronized (this) {
return eventsBatch.size();
}
}
}