From 1b44bfe9194a6e1f36746e95a429c5629a7da608 Mon Sep 17 00:00:00 2001 From: Neenu1995 Date: Fri, 12 Jun 2026 13:20:55 -0400 Subject: [PATCH 1/7] feat(bigquery-jdbc): migrate statement execution thread tracking to connection-scoped executor --- .../bigquery/jdbc/BigQueryArrowResultSet.java | 25 ++++--- .../jdbc/BigQueryDatabaseMetaData.java | 73 ++++++++++++++++--- .../bigquery/jdbc/BigQueryJsonResultSet.java | 29 ++++---- .../jdbc/BigQueryResultSetFinalizers.java | 25 ++++--- .../bigquery/jdbc/BigQueryStatement.java | 58 +++++++-------- .../jdbc/BigQueryArrowResultSetTest.java | 11 +-- .../jdbc/BigQueryJsonResultSetTest.java | 15 ++-- .../jdbc/BigQueryResultSetFinalizersTest.java | 41 +++-------- .../jdbc/BigQueryResultSetMetadataTest.java | 7 +- .../bigquery/jdbc/BigQueryStatementTest.java | 6 +- .../jdbc/PerConnectionFileHandlerTest.java | 4 +- 11 files changed, 167 insertions(+), 127 deletions(-) diff --git a/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryArrowResultSet.java b/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryArrowResultSet.java index 64269c7f74be..55282df2ee23 100644 --- a/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryArrowResultSet.java +++ b/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryArrowResultSet.java @@ -38,6 +38,7 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Future; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.vector.FieldVector; @@ -80,8 +81,8 @@ class BigQueryArrowResultSet extends BigQueryBaseResultSet { // Decoder object will be reused to avoid re-allocation and too much garbage collection. private VectorSchemaRoot vectorSchemaRoot; private VectorLoader vectorLoader; - // producer thread's reference - private final Thread ownedThread; + // producer task's reference + private final Future ownedTask; private BigQueryArrowResultSet( Schema schema, @@ -93,7 +94,7 @@ private BigQueryArrowResultSet( boolean isNested, int fromIndex, int toIndexExclusive, - Thread ownedThread, + Future ownedTask, BigQuery bigQuery, Job job) throws SQLException { @@ -105,7 +106,7 @@ private BigQueryArrowResultSet( this.fromIndex = fromIndex; this.toIndexExclusive = toIndexExclusive; this.nestedRowIndex = fromIndex - 1; - this.ownedThread = ownedThread; + this.ownedTask = ownedTask; if (!isNested && arrowSchema != null) { try { this.arrowDeserializer = new ArrowDeserializer(arrowSchema); @@ -127,10 +128,10 @@ static BigQueryArrowResultSet of( long totalRows, BigQueryStatement statement, BlockingQueue buffer, - Thread ownedThread, + Future ownedTask, BigQuery bigQuery) throws SQLException { - return of(schema, arrowSchema, totalRows, statement, buffer, ownedThread, bigQuery, null); + return of(schema, arrowSchema, totalRows, statement, buffer, ownedTask, bigQuery, null); } static BigQueryArrowResultSet of( @@ -139,7 +140,7 @@ static BigQueryArrowResultSet of( long totalRows, BigQueryStatement statement, BlockingQueue buffer, - Thread ownedThread, + Future ownedTask, BigQuery bigQuery, Job job) throws SQLException { @@ -153,7 +154,7 @@ static BigQueryArrowResultSet of( false, -1, -1, - ownedThread, + ownedTask, bigQuery, job); } @@ -165,7 +166,7 @@ static BigQueryArrowResultSet of( this.currentNestedBatch = null; this.fromIndex = 0; this.toIndexExclusive = 0; - this.ownedThread = null; + this.ownedTask = null; this.arrowDeserializer = null; this.vectorSchemaRoot = null; this.vectorLoader = null; @@ -484,9 +485,9 @@ private String formatRangeElement(Object element, StandardSQLTypeName elementTyp public void close() { LOG.fineTrace("close", () -> String.format("Closing BigqueryArrowResultSet %s.", this)); this.isClosed = true; - if (ownedThread != null && !ownedThread.isInterrupted()) { - // interrupt the producer thread when result set is closed - ownedThread.interrupt(); + if (ownedTask != null && !ownedTask.isCancelled() && !ownedTask.isDone()) { + // cancel the producer task when result set is closed + ownedTask.cancel(true); } super.close(); } diff --git a/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java b/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java index 32ed62d91fd6..4db431ead646 100644 --- a/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java +++ b/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java @@ -944,7 +944,7 @@ public ResultSet getProcedures( Thread fetcherThread = new Thread(procedureFetcher, "getProcedures-fetcher-" + catalog); BigQueryJsonResultSet resultSet = - BigQueryJsonResultSet.of(resultSchema, -1, queue, null, new Thread[] {fetcherThread}); + BigQueryJsonResultSet.of(resultSchema, -1, queue, null, wrapThread(fetcherThread)); fetcherThread.start(); LOG.info("Started background thread for getProcedures"); @@ -1206,7 +1206,7 @@ public ResultSet getProcedureColumns( Thread fetcherThread = new Thread(procedureColumnFetcher, "getProcedureColumns-fetcher-" + catalog); BigQueryJsonResultSet resultSet = - BigQueryJsonResultSet.of(resultSchema, -1, queue, null, new Thread[] {fetcherThread}); + BigQueryJsonResultSet.of(resultSchema, -1, queue, null, wrapThread(fetcherThread)); fetcherThread.start(); LOG.info("Started background thread for getProcedureColumns for catalog: " + catalog); @@ -1877,7 +1877,7 @@ public ResultSet getTables( Thread fetcherThread = new Thread(tableFetcher, "getTables-fetcher-" + effectiveCatalog); BigQueryJsonResultSet resultSet = - BigQueryJsonResultSet.of(resultSchema, -1, queue, null, new Thread[] {fetcherThread}); + BigQueryJsonResultSet.of(resultSchema, -1, queue, null, wrapThread(fetcherThread)); fetcherThread.start(); LOG.info("Started background thread for getTables"); @@ -2017,7 +2017,8 @@ public ResultSet getCatalogs() { populateQueue(catalogRows, queue, schemaFields); signalEndOfData(queue, schemaFields); - return BigQueryJsonResultSet.of(catalogsSchema, catalogRows.size(), queue, null, new Thread[0]); + return BigQueryJsonResultSet.of( + catalogsSchema, catalogRows.size(), queue, null, new Future[0]); } Schema defineGetCatalogsSchema() { @@ -2049,7 +2050,7 @@ public ResultSet getTableTypes() { signalEndOfData(queue, tableTypesSchema.getFields()); return BigQueryJsonResultSet.of( - tableTypesSchema, tableTypeRows.size(), queue, null, new Thread[0]); + tableTypesSchema, tableTypeRows.size(), queue, null, new Future[0]); } static Schema defineGetTableTypesSchema() { @@ -2203,7 +2204,7 @@ public ResultSet getColumns( Thread fetcherThread = new Thread(columnFetcher, "getColumns-fetcher-" + effectiveCatalog); BigQueryJsonResultSet resultSet = - BigQueryJsonResultSet.of(resultSchema, -1, queue, null, new Thread[] {fetcherThread}); + BigQueryJsonResultSet.of(resultSchema, -1, queue, null, wrapThread(fetcherThread)); fetcherThread.start(); LOG.info("Started background thread for getColumns"); @@ -2718,7 +2719,7 @@ public ResultSet getTypeInfo() { populateQueue(typeInfoRows, queue, schemaFields); signalEndOfData(queue, schemaFields); return BigQueryJsonResultSet.of( - typeInfoSchema, typeInfoRows.size(), queue, null, new Thread[0]); + typeInfoSchema, typeInfoRows.size(), queue, null, new Future[0]); } Schema defineGetTypeInfoSchema() { @@ -3713,7 +3714,7 @@ public ResultSet getSchemas(String catalog, String schemaPattern) { Thread fetcherThread = new Thread(schemaFetcher, "getSchemas-fetcher-" + catalog); BigQueryJsonResultSet resultSet = - BigQueryJsonResultSet.of(resultSchema, -1, queue, null, new Thread[] {fetcherThread}); + BigQueryJsonResultSet.of(resultSchema, -1, queue, null, wrapThread(fetcherThread)); fetcherThread.start(); LOG.info("Started background thread for getSchemas"); @@ -3832,7 +3833,7 @@ public ResultSet getClientInfoProperties() { signalEndOfData(queue, resultSchemaFields); } return BigQueryJsonResultSet.of( - resultSchema, collectedResults.size(), queue, null, new Thread[0]); + resultSchema, collectedResults.size(), queue, null, new Future[0]); } Schema defineGetClientInfoPropertiesSchema() { @@ -4007,7 +4008,7 @@ public ResultSet getFunctions(String catalog, String schemaPattern, String funct Thread fetcherThread = new Thread(functionFetcher, "getFunctions-fetcher-" + catalog); BigQueryJsonResultSet resultSet = - BigQueryJsonResultSet.of(resultSchema, -1, queue, null, new Thread[] {fetcherThread}); + BigQueryJsonResultSet.of(resultSchema, -1, queue, null, wrapThread(fetcherThread)); fetcherThread.start(); LOG.info("Started background thread for getFunctions"); @@ -4261,7 +4262,7 @@ public ResultSet getFunctionColumns( Thread fetcherThread = new Thread(functionColumnFetcher, "getFunctionColumns-fetcher-" + catalog); BigQueryJsonResultSet resultSet = - BigQueryJsonResultSet.of(resultSchema, -1, queue, null, new Thread[] {fetcherThread}); + BigQueryJsonResultSet.of(resultSchema, -1, queue, null, wrapThread(fetcherThread)); fetcherThread.start(); LOG.info("Started background thread for getFunctionColumns for catalog: " + catalog); @@ -5264,4 +5265,54 @@ private void loadDriverVersionProperties() { throw ex; } } + + // TODO(developer): This is a temporary compatibility bridge to wrap raw Threads into Futures. + // This should be removed when BigQueryDatabaseMetaData is refactored to use the ExecutorService + // directly. + private static Future[] wrapThread(final Thread thread) { + if (thread == null) { + return null; + } + return new Future[] { + new Future() { + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + if (mayInterruptIfRunning) { + thread.interrupt(); + } + return true; + } + + @Override + public boolean isCancelled() { + return false; + } + + @Override + public boolean isDone() { + return !thread.isAlive(); + } + + @Override + public Object get() { + try { + thread.join(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + return null; + } + + @Override + public Object get(long timeout, TimeUnit unit) { + try { + unit.timedJoin(thread, timeout); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + return null; + } + } + }; + } } diff --git a/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJsonResultSet.java b/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJsonResultSet.java index eeb4baf2d03e..08ef917cec4f 100644 --- a/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJsonResultSet.java +++ b/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJsonResultSet.java @@ -29,6 +29,7 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Future; /** {@link ResultSet} Implementation for JSON datasource (Using REST APIs) */ class BigQueryJsonResultSet extends BigQueryBaseResultSet { @@ -43,7 +44,7 @@ class BigQueryJsonResultSet extends BigQueryBaseResultSet { private boolean afterLast = false; private final int fromIndex; private final int toIndexExclusive; - private final Thread[] ownedThreads; + private final Future[] ownedTasks; private BigQueryJsonResultSet( Schema schema, @@ -54,7 +55,7 @@ private BigQueryJsonResultSet( BigQueryFieldValueListWrapper cursor, int fromIndex, int toIndexExclusive, - Thread[] ownedThreads, + Future[] ownedTasks, BigQuery bigQuery, Job job) { super(bigQuery, statement, schema, isNested, job); @@ -64,7 +65,7 @@ private BigQueryJsonResultSet( this.fromIndex = fromIndex; this.toIndexExclusive = toIndexExclusive; this.nestedRowIndex = fromIndex - 1; - this.ownedThreads = ownedThreads; + this.ownedTasks = ownedTasks; } /** @@ -78,10 +79,10 @@ static BigQueryJsonResultSet of( long totalRows, BlockingQueue buffer, BigQueryStatement statement, - Thread[] ownedThreads, + Future[] ownedTasks, BigQuery bigQuery) { - return of(schema, totalRows, buffer, statement, ownedThreads, bigQuery, null); + return of(schema, totalRows, buffer, statement, ownedTasks, bigQuery, null); } static BigQueryJsonResultSet of( @@ -89,12 +90,12 @@ static BigQueryJsonResultSet of( long totalRows, BlockingQueue buffer, BigQueryStatement statement, - Thread[] ownedThreads, + Future[] ownedTasks, BigQuery bigQuery, Job job) { return new BigQueryJsonResultSet( - schema, totalRows, buffer, statement, false, null, -1, -1, ownedThreads, bigQuery, job); + schema, totalRows, buffer, statement, false, null, -1, -1, ownedTasks, bigQuery, job); } static BigQueryJsonResultSet of( @@ -102,10 +103,10 @@ static BigQueryJsonResultSet of( long totalRows, BlockingQueue buffer, BigQueryStatement statement, - Thread[] ownedThreads) { + Future[] ownedTasks) { return new BigQueryJsonResultSet( - schema, totalRows, buffer, statement, false, null, -1, -1, ownedThreads, null, null); + schema, totalRows, buffer, statement, false, null, -1, -1, ownedTasks, null, null); } BigQueryJsonResultSet() { @@ -113,7 +114,7 @@ static BigQueryJsonResultSet of( totalRows = 0; buffer = null; fromIndex = 0; - ownedThreads = new Thread[0]; + ownedTasks = new Future[0]; toIndexExclusive = 0; } @@ -291,10 +292,10 @@ private FieldValue getObjectInternal(int columnIndex) throws SQLException { public void close() { LOG.fineTrace("close", () -> String.format("Closing BigqueryJsonResultSet %s.", this)); this.isClosed = true; - if (ownedThreads != null) { - for (Thread ownedThread : ownedThreads) { - if (!ownedThread.isInterrupted()) { - ownedThread.interrupt(); + if (ownedTasks != null) { + for (Future ownedTask : ownedTasks) { + if (ownedTask != null && !ownedTask.isCancelled() && !ownedTask.isDone()) { + ownedTask.cancel(true); } } } diff --git a/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryResultSetFinalizers.java b/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryResultSetFinalizers.java index 85a00214376f..0606d39bee90 100644 --- a/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryResultSetFinalizers.java +++ b/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryResultSetFinalizers.java @@ -19,6 +19,7 @@ import com.google.api.core.InternalApi; import java.lang.ref.PhantomReference; import java.lang.ref.ReferenceQueue; +import java.util.concurrent.Future; @InternalApi class BigQueryResultSetFinalizers { @@ -27,44 +28,44 @@ class BigQueryResultSetFinalizers { @InternalApi static class ArrowResultSetFinalizer extends PhantomReference { - Thread ownedThread; + Future ownedTask; public ArrowResultSetFinalizer( BigQueryArrowResultSet referent, ReferenceQueue q, - Thread ownedThread) { + Future ownedTask) { super(referent, q); - this.ownedThread = ownedThread; + this.ownedTask = ownedTask; } // Free resources. Remove all the hard refs public void finalizeResources() { LOG.finestTrace("finalizeResources"); - if (ownedThread != null && !ownedThread.isInterrupted()) { - ownedThread.interrupt(); + if (ownedTask != null && !ownedTask.isCancelled() && !ownedTask.isDone()) { + ownedTask.cancel(true); } } } @InternalApi static class JsonResultSetFinalizer extends PhantomReference { - Thread[] ownedThreads; + Future[] ownedTasks; public JsonResultSetFinalizer( BigQueryJsonResultSet referent, ReferenceQueue q, - Thread[] ownedThreads) { + Future[] ownedTasks) { super(referent, q); - this.ownedThreads = ownedThreads; + this.ownedTasks = ownedTasks; } // Free resources. Remove all the hard refs public void finalizeResources() { LOG.finestTrace("finalizeResources"); - if (ownedThreads != null) { - for (Thread ownedThread : ownedThreads) { - if (!ownedThread.isInterrupted()) { - ownedThread.interrupt(); + if (ownedTasks != null) { + for (Future ownedTask : ownedTasks) { + if (ownedTask != null && !ownedTask.isCancelled() && !ownedTask.isDone()) { + ownedTask.cancel(true); } } } diff --git a/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java b/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java index 0d4d94175b8d..227118ec9310 100644 --- a/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java +++ b/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java @@ -76,7 +76,7 @@ import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ThreadFactory; import java.util.logging.Level; @@ -90,10 +90,6 @@ */ public class BigQueryStatement extends BigQueryNoOpsStatement { - // TODO (obada): Update this after benchmarking - private static final int MAX_PROCESS_QUERY_THREADS_CNT = 50; - protected static ExecutorService queryTaskExecutor = - Executors.newFixedThreadPool(MAX_PROCESS_QUERY_THREADS_CNT); private final BigQueryJdbcCustomLogger LOG = new BigQueryJdbcCustomLogger(this.toString()); public static final int DEFAULT_BUFFER_SIZE = BigQuerySettings.DEFAULT_NUM_BUFFERED_ROWS * 2; private static final String DEFAULT_DATASET_NAME = "_google_jdbc"; @@ -836,9 +832,9 @@ ResultSet processArrowResultSet(TableResult results, Job job) throws SQLExceptio ReadSession readSession = getReadSession(builder.build()); this.arrowBatchWrapperBlockingQueue = new LinkedBlockingDeque<>(getBufferSize()); // deserialize and populate the buffer async, so that the client isn't blocked - Thread populateBufferWorker = + Future populateBufferWorker = populateArrowBufferedQueue( - readSession, this.arrowBatchWrapperBlockingQueue, this.bigQueryReadClient); + readSession, this.arrowBatchWrapperBlockingQueue, getBigQueryReadClient()); BigQueryArrowResultSet arrowResultSet = BigQueryArrowResultSet.of( @@ -864,12 +860,14 @@ ResultSet processArrowResultSet(TableResult results, Job job) throws SQLExceptio /** Asynchronously reads results and populates an arrow record queue */ @InternalApi - Thread populateArrowBufferedQueue( + Future populateArrowBufferedQueue( ReadSession readSession, BlockingQueue arrowBatchWrapperBlockingQueue, BigQueryReadClient bqReadClient) { LOG.finer("++enter++"); + ExecutorService executor = connection.getExecutorService(); + Runnable arrowStreamProcessor = () -> { long rowsRead = 0; @@ -890,7 +888,7 @@ Thread populateArrowBufferedQueue( com.google.api.gax.rpc.ServerStream stream = bqReadClient.readRowsCallable().call(readRowsRequest); for (ReadRowsResponse response : stream) { - if (Thread.currentThread().isInterrupted() || queryTaskExecutor.isShutdown()) { + if (Thread.currentThread().isInterrupted() || executor.isShutdown()) { break; } @@ -952,9 +950,7 @@ Thread populateArrowBufferedQueue( } }; - Thread populateBufferWorker = JDBC_THREAD_FACTORY.newThread(arrowStreamProcessor); - populateBufferWorker.start(); - return populateBufferWorker; + return executor.submit(arrowStreamProcessor); } /** Executes SQL query using either fast query path or read API */ @@ -1039,7 +1035,7 @@ private boolean meetsReadRatio(TableResult results) { } BigQueryJsonResultSet processJsonResultSet(TableResult results, Job job) { - List threadList = new ArrayList(); + List> taskList = new ArrayList<>(); Schema schema = results.getSchema(); long totalRows = (getMaxRows() > 0) ? getMaxRows() : results.getTotalRows(); @@ -1049,15 +1045,15 @@ BigQueryJsonResultSet processJsonResultSet(TableResult results, Job job) { JobId jobId = results.getJobId(); if (jobId != null) { - // Thread to make rpc calls to fetch data from the server - Thread nextPageWorker = + // Task to make rpc calls to fetch data from the server + Future nextPageWorker = runNextPageTaskAsync( results, results.getNextPageToken(), jobId, rpcResponseQueue, this.bigQueryFieldValueListWrapperBlockingQueue); - threadList.add(nextPageWorker); + taskList.add(nextPageWorker); } else { try { populateFirstPage(results, rpcResponseQueue); @@ -1069,13 +1065,13 @@ BigQueryJsonResultSet processJsonResultSet(TableResult results, Job job) { } } - // Thread to parse data received from the server to client library objects - Thread populateBufferWorker = + // Task to parse data received from the server to client library objects + Future populateBufferWorker = parseAndPopulateRpcDataAsync( schema, this.bigQueryFieldValueListWrapperBlockingQueue, rpcResponseQueue); - threadList.add(populateBufferWorker); + taskList.add(populateBufferWorker); - Thread[] jsonWorkers = threadList.toArray(new Thread[0]); + Future[] jsonWorkers = taskList.toArray(new Future[0]); BigQueryJsonResultSet jsonResultSet = BigQueryJsonResultSet.of( @@ -1118,7 +1114,7 @@ public void setFetchDirection(int direction) throws SQLException { } @VisibleForTesting - Thread runNextPageTaskAsync( + Future runNextPageTaskAsync( TableResult result, String firstPageToken, JobId jobId, @@ -1129,6 +1125,8 @@ Thread runNextPageTaskAsync( // calls populateFirstPage(result, rpcResponseQueue); + ExecutorService executor = connection.getExecutorService(); + // This thread makes the RPC calls and paginates Runnable nextPageTask = () -> { @@ -1142,7 +1140,7 @@ Thread runNextPageTaskAsync( try { while (currentPageToken != null) { // do not process further pages and shutdown - if (Thread.currentThread().isInterrupted() || queryTaskExecutor.isShutdown()) { + if (Thread.currentThread().isInterrupted() || executor.isShutdown()) { LOG.warning( "%s Interrupted @ runNextPageTaskAsync", Thread.currentThread().getName()); break; @@ -1177,9 +1175,7 @@ Thread runNextPageTaskAsync( // have finished processing the records and even that will be interrupted }; - Thread nextPageWorker = JDBC_THREAD_FACTORY.newThread(nextPageTask); - nextPageWorker.start(); - return nextPageWorker; + return executor.submit(nextPageTask); } /** @@ -1187,12 +1183,14 @@ Thread runNextPageTaskAsync( * bigQueryFieldValueListWrapperBlockingQueue with FieldValueList */ @VisibleForTesting - Thread parseAndPopulateRpcDataAsync( + Future parseAndPopulateRpcDataAsync( Schema schema, BlockingQueue bigQueryFieldValueListWrapperBlockingQueue, BlockingQueue> rpcResponseQueue) { LOG.finer("++enter++"); + ExecutorService executor = connection.getExecutorService(); + Runnable populateBufferRunnable = () -> { // producer thread populating the buffer try { @@ -1217,7 +1215,7 @@ Thread parseAndPopulateRpcDataAsync( } if (Thread.currentThread().isInterrupted() - || queryTaskExecutor.isShutdown() + || executor.isShutdown() || fieldValueLists == null) { // do not process further pages and shutdown (outerloop) break; @@ -1227,7 +1225,7 @@ Thread parseAndPopulateRpcDataAsync( long results = 0; for (FieldValueList fieldValueList : fieldValueLists) { - if (Thread.currentThread().isInterrupted() || queryTaskExecutor.isShutdown()) { + if (Thread.currentThread().isInterrupted() || executor.isShutdown()) { // do not process further pages and shutdown (inner loop) break; } @@ -1262,9 +1260,7 @@ Thread parseAndPopulateRpcDataAsync( } }; - Thread populateBufferWorker = JDBC_THREAD_FACTORY.newThread(populateBufferRunnable); - populateBufferWorker.start(); - return populateBufferWorker; + return executor.submit(populateBufferRunnable); } /** diff --git a/java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryArrowResultSetTest.java b/java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryArrowResultSetTest.java index 93eb573f3c35..1ffd05ff4cd6 100644 --- a/java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryArrowResultSetTest.java +++ b/java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryArrowResultSetTest.java @@ -49,6 +49,7 @@ import java.util.List; import java.util.TimeZone; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingDeque; import java.util.stream.Stream; import org.apache.arrow.memory.RootAllocator; @@ -237,10 +238,10 @@ public void setUp() throws SQLException, IOException { ArrowSchema.newBuilder() .setSerializedSchema(serializeSchema(vectorSchemaRoot.getSchema())) .build(); - Thread workerThread = new Thread(); + Future workerTask = mock(Future.class); bigQueryArrowResultSet = BigQueryArrowResultSet.of( - QUERY_SCHEMA, arrowSchema, 1, statement, buffer, workerThread, null); + QUERY_SCHEMA, arrowSchema, 1, statement, buffer, workerTask, null); // nested result set data setup JsonStringArrayList jsonStringArrayList = getJsonStringArrayList(); @@ -275,17 +276,17 @@ public void testRowCount() throws SQLException, IOException { ArrowSchema.newBuilder() .setSerializedSchema(serializeSchema(vectorSchemaRoot.getSchema())) .build(); - Thread workerThread = new Thread(); + Future workerTask = mock(Future.class); // ResultSet with 1 row buffer and 1 total rows. BigQueryArrowResultSet bigQueryArrowResultSet2 = BigQueryArrowResultSet.of( - QUERY_SCHEMA, arrowSchema, 1, statement, buffer, workerThread, null); + QUERY_SCHEMA, arrowSchema, 1, statement, buffer, workerTask, null); assertThat(resultSetRowCount(bigQueryArrowResultSet2)).isEqualTo(1); // ResultSet with 2 rows buffer and 1 total rows. bigQueryArrowResultSet2 = BigQueryArrowResultSet.of( - QUERY_SCHEMA, arrowSchema, 1, statement, bufferWithTwoRows, workerThread, null); + QUERY_SCHEMA, arrowSchema, 1, statement, bufferWithTwoRows, workerTask, null); assertThat(resultSetRowCount(bigQueryArrowResultSet2)).isEqualTo(1); } diff --git a/java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryJsonResultSetTest.java b/java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryJsonResultSetTest.java index 1e9a3830cd90..b75f8493be80 100644 --- a/java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryJsonResultSetTest.java +++ b/java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryJsonResultSetTest.java @@ -53,6 +53,7 @@ import java.time.LocalTime; import java.util.TimeZone; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; import java.util.stream.Stream; @@ -168,9 +169,9 @@ public void setUp() { statement = mock(BigQueryStatement.class); buffer.add(BigQueryFieldValueListWrapper.of(fieldList, fieldValues)); buffer.add(BigQueryFieldValueListWrapper.of(null, null, true)); // last marker - Thread[] workerThreads = {new Thread()}; + Future[] workerTasks = {mock(Future.class)}; bigQueryJsonResultSet = - BigQueryJsonResultSet.of(QUERY_SCHEMA, 1L, buffer, statement, workerThreads); + BigQueryJsonResultSet.of(QUERY_SCHEMA, 1L, buffer, statement, workerTasks); // Buffer with 2 rows. bufferWithTwoRows = new LinkedBlockingDeque<>(3); @@ -196,9 +197,9 @@ public void setUp() { private boolean resetResultSet() throws SQLException { // re-initialises the resultset and moves the cursor to the first row - Thread[] workerThreads = {new Thread()}; + Future[] workerTasks = {mock(Future.class)}; bigQueryJsonResultSet = - BigQueryJsonResultSet.of(QUERY_SCHEMA, 1L, buffer, statement, workerThreads); + BigQueryJsonResultSet.of(QUERY_SCHEMA, 1L, buffer, statement, workerTasks); return bigQueryJsonResultSet.next(); // move to the first row } @@ -214,15 +215,15 @@ public void testClose() { @Test public void testRowCount() throws SQLException { - Thread[] workerThreads = {new Thread()}; + Future[] workerTasks = {mock(Future.class)}; // ResultSet with 1 row buffer and 1 total rows. BigQueryJsonResultSet bigQueryJsonResultSet2 = - BigQueryJsonResultSet.of(QUERY_SCHEMA, 1L, buffer, statement, workerThreads); + BigQueryJsonResultSet.of(QUERY_SCHEMA, 1L, buffer, statement, workerTasks); assertThat(resultSetRowCount(bigQueryJsonResultSet2)).isEqualTo(1); // ResultSet with 2 rows buffer and 1 total rows. bigQueryJsonResultSet2 = BigQueryJsonResultSet.of( - QUERY_SCHEMA, 1L, bufferWithTwoRows, statementForTwoRows, workerThreads); + QUERY_SCHEMA, 1L, bufferWithTwoRows, statementForTwoRows, workerTasks); assertThat(resultSetRowCount(bigQueryJsonResultSet2)).isEqualTo(1); } diff --git a/java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryResultSetFinalizersTest.java b/java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryResultSetFinalizersTest.java index ee4d6047b931..460e5b68c3c5 100644 --- a/java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryResultSetFinalizersTest.java +++ b/java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryResultSetFinalizersTest.java @@ -16,52 +16,33 @@ package com.google.cloud.bigquery.jdbc; -import static com.google.common.truth.Truth.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import java.util.concurrent.Future; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; public class BigQueryResultSetFinalizersTest { - Thread arrowWorker; - Thread[] jsonWorkers; + Future[] jsonWorkers; @BeforeEach public void setUp() { - // create and start the demon threads - arrowWorker = - new Thread( - () -> { - while (true) { - if (Thread.currentThread().isInterrupted()) { - break; - } - } - }); - arrowWorker.setDaemon(true); - Thread jsonWorker = - new Thread( - () -> { - while (true) { - if (Thread.currentThread().isInterrupted()) { - break; - } - } - }); - jsonWorker.setDaemon(true); - jsonWorkers = new Thread[] {jsonWorker}; - arrowWorker.start(); - jsonWorker.start(); + Future mockFuture = mock(Future.class); + jsonWorkers = new Future[] {mockFuture}; } @Test public void testFinalizeResources() { + Future mockFuture = mock(Future.class); BigQueryResultSetFinalizers.ArrowResultSetFinalizer arrowResultSetFinalizer = - new BigQueryResultSetFinalizers.ArrowResultSetFinalizer(null, null, arrowWorker); + new BigQueryResultSetFinalizers.ArrowResultSetFinalizer(null, null, mockFuture); arrowResultSetFinalizer.finalizeResources(); - assertThat(arrowWorker.isInterrupted()).isTrue(); + verify(mockFuture).cancel(true); + BigQueryResultSetFinalizers.JsonResultSetFinalizer jsonResultSetFinalizer = new BigQueryResultSetFinalizers.JsonResultSetFinalizer(null, null, jsonWorkers); jsonResultSetFinalizer.finalizeResources(); - assertThat(jsonWorkers[0].isInterrupted()).isTrue(); + verify(jsonWorkers[0]).cancel(true); } } diff --git a/java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryResultSetMetadataTest.java b/java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryResultSetMetadataTest.java index b95bb0e056b5..8261a14dc981 100644 --- a/java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryResultSetMetadataTest.java +++ b/java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryResultSetMetadataTest.java @@ -31,6 +31,7 @@ import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.sql.Types; +import java.util.concurrent.Future; import java.util.stream.Stream; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -84,9 +85,9 @@ public class BigQueryResultSetMetadataTest { @BeforeEach public void setUp() throws SQLException { statement = mock(BigQueryStatement.class); - Thread[] workerThreads = {new Thread()}; + Future[] workerTasks = {mock(Future.class)}; BigQueryJsonResultSet bigQueryJsonResultSet = - BigQueryJsonResultSet.of(QUERY_SCHEMA, 1L, null, statement, workerThreads); + BigQueryJsonResultSet.of(QUERY_SCHEMA, 1L, null, statement, workerTasks); // values for nested types resultSetMetaData = bigQueryJsonResultSet.getMetaData(); @@ -290,7 +291,7 @@ public void testIsSearchableForAllTypes(StandardSQLTypeName type) throws SQLExce FieldList schemaFields = FieldList.of(field); BigQueryJsonResultSet resultSet = BigQueryJsonResultSet.of( - Schema.of(schemaFields), 1L, null, statement, new Thread[] {new Thread()}); + Schema.of(schemaFields), 1L, null, statement, new Future[] {mock(Future.class)}); ResultSetMetaData metaData = resultSet.getMetaData(); assertThat(metaData.isSearchable(1)).isTrue(); } diff --git a/java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryStatementTest.java b/java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryStatementTest.java index 674eb0df64e0..189aeedfaed0 100644 --- a/java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryStatementTest.java +++ b/java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryStatementTest.java @@ -67,6 +67,8 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.vector.BitVector; import org.apache.arrow.vector.FieldVector; @@ -150,6 +152,8 @@ public void setUp() throws IOException, SQLException { .when(bigQueryConnection) .getQueryDialect(); doReturn(1000L).when(bigQueryConnection).getMaxResults(); + ExecutorService executorService = mock(ExecutorService.class); + doReturn(executorService).when(bigQueryConnection).getExecutorService(); bigQueryStatement = new BigQueryStatement(bigQueryConnection); VectorSchemaRoot vectorSchemaRoot = getTestVectorSchemaRoot(); arrowSchema = @@ -252,7 +256,7 @@ public void getArrowResultSetTest() throws SQLException { doReturn(readSession) .when(bigQueryStatementSpy) .getReadSession(any(CreateReadSessionRequest.class)); - Thread mockWorker = new Thread(); + Future mockWorker = mock(Future.class); doReturn(mockWorker) .when(bigQueryStatementSpy) .populateArrowBufferedQueue( diff --git a/java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/PerConnectionFileHandlerTest.java b/java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/PerConnectionFileHandlerTest.java index fda5112703fd..db77acc58342 100644 --- a/java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/PerConnectionFileHandlerTest.java +++ b/java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/PerConnectionFileHandlerTest.java @@ -31,6 +31,7 @@ import java.sql.SQLException; import java.sql.Statement; import java.util.Optional; +import java.util.concurrent.Future; import java.util.logging.Level; import java.util.logging.LogRecord; import org.junit.jupiter.api.AfterEach; @@ -195,7 +196,8 @@ public void testResultSetExceptionLogRouting() throws Exception { // Instantiate a real BigQueryJsonResultSet (which extends BigQueryBaseResultSet) // passing the mock statement carrying connectionId "c789" - BigQueryJsonResultSet rs = BigQueryJsonResultSet.of(schema, 0, null, mockStmt, new Thread[0]); + BigQueryJsonResultSet rs = + BigQueryJsonResultSet.of(schema, 0, null, mockStmt, new Future[0]); // Calling findColumn(null) throws SQLException because column label is null assertThrows(SQLException.class, () -> rs.findColumn(null)); From 6456ddf051bb00bd6f5bc01beb1f5586a877098d Mon Sep 17 00:00:00 2001 From: Neenu1995 Date: Fri, 12 Jun 2026 13:26:32 -0400 Subject: [PATCH 2/7] refactor(jdbc): fix anonymous Future compliance in wrapThread --- .../bigquery/jdbc/BigQueryDatabaseMetaData.java | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java b/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java index 4db431ead646..dab91f7d8327 100644 --- a/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java +++ b/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java @@ -5266,7 +5266,7 @@ private void loadDriverVersionProperties() { } } - // TODO(developer): This is a temporary compatibility bridge to wrap raw Threads into Futures. + // TODO(keshav): This is a temporary compatibility bridge to wrap raw Threads into Futures. // This should be removed when BigQueryDatabaseMetaData is refactored to use the ExecutorService // directly. private static Future[] wrapThread(final Thread thread) { @@ -5275,8 +5275,14 @@ private static Future[] wrapThread(final Thread thread) { } return new Future[] { new Future() { + private volatile boolean cancelled = false; + @Override public boolean cancel(boolean mayInterruptIfRunning) { + if (cancelled || !thread.isAlive()) { + return false; + } + cancelled = true; if (mayInterruptIfRunning) { thread.interrupt(); } @@ -5285,12 +5291,12 @@ public boolean cancel(boolean mayInterruptIfRunning) { @Override public boolean isCancelled() { - return false; + return cancelled; } @Override public boolean isDone() { - return !thread.isAlive(); + return cancelled || !thread.isAlive(); } @Override From 67fa7eacbbb0c5cf07e968a8004d5f9f1c34191c Mon Sep 17 00:00:00 2001 From: Neenu1995 Date: Fri, 12 Jun 2026 15:28:47 -0400 Subject: [PATCH 3/7] address comments --- .../bigquery/jdbc/BigQueryArrowResultSet.java | 2 +- .../bigquery/jdbc/BigQueryConnection.java | 6 +- .../jdbc/BigQueryDatabaseMetaData.java | 35 +++-- .../cloud/bigquery/jdbc/BigQueryJdbcMdc.java | 16 +-- .../bigquery/jdbc/BigQueryJsonResultSet.java | 2 +- .../jdbc/BigQueryResultSetFinalizers.java | 4 +- .../bigquery/jdbc/BigQueryStatement.java | 64 +++++---- .../jdbc/BigQueryDatabaseMetaDataTest.java | 125 ++++++++++++++++++ .../bigquery/jdbc/BigQueryJdbcMdcTest.java | 4 +- 9 files changed, 204 insertions(+), 54 deletions(-) diff --git a/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryArrowResultSet.java b/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryArrowResultSet.java index 55282df2ee23..e4ba837643f8 100644 --- a/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryArrowResultSet.java +++ b/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryArrowResultSet.java @@ -485,7 +485,7 @@ private String formatRangeElement(Object element, StandardSQLTypeName elementTyp public void close() { LOG.fineTrace("close", () -> String.format("Closing BigqueryArrowResultSet %s.", this)); this.isClosed = true; - if (ownedTask != null && !ownedTask.isCancelled() && !ownedTask.isDone()) { + if (ownedTask != null) { // cancel the producer task when result set is closed ownedTask.cancel(true); } diff --git a/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryConnection.java b/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryConnection.java index 586a5c329405..cbbf50acc392 100644 --- a/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryConnection.java +++ b/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryConnection.java @@ -348,7 +348,11 @@ public class BigQueryConnection extends BigQueryNoOpsConnection { this.headerProvider = createHeaderProvider(); this.bigQuery = getBigQueryConnection(); this.metadataExecutor = BigQueryJdbcMdc.newFixedThreadPool(metadataFetchThreadCount); - this.queryExecutor = BigQueryJdbcMdc.newCachedThreadPool(); + // Use a bounded cached thread pool to prevent unbounded thread creation (and OOMs) + // under heavy load, while ensuring a limit (e.g., 100) high enough to prevent deadlocks + // between interdependent producer/consumer tasks (like nextPageWorker and + // populateBufferWorker). + this.queryExecutor = BigQueryJdbcMdc.newBoundedCachedThreadPool(100); } } diff --git a/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java b/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java index dab91f7d8327..b7a7eadc235f 100644 --- a/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java +++ b/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java @@ -72,6 +72,7 @@ import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.function.Supplier; @@ -5269,7 +5270,7 @@ private void loadDriverVersionProperties() { // TODO(keshav): This is a temporary compatibility bridge to wrap raw Threads into Futures. // This should be removed when BigQueryDatabaseMetaData is refactored to use the ExecutorService // directly. - private static Future[] wrapThread(final Thread thread) { + static Future[] wrapThread(final Thread thread) { if (thread == null) { return null; } @@ -5279,7 +5280,7 @@ private static Future[] wrapThread(final Thread thread) { @Override public boolean cancel(boolean mayInterruptIfRunning) { - if (cancelled || !thread.isAlive()) { + if (cancelled || thread.getState() == Thread.State.TERMINATED) { return false; } cancelled = true; @@ -5296,25 +5297,33 @@ public boolean isCancelled() { @Override public boolean isDone() { - return cancelled || !thread.isAlive(); + return cancelled || thread.getState() == Thread.State.TERMINATED; } @Override - public Object get() { - try { - thread.join(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); + public Object get() throws InterruptedException, CancellationException { + if (isCancelled()) { + throw new CancellationException(); + } + thread.join(); + if (isCancelled()) { + throw new CancellationException(); } return null; } @Override - public Object get(long timeout, TimeUnit unit) { - try { - unit.timedJoin(thread, timeout); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); + public Object get(long timeout, TimeUnit unit) + throws InterruptedException, CancellationException, TimeoutException { + if (isCancelled()) { + throw new CancellationException(); + } + unit.timedJoin(thread, timeout); + if (isCancelled()) { + throw new CancellationException(); + } + if (thread.isAlive()) { + throw new TimeoutException(); } return null; } diff --git a/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcMdc.java b/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcMdc.java index 5f2416753c4d..61326e809d4a 100644 --- a/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcMdc.java +++ b/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcMdc.java @@ -81,13 +81,13 @@ static ExecutorService newFixedThreadPool(int nThreads) { } /** - * Creates a new cached thread pool ExecutorService that automatically propagates MDC connection - * context from the submitting thread to the executing thread. + * Creates a new bounded cached thread pool ExecutorService that automatically propagates MDC + * connection context from the submitting thread to the executing thread. */ - static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { + static ExecutorService newBoundedCachedThreadPool(int maxThreads, ThreadFactory threadFactory) { return new MdcThreadPoolExecutor( 0, - Integer.MAX_VALUE, + maxThreads, 60L, TimeUnit.SECONDS, new java.util.concurrent.SynchronousQueue<>(), @@ -95,11 +95,11 @@ static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { } /** - * Creates a new cached thread pool ExecutorService that automatically propagates MDC connection - * context from the submitting thread to the executing thread. + * Creates a new bounded cached thread pool ExecutorService that automatically propagates MDC + * connection context from the submitting thread to the executing thread. */ - static ExecutorService newCachedThreadPool() { - return newCachedThreadPool(Executors.defaultThreadFactory()); + static ExecutorService newBoundedCachedThreadPool(int maxThreads) { + return newBoundedCachedThreadPool(maxThreads, Executors.defaultThreadFactory()); } private static class MdcThreadFactory implements ThreadFactory { diff --git a/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJsonResultSet.java b/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJsonResultSet.java index 08ef917cec4f..46f70b965c39 100644 --- a/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJsonResultSet.java +++ b/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJsonResultSet.java @@ -294,7 +294,7 @@ public void close() { this.isClosed = true; if (ownedTasks != null) { for (Future ownedTask : ownedTasks) { - if (ownedTask != null && !ownedTask.isCancelled() && !ownedTask.isDone()) { + if (ownedTask != null) { ownedTask.cancel(true); } } diff --git a/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryResultSetFinalizers.java b/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryResultSetFinalizers.java index 0606d39bee90..eefdafcea84b 100644 --- a/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryResultSetFinalizers.java +++ b/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryResultSetFinalizers.java @@ -41,7 +41,7 @@ public ArrowResultSetFinalizer( // Free resources. Remove all the hard refs public void finalizeResources() { LOG.finestTrace("finalizeResources"); - if (ownedTask != null && !ownedTask.isCancelled() && !ownedTask.isDone()) { + if (ownedTask != null) { ownedTask.cancel(true); } } @@ -64,7 +64,7 @@ public void finalizeResources() { LOG.finestTrace("finalizeResources"); if (ownedTasks != null) { for (Future ownedTask : ownedTasks) { - if (ownedTask != null && !ownedTask.isCancelled() && !ownedTask.isDone()) { + if (ownedTask != null) { ownedTask.cancel(true); } } diff --git a/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java b/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java index 227118ec9310..f97765f349d2 100644 --- a/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java +++ b/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java @@ -78,6 +78,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadFactory; import java.util.logging.Level; @@ -853,7 +854,12 @@ ResultSet processArrowResultSet(TableResult results, Job job) throws SQLExceptio arrowResultSet.setQueryId(results.getQueryId()); return arrowResultSet; - } catch (Exception ex) { + } catch (Exception | OutOfMemoryError ex) { + if (ex instanceof OutOfMemoryError || ex instanceof RejectedExecutionException) { + throw new BigQueryJdbcException( + "Failed to execute query: Unable to allocate background threads to process the query results. Connection-scoped thread pool limit of 100 threads was reached or system is out of memory.", + ex); + } throw new BigQueryJdbcException(ex.getMessage(), ex); } } @@ -1034,7 +1040,7 @@ private boolean meetsReadRatio(TableResult results) { return totalRows / pageSize > querySettings.getHighThroughputActivationRatio(); } - BigQueryJsonResultSet processJsonResultSet(TableResult results, Job job) { + BigQueryJsonResultSet processJsonResultSet(TableResult results, Job job) throws SQLException { List> taskList = new ArrayList<>(); Schema schema = results.getSchema(); @@ -1044,32 +1050,38 @@ BigQueryJsonResultSet processJsonResultSet(TableResult results, Job job) { new LinkedBlockingDeque<>(getPageCacheSize(getBufferSize(), schema)); JobId jobId = results.getJobId(); - if (jobId != null) { - // Task to make rpc calls to fetch data from the server - Future nextPageWorker = - runNextPageTaskAsync( - results, - results.getNextPageToken(), - jobId, - rpcResponseQueue, - this.bigQueryFieldValueListWrapperBlockingQueue); - taskList.add(nextPageWorker); - } else { - try { - populateFirstPage(results, rpcResponseQueue); - rpcResponseQueue.put(Tuple.of(null, false)); - } catch (InterruptedException e) { - LOG.warning( - "%s Interrupted @ processJsonQueryResponseResults: %s", - Thread.currentThread().getName(), e.getMessage()); + try { + if (jobId != null) { + // Task to make rpc calls to fetch data from the server + Future nextPageWorker = + runNextPageTaskAsync( + results, + results.getNextPageToken(), + jobId, + rpcResponseQueue, + this.bigQueryFieldValueListWrapperBlockingQueue); + taskList.add(nextPageWorker); + } else { + try { + populateFirstPage(results, rpcResponseQueue); + rpcResponseQueue.put(Tuple.of(null, false)); + } catch (InterruptedException e) { + LOG.warning( + "%s Interrupted @ processJsonQueryResponseResults: %s", + Thread.currentThread().getName(), e.getMessage()); + } } - } - // Task to parse data received from the server to client library objects - Future populateBufferWorker = - parseAndPopulateRpcDataAsync( - schema, this.bigQueryFieldValueListWrapperBlockingQueue, rpcResponseQueue); - taskList.add(populateBufferWorker); + // Task to parse data received from the server to client library objects + Future populateBufferWorker = + parseAndPopulateRpcDataAsync( + schema, this.bigQueryFieldValueListWrapperBlockingQueue, rpcResponseQueue); + taskList.add(populateBufferWorker); + } catch (RejectedExecutionException | OutOfMemoryError e) { + throw new BigQueryJdbcException( + "Failed to execute query: Unable to allocate background threads to process the query results. Connection-scoped thread pool limit of 100 threads was reached or system is out of memory.", + e); + } Future[] jsonWorkers = taskList.toArray(new Future[0]); diff --git a/java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaDataTest.java b/java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaDataTest.java index 58a5a7212066..9b2b82644c35 100644 --- a/java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaDataTest.java +++ b/java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaDataTest.java @@ -44,9 +44,13 @@ import java.sql.Types; import java.util.*; import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.regex.Pattern; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -3308,4 +3312,125 @@ public void testMetadataAndResultSetMetadataTypeMappingConsistency(StandardSQLTy assertEquals( metadataTypeInfo.jdbcType, (int) resultSetType, "Type mapping mismatch for " + type); } + + @Test + public void testWrapThread_NullThread() { + assertNull(BigQueryDatabaseMetaData.wrapThread(null)); + } + + @Test + public void testWrapThread_BasicLifecycle() throws Exception { + CountDownLatch startLatch = new CountDownLatch(1); + CountDownLatch finishLatch = new CountDownLatch(1); + Thread t = + new Thread( + () -> { + try { + startLatch.countDown(); + finishLatch.await(); + } catch (InterruptedException e) { + // ignore + } + }); + + Future[] futures = BigQueryDatabaseMetaData.wrapThread(t); + assertNotNull(futures); + assertEquals(1, futures.length); + Future f = futures[0]; + + // Thread is NEW (not started yet). + assertFalse(f.isDone()); + assertFalse(f.isCancelled()); + + t.start(); + startLatch.await(); + + // Thread is running. + assertFalse(f.isDone()); + assertFalse(f.isCancelled()); + + finishLatch.countDown(); + t.join(); + + // Thread is terminated. + assertTrue(f.isDone()); + assertFalse(f.isCancelled()); + assertNull(f.get()); + } + + @Test + public void testWrapThread_CancelBeforeStart() throws Exception { + Thread t = + new Thread( + () -> { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + // ignore + } + }); + + Future f = BigQueryDatabaseMetaData.wrapThread(t)[0]; + assertTrue(f.cancel(true)); + assertTrue(f.isCancelled()); + assertTrue(f.isDone()); + + // cancel on already cancelled should return false + assertFalse(f.cancel(true)); + + assertThrows(CancellationException.class, () -> f.get()); + assertThrows(CancellationException.class, () -> f.get(1, TimeUnit.SECONDS)); + } + + @Test + public void testWrapThread_CancelRunningWithInterrupt() throws Exception { + CountDownLatch startLatch = new CountDownLatch(1); + CountDownLatch interruptedLatch = new CountDownLatch(1); + Thread t = + new Thread( + () -> { + startLatch.countDown(); + try { + Thread.sleep(10000); + } catch (InterruptedException e) { + interruptedLatch.countDown(); + } + }); + + t.start(); + startLatch.await(); + + Future f = BigQueryDatabaseMetaData.wrapThread(t)[0]; + assertTrue(f.cancel(true)); + assertTrue(f.isCancelled()); + assertTrue(f.isDone()); + + assertTrue(interruptedLatch.await(5, TimeUnit.SECONDS)); + assertThrows(CancellationException.class, () -> f.get()); + } + + @Test + public void testWrapThread_GetTimeout() throws Exception { + CountDownLatch startLatch = new CountDownLatch(1); + Thread t = + new Thread( + () -> { + startLatch.countDown(); + try { + Thread.sleep(10000); + } catch (InterruptedException e) { + // ignore + } + }); + + t.start(); + startLatch.await(); + + Future f = BigQueryDatabaseMetaData.wrapThread(t)[0]; + assertThrows(TimeoutException.class, () -> f.get(100, TimeUnit.MILLISECONDS)); + + // Cleanup: stop the thread + t.interrupt(); + t.join(); + } } diff --git a/java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcMdcTest.java b/java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcMdcTest.java index 387e769bae0a..9248fa1578b9 100644 --- a/java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcMdcTest.java +++ b/java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcMdcTest.java @@ -320,9 +320,9 @@ public void testConnectionScopedExecutorLifecycle() throws Exception { assertTrue(metadataExec2 instanceof ThreadPoolExecutor); assertEquals(0, ((ThreadPoolExecutor) exec1).getCorePoolSize()); - assertEquals(Integer.MAX_VALUE, ((ThreadPoolExecutor) exec1).getMaximumPoolSize()); + assertEquals(100, ((ThreadPoolExecutor) exec1).getMaximumPoolSize()); assertEquals(0, ((ThreadPoolExecutor) exec2).getCorePoolSize()); - assertEquals(Integer.MAX_VALUE, ((ThreadPoolExecutor) exec2).getMaximumPoolSize()); + assertEquals(100, ((ThreadPoolExecutor) exec2).getMaximumPoolSize()); assertEquals(5, ((ThreadPoolExecutor) metadataExec1).getCorePoolSize()); assertEquals(5, ((ThreadPoolExecutor) metadataExec1).getMaximumPoolSize()); assertTrue(((ThreadPoolExecutor) metadataExec1).allowsCoreThreadTimeOut()); From 9a0f70fad9fd7074bffa117e8fa4e14d774ee303 Mon Sep 17 00:00:00 2001 From: Neenu1995 Date: Fri, 12 Jun 2026 16:04:05 -0400 Subject: [PATCH 4/7] cancel if failed --- .../google/cloud/bigquery/jdbc/BigQueryStatement.java | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java b/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java index f97765f349d2..b1cde0568756 100644 --- a/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java +++ b/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java @@ -810,6 +810,7 @@ ResultSet processArrowResultSet(TableResult results, Job job) throws SQLExceptio JobId currentJobId = results.getJobId(); TableId destinationTable = getDestinationTable(currentJobId); Schema schema = results.getSchema(); + Future populateBufferWorker = null; try { String parent = String.format("projects/%s", destinationTable.getProject()); String srcTable = @@ -833,7 +834,7 @@ ResultSet processArrowResultSet(TableResult results, Job job) throws SQLExceptio ReadSession readSession = getReadSession(builder.build()); this.arrowBatchWrapperBlockingQueue = new LinkedBlockingDeque<>(getBufferSize()); // deserialize and populate the buffer async, so that the client isn't blocked - Future populateBufferWorker = + populateBufferWorker = populateArrowBufferedQueue( readSession, this.arrowBatchWrapperBlockingQueue, getBigQueryReadClient()); @@ -855,6 +856,9 @@ ResultSet processArrowResultSet(TableResult results, Job job) throws SQLExceptio return arrowResultSet; } catch (Exception | OutOfMemoryError ex) { + if (populateBufferWorker != null) { + populateBufferWorker.cancel(true); + } if (ex instanceof OutOfMemoryError || ex instanceof RejectedExecutionException) { throw new BigQueryJdbcException( "Failed to execute query: Unable to allocate background threads to process the query results. Connection-scoped thread pool limit of 100 threads was reached or system is out of memory.", @@ -1078,6 +1082,11 @@ BigQueryJsonResultSet processJsonResultSet(TableResult results, Job job) throws schema, this.bigQueryFieldValueListWrapperBlockingQueue, rpcResponseQueue); taskList.add(populateBufferWorker); } catch (RejectedExecutionException | OutOfMemoryError e) { + for (Future task : taskList) { + if (task != null) { + task.cancel(true); + } + } throw new BigQueryJdbcException( "Failed to execute query: Unable to allocate background threads to process the query results. Connection-scoped thread pool limit of 100 threads was reached or system is out of memory.", e); From fa2a70c711d5635539aa0c44cd0ce8538f0905d3 Mon Sep 17 00:00:00 2001 From: Neenu1995 Date: Fri, 12 Jun 2026 16:07:35 -0400 Subject: [PATCH 5/7] fix bridge --- .../jdbc/BigQueryDatabaseMetaData.java | 29 +++++++++++++------ 1 file changed, 20 insertions(+), 9 deletions(-) diff --git a/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java b/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java index b7a7eadc235f..b408b248485b 100644 --- a/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java +++ b/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java @@ -5305,9 +5305,11 @@ public Object get() throws InterruptedException, CancellationException { if (isCancelled()) { throw new CancellationException(); } - thread.join(); - if (isCancelled()) { - throw new CancellationException(); + while (thread.getState() != Thread.State.TERMINATED) { + if (isCancelled()) { + throw new CancellationException(); + } + thread.join(50); } return null; } @@ -5318,12 +5320,21 @@ public Object get(long timeout, TimeUnit unit) if (isCancelled()) { throw new CancellationException(); } - unit.timedJoin(thread, timeout); - if (isCancelled()) { - throw new CancellationException(); - } - if (thread.isAlive()) { - throw new TimeoutException(); + long remainingNanos = unit.toNanos(timeout); + long deadline = System.nanoTime() + remainingNanos; + while (thread.getState() != Thread.State.TERMINATED) { + if (isCancelled()) { + throw new CancellationException(); + } + if (remainingNanos <= 0) { + throw new TimeoutException(); + } + long remainingMillis = TimeUnit.NANOSECONDS.toMillis(remainingNanos); + if (remainingMillis <= 0) { + remainingMillis = 1; + } + thread.join(Math.min(remainingMillis, 50)); + remainingNanos = deadline - System.nanoTime(); } return null; } From 940aaa6b9552beb24bdcef1a0e54b72edbbfe31e Mon Sep 17 00:00:00 2001 From: Neenu1995 Date: Fri, 12 Jun 2026 16:18:59 -0400 Subject: [PATCH 6/7] exception management --- .../bigquery/jdbc/BigQueryStatement.java | 33 ++++++++++++++++--- 1 file changed, 29 insertions(+), 4 deletions(-) diff --git a/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java b/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java index b1cde0568756..d2908c314d19 100644 --- a/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java +++ b/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java @@ -864,6 +864,16 @@ ResultSet processArrowResultSet(TableResult results, Job job) throws SQLExceptio "Failed to execute query: Unable to allocate background threads to process the query results. Connection-scoped thread pool limit of 100 threads was reached or system is out of memory.", ex); } + if (ex instanceof RuntimeException) { + throw (ex instanceof BigQueryJdbcRuntimeException) + ? (BigQueryJdbcRuntimeException) ex + : new BigQueryJdbcRuntimeException(ex); + } + if (ex instanceof SQLException) { + throw (ex instanceof BigQueryJdbcException) + ? (BigQueryJdbcException) ex + : new BigQueryJdbcException(ex); + } throw new BigQueryJdbcException(ex.getMessage(), ex); } } @@ -1073,6 +1083,8 @@ BigQueryJsonResultSet processJsonResultSet(TableResult results, Job job) throws LOG.warning( "%s Interrupted @ processJsonQueryResponseResults: %s", Thread.currentThread().getName(), e.getMessage()); + Thread.currentThread().interrupt(); + throw new BigQueryJdbcException("Query execution was interrupted.", e); } } @@ -1081,15 +1093,28 @@ BigQueryJsonResultSet processJsonResultSet(TableResult results, Job job) throws parseAndPopulateRpcDataAsync( schema, this.bigQueryFieldValueListWrapperBlockingQueue, rpcResponseQueue); taskList.add(populateBufferWorker); - } catch (RejectedExecutionException | OutOfMemoryError e) { + } catch (Exception | OutOfMemoryError e) { for (Future task : taskList) { if (task != null) { task.cancel(true); } } - throw new BigQueryJdbcException( - "Failed to execute query: Unable to allocate background threads to process the query results. Connection-scoped thread pool limit of 100 threads was reached or system is out of memory.", - e); + if (e instanceof RejectedExecutionException || e instanceof OutOfMemoryError) { + throw new BigQueryJdbcException( + "Failed to execute query: Unable to allocate background threads to process the query results. Connection-scoped thread pool limit of 100 threads was reached or system is out of memory.", + e); + } + if (e instanceof RuntimeException) { + throw (e instanceof BigQueryJdbcRuntimeException) + ? (BigQueryJdbcRuntimeException) e + : new BigQueryJdbcRuntimeException(e); + } + if (e instanceof SQLException) { + throw (e instanceof BigQueryJdbcException) + ? (BigQueryJdbcException) e + : new BigQueryJdbcException(e); + } + throw new BigQueryJdbcException(e.getMessage(), e); } Future[] jsonWorkers = taskList.toArray(new Future[0]); From bf2aba5e5c02f746f74ad5a692ea70e9201f945c Mon Sep 17 00:00:00 2001 From: Neenu1995 Date: Fri, 12 Jun 2026 16:46:03 -0400 Subject: [PATCH 7/7] nit --- .../bigquery/jdbc/BigQueryDatabaseMetaData.java | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java b/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java index b408b248485b..83ed7b30c6d5 100644 --- a/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java +++ b/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java @@ -5279,7 +5279,7 @@ static Future[] wrapThread(final Thread thread) { private volatile boolean cancelled = false; @Override - public boolean cancel(boolean mayInterruptIfRunning) { + public synchronized boolean cancel(boolean mayInterruptIfRunning) { if (cancelled || thread.getState() == Thread.State.TERMINATED) { return false; } @@ -5309,7 +5309,11 @@ public Object get() throws InterruptedException, CancellationException { if (isCancelled()) { throw new CancellationException(); } - thread.join(50); + if (thread.getState() == Thread.State.NEW) { + Thread.sleep(50); + } else { + thread.join(50); + } } return null; } @@ -5333,7 +5337,12 @@ public Object get(long timeout, TimeUnit unit) if (remainingMillis <= 0) { remainingMillis = 1; } - thread.join(Math.min(remainingMillis, 50)); + long delay = Math.min(remainingMillis, 50); + if (thread.getState() == Thread.State.NEW) { + Thread.sleep(delay); + } else { + thread.join(delay); + } remainingNanos = deadline - System.nanoTime(); } return null;