Skip to content

Commit 612ad2f

Browse files
sandy2008 and claude committed
fix(ruler): stop per-tenant notifier when manager creation fails
newManager started the per-user notifier (and its discovery/notification goroutines) and registered it in r.notifiers before creating the rule manager. When managerFactory returned an error the user was never added to r.userManagers, so the removal loop in SyncRuleGroups never stopped the notifier, leaking it and its goroutines until the process exited. The same applied to the applyConfig error path in getOrCreateNotifier. Tear down the partially-initialized notifier and per-user metrics registry on any early return from newManager (disarmed on success), and stop the notifier directly when applyConfig fails. Adds regression tests for both leak paths. Fixes #7595 Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> Signed-off-by: Sandy Chen <Yuxuan.Chen@morganstanley.com>
1 parent 40b1ecb commit 612ad2f

3 files changed

Lines changed: 230 additions & 1 deletion

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
* [ENHANCEMENT] Distributor: Added `cortex_distributor_received_histogram_buckets` metric to track number of buckets in received native histogram samples before validation, per user. #7569
3434
* [ENHANCEMENT] Distributor: Add `WrappedHistogram` with configurable size limit (`-validation.max-native-histogram-size-bytes`) to cap native histogram protobuf size before unmarshalling. #7570
3535
* [ENHANCEMENT] Ingester: Add lazy regex evaluation on head postings cache miss. Defers expensive regex matchers on high-cardinality labels to per-series filtering when a selective equality matcher already narrows the result set. Configured via `-blocks-storage.expanded_postings_cache.head.lazy-matcher-max-cardinality` (disabled by default). #7553
36+
* [BUGFIX] Ruler: Stop the per-tenant notifier (and its Alertmanager service-discovery goroutines) and remove the per-user metrics registry when rule manager creation fails, instead of leaking them until process shutdown. #7597
3637
* [BUGFIX] Querier: Fix queryWithRetry and labelsWithRetry returning (nil, nil) on cancelled context by propagating ctx.Err(). #7370
3738
* [BUGFIX] Metrics Helper: Fix non-deterministic bucket order in merged histograms by sorting buckets after map iteration, matching Prometheus client library behavior. #7380
3839
* [BUGFIX] Distributor: Return HTTP 401 Unauthorized when tenant ID resolution fails in the Prometheus Remote Write 2.0 path. #7389

pkg/ruler/manager.go

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -300,12 +300,37 @@ func (r *DefaultMultiTenantManager) newManager(ctx context.Context, userID strin
300300
reg := prometheus.NewRegistry()
301301
r.userManagerMetrics.AddUserRegistry(userID, reg)
302302

303+
// getOrCreateNotifier starts the per-user notifier (and its discovery and
304+
// notification goroutines) and registers it in r.notifiers before the rule
305+
// manager is created. If anything below fails, this user is never added to
306+
// r.userManagers, so the removal loop in SyncRuleGroups would never stop the
307+
// notifier, leaking it and its goroutines until the process exits (issue
308+
// #7595). Tear the partially-initialized state down on any early return; the
309+
// cleanup is disarmed once a manager is successfully returned. removeNotifier
310+
// takes r.notifiersMtx while this method runs under r.userManagerMtx; those two
311+
// locks are always acquired in that order (userManagerMtx, then notifiersMtx)
312+
// and never the reverse, so this cannot deadlock. removeNotifier is a no-op
313+
// when no notifier was registered.
314+
success := false
315+
defer func() {
316+
if !success {
317+
r.removeNotifier(userID)
318+
r.userManagerMetrics.RemoveUserRegistry(userID)
319+
}
320+
}()
321+
303322
notifier, err := r.getOrCreateNotifier(userID, reg)
304323
if err != nil {
305324
return nil, err
306325
}
307326

308-
return r.managerFactory(ctx, userID, notifier, r.logger, r.frontendPool, reg)
327+
manager, err := r.managerFactory(ctx, userID, notifier, r.logger, r.frontendPool, reg)
328+
if err != nil {
329+
return nil, err
330+
}
331+
332+
success = true
333+
return manager, nil
309334
}
310335

