Skip to content

Commit 87e2ea9

Browse files
committed
Merge branch 'main' into PECOBLR-838
2 parents 52eda8a + be6bd04 commit 87e2ea9

47 files changed

Lines changed: 3300 additions & 239 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.github/workflows/runJdbcComparator.yml

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,11 @@ jobs:
2626
- name: Merge main into jdbc-comparator
2727
run: |
2828
git checkout jdbc-comparator
29-
git merge main --no-edit || {
30-
echo "Merge conflict occurred"
31-
git merge --abort
32-
exit 1
29+
git merge main --allow-unrelated-histories --no-edit -X theirs || {
30+
echo "Force merging by accepting all changes from main"
31+
git checkout --theirs .
32+
git add .
33+
git commit --no-edit
3334
}
3435
3536
- name: Set up JDK 11

NEXT_CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,16 @@
99
- Added a client property `enableVolumeOperations` to enable GET/PUT/REMOVE volume operations on a stream. For backward compatibility, allowedVolumeIngestionPaths can also be used for REMOVE operation.
1010
- Support for fetching schemas across all catalogs (when catalog is specified as null or a wildcard) in `DatabaseMetaData#getSchemas` API in SQL Execution mode.
1111
- **Configurable SQL validation in isValid()**: Added `EnableSQLValidationForIsValid` connection property to control whether `isValid()` method executes an actual SQL query for server-side validation. Default value is 0.
12+
- Implemented multi-row INSERT batching for prepared statements (controlled by the `EnableBatchedInserts` connection property) to improve performance when executing large batches of INSERT operations.
13+
- Implemented lazy/incremental fetching for columnar results when using Databricks JDBC in Thrift mode without Arrow support. Instead of buffering the entire result set in memory, the driver now keeps only a limited number of rows at a time, reducing peak heap memory usage and preventing OutOfMemory errors.
1214

1315
### Updated
1416
- Databricks SDK dependency upgraded to latest version 0.60.0
1517

1618
### Fixed
19+
- Integrated Azure U2M flow into driver for improved stability.
1720
- Fixed `ResultSet.getString` for Boolean columns in Metadata result set.
21+
- Fixed volume operations not completing unless the ResultSet is fully iterated.
1822
- Fixed `connection.getMetadata().getColumns()` to return the correct SQL data type code for complex type columns.
1923
---
2024
*Note: When making changes, please add your change under the appropriate section with a brief description.*

src/main/java/com/databricks/jdbc/api/impl/DatabricksConnectionContext.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,11 @@ public String getHostUrl() throws DatabricksParsingException {
215215
}
216216
}
217217

218+
@Override
219+
public String getHost() {
220+
return this.host;
221+
}
222+
218223
@Override
219224
public IDatabricksComputeResource getComputeResource() {
220225
return computeResource;
@@ -1025,4 +1030,9 @@ public int getTelemetryFlushIntervalInMilliseconds() {
10251030
return Math.max(
10261031
1000, Integer.parseInt(getParameter(DatabricksJdbcUrlParams.TELEMETRY_FLUSH_INTERVAL)));
10271032
}
1033+
1034+
@Override
1035+
public boolean isBatchedInsertsEnabled() {
1036+
return getParameter(DatabricksJdbcUrlParams.ENABLE_BATCHED_INSERTS).equals("1");
1037+
}
10281038
}

src/main/java/com/databricks/jdbc/api/impl/DatabricksParameterMetaData.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
import com.databricks.jdbc.common.util.WrapperUtil;
77
import com.databricks.jdbc.log.JdbcLogger;
88
import com.databricks.jdbc.log.JdbcLoggerFactory;
9-
import com.databricks.sdk.service.sql.ColumnInfoTypeName;
9+
import com.databricks.jdbc.model.core.ColumnInfoTypeName;
1010
import java.sql.ParameterMetaData;
1111
import java.sql.SQLException;
1212
import java.util.HashMap;

