Skip to content

Commit 1327f1f

Browse files
committed
Add select merger integration: cachedQueryFunc, executePrefetch, config wiring
- Add cachedQueryFunc that wraps QueryFunc to serve from pre-fetched cache - Add executePrefetch that evaluates merged selectors and populates cache - Add buildSelectorExpr helper to construct PromQL selector strings - Add filterVector to filter cached Vector by additional matchers - Add mergedSelectIterationFunc in manager.go (Plan → Pre-fetch → Evaluate) - Add prefetchExpr field to mergedSelect and track broadest rule expression - Add SelectMergerEnabled/SelectMergerMinRules config fields to ruler.go - Wire mergedSelectIterationFunc when config is enabled - Add vendored SetQueryFunc/Opts methods on prometheus rules.Group - Update prefetchCache to store promql.Vector instead of []storage.Series - Update tests to match new cache.get signature (returns Vector)
1 parent 2a40ea0 commit 1327f1f

6 files changed

Lines changed: 223 additions & 19 deletions

File tree

pkg/ruler/manager.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,9 @@ func NewDefaultMultiTenantManager(cfg Config, limits RulesLimits, managerFactory
134134
logger: logger,
135135
ruleGroupIterationFunc: defaultRuleGroupIterationFunc,
136136
}
137+
if cfg.SelectMergerEnabled {
138+
m.ruleGroupIterationFunc = mergedSelectIterationFunc(cfg.SelectMergerMinRules)
139+
}
137140
if cfg.RulesBackupEnabled() {
138141
m.rulesBackupManager = newRulesBackupManager(cfg, logger, reg)
139142
}
@@ -292,6 +295,24 @@ func defaultRuleGroupIterationFunc(ctx context.Context, g *promRules.Group, eval
292295
promRules.DefaultEvalIterationFunc(ctx, g, evalTimestamp)
293296
}
294297

