Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
084f24b
Call FrontEnd for the worker id
atchernych Aug 22, 2025
c2f8a40
fix comments and 1 const
atchernych Aug 22, 2025
ab5b5fd
cleanup comments
atchernych Aug 22, 2025
5135984
Add managing token_data
atchernych Aug 22, 2025
e34790e
Cleanup and move the config file
atchernych Aug 22, 2025
39d8b0f
Integration with the router bindings
atchernych Sep 10, 2025
1be87f6
clean
atchernych Sep 24, 2025
e701ef0
rm FrontEnd-related
atchernych Sep 29, 2025
a5b1af3
cleanup
atchernych Sep 29, 2025
7a674c8
add env vars
atchernych Sep 30, 2025
f5ad6af
rm Dynamo.epp image build
atchernych Oct 10, 2025
e34979c
router_sync=True by default
atchernych Oct 15, 2025
689fed3
add print to pkg/epp/requestcontrol/director.go
atchernych Oct 29, 2025
b1e4d91
upd ubuntu version
atchernych Nov 15, 2025
f2fa578
restore director.go
atchernych Nov 15, 2025
05fb204
MOVE plugin response to the body from headers
atchernych Nov 18, 2025
9e08188
rm unneded comments
atchernych Nov 18, 2025
2997112
rm unneded stuff
atchernych Nov 18, 2025
1e18b01
rm comment
atchernych Nov 18, 2025
97c0114
pas tokens in body
atchernych Nov 18, 2025
4b8dc58
fix worker_id_type
atchernych Dec 8, 2025
d354415
add disagg
atchernych Dec 13, 2025
ba79e0a
account for disag
atchernych Dec 18, 2025
4ebd7da
update flag DYNAMO_ENFORCE_DISAGG=false
atchernych Dec 19, 2025
0068439
Add dynamo_router_free_request and add_request
atchernych Dec 20, 2025
37f5465
add cleanup plugin
atchernych Dec 20, 2025
f280700
forgot to add pkg/epp/requestcontrol/body_mutator
atchernych Dec 20, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 0 additions & 33 deletions Dockerfile

This file was deleted.

42 changes: 42 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,48 @@ verify-all:

##@ Build

##@ Dynamo EPP with FFI

# Build the Dynamo EPP image with CGO static library support
.PHONY: dynamo-image-local-build
dynamo-image-local-build: ## Build the Dynamo EPP image using Docker Buildx for local development.
BUILDER=$(shell $(DOCKER_BUILDX_CMD) create --use)
$(MAKE) dynamo-image-build PUSH=$(PUSH)
$(MAKE) dynamo-image-build LOAD=$(LOAD)
$(DOCKER_BUILDX_CMD) rm $$BUILDER

.PHONY: dynamo-image-local-push
dynamo-image-local-push: PUSH=--push ## Build the Dynamo EPP image for local development and push it to $IMAGE_REPO.
dynamo-image-local-push: dynamo-image-local-build

.PHONY: dynamo-image-local-load
dynamo-image-local-load: LOAD=--load ## Build the Dynamo EPP image for local development and load it in the local Docker registry.
dynamo-image-local-load: dynamo-image-local-build

.PHONY: dynamo-image-build
dynamo-image-build: ## Build the Dynamo EPP image using Docker Buildx with CGO support.
$(IMAGE_BUILD_CMD) -f Dockerfile.dynamo -t $(IMAGE_TAG) \
--platform=$(PLATFORMS) \
--build-arg BASE_IMAGE=ubuntu:24.04 \
--build-arg BUILDER_IMAGE=$(BUILDER_IMAGE) \
--build-arg COMMIT_SHA=${GIT_COMMIT_SHA} \
--build-arg BUILD_REF=${BUILD_REF} \
$(PUSH) \
$(LOAD) \
$(IMAGE_BUILD_EXTRA_OPTS) ./

.PHONY: dynamo-image-push
dynamo-image-push: PUSH=--push ## Build the Dynamo EPP image and push it to $IMAGE_REPO.
dynamo-image-push: dynamo-image-build

.PHONY: dynamo-image-load
dynamo-image-load: LOAD=--load ## Build the Dynamo EPP image and load it in the local Docker registry.
dynamo-image-load: dynamo-image-build

.PHONY: dynamo-image-kind
dynamo-image-kind: dynamo-image-build ## Build the Dynamo EPP image and load it to kind cluster $KIND_CLUSTER ("kind" by default).
kind load docker-image $(IMAGE_TAG) --name $(KIND_CLUSTER)

# Build the container image
.PHONY: image-local-build
image-local-build: ## Build the EPP image using Docker Buildx for local development.
Expand Down
10 changes: 10 additions & 0 deletions cmd/epp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@ import (
ctrl "sigs.k8s.io/controller-runtime"

"sigs.k8s.io/gateway-api-inference-extension/cmd/epp/runner"
eppplugins "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"

// Dynamo plugins
dyncleanup "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/requestcontrol/plugins/dynamo_cleanup"
dynprereq "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/requestcontrol/plugins/dynamo_inject_workerid"
dynscorer "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins/dynamo_kv_scorer"
)

