Skip to content

Commit 932c8a4

Browse files
committed
Register xfunctions in global parser for rule file loading
Signed-off-by: Paurush Garg <paurushg@amazon.com>
1 parent 74185ef commit 932c8a4

4 files changed

Lines changed: 135 additions & 1 deletion

File tree

CHANGELOG.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,9 @@
5555
* [BUGFIX] Ring: Fix ring token conflict resolution only applied to updated instance and make constantly token conflict check during instance observe period.
5656
* [BUGFIX] Distributor: Fix a panic (`slice bounds out of range`) in the stream push path when the context deadline expires while the worker goroutine is still marshalling a `WriteRequest`. #7541
5757
* [BUGFIX] Query Frontend: Fix native histogram responses not being handled correctly in `minTime()` sort ordering for split_by_interval merge. #7555
58-
* [BUGFIX] Distributor: Release the push worker pool goroutines on shutdown by stopping the async executor during the stopping phase when `-distributor.num-push-workers` is set. #7602
5958
* [BUGFIX] Querier: Fix unbounded resource leak in the bucket-scan blocks finder (used when the bucket index is disabled). Per-tenant metadata fetchers, their Prometheus registries, and on-disk meta caches are now evicted once a tenant is no longer active, instead of being retained for the lifetime of the process. #7573
59+
* [BUGFIX] Distributor: Release the push worker pool goroutines on shutdown by stopping the async executor during the stopping phase when `-distributor.num-push-workers` is set. #7602
60+
* [BUGFIX] Ruler: Register xfunctions (xincrease, xrate, xdelta) in the global parser before loading rule files. #7621
6061

6162
## 1.21.0 2026-04-24
6263

