Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 63 additions & 0 deletions compute/aggregators.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import (
"math"
"simd/archsimd"

Check failure on line 8 in compute/aggregators.go

View workflow job for this annotation

GitHub Actions / Linters (Static Analysis) for Go

could not import simd/archsimd (invalid package name: "")

Check failure on line 8 in compute/aggregators.go

View workflow job for this annotation

GitHub Actions / Linters (Static Analysis) for Go

could not import simd/archsimd (invalid package name: "")
"sort"

"github.com/thanos-io/promql-engine/warnings"
Expand Down Expand Up @@ -204,6 +205,68 @@
warn warnings.Warnings
}

func maxLikeAddFloatSIMD(vs []float64) float64 {
m := vs[0]
if len(vs) == 1 {
return m
}

// Float64x8 needs AVX-512. Use scalar fallback otherwise.
if !archsimd.X86.AVX512() {
for _, v := range vs[1:] {
if m < v || math.IsNaN(m) {
m = v
}
}
return m
}

vmax := archsimd.BroadcastFloat64x8(m)

i := 1
for ; i+8 <= len(vs); i += 8 {
v := archsimd.LoadFloat64x8Slice(vs[i:])

// replace if (v > vmax) OR (vmax is NaN)
replace := v.Greater(vmax).Or(vmax.IsNaN())

// Merge semantics are inverted (mask=false picks 2nd arg),
// so to pick v when replace=true:
vmax = v.Merge(vmax, replace)
}

// reduce lanes back to scalar with the same rule
var lanes [8]float64
vmax.Store(&lanes)
for _, v := range lanes {
if m < v || math.IsNaN(m) {
m = v
}
}

// tail
for ; i < len(vs); i++ {
v := vs[i]
if m < v || math.IsNaN(m) {
m = v
}
}

return m
}

func (c *MaxAcc) AddVectorSIMD(vs []float64, hs []*histogram.FloatHistogram) error {
if len(hs) > 0 {
c.warn |= warnings.WarnHistogramIgnoredInAggregation
}
if len(vs) == 0 {
return nil
}

c.addFloat(maxLikeAddFloatSIMD(vs))
return nil
}

func (c *MaxAcc) AddVector(vs []float64, hs []*histogram.FloatHistogram) error {
if len(hs) > 0 {
c.warn |= warnings.WarnHistogramIgnoredInAggregation
Expand Down
131 changes: 131 additions & 0 deletions compute/aggregators_bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package compute

import (
"math"
"testing"
)

func TestNewMaxAcc(t *testing.T) {
t.Run("empty accumulator has NoValue", func(t *testing.T) {
acc := NewMaxAcc()
if got := acc.ValueType(); got != NoValue {
t.Errorf("ValueType() = %v, want NoValue", got)
}
val, _ := acc.Value()
if val != 0 {
t.Errorf("Value() = %v, want 0", val)
}
})

t.Run("single Add sets max", func(t *testing.T) {
acc := NewMaxAcc()
_ = acc.Add(42, nil)
val, _ := acc.Value()
if val != 42 {
t.Errorf("Value() = %v, want 42", val)
}
if acc.ValueType() != SingleTypeValue {
t.Errorf("ValueType() = %v, want SingleTypeValue", acc.ValueType())
}
})

t.Run("multiple Add keeps max", func(t *testing.T) {
acc := NewMaxAcc()
for _, v := range []float64{3, 7, 1, 9, 2} {
_ = acc.Add(v, nil)
}
val, _ := acc.Value()
if val != 9 {
t.Errorf("Value() = %v, want 9", val)
}
})

t.Run("AddVector takes max of slice", func(t *testing.T) {
acc := NewMaxAcc()
vec := []float64{10, 5, 20, 15}
_ = acc.AddVector(vec, nil)
val, _ := acc.Value()
if val != 20 {
t.Errorf("Value() = %v, want 20", val)
}
})

t.Run("Add and AddVector combined", func(t *testing.T) {
acc := NewMaxAcc()
_ = acc.Add(100, nil)
_ = acc.AddVector([]float64{50, 80, 120}, nil)
val, _ := acc.Value()
if val != 120 {
t.Errorf("Value() = %v, want 120", val)
}
})

t.Run("AddVectorSIMD takes max of slice", func(t *testing.T) {
acc := NewMaxAcc()
vec := []float64{10, 5, 20, 15}
_ = acc.AddVectorSIMD(vec, nil)
val, _ := acc.Value()
if val != 20 {
t.Errorf("Value() = %v, want 20", val)
}
})

t.Run("Add and AddVectorSIMD combined", func(t *testing.T) {
acc := NewMaxAcc()
_ = acc.Add(100, nil)
_ = acc.AddVectorSIMD([]float64{50, 80, 120}, nil)
val, _ := acc.Value()
if val != 120 {
t.Errorf("Value() = %v, want 120", val)
}
})

t.Run("NaN is replaced by real number", func(t *testing.T) {
acc := NewMaxAcc()
_ = acc.Add(math.NaN(), nil)
_ = acc.Add(5, nil)
val, _ := acc.Value()
if val != 5 {
t.Errorf("Value() = %v, want 5", val)
}
})

t.Run("Reset clears state", func(t *testing.T) {
acc := NewMaxAcc()
_ = acc.Add(99, nil)
acc.Reset(0)
if acc.ValueType() != NoValue {
t.Errorf("after Reset, ValueType() = %v, want NoValue", acc.ValueType())
}
val, _ := acc.Value()
if val != 0 {
t.Errorf("after Reset, Value() = %v, want 0", val)
}
})
}

func BenchmarkMaxAcc_AddVector(b *testing.B) {
acc := NewMaxAcc()
vec := make([]float64, 100)
for i := range vec {
vec[i] = float64(i)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
_ = acc.AddVector(vec, nil)
_, _ = acc.Value()
}
}

func BenchmarkMaxAcc_AddVectorSIMD(b *testing.B) {
acc := NewMaxAcc()
vec := make([]float64, 100)
for i := range vec {
vec[i] = float64(i)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
_ = acc.AddVectorSIMD(vec, nil)
_, _ = acc.Value()
}
}
4 changes: 1 addition & 3 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
module github.com/thanos-io/promql-engine

go 1.24.0

toolchain go1.24.4
go 1.26.0

require (
github.com/cespare/xxhash/v2 v2.3.0
Expand Down
Loading