298+
// mergedSelectIterationFunc returns a GroupEvalIterationFunc that pre-fetches
299+
// merged selectors before evaluating the group, serving individual rule queries
300+
// from the cached results.
301+
func mergedSelectIterationFunc(minRules int) promRules.GroupEvalIterationFunc {
302+
return func(ctx context.Context, g *promRules.Group, evalTimestamp time.Time) {
303+
plan := planMergedSelects(g.Rules(), minRules)
304+
if len(plan) > 0 {
305+
cache := executePrefetch(ctx, plan, g.Opts().QueryFunc, evalTimestamp)
306+
if cache != nil && len(cache.entries) > 0 {
307+
orig := g.Opts().QueryFunc
308+
g.SetQueryFunc(cachedQueryFunc(orig, cache))
309+
defer g.SetQueryFunc(orig)
310+
}
311+
}
312+
defaultRuleGroupIterationFunc(ctx, g, evalTimestamp)
313+
}
314+
}
315+
295316
// newManager creates a prometheus rule manager wrapped with a user id
296317
// configured storage, appendable, notifier, and instrumentation
297318
func (r *DefaultMultiTenantManager) newManager(ctx context.Context, userID string) (RulesManager, error) {

pkg/ruler/prefetch_queryable.go

Lines changed: 73 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,22 @@
11
package ruler
22

33
import (
4+
"context"
5+
"strings"
6+
"time"
7+
48
"github.com/prometheus/prometheus/model/labels"
9+
"github.com/prometheus/prometheus/promql"
510
"github.com/prometheus/prometheus/promql/parser"
11+
"github.com/prometheus/prometheus/rules"
612
"github.com/prometheus/prometheus/storage"
713
"github.com/prometheus/prometheus/util/annotations"
814
)
915

10-
// prefetchEntry holds pre-fetched series for a merged selector.
16+
// prefetchEntry holds pre-fetched results for a merged selector.
1117
type prefetchEntry struct {
1218
matchers []*labels.Matcher
13-
series []storage.Series
19+
vector promql.Vector
1420
}
1521

1622
// prefetchCache holds all pre-fetched data for a single group evaluation.
@@ -19,19 +25,81 @@ type prefetchCache struct {
1925
}
2026

2127
// get checks if any pre-fetched entry is a superset of the requested matchers.
22-
func (c *prefetchCache) get(queryMatchers []*labels.Matcher) (storage.SeriesSet, bool) {
28+
// Returns the cached Vector filtered to match the query matchers.
29+
func (c *prefetchCache) get(queryMatchers []*labels.Matcher) (promql.Vector, bool) {
2330
for _, e := range c.entries {
2431
if isMatcherSetSuperset(e.matchers, queryMatchers) {
2532
extra := extraMatchers(e.matchers, queryMatchers)
2633
if len(extra) == 0 {
27-
return newSliceSeriesSet(e.series), true
34+
return e.vector, true
2835
}
29-
return newFilterSeriesSet(newSliceSeriesSet(e.series), extra), true
36+
return filterVector(e.vector, extra), true
3037
}
3138
}
3239
return nil, false
3340
}
3441

42+
// filterVector returns only samples whose labels match all filters.
43+
func filterVector(vec promql.Vector, filters []*labels.Matcher) promql.Vector {
44+
var result promql.Vector
45+
for _, s := range vec {
46+
if matchesAll(s.Metric, filters) {
47+
result = append(result, s)
48+
}
49+
}
50+
return result
51+
}
52+
53+
// cachedQueryFunc wraps a QueryFunc to serve from pre-fetched cache.
54+
func cachedQueryFunc(inner rules.QueryFunc, cache *prefetchCache) rules.QueryFunc {
55+
if cache == nil || len(cache.entries) == 0 {
56+
return inner
57+
}
58+
return func(ctx context.Context, qs string, t time.Time) (promql.Vector, error) {
59+
selectors := extractSelectorsFromExpr(qs)
60+
if len(selectors) == 1 {
61+
if vec, ok := cache.get(selectors[0]); ok {
62+
return vec, nil
63+
}
64+
}
65+
return inner(ctx, qs, t)
66+
}
67+
}
68+
69+
// executePrefetch runs the merged selectors via QueryFunc and populates a cache.
70+
func executePrefetch(ctx context.Context, plan []mergedSelect, queryFunc rules.QueryFunc, ts time.Time) *prefetchCache {
71+
cache := &prefetchCache{}
72+
for _, ms := range plan {
73+
expr := ms.prefetchExpr
74+
if expr == "" {
75+
expr = buildSelectorExpr(ms.mergedMatchers)
76+
}
77+
vec, err := queryFunc(ctx, expr, ts)
78+
if err != nil {
79+
continue
80+
}
81+
cache.entries = append(cache.entries, prefetchEntry{
82+
matchers: ms.mergedMatchers,
83+
vector: vec,
84+
})
85+
}
86+
return cache
87+
}
88+
89+
// buildSelectorExpr constructs a PromQL selector string from matchers.
90+
func buildSelectorExpr(ms []*labels.Matcher) string {
91+
var b strings.Builder
92+
b.WriteByte('{')
93+
for i, m := range ms {
94+
if i > 0 {
95+
b.WriteByte(',')
96+
}
97+
b.WriteString(m.String())
98+
}
99+
b.WriteByte('}')
100+
return b.String()
101+
}
102+
35103
// isMatcherSetSuperset returns true if superMatchers covers all series that subMatchers would match.
36104
func isMatcherSetSuperset(superMatchers, subMatchers []*labels.Matcher) bool {
37105
for _, sup := range superMatchers {

pkg/ruler/prefetch_queryable_test.go

Lines changed: 79 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
package ruler
22

33
import (
4+
"context"
45
"testing"
6+
"time"
57

68
"github.com/prometheus/prometheus/model/labels"
9+
"github.com/prometheus/prometheus/promql"
710
"github.com/prometheus/prometheus/storage"
811
"github.com/prometheus/prometheus/tsdb/chunkenc"
912
"github.com/stretchr/testify/assert"
@@ -67,9 +70,9 @@ func TestPrefetchCache_FindSuperset(t *testing.T) {
6770
labels.MustNewMatcher(labels.MatchEqual, "__name__", "http"),
6871
labels.MustNewMatcher(labels.MatchRegexp, "job", ".*"),
6972
},
70-
series: []storage.Series{
71-
&mockSeries{lset: labels.FromStrings("__name__", "http", "job", "api")},
72-
&mockSeries{lset: labels.FromStrings("__name__", "http", "job", "web")},
73+
vector: promql.Vector{
74+
{Metric: labels.FromStrings("__name__", "http", "job", "api"), T: 1000, F: 1.0},
75+
{Metric: labels.FromStrings("__name__", "http", "job", "web"), T: 1000, F: 2.0},
7376
},
7477
},
7578
},
@@ -81,16 +84,11 @@ func TestPrefetchCache_FindSuperset(t *testing.T) {
8184
labels.MustNewMatcher(labels.MatchEqual, "job", "api"),
8285
}
8386

84-
ss, ok := cache.get(queryMatchers)
87+
vec, ok := cache.get(queryMatchers)
8588
require.True(t, ok)
86-
87-
var got []labels.Labels
88-
for ss.Next() {
89-
got = append(got, ss.At().Labels())
90-
}
91-
require.NoError(t, ss.Err())
92-
assert.Len(t, got, 1)
93-
assert.Equal(t, "api", got[0].Get("job"))
89+
assert.Len(t, vec, 1)
90+
assert.Equal(t, "api", vec[0].Metric.Get("job"))
91+
assert.Equal(t, 1.0, vec[0].F)
9492
}
9593

9694
func TestIsMatcherSetSuperset(t *testing.T) {
@@ -144,3 +142,72 @@ func TestIsMatcherSetSuperset(t *testing.T) {
144142
})
145143
}
146144
}
145+
146+
func TestCachedQueryFunc(t *testing.T) {
147+
cache := &prefetchCache{
148+
entries: []prefetchEntry{
149+
{
150+
matchers: []*labels.Matcher{
151+
labels.MustNewMatcher(labels.MatchEqual, "__name__", "http_requests"),
152+
labels.MustNewMatcher(labels.MatchRegexp, "job", ".*"),
153+
},
154+
vector: promql.Vector{
155+
{Metric: labels.FromStrings("__name__", "http_requests", "job", "api"), T: 1000, F: 10.0},
156+
{Metric: labels.FromStrings("__name__", "http_requests", "job", "web"), T: 1000, F: 20.0},
157+
},
158+
},
159+
},
160+
}
161+
162+
innerCalled := false
163+
inner := func(ctx context.Context, qs string, t time.Time) (promql.Vector, error) {
164+
innerCalled = true
165+
return nil, nil
166+
}
167+
168+
qf := cachedQueryFunc(inner, cache)
169+
170+
// Query that matches cache — should NOT call inner
171+
vec, err := qf(context.Background(), `http_requests{job="api"}`, time.Unix(1, 0))
172+
require.NoError(t, err)
173+
assert.False(t, innerCalled)
174+
assert.Len(t, vec, 1)
175+
assert.Equal(t, 10.0, vec[0].F)
176+
177+
// Query that doesn't match cache — should call inner
178+
_, err = qf(context.Background(), `other_metric{job="api"}`, time.Unix(1, 0))
179+
require.NoError(t, err)
180+
assert.True(t, innerCalled)
181+
}
182+
183+
func TestExecutePrefetch(t *testing.T) {
184+
queryFunc := func(ctx context.Context, qs string, ts time.Time) (promql.Vector, error) {
185+
return promql.Vector{
186+
{Metric: labels.FromStrings("__name__", "http", "job", "api"), T: ts.UnixMilli(), F: 5.0},
187+
{Metric: labels.FromStrings("__name__", "http", "job", "web"), T: ts.UnixMilli(), F: 7.0},
188+
}, nil
189+
}
190+
191+
plan := []mergedSelect{
192+
{
193+
metricName: "http",
194+
mergedMatchers: []*labels.Matcher{
195+
labels.MustNewMatcher(labels.MatchEqual, "__name__", "http"),
196+
labels.MustNewMatcher(labels.MatchRegexp, "job", ".*"),
197+
},
198+
},
199+
}
200+
201+
cache := executePrefetch(context.Background(), plan, queryFunc, time.Unix(1, 0))
202+
require.Len(t, cache.entries, 1)
203+
assert.Len(t, cache.entries[0].vector, 2)
204+
}
205+
206+
func TestBuildSelectorExpr(t *testing.T) {
207+
ms := []*labels.Matcher{
208+
labels.MustNewMatcher(labels.MatchEqual, "__name__", "http"),
209+
labels.MustNewMatcher(labels.MatchRegexp, "job", ".*"),
210+
}
211+
got := buildSelectorExpr(ms)
212+
assert.Equal(t, `{__name__="http",job=~".*"}`, got)
213+
}

pkg/ruler/ruler.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,9 @@ type Config struct {
181181

182182
ThanosEngine engine.ThanosEngineConfig `yaml:"thanos_engine"`
183183

184+
SelectMergerEnabled bool `yaml:"select_merger_enabled"`
185+
SelectMergerMinRules int `yaml:"select_merger_min_rules"`
186+
184187
// NameValidationScheme is the scheme for validating metric and label names (set from root config).
185188
NameValidationScheme model.ValidationScheme `yaml:"-"`
186189
}
@@ -272,6 +275,10 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
272275

273276
f.BoolVar(&cfg.EnableHAEvaluation, "ruler.enable-ha-evaluation", false, "Enable high availability")
274277
f.DurationVar(&cfg.LivenessCheckTimeout, "ruler.liveness-check-timeout", 1*time.Second, "Timeout duration for non-primary rulers during liveness checks. If the check times out, the non-primary ruler will evaluate the rule group. Applicable when ruler.enable-ha-evaluation is true.")
278+
279+
f.BoolVar(&cfg.SelectMergerEnabled, "ruler.select-merger-enabled", false, "Enable merged select pre-fetching to reduce redundant queries for rules sharing the same metric.")
280+
f.IntVar(&cfg.SelectMergerMinRules, "ruler.select-merger-min-rules", 3, "Minimum number of rules querying the same metric to trigger merged pre-fetching.")
281+
275282
cfg.RingCheckPeriod = 5 * time.Second
276283
}
277284