func main() {
Expand All @@ -30,6 +36,10 @@ func main() {
// For adding out-of-tree plugins to the plugins registry, use the following:
// plugins.Register(my-out-of-tree-plugin-name, my-out-of-tree-plugin-factory-function)

eppplugins.Register("dynamo-inject-workerid", dynprereq.InjectWorkerIDPreRequestFactory)
eppplugins.Register("kv-aware-scorer", dynscorer.KVAwareScorerFactory)
eppplugins.Register("dynamo-cleanup", dyncleanup.DynamoCleanupPluginFactory)

if err := runner.NewRunner().Run(ctrl.SetupSignalHandler()); err != nil {
os.Exit(1)
}
Expand Down
19 changes: 19 additions & 0 deletions pkg/epp/requestcontrol/body_mutator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package requestcontrol

import (
"context"

schedtypes "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
)

// RequestBodyMutator allows pre-request plugins to mutate the outbound request body.
// Implementations are invoked after the standard PreRequest hook completes.
type RequestBodyMutator interface {
MutateRequestBody(
ctx context.Context,
request *schedtypes.LLMRequest,
schedulingResult *schedtypes.SchedulingResult,
targetPort int,
body map[string]any,
)
}
12 changes: 10 additions & 2 deletions pkg/epp/requestcontrol/director.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ func (d *Director) HandleRequest(ctx context.Context, reqCtx *handlers.RequestCo
TargetModel: reqCtx.ResolvedTargetModel,
Prompt: prompt,
Headers: reqCtx.Request.Headers,
Annotations: map[string]any{},
}

logger = logger.WithValues("model", reqCtx.Model, "resolvedTargetModel", reqCtx.ResolvedTargetModel, "criticality", requestCriticality)
Expand Down Expand Up @@ -253,7 +254,7 @@ func (d *Director) prepareRequest(ctx context.Context, reqCtx *handlers.RequestC
reqCtx.TargetPod = targetPod
reqCtx.TargetEndpoint = endpoint

d.runPreRequestPlugins(ctx, reqCtx.SchedulingRequest, result, targetPort)
d.runPreRequestPlugins(ctx, reqCtx.SchedulingRequest, result, targetPort, reqCtx.Request.Body)

return reqCtx, nil
}
Expand Down Expand Up @@ -319,13 +320,20 @@ func RandomWeightedDraw(logger logr.Logger, model *v1alpha2.InferenceModel, seed
return ""
}

func (d *Director) runPreRequestPlugins(ctx context.Context, request *schedulingtypes.LLMRequest, schedulingResult *schedulingtypes.SchedulingResult,
func (d *Director) runPreRequestPlugins(
ctx context.Context,
request *schedulingtypes.LLMRequest,
schedulingResult *schedulingtypes.SchedulingResult,
targetPort int,
body map[string]any,
) {
for _, plugin := range d.preRequestPlugins {
log.FromContext(ctx).V(logutil.DEBUG).Info("Running pre-request plugin", "plugin", plugin.TypedName().Type)
before := time.Now()
plugin.PreRequest(ctx, request, schedulingResult, targetPort)
if mutator, ok := plugin.(RequestBodyMutator); ok && body != nil {
mutator.MutateRequestBody(ctx, request, schedulingResult, targetPort, body)
}
metrics.RecordRequestControlPluginProcessingLatency(PreRequestPluginType, plugin.TypedName().Type, time.Since(before))
}
}
Expand Down
86 changes: 86 additions & 0 deletions pkg/epp/requestcontrol/plugins/dynamo_cleanup/plugin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package dynamo_cleanup

import (
"context"
"encoding/json"

log "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
rc "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/requestcontrol"
schedtypes "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"

dynamo "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins/dynamo_kv_scorer"
)

const (
PluginName = "dynamo-cleanup"
PluginType = "dynamo-cleanup"
)

// DynamoCleanupPlugin is a PostResponse plugin that cleans up router state
// when a request completes. It calls dynamo_router_free_request to release
// the bookkeeping resources associated with the request.
type DynamoCleanupPlugin struct {
typedName plugins.TypedName
}

var _ plugins.Plugin = (*DynamoCleanupPlugin)(nil)
var _ rc.PostResponse = (*DynamoCleanupPlugin)(nil)

// NewDynamoCleanupPlugin creates a new DynamoCleanupPlugin instance.
func NewDynamoCleanupPlugin() *DynamoCleanupPlugin {
return &DynamoCleanupPlugin{
typedName: plugins.TypedName{Type: PluginType, Name: PluginName},
}
}

// WithName sets a custom name for the plugin.
func (p *DynamoCleanupPlugin) WithName(name string) *DynamoCleanupPlugin {
p.typedName.Name = name
return p
}

// DynamoCleanupPluginFactory creates a DynamoCleanupPlugin from configuration.
func DynamoCleanupPluginFactory(name string, _ json.RawMessage, _ plugins.Handle) (plugins.Plugin, error) {
return NewDynamoCleanupPlugin().WithName(name), nil
}

// TypedName returns the plugin's type and name.
func (p *DynamoCleanupPlugin) TypedName() plugins.TypedName {
return p.typedName
}

// PostResponse is called after a response is received from the model server.
// It cleans up the router bookkeeping state for the completed request.
func (p *DynamoCleanupPlugin) PostResponse(
ctx context.Context,
request *schedtypes.LLMRequest,
response *rc.Response,
targetPod *backend.Pod,
) {
logger := log.FromContext(ctx)

if request == nil {
logger.V(logutil.DEBUG).Info("DynamoCleanupPlugin: request is nil, skipping cleanup")
return
}

requestID := request.RequestId
if requestID == "" {
logger.V(logutil.DEBUG).Info("DynamoCleanupPlugin: no request ID, skipping cleanup")
return
}

// Call the dynamo router to free the request bookkeeping
if err := dynamo.CallFreeRequest(requestID); err != nil {
logger.V(logutil.DEFAULT).Error(err, "DynamoCleanupPlugin: failed to free request",
"requestID", requestID)
return
}

logger.V(logutil.VERBOSE).Info("DynamoCleanupPlugin: freed request from router",
"requestID", requestID)
}

Loading