Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions internal/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ const (
DaemonSetRole = "kmm.node.kubernetes.io/role"
NamespaceLabelKey = "kmm.node.k8s.io/contains-modules"

WorkerPodVersionLabelPrefix = "beta.kmm.node.kubernetes.io/version-worker-pod"
DevicePluginVersionLabelPrefix = "beta.kmm.node.kubernetes.io/version-device-plugin"
ModuleVersionLabelPrefix = "kmm.node.kubernetes.io/version-module"
WorkerPodVersionLabelPrefix = "beta.kmm.node.kubernetes.io/version-worker-pod"
SchedulePluginVersionLabelPrefix = "beta.kmm.node.kubernetes.io/version-schedule-plugin"
ModuleVersionLabelPrefix = "kmm.node.kubernetes.io/version-module"

GCDelayFinalizer = "kmm.node.kubernetes.io/gc-delay"
ModuleFinalizer = "kmm.node.kubernetes.io/module-finalizer"
Expand Down
6 changes: 3 additions & 3 deletions internal/controllers/device_plugin_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,7 @@ func generateDevicePluginLabelsAndSelector(mod *kmmv1beta1.Module) (map[string]s
}

if mod.Spec.ModuleLoader != nil && mod.Spec.ModuleLoader.Container.Version != "" {
versionLabel := utils.GetDevicePluginVersionLabelName(mod.Namespace, mod.Name)
versionLabel := utils.GetSchedulePluginVersionLabelName(mod.Namespace, mod.Name)
labels[versionLabel] = mod.Spec.ModuleLoader.Container.Version
nodeSelector[versionLabel] = mod.Spec.ModuleLoader.Container.Version
} else if mod.Spec.ModuleLoader == nil {
Expand Down Expand Up @@ -536,7 +536,7 @@ func getExistingDSFromVersion(existingDS []appsv1.DaemonSet,
version = moduleLoader.Container.Version
}

versionLabel := utils.GetDevicePluginVersionLabelName(moduleNamespace, moduleName)
versionLabel := utils.GetSchedulePluginVersionLabelName(moduleNamespace, moduleName)
for _, ds := range existingDS {
dsModuleVersion := ds.GetLabels()[versionLabel]
if dsModuleVersion == version {
Expand All @@ -549,7 +549,7 @@ func getExistingDSFromVersion(existingDS []appsv1.DaemonSet,
func isOlderVersionUnusedDevicePluginDaemonset(ds *appsv1.DaemonSet, moduleVersion string) bool {
moduleName := ds.Labels[constants.ModuleNameLabel]
moduleNamespace := ds.Namespace
versionLabel := utils.GetDevicePluginVersionLabelName(moduleNamespace, moduleName)
versionLabel := utils.GetSchedulePluginVersionLabelName(moduleNamespace, moduleName)
return ds.Labels[versionLabel] != moduleVersion && ds.Status.DesiredNumberScheduled == 0
}

Expand Down
16 changes: 8 additions & 8 deletions internal/controllers/device_plugin_reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,16 +316,16 @@ var _ = Describe("DevicePluginReconciler_garbageCollect", func() {
},
},
}
devicePluginVersionLabel := utils.GetDevicePluginVersionLabelName(mod.Namespace, mod.Name)
schedulePluginVersionLabel := utils.GetSchedulePluginVersionLabelName(mod.Namespace, mod.Name)

DescribeTable("device-plugin GC", func(devicePluginFormerLabel bool, devicePluginFormerDesired int) {
devicePluginDS := appsv1.DaemonSet{
ObjectMeta: metav1.ObjectMeta{
Name: "devicePlugin",
Namespace: "namespace",
Labels: map[string]string{
devicePluginVersionLabel: currentModuleVersion,
constants.ModuleNameLabel: mod.Name,
schedulePluginVersionLabel: currentModuleVersion,
constants.ModuleNameLabel: mod.Name,
},
},
}
Expand All @@ -335,7 +335,7 @@ var _ = Describe("DevicePluginReconciler_garbageCollect", func() {
if devicePluginFormerLabel {
devicePluginFormerVersionDS = devicePluginDS.DeepCopy()
devicePluginFormerVersionDS.SetName("devicePluginFormer")
devicePluginFormerVersionDS.Labels[devicePluginVersionLabel] = "former label"
devicePluginFormerVersionDS.Labels[schedulePluginVersionLabel] = "former label"
devicePluginFormerVersionDS.Status.DesiredNumberScheduled = int32(devicePluginFormerDesired)
existingDS = append(existingDS, *devicePluginFormerVersionDS)
}
Expand All @@ -357,7 +357,7 @@ var _ = Describe("DevicePluginReconciler_garbageCollect", func() {
ObjectMeta: metav1.ObjectMeta{
Name: "devicePlugin",
Namespace: "namespace",
Labels: map[string]string{constants.ModuleNameLabel: mod.Name, devicePluginVersionLabel: "formerVersion"},
Labels: map[string]string{constants.ModuleNameLabel: mod.Name, schedulePluginVersionLabel: "formerVersion"},
},
}
clnt.EXPECT().Delete(context.Background(), &deleteDS).Return(fmt.Errorf("some error"))
Expand All @@ -375,7 +375,7 @@ var _ = Describe("DevicePluginReconciler_garbageCollect", func() {
ObjectMeta: metav1.ObjectMeta{
Name: "devicePlugin",
Namespace: "namespace",
Labels: map[string]string{constants.ModuleNameLabel: mod.Name, devicePluginVersionLabel: "formerVersion"},
Labels: map[string]string{constants.ModuleNameLabel: mod.Name, schedulePluginVersionLabel: "formerVersion"},
},
}

Expand Down Expand Up @@ -738,7 +738,7 @@ var _ = Describe("DevicePluginReconciler_setDevicePluginAsDesired", func() {
err := dsc.setDevicePluginAsDesired(context.Background(), &ds, &mod)

Expect(err).NotTo(HaveOccurred())
versionLabel := utils.GetDevicePluginVersionLabelName(namespace, moduleName)
versionLabel := utils.GetSchedulePluginVersionLabelName(namespace, moduleName)
Expect(ds.GetLabels()).Should(HaveKeyWithValue(versionLabel, "some version"))
})

Expand Down Expand Up @@ -968,7 +968,7 @@ var _ = Describe("DevicePluginReconciler_getExistingDSFromVersion", func() {
)

devicePluginLabels := map[string]string{
utils.GetDevicePluginVersionLabelName(moduleNamespace, moduleName): moduleVersion,
utils.GetSchedulePluginVersionLabelName(moduleNamespace, moduleName): moduleVersion,
}

ds := appsv1.DaemonSet{
Expand Down
65 changes: 60 additions & 5 deletions internal/controllers/dra_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,11 @@ func (r *DRAReconciler) Reconcile(ctx context.Context, mod *kmmv1beta1.Module) (
return res, fmt.Errorf("could not handle DRA: %v", err)
}

err = r.reconHelperAPI.garbageCollectDRADaemonSets(ctx, mod, existingDRADS)
if err != nil {
return res, fmt.Errorf("failed to run DRA garbage collection: %v", err)
}

err = r.reconHelperAPI.handleDeviceClasses(ctx, mod, existingDCs)
if err != nil {
return res, fmt.Errorf("could not handle DeviceClasses: %v", err)
Expand All @@ -133,6 +138,7 @@ func (r *DRAReconciler) Reconcile(ctx context.Context, mod *kmmv1beta1.Module) (
type draReconcilerHelperAPI interface {
getModuleDRADaemonSets(ctx context.Context, name, namespace string) ([]appsv1.DaemonSet, error)
handleDRA(ctx context.Context, mod *kmmv1beta1.Module, existingDRADS []appsv1.DaemonSet) error
garbageCollectDRADaemonSets(ctx context.Context, mod *kmmv1beta1.Module, existingDS []appsv1.DaemonSet) error
deleteDRAResources(ctx context.Context, moduleName, moduleNamespace string) error
moduleUpdateDRAStatus(ctx context.Context, mod *kmmv1beta1.Module, existingDRADS []appsv1.DaemonSet) error
clearDRAStatus(ctx context.Context, mod *kmmv1beta1.Module) error
Expand Down Expand Up @@ -181,11 +187,9 @@ func (drh *draReconcilerHelper) handleDRA(ctx context.Context, mod *kmmv1beta1.M

logger := log.FromContext(ctx)

var ds *appsv1.DaemonSet
if len(existingDRADS) > 0 {
ds = &existingDRADS[0]
} else {
logger.Info("creating new DRA DaemonSet")
ds, version := getExistingDRADSFromVersion(existingDRADS, mod.Namespace, mod.Name, mod.Spec.ModuleLoader)
if ds == nil {
logger.Info("creating new DRA DaemonSet", "version", version)
ds = &appsv1.DaemonSet{
ObjectMeta: metav1.ObjectMeta{Namespace: mod.Namespace, GenerateName: mod.Name + "-dra-"},
}
Expand All @@ -202,6 +206,51 @@ func (drh *draReconcilerHelper) handleDRA(ctx context.Context, mod *kmmv1beta1.M
return err
}

func (drh *draReconcilerHelper) garbageCollectDRADaemonSets(ctx context.Context, mod *kmmv1beta1.Module, existingDS []appsv1.DaemonSet) error {
if mod.Spec.ModuleLoader == nil {
return nil
}

logger := log.FromContext(ctx)
deleted := make([]string, 0)
for _, ds := range existingDS {
if isOlderVersionUnusedDRADaemonSet(&ds, mod.Namespace, mod.Spec.ModuleLoader.Container.Version) {
deleted = append(deleted, ds.Name)
if err := drh.client.Delete(ctx, &ds); err != nil {
return fmt.Errorf("could not delete DRA DaemonSet %s: %v", ds.Name, err)
}
}
}

logger.Info("garbage-collected DRA DaemonSets", "names", deleted)
return nil
}

func getExistingDRADSFromVersion(existingDS []appsv1.DaemonSet,
moduleNamespace string,
moduleName string,
moduleLoader *kmmv1beta1.ModuleLoaderSpec) (*appsv1.DaemonSet, string) {
version := ""
if moduleLoader != nil {
version = moduleLoader.Container.Version
}

versionLabel := utils.GetSchedulePluginVersionLabelName(moduleNamespace, moduleName)
for _, ds := range existingDS {
dsModuleVersion := ds.GetLabels()[versionLabel]
if dsModuleVersion == version {
return &ds, version
}
}
return nil, version
}

func isOlderVersionUnusedDRADaemonSet(ds *appsv1.DaemonSet, moduleNamespace, moduleVersion string) bool {
moduleName := ds.Labels[constants.ModuleNameLabel]
versionLabel := utils.GetSchedulePluginVersionLabelName(moduleNamespace, moduleName)
return ds.Labels[versionLabel] != moduleVersion && ds.Status.DesiredNumberScheduled == 0
}

// deleteDRAResources deletes all DRA-owned DaemonSets and DeviceClasses using label-based bulk deletion.
func (drh *draReconcilerHelper) deleteDRAResources(ctx context.Context, moduleName, moduleNamespace string) error {
var errs []error
Expand Down Expand Up @@ -403,6 +452,12 @@ func (dsci *draDaemonSetCreatorImpl) setDRAAsDesired(
utils.GetKernelModuleReadyNodeLabel(mod.Namespace, mod.Name): "",
}

if mod.Spec.ModuleLoader != nil && mod.Spec.ModuleLoader.Container.Version != "" {
versionLabel := utils.GetSchedulePluginVersionLabelName(mod.Namespace, mod.Name)
standardLabels[versionLabel] = mod.Spec.ModuleLoader.Container.Version
nodeSelector[versionLabel] = mod.Spec.ModuleLoader.Container.Version
}

ds.SetLabels(
overrideLabels(ds.GetLabels(), standardLabels),
)
Expand Down
Loading
Loading