Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions NEXT_CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
- Added a client property `enableVolumeOperations` to enable GET/PUT/REMOVE volume operations on a stream. For backward compatibility, allowedVolumeIngestionPaths can also be used for REMOVE operation.
- Support for fetching schemas across all catalogs (when catalog is specified as null or a wildcard) in `DatabaseMetaData#getSchemas` API in SQL Execution mode.
- **Configurable SQL validation in isValid()**: Added `EnableSQLValidationForIsValid` connection property to control whether `isValid()` method executes an actual SQL query for server-side validation. Default value is 0.
- Added multi-row INSERT batching for prepared statements, controlled by the `EnableBatchedInserts` connection property, to improve performance when executing large batches of INSERT operations.

### Updated
- Databricks SDK dependency upgraded to latest version 0.60.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1025,4 +1025,9 @@ public int getTelemetryFlushIntervalInMilliseconds() {
return Math.max(
1000, Integer.parseInt(getParameter(DatabricksJdbcUrlParams.TELEMETRY_FLUSH_INTERVAL)));
}

/**
 * Reports whether the multi-row INSERT batching optimization is switched on.
 *
 * @return {@code true} only when the {@code ENABLE_BATCHED_INSERTS} parameter resolves to the
 *     string {@code "1"}; any other value, including a missing one, yields {@code false}
 */
