Skip to content

Commit d330039

Browse files
Add StreamingChunkProvider for result fetching (#1111)
## Description <!-- Provide a brief summary of the changes made and the issue they aim to address.--> Implement streaming chunk provider that fetches results without dependency on total chunk count. Key components: - StreamingChunkProvider: Memory-bounded parallel downloads with proactive link prefetching based on a window size - ChunkLinkFetcher interface with SeaChunkLinkFetcher for SEA and Thrift API - StreamingChunkDownloadTask: Simplified download task with retry logic - Support for initial links from ResultData to avoid extra fetch calls Enable via URL parameter: EnableStreamingChunkProvider=1 ## Testing <!-- Describe how the changes have been tested--> Manual testing Unit tests Following is the perf improvement on cloud fetch for 400M records: - v2.7.4 ``` Total rows: 400000000 Time taken: 121150ms ``` - These changes ``` Total rows: 400000000 Time taken: 65769ms ``` ## Additional Notes to the Reviewer <!-- Share any additional context or insights that may help the reviewer understand the changes better. This could include challenges faced, limitations, or compromises made during the development process. Also, mention any areas of the code that you would like the reviewer to focus on specifically. -->
1 parent aaed05b commit d330039

25 files changed

Lines changed: 3868 additions & 116 deletions

NEXT_CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
### Updated
1010
- Added validation for positive integer configuration properties (RowsFetchedPerBlock, BatchInsertSize, etc.) to prevent hangs and errors when set to zero or negative values.
1111
- Updated Circuit breaker to be triggered by 429 errors too.
12+
- Refactored chunk download to keep a sliding window of chunk links. The window advances as the main thread consumes chunks. These changes can be enabled using the connection property EnableStreamingChunkProvider=1. The changes are expected to make chunk download faster and robust.
1213
- Added separate circuit breaker to handle 429 from SEA connection creation calls, and fall back to Thrift.
1314

1415
### Fixed

src/main/java/com/databricks/jdbc/api/impl/DatabricksConnectionContext.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1196,4 +1196,14 @@ public boolean getDisableOauthRefreshToken() {
11961196
public boolean isTokenFederationEnabled() {
11971197
return getParameter(DatabricksJdbcUrlParams.ENABLE_TOKEN_FEDERATION, "1").equals("1");
11981198
}
1199+
1200+
@Override
1201+
public boolean isStreamingChunkProviderEnabled() {
1202+
return getParameter(DatabricksJdbcUrlParams.ENABLE_STREAMING_CHUNK_PROVIDER).equals("1");
1203+
}
1204+
1205+
@Override
1206+
public int getLinkPrefetchWindow() {
1207+
return Integer.parseInt(getParameter(DatabricksJdbcUrlParams.LINK_PREFETCH_WINDOW));
1208+
}
11991209
}

src/main/java/com/databricks/jdbc/api/impl/arrow/AbstractArrowResultChunk.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,15 @@ public Long getChunkIndex() {
116116
return chunkIndex;
117117
}
118118

119+
/**
120+
* Returns the starting row offset for this chunk.
121+
*
122+
* @return the row offset
123+
*/
124+
public long getRowOffset() {
125+
return rowOffset;
126+
}
127+
119128
/**
120129
* Checks if the chunk link is invalid or expired.
121130
*

src/main/java/com/databricks/jdbc/api/impl/arrow/ArrowResultChunk.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ private void logDownloadMetrics(
158158
double speedMBps = (contentLength / 1024.0 / 1024.0) / (downloadTimeMs / 1000.0);
159159
String baseUrl = url.split("\\?")[0];
160160

161-
LOGGER.info(
161+
LOGGER.debug(
162162
String.format(
163163
"CloudFetch download: %.4f MB/s, %d bytes in %dms from %s",
164164
speedMBps, contentLength, downloadTimeMs, baseUrl));
@@ -197,6 +197,23 @@ public Builder withChunkInfo(BaseChunkInfo baseChunkInfo) {
197197
return this;
198198
}
199199

200+
/**
201+
* Sets chunk metadata directly without requiring a BaseChunkInfo object. Useful for streaming
202+
* chunk creation where metadata comes from ExternalLink.
203+
*
204+
* @param chunkIndex The index of this chunk
205+
* @param rowCount The number of rows in this chunk
206+
* @param rowOffset The starting row offset for this chunk
207+
* @return this builder
208+
*/
209+
public Builder withChunkMetadata(long chunkIndex, long rowCount, long rowOffset) {
210+
this.chunkIndex = chunkIndex;
211+
this.numRows = rowCount;
212+
this.rowOffset = rowOffset;
213+
this.status = status == null ? ChunkStatus.PENDING : status;
214+
return this;
215+
}
216+
200217
public Builder withInputStream(InputStream stream, long rowCount) {
201218
this.numRows = rowCount;
202219
this.inputStream = stream;

src/main/java/com/databricks/jdbc/api/impl/arrow/ArrowStreamResult.java

Lines changed: 195 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
package com.databricks.jdbc.api.impl.arrow;
22

3+
import static com.databricks.jdbc.common.util.DatabricksThriftUtil.createExternalLink;
34
import static com.databricks.jdbc.common.util.DatabricksThriftUtil.getColumnInfoFromTColumnDesc;
45

56
import com.databricks.jdbc.api.impl.ComplexDataTypeParser;
67
import com.databricks.jdbc.api.impl.IExecutionResult;
8+
import com.databricks.jdbc.api.internal.IDatabricksConnectionContext;
79
import com.databricks.jdbc.api.internal.IDatabricksSession;
810
import com.databricks.jdbc.api.internal.IDatabricksStatementInternal;
911
import com.databricks.jdbc.common.CompressionCodec;
@@ -16,12 +18,16 @@
1618
import com.databricks.jdbc.model.client.thrift.generated.TColumnDesc;
1719
import com.databricks.jdbc.model.client.thrift.generated.TFetchResultsResp;
1820
import com.databricks.jdbc.model.client.thrift.generated.TGetResultSetMetadataResp;
21+
import com.databricks.jdbc.model.client.thrift.generated.TSparkArrowResultLink;
22+
import com.databricks.jdbc.model.core.ChunkLinkFetchResult;
1923
import com.databricks.jdbc.model.core.ColumnInfo;
2024
import com.databricks.jdbc.model.core.ColumnInfoTypeName;
25+
import com.databricks.jdbc.model.core.ExternalLink;
2126
import com.databricks.jdbc.model.core.ResultData;
2227
import com.databricks.jdbc.model.core.ResultManifest;
2328
import com.google.common.annotations.VisibleForTesting;
2429
import java.util.ArrayList;
30+
import java.util.Collection;
2531
import java.util.List;
2632

2733
/** Result container for Arrow-based query results. */
@@ -69,20 +75,71 @@ public ArrowStreamResult(
6975
"Creating ArrowStreamResult with remote links for statementId: {}",
7076
statementId.toSQLExecStatementId());
7177
this.chunkProvider =
72-
new RemoteChunkProvider(
73-
statementId,
74-
resultManifest,
75-
resultData,
76-
session,
77-
httpClient,
78-
session.getConnectionContext().getCloudFetchThreadPoolSize());
78+
createRemoteChunkProvider(statementId, resultManifest, resultData, session, httpClient);
7979
}
8080
this.columnInfos =
8181
resultManifest.getSchema().getColumnCount() == 0
8282
? new ArrayList<>()
8383
: new ArrayList<>(resultManifest.getSchema().getColumns());
8484
}
8585

