Commit 712b1db

perf(ingester): Convert postings cache from FIFO to LRU eviction
The expanded postings cache previously used FIFO eviction, which evicts the oldest entry regardless of access frequency. Under memory pressure with high series churn, actively queried entries get evicted simply because they were inserted earliest, causing repeated expensive recomputations.

LRU eviction moves an entry to the back of the list on every access, so frequently queried entries (e.g. ruler queries running every 30s) stay cached even when the cache is full. Only entries that have not been accessed recently get evicted.

Implementation: keep a *list.Element reference per cache entry for O(1) MoveToBack on a cache hit. The eviction logic (shouldEvictHead/evictHead) remains unchanged: it still evicts from the front, but the front now holds the least-recently-used entry instead of the oldest-inserted one.

Signed-off-by: Alan Protasio <approtas@amazon.com>
1 parent 717bd26 commit 712b1db

2 files changed

Lines changed: 138 additions & 20 deletions
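
The mechanism described in the commit message boils down to a recency-ordered doubly linked list plus a per-entry *list.Element handle. The standalone sketch below is illustrative only (lruList, touch and the capacity are invented for this example and are not the Cortex code); it shows how MoveToBack on access and Front eviction combine to give LRU behavior:

package main

import (
	"container/list"
	"fmt"
)

// lruList is a minimal, non-concurrent illustration of the eviction order:
// the list front holds the least-recently-used key, the back the most-recently-used.
type lruList struct {
	order    *list.List               // keys in recency order
	elements map[string]*list.Element // key -> element, for O(1) MoveToBack on a hit
	maxSize  int
}

func newLruList(maxSize int) *lruList {
	return &lruList{order: list.New(), elements: make(map[string]*list.Element), maxSize: maxSize}
}

// touch records an access: existing keys move to the back, new keys are appended,
// and the front (least recently used) is evicted once the list exceeds capacity.
func (l *lruList) touch(key string) {
	if elem, ok := l.elements[key]; ok {
		l.order.MoveToBack(elem) // cache hit: refresh recency in O(1)
		return
	}
	l.elements[key] = l.order.PushBack(key)
	if l.order.Len() > l.maxSize {
		front := l.order.Front()
		l.order.Remove(front)
		delete(l.elements, front.Value.(string))
	}
}

func main() {
	l := newLruList(3)
	for _, k := range []string{"a", "b", "c", "a", "d"} { // re-accessing "a" means "b" is evicted
		l.touch(k)
	}
	for e := l.order.Front(); e != nil; e = e.Next() {
		fmt.Println(e.Value) // prints c, a, d (least to most recently used)
	}
}

In the commit, the equivalent MoveToBack call happens inside getPromiseForKey under cachedMtx, and evictHead keeps removing the list front exactly as before.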


pkg/storage/tsdb/expanded_postings_cache.go

Lines changed: 29 additions & 17 deletions
@@ -134,8 +134,8 @@ type ExpandedPostingsCache interface {
 type blocksPostingsForMatchersCache struct {
 	userId string
 
-	headCache   *fifoCache[[]storage.SeriesRef]
-	blocksCache *fifoCache[[]storage.SeriesRef]
+	headCache   *lruCache[[]storage.SeriesRef]
+	blocksCache *lruCache[[]storage.SeriesRef]
 	postingsForMatchersFunc func(ctx context.Context, ix tsdb.IndexReader, ms ...*labels.Matcher) (index.Postings, error)
 	timeNow func() time.Time
 
@@ -158,8 +158,8 @@ func newBlocksPostingsForMatchersCache(userId string, cfg TSDBPostingsCacheConfi
 	}
 
 	return &blocksPostingsForMatchersCache{
-		headCache:               newFifoCache[[]storage.SeriesRef](cfg.Head, "head", metrics, cfg.timeNow),
-		blocksCache:             newFifoCache[[]storage.SeriesRef](cfg.Blocks, "block", metrics, cfg.timeNow),
+		headCache:               newLruCache[[]storage.SeriesRef](cfg.Head, "head", metrics, cfg.timeNow),
+		blocksCache:             newLruCache[[]storage.SeriesRef](cfg.Blocks, "block", metrics, cfg.timeNow),
 		postingsForMatchersFunc: cfg.PostingsForMatchers,
 		timeNow:                 cfg.timeNow,
 		metrics:                 metrics,
@@ -352,7 +352,7 @@ func (s *seedByHash) incrementSeed(userId string, v string) {
 	s.seedByHash[i]++
 }
 
-type fifoCache[V any] struct {
+type lruCache[V any] struct {
 	cfg          PostingsCacheConfig
 	cachedValues *sync.Map
 	timeNow      func() time.Time
@@ -365,8 +365,8 @@ type fifoCache[V any] struct {
 	cachedBytes int64
 }
 
-func newFifoCache[V any](cfg PostingsCacheConfig, name string, metrics *ExpandedPostingsCacheMetrics, timeNow func() time.Time) *fifoCache[V] {
-	return &fifoCache[V]{
+func newLruCache[V any](cfg PostingsCacheConfig, name string, metrics *ExpandedPostingsCacheMetrics, timeNow func() time.Time) *lruCache[V] {
+	return &lruCache[V]{
 		cachedValues: new(sync.Map),
 		cached:       list.New(),
 		cfg:          cfg,
@@ -376,15 +376,15 @@ func newFifoCache[V any](cfg PostingsCacheConfig, name string, metrics *Expanded
 	}
 }
 
-func (c *fifoCache[V]) clear() {
+func (c *lruCache[V]) clear() {
 	c.cachedMtx.Lock()
 	defer c.cachedMtx.Unlock()
 	c.cached = list.New()
 	c.cachedBytes = 0
 	c.cachedValues = new(sync.Map)
 }
 
-func (c *fifoCache[V]) expire() {
+func (c *lruCache[V]) expire() {
 	if c.cfg.Ttl <= 0 {
 		return
 	}
@@ -402,13 +402,13 @@ func (c *fifoCache[V]) expire() {
 	}
 }
 
-func (c *fifoCache[V]) size() int {
+func (c *lruCache[V]) size() int {
 	c.cachedMtx.RLock()
 	defer c.cachedMtx.RUnlock()
 	return c.cached.Len()
 }
 
-func (c *fifoCache[V]) getPromiseForKey(k string, fetch func() (V, int64, error)) (*cacheEntryPromise[V], bool) {
+func (c *lruCache[V]) getPromiseForKey(k string, fetch func() (V, int64, error)) (*cacheEntryPromise[V], bool) {
 	r := &cacheEntryPromise[V]{
 		done: make(chan struct{}),
 	}
@@ -434,6 +434,13 @@ func (c *fifoCache[V]) getPromiseForKey(k string, fetch func() (V, int64, error)
 	// If the promise is already in the cache, lets wait it to fetch the data.
 	<-loaded.(*cacheEntryPromise[V]).done
 
+	// LRU: move to back on access
+	if elem := loaded.(*cacheEntryPromise[V]).elem; elem != nil {
+		c.cachedMtx.Lock()
+		c.cached.MoveToBack(elem)
+		c.cachedMtx.Unlock()
+	}
+
 	// If is cached but is expired, lets try to replace the cache value.
 	if loaded.(*cacheEntryPromise[V]).isExpired(c.cfg.Ttl, c.timeNow()) && c.cachedValues.CompareAndSwap(k, loaded, r) {
 		c.metrics.CacheMiss.WithLabelValues(c.name, "expired").Inc()
@@ -449,12 +456,12 @@ func (c *fifoCache[V]) getPromiseForKey(k string, fetch func() (V, int64, error)
 	return loaded.(*cacheEntryPromise[V]), ok
 }
 
-func (c *fifoCache[V]) contains(k string) bool {
+func (c *lruCache[V]) contains(k string) bool {
 	_, ok := c.cachedValues.Load(k)
 	return ok
 }
 
-func (c *fifoCache[V]) shouldEvictHead() (string, bool) {
+func (c *lruCache[V]) shouldEvictHead() (string, bool) {
 	h := c.cached.Front()
 	if h == nil {
 		return "", false
@@ -475,7 +482,7 @@ func (c *fifoCache[V]) shouldEvictHead() (string, bool) {
 	return "", false
 }
 
-func (c *fifoCache[V]) evictHead() {
+func (c *lruCache[V]) evictHead() {
 	front := c.cached.Front()
 	c.cached.Remove(front)
 	oldestKey := front.Value.(string)
@@ -484,18 +491,22 @@ func (c *fifoCache[V]) evictHead() {
 	}
 }
 
-func (c *fifoCache[V]) created(key string, sizeBytes int64) {
+func (c *lruCache[V]) created(key string, sizeBytes int64) {
 	if c.cfg.Ttl <= 0 {
 		c.cachedValues.Delete(key)
 		return
 	}
 	c.cachedMtx.Lock()
 	defer c.cachedMtx.Unlock()
-	c.cached.PushBack(key)
+	elem := c.cached.PushBack(key)
+	// Store the element reference in the promise for O(1) LRU access
+	if p, ok := c.cachedValues.Load(key); ok {
+		p.(*cacheEntryPromise[V]).elem = elem
+	}
 	c.cachedBytes += sizeBytes
 }
 
-func (c *fifoCache[V]) updateSize(oldSize, newSizeBytes int64) {
+func (c *lruCache[V]) updateSize(oldSize, newSizeBytes int64) {
 	if oldSize == newSizeBytes {
 		return
 	}
@@ -508,6 +519,7 @@ func (c *fifoCache[V]) updateSize(oldSize, newSizeBytes int64) {
 type cacheEntryPromise[V any] struct {
 	ts        time.Time
 	sizeBytes int64
+	elem      *list.Element
 
 	done chan struct{}
 	v    V
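
The MoveToBack call added above sits inside getPromiseForKey, whose structure is a promise-based single-flight cache: concurrent callers LoadOrStore a promise into a sync.Map and wait on its done channel, so the expensive postings expansion runs once per key. The following simplified, self-contained sketch of that pattern is illustrative (the real code also tracks entry sizes, TTL and metrics, and uses generics):

package main

import (
	"fmt"
	"sync"
)

// promise is a simplified stand-in for cacheEntryPromise: concurrent callers for the
// same key share one promise and wait on its done channel instead of fetching twice.
type promise struct {
	done chan struct{}
	v    int
	err  error
}

type promiseCache struct {
	values sync.Map // key -> *promise
}

// get returns the value for k, running fetch exactly once even under concurrency.
func (c *promiseCache) get(k string, fetch func() (int, error)) (int, error) {
	p := &promise{done: make(chan struct{})}
	loaded, ok := c.values.LoadOrStore(k, p)
	if !ok {
		// We stored our promise first: perform the fetch and wake any waiters.
		p.v, p.err = fetch()
		close(p.done)
		return p.v, p.err
	}
	// Another caller owns the promise: wait for its result.
	// In the commit, this hit path is where the entry is moved to the back of the LRU list.
	existing := loaded.(*promise)
	<-existing.done
	return existing.v, existing.err
}

func main() {
	c := &promiseCache{}
	var calls int
	var mu sync.Mutex
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if _, err := c.get("postings", func() (int, error) {
				mu.Lock()
				calls++
				mu.Unlock()
				return 42, nil
			}); err != nil {
				panic(err)
			}
		}()
	}
	wg.Wait()
	fmt.Println("fetch calls:", calls) // always 1: the other nine goroutines reused the promise
}

Touching the LRU list only on this hit path is what keeps frequently re-requested postings at the back of the list while one-off queries drift toward the front and get evicted first.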

pkg/storage/tsdb/expanded_postings_cache_test.go

Lines changed: 109 additions & 3 deletions
@@ -55,7 +55,7 @@ func Test_ShouldFetchPromiseOnlyOnce(t *testing.T) {
 		MaxBytes: 10 << 20,
 	}
 	m := NewPostingCacheMetrics(prometheus.NewPedanticRegistry())
-	cache := newFifoCache[int](cfg, "test", m, time.Now)
+	cache := newLruCache[int](cfg, "test", m, time.Now)
 	calls := atomic.Int64{}
 	concurrency := 100
 	wg := sync.WaitGroup{}
@@ -84,7 +84,7 @@ func TestFifoCacheDisabled(t *testing.T) {
 	cfg.Enabled = false
 	m := NewPostingCacheMetrics(prometheus.NewPedanticRegistry())
 	timeNow := time.Now
-	cache := newFifoCache[int](cfg, "test", m, timeNow)
+	cache := newLruCache[int](cfg, "test", m, timeNow)
 	old, loaded := cache.getPromiseForKey("key1", func() (int, int64, error) {
 		return 1, 0, nil
 	})
@@ -127,7 +127,7 @@ func TestFifoCacheExpire(t *testing.T) {
 	r := prometheus.NewPedanticRegistry()
 	m := NewPostingCacheMetrics(r)
 	timeNow := time.Now
-	cache := newFifoCache[int](c.cfg, "test", m, timeNow)
+	cache := newLruCache[int](c.cfg, "test", m, timeNow)
 
 	for i := range numberOfKeys {
 		key := RepeatStringIfNeeded(fmt.Sprintf("key%d", i), keySize)
@@ -292,3 +292,109 @@ func TestPostingsCacheFetchTimeout(t *testing.T) {
 
 	close(fetchShouldBlock)
 }
+
+func TestLruCacheEvictsLeastRecentlyUsed(t *testing.T) {
+	r := prometheus.NewPedanticRegistry()
+	m := NewPostingCacheMetrics(r)
+
+	// Cache fits exactly 3 entries (each entry = 8 bytes value + 4 bytes key = 12 bytes)
+	cfg := PostingsCacheConfig{
+		Enabled:  true,
+		Ttl:      time.Hour,
+		MaxBytes: int64(3 * (8 + 4)),
+	}
+	cache := newLruCache[int](cfg, "test", m, time.Now)
+
+	// Insert 3 entries: A, B, C
+	cache.getPromiseForKey("aaaa", func() (int, int64, error) { return 1, 8, nil })
+	cache.getPromiseForKey("bbbb", func() (int, int64, error) { return 2, 8, nil })
+	cache.getPromiseForKey("cccc", func() (int, int64, error) { return 3, 8, nil })
+
+	require.True(t, cache.contains("aaaa"))
+	require.True(t, cache.contains("bbbb"))
+	require.True(t, cache.contains("cccc"))
+
+	// Access A to make it recently used (B is now least recently used)
+	cache.getPromiseForKey("aaaa", func() (int, int64, error) { return 1, 8, nil })
+
+	// Insert D - should evict B (least recently used), not A
+	cache.getPromiseForKey("dddd", func() (int, int64, error) { return 4, 8, nil })
+
+	require.True(t, cache.contains("aaaa"), "A should still be cached (recently accessed)")
+	require.False(t, cache.contains("bbbb"), "B should be evicted (least recently used)")
+	require.True(t, cache.contains("cccc"), "C should still be cached")
+	require.True(t, cache.contains("dddd"), "D should be cached (just inserted)")
+}
+
+func BenchmarkLruCacheHitUnderPressure(b *testing.B) {
+	// Simulates: 50 "hot" queries (rulers, every 30s) + many "cold" queries (ad-hoc).
+	// Cache fits 100 entries. With FIFO, cold queries push out hot ones.
+	// With LRU, hot queries stay cached because they're accessed frequently.
+
+	r := prometheus.NewPedanticRegistry()
+	m := NewPostingCacheMetrics(r)
+
+	keySize := 20
+	cfg := PostingsCacheConfig{
+		Enabled:  true,
+		Ttl:      time.Hour,
+		MaxBytes: int64(100 * (8 + keySize)), // fits 100 entries
+	}
+	cache := newLruCache[int](cfg, "bench", m, time.Now)
+
+	// Pre-populate with 50 hot keys
+	hotKeys := make([]string, 50)
+	for i := range hotKeys {
+		hotKeys[i] = RepeatStringIfNeeded(fmt.Sprintf("hot-%d", i), keySize)
+		cache.getPromiseForKey(hotKeys[i], func() (int, int64, error) { return i, 8, nil })
+	}
+
+	coldIdx := 0
+	b.ResetTimer()
+	b.ReportAllocs()
+
+	for i := 0; i < b.N; i++ {
+		if i%3 == 0 {
+			// 1/3 of accesses are hot queries (simulating ruler every 30s)
+			key := hotKeys[i%len(hotKeys)]
+			cache.getPromiseForKey(key, func() (int, int64, error) { return 1, 8, nil })
+		} else {
+			// 2/3 are cold unique queries (ad-hoc from Grafana)
+			key := RepeatStringIfNeeded(fmt.Sprintf("cold-%d", coldIdx), keySize)
+			coldIdx++
+			cache.getPromiseForKey(key, func() (int, int64, error) { return 1, 8, nil })
+		}
+	}
+
+	// Report hit rate for hot keys
+	hits := 0
+	for _, k := range hotKeys {
+		if cache.contains(k) {
+			hits++
+		}
+	}
+	b.ReportMetric(float64(hits)/float64(len(hotKeys))*100, "%hot-retained")
+}
+
+func BenchmarkCacheGetPromise(b *testing.B) {
+	r := prometheus.NewPedanticRegistry()
+	m := NewPostingCacheMetrics(r)
+	cfg := PostingsCacheConfig{Enabled: true, Ttl: time.Hour, MaxBytes: 10 << 20}
+	cache := newLruCache[int](cfg, "bench", m, time.Now)
+
+	// Pre-populate 1000 keys
+	for i := 0; i < 1000; i++ {
+		key := fmt.Sprintf("key-%04d", i)
+		cache.getPromiseForKey(key, func() (int, int64, error) { return i, 100, nil })
+	}
+
+	b.ResetTimer()
+	b.RunParallel(func(pb *testing.PB) {
+		i := 0
+		for pb.Next() {
+			key := fmt.Sprintf("key-%04d", i%1000)
+			cache.getPromiseForKey(key, func() (int, int64, error) { return 1, 100, nil })
+			i++
+		}
+	})
+}
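
The new test and benchmarks can be exercised in isolation with the standard Go toolchain, assuming the repository's usual layout: go test -run TestLruCacheEvictsLeastRecentlyUsed ./pkg/storage/tsdb/ runs the eviction test, and go test -run '^$' -bench BenchmarkLruCacheHitUnderPressure -benchmem ./pkg/storage/tsdb/ runs the hot-key retention benchmark, which reports the %hot-retained metric alongside the usual allocation counts.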
