From 084f24be5bca634a0fe5c92f93fbf5f5b04e616a Mon Sep 17 00:00:00 2001 From: Anna Tchernych Date: Thu, 21 Aug 2025 18:07:34 -0700 Subject: [PATCH 1/5] Call FrontEnd for the worker id --- cmd/epp/main.go | 12 +- epp-config-dynamo.yaml | 23 ++ pkg/bbr/handlers/request.go | 125 +++++-- pkg/bbr/handlers/server.go | 3 +- .../plugins/dynamo_inject_workerid/plugin.go | 62 ++++ .../plugins/dynamo_kv_scorer/plugin.go | 319 ++++++++++++++++++ 6 files changed, 516 insertions(+), 28 deletions(-) create mode 100644 epp-config-dynamo.yaml create mode 100644 pkg/epp/requestcontrol/plugins/dynamo_inject_workerid/plugin.go create mode 100644 pkg/epp/scheduling/plugins/dynamo_kv_scorer/plugin.go diff --git a/cmd/epp/main.go b/cmd/epp/main.go index b5e06177bc..35e97d2799 100644 --- a/cmd/epp/main.go +++ b/cmd/epp/main.go @@ -22,13 +22,19 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/gateway-api-inference-extension/cmd/epp/runner" + eppplugins "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins" + + // Dynamo plugins + dynprereq "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/requestcontrol/plugins/dynamo_inject_workerid" + dynscorer "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins/dynamo_kv_scorer" ) func main() { - // Register all known plugin factories + // Register built-in plugins. runner.RegisterAllPlugins() - // For adding out-of-tree plugins to the plugins registry, use the following: - // plugins.Register(my-out-of-tree-plugin-name, my-out-of-tree-plugin-factory-function) + + eppplugins.Register("dynamo-inject-workerid", dynprereq.InjectWorkerIDPreRequestFactory) + eppplugins.Register("kv-aware-scorer", dynscorer.KVAwareScorerFactory) if err := runner.NewRunner().Run(ctrl.SetupSignalHandler()); err != nil { os.Exit(1) diff --git a/epp-config-dynamo.yaml b/epp-config-dynamo.yaml new file mode 100644 index 0000000000..959c2ea497 --- /dev/null +++ b/epp-config-dynamo.yaml @@ -0,0 +1,23 @@ +apiVersion: inference.networking.x-k8s.io/v1alpha1 +kind: EndpointPickerConfig +plugins: + # Required: tells EPP which profile to use (even if you only have one) + - type: single-profile-handler + + # Picker: chooses the final endpoint after scoring + - name: picker + type: max-score-picker + - name: dyn-pre + type: dynamo-inject-workerid + parameters: {} + - name: dyn-kv + type: kv-aware-scorer + parameters: + frontendURL: http://127.0.0.1:8000/v1/chat/completions + timeoutMS: 10000 +schedulingProfiles: + - name: default + plugins: + - pluginRef: dyn-kv + weight: 1 + - pluginRef: picker diff --git a/pkg/bbr/handlers/request.go b/pkg/bbr/handlers/request.go index 32fffc0217..b9cd35fce3 100644 --- a/pkg/bbr/handlers/request.go +++ b/pkg/bbr/handlers/request.go @@ -1,25 +1,10 @@ -/* -Copyright 2025 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - package handlers import ( "context" "encoding/json" "fmt" + "strings" // <— needed to normalize header keys/trim basepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" eppb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" @@ -29,13 +14,31 @@ import ( logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" ) -const modelHeader = "X-Gateway-Model-Name" +const ( + modelHeader = "X-Gateway-Model-Name" + workerIDHeader = "x-worker-instance-id" + injectHintHeader = "x-epp-inject-nvext-worker-instance-id" +) // HandleRequestBody handles request bodies. func (s *Server) HandleRequestBody(ctx context.Context, data map[string]any) ([]*eppb.ProcessingResponse, error) { logger := log.FromContext(ctx) var ret []*eppb.ProcessingResponse + // If we captured a worker id hint in the headers phase, inject it into body JSON: + // nvext.backend_instance_id = + if wid := strings.TrimSpace(s.workerIDHint); wid != "" { + // ensure nvext is a map[string]any + if nv, ok := data["nvext"]; !ok || nv == nil { + data["nvext"] = map[string]any{"backend_instance_id": wid} + } else if m, ok := nv.(map[string]any); ok { + m["backend_instance_id"] = wid + } else { + // if nvext was some other type, replace with a clean map + data["nvext"] = map[string]any{"backend_instance_id": wid} + } + } + requestBodyBytes, err := json.Marshal(data) if err != nil { return nil, err @@ -45,7 +48,9 @@ func (s *Server) HandleRequestBody(ctx context.Context, data map[string]any) ([] if !ok { metrics.RecordModelNotInBodyCounter() logger.V(logutil.DEFAULT).Info("Request body does not contain model parameter") + if s.streaming { + // still stream the possibly mutated body ret = append(ret, &eppb.ProcessingResponse{ Response: &eppb.ProcessingResponse_RequestHeaders{ RequestHeaders: &eppb.HeadersResponse{}, @@ -53,14 +58,24 @@ func (s *Server) HandleRequestBody(ctx context.Context, data map[string]any) ([] }) ret = addStreamedBodyResponse(ret, requestBodyBytes) return ret, nil - } else { - ret = append(ret, &eppb.ProcessingResponse{ + } + + // non-streaming: return a body response with the (possibly) mutated body + return []*eppb.ProcessingResponse{ + { Response: &eppb.ProcessingResponse_RequestBody{ - RequestBody: &eppb.BodyResponse{}, + RequestBody: &eppb.BodyResponse{ + Response: &eppb.CommonResponse{ + BodyMutation: &eppb.BodyMutation{ + Mutation: &eppb.BodyMutation_Body{ + Body: requestBodyBytes, + }, + }, + }, + }, }, - }) - } - return ret, nil + }, + }, nil } modelStr, ok := modelVal.(string) @@ -73,6 +88,7 @@ func (s *Server) HandleRequestBody(ctx context.Context, data map[string]any) ([] metrics.RecordSuccessCounter() if s.streaming { + // set the model header, then stream the (possibly) mutated body ret = append(ret, &eppb.ProcessingResponse{ Response: &eppb.ProcessingResponse_RequestHeaders{ RequestHeaders: &eppb.HeadersResponse{ @@ -86,22 +102,47 @@ func (s *Server) HandleRequestBody(ctx context.Context, data map[string]any) ([] RawValue: []byte(modelStr), }, }, + // also keep the worker id header if we have one + func() *basepb.HeaderValueOption { + if strings.TrimSpace(s.workerIDHint) == "" { + return nil + } + return &basepb.HeaderValueOption{ + Header: &basepb.HeaderValue{ + Key: workerIDHeader, + RawValue: []byte(s.workerIDHint), + }, + } + }(), }, }, }, }, }, }) + + // prune nil entries if worker id not present + hm := ret[len(ret)-1].GetRequestHeaders().GetResponse().GetHeaderMutation() + if hm != nil && hm.SetHeaders != nil { + out := hm.SetHeaders[:0] + for _, h := range hm.SetHeaders { + if h != nil { + out = append(out, h) + } + } + hm.SetHeaders = out + } + ret = addStreamedBodyResponse(ret, requestBodyBytes) return ret, nil } + // Non-streaming: set model header and replace the body with our mutated JSON return []*eppb.ProcessingResponse{ { Response: &eppb.ProcessingResponse_RequestBody{ RequestBody: &eppb.BodyResponse{ Response: &eppb.CommonResponse{ - // Necessary so that the new headers are used in the routing decision. ClearRouteCache: true, HeaderMutation: &eppb.HeaderMutation{ SetHeaders: []*basepb.HeaderValueOption{ @@ -111,6 +152,22 @@ func (s *Server) HandleRequestBody(ctx context.Context, data map[string]any) ([] RawValue: []byte(modelStr), }, }, + func() *basepb.HeaderValueOption { + if strings.TrimSpace(s.workerIDHint) == "" { + return nil + } + return &basepb.HeaderValueOption{ + Header: &basepb.HeaderValue{ + Key: workerIDHeader, + RawValue: []byte(s.workerIDHint), + }, + } + }(), + }, + }, + BodyMutation: &eppb.BodyMutation{ + Mutation: &eppb.BodyMutation_Body{ + Body: requestBodyBytes, }, }, }, @@ -141,6 +198,26 @@ func addStreamedBodyResponse(responses []*eppb.ProcessingResponse, requestBodyBy // HandleRequestHeaders handles request headers. func (s *Server) HandleRequestHeaders(headers *eppb.HttpHeaders) ([]*eppb.ProcessingResponse, error) { + // Look for our hint header and/or direct worker id header. + s.workerIDHint = "" // reset per request + if m := headers.GetHeaders(); m != nil { + for _, h := range m.GetHeaders() { + k := strings.ToLower(h.GetKey()) + if k == injectHintHeader || k == workerIDHeader { + if rv := h.GetRawValue(); len(rv) > 0 { + s.workerIDHint = strings.TrimSpace(string(rv)) + } else { + s.workerIDHint = strings.TrimSpace(h.GetValue()) + } + // prefer the inject hint if both are present; break on the hint + if k == injectHintHeader { + break + } + } + } + } + + // No header mutations needed here; body phase will do the JSON injection. return []*eppb.ProcessingResponse{ { Response: &eppb.ProcessingResponse_RequestHeaders{ diff --git a/pkg/bbr/handlers/server.go b/pkg/bbr/handlers/server.go index a5803806bc..e8c109282c 100644 --- a/pkg/bbr/handlers/server.go +++ b/pkg/bbr/handlers/server.go @@ -38,7 +38,8 @@ func NewServer(streaming bool) *Server { // Server implements the Envoy external processing server. // https://www.envoyproxy.io/docs/envoy/latest/api-v3/service/ext_proc/v3/external_processor.proto type Server struct { - streaming bool + streaming bool + workerIDHint string } func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error { diff --git a/pkg/epp/requestcontrol/plugins/dynamo_inject_workerid/plugin.go b/pkg/epp/requestcontrol/plugins/dynamo_inject_workerid/plugin.go new file mode 100644 index 0000000000..5c7d4438c3 --- /dev/null +++ b/pkg/epp/requestcontrol/plugins/dynamo_inject_workerid/plugin.go @@ -0,0 +1,62 @@ +package dynamo_inject_workerid + +import ( + "context" + "encoding/json" + "strings" + + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins" + rc "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/requestcontrol" + schedtypes "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types" +) + +const ( + typeString = "dynamo-inject-workerid" + pluginName = "dynamo-inject-workerid" + WorkerIDHeader = "x-worker-instance-id" + injectHintHeader = "x-epp-inject-nvext-worker-instance-id" +) + +var _ plugins.Plugin = (*InjectWorkerIDPreRequest)(nil) +var _ rc.PreRequest = (*InjectWorkerIDPreRequest)(nil) + +type InjectWorkerIDPreRequest struct { + typedName plugins.TypedName +} + +func NewInjectWorkerIDPreRequest() *InjectWorkerIDPreRequest { + return &InjectWorkerIDPreRequest{ + typedName: plugins.TypedName{Type: typeString, Name: pluginName}, + } +} + +func (p *InjectWorkerIDPreRequest) WithName(name string) *InjectWorkerIDPreRequest { + p.typedName.Name = name + return p +} + +func InjectWorkerIDPreRequestFactory(name string, _ json.RawMessage, _ plugins.Handle) (plugins.Plugin, error) { + return NewInjectWorkerIDPreRequest().WithName(name), nil +} + +func (p *InjectWorkerIDPreRequest) TypedName() plugins.TypedName { return p.typedName } + +func (p *InjectWorkerIDPreRequest) PreRequest( + _ context.Context, + req *schedtypes.LLMRequest, + _ *schedtypes.SchedulingResult, + _ int, +) { + if req == nil { + return + } + if req.Headers == nil { + req.Headers = map[string]string{} + } + wid := strings.TrimSpace(req.Headers[WorkerIDHeader]) + if wid == "" { + return + } + req.Headers[WorkerIDHeader] = wid + req.Headers[injectHintHeader] = wid +} diff --git a/pkg/epp/scheduling/plugins/dynamo_kv_scorer/plugin.go b/pkg/epp/scheduling/plugins/dynamo_kv_scorer/plugin.go new file mode 100644 index 0000000000..9ad723978e --- /dev/null +++ b/pkg/epp/scheduling/plugins/dynamo_kv_scorer/plugin.go @@ -0,0 +1,319 @@ +package dynamo_kv_scorer + +import ( + "bufio" + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "strings" + "time" + + log "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework" + schedtypes "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types" + logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" +) + +const ( + PluginName = "dynamo-kv-scorer" + KVAwareScorerType = "kv-aware-scorer" + StateKeyWorkerInstanceID = schedtypes.StateKey("dynamo/worker-instance-id") + WorkerIDHeader = "x-worker-instance-id" +) + +type params struct { + FrontendURL string `json:"frontendURL"` + TimeoutMS int `json:"timeoutMS"` +} + +// tiny wrapper so we can store a string in CycleState +type stateString string + +func (s stateString) Clone() schedtypes.StateData { return s } + +type KVAwareScorer struct { + typedName plugins.TypedName + feURL string + feTimeout time.Duration +} + +// compile-time assertions +var _ plugins.Plugin = (*KVAwareScorer)(nil) +var _ framework.Scorer = (*KVAwareScorer)(nil) + +func NewKVAwareScorer() *KVAwareScorer { + return &KVAwareScorer{ + typedName: plugins.TypedName{Type: KVAwareScorerType, Name: PluginName}, + feURL: "http://127.0.0.1:8000/v1/chat/completions", + feTimeout: 10 * time.Second, + } +} + +func (k *KVAwareScorer) WithName(name string) *KVAwareScorer { k.typedName.Name = name; return k } +func (k *KVAwareScorer) WithFrontend(url string, timeout time.Duration) *KVAwareScorer { + if url != "" { + k.feURL = url + } + if timeout > 0 { + k.feTimeout = timeout + } + return k +} + +func KVAwareScorerFactory(name string, raw json.RawMessage, _ plugins.Handle) (plugins.Plugin, error) { + p := params{} + _ = json.Unmarshal(raw, &p) + timeout := time.Duration(p.TimeoutMS) * time.Millisecond + if timeout <= 0 { + timeout = 10 * time.Second + } + return NewKVAwareScorer().WithName(name).WithFrontend(p.FrontendURL, timeout), nil +} + +func (k *KVAwareScorer) TypedName() plugins.TypedName { return k.typedName } + +func (k *KVAwareScorer) Score( + ctx context.Context, + cycle *schedtypes.CycleState, + req *schedtypes.LLMRequest, + pods []schedtypes.Pod, +) map[schedtypes.Pod]float64 { + logger := log.FromContext(ctx) + + workerID, err := k.callFrontEndForWorker(ctx, req) + if err != nil { + logger.V(logutil.DEFAULT).Error(err, "FrontEnd call failed; proceeding without worker id") + } else if workerID != "" { + cycle.Write(StateKeyWorkerInstanceID, stateString(workerID)) + if req.Headers == nil { + req.Headers = map[string]string{} + } + req.Headers[WorkerIDHeader] = workerID + } + + // neutral/uniform scores – only your scorer runs in the profile, so this “wins” + out := make(map[schedtypes.Pod]float64, len(pods)) + for _, p := range pods { + out[p] = 1.0 + } + return out +} + +// Call the Dynamo FrontEnd and extract worker_instance_id via SSE. +func (k *KVAwareScorer) callFrontEndForWorker( + ctx context.Context, + req *schedtypes.LLMRequest, +) (string, error) { + logger := log.FromContext(ctx) + + feBody := buildFrontEndBodyFromLLMRequest(req) + payload, err := json.Marshal(feBody) + if err != nil { + logger.V(logutil.DEFAULT).Error(err, "Dynamo FrontEnd marshal failed") + return "", fmt.Errorf("marshal FrontEnd body: %w", err) + } + + reqCtx, cancel := context.WithTimeout(ctx, k.feTimeout) + defer cancel() + + httpReq, err := http.NewRequestWithContext(reqCtx, http.MethodPost, k.feURL, bytes.NewReader(payload)) + if err != nil { + logger.V(logutil.DEFAULT).Error(err, "Dynamo FrontEnd request build failed") + return "", fmt.Errorf("build FrontEnd request: %w", err) + } + httpReq.Header.Set("Content-Type", "application/json") + httpReq.Header.Set("Accept", "text/event-stream") + + client := &http.Client{Timeout: 0} + resp, err := client.Do(httpReq) + if err != nil { + logger.V(logutil.DEFAULT).Error(err, "Dynamo FrontEnd POST failed") + return "", fmt.Errorf("FrontEnd POST failed: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + errBody, _ := io.ReadAll(resp.Body) + logger.V(logutil.DEFAULT).Error(nil, "Dynamo FrontEnd non-2xx response", + "status_code", resp.StatusCode, "response_body", string(errBody)) + return "", fmt.Errorf("Dynamo FrontEnd error: %d body=%s", resp.StatusCode, string(errBody)) + } + + ct := strings.ToLower(resp.Header.Get("Content-Type")) + if !strings.Contains(ct, "text/event-stream") { + logger.V(logutil.DEFAULT).Error(nil, "Unexpected non-SSE response") + return "", fmt.Errorf("unexpected non-SSE response (Content-Type=%q)", resp.Header.Get("Content-Type")) + } + + // Parse SSE: expect `event: worker_instance_id`, a quoted id in a comment or data, and `data: [DONE]` + reader := bufio.NewReader(resp.Body) + workerID, perr := parseWorkerIDFromSSE(ctx, reader) + if perr != nil { + return "", perr + } + return workerID, nil +} + +// Build the exact body we send to the FrontEnd, only from LLMRequest (no header merging). +func buildFrontEndBodyFromLLMRequest(req *schedtypes.LLMRequest) map[string]any { + feBody := make(map[string]any, 8) + + // We call /v1/chat/completions so must provide messages + userText := "" + if req != nil && strings.TrimSpace(req.Prompt) != "" { + userText = req.Prompt + } + feBody["messages"] = []map[string]any{ + {"role": "user", "content": userText}, + } + + if req != nil && strings.TrimSpace(req.TargetModel) != "" { + feBody["model"] = req.TargetModel + } + + // Force SSE so we can parse worker_instance_id + feBody["stream"] = true + + feBody["max_tokens"] = 1 + feBody["temperature"] = 0.0 + + // Ask the Dynamo to include worker id + feBody["nvext"] = map[string]any{ + "annotations": []string{"query_instance_id"}, + } + + return feBody +} + +// parseWorkerIDFromSSE scans an SSE stream for a worker_instance_id. +// Expected pattern: +// +// event: worker_instance_id +// : "8303679623149182543" +// data: [DONE] +// +// Also supports JSON in data lines with either top-level worker_instance_id +// or annotations.worker_instance_id. +func parseWorkerIDFromSSE(ctx context.Context, reader *bufio.Reader) (string, error) { + logger := log.FromContext(ctx) + + var ( + eventName string + dataBuf strings.Builder // accumulates "data:" lines for one event + commentBuf strings.Builder // accumulates ":" comment lines + ) + + flushEvent := func() (string, bool, error) { + data := strings.TrimSpace(dataBuf.String()) + comment := strings.TrimSpace(commentBuf.String()) + dataBuf.Reset() + commentBuf.Reset() + + // [DONE] ends the stream + if data == "[DONE]" || comment == "[DONE]" { + logger.V(logutil.DEFAULT).Info("SSE stream DONE") + return "", true, nil + } + + // Prefer the named event + if eventName == "worker_instance_id" { + candidate := data + if candidate == "" { + candidate = comment + } + if candidate != "" { + // Try JSON string + var s string + if json.Unmarshal([]byte(candidate), &s) == nil && s != "" { + logger.V(logutil.VERBOSE).Info("worker_instance_id extracted from named event", "worker_instance_id", s) + return s, false, nil + } + // Fallback: strip quotes + clean := strings.Trim(candidate, "\"") + if clean != "" && clean != "[DONE]" { + logger.V(logutil.DEFAULT).Info("worker_instance_id extracted (raw) from named event", "worker_instance_id", clean) + return clean, false, nil + } + } + } + + // Generic JSON in data: + if data != "" { + var msg map[string]any + if json.Unmarshal([]byte(data), &msg) == nil { + if wid, ok := msg["worker_instance_id"].(string); ok && wid != "" { + logger.V(logutil.DEFAULT).Info("worker_instance_id found in SSE payload root", "worker_instance_id", wid) + return wid, false, nil + } + if ann, ok := msg["annotations"].(map[string]any); ok { + if wid, ok := ann["worker_instance_id"].(string); ok && wid != "" { + logger.V(logutil.DEFAULT).Info("worker_instance_id found in SSE annotations", "worker_instance_id", wid) + return wid, false, nil + } + } + } + } + return "", false, nil + } + + for { + line, err := reader.ReadString('\n') + if err != nil { + if err == io.EOF { + // Flush any pending event on EOF + if wid, done, _ := flushEvent(); wid != "" { + return wid, nil + } else if done { + return "", fmt.Errorf("worker_instance_id not found before DONE") + } + logger.V(logutil.DEFAULT).Error(nil, "EOF before worker_instance_id") + return "", fmt.Errorf("worker_instance_id not found in SSE stream (EOF)") + } + logger.V(logutil.DEFAULT).Error(err, "SSE read error") + return "", fmt.Errorf("sse read error: %w", err) + } + + l := strings.TrimRight(line, "\r\n") + if l == "" { + // End of current event; process it + if wid, done, _ := flushEvent(); wid != "" { + return wid, nil + } else if done { + return "", fmt.Errorf("worker_instance_id not found before DONE") + } + eventName = "" // reset for next event + continue + } + + // Comment line + if strings.HasPrefix(l, ":") { + commentLine := strings.TrimSpace(l[1:]) + if commentBuf.Len() > 0 { + commentBuf.WriteByte('\n') + } + commentBuf.WriteString(commentLine) + continue + } + + // "field: value" + if idx := strings.IndexByte(l, ':'); idx != -1 { + field := l[:idx] + val := strings.TrimSpace(l[idx+1:]) + switch field { + case "event": + eventName = val + case "data": + if dataBuf.Len() > 0 { + dataBuf.WriteByte('\n') + } + dataBuf.WriteString(val) + default: + // ignore id, retry, etc. + } + } + } +} From c2f8a40817b6248b66cab7b21e3a3639e54c34af Mon Sep 17 00:00:00 2001 From: Anna Tchernych Date: Thu, 21 Aug 2025 18:23:03 -0700 Subject: [PATCH 2/5] fix comments and 1 const --- cmd/epp/main.go | 2 +- pkg/bbr/handlers/request.go | 18 +++++++++++++++++- 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/cmd/epp/main.go b/cmd/epp/main.go index 35e97d2799..3db34225e2 100644 --- a/cmd/epp/main.go +++ b/cmd/epp/main.go @@ -30,7 +30,7 @@ import ( ) func main() { - // Register built-in plugins. + // Register all known plugin factories runner.RegisterAllPlugins() eppplugins.Register("dynamo-inject-workerid", dynprereq.InjectWorkerIDPreRequestFactory) diff --git a/pkg/bbr/handlers/request.go b/pkg/bbr/handlers/request.go index b9cd35fce3..d5a19ed2bf 100644 --- a/pkg/bbr/handlers/request.go +++ b/pkg/bbr/handlers/request.go @@ -1,3 +1,16 @@ +/* +Copyright 2025 The Kubernetes Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package handlers import ( @@ -14,8 +27,10 @@ import ( logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" ) +const modelHeader = "X-Gateway-Model-Name" + +// Dynamo-related const ( - modelHeader = "X-Gateway-Model-Name" workerIDHeader = "x-worker-instance-id" injectHintHeader = "x-epp-inject-nvext-worker-instance-id" ) @@ -143,6 +158,7 @@ func (s *Server) HandleRequestBody(ctx context.Context, data map[string]any) ([] Response: &eppb.ProcessingResponse_RequestBody{ RequestBody: &eppb.BodyResponse{ Response: &eppb.CommonResponse{ + // Necessary so that the new headers are used in the routing decision. ClearRouteCache: true, HeaderMutation: &eppb.HeaderMutation{ SetHeaders: []*basepb.HeaderValueOption{ From ab5b5fd43f983e83b94af3c394f41058001e918f Mon Sep 17 00:00:00 2001 From: Anna Tchernych Date: Thu, 21 Aug 2025 18:26:24 -0700 Subject: [PATCH 3/5] cleanup comments --- cmd/epp/main.go | 2 ++ pkg/bbr/handlers/request.go | 6 ++++-- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/cmd/epp/main.go b/cmd/epp/main.go index 3db34225e2..8592735d2c 100644 --- a/cmd/epp/main.go +++ b/cmd/epp/main.go @@ -32,6 +32,8 @@ import ( func main() { // Register all known plugin factories runner.RegisterAllPlugins() + // For adding out-of-tree plugins to the plugins registry, use the following: + // plugins.Register(my-out-of-tree-plugin-name, my-out-of-tree-plugin-factory-function) eppplugins.Register("dynamo-inject-workerid", dynprereq.InjectWorkerIDPreRequestFactory) eppplugins.Register("kv-aware-scorer", dynscorer.KVAwareScorerFactory) diff --git a/pkg/bbr/handlers/request.go b/pkg/bbr/handlers/request.go index d5a19ed2bf..70a4b04d73 100644 --- a/pkg/bbr/handlers/request.go +++ b/pkg/bbr/handlers/request.go @@ -1,9 +1,12 @@ /* Copyright 2025 The Kubernetes Authors. + Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -17,7 +20,7 @@ import ( "context" "encoding/json" "fmt" - "strings" // <— needed to normalize header keys/trim + "strings" basepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" eppb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" @@ -63,7 +66,6 @@ func (s *Server) HandleRequestBody(ctx context.Context, data map[string]any) ([] if !ok { metrics.RecordModelNotInBodyCounter() logger.V(logutil.DEFAULT).Info("Request body does not contain model parameter") - if s.streaming { // still stream the possibly mutated body ret = append(ret, &eppb.ProcessingResponse{ From 513598468ec14d1c0d1bbdfe2f48e45ede5e3330 Mon Sep 17 00:00:00 2001 From: Anna Tchernych Date: Fri, 22 Aug 2025 13:13:04 -0700 Subject: [PATCH 4/5] Add managing token_data --- pkg/bbr/handlers/request.go | 40 +++- pkg/bbr/handlers/server.go | 5 +- .../plugins/dynamo_inject_workerid/plugin.go | 7 + .../plugins/dynamo_kv_scorer/plugin.go | 172 ++++++++++++++---- 4 files changed, 184 insertions(+), 40 deletions(-) diff --git a/pkg/bbr/handlers/request.go b/pkg/bbr/handlers/request.go index 70a4b04d73..573276e5fc 100644 --- a/pkg/bbr/handlers/request.go +++ b/pkg/bbr/handlers/request.go @@ -18,6 +18,7 @@ package handlers import ( "context" + "encoding/base64" "encoding/json" "fmt" "strings" @@ -36,6 +37,7 @@ const modelHeader = "X-Gateway-Model-Name" const ( workerIDHeader = "x-worker-instance-id" injectHintHeader = "x-epp-inject-nvext-worker-instance-id" + tokenDataHeader = "x-epp-inject-nvext-token-data" ) // HandleRequestBody handles request bodies. @@ -57,6 +59,23 @@ func (s *Server) HandleRequestBody(ctx context.Context, data map[string]any) ([] } } + // If we captured token_data in headers, decode and inject as nvext.token_data + if td := strings.TrimSpace(s.tokenDataHint); td != "" { + // header value is base64(JSON array) + if raw, err := base64.StdEncoding.DecodeString(td); err == nil { + var arr []int64 + if err := json.Unmarshal(raw, &arr); err == nil && len(arr) > 0 { + // ensure nvext map exists + nv, ok := data["nvext"].(map[string]any) + if !ok || nv == nil { + nv = map[string]any{} + data["nvext"] = nv + } + nv["token_data"] = arr + } + } + } + requestBodyBytes, err := json.Marshal(data) if err != nil { return nil, err @@ -216,20 +235,29 @@ func addStreamedBodyResponse(responses []*eppb.ProcessingResponse, requestBodyBy // HandleRequestHeaders handles request headers. func (s *Server) HandleRequestHeaders(headers *eppb.HttpHeaders) ([]*eppb.ProcessingResponse, error) { - // Look for our hint header and/or direct worker id header. - s.workerIDHint = "" // reset per request + // reset per-request + s.workerIDHint = "" + s.tokenDataHint = "" + if m := headers.GetHeaders(); m != nil { for _, h := range m.GetHeaders() { k := strings.ToLower(h.GetKey()) - if k == injectHintHeader || k == workerIDHeader { + + switch k { + case injectHintHeader, workerIDHeader: + // Prefer raw bytes if present; otherwise use value (Envoy can deliver either) if rv := h.GetRawValue(); len(rv) > 0 { s.workerIDHint = strings.TrimSpace(string(rv)) } else { s.workerIDHint = strings.TrimSpace(h.GetValue()) } - // prefer the inject hint if both are present; break on the hint - if k == injectHintHeader { - break + // NOTE: don't return; we still want to scan for tokenDataHeader + + case tokenDataHeader: + if rv := h.GetRawValue(); len(rv) > 0 { + s.tokenDataHint = strings.TrimSpace(string(rv)) + } else { + s.tokenDataHint = strings.TrimSpace(h.GetValue()) } } } diff --git a/pkg/bbr/handlers/server.go b/pkg/bbr/handlers/server.go index e8c109282c..eb2893fdc6 100644 --- a/pkg/bbr/handlers/server.go +++ b/pkg/bbr/handlers/server.go @@ -38,8 +38,9 @@ func NewServer(streaming bool) *Server { // Server implements the Envoy external processing server. // https://www.envoyproxy.io/docs/envoy/latest/api-v3/service/ext_proc/v3/external_processor.proto type Server struct { - streaming bool - workerIDHint string + streaming bool + workerIDHint string + tokenDataHint string } func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error { diff --git a/pkg/epp/requestcontrol/plugins/dynamo_inject_workerid/plugin.go b/pkg/epp/requestcontrol/plugins/dynamo_inject_workerid/plugin.go index 5c7d4438c3..b6708fa4d4 100644 --- a/pkg/epp/requestcontrol/plugins/dynamo_inject_workerid/plugin.go +++ b/pkg/epp/requestcontrol/plugins/dynamo_inject_workerid/plugin.go @@ -15,6 +15,7 @@ const ( pluginName = "dynamo-inject-workerid" WorkerIDHeader = "x-worker-instance-id" injectHintHeader = "x-epp-inject-nvext-worker-instance-id" + TokenDataHeader = "x-epp-inject-nvext-token-data" ) var _ plugins.Plugin = (*InjectWorkerIDPreRequest)(nil) @@ -59,4 +60,10 @@ func (p *InjectWorkerIDPreRequest) PreRequest( } req.Headers[WorkerIDHeader] = wid req.Headers[injectHintHeader] = wid + + // Pass through token-data header if scorer set it + if td := strings.TrimSpace(req.Headers[TokenDataHeader]); td != "" { + req.Headers[TokenDataHeader] = td + } + } diff --git a/pkg/epp/scheduling/plugins/dynamo_kv_scorer/plugin.go b/pkg/epp/scheduling/plugins/dynamo_kv_scorer/plugin.go index 9ad723978e..7d60e07c37 100644 --- a/pkg/epp/scheduling/plugins/dynamo_kv_scorer/plugin.go +++ b/pkg/epp/scheduling/plugins/dynamo_kv_scorer/plugin.go @@ -4,6 +4,7 @@ import ( "bufio" "bytes" "context" + "encoding/base64" "encoding/json" "fmt" "io" @@ -23,6 +24,7 @@ const ( KVAwareScorerType = "kv-aware-scorer" StateKeyWorkerInstanceID = schedtypes.StateKey("dynamo/worker-instance-id") WorkerIDHeader = "x-worker-instance-id" + TokenDataHeader = "x-epp-inject-nvext-token-data" ) type params struct { @@ -84,7 +86,7 @@ func (k *KVAwareScorer) Score( ) map[schedtypes.Pod]float64 { logger := log.FromContext(ctx) - workerID, err := k.callFrontEndForWorker(ctx, req) + workerID, tokenData, err := k.callFrontEndForWorker(ctx, req) if err != nil { logger.V(logutil.DEFAULT).Error(err, "FrontEnd call failed; proceeding without worker id") } else if workerID != "" { @@ -93,6 +95,12 @@ func (k *KVAwareScorer) Score( req.Headers = map[string]string{} } req.Headers[WorkerIDHeader] = workerID + if len(tokenData) > 0 { + if req.Headers == nil { + req.Headers = map[string]string{} + } + req.Headers[TokenDataHeader] = encodeTokenData(tokenData) + } } // neutral/uniform scores – only your scorer runs in the profile, so this “wins” @@ -107,14 +115,14 @@ func (k *KVAwareScorer) Score( func (k *KVAwareScorer) callFrontEndForWorker( ctx context.Context, req *schedtypes.LLMRequest, -) (string, error) { +) (string, []int64, error) { logger := log.FromContext(ctx) feBody := buildFrontEndBodyFromLLMRequest(req) payload, err := json.Marshal(feBody) if err != nil { logger.V(logutil.DEFAULT).Error(err, "Dynamo FrontEnd marshal failed") - return "", fmt.Errorf("marshal FrontEnd body: %w", err) + return "", nil, fmt.Errorf("marshal FrontEnd body: %w", err) } reqCtx, cancel := context.WithTimeout(ctx, k.feTimeout) @@ -123,7 +131,7 @@ func (k *KVAwareScorer) callFrontEndForWorker( httpReq, err := http.NewRequestWithContext(reqCtx, http.MethodPost, k.feURL, bytes.NewReader(payload)) if err != nil { logger.V(logutil.DEFAULT).Error(err, "Dynamo FrontEnd request build failed") - return "", fmt.Errorf("build FrontEnd request: %w", err) + return "", nil, fmt.Errorf("build FrontEnd request: %w", err) } httpReq.Header.Set("Content-Type", "application/json") httpReq.Header.Set("Accept", "text/event-stream") @@ -132,7 +140,7 @@ func (k *KVAwareScorer) callFrontEndForWorker( resp, err := client.Do(httpReq) if err != nil { logger.V(logutil.DEFAULT).Error(err, "Dynamo FrontEnd POST failed") - return "", fmt.Errorf("FrontEnd POST failed: %w", err) + return "", nil, fmt.Errorf("FrontEnd POST failed: %w", err) } defer resp.Body.Close() @@ -140,22 +148,22 @@ func (k *KVAwareScorer) callFrontEndForWorker( errBody, _ := io.ReadAll(resp.Body) logger.V(logutil.DEFAULT).Error(nil, "Dynamo FrontEnd non-2xx response", "status_code", resp.StatusCode, "response_body", string(errBody)) - return "", fmt.Errorf("Dynamo FrontEnd error: %d body=%s", resp.StatusCode, string(errBody)) + return "", nil, fmt.Errorf("Dynamo FrontEnd error: %d body=%s", resp.StatusCode, string(errBody)) } ct := strings.ToLower(resp.Header.Get("Content-Type")) if !strings.Contains(ct, "text/event-stream") { logger.V(logutil.DEFAULT).Error(nil, "Unexpected non-SSE response") - return "", fmt.Errorf("unexpected non-SSE response (Content-Type=%q)", resp.Header.Get("Content-Type")) + return "", nil, fmt.Errorf("unexpected non-SSE response (Content-Type=%q)", resp.Header.Get("Content-Type")) } // Parse SSE: expect `event: worker_instance_id`, a quoted id in a comment or data, and `data: [DONE]` reader := bufio.NewReader(resp.Body) - workerID, perr := parseWorkerIDFromSSE(ctx, reader) + workerID, tokenData, perr := parseSelectionFromSSE(ctx, reader) if perr != nil { - return "", perr + return "", nil, perr } - return workerID, nil + return workerID, tokenData, nil } // Build the exact body we send to the FrontEnd, only from LLMRequest (no header merging). @@ -189,7 +197,7 @@ func buildFrontEndBodyFromLLMRequest(req *schedtypes.LLMRequest) map[string]any return feBody } -// parseWorkerIDFromSSE scans an SSE stream for a worker_instance_id. +// This function scans an SSE stream for a worker_instance_id and token_data. // Expected pattern: // // event: worker_instance_id @@ -198,16 +206,21 @@ func buildFrontEndBodyFromLLMRequest(req *schedtypes.LLMRequest) map[string]any // // Also supports JSON in data lines with either top-level worker_instance_id // or annotations.worker_instance_id. -func parseWorkerIDFromSSE(ctx context.Context, reader *bufio.Reader) (string, error) { +func parseSelectionFromSSE(ctx context.Context, reader *bufio.Reader) (string, []int64, error) { logger := log.FromContext(ctx) var ( eventName string dataBuf strings.Builder // accumulates "data:" lines for one event commentBuf strings.Builder // accumulates ":" comment lines + gotWID string + gotTD []int64 ) - flushEvent := func() (string, bool, error) { + // collect the exact SSE bytes for debugging + var rawBuf strings.Builder + + flushEvent := func() (bool, error) { data := strings.TrimSpace(dataBuf.String()) comment := strings.TrimSpace(commentBuf.String()) dataBuf.Reset() @@ -216,7 +229,11 @@ func parseWorkerIDFromSSE(ctx context.Context, reader *bufio.Reader) (string, er // [DONE] ends the stream if data == "[DONE]" || comment == "[DONE]" { logger.V(logutil.DEFAULT).Info("SSE stream DONE") - return "", true, nil + logger.V(logutil.DEFAULT).Info("SSE raw stream", "raw", rawBuf.String()) + if gotWID != "" && len(gotTD) == 0 { + logger.V(logutil.DEFAULT).Info("SSE DONE: worker_instance_id present, token_data missing") + } + return true, nil } // Prefer the named event @@ -230,60 +247,93 @@ func parseWorkerIDFromSSE(ctx context.Context, reader *bufio.Reader) (string, er var s string if json.Unmarshal([]byte(candidate), &s) == nil && s != "" { logger.V(logutil.VERBOSE).Info("worker_instance_id extracted from named event", "worker_instance_id", s) - return s, false, nil + gotWID = s + return false, nil } // Fallback: strip quotes clean := strings.Trim(candidate, "\"") if clean != "" && clean != "[DONE]" { logger.V(logutil.DEFAULT).Info("worker_instance_id extracted (raw) from named event", "worker_instance_id", clean) - return clean, false, nil + gotWID = clean + return false, nil } } } + if eventName == "token_data" { + candidate := data + if candidate == "" { + candidate = comment + } + if candidate != "" { + if arr := toInt64SliceJSON(candidate); len(arr) > 0 { + gotTD = arr + logger.V(logutil.DEFAULT).Info("token_data extracted from named event", "count", len(arr)) + return false, nil + } + } + } // Generic JSON in data: if data != "" { var msg map[string]any if json.Unmarshal([]byte(data), &msg) == nil { if wid, ok := msg["worker_instance_id"].(string); ok && wid != "" { logger.V(logutil.DEFAULT).Info("worker_instance_id found in SSE payload root", "worker_instance_id", wid) - return wid, false, nil + gotWID = wid } if ann, ok := msg["annotations"].(map[string]any); ok { if wid, ok := ann["worker_instance_id"].(string); ok && wid != "" { logger.V(logutil.DEFAULT).Info("worker_instance_id found in SSE annotations", "worker_instance_id", wid) - return wid, false, nil + gotWID = wid + } + } + if td, ok := msg["token_data"]; ok { + if arr := toInt64Slice(td); len(arr) > 0 { + gotTD = arr + logger.V(logutil.DEFAULT).Info("token_data found in SSE payload root", "count", len(arr)) + } + } else if nv, ok := msg["nvext"].(map[string]any); ok { + if td, ok := nv["token_data"]; ok { + if arr := toInt64Slice(td); len(arr) > 0 { + gotTD = arr + logger.V(logutil.DEFAULT).Info("token_data found in SSE nvext", "count", len(arr)) + } } } } } - return "", false, nil + return false, nil } for { line, err := reader.ReadString('\n') + // capture the raw stream as-is for debugging + rawBuf.WriteString(line) if err != nil { if err == io.EOF { - // Flush any pending event on EOF - if wid, done, _ := flushEvent(); wid != "" { - return wid, nil - } else if done { - return "", fmt.Errorf("worker_instance_id not found before DONE") + _, _ = flushEvent() + logger.V(logutil.DEFAULT).Info("SSE raw stream (EOF)", "raw", rawBuf.String()) + if gotWID != "" && len(gotTD) == 0 { + logger.V(logutil.DEFAULT).Info("EOF: worker_instance_id present, token_data missing") } - logger.V(logutil.DEFAULT).Error(nil, "EOF before worker_instance_id") - return "", fmt.Errorf("worker_instance_id not found in SSE stream (EOF)") + if gotWID != "" || len(gotTD) > 0 { + return gotWID, gotTD, nil + } + logger.V(logutil.DEFAULT).Error(nil, "EOF before selection fields present") + return "", nil, fmt.Errorf("selection not found in SSE stream (EOF)") } logger.V(logutil.DEFAULT).Error(err, "SSE read error") - return "", fmt.Errorf("sse read error: %w", err) + return "", nil, fmt.Errorf("sse read error: %w", err) } l := strings.TrimRight(line, "\r\n") if l == "" { - // End of current event; process it - if wid, done, _ := flushEvent(); wid != "" { - return wid, nil - } else if done { - return "", fmt.Errorf("worker_instance_id not found before DONE") + // End of current event. + if done, _ := flushEvent(); done { + if gotWID != "" && len(gotTD) == 0 { + logger.V(logutil.DEFAULT).Info("SSE DONE: worker_instance_id present, token_data missing") + } + return gotWID, gotTD, nil } eventName = "" // reset for next event continue @@ -317,3 +367,61 @@ func parseWorkerIDFromSSE(ctx context.Context, reader *bufio.Reader) (string, er } } } + +// encodeTokenData turns []int64 into base64(JSON array) for a safe header value. +func encodeTokenData(tokens []int64) string { + b, _ := json.Marshal(tokens) + return base64.StdEncoding.EncodeToString(b) +} + +// Accepts interface{} from a parsed JSON map +func toInt64Slice(v any) []int64 { + xs, ok := v.([]any) + if !ok { + return nil + } + out := make([]int64, 0, len(xs)) + for _, it := range xs { + switch n := it.(type) { + case float64: + out = append(out, int64(n)) + case int64: + out = append(out, n) + case json.Number: + if i, err := n.Int64(); err == nil { + out = append(out, i) + } + } + } + return out +} + +// Accepts raw JSON (string) for events like: +// event: worker_instance_id\n: \"8228244551594056720\"\n\n +// event: token_data\n: \"[151644,872,198,151644,872,198,14990,151645,198,151645,198,151644,77091,198]\ +// "\n\ndata: [DONE]\n\n" +// replaces the old toInt64SliceJSON +func toInt64SliceJSON(s string) []int64 { + // case 1: direct JSON array + var arr []int64 + if err := json.Unmarshal([]byte(s), &arr); err == nil && len(arr) > 0 { + return arr + } + // case 2: s is a JSON string that itself contains a JSON array + var inner string + if err := json.Unmarshal([]byte(s), &inner); err == nil && inner != "" { + var arr2 []int64 + if err := json.Unmarshal([]byte(inner), &arr2); err == nil && len(arr2) > 0 { + return arr2 + } + } + // case 3: strip quotes and try once more + unquoted := strings.Trim(s, "\"") + if unquoted != s { + var arr3 []int64 + if err := json.Unmarshal([]byte(unquoted), &arr3); err == nil && len(arr3) > 0 { + return arr3 + } + } + return nil +} From e34790efeb7d82e0f18d611a2f15cc47b3210978 Mon Sep 17 00:00:00 2001 From: Anna Tchernych Date: Fri, 22 Aug 2025 13:22:13 -0700 Subject: [PATCH 5/5] Cleanup and move the config file --- pkg/bbr/handlers/request.go | 3 --- .../plugins/dynamo_kv_scorer/epp-config-dynamo.yaml | 1 + pkg/epp/scheduling/plugins/dynamo_kv_scorer/plugin.go | 6 +++++- 3 files changed, 6 insertions(+), 4 deletions(-) rename epp-config-dynamo.yaml => pkg/epp/scheduling/plugins/dynamo_kv_scorer/epp-config-dynamo.yaml (85%) diff --git a/pkg/bbr/handlers/request.go b/pkg/bbr/handlers/request.go index 573276e5fc..1aa1b85268 100644 --- a/pkg/bbr/handlers/request.go +++ b/pkg/bbr/handlers/request.go @@ -245,14 +245,11 @@ func (s *Server) HandleRequestHeaders(headers *eppb.HttpHeaders) ([]*eppb.Proces switch k { case injectHintHeader, workerIDHeader: - // Prefer raw bytes if present; otherwise use value (Envoy can deliver either) if rv := h.GetRawValue(); len(rv) > 0 { s.workerIDHint = strings.TrimSpace(string(rv)) } else { s.workerIDHint = strings.TrimSpace(h.GetValue()) } - // NOTE: don't return; we still want to scan for tokenDataHeader - case tokenDataHeader: if rv := h.GetRawValue(); len(rv) > 0 { s.tokenDataHint = strings.TrimSpace(string(rv)) diff --git a/epp-config-dynamo.yaml b/pkg/epp/scheduling/plugins/dynamo_kv_scorer/epp-config-dynamo.yaml similarity index 85% rename from epp-config-dynamo.yaml rename to pkg/epp/scheduling/plugins/dynamo_kv_scorer/epp-config-dynamo.yaml index 959c2ea497..2d92be03b7 100644 --- a/epp-config-dynamo.yaml +++ b/pkg/epp/scheduling/plugins/dynamo_kv_scorer/epp-config-dynamo.yaml @@ -1,3 +1,4 @@ +# This is an example for configuring the EPP to use the dynamo token-aware kv router for scoring the pods apiVersion: inference.networking.x-k8s.io/v1alpha1 kind: EndpointPickerConfig plugins: diff --git a/pkg/epp/scheduling/plugins/dynamo_kv_scorer/plugin.go b/pkg/epp/scheduling/plugins/dynamo_kv_scorer/plugin.go index 7d60e07c37..50eb5f6907 100644 --- a/pkg/epp/scheduling/plugins/dynamo_kv_scorer/plugin.go +++ b/pkg/epp/scheduling/plugins/dynamo_kv_scorer/plugin.go @@ -203,7 +203,11 @@ func buildFrontEndBodyFromLLMRequest(req *schedtypes.LLMRequest) map[string]any // event: worker_instance_id // : "8303679623149182543" // data: [DONE] -// + +// or with tokens: +// event: worker_instance_id\n: \"8228244551594056720\"\n\n +// event: token_data\n: \"[151644,872,198,151644,872,198,14990,151645,198,151645,198,151644,77091,198]\ +// "\n\ndata: [DONE]\n\n" // Also supports JSON in data lines with either top-level worker_instance_id // or annotations.worker_instance_id. func parseSelectionFromSSE(ctx context.Context, reader *bufio.Reader) (string, []int64, error) {