From e6fd395250b22f7c07d34ddf8cdfb900b10a7e45 Mon Sep 17 00:00:00 2001 From: Gopal Lal Date: Mon, 25 May 2026 14:58:46 +0530 Subject: [PATCH 01/10] Add bounded SEA API support for CloudFetch (UseBoundedSeaApi=0) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Part 1 of bounded SEA API compliance for CloudFetch: 1. New connection property UseBoundedSeaApi (default 0/off). When enabled: - Sends row_offset query parameter on GetResultData requests - Forces StreamingChunkProvider (which uses next_chunk_index, not total_chunk_count) even when streaming is explicitly disabled 2. StreamingChunkProvider already uses next_chunk_index for continuation and end-of-stream detection — no changes needed to its core logic. 3. Legacy RemoteChunkProvider (uses total_chunk_count) is bypassed when bounded SEA is enabled. row_offset is derived from the previous link's row_offset + row_count and sent as a query parameter on /sql/statements/{id}/result/chunks/{idx}. This is required for future >100GB results and cluster-side fetch. Co-authored-by: Isaac Signed-off-by: Gopal Lal --- .../jdbc/api/impl/DatabricksConnectionContext.java | 5 +++++ .../databricks/jdbc/api/impl/arrow/ArrowStreamResult.java | 4 +++- .../databricks/jdbc/api/impl/arrow/SeaChunkLinkFetcher.java | 4 ++-- .../jdbc/api/internal/IDatabricksConnectionContext.java | 3 +++ .../com/databricks/jdbc/common/DatabricksJdbcUrlParams.java | 4 ++++ .../jdbc/dbclient/impl/sqlexec/DatabricksSdkClient.java | 4 ++++ 6 files changed, 21 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/databricks/jdbc/api/impl/DatabricksConnectionContext.java b/src/main/java/com/databricks/jdbc/api/impl/DatabricksConnectionContext.java index 956695ab51..6a793a1231 100644 --- a/src/main/java/com/databricks/jdbc/api/impl/DatabricksConnectionContext.java +++ b/src/main/java/com/databricks/jdbc/api/impl/DatabricksConnectionContext.java @@ -1440,6 +1440,11 @@ public boolean isCloudFetchEnabled() { return getParameter(DatabricksJdbcUrlParams.ENABLE_CLOUD_FETCH).equals("1"); } + @Override + public boolean isBoundedSeaApiEnabled() { + return getParameter(DatabricksJdbcUrlParams.USE_BOUNDED_SEA_API).equals("1"); + } + @Override public int getThriftMaxBatchesInMemory() { try { diff --git a/src/main/java/com/databricks/jdbc/api/impl/arrow/ArrowStreamResult.java b/src/main/java/com/databricks/jdbc/api/impl/arrow/ArrowStreamResult.java index 1569b7efb8..bd8949f2f4 100644 --- a/src/main/java/com/databricks/jdbc/api/impl/arrow/ArrowStreamResult.java +++ b/src/main/java/com/databricks/jdbc/api/impl/arrow/ArrowStreamResult.java @@ -102,7 +102,9 @@ private static ChunkProvider createRemoteChunkProvider( IDatabricksConnectionContext connectionContext = session.getConnectionContext(); - if (connectionContext.isStreamingChunkProviderEnabled()) { + // Bounded SEA API forces StreamingChunkProvider — it doesn't rely on total_chunk_count + if (connectionContext.isStreamingChunkProviderEnabled() + || connectionContext.isBoundedSeaApiEnabled()) { LOGGER.info( "Using StreamingChunkProvider for statementId: {}", statementId.toSQLExecStatementId()); diff --git a/src/main/java/com/databricks/jdbc/api/impl/arrow/SeaChunkLinkFetcher.java b/src/main/java/com/databricks/jdbc/api/impl/arrow/SeaChunkLinkFetcher.java index 9be8dccf10..7df1586cca 100644 --- a/src/main/java/com/databricks/jdbc/api/impl/arrow/SeaChunkLinkFetcher.java +++ b/src/main/java/com/databricks/jdbc/api/impl/arrow/SeaChunkLinkFetcher.java @@ -32,7 +32,7 @@ public SeaChunkLinkFetcher(IDatabricksSession session, StatementId statementId) @Override public ChunkLinkFetchResult fetchLinks(long startChunkIndex, long startRowOffset) throws SQLException { - // SEA uses startChunkIndex; startRowOffset is ignored + // SEA uses startChunkIndex; startRowOffset is passed through for bounded SEA API LOGGER.debug( "Fetching links starting from chunk index {} for statement {}", startChunkIndex, @@ -45,7 +45,7 @@ public ChunkLinkFetchResult fetchLinks(long startChunkIndex, long startRowOffset @Override public ExternalLink refetchLink(long chunkIndex, long rowOffset) throws SQLException { - // SEA uses chunkIndex; rowOffset is ignored + // SEA uses chunkIndex; rowOffset is passed through for bounded SEA API LOGGER.info("Refetching expired link for chunk {} of statement {}", chunkIndex, statementId); ChunkLinkFetchResult result = diff --git a/src/main/java/com/databricks/jdbc/api/internal/IDatabricksConnectionContext.java b/src/main/java/com/databricks/jdbc/api/internal/IDatabricksConnectionContext.java index 8ca5ae0cd2..5754ec0daf 100644 --- a/src/main/java/com/databricks/jdbc/api/internal/IDatabricksConnectionContext.java +++ b/src/main/java/com/databricks/jdbc/api/internal/IDatabricksConnectionContext.java @@ -461,6 +461,9 @@ default int getHeartbeatIntervalSeconds() { */ boolean isCloudFetchEnabled(); + /** Returns whether bounded SEA API mode is enabled for CloudFetch. */ + boolean isBoundedSeaApiEnabled(); + /** * Returns the maximum number of batches to keep in memory for Thrift streaming. * diff --git a/src/main/java/com/databricks/jdbc/common/DatabricksJdbcUrlParams.java b/src/main/java/com/databricks/jdbc/common/DatabricksJdbcUrlParams.java index 639288248d..70ec2d4842 100644 --- a/src/main/java/com/databricks/jdbc/common/DatabricksJdbcUrlParams.java +++ b/src/main/java/com/databricks/jdbc/common/DatabricksJdbcUrlParams.java @@ -204,6 +204,10 @@ public enum DatabricksJdbcUrlParams { "EnableSeaSyncMetadata", "Enable x-databricks-sea-can-run-fully-sync header for synchronous metadata requests in SEA mode", "1"), + USE_BOUNDED_SEA_API( + "UseBoundedSeaApi", + "Use bounded SEA API for CloudFetch: send row_offset on GetResultData, force StreamingChunkProvider, stop relying on total_chunk_count", + "0"), DISABLE_OAUTH_REFRESH_TOKEN( "DisableOauthRefreshToken", "Disable requesting OAuth refresh tokens (omit offline_access unless explicitly provided)", diff --git a/src/main/java/com/databricks/jdbc/dbclient/impl/sqlexec/DatabricksSdkClient.java b/src/main/java/com/databricks/jdbc/dbclient/impl/sqlexec/DatabricksSdkClient.java index 4b73982445..21cee3ce6c 100644 --- a/src/main/java/com/databricks/jdbc/dbclient/impl/sqlexec/DatabricksSdkClient.java +++ b/src/main/java/com/databricks/jdbc/dbclient/impl/sqlexec/DatabricksSdkClient.java @@ -529,6 +529,10 @@ public ChunkLinkFetchResult getResultChunks( GetStatementResultChunkNRequest request = new GetStatementResultChunkNRequest().setStatementId(statementId).setChunkIndex(chunkIndex); String path = String.format(RESULT_CHUNK_PATH, statementId, chunkIndex); + // Bounded SEA API: send row_offset to support future >100GB results and cluster-side fetch + if (connectionContext.isBoundedSeaApiEnabled() && chunkStartRowOffset > 0) { + path = path + "?row_offset=" + chunkStartRowOffset; + } try { Request req = new Request(Request.GET, path, apiClient.serialize(request)); req.withHeaders(getHeaders("getStatementResultN")); From 5d577dc02d07960e0e2ef6489d3dfd06659a06e5 Mon Sep 17 00:00:00 2001 From: Gopal Lal Date: Mon, 25 May 2026 15:04:03 +0530 Subject: [PATCH 02/10] Add batched link refresh reconciliation for bounded SEA API During coalesced link refresh, the server may return links for chunks not yet in the provider's map (newly-discovered chunks beyond highestKnownChunkIndex). Previously these were silently skipped. Now: create new chunks from refresh response links, update highestKnownChunkIndex, and set endOfStreamReached from the response's hasMore flag. Follows the per-chunk state-machine reconciliation from the bounded SEA API spec. Co-authored-by: Isaac Signed-off-by: Gopal Lal --- .../impl/arrow/StreamingChunkProvider.java | 24 ++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/databricks/jdbc/api/impl/arrow/StreamingChunkProvider.java b/src/main/java/com/databricks/jdbc/api/impl/arrow/StreamingChunkProvider.java index dbb1bbe789..4e02d6e0af 100644 --- a/src/main/java/com/databricks/jdbc/api/impl/arrow/StreamingChunkProvider.java +++ b/src/main/java/com/databricks/jdbc/api/impl/arrow/StreamingChunkProvider.java @@ -596,12 +596,13 @@ ExternalLink getRefreshedLink(long chunkIndex, long rowOffset) throws SQLExcepti // Single batch FetchResults RPC from the lowest expired offset ChunkLinkFetchResult result = linkFetcher.fetchLinks(minExpiredIndex, minExpiredRowOffset); - // Update ALL pre-download chunks that received fresh links. - // Always overwrite even if the current link hasn't expired yet, since the - // server-provided link has a later expiry and prevents near-expiry races. + // Reconcile ALL links from the refresh response with local chunk state. for (ExternalLink link : result.getChunkLinks()) { ArrowResultChunk c = chunks.get(link.getChunkIndex()); if (c != null) { + // Existing chunk: update link only for pre-download states. + // DOWNLOADING stays as-is (download task owns the state machine). + // DOWNLOADED/RELEASED/etc. stay as-is (bytes already in memory). ChunkStatus status = c.getStatus(); if (status == ChunkStatus.PENDING || status == ChunkStatus.URL_FETCHED @@ -609,9 +610,26 @@ ExternalLink getRefreshedLink(long chunkIndex, long rowOffset) throws SQLExcepti || status == ChunkStatus.DOWNLOAD_RETRY) { c.setChunkLink(link); } + } else { + // New chunk from server not yet in our map — create it. + // This handles the bounded SEA case where the refresh response + // may include chunks beyond our current highestKnownChunkIndex. + try { + createChunkFromLink(link); + } catch (Exception e) { + LOGGER.debug( + "Failed to create chunk {} from refresh response: {}", + link.getChunkIndex(), + e.getMessage()); + } } } + // Update end-of-stream from refresh response + if (!result.hasMore()) { + endOfStreamReached = true; + } + // Check if our target chunk was refreshed by the batch targetChunk = chunks.get(chunkIndex); if (targetChunk != null && !targetChunk.isChunkLinkInvalid()) { From 271c4c2838481ec5b9cc1f00f269be3e1d273b71 Mon Sep 17 00:00:00 2001 From: Gopal Lal Date: Mon, 25 May 2026 15:19:43 +0530 Subject: [PATCH 03/10] Bring StreamingChunkDownloadTask to parity with ChunkDownloadTask MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fixes 3 gaps found by comparing with the legacy ChunkDownloadTask: 1. Outer catch(Throwable) + exception chaining in finally: uncaught exceptions were lost — the finally block created a generic exception without the original cause. Now captures uncaughtException and chains it, matching ChunkDownloadTask's pattern. 2. Thread context propagation: download threads had no connection context or statementId for telemetry/logging. Now captures caller's context via DatabricksThreadContextHolder and clears in finally. 3. Download timing: added task-level timing log (totalMs, retries) matching ChunkDownloadTask's diagnostics. Also includes the RuntimeException catch (parity with PR #1302). Co-authored-by: Isaac Signed-off-by: Gopal Lal --- .../arrow/StreamingChunkDownloadTask.java | 35 +++++++++++++++++-- 1 file changed, 32 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/databricks/jdbc/api/impl/arrow/StreamingChunkDownloadTask.java b/src/main/java/com/databricks/jdbc/api/impl/arrow/StreamingChunkDownloadTask.java index efc664b247..15a7e315d0 100644 --- a/src/main/java/com/databricks/jdbc/api/impl/arrow/StreamingChunkDownloadTask.java +++ b/src/main/java/com/databricks/jdbc/api/impl/arrow/StreamingChunkDownloadTask.java @@ -1,6 +1,8 @@ package com.databricks.jdbc.api.impl.arrow; +import com.databricks.jdbc.api.internal.IDatabricksConnectionContext; import com.databricks.jdbc.common.CompressionCodec; +import com.databricks.jdbc.common.util.DatabricksThreadContextHolder; import com.databricks.jdbc.dbclient.IDatabricksHttpClient; import com.databricks.jdbc.exception.DatabricksSQLException; import com.databricks.jdbc.log.JdbcLogger; @@ -29,6 +31,10 @@ public class StreamingChunkDownloadTask implements Callable { private final LinkRefresher linkRefresher; private final double cloudFetchSpeedThreshold; + // Capture caller's thread context for telemetry/logging on the download thread + private final IDatabricksConnectionContext connectionContext; + private final String statementId; + public StreamingChunkDownloadTask( ArrowResultChunk chunk, IDatabricksHttpClient httpClient, @@ -40,13 +46,21 @@ public StreamingChunkDownloadTask( this.compressionCodec = compressionCodec; this.linkRefresher = linkRefresher; this.cloudFetchSpeedThreshold = cloudFetchSpeedThreshold; + this.connectionContext = DatabricksThreadContextHolder.getConnectionContext(); + this.statementId = DatabricksThreadContextHolder.getStatementId(); } @Override public Void call() throws DatabricksSQLException { int retries = 0; boolean downloadSuccessful = false; + Throwable uncaughtException = null; + + // Propagate caller's thread context for telemetry/logging + DatabricksThreadContextHolder.setConnectionContext(this.connectionContext); + DatabricksThreadContextHolder.setStatementId(this.statementId); + long taskStartTime = System.nanoTime(); try { while (!downloadSuccessful) { try { @@ -62,9 +76,14 @@ public Void call() throws DatabricksSQLException { chunk.downloadData(httpClient, compressionCodec, cloudFetchSpeedThreshold); downloadSuccessful = true; - LOGGER.debug("Successfully downloaded chunk {}", chunk.getChunkIndex()); + long taskTotalMs = (System.nanoTime() - taskStartTime) / 1_000_000; + LOGGER.debug( + "Chunk download complete: chunkIndex={}, totalMs={}, retries={}", + chunk.getChunkIndex(), + taskTotalMs, + retries); - } catch (IOException | SQLException e) { + } catch (IOException | SQLException | RuntimeException e) { retries++; if (retries >= MAX_RETRIES) { LOGGER.error( @@ -72,7 +91,7 @@ public Void call() throws DatabricksSQLException { chunk.getChunkIndex(), MAX_RETRIES, e.getMessage()); - // Status will be set to DOWNLOAD_FAILED in the finally block + chunk.setStatus(ChunkStatus.DOWNLOAD_FAILED); throw new DatabricksSQLException( String.format( "Failed to download chunk %d after %d attempts", @@ -95,18 +114,28 @@ public Void call() throws DatabricksSQLException { } } } + } catch (Throwable t) { + uncaughtException = t; + throw t; } finally { if (downloadSuccessful) { chunk.getChunkReadyFuture().complete(null); } else { + LOGGER.info( + "Download failed for chunk {}: {}", + chunk.getChunkIndex(), + uncaughtException != null ? uncaughtException.getMessage() : "unknown"); chunk.setStatus(ChunkStatus.DOWNLOAD_FAILED); chunk .getChunkReadyFuture() .completeExceptionally( new DatabricksSQLException( "Download failed for chunk " + chunk.getChunkIndex(), + uncaughtException, DatabricksDriverErrorCode.CHUNK_DOWNLOAD_ERROR)); } + + DatabricksThreadContextHolder.clearAllContext(); } return null; From 95fcfc9a69a81ea51bff4af83fe063161b0d8354 Mon Sep 17 00:00:00 2001 From: Gopal Lal Date: Mon, 25 May 2026 17:05:23 +0530 Subject: [PATCH 04/10] Address review feedback on bounded SEA API PR MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit P0-1: Remove redundant chunk.setStatus(DOWNLOAD_FAILED) in inner catch — defer entirely to finally block. Fixes StreamingChunkDownloadTaskTest. P0-2: Add NEXT_CHANGELOG.md entry under ### Added for UseBoundedSeaApi. P1-1: Call triggerDownloads() after reconciliation creates new chunks from refresh response — prevents newly-discovered chunks sitting PENDING. P1-2/P1-3: Un-gated changes (new chunk creation, EOS from refresh, triggerDownloads) are intentional parity fixes for all EnableStreamingChunkProvider=1 users. EnableStreamingChunkProvider defaults to off, so default users are unaffected. P1-4: Revert RuntimeException from inner catch — DatabricksError is caught by outer catch(Throwable) and fails immediately (no retry), matching ChunkDownloadTask behavior exactly. NPE/ISE won't be retried. P2-1: Always send row_offset (even 0 for chunk 0) when bounded SEA enabled — explicit is safer than relying on server default. P2-3: Update nextLinkFetchIndex after reconciliation to avoid prefetch thread re-fetching chunks already discovered via refresh. P2-5: Add "Requires server support" to connection property help text. Co-authored-by: Isaac Signed-off-by: Gopal Lal --- NEXT_CHANGELOG.md | 5 +++++ .../impl/arrow/StreamingChunkDownloadTask.java | 4 ++-- .../api/impl/arrow/StreamingChunkProvider.java | 15 +++++++++++---- .../jdbc/common/DatabricksJdbcUrlParams.java | 2 +- .../impl/sqlexec/DatabricksSdkClient.java | 4 ++-- 5 files changed, 21 insertions(+), 9 deletions(-) diff --git a/NEXT_CHANGELOG.md b/NEXT_CHANGELOG.md index d27a25e263..708fbe50cd 100644 --- a/NEXT_CHANGELOG.md +++ b/NEXT_CHANGELOG.md @@ -3,6 +3,11 @@ ## [Unreleased] ### Added +- Added result set heartbeat / keep-alive to prevent server-side result expiry during slow consumption. When enabled via `EnableHeartbeat=1`, the driver periodically polls `GetStatementStatus` (SEA) or `GetOperationStatus` (Thrift) to keep the operation alive while the client reads results. Configurable interval via `HeartbeatIntervalSeconds` (default 60s). Heartbeat automatically stops when results are fully consumed, ResultSet is closed, or the server returns a terminal state. Disabled by default due to cost implications (heartbeats keep the warehouse running). +- Metadata operations now use SQL SHOW commands for both Thrift and SEA backends, + ensuring consistent behavior for SQL warehouses regardless of underlying + protocol. To revert to native Thrift metadata RPCs, set `UseQueryForMetadata=0`. +- Added `UseBoundedSeaApi` connection property (default `0`/off). When enabled, the driver uses the bounded SEA API contract for CloudFetch: sends `row_offset` on GetResultData requests and uses `next_chunk_index` for chunk discovery instead of `total_chunk_count`. Requires server support. ### Updated diff --git a/src/main/java/com/databricks/jdbc/api/impl/arrow/StreamingChunkDownloadTask.java b/src/main/java/com/databricks/jdbc/api/impl/arrow/StreamingChunkDownloadTask.java index 15a7e315d0..9d9c183ee5 100644 --- a/src/main/java/com/databricks/jdbc/api/impl/arrow/StreamingChunkDownloadTask.java +++ b/src/main/java/com/databricks/jdbc/api/impl/arrow/StreamingChunkDownloadTask.java @@ -83,7 +83,7 @@ public Void call() throws DatabricksSQLException { taskTotalMs, retries); - } catch (IOException | SQLException | RuntimeException e) { + } catch (IOException | SQLException e) { retries++; if (retries >= MAX_RETRIES) { LOGGER.error( @@ -91,7 +91,7 @@ public Void call() throws DatabricksSQLException { chunk.getChunkIndex(), MAX_RETRIES, e.getMessage()); - chunk.setStatus(ChunkStatus.DOWNLOAD_FAILED); + // Status set to DOWNLOAD_FAILED in the finally block throw new DatabricksSQLException( String.format( "Failed to download chunk %d after %d attempts", diff --git a/src/main/java/com/databricks/jdbc/api/impl/arrow/StreamingChunkProvider.java b/src/main/java/com/databricks/jdbc/api/impl/arrow/StreamingChunkProvider.java index 4e02d6e0af..21225cc849 100644 --- a/src/main/java/com/databricks/jdbc/api/impl/arrow/StreamingChunkProvider.java +++ b/src/main/java/com/databricks/jdbc/api/impl/arrow/StreamingChunkProvider.java @@ -611,9 +611,9 @@ ExternalLink getRefreshedLink(long chunkIndex, long rowOffset) throws SQLExcepti c.setChunkLink(link); } } else { - // New chunk from server not yet in our map — create it. - // This handles the bounded SEA case where the refresh response - // may include chunks beyond our current highestKnownChunkIndex. + // Server returned a chunk not yet in our map — create it. + // Handles cases where refresh response includes chunks beyond + // our current highestKnownChunkIndex. try { createChunkFromLink(link); } catch (Exception e) { @@ -625,11 +625,18 @@ ExternalLink getRefreshedLink(long chunkIndex, long rowOffset) throws SQLExcepti } } - // Update end-of-stream from refresh response + // Update end-of-stream and prefetch index from refresh response if (!result.hasMore()) { endOfStreamReached = true; + } else if (result.getNextFetchIndex() > nextLinkFetchIndex) { + // Avoid re-fetching chunks that the refresh already discovered + nextLinkFetchIndex = result.getNextFetchIndex(); + nextRowOffsetToFetch = result.getNextRowOffset(); } + // Trigger downloads for any newly-created chunks + triggerDownloads(); + // Check if our target chunk was refreshed by the batch targetChunk = chunks.get(chunkIndex); if (targetChunk != null && !targetChunk.isChunkLinkInvalid()) { diff --git a/src/main/java/com/databricks/jdbc/common/DatabricksJdbcUrlParams.java b/src/main/java/com/databricks/jdbc/common/DatabricksJdbcUrlParams.java index 70ec2d4842..b9a70f1a8a 100644 --- a/src/main/java/com/databricks/jdbc/common/DatabricksJdbcUrlParams.java +++ b/src/main/java/com/databricks/jdbc/common/DatabricksJdbcUrlParams.java @@ -206,7 +206,7 @@ public enum DatabricksJdbcUrlParams { "1"), USE_BOUNDED_SEA_API( "UseBoundedSeaApi", - "Use bounded SEA API for CloudFetch: send row_offset on GetResultData, force StreamingChunkProvider, stop relying on total_chunk_count", + "Use bounded SEA API for CloudFetch: send row_offset on GetResultData, force StreamingChunkProvider, stop relying on total_chunk_count. Requires server support.", "0"), DISABLE_OAUTH_REFRESH_TOKEN( "DisableOauthRefreshToken", diff --git a/src/main/java/com/databricks/jdbc/dbclient/impl/sqlexec/DatabricksSdkClient.java b/src/main/java/com/databricks/jdbc/dbclient/impl/sqlexec/DatabricksSdkClient.java index 21cee3ce6c..b714f2a385 100644 --- a/src/main/java/com/databricks/jdbc/dbclient/impl/sqlexec/DatabricksSdkClient.java +++ b/src/main/java/com/databricks/jdbc/dbclient/impl/sqlexec/DatabricksSdkClient.java @@ -529,8 +529,8 @@ public ChunkLinkFetchResult getResultChunks( GetStatementResultChunkNRequest request = new GetStatementResultChunkNRequest().setStatementId(statementId).setChunkIndex(chunkIndex); String path = String.format(RESULT_CHUNK_PATH, statementId, chunkIndex); - // Bounded SEA API: send row_offset to support future >100GB results and cluster-side fetch - if (connectionContext.isBoundedSeaApiEnabled() && chunkStartRowOffset > 0) { + // Bounded SEA API: always send row_offset (even 0 for chunk 0) + if (connectionContext.isBoundedSeaApiEnabled()) { path = path + "?row_offset=" + chunkStartRowOffset; } try { From 7e0b2c1160f222fe1330af7787f659a2c3ea47ea Mon Sep 17 00:00:00 2001 From: Gopal Lal Date: Wed, 27 May 2026 13:54:56 +0530 Subject: [PATCH 05/10] Fix race condition in createChunkFromLink and non-atomic fetch position updates P1-1: Use putIfAbsent in createChunkFromLink to prevent double row-count and chunk replacement when called concurrently from prefetch and download threads. Without this, a race could leave a CompletableFuture that is never completed, causing consumer hangs. P1-2: Bundle nextLinkFetchIndex + nextRowOffsetToFetch into an immutable FetchPosition holder updated via volatile reference. This ensures the prefetch thread always reads a consistent (index, rowOffset) pair, which matters for bounded SEA API where row_offset is used by the server. P2-1: Bump reconciliation failure log from DEBUG to WARN for production visibility. Signed-off-by: Gopal Lal Co-authored-by: Isaac Signed-off-by: Gopal Lal --- .../impl/arrow/StreamingChunkProvider.java | 74 ++++++++++++------- 1 file changed, 47 insertions(+), 27 deletions(-) diff --git a/src/main/java/com/databricks/jdbc/api/impl/arrow/StreamingChunkProvider.java b/src/main/java/com/databricks/jdbc/api/impl/arrow/StreamingChunkProvider.java index 21225cc849..a4d12e88bd 100644 --- a/src/main/java/com/databricks/jdbc/api/impl/arrow/StreamingChunkProvider.java +++ b/src/main/java/com/databricks/jdbc/api/impl/arrow/StreamingChunkProvider.java @@ -47,6 +47,17 @@ */ public class StreamingChunkProvider implements ChunkProvider { + /** Immutable holder for the next fetch position — ensures atomic reads of (index, rowOffset). */ + private static final class FetchPosition { + final long chunkIndex; + final long rowOffset; + + FetchPosition(long chunkIndex, long rowOffset) { + this.chunkIndex = chunkIndex; + this.rowOffset = rowOffset; + } + } + private static final JdbcLogger LOGGER = JdbcLoggerFactory.getLogger(StreamingChunkProvider.class); private static final String DOWNLOAD_THREAD_PREFIX = "databricks-jdbc-streaming-downloader-"; @@ -75,8 +86,11 @@ public class StreamingChunkProvider implements ChunkProvider { // - nextDownloadIndex: written only under downloadLock, but AtomicLong for consistency private final AtomicLong currentChunkIndex = new AtomicLong(-1); private final AtomicLong highestKnownChunkIndex = new AtomicLong(-1); - private volatile long nextLinkFetchIndex = 0; - private volatile long nextRowOffsetToFetch = 0; + // Bundled as an immutable pair for atomic reads/writes across threads. + // The prefetch thread reads this (fetchNextLinkBatch) while the download thread + // may update it (getRefreshedLink reconciliation). A volatile reference to an + // immutable holder ensures both fields are always read consistently. + private volatile FetchPosition nextFetchPosition = new FetchPosition(0, 0); private final AtomicLong nextDownloadIndex = new AtomicLong(0); // State flags @@ -347,11 +361,11 @@ private void linkPrefetchLoop() { long targetIndex = currentChunkIndex.get() + linkPrefetchWindow; // Wait if we're caught up - while (!endOfStreamReached && nextLinkFetchIndex > targetIndex) { + while (!endOfStreamReached && nextFetchPosition.chunkIndex > targetIndex) { if (closed) break; LOGGER.debug( "Prefetch caught up, waiting for consumer. next={}, target={}", - nextLinkFetchIndex, + nextFetchPosition.chunkIndex, targetIndex); consumerAdvanced.await(); targetIndex = currentChunkIndex.get() + linkPrefetchWindow; @@ -396,13 +410,14 @@ private void fetchNextLinkBatch() throws SQLException { return; } + FetchPosition pos = nextFetchPosition; LOGGER.debug( "Fetching links starting from index {}, row offset {} for statement {}", - nextLinkFetchIndex, - nextRowOffsetToFetch, + pos.chunkIndex, + pos.rowOffset, statementId); - ChunkLinkFetchResult result = linkFetcher.fetchLinks(nextLinkFetchIndex, nextRowOffsetToFetch); + ChunkLinkFetchResult result = linkFetcher.fetchLinks(pos.chunkIndex, pos.rowOffset); if (result.isEndOfStream()) { LOGGER.info("End of stream reached for statement {}", statementId); @@ -415,10 +430,9 @@ private void fetchNextLinkBatch() throws SQLException { createChunkFromLink(link); } - // Update next fetch positions + // Update next fetch position atomically if (result.hasMore()) { - nextLinkFetchIndex = result.getNextFetchIndex(); - nextRowOffsetToFetch = result.getNextRowOffset(); + nextFetchPosition = new FetchPosition(result.getNextFetchIndex(), result.getNextRowOffset()); } else { endOfStreamReached = true; LOGGER.info("End of stream reached for statement {} (hasMore=false)", statementId); @@ -450,14 +464,15 @@ private void processInitialLinks(ChunkLinkFetchResult initialLinks) createChunkFromLink(link); } - // Set next fetch positions using unified API + // Set next fetch position atomically if (initialLinks.hasMore()) { - nextLinkFetchIndex = initialLinks.getNextFetchIndex(); - nextRowOffsetToFetch = initialLinks.getNextRowOffset(); + FetchPosition pos = + new FetchPosition(initialLinks.getNextFetchIndex(), initialLinks.getNextRowOffset()); + nextFetchPosition = pos; LOGGER.debug( "Next fetch position set to chunk index {}, row offset {} from initial links", - nextLinkFetchIndex, - nextRowOffsetToFetch); + pos.chunkIndex, + pos.rowOffset); } else { endOfStreamReached = true; LOGGER.info("End of stream reached from initial links for statement {}", statementId); @@ -471,11 +486,6 @@ private void processInitialLinks(ChunkLinkFetchResult initialLinks) */ private void createChunkFromLink(ExternalLink link) throws DatabricksParsingException { long chunkIndex = link.getChunkIndex(); - if (chunks.containsKey(chunkIndex)) { - LOGGER.debug("Chunk {} already exists, skipping creation", chunkIndex); - return; - } - long rowCount = link.getRowCount(); long rowOffset = link.getRowOffset(); @@ -488,7 +498,16 @@ private void createChunkFromLink(ExternalLink link) throws DatabricksParsingExce .build(); chunk.setChunkLink(link); - chunks.put(chunkIndex, chunk); + + // Atomic insert — if another thread already created this chunk, skip. + // This is safe because createChunkFromLink can be called concurrently from + // the prefetch thread (fetchNextLinkBatch) and download threads (getRefreshedLink). + ArrowResultChunk existing = chunks.putIfAbsent(chunkIndex, chunk); + if (existing != null) { + LOGGER.debug("Chunk {} already exists, skipping creation", chunkIndex); + return; + } + highestKnownChunkIndex.updateAndGet(current -> Math.max(current, chunkIndex)); totalRowCount.addAndGet(rowCount); @@ -617,7 +636,7 @@ ExternalLink getRefreshedLink(long chunkIndex, long rowOffset) throws SQLExcepti try { createChunkFromLink(link); } catch (Exception e) { - LOGGER.debug( + LOGGER.warn( "Failed to create chunk {} from refresh response: {}", link.getChunkIndex(), e.getMessage()); @@ -625,13 +644,14 @@ ExternalLink getRefreshedLink(long chunkIndex, long rowOffset) throws SQLExcepti } } - // Update end-of-stream and prefetch index from refresh response + // Update end-of-stream and prefetch position from refresh response if (!result.hasMore()) { endOfStreamReached = true; - } else if (result.getNextFetchIndex() > nextLinkFetchIndex) { - // Avoid re-fetching chunks that the refresh already discovered - nextLinkFetchIndex = result.getNextFetchIndex(); - nextRowOffsetToFetch = result.getNextRowOffset(); + } else if (result.getNextFetchIndex() > nextFetchPosition.chunkIndex) { + // Avoid re-fetching chunks that the refresh already discovered. + // Atomic update of both fields via immutable holder. + nextFetchPosition = + new FetchPosition(result.getNextFetchIndex(), result.getNextRowOffset()); } // Trigger downloads for any newly-created chunks From 26823e33ba459424d0fde31281f369fa44b4b3ee Mon Sep 17 00:00:00 2001 From: Gopal Lal Date: Wed, 27 May 2026 22:31:03 +0530 Subject: [PATCH 06/10] Stop depending on manifest.total_chunk_count in bounded SEA CloudFetch path MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Per the bounded SEA API contract, drivers must not depend on manifest.{chunks, total_chunk_count, total_row_count}. Pass null for totalChunkCount when converting initial ExternalLinks for StreamingChunkProvider — the provider derives end-of-stream from next_chunk_index on ExternalLink instead. Signed-off-by: Gopal Lal Co-authored-by: Isaac Signed-off-by: Gopal Lal --- .../jdbc/api/impl/arrow/ArrowStreamResult.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/databricks/jdbc/api/impl/arrow/ArrowStreamResult.java b/src/main/java/com/databricks/jdbc/api/impl/arrow/ArrowStreamResult.java index bd8949f2f4..8ef7871658 100644 --- a/src/main/java/com/databricks/jdbc/api/impl/arrow/ArrowStreamResult.java +++ b/src/main/java/com/databricks/jdbc/api/impl/arrow/ArrowStreamResult.java @@ -115,10 +115,13 @@ private static ChunkProvider createRemoteChunkProvider( int chunkReadyTimeoutSeconds = connectionContext.getChunkReadyTimeoutSeconds(); double cloudFetchSpeedThreshold = connectionContext.getCloudFetchSpeedThreshold(); - // Convert ExternalLinks to ChunkLinkFetchResult for the provider + // Convert ExternalLinks to ChunkLinkFetchResult for the provider. + // Bounded SEA API: pass null for totalChunkCount — we must not depend on + // manifest.{chunks, total_chunk_count, total_row_count} per the bounded API contract. + Long totalChunkCount = + connectionContext.isBoundedSeaApiEnabled() ? null : resultManifest.getTotalChunkCount(); ChunkLinkFetchResult initialLinks = - convertToChunkLinkFetchResult( - resultData.getExternalLinks(), resultManifest.getTotalChunkCount()); + convertToChunkLinkFetchResult(resultData.getExternalLinks(), totalChunkCount); return new StreamingChunkProvider( linkFetcher, From 600446699b338aa8aef9a4cf6aff0e3a63bda0bf Mon Sep 17 00:00:00 2001 From: Gopal Lal Date: Wed, 27 May 2026 22:50:12 +0530 Subject: [PATCH 07/10] Stop depending on manifest.total_row_count in bounded SEA API for isLast/isAfterLast When boundedSeaApiEnabled=true, isLast() and isAfterLast() now use hasNext() instead of resultSetMetaData.getTotalRows(). The bounded SEA API contract does not guarantee manifest.total_row_count is populated; the chunk providers derive end-of-stream from next_chunk_index instead. Gated behind isBoundedSeaApiEnabled() + ArrowStreamResult instanceof check so existing Thrift and non-bounded SEA behavior is unchanged. Signed-off-by: Gopal Lal Co-authored-by: Isaac Signed-off-by: Gopal Lal --- .../jdbc/api/impl/DatabricksResultSet.java | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/databricks/jdbc/api/impl/DatabricksResultSet.java b/src/main/java/com/databricks/jdbc/api/impl/DatabricksResultSet.java index 5406994072..5461d77d96 100644 --- a/src/main/java/com/databricks/jdbc/api/impl/DatabricksResultSet.java +++ b/src/main/java/com/databricks/jdbc/api/impl/DatabricksResultSet.java @@ -77,6 +77,7 @@ enum ResultSetType { private ResultSetType resultSetType = ResultSetType.UNASSIGNED; private boolean complexDatatypeSupport = false; + private boolean boundedSeaApiEnabled = false; // Cached telemetry collector resolved once at construction time to avoid // per-row overhead in next(). The connection-to-collector mapping is stable @@ -128,6 +129,7 @@ public DatabricksResultSet( resultSetMetaData = null; } this.complexDatatypeSupport = session.getConnectionContext().isComplexDatatypeSupportEnabled(); + this.boundedSeaApiEnabled = session.getConnectionContext().isBoundedSeaApiEnabled(); this.statementType = statementType; this.updateCount = null; this.parentStatement = parentStatement; @@ -933,8 +935,15 @@ public boolean isBeforeFirst() throws SQLException { public boolean isAfterLast() throws SQLException { checkIfClosed(); // Account for client-side maxRows truncation - return truncatedByMaxRows - || executionResult.getCurrentRow() >= resultSetMetaData.getTotalRows(); + if (truncatedByMaxRows) { + return true; + } + // Bounded SEA API: manifest.total_row_count is not populated, so use hasNext() + // which derives end-of-stream from next_chunk_index via the chunk provider. + if (boundedSeaApiEnabled && executionResult instanceof ArrowStreamResult) { + return executionResult.getCurrentRow() >= 0 && !executionResult.hasNext(); + } + return executionResult.getCurrentRow() >= resultSetMetaData.getTotalRows(); } @Override @@ -974,6 +983,10 @@ public boolean isLast() throws SQLException { || executionResult instanceof StreamingInlineArrowResult) { return executionResult.getCurrentRow() >= 0 && !executionResult.hasNext(); } + // Bounded SEA API: manifest.total_row_count is not populated, so use hasNext() + if (boundedSeaApiEnabled && executionResult instanceof ArrowStreamResult) { + return executionResult.getCurrentRow() >= 0 && !executionResult.hasNext(); + } return executionResult.getCurrentRow() == resultSetMetaData.getTotalRows() - 1; } From ea2011042d11c05d7f3a9d06f9389e39cc4d1994 Mon Sep 17 00:00:00 2001 From: Gopal Lal Date: Wed, 27 May 2026 12:12:13 +0530 Subject: [PATCH 08/10] Add SEA inline Arrow support (UseBoundedSeaApi=1 + EnableQueryResultDownload=0) When UseBoundedSeaApi=1 AND EnableQueryResultDownload=0: 1. Session conf: sends can_cloud_download=false to tell server to inline results instead of using cloud storage. 2. Execute request: uses ARROW_STREAM + INLINE_OR_EXTERNAL_LINKS (instead of JSON_ARRAY + INLINE). This gets Arrow IPC data in the attachment field rather than JSON in data_array. 3. New SeaInlineArrowChunkProvider: lazily fetches inline Arrow chunks via GetResultData with row_offset. Similar to Thrift's LazyThriftInlineArrowResult but using SEA's ArrowIPC format in the attachment field. Decompresses via DecompressionUtil, creates ArrowResultChunk per chunk. End-of-stream when nextChunkIndex is null. 4. getResultChunksData overload: accepts rowOffset parameter, appends ?row_offset=N when bounded SEA is enabled. Without UseBoundedSeaApi=1, EnableQueryResultDownload=0 continues to use the existing JSON_ARRAY inline format (unchanged). Co-authored-by: Isaac Signed-off-by: Gopal Lal --- .../api/impl/arrow/ArrowStreamResult.java | 12 +- .../arrow/SeaInlineArrowChunkProvider.java | 185 ++++++++++++++++++ .../impl/sqlexec/DatabricksSdkClient.java | 42 +++- 3 files changed, 232 insertions(+), 7 deletions(-) create mode 100644 src/main/java/com/databricks/jdbc/api/impl/arrow/SeaInlineArrowChunkProvider.java diff --git a/src/main/java/com/databricks/jdbc/api/impl/arrow/ArrowStreamResult.java b/src/main/java/com/databricks/jdbc/api/impl/arrow/ArrowStreamResult.java index 8ef7871658..0a652766d2 100644 --- a/src/main/java/com/databricks/jdbc/api/impl/arrow/ArrowStreamResult.java +++ b/src/main/java/com/databricks/jdbc/api/impl/arrow/ArrowStreamResult.java @@ -64,7 +64,17 @@ public ArrowStreamResult( this.session = session; // Check if the result data contains the arrow data inline boolean isInlineArrow = resultData.getAttachment() != null; - if (isInlineArrow) { + IDatabricksConnectionContext connectionContext = session.getConnectionContext(); + if (isInlineArrow + && connectionContext.isBoundedSeaApiEnabled() + && !connectionContext.isCloudFetchEnabled()) { + // Bounded SEA inline Arrow: multi-chunk lazy fetch via GetResultData with row_offset + LOGGER.debug( + "Creating ArrowStreamResult with SEA inline Arrow for statementId: {}", + statementId.toSQLExecStatementId()); + this.chunkProvider = + new SeaInlineArrowChunkProvider(resultData, resultManifest, statementId, session); + } else if (isInlineArrow) { LOGGER.debug( "Creating ArrowStreamResult with inline attachment for statementId: {}", statementId.toSQLExecStatementId()); diff --git a/src/main/java/com/databricks/jdbc/api/impl/arrow/SeaInlineArrowChunkProvider.java b/src/main/java/com/databricks/jdbc/api/impl/arrow/SeaInlineArrowChunkProvider.java new file mode 100644 index 0000000000..3823b7f0d5 --- /dev/null +++ b/src/main/java/com/databricks/jdbc/api/impl/arrow/SeaInlineArrowChunkProvider.java @@ -0,0 +1,185 @@ +package com.databricks.jdbc.api.impl.arrow; + +import static com.databricks.jdbc.common.util.DecompressionUtil.decompress; + +import com.databricks.jdbc.api.internal.IDatabricksSession; +import com.databricks.jdbc.common.CompressionCodec; +import com.databricks.jdbc.dbclient.impl.common.StatementId; +import com.databricks.jdbc.dbclient.impl.sqlexec.DatabricksSdkClient; +import com.databricks.jdbc.exception.DatabricksSQLException; +import com.databricks.jdbc.log.JdbcLogger; +import com.databricks.jdbc.log.JdbcLoggerFactory; +import com.databricks.jdbc.model.core.ResultData; +import com.databricks.jdbc.model.core.ResultManifest; +import java.io.ByteArrayInputStream; + +/** + * ChunkProvider for SEA inline Arrow results (bounded SEA API). Fetches chunks lazily via + * GetResultData with row_offset, similar to Thrift's LazyThriftInlineArrowResult but using SEA's + * ArrowIPC format in the attachment field. + * + *

Used when UseBoundedSeaApi=1 AND EnableQueryResultDownload=0 (cloud fetch disabled). + */ +class SeaInlineArrowChunkProvider implements ChunkProvider { + + private static final JdbcLogger LOGGER = + JdbcLoggerFactory.getLogger(SeaInlineArrowChunkProvider.class); + + private final IDatabricksSession session; + private final StatementId statementId; + private final CompressionCodec compressionCodec; + + private ArrowResultChunk currentChunk; + private long currentChunkIndex; + private long nextChunkIndex; + private long nextRowOffset; + private boolean hasMore; + private long totalRowCount; + private boolean isClosed; + + /** + * @param initialResultData The first chunk from the ExecuteStatement response + * @param resultManifest Manifest with compression and schema info + * @param statementId Statement ID for subsequent fetch calls + * @param session Session for making GetResultData calls + */ + SeaInlineArrowChunkProvider( + ResultData initialResultData, + ResultManifest resultManifest, + StatementId statementId, + IDatabricksSession session) + throws DatabricksSQLException { + this.session = session; + this.statementId = statementId; + this.compressionCodec = resultManifest.getResultCompression(); + this.currentChunkIndex = -1; + this.isClosed = false; + this.totalRowCount = 0; + + // Process initial chunk from the execute response + this.currentChunk = processResultData(initialResultData); + this.hasMore = initialResultData.getNextChunkIndex() != null; + if (hasMore) { + this.nextChunkIndex = initialResultData.getNextChunkIndex(); + } + long rowCount = initialResultData.getRowCount() != null ? initialResultData.getRowCount() : 0; + long rowOffset = + initialResultData.getRowOffset() != null ? initialResultData.getRowOffset() : 0; + this.nextRowOffset = rowOffset + rowCount; + this.totalRowCount += rowCount; + + LOGGER.debug( + "SeaInlineArrowChunkProvider created for statement {}: hasMore={}, nextChunkIndex={}, nextRowOffset={}", + statementId.toSQLExecStatementId(), + hasMore, + nextChunkIndex, + nextRowOffset); + } + + @Override + public boolean hasNextChunk() { + // First call: initial chunk not yet consumed + if (currentChunkIndex == -1) { + return true; + } + return hasMore; + } + + @Override + public boolean next() throws DatabricksSQLException { + if (currentChunkIndex == -1) { + // First call — return the initial chunk (already loaded) + currentChunkIndex = 0; + return true; + } + + if (!hasMore) { + return false; + } + + // Fetch next chunk via GetResultData + try { + DatabricksSdkClient client = (DatabricksSdkClient) session.getDatabricksClient(); + ResultData resultData = + client.getResultChunksData(statementId, nextChunkIndex, nextRowOffset); + + // Release previous chunk + if (currentChunk != null) { + currentChunk.releaseChunk(); + } + + currentChunk = processResultData(resultData); + currentChunkIndex = nextChunkIndex; + + // Update continuation from response + hasMore = resultData.getNextChunkIndex() != null; + if (hasMore) { + nextChunkIndex = resultData.getNextChunkIndex(); + } + long rowCount = resultData.getRowCount() != null ? resultData.getRowCount() : 0; + nextRowOffset += rowCount; + totalRowCount += rowCount; + + LOGGER.debug( + "Fetched inline chunk {}: rowCount={}, hasMore={}, nextRowOffset={}", + currentChunkIndex, + rowCount, + hasMore, + nextRowOffset); + + return true; + } catch (Exception e) { + throw new DatabricksSQLException( + "Failed to fetch inline Arrow chunk " + nextChunkIndex + ": " + e.getMessage(), + e, + com.databricks.jdbc.model.telemetry.enums.DatabricksDriverErrorCode.SDK_CLIENT_ERROR); + } + } + + @Override + public ArrowResultChunk getChunk() { + return currentChunk; + } + + @Override + public void close() { + isClosed = true; + if (currentChunk != null) { + currentChunk.releaseChunk(); + currentChunk = null; + } + } + + @Override + public long getRowCount() { + return totalRowCount; + } + + @Override + public long getChunkCount() { + return currentChunkIndex + 1; + } + + @Override + public boolean isClosed() { + return isClosed; + } + + /** Decompresses attachment bytes and creates an ArrowResultChunk. */ + private ArrowResultChunk processResultData(ResultData resultData) throws DatabricksSQLException { + byte[] attachment = resultData.getAttachment(); + if (attachment == null || attachment.length == 0) { + throw new DatabricksSQLException( + "No inline Arrow data (attachment) in result for chunk", + com.databricks.jdbc.model.telemetry.enums.DatabricksDriverErrorCode.RESULT_SET_ERROR); + } + + byte[] decompressedBytes = + decompress(attachment, compressionCodec, "SEA inline Arrow chunk decompression"); + + long rowCount = resultData.getRowCount() != null ? resultData.getRowCount() : 0; + return ArrowResultChunk.builder() + .withInputStream(new ByteArrayInputStream(decompressedBytes), rowCount) + .build(); + } +} diff --git a/src/main/java/com/databricks/jdbc/dbclient/impl/sqlexec/DatabricksSdkClient.java b/src/main/java/com/databricks/jdbc/dbclient/impl/sqlexec/DatabricksSdkClient.java index b714f2a385..0a1a2510d5 100644 --- a/src/main/java/com/databricks/jdbc/dbclient/impl/sqlexec/DatabricksSdkClient.java +++ b/src/main/java/com/databricks/jdbc/dbclient/impl/sqlexec/DatabricksSdkClient.java @@ -123,6 +123,15 @@ public ImmutableSessionInfo createSession( if (schema != null) { request.setSchema(schema); } + // Bounded SEA inline Arrow: tell server not to use cloud storage for results + if (connectionContext.isBoundedSeaApiEnabled() && !connectionContext.isCloudFetchEnabled()) { + if (sessionConf == null) { + sessionConf = new java.util.HashMap<>(); + } else { + sessionConf = new java.util.HashMap<>(sessionConf); + } + sessionConf.put("can_cloud_download", "false"); + } if (sessionConf != null && !sessionConf.isEmpty()) { request.setSessionConfigs(sessionConf); } @@ -585,15 +594,27 @@ private ChunkLinkFetchResult buildChunkLinkFetchResult(Collection @Override public ResultData getResultChunksData(StatementId typedStatementId, long chunkIndex) throws DatabricksSQLException { + return getResultChunksData(typedStatementId, chunkIndex, 0); + } + + /** + * Fetches inline result data for a specific chunk, with row_offset for bounded SEA inline Arrow. + */ + public ResultData getResultChunksData( + StatementId typedStatementId, long chunkIndex, long rowOffset) throws DatabricksSQLException { DatabricksThreadContextHolder.setStatementId(typedStatementId); String statementId = typedStatementId.toSQLExecStatementId(); LOGGER.debug( - "public ResultData getResultChunksData(String statementId = {}, long chunkIndex = {})", + "getResultChunksData(statementId={}, chunkIndex={}, rowOffset={})", statementId, - chunkIndex); + chunkIndex, + rowOffset); GetStatementResultChunkNRequest request = new GetStatementResultChunkNRequest().setStatementId(statementId).setChunkIndex(chunkIndex); String path = String.format(RESULT_CHUNK_PATH, statementId, chunkIndex); + if (connectionContext.isBoundedSeaApiEnabled() && rowOffset > 0) { + path = path + "?row_offset=" + rowOffset; + } try { Request req = new Request(Request.GET, path, apiClient.serialize(request)); req.withHeaders(getHeaders("getStatementResultN")); @@ -717,17 +738,26 @@ private ExecuteStatementRequest getRequest( boolean executeAsync) throws SQLException { boolean cloudFetch = useCloudFetchForResult(); - Format format = cloudFetch ? Format.ARROW_STREAM : Format.JSON_ARRAY; + // Bounded SEA inline Arrow: use ARROW_STREAM + INLINE_OR_EXTERNAL_LINKS even without CloudFetch + boolean boundedSeaInline = connectionContext.isBoundedSeaApiEnabled() && !cloudFetch; + Format format = (cloudFetch || boundedSeaInline) ? Format.ARROW_STREAM : Format.JSON_ARRAY; Disposition defaultDisposition = connectionContext.isSqlExecHybridResultsEnabled() ? Disposition.INLINE_OR_EXTERNAL_LINKS : Disposition.EXTERNAL_LINKS; - Disposition disposition = cloudFetch ? defaultDisposition : Disposition.INLINE; + Disposition disposition; + if (cloudFetch) { + disposition = defaultDisposition; + } else if (boundedSeaInline) { + disposition = Disposition.INLINE_OR_EXTERNAL_LINKS; + } else { + disposition = Disposition.INLINE; + } long maxRows = (parentStatement == null) ? DEFAULT_RESULT_ROW_LIMIT : parentStatement.getMaxRows(); CompressionCodec compressionCodec = session.getCompressionCodec(); - if (disposition.equals(Disposition.INLINE)) { - LOGGER.debug("Results are inline, skipping compression."); + if (disposition.equals(Disposition.INLINE) && !boundedSeaInline) { + LOGGER.debug("Results are inline JSON, skipping compression."); compressionCodec = CompressionCodec.NONE; } List parameterListItems = From 565a0e94031ed51c6a41e723b8ce93bca775f1d8 Mon Sep 17 00:00:00 2001 From: Gopal Lal Date: Wed, 27 May 2026 12:40:22 +0530 Subject: [PATCH 09/10] Rewrite SeaInlineArrowChunkProvider with rolling-window prefetch mirroring ThriftStreamingProvider Replace synchronous lazy-fetch design with background prefetch thread using ReentrantLock + Condition signaling, ConcurrentHashMap for indexed chunk storage, and explicit chunksInMemory tracking for backpressure. This ensures consistency with ThriftStreamingProvider's design patterns. Signed-off-by: Gopal Lal Co-authored-by: Isaac Signed-off-by: Gopal Lal --- .../arrow/SeaInlineArrowChunkProvider.java | 416 ++++++++++++++---- 1 file changed, 332 insertions(+), 84 deletions(-) diff --git a/src/main/java/com/databricks/jdbc/api/impl/arrow/SeaInlineArrowChunkProvider.java b/src/main/java/com/databricks/jdbc/api/impl/arrow/SeaInlineArrowChunkProvider.java index 3823b7f0d5..619f2b3ed3 100644 --- a/src/main/java/com/databricks/jdbc/api/impl/arrow/SeaInlineArrowChunkProvider.java +++ b/src/main/java/com/databricks/jdbc/api/impl/arrow/SeaInlineArrowChunkProvider.java @@ -2,6 +2,7 @@ import static com.databricks.jdbc.common.util.DecompressionUtil.decompress; +import com.databricks.jdbc.api.internal.IDatabricksConnectionContext; import com.databricks.jdbc.api.internal.IDatabricksSession; import com.databricks.jdbc.common.CompressionCodec; import com.databricks.jdbc.dbclient.impl.common.StatementId; @@ -11,38 +12,63 @@ import com.databricks.jdbc.log.JdbcLoggerFactory; import com.databricks.jdbc.model.core.ResultData; import com.databricks.jdbc.model.core.ResultManifest; +import com.databricks.jdbc.model.telemetry.enums.DatabricksDriverErrorCode; import java.io.ByteArrayInputStream; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; /** - * ChunkProvider for SEA inline Arrow results (bounded SEA API). Fetches chunks lazily via - * GetResultData with row_offset, similar to Thrift's LazyThriftInlineArrowResult but using SEA's - * ArrowIPC format in the attachment field. + * ChunkProvider for SEA inline Arrow results with rolling-window prefetch. Fetches chunks via + * GetResultData with row_offset, using a background thread to prefetch ahead of the consumer. * - *

Used when UseBoundedSeaApi=1 AND EnableQueryResultDownload=0 (cloud fetch disabled). + *

Design mirrors {@link com.databricks.jdbc.api.impl.streaming.ThriftStreamingProvider + * ThriftStreamingProvider} for consistency: uses Lock + Condition for signaling, ConcurrentHashMap + * for indexed batch storage, and explicit batchesInMemory tracking for backpressure. + * + *

Used when UseBoundedSeaApi=1 AND EnableQueryResultDownload=0. */ class SeaInlineArrowChunkProvider implements ChunkProvider { private static final JdbcLogger LOGGER = JdbcLoggerFactory.getLogger(SeaInlineArrowChunkProvider.class); + private static final String PREFETCH_THREAD_NAME = "sea-inline-prefetcher"; + + // Configuration + private final int maxBatchesInMemory; + private final int chunkReadyTimeoutSeconds; + // Dependencies private final IDatabricksSession session; private final StatementId statementId; private final CompressionCodec compressionCodec; - private ArrowResultChunk currentChunk; - private long currentChunkIndex; - private long nextChunkIndex; - private long nextRowOffset; - private boolean hasMore; - private long totalRowCount; - private boolean isClosed; + // Indexed chunk storage (mirrors ThriftStreamingProvider's ConcurrentHashMap) + private final ConcurrentMap chunks = new ConcurrentHashMap<>(); + + // Position tracking + private final AtomicLong currentChunkIndex = new AtomicLong(-1); + private final AtomicLong highestFetchedChunkIndex = new AtomicLong(-1); + private final AtomicLong nextFetchChunkIndex = new AtomicLong(1); // 0 is initial chunk + private final AtomicLong nextFetchRowOffset = new AtomicLong(0); + private final AtomicLong totalRowCount = new AtomicLong(0); + + // State and synchronization (mirrors ThriftStreamingProvider's lock + conditions) + private volatile boolean endOfStream = false; + private volatile boolean closed = false; + private volatile DatabricksSQLException prefetchError = null; + private final ReentrantLock prefetchLock = new ReentrantLock(); + private final Condition chunkAvailable = prefetchLock.newCondition(); + private final Condition consumerAdvanced = prefetchLock.newCondition(); + private final AtomicInteger chunksInMemory = new AtomicInteger(0); + + // Prefetch thread + private final Thread prefetchThread; - /** - * @param initialResultData The first chunk from the ExecuteStatement response - * @param resultManifest Manifest with compression and schema info - * @param statementId Statement ID for subsequent fetch calls - * @param session Session for making GetResultData calls - */ SeaInlineArrowChunkProvider( ResultData initialResultData, ResultManifest resultManifest, @@ -52,126 +78,348 @@ class SeaInlineArrowChunkProvider implements ChunkProvider { this.session = session; this.statementId = statementId; this.compressionCodec = resultManifest.getResultCompression(); - this.currentChunkIndex = -1; - this.isClosed = false; - this.totalRowCount = 0; - // Process initial chunk from the execute response - this.currentChunk = processResultData(initialResultData); - this.hasMore = initialResultData.getNextChunkIndex() != null; - if (hasMore) { - this.nextChunkIndex = initialResultData.getNextChunkIndex(); + IDatabricksConnectionContext ctx = session.getConnectionContext(); + int configuredMax = ctx.getThriftMaxBatchesInMemory(); + // Need at least 2 to enable any prefetching (same as ThriftStreamingProvider) + if (configuredMax < 2) { + LOGGER.warn( + "Configured maxBatchesInMemory={} is less than the minimum of 2; using 2 instead.", + configuredMax); } + this.maxBatchesInMemory = Math.max(2, configuredMax); + this.chunkReadyTimeoutSeconds = ctx.getChunkReadyTimeoutSeconds(); + + // Process initial chunk + ArrowResultChunk firstChunk = processResultData(initialResultData); long rowCount = initialResultData.getRowCount() != null ? initialResultData.getRowCount() : 0; long rowOffset = initialResultData.getRowOffset() != null ? initialResultData.getRowOffset() : 0; - this.nextRowOffset = rowOffset + rowCount; - this.totalRowCount += rowCount; + totalRowCount.addAndGet(rowCount); + nextFetchRowOffset.set(rowOffset + rowCount); + + // Store initial chunk + chunks.put(0L, firstChunk); + highestFetchedChunkIndex.set(0); + chunksInMemory.incrementAndGet(); + + // Determine if there are more chunks + if (initialResultData.getNextChunkIndex() != null) { + nextFetchChunkIndex.set(initialResultData.getNextChunkIndex()); + } else { + endOfStream = true; + } + + // Start prefetch thread (mirrors ThriftStreamingProvider) + this.prefetchThread = new Thread(this::prefetchLoop, PREFETCH_THREAD_NAME); + this.prefetchThread.setDaemon(true); + this.prefetchThread.start(); + + notifyConsumerAdvanced(); LOGGER.debug( - "SeaInlineArrowChunkProvider created for statement {}: hasMore={}, nextChunkIndex={}, nextRowOffset={}", + "SeaInlineArrowChunkProvider created: statement={}, maxBatches={}, endOfStream={}", statementId.toSQLExecStatementId(), - hasMore, - nextChunkIndex, - nextRowOffset); + maxBatchesInMemory, + endOfStream); } @Override public boolean hasNextChunk() { - // First call: initial chunk not yet consumed - if (currentChunkIndex == -1) { - return true; - } - return hasMore; + if (closed) return false; + if (!endOfStream) return true; + return currentChunkIndex.get() < highestFetchedChunkIndex.get(); } @Override public boolean next() throws DatabricksSQLException { - if (currentChunkIndex == -1) { - // First call — return the initial chunk (already loaded) - currentChunkIndex = 0; - return true; + if (closed) return false; + + checkPrefetchError(); + + // Release previous chunk + long prevIndex = currentChunkIndex.get(); + if (prevIndex >= 0) { + releaseChunk(prevIndex); } - if (!hasMore) { + if (!hasNextChunk()) { return false; } - // Fetch next chunk via GetResultData - try { - DatabricksSdkClient client = (DatabricksSdkClient) session.getDatabricksClient(); - ResultData resultData = - client.getResultChunksData(statementId, nextChunkIndex, nextRowOffset); - - // Release previous chunk - if (currentChunk != null) { - currentChunk.releaseChunk(); - } + long nextIndex = currentChunkIndex.incrementAndGet(); + notifyConsumerAdvanced(); - currentChunk = processResultData(resultData); - currentChunkIndex = nextChunkIndex; + // Wait for the chunk to be available (mirrors ThriftStreamingProvider.getCurrentBatch) + ArrowResultChunk chunk = chunks.get(nextIndex); + if (chunk == null) { + LOGGER.debug("Chunk {} not yet available, waiting for prefetch", nextIndex); + waitForChunkCreation(nextIndex); + chunk = chunks.get(nextIndex); + } - // Update continuation from response - hasMore = resultData.getNextChunkIndex() != null; - if (hasMore) { - nextChunkIndex = resultData.getNextChunkIndex(); - } - long rowCount = resultData.getRowCount() != null ? resultData.getRowCount() : 0; - nextRowOffset += rowCount; - totalRowCount += rowCount; - - LOGGER.debug( - "Fetched inline chunk {}: rowCount={}, hasMore={}, nextRowOffset={}", - currentChunkIndex, - rowCount, - hasMore, - nextRowOffset); - - return true; - } catch (Exception e) { + if (chunk == null) { + LOGGER.error("Chunk {} not found after waiting", nextIndex); throw new DatabricksSQLException( - "Failed to fetch inline Arrow chunk " + nextChunkIndex + ": " + e.getMessage(), - e, - com.databricks.jdbc.model.telemetry.enums.DatabricksDriverErrorCode.SDK_CLIENT_ERROR); + "Chunk " + nextIndex + " not found after waiting", + DatabricksDriverErrorCode.CHUNK_READY_ERROR); } + + return true; } @Override public ArrowResultChunk getChunk() { - return currentChunk; + long idx = currentChunkIndex.get(); + if (idx < 0) return null; + return chunks.get(idx); } @Override public void close() { - isClosed = true; - if (currentChunk != null) { - currentChunk.releaseChunk(); - currentChunk = null; + if (closed) return; + + LOGGER.debug("Closing SeaInlineArrowChunkProvider, total rows: {}", totalRowCount.get()); + closed = true; + + notifyConsumerAdvanced(); + notifyChunkAvailable(); + + // Interrupt and wait for prefetch thread (mirrors ThriftStreamingProvider) + prefetchThread.interrupt(); + try { + prefetchThread.join(5000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOGGER.debug("Interrupted while waiting for prefetch thread to terminate"); } + + // Release all chunks + for (ArrowResultChunk chunk : chunks.values()) { + try { + chunk.releaseChunk(); + } catch (Exception e) { + LOGGER.warn("Error releasing chunk during close: {}", e.getMessage(), e); + } + } + chunks.clear(); } @Override public long getRowCount() { - return totalRowCount; + return totalRowCount.get(); } @Override public long getChunkCount() { - return currentChunkIndex + 1; + return currentChunkIndex.get() + 1; } @Override public boolean isClosed() { - return isClosed; + return closed; + } + + // ==================== Prefetch Logic ==================== + + /** Background prefetch loop — mirrors ThriftStreamingProvider.prefetchLoop(). */ + private void prefetchLoop() { + LOGGER.debug("Prefetch thread started"); + + while (!closed && !Thread.currentThread().isInterrupted()) { + try { + // Wait until queue has room (backpressure from slow consumer) + prefetchLock.lock(); + try { + while (!closed && !endOfStream && chunksInMemory.get() >= maxBatchesInMemory) { + LOGGER.debug( + "Prefetch waiting: chunks={}/{}", chunksInMemory.get(), maxBatchesInMemory); + consumerAdvanced.await(); + } + } finally { + prefetchLock.unlock(); + } + + if (closed || endOfStream) break; + + fetchNextChunkInternal(); + + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOGGER.debug("Prefetch thread interrupted"); + break; + } catch (DatabricksSQLException e) { + LOGGER.error("Prefetch error: {}", e.getMessage()); + prefetchError = e; + notifyChunkAvailable(); + break; + } catch (Exception e) { + LOGGER.error("Unexpected prefetch error: {}", e.getMessage(), e); + prefetchError = + new DatabricksSQLException( + "Unexpected prefetch error: " + e.getMessage(), + e, + DatabricksDriverErrorCode.SDK_CLIENT_ERROR); + notifyChunkAvailable(); + break; + } + } + + LOGGER.debug("Prefetch thread exiting"); } + /** + * Fetches a single chunk from the server — mirrors + * ThriftStreamingProvider.fetchNextBatchInternal(). + */ + private void fetchNextChunkInternal() throws DatabricksSQLException { + long chunkIndex = nextFetchChunkIndex.get(); + long rowOffset = nextFetchRowOffset.get(); + + LOGGER.debug("Fetching inline chunk {} at offset {}", chunkIndex, rowOffset); + + DatabricksSdkClient client = (DatabricksSdkClient) session.getDatabricksClient(); + ResultData resultData = client.getResultChunksData(statementId, chunkIndex, rowOffset); + + ArrowResultChunk chunk = processResultData(resultData); + long rowCount = resultData.getRowCount() != null ? resultData.getRowCount() : 0; + + // Store chunk and update state + chunks.put(chunkIndex, chunk); + chunksInMemory.incrementAndGet(); + highestFetchedChunkIndex.updateAndGet(cur -> Math.max(cur, chunkIndex)); + totalRowCount.addAndGet(rowCount); + nextFetchRowOffset.addAndGet(rowCount); + + LOGGER.debug( + "Chunk {} ready: rowCount={}, hasMore={}", + chunkIndex, + rowCount, + resultData.getNextChunkIndex() != null); + + // Update continuation + if (resultData.getNextChunkIndex() != null) { + nextFetchChunkIndex.set(resultData.getNextChunkIndex()); + } else { + endOfStream = true; + LOGGER.debug("End of stream at chunk {}", chunkIndex); + } + + notifyChunkAvailable(); + } + + // ==================== Resource Management ==================== + + private void releaseChunk(long chunkIndex) { + ArrowResultChunk chunk = chunks.remove(chunkIndex); + if (chunk != null) { + // Decrement counter BEFORE release to prevent prefetch stall if release throws + chunksInMemory.decrementAndGet(); + try { + chunk.releaseChunk(); + } catch (Exception e) { + LOGGER.warn("Error releasing chunk {}: {}", chunkIndex, e.getMessage(), e); + } + LOGGER.debug("Released chunk {}, chunks in memory: {}", chunkIndex, chunksInMemory.get()); + notifyConsumerAdvanced(); + } + } + + /** + * Waits for a chunk to be created by the prefetch thread. Mirrors + * ThriftStreamingProvider.waitForBatchCreation(). + */ + private void waitForChunkCreation(long chunkIndex) throws DatabricksSQLException { + prefetchLock.lock(); + try { + long waitStartTime = System.currentTimeMillis(); + long timeoutMillis = chunkReadyTimeoutSeconds * 1000L; + + while (!closed && !chunks.containsKey(chunkIndex)) { + checkPrefetchError(); + if (endOfStream && chunkIndex > highestFetchedChunkIndex.get()) { + LOGGER.error( + "Chunk {} does not exist (highest fetched: {})", + chunkIndex, + highestFetchedChunkIndex.get()); + throw new DatabricksSQLException( + "Chunk " + + chunkIndex + + " does not exist (highest: " + + highestFetchedChunkIndex.get() + + ")", + DatabricksDriverErrorCode.CHUNK_READY_ERROR); + } + + long elapsedMillis = System.currentTimeMillis() - waitStartTime; + if (elapsedMillis >= timeoutMillis) { + LOGGER.error( + "Timeout waiting for chunk {} to be created (timeout: {}s)", + chunkIndex, + chunkReadyTimeoutSeconds); + throw new DatabricksSQLException( + "Timeout waiting for chunk " + + chunkIndex + + " to be created (timeout: " + + chunkReadyTimeoutSeconds + + "s)", + DatabricksDriverErrorCode.CHUNK_READY_ERROR); + } + + try { + long remainingMillis = timeoutMillis - elapsedMillis; + chunkAvailable.await(remainingMillis, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOGGER.warn("Interrupted waiting for chunk {} creation", chunkIndex); + throw new DatabricksSQLException( + "Interrupted waiting for chunk", + e, + DatabricksDriverErrorCode.THREAD_INTERRUPTED_ERROR); + } + } + } finally { + prefetchLock.unlock(); + } + } + + private void checkPrefetchError() throws DatabricksSQLException { + if (prefetchError != null) { + LOGGER.error("Prefetch failed: {}", prefetchError.getMessage(), prefetchError); + throw new DatabricksSQLException( + "Prefetch failed: " + prefetchError.getMessage(), + prefetchError, + DatabricksDriverErrorCode.CHUNK_READY_ERROR); + } + } + + private void notifyConsumerAdvanced() { + prefetchLock.lock(); + try { + consumerAdvanced.signalAll(); + } finally { + prefetchLock.unlock(); + } + } + + private void notifyChunkAvailable() { + prefetchLock.lock(); + try { + chunkAvailable.signalAll(); + } finally { + prefetchLock.unlock(); + } + } + + // ==================== Data Processing ==================== + /** Decompresses attachment bytes and creates an ArrowResultChunk. */ private ArrowResultChunk processResultData(ResultData resultData) throws DatabricksSQLException { byte[] attachment = resultData.getAttachment(); if (attachment == null || attachment.length == 0) { throw new DatabricksSQLException( - "No inline Arrow data (attachment) in result for chunk", - com.databricks.jdbc.model.telemetry.enums.DatabricksDriverErrorCode.RESULT_SET_ERROR); + "No inline Arrow data (attachment) in result", + DatabricksDriverErrorCode.RESULT_SET_ERROR); } byte[] decompressedBytes = From 70dfef82d8a39f8fd07ab1ccc8af1feaf3a29b05 Mon Sep 17 00:00:00 2001 From: Gopal Lal Date: Wed, 27 May 2026 14:26:19 +0530 Subject: [PATCH 10/10] Add single-writer invariant comment for fetch position fields Document why nextFetchChunkIndex and nextFetchRowOffset are safe as separate AtomicLongs (single prefetch thread writer) unlike StreamingChunkProvider which needs a bundled FetchPosition holder. Signed-off-by: Gopal Lal Co-authored-by: Isaac Signed-off-by: Gopal Lal --- .../jdbc/api/impl/arrow/SeaInlineArrowChunkProvider.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/main/java/com/databricks/jdbc/api/impl/arrow/SeaInlineArrowChunkProvider.java b/src/main/java/com/databricks/jdbc/api/impl/arrow/SeaInlineArrowChunkProvider.java index 619f2b3ed3..dfc5bb3524 100644 --- a/src/main/java/com/databricks/jdbc/api/impl/arrow/SeaInlineArrowChunkProvider.java +++ b/src/main/java/com/databricks/jdbc/api/impl/arrow/SeaInlineArrowChunkProvider.java @@ -53,6 +53,10 @@ class SeaInlineArrowChunkProvider implements ChunkProvider { // Position tracking private final AtomicLong currentChunkIndex = new AtomicLong(-1); private final AtomicLong highestFetchedChunkIndex = new AtomicLong(-1); + // Single-writer: nextFetchChunkIndex and nextFetchRowOffset are only written by + // the prefetch thread (fetchNextChunkInternal), so separate AtomicLongs are safe. + // Unlike StreamingChunkProvider where download threads can also update the position + // (requiring a bundled FetchPosition holder), this provider has no second writer. private final AtomicLong nextFetchChunkIndex = new AtomicLong(1); // 0 is initial chunk private final AtomicLong nextFetchRowOffset = new AtomicLong(0); private final AtomicLong totalRowCount = new AtomicLong(0);