Skip to content

Commit 2a40ea0

Browse files
committed
feat(ruler): add prefetch queryable with filterSeriesSet
1 parent eeacbb5 commit 2a40ea0

2 files changed

Lines changed: 285 additions & 0 deletions

File tree

pkg/ruler/prefetch_queryable.go

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
package ruler
2+
3+
import (
4+
"github.com/prometheus/prometheus/model/labels"
5+
"github.com/prometheus/prometheus/promql/parser"
6+
"github.com/prometheus/prometheus/storage"
7+
"github.com/prometheus/prometheus/util/annotations"
8+
)
9+
10+
// prefetchEntry holds pre-fetched series for a merged selector.
11+
type prefetchEntry struct {
12+
matchers []*labels.Matcher
13+
series []storage.Series
14+
}
15+
16+
// prefetchCache holds all pre-fetched data for a single group evaluation.
17+
type prefetchCache struct {
18+
entries []prefetchEntry
19+
}
20+
21+
// get checks if any pre-fetched entry is a superset of the requested matchers.
22+
func (c *prefetchCache) get(queryMatchers []*labels.Matcher) (storage.SeriesSet, bool) {
23+
for _, e := range c.entries {
24+
if isMatcherSetSuperset(e.matchers, queryMatchers) {
25+
extra := extraMatchers(e.matchers, queryMatchers)
26+
if len(extra) == 0 {
27+
return newSliceSeriesSet(e.series), true
28+
}
29+
return newFilterSeriesSet(newSliceSeriesSet(e.series), extra), true
30+
}
31+
}
32+
return nil, false
33+
}
34+
35+
// isMatcherSetSuperset returns true if superMatchers covers all series that subMatchers would match.
36+
func isMatcherSetSuperset(superMatchers, subMatchers []*labels.Matcher) bool {
37+
for _, sup := range superMatchers {
38+
found := false
39+
for _, sub := range subMatchers {
40+
if sub.Name == sup.Name {
41+
found = true
42+
if !isMatcherSuperset(sup, sub) {
43+
return false
44+
}
45+
break
46+
}
47+
}
48+
// super has a matcher on a label sub doesn't filter — super is more restrictive.
49+
if !found {
50+
return false
51+
}
52+
}
53+
return true
54+
}
55+
56+
// extraMatchers returns matchers from queryMatchers that need local filtering.
57+
func extraMatchers(entryMatchers, queryMatchers []*labels.Matcher) []*labels.Matcher {
58+
var result []*labels.Matcher
59+
for _, qm := range queryMatchers {
60+
isExtra := true
61+
for _, em := range entryMatchers {
62+
if em.Name == qm.Name && em.Type == qm.Type && em.Value == qm.Value {
63+
isExtra = false
64+
break
65+
}
66+
}
67+
if isExtra {
68+
result = append(result, qm)
69+
}
70+
}
71+
return result
72+
}
73+
74+
// filterSeriesSet wraps a SeriesSet and applies additional matchers locally.
75+
type filterSeriesSet struct {
76+
inner storage.SeriesSet
77+
filters []*labels.Matcher
78+
cur storage.Series
79+
}
80+
81+
func newFilterSeriesSet(inner storage.SeriesSet, filters []*labels.Matcher) storage.SeriesSet {
82+
return &filterSeriesSet{inner: inner, filters: filters}
83+
}
84+
85+
func (f *filterSeriesSet) Next() bool {
86+
for f.inner.Next() {
87+
s := f.inner.At()
88+
if matchesAll(s.Labels(), f.filters) {
89+
f.cur = s
90+
return true
91+
}
92+
}
93+
return false
94+
}
95+
96+
// At returns the series accepted by the most recent successful call to Next.
func (f *filterSeriesSet) At() storage.Series {
	return f.cur
}
97+
// Err reports the error of the wrapped SeriesSet; filtering itself cannot fail.
func (f *filterSeriesSet) Err() error {
	return f.inner.Err()
}
98+
// Warnings forwards the annotations reported by the wrapped SeriesSet, so that
// filtering does not hide warnings raised by the underlying set (mirroring how
// Err delegates to the wrapped set).
func (f *filterSeriesSet) Warnings() annotations.Annotations { return f.inner.Warnings() }
99+
100+
func matchesAll(lset labels.Labels, matchers []*labels.Matcher) bool {
101+
for _, m := range matchers {
102+
if !m.Matches(lset.Get(m.Name)) {
103+
return false
104+
}
105+
}
106+
return true
107+
}
108+
109+
// sliceSeriesSet serves series from a pre-materialized slice.
110+
type sliceSeriesSet struct {
111+
series []storage.Series
112+
idx int
113+
}
114+
115+
func newSliceSeriesSet(series []storage.Series) storage.SeriesSet {
116+
return &sliceSeriesSet{series: series, idx: -1}
117+
}
118+
119+
func (s *sliceSeriesSet) Next() bool {
120+
s.idx++
121+
return s.idx < len(s.series)
122+
}
123+
124+
// At returns the series at the current cursor position. It indexes the backing
// slice directly, so it is only valid after Next has returned true.
func (s *sliceSeriesSet) At() storage.Series {
	current := s.series[s.idx]
	return current
}
125+
// Err always returns nil: iterating an in-memory slice cannot fail.
func (s *sliceSeriesSet) Err() error {
	return nil
}
126+
// Warnings always returns nil: a slice-backed set carries no annotations.
func (s *sliceSeriesSet) Warnings() annotations.Annotations {
	return nil
}
127+
128+
// extractSelectorsFromExpr extracts VectorSelector matchers from a PromQL expression string.
129+
func extractSelectorsFromExpr(qs string) [][]*labels.Matcher {
130+
expr, err := parser.ParseExpr(qs)
131+
if err != nil {
132+
return nil
133+
}
134+
var result [][]*labels.Matcher
135+
extractSelectors(expr, func(vs *parser.VectorSelector) {
136+
result = append(result, vs.LabelMatchers)
137+
})
138+
return result
139+
}
Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
package ruler
2+
3+
import (
4+
"testing"
5+
6+
"github.com/prometheus/prometheus/model/labels"
7+
"github.com/prometheus/prometheus/storage"
8+
"github.com/prometheus/prometheus/tsdb/chunkenc"
9+
"github.com/stretchr/testify/assert"
10+
"github.com/stretchr/testify/require"
11+
)
12+
13+
type mockSeries struct {
14+
lset labels.Labels
15+
}
16+
17+
// Labels returns the label set the mock was constructed with.
func (s *mockSeries) Labels() labels.Labels {
	return s.lset
}
18+
// Iterator ignores the iterator passed in and returns a no-op iterator, since
// the mock holds no samples.
func (s *mockSeries) Iterator(_ chunkenc.Iterator) chunkenc.Iterator {
	return chunkenc.NewNopIterator()
}
19+
20+
func TestFilterSeriesSet(t *testing.T) {
21+
series := []storage.Series{
22+
&mockSeries{lset: labels.FromStrings("__name__", "http_requests", "job", "api", "env", "prod")},
23+
&mockSeries{lset: labels.FromStrings("__name__", "http_requests", "job", "web", "env", "prod")},
24+
&mockSeries{lset: labels.FromStrings("__name__", "http_requests", "job", "api", "env", "dev")},
25+
}
26+
27+
// Filter to only job="api"
28+
filters := []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "job", "api")}
29+
ss := newFilterSeriesSet(newSliceSeriesSet(series), filters)
30+
31+
var got []labels.Labels
32+
for ss.Next() {
33+
got = append(got, ss.At().Labels())
34+
}
35+
require.NoError(t, ss.Err())
36+
assert.Len(t, got, 2)
37+
assert.Equal(t, "api", got[0].Get("job"))
38+
assert.Equal(t, "api", got[1].Get("job"))
39+
}
40+
41+
func TestFilterSeriesSet_RegexFilter(t *testing.T) {
42+
series := []storage.Series{
43+
&mockSeries{lset: labels.FromStrings("__name__", "cpu", "host", "server-1")},
44+
&mockSeries{lset: labels.FromStrings("__name__", "cpu", "host", "server-2")},
45+
&mockSeries{lset: labels.FromStrings("__name__", "cpu", "host", "db-1")},
46+
}
47+
48+
filters := []*labels.Matcher{labels.MustNewMatcher(labels.MatchRegexp, "host", "server-.*")}
49+
ss := newFilterSeriesSet(newSliceSeriesSet(series), filters)
50+
51+
var got []labels.Labels
52+
for ss.Next() {
53+
got = append(got, ss.At().Labels())
54+
}
55+
require.NoError(t, ss.Err())
56+
assert.Len(t, got, 2)
57+
assert.Equal(t, "server-1", got[0].Get("host"))
58+
assert.Equal(t, "server-2", got[1].Get("host"))
59+
}
60+
61+
func TestPrefetchCache_FindSuperset(t *testing.T) {
62+
// Cache has a broad entry: __name__="http", job=~".*"
63+
cache := &prefetchCache{
64+
entries: []prefetchEntry{
65+
{
66+
matchers: []*labels.Matcher{
67+
labels.MustNewMatcher(labels.MatchEqual, "__name__", "http"),
68+
labels.MustNewMatcher(labels.MatchRegexp, "job", ".*"),
69+
},
70+
series: []storage.Series{
71+
&mockSeries{lset: labels.FromStrings("__name__", "http", "job", "api")},
72+
&mockSeries{lset: labels.FromStrings("__name__", "http", "job", "web")},
73+
},
74+
},
75+
},
76+
}
77+
78+
// Query for __name__="http", job="api" — should be served from cache with filter
79+
queryMatchers := []*labels.Matcher{
80+
labels.MustNewMatcher(labels.MatchEqual, "__name__", "http"),
81+
labels.MustNewMatcher(labels.MatchEqual, "job", "api"),
82+
}
83+
84+
ss, ok := cache.get(queryMatchers)
85+
require.True(t, ok)
86+
87+
var got []labels.Labels
88+
for ss.Next() {
89+
got = append(got, ss.At().Labels())
90+
}
91+
require.NoError(t, ss.Err())
92+
assert.Len(t, got, 1)
93+
assert.Equal(t, "api", got[0].Get("job"))
94+
}
95+
96+
func TestIsMatcherSetSuperset(t *testing.T) {
97+
tests := []struct {
98+
name string
99+
super []*labels.Matcher
100+
sub []*labels.Matcher
101+
want bool
102+
}{
103+
{
104+
name: "identical sets",
105+
super: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "job", "api")},
106+
sub: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "job", "api")},
107+
want: true,
108+
},
109+
{
110+
name: "regex superset of equal",
111+
super: []*labels.Matcher{labels.MustNewMatcher(labels.MatchRegexp, "job", ".*")},
112+
sub: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "job", "api")},
113+
want: true,
114+
},
115+
{
116+
name: "super has extra label — more restrictive, not superset",
117+
super: []*labels.Matcher{
118+
labels.MustNewMatcher(labels.MatchEqual, "job", "api"),
119+
labels.MustNewMatcher(labels.MatchEqual, "env", "prod"),
120+
},
121+
sub: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "job", "api")},
122+
want: false,
123+
},
124+
{
125+
name: "sub has extra label — super is broader",
126+
super: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "job", "api")},
127+
sub: []*labels.Matcher{
128+
labels.MustNewMatcher(labels.MatchEqual, "job", "api"),
129+
labels.MustNewMatcher(labels.MatchEqual, "env", "prod"),
130+
},
131+
want: true,
132+
},
133+
{
134+
name: "different values — not superset",
135+
super: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "job", "web")},
136+
sub: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "job", "api")},
137+
want: false,
138+
},
139+
}
140+
141+
for _, tt := range tests {
142+
t.Run(tt.name, func(t *testing.T) {
143+
assert.Equal(t, tt.want, isMatcherSetSuperset(tt.super, tt.sub))
144+
})
145+
}
146+
}

0 commit comments

Comments
 (0)