Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

/**
Expand Down Expand Up @@ -173,6 +174,7 @@ public class BigQueryConnection extends BigQueryNoOpsConnection {
boolean useQueryCache;
String queryDialect;
int metadataFetchThreadCount;
int queryExecutionThreadCount;
boolean allowLargeResults;
String destinationTable;
String destinationDataset;
Expand Down Expand Up @@ -209,6 +211,8 @@ public class BigQueryConnection extends BigQueryNoOpsConnection {
Boolean reqGoogleDriveScope;
private final Properties clientInfo = new Properties();
private boolean isReadOnlyTokenUsed = false;
private final ExecutorService metadataExecutor;
private final ExecutorService queryExecutor;

BigQueryConnection(String url) throws IOException {
this(url, DataSource.fromUrl(url));
Expand Down Expand Up @@ -337,13 +341,16 @@ public class BigQueryConnection extends BigQueryNoOpsConnection {
this.filterTablesOnDefaultDataset = ds.getFilterTablesOnDefaultDataset();
this.requestGoogleDriveScope = ds.getRequestGoogleDriveScope();
this.metadataFetchThreadCount = ds.getMetadataFetchThreadCount();
this.queryExecutionThreadCount = ds.getQueryExecutionThreadCount();
this.requestReason = ds.getRequestReason();
this.connectionPoolSize = ds.getConnectionPoolSize();
this.listenerPoolSize = ds.getListenerPoolSize();
this.partnerToken = ds.getPartnerToken();

this.headerProvider = createHeaderProvider();
this.bigQuery = getBigQueryConnection();
this.metadataExecutor = BigQueryJdbcMdc.newFixedThreadPool(this.metadataFetchThreadCount);
this.queryExecutor = BigQueryJdbcMdc.newFixedThreadPool(this.queryExecutionThreadCount);
}
}

Expand Down Expand Up @@ -954,6 +961,42 @@ private void closeImpl() throws SQLException {
statement.close();
}
this.openStatements.clear();

if (this.metadataExecutor != null) {
Comment thread
Neenu1995 marked this conversation as resolved.
this.metadataExecutor.shutdown();
}
if (this.queryExecutor != null) {
this.queryExecutor.shutdown();
}

boolean interrupted = false;

if (this.metadataExecutor != null) {
try {
if (!this.metadataExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
this.metadataExecutor.shutdownNow();
}
} catch (InterruptedException e) {
this.metadataExecutor.shutdownNow();
interrupted = true;
}
}

if (this.queryExecutor != null) {
try {
if (!this.queryExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
this.queryExecutor.shutdownNow();
}
} catch (InterruptedException e) {
this.queryExecutor.shutdownNow();
interrupted = true;
}
}
Comment thread
Neenu1995 marked this conversation as resolved.
Outdated

if (interrupted) {
Thread.currentThread().interrupt();
throw new InterruptedException("Interrupted awaiting executor termination");
}
Comment thread
Neenu1995 marked this conversation as resolved.
} catch (ConcurrentModificationException ex) {
throw new BigQueryJdbcException("Concurrent modification during close", ex);
} catch (InterruptedException e) {
Expand All @@ -965,6 +1008,14 @@ private void closeImpl() throws SQLException {
this.isClosed = true;
}

ExecutorService getExecutorService() {
return this.queryExecutor;
}

ExecutorService getMetadataExecutor() {
return this.metadataExecutor;
}

@Override
public boolean isClosed() {
return this.isClosed;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,13 @@
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Lightweight MDC implementation for the BigQuery JDBC driver using InheritableThreadLocal. */
class BigQueryJdbcMdc {
private static final BigQueryJdbcCustomLogger LOG =
new BigQueryJdbcCustomLogger(BigQueryJdbcMdc.class.getName());

private static final InheritableThreadLocal<String> currentConnectionId =
new InheritableThreadLocal<>();

Expand Down Expand Up @@ -82,11 +86,16 @@ public MdcThreadFactory(ThreadFactory delegate) {

@Override
public Thread newThread(Runnable r) {
return delegate.newThread(
() -> {
clear();
r.run();
});
Thread t =
delegate.newThread(
() -> {
clear();
r.run();
});
if (t != null) {
t.setDaemon(true);
}
return t;
}
}

Expand All @@ -102,11 +111,35 @@ public MdcThreadPoolExecutor(
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
}

private final AtomicBoolean warningLogged = new AtomicBoolean(false);

private void monitorQueueSaturation(int queueSize) {
int corePoolSize = getCorePoolSize();
Comment thread
Neenu1995 marked this conversation as resolved.
Outdated
// Warn when queue size is >= corePoolSize * 5, with a minimum of 10 tasks to avoid false
// alerts for tiny pools
int warnThreshold = Math.max(10, corePoolSize * 5);
// Recovery reset threshold is corePoolSize * 2, with a minimum of 4 tasks
int recoveryThreshold = Math.max(4, corePoolSize * 2);

if (queueSize >= warnThreshold) {
if (warningLogged.compareAndSet(false, true)) {
LOG.warning(
"Thread pool is saturating. Core pool size: %d, Active threads: %d, Queued tasks: %d. Consider increasing the thread count property.",
corePoolSize, getActiveCount(), queueSize);
}
} else if (queueSize <= recoveryThreshold) {
warningLogged.set(false);
}
Comment thread
Neenu1995 marked this conversation as resolved.
}

@Override
public void execute(Runnable command) {
if (command == null) {
throw new NullPointerException();
}

monitorQueueSaturation(getQueue().size());

if (command instanceof MdcFutureTask) {
super.execute(command);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,8 @@ protected boolean removeEldestEntry(Map.Entry<String, Map<String, String>> eldes
Pattern.CASE_INSENSITIVE);
static final String METADATA_FETCH_THREAD_COUNT_PROPERTY_NAME = "MetaDataFetchThreadCount";
static final int DEFAULT_METADATA_FETCH_THREAD_COUNT_VALUE = 32;
static final String QUERY_EXECUTION_THREAD_COUNT_PROPERTY_NAME = "QueryExecutionThreadCount";
static final int DEFAULT_QUERY_EXECUTION_THREAD_COUNT_VALUE = 4;
static final String RETRY_TIMEOUT_IN_SECS_PROPERTY_NAME = "Timeout";
static final long DEFAULT_RETRY_TIMEOUT_IN_SECS_VALUE = 0L;
static final String JOB_TIMEOUT_PROPERTY_NAME = "JobTimeout";
Expand Down Expand Up @@ -540,6 +542,12 @@ protected boolean removeEldestEntry(Map.Entry<String, Map<String, String>> eldes
"The number of threads used to call a DatabaseMetaData method.")
.setDefaultValue(String.valueOf(DEFAULT_METADATA_FETCH_THREAD_COUNT_VALUE))
.build(),
BigQueryConnectionProperty.newBuilder()
.setName(QUERY_EXECUTION_THREAD_COUNT_PROPERTY_NAME)
.setDescription(
"The number of threads used for executing queries in background tasks.")
.setDefaultValue(String.valueOf(DEFAULT_QUERY_EXECUTION_THREAD_COUNT_VALUE))
.build(),
BigQueryConnectionProperty.newBuilder()
.setName(ENABLE_WRITE_API_PROPERTY_NAME)
.setDescription(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ public class DataSource implements javax.sql.DataSource {
private Boolean filterTablesOnDefaultDataset;
private Integer requestGoogleDriveScope;
private Integer metadataFetchThreadCount;
private Integer queryExecutionThreadCount;
private String sslTrustStorePath;
private String sslTrustStorePassword;
private Map<String, String> labels;
Expand Down Expand Up @@ -248,6 +249,9 @@ public class DataSource implements javax.sql.DataSource {
.put(
BigQueryJdbcUrlUtility.METADATA_FETCH_THREAD_COUNT_PROPERTY_NAME,
(ds, val) -> ds.setMetadataFetchThreadCount(Integer.parseInt(val)))
.put(
BigQueryJdbcUrlUtility.QUERY_EXECUTION_THREAD_COUNT_PROPERTY_NAME,
(ds, val) -> ds.setQueryExecutionThreadCount(Integer.parseInt(val)))
.put(
BigQueryJdbcUrlUtility.SSL_TRUST_STORE_PROPERTY_NAME,
DataSource::setSSLTrustStorePath)
Expand Down Expand Up @@ -565,6 +569,11 @@ Properties createProperties() {
BigQueryJdbcUrlUtility.METADATA_FETCH_THREAD_COUNT_PROPERTY_NAME,
String.valueOf(this.metadataFetchThreadCount));
}
if (this.queryExecutionThreadCount != null) {
connectionProperties.setProperty(
BigQueryJdbcUrlUtility.QUERY_EXECUTION_THREAD_COUNT_PROPERTY_NAME,
String.valueOf(this.queryExecutionThreadCount));
}
if (this.sslTrustStorePath != null) {
connectionProperties.setProperty(
BigQueryJdbcUrlUtility.SSL_TRUST_STORE_PROPERTY_NAME,
Expand Down Expand Up @@ -1084,6 +1093,22 @@ public void setMetadataFetchThreadCount(Integer metadataFetchThreadCount) {
this.metadataFetchThreadCount = metadataFetchThreadCount;
}

public Integer getQueryExecutionThreadCount() {
return queryExecutionThreadCount != null
? queryExecutionThreadCount
: BigQueryJdbcUrlUtility.DEFAULT_QUERY_EXECUTION_THREAD_COUNT_VALUE;
}

public void setQueryExecutionThreadCount(Integer queryExecutionThreadCount) {
if (queryExecutionThreadCount != null) {
validateMin(
queryExecutionThreadCount,
2,
BigQueryJdbcUrlUtility.QUERY_EXECUTION_THREAD_COUNT_PROPERTY_NAME);
}
this.queryExecutionThreadCount = queryExecutionThreadCount;
}

public String getSSLTrustStorePath() {
return sslTrustStorePath;
}
Expand Down Expand Up @@ -1387,4 +1412,17 @@ private static void validateNonNegative(long val, String propertyName) {
"Invalid value for %s. It must be greater than or equal to 0.", propertyName));
}
}

/**
* Validates that a property value is greater than or equal to a minimum threshold.
* For thread pools, a minimum of 2 is enforced to ensure there are enough threads
* to handle concurrent coordination and avoid deadlock or thread starvation.
*/
private static void validateMin(long val, long min, String propertyName) {
if (val < min) {
throw new BigQueryJdbcRuntimeException(
String.format(
"Invalid value for %s. It must be greater than or equal to %d.", propertyName, min));
}
}
}
Loading
Loading