Skip to content

Commit 970c4c8

Browse files
Fetch columnar results incrementally/lazily (#966)
## Description when JDBC is used with Arrow disabled and operating in Thrift mode (the default mode or when useThriftClient=1), the Hive ThriftServer returns results in a columnar binary format (non-Arrow). Currently, before constructing the result set, JDBC fetches all the required data at once and buffers the entire result set in memory. This PR modifies this behavior for the disabled Arrow and Thrift mode case, so that the result set maintains only a limited number of rows in memory at a time, controlled by the connection parameter RowsFetchedPerBlock. As the client reads rows, the result set will issue additional fetch requests incrementally. Implications: The JVM no longer buffers the entire result set upfront, which previously caused a sharp spike in heap memory usage before garbage collection could reclaim space. This spike often led to OutOfMemory (OOM) errors. With this change, peak heap memory usage is reduced, increasing more gradually, which allows the JVM’s memory management to work more effectively. Following improvements were observed when executing a SQL query on 1 million rows and iterating through the result set without printing the rows. Existing behaviour. Sudden spike with higher peak: <img width="719" height="272" alt="image" src="https://github.com/user-attachments/assets/1e9cf4c6-4847-4b39-a160-74175ca3f871" /> Gradual heap increase within memory management bounds with less peak: <img width="720" height="284" alt="image" src="https://github.com/user-attachments/assets/cb42c585-5af5-492b-8a12-1b34a50a4274" /> ## Testing <!-- Describe how the changes have been tested--> - e2e tests - unit tests - fake service tests ## Additional Notes to the Reviewer <!-- Share any additional context or insights that may help the reviewer understand the changes better. This could include challenges faced, limitations, or compromises made during the development process. Also, mention any areas of the code that you would like the reviewer to focus on specifically. --> Note this does not lower the max heap usage by a lot (instead make memory increase gradual). I will make further improvements on how we process the data.
1 parent 22dd1c2 commit 970c4c8

7 files changed

Lines changed: 727 additions & 11 deletions

File tree

NEXT_CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
- Support for fetching schemas across all catalogs (when catalog is specified as null or a wildcard) in `DatabaseMetaData#getSchemas` API in SQL Execution mode.
1111
- **Configurable SQL validation in isValid()**: Added `EnableSQLValidationForIsValid` connection property to control whether `isValid()` method executes an actual SQL query for server-side validation. Default value is 0.
1212
- Implement multi-row INSERT batching optimization for prepared statements to improve performance when executing large batches of INSERT operations.
13+
- Implement lazy/incremental fetching for columnar results when using Databricks JDBC in Thrift mode without Arrow support. The change modifies the behavior from buffering entire result sets in memory to maintaining only a limited number of rows at a time, reducing peak heap memory usage and preventing OutOfMemory errors.
1314

1415
### Updated
1516
- Databricks SDK dependency upgraded to latest version 0.60.0

src/main/java/com/databricks/jdbc/api/impl/DatabricksResultSet.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -583,6 +583,16 @@ public boolean isBeforeFirst() throws SQLException {
583583
return executionResult.getCurrentRow() == -1;
584584
}
585585

586+
/**
587+
* {@inheritDoc}
588+
*
589+
* <p><b>Limitation:</b> For lazy-loaded result sets ({@link LazyThriftResult}), particularly
590+
* those using {@link
591+
* com.databricks.jdbc.model.client.thrift.generated.TSparkRowSetType#COLUMN_BASED_SET}, this
592+
* method cannot reliably determine the cursor position. The total row count remains unknown until
593+
* all rows are fetched, preventing accurate detection of whether the cursor is after the last
594+
* row. This is specific to Databricks JDBC dialect.
595+
*/
586596
@Override
587597
public boolean isAfterLast() throws SQLException {
588598
checkIfClosed();
@@ -595,9 +605,27 @@ public boolean isFirst() throws SQLException {
595605
return executionResult.getCurrentRow() == 0;
596606
}
597607

608+
/**
609+
* {@inheritDoc}
610+
*
611+
* <p>This method uses different strategies based on the result set type:
612+
*
613+
* <ul>
614+
* <li>For {@link LazyThriftResult} instances: Checks if there are no more rows available (using
615+
* {@code hasNext()}), since the total row count is unknown until all rows are fetched.
616+
* <li>For other result types: Compares the current row position against the known total row
617+
* count.
618+
* </ul>
619+
*
620+
* @return {@code true} if the cursor is on the last row, {@code false} otherwise
621+
* @throws SQLException if the result set is closed or an error occurs
622+
*/
598623
@Override
599624
public boolean isLast() throws SQLException {
600625
checkIfClosed();
626+
if (executionResult instanceof LazyThriftResult) {
627+
return executionResult.getCurrentRow() >= 0 && !executionResult.hasNext();
628+
}
601629
return executionResult.getCurrentRow() == resultSetMetaData.getTotalRows() - 1;
602630
}
603631

src/main/java/com/databricks/jdbc/api/impl/ExecutionResultFactory.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
package com.databricks.jdbc.api.impl;
22

3-
import static com.databricks.jdbc.common.util.DatabricksThriftUtil.convertColumnarToRowBased;
4-
53
import com.databricks.jdbc.api.impl.arrow.ArrowStreamResult;
64
import com.databricks.jdbc.api.impl.volume.VolumeOperationResult;
75
import com.databricks.jdbc.api.internal.IDatabricksSession;
@@ -96,7 +94,7 @@ private static IExecutionResult getResultHandler(
9694
LOGGER.info("Processing result of format {} from Thrift server", resultFormat);
9795
switch (resultFormat) {
9896
case COLUMN_BASED_SET:
99-
return getResultSet(convertColumnarToRowBased(resultsResp, parentStatement, session));
97+
return new LazyThriftResult(resultsResp, parentStatement, session);
10098
case ARROW_BASED_SET:
10199
return new ArrowStreamResult(resultsResp, true, parentStatement, session);
102100
case URL_BASED_SET:
Lines changed: 266 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,266 @@
1+
package com.databricks.jdbc.api.impl;
2+
3+
import static com.databricks.jdbc.common.EnvironmentVariables.DEFAULT_RESULT_ROW_LIMIT;
4+
import static com.databricks.jdbc.common.util.DatabricksThriftUtil.extractRowsFromColumnar;
5+
6+
import com.databricks.jdbc.api.internal.IDatabricksSession;
7+
import com.databricks.jdbc.api.internal.IDatabricksStatementInternal;
8+
import com.databricks.jdbc.exception.DatabricksSQLException;
9+
import com.databricks.jdbc.log.JdbcLogger;
10+
import com.databricks.jdbc.log.JdbcLoggerFactory;
11+
import com.databricks.jdbc.model.client.thrift.generated.TFetchResultsResp;
12+
import com.databricks.jdbc.model.telemetry.enums.DatabricksDriverErrorCode;
13+
import java.util.List;
14+
15+
public class LazyThriftResult implements IExecutionResult {
16+
private static final JdbcLogger LOGGER = JdbcLoggerFactory.getLogger(LazyThriftResult.class);
17+
18+
private TFetchResultsResp currentResponse;
19+
private List<List<Object>> currentBatch;
20+
private int currentBatchIndex;
21+
private long globalRowIndex;
22+
private final IDatabricksSession session;
23+
private final IDatabricksStatementInternal statement;
24+
private final int maxRows;
25+
private boolean hasReachedEnd;
26+
private boolean isClosed;
27+
private long totalRowsFetched;
28+
29+
/**
30+
* Creates a new LazyThriftResult that lazily fetches data on demand.
31+
*
32+
* @param initialResponse the initial response from the server
33+
* @param statement the statement that generated this result
34+
* @param session the session to use for fetching additional data
35+
* @throws DatabricksSQLException if the initial response cannot be processed
36+
*/
37+
public LazyThriftResult(
38+
TFetchResultsResp initialResponse,
39+
IDatabricksStatementInternal statement,
40+
IDatabricksSession session)
41+
throws DatabricksSQLException {
42+
this.currentResponse = initialResponse;
43+
this.statement = statement;
44+
this.session = session;
45+
this.maxRows = statement != null ? statement.getMaxRows() : DEFAULT_RESULT_ROW_LIMIT;
46+
this.globalRowIndex = -1;
47+
this.currentBatchIndex = -1;
48+
this.hasReachedEnd = false;
49+
this.isClosed = false;
50+
this.totalRowsFetched = 0;
51+
52+
// Load initial batch
53+
loadCurrentBatch();
54+
LOGGER.debug(
55+
"LazyThriftResult initialized with {} rows in first batch, hasMoreRows: {}",
56+
currentBatch.size(),
57+
currentResponse.hasMoreRows);
58+
}
59+
60+
/**
61+
* Gets the value at the specified column index for the current row.
62+
*
63+
* @param columnIndex the zero-based column index
64+
* @return the value at the specified column
65+
* @throws DatabricksSQLException if the result is closed, cursor is invalid, or column index is
66+
* out of bounds
67+
*/
68+
@Override
69+
public Object getObject(int columnIndex) throws DatabricksSQLException {
70+
if (isClosed) {
71+
throw new DatabricksSQLException(
72+
"Result is already closed", DatabricksDriverErrorCode.STATEMENT_CLOSED);
73+
}
74+
if (globalRowIndex == -1) {
75+
throw new DatabricksSQLException(
76+
"Cursor is before first row", DatabricksDriverErrorCode.INVALID_STATE);
77+
}
78+
if (currentBatchIndex < 0 || currentBatchIndex >= currentBatch.size()) {
79+
throw new DatabricksSQLException(
80+
"Invalid cursor position", DatabricksDriverErrorCode.INVALID_STATE);
81+
}
82+
List<Object> currentRowData = currentBatch.get(currentBatchIndex);
83+
if (columnIndex < 0 || columnIndex >= currentRowData.size()) {
84+
throw new DatabricksSQLException(
85+
"Column index out of bounds " + columnIndex, DatabricksDriverErrorCode.INVALID_STATE);
86+
}
87+
return currentRowData.get(columnIndex);
88+
}
89+
90+
/**
91+
* Gets the current row index (0-based). Returns -1 if before the first row.
92+
*
93+
* @return the current row index
94+
*/
95+
@Override
96+
public long getCurrentRow() {
97+
return globalRowIndex;
98+
}
99+
100+
/**
101+
* Moves the cursor to the next row. Fetches additional data from server if needed.
102+
*
103+
* @return true if there is a next row, false if at the end
104+
* @throws DatabricksSQLException if an error occurs while fetching data
105+
*/
106+
@Override
107+
public boolean next() throws DatabricksSQLException {
108+
if (isClosed || hasReachedEnd) {
109+
return false;
110+
}
111+
112+
if (!hasNext()) {
113+
// Ideally the client code should first call, hasNext() and then next()
114+
// However, the client code like in DatabricksResultSet#next directly calls next
115+
// So, this is a safeguard to ensure we don't move past the end
116+
return false;
117+
}
118+
119+
// Check if we've reached the maxRows limit
120+
boolean hasRowLimit = maxRows > 0;
121+
if (hasRowLimit && globalRowIndex + 1 >= maxRows) {
122+
hasReachedEnd = true;
123+
return false;
124+
}
125+
126+
// Move to next row in current batch
127+
currentBatchIndex++;
128+
globalRowIndex++;
129+
130+
// Check if we need to fetch the next batch
131+
if (currentBatchIndex >= currentBatch.size()) {
132+
// Keep fetching until we get a non-empty batch or no more rows
133+
while (currentResponse.hasMoreRows) {
134+
fetchNextBatch();
135+
136+
// If we got a non-empty batch, we can proceed
137+
if (!currentBatch.isEmpty()) {
138+
currentBatchIndex = 0; // Reset to first row of new batch
139+
break;
140+
}
141+
142+
// If batch is still empty but hasMoreRows is false after fetch, we'll exit the loop
143+
}
144+
145+
// If we exited the loop and still have an empty batch, we've reached the end
146+
if (currentBatch.isEmpty()) {
147+
hasReachedEnd = true;
148+
globalRowIndex--; // Revert the increment since we didn't actually move to a new row
149+
return false;
150+
}
151+
}
152+
153+
return true;
154+
}
155+
156+
/**
157+
* Checks if there are more rows available without advancing the cursor.
158+
*
159+
* @return true if there are more rows, false otherwise
160+
*/
161+
@Override
162+
public boolean hasNext() {
163+
if (isClosed || hasReachedEnd) {
164+
return false;
165+
}
166+
167+
// Check maxRows limit
168+
boolean hasRowLimit = maxRows > 0;
169+
if (hasRowLimit && globalRowIndex + 1 >= maxRows) {
170+
return false;
171+
}
172+
173+
// Check if there are more rows in current batch
174+
if (currentBatchIndex + 1 < currentBatch.size()) {
175+
return true;
176+
}
177+
178+
// Check if there are more batches to fetch
179+
return currentResponse.hasMoreRows;
180+
}
181+
182+
/** Closes this result and releases associated resources. */
183+
@Override
184+
public void close() {
185+
this.isClosed = true;
186+
this.currentBatch = null;
187+
this.currentResponse = null;
188+
LOGGER.debug("LazyThriftResult closed after fetching {} total rows", totalRowsFetched);
189+
}
190+
191+
/**
192+
* Gets the number of rows in the current batch.
193+
*
194+
* @return the number of rows in the current batch
195+
*/
196+
@Override
197+
public long getRowCount() {
198+
// Return the number of rows in the current batch
199+
return currentBatch != null ? currentBatch.size() : 0;
200+
}
201+
202+
/**
203+
* Gets the chunk count. Always returns 0 for thrift columnar results.
204+
*
205+
* @return 0 (thrift results don't use chunks like Arrow)
206+
*/
207+
@Override
208+
public long getChunkCount() {
209+
// For thrift columnar results, we don't have chunks in the same sense as Arrow
210+
return 0;
211+
}
212+
213+
/**
214+
* Loads the current response data into memory as a batch of rows.
215+
*
216+
* @throws DatabricksSQLException if the response data cannot be processed
217+
*/
218+
private void loadCurrentBatch() throws DatabricksSQLException {
219+
currentBatch = extractRowsFromColumnar(currentResponse.getResults());
220+
currentBatchIndex = -1; // Reset batch index
221+
totalRowsFetched += currentBatch.size();
222+
LOGGER.debug(
223+
"Loaded batch with {} rows, total fetched: {}", currentBatch.size(), totalRowsFetched);
224+
}
225+
226+
/**
227+
* Fetches the next batch of data from the server and loads it into memory.
228+
*
229+
* @throws DatabricksSQLException if the fetch operation fails
230+
*/
231+
private void fetchNextBatch() throws DatabricksSQLException {
232+
try {
233+
LOGGER.debug("Fetching next batch, current total rows fetched: {}", totalRowsFetched);
234+
currentResponse = session.getDatabricksClient().getMoreResults(statement);
235+
loadCurrentBatch();
236+
237+
LOGGER.debug(
238+
"Fetched batch with {} rows, hasMoreRows: {}",
239+
currentBatch.size(),
240+
currentResponse.hasMoreRows);
241+
} catch (DatabricksSQLException e) {
242+
LOGGER.error("Failed to fetch next batch: {}", e.getMessage());
243+
hasReachedEnd = true;
244+
throw e; // Propagate exception to fail fast
245+
}
246+
}
247+
248+
/**
249+
* Gets the total number of rows fetched from the server so far. This is different from
250+
* getRowCount() which returns current batch size.
251+
*
252+
* @return the total number of rows fetched from the server
253+
*/
254+
public long getTotalRowsFetched() {
255+
return totalRowsFetched;
256+
}
257+
258+
/**
259+
* Checks if all data has been fetched from the server.
260+
*
261+
* @return true if all data has been fetched (either reached end or maxRows limit)
262+
*/
263+
public boolean isCompletelyFetched() {
264+
return hasReachedEnd || !currentResponse.hasMoreRows;
265+
}
266+
}

src/main/java/com/databricks/jdbc/dbclient/impl/thrift/DatabricksThriftAccessor.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -253,7 +253,7 @@ DatabricksResultSet execute(
253253
getResultSetResp(
254254
response.getStatus(),
255255
response.getOperationHandle(),
256-
response.toString(),
256+
"executeStatement",
257257
maxRowsPerBlock,
258258
true);
259259
long fetchEndTime = System.nanoTime();
@@ -294,7 +294,9 @@ private TGetOperationStatusResp pollTillOperationFinished(
294294
TGetOperationStatusResp statusResp = null;
295295
if (response.isSetDirectResults()) {
296296
checkDirectResultsForErrorStatus(
297-
response.getDirectResults(), response.toString(), statementId.toSQLExecStatementId());
297+
response.getDirectResults(),
298+
"executeStatement DirectResults",
299+
statementId.toSQLExecStatementId());
298300
statusResp = response.getDirectResults().getOperationStatus();
299301
checkOperationStatusForErrors(
300302
statusResp, StatementId.loggableStatementId(response.getOperationHandle()));
@@ -409,7 +411,7 @@ DatabricksResultSet getStatementResult(
409411
if (operationState == TOperationState.FINISHED_STATE) {
410412
long fetchStartTime = System.nanoTime();
411413
resultSet =
412-
getResultSetResp(response.getStatus(), operationHandle, response.toString(), -1, true);
414+
getResultSetResp(response.getStatus(), operationHandle, "getStatementResult", -1, true);
413415
long fetchEndTime = System.nanoTime();
414416
long fetchLatencyNanos = fetchEndTime - fetchStartTime;
415417
long fetchLatencyMillis = fetchLatencyNanos / 1_000_000;
@@ -523,16 +525,16 @@ TFetchResultsResp getResultSetResp(
523525
} catch (TException e) {
524526
String errorMessage =
525527
String.format(
526-
"Error while fetching results from Thrift server. Request {%s}, Error {%s}",
527-
request.toString(), e.getMessage());
528+
"Error while fetching results from Thrift server. Request maxRows=%d, maxBytes=%d, Error {%s}",
529+
request.getMaxRows(), request.getMaxBytes(), e.getMessage());
528530
LOGGER.error(e, errorMessage);
529531
throw new DatabricksHttpException(errorMessage, e, DatabricksDriverErrorCode.INVALID_STATE);
530532
}
531533
verifySuccessStatus(
532534
response.getStatus(),
533535
String.format(
534-
"Error while fetching results Request {%s}. TFetchResultsResp {%s}. ",
535-
request, response),
536+
"Error while fetching results Request maxRows=%d, maxBytes=%d. Response hasMoreRows=%s",
537+
request.getMaxRows(), request.getMaxBytes(), response.hasMoreRows),
536538
statementId);
537539
return response;
538540
}

src/test/java/com/databricks/jdbc/api/impl/ExecutionResultFactoryTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ public void testGetResultSet_thriftColumnar() throws SQLException {
101101
when(fetchResultsResp.getResultSetMetadata()).thenReturn(resultSetMetadataResp);
102102
IExecutionResult result =
103103
ExecutionResultFactory.getResultSet(fetchResultsResp, session, parentStatement);
104-
assertInstanceOf(InlineJsonResult.class, result);
104+
assertInstanceOf(LazyThriftResult.class, result);
105105
}
106106

107107
@Test

0 commit comments

Comments
 (0)