Skip to content

Commit b069b4e

Browse files
gopalldbclaude
andauthored
[ES-1765150] Coalesce concurrent expired-link refreshes into single batch RPC (#1277)
## Summary - When multiple cloud fetch download threads detect expired presigned URLs simultaneously, they each made independent FetchResults RPCs, causing thread pool exhaustion under high concurrency - Introduces a centralized coalescing mechanism in `StreamingChunkProvider.getRefreshedLink()` that serializes refresh requests under a lock, performs a single batch `fetchLinks()` call from the lowest expired chunk offset, and distributes fresh links to all expired chunks - Adds `LinkRefresher` functional interface to decouple download tasks from the coalescing logic, with fallback to single `refetchLink()` when the batch doesn't include the target chunk ## Test plan - [x] `StreamingChunkDownloadTaskTest` — 6/6 pass (uses `LinkRefresher` mock) - [x] `StreamingChunkProviderTest$LinkExpiryTests` — 9/9 pass (verify batch coalescing behavior) - [ ] Manual verification with high-concurrency cloud fetch workload to confirm reduced FetchResults RPC count 🤖 Generated with [Claude Code](https://claude.com/claude-code) --------- Signed-off-by: Gopal Lal <gopal.lal@databricks.com> Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent c724642 commit b069b4e

6 files changed

Lines changed: 266 additions & 91 deletions

File tree

NEXT_CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
- Fixed socket leak in SDK HTTP client that prevented CRaC checkpointing. The SDK's connection pool was not shut down on `connection.close()`, leaving TCP sockets open.
2121
- Fixed Date fields within complex types (ARRAY, STRUCT, MAP) being returned as epoch day integers instead of proper date values.
2222
- Fixed `DatabaseMetaData.getColumns()` returning the column type name in `COLUMN_DEF` for columns with no default value. `COLUMN_DEF` now correctly returns `null` per the JDBC specification.
23+
- Coalesce concurrent expired cloud fetch link refreshes into a single batch FetchResults RPC to prevent thread pool exhaustion under high concurrency.
2324

2425
---
2526
*Note: When making changes, please add your change under the appropriate section
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package com.databricks.jdbc.api.impl.arrow;
2+
3+
import com.databricks.jdbc.model.core.ExternalLink;
4+
import java.sql.SQLException;
5+
6+
/**
7+
* Callback interface for refreshing expired chunk links. Used by {@link StreamingChunkDownloadTask}
8+
* to delegate link refresh to {@link StreamingChunkProvider}, which coalesces concurrent refresh
9+
* requests into a single batch RPC.
10+
*/
11+
@FunctionalInterface
12+
interface LinkRefresher {
13+
/**
14+
* Refreshes an expired link for the given chunk.
15+
*
16+
* @param chunkIndex The chunk index whose link has expired
17+
* @param rowOffset The row offset of the chunk (used by Thrift for FETCH_ABSOLUTE)
18+
* @return The refreshed ExternalLink with a new expiration time
19+
* @throws SQLException if the refresh operation fails
20+
*/
21+
ExternalLink refreshLink(long chunkIndex, long rowOffset) throws SQLException;
22+
}

src/main/java/com/databricks/jdbc/api/impl/arrow/StreamingChunkDownloadTask.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,15 @@
55
import com.databricks.jdbc.exception.DatabricksSQLException;
66
import com.databricks.jdbc.log.JdbcLogger;
77
import com.databricks.jdbc.log.JdbcLoggerFactory;
8-
import com.databricks.jdbc.model.core.ExternalLink;
98
import com.databricks.jdbc.model.telemetry.enums.DatabricksDriverErrorCode;
109
import java.io.IOException;
1110
import java.sql.SQLException;
1211
import java.util.concurrent.Callable;
1312

1413
/**
15-
* A download task for streaming chunk provider. Simpler than ChunkDownloadTask - uses
16-
* ChunkLinkFetcher directly for link refresh instead of ChunkLinkDownloadService.
14+
* A download task for streaming chunk provider. Uses a {@link LinkRefresher} callback for link
15+
* refresh, which enables the {@link StreamingChunkProvider} to coalesce concurrent expired-link
16+
* refreshes into a single batch RPC.
1717
*/
1818
public class StreamingChunkDownloadTask implements Callable<Void> {
1919

@@ -26,19 +26,19 @@ public class StreamingChunkDownloadTask implements Callable<Void> {
2626
private final ArrowResultChunk chunk;
2727
private final IDatabricksHttpClient httpClient;
2828
private final CompressionCodec compressionCodec;
29-
private final ChunkLinkFetcher linkFetcher;
29+
private final LinkRefresher linkRefresher;
3030
private final double cloudFetchSpeedThreshold;
3131

3232
public StreamingChunkDownloadTask(
3333
ArrowResultChunk chunk,
3434
IDatabricksHttpClient httpClient,
3535
CompressionCodec compressionCodec,
36-
ChunkLinkFetcher linkFetcher,
36+
LinkRefresher linkRefresher,
3737
double cloudFetchSpeedThreshold) {
3838
this.chunk = chunk;
3939
this.httpClient = httpClient;
4040
this.compressionCodec = compressionCodec;
41-
this.linkFetcher = linkFetcher;
41+
this.linkRefresher = linkRefresher;
4242
this.cloudFetchSpeedThreshold = cloudFetchSpeedThreshold;
4343
}
4444

@@ -50,12 +50,12 @@ public Void call() throws DatabricksSQLException {
5050
try {
5151
while (!downloadSuccessful) {
5252
try {
53-
// Check if link is expired and refresh if needed
53+
// Check if link is expired and refresh if needed.
54+
// The LinkRefresher (StreamingChunkProvider.getRefreshedLink) updates the chunk's
55+
// link directly under the refetchLock, so we don't need to set it here.
5456
if (chunk.isChunkLinkInvalid()) {
5557
LOGGER.debug("Link invalid for chunk {}, refetching", chunk.getChunkIndex());
56-
ExternalLink freshLink =
57-
linkFetcher.refetchLink(chunk.getChunkIndex(), chunk.getStartRowOffset());
58-
chunk.setChunkLink(freshLink);
58+
linkRefresher.refreshLink(chunk.getChunkIndex(), chunk.getStartRowOffset());
5959
}
6060

6161
// Perform the download

src/main/java/com/databricks/jdbc/api/impl/arrow/StreamingChunkProvider.java

Lines changed: 106 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,11 @@ public class StreamingChunkProvider implements ChunkProvider {
9999
// that must be consistent within the loop.
100100
private final ReentrantLock downloadLock = new ReentrantLock();
101101

102+
// Synchronization for coalescing concurrent expired-link refreshes.
103+
// When multiple download threads detect expired links simultaneously, only one thread
104+
// performs the actual batch FetchResults RPC while others wait and reuse the result.
105+
private final Object refetchLock = new Object();
106+
102107
// Executors
103108
private final ExecutorService downloadExecutor;
104109
private final Thread linkPrefetchThread;
@@ -511,12 +516,112 @@ private void triggerDownloads() {
511516
}
512517
}
513518

519+
/**
520+
* Coalesces concurrent expired-link refresh requests into a single batch RPC.
521+
*
522+
* <p>When multiple download threads detect expired links simultaneously, the first thread to
523+
* acquire the lock performs a single {@link ChunkLinkFetcher#fetchLinks} call from the lowest
524+
* expired chunk offset. The response refreshes all expired in-memory chunks. Subsequent threads
525+
* find their chunks already refreshed via the double-check pattern.
526+
*
527+
* <p>If the batch response does not include the requested chunk, falls back to a single {@link
528+
* ChunkLinkFetcher#refetchLink} call.
529+
*
530+
* @param chunkIndex The chunk index whose link has expired
531+
* @param rowOffset The row offset of the chunk (used by Thrift)
532+
* @return The refreshed ExternalLink
533+
* @throws SQLException if the refresh fails
534+
*/
535+
ExternalLink getRefreshedLink(long chunkIndex, long rowOffset) throws SQLException {
536+
synchronized (refetchLock) {
537+
// Double-check: another thread may have already refreshed while we waited on the lock
538+
ArrowResultChunk targetChunk = chunks.get(chunkIndex);
539+
if (targetChunk != null && !targetChunk.isChunkLinkInvalid()) {
540+
return targetChunk.getChunkLink();
541+
}
542+
543+
// Find the minimum expired chunk index among pre-download chunks.
544+
// Only called from StreamingChunkDownloadTask before download starts, so the
545+
// target chunk is always pre-download. We also skip chunks that have already been
546+
// downloaded (DOWNLOAD_IN_PROGRESS or later) since their links are no longer needed.
547+
long minExpiredIndex = Long.MAX_VALUE;
548+
long minExpiredRowOffset = 0;
549+
int expiredCount = 0;
550+
for (ArrowResultChunk c : chunks.values()) {
551+
ChunkStatus status = c.getStatus();
552+
if (status != ChunkStatus.PENDING
553+
&& status != ChunkStatus.URL_FETCHED
554+
&& status != ChunkStatus.DOWNLOAD_FAILED
555+
&& status != ChunkStatus.DOWNLOAD_RETRY) {
556+
continue; // Already downloading or downloaded — link refresh not needed
557+
}
558+
if (c.isChunkLinkInvalid()) {
559+
expiredCount++;
560+
if (c.getChunkIndex() < minExpiredIndex) {
561+
minExpiredIndex = c.getChunkIndex();
562+
minExpiredRowOffset = c.getStartRowOffset();
563+
}
564+
}
565+
}
566+
567+
if (minExpiredIndex == Long.MAX_VALUE) {
568+
// No expired chunks found — race condition resolved by another thread
569+
if (targetChunk != null) {
570+
return targetChunk.getChunkLink();
571+
}
572+
throw new DatabricksSQLException(
573+
"Chunk " + chunkIndex + " not found during link refresh",
574+
DatabricksDriverErrorCode.CHUNK_READY_ERROR);
575+
}
576+
577+
LOGGER.info(
578+
"Coalesced link refresh: fetching from chunk {} (row offset {}) for {} expired chunks",
579+
minExpiredIndex,
580+
minExpiredRowOffset,
581+
expiredCount);
582+
583+
// Single batch FetchResults RPC from the lowest expired offset
584+
ChunkLinkFetchResult result = linkFetcher.fetchLinks(minExpiredIndex, minExpiredRowOffset);
585+
586+
// Update ALL pre-download chunks that received fresh links.
587+
// Always overwrite even if the current link hasn't expired yet, since the
588+
// server-provided link has a later expiry and prevents near-expiry races.
589+
for (ExternalLink link : result.getChunkLinks()) {
590+
ArrowResultChunk c = chunks.get(link.getChunkIndex());
591+
if (c != null) {
592+
ChunkStatus status = c.getStatus();
593+
if (status == ChunkStatus.PENDING
594+
|| status == ChunkStatus.URL_FETCHED
595+
|| status == ChunkStatus.DOWNLOAD_FAILED
596+
|| status == ChunkStatus.DOWNLOAD_RETRY) {
597+
c.setChunkLink(link);
598+
}
599+
}
600+
}
601+
602+
// Check if our target chunk was refreshed by the batch
603+
targetChunk = chunks.get(chunkIndex);
604+
if (targetChunk != null && !targetChunk.isChunkLinkInvalid()) {
605+
return targetChunk.getChunkLink();
606+
}
607+
608+
// Fallback: batch response did not include the requested chunk
609+
LOGGER.warn(
610+
"Batch refresh did not include chunk {}, falling back to single refetch", chunkIndex);
611+
ExternalLink fallbackLink = linkFetcher.refetchLink(chunkIndex, rowOffset);
612+
if (targetChunk != null) {
613+
targetChunk.setChunkLink(fallbackLink);
614+
}
615+
return fallbackLink;
616+
}
617+
}
618+
514619
private void submitDownloadTask(ArrowResultChunk chunk) {
515620
LOGGER.debug("Submitting download task for chunk {}", chunk.getChunkIndex());
516621

517622
StreamingChunkDownloadTask task =
518623
new StreamingChunkDownloadTask(
519-
chunk, httpClient, compressionCodec, linkFetcher, cloudFetchSpeedThreshold);
624+
chunk, httpClient, compressionCodec, this::getRefreshedLink, cloudFetchSpeedThreshold);
520625

521626
downloadExecutor.submit(task);
522627
}

src/test/java/com/databricks/jdbc/api/impl/arrow/StreamingChunkDownloadTaskTest.java

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public class StreamingChunkDownloadTaskTest {
3838

3939
@Mock private ArrowResultChunk chunk;
4040
@Mock private IDatabricksHttpClient httpClient;
41-
@Mock private ChunkLinkFetcher linkFetcher;
41+
@Mock private LinkRefresher linkRefresher;
4242

4343
private StreamingChunkDownloadTask downloadTask;
4444
private CompletableFuture<Void> downloadFuture;
@@ -48,7 +48,7 @@ void setUp() {
4848
downloadFuture = new CompletableFuture<>();
4949
downloadTask =
5050
new StreamingChunkDownloadTask(
51-
chunk, httpClient, CompressionCodec.NONE, linkFetcher, CLOUD_FETCH_SPEED_THRESHOLD);
51+
chunk, httpClient, CompressionCodec.NONE, linkRefresher, CLOUD_FETCH_SPEED_THRESHOLD);
5252
}
5353

5454
@Test
@@ -137,17 +137,16 @@ void testLinkRefreshWhenLinkIsInvalid() throws Exception {
137137
when(chunk.isChunkLinkInvalid()).thenReturn(true, false);
138138

139139
ExternalLink freshLink = mock(ExternalLink.class);
140-
when(linkFetcher.refetchLink(5L, 100L)).thenReturn(freshLink);
140+
when(linkRefresher.refreshLink(5L, 100L)).thenReturn(freshLink);
141141

142142
doNothing()
143143
.when(chunk)
144144
.downloadData(httpClient, CompressionCodec.NONE, CLOUD_FETCH_SPEED_THRESHOLD);
145145

146146
downloadTask.call();
147147

148-
// Verify link was refreshed
149-
verify(linkFetcher, times(1)).refetchLink(5L, 100L);
150-
verify(chunk, times(1)).setChunkLink(freshLink);
148+
// Verify link was refreshed (setChunkLink is done inside getRefreshedLink, not here)
149+
verify(linkRefresher, times(1)).refreshLink(5L, 100L);
151150
verify(chunk, times(1))
152151
.downloadData(httpClient, CompressionCodec.NONE, CLOUD_FETCH_SPEED_THRESHOLD);
153152
assertTrue(downloadFuture.isDone());
@@ -162,7 +161,7 @@ void testLinkRefreshFailureAndRetry() throws Exception {
162161
when(chunk.isChunkLinkInvalid()).thenReturn(true, true, false);
163162

164163
ExternalLink freshLink = mock(ExternalLink.class);
165-
when(linkFetcher.refetchLink(5L, 100L)).thenReturn(freshLink);
164+
when(linkRefresher.refreshLink(5L, 100L)).thenReturn(freshLink);
166165

167166
// First download fails with IOException, retry succeeds
168167
doThrow(new SocketException("Connection reset"))
@@ -172,9 +171,8 @@ void testLinkRefreshFailureAndRetry() throws Exception {
172171

173172
downloadTask.call();
174173

175-
// Should refresh link twice (once per attempt)
176-
verify(linkFetcher, times(2)).refetchLink(5L, 100L);
177-
verify(chunk, times(2)).setChunkLink(freshLink);
174+
// Should refresh link twice (once per attempt; setChunkLink done inside getRefreshedLink)
175+
verify(linkRefresher, times(2)).refreshLink(5L, 100L);
178176
verify(chunk, times(2))
179177
.downloadData(httpClient, CompressionCodec.NONE, CLOUD_FETCH_SPEED_THRESHOLD);
180178
verify(chunk, times(1)).setStatus(ChunkStatus.DOWNLOAD_RETRY);
@@ -253,7 +251,7 @@ void testStatusTransitionsDuringRetries() throws Exception {
253251
spiedChunk,
254252
httpClient,
255253
CompressionCodec.NONE,
256-
linkFetcher,
254+
linkRefresher,
257255
CLOUD_FETCH_SPEED_THRESHOLD);
258256

259257
// Execute the task

0 commit comments

Comments
 (0)