Skip to content

Commit df447ec

Browse files
Implement multi-row INSERT batching for PreparedStatement (#944)
Linked issue: #867. This PR implements a multi-row INSERT batching optimization for prepared statements to improve performance when executing large batches of INSERT operations. The implementation combines multiple single-row INSERT statements into fewer multi-row INSERT statements while respecting Databricks' 256-parameter limit. It adds a new InsertStatementParser utility for parsing INSERT statements and generating multi-row equivalents; optimizes executeBatch() and executeLargeBatch() to use multi-row INSERT when possible; and includes parameter-limit-aware chunking to handle large batches that exceed the 256-parameter maximum. Impact illustration (10k rows, 5 columns, 50 ms RTT): • Before (single-row inserts): 10,000 statements → ~500 s of RTT + server planning. • After (batched): 197 statements (10,000 ÷ 51 rows per statement, rounded up; 51 = 256 ÷ 5 rounded down) → ~9.9 s of RTT. • That's about a 50× reduction in latency, not even counting server CPU savings. --------- Signed-off-by: josecsotomorales <josecsmorales@gmail.com> Co-authored-by: Jayant Singh <jayant.singh@databricks.com>
1 parent 0e10d04 commit df447ec

11 files changed

Lines changed: 870 additions & 24 deletions

NEXT_CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
- Added a client property `enableVolumeOperations` to enable GET/PUT/REMOVE volume operations on a stream. For backward compatibility, allowedVolumeIngestionPaths can also be used for REMOVE operation.
1010
- Support for fetching schemas across all catalogs (when catalog is specified as null or a wildcard) in `DatabaseMetaData#getSchemas` API in SQL Execution mode.
1111
- **Configurable SQL validation in isValid()**: Added `EnableSQLValidationForIsValid` connection property to control whether `isValid()` method executes an actual SQL query for server-side validation. Default value is 0.
12+
- Added a multi-row INSERT batching optimization for prepared statements to improve performance when executing large batches of INSERT operations. It is opt-in: set the `EnableBatchedInserts` connection property to `1` (default `0`).
1213

1314
### Updated
1415
- Databricks SDK dependency upgraded to latest version 0.60.0

src/main/java/com/databricks/jdbc/api/impl/DatabricksConnectionContext.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1025,4 +1025,9 @@ public int getTelemetryFlushIntervalInMilliseconds() {
10251025
return Math.max(
10261026
1000, Integer.parseInt(getParameter(DatabricksJdbcUrlParams.TELEMETRY_FLUSH_INTERVAL)));
10271027
}
1028+
1029+
@Override
1030+
public boolean isBatchedInsertsEnabled() {
1031+
return getParameter(DatabricksJdbcUrlParams.ENABLE_BATCHED_INSERTS).equals("1");
1032+
}
10281033
}

src/main/java/com/databricks/jdbc/api/impl/DatabricksPreparedStatement.java

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,10 @@
77
import static com.databricks.jdbc.common.util.SQLInterpolator.surroundPlaceholdersWithQuotes;
88
import static com.databricks.jdbc.common.util.ValidationUtil.throwErrorIfNull;
99

