Commit e67fdbc

feat(ingester): Add head-only queried series metrics (#7500)
Add two new metrics that track query activity against the TSDB head only:

1. cortex_ingester_queried_series_from_head: Estimated unique series queried from head within a configurable time window (HLL-based).
2. cortex_ingester_queried_metric_series_in_head: Current head cardinality for each metric name that was queried within the configured window.

Implementation uses a BlockChunkQuerierFunc wrapper that intercepts Select calls only for head queriers (identified by RangeHead/Head ULIDs). The wrapper collects series hashes for the HLL tracker and records metric names for the per-metric cardinality tracker.

Key design decisions:
- Reuses existing ActiveQueriedSeries HLL and ActiveQueriedSeriesService for the total series metric (no new service needed)
- Per-metric-name tracking uses a simple map[string]time.Time with lazy count lookup via seriesInMetric.getSeriesCountForMetric() on collection
- Only __name__= equality matchers trigger per-metric tracking
- Sampling is supported for the HLL metric to reduce overhead
- Both metrics share the same configurable time window

New configuration flags (all experimental):
- ingester.head-queried-series-metrics-enabled
- ingester.head-queried-series-metrics-windows
- ingester.head-queried-series-metrics-window-duration
- ingester.head-queried-series-metrics-sample-rate

Signed-off-by: Ben Ye <benye@amazon.com>
1 parent e5eadb1 commit e67fdbc

10 files changed

Lines changed: 371 additions & 7 deletions

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
@@ -5,6 +5,7 @@
 * [CHANGE] Cache: Setting `-blocks-storage.bucket-store.metadata-cache.bucket-index-content-ttl` to 0 will disable the bucket-index cache. #7446
 * [CHANGE] HA Tracker: Move `-distributor.ha-tracker.failover-timeout` from a global config to a per-tenant runtime config. The flag name and default value (30s) remain the same. #7481
 * [FEATURE] Ingester: Add experimental active series tracker that counts active series by configurable label matchers (including regex) per tenant and exposes `cortex_ingester_active_series_per_tracker` metric. Configured via `active_series_trackers` in runtime config overrides. #7476
+* [FEATURE] Ingester: Add experimental head-only queried series metric. `cortex_ingester_queried_head_series` tracks unique series queried from head via HLL. Enabled via `-ingester.head-queried-series-metrics-enabled`. #7500
 * [FEATURE] Ruler: Add per-tenant `ruler_alert_generator_url_template` runtime config option to customize alert generator URLs using Go templates. Includes a `jsonEscape` template function for safely embedding expressions in JSON-encoded URL parameters (e.g., Grafana Explore panes). Supports Grafana Explore, Perses, and other UIs. #7302
 * [FEATURE] Distributor: Add experimental `-distributor.enable-start-timestamp` flag for Prometheus Remote Write 2.0. When enabled, `StartTimestamp (ST)` is ingested. #7371
 * [FEATURE] Memberlist: Add `-memberlist.cluster-label` and `-memberlist.cluster-label-verification-disabled` to prevent accidental cross-cluster gossip joins and support rolling label rollout. #7385

docs/configuration/config-file-reference.md

Lines changed: 18 additions & 0 deletions
@@ -3804,6 +3804,24 @@ lifecycler:
 # CLI flag: -ingester.active-queried-series-metrics-windows
 [active_queried_series_metrics_windows: <list of duration> | default = 2h0m0s]
 
+# Experimental: Enable tracking of series queried from head only and expose them
+# as metrics.
+# CLI flag: -ingester.head-queried-series-metrics-enabled
+[head_queried_series_metrics_enabled: <boolean> | default = false]
+
+# Duration of each sub-window for head queried series tracking.
+# CLI flag: -ingester.head-queried-series-metrics-window-duration
+[head_queried_series_metrics_window_duration: <duration> | default = 15m]
+
+# Sampling rate for head queried series tracking (1.0 = 100%%).
+# CLI flag: -ingester.head-queried-series-metrics-sample-rate
+[head_queried_series_metrics_sample_rate: <float> | default = 1]
+
+# Time windows to expose head queried series metrics. Also controls how long
+# per-metric-name cardinality is reported after last query.
+# CLI flag: -ingester.head-queried-series-metrics-windows
+[head_queried_series_metrics_windows: <list of duration> | default = 2h0m0s]
+
 # Enable uploading compacted blocks.
 # CLI flag: -ingester.upload-compacted-blocks-enabled
 [upload_compacted_blocks_enabled: <boolean> | default = true]

docs/configuration/v1-guarantees.md

Lines changed: 6 additions & 0 deletions
@@ -137,3 +137,9 @@ Currently experimental features are:
 - `-blocks-storage.expanded_postings_cache.head.lazy-matcher-max-cardinality` (int) CLI flag
 - `-blocks-storage.expanded_postings_cache.head.lazy-matcher-simple-cost-ratio` (int) CLI flag
 - `-blocks-storage.expanded_postings_cache.head.lazy-matcher-complex-cost-ratio` (int) CLI flag
+- Ingester: Head Queried Series Metrics
+  - Enable on Ingester via `-ingester.head-queried-series-metrics-enabled=true`
+  - Tracks unique series queried from head only (not blocks) using HLL
+  - `-ingester.head-queried-series-metrics-windows` time windows to report (default: 2h)
+  - `-ingester.head-queried-series-metrics-window-duration` HLL sub-window size
+  - `-ingester.head-queried-series-metrics-sample-rate` query sampling rate
Lines changed: 80 additions & 0 deletions
@@ -0,0 +1,80 @@
+package ingester
+
+import (
+	"context"
+	"time"
+
+	"github.com/oklog/ulid/v2"
+	"github.com/prometheus/prometheus/model/labels"
+	"github.com/prometheus/prometheus/storage"
+	"github.com/prometheus/prometheus/tsdb"
+)
+
+var (
+	rangeHeadULID = ulid.MustParse("0000000000XXXXXXXRANGEHEAD")
+	headULID      = ulid.MustParse("0000000000XXXXXXXXXXXXHEAD")
+)
+
+// isHead returns true if the given BlockReader is a head block (in-order or OOO).
+func isHead(b tsdb.BlockReader) bool {
+	id := b.Meta().ULID
+	return id == rangeHeadULID || id == headULID
+}
+
+// headQueriedSeriesChunkQuerier wraps a ChunkQuerier for the head block and
+// intercepts Select calls to collect series hashes for HLL tracking.
+type headQueriedSeriesChunkQuerier struct {
+	storage.ChunkQuerier
+	headQueriedSeries          *ActiveQueriedSeries
+	activeQueriedSeriesService *ActiveQueriedSeriesService
+	userID                     string
+	sampled                    bool
+}
+
+func (q *headQueriedSeriesChunkQuerier) Select(ctx context.Context, sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.ChunkSeriesSet {
+	ss := q.ChunkQuerier.Select(ctx, sortSeries, hints, matchers...)
+
+	// Wrap series set for hash collection only if sampled.
+	if !q.sampled {
+		return ss
+	}
+	return &headQueriedSeriesSet{
+		ChunkSeriesSet:             ss,
+		headQueriedSeries:          q.headQueriedSeries,
+		activeQueriedSeriesService: q.activeQueriedSeriesService,
+		userID:                     q.userID,
+		hashes:                     getQueriedSeriesHashesSlice(),
+	}
+}
+
+// headQueriedSeriesSet wraps a ChunkSeriesSet to collect series label hashes
+// during iteration and flush them to the HLL tracker when iteration completes.
+type headQueriedSeriesSet struct {
+	storage.ChunkSeriesSet
+	headQueriedSeries          *ActiveQueriedSeries
+	activeQueriedSeriesService *ActiveQueriedSeriesService
+	userID                     string
+	hashes                     []uint64
+}
+
+func (s *headQueriedSeriesSet) Next() bool {
+	if !s.ChunkSeriesSet.Next() {
+		s.flush()
+		return false
+	}
+	s.hashes = append(s.hashes, s.ChunkSeriesSet.At().Labels().Hash())
+	return true
+}
+
+func (s *headQueriedSeriesSet) At() storage.ChunkSeries {
+	return s.ChunkSeriesSet.At()
+}
+
+func (s *headQueriedSeriesSet) flush() {
+	if len(s.hashes) > 0 && s.activeQueriedSeriesService != nil {
+		s.activeQueriedSeriesService.UpdateSeriesBatch(s.headQueriedSeries, s.hashes, time.Now(), s.userID)
+	} else if len(s.hashes) > 0 {
+		// If no service, return the slice to the pool.
+		putQueriedSeriesHashesSlice(s.hashes)
+	}
+}
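The wrapper pattern in this file (embed the inner series set, record a hash on every successful `Next`, hand the batch off once iteration ends) can be illustrated with a standard-library-only sketch. The `seriesSet` interface, `sliceSet`, `collectingSet`, `hashPool`, and the synchronous `sink` callback are all hypothetical simplifications of the Prometheus and Cortex types used above. The sketch also makes two choices of its own that are not taken from the commit: it flushes at most once, and it returns the slice to the pool even when no hashes were collected.

```go
package main

import (
	"fmt"
	"sync"
)

// seriesSet is a hypothetical, pared-down iterator interface.
type seriesSet interface {
	Next() bool
	At() uint64 // label hash of the current series
}

// sliceSet iterates over a fixed list of hashes.
type sliceSet struct {
	hashes []uint64
	idx    int
}

func (s *sliceSet) Next() bool {
	if s.idx >= len(s.hashes) {
		return false
	}
	s.idx++
	return true
}

func (s *sliceSet) At() uint64 { return s.hashes[s.idx-1] }

// hashPool recycles hash slices between queries.
var hashPool = sync.Pool{New: func() any { return make([]uint64, 0, 1024) }}

// collectingSet wraps a seriesSet, records each hash it yields, and passes
// the batch to sink when the inner set is exhausted.
type collectingSet struct {
	seriesSet
	hashes  []uint64
	flushed bool
	sink    func([]uint64) // must not retain the slice; nil means no consumer
}

func (c *collectingSet) Next() bool {
	if c.seriesSet.Next() {
		c.hashes = append(c.hashes, c.seriesSet.At())
		return true
	}
	c.flush()
	return false
}

// flush delivers the batch at most once and always recycles the slice.
func (c *collectingSet) flush() {
	if c.flushed {
		return
	}
	c.flushed = true
	if c.sink != nil && len(c.hashes) > 0 {
		c.sink(c.hashes)
	}
	hashPool.Put(c.hashes[:0])
	c.hashes = nil
}

func main() {
	inner := &sliceSet{hashes: []uint64{10, 20, 30}}
	set := &collectingSet{
		seriesSet: inner,
		hashes:    hashPool.Get().([]uint64),
		sink:      func(batch []uint64) { fmt.Println("flushed", len(batch), "hashes") },
	}
	for set.Next() {
		_ = set.At() // At is promoted from the embedded seriesSet
	}
}
```

Because the flush only happens when `Next` returns false, a caller that abandons iteration early never delivers its batch; that trade-off applies to this sketch as well.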
Lines changed: 141 additions & 0 deletions
@@ -0,0 +1,141 @@
+package ingester
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"github.com/go-kit/log"
+	"github.com/oklog/ulid/v2"
+	"github.com/prometheus/prometheus/model/labels"
+	"github.com/prometheus/prometheus/storage"
+	"github.com/prometheus/prometheus/tsdb"
+	"github.com/prometheus/prometheus/tsdb/chunks"
+	"github.com/prometheus/prometheus/tsdb/tombstones"
+	"github.com/prometheus/prometheus/util/annotations"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+func TestIsHead(t *testing.T) {
+	tests := []struct {
+		name     string
+		ulid     ulid.ULID
+		expected bool
+	}{
+		{"rangeHead", rangeHeadULID, true},
+		{"head", headULID, true},
+		{"random block", ulid.MustNew(1, nil), false},
+	}
+
+	for _, tc := range tests {
+		t.Run(tc.name, func(t *testing.T) {
+			b := &mockBlockReader{meta: tsdb.BlockMeta{ULID: tc.ulid}}
+			assert.Equal(t, tc.expected, isHead(b))
+		})
+	}
+}
+
+func TestHeadQueriedSeriesSet_CollectsHashes(t *testing.T) {
+	lbls1 := labels.FromStrings("__name__", "metric_a", "job", "foo")
+	lbls2 := labels.FromStrings("__name__", "metric_a", "job", "bar")
+
+	inner := &mockChunkSeriesSet{
+		series: []storage.ChunkSeries{
+			&mockChunkSeries{lbls: lbls1},
+			&mockChunkSeries{lbls: lbls2},
+		},
+	}
+
+	hll := NewActiveQueriedSeries(
+		[]time.Duration{2 * time.Hour},
+		15*time.Minute,
+		1.0,
+		nil,
+	)
+
+	// Use a real service to process the hashes.
+	svc := NewActiveQueriedSeriesService(log.NewNopLogger(), nil)
+	require.NoError(t, svc.StartAsync(context.Background()))
+	defer svc.StopAsync()
+	require.NoError(t, svc.AwaitRunning(context.Background()))
+
+	wrapper := &headQueriedSeriesChunkQuerier{
+		ChunkQuerier:               &mockChunkQuerierWithSeries{inner: inner},
+		headQueriedSeries:          hll,
+		activeQueriedSeriesService: svc,
+		userID:                     "user-1",
+		sampled:                    true,
+	}
+
+	matchers := []*labels.Matcher{
+		labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "metric_a"),
+	}
+
+	ss := wrapper.Select(context.Background(), false, nil, matchers...)
+	count := 0
+	for ss.Next() {
+		count++
+	}
+	assert.Equal(t, 2, count)
+
+	// Give the async worker time to process.
+	time.Sleep(50 * time.Millisecond)
+
+	estimate, err := hll.GetSeriesQueried(time.Now(), 2*time.Hour)
+	require.NoError(t, err)
+	assert.Equal(t, uint64(2), estimate)
+}
+
+// Mock implementations
+
+type mockBlockReader struct {
+	meta tsdb.BlockMeta
+}
+
+func (m *mockBlockReader) Meta() tsdb.BlockMeta { return m.meta }
+func (m *mockBlockReader) Index() (tsdb.IndexReader, error) { return nil, nil }
+func (m *mockBlockReader) Chunks() (tsdb.ChunkReader, error) { return nil, nil }
+func (m *mockBlockReader) Tombstones() (tombstones.Reader, error) { return nil, nil }
+func (m *mockBlockReader) Size() int64 { return 0 }
+func (m *mockBlockReader) String() string { return "" }
+func (m *mockBlockReader) MinTime() int64 { return 0 }
+func (m *mockBlockReader) MaxTime() int64 { return 0 }
+
+type mockChunkQuerierWithSeries struct {
+	inner *mockChunkSeriesSet
+}
+
+func (q *mockChunkQuerierWithSeries) Select(_ context.Context, _ bool, _ *storage.SelectHints, _ ...*labels.Matcher) storage.ChunkSeriesSet {
+	return q.inner
+}
+func (q *mockChunkQuerierWithSeries) LabelValues(_ context.Context, _ string, _ *storage.LabelHints, _ ...*labels.Matcher) ([]string, annotations.Annotations, error) {
+	return nil, nil, nil
+}
+func (q *mockChunkQuerierWithSeries) LabelNames(_ context.Context, _ *storage.LabelHints, _ ...*labels.Matcher) ([]string, annotations.Annotations, error) {
+	return nil, nil, nil
+}
+func (q *mockChunkQuerierWithSeries) Close() error { return nil }
+
+type mockChunkSeriesSet struct {
+	series []storage.ChunkSeries
+	idx    int
+}
+
+func (m *mockChunkSeriesSet) Next() bool {
+	if m.idx >= len(m.series) {
+		return false
+	}
+	m.idx++
+	return true
+}
+func (m *mockChunkSeriesSet) At() storage.ChunkSeries { return m.series[m.idx-1] }
+func (m *mockChunkSeriesSet) Err() error { return nil }
+func (m *mockChunkSeriesSet) Warnings() annotations.Annotations { return nil }
+
+type mockChunkSeries struct {
+	lbls labels.Labels
+}
+
+func (m *mockChunkSeries) Labels() labels.Labels { return m.lbls }
+func (m *mockChunkSeries) Iterator(_ chunks.Iterator) chunks.Iterator { return nil }