86+
/**
87+
* Creates the appropriate remote chunk provider based on configuration.
88+
*
89+
* @param statementId The statement ID
90+
* @param resultManifest The result manifest containing chunk metadata
91+
* @param resultData The result data containing initial external links
92+
* @param session The session for fetching additional chunks
93+
* @param httpClient The HTTP client for downloading chunk data
94+
* @return A ChunkProvider instance
95+
*/
96+
private static ChunkProvider createRemoteChunkProvider(
97+
StatementId statementId,
98+
ResultManifest resultManifest,
99+
ResultData resultData,
100+
IDatabricksSession session,
101+
IDatabricksHttpClient httpClient)
102+
throws DatabricksSQLException {
103+
104+
IDatabricksConnectionContext connectionContext = session.getConnectionContext();
105+
106+
if (connectionContext.isStreamingChunkProviderEnabled()) {
107+
LOGGER.info(
108+
"Using StreamingChunkProvider for statementId: {}", statementId.toSQLExecStatementId());
109+
110+
ChunkLinkFetcher linkFetcher = new SeaChunkLinkFetcher(session, statementId);
111+
CompressionCodec compressionCodec = resultManifest.getResultCompression();
112+
int maxChunksInMemory = connectionContext.getCloudFetchThreadPoolSize();
113+
int linkPrefetchWindow = connectionContext.getLinkPrefetchWindow();
114+
int chunkReadyTimeoutSeconds = connectionContext.getChunkReadyTimeoutSeconds();
115+
double cloudFetchSpeedThreshold = connectionContext.getCloudFetchSpeedThreshold();
116+
117+
// Convert ExternalLinks to ChunkLinkFetchResult for the provider
118+
ChunkLinkFetchResult initialLinks =
119+
convertToChunkLinkFetchResult(resultData.getExternalLinks());
120+
121+
return new StreamingChunkProvider(
122+
linkFetcher,
123+
httpClient,
124+
compressionCodec,
125+
statementId,
126+
maxChunksInMemory,
127+
linkPrefetchWindow,
128+
chunkReadyTimeoutSeconds,
129+
cloudFetchSpeedThreshold,
130+
initialLinks);
131+
} else {
132+
// Use the original RemoteChunkProvider
133+
return new RemoteChunkProvider(
134+
statementId,
135+
resultManifest,
136+
resultData,
137+
session,
138+
httpClient,
139+
connectionContext.getCloudFetchThreadPoolSize());
140+
}
141+
}
142+
86143
public ArrowStreamResult(
87144
TFetchResultsResp resultsResp,
88145
boolean isInlineArrow,
@@ -110,16 +167,63 @@ public ArrowStreamResult(
110167
if (isInlineArrow) {
111168
this.chunkProvider = new InlineChunkProvider(resultsResp, parentStatement, session);
112169
} else {
113-
CompressionCodec compressionCodec =
114-
CompressionCodec.getCompressionMapping(resultsResp.getResultSetMetadata());
115170
this.chunkProvider =
116-
new RemoteChunkProvider(
117-
parentStatement,
118-
resultsResp,
119-
session,
120-
httpClient,
121-
session.getConnectionContext().getCloudFetchThreadPoolSize(),
122-
compressionCodec);
171+
createThriftRemoteChunkProvider(resultsResp, parentStatement, session, httpClient);
172+
}
173+
}
174+
175+
/**
176+
* Creates the appropriate remote chunk provider for Thrift based on configuration.
177+
*
178+
* @param resultsResp The Thrift fetch results response
179+
* @param parentStatement The parent statement for fetching additional chunks
180+
* @param session The session for fetching additional chunks
181+
* @param httpClient The HTTP client for downloading chunk data
182+
* @return A ChunkProvider instance
183+
*/
184+
private static ChunkProvider createThriftRemoteChunkProvider(
185+
TFetchResultsResp resultsResp,
186+
IDatabricksStatementInternal parentStatement,
187+
IDatabricksSession session,
188+
IDatabricksHttpClient httpClient)
189+
throws DatabricksSQLException {
190+
191+
IDatabricksConnectionContext connectionContext = session.getConnectionContext();
192+
CompressionCodec compressionCodec =
193+
CompressionCodec.getCompressionMapping(resultsResp.getResultSetMetadata());
194+
195+
if (connectionContext.isStreamingChunkProviderEnabled()) {
196+
StatementId statementId = parentStatement.getStatementId();
197+
LOGGER.info("Using StreamingChunkProvider for Thrift statementId: {}", statementId);
198+
199+
ChunkLinkFetcher linkFetcher = new ThriftChunkLinkFetcher(session, statementId);
200+
int maxChunksInMemory = connectionContext.getCloudFetchThreadPoolSize();
201+
int linkPrefetchWindow = connectionContext.getLinkPrefetchWindow();
202+
int chunkReadyTimeoutSeconds = connectionContext.getChunkReadyTimeoutSeconds();
203+
double cloudFetchSpeedThreshold = connectionContext.getCloudFetchSpeedThreshold();
204+
205+
// Convert initial Thrift links to ChunkLinkFetchResult
206+
ChunkLinkFetchResult initialLinks = convertThriftLinksToChunkLinkFetchResult(resultsResp);
207+
208+
return new StreamingChunkProvider(
209+
linkFetcher,
210+
httpClient,
211+
compressionCodec,
212+
statementId,
213+
maxChunksInMemory,
214+
linkPrefetchWindow,
215+
chunkReadyTimeoutSeconds,
216+
cloudFetchSpeedThreshold,
217+
initialLinks);
218+
} else {
219+
// Use the original RemoteChunkProvider
220+
return new RemoteChunkProvider(
221+
parentStatement,
222+
resultsResp,
223+
session,
224+
httpClient,
225+
connectionContext.getCloudFetchThreadPoolSize(),
226+
compressionCodec);
123227
}
124228
}
125229

@@ -268,4 +372,79 @@ private void setColumnInfo(TGetResultSetMetadataResp resultManifest) {
268372
columnInfos.add(getColumnInfoFromTColumnDesc(tColumnDesc));
269373
}
270374
}
375+
376+
/**
377+
* Converts a collection of ExternalLinks to a ChunkLinkFetchResult.
378+
*
379+
* @param externalLinks The external links to convert, may be null
380+
* @return A ChunkLinkFetchResult, or null if input is null or empty
381+
*/
382+
private static ChunkLinkFetchResult convertToChunkLinkFetchResult(
383+
Collection<ExternalLink> externalLinks) {
384+
if (externalLinks == null || externalLinks.isEmpty()) {
385+
return null;
386+
}
387+
388+
List<ExternalLink> linkList =
389+
externalLinks instanceof List
390+
? (List<ExternalLink>) externalLinks
391+
: new ArrayList<>(externalLinks);
392+
393+
// Derive hasMore and nextRowOffset from last link (SEA style)
394+
ExternalLink lastLink = linkList.get(linkList.size() - 1);
395+
boolean hasMore = lastLink.getNextChunkIndex() != null;
396+
long nextFetchIndex = hasMore ? lastLink.getNextChunkIndex() : -1;
397+
long nextRowOffset = lastLink.getRowOffset() + lastLink.getRowCount();
398+
399+
return ChunkLinkFetchResult.of(linkList, hasMore, nextFetchIndex, nextRowOffset);
400+
}
401+
402+
/**
403+
* Converts Thrift result links to a ChunkLinkFetchResult.
404+
*
405+
* <p>This method converts TSparkArrowResultLink from the Thrift response to the unified
406+
* ChunkLinkFetchResult format used by StreamingChunkProvider.
407+
*
408+
* @param resultsResp The Thrift fetch results response containing initial links
409+
* @return A ChunkLinkFetchResult, or null if no links
410+
*/
411+
private static ChunkLinkFetchResult convertThriftLinksToChunkLinkFetchResult(
412+
TFetchResultsResp resultsResp) {
413+
List<TSparkArrowResultLink> resultLinks = resultsResp.getResults().getResultLinks();
414+
if (resultLinks == null || resultLinks.isEmpty()) {
415+
return null;
416+
}
417+
418+
List<ExternalLink> chunkLinks = new ArrayList<>();
419+
int lastIndex = resultLinks.size() - 1;
420+
boolean hasMoreRows = resultsResp.hasMoreRows;
421+
422+
for (int linkIndex = 0; linkIndex < resultLinks.size(); linkIndex++) {
423+
TSparkArrowResultLink thriftLink = resultLinks.get(linkIndex);
424+
425+
// Convert Thrift link to ExternalLink (sets chunkIndex, rowOffset, rowCount, etc.)
426+
ExternalLink externalLink = createExternalLink(thriftLink, linkIndex);
427+
428+
// For the last link, set nextChunkIndex based on hasMoreRows
429+
if (linkIndex == lastIndex) {
430+
if (hasMoreRows) {
431+
// More chunks available - next fetch should start from lastIndex + 1
432+
externalLink.setNextChunkIndex((long) linkIndex + 1);
433+
}
434+
// If hasMoreRows is false, nextChunkIndex remains null (end of stream)
435+
} else {
436+
// Not the last link - next chunk follows immediately
437+
externalLink.setNextChunkIndex((long) linkIndex + 1);
438+
}
439+
440+
chunkLinks.add(externalLink);
441+
}
442+
443+
// Calculate next fetch positions from last link
444+
TSparkArrowResultLink lastThriftLink = resultLinks.get(lastIndex);
445+
long nextFetchIndex = hasMoreRows ? lastIndex + 1 : -1;
446+
long nextRowOffset = lastThriftLink.getStartRowOffset() + lastThriftLink.getRowCount();
447+
448+
return ChunkLinkFetchResult.of(chunkLinks, hasMoreRows, nextFetchIndex, nextRowOffset);
449+
}
271450
}

