Skip to content
Open
42 changes: 42 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,48 @@ verify-all:

##@ Build

##@ Dynamo EPP with FFI

# Build the Dynamo EPP image with CGO static library support
.PHONY: dynamo-image-local-build
dynamo-image-local-build: ## Build the Dynamo EPP image using Docker Buildx for local development.
BUILDER=$(shell $(DOCKER_BUILDX_CMD) create --use)
$(MAKE) dynamo-image-build PUSH=$(PUSH)
$(MAKE) dynamo-image-build LOAD=$(LOAD)
$(DOCKER_BUILDX_CMD) rm $$BUILDER

.PHONY: dynamo-image-local-push
dynamo-image-local-push: PUSH=--push ## Build the Dynamo EPP image for local development and push it to $IMAGE_REPO.
dynamo-image-local-push: dynamo-image-local-build

.PHONY: dynamo-image-local-load
dynamo-image-local-load: LOAD=--load ## Build the Dynamo EPP image for local development and load it in the local Docker registry.
dynamo-image-local-load: dynamo-image-local-build

.PHONY: dynamo-image-build
dynamo-image-build: ## Build the Dynamo EPP image using Docker Buildx with CGO support.
$(IMAGE_BUILD_CMD) -f Dockerfile.dynamo -t $(IMAGE_TAG) \
--platform=$(PLATFORMS) \
--build-arg BASE_IMAGE=ubuntu:24.04 \
--build-arg BUILDER_IMAGE=$(BUILDER_IMAGE) \
--build-arg COMMIT_SHA=${GIT_COMMIT_SHA} \
--build-arg BUILD_REF=${BUILD_REF} \
$(PUSH) \
$(LOAD) \
$(IMAGE_BUILD_EXTRA_OPTS) ./

.PHONY: dynamo-image-push
dynamo-image-push: PUSH=--push ## Build the Dynamo EPP image and push it to $IMAGE_REPO.
dynamo-image-push: dynamo-image-build

.PHONY: dynamo-image-load
dynamo-image-load: LOAD=--load ## Build the Dynamo EPP image and load it in the local Docker registry.
dynamo-image-load: dynamo-image-build

.PHONY: dynamo-image-kind
dynamo-image-kind: dynamo-image-build ## Build the Dynamo EPP image and load it to kind cluster $KIND_CLUSTER ("kind" by default).
kind load docker-image $(IMAGE_TAG) --name $(KIND_CLUSTER)

# Build the container image
.PHONY: image-local-build
image-local-build: ## Build the EPP image using Docker Buildx for local development.
Expand Down
8 changes: 8 additions & 0 deletions cmd/epp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ import (
ctrl "sigs.k8s.io/controller-runtime"

"sigs.k8s.io/gateway-api-inference-extension/cmd/epp/runner"
eppplugins "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"

// Dynamo plugins
dynprereq "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/requestcontrol/plugins/dynamo_inject_workerid"
dynscorer "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins/dynamo_kv_scorer"
)

