Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 0 additions & 33 deletions Dockerfile

This file was deleted.

42 changes: 42 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,48 @@ verify-all:

##@ Build

##@ Dynamo EPP with FFI

# Build the Dynamo EPP image with CGO static library support
.PHONY: dynamo-image-local-build
dynamo-image-local-build: ## Build the Dynamo EPP image using Docker Buildx for local development.
BUILDER=$(shell $(DOCKER_BUILDX_CMD) create --use)
$(MAKE) dynamo-image-build PUSH=$(PUSH)
$(MAKE) dynamo-image-build LOAD=$(LOAD)
$(DOCKER_BUILDX_CMD) rm $$BUILDER

.PHONY: dynamo-image-local-push
dynamo-image-local-push: PUSH=--push ## Build the Dynamo EPP image for local development and push it to $IMAGE_REPO.
dynamo-image-local-push: dynamo-image-local-build

.PHONY: dynamo-image-local-load
dynamo-image-local-load: LOAD=--load ## Build the Dynamo EPP image for local development and load it in the local Docker registry.
dynamo-image-local-load: dynamo-image-local-build

.PHONY: dynamo-image-build
dynamo-image-build: ## Build the Dynamo EPP image using Docker Buildx with CGO support.
$(IMAGE_BUILD_CMD) -f Dockerfile.dynamo -t $(IMAGE_TAG) \
--platform=$(PLATFORMS) \
--build-arg BASE_IMAGE=ubuntu:24.04 \
--build-arg BUILDER_IMAGE=$(BUILDER_IMAGE) \
--build-arg COMMIT_SHA=${GIT_COMMIT_SHA} \
--build-arg BUILD_REF=${BUILD_REF} \
$(PUSH) \
$(LOAD) \
$(IMAGE_BUILD_EXTRA_OPTS) ./

.PHONY: dynamo-image-push
dynamo-image-push: PUSH=--push ## Build the Dynamo EPP image and push it to $IMAGE_REPO.
dynamo-image-push: dynamo-image-build

.PHONY: dynamo-image-load
dynamo-image-load: LOAD=--load ## Build the Dynamo EPP image and load it in the local Docker registry.
dynamo-image-load: dynamo-image-build

.PHONY: dynamo-image-kind
dynamo-image-kind: dynamo-image-build ## Build the Dynamo EPP image and load it to kind cluster $KIND_CLUSTER ("kind" by default).
kind load docker-image $(IMAGE_TAG) --name $(KIND_CLUSTER)

# Build the container image
.PHONY: image-local-build
image-local-build: ## Build the EPP image using Docker Buildx for local development.
Expand Down
8 changes: 8 additions & 0 deletions cmd/epp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ import (
ctrl "sigs.k8s.io/controller-runtime"

"sigs.k8s.io/gateway-api-inference-extension/cmd/epp/runner"
eppplugins "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"

// Dynamo plugins
dynprereq "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/requestcontrol/plugins/dynamo_inject_workerid"
dynscorer "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins/dynamo_kv_scorer"
)

