diff --git a/cmd/compute-domain-controller/cliqueconfigmap.go b/cmd/compute-domain-controller/cliqueconfigmap.go new file mode 100644 index 000000000..7e648e491 --- /dev/null +++ b/cmd/compute-domain-controller/cliqueconfigmap.go @@ -0,0 +1,396 @@ +/* + * Copyright (c) 2025 NVIDIA CORPORATION. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package main + +import ( + "context" + "fmt" + "strconv" + "strings" + "sync" + "time" + + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/client-go/informers" + "k8s.io/client-go/tools/cache" + "k8s.io/klog/v2" + + "github.com/NVIDIA/k8s-dra-driver-gpu/pkg/workqueue" +) + +const ( + // cliqueConfigMapPrefix is the prefix for ConfigMaps that store clique node-to-index mappings. + cliqueConfigMapPrefix = "clique-" +) + +// CliqueConfigMapManager watches kubelet plugin pods and maintains ConfigMaps +// with node-name to index mappings for each clique. +type CliqueConfigMapManager struct { + config *ManagerConfig + workQueue *workqueue.WorkQueue + waitGroup sync.WaitGroup + cancelContext context.CancelFunc + + factory informers.SharedInformerFactory + informer cache.SharedIndexInformer +} + +// NewCliqueConfigMapManager creates a new CliqueConfigMapManager that watches +// kubelet plugin pods with the computeDomainCliqueLabelKey label. +func NewCliqueConfigMapManager(config *ManagerConfig) *CliqueConfigMapManager { + labelSelector := &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: computeDomainCliqueLabelKey, + Operator: metav1.LabelSelectorOpExists, + }, + }, + } + + factory := informers.NewSharedInformerFactoryWithOptions( + config.clientsets.Core, + informerResyncPeriod, + informers.WithNamespace(config.driverNamespace), + informers.WithTweakListOptions(func(opts *metav1.ListOptions) { + opts.LabelSelector = metav1.FormatLabelSelector(labelSelector) + }), + ) + + informer := factory.Core().V1().Pods().Informer() + workQueue := workqueue.New(workqueue.DefaultControllerRateLimiter()) + + m := &CliqueConfigMapManager{ + config: config, + workQueue: workQueue, + factory: factory, + informer: informer, + } + + return m +} + +// Start begins watching kubelet plugin pods and managing clique ConfigMaps. +func (m *CliqueConfigMapManager) Start(ctx context.Context) (rerr error) { + ctx, cancel := context.WithCancel(ctx) + m.cancelContext = cancel + + defer func() { + if rerr != nil { + if err := m.Stop(); err != nil { + klog.Errorf("error stopping CliqueConfigMap manager: %v", err) + } + } + }() + + _, err := m.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + m.workQueue.Enqueue(obj, m.onPodAddOrUpdate) + }, + UpdateFunc: func(oldObj, newObj interface{}) { + m.workQueue.Enqueue(newObj, m.onPodAddOrUpdate) + }, + }) + if err != nil { + return fmt.Errorf("error adding event handlers for pod informer: %w", err) + } + + m.waitGroup.Add(1) + go func() { + defer m.waitGroup.Done() + m.factory.Start(ctx.Done()) + }() + + if !cache.WaitForCacheSync(ctx.Done(), m.informer.HasSynced) { + return fmt.Errorf("error syncing pod informer cache") + } + + // Start periodic cleanup goroutine + m.waitGroup.Add(1) + go func() { + defer m.waitGroup.Done() + m.periodicCleanup(ctx) + }() + + m.waitGroup.Add(1) + go func() { + defer m.waitGroup.Done() + m.workQueue.Run(ctx) + }() + + return nil +} + +// Stop stops the CliqueConfigMapManager. +func (m *CliqueConfigMapManager) Stop() error { + if m.cancelContext != nil { + m.cancelContext() + } + m.waitGroup.Wait() + return nil +} + +// onPodAddOrUpdate handles pod add/update events by ensuring the node is in the +// clique's ConfigMap with a stable index. +func (m *CliqueConfigMapManager) onPodAddOrUpdate(ctx context.Context, obj any) error { + pod, ok := obj.(*corev1.Pod) + if !ok { + return fmt.Errorf("failed to cast to Pod") + } + + nodeName := pod.Spec.NodeName + if nodeName == "" { + return nil + } + + cliqueID := pod.Labels[computeDomainCliqueLabelKey] + if cliqueID == "" { + return nil + } + + return m.ensureNodeInConfigMap(ctx, cliqueID, nodeName) +} + +// ensureNodeInConfigMap ensures a node is present in the clique's ConfigMap. +// If the node is already present, its index is preserved. +// If the node is new, it gets the lowest available index. +func (m *CliqueConfigMapManager) ensureNodeInConfigMap(ctx context.Context, cliqueID, nodeName string) error { + cmName := cliqueConfigMapPrefix + cliqueID + + cm, err := m.config.clientsets.Core.CoreV1().ConfigMaps(m.config.driverNamespace).Get(ctx, cmName, metav1.GetOptions{}) + if apierrors.IsNotFound(err) { + return m.createConfigMap(ctx, cmName, nodeName) + } + if err != nil { + return fmt.Errorf("get ConfigMap: %w", err) + } + + if _, exists := cm.Data[nodeName]; exists { + return nil + } + + return m.addNodeToConfigMap(ctx, cm, nodeName) +} + +// createConfigMap creates a new ConfigMap with the given node at index 0. +func (m *CliqueConfigMapManager) createConfigMap(ctx context.Context, cmName, nodeName string) error { + cm := &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: cmName, + Namespace: m.config.driverNamespace, + }, + Data: map[string]string{ + nodeName: "0", + }, + } + + if _, err := m.config.clientsets.Core.CoreV1().ConfigMaps(m.config.driverNamespace).Create(ctx, cm, metav1.CreateOptions{}); err != nil { + return fmt.Errorf("create ConfigMap: %w", err) + } + + klog.Infof("Created ConfigMap %s", cmName) + return nil +} + +// addNodeToConfigMap adds a node to an existing ConfigMap with the next available index. +func (m *CliqueConfigMapManager) addNodeToConfigMap(ctx context.Context, cm *corev1.ConfigMap, nodeName string) error { + nextIndex, err := m.getNextAvailableIndex(cm.Data) + if err != nil { + return fmt.Errorf("get next available index: %w", err) + } + + newCM := cm.DeepCopy() + if newCM.Data == nil { + newCM.Data = make(map[string]string) + } + newCM.Data[nodeName] = strconv.Itoa(nextIndex) + + if _, err := m.config.clientsets.Core.CoreV1().ConfigMaps(m.config.driverNamespace).Update(ctx, newCM, metav1.UpdateOptions{}); err != nil { + return fmt.Errorf("update ConfigMap: %w", err) + } + + klog.Infof("Added node %s to ConfigMap %s at index %d", nodeName, cm.Name, nextIndex) + return nil +} + +// getNextAvailableIndex finds the lowest available index not currently assigned. +// Returns an error if no index is available within maxNodesPerIMEXDomain. +func (m *CliqueConfigMapManager) getNextAvailableIndex(data map[string]string) (int, error) { + usedIndices := make(map[int]bool) + + for _, indexStr := range data { + if index, err := strconv.Atoi(indexStr); err == nil { + usedIndices[index] = true + } + } + + // Find the next available index, starting from 0 and filling gaps + nextIndex := 0 + for usedIndices[nextIndex] { + nextIndex++ + } + + if nextIndex >= m.config.maxNodesPerIMEXDomain { + return -1, fmt.Errorf("no available index within maxNodesPerIMEXDomain (%d)", m.config.maxNodesPerIMEXDomain) + } + + return nextIndex, nil +} + +// periodicCleanup periodically removes stale entries from clique ConfigMaps. +func (m *CliqueConfigMapManager) periodicCleanup(ctx context.Context) { + ticker := time.NewTicker(cleanupInterval) + defer ticker.Stop() + + // Run cleanup immediately on startup + m.cleanupStaleEntries(ctx) + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + m.cleanupStaleEntries(ctx) + } + } +} + +// cleanupStaleEntries removes entries for nodes that no longer exist in the cluster +// or have a kubelet plugin pod with a different clique label, +// and deletes ConfigMaps that become empty. +func (m *CliqueConfigMapManager) cleanupStaleEntries(ctx context.Context) { + klog.V(6).Infof("Cleanup: checking for stale clique ConfigMap entries") + + // Get the set of nodes that exist in the cluster + nodes, err := m.getNodes(ctx) + if err != nil { + klog.Errorf("error getting nodes for cleanup: %v", err) + return + } + + // List all ConfigMaps in the driver namespace + cmList, err := m.config.clientsets.Core.CoreV1().ConfigMaps(m.config.driverNamespace).List(ctx, metav1.ListOptions{}) + if err != nil { + klog.Errorf("error listing ConfigMaps for cleanup: %v", err) + return + } + + for _, cm := range cmList.Items { + // Only consider ConfigMaps with the clique prefix + if !strings.HasPrefix(cm.Name, cliqueConfigMapPrefix) { + continue + } + + m.cleanupConfigMap(ctx, &cm, nodes) + } +} + +// cleanupConfigMap removes entries for nodes that no longer exist or have a kubelet +// plugin pod with a different clique label value. +func (m *CliqueConfigMapManager) cleanupConfigMap(ctx context.Context, cm *corev1.ConfigMap, nodes sets.Set[string]) { + cliqueID := strings.TrimPrefix(cm.Name, cliqueConfigMapPrefix) + cliqueLabels := m.getCliqueLabels() + + var nodesToRemove []string + for nodeName := range cm.Data { + if m.shouldRemoveNode(nodeName, cliqueID, nodes, cliqueLabels) { + nodesToRemove = append(nodesToRemove, nodeName) + } + } + + if len(nodesToRemove) == 0 { + return + } + + newCM := cm.DeepCopy() + for _, nodeName := range nodesToRemove { + klog.Infof("Cleanup: removing node %s from ConfigMap %s", nodeName, cm.Name) + delete(newCM.Data, nodeName) + } + + // If ConfigMap is now empty, delete it + if len(newCM.Data) == 0 { + klog.Infof("Cleanup: deleting empty ConfigMap %s", cm.Name) + if err := m.config.clientsets.Core.CoreV1().ConfigMaps(m.config.driverNamespace).Delete(ctx, cm.Name, metav1.DeleteOptions{}); err != nil && !apierrors.IsNotFound(err) { + klog.Errorf("error deleting empty ConfigMap %s: %v", cm.Name, err) + } + return + } + + // Update the ConfigMap with removed entries + if _, err := m.config.clientsets.Core.CoreV1().ConfigMaps(m.config.driverNamespace).Update(ctx, newCM, metav1.UpdateOptions{}); err != nil { + klog.Errorf("error updating ConfigMap %s after cleanup: %v", cm.Name, err) + } +} + +// shouldRemoveNode returns true if a node entry should be removed from a clique ConfigMap. +// A node should be removed if: +// - The node no longer exists in the cluster. +// - The node has a kubelet plugin pod with a different clique label (no label is OK). +func (m *CliqueConfigMapManager) shouldRemoveNode(nodeName, cliqueID string, nodes sets.Set[string], cliqueLabels map[string]string) bool { + if !nodes.Has(nodeName) { + return true + } + if _, exists := cliqueLabels[nodeName]; !exists { + return false + } + if cliqueLabels[nodeName] != cliqueID { + return true + } + return false +} + +// getNodes returns a set of node names that exist in the cluster. +func (m *CliqueConfigMapManager) getNodes(ctx context.Context) (sets.Set[string], error) { + nodeList, err := m.config.clientsets.Core.CoreV1().Nodes().List(ctx, metav1.ListOptions{}) + if err != nil { + return nil, fmt.Errorf("list nodes: %w", err) + } + + nodes := sets.New[string]() + for _, node := range nodeList.Items { + nodes.Insert(node.Name) + } + + return nodes, nil +} + +// getCliqueLabels returns a mapping of node name to cliqueID for nodes that have +// kubelet plugin pods with the clique label, using the informer cache. +func (m *CliqueConfigMapManager) getCliqueLabels() map[string]string { + cliqueLabels := make(map[string]string) + + for _, obj := range m.informer.GetStore().List() { + pod, ok := obj.(*corev1.Pod) + if !ok { + continue + } + cliqueID := pod.Labels[computeDomainCliqueLabelKey] + if cliqueID == "" { + continue + } + nodeName := pod.Spec.NodeName + if nodeName == "" { + continue + } + cliqueLabels[nodeName] = cliqueID + } + + return cliqueLabels +} diff --git a/cmd/compute-domain-controller/computedomain.go b/cmd/compute-domain-controller/computedomain.go index 4c6245fb5..43ba0d4ab 100644 --- a/cmd/compute-domain-controller/computedomain.go +++ b/cmd/compute-domain-controller/computedomain.go @@ -28,6 +28,7 @@ import ( nvapi "github.com/NVIDIA/k8s-dra-driver-gpu/api/nvidia.com/resource/v1beta1" nvinformers "github.com/NVIDIA/k8s-dra-driver-gpu/pkg/nvidia.com/informers/externalversions" + "github.com/NVIDIA/k8s-dra-driver-gpu/pkg/workqueue" ) type GetComputeDomainFunc func(uid string) (*nvapi.ComputeDomain, error) @@ -43,8 +44,9 @@ const ( // not so long that stale entries cause issues. mutationCacheTTL = time.Hour - computeDomainLabelKey = "resource.nvidia.com/computeDomain" - computeDomainFinalizer = computeDomainLabelKey + computeDomainLabelKey = "resource.nvidia.com/computeDomain" + computeDomainCliqueLabelKey = "resource.nvidia.com/computeDomain.cliqueID" + computeDomainFinalizer = computeDomainLabelKey computeDomainDefaultChannelDeviceClass = "compute-domain-default-channel.nvidia.com" computeDomainChannelDeviceClass = "compute-domain-channel.nvidia.com" @@ -57,6 +59,7 @@ const ( type ComputeDomainManager struct { config *ManagerConfig + workQueue *workqueue.WorkQueue waitGroup sync.WaitGroup cancelContext context.CancelFunc @@ -73,14 +76,16 @@ type ComputeDomainManager struct { func NewComputeDomainManager(config *ManagerConfig) *ComputeDomainManager { factory := nvinformers.NewSharedInformerFactory(config.clientsets.Nvidia, informerResyncPeriod) informer := factory.Resource().V1beta1().ComputeDomains().Informer() + workQueue := workqueue.New(workqueue.DefaultControllerRateLimiter()) m := &ComputeDomainManager{ - config: config, - factory: factory, - informer: informer, + config: config, + workQueue: workQueue, + factory: factory, + informer: informer, } - m.daemonSetManager = NewMultiNamespaceDaemonSetManager(config, m.Get, m.UpdateStatus) + m.daemonSetManager = NewMultiNamespaceDaemonSetManager(config, workQueue, m.Get, m.UpdateStatus) m.resourceClaimTemplateManager = NewWorkloadResourceClaimTemplateManager(config, m.Get) m.nodeManager = NewNodeManager(config, m.Get) @@ -119,10 +124,10 @@ func (m *ComputeDomainManager) Start(ctx context.Context) (rerr error) { _, err = m.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj any) { - m.config.workQueue.Enqueue(obj, m.onAddOrUpdate) + m.workQueue.Enqueue(obj, m.onAddOrUpdate) }, UpdateFunc: func(oldObj, newObj any) { - m.config.workQueue.Enqueue(newObj, m.onAddOrUpdate) + m.workQueue.Enqueue(newObj, m.onAddOrUpdate) }, }) if err != nil { @@ -151,6 +156,12 @@ func (m *ComputeDomainManager) Start(ctx context.Context) (rerr error) { return fmt.Errorf("error starting Node manager: %w", err) } + m.waitGroup.Add(1) + go func() { + defer m.waitGroup.Done() + m.workQueue.Run(ctx) + }() + return nil } diff --git a/cmd/compute-domain-controller/controller.go b/cmd/compute-domain-controller/controller.go index e1e8a57eb..9b29a2ae9 100644 --- a/cmd/compute-domain-controller/controller.go +++ b/cmd/compute-domain-controller/controller.go @@ -24,7 +24,6 @@ import ( "k8s.io/klog/v2" "github.com/NVIDIA/k8s-dra-driver-gpu/pkg/flags" - "github.com/NVIDIA/k8s-dra-driver-gpu/pkg/workqueue" ) // ManagerConfig defines the common configuration options shared across all managers. @@ -46,9 +45,6 @@ type ManagerConfig struct { // clientsets provides access to various Kubernetes API client interfaces clientsets flags.ClientSets - // workQueue manages the asynchronous processing of tasks - workQueue *workqueue.WorkQueue - // additionalNamespaces is a list of additional namespaces // where the driver can manage resources additionalNamespaces []string @@ -70,11 +66,8 @@ func NewController(config *Config) *Controller { } // Run starts the controller's main loop and manages the lifecycle of its components. -// It initializes the work queue, starts the ComputeDomain manager, and handles -// graceful shutdown when the context is cancelled. +// It starts the managers and handles graceful shutdown when the context is cancelled. func (c *Controller) Run(ctx context.Context) error { - workQueue := workqueue.New(workqueue.DefaultControllerRateLimiter()) - managerConfig := &ManagerConfig{ driverName: c.config.driverName, driverNamespace: c.config.flags.namespace, @@ -82,7 +75,6 @@ func (c *Controller) Run(ctx context.Context) error { imageName: c.config.flags.imageName, maxNodesPerIMEXDomain: c.config.flags.maxNodesPerIMEXDomain, clientsets: c.config.clientsets, - workQueue: workQueue, logVerbosityCDDaemon: c.config.flags.logVerbosityCDDaemon, } @@ -90,12 +82,22 @@ func (c *Controller) Run(ctx context.Context) error { klog.Infof("controller manager config: %+v", managerConfig) cdManager := NewComputeDomainManager(managerConfig) + cliqueConfigMapManager := NewCliqueConfigMapManager(managerConfig) if err := cdManager.Start(ctx); err != nil { return fmt.Errorf("error starting ComputeDomain manager: %w", err) } - workQueue.Run(ctx) + if err := cliqueConfigMapManager.Start(ctx); err != nil { + return fmt.Errorf("error starting CliqueConfigMap manager: %w", err) + } + + // Wait for context cancellation + <-ctx.Done() + + if err := cliqueConfigMapManager.Stop(); err != nil { + return fmt.Errorf("error stopping CliqueConfigMap manager: %w", err) + } if err := cdManager.Stop(); err != nil { return fmt.Errorf("error stopping ComputeDomain manager: %w", err) diff --git a/cmd/compute-domain-controller/daemonset.go b/cmd/compute-domain-controller/daemonset.go index 942b4b64c..8e39a4714 100644 --- a/cmd/compute-domain-controller/daemonset.go +++ b/cmd/compute-domain-controller/daemonset.go @@ -36,6 +36,7 @@ import ( nvapi "github.com/NVIDIA/k8s-dra-driver-gpu/api/nvidia.com/resource/v1beta1" "github.com/NVIDIA/k8s-dra-driver-gpu/pkg/featuregates" + "github.com/NVIDIA/k8s-dra-driver-gpu/pkg/workqueue" ) const ( @@ -72,7 +73,7 @@ type DaemonSetManager struct { cleanupManager *CleanupManager[*appsv1.DaemonSet] } -func NewDaemonSetManager(config *ManagerConfig, getComputeDomain GetComputeDomainFunc, updateComputeDomainStatus UpdateComputeDomainStatusFunc) *DaemonSetManager { +func NewDaemonSetManager(config *ManagerConfig, workQueue *workqueue.WorkQueue, getComputeDomain GetComputeDomainFunc, updateComputeDomainStatus UpdateComputeDomainStatusFunc) *DaemonSetManager { labelSelector := &metav1.LabelSelector{ MatchExpressions: []metav1.LabelSelectorRequirement{ { @@ -99,7 +100,7 @@ func NewDaemonSetManager(config *ManagerConfig, getComputeDomain GetComputeDomai factory: factory, informer: informer, } - m.daemonsetPodManager = NewDaemonSetPodManager(config, getComputeDomain, updateComputeDomainStatus) + m.daemonsetPodManager = NewDaemonSetPodManager(config, workQueue, getComputeDomain, updateComputeDomainStatus) m.resourceClaimTemplateManager = NewDaemonSetResourceClaimTemplateManager(config, getComputeDomain) m.cleanupManager = NewCleanupManager[*appsv1.DaemonSet](informer, getComputeDomain, m.cleanup) @@ -130,18 +131,6 @@ func (m *DaemonSetManager) Start(ctx context.Context) (rerr error) { true, ) - _, err := m.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj any) { - m.config.workQueue.Enqueue(obj, m.onAddOrUpdate) - }, - UpdateFunc: func(objOld, objNew any) { - m.config.workQueue.Enqueue(objNew, m.onAddOrUpdate) - }, - }) - if err != nil { - return fmt.Errorf("error adding event handlers for DaemonSet informer: %w", err) - } - m.waitGroup.Add(1) go func() { defer m.waitGroup.Done() @@ -359,35 +348,6 @@ func (m *DaemonSetManager) assertRemoved(ctx context.Context, cdUID string) erro return nil } -func (m *DaemonSetManager) onAddOrUpdate(ctx context.Context, obj any) error { - d, ok := obj.(*appsv1.DaemonSet) - if !ok { - return fmt.Errorf("failed to cast to DaemonSet") - } - - klog.V(2).Infof("Processing added or updated DaemonSet: %s/%s", d.Namespace, d.Name) - - cd, err := m.getComputeDomain(d.Labels[computeDomainLabelKey]) - if err != nil { - return fmt.Errorf("error getting ComputeDomain: %w", err) - } - if cd == nil { - return nil - } - - if int(d.Status.NumberReady) != cd.Spec.NumNodes { - return nil - } - - newCD := cd.DeepCopy() - newCD.Status.Status = nvapi.ComputeDomainStatusReady - if _, err = m.config.clientsets.Nvidia.ResourceV1beta1().ComputeDomains(newCD.Namespace).UpdateStatus(ctx, newCD, metav1.UpdateOptions{}); err != nil { - return fmt.Errorf("error updating nodes in ComputeDomain status: %w", err) - } - - return nil -} - func (m *DaemonSetManager) cleanup(ctx context.Context, cdUID string) error { if err := m.Delete(ctx, cdUID); err != nil { return fmt.Errorf("error deleting DaemonSet: %w", err) diff --git a/cmd/compute-domain-controller/daemonsetpods.go b/cmd/compute-domain-controller/daemonsetpods.go index 8d6da0bdd..a1ae4ebbb 100644 --- a/cmd/compute-domain-controller/daemonsetpods.go +++ b/cmd/compute-domain-controller/daemonsetpods.go @@ -29,10 +29,12 @@ import ( "k8s.io/klog/v2" nvapi "github.com/NVIDIA/k8s-dra-driver-gpu/api/nvidia.com/resource/v1beta1" + "github.com/NVIDIA/k8s-dra-driver-gpu/pkg/workqueue" ) type DaemonSetPodManager struct { config *ManagerConfig + workQueue *workqueue.WorkQueue waitGroup sync.WaitGroup cancelContext context.CancelFunc @@ -44,7 +46,7 @@ type DaemonSetPodManager struct { updateComputeDomainStatus UpdateComputeDomainStatusFunc } -func NewDaemonSetPodManager(config *ManagerConfig, getComputeDomain GetComputeDomainFunc, updateComputeDomainStatus UpdateComputeDomainStatusFunc) *DaemonSetPodManager { +func NewDaemonSetPodManager(config *ManagerConfig, workQueue *workqueue.WorkQueue, getComputeDomain GetComputeDomainFunc, updateComputeDomainStatus UpdateComputeDomainStatusFunc) *DaemonSetPodManager { labelSelector := &metav1.LabelSelector{ MatchExpressions: []metav1.LabelSelectorRequirement{ { @@ -68,6 +70,7 @@ func NewDaemonSetPodManager(config *ManagerConfig, getComputeDomain GetComputeDo m := &DaemonSetPodManager{ config: config, + workQueue: workQueue, factory: factory, informer: informer, lister: lister, @@ -92,7 +95,7 @@ func (m *DaemonSetPodManager) Start(ctx context.Context) (rerr error) { _, err := m.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ DeleteFunc: func(obj interface{}) { - m.config.workQueue.Enqueue(obj, m.onPodDelete) + m.workQueue.Enqueue(obj, m.onPodDelete) }, }) if err != nil { diff --git a/cmd/compute-domain-controller/mnsdaemonset.go b/cmd/compute-domain-controller/mnsdaemonset.go index 1952ed0ab..3d1b141f3 100644 --- a/cmd/compute-domain-controller/mnsdaemonset.go +++ b/cmd/compute-domain-controller/mnsdaemonset.go @@ -23,6 +23,7 @@ import ( appsv1 "k8s.io/api/apps/v1" nvapi "github.com/NVIDIA/k8s-dra-driver-gpu/api/nvidia.com/resource/v1beta1" + "github.com/NVIDIA/k8s-dra-driver-gpu/pkg/workqueue" ) // MultiNamespaceDaemonSetManager manages DaemonSets across multiple namespaces. @@ -33,7 +34,7 @@ type MultiNamespaceDaemonSetManager struct { // NewMultiNamespaceDaemonSetManager creates a new multi-namespace DaemonSet manager // It creates individual DaemonSet managers for the driver namespace and all additional namespaces. -func NewMultiNamespaceDaemonSetManager(config *ManagerConfig, getComputeDomain GetComputeDomainFunc, updateComputeDomainStatus UpdateComputeDomainStatusFunc) *MultiNamespaceDaemonSetManager { +func NewMultiNamespaceDaemonSetManager(config *ManagerConfig, workQueue *workqueue.WorkQueue, getComputeDomain GetComputeDomainFunc, updateComputeDomainStatus UpdateComputeDomainStatusFunc) *MultiNamespaceDaemonSetManager { m := &MultiNamespaceDaemonSetManager{ config: config, managers: make(map[string]*DaemonSetManager), @@ -51,7 +52,7 @@ func NewMultiNamespaceDaemonSetManager(config *ManagerConfig, getComputeDomain G configNew := *config configNew.driverNamespace = ns configNew.additionalNamespaces = nil - m.managers[ns] = NewDaemonSetManager(&configNew, getComputeDomain, updateComputeDomainStatus) + m.managers[ns] = NewDaemonSetManager(&configNew, workQueue, getComputeDomain, updateComputeDomainStatus) } return m diff --git a/cmd/compute-domain-daemon/computedomain.go b/cmd/compute-domain-daemon/computedomain.go index ec11d5403..61ebdcf54 100644 --- a/cmd/compute-domain-daemon/computedomain.go +++ b/cmd/compute-domain-daemon/computedomain.go @@ -237,7 +237,6 @@ func (m *ComputeDomainManager) onAddOrUpdate(ctx context.Context, obj any) error // reports the IP address of this current pod running the CD daemon. If mutation // is needed (first insertion, or IP address update) and successful, it reflects // the mutation in `m.mutationCache`. -// TODO: rename function? func (m *ComputeDomainManager) EnsureNodeInfoInCD(ctx context.Context, cd *nvapi.ComputeDomain) (*nvapi.ComputeDomain, error) { var myNode, myNodePrevious *nvapi.ComputeDomainNode @@ -255,17 +254,10 @@ func (m *ComputeDomainManager) EnsureNodeInfoInCD(ctx context.Context, cd *nvapi // Create new ComputeDomainNode object representing myself, and insert it into the nodes list. if myNode == nil { - // Get the next available index for this new node (local point of view, - // API server may tell us later that this index was chosen poorly). - nextIndex, err := getNextAvailableIndex(m.config.cliqueID, newCD.Status.Nodes, m.config.maxNodesPerIMEXDomain) - if err != nil { - return nil, fmt.Errorf("error getting next available index: %w", err) - } - myNode = &nvapi.ComputeDomainNode{ Name: m.config.nodeName, CliqueID: m.config.cliqueID, - Index: nextIndex, + Index: m.config.cliqueNodeIndex, // This is going to be switched to Ready by podmanager. Status: nvapi.ComputeDomainStatusNotReady, } @@ -278,30 +270,6 @@ func (m *ComputeDomainManager) EnsureNodeInfoInCD(ctx context.Context, cd *nvapi // across pod restarts. myNode.IPAddress = m.config.podIP - // Detect and handle DNS index collision where my self-chosen DNS index - // appears elsewhere (among the nodes with the same cliqueID). If - // `m.previousNodes` is empty, we haven't started the IMEX daemon yet and - // hence can change our previous choice safely. - if len(m.previousNodes) == 0 { - for _, other := range newCD.Status.Nodes { - // Skip items in different cliques, and also myself. - if other.CliqueID != m.config.cliqueID { - continue - } - if other.Name == m.config.nodeName { - continue - } - if other.Index == myNode.Index { - idx, err := getNextAvailableIndex(m.config.cliqueID, newCD.Status.Nodes, m.config.maxNodesPerIMEXDomain) - if err != nil { - return nil, fmt.Errorf("error getting next available index: %w", err) - } - myNode.Index = idx - klog.V(4).Infof("EnsureNodeInfoInCD: IMEX daemon not started yet, DNS index collision with %v, picked new index: %d", other, idx) - } - } - } - if myNodePrevious != nil && *myNodePrevious == *myNode { klog.V(6).Infof("EnsureNodeInfoInCD noop: no change (%v)", *myNode) return newCD, nil @@ -326,58 +294,6 @@ func (m *ComputeDomainManager) EnsureNodeInfoInCD(ctx context.Context, cd *nvapi return updatedCD, nil } -// The Index field in the Nodes section of the ComputeDomain status ensures a -// consistent IP-to-DNS name mapping across all machines within a given IMEX -// domain. Each node's index directly determines its DNS name using the format -// "compute-domain-daemon-{index}". -// -// getNextAvailableIndex finds the next available index for the current node by -// seeing which ones are already taken by other nodes in the ComputeDomain -// status that have the same cliqueID. It fills in gaps where it can, and returns -// an error if no index is available within maxNodesPerIMEXDomain. -// -// By filling gaps in the index sequence (rather than always appending), we -// maintain stable DNS names for existing nodes even when intermediate nodes -// are removed from the compute domain and new ones are added. -func getNextAvailableIndex(currentCliqueID string, nodes []*nvapi.ComputeDomainNode, maxNodesPerIMEXDomain int) (int, error) { - // Filter nodes to only consider those with the same cliqueID - var cliqueNodes []*nvapi.ComputeDomainNode - for _, node := range nodes { - if node.CliqueID == currentCliqueID { - cliqueNodes = append(cliqueNodes, node) - } - } - - // Create a map to track used indices - usedIndices := make(map[int]bool) - - // Collect all currently used indices from nodes with the same cliqueID - for _, node := range cliqueNodes { - usedIndices[node.Index] = true - } - - // Find the next available index, starting from 0 and filling gaps - nextIndex := 0 - for usedIndices[nextIndex] { - nextIndex++ - } - - // Skip `maxNodesPerIMEXDomain` check in the special case of no clique ID - // being set: this means that this node does not actually run an IMEX daemon - // managed by us and the set of nodes in this "noop" mode in this CD is - // allowed to grow larger than maxNodesPerIMEXDomain. - if currentCliqueID == "" { - return nextIndex, nil - } - - // Ensure nextIndex is within the range 0..maxNodesPerIMEXDomain - if nextIndex < 0 || nextIndex >= maxNodesPerIMEXDomain { - return -1, fmt.Errorf("no available indices within maxNodesPerIMEXDomain (%d) for cliqueID %s", maxNodesPerIMEXDomain, currentCliqueID) - } - - return nextIndex, nil -} - // If there was actually a change compared to the previously known set of // nodes: pass info to IMEX daemon controller. func (m *ComputeDomainManager) MaybePushNodesUpdate(cd *nvapi.ComputeDomain) { @@ -390,12 +306,6 @@ func (m *ComputeDomainManager) MaybePushNodesUpdate(cd *nvapi.ComputeDomain) { } } - // Do not update the IMEX daemon config if the current nodes list - // contains any duplicate DNS index (within our clique). - if m.HasDuplicateIndex(cd.Status.Nodes, m.config.cliqueID) { - return - } - newIPs := getIPSet(cd.Status.Nodes) previousIPs := getIPSet(m.previousNodes) @@ -501,26 +411,3 @@ func generatePatchForNodeInfo(nodes []*nvapi.ComputeDomainNode) ([]byte, error) patchBytes, err := json.Marshal(patch) return patchBytes, err } - -// HasDuplicateIndex iterates over the list of ComputeDomainNodes (in this CD, -// and in this clique), and returns true if any Index appears more than once. -func (m *ComputeDomainManager) HasDuplicateIndex(nodeInfos []*nvapi.ComputeDomainNode, cliqueID string) bool { - seen := make(map[int]struct{}) - - for _, node := range nodeInfos { - // Ignore nodes in a different clique. - if node.CliqueID != cliqueID { - continue - } - - if _, exists := seen[node.Index]; exists { - klog.V(4).Infof("DNS index collision detected: %v uses an index seen before (we are node %v)", node, m.config.nodeName) - return true - } - - // Mark as seen. - seen[node.Index] = struct{}{} - } - - return false -} diff --git a/cmd/compute-domain-daemon/controller.go b/cmd/compute-domain-daemon/controller.go index 7da2cb047..9b056107a 100644 --- a/cmd/compute-domain-daemon/controller.go +++ b/cmd/compute-domain-daemon/controller.go @@ -34,6 +34,7 @@ type ManagerConfig struct { computeDomainName string computeDomainNamespace string cliqueID string + cliqueNodeIndex int podIP string podName string podNamespace string @@ -47,6 +48,7 @@ type ControllerConfig struct { computeDomainName string computeDomainNamespace string cliqueID string + cliqueNodeIndex int podIP string podName string podNamespace string @@ -78,6 +80,7 @@ func NewController(config *ControllerConfig) (*Controller, error) { computeDomainName: config.computeDomainName, computeDomainNamespace: config.computeDomainNamespace, cliqueID: config.cliqueID, + cliqueNodeIndex: config.cliqueNodeIndex, podIP: config.podIP, podName: config.podName, podNamespace: config.podNamespace, diff --git a/cmd/compute-domain-daemon/main.go b/cmd/compute-domain-daemon/main.go index bd92dcf3e..d1ddef628 100644 --- a/cmd/compute-domain-daemon/main.go +++ b/cmd/compute-domain-daemon/main.go @@ -49,6 +49,7 @@ const ( type Flags struct { cliqueID string + cliqueNodeIndex int computeDomainUUID string computeDomainName string computeDomainNamespace string @@ -101,6 +102,12 @@ func newApp() *cli.App { EnvVars: []string{"CLIQUE_ID"}, Destination: &flags.cliqueID, }, + &cli.IntFlag{ + Name: "clique-node-index", + Usage: "The node index within the clique (assigned by controller).", + EnvVars: []string{"CLIQUE_NODE_INDEX"}, + Destination: &flags.cliqueNodeIndex, + }, &cli.StringFlag{ Name: "compute-domain-uuid", Usage: "The UUID of the ComputeDomain to manage.", @@ -192,6 +199,7 @@ func run(ctx context.Context, cancel context.CancelFunc, flags *Flags) error { config := &ControllerConfig{ cliqueID: flags.cliqueID, + cliqueNodeIndex: flags.cliqueNodeIndex, computeDomainUUID: flags.computeDomainUUID, computeDomainName: flags.computeDomainName, computeDomainNamespace: flags.computeDomainNamespace, diff --git a/cmd/compute-domain-kubelet-plugin/computedomain.go b/cmd/compute-domain-kubelet-plugin/computedomain.go index c4ddcf080..13406ac5a 100644 --- a/cmd/compute-domain-kubelet-plugin/computedomain.go +++ b/cmd/compute-domain-kubelet-plugin/computedomain.go @@ -21,6 +21,7 @@ import ( "fmt" "os" "path/filepath" + "strconv" "sync" "time" @@ -37,7 +38,11 @@ import ( ) const ( - computeDomainLabelKey = "resource.nvidia.com/computeDomain" + // cliqueConfigMapPrefix is the prefix for ConfigMaps that store clique node-to-index mappings. + cliqueConfigMapPrefix = "clique-" + + computeDomainLabelKey = "resource.nvidia.com/computeDomain" + computeDomainCliqueLabelKey = "resource.nvidia.com/computeDomain.cliqueID" informerResyncPeriod = 10 * time.Minute cleanupInterval = 10 * time.Minute @@ -56,6 +61,7 @@ type ComputeDomainManager struct { configFilesRoot string cliqueID string + cliqueNodeIndex *int } type ComputeDomainDaemonSettings struct { @@ -158,10 +164,18 @@ func (s *ComputeDomainDaemonSettings) GetCDIContainerEditsCommon(ctx context.Con return nil, fmt.Errorf("compute domain not found: %s", s.domainID) } + // Get the clique node index from the ConfigMap (managed by the controller). + // This will error if not available yet, causing a retry. + cliqueNodeIndex, err := s.manager.GetCliqueNodeIndex(ctx) + if err != nil { + return nil, fmt.Errorf("error getting clique node index: %w", err) + } + edits := &cdiapi.ContainerEdits{ ContainerEdits: &cdispec.ContainerEdits{ Env: []string{ fmt.Sprintf("CLIQUE_ID=%s", s.manager.cliqueID), + fmt.Sprintf("CLIQUE_NODE_INDEX=%d", cliqueNodeIndex), fmt.Sprintf("COMPUTE_DOMAIN_UUID=%s", cd.UID), fmt.Sprintf("COMPUTE_DOMAIN_NAME=%s", cd.Name), fmt.Sprintf("COMPUTE_DOMAIN_NAMESPACE=%s", cd.Namespace), @@ -349,6 +363,73 @@ func (m *ComputeDomainManager) GetComputeDomain(ctx context.Context, cdUID strin return cd, nil } +// SetPodCliqueLabel sets the computeDomain.clique label on the kubelet plugin pod +// with the cliqueID. This label is set when the plugin first comes online, +// overwriting any existing label with the same key. The label is automatically +// cleaned up when the pod is deleted. +func (m *ComputeDomainManager) SetPodCliqueLabel(ctx context.Context) error { + if m.cliqueID == "" { + klog.V(4).Infof("Skipping setting clique label: no cliqueID available") + return nil + } + + if m.config.flags.podName == "" { + klog.V(4).Infof("Skipping setting clique label: no podName available") + return nil + } + + pod, err := m.config.clientsets.Core.CoreV1().Pods(m.config.flags.namespace).Get(ctx, m.config.flags.podName, metav1.GetOptions{}) + if err != nil { + return fmt.Errorf("error retrieving Pod: %w", err) + } + + newPod := pod.DeepCopy() + if newPod.Labels == nil { + newPod.Labels = make(map[string]string) + } + newPod.Labels[computeDomainCliqueLabelKey] = m.cliqueID + + if _, err = m.config.clientsets.Core.CoreV1().Pods(m.config.flags.namespace).Update(ctx, newPod, metav1.UpdateOptions{}); err != nil { + return fmt.Errorf("error updating Pod with clique label: %w", err) + } + + klog.Infof("Set pod label %s=%s on pod %s/%s", computeDomainCliqueLabelKey, m.cliqueID, m.config.flags.namespace, m.config.flags.podName) + return nil +} + +// GetCliqueNodeIndex retrieves this node's index from the clique ConfigMap. +// The result is cached after the first successful lookup since it never changes +// for the lifetime of the kubelet plugin pod. +// Returns an error if the ConfigMap doesn't exist or the node isn't in it yet. +func (m *ComputeDomainManager) GetCliqueNodeIndex(ctx context.Context) (int, error) { + if m.cliqueID == "" { + return 0, nil + } + + if m.cliqueNodeIndex != nil { + return *m.cliqueNodeIndex, nil + } + + cmName := cliqueConfigMapPrefix + m.cliqueID + cm, err := m.config.clientsets.Core.CoreV1().ConfigMaps(m.config.flags.namespace).Get(ctx, cmName, metav1.GetOptions{}) + if err != nil { + return -1, fmt.Errorf("get ConfigMap %s: %w", cmName, err) + } + + indexStr, exists := cm.Data[m.config.flags.nodeName] + if !exists { + return -1, fmt.Errorf("node %s not found in ConfigMap %s", m.config.flags.nodeName, cmName) + } + + index, err := strconv.Atoi(indexStr) + if err != nil { + return -1, fmt.Errorf("invalid index value %q for node %s in ConfigMap %s: %w", indexStr, m.config.flags.nodeName, cmName, err) + } + + m.cliqueNodeIndex = &index + return index, nil +} + func (m *ComputeDomainManager) periodicCleanup(ctx context.Context) { ticker := time.NewTicker(cleanupInterval) defer ticker.Stop() diff --git a/cmd/compute-domain-kubelet-plugin/driver.go b/cmd/compute-domain-kubelet-plugin/driver.go index cc0e4be24..06ef90c0c 100644 --- a/cmd/compute-domain-kubelet-plugin/driver.go +++ b/cmd/compute-domain-kubelet-plugin/driver.go @@ -122,6 +122,11 @@ func NewDriver(ctx context.Context, config *Config) (*driver, error) { return nil, fmt.Errorf("error starting ComputeDomain manager: %w", err) } + // Set the clique label on the pod when the plugin comes online + if err := state.computeDomainManager.SetPodCliqueLabel(ctx); err != nil { + return nil, fmt.Errorf("error setting pod clique label: %w", err) + } + // Pass `nodeUnprepareResource` function in the cleanup manager. if err := state.checkpointCleanupManager.Start(ctx, driver.nodeUnprepareResource); err != nil { return nil, fmt.Errorf("error starting CheckpointCleanupManager: %w", err) diff --git a/cmd/compute-domain-kubelet-plugin/main.go b/cmd/compute-domain-kubelet-plugin/main.go index ade42c22d..bc28f493b 100644 --- a/cmd/compute-domain-kubelet-plugin/main.go +++ b/cmd/compute-domain-kubelet-plugin/main.go @@ -48,6 +48,7 @@ type Flags struct { nodeName string namespace string + podName string cdiRoot string containerDriverRoot string hostDriverRoot string @@ -93,6 +94,12 @@ func newApp() *cli.App { Destination: &flags.namespace, EnvVars: []string{"NAMESPACE"}, }, + &cli.StringFlag{ + Name: "pod-name", + Usage: "The name of the pod running this plugin.", + Destination: &flags.podName, + EnvVars: []string{"POD_NAME"}, + }, &cli.StringFlag{ Name: "cdi-root", Usage: "Absolute path to the directory where CDI files will be generated.", diff --git a/deployments/helm/nvidia-dra-driver-gpu/templates/kubeletplugin.yaml b/deployments/helm/nvidia-dra-driver-gpu/templates/kubeletplugin.yaml index 504cf0437..5b0bfa205 100644 --- a/deployments/helm/nvidia-dra-driver-gpu/templates/kubeletplugin.yaml +++ b/deployments/helm/nvidia-dra-driver-gpu/templates/kubeletplugin.yaml @@ -148,6 +148,10 @@ spec: valueFrom: fieldRef: fieldPath: metadata.namespace + - name: POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name {{- if .Values.nvidiaCDIHookPath }} - name: NVIDIA_CDI_HOOK_PATH value: "{{ .Values.nvidiaCDIHookPath }}" diff --git a/deployments/helm/nvidia-dra-driver-gpu/templates/rbac-controller.yaml b/deployments/helm/nvidia-dra-driver-gpu/templates/rbac-controller.yaml index f793d393b..610ed36d1 100644 --- a/deployments/helm/nvidia-dra-driver-gpu/templates/rbac-controller.yaml +++ b/deployments/helm/nvidia-dra-driver-gpu/templates/rbac-controller.yaml @@ -57,6 +57,9 @@ rules: - apiGroups: ["apps"] resources: ["daemonsets"] verbs: ["get", "list", "watch", "create", "update", "patch", "delete"] +- apiGroups: [""] + resources: ["configmaps"] + verbs: ["get", "list", "watch", "create", "update", "delete"] --- apiVersion: rbac.authorization.k8s.io/v1 kind: RoleBinding diff --git a/deployments/helm/nvidia-dra-driver-gpu/templates/rbac-kubeletplugin.yaml b/deployments/helm/nvidia-dra-driver-gpu/templates/rbac-kubeletplugin.yaml index 9c7a78cd7..bd6953790 100644 --- a/deployments/helm/nvidia-dra-driver-gpu/templates/rbac-kubeletplugin.yaml +++ b/deployments/helm/nvidia-dra-driver-gpu/templates/rbac-kubeletplugin.yaml @@ -54,6 +54,12 @@ metadata: name: {{ include "nvidia-dra-driver-gpu.name" $ }}-role-kubeletplugin namespace: {{ $namespace }} rules: +- apiGroups: [""] + resources: ["pods"] + verbs: ["update"] +- apiGroups: [""] + resources: ["configmaps"] + verbs: ["get"] {{- if (and $.Values.resources.gpus.enabled $.Values.featureGates.MPSSupport) }} - apiGroups: ["apps"] resources: ["deployments"]