integration/ruler_test.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1887,3 +1887,57 @@ func createTestRuleGroup(t *testing.T) rulefmt.RuleGroup {
18871887
},
18881888
}
18891889
}
1890+
1891+
func TestRulerXFunctionsWithThanosEngine(t *testing.T) {
1892+
s, err := e2e.NewScenario(networkName)
1893+
require.NoError(t, err)
1894+
defer s.Close()
1895+
1896+
consul := e2edb.NewConsul()
1897+
minio := e2edb.NewMinio(9000, bucketName, rulestoreBucketName)
1898+
require.NoError(t, s.StartAndWaitReady(consul, minio))
1899+
1900+
flags := mergeFlags(
1901+
BlocksStorageFlags(),
1902+
RulerFlags(),
1903+
map[string]string{
1904+
"-querier.thanos-engine": "true",
1905+
"-querier.enable-x-functions": "true",
1906+
"-ruler.evaluation-interval": "2s",
1907+
"-ruler.poll-interval": "2s",
1908+
"-ruler.evaluation-delay-duration": "0",
1909+
"-distributor.replication-factor": "1",
1910+
},
1911+
)
1912+
1913+
const namespace = "test"
1914+
const user = "user-1"
1915+
1916+
distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
1917+
ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
1918+
ruler := e2ecortex.NewRuler("ruler", consul.NetworkHTTPEndpoint(), flags, "")
1919+
require.NoError(t, s.StartAndWaitReady(distributor, ingester, ruler))
1920+
1921+
require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
1922+
require.NoError(t, ruler.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
1923+
1924+
c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", ruler.HTTPEndpoint(), user)
1925+
require.NoError(t, err)
1926+
1927+
series, _ := generateSeries("metric_total", time.Now(), prompb.Label{Name: "job", Value: "test"})
1928+
res, err := c.Push(series)
1929+
require.NoError(t, err)
1930+
require.Equal(t, 200, res.StatusCode)
1931+
1932+
ruleGroup := ruleGroupWithRule("xfunctions_group", "xincrease_rule", `xincrease(metric_total{job="test"}[5m])`)
1933+
require.NoError(t, c.SetRuleGroup(ruleGroup, namespace))
1934+
1935+
m := ruleGroupMatcher(user, namespace, "xfunctions_group")
1936+
1937+
// Wait until ruler has loaded the rule group.
1938+
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_prometheus_rule_group_rules"}, e2e.WithLabelMatchers(m), e2e.WaitMissingMetrics))
1939+
1940+
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_prometheus_rule_evaluations_total"}, e2e.WithLabelMatchers(m), e2e.WaitMissingMetrics))
1941+
1942+
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_prometheus_rule_evaluation_failures_total"}, e2e.WithLabelMatchers(m), e2e.WaitMissingMetrics))
1943+
}

pkg/cortex/modules.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"flag"
66
"fmt"
77
"log/slog"
8+
"maps"
89
"net/http"
910
"runtime"
1011
"runtime/debug"
@@ -18,9 +19,11 @@ import (
1819
"github.com/prometheus/client_golang/prometheus"
1920
"github.com/prometheus/common/model"
2021
"github.com/prometheus/prometheus/promql"
22+
"github.com/prometheus/prometheus/promql/parser"
2123
"github.com/prometheus/prometheus/rules"
2224
prom_storage "github.com/prometheus/prometheus/storage"
2325
"github.com/thanos-io/objstore"
26+
"github.com/thanos-io/promql-engine/execution/parse"
2427
"github.com/thanos-io/thanos/pkg/discovery/dns"
2528
"github.com/thanos-io/thanos/pkg/querysharding"
2629
httpgrpc_server "github.com/weaveworks/common/httpgrpc/server"
@@ -661,6 +664,11 @@ func (t *Cortex) initRulerStorage() (serv services.Service, err error) {
661664
return
662665
}
663666

667+
// Register xfunctions (xincrease, xrate, xdelta) in the global parser
668+
if t.Cfg.Querier.ThanosEngine.Enabled && t.Cfg.Querier.ThanosEngine.EnableXFunctions {
669+
maps.Copy(parser.Functions, parse.XFunctions)
670+
}
671+
664672
t.RulerStorage, err = ruler.NewRuleStore(context.Background(), t.Cfg.RulerStorage, t.OverridesConfig, rules.FileLoader{}, util_log.Logger, prometheus.DefaultRegisterer, t.Cfg.NameValidationScheme)
665673
return
666674
}

pkg/cortex/modules_test.go

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package cortex
22

33
import (
44
"context"
5+
"maps"
56
"net/http/httptest"
67
"os"
78
"reflect"
@@ -10,6 +11,7 @@ import (
1011
"testing"
1112

1213
"github.com/gorilla/mux"
14+
"github.com/prometheus/prometheus/promql/parser"
1315
prom_storage "github.com/prometheus/prometheus/storage"
1416
"github.com/stretchr/testify/assert"
1517
"github.com/stretchr/testify/require"
@@ -169,6 +171,75 @@ func (p *myPusher) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
169171
return nil, nil
170172
}
171173

174+
func TestCortex_InitRulerStorage_RegistersXFunctions(t *testing.T) {
175+
tests := map[string]struct {
176+
enabled bool
177+
enableXFunctions bool
178+
expectRegistered bool
179+
}{
180+
"should register xfunctions when thanos engine is enabled and EnableXFunctions is true": {
181+
enabled: true,
182+
enableXFunctions: true,
183+
expectRegistered: true,
184+
},
185+
"should not register xfunctions when EnableXFunctions is false": {
186+
enabled: true,
187+
enableXFunctions: false,
188+
expectRegistered: false,
189+
},
190+
"should not register xfunctions when thanos engine is disabled": {
191+
enabled: false,
192+
enableXFunctions: true,
193+
expectRegistered: false,
194+
},
195+
}
196+
197+
for testName, testData := range tests {
198+
t.Run(testName, func(t *testing.T) {
199+
// Clean up global state after each test
200+
originalFunctions := make(map[string]*parser.Function, len(parser.Functions))
201+
maps.Copy(originalFunctions, parser.Functions)
202+
defer func() {
203+
// Restore original parser.Functions
204+
for k := range parser.Functions {
205+
if _, ok := originalFunctions[k]; !ok {
206+
delete(parser.Functions, k)
207+
}
208+
}
209+
}()
210+
211+
cfg := newDefaultConfig()
212+
cfg.Target = []string{"ruler"}
213+
cfg.RulerStorage.Backend = "local"
214+
cfg.RulerStorage.Local.Directory = os.TempDir()
215+
cfg.Querier.ThanosEngine.Enabled = testData.enabled
216+
cfg.Querier.ThanosEngine.EnableXFunctions = testData.enableXFunctions
217+
218+
cortex := &Cortex{
219+
Server: &server.Server{},
220+
Cfg: *cfg,
221+
}
222+
223+
_, err := cortex.initRulerStorage()
224+
require.NoError(t, err)
225+
226+
_, hasXincrease := parser.Functions["xincrease"]
227+
_, hasXrate := parser.Functions["xrate"]
228+
_, hasXdelta := parser.Functions["xdelta"]
229+
230+
if testData.expectRegistered {
231+
assert.True(t, hasXincrease, "xincrease should be registered")
232+
assert.True(t, hasXrate, "xrate should be registered")
233+
assert.True(t, hasXdelta, "xdelta should be registered")
234+
} else {
235+
assert.False(t, hasXincrease, "xincrease should not be registered")
236+
assert.False(t, hasXrate, "xrate should not be registered")
237+
assert.False(t, hasXdelta, "xdelta should not be registered")
238+
}
239+
})
240+
}
241+
}
242+
172243
type myQueryable struct{}
173244

174245
func (q *myQueryable) Querier(mint, maxt int64) (prom_storage.Querier, error) {

0 commit comments

Comments
 (0)