diff --git a/Dockerfile b/Dockerfile index cce7802..4dba6f6 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,11 +1,7 @@ +# Mr. Fluence! # Multi-stage build for the fluence scheduler. -# # The scheduler binary cgo-links flux-sched (Fluxion) for resource matching. -# It does NOT depend on QRMI — quantum job submission is a separate workload -# (github.com/converged-computing/qrmi-sampler). So this image needs only -# flux-sched, no Rust/QRMI. Mirrors the .devcontainer build. -# ---------- builder ---------- FROM fluxrm/flux-core:noble AS builder USER root @@ -37,7 +33,9 @@ COPY . . RUN CGO_ENABLED=1 \ CGO_CFLAGS="-I/opt/flux-sched" \ CGO_LDFLAGS="-L/opt/flux-sched/resource -L/opt/flux-sched/resource/libjobspec -L/opt/flux-sched/resource/reapi/bindings -lresource -ljobspec_conv -lreapi_cli -lflux-idset -lstdc++ -lczmq -ljansson -lhwloc -lboost_system -lflux-hostlist -lboost_graph -lyaml-cpp" \ - go build -ldflags '-w' -o /bin/fluence ./cmd/fluence + go build -ldflags '-w' -o /bin/fluence ./cmd/fluence && \ + CGO_ENABLED=0 go build -ldflags '-w' -o /bin/fluence-deviceplugin ./cmd/deviceplugin && \ + CGO_ENABLED=0 go build -ldflags '-w' -o /bin/fluence-webhook ./cmd/webhook FROM fluxrm/flux-core:noble AS runtime @@ -55,4 +53,6 @@ COPY --from=builder /usr/lib/libjobspec_conv.so* /usr/lib/ RUN ldconfig COPY --from=builder /bin/fluence /bin/fluence -ENTRYPOINT ["/bin/fluence"] \ No newline at end of file +COPY --from=builder /bin/fluence-deviceplugin /bin/fluence-deviceplugin +COPY --from=builder /bin/fluence-webhook /bin/fluence-webhook +ENTRYPOINT ["/bin/fluence"] diff --git a/Makefile b/Makefile index 2fde646..5d9328a 100644 --- a/Makefile +++ b/Makefile @@ -18,13 +18,16 @@ CGO_LDFLAGS = -L$(FLUX_SCHED_ROOT)/resource \ -lflux-hostlist -lboost_graph -lyaml-cpp .PHONY: build -build: ## Build the fluence scheduler binary (needs flux-sched) +build: ## Build all binaries (scheduler needs flux-sched; helpers are pure Go) CGO_ENABLED=1 CGO_CFLAGS="$(CGO_CFLAGS)" CGO_LDFLAGS="$(CGO_LDFLAGS)" \ go build -o bin/fluence ./cmd/fluence + CGO_ENABLED=0 go build -o bin/fluence-deviceplugin ./cmd/deviceplugin + CGO_ENABLED=0 go build -o bin/fluence-webhook ./cmd/webhook .PHONY: test test: ## Pure-Go unit tests (no flux, no k8s scheduler libs, no cluster) - go test ./pkg/jgf/... ./pkg/cluster/... ./pkg/jobspec/... ./pkg/placement/... ./pkg/quantum/... + go test ./pkg/jgf/... ./pkg/cluster/... ./pkg/jobspec/... ./pkg/placement/... \ + ./pkg/quantum/... ./pkg/webhook/... ./pkg/deviceplugin/... .PHONY: test-graph test-graph: ## Matcher tests (needs flux-sched) diff --git a/README.md b/README.md index 8b7ca43..eafb62f 100644 --- a/README.md +++ b/README.md @@ -4,119 +4,222 @@ A Kubernetes scheduler plugin that places **pod groups** (and individual pods) by matching them against a [Fluxion](https://github.com/flux-framework/flux-sched) -(flux-sched) resource graph built from the live cluster. +(flux-sched) resource graph built from the live cluster. This is an update from [flux-k8s](https://github.com/flux-framework/flux-k8s) that uses the native PodGroup and optionally allows for scheduling -against **quantum resources** modeled in the same graph. I am also improving -the design by not requiring a sidecar for fluence - the plugin is built as one -container. +against arbitrary resources such as **quantum resources** modeled in the same graph. +I am also improving the design by not requiring a sidecar for fluence, and not +requiring the `kubernetes-sigs/scheduler-plugins` dependency. We use native Gang +scheduling provided by Kubernetes. For quantum resource modeling, we start from the prototype proven out in -[fluxion-quantum](https://github.com/converged-computing/fluxion-quantum). -This design is an improvement upon the initial fluence because we drop -the `kubernetes-sigs/scheduler-plugins` dependency and use Kubernetes -**native gang scheduling** (the `PodGroup` API, `scheduling.k8s.io/v1alpha2`, -alpha in 1.35/1.36). +[fluxion-quantum](https://github.com/converged-computing/fluxion-quantum). ## How it works +### Gang Scheduling + Gang semantics (all-or-nothing) come from the native `PodGroup` API. Fluence is responsible only for **placement**: 1. **Discover** — on startup fluence lists cluster nodes and turns their cpu/memory/gpu capacity into a Fluxion JGF resource graph - (`pkg/cluster` + `pkg/jgf`). Quantum backends from a config file are injected - as `qpu` vertices under a `qgateway` (`AddQuantum`). + (`pkg/cluster` + `pkg/jgf`). If a resources config is provided (via + `FLUENCE_RESOURCES`), its entries (e.g. quantum backends) are injected as + `qpu`/`qubit` vertices. With no config the graph is classical-only. 2. **Match** — when the first pod of a group hits `PreFilter`, fluence builds a - Fluxion jobspec for the whole gang (`pkg/fluence.JobspecForGroup`), asks the + Fluxion jobspec for the whole gang (`pkg/placement.JobspecForGroup`), asks the matcher to allocate (`pkg/graph.FluxionGraph.MatchAllocateSpec`), and parses - the allocation into node names (`PlacementFromAllocation`). -3. **Place** — `Filter` then permits each pod only on its allocated node. - -For a **quantum** pod (one that requests `quantum.flux-framework.org/qpu`), the -match allocates a `qpu` vertex instead of cores; the allocated backend name -(e.g. `ibm_fez`) is what the workload submits to via -[qrmi-go](https://github.com/converged-computing/qrmi-go) (job mode on the IBM -open plan — see fluxion-quantum for that story). - -``` -nodes (kubectl get nodes) ─┐ - ├─► JGF resource graph ─► Fluxion match ─► node + backend placement -quantum-backends.yaml ─────┘ + the allocation into node and backend names (`PlacementFromAllocation`). +3. **Place** — `Filter` permits each pod only on its allocated node. (A + quantum-only pod allocates a `qpu` but no node — the backend is a remote API + any node can reach — so fluence imposes no node constraint in that case.) +4. **Hand off** — for a quantum pod, `PreBind` records the allocated backend on + the pod as the `fluence.flux-framework.org/backend` annotation. The mutating + webhook (installed with the base) injects a downward-API env so the container + reads it as `QRMI_BACKEND` with no boilerplate in the manifest. + +### Design Choices + +While Quantum resources are this first target, notably we should be able to support +any arbitrary resource in the graph. I decided that a pod can request a graph resource generically +e.g., `fluxion.flux-framework.org/` (like `.../qpu: "1"`) and that becomes a jobspec count +of ``. To support this, we deploy a **device plugin** that can advertise these virtual +types on every node. We need to do this because of the in-tree `NodeResourcesFit` endpoint. +If we do not have the device plugin, this call will not be satisfied. Note that +this device plugin will return True for any resources it sees added to the Fluxion resource graph, +but is not actually involved with scheduling. Fluxion does the real matching. + +```console +nodes (kubectl get nodes) ──┐ + ├─► JGF resource graph ─► Fluxion match ─► node + backend placement +fluence-resources ConfigMap ┘ ``` +I am also choosing to keep credentials and qrmi interactions on the level of the application. +I am not comfortable with the design of an operator holding any kind of credential or being +responsible for managing calls with qrmi in a multi-tenant environment. Finally, since +there are (and will continue to be) a lot of environment variables that I do not want +to place on the user to define, we have a webhook to handle this. We can combine an annotation +added with the webhook with a PreBind call to define the annotation to orchestrate that. + ## Build -The scheduler binary links flux-sched (the matcher) and, for quantum, QRMI: +The scheduler binary links flux-sched (the matcher). It does **not** link QRMI — +quantum job submission lives in a separate workload container +([qrmi-sampler](https://github.com/converged-computing/qrmi-sampler)), not here. ```bash -# If you want to debug inside the .devcontainer, use this one -make build # needs flux-sched at /opt/flux-sched and QRMI at /usr/local +# Inside the .devcontainer (flux-sched at /opt/flux-sched): +# builds bin/fluence (cgo+flux) + bin/fluence-deviceplugin + bin/fluence-webhook +make build +make test -# If you want to test outside (and build the docker image, this one) +# Or build the container image (all three binaries): make image ``` -Pure-Go pieces (graph builder, discovery, jobspec, placement) need neither and -are covered by: +## Deploy + +Create a development cluster on a Kubernetes release that supports native gang +scheduling, with the feature gates enabled: ```bash -make test +kind create cluster --image kindest/node:v1.36.1 --config deploy/kind-config.yaml ``` -## Deploy +(See [installing kind](https://kind.sigs.k8s.io/docs/user/quick-start#installing-from-release-binaries).) +The kind config turns on the `GangScheduling` and `GenericWorkload` feature gates +and the `scheduling.k8s.io/v1alpha2` API group on the apiserver and scheduler. In +the future these will likely be enabled by default. -Here is how I am creating a development cluster with a release of Kubernetes that will support -what we need: +Load the image (built above) into the cluster: ```bash -kind create cluster --image kindest/node:v1.36.1 --config deploy/kind-config.yaml +kind load docker-image ghcr.io/converged-computing/fluence:latest ``` -And if you [need to install kind](https://kind.sigs.k8s.io/docs/user/quick-start#installing-from-release-binaries). +### 1. Gang Scheduling +Install the **base** scheduler (this is all you need for classical scheduling — +no device plugin, no quantum): ```bash -# This creates the quantum backends yaml graph -kubectl create configmap fluence-quantum-backends --from-file=quantum-backends.yaml=config/quantum-backends.yaml -n kube-system +kubectl apply -f deploy/fluence.yaml +``` -# load docker image -kind load docker-image ghcr.io/converged-computing/fluence +This installs the scheduler, its RBAC, and the mutating webhook. Pods opt in with +`schedulerName: fluence`; a multi-pod gang adds a `scheduling.k8s.io/pod-group` +label (a single pod is treated as a group of one and needs no label). -kubectl apply -f deploy/fluence.yaml # RBAC + scheduler in kube-system -kubectl apply -f examples/podgroup.yaml # a gang scheduled by fluence -``` +## Testing + +### 1. Classical (a pod group) -This works by enabling the native gang feature on the cluster (kube-scheduler / API server), meaning -the `GangScheduling` and `GenericWorkload` feature gates and the `scheduling.k8s.io/v1alpha2` API group. -In the future these will likely be enabled by default. +The base install is enough. Schedule a gang: -Pods opt in with `schedulerName: fluence` and a `scheduling.k8s.io/pod-group` label; group size can be set explicitly with -`fluence.flux-framework.org/group-size`. +```bash +kubectl apply -f examples/podgroup.yaml +kubectl get pods -o wide +kubectl get events --field-selector reason=Scheduled +kubectl get podgroups.scheduling.k8s.io +``` +```console +NAME POLICY WORKLOAD STATUS AGE +training Gang Scheduled 15s +``` -Note that when you are developing / debugging a group deletion can hang because of finalizers. I do: +And cleanup. ```bash kubectl patch podgroup training -n default --type=merge -p '{"metadata":{"finalizers":null}}' +kubectl delete -f examples/podgroup.yaml ``` -## Quantum +### 2. Quantum -We can bing fluence up with quantum resources by pointing `FLUENCE_QUANTUM_CONFIG` at a backends file (see `config/quantum-backends.yaml`). -Those backends become schedulable `qpu` vertices; a pod requesting `quantum.flux-framework.org/qpu` will be matched to one, and the allocated backend is handed to the workload. +Quantum needs the resources add-on, which supplies the `fluence-resources` +ConfigMap (the single source of truth for which backends exist) **and** the +device plugin that advertises them: + +```bash +kubectl apply -f deploy/fluence-resources.yaml +# The scheduler reads its resources config at startup, so restart it to pick up +# the quantum vertices: +kubectl rollout restart deployment/fluence -n kube-system +``` + +Confirm the device plugin advertised the resources on the nodes: + +```bash +kubectl get nodes -o jsonpath='{range .items[*]}{.metadata.name}{"\t"}{.status.allocatable}{"\n"}{end}' \ + | grep fluxion.flux-framework.org +``` +```console +kind-control-plane {"cpu":"16","ephemeral-storage":"982292956Ki","fluxion.flux-framework.org/qpu":"1k","fluxion.flux-framework.org/qubit":"1k","hugepages-1Gi":"0","hugepages-2Mi":"0","memory":"61400748Ki","pods":"110"} +kind-worker {"cpu":"16","ephemeral-storage":"982292956Ki","fluxion.flux-framework.org/qpu":"1k","fluxion.flux-framework.org/qubit":"1k","hugepages-1Gi":"0","hugepages-2Mi":"0","memory":"61400748Ki","pods":"110"} +kind-worker2 {"cpu":"16","ephemeral-storage":"982292956Ki","fluxion.flux-framework.org/qpu":"1k","fluxion.flux-framework.org/qubit":"1k","hugepages-1Gi":"0","hugepages-2Mi":"0","memory":"61400748Ki","pods":"110"} +``` + +Create the IBM credentials the **workload** uses to submit (in the namespace +where the workload runs — the scheduler itself never needs them): + +```bash +# If you don't have this yet +curl -fsSL https://clis.cloud.ibm.com/install/linux | sudo sh +ibmcloud login --apikey +# 12 for us-east +``` +```bash +export IBM_CLOUD_TOKEN= +export IBM_CLOUD_CRN=$(ibmcloud resource service-instances --service-name quantum-computing --output json | jq -r '.[] | {name: .name, crn: .crn}' | jq -r .crn) +``` + +```bash +kubectl create secret generic ibm-quantum -n default --from-literal=token="$IBM_CLOUD_TOKEN" --from-literal=crn="$IBM_CLOUD_CRN" +``` + +Run a single quantum pod. It just requests `fluxion.flux-framework.org/qpu` — no +group, and no hard-coded backend (the webhook + PreBind supply `QRMI_BACKEND`): + +```bash +kubectl apply -f examples/quantum-pod.yaml +kubectl get pod sampler -o wide + +# fluence's chosen backend, injected as an environment variable: +kubectl get pod sampler -o jsonpath='{.metadata.annotations.fluence\.flux-framework\.org/backend}{"\n"}' +kubectl logs sampler +``` +```console +kubectl logs sampler -f +2026/06/06 19:04:38 submitting sampler job to ibm_marrakesh +{"results": [{"data": {"c": {"samples": ["0x0", "0x1", "0x0", "0x0", "0x1", "0x1", "0x1", "0x1", "0x1", "0x0", "0x0", "0x0", "0x0", "0x1", "0x1", "0x1", "0x1", "0x1", "0x1", "0x1", "0x0", "0x1", "0x0", "0x0", "0x0", "0x1", "0x0", "0x1", "0x0", "0x1", "0x1", "0x0", "0x0", "0x0", "0x0", "0x1", "0x1", "0x1", "0x1", "0x1", "0x1", "0x1", "0x0", "0x0", "0x0", "0x1", "0x0", "0x1", "0x0", "0x0", "0x1", "0x1", "0x1", "0x0", "0x0", "0x0", "0x1", "0x1", "0x1", "0x0", "0x1", "0x1", "0x0", "0x1", "0x0", "0x1", "0x1", "0x1", "0x1", "0x0", "0x0", "0x1", "0x1", "0x1", "0x1", "0x1", "0x1", "0x1", "0x0", "0x0", "0x1", "0x0", "0x1", "0x1", "0x0", "0x1", "0x0", "0x1", "0x1", "0x1", "0x0", "0x0", "0x0", "0x1", "0x1", "0x1", "0x1", "0x0", "0x0", "0x1", "0x0", "0x0", "0x0", "0x0", "0x0", "0x0", "0x1", "0x1", "0x0", "0x1", "0x0", "0x0", "0x1", "0x1", "0x1", "0x1", "0x1", "0x0", "0x1", "0x0", "0x0", "0x0", "0x1", "0x1", "0x1", "0x0", "0x1", "0x1", "0x0", "0x0", "0x1", "0x0", "0x1", "0x0", "0x0", "0x1", "0x0", "0x0", "0x1", "0x1", "0x1", "0x0", "0x1", "0x1", "0x0", "0x0", "0x0", "0x1", "0x1", "0x1", "0x0", "0x1", "0x1", "0x0", "0x0", "0x1", "0x0", "0x1", "0x0", "0x0", "0x0", "0x0", "0x1", "0x1", "0x1", "0x1", "0x1", "0x1", "0x1", "0x0", "0x1", "0x1", "0x1", "0x0", "0x1", "0x1", "0x0", "0x0", "0x0", "0x0", "0x1", "0x0", "0x0", "0x0", "0x0", "0x1", "0x0", "0x0", "0x1", "0x1", "0x1", "0x0", "0x1", "0x0", "0x1", "0x0", "0x1", "0x1", "0x1", "0x1", "0x0", "0x0", "0x1", "0x1", "0x0", "0x1", "0x0", "0x0", "0x0", "0x1", "0x1", "0x1", "0x0", "0x0", "0x0", "0x1", "0x0", "0x0", "0x0", "0x1", "0x0", "0x1", "0x0", "0x0", "0x0", "0x1", "0x1", "0x0", "0x1", "0x0", "0x0", "0x0", "0x1", "0x0", "0x0", "0x1", "0x1", "0x0", "0x0", "0x0", "0x0", "0x0", "0x1", "0x1", "0x1", "0x0", "0x1", "0x1", "0x1", "0x1", "0x1", "0x1", "0x0", "0x0", "0x0", "0x0"], "num_bits": 1}}, "metadata": {"circuit_metadata": {}}}], "metadata": {"execution": {"execution_spans": [[{"date": "2026-06-06T19:04:43.221657"}, {"date": "2026-06-06T19:04:44.372421"}, {"0": [[256], [0, 1], [0, 256]]}]]}, "version": 2}} +2026/06/06 19:04:50 done: 2070 bytes from ibm_marrakesh +``` +Boum! + +### A note on deletion + +When developing/debugging, a PodGroup (or its pods) can hang on delete because of +finalizers (the workload controller may not be running). Clear them with: + +```bash +kubectl patch podgroup training -n default --type=merge -p '{"metadata":{"finalizers":null}}' +``` -**under development** I am still thinking about how to make this request. -V +Importantly, submission is **not** done by the scheduler — the workload container holds the +user's credentials and submits via qrmi-go (job mode on the IBM open plan; see +fluxion-quantum for that story). Fluence only schedules and hands off the backend. +When we actually have control of local quantum devices this will be different. ## License HPCIC DevTools is distributed under the terms of the MIT license. All new contributions must be made under this license. -See [LICENSE](https://github.com/converged-computing/cloud-select/blob/main/LICENSE), -[COPYRIGHT](https://github.com/converged-computing/cloud-select/blob/main/COPYRIGHT), and -[NOTICE](https://github.com/converged-computing/cloud-select/blob/main/NOTICE) for details. +See [LICENSE](LICENSE), [COPYRIGHT](COPYRIGHT), and [NOTICE](NOTICE) for details. -SPDX-License-Identifier: (MIT) +SPDX-License-Identifier: MIT -LLNL-CODE- 842614 +LLNL-CODE-842614 diff --git a/cmd/deviceplugin/main.go b/cmd/deviceplugin/main.go new file mode 100644 index 0000000..8e88e6b --- /dev/null +++ b/cmd/deviceplugin/main.go @@ -0,0 +1,83 @@ +// Command fluence-deviceplugin advertises exotic Fluxion resource types as +// counted extended resources on the node it runs on (one per type, e.g. +// fluxion.flux-framework.org/qpu and .../qubit). Deploy it as a DaemonSet +// alongside the fluence scheduler. +// +// The set of types is derived from the SAME resources config the scheduler uses +// to build its graph (FLUENCE_RESOURCES), so the advertised resources and the +// graph's resource types come from one source and cannot drift. If +// FLUENCE_RESOURCES is unset or the file is absent, nothing is advertised — the +// node stays classical-only. +// +// A quantum backend is a remote API reachable from any node, not a node-local +// device, so each type is advertised at a large per-node ceiling. That count is +// only a local admission gate (so NodeResourcesFit is satisfied); the real gates +// are Fluxion (which backend, is one available) and the user's API limit. +// +// FLUENCE_RESOURCES path to the shared resources config +// (default /etc/fluence/resources.yaml) +// FLUENCE_RESOURCE_CAPACITY per-node ceiling for each type (default 1000) +package main + +import ( + "context" + "log" + "os" + "os/signal" + "strconv" + "sync" + "syscall" + + "github.com/converged-computing/fluence/pkg/cluster" + "github.com/converged-computing/fluence/pkg/deviceplugin" +) + +func main() { + cfgPath := os.Getenv("FLUENCE_RESOURCES") + if cfgPath == "" { + cfgPath = "/etc/fluence/resources.yaml" + } + + var names []string + if data, err := os.ReadFile(cfgPath); err == nil { + qc, perr := cluster.LoadQuantumConfig(data) + if perr != nil { + log.Fatalf("parse resources config %s: %v", cfgPath, perr) + } + names = cluster.FluxionResourceNames(qc.Backends) + log.Printf("derived %d resource(s) from %s: %v", len(names), cfgPath, names) + } else { + log.Printf("no resources config at %s (%v); advertising nothing", cfgPath, err) + } + + capacity := 1000 + if v := os.Getenv("FLUENCE_RESOURCE_CAPACITY"); v != "" { + if n, err := strconv.Atoi(v); err == nil && n > 0 { + capacity = n + } + } + + ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer stop() + + if len(names) == 0 { + log.Print("no exotic resources to advertise; idling") + <-ctx.Done() + return + } + + var wg sync.WaitGroup + for _, name := range names { + wg.Add(1) + go func(name string) { + defer wg.Done() + p := deviceplugin.New(name, capacity) + log.Printf("advertising %s capacity=%d", name, capacity) + if err := p.Run(ctx); err != nil { + log.Printf("device plugin %s: %v", name, err) + stop() // bring the process down so the DaemonSet restarts it + } + }(name) + } + wg.Wait() +} diff --git a/cmd/webhook/main.go b/cmd/webhook/main.go new file mode 100644 index 0000000..bc5f816 --- /dev/null +++ b/cmd/webhook/main.go @@ -0,0 +1,82 @@ +// Command fluence-webhook runs fluence's mutating admission webhook. At startup +// it generates a self-signed CA + serving certificate, patches its +// MutatingWebhookConfiguration's caBundle so the apiserver trusts it, then +// serves the /mutate endpoint over HTTPS. No cert-manager or committed keys. +// +// WEBHOOK_SERVICE Service name (default fluence-webhook) +// WEBHOOK_NAMESPACE Service namespace (default kube-system) +// WEBHOOK_CONFIG MutatingWebhookConfiguration name (default fluence-webhook) +// WEBHOOK_ADDR listen address (default :8443) +package main + +import ( + "context" + "crypto/tls" + "log" + "net/http" + "os" + "time" + + "github.com/converged-computing/fluence/pkg/webhook" + + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" +) + +func env(key, def string) string { + if v := os.Getenv(key); v != "" { + return v + } + return def +} + +func main() { + svc := env("WEBHOOK_SERVICE", "fluence-webhook") + ns := env("WEBHOOK_NAMESPACE", "kube-system") + cfgName := env("WEBHOOK_CONFIG", "fluence-webhook") + addr := env("WEBHOOK_ADDR", ":8443") + + dnsNames := []string{ + svc + "." + ns + ".svc", + svc + "." + ns + ".svc.cluster.local", + } + caPEM, certPEM, keyPEM, err := webhook.GenerateCerts(dnsNames) + if err != nil { + log.Fatalf("generate certs: %v", err) + } + cert, err := tls.X509KeyPair(certPEM, keyPEM) + if err != nil { + log.Fatalf("load serving cert: %v", err) + } + + cfg, err := rest.InClusterConfig() + if err != nil { + log.Fatalf("in-cluster config: %v", err) + } + client, err := kubernetes.NewForConfig(cfg) + if err != nil { + log.Fatalf("client: %v", err) + } + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + if err := webhook.EnsureCABundle(ctx, client, cfgName, caPEM); err != nil { + cancel() + log.Fatalf("patch caBundle on %s: %v", cfgName, err) + } + cancel() + log.Printf("patched caBundle on MutatingWebhookConfiguration %q", cfgName) + + mux := http.NewServeMux() + mux.HandleFunc("/mutate", webhook.Handler) + mux.HandleFunc("/healthz", func(w http.ResponseWriter, _ *http.Request) { w.WriteHeader(http.StatusOK) }) + + srv := &http.Server{ + Addr: addr, + Handler: mux, + TLSConfig: &tls.Config{Certificates: []tls.Certificate{cert}}, + } + log.Printf("serving webhook on %s", addr) + if err := srv.ListenAndServeTLS("", ""); err != nil { + log.Fatalf("serve: %v", err) + } +} diff --git a/config/quantum-backends.yaml b/config/quantum-backends.yaml deleted file mode 100644 index 1dcfc14..0000000 --- a/config/quantum-backends.yaml +++ /dev/null @@ -1,12 +0,0 @@ -# Virtual quantum resources to inject into the cluster resource graph. -# Point FLUENCE_QUANTUM_CONFIG at this file to bring up fluence with quantum. -# Each backend becomes a `qpu` vertex (name = QRMI backend id) under a qgateway. -backends: - - name: ibm_fez - num_qubits: 156 - vendor: ibm - qrmi_type: qiskit-runtime-service - - name: ibm_marrakesh - num_qubits: 156 - vendor: ibm - qrmi_type: qiskit-runtime-service diff --git a/deploy/fluence-resources.yaml b/deploy/fluence-resources.yaml new file mode 100644 index 0000000..83a992a --- /dev/null +++ b/deploy/fluence-resources.yaml @@ -0,0 +1,77 @@ +# Resources add-on for fluence. Turns on fluence-managed resources by supplying +# (1) the resources config and (2) the device plugin that advertises them. +# Quantum backends are just the example payload here; any resource type fluence +# can model goes in the same ConfigMap. Apply AFTER deploy/fluence.yaml: +# +# kubectl apply -f deploy/fluence.yaml # base scheduler (no devices) +# kubectl apply -f deploy/fluence-resources.yaml # + resources config + device plugin +# kubectl rollout restart deployment/fluence -n kube-system # scheduler re-reads resources +# +# The base scheduler already mounts the `fluence-resources` ConfigMap optionally +# and reads FLUENCE_RESOURCES, so this add-on is purely additive — no edits to +# the base Deployment. + +# Resources config: the SINGLE source of truth for the resource types fluence +# injects/advertises. The scheduler builds qpu/qubit graph vertices from it; the +# device plugin derives which extended resources to advertise from the SAME +# document (same rule), so the two cannot drift. +apiVersion: v1 +kind: ConfigMap +metadata: + name: fluence-resources + namespace: kube-system +data: + resources.yaml: | + backends: + - name: ibm_fez + num_qubits: 156 + vendor: ibm + qrmi_type: qiskit-runtime-service + - name: ibm_marrakesh + num_qubits: 156 + vendor: ibm + qrmi_type: qiskit-runtime-service +--- +# Device plugin: advertises the exotic Fluxion resource types (derived from the +# resources config above) on every node, so pods can request them via resources +# and NodeResourcesFit is satisfied. +apiVersion: apps/v1 +kind: DaemonSet +metadata: + name: fluence-deviceplugin + namespace: kube-system + labels: {app: fluence-deviceplugin} +spec: + selector: + matchLabels: {app: fluence-deviceplugin} + template: + metadata: + labels: {app: fluence-deviceplugin} + spec: + priorityClassName: system-node-critical + tolerations: + - operator: Exists # run on every node, including tainted/control-plane + containers: + - name: deviceplugin + image: ghcr.io/converged-computing/fluence:latest + imagePullPolicy: Never + command: ["/bin/fluence-deviceplugin"] + env: + - name: FLUENCE_RESOURCES + value: /etc/fluence/resources.yaml + - name: FLUENCE_RESOURCE_CAPACITY + value: "1000" + securityContext: + privileged: true + volumeMounts: + - name: device-plugin + mountPath: /var/lib/kubelet/device-plugins + - name: resources + mountPath: /etc/fluence + volumes: + - name: device-plugin + hostPath: + path: /var/lib/kubelet/device-plugins + - name: resources + configMap: + name: fluence-resources diff --git a/deploy/fluence.yaml b/deploy/fluence.yaml index 89b17a5..a16dcda 100644 --- a/deploy/fluence.yaml +++ b/deploy/fluence.yaml @@ -62,6 +62,16 @@ rules: - apiGroups: ["coordination.k8s.io"] resources: ["leases"] verbs: ["create", "get", "update", "list", "watch"] + # PreBind stamps the allocated backend onto the pod as an annotation; the + # built-in system:kube-scheduler role only allows patching pods/status, not + # the pod object, so grant it here. + - apiGroups: [""] + resources: ["pods"] + verbs: ["get", "list", "watch", "patch", "update"] + # The webhook self-manages its TLS by patching its own config's caBundle. + - apiGroups: ["admissionregistration.k8s.io"] + resources: ["mutatingwebhookconfigurations"] + verbs: ["get", "list", "watch", "patch"] --- apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRoleBinding @@ -90,9 +100,11 @@ data: profiles: - schedulerName: fluence plugins: - preFilter: - enabled: [{name: Fluence}] - filter: + # multiPoint wires Fluence into every extension point its Go type + # implements: PreFilter, Filter, and PreBind (which stamps the backend + # annotation). Listing points individually risks omitting one — that is + # exactly what left PreBind unwired and the backend annotation unset. + multiPoint: enabled: [{name: Fluence}] --- apiVersion: apps/v1 @@ -113,19 +125,21 @@ spec: containers: - name: fluence image: ghcr.io/converged-computing/fluence:latest - # For development only imagePullPolicy: Never command: - /bin/fluence - --config=/etc/fluence/scheduler-config.yaml + # fluence is its own scheduler binary, so it needs the gang gates set + # here (the cluster-level kube-scheduler gates don't apply to it). + # Without these its PodGroup/GangScheduling plugin is inactive, pods + # schedule with no gang semantics, and PodGroup status stays Pending. + - --feature-gates=GenericWorkload=true,GangScheduling=true - --v=4 env: - - name: FLUENCE_QUANTUM_CONFIG - value: /etc/fluence/quantum-backends.yaml - - name: IBM_CLOUD_TOKEN - valueFrom: {secretKeyRef: {name: ibm-quantum, key: token, optional: true}} - - name: IBM_CLOUD_CRN - valueFrom: {secretKeyRef: {name: ibm-quantum, key: crn, optional: true}} + # Path to the resources config (e.g. quantum backends). Unset/empty + # file -> classical-only graph. Supplied by the quantum add-on. + - name: FLUENCE_RESOURCES + value: /etc/fluence/resources.yaml volumeMounts: - name: config mountPath: /etc/fluence @@ -134,4 +148,75 @@ spec: projected: sources: - configMap: {name: fluence-scheduler-config} - - configMap: {name: fluence-quantum-backends, optional: true} \ No newline at end of file + - configMap: {name: fluence-resources, optional: true} +--- +# Mutating webhook: injects scheduler-chosen values into pods at creation time +# (currently a downward-API QRMI_BACKEND env for quantum pods). It self-manages +# TLS — generates a CA + serving cert at startup and patches the caBundle below — +# so no cert-manager and no committed keys. failurePolicy Ignore keeps a webhook +# outage from blocking pod creation cluster-wide. +apiVersion: apps/v1 +kind: Deployment +metadata: + name: fluence-webhook + namespace: kube-system + labels: {app: fluence-webhook} +spec: + replicas: 1 + selector: + matchLabels: {app: fluence-webhook} + template: + metadata: + labels: {app: fluence-webhook} + spec: + serviceAccountName: fluence + containers: + - name: webhook + image: ghcr.io/converged-computing/fluence:latest + imagePullPolicy: Never + command: ["/bin/fluence-webhook"] + ports: + - containerPort: 8443 + readinessProbe: + httpGet: {path: /healthz, port: 8443, scheme: HTTPS} + initialDelaySeconds: 2 +--- +apiVersion: v1 +kind: Service +metadata: + name: fluence-webhook + namespace: kube-system +spec: + selector: {app: fluence-webhook} + ports: + - port: 443 + targetPort: 8443 +--- +apiVersion: admissionregistration.k8s.io/v1 +kind: MutatingWebhookConfiguration +metadata: + name: fluence-webhook +webhooks: + - name: pods.fluence.flux-framework.org + admissionReviewVersions: ["v1"] + sideEffects: None + failurePolicy: Ignore # never block pod creation if the webhook is down + # caBundle is filled in at runtime by the webhook patching this object. + clientConfig: + service: + name: fluence-webhook + namespace: kube-system + path: /mutate + port: 443 + rules: + - apiGroups: [""] + apiVersions: ["v1"] + operations: ["CREATE"] + resources: ["pods"] + scope: Namespaced + # Don't intercept system pods (and avoid bootstrap coupling). + namespaceSelector: + matchExpressions: + - key: kubernetes.io/metadata.name + operator: NotIn + values: ["kube-system"] \ No newline at end of file diff --git a/deploy/kind-config.yaml b/deploy/kind-config.yaml index 811b40e..1ef46da 100644 --- a/deploy/kind-config.yaml +++ b/deploy/kind-config.yaml @@ -1,8 +1,10 @@ -# kind create cluster --image kindest/node:v1.36.1 --config deploy/kind-config.yaml +# kind create cluster --image kindest/node:v1.36.1 --config deploy/kind-config-v1.36.yaml # # Enables the alpha Workload / PodGroup gang-scheduling feature (off by default # in 1.36). Gates are set per-component because GangScheduling is scheduler-only -# (setting it on the apiserver would be rejected as an unknown gate). +# (setting it on the apiserver would be rejected as an unknown gate). extraArgs +# uses the kubeadm v1beta4 list form ({name,value}); the old map form is silently +# mishandled on 1.36 and the gates never apply. kind: Cluster apiVersion: kind.x-k8s.io/v1alpha4 nodes: @@ -12,11 +14,13 @@ nodes: kind: ClusterConfiguration apiServer: extraArgs: - # Turn on the alpha API group + the API itself. - runtime-config: "scheduling.k8s.io/v1alpha2=true" - feature-gates: "GenericWorkload=true" + - name: runtime-config + value: "scheduling.k8s.io/v1alpha2=true" + - name: feature-gates + value: "GenericWorkload=true" scheduler: extraArgs: - feature-gates: "GenericWorkload=true,GangScheduling=true" + - name: feature-gates + value: "GenericWorkload=true,GangScheduling=true" + - role: worker - role: worker - - role: worker \ No newline at end of file diff --git a/examples/podgroup.yaml b/examples/podgroup.yaml index a59e995..068e56c 100644 --- a/examples/podgroup.yaml +++ b/examples/podgroup.yaml @@ -1,10 +1,10 @@ -# Native gang scheduling (k8s >= 1.35, GangScheduling/GenericWorkload gates on). -# Fluence does placement; the PodGroup gives all-or-nothing semantics. +# Native gang scheduling (k8s >= 1.36, GangScheduling/GenericWorkload gates on). +# Fluence does placement; the PodGroup gives all-or-nothing semantics. Pods link +# to the PodGroup via the first-class field spec.schedulingGroup.podGroupName. apiVersion: scheduling.k8s.io/v1alpha2 kind: PodGroup metadata: name: training - namespace: default spec: schedulingPolicy: gang: @@ -14,7 +14,6 @@ apiVersion: apps/v1 kind: Deployment metadata: name: training - namespace: default spec: replicas: 2 selector: @@ -23,14 +22,15 @@ spec: metadata: labels: app: training - scheduling.k8s.io/pod-group: training spec: schedulerName: fluence + schedulingGroup: + podGroupName: training containers: - name: worker image: busybox command: ["sleep", "3600"] resources: requests: - cpu: "2" - memory: 8Gi + cpu: "4" + memory: 8Gi \ No newline at end of file diff --git a/examples/quantum-pod.yaml b/examples/quantum-pod.yaml new file mode 100644 index 0000000..a619df9 --- /dev/null +++ b/examples/quantum-pod.yaml @@ -0,0 +1,30 @@ +# A quantum workload scheduled by fluence. The pod REQUESTS a quantum backend +# via resources (the fluence device plugin advertises fluxion.flux-framework.org/qpu +# on every node, so NodeResourcesFit is satisfied). Fluence's PreFilter matches +# the request against the resource graph and picks a backend, the webhook injects +# QRMI_BACKEND (the allocated backend) automatically, and note we can add other +# envars here in the future. I chose a webhook because I think this is going to +# be a requirement, and the pod is immutable after creation. +# Then the container submits via qrmi-go (the separate qrmi-sampler image). +# the credentials live on the level of the application container NOT in +# a shared space. +apiVersion: v1 +kind: Pod +metadata: + name: sampler +spec: + schedulerName: fluence + restartPolicy: Never + containers: + - name: sampler + image: ghcr.io/converged-computing/qrmi-sampler:latest + env: + - name: IBM_CLOUD_TOKEN + valueFrom: {secretKeyRef: {name: ibm-quantum, key: token}} + - name: IBM_CLOUD_CRN + valueFrom: {secretKeyRef: {name: ibm-quantum, key: crn}} + resources: + requests: + fluxion.flux-framework.org/qpu: "1" + limits: + fluxion.flux-framework.org/qpu: "1" \ No newline at end of file diff --git a/go.mod b/go.mod index df802a2..7f712a1 100644 --- a/go.mod +++ b/go.mod @@ -3,11 +3,13 @@ module github.com/converged-computing/fluence go 1.26.0 require ( - github.com/converged-computing/qrmi-go v0.0.0-20260605012309-c5c8239ecbba github.com/flux-framework/flux-sched/resource/reapi/bindings/go v0.0.0-20260526195258-f0e815f1f354 + google.golang.org/grpc v1.79.3 k8s.io/api v0.36.0 k8s.io/apimachinery v0.36.0 k8s.io/component-base v0.36.0 + k8s.io/kube-scheduler v0.36.0 + k8s.io/kubelet v0.36.0 k8s.io/kubernetes v1.36.0 sigs.k8s.io/yaml v1.6.0 ) @@ -91,7 +93,6 @@ require ( golang.org/x/time v0.14.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20260128011058-8636f8732409 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20260128011058-8636f8732409 // indirect - google.golang.org/grpc v1.79.3 // indirect google.golang.org/protobuf v1.36.12-0.20260120151049-f2248ac996af // indirect gopkg.in/evanphx/json-patch.v4 v4.13.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect @@ -108,8 +109,6 @@ require ( k8s.io/klog/v2 v2.140.0 // indirect k8s.io/kms v0.36.0 // indirect k8s.io/kube-openapi v0.0.0-20260317180543-43fb72c5454a // indirect - k8s.io/kube-scheduler v0.36.0 // indirect - k8s.io/kubelet v0.36.0 // indirect k8s.io/streaming v0.36.0 // indirect k8s.io/utils v0.0.0-20260210185600-b8788abfbbc2 // indirect sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.34.0 // indirect diff --git a/go.sum b/go.sum index d7b875c..a67f060 100644 --- a/go.sum +++ b/go.sum @@ -14,8 +14,6 @@ github.com/cenkalti/backoff/v5 v5.0.3 h1:ZN+IMa753KfX5hd8vVaMixjnqRZ3y8CuJKRKj1x github.com/cenkalti/backoff/v5 v5.0.3/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= -github.com/converged-computing/qrmi-go v0.0.0-20260605012309-c5c8239ecbba h1:hRm9gcU/geUqAl8OgYSxF4pO/oqanKxxpLZJEzSwzcE= -github.com/converged-computing/qrmi-go v0.0.0-20260605012309-c5c8239ecbba/go.mod h1:BvdrzMeplw5UgflH0s/VpyuN0eUxXMKDzf5DAC4oQhk= github.com/coreos/go-semver v0.3.1 h1:yi21YpKnrx1gt5R+la8n5WgS0kCrsPp33dmEyHReZr4= github.com/coreos/go-semver v0.3.1/go.mod h1:irMmmIw/7yzSRPWryHsK7EYSg09caPQL03VsM8rvUec= github.com/coreos/go-systemd/v22 v22.7.0 h1:LAEzFkke61DFROc7zNLX/WA2i5J8gYqe0rSj9KI28KA= diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index db383f1..6bb572a 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -6,8 +6,10 @@ package cluster import ( "fmt" + "sort" "github.com/converged-computing/fluence/pkg/jgf" + "github.com/converged-computing/fluence/pkg/placement" corev1 "k8s.io/api/core/v1" "sigs.k8s.io/yaml" ) @@ -94,6 +96,31 @@ func BuildGraph(nodes []corev1.Node, opts Options) ([]byte, error) { return b.JSON() } +// FluxionResourceNames returns the distinct extended-resource names a device +// plugin should advertise for a set of quantum backends. It uses the SAME +// type-derivation rule as AddQuantum — each backend is a `qpu`, and a backend +// with num_qubits > 0 contributes `qubit` — so the device plugin's advertised +// resources and the graph's resource types are derived from one config and +// cannot drift. Names are prefixed with placement.FluxionResourcePrefix so they +// match what the scheduler strips off a pod request. +func FluxionResourceNames(backends []QuantumBackend) []string { + types := map[string]bool{} + if len(backends) > 0 { + types["qpu"] = true + } + for _, b := range backends { + if b.NumQubits > 0 { + types["qubit"] = true + } + } + names := make([]string, 0, len(types)) + for t := range types { + names = append(names, placement.FluxionResourcePrefix+t) + } + sort.Strings(names) + return names +} + // AddQuantum injects a qgateway under the cluster with one qpu vertex per // backend. Exposed so a graph built elsewhere can be augmented the same way. func AddQuantum(b *jgf.Builder, cluster *jgf.Vertex, backends []QuantumBackend) { @@ -108,11 +135,18 @@ func AddQuantum(b *jgf.Builder, cluster *jgf.Vertex, backends []QuantumBackend) if be.Vendor != "" { props["vendor"] = be.Vendor } - b.AddChild(gw, "qpu", "qpu", jgf.Options{ + qpu := b.AddChild(gw, "qpu", "qpu", jgf.Options{ Name: be.Name, Exclusive: true, Properties: props, }) + // Model qubits as a counted child so a request for N qubits matches a + // backend with at least that many (Fluxion count matching is >=). This + // is how the numeric "at least N qubits" ask is expressed without a + // numeric constraint (RFC 31 properties are boolean tags, not >=). + if be.NumQubits > 0 { + b.AddChild(qpu, "qubit", "qubit", jgf.Options{Size: int64(be.NumQubits)}) + } } } @@ -144,4 +178,4 @@ func orDefault(s, def string) string { return def } return s -} +} \ No newline at end of file diff --git a/pkg/deviceplugin/plugin.go b/pkg/deviceplugin/plugin.go new file mode 100644 index 0000000..cc98012 --- /dev/null +++ b/pkg/deviceplugin/plugin.go @@ -0,0 +1,172 @@ +package deviceplugin + +// Package deviceplugin implements a Kubernetes device plugin that advertises +// "quantum reachability" as a counted extended resource on every node. +// +// A quantum backend is not hardware attached to a node — it is a remote API any +// node can call (subject to the user's access). So the plugin advertises a large +// per-node ceiling of a single counted resource (quantum.flux-framework.org/qpu): +// this is what lets a pod write `resources.requests: {quantum.flux-framework.org/qpu: "1"}` +// and have the in-tree NodeResourcesFit plugin be satisfied (no wrapper needed). +// +// The count is a local admission gate only. Whether a backend is actually +// available and which one is matched is decided by Fluxion in the scheduler, and +// the real per-user limit lives on the IBM API — neither of which is node-local, +// which is exactly why the ceiling is large rather than a true quota. + +import ( + "context" + "fmt" + "net" + "os" + "path/filepath" + "strings" + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1" +) + +// Plugin is a device-plugin server for a single counted resource. +type Plugin struct { + pluginapi.UnimplementedDevicePluginServer + + resourceName string + capacity int + socket string + + server *grpc.Server + devices []*pluginapi.Device + stop chan struct{} +} + +// New builds a plugin advertising `capacity` units of resourceName. The socket +// and device IDs are derived from the resource name so multiple plugins can run +// in one process without colliding. +func New(resourceName string, capacity int) *Plugin { + tag := sanitize(resourceName) + devs := make([]*pluginapi.Device, 0, capacity) + for i := 0; i < capacity; i++ { + devs = append(devs, &pluginapi.Device{ + ID: fmt.Sprintf("%s-%d", tag, i), + Health: pluginapi.Healthy, + }) + } + sock := filepath.Join(pluginapi.DevicePluginPath, tag+".sock") + return &Plugin{ + resourceName: resourceName, + capacity: capacity, + socket: sock, + devices: devs, + stop: make(chan struct{}), + } +} + +// sanitize turns a resource name into a filesystem/identifier-safe tag, e.g. +// "fluxion.flux-framework.org/qpu" -> "fluxion-flux-framework-org-qpu". +func sanitize(name string) string { + repl := func(r rune) rune { + if (r >= 'a' && r <= 'z') || (r >= 'A' && r <= 'Z') || (r >= '0' && r <= '9') { + return r + } + return '-' + } + return strings.Map(repl, name) +} + +// Run serves the plugin and registers it with the kubelet, blocking until the +// context is cancelled. +func (p *Plugin) Run(ctx context.Context) error { + if err := p.serve(); err != nil { + return err + } + defer p.server.Stop() + + if err := p.register(ctx); err != nil { + return fmt.Errorf("register with kubelet: %w", err) + } + + <-ctx.Done() + close(p.stop) + return nil +} + +func (p *Plugin) serve() error { + if err := os.Remove(p.socket); err != nil && !os.IsNotExist(err) { + return err + } + lis, err := net.Listen("unix", p.socket) + if err != nil { + return fmt.Errorf("listen on %s: %w", p.socket, err) + } + p.server = grpc.NewServer() + pluginapi.RegisterDevicePluginServer(p.server, p) + go func() { _ = p.server.Serve(lis) }() + return nil +} + +func (p *Plugin) register(ctx context.Context) error { + conn, err := grpc.NewClient( + "unix://"+pluginapi.KubeletSocket, + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + if err != nil { + return err + } + defer conn.Close() + + client := pluginapi.NewRegistrationClient(conn) + _, err = client.Register(ctx, &pluginapi.RegisterRequest{ + Version: pluginapi.Version, + Endpoint: filepath.Base(p.socket), + ResourceName: p.resourceName, + }) + return err +} + +// GetDevicePluginOptions: no pre-start hook needed. +func (p *Plugin) GetDevicePluginOptions(context.Context, *pluginapi.Empty) (*pluginapi.DevicePluginOptions, error) { + return &pluginapi.DevicePluginOptions{PreStartRequired: false}, nil +} + +// ListAndWatch streams the (static) device list to the kubelet. +func (p *Plugin) ListAndWatch(_ *pluginapi.Empty, stream pluginapi.DevicePlugin_ListAndWatchServer) error { + if err := stream.Send(&pluginapi.ListAndWatchResponse{Devices: p.devices}); err != nil { + return err + } + // Static list: keep the stream open, re-sending periodically until shutdown. + ticker := time.NewTicker(60 * time.Second) + defer ticker.Stop() + for { + select { + case <-p.stop: + return nil + case <-ticker.C: + if err := stream.Send(&pluginapi.ListAndWatchResponse{Devices: p.devices}); err != nil { + return err + } + } + } +} + +// Allocate is a no-op: a quantum "device" is just a reachability token, so no +// env vars, mounts, or device nodes are injected. The workload gets its backend +// from the scheduler and its credentials from a Secret. +func (p *Plugin) Allocate(_ context.Context, req *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error) { + resp := &pluginapi.AllocateResponse{} + for range req.ContainerRequests { + resp.ContainerResponses = append(resp.ContainerResponses, &pluginapi.ContainerAllocateResponse{}) + } + return resp, nil +} + +// GetPreferredAllocation: no preference. +func (p *Plugin) GetPreferredAllocation(context.Context, *pluginapi.PreferredAllocationRequest) (*pluginapi.PreferredAllocationResponse, error) { + return &pluginapi.PreferredAllocationResponse{}, nil +} + +// PreStartContainer: nothing to do. +func (p *Plugin) PreStartContainer(context.Context, *pluginapi.PreStartContainerRequest) (*pluginapi.PreStartContainerResponse, error) { + return &pluginapi.PreStartContainerResponse{}, nil +} diff --git a/pkg/fluence/fluence.go b/pkg/fluence/fluence.go index a615839..82f5943 100644 --- a/pkg/fluence/fluence.go +++ b/pkg/fluence/fluence.go @@ -14,6 +14,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" fwk "k8s.io/kube-scheduler/framework" ) @@ -37,13 +38,15 @@ type Fluence struct { matcher *graph.FluxionGraph mu sync.Mutex - // placement maps a pod-group key to the nodes chosen for the group. - placement map[string][]string + // placement maps a pod-group key to the placement chosen for the group + // (nodes + allocated backend). + placement map[string]placement.Placement } var ( _ fwk.PreFilterPlugin = (*Fluence)(nil) _ fwk.FilterPlugin = (*Fluence)(nil) + _ fwk.PreBindPlugin = (*Fluence)(nil) ) // New builds the plugin: discover cluster nodes, optionally inject quantum @@ -51,7 +54,8 @@ var ( // // Configuration (for now via env; can move to plugin args): // -// FLUENCE_QUANTUM_CONFIG path to a YAML/JSON list of quantum backends +// FLUENCE_RESOURCES path to a YAML/JSON resources config (e.g. quantum +// backends). Unset = classical-only graph. // FLUENCE_MATCH_POLICY Fluxion match policy (default "first") func New(ctx context.Context, _ runtime.Object, h fwk.Handle) (fwk.Plugin, error) { // List nodes via the API. The scheduler's shared snapshot is empty at @@ -64,19 +68,25 @@ func New(ctx context.Context, _ runtime.Object, h fwk.Handle) (fwk.Plugin, error return nil, fmt.Errorf("list nodes: %w", err) } - // Classical compute always comes from the cluster nodes. Quantum resources - // are added only when a backends config is provided. + // Classical compute always comes from the cluster nodes. Quantum/other + // resources are added only when a resources config is present. FLUENCE_RESOURCES + // is set on the base scheduler but the file only exists once the resources + // add-on is applied, so a missing file is normal (classical-only), not fatal. opts := cluster.Options{} - if path := os.Getenv("FLUENCE_QUANTUM_CONFIG"); path != "" { + if path := os.Getenv("FLUENCE_RESOURCES"); path != "" { raw, err := os.ReadFile(path) - if err != nil { - return nil, fmt.Errorf("read quantum config: %w", err) + switch { + case err == nil: + qc, err := cluster.LoadQuantumConfig(raw) + if err != nil { + return nil, err + } + opts.Quantum = qc.Backends + case os.IsNotExist(err): + // No resources config mounted -> classical-only graph. + default: + return nil, fmt.Errorf("read resources config %s: %w", path, err) } - qc, err := cluster.LoadQuantumConfig(raw) - if err != nil { - return nil, err - } - opts.Quantum = qc.Backends } jgfBytes, err := cluster.BuildGraph(nodeList.Items, opts) @@ -100,7 +110,7 @@ func New(ctx context.Context, _ runtime.Object, h fwk.Handle) (fwk.Plugin, error return &Fluence{ handle: h, matcher: matcher, - placement: map[string][]string{}, + placement: map[string]placement.Placement{}, }, nil } @@ -147,16 +157,21 @@ func (f *Fluence) PreFilter( if err != nil { return nil, fwk.AsStatus(err) } - if len(place.Nodes) == 0 { - return nil, fwk.NewStatus(fwk.Unschedulable, "fluxion returned no node placement") + if len(place.Nodes) == 0 && place.Backend == "" { + return nil, fwk.NewStatus(fwk.Unschedulable, "fluxion returned no allocation") } + // Note: a quantum-only allocation has a Backend but no Nodes (a qpu vertex + // lives under the qgateway, not under a compute node). That is valid — the + // backend is a remote API reachable from any node — so we do not require a + // node here; Filter imposes no node constraint in that case. f.mu.Lock() - f.placement[group] = place.Nodes + f.placement[group] = place f.mu.Unlock() - // place.Backend (quantum) would be recorded on the pod(s) here so the - // workload knows which QRMI backend to submit to (e.g. via annotation/env). + // The allocated backend is recorded onto each pod in PreBind (container env + // is immutable post-creation, but annotations can be patched); the + // webhook-injected downward-API env then surfaces it as QRMI_BACKEND. return nil, fwk.NewStatus(fwk.Success) } @@ -173,9 +188,16 @@ func (f *Fluence) Filter( group := groupKey(pod) f.mu.Lock() - nodes := f.placement[group] + nodes := f.placement[group].Nodes f.mu.Unlock() + // A quantum-only allocation pins no node (the backend is a remote API any + // node can reach), so impose no constraint; the qpu device plugin already + // gates which nodes can admit the pod. + if len(nodes) == 0 { + return fwk.NewStatus(fwk.Success) + } + for _, n := range nodes { if n == nodeInfo.Node().Name { return fwk.NewStatus(fwk.Success) @@ -184,29 +206,78 @@ func (f *Fluence) Filter( return fwk.NewStatus(fwk.Unschedulable, "node not in fluxion allocation for this group") } -// groupPods returns the pods belonging to the same group as pod, by label. +// PreBindPreFlight runs before PreBind. It returns Success when this plugin has +// a backend to stamp on the pod (a quantum group), and Skip otherwise so the +// framework doesn't call PreBind needlessly. It is lightweight: it only reads +// the cached group placement, no API calls. +func (f *Fluence) PreBindPreFlight( + ctx context.Context, + state fwk.CycleState, + pod *corev1.Pod, + nodeName string, +) (*fwk.PreBindPreFlightResult, *fwk.Status) { + f.mu.Lock() + backend := f.placement[groupKey(pod)].Backend + f.mu.Unlock() + if backend == "" { + return nil, fwk.NewStatus(fwk.Skip) + } + return nil, fwk.NewStatus(fwk.Success) +} + +// PreBind writes the backend Fluxion allocated for this pod's group onto the pod +// as the annotation placement.BackendAnnotation. The mutating webhook has +// already wired a downward-API env (QRMI_BACKEND) that reads this annotation, so +// the container sees the backend as an ordinary env var. Container env cannot be +// patched after creation, which is why the value travels via an annotation. +func (f *Fluence) PreBind( + ctx context.Context, + state fwk.CycleState, + pod *corev1.Pod, + nodeName string, +) *fwk.Status { + f.mu.Lock() + backend := f.placement[groupKey(pod)].Backend + f.mu.Unlock() + if backend == "" { + return fwk.NewStatus(fwk.Success) // nothing to do; PreBindPreFlight skips these + } + + patch := fmt.Sprintf(`{"metadata":{"annotations":{%q:%q}}}`, placement.BackendAnnotation, backend) + _, err := f.handle.ClientSet().CoreV1().Pods(pod.Namespace).Patch( + ctx, pod.Name, types.MergePatchType, []byte(patch), metav1.PatchOptions{}) + if err != nil { + return fwk.AsStatus(fmt.Errorf("stamp backend annotation: %w", err)) + } + return fwk.NewStatus(fwk.Success) +} + +// groupPods returns the pods belonging to the same native PodGroup as pod +// (spec.schedulingGroup.podGroupName). That field is not label-selectable, so we +// list the namespace and filter in code. A pod with no scheduling group is its +// own group of one. func (f *Fluence) groupPods(pod *corev1.Pod) ([]corev1.Pod, error) { - group := pod.Labels[placement.PodGroupLabel] + group := placement.PodGroupName(pod) if group == "" { - // Singleton pod: treat it as its own group of one. return []corev1.Pod{*pod}, nil } - sel := labels.SelectorFromSet(labels.Set{placement.PodGroupLabel: group}) list, err := f.handle.SharedInformerFactory().Core().V1().Pods().Lister(). - Pods(pod.Namespace).List(sel) + Pods(pod.Namespace).List(labels.Everything()) if err != nil { return nil, err } out := make([]corev1.Pod, 0, len(list)) for _, p := range list { - out = append(out, *p) + if placement.PodGroupName(p) == group { + out = append(out, *p) + } } return out, nil } // groupKey is the cache key for a pod's group (namespace-scoped). func groupKey(pod *corev1.Pod) string { - if g := pod.Labels[placement.PodGroupLabel]; g != "" { + if g := placement.PodGroupName(pod); g != "" { return pod.Namespace + "/" + g } return pod.Namespace + "/" + pod.Name diff --git a/pkg/placement/placement.go b/pkg/placement/placement.go index e106540..2b65aad 100644 --- a/pkg/placement/placement.go +++ b/pkg/placement/placement.go @@ -2,6 +2,8 @@ package placement import ( "fmt" + "sort" + "strings" "github.com/converged-computing/fluence/pkg/jobspec" "github.com/converged-computing/fluence/pkg/quantum" @@ -9,65 +11,97 @@ import ( ) const ( - // QuantumResource is the extended resource a pod requests to be placed on a - // quantum backend (a qpu vertex) instead of classical compute. - QuantumResource corev1.ResourceName = "quantum.flux-framework.org/qpu" + // FluxionResourcePrefix marks an extended resource whose suffix is a Fluxion + // graph type. A request for fluxion.flux-framework.org/ is translated + // generically into a jobspec count of — no per-type code. Anything the + // graph models as a count (qpu, qubit, ...) is requestable this way. + FluxionResourcePrefix = "fluxion.flux-framework.org/" - // PodGroupLabel and PodGroupSizeLabel mirror the native PodGroup wiring; a - // pod carries its group name and the group's total size so the scheduler can - // match the whole gang at once. - PodGroupLabel = "scheduling.k8s.io/pod-group" - PodGroupSizeLabel = "fluence.flux-framework.org/group-size" + // BackendAnnotation is where the scheduler records the Fluxion-allocated + // backend for a pod. The mutating webhook wires a downward-API env + // (QRMI_BACKEND) that reads this annotation. + BackendAnnotation = "fluence.flux-framework.org/backend" ) -// podRes is the classical/quantum resource ask distilled from a pod. -type podRes struct { - cpu int - gpu int - quantum bool +// PodGroupName returns the native (Kubernetes 1.36) scheduling-group name a pod +// belongs to, from spec.schedulingGroup.podGroupName, or "" if the pod is not +// part of a group. This is the first-class field that links a Pod to its +// PodGroup object; the pre-1.36 label/annotation pattern is gone. +func PodGroupName(pod *corev1.Pod) string { + if sg := pod.Spec.SchedulingGroup; sg != nil && sg.PodGroupName != nil { + return *sg.PodGroupName + } + return "" } -// podResources sums container requests into whole cores/gpus and detects a -// quantum request. -func podResources(p *corev1.Pod) podRes { - var r podRes +// podResources distills a pod's container requests into Fluxion resource counts +// keyed by Fluxion graph type (e.g. "core", "gpu", "qpu", "qubit"). +// +// Kubernetes names its native resources (cpu, memory, nvidia.com/gpu), so those +// get a small fixed mapping to graph types. Every resource named +// fluxion.flux-framework.org/ is passed through generically as , +// with no knowledge of what the type means — if the graph has it as a count, +// Fluxion will verify it. +func podResources(p *corev1.Pod) map[string]int { + counts := map[string]int{} for i := range p.Spec.Containers { - req := p.Spec.Containers[i].Resources.Requests - if q, ok := req[corev1.ResourceCPU]; ok { - r.cpu += int(q.Value()) // Value() rounds millicores up to whole cores - } - if q, ok := req["nvidia.com/gpu"]; ok { - r.gpu += int(q.Value()) - } - if _, ok := req[QuantumResource]; ok { - r.quantum = true + for name, q := range p.Spec.Containers[i].Resources.Requests { + switch { + case name == corev1.ResourceCPU: + counts["core"] += int(q.Value()) // rounds millicores up to whole cores + case name == corev1.ResourceMemory: + counts["memory"] += int(q.Value() / (1000 * 1000)) // bytes -> MB + case name == "nvidia.com/gpu": + counts["gpu"] += int(q.Value()) + case strings.HasPrefix(string(name), FluxionResourcePrefix): + t := strings.TrimPrefix(string(name), FluxionResourcePrefix) + counts[t] += int(q.Value()) + } } } - if !r.quantum && r.cpu == 0 { - r.cpu = 1 // every classical pod needs at least one core to match + // A pod that requested no exotic (non-classical) resource still needs at + // least one core to land on. + if !hasExotic(counts) && counts["core"] == 0 { + counts["core"] = 1 + } + return counts +} + +// hasExotic reports whether counts contains any non-classical type (i.e. one +// that came through the Fluxion prefix, like qpu/qubit). +func hasExotic(counts map[string]int) bool { + for t := range counts { + switch t { + case "core", "memory", "gpu": + default: + return true + } } - return r + return false } // JobspecForGroup builds a Fluxion jobspec for a whole pod group: a slot per pod -// (count = group size), each holding the per-pod resources. The group is -// assumed homogeneous (same shape per pod), which is the common case for a gang; -// heterogeneous groups are a TODO. +// (count = group size), each holding the per-pod resources as `with` entries — +// one per requested Fluxion type. A hybrid pod (e.g. cores + a qpu) produces a +// slot with both, so classical and quantum are requested together. The group is +// assumed homogeneous (same shape per pod); heterogeneous groups are a TODO. func JobspecForGroup(groupName string, pods []corev1.Pod) (*jobspec.Jobspec, error) { if len(pods) == 0 { return nil, fmt.Errorf("pod group %q has no pods", groupName) } - r := podResources(&pods[0]) + counts := podResources(&pods[0]) + + // Deterministic order for stable jobspecs/tests. + types := make([]string, 0, len(counts)) + for t := range counts { + types = append(types, t) + } + sort.Strings(types) var with []jobspec.Resource - if r.quantum { - with = []jobspec.Resource{{Type: "qpu", Count: 1}} - } else { - if r.cpu > 0 { - with = append(with, jobspec.Resource{Type: "core", Count: r.cpu}) - } - if r.gpu > 0 { - with = append(with, jobspec.Resource{Type: "gpu", Count: r.gpu}) + for _, t := range types { + if counts[t] > 0 { + with = append(with, jobspec.Resource{Type: t, Count: counts[t]}) } } diff --git a/pkg/placement/placement_test.go b/pkg/placement/placement_test.go index 518904d..2def510 100644 --- a/pkg/placement/placement_test.go +++ b/pkg/placement/placement_test.go @@ -1,55 +1,99 @@ package placement import ( - "strings" "testing" + "github.com/converged-computing/fluence/pkg/jobspec" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -func cpuPod(name string, cpu, gpu int64) corev1.Pod { - req := corev1.ResourceList{corev1.ResourceCPU: *resource.NewQuantity(cpu, resource.DecimalSI)} - if gpu > 0 { - req["nvidia.com/gpu"] = *resource.NewQuantity(gpu, resource.DecimalSI) - } +func podWith(name string, req corev1.ResourceList) corev1.Pod { return corev1.Pod{ ObjectMeta: metav1.ObjectMeta{Name: name}, Spec: corev1.PodSpec{Containers: []corev1.Container{{Resources: corev1.ResourceRequirements{Requests: req}}}}, } } -func TestJobspecForGroupClassical(t *testing.T) { - pods := []corev1.Pod{cpuPod("p0", 4, 1), cpuPod("p1", 4, 1), cpuPod("p2", 4, 1)} +func qty(n int64) resource.Quantity { return *resource.NewQuantity(n, resource.DecimalSI) } + +// withType returns the count for a given Fluxion type in the slot's `with`. +func withType(js *jobspec.Jobspec, t string) (int, bool) { + for _, w := range js.Resources[0].With { + if w.Type == t { + return w.Count, true + } + } + return 0, false +} + +func TestClassical(t *testing.T) { + pods := []corev1.Pod{ + podWith("p0", corev1.ResourceList{corev1.ResourceCPU: qty(4), "nvidia.com/gpu": qty(1)}), + podWith("p1", corev1.ResourceList{corev1.ResourceCPU: qty(4), "nvidia.com/gpu": qty(1)}), + } js, err := JobspecForGroup("grp", pods) if err != nil { t.Fatal(err) } - if js.Resources[0].Count != 3 { - t.Fatalf("slot count = %d, want 3", js.Resources[0].Count) + if js.Resources[0].Count != 2 { + t.Fatalf("slot count = %d, want 2", js.Resources[0].Count) + } + if c, _ := withType(js, "core"); c != 4 { + t.Errorf("core = %d, want 4", c) + } + if c, _ := withType(js, "gpu"); c != 1 { + t.Errorf("gpu = %d, want 1", c) + } + if _, ok := withType(js, "qpu"); ok { + t.Error("classical pod should not request qpu") + } +} + +func TestGenericQuantumCount(t *testing.T) { + // fluxion.flux-framework.org/qpu: 1 -> a qpu count, with no per-type code. + p := podWith("q", corev1.ResourceList{FluxionResourcePrefix + "qpu": qty(1)}) + js, err := JobspecForGroup("qgrp", []corev1.Pod{p}) + if err != nil { + t.Fatal(err) + } + if c, ok := withType(js, "qpu"); !ok || c != 1 { + t.Fatalf("qpu = %d (ok=%v), want 1", c, ok) } - y, _ := js.YAML() - if !strings.Contains(y, "core") || !strings.Contains(y, "gpu") { - t.Fatalf("missing core/gpu:\n%s", y) + // no classical core forced on an exotic-only request + if _, ok := withType(js, "core"); ok { + t.Error("quantum-only pod should not be forced to request a core") } } -func TestJobspecForGroupQuantum(t *testing.T) { - q := corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{Name: "q0"}, - Spec: corev1.PodSpec{Containers: []corev1.Container{{ - Resources: corev1.ResourceRequirements{Requests: corev1.ResourceList{ - QuantumResource: *resource.NewQuantity(1, resource.DecimalSI), - }}, - }}}, - } - js, err := JobspecForGroup("qgrp", []corev1.Pod{q}) +func TestGenericQubitCount(t *testing.T) { + // "at least 156 qubits" expressed as a count (Fluxion count match is >=). + p := podWith("q", corev1.ResourceList{FluxionResourcePrefix + "qubit": qty(156)}) + js, err := JobspecForGroup("qubits", []corev1.Pod{p}) if err != nil { t.Fatal(err) } - if js.Resources[0].With[0].Type != "qpu" { - t.Fatalf("want qpu, got %+v", js.Resources[0].With) + if c, ok := withType(js, "qubit"); !ok || c != 156 { + t.Fatalf("qubit = %d (ok=%v), want 156", c, ok) + } +} + +func TestHybrid(t *testing.T) { + // cores AND a qpu in the same pod -> both appear in the slot. + p := podWith("h", corev1.ResourceList{ + corev1.ResourceCPU: qty(2), + FluxionResourcePrefix + "qpu": qty(1), + }) + js, err := JobspecForGroup("hyb", []corev1.Pod{p}) + if err != nil { + t.Fatal(err) + } + if c, _ := withType(js, "core"); c != 2 { + t.Errorf("core = %d, want 2", c) + } + if c, _ := withType(js, "qpu"); c != 1 { + t.Errorf("qpu = %d, want 1", c) } } @@ -70,3 +114,23 @@ func TestPlacementFromAllocation(t *testing.T) { t.Fatalf("backend = %q", p.Backend) } } + +func TestPlacementQuantumOnly(t *testing.T) { + // A pure-quantum allocation has a qpu (under qgateway) but NO node vertex. + // Nodes must be empty and Backend set — fluence then imposes no node constraint. + alloc := `{"graph":{"nodes":[ + {"metadata":{"type":"cluster","name":"kind"}}, + {"metadata":{"type":"qgateway","name":"qgateway0"}}, + {"metadata":{"type":"qpu","name":"ibm_marrakesh"}}, + {"metadata":{"type":"qubit","name":"qubit0"}}]}}` + p, err := PlacementFromAllocation(alloc) + if err != nil { + t.Fatal(err) + } + if len(p.Nodes) != 0 { + t.Fatalf("quantum-only allocation should have no nodes, got %v", p.Nodes) + } + if p.Backend != "ibm_marrakesh" { + t.Fatalf("backend = %q, want ibm_marrakesh", p.Backend) + } +} diff --git a/pkg/webhook/webhook.go b/pkg/webhook/webhook.go new file mode 100644 index 0000000..08e8364 --- /dev/null +++ b/pkg/webhook/webhook.go @@ -0,0 +1,201 @@ +// Package webhook is fluence's mutating admission webhook. Its job is to make +// scheduler-chosen values reach a pod's containers without the user wiring +// anything. Container env is immutable after a pod is created, so the scheduler +// cannot write it directly; instead this webhook injects, at pod-creation time, +// a downward-API env that reads an annotation the scheduler fills in later +// (during PreBind). The user writes a plain pod; the plumbing is automatic. +// +// Current rule: for a pod scheduled by fluence whose container requests a +// fluxion.flux-framework.org/* resource, inject QRMI_BACKEND sourced from the +// fluence backend annotation. New mutation rules can be added in Mutate. +// +// The webhook also manages its own TLS: it generates a self-signed CA + serving +// certificate at startup and patches its MutatingWebhookConfiguration's caBundle, +// so the install needs no cert-manager and no committed keys. +package webhook + +import ( + "context" + "crypto/rand" + "crypto/rsa" + "crypto/x509" + "crypto/x509/pkix" + "encoding/base64" + "encoding/json" + "encoding/pem" + "fmt" + "io" + "math/big" + "net/http" + "strings" + "time" + + "github.com/converged-computing/fluence/pkg/placement" + + admissionv1 "k8s.io/api/admission/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes" +) + +// SchedulerName is the scheduler whose pods this webhook mutates. +const SchedulerName = "fluence" + +// jsonPatchOp is a single RFC 6902 JSON Patch operation. +type jsonPatchOp struct { + Op string `json:"op"` + Path string `json:"path"` + Value any `json:"value,omitempty"` +} + +// backendEnv is the downward-API env injected into quantum containers. To the +// app it is an ordinary env var; its value comes from the fluence backend +// annotation, which the scheduler sets in PreBind. +func backendEnv() corev1.EnvVar { + return corev1.EnvVar{ + Name: "QRMI_BACKEND", + ValueFrom: &corev1.EnvVarSource{ + FieldRef: &corev1.ObjectFieldSelector{ + FieldPath: fmt.Sprintf("metadata.annotations['%s']", placement.BackendAnnotation), + }, + }, + } +} + +// Mutate returns the JSON Patch operations for a pod, or nil if nothing applies. +func Mutate(pod *corev1.Pod) []jsonPatchOp { + if pod.Spec.SchedulerName != SchedulerName { + return nil + } + var ops []jsonPatchOp + for i, c := range pod.Spec.Containers { + if !requestsFluxionResource(c) || hasEnv(c, "QRMI_BACKEND") { + continue + } + if len(c.Env) == 0 { + ops = append(ops, jsonPatchOp{ + Op: "add", + Path: fmt.Sprintf("/spec/containers/%d/env", i), + Value: []corev1.EnvVar{backendEnv()}, + }) + } else { + ops = append(ops, jsonPatchOp{ + Op: "add", + Path: fmt.Sprintf("/spec/containers/%d/env/-", i), + Value: backendEnv(), + }) + } + } + return ops +} + +func requestsFluxionResource(c corev1.Container) bool { + for name := range c.Resources.Requests { + if strings.HasPrefix(string(name), placement.FluxionResourcePrefix) { + return true + } + } + return false +} + +func hasEnv(c corev1.Container, name string) bool { + for _, e := range c.Env { + if e.Name == name { + return true + } + } + return false +} + +// Handler is the /mutate endpoint. It always admits the pod (failure to mutate +// must not block creation); it only adds a patch when Mutate returns one. +func Handler(w http.ResponseWriter, r *http.Request) { + body, err := io.ReadAll(r.Body) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + var review admissionv1.AdmissionReview + if err := json.Unmarshal(body, &review); err != nil || review.Request == nil { + http.Error(w, "bad admission review", http.StatusBadRequest) + return + } + + resp := &admissionv1.AdmissionResponse{UID: review.Request.UID, Allowed: true} + var pod corev1.Pod + if err := json.Unmarshal(review.Request.Object.Raw, &pod); err == nil { + if ops := Mutate(&pod); len(ops) > 0 { + if patch, err := json.Marshal(ops); err == nil { + pt := admissionv1.PatchTypeJSONPatch + resp.Patch = patch + resp.PatchType = &pt + } + } + } + + out := admissionv1.AdmissionReview{TypeMeta: review.TypeMeta, Response: resp} + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(out) +} + +// GenerateCerts returns a self-signed CA (PEM) and a serving cert+key (PEM) valid +// for the given DNS names. The CA PEM is what the apiserver must trust (caBundle). +func GenerateCerts(dnsNames []string) (caPEM, certPEM, keyPEM []byte, err error) { + caKey, err := rsa.GenerateKey(rand.Reader, 2048) + if err != nil { + return nil, nil, nil, err + } + caTmpl := &x509.Certificate{ + SerialNumber: big.NewInt(1), + Subject: pkix.Name{CommonName: "fluence-webhook-ca"}, + NotBefore: time.Now().Add(-time.Hour), + NotAfter: time.Now().AddDate(10, 0, 0), + IsCA: true, + KeyUsage: x509.KeyUsageCertSign | x509.KeyUsageDigitalSignature, + BasicConstraintsValid: true, + } + caDER, err := x509.CreateCertificate(rand.Reader, caTmpl, caTmpl, &caKey.PublicKey, caKey) + if err != nil { + return nil, nil, nil, err + } + caCert, err := x509.ParseCertificate(caDER) + if err != nil { + return nil, nil, nil, err + } + + leafKey, err := rsa.GenerateKey(rand.Reader, 2048) + if err != nil { + return nil, nil, nil, err + } + leafTmpl := &x509.Certificate{ + SerialNumber: big.NewInt(2), + Subject: pkix.Name{CommonName: dnsNames[0]}, + NotBefore: time.Now().Add(-time.Hour), + NotAfter: time.Now().AddDate(10, 0, 0), + KeyUsage: x509.KeyUsageDigitalSignature | x509.KeyUsageKeyEncipherment, + ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth}, + DNSNames: dnsNames, + } + leafDER, err := x509.CreateCertificate(rand.Reader, leafTmpl, caCert, &leafKey.PublicKey, caKey) + if err != nil { + return nil, nil, nil, err + } + + caPEM = pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: caDER}) + certPEM = pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: leafDER}) + keyPEM = pem.EncodeToMemory(&pem.Block{Type: "RSA PRIVATE KEY", Bytes: x509.MarshalPKCS1PrivateKey(leafKey)}) + return caPEM, certPEM, keyPEM, nil +} + +// EnsureCABundle patches the named MutatingWebhookConfiguration so its first +// webhook trusts caPEM. +func EnsureCABundle(ctx context.Context, client kubernetes.Interface, configName string, caPEM []byte) error { + patch := fmt.Sprintf( + `[{"op":"replace","path":"/webhooks/0/clientConfig/caBundle","value":%q}]`, + base64.StdEncoding.EncodeToString(caPEM), + ) + _, err := client.AdmissionregistrationV1().MutatingWebhookConfigurations().Patch( + ctx, configName, types.JSONPatchType, []byte(patch), metav1.PatchOptions{}) + return err +} diff --git a/pkg/webhook/webhook_test.go b/pkg/webhook/webhook_test.go new file mode 100644 index 0000000..c5a164c --- /dev/null +++ b/pkg/webhook/webhook_test.go @@ -0,0 +1,56 @@ +package webhook + +import ( + "testing" + + "github.com/converged-computing/fluence/pkg/placement" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" +) + +func qpuPod(scheduler string, withEnv bool) *corev1.Pod { + c := corev1.Container{ + Name: "app", + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + placement.FluxionResourcePrefix + "qpu": *resource.NewQuantity(1, resource.DecimalSI), + }, + }, + } + if withEnv { + c.Env = []corev1.EnvVar{{Name: "QRMI_BACKEND", Value: "preset"}} + } + return &corev1.Pod{Spec: corev1.PodSpec{SchedulerName: scheduler, Containers: []corev1.Container{c}}} +} + +func TestMutateInjectsBackendEnv(t *testing.T) { + ops := Mutate(qpuPod("fluence", false)) + if len(ops) != 1 { + t.Fatalf("want 1 op, got %d", len(ops)) + } + if ops[0].Path != "/spec/containers/0/env" { + t.Errorf("path = %q", ops[0].Path) + } +} + +func TestMutateSkipsOtherScheduler(t *testing.T) { + if ops := Mutate(qpuPod("default-scheduler", false)); ops != nil { + t.Fatalf("non-fluence pod should not be mutated, got %v", ops) + } +} + +func TestMutateRespectsExistingEnv(t *testing.T) { + if ops := Mutate(qpuPod("fluence", true)); ops != nil { + t.Fatalf("should not override an existing QRMI_BACKEND, got %v", ops) + } +} + +func TestMutateSkipsNonQuantum(t *testing.T) { + p := &corev1.Pod{Spec: corev1.PodSpec{ + SchedulerName: "fluence", + Containers: []corev1.Container{{Name: "c", Resources: corev1.ResourceRequirements{Requests: corev1.ResourceList{corev1.ResourceCPU: *resource.NewQuantity(1, resource.DecimalSI)}}}}, + }} + if ops := Mutate(p); ops != nil { + t.Fatalf("classical pod should not be mutated, got %v", ops) + } +}