-
Notifications
You must be signed in to change notification settings - Fork 856
Expand file tree
/
Copy pathprefetch_queryable.go
More file actions
162 lines (145 loc) · 4.14 KB
/
prefetch_queryable.go
File metadata and controls
162 lines (145 loc) · 4.14 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
package ruler
import (
"context"
"sync"
"time"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/rules"
)
// selectMergerCtxKey is the unexported context key type under which a
// *selectMergerState is stored in (and read back from) a context.Context.
type selectMergerCtxKey struct{}
// selectMergerState is injected into context by the iteration func
// (via withSelectMergerPlan). The QueryFunc wrapper returned by
// selectMergerQueryFunc lazily executes the prefetch on first access.
type selectMergerState struct {
// plan is the list of merged selectors to pre-fetch.
plan []mergedSelect
// once guards the single, lazy execution of plan.
once sync.Once
// cache holds the pre-fetched results. It is assigned inside once.Do and is
// nil until the prefetch has run.
cache *prefetchCache
}
// withSelectMergerPlan returns a context derived from ctx that carries a fresh
// selectMergerState for plan, stored under selectMergerCtxKey. The state
// starts with no cache; the plan is executed later by the QueryFunc wrapper.
func withSelectMergerPlan(ctx context.Context, plan []mergedSelect) context.Context {
	state := &selectMergerState{plan: plan}
	return context.WithValue(ctx, selectMergerCtxKey{}, state)
}
// selectMergerQueryFunc wraps a QueryFunc to check context for a merge plan.
// On the first call that sees a plan, it lazily pre-fetches using the inner
// QueryFunc, then serves matching queries from the resulting cache.
//
// Behaviour per call:
//   - ctx carries no state, or a nil state (as installed by executePrefetch to
//     stop recursion): the query is forwarded to inner unchanged.
//   - ctx carries a state: the plan is executed exactly once (guarded by the
//     state's sync.Once, using the ctx and t of whichever call gets there
//     first), and the cache is consulted.
//   - The cache is only consulted when qs is, as a whole, a plain
//     instant-vector selector without offset or @ modifiers. The cache stores
//     raw selector results, so handing them out for any larger expression
//     (aggregations, functions, comparisons, range selectors, ...) would
//     return unevaluated data. All such queries go to inner.
//   - Cache misses and unparsable queries also go to inner.
//
// NOTE(review): cached data was fetched at the timestamp of the first call;
// this assumes every query sharing one state is evaluated at the same t —
// confirm against the caller.
func selectMergerQueryFunc(inner rules.QueryFunc) rules.QueryFunc {
	return func(ctx context.Context, qs string, t time.Time) (promql.Vector, error) {
		state, _ := ctx.Value(selectMergerCtxKey{}).(*selectMergerState)
		if state == nil {
			return inner(ctx, qs, t)
		}

		// Lazy prefetch: execute plan on first call. Concurrent callers block
		// here until the prefetch has finished, so state.cache is safe to read
		// afterwards.
		state.once.Do(func() {
			state.cache = executePrefetch(ctx, state.plan, inner, t)
		})

		if state.cache != nil {
			if matchers, ok := bareSelectorMatchers(qs); ok {
				if vec, hit := state.cache.get(matchers); hit {
					return vec, nil
				}
			}
		}
		return inner(ctx, qs, t)
	}
}

