Commit 2a76760

Fix race condition between chunk release and download error handling (#1407)
## Description

Fixes #1367: a race condition between `CHUNK_RELEASED` and `DOWNLOAD_FAILED` causes invalid state transition warnings during Arrow Cloud Fetch operations.

### Root Cause

When chunk download threads run in parallel and their connections are cut off by `IdleConnectionEvictor` (because extreme resource pressure in low-resource environments stops data from moving at the socket level), the downloads fail. When the consumer reaches a failed chunk, `rs.next()` throws a `DatabricksSQLException`. This can trigger `rs.close()`: explicitly by the user's error handling code, implicitly by try-with-resources, or automatically by frameworks that close resources on error. The user can also call `rs.close()` independently before consuming all data.

In either case, `doClose()` calls `shutdownNow()` on the download thread pool and releases all chunks to `CHUNK_RELEASED`. But `shutdownNow()` only sends an interrupt signal; it does not wait for threads to stop. Download threads that are still processing their errors may try to set `DOWNLOAD_FAILED` after their chunks have already been released. The transition `CHUNK_RELEASED → DOWNLOAD_FAILED` is invalid in the chunk lifecycle state machine, which produces the error/warning.

### Race Condition Sequence

```
Thread A (consumer/main)              Thread B (download)
─────────────────────────             ─────────────────────
                                      downloading chunk [7]...
                                      connection killed by evictor
                                      IOException caught
                                      retries exhausted
                                        ↓
rs.next() → gets error                (processing error...)
rs.close()
  → doClose()
    → shutdownNow()                   (interrupt sent, but not
                                       yet processed)
    → chunk[7].releaseChunk()
      → CHUNK_RELEASED ✓
                                      → setStatus(DOWNLOAD_FAILED)
                                        CHUNK_RELEASED → DOWNLOAD_FAILED ✗
                                        Invalid transition!
```

### Fix

Added `awaitTermination(3, TimeUnit.SECONDS)` after `shutdownNow()` in both `RemoteChunkProvider.doClose()` and `StreamingChunkProvider.close()`. This ensures download threads complete their error handling path (catch → finally → setStatus) before chunks are released.

The 3-second timeout is a conservative upper bound. After `shutdownNow()` interrupts threads, they exit their retry sleep (`Thread.sleep(1500)`) immediately via `InterruptedException` and process the error path in milliseconds. `awaitTermination()` returns as soon as all threads finish; it does not wait the full 3 seconds.

### Files Changed

| File | Change |
|------|--------|
| `RemoteChunkProvider.java` | Added `awaitTermination(3s)` after `shutdownNow()` in `doClose()`, added a DEBUG log for the close lifecycle |
| `StreamingChunkProvider.java` | Added `awaitTermination(3s)` after `shutdownNow()` in `close()` |
| `AbstractArrowResultChunk.java` | `setStatus()` now skips the transition when the chunk is already `CHUNK_RELEASED` |
| `NEXT_CHANGELOG.md` | Added changelog entry |

## Testing

### Reproduction (Docker + iptables)

Reproduced the error/warning logs from issue #1367 using:

1. Docker container with network throttled to 5mbit (slow chunk downloads)
2. An `iptables` `REJECT` rule with `--reject-with tcp-reset` to kill connections mid-download (simulating `IdleConnectionEvictor` behavior)
3. Calling `rs.close()` while download threads process errors
4. Temporary `MAX_RETRIES=0` plus an uninterruptible sleep before `setStatus(DOWNLOAD_FAILED)` to widen the race window

**Before fix**, driver logs showed:

```
WARNING: Invalid state transition for chunk [1]: CHUNK_RELEASED -> DOWNLOAD_FAILED
WARNING: Invalid state transition for chunk [2]: CHUNK_RELEASED -> DOWNLOAD_FAILED
```

**After fix**, no invalid state transitions appeared. `awaitTermination()` waited for threads to finish before releasing chunks. Verified for both `RemoteChunkProvider` and `StreamingChunkProvider`.

## Additional Notes to the Reviewer

- The `IdleConnectionEvictor` operates on a connection pool that is shared by the Thrift transport and chunk downloads (`DatabricksHttpClient` via `DatabricksHttpClientFactory`). In resource-constrained environments, CPU starvation causes the Java layer to stop reading from sockets, making active download connections appear idle to the evictor.
- `shutdownNow()` is best-effort per the Java docs: it cannot guarantee that threads will stop. The interrupt flag is set, but threads only respond when they hit an interruptible operation (`Thread.sleep`, I/O). Code that runs after the interrupt (catch blocks, finally blocks, `setStatus()` calls) is pure computation that does not check interrupts.
- The same race pattern existed in both `RemoteChunkProvider` (standard Cloud Fetch) and `StreamingChunkProvider` (opt-in via `EnableStreamingChunkProvider`). Both are fixed.
- `RemoteChunkProviderV2` (incubator) downloads chunks sequentially without a thread pool, so it is not affected.

---------

Signed-off-by: Sreekanth Vadigi <sreekanth.vadigi@databricks.com>
Co-authored-by: Samikshya Chand <148681192+samikshya-db@users.noreply.github.com>
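
The ordering problem described in the commit message can be reproduced in isolation. The following is a minimal sketch, not driver code: `ShutdownOrderingSketch`, `busyWork`, and `closeDuringErrorHandling` are hypothetical names, the 200 ms spin stands in for error handling that ignores interrupts, and the event strings only imitate the driver's calls. It records whether the closing thread "releases" before or after the worker "sets" its failed status.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ShutdownOrderingSketch {

  /** Spins for roughly the given time; stands in for error handling that ignores interrupts. */
  static void busyWork(long millis) {
    long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(millis);
    while (System.nanoTime() < deadline) {
      // intentionally empty: pure computation never checks the interrupt flag
    }
  }

  /** Simulates one failing download plus a close; returns the events in the order they occurred. */
  static List<String> closeDuringErrorHandling(boolean awaitWorkers) throws InterruptedException {
    List<String> events = Collections.synchronizedList(new ArrayList<>());
    CountDownLatch workerStarted = new CountDownLatch(1);
    CountDownLatch workerFinished = new CountDownLatch(1);
    ExecutorService pool = Executors.newSingleThreadExecutor();

    pool.execute(() -> {
      try {
        workerStarted.countDown();
        Thread.sleep(60_000); // stands in for the retry sleep
      } catch (InterruptedException e) {
        busyWork(200); // the error path keeps running after the interrupt
        events.add("worker: setStatus(DOWNLOAD_FAILED)");
      } finally {
        workerFinished.countDown();
      }
    });

    workerStarted.await();
    pool.shutdownNow(); // interrupts the worker and returns immediately
    if (awaitWorkers) {
      pool.awaitTermination(3, TimeUnit.SECONDS); // give the error path time to finish
    }
    events.add("closer: releaseChunk()");
    workerFinished.await(); // only so the returned list is complete
    return new ArrayList<>(events);
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println("without wait: " + closeDuringErrorHandling(false));
    System.out.println("with wait:    " + closeDuringErrorHandling(true));
  }
}
```

Under these assumptions the run without the wait should list the closer's release first, and the run with the wait should list the worker's status update first, which mirrors the before/after behavior reported in the Testing section.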
1 parent 25169a4 commit 2a76760

4 files changed

Lines changed: 42 additions & 1 deletion

NEXT_CHANGELOG.md

Lines changed: 1 addition & 0 deletions
@@ -10,6 +10,7 @@
 - Added support for using SQL SHOW commands for Thrift-mode metadata operations (`getTables`, `getColumns`, `getSchemas`, `getFunctions`, `getPrimaryKeys`, `getImportedKeys`, `getCrossReference`). Enable by setting `UseQueryForMetadata=1`. This aligns Thrift metadata behavior with Statement Execution API (SEA) mode.
 
 ### Fixed
+- Fixed race condition between chunk download error handling and result set close that could cause invalid state transition warnings (`CHUNK_RELEASED -> DOWNLOAD_FAILED`) during Arrow Cloud Fetch operations in resource-constrained environments.
 - Fixed `EnableBatchedInserts` silently falling back to individual execution when table or schema names contain special characters (e.g., hyphens) inside backtick-quoted identifiers. Added a warn log when the fallback occurs.
 - Fixed `IntervalConverter` crash (`IllegalArgumentException: Invalid interval metadata`) when INTERVAL columns are returned via CloudFetch. Arrow metadata from CloudFetch uses underscored format (`INTERVAL_YEAR_MONTH`, `INTERVAL_DAY_TIME`) which the driver's regex did not accept.
 - Fixed `Statement` being prematurely closed after queries that return inline results, which prevented re-execution, `getResultSet()`, and `getExecutionResult()` from working. Statements now remain open and reusable until explicitly closed by the caller.

src/main/java/com/databricks/jdbc/api/impl/arrow/AbstractArrowResultChunk.java

Lines changed: 5 additions & 0 deletions
@@ -241,6 +241,11 @@ protected ValueVector getColumnVector(int recordBatchIndex, int columnIndex) {
    * @param targetStatus new status to set
    */
   protected void setStatus(ChunkStatus targetStatus) {
+    if (getStatus() == ChunkStatus.CHUNK_RELEASED) {
+      LOGGER.debug(
+          "Skipping transition to {} — chunk [{}] already released", targetStatus, chunkIndex);
+      return;
+    }
     try {
       stateMachine.transition(targetStatus);
     } catch (DatabricksParsingException e) {
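
The guard in this hunk treats `CHUNK_RELEASED` as a terminal state and silently drops any later status update. A minimal standalone sketch of that idea follows. Everything in it is hypothetical: `ReleasedGuardSketch`, its four-value `Status` enum, and the `ALLOWED` transition table are illustrative stand-ins, not the driver's `ChunkStatus` or state machine. The sketch also marks `setStatus` as `synchronized` so the check and the transition happen atomically; whether the driver has equivalent locking is not visible in the hunk.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

public class ReleasedGuardSketch {

  /** Hypothetical subset of chunk states; the driver's ChunkStatus enum may differ. */
  enum Status { PENDING, DOWNLOAD_IN_PROGRESS, DOWNLOAD_FAILED, CHUNK_RELEASED }

  /** Hypothetical transition table in which CHUNK_RELEASED is terminal. */
  static final Map<Status, Set<Status>> ALLOWED = new EnumMap<>(Status.class);

  static {
    ALLOWED.put(Status.PENDING, EnumSet.of(Status.DOWNLOAD_IN_PROGRESS, Status.CHUNK_RELEASED));
    ALLOWED.put(
        Status.DOWNLOAD_IN_PROGRESS, EnumSet.of(Status.DOWNLOAD_FAILED, Status.CHUNK_RELEASED));
    ALLOWED.put(Status.DOWNLOAD_FAILED, EnumSet.of(Status.CHUNK_RELEASED));
    ALLOWED.put(Status.CHUNK_RELEASED, EnumSet.noneOf(Status.class));
  }

  static class Chunk {
    private Status status = Status.PENDING;

    synchronized Status getStatus() {
      return status;
    }

    /** Returns true if the transition was applied, false if it was skipped after release. */
    synchronized boolean setStatus(Status target) {
      if (status == Status.CHUNK_RELEASED) {
        return false; // late update from a download thread: ignore instead of failing
      }
      if (!ALLOWED.get(status).contains(target)) {
        throw new IllegalStateException("Invalid state transition: " + status + " -> " + target);
      }
      status = target;
      return true;
    }
  }

  public static void main(String[] args) {
    Chunk chunk = new Chunk();
    chunk.setStatus(Status.DOWNLOAD_IN_PROGRESS);
    chunk.setStatus(Status.CHUNK_RELEASED);
    // A download thread reporting failure after release is ignored rather than rejected.
    System.out.println("applied: " + chunk.setStatus(Status.DOWNLOAD_FAILED));
    System.out.println("status:  " + chunk.getStatus());
  }
}
```

The guard complements the `awaitTermination` wait: if a download thread outlives the timeout, its late status update is skipped instead of producing an invalid-transition warning.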

src/main/java/com/databricks/jdbc/api/impl/arrow/RemoteChunkProvider.java

Lines changed: 23 additions & 1 deletion
@@ -7,6 +7,8 @@
 import com.databricks.jdbc.dbclient.IDatabricksHttpClient;
 import com.databricks.jdbc.dbclient.impl.common.StatementId;
 import com.databricks.jdbc.exception.DatabricksSQLException;
+import com.databricks.jdbc.log.JdbcLogger;
+import com.databricks.jdbc.log.JdbcLoggerFactory;
 import com.databricks.jdbc.model.client.thrift.generated.TFetchResultsResp;
 import com.databricks.jdbc.model.client.thrift.generated.TSparkArrowResultLink;
 import com.databricks.jdbc.model.core.ResultData;
@@ -16,9 +18,11 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public class RemoteChunkProvider extends AbstractRemoteChunkProvider<ArrowResultChunk> {
+  private static final JdbcLogger LOGGER = JdbcLoggerFactory.getLogger(RemoteChunkProvider.class);
   private static final String CHUNKS_DOWNLOADER_THREAD_POOL_PREFIX =
       "databricks-jdbc-chunks-downloader-";
   private ExecutorService chunkDownloaderExecutorService;
@@ -125,8 +129,26 @@ public void downloadNextChunks() {
   /** {@inheritDoc} */
   @Override
   protected void doClose() {
+    LOGGER.debug(
+        "doClose() called — shutting down executor and releasing all {} chunks (thread: {})",
+        chunkIndexToChunksMap.size(),
+        Thread.currentThread().getName());
     isClosed = true;
-    chunkDownloaderExecutorService.shutdownNow();
+    if (chunkDownloaderExecutorService != null) {
+      chunkDownloaderExecutorService.shutdownNow();
+      // Wait for download threads to finish error handling before releasing chunks.
+      // After shutdownNow() interrupts threads, they exit their retry sleep and process
+      // the error path (catch → finally → setStatus) in milliseconds. 3 seconds is a
+      // conservative upper bound to avoid racing with that error handling path.
+      try {
+        if (!chunkDownloaderExecutorService.awaitTermination(3, TimeUnit.SECONDS)) {
+          LOGGER.warn("Download threads did not terminate within timeout");
+        }
+      } catch (InterruptedException e) {
+        LOGGER.error(e, "Interrupted while waiting for download threads to terminate");
+        Thread.currentThread().interrupt();
+      }
+    }
     chunkIndexToChunksMap.values().forEach(ArrowResultChunk::releaseChunk);
     DatabricksThreadContextHolder.clearStatementInfo();
   }

src/main/java/com/databricks/jdbc/api/impl/arrow/StreamingChunkProvider.java

Lines changed: 13 additions & 0 deletions
@@ -18,6 +18,7 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -276,6 +277,18 @@ public void close() {
     // Shutdown download executor
     if (downloadExecutor != null) {
       downloadExecutor.shutdownNow();
+      // Wait for download threads to finish error handling before releasing chunks.
+      // After shutdownNow() interrupts threads, they exit their retry sleep and process
+      // the error path (catch → finally → setStatus) in milliseconds. 3 seconds is a
+      // conservative upper bound to avoid racing with that error handling path.
+      try {
+        if (!downloadExecutor.awaitTermination(3, TimeUnit.SECONDS)) {
+          LOGGER.warn("Download threads did not terminate within timeout");
+        }
+      } catch (InterruptedException e) {
+        LOGGER.error(e, "Interrupted while waiting for download threads to terminate");
+        Thread.currentThread().interrupt();
+      }
     }
 
     // Release all chunks
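
Both providers now carry the same shutdown-then-wait block, including the step that restores the interrupt flag when the closing thread is itself interrupted. As an illustration only, the pattern could be factored into a small helper. `ShutdownHelperSketch` and `shutdownAndAwait` are hypothetical names that do not appear in this commit, and the sketch leaves out the driver's logging.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ShutdownHelperSketch {

  /**
   * Interrupts all workers, then waits up to the timeout for them to exit.
   * Returns true only if every worker terminated in time.
   */
  static boolean shutdownAndAwait(ExecutorService executor, long timeout, TimeUnit unit) {
    if (executor == null) {
      return true; // nothing to shut down
    }
    executor.shutdownNow();
    try {
      return executor.awaitTermination(timeout, unit);
    } catch (InterruptedException e) {
      // Preserve the caller's interrupt so code further up the stack can still see it.
      Thread.currentThread().interrupt();
      return false;
    }
  }

  public static void main(String[] args) {
    ExecutorService idle = Executors.newSingleThreadExecutor();
    System.out.println("terminated in time: " + shutdownAndAwait(idle, 3, TimeUnit.SECONDS));
  }
}
```

Restoring the flag matters because `awaitTermination` clears it when it throws `InterruptedException`; without the `interrupt()` call, the thread calling `close()` would lose the signal.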
