Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 56 additions & 58 deletions engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@ import (
"time"

"github.com/thanos-io/promql-engine/execution"
"github.com/thanos-io/promql-engine/execution/execopts"
"github.com/thanos-io/promql-engine/execution/model"
"github.com/thanos-io/promql-engine/execution/parse"
"github.com/thanos-io/promql-engine/execution/telemetry"
"github.com/thanos-io/promql-engine/extlabels"
"github.com/thanos-io/promql-engine/logicalplan"
"github.com/thanos-io/promql-engine/query"
engstorage "github.com/thanos-io/promql-engine/storage"
promstorage "github.com/thanos-io/promql-engine/storage/prometheus"
"github.com/thanos-io/promql-engine/warnings"
Expand Down Expand Up @@ -83,33 +83,31 @@ type Opts struct {
DisableDuplicateLabelChecks bool
}

// QueryOpts implements promql.QueryOpts but allows to override more engine default options.
type QueryOpts struct {
// These values are used to implement promql.QueryOpts, they have weird "Param" suffix because
// they are accessed by methods of the same name.
LookbackDeltaParam time.Duration
EnablePerStepStatsParam bool
// QueryOptions allows overriding engine default options on a per-query basis.
type QueryOptions struct {
// LookbackDelta overrides the engine's default lookback delta for this query.
LookbackDelta time.Duration

// DecodingConcurrency can be used to override the DecodingConcurrency engine setting.
// EnablePerStepStats enables per-step statistics for this query.
EnablePerStepStats bool

// DecodingConcurrency overrides the engine's default decoding concurrency for this query.
DecodingConcurrency int

// SelectorBatchSize can be used to override the SelectorBatchSize engine setting.
// SelectorBatchSize overrides the engine's default selector batch size for this query.
SelectorBatchSize int64

// LogicalOptimizers can be used to override the LogicalOptimizers engine setting.
// LogicalOptimizers overrides the engine's default logical optimizers for this query.
LogicalOptimizers []logicalplan.Optimizer
}

func (opts QueryOpts) LookbackDelta() time.Duration { return opts.LookbackDeltaParam }
func (opts QueryOpts) EnablePerStepStats() bool { return opts.EnablePerStepStatsParam }

func fromPromQLOpts(opts promql.QueryOpts) *QueryOpts {
func fromPromQLOpts(opts promql.QueryOpts) *QueryOptions {
if opts == nil {
return &QueryOpts{}
return &QueryOptions{}
}
return &QueryOpts{
LookbackDeltaParam: opts.LookbackDelta(),
EnablePerStepStatsParam: opts.EnablePerStepStats(),
return &QueryOptions{
LookbackDelta: opts.LookbackDelta(),
EnablePerStepStats: opts.EnablePerStepStats(),
}
}

Expand Down Expand Up @@ -229,7 +227,7 @@ type Engine struct {
noStepSubqueryIntervalFn func(time.Duration) time.Duration
}

func (e *Engine) MakeInstantQuery(ctx context.Context, q storage.Queryable, opts *QueryOpts, qs string, ts time.Time) (promql.Query, error) {
func (e *Engine) MakeInstantQuery(ctx context.Context, q storage.Queryable, opts *QueryOptions, qs string, ts time.Time) (promql.Query, error) {
idx, err := e.activeQueryTracker.Insert(ctx, qs)
if err != nil {
return nil, err
Expand All @@ -245,15 +243,15 @@ func (e *Engine) MakeInstantQuery(ctx context.Context, q storage.Queryable, opts
// the presentation layer and not when computing the results.
resultSort := newResultSort(expr)

qOpts := e.makeQueryOpts(ts, ts, 0, opts)
if qOpts.StepsBatch > 64 {
execOpts := e.makeExecutionOpts(ts, ts, 0, opts)
if execOpts.StepsBatch > 64 {
return nil, ErrStepsBatchTooLarge
}

planOpts := logicalplan.PlanOptions{
DisableDuplicateLabelCheck: e.disableDuplicateLabelChecks,
}
initialPlan, err := logicalplan.NewFromAST(expr, qOpts, planOpts)
initialPlan, err := logicalplan.NewFromAST(expr, execOpts, planOpts)
if err != nil {
return nil, errors.Wrap(err, "creating plan")
}
Expand All @@ -262,18 +260,18 @@ func (e *Engine) MakeInstantQuery(ctx context.Context, q storage.Queryable, opts
ctx = warnings.NewContext(ctx)
defer func() { warns.Merge(warnings.FromContext(ctx)) }()

scanners, err := e.storageScanners(q, qOpts, optimizedPlan)
scanners, err := e.storageScanners(q, execOpts, optimizedPlan)
if err != nil {
return nil, errors.Wrap(err, "creating storage scanners")
}

exec, err := execution.New(ctx, optimizedPlan.Root(), scanners, qOpts)
exec, err := execution.New(ctx, optimizedPlan.Root(), scanners, execOpts)
if err != nil {
return nil, err
}
e.metrics.totalQueries.Inc()
return &compatibilityQuery{
Query: &Query{exec: exec, opts: qOpts},
Query: &Query{exec: exec, opts: execOpts},
engine: e,
plan: optimizedPlan,
warns: warns,
Expand All @@ -284,38 +282,38 @@ func (e *Engine) MakeInstantQuery(ctx context.Context, q storage.Queryable, opts
}, nil
}

func (e *Engine) MakeInstantQueryFromPlan(ctx context.Context, q storage.Queryable, opts *QueryOpts, root logicalplan.Node, ts time.Time) (promql.Query, error) {
func (e *Engine) MakeInstantQueryFromPlan(ctx context.Context, q storage.Queryable, opts *QueryOptions, root logicalplan.Node, ts time.Time) (promql.Query, error) {
idx, err := e.activeQueryTracker.Insert(ctx, root.String())
if err != nil {
return nil, err
}
defer e.activeQueryTracker.Delete(idx)

qOpts := e.makeQueryOpts(ts, ts, 0, opts)
if qOpts.StepsBatch > 64 {
execOpts := e.makeExecutionOpts(ts, ts, 0, opts)
if execOpts.StepsBatch > 64 {
return nil, ErrStepsBatchTooLarge
}
planOpts := logicalplan.PlanOptions{
DisableDuplicateLabelCheck: e.disableDuplicateLabelChecks,
}
lplan, warns := logicalplan.New(root, qOpts, planOpts).Optimize(e.getLogicalOptimizers(opts))
lplan, warns := logicalplan.New(root, execOpts, planOpts).Optimize(e.getLogicalOptimizers(opts))

ctx = warnings.NewContext(ctx)
defer func() { warns.Merge(warnings.FromContext(ctx)) }()

scnrs, err := e.storageScanners(q, qOpts, lplan)
scnrs, err := e.storageScanners(q, execOpts, lplan)
if err != nil {
return nil, errors.Wrap(err, "creating storage scanners")
}

exec, err := execution.New(ctx, lplan.Root(), scnrs, qOpts)
exec, err := execution.New(ctx, lplan.Root(), scnrs, execOpts)
if err != nil {
return nil, err
}
e.metrics.totalQueries.Inc()

return &compatibilityQuery{
Query: &Query{exec: exec, opts: qOpts},
Query: &Query{exec: exec, opts: execOpts},
engine: e,
plan: lplan,
warns: warns,
Expand All @@ -327,7 +325,7 @@ func (e *Engine) MakeInstantQueryFromPlan(ctx context.Context, q storage.Queryab
}, nil
}

func (e *Engine) MakeRangeQuery(ctx context.Context, q storage.Queryable, opts *QueryOpts, qs string, start, end time.Time, step time.Duration) (promql.Query, error) {
func (e *Engine) MakeRangeQuery(ctx context.Context, q storage.Queryable, opts *QueryOptions, qs string, start, end time.Time, step time.Duration) (promql.Query, error) {
idx, err := e.activeQueryTracker.Insert(ctx, qs)
if err != nil {
return nil, err
Expand All @@ -343,15 +341,15 @@ func (e *Engine) MakeRangeQuery(ctx context.Context, q storage.Queryable, opts *
if expr.Type() != parser.ValueTypeVector && expr.Type() != parser.ValueTypeScalar {
return nil, errors.Newf("invalid expression type %q for range query, must be Scalar or instant Vector", parser.DocumentedType(expr.Type()))
}
qOpts := e.makeQueryOpts(start, end, step, opts)
if qOpts.StepsBatch > 64 {
execOpts := e.makeExecutionOpts(start, end, step, opts)
if execOpts.StepsBatch > 64 {
return nil, ErrStepsBatchTooLarge
}
planOpts := logicalplan.PlanOptions{
DisableDuplicateLabelCheck: e.disableDuplicateLabelChecks,
}

initialPlan, err := logicalplan.NewFromAST(expr, qOpts, planOpts)
initialPlan, err := logicalplan.NewFromAST(expr, execOpts, planOpts)
if err != nil {
return nil, errors.Wrap(err, "creating plan")
}
Expand All @@ -360,19 +358,19 @@ func (e *Engine) MakeRangeQuery(ctx context.Context, q storage.Queryable, opts *
ctx = warnings.NewContext(ctx)
defer func() { warns.Merge(warnings.FromContext(ctx)) }()

scnrs, err := e.storageScanners(q, qOpts, optimizedPlan)
scnrs, err := e.storageScanners(q, execOpts, optimizedPlan)
if err != nil {
return nil, errors.Wrap(err, "creating storage scanners")
}

exec, err := execution.New(ctx, optimizedPlan.Root(), scnrs, qOpts)
exec, err := execution.New(ctx, optimizedPlan.Root(), scnrs, execOpts)
if err != nil {
return nil, err
}
e.metrics.totalQueries.Inc()

return &compatibilityQuery{
Query: &Query{exec: exec, opts: qOpts},
Query: &Query{exec: exec, opts: execOpts},
engine: e,
plan: optimizedPlan,
warns: warns,
Expand All @@ -381,37 +379,37 @@ func (e *Engine) MakeRangeQuery(ctx context.Context, q storage.Queryable, opts *
}, nil
}

func (e *Engine) MakeRangeQueryFromPlan(ctx context.Context, q storage.Queryable, opts *QueryOpts, root logicalplan.Node, start, end time.Time, step time.Duration) (promql.Query, error) {
func (e *Engine) MakeRangeQueryFromPlan(ctx context.Context, q storage.Queryable, opts *QueryOptions, root logicalplan.Node, start, end time.Time, step time.Duration) (promql.Query, error) {
idx, err := e.activeQueryTracker.Insert(ctx, root.String())
if err != nil {
return nil, err
}
defer e.activeQueryTracker.Delete(idx)

qOpts := e.makeQueryOpts(start, end, step, opts)
if qOpts.StepsBatch > 64 {
execOpts := e.makeExecutionOpts(start, end, step, opts)
if execOpts.StepsBatch > 64 {
return nil, ErrStepsBatchTooLarge
}
planOpts := logicalplan.PlanOptions{
DisableDuplicateLabelCheck: e.disableDuplicateLabelChecks,
}
lplan, warns := logicalplan.New(root, qOpts, planOpts).Optimize(e.getLogicalOptimizers(opts))
lplan, warns := logicalplan.New(root, execOpts, planOpts).Optimize(e.getLogicalOptimizers(opts))

scnrs, err := e.storageScanners(q, qOpts, lplan)
scnrs, err := e.storageScanners(q, execOpts, lplan)
if err != nil {
return nil, errors.Wrap(err, "creating storage scanners")
}

ctx = warnings.NewContext(ctx)
defer func() { warns.Merge(warnings.FromContext(ctx)) }()
exec, err := execution.New(ctx, lplan.Root(), scnrs, qOpts)
exec, err := execution.New(ctx, lplan.Root(), scnrs, execOpts)
if err != nil {
return nil, err
}
e.metrics.totalQueries.Inc()

return &compatibilityQuery{
Query: &Query{exec: exec, opts: qOpts},
Query: &Query{exec: exec, opts: execOpts},
engine: e,
plan: lplan,
warns: warns,
Expand All @@ -432,8 +430,8 @@ func (e *Engine) NewRangeQuery(ctx context.Context, q storage.Queryable, opts pr
return e.MakeRangeQuery(ctx, q, fromPromQLOpts(opts), qs, start, end, step)
}

func (e *Engine) makeQueryOpts(start time.Time, end time.Time, step time.Duration, opts *QueryOpts) *query.Options {
res := &query.Options{
func (e *Engine) makeExecutionOpts(start time.Time, end time.Time, step time.Duration, opts *QueryOptions) *execopts.Options {
res := &execopts.Options{
Start: start,
End: end,
Step: step,
Expand All @@ -449,11 +447,11 @@ func (e *Engine) makeQueryOpts(start time.Time, end time.Time, step time.Duratio
return res
}

if opts.LookbackDelta() > 0 {
res.LookbackDelta = opts.LookbackDelta()
if opts.LookbackDelta > 0 {
res.LookbackDelta = opts.LookbackDelta
}
if opts.EnablePerStepStats() {
res.EnablePerStepStats = opts.EnablePerStepStats()
if opts.EnablePerStepStats {
res.EnablePerStepStats = opts.EnablePerStepStats
}

if opts.DecodingConcurrency != 0 {
Expand All @@ -463,30 +461,30 @@ func (e *Engine) makeQueryOpts(start time.Time, end time.Time, step time.Duratio
return res
}

func (e *Engine) getLogicalOptimizers(opts *QueryOpts) []logicalplan.Optimizer {
func (e *Engine) getLogicalOptimizers(opts *QueryOptions) []logicalplan.Optimizer {
var optimizers []logicalplan.Optimizer
if len(opts.LogicalOptimizers) != 0 {
if opts != nil && len(opts.LogicalOptimizers) != 0 {
optimizers = slices.Clone(opts.LogicalOptimizers)
} else {
optimizers = slices.Clone(e.logicalOptimizers)
}
selectorBatchSize := e.selectorBatchSize
if opts.SelectorBatchSize != 0 {
if opts != nil && opts.SelectorBatchSize != 0 {
selectorBatchSize = opts.SelectorBatchSize
}
return append(optimizers, logicalplan.SelectorBatchSize{Size: selectorBatchSize})
}

func (e *Engine) storageScanners(queryable storage.Queryable, qOpts *query.Options, lplan logicalplan.Plan) (engstorage.Scanners, error) {
func (e *Engine) storageScanners(queryable storage.Queryable, execOpts *execopts.Options, lplan logicalplan.Plan) (engstorage.Scanners, error) {
if e.scanners == nil {
return promstorage.NewPrometheusScanners(queryable, qOpts, lplan)
return promstorage.NewPrometheusScanners(queryable, execOpts, lplan)
}
return e.scanners, nil
}

type Query struct {
exec model.VectorOperator
opts *query.Options
opts *execopts.Options
}

// Explain returns human-readable explanation of the created executor.
Expand Down
10 changes: 5 additions & 5 deletions engine/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ import (
"time"

"github.com/thanos-io/promql-engine/engine"
"github.com/thanos-io/promql-engine/execution/execopts"
"github.com/thanos-io/promql-engine/execution/model"
"github.com/thanos-io/promql-engine/extlabels"
"github.com/thanos-io/promql-engine/logicalplan"
"github.com/thanos-io/promql-engine/query"
"github.com/thanos-io/promql-engine/storage/prometheus"
"github.com/thanos-io/promql-engine/warnings"

Expand Down Expand Up @@ -2482,7 +2482,7 @@ type scannersWithWarns struct {
promScanners *prometheus.Scanners
}

func newScannersWithWarns(warn error, qOpts *query.Options, lplan logicalplan.Plan) (*scannersWithWarns, error) {
func newScannersWithWarns(warn error, qOpts *execopts.Options, lplan logicalplan.Plan) (*scannersWithWarns, error) {
scanners, err := prometheus.NewPrometheusScanners(&storage.MockQueryable{
MockQuerier: storage.NoopQuerier(),
}, qOpts, lplan)
Expand All @@ -2497,12 +2497,12 @@ func newScannersWithWarns(warn error, qOpts *query.Options, lplan logicalplan.Pl

func (s *scannersWithWarns) Close() error { return nil }

func (s scannersWithWarns) NewVectorSelector(ctx context.Context, opts *query.Options, hints storage.SelectHints, selector logicalplan.VectorSelector) (model.VectorOperator, error) {
func (s scannersWithWarns) NewVectorSelector(ctx context.Context, opts *execopts.Options, hints storage.SelectHints, selector logicalplan.VectorSelector) (model.VectorOperator, error) {
warnings.AddToContext(s.warn, ctx)
return s.promScanners.NewVectorSelector(ctx, opts, hints, selector)
}

func (s scannersWithWarns) NewMatrixSelector(ctx context.Context, opts *query.Options, hints storage.SelectHints, selector logicalplan.MatrixSelector, call logicalplan.FunctionCall) (model.VectorOperator, error) {
func (s scannersWithWarns) NewMatrixSelector(ctx context.Context, opts *execopts.Options, hints storage.SelectHints, selector logicalplan.MatrixSelector, call logicalplan.FunctionCall) (model.VectorOperator, error) {
warnings.AddToContext(s.warn, ctx)
return s.promScanners.NewMatrixSelector(ctx, opts, hints, selector, call)
}
Expand All @@ -2513,7 +2513,7 @@ func TestWarningsPlanCreation(t *testing.T) {
expectedWarn = errors.New("test warning")
)

scnrs, err := newScannersWithWarns(expectedWarn, &query.Options{}, nil)
scnrs, err := newScannersWithWarns(expectedWarn, &execopts.Options{}, nil)
testutil.Ok(t, err)
newEngine := engine.NewWithScanners(opts, scnrs)
q1, err := newEngine.NewRangeQuery(context.Background(), nil, nil, "http_requests_total", time.UnixMilli(0), time.UnixMilli(600), 30*time.Second)
Expand Down
4 changes: 2 additions & 2 deletions engine/projection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ func TestProjectionWithFuzz(t *testing.T) {
Queryable: storage,
}

normalQuery, err := normalEngine.NewInstantQuery(ctx, storage, &engine.QueryOpts{}, query, queryTime)
normalQuery, err := normalEngine.NewInstantQuery(ctx, storage, nil, query, queryTime)
testutil.Ok(t, err)
defer normalQuery.Close()
normalResult := normalQuery.Exec(ctx)
Expand All @@ -252,7 +252,7 @@ func TestProjectionWithFuzz(t *testing.T) {
}
testutil.Ok(t, normalResult.Err, "query: %s", query)

projectionQuery, err := projectionEngine.MakeInstantQuery(ctx, projectionStorage, &engine.QueryOpts{}, query, queryTime)
projectionQuery, err := projectionEngine.MakeInstantQuery(ctx, projectionStorage, nil, query, queryTime)
testutil.Ok(t, err)

defer projectionQuery.Close()
Expand Down
4 changes: 2 additions & 2 deletions engine/propagate_selector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func TestPropagateMatchers(t *testing.T) {
}

t.Run(fmt.Sprintf("Query_%d", i), func(t *testing.T) {
normalQuery, err := normalEngine.NewInstantQuery(ctx, storage, &engine.QueryOpts{}, query, queryTime)
normalQuery, err := normalEngine.NewInstantQuery(ctx, storage, nil, query, queryTime)
testutil.Ok(t, err)
defer normalQuery.Close()
normalResult := normalQuery.Exec(ctx)
Expand All @@ -130,7 +130,7 @@ func TestPropagateMatchers(t *testing.T) {
}
testutil.Ok(t, normalResult.Err, "query: %s", query)

optimizedQuery, err := optimizedEngine.MakeInstantQuery(ctx, storage, &engine.QueryOpts{}, query, queryTime)
optimizedQuery, err := optimizedEngine.MakeInstantQuery(ctx, storage, nil, query, queryTime)
testutil.Ok(t, err)

defer optimizedQuery.Close()
Expand Down
Loading
Loading