Commit 95fcfc9

Address review feedback on bounded SEA API PR
P0-1: Remove the redundant chunk.setStatus(DOWNLOAD_FAILED) call in the inner catch and defer entirely to the finally block. Fixes StreamingChunkDownloadTaskTest.

P0-2: Add a NEXT_CHANGELOG.md entry under ### Added for UseBoundedSeaApi.

P1-1: Call triggerDownloads() after reconciliation creates new chunks from the refresh response, so newly discovered chunks do not sit in PENDING.

P1-2/P1-3: The un-gated changes (new chunk creation, EOS from refresh, triggerDownloads) are intentional parity fixes for all EnableStreamingChunkProvider=1 users. EnableStreamingChunkProvider defaults to off, so default users are unaffected.

P1-4: Revert RuntimeException from the inner catch. DatabricksError is caught by the outer catch(Throwable) and fails immediately (no retry), matching ChunkDownloadTask behavior exactly. NPE/ISE won't be retried.

P2-1: Always send row_offset (even 0 for chunk 0) when bounded SEA is enabled; being explicit is safer than relying on the server default.

P2-3: Update nextLinkFetchIndex after reconciliation so the prefetch thread does not re-fetch chunks already discovered via refresh.

P2-5: Add "Requires server support" to the connection property help text.

Co-authored-by: Isaac
Signed-off-by: Gopal Lal <gopal.lal@databricks.com>
1 parent 271c4c2 commit 95fcfc9

5 files changed

Lines changed: 21 additions & 9 deletions

NEXT_CHANGELOG.md

Lines changed: 5 additions & 0 deletions
@@ -3,6 +3,11 @@
 ## [Unreleased]
 
 ### Added
+- Added result set heartbeat / keep-alive to prevent server-side result expiry during slow consumption. When enabled via `EnableHeartbeat=1`, the driver periodically polls `GetStatementStatus` (SEA) or `GetOperationStatus` (Thrift) to keep the operation alive while the client reads results. Configurable interval via `HeartbeatIntervalSeconds` (default 60s). Heartbeat automatically stops when results are fully consumed, ResultSet is closed, or the server returns a terminal state. Disabled by default due to cost implications (heartbeats keep the warehouse running).
+- Metadata operations now use SQL SHOW commands for both Thrift and SEA backends,
+  ensuring consistent behavior for SQL warehouses regardless of underlying
+  protocol. To revert to native Thrift metadata RPCs, set `UseQueryForMetadata=0`.
+- Added `UseBoundedSeaApi` connection property (default `0`/off). When enabled, the driver uses the bounded SEA API contract for CloudFetch: sends `row_offset` on GetResultData requests and uses `next_chunk_index` for chunk discovery instead of `total_chunk_count`. Requires server support.
 
 ### Updated
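As a rough illustration of the new changelog entry, the sketch below appends `UseBoundedSeaApi=1` to a semicolon-delimited JDBC URL. The class name, the helper `withProperties`, and the `<host>` and `<http-path>` placeholders are hypothetical and are not part of this commit; only the property name and its `1`/`0` values come from the changelog text.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of the driver: builds a JDBC URL with extra properties.
final class BoundedSeaUrlSketch {

  /** Appends each property as ";key=value" in insertion order. */
  static String withProperties(String baseUrl, Map<String, String> props) {
    StringBuilder sb = new StringBuilder(baseUrl);
    for (Map.Entry<String, String> e : props.entrySet()) {
      sb.append(';').append(e.getKey()).append('=').append(e.getValue());
    }
    return sb.toString();
  }

  /** Example URL with placeholder host and path; opts in to the bounded SEA API. */
  static String example() {
    Map<String, String> props = new LinkedHashMap<>();
    props.put("UseBoundedSeaApi", "1"); // default is "0" (off) per the changelog
    return withProperties("jdbc:databricks://<host>:443;httpPath=<http-path>", props);
  }
}
```

Because the property requires server support, a URL like this would presumably only be useful against a workspace where the bounded contract is available.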

src/main/java/com/databricks/jdbc/api/impl/arrow/StreamingChunkDownloadTask.java

Lines changed: 2 additions & 2 deletions
@@ -83,15 +83,15 @@ public Void call() throws DatabricksSQLException {
             taskTotalMs,
             retries);
 
-      } catch (IOException | SQLException | RuntimeException e) {
+      } catch (IOException | SQLException e) {
         retries++;
         if (retries >= MAX_RETRIES) {
           LOGGER.error(
               "Failed to download chunk {} after {} attempts: {}",
               chunk.getChunkIndex(),
               MAX_RETRIES,
               e.getMessage());
-          chunk.setStatus(ChunkStatus.DOWNLOAD_FAILED);
+          // Status set to DOWNLOAD_FAILED in the finally block
           throw new DatabricksSQLException(
               String.format(
                   "Failed to download chunk %d after %d attempts",
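The retry behavior described in P0-1 and P1-4 can be sketched in isolation. This is a simplified model, not the driver's code: `RetrySketch`, its `Chunk` and `Status` types, and the `download` signature are hypothetical, and only `IOException` stands in for the retried checked exceptions. It shows the two properties the commit relies on: unchecked exceptions skip the retry loop, and the failure status is assigned in exactly one place, the `finally` block.

```java
import java.io.IOException;
import java.util.concurrent.Callable;

// Hypothetical, simplified model of the download task's retry loop.
final class RetrySketch {
  enum Status { PENDING, DOWNLOAD_SUCCEEDED, DOWNLOAD_FAILED }

  static final int MAX_RETRIES = 3;

  /** Stand-in for a chunk; tracks status only. */
  static final class Chunk {
    Status status = Status.PENDING;
  }

  /** Runs the attempt with retries and returns how many attempts were made. */
  static int download(Chunk chunk, Callable<Void> attempt) throws Exception {
    int attempts = 0;
    boolean succeeded = false;
    try {
      while (true) {
        attempts++;
        try {
          attempt.call();
          succeeded = true;
          return attempts;
        } catch (IOException e) {
          // Only checked, potentially transient failures are retried.
          if (attempts >= MAX_RETRIES) {
            throw new Exception("Failed to download after " + MAX_RETRIES + " attempts", e);
          }
        }
        // RuntimeException (e.g. NPE, ISE) is not caught here, so it
        // propagates immediately without another attempt.
      }
    } finally {
      // Single place where the terminal status is assigned.
      chunk.status = succeeded ? Status.DOWNLOAD_SUCCEEDED : Status.DOWNLOAD_FAILED;
    }
  }
}
```

Setting the status only in `finally` means every exit path, whether exhausted retries or an unexpected unchecked exception, leaves the chunk in a consistent state without duplicating the assignment in each catch.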

src/main/java/com/databricks/jdbc/api/impl/arrow/StreamingChunkProvider.java

Lines changed: 11 additions & 4 deletions
@@ -611,9 +611,9 @@ ExternalLink getRefreshedLink(long chunkIndex, long rowOffset) throws SQLExcepti
             c.setChunkLink(link);
           }
         } else {
-          // New chunk from server not yet in our map — create it.
-          // This handles the bounded SEA case where the refresh response
-          // may include chunks beyond our current highestKnownChunkIndex.
+          // Server returned a chunk not yet in our map — create it.
+          // Handles cases where refresh response includes chunks beyond
+          // our current highestKnownChunkIndex.
           try {
             createChunkFromLink(link);
           } catch (Exception e) {
@@ -625,11 +625,18 @@ ExternalLink getRefreshedLink(long chunkIndex, long rowOffset) throws SQLExcepti
         }
       }
 
-      // Update end-of-stream from refresh response
+      // Update end-of-stream and prefetch index from refresh response
       if (!result.hasMore()) {
         endOfStreamReached = true;
+      } else if (result.getNextFetchIndex() > nextLinkFetchIndex) {
+        // Avoid re-fetching chunks that the refresh already discovered
+        nextLinkFetchIndex = result.getNextFetchIndex();
+        nextRowOffsetToFetch = result.getNextRowOffset();
       }
 
+      // Trigger downloads for any newly-created chunks
+      triggerDownloads();
+
       // Check if our target chunk was refreshed by the batch
       targetChunk = chunks.get(chunkIndex);
       if (targetChunk != null && !targetChunk.isChunkLinkInvalid()) {
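The cursor update added for P2-3 is easier to reason about as a small state machine. The sketch below is an assumption-laden model: `PrefetchCursorSketch` and `RefreshResult` are hypothetical types whose fields merely mirror the accessors used in the diff, and any locking the real provider performs is omitted. It shows that the prefetch cursor only ever moves forward, so a refresh response covering already-fetched chunks leaves it untouched.

```java
// Hypothetical model of the prefetch cursor; synchronization is omitted.
final class PrefetchCursorSketch {

  /** Stand-in for the refresh response; fields mirror the accessors in the diff. */
  static final class RefreshResult {
    final boolean hasMore;
    final long nextFetchIndex;
    final long nextRowOffset;

    RefreshResult(boolean hasMore, long nextFetchIndex, long nextRowOffset) {
      this.hasMore = hasMore;
      this.nextFetchIndex = nextFetchIndex;
      this.nextRowOffset = nextRowOffset;
    }
  }

  long nextLinkFetchIndex;
  long nextRowOffsetToFetch;
  boolean endOfStreamReached;

  PrefetchCursorSketch(long startIndex, long startRowOffset) {
    this.nextLinkFetchIndex = startIndex;
    this.nextRowOffsetToFetch = startRowOffset;
  }

  /** Applies a refresh response: marks end of stream, or advances the cursor forward only. */
  void applyRefresh(RefreshResult result) {
    if (!result.hasMore) {
      endOfStreamReached = true;
    } else if (result.nextFetchIndex > nextLinkFetchIndex) {
      // The refresh discovered chunks past the cursor; skip them on the next prefetch.
      nextLinkFetchIndex = result.nextFetchIndex;
      nextRowOffsetToFetch = result.nextRowOffset;
    }
  }
}
```

For example, starting at index 3, a response with `nextFetchIndex` 5 moves the cursor to 5, while a later response with `nextFetchIndex` 4 is ignored.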

src/main/java/com/databricks/jdbc/common/DatabricksJdbcUrlParams.java

Lines changed: 1 addition & 1 deletion
@@ -206,7 +206,7 @@ public enum DatabricksJdbcUrlParams {
       "1"),
   USE_BOUNDED_SEA_API(
       "UseBoundedSeaApi",
-      "Use bounded SEA API for CloudFetch: send row_offset on GetResultData, force StreamingChunkProvider, stop relying on total_chunk_count",
+      "Use bounded SEA API for CloudFetch: send row_offset on GetResultData, force StreamingChunkProvider, stop relying on total_chunk_count. Requires server support.",
       "0"),
   DISABLE_OAUTH_REFRESH_TOKEN(
       "DisableOauthRefreshToken",

src/main/java/com/databricks/jdbc/dbclient/impl/sqlexec/DatabricksSdkClient.java

Lines changed: 2 additions & 2 deletions
@@ -529,8 +529,8 @@ public ChunkLinkFetchResult getResultChunks(
     GetStatementResultChunkNRequest request =
         new GetStatementResultChunkNRequest().setStatementId(statementId).setChunkIndex(chunkIndex);
     String path = String.format(RESULT_CHUNK_PATH, statementId, chunkIndex);
-    // Bounded SEA API: send row_offset to support future >100GB results and cluster-side fetch
-    if (connectionContext.isBoundedSeaApiEnabled() && chunkStartRowOffset > 0) {
+    // Bounded SEA API: always send row_offset (even 0 for chunk 0)
+    if (connectionContext.isBoundedSeaApiEnabled()) {
       path = path + "?row_offset=" + chunkStartRowOffset;
     }
     try {
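The effect of dropping the `chunkStartRowOffset > 0` guard (P2-1) can be seen by extracting the path construction into a pure function. This is a sketch only: `ResultChunkPathSketch` and `buildPath` are hypothetical, and the value of `RESULT_CHUNK_PATH` is an assumed template, since the real constant is not shown in the diff.

```java
// Hypothetical extraction of the path-building logic for illustration.
final class ResultChunkPathSketch {
  // Assumed template; the driver's actual RESULT_CHUNK_PATH is not shown in this commit.
  static final String RESULT_CHUNK_PATH = "/api/2.0/sql/statements/%s/result/chunks/%s";

  static String buildPath(
      String statementId, long chunkIndex, long chunkStartRowOffset, boolean boundedSeaEnabled) {
    String path = String.format(RESULT_CHUNK_PATH, statementId, chunkIndex);
    if (boundedSeaEnabled) {
      // Sent unconditionally, including row_offset=0 for chunk 0.
      path = path + "?row_offset=" + chunkStartRowOffset;
    }
    return path;
  }
}
```

Under the previous condition, a request for chunk 0 would have omitted the query parameter even with the flag on; with this change the request shape depends only on the flag.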
