Skip to content

Commit 93ccc9a

Browse files
committed
feat(ruler): rule group select merging — reduce redundant ingester queries
When a rule group has multiple rules querying the same metric with overlapping matchers, pre-fetch the broadest query once and serve individual rules from the cached result with local filtering.

- Parse rules' PromQL expressions, group by metric + expression structure
- Detect superset matchers (e.g., =~".*" covers "blue" and "green")
- Pre-fetch the broadest rule's expression once via QueryFunc
- Serve subsequent rules from the cached Vector with local matcher filtering
- Zero vendored changes — uses context injection via GroupEvalIterationFunc

Config: --ruler.select-merger-enabled, --ruler.select-merger-min-rules
Benchmark: 114 → 38 queries per group evaluation (66% reduction)
1 parent 1774081 commit 93ccc9a

9 files changed

Lines changed: 856 additions & 2 deletions

pkg/ruler/compat.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -471,11 +471,19 @@ func buildQueryFunc(
471471
failedQueries := metrics.FailedQueriesVec.WithLabelValues(userID)
472472
metricsFunc := metricsQueryFunc(baseQueryFunc, totalQueries, failedQueries)
473473

474+
var qf rules.QueryFunc
474475
// apply statistic middleware
475476
if cfg.EnableQueryStats {
476-
return recordAndReportRuleQueryMetrics(metricsFunc, userID, metrics, logger)
477+
qf = recordAndReportRuleQueryMetrics(metricsFunc, userID, metrics, logger)
478+
} else {
479+
qf = metricsFunc
480+
}
481+
482+
// apply select merger wrapper (checks context for prefetch cache)
483+
if cfg.SelectMergerEnabled {
484+
qf = selectMergerQueryFunc(qf)
477485
}
478-
return metricsFunc
486+
return qf
479487
}
480488

