From 22e690403a22f436b36cb544cb803472fd19ba1f Mon Sep 17 00:00:00 2001 From: Yahav Date: Thu, 4 Jun 2026 09:35:41 -0700 Subject: [PATCH] Add DRA ResourceClaim tracking for neuron device allocation Watch ResourceClaim objects to track DRA (Dynamic Resource Allocation) device usage per node. This enables visualization of devices managed by DRA drivers (e.g., AWS Neuron) that don't appear in traditional pod resource requests. Maps the neuron.aws.com DRA driver to the aws.amazon.com/neuron allocatable resource so the progress bar shows correct utilization. Usage: eks-node-viewer --resources cpu,aws.amazon.com/neuron --- cmd/eks-node-viewer/main.go | 7 ++- pkg/client/client.go | 21 ++++++++ pkg/client/controller.go | 99 ++++++++++++++++++++++++++++++++----- pkg/model/node.go | 58 ++++++++++++++++++++-- 4 files changed, 170 insertions(+), 15 deletions(-) diff --git a/cmd/eks-node-viewer/main.go b/cmd/eks-node-viewer/main.go index 6092683..0e47f93 100644 --- a/cmd/eks-node-viewer/main.go +++ b/cmd/eks-node-viewer/main.go @@ -66,6 +66,11 @@ func main() { if err != nil { log.Fatalf("creating node claim client, %s", err) } + resourceClaimClient, err := client.NewResourceClaims(flags.Kubeconfig, flags.Context) + if err != nil { + log.Printf("creating resource claim client (DRA not available): %s", err) + resourceClaimClient = nil + } ctx, cancel := context.WithCancel(context.Background()) pprov := aws.NewStaticPricingProvider() @@ -92,7 +97,7 @@ func main() { } pprov = aws.NewPricingProvider(ctx, cfg) } - controller := client.NewController(cs, nodeClaimClient, m, nodeSelector, pprov) + controller := client.NewController(cs, nodeClaimClient, resourceClaimClient, m, nodeSelector, pprov) controller.Start(ctx) diff --git a/pkg/client/client.go b/pkg/client/client.go index 888bc40..f7dc0fe 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -17,6 +17,7 @@ package client import ( "strings" + resourcev1 "k8s.io/api/resource/v1" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" @@ -60,6 +61,26 @@ func NewNodeClaims(kubeconfig, context string) (*rest.RESTClient, error) { return rest.RESTClientFor(&config) } +func NewResourceClaims(kubeconfig, context string) (*rest.RESTClient, error) { + c, err := getConfig(kubeconfig, context) + if err != nil { + return nil, err + } + + gv := schema.GroupVersion{Group: "resource.k8s.io", Version: "v1"} + scheme.Scheme.AddKnownTypes(gv, + &resourcev1.ResourceClaim{}, + &resourcev1.ResourceClaimList{}) + + config := *c + config.ContentConfig.GroupVersion = &gv + config.APIPath = "/apis" + config.NegotiatedSerializer = scheme.Codecs.WithoutConversion() + config.UserAgent = rest.DefaultKubernetesUserAgent() + + return rest.RESTClientFor(&config) +} + func getConfig(kubeconfig, context string) (*rest.Config, error) { // use the current context in kubeconfig return clientcmd.NewNonInteractiveDeferredLoadingClientConfig( diff --git a/pkg/client/controller.go b/pkg/client/controller.go index c6d6718..49cd11a 100644 --- a/pkg/client/controller.go +++ b/pkg/client/controller.go @@ -20,6 +20,7 @@ import ( "time" v1 "k8s.io/api/core/v1" + resourcev1 "k8s.io/api/resource/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" @@ -33,20 +34,22 @@ import ( ) type Controller struct { - kubeClient *kubernetes.Clientset - uiModel *model.UIModel - pricing pricing.Provider - nodeSelector labels.Selector - nodeClaimClient *rest.RESTClient + kubeClient *kubernetes.Clientset + uiModel *model.UIModel + pricing pricing.Provider + nodeSelector labels.Selector + nodeClaimClient *rest.RESTClient + resourceClaimClient *rest.RESTClient } -func NewController(kubeClient *kubernetes.Clientset, nodeClaimClient *rest.RESTClient, uiModel *model.UIModel, nodeSelector labels.Selector, pricing pricing.Provider) *Controller { +func NewController(kubeClient *kubernetes.Clientset, nodeClaimClient *rest.RESTClient, resourceClaimClient *rest.RESTClient, uiModel *model.UIModel, nodeSelector labels.Selector, pricing pricing.Provider) *Controller { c := &Controller{ - kubeClient: kubeClient, - uiModel: uiModel, - pricing: pricing, - nodeSelector: nodeSelector, - nodeClaimClient: nodeClaimClient, + kubeClient: kubeClient, + uiModel: uiModel, + pricing: pricing, + nodeSelector: nodeSelector, + nodeClaimClient: nodeClaimClient, + resourceClaimClient: resourceClaimClient, } pricing.OnUpdate(c.RefreshNodePrices) return c @@ -62,6 +65,13 @@ func (m Controller) Start(ctx context.Context) { if err := m.nodeClaimClient.Get().Do(ctx).Error(); err == nil { m.startNodeClaimWatch(ctx, cluster) } + + // If ResourceClaims API is available, watch for DRA allocations + if m.resourceClaimClient != nil { + if err := m.resourceClaimClient.Get().Do(ctx).Error(); err == nil { + m.startResourceClaimWatch(ctx, cluster) + } + } } func (m Controller) startNodeClaimWatch(ctx context.Context, cluster *model.Cluster) { @@ -191,6 +201,73 @@ func (m Controller) startPodWatch(ctx context.Context, cluster *model.Cluster) { go podController.Run(ctx.Done()) } +func (m Controller) startResourceClaimWatch(ctx context.Context, cluster *model.Cluster) { + resourceClaimWatchList := cache.NewListWatchFromClient(m.resourceClaimClient, "resourceclaims", + v1.NamespaceAll, fields.Everything()) + _, resourceClaimController := cache.NewInformer( + resourceClaimWatchList, + &resourcev1.ResourceClaim{}, + time.Second*0, + cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + m.handleResourceClaim(cluster, obj.(*resourcev1.ResourceClaim)) + }, + UpdateFunc: func(oldObj, newObj interface{}) { + m.handleResourceClaim(cluster, newObj.(*resourcev1.ResourceClaim)) + }, + DeleteFunc: func(obj interface{}) { + claim := ignoreDeletedFinalStateUnknown(obj).(*resourcev1.ResourceClaim) + m.handleResourceClaimDelete(cluster, claim) + }, + }, + ) + go resourceClaimController.Run(ctx.Done()) +} + +// draDriverToResource maps DRA driver names to allocatable resource names. +var draDriverToResource = map[string]string{ + "neuron.aws.com": "aws.amazon.com/neuron", +} + +// handleResourceClaim processes an allocated ResourceClaim and updates node DRA usage. +func (m Controller) handleResourceClaim(cluster *model.Cluster, claim *resourcev1.ResourceClaim) { + if claim.Status.Allocation == nil { + return + } + + // Group allocated devices by node (pool name) and resource name + nodeDeviceCounts := map[string]map[string]int64{} // nodeName -> resourceName -> count + for _, result := range claim.Status.Allocation.Devices.Results { + nodeName := result.Pool + if nodeName == "" { + continue + } + resourceName := result.Driver + if mapped, ok := draDriverToResource[result.Driver]; ok { + resourceName = mapped + } + if nodeDeviceCounts[nodeName] == nil { + nodeDeviceCounts[nodeName] = map[string]int64{} + } + nodeDeviceCounts[nodeName][resourceName]++ + } + + claimUID := string(claim.UID) + for nodeName, resourceCounts := range nodeDeviceCounts { + if node, ok := cluster.GetNodeByName(nodeName); ok { + node.AddDRAClaim(claimUID, resourceCounts) + } + } +} + +// handleResourceClaimDelete removes DRA device allocations when a claim is deleted. +func (m Controller) handleResourceClaimDelete(cluster *model.Cluster, claim *resourcev1.ResourceClaim) { + claimUID := string(claim.UID) + cluster.ForEachNode(func(n *model.Node) { + n.DeleteDRAClaim(claimUID) + }) +} + func (m Controller) updatePrice(node *model.Node) { // If the node has the instance-price override label, don't look up pricing // and use the value here. diff --git a/pkg/model/node.go b/pkg/model/node.go index cc89734..191c9cd 100644 --- a/pkg/model/node.go +++ b/pkg/model/node.go @@ -22,6 +22,7 @@ import ( ec2types "github.com/aws/aws-sdk-go-v2/service/ec2/types" v1 "k8s.io/api/core/v1" + k8sresource "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/duration" karpv1 "sigs.k8s.io/karpenter/pkg/apis/v1" @@ -41,15 +42,19 @@ type Node struct { node v1.Node pods map[objectKey]*Pod used v1.ResourceList + draUsed map[string]int64 // resource name -> count of devices allocated via DRA + draClaims map[string]map[string]int64 // claimUID -> resource name -> device count Price float64 nodeclaimCreationTime time.Time } func NewNode(n *v1.Node) *Node { node := &Node{ - node: *n, - pods: map[objectKey]*Pod{}, - used: v1.ResourceList{}, + node: *n, + pods: map[objectKey]*Pod{}, + used: v1.ResourceList{}, + draUsed: map[string]int64{}, + draClaims: map[string]map[string]int64{}, } return node @@ -181,6 +186,14 @@ func (n *Node) Used() v1.ResourceList { for rn, q := range n.used { used[rn] = q.DeepCopy() } + // Merge DRA device allocations into the used resource list + for res, count := range n.draUsed { + if count > 0 { + existing := used[v1.ResourceName(res)] + existing.Add(k8sresource.MustParse(fmt.Sprintf("%d", count))) + used[v1.ResourceName(res)] = existing + } + } return used } @@ -289,6 +302,45 @@ func (n *Node) HasPrice() bool { return n.Price == n.Price } +// AddDRAClaim records DRA device allocations for a claim on this node. +func (n *Node) AddDRAClaim(claimUID string, resourceCounts map[string]int64) { + n.mu.Lock() + defer n.mu.Unlock() + // Remove old counts if this claim was already tracked + if old, ok := n.draClaims[claimUID]; ok { + for res, count := range old { + n.draUsed[res] -= count + } + } + n.draClaims[claimUID] = resourceCounts + for res, count := range resourceCounts { + n.draUsed[res] += count + } +} + +// DeleteDRAClaim removes DRA device allocations for a claim from this node. +func (n *Node) DeleteDRAClaim(claimUID string) { + n.mu.Lock() + defer n.mu.Unlock() + if old, ok := n.draClaims[claimUID]; ok { + for res, count := range old { + n.draUsed[res] -= count + } + delete(n.draClaims, claimUID) + } +} + +// DRAUsed returns the DRA device allocation counts. +func (n *Node) DRAUsed() map[string]int64 { + n.mu.RLock() + defer n.mu.RUnlock() + result := make(map[string]int64, len(n.draUsed)) + for k, v := range n.draUsed { + result[k] = v + } + return result +} + var resourceLabelRe = regexp.MustCompile("eks-node-viewer/node-(.*?)-usage") // ComputeLabel computes dynamic labels