Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion NEXT_CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@

- **Query Tags support**: Added ability to attach key-value tags to SQL queries for analytical purposes that would appear in `system.query.history` table. Example: `jdbc:databricks://host;QUERY_TAGS=team:marketing,dashboard:abc123`.
- **SQL Scripting support**: Added support for [SQL Scripting](https://docs.databricks.com/aws/en/sql/language-manual/sql-ref-scripting)
- Added a client property `enableVolumeOperations` to enable GET/PUT/REMOVE volume operations on a stream. For backward compatibility, allowedVolumeIngestionPaths can also be used for REMOVE operation.
- Added a client property `enableVolumeOperations` to enable GET/PUT/REMOVE volume operations on a stream. For backward compatibility, allowedVolumeIngestionPaths can also be used for REMOVE operation.
- Support for fetching schemas across all catalogs (when catalog is specified as null or a wildcard) in `DatabaseMetaData#getSchemas` API in SQL Execution mode.

### Updated
- Databricks SDK dependency upgraded to latest version 0.60.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import com.databricks.jdbc.api.internal.IDatabricksSession;
import com.databricks.jdbc.common.*;
import com.databricks.jdbc.common.util.DriverUtil;
import com.databricks.jdbc.common.util.JdbcThreadUtils;
import com.databricks.jdbc.dbclient.impl.common.MetadataResultSetBuilder;
import com.databricks.jdbc.dbclient.impl.common.StatementId;
import com.databricks.jdbc.exception.DatabricksSQLException;
Expand All @@ -20,8 +19,6 @@
import com.databricks.sdk.service.sql.StatementState;
import java.sql.*;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class DatabricksDatabaseMetaData implements DatabaseMetaData {

Expand All @@ -40,9 +37,6 @@ public class DatabricksDatabaseMetaData implements DatabaseMetaData {
public static final String SYSTEM_FUNCTIONS = "DATABASE,IFNULL,USER";
public static final String TIME_DATE_FUNCTIONS =
"CURDATE,CURRENT_DATE,CURRENT_TIME,CURRENT_TIMESTAMP,CURTIME,DAYNAME,DAYOFMONTH,DAYOFWEEK,DAYOFYEAR,HOUR,MINUTE,MONTH,MONTHNAME,NOW,QUARTER,SECOND,TIMESTAMPADD,TIMESTAMPDIFF,WEEK,YEAR";
private static final Object THREAD_POOL_LOCK = new Object();
private static ExecutorService schemasThreadPool = null;
private static final int DEFAULT_MAX_THREADS = 10;
private final IDatabricksConnectionInternal connection;
private final IDatabricksSession session;
private final MetadataResultSetBuilder metadataResultSetBuilder;
Expand Down Expand Up @@ -1505,48 +1499,6 @@ public ResultSet getSchemas(String catalog, String schemaPattern) throws SQLExce
catalog, schemaPattern);
throwExceptionIfConnectionIsClosed();

if (session.getConnectionContext().getClientType() == DatabricksClientType.SEA
Comment thread
jayantsing-db marked this conversation as resolved.
&& (catalog == null || catalog.equals("*") || catalog.equals("%"))) {
// Fetch catalogs from the metadata client
List<String> catalogList = new ArrayList<>();
try (ResultSet catalogs = getCatalogs()) {
while (catalogs.next()) {
String c = catalogs.getString(1);
if (c != null && !c.isEmpty()) {
catalogList.add(c);
}
}
}

// Process catalogs in parallel, gathering schema information
List<List<Object>> schemaRows =
JdbcThreadUtils.parallelFlatMap(
catalogList,
session.getConnectionContext(),
DEFAULT_MAX_THREADS, // Not significant since the executor is provided as a parameter
90, // 90 seconds timeout
c -> {
List<List<Object>> rows = new ArrayList<>();
try (ResultSet catalogSchemas =
session.getDatabricksMetadataClient().listSchemas(session, c, schemaPattern)) {
while (catalogSchemas.next()) {
List<Object> schemaRow = new ArrayList<>();
schemaRow.add(catalogSchemas.getString(1)); // TABLE_SCHEM
schemaRow.add(catalogSchemas.getString(2)); // TABLE_CATALOG
rows.add(schemaRow);
}
} catch (SQLException e) {
LOGGER.warn("Error fetching schemas for catalog %s %s", c, e.getMessage());
}
return rows;
},
getOrCreateSchemasThreadPool());

// Convert combined data into a result set
return metadataResultSetBuilder.getResultSetWithGivenRowsAndColumns(
SCHEMA_COLUMNS, schemaRows, METADATA_STATEMENT_ID, CommandName.LIST_SCHEMAS);
}

return session.getDatabricksMetadataClient().listSchemas(session, catalog, schemaPattern);
}

Expand Down Expand Up @@ -1659,23 +1611,6 @@ public boolean isWrapperFor(Class<?> iface) throws SQLException {
return iface != null && iface.isAssignableFrom(this.getClass());
}

private static ExecutorService getOrCreateSchemasThreadPool() {
synchronized (THREAD_POOL_LOCK) {
if (schemasThreadPool == null || schemasThreadPool.isShutdown()) {
// Could read max threads from a configuration property
schemasThreadPool =
Executors.newFixedThreadPool(
DEFAULT_MAX_THREADS,
r -> {
Thread t = new Thread(r, "jdbc-schemas-fetcher");
t.setDaemon(true);
return t;
});
}
return schemasThreadPool;
}
}

private void throwExceptionIfConnectionIsClosed() throws SQLException {
LOGGER.debug("private void throwExceptionIfConnectionIsClosed()");
if (!connection.getSession().isOpen()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ public class MetadataResultConstants {
public static final String[] DEFAULT_TABLE_TYPES = {"TABLE", "VIEW", "SYSTEM TABLE"};
public static final ResultColumn CATALOG_COLUMN =
new ResultColumn("TABLE_CAT", "catalogName", Types.VARCHAR);
private static final ResultColumn CATALOG_FULL_COLUMN =
public static final ResultColumn CATALOG_FULL_COLUMN =
new ResultColumn("TABLE_CATALOG", "catalogName", Types.VARCHAR);
public static final ResultColumn CATALOG_COLUMN_FOR_GET_CATALOGS =
public static final ResultColumn CATALOG_RESULT_COLUMN =
new ResultColumn("TABLE_CAT", "catalog", Types.VARCHAR);
public static final ResultColumn TYPE_CATALOG_COLUMN =
new ResultColumn("TYPE_CAT", "TYPE_CATALOG_COLUMN", Types.VARCHAR);
Expand Down Expand Up @@ -216,7 +216,7 @@ public class MetadataResultConstants {
IS_AUTO_INCREMENT_COLUMN,
IS_GENERATED_COLUMN);

public static List<ResultColumn> CATALOG_COLUMNS = List.of(CATALOG_COLUMN_FOR_GET_CATALOGS);
public static List<ResultColumn> CATALOG_COLUMNS = List.of(CATALOG_RESULT_COLUMN);

public static List<ResultColumn> SCHEMA_COLUMNS =
List.of(SCHEMA_COLUMN_FOR_GET_SCHEMA, CATALOG_FULL_COLUMN);
Expand Down Expand Up @@ -491,9 +491,7 @@ public class MetadataResultConstants {
MetadataResultConstants.TYPE_NAME_COLUMN,
MetadataResultConstants.DATA_TYPE_COLUMN,
MetadataResultConstants.PRECISION_COLUMN));
put(
CommandName.LIST_CATALOGS,
List.of(MetadataResultConstants.CATALOG_COLUMN_FOR_GET_CATALOGS));
put(CommandName.LIST_CATALOGS, List.of(MetadataResultConstants.CATALOG_RESULT_COLUMN));
put(
CommandName.LIST_TABLES,
List.of(MetadataResultConstants.TABLE_NAME_COLUMN, TABLE_TYPE_COLUMN));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ public static boolean isNullOrEmpty(String s) {
return s == null || s.trim().isEmpty();
}

public static boolean isNullOrWildcard(String s) {
return s == null || isWildcard(s) || s.equals("%");
}

/**
* This function checks if the input string is a wildcard string
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,17 @@ public class CommandConstants {
public static final String IN_CATALOG_SQL = " IN CATALOG %s";
public static final String IN_ABSOLUTE_SCHEMA_SQL = " IN SCHEMA %s";
public static final String IN_ABSOLUTE_TABLE_SQL = " IN TABLE %s";
public static final String SHOW_SCHEMA_IN_CATALOG_SQL = "SHOW SCHEMAS IN %s";
public static final String IN_ALL_CATALOGS_SQL = " IN ALL CATALOGS";
public static final String SHOW_SCHEMAS_IN_CATALOG_SQL = "SHOW SCHEMAS IN %s";
public static final String LIKE_SQL = " LIKE '%s'";
public static final String SCHEMA_LIKE_SQL = " SCHEMA" + LIKE_SQL;
public static final String TABLE_LIKE_SQL = " TABLE" + LIKE_SQL;
public static final String SHOW_TABLES_SQL = "SHOW TABLES" + IN_CATALOG_SQL;
public static final String SHOW_TABLES_IN_ALL_CATALOGS_SQL = "SHOW TABLES IN ALL CATALOGS";
public static final String SHOW_TABLES_IN_ALL_CATALOGS_SQL = "SHOW TABLES" + IN_ALL_CATALOGS_SQL;
public static final String SHOW_COLUMNS_SQL = "SHOW COLUMNS" + IN_CATALOG_SQL;
public static final String SHOW_FUNCTIONS_SQL = "SHOW FUNCTIONS" + IN_CATALOG_SQL;
public static final String SHOW_SCHEMAS_IN_ALL_CATALOGS_SQL =
"SHOW SCHEMAS" + IN_ALL_CATALOGS_SQL;
public static final String SHOW_PRIMARY_KEYS_SQL =
"SHOW KEYS" + IN_CATALOG_SQL + IN_ABSOLUTE_SCHEMA_SQL + IN_ABSOLUTE_TABLE_SQL;
public static final String SHOW_FOREIGN_KEYS_SQL =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@ public DatabricksResultSet getCatalogsResult(DatabricksResultSet resultSet) thro

public DatabricksResultSet getSchemasResult(DatabricksResultSet resultSet, String catalog)
throws SQLException {
List<List<Object>> rows = getRowsForSchemas(resultSet, SCHEMA_COLUMNS, catalog);
List<List<Object>> rows =
getRowsForSchemas(
resultSet, SCHEMA_COLUMNS, catalog, new SchemasDatabricksResultSetAdapter());
return buildResultSet(
SCHEMA_COLUMNS,
rows,
Expand Down Expand Up @@ -562,19 +564,39 @@ private List<List<Object>> getRowsForFunctions(
}

private List<List<Object>> getRowsForSchemas(
DatabricksResultSet resultSet, List<ResultColumn> columns, String catalog)
DatabricksResultSet resultSet,
List<ResultColumn> columns,
String catalog,
IDatabricksResultSetAdapter adapter)
throws SQLException {
List<List<Object>> rows = new ArrayList<>();
while (resultSet.next()) {
// Check if this row should be included based on the adapter's filter
if (!adapter.includeRow(resultSet, columns)) {
continue;
}

List<Object> row = new ArrayList<>();
for (ResultColumn column : columns) {
if (column.getColumnName().equals("TABLE_CATALOG")) {
row.add(catalog);
continue;
// Map the expected column on client to column in the result set using the adapter
ResultColumn mappedColumn = adapter.mapColumn(column);

if (mappedColumn
.getResultSetColumnName()
.equals(CATALOG_RESULT_COLUMN.getResultSetColumnName())) {
try {
resultSet.findColumn(mappedColumn.getResultSetColumnName());
} catch (SQLException e) {
// Result set does not have a catalog column
// Manually add the catalog and move to next column
row.add(catalog);
continue;
}
}

Object object;
try {
object = resultSet.getObject(column.getResultSetColumnName());
object = resultSet.getObject(mappedColumn.getResultSetColumnName());
if (object == null) {
object = NULL_STRING;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.databricks.jdbc.dbclient.impl.common;

import static com.databricks.jdbc.common.MetadataResultConstants.CATALOG_FULL_COLUMN;
import static com.databricks.jdbc.common.MetadataResultConstants.CATALOG_RESULT_COLUMN;

import com.databricks.jdbc.model.core.ResultColumn;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;

public class SchemasDatabricksResultSetAdapter implements IDatabricksResultSetAdapter {
@Override
public ResultColumn mapColumn(ResultColumn column) {
String columnName = column.getResultSetColumnName();
if (columnName.equals(CATALOG_FULL_COLUMN.getResultSetColumnName())) {
// Map CATALOG_FULL_COLUMN to CATALOG_COLUMN_FOR_GET_CATALOGS of result set
return CATALOG_RESULT_COLUMN;
} else {
return column;
}
}

@Override
public boolean includeRow(ResultSet resultSet, List<ResultColumn> columns) throws SQLException {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,27 +75,28 @@ private String fetchCatalogSQL() {
}

private String fetchSchemaSQL() throws SQLException {
String contextString =
String.format(
"Building command for fetching schema. Catalog %s, SchemaPattern %s and session context %s",
catalogName, schemaPattern, sessionContext);
LOGGER.debug(contextString);
throwErrorIfNull(Collections.singletonMap(CATALOG, catalogName), contextString);
String showSchemaSQL = String.format(SHOW_SCHEMA_IN_CATALOG_SQL, catalogName);
LOGGER.debug(
"Building command for fetching schema. Catalog %s, SchemaPattern %s and session context %s",
catalogName, schemaPattern, sessionContext);
String showSchemasSQL;
if (WildcardUtil.isNullOrWildcard(catalogName)) {
Comment thread
jayantsing-db marked this conversation as resolved.
// SHOW SCHEMAS IN ALL CATALOGS
showSchemasSQL = SHOW_SCHEMAS_IN_ALL_CATALOGS_SQL;
} else {
showSchemasSQL = String.format(SHOW_SCHEMAS_IN_CATALOG_SQL, catalogName);
}
if (!WildcardUtil.isNullOrEmpty(schemaPattern)) {
showSchemaSQL += String.format(LIKE_SQL, schemaPattern);
showSchemasSQL += String.format(LIKE_SQL, schemaPattern);
}
return showSchemaSQL;
return showSchemasSQL;
}

private String fetchTablesSQL() throws SQLException {
String contextString =
String.format(
"Building command for fetching tables. Catalog %s, SchemaPattern %s, TablePattern %s and session context %s",
catalogName, schemaPattern, tablePattern, sessionContext);
LOGGER.debug(contextString);
LOGGER.debug(
"Building command for fetching tables. Catalog %s, SchemaPattern %s, TablePattern %s and session context %s",
catalogName, schemaPattern, tablePattern, sessionContext);
String showTablesSQL;
if (catalogName == null || catalogName.equals("*") || catalogName.equals("%")) {
if (WildcardUtil.isNullOrWildcard(catalogName)) {
Comment thread
jayantsing-db marked this conversation as resolved.
// SHOW TABLES IN ALL CATALOGS
showTablesSQL = SHOW_TABLES_IN_ALL_CATALOGS_SQL;
} else {
Expand Down
Loading
Loading