Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import com.databricks.jdbc.api.internal.IDatabricksSession;
import com.databricks.jdbc.common.*;
import com.databricks.jdbc.common.util.DriverUtil;
import com.databricks.jdbc.common.util.JdbcThreadUtils;
import com.databricks.jdbc.dbclient.impl.common.MetadataResultSetBuilder;
import com.databricks.jdbc.dbclient.impl.common.StatementId;
import com.databricks.jdbc.exception.DatabricksSQLException;
Expand Down Expand Up @@ -40,9 +39,9 @@ public class DatabricksDatabaseMetaData implements DatabaseMetaData {
public static final String SYSTEM_FUNCTIONS = "DATABASE,IFNULL,USER";
public static final String TIME_DATE_FUNCTIONS =
"CURDATE,CURRENT_DATE,CURRENT_TIME,CURRENT_TIMESTAMP,CURTIME,DAYNAME,DAYOFMONTH,DAYOFWEEK,DAYOFYEAR,HOUR,MINUTE,MONTH,MONTHNAME,NOW,QUARTER,SECOND,TIMESTAMPADD,TIMESTAMPDIFF,WEEK,YEAR";
public static final int DEFAULT_MAX_THREADS_FETCH_SCHEMAS = 10;
private static final Object THREAD_POOL_LOCK = new Object();
private static ExecutorService schemasThreadPool = null;
private static final int DEFAULT_MAX_THREADS = 10;
private final IDatabricksConnectionInternal connection;
private final IDatabricksSession session;
private final MetadataResultSetBuilder metadataResultSetBuilder;
Expand Down Expand Up @@ -1505,48 +1504,6 @@ public ResultSet getSchemas(String catalog, String schemaPattern) throws SQLExce
catalog, schemaPattern);
throwExceptionIfConnectionIsClosed();

if (session.getConnectionContext().getClientType() == DatabricksClientType.SEA
Comment thread
jayantsing-db marked this conversation as resolved.
&& (catalog == null || catalog.equals("*") || catalog.equals("%"))) {
// Fetch catalogs from the metadata client
List<String> catalogList = new ArrayList<>();
try (ResultSet catalogs = getCatalogs()) {
while (catalogs.next()) {
String c = catalogs.getString(1);
if (c != null && !c.isEmpty()) {
catalogList.add(c);
}
}
}

// Process catalogs in parallel, gathering schema information
List<List<Object>> schemaRows =
JdbcThreadUtils.parallelFlatMap(
catalogList,
session.getConnectionContext(),
DEFAULT_MAX_THREADS, // Not significant since the executor is provided as a parameter
90, // 90 seconds timeout
c -> {
List<List<Object>> rows = new ArrayList<>();
try (ResultSet catalogSchemas =
session.getDatabricksMetadataClient().listSchemas(session, c, schemaPattern)) {
while (catalogSchemas.next()) {
List<Object> schemaRow = new ArrayList<>();
schemaRow.add(catalogSchemas.getString(1)); // TABLE_SCHEM
schemaRow.add(catalogSchemas.getString(2)); // TABLE_CATALOG
rows.add(schemaRow);
}
} catch (SQLException e) {
LOGGER.warn("Error fetching schemas for catalog %s %s", c, e.getMessage());
}
return rows;
},
getOrCreateSchemasThreadPool());

// Convert combined data into a result set
return metadataResultSetBuilder.getResultSetWithGivenRowsAndColumns(
SCHEMA_COLUMNS, schemaRows, METADATA_STATEMENT_ID, CommandName.LIST_SCHEMAS);
}

return session.getDatabricksMetadataClient().listSchemas(session, catalog, schemaPattern);
}

Expand Down Expand Up @@ -1659,13 +1616,13 @@ public boolean isWrapperFor(Class<?> iface) throws SQLException {
return iface != null && iface.isAssignableFrom(this.getClass());
}

