Skip to content

Commit eea02ab

Browse files
committed
Add SEA inline Arrow support (UseBoundedSeaApi=1 + EnableQueryResultDownload=0)
When UseBoundedSeaApi=1 AND EnableQueryResultDownload=0: 1. Session conf: sends can_cloud_download=false to tell server to inline results instead of using cloud storage. 2. Execute request: uses ARROW_STREAM + INLINE_OR_EXTERNAL_LINKS (instead of JSON_ARRAY + INLINE). This gets Arrow IPC data in the attachment field rather than JSON in data_array. 3. New SeaInlineArrowChunkProvider: lazily fetches inline Arrow chunks via GetResultData with row_offset. Similar to Thrift's LazyThriftInlineArrowResult but using SEA's ArrowIPC format in the attachment field. Decompresses via DecompressionUtil, creates ArrowResultChunk per chunk. End-of-stream when nextChunkIndex is null. 4. getResultChunksData overload: accepts rowOffset parameter, appends ?row_offset=N when bounded SEA is enabled. Without UseBoundedSeaApi=1, EnableQueryResultDownload=0 continues to use the existing JSON_ARRAY inline format (unchanged). Co-authored-by: Isaac Signed-off-by: Gopal Lal <gopal.lal@databricks.com>
1 parent 1fa1b2b commit eea02ab

3 files changed

Lines changed: 232 additions & 7 deletions

File tree

