diff --git a/cmd/eks-node-viewer/main.go b/cmd/eks-node-viewer/main.go index 6092683..0e47f93 100644 --- a/cmd/eks-node-viewer/main.go +++ b/cmd/eks-node-viewer/main.go @@ -66,6 +66,11 @@ func main() { if err != nil { log.Fatalf("creating node claim client, %s", err) } + resourceClaimClient, err := client.NewResourceClaims(flags.Kubeconfig, flags.Context) + if err != nil { + log.Printf("creating resource claim client (DRA not available): %s", err) + resourceClaimClient = nil + } ctx, cancel := context.WithCancel(context.Background()) pprov := aws.NewStaticPricingProvider() @@ -92,7 +97,7 @@ func main() { } pprov = aws.NewPricingProvider(ctx, cfg) } - controller := client.NewController(cs, nodeClaimClient, m, nodeSelector, pprov) + controller := client.NewController(cs, nodeClaimClient, resourceClaimClient, m, nodeSelector, pprov) controller.Start(ctx)
diff --git a/pkg/client/client.go b/pkg/client/client.go
index 888bc40..f7dc0fe 100644
--- a/pkg/client/client.go
+++ b/pkg/client/client.go
@@ -17,6 +17,7 @@ package client
 import (
 	"strings"
 
+	resourcev1 "k8s.io/api/resource/v1"
 	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/kubernetes/scheme"
@@ -60,6 +61,26 @@ func NewNodeClaims(kubeconfig, context string) (*rest.RESTClient, error) {
 	return rest.RESTClientFor(&config)
 }
 
+// NewResourceClaims returns a REST client for the resource.k8s.io/v1 API group,
+// built from the configuration loaded for the given kubeconfig and context.
+func NewResourceClaims(kubeconfig, context string) (*rest.RESTClient, error) {
+	base, err := getConfig(kubeconfig, context)
+	if err != nil {
+		return nil, err
+	}
+
+	groupVersion := schema.GroupVersion{Group: "resource.k8s.io", Version: "v1"}
+	scheme.Scheme.AddKnownTypes(groupVersion, &resourcev1.ResourceClaim{}, &resourcev1.ResourceClaimList{})
+
+	cfg := *base
+	cfg.APIPath = "/apis"
+	cfg.UserAgent = rest.DefaultKubernetesUserAgent()
+	cfg.ContentConfig.GroupVersion = &groupVersion
+	cfg.NegotiatedSerializer = scheme.Codecs.WithoutConversion()
+
+	return rest.RESTClientFor(&cfg)
+}
+
 func getConfig(kubeconfig, context string) (*rest.Config, error) {
 	// use the
current context in kubeconfig
 	return clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
diff --git a/pkg/client/controller.go b/pkg/client/controller.go
index c6d6718..49cd11a 100644
--- a/pkg/client/controller.go
+++ b/pkg/client/controller.go
@@ -20,6 +20,7 @@ import (
 	"time"
 
 	v1 "k8s.io/api/core/v1"
+	resourcev1 "k8s.io/api/resource/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/apimachinery/pkg/labels"
@@ -33,20 +34,22 @@ import (
 )
 
 type Controller struct {
-	kubeClient      *kubernetes.Clientset
-	uiModel         *model.UIModel
-	pricing         pricing.Provider
-	nodeSelector    labels.Selector
-	nodeClaimClient *rest.RESTClient
+	kubeClient          *kubernetes.Clientset
+	uiModel             *model.UIModel
+	pricing             pricing.Provider
+	nodeSelector        labels.Selector
+	nodeClaimClient     *rest.RESTClient
+	resourceClaimClient *rest.RESTClient // may be nil; Start skips the ResourceClaim watch in that case
 }
 
-func NewController(kubeClient *kubernetes.Clientset, nodeClaimClient *rest.RESTClient, uiModel *model.UIModel, nodeSelector labels.Selector, pricing pricing.Provider) *Controller {
+func NewController(kubeClient *kubernetes.Clientset, nodeClaimClient *rest.RESTClient, resourceClaimClient *rest.RESTClient, uiModel *model.UIModel, nodeSelector labels.Selector, pricing pricing.Provider) *Controller {
 	c := &Controller{
-		kubeClient:      kubeClient,
-		uiModel:         uiModel,
-		pricing:         pricing,
-		nodeSelector:    nodeSelector,
-		nodeClaimClient: nodeClaimClient,
+		kubeClient:          kubeClient,
+		uiModel:             uiModel,
+		pricing:             pricing,
+		nodeSelector:        nodeSelector,
+		nodeClaimClient:     nodeClaimClient,
+		resourceClaimClient: resourceClaimClient,
 	}
 	pricing.OnUpdate(c.RefreshNodePrices)
 	return c
@@ -62,6 +65,13 @@ func (m Controller) Start(ctx context.Context) {
 	if err := m.nodeClaimClient.Get().Do(ctx).Error(); err == nil {
 		m.startNodeClaimWatch(ctx, cluster)
 	}
+
+	// Watch ResourceClaims for DRA allocations only when a client exists and a probe request to the API succeeds
+	if m.resourceClaimClient != nil {
+		if err := m.resourceClaimClient.Get().Do(ctx).Error(); err == nil {
+
m.startResourceClaimWatch(ctx, cluster)
+		}
+	}
 }
 
 func (m Controller) startNodeClaimWatch(ctx context.Context, cluster *model.Cluster) {
@@ -191,6 +201,87 @@ func (m Controller) startPodWatch(ctx context.Context, cluster *model.Cluster) {
 	go podController.Run(ctx.Done())
 }
 
+// startResourceClaimWatch starts an informer over ResourceClaims in all
+// namespaces and routes add, update and delete events to the DRA usage
+// handlers. The informer runs until ctx is done.
+func (m Controller) startResourceClaimWatch(ctx context.Context, cluster *model.Cluster) {
+	resourceClaimWatchList := cache.NewListWatchFromClient(m.resourceClaimClient, "resourceclaims",
+		v1.NamespaceAll, fields.Everything())
+	_, resourceClaimController := cache.NewInformer(
+		resourceClaimWatchList,
+		&resourcev1.ResourceClaim{},
+		time.Second*0,
+		cache.ResourceEventHandlerFuncs{
+			AddFunc: func(obj interface{}) {
+				m.handleResourceClaim(cluster, obj.(*resourcev1.ResourceClaim))
+			},
+			UpdateFunc: func(oldObj, newObj interface{}) {
+				m.handleResourceClaim(cluster, newObj.(*resourcev1.ResourceClaim))
+			},
+			DeleteFunc: func(obj interface{}) {
+				claim := ignoreDeletedFinalStateUnknown(obj).(*resourcev1.ResourceClaim)
+				m.handleResourceClaimDelete(cluster, claim)
+			},
+		},
+	)
+	go resourceClaimController.Run(ctx.Done())
+}
+
+// draDriverToResource maps DRA driver names to allocatable resource names.
+var draDriverToResource = map[string]string{
+	"neuron.aws.com": "aws.amazon.com/neuron",
+}
+
+// handleResourceClaim reconciles the DRA usage recorded on nodes with the
+// current allocation of a ResourceClaim.
+//
+// Anything the claim contributed on an earlier event is removed from every
+// node first, so an update that deallocates the claim or moves its devices to
+// another pool does not leave stale counts behind. The allocated devices are
+// then counted per pool and resource and recorded on the matching nodes.
+func (m Controller) handleResourceClaim(cluster *model.Cluster, claim *resourcev1.ResourceClaim) {
+	// Count allocated devices per node (pool name) and resource name. An
+	// unallocated claim yields no counts and is only cleared below.
+	// NOTE(review): the pool name is used as the node name, which assumes the
+	// DRA driver names each pool after its node — confirm for other drivers.
+	nodeDeviceCounts := map[string]map[string]int64{} // nodeName -> resourceName -> count
+	if allocation := claim.Status.Allocation; allocation != nil {
+		for _, result := range allocation.Devices.Results {
+			nodeName := result.Pool
+			if nodeName == "" {
+				continue
+			}
+			resourceName := result.Driver
+			if mapped, ok := draDriverToResource[result.Driver]; ok {
+				resourceName = mapped
+			}
+			if nodeDeviceCounts[nodeName] == nil {
+				nodeDeviceCounts[nodeName] = map[string]int64{}
+			}
+			nodeDeviceCounts[nodeName][resourceName]++
+		}
+	}
+
+	// AddDRAClaim only replaces counts on the nodes it is called for, so clear
+	// the claim everywhere before recording the current allocation.
+	m.handleResourceClaimDelete(cluster, claim)
+
+	claimUID := string(claim.UID)
+	for nodeName, resourceCounts := range nodeDeviceCounts {
+		if node, ok := cluster.GetNodeByName(nodeName); ok {
+			node.AddDRAClaim(claimUID, resourceCounts)
+		}
+	}
+}
+
+// handleResourceClaimDelete removes DRA device allocations when a claim is deleted.
+func (m Controller) handleResourceClaimDelete(cluster *model.Cluster, claim *resourcev1.ResourceClaim) {
+	claimUID := string(claim.UID)
+	cluster.ForEachNode(func(n *model.Node) {
+		n.DeleteDRAClaim(claimUID)
+	})
+}
+
 func (m Controller) updatePrice(node *model.Node) {
 	// If the node has the instance-price override label, don't look up pricing
 	// and use the value here.
diff --git a/pkg/model/node.go b/pkg/model/node.go
index cc89734..191c9cd 100644
--- a/pkg/model/node.go
+++ b/pkg/model/node.go
@@ -22,6 +22,7 @@ import (
 
 	ec2types "github.com/aws/aws-sdk-go-v2/service/ec2/types"
 	v1 "k8s.io/api/core/v1"
+	k8sresource "k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/util/duration"
 	karpv1 "sigs.k8s.io/karpenter/pkg/apis/v1"
@@ -41,15 +42,19 @@ type Node struct {
 	node                  v1.Node
 	pods                  map[objectKey]*Pod
 	used                  v1.ResourceList
+	draUsed               map[string]int64            // resource name -> count of devices allocated via DRA
+	draClaims             map[string]map[string]int64 // claimUID -> resource name -> device count
 	Price                 float64
 	nodeclaimCreationTime time.Time
 }
 
 func NewNode(n *v1.Node) *Node {
 	node := &Node{
-		node: *n,
-		pods: map[objectKey]*Pod{},
-		used: v1.ResourceList{},
+		node:      *n,
+		pods:      map[objectKey]*Pod{},
+		used:      v1.ResourceList{},
+		draUsed:   map[string]int64{},
+		draClaims: map[string]map[string]int64{},
 	}
 
 	return node
@@ -181,6 +186,14 @@ func (n *Node) Used() v1.ResourceList {
 	for rn, q := range n.used {
 		used[rn] = q.DeepCopy()
 	}
+	// Merge DRA device allocations into the used resource list
+	for res, count := range n.draUsed {
+		if count > 0 {
+			existing := used[v1.ResourceName(res)]
+			existing.Add(*k8sresource.NewQuantity(count, k8sresource.DecimalSI))
+			used[v1.ResourceName(res)] = existing
+		}
+	}
 	return used
 }
 
@@ -289,6 +302,61 @@ func (n *Node) HasPrice() bool {
 	return n.Price == n.Price
 }
 
+// AddDRAClaim records the devices a DRA claim holds on this node, replacing
+// any counts already recorded for the same claim. resourceCounts maps a
+// resource name to a device count; it is copied, so later changes to the
+// caller's map do not affect the node.
+func (n *Node) AddDRAClaim(claimUID string, resourceCounts map[string]int64) {
+	n.mu.Lock()
+	defer n.mu.Unlock()
+	// Release the old counts first so a repeated event for the same claim is
+	// not counted twice.
+	n.releaseDRAClaimLocked(claimUID)
+	counts := make(map[string]int64, len(resourceCounts))
+	for res, count := range resourceCounts {
+		counts[res] = count
+		n.draUsed[res] += count
+	}
+	n.draClaims[claimUID] = counts
+}
+
+// DeleteDRAClaim removes the devices recorded for a claim from this node. It
+// does nothing when the claim is not tracked here.
+func (n *Node) DeleteDRAClaim(claimUID string) {
+	n.mu.Lock()
+	defer n.mu.Unlock()
+	n.releaseDRAClaimLocked(claimUID)
+}
+
+// releaseDRAClaimLocked subtracts a tracked claim's device counts from
+// draUsed and forgets the claim. The caller must hold n.mu for writing.
+func (n *Node) releaseDRAClaimLocked(claimUID string) {
+	old, ok := n.draClaims[claimUID]
+	if !ok {
+		return
+	}
+	for res, count := range old {
+		n.draUsed[res] -= count
+		// Drop resources that reach zero so DRAUsed stops reporting them.
+		if n.draUsed[res] <= 0 {
+			delete(n.draUsed, res)
+		}
+	}
+	delete(n.draClaims, claimUID)
+}
+
+// DRAUsed returns a copy of the per-resource counts of devices allocated to
+// this node via DRA.
+func (n *Node) DRAUsed() map[string]int64 {
+	n.mu.RLock()
+	defer n.mu.RUnlock()
+	result := make(map[string]int64, len(n.draUsed))
+	for k, v := range n.draUsed {
+		result[k] = v
+	}
+	return result
+}
+
 var resourceLabelRe = regexp.MustCompile("eks-node-viewer/node-(.*?)-usage")
 
 // ComputeLabel computes dynamic labels