From a2866c3a5329c70e4d2cfda0948e72ba5473b596 Mon Sep 17 00:00:00 2001 From: Mohammad Date: Sun, 7 Jun 2026 12:51:00 +0300 Subject: [PATCH 1/8] Add avg TPOT scorer. Signed-off-by: Mohammad --- cmd/runner/runner.go | 2 + .../modelselector/scorer/avgtpot/plugin.go | 111 ++++++++++++++++++ 2 files changed, 113 insertions(+) create mode 100644 pkg/framework/plugins/modelselector/scorer/avgtpot/plugin.go diff --git a/cmd/runner/runner.go b/cmd/runner/runner.go index a7462e5..693299e 100644 --- a/cmd/runner/runner.go +++ b/cmd/runner/runner.go @@ -50,6 +50,7 @@ import ( "github.com/llm-d/llm-d-inference-payload-processor/pkg/framework/plugins/modelselector/picker/maxscore" "github.com/llm-d/llm-d-inference-payload-processor/pkg/framework/plugins/modelselector/picker/random" "github.com/llm-d/llm-d-inference-payload-processor/pkg/framework/plugins/modelselector/picker/weightedrandom" + avgtpotscorer "github.com/llm-d/llm-d-inference-payload-processor/pkg/framework/plugins/modelselector/scorer/avgtpot" inflightrequestsscorer "github.com/llm-d/llm-d-inference-payload-processor/pkg/framework/plugins/modelselector/scorer/inflightrequests" "github.com/llm-d/llm-d-inference-payload-processor/pkg/framework/plugins/requesthandling/basemodelextractor" "github.com/llm-d/llm-d-inference-payload-processor/pkg/framework/plugins/requesthandling/bodyfieldtoheader" @@ -281,6 +282,7 @@ func (r *Runner) registerInTreePlugins() { plugin.Register(maxscore.MaxScorePickerType, maxscore.MaxScorePickerFactory) plugin.Register(weightedrandom.WeightedRandomPickerType, weightedrandom.WeightedRandomPickerFactory) plugin.Register(modelselectorplugin.ModelSelectorPluginType, modelselectorplugin.ModelSelectorPluginFactory) + plugin.Register(avgtpotscorer.PluginType, avgtpotscorer.ScorerFactory) plugin.Register(inflightrequestsscorer.PluginType, inflightrequestsscorer.ScorerFactory) } diff --git a/pkg/framework/plugins/modelselector/scorer/avgtpot/plugin.go b/pkg/framework/plugins/modelselector/scorer/avgtpot/plugin.go new file mode 100644 index 0000000..98ebbac --- /dev/null +++ b/pkg/framework/plugins/modelselector/scorer/avgtpot/plugin.go @@ -0,0 +1,111 @@ +/* +Copyright 2026 The llm-d Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package avgtpot + +import ( + "context" + "encoding/json" + "math" + + "sigs.k8s.io/controller-runtime/pkg/log" + + logutil "github.com/llm-d/llm-d-inference-payload-processor/pkg/common/observability/logging" + "github.com/llm-d/llm-d-inference-payload-processor/pkg/framework/interface/datalayer" + "github.com/llm-d/llm-d-inference-payload-processor/pkg/framework/interface/modelselector" + "github.com/llm-d/llm-d-inference-payload-processor/pkg/framework/interface/plugin" + "github.com/llm-d/llm-d-inference-payload-processor/pkg/framework/interface/requesthandling" + requestmetadata "github.com/llm-d/llm-d-inference-payload-processor/pkg/framework/plugins/datalayer/requestmetadata" +) + +const PluginType = "avg-tpot-scorer" + +// compile-time interface assertion +var _ modelselector.Scorer = &AvgTPOTScorer{} + +// AvgTPOTScorer scores models based on their exponential moving average TPOT. +// The model with the lowest AvgTPOT scores 1.0; the highest scores 0.0. +// Models with no observed TPOT yet (AvgTPOT == 0) are treated as idle and score 1.0. +// If all models have the same AvgTPOT, all score 1.0. +type AvgTPOTScorer struct { + typedName plugin.TypedName +} + +func ScorerFactory(name string, _ json.RawMessage, _ plugin.Handle) (plugin.Plugin, error) { + return NewAvgTPOTScorer().WithName(name), nil +} + +func NewAvgTPOTScorer() *AvgTPOTScorer { + return &AvgTPOTScorer{ + typedName: plugin.TypedName{Type: PluginType, Name: PluginType}, + } +} + +func (s *AvgTPOTScorer) TypedName() plugin.TypedName { return s.typedName } + +func (s *AvgTPOTScorer) WithName(name string) *AvgTPOTScorer { + s.typedName.Name = name + return s +} + +// Score returns a score in [0,1] for each model. +// Formula: score = (max - avgTPOT) / (max - min) +func (s *AvgTPOTScorer) Score(ctx context.Context, _ *plugin.CycleState, _ *requesthandling.InferenceRequest, models []datalayer.Model) map[datalayer.Model]float64 { + tpots := make(map[datalayer.Model]float64, len(models)) + minTPOT := math.MaxFloat64 + maxTPOT := 0.0 + + for _, model := range models { + v := avgTPOT(model) + tpots[model] = v + if v > maxTPOT { + maxTPOT = v + } + if v < minTPOT { + minTPOT = v + } + } + + scores := make(map[datalayer.Model]float64, len(models)) + for _, model := range models { + if maxTPOT == minTPOT { + scores[model] = 1.0 + } else { + scores[model] = (maxTPOT - tpots[model]) / (maxTPOT - minTPOT) + } + } + + if debugLogger := log.FromContext(ctx).V(logutil.DEBUG); debugLogger.Enabled() { + for _, model := range models { + debugLogger.Info("avg-tpot score", "model", model.GetName(), "avgTPOT", tpots[model], "score", scores[model]) + } + } + + return scores +} + +// avgTPOT returns the AvgTPOT for a model, or 0 if not yet observed. +func avgTPOT(model datalayer.Model) float64 { + val, ok := model.GetAttributes().Get(requestmetadata.RequestMetadataAttributeKey) + if !ok { + return 0 + } + rc, ok := val.(requestmetadata.ModelMetrics) + if !ok { + return 0 + } + return rc.AvgTPOT +} From f6749aab937ef371c5908aa5f2da5d825c5c0b4d Mon Sep 17 00:00:00 2001 From: Mohammad Nassar Date: Thu, 11 Jun 2026 09:23:55 +0300 Subject: [PATCH 2/8] Add configurable staleness decay shared by TTFT and TPOT scorers. Signed-off-by: Mohammad Nassar --- .../datalayer/requestmetadata/plugin.go | 3 +- .../modelselector/scorer/avgtpot/plugin.go | 72 ++++++++++++++--- .../modelselector/scorer/avgttft/plugin.go | 78 ++++++++++++------- .../scorer/avgttft/plugin_test.go | 18 +++++ .../scorer/internal/decay/decay.go | 46 +++++++++++ 5 files changed, 180 insertions(+), 37 deletions(-) create mode 100644 pkg/framework/plugins/modelselector/scorer/internal/decay/decay.go diff --git a/pkg/framework/plugins/datalayer/requestmetadata/plugin.go b/pkg/framework/plugins/datalayer/requestmetadata/plugin.go index c6abe8d..e121667 100644 --- a/pkg/framework/plugins/datalayer/requestmetadata/plugin.go +++ b/pkg/framework/plugins/datalayer/requestmetadata/plugin.go @@ -93,7 +93,7 @@ type ModelMetrics struct { Requests int64 AvgTTFT float64 AvgTPOT float64 - LastObservedAt int64 // Unix nanoseconds of the last TTFT EMA update; 0 if never observed. + LastObservedAt int64 } func (r ModelMetrics) Clone() datalayer.Cloneable { return r } @@ -119,6 +119,7 @@ func (s *modelIntervalAccumulator) flush(now time.Time, model string, alpha floa } if s.tpotN > 0 { s.AvgTPOT = ema(s.AvgTPOT, s.tpotSum/float64(s.tpotN), alpha) + s.LastObservedAt = now.UnixNano() metrics.RecordModelAvgTPOT(model, s.AvgTPOT) } s.intervalStart = now diff --git a/pkg/framework/plugins/modelselector/scorer/avgtpot/plugin.go b/pkg/framework/plugins/modelselector/scorer/avgtpot/plugin.go index 98ebbac..1abb5f0 100644 --- a/pkg/framework/plugins/modelselector/scorer/avgtpot/plugin.go +++ b/pkg/framework/plugins/modelselector/scorer/avgtpot/plugin.go @@ -19,7 +19,9 @@ package avgtpot import ( "context" "encoding/json" + "fmt" "math" + "time" "sigs.k8s.io/controller-runtime/pkg/log" @@ -29,28 +31,67 @@ import ( "github.com/llm-d/llm-d-inference-payload-processor/pkg/framework/interface/plugin" "github.com/llm-d/llm-d-inference-payload-processor/pkg/framework/interface/requesthandling" requestmetadata "github.com/llm-d/llm-d-inference-payload-processor/pkg/framework/plugins/datalayer/requestmetadata" + "github.com/llm-d/llm-d-inference-payload-processor/pkg/framework/plugins/modelselector/scorer/internal/decay" ) -const PluginType = "avg-tpot-scorer" +const ( + PluginType = "avg-tpot-scorer" + + // defaultDecayWeight = 0 keeps the scorer's behavior at raw AvgTPOT (no decay). + defaultDecayWeight = 0.0 + defaultStalenessThreshold = 30 * time.Second +) // compile-time interface assertion var _ modelselector.Scorer = &AvgTPOTScorer{} +// AvgTPOTScorerConfig holds the scorer's JSON parameters. +type AvgTPOTScorerConfig struct { + // DecayWeight scales the staleness decay in [0,1]; default 0 (disabled). + DecayWeight *float64 `json:"decayWeight,omitempty"` + // StalenessThreshold is the elapsed time for full staleness (e.g. "30s"). Default "30s". + StalenessThreshold string `json:"stalenessThreshold,omitempty"` +} + // AvgTPOTScorer scores models based on their exponential moving average TPOT. // The model with the lowest AvgTPOT scores 1.0; the highest scores 0.0. // Models with no observed TPOT yet (AvgTPOT == 0) are treated as idle and score 1.0. // If all models have the same AvgTPOT, all score 1.0. +// Stale EMAs can be decayed toward zero (see the decay package); off by default — set DecayWeight > 0 to enable. type AvgTPOTScorer struct { typedName plugin.TypedName + decayCfg decay.Config } -func ScorerFactory(name string, _ json.RawMessage, _ plugin.Handle) (plugin.Plugin, error) { - return NewAvgTPOTScorer().WithName(name), nil +func ScorerFactory(name string, parameters json.RawMessage, _ plugin.Handle) (plugin.Plugin, error) { + config := AvgTPOTScorerConfig{ + StalenessThreshold: defaultStalenessThreshold.String(), + } + if len(parameters) > 0 { + if err := json.Unmarshal(parameters, &config); err != nil { + return nil, fmt.Errorf("failed to parse parameters for plugin %q: %w", name, err) + } + } + weight := defaultDecayWeight + if config.DecayWeight != nil { + weight = *config.DecayWeight + } + if weight < 0 || weight > 1 { + return nil, fmt.Errorf("invalid decayWeight %v for plugin %q: must be in [0, 1]", weight, name) + } + threshold, err := time.ParseDuration(config.StalenessThreshold) + if err != nil { + return nil, fmt.Errorf("invalid stalenessThreshold %q for plugin %q: %w", config.StalenessThreshold, name, err) + } + return NewAvgTPOTScorer(). + WithName(name). + WithDecay(decay.Config{Weight: weight, Threshold: threshold}), nil } func NewAvgTPOTScorer() *AvgTPOTScorer { return &AvgTPOTScorer{ typedName: plugin.TypedName{Type: PluginType, Name: PluginType}, + decayCfg: decay.Config{Weight: defaultDecayWeight, Threshold: defaultStalenessThreshold}, } } @@ -61,15 +102,21 @@ func (s *AvgTPOTScorer) WithName(name string) *AvgTPOTScorer { return s } -// Score returns a score in [0,1] for each model. -// Formula: score = (max - avgTPOT) / (max - min) +// WithDecay overrides the decay configuration. +func (s *AvgTPOTScorer) WithDecay(cfg decay.Config) *AvgTPOTScorer { + s.decayCfg = cfg + return s +} + +// Score returns score = (max - avgTPOT)/(max - min) per model, with optional staleness decay. func (s *AvgTPOTScorer) Score(ctx context.Context, _ *plugin.CycleState, _ *requesthandling.InferenceRequest, models []datalayer.Model) map[datalayer.Model]float64 { + now := time.Now() tpots := make(map[datalayer.Model]float64, len(models)) minTPOT := math.MaxFloat64 maxTPOT := 0.0 for _, model := range models { - v := avgTPOT(model) + v := s.avgTPOT(model, now) tpots[model] = v if v > maxTPOT { maxTPOT = v @@ -97,8 +144,8 @@ func (s *AvgTPOTScorer) Score(ctx context.Context, _ *plugin.CycleState, _ *requ return scores } -// avgTPOT returns the AvgTPOT for a model, or 0 if not yet observed. -func avgTPOT(model datalayer.Model) float64 { +// avgTPOT returns the decay-adjusted AvgTPOT, or 0 if unobserved. +func (s *AvgTPOTScorer) avgTPOT(model datalayer.Model, now time.Time) float64 { val, ok := model.GetAttributes().Get(requestmetadata.RequestMetadataAttributeKey) if !ok { return 0 @@ -107,5 +154,12 @@ func avgTPOT(model datalayer.Model) float64 { if !ok { return 0 } - return rc.AvgTPOT + if rc.AvgTPOT == 0 { + return 0 + } + var lastObservedAt time.Time + if rc.LastObservedAt > 0 { + lastObservedAt = time.Unix(0, rc.LastObservedAt) + } + return decay.Apply(rc.AvgTPOT, lastObservedAt, rc.Requests, now, s.decayCfg) } diff --git a/pkg/framework/plugins/modelselector/scorer/avgttft/plugin.go b/pkg/framework/plugins/modelselector/scorer/avgttft/plugin.go index 3bfb92d..a19a08e 100644 --- a/pkg/framework/plugins/modelselector/scorer/avgttft/plugin.go +++ b/pkg/framework/plugins/modelselector/scorer/avgttft/plugin.go @@ -19,6 +19,7 @@ package avgttft import ( "context" "encoding/json" + "fmt" "math" "time" @@ -30,33 +31,66 @@ import ( "github.com/llm-d/llm-d-inference-payload-processor/pkg/framework/interface/plugin" "github.com/llm-d/llm-d-inference-payload-processor/pkg/framework/interface/requesthandling" requestmetadata "github.com/llm-d/llm-d-inference-payload-processor/pkg/framework/plugins/datalayer/requestmetadata" + "github.com/llm-d/llm-d-inference-payload-processor/pkg/framework/plugins/modelselector/scorer/internal/decay" ) const ( PluginType = "avg-ttft-scorer" - // stalenessThreshold is the duration after which an unupdated AvgTTFT EMA is considered stale. - stalenessThreshold = 30 * time.Second + defaultDecayWeight = 1.0 + defaultStalenessThreshold = 30 * time.Second ) // compile-time interface assertion var _ modelselector.Scorer = &AvgTTFTScorer{} +// AvgTTFTScorerConfig holds the scorer's JSON parameters. +type AvgTTFTScorerConfig struct { + // DecayWeight scales the staleness decay in [0,1]; 0 disables. Default 1.0. + DecayWeight *float64 `json:"decayWeight,omitempty"` + // StalenessThreshold is the elapsed time for full staleness (e.g. "30s"). Default "30s". + StalenessThreshold string `json:"stalenessThreshold,omitempty"` +} + // AvgTTFTScorer scores models based on their exponential moving average TTFT. // The model with the lowest AvgTTFT scores 1.0; the highest scores 0.0. // Models with no observed TTFT yet (AvgTTFT == 0) are treated as idle and score 1.0. // If all models have the same AvgTTFT, all score 1.0. +// Stale EMAs are decayed toward zero (see the decay package); set DecayWeight=0 to disable. type AvgTTFTScorer struct { typedName plugin.TypedName + decayCfg decay.Config } -func ScorerFactory(name string, _ json.RawMessage, _ plugin.Handle) (plugin.Plugin, error) { - return NewAvgTTFTScorer().WithName(name), nil +func ScorerFactory(name string, parameters json.RawMessage, _ plugin.Handle) (plugin.Plugin, error) { + config := AvgTTFTScorerConfig{ + StalenessThreshold: defaultStalenessThreshold.String(), + } + if len(parameters) > 0 { + if err := json.Unmarshal(parameters, &config); err != nil { + return nil, fmt.Errorf("failed to parse parameters for plugin %q: %w", name, err) + } + } + weight := defaultDecayWeight + if config.DecayWeight != nil { + weight = *config.DecayWeight + } + if weight < 0 || weight > 1 { + return nil, fmt.Errorf("invalid decayWeight %v for plugin %q: must be in [0, 1]", weight, name) + } + threshold, err := time.ParseDuration(config.StalenessThreshold) + if err != nil { + return nil, fmt.Errorf("invalid stalenessThreshold %q for plugin %q: %w", config.StalenessThreshold, name, err) + } + return NewAvgTTFTScorer(). + WithName(name). + WithDecay(decay.Config{Weight: weight, Threshold: threshold}), nil } func NewAvgTTFTScorer() *AvgTTFTScorer { return &AvgTTFTScorer{ typedName: plugin.TypedName{Type: PluginType, Name: PluginType}, + decayCfg: decay.Config{Weight: defaultDecayWeight, Threshold: defaultStalenessThreshold}, } } @@ -67,11 +101,13 @@ func (s *AvgTTFTScorer) WithName(name string) *AvgTTFTScorer { return s } -// Score returns a score in [0,1] for each model. -// Formula: score = (max - avgTTFT) / (max - min) -// avgTTFT applies a staleness decay to AvgTTFT when the EMA has not been -// updated recently and the model has few in-flight requests, allowing models -// that have recovered from saturation to regain a competitive score. +// WithDecay overrides the decay configuration. +func (s *AvgTTFTScorer) WithDecay(cfg decay.Config) *AvgTTFTScorer { + s.decayCfg = cfg + return s +} + +// Score returns score = (max - avgTTFT)/(max - min) per model, with optional staleness decay. func (s *AvgTTFTScorer) Score(ctx context.Context, _ *plugin.CycleState, _ *requesthandling.InferenceRequest, models []datalayer.Model) map[datalayer.Model]float64 { now := time.Now() ttfts := make(map[datalayer.Model]float64, len(models)) @@ -79,7 +115,7 @@ func (s *AvgTTFTScorer) Score(ctx context.Context, _ *plugin.CycleState, _ *requ maxTTFT := 0.0 for _, model := range models { - v := avgTTFT(model, now) + v := s.avgTTFT(model, now) ttfts[model] = v if v > maxTTFT { maxTTFT = v @@ -107,16 +143,8 @@ func (s *AvgTTFTScorer) Score(ctx context.Context, _ *plugin.CycleState, _ *requ return scores } -// avgTTFT returns a decay-adjusted AvgTTFT for scoring. -// When the EMA is stale and the model has few in-flight requests, the returned -// value is smoothly reduced toward 0 so the model regains a competitive score. -// -// decay = staleness × idleness -// result = AvgTTFT × (1 - decay) -// -// staleness = min(elapsed / stalenessThreshold, 1.0), grows to 1 over threshold -// idleness = 1 / (1 + Requests), 1.0 when idle, drops under load -func avgTTFT(model datalayer.Model, now time.Time) float64 { +// avgTTFT returns the decay-adjusted AvgTTFT, or 0 if unobserved. +func (s *AvgTTFTScorer) avgTTFT(model datalayer.Model, now time.Time) float64 { val, ok := model.GetAttributes().Get(requestmetadata.RequestMetadataAttributeKey) if !ok { return 0 @@ -128,13 +156,9 @@ func avgTTFT(model datalayer.Model, now time.Time) float64 { if rc.AvgTTFT == 0 { return 0 } - - staleness := 0.0 + var lastObservedAt time.Time if rc.LastObservedAt > 0 { - elapsed := now.Sub(time.Unix(0, rc.LastObservedAt)) - staleness = math.Min(float64(elapsed)/float64(stalenessThreshold), 1.0) + lastObservedAt = time.Unix(0, rc.LastObservedAt) } - idleness := 1.0 / (1.0 + float64(rc.Requests)) - decay := staleness * idleness - return rc.AvgTTFT * (1 - decay) + return decay.Apply(rc.AvgTTFT, lastObservedAt, rc.Requests, now, s.decayCfg) } diff --git a/pkg/framework/plugins/modelselector/scorer/avgttft/plugin_test.go b/pkg/framework/plugins/modelselector/scorer/avgttft/plugin_test.go index 708852f..7ed90d5 100644 --- a/pkg/framework/plugins/modelselector/scorer/avgttft/plugin_test.go +++ b/pkg/framework/plugins/modelselector/scorer/avgttft/plugin_test.go @@ -23,6 +23,7 @@ import ( fwdatalayer "github.com/llm-d/llm-d-inference-payload-processor/pkg/framework/interface/datalayer" requestmetadata "github.com/llm-d/llm-d-inference-payload-processor/pkg/framework/plugins/datalayer/requestmetadata" + "github.com/llm-d/llm-d-inference-payload-processor/pkg/framework/plugins/modelselector/scorer/internal/decay" ) func modelWithAvgTTFT(name string, avgTTFT float64) fwdatalayer.Model { @@ -142,3 +143,20 @@ func TestStalenessDecay(t *testing.T) { } }) } + +// TestDecayDisabled verifies DecayWeight=0 ignores staleness entirely. +func TestDecayDisabled(t *testing.T) { + scorer := NewAvgTTFTScorer().WithDecay(decay.Config{Weight: 0, Threshold: 30 * time.Second}) + now := time.Now() + + // With decay off, the stale model keeps its raw TTFT and scores 0.0 (not 1.0). + stale := modelWithMetrics("stale", 1.0, 0, now.Add(-60*time.Second)) + other := modelWithAvgTTFT("other", 0.5) + scores := scorer.Score(context.Background(), nil, nil, []fwdatalayer.Model{stale, other}) + if scores[stale] != 0.0 { + t.Errorf("decay-disabled stale model: expected score 0.0 (raw EMA), got %f", scores[stale]) + } + if scores[other] != 1.0 { + t.Errorf("decay-disabled other model: expected score 1.0, got %f", scores[other]) + } +} diff --git a/pkg/framework/plugins/modelselector/scorer/internal/decay/decay.go b/pkg/framework/plugins/modelselector/scorer/internal/decay/decay.go new file mode 100644 index 0000000..f341433 --- /dev/null +++ b/pkg/framework/plugins/modelselector/scorer/internal/decay/decay.go @@ -0,0 +1,46 @@ +/* +Copyright 2026 The llm-d Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package decay reduces an EMA-based metric toward zero as it goes stale. +package decay + +import ( + "math" + "time" +) + +// Config controls the decay applied by Apply. +type Config struct { + Weight float64 // [0,1]; 0 disables decay. + Threshold time.Duration // staleness reaches full strength after this elapsed time. +} + +// Apply returns ema * (1 - weight * staleness * idleness), where +// staleness = min(elapsed/threshold, 1) and idleness = 1/(1+requests). +// Returns ema unchanged when weight or threshold are non-positive, or lastObservedAt is zero. +func Apply(ema float64, lastObservedAt time.Time, requests int64, now time.Time, cfg Config) float64 { + if cfg.Weight <= 0 || cfg.Threshold <= 0 || lastObservedAt.IsZero() { + return ema + } + elapsed := now.Sub(lastObservedAt) + if elapsed <= 0 { + return ema + } + staleness := math.Min(float64(elapsed)/float64(cfg.Threshold), 1.0) + idleness := 1.0 / (1.0 + float64(requests)) + d := cfg.Weight * staleness * idleness + return ema * (1 - d) +} From fe8d0ee98db67c6610bfcf0801f5cceafb2dd5c0 Mon Sep 17 00:00:00 2001 From: Mohammad Nassar Date: Thu, 11 Jun 2026 11:04:30 +0300 Subject: [PATCH 3/8] Align both scorers. Signed-off-by: Mohammad Nassar --- .../plugins/modelselector/scorer/avgtpot/plugin.go | 7 +++---- .../modelselector/scorer/avgttft/plugin_test.go | 12 ++++++------ 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/pkg/framework/plugins/modelselector/scorer/avgtpot/plugin.go b/pkg/framework/plugins/modelselector/scorer/avgtpot/plugin.go index 1abb5f0..675824d 100644 --- a/pkg/framework/plugins/modelselector/scorer/avgtpot/plugin.go +++ b/pkg/framework/plugins/modelselector/scorer/avgtpot/plugin.go @@ -37,8 +37,7 @@ import ( const ( PluginType = "avg-tpot-scorer" - // defaultDecayWeight = 0 keeps the scorer's behavior at raw AvgTPOT (no decay). - defaultDecayWeight = 0.0 + defaultDecayWeight = 1.0 defaultStalenessThreshold = 30 * time.Second ) @@ -47,7 +46,7 @@ var _ modelselector.Scorer = &AvgTPOTScorer{} // AvgTPOTScorerConfig holds the scorer's JSON parameters. type AvgTPOTScorerConfig struct { - // DecayWeight scales the staleness decay in [0,1]; default 0 (disabled). + // DecayWeight scales the staleness decay in [0,1]; 0 disables. Default 1.0. DecayWeight *float64 `json:"decayWeight,omitempty"` // StalenessThreshold is the elapsed time for full staleness (e.g. "30s"). Default "30s". StalenessThreshold string `json:"stalenessThreshold,omitempty"` @@ -57,7 +56,7 @@ type AvgTPOTScorerConfig struct { // The model with the lowest AvgTPOT scores 1.0; the highest scores 0.0. // Models with no observed TPOT yet (AvgTPOT == 0) are treated as idle and score 1.0. // If all models have the same AvgTPOT, all score 1.0. -// Stale EMAs can be decayed toward zero (see the decay package); off by default — set DecayWeight > 0 to enable. +// Stale EMAs are decayed toward zero (see the decay package); set DecayWeight=0 to disable. type AvgTPOTScorer struct { typedName plugin.TypedName decayCfg decay.Config diff --git a/pkg/framework/plugins/modelselector/scorer/avgttft/plugin_test.go b/pkg/framework/plugins/modelselector/scorer/avgttft/plugin_test.go index 7ed90d5..d4c45d6 100644 --- a/pkg/framework/plugins/modelselector/scorer/avgttft/plugin_test.go +++ b/pkg/framework/plugins/modelselector/scorer/avgttft/plugin_test.go @@ -38,10 +38,10 @@ func modelWithNoAttribute(name string) fwdatalayer.Model { return fwdatalayer.NewModel(name) } -func modelWithMetrics(name string, avgTTFT float64, requests int64, lastObservedAt time.Time) fwdatalayer.Model { +func modelWithMetrics(name string, requests int64, lastObservedAt time.Time) fwdatalayer.Model { model := fwdatalayer.NewModel(name) model.GetAttributes().Put(requestmetadata.RequestMetadataAttributeKey, requestmetadata.ModelMetrics{ - AvgTTFT: avgTTFT, + AvgTTFT: 1.0, Requests: requests, LastObservedAt: lastObservedAt.UnixNano(), }) @@ -113,7 +113,7 @@ func TestStalenessDecay(t *testing.T) { t.Run("fresh model — no decay applied", func(t *testing.T) { // LastObservedAt = now → staleness = 0 → effective TTFT = raw AvgTTFT - fresh := modelWithMetrics("fresh", 1.0, 0, now) + fresh := modelWithMetrics("fresh", 0, now) other := modelWithNoAttribute("other") // AvgTTFT=0, scores 1.0 scores := scorer.Score(context.Background(), nil, nil, []fwdatalayer.Model{fresh, other}) if scores[fresh] != 0.0 { @@ -123,7 +123,7 @@ func TestStalenessDecay(t *testing.T) { t.Run("fully stale idle model — full decay, scores 1.0", func(t *testing.T) { // LastObservedAt = 60s ago (2× threshold), Requests=0 → decay=1.0 → effective TTFT=0 - stale := modelWithMetrics("stale", 1.0, 0, now.Add(-60*time.Second)) + stale := modelWithMetrics("stale", 0, now.Add(-60*time.Second)) other := modelWithAvgTTFT("other", 0.5) scores := scorer.Score(context.Background(), nil, nil, []fwdatalayer.Model{stale, other}) if scores[stale] != 1.0 { @@ -134,7 +134,7 @@ func TestStalenessDecay(t *testing.T) { t.Run("stale but still busy — decay suppressed by load", func(t *testing.T) { // staleness=1.0, Requests=9 → idleness=0.1 → decay=0.1 → effective TTFT = 0.9 × raw // The stale-busy model should still score lower than a fresh idle model. - staleBusy := modelWithMetrics("stale-busy", 1.0, 9, now.Add(-60*time.Second)) + staleBusy := modelWithMetrics("stale-busy", 9, now.Add(-60*time.Second)) fresh := modelWithAvgTTFT("fresh", 0.1) scores := scorer.Score(context.Background(), nil, nil, []fwdatalayer.Model{staleBusy, fresh}) if scores[staleBusy] >= scores[fresh] { @@ -150,7 +150,7 @@ func TestDecayDisabled(t *testing.T) { now := time.Now() // With decay off, the stale model keeps its raw TTFT and scores 0.0 (not 1.0). - stale := modelWithMetrics("stale", 1.0, 0, now.Add(-60*time.Second)) + stale := modelWithMetrics("stale", 0, now.Add(-60*time.Second)) other := modelWithAvgTTFT("other", 0.5) scores := scorer.Score(context.Background(), nil, nil, []fwdatalayer.Model{stale, other}) if scores[stale] != 0.0 { From ac865f275bb8a87d705c0e87700db97ac7776bb8 Mon Sep 17 00:00:00 2001 From: Mohammad Nassar Date: Thu, 11 Jun 2026 11:18:41 +0300 Subject: [PATCH 4/8] Add decay test. Signed-off-by: Mohammad Nassar --- .../scorer/internal/decay/decay_test.go | 115 ++++++++++++++++++ 1 file changed, 115 insertions(+) create mode 100644 pkg/framework/plugins/modelselector/scorer/internal/decay/decay_test.go diff --git a/pkg/framework/plugins/modelselector/scorer/internal/decay/decay_test.go b/pkg/framework/plugins/modelselector/scorer/internal/decay/decay_test.go new file mode 100644 index 0000000..c00d7ce --- /dev/null +++ b/pkg/framework/plugins/modelselector/scorer/internal/decay/decay_test.go @@ -0,0 +1,115 @@ +/* +Copyright 2026 The llm-d Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package decay + +import ( + "math" + "testing" + "time" +) + +// TestApply covers the decay formula across weight/threshold/staleness/idleness combinations. +func TestApply(t *testing.T) { + now := time.Unix(1_000_000, 0) + threshold := 30 * time.Second + + tests := []struct { + name string + ema float64 + lastObservedAt time.Time + requests int64 + cfg Config + want float64 + }{ + { + name: "weight zero — raw ema returned", + ema: 1.0, + lastObservedAt: now.Add(-60 * time.Second), + requests: 0, + cfg: Config{Weight: 0, Threshold: threshold}, + want: 1.0, + }, + { + name: "threshold zero — raw ema returned", + ema: 1.0, + lastObservedAt: now.Add(-60 * time.Second), + requests: 0, + cfg: Config{Weight: 1, Threshold: 0}, + want: 1.0, + }, + { + name: "never observed — raw ema returned", + ema: 1.0, + lastObservedAt: time.Time{}, + requests: 0, + cfg: Config{Weight: 1, Threshold: threshold}, + want: 1.0, + }, + { + name: "fresh observation — no decay", + ema: 1.0, + lastObservedAt: now, + requests: 0, + cfg: Config{Weight: 1, Threshold: threshold}, + want: 1.0, + }, + { + name: "fully stale idle — full decay to zero", + ema: 1.0, + lastObservedAt: now.Add(-60 * time.Second), + requests: 0, + cfg: Config{Weight: 1, Threshold: threshold}, + want: 0.0, + }, + { + name: "fully stale busy — decay suppressed by load", + ema: 1.0, + lastObservedAt: now.Add(-60 * time.Second), + requests: 9, + cfg: Config{Weight: 1, Threshold: threshold}, + // idleness = 1/10 = 0.1, staleness = 1.0, weight = 1 → decay = 0.1 → result = 0.9 + want: 0.9, + }, + { + name: "half threshold idle — half decay", + ema: 1.0, + lastObservedAt: now.Add(-15 * time.Second), + requests: 0, + cfg: Config{Weight: 1, Threshold: threshold}, + // staleness = 0.5, idleness = 1, weight = 1 → decay = 0.5 → result = 0.5 + want: 0.5, + }, + { + name: "half weight scales decay", + ema: 1.0, + lastObservedAt: now.Add(-60 * time.Second), + requests: 0, + cfg: Config{Weight: 0.5, Threshold: threshold}, + // staleness = 1, idleness = 1, weight = 0.5 → decay = 0.5 → result = 0.5 + want: 0.5, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := Apply(tt.ema, tt.lastObservedAt, tt.requests, now, tt.cfg) + if math.Abs(got-tt.want) > 1e-9 { + t.Errorf("Apply() = %f, want %f", got, tt.want) + } + }) + } +} From b75ca597d4b47b58015752a31a16c8b2dae20b57 Mon Sep 17 00:00:00 2001 From: Mohammad Nassar Date: Thu, 11 Jun 2026 11:37:51 +0300 Subject: [PATCH 5/8] Add tpot tests. Signed-off-by: Mohammad Nassar --- .../scorer/avgtpot/plugin_test.go | 138 ++++++++++++++++++ .../scorer/avgttft/plugin_test.go | 11 -- 2 files changed, 138 insertions(+), 11 deletions(-) create mode 100644 pkg/framework/plugins/modelselector/scorer/avgtpot/plugin_test.go diff --git a/pkg/framework/plugins/modelselector/scorer/avgtpot/plugin_test.go b/pkg/framework/plugins/modelselector/scorer/avgtpot/plugin_test.go new file mode 100644 index 0000000..cc593ef --- /dev/null +++ b/pkg/framework/plugins/modelselector/scorer/avgtpot/plugin_test.go @@ -0,0 +1,138 @@ +/* +Copyright 2026 The llm-d Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package avgtpot + +import ( + "context" + "testing" + "time" + + fwdatalayer "github.com/llm-d/llm-d-inference-payload-processor/pkg/framework/interface/datalayer" + requestmetadata "github.com/llm-d/llm-d-inference-payload-processor/pkg/framework/plugins/datalayer/requestmetadata" + "github.com/llm-d/llm-d-inference-payload-processor/pkg/framework/plugins/modelselector/scorer/internal/decay" +) + +func modelWithAvgTPOT(name string, avgTPOT float64) fwdatalayer.Model { + model := fwdatalayer.NewModel(name) + model.GetAttributes().Put(requestmetadata.RequestMetadataAttributeKey, requestmetadata.ModelMetrics{ + AvgTPOT: avgTPOT, + }) + return model +} + +func modelWithNoAttribute(name string) fwdatalayer.Model { + return fwdatalayer.NewModel(name) +} + +func modelWithMetrics(name string, requests int64, lastObservedAt time.Time) fwdatalayer.Model { + model := fwdatalayer.NewModel(name) + model.GetAttributes().Put(requestmetadata.RequestMetadataAttributeKey, requestmetadata.ModelMetrics{ + AvgTPOT: 0.1, + Requests: requests, + LastObservedAt: lastObservedAt.UnixNano(), + }) + return model +} + +func TestAvgTPOTScorer(t *testing.T) { + scorer := NewAvgTPOTScorer() + + tests := []struct { + name string + models []fwdatalayer.Model + expectedScores []float64 + }{ + { + name: "lower TPOT gets higher score", + models: []fwdatalayer.Model{ + modelWithAvgTPOT("fast", 0.02), + modelWithAvgTPOT("slow", 0.1), + }, + expectedScores: []float64{1.0, 0.0}, + }, + { + name: "equal TPOT — all score 1.0", + models: []fwdatalayer.Model{ + modelWithAvgTPOT("m1", 0.05), + modelWithAvgTPOT("m2", 0.05), + }, + expectedScores: []float64{1.0, 1.0}, + }, + { + name: "no attribute scores optimistically (treated as 0)", + models: []fwdatalayer.Model{ + modelWithAvgTPOT("observed", 0.05), + modelWithNoAttribute("unobserved"), + }, + expectedScores: []float64{0.0, 1.0}, + }, + { + name: "three models — intermediate score is normalised", + // min=0.25, max=0.75; middle=0.5 → (0.75-0.5)/(0.75-0.25) = 0.5 + // 0.25, 0.5, 0.75 are exact in float64 so the comparison is safe without epsilon. + models: []fwdatalayer.Model{ + modelWithAvgTPOT("fast", 0.25), + modelWithAvgTPOT("mid", 0.5), + modelWithAvgTPOT("slow", 0.75), + }, + expectedScores: []float64{1.0, 0.5, 0.0}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + scores := scorer.Score(context.Background(), nil, nil, tt.models) + for i, model := range tt.models { + got := scores[model] + want := tt.expectedScores[i] + if got != want { + t.Errorf("model[%d] %q: expected score %f, got %f", i, model.GetName(), want, got) + } + } + }) + } +} + +// TestStalenessDecay verifies the decay recovers a stale idle model. +func TestStalenessDecay(t *testing.T) { + scorer := NewAvgTPOTScorer() + now := time.Now() + + // LastObservedAt = 60s ago (2× threshold), Requests=0 → decay=1.0 → effective TPOT=0 + stale := modelWithMetrics("stale", 0, now.Add(-60*time.Second)) + other := modelWithAvgTPOT("other", 0.05) + scores := scorer.Score(context.Background(), nil, nil, []fwdatalayer.Model{stale, other}) + if scores[stale] != 1.0 { + t.Errorf("fully stale idle model: expected score 1.0, got %f", scores[stale]) + } +} + +// TestDecayDisabled verifies DecayWeight=0 ignores staleness entirely. +func TestDecayDisabled(t *testing.T) { + scorer := NewAvgTPOTScorer().WithDecay(decay.Config{Weight: 0, Threshold: 30 * time.Second}) + now := time.Now() + + stale := modelWithMetrics("stale", 0, now.Add(-60*time.Second)) + other := modelWithAvgTPOT("other", 0.05) + scores := scorer.Score(context.Background(), nil, nil, []fwdatalayer.Model{stale, other}) + if scores[stale] != 0.0 { + t.Errorf("decay-disabled stale model: expected score 0.0 (raw EMA), got %f", scores[stale]) + } + if scores[other] != 1.0 { + t.Errorf("decay-disabled other model: expected score 1.0, got %f", scores[other]) + } +} diff --git a/pkg/framework/plugins/modelselector/scorer/avgttft/plugin_test.go b/pkg/framework/plugins/modelselector/scorer/avgttft/plugin_test.go index d4c45d6..b530fd1 100644 --- a/pkg/framework/plugins/modelselector/scorer/avgttft/plugin_test.go +++ b/pkg/framework/plugins/modelselector/scorer/avgttft/plugin_test.go @@ -131,17 +131,6 @@ func TestStalenessDecay(t *testing.T) { } }) - t.Run("stale but still busy — decay suppressed by load", func(t *testing.T) { - // staleness=1.0, Requests=9 → idleness=0.1 → decay=0.1 → effective TTFT = 0.9 × raw - // The stale-busy model should still score lower than a fresh idle model. - staleBusy := modelWithMetrics("stale-busy", 9, now.Add(-60*time.Second)) - fresh := modelWithAvgTTFT("fresh", 0.1) - scores := scorer.Score(context.Background(), nil, nil, []fwdatalayer.Model{staleBusy, fresh}) - if scores[staleBusy] >= scores[fresh] { - t.Errorf("stale-busy model should score lower than fresh model: stale-busy=%f fresh=%f", - scores[staleBusy], scores[fresh]) - } - }) } // TestDecayDisabled verifies DecayWeight=0 ignores staleness entirely. From 2b1038d453b3efec6c4bdb59ccda20ad1cdd3df9 Mon Sep 17 00:00:00 2001 From: Mohammad Date: Mon, 15 Jun 2026 12:56:47 +0300 Subject: [PATCH 6/8] Remove file. Signed-off-by: Mohammad --- .../datalayer/pricing/cost_digest_test.go | 40 ------------------- 1 file changed, 40 deletions(-) delete mode 100644 pkg/framework/interface/datalayer/pricing/cost_digest_test.go diff --git a/pkg/framework/interface/datalayer/pricing/cost_digest_test.go b/pkg/framework/interface/datalayer/pricing/cost_digest_test.go deleted file mode 100644 index e18c85e..0000000 --- a/pkg/framework/interface/datalayer/pricing/cost_digest_test.go +++ /dev/null @@ -1,40 +0,0 @@ -/* -Copyright 2026 The llm-d Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package pricing - -import ( - "testing" - - "github.com/caio/go-tdigest/v5" -) - -// newDigest builds a tdigest with the proposal's default compression and -// adds the given samples. It fails the test on any library error so that -// the call sites stay free of error handling. -func newDigest(t *testing.T, samples ...float64) *tdigest.TDigest { - t.Helper() - d, err := tdigest.New(tdigest.Compression(200)) - if err != nil { - t.Fatalf("tdigest.New: %v", err) - } - for _, s := range samples { - if err := d.Add(s); err != nil { - t.Fatalf("tdigest.Add(%v): %v", s, err) - } - } - return d -} From fbc536eaabcbc24d250fba4a6a621359e44e5954 Mon Sep 17 00:00:00 2001 From: Mohammad Date: Sun, 14 Jun 2026 17:12:23 +0300 Subject: [PATCH 7/8] Update avg-ttft-scorer with inflight-aware scoring with excess-load penalty. Signed-off-by: Mohammad --- examples/new-scorer-values.yaml | 41 ++++++++++ .../datalayer/requestmetadata/plugin.go | 6 +- .../modelselector/scorer/avgttft/plugin.go | 78 ++++++++++++++----- .../scorer/internal/decay/decay.go | 22 +++--- 4 files changed, 118 insertions(+), 29 deletions(-) create mode 100644 examples/new-scorer-values.yaml diff --git a/examples/new-scorer-values.yaml b/examples/new-scorer-values.yaml new file mode 100644 index 0000000..b4bdf31 --- /dev/null +++ b/examples/new-scorer-values.yaml @@ -0,0 +1,41 @@ +payloadProcessor: + listModels: + - facebook/opt-125m + - facebook/opt-350m + customConfig: + plugins: + - type: body-field-to-header + parameters: + fieldName: model + headerName: X-Gateway-Model-Name + - type: base-model-to-header + - type: model-selector + - type: avg-ttft-scorer + parameters: + decayWeight: 1.0 + stalenessThreshold: "10s" + inflightWeight: 1.0 + maxIdleProbes: 2 + - type: max-score-picker + - type: request-metadata-extractor + parameters: + emaAlpha: 0.1 + intervalDuration: 5s + - type: model-config-datasource + parameters: + modelsPath: /config/models.json + profiles: + - name: default + plugins: + request: + - pluginRef: model-selector + - pluginRef: avg-ttft-scorer + weight: 1.0 + - pluginRef: max-score-picker + - pluginRef: body-field-to-header + - pluginRef: base-model-to-header + datalayer: + extractors: + - pluginRef: request-metadata-extractor + datasources: + - pluginRef: model-config-datasource diff --git a/pkg/framework/plugins/datalayer/requestmetadata/plugin.go b/pkg/framework/plugins/datalayer/requestmetadata/plugin.go index e121667..b54a9db 100644 --- a/pkg/framework/plugins/datalayer/requestmetadata/plugin.go +++ b/pkg/framework/plugins/datalayer/requestmetadata/plugin.go @@ -88,9 +88,10 @@ func ExtractorFactory(name string, parameters json.RawMessage, h plugin.Handle) } // ModelMetrics holds per-model metadata: in-flight request count and -// EMA estimates for TTFT and TPOT. +// EMA estimates for TTFT, TPOT, and inflight count. type ModelMetrics struct { Requests int64 + AvgRequests float64 AvgTTFT float64 AvgTPOT float64 LastObservedAt int64 @@ -112,6 +113,9 @@ type modelIntervalAccumulator struct { // flush averages the accumulated interval observations into the EMA, emits Prometheus gauges, and resets the interval. func (s *modelIntervalAccumulator) flush(now time.Time, model string, alpha float64) { + // Always update AvgRequests — it samples the current inflight count each interval + // regardless of whether any responses arrived. + s.AvgRequests = ema(s.AvgRequests, float64(s.Requests), alpha) if s.ttftN > 0 { s.AvgTTFT = ema(s.AvgTTFT, s.ttftSum/float64(s.ttftN), alpha) s.LastObservedAt = now.UnixNano() diff --git a/pkg/framework/plugins/modelselector/scorer/avgttft/plugin.go b/pkg/framework/plugins/modelselector/scorer/avgttft/plugin.go index a19a08e..e04558b 100644 --- a/pkg/framework/plugins/modelselector/scorer/avgttft/plugin.go +++ b/pkg/framework/plugins/modelselector/scorer/avgttft/plugin.go @@ -39,6 +39,7 @@ const ( defaultDecayWeight = 1.0 defaultStalenessThreshold = 30 * time.Second + defaultInflightWeight = 1.0 ) // compile-time interface assertion @@ -47,19 +48,32 @@ var _ modelselector.Scorer = &AvgTTFTScorer{} // AvgTTFTScorerConfig holds the scorer's JSON parameters. type AvgTTFTScorerConfig struct { // DecayWeight scales the staleness decay in [0,1]; 0 disables. Default 1.0. + // Decay is only applied when the model has at most MaxIdleProbes in-flight requests, + // so it cannot accidentally reward a saturated model. DecayWeight *float64 `json:"decayWeight,omitempty"` // StalenessThreshold is the elapsed time for full staleness (e.g. "30s"). Default "30s". StalenessThreshold string `json:"stalenessThreshold,omitempty"` + // InflightWeight scales the queue-depth penalty. + // predicted = baseTTFT * (1 + inflightWeight * requests) + // This is self-normalizing: a fast model (low AvgTTFT) incurs a small penalty per + // inflight request; a slow model incurs a proportionally larger one. + // 0 disables the inflight term (pure AvgTTFT behavior). Default 1.0. + InflightWeight *float64 `json:"inflightWeight,omitempty"` + // MaxIdleProbes is the maximum number of in-flight requests while staleness decay + // is still applied. Default 0 (stop decay at the first inflight request). + // Raising this to 2–3 lets a small parallel probe burst land before decay is + // suppressed, giving the EMA enough signal to recover quickly from staleness. + MaxIdleProbes *int64 `json:"maxIdleProbes,omitempty"` } -// AvgTTFTScorer scores models based on their exponential moving average TTFT. -// The model with the lowest AvgTTFT scores 1.0; the highest scores 0.0. -// Models with no observed TTFT yet (AvgTTFT == 0) are treated as idle and score 1.0. -// If all models have the same AvgTTFT, all score 1.0. -// Stale EMAs are decayed toward zero (see the decay package); set DecayWeight=0 to disable. +// AvgTTFTScorer scores models on predicted TTFT = baseTTFT × (1 + inflightWeight × requests), +// where baseTTFT is the EMA with optional staleness decay (applied only when the model is idle). +// The inflight term self-normalizes: a fast model incurs a small queue penalty per request, +// a slow model a proportionally larger one — no explicit capacity needed. type AvgTTFTScorer struct { - typedName plugin.TypedName - decayCfg decay.Config + typedName plugin.TypedName + decayCfg decay.Config + inflightWeight float64 } func ScorerFactory(name string, parameters json.RawMessage, _ plugin.Handle) (plugin.Plugin, error) { @@ -71,26 +85,36 @@ func ScorerFactory(name string, parameters json.RawMessage, _ plugin.Handle) (pl return nil, fmt.Errorf("failed to parse parameters for plugin %q: %w", name, err) } } - weight := defaultDecayWeight + decayWeight := defaultDecayWeight if config.DecayWeight != nil { - weight = *config.DecayWeight + decayWeight = *config.DecayWeight } - if weight < 0 || weight > 1 { - return nil, fmt.Errorf("invalid decayWeight %v for plugin %q: must be in [0, 1]", weight, name) + if decayWeight < 0 || decayWeight > 1 { + return nil, fmt.Errorf("invalid decayWeight %v for plugin %q: must be in [0, 1]", decayWeight, name) } threshold, err := time.ParseDuration(config.StalenessThreshold) if err != nil { return nil, fmt.Errorf("invalid stalenessThreshold %q for plugin %q: %w", config.StalenessThreshold, name, err) } + inflightWeight := defaultInflightWeight + if config.InflightWeight != nil { + inflightWeight = *config.InflightWeight + } + var maxIdleProbes int64 + if config.MaxIdleProbes != nil { + maxIdleProbes = *config.MaxIdleProbes + } return NewAvgTTFTScorer(). WithName(name). - WithDecay(decay.Config{Weight: weight, Threshold: threshold}), nil + WithDecay(decay.Config{Weight: decayWeight, Threshold: threshold, MaxIdleProbes: maxIdleProbes}). + WithInflight(inflightWeight), nil } func NewAvgTTFTScorer() *AvgTTFTScorer { return &AvgTTFTScorer{ - typedName: plugin.TypedName{Type: PluginType, Name: PluginType}, - decayCfg: decay.Config{Weight: defaultDecayWeight, Threshold: defaultStalenessThreshold}, + typedName: plugin.TypedName{Type: PluginType, Name: PluginType}, + decayCfg: decay.Config{Weight: defaultDecayWeight, Threshold: defaultStalenessThreshold}, + inflightWeight: defaultInflightWeight, } } @@ -107,6 +131,12 @@ func (s *AvgTTFTScorer) WithDecay(cfg decay.Config) *AvgTTFTScorer { return s } +// WithInflight overrides the inflight weight. +func (s *AvgTTFTScorer) WithInflight(weight float64) *AvgTTFTScorer { + s.inflightWeight = weight + return s +} + // Score returns score = (max - avgTTFT)/(max - min) per model, with optional staleness decay. func (s *AvgTTFTScorer) Score(ctx context.Context, _ *plugin.CycleState, _ *requesthandling.InferenceRequest, models []datalayer.Model) map[datalayer.Model]float64 { now := time.Now() @@ -115,7 +145,7 @@ func (s *AvgTTFTScorer) Score(ctx context.Context, _ *plugin.CycleState, _ *requ maxTTFT := 0.0 for _, model := range models { - v := s.avgTTFT(model, now) + v := s.predictedTTFT(model, now) ttfts[model] = v if v > maxTTFT { maxTTFT = v @@ -136,15 +166,23 @@ func (s *AvgTTFTScorer) Score(ctx context.Context, _ *plugin.CycleState, _ *requ if debugLogger := log.FromContext(ctx).V(logutil.DEBUG); debugLogger.Enabled() { for _, model := range models { - debugLogger.Info("avg-ttft score", "model", model.GetName(), "avgTTFT", ttfts[model], "score", scores[model]) + debugLogger.Info("avg-ttft score", "model", model.GetName(), "predictedTTFT", ttfts[model], "score", scores[model]) } } return scores } -// avgTTFT returns the decay-adjusted AvgTTFT, or 0 if unobserved. -func (s *AvgTTFTScorer) avgTTFT(model datalayer.Model, now time.Time) float64 { +// predictedTTFT returns baseTTFT × (1 + inflightWeight × excess), where: +// - baseTTFT is the EMA with decay applied only when the model has at most MaxIdleProbes requests. +// - excess = max(requests − AvgRequests, 0): load above the model's normal operating point. +// +// The EMA already prices in latency at the typical load (AvgRequests), so penalising only +// the excess avoids double-counting steady-state concurrency while still reacting immediately +// to unexpected bursts. At cold start (AvgRequests==0) excess == requests, matching the +// previous behaviour. +// Returns 0 if no TTFT has been observed yet (model is treated as idle/new). +func (s *AvgTTFTScorer) predictedTTFT(model datalayer.Model, now time.Time) float64 { val, ok := model.GetAttributes().Get(requestmetadata.RequestMetadataAttributeKey) if !ok { return 0 @@ -160,5 +198,7 @@ func (s *AvgTTFTScorer) avgTTFT(model datalayer.Model, now time.Time) float64 { if rc.LastObservedAt > 0 { lastObservedAt = time.Unix(0, rc.LastObservedAt) } - return decay.Apply(rc.AvgTTFT, lastObservedAt, rc.Requests, now, s.decayCfg) + baseTTFT := decay.Apply(rc.AvgTTFT, lastObservedAt, rc.Requests, now, s.decayCfg) + excess := math.Max(float64(rc.Requests)-rc.AvgRequests, 0) + return baseTTFT * (1 + s.inflightWeight*excess) } diff --git a/pkg/framework/plugins/modelselector/scorer/internal/decay/decay.go b/pkg/framework/plugins/modelselector/scorer/internal/decay/decay.go index f341433..6ef5ba9 100644 --- a/pkg/framework/plugins/modelselector/scorer/internal/decay/decay.go +++ b/pkg/framework/plugins/modelselector/scorer/internal/decay/decay.go @@ -24,15 +24,21 @@ import ( // Config controls the decay applied by Apply. type Config struct { - Weight float64 // [0,1]; 0 disables decay. - Threshold time.Duration // staleness reaches full strength after this elapsed time. + Weight float64 // [0,1]; 0 disables decay. + Threshold time.Duration // staleness reaches full strength after this elapsed time. + MaxIdleProbes int64 // decay continues while requests <= MaxIdleProbes (default 0: stop at first inflight). + // Raising MaxIdleProbes to 2–3 lets a small burst of parallel probe requests land before + // decay is suppressed, giving the EMA enough observations to recover quickly from staleness. } -// Apply returns ema * (1 - weight * staleness * idleness), where -// staleness = min(elapsed/threshold, 1) and idleness = 1/(1+requests). -// Returns ema unchanged when weight or threshold are non-positive, or lastObservedAt is zero. +// Apply returns ema * (1 - weight * staleness) while the model has at most MaxIdleProbes +// in-flight requests, so a recovered but idle (or lightly probed) model can regain a +// competitive score. Decay stops once requests > MaxIdleProbes, preventing the lock-in +// where a saturated model is mistakenly made cheaper by having its stale EMA decayed. +// Returns ema unchanged when weight or threshold are non-positive, lastObservedAt is zero, +// or requests > MaxIdleProbes. func Apply(ema float64, lastObservedAt time.Time, requests int64, now time.Time, cfg Config) float64 { - if cfg.Weight <= 0 || cfg.Threshold <= 0 || lastObservedAt.IsZero() { + if cfg.Weight <= 0 || cfg.Threshold <= 0 || lastObservedAt.IsZero() || requests > cfg.MaxIdleProbes { return ema } elapsed := now.Sub(lastObservedAt) @@ -40,7 +46,5 @@ func Apply(ema float64, lastObservedAt time.Time, requests int64, now time.Time, return ema } staleness := math.Min(float64(elapsed)/float64(cfg.Threshold), 1.0) - idleness := 1.0 / (1.0 + float64(requests)) - d := cfg.Weight * staleness * idleness - return ema * (1 - d) + return ema * (1 - cfg.Weight*staleness) } From 546e6f4a9991f68a0534deb036ccf2d764668f37 Mon Sep 17 00:00:00 2001 From: Mohammad Date: Mon, 15 Jun 2026 12:07:26 +0300 Subject: [PATCH 8/8] Update tests. Signed-off-by: Mohammad --- .../datalayer/requestmetadata/plugin_test.go | 83 ++++++++++++++++++ .../scorer/avgttft/plugin_test.go | 86 +++++++++++++++++++ .../scorer/internal/decay/decay_test.go | 25 ++++-- 3 files changed, 189 insertions(+), 5 deletions(-) diff --git a/pkg/framework/plugins/datalayer/requestmetadata/plugin_test.go b/pkg/framework/plugins/datalayer/requestmetadata/plugin_test.go index 0a85638..2e65f7a 100644 --- a/pkg/framework/plugins/datalayer/requestmetadata/plugin_test.go +++ b/pkg/framework/plugins/datalayer/requestmetadata/plugin_test.go @@ -328,6 +328,89 @@ func TestAvgTPOTZeroCompletionTokensIgnored(t *testing.T) { } } +// TestAvgRequestsTracksInflight verifies that AvgRequests is sampled at flush time +// and blended into an EMA. With intervalDuration=0 every response triggers a flush. +// +// Sequence: two requests arrive (Requests=2), one response arrives (Requests=1 after decrement). +// Flush samples Requests=1 as the first observation → AvgRequests=1.0. +func TestAvgRequestsTracksInflight(t *testing.T) { + ext, ds := newRequestMetadataTest(t) + + if err := ext.Extract(context.Background(), []dlsrc.Event{ + makeRequestEvent("m1"), + makeRequestEvent("m1"), + makeResponseEvent(0), // Requests: 2→1 then flush → AvgRequests = 1.0 (first observation) + }); err != nil { + t.Fatalf("Extract failed: %v", err) + } + + rc := getRequestMetadata(t, ds, "m1") + if rc.AvgRequests != 1.0 { + t.Errorf("expected AvgRequests=1.0 (first observation), got %f", rc.AvgRequests) + } +} + +// TestAvgRequestsEMABlend verifies that successive flushes blend AvgRequests with the EMA. +// +// Flush 1: Requests=1 → AvgRequests = 1.0 (first observation, no blend) +// Flush 2: Requests=0 → AvgRequests = 0.1×0 + 0.9×1.0 = 0.9 +func TestAvgRequestsEMABlend(t *testing.T) { + ext, ds := newRequestMetadataTest(t) + + // First flush: one request in flight at flush time. + if err := ext.Extract(context.Background(), []dlsrc.Event{ + makeRequestEvent("m1"), + makeRequestEvent("m1"), + makeResponseEvent(0), // Requests: 2→1 at flush + }); err != nil { + t.Fatalf("first Extract failed: %v", err) + } + + // Second flush: no requests in flight at flush time. + if err := ext.Extract(context.Background(), []dlsrc.Event{ + makeResponseEvent(0), // Requests: 1→0 at flush + }); err != nil { + t.Fatalf("second Extract failed: %v", err) + } + + rc := getRequestMetadata(t, ds, "m1") + want := 0.1*0.0 + 0.9*1.0 // 0.9 + if rc.AvgRequests != want { + t.Errorf("expected AvgRequests=%f, got %f", want, rc.AvgRequests) + } +} + +// TestAvgRequestsUpdatesWithoutTTFT verifies that AvgRequests is updated on flush +// even when no TTFT observations arrived in the interval (AvgTTFT stays unchanged). +func TestAvgRequestsUpdatesWithoutTTFT(t *testing.T) { + ext, ds := newRequestMetadataTest(t) + + // Seed a TTFT value. + if err := ext.Extract(context.Background(), []dlsrc.Event{ + makeResponseEventWithTTFT(0, 500*time.Millisecond), + }); err != nil { + t.Fatalf("seed Extract failed: %v", err) + } + + // Two requests arrive; response decrements to 1 so flush sees Requests=1. + // No TTFT on this response — AvgTTFT must stay unchanged while AvgRequests updates. + if err := ext.Extract(context.Background(), []dlsrc.Event{ + makeRequestEvent("m1"), + makeRequestEvent("m1"), + makeResponseEvent(0), // no TTFT field; Requests: 2→1 at flush + }); err != nil { + t.Fatalf("second Extract failed: %v", err) + } + + rc := getRequestMetadata(t, ds, "m1") + if rc.AvgTTFT != 0.5 { + t.Errorf("expected AvgTTFT=0.5 (unchanged), got %f", rc.AvgTTFT) + } + if rc.AvgRequests == 0 { + t.Errorf("expected AvgRequests to be updated even without TTFT, got 0") + } +} + func TestExtractorFactoryWiresDatastore(t *testing.T) { ds := datastore.NewFakeDataStore() h := &fakeHandle{ds: ds} diff --git a/pkg/framework/plugins/modelselector/scorer/avgttft/plugin_test.go b/pkg/framework/plugins/modelselector/scorer/avgttft/plugin_test.go index b530fd1..7bf3722 100644 --- a/pkg/framework/plugins/modelselector/scorer/avgttft/plugin_test.go +++ b/pkg/framework/plugins/modelselector/scorer/avgttft/plugin_test.go @@ -48,6 +48,12 @@ func modelWithMetrics(name string, requests int64, lastObservedAt time.Time) fwd return model } +func modelWithFullMetrics(name string, m requestmetadata.ModelMetrics) fwdatalayer.Model { + model := fwdatalayer.NewModel(name) + model.GetAttributes().Put(requestmetadata.RequestMetadataAttributeKey, m) + return model +} + func TestAvgTTFTScorer(t *testing.T) { scorer := NewAvgTTFTScorer() @@ -133,6 +139,86 @@ func TestStalenessDecay(t *testing.T) { } +// TestExcessPenaltyFlipsWinner verifies that a burst above AvgRequests raises predictedTTFT +// enough to flip the winner. fast has lower AvgTTFT but carries excess load; slow is idle. +// +// fast: AvgTTFT=0.2s, Requests=5, AvgRequests=0 → excess=5 → predicted = 0.2×(1+1×5) = 1.2s +// slow: AvgTTFT=1.0s, Requests=0, AvgRequests=0 → excess=0 → predicted = 1.0s +// 1.2 > 1.0 → slow wins despite higher base TTFT. +func TestExcessPenaltyFlipsWinner(t *testing.T) { + scorer := NewAvgTTFTScorer().WithInflight(1.0) + fast := modelWithFullMetrics("fast", requestmetadata.ModelMetrics{ + AvgTTFT: 0.2, Requests: 5, AvgRequests: 0, + }) + slow := modelWithFullMetrics("slow", requestmetadata.ModelMetrics{ + AvgTTFT: 1.0, Requests: 0, AvgRequests: 0, + }) + scores := scorer.Score(context.Background(), nil, nil, []fwdatalayer.Model{fast, slow}) + if scores[fast] != 0.0 { + t.Errorf("fast (overloaded): expected score 0.0, got %f", scores[fast]) + } + if scores[slow] != 1.0 { + t.Errorf("slow (idle): expected score 1.0, got %f", scores[slow]) + } +} + +// TestSteadyStateNoExcessPenalty verifies that excess=0 when requests==AvgRequests, +// so the fast model still wins at its normal operating load. +// +// loaded: AvgTTFT=0.2s, Requests=5, AvgRequests=5 → excess=0 → predicted = 0.2s +// idle: AvgTTFT=1.0s, Requests=0, AvgRequests=0 → excess=0 → predicted = 1.0s +func TestSteadyStateNoExcessPenalty(t *testing.T) { + scorer := NewAvgTTFTScorer().WithInflight(1.0) + loaded := modelWithFullMetrics("loaded", requestmetadata.ModelMetrics{ + AvgTTFT: 0.2, Requests: 5, AvgRequests: 5, + }) + idle := modelWithFullMetrics("idle", requestmetadata.ModelMetrics{ + AvgTTFT: 1.0, Requests: 0, AvgRequests: 0, + }) + scores := scorer.Score(context.Background(), nil, nil, []fwdatalayer.Model{loaded, idle}) + if scores[loaded] != 1.0 { + t.Errorf("loaded (no excess): expected score 1.0, got %f", scores[loaded]) + } + if scores[idle] != 0.0 { + t.Errorf("idle: expected score 0.0, got %f", scores[idle]) + } +} + +// TestMaxIdleProbesAllowsDecay verifies that raising MaxIdleProbes permits staleness +// decay while the model carries a small probe burst. With MaxIdleProbes=2 and requests=2 +// the decay still applies; with MaxIdleProbes=0 it does not. +func TestMaxIdleProbesAllowsDecay(t *testing.T) { + now := time.Now() + staleAt := now.Add(-60 * time.Second) // well beyond any threshold + + // Scorer A: MaxIdleProbes=0 (default) — busy guard stops decay at requests=2. + scorerNoProbes := NewAvgTTFTScorer(). + WithDecay(decay.Config{Weight: 1.0, Threshold: 5 * time.Second, MaxIdleProbes: 0}). + WithInflight(0) + + // Scorer B: MaxIdleProbes=2 — decay allowed while requests <= 2. + scorerWithProbes := NewAvgTTFTScorer(). + WithDecay(decay.Config{Weight: 1.0, Threshold: 5 * time.Second, MaxIdleProbes: 2}). + WithInflight(0) + + // Model with requests=2 and stale observations. + stale := modelWithMetrics("stale", 2, staleAt) + other := modelWithAvgTTFT("other", 0.5) + models := []fwdatalayer.Model{stale, other} + + // Without MaxIdleProbes: requests=2 > 0 → decay suppressed → stale keeps its TTFT → scores 0. + scoresNoProbes := scorerNoProbes.Score(context.Background(), nil, nil, models) + if scoresNoProbes[stale] != 0.0 { + t.Errorf("MaxIdleProbes=0: stale model expected score 0.0 (decay suppressed), got %f", scoresNoProbes[stale]) + } + + // With MaxIdleProbes=2: requests=2 <= 2 → decay applies → TTFT decays to 0 → scores 1.0. + scoresWithProbes := scorerWithProbes.Score(context.Background(), nil, nil, models) + if scoresWithProbes[stale] != 1.0 { + t.Errorf("MaxIdleProbes=2: stale model expected score 1.0 (decay applied), got %f", scoresWithProbes[stale]) + } +} + // TestDecayDisabled verifies DecayWeight=0 ignores staleness entirely. func TestDecayDisabled(t *testing.T) { scorer := NewAvgTTFTScorer().WithDecay(decay.Config{Weight: 0, Threshold: 30 * time.Second}) diff --git a/pkg/framework/plugins/modelselector/scorer/internal/decay/decay_test.go b/pkg/framework/plugins/modelselector/scorer/internal/decay/decay_test.go index c00d7ce..78991dc 100644 --- a/pkg/framework/plugins/modelselector/scorer/internal/decay/decay_test.go +++ b/pkg/framework/plugins/modelselector/scorer/internal/decay/decay_test.go @@ -76,13 +76,28 @@ func TestApply(t *testing.T) { want: 0.0, }, { - name: "fully stale busy — decay suppressed by load", + name: "busy model — decay suppressed when requests > MaxIdleProbes", ema: 1.0, lastObservedAt: now.Add(-60 * time.Second), - requests: 9, - cfg: Config{Weight: 1, Threshold: threshold}, - // idleness = 1/10 = 0.1, staleness = 1.0, weight = 1 → decay = 0.1 → result = 0.9 - want: 0.9, + requests: 1, + cfg: Config{Weight: 1, Threshold: threshold, MaxIdleProbes: 0}, + want: 1.0, + }, + { + name: "MaxIdleProbes — decay applies when requests <= MaxIdleProbes", + ema: 1.0, + lastObservedAt: now.Add(-60 * time.Second), + requests: 2, + cfg: Config{Weight: 1, Threshold: threshold, MaxIdleProbes: 2}, + want: 0.0, // fully stale, decay = 1.0 → result = 0.0 + }, + { + name: "MaxIdleProbes — decay suppressed when requests exceed limit", + ema: 1.0, + lastObservedAt: now.Add(-60 * time.Second), + requests: 3, + cfg: Config{Weight: 1, Threshold: threshold, MaxIdleProbes: 2}, + want: 1.0, // requests=3 > MaxIdleProbes=2 → no decay }, { name: "half threshold idle — half decay",