diff --git a/engine/engine.go b/engine/engine.go index 3a4c47b7d..f0929e42c 100644 --- a/engine/engine.go +++ b/engine/engine.go @@ -14,12 +14,12 @@ import ( "time" "github.com/thanos-io/promql-engine/execution" + "github.com/thanos-io/promql-engine/execution/execopts" "github.com/thanos-io/promql-engine/execution/model" "github.com/thanos-io/promql-engine/execution/parse" "github.com/thanos-io/promql-engine/execution/telemetry" "github.com/thanos-io/promql-engine/extlabels" "github.com/thanos-io/promql-engine/logicalplan" - "github.com/thanos-io/promql-engine/query" engstorage "github.com/thanos-io/promql-engine/storage" promstorage "github.com/thanos-io/promql-engine/storage/prometheus" "github.com/thanos-io/promql-engine/warnings" @@ -83,33 +83,31 @@ type Opts struct { DisableDuplicateLabelChecks bool } -// QueryOpts implements promql.QueryOpts but allows to override more engine default options. -type QueryOpts struct { - // These values are used to implement promql.QueryOpts, they have weird "Param" suffix because - // they are accessed by methods of the same name. - LookbackDeltaParam time.Duration - EnablePerStepStatsParam bool +// QueryOptions allows overriding engine default options on a per-query basis. +type QueryOptions struct { + // LookbackDelta overrides the engine's default lookback delta for this query. + LookbackDelta time.Duration - // DecodingConcurrency can be used to override the DecodingConcurrency engine setting. + // EnablePerStepStats enables per-step statistics for this query. + EnablePerStepStats bool + + // DecodingConcurrency overrides the engine's default decoding concurrency for this query. DecodingConcurrency int - // SelectorBatchSize can be used to override the SelectorBatchSize engine setting. + // SelectorBatchSize overrides the engine's default selector batch size for this query. SelectorBatchSize int64 - // LogicalOptimizers can be used to override the LogicalOptimizers engine setting. + // LogicalOptimizers overrides the engine's default logical optimizers for this query. LogicalOptimizers []logicalplan.Optimizer } -func (opts QueryOpts) LookbackDelta() time.Duration { return opts.LookbackDeltaParam } -func (opts QueryOpts) EnablePerStepStats() bool { return opts.EnablePerStepStatsParam } - -func fromPromQLOpts(opts promql.QueryOpts) *QueryOpts { +func fromPromQLOpts(opts promql.QueryOpts) *QueryOptions { if opts == nil { - return &QueryOpts{} + return &QueryOptions{} } - return &QueryOpts{ - LookbackDeltaParam: opts.LookbackDelta(), - EnablePerStepStatsParam: opts.EnablePerStepStats(), + return &QueryOptions{ + LookbackDelta: opts.LookbackDelta(), + EnablePerStepStats: opts.EnablePerStepStats(), } } @@ -229,7 +227,7 @@ type Engine struct { noStepSubqueryIntervalFn func(time.Duration) time.Duration } -func (e *Engine) MakeInstantQuery(ctx context.Context, q storage.Queryable, opts *QueryOpts, qs string, ts time.Time) (promql.Query, error) { +func (e *Engine) MakeInstantQuery(ctx context.Context, q storage.Queryable, opts *QueryOptions, qs string, ts time.Time) (promql.Query, error) { idx, err := e.activeQueryTracker.Insert(ctx, qs) if err != nil { return nil, err @@ -245,15 +243,15 @@ func (e *Engine) MakeInstantQuery(ctx context.Context, q storage.Queryable, opts // the presentation layer and not when computing the results. resultSort := newResultSort(expr) - qOpts := e.makeQueryOpts(ts, ts, 0, opts) - if qOpts.StepsBatch > 64 { + execOpts := e.makeExecutionOpts(ts, ts, 0, opts) + if execOpts.StepsBatch > 64 { return nil, ErrStepsBatchTooLarge } planOpts := logicalplan.PlanOptions{ DisableDuplicateLabelCheck: e.disableDuplicateLabelChecks, } - initialPlan, err := logicalplan.NewFromAST(expr, qOpts, planOpts) + initialPlan, err := logicalplan.NewFromAST(expr, execOpts, planOpts) if err != nil { return nil, errors.Wrap(err, "creating plan") } @@ -262,18 +260,18 @@ func (e *Engine) MakeInstantQuery(ctx context.Context, q storage.Queryable, opts ctx = warnings.NewContext(ctx) defer func() { warns.Merge(warnings.FromContext(ctx)) }() - scanners, err := e.storageScanners(q, qOpts, optimizedPlan) + scanners, err := e.storageScanners(q, execOpts, optimizedPlan) if err != nil { return nil, errors.Wrap(err, "creating storage scanners") } - exec, err := execution.New(ctx, optimizedPlan.Root(), scanners, qOpts) + exec, err := execution.New(ctx, optimizedPlan.Root(), scanners, execOpts) if err != nil { return nil, err } e.metrics.totalQueries.Inc() return &compatibilityQuery{ - Query: &Query{exec: exec, opts: qOpts}, + Query: &Query{exec: exec, opts: execOpts}, engine: e, plan: optimizedPlan, warns: warns, @@ -284,38 +282,38 @@ func (e *Engine) MakeInstantQuery(ctx context.Context, q storage.Queryable, opts }, nil } -func (e *Engine) MakeInstantQueryFromPlan(ctx context.Context, q storage.Queryable, opts *QueryOpts, root logicalplan.Node, ts time.Time) (promql.Query, error) { +func (e *Engine) MakeInstantQueryFromPlan(ctx context.Context, q storage.Queryable, opts *QueryOptions, root logicalplan.Node, ts time.Time) (promql.Query, error) { idx, err := e.activeQueryTracker.Insert(ctx, root.String()) if err != nil { return nil, err } defer e.activeQueryTracker.Delete(idx) - qOpts := e.makeQueryOpts(ts, ts, 0, opts) - if qOpts.StepsBatch > 64 { + execOpts := e.makeExecutionOpts(ts, ts, 0, opts) + if execOpts.StepsBatch > 64 { return nil, ErrStepsBatchTooLarge } planOpts := logicalplan.PlanOptions{ DisableDuplicateLabelCheck: e.disableDuplicateLabelChecks, } - lplan, warns := logicalplan.New(root, qOpts, planOpts).Optimize(e.getLogicalOptimizers(opts)) + lplan, warns := logicalplan.New(root, execOpts, planOpts).Optimize(e.getLogicalOptimizers(opts)) ctx = warnings.NewContext(ctx) defer func() { warns.Merge(warnings.FromContext(ctx)) }() - scnrs, err := e.storageScanners(q, qOpts, lplan) + scnrs, err := e.storageScanners(q, execOpts, lplan) if err != nil { return nil, errors.Wrap(err, "creating storage scanners") } - exec, err := execution.New(ctx, lplan.Root(), scnrs, qOpts) + exec, err := execution.New(ctx, lplan.Root(), scnrs, execOpts) if err != nil { return nil, err } e.metrics.totalQueries.Inc() return &compatibilityQuery{ - Query: &Query{exec: exec, opts: qOpts}, + Query: &Query{exec: exec, opts: execOpts}, engine: e, plan: lplan, warns: warns, @@ -327,7 +325,7 @@ func (e *Engine) MakeInstantQueryFromPlan(ctx context.Context, q storage.Queryab }, nil } -func (e *Engine) MakeRangeQuery(ctx context.Context, q storage.Queryable, opts *QueryOpts, qs string, start, end time.Time, step time.Duration) (promql.Query, error) { +func (e *Engine) MakeRangeQuery(ctx context.Context, q storage.Queryable, opts *QueryOptions, qs string, start, end time.Time, step time.Duration) (promql.Query, error) { idx, err := e.activeQueryTracker.Insert(ctx, qs) if err != nil { return nil, err @@ -343,15 +341,15 @@ func (e *Engine) MakeRangeQuery(ctx context.Context, q storage.Queryable, opts * if expr.Type() != parser.ValueTypeVector && expr.Type() != parser.ValueTypeScalar { return nil, errors.Newf("invalid expression type %q for range query, must be Scalar or instant Vector", parser.DocumentedType(expr.Type())) } - qOpts := e.makeQueryOpts(start, end, step, opts) - if qOpts.StepsBatch > 64 { + execOpts := e.makeExecutionOpts(start, end, step, opts) + if execOpts.StepsBatch > 64 { return nil, ErrStepsBatchTooLarge } planOpts := logicalplan.PlanOptions{ DisableDuplicateLabelCheck: e.disableDuplicateLabelChecks, } - initialPlan, err := logicalplan.NewFromAST(expr, qOpts, planOpts) + initialPlan, err := logicalplan.NewFromAST(expr, execOpts, planOpts) if err != nil { return nil, errors.Wrap(err, "creating plan") } @@ -360,19 +358,19 @@ func (e *Engine) MakeRangeQuery(ctx context.Context, q storage.Queryable, opts * ctx = warnings.NewContext(ctx) defer func() { warns.Merge(warnings.FromContext(ctx)) }() - scnrs, err := e.storageScanners(q, qOpts, optimizedPlan) + scnrs, err := e.storageScanners(q, execOpts, optimizedPlan) if err != nil { return nil, errors.Wrap(err, "creating storage scanners") } - exec, err := execution.New(ctx, optimizedPlan.Root(), scnrs, qOpts) + exec, err := execution.New(ctx, optimizedPlan.Root(), scnrs, execOpts) if err != nil { return nil, err } e.metrics.totalQueries.Inc() return &compatibilityQuery{ - Query: &Query{exec: exec, opts: qOpts}, + Query: &Query{exec: exec, opts: execOpts}, engine: e, plan: optimizedPlan, warns: warns, @@ -381,37 +379,37 @@ func (e *Engine) MakeRangeQuery(ctx context.Context, q storage.Queryable, opts * }, nil } -func (e *Engine) MakeRangeQueryFromPlan(ctx context.Context, q storage.Queryable, opts *QueryOpts, root logicalplan.Node, start, end time.Time, step time.Duration) (promql.Query, error) { +func (e *Engine) MakeRangeQueryFromPlan(ctx context.Context, q storage.Queryable, opts *QueryOptions, root logicalplan.Node, start, end time.Time, step time.Duration) (promql.Query, error) { idx, err := e.activeQueryTracker.Insert(ctx, root.String()) if err != nil { return nil, err } defer e.activeQueryTracker.Delete(idx) - qOpts := e.makeQueryOpts(start, end, step, opts) - if qOpts.StepsBatch > 64 { + execOpts := e.makeExecutionOpts(start, end, step, opts) + if execOpts.StepsBatch > 64 { return nil, ErrStepsBatchTooLarge } planOpts := logicalplan.PlanOptions{ DisableDuplicateLabelCheck: e.disableDuplicateLabelChecks, } - lplan, warns := logicalplan.New(root, qOpts, planOpts).Optimize(e.getLogicalOptimizers(opts)) + lplan, warns := logicalplan.New(root, execOpts, planOpts).Optimize(e.getLogicalOptimizers(opts)) - scnrs, err := e.storageScanners(q, qOpts, lplan) + scnrs, err := e.storageScanners(q, execOpts, lplan) if err != nil { return nil, errors.Wrap(err, "creating storage scanners") } ctx = warnings.NewContext(ctx) defer func() { warns.Merge(warnings.FromContext(ctx)) }() - exec, err := execution.New(ctx, lplan.Root(), scnrs, qOpts) + exec, err := execution.New(ctx, lplan.Root(), scnrs, execOpts) if err != nil { return nil, err } e.metrics.totalQueries.Inc() return &compatibilityQuery{ - Query: &Query{exec: exec, opts: qOpts}, + Query: &Query{exec: exec, opts: execOpts}, engine: e, plan: lplan, warns: warns, @@ -432,8 +430,8 @@ func (e *Engine) NewRangeQuery(ctx context.Context, q storage.Queryable, opts pr return e.MakeRangeQuery(ctx, q, fromPromQLOpts(opts), qs, start, end, step) } -func (e *Engine) makeQueryOpts(start time.Time, end time.Time, step time.Duration, opts *QueryOpts) *query.Options { - res := &query.Options{ +func (e *Engine) makeExecutionOpts(start time.Time, end time.Time, step time.Duration, opts *QueryOptions) *execopts.Options { + res := &execopts.Options{ Start: start, End: end, Step: step, @@ -449,11 +447,11 @@ func (e *Engine) makeQueryOpts(start time.Time, end time.Time, step time.Duratio return res } - if opts.LookbackDelta() > 0 { - res.LookbackDelta = opts.LookbackDelta() + if opts.LookbackDelta > 0 { + res.LookbackDelta = opts.LookbackDelta } - if opts.EnablePerStepStats() { - res.EnablePerStepStats = opts.EnablePerStepStats() + if opts.EnablePerStepStats { + res.EnablePerStepStats = opts.EnablePerStepStats } if opts.DecodingConcurrency != 0 { @@ -463,30 +461,30 @@ func (e *Engine) makeQueryOpts(start time.Time, end time.Time, step time.Duratio return res } -func (e *Engine) getLogicalOptimizers(opts *QueryOpts) []logicalplan.Optimizer { +func (e *Engine) getLogicalOptimizers(opts *QueryOptions) []logicalplan.Optimizer { var optimizers []logicalplan.Optimizer - if len(opts.LogicalOptimizers) != 0 { + if opts != nil && len(opts.LogicalOptimizers) != 0 { optimizers = slices.Clone(opts.LogicalOptimizers) } else { optimizers = slices.Clone(e.logicalOptimizers) } selectorBatchSize := e.selectorBatchSize - if opts.SelectorBatchSize != 0 { + if opts != nil && opts.SelectorBatchSize != 0 { selectorBatchSize = opts.SelectorBatchSize } return append(optimizers, logicalplan.SelectorBatchSize{Size: selectorBatchSize}) } -func (e *Engine) storageScanners(queryable storage.Queryable, qOpts *query.Options, lplan logicalplan.Plan) (engstorage.Scanners, error) { +func (e *Engine) storageScanners(queryable storage.Queryable, execOpts *execopts.Options, lplan logicalplan.Plan) (engstorage.Scanners, error) { if e.scanners == nil { - return promstorage.NewPrometheusScanners(queryable, qOpts, lplan) + return promstorage.NewPrometheusScanners(queryable, execOpts, lplan) } return e.scanners, nil } type Query struct { exec model.VectorOperator - opts *query.Options + opts *execopts.Options } // Explain returns human-readable explanation of the created executor. diff --git a/engine/engine_test.go b/engine/engine_test.go index 6d4b03b5c..a760372a0 100644 --- a/engine/engine_test.go +++ b/engine/engine_test.go @@ -22,10 +22,10 @@ import ( "time" "github.com/thanos-io/promql-engine/engine" + "github.com/thanos-io/promql-engine/execution/execopts" "github.com/thanos-io/promql-engine/execution/model" "github.com/thanos-io/promql-engine/extlabels" "github.com/thanos-io/promql-engine/logicalplan" - "github.com/thanos-io/promql-engine/query" "github.com/thanos-io/promql-engine/storage/prometheus" "github.com/thanos-io/promql-engine/warnings" @@ -2482,7 +2482,7 @@ type scannersWithWarns struct { promScanners *prometheus.Scanners } -func newScannersWithWarns(warn error, qOpts *query.Options, lplan logicalplan.Plan) (*scannersWithWarns, error) { +func newScannersWithWarns(warn error, qOpts *execopts.Options, lplan logicalplan.Plan) (*scannersWithWarns, error) { scanners, err := prometheus.NewPrometheusScanners(&storage.MockQueryable{ MockQuerier: storage.NoopQuerier(), }, qOpts, lplan) @@ -2497,12 +2497,12 @@ func newScannersWithWarns(warn error, qOpts *query.Options, lplan logicalplan.Pl func (s *scannersWithWarns) Close() error { return nil } -func (s scannersWithWarns) NewVectorSelector(ctx context.Context, opts *query.Options, hints storage.SelectHints, selector logicalplan.VectorSelector) (model.VectorOperator, error) { +func (s scannersWithWarns) NewVectorSelector(ctx context.Context, opts *execopts.Options, hints storage.SelectHints, selector logicalplan.VectorSelector) (model.VectorOperator, error) { warnings.AddToContext(s.warn, ctx) return s.promScanners.NewVectorSelector(ctx, opts, hints, selector) } -func (s scannersWithWarns) NewMatrixSelector(ctx context.Context, opts *query.Options, hints storage.SelectHints, selector logicalplan.MatrixSelector, call logicalplan.FunctionCall) (model.VectorOperator, error) { +func (s scannersWithWarns) NewMatrixSelector(ctx context.Context, opts *execopts.Options, hints storage.SelectHints, selector logicalplan.MatrixSelector, call logicalplan.FunctionCall) (model.VectorOperator, error) { warnings.AddToContext(s.warn, ctx) return s.promScanners.NewMatrixSelector(ctx, opts, hints, selector, call) } @@ -2513,7 +2513,7 @@ func TestWarningsPlanCreation(t *testing.T) { expectedWarn = errors.New("test warning") ) - scnrs, err := newScannersWithWarns(expectedWarn, &query.Options{}, nil) + scnrs, err := newScannersWithWarns(expectedWarn, &execopts.Options{}, nil) testutil.Ok(t, err) newEngine := engine.NewWithScanners(opts, scnrs) q1, err := newEngine.NewRangeQuery(context.Background(), nil, nil, "http_requests_total", time.UnixMilli(0), time.UnixMilli(600), 30*time.Second) diff --git a/engine/projection_test.go b/engine/projection_test.go index 3d8fc27c8..e48ed093b 100644 --- a/engine/projection_test.go +++ b/engine/projection_test.go @@ -242,7 +242,7 @@ func TestProjectionWithFuzz(t *testing.T) { Queryable: storage, } - normalQuery, err := normalEngine.NewInstantQuery(ctx, storage, &engine.QueryOpts{}, query, queryTime) + normalQuery, err := normalEngine.NewInstantQuery(ctx, storage, nil, query, queryTime) testutil.Ok(t, err) defer normalQuery.Close() normalResult := normalQuery.Exec(ctx) @@ -252,7 +252,7 @@ func TestProjectionWithFuzz(t *testing.T) { } testutil.Ok(t, normalResult.Err, "query: %s", query) - projectionQuery, err := projectionEngine.MakeInstantQuery(ctx, projectionStorage, &engine.QueryOpts{}, query, queryTime) + projectionQuery, err := projectionEngine.MakeInstantQuery(ctx, projectionStorage, nil, query, queryTime) testutil.Ok(t, err) defer projectionQuery.Close() diff --git a/engine/propagate_selector_test.go b/engine/propagate_selector_test.go index d1fd33b12..814dce11a 100644 --- a/engine/propagate_selector_test.go +++ b/engine/propagate_selector_test.go @@ -120,7 +120,7 @@ func TestPropagateMatchers(t *testing.T) { } t.Run(fmt.Sprintf("Query_%d", i), func(t *testing.T) { - normalQuery, err := normalEngine.NewInstantQuery(ctx, storage, &engine.QueryOpts{}, query, queryTime) + normalQuery, err := normalEngine.NewInstantQuery(ctx, storage, nil, query, queryTime) testutil.Ok(t, err) defer normalQuery.Close() normalResult := normalQuery.Exec(ctx) @@ -130,7 +130,7 @@ func TestPropagateMatchers(t *testing.T) { } testutil.Ok(t, normalResult.Err, "query: %s", query) - optimizedQuery, err := optimizedEngine.MakeInstantQuery(ctx, storage, &engine.QueryOpts{}, query, queryTime) + optimizedQuery, err := optimizedEngine.MakeInstantQuery(ctx, storage, nil, query, queryTime) testutil.Ok(t, err) defer optimizedQuery.Close() diff --git a/engine/user_defined_test.go b/engine/user_defined_test.go index 23be26a0c..7e6c3c0ff 100644 --- a/engine/user_defined_test.go +++ b/engine/user_defined_test.go @@ -10,9 +10,9 @@ import ( "time" "github.com/thanos-io/promql-engine/engine" + "github.com/thanos-io/promql-engine/execution/execopts" "github.com/thanos-io/promql-engine/execution/model" "github.com/thanos-io/promql-engine/logicalplan" - "github.com/thanos-io/promql-engine/query" "github.com/efficientgo/core/testutil" "github.com/prometheus/prometheus/model/labels" @@ -60,7 +60,7 @@ load 30s type injectVectorSelector struct{} -func (i injectVectorSelector) Optimize(plan logicalplan.Node, _ *query.Options) (logicalplan.Node, annotations.Annotations) { +func (i injectVectorSelector) Optimize(plan logicalplan.Node, _ *execopts.Options) (logicalplan.Node, annotations.Annotations) { logicalplan.TraverseBottomUp(nil, &plan, func(_, current *logicalplan.Node) bool { switch t := (*current).(type) { case *logicalplan.VectorSelector: @@ -77,7 +77,7 @@ type logicalVectorSelector struct { *logicalplan.VectorSelector } -func (c logicalVectorSelector) MakeExecutionOperator(_ context.Context, vectors *model.VectorPool, opts *query.Options, _ storage.SelectHints) (model.VectorOperator, error) { +func (c logicalVectorSelector) MakeExecutionOperator(_ context.Context, vectors *model.VectorPool, opts *execopts.Options, _ storage.SelectHints) (model.VectorOperator, error) { oper := &vectorSelectorOperator{ stepsBatch: opts.StepsBatch, vectors: vectors, diff --git a/execution/aggregate/count_values.go b/execution/aggregate/count_values.go index 7d527952c..08fe57ac5 100644 --- a/execution/aggregate/count_values.go +++ b/execution/aggregate/count_values.go @@ -10,9 +10,9 @@ import ( "strconv" "sync" + "github.com/thanos-io/promql-engine/execution/execopts" "github.com/thanos-io/promql-engine/execution/model" "github.com/thanos-io/promql-engine/execution/telemetry" - "github.com/thanos-io/promql-engine/query" "github.com/efficientgo/core/errors" prommodel "github.com/prometheus/common/model" @@ -37,7 +37,7 @@ type countValuesOperator struct { once sync.Once } -func NewCountValues(pool *model.VectorPool, next model.VectorOperator, param string, by bool, grouping []string, opts *query.Options) model.VectorOperator { +func NewCountValues(pool *model.VectorPool, next model.VectorOperator, param string, by bool, grouping []string, opts *execopts.Options) model.VectorOperator { // Grouping labels need to be sorted in order for metric hashing to work. // https://github.com/prometheus/prometheus/blob/8ed39fdab1ead382a354e45ded999eb3610f8d5f/model/labels/labels.go#L162-L181 slices.Sort(grouping) diff --git a/execution/aggregate/hashaggregate.go b/execution/aggregate/hashaggregate.go index 5d2477bef..27fd5a1ab 100644 --- a/execution/aggregate/hashaggregate.go +++ b/execution/aggregate/hashaggregate.go @@ -9,10 +9,10 @@ import ( "math" "sync" + "github.com/thanos-io/promql-engine/execution/execopts" "github.com/thanos-io/promql-engine/execution/model" "github.com/thanos-io/promql-engine/execution/parse" "github.com/thanos-io/promql-engine/execution/telemetry" - "github.com/thanos-io/promql-engine/query" "github.com/thanos-io/promql-engine/warnings" "github.com/efficientgo/core/errors" @@ -49,7 +49,7 @@ func NewHashAggregate( aggregation parser.ItemType, by bool, labels []string, - opts *query.Options, + opts *execopts.Options, ) (model.VectorOperator, error) { // Verify that the aggregation is supported. if _, err := newScalarAccumulator(aggregation); err != nil { diff --git a/execution/aggregate/khashaggregate.go b/execution/aggregate/khashaggregate.go index 4a2ea55d7..f9b7f7daf 100644 --- a/execution/aggregate/khashaggregate.go +++ b/execution/aggregate/khashaggregate.go @@ -11,9 +11,9 @@ import ( "sort" "sync" + "github.com/thanos-io/promql-engine/execution/execopts" "github.com/thanos-io/promql-engine/execution/model" "github.com/thanos-io/promql-engine/execution/telemetry" - "github.com/thanos-io/promql-engine/query" "github.com/thanos-io/promql-engine/warnings" "github.com/efficientgo/core/errors" @@ -51,7 +51,7 @@ func NewKHashAggregate( aggregation parser.ItemType, by bool, labels []string, - opts *query.Options, + opts *execopts.Options, ) (model.VectorOperator, error) { var compare func(float64, float64) bool diff --git a/execution/binary/scalar.go b/execution/binary/scalar.go index 65c63cd14..50c2dc324 100644 --- a/execution/binary/scalar.go +++ b/execution/binary/scalar.go @@ -8,10 +8,10 @@ import ( "fmt" "sync" + "github.com/thanos-io/promql-engine/execution/execopts" "github.com/thanos-io/promql-engine/execution/model" "github.com/thanos-io/promql-engine/execution/telemetry" "github.com/thanos-io/promql-engine/extlabels" - "github.com/thanos-io/promql-engine/query" "github.com/thanos-io/promql-engine/warnings" "github.com/prometheus/prometheus/model/histogram" @@ -44,7 +44,7 @@ func NewScalar( rhsType parser.ValueType, opType parser.ItemType, returnBool bool, - opts *query.Options, + opts *execopts.Options, ) (model.VectorOperator, error) { op := &scalarOperator{ pool: pool, diff --git a/execution/binary/vector.go b/execution/binary/vector.go index 7d1682701..c85336ad3 100644 --- a/execution/binary/vector.go +++ b/execution/binary/vector.go @@ -8,10 +8,10 @@ import ( "fmt" "sync" + "github.com/thanos-io/promql-engine/execution/execopts" "github.com/thanos-io/promql-engine/execution/model" "github.com/thanos-io/promql-engine/execution/telemetry" "github.com/thanos-io/promql-engine/extlabels" - "github.com/thanos-io/promql-engine/query" "github.com/thanos-io/promql-engine/warnings" "github.com/cespare/xxhash/v2" @@ -63,7 +63,7 @@ func NewVectorOperator( matching *parser.VectorMatching, opType parser.ItemType, returnBool bool, - opts *query.Options, + opts *execopts.Options, ) (model.VectorOperator, error) { op := &vectorOperator{ pool: pool, diff --git a/execution/exchange/coalesce.go b/execution/exchange/coalesce.go index c0a2cadd3..09af517fe 100644 --- a/execution/exchange/coalesce.go +++ b/execution/exchange/coalesce.go @@ -9,9 +9,9 @@ import ( "sync" "sync/atomic" + "github.com/thanos-io/promql-engine/execution/execopts" "github.com/thanos-io/promql-engine/execution/model" "github.com/thanos-io/promql-engine/execution/telemetry" - "github.com/thanos-io/promql-engine/query" "github.com/efficientgo/core/errors" "github.com/prometheus/prometheus/model/labels" @@ -48,7 +48,7 @@ type coalesce struct { sampleOffsets []uint64 } -func NewCoalesce(pool *model.VectorPool, opts *query.Options, batchSize int64, operators ...model.VectorOperator) model.VectorOperator { +func NewCoalesce(pool *model.VectorPool, opts *execopts.Options, batchSize int64, operators ...model.VectorOperator) model.VectorOperator { if len(operators) == 1 { return operators[0] } diff --git a/execution/exchange/concurrent.go b/execution/exchange/concurrent.go index 78ea488cb..2e4b68a76 100644 --- a/execution/exchange/concurrent.go +++ b/execution/exchange/concurrent.go @@ -8,9 +8,9 @@ import ( "fmt" "sync" + "github.com/thanos-io/promql-engine/execution/execopts" "github.com/thanos-io/promql-engine/execution/model" "github.com/thanos-io/promql-engine/execution/telemetry" - "github.com/thanos-io/promql-engine/query" "github.com/prometheus/prometheus/model/labels" ) @@ -27,7 +27,7 @@ type concurrencyOperator struct { bufferSize int } -func NewConcurrent(next model.VectorOperator, bufferSize int, opts *query.Options) model.VectorOperator { +func NewConcurrent(next model.VectorOperator, bufferSize int, opts *execopts.Options) model.VectorOperator { oper := &concurrencyOperator{ next: next, buffer: make(chan maybeStepVector, bufferSize), diff --git a/execution/exchange/dedup.go b/execution/exchange/dedup.go index 95c8d0e31..e8c7408cb 100644 --- a/execution/exchange/dedup.go +++ b/execution/exchange/dedup.go @@ -7,9 +7,9 @@ import ( "context" "sync" + "github.com/thanos-io/promql-engine/execution/execopts" "github.com/thanos-io/promql-engine/execution/model" "github.com/thanos-io/promql-engine/execution/telemetry" - "github.com/thanos-io/promql-engine/query" "github.com/cespare/xxhash/v2" "github.com/prometheus/prometheus/model/histogram" @@ -41,7 +41,7 @@ type dedupOperator struct { dedupCache dedupCache } -func NewDedupOperator(pool *model.VectorPool, next model.VectorOperator, opts *query.Options) model.VectorOperator { +func NewDedupOperator(pool *model.VectorPool, next model.VectorOperator, opts *execopts.Options) model.VectorOperator { oper := &dedupOperator{ next: next, pool: pool, diff --git a/execution/exchange/duplicate_label.go b/execution/exchange/duplicate_label.go index c6599067f..095a43389 100644 --- a/execution/exchange/duplicate_label.go +++ b/execution/exchange/duplicate_label.go @@ -7,10 +7,10 @@ import ( "context" "sync" + "github.com/thanos-io/promql-engine/execution/execopts" "github.com/thanos-io/promql-engine/execution/model" "github.com/thanos-io/promql-engine/execution/telemetry" "github.com/thanos-io/promql-engine/extlabels" - "github.com/thanos-io/promql-engine/query" "github.com/prometheus/prometheus/model/labels" ) @@ -25,7 +25,7 @@ type duplicateLabelCheckOperator struct { c []uint64 } -func NewDuplicateLabelCheck(next model.VectorOperator, opts *query.Options) model.VectorOperator { +func NewDuplicateLabelCheck(next model.VectorOperator, opts *execopts.Options) model.VectorOperator { oper := &duplicateLabelCheckOperator{ next: next, } diff --git a/query/options.go b/execution/execopts/options.go similarity index 99% rename from query/options.go rename to execution/execopts/options.go index 6bd01f665..ab0e44aad 100644 --- a/query/options.go +++ b/execution/execopts/options.go @@ -1,7 +1,7 @@ // Copyright (c) The Thanos Community Authors. // Licensed under the Apache License 2.0. -package query +package execopts import ( "time" diff --git a/execution/execution.go b/execution/execution.go index 7f170fda3..361d69fb2 100644 --- a/execution/execution.go +++ b/execution/execution.go @@ -24,6 +24,7 @@ import ( "github.com/thanos-io/promql-engine/execution/aggregate" "github.com/thanos-io/promql-engine/execution/binary" "github.com/thanos-io/promql-engine/execution/exchange" + "github.com/thanos-io/promql-engine/execution/execopts" "github.com/thanos-io/promql-engine/execution/function" "github.com/thanos-io/promql-engine/execution/model" "github.com/thanos-io/promql-engine/execution/noop" @@ -33,7 +34,6 @@ import ( "github.com/thanos-io/promql-engine/execution/step_invariant" "github.com/thanos-io/promql-engine/execution/unary" "github.com/thanos-io/promql-engine/logicalplan" - "github.com/thanos-io/promql-engine/query" "github.com/thanos-io/promql-engine/storage" "github.com/efficientgo/core/errors" @@ -44,7 +44,7 @@ import ( // New creates new physical query execution for a given query expression which represents logical plan. // TODO(bwplotka): Add definition (could be parameters for each execution operator) we can optimize - it would represent physical plan. -func New(ctx context.Context, expr logicalplan.Node, storage storage.Scanners, opts *query.Options) (model.VectorOperator, error) { +func New(ctx context.Context, expr logicalplan.Node, storage storage.Scanners, opts *execopts.Options) (model.VectorOperator, error) { hints := promstorage.SelectHints{ Start: opts.Start.UnixMilli(), End: opts.End.UnixMilli(), @@ -53,7 +53,7 @@ func New(ctx context.Context, expr logicalplan.Node, storage storage.Scanners, o return newOperator(ctx, expr, storage, opts, hints) } -func newOperator(ctx context.Context, expr logicalplan.Node, storage storage.Scanners, opts *query.Options, hints promstorage.SelectHints) (model.VectorOperator, error) { +func newOperator(ctx context.Context, expr logicalplan.Node, storage storage.Scanners, opts *execopts.Options, hints promstorage.SelectHints) (model.VectorOperator, error) { switch e := expr.(type) { case *logicalplan.NumberLiteral: return scan.NewNumberLiteralSelector(model.NewVectorPool(opts.StepsBatch), opts, e.Val), nil @@ -86,14 +86,14 @@ func newOperator(ctx context.Context, expr logicalplan.Node, storage storage.Sca } } -func newVectorSelector(ctx context.Context, e *logicalplan.VectorSelector, scanners storage.Scanners, opts *query.Options, hints promstorage.SelectHints) (model.VectorOperator, error) { +func newVectorSelector(ctx context.Context, e *logicalplan.VectorSelector, scanners storage.Scanners, opts *execopts.Options, hints promstorage.SelectHints) (model.VectorOperator, error) { start, end := getTimeRangesForVectorSelector(e, opts, 0) hints.Start = start hints.End = end return scanners.NewVectorSelector(ctx, opts, hints, *e) } -func newCall(ctx context.Context, e *logicalplan.FunctionCall, scanners storage.Scanners, opts *query.Options, hints promstorage.SelectHints) (model.VectorOperator, error) { +func newCall(ctx context.Context, e *logicalplan.FunctionCall, scanners storage.Scanners, opts *execopts.Options, hints promstorage.SelectHints) (model.VectorOperator, error) { hints.Func = e.Func.Name hints.Grouping = nil hints.By = false @@ -136,7 +136,7 @@ func newCall(ctx context.Context, e *logicalplan.FunctionCall, scanners storage. return newInstantVectorFunction(ctx, e, scanners, opts, hints) } -func newAbsentOverTimeOperator(ctx context.Context, call *logicalplan.FunctionCall, scanners storage.Scanners, opts *query.Options, hints promstorage.SelectHints) (model.VectorOperator, error) { +func newAbsentOverTimeOperator(ctx context.Context, call *logicalplan.FunctionCall, scanners storage.Scanners, opts *execopts.Options, hints promstorage.SelectHints) (model.VectorOperator, error) { switch arg := call.Args[0].(type) { case *logicalplan.Subquery: matrixCall := &logicalplan.FunctionCall{ @@ -174,7 +174,7 @@ func newAbsentOverTimeOperator(ctx context.Context, call *logicalplan.FunctionCa } } -func newRangeVectorFunction(ctx context.Context, e *logicalplan.FunctionCall, t *logicalplan.MatrixSelector, scanners storage.Scanners, opts *query.Options, hints promstorage.SelectHints) (model.VectorOperator, error) { +func newRangeVectorFunction(ctx context.Context, e *logicalplan.FunctionCall, t *logicalplan.MatrixSelector, scanners storage.Scanners, opts *execopts.Options, hints promstorage.SelectHints) (model.VectorOperator, error) { // TODO(saswatamcode): Range vector result might need new operator // before it can be non-nested. https://github.com/thanos-io/promql-engine/issues/39 milliSecondRange := t.Range.Milliseconds() @@ -189,13 +189,13 @@ func newRangeVectorFunction(ctx context.Context, e *logicalplan.FunctionCall, t return scanners.NewMatrixSelector(ctx, opts, hints, *t, *e) } -func newSubqueryFunction(ctx context.Context, e *logicalplan.FunctionCall, t *logicalplan.Subquery, storage storage.Scanners, opts *query.Options, hints promstorage.SelectHints) (model.VectorOperator, error) { +func newSubqueryFunction(ctx context.Context, e *logicalplan.FunctionCall, t *logicalplan.Subquery, storage storage.Scanners, opts *execopts.Options, hints promstorage.SelectHints) (model.VectorOperator, error) { // TODO: We dont implement ext functions if parse.IsExtFunction(e.Func.Name) { return nil, parse.ErrNotImplemented } - nOpts := query.NestedOptionsForSubquery(opts, t.Step, t.Range, t.Offset) + nOpts := execopts.NestedOptionsForSubquery(opts, t.Step, t.Range, t.Offset) hints.Start = nOpts.Start.UnixMilli() hints.End = nOpts.End.UnixMilli() @@ -242,7 +242,7 @@ func newSubqueryFunction(ctx context.Context, e *logicalplan.FunctionCall, t *lo return scan.NewSubqueryOperator(model.NewVectorPool(opts.StepsBatch), inner, scalarArg, scalarArg2, &outerOpts, e, t) } -func newInstantVectorFunction(ctx context.Context, e *logicalplan.FunctionCall, storage storage.Scanners, opts *query.Options, hints promstorage.SelectHints) (model.VectorOperator, error) { +func newInstantVectorFunction(ctx context.Context, e *logicalplan.FunctionCall, storage storage.Scanners, opts *execopts.Options, hints promstorage.SelectHints) (model.VectorOperator, error) { nextOperators := make([]model.VectorOperator, 0, len(e.Args)) for i := range e.Args { // Strings don't need an operator @@ -259,7 +259,7 @@ func newInstantVectorFunction(ctx context.Context, e *logicalplan.FunctionCall, return function.NewFunctionOperator(e, nextOperators, opts.StepsBatch, opts) } -func newAggregateExpression(ctx context.Context, e *logicalplan.Aggregation, scanners storage.Scanners, opts *query.Options, hints promstorage.SelectHints) (model.VectorOperator, error) { +func newAggregateExpression(ctx context.Context, e *logicalplan.Aggregation, scanners storage.Scanners, opts *execopts.Options, hints promstorage.SelectHints) (model.VectorOperator, error) { hints.Func = e.Op.String() hints.Grouping = e.Grouping hints.By = !e.Without @@ -294,14 +294,14 @@ func newAggregateExpression(ctx context.Context, e *logicalplan.Aggregation, sca return exchange.NewConcurrent(next, 2, opts), nil } -func newBinaryExpression(ctx context.Context, e *logicalplan.Binary, scanners storage.Scanners, opts *query.Options, hints promstorage.SelectHints) (model.VectorOperator, error) { +func newBinaryExpression(ctx context.Context, e *logicalplan.Binary, scanners storage.Scanners, opts *execopts.Options, hints promstorage.SelectHints) (model.VectorOperator, error) { if e.LHS.ReturnType() == parser.ValueTypeScalar || e.RHS.ReturnType() == parser.ValueTypeScalar { return newScalarBinaryOperator(ctx, e, scanners, opts, hints) } return newVectorBinaryOperator(ctx, e, scanners, opts, hints) } -func newVectorBinaryOperator(ctx context.Context, e *logicalplan.Binary, storage storage.Scanners, opts *query.Options, hints promstorage.SelectHints) (model.VectorOperator, error) { +func newVectorBinaryOperator(ctx context.Context, e *logicalplan.Binary, storage storage.Scanners, opts *execopts.Options, hints promstorage.SelectHints) (model.VectorOperator, error) { leftOperator, err := newOperator(ctx, e.LHS, storage, opts, hints) if err != nil { return nil, err @@ -313,7 +313,7 @@ func newVectorBinaryOperator(ctx context.Context, e *logicalplan.Binary, storage return binary.NewVectorOperator(model.NewVectorPool(opts.StepsBatch), leftOperator, rightOperator, e.VectorMatching, e.Op, e.ReturnBool, opts) } -func newScalarBinaryOperator(ctx context.Context, e *logicalplan.Binary, storage storage.Scanners, opts *query.Options, hints promstorage.SelectHints) (model.VectorOperator, error) { +func newScalarBinaryOperator(ctx context.Context, e *logicalplan.Binary, storage storage.Scanners, opts *execopts.Options, hints promstorage.SelectHints) (model.VectorOperator, error) { lhs, err := newOperator(ctx, e.LHS, storage, opts, hints) if err != nil { return nil, err @@ -326,7 +326,7 @@ func newScalarBinaryOperator(ctx context.Context, e *logicalplan.Binary, storage return binary.NewScalar(model.NewVectorPoolWithSize(opts.StepsBatch, 1), lhs, rhs, e.LHS.ReturnType(), e.RHS.ReturnType(), e.Op, e.ReturnBool, opts) } -func newUnaryExpression(ctx context.Context, e *logicalplan.Unary, scanners storage.Scanners, opts *query.Options, hints promstorage.SelectHints) (model.VectorOperator, error) { +func newUnaryExpression(ctx context.Context, e *logicalplan.Unary, scanners storage.Scanners, opts *execopts.Options, hints promstorage.SelectHints) (model.VectorOperator, error) { next, err := newOperator(ctx, e.Expr, scanners, opts, hints) if err != nil { return nil, err @@ -343,7 +343,7 @@ func newUnaryExpression(ctx context.Context, e *logicalplan.Unary, scanners stor } } -func newStepInvariantExpression(ctx context.Context, e *logicalplan.StepInvariantExpr, scanners storage.Scanners, opts *query.Options, hints promstorage.SelectHints) (model.VectorOperator, error) { +func newStepInvariantExpression(ctx context.Context, e *logicalplan.StepInvariantExpr, scanners storage.Scanners, opts *execopts.Options, hints promstorage.SelectHints) (model.VectorOperator, error) { switch t := e.Expr.(type) { case *logicalplan.NumberLiteral: return scan.NewNumberLiteralSelector(model.NewVectorPool(opts.StepsBatch), opts, t.Val), nil @@ -355,7 +355,7 @@ func newStepInvariantExpression(ctx context.Context, e *logicalplan.StepInvarian return step_invariant.NewStepInvariantOperator(model.NewVectorPoolWithSize(opts.StepsBatch, 1), next, e.Expr, opts) } -func newDeduplication(ctx context.Context, e logicalplan.Deduplicate, scanners storage.Scanners, opts *query.Options, hints promstorage.SelectHints) (model.VectorOperator, error) { +func newDeduplication(ctx context.Context, e logicalplan.Deduplicate, scanners storage.Scanners, opts *execopts.Options, hints promstorage.SelectHints) (model.VectorOperator, error) { // The Deduplicate operator will deduplicate samples using a last-sample-wins strategy. // Sorting engines by MaxT ensures that samples produced due to // staleness will be overwritten and corrected by samples coming from @@ -377,7 +377,7 @@ func newDeduplication(ctx context.Context, e logicalplan.Deduplicate, scanners s return exchange.NewConcurrent(dedup, 2, opts), nil } -func newRemoteExecution(ctx context.Context, e logicalplan.RemoteExecution, opts *query.Options, hints promstorage.SelectHints) (model.VectorOperator, error) { +func newRemoteExecution(ctx context.Context, e logicalplan.RemoteExecution, opts *execopts.Options, hints promstorage.SelectHints) (model.VectorOperator, error) { // Create a new remote query scoped to the calculated start time. qry, err := e.Engine.NewRangeQuery(ctx, promql.NewPrometheusQueryOpts(false, opts.LookbackDelta), e.Query, e.QueryRangeStart, e.QueryRangeEnd, opts.Step) if err != nil { @@ -393,7 +393,7 @@ func newRemoteExecution(ctx context.Context, e logicalplan.RemoteExecution, opts return exchange.NewConcurrent(remoteExec, 2, opts), nil } -func newDuplicateLabelCheck(ctx context.Context, e *logicalplan.CheckDuplicateLabels, storage storage.Scanners, opts *query.Options, hints promstorage.SelectHints) (model.VectorOperator, error) { +func newDuplicateLabelCheck(ctx context.Context, e *logicalplan.CheckDuplicateLabels, storage storage.Scanners, opts *execopts.Options, hints promstorage.SelectHints) (model.VectorOperator, error) { op, err := newOperator(ctx, e.Expr, storage, opts, hints) if err != nil { return nil, err @@ -402,7 +402,7 @@ func newDuplicateLabelCheck(ctx context.Context, e *logicalplan.CheckDuplicateLa } // Copy from https://github.com/prometheus/prometheus/blob/v2.39.1/promql/engine.go#L791. -func getTimeRangesForVectorSelector(n *logicalplan.VectorSelector, opts *query.Options, evalRange int64) (int64, int64) { +func getTimeRangesForVectorSelector(n *logicalplan.VectorSelector, opts *execopts.Options, evalRange int64) (int64, int64) { start := opts.Start.UnixMilli() end := opts.End.UnixMilli() if n.Timestamp != nil { diff --git a/execution/function/absent.go b/execution/function/absent.go index e605ab62c..d4575faa1 100644 --- a/execution/function/absent.go +++ b/execution/function/absent.go @@ -7,10 +7,10 @@ import ( "context" "sync" + "github.com/thanos-io/promql-engine/execution/execopts" "github.com/thanos-io/promql-engine/execution/model" "github.com/thanos-io/promql-engine/execution/telemetry" "github.com/thanos-io/promql-engine/logicalplan" - "github.com/thanos-io/promql-engine/query" "github.com/prometheus/prometheus/model/labels" ) @@ -27,7 +27,7 @@ func newAbsentOperator( funcExpr *logicalplan.FunctionCall, pool *model.VectorPool, next model.VectorOperator, - opts *query.Options, + opts *execopts.Options, ) model.VectorOperator { oper := &absentOperator{ funcExpr: funcExpr, diff --git a/execution/function/histogram.go b/execution/function/histogram.go index 29d5179e9..1cc79e3ac 100644 --- a/execution/function/histogram.go +++ b/execution/function/histogram.go @@ -10,11 +10,11 @@ import ( "strconv" "sync" + "github.com/thanos-io/promql-engine/execution/execopts" "github.com/thanos-io/promql-engine/execution/model" "github.com/thanos-io/promql-engine/execution/telemetry" "github.com/thanos-io/promql-engine/extlabels" "github.com/thanos-io/promql-engine/logicalplan" - "github.com/thanos-io/promql-engine/query" "github.com/thanos-io/promql-engine/warnings" "github.com/cespare/xxhash/v2" @@ -62,7 +62,7 @@ func newHistogramOperator( pool *model.VectorPool, call *logicalplan.FunctionCall, nextOps []model.VectorOperator, - opts *query.Options, + opts *execopts.Options, ) model.VectorOperator { o := &histogramOperator{ pool: pool, diff --git a/execution/function/operator.go b/execution/function/operator.go index a8a6355be..8faf28afa 100644 --- a/execution/function/operator.go +++ b/execution/function/operator.go @@ -9,19 +9,19 @@ import ( "math" "sync" + "github.com/thanos-io/promql-engine/execution/execopts" "github.com/thanos-io/promql-engine/execution/model" "github.com/thanos-io/promql-engine/execution/parse" "github.com/thanos-io/promql-engine/execution/telemetry" "github.com/thanos-io/promql-engine/extlabels" "github.com/thanos-io/promql-engine/logicalplan" - "github.com/thanos-io/promql-engine/query" "github.com/efficientgo/core/errors" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/promql/parser" ) -func NewFunctionOperator(funcExpr *logicalplan.FunctionCall, nextOps []model.VectorOperator, stepsBatch int, opts *query.Options) (model.VectorOperator, error) { +func NewFunctionOperator(funcExpr *logicalplan.FunctionCall, nextOps []model.VectorOperator, stepsBatch int, opts *execopts.Options) (model.VectorOperator, error) { // Some functions need to be handled in special operators switch funcExpr.Func.Name { case "scalar": @@ -44,7 +44,7 @@ func NewFunctionOperator(funcExpr *logicalplan.FunctionCall, nextOps []model.Vec return newInstantVectorFunctionOperator(funcExpr, nextOps, stepsBatch, opts) } -func newNoArgsFunctionOperator(funcExpr *logicalplan.FunctionCall, stepsBatch int, opts *query.Options) (model.VectorOperator, error) { +func newNoArgsFunctionOperator(funcExpr *logicalplan.FunctionCall, stepsBatch int, opts *execopts.Options) (model.VectorOperator, error) { call, ok := noArgFuncs[funcExpr.Func.Name] if !ok { return nil, parse.UnknownFunctionError(funcExpr.Func.Name) @@ -92,7 +92,7 @@ type functionOperator struct { scalarPoints [][]float64 } -func newInstantVectorFunctionOperator(funcExpr *logicalplan.FunctionCall, nextOps []model.VectorOperator, stepsBatch int, opts *query.Options) (model.VectorOperator, error) { +func newInstantVectorFunctionOperator(funcExpr *logicalplan.FunctionCall, nextOps []model.VectorOperator, stepsBatch int, opts *execopts.Options) (model.VectorOperator, error) { call, ok := instantVectorFuncs[funcExpr.Func.Name] if !ok { return nil, parse.UnknownFunctionError(funcExpr.Func.Name) diff --git a/execution/function/relabel.go b/execution/function/relabel.go index 383b89c72..5b83bfc70 100644 --- a/execution/function/relabel.go +++ b/execution/function/relabel.go @@ -9,10 +9,10 @@ import ( "strings" "sync" + "github.com/thanos-io/promql-engine/execution/execopts" "github.com/thanos-io/promql-engine/execution/model" "github.com/thanos-io/promql-engine/execution/telemetry" "github.com/thanos-io/promql-engine/logicalplan" - "github.com/thanos-io/promql-engine/query" "github.com/efficientgo/core/errors" prommodel "github.com/prometheus/common/model" @@ -29,7 +29,7 @@ type relabelOperator struct { func newRelabelOperator( next model.VectorOperator, funcExpr *logicalplan.FunctionCall, - opts *query.Options, + opts *execopts.Options, ) model.VectorOperator { oper := &relabelOperator{ next: next, diff --git a/execution/function/scalar.go b/execution/function/scalar.go index 97e56be9a..2ead3cf92 100644 --- a/execution/function/scalar.go +++ b/execution/function/scalar.go @@ -7,9 +7,9 @@ import ( "context" "math" + "github.com/thanos-io/promql-engine/execution/execopts" "github.com/thanos-io/promql-engine/execution/model" "github.com/thanos-io/promql-engine/execution/telemetry" - "github.com/thanos-io/promql-engine/query" "github.com/prometheus/prometheus/model/labels" ) @@ -19,7 +19,7 @@ type scalarOperator struct { next model.VectorOperator } -func newScalarOperator(pool *model.VectorPool, next model.VectorOperator, opts *query.Options) model.VectorOperator { +func newScalarOperator(pool *model.VectorPool, next model.VectorOperator, opts *execopts.Options) model.VectorOperator { oper := &scalarOperator{ pool: pool, next: next, diff --git a/execution/function/timestamp.go b/execution/function/timestamp.go index 997ea1c8a..87fc5b612 100644 --- a/execution/function/timestamp.go +++ b/execution/function/timestamp.go @@ -7,10 +7,10 @@ import ( "context" "sync" + "github.com/thanos-io/promql-engine/execution/execopts" "github.com/thanos-io/promql-engine/execution/model" "github.com/thanos-io/promql-engine/execution/telemetry" "github.com/thanos-io/promql-engine/extlabels" - "github.com/thanos-io/promql-engine/query" "github.com/prometheus/prometheus/model/labels" ) @@ -22,7 +22,7 @@ type timestampOperator struct { once sync.Once } -func newTimestampOperator(next model.VectorOperator, opts *query.Options) model.VectorOperator { +func newTimestampOperator(next model.VectorOperator, opts *execopts.Options) model.VectorOperator { oper := ×tampOperator{ next: next, } diff --git a/execution/noop/operator.go b/execution/noop/operator.go index 8f10b4b78..f2fd48e0a 100644 --- a/execution/noop/operator.go +++ b/execution/noop/operator.go @@ -6,8 +6,8 @@ package noop import ( "context" + "github.com/thanos-io/promql-engine/execution/execopts" "github.com/thanos-io/promql-engine/execution/model" - "github.com/thanos-io/promql-engine/query" "github.com/thanos-io/promql-engine/storage/prometheus" "github.com/prometheus/prometheus/model/labels" @@ -17,7 +17,7 @@ type operator struct { model.VectorOperator } -func NewOperator(opts *query.Options) model.VectorOperator { +func NewOperator(opts *execopts.Options) model.VectorOperator { scanner := prometheus.NewVectorSelector( model.NewVectorPool(0), noopSelector{}, diff --git a/execution/remote/operator.go b/execution/remote/operator.go index 7e869c3b7..563efe4f1 100644 --- a/execution/remote/operator.go +++ b/execution/remote/operator.go @@ -9,9 +9,9 @@ import ( "sync" "time" + "github.com/thanos-io/promql-engine/execution/execopts" "github.com/thanos-io/promql-engine/execution/model" "github.com/thanos-io/promql-engine/execution/telemetry" - "github.com/thanos-io/promql-engine/query" promstorage "github.com/thanos-io/promql-engine/storage/prometheus" "github.com/thanos-io/promql-engine/warnings" @@ -25,14 +25,14 @@ import ( type Execution struct { storage *storageAdapter query promql.Query - opts *query.Options + opts *execopts.Options queryRangeStart time.Time queryRangeEnd time.Time vectorSelector model.VectorOperator } -func NewExecution(query promql.Query, pool *model.VectorPool, queryRangeStart, queryRangeEnd time.Time, engineLabels []labels.Labels, opts *query.Options, _ storage.SelectHints) model.VectorOperator { +func NewExecution(query promql.Query, pool *model.VectorPool, queryRangeStart, queryRangeEnd time.Time, engineLabels []labels.Labels, opts *execopts.Options, _ storage.SelectHints) model.VectorOperator { storage := newStorageFromQuery(query, opts, engineLabels) oper := &Execution{ storage: storage, @@ -87,7 +87,7 @@ func (e *Execution) Samples() *stats.QuerySamples { type storageAdapter struct { query promql.Query - opts *query.Options + opts *execopts.Options lbls []labels.Labels once sync.Once @@ -95,7 +95,7 @@ type storageAdapter struct { series []promstorage.SignedSeries } -func newStorageFromQuery(query promql.Query, opts *query.Options, lbls []labels.Labels) *storageAdapter { +func newStorageFromQuery(query promql.Query, opts *execopts.Options, lbls []labels.Labels) *storageAdapter { return &storageAdapter{ query: query, opts: opts, diff --git a/execution/scan/literal_selector.go b/execution/scan/literal_selector.go index f3387dc87..c5c1bf9eb 100644 --- a/execution/scan/literal_selector.go +++ b/execution/scan/literal_selector.go @@ -8,9 +8,9 @@ import ( "fmt" "sync" + "github.com/thanos-io/promql-engine/execution/execopts" "github.com/thanos-io/promql-engine/execution/model" "github.com/thanos-io/promql-engine/execution/telemetry" - "github.com/thanos-io/promql-engine/query" "github.com/prometheus/prometheus/model/labels" ) @@ -30,7 +30,7 @@ type numberLiteralSelector struct { val float64 } -func NewNumberLiteralSelector(pool *model.VectorPool, opts *query.Options, val float64) model.VectorOperator { +func NewNumberLiteralSelector(pool *model.VectorPool, opts *execopts.Options, val float64) model.VectorOperator { oper := &numberLiteralSelector{ vectorPool: pool, numSteps: opts.NumSteps(), diff --git a/execution/scan/subquery.go b/execution/scan/subquery.go index 8d5c55300..9251b5fac 100644 --- a/execution/scan/subquery.go +++ b/execution/scan/subquery.go @@ -9,11 +9,11 @@ import ( "math" "sync" + "github.com/thanos-io/promql-engine/execution/execopts" "github.com/thanos-io/promql-engine/execution/model" "github.com/thanos-io/promql-engine/execution/telemetry" "github.com/thanos-io/promql-engine/extlabels" "github.com/thanos-io/promql-engine/logicalplan" - "github.com/thanos-io/promql-engine/query" "github.com/thanos-io/promql-engine/ringbuffer" "github.com/prometheus/prometheus/model/histogram" @@ -34,7 +34,7 @@ type subqueryOperator struct { currentStep int64 step int64 stepsBatch int - opts *query.Options + opts *execopts.Options funcExpr *logicalplan.FunctionCall subQuery *logicalplan.Subquery @@ -53,7 +53,7 @@ type subqueryOperator struct { params2 []float64 } -func NewSubqueryOperator(pool *model.VectorPool, next, paramOp, paramOp2 model.VectorOperator, opts *query.Options, funcExpr *logicalplan.FunctionCall, subQuery *logicalplan.Subquery) (model.VectorOperator, error) { +func NewSubqueryOperator(pool *model.VectorPool, next, paramOp, paramOp2 model.VectorOperator, opts *execopts.Options, funcExpr *logicalplan.FunctionCall, subQuery *logicalplan.Subquery) (model.VectorOperator, error) { call, err := ringbuffer.NewRangeVectorFunc(funcExpr.Func.Name) if err != nil { return nil, err diff --git a/execution/step_invariant/step_invariant.go b/execution/step_invariant/step_invariant.go index 6a1c78f32..f32cf77ce 100644 --- a/execution/step_invariant/step_invariant.go +++ b/execution/step_invariant/step_invariant.go @@ -7,10 +7,10 @@ import ( "context" "sync" + "github.com/thanos-io/promql-engine/execution/execopts" "github.com/thanos-io/promql-engine/execution/model" "github.com/thanos-io/promql-engine/execution/telemetry" "github.com/thanos-io/promql-engine/logicalplan" - "github.com/thanos-io/promql-engine/query" "github.com/efficientgo/core/errors" "github.com/prometheus/prometheus/model/labels" @@ -45,7 +45,7 @@ func NewStepInvariantOperator( pool *model.VectorPool, next model.VectorOperator, expr logicalplan.Node, - opts *query.Options, + opts *execopts.Options, ) (model.VectorOperator, error) { // We set interval to be at least 1. u := &stepInvariantOperator{ diff --git a/execution/telemetry/telemetry.go b/execution/telemetry/telemetry.go index ca486eb2e..2b63a3e3c 100644 --- a/execution/telemetry/telemetry.go +++ b/execution/telemetry/telemetry.go @@ -8,9 +8,9 @@ import ( "fmt" "time" + "github.com/thanos-io/promql-engine/execution/execopts" "github.com/thanos-io/promql-engine/execution/model" "github.com/thanos-io/promql-engine/logicalplan" - "github.com/thanos-io/promql-engine/query" "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" @@ -33,21 +33,21 @@ type OperatorTelemetry interface { UpdatePeak(count int) } -func NewTelemetry(operator fmt.Stringer, opts *query.Options) OperatorTelemetry { +func NewTelemetry(operator fmt.Stringer, opts *execopts.Options) OperatorTelemetry { if opts.EnableAnalysis { return NewTrackedTelemetry(operator, opts, nil) } return NewNoopTelemetry(operator) } -func NewSubqueryTelemetry(operator fmt.Stringer, opts *query.Options) OperatorTelemetry { +func NewSubqueryTelemetry(operator fmt.Stringer, opts *execopts.Options) OperatorTelemetry { if opts.EnableAnalysis { return NewTrackedTelemetry(operator, opts, &logicalplan.Subquery{}) } return NewNoopTelemetry(operator) } -func NewStepInvariantTelemetry(operator fmt.Stringer, opts *query.Options) OperatorTelemetry { +func NewStepInvariantTelemetry(operator fmt.Stringer, opts *execopts.Options) OperatorTelemetry { if opts.EnableAnalysis { return NewTrackedTelemetry(operator, opts, &logicalplan.StepInvariantExpr{}) } @@ -105,7 +105,7 @@ type TrackedTelemetry struct { logicalNode logicalplan.Node } -func NewTrackedTelemetry(operator fmt.Stringer, opts *query.Options, logicalPlanNode logicalplan.Node) *TrackedTelemetry { +func NewTrackedTelemetry(operator fmt.Stringer, opts *execopts.Options, logicalPlanNode logicalplan.Node) *TrackedTelemetry { ss := stats.NewQuerySamples(opts.EnablePerStepStats) ss.InitStepTracking(opts.Start.UnixMilli(), opts.End.UnixMilli(), StepTrackingInterval(opts.Step)) return &TrackedTelemetry{ diff --git a/execution/unary/unary.go b/execution/unary/unary.go index 646b076a6..adc1860ab 100644 --- a/execution/unary/unary.go +++ b/execution/unary/unary.go @@ -7,10 +7,10 @@ import ( "context" "sync" + "github.com/thanos-io/promql-engine/execution/execopts" "github.com/thanos-io/promql-engine/execution/model" "github.com/thanos-io/promql-engine/execution/telemetry" "github.com/thanos-io/promql-engine/extlabels" - "github.com/thanos-io/promql-engine/query" "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" @@ -24,7 +24,7 @@ type unaryNegation struct { series []labels.Labels } -func NewUnaryNegation(next model.VectorOperator, opts *query.Options) (model.VectorOperator, error) { +func NewUnaryNegation(next model.VectorOperator, opts *execopts.Options) (model.VectorOperator, error) { u := &unaryNegation{ next: next, } diff --git a/logicalplan/codec_test.go b/logicalplan/codec_test.go index 766ba2a9c..6ab28e85a 100644 --- a/logicalplan/codec_test.go +++ b/logicalplan/codec_test.go @@ -7,7 +7,7 @@ import ( "math/rand" "testing" - "github.com/thanos-io/promql-engine/query" + "github.com/thanos-io/promql-engine/execution/execopts" "github.com/cortexproject/promqlsmith" "github.com/efficientgo/core/testutil" @@ -49,7 +49,7 @@ sum( t.Run(tcase.name, func(t *testing.T) { ast, err := parser.ParseExpr(tcase.query) testutil.Ok(t, err) - original, _ := NewFromAST(ast, &query.Options{}, PlanOptions{}) + original, _ := NewFromAST(ast, &execopts.Options{}, PlanOptions{}) original, _ = original.Optimize(DefaultOptimizers) bytes, err := Marshal(original.Root()) @@ -67,7 +67,7 @@ func TestUnmarshalMatchers(t *testing.T) { ast, err := parser.ParseExpr(expr) testutil.Ok(t, err) - original, _ := NewFromAST(ast, &query.Options{}, PlanOptions{}) + original, _ := NewFromAST(ast, &execopts.Options{}, PlanOptions{}) bytes, err := Marshal(original.Root()) testutil.Ok(t, err) clone, err := Unmarshal(bytes) @@ -102,7 +102,7 @@ func FuzzNodesMarshalJSON(f *testing.F) { return nil }) - original, _ := NewFromAST(qry, &query.Options{}, PlanOptions{}) + original, _ := NewFromAST(qry, &execopts.Options{}, PlanOptions{}) original, _ = original.Optimize(DefaultOptimizers) bytes, err := Marshal(original.Root()) diff --git a/logicalplan/distribute.go b/logicalplan/distribute.go index 811984544..aad6388d1 100644 --- a/logicalplan/distribute.go +++ b/logicalplan/distribute.go @@ -12,7 +12,7 @@ import ( "time" "github.com/thanos-io/promql-engine/api" - "github.com/thanos-io/promql-engine/query" + "github.com/thanos-io/promql-engine/execution/execopts" "github.com/efficientgo/core/errors" "github.com/prometheus/prometheus/model/labels" @@ -156,7 +156,7 @@ type DistributedExecutionOptimizer struct { SkipBinaryPushdown bool } -func (m DistributedExecutionOptimizer) Optimize(plan Node, opts *query.Options) (Node, annotations.Annotations) { +func (m DistributedExecutionOptimizer) Optimize(plan Node, opts *execopts.Options) (Node, annotations.Annotations) { engines := m.Endpoints.Engines() sort.Slice(engines, func(i, j int) bool { return engines[i].MinT() < engines[j].MinT() @@ -256,7 +256,7 @@ func (m DistributedExecutionOptimizer) Optimize(plan Node, opts *query.Options) return plan, *warns } -func (m DistributedExecutionOptimizer) subqueryOpts(parents map[*Node]*Node, current *Node, opts *query.Options) *query.Options { +func (m DistributedExecutionOptimizer) subqueryOpts(parents map[*Node]*Node, current *Node, opts *execopts.Options) *execopts.Options { subqueryParents := make([]*Subquery, 0, len(parents)) for p := parents[current]; p != nil; p = parents[p] { if subquery, ok := (*p).(*Subquery); ok { @@ -264,7 +264,7 @@ func (m DistributedExecutionOptimizer) subqueryOpts(parents map[*Node]*Node, cur } } for i := len(subqueryParents) - 1; i >= 0; i-- { - opts = query.NestedOptionsForSubquery( + opts = execopts.NestedOptionsForSubquery( opts, subqueryParents[i].Step, subqueryParents[i].Range, @@ -306,7 +306,7 @@ func newRemoteAggregation(rootAggregation *Aggregation, engines []api.RemoteEngi // distributeQuery takes a PromQL expression in the form of *parser.Expr and a set of remote engines. // For each engine which matches the time range of the query, it creates a RemoteExecution scoped to the range of the engine. // All remote executions are wrapped in a Deduplicate logical node to make sure that results from overlapping engines are deduplicated. -func (m DistributedExecutionOptimizer) distributeQuery(expr *Node, engines []api.RemoteEngine, opts *query.Options, labelRanges labelSetRanges) Node { +func (m DistributedExecutionOptimizer) distributeQuery(expr *Node, engines []api.RemoteEngine, opts *execopts.Options, labelRanges labelSetRanges) Node { startOffset := calculateStartOffset(expr, opts.LookbackDelta) allowedStartOffset := labelRanges.minOverlap(opts.Start.UnixMilli()-startOffset.Milliseconds(), opts.End.UnixMilli()) @@ -373,7 +373,7 @@ func (m DistributedExecutionOptimizer) distributeQuery(expr *Node, engines []api } } -func (m DistributedExecutionOptimizer) distributeAbsent(expr Node, engines []api.RemoteEngine, startOffset time.Duration, opts *query.Options) Node { +func (m DistributedExecutionOptimizer) distributeAbsent(expr Node, engines []api.RemoteEngine, startOffset time.Duration, opts *execopts.Options) Node { queries := make(RemoteExecutions, 0, len(engines)) for i, e := range engines { if e.MaxT() < opts.Start.UnixMilli()-startOffset.Milliseconds() { @@ -423,7 +423,7 @@ func isAbsent(expr Node) bool { return call.Func.Name == "absent" || call.Func.Name == "absent_over_time" } -func getStartTimeForEngine(e api.RemoteEngine, opts *query.Options, offset time.Duration, globalMinT int64) (time.Time, bool) { +func getStartTimeForEngine(e api.RemoteEngine, opts *execopts.Options, offset time.Duration, globalMinT int64) (time.Time, bool) { if e.MinT() > opts.End.UnixMilli() { return time.Time{}, false } @@ -457,7 +457,7 @@ func getStartTimeForEngine(e api.RemoteEngine, opts *query.Options, offset time. // engine min time and the query step size. // The purpose of this alignment is to make sure that the steps for the remote query // have the same timestamps as the ones for the central query. -func calculateStepAlignedStart(opts *query.Options, engineMinTime time.Time) time.Time { +func calculateStepAlignedStart(opts *execopts.Options, engineMinTime time.Time) time.Time { originalSteps := numSteps(opts.Start, opts.End, opts.Step) remoteQuerySteps := numSteps(engineMinTime, opts.End, opts.Step) diff --git a/logicalplan/distribute_test.go b/logicalplan/distribute_test.go index 8ee4fc2ef..de276328b 100644 --- a/logicalplan/distribute_test.go +++ b/logicalplan/distribute_test.go @@ -10,7 +10,7 @@ import ( "time" "github.com/thanos-io/promql-engine/api" - "github.com/thanos-io/promql-engine/query" + "github.com/thanos-io/promql-engine/execution/execopts" "github.com/efficientgo/core/testutil" "github.com/prometheus/prometheus/model/labels" @@ -459,7 +459,7 @@ count by (cluster) ( expr, err := parser.ParseExpr(tcase.expr) testutil.Ok(t, err) - plan, _ := NewFromAST(expr, &query.Options{Start: time.Unix(0, 0), End: time.Unix(0, 0)}, PlanOptions{}) + plan, _ := NewFromAST(expr, &execopts.Options{Start: time.Unix(0, 0), End: time.Unix(0, 0)}, PlanOptions{}) optimizedPlan, warns := plan.Optimize(optimizers) expectedPlan := cleanUp(replacements, tcase.expected) testutil.Equals(t, expectedPlan, optimizedPlan.Root().String()) @@ -654,7 +654,7 @@ sum(dedup( expr, err := parser.ParseExpr(tcase.expr) testutil.Ok(t, err) - plan, _ := NewFromAST(expr, &query.Options{Start: queryStart, End: queryEnd, Step: queryStep}, PlanOptions{}) + plan, _ := NewFromAST(expr, &execopts.Options{Start: queryStart, End: queryEnd, Step: queryStep}, PlanOptions{}) optimizedPlan, _ := plan.Optimize(optimizers) expectedPlan := cleanUp(replacements, tcase.expected) testutil.Equals(t, expectedPlan, optimizedPlan.Root().String()) @@ -721,7 +721,7 @@ sum( expr, err := parser.ParseExpr(tcase.expr) testutil.Ok(t, err) - plan, _ := NewFromAST(expr, &query.Options{Start: tcase.queryStart, End: tcase.queryEnd, Step: time.Minute}, PlanOptions{}) + plan, _ := NewFromAST(expr, &execopts.Options{Start: tcase.queryStart, End: tcase.queryEnd, Step: time.Minute}, PlanOptions{}) optimizedPlan, _ := plan.Optimize(optimizers) expectedPlan := cleanUp(replacements, tcase.expected) testutil.Equals(t, expectedPlan, renderExprTree(optimizedPlan.Root())) @@ -781,7 +781,7 @@ sum by (pod) (dedup( expr, err := parser.ParseExpr(tcase.expr) testutil.Ok(t, err) - plan, err := NewFromAST(expr, &query.Options{Start: time.Unix(0, 0), End: time.Unix(0, 0)}, PlanOptions{}) + plan, err := NewFromAST(expr, &execopts.Options{Start: time.Unix(0, 0), End: time.Unix(0, 0)}, PlanOptions{}) testutil.Ok(t, err) optimizedPlan, _ := plan.Optimize(optimizers) expectedPlan := cleanUp(replacements, tcase.expected) @@ -809,7 +809,7 @@ sum(dedup( newEngineMock(math.MinInt64, math.MaxInt64, []labels.Labels{labels.FromStrings("region", "east")}), } - lplan, _ := NewFromAST(expr, &query.Options{Start: start, End: end, Step: step}, PlanOptions{}) + lplan, _ := NewFromAST(expr, &execopts.Options{Start: start, End: end, Step: step}, PlanOptions{}) optimizedPlan, _ := lplan.Optimize([]Optimizer{ DistributedExecutionOptimizer{Endpoints: api.NewStaticEndpoints(engines)}, }) diff --git a/logicalplan/histogram_stats.go b/logicalplan/histogram_stats.go index f5ba8fbcc..e0b092804 100644 --- a/logicalplan/histogram_stats.go +++ b/logicalplan/histogram_stats.go @@ -4,14 +4,14 @@ package logicalplan import ( - "github.com/thanos-io/promql-engine/query" + "github.com/thanos-io/promql-engine/execution/execopts" "github.com/prometheus/prometheus/util/annotations" ) type DetectHistogramStatsOptimizer struct{} -func (d DetectHistogramStatsOptimizer) Optimize(plan Node, _ *query.Options) (Node, annotations.Annotations) { +func (d DetectHistogramStatsOptimizer) Optimize(plan Node, _ *execopts.Options) (Node, annotations.Annotations) { return d.optimize(plan, false) } diff --git a/logicalplan/merge_selects.go b/logicalplan/merge_selects.go index 4cc2f7cf2..9a40be4d3 100644 --- a/logicalplan/merge_selects.go +++ b/logicalplan/merge_selects.go @@ -6,7 +6,7 @@ package logicalplan import ( "slices" - "github.com/thanos-io/promql-engine/query" + "github.com/thanos-io/promql-engine/execution/execopts" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/util/annotations" @@ -23,7 +23,7 @@ import ( // and apply an additional filter for {c="d"}. type MergeSelectsOptimizer struct{} -func (m MergeSelectsOptimizer) Optimize(plan Node, _ *query.Options) (Node, annotations.Annotations) { +func (m MergeSelectsOptimizer) Optimize(plan Node, _ *execopts.Options) (Node, annotations.Annotations) { heap := make(matcherHeap) extractSelectors(heap, plan) replaceMatchers(heap, &plan) diff --git a/logicalplan/merge_selects_test.go b/logicalplan/merge_selects_test.go index fcb4e07fa..079f3bb0b 100644 --- a/logicalplan/merge_selects_test.go +++ b/logicalplan/merge_selects_test.go @@ -6,7 +6,7 @@ package logicalplan import ( "testing" - "github.com/thanos-io/promql-engine/query" + "github.com/thanos-io/promql-engine/execution/execopts" "github.com/efficientgo/core/testutil" "github.com/prometheus/prometheus/model/labels" @@ -49,7 +49,7 @@ func TestMergeSelects(t *testing.T) { expr, err := parser.ParseExpr(tcase.expr) testutil.Ok(t, err) - plan, _ := NewFromAST(expr, &query.Options{}, PlanOptions{}) + plan, _ := NewFromAST(expr, &execopts.Options{}, PlanOptions{}) optimizedPlan, _ := plan.Optimize(optimizers) testutil.Equals(t, tcase.expected, renderExprTree(optimizedPlan.Root())) }) @@ -169,7 +169,7 @@ func TestMergeSelectsWithProjections(t *testing.T) { optimizer := MergeSelectsOptimizer{} for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { - optimizedPlan, _ := optimizer.Optimize(tc.plan, &query.Options{}) + optimizedPlan, _ := optimizer.Optimize(tc.plan, &execopts.Options{}) testutil.Equals(t, tc.expected, renderExprTree(optimizedPlan)) }) } diff --git a/logicalplan/passthrough.go b/logicalplan/passthrough.go index c256e80ca..807d62bcc 100644 --- a/logicalplan/passthrough.go +++ b/logicalplan/passthrough.go @@ -5,7 +5,7 @@ package logicalplan import ( "github.com/thanos-io/promql-engine/api" - "github.com/thanos-io/promql-engine/query" + "github.com/thanos-io/promql-engine/execution/execopts" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/util/annotations" @@ -38,11 +38,11 @@ func labelSetsMatch(matchers []*labels.Matcher, lset ...labels.Labels) bool { return false } -func matchingEngineTime(e api.RemoteEngine, opts *query.Options) bool { +func matchingEngineTime(e api.RemoteEngine, opts *execopts.Options) bool { return !(opts.Start.UnixMilli() > e.MaxT() || opts.End.UnixMilli() < e.MinT()) } -func (m PassthroughOptimizer) Optimize(plan Node, opts *query.Options) (Node, annotations.Annotations) { +func (m PassthroughOptimizer) Optimize(plan Node, opts *execopts.Options) (Node, annotations.Annotations) { engines := m.Endpoints.Engines() if len(engines) == 1 { if !matchingEngineTime(engines[0], opts) { diff --git a/logicalplan/passthrough_test.go b/logicalplan/passthrough_test.go index 422107664..ad16203c5 100644 --- a/logicalplan/passthrough_test.go +++ b/logicalplan/passthrough_test.go @@ -9,7 +9,7 @@ import ( "time" "github.com/thanos-io/promql-engine/api" - "github.com/thanos-io/promql-engine/query" + "github.com/thanos-io/promql-engine/execution/execopts" "github.com/efficientgo/core/testutil" "github.com/prometheus/prometheus/model/labels" @@ -26,7 +26,7 @@ func TestPassthrough(t *testing.T) { } optimizers := []Optimizer{PassthroughOptimizer{Endpoints: api.NewStaticEndpoints(engines)}} - plan, _ := NewFromAST(expr, &query.Options{Start: time.Unix(0, 0), End: time.Unix(0, 0)}, PlanOptions{}) + plan, _ := NewFromAST(expr, &execopts.Options{Start: time.Unix(0, 0), End: time.Unix(0, 0)}, PlanOptions{}) optimizedPlan, _ := plan.Optimize(optimizers) testutil.Equals(t, "remote(time())", renderExprTree(optimizedPlan.Root())) @@ -39,7 +39,7 @@ func TestPassthrough(t *testing.T) { } optimizers := []Optimizer{PassthroughOptimizer{Endpoints: api.NewStaticEndpoints(engines)}} - plan, _ := NewFromAST(expr, &query.Options{Start: time.Unix(0, 0), End: time.Unix(0, 0)}, PlanOptions{}) + plan, _ := NewFromAST(expr, &execopts.Options{Start: time.Unix(0, 0), End: time.Unix(0, 0)}, PlanOptions{}) optimizedPlan, _ := plan.Optimize(optimizers) testutil.Equals(t, "time()", renderExprTree(optimizedPlan.Root())) @@ -51,7 +51,7 @@ func TestPassthrough(t *testing.T) { } optimizers := []Optimizer{PassthroughOptimizer{Endpoints: api.NewStaticEndpoints(engines)}} - plan, _ := NewFromAST(expr, &query.Options{Start: time.Unix(0, 0), End: time.Unix(0, 0)}, PlanOptions{}) + plan, _ := NewFromAST(expr, &execopts.Options{Start: time.Unix(0, 0), End: time.Unix(0, 0)}, PlanOptions{}) optimizedPlan, _ := plan.Optimize(optimizers) testutil.Equals(t, "time()", renderExprTree(optimizedPlan.Root())) @@ -67,7 +67,7 @@ func TestPassthrough(t *testing.T) { } optimizers := []Optimizer{PassthroughOptimizer{Endpoints: api.NewStaticEndpoints(engines)}} - plan, _ := NewFromAST(selectorExpr, &query.Options{Start: time.Unix(0, 0), End: time.Unix(0, 0)}, PlanOptions{}) + plan, _ := NewFromAST(selectorExpr, &execopts.Options{Start: time.Unix(0, 0), End: time.Unix(0, 0)}, PlanOptions{}) optimizedPlan, _ := plan.Optimize(optimizers) testutil.Equals(t, `remote({region="east"})`, renderExprTree(optimizedPlan.Root())) @@ -83,7 +83,7 @@ func TestPassthrough(t *testing.T) { } optimizers := []Optimizer{PassthroughOptimizer{Endpoints: api.NewStaticEndpoints(engines)}} - plan, _ := NewFromAST(selectorExpr, &query.Options{Start: time.Unix(0, 0), End: time.Unix(0, 0)}, PlanOptions{}) + plan, _ := NewFromAST(selectorExpr, &execopts.Options{Start: time.Unix(0, 0), End: time.Unix(0, 0)}, PlanOptions{}) optimizedPlan, _ := plan.Optimize(optimizers) testutil.Equals(t, `{region=~"east|west"}`, renderExprTree(optimizedPlan.Root())) @@ -99,7 +99,7 @@ func TestPassthrough(t *testing.T) { } optimizers := []Optimizer{PassthroughOptimizer{Endpoints: api.NewStaticEndpoints(engines)}} - plan, _ := NewFromAST(selectorExpr, &query.Options{Start: time.Unix(0, 0), End: time.Unix(0, 0)}, PlanOptions{}) + plan, _ := NewFromAST(selectorExpr, &execopts.Options{Start: time.Unix(0, 0), End: time.Unix(0, 0)}, PlanOptions{}) optimizedPlan, _ := plan.Optimize(optimizers) testutil.Equals(t, `remote({region="east"}[5m])`, renderExprTree(optimizedPlan.Root())) @@ -115,7 +115,7 @@ func TestPassthrough(t *testing.T) { } optimizers := []Optimizer{PassthroughOptimizer{Endpoints: api.NewStaticEndpoints(engines)}} - plan, _ := NewFromAST(selectorExpr, &query.Options{Start: time.Unix(0, 0), End: time.Unix(0, 0)}, PlanOptions{}) + plan, _ := NewFromAST(selectorExpr, &execopts.Options{Start: time.Unix(0, 0), End: time.Unix(0, 0)}, PlanOptions{}) optimizedPlan, _ := plan.Optimize(optimizers) testutil.Equals(t, `{region="east"}`, renderExprTree(optimizedPlan.Root())) diff --git a/logicalplan/plan.go b/logicalplan/plan.go index c9cf3f8f5..ee84d9b21 100644 --- a/logicalplan/plan.go +++ b/logicalplan/plan.go @@ -8,8 +8,8 @@ import ( "strings" "time" + "github.com/thanos-io/promql-engine/execution/execopts" "github.com/thanos-io/promql-engine/execution/parse" - "github.com/thanos-io/promql-engine/query" "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/promql/parser" @@ -30,16 +30,16 @@ var DefaultOptimizers = []Optimizer{ type Plan interface { Optimize([]Optimizer) (Plan, annotations.Annotations) Root() Node - MinMaxTime(*query.Options) (int64, int64) + MinMaxTime(*execopts.Options) (int64, int64) } type Optimizer interface { - Optimize(plan Node, opts *query.Options) (Node, annotations.Annotations) + Optimize(plan Node, opts *execopts.Options) (Node, annotations.Annotations) } type plan struct { expr Node - opts *query.Options + opts *execopts.Options planOpts PlanOptions } @@ -48,7 +48,7 @@ type PlanOptions struct { } // New creates a new logical plan from logical node. -func New(root Node, queryOpts *query.Options, planOpts PlanOptions) Plan { +func New(root Node, queryOpts *execopts.Options, planOpts PlanOptions) Plan { return &plan{ expr: root, opts: queryOpts, @@ -56,7 +56,7 @@ func New(root Node, queryOpts *query.Options, planOpts PlanOptions) Plan { } } -func NewFromAST(ast parser.Expr, queryOpts *query.Options, planOpts PlanOptions) (Plan, error) { +func NewFromAST(ast parser.Expr, queryOpts *execopts.Options, planOpts PlanOptions) (Plan, error) { ast, err := promql.PreprocessExpr(ast, queryOpts.Start, queryOpts.End, queryOpts.Step) if err != nil { return nil, err @@ -82,7 +82,7 @@ func NewFromAST(ast parser.Expr, queryOpts *query.Options, planOpts PlanOptions) // NewFromBytes creates a new logical plan from a byte slice created with Marshal. // This method is used to deserialize a logical plan which has been sent over the wire. -func NewFromBytes(bytes []byte, queryOpts *query.Options, planOpts PlanOptions) (Plan, error) { +func NewFromBytes(bytes []byte, queryOpts *execopts.Options, planOpts PlanOptions) (Plan, error) { root, err := Unmarshal(bytes) if err != nil { return nil, err @@ -97,7 +97,7 @@ func NewFromBytes(bytes []byte, queryOpts *query.Options, planOpts PlanOptions) }, nil } -func getTimeRangesForSelector(qOpts *query.Options, n *parser.VectorSelector, parents []*Node, evalRange time.Duration) (int64, int64) { +func getTimeRangesForSelector(qOpts *execopts.Options, n *parser.VectorSelector, parents []*Node, evalRange time.Duration) (int64, int64) { start, end := qOpts.Start.UnixMilli(), qOpts.End.UnixMilli() subqOffset, subqRange, subqTs := logicalSubqueryTimes(parents) @@ -152,7 +152,7 @@ func extractFuncFromPath(p []*Node) string { return extractFuncFromPath(p[:len(p)-1]) } -func (p *plan) MinMaxTime(qOpts *query.Options) (int64, int64) { +func (p *plan) MinMaxTime(qOpts *execopts.Options) (int64, int64) { var minTimestamp, maxTimestamp int64 = math.MaxInt64, math.MinInt64 // Whenever a MatrixSelector is evaluated, evalRange is set to the corresponding range. // The evaluation of the VectorSelector inside then evaluates the given range and unsets @@ -546,10 +546,10 @@ func logicalSubqueryTimes(path []*Node) (time.Duration, time.Duration, *int64) { return subqOffset, subqRange, tsp } -func setOffsetForInnerSubqueries(expr parser.Expr, opts *query.Options) { +func setOffsetForInnerSubqueries(expr parser.Expr, opts *execopts.Options) { switch n := expr.(type) { case *parser.SubqueryExpr: - nOpts := query.NestedOptionsForSubquery(opts, n.Step, n.Range, n.Offset) + nOpts := execopts.NestedOptionsForSubquery(opts, n.Step, n.Range, n.Offset) setOffsetForAtModifier(nOpts.Start.UnixMilli(), n.Expr) setOffsetForInnerSubqueries(n.Expr, nOpts) default: diff --git a/logicalplan/plan_test.go b/logicalplan/plan_test.go index deec36a72..b02882966 100644 --- a/logicalplan/plan_test.go +++ b/logicalplan/plan_test.go @@ -11,7 +11,7 @@ import ( "testing" "time" - "github.com/thanos-io/promql-engine/query" + "github.com/thanos-io/promql-engine/execution/execopts" "github.com/efficientgo/core/testutil" "github.com/prometheus/prometheus/promql/parser" @@ -203,7 +203,7 @@ func TestDefaultOptimizers(t *testing.T) { expr, err := parser.ParseExpr(tcase.expr) testutil.Ok(t, err) - plan, _ := NewFromAST(expr, &query.Options{Start: time.Unix(0, 0), End: time.Unix(0, 0)}, PlanOptions{}) + plan, _ := NewFromAST(expr, &execopts.Options{Start: time.Unix(0, 0), End: time.Unix(0, 0)}, PlanOptions{}) optimizedPlan, _ := plan.Optimize(DefaultOptimizers) expectedPlan := strings.Trim(spaces.ReplaceAllString(tcase.expected, " "), " ") testutil.Equals(t, expectedPlan, renderExprTree(optimizedPlan.Root())) @@ -316,7 +316,7 @@ func TestMatcherPropagation(t *testing.T) { expr, err := parser.ParseExpr(tcase.expr) testutil.Ok(t, err) - plan, _ := NewFromAST(expr, &query.Options{Start: time.Unix(0, 0), End: time.Unix(0, 0)}, PlanOptions{}) + plan, _ := NewFromAST(expr, &execopts.Options{Start: time.Unix(0, 0), End: time.Unix(0, 0)}, PlanOptions{}) optimizedPlan, _ := plan.Optimize(optimizers) expectedPlan := strings.Trim(spaces.ReplaceAllString(tcase.expected, " "), " ") testutil.Equals(t, expectedPlan, renderExprTree(optimizedPlan.Root())) @@ -368,7 +368,7 @@ func TestTrimSorts(t *testing.T) { expr, err := parser.ParseExpr(tcase.expr) testutil.Ok(t, err) - plan, _ := NewFromAST(expr, &query.Options{}, PlanOptions{}) + plan, _ := NewFromAST(expr, &execopts.Options{}, PlanOptions{}) testutil.Equals(t, tcase.expected, plan.Root().String()) }) } @@ -421,7 +421,7 @@ func TestReduceConstantExpressions(t *testing.T) { expr, err := parser.ParseExpr(tcase.expr) testutil.Ok(t, err) - plan, _ := NewFromAST(expr, &query.Options{}, PlanOptions{}) + plan, _ := NewFromAST(expr, &execopts.Options{}, PlanOptions{}) testutil.Equals(t, tcase.expected, plan.Root().String()) }) } diff --git a/logicalplan/projection.go b/logicalplan/projection.go index cbca7c983..06ea4eac6 100644 --- a/logicalplan/projection.go +++ b/logicalplan/projection.go @@ -7,7 +7,7 @@ import ( "maps" "slices" - "github.com/thanos-io/promql-engine/query" + "github.com/thanos-io/promql-engine/execution/execopts" "github.com/prometheus/prometheus/promql/parser" "github.com/prometheus/prometheus/util/annotations" @@ -17,7 +17,7 @@ type ProjectionOptimizer struct { SeriesHashLabel string } -func (p ProjectionOptimizer) Optimize(plan Node, _ *query.Options) (Node, annotations.Annotations) { +func (p ProjectionOptimizer) Optimize(plan Node, _ *execopts.Options) (Node, annotations.Annotations) { p.pushProjection(&plan, nil) return plan, nil } diff --git a/logicalplan/projection_test.go b/logicalplan/projection_test.go index d8f8834de..92a7820a4 100644 --- a/logicalplan/projection_test.go +++ b/logicalplan/projection_test.go @@ -9,7 +9,7 @@ import ( "testing" "time" - "github.com/thanos-io/promql-engine/query" + "github.com/thanos-io/promql-engine/execution/execopts" "github.com/efficientgo/core/testutil" "github.com/prometheus/prometheus/promql/parser" @@ -276,7 +276,7 @@ func TestProjectionOptimizer(t *testing.T) { expr, err := parser.ParseExpr(tc.expr) testutil.Ok(t, err) - plan, err := NewFromAST(expr, &query.Options{Start: time.Unix(0, 0), End: time.Unix(0, 0)}, PlanOptions{}) + plan, err := NewFromAST(expr, &execopts.Options{Start: time.Unix(0, 0), End: time.Unix(0, 0)}, PlanOptions{}) testutil.Ok(t, err) optimizer := ProjectionOptimizer{SeriesHashLabel: "__series_hash__"} optimizedPlan, _ := optimizer.Optimize(plan.Root(), nil) diff --git a/logicalplan/propagate_selectors.go b/logicalplan/propagate_selectors.go index dd43a4f1d..56203edee 100644 --- a/logicalplan/propagate_selectors.go +++ b/logicalplan/propagate_selectors.go @@ -8,7 +8,7 @@ import ( "sort" "strings" - "github.com/thanos-io/promql-engine/query" + "github.com/thanos-io/promql-engine/execution/execopts" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/promql/parser" @@ -19,7 +19,7 @@ import ( // two vector selectors in a binary expression. type PropagateMatchersOptimizer struct{} -func (m PropagateMatchersOptimizer) Optimize(plan Node, _ *query.Options) (Node, annotations.Annotations) { +func (m PropagateMatchersOptimizer) Optimize(plan Node, _ *execopts.Options) (Node, annotations.Annotations) { Traverse(&plan, func(expr *Node) { binOp, ok := (*expr).(*Binary) if !ok { diff --git a/logicalplan/set_batch_size.go b/logicalplan/set_batch_size.go index ffc666f78..e67f1d434 100644 --- a/logicalplan/set_batch_size.go +++ b/logicalplan/set_batch_size.go @@ -4,7 +4,7 @@ package logicalplan import ( - "github.com/thanos-io/promql-engine/query" + "github.com/thanos-io/promql-engine/execution/execopts" "github.com/prometheus/prometheus/promql/parser" "github.com/prometheus/prometheus/util/annotations" @@ -20,7 +20,7 @@ type SelectorBatchSize struct { // If any aggregate is present in the plan, the batch size is set to the configured value. // The two exceptions where this cannot be done is if the aggregate is quantile, or // when a binary expression precedes the aggregate. -func (m SelectorBatchSize) Optimize(plan Node, _ *query.Options) (Node, annotations.Annotations) { +func (m SelectorBatchSize) Optimize(plan Node, _ *execopts.Options) (Node, annotations.Annotations) { canBatch := false Traverse(&plan, func(current *Node) { switch e := (*current).(type) { diff --git a/logicalplan/set_batch_size_test.go b/logicalplan/set_batch_size_test.go index 15733eac5..346b88c4b 100644 --- a/logicalplan/set_batch_size_test.go +++ b/logicalplan/set_batch_size_test.go @@ -6,7 +6,7 @@ package logicalplan import ( "testing" - "github.com/thanos-io/promql-engine/query" + "github.com/thanos-io/promql-engine/execution/execopts" "github.com/efficientgo/core/testutil" "github.com/prometheus/prometheus/promql/parser" @@ -97,7 +97,7 @@ func TestSetBatchSize(t *testing.T) { expr, err := parser.ParseExpr(tcase.expr) testutil.Ok(t, err) - plan, _ := NewFromAST(expr, &query.Options{}, PlanOptions{}) + plan, _ := NewFromAST(expr, &execopts.Options{}, PlanOptions{}) optimizedPlan, _ := plan.Optimize(optimizers) testutil.Equals(t, tcase.expected, renderExprTree(optimizedPlan.Root())) }) diff --git a/logicalplan/sort_matchers.go b/logicalplan/sort_matchers.go index b3a204374..ec2d71dcc 100644 --- a/logicalplan/sort_matchers.go +++ b/logicalplan/sort_matchers.go @@ -6,7 +6,7 @@ package logicalplan import ( "sort" - "github.com/thanos-io/promql-engine/query" + "github.com/thanos-io/promql-engine/execution/execopts" "github.com/prometheus/prometheus/util/annotations" ) @@ -16,7 +16,7 @@ import ( // can rely on this property. type SortMatchers struct{} -func (m SortMatchers) Optimize(plan Node, _ *query.Options) (Node, annotations.Annotations) { +func (m SortMatchers) Optimize(plan Node, _ *execopts.Options) (Node, annotations.Annotations) { Traverse(&plan, func(node *Node) { e, ok := (*node).(*VectorSelector) if !ok { diff --git a/logicalplan/user_defined.go b/logicalplan/user_defined.go index 6525837ea..778b9da17 100644 --- a/logicalplan/user_defined.go +++ b/logicalplan/user_defined.go @@ -6,8 +6,8 @@ package logicalplan import ( "context" + "github.com/thanos-io/promql-engine/execution/execopts" "github.com/thanos-io/promql-engine/execution/model" - "github.com/thanos-io/promql-engine/query" "github.com/prometheus/prometheus/storage" ) @@ -18,7 +18,7 @@ type UserDefinedExpr interface { MakeExecutionOperator( ctx context.Context, vectors *model.VectorPool, - opts *query.Options, + opts *execopts.Options, hints storage.SelectHints, ) (model.VectorOperator, error) } diff --git a/ringbuffer/overtime.go b/ringbuffer/overtime.go index 10a5f9f4d..1776e6bd1 100644 --- a/ringbuffer/overtime.go +++ b/ringbuffer/overtime.go @@ -8,8 +8,8 @@ import ( "math" "github.com/thanos-io/promql-engine/compute" + "github.com/thanos-io/promql-engine/execution/execopts" "github.com/thanos-io/promql-engine/execution/telemetry" - "github.com/thanos-io/promql-engine/query" "github.com/thanos-io/promql-engine/warnings" "github.com/prometheus/prometheus/model/histogram" @@ -24,7 +24,7 @@ const maxStreamingStepOverlap = 5 // overlapSteps calculates the number of evaluation steps that a range window overlaps. // This is the number of steps where a single sample contributes to the result. -func overlapSteps(opts query.Options, selectRange int64) int64 { +func overlapSteps(opts execopts.Options, selectRange int64) int64 { step := max(1, opts.Step.Milliseconds()) return min( (selectRange-1)/step+1, @@ -32,7 +32,7 @@ func overlapSteps(opts query.Options, selectRange int64) int64 { ) } -func UseStreamingRingBuffers(opts query.Options, selectRange int64) bool { +func UseStreamingRingBuffers(opts execopts.Options, selectRange int64) bool { return overlapSteps(opts, selectRange) <= maxStreamingStepOverlap } @@ -57,7 +57,7 @@ type stepState struct { warn error } -func newOverTimeBuffer(opts query.Options, selectRange, offset int64, accMaker func() compute.Accumulator) *OverTimeBuffer { +func newOverTimeBuffer(opts execopts.Options, selectRange, offset int64, accMaker func() compute.Accumulator) *OverTimeBuffer { var ( step = max(1, opts.Step.Milliseconds()) numSteps = overlapSteps(opts, selectRange) @@ -88,39 +88,39 @@ func newOverTimeBuffer(opts query.Options, selectRange, offset int64, accMaker f } } -func NewCountOverTimeBuffer(opts query.Options, selectRange, offset int64) *OverTimeBuffer { +func NewCountOverTimeBuffer(opts execopts.Options, selectRange, offset int64) *OverTimeBuffer { return newOverTimeBuffer(opts, selectRange, offset, func() compute.Accumulator { return compute.NewCountAcc() }) } -func NewMaxOverTimeBuffer(opts query.Options, selectRange, offset int64) *OverTimeBuffer { +func NewMaxOverTimeBuffer(opts execopts.Options, selectRange, offset int64) *OverTimeBuffer { return newOverTimeBuffer(opts, selectRange, offset, func() compute.Accumulator { return compute.NewMaxAcc() }) } -func NewMinOverTimeBuffer(opts query.Options, selectRange, offset int64) *OverTimeBuffer { +func NewMinOverTimeBuffer(opts execopts.Options, selectRange, offset int64) *OverTimeBuffer { return newOverTimeBuffer(opts, selectRange, offset, func() compute.Accumulator { return compute.NewMinAcc() }) } -func NewSumOverTimeBuffer(opts query.Options, selectRange, offset int64) *OverTimeBuffer { +func NewSumOverTimeBuffer(opts execopts.Options, selectRange, offset int64) *OverTimeBuffer { return newOverTimeBuffer(opts, selectRange, offset, func() compute.Accumulator { return compute.NewSumAcc() }) } -func NewAvgOverTimeBuffer(opts query.Options, selectRange, offset int64) *OverTimeBuffer { +func NewAvgOverTimeBuffer(opts execopts.Options, selectRange, offset int64) *OverTimeBuffer { return newOverTimeBuffer(opts, selectRange, offset, func() compute.Accumulator { return compute.NewAvgAcc() }) } -func NewStdDevOverTimeBuffer(opts query.Options, selectRange, offset int64) *OverTimeBuffer { +func NewStdDevOverTimeBuffer(opts execopts.Options, selectRange, offset int64) *OverTimeBuffer { return newOverTimeBuffer(opts, selectRange, offset, func() compute.Accumulator { return compute.NewStdDevAcc() }) } -func NewStdVarOverTimeBuffer(opts query.Options, selectRange, offset int64) *OverTimeBuffer { +func NewStdVarOverTimeBuffer(opts execopts.Options, selectRange, offset int64) *OverTimeBuffer { return newOverTimeBuffer(opts, selectRange, offset, func() compute.Accumulator { return compute.NewStdVarAcc() }) } -func NewPresentOverTimeBuffer(opts query.Options, selectRange, offset int64) *OverTimeBuffer { +func NewPresentOverTimeBuffer(opts execopts.Options, selectRange, offset int64) *OverTimeBuffer { return newOverTimeBuffer(opts, selectRange, offset, func() compute.Accumulator { return compute.NewGroupAcc() }) } -func NewLastOverTimeBuffer(opts query.Options, selectRange, offset int64) *OverTimeBuffer { +func NewLastOverTimeBuffer(opts execopts.Options, selectRange, offset int64) *OverTimeBuffer { return newOverTimeBuffer(opts, selectRange, offset, func() compute.Accumulator { return compute.NewLastAcc() }) } diff --git a/ringbuffer/rate.go b/ringbuffer/rate.go index c1612eb95..719f3584c 100644 --- a/ringbuffer/rate.go +++ b/ringbuffer/rate.go @@ -8,8 +8,8 @@ import ( "math" "slices" + "github.com/thanos-io/promql-engine/execution/execopts" "github.com/thanos-io/promql-engine/execution/telemetry" - "github.com/thanos-io/promql-engine/query" "github.com/prometheus/prometheus/model/histogram" ) @@ -49,7 +49,7 @@ type stepRange struct { } // NewRateBuffer creates a new RateBuffer. -func NewRateBuffer(ctx context.Context, opts query.Options, isCounter, isRate bool, selectRange, offset int64) *RateBuffer { +func NewRateBuffer(ctx context.Context, opts execopts.Options, isCounter, isRate bool, selectRange, offset int64) *RateBuffer { var ( step = max(1, opts.Step.Milliseconds()) numSteps = min( @@ -194,7 +194,7 @@ func (r *RateBuffer) Eval(ctx context.Context, _, _ float64, _ int64) (float64, func (r *RateBuffer) ReadIntoLast(func(*Sample)) {} -func querySteps(o query.Options) int64 { +func querySteps(o execopts.Options) int64 { // Instant evaluation is executed as a range evaluation with one step. if o.Step.Milliseconds() == 0 { return 1 diff --git a/storage/prometheus/matrix_selector.go b/storage/prometheus/matrix_selector.go index aecf7921c..f93a9ff63 100644 --- a/storage/prometheus/matrix_selector.go +++ b/storage/prometheus/matrix_selector.go @@ -11,11 +11,11 @@ import ( "sync" "time" + "github.com/thanos-io/promql-engine/execution/execopts" "github.com/thanos-io/promql-engine/execution/model" "github.com/thanos-io/promql-engine/execution/parse" "github.com/thanos-io/promql-engine/execution/telemetry" "github.com/thanos-io/promql-engine/extlabels" - "github.com/thanos-io/promql-engine/query" "github.com/thanos-io/promql-engine/ringbuffer" "github.com/thanos-io/promql-engine/warnings" @@ -52,7 +52,7 @@ type matrixSelector struct { functionName string call ringbuffer.FunctionCall fhReader *histogram.FloatHistogram - opts *query.Options + opts *execopts.Options numSteps int mint int64 @@ -85,7 +85,7 @@ func NewMatrixSelector( functionName string, arg float64, arg2 float64, - opts *query.Options, + opts *execopts.Options, selectRange, offset time.Duration, batchSize int64, shard, numShard int, diff --git a/storage/prometheus/scanners.go b/storage/prometheus/scanners.go index dc7888f57..dfbdf49ca 100644 --- a/storage/prometheus/scanners.go +++ b/storage/prometheus/scanners.go @@ -8,10 +8,10 @@ import ( "math" "github.com/thanos-io/promql-engine/execution/exchange" + "github.com/thanos-io/promql-engine/execution/execopts" "github.com/thanos-io/promql-engine/execution/model" "github.com/thanos-io/promql-engine/execution/parse" "github.com/thanos-io/promql-engine/logicalplan" - "github.com/thanos-io/promql-engine/query" "github.com/thanos-io/promql-engine/warnings" "github.com/efficientgo/core/errors" @@ -32,7 +32,7 @@ func (s *Scanners) Close() error { return s.querier.Close() } -func NewPrometheusScanners(queryable storage.Queryable, qOpts *query.Options, lplan logicalplan.Plan) (*Scanners, error) { +func NewPrometheusScanners(queryable storage.Queryable, qOpts *execopts.Options, lplan logicalplan.Plan) (*Scanners, error) { var min, max int64 if lplan != nil { min, max = lplan.MinMaxTime(qOpts) @@ -49,7 +49,7 @@ func NewPrometheusScanners(queryable storage.Queryable, qOpts *query.Options, lp func (p Scanners) NewVectorSelector( _ context.Context, - opts *query.Options, + opts *execopts.Options, hints storage.SelectHints, logicalNode logicalplan.VectorSelector, ) (model.VectorOperator, error) { @@ -85,7 +85,7 @@ func (p Scanners) NewVectorSelector( func (p Scanners) NewMatrixSelector( ctx context.Context, - opts *query.Options, + opts *execopts.Options, hints storage.SelectHints, logicalNode logicalplan.MatrixSelector, call logicalplan.FunctionCall, diff --git a/storage/prometheus/scanners_test.go b/storage/prometheus/scanners_test.go index f96cd7698..cb1104fea 100644 --- a/storage/prometheus/scanners_test.go +++ b/storage/prometheus/scanners_test.go @@ -7,8 +7,8 @@ import ( "testing" "time" + "github.com/thanos-io/promql-engine/execution/execopts" "github.com/thanos-io/promql-engine/logicalplan" - "github.com/thanos-io/promql-engine/query" "github.com/prometheus/prometheus/promql/parser" "github.com/stretchr/testify/require" @@ -71,7 +71,7 @@ func TestScannersMinMaxTime(t *testing.T) { p, err := parser.ParseExpr(tcase.expr) require.NoError(t, err) - qOpts := &query.Options{ + qOpts := &execopts.Options{ Start: tcase.start, End: tcase.end, Step: tcase.step, diff --git a/storage/prometheus/vector_selector.go b/storage/prometheus/vector_selector.go index 725616ab0..9d928c36c 100644 --- a/storage/prometheus/vector_selector.go +++ b/storage/prometheus/vector_selector.go @@ -9,10 +9,10 @@ import ( "sync" "time" + "github.com/thanos-io/promql-engine/execution/execopts" "github.com/thanos-io/promql-engine/execution/model" "github.com/thanos-io/promql-engine/execution/telemetry" "github.com/thanos-io/promql-engine/extlabels" - "github.com/thanos-io/promql-engine/query" "github.com/efficientgo/core/errors" "github.com/prometheus/prometheus/model/histogram" @@ -59,7 +59,7 @@ type vectorSelector struct { func NewVectorSelector( pool *model.VectorPool, selector SeriesSelector, - queryOpts *query.Options, + queryOpts *execopts.Options, offset time.Duration, batchSize int64, selectTimestamp bool, diff --git a/storage/scanners.go b/storage/scanners.go index fba38b001..fff38b80f 100644 --- a/storage/scanners.go +++ b/storage/scanners.go @@ -6,15 +6,15 @@ package storage import ( "context" + "github.com/thanos-io/promql-engine/execution/execopts" "github.com/thanos-io/promql-engine/execution/model" "github.com/thanos-io/promql-engine/logicalplan" - "github.com/thanos-io/promql-engine/query" "github.com/prometheus/prometheus/storage" ) type Scanners interface { Close() error - NewVectorSelector(ctx context.Context, opts *query.Options, hints storage.SelectHints, selector logicalplan.VectorSelector) (model.VectorOperator, error) - NewMatrixSelector(ctx context.Context, opts *query.Options, hints storage.SelectHints, selector logicalplan.MatrixSelector, call logicalplan.FunctionCall) (model.VectorOperator, error) + NewVectorSelector(ctx context.Context, opts *execopts.Options, hints storage.SelectHints, selector logicalplan.VectorSelector) (model.VectorOperator, error) + NewMatrixSelector(ctx context.Context, opts *execopts.Options, hints storage.SelectHints, selector logicalplan.MatrixSelector, call logicalplan.FunctionCall) (model.VectorOperator, error) }