
Commit 2557e6f

Fix CloudFetch goroutine leak that retains Arrow buffers after Close (#357)
## Summary

Fixes #356. Under high CloudFetch concurrency (≥6 simultaneous downloads), in-flight `cloudFetchDownloadTask` goroutines could leak when the consumer closed the iterator before draining all results. Each leaked goroutine pinned a downloaded chunk in the Go heap, producing the multi-GiB heap plateau described in the issue that only released on process restart.

## Root cause

`cloudFetchDownloadTask.Run` sends the download result on an **unbuffered** channel without honoring context cancellation:

```go
cft.resultChan <- cloudFetchDownloadTaskResult{data: bytes.NewReader(buf), ...}
```

Sequence that triggers the leak:

1. `cloudIPCStreamIterator.Next` schedules `MaxDownloadThreads` (default 10) tasks concurrently.
2. The consumer dequeues task 1, gets its result, returns.
3. Tasks 2..N have completed their HTTP read in parallel and are now **blocked** on the unbuffered send, holding their downloaded buffer.
4. The consumer abandons the iterator (timeout, error, early close, etc.) and calls `iterator.Close()`.
5. `Close` calls `task.cancel()` on each remaining task. But context cancellation does **not** unblock an in-flight channel send: the goroutines stay blocked forever, retaining their buffers.

In v1.7.1 (the version the reporter is on) the goroutine had already decoded the bytes into Arrow records *before* the send, so the leaked memory was Arrow-allocator buffers, matching the stack trace in the issue:

```
(*cloudFetchDownloadTask).Run.func1
getArrowRecords → (*ipc.Reader).Next → newRecord → loadArray → loadBinary → buffer → (*ipcSource).buffer → NewResizableBuffer → (*Buffer).Resize → (*GoAllocator).Allocate
```

In the current code (v1.11.0) the decode happens later in `batchIterator.Next`, so the leak is the raw decompressed `buf` instead: same shape, smaller per-goroutine retention, same plateau pattern.

## Fix

Route every channel send through a helper that selects on `ctx.Done()`:

```go
func (cft *cloudFetchDownloadTask) sendResult(result cloudFetchDownloadTaskResult) {
	select {
	case cft.resultChan <- result:
	case <-cft.ctx.Done():
	}
}
```

`cloudIPCStreamIterator.Close` already calls `task.cancel()` for every queued task, so cancellation now correctly drains stuck goroutines and lets their buffers be GC'd.

## Test plan

- [x] New unit test `TestCloudFetchIterator_CloseReleasesInFlightDownloads` reproduces the leak: spawns `MaxDownloadThreads` concurrent downloads, releases them after the iterator has consumed only the first, then calls `Close()` and asserts that no `cloudFetchDownloadTask.Run` goroutines remain.
  - Fails on `main` (~9 leaked goroutines after `Close`).
  - Passes with this change.
- [x] Full `go test ./...` passes locally.
- [x] `go vet` and `gofmt` clean.

## Who is affected

Any user with CloudFetch enabled (default since v1.7.0) whose query context can be cancelled or whose result set can be abandoned mid-stream, i.e., basically everyone running large CloudFetch queries with timeouts.

This pull request and its description were written by Isaac.

Signed-off-by: Vikrant Puppala <vikrant.puppala@databricks.com>
1 parent f4d9992 commit 2557e6f

2 files changed

Lines changed: 111 additions & 3 deletions

internal/rows/arrowbased/batchloader.go

Lines changed: 14 additions & 3 deletions
@@ -297,7 +297,7 @@ func (cft *cloudFetchDownloadTask) Run() {
 		downloadStart := time.Now()
 		data, err := fetchBatchBytes(cft.ctx, cft.link, cft.minTimeToExpiry, cft.speedThresholdMbps, cft.httpClient)
 		if err != nil {
-			cft.resultChan <- cloudFetchDownloadTaskResult{data: nil, err: err}
+			cft.sendResult(cloudFetchDownloadTaskResult{data: nil, err: err})
 			return
 		}
 
@@ -306,7 +306,7 @@ func (cft *cloudFetchDownloadTask) Run() {
 		data.Close() //nolint:errcheck,gosec // G104: close after reading data
 		downloadMs := time.Since(downloadStart).Milliseconds()
 		if err != nil {
-			cft.resultChan <- cloudFetchDownloadTaskResult{data: nil, err: err}
+			cft.sendResult(cloudFetchDownloadTaskResult{data: nil, err: err})
 			return
 		}
 
@@ -316,10 +316,21 @@ func (cft *cloudFetchDownloadTask) Run() {
 			cft.link.RowCount,
 		)
 
-		cft.resultChan <- cloudFetchDownloadTaskResult{data: bytes.NewReader(buf), err: nil, downloadMs: downloadMs}
+		cft.sendResult(cloudFetchDownloadTaskResult{data: bytes.NewReader(buf), err: nil, downloadMs: downloadMs})
 	}()
 }
 
+// sendResult delivers the download result to the consumer, but drops it if the
+// task's context has already been cancelled. Without this guard, a goroutine
+// that finishes its work after the iterator is closed blocks forever on the
+// unbuffered resultChan and pins the downloaded buffer in the heap (issue #356).
+func (cft *cloudFetchDownloadTask) sendResult(result cloudFetchDownloadTaskResult) {
+	select {
+	case cft.resultChan <- result:
+	case <-cft.ctx.Done():
+	}
+}
+
 // logCloudFetchSpeed calculates and logs download speed metrics
 func logCloudFetchSpeed(fullURL string, contentLength int64, duration time.Duration, speedThresholdMbps float64) {
 	if contentLength > 0 && duration.Seconds() > 0 {

internal/rows/arrowbased/batchloader_test.go

Lines changed: 97 additions & 0 deletions
@@ -6,7 +6,10 @@ import (
 	"fmt"
 	"net/http"
 	"net/http/httptest"
+	"runtime"
+	"strings"
 	"sync"
+	"sync/atomic"
 	"testing"
 	"time"
 
@@ -604,3 +607,97 @@ func generateMockArrowBytes(record arrow.Record) []byte {
 	}
 	return buf.Bytes()
 }
+
+// TestCloudFetchIterator_CloseReleasesInFlightDownloads reproduces issue #356:
+// when the consumer closes the iterator while downloads are still in flight,
+// goroutines that completed their HTTP fetch get permanently blocked sending
+// to the unbuffered resultChan. They retain the downloaded buffers (Arrow
+// allocations in earlier versions, raw bytes in current code) until process
+// exit, producing a heap plateau that only releases on restart.
+//
+// The test schedules many concurrent downloads, lets them complete, and then
+// closes the iterator without consuming the queued results. After Close
+// returns, no cloudFetchDownloadTask goroutines must remain.
+func TestCloudFetchIterator_CloseReleasesInFlightDownloads(t *testing.T) {
+	arrowBytes := generateMockArrowBytes(generateArrowRecord())
+
+	// Track in-flight downloads. The server signals each request as it starts
+	// and waits on a release channel so the test can hold downloads in the
+	// queued-but-not-yet-consumed state before closing the iterator.
+	var inFlight atomic.Int64
+	release := make(chan struct{})
+	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		inFlight.Add(1)
+		<-release
+		w.WriteHeader(http.StatusOK)
+		_, _ = w.Write(arrowBytes)
+	}))
+	defer server.Close()
+
+	const nLinks = 20
+	links := make([]*cli_service.TSparkArrowResultLink, nLinks)
+	for i := range links {
+		links[i] = &cli_service.TSparkArrowResultLink{
+			FileLink:       server.URL,
+			ExpiryTime:     time.Now().Add(10 * time.Minute).Unix(),
+			StartRowOffset: int64(i),
+			RowCount:       1,
+		}
+	}
+
+	cfg := config.WithDefaults()
+	cfg.UseLz4Compression = false
+	cfg.MaxDownloadThreads = 10
+
+	bi, err := NewCloudBatchIterator(context.Background(), links, 0, nil, cfg, nil)
+	assert.Nil(t, err)
+
+	// Kick off the first batch download. The iterator schedules
+	// MaxDownloadThreads concurrent fetches behind the scenes.
+	go func() { _, _ = bi.Next() }()
+
+	// Wait for all MaxDownloadThreads goroutines to be blocked inside the
+	// server handler (they've issued the GET and are waiting for the body).
+	assert.Eventually(t, func() bool {
+		return inFlight.Load() == int64(cfg.MaxDownloadThreads)
+	}, 5*time.Second, 10*time.Millisecond, "expected %d in-flight downloads", cfg.MaxDownloadThreads)
+
+	// Release the downloads so each goroutine finishes its HTTP read and
+	// attempts to send its result on the unbuffered resultChan. Only the
+	// first task's result will be read (by the Next() call above); the rest
+	// will be queued, blocked on the send.
+	close(release)
+
+	// Give the goroutines time to finish their HTTP work and reach the
+	// channel send.
+	time.Sleep(200 * time.Millisecond)
+
+	// Close the iterator without consuming the remaining batches.
+	bi.Close()
+
+	// After Close, every cloudFetchDownloadTask goroutine must exit. We don't
+	// compare against the total goroutine count because httptest keeps
+	// persistent server/transport goroutines around — we look only for our
+	// own download goroutines.
+	assert.Eventually(t, func() bool {
+		return countDownloadTaskGoroutines() == 0
+	}, 5*time.Second, 50*time.Millisecond,
+		"cloudFetchDownloadTask goroutines leaked after Close: have %d",
+		countDownloadTaskGoroutines())
+}
+
+// countDownloadTaskGoroutines returns the number of live goroutines whose
+// stack includes cloudFetchDownloadTask.Run. Used to detect the leak in
+// issue #356.
+func countDownloadTaskGoroutines() int {
+	buf := make([]byte, 64*1024)
+	for {
+		n := runtime.Stack(buf, true)
+		if n < len(buf) {
+			buf = buf[:n]
+			break
+		}
+		buf = make([]byte, 2*len(buf))
+	}
+	return strings.Count(string(buf), "cloudFetchDownloadTask).Run")
+}
