From 8cff713b83b860ea50f4dcc13eba7be586cb9b5a Mon Sep 17 00:00:00 2001 From: Devan Date: Fri, 27 Feb 2026 16:32:14 -0600 Subject: [PATCH 1/8] feat: Adds statistics to debug/vars for tag value cache size --- tsdb/index.go | 4 ++++ tsdb/index/inmem/inmem.go | 4 ++++ tsdb/index/tsi1/cache.go | 24 ++++++++++++++++++++ tsdb/index/tsi1/index.go | 47 +++++++++++++++++++++++++++++++++++++++ tsdb/store.go | 27 +++++++++++++++++----- 5 files changed, 101 insertions(+), 5 deletions(-) diff --git a/tsdb/index.go b/tsdb/index.go index 3143939c466..588437c29e9 100644 --- a/tsdb/index.go +++ b/tsdb/index.go @@ -76,6 +76,10 @@ type Index interface { // Size of the index on disk, if applicable. DiskSizeBytes() int64 + // TagValueCacheBytes is the size of tag value cache for TSI indexes. + // This is only to be used with TSI. + TagValueCacheBytes() int64 + // Bytes estimates the memory footprint of this Index, in bytes. Bytes() int diff --git a/tsdb/index/inmem/inmem.go b/tsdb/index/inmem/inmem.go index 0ab942815dc..996a1e76da8 100644 --- a/tsdb/index/inmem/inmem.go +++ b/tsdb/index/inmem/inmem.go @@ -1040,6 +1040,10 @@ func (i *Index) SeriesIDIterator(opt query.IteratorOptions) (tsdb.SeriesIDIterat // DiskSizeBytes always returns zero bytes, since this is an in-memory index. func (i *Index) DiskSizeBytes() int64 { return 0 } +// TagValueCacheBytes always returns zero bytes, since the in-memory index +// does not use a tag value series ID cache. +func (i *Index) TagValueCacheBytes() int64 { return 0 } + // Rebuild recreates the measurement indexes to allow deleted series to be removed // and garbage collected. func (i *Index) Rebuild() { diff --git a/tsdb/index/tsi1/cache.go b/tsdb/index/tsi1/cache.go index cf427780afe..e5adc3226ae 100644 --- a/tsdb/index/tsi1/cache.go +++ b/tsdb/index/tsi1/cache.go @@ -3,6 +3,7 @@ package tsi1 import ( "container/list" "sync" + "unsafe" "github.com/influxdata/influxdb/tsdb" ) @@ -191,6 +192,29 @@ func (c *TagValueSeriesIDCache) checkEviction() { } } +// HeapSize estimates the total heap memory usage of the cache in bytes. +func (c *TagValueSeriesIDCache) HeapSize() int { + c.RLock() + defer c.RUnlock() + + size := int(unsafe.Sizeof(*c)) + for name, mmap := range c.cache { + size += len(name) + int(unsafe.Sizeof(mmap)) + for key, tkmap := range mmap { + size += len(key) + int(unsafe.Sizeof(tkmap)) + for value, ele := range tkmap { + size += len(value) + int(unsafe.Sizeof(ele)) + elem := ele.Value.(*seriesIDCacheElement) + size += len(elem.name) + len(elem.key) + len(elem.value) + if elem.SeriesIDSet != nil { + size += elem.SeriesIDSet.Bytes() + } + } + } + } + return size +} + // seriesIDCacheElement is an item stored within a cache. type seriesIDCacheElement struct { name string diff --git a/tsdb/index/tsi1/index.go b/tsdb/index/tsi1/index.go index 6bfa12d09a8..c33bd729996 100644 --- a/tsdb/index/tsi1/index.go +++ b/tsdb/index/tsi1/index.go @@ -159,6 +159,13 @@ type Index struct { // Number of partitions used by the index. PartitionN uint64 + + // Number of bytes cache is currently using. + // Updated periodically by CollectTagValueCacheMetrics goroutine. + cacheBytes int64 + + // Closed when the index is closing, to signal background goroutines to stop. + closing chan struct{} } func (i *Index) UniqueReferenceID() uintptr { @@ -180,6 +187,7 @@ func NewIndex(sfile *tsdb.SeriesFile, database string, options ...IndexOption) * sSketch: hll.NewDefaultPlus(), sTSketch: hll.NewDefaultPlus(), PartitionN: DefaultPartitionN, + closing: make(chan struct{}), } for _, option := range options { @@ -260,6 +268,9 @@ func (i *Index) Open() (rErr error) { return errors.New("index already open") } + // Re-initialize closing channel for reopen support. + i.closing = make(chan struct{}) + // Ensure root exists. if err := os.MkdirAll(i.path, 0777); err != nil { return err @@ -303,6 +314,10 @@ func (i *Index) Open() (rErr error) { // Mark opened. i.opened = true i.logger.Info(fmt.Sprintf("index opened with %d partitions", partitionN)) + + // Start background goroutine to periodically collect cache metrics. + i.collectTagValueCacheMetrics() + return nil } @@ -350,6 +365,14 @@ func (i *Index) Close() error { // close closes the index without locking func (i *Index) close() (rErr error) { + // Signal background goroutines to stop. + select { + case <-i.closing: + // Already closed. + default: + close(i.closing) + } + for _, p := range i.partitions { if (p != nil) && p.IsOpen() { if pErr := p.Close(); pErr != nil { @@ -1062,6 +1085,30 @@ func (i *Index) TagKeySeriesIDIterator(name, key []byte) (tsdb.SeriesIDIterator, return tsdb.MergeSeriesIDIterators(a...), nil } +// TagValueCacheBytes returns the most recently sampled heap size of the +// tag value series ID cache, in bytes. +func (i *Index) TagValueCacheBytes() int64 { + return atomic.LoadInt64(&i.cacheBytes) +} + +// collectTagValueCacheMetrics starts a background goroutine that periodically +// samples the tag value cache heap size. It exits when the index is closed. +func (i *Index) collectTagValueCacheMetrics() { + const cacheTrigger = 10 * time.Minute + go func() { + ticker := time.NewTicker(cacheTrigger) + defer ticker.Stop() + for { + select { + case <-i.closing: + return + case <-ticker.C: + atomic.StoreInt64(&i.cacheBytes, int64(i.tagValueCache.HeapSize())) + } + } + }() +} + // TagValueSeriesIDIterator returns a series iterator for a single tag value. func (i *Index) TagValueSeriesIDIterator(name, key, value []byte) (tsdb.SeriesIDIterator, error) { // Check series ID set cache... diff --git a/tsdb/store.go b/tsdb/store.go index e4b53b0fb17..fdf95d3e94a 100644 --- a/tsdb/store.go +++ b/tsdb/store.go @@ -45,11 +45,12 @@ var ( // Statistics gathered by the store. const ( - statDatabaseSeries = "numSeries" // number of series in a database - statDatabaseMeasurements = "numMeasurements" // number of measurements in a database - statPointsWritten = "pointsWritten" // number of points parsed by engines successfully - statValuesWritten = "valuesWritten" // number of values parsed by engines successfully - statSeriesCreated = "seriesCreated" // number of series created since startup + statDatabaseSeries = "numSeries" // number of series in a database + statDatabaseMeasurements = "numMeasurements" // number of measurements in a database + statTagValueCacheBytes = "tagValueCacheBytes" // bytes used by the tag value series ID cache + statPointsWritten = "pointsWritten" // number of points parsed by engines successfully + statValuesWritten = "valuesWritten" // number of values parsed by engines successfully + statSeriesCreated = "seriesCreated" // number of series created since startup ) // SeriesFileDirectory is the name of the directory containing series files for @@ -217,12 +218,28 @@ func (s *Store) Statistics(tags map[string]string) []models.Statistic { continue } + // Collect tag value cache bytes from unique indexes for this database. + var tagValueCacheBytes int64 + s.mu.RLock() + uniqueIndexes := make(map[uintptr]Index) + for _, sh := range s.shards { + if sh.database == database { + idx := sh.index + uniqueIndexes[idx.UniqueReferenceID()] = idx + } + } + s.mu.RUnlock() + for _, idx := range uniqueIndexes { + tagValueCacheBytes += idx.TagValueCacheBytes() + } + statistics = append(statistics, models.Statistic{ Name: "database", Tags: models.StatisticTags{"database": database}.Merge(tags), Values: map[string]interface{}{ statDatabaseSeries: sc, statDatabaseMeasurements: mc, + statTagValueCacheBytes: tagValueCacheBytes, }, }) } From f110968e7df7640dbb74ca6484153b6a4db5284f Mon Sep 17 00:00:00 2001 From: Devan Date: Fri, 27 Feb 2026 16:38:17 -0600 Subject: [PATCH 2/8] feat: Formatting --- tsdb/store.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/tsdb/store.go b/tsdb/store.go index fdf95d3e94a..f37b21872f2 100644 --- a/tsdb/store.go +++ b/tsdb/store.go @@ -45,12 +45,12 @@ var ( // Statistics gathered by the store. const ( - statDatabaseSeries = "numSeries" // number of series in a database - statDatabaseMeasurements = "numMeasurements" // number of measurements in a database - statTagValueCacheBytes = "tagValueCacheBytes" // bytes used by the tag value series ID cache - statPointsWritten = "pointsWritten" // number of points parsed by engines successfully - statValuesWritten = "valuesWritten" // number of values parsed by engines successfully - statSeriesCreated = "seriesCreated" // number of series created since startup + statDatabaseSeries = "numSeries" // number of series in a database + statDatabaseMeasurements = "numMeasurements" // number of measurements in a database + statTagValueCacheBytes = "tagValueCacheBytes" // bytes used by the tag value series ID cache + statPointsWritten = "pointsWritten" // number of points parsed by engines successfully + statValuesWritten = "valuesWritten" // number of values parsed by engines successfully + statSeriesCreated = "seriesCreated" // number of series created since startup ) // SeriesFileDirectory is the name of the directory containing series files for From ccdbdf483d62311436361157a12d16db31b10204 Mon Sep 17 00:00:00 2001 From: Devan Date: Fri, 27 Feb 2026 17:10:03 -0600 Subject: [PATCH 3/8] feat: Adds tests for cache size --- tsdb/index/tsi1/cache_test.go | 48 +++++++++++++++++++++++++++++++++++ tsdb/index/tsi1/index.go | 5 +++- 2 files changed, 52 insertions(+), 1 deletion(-) diff --git a/tsdb/index/tsi1/cache_test.go b/tsdb/index/tsi1/cache_test.go index df76f0b246e..470af42938d 100644 --- a/tsdb/index/tsi1/cache_test.go +++ b/tsdb/index/tsi1/cache_test.go @@ -7,6 +7,7 @@ import ( "time" "github.com/influxdata/influxdb/tsdb" + "github.com/stretchr/testify/require" ) // This function is used to log the components of disk size when DiskSizeBytes fails @@ -150,6 +151,53 @@ func TestTagValueSeriesIDCache_eviction(t *testing.T) { cache.Has(t, "m3", "k0", "v0", m3k0v0) } +// TestTagValueSeriesIDCache_HeapSize verifies that HeapSize tracks memory growth +// as entries are added, shrinks after eviction, and handles nil SeriesIDSets. +func TestTagValueSeriesIDCache_HeapSize(t *testing.T) { + cache := TestCache{NewTagValueSeriesIDCache(10)} + + emptySize := cache.HeapSize() + require.Positive(t, emptySize) + + cache.PutByString("m0", "k0", "v0", tsdb.NewSeriesIDSet(1, 2, 3)) + sizeAfterOne := cache.HeapSize() + require.Greater(t, sizeAfterOne, emptySize) + + cache.PutByString("m0", "k0", "v1", tsdb.NewSeriesIDSet(100, 200, 300, 400, 500)) + sizeAfterTwo := cache.HeapSize() + require.Greater(t, sizeAfterTwo, sizeAfterOne) + + cache.PutByString("m1", "k0", "v0", tsdb.NewSeriesIDSet(1000)) + sizeAfterThree := cache.HeapSize() + require.Greater(t, sizeAfterThree, sizeAfterTwo) +} + +// TestTagValueSeriesIDCache_HeapSize_eviction verifies that evicting a large +// entry and replacing it with a small one reduces the reported HeapSize. +func TestTagValueSeriesIDCache_HeapSize_eviction(t *testing.T) { + cache := TestCache{NewTagValueSeriesIDCache(2)} + + large := make([]uint64, 1000) + for i := range large { + large[i] = uint64(i) + } + cache.PutByString("m0", "k0", "v0", tsdb.NewSeriesIDSet(large...)) + cache.PutByString("m0", "k0", "v1", tsdb.NewSeriesIDSet(large...)) + fullSize := cache.HeapSize() + + cache.PutByString("m0", "k0", "v2", tsdb.NewSeriesIDSet(1)) + require.Less(t, cache.HeapSize(), fullSize) +} + +// TestTagValueSeriesIDCache_HeapSize_nil_set verifies that a nil SeriesIDSet +// entry is counted without panicking. +func TestTagValueSeriesIDCache_HeapSize_nil_set(t *testing.T) { + cache := TestCache{NewTagValueSeriesIDCache(10)} + + cache.PutByString("m0", "k0", "v0", nil) + require.Positive(t, cache.HeapSize()) +} + func TestTagValueSeriesIDCache_addToSet(t *testing.T) { cache := TestCache{NewTagValueSeriesIDCache(4)} cache.PutByString("m0", "k0", "v0", nil) // Puts a nil set in the cache. diff --git a/tsdb/index/tsi1/index.go b/tsdb/index/tsi1/index.go index c33bd729996..c39b89f521d 100644 --- a/tsdb/index/tsi1/index.go +++ b/tsdb/index/tsi1/index.go @@ -1094,7 +1094,10 @@ func (i *Index) TagValueCacheBytes() int64 { // collectTagValueCacheMetrics starts a background goroutine that periodically // samples the tag value cache heap size. It exits when the index is closed. func (i *Index) collectTagValueCacheMetrics() { - const cacheTrigger = 10 * time.Minute + // take an initial sample + atomic.StoreInt64(&i.cacheBytes, int64(i.tagValueCache.HeapSize())) + + const cacheTrigger = 10 * time.Second go func() { ticker := time.NewTicker(cacheTrigger) defer ticker.Stop() From 29e4264d41cff1363a1153d8036b481679dc863c Mon Sep 17 00:00:00 2001 From: Devan Date: Fri, 27 Feb 2026 17:26:34 -0600 Subject: [PATCH 4/8] fix: fixes race condition between stats and closeNoLock --- tsdb/shard.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tsdb/shard.go b/tsdb/shard.go index 9736bdc4758..903c2cfba89 100644 --- a/tsdb/shard.go +++ b/tsdb/shard.go @@ -428,7 +428,9 @@ func (s *Shard) closeNoLock() error { } if e := s.index.Close(); e == nil { + s.mu.Lock() s.index = nil + s.mu.Unlock() } return err } From efff99c29f8501ec49013ca22d9cdfca8026ed86 Mon Sep 17 00:00:00 2001 From: Devan Date: Fri, 27 Feb 2026 17:42:03 -0600 Subject: [PATCH 5/8] fix: remove locking from index.Close() --- tsdb/shard.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/tsdb/shard.go b/tsdb/shard.go index 903c2cfba89..9736bdc4758 100644 --- a/tsdb/shard.go +++ b/tsdb/shard.go @@ -428,9 +428,7 @@ func (s *Shard) closeNoLock() error { } if e := s.index.Close(); e == nil { - s.mu.Lock() s.index = nil - s.mu.Unlock() } return err } From f875ea3cf680031e7d6a61f151e4bd54ac47245b Mon Sep 17 00:00:00 2001 From: Devan Date: Fri, 27 Feb 2026 17:47:27 -0600 Subject: [PATCH 6/8] feat: Rework tag value cache size collection per db --- tsdb/store.go | 33 +++++++++++++++++---------------- 1 file changed, 17 insertions(+), 16 deletions(-) diff --git a/tsdb/store.go b/tsdb/store.go index f37b21872f2..a38b40c0446 100644 --- a/tsdb/store.go +++ b/tsdb/store.go @@ -201,6 +201,22 @@ func (s *Store) Statistics(tags map[string]string) []models.Statistic { s.mu.RLock() shards := s.shardsSlice() s.mu.RUnlock() + + // Collect tag value cache bytes from unique indexes, grouped by database. + dbCacheBytes := make(map[string]int64) + seenIndexes := make(map[uintptr]bool) + for _, sh := range shards { + idx, err := sh.Index() + if err != nil { + continue + } + id := idx.UniqueReferenceID() + if !seenIndexes[id] { + seenIndexes[id] = true + dbCacheBytes[sh.Database()] += idx.TagValueCacheBytes() + } + } + // Add all the series and measurements cardinality estimations. databases := s.Databases() statistics := make([]models.Statistic, 0, len(databases)) @@ -218,28 +234,13 @@ func (s *Store) Statistics(tags map[string]string) []models.Statistic { continue } - // Collect tag value cache bytes from unique indexes for this database. - var tagValueCacheBytes int64 - s.mu.RLock() - uniqueIndexes := make(map[uintptr]Index) - for _, sh := range s.shards { - if sh.database == database { - idx := sh.index - uniqueIndexes[idx.UniqueReferenceID()] = idx - } - } - s.mu.RUnlock() - for _, idx := range uniqueIndexes { - tagValueCacheBytes += idx.TagValueCacheBytes() - } - statistics = append(statistics, models.Statistic{ Name: "database", Tags: models.StatisticTags{"database": database}.Merge(tags), Values: map[string]interface{}{ statDatabaseSeries: sc, statDatabaseMeasurements: mc, - statTagValueCacheBytes: tagValueCacheBytes, + statTagValueCacheBytes: dbCacheBytes[database], }, }) } From 6c1faf65942289952290e70fcb328a1133c07e7f Mon Sep 17 00:00:00 2001 From: WeblWabl Date: Fri, 27 Feb 2026 17:49:19 -0600 Subject: [PATCH 7/8] feat: use closing as param for goroutine Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- tsdb/index/tsi1/index.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/tsdb/index/tsi1/index.go b/tsdb/index/tsi1/index.go index c39b89f521d..d9d7afa2e97 100644 --- a/tsdb/index/tsi1/index.go +++ b/tsdb/index/tsi1/index.go @@ -1098,18 +1098,19 @@ func (i *Index) collectTagValueCacheMetrics() { atomic.StoreInt64(&i.cacheBytes, int64(i.tagValueCache.HeapSize())) const cacheTrigger = 10 * time.Second - go func() { + closing := i.closing + go func(closing <-chan struct{}) { ticker := time.NewTicker(cacheTrigger) defer ticker.Stop() for { select { - case <-i.closing: + case <-closing: return case <-ticker.C: atomic.StoreInt64(&i.cacheBytes, int64(i.tagValueCache.HeapSize())) } } - }() + }(closing) } // TagValueSeriesIDIterator returns a series iterator for a single tag value. From b98d18b794560804ade6b93be37726cacade49a5 Mon Sep 17 00:00:00 2001 From: Devan Date: Fri, 27 Feb 2026 17:51:01 -0600 Subject: [PATCH 8/8] feat: check for nil index too --- tsdb/store.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tsdb/store.go b/tsdb/store.go index a38b40c0446..804d49ecb66 100644 --- a/tsdb/store.go +++ b/tsdb/store.go @@ -207,7 +207,7 @@ func (s *Store) Statistics(tags map[string]string) []models.Statistic { seenIndexes := make(map[uintptr]bool) for _, sh := range shards { idx, err := sh.Index() - if err != nil { + if err != nil || idx == nil { continue } id := idx.UniqueReferenceID()