Skip to content

Commit 79e26b8

Browse files
authored
feat(bigquery-jdbc): support connection-scoped executor isolation and dynamic queue warnings (#13413)
b/519201498 This PR implements connection-scoped thread pool isolation and dynamic saturation monitoring for the BigQuery JDBC driver. * **Connection-Scoped Pools**: Spawns independent `queryExecutor` and `metadataExecutor` instances per connection, properly cleaning them up on connection close. * **Daemon Thread Factory**: Configures the connection thread factory to mark all executor threads as daemon threads, preventing JVM shutdown hangs. * **Hysteresis Saturation Warnings**: Implements lightweight queue saturation logging that triggers dynamically when the queue reaches `max(10, corePoolSize * 5)` and resets only when it recovers below `max(4, corePoolSize * 2)`. * **Testing Improvements**: Refactors `BigQueryJdbcMdcTest` to inherit from `BigQueryJdbcLoggingBaseTest`, and adds unit tests to verify lifecycle, saturation logging, and hysteresis behavior.
1 parent 1674b80 commit 79e26b8

6 files changed

Lines changed: 413 additions & 20 deletions

File tree

java-bigquery-jdbc/.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ target-it/**
33
**/*logs*/**
44
**/ITBigQueryJDBCLocalTest.java
55
**/BigQueryStatementE2EBenchmark.java
6+
.agents/
67

78
tools/**/*.class
89
tools/**/drivers/**

java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryConnection.java

Lines changed: 90 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
import java.util.UUID;
6363
import java.util.concurrent.ConcurrentHashMap;
6464
import java.util.concurrent.Executor;
65+
import java.util.concurrent.ExecutorService;
6566
import java.util.concurrent.TimeUnit;
6667

6768
/**
@@ -209,6 +210,8 @@ public class BigQueryConnection extends BigQueryNoOpsConnection {
209210
Boolean reqGoogleDriveScope;
210211
private final Properties clientInfo = new Properties();
211212
private boolean isReadOnlyTokenUsed = false;
213+
private final ExecutorService metadataExecutor;
214+
private final ExecutorService queryExecutor;
212215

213216
BigQueryConnection(String url) throws IOException {
214217
this(url, DataSource.fromUrl(url));
@@ -344,6 +347,8 @@ public class BigQueryConnection extends BigQueryNoOpsConnection {
344347

345348
this.headerProvider = createHeaderProvider();
346349
this.bigQuery = getBigQueryConnection();
350+
this.metadataExecutor = BigQueryJdbcMdc.newFixedThreadPool(metadataFetchThreadCount);
351+
this.queryExecutor = BigQueryJdbcMdc.newCachedThreadPool();
347352
}
348353
}
349354

@@ -937,23 +942,91 @@ public void close() throws SQLException {
937942
}
938943

939944
private void closeImpl() throws SQLException {
945+
SQLException exceptionToThrow = null;
940946
try {
941947
if (this.bigQueryReadClient != null) {
942948
this.bigQueryReadClient.shutdown();
943-
this.bigQueryReadClient.awaitTermination(1, TimeUnit.MINUTES);
944-
this.bigQueryReadClient.close();
945949
}
946-
947950
if (this.bigQueryWriteClient != null) {
948951
this.bigQueryWriteClient.shutdown();
949-
this.bigQueryWriteClient.awaitTermination(1, TimeUnit.MINUTES);
950-
this.bigQueryWriteClient.close();
952+
}
953+
if (this.metadataExecutor != null) {
954+
this.metadataExecutor.shutdown();
955+
}
956+
if (this.queryExecutor != null) {
957+
this.queryExecutor.shutdown();
951958
}
952959

953960
for (Statement statement : this.openStatements) {
954-
statement.close();
961+
try {
962+
statement.close();
963+
} catch (SQLException e) {
964+
if (exceptionToThrow == null) {
965+
exceptionToThrow = e;
966+
} else {
967+
exceptionToThrow.addSuppressed(e);
968+
}
969+
}
955970
}
956971
this.openStatements.clear();
972+
973+
boolean interrupted = Thread.currentThread().isInterrupted();
974+
975+
try {
976+
if (this.bigQueryReadClient != null) {
977+
if (interrupted) {
978+
this.bigQueryReadClient.shutdownNow();
979+
} else {
980+
this.bigQueryReadClient.awaitTermination(1, TimeUnit.MINUTES);
981+
}
982+
}
983+
if (this.bigQueryWriteClient != null) {
984+
if (interrupted) {
985+
this.bigQueryWriteClient.shutdownNow();
986+
} else {
987+
this.bigQueryWriteClient.awaitTermination(1, TimeUnit.MINUTES);
988+
}
989+
}
990+
if (this.metadataExecutor != null) {
991+
if (interrupted || !this.metadataExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
992+
this.metadataExecutor.shutdownNow();
993+
}
994+
}
995+
if (this.queryExecutor != null) {
996+
if (interrupted || !this.queryExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
997+
this.queryExecutor.shutdownNow();
998+
}
999+
}
1000+
} catch (InterruptedException e) {
1001+
interrupted = true;
1002+
if (this.bigQueryReadClient != null) {
1003+
this.bigQueryReadClient.shutdownNow();
1004+
}
1005+
if (this.bigQueryWriteClient != null) {
1006+
this.bigQueryWriteClient.shutdownNow();
1007+
}
1008+
if (this.metadataExecutor != null) {
1009+
this.metadataExecutor.shutdownNow();
1010+
}
1011+
if (this.queryExecutor != null) {
1012+
this.queryExecutor.shutdownNow();
1013+
}
1014+
} finally {
1015+
try {
1016+
if (this.bigQueryReadClient != null) {
1017+
this.bigQueryReadClient.close();
1018+
}
1019+
} finally {
1020+
if (this.bigQueryWriteClient != null) {
1021+
this.bigQueryWriteClient.close();
1022+
}
1023+
}
1024+
}
1025+
1026+
if (interrupted) {
1027+
Thread.currentThread().interrupt();
1028+
throw new InterruptedException("Interrupted awaiting executor termination");
1029+
}
9571030
} catch (ConcurrentModificationException ex) {
9581031
throw new BigQueryJdbcException("Concurrent modification during close", ex);
9591032
} catch (InterruptedException e) {
@@ -962,9 +1035,20 @@ private void closeImpl() throws SQLException {
9621035
BigQueryJdbcMdc.clear();
9631036
BigQueryJdbcRootLogger.closeConnectionHandler(this.connectionId);
9641037
}
1038+
if (exceptionToThrow != null) {
1039+
throw exceptionToThrow;
1040+
}
9651041
this.isClosed = true;
9661042
}
9671043

1044+
ExecutorService getExecutorService() {
1045+
return this.queryExecutor;
1046+
}
1047+
1048+
ExecutorService getMetadataExecutor() {
1049+
return this.metadataExecutor;
1050+
}
1051+
9681052
@Override
9691053
public boolean isClosed() {
9701054
return this.isClosed;

java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcMdc.java

Lines changed: 72 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,13 @@
2626
import java.util.concurrent.ThreadFactory;
2727
import java.util.concurrent.ThreadPoolExecutor;
2828
import java.util.concurrent.TimeUnit;
29+
import java.util.concurrent.atomic.AtomicBoolean;
2930

3031
/** Lightweight MDC implementation for the BigQuery JDBC driver using InheritableThreadLocal. */
3132
class BigQueryJdbcMdc {
33+
private static final BigQueryJdbcCustomLogger LOG =
34+
new BigQueryJdbcCustomLogger(BigQueryJdbcMdc.class.getName());
35+
3236
private static final InheritableThreadLocal<String> currentConnectionId =
3337
new InheritableThreadLocal<>();
3438

@@ -56,13 +60,16 @@ static void clear() {
5660
* context from the submitting thread to the executing thread.
5761
*/
5862
static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
59-
return new MdcThreadPoolExecutor(
60-
nThreads,
61-
nThreads,
62-
0L,
63-
TimeUnit.MILLISECONDS,
64-
new LinkedBlockingQueue<>(),
65-
new MdcThreadFactory(threadFactory));
63+
MdcThreadPoolExecutor executor =
64+
new MdcThreadPoolExecutor(
65+
nThreads,
66+
nThreads,
67+
60L,
68+
TimeUnit.SECONDS,
69+
new LinkedBlockingQueue<>(),
70+
new MdcThreadFactory(threadFactory));
71+
executor.allowCoreThreadTimeOut(true);
72+
return executor;
6673
}
6774

6875
/**
@@ -73,6 +80,28 @@ static ExecutorService newFixedThreadPool(int nThreads) {
7380
return newFixedThreadPool(nThreads, Executors.defaultThreadFactory());
7481
}
7582

83+
/**
84+
* Creates a new cached thread pool ExecutorService that automatically propagates MDC connection
85+
* context from the submitting thread to the executing thread.
86+
*/
87+
static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
88+
return new MdcThreadPoolExecutor(
89+
0,
90+
Integer.MAX_VALUE,
91+
60L,
92+
TimeUnit.SECONDS,
93+
new java.util.concurrent.SynchronousQueue<>(),
94+
new MdcThreadFactory(threadFactory));
95+
}
96+
97+
/**
98+
* Creates a new cached thread pool ExecutorService that automatically propagates MDC connection
99+
* context from the submitting thread to the executing thread.
100+
*/
101+
static ExecutorService newCachedThreadPool() {
102+
return newCachedThreadPool(Executors.defaultThreadFactory());
103+
}
104+
76105
private static class MdcThreadFactory implements ThreadFactory {
77106
private final ThreadFactory delegate;
78107

@@ -82,11 +111,16 @@ public MdcThreadFactory(ThreadFactory delegate) {
82111

83112
@Override
84113
public Thread newThread(Runnable r) {
85-
return delegate.newThread(
86-
() -> {
87-
clear();
88-
r.run();
89-
});
114+
Thread t =
115+
delegate.newThread(
116+
() -> {
117+
clear();
118+
r.run();
119+
});
120+
if (t != null) {
121+
t.setDaemon(true);
122+
}
123+
return t;
90124
}
91125
}
92126

@@ -102,11 +136,37 @@ public MdcThreadPoolExecutor(
102136
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
103137
}
104138

139+
private final AtomicBoolean warningLogged = new AtomicBoolean(false);
140+
141+
private void monitorQueueSaturation(int queueSize) {
142+
int maxPoolSize = getMaximumPoolSize();
143+
// Warn when queue size is >= maxPoolSize * 5, with a minimum of 10 tasks to avoid false
144+
// alerts for tiny pools
145+
int warnThreshold = Math.max(10, maxPoolSize * 5);
146+
// Recovery reset threshold is maxPoolSize * 2, with a minimum of 4 tasks
147+
int recoveryThreshold = Math.max(4, maxPoolSize * 2);
148+
149+
if (queueSize >= warnThreshold) {
150+
if (warningLogged.compareAndSet(false, true)) {
151+
LOG.warning(
152+
"Thread pool is saturating. Max pool size: %d, Active threads: %d, Queued tasks: %d. Consider increasing the thread count property.",
153+
maxPoolSize, getActiveCount(), queueSize);
154+
}
155+
} else if (queueSize <= recoveryThreshold) {
156+
if (warningLogged.get()) {
157+
warningLogged.set(false);
158+
}
159+
}
160+
}
161+
105162
@Override
106163
public void execute(Runnable command) {
107164
if (command == null) {
108165
throw new NullPointerException();
109166
}
167+
168+
monitorQueueSaturation(getQueue().size());
169+
110170
if (command instanceof MdcFutureTask) {
111171
super.execute(command);
112172
} else {

java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/DataSource.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1077,8 +1077,9 @@ public Integer getMetadataFetchThreadCount() {
10771077

10781078
public void setMetadataFetchThreadCount(Integer metadataFetchThreadCount) {
10791079
if (metadataFetchThreadCount != null) {
1080-
validateNonNegative(
1080+
validateMin(
10811081
metadataFetchThreadCount,
1082+
1,
10821083
BigQueryJdbcUrlUtility.METADATA_FETCH_THREAD_COUNT_PROPERTY_NAME);
10831084
}
10841085
this.metadataFetchThreadCount = metadataFetchThreadCount;
@@ -1387,4 +1388,13 @@ private static void validateNonNegative(long val, String propertyName) {
13871388
"Invalid value for %s. It must be greater than or equal to 0.", propertyName));
13881389
}
13891390
}
1391+
1392+
/** Validates that a property value is greater than or equal to a minimum threshold. */
1393+
private static void validateMin(long val, long min, String propertyName) {
1394+
if (val < min) {
1395+
throw new BigQueryJdbcRuntimeException(
1396+
String.format(
1397+
"Invalid value for %s. It must be greater than or equal to %d.", propertyName, min));
1398+
}
1399+
}
13901400
}

0 commit comments

Comments
 (0)