Skip to content

Commit 0732fe3

Browse files
sandy2008claude
andauthored
fix(querier): evict per-tenant metadata fetchers for inactive tenants (#7573)
BucketScanBlocksFinder lazily creates one block.MetadataFetcher per tenant (each with its own Prometheus registry and on-disk meta cache) and caches it in d.fetchers. These entries were never removed, so for a process that has seen N tenants over its lifetime the fetcher map, the aggregated per-tenant Prometheus registries, and the on-disk meta caches all grew unbounded even after tenants were deleted from storage. After every scan, reconcile d.fetchers against the active-user set returned by ScanUsers: evict the fetcher, unregister its per-tenant registry via RemoveUserRegistry, and delete the fetcher's on-disk meta-syncer cache for tenants that are no longer active. The active set comes from a successful ScanUsers call, so reconciliation runs even when individual per-tenant scans fail, keeping the leak bounded under tenant churn; it is skipped only when the context is cancelled. Only the fetcher's own "meta-syncer" cache sub-directory is removed (not the whole CacheDir/<userID> tree) because in single-binary mode CacheDir is the store-gateway's SyncDir, whose block data lives under the same per-tenant directory and must not be deleted here. Fixes #7572 Signed-off-by: Sandy Chen <Yuxuan.Chen@morganstanley.com> Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent 97b14b1 commit 0732fe3

3 files changed

Lines changed: 267 additions & 0 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
* [BUGFIX] Ring: Fix ring token conflict resolution only applied to updated instance and make constantly token conflict check during instance observe period.
5555
* [BUGFIX] Distributor: Fix a panic (`slice bounds out of range`) in the stream push path when the context deadline expires while the worker goroutine is still marshalling a `WriteRequest`. #7541
5656
* [BUGFIX] Query Frontend: Fix native histogram responses not being handled correctly in `minTime()` sort ordering for split_by_interval merge. #7555
57+
* [BUGFIX] Querier: Fix unbounded resource leak in the bucket-scan blocks finder (used when the bucket index is disabled). Per-tenant metadata fetchers, their Prometheus registries, and on-disk meta caches are now evicted once a tenant is no longer active, instead of being retained for the lifetime of the process. #7573
5758

5859
## 1.21.0 2026-04-24
5960

pkg/querier/blocks_finder_bucket_scan.go

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package querier
33
import (
44
"context"
55
"maps"
6+
"os"
67
"path"
78
"path/filepath"
89
"slices"
@@ -267,6 +268,17 @@ pushJobsLoop:
267268
}
268269
d.userMx.Unlock()
269270

271+
// Reconcile the cached metadata fetchers (and their per-tenant Prometheus registries and
272+
// on-disk meta caches) against the set of currently active tenants, so these resources stay
273+
// bounded as tenants are deleted from storage. userIDs comes from a successful ScanUsers call
274+
// (we return early above if it failed), so it is the authoritative active set regardless of
275+
// any per-tenant scan errors collected in resErrs; we therefore reconcile even on the
276+
// partial-error path, so the leak stays bounded under tenant churn. We only skip when the
277+
// context has been cancelled (i.e. the service is shutting down).
278+
if ctx.Err() == nil {
279+
d.evictInactiveUserFetchers(userIDs)
280+
}
281+
270282
return resErrs.Err()
271283
}
272284

@@ -417,6 +429,61 @@ func (d *BucketScanBlocksFinder) createMetaFetcher(userID string) (block.Metadat
417429
return f, userBucket, deletionMarkFilter, nil
418430
}
419431

