Skip to content

Commit c0278c6

Browse files
committed
feat(bigquery-jdbc): support connection-scoped executor isolation and dynamic queue warnings
1 parent fb49fb8 commit c0278c6

5 files changed

Lines changed: 261 additions & 6 deletions

File tree

java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryConnection.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
import java.util.UUID;
6363
import java.util.concurrent.ConcurrentHashMap;
6464
import java.util.concurrent.Executor;
65+
import java.util.concurrent.ExecutorService;
6566
import java.util.concurrent.TimeUnit;
6667

6768
/**
@@ -173,6 +174,7 @@ public class BigQueryConnection extends BigQueryNoOpsConnection {
173174
boolean useQueryCache;
174175
String queryDialect;
175176
int metadataFetchThreadCount;
177+
int queryExecutionThreadCount;
176178
boolean allowLargeResults;
177179
String destinationTable;
178180
String destinationDataset;
@@ -209,6 +211,8 @@ public class BigQueryConnection extends BigQueryNoOpsConnection {
209211
Boolean reqGoogleDriveScope;
210212
private final Properties clientInfo = new Properties();
211213
private boolean isReadOnlyTokenUsed = false;
214+
private final ExecutorService metadataExecutor;
215+
private final ExecutorService queryExecutor;
212216

213217
BigQueryConnection(String url) throws IOException {
214218
this(url, DataSource.fromUrl(url));
@@ -337,13 +341,16 @@ public class BigQueryConnection extends BigQueryNoOpsConnection {
337341
this.filterTablesOnDefaultDataset = ds.getFilterTablesOnDefaultDataset();
338342
this.requestGoogleDriveScope = ds.getRequestGoogleDriveScope();
339343
this.metadataFetchThreadCount = ds.getMetadataFetchThreadCount();
344+
this.queryExecutionThreadCount = ds.getQueryExecutionThreadCount();
340345
this.requestReason = ds.getRequestReason();
341346
this.connectionPoolSize = ds.getConnectionPoolSize();
342347
this.listenerPoolSize = ds.getListenerPoolSize();
343348
this.partnerToken = ds.getPartnerToken();
344349

345350
this.headerProvider = createHeaderProvider();
346351
this.bigQuery = getBigQueryConnection();
352+
this.metadataExecutor = BigQueryJdbcMdc.newFixedThreadPool(this.metadataFetchThreadCount);
353+
this.queryExecutor = BigQueryJdbcMdc.newFixedThreadPool(this.queryExecutionThreadCount);
347354
}
348355
}
349356

@@ -954,6 +961,18 @@ private void closeImpl() throws SQLException {
954961
statement.close();
955962
}
956963
this.openStatements.clear();
964+
965+
if (this.metadataExecutor != null) {
966+
this.metadataExecutor.shutdown();
967+
this.metadataExecutor.awaitTermination(10, TimeUnit.SECONDS);
968+
this.metadataExecutor.shutdownNow();
969+
}
970+
971+
if (this.queryExecutor != null) {
972+
this.queryExecutor.shutdown();
973+
this.queryExecutor.awaitTermination(10, TimeUnit.SECONDS);
974+
this.queryExecutor.shutdownNow();
975+
}
957976
} catch (ConcurrentModificationException ex) {
958977
throw new BigQueryJdbcException("Concurrent modification during close", ex);
959978
} catch (InterruptedException e) {
@@ -965,6 +984,14 @@ private void closeImpl() throws SQLException {
965984
this.isClosed = true;
966985
}
967986

987+
ExecutorService getExecutorService() {
988+
return this.queryExecutor;
989+
}
990+
991+
ExecutorService getMetadataExecutor() {
992+
return this.metadataExecutor;
993+
}
994+
968995
@Override
969996
public boolean isClosed() {
970997
return this.isClosed;

java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcMdc.java

Lines changed: 38 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,13 @@
2626
import java.util.concurrent.ThreadFactory;
2727
import java.util.concurrent.ThreadPoolExecutor;
2828
import java.util.concurrent.TimeUnit;
29+
import java.util.concurrent.atomic.AtomicBoolean;
2930

3031
/** Lightweight MDC implementation for the BigQuery JDBC driver using InheritableThreadLocal. */
3132
class BigQueryJdbcMdc {
33+
private static final BigQueryJdbcCustomLogger LOG =
34+
new BigQueryJdbcCustomLogger(BigQueryJdbcMdc.class.getName());
35+
3236
private static final InheritableThreadLocal<String> currentConnectionId =
3337
new InheritableThreadLocal<>();
3438

@@ -82,11 +86,16 @@ public MdcThreadFactory(ThreadFactory delegate) {
8286

8387
@Override
8488
public Thread newThread(Runnable r) {
85-
return delegate.newThread(
86-
() -> {
87-
clear();
88-
r.run();
89-
});
89+
Thread t =
90+
delegate.newThread(
91+
() -> {
92+
clear();
93+
r.run();
94+
});
95+
if (t != null) {
96+
t.setDaemon(true);
97+
}
98+
return t;
9099
}
91100
}
92101

@@ -102,11 +111,35 @@ public MdcThreadPoolExecutor(
102111
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
103112
}
104113

114+
private final AtomicBoolean warningLogged = new AtomicBoolean(false);
115+
116+
private void monitorQueueSaturation(int queueSize) {
117+
int corePoolSize = getCorePoolSize();
118+
// Warn when queue size is >= corePoolSize * 5, with a minimum of 10 tasks to avoid false
119+
// alerts for tiny pools
120+
int warnThreshold = Math.max(10, corePoolSize * 5);
121+
// Recovery reset threshold is corePoolSize * 2, with a minimum of 4 tasks
122+
int recoveryThreshold = Math.max(4, corePoolSize * 2);
123+
124+
if (queueSize >= warnThreshold) {
125+
if (warningLogged.compareAndSet(false, true)) {
126+
LOG.warning(
127+
"Thread pool is saturating. Core pool size: %d, Active threads: %d, Queued tasks: %d. Consider increasing the thread count property.",
128+
corePoolSize, getActiveCount(), queueSize);
129+
}
130+
} else if (queueSize <= recoveryThreshold) {
131+
warningLogged.set(false);
132+
}
133+
}
134+
105135
@Override
106136
public void execute(Runnable command) {
107137
if (command == null) {
108138
throw new NullPointerException();
109139
}
140+
141+
monitorQueueSaturation(getQueue().size());
142+
110143
if (command instanceof MdcFutureTask) {
111144
super.execute(command);
112145
} else {

java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcUrlUtility.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,8 @@ protected boolean removeEldestEntry(Map.Entry<String, Map<String, String>> eldes
142142
Pattern.CASE_INSENSITIVE);
143143
static final String METADATA_FETCH_THREAD_COUNT_PROPERTY_NAME = "MetaDataFetchThreadCount";
144144
static final int DEFAULT_METADATA_FETCH_THREAD_COUNT_VALUE = 32;
145+
static final String QUERY_EXECUTION_THREAD_COUNT_PROPERTY_NAME = "QueryExecutionThreadCount";
146+
static final int DEFAULT_QUERY_EXECUTION_THREAD_COUNT_VALUE = 4;
145147
static final String RETRY_TIMEOUT_IN_SECS_PROPERTY_NAME = "Timeout";
146148
static final long DEFAULT_RETRY_TIMEOUT_IN_SECS_VALUE = 0L;
147149
static final String JOB_TIMEOUT_PROPERTY_NAME = "JobTimeout";
@@ -540,6 +542,12 @@ protected boolean removeEldestEntry(Map.Entry<String, Map<String, String>> eldes
540542
"The number of threads used to call a DatabaseMetaData method.")
541543
.setDefaultValue(String.valueOf(DEFAULT_METADATA_FETCH_THREAD_COUNT_VALUE))
542544
.build(),
545+
BigQueryConnectionProperty.newBuilder()
546+
.setName(QUERY_EXECUTION_THREAD_COUNT_PROPERTY_NAME)
547+
.setDescription(
548+
"The number of threads used for executing queries in background tasks.")
549+
.setDefaultValue(String.valueOf(DEFAULT_QUERY_EXECUTION_THREAD_COUNT_VALUE))
550+
.build(),
543551
BigQueryConnectionProperty.newBuilder()
544552
.setName(ENABLE_WRITE_API_PROPERTY_NAME)
545553
.setDescription(

java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/DataSource.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ public class DataSource implements javax.sql.DataSource {
8686
private Boolean filterTablesOnDefaultDataset;
8787
private Integer requestGoogleDriveScope;
8888
private Integer metadataFetchThreadCount;
89+
private Integer queryExecutionThreadCount;
8990
private String sslTrustStorePath;
9091
private String sslTrustStorePassword;
9192
private Map<String, String> labels;
@@ -248,6 +249,9 @@ public class DataSource implements javax.sql.DataSource {
248249
.put(
249250
BigQueryJdbcUrlUtility.METADATA_FETCH_THREAD_COUNT_PROPERTY_NAME,
250251
(ds, val) -> ds.setMetadataFetchThreadCount(Integer.parseInt(val)))
252+
.put(
253+
BigQueryJdbcUrlUtility.QUERY_EXECUTION_THREAD_COUNT_PROPERTY_NAME,
254+
(ds, val) -> ds.setQueryExecutionThreadCount(Integer.parseInt(val)))
251255
.put(
252256
BigQueryJdbcUrlUtility.SSL_TRUST_STORE_PROPERTY_NAME,
253257
DataSource::setSSLTrustStorePath)
@@ -565,6 +569,11 @@ Properties createProperties() {
565569
BigQueryJdbcUrlUtility.METADATA_FETCH_THREAD_COUNT_PROPERTY_NAME,
566570
String.valueOf(this.metadataFetchThreadCount));
567571
}
572+
if (this.queryExecutionThreadCount != null) {
573+
connectionProperties.setProperty(
574+
BigQueryJdbcUrlUtility.QUERY_EXECUTION_THREAD_COUNT_PROPERTY_NAME,
575+
String.valueOf(this.queryExecutionThreadCount));
576+
}
568577
if (this.sslTrustStorePath != null) {
569578
connectionProperties.setProperty(
570579
BigQueryJdbcUrlUtility.SSL_TRUST_STORE_PROPERTY_NAME,
@@ -1084,6 +1093,22 @@ public void setMetadataFetchThreadCount(Integer metadataFetchThreadCount) {
10841093
this.metadataFetchThreadCount = metadataFetchThreadCount;
10851094
}
10861095

1096+
public Integer getQueryExecutionThreadCount() {
1097+
return queryExecutionThreadCount != null
1098+
? queryExecutionThreadCount
1099+
: BigQueryJdbcUrlUtility.DEFAULT_QUERY_EXECUTION_THREAD_COUNT_VALUE;
1100+
}
1101+
1102+
public void setQueryExecutionThreadCount(Integer queryExecutionThreadCount) {
1103+
if (queryExecutionThreadCount != null) {
1104+
validateMin(
1105+
queryExecutionThreadCount,
1106+
2,
1107+
BigQueryJdbcUrlUtility.QUERY_EXECUTION_THREAD_COUNT_PROPERTY_NAME);
1108+
}
1109+
this.queryExecutionThreadCount = queryExecutionThreadCount;
1110+
}
1111+
10871112
public String getSSLTrustStorePath() {
10881113
return sslTrustStorePath;
10891114
}
@@ -1387,4 +1412,12 @@ private static void validateNonNegative(long val, String propertyName) {
13871412
"Invalid value for %s. It must be greater than or equal to 0.", propertyName));
13881413
}
13891414
}
1415+
1416+
private static void validateMin(long val, long min, String propertyName) {
1417+
if (val < min) {
1418+
throw new BigQueryJdbcRuntimeException(
1419+
String.format(
1420+
"Invalid value for %s. It must be greater than or equal to %d.", propertyName, min));
1421+
}
1422+
}
13901423
}

0 commit comments

Comments
 (0)