Skip to content

Commit 5801928

Browse files
committed
Fix race condition in createChunkFromLink and non-atomic fetch position updates
P1-1: Use putIfAbsent in createChunkFromLink to prevent double row-count and chunk replacement when called concurrently from prefetch and download threads. Without this, a race could leave a CompletableFuture that is never completed, causing consumer hangs. P1-2: Bundle nextLinkFetchIndex + nextRowOffsetToFetch into an immutable FetchPosition holder updated via volatile reference. This ensures the prefetch thread always reads a consistent (index, rowOffset) pair, which matters for bounded SEA API where row_offset is used by the server. P2-1: Bump reconciliation failure log from DEBUG to WARN for production visibility. Signed-off-by: Gopal Lal <gopal.lal@databricks.com> Co-authored-by: Isaac Signed-off-by: Gopal Lal <gopal.lal@databricks.com>
1 parent 1fa1b2b commit 5801928

1 file changed

Lines changed: 47 additions & 27 deletions

File tree

src/main/java/com/databricks/jdbc/api/impl/arrow/StreamingChunkProvider.java

Lines changed: 47 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,17 @@
4747
*/
4848
public class StreamingChunkProvider implements ChunkProvider {
4949

50+
/** Immutable holder for the next fetch position — ensures atomic reads of (index, rowOffset). */
51+
private static final class FetchPosition {
52+
final long chunkIndex;
53+
final long rowOffset;
54+
55+
FetchPosition(long chunkIndex, long rowOffset) {
56+
this.chunkIndex = chunkIndex;
57+
this.rowOffset = rowOffset;
58+
}
59+
}
60+
5061
private static final JdbcLogger LOGGER =
5162
JdbcLoggerFactory.getLogger(StreamingChunkProvider.class);
5263
private static final String DOWNLOAD_THREAD_PREFIX = "databricks-jdbc-streaming-downloader-";
@@ -75,8 +86,11 @@ public class StreamingChunkProvider implements ChunkProvider {
7586
// - nextDownloadIndex: written only under downloadLock, but AtomicLong for consistency
7687
private final AtomicLong currentChunkIndex = new AtomicLong(-1);
7788
private final AtomicLong highestKnownChunkIndex = new AtomicLong(-1);
78-
private volatile long nextLinkFetchIndex = 0;
79-
private volatile long nextRowOffsetToFetch = 0;
89+
// Bundled as an immutable pair for atomic reads/writes across threads.
90+
// The prefetch thread reads this (fetchNextLinkBatch) while the download thread
91+
// may update it (getRefreshedLink reconciliation). A volatile reference to an
92+
// immutable holder ensures both fields are always read consistently.
93+
private volatile FetchPosition nextFetchPosition = new FetchPosition(0, 0);
8094
private final AtomicLong nextDownloadIndex = new AtomicLong(0);
8195

8296
// State flags
@@ -347,11 +361,11 @@ private void linkPrefetchLoop() {
347361
long targetIndex = currentChunkIndex.get() + linkPrefetchWindow;
348362

349363
// Wait if we're caught up
350-
while (!endOfStreamReached && nextLinkFetchIndex > targetIndex) {
364+
while (!endOfStreamReached && nextFetchPosition.chunkIndex > targetIndex) {
351365
if (closed) break;
352366
LOGGER.debug(
353367
"Prefetch caught up, waiting for consumer. next={}, target={}",
354-
nextLinkFetchIndex,
368+
nextFetchPosition.chunkIndex,
355369
targetIndex);
356370
consumerAdvanced.await();
357371
targetIndex = currentChunkIndex.get() + linkPrefetchWindow;
@@ -396,13 +410,14 @@ private void fetchNextLinkBatch() throws SQLException {
396410
return;
397411
}
398412

413+
FetchPosition pos = nextFetchPosition;
399414
LOGGER.debug(
400415
"Fetching links starting from index {}, row offset {} for statement {}",
401-
nextLinkFetchIndex,
402-
nextRowOffsetToFetch,
416+
pos.chunkIndex,
417+
pos.rowOffset,
403418
statementId);
404419

405-
ChunkLinkFetchResult result = linkFetcher.fetchLinks(nextLinkFetchIndex, nextRowOffsetToFetch);
420+
ChunkLinkFetchResult result = linkFetcher.fetchLinks(pos.chunkIndex, pos.rowOffset);
406421

407422
if (result.isEndOfStream()) {
408423
LOGGER.info("End of stream reached for statement {}", statementId);
@@ -415,10 +430,9 @@ private void fetchNextLinkBatch() throws SQLException {
415430
createChunkFromLink(link);
416431
}
417432

418-
// Update next fetch positions
433+
// Update next fetch position atomically
419434
if (result.hasMore()) {
420-
nextLinkFetchIndex = result.getNextFetchIndex();
421-
nextRowOffsetToFetch = result.getNextRowOffset();
435+
nextFetchPosition = new FetchPosition(result.getNextFetchIndex(), result.getNextRowOffset());
422436
} else {
423437
endOfStreamReached = true;
424438
LOGGER.info("End of stream reached for statement {} (hasMore=false)", statementId);
@@ -450,14 +464,15 @@ private void processInitialLinks(ChunkLinkFetchResult initialLinks)
450464
createChunkFromLink(link);
451465
}
452466

453-
// Set next fetch positions using unified API
467+
// Set next fetch position atomically
454468
if (initialLinks.hasMore()) {
455-
nextLinkFetchIndex = initialLinks.getNextFetchIndex();
456-
nextRowOffsetToFetch = initialLinks.getNextRowOffset();
469+
FetchPosition pos =
470+
new FetchPosition(initialLinks.getNextFetchIndex(), initialLinks.getNextRowOffset());
471+
nextFetchPosition = pos;
457472
LOGGER.debug(
458473
"Next fetch position set to chunk index {}, row offset {} from initial links",
459-
nextLinkFetchIndex,
460-
nextRowOffsetToFetch);
474+
pos.chunkIndex,
475+
pos.rowOffset);
461476
} else {
462477
endOfStreamReached = true;
463478
LOGGER.info("End of stream reached from initial links for statement {}", statementId);
@@ -471,11 +486,6 @@ private void processInitialLinks(ChunkLinkFetchResult initialLinks)
471486
*/
472487
private void createChunkFromLink(ExternalLink link) throws DatabricksParsingException {
473488
long chunkIndex = link.getChunkIndex();
474-
if (chunks.containsKey(chunkIndex)) {
475-
LOGGER.debug("Chunk {} already exists, skipping creation", chunkIndex);
476-
return;
477-
}
478-
479489
long rowCount = link.getRowCount();
480490
long rowOffset = link.getRowOffset();
481491

@@ -488,7 +498,16 @@ private void createChunkFromLink(ExternalLink link) throws DatabricksParsingExce
488498
.build();
489499

490500
chunk.setChunkLink(link);
491-
chunks.put(chunkIndex, chunk);
501+
502+
// Atomic insert — if another thread already created this chunk, skip.
503+
// This is safe because createChunkFromLink can be called concurrently from
504+
// the prefetch thread (fetchNextLinkBatch) and download threads (getRefreshedLink).
505+
ArrowResultChunk existing = chunks.putIfAbsent(chunkIndex, chunk);
506+
if (existing != null) {
507+
LOGGER.debug("Chunk {} already exists, skipping creation", chunkIndex);
508+
return;
509+
}
510+
492511
highestKnownChunkIndex.updateAndGet(current -> Math.max(current, chunkIndex));
493512
totalRowCount.addAndGet(rowCount);
494513

@@ -617,21 +636,22 @@ ExternalLink getRefreshedLink(long chunkIndex, long rowOffset) throws SQLExcepti
617636
try {
618637
createChunkFromLink(link);
619638
} catch (Exception e) {
620-
LOGGER.debug(
639+
LOGGER.warn(
621640
"Failed to create chunk {} from refresh response: {}",
622641
link.getChunkIndex(),
623642
e.getMessage());
624643
}
625644
}
626645
}
627646

628-
// Update end-of-stream and prefetch index from refresh response
647+
// Update end-of-stream and prefetch position from refresh response
629648
if (!result.hasMore()) {
630649
endOfStreamReached = true;
631-
} else if (result.getNextFetchIndex() > nextLinkFetchIndex) {
632-
// Avoid re-fetching chunks that the refresh already discovered
633-
nextLinkFetchIndex = result.getNextFetchIndex();
634-
nextRowOffsetToFetch = result.getNextRowOffset();
650+
} else if (result.getNextFetchIndex() > nextFetchPosition.chunkIndex) {
651+
// Avoid re-fetching chunks that the refresh already discovered.
652+
// Atomic update of both fields via immutable holder.
653+
nextFetchPosition =
654+
new FetchPosition(result.getNextFetchIndex(), result.getNextRowOffset());
635655
}
636656

637657
// Trigger downloads for any newly-created chunks

0 commit comments

Comments
 (0)