Skip to content

Commit c7e8ea7

Browse files
committed
Column view
1 parent 29197e8 commit c7e8ea7

4 files changed

Lines changed: 359 additions & 17 deletions

File tree

Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
package com.databricks.jdbc.api.impl;
2+
3+
import com.databricks.jdbc.exception.DatabricksSQLException;
4+
import com.databricks.jdbc.model.client.thrift.generated.TColumn;
5+
import com.databricks.jdbc.model.client.thrift.generated.TRowSet;
6+
import com.databricks.jdbc.model.telemetry.enums.DatabricksDriverErrorCode;
7+
import java.util.BitSet;
8+
import java.util.List;
9+
10+
/**
11+
* Memory-efficient columnar view that provides row-based access without materializing all rows.
12+
* Instead of creating List<List<Object>>, this class provides direct access to columnar data on a
13+
* per-row, per-column basis, significantly reducing memory allocations.
14+
*/
15+
public class ColumnarRowView {
16+
private final List<TColumn> columns;
17+
private final int rowCount;
18+
private final ColumnAccessor[] columnAccessors;
19+
20+
public ColumnarRowView(TRowSet rowSet) throws DatabricksSQLException {
21+
this.columns = rowSet != null ? rowSet.getColumns() : null;
22+
23+
if (columns == null || columns.isEmpty()) {
24+
this.rowCount = 0;
25+
this.columnAccessors = new ColumnAccessor[0];
26+
} else {
27+
this.rowCount = getRowCountFromFirstColumn();
28+
this.columnAccessors = new ColumnAccessor[columns.size()];
29+
for (int i = 0; i < columns.size(); i++) {
30+
this.columnAccessors[i] = createColumnAccessor(columns.get(i));
31+
}
32+
}
33+
}
34+
35+
/** Gets the number of rows in this view. */
36+
public int getRowCount() {
37+
return rowCount;
38+
}
39+
40+
/** Gets the number of columns in this view. */
41+
public int getColumnCount() {
42+
return columns != null ? columns.size() : 0;
43+
}
44+
45+
/** Gets the value at the specified row and column without materializing the entire row. */
46+
public Object getValue(int rowIndex, int columnIndex) throws DatabricksSQLException {
47+
if (rowIndex < 0 || rowIndex >= rowCount) {
48+
throw new DatabricksSQLException(
49+
"Row index out of bounds: " + rowIndex, DatabricksDriverErrorCode.INVALID_STATE);
50+
}
51+
if (columnIndex < 0 || columnIndex >= columnAccessors.length) {
52+
throw new DatabricksSQLException(
53+
"Column index out of bounds: " + columnIndex, DatabricksDriverErrorCode.INVALID_STATE);
54+
}
55+
56+
return columnAccessors[columnIndex].getValue(rowIndex);
57+
}
58+
59+
/**
60+
* Creates a materialized row only when explicitly requested (for backward compatibility). This
61+
* should be avoided in performance-critical paths.
62+
*/
63+
public Object[] materializeRow(int rowIndex) throws DatabricksSQLException {
64+
if (rowIndex < 0 || rowIndex >= rowCount) {
65+
throw new DatabricksSQLException(
66+
"Row index out of bounds: " + rowIndex, DatabricksDriverErrorCode.INVALID_STATE);
67+
}
68+
69+
Object[] row = new Object[columnAccessors.length];
70+
for (int col = 0; col < columnAccessors.length; col++) {
71+
row[col] = columnAccessors[col].getValue(rowIndex);
72+
}
73+
return row;
74+
}
75+
76+
private int getRowCountFromFirstColumn() throws DatabricksSQLException {
77+
if (columns.isEmpty()) {
78+
return 0;
79+
}
80+
TColumn firstColumn = columns.get(0);
81+
return getColumnSize(firstColumn);
82+
}
83+
84+
private static int getColumnSize(TColumn column) throws DatabricksSQLException {
85+
if (column.isSetBinaryVal()) return column.getBinaryVal().getValuesSize();
86+
if (column.isSetBoolVal()) return column.getBoolVal().getValuesSize();
87+
if (column.isSetByteVal()) return column.getByteVal().getValuesSize();
88+
if (column.isSetDoubleVal()) return column.getDoubleVal().getValuesSize();
89+
if (column.isSetI16Val()) return column.getI16Val().getValuesSize();
90+
if (column.isSetI32Val()) return column.getI32Val().getValuesSize();
91+
if (column.isSetI64Val()) return column.getI64Val().getValuesSize();
92+
if (column.isSetStringVal()) return column.getStringVal().getValuesSize();
93+
94+
throw new DatabricksSQLException(
95+
"Unsupported column type: " + column, DatabricksDriverErrorCode.UNSUPPORTED_OPERATION);
96+
}
97+
98+
private static ColumnAccessor createColumnAccessor(TColumn column) throws DatabricksSQLException {
99+
if (column.isSetBinaryVal()) {
100+
return new TypedColumnAccessor<>(
101+
column.getBinaryVal().getValues(), column.getBinaryVal().getNulls());
102+
}
103+
if (column.isSetBoolVal()) {
104+
return new TypedColumnAccessor<>(
105+
column.getBoolVal().getValues(), column.getBoolVal().getNulls());
106+
}
107+
if (column.isSetByteVal()) {
108+
return new TypedColumnAccessor<>(
109+
column.getByteVal().getValues(), column.getByteVal().getNulls());
110+
}
111+
if (column.isSetDoubleVal()) {
112+
return new TypedColumnAccessor<>(
113+
column.getDoubleVal().getValues(), column.getDoubleVal().getNulls());
114+
}
115+
if (column.isSetI16Val()) {
116+
return new TypedColumnAccessor<>(
117+
column.getI16Val().getValues(), column.getI16Val().getNulls());
118+
}
119+
if (column.isSetI32Val()) {
120+
return new TypedColumnAccessor<>(
121+
column.getI32Val().getValues(), column.getI32Val().getNulls());
122+
}
123+
if (column.isSetI64Val()) {
124+
return new TypedColumnAccessor<>(
125+
column.getI64Val().getValues(), column.getI64Val().getNulls());
126+
}
127+
if (column.isSetStringVal()) {
128+
return new TypedColumnAccessor<>(
129+
column.getStringVal().getValues(), column.getStringVal().getNulls());
130+
}
131+
132+
throw new DatabricksSQLException(
133+
"Unsupported column type: " + column, DatabricksDriverErrorCode.UNSUPPORTED_OPERATION);
134+
}
135+
136+
/** Interface for accessing column values by index without materializing the entire column. */
137+
private interface ColumnAccessor {
138+
Object getValue(int rowIndex);
139+
}
140+
141+
/** Memory-efficient column accessor that handles nulls and provides direct index-based access. */
142+
private static class TypedColumnAccessor<T> implements ColumnAccessor {
143+
private final List<T> values;
144+
private final BitSet nullBits;
145+
146+
public TypedColumnAccessor(List<T> values, byte[] nulls) {
147+
this.values = values;
148+
this.nullBits = nulls != null ? BitSet.valueOf(nulls) : null;
149+
}
150+
151+
@Override
152+
public Object getValue(int rowIndex) {
153+
if (nullBits != null && nullBits.get(rowIndex)) {
154+
return null;
155+
}
156+
return values.get(rowIndex);
157+
}
158+
}
159+
}

