Skip to content

Commit a644512

Browse files
authored
Add time based flush + add force telemetry flag (#880)
* Add time based flush + add force telemetry flag * Address comments
1 parent d7d24da commit a644512

7 files changed

Lines changed: 192 additions & 7 deletions

File tree

src/main/java/com/databricks/jdbc/api/impl/DatabricksConnectionContext.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -947,4 +947,16 @@ private Map<String, String> parseCustomHeaders(ImmutableMap<String, String> para
947947
Collectors.toMap(
948948
entry -> entry.getKey().substring(filterPrefix.length()), Map.Entry::getValue));
949949
}
950+
951+
@Override
952+
public boolean forceEnableTelemetry() {
953+
return getParameter(DatabricksJdbcUrlParams.FORCE_ENABLE_TELEMETRY).equals("1");
954+
}
955+
956+
@Override
957+
public int getTelemetryFlushIntervalInMilliseconds() {
958+
// There is a minimum threshold of 1000ms for the flush interval
959+
return Math.max(
960+
1000, Integer.parseInt(getParameter(DatabricksJdbcUrlParams.TELEMETRY_FLUSH_INTERVAL)));
961+
}
950962
}

src/main/java/com/databricks/jdbc/api/internal/IDatabricksConnectionContext.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -323,4 +323,10 @@ public interface IDatabricksConnectionContext {
323323

324324
/** Returns the application name using JDBC Connection */
325325
String getApplicationName();
326+
327+
/** Returns whether telemetry is force-enabled for this connection, bypassing the other telemetry checks */
328+
boolean forceEnableTelemetry();
329+
330+
/** Returns the telemetry flush interval in milliseconds (the implementation enforces a 1000 ms minimum) */
331+
int getTelemetryFlushIntervalInMilliseconds();
326332
}