311336
func (r *DefaultMultiTenantManager) removeNotifier(userID string) {
@@ -374,6 +399,12 @@ func (r *DefaultMultiTenantManager) getOrCreateNotifier(userID string, userManag
374399

375400
// This should never fail, unless there's a programming mistake.
376401
if err := n.applyConfig(r.notifierCfg); err != nil {
402+
// n.run() already started the notifier's discovery and notification
403+
// goroutines, but n has not been added to r.notifiers yet, so neither
404+
// removeNotifier nor Stop would ever stop it. Stop it directly to avoid
405+
// leaking those goroutines. We must not call removeNotifier here because
406+
// we already hold r.notifiersMtx, which it would try to re-acquire.
407+
n.stop()
377408
return nil, err
378409
}
379410

pkg/ruler/manager_test.go

Lines changed: 197 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,18 @@
11
package ruler
22

33
import (
4+
"bytes"
45
"context"
6+
"errors"
7+
"runtime/pprof"
58
"sync"
69
"testing"
710
"time"
811

912
"github.com/go-kit/log"
1013
"github.com/prometheus/client_golang/prometheus"
14+
config_util "github.com/prometheus/common/config"
15+
promConfig "github.com/prometheus/prometheus/config"
1116
"github.com/prometheus/prometheus/model/labels"
1217
"github.com/prometheus/prometheus/model/rulefmt"
1318
"github.com/prometheus/prometheus/notifier"
@@ -304,6 +309,198 @@ func TestBackupRules(t *testing.T) {
304309
require.Equal(t, userRules[user2], m.GetBackupRules(user2))
305310
}
306311

312+
// TestSyncRuleGroupsCleansUpNotifierOnManagerFactoryError is a regression test for
313+
// https://github.com/cortexproject/cortex/issues/7595. When the manager factory
314+
// returns an error, newManager has already created and started the per-user
315+
// notifier (via getOrCreateNotifier -> n.run()) and registered it in r.notifiers.
316+
// Because the user is never added to r.userManagers, the removal loop in
317+
// SyncRuleGroups never stops that notifier, so it and its discovery/notification
318+
// goroutines used to leak until the process exited.
319+
func TestSyncRuleGroupsCleansUpNotifierOnManagerFactoryError(t *testing.T) {
320+
dir := t.TempDir()
321+
const user = "testUser"
322+
323+
factoryErr := errors.New("manager factory failed")
324+
failingFactory := func(_ context.Context, _ string, _ *notifier.Manager, _ log.Logger, _ *client.Pool, _ prometheus.Registerer) (RulesManager, error) {
325+
return nil, factoryErr
326+
}
327+
328+
// Use a dedicated registry (not nil): a nil registry registers the notifier
329+
// service-discovery metrics on the global default registerer, which can
330+
// os.Exit(1) on duplicate registration when running alongside other tests.
331+
m, err := NewDefaultMultiTenantManager(Config{RulePath: dir}, &ruleLimits{}, failingFactory, nil, prometheus.NewRegistry(), log.NewNopLogger())
332+
require.NoError(t, err)
333+
t.Cleanup(m.Stop)
334+
335+
// Baseline notifier-run goroutines before the (failing) sync. Assert a delta
336+
// back to this baseline rather than an absolute zero so the check is robust to
337+
// any unrelated notifier goroutines from other tests sharing this process.
338+
before := countNotifierRunGoroutines()
339+
340+
userRules := map[string]rulespb.RuleGroupList{
341+
user: {
342+
&rulespb.RuleGroupDesc{
343+
Name: "group1",
344+
Namespace: "ns",
345+
Interval: 1 * time.Minute,
346+
User: user,
347+
},
348+
},
349+
}
350+
m.SyncRuleGroups(context.Background(), userRules)
351+
352+
// The factory failed, so the user must not be tracked as a live manager.
353+
require.Nil(t, getManager(m, user))
354+
355+
// The notifier must have been stopped and removed from the map.
356+
m.notifiersMtx.Lock()
357+
_, notifierExists := m.notifiers[user]
358+
m.notifiersMtx.Unlock()
359+
require.False(t, notifierExists, "notifier must be removed after a managerFactory error")
360+
361+
// The per-user metrics registry must have been removed too.
362+
require.False(t, hasUserManagerRegistry(t, m, user), "per-user metrics registry must be removed after a managerFactory error")
363+
364+
// Its goroutines (started by rulerNotifier.run) must not leak. removeNotifier
365+
// -> stop() -> wg.Wait() is synchronous, so they are gone by now; poll back to
366+
// the baseline to absorb any scheduling latency.
367+
test.Poll(t, 5*time.Second, before, func() interface{} {
368+
return countNotifierRunGoroutines()
369+
})
370+
}
371+
372+
// TestGetOrCreateNotifierStopsNotifierOnApplyConfigError is a regression test for
373+
// the secondary leak path in https://github.com/cortexproject/cortex/issues/7595:
374+
// getOrCreateNotifier starts the notifier with n.run() before calling
375+
// n.applyConfig. If applyConfig fails, the notifier was never inserted into
376+
// r.notifiers, so it must be stopped directly to avoid leaking its goroutines.
377+
func TestGetOrCreateNotifierStopsNotifierOnApplyConfigError(t *testing.T) {
378+
const user = "testUser"
379+
m, err := NewDefaultMultiTenantManager(Config{RulePath: t.TempDir()}, &ruleLimits{}, RuleManagerFactory(nil, nil), nil, prometheus.NewRegistry(), log.NewNopLogger())
380+
require.NoError(t, err)
381+
t.Cleanup(m.Stop)
382+
383+
// Force applyConfig to fail by pointing the Alertmanager TLS config at a CA
384+
// file that does not exist.
385+
m.notifierCfg = &promConfig.Config{
386+
AlertingConfig: promConfig.AlertingConfig{
387+
AlertmanagerConfigs: promConfig.AlertmanagerConfigs{
388+
{
389+
HTTPClientConfig: config_util.HTTPClientConfig{
390+
TLSConfig: config_util.TLSConfig{CAFile: "/does/not/exist"},
391+
},
392+
APIVersion: promConfig.AlertmanagerAPIVersionV2,
393+
},
394+
},
395+
},
396+
}
397+
398+
before := countNotifierRunGoroutines()
399+
_, err = m.getOrCreateNotifier(user, prometheus.NewRegistry())
400+
require.Error(t, err)
401+
402+
m.notifiersMtx.Lock()
403+
_, ok := m.notifiers[user]
404+
m.notifiersMtx.Unlock()
405+
require.False(t, ok, "notifier must not be registered when applyConfig fails")
406+
407+
test.Poll(t, 5*time.Second, before, func() interface{} {
408+
return countNotifierRunGoroutines()
409+
})
410+
}
411+
412+
// TestSyncRuleGroupsRecoversAfterManagerFactoryError verifies that a user whose
413+
// first manager creation failed — and whose notifier was therefore cleaned up by
414+
// the fix for https://github.com/cortexproject/cortex/issues/7595 — is created
415+
// normally on a later sync once the factory succeeds, i.e. the failure cleanup
416+
// does not leave the user in an unrecoverable state.
417+
func TestSyncRuleGroupsRecoversAfterManagerFactoryError(t *testing.T) {
418+
dir := t.TempDir()
419+
const user = "testUser"
420+
421+
fail := atomic.NewBool(true)
422+
base := RuleManagerFactory([][]*promRules.Group{{}, {}}, []time.Duration{time.Millisecond, time.Millisecond})
423+
factory := func(ctx context.Context, userID string, n *notifier.Manager, logger log.Logger, p *client.Pool, reg prometheus.Registerer) (RulesManager, error) {
424+
if fail.Load() {
425+
return nil, errors.New("manager factory failed")
426+
}
427+
return base(ctx, userID, n, logger, p, reg)
428+
}
429+
430+
m, err := NewDefaultMultiTenantManager(Config{RulePath: dir}, &ruleLimits{}, factory, nil, prometheus.NewRegistry(), log.NewNopLogger())
431+
require.NoError(t, err)
432+
t.Cleanup(m.Stop)
433+
434+
before := countNotifierRunGoroutines()
435+
436+
userRules := map[string]rulespb.RuleGroupList{
437+
user: {
438+
&rulespb.RuleGroupDesc{Name: "group1", Namespace: "ns", Interval: 1 * time.Minute, User: user},
439+
},
440+
}
441+
442+
// First sync fails: no manager is tracked and the notifier must be cleaned up.
443+
m.SyncRuleGroups(context.Background(), userRules)
444+
require.Nil(t, getManager(m, user))
445+
m.notifiersMtx.Lock()
446+
_, notifierExists := m.notifiers[user]
447+
m.notifiersMtx.Unlock()
448+
require.False(t, notifierExists)
449+
test.Poll(t, 5*time.Second, before, func() interface{} {
450+
return countNotifierRunGoroutines()
451+
})
452+
453+
// Once the factory succeeds, the user is created normally and gets a notifier.
454+
fail.Store(false)
455+
m.SyncRuleGroups(context.Background(), userRules)
456+
require.NotNil(t, getManager(m, user))
457+
m.notifiersMtx.Lock()
458+
_, notifierExists = m.notifiers[user]
459+
m.notifiersMtx.Unlock()
460+
require.True(t, notifierExists, "notifier should be created when the user recovers")
461+
}
462+
463+
// countNotifierRunGoroutines reports how many goroutines currently mention
// rulerNotifier.run in their stack dump, i.e. the notifier's discovery and
// notification loops. Tests use it to verify that stopping a notifier really
// ends its goroutines instead of leaking them.
func countNotifierRunGoroutines() int {
	// Fully qualified symbol searched for in each goroutine's stack text. It
	// shows up both in the run.funcN frames and in the "created by ...run"
	// line, so a goroutine still matches if its closure frame is inlined away.
	runSymbol := []byte("github.com/cortexproject/cortex/pkg/ruler.(*rulerNotifier).run")

	dump := new(bytes.Buffer)
	// The error is deliberately dropped: the destination is an in-memory buffer.
	_ = pprof.Lookup("goroutine").WriteTo(dump, 2)

	// The dump is split on blank lines so that every goroutine's stack is one
	// chunk; a chunk contributes at most one to the total no matter how many
	// times the symbol appears inside it.
	stacks := bytes.Split(dump.Bytes(), []byte("\n\n"))
	running := 0
	for i := range stacks {
		if !bytes.Contains(stacks[i], runSymbol) {
			continue
		}
		running++
	}
	return running
}
480+
481+
// hasUserManagerRegistry reports whether a per-user metrics registry for the
482+
// given user is still registered with the manager's metrics aggregator. The
483+
// notifier registers prometheus_notifications_* metrics into that per-user
484+
// registry, so its presence is observable via the aggregated, user-labelled
485+
// output.
486+
func hasUserManagerRegistry(t *testing.T, m *DefaultMultiTenantManager, user string) bool {
487+
t.Helper()
488+
tmp := prometheus.NewRegistry()
489+
tmp.MustRegister(m.userManagerMetrics)
490+
mfs, err := tmp.Gather()
491+
require.NoError(t, err)
492+
for _, mf := range mfs {
493+
for _, metric := range mf.GetMetric() {
494+
for _, lp := range metric.GetLabel() {
495+
if lp.GetName() == "user" && lp.GetValue() == user {
496+
return true
497+
}
498+
}
499+
}
500+
}
501+
return false
502+
}
503+
307504
func getManager(m *DefaultMultiTenantManager, user string) RulesManager {
308505
m.userManagerMtx.RLock()
309506
defer m.userManagerMtx.RUnlock()

0 commit comments

Comments
 (0)