diff --git a/tsdb/index.go b/tsdb/index.go index 3143939c466..588437c29e9 100644 --- a/tsdb/index.go +++ b/tsdb/index.go @@ -76,6 +76,10 @@ type Index interface { // Size of the index on disk, if applicable. DiskSizeBytes() int64 + // TagValueCacheBytes is the size of tag value cache for TSI indexes. + // This is only to be used with TSI. + TagValueCacheBytes() int64 + // Bytes estimates the memory footprint of this Index, in bytes. Bytes() int diff --git a/tsdb/index/inmem/inmem.go b/tsdb/index/inmem/inmem.go index 0ab942815dc..996a1e76da8 100644 --- a/tsdb/index/inmem/inmem.go +++ b/tsdb/index/inmem/inmem.go @@ -1040,6 +1040,10 @@ func (i *Index) SeriesIDIterator(opt query.IteratorOptions) (tsdb.SeriesIDIterat // DiskSizeBytes always returns zero bytes, since this is an in-memory index. func (i *Index) DiskSizeBytes() int64 { return 0 } +// TagValueCacheBytes always returns zero bytes, since the in-memory index +// does not use a tag value series ID cache. +func (i *Index) TagValueCacheBytes() int64 { return 0 } + // Rebuild recreates the measurement indexes to allow deleted series to be removed // and garbage collected. func (i *Index) Rebuild() { diff --git a/tsdb/index/tsi1/cache.go b/tsdb/index/tsi1/cache.go index cf427780afe..e5adc3226ae 100644 --- a/tsdb/index/tsi1/cache.go +++ b/tsdb/index/tsi1/cache.go @@ -3,6 +3,7 @@ package tsi1 import ( "container/list" "sync" + "unsafe" "github.com/influxdata/influxdb/tsdb" ) @@ -191,6 +192,29 @@ func (c *TagValueSeriesIDCache) checkEviction() { } } +// HeapSize estimates the total heap memory usage of the cache in bytes. +func (c *TagValueSeriesIDCache) HeapSize() int { + c.RLock() + defer c.RUnlock() + + size := int(unsafe.Sizeof(*c)) + for name, mmap := range c.cache { + size += len(name) + int(unsafe.Sizeof(mmap)) + for key, tkmap := range mmap { + size += len(key) + int(unsafe.Sizeof(tkmap)) + for value, ele := range tkmap { + size += len(value) + int(unsafe.Sizeof(ele)) + elem := ele.Value.(*seriesIDCacheElement) + size += len(elem.name) + len(elem.key) + len(elem.value) + if elem.SeriesIDSet != nil { + size += elem.SeriesIDSet.Bytes() + } + } + } + } + return size +} + // seriesIDCacheElement is an item stored within a cache. type seriesIDCacheElement struct { name string diff --git a/tsdb/index/tsi1/cache_test.go b/tsdb/index/tsi1/cache_test.go index df76f0b246e..470af42938d 100644 --- a/tsdb/index/tsi1/cache_test.go +++ b/tsdb/index/tsi1/cache_test.go @@ -7,6 +7,7 @@ import ( "time" "github.com/influxdata/influxdb/tsdb" + "github.com/stretchr/testify/require" ) // This function is used to log the components of disk size when DiskSizeBytes fails @@ -150,6 +151,53 @@ func TestTagValueSeriesIDCache_eviction(t *testing.T) { cache.Has(t, "m3", "k0", "v0", m3k0v0) } +// TestTagValueSeriesIDCache_HeapSize verifies that HeapSize tracks memory growth +// as entries are added, shrinks after eviction, and handles nil SeriesIDSets. +func TestTagValueSeriesIDCache_HeapSize(t *testing.T) { + cache := TestCache{NewTagValueSeriesIDCache(10)} + + emptySize := cache.HeapSize() + require.Positive(t, emptySize) + + cache.PutByString("m0", "k0", "v0", tsdb.NewSeriesIDSet(1, 2, 3)) + sizeAfterOne := cache.HeapSize() + require.Greater(t, sizeAfterOne, emptySize) + + cache.PutByString("m0", "k0", "v1", tsdb.NewSeriesIDSet(100, 200, 300, 400, 500)) + sizeAfterTwo := cache.HeapSize() + require.Greater(t, sizeAfterTwo, sizeAfterOne) + + cache.PutByString("m1", "k0", "v0", tsdb.NewSeriesIDSet(1000)) + sizeAfterThree := cache.HeapSize() + require.Greater(t, sizeAfterThree, sizeAfterTwo) +} + +// TestTagValueSeriesIDCache_HeapSize_eviction verifies that evicting a large +// entry and replacing it with a small one reduces the reported HeapSize. +func TestTagValueSeriesIDCache_HeapSize_eviction(t *testing.T) { + cache := TestCache{NewTagValueSeriesIDCache(2)} + + large := make([]uint64, 1000) + for i := range large { + large[i] = uint64(i) + } + cache.PutByString("m0", "k0", "v0", tsdb.NewSeriesIDSet(large...)) + cache.PutByString("m0", "k0", "v1", tsdb.NewSeriesIDSet(large...)) + fullSize := cache.HeapSize() + + cache.PutByString("m0", "k0", "v2", tsdb.NewSeriesIDSet(1)) + require.Less(t, cache.HeapSize(), fullSize) +} + +// TestTagValueSeriesIDCache_HeapSize_nil_set verifies that a nil SeriesIDSet +// entry is counted without panicking. +func TestTagValueSeriesIDCache_HeapSize_nil_set(t *testing.T) { + cache := TestCache{NewTagValueSeriesIDCache(10)} + + cache.PutByString("m0", "k0", "v0", nil) + require.Positive(t, cache.HeapSize()) +} + func TestTagValueSeriesIDCache_addToSet(t *testing.T) { cache := TestCache{NewTagValueSeriesIDCache(4)} cache.PutByString("m0", "k0", "v0", nil) // Puts a nil set in the cache. diff --git a/tsdb/index/tsi1/index.go b/tsdb/index/tsi1/index.go index 6bfa12d09a8..d9d7afa2e97 100644 --- a/tsdb/index/tsi1/index.go +++ b/tsdb/index/tsi1/index.go @@ -159,6 +159,13 @@ type Index struct { // Number of partitions used by the index. PartitionN uint64 + + // Number of bytes cache is currently using. + // Updated periodically by CollectTagValueCacheMetrics goroutine. + cacheBytes int64 + + // Closed when the index is closing, to signal background goroutines to stop. + closing chan struct{} } func (i *Index) UniqueReferenceID() uintptr { @@ -180,6 +187,7 @@ func NewIndex(sfile *tsdb.SeriesFile, database string, options ...IndexOption) * sSketch: hll.NewDefaultPlus(), sTSketch: hll.NewDefaultPlus(), PartitionN: DefaultPartitionN, + closing: make(chan struct{}), } for _, option := range options { @@ -260,6 +268,9 @@ func (i *Index) Open() (rErr error) { return errors.New("index already open") } + // Re-initialize closing channel for reopen support. + i.closing = make(chan struct{}) + // Ensure root exists. if err := os.MkdirAll(i.path, 0777); err != nil { return err @@ -303,6 +314,10 @@ func (i *Index) Open() (rErr error) { // Mark opened. i.opened = true i.logger.Info(fmt.Sprintf("index opened with %d partitions", partitionN)) + + // Start background goroutine to periodically collect cache metrics. + i.collectTagValueCacheMetrics() + return nil } @@ -350,6 +365,14 @@ func (i *Index) Close() error { // close closes the index without locking func (i *Index) close() (rErr error) { + // Signal background goroutines to stop. + select { + case <-i.closing: + // Already closed. + default: + close(i.closing) + } + for _, p := range i.partitions { if (p != nil) && p.IsOpen() { if pErr := p.Close(); pErr != nil { @@ -1062,6 +1085,34 @@ func (i *Index) TagKeySeriesIDIterator(name, key []byte) (tsdb.SeriesIDIterator, return tsdb.MergeSeriesIDIterators(a...), nil } +// TagValueCacheBytes returns the most recently sampled heap size of the +// tag value series ID cache, in bytes. +func (i *Index) TagValueCacheBytes() int64 { + return atomic.LoadInt64(&i.cacheBytes) +} + +// collectTagValueCacheMetrics starts a background goroutine that periodically +// samples the tag value cache heap size. It exits when the index is closed. +func (i *Index) collectTagValueCacheMetrics() { + // take an initial sample + atomic.StoreInt64(&i.cacheBytes, int64(i.tagValueCache.HeapSize())) + + const cacheTrigger = 10 * time.Second + closing := i.closing + go func(closing <-chan struct{}) { + ticker := time.NewTicker(cacheTrigger) + defer ticker.Stop() + for { + select { + case <-closing: + return + case <-ticker.C: + atomic.StoreInt64(&i.cacheBytes, int64(i.tagValueCache.HeapSize())) + } + } + }(closing) +} + // TagValueSeriesIDIterator returns a series iterator for a single tag value. func (i *Index) TagValueSeriesIDIterator(name, key, value []byte) (tsdb.SeriesIDIterator, error) { // Check series ID set cache... diff --git a/tsdb/store.go b/tsdb/store.go index e4b53b0fb17..804d49ecb66 100644 --- a/tsdb/store.go +++ b/tsdb/store.go @@ -45,11 +45,12 @@ var ( // Statistics gathered by the store. const ( - statDatabaseSeries = "numSeries" // number of series in a database - statDatabaseMeasurements = "numMeasurements" // number of measurements in a database - statPointsWritten = "pointsWritten" // number of points parsed by engines successfully - statValuesWritten = "valuesWritten" // number of values parsed by engines successfully - statSeriesCreated = "seriesCreated" // number of series created since startup + statDatabaseSeries = "numSeries" // number of series in a database + statDatabaseMeasurements = "numMeasurements" // number of measurements in a database + statTagValueCacheBytes = "tagValueCacheBytes" // bytes used by the tag value series ID cache + statPointsWritten = "pointsWritten" // number of points parsed by engines successfully + statValuesWritten = "valuesWritten" // number of values parsed by engines successfully + statSeriesCreated = "seriesCreated" // number of series created since startup ) // SeriesFileDirectory is the name of the directory containing series files for @@ -200,6 +201,22 @@ func (s *Store) Statistics(tags map[string]string) []models.Statistic { s.mu.RLock() shards := s.shardsSlice() s.mu.RUnlock() + + // Collect tag value cache bytes from unique indexes, grouped by database. + dbCacheBytes := make(map[string]int64) + seenIndexes := make(map[uintptr]bool) + for _, sh := range shards { + idx, err := sh.Index() + if err != nil || idx == nil { + continue + } + id := idx.UniqueReferenceID() + if !seenIndexes[id] { + seenIndexes[id] = true + dbCacheBytes[sh.Database()] += idx.TagValueCacheBytes() + } + } + // Add all the series and measurements cardinality estimations. databases := s.Databases() statistics := make([]models.Statistic, 0, len(databases)) @@ -223,6 +240,7 @@ func (s *Store) Statistics(tags map[string]string) []models.Statistic { Values: map[string]interface{}{ statDatabaseSeries: sc, statDatabaseMeasurements: mc, + statTagValueCacheBytes: dbCacheBytes[database], }, }) }