src/main/java/com/databricks/jdbc/api/impl/arrow/ChunkLinkDownloadService.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,9 @@
99
import com.databricks.jdbc.exception.DatabricksValidationException;
1010
import com.databricks.jdbc.log.JdbcLogger;
1111
import com.databricks.jdbc.log.JdbcLoggerFactory;
12+
import com.databricks.jdbc.model.core.ChunkLinkFetchResult;
1213
import com.databricks.jdbc.model.core.ExternalLink;
1314
import java.time.Instant;
14-
import java.util.Collection;
1515
import java.util.Map;
1616
import java.util.concurrent.CompletableFuture;
1717
import java.util.concurrent.ConcurrentHashMap;
@@ -219,16 +219,18 @@ private void triggerNextBatchDownload() {
219219
CompletableFuture.runAsync(
220220
() -> {
221221
try {
222-
Collection<ExternalLink> links =
223-
session.getDatabricksClient().getResultChunks(statementId, batchStartIndex);
222+
// rowOffset is 0 here as this service is used by RemoteChunkProvider (SEA-only)
223+
// which fetches by chunkIndex, not rowOffset
224+
ChunkLinkFetchResult result =
225+
session.getDatabricksClient().getResultChunks(statementId, batchStartIndex, 0);
224226
LOGGER.info(
225227
"Retrieved {} links for batch starting at {} for statement id {}",
226-
links.size(),
228+
result.getChunkLinks().size(),
227229
batchStartIndex,
228230
statementId);
229231

230232
// Complete futures for all chunks in this batch
231-
for (ExternalLink link : links) {
233+
for (ExternalLink link : result.getChunkLinks()) {
232234
CompletableFuture<ExternalLink> future =
233235
chunkIndexToLinkFuture.get(link.getChunkIndex());
234236
if (future != null) {
@@ -241,9 +243,12 @@ private void triggerNextBatchDownload() {
241243
}
242244

243245
// Update next batch start index and trigger next batch
244-
if (!links.isEmpty()) {
246+
if (!result.getChunkLinks().isEmpty()) {
245247
long maxChunkIndex =
246-
links.stream().mapToLong(ExternalLink::getChunkIndex).max().getAsLong();
248+
result.getChunkLinks().stream()
249+
.mapToLong(ExternalLink::getChunkIndex)
250+
.max()
251+
.getAsLong();
247252
nextBatchStartIndex.set(maxChunkIndex + 1);
248253
LOGGER.debug("Updated next batch start index to {}", maxChunkIndex + 1);
249254

0 commit comments

Comments
 (0)