Skip to content

Commit c7c60df

Browse files
sandy2008claude
andcommitted
fix(scheduler): stop FragmentTable cleanup goroutine on shutdown
NewFragmentTable starts a periodicCleanup goroutine (driven by a ticker) that had no stop mechanism: the loop ranged over ticker.C forever, the deferred ticker.Stop() was unreachable, and FragmentTable had no Close method, so the goroutine and ticker lived for the whole process lifetime and could not be reclaimed (issue #7596). Add an idempotent Close() (done channel + sync.Once); periodicCleanup now selects on the done channel and returns. The scheduler closes the fragment table in its stopping hook so the goroutine is reclaimed on shutdown. A package goleak guard plus a regression test cover it, and the existing tests now stop the table they create. Fixes #7596 Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> Signed-off-by: Sandy Chen <Yuxuan.Chen@morganstanley.com>
1 parent 40b1ecb commit c7c60df

5 files changed

Lines changed: 46 additions & 3 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
* [ENHANCEMENT] Distributor: Added `cortex_distributor_received_histogram_buckets` metric to track number of buckets in received native histogram samples before validation, per user. #7569
3434
* [ENHANCEMENT] Distributor: Add `WrappedHistogram` with configurable size limit (`-validation.max-native-histogram-size-bytes`) to cap native histogram protobuf size before unmarshalling. #7570
3535
* [ENHANCEMENT] Ingester: Add lazy regex evaluation on head postings cache miss. Defers expensive regex matchers on high-cardinality labels to per-series filtering when a selective equality matcher already narrows the result set. Configured via `-blocks-storage.expanded_postings_cache.head.lazy-matcher-max-cardinality` (disabled by default). #7553
36+
* [BUGFIX] Query Scheduler: Stop the fragment table's background cleanup goroutine (and its ticker) on shutdown; it previously had no stop mechanism and lived for the process lifetime. #7599
3637
* [BUGFIX] Querier: Fix queryWithRetry and labelsWithRetry returning (nil, nil) on cancelled context by propagating ctx.Err(). #7370
3738
* [BUGFIX] Metrics Helper: Fix non-deterministic bucket order in merged histograms by sorting buckets after map iteration, matching Prometheus client library behavior. #7380
3839
* [BUGFIX] Distributor: Return HTTP 401 Unauthorized when tenant ID resolution fails in the Prometheus Remote Write 2.0 path. #7389

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ require (
9696
github.com/tjhop/slog-gokit v0.1.4
9797
go.opentelemetry.io/collector/pdata v1.45.0
9898
go.uber.org/automaxprocs v1.6.0
99+
go.uber.org/goleak v1.3.0
99100
google.golang.org/protobuf v1.36.11
100101
)
101102

@@ -283,7 +284,6 @@ require (
283284
go.opentelemetry.io/otel/metric v1.43.0 // indirect
284285
go.opentelemetry.io/otel/sdk/metric v1.43.0 // indirect
285286
go.opentelemetry.io/proto/otlp v1.9.0 // indirect
286-
go.uber.org/goleak v1.3.0 // indirect
287287
go.uber.org/multierr v1.11.0 // indirect
288288
go.uber.org/zap v1.27.0 // indirect
289289
go.yaml.in/yaml/v2 v2.4.3 // indirect

pkg/scheduler/fragment_table/fragment_table.go

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ type FragmentTable struct {
1818
mappings map[distributed_execution.FragmentKey]*fragmentEntry
1919
mu sync.RWMutex
2020
expiration time.Duration
21+
done chan struct{}
22+
closeOnce sync.Once
2123
}
2224

2325
// NewFragmentTable creates a new FragmentTable with the specified expiration duration.
@@ -27,6 +29,7 @@ func NewFragmentTable(expiration time.Duration) *FragmentTable {
2729
ft := &FragmentTable{
2830
mappings: make(map[distributed_execution.FragmentKey]*fragmentEntry),
2931
expiration: expiration,
32+
done: make(chan struct{}),
3033
}
3134
go ft.periodicCleanup()
3235
return ft
@@ -55,6 +58,14 @@ func (f *FragmentTable) GetAddrByID(queryID uint64, fragmentID uint64) (string,
5558
return "", false
5659
}
5760

61+
// Close stops the background cleanup goroutine started by NewFragmentTable.
62+
// It is safe to call more than once.
63+
func (f *FragmentTable) Close() {
64+
f.closeOnce.Do(func() {
65+
close(f.done)
66+
})
67+
}
68+
5869
func (f *FragmentTable) cleanupExpired() {
5970
f.mu.Lock()
6071
defer f.mu.Unlock()
@@ -73,7 +84,12 @@ func (f *FragmentTable) cleanupExpired() {
7384
func (f *FragmentTable) periodicCleanup() {
7485
ticker := time.NewTicker(f.expiration / 2)
7586
defer ticker.Stop()
76-
for range ticker.C {
77-
f.cleanupExpired()
87+
for {
88+
select {
89+
case <-ticker.C:
90+
f.cleanupExpired()
91+
case <-f.done:
92+
return
93+
}
7894
}
7995
}

pkg/scheduler/fragment_table/fragment_table_test.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,15 @@ import (
77

88
"github.com/stretchr/testify/assert"
99
"github.com/stretchr/testify/require"
10+
"go.uber.org/goleak"
1011
)
1112

13+
// TestMain runs the package tests under goleak so that any FragmentTable whose
14+
// cleanup goroutine is not stopped (via Close) fails the package.
15+
func TestMain(m *testing.M) {
16+
goleak.VerifyTestMain(m)
17+
}
18+
1219
func TestNewFragmentTable(t *testing.T) {
1320
tests := []struct {
1421
name string
@@ -27,6 +34,7 @@ func TestNewFragmentTable(t *testing.T) {
2734
for _, tt := range tests {
2835
t.Run(tt.name, func(t *testing.T) {
2936
ft := NewFragmentTable(tt.expiration)
37+
t.Cleanup(ft.Close)
3038
require.NotNil(t, ft)
3139
require.NotNil(t, ft.mappings)
3240
assert.Equal(t, tt.expiration, ft.expiration)
@@ -36,6 +44,7 @@ func TestNewFragmentTable(t *testing.T) {
3644

3745
func TestFragmentTable_AddAndGetAddress(t *testing.T) {
3846
ft := NewFragmentTable(time.Hour)
47+
t.Cleanup(ft.Close)
3948

4049
tests := []struct {
4150
name string
@@ -84,6 +93,7 @@ func TestFragmentTable_AddAndGetAddress(t *testing.T) {
8493
func TestFragmentTable_Expiration(t *testing.T) {
8594
expiration := 100 * time.Millisecond
8695
ft := NewFragmentTable(expiration)
96+
t.Cleanup(ft.Close)
8797

8898
t.Run("entries expire after timeout", func(t *testing.T) {
8999
ft.AddAddressByID(1, 1, "addr1")
@@ -106,6 +116,7 @@ func TestFragmentTable_Expiration(t *testing.T) {
106116

107117
func TestFragmentTable_ConcurrentAccess(t *testing.T) {
108118
ft := NewFragmentTable(time.Hour)
119+
t.Cleanup(ft.Close)
109120

110121
const (
111122
numGoroutines = 10
@@ -140,6 +151,7 @@ func TestFragmentTable_ConcurrentAccess(t *testing.T) {
140151
func TestFragmentTable_PeriodicCleanup(t *testing.T) {
141152
expiration := 100 * time.Millisecond
142153
ft := NewFragmentTable(expiration)
154+
t.Cleanup(ft.Close)
143155

144156
ft.AddAddressByID(1, 1, "addr1")
145157
ft.AddAddressByID(1, 2, "addr2")
@@ -163,3 +175,15 @@ func TestFragmentTable_PeriodicCleanup(t *testing.T) {
163175
_, ok = ft.GetAddrByID(1, 2)
164176
require.False(t, ok)
165177
}
178+
179+
// TestFragmentTable_Close verifies that Close stops the background cleanup
180+
// goroutine — asserted package-wide by goleak in TestMain — and is safe to call
181+
// more than once.
182+
func TestFragmentTable_Close(t *testing.T) {
183+
ft := NewFragmentTable(time.Hour)
184+
185+
ft.Close()
186+
187+
// Close is idempotent: a second call must not panic (e.g. double close).
188+
require.NotPanics(t, ft.Close)
189+
}

pkg/scheduler/scheduler.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -695,6 +695,8 @@ func (s *Scheduler) running(ctx context.Context) error {
695695

696696
// Close the Scheduler.
697697
func (s *Scheduler) stopping(_ error) error {
698+
// Stop the fragment table's background cleanup goroutine.
699+
s.fragmentTable.Close()
698700
// This will also stop the requests queue, which stop accepting new requests and errors out any tracked requests.
699701
return services.StopManagerAndAwaitStopped(context.Background(), s.subservices)
700702
}

0 commit comments

Comments
 (0)