Skip to content

Commit 23d4abc

Browse files
sandy2008 and claude committed
[BUGFIX] Ingester: avoid send-on-closed-channel panic in ActiveQueriedSeriesService
ActiveQueriedSeriesService.stopping() previously closed updateChan while concurrent UpdateSeriesBatch callers could still be inside a non-blocking select+default send. select+default does NOT protect against panicking on send to a closed channel — that always panics. As a result, any in-flight UpdateSeriesBatch during ingester shutdown could crash the process with "panic: send on closed channel".

Stop closing the channel. Workers already exit via the existing <-ctx.Done() arm (BasicService cancels the service context before invoking stopping), and m.workers.Wait() still synchronises shutdown. Once Wait() has returned, a non-blocking drain hands any pooled hash slices still queued in the channel back to the pool, keeping shutdown allocation behavior clean. Late sends that arrive after the drain exits are tolerated: UpdateSeriesBatch uses a non-blocking select+default send so producers never block, and any leftover entries in the buffered channel are reclaimed when the service is GC'd.

Add TestActiveQueriedSeriesService_NoSendOnClosedChannelOnShutdown: a deterministic regression test that races 32 concurrent producers against StopAndAwaitTerminated, with a two-phase hammer (before and after shutdown) and per-goroutine panic recovery. The test fails deterministically (5/5 iterations) when the close() is reintroduced and passes 100/100 iterations under -race with the fix.

Fixes #7531

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Sandy Chen <Yuxuan.Chen@morganstanley.com>
1 parent d0d09de commit 23d4abc

3 files changed

Lines changed: 114 additions & 9 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -39,6 +39,7 @@
3939
* [BUGFIX] Security: Fix stored XSS vulnerability in Alertmanager and Store Gateway status pages by replacing `text/template` with `html/template`. #7512
4040
* [BUGFIX] Security: Limit decompressed gzip output in `ParseProtoReader` and OTLP ingestion path. The decompressed body is now capped by `-distributor.otlp-max-recv-msg-size`. #7515
4141
* [BUGFIX] Tenant Federation: Fix regex resolver clearing known users list when user scan fails. #7534
42+
* [BUGFIX] Ingester: Fix `panic: send on closed channel` in `ActiveQueriedSeriesService` on shutdown by removing the redundant channel close in `stopping()` and relying on `ctx.Done()` to signal worker exit. #7533
4243

4344
## 1.21.0 2026-04-24
4445

pkg/ingester/active_queried_series.go

Lines changed: 20 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -421,12 +421,27 @@ func (m *ActiveQueriedSeriesService) running(ctx context.Context) error {
421421
}
422422

423423
// stopping waits for all worker goroutines to finish.
424+
// Note: we intentionally do not close m.updateChan here. Closing it would race
425+
// with concurrent producers calling UpdateSeriesBatch (a select+default does
426+
// not prevent panics on send to a closed channel). Workers exit via ctx.Done()
427+
// which is signaled by the BasicService lifecycle when stopping.
424428
func (m *ActiveQueriedSeriesService) stopping(_ error) error {
425-
// Close the channel to signal workers to stop
426-
close(m.updateChan)
427-
// Wait for all workers to finish
429+
// Wait for all workers to finish. They will exit via ctx.Done().
428430
m.workers.Wait()
429-
return nil
431+
// Drain any remaining updates so pooled slices are returned even if the
432+
// channel was non-empty at shutdown. We never close the channel, so use a
433+
// non-blocking drain. Sends that arrive after this drain exits are
434+
// tolerated: UpdateSeriesBatch uses a non-blocking select+default send so
435+
// producers never block, and any entries left in the buffered channel are
436+
// reclaimed when the service is garbage-collected.
437+
for {
438+
select {
439+
case update := <-m.updateChan:
440+
putQueriedSeriesHashesSlice(update.hashes)
441+
default:
442+
return nil
443+
}
444+
}
430445
}
431446

