From 7afdb22167ca0f9d39041003e3deba8e7746f52d Mon Sep 17 00:00:00 2001 From: Jayant Singh Date: Thu, 28 Aug 2025 00:14:48 +0530 Subject: [PATCH 1/5] Introduce lazy fetch requests --- .../jdbc/api/impl/DatabricksResultSet.java | 28 ++ .../jdbc/api/impl/ExecutionResultFactory.java | 4 +- .../jdbc/api/impl/LazyThriftResult.java | 256 +++++++++++ .../impl/thrift/DatabricksThriftAccessor.java | 16 +- .../api/impl/ExecutionResultFactoryTest.java | 2 +- .../jdbc/api/impl/LazyThriftResultTest.java | 421 ++++++++++++++++++ 6 files changed, 716 insertions(+), 11 deletions(-) create mode 100644 src/main/java/com/databricks/jdbc/api/impl/LazyThriftResult.java create mode 100644 src/test/java/com/databricks/jdbc/api/impl/LazyThriftResultTest.java diff --git a/src/main/java/com/databricks/jdbc/api/impl/DatabricksResultSet.java b/src/main/java/com/databricks/jdbc/api/impl/DatabricksResultSet.java index 8d780fd5cd..61d7985f5e 100644 --- a/src/main/java/com/databricks/jdbc/api/impl/DatabricksResultSet.java +++ b/src/main/java/com/databricks/jdbc/api/impl/DatabricksResultSet.java @@ -583,6 +583,16 @@ public boolean isBeforeFirst() throws SQLException { return executionResult.getCurrentRow() == -1; } + /** + * {@inheritDoc} + * + *

Limitation: For lazy-loaded result sets ({@link LazyThriftResult}), particularly + * those using {@link + * com.databricks.jdbc.model.client.thrift.generated.TSparkRowSetType#COLUMN_BASED_SET}, this + * method cannot reliably determine the cursor position. The total row count remains unknown until + * all rows are fetched, preventing accurate detection of whether the cursor is after the last + * row. This is specific to Databricks JDBC dialect. + */ @Override public boolean isAfterLast() throws SQLException { checkIfClosed(); @@ -595,9 +605,27 @@ public boolean isFirst() throws SQLException { return executionResult.getCurrentRow() == 0; } + /** + * {@inheritDoc} + * + *

This method uses different strategies based on the result set type: + * + *

+ * + * @return {@code true} if the cursor is on the last row, {@code false} otherwise + * @throws SQLException if the result set is closed or an error occurs + */ @Override public boolean isLast() throws SQLException { checkIfClosed(); + if (executionResult instanceof LazyThriftResult) { + return !executionResult.hasNext(); + } return executionResult.getCurrentRow() == resultSetMetaData.getTotalRows() - 1; } diff --git a/src/main/java/com/databricks/jdbc/api/impl/ExecutionResultFactory.java b/src/main/java/com/databricks/jdbc/api/impl/ExecutionResultFactory.java index fb10bec039..4c719731de 100644 --- a/src/main/java/com/databricks/jdbc/api/impl/ExecutionResultFactory.java +++ b/src/main/java/com/databricks/jdbc/api/impl/ExecutionResultFactory.java @@ -1,7 +1,5 @@ package com.databricks.jdbc.api.impl; -import static com.databricks.jdbc.common.util.DatabricksThriftUtil.convertColumnarToRowBased; - import com.databricks.jdbc.api.impl.arrow.ArrowStreamResult; import com.databricks.jdbc.api.impl.volume.VolumeOperationResult; import com.databricks.jdbc.api.internal.IDatabricksSession; @@ -96,7 +94,7 @@ private static IExecutionResult getResultHandler( LOGGER.info("Processing result of format {} from Thrift server", resultFormat); switch (resultFormat) { case COLUMN_BASED_SET: - return getResultSet(convertColumnarToRowBased(resultsResp, parentStatement, session)); + return new LazyThriftResult(resultsResp, parentStatement, session); case ARROW_BASED_SET: return new ArrowStreamResult(resultsResp, true, parentStatement, session); case URL_BASED_SET: diff --git a/src/main/java/com/databricks/jdbc/api/impl/LazyThriftResult.java b/src/main/java/com/databricks/jdbc/api/impl/LazyThriftResult.java new file mode 100644 index 0000000000..03dd8d0d14 --- /dev/null +++ b/src/main/java/com/databricks/jdbc/api/impl/LazyThriftResult.java @@ -0,0 +1,256 @@ +package com.databricks.jdbc.api.impl; + +import static com.databricks.jdbc.common.EnvironmentVariables.DEFAULT_RESULT_ROW_LIMIT; +import static com.databricks.jdbc.common.util.DatabricksThriftUtil.extractRowsFromColumnar; + +import com.databricks.jdbc.api.internal.IDatabricksSession; +import com.databricks.jdbc.api.internal.IDatabricksStatementInternal; +import com.databricks.jdbc.exception.DatabricksSQLException; +import com.databricks.jdbc.log.JdbcLogger; +import com.databricks.jdbc.log.JdbcLoggerFactory; +import com.databricks.jdbc.model.client.thrift.generated.TFetchResultsResp; +import com.databricks.jdbc.model.telemetry.enums.DatabricksDriverErrorCode; +import java.util.List; + +public class LazyThriftResult implements IExecutionResult { + private static final JdbcLogger LOGGER = JdbcLoggerFactory.getLogger(LazyThriftResult.class); + + private TFetchResultsResp currentResponse; + private List> currentBatch; + private int currentBatchIndex; + private long globalRowIndex; + private final IDatabricksSession session; + private final IDatabricksStatementInternal statement; + private final int maxRows; + private boolean hasReachedEnd; + private boolean isClosed; + private long totalRowsFetched; + + /** + * Creates a new LazyThriftResult that lazily fetches data on demand. + * + * @param initialResponse the initial response from the server + * @param statement the statement that generated this result + * @param session the session to use for fetching additional data + * @throws DatabricksSQLException if the initial response cannot be processed + */ + public LazyThriftResult( + TFetchResultsResp initialResponse, + IDatabricksStatementInternal statement, + IDatabricksSession session) + throws DatabricksSQLException { + this.currentResponse = initialResponse; + this.statement = statement; + this.session = session; + this.maxRows = statement != null ? statement.getMaxRows() : DEFAULT_RESULT_ROW_LIMIT; + this.globalRowIndex = -1; + this.currentBatchIndex = -1; + this.hasReachedEnd = false; + this.isClosed = false; + this.totalRowsFetched = 0; + + // Load initial batch + loadCurrentBatch(); + LOGGER.debug( + "LazyThriftResult initialized with {} rows in first batch, hasMoreRows: {}", + currentBatch.size(), + currentResponse.hasMoreRows); + } + + /** + * Gets the value at the specified column index for the current row. + * + * @param columnIndex the zero-based column index + * @return the value at the specified column + * @throws DatabricksSQLException if the result is closed, cursor is invalid, or column index is + * out of bounds + */ + @Override + public Object getObject(int columnIndex) throws DatabricksSQLException { + if (isClosed) { + throw new DatabricksSQLException( + "Result is already closed", DatabricksDriverErrorCode.STATEMENT_CLOSED); + } + if (globalRowIndex == -1) { + throw new DatabricksSQLException( + "Cursor is before first row", DatabricksDriverErrorCode.INVALID_STATE); + } + if (currentBatchIndex < 0 || currentBatchIndex >= currentBatch.size()) { + throw new DatabricksSQLException( + "Invalid cursor position", DatabricksDriverErrorCode.INVALID_STATE); + } + List currentRowData = currentBatch.get(currentBatchIndex); + if (columnIndex < 0 || columnIndex >= currentRowData.size()) { + throw new DatabricksSQLException( + "Column index out of bounds " + columnIndex, DatabricksDriverErrorCode.INVALID_STATE); + } + return currentRowData.get(columnIndex); + } + + /** + * Gets the current row index (0-based). Returns -1 if before the first row. + * + * @return the current row index + */ + @Override + public long getCurrentRow() { + return globalRowIndex; + } + + /** + * Moves the cursor to the next row. Fetches additional data from server if needed. + * + * @return true if there is a next row, false if at the end + * @throws DatabricksSQLException if an error occurs while fetching data + */ + @Override + public boolean next() throws DatabricksSQLException { + if (isClosed || hasReachedEnd) { + return false; + } + + if (!hasNext()) { + // Ideally the client code should first call, hasNext() and then next() + // However, the client code like in DatabricksResultSet#next directly calls next + // So, this is a safeguard to ensure we don't move past the end + return false; + } + + // Check if we've reached the maxRows limit + boolean hasRowLimit = maxRows > 0; + if (hasRowLimit && globalRowIndex + 1 >= maxRows) { + hasReachedEnd = true; + return false; + } + + // Move to next row in current batch + currentBatchIndex++; + globalRowIndex++; + + // Check if we need to fetch the next batch + if (currentBatchIndex >= currentBatch.size()) { + // Keep fetching until we get a non-empty batch or no more rows + while (currentResponse.hasMoreRows) { + fetchNextBatch(); + + // If we got a non-empty batch, we can proceed + if (!currentBatch.isEmpty()) { + currentBatchIndex = 0; // Reset to first row of new batch + break; + } + + // If batch is still empty but hasMoreRows is false after fetch, we'll exit the loop + } + + // If we exited the loop and still have an empty batch, we've reached the end + if (currentBatch.isEmpty()) { + hasReachedEnd = true; + globalRowIndex--; // Revert the increment since we didn't actually move to a new row + return false; + } + } + + return true; + } + + /** + * Checks if there are more rows available without advancing the cursor. + * + * @return true if there are more rows, false otherwise + */ + @Override + public boolean hasNext() { + if (isClosed || hasReachedEnd) { + return false; + } + + // Check maxRows limit + boolean hasRowLimit = maxRows > 0; + if (hasRowLimit && globalRowIndex + 1 >= maxRows) { + return false; + } + + // Check if there are more rows in current batch + if (currentBatchIndex + 1 < currentBatch.size()) { + return true; + } + + // Check if there are more batches to fetch + return currentResponse.hasMoreRows; + } + + /** Closes this result and releases associated resources. */ + @Override + public void close() { + this.isClosed = true; + this.currentBatch = null; + this.currentResponse = null; + LOGGER.debug("LazyThriftResult closed after fetching {} total rows", totalRowsFetched); + } + + /** + * Gets the number of rows in the current batch. + * + * @return the number of rows in the current batch + */ + @Override + public long getRowCount() { + // Return the number of rows in the current batch + return currentBatch != null ? currentBatch.size() : 0; + } + + /** + * Gets the chunk count. Always returns 0 for thrift columnar results. + * + * @return 0 (thrift results don't use chunks like Arrow) + */ + @Override + public long getChunkCount() { + // For thrift columnar results, we don't have chunks in the same sense as Arrow + return 0; + } + + private void loadCurrentBatch() throws DatabricksSQLException { + currentBatch = extractRowsFromColumnar(currentResponse.getResults()); + currentBatchIndex = -1; // Reset batch index + totalRowsFetched += currentBatch.size(); + LOGGER.debug( + "Loaded batch with {} rows, total fetched: {}", currentBatch.size(), totalRowsFetched); + } + + private void fetchNextBatch() throws DatabricksSQLException { + try { + LOGGER.debug("Fetching next batch, current total rows fetched: {}", totalRowsFetched); + currentResponse = session.getDatabricksClient().getMoreResults(statement); + loadCurrentBatch(); + + LOGGER.debug( + "Fetched batch with {} rows, hasMoreRows: {}", + currentBatch.size(), + currentResponse.hasMoreRows); + } catch (DatabricksSQLException e) { + LOGGER.error("Failed to fetch next batch: {}", e.getMessage()); + hasReachedEnd = true; + throw e; // Propagate exception to fail fast + } + } + + /** + * Gets the total number of rows fetched from the server so far. This is different from + * getRowCount() which returns current batch size. + * + * @return the total number of rows fetched from the server + */ + public long getTotalRowsFetched() { + return totalRowsFetched; + } + + /** + * Checks if all data has been fetched from the server. + * + * @return true if all data has been fetched (either reached end or maxRows limit) + */ + public boolean isCompletelyFetched() { + return hasReachedEnd || !currentResponse.hasMoreRows; + } +} diff --git a/src/main/java/com/databricks/jdbc/dbclient/impl/thrift/DatabricksThriftAccessor.java b/src/main/java/com/databricks/jdbc/dbclient/impl/thrift/DatabricksThriftAccessor.java index 314abd19ad..f4ca5c1aef 100644 --- a/src/main/java/com/databricks/jdbc/dbclient/impl/thrift/DatabricksThriftAccessor.java +++ b/src/main/java/com/databricks/jdbc/dbclient/impl/thrift/DatabricksThriftAccessor.java @@ -253,7 +253,7 @@ DatabricksResultSet execute( getResultSetResp( response.getStatus(), response.getOperationHandle(), - response.toString(), + "executeStatement", maxRowsPerBlock, true); long fetchEndTime = System.nanoTime(); @@ -294,7 +294,9 @@ private TGetOperationStatusResp pollTillOperationFinished( TGetOperationStatusResp statusResp = null; if (response.isSetDirectResults()) { checkDirectResultsForErrorStatus( - response.getDirectResults(), response.toString(), statementId.toSQLExecStatementId()); + response.getDirectResults(), + "executeStatement DirectResults", + statementId.toSQLExecStatementId()); statusResp = response.getDirectResults().getOperationStatus(); checkOperationStatusForErrors( statusResp, StatementId.loggableStatementId(response.getOperationHandle())); @@ -409,7 +411,7 @@ DatabricksResultSet getStatementResult( if (operationState == TOperationState.FINISHED_STATE) { long fetchStartTime = System.nanoTime(); resultSet = - getResultSetResp(response.getStatus(), operationHandle, response.toString(), -1, true); + getResultSetResp(response.getStatus(), operationHandle, "getStatementResult", -1, true); long fetchEndTime = System.nanoTime(); long fetchLatencyNanos = fetchEndTime - fetchStartTime; long fetchLatencyMillis = fetchLatencyNanos / 1_000_000; @@ -523,16 +525,16 @@ TFetchResultsResp getResultSetResp( } catch (TException e) { String errorMessage = String.format( - "Error while fetching results from Thrift server. Request {%s}, Error {%s}", - request.toString(), e.getMessage()); + "Error while fetching results from Thrift server. Request maxRows=%d, maxBytes=%d, Error {%s}", + request.getMaxRows(), request.getMaxBytes(), e.getMessage()); LOGGER.error(e, errorMessage); throw new DatabricksHttpException(errorMessage, e, DatabricksDriverErrorCode.INVALID_STATE); } verifySuccessStatus( response.getStatus(), String.format( - "Error while fetching results Request {%s}. TFetchResultsResp {%s}. ", - request, response), + "Error while fetching results Request maxRows=%d, maxBytes=%d. Response hasMoreRows=%s", + request.getMaxRows(), request.getMaxBytes(), response.hasMoreRows), statementId); return response; } diff --git a/src/test/java/com/databricks/jdbc/api/impl/ExecutionResultFactoryTest.java b/src/test/java/com/databricks/jdbc/api/impl/ExecutionResultFactoryTest.java index 25e0aad1f6..3046f5eda5 100644 --- a/src/test/java/com/databricks/jdbc/api/impl/ExecutionResultFactoryTest.java +++ b/src/test/java/com/databricks/jdbc/api/impl/ExecutionResultFactoryTest.java @@ -101,7 +101,7 @@ public void testGetResultSet_thriftColumnar() throws SQLException { when(fetchResultsResp.getResultSetMetadata()).thenReturn(resultSetMetadataResp); IExecutionResult result = ExecutionResultFactory.getResultSet(fetchResultsResp, session, parentStatement); - assertInstanceOf(InlineJsonResult.class, result); + assertInstanceOf(LazyThriftResult.class, result); } @Test diff --git a/src/test/java/com/databricks/jdbc/api/impl/LazyThriftResultTest.java b/src/test/java/com/databricks/jdbc/api/impl/LazyThriftResultTest.java new file mode 100644 index 0000000000..5fee41b547 --- /dev/null +++ b/src/test/java/com/databricks/jdbc/api/impl/LazyThriftResultTest.java @@ -0,0 +1,421 @@ +package com.databricks.jdbc.api.impl; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.Mockito.*; +import static org.mockito.Mockito.lenient; + +import com.databricks.jdbc.api.internal.IDatabricksSession; +import com.databricks.jdbc.api.internal.IDatabricksStatementInternal; +import com.databricks.jdbc.dbclient.IDatabricksClient; +import com.databricks.jdbc.exception.DatabricksSQLException; +import com.databricks.jdbc.model.client.thrift.generated.*; +import com.databricks.jdbc.model.telemetry.enums.DatabricksDriverErrorCode; +import java.util.*; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +public class LazyThriftResultTest { + + @Mock private IDatabricksSession session; + @Mock private IDatabricksStatementInternal statement; + @Mock private IDatabricksClient databricksClient; + + @BeforeEach + void setUp() throws DatabricksSQLException { + // Lenient stubbing to avoid unnecessary strictness + lenient().when(session.getDatabricksClient()).thenReturn(databricksClient); + lenient().when(statement.getMaxRows()).thenReturn(0); // No limit by default + } + + @Test + void testConstructorWithEmptyResult() throws DatabricksSQLException { + TFetchResultsResp emptyResponse = createEmptyResponse(false); + + LazyThriftResult result = new LazyThriftResult(emptyResponse, statement, session); + + assertEquals(0, result.getRowCount()); + assertEquals(0, result.getTotalRowsFetched()); + assertFalse(result.hasNext()); + assertFalse(result.next()); + assertTrue(result.isCompletelyFetched()); + assertEquals(-1, result.getCurrentRow()); + } + + @Test + void testBasicIteration() throws DatabricksSQLException { + TFetchResultsResp response = + createResponseWithStringData( + Arrays.asList("row1_col1", "row1_col2"), + Arrays.asList("row2_col1", "row2_col2"), + false); // hasMoreRows = false + + LazyThriftResult result = new LazyThriftResult(response, statement, session); + + // Test initial state + assertEquals(-1, result.getCurrentRow()); + assertEquals(2, result.getRowCount()); + assertEquals(2, result.getTotalRowsFetched()); + assertTrue(result.hasNext()); + + // Test first row + assertTrue(result.next()); + assertEquals(0, result.getCurrentRow()); + assertEquals("row1_col1", result.getObject(0)); + assertEquals("row1_col2", result.getObject(1)); + + // Test second row + assertTrue(result.hasNext()); + assertTrue(result.next()); + assertEquals(1, result.getCurrentRow()); + assertEquals("row2_col1", result.getObject(0)); + assertEquals("row2_col2", result.getObject(1)); + + // Test end of data + assertFalse(result.hasNext()); + assertFalse(result.next()); + assertTrue(result.isCompletelyFetched()); + } + + @Test + void testLazyFetching() throws DatabricksSQLException { + // Setup first batch + TFetchResultsResp firstBatch = + createResponseWithStringData( + Arrays.asList("row1_col1", "row1_col2"), + Arrays.asList("row2_col1", "row2_col2"), + true); // hasMoreRows = true + + // Setup second batch + TFetchResultsResp secondBatch = + createResponseWithStringData( + Arrays.asList("row3_col1", "row3_col2"), + Arrays.asList("row4_col1", "row4_col2"), + false); // hasMoreRows = false + + when(databricksClient.getMoreResults(statement)).thenReturn(secondBatch); + + LazyThriftResult result = new LazyThriftResult(firstBatch, statement, session); + + // Consume first batch + assertTrue(result.next()); // row 1 + assertEquals("row1_col1", result.getObject(0)); + assertTrue(result.next()); // row 2 + assertEquals("row2_col1", result.getObject(0)); + assertEquals(2, result.getTotalRowsFetched()); // Only first batch loaded so far + + // This should trigger lazy loading of second batch + assertTrue(result.next()); // row 3 (from second batch) + verify(databricksClient, times(1)).getMoreResults(statement); + assertEquals(2, result.getCurrentRow()); + assertEquals("row3_col1", result.getObject(0)); + assertEquals("row3_col2", result.getObject(1)); + assertEquals(4, result.getTotalRowsFetched()); // Total from both batches + + // Continue to last row + assertTrue(result.next()); // row 4 + assertEquals(3, result.getCurrentRow()); + assertEquals("row4_col1", result.getObject(0)); + + // Should be end of data + assertFalse(result.next()); + assertTrue(result.isCompletelyFetched()); + } + + @Test + void testEmptyBatchHandling() throws DatabricksSQLException { + // Setup first batch (empty) + TFetchResultsResp emptyBatch = createEmptyResponse(true); // hasMoreRows = true + + // Setup second batch (with data) + TFetchResultsResp dataBatch = + createResponseWithStringData( + Arrays.asList("row1_col1", "row1_col2"), false); // hasMoreRows = false + + when(databricksClient.getMoreResults(statement)).thenReturn(dataBatch); + + LazyThriftResult result = new LazyThriftResult(emptyBatch, statement, session); + + // Should have no data initially but hasNext should be true due to hasMoreRows + assertEquals(0, result.getRowCount()); + assertTrue(result.hasNext()); // Should be true because hasMoreRows = true + + // This should fetch through the empty batch to get data + assertTrue(result.next()); + verify(databricksClient, times(1)).getMoreResults(statement); + assertEquals("row1_col1", result.getObject(0)); + assertEquals(1, result.getTotalRowsFetched()); + } + + @Test + void testMultipleEmptyBatches() throws DatabricksSQLException { + // Setup first batch (empty) + TFetchResultsResp emptyBatch1 = createEmptyResponse(true); + + // Setup second batch (empty) + TFetchResultsResp emptyBatch2 = createEmptyResponse(true); + + // Setup third batch (with data) + TFetchResultsResp dataBatch = + createResponseWithStringData( + Arrays.asList("final_col1", "final_col2"), false); // hasMoreRows = false + + when(databricksClient.getMoreResults(statement)).thenReturn(emptyBatch2).thenReturn(dataBatch); + + LazyThriftResult result = new LazyThriftResult(emptyBatch1, statement, session); + + // Should skip through multiple empty batches + assertTrue(result.next()); + verify(databricksClient, times(2)).getMoreResults(statement); + assertEquals("final_col1", result.getObject(0)); + } + + @Test + void testMaxRowsLimit() throws DatabricksSQLException { + when(statement.getMaxRows()).thenReturn(2); // Limit to 2 rows + + TFetchResultsResp response = + createResponseWithStringData( + Arrays.asList("row1_col1", "row1_col2"), + Arrays.asList("row2_col1", "row2_col2"), + Arrays.asList("row3_col1", "row3_col2"), + true); // hasMoreRows = true + + LazyThriftResult result = new LazyThriftResult(response, statement, session); + + // Should only get first 2 rows due to maxRows limit + assertTrue(result.next()); + assertEquals("row1_col1", result.getObject(0)); + + assertTrue(result.next()); + assertEquals("row2_col1", result.getObject(0)); + + // Should not proceed to next row due to limit + assertFalse(result.hasNext()); + assertFalse(result.next()); + + // Should not attempt to fetch more results from server + verify(databricksClient, never()).getMoreResults(any()); + } + + @Test + void testMaxRowsLimitAcrossBatches() throws DatabricksSQLException { + when(statement.getMaxRows()).thenReturn(3); // Limit to 3 rows + + // First batch has 2 rows + TFetchResultsResp firstBatch = + createResponseWithStringData( + Arrays.asList("row1_col1", "row1_col2"), + Arrays.asList("row2_col1", "row2_col2"), + true); // hasMoreRows = true + + // Second batch has 2 rows + TFetchResultsResp secondBatch = + createResponseWithStringData( + Arrays.asList("row3_col1", "row3_col2"), + Arrays.asList("row4_col1", "row4_col2"), + false); // hasMoreRows = false + + when(databricksClient.getMoreResults(statement)).thenReturn(secondBatch); + + LazyThriftResult result = new LazyThriftResult(firstBatch, statement, session); + + // Consume first batch + assertTrue(result.next()); // row 1 + assertTrue(result.next()); // row 2 + + // Should get one more row from second batch then stop + assertTrue(result.next()); // row 3 + assertEquals("row3_col1", result.getObject(0)); + + // Should stop due to maxRows limit + assertFalse(result.hasNext()); + assertFalse(result.next()); + assertEquals(4, result.getTotalRowsFetched()); // 2 from first batch + 2 from second batch + } + + @Test + void testErrorHandlingDuringFetch() throws DatabricksSQLException { + TFetchResultsResp firstBatch = + createResponseWithStringData( + Arrays.asList("row1_col1", "row1_col2"), true); // hasMoreRows = true + + DatabricksSQLException expectedException = + new DatabricksSQLException("Network error", DatabricksDriverErrorCode.CONNECTION_ERROR); + when(databricksClient.getMoreResults(statement)).thenThrow(expectedException); + + LazyThriftResult result = new LazyThriftResult(firstBatch, statement, session); + + // Consume first batch + assertTrue(result.next()); + + // This should trigger the exception + DatabricksSQLException thrown = assertThrows(DatabricksSQLException.class, result::next); + assertEquals("Network error", thrown.getMessage()); + assertTrue(result.isCompletelyFetched()); // Should be marked as complete due to error + } + + @Test + void testClosedResultAccess() throws DatabricksSQLException { + TFetchResultsResp response = + createResponseWithStringData(Arrays.asList("row1_col1", "row1_col2"), false); + + LazyThriftResult result = new LazyThriftResult(response, statement, session); + result.next(); // Position on first row + result.close(); + + // Should throw exception when trying to access after close + assertThrows(DatabricksSQLException.class, () -> result.getObject(0)); + assertFalse(result.hasNext()); + assertFalse(result.next()); + } + + @Test + void testInvalidColumnIndex() throws DatabricksSQLException { + TFetchResultsResp response = + createResponseWithStringData( + Arrays.asList("col1", "col2"), // 2 columns + false); + + LazyThriftResult result = new LazyThriftResult(response, statement, session); + result.next(); // Move to first row + + // Valid indices + assertDoesNotThrow(() -> result.getObject(0)); + assertDoesNotThrow(() -> result.getObject(1)); + + // Invalid indices + assertThrows(DatabricksSQLException.class, () -> result.getObject(2)); + assertThrows(DatabricksSQLException.class, () -> result.getObject(-1)); + } + + @Test + void testAccessBeforeFirstRow() throws DatabricksSQLException { + TFetchResultsResp response = + createResponseWithStringData(Arrays.asList("row1_col1", "row1_col2"), false); + + LazyThriftResult result = new LazyThriftResult(response, statement, session); + + // Should throw exception when trying to access before calling next() + DatabricksSQLException thrown = + assertThrows(DatabricksSQLException.class, () -> result.getObject(0)); + assertTrue(thrown.getMessage().contains("before first row")); + } + + @Test + void testNullHandling() throws DatabricksSQLException { + TFetchResultsResp response = createResponseWithNulls(); + + LazyThriftResult result = new LazyThriftResult(response, statement, session); + + assertTrue(result.next()); + assertEquals("value1", result.getObject(0)); + assertNull(result.getObject(1)); // Should be null + } + + @Test + void testChunkCount() throws DatabricksSQLException { + TFetchResultsResp response = createEmptyResponse(false); + LazyThriftResult result = new LazyThriftResult(response, statement, session); + + // LazyThriftResult doesn't use chunks like Arrow results + assertEquals(0, result.getChunkCount()); + } + + // Helper methods for creating test data + + private TFetchResultsResp createEmptyResponse(boolean hasMoreRows) { + TFetchResultsResp response = new TFetchResultsResp(); + response.hasMoreRows = hasMoreRows; + + TRowSet emptyRowSet = new TRowSet(); + emptyRowSet.setColumns(Collections.emptyList()); + response.setResults(emptyRowSet); + + return response; + } + + private TFetchResultsResp createResponseWithStringData(List row, boolean hasMoreRows) { + return createResponseWithStringRows(Arrays.asList(row), hasMoreRows); + } + + private TFetchResultsResp createResponseWithStringData( + List row1, List row2, boolean hasMoreRows) { + return createResponseWithStringRows(Arrays.asList(row1, row2), hasMoreRows); + } + + private TFetchResultsResp createResponseWithStringData( + List row1, List row2, List row3, boolean hasMoreRows) { + return createResponseWithStringRows(Arrays.asList(row1, row2, row3), hasMoreRows); + } + + private TFetchResultsResp createResponseWithStringRows( + List> rows, boolean hasMoreRows) { + TFetchResultsResp response = new TFetchResultsResp(); + response.hasMoreRows = hasMoreRows; + + if (rows.isEmpty()) { + return createEmptyResponse(hasMoreRows); + } + + TRowSet rowSet = new TRowSet(); + int numColumns = rows.get(0).size(); + List columns = new ArrayList<>(numColumns); + + // Create a column for each column index + for (int col = 0; col < numColumns; col++) { + TColumn column = new TColumn(); + TStringColumn stringCol = new TStringColumn(); + List colValues = new ArrayList<>(); + + // Extract values for this column from all rows + for (List row : rows) { + if (col < row.size()) { + colValues.add(row.get(col)); + } else { + colValues.add(null); + } + } + + stringCol.setValues(colValues); + column.setStringVal(stringCol); + columns.add(column); + } + + rowSet.setColumns(columns); + response.setResults(rowSet); + + return response; + } + + private TFetchResultsResp createResponseWithNulls() { + TFetchResultsResp response = new TFetchResultsResp(); + response.hasMoreRows = false; + + TRowSet rowSet = new TRowSet(); + List columns = new ArrayList<>(); + + // First column - no nulls + TColumn col1 = new TColumn(); + TStringColumn stringCol1 = new TStringColumn(); + stringCol1.setValues(Arrays.asList("value1")); + col1.setStringVal(stringCol1); + columns.add(col1); + + // Second column - with null + TColumn col2 = new TColumn(); + TStringColumn stringCol2 = new TStringColumn(); + stringCol2.setValues(Arrays.asList("placeholder")); // Actual value doesn't matter + stringCol2.setNulls(new byte[] {0x01}); // First bit set = null + col2.setStringVal(stringCol2); + columns.add(col2); + + rowSet.setColumns(columns); + response.setResults(rowSet); + + return response; + } +} From 76e36e34188d25f41e70c032f493fbc54635c822 Mon Sep 17 00:00:00 2001 From: Jayant Singh Date: Thu, 28 Aug 2025 13:34:53 +0530 Subject: [PATCH 2/5] temp benchmarking --- .../client/jdbc/DatabricksDriverExamples.java | 35 ++++++++++--------- 1 file changed, 19 insertions(+), 16 deletions(-) diff --git a/src/test/java/com/databricks/client/jdbc/DatabricksDriverExamples.java b/src/test/java/com/databricks/client/jdbc/DatabricksDriverExamples.java index a544ee63d3..e0523ff7f8 100644 --- a/src/test/java/com/databricks/client/jdbc/DatabricksDriverExamples.java +++ b/src/test/java/com/databricks/client/jdbc/DatabricksDriverExamples.java @@ -49,8 +49,8 @@ public class DatabricksDriverExamples { private static final String JDBC_URL_WAREHOUSE = - "jdbc:databricks://sample-host.cloud.databricks.com:9999/default;" - + "transportMode=https;ssl=1;AuthMech=3;httpPath=/sql/1.0/warehouses/999999999999;"; + "jdbc:databricks://benchmarking-staging-aws-us-west-2.staging.cloud.databricks.com:443/default;" + + "transportMode=http;ssl=1;AuthMech=3;httpPath=/sql/1.0/warehouses/0112a04cba39da27;"; private static final String JDBC_URL_CLUSTER = "jdbc:databricks://sample-host.cloud.databricks.com:9999/default;" + "transportMode=http;ssl=1;httpPath=sql/protocolv1/o/9999999999999999/9999999999999999;AuthMech=3;"; @@ -107,16 +107,16 @@ public void printResultSet(ResultSet resultSet) throws SQLException { // Print row data while (resultSet.next()) { rows++; - for (int i = 1; i <= columnsNumber; i++) { - try { - Object columnValue = resultSet.getObject(i); - System.out.print(columnValue + "\t\t"); - } catch (Exception e) { - // Certain columns might be absent or throw exceptions in edge cases - System.out.print("NULL\t\t"); - } - } - System.out.println(); + // for (int i = 1; i <= columnsNumber; i++) { + // try { + // Object columnValue = resultSet.getObject(i); + // System.out.print(columnValue + "\t\t"); + // } catch (Exception e) { + // // Certain columns might be absent or throw exceptions in edge cases + // System.out.print("NULL\t\t"); + // } + // } + // System.out.println(); } System.out.println("Total rows: " + rows); } @@ -143,11 +143,14 @@ void exampleDefaultStringColumnLength() throws Exception { void exampleRowsFetchedPerBlock() throws Exception { // Register the Databricks JDBC driver DriverManager.registerDriver(new Driver()); - String jdbcUrl = - JDBC_URL_WAREHOUSE + "EnableTelemetry=1" + ";enableArrow=0" + ";RowsFetchedPerBlock=3"; - Connection con = DriverManager.getConnection(jdbcUrl, "token", DATABRICKS_TOKEN); + String jdbcUrl = JDBC_URL_WAREHOUSE + "EnableTelemetry=1" + ";enableArrow=0;RowsFetchedPerBlock=5000"; + Connection con = + DriverManager.getConnection(jdbcUrl, "token", "token"); Statement stmt = con.createStatement(); - ResultSet rs = stmt.executeQuery("SELECT * FROM RANGE(12)"); // 4 FetchResults calls made + ResultSet rs = + stmt.executeQuery( + "SELECT * FROM main.tpch_sf10000_delta.customer LIMIT 1000000"); // 4 FetchResults calls + // made printResultSet(rs); stmt.close(); rs.close(); From 29197e89801ac854a1a036d581660688ad154093 Mon Sep 17 00:00:00 2001 From: Jayant Singh Date: Thu, 28 Aug 2025 16:02:59 +0530 Subject: [PATCH 3/5] Revert "temp benchmarking" This reverts commit 76e36e34188d25f41e70c032f493fbc54635c822. --- .../client/jdbc/DatabricksDriverExamples.java | 35 +++++++++---------- 1 file changed, 16 insertions(+), 19 deletions(-) diff --git a/src/test/java/com/databricks/client/jdbc/DatabricksDriverExamples.java b/src/test/java/com/databricks/client/jdbc/DatabricksDriverExamples.java index e0523ff7f8..a544ee63d3 100644 --- a/src/test/java/com/databricks/client/jdbc/DatabricksDriverExamples.java +++ b/src/test/java/com/databricks/client/jdbc/DatabricksDriverExamples.java @@ -49,8 +49,8 @@ public class DatabricksDriverExamples { private static final String JDBC_URL_WAREHOUSE = - "jdbc:databricks://benchmarking-staging-aws-us-west-2.staging.cloud.databricks.com:443/default;" - + "transportMode=http;ssl=1;AuthMech=3;httpPath=/sql/1.0/warehouses/0112a04cba39da27;"; + "jdbc:databricks://sample-host.cloud.databricks.com:9999/default;" + + "transportMode=https;ssl=1;AuthMech=3;httpPath=/sql/1.0/warehouses/999999999999;"; private static final String JDBC_URL_CLUSTER = "jdbc:databricks://sample-host.cloud.databricks.com:9999/default;" + "transportMode=http;ssl=1;httpPath=sql/protocolv1/o/9999999999999999/9999999999999999;AuthMech=3;"; @@ -107,16 +107,16 @@ public void printResultSet(ResultSet resultSet) throws SQLException { // Print row data while (resultSet.next()) { rows++; - // for (int i = 1; i <= columnsNumber; i++) { - // try { - // Object columnValue = resultSet.getObject(i); - // System.out.print(columnValue + "\t\t"); - // } catch (Exception e) { - // // Certain columns might be absent or throw exceptions in edge cases - // System.out.print("NULL\t\t"); - // } - // } - // System.out.println(); + for (int i = 1; i <= columnsNumber; i++) { + try { + Object columnValue = resultSet.getObject(i); + System.out.print(columnValue + "\t\t"); + } catch (Exception e) { + // Certain columns might be absent or throw exceptions in edge cases + System.out.print("NULL\t\t"); + } + } + System.out.println(); } System.out.println("Total rows: " + rows); } @@ -143,14 +143,11 @@ void exampleDefaultStringColumnLength() throws Exception { void exampleRowsFetchedPerBlock() throws Exception { // Register the Databricks JDBC driver DriverManager.registerDriver(new Driver()); - String jdbcUrl = JDBC_URL_WAREHOUSE + "EnableTelemetry=1" + ";enableArrow=0;RowsFetchedPerBlock=5000"; - Connection con = - DriverManager.getConnection(jdbcUrl, "token", "token"); + String jdbcUrl = + JDBC_URL_WAREHOUSE + "EnableTelemetry=1" + ";enableArrow=0" + ";RowsFetchedPerBlock=3"; + Connection con = DriverManager.getConnection(jdbcUrl, "token", DATABRICKS_TOKEN); Statement stmt = con.createStatement(); - ResultSet rs = - stmt.executeQuery( - "SELECT * FROM main.tpch_sf10000_delta.customer LIMIT 1000000"); // 4 FetchResults calls - // made + ResultSet rs = stmt.executeQuery("SELECT * FROM RANGE(12)"); // 4 FetchResults calls made printResultSet(rs); stmt.close(); rs.close(); From c7e8ea7026bcb4f7080ad27bdad1d37d5aa19804 Mon Sep 17 00:00:00 2001 From: Jayant Singh Date: Sat, 30 Aug 2025 03:11:35 +0530 Subject: [PATCH 4/5] Column view --- .../jdbc/api/impl/ColumnarRowView.java | 159 ++++++++++++++++ .../jdbc/api/impl/LazyThriftResult.java | 34 ++-- .../common/util/DatabricksThriftUtil.java | 12 ++ .../jdbc/api/impl/ColumnarRowViewTest.java | 171 ++++++++++++++++++ 4 files changed, 359 insertions(+), 17 deletions(-) create mode 100644 src/main/java/com/databricks/jdbc/api/impl/ColumnarRowView.java create mode 100644 src/test/java/com/databricks/jdbc/api/impl/ColumnarRowViewTest.java diff --git a/src/main/java/com/databricks/jdbc/api/impl/ColumnarRowView.java b/src/main/java/com/databricks/jdbc/api/impl/ColumnarRowView.java new file mode 100644 index 0000000000..9c5b808104 --- /dev/null +++ b/src/main/java/com/databricks/jdbc/api/impl/ColumnarRowView.java @@ -0,0 +1,159 @@ +package com.databricks.jdbc.api.impl; + +import com.databricks.jdbc.exception.DatabricksSQLException; +import com.databricks.jdbc.model.client.thrift.generated.TColumn; +import com.databricks.jdbc.model.client.thrift.generated.TRowSet; +import com.databricks.jdbc.model.telemetry.enums.DatabricksDriverErrorCode; +import java.util.BitSet; +import java.util.List; + +/** + * Memory-efficient columnar view that provides row-based access without materializing all rows. + * Instead of creating List>, this class provides direct access to columnar data on a + * per-row, per-column basis, significantly reducing memory allocations. + */ +public class ColumnarRowView { + private final List columns; + private final int rowCount; + private final ColumnAccessor[] columnAccessors; + + public ColumnarRowView(TRowSet rowSet) throws DatabricksSQLException { + this.columns = rowSet != null ? rowSet.getColumns() : null; + + if (columns == null || columns.isEmpty()) { + this.rowCount = 0; + this.columnAccessors = new ColumnAccessor[0]; + } else { + this.rowCount = getRowCountFromFirstColumn(); + this.columnAccessors = new ColumnAccessor[columns.size()]; + for (int i = 0; i < columns.size(); i++) { + this.columnAccessors[i] = createColumnAccessor(columns.get(i)); + } + } + } + + /** Gets the number of rows in this view. */ + public int getRowCount() { + return rowCount; + } + + /** Gets the number of columns in this view. */ + public int getColumnCount() { + return columns != null ? columns.size() : 0; + } + + /** Gets the value at the specified row and column without materializing the entire row. */ + public Object getValue(int rowIndex, int columnIndex) throws DatabricksSQLException { + if (rowIndex < 0 || rowIndex >= rowCount) { + throw new DatabricksSQLException( + "Row index out of bounds: " + rowIndex, DatabricksDriverErrorCode.INVALID_STATE); + } + if (columnIndex < 0 || columnIndex >= columnAccessors.length) { + throw new DatabricksSQLException( + "Column index out of bounds: " + columnIndex, DatabricksDriverErrorCode.INVALID_STATE); + } + + return columnAccessors[columnIndex].getValue(rowIndex); + } + + /** + * Creates a materialized row only when explicitly requested (for backward compatibility). This + * should be avoided in performance-critical paths. + */ + public Object[] materializeRow(int rowIndex) throws DatabricksSQLException { + if (rowIndex < 0 || rowIndex >= rowCount) { + throw new DatabricksSQLException( + "Row index out of bounds: " + rowIndex, DatabricksDriverErrorCode.INVALID_STATE); + } + + Object[] row = new Object[columnAccessors.length]; + for (int col = 0; col < columnAccessors.length; col++) { + row[col] = columnAccessors[col].getValue(rowIndex); + } + return row; + } + + private int getRowCountFromFirstColumn() throws DatabricksSQLException { + if (columns.isEmpty()) { + return 0; + } + TColumn firstColumn = columns.get(0); + return getColumnSize(firstColumn); + } + + private static int getColumnSize(TColumn column) throws DatabricksSQLException { + if (column.isSetBinaryVal()) return column.getBinaryVal().getValuesSize(); + if (column.isSetBoolVal()) return column.getBoolVal().getValuesSize(); + if (column.isSetByteVal()) return column.getByteVal().getValuesSize(); + if (column.isSetDoubleVal()) return column.getDoubleVal().getValuesSize(); + if (column.isSetI16Val()) return column.getI16Val().getValuesSize(); + if (column.isSetI32Val()) return column.getI32Val().getValuesSize(); + if (column.isSetI64Val()) return column.getI64Val().getValuesSize(); + if (column.isSetStringVal()) return column.getStringVal().getValuesSize(); + + throw new DatabricksSQLException( + "Unsupported column type: " + column, DatabricksDriverErrorCode.UNSUPPORTED_OPERATION); + } + + private static ColumnAccessor createColumnAccessor(TColumn column) throws DatabricksSQLException { + if (column.isSetBinaryVal()) { + return new TypedColumnAccessor<>( + column.getBinaryVal().getValues(), column.getBinaryVal().getNulls()); + } + if (column.isSetBoolVal()) { + return new TypedColumnAccessor<>( + column.getBoolVal().getValues(), column.getBoolVal().getNulls()); + } + if (column.isSetByteVal()) { + return new TypedColumnAccessor<>( + column.getByteVal().getValues(), column.getByteVal().getNulls()); + } + if (column.isSetDoubleVal()) { + return new TypedColumnAccessor<>( + column.getDoubleVal().getValues(), column.getDoubleVal().getNulls()); + } + if (column.isSetI16Val()) { + return new TypedColumnAccessor<>( + column.getI16Val().getValues(), column.getI16Val().getNulls()); + } + if (column.isSetI32Val()) { + return new TypedColumnAccessor<>( + column.getI32Val().getValues(), column.getI32Val().getNulls()); + } + if (column.isSetI64Val()) { + return new TypedColumnAccessor<>( + column.getI64Val().getValues(), column.getI64Val().getNulls()); + } + if (column.isSetStringVal()) { + return new TypedColumnAccessor<>( + column.getStringVal().getValues(), column.getStringVal().getNulls()); + } + + throw new DatabricksSQLException( + "Unsupported column type: " + column, DatabricksDriverErrorCode.UNSUPPORTED_OPERATION); + } + + /** Interface for accessing column values by index without materializing the entire column. */ + private interface ColumnAccessor { + Object getValue(int rowIndex); + } + + /** Memory-efficient column accessor that handles nulls and provides direct index-based access. */ + private static class TypedColumnAccessor implements ColumnAccessor { + private final List values; + private final BitSet nullBits; + + public TypedColumnAccessor(List values, byte[] nulls) { + this.values = values; + this.nullBits = nulls != null ? BitSet.valueOf(nulls) : null; + } + + @Override + public Object getValue(int rowIndex) { + if (nullBits != null && nullBits.get(rowIndex)) { + return null; + } + return values.get(rowIndex); + } + } +} diff --git a/src/main/java/com/databricks/jdbc/api/impl/LazyThriftResult.java b/src/main/java/com/databricks/jdbc/api/impl/LazyThriftResult.java index 03dd8d0d14..00c2514522 100644 --- a/src/main/java/com/databricks/jdbc/api/impl/LazyThriftResult.java +++ b/src/main/java/com/databricks/jdbc/api/impl/LazyThriftResult.java @@ -1,7 +1,7 @@ package com.databricks.jdbc.api.impl; import static com.databricks.jdbc.common.EnvironmentVariables.DEFAULT_RESULT_ROW_LIMIT; -import static com.databricks.jdbc.common.util.DatabricksThriftUtil.extractRowsFromColumnar; +import static com.databricks.jdbc.common.util.DatabricksThriftUtil.createColumnarView; import com.databricks.jdbc.api.internal.IDatabricksSession; import com.databricks.jdbc.api.internal.IDatabricksStatementInternal; @@ -10,13 +10,12 @@ import com.databricks.jdbc.log.JdbcLoggerFactory; import com.databricks.jdbc.model.client.thrift.generated.TFetchResultsResp; import com.databricks.jdbc.model.telemetry.enums.DatabricksDriverErrorCode; -import java.util.List; public class LazyThriftResult implements IExecutionResult { private static final JdbcLogger LOGGER = JdbcLoggerFactory.getLogger(LazyThriftResult.class); private TFetchResultsResp currentResponse; - private List> currentBatch; + private ColumnarRowView currentBatch; private int currentBatchIndex; private long globalRowIndex; private final IDatabricksSession session; @@ -53,7 +52,7 @@ public LazyThriftResult( loadCurrentBatch(); LOGGER.debug( "LazyThriftResult initialized with {} rows in first batch, hasMoreRows: {}", - currentBatch.size(), + currentBatch.getRowCount(), currentResponse.hasMoreRows); } @@ -75,16 +74,15 @@ public Object getObject(int columnIndex) throws DatabricksSQLException { throw new DatabricksSQLException( "Cursor is before first row", DatabricksDriverErrorCode.INVALID_STATE); } - if (currentBatchIndex < 0 || currentBatchIndex >= currentBatch.size()) { + if (currentBatchIndex < 0 || currentBatchIndex >= currentBatch.getRowCount()) { throw new DatabricksSQLException( "Invalid cursor position", DatabricksDriverErrorCode.INVALID_STATE); } - List currentRowData = currentBatch.get(currentBatchIndex); - if (columnIndex < 0 || columnIndex >= currentRowData.size()) { + if (columnIndex < 0 || columnIndex >= currentBatch.getColumnCount()) { throw new DatabricksSQLException( "Column index out of bounds " + columnIndex, DatabricksDriverErrorCode.INVALID_STATE); } - return currentRowData.get(columnIndex); + return currentBatch.getValue(currentBatchIndex, columnIndex); } /** @@ -128,13 +126,13 @@ public boolean next() throws DatabricksSQLException { globalRowIndex++; // Check if we need to fetch the next batch - if (currentBatchIndex >= currentBatch.size()) { + if (currentBatchIndex >= currentBatch.getRowCount()) { // Keep fetching until we get a non-empty batch or no more rows while (currentResponse.hasMoreRows) { fetchNextBatch(); // If we got a non-empty batch, we can proceed - if (!currentBatch.isEmpty()) { + if (currentBatch.getRowCount() > 0) { currentBatchIndex = 0; // Reset to first row of new batch break; } @@ -143,7 +141,7 @@ public boolean next() throws DatabricksSQLException { } // If we exited the loop and still have an empty batch, we've reached the end - if (currentBatch.isEmpty()) { + if (currentBatch.getRowCount() == 0) { hasReachedEnd = true; globalRowIndex--; // Revert the increment since we didn't actually move to a new row return false; @@ -171,7 +169,7 @@ public boolean hasNext() { } // Check if there are more rows in current batch - if (currentBatchIndex + 1 < currentBatch.size()) { + if (currentBatchIndex + 1 < currentBatch.getRowCount()) { return true; } @@ -196,7 +194,7 @@ public void close() { @Override public long getRowCount() { // Return the number of rows in the current batch - return currentBatch != null ? currentBatch.size() : 0; + return currentBatch != null ? currentBatch.getRowCount() : 0; } /** @@ -211,11 +209,13 @@ public long getChunkCount() { } private void loadCurrentBatch() throws DatabricksSQLException { - currentBatch = extractRowsFromColumnar(currentResponse.getResults()); + currentBatch = createColumnarView(currentResponse.getResults()); currentBatchIndex = -1; // Reset batch index - totalRowsFetched += currentBatch.size(); + totalRowsFetched += currentBatch.getRowCount(); LOGGER.debug( - "Loaded batch with {} rows, total fetched: {}", currentBatch.size(), totalRowsFetched); + "Loaded batch with {} rows, total fetched: {}", + currentBatch.getRowCount(), + totalRowsFetched); } private void fetchNextBatch() throws DatabricksSQLException { @@ -226,7 +226,7 @@ private void fetchNextBatch() throws DatabricksSQLException { LOGGER.debug( "Fetched batch with {} rows, hasMoreRows: {}", - currentBatch.size(), + currentBatch.getRowCount(), currentResponse.hasMoreRows); } catch (DatabricksSQLException e) { LOGGER.error("Failed to fetch next batch: {}", e.getMessage()); diff --git a/src/main/java/com/databricks/jdbc/common/util/DatabricksThriftUtil.java b/src/main/java/com/databricks/jdbc/common/util/DatabricksThriftUtil.java index 824009d0f1..e3475a1341 100644 --- a/src/main/java/com/databricks/jdbc/common/util/DatabricksThriftUtil.java +++ b/src/main/java/com/databricks/jdbc/common/util/DatabricksThriftUtil.java @@ -4,6 +4,7 @@ import static com.databricks.jdbc.common.util.DatabricksTypeUtil.*; import static com.databricks.jdbc.model.client.thrift.generated.TTypeId.*; +import com.databricks.jdbc.api.impl.ColumnarRowView; import com.databricks.jdbc.api.internal.IDatabricksSession; import com.databricks.jdbc.api.internal.IDatabricksStatementInternal; import com.databricks.jdbc.common.DatabricksJdbcConstants; @@ -125,6 +126,17 @@ public static List> extractRowsFromColumnar(TRowSet rowSet) return rows; } + /** + * Memory-efficient alternative that creates a columnar view instead of materializing all rows. + * Use this method to significantly reduce memory allocations when accessing row data. + * + * @param rowSet that contains columnar data + * @return a columnar view that provides row-based access without full materialization + */ + public static ColumnarRowView createColumnarView(TRowSet rowSet) throws DatabricksSQLException { + return new ColumnarRowView(rowSet); + } + /** Returns statement status for given operation status response */ public static StatementStatus getStatementStatus(TGetOperationStatusResp resp) { StatementState state = null; diff --git a/src/test/java/com/databricks/jdbc/api/impl/ColumnarRowViewTest.java b/src/test/java/com/databricks/jdbc/api/impl/ColumnarRowViewTest.java new file mode 100644 index 0000000000..fad18cf0a2 --- /dev/null +++ b/src/test/java/com/databricks/jdbc/api/impl/ColumnarRowViewTest.java @@ -0,0 +1,171 @@ +package com.databricks.jdbc.api.impl; + +import static org.junit.jupiter.api.Assertions.*; + +import com.databricks.jdbc.exception.DatabricksSQLException; +import com.databricks.jdbc.model.client.thrift.generated.*; +import java.util.Arrays; +import java.util.Collections; +import org.junit.jupiter.api.Test; + +public class ColumnarRowViewTest { + + @Test + void testEmptyRowSet() throws DatabricksSQLException { + TRowSet emptyRowSet = new TRowSet(); + ColumnarRowView view = new ColumnarRowView(emptyRowSet); + + assertEquals(0, view.getRowCount()); + assertEquals(0, view.getColumnCount()); + } + + @Test + void testNullRowSet() throws DatabricksSQLException { + ColumnarRowView view = new ColumnarRowView(null); + + assertEquals(0, view.getRowCount()); + assertEquals(0, view.getColumnCount()); + } + + @Test + void testStringColumn() throws DatabricksSQLException { + TRowSet rowSet = new TRowSet(); + TColumn stringColumn = new TColumn(); + TStringColumn stringVal = new TStringColumn(); + stringVal.setValues(Arrays.asList("hello", "world", "test")); + stringColumn.setStringVal(stringVal); + + rowSet.setColumns(Collections.singletonList(stringColumn)); + + ColumnarRowView view = new ColumnarRowView(rowSet); + + assertEquals(3, view.getRowCount()); + assertEquals(1, view.getColumnCount()); + assertEquals("hello", view.getValue(0, 0)); + assertEquals("world", view.getValue(1, 0)); + assertEquals("test", view.getValue(2, 0)); + } + + @Test + void testStringColumnWithNulls() throws DatabricksSQLException { + TRowSet rowSet = new TRowSet(); + TColumn stringColumn = new TColumn(); + TStringColumn stringVal = new TStringColumn(); + stringVal.setValues(Arrays.asList("hello", "world", "test")); + // Set second value as null (bit 1 set in byte array) + stringVal.setNulls(new byte[] {0x02}); // Binary: 00000010 (bit 1 is set) + stringColumn.setStringVal(stringVal); + + rowSet.setColumns(Collections.singletonList(stringColumn)); + + ColumnarRowView view = new ColumnarRowView(rowSet); + + assertEquals(3, view.getRowCount()); + assertEquals(1, view.getColumnCount()); + assertEquals("hello", view.getValue(0, 0)); + assertNull(view.getValue(1, 0)); + assertEquals("test", view.getValue(2, 0)); + } + + @Test + void testIntegerColumn() throws DatabricksSQLException { + TRowSet rowSet = new TRowSet(); + TColumn intColumn = new TColumn(); + TI32Column intVal = new TI32Column(); + intVal.setValues(Arrays.asList(10, 20, 30)); + intColumn.setI32Val(intVal); + + rowSet.setColumns(Collections.singletonList(intColumn)); + + ColumnarRowView view = new ColumnarRowView(rowSet); + + assertEquals(3, view.getRowCount()); + assertEquals(1, view.getColumnCount()); + assertEquals(10, view.getValue(0, 0)); + assertEquals(20, view.getValue(1, 0)); + assertEquals(30, view.getValue(2, 0)); + } + + @Test + void testMultipleColumns() throws DatabricksSQLException { + TRowSet rowSet = new TRowSet(); + + // String column + TColumn stringColumn = new TColumn(); + TStringColumn stringVal = new TStringColumn(); + stringVal.setValues(Arrays.asList("a", "b")); + stringColumn.setStringVal(stringVal); + + // Integer column + TColumn intColumn = new TColumn(); + TI32Column intVal = new TI32Column(); + intVal.setValues(Arrays.asList(1, 2)); + intColumn.setI32Val(intVal); + + rowSet.setColumns(Arrays.asList(stringColumn, intColumn)); + + ColumnarRowView view = new ColumnarRowView(rowSet); + + assertEquals(2, view.getRowCount()); + assertEquals(2, view.getColumnCount()); + + // First row + assertEquals("a", view.getValue(0, 0)); + assertEquals(1, view.getValue(0, 1)); + + // Second row + assertEquals("b", view.getValue(1, 0)); + assertEquals(2, view.getValue(1, 1)); + } + + @Test + void testMaterializeRow() throws DatabricksSQLException { + TRowSet rowSet = new TRowSet(); + + // String column + TColumn stringColumn = new TColumn(); + TStringColumn stringVal = new TStringColumn(); + stringVal.setValues(Arrays.asList("hello", "world")); + stringColumn.setStringVal(stringVal); + + // Integer column + TColumn intColumn = new TColumn(); + TI32Column intVal = new TI32Column(); + intVal.setValues(Arrays.asList(100, 200)); + intColumn.setI32Val(intVal); + + rowSet.setColumns(Arrays.asList(stringColumn, intColumn)); + + ColumnarRowView view = new ColumnarRowView(rowSet); + + Object[] row0 = view.materializeRow(0); + assertArrayEquals(new Object[] {"hello", 100}, row0); + + Object[] row1 = view.materializeRow(1); + assertArrayEquals(new Object[] {"world", 200}, row1); + } + + @Test + void testOutOfBoundsAccess() throws DatabricksSQLException { + TRowSet rowSet = new TRowSet(); + TColumn stringColumn = new TColumn(); + TStringColumn stringVal = new TStringColumn(); + stringVal.setValues(Arrays.asList("test")); + stringColumn.setStringVal(stringVal); + rowSet.setColumns(Collections.singletonList(stringColumn)); + + ColumnarRowView view = new ColumnarRowView(rowSet); + + // Row out of bounds + assertThrows(DatabricksSQLException.class, () -> view.getValue(-1, 0)); + assertThrows(DatabricksSQLException.class, () -> view.getValue(1, 0)); + + // Column out of bounds + assertThrows(DatabricksSQLException.class, () -> view.getValue(0, -1)); + assertThrows(DatabricksSQLException.class, () -> view.getValue(0, 1)); + + // Materialize row out of bounds + assertThrows(DatabricksSQLException.class, () -> view.materializeRow(-1)); + assertThrows(DatabricksSQLException.class, () -> view.materializeRow(1)); + } +} From 61d97028a0711b8c14c9d398b7ecdbed7c5e02cd Mon Sep 17 00:00:00 2001 From: Jayant Singh Date: Thu, 11 Sep 2025 16:42:34 +0530 Subject: [PATCH 5/5] Next changelog --- NEXT_CHANGELOG.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/NEXT_CHANGELOG.md b/NEXT_CHANGELOG.md index aca15bffb7..d9f75dd83f 100644 --- a/NEXT_CHANGELOG.md +++ b/NEXT_CHANGELOG.md @@ -11,7 +11,8 @@ - **Configurable SQL validation in isValid()**: Added `EnableSQLValidationForIsValid` connection property to control whether `isValid()` method executes an actual SQL query for server-side validation. Default value is 0. - Implement multi-row INSERT batching optimization for prepared statements to improve performance when executing large batches of INSERT operations. - Implement lazy/incremental fetching for columnar results when using Databricks JDBC in Thrift mode without Arrow support. The change modifies the behavior from buffering entire result sets in memory to maintaining only a limited number of rows at a time, reducing peak heap memory usage and preventing OutOfMemory errors. -- Added new artifact `databricks-jdbc-thin` for thin jar with runtime dependency metadata +- Added new artifact `databricks-jdbc-thin` for thin jar with runtime dependency metadata. +- Introduce a memory-efficient columnar data access mechanism for JDBC result processing. ### Updated - Databricks SDK dependency upgraded to latest version 0.60.0