From e749d7318a9f1458afc7c48d2a46d97ac1fd81af Mon Sep 17 00:00:00 2001 From: vsoch Date: Wed, 10 Jun 2026 19:06:22 -0700 Subject: [PATCH] feat: fix kind deployment and add informer we need an informer to use an annotation on a single pod (anything not a podgroup) or a podgroup to send cancel to fluxion Signed-off-by: vsoch --- README.md | 12 +- deploy/kind-config.yaml | 11 +- pkg/fluence/fluence.go | 222 ++++++++++++++++++++++++++++--------- pkg/graph/graph.go | 8 ++ pkg/placement/placement.go | 6 + 5 files changed, 207 insertions(+), 52 deletions(-) diff --git a/README.md b/README.md index eafb62f..ba2c24f 100644 --- a/README.md +++ b/README.md @@ -197,7 +197,17 @@ kubectl logs sampler -f {"results": [{"data": {"c": {"samples": ["0x0", "0x1", "0x0", "0x0", "0x1", "0x1", "0x1", "0x1", "0x1", "0x0", "0x0", "0x0", "0x0", "0x1", "0x1", "0x1", "0x1", "0x1", "0x1", "0x1", "0x0", "0x1", "0x0", "0x0", "0x0", "0x1", "0x0", "0x1", "0x0", "0x1", "0x1", "0x0", "0x0", "0x0", "0x0", "0x1", "0x1", "0x1", "0x1", "0x1", "0x1", "0x1", "0x0", "0x0", "0x0", "0x1", "0x0", "0x1", "0x0", "0x0", "0x1", "0x1", "0x1", "0x0", "0x0", "0x0", "0x1", "0x1", "0x1", "0x0", "0x1", "0x1", "0x0", "0x1", "0x0", "0x1", "0x1", "0x1", "0x1", "0x0", "0x0", "0x1", "0x1", "0x1", "0x1", "0x1", "0x1", "0x1", "0x0", "0x0", "0x1", "0x0", "0x1", "0x1", "0x0", "0x1", "0x0", "0x1", "0x1", "0x1", "0x0", "0x0", "0x0", "0x1", "0x1", "0x1", "0x1", "0x0", "0x0", "0x1", "0x0", "0x0", "0x0", "0x0", "0x0", "0x0", "0x1", "0x1", "0x0", "0x1", "0x0", "0x0", "0x1", "0x1", "0x1", "0x1", "0x1", "0x0", "0x1", "0x0", "0x0", "0x0", "0x1", "0x1", "0x1", "0x0", "0x1", "0x1", "0x0", "0x0", "0x1", "0x0", "0x1", "0x0", "0x0", "0x1", "0x0", "0x0", "0x1", "0x1", "0x1", "0x0", "0x1", "0x1", "0x0", "0x0", "0x0", "0x1", "0x1", "0x1", "0x0", "0x1", "0x1", "0x0", "0x0", "0x1", "0x0", "0x1", "0x0", "0x0", "0x0", "0x0", "0x1", "0x1", "0x1", "0x1", "0x1", "0x1", "0x1", "0x0", "0x1", "0x1", "0x1", "0x0", "0x1", "0x1", "0x0", "0x0", "0x0", "0x0", "0x1", "0x0", "0x0", "0x0", "0x0", "0x1", "0x0", "0x0", "0x1", "0x1", "0x1", "0x0", "0x1", "0x0", "0x1", "0x0", "0x1", "0x1", "0x1", "0x1", "0x0", "0x0", "0x1", "0x1", "0x0", "0x1", "0x0", "0x0", "0x0", "0x1", "0x1", "0x1", "0x0", "0x0", "0x0", "0x1", "0x0", "0x0", "0x0", "0x1", "0x0", "0x1", "0x0", "0x0", "0x0", "0x1", "0x1", "0x0", "0x1", "0x0", "0x0", "0x0", "0x1", "0x0", "0x0", "0x1", "0x1", "0x0", "0x0", "0x0", "0x0", "0x0", "0x1", "0x1", "0x1", "0x0", "0x1", "0x1", "0x1", "0x1", "0x1", "0x1", "0x0", "0x0", "0x0", "0x0"], "num_bits": 1}}, "metadata": {"circuit_metadata": {}}}], "metadata": {"execution": {"execution_spans": [[{"date": "2026-06-06T19:04:43.221657"}, {"date": "2026-06-06T19:04:44.372421"}, {"0": [[256], [0, 1], [0, 256]]}]]}, "version": 2}} 2026/06/06 19:04:50 done: 2070 bytes from ibm_marrakesh ``` -Boum! +Boum! You will see in the fluence logs that when the pod completes, the fluxion job is cancelled, freeing the resources. + +```bash +kubectl logs -n kube-system fluence-75d6848778-g4lh6 +... +I0610 18:33:05.843325 1 eventhandlers.go:443] "Delete event for scheduled pod" pod="default/sampler" + 🌀 Cancel jobid: 1 +(env) (base) vanessa@vanessa-ThinkPad-P14s-Gen-4:~/Desktop/Code/fluence$ kubectl get pods +NAME READY STATUS RESTARTS AGE +sampler 0/1 Completed 0 24s +``` ### A note on deletion diff --git a/deploy/kind-config.yaml b/deploy/kind-config.yaml index 1ef46da..ec310bc 100644 --- a/deploy/kind-config.yaml +++ b/deploy/kind-config.yaml @@ -14,6 +14,7 @@ nodes: kind: ClusterConfiguration apiServer: extraArgs: + # Turn on the alpha API group + the API itself. - name: runtime-config value: "scheduling.k8s.io/v1alpha2=true" - name: feature-gates @@ -22,5 +23,13 @@ nodes: extraArgs: - name: feature-gates value: "GenericWorkload=true,GangScheduling=true" + controllerManager: + extraArgs: + # The podgroup-protection-controller (which removes the + # scheduling.k8s.io/podgroup-protection finalizer once a PodGroup's + # pods are gone) is gated on GenericWorkload. Without this, PodGroup + # deletion hangs on the finalizer forever. + - name: feature-gates + value: "GenericWorkload=true" - role: worker - - role: worker + - role: worker \ No newline at end of file diff --git a/pkg/fluence/fluence.go b/pkg/fluence/fluence.go index 82f5943..50bd82b 100644 --- a/pkg/fluence/fluence.go +++ b/pkg/fluence/fluence.go @@ -3,7 +3,9 @@ package fluence import ( "context" "fmt" + "log" "os" + "strconv" "sync" "github.com/converged-computing/fluence/pkg/cluster" @@ -11,24 +13,34 @@ import ( "github.com/converged-computing/fluence/pkg/placement" corev1 "k8s.io/api/core/v1" + schedv1a2 "k8s.io/api/scheduling/v1alpha2" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/tools/cache" fwk "k8s.io/kube-scheduler/framework" ) // The scheduler-framework types live in the staging module // k8s.io/kube-scheduler/framework (imported as fwk). The plugin is built into // the scheduler binary via cmd/fluence. Signatures here are verified against -// k8s.io/kube-scheduler v0.36.0; keep them in lockstep with the k8s version you -// run on (CycleState and NodeInfo are interfaces, and PreFilter takes the -// candidate node list). +// k8s.io/kube-scheduler v0.36.0. // Name is the plugin name registered with the scheduler and referenced in the // KubeSchedulerConfiguration. const Name = "Fluence" +// groupAlloc is the in-memory record of a group's Fluxion allocation. It is a +// rebuildable, within-lifetime memo: its job is race-free "match once per group" +// dedup on the scheduling path (the durable record is the jobid annotation on +// the owning object). It does not survive a scheduler restart, which is fine — +// the graph itself is rebuilt fresh on restart. +type groupAlloc struct { + place placement.Placement + jobid uint64 +} + // Fluence is a scheduler-framework plugin that places whole pod groups by // matching them against a flux-sched resource graph built from the live cluster // (plus any configured quantum resources). Gang/all-or-nothing semantics are @@ -37,10 +49,14 @@ type Fluence struct { handle fwk.Handle matcher *graph.FluxionGraph + // matcherMu serializes all access to the cgo Fluxion client, which is not + // thread-safe. Match runs on the (sequential) scheduling path; Cancel runs in + // informer handler goroutines, so the two can race without this. + matcherMu sync.Mutex + mu sync.Mutex - // placement maps a pod-group key to the placement chosen for the group - // (nodes + allocated backend). - placement map[string]placement.Placement + // placement maps a group key to its allocation (nodes, backend, jobid). + placement map[string]groupAlloc } var ( @@ -50,28 +66,25 @@ var ( ) // New builds the plugin: discover cluster nodes, optionally inject quantum -// resources, write the JGF graph, and initialize the Fluxion matcher. -// -// Configuration (for now via env; can move to plugin args): +// resources, write the JGF graph, initialize the Fluxion matcher, and register +// the delete handlers that cancel allocations when their owning object is gone. // // FLUENCE_RESOURCES path to a YAML/JSON resources config (e.g. quantum // backends). Unset = classical-only graph. // FLUENCE_MATCH_POLICY Fluxion match policy (default "first") func New(ctx context.Context, _ runtime.Object, h fwk.Handle) (fwk.Plugin, error) { // List nodes via the API. The scheduler's shared snapshot is empty at - // plugin-construction time (it is populated per scheduling cycle once - // informers have synced), so a direct List is what actually gives us the - // cluster's compute. We assume a static cluster for now: this is read once - // at startup and the graph is not updated as nodes come and go. + // plugin-construction time, so a direct List is what gives us the cluster's + // compute. Static cluster for now: read once, graph not updated live. nodeList, err := h.ClientSet().CoreV1().Nodes().List(ctx, metav1.ListOptions{}) if err != nil { return nil, fmt.Errorf("list nodes: %w", err) } - // Classical compute always comes from the cluster nodes. Quantum/other - // resources are added only when a resources config is present. FLUENCE_RESOURCES - // is set on the base scheduler but the file only exists once the resources - // add-on is applied, so a missing file is normal (classical-only), not fatal. + // Quantum/other resources are added only when a resources config is present. + // FLUENCE_RESOURCES is set on the base scheduler but the file only exists once + // the resources add-on is applied, so a missing file is classical-only, not + // fatal. opts := cluster.Options{} if path := os.Getenv("FLUENCE_RESOURCES"); path != "" { raw, err := os.ReadFile(path) @@ -107,19 +120,21 @@ func New(ctx context.Context, _ runtime.Object, h fwk.Handle) (fwk.Plugin, error matcher := &graph.FluxionGraph{MatchFormat: "jgf"} matcher.Init(tmp.Name(), os.Getenv("FLUENCE_MATCH_POLICY"), "") - return &Fluence{ + f := &Fluence{ handle: h, matcher: matcher, - placement: map[string]placement.Placement{}, - }, nil + placement: map[string]groupAlloc{}, + } + f.registerCancelHandlers() + return f, nil } // Name returns the plugin name. func (f *Fluence) Name() string { return Name } // PreFilter runs once per scheduling cycle for a pod. The first pod of a group -// triggers a single match for the whole gang; the resulting node assignment is -// cached and consumed by Filter for every pod in the group. +// triggers a single match for the whole gang; the resulting placement (and the +// Fluxion jobid) is cached and consumed by Filter/PreBind for every pod. func (f *Fluence) PreFilter( ctx context.Context, state fwk.CycleState, @@ -149,7 +164,9 @@ func (f *Fluence) PreFilter( return nil, fwk.AsStatus(err) } + f.matcherMu.Lock() req, err := f.matcher.MatchAllocateSpec(specYAML) + f.matcherMu.Unlock() if err != nil { return nil, fwk.NewStatus(fwk.Unschedulable, fmt.Sprintf("fluxion match failed: %v", err)) } @@ -160,18 +177,16 @@ func (f *Fluence) PreFilter( if len(place.Nodes) == 0 && place.Backend == "" { return nil, fwk.NewStatus(fwk.Unschedulable, "fluxion returned no allocation") } - // Note: a quantum-only allocation has a Backend but no Nodes (a qpu vertex - // lives under the qgateway, not under a compute node). That is valid — the - // backend is a remote API reachable from any node — so we do not require a - // node here; Filter imposes no node constraint in that case. + // A quantum-only allocation has a Backend but no Nodes (a qpu vertex lives + // under the qgateway, not under a compute node). That is valid — the backend + // is reachable from any node — so Filter imposes no node constraint then. f.mu.Lock() - f.placement[group] = place + f.placement[group] = groupAlloc{place: place, jobid: req.Number} f.mu.Unlock() - // The allocated backend is recorded onto each pod in PreBind (container env - // is immutable post-creation, but annotations can be patched); the - // webhook-injected downward-API env then surfaces it as QRMI_BACKEND. + // The jobid (for cancel) and any backend (for the webhook env) are written + // onto the owning object in PreBind, the commit phase. return nil, fwk.NewStatus(fwk.Success) } @@ -188,7 +203,7 @@ func (f *Fluence) Filter( group := groupKey(pod) f.mu.Lock() - nodes := f.placement[group].Nodes + nodes := f.placement[group].place.Nodes f.mu.Unlock() // A quantum-only allocation pins no node (the backend is a remote API any @@ -206,10 +221,9 @@ func (f *Fluence) Filter( return fwk.NewStatus(fwk.Unschedulable, "node not in fluxion allocation for this group") } -// PreBindPreFlight runs before PreBind. It returns Success when this plugin has -// a backend to stamp on the pod (a quantum group), and Skip otherwise so the -// framework doesn't call PreBind needlessly. It is lightweight: it only reads -// the cached group placement, no API calls. +// PreBindPreFlight runs before PreBind. It returns Success when we have a cached +// allocation for the pod's group (so PreBind can record the jobid, and stamp the +// backend for a quantum pod), and Skip otherwise. func (f *Fluence) PreBindPreFlight( ctx context.Context, state fwk.CycleState, @@ -217,19 +231,20 @@ func (f *Fluence) PreBindPreFlight( nodeName string, ) (*fwk.PreBindPreFlightResult, *fwk.Status) { f.mu.Lock() - backend := f.placement[groupKey(pod)].Backend + _, ok := f.placement[groupKey(pod)] f.mu.Unlock() - if backend == "" { + if !ok { return nil, fwk.NewStatus(fwk.Skip) } return nil, fwk.NewStatus(fwk.Success) } -// PreBind writes the backend Fluxion allocated for this pod's group onto the pod -// as the annotation placement.BackendAnnotation. The mutating webhook has -// already wired a downward-API env (QRMI_BACKEND) that reads this annotation, so -// the container sees the backend as an ordinary env var. Container env cannot be -// patched after creation, which is why the value travels via an annotation. +// PreBind records, in the commit phase, the durable state for this group: +// - the Fluxion jobid onto the owning object (the PodGroup for a gang, else the +// pod) so the allocation can be cancelled when that object is deleted; +// - for a quantum group, the allocated backend onto the pod, which the webhook- +// injected downward-API env surfaces as QRMI_BACKEND (container env is +// immutable post-creation, so the value must travel via an annotation). func (f *Fluence) PreBind( ctx context.Context, state fwk.CycleState, @@ -237,21 +252,128 @@ func (f *Fluence) PreBind( nodeName string, ) *fwk.Status { f.mu.Lock() - backend := f.placement[groupKey(pod)].Backend + alloc, ok := f.placement[groupKey(pod)] f.mu.Unlock() - if backend == "" { - return fwk.NewStatus(fwk.Success) // nothing to do; PreBindPreFlight skips these + if !ok { + return fwk.NewStatus(fwk.Success) // not ours; nothing to record } - patch := fmt.Sprintf(`{"metadata":{"annotations":{%q:%q}}}`, placement.BackendAnnotation, backend) - _, err := f.handle.ClientSet().CoreV1().Pods(pod.Namespace).Patch( - ctx, pod.Name, types.MergePatchType, []byte(patch), metav1.PatchOptions{}) - if err != nil { - return fwk.AsStatus(fmt.Errorf("stamp backend annotation: %w", err)) + if err := f.recordJobID(ctx, pod, alloc.jobid); err != nil { + return fwk.AsStatus(fmt.Errorf("record jobid: %w", err)) + } + if alloc.place.Backend != "" { + if err := f.patchPodAnnotation(ctx, pod.Namespace, pod.Name, placement.BackendAnnotation, alloc.place.Backend); err != nil { + return fwk.AsStatus(fmt.Errorf("stamp backend annotation: %w", err)) + } } return fwk.NewStatus(fwk.Success) } +// recordJobID writes the jobid annotation onto the allocation's owning object: a +// grouped pod's allocation belongs to the PodGroup; an ungrouped pod owns its own. +func (f *Fluence) recordJobID(ctx context.Context, pod *corev1.Pod, jobid uint64) error { + val := strconv.FormatUint(jobid, 10) + if group := placement.PodGroupName(pod); group != "" { + patch := fmt.Sprintf(`{"metadata":{"annotations":{%q:%q}}}`, placement.JobIDAnnotation, val) + _, err := f.handle.ClientSet().SchedulingV1alpha2().PodGroups(pod.Namespace).Patch( + ctx, group, types.MergePatchType, []byte(patch), metav1.PatchOptions{}) + return err + } + return f.patchPodAnnotation(ctx, pod.Namespace, pod.Name, placement.JobIDAnnotation, val) +} + +func (f *Fluence) patchPodAnnotation(ctx context.Context, ns, name, key, val string) error { + patch := fmt.Sprintf(`{"metadata":{"annotations":{%q:%q}}}`, key, val) + _, err := f.handle.ClientSet().CoreV1().Pods(ns).Patch( + ctx, name, types.MergePatchType, []byte(patch), metav1.PatchOptions{}) + return err +} + +// registerCancelHandlers watches PodGroup and Pod deletions and frees the +// corresponding Fluxion allocation. Grouped pods are ignored by the pod handler +// (their allocation lives on the PodGroup); ungrouped pods are handled there. +// The framework has no deletion extension point, so this is informer-driven. +func (f *Fluence) registerCancelHandlers() { + sif := f.handle.SharedInformerFactory() + + _, _ = sif.Scheduling().V1alpha2().PodGroups().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + DeleteFunc: func(obj interface{}) { + pg, ok := obj.(*schedv1a2.PodGroup) + if !ok { + tomb, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + return + } + if pg, ok = tomb.Obj.(*schedv1a2.PodGroup); !ok { + return + } + } + f.cancelGroup(pg.Namespace+"/"+pg.Name, pg.Annotations) + }, + }) + + _, _ = sif.Core().V1().Pods().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + DeleteFunc: func(obj interface{}) { + pod, ok := obj.(*corev1.Pod) + if !ok { + tomb, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + return + } + if pod, ok = tomb.Obj.(*corev1.Pod); !ok { + return + } + } + // Grouped pods' allocation is owned by the PodGroup; only the + // PodGroup's deletion frees it. Act on ungrouped pods only. + if placement.PodGroupName(pod) != "" { + return + } + f.cancelGroup(pod.Namespace+"/"+pod.Name, pod.Annotations) + }, + }) +} + +// cancelGroup frees the allocation for a deleted owning object. The jobid comes +// from the object's annotation (the durable source of truth); if it is missing +// (e.g. deleted between PreFilter and PreBind, before the annotation was +// written) it falls back to the in-memory memo by key. Cancel is idempotent. +func (f *Fluence) cancelGroup(key string, ann map[string]string) { + jobid, ok := parseJobID(ann) + if !ok { + f.mu.Lock() + alloc, found := f.placement[key] + f.mu.Unlock() + if !found { + return // never scheduled by us, or already cancelled + } + jobid = alloc.jobid + } + + f.matcherMu.Lock() + err := f.matcher.Cancel(jobid) + f.matcherMu.Unlock() + if err != nil { + log.Printf("fluence: cancel jobid %d for %s failed: %v", jobid, key, err) + } + + f.mu.Lock() + delete(f.placement, key) + f.mu.Unlock() +} + +func parseJobID(ann map[string]string) (uint64, bool) { + raw := ann[placement.JobIDAnnotation] + if raw == "" { + return 0, false + } + jobid, err := strconv.ParseUint(raw, 10, 64) + if err != nil { + return 0, false + } + return jobid, true +} + // groupPods returns the pods belonging to the same native PodGroup as pod // (spec.schedulingGroup.podGroupName). That field is not label-selectable, so we // list the namespace and filter in code. A pod with no scheduling group is its diff --git a/pkg/graph/graph.go b/pkg/graph/graph.go index 478889f..29490dd 100644 --- a/pkg/graph/graph.go +++ b/pkg/graph/graph.go @@ -109,6 +109,14 @@ func (f *FluxionGraph) MatchAllocateSpec(spec string) (quantum.MatchAllocateRequ return request, nil } +// Cancel frees a previously match-allocated jobid in the resource graph. It is +// idempotent: cancelling an unknown jobid is not treated as an error (noent_ok), +// so a double-cancel (e.g. a redelivered informer event) is harmless. +func (f *FluxionGraph) Cancel(jobid uint64) error { + fmt.Printf(" 🌀 Cancel jobid: %d\n", jobid) + return f.cli.Cancel(int64(jobid), true) +} + // Satisfy determines if we can satisfy func (f *FluxionGraph) Satisfy(specFile string) (bool, error) { fmt.Printf(" 🌀 Request: %s\n", specFile) diff --git a/pkg/placement/placement.go b/pkg/placement/placement.go index 2b65aad..46c4222 100644 --- a/pkg/placement/placement.go +++ b/pkg/placement/placement.go @@ -21,6 +21,12 @@ const ( // backend for a pod. The mutating webhook wires a downward-API env // (QRMI_BACKEND) that reads this annotation. BackendAnnotation = "fluence.flux-framework.org/backend" + + // JobIDAnnotation records the Fluxion allocation (jobid) for a scheduled + // group. It is written onto the owning object — the PodGroup for a gang, or + // the pod itself for an ungrouped pod — so the allocation can be cancelled + // when that object is deleted, and replayed on scheduler restart. + JobIDAnnotation = "fluence.flux-framework.org/jobid" ) // PodGroupName returns the native (Kubernetes 1.36) scheduling-group name a pod