Skip to content

Commit 4d47b41

Browse files
committed
perf(ingester): Convert postings cache from FIFO to LRU eviction
The expanded postings cache previously used FIFO eviction, which evicts the oldest entry regardless of access frequency. Under memory pressure with high series churn, actively-queried entries get evicted simply because they were inserted earliest, causing repeated expensive recomputations.

LRU eviction moves entries to the back of the list on every access, ensuring frequently-queried entries (e.g., ruler queries running every 30s) stay cached even when the cache is full. Only entries that haven't been accessed recently get evicted.

Implementation: store each entry's *list.Element on its cacheEntryPromise (new elem field) for O(1) MoveToBack on cache hit. The eviction logic (shouldEvictHead/evictHead) remains unchanged — it still evicts from the front, but now the front contains the least-recently-used entry instead of the oldest-inserted entry.

Signed-off-by: Alan Protasio <approtas@amazon.com>
1 parent 717bd26 commit 4d47b41

3 files changed

Lines changed: 145 additions & 21 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
* [ENHANCEMENT] Compactor: Prevent partition compaction to compact any blocks marked for deletion. #7391
1919
* [ENHANCEMENT] Distributor: Optimize memory allocations by reusing the existing capacity of these pooled slices in the Prometheus Remote Write 2.0 path. #7392
2020
* [ENHANCEMENT] Upgrade gRPC from v1.71.2 to v1.79.3 to address CVE-2026-33186. #7460
21+
* [ENHANCEMENT] Ingester: Convert expanded postings cache from FIFO to LRU eviction to retain frequently-queried entries under memory pressure. #7510
2122
* [BUGFIX] Querier: Fix queryWithRetry and labelsWithRetry returning (nil, nil) on cancelled context by propagating ctx.Err(). #7370
2223
* [BUGFIX] Metrics Helper: Fix non-deterministic bucket order in merged histograms by sorting buckets after map iteration, matching Prometheus client library behavior. #7380
2324
* [BUGFIX] Distributor: Return HTTP 401 Unauthorized when tenant ID resolution fails in the Prometheus Remote Write 2.0 path. #7389

pkg/storage/tsdb/expanded_postings_cache.go

