Skip to content

Commit 67fa7ea

Browse files
committed
address comments
1 parent 6456ddf commit 67fa7ea

9 files changed

Lines changed: 204 additions & 54 deletions

File tree

java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryArrowResultSet.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -485,7 +485,7 @@ private String formatRangeElement(Object element, StandardSQLTypeName elementTyp
485485
public void close() {
486486
LOG.fineTrace("close", () -> String.format("Closing BigqueryArrowResultSet %s.", this));
487487
this.isClosed = true;
488-
if (ownedTask != null && !ownedTask.isCancelled() && !ownedTask.isDone()) {
488+
if (ownedTask != null) {
489489
// cancel the producer task when result set is closed
490490
ownedTask.cancel(true);
491491
}

java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryConnection.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -348,7 +348,11 @@ public class BigQueryConnection extends BigQueryNoOpsConnection {
348348
this.headerProvider = createHeaderProvider();
349349
this.bigQuery = getBigQueryConnection();
350350
this.metadataExecutor = BigQueryJdbcMdc.newFixedThreadPool(metadataFetchThreadCount);
351-
this.queryExecutor = BigQueryJdbcMdc.newCachedThreadPool();
351+
// Use a bounded cached thread pool to prevent unbounded thread creation (and OOMs)
352+
// under heavy load, while ensuring a limit (e.g., 100) high enough to prevent deadlocks
353+
// between interdependent producer/consumer tasks (like nextPageWorker and
354+
// populateBufferWorker).
355+
this.queryExecutor = BigQueryJdbcMdc.newBoundedCachedThreadPool(100);
352356
}
353357
}
354358

java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@
7272
import java.util.concurrent.Future;
7373
import java.util.concurrent.LinkedBlockingQueue;
7474
import java.util.concurrent.TimeUnit;
75+
import java.util.concurrent.TimeoutException;
7576
import java.util.concurrent.atomic.AtomicReference;
7677
import java.util.function.Function;
7778
import java.util.function.Supplier;
@@ -5269,7 +5270,7 @@ private void loadDriverVersionProperties() {
52695270
// TODO(keshav): This is a temporary compatibility bridge to wrap raw Threads into Futures.
52705271
// This should be removed when BigQueryDatabaseMetaData is refactored to use the ExecutorService
52715272
// directly.
5272-
private static Future<?>[] wrapThread(final Thread thread) {
5273+
static Future<?>[] wrapThread(final Thread thread) {
52735274
if (thread == null) {
52745275
return null;
52755276
}
@@ -5279,7 +5280,7 @@ private static Future<?>[] wrapThread(final Thread thread) {
52795280

52805281
@Override
52815282
public boolean cancel(boolean mayInterruptIfRunning) {
5282-
if (cancelled || !thread.isAlive()) {
5283+
if (cancelled || thread.getState() == Thread.State.TERMINATED) {
52835284
return false;
52845285
}
52855286
cancelled = true;
@@ -5296,25 +5297,33 @@ public boolean isCancelled() {
52965297

52975298
@Override
52985299
public boolean isDone() {
5299-
return cancelled || !thread.isAlive();
5300+
return cancelled || thread.getState() == Thread.State.TERMINATED;
53005301
}
53015302

53025303
@Override
5303-
public Object get() {
5304-
try {
5305-
thread.join();
5306-
} catch (InterruptedException e) {
5307-
Thread.currentThread().interrupt();
5304+
public Object get() throws InterruptedException, CancellationException {
5305+
if (isCancelled()) {
5306+
throw new CancellationException();
5307+
}
5308+
thread.join();
5309+
if (isCancelled()) {
5310+
throw new CancellationException();
53085311
}
53095312
return null;
53105313
}
53115314

53125315
@Override
5313-
public Object get(long timeout, TimeUnit unit) {
5314-
try {
5315-
unit.timedJoin(thread, timeout);
5316-
} catch (InterruptedException e) {
5317-
Thread.currentThread().interrupt();
5316+
public Object get(long timeout, TimeUnit unit)
5317+
throws InterruptedException, CancellationException, TimeoutException {
5318+
if (isCancelled()) {
5319+
throw new CancellationException();
5320+
}
5321+
unit.timedJoin(thread, timeout);
5322+
if (isCancelled()) {
5323+
throw new CancellationException();
5324+
}
5325+
if (thread.isAlive()) {
5326+
throw new TimeoutException();
53185327
}
53195328
return null;
53205329
}

java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcMdc.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -81,25 +81,25 @@ static ExecutorService newFixedThreadPool(int nThreads) {
8181
}
8282

8383
/**
84-
* Creates a new cached thread pool ExecutorService that automatically propagates MDC connection
85-
* context from the submitting thread to the executing thread.
84+
* Creates a new bounded cached thread pool ExecutorService that automatically propagates MDC
85+
* connection context from the submitting thread to the executing thread.
8686
*/
87-
static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
87+
static ExecutorService newBoundedCachedThreadPool(int maxThreads, ThreadFactory threadFactory) {
8888
return new MdcThreadPoolExecutor(
8989
0,
90-
Integer.MAX_VALUE,
90+
maxThreads,
9191
60L,
9292
TimeUnit.SECONDS,
9393
new java.util.concurrent.SynchronousQueue<>(),
9494
new MdcThreadFactory(threadFactory));
9595
}
9696

9797
/**
98-
* Creates a new cached thread pool ExecutorService that automatically propagates MDC connection
99-
* context from the submitting thread to the executing thread.
98+
* Creates a new bounded cached thread pool ExecutorService that automatically propagates MDC
99+
* connection context from the submitting thread to the executing thread.
100100
*/
101-
static ExecutorService newCachedThreadPool() {
102-
return newCachedThreadPool(Executors.defaultThreadFactory());
101+
static ExecutorService newBoundedCachedThreadPool(int maxThreads) {
102+
return newBoundedCachedThreadPool(maxThreads, Executors.defaultThreadFactory());
103103
}
104104

105105
private static class MdcThreadFactory implements ThreadFactory {

java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJsonResultSet.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -294,7 +294,7 @@ public void close() {
294294
this.isClosed = true;
295295
if (ownedTasks != null) {
296296
for (Future<?> ownedTask : ownedTasks) {
297-
if (ownedTask != null && !ownedTask.isCancelled() && !ownedTask.isDone()) {
297+
if (ownedTask != null) {
298298
ownedTask.cancel(true);
299299
}
300300
}

java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryResultSetFinalizers.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public ArrowResultSetFinalizer(
4141
// Free resources. Remove all the hard refs
4242
public void finalizeResources() {
4343
LOG.finestTrace("finalizeResources");
44-
if (ownedTask != null && !ownedTask.isCancelled() && !ownedTask.isDone()) {
44+
if (ownedTask != null) {
4545
ownedTask.cancel(true);
4646
}
4747
}
@@ -64,7 +64,7 @@ public void finalizeResources() {
6464
LOG.finestTrace("finalizeResources");
6565
if (ownedTasks != null) {
6666
for (Future<?> ownedTask : ownedTasks) {
67-
if (ownedTask != null && !ownedTask.isCancelled() && !ownedTask.isDone()) {
67+
if (ownedTask != null) {
6868
ownedTask.cancel(true);
6969
}
7070
}

java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java

Lines changed: 38 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@
7878
import java.util.concurrent.ExecutorService;
7979
import java.util.concurrent.Future;
8080
import java.util.concurrent.LinkedBlockingDeque;
81+
import java.util.concurrent.RejectedExecutionException;
8182
import java.util.concurrent.ThreadFactory;
8283
import java.util.logging.Level;
8384

@@ -853,7 +854,12 @@ ResultSet processArrowResultSet(TableResult results, Job job) throws SQLExceptio
853854
arrowResultSet.setQueryId(results.getQueryId());
854855
return arrowResultSet;
855856

856-
} catch (Exception ex) {
857+
} catch (Exception | OutOfMemoryError ex) {
858+
if (ex instanceof OutOfMemoryError || ex instanceof RejectedExecutionException) {
859+
throw new BigQueryJdbcException(
860+
"Failed to execute query: Unable to allocate background threads to process the query results. Connection-scoped thread pool limit of 100 threads was reached or system is out of memory.",
861+
ex);
862+
}
857863
throw new BigQueryJdbcException(ex.getMessage(), ex);
858864
}
859865
}
@@ -1034,7 +1040,7 @@ private boolean meetsReadRatio(TableResult results) {
10341040
return totalRows / pageSize > querySettings.getHighThroughputActivationRatio();
10351041
}
10361042

1037-
BigQueryJsonResultSet processJsonResultSet(TableResult results, Job job) {
1043+
BigQueryJsonResultSet processJsonResultSet(TableResult results, Job job) throws SQLException {
10381044
List<Future<?>> taskList = new ArrayList<>();
10391045

10401046
Schema schema = results.getSchema();
@@ -1044,32 +1050,38 @@ BigQueryJsonResultSet processJsonResultSet(TableResult results, Job job) {
10441050
new LinkedBlockingDeque<>(getPageCacheSize(getBufferSize(), schema));
10451051

10461052
JobId jobId = results.getJobId();
1047-
if (jobId != null) {
1048-
// Task to make rpc calls to fetch data from the server
1049-
Future<?> nextPageWorker =
1050-
runNextPageTaskAsync(
1051-
results,
1052-
results.getNextPageToken(),
1053-
jobId,
1054-
rpcResponseQueue,
1055-
this.bigQueryFieldValueListWrapperBlockingQueue);
1056-
taskList.add(nextPageWorker);
1057-
} else {
1058-
try {
1059-
populateFirstPage(results, rpcResponseQueue);
1060-
rpcResponseQueue.put(Tuple.of(null, false));
1061-
} catch (InterruptedException e) {
1062-
LOG.warning(
1063-
"%s Interrupted @ processJsonQueryResponseResults: %s",
1064-
Thread.currentThread().getName(), e.getMessage());
1053+
try {
1054+
if (jobId != null) {
1055+
// Task to make rpc calls to fetch data from the server
1056+
Future<?> nextPageWorker =
1057+
runNextPageTaskAsync(
1058+
results,
1059+
results.getNextPageToken(),
1060+
jobId,
1061+
rpcResponseQueue,
1062+
this.bigQueryFieldValueListWrapperBlockingQueue);
1063+
taskList.add(nextPageWorker);
1064+
} else {
1065+
try {
1066+
populateFirstPage(results, rpcResponseQueue);
1067+
rpcResponseQueue.put(Tuple.of(null, false));
1068+
} catch (InterruptedException e) {
1069+
LOG.warning(
1070+
"%s Interrupted @ processJsonQueryResponseResults: %s",
1071+
Thread.currentThread().getName(), e.getMessage());
1072+
}
10651073
}
1066-
}
10671074

1068-
// Task to parse data received from the server to client library objects
1069-
Future<?> populateBufferWorker =
1070-
parseAndPopulateRpcDataAsync(
1071-
schema, this.bigQueryFieldValueListWrapperBlockingQueue, rpcResponseQueue);
1072-
taskList.add(populateBufferWorker);
1075+
// Task to parse data received from the server to client library objects
1076+
Future<?> populateBufferWorker =
1077+
parseAndPopulateRpcDataAsync(
1078+
schema, this.bigQueryFieldValueListWrapperBlockingQueue, rpcResponseQueue);
1079+
taskList.add(populateBufferWorker);
1080+
} catch (RejectedExecutionException | OutOfMemoryError e) {
1081+
throw new BigQueryJdbcException(
1082+
"Failed to execute query: Unable to allocate background threads to process the query results. Connection-scoped thread pool limit of 100 threads was reached or system is out of memory.",
1083+
e);
1084+
}
10731085

10741086
Future<?>[] jsonWorkers = taskList.toArray(new Future<?>[0]);
10751087

java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaDataTest.java

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,13 @@
4444
import java.sql.Types;
4545
import java.util.*;
4646
import java.util.concurrent.Callable;
47+
import java.util.concurrent.CancellationException;
48+
import java.util.concurrent.CountDownLatch;
4749
import java.util.concurrent.ExecutionException;
4850
import java.util.concurrent.ExecutorService;
4951
import java.util.concurrent.Future;
52+
import java.util.concurrent.TimeUnit;
53+
import java.util.concurrent.TimeoutException;
5054
import java.util.regex.Pattern;
5155
import org.junit.jupiter.api.BeforeEach;
5256
import org.junit.jupiter.api.Test;
@@ -3308,4 +3312,125 @@ public void testMetadataAndResultSetMetadataTypeMappingConsistency(StandardSQLTy
33083312
assertEquals(
33093313
metadataTypeInfo.jdbcType, (int) resultSetType, "Type mapping mismatch for " + type);
33103314
}
3315+
3316+
@Test
3317+
public void testWrapThread_NullThread() {
3318+
assertNull(BigQueryDatabaseMetaData.wrapThread(null));
3319+
}
3320+
3321+
@Test
3322+
public void testWrapThread_BasicLifecycle() throws Exception {
3323+
CountDownLatch startLatch = new CountDownLatch(1);
3324+
CountDownLatch finishLatch = new CountDownLatch(1);
3325+
Thread t =
3326+
new Thread(
3327+
() -> {
3328+
try {
3329+
startLatch.countDown();
3330+
finishLatch.await();
3331+
} catch (InterruptedException e) {
3332+
// ignore
3333+
}
3334+
});
3335+
3336+
Future<?>[] futures = BigQueryDatabaseMetaData.wrapThread(t);
3337+
assertNotNull(futures);
3338+
assertEquals(1, futures.length);
3339+
Future<?> f = futures[0];
3340+
3341+
// Thread is NEW (not started yet).
3342+
assertFalse(f.isDone());
3343+
assertFalse(f.isCancelled());
3344+
3345+
t.start();
3346+
startLatch.await();
3347+
3348+
// Thread is running.
3349+
assertFalse(f.isDone());
3350+
assertFalse(f.isCancelled());
3351+
3352+
finishLatch.countDown();
3353+
t.join();
3354+
3355+
// Thread is terminated.
3356+
assertTrue(f.isDone());
3357+
assertFalse(f.isCancelled());
3358+
assertNull(f.get());
3359+
}
3360+
3361+
@Test
3362+
public void testWrapThread_CancelBeforeStart() throws Exception {
3363+
Thread t =
3364+
new Thread(
3365+
() -> {
3366+
try {
3367+
Thread.sleep(1000);
3368+
} catch (InterruptedException e) {
3369+
// ignore
3370+
}
3371+
});
3372+
3373+
Future<?> f = BigQueryDatabaseMetaData.wrapThread(t)[0];
3374+
assertTrue(f.cancel(true));
3375+
assertTrue(f.isCancelled());
3376+
assertTrue(f.isDone());
3377+
3378+
// cancel on already cancelled should return false
3379+
assertFalse(f.cancel(true));
3380+
3381+
assertThrows(CancellationException.class, () -> f.get());
3382+
assertThrows(CancellationException.class, () -> f.get(1, TimeUnit.SECONDS));
3383+
}
3384+
3385+
@Test
3386+
public void testWrapThread_CancelRunningWithInterrupt() throws Exception {
3387+
CountDownLatch startLatch = new CountDownLatch(1);
3388+
CountDownLatch interruptedLatch = new CountDownLatch(1);
3389+
Thread t =
3390+
new Thread(
3391+
() -> {
3392+
startLatch.countDown();
3393+
try {
3394+
Thread.sleep(10000);
3395+
} catch (InterruptedException e) {
3396+
interruptedLatch.countDown();
3397+
}
3398+
});
3399+
3400+
t.start();
3401+
startLatch.await();
3402+
3403+
Future<?> f = BigQueryDatabaseMetaData.wrapThread(t)[0];
3404+
assertTrue(f.cancel(true));
3405+
assertTrue(f.isCancelled());
3406+
assertTrue(f.isDone());
3407+
3408+
assertTrue(interruptedLatch.await(5, TimeUnit.SECONDS));
3409+
assertThrows(CancellationException.class, () -> f.get());
3410+
}
3411+
3412+
@Test
3413+
public void testWrapThread_GetTimeout() throws Exception {
3414+
CountDownLatch startLatch = new CountDownLatch(1);
3415+
Thread t =
3416+
new Thread(
3417+
() -> {
3418+
startLatch.countDown();
3419+
try {
3420+
Thread.sleep(10000);
3421+
} catch (InterruptedException e) {
3422+
// ignore
3423+
}
3424+
});
3425+
3426+
t.start();
3427+
startLatch.await();
3428+
3429+
Future<?> f = BigQueryDatabaseMetaData.wrapThread(t)[0];
3430+
assertThrows(TimeoutException.class, () -> f.get(100, TimeUnit.MILLISECONDS));
3431+
3432+
// Cleanup: stop the thread
3433+
t.interrupt();
3434+
t.join();
3435+
}
33113436
}

java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcMdcTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -320,9 +320,9 @@ public void testConnectionScopedExecutorLifecycle() throws Exception {
320320
assertTrue(metadataExec2 instanceof ThreadPoolExecutor);
321321

322322
assertEquals(0, ((ThreadPoolExecutor) exec1).getCorePoolSize());
323-
assertEquals(Integer.MAX_VALUE, ((ThreadPoolExecutor) exec1).getMaximumPoolSize());
323+
assertEquals(100, ((ThreadPoolExecutor) exec1).getMaximumPoolSize());
324324
assertEquals(0, ((ThreadPoolExecutor) exec2).getCorePoolSize());
325-
assertEquals(Integer.MAX_VALUE, ((ThreadPoolExecutor) exec2).getMaximumPoolSize());
325+
assertEquals(100, ((ThreadPoolExecutor) exec2).getMaximumPoolSize());
326326
assertEquals(5, ((ThreadPoolExecutor) metadataExec1).getCorePoolSize());
327327
assertEquals(5, ((ThreadPoolExecutor) metadataExec1).getMaximumPoolSize());
328328
assertTrue(((ThreadPoolExecutor) metadataExec1).allowsCoreThreadTimeOut());

0 commit comments

Comments
 (0)