Skip to content

Commit 7bad079

Browse files
committed
fix(ruler): fix critical correctness issues in select merging
1. Group by expression structure — rules with different functions (rate vs sum) on the same metric are no longer incorrectly merged. 2. Only include labels present in ALL entries — a label missing from any rule means that rule matches all values; including it in the merged set would make the pre-fetch too restrictive. 3. Skip when no prefetchExpr found — if no rule's matchers equal the merged matchers (no superset rule exists), skip merging for that metric rather than falling back to a bare selector that returns raw series instead of computed values. 4. Disable select merging when ConcurrentEvalsEnabled is true — SetQueryFunc is not thread-safe.
1 parent 08dd182 commit 7bad079

5 files changed

Lines changed: 86 additions & 20 deletions

File tree

pkg/ruler/manager.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ func NewDefaultMultiTenantManager(cfg Config, limits RulesLimits, managerFactory
134134
logger: logger,
135135
ruleGroupIterationFunc: defaultRuleGroupIterationFunc,
136136
}
137-
if cfg.SelectMergerEnabled {
137+
if cfg.SelectMergerEnabled && !cfg.ConcurrentEvalsEnabled {
138138
m.ruleGroupIterationFunc = mergedSelectIterationFunc(cfg.SelectMergerMinRules)
139139
}
140140
if cfg.RulesBackupEnabled() {

pkg/ruler/prefetch_queryable.go

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -70,11 +70,7 @@ func cachedQueryFunc(inner rules.QueryFunc, cache *prefetchCache) rules.QueryFun
7070
func executePrefetch(ctx context.Context, plan []mergedSelect, queryFunc rules.QueryFunc, ts time.Time) *prefetchCache {
7171
cache := &prefetchCache{}
7272
for _, ms := range plan {
73-
expr := ms.prefetchExpr
74-
if expr == "" {
75-
expr = buildSelectorExpr(ms.mergedMatchers)
76-
}
77-
vec, err := queryFunc(ctx, expr, ts)
73+
vec, err := queryFunc(ctx, ms.prefetchExpr, ts)
7874
if err != nil {
7975
continue
8076
}

pkg/ruler/select_merger.go

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package ruler
22

33
import (
4+
"strings"
5+
46
"github.com/prometheus/prometheus/model/labels"
57
"github.com/prometheus/prometheus/promql/parser"
68
"github.com/prometheus/prometheus/rules"
@@ -14,11 +16,15 @@ type mergedSelect struct {
1416
}
1517

1618
func planMergedSelects(rls []rules.Rule, minRules int) []mergedSelect {
17-
// Group selectors by metric name.
19+
// Group selectors by metric name AND expression structure.
20+
// The "structure key" is the expression with matchers blanked out,
21+
// so rate(m{a="1"}[5m]) and rate(m{a="2"}[5m]) share a key but
22+
// sum(m{a="1"}) does not.
1823
type entry struct {
1924
matchers []*labels.Matcher
2025
exprStr string
2126
}
27+
// Key: metricName + "\x00" + expression structure
2228
groups := map[string][]entry{}
2329

2430
for _, r := range rls {
@@ -28,12 +34,13 @@ func planMergedSelects(rls []rules.Rule, minRules int) []mergedSelect {
2834
if name == "" {
2935
return
3036
}
31-
groups[name] = append(groups[name], entry{matchers: vs.LabelMatchers, exprStr: exprStr})
37+
key := name + "\x00" + exprStructureKey(r.Query(), vs)
38+
groups[key] = append(groups[key], entry{matchers: vs.LabelMatchers, exprStr: exprStr})
3239
})
3340
}
3441

3542
var result []mergedSelect
36-
for name, entries := range groups {
43+
for _, entries := range groups {
3744
if len(entries) < minRules {
3845
continue
3946
}
@@ -44,14 +51,19 @@ func planMergedSelects(rls []rules.Rule, minRules int) []mergedSelect {
4451
merged := computeMergedMatchers(originals)
4552

4653
// Find the expression whose matchers equal the merged set (the broadest rule).
54+
// If none found, skip — we can't safely pre-fetch without a full expression.
4755
prefetchExpr := ""
4856
for _, e := range entries {
4957
if matchersEqual(e.matchers, merged) {
5058
prefetchExpr = e.exprStr
5159
break
5260
}
5361
}
62+
if prefetchExpr == "" {
63+
continue
64+
}
5465

66+
name := metricNameFromMatchers(merged)
5567
result = append(result, mergedSelect{
5668
metricName: name,
5769
mergedMatchers: merged,
@@ -62,6 +74,16 @@ func planMergedSelects(rls []rules.Rule, minRules int) []mergedSelect {
6274
return result
6375
}
6476

77+
// exprStructureKey returns a string representing the expression structure
78+
// with the VectorSelector's matchers replaced by a placeholder.
79+
// Two expressions with the same structure key differ only in their matchers.
80+
func exprStructureKey(expr parser.Expr, vs *parser.VectorSelector) string {
81+
full := expr.String()
82+
selectorStr := vs.String()
83+
// Replace the specific selector with a placeholder.
84+
return strings.Replace(full, selectorStr, "{__PLACEHOLDER__}", 1)
85+
}
86+
6587
// matchersEqual returns true if two matcher slices contain the same matchers (order-independent).
6688
func matchersEqual(a, b []*labels.Matcher) bool {
6789
if len(a) != len(b) {
@@ -101,7 +123,10 @@ func metricNameFromMatchers(ms []*labels.Matcher) string {
101123
}
102124

103125
func computeMergedMatchers(entries [][]*labels.Matcher) []*labels.Matcher {
104-
// Collect all label names across entries.
126+
// Collect all label names across entries, but only keep labels
127+
// that appear in ALL entries. A label missing from any entry means
128+
// that entry matches all values for that label — including it in the
129+
// merged set would make the pre-fetch too restrictive.
105130
labelMatchers := map[string][]*labels.Matcher{}
106131
for _, ms := range entries {
107132
for _, m := range ms {
@@ -111,6 +136,10 @@ func computeMergedMatchers(entries [][]*labels.Matcher) []*labels.Matcher {
111136

112137
var result []*labels.Matcher
113138
for _, ms := range labelMatchers {
139+
// Only include if present in ALL entries.
140+
if len(ms) != len(entries) {
141+
continue
142+
}
114143
if sup := findSuperset(ms); sup != nil {
115144
result = append(result, sup)
116145
}

pkg/ruler/select_merger_bench_test.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,18 @@ func BenchmarkSelectMerger(b *testing.B) {
2424
colors := []string{"blue", "green", "canary"}
2525

2626
// Build 114 rules: 38 metrics × 3 deploy_color variants.
27+
// One variant uses =~".*" (superset) to enable merging.
2728
rls := make([]rules.Rule, 0, numMetrics*numColors)
2829
for i := range numMetrics {
29-
for _, c := range colors {
30-
expr := fmt.Sprintf(`sum(metric_%d{deploy_color="%s",job="svc"})`, i, c)
30+
for ci, c := range colors {
31+
var expr string
32+
if ci == numColors-1 {
33+
// Last color variant uses regex superset — enables merging.
34+
expr = fmt.Sprintf(`sum(metric_%d{deploy_color=~".*",job="svc"})`, i)
35+
} else {
36+
expr = fmt.Sprintf(`sum(metric_%d{deploy_color="%s",job="svc"})`, i, c)
37+
}
38+
_ = c
3139
e, err := parser.ParseExpr(expr)
3240
if err != nil {
3341
b.Fatal(err)

pkg/ruler/select_merger_test.go

Lines changed: 41 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,14 @@ func mustParseRule(t *testing.T, expr string) rules.Rule {
2626

2727
func TestSelectMerger_Plan_GroupsByMetric(t *testing.T) {
2828
rls := []rules.Rule{
29-
mustParseRule(t, `sum(http_requests_total{job="api"})`),
30-
mustParseRule(t, `sum(http_requests_total{job="web"})`),
29+
// http_requests_total: 3 rules with same structure, deploy_color varies, one has =~".*" superset
30+
mustParseRule(t, `sum(http_requests_total{job="api",deploy_color="blue"})`),
31+
mustParseRule(t, `sum(http_requests_total{job="api",deploy_color="green"})`),
32+
mustParseRule(t, `sum(http_requests_total{job="api",deploy_color=~".*"})`),
33+
// cpu_usage: 3 rules with same structure, host varies, one has =~".*" superset
3134
mustParseRule(t, `avg(cpu_usage{host="a"})`),
3235
mustParseRule(t, `avg(cpu_usage{host="b"})`),
36+
mustParseRule(t, `avg(cpu_usage{host=~".*"})`),
3337
}
3438

3539
result := planMergedSelects(rls, 2)
@@ -92,17 +96,46 @@ func TestSelectMerger_SupersetDetection(t *testing.T) {
9296

9397
func TestSelectMerger_MinRulesThreshold(t *testing.T) {
9498
rls := []rules.Rule{
95-
mustParseRule(t, `sum(http_requests_total{job="api"})`),
96-
mustParseRule(t, `sum(http_requests_total{job="web"})`),
99+
mustParseRule(t, `sum(http_requests_total{job="api",color="blue"})`),
100+
mustParseRule(t, `sum(http_requests_total{job="api",color="green"})`),
101+
mustParseRule(t, `sum(http_requests_total{job="api",color=~".*"})`),
97102
mustParseRule(t, `avg(cpu_usage{host="a"})`),
98103
}
99104

100-
// minRules=2: http_requests_total has 2 rules, cpu_usage has 1
105+
// minRules=2: http_requests_total has 3 rules with superset, cpu_usage has only 1
106+
result := planMergedSelects(rls, 2)
107+
require.Len(t, result, 1)
108+
assert.Equal(t, "http_requests_total", result[0].metricName)
109+
110+
// minRules=4: nothing qualifies
111+
result = planMergedSelects(rls, 4)
112+
assert.Len(t, result, 0)
113+
}
114+
115+
func TestSelectMerger_DifferentExprStructures_NotMerged(t *testing.T) {
116+
// Same metric but different expression structures should NOT be merged.
117+
rls := []rules.Rule{
118+
mustParseRule(t, `rate(http_requests_total{job="api",color="blue"}[5m])`),
119+
mustParseRule(t, `rate(http_requests_total{job="api",color=~".*"}[5m])`),
120+
mustParseRule(t, `sum(http_requests_total{job="api",color="red"})`),
121+
}
122+
101123
result := planMergedSelects(rls, 2)
102-
assert.Len(t, result, 1)
124+
// Only the rate() pair should merge (2 rules with same structure).
125+
// The sum() rule has a different structure.
126+
require.Len(t, result, 1)
103127
assert.Equal(t, "http_requests_total", result[0].metricName)
128+
assert.Contains(t, result[0].prefetchExpr, "rate")
129+
}
130+
131+
func TestSelectMerger_NoSuperset_NotMerged(t *testing.T) {
132+
// When no rule has a superset matcher, merging is skipped.
133+
rls := []rules.Rule{
134+
mustParseRule(t, `sum(http_requests_total{job="api"})`),
135+
mustParseRule(t, `sum(http_requests_total{job="web"})`),
136+
}
104137

105-
// minRules=3: nothing qualifies
106-
result = planMergedSelects(rls, 3)
138+
result := planMergedSelects(rls, 2)
139+
// job="api" and job="web" have no superset, so no prefetchExpr can be found.
107140
assert.Len(t, result, 0)
108141
}

0 commit comments

Comments
 (0)