From 084f24be5bca634a0fe5c92f93fbf5f5b04e616a Mon Sep 17 00:00:00 2001 From: Anna Tchernych Date: Thu, 21 Aug 2025 18:07:34 -0700 Subject: [PATCH 01/15] Call FrontEnd for the worker id --- cmd/epp/main.go | 12 +- epp-config-dynamo.yaml | 23 ++ pkg/bbr/handlers/request.go | 125 +++++-- pkg/bbr/handlers/server.go | 3 +- .../plugins/dynamo_inject_workerid/plugin.go | 62 ++++ .../plugins/dynamo_kv_scorer/plugin.go | 319 ++++++++++++++++++ 6 files changed, 516 insertions(+), 28 deletions(-) create mode 100644 epp-config-dynamo.yaml create mode 100644 pkg/epp/requestcontrol/plugins/dynamo_inject_workerid/plugin.go create mode 100644 pkg/epp/scheduling/plugins/dynamo_kv_scorer/plugin.go diff --git a/cmd/epp/main.go b/cmd/epp/main.go index b5e06177bc..35e97d2799 100644 --- a/cmd/epp/main.go +++ b/cmd/epp/main.go @@ -22,13 +22,19 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/gateway-api-inference-extension/cmd/epp/runner" + eppplugins "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins" + + // Dynamo plugins + dynprereq "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/requestcontrol/plugins/dynamo_inject_workerid" + dynscorer "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins/dynamo_kv_scorer" ) func main() { - // Register all known plugin factories + // Register built-in plugins. runner.RegisterAllPlugins() - // For adding out-of-tree plugins to the plugins registry, use the following: - // plugins.Register(my-out-of-tree-plugin-name, my-out-of-tree-plugin-factory-function) + + eppplugins.Register("dynamo-inject-workerid", dynprereq.InjectWorkerIDPreRequestFactory) + eppplugins.Register("kv-aware-scorer", dynscorer.KVAwareScorerFactory) if err := runner.NewRunner().Run(ctrl.SetupSignalHandler()); err != nil { os.Exit(1) diff --git a/epp-config-dynamo.yaml b/epp-config-dynamo.yaml new file mode 100644 index 0000000000..959c2ea497 --- /dev/null +++ b/epp-config-dynamo.yaml @@ -0,0 +1,23 @@ +apiVersion: inference.networking.x-k8s.io/v1alpha1 +kind: EndpointPickerConfig +plugins: + # Required: tells EPP which profile to use (even if you only have one) + - type: single-profile-handler + + # Picker: chooses the final endpoint after scoring + - name: picker + type: max-score-picker + - name: dyn-pre + type: dynamo-inject-workerid + parameters: {} + - name: dyn-kv + type: kv-aware-scorer + parameters: + frontendURL: http://127.0.0.1:8000/v1/chat/completions + timeoutMS: 10000 +schedulingProfiles: + - name: default + plugins: + - pluginRef: dyn-kv + weight: 1 + - pluginRef: picker diff --git a/pkg/bbr/handlers/request.go b/pkg/bbr/handlers/request.go index 32fffc0217..b9cd35fce3 100644 --- a/pkg/bbr/handlers/request.go +++ b/pkg/bbr/handlers/request.go @@ -1,25 +1,10 @@ -/* -Copyright 2025 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - package handlers import ( "context" "encoding/json" "fmt" + "strings" // <— needed to normalize header keys/trim basepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" eppb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" @@ -29,13 +14,31 @@ import ( logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" ) -const modelHeader = "X-Gateway-Model-Name" +const ( + modelHeader = "X-Gateway-Model-Name" + workerIDHeader = "x-worker-instance-id" + injectHintHeader = "x-epp-inject-nvext-worker-instance-id" +) // HandleRequestBody handles request bodies. func (s *Server) HandleRequestBody(ctx context.Context, data map[string]any) ([]*eppb.ProcessingResponse, error) { logger := log.FromContext(ctx) var ret []*eppb.ProcessingResponse + // If we captured a worker id hint in the headers phase, inject it into body JSON: + // nvext.backend_instance_id = + if wid := strings.TrimSpace(s.workerIDHint); wid != "" { + // ensure nvext is a map[string]any + if nv, ok := data["nvext"]; !ok || nv == nil { + data["nvext"] = map[string]any{"backend_instance_id": wid} + } else if m, ok := nv.(map[string]any); ok { + m["backend_instance_id"] = wid + } else { + // if nvext was some other type, replace with a clean map + data["nvext"] = map[string]any{"backend_instance_id": wid} + } + } + requestBodyBytes, err := json.Marshal(data) if err != nil { return nil, err @@ -45,7 +48,9 @@ func (s *Server) HandleRequestBody(ctx context.Context, data map[string]any) ([] if !ok { metrics.RecordModelNotInBodyCounter() logger.V(logutil.DEFAULT).Info("Request body does not contain model parameter") + if s.streaming { + // still stream the possibly mutated body ret = append(ret, &eppb.ProcessingResponse{ Response: &eppb.ProcessingResponse_RequestHeaders{ RequestHeaders: &eppb.HeadersResponse{}, @@ -53,14 +58,24 @@ func (s *Server) HandleRequestBody(ctx context.Context, data map[string]any) ([] }) ret = addStreamedBodyResponse(ret, requestBodyBytes) return ret, nil - } else { - ret = append(ret, &eppb.ProcessingResponse{ + } + + // non-streaming: return a body response with the (possibly) mutated body + return []*eppb.ProcessingResponse{ + { Response: &eppb.ProcessingResponse_RequestBody{ - RequestBody: &eppb.BodyResponse{}, + RequestBody: &eppb.BodyResponse{ + Response: &eppb.CommonResponse{ + BodyMutation: &eppb.BodyMutation{ + Mutation: &eppb.BodyMutation_Body{ + Body: requestBodyBytes, + }, + }, + }, + }, }, - }) - } - return ret, nil + }, + }, nil } modelStr, ok := modelVal.(string) @@ -73,6 +88,7 @@ func (s *Server) HandleRequestBody(ctx context.Context, data map[string]any) ([] metrics.RecordSuccessCounter() if s.streaming { + // set the model header, then stream the (possibly) mutated body ret = append(ret, &eppb.ProcessingResponse{ Response: &eppb.ProcessingResponse_RequestHeaders{ RequestHeaders: &eppb.HeadersResponse{ @@ -86,22 +102,47 @@ func (s *Server) HandleRequestBody(ctx context.Context, data map[string]any) ([] RawValue: []byte(modelStr), }, }, + // also keep the worker id header if we have one + func() *basepb.HeaderValueOption { + if strings.TrimSpace(s.workerIDHint) == "" { + return nil + } + return &basepb.HeaderValueOption{ + Header: &basepb.HeaderValue{ + Key: workerIDHeader, + RawValue: []byte(s.workerIDHint), + }, + } + }(), }, }, }, }, }, }) + + // prune nil entries if worker id not present + hm := ret[len(ret)-1].GetRequestHeaders().GetResponse().GetHeaderMutation() + if hm != nil && hm.SetHeaders != nil { + out := hm.SetHeaders[:0] + for _, h := range hm.SetHeaders { + if h != nil { + out = append(out, h) + } + } + hm.SetHeaders = out + } + ret = addStreamedBodyResponse(ret, requestBodyBytes) return ret, nil } + // Non-streaming: set model header and replace the body with our mutated JSON return []*eppb.ProcessingResponse{ { Response: &eppb.ProcessingResponse_RequestBody{ RequestBody: &eppb.BodyResponse{ Response: &eppb.CommonResponse{ - // Necessary so that the new headers are used in the routing decision. ClearRouteCache: true, HeaderMutation: &eppb.HeaderMutation{ SetHeaders: []*basepb.HeaderValueOption{ @@ -111,6 +152,22 @@ func (s *Server) HandleRequestBody(ctx context.Context, data map[string]any) ([] RawValue: []byte(modelStr), }, }, + func() *basepb.HeaderValueOption { + if strings.TrimSpace(s.workerIDHint) == "" { + return nil + } + return &basepb.HeaderValueOption{ + Header: &basepb.HeaderValue{ + Key: workerIDHeader, + RawValue: []byte(s.workerIDHint), + }, + } + }(), + }, + }, + BodyMutation: &eppb.BodyMutation{ + Mutation: &eppb.BodyMutation_Body{ + Body: requestBodyBytes, }, }, }, @@ -141,6 +198,26 @@ func addStreamedBodyResponse(responses []*eppb.ProcessingResponse, requestBodyBy // HandleRequestHeaders handles request headers. func (s *Server) HandleRequestHeaders(headers *eppb.HttpHeaders) ([]*eppb.ProcessingResponse, error) { + // Look for our hint header and/or direct worker id header. + s.workerIDHint = "" // reset per request + if m := headers.GetHeaders(); m != nil { + for _, h := range m.GetHeaders() { + k := strings.ToLower(h.GetKey()) + if k == injectHintHeader || k == workerIDHeader { + if rv := h.GetRawValue(); len(rv) > 0 { + s.workerIDHint = strings.TrimSpace(string(rv)) + } else { + s.workerIDHint = strings.TrimSpace(h.GetValue()) + } + // prefer the inject hint if both are present; break on the hint + if k == injectHintHeader { + break + } + } + } + } + + // No header mutations needed here; body phase will do the JSON injection. return []*eppb.ProcessingResponse{ { Response: &eppb.ProcessingResponse_RequestHeaders{ diff --git a/pkg/bbr/handlers/server.go b/pkg/bbr/handlers/server.go index a5803806bc..e8c109282c 100644 --- a/pkg/bbr/handlers/server.go +++ b/pkg/bbr/handlers/server.go @@ -38,7 +38,8 @@ func NewServer(streaming bool) *Server { // Server implements the Envoy external processing server. // https://www.envoyproxy.io/docs/envoy/latest/api-v3/service/ext_proc/v3/external_processor.proto type Server struct { - streaming bool + streaming bool + workerIDHint string } func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error { diff --git a/pkg/epp/requestcontrol/plugins/dynamo_inject_workerid/plugin.go b/pkg/epp/requestcontrol/plugins/dynamo_inject_workerid/plugin.go new file mode 100644 index 0000000000..5c7d4438c3 --- /dev/null +++ b/pkg/epp/requestcontrol/plugins/dynamo_inject_workerid/plugin.go @@ -0,0 +1,62 @@ +package dynamo_inject_workerid + +import ( + "context" + "encoding/json" + "strings" + + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins" + rc "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/requestcontrol" + schedtypes "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types" +) + +const ( + typeString = "dynamo-inject-workerid" + pluginName = "dynamo-inject-workerid" + WorkerIDHeader = "x-worker-instance-id" + injectHintHeader = "x-epp-inject-nvext-worker-instance-id" +) + +var _ plugins.Plugin = (*InjectWorkerIDPreRequest)(nil) +var _ rc.PreRequest = (*InjectWorkerIDPreRequest)(nil) + +type InjectWorkerIDPreRequest struct { + typedName plugins.TypedName +} + +func NewInjectWorkerIDPreRequest() *InjectWorkerIDPreRequest { + return &InjectWorkerIDPreRequest{ + typedName: plugins.TypedName{Type: typeString, Name: pluginName}, + } +} + +func (p *InjectWorkerIDPreRequest) WithName(name string) *InjectWorkerIDPreRequest { + p.typedName.Name = name + return p +} + +func InjectWorkerIDPreRequestFactory(name string, _ json.RawMessage, _ plugins.Handle) (plugins.Plugin, error) { + return NewInjectWorkerIDPreRequest().WithName(name), nil +} + +func (p *InjectWorkerIDPreRequest) TypedName() plugins.TypedName { return p.typedName } + +func (p *InjectWorkerIDPreRequest) PreRequest( + _ context.Context, + req *schedtypes.LLMRequest, + _ *schedtypes.SchedulingResult, + _ int, +) { + if req == nil { + return + } + if req.Headers == nil { + req.Headers = map[string]string{} + } + wid := strings.TrimSpace(req.Headers[WorkerIDHeader]) + if wid == "" { + return + } + req.Headers[WorkerIDHeader] = wid + req.Headers[injectHintHeader] = wid +} diff --git a/pkg/epp/scheduling/plugins/dynamo_kv_scorer/plugin.go b/pkg/epp/scheduling/plugins/dynamo_kv_scorer/plugin.go new file mode 100644 index 0000000000..9ad723978e --- /dev/null +++ b/pkg/epp/scheduling/plugins/dynamo_kv_scorer/plugin.go @@ -0,0 +1,319 @@ +package dynamo_kv_scorer + +import ( + "bufio" + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "strings" + "time" + + log "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework" + schedtypes "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types" + logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" +) + +const ( + PluginName = "dynamo-kv-scorer" + KVAwareScorerType = "kv-aware-scorer" + StateKeyWorkerInstanceID = schedtypes.StateKey("dynamo/worker-instance-id") + WorkerIDHeader = "x-worker-instance-id" +) + +type params struct { + FrontendURL string `json:"frontendURL"` + TimeoutMS int `json:"timeoutMS"` +} + +// tiny wrapper so we can store a string in CycleState +type stateString string + +func (s stateString) Clone() schedtypes.StateData { return s } + +type KVAwareScorer struct { + typedName plugins.TypedName + feURL string + feTimeout time.Duration +} + +// compile-time assertions +var _ plugins.Plugin = (*KVAwareScorer)(nil) +var _ framework.Scorer = (*KVAwareScorer)(nil) + +func NewKVAwareScorer() *KVAwareScorer { + return &KVAwareScorer{ + typedName: plugins.TypedName{Type: KVAwareScorerType, Name: PluginName}, + feURL: "http://127.0.0.1:8000/v1/chat/completions", + feTimeout: 10 * time.Second, + } +} + +func (k *KVAwareScorer) WithName(name string) *KVAwareScorer { k.typedName.Name = name; return k } +func (k *KVAwareScorer) WithFrontend(url string, timeout time.Duration) *KVAwareScorer { + if url != "" { + k.feURL = url + } + if timeout > 0 { + k.feTimeout = timeout + } + return k +} + +func KVAwareScorerFactory(name string, raw json.RawMessage, _ plugins.Handle) (plugins.Plugin, error) { + p := params{} + _ = json.Unmarshal(raw, &p) + timeout := time.Duration(p.TimeoutMS) * time.Millisecond + if timeout <= 0 { + timeout = 10 * time.Second + } + return NewKVAwareScorer().WithName(name).WithFrontend(p.FrontendURL, timeout), nil +} + +func (k *KVAwareScorer) TypedName() plugins.TypedName { return k.typedName } + +func (k *KVAwareScorer) Score( + ctx context.Context, + cycle *schedtypes.CycleState, + req *schedtypes.LLMRequest, + pods []schedtypes.Pod, +) map[schedtypes.Pod]float64 { + logger := log.FromContext(ctx) + + workerID, err := k.callFrontEndForWorker(ctx, req) + if err != nil { + logger.V(logutil.DEFAULT).Error(err, "FrontEnd call failed; proceeding without worker id") + } else if workerID != "" { + cycle.Write(StateKeyWorkerInstanceID, stateString(workerID)) + if req.Headers == nil { + req.Headers = map[string]string{} + } + req.Headers[WorkerIDHeader] = workerID + } + + // neutral/uniform scores – only your scorer runs in the profile, so this “wins” + out := make(map[schedtypes.Pod]float64, len(pods)) + for _, p := range pods { + out[p] = 1.0 + } + return out +} + +// Call the Dynamo FrontEnd and extract worker_instance_id via SSE. +func (k *KVAwareScorer) callFrontEndForWorker( + ctx context.Context, + req *schedtypes.LLMRequest, +) (string, error) { + logger := log.FromContext(ctx) + + feBody := buildFrontEndBodyFromLLMRequest(req) + payload, err := json.Marshal(feBody) + if err != nil { + logger.V(logutil.DEFAULT).Error(err, "Dynamo FrontEnd marshal failed") + return "", fmt.Errorf("marshal FrontEnd body: %w", err) + } + + reqCtx, cancel := context.WithTimeout(ctx, k.feTimeout) + defer cancel() + + httpReq, err := http.NewRequestWithContext(reqCtx, http.MethodPost, k.feURL, bytes.NewReader(payload)) + if err != nil { + logger.V(logutil.DEFAULT).Error(err, "Dynamo FrontEnd request build failed") + return "", fmt.Errorf("build FrontEnd request: %w", err) + } + httpReq.Header.Set("Content-Type", "application/json") + httpReq.Header.Set("Accept", "text/event-stream") + + client := &http.Client{Timeout: 0} + resp, err := client.Do(httpReq) + if err != nil { + logger.V(logutil.DEFAULT).Error(err, "Dynamo FrontEnd POST failed") + return "", fmt.Errorf("FrontEnd POST failed: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + errBody, _ := io.ReadAll(resp.Body) + logger.V(logutil.DEFAULT).Error(nil, "Dynamo FrontEnd non-2xx response", + "status_code", resp.StatusCode, "response_body", string(errBody)) + return "", fmt.Errorf("Dynamo FrontEnd error: %d body=%s", resp.StatusCode, string(errBody)) + } + + ct := strings.ToLower(resp.Header.Get("Content-Type")) + if !strings.Contains(ct, "text/event-stream") { + logger.V(logutil.DEFAULT).Error(nil, "Unexpected non-SSE response") + return "", fmt.Errorf("unexpected non-SSE response (Content-Type=%q)", resp.Header.Get("Content-Type")) + } + + // Parse SSE: expect `event: worker_instance_id`, a quoted id in a comment or data, and `data: [DONE]` + reader := bufio.NewReader(resp.Body) + workerID, perr := parseWorkerIDFromSSE(ctx, reader) + if perr != nil { + return "", perr + } + return workerID, nil +} + +// Build the exact body we send to the FrontEnd, only from LLMRequest (no header merging). +func buildFrontEndBodyFromLLMRequest(req *schedtypes.LLMRequest) map[string]any { + feBody := make(map[string]any, 8) + + // We call /v1/chat/completions so must provide messages + userText := "" + if req != nil && strings.TrimSpace(req.Prompt) != "" { + userText = req.Prompt + } + feBody["messages"] = []map[string]any{ + {"role": "user", "content": userText}, + } + + if req != nil && strings.TrimSpace(req.TargetModel) != "" { + feBody["model"] = req.TargetModel + } + + // Force SSE so we can parse worker_instance_id + feBody["stream"] = true + + feBody["max_tokens"] = 1 + feBody["temperature"] = 0.0 + + // Ask the Dynamo to include worker id + feBody["nvext"] = map[string]any{ + "annotations": []string{"query_instance_id"}, + } + + return feBody +} + +// parseWorkerIDFromSSE scans an SSE stream for a worker_instance_id. +// Expected pattern: +// +// event: worker_instance_id +// : "8303679623149182543" +// data: [DONE] +// +// Also supports JSON in data lines with either top-level worker_instance_id +// or annotations.worker_instance_id. +func parseWorkerIDFromSSE(ctx context.Context, reader *bufio.Reader) (string, error) { + logger := log.FromContext(ctx) + + var ( + eventName string + dataBuf strings.Builder // accumulates "data:" lines for one event + commentBuf strings.Builder // accumulates ":" comment lines + ) + + flushEvent := func() (string, bool, error) { + data := strings.TrimSpace(dataBuf.String()) + comment := strings.TrimSpace(commentBuf.String()) + dataBuf.Reset() + commentBuf.Reset() + + // [DONE] ends the stream + if data == "[DONE]" || comment == "[DONE]" { + logger.V(logutil.DEFAULT).Info("SSE stream DONE") + return "", true, nil + } + + // Prefer the named event + if eventName == "worker_instance_id" { + candidate := data + if candidate == "" { + candidate = comment + } + if candidate != "" { + // Try JSON string + var s string + if json.Unmarshal([]byte(candidate), &s) == nil && s != "" { + logger.V(logutil.VERBOSE).Info("worker_instance_id extracted from named event", "worker_instance_id", s) + return s, false, nil + } + // Fallback: strip quotes + clean := strings.Trim(candidate, "\"") + if clean != "" && clean != "[DONE]" { + logger.V(logutil.DEFAULT).Info("worker_instance_id extracted (raw) from named event", "worker_instance_id", clean) + return clean, false, nil + } + } + } + + // Generic JSON in data: + if data != "" { + var msg map[string]any + if json.Unmarshal([]byte(data), &msg) == nil { + if wid, ok := msg["worker_instance_id"].(string); ok && wid != "" { + logger.V(logutil.DEFAULT).Info("worker_instance_id found in SSE payload root", "worker_instance_id", wid) + return wid, false, nil + } + if ann, ok := msg["annotations"].(map[string]any); ok { + if wid, ok := ann["worker_instance_id"].(string); ok && wid != "" { + logger.V(logutil.DEFAULT).Info("worker_instance_id found in SSE annotations", "worker_instance_id", wid) + return wid, false, nil + } + } + } + } + return "", false, nil + } + + for { + line, err := reader.ReadString('\n') + if err != nil { + if err == io.EOF { + // Flush any pending event on EOF + if wid, done, _ := flushEvent(); wid != "" { + return wid, nil + } else if done { + return "", fmt.Errorf("worker_instance_id not found before DONE") + } + logger.V(logutil.DEFAULT).Error(nil, "EOF before worker_instance_id") + return "", fmt.Errorf("worker_instance_id not found in SSE stream (EOF)") + } + logger.V(logutil.DEFAULT).Error(err, "SSE read error") + return "", fmt.Errorf("sse read error: %w", err) + } + + l := strings.TrimRight(line, "\r\n") + if l == "" { + // End of current event; process it + if wid, done, _ := flushEvent(); wid != "" { + return wid, nil + } else if done { + return "", fmt.Errorf("worker_instance_id not found before DONE") + } + eventName = "" // reset for next event + continue + } + + // Comment line + if strings.HasPrefix(l, ":") { + commentLine := strings.TrimSpace(l[1:]) + if commentBuf.Len() > 0 { + commentBuf.WriteByte('\n') + } + commentBuf.WriteString(commentLine) + continue + } + + // "field: value" + if idx := strings.IndexByte(l, ':'); idx != -1 { + field := l[:idx] + val := strings.TrimSpace(l[idx+1:]) + switch field { + case "event": + eventName = val + case "data": + if dataBuf.Len() > 0 { + dataBuf.WriteByte('\n') + } + dataBuf.WriteString(val) + default: + // ignore id, retry, etc. + } + } + } +} From c2f8a40817b6248b66cab7b21e3a3639e54c34af Mon Sep 17 00:00:00 2001 From: Anna Tchernych Date: Thu, 21 Aug 2025 18:23:03 -0700 Subject: [PATCH 02/15] fix comments and 1 const --- cmd/epp/main.go | 2 +- pkg/bbr/handlers/request.go | 18 +++++++++++++++++- 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/cmd/epp/main.go b/cmd/epp/main.go index 35e97d2799..3db34225e2 100644 --- a/cmd/epp/main.go +++ b/cmd/epp/main.go @@ -30,7 +30,7 @@ import ( ) func main() { - // Register built-in plugins. + // Register all known plugin factories runner.RegisterAllPlugins() eppplugins.Register("dynamo-inject-workerid", dynprereq.InjectWorkerIDPreRequestFactory) diff --git a/pkg/bbr/handlers/request.go b/pkg/bbr/handlers/request.go index b9cd35fce3..d5a19ed2bf 100644 --- a/pkg/bbr/handlers/request.go +++ b/pkg/bbr/handlers/request.go @@ -1,3 +1,16 @@ +/* +Copyright 2025 The Kubernetes Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package handlers import ( @@ -14,8 +27,10 @@ import ( logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" ) +const modelHeader = "X-Gateway-Model-Name" + +// Dynamo-related const ( - modelHeader = "X-Gateway-Model-Name" workerIDHeader = "x-worker-instance-id" injectHintHeader = "x-epp-inject-nvext-worker-instance-id" ) @@ -143,6 +158,7 @@ func (s *Server) HandleRequestBody(ctx context.Context, data map[string]any) ([] Response: &eppb.ProcessingResponse_RequestBody{ RequestBody: &eppb.BodyResponse{ Response: &eppb.CommonResponse{ + // Necessary so that the new headers are used in the routing decision. ClearRouteCache: true, HeaderMutation: &eppb.HeaderMutation{ SetHeaders: []*basepb.HeaderValueOption{ From ab5b5fd43f983e83b94af3c394f41058001e918f Mon Sep 17 00:00:00 2001 From: Anna Tchernych Date: Thu, 21 Aug 2025 18:26:24 -0700 Subject: [PATCH 03/15] cleanup comments --- cmd/epp/main.go | 2 ++ pkg/bbr/handlers/request.go | 6 ++++-- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/cmd/epp/main.go b/cmd/epp/main.go index 3db34225e2..8592735d2c 100644 --- a/cmd/epp/main.go +++ b/cmd/epp/main.go @@ -32,6 +32,8 @@ import ( func main() { // Register all known plugin factories runner.RegisterAllPlugins() + // For adding out-of-tree plugins to the plugins registry, use the following: + // plugins.Register(my-out-of-tree-plugin-name, my-out-of-tree-plugin-factory-function) eppplugins.Register("dynamo-inject-workerid", dynprereq.InjectWorkerIDPreRequestFactory) eppplugins.Register("kv-aware-scorer", dynscorer.KVAwareScorerFactory) diff --git a/pkg/bbr/handlers/request.go b/pkg/bbr/handlers/request.go index d5a19ed2bf..70a4b04d73 100644 --- a/pkg/bbr/handlers/request.go +++ b/pkg/bbr/handlers/request.go @@ -1,9 +1,12 @@ /* Copyright 2025 The Kubernetes Authors. + Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -17,7 +20,7 @@ import ( "context" "encoding/json" "fmt" - "strings" // <— needed to normalize header keys/trim + "strings" basepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" eppb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" @@ -63,7 +66,6 @@ func (s *Server) HandleRequestBody(ctx context.Context, data map[string]any) ([] if !ok { metrics.RecordModelNotInBodyCounter() logger.V(logutil.DEFAULT).Info("Request body does not contain model parameter") - if s.streaming { // still stream the possibly mutated body ret = append(ret, &eppb.ProcessingResponse{ From 513598468ec14d1c0d1bbdfe2f48e45ede5e3330 Mon Sep 17 00:00:00 2001 From: Anna Tchernych Date: Fri, 22 Aug 2025 13:13:04 -0700 Subject: [PATCH 04/15] Add managing token_data --- pkg/bbr/handlers/request.go | 40 +++- pkg/bbr/handlers/server.go | 5 +- .../plugins/dynamo_inject_workerid/plugin.go | 7 + .../plugins/dynamo_kv_scorer/plugin.go | 172 ++++++++++++++---- 4 files changed, 184 insertions(+), 40 deletions(-) diff --git a/pkg/bbr/handlers/request.go b/pkg/bbr/handlers/request.go index 70a4b04d73..573276e5fc 100644 --- a/pkg/bbr/handlers/request.go +++ b/pkg/bbr/handlers/request.go @@ -18,6 +18,7 @@ package handlers import ( "context" + "encoding/base64" "encoding/json" "fmt" "strings" @@ -36,6 +37,7 @@ const modelHeader = "X-Gateway-Model-Name" const ( workerIDHeader = "x-worker-instance-id" injectHintHeader = "x-epp-inject-nvext-worker-instance-id" + tokenDataHeader = "x-epp-inject-nvext-token-data" ) // HandleRequestBody handles request bodies. @@ -57,6 +59,23 @@ func (s *Server) HandleRequestBody(ctx context.Context, data map[string]any) ([] } } + // If we captured token_data in headers, decode and inject as nvext.token_data + if td := strings.TrimSpace(s.tokenDataHint); td != "" { + // header value is base64(JSON array) + if raw, err := base64.StdEncoding.DecodeString(td); err == nil { + var arr []int64 + if err := json.Unmarshal(raw, &arr); err == nil && len(arr) > 0 { + // ensure nvext map exists + nv, ok := data["nvext"].(map[string]any) + if !ok || nv == nil { + nv = map[string]any{} + data["nvext"] = nv + } + nv["token_data"] = arr + } + } + } + requestBodyBytes, err := json.Marshal(data) if err != nil { return nil, err @@ -216,20 +235,29 @@ func addStreamedBodyResponse(responses []*eppb.ProcessingResponse, requestBodyBy // HandleRequestHeaders handles request headers. func (s *Server) HandleRequestHeaders(headers *eppb.HttpHeaders) ([]*eppb.ProcessingResponse, error) { - // Look for our hint header and/or direct worker id header. - s.workerIDHint = "" // reset per request + // reset per-request + s.workerIDHint = "" + s.tokenDataHint = "" + if m := headers.GetHeaders(); m != nil { for _, h := range m.GetHeaders() { k := strings.ToLower(h.GetKey()) - if k == injectHintHeader || k == workerIDHeader { + + switch k { + case injectHintHeader, workerIDHeader: + // Prefer raw bytes if present; otherwise use value (Envoy can deliver either) if rv := h.GetRawValue(); len(rv) > 0 { s.workerIDHint = strings.TrimSpace(string(rv)) } else { s.workerIDHint = strings.TrimSpace(h.GetValue()) } - // prefer the inject hint if both are present; break on the hint - if k == injectHintHeader { - break + // NOTE: don't return; we still want to scan for tokenDataHeader + + case tokenDataHeader: + if rv := h.GetRawValue(); len(rv) > 0 { + s.tokenDataHint = strings.TrimSpace(string(rv)) + } else { + s.tokenDataHint = strings.TrimSpace(h.GetValue()) } } } diff --git a/pkg/bbr/handlers/server.go b/pkg/bbr/handlers/server.go index e8c109282c..eb2893fdc6 100644 --- a/pkg/bbr/handlers/server.go +++ b/pkg/bbr/handlers/server.go @@ -38,8 +38,9 @@ func NewServer(streaming bool) *Server { // Server implements the Envoy external processing server. // https://www.envoyproxy.io/docs/envoy/latest/api-v3/service/ext_proc/v3/external_processor.proto type Server struct { - streaming bool - workerIDHint string + streaming bool + workerIDHint string + tokenDataHint string } func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error { diff --git a/pkg/epp/requestcontrol/plugins/dynamo_inject_workerid/plugin.go b/pkg/epp/requestcontrol/plugins/dynamo_inject_workerid/plugin.go index 5c7d4438c3..b6708fa4d4 100644 --- a/pkg/epp/requestcontrol/plugins/dynamo_inject_workerid/plugin.go +++ b/pkg/epp/requestcontrol/plugins/dynamo_inject_workerid/plugin.go @@ -15,6 +15,7 @@ const ( pluginName = "dynamo-inject-workerid" WorkerIDHeader = "x-worker-instance-id" injectHintHeader = "x-epp-inject-nvext-worker-instance-id" + TokenDataHeader = "x-epp-inject-nvext-token-data" ) var _ plugins.Plugin = (*InjectWorkerIDPreRequest)(nil) @@ -59,4 +60,10 @@ func (p *InjectWorkerIDPreRequest) PreRequest( } req.Headers[WorkerIDHeader] = wid req.Headers[injectHintHeader] = wid + + // Pass through token-data header if scorer set it + if td := strings.TrimSpace(req.Headers[TokenDataHeader]); td != "" { + req.Headers[TokenDataHeader] = td + } + } diff --git a/pkg/epp/scheduling/plugins/dynamo_kv_scorer/plugin.go b/pkg/epp/scheduling/plugins/dynamo_kv_scorer/plugin.go index 9ad723978e..7d60e07c37 100644 --- a/pkg/epp/scheduling/plugins/dynamo_kv_scorer/plugin.go +++ b/pkg/epp/scheduling/plugins/dynamo_kv_scorer/plugin.go @@ -4,6 +4,7 @@ import ( "bufio" "bytes" "context" + "encoding/base64" "encoding/json" "fmt" "io" @@ -23,6 +24,7 @@ const ( KVAwareScorerType = "kv-aware-scorer" StateKeyWorkerInstanceID = schedtypes.StateKey("dynamo/worker-instance-id") WorkerIDHeader = "x-worker-instance-id" + TokenDataHeader = "x-epp-inject-nvext-token-data" ) type params struct { @@ -84,7 +86,7 @@ func (k *KVAwareScorer) Score( ) map[schedtypes.Pod]float64 { logger := log.FromContext(ctx) - workerID, err := k.callFrontEndForWorker(ctx, req) + workerID, tokenData, err := k.callFrontEndForWorker(ctx, req) if err != nil { logger.V(logutil.DEFAULT).Error(err, "FrontEnd call failed; proceeding without worker id") } else if workerID != "" { @@ -93,6 +95,12 @@ func (k *KVAwareScorer) Score( req.Headers = map[string]string{} } req.Headers[WorkerIDHeader] = workerID + if len(tokenData) > 0 { + if req.Headers == nil { + req.Headers = map[string]string{} + } + req.Headers[TokenDataHeader] = encodeTokenData(tokenData) + } } // neutral/uniform scores – only your scorer runs in the profile, so this “wins” @@ -107,14 +115,14 @@ func (k *KVAwareScorer) Score( func (k *KVAwareScorer) callFrontEndForWorker( ctx context.Context, req *schedtypes.LLMRequest, -) (string, error) { +) (string, []int64, error) { logger := log.FromContext(ctx) feBody := buildFrontEndBodyFromLLMRequest(req) payload, err := json.Marshal(feBody) if err != nil { logger.V(logutil.DEFAULT).Error(err, "Dynamo FrontEnd marshal failed") - return "", fmt.Errorf("marshal FrontEnd body: %w", err) + return "", nil, fmt.Errorf("marshal FrontEnd body: %w", err) } reqCtx, cancel := context.WithTimeout(ctx, k.feTimeout) @@ -123,7 +131,7 @@ func (k *KVAwareScorer) callFrontEndForWorker( httpReq, err := http.NewRequestWithContext(reqCtx, http.MethodPost, k.feURL, bytes.NewReader(payload)) if err != nil { logger.V(logutil.DEFAULT).Error(err, "Dynamo FrontEnd request build failed") - return "", fmt.Errorf("build FrontEnd request: %w", err) + return "", nil, fmt.Errorf("build FrontEnd request: %w", err) } httpReq.Header.Set("Content-Type", "application/json") httpReq.Header.Set("Accept", "text/event-stream") @@ -132,7 +140,7 @@ func (k *KVAwareScorer) callFrontEndForWorker( resp, err := client.Do(httpReq) if err != nil { logger.V(logutil.DEFAULT).Error(err, "Dynamo FrontEnd POST failed") - return "", fmt.Errorf("FrontEnd POST failed: %w", err) + return "", nil, fmt.Errorf("FrontEnd POST failed: %w", err) } defer resp.Body.Close() @@ -140,22 +148,22 @@ func (k *KVAwareScorer) callFrontEndForWorker( errBody, _ := io.ReadAll(resp.Body) logger.V(logutil.DEFAULT).Error(nil, "Dynamo FrontEnd non-2xx response", "status_code", resp.StatusCode, "response_body", string(errBody)) - return "", fmt.Errorf("Dynamo FrontEnd error: %d body=%s", resp.StatusCode, string(errBody)) + return "", nil, fmt.Errorf("Dynamo FrontEnd error: %d body=%s", resp.StatusCode, string(errBody)) } ct := strings.ToLower(resp.Header.Get("Content-Type")) if !strings.Contains(ct, "text/event-stream") { logger.V(logutil.DEFAULT).Error(nil, "Unexpected non-SSE response") - return "", fmt.Errorf("unexpected non-SSE response (Content-Type=%q)", resp.Header.Get("Content-Type")) + return "", nil, fmt.Errorf("unexpected non-SSE response (Content-Type=%q)", resp.Header.Get("Content-Type")) } // Parse SSE: expect `event: worker_instance_id`, a quoted id in a comment or data, and `data: [DONE]` reader := bufio.NewReader(resp.Body) - workerID, perr := parseWorkerIDFromSSE(ctx, reader) + workerID, tokenData, perr := parseSelectionFromSSE(ctx, reader) if perr != nil { - return "", perr + return "", nil, perr } - return workerID, nil + return workerID, tokenData, nil } // Build the exact body we send to the FrontEnd, only from LLMRequest (no header merging). @@ -189,7 +197,7 @@ func buildFrontEndBodyFromLLMRequest(req *schedtypes.LLMRequest) map[string]any return feBody } -// parseWorkerIDFromSSE scans an SSE stream for a worker_instance_id. +// This function scans an SSE stream for a worker_instance_id and token_data. // Expected pattern: // // event: worker_instance_id @@ -198,16 +206,21 @@ func buildFrontEndBodyFromLLMRequest(req *schedtypes.LLMRequest) map[string]any // // Also supports JSON in data lines with either top-level worker_instance_id // or annotations.worker_instance_id. -func parseWorkerIDFromSSE(ctx context.Context, reader *bufio.Reader) (string, error) { +func parseSelectionFromSSE(ctx context.Context, reader *bufio.Reader) (string, []int64, error) { logger := log.FromContext(ctx) var ( eventName string dataBuf strings.Builder // accumulates "data:" lines for one event commentBuf strings.Builder // accumulates ":" comment lines + gotWID string + gotTD []int64 ) - flushEvent := func() (string, bool, error) { + // collect the exact SSE bytes for debugging + var rawBuf strings.Builder + + flushEvent := func() (bool, error) { data := strings.TrimSpace(dataBuf.String()) comment := strings.TrimSpace(commentBuf.String()) dataBuf.Reset() @@ -216,7 +229,11 @@ func parseWorkerIDFromSSE(ctx context.Context, reader *bufio.Reader) (string, er // [DONE] ends the stream if data == "[DONE]" || comment == "[DONE]" { logger.V(logutil.DEFAULT).Info("SSE stream DONE") - return "", true, nil + logger.V(logutil.DEFAULT).Info("SSE raw stream", "raw", rawBuf.String()) + if gotWID != "" && len(gotTD) == 0 { + logger.V(logutil.DEFAULT).Info("SSE DONE: worker_instance_id present, token_data missing") + } + return true, nil } // Prefer the named event @@ -230,60 +247,93 @@ func parseWorkerIDFromSSE(ctx context.Context, reader *bufio.Reader) (string, er var s string if json.Unmarshal([]byte(candidate), &s) == nil && s != "" { logger.V(logutil.VERBOSE).Info("worker_instance_id extracted from named event", "worker_instance_id", s) - return s, false, nil + gotWID = s + return false, nil } // Fallback: strip quotes clean := strings.Trim(candidate, "\"") if clean != "" && clean != "[DONE]" { logger.V(logutil.DEFAULT).Info("worker_instance_id extracted (raw) from named event", "worker_instance_id", clean) - return clean, false, nil + gotWID = clean + return false, nil } } } + if eventName == "token_data" { + candidate := data + if candidate == "" { + candidate = comment + } + if candidate != "" { + if arr := toInt64SliceJSON(candidate); len(arr) > 0 { + gotTD = arr + logger.V(logutil.DEFAULT).Info("token_data extracted from named event", "count", len(arr)) + return false, nil + } + } + } // Generic JSON in data: if data != "" { var msg map[string]any if json.Unmarshal([]byte(data), &msg) == nil { if wid, ok := msg["worker_instance_id"].(string); ok && wid != "" { logger.V(logutil.DEFAULT).Info("worker_instance_id found in SSE payload root", "worker_instance_id", wid) - return wid, false, nil + gotWID = wid } if ann, ok := msg["annotations"].(map[string]any); ok { if wid, ok := ann["worker_instance_id"].(string); ok && wid != "" { logger.V(logutil.DEFAULT).Info("worker_instance_id found in SSE annotations", "worker_instance_id", wid) - return wid, false, nil + gotWID = wid + } + } + if td, ok := msg["token_data"]; ok { + if arr := toInt64Slice(td); len(arr) > 0 { + gotTD = arr + logger.V(logutil.DEFAULT).Info("token_data found in SSE payload root", "count", len(arr)) + } + } else if nv, ok := msg["nvext"].(map[string]any); ok { + if td, ok := nv["token_data"]; ok { + if arr := toInt64Slice(td); len(arr) > 0 { + gotTD = arr + logger.V(logutil.DEFAULT).Info("token_data found in SSE nvext", "count", len(arr)) + } } } } } - return "", false, nil + return false, nil } for { line, err := reader.ReadString('\n') + // capture the raw stream as-is for debugging + rawBuf.WriteString(line) if err != nil { if err == io.EOF { - // Flush any pending event on EOF - if wid, done, _ := flushEvent(); wid != "" { - return wid, nil - } else if done { - return "", fmt.Errorf("worker_instance_id not found before DONE") + _, _ = flushEvent() + logger.V(logutil.DEFAULT).Info("SSE raw stream (EOF)", "raw", rawBuf.String()) + if gotWID != "" && len(gotTD) == 0 { + logger.V(logutil.DEFAULT).Info("EOF: worker_instance_id present, token_data missing") } - logger.V(logutil.DEFAULT).Error(nil, "EOF before worker_instance_id") - return "", fmt.Errorf("worker_instance_id not found in SSE stream (EOF)") + if gotWID != "" || len(gotTD) > 0 { + return gotWID, gotTD, nil + } + logger.V(logutil.DEFAULT).Error(nil, "EOF before selection fields present") + return "", nil, fmt.Errorf("selection not found in SSE stream (EOF)") } logger.V(logutil.DEFAULT).Error(err, "SSE read error") - return "", fmt.Errorf("sse read error: %w", err) + return "", nil, fmt.Errorf("sse read error: %w", err) } l := strings.TrimRight(line, "\r\n") if l == "" { - // End of current event; process it - if wid, done, _ := flushEvent(); wid != "" { - return wid, nil - } else if done { - return "", fmt.Errorf("worker_instance_id not found before DONE") + // End of current event. + if done, _ := flushEvent(); done { + if gotWID != "" && len(gotTD) == 0 { + logger.V(logutil.DEFAULT).Info("SSE DONE: worker_instance_id present, token_data missing") + } + return gotWID, gotTD, nil } eventName = "" // reset for next event continue @@ -317,3 +367,61 @@ func parseWorkerIDFromSSE(ctx context.Context, reader *bufio.Reader) (string, er } } } + +// encodeTokenData turns []int64 into base64(JSON array) for a safe header value. +func encodeTokenData(tokens []int64) string { + b, _ := json.Marshal(tokens) + return base64.StdEncoding.EncodeToString(b) +} + +// Accepts interface{} from a parsed JSON map +func toInt64Slice(v any) []int64 { + xs, ok := v.([]any) + if !ok { + return nil + } + out := make([]int64, 0, len(xs)) + for _, it := range xs { + switch n := it.(type) { + case float64: + out = append(out, int64(n)) + case int64: + out = append(out, n) + case json.Number: + if i, err := n.Int64(); err == nil { + out = append(out, i) + } + } + } + return out +} + +// Accepts raw JSON (string) for events like: +// event: worker_instance_id\n: \"8228244551594056720\"\n\n +// event: token_data\n: \"[151644,872,198,151644,872,198,14990,151645,198,151645,198,151644,77091,198]\ +// "\n\ndata: [DONE]\n\n" +// replaces the old toInt64SliceJSON +func toInt64SliceJSON(s string) []int64 { + // case 1: direct JSON array + var arr []int64 + if err := json.Unmarshal([]byte(s), &arr); err == nil && len(arr) > 0 { + return arr + } + // case 2: s is a JSON string that itself contains a JSON array + var inner string + if err := json.Unmarshal([]byte(s), &inner); err == nil && inner != "" { + var arr2 []int64 + if err := json.Unmarshal([]byte(inner), &arr2); err == nil && len(arr2) > 0 { + return arr2 + } + } + // case 3: strip quotes and try once more + unquoted := strings.Trim(s, "\"") + if unquoted != s { + var arr3 []int64 + if err := json.Unmarshal([]byte(unquoted), &arr3); err == nil && len(arr3) > 0 { + return arr3 + } + } + return nil +} From e34790efeb7d82e0f18d611a2f15cc47b3210978 Mon Sep 17 00:00:00 2001 From: Anna Tchernych Date: Fri, 22 Aug 2025 13:22:13 -0700 Subject: [PATCH 05/15] Cleanup and move the config file --- pkg/bbr/handlers/request.go | 3 --- .../plugins/dynamo_kv_scorer/epp-config-dynamo.yaml | 1 + pkg/epp/scheduling/plugins/dynamo_kv_scorer/plugin.go | 6 +++++- 3 files changed, 6 insertions(+), 4 deletions(-) rename epp-config-dynamo.yaml => pkg/epp/scheduling/plugins/dynamo_kv_scorer/epp-config-dynamo.yaml (85%) diff --git a/pkg/bbr/handlers/request.go b/pkg/bbr/handlers/request.go index 573276e5fc..1aa1b85268 100644 --- a/pkg/bbr/handlers/request.go +++ b/pkg/bbr/handlers/request.go @@ -245,14 +245,11 @@ func (s *Server) HandleRequestHeaders(headers *eppb.HttpHeaders) ([]*eppb.Proces switch k { case injectHintHeader, workerIDHeader: - // Prefer raw bytes if present; otherwise use value (Envoy can deliver either) if rv := h.GetRawValue(); len(rv) > 0 { s.workerIDHint = strings.TrimSpace(string(rv)) } else { s.workerIDHint = strings.TrimSpace(h.GetValue()) } - // NOTE: don't return; we still want to scan for tokenDataHeader - case tokenDataHeader: if rv := h.GetRawValue(); len(rv) > 0 { s.tokenDataHint = strings.TrimSpace(string(rv)) diff --git a/epp-config-dynamo.yaml b/pkg/epp/scheduling/plugins/dynamo_kv_scorer/epp-config-dynamo.yaml similarity index 85% rename from epp-config-dynamo.yaml rename to pkg/epp/scheduling/plugins/dynamo_kv_scorer/epp-config-dynamo.yaml index 959c2ea497..2d92be03b7 100644 --- a/epp-config-dynamo.yaml +++ b/pkg/epp/scheduling/plugins/dynamo_kv_scorer/epp-config-dynamo.yaml @@ -1,3 +1,4 @@ +# This is an example for configuring the EPP to use the dynamo token-aware kv router for scoring the pods apiVersion: inference.networking.x-k8s.io/v1alpha1 kind: EndpointPickerConfig plugins: diff --git a/pkg/epp/scheduling/plugins/dynamo_kv_scorer/plugin.go b/pkg/epp/scheduling/plugins/dynamo_kv_scorer/plugin.go index 7d60e07c37..50eb5f6907 100644 --- a/pkg/epp/scheduling/plugins/dynamo_kv_scorer/plugin.go +++ b/pkg/epp/scheduling/plugins/dynamo_kv_scorer/plugin.go @@ -203,7 +203,11 @@ func buildFrontEndBodyFromLLMRequest(req *schedtypes.LLMRequest) map[string]any // event: worker_instance_id // : "8303679623149182543" // data: [DONE] -// + +// or with tokens: +// event: worker_instance_id\n: \"8228244551594056720\"\n\n +// event: token_data\n: \"[151644,872,198,151644,872,198,14990,151645,198,151645,198,151644,77091,198]\ +// "\n\ndata: [DONE]\n\n" // Also supports JSON in data lines with either top-level worker_instance_id // or annotations.worker_instance_id. func parseSelectionFromSSE(ctx context.Context, reader *bufio.Reader) (string, []int64, error) { From 39d8b0f3e707cc2e37934ed12a9ccab1c1c9e808 Mon Sep 17 00:00:00 2001 From: Anna Tchernych Date: Wed, 10 Sep 2025 16:17:28 -0700 Subject: [PATCH 06/15] Integration with the router bindings Signed-off-by: Anna Tchernych --- Dockerfile.dynamo | 65 +++++++ Makefile | 42 +++++ .../plugins/dynamo_kv_scorer/plugin.go | 173 +++++++++++++++++- 3 files changed, 277 insertions(+), 3 deletions(-) create mode 100644 Dockerfile.dynamo diff --git a/Dockerfile.dynamo b/Dockerfile.dynamo new file mode 100644 index 0000000000..67db775b8d --- /dev/null +++ b/Dockerfile.dynamo @@ -0,0 +1,65 @@ +# Dockerfile.dynamo - Custom Dockerfile for Dynamo FFI plugin +ARG BUILDER_IMAGE=golang:1.24 +ARG BASE_IMAGE=gcr.io/distroless/static:nonroot + +## Multistage build with CGO enabled for Dynamo FFI +FROM ${BUILDER_IMAGE} AS builder + +# Enable CGO for static library linking +ENV CGO_ENABLED=1 +ENV GOOS=linux +ENV GOARCH=amd64 + +# Install build essentials for CGO compilation +RUN apt-get update && apt-get install -y \ + gcc \ + libc6-dev \ + && rm -rf /var/lib/apt/lists/* + +ARG COMMIT_SHA=unknown +ARG BUILD_REF + +# Set up workspace +WORKDIR /src + +# Copy dependencies first for better caching +COPY go.mod go.sum ./ +RUN go mod download + +# Copy all source code including your Dynamo plugin +COPY cmd/epp ./cmd/epp +COPY pkg/epp ./pkg/epp +COPY internal ./internal +COPY api ./api + +# Make sure your static library and headers are available +# The plugin.go has these CGO directives: +# #cgo CFLAGS: -I${SRCDIR}/include +# #cgo LDFLAGS: ${SRCDIR}/lib/libdynamo_llm_capi.a -ldl -lpthread -lm + +# Verify the library files exist (optional debug step) +RUN ls -la pkg/epp/scheduling/plugins/dynamo_kv_scorer/include/ || echo "Headers not found" +RUN ls -la pkg/epp/scheduling/plugins/dynamo_kv_scorer/lib/ || echo "Library not found" + +# Build with CGO enabled +WORKDIR /src/cmd/epp +RUN go build -ldflags="-X sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics.CommitSHA=${COMMIT_SHA} -X sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics.BuildRef=${BUILD_REF}" -o /epp + +## Deploy stage - need to use a base with glibc for CGO +FROM ubuntu:22.04 AS runtime + +# Install minimal runtime dependencies +RUN apt-get update && apt-get install -y \ + ca-certificates \ + && rm -rf /var/lib/apt/lists/* \ + && groupadd -r nonroot \ + && useradd -r -g nonroot nonroot + +WORKDIR / +COPY --from=builder /epp /epp + +# Switch to non-root user for security +USER nonroot:nonroot + +ENTRYPOINT ["/epp"] + diff --git a/Makefile b/Makefile index dee7e99e0e..4679ce2e0f 100644 --- a/Makefile +++ b/Makefile @@ -170,6 +170,48 @@ verify-all: ##@ Build +##@ Dynamo EPP with FFI + +# Build the Dynamo EPP image with CGO static library support +.PHONY: dynamo-image-local-build +dynamo-image-local-build: ## Build the Dynamo EPP image using Docker Buildx for local development. + BUILDER=$(shell $(DOCKER_BUILDX_CMD) create --use) + $(MAKE) dynamo-image-build PUSH=$(PUSH) + $(MAKE) dynamo-image-build LOAD=$(LOAD) + $(DOCKER_BUILDX_CMD) rm $$BUILDER + +.PHONY: dynamo-image-local-push +dynamo-image-local-push: PUSH=--push ## Build the Dynamo EPP image for local development and push it to $IMAGE_REPO. +dynamo-image-local-push: dynamo-image-local-build + +.PHONY: dynamo-image-local-load +dynamo-image-local-load: LOAD=--load ## Build the Dynamo EPP image for local development and load it in the local Docker registry. +dynamo-image-local-load: dynamo-image-local-build + +.PHONY: dynamo-image-build +dynamo-image-build: ## Build the Dynamo EPP image using Docker Buildx with CGO support. + $(IMAGE_BUILD_CMD) -f Dockerfile.dynamo -t $(IMAGE_TAG) \ + --platform=$(PLATFORMS) \ + --build-arg BASE_IMAGE=ubuntu:22.04 \ + --build-arg BUILDER_IMAGE=$(BUILDER_IMAGE) \ + --build-arg COMMIT_SHA=${GIT_COMMIT_SHA} \ + --build-arg BUILD_REF=${BUILD_REF} \ + $(PUSH) \ + $(LOAD) \ + $(IMAGE_BUILD_EXTRA_OPTS) ./ + +.PHONY: dynamo-image-push +dynamo-image-push: PUSH=--push ## Build the Dynamo EPP image and push it to $IMAGE_REPO. +dynamo-image-push: dynamo-image-build + +.PHONY: dynamo-image-load +dynamo-image-load: LOAD=--load ## Build the Dynamo EPP image and load it in the local Docker registry. +dynamo-image-load: dynamo-image-build + +.PHONY: dynamo-image-kind +dynamo-image-kind: dynamo-image-build ## Build the Dynamo EPP image and load it to kind cluster $KIND_CLUSTER ("kind" by default). + kind load docker-image $(IMAGE_TAG) --name $(KIND_CLUSTER) + # Build the container image .PHONY: image-local-build image-local-build: ## Build the EPP image using Docker Buildx for local development. diff --git a/pkg/epp/scheduling/plugins/dynamo_kv_scorer/plugin.go b/pkg/epp/scheduling/plugins/dynamo_kv_scorer/plugin.go index 50eb5f6907..81b08ac8bf 100644 --- a/pkg/epp/scheduling/plugins/dynamo_kv_scorer/plugin.go +++ b/pkg/epp/scheduling/plugins/dynamo_kv_scorer/plugin.go @@ -1,5 +1,12 @@ package dynamo_kv_scorer +/* +#cgo CFLAGS: -I${SRCDIR}/include +#cgo LDFLAGS: ${SRCDIR}/lib/libdynamo_llm_capi.a -ldl -lpthread -lm -lstdc++ +#include "dynamo_simple.h" +*/ +import "C" + import ( "bufio" "bytes" @@ -9,8 +16,11 @@ import ( "fmt" "io" "net/http" + "os" "strings" + "sync" "time" + "unsafe" log "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins" @@ -27,6 +37,9 @@ const ( TokenDataHeader = "x-epp-inject-nvext-token-data" ) +var warmupOnce sync.Once +var warmupErr error + type params struct { FrontendURL string `json:"frontendURL"` TimeoutMS int `json:"timeoutMS"` @@ -73,7 +86,18 @@ func KVAwareScorerFactory(name string, raw json.RawMessage, _ plugins.Handle) (p if timeout <= 0 { timeout = 10 * time.Second } - return NewKVAwareScorer().WithName(name).WithFrontend(p.FrontendURL, timeout), nil + + s := NewKVAwareScorer().WithName(name).WithFrontend(p.FrontendURL, timeout) + + // For the Dynamo Router Library + warmupOnce.Do(func() { + warmupErr = initFFI() + }) + if warmupErr != nil { + return nil, fmt.Errorf("Dynamo FFI init for the Router failed: %w", warmupErr) + } + + return s, nil } func (k *KVAwareScorer) TypedName() plugins.TypedName { return k.typedName } @@ -86,10 +110,17 @@ func (k *KVAwareScorer) Score( ) map[schedtypes.Pod]float64 { logger := log.FromContext(ctx) - workerID, tokenData, err := k.callFrontEndForWorker(ctx, req) + initFFI() + workerID, tokenData, err := k.callDynamoRouter(ctx, req) if err != nil { - logger.V(logutil.DEFAULT).Error(err, "FrontEnd call failed; proceeding without worker id") + logger.V(logutil.DEFAULT).Error(err, "Dynamo call failed; proceeding without worker id") } else if workerID != "" { + logger.V(logutil.DEFAULT).Info( + "Dynamo router selected worker", + "workerID", workerID, + "tokenDataCount", len(tokenData), + "tokenData", tokenData, + ) cycle.Write(StateKeyWorkerInstanceID, stateString(workerID)) if req.Headers == nil { req.Headers = map[string]string{} @@ -429,3 +460,139 @@ func toInt64SliceJSON(s string) []int64 { } return nil } + +// Integration with the Dynamo Router + +// ---- One-time FFI init (adjust to your deployment) ---- + +var ( + ffiOnce sync.Once + ffiErr error + + // Configuration loaded from environment variables + ffiNamespace string + ffiComponent string + ffiWorkerID int64 + ffiKvBlkSize uint32 +) + +// Load configuration from environment variables +func loadDynamoConfig() { + // Set defaults + ffiNamespace = getEnvOrDefault("DYNAMO_NAMESPACE", "default") + ffiComponent = getEnvOrDefault("DYNAMO_COMPONENT", "my-model") + ffiWorkerID = getEnvInt64OrDefault("DYNAMO_WORKER_ID", 1) + ffiKvBlkSize = uint32(getEnvInt64OrDefault("DYNAMO_KV_BLOCK_SIZE", 32)) +} + +func getEnvOrDefault(key, defaultValue string) string { + if value := os.Getenv(key); value != "" { + return value + } + return defaultValue +} + +func getEnvInt64OrDefault(key string, defaultValue int64) int64 { + if value := os.Getenv(key); value != "" { + if parsed, err := fmt.Sscanf(value, "%d", &defaultValue); err == nil && parsed == 1 { + return defaultValue + } + } + return defaultValue +} + +// Call this once in your plugin setup (or lazily before first route call) +func initFFI() error { + ffiOnce.Do(func() { + // Load configuration from environment + loadDynamoConfig() + + ns := C.CString(ffiNamespace) + cm := C.CString(ffiComponent) + defer C.free(unsafe.Pointer(ns)) + defer C.free(unsafe.Pointer(cm)) + + // Bring up runtime + publisher (safe even if you only query) + if C.dynamo_llm_init(ns, cm, C.longlong(ffiWorkerID), C.uint(ffiKvBlkSize)) != C.OK { + ffiErr = fmt.Errorf("dynamo_llm_init failed") + return + } + // Initialize router (with defaults) + if C.dynamo_kv_router_init(ns, cm, C.uint(ffiKvBlkSize)) != C.OK { + ffiErr = fmt.Errorf("dynamo_kv_router_init failed") + return + } + }) + return ffiErr +} + +func defaultRouterOverride() *C.struct_DynamoRouterConfigOverride { + // No overrides: both has_* = false; numeric fields ignored. + // TODO: where can I read these from? + cfg := C.struct_DynamoRouterConfigOverride{ + has_overlap_score_weight: C.bool(false), + overlap_score_weight: C.double(0), + has_router_temperature: C.bool(false), + router_temperature: C.double(0), + } + return &cfg +} + +// Uses FFI to get (worker_instance_id, tokens) in-process. +// Use this as an alternative to making the call over HTTP with the k.callFrontEndForWorker +func (k *KVAwareScorer) callDynamoRouter( + ctx context.Context, + req *schedtypes.LLMRequest, +) (string, []int64, error) { + logger := log.FromContext(ctx) + + if err := initFFI(); err != nil { + logger.V(logutil.DEFAULT).Error(err, "FFI init failed") + return "", nil, err + } + + // contextID should be unique per request. If your framework has one, use it. + // Otherwise fall back to a deterministic hash or a random UUID. + contextID := req.RequestId + if contextID == "" { + contextID = "gaie-epp" // TODO: replace with your request-id/trace-id + } + prompt := req.Prompt + + cCtx := C.CString(contextID) + cPrm := C.CString(prompt) + defer C.free(unsafe.Pointer(cCtx)) + defer C.free(unsafe.Pointer(cPrm)) + + var cWorker C.longlong + var cTokens *C.uint + var cCount C.ulong // uintptr_t in header; maps to C.ulong here + + cfg := defaultRouterOverride() + rc := C.dynamo_kv_router_query_instance_id_with_config( + cCtx, + cPrm, + cfg, + &cWorker, + &cTokens, + &cCount, + ) + if rc != C.OK { + return "", nil, fmt.Errorf("dynamo_kv_router_query_instance_id failed") + } + + // Copy tokens into Go memory then free C memory immediately + count := int(uintptr(cCount)) + var tokens64 []int64 + if count > 0 && cTokens != nil { + src := unsafe.Slice((*uint32)(unsafe.Pointer(cTokens)), count) + tokens64 = make([]int64, count) + for i := 0; i < count; i++ { + tokens64[i] = int64(src[i]) + } + C.dynamo_kv_router_free_tokens((*C.uint)(cTokens)) + } + + workerID := fmt.Sprintf("%d", int64(cWorker)) + return workerID, tokens64, nil +} From 1be87f60c6dc698134361d5557f34cc7a59043b5 Mon Sep 17 00:00:00 2001 From: Anna Tchernych Date: Wed, 24 Sep 2025 13:01:20 -0700 Subject: [PATCH 07/15] clean Signed-off-by: Anna Tchernych --- DYNAMO_KV_SCORER_README.md | 180 +++++++ Dockerfile.dynamo | 59 +-- .../plugins/dynamo_kv_scorer/plugin.go | 468 +++++++++++------- 3 files changed, 500 insertions(+), 207 deletions(-) create mode 100644 DYNAMO_KV_SCORER_README.md diff --git a/DYNAMO_KV_SCORER_README.md b/DYNAMO_KV_SCORER_README.md new file mode 100644 index 0000000000..596fc638fc --- /dev/null +++ b/DYNAMO_KV_SCORER_README.md @@ -0,0 +1,180 @@ +# Dynamo KV Scorer Plugin - Configuration Guide + +## 🚨 BREAKING CHANGE: Mandatory Environment Variables + +As of this version, `DYNAMO_KV_BLOCK_SIZE` is **MANDATORY** to prevent silent KV routing failures. + +## 📋 Required Configuration + +### **DYNAMO_KV_BLOCK_SIZE** (MANDATORY) + +**This environment variable is now REQUIRED and must exactly match your model card's `kv_cache_block_size`.** + +```bash +# Example: If your model card specifies kv_cache_block_size: 512 +export DYNAMO_KV_BLOCK_SIZE=512 +``` + +#### Validation Rules + +- ✅ **Must be set** (no default value) +- ✅ **Must be integer** between 16 and 8192 +- ✅ **Must be power of 2** (16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192) +- ✅ **Must exactly match** model card's `kv_cache_block_size` + +#### Common Values + +- **256** - Small models, memory-constrained environments +- **512** - Most common, good balance of performance and memory +- **1024** - Large models, high-performance deployments +- **2048** - Very large models with high memory availability + +## 🔍 How to Find Your Model's Block Size + +### Method 1: Check Model Card + +```bash +# Look in your model deployment configuration +cat model-deployment.yaml | grep kv_cache_block_size +# Output: kv_cache_block_size: 512 +``` + +### Method 2: Check Running Workers + +```bash +# Query Dynamo worker configuration +kubectl get configmap dynamo-worker-config -o yaml | grep kv_cache_block_size +``` + +### Method 3: Check Model Documentation + +Most model cards specify the recommended block size in their configuration files or documentation. + +## 🚀 Example Deployment + +### Kubernetes Deployment + +```yaml +apiVersion: apps/v1 +kind: Deployment +metadata: + name: epp-with-dynamo-kv-scorer +spec: + template: + spec: + containers: + - name: epp + env: + - name: DYNAMO_KV_BLOCK_SIZE + value: "512" # MUST match your model card! + - name: DYNAMO_NAMESPACE + value: "production" + - name: DYNAMO_COMPONENT + value: "backend" + - name: DYNAMO_MODEL + value: "Qwen/Qwen3-0.6B" +``` + +### Docker Compose + +```yaml +services: + epp: + environment: + - DYNAMO_KV_BLOCK_SIZE=512 # MUST match your model card! + - DYNAMO_NAMESPACE=production + - DYNAMO_COMPONENT=backend + - DYNAMO_MODEL=Qwen/Qwen3-0.6B +``` + +### Shell Script + +```bash +#!/bin/bash +# deployment.sh + +# Get block size from your model configuration +MODEL_KV_BLOCK_SIZE=$(kubectl get configmap model-config -o jsonpath='{.data.kv_cache_block_size}') + +# Deploy EPP with matching block size +export DYNAMO_KV_BLOCK_SIZE=${MODEL_KV_BLOCK_SIZE} +export DYNAMO_NAMESPACE=production +export DYNAMO_COMPONENT=backend +export DYNAMO_MODEL=Qwen/Qwen3-0.6B + +./start-epp +``` + +## ⚠️ Error Messages + +### Missing Environment Variable + +``` +DYNAMO_KV_BLOCK_SIZE environment variable is required but not set. +This must match your model card's kv_cache_block_size exactly. +Common values: 256, 512, 1024 +``` + +### Invalid Value + +``` +DYNAMO_KV_BLOCK_SIZE='abc' is not a valid integer +``` + +### Out of Range + +``` +DYNAMO_KV_BLOCK_SIZE=99999 is outside reasonable range [16, 8192] +``` + +### Not Power of 2 + +``` +DYNAMO_KV_BLOCK_SIZE=100 should be a power of 2 (16, 32, 64, 128, 256, 512, 1024, etc.) +``` + +## 🔧 Migration Guide + +### Before (with silent failures) + +```bash +# This would silently fail if model card had different block size +export DYNAMO_KV_BLOCK_SIZE=512 # might not match model! +``` + +### After (explicit configuration) + +```bash +# Step 1: Check your model's actual block size +MODEL_BLOCK_SIZE=$(get-model-block-size) # Your method here + +# Step 2: Set environment variable to match exactly +export DYNAMO_KV_BLOCK_SIZE=${MODEL_BLOCK_SIZE} + +# Step 3: EPP will fail fast if there's a mismatch +``` + +## 📊 Impact + +- ✅ **Prevents silent failures** - KV routing either works correctly or fails fast +- ✅ **Explicit configuration** - Forces users to know their model's requirements +- ✅ **Better debugging** - Clear error messages for misconfigurations +- ✅ **Performance assurance** - Guarantees KV-aware routing works when configured + +## 🆘 Troubleshooting + +### Q: How do I know if my block size is correct? + +A: The plugin will log on startup: `Dynamo KV Scorer: Loaded mandatory DYNAMO_KV_BLOCK_SIZE=512` + +### Q: What happens if I set the wrong block size? + +A: KV-aware routing will fail completely, falling back to round-robin scheduling. This will degrade performance but not break functionality. + +### Q: Can I still use defaults? + +A: No. This is now mandatory to prevent silent misconfigurations that are hard to debug. + +### Q: Why is this mandatory now? + +A: Previously, wrong block sizes caused silent KV routing failures that were very difficult to diagnose. Making it mandatory forces explicit, correct configuration. diff --git a/Dockerfile.dynamo b/Dockerfile.dynamo index 67db775b8d..3f0e0a07c5 100644 --- a/Dockerfile.dynamo +++ b/Dockerfile.dynamo @@ -1,65 +1,66 @@ # Dockerfile.dynamo - Custom Dockerfile for Dynamo FFI plugin ARG BUILDER_IMAGE=golang:1.24 -ARG BASE_IMAGE=gcr.io/distroless/static:nonroot +ARG BASE_IMAGE=ubuntu:22.04 -## Multistage build with CGO enabled for Dynamo FFI +############################ +# Builder +############################ FROM ${BUILDER_IMAGE} AS builder -# Enable CGO for static library linking ENV CGO_ENABLED=1 ENV GOOS=linux ENV GOARCH=amd64 - -# Install build essentials for CGO compilation -RUN apt-get update && apt-get install -y \ - gcc \ +# be explicit; helps cgo when linking libstdc++ +ENV CC=gcc +ENV CXX=g++ + +# C/C++ toolchain for cgo, and libstdc++ for link-time +RUN apt-get update && apt-get install -y --no-install-recommends \ + build-essential \ + gcc g++ \ libc6-dev \ - && rm -rf /var/lib/apt/lists/* + ca-certificates \ + && rm -rf /var/lib/apt/lists/* ARG COMMIT_SHA=unknown ARG BUILD_REF -# Set up workspace WORKDIR /src -# Copy dependencies first for better caching +# deps first (cache) COPY go.mod go.sum ./ RUN go mod download -# Copy all source code including your Dynamo plugin +# source COPY cmd/epp ./cmd/epp COPY pkg/epp ./pkg/epp COPY internal ./internal COPY api ./api -# Make sure your static library and headers are available -# The plugin.go has these CGO directives: -# #cgo CFLAGS: -I${SRCDIR}/include -# #cgo LDFLAGS: ${SRCDIR}/lib/libdynamo_llm_capi.a -ldl -lpthread -lm - -# Verify the library files exist (optional debug step) +# sanity (optional) RUN ls -la pkg/epp/scheduling/plugins/dynamo_kv_scorer/include/ || echo "Headers not found" RUN ls -la pkg/epp/scheduling/plugins/dynamo_kv_scorer/lib/ || echo "Library not found" -# Build with CGO enabled +# build WORKDIR /src/cmd/epp -RUN go build -ldflags="-X sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics.CommitSHA=${COMMIT_SHA} -X sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics.BuildRef=${BUILD_REF}" -o /epp +RUN go build \ + -ldflags="-X sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics.CommitSHA=${COMMIT_SHA} -X sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics.BuildRef=${BUILD_REF}" \ + -o /epp -## Deploy stage - need to use a base with glibc for CGO -FROM ubuntu:22.04 AS runtime +############################ +# Runtime +############################ +FROM ${BASE_IMAGE} AS runtime -# Install minimal runtime dependencies -RUN apt-get update && apt-get install -y \ +# Minimal runtime deps; include libstdc++ runtime for -lstdc++ +RUN apt-get update && apt-get install -y --no-install-recommends \ ca-certificates \ - && rm -rf /var/lib/apt/lists/* \ - && groupadd -r nonroot \ - && useradd -r -g nonroot nonroot + libstdc++6 \ + && rm -rf /var/lib/apt/lists/* \ + && groupadd -r nonroot && useradd -r -g nonroot nonroot WORKDIR / COPY --from=builder /epp /epp -# Switch to non-root user for security USER nonroot:nonroot - ENTRYPOINT ["/epp"] - diff --git a/pkg/epp/scheduling/plugins/dynamo_kv_scorer/plugin.go b/pkg/epp/scheduling/plugins/dynamo_kv_scorer/plugin.go index 81b08ac8bf..c415fa2460 100644 --- a/pkg/epp/scheduling/plugins/dynamo_kv_scorer/plugin.go +++ b/pkg/epp/scheduling/plugins/dynamo_kv_scorer/plugin.go @@ -1,9 +1,67 @@ package dynamo_kv_scorer /* -#cgo CFLAGS: -I${SRCDIR}/include -#cgo LDFLAGS: ${SRCDIR}/lib/libdynamo_llm_capi.a -ldl -lpthread -lm -lstdc++ -#include "dynamo_simple.h" +#cgo CPPFLAGS: -I${SRCDIR}/include +#cgo CXXFLAGS: -std=c++17 +#cgo LDFLAGS: ${SRCDIR}/lib/libdynamo_llm_capi.a -lstdc++ -ldl -lpthread -lm + +#include +#include +#include // for free +#include + +// enum underlying type is uint32_t; matches cbindgen output +typedef uint32_t dynamo_llm_result_t; +enum { DYNAMO_OK = 0, DYNAMO_ERR = 1 }; + +// opaque handle forward-decl +struct WorkerSelectionPipeline; +typedef struct WorkerSelectionPipeline WorkerSelectionPipeline; + +// Prototypes (C-compatible) +dynamo_llm_result_t dynamo_llm_init(const char *namespace_c_str, + const char *component_c_str, + int64_t worker_id, + uint32_t kv_block_size); + +dynamo_llm_result_t dynamo_llm_shutdown(void); +dynamo_llm_result_t dynamo_llm_load_publisher_create(void); + +dynamo_llm_result_t dynamo_kv_event_publish_stored(uint64_t event_id, + const uint32_t *token_ids, + const uintptr_t *num_block_tokens, + const uint64_t *block_ids, + size_t num_blocks, + const uint64_t *parent_hash, + uint64_t lora_id); + +dynamo_llm_result_t dynamo_kv_event_publish_removed(uint64_t event_id, + const uint64_t *block_ids, + size_t num_blocks); + +dynamo_llm_result_t dynamo_create_worker_selection_pipeline(const char *namespace_c_str, + const char *component_c_str, + const char *model_name_c_str, + bool use_kv_routing, + double busy_threshold, + double overlap_score_weight, + double router_temperature, + bool use_kv_events, + bool router_replica_sync, + WorkerSelectionPipeline **pipeline_out); + +dynamo_llm_result_t dynamo_destroy_worker_selection_pipeline(WorkerSelectionPipeline *pipeline); + +dynamo_llm_result_t dynamo_query_worker_selection_and_annotate(WorkerSelectionPipeline *pipeline, + const char *request_json_c_str, + int64_t *worker_instance_id_out, + uint32_t **token_ids_out, + size_t *token_count_out, + char **annotated_request_json_out); + +dynamo_llm_result_t dynamo_free_worker_selection_result(uint32_t *token_ids, + size_t token_count, + char *annotated_request_json); */ import "C" @@ -37,6 +95,8 @@ const ( TokenDataHeader = "x-epp-inject-nvext-token-data" ) +// --------------------------- config / env --------------------------- + var warmupOnce sync.Once var warmupErr error @@ -45,7 +105,6 @@ type params struct { TimeoutMS int `json:"timeoutMS"` } -// tiny wrapper so we can store a string in CycleState type stateString string func (s stateString) Clone() schedtypes.StateData { return s } @@ -56,7 +115,6 @@ type KVAwareScorer struct { feTimeout time.Duration } -// compile-time assertions var _ plugins.Plugin = (*KVAwareScorer)(nil) var _ framework.Scorer = (*KVAwareScorer)(nil) @@ -89,12 +147,17 @@ func KVAwareScorerFactory(name string, raw json.RawMessage, _ plugins.Handle) (p s := NewKVAwareScorer().WithName(name).WithFrontend(p.FrontendURL, timeout) - // For the Dynamo Router Library + // one-time FFI init (runtime + persistent pipeline) warmupOnce.Do(func() { + defer func() { + if r := recover(); r != nil { + warmupErr = fmt.Errorf("Dynamo configuration error: %v", r) + } + }() warmupErr = initFFI() }) if warmupErr != nil { - return nil, fmt.Errorf("Dynamo FFI init for the Router failed: %w", warmupErr) + return nil, fmt.Errorf("!!! Dynamo FFI init for the Router failed: %w", warmupErr) } return s, nil @@ -102,47 +165,8 @@ func KVAwareScorerFactory(name string, raw json.RawMessage, _ plugins.Handle) (p func (k *KVAwareScorer) TypedName() plugins.TypedName { return k.typedName } -func (k *KVAwareScorer) Score( - ctx context.Context, - cycle *schedtypes.CycleState, - req *schedtypes.LLMRequest, - pods []schedtypes.Pod, -) map[schedtypes.Pod]float64 { - logger := log.FromContext(ctx) +// --------------------------- SSE helpers (unchanged) --------------------------- - initFFI() - workerID, tokenData, err := k.callDynamoRouter(ctx, req) - if err != nil { - logger.V(logutil.DEFAULT).Error(err, "Dynamo call failed; proceeding without worker id") - } else if workerID != "" { - logger.V(logutil.DEFAULT).Info( - "Dynamo router selected worker", - "workerID", workerID, - "tokenDataCount", len(tokenData), - "tokenData", tokenData, - ) - cycle.Write(StateKeyWorkerInstanceID, stateString(workerID)) - if req.Headers == nil { - req.Headers = map[string]string{} - } - req.Headers[WorkerIDHeader] = workerID - if len(tokenData) > 0 { - if req.Headers == nil { - req.Headers = map[string]string{} - } - req.Headers[TokenDataHeader] = encodeTokenData(tokenData) - } - } - - // neutral/uniform scores – only your scorer runs in the profile, so this “wins” - out := make(map[schedtypes.Pod]float64, len(pods)) - for _, p := range pods { - out[p] = 1.0 - } - return out -} - -// Call the Dynamo FrontEnd and extract worker_instance_id via SSE. func (k *KVAwareScorer) callFrontEndForWorker( ctx context.Context, req *schedtypes.LLMRequest, @@ -188,7 +212,6 @@ func (k *KVAwareScorer) callFrontEndForWorker( return "", nil, fmt.Errorf("unexpected non-SSE response (Content-Type=%q)", resp.Header.Get("Content-Type")) } - // Parse SSE: expect `event: worker_instance_id`, a quoted id in a comment or data, and `data: [DONE]` reader := bufio.NewReader(resp.Body) workerID, tokenData, perr := parseSelectionFromSSE(ctx, reader) if perr != nil { @@ -197,62 +220,31 @@ func (k *KVAwareScorer) callFrontEndForWorker( return workerID, tokenData, nil } -// Build the exact body we send to the FrontEnd, only from LLMRequest (no header merging). func buildFrontEndBodyFromLLMRequest(req *schedtypes.LLMRequest) map[string]any { feBody := make(map[string]any, 8) - - // We call /v1/chat/completions so must provide messages userText := "" if req != nil && strings.TrimSpace(req.Prompt) != "" { userText = req.Prompt } - feBody["messages"] = []map[string]any{ - {"role": "user", "content": userText}, - } - + feBody["messages"] = []map[string]any{{"role": "user", "content": userText}} if req != nil && strings.TrimSpace(req.TargetModel) != "" { feBody["model"] = req.TargetModel } - - // Force SSE so we can parse worker_instance_id feBody["stream"] = true - feBody["max_tokens"] = 1 feBody["temperature"] = 0.0 - - // Ask the Dynamo to include worker id - feBody["nvext"] = map[string]any{ - "annotations": []string{"query_instance_id"}, - } - return feBody } -// This function scans an SSE stream for a worker_instance_id and token_data. -// Expected pattern: -// -// event: worker_instance_id -// : "8303679623149182543" -// data: [DONE] - -// or with tokens: -// event: worker_instance_id\n: \"8228244551594056720\"\n\n -// event: token_data\n: \"[151644,872,198,151644,872,198,14990,151645,198,151645,198,151644,77091,198]\ -// "\n\ndata: [DONE]\n\n" -// Also supports JSON in data lines with either top-level worker_instance_id -// or annotations.worker_instance_id. func parseSelectionFromSSE(ctx context.Context, reader *bufio.Reader) (string, []int64, error) { logger := log.FromContext(ctx) - var ( eventName string - dataBuf strings.Builder // accumulates "data:" lines for one event - commentBuf strings.Builder // accumulates ":" comment lines + dataBuf strings.Builder + commentBuf strings.Builder gotWID string gotTD []int64 ) - - // collect the exact SSE bytes for debugging var rawBuf strings.Builder flushEvent := func() (bool, error) { @@ -261,7 +253,6 @@ func parseSelectionFromSSE(ctx context.Context, reader *bufio.Reader) (string, [ dataBuf.Reset() commentBuf.Reset() - // [DONE] ends the stream if data == "[DONE]" || comment == "[DONE]" { logger.V(logutil.DEFAULT).Info("SSE stream DONE") logger.V(logutil.DEFAULT).Info("SSE raw stream", "raw", rawBuf.String()) @@ -271,24 +262,19 @@ func parseSelectionFromSSE(ctx context.Context, reader *bufio.Reader) (string, [ return true, nil } - // Prefer the named event if eventName == "worker_instance_id" { candidate := data if candidate == "" { candidate = comment } if candidate != "" { - // Try JSON string var s string if json.Unmarshal([]byte(candidate), &s) == nil && s != "" { - logger.V(logutil.VERBOSE).Info("worker_instance_id extracted from named event", "worker_instance_id", s) gotWID = s return false, nil } - // Fallback: strip quotes clean := strings.Trim(candidate, "\"") if clean != "" && clean != "[DONE]" { - logger.V(logutil.DEFAULT).Info("worker_instance_id extracted (raw) from named event", "worker_instance_id", clean) gotWID = clean return false, nil } @@ -303,35 +289,30 @@ func parseSelectionFromSSE(ctx context.Context, reader *bufio.Reader) (string, [ if candidate != "" { if arr := toInt64SliceJSON(candidate); len(arr) > 0 { gotTD = arr - logger.V(logutil.DEFAULT).Info("token_data extracted from named event", "count", len(arr)) return false, nil } } } - // Generic JSON in data: + if data != "" { var msg map[string]any if json.Unmarshal([]byte(data), &msg) == nil { if wid, ok := msg["worker_instance_id"].(string); ok && wid != "" { - logger.V(logutil.DEFAULT).Info("worker_instance_id found in SSE payload root", "worker_instance_id", wid) gotWID = wid } if ann, ok := msg["annotations"].(map[string]any); ok { if wid, ok := ann["worker_instance_id"].(string); ok && wid != "" { - logger.V(logutil.DEFAULT).Info("worker_instance_id found in SSE annotations", "worker_instance_id", wid) gotWID = wid } } if td, ok := msg["token_data"]; ok { if arr := toInt64Slice(td); len(arr) > 0 { gotTD = arr - logger.V(logutil.DEFAULT).Info("token_data found in SSE payload root", "count", len(arr)) } } else if nv, ok := msg["nvext"].(map[string]any); ok { if td, ok := nv["token_data"]; ok { if arr := toInt64Slice(td); len(arr) > 0 { gotTD = arr - logger.V(logutil.DEFAULT).Info("token_data found in SSE nvext", "count", len(arr)) } } } @@ -342,7 +323,6 @@ func parseSelectionFromSSE(ctx context.Context, reader *bufio.Reader) (string, [ for { line, err := reader.ReadString('\n') - // capture the raw stream as-is for debugging rawBuf.WriteString(line) if err != nil { if err == io.EOF { @@ -363,18 +343,16 @@ func parseSelectionFromSSE(ctx context.Context, reader *bufio.Reader) (string, [ l := strings.TrimRight(line, "\r\n") if l == "" { - // End of current event. if done, _ := flushEvent(); done { if gotWID != "" && len(gotTD) == 0 { logger.V(logutil.DEFAULT).Info("SSE DONE: worker_instance_id present, token_data missing") } return gotWID, gotTD, nil } - eventName = "" // reset for next event + eventName = "" continue } - // Comment line if strings.HasPrefix(l, ":") { commentLine := strings.TrimSpace(l[1:]) if commentBuf.Len() > 0 { @@ -384,7 +362,6 @@ func parseSelectionFromSSE(ctx context.Context, reader *bufio.Reader) (string, [ continue } - // "field: value" if idx := strings.IndexByte(l, ':'); idx != -1 { field := l[:idx] val := strings.TrimSpace(l[idx+1:]) @@ -397,19 +374,16 @@ func parseSelectionFromSSE(ctx context.Context, reader *bufio.Reader) (string, [ } dataBuf.WriteString(val) default: - // ignore id, retry, etc. } } } } -// encodeTokenData turns []int64 into base64(JSON array) for a safe header value. func encodeTokenData(tokens []int64) string { b, _ := json.Marshal(tokens) return base64.StdEncoding.EncodeToString(b) } -// Accepts interface{} from a parsed JSON map func toInt64Slice(v any) []int64 { xs, ok := v.([]any) if !ok { @@ -431,18 +405,11 @@ func toInt64Slice(v any) []int64 { return out } -// Accepts raw JSON (string) for events like: -// event: worker_instance_id\n: \"8228244551594056720\"\n\n -// event: token_data\n: \"[151644,872,198,151644,872,198,14990,151645,198,151645,198,151644,77091,198]\ -// "\n\ndata: [DONE]\n\n" -// replaces the old toInt64SliceJSON func toInt64SliceJSON(s string) []int64 { - // case 1: direct JSON array var arr []int64 if err := json.Unmarshal([]byte(s), &arr); err == nil && len(arr) > 0 { return arr } - // case 2: s is a JSON string that itself contains a JSON array var inner string if err := json.Unmarshal([]byte(s), &inner); err == nil && inner != "" { var arr2 []int64 @@ -450,7 +417,6 @@ func toInt64SliceJSON(s string) []int64 { return arr2 } } - // case 3: strip quotes and try once more unquoted := strings.Trim(s, "\"") if unquoted != s { var arr3 []int64 @@ -461,85 +427,175 @@ func toInt64SliceJSON(s string) []int64 { return nil } -// Integration with the Dynamo Router - -// ---- One-time FFI init (adjust to your deployment) ---- +// --------------------------- FFI integration --------------------------- var ( ffiOnce sync.Once ffiErr error - // Configuration loaded from environment variables - ffiNamespace string - ffiComponent string - ffiWorkerID int64 - ffiKvBlkSize uint32 + ffiNamespace string + ffiComponent string + ffiModel string + ffiOverlapScoreWeight float64 + ffiRouterTemperature float64 + ffiKvBlockSize uint32 + ffiWorkerID int64 + + runtimeInitialized bool + + // Boxed pipeline handle (owned on the Rust side, opaque here) + pipeline *C.struct_WorkerSelectionPipeline + pipelineMutex sync.RWMutex ) -// Load configuration from environment variables func loadDynamoConfig() { - // Set defaults - ffiNamespace = getEnvOrDefault("DYNAMO_NAMESPACE", "default") - ffiComponent = getEnvOrDefault("DYNAMO_COMPONENT", "my-model") + ffiNamespace = getEnvOrDefault("DYNAMO_NAMESPACE", "vllm-agg") + ffiComponent = getEnvOrDefault("DYNAMO_COMPONENT", "backend") + ffiModel = getEnvOrDefault("DYNAMO_MODEL", "Qwen/Qwen3-0.6B") ffiWorkerID = getEnvInt64OrDefault("DYNAMO_WORKER_ID", 1) - ffiKvBlkSize = uint32(getEnvInt64OrDefault("DYNAMO_KV_BLOCK_SIZE", 32)) -} -func getEnvOrDefault(key, defaultValue string) string { - if value := os.Getenv(key); value != "" { - return value + ffiOverlapScoreWeight = getEnvFloatOrDefault("DYNAMO_OVERLAP_SCORE_WEIGHT", -1.0) + ffiRouterTemperature = getEnvFloatOrDefault("DYNAMO_ROUTER_TEMPERATURE", -1.0) + + kvBlockSizeStr := os.Getenv("DYNAMO_KV_BLOCK_SIZE") + if kvBlockSizeStr == "" { + panic("DYNAMO_KV_BLOCK_SIZE is required and must match the model card's kv_cache_block_size") + } + var tmp int64 + if n, err := fmt.Sscanf(kvBlockSizeStr, "%d", &tmp); err != nil || n != 1 { + panic(fmt.Sprintf("DYNAMO_KV_BLOCK_SIZE='%s' is not a valid integer", kvBlockSizeStr)) } - return defaultValue + ffiKvBlockSize = uint32(tmp) + if ffiKvBlockSize < 16 || ffiKvBlockSize > 8192 { + panic(fmt.Sprintf("DYNAMO_KV_BLOCK_SIZE=%d outside [16,8192]", ffiKvBlockSize)) + } + if (ffiKvBlockSize & (ffiKvBlockSize - 1)) != 0 { + panic(fmt.Sprintf("DYNAMO_KV_BLOCK_SIZE=%d must be a power of 2", ffiKvBlockSize)) + } + fmt.Printf("Dynamo KV Scorer: Loaded DYNAMO_KV_BLOCK_SIZE=%d\n", ffiKvBlockSize) } -func getEnvInt64OrDefault(key string, defaultValue int64) int64 { - if value := os.Getenv(key); value != "" { - if parsed, err := fmt.Sscanf(value, "%d", &defaultValue); err == nil && parsed == 1 { - return defaultValue +func getEnvOrDefault(key, def string) string { + if v := os.Getenv(key); v != "" { + return v + } + return def +} +func getEnvInt64OrDefault(key string, def int64) int64 { + if v := os.Getenv(key); v != "" { + var p int64 + if n, err := fmt.Sscanf(v, "%d", &p); err == nil && n == 1 { + return p } } - return defaultValue + return def +} +func getEnvFloatOrDefault(key string, def float64) float64 { + if v := os.Getenv(key); v != "" { + var p float64 + if n, err := fmt.Sscanf(v, "%f", &p); err == nil && n == 1 { + return p + } + } + return def +} +func getEnvBoolOrDefault(key string, def bool) bool { + if v := os.Getenv(key); v != "" { + switch strings.ToLower(v) { + case "true", "1", "yes", "on": + return true + case "false", "0", "no", "off": + return false + } + } + return def } -// Call this once in your plugin setup (or lazily before first route call) +// initFFI: initialize runtime and create a persistent boxed pipeline. func initFFI() error { ffiOnce.Do(func() { - // Load configuration from environment loadDynamoConfig() ns := C.CString(ffiNamespace) cm := C.CString(ffiComponent) + model := C.CString(ffiModel) defer C.free(unsafe.Pointer(ns)) defer C.free(unsafe.Pointer(cm)) + defer C.free(unsafe.Pointer(model)) - // Bring up runtime + publisher (safe even if you only query) - if C.dynamo_llm_init(ns, cm, C.longlong(ffiWorkerID), C.uint(ffiKvBlkSize)) != C.OK { + // 1) runtime + if rc := C.dynamo_llm_init(ns, cm, C.int64_t(ffiWorkerID), C.uint32_t(ffiKvBlockSize)); rc != C.DYNAMO_OK { ffiErr = fmt.Errorf("dynamo_llm_init failed") return } - // Initialize router (with defaults) - if C.dynamo_kv_router_init(ns, cm, C.uint(ffiKvBlkSize)) != C.OK { - ffiErr = fmt.Errorf("dynamo_kv_router_init failed") + runtimeInitialized = true + + // 2) create persistent pipeline + pipelineMutex.Lock() + defer pipelineMutex.Unlock() + + rc := C.dynamo_create_worker_selection_pipeline( + ns, + cm, + model, + C.bool(true), // use_kv_routing + C.double(-1.0), // busy_threshold (default) + C.double(ffiOverlapScoreWeight), // overlap_score_weight (neg = default) + C.double(ffiRouterTemperature), // router_temperature (neg = default) + C.bool(getEnvBoolOrDefault("DYNAMO_USE_KV_EVENTS", true)), + C.bool(getEnvBoolOrDefault("DYNAMO_ROUTER_REPLICA_SYNC", false)), + &pipeline, + ) + if rc != C.DYNAMO_OK { + ffiErr = fmt.Errorf("dynamo_create_worker_selection_pipeline failed") return } }) return ffiErr } -func defaultRouterOverride() *C.struct_DynamoRouterConfigOverride { - // No overrides: both has_* = false; numeric fields ignored. - // TODO: where can I read these from? - cfg := C.struct_DynamoRouterConfigOverride{ - has_overlap_score_weight: C.bool(false), - overlap_score_weight: C.double(0), - has_router_temperature: C.bool(false), - router_temperature: C.double(0), +// --------------------------- scoring --------------------------- + +func (k *KVAwareScorer) Score( + ctx context.Context, + cycle *schedtypes.CycleState, + req *schedtypes.LLMRequest, + pods []schedtypes.Pod, +) map[schedtypes.Pod]float64 { + logger := log.FromContext(ctx) + + workerID, tokenData, err := k.callDynamoRouter(ctx, req) + if err != nil { + logger.V(logutil.DEFAULT).Error(err, "Dynamo call failed; proceeding without worker id") + } else if workerID != "" { + logger.V(logutil.DEFAULT).Info( + "Dynamo router selected worker", + "workerID", workerID, + "tokenDataCount", len(tokenData), + "tokenData", tokenData, + ) + cycle.Write(StateKeyWorkerInstanceID, stateString(workerID)) + if req.Headers == nil { + req.Headers = map[string]string{} + } + req.Headers[WorkerIDHeader] = workerID + if len(tokenData) > 0 { + if req.Headers == nil { + req.Headers = map[string]string{} + } + req.Headers[TokenDataHeader] = encodeTokenData(tokenData) + } + } + + out := make(map[schedtypes.Pod]float64, len(pods)) + for _, p := range pods { + out[p] = 1.0 } - return &cfg + return out } -// Uses FFI to get (worker_instance_id, tokens) in-process. -// Use this as an alternative to making the call over HTTP with the k.callFrontEndForWorker +// --------------------------- router call (persistent only) --------------------------- + func (k *KVAwareScorer) callDynamoRouter( ctx context.Context, req *schedtypes.LLMRequest, @@ -550,39 +606,49 @@ func (k *KVAwareScorer) callDynamoRouter( logger.V(logutil.DEFAULT).Error(err, "FFI init failed") return "", nil, err } - - // contextID should be unique per request. If your framework has one, use it. - // Otherwise fall back to a deterministic hash or a random UUID. - contextID := req.RequestId - if contextID == "" { - contextID = "gaie-epp" // TODO: replace with your request-id/trace-id + if !runtimeInitialized { + return "", nil, fmt.Errorf("dynamo runtime not initialized") } - prompt := req.Prompt - cCtx := C.CString(contextID) - cPrm := C.CString(prompt) - defer C.free(unsafe.Pointer(cCtx)) - defer C.free(unsafe.Pointer(cPrm)) + pipelineMutex.RLock() + currentPipeline := pipeline + pipelineMutex.RUnlock() - var cWorker C.longlong - var cTokens *C.uint - var cCount C.ulong // uintptr_t in header; maps to C.ulong here + if currentPipeline == nil { + return "", nil, fmt.Errorf("dynamo worker selection pipeline not created") + } - cfg := defaultRouterOverride() - rc := C.dynamo_kv_router_query_instance_id_with_config( - cCtx, - cPrm, - cfg, - &cWorker, + // Build OpenAI-compatible JSON request + requestBody := buildOpenAIRequest(req) + requestJSON, err := json.Marshal(requestBody) + if err != nil { + logger.V(logutil.DEFAULT).Error(err, "Failed to marshal OpenAI request") + return "", nil, fmt.Errorf("marshal OpenAI request: %w", err) + } + cRequestJSON := C.CString(string(requestJSON)) + defer C.free(unsafe.Pointer(cRequestJSON)) + + // Output variables + var cWorkerID C.int64_t + var cTokens *C.uint32_t + var cTokenCount C.size_t + var cAnnotatedJSON *C.char + + // Call the worker selection pipeline + rc := C.dynamo_query_worker_selection_and_annotate( + currentPipeline, + cRequestJSON, + &cWorkerID, &cTokens, - &cCount, + &cTokenCount, + &cAnnotatedJSON, ) - if rc != C.OK { - return "", nil, fmt.Errorf("dynamo_kv_router_query_instance_id failed") + if rc != C.DYNAMO_OK { + return "", nil, fmt.Errorf("!!! dynamo_query_worker_selection_and_annotate failed") } - // Copy tokens into Go memory then free C memory immediately - count := int(uintptr(cCount)) + // Copy tokens into Go memory and free C memory + count := int(uintptr(cTokenCount)) var tokens64 []int64 if count > 0 && cTokens != nil { src := unsafe.Slice((*uint32)(unsafe.Pointer(cTokens)), count) @@ -590,9 +656,55 @@ func (k *KVAwareScorer) callDynamoRouter( for i := 0; i < count; i++ { tokens64[i] = int64(src[i]) } - C.dynamo_kv_router_free_tokens((*C.uint)(cTokens)) } + C.dynamo_free_worker_selection_result(cTokens, cTokenCount, cAnnotatedJSON) + + workerID := fmt.Sprintf("%d", int64(cWorkerID)) + logger.V(logutil.DEFAULT).Info("Worker selection completed", + "workerID", workerID, "tokenCount", count) - workerID := fmt.Sprintf("%d", int64(cWorker)) return workerID, tokens64, nil } + +func buildOpenAIRequest(req *schedtypes.LLMRequest) map[string]any { + requestBody := make(map[string]any) + userText := "default prompt" + if req != nil && strings.TrimSpace(req.Prompt) != "" { + userText = req.Prompt + } + requestBody["messages"] = []map[string]any{{"role": "user", "content": userText}} + if req != nil && strings.TrimSpace(req.TargetModel) != "" { + requestBody["model"] = req.TargetModel + } else { + requestBody["model"] = ffiModel + } + requestBody["max_tokens"] = 1 + requestBody["temperature"] = 0.0 + requestBody["stream"] = true + requestBody["nvext"] = map[string]any{ + "annotations": []string{"query_instance_id"}, + } + return requestBody +} + +// --------------------------- shutdown --------------------------- + +func cleanupDynamo() error { + pipelineMutex.Lock() + defer pipelineMutex.Unlock() + + if pipeline != nil { + if rc := C.dynamo_destroy_worker_selection_pipeline(pipeline); rc != C.DYNAMO_OK { + fmt.Printf("Warning: dynamo_destroy_worker_selection_pipeline failed\n") + } + pipeline = nil + } + + if runtimeInitialized { + if rc := C.dynamo_llm_shutdown(); rc != C.DYNAMO_OK { + return fmt.Errorf("dynamo_llm_shutdown failed") + } + runtimeInitialized = false + } + return nil +} From e701ef0ccefe5e902ed8bee98b2a7f3ea0ed59a9 Mon Sep 17 00:00:00 2001 From: Anna Tchernych Date: Mon, 29 Sep 2025 09:26:33 -0700 Subject: [PATCH 08/15] rm FrontEnd-related Signed-off-by: Anna Tchernych --- .../dynamo_kv_scorer/epp-config-dynamo.yaml | 3 - .../plugins/dynamo_kv_scorer/plugin.go | 300 +----------------- 2 files changed, 9 insertions(+), 294 deletions(-) diff --git a/pkg/epp/scheduling/plugins/dynamo_kv_scorer/epp-config-dynamo.yaml b/pkg/epp/scheduling/plugins/dynamo_kv_scorer/epp-config-dynamo.yaml index 2d92be03b7..b689c00171 100644 --- a/pkg/epp/scheduling/plugins/dynamo_kv_scorer/epp-config-dynamo.yaml +++ b/pkg/epp/scheduling/plugins/dynamo_kv_scorer/epp-config-dynamo.yaml @@ -13,9 +13,6 @@ plugins: parameters: {} - name: dyn-kv type: kv-aware-scorer - parameters: - frontendURL: http://127.0.0.1:8000/v1/chat/completions - timeoutMS: 10000 schedulingProfiles: - name: default plugins: diff --git a/pkg/epp/scheduling/plugins/dynamo_kv_scorer/plugin.go b/pkg/epp/scheduling/plugins/dynamo_kv_scorer/plugin.go index c415fa2460..aced5fcc11 100644 --- a/pkg/epp/scheduling/plugins/dynamo_kv_scorer/plugin.go +++ b/pkg/epp/scheduling/plugins/dynamo_kv_scorer/plugin.go @@ -66,18 +66,13 @@ dynamo_llm_result_t dynamo_free_worker_selection_result(uint32_t *token_ids, import "C" import ( - "bufio" - "bytes" "context" "encoding/base64" "encoding/json" "fmt" - "io" - "net/http" "os" "strings" "sync" - "time" "unsafe" log "sigs.k8s.io/controller-runtime/pkg/log" @@ -100,19 +95,14 @@ const ( var warmupOnce sync.Once var warmupErr error +type stateString string type params struct { - FrontendURL string `json:"frontendURL"` - TimeoutMS int `json:"timeoutMS"` } -type stateString string - func (s stateString) Clone() schedtypes.StateData { return s } type KVAwareScorer struct { typedName plugins.TypedName - feURL string - feTimeout time.Duration } var _ plugins.Plugin = (*KVAwareScorer)(nil) @@ -121,31 +111,16 @@ var _ framework.Scorer = (*KVAwareScorer)(nil) func NewKVAwareScorer() *KVAwareScorer { return &KVAwareScorer{ typedName: plugins.TypedName{Type: KVAwareScorerType, Name: PluginName}, - feURL: "http://127.0.0.1:8000/v1/chat/completions", - feTimeout: 10 * time.Second, } } func (k *KVAwareScorer) WithName(name string) *KVAwareScorer { k.typedName.Name = name; return k } -func (k *KVAwareScorer) WithFrontend(url string, timeout time.Duration) *KVAwareScorer { - if url != "" { - k.feURL = url - } - if timeout > 0 { - k.feTimeout = timeout - } - return k -} func KVAwareScorerFactory(name string, raw json.RawMessage, _ plugins.Handle) (plugins.Plugin, error) { p := params{} _ = json.Unmarshal(raw, &p) - timeout := time.Duration(p.TimeoutMS) * time.Millisecond - if timeout <= 0 { - timeout = 10 * time.Second - } - s := NewKVAwareScorer().WithName(name).WithFrontend(p.FrontendURL, timeout) + s := NewKVAwareScorer().WithName(name) // one-time FFI init (runtime + persistent pipeline) warmupOnce.Do(func() { @@ -165,268 +140,6 @@ func KVAwareScorerFactory(name string, raw json.RawMessage, _ plugins.Handle) (p func (k *KVAwareScorer) TypedName() plugins.TypedName { return k.typedName } -// --------------------------- SSE helpers (unchanged) --------------------------- - -func (k *KVAwareScorer) callFrontEndForWorker( - ctx context.Context, - req *schedtypes.LLMRequest, -) (string, []int64, error) { - logger := log.FromContext(ctx) - - feBody := buildFrontEndBodyFromLLMRequest(req) - payload, err := json.Marshal(feBody) - if err != nil { - logger.V(logutil.DEFAULT).Error(err, "Dynamo FrontEnd marshal failed") - return "", nil, fmt.Errorf("marshal FrontEnd body: %w", err) - } - - reqCtx, cancel := context.WithTimeout(ctx, k.feTimeout) - defer cancel() - - httpReq, err := http.NewRequestWithContext(reqCtx, http.MethodPost, k.feURL, bytes.NewReader(payload)) - if err != nil { - logger.V(logutil.DEFAULT).Error(err, "Dynamo FrontEnd request build failed") - return "", nil, fmt.Errorf("build FrontEnd request: %w", err) - } - httpReq.Header.Set("Content-Type", "application/json") - httpReq.Header.Set("Accept", "text/event-stream") - - client := &http.Client{Timeout: 0} - resp, err := client.Do(httpReq) - if err != nil { - logger.V(logutil.DEFAULT).Error(err, "Dynamo FrontEnd POST failed") - return "", nil, fmt.Errorf("FrontEnd POST failed: %w", err) - } - defer resp.Body.Close() - - if resp.StatusCode < 200 || resp.StatusCode >= 300 { - errBody, _ := io.ReadAll(resp.Body) - logger.V(logutil.DEFAULT).Error(nil, "Dynamo FrontEnd non-2xx response", - "status_code", resp.StatusCode, "response_body", string(errBody)) - return "", nil, fmt.Errorf("Dynamo FrontEnd error: %d body=%s", resp.StatusCode, string(errBody)) - } - - ct := strings.ToLower(resp.Header.Get("Content-Type")) - if !strings.Contains(ct, "text/event-stream") { - logger.V(logutil.DEFAULT).Error(nil, "Unexpected non-SSE response") - return "", nil, fmt.Errorf("unexpected non-SSE response (Content-Type=%q)", resp.Header.Get("Content-Type")) - } - - reader := bufio.NewReader(resp.Body) - workerID, tokenData, perr := parseSelectionFromSSE(ctx, reader) - if perr != nil { - return "", nil, perr - } - return workerID, tokenData, nil -} - -func buildFrontEndBodyFromLLMRequest(req *schedtypes.LLMRequest) map[string]any { - feBody := make(map[string]any, 8) - userText := "" - if req != nil && strings.TrimSpace(req.Prompt) != "" { - userText = req.Prompt - } - feBody["messages"] = []map[string]any{{"role": "user", "content": userText}} - if req != nil && strings.TrimSpace(req.TargetModel) != "" { - feBody["model"] = req.TargetModel - } - feBody["stream"] = true - feBody["max_tokens"] = 1 - feBody["temperature"] = 0.0 - return feBody -} - -func parseSelectionFromSSE(ctx context.Context, reader *bufio.Reader) (string, []int64, error) { - logger := log.FromContext(ctx) - var ( - eventName string - dataBuf strings.Builder - commentBuf strings.Builder - gotWID string - gotTD []int64 - ) - var rawBuf strings.Builder - - flushEvent := func() (bool, error) { - data := strings.TrimSpace(dataBuf.String()) - comment := strings.TrimSpace(commentBuf.String()) - dataBuf.Reset() - commentBuf.Reset() - - if data == "[DONE]" || comment == "[DONE]" { - logger.V(logutil.DEFAULT).Info("SSE stream DONE") - logger.V(logutil.DEFAULT).Info("SSE raw stream", "raw", rawBuf.String()) - if gotWID != "" && len(gotTD) == 0 { - logger.V(logutil.DEFAULT).Info("SSE DONE: worker_instance_id present, token_data missing") - } - return true, nil - } - - if eventName == "worker_instance_id" { - candidate := data - if candidate == "" { - candidate = comment - } - if candidate != "" { - var s string - if json.Unmarshal([]byte(candidate), &s) == nil && s != "" { - gotWID = s - return false, nil - } - clean := strings.Trim(candidate, "\"") - if clean != "" && clean != "[DONE]" { - gotWID = clean - return false, nil - } - } - } - - if eventName == "token_data" { - candidate := data - if candidate == "" { - candidate = comment - } - if candidate != "" { - if arr := toInt64SliceJSON(candidate); len(arr) > 0 { - gotTD = arr - return false, nil - } - } - } - - if data != "" { - var msg map[string]any - if json.Unmarshal([]byte(data), &msg) == nil { - if wid, ok := msg["worker_instance_id"].(string); ok && wid != "" { - gotWID = wid - } - if ann, ok := msg["annotations"].(map[string]any); ok { - if wid, ok := ann["worker_instance_id"].(string); ok && wid != "" { - gotWID = wid - } - } - if td, ok := msg["token_data"]; ok { - if arr := toInt64Slice(td); len(arr) > 0 { - gotTD = arr - } - } else if nv, ok := msg["nvext"].(map[string]any); ok { - if td, ok := nv["token_data"]; ok { - if arr := toInt64Slice(td); len(arr) > 0 { - gotTD = arr - } - } - } - } - } - return false, nil - } - - for { - line, err := reader.ReadString('\n') - rawBuf.WriteString(line) - if err != nil { - if err == io.EOF { - _, _ = flushEvent() - logger.V(logutil.DEFAULT).Info("SSE raw stream (EOF)", "raw", rawBuf.String()) - if gotWID != "" && len(gotTD) == 0 { - logger.V(logutil.DEFAULT).Info("EOF: worker_instance_id present, token_data missing") - } - if gotWID != "" || len(gotTD) > 0 { - return gotWID, gotTD, nil - } - logger.V(logutil.DEFAULT).Error(nil, "EOF before selection fields present") - return "", nil, fmt.Errorf("selection not found in SSE stream (EOF)") - } - logger.V(logutil.DEFAULT).Error(err, "SSE read error") - return "", nil, fmt.Errorf("sse read error: %w", err) - } - - l := strings.TrimRight(line, "\r\n") - if l == "" { - if done, _ := flushEvent(); done { - if gotWID != "" && len(gotTD) == 0 { - logger.V(logutil.DEFAULT).Info("SSE DONE: worker_instance_id present, token_data missing") - } - return gotWID, gotTD, nil - } - eventName = "" - continue - } - - if strings.HasPrefix(l, ":") { - commentLine := strings.TrimSpace(l[1:]) - if commentBuf.Len() > 0 { - commentBuf.WriteByte('\n') - } - commentBuf.WriteString(commentLine) - continue - } - - if idx := strings.IndexByte(l, ':'); idx != -1 { - field := l[:idx] - val := strings.TrimSpace(l[idx+1:]) - switch field { - case "event": - eventName = val - case "data": - if dataBuf.Len() > 0 { - dataBuf.WriteByte('\n') - } - dataBuf.WriteString(val) - default: - } - } - } -} - -func encodeTokenData(tokens []int64) string { - b, _ := json.Marshal(tokens) - return base64.StdEncoding.EncodeToString(b) -} - -func toInt64Slice(v any) []int64 { - xs, ok := v.([]any) - if !ok { - return nil - } - out := make([]int64, 0, len(xs)) - for _, it := range xs { - switch n := it.(type) { - case float64: - out = append(out, int64(n)) - case int64: - out = append(out, n) - case json.Number: - if i, err := n.Int64(); err == nil { - out = append(out, i) - } - } - } - return out -} - -func toInt64SliceJSON(s string) []int64 { - var arr []int64 - if err := json.Unmarshal([]byte(s), &arr); err == nil && len(arr) > 0 { - return arr - } - var inner string - if err := json.Unmarshal([]byte(s), &inner); err == nil && inner != "" { - var arr2 []int64 - if err := json.Unmarshal([]byte(inner), &arr2); err == nil && len(arr2) > 0 { - return arr2 - } - } - unquoted := strings.Trim(s, "\"") - if unquoted != s { - var arr3 []int64 - if err := json.Unmarshal([]byte(unquoted), &arr3); err == nil && len(arr3) > 0 { - return arr3 - } - } - return nil -} - // --------------------------- FFI integration --------------------------- var ( @@ -523,14 +236,14 @@ func initFFI() error { defer C.free(unsafe.Pointer(cm)) defer C.free(unsafe.Pointer(model)) - // 1) runtime + // Init Dynamo runtime if rc := C.dynamo_llm_init(ns, cm, C.int64_t(ffiWorkerID), C.uint32_t(ffiKvBlockSize)); rc != C.DYNAMO_OK { ffiErr = fmt.Errorf("dynamo_llm_init failed") return } runtimeInitialized = true - // 2) create persistent pipeline + // Create persistent pipeline pipelineMutex.Lock() defer pipelineMutex.Unlock() @@ -556,6 +269,11 @@ func initFFI() error { // --------------------------- scoring --------------------------- +func encodeTokenData(tokens []int64) string { + b, _ := json.Marshal(tokens) + return base64.StdEncoding.EncodeToString(b) +} + func (k *KVAwareScorer) Score( ctx context.Context, cycle *schedtypes.CycleState, From a5b1af3b4b3e9ec86874bb2698e1736585dac38e Mon Sep 17 00:00:00 2001 From: Anna Tchernych Date: Mon, 29 Sep 2025 14:22:45 -0700 Subject: [PATCH 09/15] cleanup Signed-off-by: Anna Tchernych --- DYNAMO_KV_SCORER_README.md | 180 ------------------ .../plugins/dynamo_kv_scorer/plugin.go | 4 +- 2 files changed, 2 insertions(+), 182 deletions(-) delete mode 100644 DYNAMO_KV_SCORER_README.md diff --git a/DYNAMO_KV_SCORER_README.md b/DYNAMO_KV_SCORER_README.md deleted file mode 100644 index 596fc638fc..0000000000 --- a/DYNAMO_KV_SCORER_README.md +++ /dev/null @@ -1,180 +0,0 @@ -# Dynamo KV Scorer Plugin - Configuration Guide - -## 🚨 BREAKING CHANGE: Mandatory Environment Variables - -As of this version, `DYNAMO_KV_BLOCK_SIZE` is **MANDATORY** to prevent silent KV routing failures. - -## 📋 Required Configuration - -### **DYNAMO_KV_BLOCK_SIZE** (MANDATORY) - -**This environment variable is now REQUIRED and must exactly match your model card's `kv_cache_block_size`.** - -```bash -# Example: If your model card specifies kv_cache_block_size: 512 -export DYNAMO_KV_BLOCK_SIZE=512 -``` - -#### Validation Rules - -- ✅ **Must be set** (no default value) -- ✅ **Must be integer** between 16 and 8192 -- ✅ **Must be power of 2** (16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192) -- ✅ **Must exactly match** model card's `kv_cache_block_size` - -#### Common Values - -- **256** - Small models, memory-constrained environments -- **512** - Most common, good balance of performance and memory -- **1024** - Large models, high-performance deployments -- **2048** - Very large models with high memory availability - -## 🔍 How to Find Your Model's Block Size - -### Method 1: Check Model Card - -```bash -# Look in your model deployment configuration -cat model-deployment.yaml | grep kv_cache_block_size -# Output: kv_cache_block_size: 512 -``` - -### Method 2: Check Running Workers - -```bash -# Query Dynamo worker configuration -kubectl get configmap dynamo-worker-config -o yaml | grep kv_cache_block_size -``` - -### Method 3: Check Model Documentation - -Most model cards specify the recommended block size in their configuration files or documentation. - -## 🚀 Example Deployment - -### Kubernetes Deployment - -```yaml -apiVersion: apps/v1 -kind: Deployment -metadata: - name: epp-with-dynamo-kv-scorer -spec: - template: - spec: - containers: - - name: epp - env: - - name: DYNAMO_KV_BLOCK_SIZE - value: "512" # MUST match your model card! - - name: DYNAMO_NAMESPACE - value: "production" - - name: DYNAMO_COMPONENT - value: "backend" - - name: DYNAMO_MODEL - value: "Qwen/Qwen3-0.6B" -``` - -### Docker Compose - -```yaml -services: - epp: - environment: - - DYNAMO_KV_BLOCK_SIZE=512 # MUST match your model card! - - DYNAMO_NAMESPACE=production - - DYNAMO_COMPONENT=backend - - DYNAMO_MODEL=Qwen/Qwen3-0.6B -``` - -### Shell Script - -```bash -#!/bin/bash -# deployment.sh - -# Get block size from your model configuration -MODEL_KV_BLOCK_SIZE=$(kubectl get configmap model-config -o jsonpath='{.data.kv_cache_block_size}') - -# Deploy EPP with matching block size -export DYNAMO_KV_BLOCK_SIZE=${MODEL_KV_BLOCK_SIZE} -export DYNAMO_NAMESPACE=production -export DYNAMO_COMPONENT=backend -export DYNAMO_MODEL=Qwen/Qwen3-0.6B - -./start-epp -``` - -## ⚠️ Error Messages - -### Missing Environment Variable - -``` -DYNAMO_KV_BLOCK_SIZE environment variable is required but not set. -This must match your model card's kv_cache_block_size exactly. -Common values: 256, 512, 1024 -``` - -### Invalid Value - -``` -DYNAMO_KV_BLOCK_SIZE='abc' is not a valid integer -``` - -### Out of Range - -``` -DYNAMO_KV_BLOCK_SIZE=99999 is outside reasonable range [16, 8192] -``` - -### Not Power of 2 - -``` -DYNAMO_KV_BLOCK_SIZE=100 should be a power of 2 (16, 32, 64, 128, 256, 512, 1024, etc.) -``` - -## 🔧 Migration Guide - -### Before (with silent failures) - -```bash -# This would silently fail if model card had different block size -export DYNAMO_KV_BLOCK_SIZE=512 # might not match model! -``` - -### After (explicit configuration) - -```bash -# Step 1: Check your model's actual block size -MODEL_BLOCK_SIZE=$(get-model-block-size) # Your method here - -# Step 2: Set environment variable to match exactly -export DYNAMO_KV_BLOCK_SIZE=${MODEL_BLOCK_SIZE} - -# Step 3: EPP will fail fast if there's a mismatch -``` - -## 📊 Impact - -- ✅ **Prevents silent failures** - KV routing either works correctly or fails fast -- ✅ **Explicit configuration** - Forces users to know their model's requirements -- ✅ **Better debugging** - Clear error messages for misconfigurations -- ✅ **Performance assurance** - Guarantees KV-aware routing works when configured - -## 🆘 Troubleshooting - -### Q: How do I know if my block size is correct? - -A: The plugin will log on startup: `Dynamo KV Scorer: Loaded mandatory DYNAMO_KV_BLOCK_SIZE=512` - -### Q: What happens if I set the wrong block size? - -A: KV-aware routing will fail completely, falling back to round-robin scheduling. This will degrade performance but not break functionality. - -### Q: Can I still use defaults? - -A: No. This is now mandatory to prevent silent misconfigurations that are hard to debug. - -### Q: Why is this mandatory now? - -A: Previously, wrong block sizes caused silent KV routing failures that were very difficult to diagnose. Making it mandatory forces explicit, correct configuration. diff --git a/pkg/epp/scheduling/plugins/dynamo_kv_scorer/plugin.go b/pkg/epp/scheduling/plugins/dynamo_kv_scorer/plugin.go index aced5fcc11..fe0c7b9954 100644 --- a/pkg/epp/scheduling/plugins/dynamo_kv_scorer/plugin.go +++ b/pkg/epp/scheduling/plugins/dynamo_kv_scorer/plugin.go @@ -132,7 +132,7 @@ func KVAwareScorerFactory(name string, raw json.RawMessage, _ plugins.Handle) (p warmupErr = initFFI() }) if warmupErr != nil { - return nil, fmt.Errorf("!!! Dynamo FFI init for the Router failed: %w", warmupErr) + return nil, fmt.Errorf("Dynamo FFI init for the Router failed: %w", warmupErr) } return s, nil @@ -362,7 +362,7 @@ func (k *KVAwareScorer) callDynamoRouter( &cAnnotatedJSON, ) if rc != C.DYNAMO_OK { - return "", nil, fmt.Errorf("!!! dynamo_query_worker_selection_and_annotate failed") + return "", nil, fmt.Errorf("dynamo_query_worker_selection_and_annotate failed") } // Copy tokens into Go memory and free C memory From 7a674c8df7e60ea71e4c7245ca04ffea2cc31768 Mon Sep 17 00:00:00 2001 From: Anna Tchernych Date: Tue, 30 Sep 2025 10:44:15 -0700 Subject: [PATCH 10/15] add env vars Signed-off-by: Anna Tchernych --- pkg/epp/scheduling/plugins/dynamo_kv_scorer/plugin.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/epp/scheduling/plugins/dynamo_kv_scorer/plugin.go b/pkg/epp/scheduling/plugins/dynamo_kv_scorer/plugin.go index fe0c7b9954..1f6a41f1e7 100644 --- a/pkg/epp/scheduling/plugins/dynamo_kv_scorer/plugin.go +++ b/pkg/epp/scheduling/plugins/dynamo_kv_scorer/plugin.go @@ -251,10 +251,10 @@ func initFFI() error { ns, cm, model, - C.bool(true), // use_kv_routing - C.double(-1.0), // busy_threshold (default) - C.double(ffiOverlapScoreWeight), // overlap_score_weight (neg = default) - C.double(ffiRouterTemperature), // router_temperature (neg = default) + C.bool(getEnvBoolOrDefault("DYNAMO_USE_KV_ROUTING", true)), + C.double(getEnvFloatOrDefault("DYNAMO_BUSY_THRESHOLD", -1.0)), + C.double(ffiOverlapScoreWeight), + C.double(ffiRouterTemperature), C.bool(getEnvBoolOrDefault("DYNAMO_USE_KV_EVENTS", true)), C.bool(getEnvBoolOrDefault("DYNAMO_ROUTER_REPLICA_SYNC", false)), &pipeline, From f5ad6affbde915053ad911dfb6a2b7d9a9a575a9 Mon Sep 17 00:00:00 2001 From: Anna Tchernych Date: Fri, 10 Oct 2025 11:59:57 -0700 Subject: [PATCH 11/15] rm Dynamo.epp image build Signed-off-by: Anna Tchernych --- Dockerfile.dynamo | 66 ----------------------------------------------- 1 file changed, 66 deletions(-) delete mode 100644 Dockerfile.dynamo diff --git a/Dockerfile.dynamo b/Dockerfile.dynamo deleted file mode 100644 index 3f0e0a07c5..0000000000 --- a/Dockerfile.dynamo +++ /dev/null @@ -1,66 +0,0 @@ -# Dockerfile.dynamo - Custom Dockerfile for Dynamo FFI plugin -ARG BUILDER_IMAGE=golang:1.24 -ARG BASE_IMAGE=ubuntu:22.04 - -############################ -# Builder -############################ -FROM ${BUILDER_IMAGE} AS builder - -ENV CGO_ENABLED=1 -ENV GOOS=linux -ENV GOARCH=amd64 -# be explicit; helps cgo when linking libstdc++ -ENV CC=gcc -ENV CXX=g++ - -# C/C++ toolchain for cgo, and libstdc++ for link-time -RUN apt-get update && apt-get install -y --no-install-recommends \ - build-essential \ - gcc g++ \ - libc6-dev \ - ca-certificates \ - && rm -rf /var/lib/apt/lists/* - -ARG COMMIT_SHA=unknown -ARG BUILD_REF - -WORKDIR /src - -# deps first (cache) -COPY go.mod go.sum ./ -RUN go mod download - -# source -COPY cmd/epp ./cmd/epp -COPY pkg/epp ./pkg/epp -COPY internal ./internal -COPY api ./api - -# sanity (optional) -RUN ls -la pkg/epp/scheduling/plugins/dynamo_kv_scorer/include/ || echo "Headers not found" -RUN ls -la pkg/epp/scheduling/plugins/dynamo_kv_scorer/lib/ || echo "Library not found" - -# build -WORKDIR /src/cmd/epp -RUN go build \ - -ldflags="-X sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics.CommitSHA=${COMMIT_SHA} -X sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics.BuildRef=${BUILD_REF}" \ - -o /epp - -############################ -# Runtime -############################ -FROM ${BASE_IMAGE} AS runtime - -# Minimal runtime deps; include libstdc++ runtime for -lstdc++ -RUN apt-get update && apt-get install -y --no-install-recommends \ - ca-certificates \ - libstdc++6 \ - && rm -rf /var/lib/apt/lists/* \ - && groupadd -r nonroot && useradd -r -g nonroot nonroot - -WORKDIR / -COPY --from=builder /epp /epp - -USER nonroot:nonroot -ENTRYPOINT ["/epp"] From e34979c99fc9a7323ec6c6b5b8ec46de2949ccf2 Mon Sep 17 00:00:00 2001 From: Anna Tchernych Date: Wed, 15 Oct 2025 12:50:18 -0700 Subject: [PATCH 12/15] router_sync=True by default Signed-off-by: Anna Tchernych --- pkg/epp/scheduling/plugins/dynamo_kv_scorer/plugin.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/epp/scheduling/plugins/dynamo_kv_scorer/plugin.go b/pkg/epp/scheduling/plugins/dynamo_kv_scorer/plugin.go index 1f6a41f1e7..83a4ace264 100644 --- a/pkg/epp/scheduling/plugins/dynamo_kv_scorer/plugin.go +++ b/pkg/epp/scheduling/plugins/dynamo_kv_scorer/plugin.go @@ -256,7 +256,7 @@ func initFFI() error { C.double(ffiOverlapScoreWeight), C.double(ffiRouterTemperature), C.bool(getEnvBoolOrDefault("DYNAMO_USE_KV_EVENTS", true)), - C.bool(getEnvBoolOrDefault("DYNAMO_ROUTER_REPLICA_SYNC", false)), + C.bool(getEnvBoolOrDefault("DYNAMO_ROUTER_REPLICA_SYNC", true)), &pipeline, ) if rc != C.DYNAMO_OK { From 689fed38e74c35a1583a5754bfdea643e3770621 Mon Sep 17 00:00:00 2001 From: Anna Tchernych Date: Wed, 29 Oct 2025 13:31:03 -0700 Subject: [PATCH 13/15] add print to pkg/epp/requestcontrol/director.go Signed-off-by: Anna Tchernych --- pkg/epp/requestcontrol/director.go | 73 +++++++++++++++++++++++++++--- 1 file changed, 66 insertions(+), 7 deletions(-) diff --git a/pkg/epp/requestcontrol/director.go b/pkg/epp/requestcontrol/director.go index 670d9222af..7e39bd40e1 100644 --- a/pkg/epp/requestcontrol/director.go +++ b/pkg/epp/requestcontrol/director.go @@ -193,41 +193,100 @@ func (d *Director) admitRequest(ctx context.Context, requestCriticality v1alpha2 func (d *Director) getCandidatePodsForScheduling(ctx context.Context, requestMetadata map[string]any) []schedulingtypes.Pod { loggerTrace := log.FromContext(ctx).V(logutil.TRACE) + // DEBUG: Print all pods in datastore at the start + allPodsInDatastore := d.datastore.PodGetAll() + fmt.Printf("\n========== DEBUG: getCandidatePodsForScheduling START ==========\n") + fmt.Printf("Total pods in datastore: %d\n", len(allPodsInDatastore)) + for i, podMetrics := range allPodsInDatastore { + pod := podMetrics.GetPod() + metrics := podMetrics.GetMetrics() + fmt.Printf("Pod #%d:\n", i+1) + fmt.Printf(" Name: %s\n", pod.NamespacedName.Name) + fmt.Printf(" Namespace: %s\n", pod.NamespacedName.Namespace) + fmt.Printf(" Address (IP): %s\n", pod.Address) + fmt.Printf(" Metrics:\n") + fmt.Printf(" WaitingQueueSize: %d\n", metrics.WaitingQueueSize) + fmt.Printf(" RunningQueueSize: %d\n", metrics.RunningQueueSize) + fmt.Printf(" KVCacheUsagePercent: %.2f%%\n", metrics.KVCacheUsagePercent*100) + fmt.Printf(" KvCacheMaxTokenCapacity: %d\n", metrics.KvCacheMaxTokenCapacity) + fmt.Printf(" ActiveModels: %v\n", metrics.ActiveModels) + fmt.Printf(" WaitingModels: %v\n", metrics.WaitingModels) + fmt.Printf(" MaxActiveModels: %d\n", metrics.MaxActiveModels) + fmt.Printf(" UpdateTime: %v\n", metrics.UpdateTime) + } + fmt.Printf("========== END: All Pods in Datastore ==========\n\n") + subsetMap, found := requestMetadata[subsetHintNamespace].(map[string]any) + fmt.Printf("DEBUG: Checking for subset metadata in namespace '%s'\n", subsetHintNamespace) + fmt.Printf("DEBUG: Subset metadata found: %v\n", found) + if !found { - return d.toSchedulerPodMetrics(d.datastore.PodGetAll()) + fmt.Printf("DEBUG: No subset metadata found, returning ALL pods from datastore\n") + fmt.Printf("DEBUG: About to return %d pods (all pods)\n", len(allPodsInDatastore)) + result := d.toSchedulerPodMetrics(allPodsInDatastore) + fmt.Printf("DEBUG: Converted to scheduler pod metrics, returning %d pods\n", len(result)) + fmt.Printf("========== DEBUG: getCandidatePodsForScheduling END (no subset) ==========\n\n") + return result } // Check if endpoint key is present in the subset map and ensure there is at least one value endpointSubsetList, found := subsetMap[subsetHintKey].([]any) + fmt.Printf("DEBUG: Checking for endpoint subset key '%s' in subset map\n", subsetHintKey) + fmt.Printf("DEBUG: Endpoint subset key found: %v\n", found) + if !found { - return d.toSchedulerPodMetrics(d.datastore.PodGetAll()) + fmt.Printf("DEBUG: Endpoint subset key not found, returning ALL pods from datastore\n") + fmt.Printf("DEBUG: About to return %d pods (all pods)\n", len(allPodsInDatastore)) + result := d.toSchedulerPodMetrics(allPodsInDatastore) + fmt.Printf("DEBUG: Converted to scheduler pod metrics, returning %d pods\n", len(result)) + fmt.Printf("========== DEBUG: getCandidatePodsForScheduling END (no subset key) ==========\n\n") + return result } else if len(endpointSubsetList) == 0 { loggerTrace.Info("found empty subset filter in request metadata, filtering all pods") + fmt.Printf("DEBUG: Empty endpoint subset list, returning 0 pods\n") + fmt.Printf("========== DEBUG: getCandidatePodsForScheduling END (empty subset) ==========\n\n") return []schedulingtypes.Pod{} } // Create a map of endpoint addresses for easy lookup endpoints := make(map[string]bool) - for _, endpoint := range endpointSubsetList { + fmt.Printf("DEBUG: Endpoint subset list contains %d entries:\n", len(endpointSubsetList)) + for i, endpoint := range endpointSubsetList { // Extract address from endpoint // The endpoint is formatted as "
:" (ex. "10.0.1.0:8080") epStr := strings.Split(endpoint.(string), ":")[0] endpoints[epStr] = true + fmt.Printf(" Endpoint #%d: %s -> IP filter: %s\n", i+1, endpoint, epStr) } podTotalCount := 0 + fmt.Printf("DEBUG: Starting pod filtering against %d endpoint IPs\n", len(endpoints)) podFitleredList := d.datastore.PodList(func(pm backendmetrics.PodMetrics) bool { podTotalCount++ - if _, found := endpoints[pm.GetPod().Address]; found { - return true + podIP := pm.GetPod().Address + matched := false + if _, found := endpoints[podIP]; found { + matched = true } - return false + fmt.Printf(" Checking pod #%d (Name: %s, IP: %s) -> Match: %v\n", + podTotalCount, pm.GetPod().NamespacedName.Name, podIP, matched) + return matched }) + fmt.Printf("DEBUG: Filtering complete. Checked %d pods, %d matched\n", podTotalCount, len(podFitleredList)) loggerTrace.Info("filtered candidate pods by subset filtering", "podTotalCount", podTotalCount, "filteredCount", len(podFitleredList)) - return d.toSchedulerPodMetrics(podFitleredList) + fmt.Printf("DEBUG: Filtered pods details:\n") + for i, podMetrics := range podFitleredList { + pod := podMetrics.GetPod() + fmt.Printf(" Filtered Pod #%d: Name=%s, IP=%s\n", i+1, pod.NamespacedName.Name, pod.Address) + } + + result := d.toSchedulerPodMetrics(podFitleredList) + fmt.Printf("DEBUG: Converted to scheduler pod metrics, returning %d pods\n", len(result)) + fmt.Printf("========== DEBUG: getCandidatePodsForScheduling END (filtered) ==========\n\n") + + return result } // prepareRequest populates the RequestContext and calls the registered PreRequest plugins From b1e4d91d6724b7faa4931621dd7fecd517a41cea Mon Sep 17 00:00:00 2001 From: Anna Tchernych Date: Fri, 14 Nov 2025 17:18:02 -0800 Subject: [PATCH 14/15] upd ubuntu version Signed-off-by: Anna Tchernych --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index 4679ce2e0f..d3f9ec74ac 100644 --- a/Makefile +++ b/Makefile @@ -192,7 +192,7 @@ dynamo-image-local-load: dynamo-image-local-build dynamo-image-build: ## Build the Dynamo EPP image using Docker Buildx with CGO support. $(IMAGE_BUILD_CMD) -f Dockerfile.dynamo -t $(IMAGE_TAG) \ --platform=$(PLATFORMS) \ - --build-arg BASE_IMAGE=ubuntu:22.04 \ + --build-arg BASE_IMAGE=ubuntu:24.04 \ --build-arg BUILDER_IMAGE=$(BUILDER_IMAGE) \ --build-arg COMMIT_SHA=${GIT_COMMIT_SHA} \ --build-arg BUILD_REF=${BUILD_REF} \ From f2fa5789068af42e651c1c5e2969c71029d9014e Mon Sep 17 00:00:00 2001 From: Anna Tchernych Date: Fri, 14 Nov 2025 17:26:01 -0800 Subject: [PATCH 15/15] restore director.go Signed-off-by: Anna Tchernych --- pkg/epp/requestcontrol/director.go | 73 +++--------------------------- 1 file changed, 7 insertions(+), 66 deletions(-) diff --git a/pkg/epp/requestcontrol/director.go b/pkg/epp/requestcontrol/director.go index 7e39bd40e1..670d9222af 100644 --- a/pkg/epp/requestcontrol/director.go +++ b/pkg/epp/requestcontrol/director.go @@ -193,100 +193,41 @@ func (d *Director) admitRequest(ctx context.Context, requestCriticality v1alpha2 func (d *Director) getCandidatePodsForScheduling(ctx context.Context, requestMetadata map[string]any) []schedulingtypes.Pod { loggerTrace := log.FromContext(ctx).V(logutil.TRACE) - // DEBUG: Print all pods in datastore at the start - allPodsInDatastore := d.datastore.PodGetAll() - fmt.Printf("\n========== DEBUG: getCandidatePodsForScheduling START ==========\n") - fmt.Printf("Total pods in datastore: %d\n", len(allPodsInDatastore)) - for i, podMetrics := range allPodsInDatastore { - pod := podMetrics.GetPod() - metrics := podMetrics.GetMetrics() - fmt.Printf("Pod #%d:\n", i+1) - fmt.Printf(" Name: %s\n", pod.NamespacedName.Name) - fmt.Printf(" Namespace: %s\n", pod.NamespacedName.Namespace) - fmt.Printf(" Address (IP): %s\n", pod.Address) - fmt.Printf(" Metrics:\n") - fmt.Printf(" WaitingQueueSize: %d\n", metrics.WaitingQueueSize) - fmt.Printf(" RunningQueueSize: %d\n", metrics.RunningQueueSize) - fmt.Printf(" KVCacheUsagePercent: %.2f%%\n", metrics.KVCacheUsagePercent*100) - fmt.Printf(" KvCacheMaxTokenCapacity: %d\n", metrics.KvCacheMaxTokenCapacity) - fmt.Printf(" ActiveModels: %v\n", metrics.ActiveModels) - fmt.Printf(" WaitingModels: %v\n", metrics.WaitingModels) - fmt.Printf(" MaxActiveModels: %d\n", metrics.MaxActiveModels) - fmt.Printf(" UpdateTime: %v\n", metrics.UpdateTime) - } - fmt.Printf("========== END: All Pods in Datastore ==========\n\n") - subsetMap, found := requestMetadata[subsetHintNamespace].(map[string]any) - fmt.Printf("DEBUG: Checking for subset metadata in namespace '%s'\n", subsetHintNamespace) - fmt.Printf("DEBUG: Subset metadata found: %v\n", found) - if !found { - fmt.Printf("DEBUG: No subset metadata found, returning ALL pods from datastore\n") - fmt.Printf("DEBUG: About to return %d pods (all pods)\n", len(allPodsInDatastore)) - result := d.toSchedulerPodMetrics(allPodsInDatastore) - fmt.Printf("DEBUG: Converted to scheduler pod metrics, returning %d pods\n", len(result)) - fmt.Printf("========== DEBUG: getCandidatePodsForScheduling END (no subset) ==========\n\n") - return result + return d.toSchedulerPodMetrics(d.datastore.PodGetAll()) } // Check if endpoint key is present in the subset map and ensure there is at least one value endpointSubsetList, found := subsetMap[subsetHintKey].([]any) - fmt.Printf("DEBUG: Checking for endpoint subset key '%s' in subset map\n", subsetHintKey) - fmt.Printf("DEBUG: Endpoint subset key found: %v\n", found) - if !found { - fmt.Printf("DEBUG: Endpoint subset key not found, returning ALL pods from datastore\n") - fmt.Printf("DEBUG: About to return %d pods (all pods)\n", len(allPodsInDatastore)) - result := d.toSchedulerPodMetrics(allPodsInDatastore) - fmt.Printf("DEBUG: Converted to scheduler pod metrics, returning %d pods\n", len(result)) - fmt.Printf("========== DEBUG: getCandidatePodsForScheduling END (no subset key) ==========\n\n") - return result + return d.toSchedulerPodMetrics(d.datastore.PodGetAll()) } else if len(endpointSubsetList) == 0 { loggerTrace.Info("found empty subset filter in request metadata, filtering all pods") - fmt.Printf("DEBUG: Empty endpoint subset list, returning 0 pods\n") - fmt.Printf("========== DEBUG: getCandidatePodsForScheduling END (empty subset) ==========\n\n") return []schedulingtypes.Pod{} } // Create a map of endpoint addresses for easy lookup endpoints := make(map[string]bool) - fmt.Printf("DEBUG: Endpoint subset list contains %d entries:\n", len(endpointSubsetList)) - for i, endpoint := range endpointSubsetList { + for _, endpoint := range endpointSubsetList { // Extract address from endpoint // The endpoint is formatted as "
:" (ex. "10.0.1.0:8080") epStr := strings.Split(endpoint.(string), ":")[0] endpoints[epStr] = true - fmt.Printf(" Endpoint #%d: %s -> IP filter: %s\n", i+1, endpoint, epStr) } podTotalCount := 0 - fmt.Printf("DEBUG: Starting pod filtering against %d endpoint IPs\n", len(endpoints)) podFitleredList := d.datastore.PodList(func(pm backendmetrics.PodMetrics) bool { podTotalCount++ - podIP := pm.GetPod().Address - matched := false - if _, found := endpoints[podIP]; found { - matched = true + if _, found := endpoints[pm.GetPod().Address]; found { + return true } - fmt.Printf(" Checking pod #%d (Name: %s, IP: %s) -> Match: %v\n", - podTotalCount, pm.GetPod().NamespacedName.Name, podIP, matched) - return matched + return false }) - fmt.Printf("DEBUG: Filtering complete. Checked %d pods, %d matched\n", podTotalCount, len(podFitleredList)) loggerTrace.Info("filtered candidate pods by subset filtering", "podTotalCount", podTotalCount, "filteredCount", len(podFitleredList)) - fmt.Printf("DEBUG: Filtered pods details:\n") - for i, podMetrics := range podFitleredList { - pod := podMetrics.GetPod() - fmt.Printf(" Filtered Pod #%d: Name=%s, IP=%s\n", i+1, pod.NamespacedName.Name, pod.Address) - } - - result := d.toSchedulerPodMetrics(podFitleredList) - fmt.Printf("DEBUG: Converted to scheduler pod metrics, returning %d pods\n", len(result)) - fmt.Printf("========== DEBUG: getCandidatePodsForScheduling END (filtered) ==========\n\n") - - return result + return d.toSchedulerPodMetrics(podFitleredList) } // prepareRequest populates the RequestContext and calls the registered PreRequest plugins