pkg/ruler/select_merger.go

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,22 +10,25 @@ type mergedSelect struct {
1010
metricName string
1111
mergedMatchers []*labels.Matcher
1212
originalEntries [][]*labels.Matcher // per-rule matchers
13+
prefetchExpr string // expression to evaluate for pre-fetch
1314
}
1415

1516
func planMergedSelects(rls []rules.Rule, minRules int) []mergedSelect {
1617
// Group selectors by metric name.
1718
type entry struct {
1819
matchers []*labels.Matcher
20+
exprStr string
1921
}
2022
groups := map[string][]entry{}
2123

2224
for _, r := range rls {
25+
exprStr := r.Query().String()
2326
extractSelectors(r.Query(), func(vs *parser.VectorSelector) {
2427
name := metricNameFromMatchers(vs.LabelMatchers)
2528
if name == "" {
2629
return
2730
}
28-
groups[name] = append(groups[name], entry{matchers: vs.LabelMatchers})
31+
groups[name] = append(groups[name], entry{matchers: vs.LabelMatchers, exprStr: exprStr})
2932
})
3033
}
3134

@@ -38,15 +41,47 @@ func planMergedSelects(rls []rules.Rule, minRules int) []mergedSelect {
3841
for i, e := range entries {
3942
originals[i] = e.matchers
4043
}
44+
merged := computeMergedMatchers(originals)
45+
46+
// Find the expression whose matchers equal the merged set (the broadest rule).
47+
prefetchExpr := ""
48+
for _, e := range entries {
49+
if matchersEqual(e.matchers, merged) {
50+
prefetchExpr = e.exprStr
51+
break
52+
}
53+
}
54+
4155
result = append(result, mergedSelect{
4256
metricName: name,
43-
mergedMatchers: computeMergedMatchers(originals),
57+
mergedMatchers: merged,
4458
originalEntries: originals,
59+
prefetchExpr: prefetchExpr,
4560
})
4661
}
4762
return result
4863
}
4964

65+
// matchersEqual returns true if two matcher slices contain the same matchers (order-independent).
66+
func matchersEqual(a, b []*labels.Matcher) bool {
67+
if len(a) != len(b) {
68+
return false
69+
}
70+
for _, am := range a {
71+
found := false
72+
for _, bm := range b {
73+
if am.Name == bm.Name && am.Type == bm.Type && am.Value == bm.Value {
74+
found = true
75+
break
76+
}
77+
}
78+
if !found {
79+
return false
80+
}
81+
}
82+
return true
83+
}
84+
5085
func extractSelectors(expr parser.Expr, fn func(*parser.VectorSelector)) {
5186
parser.Inspect(expr, func(node parser.Node, _ []parser.Node) error {
5287
if vs, ok := node.(*parser.VectorSelector); ok {

vendor/github.com/prometheus/prometheus/rules/group.go

Lines changed: 6 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)