Skip to content

Commit 409b69d

Browse files
authored
[PECOBLR-1190] Make both telemetryClient and factory on a host level (#1075)
## Description - Minimise the number of OAuth requests from the driver. This PR reduces it on the featureFlag and telemetry level - Now that it is reduced, we also change logLevels (Note that even in 3.0.3 we were sending all telemetry logs, the 3.0.4 contains logLevels in telemetry PR) ## Testing - Unit tests ## Additional Notes to the Reviewer <!-- Share any additional context or insights that may help the reviewer understand the changes better. This could include challenges faced, limitations, or compromises made during the development process. Also, mention any areas of the code that you would like the reviewer to focus on specifically. -->
1 parent ebe4f82 commit 409b69d

6 files changed

Lines changed: 107 additions & 30 deletions

File tree

NEXT_CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
- Added a new config attribute `DisableOauthRefreshToken` to control whether refresh tokens are requested in OAuth exchanges. By default, the driver does not include the `offline_access` scope. If `offline_access` is explicitly provided by the user, it is preserved and not removed.
1010

1111
### Updated
12+
- Minimized OAuth requests by reducing calls in feature flags and telemetry.
1213
- Updated sdk version from 0.65.0 to 0.69.0
1314

1415
### Fixed

src/main/java/com/databricks/jdbc/common/safe/DatabricksDriverFeatureFlagsContextFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,6 @@ static void setFeatureFlagsContext(
7373
}
7474

7575
private static String keyOf(IDatabricksConnectionContext context) {
76-
return context.getComputeResource().getUniqueIdentifier();
76+
return context.getHostForOAuth();
7777
}
7878
}

src/main/java/com/databricks/jdbc/telemetry/TelemetryClientFactory.java

