Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions cmd/epp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ import (
ctrl "sigs.k8s.io/controller-runtime"

"sigs.k8s.io/gateway-api-inference-extension/cmd/epp/runner"
eppplugins "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"

// Dynamo plugins
dynprereq "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/requestcontrol/plugins/dynamo_inject_workerid"
dynscorer "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins/dynamo_kv_scorer"
)

func main() {
Expand All @@ -30,6 +35,9 @@ func main() {
// For adding out-of-tree plugins to the plugins registry, use the following:
// plugins.Register(my-out-of-tree-plugin-name, my-out-of-tree-plugin-factory-function)

eppplugins.Register("dynamo-inject-workerid", dynprereq.InjectWorkerIDPreRequestFactory)
eppplugins.Register("kv-aware-scorer", dynscorer.KVAwareScorerFactory)

if err := runner.NewRunner().Run(ctrl.SetupSignalHandler()); err != nil {
os.Exit(1)
}
Expand Down
132 changes: 126 additions & 6 deletions pkg/bbr/handlers/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ package handlers

import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"strings"

basepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
eppb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
Expand All @@ -31,11 +33,49 @@ import (

const modelHeader = "X-Gateway-Model-Name"

// Dynamo-related
const (
workerIDHeader = "x-worker-instance-id"
injectHintHeader = "x-epp-inject-nvext-worker-instance-id"
tokenDataHeader = "x-epp-inject-nvext-token-data"
)

// HandleRequestBody handles request bodies.
func (s *Server) HandleRequestBody(ctx context.Context, data map[string]any) ([]*eppb.ProcessingResponse, error) {
logger := log.FromContext(ctx)
var ret []*eppb.ProcessingResponse

// If we captured a worker id hint in the headers phase, inject it into body JSON:
// nvext.backend_instance_id = <workerID>
if wid := strings.TrimSpace(s.workerIDHint); wid != "" {
// ensure nvext is a map[string]any
if nv, ok := data["nvext"]; !ok || nv == nil {
data["nvext"] = map[string]any{"backend_instance_id": wid}
} else if m, ok := nv.(map[string]any); ok {
m["backend_instance_id"] = wid
} else {
// if nvext was some other type, replace with a clean map
data["nvext"] = map[string]any{"backend_instance_id": wid}
}
}

// If we captured token_data in headers, decode and inject as nvext.token_data
if td := strings.TrimSpace(s.tokenDataHint); td != "" {
// header value is base64(JSON array)
if raw, err := base64.StdEncoding.DecodeString(td); err == nil {
var arr []int64
if err := json.Unmarshal(raw, &arr); err == nil && len(arr) > 0 {
// ensure nvext map exists
nv, ok := data["nvext"].(map[string]any)
if !ok || nv == nil {
nv = map[string]any{}
data["nvext"] = nv
}
nv["token_data"] = arr
}
}
}

requestBodyBytes, err := json.Marshal(data)
if err != nil {
return nil, err
Expand All @@ -46,21 +86,32 @@ func (s *Server) HandleRequestBody(ctx context.Context, data map[string]any) ([]
metrics.RecordModelNotInBodyCounter()
logger.V(logutil.DEFAULT).Info("Request body does not contain model parameter")
if s.streaming {
// still stream the possibly mutated body
ret = append(ret, &eppb.ProcessingResponse{
Response: &eppb.ProcessingResponse_RequestHeaders{
RequestHeaders: &eppb.HeadersResponse{},
},
})
ret = addStreamedBodyResponse(ret, requestBodyBytes)
return ret, nil
} else {
ret = append(ret, &eppb.ProcessingResponse{
}

// non-streaming: return a body response with the (possibly) mutated body
return []*eppb.ProcessingResponse{
{
Response: &eppb.ProcessingResponse_RequestBody{
RequestBody: &eppb.BodyResponse{},
RequestBody: &eppb.BodyResponse{
Response: &eppb.CommonResponse{
BodyMutation: &eppb.BodyMutation{
Mutation: &eppb.BodyMutation_Body{
Body: requestBodyBytes,
},
},
},
},
},
})
}
return ret, nil
},
}, nil
}

modelStr, ok := modelVal.(string)
Expand All @@ -73,6 +124,7 @@ func (s *Server) HandleRequestBody(ctx context.Context, data map[string]any) ([]
metrics.RecordSuccessCounter()

if s.streaming {
// set the model header, then stream the (possibly) mutated body
ret = append(ret, &eppb.ProcessingResponse{
Response: &eppb.ProcessingResponse_RequestHeaders{
RequestHeaders: &eppb.HeadersResponse{
Expand All @@ -86,16 +138,42 @@ func (s *Server) HandleRequestBody(ctx context.Context, data map[string]any) ([]
RawValue: []byte(modelStr),
},
},
// also keep the worker id header if we have one
func() *basepb.HeaderValueOption {
if strings.TrimSpace(s.workerIDHint) == "" {
return nil
}
return &basepb.HeaderValueOption{
Header: &basepb.HeaderValue{
Key: workerIDHeader,
RawValue: []byte(s.workerIDHint),
},
}
}(),
},
},
},
},
},
})

// prune nil entries if worker id not present
hm := ret[len(ret)-1].GetRequestHeaders().GetResponse().GetHeaderMutation()
if hm != nil && hm.SetHeaders != nil {
out := hm.SetHeaders[:0]
for _, h := range hm.SetHeaders {
if h != nil {
out = append(out, h)
}
}
hm.SetHeaders = out
}

ret = addStreamedBodyResponse(ret, requestBodyBytes)
return ret, nil
}

// Non-streaming: set model header and replace the body with our mutated JSON
return []*eppb.ProcessingResponse{
{
Response: &eppb.ProcessingResponse_RequestBody{
Expand All @@ -111,6 +189,22 @@ func (s *Server) HandleRequestBody(ctx context.Context, data map[string]any) ([]
RawValue: []byte(modelStr),
},
},
func() *basepb.HeaderValueOption {
if strings.TrimSpace(s.workerIDHint) == "" {
return nil
}
return &basepb.HeaderValueOption{
Header: &basepb.HeaderValue{
Key: workerIDHeader,
RawValue: []byte(s.workerIDHint),
},
}
}(),
},
},
BodyMutation: &eppb.BodyMutation{
Mutation: &eppb.BodyMutation_Body{
Body: requestBodyBytes,
},
},
},
Expand Down Expand Up @@ -141,6 +235,32 @@ func addStreamedBodyResponse(responses []*eppb.ProcessingResponse, requestBodyBy

// HandleRequestHeaders handles request headers.
func (s *Server) HandleRequestHeaders(headers *eppb.HttpHeaders) ([]*eppb.ProcessingResponse, error) {
// reset per-request
s.workerIDHint = ""
s.tokenDataHint = ""

if m := headers.GetHeaders(); m != nil {
for _, h := range m.GetHeaders() {
k := strings.ToLower(h.GetKey())

switch k {
case injectHintHeader, workerIDHeader:
if rv := h.GetRawValue(); len(rv) > 0 {
s.workerIDHint = strings.TrimSpace(string(rv))
} else {
s.workerIDHint = strings.TrimSpace(h.GetValue())
}
case tokenDataHeader:
if rv := h.GetRawValue(); len(rv) > 0 {
s.tokenDataHint = strings.TrimSpace(string(rv))
} else {
s.tokenDataHint = strings.TrimSpace(h.GetValue())
}
}
}
}

// No header mutations needed here; body phase will do the JSON injection.
return []*eppb.ProcessingResponse{
{
Response: &eppb.ProcessingResponse_RequestHeaders{
Expand Down
4 changes: 3 additions & 1 deletion pkg/bbr/handlers/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ func NewServer(streaming bool) *Server {
// Server implements the Envoy external processing server.
// https://www.envoyproxy.io/docs/envoy/latest/api-v3/service/ext_proc/v3/external_processor.proto
type Server struct {
streaming bool
streaming bool
workerIDHint string
tokenDataHint string
}

func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
Expand Down
69 changes: 69 additions & 0 deletions pkg/epp/requestcontrol/plugins/dynamo_inject_workerid/plugin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package dynamo_inject_workerid

import (
"context"
"encoding/json"
"strings"

"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
rc "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/requestcontrol"
schedtypes "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
)

const (
typeString = "dynamo-inject-workerid"
pluginName = "dynamo-inject-workerid"
WorkerIDHeader = "x-worker-instance-id"
injectHintHeader = "x-epp-inject-nvext-worker-instance-id"
TokenDataHeader = "x-epp-inject-nvext-token-data"
)

var _ plugins.Plugin = (*InjectWorkerIDPreRequest)(nil)
var _ rc.PreRequest = (*InjectWorkerIDPreRequest)(nil)

type InjectWorkerIDPreRequest struct {
typedName plugins.TypedName
}

func NewInjectWorkerIDPreRequest() *InjectWorkerIDPreRequest {
return &InjectWorkerIDPreRequest{
typedName: plugins.TypedName{Type: typeString, Name: pluginName},
}
}

func (p *InjectWorkerIDPreRequest) WithName(name string) *InjectWorkerIDPreRequest {
p.typedName.Name = name
return p
}

func InjectWorkerIDPreRequestFactory(name string, _ json.RawMessage, _ plugins.Handle) (plugins.Plugin, error) {
return NewInjectWorkerIDPreRequest().WithName(name), nil
}

func (p *InjectWorkerIDPreRequest) TypedName() plugins.TypedName { return p.typedName }

func (p *InjectWorkerIDPreRequest) PreRequest(
_ context.Context,
req *schedtypes.LLMRequest,
_ *schedtypes.SchedulingResult,
_ int,
) {
if req == nil {
return
}
if req.Headers == nil {
req.Headers = map[string]string{}
}
wid := strings.TrimSpace(req.Headers[WorkerIDHeader])
if wid == "" {
return
}
req.Headers[WorkerIDHeader] = wid
req.Headers[injectHintHeader] = wid

// Pass through token-data header if scorer set it
if td := strings.TrimSpace(req.Headers[TokenDataHeader]); td != "" {
req.Headers[TokenDataHeader] = td
}

}
24 changes: 24 additions & 0 deletions pkg/epp/scheduling/plugins/dynamo_kv_scorer/epp-config-dynamo.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# This is an example for configuring the EPP to use the dynamo token-aware kv router for scoring the pods
apiVersion: inference.networking.x-k8s.io/v1alpha1
kind: EndpointPickerConfig
plugins:
# Required: tells EPP which profile to use (even if you only have one)
- type: single-profile-handler

# Picker: chooses the final endpoint after scoring
- name: picker
type: max-score-picker
- name: dyn-pre
type: dynamo-inject-workerid
parameters: {}
- name: dyn-kv
type: kv-aware-scorer
parameters:
frontendURL: http://127.0.0.1:8000/v1/chat/completions
timeoutMS: 10000
schedulingProfiles:
- name: default
plugins:
- pluginRef: dyn-kv
weight: 1
- pluginRef: picker
Loading