private static ExecutorService getOrCreateSchemasThreadPool() {
public static ExecutorService getOrCreateSchemasThreadPool() {
synchronized (THREAD_POOL_LOCK) {
if (schemasThreadPool == null || schemasThreadPool.isShutdown()) {
// Could read max threads from a configuration property
schemasThreadPool =
Executors.newFixedThreadPool(
DEFAULT_MAX_THREADS,
DEFAULT_MAX_THREADS_FETCH_SCHEMAS,
r -> {
Thread t = new Thread(r, "jdbc-schemas-fetcher");
t.setDaemon(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public class MetadataResultConstants {
public static final String[] DEFAULT_TABLE_TYPES = {"TABLE", "VIEW", "SYSTEM TABLE"};
public static final ResultColumn CATALOG_COLUMN =
new ResultColumn("TABLE_CAT", "catalogName", Types.VARCHAR);
private static final ResultColumn CATALOG_FULL_COLUMN =
public static final ResultColumn CATALOG_FULL_COLUMN =
new ResultColumn("TABLE_CATALOG", "catalogName", Types.VARCHAR);
public static final ResultColumn CATALOG_COLUMN_FOR_GET_CATALOGS =
new ResultColumn("TABLE_CAT", "catalog", Types.VARCHAR);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ public static boolean isNullOrEmpty(String s) {
return s == null || s.trim().isEmpty();
}

public static boolean isNullOrWildcard(String s) {
return s == null || isWildcard(s);
Comment thread
jayantsing-db marked this conversation as resolved.
Outdated
}

/**
* This function checks if the input string is a wildcard string
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,17 @@ public class CommandConstants {
public static final String IN_CATALOG_SQL = " IN CATALOG %s";
public static final String IN_ABSOLUTE_SCHEMA_SQL = " IN SCHEMA %s";
public static final String IN_ABSOLUTE_TABLE_SQL = " IN TABLE %s";
public static final String SHOW_SCHEMA_IN_CATALOG_SQL = "SHOW SCHEMAS IN %s";
public static final String IN_ALL_CATALOGS_SQL = " IN ALL CATALOGS";
public static final String SHOW_SCHEMAS_IN_CATALOG_SQL = "SHOW SCHEMAS IN %s";
public static final String LIKE_SQL = " LIKE '%s'";
public static final String SCHEMA_LIKE_SQL = " SCHEMA" + LIKE_SQL;
public static final String TABLE_LIKE_SQL = " TABLE" + LIKE_SQL;
public static final String SHOW_TABLES_SQL = "SHOW TABLES" + IN_CATALOG_SQL;
public static final String SHOW_TABLES_IN_ALL_CATALOGS_SQL = "SHOW TABLES IN ALL CATALOGS";
public static final String SHOW_TABLES_IN_ALL_CATALOGS_SQL = "SHOW TABLES" + IN_ALL_CATALOGS_SQL;
public static final String SHOW_COLUMNS_SQL = "SHOW COLUMNS" + IN_CATALOG_SQL;
public static final String SHOW_FUNCTIONS_SQL = "SHOW FUNCTIONS" + IN_CATALOG_SQL;
public static final String SHOW_SCHEMAS_IN_ALL_CATALOGS_SQL =
"SHOW SCHEMAS" + IN_ALL_CATALOGS_SQL;
public static final String SHOW_PRIMARY_KEYS_SQL =
"SHOW KEYS" + IN_CATALOG_SQL + IN_ABSOLUTE_SCHEMA_SQL + IN_ABSOLUTE_TABLE_SQL;
public static final String SHOW_FOREIGN_KEYS_SQL =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@ public DatabricksResultSet getCatalogsResult(DatabricksResultSet resultSet) thro

public DatabricksResultSet getSchemasResult(DatabricksResultSet resultSet, String catalog)
throws SQLException {
List<List<Object>> rows = getRowsForSchemas(resultSet, SCHEMA_COLUMNS, catalog);
List<List<Object>> rows =
getRowsForSchemas(
resultSet, SCHEMA_COLUMNS, catalog, new SchemasDatabricksResultSetAdapter());
return buildResultSet(
SCHEMA_COLUMNS,
rows,
Expand Down Expand Up @@ -562,19 +564,39 @@ private List<List<Object>> getRowsForFunctions(
}

private List<List<Object>> getRowsForSchemas(
DatabricksResultSet resultSet, List<ResultColumn> columns, String catalog)
DatabricksResultSet resultSet,
List<ResultColumn> columns,
String catalog,
IDatabricksResultSetAdapter adapter)
throws SQLException {
List<List<Object>> rows = new ArrayList<>();
while (resultSet.next()) {
// Check if this row should be included based on the adapter's filter
if (!adapter.includeRow(resultSet, columns)) {
continue;
}

List<Object> row = new ArrayList<>();
for (ResultColumn column : columns) {
if (column.getColumnName().equals("TABLE_CATALOG")) {
row.add(catalog);
continue;
// Map the expected column on client to column in the result set using the adapter
ResultColumn mappedColumn = adapter.mapColumn(column);

if (mappedColumn
.getResultSetColumnName()
.equals(CATALOG_COLUMN_FOR_GET_CATALOGS.getResultSetColumnName())) {
Comment thread
jayantsing-db marked this conversation as resolved.
Outdated
try {
resultSet.findColumn(mappedColumn.getResultSetColumnName());
} catch (SQLException e) {
// Result set does not have a catalog column
// Manually add the catalog and move to next column
row.add(catalog);
continue;
}
}

Object object;
try {
object = resultSet.getObject(column.getResultSetColumnName());
object = resultSet.getObject(mappedColumn.getResultSetColumnName());
if (object == null) {
object = NULL_STRING;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.databricks.jdbc.dbclient.impl.common;

import static com.databricks.jdbc.common.MetadataResultConstants.CATALOG_COLUMN_FOR_GET_CATALOGS;
import static com.databricks.jdbc.common.MetadataResultConstants.CATALOG_FULL_COLUMN;

import com.databricks.jdbc.model.core.ResultColumn;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;

public class SchemasDatabricksResultSetAdapter implements IDatabricksResultSetAdapter {
@Override
public ResultColumn mapColumn(ResultColumn column) {
String columnName = column.getResultSetColumnName();
if (columnName.equals(CATALOG_FULL_COLUMN.getResultSetColumnName())) {
// Map CATALOG_FULL_COLUMN to CATALOG_COLUMN_FOR_GET_CATALOGS of result set
return CATALOG_COLUMN_FOR_GET_CATALOGS;
Comment thread
jayantsing-db marked this conversation as resolved.
Outdated
} else {
return column;
}
}

@Override
public boolean includeRow(ResultSet resultSet, List<ResultColumn> columns) throws SQLException {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,27 +75,28 @@ private String fetchCatalogSQL() {
}

private String fetchSchemaSQL() throws SQLException {
String contextString =
String.format(
"Building command for fetching schema. Catalog %s, SchemaPattern %s and session context %s",
catalogName, schemaPattern, sessionContext);
LOGGER.debug(contextString);
throwErrorIfNull(Collections.singletonMap(CATALOG, catalogName), contextString);
String showSchemaSQL = String.format(SHOW_SCHEMA_IN_CATALOG_SQL, catalogName);
LOGGER.debug(
"Building command for fetching schema. Catalog %s, SchemaPattern %s and session context %s",
catalogName, schemaPattern, sessionContext);
String showSchemasSQL;
if (WildcardUtil.isNullOrWildcard(catalogName)) {
Comment thread
jayantsing-db marked this conversation as resolved.
// SHOW SCHEMAS IN ALL CATALOGS
showSchemasSQL = SHOW_SCHEMAS_IN_ALL_CATALOGS_SQL;
} else {
showSchemasSQL = String.format(SHOW_SCHEMAS_IN_CATALOG_SQL, catalogName);
}
if (!WildcardUtil.isNullOrEmpty(schemaPattern)) {
showSchemaSQL += String.format(LIKE_SQL, schemaPattern);
showSchemasSQL += String.format(LIKE_SQL, schemaPattern);
}
return showSchemaSQL;
return showSchemasSQL;
}

private String fetchTablesSQL() throws SQLException {
String contextString =
String.format(
"Building command for fetching tables. Catalog %s, SchemaPattern %s, TablePattern %s and session context %s",
catalogName, schemaPattern, tablePattern, sessionContext);
LOGGER.debug(contextString);
LOGGER.debug(
"Building command for fetching tables. Catalog %s, SchemaPattern %s, TablePattern %s and session context %s",
catalogName, schemaPattern, tablePattern, sessionContext);
String showTablesSQL;
if (catalogName == null || catalogName.equals("*") || catalogName.equals("%")) {
if (WildcardUtil.isNullOrWildcard(catalogName)) {
Comment thread
jayantsing-db marked this conversation as resolved.
// SHOW TABLES IN ALL CATALOGS
showTablesSQL = SHOW_TABLES_IN_ALL_CATALOGS_SQL;
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package com.databricks.jdbc.dbclient.impl.sqlexec;

import static com.databricks.jdbc.common.MetadataResultConstants.DEFAULT_TABLE_TYPES;
import static com.databricks.jdbc.common.MetadataResultConstants.PARSE_SYNTAX_ERROR_SQL_STATE;
import static com.databricks.jdbc.api.impl.DatabricksDatabaseMetaData.DEFAULT_MAX_THREADS_FETCH_SCHEMAS;
import static com.databricks.jdbc.api.impl.DatabricksDatabaseMetaData.getOrCreateSchemasThreadPool;
Comment thread
jayantsing-db marked this conversation as resolved.
Outdated
import static com.databricks.jdbc.common.MetadataResultConstants.*;
import static com.databricks.jdbc.dbclient.impl.common.CommandConstants.GET_TABLES_STATEMENT_ID;
import static com.databricks.jdbc.dbclient.impl.common.CommandConstants.METADATA_STATEMENT_ID;
import static com.databricks.jdbc.dbclient.impl.sqlexec.ResultConstants.TYPE_INFO_RESULT;
Expand All @@ -10,14 +11,18 @@
import com.databricks.jdbc.api.internal.IDatabricksSession;
import com.databricks.jdbc.common.MetadataResultConstants;
import com.databricks.jdbc.common.StatementType;
import com.databricks.jdbc.common.util.JdbcThreadUtils;
import com.databricks.jdbc.common.util.WildcardUtil;
import com.databricks.jdbc.dbclient.IDatabricksClient;
import com.databricks.jdbc.dbclient.IDatabricksMetadataClient;
import com.databricks.jdbc.dbclient.impl.common.MetadataResultSetBuilder;
import com.databricks.jdbc.log.JdbcLogger;
import com.databricks.jdbc.log.JdbcLoggerFactory;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;

/** Implementation for {@link IDatabricksMetadataClient} using {@link IDatabricksClient}. */
Expand Down Expand Up @@ -54,7 +59,20 @@ public DatabricksResultSet listSchemas(
new CommandBuilder(catalog, session).setSchemaPattern(schemaNamePattern);
String SQL = commandBuilder.getSQLString(CommandName.LIST_SCHEMAS);
LOGGER.debug("SQL command to fetch schemas: {}", SQL);
return metadataResultSetBuilder.getSchemasResult(getResultSet(SQL, session), catalog);
try {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will make it fail for all customers until the new DBSQL release is rolled out. We will unnecessarily add an extra hop until then to first go through the exception flow and then fallback. Can we do something intelligently?

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed offline with using some signals, please check if that looks good to you.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are further evaluating this. I will make changes separately as per the conclusion.

return metadataResultSetBuilder.getSchemasResult(getResultSet(SQL, session), catalog);
} catch (SQLException e) {
if (WildcardUtil.isNullOrWildcard(catalog)
&& e.getSQLState().equals(PARSE_SYNTAX_ERROR_SQL_STATE)) {
// This is a fallback for the case where the SQL command fails with "syntax error at or near
// "ALL CATALOGS""
// This is a known issue for older DBR versions
LOGGER.debug("SQL command failed with syntax error. Fetching schemas across all catalogs.");
return fetchSchemasAcrossCatalogs(session, schemaNamePattern);
} else {
throw e;
}
}
}

@Override
Expand Down Expand Up @@ -159,7 +177,6 @@ public DatabricksResultSet listImportedKeys(
if (e.getSQLState().equals(PARSE_SYNTAX_ERROR_SQL_STATE)) {
// This is a workaround for the issue where the SQL command fails with "syntax error at or
// near "foreign""
Comment thread
jayantsing-db marked this conversation as resolved.
// This is a known issue in Databricks for older DBSQL versions
LOGGER.debug("SQL command failed with syntax error. Returning empty result set.");
return metadataResultSetBuilder.getResultSetWithGivenRowsAndColumns(
MetadataResultConstants.IMPORTED_KEYS_COLUMNS,
Expand Down Expand Up @@ -233,4 +250,49 @@ private DatabricksResultSet getResultSet(String SQL, IDatabricksSession session)
session,
null /* parentStatement */);
}

private DatabricksResultSet fetchSchemasAcrossCatalogs(
IDatabricksSession session, String schemaPattern) throws SQLException {
List<String> catalogList = new ArrayList<>();
try (ResultSet catalogs = session.getDatabricksMetadataClient().listCatalogs(session)) {
while (catalogs.next()) {
String c = catalogs.getString(1);
if (c != null && !c.isEmpty()) {
catalogList.add(c);
}
}
}

// Process catalogs in parallel, gathering schema information
List<List<Object>> schemaRows =
JdbcThreadUtils.parallelFlatMap(
catalogList,
session.getConnectionContext(),
DEFAULT_MAX_THREADS_FETCH_SCHEMAS, // Not significant since the executor is provided as
// a parameter
Comment thread
jayantsing-db marked this conversation as resolved.
90, // 90 seconds timeout
Comment thread
jayantsing-db marked this conversation as resolved.
Outdated
c -> {
List<List<Object>> rows = new ArrayList<>();
try (ResultSet catalogSchemas =
session.getDatabricksMetadataClient().listSchemas(session, c, schemaPattern)) {
while (catalogSchemas.next()) {
List<Object> schemaRow = new ArrayList<>();
schemaRow.add(catalogSchemas.getString(1)); // TABLE_SCHEM
schemaRow.add(catalogSchemas.getString(2)); // TABLE_CATALOG
rows.add(schemaRow);
}
} catch (SQLException e) {
LOGGER.warn("Error fetching schemas for catalog %s %s", c, e.getMessage());
}
return rows;
},
getOrCreateSchemasThreadPool());

// Convert combined data into a result set
return metadataResultSetBuilder.getResultSetWithGivenRowsAndColumns(
SCHEMA_COLUMNS,
schemaRows,
METADATA_STATEMENT_ID,
com.databricks.jdbc.common.CommandName.LIST_SCHEMAS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -683,14 +683,6 @@ public void testGetSchemas_SqlExec() throws SQLException {
DatabricksConnectionContext.parse(WAREHOUSE_JDBC_URL_WITH_SEA, new Properties()));
ResultSet resultSet = metaData.getSchemas();
assertNotNull(resultSet);

// Result set should have 2 columns; TABLE_SCHEM and TABLE_CATALOG
assertEquals(2, resultSet.getMetaData().getColumnCount());
assertSame("TABLE_SCHEM", resultSet.getMetaData().getColumnName(1));
assertSame("TABLE_CATALOG", resultSet.getMetaData().getColumnName(2));

// For SQL_EXEC execution, result set data should be empty
assertFalse(resultSet.next());
}

@Test
Expand Down
Loading
Loading