diff --git a/NEXT_CHANGELOG.md b/NEXT_CHANGELOG.md
index d27a25e26..708fbe50c 100644
--- a/NEXT_CHANGELOG.md
+++ b/NEXT_CHANGELOG.md
@@ -3,6 +3,11 @@
## [Unreleased]
### Added
+- Added result set heartbeat / keep-alive to prevent server-side result expiry during slow consumption. When enabled via `EnableHeartbeat=1`, the driver periodically polls `GetStatementStatus` (SEA) or `GetOperationStatus` (Thrift) to keep the operation alive while the client reads results. Configurable interval via `HeartbeatIntervalSeconds` (default 60s). Heartbeat automatically stops when results are fully consumed, ResultSet is closed, or the server returns a terminal state. Disabled by default due to cost implications (heartbeats keep the warehouse running).
+- Metadata operations now use SQL SHOW commands for both Thrift and SEA backends,
+ ensuring consistent behavior for SQL warehouses regardless of underlying
+ protocol. To revert to native Thrift metadata RPCs, set `UseQueryForMetadata=0`.
+- Added `UseBoundedSeaApi` connection property (default `0`/off). When enabled, the driver uses the bounded SEA API contract for CloudFetch: sends `row_offset` on GetResultData requests and uses `next_chunk_index` for chunk discovery instead of `total_chunk_count`. Requires server support.
### Updated
diff --git a/src/main/java/com/databricks/jdbc/api/impl/DatabricksConnectionContext.java b/src/main/java/com/databricks/jdbc/api/impl/DatabricksConnectionContext.java
index 956695ab5..6a793a123 100644
--- a/src/main/java/com/databricks/jdbc/api/impl/DatabricksConnectionContext.java
+++ b/src/main/java/com/databricks/jdbc/api/impl/DatabricksConnectionContext.java
@@ -1440,6 +1440,11 @@ public boolean isCloudFetchEnabled() {
return getParameter(DatabricksJdbcUrlParams.ENABLE_CLOUD_FETCH).equals("1");
}
+ @Override
+ public boolean isBoundedSeaApiEnabled() {
+ return getParameter(DatabricksJdbcUrlParams.USE_BOUNDED_SEA_API).equals("1");
+ }
+
@Override
public int getThriftMaxBatchesInMemory() {
try {
diff --git a/src/main/java/com/databricks/jdbc/api/impl/DatabricksResultSet.java b/src/main/java/com/databricks/jdbc/api/impl/DatabricksResultSet.java
index 540699407..5461d77d9 100644
--- a/src/main/java/com/databricks/jdbc/api/impl/DatabricksResultSet.java
+++ b/src/main/java/com/databricks/jdbc/api/impl/DatabricksResultSet.java
@@ -77,6 +77,7 @@ enum ResultSetType {
private ResultSetType resultSetType = ResultSetType.UNASSIGNED;
private boolean complexDatatypeSupport = false;
+ private boolean boundedSeaApiEnabled = false;
// Cached telemetry collector resolved once at construction time to avoid
// per-row overhead in next(). The connection-to-collector mapping is stable
@@ -128,6 +129,7 @@ public DatabricksResultSet(
resultSetMetaData = null;
}
this.complexDatatypeSupport = session.getConnectionContext().isComplexDatatypeSupportEnabled();
+ this.boundedSeaApiEnabled = session.getConnectionContext().isBoundedSeaApiEnabled();
this.statementType = statementType;
this.updateCount = null;
this.parentStatement = parentStatement;
@@ -933,8 +935,15 @@ public boolean isBeforeFirst() throws SQLException {
public boolean isAfterLast() throws SQLException {
checkIfClosed();
// Account for client-side maxRows truncation
- return truncatedByMaxRows
- || executionResult.getCurrentRow() >= resultSetMetaData.getTotalRows();
+ if (truncatedByMaxRows) {
+ return true;
+ }
+ // Bounded SEA API: manifest.total_row_count is not populated, so use hasNext()
+ // which derives end-of-stream from next_chunk_index via the chunk provider.
+ if (boundedSeaApiEnabled && executionResult instanceof ArrowStreamResult) {
+ return executionResult.getCurrentRow() >= 0 && !executionResult.hasNext();
+ }
+ return executionResult.getCurrentRow() >= resultSetMetaData.getTotalRows();
}
@Override
@@ -974,6 +983,10 @@ public boolean isLast() throws SQLException {
|| executionResult instanceof StreamingInlineArrowResult) {
return executionResult.getCurrentRow() >= 0 && !executionResult.hasNext();
}
+ // Bounded SEA API: manifest.total_row_count is not populated, so use hasNext()
+ if (boundedSeaApiEnabled && executionResult instanceof ArrowStreamResult) {
+ return executionResult.getCurrentRow() >= 0 && !executionResult.hasNext();
+ }
return executionResult.getCurrentRow() == resultSetMetaData.getTotalRows() - 1;
}
diff --git a/src/main/java/com/databricks/jdbc/api/impl/arrow/ArrowStreamResult.java b/src/main/java/com/databricks/jdbc/api/impl/arrow/ArrowStreamResult.java
index 1569b7efb..0a652766d 100644
--- a/src/main/java/com/databricks/jdbc/api/impl/arrow/ArrowStreamResult.java
+++ b/src/main/java/com/databricks/jdbc/api/impl/arrow/ArrowStreamResult.java
@@ -64,7 +64,17 @@ public ArrowStreamResult(
this.session = session;
// Check if the result data contains the arrow data inline
boolean isInlineArrow = resultData.getAttachment() != null;
- if (isInlineArrow) {
+ IDatabricksConnectionContext connectionContext = session.getConnectionContext();
+ if (isInlineArrow
+ && connectionContext.isBoundedSeaApiEnabled()
+ && !connectionContext.isCloudFetchEnabled()) {
+ // Bounded SEA inline Arrow: multi-chunk lazy fetch via GetResultData with row_offset
+ LOGGER.debug(
+ "Creating ArrowStreamResult with SEA inline Arrow for statementId: {}",
+ statementId.toSQLExecStatementId());
+ this.chunkProvider =
+ new SeaInlineArrowChunkProvider(resultData, resultManifest, statementId, session);
+ } else if (isInlineArrow) {
LOGGER.debug(
"Creating ArrowStreamResult with inline attachment for statementId: {}",
statementId.toSQLExecStatementId());
@@ -102,7 +112,9 @@ private static ChunkProvider createRemoteChunkProvider(
IDatabricksConnectionContext connectionContext = session.getConnectionContext();
- if (connectionContext.isStreamingChunkProviderEnabled()) {
+ // Bounded SEA API forces StreamingChunkProvider — it doesn't rely on total_chunk_count
+ if (connectionContext.isStreamingChunkProviderEnabled()
+ || connectionContext.isBoundedSeaApiEnabled()) {
LOGGER.info(
"Using StreamingChunkProvider for statementId: {}", statementId.toSQLExecStatementId());
@@ -113,10 +125,13 @@ private static ChunkProvider createRemoteChunkProvider(
int chunkReadyTimeoutSeconds = connectionContext.getChunkReadyTimeoutSeconds();
double cloudFetchSpeedThreshold = connectionContext.getCloudFetchSpeedThreshold();
- // Convert ExternalLinks to ChunkLinkFetchResult for the provider
+ // Convert ExternalLinks to ChunkLinkFetchResult for the provider.
+ // Bounded SEA API: pass null for totalChunkCount — we must not depend on
+ // manifest.{chunks, total_chunk_count, total_row_count} per the bounded API contract.
+ Long totalChunkCount =
+ connectionContext.isBoundedSeaApiEnabled() ? null : resultManifest.getTotalChunkCount();
ChunkLinkFetchResult initialLinks =
- convertToChunkLinkFetchResult(
- resultData.getExternalLinks(), resultManifest.getTotalChunkCount());
+ convertToChunkLinkFetchResult(resultData.getExternalLinks(), totalChunkCount);
return new StreamingChunkProvider(
linkFetcher,
diff --git a/src/main/java/com/databricks/jdbc/api/impl/arrow/SeaChunkLinkFetcher.java b/src/main/java/com/databricks/jdbc/api/impl/arrow/SeaChunkLinkFetcher.java
index 9be8dccf1..7df1586cc 100644
--- a/src/main/java/com/databricks/jdbc/api/impl/arrow/SeaChunkLinkFetcher.java
+++ b/src/main/java/com/databricks/jdbc/api/impl/arrow/SeaChunkLinkFetcher.java
@@ -32,7 +32,7 @@ public SeaChunkLinkFetcher(IDatabricksSession session, StatementId statementId)
@Override
public ChunkLinkFetchResult fetchLinks(long startChunkIndex, long startRowOffset)
throws SQLException {
- // SEA uses startChunkIndex; startRowOffset is ignored
+ // SEA uses startChunkIndex; startRowOffset is passed through for bounded SEA API
LOGGER.debug(
"Fetching links starting from chunk index {} for statement {}",
startChunkIndex,
@@ -45,7 +45,7 @@ public ChunkLinkFetchResult fetchLinks(long startChunkIndex, long startRowOffset
@Override
public ExternalLink refetchLink(long chunkIndex, long rowOffset) throws SQLException {
- // SEA uses chunkIndex; rowOffset is ignored
+ // SEA uses chunkIndex; rowOffset is passed through for bounded SEA API
LOGGER.info("Refetching expired link for chunk {} of statement {}", chunkIndex, statementId);
ChunkLinkFetchResult result =
diff --git a/src/main/java/com/databricks/jdbc/api/impl/arrow/SeaInlineArrowChunkProvider.java b/src/main/java/com/databricks/jdbc/api/impl/arrow/SeaInlineArrowChunkProvider.java
new file mode 100644
index 000000000..dfc5bb352
--- /dev/null
+++ b/src/main/java/com/databricks/jdbc/api/impl/arrow/SeaInlineArrowChunkProvider.java
@@ -0,0 +1,437 @@
+package com.databricks.jdbc.api.impl.arrow;
+
+import static com.databricks.jdbc.common.util.DecompressionUtil.decompress;
+
+import com.databricks.jdbc.api.internal.IDatabricksConnectionContext;
+import com.databricks.jdbc.api.internal.IDatabricksSession;
+import com.databricks.jdbc.common.CompressionCodec;
+import com.databricks.jdbc.dbclient.impl.common.StatementId;
+import com.databricks.jdbc.dbclient.impl.sqlexec.DatabricksSdkClient;
+import com.databricks.jdbc.exception.DatabricksSQLException;
+import com.databricks.jdbc.log.JdbcLogger;
+import com.databricks.jdbc.log.JdbcLoggerFactory;
+import com.databricks.jdbc.model.core.ResultData;
+import com.databricks.jdbc.model.core.ResultManifest;
+import com.databricks.jdbc.model.telemetry.enums.DatabricksDriverErrorCode;
+import java.io.ByteArrayInputStream;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * ChunkProvider for SEA inline Arrow results with rolling-window prefetch. Fetches chunks via
+ * GetResultData with row_offset, using a background thread to prefetch ahead of the consumer.
+ *
+ *
Design mirrors {@link com.databricks.jdbc.api.impl.streaming.ThriftStreamingProvider
+ * ThriftStreamingProvider} for consistency: uses Lock + Condition for signaling, ConcurrentHashMap
+ * for indexed batch storage, and explicit batchesInMemory tracking for backpressure.
+ *
+ *
Used when UseBoundedSeaApi=1 AND EnableQueryResultDownload=0.
+ */
+class SeaInlineArrowChunkProvider implements ChunkProvider {
+
+ private static final JdbcLogger LOGGER =
+ JdbcLoggerFactory.getLogger(SeaInlineArrowChunkProvider.class);
+ private static final String PREFETCH_THREAD_NAME = "sea-inline-prefetcher";
+
+ // Configuration
+ private final int maxBatchesInMemory;
+ private final int chunkReadyTimeoutSeconds;
+
+ // Dependencies
+ private final IDatabricksSession session;
+ private final StatementId statementId;
+ private final CompressionCodec compressionCodec;
+
+ // Indexed chunk storage (mirrors ThriftStreamingProvider's ConcurrentHashMap)
+ private final ConcurrentMap chunks = new ConcurrentHashMap<>();
+
+ // Position tracking
+ private final AtomicLong currentChunkIndex = new AtomicLong(-1);
+ private final AtomicLong highestFetchedChunkIndex = new AtomicLong(-1);
+ // Single-writer: nextFetchChunkIndex and nextFetchRowOffset are only written by
+ // the prefetch thread (fetchNextChunkInternal), so separate AtomicLongs are safe.
+ // Unlike StreamingChunkProvider where download threads can also update the position
+ // (requiring a bundled FetchPosition holder), this provider has no second writer.
+ private final AtomicLong nextFetchChunkIndex = new AtomicLong(1); // 0 is initial chunk
+ private final AtomicLong nextFetchRowOffset = new AtomicLong(0);
+ private final AtomicLong totalRowCount = new AtomicLong(0);
+
+ // State and synchronization (mirrors ThriftStreamingProvider's lock + conditions)
+ private volatile boolean endOfStream = false;
+ private volatile boolean closed = false;
+ private volatile DatabricksSQLException prefetchError = null;
+ private final ReentrantLock prefetchLock = new ReentrantLock();
+ private final Condition chunkAvailable = prefetchLock.newCondition();
+ private final Condition consumerAdvanced = prefetchLock.newCondition();
+ private final AtomicInteger chunksInMemory = new AtomicInteger(0);
+
+ // Prefetch thread
+ private final Thread prefetchThread;
+
+ SeaInlineArrowChunkProvider(
+ ResultData initialResultData,
+ ResultManifest resultManifest,
+ StatementId statementId,
+ IDatabricksSession session)
+ throws DatabricksSQLException {
+ this.session = session;
+ this.statementId = statementId;
+ this.compressionCodec = resultManifest.getResultCompression();
+
+ IDatabricksConnectionContext ctx = session.getConnectionContext();
+ int configuredMax = ctx.getThriftMaxBatchesInMemory();
+ // Need at least 2 to enable any prefetching (same as ThriftStreamingProvider)
+ if (configuredMax < 2) {
+ LOGGER.warn(
+ "Configured maxBatchesInMemory={} is less than the minimum of 2; using 2 instead.",
+ configuredMax);
+ }
+ this.maxBatchesInMemory = Math.max(2, configuredMax);
+ this.chunkReadyTimeoutSeconds = ctx.getChunkReadyTimeoutSeconds();
+
+ // Process initial chunk
+ ArrowResultChunk firstChunk = processResultData(initialResultData);
+ long rowCount = initialResultData.getRowCount() != null ? initialResultData.getRowCount() : 0;
+ long rowOffset =
+ initialResultData.getRowOffset() != null ? initialResultData.getRowOffset() : 0;
+ totalRowCount.addAndGet(rowCount);
+ nextFetchRowOffset.set(rowOffset + rowCount);
+
+ // Store initial chunk
+ chunks.put(0L, firstChunk);
+ highestFetchedChunkIndex.set(0);
+ chunksInMemory.incrementAndGet();
+
+ // Determine if there are more chunks
+ if (initialResultData.getNextChunkIndex() != null) {
+ nextFetchChunkIndex.set(initialResultData.getNextChunkIndex());
+ } else {
+ endOfStream = true;
+ }
+
+ // Start prefetch thread (mirrors ThriftStreamingProvider)
+ this.prefetchThread = new Thread(this::prefetchLoop, PREFETCH_THREAD_NAME);
+ this.prefetchThread.setDaemon(true);
+ this.prefetchThread.start();
+
+ notifyConsumerAdvanced();
+
+ LOGGER.debug(
+ "SeaInlineArrowChunkProvider created: statement={}, maxBatches={}, endOfStream={}",
+ statementId.toSQLExecStatementId(),
+ maxBatchesInMemory,
+ endOfStream);
+ }
+
+ @Override
+ public boolean hasNextChunk() {
+ if (closed) return false;
+ if (!endOfStream) return true;
+ return currentChunkIndex.get() < highestFetchedChunkIndex.get();
+ }
+
+ @Override
+ public boolean next() throws DatabricksSQLException {
+ if (closed) return false;
+
+ checkPrefetchError();
+
+ // Release previous chunk
+ long prevIndex = currentChunkIndex.get();
+ if (prevIndex >= 0) {
+ releaseChunk(prevIndex);
+ }
+
+ if (!hasNextChunk()) {
+ return false;
+ }
+
+ long nextIndex = currentChunkIndex.incrementAndGet();
+ notifyConsumerAdvanced();
+
+ // Wait for the chunk to be available (mirrors ThriftStreamingProvider.getCurrentBatch)
+ ArrowResultChunk chunk = chunks.get(nextIndex);
+ if (chunk == null) {
+ LOGGER.debug("Chunk {} not yet available, waiting for prefetch", nextIndex);
+ waitForChunkCreation(nextIndex);
+ chunk = chunks.get(nextIndex);
+ }
+
+ if (chunk == null) {
+ LOGGER.error("Chunk {} not found after waiting", nextIndex);
+ throw new DatabricksSQLException(
+ "Chunk " + nextIndex + " not found after waiting",
+ DatabricksDriverErrorCode.CHUNK_READY_ERROR);
+ }
+
+ return true;
+ }
+
+ @Override
+ public ArrowResultChunk getChunk() {
+ long idx = currentChunkIndex.get();
+ if (idx < 0) return null;
+ return chunks.get(idx);
+ }
+
+ @Override
+ public void close() {
+ if (closed) return;
+
+ LOGGER.debug("Closing SeaInlineArrowChunkProvider, total rows: {}", totalRowCount.get());
+ closed = true;
+
+ notifyConsumerAdvanced();
+ notifyChunkAvailable();
+
+ // Interrupt and wait for prefetch thread (mirrors ThriftStreamingProvider)
+ prefetchThread.interrupt();
+ try {
+ prefetchThread.join(5000);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOGGER.debug("Interrupted while waiting for prefetch thread to terminate");
+ }
+
+ // Release all chunks
+ for (ArrowResultChunk chunk : chunks.values()) {
+ try {
+ chunk.releaseChunk();
+ } catch (Exception e) {
+ LOGGER.warn("Error releasing chunk during close: {}", e.getMessage(), e);
+ }
+ }
+ chunks.clear();
+ }
+
+ @Override
+ public long getRowCount() {
+ return totalRowCount.get();
+ }
+
+ @Override
+ public long getChunkCount() {
+ return currentChunkIndex.get() + 1;
+ }
+
+ @Override
+ public boolean isClosed() {
+ return closed;
+ }
+
+ // ==================== Prefetch Logic ====================
+
+ /** Background prefetch loop — mirrors ThriftStreamingProvider.prefetchLoop(). */
+ private void prefetchLoop() {
+ LOGGER.debug("Prefetch thread started");
+
+ while (!closed && !Thread.currentThread().isInterrupted()) {
+ try {
+ // Wait until queue has room (backpressure from slow consumer)
+ prefetchLock.lock();
+ try {
+ while (!closed && !endOfStream && chunksInMemory.get() >= maxBatchesInMemory) {
+ LOGGER.debug(
+ "Prefetch waiting: chunks={}/{}", chunksInMemory.get(), maxBatchesInMemory);
+ consumerAdvanced.await();
+ }
+ } finally {
+ prefetchLock.unlock();
+ }
+
+ if (closed || endOfStream) break;
+
+ fetchNextChunkInternal();
+
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOGGER.debug("Prefetch thread interrupted");
+ break;
+ } catch (DatabricksSQLException e) {
+ LOGGER.error("Prefetch error: {}", e.getMessage());
+ prefetchError = e;
+ notifyChunkAvailable();
+ break;
+ } catch (Exception e) {
+ LOGGER.error("Unexpected prefetch error: {}", e.getMessage(), e);
+ prefetchError =
+ new DatabricksSQLException(
+ "Unexpected prefetch error: " + e.getMessage(),
+ e,
+ DatabricksDriverErrorCode.SDK_CLIENT_ERROR);
+ notifyChunkAvailable();
+ break;
+ }
+ }
+
+ LOGGER.debug("Prefetch thread exiting");
+ }
+
+ /**
+ * Fetches a single chunk from the server — mirrors
+ * ThriftStreamingProvider.fetchNextBatchInternal().
+ */
+ private void fetchNextChunkInternal() throws DatabricksSQLException {
+ long chunkIndex = nextFetchChunkIndex.get();
+ long rowOffset = nextFetchRowOffset.get();
+
+ LOGGER.debug("Fetching inline chunk {} at offset {}", chunkIndex, rowOffset);
+
+ DatabricksSdkClient client = (DatabricksSdkClient) session.getDatabricksClient();
+ ResultData resultData = client.getResultChunksData(statementId, chunkIndex, rowOffset);
+
+ ArrowResultChunk chunk = processResultData(resultData);
+ long rowCount = resultData.getRowCount() != null ? resultData.getRowCount() : 0;
+
+ // Store chunk and update state
+ chunks.put(chunkIndex, chunk);
+ chunksInMemory.incrementAndGet();
+ highestFetchedChunkIndex.updateAndGet(cur -> Math.max(cur, chunkIndex));
+ totalRowCount.addAndGet(rowCount);
+ nextFetchRowOffset.addAndGet(rowCount);
+
+ LOGGER.debug(
+ "Chunk {} ready: rowCount={}, hasMore={}",
+ chunkIndex,
+ rowCount,
+ resultData.getNextChunkIndex() != null);
+
+ // Update continuation
+ if (resultData.getNextChunkIndex() != null) {
+ nextFetchChunkIndex.set(resultData.getNextChunkIndex());
+ } else {
+ endOfStream = true;
+ LOGGER.debug("End of stream at chunk {}", chunkIndex);
+ }
+
+ notifyChunkAvailable();
+ }
+
+ // ==================== Resource Management ====================
+
+ private void releaseChunk(long chunkIndex) {
+ ArrowResultChunk chunk = chunks.remove(chunkIndex);
+ if (chunk != null) {
+ // Decrement counter BEFORE release to prevent prefetch stall if release throws
+ chunksInMemory.decrementAndGet();
+ try {
+ chunk.releaseChunk();
+ } catch (Exception e) {
+ LOGGER.warn("Error releasing chunk {}: {}", chunkIndex, e.getMessage(), e);
+ }
+ LOGGER.debug("Released chunk {}, chunks in memory: {}", chunkIndex, chunksInMemory.get());
+ notifyConsumerAdvanced();
+ }
+ }
+
+ /**
+ * Waits for a chunk to be created by the prefetch thread. Mirrors
+ * ThriftStreamingProvider.waitForBatchCreation().
+ */
+ private void waitForChunkCreation(long chunkIndex) throws DatabricksSQLException {
+ prefetchLock.lock();
+ try {
+ long waitStartTime = System.currentTimeMillis();
+ long timeoutMillis = chunkReadyTimeoutSeconds * 1000L;
+
+ while (!closed && !chunks.containsKey(chunkIndex)) {
+ checkPrefetchError();
+ if (endOfStream && chunkIndex > highestFetchedChunkIndex.get()) {
+ LOGGER.error(
+ "Chunk {} does not exist (highest fetched: {})",
+ chunkIndex,
+ highestFetchedChunkIndex.get());
+ throw new DatabricksSQLException(
+ "Chunk "
+ + chunkIndex
+ + " does not exist (highest: "
+ + highestFetchedChunkIndex.get()
+ + ")",
+ DatabricksDriverErrorCode.CHUNK_READY_ERROR);
+ }
+
+ long elapsedMillis = System.currentTimeMillis() - waitStartTime;
+ if (elapsedMillis >= timeoutMillis) {
+ LOGGER.error(
+ "Timeout waiting for chunk {} to be created (timeout: {}s)",
+ chunkIndex,
+ chunkReadyTimeoutSeconds);
+ throw new DatabricksSQLException(
+ "Timeout waiting for chunk "
+ + chunkIndex
+ + " to be created (timeout: "
+ + chunkReadyTimeoutSeconds
+ + "s)",
+ DatabricksDriverErrorCode.CHUNK_READY_ERROR);
+ }
+
+ try {
+ long remainingMillis = timeoutMillis - elapsedMillis;
+ chunkAvailable.await(remainingMillis, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOGGER.warn("Interrupted waiting for chunk {} creation", chunkIndex);
+ throw new DatabricksSQLException(
+ "Interrupted waiting for chunk",
+ e,
+ DatabricksDriverErrorCode.THREAD_INTERRUPTED_ERROR);
+ }
+ }
+ } finally {
+ prefetchLock.unlock();
+ }
+ }
+
+ private void checkPrefetchError() throws DatabricksSQLException {
+ if (prefetchError != null) {
+ LOGGER.error("Prefetch failed: {}", prefetchError.getMessage(), prefetchError);
+ throw new DatabricksSQLException(
+ "Prefetch failed: " + prefetchError.getMessage(),
+ prefetchError,
+ DatabricksDriverErrorCode.CHUNK_READY_ERROR);
+ }
+ }
+
+ private void notifyConsumerAdvanced() {
+ prefetchLock.lock();
+ try {
+ consumerAdvanced.signalAll();
+ } finally {
+ prefetchLock.unlock();
+ }
+ }
+
+ private void notifyChunkAvailable() {
+ prefetchLock.lock();
+ try {
+ chunkAvailable.signalAll();
+ } finally {
+ prefetchLock.unlock();
+ }
+ }
+
+ // ==================== Data Processing ====================
+
+ /** Decompresses attachment bytes and creates an ArrowResultChunk. */
+ private ArrowResultChunk processResultData(ResultData resultData) throws DatabricksSQLException {
+ byte[] attachment = resultData.getAttachment();
+ if (attachment == null || attachment.length == 0) {
+ throw new DatabricksSQLException(
+ "No inline Arrow data (attachment) in result",
+ DatabricksDriverErrorCode.RESULT_SET_ERROR);
+ }
+
+ byte[] decompressedBytes =
+ decompress(attachment, compressionCodec, "SEA inline Arrow chunk decompression");
+
+ long rowCount = resultData.getRowCount() != null ? resultData.getRowCount() : 0;
+ return ArrowResultChunk.builder()
+ .withInputStream(new ByteArrayInputStream(decompressedBytes), rowCount)
+ .build();
+ }
+}
diff --git a/src/main/java/com/databricks/jdbc/api/impl/arrow/StreamingChunkDownloadTask.java b/src/main/java/com/databricks/jdbc/api/impl/arrow/StreamingChunkDownloadTask.java
index efc664b24..9d9c183ee 100644
--- a/src/main/java/com/databricks/jdbc/api/impl/arrow/StreamingChunkDownloadTask.java
+++ b/src/main/java/com/databricks/jdbc/api/impl/arrow/StreamingChunkDownloadTask.java
@@ -1,6 +1,8 @@
package com.databricks.jdbc.api.impl.arrow;
+import com.databricks.jdbc.api.internal.IDatabricksConnectionContext;
import com.databricks.jdbc.common.CompressionCodec;
+import com.databricks.jdbc.common.util.DatabricksThreadContextHolder;
import com.databricks.jdbc.dbclient.IDatabricksHttpClient;
import com.databricks.jdbc.exception.DatabricksSQLException;
import com.databricks.jdbc.log.JdbcLogger;
@@ -29,6 +31,10 @@ public class StreamingChunkDownloadTask implements Callable {
private final LinkRefresher linkRefresher;
private final double cloudFetchSpeedThreshold;
+ // Capture caller's thread context for telemetry/logging on the download thread
+ private final IDatabricksConnectionContext connectionContext;
+ private final String statementId;
+
public StreamingChunkDownloadTask(
ArrowResultChunk chunk,
IDatabricksHttpClient httpClient,
@@ -40,13 +46,21 @@ public StreamingChunkDownloadTask(
this.compressionCodec = compressionCodec;
this.linkRefresher = linkRefresher;
this.cloudFetchSpeedThreshold = cloudFetchSpeedThreshold;
+ this.connectionContext = DatabricksThreadContextHolder.getConnectionContext();
+ this.statementId = DatabricksThreadContextHolder.getStatementId();
}
@Override
public Void call() throws DatabricksSQLException {
int retries = 0;
boolean downloadSuccessful = false;
+ Throwable uncaughtException = null;
+
+ // Propagate caller's thread context for telemetry/logging
+ DatabricksThreadContextHolder.setConnectionContext(this.connectionContext);
+ DatabricksThreadContextHolder.setStatementId(this.statementId);
+ long taskStartTime = System.nanoTime();
try {
while (!downloadSuccessful) {
try {
@@ -62,7 +76,12 @@ public Void call() throws DatabricksSQLException {
chunk.downloadData(httpClient, compressionCodec, cloudFetchSpeedThreshold);
downloadSuccessful = true;
- LOGGER.debug("Successfully downloaded chunk {}", chunk.getChunkIndex());
+ long taskTotalMs = (System.nanoTime() - taskStartTime) / 1_000_000;
+ LOGGER.debug(
+ "Chunk download complete: chunkIndex={}, totalMs={}, retries={}",
+ chunk.getChunkIndex(),
+ taskTotalMs,
+ retries);
} catch (IOException | SQLException e) {
retries++;
@@ -72,7 +91,7 @@ public Void call() throws DatabricksSQLException {
chunk.getChunkIndex(),
MAX_RETRIES,
e.getMessage());
- // Status will be set to DOWNLOAD_FAILED in the finally block
+ // Status set to DOWNLOAD_FAILED in the finally block
throw new DatabricksSQLException(
String.format(
"Failed to download chunk %d after %d attempts",
@@ -95,18 +114,28 @@ public Void call() throws DatabricksSQLException {
}
}
}
+ } catch (Throwable t) {
+ uncaughtException = t;
+ throw t;
} finally {
if (downloadSuccessful) {
chunk.getChunkReadyFuture().complete(null);
} else {
+ LOGGER.info(
+ "Download failed for chunk {}: {}",
+ chunk.getChunkIndex(),
+ uncaughtException != null ? uncaughtException.getMessage() : "unknown");
chunk.setStatus(ChunkStatus.DOWNLOAD_FAILED);
chunk
.getChunkReadyFuture()
.completeExceptionally(
new DatabricksSQLException(
"Download failed for chunk " + chunk.getChunkIndex(),
+ uncaughtException,
DatabricksDriverErrorCode.CHUNK_DOWNLOAD_ERROR));
}
+
+ DatabricksThreadContextHolder.clearAllContext();
}
return null;
diff --git a/src/main/java/com/databricks/jdbc/api/impl/arrow/StreamingChunkProvider.java b/src/main/java/com/databricks/jdbc/api/impl/arrow/StreamingChunkProvider.java
index dbb1bbe78..a4d12e88b 100644
--- a/src/main/java/com/databricks/jdbc/api/impl/arrow/StreamingChunkProvider.java
+++ b/src/main/java/com/databricks/jdbc/api/impl/arrow/StreamingChunkProvider.java
@@ -47,6 +47,17 @@
*/
public class StreamingChunkProvider implements ChunkProvider {
+ /** Immutable holder for the next fetch position — ensures atomic reads of (index, rowOffset). */
+ private static final class FetchPosition {
+ final long chunkIndex;
+ final long rowOffset;
+
+ FetchPosition(long chunkIndex, long rowOffset) {
+ this.chunkIndex = chunkIndex;
+ this.rowOffset = rowOffset;
+ }
+ }
+
private static final JdbcLogger LOGGER =
JdbcLoggerFactory.getLogger(StreamingChunkProvider.class);
private static final String DOWNLOAD_THREAD_PREFIX = "databricks-jdbc-streaming-downloader-";
@@ -75,8 +86,11 @@ public class StreamingChunkProvider implements ChunkProvider {
// - nextDownloadIndex: written only under downloadLock, but AtomicLong for consistency
private final AtomicLong currentChunkIndex = new AtomicLong(-1);
private final AtomicLong highestKnownChunkIndex = new AtomicLong(-1);
- private volatile long nextLinkFetchIndex = 0;
- private volatile long nextRowOffsetToFetch = 0;
+ // Bundled as an immutable pair for atomic reads/writes across threads.
+ // The prefetch thread reads this (fetchNextLinkBatch) while the download thread
+ // may update it (getRefreshedLink reconciliation). A volatile reference to an
+ // immutable holder ensures both fields are always read consistently.
+ private volatile FetchPosition nextFetchPosition = new FetchPosition(0, 0);
private final AtomicLong nextDownloadIndex = new AtomicLong(0);
// State flags
@@ -347,11 +361,11 @@ private void linkPrefetchLoop() {
long targetIndex = currentChunkIndex.get() + linkPrefetchWindow;
// Wait if we're caught up
- while (!endOfStreamReached && nextLinkFetchIndex > targetIndex) {
+ while (!endOfStreamReached && nextFetchPosition.chunkIndex > targetIndex) {
if (closed) break;
LOGGER.debug(
"Prefetch caught up, waiting for consumer. next={}, target={}",
- nextLinkFetchIndex,
+ nextFetchPosition.chunkIndex,
targetIndex);
consumerAdvanced.await();
targetIndex = currentChunkIndex.get() + linkPrefetchWindow;
@@ -396,13 +410,14 @@ private void fetchNextLinkBatch() throws SQLException {
return;
}
+ FetchPosition pos = nextFetchPosition;
LOGGER.debug(
"Fetching links starting from index {}, row offset {} for statement {}",
- nextLinkFetchIndex,
- nextRowOffsetToFetch,
+ pos.chunkIndex,
+ pos.rowOffset,
statementId);
- ChunkLinkFetchResult result = linkFetcher.fetchLinks(nextLinkFetchIndex, nextRowOffsetToFetch);
+ ChunkLinkFetchResult result = linkFetcher.fetchLinks(pos.chunkIndex, pos.rowOffset);
if (result.isEndOfStream()) {
LOGGER.info("End of stream reached for statement {}", statementId);
@@ -415,10 +430,9 @@ private void fetchNextLinkBatch() throws SQLException {
createChunkFromLink(link);
}
- // Update next fetch positions
+ // Update next fetch position atomically
if (result.hasMore()) {
- nextLinkFetchIndex = result.getNextFetchIndex();
- nextRowOffsetToFetch = result.getNextRowOffset();
+ nextFetchPosition = new FetchPosition(result.getNextFetchIndex(), result.getNextRowOffset());
} else {
endOfStreamReached = true;
LOGGER.info("End of stream reached for statement {} (hasMore=false)", statementId);
@@ -450,14 +464,15 @@ private void processInitialLinks(ChunkLinkFetchResult initialLinks)
createChunkFromLink(link);
}
- // Set next fetch positions using unified API
+ // Set next fetch position atomically
if (initialLinks.hasMore()) {
- nextLinkFetchIndex = initialLinks.getNextFetchIndex();
- nextRowOffsetToFetch = initialLinks.getNextRowOffset();
+ FetchPosition pos =
+ new FetchPosition(initialLinks.getNextFetchIndex(), initialLinks.getNextRowOffset());
+ nextFetchPosition = pos;
LOGGER.debug(
"Next fetch position set to chunk index {}, row offset {} from initial links",
- nextLinkFetchIndex,
- nextRowOffsetToFetch);
+ pos.chunkIndex,
+ pos.rowOffset);
} else {
endOfStreamReached = true;
LOGGER.info("End of stream reached from initial links for statement {}", statementId);
@@ -471,11 +486,6 @@ private void processInitialLinks(ChunkLinkFetchResult initialLinks)
*/
private void createChunkFromLink(ExternalLink link) throws DatabricksParsingException {
long chunkIndex = link.getChunkIndex();
- if (chunks.containsKey(chunkIndex)) {
- LOGGER.debug("Chunk {} already exists, skipping creation", chunkIndex);
- return;
- }
-
long rowCount = link.getRowCount();
long rowOffset = link.getRowOffset();
@@ -488,7 +498,16 @@ private void createChunkFromLink(ExternalLink link) throws DatabricksParsingExce
.build();
chunk.setChunkLink(link);
- chunks.put(chunkIndex, chunk);
+
+ // Atomic insert — if another thread already created this chunk, skip.
+ // This is safe because createChunkFromLink can be called concurrently from
+ // the prefetch thread (fetchNextLinkBatch) and download threads (getRefreshedLink).
+ ArrowResultChunk existing = chunks.putIfAbsent(chunkIndex, chunk);
+ if (existing != null) {
+ LOGGER.debug("Chunk {} already exists, skipping creation", chunkIndex);
+ return;
+ }
+
highestKnownChunkIndex.updateAndGet(current -> Math.max(current, chunkIndex));
totalRowCount.addAndGet(rowCount);
@@ -596,12 +615,13 @@ ExternalLink getRefreshedLink(long chunkIndex, long rowOffset) throws SQLExcepti
// Single batch FetchResults RPC from the lowest expired offset
ChunkLinkFetchResult result = linkFetcher.fetchLinks(minExpiredIndex, minExpiredRowOffset);
- // Update ALL pre-download chunks that received fresh links.
- // Always overwrite even if the current link hasn't expired yet, since the
- // server-provided link has a later expiry and prevents near-expiry races.
+ // Reconcile ALL links from the refresh response with local chunk state.
for (ExternalLink link : result.getChunkLinks()) {
ArrowResultChunk c = chunks.get(link.getChunkIndex());
if (c != null) {
+ // Existing chunk: update link only for pre-download states.
+ // DOWNLOADING stays as-is (download task owns the state machine).
+ // DOWNLOADED/RELEASED/etc. stay as-is (bytes already in memory).
ChunkStatus status = c.getStatus();
if (status == ChunkStatus.PENDING
|| status == ChunkStatus.URL_FETCHED
@@ -609,9 +629,34 @@ ExternalLink getRefreshedLink(long chunkIndex, long rowOffset) throws SQLExcepti
|| status == ChunkStatus.DOWNLOAD_RETRY) {
c.setChunkLink(link);
}
+ } else {
+ // Server returned a chunk not yet in our map — create it.
+ // Handles cases where refresh response includes chunks beyond
+ // our current highestKnownChunkIndex.
+ try {
+ createChunkFromLink(link);
+ } catch (Exception e) {
+ LOGGER.warn(
+ "Failed to create chunk {} from refresh response: {}",
+ link.getChunkIndex(),
+ e.getMessage());
+ }
}
}
+ // Update end-of-stream and prefetch position from refresh response
+ if (!result.hasMore()) {
+ endOfStreamReached = true;
+ } else if (result.getNextFetchIndex() > nextFetchPosition.chunkIndex) {
+ // Avoid re-fetching chunks that the refresh already discovered.
+ // Atomic update of both fields via immutable holder.
+ nextFetchPosition =
+ new FetchPosition(result.getNextFetchIndex(), result.getNextRowOffset());
+ }
+
+ // Trigger downloads for any newly-created chunks
+ triggerDownloads();
+
// Check if our target chunk was refreshed by the batch
targetChunk = chunks.get(chunkIndex);
if (targetChunk != null && !targetChunk.isChunkLinkInvalid()) {
diff --git a/src/main/java/com/databricks/jdbc/api/internal/IDatabricksConnectionContext.java b/src/main/java/com/databricks/jdbc/api/internal/IDatabricksConnectionContext.java
index 8ca5ae0cd..5754ec0da 100644
--- a/src/main/java/com/databricks/jdbc/api/internal/IDatabricksConnectionContext.java
+++ b/src/main/java/com/databricks/jdbc/api/internal/IDatabricksConnectionContext.java
@@ -461,6 +461,9 @@ default int getHeartbeatIntervalSeconds() {
*/
boolean isCloudFetchEnabled();
+ /** Returns whether bounded SEA API mode is enabled for CloudFetch. */
+ boolean isBoundedSeaApiEnabled();
+
/**
* Returns the maximum number of batches to keep in memory for Thrift streaming.
*
diff --git a/src/main/java/com/databricks/jdbc/common/DatabricksJdbcUrlParams.java b/src/main/java/com/databricks/jdbc/common/DatabricksJdbcUrlParams.java
index 639288248..b9a70f1a8 100644
--- a/src/main/java/com/databricks/jdbc/common/DatabricksJdbcUrlParams.java
+++ b/src/main/java/com/databricks/jdbc/common/DatabricksJdbcUrlParams.java
@@ -204,6 +204,10 @@ public enum DatabricksJdbcUrlParams {
"EnableSeaSyncMetadata",
"Enable x-databricks-sea-can-run-fully-sync header for synchronous metadata requests in SEA mode",
"1"),
+ USE_BOUNDED_SEA_API(
+ "UseBoundedSeaApi",
+ "Use bounded SEA API for CloudFetch: send row_offset on GetResultData, force StreamingChunkProvider, stop relying on total_chunk_count. Requires server support.",
+ "0"),
DISABLE_OAUTH_REFRESH_TOKEN(
"DisableOauthRefreshToken",
"Disable requesting OAuth refresh tokens (omit offline_access unless explicitly provided)",
diff --git a/src/main/java/com/databricks/jdbc/dbclient/impl/sqlexec/DatabricksSdkClient.java b/src/main/java/com/databricks/jdbc/dbclient/impl/sqlexec/DatabricksSdkClient.java
index 4b7398244..0a1a2510d 100644
--- a/src/main/java/com/databricks/jdbc/dbclient/impl/sqlexec/DatabricksSdkClient.java
+++ b/src/main/java/com/databricks/jdbc/dbclient/impl/sqlexec/DatabricksSdkClient.java
@@ -123,6 +123,15 @@ public ImmutableSessionInfo createSession(
if (schema != null) {
request.setSchema(schema);
}
+ // Bounded SEA inline Arrow: tell server not to use cloud storage for results
+ if (connectionContext.isBoundedSeaApiEnabled() && !connectionContext.isCloudFetchEnabled()) {
+ if (sessionConf == null) {
+ sessionConf = new java.util.HashMap<>();
+ } else {
+ sessionConf = new java.util.HashMap<>(sessionConf);
+ }
+ sessionConf.put("can_cloud_download", "false");
+ }
if (sessionConf != null && !sessionConf.isEmpty()) {
request.setSessionConfigs(sessionConf);
}
@@ -529,6 +538,10 @@ public ChunkLinkFetchResult getResultChunks(
GetStatementResultChunkNRequest request =
new GetStatementResultChunkNRequest().setStatementId(statementId).setChunkIndex(chunkIndex);
String path = String.format(RESULT_CHUNK_PATH, statementId, chunkIndex);
+ // Bounded SEA API: always send row_offset (even 0 for chunk 0)
+ if (connectionContext.isBoundedSeaApiEnabled()) {
+ path = path + "?row_offset=" + chunkStartRowOffset;
+ }
try {
Request req = new Request(Request.GET, path, apiClient.serialize(request));
req.withHeaders(getHeaders("getStatementResultN"));
@@ -581,15 +594,27 @@ private ChunkLinkFetchResult buildChunkLinkFetchResult(Collection
@Override
public ResultData getResultChunksData(StatementId typedStatementId, long chunkIndex)
throws DatabricksSQLException {
+ return getResultChunksData(typedStatementId, chunkIndex, 0);
+ }
+
+ /**
+ * Fetches inline result data for a specific chunk, with row_offset for bounded SEA inline Arrow.
+ */
+ public ResultData getResultChunksData(
+ StatementId typedStatementId, long chunkIndex, long rowOffset) throws DatabricksSQLException {
DatabricksThreadContextHolder.setStatementId(typedStatementId);
String statementId = typedStatementId.toSQLExecStatementId();
LOGGER.debug(
- "public ResultData getResultChunksData(String statementId = {}, long chunkIndex = {})",
+ "getResultChunksData(statementId={}, chunkIndex={}, rowOffset={})",
statementId,
- chunkIndex);
+ chunkIndex,
+ rowOffset);
GetStatementResultChunkNRequest request =
new GetStatementResultChunkNRequest().setStatementId(statementId).setChunkIndex(chunkIndex);
String path = String.format(RESULT_CHUNK_PATH, statementId, chunkIndex);
+ if (connectionContext.isBoundedSeaApiEnabled() && rowOffset > 0) {
+ path = path + "?row_offset=" + rowOffset;
+ }
try {
Request req = new Request(Request.GET, path, apiClient.serialize(request));
req.withHeaders(getHeaders("getStatementResultN"));
@@ -713,17 +738,26 @@ private ExecuteStatementRequest getRequest(
boolean executeAsync)
throws SQLException {
boolean cloudFetch = useCloudFetchForResult();
- Format format = cloudFetch ? Format.ARROW_STREAM : Format.JSON_ARRAY;
+ // Bounded SEA inline Arrow: use ARROW_STREAM + INLINE_OR_EXTERNAL_LINKS even without CloudFetch
+ boolean boundedSeaInline = connectionContext.isBoundedSeaApiEnabled() && !cloudFetch;
+ Format format = (cloudFetch || boundedSeaInline) ? Format.ARROW_STREAM : Format.JSON_ARRAY;
Disposition defaultDisposition =
connectionContext.isSqlExecHybridResultsEnabled()
? Disposition.INLINE_OR_EXTERNAL_LINKS
: Disposition.EXTERNAL_LINKS;
- Disposition disposition = cloudFetch ? defaultDisposition : Disposition.INLINE;
+ Disposition disposition;
+ if (cloudFetch) {
+ disposition = defaultDisposition;
+ } else if (boundedSeaInline) {
+ disposition = Disposition.INLINE_OR_EXTERNAL_LINKS;
+ } else {
+ disposition = Disposition.INLINE;
+ }
long maxRows =
(parentStatement == null) ? DEFAULT_RESULT_ROW_LIMIT : parentStatement.getMaxRows();
CompressionCodec compressionCodec = session.getCompressionCodec();
- if (disposition.equals(Disposition.INLINE)) {
- LOGGER.debug("Results are inline, skipping compression.");
+ if (disposition.equals(Disposition.INLINE) && !boundedSeaInline) {
+ LOGGER.debug("Results are inline JSON, skipping compression.");
compressionCodec = CompressionCodec.NONE;
}
List parameterListItems =