diff --git a/NEXT_CHANGELOG.md b/NEXT_CHANGELOG.md index d27a25e26..708fbe50c 100644 --- a/NEXT_CHANGELOG.md +++ b/NEXT_CHANGELOG.md @@ -3,6 +3,11 @@ ## [Unreleased] ### Added +- Added result set heartbeat / keep-alive to prevent server-side result expiry during slow consumption. When enabled via `EnableHeartbeat=1`, the driver periodically polls `GetStatementStatus` (SEA) or `GetOperationStatus` (Thrift) to keep the operation alive while the client reads results. Configurable interval via `HeartbeatIntervalSeconds` (default 60s). Heartbeat automatically stops when results are fully consumed, ResultSet is closed, or the server returns a terminal state. Disabled by default due to cost implications (heartbeats keep the warehouse running). +- Metadata operations now use SQL SHOW commands for both Thrift and SEA backends, + ensuring consistent behavior for SQL warehouses regardless of underlying + protocol. To revert to native Thrift metadata RPCs, set `UseQueryForMetadata=0`. +- Added `UseBoundedSeaApi` connection property (default `0`/off). When enabled, the driver uses the bounded SEA API contract for CloudFetch: sends `row_offset` on GetResultData requests and uses `next_chunk_index` for chunk discovery instead of `total_chunk_count`. Requires server support. ### Updated diff --git a/src/main/java/com/databricks/jdbc/api/impl/DatabricksConnectionContext.java b/src/main/java/com/databricks/jdbc/api/impl/DatabricksConnectionContext.java index 956695ab5..6a793a123 100644 --- a/src/main/java/com/databricks/jdbc/api/impl/DatabricksConnectionContext.java +++ b/src/main/java/com/databricks/jdbc/api/impl/DatabricksConnectionContext.java @@ -1440,6 +1440,11 @@ public boolean isCloudFetchEnabled() { return getParameter(DatabricksJdbcUrlParams.ENABLE_CLOUD_FETCH).equals("1"); } + @Override + public boolean isBoundedSeaApiEnabled() { + return getParameter(DatabricksJdbcUrlParams.USE_BOUNDED_SEA_API).equals("1"); + } + @Override public int getThriftMaxBatchesInMemory() { try { diff --git a/src/main/java/com/databricks/jdbc/api/impl/DatabricksResultSet.java b/src/main/java/com/databricks/jdbc/api/impl/DatabricksResultSet.java index 540699407..5461d77d9 100644 --- a/src/main/java/com/databricks/jdbc/api/impl/DatabricksResultSet.java +++ b/src/main/java/com/databricks/jdbc/api/impl/DatabricksResultSet.java @@ -77,6 +77,7 @@ enum ResultSetType { private ResultSetType resultSetType = ResultSetType.UNASSIGNED; private boolean complexDatatypeSupport = false; + private boolean boundedSeaApiEnabled = false; // Cached telemetry collector resolved once at construction time to avoid // per-row overhead in next(). The connection-to-collector mapping is stable @@ -128,6 +129,7 @@ public DatabricksResultSet( resultSetMetaData = null; } this.complexDatatypeSupport = session.getConnectionContext().isComplexDatatypeSupportEnabled(); + this.boundedSeaApiEnabled = session.getConnectionContext().isBoundedSeaApiEnabled(); this.statementType = statementType; this.updateCount = null; this.parentStatement = parentStatement; @@ -933,8 +935,15 @@ public boolean isBeforeFirst() throws SQLException { public boolean isAfterLast() throws SQLException { checkIfClosed(); // Account for client-side maxRows truncation - return truncatedByMaxRows - || executionResult.getCurrentRow() >= resultSetMetaData.getTotalRows(); + if (truncatedByMaxRows) { + return true; + } + // Bounded SEA API: manifest.total_row_count is not populated, so use hasNext() + // which derives end-of-stream from next_chunk_index via the chunk provider. + if (boundedSeaApiEnabled && executionResult instanceof ArrowStreamResult) { + return executionResult.getCurrentRow() >= 0 && !executionResult.hasNext(); + } + return executionResult.getCurrentRow() >= resultSetMetaData.getTotalRows(); } @Override @@ -974,6 +983,10 @@ public boolean isLast() throws SQLException { || executionResult instanceof StreamingInlineArrowResult) { return executionResult.getCurrentRow() >= 0 && !executionResult.hasNext(); } + // Bounded SEA API: manifest.total_row_count is not populated, so use hasNext() + if (boundedSeaApiEnabled && executionResult instanceof ArrowStreamResult) { + return executionResult.getCurrentRow() >= 0 && !executionResult.hasNext(); + } return executionResult.getCurrentRow() == resultSetMetaData.getTotalRows() - 1; } diff --git a/src/main/java/com/databricks/jdbc/api/impl/arrow/ArrowStreamResult.java b/src/main/java/com/databricks/jdbc/api/impl/arrow/ArrowStreamResult.java index 1569b7efb..0a652766d 100644 --- a/src/main/java/com/databricks/jdbc/api/impl/arrow/ArrowStreamResult.java +++ b/src/main/java/com/databricks/jdbc/api/impl/arrow/ArrowStreamResult.java @@ -64,7 +64,17 @@ public ArrowStreamResult( this.session = session; // Check if the result data contains the arrow data inline boolean isInlineArrow = resultData.getAttachment() != null; - if (isInlineArrow) { + IDatabricksConnectionContext connectionContext = session.getConnectionContext(); + if (isInlineArrow + && connectionContext.isBoundedSeaApiEnabled() + && !connectionContext.isCloudFetchEnabled()) { + // Bounded SEA inline Arrow: multi-chunk lazy fetch via GetResultData with row_offset + LOGGER.debug( + "Creating ArrowStreamResult with SEA inline Arrow for statementId: {}", + statementId.toSQLExecStatementId()); + this.chunkProvider = + new SeaInlineArrowChunkProvider(resultData, resultManifest, statementId, session); + } else if (isInlineArrow) { LOGGER.debug( "Creating ArrowStreamResult with inline attachment for statementId: {}", statementId.toSQLExecStatementId()); @@ -102,7 +112,9 @@ private static ChunkProvider createRemoteChunkProvider( IDatabricksConnectionContext connectionContext = session.getConnectionContext(); - if (connectionContext.isStreamingChunkProviderEnabled()) { + // Bounded SEA API forces StreamingChunkProvider — it doesn't rely on total_chunk_count + if (connectionContext.isStreamingChunkProviderEnabled() + || connectionContext.isBoundedSeaApiEnabled()) { LOGGER.info( "Using StreamingChunkProvider for statementId: {}", statementId.toSQLExecStatementId()); @@ -113,10 +125,13 @@ private static ChunkProvider createRemoteChunkProvider( int chunkReadyTimeoutSeconds = connectionContext.getChunkReadyTimeoutSeconds(); double cloudFetchSpeedThreshold = connectionContext.getCloudFetchSpeedThreshold(); - // Convert ExternalLinks to ChunkLinkFetchResult for the provider + // Convert ExternalLinks to ChunkLinkFetchResult for the provider. + // Bounded SEA API: pass null for totalChunkCount — we must not depend on + // manifest.{chunks, total_chunk_count, total_row_count} per the bounded API contract. + Long totalChunkCount = + connectionContext.isBoundedSeaApiEnabled() ? null : resultManifest.getTotalChunkCount(); ChunkLinkFetchResult initialLinks = - convertToChunkLinkFetchResult( - resultData.getExternalLinks(), resultManifest.getTotalChunkCount()); + convertToChunkLinkFetchResult(resultData.getExternalLinks(), totalChunkCount); return new StreamingChunkProvider( linkFetcher, diff --git a/src/main/java/com/databricks/jdbc/api/impl/arrow/SeaChunkLinkFetcher.java b/src/main/java/com/databricks/jdbc/api/impl/arrow/SeaChunkLinkFetcher.java index 9be8dccf1..7df1586cc 100644 --- a/src/main/java/com/databricks/jdbc/api/impl/arrow/SeaChunkLinkFetcher.java +++ b/src/main/java/com/databricks/jdbc/api/impl/arrow/SeaChunkLinkFetcher.java @@ -32,7 +32,7 @@ public SeaChunkLinkFetcher(IDatabricksSession session, StatementId statementId) @Override public ChunkLinkFetchResult fetchLinks(long startChunkIndex, long startRowOffset) throws SQLException { - // SEA uses startChunkIndex; startRowOffset is ignored + // SEA uses startChunkIndex; startRowOffset is passed through for bounded SEA API LOGGER.debug( "Fetching links starting from chunk index {} for statement {}", startChunkIndex, @@ -45,7 +45,7 @@ public ChunkLinkFetchResult fetchLinks(long startChunkIndex, long startRowOffset @Override public ExternalLink refetchLink(long chunkIndex, long rowOffset) throws SQLException { - // SEA uses chunkIndex; rowOffset is ignored + // SEA uses chunkIndex; rowOffset is passed through for bounded SEA API LOGGER.info("Refetching expired link for chunk {} of statement {}", chunkIndex, statementId); ChunkLinkFetchResult result = diff --git a/src/main/java/com/databricks/jdbc/api/impl/arrow/SeaInlineArrowChunkProvider.java b/src/main/java/com/databricks/jdbc/api/impl/arrow/SeaInlineArrowChunkProvider.java new file mode 100644 index 000000000..dfc5bb352 --- /dev/null +++ b/src/main/java/com/databricks/jdbc/api/impl/arrow/SeaInlineArrowChunkProvider.java @@ -0,0 +1,437 @@ +package com.databricks.jdbc.api.impl.arrow; + +import static com.databricks.jdbc.common.util.DecompressionUtil.decompress; + +import com.databricks.jdbc.api.internal.IDatabricksConnectionContext; +import com.databricks.jdbc.api.internal.IDatabricksSession; +import com.databricks.jdbc.common.CompressionCodec; +import com.databricks.jdbc.dbclient.impl.common.StatementId; +import com.databricks.jdbc.dbclient.impl.sqlexec.DatabricksSdkClient; +import com.databricks.jdbc.exception.DatabricksSQLException; +import com.databricks.jdbc.log.JdbcLogger; +import com.databricks.jdbc.log.JdbcLoggerFactory; +import com.databricks.jdbc.model.core.ResultData; +import com.databricks.jdbc.model.core.ResultManifest; +import com.databricks.jdbc.model.telemetry.enums.DatabricksDriverErrorCode; +import java.io.ByteArrayInputStream; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +/** + * ChunkProvider for SEA inline Arrow results with rolling-window prefetch. Fetches chunks via + * GetResultData with row_offset, using a background thread to prefetch ahead of the consumer. + * + *