src/main/java/com/databricks/jdbc/api/impl/arrow/ArrowStreamResult.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,17 @@ public ArrowStreamResult(
6464
this.session = session;
6565
// Check if the result data contains the arrow data inline
6666
boolean isInlineArrow = resultData.getAttachment() != null;
67-
if (isInlineArrow) {
67+
IDatabricksConnectionContext connectionContext = session.getConnectionContext();
68+
if (isInlineArrow
69+
&& connectionContext.isBoundedSeaApiEnabled()
70+
&& !connectionContext.isCloudFetchEnabled()) {
71+
// Bounded SEA inline Arrow: multi-chunk lazy fetch via GetResultData with row_offset
72+
LOGGER.debug(
73+
"Creating ArrowStreamResult with SEA inline Arrow for statementId: {}",
74+
statementId.toSQLExecStatementId());
75+
this.chunkProvider =
76+
new SeaInlineArrowChunkProvider(resultData, resultManifest, statementId, session);
77+
} else if (isInlineArrow) {
6878
LOGGER.debug(
6979
"Creating ArrowStreamResult with inline attachment for statementId: {}",
7080
statementId.toSQLExecStatementId());
Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
package com.databricks.jdbc.api.impl.arrow;
2+
3+
import static com.databricks.jdbc.common.util.DecompressionUtil.decompress;
4+
5+
import com.databricks.jdbc.api.internal.IDatabricksSession;
6+
import com.databricks.jdbc.common.CompressionCodec;
7+
import com.databricks.jdbc.dbclient.impl.common.StatementId;
8+
import com.databricks.jdbc.dbclient.impl.sqlexec.DatabricksSdkClient;
9+
import com.databricks.jdbc.exception.DatabricksSQLException;
10+
import com.databricks.jdbc.log.JdbcLogger;
11+
import com.databricks.jdbc.log.JdbcLoggerFactory;
12+
import com.databricks.jdbc.model.core.ResultData;
13+
import com.databricks.jdbc.model.core.ResultManifest;
14+
import java.io.ByteArrayInputStream;
15+
16+
/**
17+
* ChunkProvider for SEA inline Arrow results (bounded SEA API). Fetches chunks lazily via
18+
* GetResultData with row_offset, similar to Thrift's LazyThriftInlineArrowResult but using SEA's
19+
* ArrowIPC format in the attachment field.
20+
*
21+
* <p>Used when UseBoundedSeaApi=1 AND EnableQueryResultDownload=0 (cloud fetch disabled).
22+
*/
23+
class SeaInlineArrowChunkProvider implements ChunkProvider {
24+
25+
private static final JdbcLogger LOGGER =
26+
JdbcLoggerFactory.getLogger(SeaInlineArrowChunkProvider.class);
27+
28+
private final IDatabricksSession session;
29+
private final StatementId statementId;
30+
private final CompressionCodec compressionCodec;
31+
32+
private ArrowResultChunk currentChunk;
33+
private long currentChunkIndex;
34+
private long nextChunkIndex;
35+
private long nextRowOffset;
36+
private boolean hasMore;
37+
private long totalRowCount;
38+
private boolean isClosed;
39+
40+
/**
41+
* @param initialResultData The first chunk from the ExecuteStatement response
42+
* @param resultManifest Manifest with compression and schema info
43+
* @param statementId Statement ID for subsequent fetch calls
44+
* @param session Session for making GetResultData calls
45+
*/
46+
SeaInlineArrowChunkProvider(
47+
ResultData initialResultData,
48+
ResultManifest resultManifest,
49+
StatementId statementId,
50+
IDatabricksSession session)
51+
throws DatabricksSQLException {
52+
this.session = session;
53+
this.statementId = statementId;
54+
this.compressionCodec = resultManifest.getResultCompression();
55+
this.currentChunkIndex = -1;
56+
this.isClosed = false;
57+
this.totalRowCount = 0;
58+
59+
// Process initial chunk from the execute response
60+
this.currentChunk = processResultData(initialResultData);
61+
this.hasMore = initialResultData.getNextChunkIndex() != null;
62+
if (hasMore) {
63+
this.nextChunkIndex = initialResultData.getNextChunkIndex();
64+
}
65+
long rowCount = initialResultData.getRowCount() != null ? initialResultData.getRowCount() : 0;
66+
long rowOffset =
67+
initialResultData.getRowOffset() != null ? initialResultData.getRowOffset() : 0;
68+
this.nextRowOffset = rowOffset + rowCount;
69+
this.totalRowCount += rowCount;
70+
71+
LOGGER.debug(
72+
"SeaInlineArrowChunkProvider created for statement {}: hasMore={}, nextChunkIndex={}, nextRowOffset={}",
73+
statementId.toSQLExecStatementId(),
74+
hasMore,
75+
nextChunkIndex,
76+
nextRowOffset);
77+
}
78+
79+
@Override
80+
public boolean hasNextChunk() {
81+
// First call: initial chunk not yet consumed
82+
if (currentChunkIndex == -1) {
83+
return true;
84+
}
85+
return hasMore;
86+
}
87+
88+
@Override
89+
public boolean next() throws DatabricksSQLException {
90+
if (currentChunkIndex == -1) {
91+
// First call — return the initial chunk (already loaded)
92+
currentChunkIndex = 0;
93+
return true;
94+
}
95+
96+
if (!hasMore) {
97+
return false;
98+
}
99+
100+
// Fetch next chunk via GetResultData
101+
try {
102+
DatabricksSdkClient client = (DatabricksSdkClient) session.getDatabricksClient();
103+
ResultData resultData =
104+
client.getResultChunksData(statementId, nextChunkIndex, nextRowOffset);
105+
106+
// Release previous chunk
107+
if (currentChunk != null) {
108+
currentChunk.releaseChunk();
109+
}
110+
111+
currentChunk = processResultData(resultData);
112+
currentChunkIndex = nextChunkIndex;
113+
114+
// Update continuation from response
115+
hasMore = resultData.getNextChunkIndex() != null;
116+
if (hasMore) {
117+
nextChunkIndex = resultData.getNextChunkIndex();
118+
}
119+
long rowCount = resultData.getRowCount() != null ? resultData.getRowCount() : 0;
120+
nextRowOffset += rowCount;
121+
totalRowCount += rowCount;
122+
123+
LOGGER.debug(
124+
"Fetched inline chunk {}: rowCount={}, hasMore={}, nextRowOffset={}",
125+
currentChunkIndex,
126+
rowCount,
127+
hasMore,
128+
nextRowOffset);
129+
130+
return true;
131+
} catch (Exception e) {
132+
throw new DatabricksSQLException(
133+
"Failed to fetch inline Arrow chunk " + nextChunkIndex + ": " + e.getMessage(),
134+
e,
135+
com.databricks.jdbc.model.telemetry.enums.DatabricksDriverErrorCode.SDK_CLIENT_ERROR);
136+
}
137+
}
138+
139+
@Override
140+
public ArrowResultChunk getChunk() {
141+
return currentChunk;
142+
}
143+
144+
@Override
145+
public void close() {
146+
isClosed = true;
147+
if (currentChunk != null) {
148+
currentChunk.releaseChunk();
149+
currentChunk = null;
150+
}
151+
}
152+
153+
@Override
154+
public long getRowCount() {
155+
return totalRowCount;
156+
}
157+
158+
@Override
159+
public long getChunkCount() {
160+
return currentChunkIndex + 1;
161+
}
162+
163+
@Override
164+
public boolean isClosed() {
165+
return isClosed;
166+
}
167+
168+
/** Decompresses attachment bytes and creates an ArrowResultChunk. */
169+
private ArrowResultChunk processResultData(ResultData resultData) throws DatabricksSQLException {
170+
byte[] attachment = resultData.getAttachment();
171+
if (attachment == null || attachment.length == 0) {
172+
throw new DatabricksSQLException(
173+
"No inline Arrow data (attachment) in result for chunk",
174+
com.databricks.jdbc.model.telemetry.enums.DatabricksDriverErrorCode.RESULT_SET_ERROR);
175+
}
176+
177+
byte[] decompressedBytes =
178+
decompress(attachment, compressionCodec, "SEA inline Arrow chunk decompression");
179+
180+
long rowCount = resultData.getRowCount() != null ? resultData.getRowCount() : 0;
181+
return ArrowResultChunk.builder()
182+
.withInputStream(new ByteArrayInputStream(decompressedBytes), rowCount)
183+
.build();
184+
}
185+
}

src/main/java/com/databricks/jdbc/dbclient/impl/sqlexec/DatabricksSdkClient.java

Lines changed: 36 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,15 @@ public ImmutableSessionInfo createSession(
123123
if (schema != null) {
124124
request.setSchema(schema);
125125
}
126+
// Bounded SEA inline Arrow: tell server not to use cloud storage for results
127+
if (connectionContext.isBoundedSeaApiEnabled() && !connectionContext.isCloudFetchEnabled()) {
128+
if (sessionConf == null) {
129+
sessionConf = new java.util.HashMap<>();
130+
} else {
131+
sessionConf = new java.util.HashMap<>(sessionConf);
132+
}
133+
sessionConf.put("can_cloud_download", "false");
134+
}
126135
if (sessionConf != null && !sessionConf.isEmpty()) {
127136
request.setSessionConfigs(sessionConf);
128137
}
@@ -585,15 +594,27 @@ private ChunkLinkFetchResult buildChunkLinkFetchResult(Collection<ExternalLink>
585594
@Override
586595
public ResultData getResultChunksData(StatementId typedStatementId, long chunkIndex)
587596
throws DatabricksSQLException {
597+
return getResultChunksData(typedStatementId, chunkIndex, 0);
598+
}
599+
600+
/**
601+
* Fetches inline result data for a specific chunk, with row_offset for bounded SEA inline Arrow.
602+
*/
603+
public ResultData getResultChunksData(
604+
StatementId typedStatementId, long chunkIndex, long rowOffset) throws DatabricksSQLException {
588605
DatabricksThreadContextHolder.setStatementId(typedStatementId);
589606
String statementId = typedStatementId.toSQLExecStatementId();
590607
LOGGER.debug(
591-
"public ResultData getResultChunksData(String statementId = {}, long chunkIndex = {})",
608+
"getResultChunksData(statementId={}, chunkIndex={}, rowOffset={})",
592609
statementId,
593-
chunkIndex);
610+
chunkIndex,
611+
rowOffset);
594612
GetStatementResultChunkNRequest request =
595613
new GetStatementResultChunkNRequest().setStatementId(statementId).setChunkIndex(chunkIndex);
596614
String path = String.format(RESULT_CHUNK_PATH, statementId, chunkIndex);
615+
if (connectionContext.isBoundedSeaApiEnabled() && rowOffset > 0) {
616+
path = path + "?row_offset=" + rowOffset;
617+
}
597618
try {
598619
Request req = new Request(Request.GET, path, apiClient.serialize(request));
599620
req.withHeaders(getHeaders("getStatementResultN"));
@@ -717,17 +738,26 @@ private ExecuteStatementRequest getRequest(
717738
boolean executeAsync)
718739
throws SQLException {
719740
boolean cloudFetch = useCloudFetchForResult();
720-
Format format = cloudFetch ? Format.ARROW_STREAM : Format.JSON_ARRAY;
741+
// Bounded SEA inline Arrow: use ARROW_STREAM + INLINE_OR_EXTERNAL_LINKS even without CloudFetch
742+
boolean boundedSeaInline = connectionContext.isBoundedSeaApiEnabled() && !cloudFetch;
743+
Format format = (cloudFetch || boundedSeaInline) ? Format.ARROW_STREAM : Format.JSON_ARRAY;
721744
Disposition defaultDisposition =
722745
connectionContext.isSqlExecHybridResultsEnabled()
723746
? Disposition.INLINE_OR_EXTERNAL_LINKS
724747
: Disposition.EXTERNAL_LINKS;
725-
Disposition disposition = cloudFetch ? defaultDisposition : Disposition.INLINE;
748+
Disposition disposition;
749+
if (cloudFetch) {
750+
disposition = defaultDisposition;
751+
} else if (boundedSeaInline) {
752+
disposition = Disposition.INLINE_OR_EXTERNAL_LINKS;
753+
} else {
754+
disposition = Disposition.INLINE;
755+
}
726756
long maxRows =
727757
(parentStatement == null) ? DEFAULT_RESULT_ROW_LIMIT : parentStatement.getMaxRows();
728758
CompressionCodec compressionCodec = session.getCompressionCodec();
729-
if (disposition.equals(Disposition.INLINE)) {
730-
LOGGER.debug("Results are inline, skipping compression.");
759+
if (disposition.equals(Disposition.INLINE) && !boundedSeaInline) {
760+
LOGGER.debug("Results are inline JSON, skipping compression.");
731761
compressionCodec = CompressionCodec.NONE;
732762
}
733763
List<StatementParameterListItem> parameterListItems =

0 commit comments

Comments
 (0)