Skip to content

Commit 4b02ee5

Browse files
Merge branch 'main' into jdbc/parse-url
2 parents 32109aa + 16cff56 commit 4b02ee5

11 files changed

Lines changed: 623 additions & 77 deletions

File tree

CONTRIBUTING.md

Lines changed: 294 additions & 42 deletions
Large diffs are not rendered by default.

java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDaemonPollingTask.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ class BigQueryDaemonPollingTask extends Thread {
4545
private BigQueryDaemonPollingTask(
4646
List<BigQueryResultSetFinalizers.ArrowResultSetFinalizer> arrowRsFinalizers,
4747
ReferenceQueue<BigQueryArrowResultSet> referenceQueueArrowRs) {
48+
super("BigQuery-GC-Daemon-Arrow");
4849
BigQueryDaemonPollingTask.referenceQueueArrowRs = referenceQueueArrowRs;
4950
BigQueryDaemonPollingTask.arrowRsFinalizers = arrowRsFinalizers;
5051
setDaemon(true);
@@ -53,6 +54,7 @@ private BigQueryDaemonPollingTask(
5354
private BigQueryDaemonPollingTask(
5455
ReferenceQueue<BigQueryJsonResultSet> referenceQueueJsonRs,
5556
List<BigQueryResultSetFinalizers.JsonResultSetFinalizer> jsonRsFinalizers) {
57+
super("BigQuery-GC-Daemon-Json");
5658
BigQueryDaemonPollingTask.referenceQueueJsonRs = referenceQueueJsonRs;
5759
BigQueryDaemonPollingTask.jsonRsFinalizers = jsonRsFinalizers;
5860
setDaemon(true);

java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcMdc.java

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,17 @@
1616

1717
package com.google.cloud.bigquery.jdbc;
1818

19+
import java.util.concurrent.BlockingQueue;
20+
import java.util.concurrent.Callable;
21+
import java.util.concurrent.ExecutorService;
22+
import java.util.concurrent.Executors;
23+
import java.util.concurrent.FutureTask;
24+
import java.util.concurrent.LinkedBlockingQueue;
25+
import java.util.concurrent.RunnableFuture;
26+
import java.util.concurrent.ThreadFactory;
27+
import java.util.concurrent.ThreadPoolExecutor;
28+
import java.util.concurrent.TimeUnit;
29+
1930
/** Lightweight MDC implementation for the BigQuery JDBC driver using InheritableThreadLocal. */
2031
class BigQueryJdbcMdc {
2132
private static final InheritableThreadLocal<String> currentConnectionId =
@@ -40,6 +51,110 @@ static void clear() {
4051
currentConnectionId.remove();
4152
}
4253

54+
/**
55+
* Creates a new fixed thread pool ExecutorService that automatically propagates MDC connection
56+
* context from the submitting thread to the executing thread.
57+
*/
58+
static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
59+
return new MdcThreadPoolExecutor(
60+
nThreads,
61+
nThreads,
62+
0L,
63+
TimeUnit.MILLISECONDS,
64+
new LinkedBlockingQueue<>(),
65+
new MdcThreadFactory(threadFactory));
66+
}
67+
68+
/**
69+
* Creates a new fixed thread pool ExecutorService that automatically propagates MDC connection
70+
* context from the submitting thread to the executing thread.
71+
*/
72+
static ExecutorService newFixedThreadPool(int nThreads) {
73+
return newFixedThreadPool(nThreads, Executors.defaultThreadFactory());
74+
}
75+
76+
private static class MdcThreadFactory implements ThreadFactory {
77+
private final ThreadFactory delegate;
78+
79+
public MdcThreadFactory(ThreadFactory delegate) {
80+
this.delegate = delegate;
81+
}
82+
83+
@Override
84+
public Thread newThread(Runnable r) {
85+
return delegate.newThread(
86+
() -> {
87+
clear();
88+
r.run();
89+
});
90+
}
91+
}
92+
93+
private static class MdcThreadPoolExecutor extends ThreadPoolExecutor {
94+
95+
public MdcThreadPoolExecutor(
96+
int corePoolSize,
97+
int maximumPoolSize,
98+
long keepAliveTime,
99+
TimeUnit unit,
100+
BlockingQueue<Runnable> workQueue,
101+
ThreadFactory threadFactory) {
102+
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
103+
}
104+
105+
@Override
106+
public void execute(Runnable command) {
107+
if (command == null) {
108+
throw new NullPointerException();
109+
}
110+
if (command instanceof MdcFutureTask) {
111+
super.execute(command);
112+
} else {
113+
super.execute(wrap(command));
114+
}
115+
}
116+
117+
@Override
118+
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
119+
return new MdcFutureTask<>(runnable, value, getConnectionId());
120+
}
121+
122+
@Override
123+
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
124+
return new MdcFutureTask<>(callable, getConnectionId());
125+
}
126+
127+
private static Runnable wrap(Runnable runnable) {
128+
String connectionId = getConnectionId();
129+
return () -> {
130+
try (MdcCloseable mdc = registerInstance(connectionId)) {
131+
runnable.run();
132+
}
133+
};
134+
}
135+
}
136+
137+
private static class MdcFutureTask<V> extends FutureTask<V> {
138+
private final String connectionId;
139+
140+
public MdcFutureTask(Runnable runnable, V result, String connectionId) {
141+
super(runnable, result);
142+
this.connectionId = connectionId;
143+
}
144+
145+
public MdcFutureTask(Callable<V> callable, String connectionId) {
146+
super(callable);
147+
this.connectionId = connectionId;
148+
}
149+
150+
@Override
151+
public void run() {
152+
try (MdcCloseable mdc = registerInstance(connectionId)) {
153+
super.run();
154+
}
155+
}
156+
}
157+
43158
/**
44159
* Functional interface that extends AutoCloseable to avoid throwing checked exceptions in
45160
* try-with-resources.

java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryDriverTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ public void testUnknownPropertyWarningIsLogged() throws SQLException {
112112
Connection connection =
113113
bigQueryDriver.connect(
114114
"jdbc:bigquery://https://www.googleapis.com/bigquery/v2:443;"
115-
+ "OAuthType=2;OAuthAccessToken=redactedToken;ProjectId=t;LogLevel=3;"
115+
+ "OAuthType=2;OAuthAccessToken=redactedToken;ProjectId=t;LogLevel=3;LogPath=logs/;"
116116
+ "MyUnknownSetting=Value",
117117
new Properties());
118118
assertThat(connection.isClosed()).isFalse();
@@ -134,7 +134,7 @@ public void testMalformedUrlExceptionIsLogged() {
134134
() ->
135135
bigQueryDriver.connect(
136136
"jdbc:bigquery://https://www.googleapis.com/bigquery/v2:443;"
137-
+ "OAuthType=2;OAuthAccessToken=redactedToken;ProjectId=t;LogLevel=3;"
137+
+ "OAuthType=2;OAuthAccessToken=redactedToken;ProjectId=t;LogLevel=3;LogPath=logs/;"
138138
+ "MalformedPropertyWithoutEquals",
139139
new Properties()));
140140

java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcMdcTest.java

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,14 @@
1818

1919
import static org.junit.jupiter.api.Assertions.assertEquals;
2020
import static org.junit.jupiter.api.Assertions.assertNull;
21+
import static org.junit.jupiter.api.Assertions.assertThrows;
2122
import static org.junit.jupiter.api.Assertions.assertTrue;
2223

2324
import java.util.concurrent.CountDownLatch;
25+
import java.util.concurrent.ExecutorService;
26+
import java.util.concurrent.Future;
27+
import java.util.concurrent.FutureTask;
28+
import java.util.concurrent.RunnableFuture;
2429
import java.util.concurrent.TimeUnit;
2530
import java.util.concurrent.atomic.AtomicReference;
2631
import org.junit.jupiter.api.AfterEach;
@@ -131,4 +136,119 @@ public void testMdcCloseableClearsContext() {
131136
}
132137
assertNull(BigQueryJdbcMdc.getConnectionId());
133138
}
139+
140+
@Test
141+
public void testExecutorPropagatesMdc() throws Exception {
142+
BigQueryJdbcMdc.registerInstance("JdbcConnection-Executor-Test");
143+
ExecutorService executor = BigQueryJdbcMdc.newFixedThreadPool(2);
144+
145+
try {
146+
// Test Runnable submission
147+
CountDownLatch runnableLatch = new CountDownLatch(1);
148+
AtomicReference<String> runnableMdcVal = new AtomicReference<>();
149+
executor.execute(
150+
() -> {
151+
runnableMdcVal.set(BigQueryJdbcMdc.getConnectionId());
152+
runnableLatch.countDown();
153+
});
154+
assertTrue(runnableLatch.await(5, TimeUnit.SECONDS));
155+
assertEquals("JdbcConnection-Executor-Test", runnableMdcVal.get());
156+
157+
// Test Callable submission
158+
Future<String> callableFuture =
159+
executor.submit(
160+
() -> {
161+
return BigQueryJdbcMdc.getConnectionId();
162+
});
163+
assertEquals("JdbcConnection-Executor-Test", callableFuture.get(5, TimeUnit.SECONDS));
164+
165+
// Test context is cleared on worker thread after task completion
166+
CountDownLatch cleanupLatch = new CountDownLatch(1);
167+
AtomicReference<String> postTaskMdcVal = new AtomicReference<>("initial-non-null");
168+
executor.execute(
169+
() -> {
170+
// Run a task on a potentially reused thread
171+
postTaskMdcVal.set(BigQueryJdbcMdc.getConnectionId());
172+
cleanupLatch.countDown();
173+
});
174+
assertTrue(cleanupLatch.await(5, TimeUnit.SECONDS));
175+
// Since the main thread has context set, the second task will also capture and set it.
176+
// Let's clear the context on the main thread, and submit a task to check if the thread-local
177+
// is clean for a thread that has previously executed tasks.
178+
BigQueryJdbcMdc.clear();
179+
CountDownLatch cleanMainLatch = new CountDownLatch(1);
180+
AtomicReference<String> cleanThreadMdcVal = new AtomicReference<>("initial-non-null");
181+
executor.execute(
182+
() -> {
183+
cleanThreadMdcVal.set(BigQueryJdbcMdc.getConnectionId());
184+
cleanMainLatch.countDown();
185+
});
186+
assertTrue(cleanMainLatch.await(5, TimeUnit.SECONDS));
187+
// It should be null because the submitting thread had no context set,
188+
// and the previous task's close() call cleaned the ThreadLocal.
189+
assertNull(cleanThreadMdcVal.get());
190+
191+
} finally {
192+
executor.shutdownNow();
193+
}
194+
}
195+
196+
@Test
197+
public void testExecutorThrowsNpeOnNullCommand() {
198+
ExecutorService executor = BigQueryJdbcMdc.newFixedThreadPool(2);
199+
try {
200+
assertThrows(NullPointerException.class, () -> executor.execute(null));
201+
} finally {
202+
executor.shutdownNow();
203+
}
204+
}
205+
206+
@Test
207+
public void testExecutorWrapsCustomRunnableFuture() throws Exception {
208+
BigQueryJdbcMdc.registerInstance("JdbcConnection-CustomFuture-Test");
209+
ExecutorService executor = BigQueryJdbcMdc.newFixedThreadPool(2);
210+
try {
211+
CountDownLatch latch = new CountDownLatch(1);
212+
AtomicReference<String> mdcVal = new AtomicReference<>();
213+
RunnableFuture<Void> customFuture =
214+
new FutureTask<>(
215+
() -> {
216+
mdcVal.set(BigQueryJdbcMdc.getConnectionId());
217+
latch.countDown();
218+
},
219+
null);
220+
221+
executor.execute(customFuture);
222+
assertTrue(latch.await(5, TimeUnit.SECONDS));
223+
assertEquals("JdbcConnection-CustomFuture-Test", mdcVal.get());
224+
} finally {
225+
executor.shutdownNow();
226+
}
227+
}
228+
229+
@Test
230+
public void testPoolThreadInheritanceSevered() throws Exception {
231+
BigQueryJdbcMdc.registerInstance("JdbcConnection-ParentContext");
232+
ExecutorService executor = BigQueryJdbcMdc.newFixedThreadPool(1);
233+
try {
234+
CountDownLatch initLatch = new CountDownLatch(1);
235+
executor.execute(initLatch::countDown);
236+
assertTrue(initLatch.await(5, TimeUnit.SECONDS));
237+
238+
BigQueryJdbcMdc.clear();
239+
240+
CountDownLatch taskLatch = new CountDownLatch(1);
241+
AtomicReference<String> workerMdcVal = new AtomicReference<>("initial-non-null");
242+
executor.execute(
243+
() -> {
244+
workerMdcVal.set(BigQueryJdbcMdc.getConnectionId());
245+
taskLatch.countDown();
246+
});
247+
248+
assertTrue(taskLatch.await(5, TimeUnit.SECONDS));
249+
assertNull(workerMdcVal.get());
250+
} finally {
251+
executor.shutdownNow();
252+
}
253+
}
134254
}

java-bigtable/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/csm/MetricsImpl.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@
8787
public class MetricsImpl implements Metrics, Closeable {
8888

8989
public static final String CUSTOM_METRIC = "bigtable.internal.enable-custom-metric";
90+
public static final String CUSTOM_METRIC_PREFIX = "bigtable.custom";
9091

9192
private static final boolean enableCustomMetric =
9293
Optional.ofNullable(System.getProperty(CUSTOM_METRIC))
@@ -319,7 +320,13 @@ public static OpenTelemetrySdk createBuiltinOtel(
319320
new BigtablePeriodicReader(
320321
new BigtableFilteringExporter(
321322
exporter,
322-
input -> input.getName().startsWith("bigtable.googleapis.com/internal/client")),
323+
input -> {
324+
// filter out custom metrics and keep everything else that's registered on metric
325+
// registry
326+
String name = input.getName();
327+
return metricRegistry.getMetric(name) != null
328+
&& !name.startsWith(CUSTOM_METRIC_PREFIX);
329+
}),
323330
executor));
324331

325332
if (enableCustomMetric) {
@@ -333,7 +340,7 @@ public static OpenTelemetrySdk createBuiltinOtel(
333340
PeriodicMetricReader.builder(
334341
new BigtableFilteringExporter(
335342
GoogleCloudMetricExporter.createWithConfiguration(metricConfig),
336-
input -> input.getName().startsWith("bigtable.custom")))
343+
input -> input.getName().startsWith(CUSTOM_METRIC_PREFIX)))
337344
.setInterval(Duration.ofMinutes(1))
338345
.build());
339346
}

java-bigtable/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/csm/metrics/CustomAttemptLatency.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import com.google.bigtable.v2.ClusterInformation;
1919
import com.google.bigtable.v2.PeerInfo;
20+
import com.google.cloud.bigtable.data.v2.internal.csm.MetricsImpl;
2021
import com.google.cloud.bigtable.data.v2.internal.csm.attributes.ClientInfo;
2122
import com.google.cloud.bigtable.data.v2.internal.csm.attributes.MethodInfo;
2223
import com.google.cloud.bigtable.data.v2.internal.csm.attributes.Util;
@@ -33,7 +34,7 @@
3334
* exported as a custom metric.
3435
*/
3536
public class CustomAttemptLatency extends MetricWrapper<CustomSchema> {
36-
private static final String NAME = "bigtable.custom.attempt_latencies";
37+
private static final String NAME = MetricsImpl.CUSTOM_METRIC_PREFIX + ".attempt_latencies";
3738

3839
public CustomAttemptLatency() {
3940
super(CustomSchema.INSTANCE, NAME);

java-bigtable/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/internal/csm/MetricRegistryExportTest.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -775,6 +775,38 @@ void testPacemaker() {
775775
.build());
776776
}
777777

778+
@Test
779+
void testGrpcMetricExport() {
780+
// 1. Record a gRPC metric using its internal name
781+
// Note: gRPC metrics are usually recorded by gRPC-Java's OpenTelemetry integration,
782+
// but we can simulate it by getting the instrument from the meter directly.
783+
meterProvider
784+
.get("grpc-java")
785+
.histogramBuilder("grpc.client.attempt.duration")
786+
.build()
787+
.record(
788+
123.0,
789+
io.opentelemetry.api.common.Attributes.of(
790+
io.opentelemetry.api.common.AttributeKey.stringKey("grpc.status"), "OK",
791+
io.opentelemetry.api.common.AttributeKey.stringKey("grpc.method"),
792+
"Bigtable.ReadRows"));
793+
794+
// 2. Flush the metrics to the exporter
795+
metricReader.forceFlush().join(1, TimeUnit.MINUTES);
796+
797+
// 3. Verify that the metric was exported under the Bigtable namespace
798+
// The internal 'grpc.client.attempt.duration' should be renamed by GrpcMetric.java
799+
TimeSeries timeSeries =
800+
metricService.getSingleTimeSeriesByName(
801+
"bigtable.googleapis.com/internal/client/grpc/client/attempt/duration");
802+
803+
// 4. Verify labels are correctly mapped (dots replaced with underscores)
804+
assertThat(timeSeries.getMetric().getLabelsMap())
805+
.containsAtLeast(
806+
"grpc_status", "OK",
807+
"grpc_method", "Bigtable.ReadRows");
808+
}
809+
778810
private static class FakeMetricService extends MetricServiceImplBase {
779811
final BlockingDeque<CreateTimeSeriesRequest> requests = new LinkedBlockingDeque<>();
780812

0 commit comments

Comments
 (0)