src/main/java/com/databricks/jdbc/common/DatabricksJdbcUrlParams.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,8 @@ public enum DatabricksJdbcUrlParams {
130130
TOKEN_CACHE_PASS_PHRASE("TokenCachePassPhrase", "Pass phrase to use for OAuth U2M Token Cache"),
131131
ENABLE_TOKEN_CACHE("EnableTokenCache", "Enable caching OAuth tokens", "1"),
132132
APPLICATION_NAME("ApplicationName", "Name of application using the driver", ""),
133+
FORCE_ENABLE_TELEMETRY("ForceEnableTelemetry", "Force enable telemetry", "0"),
134+
TELEMETRY_FLUSH_INTERVAL("TelemetryFlushInterval", "Flush interval in milliseconds", "5000"),
133135
;
134136

135137
private final String paramName;

src/main/java/com/databricks/jdbc/telemetry/TelemetryClient.java

Lines changed: 47 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@
88
import java.util.LinkedList;
99
import java.util.List;
1010
import java.util.concurrent.ExecutorService;
11+
import java.util.concurrent.ScheduledExecutorService;
12+
import java.util.concurrent.ScheduledFuture;
13+
import java.util.concurrent.TimeUnit;
1114

1215
public class TelemetryClient implements ITelemetryClient {
1316

@@ -16,7 +19,11 @@ public class TelemetryClient implements ITelemetryClient {
1619
private final int eventsBatchSize;
1720
private final boolean isAuthEnabled;
1821
private final ExecutorService executorService;
22+
private final ScheduledExecutorService scheduledExecutorService;
1923
private List<TelemetryFrontendLog> eventsBatch;
24+
private volatile long lastFlushedTime;
25+
private ScheduledFuture<?> flushTask;
26+
private final int flushIntervalMillis;
2027

2128
public TelemetryClient(
2229
IDatabricksConnectionContext connectionContext,
@@ -28,6 +35,11 @@ public TelemetryClient(
2835
this.context = connectionContext;
2936
this.databricksConfig = config;
3037
this.executorService = executorService;
38+
this.scheduledExecutorService =
39+
java.util.concurrent.Executors.newSingleThreadScheduledExecutor();
40+
this.flushIntervalMillis = context.getTelemetryFlushIntervalInMilliseconds();
41+
this.lastFlushedTime = System.currentTimeMillis();
42+
schedulePeriodicFlush();
3143
}
3244

3345
public TelemetryClient(
@@ -38,6 +50,27 @@ public TelemetryClient(
3850
this.context = connectionContext;
3951
this.databricksConfig = null;
4052
this.executorService = executorService;
53+
this.scheduledExecutorService =
54+
java.util.concurrent.Executors.newSingleThreadScheduledExecutor();
55+
this.flushIntervalMillis = context.getTelemetryFlushIntervalInMilliseconds();
56+
this.lastFlushedTime = System.currentTimeMillis();
57+
schedulePeriodicFlush();
58+
}
59+
60+
private void schedulePeriodicFlush() {
61+
if (flushTask != null) {
62+
flushTask.cancel(false);
63+
}
64+
flushTask =
65+
scheduledExecutorService.scheduleAtFixedRate(
66+
this::periodicFlush, flushIntervalMillis, flushIntervalMillis, TimeUnit.MILLISECONDS);
67+
}
68+
69+
private void periodicFlush() {
70+
long now = System.currentTimeMillis();
71+
if (now - lastFlushedTime >= flushIntervalMillis) {
72+
flush();
73+
}
4174
}
4275

4376
@Override
@@ -61,6 +94,10 @@ public void close() {
6194
TelemetryHelper.exportChunkLatencyTelemetry(chunkDetails, statementId);
6295
});
6396
flush();
97+
if (flushTask != null) {
98+
flushTask.cancel(false);
99+
}
100+
scheduledExecutorService.shutdown();
64101
}
65102

66103
@Override
@@ -75,14 +112,19 @@ public void closeStatement(String statementId) {
75112

76113
private void flush() {
77114
synchronized (this) {
78-
List<TelemetryFrontendLog> logsToBeFlushed = eventsBatch;
79-
executorService.submit(
80-
new TelemetryPushTask(logsToBeFlushed, isAuthEnabled, context, databricksConfig));
81-
eventsBatch = new LinkedList<>();
115+
if (!eventsBatch.isEmpty()) {
116+
List<TelemetryFrontendLog> logsToBeFlushed = eventsBatch;
117+
executorService.submit(
118+
new TelemetryPushTask(logsToBeFlushed, isAuthEnabled, context, databricksConfig));
119+
eventsBatch = new LinkedList<>();
120+
}
121+
lastFlushedTime = System.currentTimeMillis();
82122
}
83123
}
84124

85125
int getCurrentSize() {
86-
return eventsBatch.size();
126+
synchronized (this) {
127+
return eventsBatch.size();
128+
}
87129
}
88130
}

src/main/java/com/databricks/jdbc/telemetry/TelemetryHelper.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,9 @@ public static void updateClientAppName(String clientAppName) {
5858
}
5959

6060
public static boolean isTelemetryAllowedForConnection(IDatabricksConnectionContext context) {
61+
// Check for null before dereferencing: the return expression below already treats a null
// context as "telemetry not allowed", so the force-enable shortcut must not throw on it.
if (context != null && context.forceEnableTelemetry()) {
  return true;
}
6164
return context != null
6265
&& context.isTelemetryEnabled()
6366
&& DatabricksDriverFeatureFlagsContextFactory.getInstance(context)

src/test/java/com/databricks/jdbc/telemetry/TelemetryClientTest.java

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
import com.google.common.util.concurrent.MoreExecutors;
1616
import java.util.Map;
1717
import java.util.Properties;
18+
import java.util.concurrent.ExecutorService;
19+
import java.util.concurrent.TimeUnit;
1820
import org.apache.http.HttpHeaders;
1921
import org.apache.http.StatusLine;
2022
import org.apache.http.client.methods.CloseableHttpResponse;
@@ -139,4 +141,114 @@ public void testExportEventDoesNotThrowErrorsInFailures() throws Exception {
139141
() -> client.exportEvent(new TelemetryFrontendLog().setFrontendLogEventId("event2")));
140142
}
141143
}
144+
145+
@Test
146+
public void testPeriodicFlushWithAuthenticatedClient() throws Exception {
147+
try (MockedStatic<DatabricksHttpClientFactory> factoryMocked =
148+
mockStatic(DatabricksHttpClientFactory.class)) {
149+
DatabricksHttpClientFactory mockFactory = mock(DatabricksHttpClientFactory.class);
150+
factoryMocked.when(DatabricksHttpClientFactory::getInstance).thenReturn(mockFactory);
151+
when(mockFactory.getClient(any())).thenReturn(mockHttpClient);
152+
when(mockHttpClient.execute(any())).thenReturn(mockHttpResponse);
153+
when(mockHttpResponse.getStatusLine()).thenReturn(mockStatusLine);
154+
when(mockStatusLine.getStatusCode()).thenReturn(200);
155+
TelemetryResponse response = new TelemetryResponse().setNumSuccess(1L).setNumProtoSuccess(1L);
156+
when(mockHttpResponse.getEntity())
157+
.thenReturn(new StringEntity(new ObjectMapper().writeValueAsString(response)));
158+
159+
Map<String, String> headers = Map.of(HttpHeaders.AUTHORIZATION, "token");
160+
when(databricksConfig.authenticate()).thenReturn(headers);
161+
162+
// JDBC URL with 2 seconds flush interval
163+
String jdbcUrlWith2SecondsFlush =
164+
"jdbc:databricks://adb-20.azuredatabricks.net:4423/default;transportMode=http;ssl=1;AuthMech=3;httpPath=/sql/1.0/warehouses/ghgjhgj;UserAgentEntry=MyApp;EnableTelemetry=1;TelemetryBatchSize=2;TelemetryFlushInterval=2000";
165+
166+
IDatabricksConnectionContext context =
167+
DatabricksConnectionContext.parse(jdbcUrlWith2SecondsFlush, new Properties());
168+
TelemetryClient client =
169+
new TelemetryClient(context, MoreExecutors.newDirectExecutorService(), databricksConfig);
170+
171+
// Add a single event that won't trigger batch flush
172+
client.exportEvent(new TelemetryFrontendLog().setFrontendLogEventId("event1"));
173+
assertEquals(1, client.getCurrentSize());
174+
175+
// Wait for a short time to verify the periodic flush doesn't trigger immediately
176+
Thread.sleep(100);
177+
assertEquals(1, client.getCurrentSize());
178+
179+
// Wait for 2 seconds to trigger the periodic flush
180+
Thread.sleep(2000);
181+
assertEquals(0, client.getCurrentSize());
182+
183+
client.exportEvent(new TelemetryFrontendLog().setFrontendLogEventId("event2"));
184+
assertEquals(1, client.getCurrentSize());
185+
// Close the client to trigger final flush
186+
client.close();
187+
assertEquals(0, client.getCurrentSize());
188+
}
189+
}
190+
191+
@Test
192+
public void testTimerResetOnBatchSizeFlush() throws Exception {
193+
TelemetryClient client = null;
194+
ExecutorService executor = null;
195+
try (MockedStatic<DatabricksHttpClientFactory> factoryMocked =
196+
mockStatic(DatabricksHttpClientFactory.class)) {
197+
DatabricksHttpClientFactory mockFactory = mock(DatabricksHttpClientFactory.class);
198+
factoryMocked.when(DatabricksHttpClientFactory::getInstance).thenReturn(mockFactory);
199+
when(mockFactory.getClient(any())).thenReturn(mockHttpClient);
200+
when(mockHttpClient.execute(any())).thenReturn(mockHttpResponse);
201+
when(mockHttpResponse.getStatusLine()).thenReturn(mockStatusLine);
202+
when(mockStatusLine.getStatusCode()).thenReturn(200);
203+
TelemetryResponse response = new TelemetryResponse().setNumSuccess(1L).setNumProtoSuccess(1L);
204+
when(mockHttpResponse.getEntity())
205+
.thenReturn(new StringEntity(new ObjectMapper().writeValueAsString(response)));
206+
207+
// Set up a client with 3 second flush interval and batch size of 2
208+
String jdbcUrl =
209+
"jdbc:databricks://adb-20.azuredatabricks.net:4423/default;transportMode=http;ssl=1;AuthMech=3;httpPath=/sql/1.0/warehouses/ghgjhgj;UserAgentEntry=MyApp;EnableTelemetry=1;TelemetryBatchSize=2;TelemetryFlushInterval=3000";
210+
IDatabricksConnectionContext context =
211+
DatabricksConnectionContext.parse(jdbcUrl, new Properties());
212+
executor = MoreExecutors.newDirectExecutorService();
213+
client = new TelemetryClient(context, executor);
214+
215+
// Add events to trigger batch size flush
216+
client.exportEvent(new TelemetryFrontendLog().setFrontendLogEventId("event1"));
217+
client.exportEvent(
218+
new TelemetryFrontendLog()
219+
.setFrontendLogEventId("event2")); // This should trigger flush due to batch size
220+
221+
// Wait 2 seconds (less than the flush interval)
222+
Thread.sleep(2000);
223+
224+
// Add another event
225+
client.exportEvent(new TelemetryFrontendLog().setFrontendLogEventId("event3"));
226+
227+
// Verify it's still in the batch (shouldn't have been flushed yet since timer was reset)
228+
assertEquals(1, client.getCurrentSize());
229+
230+
// Wait another 2 seconds (still less than full interval from last flush)
231+
Thread.sleep(2000);
232+
233+
// Verify it's still not flushed
234+
assertEquals(1, client.getCurrentSize());
235+
236+
} finally {
237+
// Clean up resources
238+
if (client != null) {
239+
client.close();
240+
}
241+
if (executor != null) {
242+
executor.shutdown();
243+
// Wait for any pending tasks to complete
244+
if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
245+
executor.shutdownNow();
246+
}
247+
}
248+
// Verify mocks were properly used
249+
verify(mockHttpClient, atLeastOnce()).execute(any());
250+
verify(mockHttpResponse, atLeastOnce()).getStatusLine();
251+
verify(mockStatusLine, atLeastOnce()).getStatusCode();
252+
}
253+
}
142254
}