src/main/java/com/databricks/jdbc/api/impl/LazyThriftResult.java

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package com.databricks.jdbc.api.impl;
22

33
import static com.databricks.jdbc.common.EnvironmentVariables.DEFAULT_RESULT_ROW_LIMIT;
4-
import static com.databricks.jdbc.common.util.DatabricksThriftUtil.extractRowsFromColumnar;
4+
import static com.databricks.jdbc.common.util.DatabricksThriftUtil.createColumnarView;
55

66
import com.databricks.jdbc.api.internal.IDatabricksSession;
77
import com.databricks.jdbc.api.internal.IDatabricksStatementInternal;
@@ -10,13 +10,12 @@
1010
import com.databricks.jdbc.log.JdbcLoggerFactory;
1111
import com.databricks.jdbc.model.client.thrift.generated.TFetchResultsResp;
1212
import com.databricks.jdbc.model.telemetry.enums.DatabricksDriverErrorCode;
13-
import java.util.List;
1413

1514
public class LazyThriftResult implements IExecutionResult {
1615
private static final JdbcLogger LOGGER = JdbcLoggerFactory.getLogger(LazyThriftResult.class);
1716

1817
private TFetchResultsResp currentResponse;
19-
private List<List<Object>> currentBatch;
18+
private ColumnarRowView currentBatch;
2019
private int currentBatchIndex;
2120
private long globalRowIndex;
2221
private final IDatabricksSession session;
@@ -53,7 +52,7 @@ public LazyThriftResult(
5352
loadCurrentBatch();
5453
LOGGER.debug(
5554
"LazyThriftResult initialized with {} rows in first batch, hasMoreRows: {}",
56-
currentBatch.size(),
55+
currentBatch.getRowCount(),
5756
currentResponse.hasMoreRows);
5857
}
5958

@@ -75,16 +74,15 @@ public Object getObject(int columnIndex) throws DatabricksSQLException {
7574
throw new DatabricksSQLException(
7675
"Cursor is before first row", DatabricksDriverErrorCode.INVALID_STATE);
7776
}
78-
if (currentBatchIndex < 0 || currentBatchIndex >= currentBatch.size()) {
77+
if (currentBatchIndex < 0 || currentBatchIndex >= currentBatch.getRowCount()) {
7978
throw new DatabricksSQLException(
8079
"Invalid cursor position", DatabricksDriverErrorCode.INVALID_STATE);
8180
}
82-
List<Object> currentRowData = currentBatch.get(currentBatchIndex);
83-
if (columnIndex < 0 || columnIndex >= currentRowData.size()) {
81+
if (columnIndex < 0 || columnIndex >= currentBatch.getColumnCount()) {
8482
throw new DatabricksSQLException(
8583
"Column index out of bounds " + columnIndex, DatabricksDriverErrorCode.INVALID_STATE);
8684
}
87-
return currentRowData.get(columnIndex);
85+
return currentBatch.getValue(currentBatchIndex, columnIndex);
8886
}
8987

9088
/**
@@ -128,13 +126,13 @@ public boolean next() throws DatabricksSQLException {
128126
globalRowIndex++;
129127

130128
// Check if we need to fetch the next batch
131-
if (currentBatchIndex >= currentBatch.size()) {
129+
if (currentBatchIndex >= currentBatch.getRowCount()) {
132130
// Keep fetching until we get a non-empty batch or no more rows
133131
while (currentResponse.hasMoreRows) {
134132
fetchNextBatch();
135133

136134
// If we got a non-empty batch, we can proceed
137-
if (!currentBatch.isEmpty()) {
135+
if (currentBatch.getRowCount() > 0) {
138136
currentBatchIndex = 0; // Reset to first row of new batch
139137
break;
140138
}
@@ -143,7 +141,7 @@ public boolean next() throws DatabricksSQLException {
143141
}
144142

145143
// If we exited the loop and still have an empty batch, we've reached the end
146-
if (currentBatch.isEmpty()) {
144+
if (currentBatch.getRowCount() == 0) {
147145
hasReachedEnd = true;
148146
globalRowIndex--; // Revert the increment since we didn't actually move to a new row
149147
return false;
@@ -171,7 +169,7 @@ public boolean hasNext() {
171169
}
172170

173171
// Check if there are more rows in current batch
174-
if (currentBatchIndex + 1 < currentBatch.size()) {
172+
if (currentBatchIndex + 1 < currentBatch.getRowCount()) {
175173
return true;
176174
}
177175

@@ -196,7 +194,7 @@ public void close() {
196194
@Override
197195
public long getRowCount() {
198196
// Return the number of rows in the current batch
199-
return currentBatch != null ? currentBatch.size() : 0;
197+
return currentBatch != null ? currentBatch.getRowCount() : 0;
200198
}
201199

202200
/**
@@ -211,11 +209,13 @@ public long getChunkCount() {
211209
}
212210

213211
private void loadCurrentBatch() throws DatabricksSQLException {
214-
currentBatch = extractRowsFromColumnar(currentResponse.getResults());
212+
currentBatch = createColumnarView(currentResponse.getResults());
215213
currentBatchIndex = -1; // Reset batch index
216-
totalRowsFetched += currentBatch.size();
214+
totalRowsFetched += currentBatch.getRowCount();
217215
LOGGER.debug(
218-
"Loaded batch with {} rows, total fetched: {}", currentBatch.size(), totalRowsFetched);
216+
"Loaded batch with {} rows, total fetched: {}",
217+
currentBatch.getRowCount(),
218+
totalRowsFetched);
219219
}
220220

221221
private void fetchNextBatch() throws DatabricksSQLException {
@@ -226,7 +226,7 @@ private void fetchNextBatch() throws DatabricksSQLException {
226226

227227
LOGGER.debug(
228228
"Fetched batch with {} rows, hasMoreRows: {}",
229-
currentBatch.size(),
229+
currentBatch.getRowCount(),
230230
currentResponse.hasMoreRows);
231231
} catch (DatabricksSQLException e) {
232232
LOGGER.error("Failed to fetch next batch: {}", e.getMessage());

src/main/java/com/databricks/jdbc/common/util/DatabricksThriftUtil.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import static com.databricks.jdbc.common.util.DatabricksTypeUtil.*;
55
import static com.databricks.jdbc.model.client.thrift.generated.TTypeId.*;
66

7+
import com.databricks.jdbc.api.impl.ColumnarRowView;
78
import com.databricks.jdbc.api.internal.IDatabricksSession;
89
import com.databricks.jdbc.api.internal.IDatabricksStatementInternal;
910
import com.databricks.jdbc.common.DatabricksJdbcConstants;
@@ -125,6 +126,17 @@ public static List<List<Object>> extractRowsFromColumnar(TRowSet rowSet)
125126
return rows;
126127
}
127128

129+
/**
130+
* Memory-efficient alternative that creates a columnar view instead of materializing all rows.
131+
* Use this method to significantly reduce memory allocations when accessing row data.
132+
*
133+
* @param rowSet that contains columnar data
134+
* @return a columnar view that provides row-based access without full materialization
135+
*/
136+
public static ColumnarRowView createColumnarView(TRowSet rowSet) throws DatabricksSQLException {
137+
return new ColumnarRowView(rowSet);
138+
}
139+
128140
/** Returns statement status for given operation status response */
129141
public static StatementStatus getStatementStatus(TGetOperationStatusResp resp) {
130142
StatementState state = null;

0 commit comments

Comments
 (0)