Commit ee9396f

feat(ingester): Add head-only queried series metrics
Add two new metrics that track query activity against the TSDB head only:

1. cortex_ingester_queried_series_from_head: Estimated unique series queried from head within a configurable time window (HLL-based).
2. cortex_ingester_queried_metric_series_in_head: Current head cardinality for each metric name that was queried within the configured window.

Implementation uses a BlockChunkQuerierFunc wrapper that intercepts Select calls only for head queriers (identified by RangeHead/Head ULIDs). The wrapper collects series hashes for the HLL tracker and records metric names for the per-metric cardinality tracker.

Key design decisions:

- Reuses the existing ActiveQueriedSeries HLL and ActiveQueriedSeriesService for the total series metric (no new service needed)
- Per-metric-name tracking uses a simple map[string]time.Time with lazy count lookup via seriesInMetric.getSeriesCountForMetric() on collection
- Only __name__ equality matchers trigger per-metric tracking
- Sampling is supported for the HLL metric to reduce overhead
- Both metrics share the same configurable time window

New configuration flags (all experimental):

- ingester.head-queried-series-metrics-enabled
- ingester.head-queried-series-metrics-windows
- ingester.head-queried-series-metrics-window-duration
- ingester.head-queried-series-metrics-sample-rate

Signed-off-by: Ben Ye <benye@amazon.com>
1 parent 8ec34d8 commit ee9396f

13 files changed

Lines changed: 629 additions & 21 deletions

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
@@ -5,6 +5,7 @@
 * [CHANGE] Cache: Setting `-blocks-storage.bucket-store.metadata-cache.bucket-index-content-ttl` to 0 will disable the bucket-index cache. #7446
 * [CHANGE] HA Tracker: Move `-distributor.ha-tracker.failover-timeout` from a global config to a per-tenant runtime config. The flag name and default value (30s) remain the same. #7481
 * [FEATURE] Ingester: Add experimental active series tracker that counts active series by configurable label matchers (including regex) per tenant and exposes `cortex_ingester_active_series_per_tracker` metric. Configured via `active_series_trackers` in runtime config overrides. #7476
+* [FEATURE] Ingester: Add experimental head-only queried series metrics. `cortex_ingester_queried_head_series` tracks unique series queried from head via HLL, and `cortex_ingester_queried_metric_head_series` reports current head cardinality for recently queried metric names. Enabled via `-ingester.head-queried-series-metrics-enabled`. #7500
 * [FEATURE] Ruler: Add per-tenant `ruler_alert_generator_url_template` runtime config option to customize alert generator URLs using Go templates. Supports Grafana Explore, Perses, and other UIs. #7302
 * [FEATURE] Distributor: Add experimental `-distributor.enable-start-timestamp` flag for Prometheus Remote Write 2.0. When enabled, `StartTimestamp (ST)` is ingested. #7371
 * [FEATURE] Memberlist: Add `-memberlist.cluster-label` and `-memberlist.cluster-label-verification-disabled` to prevent accidental cross-cluster gossip joins and support rolling label rollout. #7385

docs/configuration/config-file-reference.md

Lines changed: 18 additions & 0 deletions
@@ -3785,6 +3785,24 @@ lifecycler:
 # CLI flag: -ingester.active-queried-series-metrics-windows
 [active_queried_series_metrics_windows: <list of duration> | default = 2h0m0s]
 
+# Experimental: Enable tracking of series queried from head only and expose them
+# as metrics.
+# CLI flag: -ingester.head-queried-series-metrics-enabled
+[head_queried_series_metrics_enabled: <boolean> | default = false]
+
+# Duration of each sub-window for head queried series tracking.
+# CLI flag: -ingester.head-queried-series-metrics-window-duration
+[head_queried_series_metrics_window_duration: <duration> | default = 15m]
+
+# Sampling rate for head queried series tracking (1.0 = 100%).
+# CLI flag: -ingester.head-queried-series-metrics-sample-rate
+[head_queried_series_metrics_sample_rate: <float> | default = 1]
+
+# Time windows to expose head queried series metrics. Also controls how long
+# per-metric-name cardinality is reported after the last query.
+# CLI flag: -ingester.head-queried-series-metrics-windows
+[head_queried_series_metrics_windows: <list of duration> | default = 2h0m0s]
+
 # Enable uploading compacted blocks.
 # CLI flag: -ingester.upload-compacted-blocks-enabled
 [upload_compacted_blocks_enabled: <boolean> | default = true]
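Putting the new options together, a config file enabling the feature might look like the following. This is a hedged example: the key names come from the reference above, but the exact nesting under the ingester block and the chosen values (50% sampling, two reporting windows) are illustrative assumptions.

```yaml
ingester:
  head_queried_series_metrics_enabled: true
  head_queried_series_metrics_window_duration: 15m
  head_queried_series_metrics_sample_rate: 0.5
  head_queried_series_metrics_windows:
    - 1h
    - 2h
```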

docs/configuration/v1-guarantees.md

Lines changed: 7 additions & 0 deletions
@@ -133,3 +133,10 @@ Currently experimental features are:
 - Ingester: Active Series Tracker
   - Per-tenant `active_series_trackers` configuration in runtime config overrides
   - Counts active series matching PromQL label matchers and exposes `cortex_ingester_active_series_per_tracker` metric
+- Ingester: Head Queried Series Metrics
+  - Enable on Ingester via `-ingester.head-queried-series-metrics-enabled=true`
+  - Tracks unique series queried from head only (not blocks) using HLL
+  - Tracks per-metric-name head cardinality for recently queried metrics
+  - `-ingester.head-queried-series-metrics-windows`: time windows to report (default: 2h)
+  - `-ingester.head-queried-series-metrics-window-duration`: HLL sub-window size
+  - `-ingester.head-queried-series-metrics-sample-rate`: query sampling rate
Lines changed: 95 additions & 0 deletions
@@ -0,0 +1,95 @@
package ingester

import (
	"context"
	"time"

	"github.com/oklog/ulid/v2"
	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/storage"
	"github.com/prometheus/prometheus/tsdb"
)

var (
	rangeHeadULID = ulid.MustParse("0000000000XXXXXXXRANGEHEAD")
	headULID      = ulid.MustParse("0000000000XXXXXXXXXXXXHEAD")
)

// isHead returns true if the given BlockReader is a head block (in-order or OOO).
func isHead(b tsdb.BlockReader) bool {
	id := b.Meta().ULID
	return id == rangeHeadULID || id == headULID
}

// headQueriedSeriesChunkQuerier wraps a ChunkQuerier for the head block and
// intercepts Select calls to collect series hashes (for HLL) and record
// queried metric names.
type headQueriedSeriesChunkQuerier struct {
	storage.ChunkQuerier

	headQueriedSeries          *ActiveQueriedSeries
	activeQueriedSeriesService *ActiveQueriedSeriesService
	queriedMetricTrackers      []*QueriedMetricTracker
	userID                     string
	sampled                    bool
}

func (q *headQueriedSeriesChunkQuerier) Select(ctx context.Context, sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.ChunkSeriesSet {
	ss := q.ChunkQuerier.Select(ctx, sortSeries, hints, matchers...)

	// Record metric name for per-metric cardinality tracking (always, regardless of sampling).
	if len(q.queriedMetricTrackers) > 0 {
		for _, m := range matchers {
			if m.Name == labels.MetricName && m.Type == labels.MatchEqual {
				now := time.Now()
				for _, tracker := range q.queriedMetricTrackers {
					tracker.MarkQueried(m.Value, now)
				}
				break
			}
		}
	}

	// Wrap series set for hash collection only if sampled.
	if !q.sampled {
		return ss
	}
	return &headQueriedSeriesSet{
		ChunkSeriesSet:             ss,
		headQueriedSeries:          q.headQueriedSeries,
		activeQueriedSeriesService: q.activeQueriedSeriesService,
		userID:                     q.userID,
		hashes:                     getQueriedSeriesHashesSlice(),
	}
}

// headQueriedSeriesSet wraps a ChunkSeriesSet to collect series label hashes
// during iteration and flush them to the HLL tracker when iteration completes.
type headQueriedSeriesSet struct {
	storage.ChunkSeriesSet

	headQueriedSeries          *ActiveQueriedSeries
	activeQueriedSeriesService *ActiveQueriedSeriesService
	userID                     string
	hashes                     []uint64
}

func (s *headQueriedSeriesSet) Next() bool {
	if !s.ChunkSeriesSet.Next() {
		s.flush()
		return false
	}
	s.hashes = append(s.hashes, s.ChunkSeriesSet.At().Labels().Hash())
	return true
}

func (s *headQueriedSeriesSet) At() storage.ChunkSeries {
	return s.ChunkSeriesSet.At()
}

func (s *headQueriedSeriesSet) flush() {
	if len(s.hashes) > 0 && s.activeQueriedSeriesService != nil {
		s.activeQueriedSeriesService.UpdateSeriesBatch(s.headQueriedSeries, s.hashes, time.Now(), s.userID)
	} else if len(s.hashes) > 0 {
		// If no service, return the slice to the pool.
		putQueriedSeriesHashesSlice(s.hashes)
	}
}
Lines changed: 212 additions & 0 deletions
@@ -0,0 +1,212 @@
package ingester

import (
	"context"
	"testing"
	"time"

	"github.com/go-kit/log"
	"github.com/oklog/ulid/v2"
	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/storage"
	"github.com/prometheus/prometheus/tsdb"
	"github.com/prometheus/prometheus/tsdb/chunks"
	"github.com/prometheus/prometheus/tsdb/tombstones"
	"github.com/prometheus/prometheus/util/annotations"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)

func TestIsHead(t *testing.T) {
	tests := []struct {
		name     string
		ulid     ulid.ULID
		expected bool
	}{
		{"rangeHead", rangeHeadULID, true},
		{"head", headULID, true},
		{"random block", ulid.MustNew(1, nil), false},
	}

	for _, tc := range tests {
		t.Run(tc.name, func(t *testing.T) {
			b := &mockBlockReader{meta: tsdb.BlockMeta{ULID: tc.ulid}}
			assert.Equal(t, tc.expected, isHead(b))
		})
	}
}

func TestHeadQueriedSeriesChunkQuerier_RecordsMetricName(t *testing.T) {
	tracker := NewQueriedMetricTracker(2 * time.Hour)
	hll := NewActiveQueriedSeries(
		[]time.Duration{2 * time.Hour},
		15*time.Minute,
		1.0,
		nil,
	)

	wrapper := &headQueriedSeriesChunkQuerier{
		ChunkQuerier:               &noopChunkQuerier{},
		headQueriedSeries:          hll,
		activeQueriedSeriesService: nil,
		queriedMetricTrackers:      []*QueriedMetricTracker{tracker},
		userID:                     "user-1",
		sampled:                    false,
	}

	matchers := []*labels.Matcher{
		labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "http_requests_total"),
		labels.MustNewMatcher(labels.MatchEqual, "job", "api"),
	}

	wrapper.Select(context.Background(), false, nil, matchers...)

	now := time.Now()
	metrics := tracker.GetActiveMetrics(now)
	require.Len(t, metrics, 1)
	assert.Equal(t, "http_requests_total", metrics[0])
}

func TestHeadQueriedSeriesChunkQuerier_SkipsRegexMetricName(t *testing.T) {
	tracker := NewQueriedMetricTracker(2 * time.Hour)

	wrapper := &headQueriedSeriesChunkQuerier{
		ChunkQuerier:          &noopChunkQuerier{},
		queriedMetricTrackers: []*QueriedMetricTracker{tracker},
		userID:                "user-1",
		sampled:               false,
	}

	matchers := []*labels.Matcher{
		labels.MustNewMatcher(labels.MatchRegexp, labels.MetricName, "http_.*"),
	}

	wrapper.Select(context.Background(), false, nil, matchers...)

	metrics := tracker.GetActiveMetrics(time.Now())
	assert.Empty(t, metrics)
}

func TestHeadQueriedSeriesSet_CollectsHashes(t *testing.T) {
	lbls1 := labels.FromStrings("__name__", "metric_a", "job", "foo")
	lbls2 := labels.FromStrings("__name__", "metric_a", "job", "bar")

	inner := &mockChunkSeriesSet{
		series: []storage.ChunkSeries{
			&mockChunkSeries{lbls: lbls1},
			&mockChunkSeries{lbls: lbls2},
		},
	}

	hll := NewActiveQueriedSeries(
		[]time.Duration{2 * time.Hour},
		15*time.Minute,
		1.0,
		nil,
	)

	// Use a real service to process the hashes.
	svc := NewActiveQueriedSeriesService(log.NewNopLogger(), nil)
	require.NoError(t, svc.StartAsync(context.Background()))
	defer svc.StopAsync()
	require.NoError(t, svc.AwaitRunning(context.Background()))

	tracker := NewQueriedMetricTracker(2 * time.Hour)
	wrapper := &headQueriedSeriesChunkQuerier{
		ChunkQuerier:               &mockChunkQuerierWithSeries{inner: inner},
		headQueriedSeries:          hll,
		activeQueriedSeriesService: svc,
		queriedMetricTrackers:      []*QueriedMetricTracker{tracker},
		userID:                     "user-1",
		sampled:                    true,
	}

	matchers := []*labels.Matcher{
		labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "metric_a"),
	}

	ss := wrapper.Select(context.Background(), false, nil, matchers...)
	count := 0
	for ss.Next() {
		count++
	}
	assert.Equal(t, 2, count)

	// Give the async worker time to process.
	time.Sleep(50 * time.Millisecond)

	estimate, err := hll.GetSeriesQueried(time.Now(), 2*time.Hour)
	require.NoError(t, err)
	assert.Equal(t, uint64(2), estimate)

	// Also verify metric name was recorded.
	metrics := tracker.GetActiveMetrics(time.Now())
	require.Len(t, metrics, 1)
	assert.Equal(t, "metric_a", metrics[0])
}

// Mock implementations

type mockBlockReader struct {
	meta tsdb.BlockMeta
}

func (m *mockBlockReader) Meta() tsdb.BlockMeta                   { return m.meta }
func (m *mockBlockReader) Index() (tsdb.IndexReader, error)       { return nil, nil }
func (m *mockBlockReader) Chunks() (tsdb.ChunkReader, error)      { return nil, nil }
func (m *mockBlockReader) Tombstones() (tombstones.Reader, error) { return nil, nil }
func (m *mockBlockReader) Size() int64                            { return 0 }
func (m *mockBlockReader) String() string                         { return "" }
func (m *mockBlockReader) MinTime() int64                         { return 0 }
func (m *mockBlockReader) MaxTime() int64                         { return 0 }

type noopChunkQuerier struct{}

func (q *noopChunkQuerier) Select(_ context.Context, _ bool, _ *storage.SelectHints, _ ...*labels.Matcher) storage.ChunkSeriesSet {
	return storage.EmptyChunkSeriesSet()
}
func (q *noopChunkQuerier) LabelValues(_ context.Context, _ string, _ *storage.LabelHints, _ ...*labels.Matcher) ([]string, annotations.Annotations, error) {
	return nil, nil, nil
}
func (q *noopChunkQuerier) LabelNames(_ context.Context, _ *storage.LabelHints, _ ...*labels.Matcher) ([]string, annotations.Annotations, error) {
	return nil, nil, nil
}
func (q *noopChunkQuerier) Close() error { return nil }

type mockChunkQuerierWithSeries struct {
	inner *mockChunkSeriesSet
}

func (q *mockChunkQuerierWithSeries) Select(_ context.Context, _ bool, _ *storage.SelectHints, _ ...*labels.Matcher) storage.ChunkSeriesSet {
	return q.inner
}
func (q *mockChunkQuerierWithSeries) LabelValues(_ context.Context, _ string, _ *storage.LabelHints, _ ...*labels.Matcher) ([]string, annotations.Annotations, error) {
	return nil, nil, nil
}
func (q *mockChunkQuerierWithSeries) LabelNames(_ context.Context, _ *storage.LabelHints, _ ...*labels.Matcher) ([]string, annotations.Annotations, error) {
	return nil, nil, nil
}
func (q *mockChunkQuerierWithSeries) Close() error { return nil }

type mockChunkSeriesSet struct {
	series []storage.ChunkSeries
	idx    int
}

func (m *mockChunkSeriesSet) Next() bool {
	if m.idx >= len(m.series) {
		return false
	}
	m.idx++
	return true
}
func (m *mockChunkSeriesSet) At() storage.ChunkSeries           { return m.series[m.idx-1] }
func (m *mockChunkSeriesSet) Err() error                        { return nil }
func (m *mockChunkSeriesSet) Warnings() annotations.Annotations { return nil }

type mockChunkSeries struct {
	lbls labels.Labels
}

func (m *mockChunkSeries) Labels() labels.Labels                      { return m.lbls }
func (m *mockChunkSeries) Iterator(_ chunks.Iterator) chunks.Iterator { return nil }
