Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions NEXT_CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@
## [Unreleased]

### Added
- Added result set heartbeat / keep-alive to prevent server-side result expiry during slow consumption. When enabled via `EnableHeartbeat=1`, the driver periodically polls `GetStatementStatus` (SEA) or `GetOperationStatus` (Thrift) to keep the operation alive while the client reads results. Configurable interval via `HeartbeatIntervalSeconds` (default 60s). Heartbeat automatically stops when results are fully consumed, ResultSet is closed, or the server returns a terminal state. Disabled by default due to cost implications (heartbeats keep the warehouse running).
- Metadata operations now use SQL SHOW commands for both Thrift and SEA backends,
ensuring consistent behavior for SQL warehouses regardless of underlying
protocol. To revert to native Thrift metadata RPCs, set `UseQueryForMetadata=0`.
- Added `UseBoundedSeaApi` connection property (default `0`/off). When enabled, the driver uses the bounded SEA API contract for CloudFetch: sends `row_offset` on GetResultData requests and uses `next_chunk_index` for chunk discovery instead of `total_chunk_count`. Requires server support.

### Updated

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1440,6 +1440,11 @@ public boolean isCloudFetchEnabled() {
return getParameter(DatabricksJdbcUrlParams.ENABLE_CLOUD_FETCH).equals("1");
}

@Override
public boolean isBoundedSeaApiEnabled() {
return getParameter(DatabricksJdbcUrlParams.USE_BOUNDED_SEA_API).equals("1");
}

@Override
public int getThriftMaxBatchesInMemory() {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ enum ResultSetType {
private ResultSetType resultSetType = ResultSetType.UNASSIGNED;

private boolean complexDatatypeSupport = false;
private boolean boundedSeaApiEnabled = false;

// Cached telemetry collector resolved once at construction time to avoid
// per-row overhead in next(). The connection-to-collector mapping is stable
Expand Down Expand Up @@ -128,6 +129,7 @@ public DatabricksResultSet(
resultSetMetaData = null;
}
this.complexDatatypeSupport = session.getConnectionContext().isComplexDatatypeSupportEnabled();
this.boundedSeaApiEnabled = session.getConnectionContext().isBoundedSeaApiEnabled();
this.statementType = statementType;
this.updateCount = null;
this.parentStatement = parentStatement;
Expand Down Expand Up @@ -933,8 +935,15 @@ public boolean isBeforeFirst() throws SQLException {
public boolean isAfterLast() throws SQLException {
checkIfClosed();
// Account for client-side maxRows truncation
return truncatedByMaxRows
|| executionResult.getCurrentRow() >= resultSetMetaData.getTotalRows();
if (truncatedByMaxRows) {
return true;
}
// Bounded SEA API: manifest.total_row_count is not populated, so use hasNext()
// which derives end-of-stream from next_chunk_index via the chunk provider.
if (boundedSeaApiEnabled && executionResult instanceof ArrowStreamResult) {
return executionResult.getCurrentRow() >= 0 && !executionResult.hasNext();
}
return executionResult.getCurrentRow() >= resultSetMetaData.getTotalRows();
}

@Override
Expand Down Expand Up @@ -974,6 +983,10 @@ public boolean isLast() throws SQLException {
|| executionResult instanceof StreamingInlineArrowResult) {
return executionResult.getCurrentRow() >= 0 && !executionResult.hasNext();
}
// Bounded SEA API: manifest.total_row_count is not populated, so use hasNext()
if (boundedSeaApiEnabled && executionResult instanceof ArrowStreamResult) {
return executionResult.getCurrentRow() >= 0 && !executionResult.hasNext();
}
return executionResult.getCurrentRow() == resultSetMetaData.getTotalRows() - 1;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,17 @@ public ArrowStreamResult(
this.session = session;
// Check if the result data contains the arrow data inline
boolean isInlineArrow = resultData.getAttachment() != null;
if (isInlineArrow) {
IDatabricksConnectionContext connectionContext = session.getConnectionContext();
if (isInlineArrow
&& connectionContext.isBoundedSeaApiEnabled()
&& !connectionContext.isCloudFetchEnabled()) {
// Bounded SEA inline Arrow: multi-chunk lazy fetch via GetResultData with row_offset
LOGGER.debug(
"Creating ArrowStreamResult with SEA inline Arrow for statementId: {}",
statementId.toSQLExecStatementId());
this.chunkProvider =
new SeaInlineArrowChunkProvider(resultData, resultManifest, statementId, session);
} else if (isInlineArrow) {
LOGGER.debug(
"Creating ArrowStreamResult with inline attachment for statementId: {}",
statementId.toSQLExecStatementId());
Expand Down Expand Up @@ -102,7 +112,9 @@ private static ChunkProvider createRemoteChunkProvider(

IDatabricksConnectionContext connectionContext = session.getConnectionContext();

if (connectionContext.isStreamingChunkProviderEnabled()) {
// Bounded SEA API forces StreamingChunkProvider — it doesn't rely on total_chunk_count
if (connectionContext.isStreamingChunkProviderEnabled()
|| connectionContext.isBoundedSeaApiEnabled()) {
LOGGER.info(
"Using StreamingChunkProvider for statementId: {}", statementId.toSQLExecStatementId());

Expand All @@ -113,10 +125,13 @@ private static ChunkProvider createRemoteChunkProvider(
int chunkReadyTimeoutSeconds = connectionContext.getChunkReadyTimeoutSeconds();
double cloudFetchSpeedThreshold = connectionContext.getCloudFetchSpeedThreshold();

// Convert ExternalLinks to ChunkLinkFetchResult for the provider
// Convert ExternalLinks to ChunkLinkFetchResult for the provider.
// Bounded SEA API: pass null for totalChunkCount — we must not depend on
// manifest.{chunks, total_chunk_count, total_row_count} per the bounded API contract.
Long totalChunkCount =
connectionContext.isBoundedSeaApiEnabled() ? null : resultManifest.getTotalChunkCount();
ChunkLinkFetchResult initialLinks =
convertToChunkLinkFetchResult(
resultData.getExternalLinks(), resultManifest.getTotalChunkCount());
convertToChunkLinkFetchResult(resultData.getExternalLinks(), totalChunkCount);

return new StreamingChunkProvider(
linkFetcher,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public SeaChunkLinkFetcher(IDatabricksSession session, StatementId statementId)
@Override
public ChunkLinkFetchResult fetchLinks(long startChunkIndex, long startRowOffset)
throws SQLException {
// SEA uses startChunkIndex; startRowOffset is ignored
// SEA uses startChunkIndex; startRowOffset is passed through for bounded SEA API
LOGGER.debug(
"Fetching links starting from chunk index {} for statement {}",
startChunkIndex,
Expand All @@ -45,7 +45,7 @@ public ChunkLinkFetchResult fetchLinks(long startChunkIndex, long startRowOffset

@Override
public ExternalLink refetchLink(long chunkIndex, long rowOffset) throws SQLException {
// SEA uses chunkIndex; rowOffset is ignored
// SEA uses chunkIndex; rowOffset is passed through for bounded SEA API
LOGGER.info("Refetching expired link for chunk {} of statement {}", chunkIndex, statementId);

ChunkLinkFetchResult result =
Expand Down
Loading
Loading