Skip to content

Commit cfa40c3

Browse files
committed
set maxThreadPoolSize instead of coreThreadPoolSize
1 parent 95ab1dd commit cfa40c3

4 files changed

Lines changed: 97 additions & 20 deletions

File tree

java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcMdc.java

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -60,13 +60,16 @@ static void clear() {
6060
* context from the submitting thread to the executing thread.
6161
*/
6262
static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
63-
return new MdcThreadPoolExecutor(
64-
nThreads,
65-
nThreads,
66-
0L,
67-
TimeUnit.MILLISECONDS,
68-
new LinkedBlockingQueue<>(),
69-
new MdcThreadFactory(threadFactory));
63+
MdcThreadPoolExecutor executor =
64+
new MdcThreadPoolExecutor(
65+
nThreads,
66+
nThreads,
67+
60L,
68+
TimeUnit.SECONDS,
69+
new LinkedBlockingQueue<>(),
70+
new MdcThreadFactory(threadFactory));
71+
executor.allowCoreThreadTimeOut(true);
72+
return executor;
7073
}
7174

7275
/**
@@ -136,18 +139,18 @@ public MdcThreadPoolExecutor(
136139
private final AtomicBoolean warningLogged = new AtomicBoolean(false);
137140

138141
private void monitorQueueSaturation(int queueSize) {
139-
int corePoolSize = getCorePoolSize();
140-
// Warn when queue size is >= corePoolSize * 5, with a minimum of 10 tasks to avoid false
142+
int maxPoolSize = getMaximumPoolSize();
143+
// Warn when queue size is >= maxPoolSize * 5, with a minimum of 10 tasks to avoid false
141144
// alerts for tiny pools
142-
int warnThreshold = Math.max(10, corePoolSize * 5);
143-
// Recovery reset threshold is corePoolSize * 2, with a minimum of 4 tasks
144-
int recoveryThreshold = Math.max(4, corePoolSize * 2);
145+
int warnThreshold = Math.max(10, maxPoolSize * 5);
146+
// Recovery reset threshold is maxPoolSize * 2, with a minimum of 4 tasks
147+
int recoveryThreshold = Math.max(4, maxPoolSize * 2);
145148

146149
if (queueSize >= warnThreshold) {
147150
if (warningLogged.compareAndSet(false, true)) {
148151
LOG.warning(
149-
"Thread pool is saturating. Core pool size: %d, Active threads: %d, Queued tasks: %d. Consider increasing the thread count property.",
150-
corePoolSize, getActiveCount(), queueSize);
152+
"Thread pool is saturating. Max pool size: %d, Active threads: %d, Queued tasks: %d. Consider increasing the thread count property.",
153+
maxPoolSize, getActiveCount(), queueSize);
151154
}
152155
} else if (queueSize <= recoveryThreshold) {
153156
if (warningLogged.get()) {

java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/DataSource.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1077,8 +1077,9 @@ public Integer getMetadataFetchThreadCount() {
10771077

10781078
public void setMetadataFetchThreadCount(Integer metadataFetchThreadCount) {
10791079
if (metadataFetchThreadCount != null) {
1080-
validateNonNegative(
1080+
validateMin(
10811081
metadataFetchThreadCount,
1082+
1,
10821083
BigQueryJdbcUrlUtility.METADATA_FETCH_THREAD_COUNT_PROPERTY_NAME);
10831084
}
10841085
this.metadataFetchThreadCount = metadataFetchThreadCount;
@@ -1388,11 +1389,7 @@ private static void validateNonNegative(long val, String propertyName) {
13881389
}
13891390
}
13901391

1391-
/**
1392-
* Validates that a property value is greater than or equal to a minimum threshold. For thread
1393-
* pools, a minimum of 2 is enforced to ensure there are enough threads to handle concurrent
1394-
* coordination and avoid deadlock or thread starvation.
1395-
*/
1392+
/** Validates that a property value is greater than or equal to a minimum threshold. */
13961393
private static void validateMin(long val, long min, String propertyName) {
13971394
if (val < min) {
13981395
throw new BigQueryJdbcRuntimeException(

java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcMdcTest.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,44 @@ public void testPoolThreadInheritanceSevered() throws Exception {
253253
}
254254
}
255255

256+
@Test
257+
public void testNewFixedThreadPoolTimeout() {
258+
ExecutorService exec2 = BigQueryJdbcMdc.newFixedThreadPool(2);
259+
ExecutorService exec3 = BigQueryJdbcMdc.newFixedThreadPool(3);
260+
ExecutorService exec4 = BigQueryJdbcMdc.newFixedThreadPool(4);
261+
ExecutorService exec5 = BigQueryJdbcMdc.newFixedThreadPool(5);
262+
ExecutorService exec10 = BigQueryJdbcMdc.newFixedThreadPool(10);
263+
264+
try {
265+
assertEquals(2, ((ThreadPoolExecutor) exec2).getCorePoolSize());
266+
assertEquals(2, ((ThreadPoolExecutor) exec2).getMaximumPoolSize());
267+
assertTrue(((ThreadPoolExecutor) exec2).allowsCoreThreadTimeOut());
268+
assertEquals(60L, ((ThreadPoolExecutor) exec2).getKeepAliveTime(TimeUnit.SECONDS));
269+
270+
assertEquals(3, ((ThreadPoolExecutor) exec3).getCorePoolSize());
271+
assertEquals(3, ((ThreadPoolExecutor) exec3).getMaximumPoolSize());
272+
assertTrue(((ThreadPoolExecutor) exec3).allowsCoreThreadTimeOut());
273+
274+
assertEquals(4, ((ThreadPoolExecutor) exec4).getCorePoolSize());
275+
assertEquals(4, ((ThreadPoolExecutor) exec4).getMaximumPoolSize());
276+
assertTrue(((ThreadPoolExecutor) exec4).allowsCoreThreadTimeOut());
277+
278+
assertEquals(5, ((ThreadPoolExecutor) exec5).getCorePoolSize());
279+
assertEquals(5, ((ThreadPoolExecutor) exec5).getMaximumPoolSize());
280+
assertTrue(((ThreadPoolExecutor) exec5).allowsCoreThreadTimeOut());
281+
282+
assertEquals(10, ((ThreadPoolExecutor) exec10).getCorePoolSize());
283+
assertEquals(10, ((ThreadPoolExecutor) exec10).getMaximumPoolSize());
284+
assertTrue(((ThreadPoolExecutor) exec10).allowsCoreThreadTimeOut());
285+
} finally {
286+
exec2.shutdownNow();
287+
exec3.shutdownNow();
288+
exec4.shutdownNow();
289+
exec5.shutdownNow();
290+
exec10.shutdownNow();
291+
}
292+
}
293+
256294
@Test
257295
public void testConnectionScopedExecutorLifecycle() throws Exception {
258296
String url1 =
@@ -287,8 +325,10 @@ public void testConnectionScopedExecutorLifecycle() throws Exception {
287325
assertEquals(Integer.MAX_VALUE, ((ThreadPoolExecutor) exec2).getMaximumPoolSize());
288326
assertEquals(5, ((ThreadPoolExecutor) metadataExec1).getCorePoolSize());
289327
assertEquals(5, ((ThreadPoolExecutor) metadataExec1).getMaximumPoolSize());
328+
assertTrue(((ThreadPoolExecutor) metadataExec1).allowsCoreThreadTimeOut());
290329
assertEquals(10, ((ThreadPoolExecutor) metadataExec2).getCorePoolSize());
291330
assertEquals(10, ((ThreadPoolExecutor) metadataExec2).getMaximumPoolSize());
331+
assertTrue(((ThreadPoolExecutor) metadataExec2).allowsCoreThreadTimeOut());
292332

293333
try (BigQueryJdbcMdc.MdcCloseable mdc =
294334
BigQueryJdbcMdc.registerInstance(conn1.getConnectionId())) {

java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/DataSourceTest.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,14 @@
1616

1717
package com.google.cloud.bigquery.jdbc;
1818

19+
import static org.junit.jupiter.api.Assertions.assertEquals;
1920
import static org.junit.jupiter.api.Assertions.assertFalse;
2021
import static org.junit.jupiter.api.Assertions.assertSame;
2122
import static org.junit.jupiter.api.Assertions.assertThrows;
2223
import static org.junit.jupiter.api.Assertions.assertTrue;
2324

2425
import com.google.cloud.bigquery.exception.BigQueryJdbcException;
26+
import com.google.cloud.bigquery.exception.BigQueryJdbcRuntimeException;
2527
import java.sql.SQLException;
2628
import org.junit.jupiter.api.Test;
2729

@@ -46,4 +48,39 @@ public void testWrapperMethods() throws SQLException {
4648
BigQueryJdbcException.class, () -> dataSource.unwrap(java.sql.Connection.class));
4749
assertTrue(e.getMessage().contains("Cannot unwrap to java.sql.Connection"));
4850
}
51+
52+
@Test
53+
public void testMetadataFetchThreadCountValidation() {
54+
DataSource dataSource = new DataSource();
55+
56+
// Setting positive values should succeed
57+
dataSource.setMetadataFetchThreadCount(1);
58+
assertEquals(1, dataSource.getMetadataFetchThreadCount());
59+
60+
dataSource.setMetadataFetchThreadCount(5);
61+
assertEquals(5, dataSource.getMetadataFetchThreadCount());
62+
63+
dataSource.setMetadataFetchThreadCount(null); // Should fallback to default
64+
assertEquals(
65+
BigQueryJdbcUrlUtility.DEFAULT_METADATA_FETCH_THREAD_COUNT_VALUE,
66+
dataSource.getMetadataFetchThreadCount());
67+
68+
// Setting 0 or negative should throw BigQueryJdbcRuntimeException
69+
BigQueryJdbcRuntimeException ex0 =
70+
assertThrows(
71+
BigQueryJdbcRuntimeException.class, () -> dataSource.setMetadataFetchThreadCount(0));
72+
assertTrue(
73+
ex0.getMessage()
74+
.contains(
75+
"Invalid value for MetaDataFetchThreadCount. It must be greater than or equal to 1"));
76+
77+
BigQueryJdbcRuntimeException exNeg =
78+
assertThrows(
79+
BigQueryJdbcRuntimeException.class, () -> dataSource.setMetadataFetchThreadCount(-5));
80+
assertTrue(
81+
exNeg
82+
.getMessage()
83+
.contains(
84+
"Invalid value for MetaDataFetchThreadCount. It must be greater than or equal to 1"));
85+
}
4986
}

0 commit comments

Comments
 (0)