-
Notifications
You must be signed in to change notification settings - Fork 40
Support fetching all schemas using a single SQL command #953
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
f25cda7
99b1175
40157f4
a7964c9
535d2bc
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,27 @@ | ||
| package com.databricks.jdbc.dbclient.impl.common; | ||
|
|
||
| import static com.databricks.jdbc.common.MetadataResultConstants.CATALOG_COLUMN_FOR_GET_CATALOGS; | ||
| import static com.databricks.jdbc.common.MetadataResultConstants.CATALOG_FULL_COLUMN; | ||
|
|
||
| import com.databricks.jdbc.model.core.ResultColumn; | ||
| import java.sql.ResultSet; | ||
| import java.sql.SQLException; | ||
| import java.util.List; | ||
|
|
||
| public class SchemasDatabricksResultSetAdapter implements IDatabricksResultSetAdapter { | ||
| @Override | ||
| public ResultColumn mapColumn(ResultColumn column) { | ||
| String columnName = column.getResultSetColumnName(); | ||
| if (columnName.equals(CATALOG_FULL_COLUMN.getResultSetColumnName())) { | ||
| // Map CATALOG_FULL_COLUMN to CATALOG_COLUMN_FOR_GET_CATALOGS of result set | ||
| return CATALOG_COLUMN_FOR_GET_CATALOGS; | ||
|
jayantsing-db marked this conversation as resolved.
Outdated
|
||
| } else { | ||
| return column; | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public boolean includeRow(ResultSet resultSet, List<ResultColumn> columns) throws SQLException { | ||
| return true; | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,7 +1,8 @@ | ||
| package com.databricks.jdbc.dbclient.impl.sqlexec; | ||
|
|
||
| import static com.databricks.jdbc.common.MetadataResultConstants.DEFAULT_TABLE_TYPES; | ||
| import static com.databricks.jdbc.common.MetadataResultConstants.PARSE_SYNTAX_ERROR_SQL_STATE; | ||
| import static com.databricks.jdbc.api.impl.DatabricksDatabaseMetaData.DEFAULT_MAX_THREADS_FETCH_SCHEMAS; | ||
| import static com.databricks.jdbc.api.impl.DatabricksDatabaseMetaData.getOrCreateSchemasThreadPool; | ||
|
jayantsing-db marked this conversation as resolved.
Outdated
|
||
| import static com.databricks.jdbc.common.MetadataResultConstants.*; | ||
| import static com.databricks.jdbc.dbclient.impl.common.CommandConstants.GET_TABLES_STATEMENT_ID; | ||
| import static com.databricks.jdbc.dbclient.impl.common.CommandConstants.METADATA_STATEMENT_ID; | ||
| import static com.databricks.jdbc.dbclient.impl.sqlexec.ResultConstants.TYPE_INFO_RESULT; | ||
|
|
@@ -10,14 +11,18 @@ | |
| import com.databricks.jdbc.api.internal.IDatabricksSession; | ||
| import com.databricks.jdbc.common.MetadataResultConstants; | ||
| import com.databricks.jdbc.common.StatementType; | ||
| import com.databricks.jdbc.common.util.JdbcThreadUtils; | ||
| import com.databricks.jdbc.common.util.WildcardUtil; | ||
| import com.databricks.jdbc.dbclient.IDatabricksClient; | ||
| import com.databricks.jdbc.dbclient.IDatabricksMetadataClient; | ||
| import com.databricks.jdbc.dbclient.impl.common.MetadataResultSetBuilder; | ||
| import com.databricks.jdbc.log.JdbcLogger; | ||
| import com.databricks.jdbc.log.JdbcLoggerFactory; | ||
| import java.sql.ResultSet; | ||
| import java.sql.SQLException; | ||
| import java.util.ArrayList; | ||
| import java.util.HashMap; | ||
| import java.util.List; | ||
| import java.util.Optional; | ||
|
|
||
| /** Implementation for {@link IDatabricksMetadataClient} using {@link IDatabricksClient}. */ | ||
|
|
@@ -54,7 +59,20 @@ public DatabricksResultSet listSchemas( | |
| new CommandBuilder(catalog, session).setSchemaPattern(schemaNamePattern); | ||
| String SQL = commandBuilder.getSQLString(CommandName.LIST_SCHEMAS); | ||
| LOGGER.debug("SQL command to fetch schemas: {}", SQL); | ||
| return metadataResultSetBuilder.getSchemasResult(getResultSet(SQL, session), catalog); | ||
| try { | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This will make it fail for all customers until the new DBSQL release is rolled out. We will unnecessarily add an extra hop until then to first go through the exception flow and then fallback. Can we do something intelligently?
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Discussed offline with using some signals, please check if that looks good to you.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We are further evaluating this. I will make changes separately as per the conclusion. |
||
| return metadataResultSetBuilder.getSchemasResult(getResultSet(SQL, session), catalog); | ||
| } catch (SQLException e) { | ||
| if (WildcardUtil.isNullOrWildcard(catalog) | ||
| && e.getSQLState().equals(PARSE_SYNTAX_ERROR_SQL_STATE)) { | ||
| // This is a fallback for the case where the SQL command fails with "syntax error at or near | ||
| // "ALL CATALOGS"" | ||
| // This is a known issue for older DBR versions | ||
| LOGGER.debug("SQL command failed with syntax error. Fetching schemas across all catalogs."); | ||
| return fetchSchemasAcrossCatalogs(session, schemaNamePattern); | ||
| } else { | ||
| throw e; | ||
| } | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
|
|
@@ -159,7 +177,6 @@ public DatabricksResultSet listImportedKeys( | |
| if (e.getSQLState().equals(PARSE_SYNTAX_ERROR_SQL_STATE)) { | ||
| // This is a workaround for the issue where the SQL command fails with "syntax error at or | ||
| // near "foreign"" | ||
|
jayantsing-db marked this conversation as resolved.
|
||
| // This is a known issue in Databricks for older DBSQL versions | ||
| LOGGER.debug("SQL command failed with syntax error. Returning empty result set."); | ||
| return metadataResultSetBuilder.getResultSetWithGivenRowsAndColumns( | ||
| MetadataResultConstants.IMPORTED_KEYS_COLUMNS, | ||
|
|
@@ -233,4 +250,49 @@ private DatabricksResultSet getResultSet(String SQL, IDatabricksSession session) | |
| session, | ||
| null /* parentStatement */); | ||
| } | ||
|
|
||
| private DatabricksResultSet fetchSchemasAcrossCatalogs( | ||
| IDatabricksSession session, String schemaPattern) throws SQLException { | ||
| List<String> catalogList = new ArrayList<>(); | ||
| try (ResultSet catalogs = session.getDatabricksMetadataClient().listCatalogs(session)) { | ||
| while (catalogs.next()) { | ||
| String c = catalogs.getString(1); | ||
| if (c != null && !c.isEmpty()) { | ||
| catalogList.add(c); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // Process catalogs in parallel, gathering schema information | ||
| List<List<Object>> schemaRows = | ||
| JdbcThreadUtils.parallelFlatMap( | ||
| catalogList, | ||
| session.getConnectionContext(), | ||
| DEFAULT_MAX_THREADS_FETCH_SCHEMAS, // Not significant since the executor is provided as | ||
| // a parameter | ||
|
jayantsing-db marked this conversation as resolved.
|
||
| 90, // 90 seconds timeout | ||
|
jayantsing-db marked this conversation as resolved.
Outdated
|
||
| c -> { | ||
| List<List<Object>> rows = new ArrayList<>(); | ||
| try (ResultSet catalogSchemas = | ||
| session.getDatabricksMetadataClient().listSchemas(session, c, schemaPattern)) { | ||
| while (catalogSchemas.next()) { | ||
| List<Object> schemaRow = new ArrayList<>(); | ||
| schemaRow.add(catalogSchemas.getString(1)); // TABLE_SCHEM | ||
| schemaRow.add(catalogSchemas.getString(2)); // TABLE_CATALOG | ||
| rows.add(schemaRow); | ||
| } | ||
| } catch (SQLException e) { | ||
| LOGGER.warn("Error fetching schemas for catalog %s %s", c, e.getMessage()); | ||
| } | ||
| return rows; | ||
| }, | ||
| getOrCreateSchemasThreadPool()); | ||
|
|
||
| // Convert combined data into a result set | ||
| return metadataResultSetBuilder.getResultSetWithGivenRowsAndColumns( | ||
| SCHEMA_COLUMNS, | ||
| schemaRows, | ||
| METADATA_STATEMENT_ID, | ||
| com.databricks.jdbc.common.CommandName.LIST_SCHEMAS); | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.