func main() {
Expand All @@ -30,6 +35,9 @@ func main() {
// For adding out-of-tree plugins to the plugins registry, use the following:
// plugins.Register(my-out-of-tree-plugin-name, my-out-of-tree-plugin-factory-function)

eppplugins.Register("dynamo-inject-workerid", dynprereq.InjectWorkerIDPreRequestFactory)
eppplugins.Register("kv-aware-scorer", dynscorer.KVAwareScorerFactory)

if err := runner.NewRunner().Run(ctrl.SetupSignalHandler()); err != nil {
os.Exit(1)
}
Expand Down
132 changes: 126 additions & 6 deletions pkg/bbr/handlers/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ package handlers

import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"strings"

basepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
eppb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
Expand All @@ -31,11 +33,49 @@ import (

const modelHeader = "X-Gateway-Model-Name"

// Dynamo-related
const (
workerIDHeader = "x-worker-instance-id"
injectHintHeader = "x-epp-inject-nvext-worker-instance-id"
tokenDataHeader = "x-epp-inject-nvext-token-data"
)

// HandleRequestBody handles request bodies.
func (s *Server) HandleRequestBody(ctx context.Context, data map[string]any) ([]*eppb.ProcessingResponse, error) {
logger := log.FromContext(ctx)
var ret []*eppb.ProcessingResponse

// If we captured a worker id hint in the headers phase, inject it into body JSON:
// nvext.backend_instance_id = <workerID>
if wid := strings.TrimSpace(s.workerIDHint); wid != "" {
// ensure nvext is a map[string]any
if nv, ok := data["nvext"]; !ok || nv == nil {
data["nvext"] = map[string]any{"backend_instance_id": wid}
} else if m, ok := nv.(map[string]any); ok {
m["backend_instance_id"] = wid
} else {
// if nvext was some other type, replace with a clean map
data["nvext"] = map[string]any{"backend_instance_id": wid}
}
}

// If we captured token_data in headers, decode and inject as nvext.token_data
if td := strings.TrimSpace(s.tokenDataHint); td != "" {
// header value is base64(JSON array)
if raw, err := base64.StdEncoding.DecodeString(td); err == nil {
var arr []int64
if err := json.Unmarshal(raw, &arr); err == nil && len(arr) > 0 {
// ensure nvext map exists
nv, ok := data["nvext"].(map[string]any)
if !ok || nv == nil {
nv = map[string]any{}
data["nvext"] = nv
}
nv["token_data"] = arr
}
}
}

requestBodyBytes, err := json.Marshal(data)
if err != nil {
return nil, err
Expand All @@ -46,21 +86,32 @@ func (s *Server) HandleRequestBody(ctx context.Context, data map[string]any) ([]
metrics.RecordModelNotInBodyCounter()
logger.V(logutil.DEFAULT).Info("Request body does not contain model parameter")
if s.streaming {
// still stream the possibly mutated body
ret = append(ret, &eppb.ProcessingResponse{
Response: &eppb.ProcessingResponse_RequestHeaders{
RequestHeaders: &eppb.HeadersResponse{},
},
})
ret = addStreamedBodyResponse(ret, requestBodyBytes)
return ret, nil
} else {
ret = append(ret, &eppb.ProcessingResponse{
}

// non-streaming: return a body response with the (possibly) mutated body
return []*eppb.ProcessingResponse{
{
Response: &eppb.ProcessingResponse_RequestBody{
RequestBody: &eppb.BodyResponse{},
RequestBody: &eppb.BodyResponse{
Response: &eppb.CommonResponse{
BodyMutation: &eppb.BodyMutation{
Mutation: &eppb.BodyMutation_Body{
Body: requestBodyBytes,
},
},
},
},
},
})
}
return ret, nil
},
}, nil
}

modelStr, ok := modelVal.(string)
Expand All @@ -73,6 +124,7 @@ func (s *Server) HandleRequestBody(ctx context.Context, data map[string]any) ([]
metrics.RecordSuccessCounter()

if s.streaming {
// set the model header, then stream the (possibly) mutated body
ret = append(ret, &eppb.ProcessingResponse{
Response: &eppb.ProcessingResponse_RequestHeaders{
RequestHeaders: &eppb.HeadersResponse{
Expand All @@ -86,16 +138,42 @@ func (s *Server) HandleRequestBody(ctx context.Context, data map[string]any) ([]
RawValue: []byte(modelStr),
},
},
// also keep the worker id header if we have one
func() *basepb.HeaderValueOption {
if strings.TrimSpace(s.workerIDHint) == "" {
return nil
}
return &basepb.HeaderValueOption{
Header: &basepb.HeaderValue{
Key: workerIDHeader,
RawValue: []byte(s.workerIDHint),
},
}
}(),
},
},
},
},
},
})

// prune nil entries if worker id not present
hm := ret[len(ret)-1].GetRequestHeaders().GetResponse().GetHeaderMutation()
if hm != nil && hm.SetHeaders != nil {
out := hm.SetHeaders[:0]
for _, h := range hm.SetHeaders {
if h != nil {
out = append(out, h)
}
}
hm.SetHeaders = out
}

ret = addStreamedBodyResponse(ret, requestBodyBytes)
return ret, nil
}

// Non-streaming: set model header and replace the body with our mutated JSON
return []*eppb.ProcessingResponse{
{
Response: &eppb.ProcessingResponse_RequestBody{
Expand All @@ -111,6 +189,22 @@ func (s *Server) HandleRequestBody(ctx context.Context, data map[string]any) ([]
RawValue: []byte(modelStr),
},
},
func() *basepb.HeaderValueOption {
if strings.TrimSpace(s.workerIDHint) == "" {
return nil
}
return &basepb.HeaderValueOption{
Header: &basepb.HeaderValue{
Key: workerIDHeader,
RawValue: []byte(s.workerIDHint),
},
}
}(),
},
},
BodyMutation: &eppb.BodyMutation{
Mutation: &eppb.BodyMutation_Body{
Body: requestBodyBytes,
},
},
},
Expand Down Expand Up @@ -141,6 +235,32 @@ func addStreamedBodyResponse(responses []*eppb.ProcessingResponse, requestBodyBy

// HandleRequestHeaders handles request headers.
func (s *Server) HandleRequestHeaders(headers *eppb.HttpHeaders) ([]*eppb.ProcessingResponse, error) {
// reset per-request
s.workerIDHint = ""
s.tokenDataHint = ""

if m := headers.GetHeaders(); m != nil {
for _, h := range m.GetHeaders() {
k := strings.ToLower(h.GetKey())

switch k {
case injectHintHeader, workerIDHeader:
if rv := h.GetRawValue(); len(rv) > 0 {
s.workerIDHint = strings.TrimSpace(string(rv))
} else {
s.workerIDHint = strings.TrimSpace(h.GetValue())
}
case tokenDataHeader:
if rv := h.GetRawValue(); len(rv) > 0 {
s.tokenDataHint = strings.TrimSpace(string(rv))
} else {
s.tokenDataHint = strings.TrimSpace(h.GetValue())
}
}
}
}

// No header mutations needed here; body phase will do the JSON injection.
return []*eppb.ProcessingResponse{
{
Response: &eppb.ProcessingResponse_RequestHeaders{
Expand Down
4 changes: 3 additions & 1 deletion pkg/bbr/handlers/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ func NewServer(streaming bool) *Server {
// Server implements the Envoy external processing server.
// https://www.envoyproxy.io/docs/envoy/latest/api-v3/service/ext_proc/v3/external_processor.proto
type Server struct {
streaming bool
streaming bool
workerIDHint string
tokenDataHint string
}

func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
Expand Down
69 changes: 69 additions & 0 deletions pkg/epp/requestcontrol/plugins/dynamo_inject_workerid/plugin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package dynamo_inject_workerid

import (
"context"
"encoding/json"
"strings"

"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
rc "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/requestcontrol"
schedtypes "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
)

const (
typeString = "dynamo-inject-workerid"
pluginName = "dynamo-inject-workerid"
WorkerIDHeader = "x-worker-instance-id"
injectHintHeader = "x-epp-inject-nvext-worker-instance-id"
TokenDataHeader = "x-epp-inject-nvext-token-data"
)

var _ plugins.Plugin = (*InjectWorkerIDPreRequest)(nil)
var _ rc.PreRequest = (*InjectWorkerIDPreRequest)(nil)

type InjectWorkerIDPreRequest struct {
typedName plugins.TypedName
}

func NewInjectWorkerIDPreRequest() *InjectWorkerIDPreRequest {
return &InjectWorkerIDPreRequest{
typedName: plugins.TypedName{Type: typeString, Name: pluginName},
}
}

func (p *InjectWorkerIDPreRequest) WithName(name string) *InjectWorkerIDPreRequest {
p.typedName.Name = name
return p
}

func InjectWorkerIDPreRequestFactory(name string, _ json.RawMessage, _ plugins.Handle) (plugins.Plugin, error) {
return NewInjectWorkerIDPreRequest().WithName(name), nil
}

func (p *InjectWorkerIDPreRequest) TypedName() plugins.TypedName { return p.typedName }

func (p *InjectWorkerIDPreRequest) PreRequest(
_ context.Context,
req *schedtypes.LLMRequest,
_ *schedtypes.SchedulingResult,
_ int,
) {
if req == nil {
return
}
if req.Headers == nil {
req.Headers = map[string]string{}
}
wid := strings.TrimSpace(req.Headers[WorkerIDHeader])
if wid == "" {
return
}
req.Headers[WorkerIDHeader] = wid
req.Headers[injectHintHeader] = wid

// Pass through token-data header if scorer set it
if td := strings.TrimSpace(req.Headers[TokenDataHeader]); td != "" {
req.Headers[TokenDataHeader] = td
}

}
21 changes: 21 additions & 0 deletions pkg/epp/scheduling/plugins/dynamo_kv_scorer/epp-config-dynamo.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# This is an example for configuring the EPP to use the dynamo token-aware kv router for scoring the pods
apiVersion: inference.networking.x-k8s.io/v1alpha1
kind: EndpointPickerConfig
plugins:
# Required: tells EPP which profile to use (even if you only have one)
- type: single-profile-handler

# Picker: chooses the final endpoint after scoring
- name: picker
type: max-score-picker
- name: dyn-pre
type: dynamo-inject-workerid
parameters: {}
- name: dyn-kv
type: kv-aware-scorer
schedulingProfiles:
- name: default
plugins:
- pluginRef: dyn-kv
weight: 1
- pluginRef: picker
Loading