Design mirrors {@link com.databricks.jdbc.api.impl.streaming.ThriftStreamingProvider + * ThriftStreamingProvider} for consistency: uses Lock + Condition for signaling, ConcurrentHashMap + * for indexed batch storage, and explicit batchesInMemory tracking for backpressure. + * + *

Used when UseBoundedSeaApi=1 AND EnableQueryResultDownload=0. + */ +class SeaInlineArrowChunkProvider implements ChunkProvider { + + private static final JdbcLogger LOGGER = + JdbcLoggerFactory.getLogger(SeaInlineArrowChunkProvider.class); + private static final String PREFETCH_THREAD_NAME = "sea-inline-prefetcher"; + + // Configuration + private final int maxBatchesInMemory; + private final int chunkReadyTimeoutSeconds; + + // Dependencies + private final IDatabricksSession session; + private final StatementId statementId; + private final CompressionCodec compressionCodec; + + // Indexed chunk storage (mirrors ThriftStreamingProvider's ConcurrentHashMap) + private final ConcurrentMap chunks = new ConcurrentHashMap<>(); + + // Position tracking + private final AtomicLong currentChunkIndex = new AtomicLong(-1); + private final AtomicLong highestFetchedChunkIndex = new AtomicLong(-1); + // Single-writer: nextFetchChunkIndex and nextFetchRowOffset are only written by + // the prefetch thread (fetchNextChunkInternal), so separate AtomicLongs are safe. + // Unlike StreamingChunkProvider where download threads can also update the position + // (requiring a bundled FetchPosition holder), this provider has no second writer. + private final AtomicLong nextFetchChunkIndex = new AtomicLong(1); // 0 is initial chunk + private final AtomicLong nextFetchRowOffset = new AtomicLong(0); + private final AtomicLong totalRowCount = new AtomicLong(0); + + // State and synchronization (mirrors ThriftStreamingProvider's lock + conditions) + private volatile boolean endOfStream = false; + private volatile boolean closed = false; + private volatile DatabricksSQLException prefetchError = null; + private final ReentrantLock prefetchLock = new ReentrantLock(); + private final Condition chunkAvailable = prefetchLock.newCondition(); + private final Condition consumerAdvanced = prefetchLock.newCondition(); + private final AtomicInteger chunksInMemory = new AtomicInteger(0); + + // Prefetch thread + private final Thread prefetchThread; + + SeaInlineArrowChunkProvider( + ResultData initialResultData, + ResultManifest resultManifest, + StatementId statementId, + IDatabricksSession session) + throws DatabricksSQLException { + this.session = session; + this.statementId = statementId; + this.compressionCodec = resultManifest.getResultCompression(); + + IDatabricksConnectionContext ctx = session.getConnectionContext(); + int configuredMax = ctx.getThriftMaxBatchesInMemory(); + // Need at least 2 to enable any prefetching (same as ThriftStreamingProvider) + if (configuredMax < 2) { + LOGGER.warn( + "Configured maxBatchesInMemory={} is less than the minimum of 2; using 2 instead.", + configuredMax); + } + this.maxBatchesInMemory = Math.max(2, configuredMax); + this.chunkReadyTimeoutSeconds = ctx.getChunkReadyTimeoutSeconds(); + + // Process initial chunk + ArrowResultChunk firstChunk = processResultData(initialResultData); + long rowCount = initialResultData.getRowCount() != null ? initialResultData.getRowCount() : 0; + long rowOffset = + initialResultData.getRowOffset() != null ? initialResultData.getRowOffset() : 0; + totalRowCount.addAndGet(rowCount); + nextFetchRowOffset.set(rowOffset + rowCount); + + // Store initial chunk + chunks.put(0L, firstChunk); + highestFetchedChunkIndex.set(0); + chunksInMemory.incrementAndGet(); + + // Determine if there are more chunks + if (initialResultData.getNextChunkIndex() != null) { + nextFetchChunkIndex.set(initialResultData.getNextChunkIndex()); + } else { + endOfStream = true; + } + + // Start prefetch thread (mirrors ThriftStreamingProvider) + this.prefetchThread = new Thread(this::prefetchLoop, PREFETCH_THREAD_NAME); + this.prefetchThread.setDaemon(true); + this.prefetchThread.start(); + + notifyConsumerAdvanced(); + + LOGGER.debug( + "SeaInlineArrowChunkProvider created: statement={}, maxBatches={}, endOfStream={}", + statementId.toSQLExecStatementId(), + maxBatchesInMemory, + endOfStream); + } + + @Override + public boolean hasNextChunk() { + if (closed) return false; + if (!endOfStream) return true; + return currentChunkIndex.get() < highestFetchedChunkIndex.get(); + } + + @Override + public boolean next() throws DatabricksSQLException { + if (closed) return false; + + checkPrefetchError(); + + // Release previous chunk + long prevIndex = currentChunkIndex.get(); + if (prevIndex >= 0) { + releaseChunk(prevIndex); + } + + if (!hasNextChunk()) { + return false; + } + + long nextIndex = currentChunkIndex.incrementAndGet(); + notifyConsumerAdvanced(); + + // Wait for the chunk to be available (mirrors ThriftStreamingProvider.getCurrentBatch) + ArrowResultChunk chunk = chunks.get(nextIndex); + if (chunk == null) { + LOGGER.debug("Chunk {} not yet available, waiting for prefetch", nextIndex); + waitForChunkCreation(nextIndex); + chunk = chunks.get(nextIndex); + } + + if (chunk == null) { + LOGGER.error("Chunk {} not found after waiting", nextIndex); + throw new DatabricksSQLException( + "Chunk " + nextIndex + " not found after waiting", + DatabricksDriverErrorCode.CHUNK_READY_ERROR); + } + + return true; + } + + @Override + public ArrowResultChunk getChunk() { + long idx = currentChunkIndex.get(); + if (idx < 0) return null; + return chunks.get(idx); + } + + @Override + public void close() { + if (closed) return; + + LOGGER.debug("Closing SeaInlineArrowChunkProvider, total rows: {}", totalRowCount.get()); + closed = true; + + notifyConsumerAdvanced(); + notifyChunkAvailable(); + + // Interrupt and wait for prefetch thread (mirrors ThriftStreamingProvider) + prefetchThread.interrupt(); + try { + prefetchThread.join(5000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOGGER.debug("Interrupted while waiting for prefetch thread to terminate"); + } + + // Release all chunks + for (ArrowResultChunk chunk : chunks.values()) { + try { + chunk.releaseChunk(); + } catch (Exception e) { + LOGGER.warn("Error releasing chunk during close: {}", e.getMessage(), e); + } + } + chunks.clear(); + } + + @Override + public long getRowCount() { + return totalRowCount.get(); + } + + @Override + public long getChunkCount() { + return currentChunkIndex.get() + 1; + } + + @Override + public boolean isClosed() { + return closed; + } + + // ==================== Prefetch Logic ==================== + + /** Background prefetch loop — mirrors ThriftStreamingProvider.prefetchLoop(). */ + private void prefetchLoop() { + LOGGER.debug("Prefetch thread started"); + + while (!closed && !Thread.currentThread().isInterrupted()) { + try { + // Wait until queue has room (backpressure from slow consumer) + prefetchLock.lock(); + try { + while (!closed && !endOfStream && chunksInMemory.get() >= maxBatchesInMemory) { + LOGGER.debug( + "Prefetch waiting: chunks={}/{}", chunksInMemory.get(), maxBatchesInMemory); + consumerAdvanced.await(); + } + } finally { + prefetchLock.unlock(); + } + + if (closed || endOfStream) break; + + fetchNextChunkInternal(); + + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOGGER.debug("Prefetch thread interrupted"); + break; + } catch (DatabricksSQLException e) { + LOGGER.error("Prefetch error: {}", e.getMessage()); + prefetchError = e; + notifyChunkAvailable(); + break; + } catch (Exception e) { + LOGGER.error("Unexpected prefetch error: {}", e.getMessage(), e); + prefetchError = + new DatabricksSQLException( + "Unexpected prefetch error: " + e.getMessage(), + e, + DatabricksDriverErrorCode.SDK_CLIENT_ERROR); + notifyChunkAvailable(); + break; + } + } + + LOGGER.debug("Prefetch thread exiting"); + } + + /** + * Fetches a single chunk from the server — mirrors + * ThriftStreamingProvider.fetchNextBatchInternal(). + */ + private void fetchNextChunkInternal() throws DatabricksSQLException { + long chunkIndex = nextFetchChunkIndex.get(); + long rowOffset = nextFetchRowOffset.get(); + + LOGGER.debug("Fetching inline chunk {} at offset {}", chunkIndex, rowOffset); + + DatabricksSdkClient client = (DatabricksSdkClient) session.getDatabricksClient(); + ResultData resultData = client.getResultChunksData(statementId, chunkIndex, rowOffset); + + ArrowResultChunk chunk = processResultData(resultData); + long rowCount = resultData.getRowCount() != null ? resultData.getRowCount() : 0; + + // Store chunk and update state + chunks.put(chunkIndex, chunk); + chunksInMemory.incrementAndGet(); + highestFetchedChunkIndex.updateAndGet(cur -> Math.max(cur, chunkIndex)); + totalRowCount.addAndGet(rowCount); + nextFetchRowOffset.addAndGet(rowCount); + + LOGGER.debug( + "Chunk {} ready: rowCount={}, hasMore={}", + chunkIndex, + rowCount, + resultData.getNextChunkIndex() != null); + + // Update continuation + if (resultData.getNextChunkIndex() != null) { + nextFetchChunkIndex.set(resultData.getNextChunkIndex()); + } else { + endOfStream = true; + LOGGER.debug("End of stream at chunk {}", chunkIndex); + } + + notifyChunkAvailable(); + } + + // ==================== Resource Management ==================== + + private void releaseChunk(long chunkIndex) { + ArrowResultChunk chunk = chunks.remove(chunkIndex); + if (chunk != null) { + // Decrement counter BEFORE release to prevent prefetch stall if release throws + chunksInMemory.decrementAndGet(); + try { + chunk.releaseChunk(); + } catch (Exception e) { + LOGGER.warn("Error releasing chunk {}: {}", chunkIndex, e.getMessage(), e); + } + LOGGER.debug("Released chunk {}, chunks in memory: {}", chunkIndex, chunksInMemory.get()); + notifyConsumerAdvanced(); + } + } + + /** + * Waits for a chunk to be created by the prefetch thread. Mirrors + * ThriftStreamingProvider.waitForBatchCreation(). + */ + private void waitForChunkCreation(long chunkIndex) throws DatabricksSQLException { + prefetchLock.lock(); + try { + long waitStartTime = System.currentTimeMillis(); + long timeoutMillis = chunkReadyTimeoutSeconds * 1000L; + + while (!closed && !chunks.containsKey(chunkIndex)) { + checkPrefetchError(); + if (endOfStream && chunkIndex > highestFetchedChunkIndex.get()) { + LOGGER.error( + "Chunk {} does not exist (highest fetched: {})", + chunkIndex, + highestFetchedChunkIndex.get()); + throw new DatabricksSQLException( + "Chunk " + + chunkIndex + + " does not exist (highest: " + + highestFetchedChunkIndex.get() + + ")", + DatabricksDriverErrorCode.CHUNK_READY_ERROR); + } + + long elapsedMillis = System.currentTimeMillis() - waitStartTime; + if (elapsedMillis >= timeoutMillis) { + LOGGER.error( + "Timeout waiting for chunk {} to be created (timeout: {}s)", + chunkIndex, + chunkReadyTimeoutSeconds); + throw new DatabricksSQLException( + "Timeout waiting for chunk " + + chunkIndex + + " to be created (timeout: " + + chunkReadyTimeoutSeconds + + "s)", + DatabricksDriverErrorCode.CHUNK_READY_ERROR); + } + + try { + long remainingMillis = timeoutMillis - elapsedMillis; + chunkAvailable.await(remainingMillis, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOGGER.warn("Interrupted waiting for chunk {} creation", chunkIndex); + throw new DatabricksSQLException( + "Interrupted waiting for chunk", + e, + DatabricksDriverErrorCode.THREAD_INTERRUPTED_ERROR); + } + } + } finally { + prefetchLock.unlock(); + } + } + + private void checkPrefetchError() throws DatabricksSQLException { + if (prefetchError != null) { + LOGGER.error("Prefetch failed: {}", prefetchError.getMessage(), prefetchError); + throw new DatabricksSQLException( + "Prefetch failed: " + prefetchError.getMessage(), + prefetchError, + DatabricksDriverErrorCode.CHUNK_READY_ERROR); + } + } + + private void notifyConsumerAdvanced() { + prefetchLock.lock(); + try { + consumerAdvanced.signalAll(); + } finally { + prefetchLock.unlock(); + } + } + + private void notifyChunkAvailable() { + prefetchLock.lock(); + try { + chunkAvailable.signalAll(); + } finally { + prefetchLock.unlock(); + } + } + + // ==================== Data Processing ==================== + + /** Decompresses attachment bytes and creates an ArrowResultChunk. */ + private ArrowResultChunk processResultData(ResultData resultData) throws DatabricksSQLException { + byte[] attachment = resultData.getAttachment(); + if (attachment == null || attachment.length == 0) { + throw new DatabricksSQLException( + "No inline Arrow data (attachment) in result", + DatabricksDriverErrorCode.RESULT_SET_ERROR); + } + + byte[] decompressedBytes = + decompress(attachment, compressionCodec, "SEA inline Arrow chunk decompression"); + + long rowCount = resultData.getRowCount() != null ? resultData.getRowCount() : 0; + return ArrowResultChunk.builder() + .withInputStream(new ByteArrayInputStream(decompressedBytes), rowCount) + .build(); + } +} diff --git a/src/main/java/com/databricks/jdbc/api/impl/arrow/StreamingChunkDownloadTask.java b/src/main/java/com/databricks/jdbc/api/impl/arrow/StreamingChunkDownloadTask.java index efc664b24..9d9c183ee 100644 --- a/src/main/java/com/databricks/jdbc/api/impl/arrow/StreamingChunkDownloadTask.java +++ b/src/main/java/com/databricks/jdbc/api/impl/arrow/StreamingChunkDownloadTask.java @@ -1,6 +1,8 @@ package com.databricks.jdbc.api.impl.arrow; +import com.databricks.jdbc.api.internal.IDatabricksConnectionContext; import com.databricks.jdbc.common.CompressionCodec; +import com.databricks.jdbc.common.util.DatabricksThreadContextHolder; import com.databricks.jdbc.dbclient.IDatabricksHttpClient; import com.databricks.jdbc.exception.DatabricksSQLException; import com.databricks.jdbc.log.JdbcLogger; @@ -29,6 +31,10 @@ public class StreamingChunkDownloadTask implements Callable { private final LinkRefresher linkRefresher; private final double cloudFetchSpeedThreshold; + // Capture caller's thread context for telemetry/logging on the download thread + private final IDatabricksConnectionContext connectionContext; + private final String statementId; + public StreamingChunkDownloadTask( ArrowResultChunk chunk, IDatabricksHttpClient httpClient, @@ -40,13 +46,21 @@ public StreamingChunkDownloadTask( this.compressionCodec = compressionCodec; this.linkRefresher = linkRefresher; this.cloudFetchSpeedThreshold = cloudFetchSpeedThreshold; + this.connectionContext = DatabricksThreadContextHolder.getConnectionContext(); + this.statementId = DatabricksThreadContextHolder.getStatementId(); } @Override public Void call() throws DatabricksSQLException { int retries = 0; boolean downloadSuccessful = false; + Throwable uncaughtException = null; + + // Propagate caller's thread context for telemetry/logging + DatabricksThreadContextHolder.setConnectionContext(this.connectionContext); + DatabricksThreadContextHolder.setStatementId(this.statementId); + long taskStartTime = System.nanoTime(); try { while (!downloadSuccessful) { try { @@ -62,7 +76,12 @@ public Void call() throws DatabricksSQLException { chunk.downloadData(httpClient, compressionCodec, cloudFetchSpeedThreshold); downloadSuccessful = true; - LOGGER.debug("Successfully downloaded chunk {}", chunk.getChunkIndex()); + long taskTotalMs = (System.nanoTime() - taskStartTime) / 1_000_000; + LOGGER.debug( + "Chunk download complete: chunkIndex={}, totalMs={}, retries={}", + chunk.getChunkIndex(), + taskTotalMs, + retries); } catch (IOException | SQLException e) { retries++; @@ -72,7 +91,7 @@ public Void call() throws DatabricksSQLException { chunk.getChunkIndex(), MAX_RETRIES, e.getMessage()); - // Status will be set to DOWNLOAD_FAILED in the finally block + // Status set to DOWNLOAD_FAILED in the finally block throw new DatabricksSQLException( String.format( "Failed to download chunk %d after %d attempts", @@ -95,18 +114,28 @@ public Void call() throws DatabricksSQLException { } } } + } catch (Throwable t) { + uncaughtException = t; + throw t; } finally { if (downloadSuccessful) { chunk.getChunkReadyFuture().complete(null); } else { + LOGGER.info( + "Download failed for chunk {}: {}", + chunk.getChunkIndex(), + uncaughtException != null ? uncaughtException.getMessage() : "unknown"); chunk.setStatus(ChunkStatus.DOWNLOAD_FAILED); chunk .getChunkReadyFuture() .completeExceptionally( new DatabricksSQLException( "Download failed for chunk " + chunk.getChunkIndex(), + uncaughtException, DatabricksDriverErrorCode.CHUNK_DOWNLOAD_ERROR)); } + + DatabricksThreadContextHolder.clearAllContext(); } return null; diff --git a/src/main/java/com/databricks/jdbc/api/impl/arrow/StreamingChunkProvider.java b/src/main/java/com/databricks/jdbc/api/impl/arrow/StreamingChunkProvider.java index dbb1bbe78..a4d12e88b 100644 --- a/src/main/java/com/databricks/jdbc/api/impl/arrow/StreamingChunkProvider.java +++ b/src/main/java/com/databricks/jdbc/api/impl/arrow/StreamingChunkProvider.java @@ -47,6 +47,17 @@ */ public class StreamingChunkProvider implements ChunkProvider { + /** Immutable holder for the next fetch position — ensures atomic reads of (index, rowOffset). */ + private static final class FetchPosition { + final long chunkIndex; + final long rowOffset; + + FetchPosition(long chunkIndex, long rowOffset) { + this.chunkIndex = chunkIndex; + this.rowOffset = rowOffset; + } + } + private static final JdbcLogger LOGGER = JdbcLoggerFactory.getLogger(StreamingChunkProvider.class); private static final String DOWNLOAD_THREAD_PREFIX = "databricks-jdbc-streaming-downloader-"; @@ -75,8 +86,11 @@ public class StreamingChunkProvider implements ChunkProvider { // - nextDownloadIndex: written only under downloadLock, but AtomicLong for consistency private final AtomicLong currentChunkIndex = new AtomicLong(-1); private final AtomicLong highestKnownChunkIndex = new AtomicLong(-1); - private volatile long nextLinkFetchIndex = 0; - private volatile long nextRowOffsetToFetch = 0; + // Bundled as an immutable pair for atomic reads/writes across threads. + // The prefetch thread reads this (fetchNextLinkBatch) while the download thread + // may update it (getRefreshedLink reconciliation). A volatile reference to an + // immutable holder ensures both fields are always read consistently. + private volatile FetchPosition nextFetchPosition = new FetchPosition(0, 0); private final AtomicLong nextDownloadIndex = new AtomicLong(0); // State flags @@ -347,11 +361,11 @@ private void linkPrefetchLoop() { long targetIndex = currentChunkIndex.get() + linkPrefetchWindow; // Wait if we're caught up - while (!endOfStreamReached && nextLinkFetchIndex > targetIndex) { + while (!endOfStreamReached && nextFetchPosition.chunkIndex > targetIndex) { if (closed) break; LOGGER.debug( "Prefetch caught up, waiting for consumer. next={}, target={}", - nextLinkFetchIndex, + nextFetchPosition.chunkIndex, targetIndex); consumerAdvanced.await(); targetIndex = currentChunkIndex.get() + linkPrefetchWindow; @@ -396,13 +410,14 @@ private void fetchNextLinkBatch() throws SQLException { return; } + FetchPosition pos = nextFetchPosition; LOGGER.debug( "Fetching links starting from index {}, row offset {} for statement {}", - nextLinkFetchIndex, - nextRowOffsetToFetch, + pos.chunkIndex, + pos.rowOffset, statementId); - ChunkLinkFetchResult result = linkFetcher.fetchLinks(nextLinkFetchIndex, nextRowOffsetToFetch); + ChunkLinkFetchResult result = linkFetcher.fetchLinks(pos.chunkIndex, pos.rowOffset); if (result.isEndOfStream()) { LOGGER.info("End of stream reached for statement {}", statementId); @@ -415,10 +430,9 @@ private void fetchNextLinkBatch() throws SQLException { createChunkFromLink(link); } - // Update next fetch positions + // Update next fetch position atomically if (result.hasMore()) { - nextLinkFetchIndex = result.getNextFetchIndex(); - nextRowOffsetToFetch = result.getNextRowOffset(); + nextFetchPosition = new FetchPosition(result.getNextFetchIndex(), result.getNextRowOffset()); } else { endOfStreamReached = true; LOGGER.info("End of stream reached for statement {} (hasMore=false)", statementId); @@ -450,14 +464,15 @@ private void processInitialLinks(ChunkLinkFetchResult initialLinks) createChunkFromLink(link); } - // Set next fetch positions using unified API + // Set next fetch position atomically if (initialLinks.hasMore()) { - nextLinkFetchIndex = initialLinks.getNextFetchIndex(); - nextRowOffsetToFetch = initialLinks.getNextRowOffset(); + FetchPosition pos = + new FetchPosition(initialLinks.getNextFetchIndex(), initialLinks.getNextRowOffset()); + nextFetchPosition = pos; LOGGER.debug( "Next fetch position set to chunk index {}, row offset {} from initial links", - nextLinkFetchIndex, - nextRowOffsetToFetch); + pos.chunkIndex, + pos.rowOffset); } else { endOfStreamReached = true; LOGGER.info("End of stream reached from initial links for statement {}", statementId); @@ -471,11 +486,6 @@ private void processInitialLinks(ChunkLinkFetchResult initialLinks) */ private void createChunkFromLink(ExternalLink link) throws DatabricksParsingException { long chunkIndex = link.getChunkIndex(); - if (chunks.containsKey(chunkIndex)) { - LOGGER.debug("Chunk {} already exists, skipping creation", chunkIndex); - return; - } - long rowCount = link.getRowCount(); long rowOffset = link.getRowOffset(); @@ -488,7 +498,16 @@ private void createChunkFromLink(ExternalLink link) throws DatabricksParsingExce .build(); chunk.setChunkLink(link); - chunks.put(chunkIndex, chunk); + + // Atomic insert — if another thread already created this chunk, skip. + // This is safe because createChunkFromLink can be called concurrently from + // the prefetch thread (fetchNextLinkBatch) and download threads (getRefreshedLink). + ArrowResultChunk existing = chunks.putIfAbsent(chunkIndex, chunk); + if (existing != null) { + LOGGER.debug("Chunk {} already exists, skipping creation", chunkIndex); + return; + } + highestKnownChunkIndex.updateAndGet(current -> Math.max(current, chunkIndex)); totalRowCount.addAndGet(rowCount); @@ -596,12 +615,13 @@ ExternalLink getRefreshedLink(long chunkIndex, long rowOffset) throws SQLExcepti // Single batch FetchResults RPC from the lowest expired offset ChunkLinkFetchResult result = linkFetcher.fetchLinks(minExpiredIndex, minExpiredRowOffset); - // Update ALL pre-download chunks that received fresh links. - // Always overwrite even if the current link hasn't expired yet, since the - // server-provided link has a later expiry and prevents near-expiry races. + // Reconcile ALL links from the refresh response with local chunk state. for (ExternalLink link : result.getChunkLinks()) { ArrowResultChunk c = chunks.get(link.getChunkIndex()); if (c != null) { + // Existing chunk: update link only for pre-download states. + // DOWNLOADING stays as-is (download task owns the state machine). + // DOWNLOADED/RELEASED/etc. stay as-is (bytes already in memory). ChunkStatus status = c.getStatus(); if (status == ChunkStatus.PENDING || status == ChunkStatus.URL_FETCHED @@ -609,9 +629,34 @@ ExternalLink getRefreshedLink(long chunkIndex, long rowOffset) throws SQLExcepti || status == ChunkStatus.DOWNLOAD_RETRY) { c.setChunkLink(link); } + } else { + // Server returned a chunk not yet in our map — create it. + // Handles cases where refresh response includes chunks beyond + // our current highestKnownChunkIndex. + try { + createChunkFromLink(link); + } catch (Exception e) { + LOGGER.warn( + "Failed to create chunk {} from refresh response: {}", + link.getChunkIndex(), + e.getMessage()); + } } } + // Update end-of-stream and prefetch position from refresh response + if (!result.hasMore()) { + endOfStreamReached = true; + } else if (result.getNextFetchIndex() > nextFetchPosition.chunkIndex) { + // Avoid re-fetching chunks that the refresh already discovered. + // Atomic update of both fields via immutable holder. + nextFetchPosition = + new FetchPosition(result.getNextFetchIndex(), result.getNextRowOffset()); + } + + // Trigger downloads for any newly-created chunks + triggerDownloads(); + // Check if our target chunk was refreshed by the batch targetChunk = chunks.get(chunkIndex); if (targetChunk != null && !targetChunk.isChunkLinkInvalid()) { diff --git a/src/main/java/com/databricks/jdbc/api/internal/IDatabricksConnectionContext.java b/src/main/java/com/databricks/jdbc/api/internal/IDatabricksConnectionContext.java index 8ca5ae0cd..5754ec0da 100644 --- a/src/main/java/com/databricks/jdbc/api/internal/IDatabricksConnectionContext.java +++ b/src/main/java/com/databricks/jdbc/api/internal/IDatabricksConnectionContext.java @@ -461,6 +461,9 @@ default int getHeartbeatIntervalSeconds() { */ boolean isCloudFetchEnabled(); + /** Returns whether bounded SEA API mode is enabled for CloudFetch. */ + boolean isBoundedSeaApiEnabled(); + /** * Returns the maximum number of batches to keep in memory for Thrift streaming. * diff --git a/src/main/java/com/databricks/jdbc/common/DatabricksJdbcUrlParams.java b/src/main/java/com/databricks/jdbc/common/DatabricksJdbcUrlParams.java index 639288248..b9a70f1a8 100644 --- a/src/main/java/com/databricks/jdbc/common/DatabricksJdbcUrlParams.java +++ b/src/main/java/com/databricks/jdbc/common/DatabricksJdbcUrlParams.java @@ -204,6 +204,10 @@ public enum DatabricksJdbcUrlParams { "EnableSeaSyncMetadata", "Enable x-databricks-sea-can-run-fully-sync header for synchronous metadata requests in SEA mode", "1"), + USE_BOUNDED_SEA_API( + "UseBoundedSeaApi", + "Use bounded SEA API for CloudFetch: send row_offset on GetResultData, force StreamingChunkProvider, stop relying on total_chunk_count. Requires server support.", + "0"), DISABLE_OAUTH_REFRESH_TOKEN( "DisableOauthRefreshToken", "Disable requesting OAuth refresh tokens (omit offline_access unless explicitly provided)", diff --git a/src/main/java/com/databricks/jdbc/dbclient/impl/sqlexec/DatabricksSdkClient.java b/src/main/java/com/databricks/jdbc/dbclient/impl/sqlexec/DatabricksSdkClient.java index 4b7398244..0a1a2510d 100644 --- a/src/main/java/com/databricks/jdbc/dbclient/impl/sqlexec/DatabricksSdkClient.java +++ b/src/main/java/com/databricks/jdbc/dbclient/impl/sqlexec/DatabricksSdkClient.java @@ -123,6 +123,15 @@ public ImmutableSessionInfo createSession( if (schema != null) { request.setSchema(schema); } + // Bounded SEA inline Arrow: tell server not to use cloud storage for results + if (connectionContext.isBoundedSeaApiEnabled() && !connectionContext.isCloudFetchEnabled()) { + if (sessionConf == null) { + sessionConf = new java.util.HashMap<>(); + } else { + sessionConf = new java.util.HashMap<>(sessionConf); + } + sessionConf.put("can_cloud_download", "false"); + } if (sessionConf != null && !sessionConf.isEmpty()) { request.setSessionConfigs(sessionConf); } @@ -529,6 +538,10 @@ public ChunkLinkFetchResult getResultChunks( GetStatementResultChunkNRequest request = new GetStatementResultChunkNRequest().setStatementId(statementId).setChunkIndex(chunkIndex); String path = String.format(RESULT_CHUNK_PATH, statementId, chunkIndex); + // Bounded SEA API: always send row_offset (even 0 for chunk 0) + if (connectionContext.isBoundedSeaApiEnabled()) { + path = path + "?row_offset=" + chunkStartRowOffset; + } try { Request req = new Request(Request.GET, path, apiClient.serialize(request)); req.withHeaders(getHeaders("getStatementResultN")); @@ -581,15 +594,27 @@ private ChunkLinkFetchResult buildChunkLinkFetchResult(Collection @Override public ResultData getResultChunksData(StatementId typedStatementId, long chunkIndex) throws DatabricksSQLException { + return getResultChunksData(typedStatementId, chunkIndex, 0); + } + + /** + * Fetches inline result data for a specific chunk, with row_offset for bounded SEA inline Arrow. + */ + public ResultData getResultChunksData( + StatementId typedStatementId, long chunkIndex, long rowOffset) throws DatabricksSQLException { DatabricksThreadContextHolder.setStatementId(typedStatementId); String statementId = typedStatementId.toSQLExecStatementId(); LOGGER.debug( - "public ResultData getResultChunksData(String statementId = {}, long chunkIndex = {})", + "getResultChunksData(statementId={}, chunkIndex={}, rowOffset={})", statementId, - chunkIndex); + chunkIndex, + rowOffset); GetStatementResultChunkNRequest request = new GetStatementResultChunkNRequest().setStatementId(statementId).setChunkIndex(chunkIndex); String path = String.format(RESULT_CHUNK_PATH, statementId, chunkIndex); + if (connectionContext.isBoundedSeaApiEnabled() && rowOffset > 0) { + path = path + "?row_offset=" + rowOffset; + } try { Request req = new Request(Request.GET, path, apiClient.serialize(request)); req.withHeaders(getHeaders("getStatementResultN")); @@ -713,17 +738,26 @@ private ExecuteStatementRequest getRequest( boolean executeAsync) throws SQLException { boolean cloudFetch = useCloudFetchForResult(); - Format format = cloudFetch ? Format.ARROW_STREAM : Format.JSON_ARRAY; + // Bounded SEA inline Arrow: use ARROW_STREAM + INLINE_OR_EXTERNAL_LINKS even without CloudFetch + boolean boundedSeaInline = connectionContext.isBoundedSeaApiEnabled() && !cloudFetch; + Format format = (cloudFetch || boundedSeaInline) ? Format.ARROW_STREAM : Format.JSON_ARRAY; Disposition defaultDisposition = connectionContext.isSqlExecHybridResultsEnabled() ? Disposition.INLINE_OR_EXTERNAL_LINKS : Disposition.EXTERNAL_LINKS; - Disposition disposition = cloudFetch ? defaultDisposition : Disposition.INLINE; + Disposition disposition; + if (cloudFetch) { + disposition = defaultDisposition; + } else if (boundedSeaInline) { + disposition = Disposition.INLINE_OR_EXTERNAL_LINKS; + } else { + disposition = Disposition.INLINE; + } long maxRows = (parentStatement == null) ? DEFAULT_RESULT_ROW_LIMIT : parentStatement.getMaxRows(); CompressionCodec compressionCodec = session.getCompressionCodec(); - if (disposition.equals(Disposition.INLINE)) { - LOGGER.debug("Results are inline, skipping compression."); + if (disposition.equals(Disposition.INLINE) && !boundedSeaInline) { + LOGGER.debug("Results are inline JSON, skipping compression."); compressionCodec = CompressionCodec.NONE; } List parameterListItems =