Skip to content

Commit 81e80a0

Browse files
committed
address pr feedback
1 parent 8f75b30 commit 81e80a0

2 files changed

Lines changed: 42 additions & 26 deletions

File tree

java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryConnection.java

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@
1616

1717
package com.google.cloud.bigquery.jdbc;
1818

19+
import com.google.api.client.http.HttpRequestInitializer;
20+
import com.google.api.client.http.HttpTransport;
21+
import com.google.api.client.json.gson.GsonFactory;
1922
import com.google.api.gax.core.CredentialsProvider;
2023
import com.google.api.gax.core.FixedCredentialsProvider;
2124
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
@@ -39,7 +42,6 @@
3942
import com.google.cloud.bigquery.exception.BigQueryJdbcException;
4043
import com.google.cloud.bigquery.exception.BigQueryJdbcRuntimeException;
4144
import com.google.cloud.bigquery.exception.BigQueryJdbcSqlFeatureNotSupportedException;
42-
import com.google.cloud.bigquery.spi.v2.BigQueryRpc;
4345
import com.google.cloud.bigquery.storage.v1.BigQueryReadClient;
4446
import com.google.cloud.bigquery.storage.v1.BigQueryReadSettings;
4547
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
@@ -49,7 +51,6 @@
4951
import com.google.common.collect.ImmutableSortedSet;
5052
import java.io.IOException;
5153
import java.io.InputStream;
52-
import java.lang.reflect.Field;
5354
import java.sql.CallableStatement;
5455
import java.sql.Connection;
5556
import java.sql.DatabaseMetaData;
@@ -83,6 +84,10 @@ public class BigQueryConnection extends BigQueryNoOpsConnection {
8384
private final String connectionId;
8485
private static final String DEFAULT_JDBC_TOKEN_VALUE = "Google-BigQuery-JDBC-Driver";
8586
private static final String DEFAULT_VERSION = "0.0.0";
87+
private static final String BIGQUERY_SERVICE_NAME = "bigquery";
88+
private static final long MAX_PROJECTS_PER_PAGE = 10000L;
89+
private static final String PROJECT_LIST_FIELDS =
90+
"projects/projectReference/projectId,nextPageToken";
8691
private static final Set<String> SAFE_TO_LOG_PROPERTIES =
8792
ImmutableSortedSet.orderedBy(String.CASE_INSENSITIVE_ORDER)
8893
.add(
@@ -1242,15 +1247,26 @@ public synchronized List<String> getDiscoveredProjects() {
12421247

12431248
try {
12441249
BigQueryOptions options = (BigQueryOptions) getBigQuery().getOptions();
1245-
BigQueryRpc rpc = (BigQueryRpc) options.getRpc();
1246-
Field bqField = rpc.getClass().getDeclaredField("bigquery");
1247-
bqField.setAccessible(true);
1248-
Bigquery lowLevelBq = (Bigquery) bqField.get(rpc);
1250+
HttpTransportOptions transportOptions = (HttpTransportOptions) options.getTransportOptions();
1251+
HttpTransport transport = transportOptions.getHttpTransportFactory().create();
1252+
HttpRequestInitializer initializer = transportOptions.getHttpRequestInitializer(options);
1253+
Bigquery lowLevelBq =
1254+
new Bigquery.Builder(transport, new GsonFactory(), initializer)
1255+
.setRootUrl(options.getResolvedApiaryHost(BIGQUERY_SERVICE_NAME))
1256+
.setApplicationName(DEFAULT_JDBC_TOKEN_VALUE)
1257+
.build();
12491258

12501259
List<String> projects = new ArrayList<>();
12511260
String pageToken = null;
12521261
do {
1253-
ProjectList projectList = lowLevelBq.projects().list().setPageToken(pageToken).execute();
1262+
ProjectList projectList =
1263+
lowLevelBq
1264+
.projects()
1265+
.list()
1266+
.setPageToken(pageToken)
1267+
.setMaxResults(MAX_PROJECTS_PER_PAGE)
1268+
.setFields(PROJECT_LIST_FIELDS)
1269+
.execute();
12541270
if (projectList.getProjects() != null) {
12551271
for (Projects p : projectList.getProjects()) {
12561272
projects.add(p.getProjectReference().getProjectId());

java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -3653,13 +3653,11 @@ public ResultSet getSchemas(String catalog, String schemaPattern) {
36533653
}
36543654

36553655
ExecutorService apiExecutor = null;
3656+
final List<Future<List<Dataset>>> apiFutures = new ArrayList<>();
36563657
try {
36573658
apiExecutor = Executors.newFixedThreadPool(API_EXECUTOR_POOL_SIZE);
3658-
List<Future<List<Dataset>>> apiFutures = new ArrayList<>();
36593659
for (String currentProjectToScan : projectsToScanList) {
3660-
if (Thread.currentThread().isInterrupted()) {
3661-
break;
3662-
}
3660+
checkInterrupted(apiFutures);
36633661
Callable<List<Dataset>> apiCallable =
36643662
() ->
36653663
findMatchingBigQueryObjects(
@@ -3678,40 +3676,35 @@ public ResultSet getSchemas(String catalog, String schemaPattern) {
36783676
apiExecutor.shutdown();
36793677

36803678
for (Future<List<Dataset>> apiFuture : apiFutures) {
3681-
if (Thread.currentThread().isInterrupted()) {
3682-
break;
3683-
}
3679+
checkInterrupted(apiFutures);
36843680
try {
36853681
List<Dataset> datasetsInProject = apiFuture.get();
36863682
if (datasetsInProject != null) {
36873683
for (Dataset dataset : datasetsInProject) {
3688-
if (Thread.currentThread().isInterrupted()) break;
36893684
processSchemaInfo(dataset, collectedResults, localResultSchemaFields);
36903685
}
36913686
}
36923687
} catch (InterruptedException e) {
36933688
Thread.currentThread().interrupt();
3694-
LOG.warning("Fetcher thread interrupted while waiting for API future result.");
3695-
break;
3689+
checkInterrupted(apiFutures);
36963690
} catch (ExecutionException e) {
36973691
LOG.warning("Error executing findMatchingDatasets task: " + e.getMessage());
36983692
} catch (CancellationException e) {
36993693
LOG.warning("A findMatchingDatasets task was cancelled.");
37003694
}
37013695
}
37023696

3703-
if (!Thread.currentThread().isInterrupted()) {
3704-
Comparator<FieldValueList> comparator =
3705-
defineGetSchemasComparator(localResultSchemaFields);
3706-
sortResults(collectedResults, comparator, "getSchemas", LOG);
3707-
}
3708-
3709-
if (!Thread.currentThread().isInterrupted()) {
3710-
populateQueue(collectedResults, queue, localResultSchemaFields);
3711-
}
3697+
checkInterrupted(apiFutures);
3698+
Comparator<FieldValueList> comparator =
3699+
defineGetSchemasComparator(localResultSchemaFields);
3700+
sortResults(collectedResults, comparator, "getSchemas", LOG);
3701+
populateQueue(collectedResults, queue, localResultSchemaFields);
37123702

3703+
} catch (CancellationException e) {
3704+
LOG.warning("Schema fetcher task was cancelled/interrupted.");
37133705
} catch (Throwable t) {
37143706
LOG.severe("Unexpected error in schema fetcher runnable: " + t.getMessage());
3707+
apiFutures.forEach(f -> f.cancel(true));
37153708
} finally {
37163709
shutdownExecutor(apiExecutor);
37173710
signalEndOfData(queue, localResultSchemaFields);
@@ -5155,6 +5148,13 @@ private void signalEndOfData(
51555148
}
51565149
}
51575150

5151+
private void checkInterrupted(List<? extends Future<?>> futures) {
5152+
if (Thread.currentThread().isInterrupted()) {
5153+
futures.forEach(f -> f.cancel(true));
5154+
throw new CancellationException("Fetcher thread was interrupted.");
5155+
}
5156+
}
5157+
51585158
private void shutdownExecutor(ExecutorService executor) {
51595159
if (executor == null || executor.isShutdown()) {
51605160
return;

0 commit comments

Comments
 (0)