10+
import com.databricks.jdbc.common.DatabricksJdbcConstants;
1011
import com.databricks.jdbc.common.StatementType;
1112
import com.databricks.jdbc.common.util.DatabricksTypeUtil;
13+
import com.databricks.jdbc.common.util.InsertStatementParser;
1214
import com.databricks.jdbc.exception.*;
1315
import com.databricks.jdbc.log.JdbcLogger;
1416
import com.databricks.jdbc.log.JdbcLoggerFactory;
@@ -88,6 +90,138 @@ public int[] executeBatch() throws DatabricksBatchUpdateException {
8890
@Override
8991
public long[] executeLargeBatch() throws DatabricksBatchUpdateException {
9092
LOGGER.debug("public long executeLargeBatch()");
93+
94+
if (databricksBatchParameterMetaData.isEmpty()) {
95+
return new long[0];
96+
}
97+
98+
// Try to optimize INSERT statements with multi-row batching
99+
if (canUseBatchedInsert()) {
100+
return executeBatchedInsert();
101+
} else {
102+
// Fall back to individual execution for non-INSERT or incompatible statements
103+
return executeIndividualStatements();
104+
}
105+
}
106+
107+
/**
108+
* Checks if the current batch can be optimized using multi-row INSERT. All statements must be
109+
* compatible INSERT operations.
110+
*
111+
* <p>A batch is eligible for multi-row INSERT optimization when:
112+
*
113+
* <ul>
114+
* <li>The EnableBatchedInserts connection property is enabled (default: true)
115+
* <li>The SQL statement is an INSERT operation
116+
* <li>The INSERT can be parsed successfully (has table name and column list)
117+
* <li>The batch contains parameter sets for multiple rows
118+
* </ul>
119+
*
120+
* <p>Compatible INSERT operations target the same table with the same columns in the same order.
121+
* When compatible, multiple individual INSERTs like:
122+
*
123+
* <pre>
124+
* INSERT INTO users (id, name) VALUES (?, ?) -- with parameters [1, "Alice"]
125+
* INSERT INTO users (id, name) VALUES (?, ?) -- with parameters [2, "Bob"]
126+
* </pre>
127+
*
128+
* Are combined into a single multi-row INSERT:
129+
*
130+
* <pre>
131+
* INSERT INTO users (id, name) VALUES (?, ?), (?, ?) -- with parameters [1, "Alice", 2, "Bob"]
132+
* </pre>
133+
*/
134+
private boolean canUseBatchedInsert() {
135+
// Check if batched inserts are enabled via connection property
136+
if (!connection.getConnectionContext().isBatchedInsertsEnabled()) {
137+
return false;
138+
}
139+
140+
// Use strict exception-based parsing for better error handling
141+
try {
142+
InsertStatementParser.parseInsertStrict(sql);
143+
return !databricksBatchParameterMetaData.isEmpty();
144+
} catch (Exception e) {
145+
// Not a valid INSERT statement suitable for batching
146+
return false;
147+
}
148+
}
149+
150+
/** Executes the batch as a single multi-row INSERT statement. */
151+
private long[] executeBatchedInsert() throws DatabricksBatchUpdateException {
152+
LOGGER.debug("Executing batched INSERT with {} rows", databricksBatchParameterMetaData.size());
153+
154+
try {
155+
InsertStatementParser.InsertInfo insertInfo = InsertStatementParser.parseInsertStrict(sql);
156+
157+
// Calculate how many rows we can fit in one chunk based on parameter limit
158+
int parametersPerRow = insertInfo.getColumnCount();
159+
int maxRowsPerChunk = DatabricksJdbcConstants.MAX_QUERY_PARAMETERS / parametersPerRow;
160+
161+
// Ensure we have at least 1 row per chunk
162+
if (maxRowsPerChunk < 1) {
163+
maxRowsPerChunk = 1;
164+
}
165+
166+
long[] allUpdateCounts = new long[databricksBatchParameterMetaData.size()];
167+
int processedRows = 0;
168+
169+
// Process batches in chunks
170+
for (int startIndex = 0;
171+
startIndex < databricksBatchParameterMetaData.size();
172+
startIndex += maxRowsPerChunk) {
173+
int endIndex =
174+
Math.min(startIndex + maxRowsPerChunk, databricksBatchParameterMetaData.size());
175+
int chunkSize = endIndex - startIndex;
176+
177+
LOGGER.debug("Processing chunk {}-{} ({} rows)", startIndex + 1, endIndex, chunkSize);
178+
179+
// Generate multi-row SQL for this chunk
180+
String multiRowSql = InsertStatementParser.generateMultiRowInsert(insertInfo, chunkSize);
181+
182+
// Combine parameters for this chunk
183+
Map<Integer, ImmutableSqlParameter> chunkParams = new HashMap<>();
184+
int paramIndex = 1;
185+
186+
for (int i = startIndex; i < endIndex; i++) {
187+
DatabricksParameterMetaData batchParams = databricksBatchParameterMetaData.get(i);
188+
Map<Integer, ImmutableSqlParameter> rowParams = batchParams.getParameterBindings();
189+
for (int j = 1; j <= rowParams.size(); j++) {
190+
if (rowParams.containsKey(j)) {
191+
chunkParams.put(paramIndex++, rowParams.get(j));
192+
}
193+
}
194+
}
195+
196+
// Execute this chunk
197+
executeInternal(multiRowSql, chunkParams, StatementType.UPDATE, false);
198+
199+
// Set update counts for this chunk (each row typically affects 1 row)
200+
for (int i = startIndex; i < endIndex; i++) {
201+
allUpdateCounts[i] = 1;
202+
}
203+
204+
processedRows += chunkSize;
205+
}
206+
207+
LOGGER.debug("Successfully processed {} rows in chunks", processedRows);
208+
return allUpdateCounts;
209+
210+
} catch (Exception e) {
211+
LOGGER.error("Error executing batched INSERT: {}", e.getMessage(), e);
212+
long[] failedCounts = new long[databricksBatchParameterMetaData.size()];
213+
for (int i = 0; i < failedCounts.length; i++) {
214+
failedCounts[i] = Statement.EXECUTE_FAILED;
215+
}
216+
throw new DatabricksBatchUpdateException(
217+
e.getMessage(), DatabricksDriverErrorCode.BATCH_EXECUTE_EXCEPTION, failedCounts);
218+
}
219+
}
220+
221+
/** Executes batch statements individually (fallback method). */
222+
private long[] executeIndividualStatements() throws DatabricksBatchUpdateException {
223+
LOGGER.debug(
224+
"Executing batch individually with {} statements", databricksBatchParameterMetaData.size());
91225
long[] largeUpdateCount = new long[databricksBatchParameterMetaData.size()];
92226

93227
for (int sqlQueryIndex = 0;

src/main/java/com/databricks/jdbc/api/impl/DatabricksStatement.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -692,6 +692,14 @@ static boolean isSelectQuery(String query) {
692692
return SELECT_PATTERN.matcher(trimmedQuery).find();
693693
}
694694

695+
static boolean isInsertQuery(String query) {
696+
if (query == null || query.trim().isEmpty()) {
697+
return false;
698+
}
699+
String trimmedQuery = trimCommentsAndWhitespaces(query);
700+
return INSERT_PATTERN.matcher(trimmedQuery).find();
701+
}
702+
695703
DatabricksResultSet executeInternal(
696704
String sql,
697705
Map<Integer, ImmutableSqlParameter> params,

src/main/java/com/databricks/jdbc/api/internal/IDatabricksConnectionContext.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -353,4 +353,7 @@ public interface IDatabricksConnectionContext {
353353

354354
/** Returns the HTTP connection request timeout in seconds */
355355
Integer getHttpConnectionRequestTimeout();
356+
357+
/** Returns {@code true} when the multi-row (batched) INSERT optimization is enabled through the {@code EnableBatchedInserts} connection property. */
358+
boolean isBatchedInsertsEnabled();
356359
}

src/main/java/com/databricks/jdbc/common/DatabricksJdbcConstants.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,12 @@ public enum FakeServiceType {
166166
Pattern.compile("^(\\s*\\()*\\s*REMOVE", Pattern.CASE_INSENSITIVE);
167167
public static final Pattern LIST_PATTERN =
168168
Pattern.compile("^(\\s*\\()*\\s*LIST", Pattern.CASE_INSENSITIVE);
169+
// Matches text that starts with INSERT INTO (case-insensitive), allowing leading whitespace and
// any number of opening parentheses before the keyword.
public static final Pattern INSERT_PATTERN =
170+
Pattern.compile("^(\\s*\\()*\\s*INSERT\\s+INTO", Pattern.CASE_INSENSITIVE);
171+
172+
/** Maximum number of parameters allowed in a single Databricks query; used to size the chunks of a multi-row INSERT batch. */
173+
public static final int MAX_QUERY_PARAMETERS = 256;
174+
169175
// Regex: match queries starting with "BEGIN" but not followed by "TRANSACTION"
170176
// (?i) -> case-insensitive
171177
// ^\s*BEGIN -> string starts with BEGIN (allow leading whitespace)

src/main/java/com/databricks/jdbc/common/DatabricksJdbcUrlParams.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,7 @@ public enum DatabricksJdbcUrlParams {
154154
"HttpConnectionRequestTimeout", "HTTP connection request timeout in seconds"),
155155
CLOUD_FETCH_SPEED_THRESHOLD(
156156
"CloudFetchSpeedThreshold", "Minimum expected download speed in MB/s", "0.1"),
157+
ENABLE_BATCHED_INSERTS("EnableBatchedInserts", "Enable batched INSERT optimization", "0"),
157158
ENABLE_SQL_VALIDATION_FOR_IS_VALID(
158159
"EnableSQLValidationForIsValid",
159160
"Enable SQL query execution for connection validation in isValid() method",

0 commit comments

Comments
 (0)