Skip to content

Commit 24ba8b0

Browse files
[PECOBLR-1131] Fix incorrect refetching of expired CloudFetch links when using Thrift protocol. (#1066)
## Description The [Thrift protocol](https://github.com/databricks-eng/runtime/blob/master/sql/hive-thriftserver/if/TCLIService.thrift#L2186) has an orientation field with values FETCH_NEXT, FETCH_PRIOR or FETCH_FIRST. This field is always set to FETCH_NEXT, resulting in incorrect refetches. To fetch from a particular chunk index, the Thrift protocol requires the start row offset to be set. The chunk index and start row offset information is available from the expired links. Use the start row offset to fetch the links in the Thrift protocol. ## Testing This fix is tested with an integration test that validates that the correct links are fetched when fetching from a pair of chunk index and start row offset. There are also unit tests to validate correct client behaviour when unexpected responses are received from the server. ## Additional Notes to the Reviewer I also made some changes to the validation of the results; these are commented on within the PR. --------- Co-authored-by: tejassp-db <> Co-authored-by: Samikshya Chand <148681192+samikshya-db@users.noreply.github.com>
1 parent 7d52002 commit 24ba8b0

39 files changed

Lines changed: 1159 additions & 153 deletions

File tree

.github/workflows/prIntegrationTests.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ jobs:
1616
include:
1717
# SQL_EXEC mode: Tests SEA client behavior
1818
# Note: CircuitBreakerIntegrationTests requires THRIFT_SERVER mode (tested in second matrix entry)
19-
- test-command: mvn -B compile test -Dtest=*IntegrationTests,!M2MPrivateKeyCredentialsIntegrationTests,!M2MAuthIntegrationTests,!CircuitBreakerIntegrationTests
19+
- test-command: mvn -B compile test -Dtest=*IntegrationTests,!M2MPrivateKeyCredentialsIntegrationTests,!M2MAuthIntegrationTests,!CircuitBreakerIntegrationTests,!ThriftCloudFetchFakeIntegrationTests
2020
fake-service-type: 'SQL_EXEC'
2121
# THRIFT_SERVER mode: Tests Thrift client behavior and circuit breaker fallback
2222
- test-command: mvn -B compile test -Dtest=*IntegrationTests,!M2MPrivateKeyCredentialsIntegrationTests,!SqlExecApiHybridResultsIntegrationTests,!DBFSVolumeIntegrationTests,!M2MAuthIntegrationTests,!UCVolumeIntegrationTests,!SqlExecApiIntegrationTests

.github/workflows/runIntegrationTests.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ name: Integration Tests Workflow - Main Branch
22

33
on:
44
push:
5-
branches: [main]
5+
branches: [ main ]
66

77
jobs:
88
build-and-test:
@@ -14,7 +14,7 @@ jobs:
1414
strategy:
1515
matrix:
1616
include:
17-
- test-command: mvn -B compile test -Dtest=*IntegrationTests,!CircuitBreakerIntegrationTests
17+
- test-command: mvn -B compile test -Dtest=*IntegrationTests,!CircuitBreakerIntegrationTests,!ThriftCloudFetchFakeIntegrationTests
1818
token-secret: DATABRICKS_TOKEN
1919
fake-service-type: 'SQL_EXEC'
2020
- test-command: mvn -B compile test -Dtest=*IntegrationTests,!M2MPrivateKeyCredentialsIntegrationTests,!SqlExecApiHybridResultsIntegrationTests,!DBFSVolumeIntegrationTests,!M2MAuthIntegrationTests,!UCVolumeIntegrationTests,!SqlExecApiIntegrationTests

NEXT_CHANGELOG.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@
88
### Updated
99

1010
### Fixed
11+
12+
- [PECOBLR-1131] Fix incorrect refetching of expired CloudFetch links when using Thrift protocol.
1113
- Fixed logging to respect params when the driver is shaded.
14+
1215
---
13-
*Note: When making changes, please add your change under the appropriate section with a brief description.*
16+
*Note: When making changes, please add your change under the appropriate section
17+
with a brief description.*

src/main/java/com/databricks/jdbc/api/impl/DatabricksResultSet.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import com.databricks.jdbc.api.IDatabricksResultSet;
77
import com.databricks.jdbc.api.IExecutionStatus;
88
import com.databricks.jdbc.api.impl.arrow.ArrowStreamResult;
9+
import com.databricks.jdbc.api.impl.arrow.ChunkProvider;
910
import com.databricks.jdbc.api.impl.converters.ConverterHelper;
1011
import com.databricks.jdbc.api.impl.converters.ObjectConverter;
1112
import com.databricks.jdbc.api.impl.volume.VolumeOperationResult;
@@ -41,6 +42,7 @@
4142
import java.util.Calendar;
4243
import java.util.List;
4344
import java.util.Map;
45+
import java.util.Optional;
4446
import java.util.function.Supplier;
4547
import org.apache.http.entity.InputStreamEntity;
4648

@@ -2008,6 +2010,14 @@ private BigDecimal applyScaleToBigDecimal(BigDecimal bigDecimal, int columnIndex
20082010
return bigDecimal.setScale(scale, RoundingMode.HALF_UP);
20092011
}
20102012

2013+
@VisibleForTesting
2014+
public Optional<ChunkProvider> getChunkProvider() {
2015+
if (executionResult instanceof ArrowStreamResult) {
2016+
return Optional.ofNullable(((ArrowStreamResult) executionResult).getChunkProvider());
2017+
}
2018+
return Optional.empty();
2019+
}
2020+
20112021
@Override
20122022
public String toString() {
20132023
return (new ToStringer(DatabricksResultSet.class))

src/main/java/com/databricks/jdbc/api/impl/arrow/AbstractArrowResultChunk.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -117,11 +117,11 @@ public Long getChunkIndex() {
117117
}
118118

119119
/**
120-
* Returns the starting row offset for this chunk.
120+
* Returns the start row offset of this chunk in the overall result set.
121121
*
122-
* @return the row offset
122+
* @return row offset
123123
*/
124-
public long getRowOffset() {
124+
public long getStartRowOffset() {
125125
return rowOffset;
126126
}
127127

@@ -156,6 +156,10 @@ public boolean releaseChunk() {
156156
return true;
157157
}
158158

159+
public ExternalLink getChunkLink() {
160+
return chunkLink;
161+
}
162+
159163
/**
160164
* Sets the external link details for this chunk.
161165
*

src/main/java/com/databricks/jdbc/api/impl/arrow/AbstractRemoteChunkProvider.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,10 @@ public long getAllowedChunksInMemory() {
215215
return allowedChunksInMemory;
216216
}
217217

218+
public T getChunkByIndex(long chunkIndex) {
219+
return chunkIndexToChunksMap.get(chunkIndex);
220+
}
221+
218222
/** Subclasses should override this method to perform their specific cleanup. */
219223
protected void doClose() {
220224
// Default implementation does nothing

src/main/java/com/databricks/jdbc/api/impl/arrow/ArrowStreamResult.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -364,6 +364,16 @@ public long getChunkCount() {
364364
return chunkProvider.getChunkCount();
365365
}
366366

367+
/**
368+
* Returns the chunk provider for testing purposes.
369+
*
370+
* @return the chunk provider
371+
*/
372+
@VisibleForTesting
373+
public ChunkProvider getChunkProvider() {
374+
return chunkProvider;
375+
}
376+
367377
private void setColumnInfo(TGetResultSetMetadataResp resultManifest) {
368378
columnInfos = new ArrayList<>();
369379
if (resultManifest.getSchema() == null) {

src/main/java/com/databricks/jdbc/api/impl/arrow/ChunkLinkDownloadService.java

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -215,15 +215,18 @@ private void triggerNextBatchDownload() {
215215
return;
216216
}
217217

218+
// Calculate row offset for this batch
219+
final long batchStartRowOffset = getChunkStartRowOffset(batchStartIndex);
220+
218221
LOGGER.info("Starting batch download from index {}", batchStartIndex);
219222
currentDownloadTask =
220223
CompletableFuture.runAsync(
221224
() -> {
222225
try {
223-
// rowOffset is 0 here as this service is used by RemoteChunkProvider (SEA-only)
224-
// which fetches by chunkIndex, not rowOffset
225226
ChunkLinkFetchResult result =
226-
session.getDatabricksClient().getResultChunks(statementId, batchStartIndex, 0);
227+
session
228+
.getDatabricksClient()
229+
.getResultChunks(statementId, batchStartIndex, batchStartRowOffset);
227230
LOGGER.info(
228231
"Retrieved {} links for batch starting at {} for statement id {}",
229232
result.getChunkLinks().size(),
@@ -419,6 +422,28 @@ private void prepareNewBatchDownload(long startIndex) {
419422
isDownloadChainStarted.set(false);
420423
}
421424

425+
/**
426+
* Gets the start row offset for a given chunk index.
427+
*
428+
* @param chunkIndex the chunk index to get the row offset for
429+
* @return the start row offset for the chunk
430+
*/
431+
private long getChunkStartRowOffset(long chunkIndex) {
432+
T chunk = chunkIndexToChunksMap.get(chunkIndex);
433+
if (chunk == null) {
434+
// Should never happen.
435+
throw new IllegalStateException(
436+
"Chunk not found in map for index "
437+
+ chunkIndex
438+
+ ". "
439+
+ "Total chunks: "
440+
+ totalChunks
441+
+ ", StatementId: "
442+
+ statementId);
443+
}
444+
return chunk.getStartRowOffset();
445+
}
446+
422447
private boolean isChunkLinkExpired(ExternalLink link) {
423448
if (link == null || link.getExpiration() == null) {
424449
LOGGER.warn("Link or expiration is null, assuming link is expired");

src/main/java/com/databricks/jdbc/api/impl/arrow/StreamingChunkDownloadTask.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ public Void call() throws DatabricksSQLException {
5353
if (chunk.isChunkLinkInvalid()) {
5454
LOGGER.debug("Link invalid for chunk {}, refetching", chunk.getChunkIndex());
5555
ExternalLink freshLink =
56-
linkFetcher.refetchLink(chunk.getChunkIndex(), chunk.getRowOffset());
56+
linkFetcher.refetchLink(chunk.getChunkIndex(), chunk.getStartRowOffset());
5757
chunk.setChunkLink(freshLink);
5858
}
5959

src/main/java/com/databricks/jdbc/common/util/DatabricksThriftUtil.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,6 @@ public static void verifySuccessStatus(TStatus status, String errorContext, Stri
9494
"Error thrift response received [%s] for statementId [%s]",
9595
errorContext, statementId)
9696
: String.format("Error thrift response received [%s]", errorContext);
97-
LOGGER.error(errorMessage);
9897
throw new DatabricksHttpException(errorMessage, status.getSqlState());
9998
}
10099
}

0 commit comments

Comments
 (0)