Skip to content

Commit c83a65b

Browse files
MagicalTuxclaude
andcommitted
Improve idle download locking to avoid blocking user reads
- Idle task now removes reader from slice while using it - Releases locks during HTTP I/O operations - Returns reader to pool if still useful, or closes it - Prevents idle downloads from blocking concurrent ReadAt calls 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 83e9662 commit c83a65b

1 file changed

Lines changed: 110 additions & 80 deletions

File tree

client.go

Lines changed: 110 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,7 @@ func (dl *dlClient) closeLRUReader() {
282282
// idleTaskRun is called during idle periods to download missing blocks
283283
// in the background. It runs in a separate goroutine and downloads multiple
284284
// consecutive blocks until approximately 1 second has elapsed.
285+
// It releases locks during I/O to avoid blocking user ReadAt calls.
285286
func (dl *dlClient) idleTaskRun() {
286287
defer func() {
287288
atomic.AddUintptr(&dl.taskCnt, ^uintptr(0))
@@ -293,119 +294,148 @@ func (dl *dlClient) idleTaskRun() {
293294
}
294295
}()
295296

296-
dl.handler.lk.Lock()
297-
defer dl.handler.lk.Unlock()
298-
dl.lk.Lock()
299-
defer dl.lk.Unlock()
300-
301-
// increase timer now to avoid deletion
302-
dl.expire = time.Now().Add(time.Minute)
303-
304297
startTime := time.Now()
305298
blocksDownloaded := 0
306-
blkSize := dl.handler.getBlockSize()
307-
buf := make([]byte, blkSize)
299+
var buf []byte
308300

309-
// Helper to save progress if we downloaded anything
301+
// Save progress at the end if we downloaded anything
310302
defer func() {
311303
if blocksDownloaded > 0 {
304+
dl.handler.lk.Lock()
312305
dl.handler.savePart()
306+
dl.handler.lk.Unlock()
313307
dl.dlm.logf("idle: downloaded %d blocks in %v", blocksDownloaded, time.Since(startTime))
314308
}
315309
}()
316310

317-
// Find or create a reader for idle downloading
318-
// Try to find an existing reader at a useful position first
319-
var idleReader *httpReader
320-
for _, r := range dl.readers {
321-
if r == nil || r.resp == nil {
322-
continue
323-
}
324-
cnt := dl.handler.wantsFollowing(r.pos)
325-
if cnt > 0 {
326-
idleReader = r
327-
break
328-
}
329-
}
330-
331311
// Download blocks until ~1 second has passed
332312
for time.Since(startTime) < time.Second {
333-
// Check if we have an existing reader at a useful position
334-
if idleReader != nil && idleReader.resp != nil {
335-
cnt := dl.handler.wantsFollowing(idleReader.pos)
336-
if cnt <= 0 {
337-
// Current position already downloaded, close and find new position
338-
idleReader.resp.Body.Close()
339-
dl.removeReader(idleReader)
340-
idleReader = nil
313+
// Acquire locks to find/take a reader
314+
dl.handler.lk.Lock()
315+
dl.lk.Lock()
316+
317+
dl.expire = time.Now().Add(time.Minute)
318+
319+
// Initialize buffer on first iteration
320+
if buf == nil {
321+
buf = make([]byte, dl.handler.getBlockSize())
322+
}
323+
324+
// Find and take a reader at a useful position
325+
var idleReader *httpReader
326+
for i, r := range dl.readers {
327+
if r == nil || r.resp == nil {
341328
continue
342329
}
330+
cnt := dl.handler.wantsFollowing(r.pos)
331+
if cnt > 0 {
332+
idleReader = r
333+
// Remove from slice - we're taking ownership
334+
dl.readers = append(dl.readers[:i], dl.readers[i+1:]...)
335+
break
336+
}
337+
}
343338

344-
// Read the block
345-
rPos := idleReader.pos
346-
readBuf := buf[:cnt]
347-
n, err := io.ReadFull(idleReader.resp.Body, readBuf)
348-
if err != nil && err != io.ErrUnexpectedEOF {
349-
dl.dlm.logf("idle read failed: %s", err)
350-
idleReader.resp.Body.Close()
351-
dl.removeReader(idleReader)
352-
idleReader = nil
353-
if n == 0 {
354-
continue
339+
// If no suitable reader, find first missing block and create one
340+
var off int64 = -1
341+
if idleReader == nil {
342+
off = dl.handler.firstMissing()
343+
if off < 0 {
344+
if dl.handler.isComplete() {
345+
dl.complete = true
355346
}
347+
dl.lk.Unlock()
348+
dl.handler.lk.Unlock()
349+
return
350+
}
351+
}
352+
353+
// Release locks before I/O
354+
dl.lk.Unlock()
355+
dl.handler.lk.Unlock()
356+
357+
// Create new reader if needed (outside of lock)
358+
if idleReader == nil {
359+
req, err := http.NewRequest("GET", dl.url, nil)
360+
if err != nil {
361+
dl.dlm.logf("idle: failed to create request: %s", err)
362+
return
363+
}
364+
365+
if off != 0 {
366+
req.Header.Set("Range", fmt.Sprintf("bytes=%d-", off))
356367
}
357-
idleReader.pos += int64(n)
358-
idleReader.lastAccess = time.Now()
359368

360-
// Ingest without saving (we'll save once at the end)
361-
err = dl.handler.ingestDataBatch(readBuf[:n], rPos)
369+
dl.dlm.logf("idle: initializing HTTP connection download at byte %d~", off)
370+
371+
resp, err := dl.dlm.Client.Do(req)
362372
if err != nil {
363-
dl.dlm.logf("idle write failed: %s", err)
364-
dl.failure = true
373+
dl.dlm.logf("idle download failed: %s", err)
374+
return
375+
}
376+
if resp.StatusCode > 299 {
377+
resp.Body.Close()
378+
dl.dlm.logf("idle download failed due to status %s", resp.Status)
365379
return
366380
}
367-
blocksDownloaded++
368-
continue
369-
}
370381

371-
// No reader, find first missing block
372-
off := dl.handler.firstMissing()
373-
if off < 0 {
374-
if dl.handler.isComplete() {
375-
dl.complete = true
382+
idleReader = &httpReader{
383+
resp: resp,
384+
pos: off,
385+
lastAccess: time.Now(),
376386
}
377-
return
378387
}
379388

380-
// Create new reader for idle downloading (don't count against MaxReadersPerFile limit)
381-
req, err := http.NewRequest("GET", dl.url, nil)
382-
if err != nil {
383-
dl.dlm.logf("idle: failed to create request: %s", err)
384-
return
389+
// Read a block (outside of lock)
390+
dl.handler.lk.RLock()
391+
cnt := dl.handler.wantsFollowing(idleReader.pos)
392+
dl.handler.lk.RUnlock()
393+
394+
if cnt <= 0 {
395+
// Position already downloaded, close reader and continue
396+
idleReader.resp.Body.Close()
397+
continue
385398
}
386399

387-
if off != 0 {
388-
req.Header.Set("Range", fmt.Sprintf("bytes=%d-", off))
400+
readBuf := buf[:cnt]
401+
rPos := idleReader.pos
402+
n, err := io.ReadFull(idleReader.resp.Body, readBuf)
403+
if err != nil && err != io.ErrUnexpectedEOF {
404+
dl.dlm.logf("idle read failed: %s", err)
405+
idleReader.resp.Body.Close()
406+
if n == 0 {
407+
continue
408+
}
389409
}
410+
idleReader.pos += int64(n)
411+
idleReader.lastAccess = time.Now()
390412

391-
dl.dlm.logf("idle: initializing HTTP connection download at byte %d~", off)
413+
// Ingest the data (needs lock)
414+
dl.handler.lk.Lock()
415+
err = dl.handler.ingestDataBatch(readBuf[:n], rPos)
416+
dl.handler.lk.Unlock()
392417

393-
resp, err := dl.dlm.Client.Do(req)
394418
if err != nil {
395-
dl.dlm.logf("idle download failed: %s", err)
419+
dl.dlm.logf("idle write failed: %s", err)
420+
idleReader.resp.Body.Close()
421+
dl.lk.Lock()
422+
dl.failure = true
423+
dl.lk.Unlock()
396424
return
397425
}
398-
if resp.StatusCode > 299 {
399-
resp.Body.Close()
400-
dl.dlm.logf("idle download failed due to status %s", resp.Status)
401-
return
402-
}
403-
404-
idleReader = &httpReader{
405-
resp: resp,
406-
pos: off,
407-
lastAccess: time.Now(),
426+
blocksDownloaded++
427+
428+
// Return reader to the pool if still useful, otherwise close it
429+
dl.handler.lk.RLock()
430+
stillUseful := dl.handler.wantsFollowing(idleReader.pos) > 0
431+
dl.handler.lk.RUnlock()
432+
433+
if stillUseful {
434+
dl.lk.Lock()
435+
dl.readers = append(dl.readers, idleReader)
436+
dl.lk.Unlock()
437+
} else {
438+
idleReader.resp.Body.Close()
408439
}
409-
dl.readers = append(dl.readers, idleReader)
410440
}
411441
}

0 commit comments

Comments
 (0)