Skip to content

Commit 9cecd23

Browse files
Support fetching all schemas using a single SQL command (#953)
## Description <!-- Provide a brief summary of the changes made and the issue they aim to address.--> This is concerning the SQL Execution API mode of the OSS JDBC driver (`useThriftClient=0`). Earlier when the catalog was specified as null in getSchemas(catalog, schema), JDBC used to fetch all catalogs followed by fetching schemas across each catalog in parallel. This imposed a performance regression as compared to the Thrift mode of the OSS JDBC (`useThriftClient=1`/default-mode). This PR executes the getSchemas(null, schema) using the SQL command `SHOW SCHEMAS IN ALL CATALOGS` introduced in DBR 17.x (https://docs.databricks.com/aws/en/release-notes/runtime/17.0#support-all-catalogs-in-show-schemas). Runtime PR: https://github.com/databricks-eng/runtime/pull/161811. The changes are scheduled to flow to OSS Spark https://github.com/apache/spark/blob/5660dbadf90ed08faef6dc883fd98f55b098e96a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ShowNamespacesCommand.scala. For earlier DBR versions where the SQL syntax is not supported, JDBC fallbacks to earlier approach of fetching catalogs and schemas across each catalog in parallel. ## Testing <!-- Describe how the changes have been tested--> - e2e testing - Repo fake service tests - Unit tests ## Additional Notes to the Reviewer <!-- Share any additional context or insights that may help the reviewer understand the changes better. This could include challenges faced, limitations, or compromises made during the development process. Also, mention any areas of the code that you would like the reviewer to focus on specifically. --> --------- Signed-off-by: Jayant Singh <jayant.singh@databricks.com>
1 parent 7e057dc commit 9cecd23

13 files changed

Lines changed: 269 additions & 119 deletions

NEXT_CHANGELOG.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@
66

77
- **Query Tags support**: Added ability to attach key-value tags to SQL queries for analytical purposes that would appear in `system.query.history` table. Example: `jdbc:databricks://host;QUERY_TAGS=team:marketing,dashboard:abc123`.
88
- **SQL Scripting support**: Added support for [SQL Scripting](https://docs.databricks.com/aws/en/sql/language-manual/sql-ref-scripting)
9-
- Added a client property `enableVolumeOperations` to enable GET/PUT/REMOVE volume operations on a stream. For backward compatibility, allowedVolumeIngestionPaths can also be used for REMOVE operation.
9+
- Added a client property `enableVolumeOperations` to enable GET/PUT/REMOVE volume operations on a stream. For backward compatibility, allowedVolumeIngestionPaths can also be used for REMOVE operation.
10+
- Support for fetching schemas across all catalogs (when catalog is specified as null or a wildcard) in `DatabaseMetaData#getSchemas` API in SQL Execution mode.
1011

1112
### Updated
1213
- Databricks SDK dependency upgraded to latest version 0.60.0

src/main/java/com/databricks/jdbc/api/impl/DatabricksDatabaseMetaData.java

Lines changed: 0 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
import com.databricks.jdbc.api.internal.IDatabricksSession;
1010
import com.databricks.jdbc.common.*;
1111
import com.databricks.jdbc.common.util.DriverUtil;
12-
import com.databricks.jdbc.common.util.JdbcThreadUtils;
1312
import com.databricks.jdbc.dbclient.impl.common.MetadataResultSetBuilder;
1413
import com.databricks.jdbc.dbclient.impl.common.StatementId;
1514
import com.databricks.jdbc.exception.DatabricksSQLException;
@@ -20,8 +19,6 @@
2019
import com.databricks.sdk.service.sql.StatementState;
2120
import java.sql.*;
2221
import java.util.*;
23-
import java.util.concurrent.ExecutorService;
24-
import java.util.concurrent.Executors;
2522

2623
public class DatabricksDatabaseMetaData implements DatabaseMetaData {
2724

@@ -40,9 +37,6 @@ public class DatabricksDatabaseMetaData implements DatabaseMetaData {
4037
public static final String SYSTEM_FUNCTIONS = "DATABASE,IFNULL,USER";
4138
public static final String TIME_DATE_FUNCTIONS =
4239
"CURDATE,CURRENT_DATE,CURRENT_TIME,CURRENT_TIMESTAMP,CURTIME,DAYNAME,DAYOFMONTH,DAYOFWEEK,DAYOFYEAR,HOUR,MINUTE,MONTH,MONTHNAME,NOW,QUARTER,SECOND,TIMESTAMPADD,TIMESTAMPDIFF,WEEK,YEAR";
43-
private static final Object THREAD_POOL_LOCK = new Object();
44-
private static ExecutorService schemasThreadPool = null;
45-
private static final int DEFAULT_MAX_THREADS = 10;
4640
private final IDatabricksConnectionInternal connection;
4741
private final IDatabricksSession session;
4842
private final MetadataResultSetBuilder metadataResultSetBuilder;
@@ -1505,48 +1499,6 @@ public ResultSet getSchemas(String catalog, String schemaPattern) throws SQLExce
15051499
catalog, schemaPattern);
15061500
throwExceptionIfConnectionIsClosed();
15071501

1508-
if (session.getConnectionContext().getClientType() == DatabricksClientType.SEA
1509-
&& (catalog == null || catalog.equals("*") || catalog.equals("%"))) {
1510-
// Fetch catalogs from the metadata client
1511-
List<String> catalogList = new ArrayList<>();
1512-
try (ResultSet catalogs = getCatalogs()) {
1513-
while (catalogs.next()) {
1514-
String c = catalogs.getString(1);
1515-
if (c != null && !c.isEmpty()) {
1516-
catalogList.add(c);
1517-
}
1518-
}
1519-
}
1520-
1521-
// Process catalogs in parallel, gathering schema information
1522-
List<List<Object>> schemaRows =
1523-
JdbcThreadUtils.parallelFlatMap(
1524-
catalogList,
1525-
session.getConnectionContext(),
1526-
DEFAULT_MAX_THREADS, // Not significant since the executor is provided as a parameter
1527-
90, // 90 seconds timeout
1528-
c -> {
1529-
List<List<Object>> rows = new ArrayList<>();
1530-
try (ResultSet catalogSchemas =
1531-
session.getDatabricksMetadataClient().listSchemas(session, c, schemaPattern)) {
1532-
while (catalogSchemas.next()) {
1533-
List<Object> schemaRow = new ArrayList<>();
1534-
schemaRow.add(catalogSchemas.getString(1)); // TABLE_SCHEM
1535-
schemaRow.add(catalogSchemas.getString(2)); // TABLE_CATALOG
1536-
rows.add(schemaRow);
1537-
}
1538-
} catch (SQLException e) {
1539-
LOGGER.warn("Error fetching schemas for catalog %s %s", c, e.getMessage());
1540-
}
1541-
return rows;
1542-
},
1543-
getOrCreateSchemasThreadPool());
1544-
1545-
// Convert combined data into a result set
1546-
return metadataResultSetBuilder.getResultSetWithGivenRowsAndColumns(
1547-
SCHEMA_COLUMNS, schemaRows, METADATA_STATEMENT_ID, CommandName.LIST_SCHEMAS);
1548-
}
1549-
15501502
return session.getDatabricksMetadataClient().listSchemas(session, catalog, schemaPattern);
15511503
}
15521504

@@ -1659,23 +1611,6 @@ public boolean isWrapperFor(Class<?> iface) throws SQLException {
16591611
return iface != null && iface.isAssignableFrom(this.getClass());
16601612
}
16611613

1662-
private static ExecutorService getOrCreateSchemasThreadPool() {
1663-
synchronized (THREAD_POOL_LOCK) {
1664-
if (schemasThreadPool == null || schemasThreadPool.isShutdown()) {
1665-
// Could read max threads from a configuration property
1666-
schemasThreadPool =
1667-
Executors.newFixedThreadPool(
1668-
DEFAULT_MAX_THREADS,
1669-
r -> {
1670-
Thread t = new Thread(r, "jdbc-schemas-fetcher");
1671-
t.setDaemon(true);
1672-
return t;
1673-
});
1674-
}
1675-
return schemasThreadPool;
1676-
}
1677-
}
1678-
16791614
private void throwExceptionIfConnectionIsClosed() throws SQLException {
16801615
LOGGER.debug("private void throwExceptionIfConnectionIsClosed()");
16811616
if (!connection.getSession().isOpen()) {

src/main/java/com/databricks/jdbc/common/MetadataResultConstants.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,9 @@ public class MetadataResultConstants {
1313
public static final String[] DEFAULT_TABLE_TYPES = {"TABLE", "VIEW", "SYSTEM TABLE"};
1414
public static final ResultColumn CATALOG_COLUMN =
1515
new ResultColumn("TABLE_CAT", "catalogName", Types.VARCHAR);
16-
private static final ResultColumn CATALOG_FULL_COLUMN =
16+
public static final ResultColumn CATALOG_FULL_COLUMN =
1717
new ResultColumn("TABLE_CATALOG", "catalogName", Types.VARCHAR);
18-
public static final ResultColumn CATALOG_COLUMN_FOR_GET_CATALOGS =
18+
public static final ResultColumn CATALOG_RESULT_COLUMN =
1919
new ResultColumn("TABLE_CAT", "catalog", Types.VARCHAR);
2020
public static final ResultColumn TYPE_CATALOG_COLUMN =
2121
new ResultColumn("TYPE_CAT", "TYPE_CATALOG_COLUMN", Types.VARCHAR);
@@ -216,7 +216,7 @@ public class MetadataResultConstants {
216216
IS_AUTO_INCREMENT_COLUMN,
217217
IS_GENERATED_COLUMN);
218218

219-
public static List<ResultColumn> CATALOG_COLUMNS = List.of(CATALOG_COLUMN_FOR_GET_CATALOGS);
219+
public static List<ResultColumn> CATALOG_COLUMNS = List.of(CATALOG_RESULT_COLUMN);
220220

221221
public static List<ResultColumn> SCHEMA_COLUMNS =
222222
List.of(SCHEMA_COLUMN_FOR_GET_SCHEMA, CATALOG_FULL_COLUMN);
@@ -491,9 +491,7 @@ public class MetadataResultConstants {
491491
MetadataResultConstants.TYPE_NAME_COLUMN,
492492
MetadataResultConstants.DATA_TYPE_COLUMN,
493493
MetadataResultConstants.PRECISION_COLUMN));
494-
put(
495-
CommandName.LIST_CATALOGS,
496-
List.of(MetadataResultConstants.CATALOG_COLUMN_FOR_GET_CATALOGS));
494+
put(CommandName.LIST_CATALOGS, List.of(MetadataResultConstants.CATALOG_RESULT_COLUMN));
497495
put(
498496
CommandName.LIST_TABLES,
499497
List.of(MetadataResultConstants.TABLE_NAME_COLUMN, TABLE_TYPE_COLUMN));

src/main/java/com/databricks/jdbc/common/util/WildcardUtil.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@ public static boolean isNullOrEmpty(String s) {
2121
return s == null || s.trim().isEmpty();
2222
}
2323

24+
public static boolean isNullOrWildcard(String s) {
25+
return s == null || isWildcard(s) || s.equals("%");
26+
}
27+
2428
/**
2529
* This function checks if the input string is a wildcard string
2630
*

src/main/java/com/databricks/jdbc/dbclient/impl/common/CommandConstants.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,17 @@ public class CommandConstants {
1111
public static final String IN_CATALOG_SQL = " IN CATALOG %s";
1212
public static final String IN_ABSOLUTE_SCHEMA_SQL = " IN SCHEMA %s";
1313
public static final String IN_ABSOLUTE_TABLE_SQL = " IN TABLE %s";
14-
public static final String SHOW_SCHEMA_IN_CATALOG_SQL = "SHOW SCHEMAS IN %s";
14+
public static final String IN_ALL_CATALOGS_SQL = " IN ALL CATALOGS";
15+
public static final String SHOW_SCHEMAS_IN_CATALOG_SQL = "SHOW SCHEMAS IN %s";
1516
public static final String LIKE_SQL = " LIKE '%s'";
1617
public static final String SCHEMA_LIKE_SQL = " SCHEMA" + LIKE_SQL;
1718
public static final String TABLE_LIKE_SQL = " TABLE" + LIKE_SQL;
1819
public static final String SHOW_TABLES_SQL = "SHOW TABLES" + IN_CATALOG_SQL;
19-
public static final String SHOW_TABLES_IN_ALL_CATALOGS_SQL = "SHOW TABLES IN ALL CATALOGS";
20+
public static final String SHOW_TABLES_IN_ALL_CATALOGS_SQL = "SHOW TABLES" + IN_ALL_CATALOGS_SQL;
2021
public static final String SHOW_COLUMNS_SQL = "SHOW COLUMNS" + IN_CATALOG_SQL;
2122
public static final String SHOW_FUNCTIONS_SQL = "SHOW FUNCTIONS" + IN_CATALOG_SQL;
23+
public static final String SHOW_SCHEMAS_IN_ALL_CATALOGS_SQL =
24+
"SHOW SCHEMAS" + IN_ALL_CATALOGS_SQL;
2225
public static final String SHOW_PRIMARY_KEYS_SQL =
2326
"SHOW KEYS" + IN_CATALOG_SQL + IN_ABSOLUTE_SCHEMA_SQL + IN_ABSOLUTE_TABLE_SQL;
2427
public static final String SHOW_FOREIGN_KEYS_SQL =

src/main/java/com/databricks/jdbc/dbclient/impl/common/MetadataResultSetBuilder.java

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,9 @@ public DatabricksResultSet getCatalogsResult(DatabricksResultSet resultSet) thro
6868

6969
public DatabricksResultSet getSchemasResult(DatabricksResultSet resultSet, String catalog)
7070
throws SQLException {
71-
List<List<Object>> rows = getRowsForSchemas(resultSet, SCHEMA_COLUMNS, catalog);
71+
List<List<Object>> rows =
72+
getRowsForSchemas(
73+
resultSet, SCHEMA_COLUMNS, catalog, new SchemasDatabricksResultSetAdapter());
7274
return buildResultSet(
7375
SCHEMA_COLUMNS,
7476
rows,
@@ -562,19 +564,39 @@ private List<List<Object>> getRowsForFunctions(
562564
}
563565

564566
private List<List<Object>> getRowsForSchemas(
565-
DatabricksResultSet resultSet, List<ResultColumn> columns, String catalog)
567+
DatabricksResultSet resultSet,
568+
List<ResultColumn> columns,
569+
String catalog,
570+
IDatabricksResultSetAdapter adapter)
566571
throws SQLException {
567572
List<List<Object>> rows = new ArrayList<>();
568573
while (resultSet.next()) {
574+
// Check if this row should be included based on the adapter's filter
575+
if (!adapter.includeRow(resultSet, columns)) {
576+
continue;
577+
}
578+
569579
List<Object> row = new ArrayList<>();
570580
for (ResultColumn column : columns) {
571-
if (column.getColumnName().equals("TABLE_CATALOG")) {
572-
row.add(catalog);
573-
continue;
581+
// Map the expected column on client to column in the result set using the adapter
582+
ResultColumn mappedColumn = adapter.mapColumn(column);
583+
584+
if (mappedColumn
585+
.getResultSetColumnName()
586+
.equals(CATALOG_RESULT_COLUMN.getResultSetColumnName())) {
587+
try {
588+
resultSet.findColumn(mappedColumn.getResultSetColumnName());
589+
} catch (SQLException e) {
590+
// Result set does not have a catalog column
591+
// Manually add the catalog and move to next column
592+
row.add(catalog);
593+
continue;
594+
}
574595
}
596+
575597
Object object;
576598
try {
577-
object = resultSet.getObject(column.getResultSetColumnName());
599+
object = resultSet.getObject(mappedColumn.getResultSetColumnName());
578600
if (object == null) {
579601
object = NULL_STRING;
580602
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package com.databricks.jdbc.dbclient.impl.common;
2+
3+
import static com.databricks.jdbc.common.MetadataResultConstants.CATALOG_FULL_COLUMN;
4+
import static com.databricks.jdbc.common.MetadataResultConstants.CATALOG_RESULT_COLUMN;
5+
6+
import com.databricks.jdbc.model.core.ResultColumn;
7+
import java.sql.ResultSet;
8+
import java.sql.SQLException;
9+
import java.util.List;
10+
11+
public class SchemasDatabricksResultSetAdapter implements IDatabricksResultSetAdapter {
12+
@Override
13+
public ResultColumn mapColumn(ResultColumn column) {
14+
String columnName = column.getResultSetColumnName();
15+
if (columnName.equals(CATALOG_FULL_COLUMN.getResultSetColumnName())) {
16+
// Map CATALOG_FULL_COLUMN to CATALOG_COLUMN_FOR_GET_CATALOGS of result set
17+
return CATALOG_RESULT_COLUMN;
18+
} else {
19+
return column;
20+
}
21+
}
22+
23+
@Override
24+
public boolean includeRow(ResultSet resultSet, List<ResultColumn> columns) throws SQLException {
25+
return true;
26+
}
27+
}

src/main/java/com/databricks/jdbc/dbclient/impl/sqlexec/CommandBuilder.java

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -75,27 +75,28 @@ private String fetchCatalogSQL() {
7575
}
7676

7777
private String fetchSchemaSQL() throws SQLException {
78-
String contextString =
79-
String.format(
80-
"Building command for fetching schema. Catalog %s, SchemaPattern %s and session context %s",
81-
catalogName, schemaPattern, sessionContext);
82-
LOGGER.debug(contextString);
83-
throwErrorIfNull(Collections.singletonMap(CATALOG, catalogName), contextString);
84-
String showSchemaSQL = String.format(SHOW_SCHEMA_IN_CATALOG_SQL, catalogName);
78+
LOGGER.debug(
79+
"Building command for fetching schema. Catalog %s, SchemaPattern %s and session context %s",
80+
catalogName, schemaPattern, sessionContext);
81+
String showSchemasSQL;
82+
if (WildcardUtil.isNullOrWildcard(catalogName)) {
83+
// SHOW SCHEMAS IN ALL CATALOGS
84+
showSchemasSQL = SHOW_SCHEMAS_IN_ALL_CATALOGS_SQL;
85+
} else {
86+
showSchemasSQL = String.format(SHOW_SCHEMAS_IN_CATALOG_SQL, catalogName);
87+
}
8588
if (!WildcardUtil.isNullOrEmpty(schemaPattern)) {
86-
showSchemaSQL += String.format(LIKE_SQL, schemaPattern);
89+
showSchemasSQL += String.format(LIKE_SQL, schemaPattern);
8790
}
88-
return showSchemaSQL;
91+
return showSchemasSQL;
8992
}
9093

9194
private String fetchTablesSQL() throws SQLException {
92-
String contextString =
93-
String.format(
94-
"Building command for fetching tables. Catalog %s, SchemaPattern %s, TablePattern %s and session context %s",
95-
catalogName, schemaPattern, tablePattern, sessionContext);
96-
LOGGER.debug(contextString);
95+
LOGGER.debug(
96+
"Building command for fetching tables. Catalog %s, SchemaPattern %s, TablePattern %s and session context %s",
97+
catalogName, schemaPattern, tablePattern, sessionContext);
9798
String showTablesSQL;
98-
if (catalogName == null || catalogName.equals("*") || catalogName.equals("%")) {
99+
if (WildcardUtil.isNullOrWildcard(catalogName)) {
99100
// SHOW TABLES IN ALL CATALOGS
100101
showTablesSQL = SHOW_TABLES_IN_ALL_CATALOGS_SQL;
101102
} else {

0 commit comments

Comments
 (0)