diff --git a/Makefile b/Makefile index 18df81dc4f..3b2eec395c 100644 --- a/Makefile +++ b/Makefile @@ -129,6 +129,15 @@ generate-proto: protoc-gen-go protoc-gen-go-grpc ## Generate Golang code from pr --go-grpc_out=module=sigs.k8s.io/gateway-api-inference-extension:. \ pkg/epp/framework/plugins/requesthandling/parsers/vllmgrpc/api/proto/*.proto +.PHONY: generate-proto-light +generate-proto-light: protoc-gen-go protoc-gen-go-grpc ## Generate Golang code from light EPP protobuf files. + PATH="$(LOCALBIN):$$PATH" $(PROTOC) \ + -I pkg/epp-light/proto \ + -I . \ + --go_out=module=sigs.k8s.io/gateway-api-inference-extension:. \ + --go-grpc_out=module=sigs.k8s.io/gateway-api-inference-extension:. \ + pkg/epp-light/proto/*.proto + # Use same code-generator version as k8s.io/api CODEGEN_VERSION := $(shell go list -m -f '{{.Version}}' k8s.io/api) CODEGEN = $(shell pwd)/bin/code-generator diff --git a/cmd/epp-light/main.go b/cmd/epp-light/main.go new file mode 100644 index 0000000000..8e52fb80dd --- /dev/null +++ b/cmd/epp-light/main.go @@ -0,0 +1,33 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// The epp-light binary is a minimal, reference Endpoint Picker (EPP) implementation. +// To use a custom picker, call runner.NewRunner().WithPicker(myPicker).Run(...). +package main + +import ( + "os" + + ctrl "sigs.k8s.io/controller-runtime" + + "sigs.k8s.io/gateway-api-inference-extension/cmd/epp-light/runner" +) + +func main() { + if err := runner.NewRunner().Run(ctrl.SetupSignalHandler()); err != nil { + os.Exit(1) + } +} diff --git a/cmd/epp-light/runner/runner.go b/cmd/epp-light/runner/runner.go new file mode 100644 index 0000000000..4d4069d40f --- /dev/null +++ b/cmd/epp-light/runner/runner.go @@ -0,0 +1,118 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package runner + +import ( + "context" + "fmt" + + "github.com/spf13/pflag" + "k8s.io/apimachinery/pkg/runtime" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + clientgoscheme "k8s.io/client-go/kubernetes/scheme" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/log/zap" + + v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1" + epplight "sigs.k8s.io/gateway-api-inference-extension/pkg/epp-light" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp-light/server" +) + +var scheme = runtime.NewScheme() + +func init() { + utilruntime.Must(clientgoscheme.AddToScheme(scheme)) + utilruntime.Must(v1.Install(scheme)) +} + +// Runner encapsulates the light EPP server lifecycle. +// External projects can import this, swap in a custom EndpointPicker via +// WithPicker(), and call Run(). +type Runner struct { + picker epplight.EndpointPicker +} + +// NewRunner creates a Runner with the default RandomPicker. +func NewRunner() *Runner { + return &Runner{ + picker: epplight.NewRandomPicker(), + } +} + +// WithPicker sets a custom EndpointPicker implementation. +func (r *Runner) WithPicker(picker epplight.EndpointPicker) *Runner { + r.picker = picker + return r +} + +// Run parses flags, creates the controller manager, wires everything together, +// and blocks until the context is cancelled. +func (r *Runner) Run(ctx context.Context) error { + opts := server.NewOptions() + opts.AddFlags(pflag.CommandLine) + pflag.Parse() + + ctrl.SetLogger(zap.New()) + logger := ctrl.Log.WithName("epp-light") + + if err := opts.Validate(); err != nil { + return fmt.Errorf("invalid options: %w", err) + } + + mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{ + Scheme: scheme, + }) + if err != nil { + return fmt.Errorf("unable to create controller manager: %w", err) + } + + datastore := epplight.NewDatastore() + + // Use remote gRPC picker if --picker-address is set, otherwise use in-process picker. + picker := r.picker + if opts.PickerAddress != "" { + logger.Info("Using remote gRPC picker", "address", opts.PickerAddress) + grpcPicker, err := epplight.NewGRPCPicker(opts.PickerAddress) + if err != nil { + return fmt.Errorf("failed to create gRPC picker: %w", err) + } + picker = grpcPicker + } + + serverRunner := &server.ExtProcServerRunner{ + GRPCPort: opts.GRPCPort, + PoolNamespace: opts.PoolNamespace, + PoolName: opts.PoolName, + Datastore: datastore, + Picker: picker, + SecureServing: opts.SecureServing, + CertPath: opts.CertPath, + } + + if err := serverRunner.SetupWithManager(mgr); err != nil { + return fmt.Errorf("failed to setup controllers: %w", err) + } + + if err := mgr.Add(serverRunner.AsRunnable(logger)); err != nil { + return fmt.Errorf("failed to add ext-proc server runnable: %w", err) + } + + logger.Info(fmt.Sprintf("Starting light EPP for pool %s/%s on port %d", + opts.PoolNamespace, opts.PoolName, opts.GRPCPort)) + + return mgr.Start(ctx) +} diff --git a/docs/proposals/XXXX-light-epp/README.md b/docs/proposals/XXXX-light-epp/README.md new file mode 100644 index 0000000000..b0cdc54929 --- /dev/null +++ b/docs/proposals/XXXX-light-epp/README.md @@ -0,0 +1,527 @@ +# Light EPP: Minimal Reference Endpoint Picker + +Author(s): @atchernych + +## Proposal Status + +***Provisional*** + +## Related Issues + +- [#2430 — Lighter EPP discussion](https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/2430) +- [004 — Endpoint Picker Protocol](../004-endpoint-picker-protocol/README.md) +- [0683 — EPP Architecture Proposal](../0683-epp-architecture-proposal/README.md) + +## Summary + +This proposal introduces a **Light EPP** (`pkg/epp-light/`) — a minimal, API-focused Endpoint Picker implementation that separates the Endpoint Picker Protocol from the production scheduling logic. It defines a single `EndpointPicker` interface that anyone can implement to build their own EPP, while the framework handles all Envoy ext-proc protocol details, InferencePool CRD discovery, and pod lifecycle management. + +The Light EPP is intended to serve as: + +1. A **reference implementation** for the Endpoint Picker Protocol (proposal 004) +2. A **conformance testing target** for gateway providers +3. A **starting point** for third-party EPP implementations in any language + +## Goals + +- Define a clean, single-method `EndpointPicker` interface that decouples endpoint selection from protocol handling +- Fully implement the [Endpoint Picker Protocol](../004-endpoint-picker-protocol/README.md) (subset filtering, destination metadata, fallback endpoints) +- Support the stable `v1` InferencePool CRD for pod discovery +- Provide a default random-selection implementation as a reference +- **Enable cross-language implementations** via a gRPC `EndpointPickerService` protobuf definition, so Rust, Python, C++, or any language can implement custom endpoint selection +- Keep the codebase minimal (~17 files) and self-contained with zero imports from `pkg/epp/` + +## Non-Goals + +- Replace the full `pkg/epp/` for production workloads +- Support InferenceObjective (priority/fairness) or InferenceModelRewrite (traffic splitting) CRDs +- Implement scheduling frameworks, flow control, or plugin systems +- Provide Prometheus metrics or observability instrumentation (can be added later) +- Support the experimental `v1alpha2` InferencePool API + +## Motivation + +The current `pkg/epp/` is a full-featured, production-grade system but it creates a high barrier for: + +- **Gateway providers** who need a conformance target for the EPP protocol +- **Third-party implementors** who want to build custom endpoint selection without understanding 30+ files of internal machinery +- **Testing** where a simple, predictable EPP is sufficient + +As discussed in [#2430](https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/2430), the EPP API/protocol should remain in Kubernetes governance while production selection logic moves to specialized projects (e.g., llm-d). The Light EPP embodies this separation. + +## Proposal + +### Architecture + +The Light EPP decomposes the EPP into two layers: + +``` + +-----------------------------------------+ + | Envoy (ext-proc client) | + +-------------------+---------------------+ + | + +-------------------v---------------------+ + | Protocol Layer (framework) | + | | + | - Ext-proc Process loop (gRPC) | + | - Request/response state machine | + | - EPP metadata generation | + | - Subset filtering | + | - InferencePool + Pod reconcilers | + +-------------------+---------------------+ + | + EndpointPicker.Pick() + | + +-------------------v---------------------+ + | Selection Layer (pluggable) | + | | + | Implement your own logic: | + | - Random (default) | + | - Model-aware routing | + | - Prefix-cache affinity | + | - Latency-based scoring | + | - Any custom algorithm | + +-----------------------------------------+ +``` + +### Cross-Language Support via gRPC Picker Service + +The Go `EndpointPicker` interface only works for Go implementations compiled into the same binary as the Light EPP server. To enable endpoint selection in **any language** (Rust, Python, C++, etc.), the Light EPP also defines a gRPC `EndpointPickerService` protobuf: + +```protobuf +// pkg/epp-light/proto/picker.proto + +syntax = "proto3"; +package epplight; + +service EndpointPickerService { + rpc Pick(PickRequest) returns (PickResponse); +} + +message PickRequest { + map headers = 1; + bytes body = 2; + string model = 3; + repeated string candidate_subset = 4; + repeated EndpointInfo endpoints = 5; +} + +message EndpointInfo { + string address = 1; + string port = 2; + string name = 3; + map labels = 4; +} + +message PickResponse { + string endpoint = 1; + repeated string fallbacks = 2; +} +``` + +The protobuf messages mirror the Go types exactly (`RequestInfo` ↔ `PickRequest`, `Endpoint` ↔ `EndpointInfo`, `PickResult` ↔ `PickResponse`), making the mapping trivial. + +#### How Both Paths Coexist + +A `GRPCPicker` adapter implements the Go `EndpointPicker` interface by calling out to a remote gRPC service. The `server.go` Process loop doesn't change — it always calls `s.picker.Pick()`. The picker is either local (same binary) or remote (separate process): + +``` + Local Go picker (same binary) + ┌────────────────────────┐ + │ RandomPicker │ + │ or custom Go picker │ + └────────────────────────┘ + ▲ + │ (implements EndpointPicker) + │ +server.go ──── picker.Pick() ──┤ + │ + │ (implements EndpointPicker via gRPC) + ▼ + ┌────────────────────────┐ + │ GRPCPicker │──── gRPC ────► Remote Service + │ (adapter/client) │ (separate process: + └────────────────────────┘ Rust, Python, etc.) +``` + +The `--picker-address` CLI flag controls which path is used: +- **Unset** → local Go picker (default `RandomPicker`, or custom Go picker via `runner.WithPicker()`) +- **Set** (e.g., `--picker-address=localhost:9010`) → `GRPCPicker` connects to a remote picker service + +#### Benefits of this Approach + +1. **Language-agnostic** — any language with gRPC support can implement the picker service. The `.proto` file is the contract. +2. **No protocol reimplementation** — non-Go implementations don't need to understand ext-proc, Envoy metadata, subset filtering, or Kubernetes CRDs. The Go Light EPP handles all of that. +3. **Zero overhead for Go** — local Go pickers use a direct function call within the same process, with no serialization or network hop. +4. **Independently deployable** — the remote picker service can be scaled, versioned, and deployed separately from the protocol layer. +5. **Testable** — the proto definition enables generating client/server stubs in any language for testing. + +#### Implementing an EPP in Rust + +A Rust implementation follows these steps: + +**Step 1: Generate Rust stubs from `picker.proto`** + +Add the proto to your Rust project and use `tonic-build`: + +```rust +// build.rs +fn main() -> Result<(), Box> { + tonic_build::compile_protos("path/to/picker.proto")?; + Ok(()) +} +``` + +**Step 2: Implement the `EndpointPickerService` trait** + +```rust +use tonic::{Request, Response, Status}; + +pub struct MyRustPicker; + +#[tonic::async_trait] +impl EndpointPickerService for MyRustPicker { + async fn pick( + &self, + request: Request, + ) -> Result, Status> { + let req = request.into_inner(); + + // Custom selection logic using req.model, req.headers, + // req.endpoints, req.candidate_subset, req.body, etc. + let chosen = req.endpoints.first() + .ok_or_else(|| Status::unavailable("no endpoints"))?; + + Ok(Response::new(PickResponse { + endpoint: format!("{}:{}", chosen.address, chosen.port), + fallbacks: vec![], + })) + } +} +``` + +**Step 3: Run the gRPC server** + +```rust +#[tokio::main] +async fn main() -> Result<(), Box> { + let addr = "0.0.0.0:9010".parse()?; + let picker = MyRustPicker; + + Server::builder() + .add_service(EndpointPickerServiceServer::new(picker)) + .serve(addr) + .await?; + + Ok(()) +} +``` + +**Step 4: Run the Go Light EPP pointing to the Rust service** + +```bash +# Start the Rust picker service +cargo run # listens on :9010 + +# Start the Go Light EPP, delegating selection to Rust +epp-light --pool-name=my-pool --pool-namespace=default --picker-address=localhost:9010 +``` + +The full request flow becomes: + +``` +Client ──HTTP──► Envoy ──ext-proc──► Go Light EPP ──gRPC──► Rust Picker + (protocol layer) (selection logic) + handles: handles: + - ext-proc state - endpoint selection + - metadata keys - custom routing + - subset filtering - model affinity + - pod discovery - etc. + - InferencePool CRD +``` + +The Rust implementation only needs to decide *which* endpoint — everything else is handled by the Go protocol layer. + +#### When to Use the gRPC Picker vs. a Native ext-proc Implementation + +The gRPC picker service is a **convenience layer**, not a requirement. There are two valid approaches for building a non-Go EPP, and the right choice depends on the use case: + +**Approach A: gRPC Picker via the Light EPP (recommended for prototyping and simple use cases)** + +``` +Envoy ──ext-proc──► Go Light EPP ──gRPC──► Rust/Python Picker +``` + +The non-Go code only implements endpoint selection. The Go Light EPP handles: +- The ext-proc bidirectional streaming state machine with 7 request states, correct response ordering, and chunking (`server.go` — 273 lines) +- EPP protocol metadata generation: `envoy.lb` namespace, `x-gateway-destination-endpoint` in both headers and dynamic metadata, subset filtering from `envoy.lb.subset_hint` (`request.go` — 163 lines, `response.go` — 43 lines) +- InferencePool CRD watching via Kubernetes controller-runtime: pod reconciliation, label selectors, readiness tracking, active ports (`datastore.go` — 230 lines, `controller/` — 166 lines) +- Error responses (`ImmediateResponse` with 503/429 per the protocol spec) + +This is ~900 lines of protocol and infrastructure code that has nothing to do with choosing an endpoint. The gRPC picker service lets implementors skip all of it. + +**Latency note:** The additional gRPC hop adds ~0.1–0.5ms on localhost/sidecar, ~0.5–2ms across pods in the same AZ. The `PickRequest` message is small (headers map, model string, endpoint list), so serialization overhead is negligible. For context, the ext-proc call from Envoy to the Light EPP already incurs a similar hop, and model server inference responses typically take 50ms–10s+. + +**Best for:** rapid prototyping, simple selection logic, teams that don't want to maintain Kubernetes controller code in their language. + +**Approach B: Native ext-proc implementation (recommended for production)** + +``` +Envoy ──ext-proc──► Rust EPP (implements ext-proc directly) +``` + +The non-Go code implements the full Envoy ext-proc service, the EPP protocol metadata, and Kubernetes CRD watching natively. There is no Go intermediary. + +This is more work upfront (~900 lines equivalent of protocol and Kubernetes plumbing) but gives full control over the entire stack with no Go dependency. The ext-proc protocol is defined in protobuf (`envoy.service.ext_proc.v3.ExternalProcessor`), so any language with gRPC support can implement it directly. The EPP protocol (proposal 004) is just metadata key conventions on top of ext-proc — no additional wire protocol to implement. + +**Best for:** teams that want full ownership of the stack with no intermediary (e.g., llm-d), or when the selection logic needs tight integration with the request lifecycle (e.g., custom response processing, streaming-aware routing). + +Both approaches are valid. The `picker.proto` and the ext-proc protocol are complementary — they serve different levels of abstraction for different needs. + +### Core Interface + +The central abstraction is a single interface with one method: + +```go +type EndpointPicker interface { + Pick(ctx context.Context, req *RequestInfo, endpoints []Endpoint) (*PickResult, error) +} +``` + +Where: + +```go +type Endpoint struct { + Address string // Pod IP + Port string // Target port + Name string // Pod name (namespace/name) + Labels map[string]string // Pod labels +} + +type RequestInfo struct { + Headers map[string]string // HTTP request headers + Body []byte // Raw request body (nil for GET) + Model string // Model name from body, if present + CandidateSubset []string // From x-gateway-destination-endpoint-subset +} + +type PickResult struct { + Endpoint string // Primary endpoint (ip:port) + Fallbacks []string // Optional fallback endpoints (ip:port) +} +``` + +**Design properties:** + +- **Rich input** — `RequestInfo` provides headers, raw body, extracted model name, and the candidate subset, giving implementors everything they need for intelligent routing decisions +- **Rich endpoint data** — `Endpoint` includes pod labels, enabling label-based routing without a scheduling framework +- **Protocol compliance is handled** — subset filtering, metadata generation, and ext-proc state management are the framework's concern, not the implementor's +- **Flat types** — `Endpoint` is a simple struct, not an interface with metrics/attributes/factory indirection + +### What Changed vs. `pkg/epp/` + +The current EPP request flow is a 10-step orchestration pipeline: + +``` +Request → parse → getObjective → admit → locateCandidates → prepareData + → admissionPlugins → schedule → preRequest → modelRewrite → respond +``` + +The Light EPP collapses this to 4 steps: + +``` +Request → extractModel → resolveSubset+filterEndpoints → picker.Pick → respond +``` + +| Current `pkg/epp/` | Light EPP | Rationale | +|---|---|---| +| `Director` orchestrator | Direct `picker.Pick()` call | No need for multi-step orchestration | +| `Scheduler` with profiles/filters/scorers | `EndpointPicker` interface | Selection algorithm is the implementor's concern | +| `Parser` interface + `LLMRequestBody` | Simple JSON `model` extraction | Raw body available in `RequestInfo.Body` for custom parsing | +| InferenceObjective reconciler + priority | Removed | Out of scope | +| InferenceModelRewrite reconciler + traffic split | Removed | Out of scope | +| Flow control (saturation, fairness, queuing) | Removed | Implementation concern | +| Data layer (polling, notifications, extractors) | Removed | Implementation concern | +| Plugin registry + DAG validation | Removed | No plugin system needed | +| `EndpointFactory` + metrics goroutines | Simple `Endpoint` struct | No background metrics collection | +| `fwkdl.Endpoint` interface | `Endpoint` value type | No interface indirection needed | + +### Package Structure + +``` +pkg/epp-light/ + picker.go — EndpointPicker interface, Endpoint/RequestInfo/PickResult types + picker_random.go — Default random picker (reference implementation) + picker_grpc.go — GRPCPicker: implements EndpointPicker via remote gRPC service + metadata.go — EPP protocol constants (proposal 004) + datastore.go — Simplified datastore (pool + pods only) + server.go — Ext-proc StreamingServer with Process loop + request.go — Request handling, metadata generation, subset filtering + response.go — Response handling + proto/ + picker.proto — EndpointPickerService protobuf definition + gen/ — Generated Go stubs (picker.pb.go, picker_grpc.pb.go) + controller/ + pool.go — InferencePool v1 reconciler + pod.go — Pod reconciler + server/ + runner.go — ExtProcServerRunner (gRPC wiring) + options.go — Minimal CLI flags (including --picker-address) +cmd/epp-light/ + main.go — Entrypoint + runner/ + runner.go — Runner with WithPicker() and gRPC picker wiring +``` + +17 files total, versus dozens in `pkg/epp/`. + +### Dependency Isolation + +The Light EPP has **zero imports from `pkg/epp/`**: + +| Dependency | Source | Usage | +|---|---|---| +| `api/v1` | Shared CRD types | InferencePool type definitions | +| `pkg/common/envoy` | Shared utilities | Envoy helpers (headers, metadata, chunking) | +| `pkg/common/error` | Shared utilities | Error types and `BuildErrResponse` | +| `pkg/common/request` | Shared utilities | `RequestIdHeaderKey` constant | +| `internal/runnable` | Infrastructure | `GRPCServer`, `NoLeaderElection` | +| `internal/tls` | Infrastructure | TLS certificate handling | + +The 4 EPP protocol constants (`SubsetFilterNamespace`, `SubsetFilterKey`, `DestinationEndpointNamespace`, `DestinationEndpointKey`) are copied into `metadata.go` rather than imported from `pkg/epp/metadata`, ensuring the two packages can evolve independently. + +### Datastore Simplifications + +| `pkg/epp/datastore` | `pkg/epp-light/datastore` | +|---|---| +| `EndpointFactory` with background goroutines | Direct `Endpoint` struct creation | +| `fwkdl.Endpoint` interface with `GetMetrics()`, `GetAttributes()` | Simple `Endpoint` value struct | +| `ObjectiveSet/Get/Delete/GetAll` | Removed | +| `ModelRewriteSet/Get/Delete/GetAll` | Removed | +| `modelServerMetricsPort` | Removed | +| `parentCtx` for goroutine lifecycle | Not needed (no goroutines) | + +Retained: pool set/get with pod resync, pod CRUD via `sync.Map`, rank-based endpoint naming for multi-port, `activePortsAnnotation` support, label selector matching. + +### Usage Examples + +#### Option A: Custom Go Picker (local, same binary) + +```go +package main + +import ( + "os" + + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/gateway-api-inference-extension/cmd/epp-light/runner" + epplight "sigs.k8s.io/gateway-api-inference-extension/pkg/epp-light" +) + +type ModelAwarePicker struct{} + +func (p *ModelAwarePicker) Pick( + ctx context.Context, + req *epplight.RequestInfo, + endpoints []epplight.Endpoint, +) (*epplight.PickResult, error) { + for _, ep := range endpoints { + if ep.Labels["model"] == req.Model { + return &epplight.PickResult{Endpoint: ep.Address + ":" + ep.Port}, nil + } + } + if len(endpoints) > 0 { + ep := endpoints[0] + return &epplight.PickResult{Endpoint: ep.Address + ":" + ep.Port}, nil + } + return nil, fmt.Errorf("no endpoints for model %q", req.Model) +} + +func main() { + if err := runner.NewRunner().WithPicker(&ModelAwarePicker{}).Run(ctrl.SetupSignalHandler()); err != nil { + os.Exit(1) + } +} +``` + +#### Option B: Remote Picker via gRPC (separate process, any language) + +No custom Go code needed — just run the binary with `--picker-address`: + +```bash +epp-light \ + --pool-name=my-pool \ + --pool-namespace=default \ + --picker-address=localhost:9010 +``` + +The remote service at `localhost:9010` implements `EndpointPickerService` from `picker.proto` in any language. See [Implementing an EPP in Rust](#implementing-an-epp-in-rust) for a complete example. + +## Protocol Conformance + +The Light EPP fully implements the [Endpoint Picker Protocol](../004-endpoint-picker-protocol/README.md): + +| Protocol Requirement | Implementation | +|---|---| +| Set `x-gateway-destination-endpoint` header | `request.go:generateRequestHeaders()` | +| Set `envoy.lb` dynamic metadata | `request.go:generateDestinationMetadata()` | +| Respect `envoy.lb.subset_hint` / `x-gateway-destination-endpoint-subset` | `request.go:extractCandidateSubset()` + `filterEndpointsBySubset()` | +| Support multiple endpoints (fallback) | `PickResult.Fallbacks` joined with `,` | +| Return 503 when no endpoints available | `server.go:handleRequestBody()` | +| Header and metadata values must match | Single `targetEndpoint` string used for both | + +## Alternatives Considered + +### 1. Go interface only, no gRPC picker service + +A Rust or Python EPP would have to reimplement the entire ext-proc protocol layer (Process loop, state machine, metadata generation, subset filtering, pod discovery). This is hundreds of lines of protocol-specific code that has nothing to do with endpoint selection. The gRPC picker service lets non-Go implementations focus purely on selection logic while the Go Light EPP handles everything else. + +### 2. gRPC only, no local Go interface + +All pickers (including Go ones) would communicate via gRPC to a separate process. This simplifies the architecture (one path instead of two) but adds a network hop and serialization overhead for Go pickers that don't need it. The dual-path approach (local for Go, remote gRPC for other languages) gives zero overhead when it's not needed. + +### 3. Define EndpointPicker as a plugin within the existing framework + +The existing plugin framework (`framework/interface/scheduling/plugins.go`) defines Filter, Scorer, and Picker as separate interfaces composed into SchedulerProfiles. This is powerful but forces implementors to understand the profile/filter/scorer/picker pipeline. A single `Pick` method is a lower abstraction barrier. + +### 5. CGo/FFI bindings (Go → C → Rust) instead of gRPC + +Instead of a gRPC call to a remote picker, the Rust selection logic could be compiled as a C-compatible library and called directly from Go via CGo + Rust FFI. This eliminates the ~0.5ms network hop but introduces significant trade-offs: CGo pins an OS thread per call (breaking Go's goroutine concurrency model under load), the build requires Go + C + Rust toolchains, memory must be manually managed across the FFI boundary, debugging tools can't cross the language boundary, and a panic in Rust kills the entire Go process including the ext-proc server and Kubernetes controllers. For a production Rust EPP where the team wants full ownership of the stack with no Go dependency, the native ext-proc implementation (Approach B) is the better fit — full Rust, no intermediary. + +### 6. Use `fwkdl.Endpoint` interface instead of a flat struct + +The existing `fwkdl.Endpoint` interface requires `GetMetrics()`, `GetAttributes()`, `UpdateMetrics()`, and the `EndpointFactory` abstraction. This pulls in the data layer framework. A simple struct with Address, Port, Name, Labels is sufficient for routing decisions and avoids the coupling. The flat struct also maps cleanly to the `EndpointInfo` protobuf message for cross-language support. + +## Testing + +### Unit Tests + +- `picker_random_test.go` — Random selection, empty list error, distribution across endpoints +- `picker_grpc_test.go` — GRPCPicker with mock gRPC server: verify Go→proto→Go round-trip for all types +- `datastore_test.go` — Pool set/get, pod CRUD, label matching, endpoint listing, active ports +- `server_test.go` — Ext-proc Process loop with mock stream (header-only, body, subset filtering, no-endpoints) +- `request_test.go` — `extractModelFromBody`, `extractCandidateSubset`, `filterEndpointsBySubset`, metadata generation +- `controller/*_test.go` — Pool and pod reconcilers with fake k8s client + +### Integration Tests + +- Start full Light EPP against a fake k8s client with local Go picker, send ext-proc requests via gRPC client, verify endpoint selection in headers and dynamic metadata +- Start full Light EPP with `--picker-address` pointing to a test gRPC picker server, verify the remote picker is called and endpoints are returned correctly + +### Conformance Checks + +Per proposal 004: + +- `x-gateway-destination-endpoint` header set on every successful response +- `envoy.lb` dynamic metadata set with matching endpoint value +- Subset filtering respects `envoy.lb.subset_hint` +- 503 returned when no endpoints available +- 503 returned when candidate subset matches no available endpoints + +### Build Verification + +```bash +make generate-proto-light # Generate proto code +go build ./pkg/epp-light/... ./cmd/epp-light/... # Build all packages +go vet ./pkg/epp-light/... ./cmd/epp-light/... # Vet +``` diff --git a/pkg/epp-light/controller/pod.go b/pkg/epp-light/controller/pod.go new file mode 100644 index 0000000000..c74ef3dbe6 --- /dev/null +++ b/pkg/epp-light/controller/pod.go @@ -0,0 +1,109 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "context" + "fmt" + + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/predicate" + + epplight "sigs.k8s.io/gateway-api-inference-extension/pkg/epp-light" +) + +// PodReconciler watches Pods and updates the datastore based on the pool selector. +type PodReconciler struct { + client.Reader + Datastore epplight.Datastore +} + +func (c *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + logger := log.FromContext(ctx) + if !c.Datastore.PoolHasSynced() { + logger.V(4).Info("Skipping Pod reconcile: InferencePool not synced yet") + return ctrl.Result{}, nil + } + + pod := &corev1.Pod{} + if err := c.Get(ctx, req.NamespacedName, pod); err != nil { + if apierrors.IsNotFound(err) { + c.Datastore.PodDelete(req.Name) + return ctrl.Result{}, nil + } + return ctrl.Result{}, fmt.Errorf("unable to get pod: %w", err) + } + + c.updateDatastore(ctx, pod) + return ctrl.Result{}, nil +} + +func (c *PodReconciler) SetupWithManager(mgr ctrl.Manager) error { + filter := predicate.Funcs{ + CreateFunc: func(ce event.CreateEvent) bool { + pod := ce.Object.(*corev1.Pod) + return c.Datastore.PoolLabelsMatch(pod.GetLabels()) + }, + UpdateFunc: func(ue event.UpdateEvent) bool { + oldPod := ue.ObjectOld.(*corev1.Pod) + newPod := ue.ObjectNew.(*corev1.Pod) + return c.Datastore.PoolLabelsMatch(oldPod.GetLabels()) || c.Datastore.PoolLabelsMatch(newPod.GetLabels()) + }, + DeleteFunc: func(de event.DeleteEvent) bool { + pod := de.Object.(*corev1.Pod) + return c.Datastore.PoolLabelsMatch(pod.GetLabels()) + }, + GenericFunc: func(ge event.GenericEvent) bool { + pod := ge.Object.(*corev1.Pod) + return c.Datastore.PoolLabelsMatch(pod.GetLabels()) + }, + } + return ctrl.NewControllerManagedBy(mgr). + For(&corev1.Pod{}). + WithEventFilter(filter). + Complete(c) +} + +func (c *PodReconciler) updateDatastore(ctx context.Context, pod *corev1.Pod) { + logger := log.FromContext(ctx) + if !isPodReady(pod) || !c.Datastore.PoolLabelsMatch(pod.Labels) { + logger.V(2).Info("Pod removed or not ready", "pod", pod.Name) + c.Datastore.PodDelete(pod.Name) + } else { + if !c.Datastore.PodUpdateOrAddIfNotExist(ctx, pod) { + logger.Info("Pod added", "pod", pod.Name) + } + } +} + +// isPodReady checks if a pod is ready and not being deleted. +func isPodReady(pod *corev1.Pod) bool { + if !pod.DeletionTimestamp.IsZero() { + return false + } + for _, condition := range pod.Status.Conditions { + if condition.Type == corev1.PodReady { + return condition.Status == corev1.ConditionTrue + } + } + return false +} diff --git a/pkg/epp-light/controller/pool.go b/pkg/epp-light/controller/pool.go new file mode 100644 index 0000000000..67562b162c --- /dev/null +++ b/pkg/epp-light/controller/pool.go @@ -0,0 +1,95 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "context" + "fmt" + + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/types" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/log" + + v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1" + epplight "sigs.k8s.io/gateway-api-inference-extension/pkg/epp-light" +) + +// InferencePoolReconciler watches InferencePool resources and updates the datastore. +type InferencePoolReconciler struct { + client.Reader + Datastore epplight.Datastore + PoolNamespace string + PoolName string +} + +func (c *InferencePoolReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + logger := log.FromContext(ctx) + logger.Info("Reconciling InferencePool") + + pool := &v1.InferencePool{} + if err := c.Get(ctx, req.NamespacedName, pool); err != nil { + if errors.IsNotFound(err) { + logger.Info("InferencePool not found, clearing datastore") + c.Datastore.Clear() + return ctrl.Result{}, nil + } + return ctrl.Result{}, fmt.Errorf("unable to get InferencePool: %w", err) + } + + if !pool.GetDeletionTimestamp().IsZero() { + logger.Info("InferencePool is marked for deletion, clearing datastore") + c.Datastore.Clear() + return ctrl.Result{}, nil + } + + poolInfo := inferencePoolToPoolInfo(pool) + if err := c.Datastore.PoolSet(ctx, c.Reader, poolInfo); err != nil { + return ctrl.Result{}, fmt.Errorf("failed to update datastore: %w", err) + } + + return ctrl.Result{}, nil +} + +func (c *InferencePoolReconciler) SetupWithManager(mgr ctrl.Manager) error { + return ctrl.NewControllerManagedBy(mgr). + For(&v1.InferencePool{}). + Complete(c) +} + +// PoolNamespacedName returns the namespaced name for the pool this reconciler watches. +func (c *InferencePoolReconciler) PoolNamespacedName() types.NamespacedName { + return types.NamespacedName{Name: c.PoolName, Namespace: c.PoolNamespace} +} + +func inferencePoolToPoolInfo(pool *v1.InferencePool) *epplight.PoolInfo { + targetPorts := make([]int, 0, len(pool.Spec.TargetPorts)) + for _, p := range pool.Spec.TargetPorts { + targetPorts = append(targetPorts, int(p.Number)) + } + selector := make(map[string]string, len(pool.Spec.Selector.MatchLabels)) + for k, v := range pool.Spec.Selector.MatchLabels { + selector[string(k)] = string(v) + } + return &epplight.PoolInfo{ + Name: pool.Name, + Namespace: pool.Namespace, + Selector: selector, + TargetPorts: targetPorts, + } +} diff --git a/pkg/epp-light/datastore.go b/pkg/epp-light/datastore.go new file mode 100644 index 0000000000..0a9de89be8 --- /dev/null +++ b/pkg/epp-light/datastore.go @@ -0,0 +1,287 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package epplight + +import ( + "context" + "errors" + "fmt" + "maps" + "strconv" + "strings" + "sync" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/log" +) + +var errPoolNotSynced = errors.New("InferencePool is not initialized in data store") + +const ( + // activePortsAnnotation specifies which ports on a pod are active for inference traffic. + // Value is a comma-separated list of port numbers, e.g. "8000,8001". + activePortsAnnotation = "inference.networking.k8s.io/active-ports" +) + +// PoolInfo holds the essential information from an InferencePool resource. +type PoolInfo struct { + Name string + Namespace string + Selector map[string]string + TargetPorts []int +} + +// Datastore is the internal interface for the light EPP's pod/pool cache. +type Datastore interface { + PoolGet() (*PoolInfo, error) + PoolSet(ctx context.Context, reader client.Reader, pool *PoolInfo) error + PoolHasSynced() bool + PoolLabelsMatch(podLabels map[string]string) bool + ListEndpoints() []Endpoint + PodUpdateOrAddIfNotExist(ctx context.Context, pod *corev1.Pod) bool + PodDelete(podName string) + Clear() +} + +// NewDatastore creates a new datastore instance. +func NewDatastore() *datastore { + return &datastore{ + pods: &sync.Map{}, + } +} + +type datastore struct { + mu sync.RWMutex + pool *PoolInfo + // key: types.NamespacedName (endpoint name), value: Endpoint + pods *sync.Map +} + +func (ds *datastore) Clear() { + ds.mu.Lock() + defer ds.mu.Unlock() + ds.pool = nil + ds.pods.Clear() +} + +// PoolSet sets the pool in the datastore. If the selector or targetPorts changed, +// a full pod resync is triggered. +func (ds *datastore) PoolSet(ctx context.Context, reader client.Reader, pool *PoolInfo) error { + if pool == nil { + ds.Clear() + return nil + } + logger := log.FromContext(ctx) + ds.mu.Lock() + defer ds.mu.Unlock() + + oldPool := ds.pool + ds.pool = pool + + selectorChanged := oldPool == nil || !labels.Equals(oldPool.Selector, pool.Selector) + targetPortsChanged := oldPool != nil && !slicesEqual(oldPool.TargetPorts, pool.TargetPorts) + + if selectorChanged || targetPortsChanged { + logger.Info("Updating endpoints", "selector", pool.Selector, "targetPortsChanged", targetPortsChanged) + if err := ds.podResyncAll(ctx, reader); err != nil { + return fmt.Errorf("failed to resync pods: %w", err) + } + } + return nil +} + +func (ds *datastore) PoolGet() (*PoolInfo, error) { + ds.mu.RLock() + defer ds.mu.RUnlock() + if ds.pool == nil { + return nil, errPoolNotSynced + } + return ds.pool, nil +} + +func (ds *datastore) PoolHasSynced() bool { + ds.mu.RLock() + defer ds.mu.RUnlock() + return ds.pool != nil +} + +func (ds *datastore) PoolLabelsMatch(podLabels map[string]string) bool { + ds.mu.RLock() + defer ds.mu.RUnlock() + if ds.pool == nil { + return false + } + return labels.SelectorFromSet(ds.pool.Selector).Matches(labels.Set(podLabels)) +} + +// ListEndpoints returns all endpoints currently tracked in the datastore. +func (ds *datastore) ListEndpoints() []Endpoint { + var result []Endpoint + ds.pods.Range(func(_, v any) bool { + result = append(result, v.(Endpoint)) + return true + }) + return result +} + +// PodUpdateOrAddIfNotExist adds or updates endpoints for the given pod. +// Returns true if the pod already existed (all endpoints were known). +func (ds *datastore) PodUpdateOrAddIfNotExist(ctx context.Context, pod *corev1.Pod) bool { + ds.mu.RLock() + pool := ds.pool + ds.mu.RUnlock() + return ds.podUpdateOrAddIfNotExist(ctx, pod, pool) +} + +func (ds *datastore) podUpdateOrAddIfNotExist(_ context.Context, pod *corev1.Pod, pool *PoolInfo) bool { + if pool == nil { + return true + } + + podLabels := make(map[string]string, len(pod.GetLabels())) + maps.Copy(podLabels, pod.GetLabels()) + + activePorts := extractActivePorts(pod, pool.TargetPorts) + allExisted := true + + for idx, port := range pool.TargetPorts { + epName := createEndpointName(pod, idx) + if !activePorts.Has(port) { + // Remove endpoint if port is no longer active. + ds.pods.Delete(epName) + continue + } + ep := Endpoint{ + Address: pod.Status.PodIP, + Port: strconv.Itoa(port), + Name: pod.Namespace + "/" + pod.Name, + Labels: podLabels, + } + if _, loaded := ds.pods.LoadOrStore(epName, ep); !loaded { + allExisted = false + } else { + // Update existing endpoint (labels or IP may have changed). + ds.pods.Store(epName, ep) + } + } + return allExisted +} + +func (ds *datastore) PodDelete(podName string) { + ds.pods.Range(func(k, v any) bool { + ep := v.(Endpoint) + // ep.Name is "namespace/name", extract just the name part. + if parts := strings.SplitN(ep.Name, "/", 2); len(parts) == 2 && parts[1] == podName { + ds.pods.Delete(k) + } + return true + }) +} + +func (ds *datastore) podResyncAll(ctx context.Context, reader client.Reader) error { + logger := log.FromContext(ctx) + podList := &corev1.PodList{} + if err := reader.List(ctx, podList, &client.ListOptions{ + LabelSelector: labels.SelectorFromSet(ds.pool.Selector), + Namespace: ds.pool.Namespace, + }); err != nil { + return fmt.Errorf("failed to list pods: %w", err) + } + + activeEndpoints := sets.New[types.NamespacedName]() + for i := range podList.Items { + pod := &podList.Items[i] + if !isPodReady(pod) { + continue + } + for idx := range ds.pool.TargetPorts { + activeEndpoints.Insert(createEndpointName(pod, idx)) + } + if !ds.podUpdateOrAddIfNotExist(ctx, pod, ds.pool) { + logger.Info("Pod added during resync", "pod", pod.Name) + } + } + + // Remove endpoints that are no longer active. + ds.pods.Range(func(k, _ any) bool { + name := k.(types.NamespacedName) + if !activeEndpoints.Has(name) { + logger.Info("Removing stale endpoint", "endpoint", name) + ds.pods.Delete(k) + } + return true + }) + return nil +} + +// isPodReady checks if a pod is ready and not being deleted. +func isPodReady(pod *corev1.Pod) bool { + if !pod.DeletionTimestamp.IsZero() { + return false + } + for _, condition := range pod.Status.Conditions { + if condition.Type == corev1.PodReady { + return condition.Status == corev1.ConditionTrue + } + } + return false +} + +// extractActivePorts returns the set of active ports for a pod. +// If the active-ports annotation is not set, all target ports are considered active. +func extractActivePorts(pod *corev1.Pod, targetPorts []int) sets.Set[int] { + allPorts := sets.New(targetPorts...) + portsAnnotation, ok := pod.GetAnnotations()[activePortsAnnotation] + if !ok { + return allPorts + } + + activePorts := sets.New[int]() + for portStr := range strings.SplitSeq(portsAnnotation, ",") { + var portNum int + _, err := fmt.Sscanf(strings.TrimSpace(portStr), "%d", &portNum) + if err == nil && portNum > 0 && allPorts.Has(portNum) { + activePorts.Insert(portNum) + } + } + return activePorts +} + +// createEndpointName creates a namespaced name for an endpoint based on pod and rank index. +func createEndpointName(pod *corev1.Pod, idx int) types.NamespacedName { + return types.NamespacedName{ + Name: pod.Name + "-rank-" + strconv.Itoa(idx), + Namespace: pod.Namespace, + } +} + +// slicesEqual checks if two int slices are equal. +func slicesEqual(a, b []int) bool { + if len(a) != len(b) { + return false + } + for i := range a { + if a[i] != b[i] { + return false + } + } + return true +} diff --git a/pkg/epp-light/metadata.go b/pkg/epp-light/metadata.go new file mode 100644 index 0000000000..b1362f7acb --- /dev/null +++ b/pkg/epp-light/metadata.go @@ -0,0 +1,30 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package epplight + +// Endpoint Picker Protocol constants per proposal 004. +// See: docs/proposals/004-endpoint-picker-protocol/ +const ( + // SubsetFilterNamespace is the metadata namespace for the candidate endpoint subset. + SubsetFilterNamespace = "envoy.lb.subset_hint" + // SubsetFilterKey is the metadata key for the candidate endpoint subset list. + SubsetFilterKey = "x-gateway-destination-endpoint-subset" + // DestinationEndpointNamespace is the metadata namespace for the selected endpoint. + DestinationEndpointNamespace = "envoy.lb" + // DestinationEndpointKey is the header and metadata key for the selected endpoint. + DestinationEndpointKey = "x-gateway-destination-endpoint" +) diff --git a/pkg/epp-light/picker.go b/pkg/epp-light/picker.go new file mode 100644 index 0000000000..0f713afb61 --- /dev/null +++ b/pkg/epp-light/picker.go @@ -0,0 +1,75 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package epplight provides a minimal, API-focused Endpoint Picker (EPP) implementation +// for the Gateway API Inference Extension. It implements the Endpoint Picker Protocol +// (proposal 004) and watches the InferencePool CRD for pod discovery. +// +// The core abstraction is the EndpointPicker interface, which allows anyone to implement +// their own endpoint selection logic while the package handles all ext-proc protocol details. +package epplight + +import "context" + +// Endpoint represents a backend endpoint available in the InferencePool. +type Endpoint struct { + // Address is the pod IP address. + Address string + // Port is the target port number as a string. + Port string + // Name is the pod name (namespace/name format). + Name string + // Labels are the pod's Kubernetes labels. + Labels map[string]string +} + +// RequestInfo contains information about the incoming request that the picker +// may use to make its selection decision. +type RequestInfo struct { + // Headers from the incoming HTTP request. + Headers map[string]string + // Body is the raw request body bytes. May be nil for header-only requests (e.g., GET). + Body []byte + // Model is the model name extracted from the request body, if present. + Model string + // CandidateSubset is the list of candidate endpoints from the + // x-gateway-destination-endpoint-subset metadata. When non-empty, + // the picker MUST select only from endpoints matching this set. + // When empty, all pool endpoints are candidates. + CandidateSubset []string +} + +// PickResult is the output of endpoint selection. +type PickResult struct { + // Endpoint is the primary selected endpoint in "ip:port" format. + Endpoint string + // Fallbacks is an optional ordered list of fallback endpoints in "ip:port" format. + // If the primary endpoint is unavailable, the proxy may try these in order. + Fallbacks []string +} + +// EndpointPicker is the core interface for endpoint selection. +// Implement this interface to build a custom EPP with your own selection logic. +// +// The server handles all Envoy ext-proc protocol details — subset filtering, +// metadata generation, header/body forwarding — so implementations only need +// to decide which endpoint to route to. +type EndpointPicker interface { + // Pick selects one or more endpoints from the available set. + // The endpoints slice contains the current set of ready endpoints, + // already filtered by CandidateSubset if applicable. + Pick(ctx context.Context, req *RequestInfo, endpoints []Endpoint) (*PickResult, error) +} diff --git a/pkg/epp-light/picker_grpc.go b/pkg/epp-light/picker_grpc.go new file mode 100644 index 0000000000..9fdeb78eac --- /dev/null +++ b/pkg/epp-light/picker_grpc.go @@ -0,0 +1,86 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package epplight + +import ( + "context" + "fmt" + + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + + pb "sigs.k8s.io/gateway-api-inference-extension/pkg/epp-light/proto/gen" +) + +// GRPCPicker implements EndpointPicker by calling a remote gRPC EndpointPickerService. +// This allows non-Go implementations (Rust, Python, C++, etc.) to provide custom +// endpoint selection logic over the network. +type GRPCPicker struct { + client pb.EndpointPickerServiceClient + conn *grpc.ClientConn +} + +// NewGRPCPicker creates a GRPCPicker that connects to the given address. +func NewGRPCPicker(address string) (*GRPCPicker, error) { + conn, err := grpc.NewClient(address, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + return nil, fmt.Errorf("failed to connect to picker service at %s: %w", address, err) + } + return &GRPCPicker{ + client: pb.NewEndpointPickerServiceClient(conn), + conn: conn, + }, nil +} + +// Pick calls the remote EndpointPickerService to select an endpoint. +func (p *GRPCPicker) Pick(ctx context.Context, req *RequestInfo, endpoints []Endpoint) (*PickResult, error) { + protoReq := &pb.PickRequest{ + Headers: req.Headers, + Body: req.Body, + Model: req.Model, + CandidateSubset: req.CandidateSubset, + Endpoints: toProtoEndpoints(endpoints), + } + + resp, err := p.client.Pick(ctx, protoReq) + if err != nil { + return nil, fmt.Errorf("remote picker failed: %w", err) + } + + return &PickResult{ + Endpoint: resp.Endpoint, + Fallbacks: resp.Fallbacks, + }, nil +} + +// Close closes the underlying gRPC connection. +func (p *GRPCPicker) Close() error { + return p.conn.Close() +} + +func toProtoEndpoints(endpoints []Endpoint) []*pb.EndpointInfo { + result := make([]*pb.EndpointInfo, len(endpoints)) + for i, ep := range endpoints { + result[i] = &pb.EndpointInfo{ + Address: ep.Address, + Port: ep.Port, + Name: ep.Name, + Labels: ep.Labels, + } + } + return result +} diff --git a/pkg/epp-light/picker_random.go b/pkg/epp-light/picker_random.go new file mode 100644 index 0000000000..c76bf4c362 --- /dev/null +++ b/pkg/epp-light/picker_random.go @@ -0,0 +1,45 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package epplight + +import ( + "context" + "fmt" + "math/rand" + "net" +) + +// RandomPicker is the default EndpointPicker implementation that selects +// a random endpoint from the available set. It serves as a reference +// implementation for the EndpointPicker interface. +type RandomPicker struct{} + +// NewRandomPicker creates a new RandomPicker. +func NewRandomPicker() *RandomPicker { + return &RandomPicker{} +} + +// Pick selects a random endpoint from the available endpoints. +func (p *RandomPicker) Pick(_ context.Context, _ *RequestInfo, endpoints []Endpoint) (*PickResult, error) { + if len(endpoints) == 0 { + return nil, fmt.Errorf("no endpoints available") + } + ep := endpoints[rand.Intn(len(endpoints))] + return &PickResult{ + Endpoint: net.JoinHostPort(ep.Address, ep.Port), + }, nil +} diff --git a/pkg/epp-light/proto/gen/picker.pb.go b/pkg/epp-light/proto/gen/picker.pb.go new file mode 100644 index 0000000000..54ab2dcfb1 --- /dev/null +++ b/pkg/epp-light/proto/gen/picker.pb.go @@ -0,0 +1,402 @@ +// Copyright 2025 The Kubernetes Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.34.2 +// protoc v3.21.12 +// source: picker.proto + +package gen + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// PickRequest contains all information needed to make an endpoint selection decision. +type PickRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // HTTP request headers. + Headers map[string]string `protobuf:"bytes,1,rep,name=headers,proto3" json:"headers,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` + // Raw HTTP request body bytes. May be empty for header-only requests (e.g., GET). + Body []byte `protobuf:"bytes,2,opt,name=body,proto3" json:"body,omitempty"` + // Model name extracted from the request body, if present. + Model string `protobuf:"bytes,3,opt,name=model,proto3" json:"model,omitempty"` + // Candidate endpoint subset from x-gateway-destination-endpoint-subset metadata. + // When non-empty, the picker MUST select only from endpoints matching this set. + CandidateSubset []string `protobuf:"bytes,4,rep,name=candidate_subset,json=candidateSubset,proto3" json:"candidate_subset,omitempty"` + // Available endpoints in the pool, already filtered by candidate_subset if applicable. + Endpoints []*EndpointInfo `protobuf:"bytes,5,rep,name=endpoints,proto3" json:"endpoints,omitempty"` +} + +func (x *PickRequest) Reset() { + *x = PickRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_picker_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *PickRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*PickRequest) ProtoMessage() {} + +func (x *PickRequest) ProtoReflect() protoreflect.Message { + mi := &file_picker_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use PickRequest.ProtoReflect.Descriptor instead. +func (*PickRequest) Descriptor() ([]byte, []int) { + return file_picker_proto_rawDescGZIP(), []int{0} +} + +func (x *PickRequest) GetHeaders() map[string]string { + if x != nil { + return x.Headers + } + return nil +} + +func (x *PickRequest) GetBody() []byte { + if x != nil { + return x.Body + } + return nil +} + +func (x *PickRequest) GetModel() string { + if x != nil { + return x.Model + } + return "" +} + +func (x *PickRequest) GetCandidateSubset() []string { + if x != nil { + return x.CandidateSubset + } + return nil +} + +func (x *PickRequest) GetEndpoints() []*EndpointInfo { + if x != nil { + return x.Endpoints + } + return nil +} + +// EndpointInfo describes a backend endpoint available in the InferencePool. +type EndpointInfo struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // Pod IP address. + Address string `protobuf:"bytes,1,opt,name=address,proto3" json:"address,omitempty"` + // Target port number as a string. + Port string `protobuf:"bytes,2,opt,name=port,proto3" json:"port,omitempty"` + // Pod name in "namespace/name" format. + Name string `protobuf:"bytes,3,opt,name=name,proto3" json:"name,omitempty"` + // Pod Kubernetes labels. + Labels map[string]string `protobuf:"bytes,4,rep,name=labels,proto3" json:"labels,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` +} + +func (x *EndpointInfo) Reset() { + *x = EndpointInfo{} + if protoimpl.UnsafeEnabled { + mi := &file_picker_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *EndpointInfo) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*EndpointInfo) ProtoMessage() {} + +func (x *EndpointInfo) ProtoReflect() protoreflect.Message { + mi := &file_picker_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use EndpointInfo.ProtoReflect.Descriptor instead. +func (*EndpointInfo) Descriptor() ([]byte, []int) { + return file_picker_proto_rawDescGZIP(), []int{1} +} + +func (x *EndpointInfo) GetAddress() string { + if x != nil { + return x.Address + } + return "" +} + +func (x *EndpointInfo) GetPort() string { + if x != nil { + return x.Port + } + return "" +} + +func (x *EndpointInfo) GetName() string { + if x != nil { + return x.Name + } + return "" +} + +func (x *EndpointInfo) GetLabels() map[string]string { + if x != nil { + return x.Labels + } + return nil +} + +// PickResponse is the result of endpoint selection. +type PickResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // Primary selected endpoint in "ip:port" format. + Endpoint string `protobuf:"bytes,1,opt,name=endpoint,proto3" json:"endpoint,omitempty"` + // Optional fallback endpoints in "ip:port" format, tried in order if primary fails. + Fallbacks []string `protobuf:"bytes,2,rep,name=fallbacks,proto3" json:"fallbacks,omitempty"` +} + +func (x *PickResponse) Reset() { + *x = PickResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_picker_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *PickResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*PickResponse) ProtoMessage() {} + +func (x *PickResponse) ProtoReflect() protoreflect.Message { + mi := &file_picker_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use PickResponse.ProtoReflect.Descriptor instead. +func (*PickResponse) Descriptor() ([]byte, []int) { + return file_picker_proto_rawDescGZIP(), []int{2} +} + +func (x *PickResponse) GetEndpoint() string { + if x != nil { + return x.Endpoint + } + return "" +} + +func (x *PickResponse) GetFallbacks() []string { + if x != nil { + return x.Fallbacks + } + return nil +} + +var File_picker_proto protoreflect.FileDescriptor + +var file_picker_proto_rawDesc = []byte{ + 0x0a, 0x0c, 0x70, 0x69, 0x63, 0x6b, 0x65, 0x72, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x08, + 0x65, 0x70, 0x70, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x22, 0x92, 0x02, 0x0a, 0x0b, 0x50, 0x69, 0x63, + 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x3c, 0x0a, 0x07, 0x68, 0x65, 0x61, 0x64, + 0x65, 0x72, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x22, 0x2e, 0x65, 0x70, 0x70, 0x6c, + 0x69, 0x67, 0x68, 0x74, 0x2e, 0x50, 0x69, 0x63, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x2e, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x07, 0x68, + 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x12, 0x12, 0x0a, 0x04, 0x62, 0x6f, 0x64, 0x79, 0x18, 0x02, + 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x62, 0x6f, 0x64, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x6d, 0x6f, + 0x64, 0x65, 0x6c, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x6d, 0x6f, 0x64, 0x65, 0x6c, + 0x12, 0x29, 0x0a, 0x10, 0x63, 0x61, 0x6e, 0x64, 0x69, 0x64, 0x61, 0x74, 0x65, 0x5f, 0x73, 0x75, + 0x62, 0x73, 0x65, 0x74, 0x18, 0x04, 0x20, 0x03, 0x28, 0x09, 0x52, 0x0f, 0x63, 0x61, 0x6e, 0x64, + 0x69, 0x64, 0x61, 0x74, 0x65, 0x53, 0x75, 0x62, 0x73, 0x65, 0x74, 0x12, 0x34, 0x0a, 0x09, 0x65, + 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x73, 0x18, 0x05, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x16, + 0x2e, 0x65, 0x70, 0x70, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x2e, 0x45, 0x6e, 0x64, 0x70, 0x6f, 0x69, + 0x6e, 0x74, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x09, 0x65, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, + 0x73, 0x1a, 0x3a, 0x0a, 0x0c, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x45, 0x6e, 0x74, 0x72, + 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, + 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0xc7, 0x01, + 0x0a, 0x0c, 0x45, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x18, + 0x0a, 0x07, 0x61, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x07, 0x61, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x12, 0x12, 0x0a, 0x04, 0x70, 0x6f, 0x72, 0x74, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x70, 0x6f, 0x72, 0x74, 0x12, 0x12, 0x0a, 0x04, + 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, + 0x12, 0x3a, 0x0a, 0x06, 0x6c, 0x61, 0x62, 0x65, 0x6c, 0x73, 0x18, 0x04, 0x20, 0x03, 0x28, 0x0b, + 0x32, 0x22, 0x2e, 0x65, 0x70, 0x70, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x2e, 0x45, 0x6e, 0x64, 0x70, + 0x6f, 0x69, 0x6e, 0x74, 0x49, 0x6e, 0x66, 0x6f, 0x2e, 0x4c, 0x61, 0x62, 0x65, 0x6c, 0x73, 0x45, + 0x6e, 0x74, 0x72, 0x79, 0x52, 0x06, 0x6c, 0x61, 0x62, 0x65, 0x6c, 0x73, 0x1a, 0x39, 0x0a, 0x0b, + 0x4c, 0x61, 0x62, 0x65, 0x6c, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, + 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, + 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, + 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0x48, 0x0a, 0x0c, 0x50, 0x69, 0x63, 0x6b, 0x52, + 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1a, 0x0a, 0x08, 0x65, 0x6e, 0x64, 0x70, 0x6f, + 0x69, 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x65, 0x6e, 0x64, 0x70, 0x6f, + 0x69, 0x6e, 0x74, 0x12, 0x1c, 0x0a, 0x09, 0x66, 0x61, 0x6c, 0x6c, 0x62, 0x61, 0x63, 0x6b, 0x73, + 0x18, 0x02, 0x20, 0x03, 0x28, 0x09, 0x52, 0x09, 0x66, 0x61, 0x6c, 0x6c, 0x62, 0x61, 0x63, 0x6b, + 0x73, 0x32, 0x4e, 0x0a, 0x15, 0x45, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x50, 0x69, 0x63, + 0x6b, 0x65, 0x72, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x35, 0x0a, 0x04, 0x50, 0x69, + 0x63, 0x6b, 0x12, 0x15, 0x2e, 0x65, 0x70, 0x70, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x2e, 0x50, 0x69, + 0x63, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x65, 0x70, 0x70, 0x6c, + 0x69, 0x67, 0x68, 0x74, 0x2e, 0x50, 0x69, 0x63, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, + 0x65, 0x42, 0x45, 0x5a, 0x43, 0x73, 0x69, 0x67, 0x73, 0x2e, 0x6b, 0x38, 0x73, 0x2e, 0x69, 0x6f, + 0x2f, 0x67, 0x61, 0x74, 0x65, 0x77, 0x61, 0x79, 0x2d, 0x61, 0x70, 0x69, 0x2d, 0x69, 0x6e, 0x66, + 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x2d, 0x65, 0x78, 0x74, 0x65, 0x6e, 0x73, 0x69, 0x6f, 0x6e, + 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x65, 0x70, 0x70, 0x2d, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x2f, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x67, 0x65, 0x6e, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_picker_proto_rawDescOnce sync.Once + file_picker_proto_rawDescData = file_picker_proto_rawDesc +) + +func file_picker_proto_rawDescGZIP() []byte { + file_picker_proto_rawDescOnce.Do(func() { + file_picker_proto_rawDescData = protoimpl.X.CompressGZIP(file_picker_proto_rawDescData) + }) + return file_picker_proto_rawDescData +} + +var file_picker_proto_msgTypes = make([]protoimpl.MessageInfo, 5) +var file_picker_proto_goTypes = []any{ + (*PickRequest)(nil), // 0: epplight.PickRequest + (*EndpointInfo)(nil), // 1: epplight.EndpointInfo + (*PickResponse)(nil), // 2: epplight.PickResponse + nil, // 3: epplight.PickRequest.HeadersEntry + nil, // 4: epplight.EndpointInfo.LabelsEntry +} +var file_picker_proto_depIdxs = []int32{ + 3, // 0: epplight.PickRequest.headers:type_name -> epplight.PickRequest.HeadersEntry + 1, // 1: epplight.PickRequest.endpoints:type_name -> epplight.EndpointInfo + 4, // 2: epplight.EndpointInfo.labels:type_name -> epplight.EndpointInfo.LabelsEntry + 0, // 3: epplight.EndpointPickerService.Pick:input_type -> epplight.PickRequest + 2, // 4: epplight.EndpointPickerService.Pick:output_type -> epplight.PickResponse + 4, // [4:5] is the sub-list for method output_type + 3, // [3:4] is the sub-list for method input_type + 3, // [3:3] is the sub-list for extension type_name + 3, // [3:3] is the sub-list for extension extendee + 0, // [0:3] is the sub-list for field type_name +} + +func init() { file_picker_proto_init() } +func file_picker_proto_init() { + if File_picker_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_picker_proto_msgTypes[0].Exporter = func(v any, i int) any { + switch v := v.(*PickRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_picker_proto_msgTypes[1].Exporter = func(v any, i int) any { + switch v := v.(*EndpointInfo); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_picker_proto_msgTypes[2].Exporter = func(v any, i int) any { + switch v := v.(*PickResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_picker_proto_rawDesc, + NumEnums: 0, + NumMessages: 5, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_picker_proto_goTypes, + DependencyIndexes: file_picker_proto_depIdxs, + MessageInfos: file_picker_proto_msgTypes, + }.Build() + File_picker_proto = out.File + file_picker_proto_rawDesc = nil + file_picker_proto_goTypes = nil + file_picker_proto_depIdxs = nil +} diff --git a/pkg/epp-light/proto/gen/picker_grpc.pb.go b/pkg/epp-light/proto/gen/picker_grpc.pb.go new file mode 100644 index 0000000000..eb93d229b8 --- /dev/null +++ b/pkg/epp-light/proto/gen/picker_grpc.pb.go @@ -0,0 +1,153 @@ +// Copyright 2025 The Kubernetes Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.5.1 +// - protoc v3.21.12 +// source: picker.proto + +package gen + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.64.0 or later. +const _ = grpc.SupportPackageIsVersion9 + +const ( + EndpointPickerService_Pick_FullMethodName = "/epplight.EndpointPickerService/Pick" +) + +// EndpointPickerServiceClient is the client API for EndpointPickerService service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +// +// EndpointPickerService is the language-agnostic interface for endpoint selection. +// Implement this service in any language (Rust, Python, C++, etc.) to plug custom +// selection logic into the Light EPP protocol layer. +// +// The Go Light EPP handles all Envoy ext-proc protocol details, InferencePool CRD +// discovery, and pod lifecycle management. This service only needs to decide which +// endpoint to route to. +type EndpointPickerServiceClient interface { + // Pick selects one or more endpoints from the available set for a given request. + Pick(ctx context.Context, in *PickRequest, opts ...grpc.CallOption) (*PickResponse, error) +} + +type endpointPickerServiceClient struct { + cc grpc.ClientConnInterface +} + +func NewEndpointPickerServiceClient(cc grpc.ClientConnInterface) EndpointPickerServiceClient { + return &endpointPickerServiceClient{cc} +} + +func (c *endpointPickerServiceClient) Pick(ctx context.Context, in *PickRequest, opts ...grpc.CallOption) (*PickResponse, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(PickResponse) + err := c.cc.Invoke(ctx, EndpointPickerService_Pick_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + +// EndpointPickerServiceServer is the server API for EndpointPickerService service. +// All implementations must embed UnimplementedEndpointPickerServiceServer +// for forward compatibility. +// +// EndpointPickerService is the language-agnostic interface for endpoint selection. +// Implement this service in any language (Rust, Python, C++, etc.) to plug custom +// selection logic into the Light EPP protocol layer. +// +// The Go Light EPP handles all Envoy ext-proc protocol details, InferencePool CRD +// discovery, and pod lifecycle management. This service only needs to decide which +// endpoint to route to. +type EndpointPickerServiceServer interface { + // Pick selects one or more endpoints from the available set for a given request. + Pick(context.Context, *PickRequest) (*PickResponse, error) + mustEmbedUnimplementedEndpointPickerServiceServer() +} + +// UnimplementedEndpointPickerServiceServer must be embedded to have +// forward compatible implementations. +// +// NOTE: this should be embedded by value instead of pointer to avoid a nil +// pointer dereference when methods are called. +type UnimplementedEndpointPickerServiceServer struct{} + +func (UnimplementedEndpointPickerServiceServer) Pick(context.Context, *PickRequest) (*PickResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method Pick not implemented") +} +func (UnimplementedEndpointPickerServiceServer) mustEmbedUnimplementedEndpointPickerServiceServer() {} +func (UnimplementedEndpointPickerServiceServer) testEmbeddedByValue() {} + +// UnsafeEndpointPickerServiceServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to EndpointPickerServiceServer will +// result in compilation errors. +type UnsafeEndpointPickerServiceServer interface { + mustEmbedUnimplementedEndpointPickerServiceServer() +} + +func RegisterEndpointPickerServiceServer(s grpc.ServiceRegistrar, srv EndpointPickerServiceServer) { + // If the following call pancis, it indicates UnimplementedEndpointPickerServiceServer was + // embedded by pointer and is nil. This will cause panics if an + // unimplemented method is ever invoked, so we test this at initialization + // time to prevent it from happening at runtime later due to I/O. + if t, ok := srv.(interface{ testEmbeddedByValue() }); ok { + t.testEmbeddedByValue() + } + s.RegisterService(&EndpointPickerService_ServiceDesc, srv) +} + +func _EndpointPickerService_Pick_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(PickRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(EndpointPickerServiceServer).Pick(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: EndpointPickerService_Pick_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(EndpointPickerServiceServer).Pick(ctx, req.(*PickRequest)) + } + return interceptor(ctx, in, info, handler) +} + +// EndpointPickerService_ServiceDesc is the grpc.ServiceDesc for EndpointPickerService service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var EndpointPickerService_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "epplight.EndpointPickerService", + HandlerType: (*EndpointPickerServiceServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "Pick", + Handler: _EndpointPickerService_Pick_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "picker.proto", +} diff --git a/pkg/epp-light/proto/picker.proto b/pkg/epp-light/proto/picker.proto new file mode 100644 index 0000000000..00223a41e4 --- /dev/null +++ b/pkg/epp-light/proto/picker.proto @@ -0,0 +1,74 @@ +// Copyright 2025 The Kubernetes Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package epplight; + +option go_package = "sigs.k8s.io/gateway-api-inference-extension/pkg/epp-light/proto/gen"; + +// EndpointPickerService is the language-agnostic interface for endpoint selection. +// Implement this service in any language (Rust, Python, C++, etc.) to plug custom +// selection logic into the Light EPP protocol layer. +// +// The Go Light EPP handles all Envoy ext-proc protocol details, InferencePool CRD +// discovery, and pod lifecycle management. This service only needs to decide which +// endpoint to route to. +service EndpointPickerService { + // Pick selects one or more endpoints from the available set for a given request. + rpc Pick(PickRequest) returns (PickResponse); +} + +// PickRequest contains all information needed to make an endpoint selection decision. +message PickRequest { + // HTTP request headers. + map headers = 1; + + // Raw HTTP request body bytes. May be empty for header-only requests (e.g., GET). + bytes body = 2; + + // Model name extracted from the request body, if present. + string model = 3; + + // Candidate endpoint subset from x-gateway-destination-endpoint-subset metadata. + // When non-empty, the picker MUST select only from endpoints matching this set. + repeated string candidate_subset = 4; + + // Available endpoints in the pool, already filtered by candidate_subset if applicable. + repeated EndpointInfo endpoints = 5; +} + +// EndpointInfo describes a backend endpoint available in the InferencePool. +message EndpointInfo { + // Pod IP address. + string address = 1; + + // Target port number as a string. + string port = 2; + + // Pod name in "namespace/name" format. + string name = 3; + + // Pod Kubernetes labels. + map labels = 4; +} + +// PickResponse is the result of endpoint selection. +message PickResponse { + // Primary selected endpoint in "ip:port" format. + string endpoint = 1; + + // Optional fallback endpoints in "ip:port" format, tried in order if primary fails. + repeated string fallbacks = 2; +} diff --git a/pkg/epp-light/request.go b/pkg/epp-light/request.go new file mode 100644 index 0000000000..2896bca43a --- /dev/null +++ b/pkg/epp-light/request.go @@ -0,0 +1,195 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package epplight + +import ( + "context" + "encoding/json" + "net" + "strconv" + "strings" + + configPb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" + extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/propagation" + "google.golang.org/protobuf/types/known/structpb" + "k8s.io/apimachinery/pkg/util/sets" + + envoy "sigs.k8s.io/gateway-api-inference-extension/pkg/common/envoy" +) + +// systemOwnedHeaders are headers managed by the EPP or proxy layer that should +// not be forwarded to the backend. +var systemOwnedHeaders = sets.New( + strings.ToLower(DestinationEndpointKey), + strings.ToLower(SubsetFilterKey), + "content-length", +) + +func isSystemOwnedHeader(key string) bool { + return systemOwnedHeaders.Has(strings.ToLower(key)) +} + +func (s *StreamingServer) handleRequestHeaders(reqCtx *requestContext, req *extProcPb.ProcessingRequest_RequestHeaders) { + for _, header := range req.RequestHeaders.Headers.Headers { + reqCtx.request.Headers[header.Key] = envoy.GetHeaderValue(header) + } +} + +func (s *StreamingServer) generateRequestHeaderResponse(ctx context.Context, reqCtx *requestContext) *extProcPb.ProcessingResponse { + dynamicMetadata := generateDestinationMetadata(reqCtx.targetEndpoint) + return &extProcPb.ProcessingResponse{ + Response: &extProcPb.ProcessingResponse_RequestHeaders{ + RequestHeaders: &extProcPb.HeadersResponse{ + Response: &extProcPb.CommonResponse{ + ClearRouteCache: true, + HeaderMutation: &extProcPb.HeaderMutation{ + SetHeaders: generateRequestHeaders(ctx, reqCtx), + }, + }, + }, + }, + DynamicMetadata: dynamicMetadata, + } +} + +func generateRequestHeaders(ctx context.Context, reqCtx *requestContext) []*configPb.HeaderValueOption { + headers := []*configPb.HeaderValueOption{ + { + Header: &configPb.HeaderValue{ + Key: DestinationEndpointKey, + RawValue: []byte(reqCtx.targetEndpoint), + }, + }, + } + + if reqCtx.requestSize > 0 { + headers = append(headers, &configPb.HeaderValueOption{ + Header: &configPb.HeaderValue{ + Key: "Content-Length", + RawValue: []byte(strconv.Itoa(reqCtx.requestSize)), + }, + }) + } + + // Propagate trace context headers. + traceHeaders := make(map[string]string) + propagator := otel.GetTextMapPropagator() + propagator.Inject(ctx, propagation.MapCarrier(traceHeaders)) + for key, value := range traceHeaders { + headers = append(headers, &configPb.HeaderValueOption{ + Header: &configPb.HeaderValue{ + Key: key, + RawValue: []byte(value), + }, + }) + } + + // Forward non-system-owned headers. + for key, value := range reqCtx.request.Headers { + if isSystemOwnedHeader(key) { + continue + } + headers = append(headers, &configPb.HeaderValueOption{ + Header: &configPb.HeaderValue{ + Key: key, + RawValue: []byte(value), + }, + }) + } + return headers +} + +// generateDestinationMetadata creates the envoy.lb dynamic metadata with the selected endpoint. +func generateDestinationMetadata(endpoint string) *structpb.Struct { + return &structpb.Struct{ + Fields: map[string]*structpb.Value{ + DestinationEndpointNamespace: { + Kind: &structpb.Value_StructValue{ + StructValue: &structpb.Struct{ + Fields: map[string]*structpb.Value{ + DestinationEndpointKey: { + Kind: &structpb.Value_StringValue{ + StringValue: endpoint, + }, + }, + }, + }, + }, + }, + }, + } +} + +// extractModelFromBody extracts the "model" field from a JSON request body. +// Returns empty string if the body is not JSON or has no "model" field. +func extractModelFromBody(body []byte) string { + var bodyMap map[string]any + if err := json.Unmarshal(body, &bodyMap); err != nil { + return "" + } + model, _ := bodyMap["model"].(string) + return model +} + +// extractCandidateSubset extracts the candidate endpoint subset from the request metadata. +// Returns nil if no subset filter is present. +func extractCandidateSubset(metadata map[string]any) []string { + if metadata == nil { + return nil + } + subsetMap, ok := metadata[SubsetFilterNamespace].(map[string]any) + if !ok { + return nil + } + endpointList, ok := subsetMap[SubsetFilterKey].([]any) + if !ok { + return nil + } + result := make([]string, 0, len(endpointList)) + for _, ep := range endpointList { + if s, ok := ep.(string); ok { + result = append(result, s) + } + } + return result +} + +// filterEndpointsBySubset filters endpoints to only those matching the candidate subset. +// The subset contains entries in "ip:port" format; we match by IP address. +func filterEndpointsBySubset(endpoints []Endpoint, subset []string) []Endpoint { + if len(subset) == 0 { + return nil + } + allowedIPs := sets.New[string]() + for _, ep := range subset { + host, _, err := net.SplitHostPort(ep) + if err != nil { + allowedIPs.Insert(ep) // Treat as bare IP if not ip:port. + } else { + allowedIPs.Insert(host) + } + } + var filtered []Endpoint + for _, ep := range endpoints { + if allowedIPs.Has(ep.Address) { + filtered = append(filtered, ep) + } + } + return filtered +} diff --git a/pkg/epp-light/response.go b/pkg/epp-light/response.go new file mode 100644 index 0000000000..07095c18c9 --- /dev/null +++ b/pkg/epp-light/response.go @@ -0,0 +1,54 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package epplight + +import ( + extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" + + envoy "sigs.k8s.io/gateway-api-inference-extension/pkg/common/envoy" +) + +func (s *StreamingServer) handleResponseHeaders(reqCtx *requestContext, resp *extProcPb.ProcessingRequest_ResponseHeaders) { + for _, header := range resp.ResponseHeaders.Headers.Headers { + reqCtx.response.Headers[header.Key] = envoy.GetHeaderValue(header) + } +} + +func generateResponseHeaderResponse() *extProcPb.ProcessingResponse { + return &extProcPb.ProcessingResponse{ + Response: &extProcPb.ProcessingResponse_ResponseHeaders{ + ResponseHeaders: &extProcPb.HeadersResponse{ + Response: &extProcPb.CommonResponse{}, + }, + }, + } +} + +func generateResponseBodyResponses(bodyBytes []byte, setEoS bool) []*extProcPb.ProcessingResponse { + commonResponses := envoy.BuildChunkedBodyResponses(bodyBytes, setEoS) + responses := make([]*extProcPb.ProcessingResponse, 0, len(commonResponses)) + for _, commonResp := range commonResponses { + responses = append(responses, &extProcPb.ProcessingResponse{ + Response: &extProcPb.ProcessingResponse_ResponseBody{ + ResponseBody: &extProcPb.BodyResponse{ + Response: commonResp, + }, + }, + }) + } + return responses +} diff --git a/pkg/epp-light/server.go b/pkg/epp-light/server.go new file mode 100644 index 0000000000..6eec7e69c8 --- /dev/null +++ b/pkg/epp-light/server.go @@ -0,0 +1,334 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package epplight + +import ( + "context" + "io" + "math/rand" + "net" + "strings" + "time" + + extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" + "github.com/go-logr/logr" + "github.com/google/uuid" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "sigs.k8s.io/controller-runtime/pkg/log" + + envoy "sigs.k8s.io/gateway-api-inference-extension/pkg/common/envoy" + errcommon "sigs.k8s.io/gateway-api-inference-extension/pkg/common/error" + reqcommon "sigs.k8s.io/gateway-api-inference-extension/pkg/common/request" +) + +// NewStreamingServer creates a new ext-proc streaming server with the given datastore and endpoint picker. +func NewStreamingServer(datastore Datastore, picker EndpointPicker) *StreamingServer { + return &StreamingServer{ + datastore: datastore, + picker: picker, + } +} + +// StreamingServer implements the Envoy ext-proc server for the light EPP. +type StreamingServer struct { + datastore Datastore + picker EndpointPicker +} + +// requestContext stores state during the lifetime of an HTTP request. +type requestContext struct { + targetEndpoint string + requestSize int + requestState streamRequestState + requestReceivedTimestamp time.Time + modelServerStreaming bool + + request *request + response *response + + reqHeaderResp *extProcPb.ProcessingResponse + reqBodyResp []*extProcPb.ProcessingResponse + respHeaderResp *extProcPb.ProcessingResponse + respBodyResp []*extProcPb.ProcessingResponse + respTrailerResp *extProcPb.ProcessingResponse + + responseComplete bool +} + +type request struct { + Headers map[string]string + RawBody []byte + Metadata map[string]any +} + +type response struct { + Headers map[string]string +} + +type streamRequestState int + +const ( + stateRequestReceived streamRequestState = 0 + stateHeaderRequestResponseComplete streamRequestState = 1 + stateBodyRequestResponsesComplete streamRequestState = 2 + stateResponseReceived streamRequestState = 4 + stateHeaderResponseResponseComplete streamRequestState = 5 + stateBodyResponseResponsesComplete streamRequestState = 6 +) + +// Process implements the ext-proc bidirectional streaming RPC. +func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer) error { + ctx := srv.Context() + logger := log.FromContext(ctx) + + reqCtx := &requestContext{ + requestState: stateRequestReceived, + request: &request{ + Headers: make(map[string]string), + Metadata: make(map[string]any), + }, + response: &response{ + Headers: make(map[string]string), + }, + } + + var body []byte + var err error + + for { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + req, recvErr := srv.Recv() + if recvErr == io.EOF || status.Code(recvErr) == codes.Canceled { + return nil + } + if recvErr != nil { + return status.Errorf(codes.Unknown, "cannot receive stream request: %v", recvErr) + } + + reqCtx.request.Metadata = envoy.ExtractMetadataValues(req) + + switch v := req.Request.(type) { + case *extProcPb.ProcessingRequest_RequestHeaders: + requestID := envoy.ExtractHeaderValue(v, reqcommon.RequestIdHeaderKey) + if len(requestID) == 0 { + requestID = uuid.NewString() + reqCtx.request.Headers[reqcommon.RequestIdHeaderKey] = requestID + } + logger = logger.WithValues(reqcommon.RequestIdHeaderKey, requestID) + logger.Info("Light EPP received request") + ctx = log.IntoContext(ctx, logger) + + reqCtx.requestReceivedTimestamp = time.Now() + s.handleRequestHeaders(reqCtx, v) + + // If EoS in headers, this is a header-only request (e.g. GET). + // Route to a random endpoint. + if v.RequestHeaders.EndOfStream { + ep := s.getRandomEndpoint() + if ep == nil { + err = errcommon.Error{Code: errcommon.Internal, Msg: "no pods available in datastore"} + break + } + reqCtx.targetEndpoint = net.JoinHostPort(ep.Address, ep.Port) + reqCtx.reqHeaderResp = s.generateRequestHeaderResponse(ctx, reqCtx) + } + + case *extProcPb.ProcessingRequest_RequestBody: + body = append(body, v.RequestBody.Body...) + if v.RequestBody.EndOfStream { + reqCtx.request.RawBody = body + reqCtx.requestSize = len(body) + body = nil + + err = s.handleRequestBody(ctx, reqCtx) + } + + case *extProcPb.ProcessingRequest_RequestTrailers: + // Unused. + + case *extProcPb.ProcessingRequest_ResponseHeaders: + for _, header := range v.ResponseHeaders.Headers.GetHeaders() { + if header.Key == "content-type" && strings.Contains(string(header.RawValue), "text/event-stream") { + reqCtx.modelServerStreaming = true + } + } + reqCtx.requestState = stateResponseReceived + s.handleResponseHeaders(reqCtx, v) + reqCtx.respHeaderResp = generateResponseHeaderResponse() + + case *extProcPb.ProcessingRequest_ResponseBody: + endOfStream := v.ResponseBody.EndOfStream + chunk := v.ResponseBody.Body + + if endOfStream { + reqCtx.responseComplete = true + } + + if reqCtx.modelServerStreaming { + reqCtx.respBodyResp = generateResponseBodyResponses(chunk, endOfStream) + } else { + body = append(body, chunk...) + if endOfStream { + reqCtx.responseComplete = true + reqCtx.respBodyResp = generateResponseBodyResponses(body, true) + body = nil + } + } + + case *extProcPb.ProcessingRequest_ResponseTrailers: + if !reqCtx.responseComplete { + reqCtx.responseComplete = true + reqCtx.respBodyResp = generateResponseBodyResponses(body, false) + body = nil + } + reqCtx.respTrailerResp = &extProcPb.ProcessingResponse{ + Response: &extProcPb.ProcessingResponse_ResponseTrailers{ + ResponseTrailers: &extProcPb.TrailersResponse{}, + }, + } + } + + if err != nil { + logger.Error(err, "Failed to process request") + resp, buildErr := errcommon.BuildErrResponse(err) + if buildErr != nil { + return buildErr + } + if sendErr := srv.Send(resp); sendErr != nil { + return status.Errorf(codes.Unknown, "failed to send error response: %v", sendErr) + } + return nil + } + + if sendErr := reqCtx.updateStateAndSendIfNeeded(srv, logger); sendErr != nil { + return sendErr + } + } +} + +// handleRequestBody processes the full request body: extracts model, resolves +// candidate subset, filters endpoints, and calls the picker. +func (s *StreamingServer) handleRequestBody(ctx context.Context, reqCtx *requestContext) error { + logger := log.FromContext(ctx) + + model := extractModelFromBody(reqCtx.request.RawBody) + candidateSubset := extractCandidateSubset(reqCtx.request.Metadata) + + endpoints := s.datastore.ListEndpoints() + if candidateSubset != nil { + endpoints = filterEndpointsBySubset(endpoints, candidateSubset) + if len(endpoints) == 0 { + return errcommon.Error{ + Code: errcommon.ServiceUnavailable, + Msg: "no endpoints available matching candidate subset", + } + } + } + + if len(endpoints) == 0 { + return errcommon.Error{ + Code: errcommon.ServiceUnavailable, + Msg: "no endpoints available in pool", + } + } + + reqInfo := &RequestInfo{ + Headers: reqCtx.request.Headers, + Body: reqCtx.request.RawBody, + Model: model, + CandidateSubset: candidateSubset, + } + + result, err := s.picker.Pick(ctx, reqInfo, endpoints) + if err != nil { + return errcommon.Error{ + Code: errcommon.ServiceUnavailable, + Msg: "endpoint picker failed: " + err.Error(), + } + } + + // Build the target endpoint string (primary + fallbacks). + targetEndpoints := []string{result.Endpoint} + targetEndpoints = append(targetEndpoints, result.Fallbacks...) + reqCtx.targetEndpoint = strings.Join(targetEndpoints, ",") + + logger.Info("Request handled", "model", model, "endpoint", reqCtx.targetEndpoint) + + reqCtx.reqHeaderResp = s.generateRequestHeaderResponse(ctx, reqCtx) + reqCtx.reqBodyResp = envoy.GenerateRequestBodyResponses(reqCtx.request.RawBody) + + return nil +} + +func (s *StreamingServer) getRandomEndpoint() *Endpoint { + endpoints := s.datastore.ListEndpoints() + if len(endpoints) == 0 { + return nil + } + ep := endpoints[rand.Intn(len(endpoints))] + return &ep +} + +// updateStateAndSendIfNeeded sends pending responses in the correct ext-proc order. +func (r *requestContext) updateStateAndSendIfNeeded(srv extProcPb.ExternalProcessor_ProcessServer, logger logr.Logger) error { + if r.requestState == stateRequestReceived && r.reqHeaderResp != nil { + if err := srv.Send(r.reqHeaderResp); err != nil { + return status.Errorf(codes.Unknown, "failed to send request header response: %v", err) + } + r.requestState = stateHeaderRequestResponseComplete + } + if r.requestState == stateHeaderRequestResponseComplete && len(r.reqBodyResp) > 0 { + for _, resp := range r.reqBodyResp { + if err := srv.Send(resp); err != nil { + return status.Errorf(codes.Unknown, "failed to send request body response: %v", err) + } + } + logger.Info("Light EPP sent request body response(s) to proxy") + r.requestState = stateBodyRequestResponsesComplete + r.reqBodyResp = nil + } + if r.requestState == stateResponseReceived && r.respHeaderResp != nil { + if err := srv.Send(r.respHeaderResp); err != nil { + return status.Errorf(codes.Unknown, "failed to send response header response: %v", err) + } + r.requestState = stateHeaderResponseResponseComplete + } + if r.requestState == stateHeaderResponseResponseComplete { + for _, resp := range r.respBodyResp { + if err := srv.Send(resp); err != nil { + return status.Errorf(codes.Unknown, "failed to send response body response: %v", err) + } + } + if r.responseComplete { + logger.Info("Light EPP sent response body back to proxy") + r.requestState = stateBodyResponseResponsesComplete + } + r.respBodyResp = nil + } + if r.requestState == stateBodyResponseResponsesComplete && r.respTrailerResp != nil { + if err := srv.Send(r.respTrailerResp); err != nil { + return status.Errorf(codes.Unknown, "failed to send response trailer: %v", err) + } + } + return nil +} diff --git a/pkg/epp-light/server/options.go b/pkg/epp-light/server/options.go new file mode 100644 index 0000000000..8e994a2207 --- /dev/null +++ b/pkg/epp-light/server/options.go @@ -0,0 +1,77 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package server + +import ( + "errors" + + "github.com/spf13/pflag" +) + +const ( + DefaultGRPCPort = 9002 + DefaultPoolNamespace = "default" +) + +// Options contains the minimal CLI configuration for the light EPP. +type Options struct { + GRPCPort int + PoolNamespace string + PoolName string + SecureServing bool + CertPath string + // PickerAddress is the address of a remote gRPC EndpointPickerService. + // When set, the light EPP delegates endpoint selection to this remote service + // instead of using the local Go picker. This enables non-Go implementations + // (Rust, Python, etc.) to provide custom selection logic in a separate process. + PickerAddress string +} + +// NewOptions returns Options with default values. +func NewOptions() *Options { + return &Options{ + GRPCPort: DefaultGRPCPort, + SecureServing: false, + } +} + +// AddFlags registers CLI flags for the light EPP options. +func (o *Options) AddFlags(fs *pflag.FlagSet) { + if fs == nil { + fs = pflag.CommandLine + } + fs.IntVar(&o.GRPCPort, "grpc-port", o.GRPCPort, "gRPC port for ext-proc communication with the proxy.") + fs.StringVar(&o.PoolNamespace, "pool-namespace", o.PoolNamespace, "Namespace of the InferencePool.") + fs.StringVar(&o.PoolName, "pool-name", o.PoolName, "Name of the InferencePool.") + fs.BoolVar(&o.SecureServing, "secure-serving", o.SecureServing, "Enables TLS for the gRPC server.") + fs.StringVar(&o.CertPath, "cert-path", o.CertPath, "Path to TLS certificate (tls.crt and tls.key).") + fs.StringVar(&o.PickerAddress, "picker-address", o.PickerAddress, + "Address of a remote gRPC EndpointPickerService (e.g., localhost:9010). "+ + "When set, endpoint selection is delegated to this service. "+ + "When unset, the in-process picker is used.") +} + +// Validate checks that required options are set. +func (o *Options) Validate() error { + if o.PoolName == "" { + return errors.New("--pool-name is required") + } + if o.PoolNamespace == "" { + o.PoolNamespace = DefaultPoolNamespace + } + return nil +} diff --git a/pkg/epp-light/server/runner.go b/pkg/epp-light/server/runner.go new file mode 100644 index 0000000000..698fbba4fb --- /dev/null +++ b/pkg/epp-light/server/runner.go @@ -0,0 +1,108 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package server + +import ( + "context" + "crypto/tls" + "fmt" + + extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" + "github.com/go-logr/logr" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/health" + healthgrpc "google.golang.org/grpc/health/grpc_health_v1" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/manager" + + "sigs.k8s.io/gateway-api-inference-extension/internal/runnable" + tlsutil "sigs.k8s.io/gateway-api-inference-extension/internal/tls" + epplight "sigs.k8s.io/gateway-api-inference-extension/pkg/epp-light" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp-light/controller" +) + +// ExtProcServerRunner manages the lifecycle of the light EPP ext-proc server. +type ExtProcServerRunner struct { + GRPCPort int + PoolNamespace string + PoolName string + Datastore epplight.Datastore + Picker epplight.EndpointPicker + SecureServing bool + CertPath string +} + +// SetupWithManager registers the InferencePool and Pod reconcilers with the controller manager. +func (r *ExtProcServerRunner) SetupWithManager(mgr ctrl.Manager) error { + if err := (&controller.InferencePoolReconciler{ + Reader: mgr.GetClient(), + Datastore: r.Datastore, + PoolNamespace: r.PoolNamespace, + PoolName: r.PoolName, + }).SetupWithManager(mgr); err != nil { + return fmt.Errorf("failed setting up InferencePoolReconciler: %w", err) + } + + if err := (&controller.PodReconciler{ + Reader: mgr.GetClient(), + Datastore: r.Datastore, + }).SetupWithManager(mgr); err != nil { + return fmt.Errorf("failed setting up PodReconciler: %w", err) + } + + return nil +} + +// AsRunnable returns a manager.Runnable that starts the ext-proc gRPC server. +func (r *ExtProcServerRunner) AsRunnable(logger logr.Logger) manager.Runnable { + return runnable.NoLeaderElection(manager.RunnableFunc(func(ctx context.Context) error { + var srv *grpc.Server + if r.SecureServing { + cert, err := r.loadOrCreateCert(logger) + if err != nil { + return err + } + creds := credentials.NewTLS(&tls.Config{ + Certificates: []tls.Certificate{cert}, + NextProtos: []string{"h2"}, + }) + srv = grpc.NewServer(grpc.Creds(creds)) + } else { + srv = grpc.NewServer() + } + + extProcServer := epplight.NewStreamingServer(r.Datastore, r.Picker) + extProcPb.RegisterExternalProcessorServer(srv, extProcServer) + + // Register health check service. + healthcheck := health.NewServer() + healthgrpc.RegisterHealthServer(srv, healthcheck) + svcName := extProcPb.ExternalProcessor_ServiceDesc.ServiceName + logger.Info("Setting ExternalProcessor service status to SERVING", "serviceName", svcName) + healthcheck.SetServingStatus(svcName, healthgrpc.HealthCheckResponse_SERVING) + + return runnable.GRPCServer("epp-light", srv, r.GRPCPort).Start(ctx) + })) +} + +func (r *ExtProcServerRunner) loadOrCreateCert(logger logr.Logger) (tls.Certificate, error) { + if r.CertPath != "" { + return tls.LoadX509KeyPair(r.CertPath+"/tls.crt", r.CertPath+"/tls.key") + } + return tlsutil.CreateSelfSignedTLSCertificate(logger) +}