Skip to content

Commit 5e29b83

Browse files
committed
add queryExceutor connection property
1 parent bf2aba5 commit 5e29b83

5 files changed

Lines changed: 96 additions & 7 deletions

File tree

java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryConnection.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@ public class BigQueryConnection extends BigQueryNoOpsConnection {
174174
boolean useQueryCache;
175175
String queryDialect;
176176
int metadataFetchThreadCount;
177+
int queryProcessThreadCount;
177178
boolean allowLargeResults;
178179
String destinationTable;
179180
String destinationDataset;
@@ -340,19 +341,20 @@ public class BigQueryConnection extends BigQueryNoOpsConnection {
340341
this.filterTablesOnDefaultDataset = ds.getFilterTablesOnDefaultDataset();
341342
this.requestGoogleDriveScope = ds.getRequestGoogleDriveScope();
342343
this.metadataFetchThreadCount = ds.getMetadataFetchThreadCount();
344+
this.queryProcessThreadCount = ds.getQueryProcessThreadCount();
343345
this.requestReason = ds.getRequestReason();
344346
this.connectionPoolSize = ds.getConnectionPoolSize();
345347
this.listenerPoolSize = ds.getListenerPoolSize();
346348
this.partnerToken = ds.getPartnerToken();
347349

348350
this.headerProvider = createHeaderProvider();
349351
this.bigQuery = getBigQueryConnection();
352+
// Fixed thread pool limits concurrent calls to prevent BigQuery metadata API throttling
353+
// (independent tasks).
350354
this.metadataExecutor = BigQueryJdbcMdc.newFixedThreadPool(metadataFetchThreadCount);
351-
// Use a bounded cached thread pool to prevent unbounded thread creation (and OOMs)
352-
// under heavy load, while ensuring a limit (e.g., 100) high enough to prevent deadlocks
353-
// between interdependent producer/consumer tasks (like nextPageWorker and
354-
// populateBufferWorker).
355-
this.queryExecutor = BigQueryJdbcMdc.newBoundedCachedThreadPool(100);
355+
// Cached thread pool prevents deadlocks between interdependent producer/consumer query
356+
// execution tasks.
357+
this.queryExecutor = BigQueryJdbcMdc.newBoundedCachedThreadPool(queryProcessThreadCount);
356358
}
357359
}
358360

@@ -708,6 +710,10 @@ int getMetadataFetchThreadCount() {
708710
return this.metadataFetchThreadCount;
709711
}
710712

713+
int getQueryProcessThreadCount() {
714+
return this.queryProcessThreadCount;
715+
}
716+
711717
boolean isEnableWriteAPI() {
712718
return enableWriteAPI;
713719
}

