Skip to content

Commit e5eadb1

Browse files
authored
include resolved tenant fingerprint in result cache keys to prevent stale data after tenant changes (#7562)
Signed-off-by: SungJin1212 <tjdwls1201@gmail.com>
1 parent aa717c3 commit e5eadb1

8 files changed

Lines changed: 391 additions & 22 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
* [BUGFIX] Security: Fix stored XSS vulnerability in Alertmanager and Store Gateway status pages by replacing `text/template` with `html/template`. #7512
4444
* [BUGFIX] Security: Limit decompressed gzip output in `ParseProtoReader` and OTLP ingestion path. The decompressed body is now capped by `-distributor.otlp-max-recv-msg-size`. #7515
4545
* [BUGFIX] Ingester: Close TSDB when compaction fails during `createTSDB`, preventing resource leaks (file descriptors, mmap handles) that could lead to ingester instability. #7560
46+
* [BUGFIX] Tenant Federation: Fix result cache returning stale data after a new tenant is added when `-tenant-federation.regex-matcher-enabled=true`. The resolved tenant set is now hashed and included in the cache key so that any change to the matched tenant list automatically invalidates cached entries. Non-regex users are unaffected. #7562
4647
* [BUGFIX] Tenant Federation: Fix regex resolver clearing known users list when user scan fails. #7534
4748
* [BUGFIX] Ingester: Release the TSDB appender on every early-return path in `Push` (e.g. out-of-order label set) by deferring `Rollback`. Previously such requests leaked TSDB head series references, mmap'd chunks and pending state per request, causing the `cortex_ingester_tsdb_head_active_appenders` gauge to grow unbounded. #7528
4849
* [BUGFIX] Ring: Fix ring token conflict resolution being applied only to the updated instance, and check for token conflicts continuously during the instance observe period.

integration/querier_tenant_federation_test.go

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,10 @@
33
package integration
44

55
import (
6+
"context"
67
"fmt"
8+
"math/rand"
9+
"path/filepath"
710
"strings"
811
"testing"
912
"time"
@@ -13,13 +16,139 @@ import (
1316
"github.com/prometheus/prometheus/prompb"
1417
"github.com/stretchr/testify/assert"
1518
"github.com/stretchr/testify/require"
19+
"github.com/thanos-io/thanos/pkg/block"
20+
"github.com/thanos-io/thanos/pkg/block/metadata"
1621

1722
"github.com/cortexproject/cortex/integration/e2e"
1823
e2ecache "github.com/cortexproject/cortex/integration/e2e/cache"
1924
e2edb "github.com/cortexproject/cortex/integration/e2e/db"
2025
"github.com/cortexproject/cortex/integration/e2ecortex"
26+
"github.com/cortexproject/cortex/pkg/storage/bucket"
27+
"github.com/cortexproject/cortex/pkg/util/log"
2128
)
2229

30+
// extractTenantIDsFromMatrix returns the unique __tenant_id__ label values
31+
// present across all streams in a query range result matrix.
32+
func extractTenantIDsFromMatrix(m model.Matrix) []string {
33+
seen := map[string]struct{}{}
34+
for _, stream := range m {
35+
if tid, ok := stream.Metric[model.LabelName("__tenant_id__")]; ok {
36+
seen[string(tid)] = struct{}{}
37+
}
38+
}
39+
result := make([]string, 0, len(seen))
40+
for tid := range seen {
41+
result = append(result, tid)
42+
}
43+
return result
44+
}
45+
46+
// TestRegexResolver_ResultCacheStale verifies the behavior of
47+
// query result caching with dynamic tenant discovery.
48+
func TestRegexResolver_ResultCacheStale(t *testing.T) {
49+
ctx := context.Background()
50+
rnd := rand.New(rand.NewSource(time.Now().Unix()))
51+
52+
s, err := e2e.NewScenario(networkName)
53+
require.NoError(t, err)
54+
defer s.Close()
55+
56+
memcached := e2ecache.NewMemcached()
57+
consul := e2edb.NewConsulWithName("consul")
58+
require.NoError(t, s.StartAndWaitReady(consul, memcached))
59+
60+
flags := mergeFlags(BlocksStorageFlags(), map[string]string{
61+
// Enable result cache with memcached.
62+
"-querier.cache-results": "true",
63+
"-querier.split-queries-by-interval": "24h",
64+
"-frontend.memcached.addresses": "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort),
65+
66+
// Regex tenant federation with fast discovery.
67+
"-tenant-federation.enabled": "true",
68+
"-tenant-federation.regex-matcher-enabled": "true",
69+
"-tenant-federation.user-sync-interval": "5s",
70+
71+
// Disable bucket index so the store-gateway picks up new blocks via
72+
// direct bucket scan
73+
"-blocks-storage.bucket-store.bucket-index.enabled": "false",
74+
"-blocks-storage.bucket-store.sync-interval": "2s",
75+
})
76+
77+
minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
78+
require.NoError(t, s.StartAndWaitReady(minio))
79+
80+
storage, err := e2ecortex.NewS3ClientForMinio(minio, flags["-blocks-storage.s3.bucket-name"])
81+
require.NoError(t, err)
82+
83+
// entire range is cached on the first query
84+
blockStart := time.Now().Add(-20 * time.Minute)
85+
blockEnd := time.Now().Add(-10 * time.Minute)
86+
seriesLabels := []labels.Labels{labels.FromStrings(labels.MetricName, "series_1")}
87+
dir := t.TempDir()
88+
89+
// Upload blocks for user-0 and user-1.
90+
for _, tenantID := range []string{"user-0", "user-1"} {
91+
bkt := bucket.NewUserBucketClient(tenantID, storage.GetBucket(), nil)
92+
id, err := e2e.CreateBlock(ctx, rnd, dir, seriesLabels, 10,
93+
blockStart.UnixMilli(), blockEnd.UnixMilli(), 30_000, 1)
94+
require.NoError(t, err)
95+
require.NoError(t, block.Upload(ctx, log.Logger, bkt,
96+
filepath.Join(dir, id.String()), metadata.NoneFunc))
97+
}
98+
99+
// Start all services.
100+
ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
101+
distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
102+
storeGateway := e2ecortex.NewStoreGateway("store-gateway", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
103+
queryFrontend := e2ecortex.NewQueryFrontend("query-frontend", flags, "")
104+
require.NoError(t, s.StartAndWaitReady(ingester, distributor, storeGateway))
105+
require.NoError(t, s.Start(queryFrontend))
106+
107+
querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{
108+
"-querier.frontend-address": queryFrontend.NetworkGRPCEndpoint(),
109+
}), "")
110+
require.NoError(t, s.StartAndWaitReady(querier))
111+
require.NoError(t, s.WaitReady(queryFrontend))
112+
113+
// Wait for the store-gateway to load the 2 initial blocks.
114+
require.NoError(t, storeGateway.WaitSumMetricsWithOptions(
115+
e2e.Equals(2), []string{"cortex_blocks_meta_synced"}, e2e.WaitMissingMetrics))
116+
117+
// Wait for the regex resolver to discover user-0 and user-1.
118+
require.NoError(t, querier.WaitSumMetrics(e2e.Equals(2), "cortex_regex_resolver_discovered_users"))
119+
120+
c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), queryFrontend.HTTPEndpoint(), "", "", "user-.+")
121+
require.NoError(t, err)
122+
123+
result1, err := c.QueryRange("series_1", blockStart, blockEnd, 30*time.Second)
124+
require.NoError(t, err)
125+
require.Equal(t, model.ValMatrix, result1.Type())
126+
require.ElementsMatch(t, []string{"user-0", "user-1"},
127+
extractTenantIDsFromMatrix(result1.(model.Matrix)),
128+
"first query must return data for exactly the initial two tenants")
129+
130+
bkt2 := bucket.NewUserBucketClient("user-2", storage.GetBucket(), nil)
131+
id2, err := e2e.CreateBlock(ctx, rnd, dir, seriesLabels, 10,
132+
blockStart.UnixMilli(), blockEnd.UnixMilli(), 30_000, 1)
133+
require.NoError(t, err)
134+
require.NoError(t, block.Upload(ctx, log.Logger, bkt2,
135+
filepath.Join(dir, id2.String()), metadata.NoneFunc))
136+
137+
// Wait for user-2's block to be picked up by the store-gateway.
138+
require.NoError(t, storeGateway.WaitSumMetricsWithOptions(
139+
e2e.Equals(3), []string{"cortex_blocks_meta_synced"}, e2e.WaitMissingMetrics))
140+
141+
// Wait for the regex resolver to discover user-2.
142+
require.NoError(t, querier.WaitSumMetrics(e2e.Equals(3), "cortex_regex_resolver_discovered_users"))
143+
144+
result2, err := c.QueryRange("series_1", blockStart, blockEnd, 30*time.Second)
145+
require.NoError(t, err)
146+
require.Equal(t, model.ValMatrix, result2.Type())
147+
require.ElementsMatch(t, []string{"user-0", "user-1", "user-2"},
148+
extractTenantIDsFromMatrix(result2.(model.Matrix)),
149+
"second query must include newly discovered user-2")
150+
}
151+
23152
type querierTenantFederationConfig struct {
24153
querySchedulerEnabled bool
25154
shuffleShardingEnabled bool

pkg/cortex/cortex.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -356,6 +356,9 @@ type Cortex struct {
356356
StoreGateway *storegateway.StoreGateway
357357
MemberlistKV *memberlist.KVInitService
358358

359+
// RegexResolver is set when both tenant-federation.enabled=true and tenant-federation.regex-matcher-enabled=true.
360+
RegexResolver *tenantfederation.RegexResolver
361+
359362
// Queryables that the querier should use to query the long
360363
// term storage. It depends on the storage engine used.
361364
StoreQueryables []querier.QueryableWithFilter

pkg/cortex/modules.go

Lines changed: 36 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ const (
9898
Purger string = "purger"
9999
QueryScheduler string = "query-scheduler"
100100
TenantFederation string = "tenant-federation"
101+
RegexResolverService string = "regex-resolver"
101102
ResourceMonitor string = "resource-monitor"
102103
All string = "all"
103104
)
@@ -300,6 +301,28 @@ func (t *Cortex) initQueryable() (serv services.Service, err error) {
300301
return nil, nil
301302
}
302303

304+
func (t *Cortex) initRegexResolverService() (serv services.Service, err error) {
305+
if !t.Cfg.TenantFederation.Enabled || !t.Cfg.TenantFederation.RegexMatcherEnabled {
306+
return nil, nil
307+
}
308+
309+
util_log.WarnExperimentalUse("tenant-federation.regex-matcher-enabled")
310+
311+
reg := prometheus.DefaultRegisterer
312+
bucketClientFactory := func(ctx context.Context) (objstore.InstrumentedBucket, error) {
313+
return bucket.NewClient(ctx, t.Cfg.BlocksStorage.Bucket, nil, "regex-resolver", util_log.Logger, reg)
314+
}
315+
316+
regexResolver, err := tenantfederation.NewRegexResolver(t.Cfg.BlocksStorage.UsersScanner, t.Cfg.TenantFederation, reg, bucketClientFactory, util_log.Logger)
317+
if err != nil {
318+
return nil, fmt.Errorf("failed to initialize regex resolver: %v", err)
319+
}
320+
users.WithDefaultResolver(regexResolver)
321+
t.RegexResolver = regexResolver
322+
323+
return regexResolver, nil
324+
}
325+
303326
// Enable merge querier if multi tenant query federation is enabled
304327
func (t *Cortex) initTenantFederation() (serv services.Service, err error) {
305328
if t.Cfg.TenantFederation.Enabled {
@@ -312,24 +335,6 @@ func (t *Cortex) initTenantFederation() (serv services.Service, err error) {
312335
t.QuerierQueryable = querier.NewSampleAndChunkQueryable(tenantfederation.NewQueryable(t.QuerierQueryable, t.Cfg.TenantFederation, byPassForSingleQuerier, reg))
313336
t.MetadataQuerier = tenantfederation.NewMetadataQuerier(t.MetadataQuerier, t.Cfg.TenantFederation, reg)
314337
t.ExemplarQueryable = tenantfederation.NewExemplarQueryable(t.ExemplarQueryable, t.Cfg.TenantFederation, byPassForSingleQuerier, reg)
315-
316-
if t.Cfg.TenantFederation.RegexMatcherEnabled {
317-
util_log.WarnExperimentalUse("tenant-federation.regex-matcher-enabled")
318-
319-
bucketClientFactory := func(ctx context.Context) (objstore.InstrumentedBucket, error) {
320-
return bucket.NewClient(ctx, t.Cfg.BlocksStorage.Bucket, nil, "regex-resolver", util_log.Logger, reg)
321-
}
322-
323-
regexResolver, err := tenantfederation.NewRegexResolver(t.Cfg.BlocksStorage.UsersScanner, t.Cfg.TenantFederation, reg, bucketClientFactory, util_log.Logger)
324-
if err != nil {
325-
return nil, fmt.Errorf("failed to initialize regex resolver: %v", err)
326-
}
327-
users.WithDefaultResolver(regexResolver)
328-
329-
return regexResolver, nil
330-
}
331-
332-
return nil, nil
333338
}
334339

335340
return nil, nil
@@ -555,6 +560,14 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro
555560
users.WithDefaultResolver(tenantfederation.NewRegexValidator())
556561
}
557562

563+
// Build a lazy resolver function for the result cache.
564+
var tenantResolverFn func() users.Resolver
565+
if t.Cfg.TenantFederation.Enabled && t.Cfg.TenantFederation.RegexMatcherEnabled {
566+
tenantResolverFn = func() users.Resolver {
567+
return t.RegexResolver
568+
}
569+
}
570+
558571
queryRangeMiddlewares, cache, err := queryrange.Middlewares(
559572
t.Cfg.QueryRange,
560573
util_log.Logger,
@@ -568,6 +581,7 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro
568581
t.Cfg.Querier.DefaultEvaluationInterval,
569582
t.Cfg.Querier.DistributedExecEnabled,
570583
t.Cfg.Querier.ThanosEngine.LogicalOptimizers,
584+
tenantResolverFn,
571585
)
572586
if err != nil {
573587
return nil, err
@@ -945,6 +959,7 @@ func (t *Cortex) setupModuleManager() error {
945959
mm.RegisterModule(Purger, nil)
946960
mm.RegisterModule(QueryScheduler, t.initQueryScheduler)
947961
mm.RegisterModule(TenantFederation, t.initTenantFederation, modules.UserInvisibleModule)
962+
mm.RegisterModule(RegexResolverService, t.initRegexResolverService, modules.UserInvisibleModule)
948963
mm.RegisterModule(All, nil)
949964

950965
// Add dependencies
@@ -964,7 +979,7 @@ func (t *Cortex) setupModuleManager() error {
964979
Queryable: {OverridesConfig, DistributorService, OverridesConfig, Ring, API, StoreQueryable, MemberlistKV, ResourceMonitor},
965980
Querier: {TenantFederation},
966981
StoreQueryable: {OverridesConfig, OverridesConfig, MemberlistKV, GrpcClientService},
967-
QueryFrontendTripperware: {API, OverridesConfig},
982+
QueryFrontendTripperware: {API, OverridesConfig, RegexResolverService},
968983
QueryFrontend: {QueryFrontendTripperware},
969984
QueryScheduler: {API, OverridesConfig},
970985
Ruler: {DistributorService, OverridesConfig, StoreQueryable, RulerStorage},
@@ -976,7 +991,8 @@ func (t *Cortex) setupModuleManager() error {
976991
StoreGateway: {API, OverridesConfig, MemberlistKV, ResourceMonitor},
977992
TenantDeletion: {API, OverridesConfig},
978993
Purger: {TenantDeletion},
979-
TenantFederation: {Queryable},
994+
TenantFederation: {Queryable, RegexResolverService},
995+
RegexResolverService: {},
980996
All: {QueryFrontend, Querier, Ingester, Distributor, Purger, StoreGateway, Ruler, Compactor, AlertManager},
981997
}
982998
if t.Cfg.ExternalPusher != nil && t.Cfg.ExternalQueryable != nil {

pkg/querier/tripperware/queryrange/query_range_middlewares.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"github.com/cortexproject/cortex/pkg/querier"
3131
"github.com/cortexproject/cortex/pkg/querier/tripperware"
3232
"github.com/cortexproject/cortex/pkg/util/flagext"
33+
"github.com/cortexproject/cortex/pkg/util/users"
3334
)
3435

3536
const day = 24 * time.Hour
@@ -107,6 +108,7 @@ func Middlewares(
107108
defaultEvaluationInterval time.Duration,
108109
distributedExecEnabled bool,
109110
localOptimizers []logicalplan.Optimizer,
111+
tenantResolverFn func() users.Resolver,
110112
) ([]tripperware.Middleware, cache.Cache, error) {
111113
// Metric used to keep track of each middleware execution duration.
112114
metrics := tripperware.NewInstrumentMiddlewareMetrics(registerer)
@@ -132,7 +134,7 @@ func Middlewares(
132134
return false
133135
}
134136

135-
queryCacheMiddleware, cache, err := NewResultsCacheMiddleware(log, cfg.ResultsCacheConfig, splitter(cfg.SplitQueriesByInterval), limits, prometheusCodec, cacheExtractor, shouldCache, registerer)
137+
queryCacheMiddleware, cache, err := NewResultsCacheMiddleware(log, cfg.ResultsCacheConfig, splitter(cfg.SplitQueriesByInterval), limits, prometheusCodec, cacheExtractor, shouldCache, registerer, tenantResolverFn)
136138
if err != nil {
137139
return nil, nil, err
138140
}

pkg/querier/tripperware/queryrange/query_range_middlewares_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ func TestRoundTrip(t *testing.T) {
7070
time.Minute,
7171
false,
7272
logicalplan.DefaultOptimizers,
73+
nil,
7374
)
7475
require.NoError(t, err)
7576

@@ -195,6 +196,7 @@ func TestRoundTripWithAndWithoutDistributedExec(t *testing.T) {
195196
time.Minute,
196197
tc.distributedEnabled,
197198
logicalplan.DefaultOptimizers,
199+
nil,
198200
)
199201
require.NoError(t, err)
200202

pkg/querier/tripperware/queryrange/results_cache.go

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"flag"
66
"fmt"
7+
"hash/fnv"
78
"net/http"
89
"slices"
910
"sort"
@@ -179,6 +180,7 @@ type resultsCache struct {
179180
merger tripperware.Merger
180181
shouldCache ShouldCacheFn
181182
cacheQueryableSamplesStats bool
183+
tenantResolverFn func() users.Resolver
182184
}
183185

184186
// NewResultsCacheMiddleware creates results cache middleware from config.
@@ -196,6 +198,7 @@ func NewResultsCacheMiddleware(
196198
extractor Extractor,
197199
shouldCache ShouldCacheFn,
198200
reg prometheus.Registerer,
201+
tenantResolverFn func() users.Resolver,
199202
) (tripperware.Middleware, cache.Cache, error) {
200203
c, err := cache.New(cfg.CacheConfig, reg, logger)
201204
if err != nil {
@@ -219,6 +222,7 @@ func NewResultsCacheMiddleware(
219222
now: time.Now,
220223
shouldCache: shouldCache,
221224
cacheQueryableSamplesStats: cfg.CacheQueryableSamplesStats,
225+
tenantResolverFn: tenantResolverFn,
222226
}
223227
}), c, nil
224228
}
@@ -242,7 +246,21 @@ func (s resultsCache) Do(ctx context.Context, r tripperware.Request) (tripperwar
242246
return s.next.Do(ctx, r)
243247
}
244248

245-
key := s.splitter.GenerateCacheKey(ctx, users.JoinTenantIDs(tenantIDs), r)
249+
// Build the user ID portion of the cache key.
250+
// For regex-federation users the resolved tenant set is hashed and appended
251+
// so that adding or removing a tenant invalidates cached entries automatically.
252+
cacheUserID := users.JoinTenantIDs(tenantIDs)
253+
if s.tenantResolverFn != nil {
254+
if resolver := s.tenantResolverFn(); resolver != nil {
255+
if resolvedIDs, resolveErr := resolver.TenantIDs(ctx); resolveErr == nil && len(resolvedIDs) > 0 {
256+
h := fnv.New64a()
257+
_, _ = h.Write([]byte(strings.Join(resolvedIDs, "|")))
258+
cacheUserID = fmt.Sprintf("%s:h%x", cacheUserID, h.Sum64())
259+
}
260+
}
261+
}
262+
263+
key := s.splitter.GenerateCacheKey(ctx, cacheUserID, r)
246264

247265
var (
248266
extents []tripperware.Extent

0 commit comments

Comments
 (0)