Skip to content

Commit ebe4f82

Browse files
authored
Fix: For fetching multiple chunks of Json chunked results (#1074)
# Fix: Fetch and assemble JSON_ARRAY results across multiple chunks ## Summary This PR fixes an issue where JSON_ARRAY-formatted results that are split across multiple chunks were not fully retrieved by the driver. It adds a dedicated JsonChunkProvider to fetch and combine chunked JSON results, wires it into InlineJsonResult, adds a client API to fetch chunk data, and includes comprehensive unit tests. ## Motivation Previously InlineJsonResult only used the initial response data and ignored any additional chunks. For results returned in the `JSON_ARRAY` format that span multiple chunks, the driver returned incomplete datasets. This produced incorrect query results for callers when responses were chunked by the backend. ## What this change does - Adds `JsonChunkProvider` (new) - Fetches and combines JSON chunks for a statement into a single in-memory List<List<Object>>. ## Tests and validation - Unit tests: run `JsonChunkProviderTest` and the full test suite. - Integration/manual validation:
1 parent 9a5e7c9 commit ebe4f82

21 files changed

Lines changed: 676 additions & 22 deletions

.github/workflows/prIntegrationTests.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ jobs:
1616
include:
1717
- test-command: mvn -B compile test -Dtest=*IntegrationTests,!M2MPrivateKeyCredentialsIntegrationTests,!M2MAuthIntegrationTests
1818
fake-service-type: 'SQL_EXEC'
19-
- test-command: mvn -B compile test -Dtest=*IntegrationTests,!M2MPrivateKeyCredentialsIntegrationTests,!SqlExecApiHybridResultsIntegrationTests,!DBFSVolumeIntegrationTests,!M2MAuthIntegrationTests,!UCVolumeIntegrationTests
19+
- test-command: mvn -B compile test -Dtest=*IntegrationTests,!M2MPrivateKeyCredentialsIntegrationTests,!SqlExecApiHybridResultsIntegrationTests,!DBFSVolumeIntegrationTests,!M2MAuthIntegrationTests,!UCVolumeIntegrationTests,!SqlExecApiIntegrationTests
2020
fake-service-type: 'THRIFT_SERVER'
2121
steps:
2222
- name: Checkout PR

.github/workflows/runIntegrationTests.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ jobs:
1717
- test-command: mvn -B compile test -Dtest=*IntegrationTests
1818
token-secret: DATABRICKS_TOKEN
1919
fake-service-type: 'SQL_EXEC'
20-
- test-command: mvn -B compile test -Dtest=*IntegrationTests,!M2MPrivateKeyCredentialsIntegrationTests,!SqlExecApiHybridResultsIntegrationTests,!DBFSVolumeIntegrationTests,!M2MAuthIntegrationTests,!UCVolumeIntegrationTests
20+
- test-command: mvn -B compile test -Dtest=*IntegrationTests,!M2MPrivateKeyCredentialsIntegrationTests,!SqlExecApiHybridResultsIntegrationTests,!DBFSVolumeIntegrationTests,!M2MAuthIntegrationTests,!UCVolumeIntegrationTests,!SqlExecApiIntegrationTests
2121
token-secret: THRIFT_DATABRICKS_TOKEN
2222
fake-service-type: 'THRIFT_SERVER'
2323
steps:

NEXT_CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,5 +17,6 @@
1717
- Fix: driver failing to parse complex data types with nullable attributes.
1818
- Fixed: Resolved SDK token-caching regression causing token refresh on every call. SDK is now configured once to avoid excessive token endpoint hits and rate limiting.
1919
- Fixed: TimestampConverter.toString() returning ISO8601 format with timezone conversion instead of SQL standard format.
20+
- Fixed: Driver not loading complete JSON result in the case of SEA Inline without Arrow
2021
---
2122
*Note: When making changes, please add your change under the appropriate section with a brief description.*

src/main/java/com/databricks/jdbc/api/impl/ExecutionResultFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ private static IExecutionResult getResultHandler(
5757
return new ArrowStreamResult(manifest, data, statementId, session);
5858
case JSON_ARRAY:
5959
// This is used for metadata and update commands
60-
return new InlineJsonResult(manifest, data);
60+
return new InlineJsonResult(manifest, data, statementId, session);
6161
default:
6262
String errorMessage = String.format("Invalid response format %s", manifest.getFormat());
6363
LOGGER.error(errorMessage);

src/main/java/com/databricks/jdbc/api/impl/InlineJsonResult.java

Lines changed: 20 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package com.databricks.jdbc.api.impl;
22

3+
import com.databricks.jdbc.api.internal.IDatabricksSession;
4+
import com.databricks.jdbc.dbclient.impl.common.StatementId;
35
import com.databricks.jdbc.exception.DatabricksSQLException;
46
import com.databricks.jdbc.model.core.ResultData;
57
import com.databricks.jdbc.model.core.ResultManifest;
@@ -11,11 +13,22 @@ public class InlineJsonResult implements IExecutionResult {
1113

1214
private long currentRow;
1315
private List<List<Object>> data;
14-
16+
private JsonChunkProvider chunkProvider;
1517
private boolean isClosed;
1618

17-
public InlineJsonResult(ResultManifest resultManifest, ResultData resultData) {
18-
this(getDataList(resultData.getDataArray()));
19+
public InlineJsonResult(
20+
ResultManifest resultManifest,
21+
ResultData resultData,
22+
StatementId statementId,
23+
IDatabricksSession session)
24+
throws DatabricksSQLException {
25+
26+
this.chunkProvider = new JsonChunkProvider(resultManifest, resultData, statementId, session);
27+
// Fetching data all at once as the data is at most 26Mb in total (SEA)
28+
this.data = chunkProvider.getAllData();
29+
30+
this.currentRow = -1;
31+
this.isClosed = false;
1932
}
2033

2134
public InlineJsonResult(Object[][] rows) {
@@ -31,21 +44,6 @@ public InlineJsonResult(List<List<Object>> rows) {
3144
this.isClosed = false;
3245
}
3346

34-
private static List<List<Object>> getDataList(Collection<Collection<String>> dataArray) {
35-
if (dataArray == null) {
36-
return new ArrayList<>();
37-
}
38-
List<List<Object>> dataList = new ArrayList<>();
39-
for (Collection<String> innerCollection : dataArray) {
40-
if (innerCollection == null) {
41-
dataList.add(Collections.emptyList());
42-
} else {
43-
dataList.add(new ArrayList<>(innerCollection));
44-
}
45-
}
46-
return dataList;
47-
}
48-
4947
@Override
5048
public Object getObject(int columnIndex) throws DatabricksSQLException {
5149
if (isClosed()) {
@@ -86,6 +84,9 @@ public boolean hasNext() {
8684
public void close() {
8785
this.isClosed = true;
8886
this.data = null;
87+
if (chunkProvider != null) {
88+
chunkProvider.close();
89+
}
8990
}
9091

9192
@Override
@@ -95,7 +96,7 @@ public long getRowCount() {
9596

9697
@Override
9798
public long getChunkCount() {
98-
return 0;
99+
return chunkProvider.getChunkCount();
99100
}
100101

101102
private boolean isClosed() {
Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
package com.databricks.jdbc.api.impl;
2+
3+
import com.databricks.jdbc.api.internal.IDatabricksSession;
4+
import com.databricks.jdbc.dbclient.impl.common.StatementId;
5+
import com.databricks.jdbc.exception.DatabricksSQLException;
6+
import com.databricks.jdbc.log.JdbcLogger;
7+
import com.databricks.jdbc.log.JdbcLoggerFactory;
8+
import com.databricks.jdbc.model.core.ResultData;
9+
import com.databricks.jdbc.model.core.ResultManifest;
10+
import com.databricks.jdbc.model.telemetry.enums.DatabricksDriverErrorCode;
11+
import java.util.ArrayList;
12+
import java.util.Collection;
13+
import java.util.List;
14+
import java.util.Objects;
15+
16+
/**
17+
* Chunk provider for JSON_ARRAY format results that handles multiple chunks of data.
18+
*
19+
* <p>This provider fetches and combines data from multiple JSON chunks when the result is split
20+
* across multiple chunks. It follows the same pattern as Arrow chunk providers but works with JSON
21+
* data arrays instead of Arrow streams.
22+
*/
23+
public class JsonChunkProvider {
24+
private static final JdbcLogger LOGGER = JdbcLoggerFactory.getLogger(JsonChunkProvider.class);
25+
26+
private final StatementId statementId;
27+
private final IDatabricksSession session;
28+
private final ResultManifest resultManifest;
29+
private final List<List<Object>> allData;
30+
private boolean isClosed = false;
31+
32+
/**
33+
* Creates a new JsonChunkProvider.
34+
*
35+
* @param resultManifest the result manifest containing chunk information
36+
* @param initialResultData the first chunk of result data
37+
* @param statementId the statement ID for fetching additional chunks
38+
* @param session the session for making API calls
39+
* @throws DatabricksSQLException if there's an error processing the initial data or fetching
40+
* chunks
41+
*/
42+
public JsonChunkProvider(
43+
ResultManifest resultManifest,
44+
ResultData initialResultData,
45+
StatementId statementId,
46+
IDatabricksSession session)
47+
throws DatabricksSQLException {
48+
this.resultManifest = resultManifest;
49+
this.statementId = statementId;
50+
this.session = session;
51+
this.allData = new ArrayList<>();
52+
53+
// Process all chunks
54+
fetchAndCombineAllChunks(initialResultData);
55+
}
56+
57+
/**
58+
* Fetches and combines data from all chunks.
59+
*
60+
* @param initialResultData the first chunk of data
61+
* @throws DatabricksSQLException if there's an error fetching additional chunks
62+
*/
63+
private void fetchAndCombineAllChunks(ResultData initialResultData)
64+
throws DatabricksSQLException {
65+
LOGGER.debug("Starting to fetch and combine JSON chunks for statement: {}", statementId);
66+
67+
// Add data from the first chunk
68+
addDataFromChunk(initialResultData);
69+
70+
// Check if there are more chunks to fetch
71+
Long totalChunkCount = Objects.requireNonNullElse(resultManifest.getTotalChunkCount(), 0L);
72+
if (totalChunkCount > 1) {
73+
LOGGER.debug("Total chunks to fetch: {}", totalChunkCount);
74+
75+
// Fetch remaining chunks (starting from chunk index 1, since we already have chunk 0)
76+
for (long chunkIndex = 1; chunkIndex < totalChunkCount; chunkIndex++) {
77+
if (isClosed) {
78+
LOGGER.warn("Provider closed while fetching chunk {}", chunkIndex);
79+
break;
80+
}
81+
82+
try {
83+
LOGGER.debug("Fetching chunk {} of {}", chunkIndex, totalChunkCount);
84+
ResultData chunkData =
85+
session.getDatabricksClient().getResultChunksData(statementId, chunkIndex);
86+
addDataFromChunk(chunkData);
87+
} catch (DatabricksSQLException e) {
88+
LOGGER.error("Failed to fetch chunk {} for statement {}", chunkIndex, statementId, e);
89+
throw new DatabricksSQLException(
90+
String.format("Failed to fetch chunk %d: %s", chunkIndex, e.getMessage()),
91+
e,
92+
DatabricksDriverErrorCode.CHUNK_DOWNLOAD_ERROR);
93+
}
94+
}
95+
}
96+
97+
LOGGER.debug(
98+
"Successfully combined {} total rows from {} chunks", allData.size(), totalChunkCount);
99+
}
100+
101+
/**
102+
* Adds data from a single chunk to the combined data list.
103+
*
104+
* @param chunkData the chunk data to add
105+
*/
106+
private void addDataFromChunk(ResultData chunkData) {
107+
if (chunkData == null || chunkData.getDataArray() == null) {
108+
LOGGER.debug("Chunk data or data array is null, skipping");
109+
return;
110+
}
111+
112+
Collection<Collection<String>> dataArray = chunkData.getDataArray();
113+
LOGGER.debug(
114+
"Adding {} rows from chunk {}",
115+
dataArray.size(),
116+
chunkData.getChunkIndex() != null ? chunkData.getChunkIndex() : "unknown");
117+
118+
for (Collection<String> row : dataArray) {
119+
if (row != null) {
120+
allData.add(new ArrayList<>(row));
121+
} else {
122+
allData.add(new ArrayList<>());
123+
}
124+
}
125+
}
126+
127+
/**
128+
* Gets the combined data from all chunks.
129+
*
130+
* @return the list of all rows from all chunks
131+
*/
132+
public List<List<Object>> getAllData() {
133+
return allData;
134+
}
135+
136+
/**
137+
* Gets the total number of chunks that were processed.
138+
*
139+
* @return the chunk count
140+
*/
141+
public long getChunkCount() {
142+
LOGGER.debug("Getting total chunk count");
143+
return Objects.requireNonNullElse(resultManifest.getTotalChunkCount(), 0L);
144+
}
145+
146+
/** Closes the provider and releases resources. */
147+
public void close() {
148+
isClosed = true;
149+
allData.clear();
150+
LOGGER.debug("JsonChunkProvider closed for statement: {}", statementId);
151+
}
152+
153+
/**
154+
* Checks if the provider is closed.
155+
*
156+
* @return true if closed, false otherwise
157+
*/
158+
public boolean isClosed() {
159+
return isClosed;
160+
}
161+
}

src/main/java/com/databricks/jdbc/dbclient/IDatabricksClient.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import com.databricks.jdbc.exception.DatabricksSQLException;
1111
import com.databricks.jdbc.model.client.thrift.generated.TFetchResultsResp;
1212
import com.databricks.jdbc.model.core.ExternalLink;
13+
import com.databricks.jdbc.model.core.ResultData;
1314
import com.databricks.jdbc.telemetry.latency.DatabricksMetricsTimed;
1415
import com.databricks.sdk.core.DatabricksConfig;
1516
import java.sql.SQLException;
@@ -122,6 +123,16 @@ DatabricksResultSet getStatementResult(
122123
Collection<ExternalLink> getResultChunks(StatementId statementId, long chunkIndex)
123124
throws DatabricksSQLException;
124125

126+
/**
127+
* Fetches the result data for given chunk index and statement-Id.
128+
*
129+
* @param statementId statement-Id for which chunk data should be fetched
130+
* @param chunkIndex chunkIndex for which chunk data should be fetched
131+
* @return ResultData containing the chunk's data array and metadata
132+
*/
133+
ResultData getResultChunksData(StatementId statementId, long chunkIndex)
134+
throws DatabricksSQLException;
135+
125136
IDatabricksConnectionContext getConnectionContext();
126137

127138
/**

src/main/java/com/databricks/jdbc/dbclient/impl/sqlexec/DatabricksSdkClient.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -432,6 +432,29 @@ public Collection<ExternalLink> getResultChunks(StatementId typedStatementId, lo
432432
}
433433
}
434434

435+
@Override
436+
public ResultData getResultChunksData(StatementId typedStatementId, long chunkIndex)
437+
throws DatabricksSQLException {
438+
DatabricksThreadContextHolder.setStatementId(typedStatementId);
439+
String statementId = typedStatementId.toSQLExecStatementId();
440+
LOGGER.debug(
441+
"public ResultData getResultChunksData(String statementId = {}, long chunkIndex = {})",
442+
statementId,
443+
chunkIndex);
444+
GetStatementResultChunkNRequest request =
445+
new GetStatementResultChunkNRequest().setStatementId(statementId).setChunkIndex(chunkIndex);
446+
String path = String.format(RESULT_CHUNK_PATH, statementId, chunkIndex);
447+
try {
448+
Request req = new Request(Request.GET, path, apiClient.serialize(request));
449+
req.withHeaders(getHeaders("getStatementResultN"));
450+
return apiClient.execute(req, ResultData.class);
451+
} catch (IOException e) {
452+
String errorMessage = "Error while processing the get result chunk request";
453+
LOGGER.error(errorMessage, e);
454+
throw new DatabricksSQLException(errorMessage, e, DatabricksDriverErrorCode.SDK_CLIENT_ERROR);
455+
}
456+
}
457+
435458
@Override
436459
public synchronized void resetAccessToken(String newAccessToken) {
437460
this.clientConfigurator.resetAccessTokenInConfig(newAccessToken);

src/main/java/com/databricks/jdbc/dbclient/impl/thrift/DatabricksThriftServiceClient.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import com.databricks.jdbc.log.JdbcLoggerFactory;
3030
import com.databricks.jdbc.model.client.thrift.generated.*;
3131
import com.databricks.jdbc.model.core.ExternalLink;
32+
import com.databricks.jdbc.model.core.ResultData;
3233
import com.databricks.jdbc.model.telemetry.enums.DatabricksDriverErrorCode;
3334
import com.databricks.sdk.core.DatabricksConfig;
3435
import com.google.common.annotations.VisibleForTesting;
@@ -327,6 +328,14 @@ public Collection<ExternalLink> getResultChunks(StatementId statementId, long ch
327328
return externalLinks;
328329
}
329330

331+
@Override
332+
public ResultData getResultChunksData(StatementId statementId, long chunkIndex)
333+
throws DatabricksSQLException {
334+
throw new DatabricksSQLException(
335+
"getResultChunksData method is not yet implemented for thrift client",
336+
DatabricksDriverErrorCode.INVALID_STATE);
337+
}
338+
330339
@Override
331340
public DatabricksResultSet listTypeInfo(IDatabricksSession session) {
332341
LOGGER.debug("public ResultSet getTypeInfo()");

0 commit comments

Comments
 (0)