Skip to content

Commit 43ac32b

Browse files
Add columnar data access for memory-efficient row processing (#975)
## Description <!-- Provide a brief summary of the changes made and the issue they aim to address.--> This PR contains changes from the PR #966 as well. Introduce ColumnarRowView to provide direct access to columnar data without materialising entire result sets into row objects. This reduces memory allocations by allowing individual cell access via `getValue(row, col)` instead of creating `List<List<Object>>` structures. Key changes: - New ColumnarRowView class with direct columnar access methods. - Updated LazyThriftResult to use columnar view instead of materialized rows. - Added utility method in DatabricksThriftUtil for creating columnar views. - Comprehensive test coverage for all column types and null handling. This optimization maintains API compatibility while significantly reducing memory overhead for large result sets. Following the changes introduced in PR #966, the following improvements were observed during a test that executes a SQL query retrieving 5 million rows: Current heap usage over time: <img width="720" height="429" alt="image" src="https://github.com/user-attachments/assets/ac87eae3-63aa-4529-9995-6392af6d4a3b" /> Improved heap usage over time: <img width="720" height="288" alt="image" src="https://github.com/user-attachments/assets/befff1f2-a54f-4609-aa81-a6311a329312" /> - The first image shows multiple significant CPU usage spikes reaching around 15%, while the second image shows consistently flat CPU usage at approximately 3%. - The erratic CPU behavior in the "before" state has been completely smoothed out. - CPU usage is now consistent and predictable rather than volatile. - Memory usage dropped from a peak of 8440 MB down to just 745 MB - that's roughly an 91% decrease. - The large memory spike and sustained high usage in the first image has been completely resolved. - Memory usage is now consistently low and stable throughout the monitoring period. - There is no negative effect on execution time; in fact, it appears to improve, likely due to more active GC pauses in the previous state. ## Testing <!-- Describe how the changes have been tested--> - Unit tests - e2e tests - FakeService tests ## Additional Notes to the Reviewer <!-- Share any additional context or insights that may help the reviewer understand the changes better. This could include challenges faced, limitations, or compromises made during the development process. Also, mention any areas of the code that you would like the reviewer to focus on specifically. -->
1 parent 45115e6 commit 43ac32b

5 files changed

Lines changed: 362 additions & 24 deletions

File tree

NEXT_CHANGELOG.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,8 @@
1111
- **Configurable SQL validation in isValid()**: Added `EnableSQLValidationForIsValid` connection property to control whether `isValid()` method executes an actual SQL query for server-side validation. Default value is 0.
1212
- Implement multi-row INSERT batching optimization for prepared statements to improve performance when executing large batches of INSERT operations.
1313
- Implement lazy/incremental fetching for columnar results when using Databricks JDBC in Thrift mode without Arrow support. The change modifies the behavior from buffering entire result sets in memory to maintaining only a limited number of rows at a time, reducing peak heap memory usage and preventing OutOfMemory errors.
14-
- Added new artifact `databricks-jdbc-thin` for thin jar with runtime dependency metadata
14+
- Added new artifact `databricks-jdbc-thin` for thin jar with runtime dependency metadata.
15+
- Introduce a memory-efficient columnar data access mechanism for JDBC result processing.
1516

1617
### Updated
1718
- Databricks SDK dependency upgraded to latest version 0.60.0
Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
package com.databricks.jdbc.api.impl;
2+
3+
import com.databricks.jdbc.exception.DatabricksSQLException;
4+
import com.databricks.jdbc.model.client.thrift.generated.TColumn;
5+
import com.databricks.jdbc.model.client.thrift.generated.TRowSet;
6+
import com.databricks.jdbc.model.telemetry.enums.DatabricksDriverErrorCode;
7+
import java.util.BitSet;
8+
import java.util.List;
9+
10+
/**
11+
* Memory-efficient columnar view that provides row-based access without materializing all rows.
12+
* Instead of creating List<List<Object>>, this class provides direct access to columnar data on a
13+
* per-row, per-column basis, significantly reducing memory allocations.
14+
*/
15+
public class ColumnarRowView {
16+
private final List<TColumn> columns;
17+
private final int rowCount;
18+
private final ColumnAccessor[] columnAccessors;
19+
20+
public ColumnarRowView(TRowSet rowSet) throws DatabricksSQLException {
21+
this.columns = rowSet != null ? rowSet.getColumns() : null;
22+
23+
if (columns == null || columns.isEmpty()) {
24+
this.rowCount = 0;
25+
this.columnAccessors = new ColumnAccessor[0];
26+
} else {
27+
this.rowCount = getRowCountFromFirstColumn();
28+
this.columnAccessors = new ColumnAccessor[columns.size()];
29+
for (int i = 0; i < columns.size(); i++) {
30+
this.columnAccessors[i] = createColumnAccessor(columns.get(i));
31+
}
32+
}
33+
}
34+
35+
/** Gets the number of rows in this view. */
36+
public int getRowCount() {
37+
return rowCount;
38+
}
39+
40+
/** Gets the number of columns in this view. */
41+
public int getColumnCount() {
42+
return columns != null ? columns.size() : 0;
43+
}
44+
45+
/** Gets the value at the specified row and column without materializing the entire row. */
46+
public Object getValue(int rowIndex, int columnIndex) throws DatabricksSQLException {
47+
if (rowIndex < 0 || rowIndex >= rowCount) {
48+
throw new DatabricksSQLException(
49+
"Row index out of bounds: " + rowIndex, DatabricksDriverErrorCode.INVALID_STATE);
50+
}
51+
if (columnIndex < 0 || columnIndex >= columnAccessors.length) {
52+
throw new DatabricksSQLException(
53+
"Column index out of bounds: " + columnIndex, DatabricksDriverErrorCode.INVALID_STATE);
54+
}
55+
56+
return columnAccessors[columnIndex].getValue(rowIndex);
57+
}
58+
59+
/**
60+
* Creates a materialized row only when explicitly requested (for backward compatibility). This
61+
* should be avoided in performance-critical paths.
62+
*/
63+
public Object[] materializeRow(int rowIndex) throws DatabricksSQLException {
64+
if (rowIndex < 0 || rowIndex >= rowCount) {
65+
throw new DatabricksSQLException(
66+
"Row index out of bounds: " + rowIndex, DatabricksDriverErrorCode.INVALID_STATE);
67+
}
68+
69+
Object[] row = new Object[columnAccessors.length];
70+
for (int col = 0; col < columnAccessors.length; col++) {
71+
row[col] = columnAccessors[col].getValue(rowIndex);
72+
}
73+
return row;
74+
}
75+
76+
private int getRowCountFromFirstColumn() throws DatabricksSQLException {
77+
if (columns.isEmpty()) {
78+
return 0;
79+
}
80+
TColumn firstColumn = columns.get(0);
81+
return getColumnSize(firstColumn);
82+
}
83+
84+
private static int getColumnSize(TColumn column) throws DatabricksSQLException {
85+
if (column.isSetBinaryVal()) return column.getBinaryVal().getValuesSize();
86+
if (column.isSetBoolVal()) return column.getBoolVal().getValuesSize();
87+
if (column.isSetByteVal()) return column.getByteVal().getValuesSize();
88+
if (column.isSetDoubleVal()) return column.getDoubleVal().getValuesSize();
89+
if (column.isSetI16Val()) return column.getI16Val().getValuesSize();
90+
if (column.isSetI32Val()) return column.getI32Val().getValuesSize();
91+
if (column.isSetI64Val()) return column.getI64Val().getValuesSize();
92+
if (column.isSetStringVal()) return column.getStringVal().getValuesSize();
93+
94+
throw new DatabricksSQLException(
95+
"Unsupported column type: " + column, DatabricksDriverErrorCode.UNSUPPORTED_OPERATION);
96+
}
97+
98+
private static ColumnAccessor createColumnAccessor(TColumn column) throws DatabricksSQLException {
99+
if (column.isSetBinaryVal()) {
100+
return new TypedColumnAccessor<>(
101+
column.getBinaryVal().getValues(), column.getBinaryVal().getNulls());
102+
}
103+
if (column.isSetBoolVal()) {
104+
return new TypedColumnAccessor<>(
105+
column.getBoolVal().getValues(), column.getBoolVal().getNulls());
106+
}
107+
if (column.isSetByteVal()) {
108+
return new TypedColumnAccessor<>(
109+
column.getByteVal().getValues(), column.getByteVal().getNulls());
110+
}
111+
if (column.isSetDoubleVal()) {
112+
return new TypedColumnAccessor<>(
113+
column.getDoubleVal().getValues(), column.getDoubleVal().getNulls());
114+
}
115+
if (column.isSetI16Val()) {
116+
return new TypedColumnAccessor<>(
117+
column.getI16Val().getValues(), column.getI16Val().getNulls());
118+
}
119+
if (column.isSetI32Val()) {
120+
return new TypedColumnAccessor<>(
121+
column.getI32Val().getValues(), column.getI32Val().getNulls());
122+
}
123+
if (column.isSetI64Val()) {
124+
return new TypedColumnAccessor<>(
125+
column.getI64Val().getValues(), column.getI64Val().getNulls());
126+
}
127+
if (column.isSetStringVal()) {
128+
return new TypedColumnAccessor<>(
129+
column.getStringVal().getValues(), column.getStringVal().getNulls());
130+
}
131+
132+
throw new DatabricksSQLException(
133+
"Unsupported column type: " + column, DatabricksDriverErrorCode.UNSUPPORTED_OPERATION);
134+
}
135+
136+
/** Interface for accessing column values by index without materializing the entire column. */
137+
private interface ColumnAccessor {
138+
Object getValue(int rowIndex);
139+
}
140+
141+
/** Memory-efficient column accessor that handles nulls and provides direct index-based access. */
142+
private static class TypedColumnAccessor<T> implements ColumnAccessor {
143+
private final List<T> values;
144+
private final BitSet nullBits;
145+
146+
public TypedColumnAccessor(List<T> values, byte[] nulls) {
147+
this.values = values;
148+
this.nullBits = nulls != null ? BitSet.valueOf(nulls) : null;
149+
}
150+
151+
@Override
152+
public Object getValue(int rowIndex) {
153+
if (nullBits != null && nullBits.get(rowIndex)) {
154+
return null;
155+
}
156+
return values.get(rowIndex);
157+
}
158+
}
159+
}

src/main/java/com/databricks/jdbc/api/impl/LazyThriftResult.java

Lines changed: 18 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package com.databricks.jdbc.api.impl;
22

33
import static com.databricks.jdbc.common.EnvironmentVariables.DEFAULT_RESULT_ROW_LIMIT;
4-
import static com.databricks.jdbc.common.util.DatabricksThriftUtil.extractRowsFromColumnar;
4+
import static com.databricks.jdbc.common.util.DatabricksThriftUtil.createColumnarView;
55

66
import com.databricks.jdbc.api.internal.IDatabricksSession;
77
import com.databricks.jdbc.api.internal.IDatabricksStatementInternal;
@@ -10,13 +10,12 @@
1010
import com.databricks.jdbc.log.JdbcLoggerFactory;
1111
import com.databricks.jdbc.model.client.thrift.generated.TFetchResultsResp;
1212
import com.databricks.jdbc.model.telemetry.enums.DatabricksDriverErrorCode;
13-
import java.util.List;
1413

1514
public class LazyThriftResult implements IExecutionResult {
1615
private static final JdbcLogger LOGGER = JdbcLoggerFactory.getLogger(LazyThriftResult.class);
1716

1817
private TFetchResultsResp currentResponse;
19-
private List<List<Object>> currentBatch;
18+
private ColumnarRowView currentBatch;
2019
private int currentBatchIndex;
2120
private long globalRowIndex;
2221
private final IDatabricksSession session;
@@ -53,7 +52,7 @@ public LazyThriftResult(
5352
loadCurrentBatch();
5453
LOGGER.debug(
5554
"LazyThriftResult initialized with {} rows in first batch, hasMoreRows: {}",
56-
currentBatch.size(),
55+
currentBatch.getRowCount(),
5756
currentResponse.hasMoreRows);
5857
}
5958

@@ -75,16 +74,15 @@ public Object getObject(int columnIndex) throws DatabricksSQLException {
7574
throw new DatabricksSQLException(
7675
"Cursor is before first row", DatabricksDriverErrorCode.INVALID_STATE);
7776
}
78-
if (currentBatchIndex < 0 || currentBatchIndex >= currentBatch.size()) {
77+
if (currentBatchIndex < 0 || currentBatchIndex >= currentBatch.getRowCount()) {
7978
throw new DatabricksSQLException(
8079
"Invalid cursor position", DatabricksDriverErrorCode.INVALID_STATE);
8180
}
82-
List<Object> currentRowData = currentBatch.get(currentBatchIndex);
83-
if (columnIndex < 0 || columnIndex >= currentRowData.size()) {
81+
if (columnIndex < 0 || columnIndex >= currentBatch.getColumnCount()) {
8482
throw new DatabricksSQLException(
8583
"Column index out of bounds " + columnIndex, DatabricksDriverErrorCode.INVALID_STATE);
8684
}
87-
return currentRowData.get(columnIndex);
85+
return currentBatch.getValue(currentBatchIndex, columnIndex);
8886
}
8987

9088
/**
@@ -128,13 +126,13 @@ public boolean next() throws DatabricksSQLException {
128126
globalRowIndex++;
129127

130128
// Check if we need to fetch the next batch
131-
if (currentBatchIndex >= currentBatch.size()) {
129+
if (currentBatchIndex >= currentBatch.getRowCount()) {
132130
// Keep fetching until we get a non-empty batch or no more rows
133131
while (currentResponse.hasMoreRows) {
134132
fetchNextBatch();
135133

136134
// If we got a non-empty batch, we can proceed
137-
if (!currentBatch.isEmpty()) {
135+
if (currentBatch.getRowCount() > 0) {
138136
currentBatchIndex = 0; // Reset to first row of new batch
139137
break;
140138
}
@@ -143,7 +141,7 @@ public boolean next() throws DatabricksSQLException {
143141
}
144142

145143
// If we exited the loop and still have an empty batch, we've reached the end
146-
if (currentBatch.isEmpty()) {
144+
if (currentBatch.getRowCount() == 0) {
147145
hasReachedEnd = true;
148146
globalRowIndex--; // Revert the increment since we didn't actually move to a new row
149147
return false;
@@ -171,7 +169,7 @@ public boolean hasNext() {
171169
}
172170

173171
// Check if there are more rows in current batch
174-
if (currentBatchIndex + 1 < currentBatch.size()) {
172+
if (currentBatchIndex + 1 < currentBatch.getRowCount()) {
175173
return true;
176174
}
177175

@@ -196,7 +194,7 @@ public void close() {
196194
@Override
197195
public long getRowCount() {
198196
// Return the number of rows in the current batch
199-
return currentBatch != null ? currentBatch.size() : 0;
197+
return currentBatch != null ? currentBatch.getRowCount() : 0;
200198
}
201199

202200
/**
@@ -210,21 +208,18 @@ public long getChunkCount() {
210208
return 0;
211209
}
212210

213-
/**
214-
* Loads the current response data into memory as a batch of rows.
215-
*
216-
* @throws DatabricksSQLException if the response data cannot be processed
217-
*/
218211
private void loadCurrentBatch() throws DatabricksSQLException {
219-
currentBatch = extractRowsFromColumnar(currentResponse.getResults());
212+
currentBatch = createColumnarView(currentResponse.getResults());
220213
currentBatchIndex = -1; // Reset batch index
221-
totalRowsFetched += currentBatch.size();
214+
totalRowsFetched += currentBatch.getRowCount();
222215
LOGGER.debug(
223-
"Loaded batch with {} rows, total fetched: {}", currentBatch.size(), totalRowsFetched);
216+
"Loaded batch with {} rows, total fetched: {}",
217+
currentBatch.getRowCount(),
218+
totalRowsFetched);
224219
}
225220

226221
/**
227-
* Fetches the next batch of data from the server and loads it into memory.
222+
* Fetches the next batch of data from the server and creates columnar views.
228223
*
229224
* @throws DatabricksSQLException if the fetch operation fails
230225
*/
@@ -236,7 +231,7 @@ private void fetchNextBatch() throws DatabricksSQLException {
236231

237232
LOGGER.debug(
238233
"Fetched batch with {} rows, hasMoreRows: {}",
239-
currentBatch.size(),
234+
currentBatch.getRowCount(),
240235
currentResponse.hasMoreRows);
241236
} catch (DatabricksSQLException e) {
242237
LOGGER.error("Failed to fetch next batch: {}", e.getMessage());

src/main/java/com/databricks/jdbc/common/util/DatabricksThriftUtil.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import static com.databricks.jdbc.common.util.DatabricksTypeUtil.*;
55
import static com.databricks.jdbc.model.client.thrift.generated.TTypeId.*;
66

7+
import com.databricks.jdbc.api.impl.ColumnarRowView;
78
import com.databricks.jdbc.api.internal.IDatabricksSession;
89
import com.databricks.jdbc.api.internal.IDatabricksStatementInternal;
910
import com.databricks.jdbc.common.DatabricksJdbcConstants;
@@ -125,6 +126,17 @@ public static List<List<Object>> extractRowsFromColumnar(TRowSet rowSet)
125126
return rows;
126127
}
127128

129+
/**
130+
* Memory-efficient alternative that creates a columnar view instead of materializing all rows.
131+
* Use this method to significantly reduce memory allocations when accessing row data.
132+
*
133+
* @param rowSet that contains columnar data
134+
* @return a columnar view that provides row-based access without full materialization
135+
*/
136+
public static ColumnarRowView createColumnarView(TRowSet rowSet) throws DatabricksSQLException {
137+
return new ColumnarRowView(rowSet);
138+
}
139+
128140
/** Returns statement status for given operation status response */
129141
public static StatementStatus getStatementStatus(TGetOperationStatusResp resp) {
130142
StatementState state = null;

0 commit comments

Comments
 (0)