Skip to content

Commit 4310e77

Browse files
Implement lazy loading for inline Arrow results (#1029)
## Description <!-- Provide a brief summary of the changes made and the issue they aim to address.--> This PR introduces lazy loading support for inline Arrow results to improve memory efficiency when handling large result sets. Previously, InlineChunkProvider would eagerly fetch all arrow batches upfront when results had hasMoreRows = true, which could lead to memory issues with large datasets. This change splits the handling into two separate paths: 1. Lazy path (new): For Thrift-based inline Arrow results (when ARROW_BASED_SET is returned), we now use LazyThriftInlineArrowResult which fetches arrow batches on-demand as the client iterates through rows. This is similar to how LazyThriftResult works for columnar data. 2. Remote path (existing): For URL-based Arrow results (URL_BASED_SET), we continue using ArrowStreamResult with RemoteChunkProvider which downloads chunks from cloud storage. The InlineChunkProvider is now only used for SEA results with JSON_ARRAY format and INLINE disposition (contain all data inline {no hasMoreRows flag set}). This will reduce memory consumption and improve performance when dealing with large inline Arrow result sets similar to #975. ## Testing <!-- Describe how the changes have been tested--> - Unit tests - Integration tests - Manual testing ## Additional Notes to the Reviewer <!-- Share any additional context or insights that may help the reviewer understand the changes better. This could include challenges faced, limitations, or compromises made during the development process. Also, mention any areas of the code that you would like the reviewer to focus on specifically. --> Bypassing an existing failure on CI/CD because of 3e4f21c
1 parent 3e4f21c commit 4310e77

11 files changed

Lines changed: 1000 additions & 239 deletions

NEXT_CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
### Added
66

77
### Updated
8+
- Implemented lazy loading for inline Arrow results, fetching arrow batches on demand instead of all at once. This improves memory usage and initial response time for large result sets when using the Thrift protocol with Arrow format.
89

910
### Fixed
1011
- Fixed complex data type metadata support when retrieving 0 rows in Arrow format

src/main/java/com/databricks/jdbc/api/impl/DatabricksResultSet.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import com.databricks.jdbc.api.IExecutionStatus;
88
import com.databricks.jdbc.api.impl.arrow.ArrowStreamResult;
99
import com.databricks.jdbc.api.impl.arrow.ChunkProvider;
10+
import com.databricks.jdbc.api.impl.arrow.LazyThriftInlineArrowResult;
1011
import com.databricks.jdbc.api.impl.converters.ConverterHelper;
1112
import com.databricks.jdbc.api.impl.converters.ObjectConverter;
1213
import com.databricks.jdbc.api.impl.volume.VolumeOperationResult;
@@ -155,6 +156,8 @@ public DatabricksResultSet(
155156
List<String> arrowMetadata = null;
156157
if (executionResult instanceof ArrowStreamResult) {
157158
arrowMetadata = ((ArrowStreamResult) executionResult).getArrowMetadata();
159+
} else if (executionResult instanceof LazyThriftInlineArrowResult) {
160+
arrowMetadata = ((LazyThriftInlineArrowResult) executionResult).getArrowMetadata();
158161
}
159162
this.resultSetMetaData =
160163
new DatabricksResultSetMetaData(

src/main/java/com/databricks/jdbc/api/impl/ExecutionResultFactory.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.databricks.jdbc.api.impl;
22

33
import com.databricks.jdbc.api.impl.arrow.ArrowStreamResult;
4+
import com.databricks.jdbc.api.impl.arrow.LazyThriftInlineArrowResult;
45
import com.databricks.jdbc.api.impl.volume.VolumeOperationResult;
56
import com.databricks.jdbc.api.internal.IDatabricksSession;
67
import com.databricks.jdbc.api.internal.IDatabricksStatementInternal;
@@ -96,9 +97,9 @@ private static IExecutionResult getResultHandler(
9697
case COLUMN_BASED_SET:
9798
return new LazyThriftResult(resultsResp, parentStatement, session);
9899
case ARROW_BASED_SET:
99-
return new ArrowStreamResult(resultsResp, true, parentStatement, session);
100+
return new LazyThriftInlineArrowResult(resultsResp, parentStatement, session);
100101
case URL_BASED_SET:
101-
return new ArrowStreamResult(resultsResp, false, parentStatement, session);
102+
return new ArrowStreamResult(resultsResp, parentStatement, session);
102103
case ROW_BASED_SET:
103104
throw new DatabricksSQLFeatureNotSupportedException(
104105
"Invalid state - row based set cannot be received");

src/main/java/com/databricks/jdbc/api/impl/arrow/ArrowStreamResult.java

Lines changed: 67 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -143,13 +143,11 @@ private static ChunkProvider createRemoteChunkProvider(
143143

144144
public ArrowStreamResult(
145145
TFetchResultsResp resultsResp,
146-
boolean isInlineArrow,
147146
IDatabricksStatementInternal parentStatementId,
148147
IDatabricksSession session)
149148
throws DatabricksSQLException {
150149
this(
151150
resultsResp,
152-
isInlineArrow,
153151
parentStatementId,
154152
session,
155153
DatabricksHttpClientFactory.getInstance().getClient(session.getConnectionContext()));
@@ -158,19 +156,14 @@ public ArrowStreamResult(
158156
@VisibleForTesting
159157
ArrowStreamResult(
160158
TFetchResultsResp resultsResp,
161-
boolean isInlineArrow,
162159
IDatabricksStatementInternal parentStatement,
163160
IDatabricksSession session,
164161
IDatabricksHttpClient httpClient)
165162
throws DatabricksSQLException {
166163
this.session = session;
167164
setColumnInfo(resultsResp.getResultSetMetadata());
168-
if (isInlineArrow) {
169-
this.chunkProvider = new InlineChunkProvider(resultsResp, parentStatement, session);
170-
} else {
171-
this.chunkProvider =
172-
createThriftRemoteChunkProvider(resultsResp, parentStatement, session, httpClient);
173-
}
165+
this.chunkProvider =
166+
createThriftRemoteChunkProvider(resultsResp, parentStatement, session, httpClient);
174167
}
175168

176169
/**
@@ -238,48 +231,15 @@ public List<String> getArrowMetadata() throws DatabricksSQLException {
238231
/** {@inheritDoc} */
239232
@Override
240233
public Object getObject(int columnIndex) throws DatabricksSQLException {
241-
ColumnInfoTypeName requiredType = columnInfos.get(columnIndex).getTypeName();
234+
ColumnInfo columnInfo = columnInfos.get(columnIndex);
235+
ColumnInfoTypeName requiredType = columnInfo.getTypeName();
242236
String arrowMetadata = chunkIterator.getType(columnIndex);
243237
if (arrowMetadata == null) {
244-
arrowMetadata = columnInfos.get(columnIndex).getTypeText();
238+
arrowMetadata = columnInfo.getTypeText();
245239
}
246240

247-
// Handle complex type conversion when complex datatype support is disabled
248-
boolean isComplexDatatypeSupportEnabled =
249-
this.session.getConnectionContext().isComplexDatatypeSupportEnabled();
250-
boolean isGeoSpatialSupportEnabled =
251-
this.session.getConnectionContext().isGeoSpatialSupportEnabled();
252-
253-
// Check if we need to convert geospatial types to string when geospatial support is disabled
254-
// This check must come before the general complex type check
255-
if (!isGeoSpatialSupportEnabled && isGeospatialType(requiredType)) {
256-
LOGGER.debug("Geospatial support is disabled, converting {} to STRING", requiredType);
257-
258-
Object result =
259-
chunkIterator.getColumnObjectAtCurrentRow(
260-
columnIndex, ColumnInfoTypeName.STRING, "STRING", columnInfos.get(columnIndex));
261-
if (result == null) {
262-
return null;
263-
}
264-
// Return raw string for geospatial types when support is disabled
265-
return result;
266-
}
267-
268-
if (!isComplexDatatypeSupportEnabled && isComplexType(requiredType)) {
269-
LOGGER.debug("Complex datatype support is disabled, converting complex type to STRING");
270-
271-
Object result =
272-
chunkIterator.getColumnObjectAtCurrentRow(
273-
columnIndex, ColumnInfoTypeName.STRING, "STRING", columnInfos.get(columnIndex));
274-
if (result == null) {
275-
return null;
276-
}
277-
ComplexDataTypeParser parser = new ComplexDataTypeParser();
278-
return parser.formatComplexTypeString(result.toString(), requiredType.name(), arrowMetadata);
279-
}
280-
281-
return chunkIterator.getColumnObjectAtCurrentRow(
282-
columnIndex, requiredType, arrowMetadata, columnInfos.get(columnIndex));
241+
return getObjectWithComplexTypeHandling(
242+
session, chunkIterator, columnIndex, requiredType, arrowMetadata, columnInfo);
283243
}
284244

285245
/**
@@ -384,6 +344,66 @@ private void setColumnInfo(TGetResultSetMetadataResp resultManifest) {
384344
}
385345
}
386346

347+
/**
348+
* Helper method to handle complex type and geospatial type conversion when support is disabled.
349+
*
350+
* <p>This method is also used by LazyThriftInlineArrowResult for consistent type handling.
351+
*
352+
* @param session The databricks session
353+
* @param chunkIterator The chunk iterator
354+
* @param columnIndex The column index
355+
* @param requiredType The required column type
356+
* @param arrowMetadata The arrow metadata
357+
* @param columnInfo The column info
358+
* @return The object value (converted if complex/geospatial type and support disabled)
359+
* @throws DatabricksSQLException if an error occurs
360+
*/
361+
protected static Object getObjectWithComplexTypeHandling(
362+
IDatabricksSession session,
363+
ArrowResultChunkIterator chunkIterator,
364+
int columnIndex,
365+
ColumnInfoTypeName requiredType,
366+
String arrowMetadata,
367+
ColumnInfo columnInfo)
368+
throws DatabricksSQLException {
369+
boolean isComplexDatatypeSupportEnabled =
370+
session.getConnectionContext().isComplexDatatypeSupportEnabled();
371+
boolean isGeoSpatialSupportEnabled =
372+
session.getConnectionContext().isGeoSpatialSupportEnabled();
373+
374+
// Check if we need to convert geospatial types to string when geospatial support is disabled
375+
// This check must come before the general complex type check
376+
if (!isGeoSpatialSupportEnabled && isGeospatialType(requiredType)) {
377+
LOGGER.debug("Geospatial support is disabled, converting {} to STRING", requiredType);
378+
379+
Object result =
380+
chunkIterator.getColumnObjectAtCurrentRow(
381+
columnIndex, ColumnInfoTypeName.STRING, "STRING", columnInfo);
382+
if (result == null) {
383+
return null;
384+
}
385+
// Return raw string for geospatial types when support is disabled
386+
return result;
387+
}
388+
389+
if (!isComplexDatatypeSupportEnabled && isComplexType(requiredType)) {
390+
LOGGER.debug("Complex datatype support is disabled, converting complex type to STRING");
391+
392+
Object result =
393+
chunkIterator.getColumnObjectAtCurrentRow(
394+
columnIndex, ColumnInfoTypeName.STRING, "STRING", columnInfo);
395+
if (result == null) {
396+
return null;
397+
}
398+
ComplexDataTypeParser parser = new ComplexDataTypeParser();
399+
400+
return parser.formatComplexTypeString(result.toString(), requiredType.name(), arrowMetadata);
401+
}
402+
403+
return chunkIterator.getColumnObjectAtCurrentRow(
404+
columnIndex, requiredType, arrowMetadata, columnInfo);
405+
}
406+
387407
/**
388408
* Converts a collection of ExternalLinks to a ChunkLinkFetchResult.
389409
*

src/main/java/com/databricks/jdbc/api/impl/arrow/InlineChunkProvider.java

Lines changed: 0 additions & 122 deletions
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,17 @@
11
package com.databricks.jdbc.api.impl.arrow;
22

3-
import static com.databricks.jdbc.common.util.DatabricksTypeUtil.*;
43
import static com.databricks.jdbc.common.util.DecompressionUtil.decompress;
54

6-
import com.databricks.jdbc.api.internal.IDatabricksSession;
7-
import com.databricks.jdbc.api.internal.IDatabricksStatementInternal;
85
import com.databricks.jdbc.common.CompressionCodec;
96
import com.databricks.jdbc.exception.DatabricksParsingException;
107
import com.databricks.jdbc.exception.DatabricksSQLException;
118
import com.databricks.jdbc.log.JdbcLogger;
129
import com.databricks.jdbc.log.JdbcLoggerFactory;
13-
import com.databricks.jdbc.model.client.thrift.generated.*;
1410
import com.databricks.jdbc.model.core.ResultData;
1511
import com.databricks.jdbc.model.core.ResultManifest;
1612
import com.databricks.jdbc.model.telemetry.enums.DatabricksDriverErrorCode;
1713
import com.google.common.annotations.VisibleForTesting;
1814
import java.io.ByteArrayInputStream;
19-
import java.io.ByteArrayOutputStream;
20-
import java.io.IOException;
21-
import java.sql.SQLException;
22-
import java.util.ArrayList;
23-
import java.util.List;
24-
import org.apache.arrow.vector.types.pojo.ArrowType;
25-
import org.apache.arrow.vector.types.pojo.Field;
26-
import org.apache.arrow.vector.types.pojo.FieldType;
27-
import org.apache.arrow.vector.types.pojo.Schema;
28-
import org.apache.arrow.vector.util.SchemaUtility;
2915

3016
/** Class to manage inline Arrow chunks */
3117
public class InlineChunkProvider implements ChunkProvider {
@@ -37,23 +23,6 @@ public class InlineChunkProvider implements ChunkProvider {
3723
private final ArrowResultChunk
3824
arrowResultChunk; // There is only one packet of data in case of inline arrow
3925

40-
InlineChunkProvider(
41-
TFetchResultsResp resultsResp,
42-
IDatabricksStatementInternal parentStatement,
43-
IDatabricksSession session)
44-
throws DatabricksParsingException {
45-
this.currentChunkIndex = -1;
46-
this.totalRows = 0;
47-
ByteArrayInputStream byteStream = initializeByteStream(resultsResp, session, parentStatement);
48-
ArrowResultChunk.Builder builder =
49-
ArrowResultChunk.builder().withInputStream(byteStream, totalRows);
50-
51-
if (parentStatement != null) {
52-
builder.withStatementId(parentStatement.getStatementId());
53-
}
54-
arrowResultChunk = builder.build();
55-
}
56-
5726
/**
5827
* Constructor for inline arrow chunk provider from {@link ResultData} and {@link ResultManifest}.
5928
*
@@ -123,97 +92,6 @@ public boolean isClosed() {
12392
return isClosed;
12493
}
12594

126-
private ByteArrayInputStream initializeByteStream(
127-
TFetchResultsResp resultsResp,
128-
IDatabricksSession session,
129-
IDatabricksStatementInternal parentStatement)
130-
throws DatabricksParsingException {
131-
ByteArrayOutputStream baos = new ByteArrayOutputStream();
132-
CompressionCodec compressionType =
133-
CompressionCodec.getCompressionMapping(resultsResp.getResultSetMetadata());
134-
try {
135-
byte[] serializedSchema = getSerializedSchema(resultsResp.getResultSetMetadata());
136-
if (serializedSchema != null) {
137-
baos.write(serializedSchema);
138-
}
139-
writeToByteOutputStream(
140-
compressionType, parentStatement, resultsResp.getResults().getArrowBatches(), baos);
141-
while (resultsResp.hasMoreRows) {
142-
resultsResp = session.getDatabricksClient().getMoreResults(parentStatement);
143-
writeToByteOutputStream(
144-
compressionType, parentStatement, resultsResp.getResults().getArrowBatches(), baos);
145-
}
146-
return new ByteArrayInputStream(baos.toByteArray());
147-
} catch (DatabricksSQLException | IOException e) {
148-
handleError(e);
149-
}
150-
return null;
151-
}
152-
153-
private void writeToByteOutputStream(
154-
CompressionCodec compressionCodec,
155-
IDatabricksStatementInternal parentStatement,
156-
List<TSparkArrowBatch> arrowBatchList,
157-
ByteArrayOutputStream baos)
158-
throws DatabricksSQLException, IOException {
159-
for (TSparkArrowBatch arrowBatch : arrowBatchList) {
160-
byte[] decompressedBytes =
161-
decompress(
162-
arrowBatch.getBatch(),
163-
compressionCodec,
164-
String.format(
165-
"Data fetch for inline arrow batch [%d] and statement [%s] with decompression algorithm : [%s]",
166-
arrowBatch.getRowCount(), parentStatement, compressionCodec));
167-
totalRows += arrowBatch.getRowCount();
168-
baos.write(decompressedBytes);
169-
}
170-
}
171-
172-
private byte[] getSerializedSchema(TGetResultSetMetadataResp metadata)
173-
throws DatabricksSQLException {
174-
if (metadata.getArrowSchema() != null) {
175-
return metadata.getArrowSchema();
176-
}
177-
Schema arrowSchema = hiveSchemaToArrowSchema(metadata.getSchema());
178-
try {
179-
return SchemaUtility.serialize(arrowSchema);
180-
} catch (IOException e) {
181-
handleError(e);
182-
}
183-
// should never reach here;
184-
return null;
185-
}
186-
187-
private Schema hiveSchemaToArrowSchema(TTableSchema hiveSchema)
188-
throws DatabricksParsingException {
189-
List<Field> fields = new ArrayList<>();
190-
if (hiveSchema == null) {
191-
return new Schema(fields);
192-
}
193-
try {
194-
hiveSchema
195-
.getColumns()
196-
.forEach(
197-
columnDesc -> {
198-
try {
199-
fields.add(getArrowField(columnDesc));
200-
} catch (SQLException e) {
201-
throw new RuntimeException(e);
202-
}
203-
});
204-
} catch (RuntimeException e) {
205-
handleError(e);
206-
}
207-
return new Schema(fields);
208-
}
209-
210-
private Field getArrowField(TColumnDesc columnDesc) throws SQLException {
211-
TPrimitiveTypeEntry primitiveTypeEntry = getTPrimitiveTypeOrDefault(columnDesc.getTypeDesc());
212-
ArrowType arrowType = mapThriftToArrowType(primitiveTypeEntry.getType());
213-
FieldType fieldType = new FieldType(true, arrowType, null);
214-
return new Field(columnDesc.getColumnName(), fieldType, null);
215-
}
216-
21795
@VisibleForTesting
21896
void handleError(Exception e) throws DatabricksParsingException {
21997
String errorMessage =

0 commit comments

Comments
 (0)