Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
import com.google.api.gax.rpc.FixedHeaderProvider;
import com.google.api.gax.rpc.HeaderProvider;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.api.services.bigquery.Bigquery;
import com.google.api.services.bigquery.model.ProjectList;
import com.google.api.services.bigquery.model.ProjectList.Projects;
import com.google.auth.Credentials;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryException;
Expand All @@ -36,14 +39,17 @@
import com.google.cloud.bigquery.exception.BigQueryJdbcException;
import com.google.cloud.bigquery.exception.BigQueryJdbcRuntimeException;
import com.google.cloud.bigquery.exception.BigQueryJdbcSqlFeatureNotSupportedException;
import com.google.cloud.bigquery.spi.v2.BigQueryRpc;
import com.google.cloud.bigquery.storage.v1.BigQueryReadClient;
import com.google.cloud.bigquery.storage.v1.BigQueryReadSettings;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings;
import com.google.cloud.http.HttpTransportOptions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSortedSet;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Field;
import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
Expand Down Expand Up @@ -120,6 +126,7 @@ public class BigQueryConnection extends BigQueryNoOpsConnection {
BigQueryJdbcUrlUtility.SWA_APPEND_ROW_COUNT_PROPERTY_NAME,
BigQueryJdbcUrlUtility.SWA_ACTIVATION_ROW_COUNT_PROPERTY_NAME,
BigQueryJdbcUrlUtility.FILTER_TABLES_ON_DEFAULT_DATASET_PROPERTY_NAME,
BigQueryJdbcUrlUtility.ENABLE_PROJECT_DISCOVERY_PROPERTY_NAME,
BigQueryJdbcUrlUtility.REQUEST_GOOGLE_DRIVE_SCOPE_PROPERTY_NAME,
BigQueryJdbcUrlUtility.SSL_TRUST_STORE_PROPERTY_NAME,
BigQueryJdbcUrlUtility.MAX_BYTES_BILLED_PROPERTY_NAME,
Expand Down Expand Up @@ -169,6 +176,8 @@ public class BigQueryConnection extends BigQueryNoOpsConnection {
int highThroughputMinTableSize;
int highThroughputActivationRatio;
boolean enableSession;
boolean enableProjectDiscovery;
private List<String> discoveredProjectsCache;
boolean unsupportedHTAPIFallback;
boolean useQueryCache;
String queryDialect;
Expand Down Expand Up @@ -335,6 +344,7 @@ public class BigQueryConnection extends BigQueryNoOpsConnection {
this.additionalProjects = ds.getAdditionalProjects();

this.filterTablesOnDefaultDataset = ds.getFilterTablesOnDefaultDataset();
this.enableProjectDiscovery = ds.getEnableProjectDiscovery();
this.requestGoogleDriveScope = ds.getRequestGoogleDriveScope();
this.metadataFetchThreadCount = ds.getMetadataFetchThreadCount();
this.requestReason = ds.getRequestReason();
Expand Down Expand Up @@ -1221,6 +1231,42 @@ private boolean checkIsReadOnlyTokenUsed(Map<String, String> authProps) {
return false;
}

/**
 * Reports whether project discovery is switched on for this connection.
 *
 * @return the value of the {@code enableProjectDiscovery} field
 */
public boolean isEnableProjectDiscovery() {
  return enableProjectDiscovery;
}

public synchronized List<String> getDiscoveredProjects() {
if (this.discoveredProjectsCache != null) {
return this.discoveredProjectsCache;
}

try {
BigQueryOptions options = (BigQueryOptions) getBigQuery().getOptions();
Comment thread
keshavdandeva marked this conversation as resolved.
Outdated
BigQueryRpc rpc = (BigQueryRpc) options.getRpc();
Field bqField = rpc.getClass().getDeclaredField("bigquery");
bqField.setAccessible(true);
Bigquery lowLevelBq = (Bigquery) bqField.get(rpc);

List<String> projects = new ArrayList<>();
String pageToken = null;
do {
ProjectList projectList = lowLevelBq.projects().list().setPageToken(pageToken).execute();
if (projectList.getProjects() != null) {
for (Projects p : projectList.getProjects()) {
projects.add(p.getProjectReference().getProjectId());
}
}
pageToken = projectList.getNextPageToken();
} while (pageToken != null);

this.discoveredProjectsCache = ImmutableList.copyOf(projects);
} catch (Exception e) {
Comment thread
keshavdandeva marked this conversation as resolved.
LOG.warning(e, "Failed to list all accessible projects, falling back to connection default.");
Comment thread
keshavdandeva marked this conversation as resolved.
Outdated
this.discoveredProjectsCache = ImmutableList.of();
}
Comment thread
keshavdandeva marked this conversation as resolved.
return this.discoveredProjectsCache;
}
Comment thread
keshavdandeva marked this conversation as resolved.
Outdated

@Override
public <T> T unwrap(Class<T> iface) throws SQLException {
if (iface.isInstance(this)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3652,44 +3652,51 @@ public ResultSet getSchemas(String catalog, String schemaPattern) {
return;
}

ExecutorService apiExecutor = null;
try {
apiExecutor = Executors.newFixedThreadPool(API_EXECUTOR_POOL_SIZE);
Comment thread
keshavdandeva marked this conversation as resolved.
Outdated
List<Future<List<Dataset>>> apiFutures = new ArrayList<>();
for (String currentProjectToScan : projectsToScanList) {
if (Thread.currentThread().isInterrupted()) {
LOG.warning(
"Schema fetcher interrupted during project iteration for project: "
+ currentProjectToScan);
break;
}
LOG.info("Fetching schemas for project: " + currentProjectToScan);
List<Dataset> datasetsInProject =
findMatchingBigQueryObjects(
"Dataset",
() ->
bigquery.listDatasets(
currentProjectToScan,
BigQuery.DatasetListOption.pageSize(DEFAULT_PAGE_SIZE)),
(name) -> bigquery.getDataset(DatasetId.of(currentProjectToScan, name)),
(ds) -> ds.getDatasetId().getDataset(),
schemaPattern,
schemaRegex,
LOG);
Callable<List<Dataset>> apiCallable =
() ->
findMatchingBigQueryObjects(
"Dataset",
() ->
bigquery.listDatasets(
currentProjectToScan,
BigQuery.DatasetListOption.pageSize(DEFAULT_PAGE_SIZE)),
(name) -> bigquery.getDataset(DatasetId.of(currentProjectToScan, name)),
(ds) -> ds.getDatasetId().getDataset(),
schemaPattern,
schemaRegex,
LOG);
apiFutures.add(apiExecutor.submit(apiCallable));
}
apiExecutor.shutdown();

if (datasetsInProject.isEmpty() || Thread.currentThread().isInterrupted()) {
LOG.info(
"Fetcher thread found no matching datasets in project: "
+ currentProjectToScan);
continue;
for (Future<List<Dataset>> apiFuture : apiFutures) {
if (Thread.currentThread().isInterrupted()) {
break;
}

LOG.fine("Processing found datasets for project: " + currentProjectToScan);
for (Dataset dataset : datasetsInProject) {
if (Thread.currentThread().isInterrupted()) {
LOG.warning(
"Schema fetcher interrupted during dataset iteration for project: "
+ currentProjectToScan);
break;
try {
List<Dataset> datasetsInProject = apiFuture.get();
if (datasetsInProject != null) {
for (Dataset dataset : datasetsInProject) {
if (Thread.currentThread().isInterrupted()) break;
processSchemaInfo(dataset, collectedResults, localResultSchemaFields);
}
}
processSchemaInfo(dataset, collectedResults, localResultSchemaFields);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.warning("Fetcher thread interrupted while waiting for API future result.");
break;
} catch (ExecutionException e) {
LOG.warning("Error executing findMatchingDatasets task: " + e.getMessage());
Comment thread
keshavdandeva marked this conversation as resolved.
Outdated
} catch (CancellationException e) {
LOG.warning("A findMatchingDatasets task was cancelled.");
}
}
Comment thread
keshavdandeva marked this conversation as resolved.
Outdated

Expand All @@ -3706,6 +3713,7 @@ public ResultSet getSchemas(String catalog, String schemaPattern) {
} catch (Throwable t) {
LOG.severe("Unexpected error in schema fetcher runnable: " + t.getMessage());
} finally {
shutdownExecutor(apiExecutor);
signalEndOfData(queue, localResultSchemaFields);
LOG.info("Schema fetcher thread finished.");
}
Expand Down Expand Up @@ -5197,6 +5205,10 @@ private List<String> getAccessibleCatalogNames() {
}
}

if (this.connection.isEnableProjectDiscovery()) {
accessibleCatalogs.addAll(this.connection.getDiscoveredProjects());
}

List<String> sortedCatalogs = new ArrayList<>(accessibleCatalogs);
Collections.sort(sortedCatalogs);
return sortedCatalogs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ protected boolean removeEldestEntry(Map.Entry<String, Map<String, String>> eldes
static final String FILTER_TABLES_ON_DEFAULT_DATASET_PROPERTY_NAME =
"FilterTablesOnDefaultDataset";
static final boolean DEFAULT_FILTER_TABLES_ON_DEFAULT_DATASET_VALUE = false;
static final String ENABLE_PROJECT_DISCOVERY_PROPERTY_NAME = "EnableProjectDiscovery";
static final boolean DEFAULT_ENABLE_PROJECT_DISCOVERY_VALUE = false;
static final String REQUEST_GOOGLE_DRIVE_SCOPE_PROPERTY_NAME = "RequestGoogleDriveScope";
static final String SSL_TRUST_STORE_PROPERTY_NAME = "SSLTrustStore";
static final String SSL_TRUST_STORE_PWD_PROPERTY_NAME = "SSLTrustStorePwd";
Expand Down Expand Up @@ -576,6 +578,13 @@ protected boolean removeEldestEntry(Map.Entry<String, Map<String, String>> eldes
.setDefaultValue(
String.valueOf(DEFAULT_FILTER_TABLES_ON_DEFAULT_DATASET_VALUE))
.build(),
BigQueryConnectionProperty.newBuilder()
.setName(ENABLE_PROJECT_DISCOVERY_PROPERTY_NAME)
.setDescription(
"Enables or disables automatic discovery of all accessible Google Cloud projects. "
+ "When disabled, only the default ProjectId and AdditionalProjects are listed as catalogs.")
.setDefaultValue(String.valueOf(DEFAULT_ENABLE_PROJECT_DISCOVERY_VALUE))
.build(),
BigQueryConnectionProperty.newBuilder()
.setName(REQUEST_GOOGLE_DRIVE_SCOPE_PROPERTY_NAME)
.setDescription(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ public class DataSource implements javax.sql.DataSource {
private Boolean enableWriteAPI;
private String additionalProjects;
private Boolean filterTablesOnDefaultDataset;
private Boolean enableProjectDiscovery;
private Integer requestGoogleDriveScope;
private Integer metadataFetchThreadCount;
private String sslTrustStorePath;
Expand Down Expand Up @@ -242,6 +243,12 @@ public class DataSource implements javax.sql.DataSource {
BigQueryJdbcUrlUtility.convertIntToBoolean(
val,
BigQueryJdbcUrlUtility.FILTER_TABLES_ON_DEFAULT_DATASET_PROPERTY_NAME)))
.put(
BigQueryJdbcUrlUtility.ENABLE_PROJECT_DISCOVERY_PROPERTY_NAME,
(ds, val) ->
ds.setEnableProjectDiscovery(
BigQueryJdbcUrlUtility.convertIntToBoolean(
val, BigQueryJdbcUrlUtility.ENABLE_PROJECT_DISCOVERY_PROPERTY_NAME)))
.put(
BigQueryJdbcUrlUtility.REQUEST_GOOGLE_DRIVE_SCOPE_PROPERTY_NAME,
(ds, val) -> ds.setRequestGoogleDriveScope(Integer.parseInt(val)))
Expand Down Expand Up @@ -555,6 +562,11 @@ Properties createProperties() {
BigQueryJdbcUrlUtility.FILTER_TABLES_ON_DEFAULT_DATASET_PROPERTY_NAME,
String.valueOf(this.filterTablesOnDefaultDataset));
}
if (this.enableProjectDiscovery != null) {
connectionProperties.setProperty(
BigQueryJdbcUrlUtility.ENABLE_PROJECT_DISCOVERY_PROPERTY_NAME,
String.valueOf(this.enableProjectDiscovery));
}
if (this.requestGoogleDriveScope != null) {
connectionProperties.setProperty(
BigQueryJdbcUrlUtility.REQUEST_GOOGLE_DRIVE_SCOPE_PROPERTY_NAME,
Expand Down Expand Up @@ -1059,6 +1071,16 @@ public void setFilterTablesOnDefaultDataset(Boolean filterTablesOnDefaultDataset
this.filterTablesOnDefaultDataset = filterTablesOnDefaultDataset;
}

/**
 * Returns the project-discovery setting of this data source.
 *
 * @return the explicitly configured value, or {@link
 *     BigQueryJdbcUrlUtility#DEFAULT_ENABLE_PROJECT_DISCOVERY_VALUE} when none has been set
 */
public Boolean getEnableProjectDiscovery() {
  if (enableProjectDiscovery == null) {
    return BigQueryJdbcUrlUtility.DEFAULT_ENABLE_PROJECT_DISCOVERY_VALUE;
  }
  return enableProjectDiscovery.booleanValue();
}

/**
 * Stores the project-discovery setting for this data source.
 *
 * @param enableProjectDiscovery the value to store; {@code null} leaves the setting unset, in
 *     which case {@code getEnableProjectDiscovery()} reports the default
 */
public void setEnableProjectDiscovery(Boolean enableProjectDiscovery) {
  this.enableProjectDiscovery = enableProjectDiscovery;
}

public Integer getRequestGoogleDriveScope() {
return requestGoogleDriveScope != null
? requestGoogleDriveScope
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3308,4 +3308,128 @@ public void testMetadataAndResultSetMetadataTypeMappingConsistency(StandardSQLTy
assertEquals(
metadataTypeInfo.jdbcType, (int) resultSetType, "Type mapping mismatch for " + type);
}

/**
 * With discovery enabled, {@code getCatalogs()} must list the default project, the additional
 * projects and the discovered projects together, in sorted order.
 */
@Test
public void testGetCatalogs_WithProjectDiscovery() throws SQLException {
  List<String> discoveredProjects = Arrays.asList("discovered-1", "discovered-2");
  when(bigQueryConnection.isEnableProjectDiscovery()).thenReturn(true);
  when(bigQueryConnection.getDiscoveredProjects()).thenReturn(discoveredProjects);
  when(bigQueryConnection.getAdditionalProjects()).thenReturn("additional-1,additional-2");
  when(bigQueryConnection.getCatalog()).thenReturn("primary-project");

  ResultSet catalogRows = dbMetadata.getCatalogs();
  assertNotNull(catalogRows);

  // Drain the result set into a plain list so the ordering can be asserted in one go.
  List<String> catalogNames = new ArrayList<>();
  while (catalogRows.next()) {
    String name = catalogRows.getString("TABLE_CAT");
    catalogNames.add(name);
  }

  assertThat(catalogNames)
      .containsExactly(
          "additional-1", "additional-2", "discovered-1", "discovered-2", "primary-project")
      .inOrder();
}

/**
 * With discovery disabled, {@code getCatalogs()} must list only the default project and the
 * additional projects, even though the connection could report discovered projects.
 */
@Test
public void testGetCatalogs_WithoutProjectDiscovery() throws SQLException {
  List<String> discoveredProjects = Arrays.asList("discovered-1", "discovered-2");
  when(bigQueryConnection.isEnableProjectDiscovery()).thenReturn(false);
  when(bigQueryConnection.getDiscoveredProjects()).thenReturn(discoveredProjects);
  when(bigQueryConnection.getAdditionalProjects()).thenReturn("additional-1,additional-2");
  when(bigQueryConnection.getCatalog()).thenReturn("primary-project");

  ResultSet catalogRows = dbMetadata.getCatalogs();
  assertNotNull(catalogRows);

  // Drain the result set into a plain list so the ordering can be asserted in one go.
  List<String> catalogNames = new ArrayList<>();
  while (catalogRows.next()) {
    String name = catalogRows.getString("TABLE_CAT");
    catalogNames.add(name);
  }

  assertThat(catalogNames)
      .containsExactly("additional-1", "additional-2", "primary-project")
      .inOrder();
}

/**
 * With discovery enabled, {@code getSchemas(null, null)} must return the datasets of the default,
 * additional and discovered projects.
 */
@Test
public void testGetSchemas_WithProjectDiscovery() throws SQLException {
  // Connection exposes one default, one additional and one discovered project.
  when(bigQueryConnection.isEnableProjectDiscovery()).thenReturn(true);
  when(bigQueryConnection.getCatalog()).thenReturn("primary-project");
  when(bigQueryConnection.getAdditionalProjects()).thenReturn("additional-1");
  when(bigQueryConnection.getDiscoveredProjects()).thenReturn(Arrays.asList("discovered-1"));

  // Every project lists exactly one dataset.
  Dataset primaryDataset = mockBigQueryDataset("primary-project", "dataset_p");
  Page<Dataset> primaryPage = mock(Page.class);
  when(primaryPage.iterateAll()).thenReturn(Collections.singletonList(primaryDataset));
  when(bigqueryClient.listDatasets(eq("primary-project"), any(BigQuery.DatasetListOption.class)))
      .thenReturn(primaryPage);

  Dataset additionalDataset = mockBigQueryDataset("additional-1", "dataset_a");
  Page<Dataset> additionalPage = mock(Page.class);
  when(additionalPage.iterateAll()).thenReturn(Collections.singletonList(additionalDataset));
  when(bigqueryClient.listDatasets(eq("additional-1"), any(BigQuery.DatasetListOption.class)))
      .thenReturn(additionalPage);

  Dataset discoveredDataset = mockBigQueryDataset("discovered-1", "dataset_d");
  Page<Dataset> discoveredPage = mock(Page.class);
  when(discoveredPage.iterateAll()).thenReturn(Collections.singletonList(discoveredDataset));
  when(bigqueryClient.listDatasets(eq("discovered-1"), any(BigQuery.DatasetListOption.class)))
      .thenReturn(discoveredPage);

  ResultSet schemaRows = dbMetadata.getSchemas(null, null);
  assertNotNull(schemaRows);

  List<String> schemaNames = new ArrayList<>();
  List<String> catalogNames = new ArrayList<>();
  while (schemaRows.next()) {
    schemaNames.add(schemaRows.getString("TABLE_SCHEM"));
    catalogNames.add(schemaRows.getString("TABLE_CATALOG"));
  }

  // Rows come back ordered by catalog (TABLE_CATALOG) and then schema (TABLE_SCHEM), so the
  // expected catalog order is alphabetical: "additional-1", "discovered-1", "primary-project".
  assertThat(catalogNames)
      .containsExactly("additional-1", "discovered-1", "primary-project")
      .inOrder();
  assertThat(schemaNames).containsExactly("dataset_a", "dataset_d", "dataset_p").inOrder();
}

/**
 * With discovery disabled, {@code getSchemas(null, null)} must return only the datasets of the
 * default and additional projects and must never list datasets of a discovered project.
 */
@Test
public void testGetSchemas_WithoutProjectDiscovery() throws SQLException {
  // Discovery is off, although the connection could report a discovered project.
  when(bigQueryConnection.isEnableProjectDiscovery()).thenReturn(false);
  when(bigQueryConnection.getCatalog()).thenReturn("primary-project");
  when(bigQueryConnection.getAdditionalProjects()).thenReturn("additional-1");
  when(bigQueryConnection.getDiscoveredProjects()).thenReturn(Arrays.asList("discovered-1"));

  // The two configured projects list exactly one dataset each.
  Dataset primaryDataset = mockBigQueryDataset("primary-project", "dataset_p");
  Page<Dataset> primaryPage = mock(Page.class);
  when(primaryPage.iterateAll()).thenReturn(Collections.singletonList(primaryDataset));
  when(bigqueryClient.listDatasets(eq("primary-project"), any(BigQuery.DatasetListOption.class)))
      .thenReturn(primaryPage);

  Dataset additionalDataset = mockBigQueryDataset("additional-1", "dataset_a");
  Page<Dataset> additionalPage = mock(Page.class);
  when(additionalPage.iterateAll()).thenReturn(Collections.singletonList(additionalDataset));
  when(bigqueryClient.listDatasets(eq("additional-1"), any(BigQuery.DatasetListOption.class)))
      .thenReturn(additionalPage);

  ResultSet schemaRows = dbMetadata.getSchemas(null, null);
  assertNotNull(schemaRows);

  List<String> schemaNames = new ArrayList<>();
  List<String> catalogNames = new ArrayList<>();
  while (schemaRows.next()) {
    schemaNames.add(schemaRows.getString("TABLE_SCHEM"));
    catalogNames.add(schemaRows.getString("TABLE_CATALOG"));
  }

  // Rows come back ordered by catalog (TABLE_CATALOG) and then schema (TABLE_SCHEM), so the
  // expected catalog order is "additional-1", "primary-project" (discovered-1 is ignored).
  assertThat(catalogNames).containsExactly("additional-1", "primary-project").inOrder();
  assertThat(schemaNames).containsExactly("dataset_a", "dataset_p").inOrder();

  // The discovered project must not even be queried.
  verify(bigqueryClient, never())
      .listDatasets(eq("discovered-1"), any(BigQuery.DatasetListOption.class));
}
}
Loading
Loading