src/main/java/com/databricks/jdbc/api/impl/DatabricksPreparedStatement.java

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,10 @@
77
import static com.databricks.jdbc.common.util.SQLInterpolator.surroundPlaceholdersWithQuotes;
88
import static com.databricks.jdbc.common.util.ValidationUtil.throwErrorIfNull;
99

10+
import com.databricks.jdbc.common.DatabricksJdbcConstants;
1011
import com.databricks.jdbc.common.StatementType;
1112
import com.databricks.jdbc.common.util.DatabricksTypeUtil;
13+
import com.databricks.jdbc.common.util.InsertStatementParser;
1214
import com.databricks.jdbc.exception.*;
1315
import com.databricks.jdbc.log.JdbcLogger;
1416
import com.databricks.jdbc.log.JdbcLoggerFactory;
@@ -88,6 +90,138 @@ public int[] executeBatch() throws DatabricksBatchUpdateException {
8890
@Override
8991
public long[] executeLargeBatch() throws DatabricksBatchUpdateException {
9092
LOGGER.debug("public long executeLargeBatch()");
93+
94+
if (databricksBatchParameterMetaData.isEmpty()) {
95+
return new long[0];
96+
}
97+
98+
// Try to optimize INSERT statements with multi-row batching
99+
if (canUseBatchedInsert()) {
100+
return executeBatchedInsert();
101+
} else {
102+
// Fall back to individual execution for non-INSERT or incompatible statements
103+
return executeIndividualStatements();
104+
}
105+
}
106+
107+
/**
108+
* Checks if the current batch can be optimized using multi-row INSERT. All statements must be
109+
* compatible INSERT operations.
110+
*
111+
* <p>A batch is eligible for multi-row INSERT optimization when:
112+
*
113+
* <ul>
114+
* <li>The EnableBatchedInserts connection property is enabled (default: true)
115+
* <li>The SQL statement is an INSERT operation
116+
* <li>The INSERT can be parsed successfully (has table name and column list)
117+
* <li>The batch contains parameter sets for multiple rows
118+
* </ul>
119+
*
120+
* <p>Compatible INSERT operations target the same table with the same columns in the same order.
121+
* When compatible, multiple individual INSERTs like:
122+
*
123+
* <pre>
124+
* INSERT INTO users (id, name) VALUES (?, ?) -- with parameters [1, "Alice"]
125+
* INSERT INTO users (id, name) VALUES (?, ?) -- with parameters [2, "Bob"]
126+
* </pre>
127+
*
128+
* Are combined into a single multi-row INSERT:
129+
*
130+
* <pre>
131+
* INSERT INTO users (id, name) VALUES (?, ?), (?, ?) -- with parameters [1, "Alice", 2, "Bob"]
132+
* </pre>
133+
*/
134+
private boolean canUseBatchedInsert() {
135+
// Check if batched inserts are enabled via connection property
136+
if (!connection.getConnectionContext().isBatchedInsertsEnabled()) {
137+
return false;
138+
}
139+
140+
// Use strict exception-based parsing for better error handling
141+
try {
142+
InsertStatementParser.parseInsertStrict(sql);
143+
return !databricksBatchParameterMetaData.isEmpty();
144+
} catch (Exception e) {
145+
// Not a valid INSERT statement suitable for batching
146+
return false;
147+
}
148+
}
149+
150+
/** Executes the batch as a single multi-row INSERT statement. */
151+
private long[] executeBatchedInsert() throws DatabricksBatchUpdateException {
152+
LOGGER.debug("Executing batched INSERT with {} rows", databricksBatchParameterMetaData.size());
153+
154+
try {
155+
InsertStatementParser.InsertInfo insertInfo = InsertStatementParser.parseInsertStrict(sql);
156+
157+
// Calculate how many rows we can fit in one chunk based on parameter limit
158+
int parametersPerRow = insertInfo.getColumnCount();
159+
int maxRowsPerChunk = DatabricksJdbcConstants.MAX_QUERY_PARAMETERS / parametersPerRow;
160+
161+
// Ensure we have at least 1 row per chunk
162+
if (maxRowsPerChunk < 1) {
163+
maxRowsPerChunk = 1;
164+
}
165+
166+
long[] allUpdateCounts = new long[databricksBatchParameterMetaData.size()];
167+
int processedRows = 0;
168+
169+
// Process batches in chunks
170+
for (int startIndex = 0;
171+
startIndex < databricksBatchParameterMetaData.size();
172+
startIndex += maxRowsPerChunk) {
173+
int endIndex =
174+
Math.min(startIndex + maxRowsPerChunk, databricksBatchParameterMetaData.size());
175+
int chunkSize = endIndex - startIndex;
176+
177+
LOGGER.debug("Processing chunk {}-{} ({} rows)", startIndex + 1, endIndex, chunkSize);
178+
179+
// Generate multi-row SQL for this chunk
180+
String multiRowSql = InsertStatementParser.generateMultiRowInsert(insertInfo, chunkSize);
181+
182+
// Combine parameters for this chunk
183+
Map<Integer, ImmutableSqlParameter> chunkParams = new HashMap<>();
184+
int paramIndex = 1;
185+
186+
for (int i = startIndex; i < endIndex; i++) {
187+
DatabricksParameterMetaData batchParams = databricksBatchParameterMetaData.get(i);
188+
Map<Integer, ImmutableSqlParameter> rowParams = batchParams.getParameterBindings();
189+
for (int j = 1; j <= rowParams.size(); j++) {
190+
if (rowParams.containsKey(j)) {
191+
chunkParams.put(paramIndex++, rowParams.get(j));
192+
}
193+
}
194+
}
195+
196+
// Execute this chunk
197+
executeInternal(multiRowSql, chunkParams, StatementType.UPDATE, false);
198+
199+
// Set update counts for this chunk (each row typically affects 1 row)
200+
for (int i = startIndex; i < endIndex; i++) {
201+
allUpdateCounts[i] = 1;
202+
}
203+
204+
processedRows += chunkSize;
205+
}
206+
207+
LOGGER.debug("Successfully processed {} rows in chunks", processedRows);
208+
return allUpdateCounts;
209+
210+
} catch (Exception e) {
211+
LOGGER.error("Error executing batched INSERT: {}", e.getMessage(), e);
212+
long[] failedCounts = new long[databricksBatchParameterMetaData.size()];
213+
for (int i = 0; i < failedCounts.length; i++) {
214+
failedCounts[i] = Statement.EXECUTE_FAILED;
215+
}
216+
throw new DatabricksBatchUpdateException(
217+
e.getMessage(), DatabricksDriverErrorCode.BATCH_EXECUTE_EXCEPTION, failedCounts);
218+
}
219+
}
220+
221+
/** Executes batch statements individually (fallback method). */
222+
private long[] executeIndividualStatements() throws DatabricksBatchUpdateException {
223+
LOGGER.debug(
224+
"Executing batch individually with {} statements", databricksBatchParameterMetaData.size());
91225
long[] largeUpdateCount = new long[databricksBatchParameterMetaData.size()];
92226

93227
for (int sqlQueryIndex = 0;

src/main/java/com/databricks/jdbc/api/impl/DatabricksResultSet.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -583,6 +583,16 @@ public boolean isBeforeFirst() throws SQLException {
583583
return executionResult.getCurrentRow() == -1;
584584
}
585585

586+
/**
587+
* {@inheritDoc}
588+
*
589+
* <p><b>Limitation:</b> For lazy-loaded result sets ({@link LazyThriftResult}), particularly
590+
* those using {@link
591+
* com.databricks.jdbc.model.client.thrift.generated.TSparkRowSetType#COLUMN_BASED_SET}, this
592+
* method cannot reliably determine the cursor position. The total row count remains unknown until
593+
* all rows are fetched, preventing accurate detection of whether the cursor is after the last
594+
* row. This is specific to Databricks JDBC dialect.
595+
*/
586596
@Override
587597
public boolean isAfterLast() throws SQLException {
588598
checkIfClosed();
@@ -595,9 +605,27 @@ public boolean isFirst() throws SQLException {
595605
return executionResult.getCurrentRow() == 0;
596606
}
597607

608+
/**
609+
* {@inheritDoc}
610+
*
611+
* <p>This method uses different strategies based on the result set type:
612+
*
613+
* <ul>
614+
* <li>For {@link LazyThriftResult} instances: Checks if there are no more rows available (using
615+
* {@code hasNext()}), since the total row count is unknown until all rows are fetched.
616+
* <li>For other result types: Compares the current row position against the known total row
617+
* count.
618+
* </ul>
619+
*
620+
* @return {@code true} if the cursor is on the last row, {@code false} otherwise
621+
* @throws SQLException if the result set is closed or an error occurs
622+
*/
598623
@Override
599624
public boolean isLast() throws SQLException {
600625
checkIfClosed();
626+
if (executionResult instanceof LazyThriftResult) {
627+
return executionResult.getCurrentRow() >= 0 && !executionResult.hasNext();
628+
}
601629
return executionResult.getCurrentRow() == resultSetMetaData.getTotalRows() - 1;
602630
}
603631

src/main/java/com/databricks/jdbc/api/impl/DatabricksResultSetMetaData.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,10 @@
2020
import com.databricks.jdbc.log.JdbcLogger;
2121
import com.databricks.jdbc.log.JdbcLoggerFactory;
2222
import com.databricks.jdbc.model.client.thrift.generated.*;
23+
import com.databricks.jdbc.model.core.ColumnInfo;
24+
import com.databricks.jdbc.model.core.ColumnInfoTypeName;
2325
import com.databricks.jdbc.model.core.ColumnMetadata;
2426
import com.databricks.jdbc.model.core.ResultManifest;
25-
import com.databricks.sdk.service.sql.ColumnInfo;
26-
import com.databricks.sdk.service.sql.ColumnInfoTypeName;
2727
import com.google.common.collect.ImmutableList;
2828
import java.sql.ResultSetMetaData;
2929
import java.sql.SQLException;

src/main/java/com/databricks/jdbc/api/impl/DatabricksStatement.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -692,6 +692,14 @@ static boolean isSelectQuery(String query) {
692692
return SELECT_PATTERN.matcher(trimmedQuery).find();
693693
}
694694

695+
static boolean isInsertQuery(String query) {
696+
if (query == null || query.trim().isEmpty()) {
697+
return false;
698+
}
699+
String trimmedQuery = trimCommentsAndWhitespaces(query);
700+
return INSERT_PATTERN.matcher(trimmedQuery).find();
701+
}
702+
695703
DatabricksResultSet executeInternal(
696704
String sql,
697705
Map<Integer, ImmutableSqlParameter> params,

src/main/java/com/databricks/jdbc/api/impl/ExecutionResultFactory.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
package com.databricks.jdbc.api.impl;
22

3-
import static com.databricks.jdbc.common.util.DatabricksThriftUtil.convertColumnarToRowBased;
4-
53
import com.databricks.jdbc.api.impl.arrow.ArrowStreamResult;
64
import com.databricks.jdbc.api.impl.volume.VolumeOperationResult;
75
import com.databricks.jdbc.api.internal.IDatabricksSession;
@@ -96,7 +94,7 @@ private static IExecutionResult getResultHandler(
9694
LOGGER.info("Processing result of format {} from Thrift server", resultFormat);
9795
switch (resultFormat) {
9896
case COLUMN_BASED_SET:
99-
return getResultSet(convertColumnarToRowBased(resultsResp, parentStatement, session));
97+
return new LazyThriftResult(resultsResp, parentStatement, session);
10098
case ARROW_BASED_SET:
10199
return new ArrowStreamResult(resultsResp, true, parentStatement, session);
102100
case URL_BASED_SET:

0 commit comments

Comments
 (0)