Skip to content

Commit a13e3d9

Browse files
authored
perf(ingester): Convert postings cache from FIFO to LRU eviction (#7510)
The expanded postings cache previously used FIFO eviction, which evicts the oldest entry regardless of access frequency. Under memory pressure with high series churn, actively-queried entries get evicted simply because they were inserted earliest, causing repeated expensive recomputations. With LRU eviction, an entry is moved to the back of the list on every access, ensuring frequently-queried entries (e.g., ruler queries running every 30s) stay cached even when the cache is full. Only entries that haven't been accessed recently get evicted. Implementation: each cacheEntryPromise now stores its own *list.Element (the new elem field), so a cache hit can call MoveToBack in O(1) without searching the list. The eviction logic (shouldEvictHead/evictHead) remains unchanged — it still evicts from the front, but now the front contains the least-recently-used entry instead of the oldest-inserted entry. Signed-off-by: Alan Protasio <approtas@amazon.com>
1 parent 24e8624 commit a13e3d9

3 files changed

Lines changed: 145 additions & 21 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
* [ENHANCEMENT] Query Frontend: Add `query_too_expensive` reason to QFE and `reason` field to query stats. #7479
2525
* [ENHANCEMENT] Distributor: Add HMAC-SHA256 stream authentication for `PushStream` via `-distributor.sign-write-requests-keys`. #7475
2626
* [ENHANCEMENT] Instrument Ingester CPU profile with source for read APIs. #7494
27+
* [ENHANCEMENT] Ingester: Convert expanded postings cache from FIFO to LRU eviction to retain frequently-queried entries under memory pressure. #7510
2728
* [BUGFIX] Querier: Fix queryWithRetry and labelsWithRetry returning (nil, nil) on cancelled context by propagating ctx.Err(). #7370
2829
* [BUGFIX] Metrics Helper: Fix non-deterministic bucket order in merged histograms by sorting buckets after map iteration, matching Prometheus client library behavior. #7380
2930
* [BUGFIX] Distributor: Return HTTP 401 Unauthorized when tenant ID resolution fails in the Prometheus Remote Write 2.0 path. #7389

pkg/storage/tsdb/expanded_postings_cache.go

Lines changed: 37 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -134,8 +134,8 @@ type ExpandedPostingsCache interface {
134134
type blocksPostingsForMatchersCache struct {
135135
userId string
136136

137-
headCache *fifoCache[[]storage.SeriesRef]
138-
blocksCache *fifoCache[[]storage.SeriesRef]
137+
headCache *lruCache[[]storage.SeriesRef]
138+
blocksCache *lruCache[[]storage.SeriesRef]
139139
postingsForMatchersFunc func(ctx context.Context, ix tsdb.IndexReader, ms ...*labels.Matcher) (index.Postings, error)
140140
timeNow func() time.Time
141141

@@ -158,8 +158,8 @@ func newBlocksPostingsForMatchersCache(userId string, cfg TSDBPostingsCacheConfi
158158
}
159159

160160
return &blocksPostingsForMatchersCache{
161-
headCache: newFifoCache[[]storage.SeriesRef](cfg.Head, "head", metrics, cfg.timeNow),
162-
blocksCache: newFifoCache[[]storage.SeriesRef](cfg.Blocks, "block", metrics, cfg.timeNow),
161+
headCache: newLruCache[[]storage.SeriesRef](cfg.Head, "head", metrics, cfg.timeNow),
162+
blocksCache: newLruCache[[]storage.SeriesRef](cfg.Blocks, "block", metrics, cfg.timeNow),
163163
postingsForMatchersFunc: cfg.PostingsForMatchers,
164164
timeNow: cfg.timeNow,
165165
metrics: metrics,
@@ -352,7 +352,7 @@ func (s *seedByHash) incrementSeed(userId string, v string) {
352352
s.seedByHash[i]++
353353
}
354354

355-
type fifoCache[V any] struct {
355+
type lruCache[V any] struct {
356356
cfg PostingsCacheConfig
357357
cachedValues *sync.Map
358358
timeNow func() time.Time
@@ -365,8 +365,8 @@ type fifoCache[V any] struct {
365365
cachedBytes int64
366366
}
367367

368-
func newFifoCache[V any](cfg PostingsCacheConfig, name string, metrics *ExpandedPostingsCacheMetrics, timeNow func() time.Time) *fifoCache[V] {
369-
return &fifoCache[V]{
368+
func newLruCache[V any](cfg PostingsCacheConfig, name string, metrics *ExpandedPostingsCacheMetrics, timeNow func() time.Time) *lruCache[V] {
369+
return &lruCache[V]{
370370
cachedValues: new(sync.Map),
371371
cached: list.New(),
372372
cfg: cfg,
@@ -376,15 +376,15 @@ func newFifoCache[V any](cfg PostingsCacheConfig, name string, metrics *Expanded
376376
}
377377
}
378378

379-
func (c *fifoCache[V]) clear() {
379+
func (c *lruCache[V]) clear() {
380380
c.cachedMtx.Lock()
381381
defer c.cachedMtx.Unlock()
382382
c.cached = list.New()
383383
c.cachedBytes = 0
384384
c.cachedValues = new(sync.Map)
385385
}
386386

387-
func (c *fifoCache[V]) expire() {
387+
func (c *lruCache[V]) expire() {
388388
if c.cfg.Ttl <= 0 {
389389
return
390390
}
@@ -402,13 +402,13 @@ func (c *fifoCache[V]) expire() {
402402
}
403403
}
404404

405-
func (c *fifoCache[V]) size() int {
405+
func (c *lruCache[V]) size() int {
406406
c.cachedMtx.RLock()
407407
defer c.cachedMtx.RUnlock()
408408
return c.cached.Len()
409409
}
410410

411-
func (c *fifoCache[V]) getPromiseForKey(k string, fetch func() (V, int64, error)) (*cacheEntryPromise[V], bool) {
411+
func (c *lruCache[V]) getPromiseForKey(k string, fetch func() (V, int64, error)) (*cacheEntryPromise[V], bool) {
412412
r := &cacheEntryPromise[V]{
413413
done: make(chan struct{}),
414414
}
@@ -434,27 +434,41 @@ func (c *fifoCache[V]) getPromiseForKey(k string, fetch func() (V, int64, error)
434434
// If the promise is already in the cache, wait for it to finish fetching the data.
435435
<-loaded.(*cacheEntryPromise[V]).done
436436

437+
// LRU: move to back on access
438+
c.cachedMtx.Lock()
439+
if elem := loaded.(*cacheEntryPromise[V]).elem; elem != nil {
440+
c.cached.MoveToBack(elem)
441+
}
442+
c.cachedMtx.Unlock()
443+
437444
// If the entry is cached but has expired, try to replace the cached value.
438445
if loaded.(*cacheEntryPromise[V]).isExpired(c.cfg.Ttl, c.timeNow()) && c.cachedValues.CompareAndSwap(k, loaded, r) {
439446
c.metrics.CacheMiss.WithLabelValues(c.name, "expired").Inc()
440447
r.v, r.sizeBytes, r.err = fetch()
441448
r.sizeBytes += int64(len(k))
442449
c.updateSize(loaded.(*cacheEntryPromise[V]).sizeBytes, r.sizeBytes)
443-
loaded = r
444450
r.ts = c.timeNow()
451+
// Replace the list element: remove old, push new to back
452+
c.cachedMtx.Lock()
453+
if oldElem := loaded.(*cacheEntryPromise[V]).elem; oldElem != nil {
454+
c.cached.Remove(oldElem)
455+
}
456+
r.elem = c.cached.PushBack(k)
457+
c.cachedMtx.Unlock()
458+
loaded = r
445459
ok = false
446460
}
447461
}
448462

449463
return loaded.(*cacheEntryPromise[V]), ok
450464
}
451465

452-
func (c *fifoCache[V]) contains(k string) bool {
466+
func (c *lruCache[V]) contains(k string) bool {
453467
_, ok := c.cachedValues.Load(k)
454468
return ok
455469
}
456470

457-
func (c *fifoCache[V]) shouldEvictHead() (string, bool) {
471+
func (c *lruCache[V]) shouldEvictHead() (string, bool) {
458472
h := c.cached.Front()
459473
if h == nil {
460474
return "", false
@@ -475,7 +489,7 @@ func (c *fifoCache[V]) shouldEvictHead() (string, bool) {
475489
return "", false
476490
}
477491

478-
func (c *fifoCache[V]) evictHead() {
492+
func (c *lruCache[V]) evictHead() {
479493
front := c.cached.Front()
480494
c.cached.Remove(front)
481495
oldestKey := front.Value.(string)
@@ -484,18 +498,22 @@ func (c *fifoCache[V]) evictHead() {
484498
}
485499
}
486500

487-
func (c *fifoCache[V]) created(key string, sizeBytes int64) {
501+
func (c *lruCache[V]) created(key string, sizeBytes int64) {
488502
if c.cfg.Ttl <= 0 {
489503
c.cachedValues.Delete(key)
490504
return
491505
}
492506
c.cachedMtx.Lock()
493507
defer c.cachedMtx.Unlock()
494-
c.cached.PushBack(key)
508+
elem := c.cached.PushBack(key)
509+
// Store the element reference in the promise for O(1) LRU access
510+
if p, ok := c.cachedValues.Load(key); ok {
511+
p.(*cacheEntryPromise[V]).elem = elem
512+
}
495513
c.cachedBytes += sizeBytes
496514
}
497515

498-
func (c *fifoCache[V]) updateSize(oldSize, newSizeBytes int64) {
516+
func (c *lruCache[V]) updateSize(oldSize, newSizeBytes int64) {
499517
if oldSize == newSizeBytes {
500518
return
501519
}
@@ -508,6 +526,7 @@ func (c *fifoCache[V]) updateSize(oldSize, newSizeBytes int64) {
508526
type cacheEntryPromise[V any] struct {
509527
ts time.Time
510528
sizeBytes int64
529+
elem *list.Element
511530

512531
done chan struct{}
513532
v V

pkg/storage/tsdb/expanded_postings_cache_test.go

Lines changed: 107 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ func Test_ShouldFetchPromiseOnlyOnce(t *testing.T) {
5555
MaxBytes: 10 << 20,
5656
}
5757
m := NewPostingCacheMetrics(prometheus.NewPedanticRegistry())
58-
cache := newFifoCache[int](cfg, "test", m, time.Now)
58+
cache := newLruCache[int](cfg, "test", m, time.Now)
5959
calls := atomic.Int64{}
6060
concurrency := 100
6161
wg := sync.WaitGroup{}
@@ -84,7 +84,7 @@ func TestFifoCacheDisabled(t *testing.T) {
8484
cfg.Enabled = false
8585
m := NewPostingCacheMetrics(prometheus.NewPedanticRegistry())
8686
timeNow := time.Now
87-
cache := newFifoCache[int](cfg, "test", m, timeNow)
87+
cache := newLruCache[int](cfg, "test", m, timeNow)
8888
old, loaded := cache.getPromiseForKey("key1", func() (int, int64, error) {
8989
return 1, 0, nil
9090
})
@@ -127,7 +127,7 @@ func TestFifoCacheExpire(t *testing.T) {
127127
r := prometheus.NewPedanticRegistry()
128128
m := NewPostingCacheMetrics(r)
129129
timeNow := time.Now
130-
cache := newFifoCache[int](c.cfg, "test", m, timeNow)
130+
cache := newLruCache[int](c.cfg, "test", m, timeNow)
131131

132132
for i := range numberOfKeys {
133133
key := RepeatStringIfNeeded(fmt.Sprintf("key%d", i), keySize)
@@ -292,3 +292,107 @@ func TestPostingsCacheFetchTimeout(t *testing.T) {
292292

293293
close(fetchShouldBlock)
294294
}
295+
296+
func TestLruCacheEvictsLeastRecentlyUsed(t *testing.T) {
297+
r := prometheus.NewPedanticRegistry()
298+
m := NewPostingCacheMetrics(r)
299+
300+
// Cache fits exactly 3 entries (each entry = 8 bytes value + 4 bytes key = 12 bytes)
301+
cfg := PostingsCacheConfig{
302+
Enabled: true,
303+
Ttl: time.Hour,
304+
MaxBytes: int64(3 * (8 + 4)),
305+
}
306+
cache := newLruCache[int](cfg, "test", m, time.Now)
307+
308+
// Insert 3 entries: A, B, C
309+
cache.getPromiseForKey("aaaa", func() (int, int64, error) { return 1, 8, nil })
310+
cache.getPromiseForKey("bbbb", func() (int, int64, error) { return 2, 8, nil })
311+
cache.getPromiseForKey("cccc", func() (int, int64, error) { return 3, 8, nil })
312+
313+
require.True(t, cache.contains("aaaa"))
314+
require.True(t, cache.contains("bbbb"))
315+
require.True(t, cache.contains("cccc"))
316+
317+
// Access A to make it recently used (B is now least recently used)
318+
cache.getPromiseForKey("aaaa", func() (int, int64, error) { return 1, 8, nil })
319+
320+
// Insert D — should evict B (least recently used), not A
321+
cache.getPromiseForKey("dddd", func() (int, int64, error) { return 4, 8, nil })
322+
323+
require.True(t, cache.contains("aaaa"), "A should still be cached (recently accessed)")
324+
require.False(t, cache.contains("bbbb"), "B should be evicted (least recently used)")
325+
require.True(t, cache.contains("cccc"), "C should still be cached")
326+
require.True(t, cache.contains("dddd"), "D should be cached (just inserted)")
327+
}
328+
329+
func BenchmarkLruCacheHitUnderPressure(b *testing.B) {
330+
// Simulates: 50 "hot" queries (rulers, every 30s) + many "cold" queries (ad-hoc)
331+
// Cache fits 100 entries. With FIFO, cold queries push out hot ones.
332+
// With LRU, hot queries stay cached because they're accessed frequently.
333+
334+
r := prometheus.NewPedanticRegistry()
335+
m := NewPostingCacheMetrics(r)
336+
337+
keySize := 20
338+
cfg := PostingsCacheConfig{
339+
Enabled: true,
340+
Ttl: time.Hour,
341+
MaxBytes: int64(100 * (8 + keySize)), // fits 100 entries
342+
}
343+
cache := newLruCache[int](cfg, "bench", m, time.Now)
344+
345+
// Pre-populate with 50 hot keys
346+
hotKeys := make([]string, 50)
347+
for i := range hotKeys {
348+
hotKeys[i] = RepeatStringIfNeeded(fmt.Sprintf("hot-%d", i), keySize)
349+
cache.getPromiseForKey(hotKeys[i], func() (int, int64, error) { return i, 8, nil })
350+
}
351+
352+
coldIdx := 0
353+
b.ReportAllocs()
354+
355+
for i := 0; b.Loop(); i++ {
356+
if i%3 == 0 {
357+
// 1/3 of accesses are hot queries (simulating ruler every 30s)
358+
key := hotKeys[i%len(hotKeys)]
359+
cache.getPromiseForKey(key, func() (int, int64, error) { return 1, 8, nil })
360+
} else {
361+
// 2/3 are cold unique queries (ad-hoc from Grafana)
362+
key := RepeatStringIfNeeded(fmt.Sprintf("cold-%d", coldIdx), keySize)
363+
coldIdx++
364+
cache.getPromiseForKey(key, func() (int, int64, error) { return 1, 8, nil })
365+
}
366+
}
367+
368+
// Report hit rate for hot keys
369+
hits := 0
370+
for _, k := range hotKeys {
371+
if cache.contains(k) {
372+
hits++
373+
}
374+
}
375+
b.ReportMetric(float64(hits)/float64(len(hotKeys))*100, "%hot-retained")
376+
}
377+
378+
func BenchmarkCacheGetPromise(b *testing.B) {
379+
r := prometheus.NewPedanticRegistry()
380+
m := NewPostingCacheMetrics(r)
381+
cfg := PostingsCacheConfig{Enabled: true, Ttl: time.Hour, MaxBytes: 10 << 20}
382+
cache := newLruCache[int](cfg, "bench", m, time.Now)
383+
384+
// Pre-populate 1000 keys
385+
for i := range 1000 {
386+
key := fmt.Sprintf("key-%04d", i)
387+
cache.getPromiseForKey(key, func() (int, int64, error) { return i, 100, nil })
388+
}
389+
390+
b.RunParallel(func(pb *testing.PB) {
391+
i := 0
392+
for pb.Next() {
393+
key := fmt.Sprintf("key-%04d", i%1000)
394+
cache.getPromiseForKey(key, func() (int, int64, error) { return 1, 100, nil })
395+
i++
396+
}
397+
})
398+
}

0 commit comments

Comments
 (0)