Skip to content

Commit aa55100

Browse files
Implement getSchemas() for SEA (#802)
## Description - Retrieve all catalogs. - For each catalog, fetch schemas in parallel and merge the results. - On the e2-dogfood environment, which has around 9,000 schemas, this approach took approximately 12 seconds—compared to 9 seconds with the existing driver—an acceptable difference. - The runtime update for SHOW SCHEMAS IN ALL CATALOGS is currently under BEHAVE/compatibility review. Meanwhile, this client-side implementation serves as a temporary solution. - Introduced a JdbcThreadUtil class to simplify multi-threaded operations in JDBC. To ensure correctness, thread context must carry the appropriate connection-related details. ## Testing - End to end testing - Unit tests ## Additional Notes to the Reviewer - The BEHAVE committee reviewing [runtime PR #139818](https://github.com/databricks-eng/runtime/pull/139818) is expected to require the introduction of a SQL configuration. For JDBC clients to take advantage of these changes, users would need to manually set this SQL config in their Spark session. ~~Currently afaik, it's not possible to set this configuration directly through the JDBC connection.~~ We need to add a DBSQL config at https://github.com/databricks-eng/universe/blob/68a3f9bf4aa09b4d28f85229cbbb4c6e7bcef7e2/common/dbsql-config/src/SqlConfig.scala#L35 for the configuration which is WIP.
1 parent 4c634e3 commit aa55100

5 files changed

Lines changed: 458 additions & 8 deletions

File tree

NEXT_CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
### Added
66
- Support for fetching tables and views across all catalogs using SHOW TABLES FROM/IN ALL CATALOGS in the SQL Exec API.
77
- Support for Token Exchange in OAuth flows where in third party tokens are exchanged for InHouse tokens.
8+
- Support for fetching schemas across all catalogs in the SQL Exec API client.
89
- Added support for polling of statementStatus and sqlState for async SQL execution.
910

1011
### Updated

src/main/java/com/databricks/jdbc/api/impl/DatabricksDatabaseMetaData.java

Lines changed: 69 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import com.databricks.jdbc.api.internal.IDatabricksSession;
1010
import com.databricks.jdbc.common.*;
1111
import com.databricks.jdbc.common.util.DriverUtil;
12+
import com.databricks.jdbc.common.util.JdbcThreadUtils;
1213
import com.databricks.jdbc.dbclient.impl.common.MetadataResultSetBuilder;
1314
import com.databricks.jdbc.dbclient.impl.common.StatementId;
1415
import com.databricks.jdbc.exception.DatabricksSQLException;
@@ -19,6 +20,8 @@
1920
import com.databricks.sdk.service.sql.StatementState;
2021
import java.sql.*;
2122
import java.util.*;
23+
import java.util.concurrent.ExecutorService;
24+
import java.util.concurrent.Executors;
2225

2326
public class DatabricksDatabaseMetaData implements DatabaseMetaData {
2427

@@ -37,6 +40,9 @@ public class DatabricksDatabaseMetaData implements DatabaseMetaData {
3740
public static final String SYSTEM_FUNCTIONS = "DATABASE,IFNULL,USER";
3841
public static final String TIME_DATE_FUNCTIONS =
3942
"CURDATE,CURRENT_DATE,CURRENT_TIME,CURRENT_TIMESTAMP,CURTIME,DAYNAME,DAYOFMONTH,DAYOFWEEK,DAYOFYEAR,HOUR,MINUTE,MONTH,MONTHNAME,NOW,QUARTER,SECOND,TIMESTAMPADD,TIMESTAMPDIFF,WEEK,YEAR";
43+
private static final Object THREAD_POOL_LOCK = new Object();
44+
private static ExecutorService schemasThreadPool = null;
45+
private static final int DEFAULT_MAX_THREADS = 10;
4046
private final IDatabricksConnectionInternal connection;
4147
private final IDatabricksSession session;
4248
private final MetadataResultSetBuilder metadataResultSetBuilder;
@@ -994,9 +1000,6 @@ public ResultSet getTables(
9941000
@Override
9951001
public ResultSet getSchemas() throws SQLException {
9961002
LOGGER.debug("public ResultSet getSchemas()");
997-
if (session.getConnectionContext().getClientType() == DatabricksClientType.SEA) {
998-
return metadataResultSetBuilder.getSchemasResult(null);
999-
}
10001003
return getSchemas(null /* catalog */, null /* schema pattern */);
10011004
}
10021005

@@ -1498,11 +1501,52 @@ public RowIdLifetime getRowIdLifetime() throws SQLException {
14981501
@Override
14991502
public ResultSet getSchemas(String catalog, String schemaPattern) throws SQLException {
15001503
LOGGER.debug(
1501-
String.format(
1502-
"public ResultSet getSchemas(String catalog = {}, String schemaPattern = {})",
1503-
catalog,
1504-
schemaPattern));
1505-
throwExceptionIfConnectionIsClosed();
1504+
"public ResultSet getSchemas(String catalog = %s, String schemaPattern = %s)",
1505+
catalog, schemaPattern);
1506+
throwExceptionIfConnectionIsClosed();
1507+
1508+
if (session.getConnectionContext().getClientType() == DatabricksClientType.SEA
1509+
&& (catalog == null || catalog.equals("*") || catalog.equals("%"))) {
1510+
// Fetch catalogs from the metadata client
1511+
List<String> catalogList = new ArrayList<>();
1512+
try (ResultSet catalogs = getCatalogs()) {
1513+
while (catalogs.next()) {
1514+
String c = catalogs.getString(1);
1515+
if (c != null && !c.isEmpty()) {
1516+
catalogList.add(c);
1517+
}
1518+
}
1519+
}
1520+
1521+
// Process catalogs in parallel, gathering schema information
1522+
List<List<Object>> schemaRows =
1523+
JdbcThreadUtils.parallelFlatMap(
1524+
catalogList,
1525+
session.getConnectionContext(),
1526+
DEFAULT_MAX_THREADS, // Not significant since the executor is provided as a parameter
1527+
90, // 90 seconds timeout
1528+
c -> {
1529+
List<List<Object>> rows = new ArrayList<>();
1530+
try (ResultSet catalogSchemas =
1531+
session.getDatabricksMetadataClient().listSchemas(session, c, schemaPattern)) {
1532+
while (catalogSchemas.next()) {
1533+
List<Object> schemaRow = new ArrayList<>();
1534+
schemaRow.add(catalogSchemas.getString(1)); // TABLE_SCHEM
1535+
schemaRow.add(catalogSchemas.getString(2)); // TABLE_CATALOG
1536+
rows.add(schemaRow);
1537+
}
1538+
} catch (SQLException e) {
1539+
LOGGER.warn("Error fetching schemas for catalog %s %s", c, e.getMessage());
1540+
}
1541+
return rows;
1542+
},
1543+
getOrCreateSchemasThreadPool());
1544+
1545+
// Convert combined data into a result set
1546+
return metadataResultSetBuilder.getResultSetWithGivenRowsAndColumns(
1547+
SCHEMA_COLUMNS, schemaRows, METADATA_STATEMENT_ID, CommandName.LIST_SCHEMAS);
1548+
}
1549+
15061550
return session.getDatabricksMetadataClient().listSchemas(session, catalog, schemaPattern);
15071551
}
15081552

@@ -1615,6 +1659,23 @@ public boolean isWrapperFor(Class<?> iface) throws SQLException {
16151659
return iface != null && iface.isAssignableFrom(this.getClass());
16161660
}
16171661

1662+
private static ExecutorService getOrCreateSchemasThreadPool() {
1663+
synchronized (THREAD_POOL_LOCK) {
1664+
if (schemasThreadPool == null || schemasThreadPool.isShutdown()) {
1665+
// Could read max threads from a configuration property
1666+
schemasThreadPool =
1667+
Executors.newFixedThreadPool(
1668+
DEFAULT_MAX_THREADS,
1669+
r -> {
1670+
Thread t = new Thread(r, "jdbc-schemas-fetcher");
1671+
t.setDaemon(true);
1672+
return t;
1673+
});
1674+
}
1675+
return schemasThreadPool;
1676+
}
1677+
}
1678+
16181679
private void throwExceptionIfConnectionIsClosed() throws SQLException {
16191680
LOGGER.debug("private void throwExceptionIfConnectionIsClosed()");
16201681
if (!connection.getSession().isOpen()) {
Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
package com.databricks.jdbc.common.util;
2+
3+
import com.databricks.jdbc.api.internal.IDatabricksConnectionContext;
4+
import com.databricks.jdbc.exception.DatabricksSQLException;
5+
import com.databricks.jdbc.model.telemetry.enums.DatabricksDriverErrorCode;
6+
import java.sql.SQLException;
7+
import java.util.ArrayList;
8+
import java.util.Collection;
9+
import java.util.Collections;
10+
import java.util.List;
11+
import java.util.concurrent.*;
12+
import java.util.function.Function;
13+
14+
/** Utility class for executing tasks in parallel with proper context handling. */
15+
public class JdbcThreadUtils {
16+
17+
/**
18+
* Executes tasks concurrently with appropriate context management, utilizing a provided executor
19+
* service (which can be null, in which case a new one will be created).
20+
*
21+
* @param items The items to process
22+
* @param connectionContext The connection context to propagate to worker threads
23+
* @param maxThreads Maximum number of threads to use (when creating internal executor)
24+
* @param timeoutSeconds Timeout in seconds
25+
* @param task The task to execute for each item
26+
* @param executor Optional executor service to use; if null, an internal one will be created
27+
* @param <T> Type of input items
28+
* @param <R> Type of result
29+
* @return List of results from all tasks
30+
* @throws SQLException If an error occurs during execution
31+
*/
32+
public static <T, R> List<R> parallelMap(
33+
Collection<T> items,
34+
IDatabricksConnectionContext connectionContext,
35+
int maxThreads,
36+
int timeoutSeconds,
37+
Function<T, R> task,
38+
ExecutorService executor)
39+
throws SQLException {
40+
41+
if (items.isEmpty()) {
42+
return Collections.emptyList();
43+
}
44+
45+
boolean createdExecutor = false;
46+
ExecutorService executorToUse = executor;
47+
48+
// Create an executor if one wasn't provided
49+
if (executorToUse == null) {
50+
int threadCount = Math.min(items.size(), maxThreads);
51+
executorToUse = Executors.newFixedThreadPool(threadCount);
52+
createdExecutor = true;
53+
}
54+
55+
try {
56+
List<Future<R>> futures = new ArrayList<>();
57+
58+
// Submit tasks to the executor
59+
for (T item : items) {
60+
futures.add(
61+
executorToUse.submit(
62+
() -> {
63+
// Set connection context for this thread
64+
DatabricksThreadContextHolder.setConnectionContext(connectionContext);
65+
try {
66+
// Execute the task
67+
return task.apply(item);
68+
} finally {
69+
// Clear connection context
70+
DatabricksThreadContextHolder.clearConnectionContext();
71+
}
72+
}));
73+
}
74+
75+
// Collect results
76+
List<R> results = new ArrayList<>(items.size());
77+
for (Future<R> future : futures) {
78+
try {
79+
results.add(future.get(timeoutSeconds, TimeUnit.SECONDS));
80+
} catch (InterruptedException e) {
81+
Thread.currentThread().interrupt();
82+
throw new DatabricksSQLException(
83+
"Parallel execution interrupted",
84+
e,
85+
DatabricksDriverErrorCode.THREAD_INTERRUPTED_ERROR);
86+
} catch (ExecutionException e) {
87+
SQLException sqlEx = findSQLExceptionInCauseChain(e);
88+
if (sqlEx != null) {
89+
throw sqlEx;
90+
} else {
91+
throw new DatabricksSQLException(
92+
"Error in parallel execution", e, DatabricksDriverErrorCode.INVALID_STATE);
93+
}
94+
} catch (TimeoutException e) {
95+
throw new DatabricksSQLException(
96+
"Parallel execution timed out after " + timeoutSeconds + " seconds",
97+
e,
98+
DatabricksDriverErrorCode.OPERATION_TIMEOUT_ERROR);
99+
}
100+
}
101+
102+
return results;
103+
} finally {
104+
// Only shut down the executor if we created it
105+
if (createdExecutor && executorToUse != null) {
106+
executorToUse.shutdownNow();
107+
}
108+
}
109+
}
110+
111+
/**
112+
* Executes tasks in parallel, collecting and flattening all results, utilizing a provided
113+
* executor service (which can be null, in which case a new one will be created).
114+
*
115+
* @param items The items to process
116+
* @param connectionContext The connection context to propagate to worker threads
117+
* @param maxThreads Maximum number of threads to use
118+
* @param timeoutSeconds Timeout in seconds
119+
* @param task The task to execute for each item, producing a collection of results
120+
* @param executor Optional executor service to use; if null, an internal one will be created
121+
* @param <T> Type of input items
122+
* @param <R> Type of result
123+
* @return Flattened list of all results
124+
* @throws SQLException If an error occurs during execution
125+
*/
126+
public static <T, R> List<R> parallelFlatMap(
127+
Collection<T> items,
128+
IDatabricksConnectionContext connectionContext,
129+
int maxThreads,
130+
int timeoutSeconds,
131+
Function<T, Collection<R>> task,
132+
ExecutorService executor)
133+
throws SQLException {
134+
135+
List<Collection<R>> collections =
136+
parallelMap(items, connectionContext, maxThreads, timeoutSeconds, task, executor);
137+
138+
// Flatten the results
139+
List<R> allResults = new ArrayList<>();
140+
for (Collection<R> collection : collections) {
141+
if (collection != null) {
142+
allResults.addAll(collection);
143+
}
144+
}
145+
146+
return allResults;
147+
}
148+
149+
/**
150+
* Recursively searches for a SQLException in the exception cause chain.
151+
*
152+
* @param throwable The exception to search
153+
* @return The first SQLException found in the cause chain, or null if none
154+
*/
155+
private static SQLException findSQLExceptionInCauseChain(Throwable throwable) {
156+
if (throwable == null) {
157+
return null;
158+
}
159+
160+
if (throwable instanceof SQLException) {
161+
return (SQLException) throwable;
162+
}
163+
164+
return findSQLExceptionInCauseChain(throwable.getCause());
165+
}
166+
}

src/main/java/com/databricks/jdbc/model/telemetry/enums/DatabricksDriverErrorCode.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ public enum DatabricksDriverErrorCode {
3636
CATALOG_OR_SCHEMA_FETCH_ERROR,
3737
INTEGRATION_TEST_ERROR,
3838
SDK_CLIENT_ERROR,
39+
OPERATION_TIMEOUT_ERROR,
3940
SSL_HANDSHAKE_ERROR,
4041
AUTH_ERROR
4142
}

0 commit comments

Comments
 (0)