// bareSelectorMatchers reports whether qs consists solely of an instant-vector
// selector (optionally wrapped in parentheses) with no offset or @ modifier,
// and if so returns that selector's label matchers.
func bareSelectorMatchers(qs string) ([]*labels.Matcher, bool) {
	expr, err := parser.ParseExpr(qs)
	if err != nil {
		return nil, false
	}
	// Redundant parentheses do not change the result of the query.
	for {
		paren, ok := expr.(*parser.ParenExpr)
		if !ok {
			break
		}
		expr = paren.Expr
	}
	vs, ok := expr.(*parser.VectorSelector)
	if !ok {
		return nil, false
	}
	// Offset and @ modifiers shift the evaluation time away from the
	// timestamp the prefetch ran at, so the cached samples would not apply.
	if vs.OriginalOffset != 0 || vs.Timestamp != nil || vs.StartOrEnd != 0 {
		return nil, false
	}
	return vs.LabelMatchers, true
}
// prefetchEntry holds pre-fetched results for a merged selector.
type prefetchEntry struct {
// matchers is the merged matcher set of the plan item (its mergedMatchers).
matchers []*labels.Matcher
// vector is the result returned by the QueryFunc for the plan item's
// prefetchExpr.
vector promql.Vector
}
// prefetchCache holds all pre-fetched data for a single group evaluation.
type prefetchCache struct {
// entries has one element per plan item whose prefetch query succeeded,
// in plan order; lookups scan it front to back and use the first match.
entries []prefetchEntry
}
// get looks up pre-fetched data that covers queryMatchers.
//
// Entries are scanned in order and the first one whose matcher set is a
// superset of queryMatchers (per isMatcherSetSuperset) is used. Query matchers
// that are not literally present in the entry are applied as a filter on the
// cached samples. The second return value reports whether any entry matched;
// on a miss the vector is nil.
//
// The returned vector never aliases the cached one. Callers are free to
// modify the samples they receive (Prometheus rule evaluation rewrites sample
// labels in place), and handing out the cached slice itself would let one
// caller corrupt the data seen by every later lookup and would race under
// concurrent evaluation.
func (c *prefetchCache) get(queryMatchers []*labels.Matcher) (promql.Vector, bool) {
	for _, e := range c.entries {
		if !isMatcherSetSuperset(e.matchers, queryMatchers) {
			continue
		}
		if extra := extraMatchers(e.matchers, queryMatchers); len(extra) > 0 {
			// filterVector appends matching samples to a new slice, so the
			// cached vector stays untouched.
			return filterVector(e.vector, extra), true
		}
		// Exact hit: return a shallow copy so in-place edits by the caller do
		// not leak back into the cache.
		out := make(promql.Vector, len(e.vector))
		copy(out, e.vector)
		return out, true
	}
	return nil, false
}
// filterVector returns the samples of vec whose label set satisfies every
// matcher in filters, in their original order. The result is built in a new
// slice (vec is not modified) and is nil when no sample qualifies.
func filterVector(vec promql.Vector, filters []*labels.Matcher) promql.Vector {
	var kept promql.Vector
	for i := range vec {
		if !matchesAll(vec[i].Metric, filters) {
			continue
		}
		kept = append(kept, vec[i])
	}
	return kept
}
// executePrefetch evaluates every item of plan through queryFunc at ts and
// collects the results into a new prefetchCache.
//
// The queries run under a context whose merger state is overridden with a nil
// *selectMergerState, so a wrapped queryFunc sees no plan and goes straight to
// the real query instead of recursing into the prefetch.
//
// The prefetch is best effort: a plan item whose query returns an error is
// left out of the cache. The returned cache is never nil.
func executePrefetch(ctx context.Context, plan []mergedSelect, queryFunc rules.QueryFunc, ts time.Time) *prefetchCache {
	// A typed nil under the same key shadows the real state for the duration
	// of the prefetch queries.
	var noState *selectMergerState
	prefetchCtx := context.WithValue(ctx, selectMergerCtxKey{}, noState)

	result := new(prefetchCache)
	for i := range plan {
		item := plan[i]
		vec, err := queryFunc(prefetchCtx, item.prefetchExpr, ts)
		if err == nil {
			entry := prefetchEntry{matchers: item.mergedMatchers, vector: vec}
			result.entries = append(result.entries, entry)
		}
		// On error the item is skipped on purpose; no entry is cached for it.
	}
	return result
}
// isMatcherSetSuperset reports whether every matcher in superMatchers is
// backed by a matcher in subMatchers on the same label name that it is a
// superset of (as decided by isMatcherSuperset).
//
// For each super matcher only the FIRST sub matcher with the same label name
// is examined. A super matcher with no same-named sub matcher makes the result
// false. An empty superMatchers yields true.
func isMatcherSetSuperset(superMatchers, subMatchers []*labels.Matcher) bool {
nextSuper:
	for _, want := range superMatchers {
		for _, have := range subMatchers {
			if have.Name != want.Name {
				continue
			}
			// First sub matcher on this label decides the outcome for want.
			if isMatcherSuperset(want, have) {
				continue nextSuper
			}
			return false
		}
		// No sub matcher constrains this label at all.
		return false
	}
	return true
}
// extraMatchers returns the matchers of queryMatchers that have no identical
// counterpart (same Name, Type and Value) in entryMatchers, preserving the
// order of queryMatchers. The result is nil when every query matcher is
// already present in the entry.
func extraMatchers(entryMatchers, queryMatchers []*labels.Matcher) []*labels.Matcher {
	// inEntry reports whether an identical matcher exists in entryMatchers.
	inEntry := func(q *labels.Matcher) bool {
		for _, e := range entryMatchers {
			if e.Name == q.Name && e.Type == q.Type && e.Value == q.Value {
				return true
			}
		}
		return false
	}

	var extras []*labels.Matcher
	for _, q := range queryMatchers {
		if !inEntry(q) {
			extras = append(extras, q)
		}
	}
	return extras
}
// matchesAll reports whether lset satisfies every matcher: each matcher is
// tested against the value lset.Get returns for the matcher's label name.
// With no matchers the result is true.
func matchesAll(lset labels.Labels, matchers []*labels.Matcher) bool {
	for i := 0; i < len(matchers); i++ {
		m := matchers[i]
		if value := lset.Get(m.Name); !m.Matches(value) {
			return false
		}
	}
	return true
}
// extractSelectorsFromExpr parses qs as PromQL and returns the label matchers
// of each vector selector that extractSelectors reports for the parsed
// expression, in the order they are reported. It returns nil when qs does not
// parse or when no selector is reported.
func extractSelectorsFromExpr(qs string) [][]*labels.Matcher {
	var found [][]*labels.Matcher

	expr, err := parser.ParseExpr(qs)
	if err == nil {
		collect := func(vs *parser.VectorSelector) {
			found = append(found, vs.LabelMatchers)
		}
		extractSelectors(expr, collect)
	}
	return found
}