Skip to content

Commit 40df7de

Browse files
committed
Merge remote-tracking branch 'databricks/main' into fetch-requests-columnar-view
2 parents e09688c + 970c4c8 commit 40df7de

48 files changed

Lines changed: 2686 additions & 69 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

NEXT_CHANGELOG.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,15 @@
88
- **SQL Scripting support**: Added support for [SQL Scripting](https://docs.databricks.com/aws/en/sql/language-manual/sql-ref-scripting)
99
- Added a client property `enableVolumeOperations` to enable GET/PUT/REMOVE volume operations on a stream. For backward compatibility, allowedVolumeIngestionPaths can also be used for REMOVE operation.
1010
- Support for fetching schemas across all catalogs (when catalog is specified as null or a wildcard) in `DatabaseMetaData#getSchemas` API in SQL Execution mode.
11+
- **Configurable SQL validation in isValid()**: Added `EnableSQLValidationForIsValid` connection property to control whether `isValid()` method executes an actual SQL query for server-side validation. Default value is 0.
12+
- Implement multi-row INSERT batching optimization for prepared statements to improve performance when executing large batches of INSERT operations.
13+
- Implement lazy/incremental fetching for columnar results when using Databricks JDBC in Thrift mode without Arrow support. The change modifies the behavior from buffering entire result sets in memory to maintaining only a limited number of rows at a time, reducing peak heap memory usage and preventing OutOfMemory errors.
1114

1215
### Updated
1316
- Databricks SDK dependency upgraded to latest version 0.60.0
1417

1518
### Fixed
16-
-
19+
- Integrated Azure U2M flow into driver for improved stability.
20+
- Fixed `ResultSet.getString` for Boolean columns in Metadata result set.
1721
---
1822
*Note: When making changes, please add your change under the appropriate section with a brief description.*

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ Optional parameters:
9191
- `OAuth2RedirectUrlPort` - Ports for redirect URL (default: 8020)
9292
- `EnableOIDCDiscovery` - Enable OIDC discovery (default: 1)
9393
- `OAuthDiscoveryURL` - OIDC discovery endpoint (default: /oidc/.well-known/oauth-authorization-server)
94+
- `EnableSQLValidationForIsValid` - Enable SQL query based validation in `isValid()` connection checks (default: 0)
9495

9596
### Logging
9697

src/main/java/com/databricks/jdbc/api/impl/DatabricksConnection.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -414,7 +414,21 @@ public SQLXML createSQLXML() throws SQLException {
414414
@Override
415415
public boolean isValid(int timeout) throws SQLException {
416416
ValidationUtil.checkIfNonNegative(timeout, "timeout");
417-
return !isClosed();
417+
if (isClosed()) {
418+
return false;
419+
}
420+
if (connectionContext.getEnableSQLValidationForIsValid()) {
421+
try (Statement stmt = createStatement()) {
422+
stmt.setQueryTimeout(timeout);
423+
// This is a lightweight query to check if the connection is valid
424+
stmt.execute("SELECT VERSION()");
425+
return true;
426+
} catch (Exception e) {
427+
LOGGER.debug("Validation failed for isValid(): {}", e.getMessage());
428+
return false;
429+
}
430+
}
431+
return true;
418432
}
419433

420434
/**

src/main/java/com/databricks/jdbc/api/impl/DatabricksConnectionContext.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,11 @@ public String getHostUrl() throws DatabricksParsingException {
215215
}
216216
}
217217

218+
@Override
219+
public String getHost() {
220+
return this.host;
221+
}
222+
218223
@Override
219224
public IDatabricksComputeResource getComputeResource() {
220225
return computeResource;
@@ -225,6 +230,12 @@ public String getHttpPath() {
225230
return getParameter(DatabricksJdbcUrlParams.HTTP_PATH);
226231
}
227232

233+
public boolean getEnableSQLValidationForIsValid() {
234+
LOGGER.debug("String getEnableSQLValidationForIsValid()");
235+
return getParameter(DatabricksJdbcUrlParams.ENABLE_SQL_VALIDATION_FOR_IS_VALID, "0")
236+
.equals("1");
237+
}
238+
228239
@Override
229240
public String getHostForOAuth() {
230241
return this.host;
@@ -1019,4 +1030,9 @@ public int getTelemetryFlushIntervalInMilliseconds() {
10191030
return Math.max(
10201031
1000, Integer.parseInt(getParameter(DatabricksJdbcUrlParams.TELEMETRY_FLUSH_INTERVAL)));
10211032
}
1033+
1034+
@Override
1035+
public boolean isBatchedInsertsEnabled() {
1036+
return getParameter(DatabricksJdbcUrlParams.ENABLE_BATCHED_INSERTS).equals("1");
1037+
}
10221038
}

src/main/java/com/databricks/jdbc/api/impl/DatabricksParameterMetaData.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
import com.databricks.jdbc.common.util.WrapperUtil;
77
import com.databricks.jdbc.log.JdbcLogger;
88
import com.databricks.jdbc.log.JdbcLoggerFactory;
9-
import com.databricks.sdk.service.sql.ColumnInfoTypeName;
9+
import com.databricks.jdbc.model.core.ColumnInfoTypeName;
1010
import java.sql.ParameterMetaData;
1111
import java.sql.SQLException;
1212
import java.util.HashMap;

src/main/java/com/databricks/jdbc/api/impl/DatabricksPreparedStatement.java

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,10 @@
77
import static com.databricks.jdbc.common.util.SQLInterpolator.surroundPlaceholdersWithQuotes;
88
import static com.databricks.jdbc.common.util.ValidationUtil.throwErrorIfNull;
99

10+
import com.databricks.jdbc.common.DatabricksJdbcConstants;
1011
import com.databricks.jdbc.common.StatementType;
1112
import com.databricks.jdbc.common.util.DatabricksTypeUtil;
13+
import com.databricks.jdbc.common.util.InsertStatementParser;
1214
import com.databricks.jdbc.exception.*;
1315
import com.databricks.jdbc.log.JdbcLogger;
1416
import com.databricks.jdbc.log.JdbcLoggerFactory;
@@ -88,6 +90,138 @@ public int[] executeBatch() throws DatabricksBatchUpdateException {
8890
@Override
8991
public long[] executeLargeBatch() throws DatabricksBatchUpdateException {
9092
LOGGER.debug("public long executeLargeBatch()");
93+
94+
if (databricksBatchParameterMetaData.isEmpty()) {
95+
return new long[0];
96+
}
97+
98+
// Try to optimize INSERT statements with multi-row batching
99+
if (canUseBatchedInsert()) {
100+
return executeBatchedInsert();
101+
} else {
102+
// Fall back to individual execution for non-INSERT or incompatible statements
103+
return executeIndividualStatements();
104+
}
105+
}
106+
107+
/**
108+
* Checks if the current batch can be optimized using multi-row INSERT. All statements must be
109+
* compatible INSERT operations.
110+
*
111+
* <p>A batch is eligible for multi-row INSERT optimization when:
112+
*
113+
* <ul>
114+
* <li>The EnableBatchedInserts connection property is enabled (default: true)
115+
* <li>The SQL statement is an INSERT operation
116+
* <li>The INSERT can be parsed successfully (has table name and column list)
117+
* <li>The batch contains parameter sets for multiple rows
118+
* </ul>
119+
*
120+
* <p>Compatible INSERT operations target the same table with the same columns in the same order.
121+
* When compatible, multiple individual INSERTs like:
122+
*
123+
* <pre>
124+
* INSERT INTO users (id, name) VALUES (?, ?) -- with parameters [1, "Alice"]
125+
* INSERT INTO users (id, name) VALUES (?, ?) -- with parameters [2, "Bob"]
126+
* </pre>
127+
*
128+
* Are combined into a single multi-row INSERT:
129+
*
130+
* <pre>
131+
* INSERT INTO users (id, name) VALUES (?, ?), (?, ?) -- with parameters [1, "Alice", 2, "Bob"]
132+
* </pre>
133+
*/
134+
private boolean canUseBatchedInsert() {
135+
// Check if batched inserts are enabled via connection property
136+
if (!connection.getConnectionContext().isBatchedInsertsEnabled()) {
137+
return false;
138+
}
139+
140+
// Use strict exception-based parsing for better error handling
141+
try {
142+
InsertStatementParser.parseInsertStrict(sql);
143+
return !databricksBatchParameterMetaData.isEmpty();
144+
} catch (Exception e) {
145+
// Not a valid INSERT statement suitable for batching
146+
return false;
147+
}
148+
}
149+
150+
/** Executes the batch as a single multi-row INSERT statement. */
151+
private long[] executeBatchedInsert() throws DatabricksBatchUpdateException {
152+
LOGGER.debug("Executing batched INSERT with {} rows", databricksBatchParameterMetaData.size());
153+
154+
try {
155+
InsertStatementParser.InsertInfo insertInfo = InsertStatementParser.parseInsertStrict(sql);
156+
157+
// Calculate how many rows we can fit in one chunk based on parameter limit
158+
int parametersPerRow = insertInfo.getColumnCount();
159+
int maxRowsPerChunk = DatabricksJdbcConstants.MAX_QUERY_PARAMETERS / parametersPerRow;
160+
161+
// Ensure we have at least 1 row per chunk
162+
if (maxRowsPerChunk < 1) {
163+
maxRowsPerChunk = 1;
164+
}
165+
166+
long[] allUpdateCounts = new long[databricksBatchParameterMetaData.size()];
167+
int processedRows = 0;
168+
169+
// Process batches in chunks
170+
for (int startIndex = 0;
171+
startIndex < databricksBatchParameterMetaData.size();
172+
startIndex += maxRowsPerChunk) {
173+
int endIndex =
174+
Math.min(startIndex + maxRowsPerChunk, databricksBatchParameterMetaData.size());
175+
int chunkSize = endIndex - startIndex;
176+
177+
LOGGER.debug("Processing chunk {}-{} ({} rows)", startIndex + 1, endIndex, chunkSize);
178+
179+
// Generate multi-row SQL for this chunk
180+
String multiRowSql = InsertStatementParser.generateMultiRowInsert(insertInfo, chunkSize);
181+
182+
// Combine parameters for this chunk
183+
Map<Integer, ImmutableSqlParameter> chunkParams = new HashMap<>();
184+
int paramIndex = 1;
185+
186+
for (int i = startIndex; i < endIndex; i++) {
187+
DatabricksParameterMetaData batchParams = databricksBatchParameterMetaData.get(i);
188+
Map<Integer, ImmutableSqlParameter> rowParams = batchParams.getParameterBindings();
189+
for (int j = 1; j <= rowParams.size(); j++) {
190+
if (rowParams.containsKey(j)) {
191+
chunkParams.put(paramIndex++, rowParams.get(j));
192+
}
193+
}
194+
}
195+
196+
// Execute this chunk
197+
executeInternal(multiRowSql, chunkParams, StatementType.UPDATE, false);
198+
199+
// Set update counts for this chunk (each row typically affects 1 row)
200+
for (int i = startIndex; i < endIndex; i++) {
201+
allUpdateCounts[i] = 1;
202+
}
203+
204+
processedRows += chunkSize;
205+
}
206+
207+
LOGGER.debug("Successfully processed {} rows in chunks", processedRows);
208+
return allUpdateCounts;
209+
210+
} catch (Exception e) {
211+
LOGGER.error("Error executing batched INSERT: {}", e.getMessage(), e);
212+
long[] failedCounts = new long[databricksBatchParameterMetaData.size()];
213+
for (int i = 0; i < failedCounts.length; i++) {
214+
failedCounts[i] = Statement.EXECUTE_FAILED;
215+
}
216+
throw new DatabricksBatchUpdateException(
217+
e.getMessage(), DatabricksDriverErrorCode.BATCH_EXECUTE_EXCEPTION, failedCounts);
218+
}
219+
}
220+
221+
/** Executes batch statements individually (fallback method). */
222+
private long[] executeIndividualStatements() throws DatabricksBatchUpdateException {
223+
LOGGER.debug(
224+
"Executing batch individually with {} statements", databricksBatchParameterMetaData.size());
91225
long[] largeUpdateCount = new long[databricksBatchParameterMetaData.size()];
92226

93227
for (int sqlQueryIndex = 0;

src/main/java/com/databricks/jdbc/api/impl/DatabricksResultSet.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -624,7 +624,7 @@ public boolean isFirst() throws SQLException {
624624
public boolean isLast() throws SQLException {
625625
checkIfClosed();
626626
if (executionResult instanceof LazyThriftResult) {
627-
return !executionResult.hasNext();
627+
return executionResult.getCurrentRow() >= 0 && !executionResult.hasNext();
628628
}
629629
return executionResult.getCurrentRow() == resultSetMetaData.getTotalRows() - 1;
630630
}

src/main/java/com/databricks/jdbc/api/impl/DatabricksResultSetMetaData.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,10 @@
2020
import com.databricks.jdbc.log.JdbcLogger;
2121
import com.databricks.jdbc.log.JdbcLoggerFactory;
2222
import com.databricks.jdbc.model.client.thrift.generated.*;
23+
import com.databricks.jdbc.model.core.ColumnInfo;
24+
import com.databricks.jdbc.model.core.ColumnInfoTypeName;
2325
import com.databricks.jdbc.model.core.ColumnMetadata;
2426
import com.databricks.jdbc.model.core.ResultManifest;
25-
import com.databricks.sdk.service.sql.ColumnInfo;
26-
import com.databricks.sdk.service.sql.ColumnInfoTypeName;
2727
import com.google.common.collect.ImmutableList;
2828
import java.sql.ResultSetMetaData;
2929
import java.sql.SQLException;

src/main/java/com/databricks/jdbc/api/impl/DatabricksStatement.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -692,6 +692,14 @@ static boolean isSelectQuery(String query) {
692692
return SELECT_PATTERN.matcher(trimmedQuery).find();
693693
}
694694

695+
static boolean isInsertQuery(String query) {
696+
if (query == null || query.trim().isEmpty()) {
697+
return false;
698+
}
699+
String trimmedQuery = trimCommentsAndWhitespaces(query);
700+
return INSERT_PATTERN.matcher(trimmedQuery).find();
701+
}
702+
695703
DatabricksResultSet executeInternal(
696704
String sql,
697705
Map<Integer, ImmutableSqlParameter> params,

src/main/java/com/databricks/jdbc/api/impl/LazyThriftResult.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,11 @@ private void loadCurrentBatch() throws DatabricksSQLException {
218218
totalRowsFetched);
219219
}
220220

221+
/**
222+
* Fetches the next batch of data from the server and creates columnar views.
223+
*
224+
* @throws DatabricksSQLException if the fetch operation fails
225+
*/
221226
private void fetchNextBatch() throws DatabricksSQLException {
222227
try {
223228
LOGGER.debug("Fetching next batch, current total rows fetched: {}", totalRowsFetched);

0 commit comments

Comments
 (0)