Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 0 additions & 40 deletions pkg/framework/interface/datalayer/pricing/cost_digest_test.go

This file was deleted.

3 changes: 2 additions & 1 deletion pkg/framework/plugins/datalayer/requestmetadata/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ type ModelMetrics struct {
Requests int64
AvgTTFT float64
AvgTPOT float64
LastObservedAt int64 // Unix nanoseconds of the last TTFT EMA update; 0 if never observed.
LastObservedAt int64
}

// Clone returns the receiver as a datalayer.Cloneable. The receiver is passed
// by value, so the returned value is a copy of the caller's ModelMetrics.
func (r ModelMetrics) Clone() datalayer.Cloneable {
	return r
}
Expand All @@ -119,6 +119,7 @@ func (s *modelIntervalAccumulator) flush(now time.Time, model string, alpha floa
}
if s.tpotN > 0 {
s.AvgTPOT = ema(s.AvgTPOT, s.tpotSum/float64(s.tpotN), alpha)
s.LastObservedAt = now.UnixNano()
metrics.RecordModelAvgTPOT(model, s.AvgTPOT)
}
s.intervalStart = now
Expand Down
71 changes: 62 additions & 9 deletions pkg/framework/plugins/modelselector/scorer/avgtpot/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ package avgtpot
import (
"context"
"encoding/json"
"fmt"
"math"
"time"

"sigs.k8s.io/controller-runtime/pkg/log"

Expand All @@ -29,28 +31,66 @@ import (
"github.com/llm-d/llm-d-inference-payload-processor/pkg/framework/interface/plugin"
"github.com/llm-d/llm-d-inference-payload-processor/pkg/framework/interface/requesthandling"
requestmetadata "github.com/llm-d/llm-d-inference-payload-processor/pkg/framework/plugins/datalayer/requestmetadata"
"github.com/llm-d/llm-d-inference-payload-processor/pkg/framework/plugins/modelselector/scorer/internal/decay"
)

const PluginType = "avg-tpot-scorer"
const (
	// PluginType is the plugin type name of the AvgTPOT scorer. NewAvgTPOTScorer
	// also uses it as the default instance name.
	PluginType = "avg-tpot-scorer"

	// defaultDecayWeight is the decay weight applied when the configuration
	// does not provide decayWeight.
	defaultDecayWeight = 1.0
	// defaultStalenessThreshold is the staleness threshold applied when the
	// configuration does not provide stalenessThreshold.
	defaultStalenessThreshold = 30 * time.Second
)

// compile-time interface assertion
var _ modelselector.Scorer = &AvgTPOTScorer{}

// AvgTPOTScorerConfig holds the scorer's JSON parameters as decoded by
// ScorerFactory. Both fields are optional.
type AvgTPOTScorerConfig struct {
	// DecayWeight scales the staleness decay in [0,1]; 0 disables. Default 1.0.
	// It is a pointer so that an omitted value (nil) can be told apart from an
	// explicit 0.
	DecayWeight *float64 `json:"decayWeight,omitempty"`
	// StalenessThreshold is the elapsed time for full staleness (e.g. "30s"). Default "30s".
	// The string is parsed with time.ParseDuration.
	StalenessThreshold string `json:"stalenessThreshold,omitempty"`
}

// AvgTPOTScorer scores models based on their exponential moving average TPOT.
// The model with the lowest AvgTPOT scores 1.0; the highest scores 0.0.
// Models with no observed TPOT yet (AvgTPOT == 0) are treated as idle and score 1.0.
// If all models have the same AvgTPOT, all score 1.0.
// Stale EMAs are decayed toward zero (see the decay package); set DecayWeight=0 to disable.
type AvgTPOTScorer struct {
	// typedName carries the plugin type and the instance name.
	typedName plugin.TypedName
	// decayCfg is the staleness-decay configuration passed to decay.Apply
	// when a model's AvgTPOT is read.
	decayCfg decay.Config
}

func ScorerFactory(name string, _ json.RawMessage, _ plugin.Handle) (plugin.Plugin, error) {
return NewAvgTPOTScorer().WithName(name), nil
// ScorerFactory builds an AvgTPOTScorer named name from its JSON parameters.
//
// parameters may be empty, in which case the defaults apply (decayWeight 1.0,
// stalenessThreshold "30s"). An error is returned when the JSON cannot be
// decoded, when decayWeight lies outside [0, 1], or when stalenessThreshold is
// not a valid, strictly positive duration.
func ScorerFactory(name string, parameters json.RawMessage, _ plugin.Handle) (plugin.Plugin, error) {
	// Seed the default so that an omitted stalenessThreshold still parses.
	config := AvgTPOTScorerConfig{
		StalenessThreshold: defaultStalenessThreshold.String(),
	}
	if len(parameters) > 0 {
		if err := json.Unmarshal(parameters, &config); err != nil {
			return nil, fmt.Errorf("failed to parse parameters for plugin %q: %w", name, err)
		}
	}

	// A nil DecayWeight means "not configured"; an explicit 0 is a valid value.
	weight := defaultDecayWeight
	if config.DecayWeight != nil {
		weight = *config.DecayWeight
	}
	if weight < 0 || weight > 1 {
		return nil, fmt.Errorf("invalid decayWeight %v for plugin %q: must be in [0, 1]", weight, name)
	}

	threshold, err := time.ParseDuration(config.StalenessThreshold)
	if err != nil {
		return nil, fmt.Errorf("invalid stalenessThreshold %q for plugin %q: %w", config.StalenessThreshold, name, err)
	}
	// time.ParseDuration accepts signed and zero durations ("-30s", "0s"); an
	// elapsed-time threshold only makes sense when it is strictly positive.
	if threshold <= 0 {
		return nil, fmt.Errorf("invalid stalenessThreshold %q for plugin %q: must be positive", config.StalenessThreshold, name)
	}

	return NewAvgTPOTScorer().
		WithName(name).
		WithDecay(decay.Config{Weight: weight, Threshold: threshold}), nil
}

// NewAvgTPOTScorer returns a scorer whose type and name are both PluginType
// and whose decay configuration uses the package defaults.
func NewAvgTPOTScorer() *AvgTPOTScorer {
	scorer := new(AvgTPOTScorer)
	scorer.typedName = plugin.TypedName{Type: PluginType, Name: PluginType}
	scorer.decayCfg = decay.Config{
		Weight:    defaultDecayWeight,
		Threshold: defaultStalenessThreshold,
	}
	return scorer
}

Expand All @@ -61,15 +101,21 @@ func (s *AvgTPOTScorer) WithName(name string) *AvgTPOTScorer {
return s
}

// Score returns a score in [0,1] for each model.
// Formula: score = (max - avgTPOT) / (max - min)
// WithDecay overrides the decay configuration.
// It replaces the scorer's decay configuration with cfg as given (no
// validation is performed here) and returns the same scorer so that calls can
// be chained.
func (s *AvgTPOTScorer) WithDecay(cfg decay.Config) *AvgTPOTScorer {
	s.decayCfg = cfg
	return s
}

// Score returns score = (max - avgTPOT)/(max - min) per model, with optional staleness decay.
func (s *AvgTPOTScorer) Score(ctx context.Context, _ *plugin.CycleState, _ *requesthandling.InferenceRequest, models []datalayer.Model) map[datalayer.Model]float64 {
now := time.Now()
tpots := make(map[datalayer.Model]float64, len(models))
minTPOT := math.MaxFloat64
maxTPOT := 0.0

for _, model := range models {
v := avgTPOT(model)
v := s.avgTPOT(model, now)
tpots[model] = v
if v > maxTPOT {
maxTPOT = v
Expand Down Expand Up @@ -97,8 +143,8 @@ func (s *AvgTPOTScorer) Score(ctx context.Context, _ *plugin.CycleState, _ *requ
return scores
}

// avgTPOT returns the AvgTPOT for a model, or 0 if not yet observed.
func avgTPOT(model datalayer.Model) float64 {
// avgTPOT returns the decay-adjusted AvgTPOT, or 0 if unobserved.
func (s *AvgTPOTScorer) avgTPOT(model datalayer.Model, now time.Time) float64 {
val, ok := model.GetAttributes().Get(requestmetadata.RequestMetadataAttributeKey)
if !ok {
return 0
Expand All @@ -107,5 +153,12 @@ func avgTPOT(model datalayer.Model) float64 {
if !ok {
return 0
}
return rc.AvgTPOT
if rc.AvgTPOT == 0 {
return 0
}
var lastObservedAt time.Time
if rc.LastObservedAt > 0 {
lastObservedAt = time.Unix(0, rc.LastObservedAt)
}
return decay.Apply(rc.AvgTPOT, lastObservedAt, rc.Requests, now, s.decayCfg)
}
138 changes: 138 additions & 0 deletions pkg/framework/plugins/modelselector/scorer/avgtpot/plugin_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/*
Copyright 2026 The llm-d Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package avgtpot

import (
"context"
"testing"
"time"

fwdatalayer "github.com/llm-d/llm-d-inference-payload-processor/pkg/framework/interface/datalayer"
requestmetadata "github.com/llm-d/llm-d-inference-payload-processor/pkg/framework/plugins/datalayer/requestmetadata"
"github.com/llm-d/llm-d-inference-payload-processor/pkg/framework/plugins/modelselector/scorer/internal/decay"
)

// modelWithAvgTPOT builds a model named name whose request-metadata attribute
// sets only AvgTPOT; Requests and LastObservedAt keep their zero values.
func modelWithAvgTPOT(name string, avgTPOT float64) fwdatalayer.Model {
	metrics := requestmetadata.ModelMetrics{AvgTPOT: avgTPOT}

	m := fwdatalayer.NewModel(name)
	m.GetAttributes().Put(requestmetadata.RequestMetadataAttributeKey, metrics)
	return m
}

// modelWithNoAttribute builds a model named name and stores no request-metadata
// attribute on it.
func modelWithNoAttribute(name string) fwdatalayer.Model {
	bare := fwdatalayer.NewModel(name)
	return bare
}

// modelWithMetrics builds a model named name with a fixed AvgTPOT of 0.1, the
// given request count, and LastObservedAt set to lastObservedAt expressed in
// Unix nanoseconds.
func modelWithMetrics(name string, requests int64, lastObservedAt time.Time) fwdatalayer.Model {
	metrics := requestmetadata.ModelMetrics{
		AvgTPOT:        0.1,
		Requests:       requests,
		LastObservedAt: lastObservedAt.UnixNano(),
	}

	m := fwdatalayer.NewModel(name)
	m.GetAttributes().Put(requestmetadata.RequestMetadataAttributeKey, metrics)
	return m
}

// TestAvgTPOTScorer checks the min/max normalisation of AvgTPOT with the
// default scorer. Every model passed to Score must appear in the returned map
// with the expected score, listed in the same order as the models.
func TestAvgTPOTScorer(t *testing.T) {
	scorer := NewAvgTPOTScorer()

	tests := []struct {
		name           string
		models         []fwdatalayer.Model
		expectedScores []float64
	}{
		{
			name: "lower TPOT gets higher score",
			models: []fwdatalayer.Model{
				modelWithAvgTPOT("fast", 0.02),
				modelWithAvgTPOT("slow", 0.1),
			},
			expectedScores: []float64{1.0, 0.0},
		},
		{
			name: "equal TPOT — all score 1.0",
			models: []fwdatalayer.Model{
				modelWithAvgTPOT("m1", 0.05),
				modelWithAvgTPOT("m2", 0.05),
			},
			expectedScores: []float64{1.0, 1.0},
		},
		{
			name: "no attribute scores optimistically (treated as 0)",
			models: []fwdatalayer.Model{
				modelWithAvgTPOT("observed", 0.05),
				modelWithNoAttribute("unobserved"),
			},
			expectedScores: []float64{0.0, 1.0},
		},
		{
			name: "three models — intermediate score is normalised",
			// min=0.25, max=0.75; middle=0.5 → (0.75-0.5)/(0.75-0.25) = 0.5
			// 0.25, 0.5, 0.75 are exact in float64 so the comparison is safe without epsilon.
			models: []fwdatalayer.Model{
				modelWithAvgTPOT("fast", 0.25),
				modelWithAvgTPOT("mid", 0.5),
				modelWithAvgTPOT("slow", 0.75),
			},
			expectedScores: []float64{1.0, 0.5, 0.0},
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			scores := scorer.Score(context.Background(), nil, nil, tt.models)
			for i, model := range tt.models {
				// A plain scores[model] yields 0 for a missing key, which would
				// silently satisfy every case that expects 0.0.
				got, ok := scores[model]
				if !ok {
					t.Errorf("model[%d] %q: no score returned", i, model.GetName())
					continue
				}
				want := tt.expectedScores[i]
				if got != want {
					t.Errorf("model[%d] %q: expected score %f, got %f", i, model.GetName(), want, got)
				}
			}
		})
	}
}

// TestStalenessDecay verifies that, with the default decay configuration, a
// model whose last observation is older than the staleness threshold and that
// has no requests ends up with the top score.
func TestStalenessDecay(t *testing.T) {
	scorer := NewAvgTPOTScorer()

	// Observed 60s ago (twice the 30s default threshold) with Requests=0. The
	// scenario expects the decay to be total, i.e. an effective TPOT of 0.
	observedAt := time.Now().Add(-60 * time.Second)
	stale := modelWithMetrics("stale", 0, observedAt)
	other := modelWithAvgTPOT("other", 0.05)
	candidates := []fwdatalayer.Model{stale, other}

	scores := scorer.Score(context.Background(), nil, nil, candidates)
	if got := scores[stale]; got != 1.0 {
		t.Errorf("fully stale idle model: expected score 1.0, got %f", got)
	}
}

// TestDecayDisabled verifies DecayWeight=0 ignores staleness entirely: the
// stale model keeps its raw AvgTPOT (0.1), which is the highest of the two and
// therefore scores 0.0, while the other model (0.05) scores 1.0.
func TestDecayDisabled(t *testing.T) {
	scorer := NewAvgTPOTScorer().WithDecay(decay.Config{Weight: 0, Threshold: 30 * time.Second})
	now := time.Now()

	stale := modelWithMetrics("stale", 0, now.Add(-60*time.Second))
	other := modelWithAvgTPOT("other", 0.05)
	scores := scorer.Score(context.Background(), nil, nil, []fwdatalayer.Model{stale, other})

	// The expected score is 0.0, which is also what a map lookup yields for a
	// missing key; require the entry to exist so a dropped model cannot pass.
	staleScore, ok := scores[stale]
	if !ok {
		t.Fatalf("decay-disabled stale model: no score returned")
	}
	if staleScore != 0.0 {
		t.Errorf("decay-disabled stale model: expected score 0.0 (raw EMA), got %f", staleScore)
	}
	if scores[other] != 1.0 {
		t.Errorf("decay-disabled other model: expected score 1.0, got %f", scores[other])
	}
}
Loading
Loading