481489
type QueryableError struct {

pkg/ruler/manager.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,9 @@ func NewDefaultMultiTenantManager(cfg Config, limits RulesLimits, managerFactory
134134
logger: logger,
135135
ruleGroupIterationFunc: defaultRuleGroupIterationFunc,
136136
}
137+
if cfg.SelectMergerEnabled {
138+
m.ruleGroupIterationFunc = mergedSelectIterationFunc(cfg.SelectMergerMinRules)
139+
}
137140
if cfg.RulesBackupEnabled() {
138141
m.rulesBackupManager = newRulesBackupManager(cfg, logger, reg)
139142
}
@@ -292,6 +295,19 @@ func defaultRuleGroupIterationFunc(ctx context.Context, g *promRules.Group, eval
292295
promRules.DefaultEvalIterationFunc(ctx, g, evalTimestamp)
293296
}
294297

298+
// mergedSelectIterationFunc returns a GroupEvalIterationFunc that pre-fetches
299+
// merged selectors before evaluating the group, injecting the cache into
300+
// context so the selectMergerQueryFunc wrapper can serve from it.
301+
func mergedSelectIterationFunc(minRules int) promRules.GroupEvalIterationFunc {
302+
return func(ctx context.Context, g *promRules.Group, evalTimestamp time.Time) {
303+
plan := planMergedSelects(g.Rules(), minRules)
304+
if len(plan) > 0 {
305+
ctx = withSelectMergerPlan(ctx, plan)
306+
}
307+
defaultRuleGroupIterationFunc(ctx, g, evalTimestamp)
308+
}
309+
}
310+
295311
// newManager creates a prometheus rule manager wrapped with a user id
296312
// configured storage, appendable, notifier, and instrumentation
297313
func (r *DefaultMultiTenantManager) newManager(ctx context.Context, userID string) (RulesManager, error) {

pkg/ruler/prefetch_queryable.go

Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
package ruler
2+
3+
import (
4+
"context"
5+
"sync"
6+
"time"
7+
8+
"github.com/prometheus/prometheus/model/labels"
9+
"github.com/prometheus/prometheus/promql"
10+
"github.com/prometheus/prometheus/promql/parser"
11+
"github.com/prometheus/prometheus/rules"
12+
)
13+
14+
// selectMergerCtxKey is the unexported context key under which the
// *selectMergerState of a group evaluation is stored (withSelectMergerPlan)
// and looked up (selectMergerQueryFunc).
type selectMergerCtxKey struct{}
15+
16+
// selectMergerState is injected into context by the iteration func.
17+
// The QueryFunc wrapper lazily executes the prefetch on first access.
// It is always handled by pointer (see withSelectMergerPlan), so the embedded
// sync.Once is shared by every query that sees the same context value.
18+
type selectMergerState struct {
19+
// plan is the list of merged selects handed to executePrefetch.
plan []mergedSelect
20+
// once ensures the prefetch is executed a single time per state.
once sync.Once
21+
// cache holds the prefetch result; it is assigned inside once.Do and is nil
// until the first query carrying this state has run.
cache *prefetchCache
22+
}
23+
24+
func withSelectMergerPlan(ctx context.Context, plan []mergedSelect) context.Context {
25+
return context.WithValue(ctx, selectMergerCtxKey{}, &selectMergerState{plan: plan})
26+
}
27+
28+
// selectMergerQueryFunc wraps a QueryFunc to check context for a merge plan.
29+
// On first call, it lazily pre-fetches using the inner QueryFunc, then serves from cache.
30+
func selectMergerQueryFunc(inner rules.QueryFunc) rules.QueryFunc {
31+
return func(ctx context.Context, qs string, t time.Time) (promql.Vector, error) {
32+
state, _ := ctx.Value(selectMergerCtxKey{}).(*selectMergerState)
33+
if state == nil {
34+
return inner(ctx, qs, t)
35+
}
36+
37+
// Lazy prefetch: execute plan on first call.
38+
state.once.Do(func() {
39+
state.cache = executePrefetch(ctx, state.plan, inner, t)
40+
})
41+
42+
if state.cache != nil {
43+
selectors := extractSelectorsFromExpr(qs)
44+
if len(selectors) == 1 {
45+
if vec, ok := state.cache.get(selectors[0]); ok {
46+
return vec, nil
47+
}
48+
}
49+
}
50+
return inner(ctx, qs, t)
51+
}
52+
}
53+
54+
// prefetchEntry holds pre-fetched results for a merged selector.
55+
type prefetchEntry struct {
56+
// matchers are the merged (broadest) matchers of the plan item that
// produced this entry; lookups compare query matchers against them.
matchers []*labels.Matcher
57+
// vector is the result returned by the query func for the plan item's
// prefetch expression.
vector promql.Vector
58+
}
59+
60+
// prefetchCache holds all pre-fetched data for a single group evaluation.
61+
type prefetchCache struct {
62+
// entries has one element per plan item whose prefetch query succeeded,
// in plan order; get scans it front to back and uses the first match.
entries []prefetchEntry
63+
}
64+
65+
func (c *prefetchCache) get(queryMatchers []*labels.Matcher) (promql.Vector, bool) {
66+
for _, e := range c.entries {
67+
if isMatcherSetSuperset(e.matchers, queryMatchers) {
68+
extra := extraMatchers(e.matchers, queryMatchers)
69+
if len(extra) == 0 {
70+
return e.vector, true
71+
}
72+
return filterVector(e.vector, extra), true
73+
}
74+
}
75+
return nil, false
76+
}
77+
78+
func filterVector(vec promql.Vector, filters []*labels.Matcher) promql.Vector {
79+
var result promql.Vector
80+
for _, s := range vec {
81+
if matchesAll(s.Metric, filters) {
82+
result = append(result, s)
83+
}
84+
}
85+
return result
86+
}
87+
88+
// executePrefetch runs the merged selectors via QueryFunc and populates a cache.
89+
// Called without cache in context, so inner falls through to the real query.
90+
func executePrefetch(ctx context.Context, plan []mergedSelect, queryFunc rules.QueryFunc, ts time.Time) *prefetchCache {
91+
// Remove the state from context to prevent recursion during prefetch.
92+
ctx = context.WithValue(ctx, selectMergerCtxKey{}, (*selectMergerState)(nil))
93+
cache := &prefetchCache{}
94+
for _, ms := range plan {
95+
vec, err := queryFunc(ctx, ms.prefetchExpr, ts)
96+
if err != nil {
97+
continue
98+
}
99+
cache.entries = append(cache.entries, prefetchEntry{
100+
matchers: ms.mergedMatchers,
101+
vector: vec,
102+
})
103+
}
104+
return cache
105+
}
106+
107+
func isMatcherSetSuperset(superMatchers, subMatchers []*labels.Matcher) bool {
108+
for _, sup := range superMatchers {
109+
found := false
110+
for _, sub := range subMatchers {
111+
if sub.Name == sup.Name {
112+
found = true
113+
if !isMatcherSuperset(sup, sub) {
114+
return false
115+
}
116+
break
117+
}
118+
}
119+
if !found {
120+
return false
121+
}
122+
}
123+
return true
124+
}
125+
126+
func extraMatchers(entryMatchers, queryMatchers []*labels.Matcher) []*labels.Matcher {
127+
var result []*labels.Matcher
128+
for _, qm := range queryMatchers {
129+
isExtra := true
130+
for _, em := range entryMatchers {
131+
if em.Name == qm.Name && em.Type == qm.Type && em.Value == qm.Value {
132+
isExtra = false
133+
break
134+
}
135+
}
136+
if isExtra {
137+
result = append(result, qm)
138+
}
139+
}
140+
return result
141+
}
142+
143+
func matchesAll(lset labels.Labels, matchers []*labels.Matcher) bool {
144+
for _, m := range matchers {
145+
if !m.Matches(lset.Get(m.Name)) {
146+
return false
147+
}
148+
}
149+
return true
150+
}
151+
152+
func extractSelectorsFromExpr(qs string) [][]*labels.Matcher {
153+
expr, err := parser.ParseExpr(qs)
154+
if err != nil {
155+
return nil
156+
}
157+
var result [][]*labels.Matcher
158+
extractSelectors(expr, func(vs *parser.VectorSelector) {
159+
result = append(result, vs.LabelMatchers)
160+
})
161+
return result
162+
}
Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
package ruler
2+
3+
import (
4+
"context"
5+
"testing"
6+
"time"
7+
8+
"github.com/prometheus/prometheus/model/labels"
9+
"github.com/prometheus/prometheus/promql"
10+
"github.com/prometheus/prometheus/rules"
11+
"github.com/stretchr/testify/assert"
12+
"github.com/stretchr/testify/require"
13+
)
14+
15+
func TestPrefetchCache_FindSuperset(t *testing.T) {
16+
cache := &prefetchCache{
17+
entries: []prefetchEntry{
18+
{
19+
matchers: []*labels.Matcher{
20+
labels.MustNewMatcher(labels.MatchEqual, "__name__", "http"),
21+
labels.MustNewMatcher(labels.MatchRegexp, "job", ".*"),
22+
},
23+
vector: promql.Vector{
24+
{Metric: labels.FromStrings("__name__", "http", "job", "api"), T: 1000, F: 1.0},
25+
{Metric: labels.FromStrings("__name__", "http", "job", "web"), T: 1000, F: 2.0},
26+
},
27+
},
28+
},
29+
}
30+
31+
queryMatchers := []*labels.Matcher{
32+
labels.MustNewMatcher(labels.MatchEqual, "__name__", "http"),
33+
labels.MustNewMatcher(labels.MatchEqual, "job", "api"),
34+
}
35+
36+
vec, ok := cache.get(queryMatchers)
37+
require.True(t, ok)
38+
assert.Len(t, vec, 1)
39+
assert.Equal(t, "api", vec[0].Metric.Get("job"))
40+
assert.Equal(t, 1.0, vec[0].F)
41+
}
42+
43+
func TestIsMatcherSetSuperset(t *testing.T) {
44+
tests := []struct {
45+
name string
46+
super []*labels.Matcher
47+
sub []*labels.Matcher
48+
want bool
49+
}{
50+
{
51+
name: "identical sets",
52+
super: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "job", "api")},
53+
sub: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "job", "api")},
54+
want: true,
55+
},
56+
{
57+
name: "regex superset of equal",
58+
super: []*labels.Matcher{labels.MustNewMatcher(labels.MatchRegexp, "job", ".*")},
59+
sub: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "job", "api")},
60+
want: true,
61+
},
62+
{
63+
name: "super has extra label — more restrictive, not superset",
64+
super: []*labels.Matcher{
65+
labels.MustNewMatcher(labels.MatchEqual, "job", "api"),
66+
labels.MustNewMatcher(labels.MatchEqual, "env", "prod"),
67+
},
68+
sub: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "job", "api")},
69+
want: false,
70+
},
71+
{
72+
name: "sub has extra label — super is broader",
73+
super: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "job", "api")},
74+
sub: []*labels.Matcher{
75+
labels.MustNewMatcher(labels.MatchEqual, "job", "api"),
76+
labels.MustNewMatcher(labels.MatchEqual, "env", "prod"),
77+
},
78+
want: true,
79+
},
80+
{
81+
name: "different values — not superset",
82+
super: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "job", "web")},
83+
sub: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "job", "api")},
84+
want: false,
85+
},
86+
}
87+
88+
for _, tt := range tests {
89+
t.Run(tt.name, func(t *testing.T) {
90+
assert.Equal(t, tt.want, isMatcherSetSuperset(tt.super, tt.sub))
91+
})
92+
}
93+
}
94+
95+
func TestSelectMergerQueryFunc(t *testing.T) {
96+
allSeries := promql.Vector{
97+
{Metric: labels.FromStrings("__name__", "http_requests", "job", "api"), T: 1000, F: 10.0},
98+
{Metric: labels.FromStrings("__name__", "http_requests", "job", "web"), T: 1000, F: 20.0},
99+
}
100+
101+
innerCalled := 0
102+
inner := func(_ context.Context, qs string, _ time.Time) (promql.Vector, error) {
103+
innerCalled++
104+
return allSeries, nil
105+
}
106+
107+
qf := selectMergerQueryFunc(inner)
108+
109+
// Without plan in context — falls through to inner.
110+
vec, err := qf(context.Background(), `http_requests{job="api"}`, time.Unix(1, 0))
111+
require.NoError(t, err)
112+
assert.Equal(t, 1, innerCalled)
113+
assert.Len(t, vec, 2) // inner returns all
114+
115+
// With plan in context — lazy prefetch then serve from cache.
116+
innerCalled = 0
117+
plan := []mergedSelect{
118+
{
119+
metricName: "http_requests",
120+
mergedMatchers: []*labels.Matcher{
121+
labels.MustNewMatcher(labels.MatchEqual, "__name__", "http_requests"),
122+
labels.MustNewMatcher(labels.MatchRegexp, "job", ".*"),
123+
},
124+
prefetchExpr: `http_requests{job=~".*"}`,
125+
},
126+
}
127+
ctx := withSelectMergerPlan(context.Background(), plan)
128+
129+
// First call triggers prefetch (1 inner call), then serves filtered result.
130+
vec, err = qf(ctx, `http_requests{job="api"}`, time.Unix(1, 0))
131+
require.NoError(t, err)
132+
assert.Equal(t, 1, innerCalled) // prefetch call
133+
assert.Len(t, vec, 1)
134+
assert.Equal(t, "api", vec[0].Metric.Get("job"))
135+
136+
// Second call — served from cache, no additional inner call.
137+
vec, err = qf(ctx, `http_requests{job="web"}`, time.Unix(1, 0))
138+
require.NoError(t, err)
139+
assert.Equal(t, 1, innerCalled) // still 1
140+
assert.Len(t, vec, 1)
141+
assert.Equal(t, "web", vec[0].Metric.Get("job"))
142+
143+
// Query not in cache — falls through to inner.
144+
vec, err = qf(ctx, `other_metric{job="api"}`, time.Unix(1, 0))
145+
require.NoError(t, err)
146+
assert.Equal(t, 2, innerCalled)
147+
}
148+
149+
func TestExecutePrefetch(t *testing.T) {
150+
queryFunc := func(_ context.Context, _ string, ts time.Time) (promql.Vector, error) {
151+
return promql.Vector{
152+
{Metric: labels.FromStrings("__name__", "http", "job", "api"), T: ts.UnixMilli(), F: 5.0},
153+
}, nil
154+
}
155+
156+
plan := []mergedSelect{
157+
{
158+
metricName: "http",
159+
mergedMatchers: []*labels.Matcher{
160+
labels.MustNewMatcher(labels.MatchEqual, "__name__", "http"),
161+
},
162+
prefetchExpr: `http{job=~".*"}`,
163+
},
164+
}
165+
166+
cache := executePrefetch(context.Background(), plan, rules.QueryFunc(queryFunc), time.Unix(1, 0))
167+
require.Len(t, cache.entries, 1)
168+
assert.Len(t, cache.entries[0].vector, 1)
169+
}

0 commit comments

Comments
 (0)