Skip to content

Commit 98fc960

Browse files
authored
Improvements to telemetry implementation (#1009)
## Description - Improved the cache logic for feature flags. - Minor latency improvement for the telemetry collector (3-4 ms improvement on hot paths). This is achieved by skipping allocation when already present and skipping the map fetch when a new object is created. - Moved system config info to be logged asynchronously at connection start-up (Note: we can't move it to static as we need it in our log folder, i.e., we can add it only after logging is set up). - For the feature flag factory, we preserve the object instance per compute. This will improve performance in the multi-connection case. Currently, we clear it out for every compute, i.e., we make a call to the feature flag endpoint on every connectionContext. - Force flush synchronously so that telemetry logs are pushed during connection close itself. ### For benchmarking improvements, there is a ~30 ms improvement with these changes. [See the internal table](https://docs.google.com/document/d/1oAu1pTCM9VFrPDCt0I_aWsSIoNGOapmGc_2Zob6QUiA/edit?tab=t.0#heading=h.ibcl7vrboyf2) for more detail. ## Testing - Verified locally that system information is being logged correctly `2025-09-17 12:08:27 INFO com.databricks.client.jdbc.Driver#lambda$connect$0 - DriverSystemConfiguration[driverName='oss-jdbc', driverVersion='1.0.9-oss', osName='Mac OS X', osVersion='15.6.1', osArch='aarch64', runtimeName='Java HotSpot(TM) 64-Bit Server VM', runtimeVersion='21.0.7', runtimeVendor='Oracle Corporation', localeName='en_US', processName='JUnitStarter', defaultCharsetEncoding='UTF-8'] ` - Even if I close the JVM after conn.close(), the logs show up. This was not the case earlier. ## Additional Notes to the Reviewer <!-- Share any additional context or insights that may help the reviewer understand the changes better. This could include challenges faced, limitations, or compromises made during the development process. Also, mention any areas of the code that you would like the reviewer to focus on specifically. -->
1 parent 165ab43 commit 98fc960

10 files changed

Lines changed: 167 additions & 67 deletions

NEXT_CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
### Added
66

77
### Updated
8+
- Telemetry data is now captured more efficiently and consistently due to enhancements in the log and connection close flush logic.
89

910
### Fixed
1011
- Fixed state leaking issue in thrift client.

src/main/java/com/databricks/client/jdbc/Driver.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,11 @@
2222
import java.util.List;
2323
import java.util.Properties;
2424
import java.util.TimeZone;
25+
import java.util.concurrent.CompletableFuture;
2526
import java.util.logging.Logger;
2627

2728
/** Databricks JDBC driver. */
28-
public class Driver implements IDatabricksDriver, java.sql.Driver {
29+
public class Driver implements IDatabricksDriver {
2930
private static final JdbcLogger LOGGER = JdbcLoggerFactory.getLogger(Driver.class);
3031
private static final Driver INSTANCE;
3132

@@ -57,8 +58,8 @@ public Connection connect(String url, Properties info) throws DatabricksSQLExcep
5758
IDatabricksConnectionContext connectionContext =
5859
DatabricksConnectionContextFactory.create(url, info);
5960
DriverUtil.setUpLogging(connectionContext);
61+
CompletableFuture.runAsync(() -> LOGGER.info(getDriverSystemConfiguration().toString()));
6062
UserAgentManager.setUserAgent(connectionContext);
61-
LOGGER.info(getDriverSystemConfiguration().toString());
6263
DatabricksConnection connection = new DatabricksConnection(connectionContext);
6364
boolean isConnectionOpen = false;
6465
try {

src/main/java/com/databricks/jdbc/api/impl/DatabricksConnection.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,9 +156,9 @@ public void close() throws DatabricksSQLException {
156156
}
157157
this.session.close();
158158
TelemetryClientFactory.getInstance().closeTelemetryClient(connectionContext);
159-
DatabricksHttpClientFactory.getInstance().removeClient(connectionContext);
160159
DatabricksClientConfiguratorManager.getInstance().removeInstance(connectionContext);
161160
DatabricksDriverFeatureFlagsContextFactory.removeInstance(connectionContext);
161+
DatabricksHttpClientFactory.getInstance().removeClient(connectionContext);
162162
DatabricksThreadContextHolder.clearAllContext();
163163
}
164164

src/main/java/com/databricks/jdbc/api/impl/DatabricksConnectionContext.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -796,7 +796,7 @@ public int getTelemetryBatchSize() {
796796

797797
@Override
798798
public boolean isTelemetryEnabled() {
799-
return getParameter(DatabricksJdbcUrlParams.ENABLE_TELEMETRY, "0").equals("1");
799+
return getParameter(DatabricksJdbcUrlParams.ENABLE_TELEMETRY).equals("1");
800800
}
801801

802802
@Override

src/main/java/com/databricks/jdbc/common/safe/DatabricksDriverFeatureFlagsContext.java

Lines changed: 53 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
package com.databricks.jdbc.common.safe;
22

3-
import static java.lang.Math.max;
4-
53
import com.databricks.jdbc.api.internal.IDatabricksConnectionContext;
64
import com.databricks.jdbc.common.DatabricksClientConfiguratorManager;
75
import com.databricks.jdbc.common.util.DriverUtil;
@@ -12,13 +10,13 @@
1210
import com.databricks.jdbc.log.JdbcLogger;
1311
import com.databricks.jdbc.log.JdbcLoggerFactory;
1412
import com.google.common.annotations.VisibleForTesting;
15-
import com.google.common.cache.*;
16-
import com.google.common.util.concurrent.Futures;
17-
import com.google.common.util.concurrent.ListenableFuture;
13+
import com.google.common.cache.Cache;
14+
import com.google.common.cache.CacheBuilder;
1815
import java.io.IOException;
1916
import java.util.Map;
20-
import java.util.concurrent.ExecutorService;
2117
import java.util.concurrent.Executors;
18+
import java.util.concurrent.ScheduledExecutorService;
19+
import java.util.concurrent.ScheduledFuture;
2220
import java.util.concurrent.TimeUnit;
2321
import org.apache.http.client.methods.CloseableHttpResponse;
2422
import org.apache.http.client.methods.HttpGet;
@@ -33,56 +31,58 @@ public class DatabricksDriverFeatureFlagsContext {
3331
"/api/2.0/connector-service/feature-flags/OSS_JDBC/%s",
3432
DriverUtil.getDriverVersionWithoutOSSSuffix());
3533
private static final int DEFAULT_TTL_SECONDS = 900; // 15 minutes
36-
private static final int REFRESH_BEFORE_EXPIRY_SECONDS = 10; // refresh 10s before expiry
3734
private final String featureFlagEndpoint;
3835
private final IDatabricksConnectionContext connectionContext;
39-
private LoadingCache<String, String> featureFlags;
40-
private final ExecutorService asyncExecutor = Executors.newCachedThreadPool();
36+
private final Cache<String, String> featureFlags;
37+
private final ScheduledExecutorService scheduler =
38+
Executors.newSingleThreadScheduledExecutor(
39+
r -> {
40+
Thread t = new Thread(r, "databricks-jdbc-feature-flags-refresh");
41+
t.setDaemon(true);
42+
return t;
43+
});
44+
private ScheduledFuture<?> scheduledRefreshTask;
45+
private volatile int refreshIntervalSeconds = DEFAULT_TTL_SECONDS;
4146

4247
public DatabricksDriverFeatureFlagsContext(IDatabricksConnectionContext connectionContext) {
4348
this.connectionContext = connectionContext;
44-
this.featureFlags = createFeatureFlagsCache(DEFAULT_TTL_SECONDS);
49+
this.featureFlags = CacheBuilder.newBuilder().build();
4550
this.featureFlagEndpoint =
4651
String.format(
4752
"https://%s%s", connectionContext.getHostForOAuth(), FEATURE_FLAGS_ENDPOINT_SUFFIX);
53+
// Make an initial blocking call to fetch featureFlags
54+
refreshAllFeatureFlags();
55+
// Subsequent refreshes run periodically on the background scheduler
56+
scheduleOrRescheduleRefresh(DEFAULT_TTL_SECONDS);
4857
}
4958

5059
// Constructor for testing
5160
DatabricksDriverFeatureFlagsContext(
5261
IDatabricksConnectionContext connectionContext, Map<String, String> initialFlags) {
5362
this.connectionContext = connectionContext;
54-
this.featureFlags = createFeatureFlagsCache(DEFAULT_TTL_SECONDS);
63+
this.featureFlags = CacheBuilder.newBuilder().build();
5564
this.featureFlagEndpoint =
5665
String.format(
5766
"https://%s%s", connectionContext.getHostForOAuth(), FEATURE_FLAGS_ENDPOINT_SUFFIX);
5867
initialFlags.forEach(this.featureFlags::put);
68+
scheduleOrRescheduleRefresh(DEFAULT_TTL_SECONDS);
5969
}
6070

61-
private LoadingCache<String, String> createFeatureFlagsCache(int ttlSeconds) {
62-
return CacheBuilder.newBuilder()
63-
.expireAfterWrite(ttlSeconds, TimeUnit.SECONDS)
64-
.refreshAfterWrite(
65-
max(
66-
300,
67-
ttlSeconds
68-
- REFRESH_BEFORE_EXPIRY_SECONDS), // refresh time should be minimum 5 minutes
69-
TimeUnit.SECONDS)
70-
.build(
71-
new CacheLoader<>() {
72-
@Override
73-
public String load(String key) {
74-
refreshAllFeatureFlags();
75-
return featureFlags.getIfPresent(key) != null
76-
? featureFlags.getIfPresent(key)
77-
: "false";
78-
}
79-
80-
@Override
81-
public ListenableFuture<String> reload(String key, String oldValue) {
82-
asyncExecutor.submit(() -> refreshAllFeatureFlags());
83-
return Futures.immediateFuture(oldValue); // keep old value until refresh is done
84-
}
85-
});
71+
private void scheduleOrRescheduleRefresh(int ttlSeconds) {
72+
this.refreshIntervalSeconds = ttlSeconds > 0 ? ttlSeconds : DEFAULT_TTL_SECONDS;
73+
if (scheduler.isShutdown()) {
74+
return;
75+
}
76+
if (scheduledRefreshTask != null && !scheduledRefreshTask.isCancelled()) {
77+
scheduledRefreshTask.cancel(false);
78+
}
79+
// Schedule refresh at a fixed rate.
80+
scheduledRefreshTask =
81+
scheduler.scheduleAtFixedRate(
82+
this::refreshAllFeatureFlags,
83+
this.refreshIntervalSeconds,
84+
this.refreshIntervalSeconds,
85+
TimeUnit.SECONDS);
8686
}
8787

8888
private void refreshAllFeatureFlags() {
@@ -121,7 +121,7 @@ void fetchAndSetFlagsFromServer(IDatabricksHttpClient httpClient, HttpGet reques
121121

122122
Integer ttlSeconds = featureFlagsResponse.getTtlSeconds();
123123
if (ttlSeconds != null) {
124-
featureFlags = createFeatureFlagsCache(ttlSeconds);
124+
scheduleOrRescheduleRefresh(ttlSeconds);
125125
}
126126
} else {
127127
LOGGER.trace(
@@ -133,11 +133,23 @@ void fetchAndSetFlagsFromServer(IDatabricksHttpClient httpClient, HttpGet reques
133133
}
134134

135135
public boolean isFeatureEnabled(String name) {
136+
String value = featureFlags.getIfPresent(name);
137+
return Boolean.parseBoolean(value);
138+
}
139+
140+
public void shutdown() {
141+
ScheduledFuture<?> task = scheduledRefreshTask;
142+
if (task != null) {
143+
task.cancel(false);
144+
}
145+
scheduler.shutdown();
136146
try {
137-
return Boolean.parseBoolean(featureFlags.get(name));
138-
} catch (Exception e) {
139-
LOGGER.trace("Error fetching feature flag {}: {}", name, e.getMessage());
140-
return false;
147+
if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {
148+
scheduler.shutdownNow();
149+
}
150+
} catch (InterruptedException ie) {
151+
Thread.currentThread().interrupt();
152+
scheduler.shutdownNow();
141153
}
142154
}
143155
}

src/main/java/com/databricks/jdbc/common/safe/DatabricksDriverFeatureFlagsContextFactory.java

Lines changed: 37 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
/** Factory class to manage DatabricksDriverFeatureFlagsContext instances */
99
public class DatabricksDriverFeatureFlagsContextFactory {
10-
private static final Map<String, DatabricksDriverFeatureFlagsContext> contextMap =
10+
private static final Map<String, FeatureFlagsContextHolder> contextMap =
1111
new ConcurrentHashMap<>();
1212

1313
private DatabricksDriverFeatureFlagsContextFactory() {
@@ -22,9 +22,21 @@ private DatabricksDriverFeatureFlagsContextFactory() {
2222
*/
2323
public static DatabricksDriverFeatureFlagsContext getInstance(
2424
IDatabricksConnectionContext context) {
25-
return contextMap.computeIfAbsent(
26-
context.getComputeResource().getUniqueIdentifier(),
27-
k -> new DatabricksDriverFeatureFlagsContext(context));
25+
String key = keyOf(context);
26+
FeatureFlagsContextHolder holder =
27+
contextMap.compute(
28+
key,
29+
(k, existing) -> {
30+
if (existing == null) {
31+
// First reference for this compute
32+
return new FeatureFlagsContextHolder(
33+
new DatabricksDriverFeatureFlagsContext(context), 1);
34+
}
35+
// Additional reference for the same compute
36+
existing.refCount.incrementAndGet();
37+
return existing;
38+
});
39+
return holder.context;
2840
}
2941

3042
/**
@@ -34,15 +46,33 @@ public static DatabricksDriverFeatureFlagsContext getInstance(
3446
*/
3547
public static void removeInstance(IDatabricksConnectionContext connectionContext) {
3648
if (connectionContext != null) {
37-
contextMap.remove(connectionContext.getComputeResource().getUniqueIdentifier());
49+
String key = keyOf(connectionContext);
50+
contextMap.computeIfPresent(
51+
key,
52+
(k, holder) -> {
53+
// Last reference being removed: shutdown and remove entry
54+
if (holder.refCount.get() <= 1) {
55+
holder.context.shutdown();
56+
return null;
57+
}
58+
// Still referenced elsewhere: just decrement
59+
holder.refCount.decrementAndGet();
60+
return holder;
61+
});
3862
}
3963
}
4064

4165
@VisibleForTesting
4266
static void setFeatureFlagsContext(
4367
IDatabricksConnectionContext connectionContext, Map<String, String> featureFlags) {
68+
String key = keyOf(connectionContext);
4469
contextMap.put(
45-
connectionContext.getComputeResource().getUniqueIdentifier(),
46-
new DatabricksDriverFeatureFlagsContext(connectionContext, featureFlags));
70+
key,
71+
new FeatureFlagsContextHolder(
72+
new DatabricksDriverFeatureFlagsContext(connectionContext, featureFlags), 1));
73+
}
74+
75+
private static String keyOf(IDatabricksConnectionContext context) {
76+
return context.getComputeResource().getUniqueIdentifier();
4777
}
4878
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package com.databricks.jdbc.common.safe;
2+
3+
import java.util.concurrent.atomic.AtomicInteger;
4+
5+
final class FeatureFlagsContextHolder {
6+
final DatabricksDriverFeatureFlagsContext context;
7+
AtomicInteger refCount;
8+
9+
FeatureFlagsContextHolder(DatabricksDriverFeatureFlagsContext context, int refCount) {
10+
this.context = context;
11+
this.refCount = new AtomicInteger(refCount);
12+
}
13+
}

src/main/java/com/databricks/jdbc/dbclient/impl/thrift/DatabricksThriftServiceClient.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import com.databricks.jdbc.dbclient.impl.common.MetadataResultSetBuilder;
2323
import com.databricks.jdbc.dbclient.impl.common.StatementId;
2424
import com.databricks.jdbc.dbclient.impl.sqlexec.CommandBuilder;
25-
import com.databricks.jdbc.exception.DatabricksHttpException;
2625
import com.databricks.jdbc.exception.DatabricksParsingException;
2726
import com.databricks.jdbc.exception.DatabricksSQLException;
2827
import com.databricks.jdbc.log.JdbcLogger;
@@ -48,7 +47,7 @@ public class DatabricksThriftServiceClient implements IDatabricksClient, IDatabr
4847
private final MetadataResultSetBuilder metadataResultSetBuilder;
4948

5049
public DatabricksThriftServiceClient(IDatabricksConnectionContext connectionContext)
51-
throws DatabricksParsingException, DatabricksHttpException {
50+
throws DatabricksParsingException {
5251
this.connectionContext = connectionContext;
5352
this.thriftAccessor = new DatabricksThriftAccessor(connectionContext);
5453
this.metadataResultSetBuilder = new MetadataResultSetBuilder(connectionContext);

0 commit comments

Comments
 (0)