Skip to content

Commit f562667

Browse files
feat(bqjdbc): Per connection logs - Add BigQueryJdbcMdc (#12833)
PR 1 of Per connection logging implementation. - Implements BigQueryJdbcMdc using an InheritableThreadLocal design and corresponding unit tests for verification. --------- Co-authored-by: cloud-java-bot <cloud-java-bot@google.com>
1 parent 67fa52f commit f562667

2 files changed

Lines changed: 232 additions & 0 deletions

File tree

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/*
2+
* Copyright 2026 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.bigquery.jdbc;
18+
19+
import java.util.concurrent.ConcurrentHashMap;
20+
import java.util.concurrent.atomic.AtomicLong;
21+
22+
/**
23+
* Lightweight MDC implementation for the BigQuery JDBC driver using InheritableThreadLocal.
24+
* Allocates a dedicated, independent InheritableThreadLocal object per concrete BigQueryConnection
25+
* instance.
26+
*/
27+
public class BigQueryJdbcMdc {
28+
private static final AtomicLong nextId = new AtomicLong(1);
29+
private static final ConcurrentHashMap<BigQueryConnection, InheritableThreadLocal<String>>
30+
instanceLocals = new ConcurrentHashMap<>();
31+
private static final ConcurrentHashMap<BigQueryConnection, String> instanceIds =
32+
new ConcurrentHashMap<>();
33+
34+
/** Allocates an exclusive InheritableThreadLocal and registers the connection mapping. */
35+
private static final InheritableThreadLocal<String> currentConnectionId =
36+
new InheritableThreadLocal<>();
37+
38+
public static void registerInstance(BigQueryConnection connection, String id) {
39+
if (connection != null) {
40+
String cleanId =
41+
instanceIds.computeIfAbsent(
42+
connection,
43+
k -> {
44+
String suffix =
45+
(id != null && !id.isEmpty()) ? id : String.valueOf(nextId.getAndIncrement());
46+
return "JdbcConnection-" + suffix;
47+
});
48+
49+
currentConnectionId.set(cleanId);
50+
InheritableThreadLocal<String> threadLocal =
51+
instanceLocals.computeIfAbsent(connection, k -> new InheritableThreadLocal<>());
52+
threadLocal.set(cleanId);
53+
}
54+
}
55+
56+
/**
57+
* Returns the connection ID carried by any registered active connection on the current thread.
58+
*/
59+
public static String getConnectionId() {
60+
return currentConnectionId.get();
61+
}
62+
63+
/** Clears the connection ID context from all active connection contexts on the current thread. */
64+
public static void removeInstance(BigQueryConnection connection) {
65+
if (connection != null) {
66+
InheritableThreadLocal<String> local = instanceLocals.remove(connection);
67+
if (local != null) {
68+
local.remove();
69+
}
70+
instanceIds.remove(connection);
71+
}
72+
}
73+
74+
public static void clear() {
75+
currentConnectionId.remove();
76+
for (InheritableThreadLocal<String> local : instanceLocals.values()) {
77+
local.remove();
78+
}
79+
}
80+
}
Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
/*
2+
* Copyright 2026 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.bigquery.jdbc;
18+
19+
import static org.junit.jupiter.api.Assertions.assertEquals;
20+
import static org.junit.jupiter.api.Assertions.assertNull;
21+
import static org.junit.jupiter.api.Assertions.assertTrue;
22+
23+
import java.util.concurrent.CountDownLatch;
24+
import java.util.concurrent.TimeUnit;
25+
import java.util.concurrent.atomic.AtomicReference;
26+
import org.junit.jupiter.api.AfterEach;
27+
import org.junit.jupiter.api.BeforeEach;
28+
import org.junit.jupiter.api.Test;
29+
import org.mockito.Mockito;
30+
31+
public class BigQueryJdbcMdcTest {
32+
33+
private BigQueryConnection mockConnection1;
34+
private BigQueryConnection mockConnection2;
35+
36+
@BeforeEach
37+
public void setUp() {
38+
mockConnection1 = Mockito.mock(BigQueryConnection.class);
39+
mockConnection2 = Mockito.mock(BigQueryConnection.class);
40+
}
41+
42+
@AfterEach
43+
public void tearDown() {
44+
BigQueryJdbcMdc.clear();
45+
}
46+
47+
@Test
48+
public void testRegisterAndRetrieveConnectionId() {
49+
BigQueryJdbcMdc.registerInstance(mockConnection1, "123");
50+
assertEquals("JdbcConnection-123", BigQueryJdbcMdc.getConnectionId());
51+
}
52+
53+
@Test
54+
public void testRemoveInstance() {
55+
BigQueryJdbcMdc.registerInstance(mockConnection1, "1");
56+
assertEquals("JdbcConnection-1", BigQueryJdbcMdc.getConnectionId());
57+
58+
BigQueryJdbcMdc.removeInstance(mockConnection1);
59+
// Note: removeInstance does not clear currentConnectionId on the current thread
60+
// based on current implementation.
61+
assertEquals("JdbcConnection-1", BigQueryJdbcMdc.getConnectionId());
62+
63+
BigQueryJdbcMdc.clear();
64+
assertNull(BigQueryJdbcMdc.getConnectionId());
65+
}
66+
67+
@Test
68+
public void testClearContext() {
69+
BigQueryJdbcMdc.registerInstance(mockConnection1, "456");
70+
assertEquals("JdbcConnection-456", BigQueryJdbcMdc.getConnectionId());
71+
72+
BigQueryJdbcMdc.clear();
73+
assertNull(BigQueryJdbcMdc.getConnectionId());
74+
}
75+
76+
@Test
77+
public void testThreadInheritance() throws InterruptedException {
78+
BigQueryJdbcMdc.registerInstance(mockConnection1, "parent");
79+
assertEquals("JdbcConnection-parent", BigQueryJdbcMdc.getConnectionId());
80+
81+
AtomicReference<String> childConnectionId = new AtomicReference<>();
82+
CountDownLatch latch = new CountDownLatch(1);
83+
84+
Thread childThread =
85+
new Thread(
86+
() -> {
87+
childConnectionId.set(BigQueryJdbcMdc.getConnectionId());
88+
latch.countDown();
89+
});
90+
childThread.start();
91+
assertTrue(latch.await(5, TimeUnit.SECONDS));
92+
93+
assertEquals("JdbcConnection-parent", childConnectionId.get());
94+
}
95+
96+
@Test
97+
public void testThreadIsolation() throws InterruptedException {
98+
CountDownLatch threadARegistered = new CountDownLatch(1);
99+
CountDownLatch threadBChecked = new CountDownLatch(1);
100+
CountDownLatch threadBRegistered = new CountDownLatch(1);
101+
CountDownLatch testFinished = new CountDownLatch(2);
102+
103+
AtomicReference<String> threadAIdBeforeB = new AtomicReference<>();
104+
AtomicReference<String> threadAIdAfterB = new AtomicReference<>();
105+
AtomicReference<String> threadBIdBeforeRegister = new AtomicReference<>();
106+
AtomicReference<String> threadBIdAfterRegister = new AtomicReference<>();
107+
108+
Thread threadA =
109+
new Thread(
110+
() -> {
111+
try {
112+
BigQueryJdbcMdc.registerInstance(mockConnection1, "A");
113+
threadAIdBeforeB.set(BigQueryJdbcMdc.getConnectionId());
114+
threadARegistered.countDown();
115+
116+
threadBRegistered.await();
117+
threadAIdAfterB.set(BigQueryJdbcMdc.getConnectionId());
118+
} catch (InterruptedException e) {
119+
Thread.currentThread().interrupt();
120+
} finally {
121+
testFinished.countDown();
122+
}
123+
});
124+
125+
Thread threadB =
126+
new Thread(
127+
() -> {
128+
try {
129+
threadARegistered.await();
130+
threadBIdBeforeRegister.set(BigQueryJdbcMdc.getConnectionId());
131+
132+
BigQueryJdbcMdc.registerInstance(mockConnection2, "B");
133+
threadBIdAfterRegister.set(BigQueryJdbcMdc.getConnectionId());
134+
threadBRegistered.countDown();
135+
} catch (InterruptedException e) {
136+
Thread.currentThread().interrupt();
137+
} finally {
138+
testFinished.countDown();
139+
}
140+
});
141+
142+
threadA.start();
143+
threadB.start();
144+
145+
assertTrue(testFinished.await(5, TimeUnit.SECONDS));
146+
147+
assertEquals("JdbcConnection-A", threadAIdBeforeB.get());
148+
assertNull(threadBIdBeforeRegister.get());
149+
assertEquals("JdbcConnection-B", threadBIdAfterRegister.get());
150+
assertEquals("JdbcConnection-A", threadAIdAfterB.get());
151+
}
152+
}

0 commit comments

Comments
 (0)