java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcMdc.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ static void clear() {
6262
static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
6363
MdcThreadPoolExecutor executor =
6464
new MdcThreadPoolExecutor(
65+
"Metadata Fetch Pool",
6566
nThreads,
6667
nThreads,
6768
60L,
@@ -86,6 +87,7 @@ static ExecutorService newFixedThreadPool(int nThreads) {
8687
*/
8788
static ExecutorService newBoundedCachedThreadPool(int maxThreads, ThreadFactory threadFactory) {
8889
return new MdcThreadPoolExecutor(
90+
"Query Executor Pool",
8991
0,
9092
maxThreads,
9193
60L,
@@ -125,15 +127,18 @@ public Thread newThread(Runnable r) {
125127
}
126128

127129
private static class MdcThreadPoolExecutor extends ThreadPoolExecutor {
130+
private final String poolName;
128131

129132
public MdcThreadPoolExecutor(
133+
String poolName,
130134
int corePoolSize,
131135
int maximumPoolSize,
132136
long keepAliveTime,
133137
TimeUnit unit,
134138
BlockingQueue<Runnable> workQueue,
135139
ThreadFactory threadFactory) {
136140
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
141+
this.poolName = poolName;
137142
}
138143

139144
private final AtomicBoolean warningLogged = new AtomicBoolean(false);
@@ -149,8 +154,8 @@ private void monitorQueueSaturation(int queueSize) {
149154
if (queueSize >= warnThreshold) {
150155
if (warningLogged.compareAndSet(false, true)) {
151156
LOG.warning(
152-
"Thread pool is saturating. Max pool size: %d, Active threads: %d, Queued tasks: %d. Consider increasing the thread count property.",
153-
maxPoolSize, getActiveCount(), queueSize);
157+
"[%s] Thread pool is saturating. Max pool size: %d, Active threads: %d, Queued tasks: %d. Consider increasing the metadataFetchThreadCount property.",
158+
poolName, maxPoolSize, getActiveCount(), queueSize);
154159
}
155160
} else if (queueSize <= recoveryThreshold) {
156161
if (warningLogged.get()) {

java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcUrlUtility.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,8 @@ protected boolean removeEldestEntry(Map.Entry<String, Map<String, String>> eldes
142142
Pattern.CASE_INSENSITIVE);
143143
static final String METADATA_FETCH_THREAD_COUNT_PROPERTY_NAME = "MetaDataFetchThreadCount";
144144
static final int DEFAULT_METADATA_FETCH_THREAD_COUNT_VALUE = 32;
145+
static final String QUERY_PROCESS_THREAD_COUNT_PROPERTY_NAME = "QueryProcessThreadCount";
146+
static final int DEFAULT_QUERY_PROCESS_THREAD_COUNT_VALUE = 100;
145147
static final String RETRY_TIMEOUT_IN_SECS_PROPERTY_NAME = "Timeout";
146148
static final long DEFAULT_RETRY_TIMEOUT_IN_SECS_VALUE = 0L;
147149
static final String JOB_TIMEOUT_PROPERTY_NAME = "JobTimeout";
@@ -540,6 +542,12 @@ protected boolean removeEldestEntry(Map.Entry<String, Map<String, String>> eldes
540542
"The number of threads used to call a DatabaseMetaData method.")
541543
.setDefaultValue(String.valueOf(DEFAULT_METADATA_FETCH_THREAD_COUNT_VALUE))
542544
.build(),
545+
BigQueryConnectionProperty.newBuilder()
546+
.setName(QUERY_PROCESS_THREAD_COUNT_PROPERTY_NAME)
547+
.setDescription(
548+
"The maximum number of threads used to process query results concurrently.")
549+
.setDefaultValue(String.valueOf(DEFAULT_QUERY_PROCESS_THREAD_COUNT_VALUE))
550+
.build(),
543551
BigQueryConnectionProperty.newBuilder()
544552
.setName(ENABLE_WRITE_API_PROPERTY_NAME)
545553
.setDescription(

java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/DataSource.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ public class DataSource implements javax.sql.DataSource {
8686
private Boolean filterTablesOnDefaultDataset;
8787
private Integer requestGoogleDriveScope;
8888
private Integer metadataFetchThreadCount;
89+
private Integer queryProcessThreadCount;
8990
private String sslTrustStorePath;
9091
private String sslTrustStorePassword;
9192
private Map<String, String> labels;
@@ -248,6 +249,9 @@ public class DataSource implements javax.sql.DataSource {
248249
.put(
249250
BigQueryJdbcUrlUtility.METADATA_FETCH_THREAD_COUNT_PROPERTY_NAME,
250251
(ds, val) -> ds.setMetadataFetchThreadCount(Integer.parseInt(val)))
252+
.put(
253+
BigQueryJdbcUrlUtility.QUERY_PROCESS_THREAD_COUNT_PROPERTY_NAME,
254+
(ds, val) -> ds.setQueryProcessThreadCount(Integer.parseInt(val)))
251255
.put(
252256
BigQueryJdbcUrlUtility.SSL_TRUST_STORE_PROPERTY_NAME,
253257
DataSource::setSSLTrustStorePath)
@@ -565,6 +569,11 @@ Properties createProperties() {
565569
BigQueryJdbcUrlUtility.METADATA_FETCH_THREAD_COUNT_PROPERTY_NAME,
566570
String.valueOf(this.metadataFetchThreadCount));
567571
}
572+
if (this.queryProcessThreadCount != null) {
573+
connectionProperties.setProperty(
574+
BigQueryJdbcUrlUtility.QUERY_PROCESS_THREAD_COUNT_PROPERTY_NAME,
575+
String.valueOf(this.queryProcessThreadCount));
576+
}
568577
if (this.sslTrustStorePath != null) {
569578
connectionProperties.setProperty(
570579
BigQueryJdbcUrlUtility.SSL_TRUST_STORE_PROPERTY_NAME,
@@ -1085,6 +1094,24 @@ public void setMetadataFetchThreadCount(Integer metadataFetchThreadCount) {
10851094
this.metadataFetchThreadCount = metadataFetchThreadCount;
10861095
}
10871096

1097+
public Integer getQueryProcessThreadCount() {
1098+
return queryProcessThreadCount != null
1099+
? queryProcessThreadCount
1100+
: BigQueryJdbcUrlUtility.DEFAULT_QUERY_PROCESS_THREAD_COUNT_VALUE;
1101+
}
1102+
1103+
public void setQueryProcessThreadCount(Integer queryProcessThreadCount) {
1104+
if (queryProcessThreadCount != null) {
1105+
// Must be at least 4 to avoid thread starvation deadlocks between interdependent
1106+
// producer-consumer tasks (like nextPageWorker and populateBufferWorker).
1107+
validateMin(
1108+
queryProcessThreadCount,
1109+
4,
1110+
BigQueryJdbcUrlUtility.QUERY_PROCESS_THREAD_COUNT_PROPERTY_NAME);
1111+
}
1112+
this.queryProcessThreadCount = queryProcessThreadCount;
1113+
}
1114+
10881115
public String getSSLTrustStorePath() {
10891116
return sslTrustStorePath;
10901117
}

java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/DataSourceTest.java

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,4 +83,47 @@ public void testMetadataFetchThreadCountValidation() {
8383
.contains(
8484
"Invalid value for MetaDataFetchThreadCount. It must be greater than or equal to 1"));
8585
}
86+
87+
@Test
88+
public void testQueryProcessThreadCountValidation() {
89+
DataSource dataSource = new DataSource();
90+
91+
// Setting values >= 4 should succeed
92+
dataSource.setQueryProcessThreadCount(4);
93+
assertEquals(4, dataSource.getQueryProcessThreadCount());
94+
95+
dataSource.setQueryProcessThreadCount(10);
96+
assertEquals(10, dataSource.getQueryProcessThreadCount());
97+
98+
dataSource.setQueryProcessThreadCount(null); // Should fallback to default
99+
assertEquals(
100+
BigQueryJdbcUrlUtility.DEFAULT_QUERY_PROCESS_THREAD_COUNT_VALUE,
101+
dataSource.getQueryProcessThreadCount());
102+
103+
// Setting value < 4 (e.g., 3, 0, -5) should throw BigQueryJdbcRuntimeException
104+
BigQueryJdbcRuntimeException ex3 =
105+
assertThrows(
106+
BigQueryJdbcRuntimeException.class, () -> dataSource.setQueryProcessThreadCount(3));
107+
assertTrue(
108+
ex3.getMessage()
109+
.contains(
110+
"Invalid value for QueryProcessThreadCount. It must be greater than or equal to 4"));
111+
112+
BigQueryJdbcRuntimeException ex0 =
113+
assertThrows(
114+
BigQueryJdbcRuntimeException.class, () -> dataSource.setQueryProcessThreadCount(0));
115+
assertTrue(
116+
ex0.getMessage()
117+
.contains(
118+
"Invalid value for QueryProcessThreadCount. It must be greater than or equal to 4"));
119+
120+
BigQueryJdbcRuntimeException exNeg =
121+
assertThrows(
122+
BigQueryJdbcRuntimeException.class, () -> dataSource.setQueryProcessThreadCount(-5));
123+
assertTrue(
124+
exNeg
125+
.getMessage()
126+
.contains(
127+
"Invalid value for QueryProcessThreadCount. It must be greater than or equal to 4"));
128+
}
86129
}

0 commit comments

Comments
 (0)