@Override
public boolean isBatchedInsertsEnabled() {
  // Constant-first comparison so an unresolved (null) parameter reads as "disabled" instead of
  // raising a NullPointerException.
  return "1".equals(getParameter(DatabricksJdbcUrlParams.ENABLE_BATCHED_INSERTS));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@
import static com.databricks.jdbc.common.util.SQLInterpolator.surroundPlaceholdersWithQuotes;
import static com.databricks.jdbc.common.util.ValidationUtil.throwErrorIfNull;

import com.databricks.jdbc.common.DatabricksJdbcConstants;
import com.databricks.jdbc.common.StatementType;
import com.databricks.jdbc.common.util.DatabricksTypeUtil;
import com.databricks.jdbc.common.util.InsertStatementParser;
import com.databricks.jdbc.exception.*;
import com.databricks.jdbc.log.JdbcLogger;
import com.databricks.jdbc.log.JdbcLoggerFactory;
Expand Down Expand Up @@ -88,6 +90,138 @@ public int[] executeBatch() throws DatabricksBatchUpdateException {
/**
 * Runs every parameter set queued on this prepared statement and returns one update count per
 * set.
 *
 * <p>An empty batch returns an empty array without executing anything. Otherwise the batch is
 * sent as multi-row INSERTs when {@link #canUseBatchedInsert()} allows it, and statement by
 * statement in all other cases.
 *
 * @return the update counts, in the order the parameter sets were added
 * @throws DatabricksBatchUpdateException if execution of the batch fails
 */
@Override
public long[] executeLargeBatch() throws DatabricksBatchUpdateException {
  LOGGER.debug("public long executeLargeBatch()");

  // Guard clause: nothing was queued, so there is nothing to run.
  if (databricksBatchParameterMetaData.isEmpty()) {
    return new long[0];
  }

  // Prefer the multi-row INSERT path; everything else runs one statement at a time.
  return canUseBatchedInsert() ? executeBatchedInsert() : executeIndividualStatements();
}

/**
 * Decides whether the queued batch may be sent as multi-row INSERT statements instead of one
 * statement per parameter set.
 *
 * <p>The batch qualifies when all of the following hold:
 *
 * <ul>
 *   <li>The connection context reports batched inserts as enabled (the {@code
 *       EnableBatchedInserts} connection property must be {@code 1}).
 *   <li>{@code InsertStatementParser.parseInsertStrict} accepts the statement text, i.e. it is an
 *       INSERT the parser can handle.
 *   <li>At least one parameter set has been added to the batch.
 * </ul>
 *
 * <p>Because every parameter set in a prepared-statement batch shares the same SQL text, a
 * qualifying batch such as:
 *
 * <pre>
 * INSERT INTO users (id, name) VALUES (?, ?)   -- with parameters [1, "Alice"]
 * INSERT INTO users (id, name) VALUES (?, ?)   -- with parameters [2, "Bob"]
 * </pre>
 *
 * can be combined into a single multi-row INSERT:
 *
 * <pre>
 * INSERT INTO users (id, name) VALUES (?, ?), (?, ?)   -- with parameters [1, "Alice", 2, "Bob"]
 * </pre>
 *
 * @return {@code true} if the multi-row path may be used, {@code false} otherwise
 */
private boolean canUseBatchedInsert() {
  if (connection.getConnectionContext().isBatchedInsertsEnabled()) {
    try {
      // The parse result is discarded here; only success or failure matters.
      InsertStatementParser.parseInsertStrict(sql);
      boolean hasQueuedRows = !databricksBatchParameterMetaData.isEmpty();
      return hasQueuedRows;
    } catch (Exception ignored) {
      // Any parser failure means the statement is not suitable for batching; the caller then
      // takes the statement-by-statement path.
    }
  }
  return false;
}

/** Executes the batch as a single multi-row INSERT statement. */
private long[] executeBatchedInsert() throws DatabricksBatchUpdateException {
LOGGER.debug("Executing batched INSERT with {} rows", databricksBatchParameterMetaData.size());

try {
InsertStatementParser.InsertInfo insertInfo = InsertStatementParser.parseInsertStrict(sql);

// Calculate how many rows we can fit in one chunk based on parameter limit
int parametersPerRow = insertInfo.getColumnCount();
int maxRowsPerChunk = DatabricksJdbcConstants.MAX_QUERY_PARAMETERS / parametersPerRow;
Comment thread
josecsotomorales marked this conversation as resolved.

// Ensure we have at least 1 row per chunk
if (maxRowsPerChunk < 1) {
maxRowsPerChunk = 1;
Comment thread
josecsotomorales marked this conversation as resolved.
}

long[] allUpdateCounts = new long[databricksBatchParameterMetaData.size()];
int processedRows = 0;

// Process batches in chunks
for (int startIndex = 0;
startIndex < databricksBatchParameterMetaData.size();
startIndex += maxRowsPerChunk) {
int endIndex =
Math.min(startIndex + maxRowsPerChunk, databricksBatchParameterMetaData.size());
int chunkSize = endIndex - startIndex;

LOGGER.debug("Processing chunk {}-{} ({} rows)", startIndex + 1, endIndex, chunkSize);

// Generate multi-row SQL for this chunk
String multiRowSql = InsertStatementParser.generateMultiRowInsert(insertInfo, chunkSize);

// Combine parameters for this chunk
Map<Integer, ImmutableSqlParameter> chunkParams = new HashMap<>();
int paramIndex = 1;

for (int i = startIndex; i < endIndex; i++) {
DatabricksParameterMetaData batchParams = databricksBatchParameterMetaData.get(i);
Map<Integer, ImmutableSqlParameter> rowParams = batchParams.getParameterBindings();
for (int j = 1; j <= rowParams.size(); j++) {
if (rowParams.containsKey(j)) {
chunkParams.put(paramIndex++, rowParams.get(j));
}
}
}

// Execute this chunk
executeInternal(multiRowSql, chunkParams, StatementType.UPDATE, false);

// Set update counts for this chunk (each row typically affects 1 row)
for (int i = startIndex; i < endIndex; i++) {
allUpdateCounts[i] = 1;
}

processedRows += chunkSize;
}

LOGGER.debug("Successfully processed {} rows in chunks", processedRows);
return allUpdateCounts;

} catch (Exception e) {
Comment thread
josecsotomorales marked this conversation as resolved.
LOGGER.error("Error executing batched INSERT: {}", e.getMessage(), e);
long[] failedCounts = new long[databricksBatchParameterMetaData.size()];
for (int i = 0; i < failedCounts.length; i++) {
failedCounts[i] = Statement.EXECUTE_FAILED;
}
throw new DatabricksBatchUpdateException(
e.getMessage(), DatabricksDriverErrorCode.BATCH_EXECUTE_EXCEPTION, failedCounts);
}
}

/** Executes batch statements individually (fallback method). */
private long[] executeIndividualStatements() throws DatabricksBatchUpdateException {
LOGGER.debug(
"Executing batch individually with {} statements", databricksBatchParameterMetaData.size());
long[] largeUpdateCount = new long[databricksBatchParameterMetaData.size()];

for (int sqlQueryIndex = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -692,6 +692,14 @@ static boolean isSelectQuery(String query) {
return SELECT_PATTERN.matcher(trimmedQuery).find();
}

/**
 * Tells whether the given SQL text matches {@code INSERT_PATTERN} once it has been passed
 * through {@code trimCommentsAndWhitespaces}.
 *
 * @param query the SQL text to inspect; may be {@code null}
 * @return {@code false} for {@code null} or whitespace-only input, otherwise whether the pattern
 *     is found in the trimmed text
 */
static boolean isInsertQuery(String query) {
  boolean blank = query == null || query.trim().isEmpty();
  // Short-circuit keeps the trimming helper from ever seeing null or blank input.
  return !blank && INSERT_PATTERN.matcher(trimCommentsAndWhitespaces(query)).find();
}

DatabricksResultSet executeInternal(
String sql,
Map<Integer, ImmutableSqlParameter> params,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -353,4 +353,7 @@ public interface IDatabricksConnectionContext {

/**
 * Returns the HTTP connection request timeout in seconds.
 *
 * @return the timeout in seconds. NOTE(review): the return type is a boxed {@code Integer}, so
 *     this may be {@code null} when no timeout is configured - confirm against the implementation
 */
Integer getHttpConnectionRequestTimeout();

/**
 * Returns whether the batched INSERT optimization is enabled for this connection. When it is,
 * an eligible prepared-statement batch is sent as multi-row INSERT statements instead of one
 * statement per parameter set.
 *
 * @return {@code true} if batched inserts are enabled, {@code false} otherwise
 */
boolean isBatchedInsertsEnabled();
}
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,12 @@ public enum FakeServiceType {
Pattern.compile("^(\\s*\\()*\\s*REMOVE", Pattern.CASE_INSENSITIVE);
public static final Pattern LIST_PATTERN =
Pattern.compile("^(\\s*\\()*\\s*LIST", Pattern.CASE_INSENSITIVE);
/**
 * Matches text that begins with {@code INSERT INTO}, case-insensitively. Leading whitespace and
 * any number of opening parentheses are allowed before the keyword, and at least one whitespace
 * character is required between {@code INSERT} and {@code INTO}. INSERT forms without the
 * {@code INTO} keyword do not match.
 */
public static final Pattern INSERT_PATTERN =
Pattern.compile("^(\\s*\\()*\\s*INSERT\\s+INTO", Pattern.CASE_INSENSITIVE);

/**
 * Maximum number of parameters allowed in a single Databricks query. The multi-row INSERT
 * batching path divides this value by the number of parameters per row to size its chunks.
 */
public static final int MAX_QUERY_PARAMETERS = 256;
Comment thread
josecsotomorales marked this conversation as resolved.

// Regex: match queries starting with "BEGIN" but not followed by "TRANSACTION"
// (?i) -> case-insensitive
// ^\s*BEGIN -> string starts with BEGIN (allow leading whitespace)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ public enum DatabricksJdbcUrlParams {
"HttpConnectionRequestTimeout", "HTTP connection request timeout in seconds"),
CLOUD_FETCH_SPEED_THRESHOLD(
"CloudFetchSpeedThreshold", "Minimum expected download speed in MB/s", "0.1"),
// Toggles the multi-row INSERT batching optimization; the connection context treats the value
// "1" as enabled.
// NOTE(review): the third argument appears to be the default value ("0", i.e. disabled) -
// confirm against the enum constructor.
ENABLE_BATCHED_INSERTS("EnableBatchedInserts", "Enable batched INSERT optimization", "0"),
ENABLE_SQL_VALIDATION_FOR_IS_VALID(
"EnableSQLValidationForIsValid",
"Enable SQL query execution for connection validation in isValid() method",
Expand Down
Loading
Loading