src/test/java/com/databricks/jdbc/telemetry/TelemetryHelperTest.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -131,15 +131,23 @@ public void testGetDatabricksConfigSafely_ReturnsNullOnError() {
131131

132132
@Test
133133
public void testGetDatabricksConfigSafely_HandlesNullContext() {
134-
DatabricksConfig result = TelemetryHelper.getDatabricksConfigSafely(null);
134+
DatabricksConfig result = TelemetryHelper.getDatabricksConfigSafely(connectionContext);
135135
assertNull(result, "Should return null when context is null");
136136
}
137137

138138
@Test
139139
public void testTelemetryNotAllowedUsecase() {
140-
assertFalse(() -> isTelemetryAllowedForConnection(null));
140+
assertFalse(() -> isTelemetryAllowedForConnection(connectionContext));
141141
when(connectionContext.getComputeResource()).thenReturn(WAREHOUSE_COMPUTE);
142142
enableFeatureFlagForTesting(connectionContext, Collections.emptyMap());
143143
assertFalse(() -> isTelemetryAllowedForConnection(connectionContext));
144144
}
145+
146+
@Test
147+
public void testTelemetryAllowedWithForceTelemetryFlag() {
148+
when(connectionContext.getComputeResource()).thenReturn(WAREHOUSE_COMPUTE);
149+
when(connectionContext.forceEnableTelemetry()).thenReturn(true);
150+
enableFeatureFlagForTesting(connectionContext, Collections.emptyMap());
151+
assertTrue(() -> isTelemetryAllowedForConnection(connectionContext));
152+
}
145153
}

0 commit comments

Comments
 (0)