Skip to content

Commit 341e51a

Browse files
authored
feat(bigquery-jdbc): migrate statement execution thread tracking to connection-scoped executor (#13454)
b/519201498

This PR migrates the asynchronous processing tasks of the JDBC driver away from the legacy static global thread pool (`queryTaskExecutor`) to the modern, connection-scoped `ExecutorService` (`connection.getExecutorService()`).

### Key Changes

1. **Statement & Stream Refactoring**:
   - Removed the static `queryTaskExecutor` from `BigQueryStatement`.
   - Updated `populateArrowBufferedQueue` to submit streaming and buffering tasks to the connection-scoped executor pool.
   - Converted background thread tracking in `BigQueryStatement` and `BigQueryArrowResultSet` from raw `Thread` references to modern, cancelable `Future<?>` task handles.
2. **Isolated Backward-Compatibility Wrapper**:
   - Isolated the legacy raw thread wrapping to `BigQueryDatabaseMetaData` (which is still pending future refactoring to the executor pool).
   - Implemented a temporary `wrapThread(Thread)` helper method inside `BigQueryDatabaseMetaData` to wrap raw background threads into cancelable `Future<?>` adapters on the fly before passing them to the result set.
   - Hardened the anonymous `Future` wrapper state tracking to strictly satisfy the `Future` specification contract (handling `cancel`, `isCancelled`, and `isDone` correctly using a volatile boolean flag to prevent duplicate cancellation triggers).
1 parent 06d0cde commit 341e51a

16 files changed

Lines changed: 270 additions & 181 deletions

java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryArrowResultSet.java

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import java.util.ArrayList;
3939
import java.util.List;
4040
import java.util.concurrent.BlockingQueue;
41+
import java.util.concurrent.Future;
4142
import org.apache.arrow.memory.BufferAllocator;
4243
import org.apache.arrow.memory.RootAllocator;
4344
import org.apache.arrow.vector.FieldVector;
@@ -80,8 +81,8 @@ class BigQueryArrowResultSet extends BigQueryBaseResultSet {
8081
// Decoder object will be reused to avoid re-allocation and too much garbage collection.
8182
private VectorSchemaRoot vectorSchemaRoot;
8283
private VectorLoader vectorLoader;
83-
// producer thread's reference
84-
private final Thread ownedThread;
84+
// producer task's reference
85+
private final Future<?> ownedTask;
8586

8687
private BigQueryArrowResultSet(
8788
Schema schema,
@@ -93,7 +94,7 @@ private BigQueryArrowResultSet(
9394
boolean isNested,
9495
int fromIndex,
9596
int toIndexExclusive,
96-
Thread ownedThread,
97+
Future<?> ownedTask,
9798
BigQuery bigQuery,
9899
Job job)
99100
throws SQLException {
@@ -105,7 +106,7 @@ private BigQueryArrowResultSet(
105106
this.fromIndex = fromIndex;
106107
this.toIndexExclusive = toIndexExclusive;
107108
this.nestedRowIndex = fromIndex - 1;
108-
this.ownedThread = ownedThread;
109+
this.ownedTask = ownedTask;
109110
if (!isNested && arrowSchema != null) {
110111
try {
111112
this.arrowDeserializer = new ArrowDeserializer(arrowSchema);
@@ -127,10 +128,10 @@ static BigQueryArrowResultSet of(
127128
long totalRows,
128129
BigQueryStatement statement,
129130
BlockingQueue<BigQueryArrowBatchWrapper> buffer,
130-
Thread ownedThread,
131+
Future<?> ownedTask,
131132
BigQuery bigQuery)
132133
throws SQLException {
133-
return of(schema, arrowSchema, totalRows, statement, buffer, ownedThread, bigQuery, null);
134+
return of(schema, arrowSchema, totalRows, statement, buffer, ownedTask, bigQuery, null);
134135
}
135136

136137
static BigQueryArrowResultSet of(
@@ -139,7 +140,7 @@ static BigQueryArrowResultSet of(
139140
long totalRows,
140141
BigQueryStatement statement,
141142
BlockingQueue<BigQueryArrowBatchWrapper> buffer,
142-
Thread ownedThread,
143+
Future<?> ownedTask,
143144
BigQuery bigQuery,
144145
Job job)
145146
throws SQLException {
@@ -153,7 +154,7 @@ static BigQueryArrowResultSet of(
153154
false,
154155
-1,
155156
-1,
156-
ownedThread,
157+
ownedTask,
157158
bigQuery,
158159
job);
159160
}
@@ -165,7 +166,7 @@ static BigQueryArrowResultSet of(
165166
this.currentNestedBatch = null;
166167
this.fromIndex = 0;
167168
this.toIndexExclusive = 0;
168-
this.ownedThread = null;
169+
this.ownedTask = null;
169170
this.arrowDeserializer = null;
170171
this.vectorSchemaRoot = null;
171172
this.vectorLoader = null;
@@ -484,9 +485,9 @@ private String formatRangeElement(Object element, StandardSQLTypeName elementTyp
484485
public void close() {
485486
LOG.fineTrace("close", () -> String.format("Closing BigqueryArrowResultSet %s.", this));
486487
this.isClosed = true;
487-
if (ownedThread != null && !ownedThread.isInterrupted()) {
488-
// interrupt the producer thread when result set is closed
489-
ownedThread.interrupt();
488+
if (ownedTask != null) {
489+
// cancel the producer task when result set is closed
490+
ownedTask.cancel(true);
490491
}
491492
super.close();
492493
}

java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryConnection.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -347,8 +347,15 @@ public class BigQueryConnection extends BigQueryNoOpsConnection {
347347

348348
this.headerProvider = createHeaderProvider();
349349
this.bigQuery = getBigQueryConnection();
350-
this.metadataExecutor = BigQueryJdbcMdc.newFixedThreadPool(metadataFetchThreadCount);
351-
this.queryExecutor = BigQueryJdbcMdc.newCachedThreadPool();
350+
// Fixed thread pool queues tasks to limit concurrent metadata calls and prevent API
351+
// throttling.
352+
this.metadataExecutor =
353+
BigQueryJdbcMdc.newFixedThreadPool(
354+
String.format("BQ-Metadata-%s", connectionId), metadataFetchThreadCount);
355+
// Cached pool executes queries immediately without queueing and reclaims all idle threads
356+
// when inactive, minimizing resources.
357+
this.queryExecutor =
358+
BigQueryJdbcMdc.newCachedThreadPool(String.format("BQ-Query-%s", connectionId));
352359
}
353360
}
354361

java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -945,7 +945,7 @@ public ResultSet getProcedures(
945945

946946
Thread fetcherThread = new Thread(procedureFetcher, "getProcedures-fetcher-" + catalog);
947947
BigQueryJsonResultSet resultSet =
948-
BigQueryJsonResultSet.of(resultSchema, -1, queue, null, new Thread[] {fetcherThread});
948+
BigQueryJsonResultSet.of(resultSchema, -1, queue, null, wrapThread(fetcherThread));
949949

950950
fetcherThread.start();
951951
LOG.info("Started background thread for getProcedures");
@@ -1207,7 +1207,7 @@ public ResultSet getProcedureColumns(
12071207
Thread fetcherThread =
12081208
new Thread(procedureColumnFetcher, "getProcedureColumns-fetcher-" + catalog);
12091209
BigQueryJsonResultSet resultSet =
1210-
BigQueryJsonResultSet.of(resultSchema, -1, queue, null, new Thread[] {fetcherThread});
1210+
BigQueryJsonResultSet.of(resultSchema, -1, queue, null, wrapThread(fetcherThread));
12111211

12121212
fetcherThread.start();
12131213
LOG.info("Started background thread for getProcedureColumns for catalog: " + catalog);
@@ -1878,7 +1878,7 @@ public ResultSet getTables(
18781878

18791879
Thread fetcherThread = new Thread(tableFetcher, "getTables-fetcher-" + effectiveCatalog);
18801880
BigQueryJsonResultSet resultSet =
1881-
BigQueryJsonResultSet.of(resultSchema, -1, queue, null, new Thread[] {fetcherThread});
1881+
BigQueryJsonResultSet.of(resultSchema, -1, queue, null, wrapThread(fetcherThread));
18821882

18831883
fetcherThread.start();
18841884
LOG.info("Started background thread for getTables");
@@ -2018,7 +2018,8 @@ public ResultSet getCatalogs() {
20182018
populateQueue(catalogRows, queue, schemaFields);
20192019
signalEndOfData(queue, schemaFields);
20202020

2021-
return BigQueryJsonResultSet.of(catalogsSchema, catalogRows.size(), queue, null, new Thread[0]);
2021+
return BigQueryJsonResultSet.of(
2022+
catalogsSchema, catalogRows.size(), queue, null, new Future<?>[0]);
20222023
}
20232024

20242025
Schema defineGetCatalogsSchema() {
@@ -2050,7 +2051,7 @@ public ResultSet getTableTypes() {
20502051
signalEndOfData(queue, tableTypesSchema.getFields());
20512052

20522053
return BigQueryJsonResultSet.of(
2053-
tableTypesSchema, tableTypeRows.size(), queue, null, new Thread[0]);
2054+
tableTypesSchema, tableTypeRows.size(), queue, null, new Future<?>[0]);
20542055
}
20552056

20562057
static Schema defineGetTableTypesSchema() {
@@ -2204,7 +2205,7 @@ public ResultSet getColumns(
22042205

22052206
Thread fetcherThread = new Thread(columnFetcher, "getColumns-fetcher-" + effectiveCatalog);
22062207
BigQueryJsonResultSet resultSet =
2207-
BigQueryJsonResultSet.of(resultSchema, -1, queue, null, new Thread[] {fetcherThread});
2208+
BigQueryJsonResultSet.of(resultSchema, -1, queue, null, wrapThread(fetcherThread));
22082209

22092210
fetcherThread.start();
22102211
LOG.info("Started background thread for getColumns");
@@ -2719,7 +2720,7 @@ public ResultSet getTypeInfo() {
27192720
populateQueue(typeInfoRows, queue, schemaFields);
27202721
signalEndOfData(queue, schemaFields);
27212722
return BigQueryJsonResultSet.of(
2722-
typeInfoSchema, typeInfoRows.size(), queue, null, new Thread[0]);
2723+
typeInfoSchema, typeInfoRows.size(), queue, null, new Future<?>[0]);
27232724
}
27242725

27252726
Schema defineGetTypeInfoSchema() {
@@ -3714,7 +3715,7 @@ public ResultSet getSchemas(String catalog, String schemaPattern) {
37143715

37153716
Thread fetcherThread = new Thread(schemaFetcher, "getSchemas-fetcher-" + catalog);
37163717
BigQueryJsonResultSet resultSet =
3717-
BigQueryJsonResultSet.of(resultSchema, -1, queue, null, new Thread[] {fetcherThread});
3718+
BigQueryJsonResultSet.of(resultSchema, -1, queue, null, wrapThread(fetcherThread));
37183719

37193720
fetcherThread.start();
37203721
LOG.info("Started background thread for getSchemas");
@@ -3833,7 +3834,7 @@ public ResultSet getClientInfoProperties() {
38333834
signalEndOfData(queue, resultSchemaFields);
38343835
}
38353836
return BigQueryJsonResultSet.of(
3836-
resultSchema, collectedResults.size(), queue, null, new Thread[0]);
3837+
resultSchema, collectedResults.size(), queue, null, new Future<?>[0]);
38373838
}
38383839

38393840
Schema defineGetClientInfoPropertiesSchema() {
@@ -4008,7 +4009,7 @@ public ResultSet getFunctions(String catalog, String schemaPattern, String funct
40084009

40094010
Thread fetcherThread = new Thread(functionFetcher, "getFunctions-fetcher-" + catalog);
40104011
BigQueryJsonResultSet resultSet =
4011-
BigQueryJsonResultSet.of(resultSchema, -1, queue, null, new Thread[] {fetcherThread});
4012+
BigQueryJsonResultSet.of(resultSchema, -1, queue, null, wrapThread(fetcherThread));
40124013

40134014
fetcherThread.start();
40144015
LOG.info("Started background thread for getFunctions");
@@ -4262,7 +4263,7 @@ public ResultSet getFunctionColumns(
42624263
Thread fetcherThread =
42634264
new Thread(functionColumnFetcher, "getFunctionColumns-fetcher-" + catalog);
42644265
BigQueryJsonResultSet resultSet =
4265-
BigQueryJsonResultSet.of(resultSchema, -1, queue, null, new Thread[] {fetcherThread});
4266+
BigQueryJsonResultSet.of(resultSchema, -1, queue, null, wrapThread(fetcherThread));
42664267

42674268
fetcherThread.start();
42684269
LOG.info("Started background thread for getFunctionColumns for catalog: " + catalog);

java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcMdc.java

Lines changed: 22 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -59,54 +59,53 @@ static void clear() {
5959
* Creates a new fixed thread pool ExecutorService that automatically propagates MDC connection
6060
* context from the submitting thread to the executing thread.
6161
*/
62-
static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
62+
static ExecutorService newFixedThreadPool(
63+
String threadName, int nThreads, ThreadFactory threadFactory) {
6364
MdcThreadPoolExecutor executor =
6465
new MdcThreadPoolExecutor(
66+
threadName,
6567
nThreads,
6668
nThreads,
6769
60L,
6870
TimeUnit.SECONDS,
6971
new LinkedBlockingQueue<>(),
70-
new MdcThreadFactory(threadFactory));
72+
new MdcThreadFactory(threadFactory, threadName));
7173
executor.allowCoreThreadTimeOut(true);
7274
return executor;
7375
}
7476

75-
/**
76-
* Creates a new fixed thread pool ExecutorService that automatically propagates MDC connection
77-
* context from the submitting thread to the executing thread.
78-
*/
79-
static ExecutorService newFixedThreadPool(int nThreads) {
80-
return newFixedThreadPool(nThreads, Executors.defaultThreadFactory());
77+
static ExecutorService newFixedThreadPool(String threadName, int nThreads) {
78+
return newFixedThreadPool(threadName, nThreads, Executors.defaultThreadFactory());
8179
}
8280

8381
/**
8482
* Creates a new cached thread pool ExecutorService that automatically propagates MDC connection
8583
* context from the submitting thread to the executing thread.
8684
*/
87-
static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
85+
static ExecutorService newCachedThreadPool(String threadName, ThreadFactory threadFactory) {
8886
return new MdcThreadPoolExecutor(
87+
threadName,
8988
0,
9089
Integer.MAX_VALUE,
9190
60L,
9291
TimeUnit.SECONDS,
9392
new java.util.concurrent.SynchronousQueue<>(),
94-
new MdcThreadFactory(threadFactory));
93+
new MdcThreadFactory(threadFactory, threadName));
9594
}
9695

97-
/**
98-
* Creates a new cached thread pool ExecutorService that automatically propagates MDC connection
99-
* context from the submitting thread to the executing thread.
100-
*/
101-
static ExecutorService newCachedThreadPool() {
102-
return newCachedThreadPool(Executors.defaultThreadFactory());
96+
static ExecutorService newCachedThreadPool(String threadName) {
97+
return newCachedThreadPool(threadName, Executors.defaultThreadFactory());
10398
}
10499

105100
private static class MdcThreadFactory implements ThreadFactory {
106101
private final ThreadFactory delegate;
102+
private final String threadName;
103+
private final java.util.concurrent.atomic.AtomicInteger count =
104+
new java.util.concurrent.atomic.AtomicInteger(1);
107105

108-
public MdcThreadFactory(ThreadFactory delegate) {
106+
public MdcThreadFactory(ThreadFactory delegate, String threadName) {
109107
this.delegate = delegate;
108+
this.threadName = threadName;
110109
}
111110

112111
@Override
@@ -119,21 +118,25 @@ public Thread newThread(Runnable r) {
119118
});
120119
if (t != null) {
121120
t.setDaemon(true);
121+
t.setName(threadName + "-" + count.getAndIncrement());
122122
}
123123
return t;
124124
}
125125
}
126126

127127
private static class MdcThreadPoolExecutor extends ThreadPoolExecutor {
128+
private final String poolName;
128129

129130
public MdcThreadPoolExecutor(
131+
String poolName,
130132
int corePoolSize,
131133
int maximumPoolSize,
132134
long keepAliveTime,
133135
TimeUnit unit,
134136
BlockingQueue<Runnable> workQueue,
135137
ThreadFactory threadFactory) {
136138
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
139+
this.poolName = poolName;
137140
}
138141

139142
private final AtomicBoolean warningLogged = new AtomicBoolean(false);
@@ -149,8 +152,8 @@ private void monitorQueueSaturation(int queueSize) {
149152
if (queueSize >= warnThreshold) {
150153
if (warningLogged.compareAndSet(false, true)) {
151154
LOG.warning(
152-
"Thread pool is saturating. Max pool size: %d, Active threads: %d, Queued tasks: %d. Consider increasing the thread count property.",
153-
maxPoolSize, getActiveCount(), queueSize);
155+
"[%s] Thread pool is saturating. Max pool size: %d, Active threads: %d, Queued tasks: %d. Consider increasing the metadataFetchThreadCount property.",
156+
poolName, maxPoolSize, getActiveCount(), queueSize);
154157
}
155158
} else if (queueSize <= recoveryThreshold) {
156159
if (warningLogged.get()) {

java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcUrlUtility.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,7 @@ protected boolean removeEldestEntry(Map.Entry<String, Map<String, String>> eldes
142142
Pattern.CASE_INSENSITIVE);
143143
static final String METADATA_FETCH_THREAD_COUNT_PROPERTY_NAME = "MetaDataFetchThreadCount";
144144
static final int DEFAULT_METADATA_FETCH_THREAD_COUNT_VALUE = 32;
145+
145146
static final String RETRY_TIMEOUT_IN_SECS_PROPERTY_NAME = "Timeout";
146147
static final long DEFAULT_RETRY_TIMEOUT_IN_SECS_VALUE = 0L;
147148
static final String JOB_TIMEOUT_PROPERTY_NAME = "JobTimeout";

0 commit comments

Comments
 (0)