Lines changed: 71 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@
88
import com.databricks.jdbc.log.JdbcLoggerFactory;
99
import com.databricks.sdk.core.DatabricksConfig;
1010
import com.google.common.annotations.VisibleForTesting;
11-
import java.util.LinkedHashMap;
11+
import java.util.Map;
12+
import java.util.concurrent.ConcurrentHashMap;
1213
import java.util.concurrent.ExecutorService;
1314
import java.util.concurrent.Executors;
1415
import java.util.concurrent.ThreadFactory;
@@ -23,10 +24,10 @@ public class TelemetryClientFactory {
2324
private static final TelemetryClientFactory INSTANCE = new TelemetryClientFactory();
2425

2526
@VisibleForTesting
26-
final LinkedHashMap<String, TelemetryClient> telemetryClients = new LinkedHashMap<>();
27+
final Map<String, TelemetryClientHolder> telemetryClientHolders = new ConcurrentHashMap<>();
2728

2829
@VisibleForTesting
29-
final LinkedHashMap<String, TelemetryClient> noauthTelemetryClients = new LinkedHashMap<>();
30+
final Map<String, TelemetryClientHolder> noauthTelemetryClientHolders = new ConcurrentHashMap<>();
3031

3132
private final ExecutorService telemetryExecutorService;
3233

@@ -59,24 +60,60 @@ public ITelemetryClient getTelemetryClient(IDatabricksConnectionContext connecti
5960
DatabricksConfig databricksConfig =
6061
TelemetryHelper.getDatabricksConfigSafely(connectionContext);
6162
if (databricksConfig != null) {
62-
return telemetryClients.computeIfAbsent(
63-
connectionContext.getConnectionUuid(),
64-
k ->
65-
new TelemetryClient(
66-
connectionContext, getTelemetryExecutorService(), databricksConfig));
63+
String key = keyOf(connectionContext);
64+
TelemetryClientHolder holder =
65+
telemetryClientHolders.compute(
66+
key,
67+
(k, existing) -> {
68+
if (existing == null) {
69+
return new TelemetryClientHolder(
70+
new TelemetryClient(
71+
connectionContext, getTelemetryExecutorService(), databricksConfig),
72+
1);
73+
}
74+
existing.refCount.incrementAndGet();
75+
return existing;
76+
});
77+
return holder.client;
6778
}
6879
// Use no-auth telemetry client if connection creation failed.
69-
return noauthTelemetryClients.computeIfAbsent(
70-
connectionContext.getConnectionUuid(),
71-
k -> new TelemetryClient(connectionContext, getTelemetryExecutorService()));
80+
String key = keyOf(connectionContext);
81+
TelemetryClientHolder holder =
82+
noauthTelemetryClientHolders.compute(
83+
key,
84+
(k, existing) -> {
85+
if (existing == null) {
86+
return new TelemetryClientHolder(
87+
new TelemetryClient(connectionContext, getTelemetryExecutorService()), 1);
88+
}
89+
existing.refCount.incrementAndGet();
90+
return existing;
91+
});
92+
return holder.client;
7293
}
7394

7495
public void closeTelemetryClient(IDatabricksConnectionContext connectionContext) {
75-
closeTelemetryClient(
76-
telemetryClients.remove(connectionContext.getConnectionUuid()), "telemetry client");
77-
closeTelemetryClient(
78-
noauthTelemetryClients.remove(connectionContext.getConnectionUuid()),
79-
"unauthenticated telemetry client");
96+
String key = keyOf(connectionContext);
97+
telemetryClientHolders.computeIfPresent(
98+
key,
99+
(k, holder) -> {
100+
if (holder.refCount.get() <= 1) {
101+
closeTelemetryClient(holder.client, "telemetry client");
102+
return null;
103+
}
104+
holder.refCount.decrementAndGet();
105+
return holder;
106+
});
107+
noauthTelemetryClientHolders.computeIfPresent(
108+
key,
109+
(k, holder) -> {
110+
if (holder.refCount.get() <= 1) {
111+
closeTelemetryClient(holder.client, "unauthenticated telemetry client");
112+
return null;
113+
}
114+
holder.refCount.decrementAndGet();
115+
return holder;
116+
});
80117
}
81118

82119
public ExecutorService getTelemetryExecutorService() {
@@ -109,12 +146,12 @@ static ITelemetryPushClient getTelemetryPushClient(
109146
@VisibleForTesting
110147
public void reset() {
111148
// Close all existing clients
112-
telemetryClients.values().forEach(TelemetryClient::close);
113-
noauthTelemetryClients.values().forEach(TelemetryClient::close);
149+
telemetryClientHolders.values().forEach(holder -> holder.client.close());
150+
noauthTelemetryClientHolders.values().forEach(holder -> holder.client.close());
114151

115152
// Clear the maps
116-
telemetryClients.clear();
117-
noauthTelemetryClients.clear();
153+
telemetryClientHolders.clear();
154+
noauthTelemetryClientHolders.clear();
118155
}
119156

120157
private void closeTelemetryClient(ITelemetryClient client, String clientType) {
@@ -126,4 +163,18 @@ private void closeTelemetryClient(ITelemetryClient client, String clientType) {
126163
}
127164
}
128165
}
166+
167+
private static final class TelemetryClientHolder {
168+
final TelemetryClient client;
169+
final AtomicInteger refCount;
170+
171+
TelemetryClientHolder(TelemetryClient client, int initialCount) {
172+
this.client = client;
173+
this.refCount = new AtomicInteger(initialCount);
174+
}
175+
}
176+
177+
private static String keyOf(IDatabricksConnectionContext context) {
178+
return context.getHostForOAuth();
179+
}
129180
}

src/main/java/com/databricks/jdbc/telemetry/TelemetryHelper.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ private static void exportTelemetryEvent(
8888
TelemetryLogLevel logLevel) {
8989
if (connectionContext == null
9090
|| telemetryDetails == null
91-
|| logLevel.toInt() <= connectionContext.getTelemetryLogLevel().toInt()) {
91+
|| logLevel.toInt() < connectionContext.getTelemetryLogLevel().toInt()) {
9292
// We don't export telemetry logs in the following three scenarios:
9393
// 1. When the context is not set.
9494
// 2. When telemetry details are not set.

src/test/java/com/databricks/jdbc/telemetry/TelemetryClientFactoryTest.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,8 @@ public void testGetNoOpTelemetryClient() throws Exception {
4444
ITelemetryClient telemetryClient =
4545
TelemetryClientFactory.getInstance().getTelemetryClient(context);
4646
assertInstanceOf(NoopTelemetryClient.class, telemetryClient);
47-
assertEquals(0, TelemetryClientFactory.getInstance().telemetryClients.size());
48-
assertEquals(0, TelemetryClientFactory.getInstance().noauthTelemetryClients.size());
47+
assertEquals(0, TelemetryClientFactory.getInstance().telemetryClientHolders.size());
48+
assertEquals(0, TelemetryClientFactory.getInstance().noauthTelemetryClientHolders.size());
4949
}
5050

5151
@Test
@@ -58,11 +58,11 @@ public void testGetAuthenticatedTelemetryClient() throws Exception {
5858
ITelemetryClient telemetryClient =
5959
TelemetryClientFactory.getInstance().getTelemetryClient(context);
6060
assertInstanceOf(TelemetryClient.class, telemetryClient);
61-
assertEquals(1, TelemetryClientFactory.getInstance().telemetryClients.size());
62-
assertEquals(0, TelemetryClientFactory.getInstance().noauthTelemetryClients.size());
61+
assertEquals(1, TelemetryClientFactory.getInstance().telemetryClientHolders.size());
62+
assertEquals(0, TelemetryClientFactory.getInstance().noauthTelemetryClientHolders.size());
6363
TelemetryClientFactory.getInstance().closeTelemetryClient(context);
64-
assertEquals(0, TelemetryClientFactory.getInstance().telemetryClients.size());
65-
assertEquals(0, TelemetryClientFactory.getInstance().noauthTelemetryClients.size());
64+
assertEquals(0, TelemetryClientFactory.getInstance().telemetryClientHolders.size());
65+
assertEquals(0, TelemetryClientFactory.getInstance().noauthTelemetryClientHolders.size());
6666
TelemetryClientFactory.getInstance().closeTelemetryClient(context);
6767
}
6868

@@ -80,8 +80,8 @@ public void testGetNoOpTelemetryClientWhenDatabricksConfigIsNull() throws Except
8080
TelemetryClientFactory.getInstance().getTelemetryClient(context);
8181

8282
assertInstanceOf(NoopTelemetryClient.class, telemetryClient);
83-
assertEquals(0, TelemetryClientFactory.getInstance().telemetryClients.size());
84-
assertEquals(0, TelemetryClientFactory.getInstance().noauthTelemetryClients.size());
83+
assertEquals(0, TelemetryClientFactory.getInstance().telemetryClientHolders.size());
84+
assertEquals(0, TelemetryClientFactory.getInstance().noauthTelemetryClientHolders.size());
8585
TelemetryClientFactory.getInstance().closeTelemetryClient(context);
8686
}
8787
}

src/test/java/com/databricks/jdbc/telemetry/TelemetryHelperTest.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,7 @@ public void testGetDatabricksConfigSafely_HandlesNullContext() {
205205
public void testTelemetryNotAllowedUsecase() {
206206
// Clear thread context to ensure telemetry is not allowed
207207
when(connectionContext.forceEnableTelemetry()).thenReturn(false);
208+
when(connectionContext.getHostForOAuth()).thenReturn("test-host");
208209
when(connectionContext.isTelemetryEnabled()).thenReturn(false);
209210
assertFalse(isTelemetryAllowedForConnection(connectionContext));
210211
when(connectionContext.getComputeResource()).thenReturn(WAREHOUSE_COMPUTE);
@@ -216,6 +217,7 @@ public void testTelemetryNotAllowedUsecase() {
216217
public void testTelemetryAllowedWithForceTelemetryFlag() {
217218
when(connectionContext.getTelemetryLogLevel()).thenReturn(TelemetryLogLevel.DEBUG);
218219
when(connectionContext.getComputeResource()).thenReturn(WAREHOUSE_COMPUTE);
220+
when(connectionContext.getHostForOAuth()).thenReturn("test-host");
219221
enableFeatureFlagForTesting(connectionContext, Collections.emptyMap());
220222
assertTrue(() -> isTelemetryAllowedForConnection(connectionContext));
221223
}
@@ -260,6 +262,29 @@ void testExportTelemetryLog_EmitsWhenEventLevelHigherThanConfigured() {
260262
}
261263
}
262264

265+
@Test
266+
void testExportTelemetryLog_EmitsWhenEventLevelEqualToConfigured() {
267+
// Configured level: INFO (4); Event level: INFO (4) -> should export (4 >= 4)
268+
when(connectionContext.getTelemetryLogLevel()).thenReturn(TelemetryLogLevel.INFO);
269+
StatementTelemetryDetails details =
270+
new StatementTelemetryDetails("stmt-eq").setOperationLatencyMillis(15L);
271+
272+
ITelemetryClient clientMock = mock(ITelemetryClient.class);
273+
TelemetryClientFactory factoryMock = mock(TelemetryClientFactory.class);
274+
275+
try (MockedStatic<TelemetryClientFactory> mocked =
276+
Mockito.mockStatic(TelemetryClientFactory.class)) {
277+
mocked.when(TelemetryClientFactory::getInstance).thenReturn(factoryMock);
278+
when(factoryMock.getTelemetryClient(connectionContext)).thenReturn(clientMock);
279+
280+
TelemetryHelper.exportTelemetryLog(details, TelemetryLogLevel.INFO);
281+
282+
mocked.verify(TelemetryClientFactory::getInstance, times(1));
283+
verify(factoryMock, times(1)).getTelemetryClient(connectionContext);
284+
verify(clientMock, times(1)).exportEvent(any());
285+
}
286+
}
287+
263288
static Stream<Object[]> failureLogParameters() {
264289
return Stream.of(
265290
new Object[] {"test-statement-id", null},

0 commit comments

Comments
 (0)