Skip to content

Commit 5099e71

Browse files
authored
fix(compactor): Clean up stale bucket index metrics on ownership change (#7487)
When a tenant's compactor ownership changes due to ring rebalancing, the old compactor's cortex_bucket_index_last_successful_update_timestamp_seconds metric was not being cleaned up. This caused the metric to have duplicate series (one stale from the old owner, one fresh from the new owner), triggering false alarms on bucket index update rate. The fix ensures scanUsers() properly detects tenants that were previously owned but are no longer in the current owned set, and cleans up their metrics. Also extracts metric deletion into a reusable deleteUserMetrics() helper to reduce code duplication. Signed-off-by: Ben Ye <benye@amazon.com>
1 parent 2fa65c0 commit 5099e71

3 files changed

Lines changed: 100 additions & 21 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
* [BUGFIX] gRPC: Fix panic when `grpc_compression` is set to `snappy` on ingester client or store-gateway client configurations. #7459
3333
* [BUGFIX] Config: Mask Swift, etcd, Redis, and HTTP basic-auth credentials on the `/config` endpoint. #7473
3434
* [BUGFIX] Memberlist: Drop incoming TCP transport packets when digest verification fails, preventing corrupted payloads from being forwarded. #7474
35+
* [BUGFIX] Compactor: Fix stale `cortex_bucket_index_last_successful_update_timestamp_seconds` metric not being cleaned up when tenant ownership changes due to ring rebalancing. This caused false alarms on bucket index update rate when a tenant moved between compactors. #7485
3536

3637
## 1.21.0 2026-04-24
3738

pkg/compactor/blocks_cleaner.go

Lines changed: 34 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -412,32 +412,50 @@ func (c *BlocksCleaner) scanUsers(ctx context.Context) ([]string, []string, erro
412412
markedForDeletion = append(markedForDeletion, deleted...)
413413
isMarkedForDeletion := util.StringsMap(markedForDeletion)
414414
allUsers := append(active, markedForDeletion...)
415+
currentOwnedUsers := util.StringsMap(allUsers)
416+
415417
// Delete per-tenant metrics for all tenants not belonging anymore to this shard.
416418
// Such tenants have been moved to a different shard, so their updated metrics will
417419
// be exported by the new shard.
418420
for _, userID := range c.lastOwnedUsers {
419421
if !isActive[userID] && !isMarkedForDeletion[userID] {
420-
c.tenantBlocks.DeleteLabelValues(userID)
421-
c.tenantParquetBlocks.DeleteLabelValues(userID)
422-
c.tenantParquetUnConvertedBlocks.DeleteLabelValues(userID)
423-
c.tenantBlocksMarkedForDelete.DeleteLabelValues(userID)
424-
c.tenantBlocksMarkedForNoCompaction.DeleteLabelValues(userID)
425-
c.tenantPartialBlocks.DeleteLabelValues(userID)
426-
c.tenantBucketIndexLastUpdate.DeleteLabelValues(userID)
427-
if c.cfg.ShardingStrategy == util.ShardingStrategyShuffle {
428-
c.remainingPlannedCompactions.DeleteLabelValues(userID)
429-
if c.cfg.CompactionStrategy == util.CompactionStrategyPartitioning {
430-
c.inProgressCompactions.DeleteLabelValues(userID)
431-
c.oldestPartitionGroupOffset.DeleteLabelValues(userID)
432-
}
433-
}
422+
c.deleteUserMetrics(userID)
423+
}
424+
}
425+
426+
// Also reset metrics for any user that was previously tracked but is no longer
427+
// in the current owned set. This handles the case where a user's ownership changed
428+
// due to ring rebalancing but the user still exists in the bucket (so it appears
429+
// in the base scanner results but is filtered out by the shard check).
430+
for _, userID := range c.lastOwnedUsers {
431+
if _, stillOwned := currentOwnedUsers[userID]; !stillOwned {
432+
c.deleteUserMetrics(userID)
434433
}
435434
}
435+
436436
c.lastOwnedUsers = allUsers
437437

438438
return active, markedForDeletion, nil
439439
}
440440

441+
// deleteUserMetrics removes all per-tenant metrics for the given user.
442+
func (c *BlocksCleaner) deleteUserMetrics(userID string) {
443+
c.tenantBlocks.DeleteLabelValues(userID)
444+
c.tenantParquetBlocks.DeleteLabelValues(userID)
445+
c.tenantParquetUnConvertedBlocks.DeleteLabelValues(userID)
446+
c.tenantBlocksMarkedForDelete.DeleteLabelValues(userID)
447+
c.tenantBlocksMarkedForNoCompaction.DeleteLabelValues(userID)
448+
c.tenantPartialBlocks.DeleteLabelValues(userID)
449+
c.tenantBucketIndexLastUpdate.DeleteLabelValues(userID)
450+
if c.cfg.ShardingStrategy == util.ShardingStrategyShuffle {
451+
c.remainingPlannedCompactions.DeleteLabelValues(userID)
452+
if c.cfg.CompactionStrategy == util.CompactionStrategyPartitioning {
453+
c.inProgressCompactions.DeleteLabelValues(userID)
454+
c.oldestPartitionGroupOffset.DeleteLabelValues(userID)
455+
}
456+
}
457+
}
458+
441459
func (c *BlocksCleaner) obtainVisitMarkerManager(ctx context.Context, userLogger log.Logger, userBucket objstore.InstrumentedBucket) (visitMarkerManager *VisitMarkerManager, isVisited bool, err error) {
442460
cleanerVisitMarker := NewCleanerVisitMarker(c.ringLifecyclerID)
443461
visitMarkerManager = NewVisitMarkerManager(userBucket, userLogger, c.ringLifecyclerID, cleanerVisitMarker)
@@ -473,7 +491,7 @@ func (c *BlocksCleaner) deleteUserMarkedForDeletion(ctx context.Context, userLog
473491
if err := bucketindex.DeleteIndexSyncStatus(ctx, c.bucketClient, userID); err != nil {
474492
return err
475493
}
476-
c.tenantBucketIndexLastUpdate.DeleteLabelValues(userID)
494+
c.deleteUserMetrics(userID)
477495

478496
var blocksToDelete []any
479497
err := userBucket.Iter(ctx, "", func(name string) error {
@@ -524,12 +542,7 @@ func (c *BlocksCleaner) deleteUserMarkedForDeletion(ctx context.Context, userLog
524542
}
525543

526544
// Given all blocks have been deleted, we can also remove the metrics.
527-
c.tenantBlocks.DeleteLabelValues(userID)
528-
c.tenantParquetBlocks.DeleteLabelValues(userID)
529-
c.tenantParquetUnConvertedBlocks.DeleteLabelValues(userID)
530-
c.tenantBlocksMarkedForDelete.DeleteLabelValues(userID)
531-
c.tenantBlocksMarkedForNoCompaction.DeleteLabelValues(userID)
532-
c.tenantPartialBlocks.DeleteLabelValues(userID)
545+
c.deleteUserMetrics(userID)
533546

534547
if deletedBlocks.Load() > 0 {
535548
level.Info(userLogger).Log("msg", "deleted blocks for tenant marked for deletion", "deletedBlocks", deletedBlocks.Load())

pkg/compactor/blocks_cleaner_test.go

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -600,6 +600,71 @@ func TestBlocksCleaner_ShouldRemoveMetricsForTenantsNotBelongingAnymoreToTheShar
600600
))
601601
}
602602

603+
func TestBlocksCleaner_ShouldCleanupBucketIndexMetricOnOwnershipChange(t *testing.T) {
604+
bucketClient, _ := cortex_testutil.PrepareFilesystemBucket(t)
605+
bucketClient = bucketindex.BucketWithGlobalMarkers(bucketClient)
606+
607+
// Create blocks for two users.
608+
createTSDBBlock(t, bucketClient, "user-1", 10, 20, nil)
609+
createTSDBBlock(t, bucketClient, "user-2", 30, 40, nil)
610+
611+
cfg := BlocksCleanerConfig{
612+
DeletionDelay: time.Hour,
613+
CleanupInterval: time.Minute,
614+
CleanupConcurrency: 1,
615+
BlockRanges: (&tsdb.DurationList{2 * time.Hour, 12 * time.Hour, 24 * time.Hour}).ToMilliseconds(),
616+
}
617+
618+
ctx := context.Background()
619+
logger := log.NewNopLogger()
620+
reg := prometheus.NewRegistry()
621+
scanner, err := users.NewScanner(users.UsersScannerConfig{
622+
Strategy: users.UserScanStrategyList,
623+
}, bucketClient, logger, reg)
624+
require.NoError(t, err)
625+
cfgProvider := newMockConfigProvider()
626+
blocksMarkedForDeletion := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
627+
Name: blocksMarkedForDeletionName,
628+
Help: blocksMarkedForDeletionHelp,
629+
}, append(commonLabels, reasonLabelName))
630+
dummyGaugeVec := prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"test"})
631+
632+
cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, 60*time.Second, cfgProvider, logger, "test-cleaner", reg, time.Minute, 30*time.Second, blocksMarkedForDeletion, dummyGaugeVec)
633+
634+
// First run: both users are owned by this compactor.
635+
activeUsers, deleteUsers, err := cleaner.scanUsers(ctx)
636+
require.NoError(t, err)
637+
require.NoError(t, cleaner.cleanUpActiveUsers(ctx, activeUsers, true))
638+
require.NoError(t, cleaner.cleanDeletedUsers(ctx, deleteUsers))
639+
640+
// Verify bucket index last update metric is set for both users.
641+
require.NotZero(t, prom_testutil.ToFloat64(cleaner.tenantBucketIndexLastUpdate.WithLabelValues("user-1")))
642+
require.NotZero(t, prom_testutil.ToFloat64(cleaner.tenantBucketIndexLastUpdate.WithLabelValues("user-2")))
643+
644+
// Simulate ring rebalancing: user-2 ownership moves to a different compactor.
645+
// The ShardedScanner now only returns user-1.
646+
cleaner.usersScanner, err = users.NewScanner(users.UsersScannerConfig{
647+
Strategy: users.UserScanStrategyList,
648+
}, bucketClient, logger, reg)
649+
require.NoError(t, err)
650+
cleaner.usersScanner = users.NewShardedScanner(cleaner.usersScanner, func(userID string) (bool, error) {
651+
return userID == "user-1", nil
652+
}, logger)
653+
654+
// Second run: user-2 is no longer owned.
655+
activeUsers, deleteUsers, err = cleaner.scanUsers(ctx)
656+
require.NoError(t, err)
657+
require.NoError(t, cleaner.cleanUpActiveUsers(ctx, activeUsers, false))
658+
require.NoError(t, cleaner.cleanDeletedUsers(ctx, deleteUsers))
659+
660+
// Verify: user-1 metric still exists and is non-zero, user-2 metric has been cleaned up.
661+
require.NotZero(t, prom_testutil.ToFloat64(cleaner.tenantBucketIndexLastUpdate.WithLabelValues("user-1")))
662+
// user-2 metric should have been deleted. Calling WithLabelValues creates a new zero-value gauge,
663+
// so we verify by checking the total number of metrics in the GaugeVec.
664+
assert.Equal(t, 1, prom_testutil.CollectAndCount(cleaner.tenantBucketIndexLastUpdate),
665+
"expected only user-1 metric to remain after ownership change")
666+
}
667+
603668
func TestBlocksCleaner_ListBlocksOutsideRetentionPeriod(t *testing.T) {
604669
bucketClient, _ := cortex_testutil.PrepareFilesystemBucket(t)
605670
bucketClient = bucketindex.BucketWithGlobalMarkers(bucketClient)

0 commit comments

Comments
 (0)