Skip to content

Commit e64310b

Browse files
In case of null links check for end-of-stream (#1138)
## Description <!-- Provide a brief summary of the changes made and the issue they aim to address.--> NO_CHANGELOG=true In case of no/null links, check for end-of-stream. This will prevent driver making fetch calls to the server for closed statements. ## Testing <!-- Describe how the changes have been tested--> - manual testing - unit tests ## Additional Notes to the Reviewer <!-- Share any additional context or insights that may help the reviewer understand the changes better. This could include challenges faced, limitations, or compromises made during the development process. Also, mention any areas of the code that you would like the reviewer to focus on specifically. -->
1 parent 1120009 commit e64310b

2 files changed

Lines changed: 89 additions & 4 deletions

File tree

src/main/java/com/databricks/jdbc/api/impl/arrow/ArrowStreamResult.java

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,8 @@ private static ChunkProvider createRemoteChunkProvider(
116116

117117
// Convert ExternalLinks to ChunkLinkFetchResult for the provider
118118
ChunkLinkFetchResult initialLinks =
119-
convertToChunkLinkFetchResult(resultData.getExternalLinks());
119+
convertToChunkLinkFetchResult(
120+
resultData.getExternalLinks(), resultManifest.getTotalChunkCount());
120121

121122
return new StreamingChunkProvider(
122123
linkFetcher,
@@ -377,11 +378,17 @@ private void setColumnInfo(TGetResultSetMetadataResp resultManifest) {
377378
* Converts a collection of ExternalLinks to a ChunkLinkFetchResult.
378379
*
379380
* @param externalLinks The external links to convert, may be null
380-
* @return A ChunkLinkFetchResult, or null if input is null or empty
381+
* @param totalChunkCount The total chunk count from result manifest, may be null
382+
* @return A ChunkLinkFetchResult, or endOfStream if chunk count is zero, or null if unknown
381383
*/
382384
private static ChunkLinkFetchResult convertToChunkLinkFetchResult(
383-
Collection<ExternalLink> externalLinks) {
385+
Collection<ExternalLink> externalLinks, Long totalChunkCount) {
384386
if (externalLinks == null || externalLinks.isEmpty()) {
387+
// If total chunk count is zero, return end of stream
388+
if (totalChunkCount != null && totalChunkCount == 0) {
389+
LOGGER.debug("Total chunk count is zero, returning end of stream");
390+
return ChunkLinkFetchResult.endOfStream();
391+
}
385392
return null;
386393
}
387394

@@ -396,6 +403,13 @@ private static ChunkLinkFetchResult convertToChunkLinkFetchResult(
396403
long nextFetchIndex = hasMore ? lastLink.getNextChunkIndex() : -1;
397404
long nextRowOffset = lastLink.getRowOffset() + lastLink.getRowCount();
398405

406+
LOGGER.debug(
407+
"Converting ExternalLinks to ChunkLinkFetchResult: linkCount={}, hasMore={}, nextFetchIndex={}, nextRowOffset={}",
408+
linkList.size(),
409+
hasMore,
410+
nextFetchIndex,
411+
nextRowOffset);
412+
399413
return ChunkLinkFetchResult.of(linkList, hasMore, nextFetchIndex, nextRowOffset);
400414
}
401415

@@ -406,12 +420,17 @@ private static ChunkLinkFetchResult convertToChunkLinkFetchResult(
406420
* ChunkLinkFetchResult format used by StreamingChunkProvider.
407421
*
408422
* @param resultsResp The Thrift fetch results response containing initial links
409-
* @return A ChunkLinkFetchResult, or null if no links
423+
* @return A ChunkLinkFetchResult, or endOfStream if no more rows, or null if unknown
410424
*/
411425
private static ChunkLinkFetchResult convertThriftLinksToChunkLinkFetchResult(
412426
TFetchResultsResp resultsResp) {
413427
List<TSparkArrowResultLink> resultLinks = resultsResp.getResults().getResultLinks();
414428
if (resultLinks == null || resultLinks.isEmpty()) {
429+
// If hasMoreRows is false, return end of stream
430+
if (!resultsResp.hasMoreRows) {
431+
LOGGER.debug("No result links and hasMoreRows is false, returning end of stream");
432+
return ChunkLinkFetchResult.endOfStream();
433+
}
415434
return null;
416435
}
417436

src/test/java/com/databricks/jdbc/api/impl/arrow/ArrowStreamResultTest.java

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -609,4 +609,70 @@ private Object[][] createTestData(Schema schema, int rows) {
609609
}
610610
return data;
611611
}
612+
613+
// ==================== End of Stream Conversion Tests ====================
614+
615+
@Test
616+
public void testSeaEmptyLinksWithZeroChunkCountReturnsEndOfStream() throws Exception {
617+
// Enable StreamingChunkProvider
618+
Properties props = new Properties();
619+
props.setProperty("EnableStreamingChunkProvider", "1");
620+
IDatabricksConnectionContext connectionContext =
621+
DatabricksConnectionContextFactory.create(JDBC_URL, props);
622+
623+
assertTrue(connectionContext.isStreamingChunkProviderEnabled());
624+
625+
DatabricksSession localSession = new DatabricksSession(connectionContext, mockedSdkClient);
626+
627+
// Create result manifest with zero chunks and empty external links
628+
ResultManifest resultManifest =
629+
new ResultManifest()
630+
.setTotalChunkCount(0L)
631+
.setTotalRowCount(0L)
632+
.setResultCompression(CompressionCodec.NONE)
633+
.setSchema(new ResultSchema().setColumns(new ArrayList<>()).setColumnCount(0L));
634+
635+
ResultData localResultData = new ResultData().setExternalLinks(new ArrayList<>());
636+
637+
// Should create result successfully with end-of-stream signal
638+
ArrowStreamResult result =
639+
new ArrowStreamResult(
640+
resultManifest, localResultData, STATEMENT_ID, localSession, mockHttpClient);
641+
642+
assertNotNull(result);
643+
assertFalse(result.hasNext(), "Empty result should have no data");
644+
assertDoesNotThrow(result::close);
645+
}
646+
647+
@Test
648+
public void testSeaNullLinksWithZeroChunkCountReturnsEndOfStream() throws Exception {
649+
// Enable StreamingChunkProvider
650+
Properties props = new Properties();
651+
props.setProperty("EnableStreamingChunkProvider", "1");
652+
IDatabricksConnectionContext connectionContext =
653+
DatabricksConnectionContextFactory.create(JDBC_URL, props);
654+
655+
assertTrue(connectionContext.isStreamingChunkProviderEnabled());
656+
657+
DatabricksSession localSession = new DatabricksSession(connectionContext, mockedSdkClient);
658+
659+
// Create result manifest with zero chunks and null external links
660+
ResultManifest resultManifest =
661+
new ResultManifest()
662+
.setTotalChunkCount(0L)
663+
.setTotalRowCount(0L)
664+
.setResultCompression(CompressionCodec.NONE)
665+
.setSchema(new ResultSchema().setColumns(new ArrayList<>()).setColumnCount(0L));
666+
667+
ResultData localResultData = new ResultData().setExternalLinks(null);
668+
669+
// Should create result successfully with end-of-stream signal
670+
ArrowStreamResult result =
671+
new ArrowStreamResult(
672+
resultManifest, localResultData, STATEMENT_ID, localSession, mockHttpClient);
673+
674+
assertNotNull(result);
675+
assertFalse(result.hasNext(), "Empty result should have no data");
676+
assertDoesNotThrow(result::close);
677+
}
612678
}

0 commit comments

Comments
 (0)