func main() {
Expand All @@ -30,6 +35,9 @@ func main() {
// For adding out-of-tree plugins to the plugins registry, use the following:
// plugins.Register(my-out-of-tree-plugin-name, my-out-of-tree-plugin-factory-function)

eppplugins.Register("dynamo-inject-workerid", dynprereq.InjectWorkerIDPreRequestFactory)
eppplugins.Register("kv-aware-scorer", dynscorer.KVAwareScorerFactory)

if err := runner.NewRunner().Run(ctrl.SetupSignalHandler()); err != nil {
os.Exit(1)
}
Expand Down
12 changes: 10 additions & 2 deletions pkg/epp/requestcontrol/director.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ func (d *Director) HandleRequest(ctx context.Context, reqCtx *handlers.RequestCo
TargetModel: reqCtx.ResolvedTargetModel,
Prompt: prompt,
Headers: reqCtx.Request.Headers,
Annotations: map[string]any{},
}

logger = logger.WithValues("model", reqCtx.Model, "resolvedTargetModel", reqCtx.ResolvedTargetModel, "criticality", requestCriticality)
Expand Down Expand Up @@ -253,7 +254,7 @@ func (d *Director) prepareRequest(ctx context.Context, reqCtx *handlers.RequestC
reqCtx.TargetPod = targetPod
reqCtx.TargetEndpoint = endpoint

d.runPreRequestPlugins(ctx, reqCtx.SchedulingRequest, result, targetPort)
d.runPreRequestPlugins(ctx, reqCtx.SchedulingRequest, result, targetPort, reqCtx.Request.Body)

return reqCtx, nil
}
Expand Down Expand Up @@ -319,13 +320,20 @@ func RandomWeightedDraw(logger logr.Logger, model *v1alpha2.InferenceModel, seed
return ""
}

func (d *Director) runPreRequestPlugins(ctx context.Context, request *schedulingtypes.LLMRequest, schedulingResult *schedulingtypes.SchedulingResult,
func (d *Director) runPreRequestPlugins(
ctx context.Context,
request *schedulingtypes.LLMRequest,
schedulingResult *schedulingtypes.SchedulingResult,
targetPort int,
body map[string]any,
) {
for _, plugin := range d.preRequestPlugins {
log.FromContext(ctx).V(logutil.DEBUG).Info("Running pre-request plugin", "plugin", plugin.TypedName().Type)
before := time.Now()
plugin.PreRequest(ctx, request, schedulingResult, targetPort)
if mutator, ok := plugin.(RequestBodyMutator); ok && body != nil {
mutator.MutateRequestBody(ctx, request, schedulingResult, targetPort, body)
}
metrics.RecordRequestControlPluginProcessingLatency(PreRequestPluginType, plugin.TypedName().Type, time.Since(before))
}
}
Expand Down
119 changes: 119 additions & 0 deletions pkg/epp/requestcontrol/plugins/dynamo_inject_workerid/plugin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package dynamo_inject_workerid

import (
"context"
"encoding/json"
"strconv"
"strings"

"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
rc "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/requestcontrol"
schedtypes "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
)

const (
typeString = "dynamo-inject-workerid"
pluginName = "dynamo-inject-workerid"
WorkerIDHeader = "x-worker-instance-id"
tokenDataAnnotationKey = "dynamo/token-data"
)

var _ plugins.Plugin = (*InjectWorkerIDPreRequest)(nil)
var _ rc.PreRequest = (*InjectWorkerIDPreRequest)(nil)
var _ rc.RequestBodyMutator = (*InjectWorkerIDPreRequest)(nil)

type InjectWorkerIDPreRequest struct {
typedName plugins.TypedName
}

func NewInjectWorkerIDPreRequest() *InjectWorkerIDPreRequest {
return &InjectWorkerIDPreRequest{
typedName: plugins.TypedName{Type: typeString, Name: pluginName},
}
}

func (p *InjectWorkerIDPreRequest) WithName(name string) *InjectWorkerIDPreRequest {
p.typedName.Name = name
return p
}

func InjectWorkerIDPreRequestFactory(name string, _ json.RawMessage, _ plugins.Handle) (plugins.Plugin, error) {
return NewInjectWorkerIDPreRequest().WithName(name), nil
}

func (p *InjectWorkerIDPreRequest) TypedName() plugins.TypedName { return p.typedName }

func (p *InjectWorkerIDPreRequest) PreRequest(
_ context.Context,
req *schedtypes.LLMRequest,
_ *schedtypes.SchedulingResult,
_ int,
) {
if req == nil {
return
}
if req.Headers == nil {
req.Headers = map[string]string{}
}
wid := strings.TrimSpace(req.Headers[WorkerIDHeader])
if wid == "" {
return
}
req.Headers[WorkerIDHeader] = wid
}

func (p *InjectWorkerIDPreRequest) MutateRequestBody(
_ context.Context,
req *schedtypes.LLMRequest,
_ *schedtypes.SchedulingResult,
_ int,
body map[string]any,
) {
if req == nil || body == nil {
return
}
if req.Headers == nil {
return
}

wid := strings.TrimSpace(req.Headers[WorkerIDHeader])
if wid == "" {
return
}

nvext, _ := body["nvext"].(map[string]any)
if nvext == nil {
nvext = map[string]any{}
body["nvext"] = nvext
}
if widUint, err := strconv.ParseUint(wid, 10, 64); err == nil {
nvext["backend_instance_id"] = widUint
}

if tokens, ok := req.Annotations[tokenDataAnnotationKey]; ok {
switch v := tokens.(type) {
case []int64:
if len(v) > 0 {
nvext["token_data"] = v
}
case []any:
var out []int64
for _, elem := range v {
switch t := elem.(type) {
case int64:
out = append(out, t)
case float64:
out = append(out, int64(t))
}
}
if len(out) > 0 {
nvext["token_data"] = out
}
case json.RawMessage:
var out []int64
if err := json.Unmarshal(v, &out); err == nil && len(out) > 0 {
nvext["token_data"] = out
}
}
}
}
21 changes: 21 additions & 0 deletions pkg/epp/scheduling/plugins/dynamo_kv_scorer/epp-config-dynamo.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# This is an example for configuring the EPP to use the dynamo token-aware kv router for scoring the pods
apiVersion: inference.networking.x-k8s.io/v1alpha1
kind: EndpointPickerConfig
plugins:
# Required: tells EPP which profile to use (even if you only have one)
- type: single-profile-handler

# Picker: chooses the final endpoint after scoring
- name: picker
type: max-score-picker
- name: dyn-pre
type: dynamo-inject-workerid
parameters: {}
- name: dyn-kv
type: kv-aware-scorer
schedulingProfiles:
- name: default
plugins:
- pluginRef: dyn-kv
weight: 1
- pluginRef: picker
Loading