diff --git a/engine/engine.go b/engine/engine.go index f18a93a85..0d8f40f1d 100644 --- a/engine/engine.go +++ b/engine/engine.go @@ -74,6 +74,11 @@ type Opts struct { // This will default to false. EnableXFunctions bool + // EnableExtendedRangeSelectors enables the anchored and smoothed modifiers + // for range vector selectors. + // See https://github.com/prometheus/proposals/blob/main/proposals/0052-extended-range-selectors-semantics.md + EnableExtendedRangeSelectors bool + // EnableAnalysis enables query analysis. EnableAnalysis bool @@ -183,7 +188,8 @@ func NewWithScanners(opts Opts, scanners engstorage.Scanners) *Engine { scanners: scanners, activeQueryTracker: queryTracker, - disableDuplicateLabelChecks: opts.DisableDuplicateLabelChecks, + disableDuplicateLabelChecks: opts.DisableDuplicateLabelChecks, + enableExtendedRangeSelectors: opts.EnableExtendedRangeSelectors, logger: opts.Logger, lookbackDelta: opts.LookbackDelta, @@ -214,7 +220,8 @@ type Engine struct { scanners engstorage.Scanners activeQueryTracker promql.QueryTracker - disableDuplicateLabelChecks bool + disableDuplicateLabelChecks bool + enableExtendedRangeSelectors bool logger *slog.Logger lookbackDelta time.Duration @@ -238,6 +245,14 @@ func (e *Engine) MakeInstantQuery(ctx context.Context, q storage.Queryable, opts } defer e.activeQueryTracker.Delete(idx) + // NOTE: parser.EnableExtendedRangeSelectors is a process-global variable + // in the upstream Prometheus parser. Once set to true, it remains enabled + // for all subsequent parses in the process, matching how Prometheus handles + // it (set once at startup via --enable-feature). Two engine instances in + // the same process cannot independently control this flag. + if e.enableExtendedRangeSelectors { + parser.EnableExtendedRangeSelectors = true + } expr, err := parser.NewParser(qs, parser.WithFunctions(e.functions)).ParseExpr() if err != nil { return nil, err @@ -336,6 +351,14 @@ func (e *Engine) MakeRangeQuery(ctx context.Context, q storage.Queryable, opts * } defer e.activeQueryTracker.Delete(idx) + // NOTE: parser.EnableExtendedRangeSelectors is a process-global variable + // in the upstream Prometheus parser. Once set to true, it remains enabled + // for all subsequent parses in the process, matching how Prometheus handles + // it (set once at startup via --enable-feature). Two engine instances in + // the same process cannot independently control this flag. + if e.enableExtendedRangeSelectors { + parser.EnableExtendedRangeSelectors = true + } expr, err := parser.NewParser(qs, parser.WithFunctions(e.functions)).ParseExpr() if err != nil { return nil, err diff --git a/engine/engine_test.go b/engine/engine_test.go index 1542ebb62..10beb1bb0 100644 --- a/engine/engine_test.go +++ b/engine/engine_test.go @@ -75,7 +75,9 @@ func (s *skipTest) Run(name string, t func(*testing.T)) bool { func TestPromqlAcceptance(t *testing.T) { // promql acceptance tests disable experimental functions again // since we use them in our tests too we need to enable them afterwards again - t.Cleanup(func() { parser.EnableExperimentalFunctions = true }) + t.Cleanup(func() { + parser.EnableExperimentalFunctions = true + }) engine := engine.New(engine.Opts{ EngineOpts: promql.EngineOpts{ @@ -85,13 +87,14 @@ func TestPromqlAcceptance(t *testing.T) { MaxSamples: 5e10, Timeout: 1 * time.Hour, NoStepSubqueryIntervalFn: func(rangeMillis int64) int64 { return 30 * time.Second.Milliseconds() }, - }}) + }, + EnableExtendedRangeSelectors: true, + }) st := &skipTest{ skipTests: []string{ "testdata/name_label_dropping.test", // feature unsupported "testdata/type_and_unit.test", // feature unsupported - "testdata/extended_vectors.test", // experimental anchored/smoothed modifiers unsupported "testdata/info.test", // info() function unsupported "testdata/literals.test", // string literal expressions as query results unsupported "testdata/range_queries.test", // matrix selector as instant query result unsupported diff --git a/engine/extended_range_test.go b/engine/extended_range_test.go new file mode 100644 index 000000000..562870870 --- /dev/null +++ b/engine/extended_range_test.go @@ -0,0 +1,525 @@ +// Copyright (c) The Thanos Community Authors. +// Licensed under the Apache License 2.0. + +package engine_test + +import ( + "context" + "fmt" + "math" + "sync" + "testing" + "time" + + "github.com/thanos-io/promql-engine/engine" + "github.com/thanos-io/promql-engine/execution/model" + "github.com/thanos-io/promql-engine/logicalplan" + "github.com/thanos-io/promql-engine/query" + engstorage "github.com/thanos-io/promql-engine/storage" + promscan "github.com/thanos-io/promql-engine/storage/prometheus" + + "github.com/efficientgo/core/testutil" + "github.com/google/go-cmp/cmp" + "github.com/prometheus/prometheus/promql" + "github.com/prometheus/prometheus/promql/parser" + "github.com/prometheus/prometheus/promql/promqltest" + promstorage "github.com/prometheus/prometheus/storage" +) + +func TestAnchoredSmoothedModifiers(t *testing.T) { + parser.EnableExtendedRangeSelectors = true + + opts := promql.EngineOpts{ + Timeout: 1 * time.Hour, + MaxSamples: 1e10, + EnableNegativeOffset: true, + EnableAtModifier: true, + } + + cases := []struct { + name string + load string + query string + start time.Time + end time.Time + step time.Duration + }{ + // Anchored rate/increase/delta on a linear counter. + { + name: "anchored rate on linear counter", + load: `load 10s + http_total 0 10 20 30 40 50 60 70 80 90 100`, + query: `rate(http_total[30s] anchored)`, + }, + { + name: "anchored increase on linear counter", + load: `load 10s + http_total 0 10 20 30 40 50 60 70 80 90 100`, + query: `increase(http_total[30s] anchored)`, + }, + { + name: "anchored delta on gauge", + load: `load 10s + temperature 20 22 21 23 25 24 26 28 27 29 30`, + query: `delta(temperature[30s] anchored)`, + }, + // Smoothed rate/increase/delta on a linear counter. + { + name: "smoothed rate on linear counter", + load: `load 10s + http_total 0 10 20 30 40 50 60 70 80 90 100`, + query: `rate(http_total[30s] smoothed)`, + }, + { + name: "smoothed increase on linear counter", + load: `load 10s + http_total 0 10 20 30 40 50 60 70 80 90 100`, + query: `increase(http_total[30s] smoothed)`, + }, + { + name: "smoothed delta on gauge", + load: `load 10s + temperature 20 22 21 23 25 24 26 28 27 29 30`, + query: `delta(temperature[30s] smoothed)`, + }, + // Anchored with counter resets. + { + name: "anchored increase with counter reset", + load: `load 10s + resets_total 0 10 20 5 15 25 10 20 30 40 50`, + query: `increase(resets_total[30s] anchored)`, + }, + { + name: "anchored rate with counter reset", + load: `load 10s + resets_total 0 10 20 5 15 25 10 20 30 40 50`, + query: `rate(resets_total[30s] anchored)`, + }, + // Smoothed with counter resets. + { + name: "smoothed increase with counter reset", + load: `load 10s + resets_total 0 10 20 5 15 25 10 20 30 40 50`, + query: `increase(resets_total[30s] smoothed)`, + }, + { + name: "smoothed rate with counter reset", + load: `load 10s + resets_total 0 10 20 5 15 25 10 20 30 40 50`, + query: `rate(resets_total[30s] smoothed)`, + }, + // Anchored resets and changes (only supported for anchored). + { + name: "anchored resets", + load: `load 10s + resets_total 0 10 20 5 15 25 10 20 30 40 50`, + query: `resets(resets_total[30s] anchored)`, + }, + { + name: "anchored changes", + load: `load 10s + metric 1 1 2 2 3 3 4 4 5 5 6`, + query: `changes(metric[30s] anchored)`, + }, + // Counter reset at range boundary (regression test: smoothed interpolation + // must handle counter resets to avoid negative increase values). + { + name: "smoothed increase counter reset at boundary", + load: `load 10s + counter_boundary 0 4 5 1 6 11`, + query: `increase(counter_boundary[10s] smoothed)`, + }, + { + name: "smoothed rate counter reset at boundary", + load: `load 10s + counter_boundary 0 4 5 1 6 11`, + query: `rate(counter_boundary[10s] smoothed)`, + }, + // Non-linear data. + { + name: "anchored rate on quadratic counter", + load: `load 10s + quadratic 0 1 4 9 16 25 36 49 64 81 100`, + query: `rate(quadratic[30s] anchored)`, + }, + { + name: "smoothed rate on quadratic counter", + load: `load 10s + quadratic 0 1 4 9 16 25 36 49 64 81 100`, + query: `rate(quadratic[30s] smoothed)`, + }, + // Multiple series. + { + name: "anchored increase multiple series", + load: `load 10s + http_total{path="/foo"} 0 5 10 15 20 25 30 35 40 45 50 + http_total{path="/bar"} 0 10 20 30 40 50 60 70 80 90 100`, + query: `increase(http_total[30s] anchored)`, + }, + { + name: "smoothed increase multiple series", + load: `load 10s + http_total{path="/foo"} 0 5 10 15 20 25 30 35 40 45 50 + http_total{path="/bar"} 0 10 20 30 40 50 60 70 80 90 100`, + query: `increase(http_total[30s] smoothed)`, + }, + } + + start := time.Unix(0, 0) + end := time.Unix(100, 0) + step := 10 * time.Second + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + storage := promqltest.LoadedStorage(t, tc.load) + defer storage.Close() + + tcStart := start + if !tc.start.IsZero() { + tcStart = tc.start + } + tcEnd := end + if !tc.end.IsZero() { + tcEnd = tc.end + } + tcStep := step + if tc.step != 0 { + tcStep = tc.step + } + + for _, disableOptimizers := range []bool{false, true} { + t.Run(fmt.Sprintf("disableOptimizers=%v", disableOptimizers), func(t *testing.T) { + optimizers := logicalplan.AllOptimizers + if disableOptimizers { + optimizers = logicalplan.NoOptimizers + } + newEngine := engine.New(engine.Opts{ + EngineOpts: opts, + LogicalOptimizers: optimizers, + SelectorBatchSize: 1, + EnableExtendedRangeSelectors: true, + }) + + ctx := context.Background() + q1, err := newEngine.NewRangeQuery(ctx, storage, nil, tc.query, tcStart, tcEnd, tcStep) + testutil.Ok(t, err) + defer q1.Close() + newResult := q1.Exec(ctx) + + oldEngine := promql.NewEngine(opts) + q2, err := oldEngine.NewRangeQuery(ctx, storage, nil, tc.query, tcStart, tcEnd, tcStep) + testutil.Ok(t, err) + defer q2.Close() + oldResult := q2.Exec(ctx) + + testutil.WithGoCmp(comparer).Equals(t, oldResult, newResult, queryExplanation(q1)) + }) + } + }) + } +} + +func TestAnchoredSmoothedWhitelist(t *testing.T) { + parser.EnableExtendedRangeSelectors = true + + opts := promql.EngineOpts{ + Timeout: 1 * time.Hour, + MaxSamples: 1e10, + } + + load := `load 10s + metric 0 10 20 30 40 50` + + storage := promqltest.LoadedStorage(t, load) + defer storage.Close() + + newEngine := engine.New(engine.Opts{ + EngineOpts: opts, + EnableExtendedRangeSelectors: true, + }) + ctx := context.Background() + + // Functions not in the whitelist should error. + unsupportedAnchored := []string{ + `avg_over_time(metric[30s] anchored)`, + `sum_over_time(metric[30s] anchored)`, + `max_over_time(metric[30s] anchored)`, + `deriv(metric[30s] anchored)`, + } + for _, query := range unsupportedAnchored { + t.Run("unsupported_anchored/"+query, func(t *testing.T) { + q, err := newEngine.NewInstantQuery(ctx, storage, nil, query, time.Unix(50, 0)) + testutil.Ok(t, err) + defer q.Close() + res := q.Exec(ctx) + testutil.NotOk(t, res.Err) + }) + } + + unsupportedSmoothed := []string{ + `avg_over_time(metric[30s] smoothed)`, + `resets(metric[30s] smoothed)`, + `changes(metric[30s] smoothed)`, + } + for _, query := range unsupportedSmoothed { + t.Run("unsupported_smoothed/"+query, func(t *testing.T) { + q, err := newEngine.NewInstantQuery(ctx, storage, nil, query, time.Unix(50, 0)) + testutil.Ok(t, err) + defer q.Close() + res := q.Exec(ctx) + testutil.NotOk(t, res.Err) + }) + } + + // Functions in the whitelist should work. + supportedAnchored := []string{ + `rate(metric[30s] anchored)`, + `increase(metric[30s] anchored)`, + `delta(metric[30s] anchored)`, + `resets(metric[30s] anchored)`, + `changes(metric[30s] anchored)`, + } + for _, query := range supportedAnchored { + t.Run("supported_anchored/"+query, func(t *testing.T) { + q, err := newEngine.NewInstantQuery(ctx, storage, nil, query, time.Unix(50, 0)) + testutil.Ok(t, err) + q.Close() + }) + } + + supportedSmoothed := []string{ + `rate(metric[30s] smoothed)`, + `increase(metric[30s] smoothed)`, + `delta(metric[30s] smoothed)`, + } + for _, query := range supportedSmoothed { + t.Run("supported_smoothed/"+query, func(t *testing.T) { + q, err := newEngine.NewInstantQuery(ctx, storage, nil, query, time.Unix(50, 0)) + testutil.Ok(t, err) + q.Close() + }) + } +} + +// hintsSpy wraps real scanners and records the SelectHints passed to NewMatrixSelector. +type hintsSpy struct { + inner engstorage.Scanners + mu sync.Mutex + matrixHintsCap []promstorage.SelectHints +} + +func (s *hintsSpy) Close() error { return s.inner.Close() } + +func (s *hintsSpy) NewVectorSelector(ctx context.Context, opts *query.Options, hints promstorage.SelectHints, selector logicalplan.VectorSelector) (model.VectorOperator, error) { + return s.inner.NewVectorSelector(ctx, opts, hints, selector) +} + +func (s *hintsSpy) NewMatrixSelector(ctx context.Context, opts *query.Options, hints promstorage.SelectHints, selector logicalplan.MatrixSelector, call logicalplan.FunctionCall) (model.VectorOperator, error) { + s.mu.Lock() + s.matrixHintsCap = append(s.matrixHintsCap, hints) + s.mu.Unlock() + return s.inner.NewMatrixSelector(ctx, opts, hints, selector, call) +} + +func (s *hintsSpy) capturedHints() []promstorage.SelectHints { + s.mu.Lock() + defer s.mu.Unlock() + return s.matrixHintsCap +} + +func FuzzAnchoredSmoothedModifiers(f *testing.F) { + f.Add(int64(0), uint32(30), uint32(300), uint32(10), 0.0, 10.0, 5.0, 20.0, uint8(0)) + + f.Fuzz(func(t *testing.T, seed int64, startTS, endTS, intervalSeconds uint32, initialVal1, inc1, initialVal2, inc2 float64, funcIdx uint8) { + if math.IsNaN(initialVal1) || math.IsNaN(initialVal2) || math.IsNaN(inc1) || math.IsNaN(inc2) { + return + } + if math.IsInf(initialVal1, 0) || math.IsInf(initialVal2, 0) || math.IsInf(inc1, 0) || math.IsInf(inc2, 0) { + return + } + if inc1 < 0 || inc2 < 0 || intervalSeconds == 0 || endTS <= startTS { + return + } + // Cap values to avoid overflow. + if initialVal1 > 1e12 || initialVal2 > 1e12 || inc1 > 1e8 || inc2 > 1e8 { + return + } + // Ensure query range falls within data range (41 samples at 10s = 400s). + // This avoids edge cases at data boundaries where Prometheus' extendFloats + // synthesis for changes/resets differs from the thanos engine approach. + const maxDataTS uint32 = 400 + if endTS > maxDataTS { + endTS = maxDataTS + } + if startTS >= endTS { + return + } + + parser.EnableExtendedRangeSelectors = true + + type queryDef struct { + name string + query string + } + + // Queries covering all supported function+modifier combinations. + allQueries := []queryDef{ + {"rate_anchored", `rate(http_requests_total[30s] anchored)`}, + {"rate_smoothed", `rate(http_requests_total[30s] smoothed)`}, + {"increase_anchored", `increase(http_requests_total[30s] anchored)`}, + {"increase_smoothed", `increase(http_requests_total[30s] smoothed)`}, + {"delta_anchored", `delta(http_requests_total[30s] anchored)`}, + {"delta_smoothed", `delta(http_requests_total[30s] smoothed)`}, + {"resets_anchored", `resets(http_requests_total[30s] anchored)`}, + {"changes_anchored", `changes(http_requests_total[30s] anchored)`}, + {"rate_anchored_1m", `rate(http_requests_total[1m] anchored)`}, + {"rate_smoothed_1m", `rate(http_requests_total[1m] smoothed)`}, + {"sum_rate_anchored", `sum(rate(http_requests_total[30s] anchored))`}, + {"sum_rate_smoothed", `sum(rate(http_requests_total[30s] smoothed))`}, + } + + // Pick one query per fuzz iteration to keep it fast. + selected := allQueries[int(funcIdx)%len(allQueries)] + + load := fmt.Sprintf(`load 10s + http_requests_total{pod="nginx-1"} %.2f+%.2fx40 + http_requests_total{pod="nginx-2"} %.2f+%.2fx40`, initialVal1, inc1, initialVal2, inc2) + + opts := promql.EngineOpts{ + Timeout: 1 * time.Hour, + MaxSamples: 1e10, + EnableNegativeOffset: true, + EnableAtModifier: true, + } + + storage := promqltest.LoadedStorage(t, load) + defer storage.Close() + + start := time.Unix(int64(startTS), 0) + end := time.Unix(int64(endTS), 0) + interval := time.Duration(intervalSeconds) * time.Second + + newEngine := engine.New(engine.Opts{ + EngineOpts: opts, + EnableExtendedRangeSelectors: true, + }) + oldEngine := promql.NewEngine(opts) + + ctx := context.Background() + + q1, err := newEngine.NewRangeQuery(ctx, storage, nil, selected.query, start, end, interval) + if err != nil { + return // Skip unsupported queries. + } + defer q1.Close() + newResult := q1.Exec(ctx) + + q2, err := oldEngine.NewRangeQuery(ctx, storage, nil, selected.query, start, end, interval) + if err != nil { + t.Fatalf("prometheus engine error for %s: %v", selected.name, err) + } + defer q2.Close() + oldResult := q2.Exec(ctx) + + if !cmp.Equal(oldResult, newResult, comparer) { + t.Logf("load: %s", load) + t.Logf("query: %s (%s), start: %d, end: %d, interval: %v", selected.query, selected.name, start.UnixMilli(), end.UnixMilli(), interval) + t.Errorf("result mismatch.\nnew: %s\nold: %s", newResult.String(), oldResult.String()) + } + }) +} + +func TestAnchoredSmoothedSelectHints(t *testing.T) { + parser.EnableExtendedRangeSelectors = true + + load := `load 10s + metric 0 10 20 30 40 50 60 70 80 90 100` + storage := promqltest.LoadedStorage(t, load) + defer storage.Close() + + lookbackDelta := 5 * time.Minute // engine default + + cases := []struct { + name string + query string + expectedRange int64 // hints.Range in ms — should always be original selector range + }{ + { + name: "standard rate — Range equals selector range", + query: `rate(metric[30s])`, + expectedRange: 30000, + }, + { + name: "anchored rate — Range equals selector range, not widened", + query: `rate(metric[30s] anchored)`, + expectedRange: 30000, + }, + { + name: "smoothed rate — Range equals selector range, not widened", + query: `rate(metric[30s] smoothed)`, + expectedRange: 30000, + }, + { + name: "anchored increase — Range equals selector range", + query: `increase(metric[1m] anchored)`, + expectedRange: 60000, + }, + { + name: "smoothed delta — Range equals selector range", + query: `delta(metric[1m] smoothed)`, + expectedRange: 60000, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + opts := engine.Opts{ + EngineOpts: promql.EngineOpts{ + Timeout: 1 * time.Hour, + MaxSamples: 1e10, + }, + EnableExtendedRangeSelectors: true, + } + qOpts := &query.Options{ + Start: time.Unix(0, 0), + End: time.Unix(100, 0), + Step: 10 * time.Second, + LookbackDelta: lookbackDelta, + ExtLookbackDelta: 1 * time.Hour, + } + + // Parse and build the logical plan to create real scanners. + expr, err := parser.NewParser(tc.query).ParseExpr() + testutil.Ok(t, err) + + planOpts := logicalplan.PlanOptions{} + lplan, err := logicalplan.NewFromAST(expr, qOpts, planOpts) + testutil.Ok(t, err) + optimizedPlan, _ := lplan.Optimize(logicalplan.AllOptimizers) + + realScanners, err := promscan.NewPrometheusScanners(storage, qOpts, optimizedPlan) + testutil.Ok(t, err) + defer realScanners.Close() + + spy := &hintsSpy{inner: realScanners} + + eng := engine.NewWithScanners(opts, spy) + q, err := eng.NewRangeQuery(context.Background(), storage, nil, tc.query, time.Unix(0, 0), time.Unix(100, 0), 10*time.Second) + testutil.Ok(t, err) + defer q.Close() + + res := q.Exec(context.Background()) + testutil.Ok(t, res.Err) + + hints := spy.capturedHints() + if len(hints) == 0 { + t.Fatal("expected at least one NewMatrixSelector call, got none") + } + for i, h := range hints { + if h.Range != tc.expectedRange { + t.Errorf("hints[%d].Range = %d, want %d (original selector range)", i, h.Range, tc.expectedRange) + } + } + }) + } +} diff --git a/execution/execution.go b/execution/execution.go index 8df9332d8..91cfd8cbf 100644 --- a/execution/execution.go +++ b/execution/execution.go @@ -18,7 +18,10 @@ package execution import ( "context" + "maps" + "slices" "sort" + "strings" "time" "github.com/thanos-io/promql-engine/execution/aggregate" @@ -37,6 +40,7 @@ import ( "github.com/thanos-io/promql-engine/storage" "github.com/efficientgo/core/errors" + "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/promql/parser" promstorage "github.com/prometheus/prometheus/storage" @@ -88,6 +92,11 @@ func newOperator(ctx context.Context, expr logicalplan.Node, storage storage.Sca func newVectorSelector(ctx context.Context, e *logicalplan.VectorSelector, scanners storage.Scanners, opts *query.Options, hints promstorage.SelectHints) (model.VectorOperator, error) { start, end := getTimeRangesForVectorSelector(e, opts, 0) + if e.Smoothed { + // Smoothed instant vectors need samples after the evaluation time + // for interpolation, so extend the end by lookbackDelta. + end += opts.LookbackDelta.Milliseconds() + } hints.Start = start hints.End = end op, err := scanners.NewVectorSelector(ctx, opts, hints, *e) @@ -181,14 +190,44 @@ func newAbsentOverTimeOperator(ctx context.Context, call *logicalplan.FunctionCa func newRangeVectorFunction(ctx context.Context, e *logicalplan.FunctionCall, t *logicalplan.MatrixSelector, scanners storage.Scanners, opts *query.Options, hints promstorage.SelectHints) (model.VectorOperator, error) { // TODO(saswatamcode): Range vector result might need new operator // before it can be non-nested. https://github.com/thanos-io/promql-engine/issues/39 + vs := t.VectorSelector + + // Validate function whitelist for anchored/smoothed modifiers. + // Return a deferred error operator so the error surfaces at Exec time, + // matching Prometheus behavior expected by the test framework. + if vs.Anchored { + if _, ok := parse.AnchoredSafeFunctions[e.Func.Name]; !ok { + return newDeferredError(errors.Newf("anchored modifier can only be used with: %s - not with %s", strings.Join(slices.Sorted(maps.Keys(parse.AnchoredSafeFunctions)), ", "), e.Func.Name)), nil + } + } + if vs.Smoothed { + if _, ok := parse.SmoothedSafeFunctions[e.Func.Name]; !ok { + return newDeferredError(errors.Newf("smoothed modifier can only be used with: %s - not with %s", strings.Join(slices.Sorted(maps.Keys(parse.SmoothedSafeFunctions)), ", "), e.Func.Name)), nil + } + } + milliSecondRange := t.Range.Milliseconds() if parse.IsExtFunction(e.Func.Name) { milliSecondRange += opts.ExtLookbackDelta.Milliseconds() } - start, end := getTimeRangesForVectorSelector(t.VectorSelector, opts, milliSecondRange) + // For anchored/smoothed, widen the query window (Start/End) but keep + // hints.Range equal to the original selector range. Backends use Range for + // resolution selection and pushdown decisions and should see the true range. + queryRange := milliSecondRange + if vs.Anchored { + queryRange += opts.LookbackDelta.Milliseconds() + } + if vs.Smoothed { + queryRange += opts.LookbackDelta.Milliseconds() + } + + start, end := getTimeRangesForVectorSelector(t.VectorSelector, opts, queryRange) hints.Start = start hints.End = end + if vs.Smoothed { + hints.End += opts.LookbackDelta.Milliseconds() + } hints.Range = milliSecondRange op, err := scanners.NewMatrixSelector(ctx, opts, hints, *t, *e) if err != nil { @@ -409,6 +448,29 @@ func newDuplicateLabelCheck(ctx context.Context, e *logicalplan.CheckDuplicateLa return exchange.NewDuplicateLabelCheck(op, opts), nil } +// deferredError is an operator that returns an error on first Next() call, +// allowing validation errors to surface at evaluation time rather than +// query creation time, matching Prometheus behavior. +type deferredError struct { + model.VectorOperator + err error +} + +func newDeferredError(err error) model.VectorOperator { + return &deferredError{ + VectorOperator: noop.NewOperator(&query.Options{}), + err: err, + } +} + +func (d *deferredError) Next(context.Context, []model.StepVector) (int, error) { + return 0, d.err +} + +func (d *deferredError) Series(context.Context) ([]labels.Labels, error) { + return nil, d.err +} + // Copy from https://github.com/prometheus/prometheus/blob/v2.39.1/promql/engine.go#L791. func getTimeRangesForVectorSelector(n *logicalplan.VectorSelector, opts *query.Options, evalRange int64) (int64, int64) { start := opts.Start.UnixMilli() diff --git a/execution/noop/operator.go b/execution/noop/operator.go index 5b4f03dc3..8e9951270 100644 --- a/execution/noop/operator.go +++ b/execution/noop/operator.go @@ -26,6 +26,7 @@ func NewOperator(opts *query.Options) model.VectorOperator { false, // selectTimestamp 0, // shard 1, // numShards + false, // smoothed ) return &operator{VectorOperator: scanner} } diff --git a/execution/parse/functions.go b/execution/parse/functions.go index 0ba8173df..2721a9a79 100644 --- a/execution/parse/functions.go +++ b/execution/parse/functions.go @@ -34,6 +34,16 @@ func IsExtFunction(functionName string) bool { return ok } +// AnchoredSafeFunctions lists functions that support the anchored modifier. +var AnchoredSafeFunctions = map[string]struct{}{ + "resets": {}, "changes": {}, "rate": {}, "increase": {}, "delta": {}, +} + +// SmoothedSafeFunctions lists functions that support the smoothed modifier. +var SmoothedSafeFunctions = map[string]struct{}{ + "rate": {}, "increase": {}, "delta": {}, +} + func UnknownFunctionError(name string) error { msg := fmt.Sprintf("unknown function: %s", name) if _, ok := parser.Functions[name]; ok { diff --git a/execution/remote/operator.go b/execution/remote/operator.go index fa71bef07..61615b839 100644 --- a/execution/remote/operator.go +++ b/execution/remote/operator.go @@ -40,7 +40,7 @@ func NewExecution(query promql.Query, queryRangeStart, queryRangeEnd time.Time, opts: opts, queryRangeStart: queryRangeStart, queryRangeEnd: queryRangeEnd, - vectorSelector: promstorage.NewVectorSelector(storage, opts, 0, 0, false, 0, 1), + vectorSelector: promstorage.NewVectorSelector(storage, opts, 0, 0, false, 0, 1, false), } return telemetry.NewOperator(telemetry.NewTelemetry(oper, opts), oper) diff --git a/logicalplan/plan.go b/logicalplan/plan.go index c9cf3f8f5..4bde0d682 100644 --- a/logicalplan/plan.go +++ b/logicalplan/plan.go @@ -132,6 +132,14 @@ func getTimeRangesForSelector(qOpts *query.Options, n *parser.VectorSelector, pa start -= int64(qOpts.ExtLookbackDelta.Milliseconds()) } + if n.Anchored { + start -= qOpts.LookbackDelta.Milliseconds() + } + if n.Smoothed { + start -= qOpts.LookbackDelta.Milliseconds() + end += qOpts.LookbackDelta.Milliseconds() + } + return start, end } diff --git a/ringbuffer/functions.go b/ringbuffer/functions.go index bbf6c054a..b9624f867 100644 --- a/ringbuffer/functions.go +++ b/ringbuffer/functions.go @@ -27,6 +27,11 @@ type FunctionArgs struct { // quantile_over_time and predict_linear use one, so we only use one here. ScalarPoint float64 ScalarPoint2 float64 // only for double_exponential_smoothing (trend factor) + + // Anchored/Smoothed modifiers for extended range selectors. + // See https://github.com/prometheus/proposals/blob/main/proposals/0052-extended-range-selectors-semantics.md + Anchored bool + Smoothed bool } type FunctionCall func(f FunctionArgs) (float64, *histogram.FloatHistogram, bool, warnings.Warnings, error) @@ -326,13 +331,15 @@ var rangeVectorFuncs = map[string]FunctionCall{ if len(f.Samples) == 0 { return 0., nil, false, 0, nil } - return changes(f.Samples), nil, true, 0, nil + start := pickFirstSampleIndex(f) + return changes(f.Samples[start:]), nil, true, 0, nil }, "resets": func(f FunctionArgs) (float64, *histogram.FloatHistogram, bool, warnings.Warnings, error) { if len(f.Samples) == 0 { return 0., nil, false, 0, nil } - return resets(f.Samples), nil, true, 0, nil + start := pickFirstSampleIndex(f) + return resets(f.Samples[start:]), nil, true, 0, nil }, "deriv": func(f FunctionArgs) (float64, *histogram.FloatHistogram, bool, warnings.Warnings, error) { if len(f.Samples) < 2 { @@ -362,18 +369,27 @@ var rangeVectorFuncs = map[string]FunctionCall{ return v, fh, true, warn, nil }, "rate": func(f FunctionArgs) (float64, *histogram.FloatHistogram, bool, warnings.Warnings, error) { + if f.Anchored || f.Smoothed { + return extendedRangeRate(f.Samples, true, true, f.StepTime, f.SelectRange, f.Offset, f.Smoothed) + } if len(f.Samples) < 2 { return 0., nil, false, 0, nil } return extrapolatedRate(f.Samples, len(f.Samples), true, true, f.StepTime, f.SelectRange, f.Offset) }, "delta": func(f FunctionArgs) (float64, *histogram.FloatHistogram, bool, warnings.Warnings, error) { + if f.Anchored || f.Smoothed { + return extendedRangeRate(f.Samples, false, false, f.StepTime, f.SelectRange, f.Offset, f.Smoothed) + } if len(f.Samples) < 2 { return 0., nil, false, 0, nil } return extrapolatedRate(f.Samples, len(f.Samples), false, false, f.StepTime, f.SelectRange, f.Offset) }, "increase": func(f FunctionArgs) (float64, *histogram.FloatHistogram, bool, warnings.Warnings, error) { + if f.Anchored || f.Smoothed { + return extendedRangeRate(f.Samples, true, false, f.StepTime, f.SelectRange, f.Offset, f.Smoothed) + } if len(f.Samples) < 2 { return 0., nil, false, 0, nil } @@ -639,6 +655,135 @@ func extendedRate(samples []Sample, isCounter, isRate bool, stepTime int64, sele return resultValue, nil } +// extendedRangeRate implements the anchored and smoothed rate/increase/delta semantics +// from Prometheus proposal 0052. It computes boundary values at the range start and end, +// either by picking real sample values (anchored) or interpolating (smoothed), then +// computes the difference with counter-reset correction. +// +// This implementation mirrors prometheus/prometheus promql/functions.go extendedRate. +func extendedRangeRate(samples []Sample, isCounter, isRate bool, stepTime, selectRange, offset int64, smoothed bool) (float64, *histogram.FloatHistogram, bool, warnings.Warnings, error) { + if len(samples) == 0 { + return 0, nil, false, 0, nil + } + + if samples[0].V.H != nil { + return 0, nil, false, 0, errors.New("native histograms are not supported with anchored/smoothed modifiers") + } + + lastSampleIndex := len(samples) - 1 + rangeEnd := stepTime - offset + rangeStart := rangeEnd - selectRange + + // Find firstSampleIndex: the last sample at or before rangeStart, clamped to 0. + firstSampleIndex := max(sort.Search(lastSampleIndex, func(i int) bool { + return samples[i].T > rangeStart + })-1, 0) + + // For smoothed, extend lastSampleIndex to include the first sample at or after rangeEnd. + if smoothed { + lastSampleIndex = sort.Search(lastSampleIndex, func(i int) bool { + return samples[i].T >= rangeEnd + }) + if lastSampleIndex >= len(samples) { + lastSampleIndex = len(samples) - 1 + } + } + + // If the last sample is at or before rangeStart, there's nothing in the range. + if samples[lastSampleIndex].T <= rangeStart { + return 0, nil, false, 0, nil + } + + left := pickOrInterpolateLeft(samples, firstSampleIndex, rangeStart, smoothed, isCounter) + right := pickOrInterpolateRight(samples, lastSampleIndex, rangeEnd, smoothed, isCounter) + + resultValue := right - left + + if isCounter { + // Narrow to samples strictly within the range for counter-reset correction, + // since pickOrInterpolateLeft/Right already handle resets at boundaries. + corrFirst := firstSampleIndex + if samples[corrFirst].T <= rangeStart { + corrFirst++ + } + corrLast := lastSampleIndex + if corrLast >= 0 && samples[corrLast].T >= rangeEnd { + corrLast-- + } + + // Clamp to valid slice bounds. correctForCounterResets handles empty + // interior correctly — it still checks the right boundary value. + corrLast = max(corrLast, corrFirst-1) + resultValue += correctForCounterResets(left, right, samples[corrFirst:corrLast+1]) + } + + if isRate { + rangeDuration := float64(selectRange) / 1000.0 + if rangeDuration == 0 { + return 0, nil, false, 0, nil + } + resultValue /= rangeDuration + } + + return resultValue, nil, true, 0, nil +} + +// pickOrInterpolateLeft returns the value at the left boundary of the range. +// For anchored: uses the real sample value at/before rangeStart. +// For smoothed: interpolates between the samples bracketing rangeStart, with +// counter-reset awareness. +// If no sample exists before rangeStart, the first sample value is used. +func pickOrInterpolateLeft(samples []Sample, first int, rangeStart int64, smoothed, isCounter bool) float64 { + if smoothed && samples[first].T < rangeStart && first+1 < len(samples) { + return interpolateAt(samples[first], samples[first+1], rangeStart, isCounter) + } + return samples[first].V.F +} + +// pickOrInterpolateRight returns the value at the right boundary of the range. +// For anchored: uses the last sample at/before rangeEnd. +// For smoothed: interpolates between the samples bracketing rangeEnd, with +// counter-reset awareness. +// If no sample exists after rangeEnd, the last sample value is used. +func pickOrInterpolateRight(samples []Sample, last int, rangeEnd int64, smoothed, isCounter bool) float64 { + if smoothed && last > 0 && samples[last].T > rangeEnd { + return interpolateAt(samples[last-1], samples[last], rangeEnd, isCounter) + } + return samples[last].V.F +} + +// interpolateAt performs linear interpolation between two samples at the given timestamp. +// If isCounter is true and there is a counter reset (y2 < y1), it models the +// counter as starting from 0 post-reset by setting y1 to 0. +// +// This matches Prometheus v0.310.0's interpolate function in promql/functions.go. +func interpolateAt(left, right Sample, timestamp int64, isCounter bool) float64 { + y1 := left.V.F + y2 := right.V.F + if isCounter && y2 < y1 { + y1 = 0 + } + return y1 + (y2-y1)*float64(timestamp-left.T)/float64(right.T-left.T) +} + +// correctForCounterResets calculates the correction for counter resets across +// interior samples and the right boundary value. This matches Prometheus' +// correctForCounterResets in promql/functions.go. +func correctForCounterResets(left, right float64, points []Sample) float64 { + var correction float64 + prev := left + for _, p := range points { + if p.V.F < prev { + correction += prev + } + prev = p.V.F + } + if right < prev { + correction += prev + } + return correction +} + // histogramRate is a helper function for extrapolatedRate. It requires // points[0] to be a histogram. It returns nil if any other Point in points is // not a histogram. @@ -965,6 +1110,19 @@ func stdvarOverTime(points []Sample) (float64, bool, warnings.Warnings) { return ((aux + cAux) / count), true, warn } +// pickFirstSampleIndex returns the index of the last sample at or before the +// range start for anchored selectors. For non-anchored, returns 0. +// This matches Prometheus's pickFirstSampleIndex in promql/functions.go. +func pickFirstSampleIndex(f FunctionArgs) int { + if !f.Anchored || len(f.Samples) == 0 { + return 0 + } + rangeStart := f.StepTime - f.Offset - f.SelectRange + return max(sort.Search(len(f.Samples)-1, func(i int) bool { + return f.Samples[i].T > rangeStart + })-1, 0) +} + func changes(points []Sample) float64 { count := 0. diff --git a/ringbuffer/generic.go b/ringbuffer/generic.go index 5357f8237..3bf59db7c 100644 --- a/ringbuffer/generic.go +++ b/ringbuffer/generic.go @@ -46,6 +46,9 @@ type GenericRingBuffer struct { selectRange int64 extLookback int64 call FunctionCall + + anchored bool + smoothed bool } func New(ctx context.Context, size int, selectRange, offset int64, call FunctionCall) *GenericRingBuffer { @@ -63,6 +66,20 @@ func NewWithExtLookback(ctx context.Context, size int, selectRange, offset, extL } } +// NewAnchored creates a ring buffer for anchored range selectors. +func NewAnchored(ctx context.Context, size int, selectRange, offset, extLookback int64, call FunctionCall) *GenericRingBuffer { + buf := NewWithExtLookback(ctx, size, selectRange, offset, extLookback, call) + buf.anchored = true + return buf +} + +// NewSmoothed creates a ring buffer for smoothed range selectors. +func NewSmoothed(ctx context.Context, size int, selectRange, offset, extLookback int64, call FunctionCall) *GenericRingBuffer { + buf := NewWithExtLookback(ctx, size, selectRange, offset, extLookback, call) + buf.smoothed = true + return buf +} + func (r *GenericRingBuffer) SampleCount() int { c := 0 for _, s := range r.items { @@ -123,6 +140,12 @@ func (r *GenericRingBuffer) Reset(mint int64, evalt int64) { } if r.extLookback > 0 && drop > 0 && r.items[drop-1].T >= mint-r.extLookback { drop-- + // For anchored/smoothed, keep one additional lookback sample so that + // functions like changes/resets can compare against a sample before + // the range boundary (matching Prometheus pickFirstSampleIndex). + if (r.anchored || r.smoothed) && drop > 0 && r.items[drop-1].T >= mint-r.extLookback { + drop-- + } } keep := len(r.items) - drop @@ -142,6 +165,8 @@ func (r *GenericRingBuffer) Eval(ctx context.Context, scalarArg float64, scalarA ScalarPoint: scalarArg, ScalarPoint2: scalarArg2, // only for double_exponential_smoothing MetricAppearedTs: metricAppearedTs, + Anchored: r.anchored, + Smoothed: r.smoothed, }) } diff --git a/storage/prometheus/matrix_selector.go b/storage/prometheus/matrix_selector.go index 1b876c243..c8d959b67 100644 --- a/storage/prometheus/matrix_selector.go +++ b/storage/prometheus/matrix_selector.go @@ -54,13 +54,17 @@ type matrixSelector struct { fhReader *histogram.FloatHistogram opts *query.Options - numSteps int - mint int64 - maxt int64 - step int64 - selectRange int64 - offset int64 - isExtFunction bool + numSteps int + mint int64 + maxt int64 + step int64 + selectRange int64 + offset int64 + isExtFunction bool + isAnchored bool + isSmoothed bool + needsExtLookback bool + lookbackDelta int64 currentStep int64 currentSeries int64 @@ -90,11 +94,13 @@ func NewMatrixSelector( selectRange, offset time.Duration, batchSize int64, shard, numShard int, + anchored, smoothed bool, ) (model.VectorOperator, error) { call, err := ringbuffer.NewRangeVectorFunc(functionName) if err != nil { return nil, err } + isExt := parse.IsExtFunction(functionName) m := &matrixSelector{ storage: selector, call: call, @@ -108,7 +114,12 @@ func NewMatrixSelector( mint: opts.Start.UnixMilli(), maxt: opts.End.UnixMilli(), step: opts.Step.Milliseconds(), - isExtFunction: parse.IsExtFunction(functionName), + isExtFunction: isExt, + isAnchored: anchored, + isSmoothed: smoothed, + + needsExtLookback: isExt || anchored || smoothed, + lookbackDelta: opts.LookbackDelta.Milliseconds(), selectRange: selectRange.Milliseconds(), offset: offset.Milliseconds(), @@ -193,8 +204,11 @@ func (o *matrixSelector) Next(ctx context.Context, buf []model.StepVector) (int, for currStep := 0; currStep < n && seriesTs <= o.maxt; currStep++ { maxt := seriesTs - o.offset mint := maxt - o.selectRange + if o.isSmoothed { + maxt += o.lookbackDelta + } - if err := scanner.selectPoints(mint, maxt, seriesTs, o.fhReader, o.isExtFunction); err != nil { + if err := scanner.selectPoints(mint, maxt, seriesTs, o.fhReader, o.needsExtLookback, o.isAnchored || o.isSmoothed); err != nil { return 0, err } // TODO(saswatamcode): Handle multi-arg functions for matrixSelectors. @@ -317,6 +331,12 @@ func (o *matrixSelector) shouldCheckSampleLimit(firstSeries int64) bool { } func (o *matrixSelector) newBuffer(ctx context.Context) ringbuffer.Buffer { + if o.isAnchored { + return ringbuffer.NewAnchored(ctx, 8, o.selectRange, o.offset, o.lookbackDelta-1, o.call) + } + if o.isSmoothed { + return ringbuffer.NewSmoothed(ctx, 8, o.selectRange, o.offset, o.lookbackDelta-1, o.call) + } if ringbuffer.UseStreamingRingBuffers(*o.opts, o.selectRange) { switch o.functionName { case "rate": @@ -374,7 +394,16 @@ func (m *matrixScanner) selectPoints( mint, maxt, evalt int64, fh *histogram.FloatHistogram, isExtFunction bool, + recoverLastSample bool, ) error { + // For anchored/smoothed selectors: push lastSample back into the buffer + // before Reset so the ext lookback retention logic can decide whether to + // keep it as the boundary sample. lastSample always has the largest T, + // so append order is preserved. + if recoverLastSample && m.lastSample.T != math.MinInt64 && m.lastSample.T <= maxt { + m.buffer.Push(m.lastSample.T, m.lastSample.V) + m.lastSample.T = math.MinInt64 + } m.buffer.Reset(mint, evalt) if m.lastSample.T > maxt { return nil @@ -383,7 +412,6 @@ func (m *matrixScanner) selectPoints( if bufMaxt := m.buffer.MaxT() + 1; bufMaxt > mint { mint = bufMaxt } - mint = max(mint, m.buffer.MaxT()+1) if m.lastSample.T > mint { m.buffer.Push(m.lastSample.T, m.lastSample.V) m.lastSample.T = math.MinInt64 @@ -426,10 +454,15 @@ func (m *matrixScanner) selectPoints( m.lastSample.T, m.lastSample.V.F, m.lastSample.V.H = t, v, nil return nil } - if isExtFunction { + if isExtFunction || recoverLastSample { if t > mint || !appendedPointBeforeMint { m.buffer.Push(t, ringbuffer.Value{F: v}) appendedPointBeforeMint = true + } else if recoverLastSample { + // For anchored/smoothed, keep two samples at/before mint so + // that functions like changes/resets can compare against + // the sample preceding the range boundary. + m.buffer.Push(t, ringbuffer.Value{F: v}) } else { m.buffer.ReadIntoLast(func(s *ringbuffer.Sample) { s.T, s.V.F, s.V.H = t, v, nil diff --git a/storage/prometheus/scanners.go b/storage/prometheus/scanners.go index bac38431c..e457f67d8 100644 --- a/storage/prometheus/scanners.go +++ b/storage/prometheus/scanners.go @@ -75,6 +75,7 @@ func (p Scanners) NewVectorSelector( logicalNode.SelectTimestamp, i, opts.DecodingConcurrency, + logicalNode.Smoothed, ), 2, opts) operators = append(operators, operator) } @@ -149,6 +150,8 @@ func (p Scanners) NewMatrixSelector( vs.BatchSize, i, opts.DecodingConcurrency, + vs.Anchored, + vs.Smoothed, ) if err != nil { return nil, err diff --git a/storage/prometheus/vector_selector.go b/storage/prometheus/vector_selector.go index 915c06c3b..7dfbccf64 100644 --- a/storage/prometheus/vector_selector.go +++ b/storage/prometheus/vector_selector.go @@ -52,6 +52,7 @@ type vectorSelector struct { numShards int selectTimestamp bool + smoothed bool opts *query.Options lastTrackedSamples int @@ -65,6 +66,7 @@ func NewVectorSelector( batchSize int64, selectTimestamp bool, shard, numShards int, + smoothed bool, ) model.VectorOperator { o := &vectorSelector{ storage: selector, @@ -82,6 +84,7 @@ func NewVectorSelector( numShards: numShards, selectTimestamp: selectTimestamp, + smoothed: smoothed, opts: queryOpts, } @@ -156,7 +159,18 @@ func (o *vectorSelector) Next(ctx context.Context, buf []model.StepVector) (int, ) for currStep := 0; currStep < n && seriesTs <= o.maxt; currStep++ { currStepSamples = 0 - t, v, h, ok, err := selectPoint(series.samples, seriesTs, o.lookbackDelta, o.offset) + var ( + t int64 + v float64 + h *histogram.FloatHistogram + ok bool + err error + ) + if o.smoothed { + t, v, ok, err = selectPointSmoothed(series.samples, seriesTs, o.lookbackDelta, o.offset) + } else { + t, v, h, ok, err = selectPoint(series.samples, seriesTs, o.lookbackDelta, o.offset) + } if err != nil { return 0, err } @@ -254,6 +268,54 @@ func (o *vectorSelector) shouldCheckSampleLimit(fromSeries int64) bool { return isEndOfBatch || isLastSeries } +// selectPointSmoothed returns an interpolated value at the given timestamp by +// looking at samples in [ts-lookbackDelta, ts+lookbackDelta]. When a sample +// exists exactly at ts it is returned as-is. When samples exist on both sides, +// linear interpolation is performed. When only a previous sample exists, its +// value is carried forward. +func selectPointSmoothed(it *storage.MemoizedSeriesIterator, ts, lookbackDelta, offset int64) (int64, float64, bool, error) { + refTime := ts - offset + + valueType := it.Seek(refTime) + switch valueType { + case chunkenc.ValNone: + if it.Err() != nil { + return 0, 0, false, it.Err() + } + case chunkenc.ValFloatHistogram, chunkenc.ValHistogram: + // Histograms not supported for smoothed instant vectors. + return 0, 0, false, nil + case chunkenc.ValFloat: + seekT, seekV := it.At() + if value.IsStaleNaN(seekV) { + return 0, 0, false, nil + } + if seekT == refTime { + // Exact match. + return ts, seekV, true, nil + } + // seekT > refTime: we have the "next" sample. Check for a previous one. + if seekT <= refTime+lookbackDelta { + prevT, prevV, _, prevOK := it.PeekPrev() + if prevOK && !value.IsStaleNaN(prevV) && prevT > refTime-lookbackDelta { + // Interpolate between prev and next. + v := prevV + (seekV-prevV)*float64(refTime-prevT)/float64(seekT-prevT) + return ts, v, true, nil + } + } + default: + panic(errors.Newf("unknown value type %v", valueType)) + } + + // No sample at or after refTime (or next sample is beyond lookback). + // Fall back to previous sample (carry forward). + prevT, prevV, _, prevOK := it.PeekPrev() + if !prevOK || prevT <= refTime-lookbackDelta || value.IsStaleNaN(prevV) { + return 0, 0, false, nil + } + return ts, prevV, true, nil +} + // TODO(fpetkovski): Add max samples limit. func selectPoint(it *storage.MemoizedSeriesIterator, ts, lookbackDelta, offset int64) (int64, float64, *histogram.FloatHistogram, bool, error) { refTime := ts - offset