432447
// processUpdates is a worker goroutine that processes updates from the update channel.
@@ -437,11 +452,7 @@ func (m *ActiveQueriedSeriesService) processUpdates(ctx context.Context) {
437452
select {
438453
case <-ctx.Done():
439454
return
440-
case update, ok := <-m.updateChan:
441-
if !ok {
442-
// Channel closed, exit
443-
return
444-
}
455+
case update := <-m.updateChan:
445456
// Process the update synchronously
446457
update.activeQueriedSeries.UpdateSeriesBatch(update.hashes, update.now)
447458
// Return the slice to the pool after processing

pkg/ingester/active_queried_series_test.go

Lines changed: 93 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1,14 +1,19 @@
11
package ingester
22

33
import (
4+
"context"
45
"fmt"
56
"sync"
7+
"sync/atomic"
68
"testing"
79
"time"
810

911
"github.com/go-kit/log"
1012
"github.com/prometheus/prometheus/model/labels"
1113
"github.com/stretchr/testify/assert"
14+
"github.com/stretchr/testify/require"
15+
16+
"github.com/cortexproject/cortex/pkg/util/services"
1217
)
1318

1419
func TestActiveQueriedSeries_UpdateSeries(t *testing.T) {
@@ -453,3 +458,91 @@ func TestActiveQueriedSeries_EdgeCaseTimes(t *testing.T) {
453458
// Should handle gracefully
454459
assert.GreaterOrEqual(t, active, uint64(0))
455460
}
461+
462+
// TestActiveQueriedSeriesService_NoSendOnClosedChannelOnShutdown is a regression
463+
// test for https://github.com/cortexproject/cortex/issues/7531. The service
464+
// previously closed updateChan in stopping() while concurrent producers were
465+
// still inside select+default sends, causing a "send on closed channel" panic.
466+
// We hammer UpdateSeriesBatch concurrently with shutdown and assert no panic.
467+
//
468+
// Each producer performs a bounded number of sends so the test always
469+
// terminates promptly, even when the post-stop channel-full path is exercised
470+
// heavily (every send takes the `default` branch and logs).
471+
func TestActiveQueriedSeriesService_NoSendOnClosedChannelOnShutdown(t *testing.T) {
472+
const (
473+
producers = 32
474+
sendsPerProd = 5000
475+
windowDuration = 1 * time.Minute
476+
numWindows = 3
477+
)
478+
totalDuration := time.Duration(numWindows) * windowDuration
479+
480+
svc := NewActiveQueriedSeriesService(log.NewNopLogger(), nil)
481+
require.NoError(t, services.StartAndAwaitRunning(context.Background(), svc))
482+
483+
a := NewActiveQueriedSeries([]time.Duration{totalDuration}, windowDuration, 1.0, log.NewNopLogger())
484+
485+
// Pre-compute a hash to keep the producer loop tight and avoid expensive
486+
// allocations that would dwarf the actual race window we're trying to hit.
487+
hash := labels.FromStrings("a", "1").Hash()
488+
489+
var (
490+
panicked atomic.Bool
491+
panicMsg atomic.Value // string
492+
wg sync.WaitGroup
493+
producing sync.WaitGroup
494+
gate = make(chan struct{}) // released after Stop() returns
495+
)
496+
wg.Add(producers)
497+
producing.Add(producers)
498+
for range producers {
499+
go func() {
500+
defer wg.Done()
501+
// Capture any panic (e.g. "send on closed channel") so the test
502+
// binary survives and we can fail the test cleanly.
503+
defer func() {
504+
if r := recover(); r != nil {
505+
panicked.Store(true)
506+
panicMsg.Store(fmt.Sprintf("%v", r))
507+
}
508+
}()
509+
510+
now := time.Now()
511+
producing.Done()
512+
// First phase: hammer while the service is still running so the
513+
// channel and workers are active.
514+
for range sendsPerProd {
515+
hashes := getQueriedSeriesHashesSlice()
516+
hashes = append(hashes, hash)
517+
svc.UpdateSeriesBatch(a, hashes, now, "tenant")
518+
}
519+
// Wait until the test goroutine has stopped the service, then send
520+
// a second burst to maximize the race window during/after
521+
// shutdown. With the bug, these sends would panic.
522+
<-gate
523+
for range sendsPerProd {
524+
hashes := getQueriedSeriesHashesSlice()
525+
hashes = append(hashes, hash)
526+
svc.UpdateSeriesBatch(a, hashes, now, "tenant")
527+
}
528+
}()
529+
}
530+
531+
// Wait for all producers to be actively hammering before initiating
532+
// shutdown, so Stop overlaps with concurrent sends.
533+
producing.Wait()
534+
535+
// Stop the service while producers are still in their first burst. With
536+
// the bug (close(updateChan) in stopping), the panic can fire here.
537+
require.NoError(t, services.StopAndAwaitTerminated(context.Background(), svc))
538+
539+
// Release the producers' second burst — these sends target the now-stopped
540+
// service and would unambiguously panic against a closed channel.
541+
close(gate)
542+
543+
wg.Wait()
544+
545+
if panicked.Load() {
546+
t.Fatalf("producer goroutine panicked during shutdown: %v", panicMsg.Load())
547+
}
548+
}

0 commit comments

Comments
 (0)