Commit 82700db

Fix telemetry HTTP client socket leak preventing CRaC checkpoint (#1333)
## Summary

Fixes #1325 (follow-up to #1233). After `Connection.close()`, delayed telemetry flush tasks could re-create TELEMETRY HTTP clients that were never closed, leaking TCP sockets and preventing CRaC checkpoint.

**Two cross-thread race conditions fixed:**

1. **TelemetryClient re-creation**: `getTelemetryClient()` after `closeTelemetryClient()` created an orphaned `TelemetryClient` with a periodic flush scheduler that nobody closes. Fixed by tracking closed connection UUIDs and returning `NoopTelemetryClient`, and by reordering `closeTelemetryClient()` to export pending collector events before closing the client.
2. **HTTP client re-creation**: `getClient(ctx, TELEMETRY)` after `removeClient(ctx)` re-created a `DatabricksHttpClient` via `computeIfAbsent` that nobody closes. Fixed by adding `closeConnection()` which permanently marks a connection as closed, causing `getClient()` to return `null`.

### Files changed

| File | Change |
|------|--------|
| `TelemetryClientFactory.java` | Added `closedConnectionUuids` guard, reordered close sequence |
| `DatabricksHttpClientFactory.java` | Added `closedConnections` guard, new `closeConnection()` method |
| `DatabricksConnection.java` | Use `closeConnection()` instead of `removeClient()` |
| `TelemetryPushClient.java` | Null guard for `getClient()` return value |
| `TelemetryHttpClientLeakTest.java` | 3 reproduction/verification tests |
| `RCA_SOCKET_LEAK_TELEMETRY_HTTP_CLIENT.md` | Full RCA with reproduction and verification plan |

### Test results

- 3085 unit tests pass, 0 failures, 0 errors
- 3 new tests specifically reproduce and verify the fix for both race conditions

## Test plan

- [x] `TelemetryHttpClientLeakTest#testGetTelemetryClientAfterCloseReCreatesClient` — verifies `NoopTelemetryClient` returned after close
- [x] `TelemetryHttpClientLeakTest#testGetClientReturnsNullAfterCloseConnection` — verifies `null` returned from HTTP client factory after close
- [x] `TelemetryHttpClientLeakTest#testCloseTelemetryClientWithPendingCollectorEventsReCreatesClient` — verifies pending collector events don't cause re-creation
- [x] All 152 existing telemetry tests pass
- [x] Full unit test suite (3085 tests) passes
- [ ] Manual verification with CRaC-enabled JDK (0 sockets after `Connection.close()`)

This pull request was AI-assisted by Isaac.

---------

Signed-off-by: Gopal Lal <gopal.lal@databricks.com>
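The second race in the summary, a `computeIfAbsent` lookup that runs after the entry was removed, can be reproduced in isolation. The sketch below is a simplified illustration, not the driver's code: `RecreateAfterRemoveSketch`, its `Object` stand-in for a client, and the creation counter are all hypothetical names chosen for this example.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch of the re-creation race; none of these names come from the driver. */
public class RecreateAfterRemoveSketch {

  /** Counts constructed "clients", standing in for opened sockets. */
  private final AtomicInteger created = new AtomicInteger();

  /** Stand-in for a per-connection client cache keyed by connection UUID. */
  private final Map<String, Object> clients = new ConcurrentHashMap<>();

  /** Lazily creates a client, as a computeIfAbsent-based factory would. */
  public Object getClient(String connectionUuid) {
    return clients.computeIfAbsent(
        connectionUuid,
        k -> {
          created.incrementAndGet();
          return new Object();
        });
  }

  /** Plain removal: forgets the client and leaves no record that the connection closed. */
  public void removeClient(String connectionUuid) {
    clients.remove(connectionUuid);
  }

  public int created() {
    return created.get();
  }

  public boolean isCached(String connectionUuid) {
    return clients.containsKey(connectionUuid);
  }

  public static void main(String[] args) {
    RecreateAfterRemoveSketch factory = new RecreateAfterRemoveSketch();
    factory.getClient("conn-1"); // created during normal use
    factory.removeClient("conn-1"); // models Connection.close()
    factory.getClient("conn-1"); // late flush task silently creates a second client
    System.out.println("clients created: " + factory.created());
    System.out.println("cached after close: " + factory.isCached("conn-1"));
  }
}
```

Because removal leaves the key absent, the late lookup cannot tell a closed connection from one that was never opened, so it creates a client that no close path will ever reach.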
1 parent 24699cf commit 82700db

9 files changed

Lines changed: 417 additions & 38 deletions

NEXT_CHANGELOG.md

Lines changed: 1 addition & 0 deletions
@@ -68,6 +68,7 @@ upgrading. These changes do not affect metadata on All-Purpose Clusters.
 - Fixed `?` characters inside SQL comments, string literals, and quoted identifiers being incorrectly counted as parameter placeholders when `supportManyParameters=1`. `SQLInterpolator` now uses `SqlCommentParser` to locate only real placeholders. Fixes #1331.
 - Fixed `MetadataOperationTimeout` not being applied when metadata operations use SHOW commands. Operations like `getTables`, `getSchemas`, and `getColumns` now respect the `MetadataOperationTimeout` connection property instead of hanging indefinitely with no timeout.
 - Reclassify transient server errors to standard SQL states (08S01, 40001) across all Thrift error sites. This ensures UC unavailability and concurrent modification errors surface consistently for better retry handling. Note: Dashboards and branching logic keyed on legacy XXUCC or 42000 must be updated.
+- Fixed telemetry HTTP client socket leak that prevented CRaC checkpoint. After `Connection.close()`, delayed telemetry flush tasks could re-create HTTP clients that were never closed, leaking TCP sockets. Fixes #1325.
 - Fixed client-side enforcement of `maxRows` limit. When `statement.setMaxRows()` is set, `ResultSet.next()` now returns false once the row limit is reached, even if the server returns more rows. Applies to all result types (Thrift, SEA, inline, CloudFetch).
 
 ---

src/main/java/com/databricks/jdbc/api/impl/DatabricksConnection.java

Lines changed: 1 addition & 1 deletion
@@ -424,7 +424,7 @@ public void close() throws SQLException {
     TelemetryClientFactory.getInstance().closeTelemetryClient(connectionContext);
     DatabricksClientConfiguratorManager.getInstance().removeInstance(connectionContext);
     DatabricksDriverFeatureFlagsContextFactory.removeInstance(connectionContext);
-    DatabricksHttpClientFactory.getInstance().removeClient(connectionContext);
+    DatabricksHttpClientFactory.getInstance().closeConnection(connectionContext);
     DatabricksThreadContextHolder.clearAllContext();
   }
 

Lines changed: 49 additions & 0 deletions
@@ -0,0 +1,49 @@
+package com.databricks.jdbc.dbclient.impl.http;
+
+import com.databricks.jdbc.dbclient.IDatabricksHttpClient;
+import com.databricks.jdbc.exception.DatabricksHttpException;
+import com.databricks.jdbc.model.telemetry.enums.DatabricksDriverErrorCode;
+import java.util.concurrent.Future;
+import org.apache.hc.core5.concurrent.FutureCallback;
+import org.apache.hc.core5.http.nio.AsyncRequestProducer;
+import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpUriRequest;
+
+/**
+ * Sentinel HTTP client returned for connections that have been closed. All operations throw {@link
+ * DatabricksHttpException} with a clear message, so callers that accidentally store and use a
+ * post-close client get an immediate, diagnosable failure instead of a silent null.
+ *
+ * <p>Symmetric with {@link com.databricks.jdbc.telemetry.NoopTelemetryClient} which serves the same
+ * role for telemetry clients.
+ */
+public final class ClosedConnectionHttpClient implements IDatabricksHttpClient {
+
+  static final ClosedConnectionHttpClient INSTANCE = new ClosedConnectionHttpClient();
+
+  private ClosedConnectionHttpClient() {}
+
+  @Override
+  public CloseableHttpResponse execute(HttpUriRequest request) throws DatabricksHttpException {
+    throw new DatabricksHttpException(
+        "Connection has been closed; HTTP client is no longer usable",
+        DatabricksDriverErrorCode.INVALID_STATE);
+  }
+
+  @Override
+  public CloseableHttpResponse execute(HttpUriRequest request, boolean supportGzipEncoding)
+      throws DatabricksHttpException {
+    throw new DatabricksHttpException(
+        "Connection has been closed; HTTP client is no longer usable",
+        DatabricksDriverErrorCode.INVALID_STATE);
+  }
+
+  @Override
+  public <T> Future<T> executeAsync(
+      AsyncRequestProducer requestProducer,
+      AsyncResponseConsumer<T> responseConsumer,
+      FutureCallback<T> callback) {
+    throw new IllegalStateException("Connection has been closed; HTTP client is no longer usable");
+  }
+}

src/main/java/com/databricks/jdbc/dbclient/impl/http/DatabricksHttpClientFactory.java

Lines changed: 55 additions & 4 deletions
@@ -7,14 +7,23 @@
 import com.databricks.jdbc.dbclient.IDatabricksHttpClient;
 import com.databricks.jdbc.log.JdbcLogger;
 import com.databricks.jdbc.log.JdbcLoggerFactory;
+import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
 import java.util.concurrent.ConcurrentHashMap;
 
 public class DatabricksHttpClientFactory {
   private static final JdbcLogger LOGGER =
       JdbcLoggerFactory.getLogger(DatabricksHttpClientFactory.class);
   private static final DatabricksHttpClientFactory INSTANCE = new DatabricksHttpClientFactory();
-  private final ConcurrentHashMap<SimpleEntry<String, HttpClientType>, DatabricksHttpClient>
+
+  /**
+   * Maps (connectionUuid, type) → HTTP client. On {@link #closeConnection}, real clients are
+   * replaced with {@link ClosedConnectionHttpClient#INSTANCE} tombstones. {@link #getClient}'s
+   * {@code computeIfAbsent} returns the tombstone for closed connections (key already exists) and
+   * creates a real client for new keys. No parallel sets needed — the closed marker lives in the
+   * map itself, bounded by live (uuid, type) pairs. See issue #1325.
+   */
+  private final ConcurrentHashMap<SimpleEntry<String, HttpClientType>, IDatabricksHttpClient>
       instances = new ConcurrentHashMap<>();
 
   private DatabricksHttpClientFactory() {
@@ -29,25 +38,67 @@ public IDatabricksHttpClient getClient(IDatabricksConnectionContext context) {
     return getClient(context, HttpClientType.COMMON);
   }
 
+  /**
+   * Returns an HTTP client for the given connection and type, creating one if needed. For closed
+   * connections, returns the {@link ClosedConnectionHttpClient} sentinel — callers that attempt to
+   * use it get an immediate {@link com.databricks.jdbc.exception.DatabricksHttpException} with a
+   * clear message. Never returns null.
+   */
   public IDatabricksHttpClient getClient(
       IDatabricksConnectionContext context, HttpClientType type) {
     return instances.computeIfAbsent(
         getClientKey(context.getConnectionUuid(), type),
         k -> new DatabricksHttpClient(context, type));
   }
 
+  /**
+   * Permanently closes all HTTP clients for the given connection and replaces them with tombstone
+   * sentinels that reject further use. Called from {@link
+   * com.databricks.jdbc.api.impl.DatabricksConnection#close()}.
+   */
+  public void closeConnection(IDatabricksConnectionContext context) {
+    String uuid = context.getConnectionUuid();
+    for (HttpClientType type : HttpClientType.values()) {
+      SimpleEntry<String, HttpClientType> key = getClientKey(uuid, type);
+      IDatabricksHttpClient old = instances.put(key, ClosedConnectionHttpClient.INSTANCE);
+      if (old != null && !(old instanceof ClosedConnectionHttpClient)) {
+        closeQuietly(old);
+      }
+    }
+  }
+
+  @VisibleForTesting
   public void removeClient(IDatabricksConnectionContext context) {
     for (HttpClientType type : HttpClientType.values()) {
       removeClient(context, type);
     }
   }
 
+  @VisibleForTesting
   public void removeClient(IDatabricksConnectionContext context, HttpClientType type) {
-    DatabricksHttpClient instance =
+    IDatabricksHttpClient instance =
         instances.remove(getClientKey(context.getConnectionUuid(), type));
-    if (instance != null) {
+    if (instance != null && !(instance instanceof ClosedConnectionHttpClient)) {
+      closeQuietly(instance);
+    }
+  }
+
+  /** Resets all state. For test cleanup only. */
+  @VisibleForTesting
+  public void reset() {
+    instances.forEach(
+        (key, client) -> {
+          if (!(client instanceof ClosedConnectionHttpClient)) {
+            closeQuietly(client);
+          }
+        });
+    instances.clear();
+  }
+
+  private static void closeQuietly(IDatabricksHttpClient client) {
+    if (client instanceof DatabricksHttpClient) {
       try {
-        instance.close();
+        ((DatabricksHttpClient) client).close();
       } catch (IOException e) {
         LOGGER.debug("Caught error while closing http client. Error {}", e);
       }
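
The tombstone approach described in the `instances` Javadoc can be modelled without any driver dependencies. This is a minimal sketch under stated assumptions: `TombstoneRegistrySketch`, `Client`, `RealClient`, and `CLOSED` are hypothetical stand-ins for `DatabricksHttpClientFactory`, `IDatabricksHttpClient`, `DatabricksHttpClient`, and `ClosedConnectionHttpClient.INSTANCE`, and the map is keyed by UUID alone rather than by (UUID, type).

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical, simplified model of the tombstone idea; not the driver's real classes. */
public class TombstoneRegistrySketch {

  /** Minimal client abstraction used only for this illustration. */
  public interface Client {
    String send(String payload);
  }

  /** Stand-in for a real client that would own sockets. */
  public static final class RealClient implements Client {
    private volatile boolean closed;

    @Override
    public String send(String payload) {
      if (closed) {
        throw new IllegalStateException("client closed");
      }
      return "sent:" + payload;
    }

    public void close() {
      closed = true;
    }

    public boolean isClosed() {
      return closed;
    }
  }

  /** Shared sentinel stored under the key of every closed connection. */
  public static final Client CLOSED =
      payload -> {
        throw new IllegalStateException("Connection has been closed");
      };

  private final Map<String, Client> instances = new ConcurrentHashMap<>();

  /** Creates a real client for unseen keys; returns the tombstone for closed ones. */
  public Client getClient(String uuid) {
    return instances.computeIfAbsent(uuid, k -> new RealClient());
  }

  /** Overwrites the entry with the tombstone, then closes whatever real client was there. */
  public void closeConnection(String uuid) {
    Client old = instances.put(uuid, CLOSED);
    if (old instanceof RealClient) {
      ((RealClient) old).close();
    }
  }

  public static void main(String[] args) {
    TombstoneRegistrySketch registry = new TombstoneRegistrySketch();
    Client live = registry.getClient("conn-1");
    System.out.println(live.send("ping"));
    registry.closeConnection("conn-1");
    System.out.println(registry.getClient("conn-1") == CLOSED);
  }
}
```

Because the key stays present after close, a late `computeIfAbsent` never runs its factory lambda, so no new socket-owning client can appear for that connection.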

src/main/java/com/databricks/jdbc/telemetry/TelemetryClientFactory.java

Lines changed: 56 additions & 31 deletions
@@ -34,6 +34,14 @@ public class TelemetryClientFactory {
   @VisibleForTesting
   final Map<String, TelemetryClientHolder> noauthTelemetryClientHolders = new ConcurrentHashMap<>();
 
+  /**
+   * Tracks connection UUIDs that have been closed. {@link #getTelemetryClient} checks this set and
+   * returns {@link NoopTelemetryClient} for closed connections, preventing re-creation of orphaned
+   * TelemetryClients (issue #1325). Growth is bounded by total connections closed over JVM lifetime
+   * (~80 bytes per UUID) — negligible for typical connection pool sizes.
+   */
+  @VisibleForTesting final Set<String> closedConnectionUuids = ConcurrentHashMap.newKeySet();
+
   private final ExecutorService telemetryExecutorService;
   private ScheduledExecutorService sharedSchedulerService;
 
@@ -75,6 +83,11 @@ public static TelemetryClientFactory getInstance() {
   }
 
   public ITelemetryClient getTelemetryClient(IDatabricksConnectionContext connectionContext) {
+    // Reject closed connections to prevent re-creation of orphaned TelemetryClients (issue #1325).
+    String uuid = connectionContext.getConnectionUuid();
+    if (uuid != null && closedConnectionUuids.contains(uuid)) {
+      return NoopTelemetryClient.getInstance();
+    }
     if (!isTelemetryAllowedForConnection(connectionContext)) {
       return NoopTelemetryClient.getInstance();
     }
@@ -137,43 +150,54 @@ public ITelemetryClient getTelemetryClient(IDatabricksConnectionContext connecti
   /**
    * Closes telemetry client for a connection. Thread-safe: computeIfPresent ensures atomic locking,
    * preventing race conditions between connection removal and addition.
+   *
+   * <p>The connection UUID is removed from the open set FIRST to prevent getTelemetryClient() from
+   * re-creating a TelemetryClient during or after the close sequence. Pending TelemetryCollector
+   * events are exported BEFORE the TelemetryClient is closed (inside try-finally), so they are
+   * flushed through the existing client. See GitHub issue #1325.
    */
   public void closeTelemetryClient(IDatabricksConnectionContext connectionContext) {
     String key = TelemetryHelper.keyOf(connectionContext);
     String connectionUuid = connectionContext.getConnectionUuid();
-    // Atomically remove connection and close client if no connections remain for this key
-    telemetryClientHolders.computeIfPresent(
-        key,
-        (k, holder) -> {
-          holder.connectionUuids.remove(connectionUuid);
-          if (holder.connectionUuids.isEmpty()) {
-            closeTelemetryClient(holder.client, "telemetry client");
-            return null;
-          }
-          return holder;
-        });
-    // Atomically remove connection and close client if no connections remain for this key
-    noauthTelemetryClientHolders.computeIfPresent(
-        key,
-        (k, holder) -> {
-          holder.connectionUuids.remove(connectionUuid);
-          if (holder.connectionUuids.isEmpty()) {
-            closeTelemetryClient(holder.client, "unauthenticated telemetry client");
-            return null;
-          }
-          return holder;
-        });
-
-    // Export and remove the TelemetryCollector for this connection
-    TelemetryCollector collector =
-        TelemetryCollectorManager.getInstance().removeCollector(connectionContext);
-    if (collector != null) {
-      // Export any remaining telemetry before removing
-      collector.exportAllPendingTelemetryDetails();
+
+    // Mark connection closed FIRST to prevent getTelemetryClient() from re-creating a
+    // TelemetryClient during or after this close sequence (issue #1325).
+    if (connectionUuid != null) {
+      closedConnectionUuids.add(connectionUuid);
     }
 
-    // Clean up cached connection parameters to prevent memory leaks
-    TelemetryHelper.removeConnectionParameters(connectionContext.getConnectionUuid());
+    // Export pending events inside try-finally so holder cleanup always runs,
+    // even if export throws (F6).
+    try {
+      TelemetryCollector collector =
+          TelemetryCollectorManager.getInstance().removeCollector(connectionContext);
+      if (collector != null) {
+        collector.exportAllPendingTelemetryDetails();
+      }
+    } finally {
+      telemetryClientHolders.computeIfPresent(
+          key,
+          (k, holder) -> {
+            holder.connectionUuids.remove(connectionUuid);
+            if (holder.connectionUuids.isEmpty()) {
+              closeTelemetryClient(holder.client, "telemetry client");
+              return null;
+            }
+            return holder;
+          });
+      noauthTelemetryClientHolders.computeIfPresent(
+          key,
+          (k, holder) -> {
+            holder.connectionUuids.remove(connectionUuid);
+            if (holder.connectionUuids.isEmpty()) {
+              closeTelemetryClient(holder.client, "unauthenticated telemetry client");
+              return null;
+            }
+            return holder;
+          });
+
+      TelemetryHelper.removeConnectionParameters(connectionUuid);
+    }
   }
 
   public ExecutorService getTelemetryExecutorService() {
@@ -216,6 +240,7 @@ public void reset() {
     // Clear the maps
     telemetryClientHolders.clear();
     noauthTelemetryClientHolders.clear();
+    closedConnectionUuids.clear();
 
     // Clear cached connection parameters
     TelemetryHelper.clearConnectionParameterCache();
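
The close sequence in `closeTelemetryClient()` has three steps: mark the UUID closed, export pending events, then release the client in `finally`. The sketch below imitates that ordering so it can be exercised on its own. It is an illustration only: `CloseOrderingSketch`, the string return values, and the `steps` log are hypothetical and simply record the order of operations; the code is single-threaded and does not model the real holder maps.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch of the close ordering; names and return values are illustrative. */
public class CloseOrderingSketch {

  /** UUIDs of connections that have been closed, as in the closed-set guard. */
  private final Set<String> closedUuids = ConcurrentHashMap.newKeySet();

  /** Records which steps ran, in order, so the sequence can be inspected. */
  private final List<String> steps = new ArrayList<>();

  /** Returns "noop" for closed connections instead of building a new client. */
  public String getClient(String uuid) {
    if (uuid != null && closedUuids.contains(uuid)) {
      return "noop";
    }
    return "real";
  }

  /** Marks the connection closed first, exports, then always releases the client. */
  public void close(String uuid, Runnable exportPending) {
    if (uuid != null) {
      closedUuids.add(uuid);
      steps.add("marked-closed");
    }
    try {
      exportPending.run();
      steps.add("exported");
    } finally {
      // Runs even when the export throws, so the client is never left open.
      steps.add("client-released");
    }
  }

  public List<String> steps() {
    return steps;
  }

  public static void main(String[] args) {
    CloseOrderingSketch sketch = new CloseOrderingSketch();
    try {
      sketch.close("conn-1", () -> { throw new RuntimeException("export failed"); });
    } catch (RuntimeException e) {
      System.out.println("export error propagated: " + e.getMessage());
    }
    System.out.println(sketch.steps());
    System.out.println(sketch.getClient("conn-1"));
  }
}
```

Marking the UUID before exporting matters because the export itself may call back into the factory; with the guard already in place, that call gets the no-op client rather than a fresh one.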

src/main/java/com/databricks/jdbc/telemetry/TelemetryPushClient.java

Lines changed: 5 additions & 0 deletions
@@ -47,6 +47,11 @@ public void pushEvent(TelemetryRequest request) throws Exception {
     IDatabricksHttpClient httpClient =
         DatabricksHttpClientFactory.getInstance()
             .getClient(connectionContext, HttpClientType.TELEMETRY);
+    if (httpClient instanceof com.databricks.jdbc.dbclient.impl.http.ClosedConnectionHttpClient) {
+      // Connection was closed — sentinel returned to prevent socket leaks (issue #1325).
+      LOGGER.debug("Skipping telemetry push: connection has been closed");
+      return;
+    }
     String path =
         isAuthenticated
             ? PathConstants.TELEMETRY_PATH

src/test/java/com/databricks/jdbc/dbclient/impl/http/DatabricksHttpClientTest.java

Lines changed: 9 additions & 2 deletions
@@ -25,6 +25,7 @@
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClientBuilder;
 import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -46,6 +47,12 @@ public class DatabricksHttpClientTest {
   @BeforeEach
   public void setUp() {
     databricksHttpClient = new DatabricksHttpClient(mockHttpClient, mockConnectionManager);
+    DatabricksHttpClientFactory.getInstance().reset();
+  }
+
+  @AfterEach
+  public void tearDown() {
+    DatabricksHttpClientFactory.getInstance().reset();
   }
 
   @Test
@@ -269,8 +276,8 @@ public void testConcurrentClientCreation() throws InterruptedException, Executio
           () -> {
             IDatabricksConnectionContext connectionContext =
                 mock(IDatabricksConnectionContext.class);
-            when(connectionContext.getConnectionUuid())
-                .thenReturn(UUID.randomUUID().toString());
+            String uuid = UUID.randomUUID().toString();
+            when(connectionContext.getConnectionUuid()).thenReturn(uuid);
             when(connectionContext.getHttpMaxConnectionsPerRoute()).thenReturn(100);
             IDatabricksHttpClient client =
                 DatabricksHttpClientFactory.getInstance().getClient(connectionContext);

src/test/java/com/databricks/jdbc/telemetry/TelemetryClientFactoryTest.java

Lines changed: 2 additions & 0 deletions
@@ -308,6 +308,8 @@ private void setupTelemetryHelperMock(MockedStatic<TelemetryHelper> mockedStatic
 
   private void setupMocksForTelemetryClient(IDatabricksConnectionContext context) {
     TelemetryClientFactory.getInstance().closeTelemetryClient(context);
+    // Remove from closed set so subsequent getTelemetryClient() calls succeed in tests
+    TelemetryClientFactory.getInstance().closedConnectionUuids.remove(context.getConnectionUuid());
     TelemetryAuthHelper.setupAuthMocks(context, clientConfigurator);
     Map<String, String> featureFlagMap = new HashMap<>();
     featureFlagMap.put(TELEMETRY_FEATURE_FLAG_NAME, "true");