432+
// metaSyncerCacheDirName is the sub-directory, under the per-tenant cache directory, where
433+
// block.NewMetaFetcher stores its cached meta.json files (see createMetaFetcher).
434+
const metaSyncerCacheDirName = "meta-syncer"
435+
436+
// evictInactiveUserFetchers reconciles the per-tenant metadata fetchers against the set of
437+
// currently active tenants. For every tenant that is no longer active it removes the cached
438+
// fetcher, unregisters its per-tenant Prometheus registry, and deletes the fetcher's on-disk meta
439+
// cache. Without this, d.fetchers, d.fetchersMetrics and the on-disk cache would grow unbounded
440+
// for the lifetime of the process as tenants are deleted from storage.
441+
func (d *BucketScanBlocksFinder) evictInactiveUserFetchers(activeUserIDs []string) {
442+
active := make(map[string]struct{}, len(activeUserIDs))
443+
for _, userID := range activeUserIDs {
444+
active[userID] = struct{}{}
445+
}
446+
447+
// Evict the in-memory fetchers and their per-tenant Prometheus registries.
448+
var evicted []string
449+
d.fetchersMx.Lock()
450+
for userID := range d.fetchers {
451+
if _, ok := active[userID]; ok {
452+
continue
453+
}
454+
455+
d.fetchersMetrics.RemoveUserRegistry(userID)
456+
delete(d.fetchers, userID)
457+
evicted = append(evicted, userID)
458+
}
459+
d.fetchersMx.Unlock()
460+
461+
if len(evicted) == 0 {
462+
return
463+
}
464+
465+
level.Info(d.logger).Log("msg", "evicted metadata fetchers for inactive tenants", "count", len(evicted))
466+
467+
// Delete each evicted fetcher's on-disk meta cache, outside the lock to keep disk I/O off it.
468+
// We remove only the fetcher's own "meta-syncer" sub-directory, not the whole CacheDir/<userID>
469+
// tree: in single-binary mode CacheDir is the store-gateway's SyncDir, whose block data also
470+
// lives under CacheDir/<userID>/ and must not be deleted here. We key this off the fetchers
471+
// this process evicted rather than sweeping CacheDir, so we never reach into a co-located
472+
// store-gateway's cache; stale directories left by a previous process are reaped by the
473+
// store-gateway's own cleanup (single-binary) and are otherwise a negligible disk residual.
474+
for _, userID := range evicted {
475+
metaCacheDir := filepath.Join(d.cfg.CacheDir, userID, metaSyncerCacheDirName)
476+
if err := os.RemoveAll(metaCacheDir); err != nil {
477+
level.Warn(d.logger).Log("msg", "failed to delete cached metadata fetcher directory for inactive user", "user", userID, "dir", metaCacheDir, "err", err)
478+
continue
479+
}
480+
481+
// Best-effort removal of the now-empty per-tenant directory. os.Remove only succeeds on an
482+
// empty directory, so a co-located store-gateway's data under the same path is preserved.
483+
_ = os.Remove(filepath.Join(d.cfg.CacheDir, userID))
484+
}
485+
}
486+
420487
func (d *BucketScanBlocksFinder) getBlockMeta(userID string, blockID ulid.ULID) *bucketindex.Block {
421488
d.userMx.RLock()
422489
defer d.userMx.RUnlock()

pkg/querier/blocks_finder_bucket_scan_test.go

Lines changed: 199 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"fmt"
66
"os"
7+
"path/filepath"
78
"strings"
89
"testing"
910
"time"
@@ -416,6 +417,204 @@ func TestBucketScanBlocksFinder_PeriodicScanFindsUserWhichWasPreviouslyDeleted(t
416417
assert.Empty(t, deletionMarks)
417418
}
418419

420+
func TestBucketScanBlocksFinder_PeriodicScanEvictsDeletedUserFetcher(t *testing.T) {
421+
t.Parallel()
422+
ctx := context.Background()
423+
s, bucket, _, reg := prepareBucketScanBlocksFinder(t, prepareBucketScanBlocksFinderConfig())
424+
425+
cortex_testutil.MockStorageBlock(t, bucket, "user-1", 10, 20)
426+
cortex_testutil.MockStorageBlock(t, bucket, "user-1", 20, 30)
427+
428+
require.NoError(t, services.StartAndAwaitRunning(ctx, s))
429+
430+
// The initial scan must have created and cached a per-tenant metadata fetcher, registered a
431+
// per-tenant Prometheus registry, and created an on-disk meta cache for the active tenant.
432+
s.fetchersMx.Lock()
433+
require.Equal(t, 1, len(s.fetchers))
434+
s.fetchersMx.Unlock()
435+
436+
userCacheDir := filepath.Join(s.cfg.CacheDir, "user-1")
437+
metaSyncerDir := filepath.Join(userCacheDir, metaSyncerCacheDirName)
438+
require.DirExists(t, metaSyncerDir)
439+
440+
// The per-tenant registry contributes to the aggregated cortex_blocks_meta_synced gauge.
441+
syncedBefore, err := testutil.GatherAndCount(reg, "cortex_blocks_meta_synced")
442+
require.NoError(t, err)
443+
require.Greater(t, syncedBefore, 0)
444+
445+
// Delete the user from the bucket so it is no longer active.
446+
require.NoError(t, bucket.Delete(ctx, "user-1"))
447+
448+
// Trigger a periodic scan.
449+
require.NoError(t, s.scan(ctx))
450+
451+
// Once the user is no longer active, its cached fetcher, per-tenant Prometheus registry and
452+
// on-disk meta cache must all be released, otherwise they leak for the lifetime of the process.
453+
s.fetchersMx.Lock()
454+
assert.Equal(t, 0, len(s.fetchers))
455+
s.fetchersMx.Unlock()
456+
457+
// The fetcher's own meta-syncer cache must be removed; the now-empty parent dir is then
458+
// removed on a best-effort basis.
459+
assert.NoDirExists(t, metaSyncerDir)
460+
assert.NoDirExists(t, userCacheDir)
461+
462+
// The per-tenant registry was unregistered, so its gauge series no longer appear.
463+
syncedAfter, err := testutil.GatherAndCount(reg, "cortex_blocks_meta_synced")
464+
require.NoError(t, err)
465+
assert.Equal(t, 0, syncedAfter)
466+
}
467+
468+
func TestBucketScanBlocksFinder_PeriodicScanEvictsOnlyInactiveUserFetchers(t *testing.T) {
469+
t.Parallel()
470+
ctx := context.Background()
471+
s, bucket, _, _ := prepareBucketScanBlocksFinder(t, prepareBucketScanBlocksFinderConfig())
472+
473+
cortex_testutil.MockStorageBlock(t, bucket, "user-1", 10, 20)
474+
cortex_testutil.MockStorageBlock(t, bucket, "user-2", 10, 20)
475+
cortex_testutil.MockStorageBlock(t, bucket, "user-3", 10, 20)
476+
477+
require.NoError(t, services.StartAndAwaitRunning(ctx, s))
478+
479+
s.fetchersMx.Lock()
480+
require.Equal(t, 3, len(s.fetchers))
481+
s.fetchersMx.Unlock()
482+
483+
// Delete only user-2.
484+
require.NoError(t, bucket.Delete(ctx, "user-2"))
485+
require.NoError(t, s.scan(ctx))
486+
487+
// Only the inactive tenant's fetcher is evicted; the active tenants' fetchers are retained.
488+
s.fetchersMx.Lock()
489+
_, has1 := s.fetchers["user-1"]
490+
_, has2 := s.fetchers["user-2"]
491+
_, has3 := s.fetchers["user-3"]
492+
require.Equal(t, 2, len(s.fetchers))
493+
s.fetchersMx.Unlock()
494+
assert.True(t, has1)
495+
assert.False(t, has2)
496+
assert.True(t, has3)
497+
498+
// A returning tenant gets its fetcher re-created on the next scan.
499+
cortex_testutil.MockStorageBlock(t, bucket, "user-2", 20, 30)
500+
require.NoError(t, s.scan(ctx))
501+
502+
s.fetchersMx.Lock()
503+
_, has2 = s.fetchers["user-2"]
504+
require.Equal(t, 3, len(s.fetchers))
505+
s.fetchersMx.Unlock()
506+
assert.True(t, has2)
507+
}
508+
509+
// failUserBucket makes the per-tenant block listing fail for a single user, so that scanning that
510+
// user returns an error (resErrs != 0) while the top-level user listing used by ScanUsers (Iter
511+
// with an empty prefix) still succeeds.
512+
type failUserBucket struct {
513+
objstore.InstrumentedBucket
514+
failUser string
515+
}
516+
517+
func (b *failUserBucket) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) error {
518+
if strings.HasPrefix(dir, b.failUser) {
519+
return errors.New("injected listing failure")
520+
}
521+
return b.InstrumentedBucket.Iter(ctx, dir, f, options...)
522+
}
523+
524+
func (b *failUserBucket) IterWithAttributes(ctx context.Context, dir string, f func(objstore.IterObjectAttributes) error, options ...objstore.IterOption) error {
525+
if strings.HasPrefix(dir, b.failUser) {
526+
return errors.New("injected listing failure")
527+
}
528+
return b.InstrumentedBucket.IterWithAttributes(ctx, dir, f, options...)
529+
}
530+
531+
func (b *failUserBucket) WithExpectedErrs(objstore.IsOpFailureExpectedFunc) objstore.Bucket {
532+
return b
533+
}
534+
535+
func (b *failUserBucket) ReaderWithExpectedErrs(objstore.IsOpFailureExpectedFunc) objstore.BucketReader {
536+
return b
537+
}
538+
539+
func TestBucketScanBlocksFinder_PeriodicScanEvictsInactiveUserDespiteOtherTenantScanError(t *testing.T) {
540+
t.Parallel()
541+
ctx := context.Background()
542+
543+
bkt, _ := cortex_testutil.PrepareFilesystemBucket(t)
544+
wrapped := &failUserBucket{InstrumentedBucket: bkt, failUser: "user-2"}
545+
546+
cfg := prepareBucketScanBlocksFinderConfig()
547+
cfg.CacheDir = t.TempDir()
548+
usersScanner, err := users.NewScanner(users.UsersScannerConfig{
549+
Strategy: users.UserScanStrategyList,
550+
MaxStalePeriod: time.Hour,
551+
CacheTTL: 0,
552+
}, wrapped, log.NewNopLogger(), nil)
553+
require.NoError(t, err)
554+
555+
s := NewBucketScanBlocksFinder(cfg, usersScanner, wrapped, nil, log.NewNopLogger(), nil)
556+
t.Cleanup(func() {
557+
s.StopAsync()
558+
require.NoError(t, s.AwaitTerminated(context.Background()))
559+
})
560+
561+
// Only user-1 is active at startup, so the initial scan succeeds and caches its fetcher.
562+
cortex_testutil.MockStorageBlock(t, bkt, "user-1", 10, 20)
563+
require.NoError(t, services.StartAndAwaitRunning(ctx, s))
564+
565+
s.fetchersMx.Lock()
566+
require.Equal(t, 1, len(s.fetchers))
567+
s.fetchersMx.Unlock()
568+
569+
// Introduce an active-but-erroring tenant and delete user-1.
570+
cortex_testutil.MockStorageBlock(t, bkt, "user-2", 10, 20)
571+
require.NoError(t, bkt.Delete(ctx, "user-1"))
572+
573+
// This scan collects a per-tenant error for user-2 (resErrs != 0). The failing tenant is
574+
// retried with the finder's hardcoded backoff, so this scan takes a few seconds; that cost is
575+
// intrinsic and must not be "optimised" with a short-deadline context, which would cancel the
576+
// context and make scanBucket skip eviction entirely (eviction is gated on ctx.Err() == nil).
577+
require.Error(t, s.scanBucket(ctx))
578+
579+
// ...but eviction is decoupled from per-tenant scan errors, so the now-inactive user-1 is still
580+
// evicted while the erroring (but still active) user-2 is retained.
581+
s.fetchersMx.Lock()
582+
_, has1 := s.fetchers["user-1"]
583+
_, has2 := s.fetchers["user-2"]
584+
s.fetchersMx.Unlock()
585+
assert.False(t, has1)
586+
assert.True(t, has2)
587+
}
588+
589+
// TestBucketScanBlocksFinder_PeriodicScanPreservesNonMetaSyncerDataOnEviction guards the
590+
// single-binary safety property: CacheDir is the store-gateway's SyncDir, whose block data lives
591+
// under the same per-tenant directory, so evicting a tenant must remove only the fetcher's own
592+
// meta-syncer cache and never the sibling block data.
593+
func TestBucketScanBlocksFinder_PeriodicScanPreservesNonMetaSyncerDataOnEviction(t *testing.T) {
594+
t.Parallel()
595+
ctx := context.Background()
596+
s, bucket, _, _ := prepareBucketScanBlocksFinder(t, prepareBucketScanBlocksFinderConfig())
597+
598+
cortex_testutil.MockStorageBlock(t, bucket, "user-1", 10, 20)
599+
require.NoError(t, services.StartAndAwaitRunning(ctx, s))
600+
601+
metaSyncerDir := filepath.Join(s.cfg.CacheDir, "user-1", metaSyncerCacheDirName)
602+
require.DirExists(t, metaSyncerDir)
603+
604+
// Simulate a co-located store-gateway's block data under the same per-tenant directory.
605+
sgBlockFile := filepath.Join(s.cfg.CacheDir, "user-1", "01DTVP434PA9VFXSW2JKB3392D", "index")
606+
require.NoError(t, os.MkdirAll(filepath.Dir(sgBlockFile), 0o755))
607+
require.NoError(t, os.WriteFile(sgBlockFile, []byte("block-data"), 0o644))
608+
609+
require.NoError(t, bucket.Delete(ctx, "user-1"))
610+
require.NoError(t, s.scan(ctx))
611+
612+
// The fetcher's own meta-syncer cache is deleted...
613+
assert.NoDirExists(t, metaSyncerDir)
614+
// ...but the co-located store-gateway block data under the same per-tenant dir must survive.
615+
assert.FileExists(t, sgBlockFile)
616+
}
617+
419618
func TestBucketScanBlocksFinder_GetBlocks(t *testing.T) {
420619
//parallel testing causes data race
421620
ctx := context.Background()

0 commit comments

Comments
 (0)