Skip to content

Commit b8d30b7

Browse files
committed
Add batched link refresh reconciliation for bounded SEA API
During coalesced link refresh, the server may return links for chunks not yet in the provider's map (newly-discovered chunks beyond highestKnownChunkIndex). Previously these were silently skipped. Now: create new chunks from refresh response links, update highestKnownChunkIndex, and set endOfStreamReached from the response's hasMore flag. Follows the per-chunk state-machine reconciliation from the bounded SEA API spec. Co-authored-by: Isaac Signed-off-by: Gopal Lal <gopal.lal@databricks.com>
1 parent 03f191b commit b8d30b7

1 file changed

Lines changed: 21 additions & 3 deletions

File tree

src/main/java/com/databricks/jdbc/api/impl/arrow/StreamingChunkProvider.java

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -596,22 +596,40 @@ ExternalLink getRefreshedLink(long chunkIndex, long rowOffset) throws SQLExcepti
596596
// Single batch FetchResults RPC from the lowest expired offset
597597
ChunkLinkFetchResult result = linkFetcher.fetchLinks(minExpiredIndex, minExpiredRowOffset);
598598

599-
// Update ALL pre-download chunks that received fresh links.
600-
// Always overwrite even if the current link hasn't expired yet, since the
601-
// server-provided link has a later expiry and prevents near-expiry races.
599+
// Reconcile ALL links from the refresh response with local chunk state.
602600
for (ExternalLink link : result.getChunkLinks()) {
603601
ArrowResultChunk c = chunks.get(link.getChunkIndex());
604602
if (c != null) {
603+
// Existing chunk: update link only for pre-download states.
604+
// DOWNLOADING stays as-is (download task owns the state machine).
605+
// DOWNLOADED/RELEASED/etc. stay as-is (bytes already in memory).
605606
ChunkStatus status = c.getStatus();
606607
if (status == ChunkStatus.PENDING
607608
|| status == ChunkStatus.URL_FETCHED
608609
|| status == ChunkStatus.DOWNLOAD_FAILED
609610
|| status == ChunkStatus.DOWNLOAD_RETRY) {
610611
c.setChunkLink(link);
611612
}
613+
} else {
614+
// New chunk from server not yet in our map — create it.
615+
// This handles the bounded SEA case where the refresh response
616+
// may include chunks beyond our current highestKnownChunkIndex.
617+
try {
618+
createChunkFromLink(link);
619+
} catch (Exception e) {
620+
LOGGER.debug(
621+
"Failed to create chunk {} from refresh response: {}",
622+
link.getChunkIndex(),
623+
e.getMessage());
624+
}
612625
}
613626
}
614627

628+
// Update end-of-stream from refresh response
629+
if (!result.hasMore()) {
630+
endOfStreamReached = true;
631+
}
632+
615633
// Check if our target chunk was refreshed by the batch
616634
targetChunk = chunks.get(chunkIndex);
617635
if (targetChunk != null && !targetChunk.isChunkLinkInvalid()) {

0 commit comments

Comments
 (0)