diff --git a/NEXT_CHANGELOG.md b/NEXT_CHANGELOG.md index aca15bffb7..d9f75dd83f 100644 --- a/NEXT_CHANGELOG.md +++ b/NEXT_CHANGELOG.md @@ -11,7 +11,8 @@ - **Configurable SQL validation in isValid()**: Added `EnableSQLValidationForIsValid` connection property to control whether `isValid()` method executes an actual SQL query for server-side validation. Default value is 0. - Implement multi-row INSERT batching optimization for prepared statements to improve performance when executing large batches of INSERT operations. - Implement lazy/incremental fetching for columnar results when using Databricks JDBC in Thrift mode without Arrow support. The change modifies the behavior from buffering entire result sets in memory to maintaining only a limited number of rows at a time, reducing peak heap memory usage and preventing OutOfMemory errors. -- Added new artifact `databricks-jdbc-thin` for thin jar with runtime dependency metadata +- Added new artifact `databricks-jdbc-thin` for thin jar with runtime dependency metadata. +- Introduce a memory-efficient columnar data access mechanism for JDBC result processing. ### Updated - Databricks SDK dependency upgraded to latest version 0.60.0 diff --git a/src/main/java/com/databricks/jdbc/api/impl/ColumnarRowView.java b/src/main/java/com/databricks/jdbc/api/impl/ColumnarRowView.java new file mode 100644 index 0000000000..9c5b808104 --- /dev/null +++ b/src/main/java/com/databricks/jdbc/api/impl/ColumnarRowView.java @@ -0,0 +1,159 @@ +package com.databricks.jdbc.api.impl; + +import com.databricks.jdbc.exception.DatabricksSQLException; +import com.databricks.jdbc.model.client.thrift.generated.TColumn; +import com.databricks.jdbc.model.client.thrift.generated.TRowSet; +import com.databricks.jdbc.model.telemetry.enums.DatabricksDriverErrorCode; +import java.util.BitSet; +import java.util.List; + +/** + * Memory-efficient columnar view that provides row-based access without materializing all rows. + * Instead of creating List>, this class provides direct access to columnar data on a + * per-row, per-column basis, significantly reducing memory allocations. + */ +public class ColumnarRowView { + private final List columns; + private final int rowCount; + private final ColumnAccessor[] columnAccessors; + + public ColumnarRowView(TRowSet rowSet) throws DatabricksSQLException { + this.columns = rowSet != null ? rowSet.getColumns() : null; + + if (columns == null || columns.isEmpty()) { + this.rowCount = 0; + this.columnAccessors = new ColumnAccessor[0]; + } else { + this.rowCount = getRowCountFromFirstColumn(); + this.columnAccessors = new ColumnAccessor[columns.size()]; + for (int i = 0; i < columns.size(); i++) { + this.columnAccessors[i] = createColumnAccessor(columns.get(i)); + } + } + } + + /** Gets the number of rows in this view. */ + public int getRowCount() { + return rowCount; + } + + /** Gets the number of columns in this view. */ + public int getColumnCount() { + return columns != null ? columns.size() : 0; + } + + /** Gets the value at the specified row and column without materializing the entire row. */ + public Object getValue(int rowIndex, int columnIndex) throws DatabricksSQLException { + if (rowIndex < 0 || rowIndex >= rowCount) { + throw new DatabricksSQLException( + "Row index out of bounds: " + rowIndex, DatabricksDriverErrorCode.INVALID_STATE); + } + if (columnIndex < 0 || columnIndex >= columnAccessors.length) { + throw new DatabricksSQLException( + "Column index out of bounds: " + columnIndex, DatabricksDriverErrorCode.INVALID_STATE); + } + + return columnAccessors[columnIndex].getValue(rowIndex); + } + + /** + * Creates a materialized row only when explicitly requested (for backward compatibility). This + * should be avoided in performance-critical paths. + */ + public Object[] materializeRow(int rowIndex) throws DatabricksSQLException { + if (rowIndex < 0 || rowIndex >= rowCount) { + throw new DatabricksSQLException( + "Row index out of bounds: " + rowIndex, DatabricksDriverErrorCode.INVALID_STATE); + } + + Object[] row = new Object[columnAccessors.length]; + for (int col = 0; col < columnAccessors.length; col++) { + row[col] = columnAccessors[col].getValue(rowIndex); + } + return row; + } + + private int getRowCountFromFirstColumn() throws DatabricksSQLException { + if (columns.isEmpty()) { + return 0; + } + TColumn firstColumn = columns.get(0); + return getColumnSize(firstColumn); + } + + private static int getColumnSize(TColumn column) throws DatabricksSQLException { + if (column.isSetBinaryVal()) return column.getBinaryVal().getValuesSize(); + if (column.isSetBoolVal()) return column.getBoolVal().getValuesSize(); + if (column.isSetByteVal()) return column.getByteVal().getValuesSize(); + if (column.isSetDoubleVal()) return column.getDoubleVal().getValuesSize(); + if (column.isSetI16Val()) return column.getI16Val().getValuesSize(); + if (column.isSetI32Val()) return column.getI32Val().getValuesSize(); + if (column.isSetI64Val()) return column.getI64Val().getValuesSize(); + if (column.isSetStringVal()) return column.getStringVal().getValuesSize(); + + throw new DatabricksSQLException( + "Unsupported column type: " + column, DatabricksDriverErrorCode.UNSUPPORTED_OPERATION); + } + + private static ColumnAccessor createColumnAccessor(TColumn column) throws DatabricksSQLException { + if (column.isSetBinaryVal()) { + return new TypedColumnAccessor<>( + column.getBinaryVal().getValues(), column.getBinaryVal().getNulls()); + } + if (column.isSetBoolVal()) { + return new TypedColumnAccessor<>( + column.getBoolVal().getValues(), column.getBoolVal().getNulls()); + } + if (column.isSetByteVal()) { + return new TypedColumnAccessor<>( + column.getByteVal().getValues(), column.getByteVal().getNulls()); + } + if (column.isSetDoubleVal()) { + return new TypedColumnAccessor<>( + column.getDoubleVal().getValues(), column.getDoubleVal().getNulls()); + } + if (column.isSetI16Val()) { + return new TypedColumnAccessor<>( + column.getI16Val().getValues(), column.getI16Val().getNulls()); + } + if (column.isSetI32Val()) { + return new TypedColumnAccessor<>( + column.getI32Val().getValues(), column.getI32Val().getNulls()); + } + if (column.isSetI64Val()) { + return new TypedColumnAccessor<>( + column.getI64Val().getValues(), column.getI64Val().getNulls()); + } + if (column.isSetStringVal()) { + return new TypedColumnAccessor<>( + column.getStringVal().getValues(), column.getStringVal().getNulls()); + } + + throw new DatabricksSQLException( + "Unsupported column type: " + column, DatabricksDriverErrorCode.UNSUPPORTED_OPERATION); + } + + /** Interface for accessing column values by index without materializing the entire column. */ + private interface ColumnAccessor { + Object getValue(int rowIndex); + } + + /** Memory-efficient column accessor that handles nulls and provides direct index-based access. */ + private static class TypedColumnAccessor implements ColumnAccessor { + private final List values; + private final BitSet nullBits; + + public TypedColumnAccessor(List values, byte[] nulls) { + this.values = values; + this.nullBits = nulls != null ? BitSet.valueOf(nulls) : null; + } + + @Override + public Object getValue(int rowIndex) { + if (nullBits != null && nullBits.get(rowIndex)) { + return null; + } + return values.get(rowIndex); + } + } +} diff --git a/src/main/java/com/databricks/jdbc/api/impl/LazyThriftResult.java b/src/main/java/com/databricks/jdbc/api/impl/LazyThriftResult.java index d6b6581ebd..568b232aca 100644 --- a/src/main/java/com/databricks/jdbc/api/impl/LazyThriftResult.java +++ b/src/main/java/com/databricks/jdbc/api/impl/LazyThriftResult.java @@ -1,7 +1,7 @@ package com.databricks.jdbc.api.impl; import static com.databricks.jdbc.common.EnvironmentVariables.DEFAULT_RESULT_ROW_LIMIT; -import static com.databricks.jdbc.common.util.DatabricksThriftUtil.extractRowsFromColumnar; +import static com.databricks.jdbc.common.util.DatabricksThriftUtil.createColumnarView; import com.databricks.jdbc.api.internal.IDatabricksSession; import com.databricks.jdbc.api.internal.IDatabricksStatementInternal; @@ -10,13 +10,12 @@ import com.databricks.jdbc.log.JdbcLoggerFactory; import com.databricks.jdbc.model.client.thrift.generated.TFetchResultsResp; import com.databricks.jdbc.model.telemetry.enums.DatabricksDriverErrorCode; -import java.util.List; public class LazyThriftResult implements IExecutionResult { private static final JdbcLogger LOGGER = JdbcLoggerFactory.getLogger(LazyThriftResult.class); private TFetchResultsResp currentResponse; - private List> currentBatch; + private ColumnarRowView currentBatch; private int currentBatchIndex; private long globalRowIndex; private final IDatabricksSession session; @@ -53,7 +52,7 @@ public LazyThriftResult( loadCurrentBatch(); LOGGER.debug( "LazyThriftResult initialized with {} rows in first batch, hasMoreRows: {}", - currentBatch.size(), + currentBatch.getRowCount(), currentResponse.hasMoreRows); } @@ -75,16 +74,15 @@ public Object getObject(int columnIndex) throws DatabricksSQLException { throw new DatabricksSQLException( "Cursor is before first row", DatabricksDriverErrorCode.INVALID_STATE); } - if (currentBatchIndex < 0 || currentBatchIndex >= currentBatch.size()) { + if (currentBatchIndex < 0 || currentBatchIndex >= currentBatch.getRowCount()) { throw new DatabricksSQLException( "Invalid cursor position", DatabricksDriverErrorCode.INVALID_STATE); } - List currentRowData = currentBatch.get(currentBatchIndex); - if (columnIndex < 0 || columnIndex >= currentRowData.size()) { + if (columnIndex < 0 || columnIndex >= currentBatch.getColumnCount()) { throw new DatabricksSQLException( "Column index out of bounds " + columnIndex, DatabricksDriverErrorCode.INVALID_STATE); } - return currentRowData.get(columnIndex); + return currentBatch.getValue(currentBatchIndex, columnIndex); } /** @@ -128,13 +126,13 @@ public boolean next() throws DatabricksSQLException { globalRowIndex++; // Check if we need to fetch the next batch - if (currentBatchIndex >= currentBatch.size()) { + if (currentBatchIndex >= currentBatch.getRowCount()) { // Keep fetching until we get a non-empty batch or no more rows while (currentResponse.hasMoreRows) { fetchNextBatch(); // If we got a non-empty batch, we can proceed - if (!currentBatch.isEmpty()) { + if (currentBatch.getRowCount() > 0) { currentBatchIndex = 0; // Reset to first row of new batch break; } @@ -143,7 +141,7 @@ public boolean next() throws DatabricksSQLException { } // If we exited the loop and still have an empty batch, we've reached the end - if (currentBatch.isEmpty()) { + if (currentBatch.getRowCount() == 0) { hasReachedEnd = true; globalRowIndex--; // Revert the increment since we didn't actually move to a new row return false; @@ -171,7 +169,7 @@ public boolean hasNext() { } // Check if there are more rows in current batch - if (currentBatchIndex + 1 < currentBatch.size()) { + if (currentBatchIndex + 1 < currentBatch.getRowCount()) { return true; } @@ -196,7 +194,7 @@ public void close() { @Override public long getRowCount() { // Return the number of rows in the current batch - return currentBatch != null ? currentBatch.size() : 0; + return currentBatch != null ? currentBatch.getRowCount() : 0; } /** @@ -210,21 +208,18 @@ public long getChunkCount() { return 0; } - /** - * Loads the current response data into memory as a batch of rows. - * - * @throws DatabricksSQLException if the response data cannot be processed - */ private void loadCurrentBatch() throws DatabricksSQLException { - currentBatch = extractRowsFromColumnar(currentResponse.getResults()); + currentBatch = createColumnarView(currentResponse.getResults()); currentBatchIndex = -1; // Reset batch index - totalRowsFetched += currentBatch.size(); + totalRowsFetched += currentBatch.getRowCount(); LOGGER.debug( - "Loaded batch with {} rows, total fetched: {}", currentBatch.size(), totalRowsFetched); + "Loaded batch with {} rows, total fetched: {}", + currentBatch.getRowCount(), + totalRowsFetched); } /** - * Fetches the next batch of data from the server and loads it into memory. + * Fetches the next batch of data from the server and creates columnar views. * * @throws DatabricksSQLException if the fetch operation fails */ @@ -236,7 +231,7 @@ private void fetchNextBatch() throws DatabricksSQLException { LOGGER.debug( "Fetched batch with {} rows, hasMoreRows: {}", - currentBatch.size(), + currentBatch.getRowCount(), currentResponse.hasMoreRows); } catch (DatabricksSQLException e) { LOGGER.error("Failed to fetch next batch: {}", e.getMessage()); diff --git a/src/main/java/com/databricks/jdbc/common/util/DatabricksThriftUtil.java b/src/main/java/com/databricks/jdbc/common/util/DatabricksThriftUtil.java index e1a5dd185e..71f5aac2d4 100644 --- a/src/main/java/com/databricks/jdbc/common/util/DatabricksThriftUtil.java +++ b/src/main/java/com/databricks/jdbc/common/util/DatabricksThriftUtil.java @@ -4,6 +4,7 @@ import static com.databricks.jdbc.common.util.DatabricksTypeUtil.*; import static com.databricks.jdbc.model.client.thrift.generated.TTypeId.*; +import com.databricks.jdbc.api.impl.ColumnarRowView; import com.databricks.jdbc.api.internal.IDatabricksSession; import com.databricks.jdbc.api.internal.IDatabricksStatementInternal; import com.databricks.jdbc.common.DatabricksJdbcConstants; @@ -125,6 +126,17 @@ public static List> extractRowsFromColumnar(TRowSet rowSet) return rows; } + /** + * Memory-efficient alternative that creates a columnar view instead of materializing all rows. + * Use this method to significantly reduce memory allocations when accessing row data. + * + * @param rowSet that contains columnar data + * @return a columnar view that provides row-based access without full materialization + */ + public static ColumnarRowView createColumnarView(TRowSet rowSet) throws DatabricksSQLException { + return new ColumnarRowView(rowSet); + } + /** Returns statement status for given operation status response */ public static StatementStatus getStatementStatus(TGetOperationStatusResp resp) { StatementState state = null; diff --git a/src/test/java/com/databricks/jdbc/api/impl/ColumnarRowViewTest.java b/src/test/java/com/databricks/jdbc/api/impl/ColumnarRowViewTest.java new file mode 100644 index 0000000000..fad18cf0a2 --- /dev/null +++ b/src/test/java/com/databricks/jdbc/api/impl/ColumnarRowViewTest.java @@ -0,0 +1,171 @@ +package com.databricks.jdbc.api.impl; + +import static org.junit.jupiter.api.Assertions.*; + +import com.databricks.jdbc.exception.DatabricksSQLException; +import com.databricks.jdbc.model.client.thrift.generated.*; +import java.util.Arrays; +import java.util.Collections; +import org.junit.jupiter.api.Test; + +public class ColumnarRowViewTest { + + @Test + void testEmptyRowSet() throws DatabricksSQLException { + TRowSet emptyRowSet = new TRowSet(); + ColumnarRowView view = new ColumnarRowView(emptyRowSet); + + assertEquals(0, view.getRowCount()); + assertEquals(0, view.getColumnCount()); + } + + @Test + void testNullRowSet() throws DatabricksSQLException { + ColumnarRowView view = new ColumnarRowView(null); + + assertEquals(0, view.getRowCount()); + assertEquals(0, view.getColumnCount()); + } + + @Test + void testStringColumn() throws DatabricksSQLException { + TRowSet rowSet = new TRowSet(); + TColumn stringColumn = new TColumn(); + TStringColumn stringVal = new TStringColumn(); + stringVal.setValues(Arrays.asList("hello", "world", "test")); + stringColumn.setStringVal(stringVal); + + rowSet.setColumns(Collections.singletonList(stringColumn)); + + ColumnarRowView view = new ColumnarRowView(rowSet); + + assertEquals(3, view.getRowCount()); + assertEquals(1, view.getColumnCount()); + assertEquals("hello", view.getValue(0, 0)); + assertEquals("world", view.getValue(1, 0)); + assertEquals("test", view.getValue(2, 0)); + } + + @Test + void testStringColumnWithNulls() throws DatabricksSQLException { + TRowSet rowSet = new TRowSet(); + TColumn stringColumn = new TColumn(); + TStringColumn stringVal = new TStringColumn(); + stringVal.setValues(Arrays.asList("hello", "world", "test")); + // Set second value as null (bit 1 set in byte array) + stringVal.setNulls(new byte[] {0x02}); // Binary: 00000010 (bit 1 is set) + stringColumn.setStringVal(stringVal); + + rowSet.setColumns(Collections.singletonList(stringColumn)); + + ColumnarRowView view = new ColumnarRowView(rowSet); + + assertEquals(3, view.getRowCount()); + assertEquals(1, view.getColumnCount()); + assertEquals("hello", view.getValue(0, 0)); + assertNull(view.getValue(1, 0)); + assertEquals("test", view.getValue(2, 0)); + } + + @Test + void testIntegerColumn() throws DatabricksSQLException { + TRowSet rowSet = new TRowSet(); + TColumn intColumn = new TColumn(); + TI32Column intVal = new TI32Column(); + intVal.setValues(Arrays.asList(10, 20, 30)); + intColumn.setI32Val(intVal); + + rowSet.setColumns(Collections.singletonList(intColumn)); + + ColumnarRowView view = new ColumnarRowView(rowSet); + + assertEquals(3, view.getRowCount()); + assertEquals(1, view.getColumnCount()); + assertEquals(10, view.getValue(0, 0)); + assertEquals(20, view.getValue(1, 0)); + assertEquals(30, view.getValue(2, 0)); + } + + @Test + void testMultipleColumns() throws DatabricksSQLException { + TRowSet rowSet = new TRowSet(); + + // String column + TColumn stringColumn = new TColumn(); + TStringColumn stringVal = new TStringColumn(); + stringVal.setValues(Arrays.asList("a", "b")); + stringColumn.setStringVal(stringVal); + + // Integer column + TColumn intColumn = new TColumn(); + TI32Column intVal = new TI32Column(); + intVal.setValues(Arrays.asList(1, 2)); + intColumn.setI32Val(intVal); + + rowSet.setColumns(Arrays.asList(stringColumn, intColumn)); + + ColumnarRowView view = new ColumnarRowView(rowSet); + + assertEquals(2, view.getRowCount()); + assertEquals(2, view.getColumnCount()); + + // First row + assertEquals("a", view.getValue(0, 0)); + assertEquals(1, view.getValue(0, 1)); + + // Second row + assertEquals("b", view.getValue(1, 0)); + assertEquals(2, view.getValue(1, 1)); + } + + @Test + void testMaterializeRow() throws DatabricksSQLException { + TRowSet rowSet = new TRowSet(); + + // String column + TColumn stringColumn = new TColumn(); + TStringColumn stringVal = new TStringColumn(); + stringVal.setValues(Arrays.asList("hello", "world")); + stringColumn.setStringVal(stringVal); + + // Integer column + TColumn intColumn = new TColumn(); + TI32Column intVal = new TI32Column(); + intVal.setValues(Arrays.asList(100, 200)); + intColumn.setI32Val(intVal); + + rowSet.setColumns(Arrays.asList(stringColumn, intColumn)); + + ColumnarRowView view = new ColumnarRowView(rowSet); + + Object[] row0 = view.materializeRow(0); + assertArrayEquals(new Object[] {"hello", 100}, row0); + + Object[] row1 = view.materializeRow(1); + assertArrayEquals(new Object[] {"world", 200}, row1); + } + + @Test + void testOutOfBoundsAccess() throws DatabricksSQLException { + TRowSet rowSet = new TRowSet(); + TColumn stringColumn = new TColumn(); + TStringColumn stringVal = new TStringColumn(); + stringVal.setValues(Arrays.asList("test")); + stringColumn.setStringVal(stringVal); + rowSet.setColumns(Collections.singletonList(stringColumn)); + + ColumnarRowView view = new ColumnarRowView(rowSet); + + // Row out of bounds + assertThrows(DatabricksSQLException.class, () -> view.getValue(-1, 0)); + assertThrows(DatabricksSQLException.class, () -> view.getValue(1, 0)); + + // Column out of bounds + assertThrows(DatabricksSQLException.class, () -> view.getValue(0, -1)); + assertThrows(DatabricksSQLException.class, () -> view.getValue(0, 1)); + + // Materialize row out of bounds + assertThrows(DatabricksSQLException.class, () -> view.materializeRow(-1)); + assertThrows(DatabricksSQLException.class, () -> view.materializeRow(1)); + } +}