Add SEA inline Arrow support (stacked on #1468) #1470

Open
gopalldb wants to merge 10 commits into databricks:main from gopalldb:feature/sea-inline-arrow

@gopalldb
Collaborator

Summary

Stacked on #1468 (bounded SEA API CloudFetch support). Please merge #1468 first.

When UseBoundedSeaApi=1 AND EnableQueryResultDownload=0, the driver uses inline Arrow results instead of falling back to JSON_ARRAY:

  • Sends can_cloud_download=false in session conf
  • Keeps INLINE_OR_EXTERNAL_LINKS disposition + ARROW_STREAM format
  • Server returns Arrow IPC data in the ResultData.attachment field
  • Subsequent chunks fetched via GetResultData with chunk_index and row_offset
  • End-of-stream: next_chunk_index absent on response
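
The selection rule above can be sketched as follows. This is a minimal illustration, not the driver's code: `InlineResultPlan` and `forDownloadDisabled` are hypothetical names, and only the property names and the format/disposition values come from this PR. That the legacy path does not send can_cloud_download=false is an assumption drawn from "unchanged behavior".

```java
/** Hypothetical holder for the request shape; not a class from the driver. */
final class InlineResultPlan {
    final String format;
    final String disposition;
    final boolean sendCanCloudDownloadFalse;

    private InlineResultPlan(String format, String disposition, boolean sendCanCloudDownloadFalse) {
        this.format = format;
        this.disposition = disposition;
        this.sendCanCloudDownloadFalse = sendCanCloudDownloadFalse;
    }

    /** Request shape when EnableQueryResultDownload=0, as described in this PR. */
    static InlineResultPlan forDownloadDisabled(boolean useBoundedSeaApi) {
        if (useBoundedSeaApi) {
            // New path: inline Arrow, with can_cloud_download=false in the session conf.
            return new InlineResultPlan("ARROW_STREAM", "INLINE_OR_EXTERNAL_LINKS", true);
        }
        // Existing path, unchanged: JSON rows returned inline.
        // Assumption: the session conf flag is not sent on this path.
        return new InlineResultPlan("JSON_ARRAY", "INLINE", false);
    }
}
```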

New: SeaInlineArrowChunkProvider

Rolling-window prefetch provider mirroring ThriftStreamingProvider design for consistency:

  • ReentrantLock + Condition signaling (not BlockingQueue)
  • ConcurrentHashMap for indexed chunk storage
  • batchesInMemory counter with explicit backpressure
  • waitForChunkCreation() with elapsed-time tracking
  • checkPrefetchError() + notifyChunkAvailable() for error propagation
  • Thread join with 5s timeout on close
  • Min 2 batches in memory to enable prefetching
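
For reviewers who want the shape of the design at a glance, here is a minimal sketch of a rolling-window prefetcher built from the same primitives (ReentrantLock + Condition, ConcurrentHashMap, an in-memory counter, a 5s join on close). It is not the code in SeaInlineArrowChunkProvider: the class name `RollingWindowPrefetcher`, the `Chunk` type, the `fetcher` callback, and `take` are all hypothetical, and chunk 0 is fetched through the callback here purely to keep the example short.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.LongFunction;

/** Hypothetical sketch; names and structure are illustrative, not the driver's. */
final class RollingWindowPrefetcher implements AutoCloseable {
    /** One fetched chunk; a null nextIndex marks end of stream. */
    static final class Chunk {
        final byte[] data;
        final Long nextIndex;
        Chunk(byte[] data, Long nextIndex) { this.data = data; this.nextIndex = nextIndex; }
    }

    private final ReentrantLock lock = new ReentrantLock();
    private final Condition changed = lock.newCondition();
    private final Map<Long, Chunk> chunks = new ConcurrentHashMap<>();
    private final LongFunction<Chunk> fetcher;
    private final int maxInMemory;
    private final Thread worker;
    private int inMemory;      // guarded by lock
    private boolean closed;    // guarded by lock
    private Throwable error;   // guarded by lock

    RollingWindowPrefetcher(LongFunction<Chunk> fetcher, int maxInMemory) {
        this.fetcher = fetcher;
        this.maxInMemory = Math.max(2, maxInMemory); // at least 2 so prefetch can run ahead
        this.worker = new Thread(this::prefetchLoop, "inline-arrow-prefetch");
        this.worker.setDaemon(true);
        this.worker.start();
    }

    private void prefetchLoop() {
        Long index = 0L;
        try {
            while (index != null) {
                lock.lock();
                try {
                    // Backpressure: wait until the consumer frees a slot.
                    while (!closed && inMemory >= maxInMemory) changed.await();
                    if (closed) return;
                } finally {
                    lock.unlock();
                }
                Chunk chunk = fetcher.apply(index); // the fetch happens outside the lock
                lock.lock();
                try {
                    chunks.put(index, chunk);
                    inMemory++;
                    changed.signalAll();
                } finally {
                    lock.unlock();
                }
                index = chunk.nextIndex;
            }
        } catch (Throwable t) {
            lock.lock();
            try {
                error = t; // surfaced to the consumer in take()
                changed.signalAll();
            } finally {
                lock.unlock();
            }
        }
    }

    /** Blocks until the chunk is available, the timeout elapses, or prefetch fails. */
    Chunk take(long index, long timeoutMs) {
        long remaining = TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        lock.lock();
        try {
            Chunk chunk;
            while ((chunk = chunks.remove(index)) == null) {
                if (error != null) throw new IllegalStateException("prefetch failed", error);
                if (closed || remaining <= 0) throw new IllegalStateException("chunk " + index + " unavailable");
                remaining = changed.awaitNanos(remaining);
            }
            inMemory--;
            changed.signalAll(); // wake the prefetch thread
            return chunk;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted waiting for chunk " + index, e);
        } finally {
            lock.unlock();
        }
    }

    @Override
    public void close() {
        lock.lock();
        try {
            closed = true;
            changed.signalAll();
        } finally {
            lock.unlock();
        }
        worker.interrupt();
        try {
            worker.join(5000); // bounded wait, matching the 5s timeout listed above
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

A consumer would call `take(index, timeoutMs)` with index 0 and then follow each chunk's `nextIndex` until it is null. Keeping the fetch outside the lock means a slow server response does not block the consumer from draining chunks that are already in memory.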

Files changed (on top of #1468)

  • SeaInlineArrowChunkProvider.java: new ChunkProvider with rolling-window prefetch
  • ArrowStreamResult.java: wire new provider when isBoundedSeaApiEnabled() && !isCloudFetchEnabled()
  • DatabricksSdkClient.java: can_cloud_download=false in session conf, ARROW_STREAM + INLINE_OR_EXTERNAL_LINKS for inline, getResultChunksData overload with rowOffset

Test plan

  • Verify UseBoundedSeaApi=1 + EnableQueryResultDownload=0 → inline Arrow via SeaInlineArrowChunkProvider
  • Verify multi-chunk inline results iterate completely (end-of-stream when next_chunk_index absent)
  • Verify rolling-window prefetch: background thread fills ahead of consumer, bounded by maxBatchesInMemory
  • Verify can_cloud_download=false sent in session conf
  • Verify EnableQueryResultDownload=0 without UseBoundedSeaApi=1 → unchanged JSON_ARRAY behavior
  • Unit tests pass (core module: 0 failures)

NO_CHANGELOG=true

This pull request was AI-assisted by Isaac.

gopalldb force-pushed the feature/sea-inline-arrow branch from d6b9994 to 5aaeb03 on May 27, 2026 08:25
gopalldb added 5 commits May 27, 2026 14:46
Part 1 of bounded SEA API compliance for CloudFetch:

1. New connection property UseBoundedSeaApi (default 0/off). When enabled:
   - Sends row_offset query parameter on GetResultData requests
   - Forces StreamingChunkProvider (which uses next_chunk_index, not
     total_chunk_count) even when streaming is explicitly disabled

2. StreamingChunkProvider already uses next_chunk_index for continuation
   and end-of-stream detection — no changes needed to its core logic.

3. Legacy RemoteChunkProvider (uses total_chunk_count) is bypassed when
   bounded SEA is enabled.

row_offset is derived from the previous link's row_offset + row_count
and sent as a query parameter on /sql/statements/{id}/result/chunks/{idx}.
This is required for future >100GB results and cluster-side fetch.
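
A minimal sketch of the derivation described above, assuming hypothetical helper names (`RowOffsets`, `nextRowOffset`, `chunkPath`). The relative path and the `row_offset` parameter name come from this commit message; any API prefix in front of the path is deliberately left out.

```java
/** Hypothetical helpers; only the path shape and parameter name come from the commit message. */
final class RowOffsets {
    private RowOffsets() {}

    /** row_offset for the next chunk: the previous link's row_offset + row_count. */
    static long nextRowOffset(long previousRowOffset, long previousRowCount) {
        return Math.addExact(previousRowOffset, previousRowCount); // fail loudly on overflow
    }

    /** Relative GetResultData path with row_offset as a query parameter. */
    static String chunkPath(String statementId, long chunkIndex, long rowOffset) {
        return "/sql/statements/" + statementId + "/result/chunks/" + chunkIndex
            + "?row_offset=" + rowOffset;
    }
}
```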

Co-authored-by: Isaac
Signed-off-by: Gopal Lal <gopal.lal@databricks.com>

During coalesced link refresh, the server may return links for chunks
not yet in the provider's map (newly-discovered chunks beyond
highestKnownChunkIndex). Previously these were silently skipped.

Now: create new chunks from refresh response links, update
highestKnownChunkIndex, and set endOfStreamReached from the response's
hasMore flag. Follows the per-chunk state-machine reconciliation from
the bounded SEA API spec.

Co-authored-by: Isaac
Signed-off-by: Gopal Lal <gopal.lal@databricks.com>

Fixes 3 gaps found by comparing with the legacy ChunkDownloadTask:

1. Outer catch(Throwable) + exception chaining in finally: uncaught
   exceptions were lost — the finally block created a generic exception
   without the original cause. Now captures uncaughtException and chains
   it, matching ChunkDownloadTask's pattern.

2. Thread context propagation: download threads had no connection context
   or statementId for telemetry/logging. Now captures caller's context
   via DatabricksThreadContextHolder and clears in finally.

3. Download timing: added task-level timing log (totalMs, retries)
   matching ChunkDownloadTask's diagnostics.

Also includes the RuntimeException catch (parity with PR databricks#1302).
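
The first fix (outer catch plus chaining in finally) follows a common pattern, sketched below under assumptions: `DownloadTaskSketch` and its `run` method are hypothetical and stand in for the real task, and a CompletableFuture is used here only as a convenient way to report the outcome.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical sketch of "capture in catch, chain in finally"; not the driver's task class. */
final class DownloadTaskSketch {
    private DownloadTaskSketch() {}

    static CompletableFuture<Void> run(Runnable download) {
        CompletableFuture<Void> done = new CompletableFuture<>();
        Throwable uncaught = null;
        boolean succeeded = false;
        try {
            download.run();
            succeeded = true;
        } catch (Throwable t) {
            uncaught = t; // remember the original failure instead of losing it
        } finally {
            if (succeeded) {
                done.complete(null);
            } else {
                // Chain the captured throwable so the consumer sees the real cause.
                done.completeExceptionally(new IllegalStateException("chunk download failed", uncaught));
            }
        }
        return done;
    }
}
```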

Co-authored-by: Isaac
Signed-off-by: Gopal Lal <gopal.lal@databricks.com>

P0-1: Remove redundant chunk.setStatus(DOWNLOAD_FAILED) in inner catch
— defer entirely to finally block. Fixes StreamingChunkDownloadTaskTest.

P0-2: Add NEXT_CHANGELOG.md entry under ### Added for UseBoundedSeaApi.

P1-1: Call triggerDownloads() after reconciliation creates new chunks
from refresh response — prevents newly-discovered chunks sitting PENDING.

P1-2/P1-3: Un-gated changes (new chunk creation, EOS from refresh,
triggerDownloads) are intentional parity fixes for all
EnableStreamingChunkProvider=1 users. EnableStreamingChunkProvider
defaults to off, so default users are unaffected.

P1-4: Revert RuntimeException from inner catch — DatabricksError is
caught by outer catch(Throwable) and fails immediately (no retry),
matching ChunkDownloadTask behavior exactly. NPE/ISE won't be retried.

P2-1: Always send row_offset (even 0 for chunk 0) when bounded SEA
enabled — explicit is safer than relying on server default.

P2-3: Update nextLinkFetchIndex after reconciliation to avoid prefetch
thread re-fetching chunks already discovered via refresh.

P2-5: Add "Requires server support" to connection property help text.

Co-authored-by: Isaac
Signed-off-by: Gopal Lal <gopal.lal@databricks.com>

…on updates

P1-1: Use putIfAbsent in createChunkFromLink to prevent double row-count
and chunk replacement when called concurrently from prefetch and download
threads. Without this, a race could leave a CompletableFuture that is
never completed, causing consumer hangs.

P1-2: Bundle nextLinkFetchIndex + nextRowOffsetToFetch into an immutable
FetchPosition holder updated via volatile reference. This ensures the
prefetch thread always reads a consistent (index, rowOffset) pair, which
matters for bounded SEA API where row_offset is used by the server.

P2-1: Bump reconciliation failure log from DEBUG to WARN for production
visibility.
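
The two concurrency fixes above (P1-1 and P1-2) can be illustrated together. This is a sketch under assumptions, not the provider's code: `ChunkBookkeeping`, `moveTo`, and the field names are hypothetical, and `FetchPosition` here only mirrors the holder named in this commit.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/** Immutable (chunkIndex, rowOffset) pair; readers never see a half-updated position. */
final class FetchPosition {
    final long chunkIndex;
    final long rowOffset;

    FetchPosition(long chunkIndex, long rowOffset) {
        this.chunkIndex = chunkIndex;
        this.rowOffset = rowOffset;
    }
}

/** Hypothetical stand-in for the provider's bookkeeping. */
final class ChunkBookkeeping {
    private final ConcurrentHashMap<Long, Long> rowCountByChunk = new ConcurrentHashMap<>();
    private final AtomicLong totalRows = new AtomicLong();
    private volatile FetchPosition position = new FetchPosition(0, 0);

    /** Returns true only for the caller that actually registered the chunk. */
    boolean createChunkFromLink(long chunkIndex, long rowCount) {
        if (rowCountByChunk.putIfAbsent(chunkIndex, rowCount) != null) {
            return false; // already registered by another thread; do not count rows twice
        }
        totalRows.addAndGet(rowCount);
        return true;
    }

    /** A single volatile write publishes both fields at once. */
    void moveTo(long chunkIndex, long rowOffset) {
        position = new FetchPosition(chunkIndex, rowOffset);
    }

    FetchPosition position() { return position; }

    long totalRows() { return totalRows.get(); }
}
```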

Signed-off-by: Gopal Lal <gopal.lal@databricks.com>

Co-authored-by: Isaac
Signed-off-by: Gopal Lal <gopal.lal@databricks.com>

gopalldb force-pushed the feature/sea-inline-arrow branch from 996a458 to 66b8ab7 on May 27, 2026 09:17
…h path

Per the bounded SEA API contract, drivers must not depend on
manifest.{chunks, total_chunk_count, total_row_count}. Pass null for
totalChunkCount when converting initial ExternalLinks for
StreamingChunkProvider — the provider derives end-of-stream from
next_chunk_index on ExternalLink instead.

Signed-off-by: Gopal Lal <gopal.lal@databricks.com>

Co-authored-by: Isaac
Signed-off-by: Gopal Lal <gopal.lal@databricks.com>

gopalldb force-pushed the feature/sea-inline-arrow branch from 66b8ab7 to cb7383a on May 27, 2026 17:01
gopalldb added 4 commits May 27, 2026 22:50
…ast/isAfterLast

When boundedSeaApiEnabled=true, isLast() and isAfterLast() now use
hasNext() instead of resultSetMetaData.getTotalRows(). The bounded SEA
API contract does not guarantee manifest.total_row_count is populated;
the chunk providers derive end-of-stream from next_chunk_index instead.

Gated behind isBoundedSeaApiEnabled() + ArrowStreamResult instanceof
check so existing Thrift and non-bounded SEA behavior is unchanged.
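
As an illustration of position checks that never consult a total row count, here is a small sketch. `StreamingCursor` is a hypothetical class wrapping a plain Iterator, not the driver's result set; it returns false from isAfterLast() for an empty result, which is what the JDBC contract specifies.

```java
import java.util.Iterator;

/** Hypothetical cursor; position checks rely on hasNext() only. */
final class StreamingCursor<T> {
    private final Iterator<T> rows;
    private boolean onRow;      // currently positioned on a valid row
    private boolean sawAnyRow;  // at least one row was returned
    private boolean exhausted;  // next() has returned false

    StreamingCursor(Iterator<T> rows) {
        this.rows = rows;
    }

    boolean next() {
        if (rows.hasNext()) {
            rows.next();
            onRow = true;
            sawAnyRow = true;
            return true;
        }
        onRow = false;
        exhausted = true;
        return false;
    }

    /** On the last row when a row is current and no further row is available. */
    boolean isLast() {
        return onRow && !rows.hasNext();
    }

    /** After the last row only if the result had rows and they have all been consumed. */
    boolean isAfterLast() {
        return exhausted && sawAnyRow;
    }
}
```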

Signed-off-by: Gopal Lal <gopal.lal@databricks.com>

Co-authored-by: Isaac
Signed-off-by: Gopal Lal <gopal.lal@databricks.com>

…ownload=0)

When UseBoundedSeaApi=1 AND EnableQueryResultDownload=0:

1. Session conf: sends can_cloud_download=false to tell server to inline
   results instead of using cloud storage.

2. Execute request: uses ARROW_STREAM + INLINE_OR_EXTERNAL_LINKS (instead
   of JSON_ARRAY + INLINE). This gets Arrow IPC data in the attachment
   field rather than JSON in data_array.

3. New SeaInlineArrowChunkProvider: lazily fetches inline Arrow chunks via
   GetResultData with row_offset. Similar to Thrift's
   LazyThriftInlineArrowResult but using SEA's ArrowIPC format in the
   attachment field. Decompresses via DecompressionUtil, creates
   ArrowResultChunk per chunk. End-of-stream when nextChunkIndex is null.

4. getResultChunksData overload: accepts rowOffset parameter, appends
   ?row_offset=N when bounded SEA is enabled.

Without UseBoundedSeaApi=1, EnableQueryResultDownload=0 continues to use
the existing JSON_ARRAY inline format (unchanged).
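
The lazy fetch loop in item 3 can be sketched as below. This is an illustration only: `InlineChunkWalker`, `Response`, and the `getResultData` callback are hypothetical, decompression and Arrow decoding are omitted, and it is an assumption that each response exposes a row count from which the next row_offset is accumulated.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

/** Hypothetical walker over inline chunks; not the driver's provider. */
final class InlineChunkWalker {
    /** Minimal stand-in for a result-data response. */
    static final class Response {
        final byte[] attachment;   // Arrow IPC bytes (possibly compressed in practice)
        final long rowCount;       // assumption: the response reports its row count
        final Long nextChunkIndex; // null marks end of stream

        Response(byte[] attachment, long rowCount, Long nextChunkIndex) {
            this.attachment = attachment;
            this.rowCount = rowCount;
            this.nextChunkIndex = nextChunkIndex;
        }
    }

    private InlineChunkWalker() {}

    /** Collects attachments, calling getResultData(chunkIndex, rowOffset) until nextChunkIndex is null. */
    static List<byte[]> fetchAll(Response first, BiFunction<Long, Long, Response> getResultData) {
        List<byte[]> attachments = new ArrayList<>();
        Response current = first;
        long rowOffset = 0;
        while (true) {
            attachments.add(current.attachment);
            rowOffset += current.rowCount; // offset of the first row in the next chunk
            if (current.nextChunkIndex == null) {
                break; // end of stream
            }
            current = getResultData.apply(current.nextChunkIndex, rowOffset);
        }
        return attachments;
    }
}
```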

Co-authored-by: Isaac
Signed-off-by: Gopal Lal <gopal.lal@databricks.com>

…oring ThriftStreamingProvider

Replace synchronous lazy-fetch design with background prefetch thread
using ReentrantLock + Condition signaling, ConcurrentHashMap for indexed
chunk storage, and explicit chunksInMemory tracking for backpressure.
This ensures consistency with ThriftStreamingProvider's design patterns.

Signed-off-by: Gopal Lal <gopal.lal@databricks.com>

Co-authored-by: Isaac
Signed-off-by: Gopal Lal <gopal.lal@databricks.com>

Document why nextFetchChunkIndex and nextFetchRowOffset are safe as
separate AtomicLongs (single prefetch thread writer) unlike
StreamingChunkProvider which needs a bundled FetchPosition holder.

Signed-off-by: Gopal Lal <gopal.lal@databricks.com>

Co-authored-by: Isaac
Signed-off-by: Gopal Lal <gopal.lal@databricks.com>

gopalldb force-pushed the feature/sea-inline-arrow branch from cb7383a to 70dfef8 on May 27, 2026 17:20