Lines changed: 37 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -134,8 +134,8 @@ type ExpandedPostingsCache interface {
134134
type blocksPostingsForMatchersCache struct {
135135
userId string
136136

137-
headCache *fifoCache[[]storage.SeriesRef]
138-
blocksCache *fifoCache[[]storage.SeriesRef]
137+
headCache *lruCache[[]storage.SeriesRef]
138+
blocksCache *lruCache[[]storage.SeriesRef]
139139
postingsForMatchersFunc func(ctx context.Context, ix tsdb.IndexReader, ms ...*labels.Matcher) (index.Postings, error)
140140
timeNow func() time.Time
141141

@@ -158,8 +158,8 @@ func newBlocksPostingsForMatchersCache(userId string, cfg TSDBPostingsCacheConfi
158158
}
159159

160160
return &blocksPostingsForMatchersCache{
161-
headCache: newFifoCache[[]storage.SeriesRef](cfg.Head, "head", metrics, cfg.timeNow),
162-
blocksCache: newFifoCache[[]storage.SeriesRef](cfg.Blocks, "block", metrics, cfg.timeNow),
161+
headCache: newLruCache[[]storage.SeriesRef](cfg.Head, "head", metrics, cfg.timeNow),
162+
blocksCache: newLruCache[[]storage.SeriesRef](cfg.Blocks, "block", metrics, cfg.timeNow),
163163
postingsForMatchersFunc: cfg.PostingsForMatchers,
164164
timeNow: cfg.timeNow,
165165
metrics: metrics,
@@ -352,7 +352,7 @@ func (s *seedByHash) incrementSeed(userId string, v string) {
352352
s.seedByHash[i]++
353353
}
354354

355-
type fifoCache[V any] struct {
355+
type lruCache[V any] struct {
356356
cfg PostingsCacheConfig
357357
cachedValues *sync.Map
358358
timeNow func() time.Time
@@ -365,8 +365,8 @@ type fifoCache[V any] struct {
365365
cachedBytes int64
366366
}
367367

368-
func newFifoCache[V any](cfg PostingsCacheConfig, name string, metrics *ExpandedPostingsCacheMetrics, timeNow func() time.Time) *fifoCache[V] {
369-
return &fifoCache[V]{
368+
func newLruCache[V any](cfg PostingsCacheConfig, name string, metrics *ExpandedPostingsCacheMetrics, timeNow func() time.Time) *lruCache[V] {
369+
return &lruCache[V]{
370370
cachedValues: new(sync.Map),
371371
cached: list.New(),
372372
cfg: cfg,
@@ -376,15 +376,15 @@ func newFifoCache[V any](cfg PostingsCacheConfig, name string, metrics *Expanded
376376
}
377377
}
378378

379-
func (c *fifoCache[V]) clear() {
379+
func (c *lruCache[V]) clear() {
380380
c.cachedMtx.Lock()
381381
defer c.cachedMtx.Unlock()
382382
c.cached = list.New()
383383
c.cachedBytes = 0
384384
c.cachedValues = new(sync.Map)
385385
}
386386

387-
func (c *fifoCache[V]) expire() {
387+
func (c *lruCache[V]) expire() {
388388
if c.cfg.Ttl <= 0 {
389389
return
390390
}
@@ -402,13 +402,13 @@ func (c *fifoCache[V]) expire() {
402402
}
403403
}
404404

405-
func (c *fifoCache[V]) size() int {
405+
func (c *lruCache[V]) size() int {
406406
c.cachedMtx.RLock()
407407
defer c.cachedMtx.RUnlock()
408408
return c.cached.Len()
409409
}
410410

411-
func (c *fifoCache[V]) getPromiseForKey(k string, fetch func() (V, int64, error)) (*cacheEntryPromise[V], bool) {
411+
func (c *lruCache[V]) getPromiseForKey(k string, fetch func() (V, int64, error)) (*cacheEntryPromise[V], bool) {
412412
r := &cacheEntryPromise[V]{
413413
done: make(chan struct{}),
414414
}
@@ -434,27 +434,41 @@ func (c *fifoCache[V]) getPromiseForKey(k string, fetch func() (V, int64, error)
434434
// If the promise is already in the cache, lets wait it to fetch the data.
435435
<-loaded.(*cacheEntryPromise[V]).done
436436

437+
// LRU: move to back on access
438+
c.cachedMtx.Lock()
439+
if elem := loaded.(*cacheEntryPromise[V]).elem; elem != nil {
440+
c.cached.MoveToBack(elem)
441+
}
442+
c.cachedMtx.Unlock()
443+
437444
// If is cached but is expired, lets try to replace the cache value.
438445
if loaded.(*cacheEntryPromise[V]).isExpired(c.cfg.Ttl, c.timeNow()) && c.cachedValues.CompareAndSwap(k, loaded, r) {
439446
c.metrics.CacheMiss.WithLabelValues(c.name, "expired").Inc()
440447
r.v, r.sizeBytes, r.err = fetch()
441448
r.sizeBytes += int64(len(k))
442449
c.updateSize(loaded.(*cacheEntryPromise[V]).sizeBytes, r.sizeBytes)
443-
loaded = r
444450
r.ts = c.timeNow()
451+
// Replace the list element: remove old, push new to back
452+
c.cachedMtx.Lock()
453+
if oldElem := loaded.(*cacheEntryPromise[V]).elem; oldElem != nil {
454+
c.cached.Remove(oldElem)
455+
}
456+
r.elem = c.cached.PushBack(k)
457+
c.cachedMtx.Unlock()
458+
loaded = r
445459
ok = false
446460
}
447461
}
448462

449463
return loaded.(*cacheEntryPromise[V]), ok
450464
}
451465

452-
func (c *fifoCache[V]) contains(k string) bool {
466+
func (c *lruCache[V]) contains(k string) bool {
453467
_, ok := c.cachedValues.Load(k)
454468
return ok
455469
}
456470

457-
func (c *fifoCache[V]) shouldEvictHead() (string, bool) {
471+
func (c *lruCache[V]) shouldEvictHead() (string, bool) {
458472
h := c.cached.Front()
459473
if h == nil {
460474
return "", false
@@ -475,7 +489,7 @@ func (c *fifoCache[V]) shouldEvictHead() (string, bool) {
475489
return "", false
476490
}
477491

478-
func (c *fifoCache[V]) evictHead() {
492+
func (c *lruCache[V]) evictHead() {
479493
front := c.cached.Front()
480494
c.cached.Remove(front)
481495
oldestKey := front.Value.(string)
@@ -484,18 +498,22 @@ func (c *fifoCache[V]) evictHead() {
484498
}
485499
}
486500

487-
func (c *fifoCache[V]) created(key string, sizeBytes int64) {
501+
func (c *lruCache[V]) created(key string, sizeBytes int64) {
488502
if c.cfg.Ttl <= 0 {
489503
c.cachedValues.Delete(key)
490504
return
491505
}
492506
c.cachedMtx.Lock()
493507
defer c.cachedMtx.Unlock()
494-
c.cached.PushBack(key)
508+
elem := c.cached.PushBack(key)
509+
// Store the element reference in the promise for O(1) LRU access
510+
if p, ok := c.cachedValues.Load(key); ok {
511+
p.(*cacheEntryPromise[V]).elem = elem
512+
}
495513
c.cachedBytes += sizeBytes
496514
}
497515

498-
func (c *fifoCache[V]) updateSize(oldSize, newSizeBytes int64) {
516+
func (c *lruCache[V]) updateSize(oldSize, newSizeBytes int64) {
499517
if oldSize == newSizeBytes {
500518
return
501519
}
@@ -508,6 +526,7 @@ func (c *fifoCache[V]) updateSize(oldSize, newSizeBytes int64) {
508526
type cacheEntryPromise[V any] struct {
509527
ts time.Time
510528
sizeBytes int64
529+
elem *list.Element
511530

512531
done chan struct{}
513532
v V

pkg/storage/tsdb/expanded_postings_cache_test.go

Lines changed: 107 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ func Test_ShouldFetchPromiseOnlyOnce(t *testing.T) {
5555
MaxBytes: 10 << 20,
5656
}
5757
m := NewPostingCacheMetrics(prometheus.NewPedanticRegistry())
58-
cache := newFifoCache[int](cfg, "test", m, time.Now)
58+
cache := newLruCache[int](cfg, "test", m, time.Now)
5959
calls := atomic.Int64{}
6060
concurrency := 100
6161
wg := sync.WaitGroup{}
@@ -84,7 +84,7 @@ func TestFifoCacheDisabled(t *testing.T) {
8484
cfg.Enabled = false
8585
m := NewPostingCacheMetrics(prometheus.NewPedanticRegistry())
8686
timeNow := time.Now
87-
cache := newFifoCache[int](cfg, "test", m, timeNow)
87+
cache := newLruCache[int](cfg, "test", m, timeNow)
8888
old, loaded := cache.getPromiseForKey("key1", func() (int, int64, error) {
8989
return 1, 0, nil
9090
})
@@ -127,7 +127,7 @@ func TestFifoCacheExpire(t *testing.T) {
127127
r := prometheus.NewPedanticRegistry()
128128
m := NewPostingCacheMetrics(r)
129129
timeNow := time.Now
130-
cache := newFifoCache[int](c.cfg, "test", m, timeNow)
130+
cache := newLruCache[int](c.cfg, "test", m, timeNow)
131131

132132
for i := range numberOfKeys {
133133
key := RepeatStringIfNeeded(fmt.Sprintf("key%d", i), keySize)
@@ -292,3 +292,107 @@ func TestPostingsCacheFetchTimeout(t *testing.T) {
292292

293293
close(fetchShouldBlock)
294294
}
295+
296+
func TestLruCacheEvictsLeastRecentlyUsed(t *testing.T) {
297+
r := prometheus.NewPedanticRegistry()
298+
m := NewPostingCacheMetrics(r)
299+
300+
// Cache fits exactly 3 entries (each entry = 8 bytes value + 4 bytes key = 12 bytes)
301+
cfg := PostingsCacheConfig{
302+
Enabled: true,
303+
Ttl: time.Hour,
304+
MaxBytes: int64(3 * (8 + 4)),
305+
}
306+
cache := newLruCache[int](cfg, "test", m, time.Now)
307+
308+
// Insert 3 entries: A, B, C
309+
cache.getPromiseForKey("aaaa", func() (int, int64, error) { return 1, 8, nil })
310+
cache.getPromiseForKey("bbbb", func() (int, int64, error) { return 2, 8, nil })
311+
cache.getPromiseForKey("cccc", func() (int, int64, error) { return 3, 8, nil })
312+
313+
require.True(t, cache.contains("aaaa"))
314+
require.True(t, cache.contains("bbbb"))
315+
require.True(t, cache.contains("cccc"))
316+
317+
// Access A to make it recently used (B is now least recently used)
318+
cache.getPromiseForKey("aaaa", func() (int, int64, error) { return 1, 8, nil })
319+
320+
// Insert D — should evict B (least recently used), not A
321+
cache.getPromiseForKey("dddd", func() (int, int64, error) { return 4, 8, nil })
322+
323+
require.True(t, cache.contains("aaaa"), "A should still be cached (recently accessed)")
324+
require.False(t, cache.contains("bbbb"), "B should be evicted (least recently used)")
325+
require.True(t, cache.contains("cccc"), "C should still be cached")
326+
require.True(t, cache.contains("dddd"), "D should be cached (just inserted)")
327+
}
328+
329+
func BenchmarkLruCacheHitUnderPressure(b *testing.B) {
330+
// Simulates: 50 "hot" queries (rulers, every 30s) + many "cold" queries (ad-hoc)
331+
// Cache fits 100 entries. With FIFO, cold queries push out hot ones.
332+
// With LRU, hot queries stay cached because they're accessed frequently.
333+
334+
r := prometheus.NewPedanticRegistry()
335+
m := NewPostingCacheMetrics(r)
336+
337+
keySize := 20
338+
cfg := PostingsCacheConfig{
339+
Enabled: true,
340+
Ttl: time.Hour,
341+
MaxBytes: int64(100 * (8 + keySize)), // fits 100 entries
342+
}
343+
cache := newLruCache[int](cfg, "bench", m, time.Now)
344+
345+
// Pre-populate with 50 hot keys
346+
hotKeys := make([]string, 50)
347+
for i := range hotKeys {
348+
hotKeys[i] = RepeatStringIfNeeded(fmt.Sprintf("hot-%d", i), keySize)
349+
cache.getPromiseForKey(hotKeys[i], func() (int, int64, error) { return i, 8, nil })
350+
}
351+
352+
coldIdx := 0
353+
b.ReportAllocs()
354+
355+
for i := 0; b.Loop(); i++ {
356+
if i%3 == 0 {
357+
// 1/3 of accesses are hot queries (simulating ruler every 30s)
358+
key := hotKeys[i%len(hotKeys)]
359+
cache.getPromiseForKey(key, func() (int, int64, error) { return 1, 8, nil })
360+
} else {
361+
// 2/3 are cold unique queries (ad-hoc from Grafana)
362+
key := RepeatStringIfNeeded(fmt.Sprintf("cold-%d", coldIdx), keySize)
363+
coldIdx++
364+
cache.getPromiseForKey(key, func() (int, int64, error) { return 1, 8, nil })
365+
}
366+
}
367+
368+
// Report hit rate for hot keys
369+
hits := 0
370+
for _, k := range hotKeys {
371+
if cache.contains(k) {
372+
hits++
373+
}
374+
}
375+
b.ReportMetric(float64(hits)/float64(len(hotKeys))*100, "%hot-retained")
376+
}
377+
378+
func BenchmarkCacheGetPromise(b *testing.B) {
379+
r := prometheus.NewPedanticRegistry()
380+
m := NewPostingCacheMetrics(r)
381+
cfg := PostingsCacheConfig{Enabled: true, Ttl: time.Hour, MaxBytes: 10 << 20}
382+
cache := newLruCache[int](cfg, "bench", m, time.Now)
383+
384+
// Pre-populate 1000 keys
385+
for i := range 1000 {
386+
key := fmt.Sprintf("key-%04d", i)
387+
cache.getPromiseForKey(key, func() (int, int64, error) { return i, 100, nil })
388+
}
389+
390+
b.RunParallel(func(pb *testing.PB) {
391+
i := 0
392+
for pb.Next() {
393+
key := fmt.Sprintf("key-%04d", i%1000)
394+
cache.getPromiseForKey(key, func() (int, int64, error) { return 1, 100, nil })
395+
i++
396+
}
397+
})
398+
}

0 commit comments

Comments
 (0)