Skip to content

Commit d6eaa21

Browse files
committed
Add DRA pod reconciler and node labeling
Add DRAPodReconciler that watches DRA-role pods and labels/unlabels nodes based on pod readiness, mirroring DevicePluginPodReconciler. - Add DRAVersionLabelPrefix constant and DRA label utilities (GetDRANodeLabel, GetDRAVersionLabelName, IsDRAVersionLabel) - Extend IsVersionLabel and GetNodesVersionLabels for DRA labels - Create DRAPodReconciler with NodeLabelerFinalizer support - Add isNotDRAPod predicate to DevicePluginPodReconciler to exclude DRA pods from event delivery - Exclude DRA pods from DevicePluginPodReconciler replacement check to prevent a ready DRA pod from blocking device-plugin-ready label removal - Register DRAPodReconciler in manager
1 parent 40f1f16 commit d6eaa21

9 files changed

Lines changed: 573 additions & 48 deletions

File tree

cmd/manager/main.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,10 @@ func main() {
170170
cmd.FatalError(setupLogger, err, "unable to create controller", "name", controllers.DevicePluginPodReconcilerName)
171171
}
172172

173+
if err = controllers.NewDRAPodReconciler(client).SetupWithManager(mgr); err != nil {
174+
cmd.FatalError(setupLogger, err, "unable to create controller", "name", controllers.DRAPodReconcilerName)
175+
}
176+
173177
if err = controllers.NewDRAReconciler(client, nodeAPI, scheme).SetupWithManager(mgr); err != nil {
174178
cmd.FatalError(setupLogger, err, "unable to create controller", "name", controllers.DRAReconcilerName)
175179
}

internal/constants/constants.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ const (
1313

1414
WorkerPodVersionLabelPrefix = "beta.kmm.node.kubernetes.io/version-worker-pod"
1515
DevicePluginVersionLabelPrefix = "beta.kmm.node.kubernetes.io/version-device-plugin"
16+
DRAVersionLabelPrefix = "beta.kmm.node.kubernetes.io/version-dra"
1617
ModuleVersionLabelPrefix = "kmm.node.kubernetes.io/version-module"
1718

1819
GCDelayFinalizer = "kmm.node.kubernetes.io/gc-delay"

internal/controllers/device_plugin_pod_reconciler.go

Lines changed: 13 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,11 @@ import (
88
"github.com/kubernetes-sigs/kernel-module-management/internal/filter"
99
"github.com/kubernetes-sigs/kernel-module-management/internal/utils"
1010
v1 "k8s.io/api/core/v1"
11-
"k8s.io/apimachinery/pkg/types"
1211
"k8s.io/kubectl/pkg/util/podutils"
1312
ctrl "sigs.k8s.io/controller-runtime"
1413
"sigs.k8s.io/controller-runtime/pkg/client"
15-
"sigs.k8s.io/controller-runtime/pkg/reconcile"
16-
17-
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
1814
"sigs.k8s.io/controller-runtime/pkg/predicate"
15+
"sigs.k8s.io/controller-runtime/pkg/reconcile"
1916
)
2017

2118
const DevicePluginPodReconcilerName = "DevicePluginPod"
@@ -59,7 +56,7 @@ func (dppr *DevicePluginPodReconciler) Reconcile(ctx context.Context, pod *v1.Po
5956
if nodeName != "" {
6057
logger.Info("Unlabeling node")
6158

62-
// Make sure we don't already have a new running pod before unlabeling the node
59+
// Make sure we don't already have a new running device-plugin pod before unlabeling the node
6360
labelSelector := client.MatchingLabels{constants.ModuleNameLabel: moduleName}
6461
fieldSelector := client.MatchingFields{"spec.nodeName": nodeName}
6562
var modulePodsList v1.PodList
@@ -69,13 +66,16 @@ func (dppr *DevicePluginPodReconciler) Reconcile(ctx context.Context, pod *v1.Po
6966
}
7067
var foundRunningPod bool
7168
for _, p := range modulePodsList.Items {
69+
if p.Labels[constants.DaemonSetRole] == constants.DRARoleLabelValue {
70+
continue
71+
}
7272
if podutils.IsPodReady(&p) && p.DeletionTimestamp.IsZero() {
7373
foundRunningPod = true
7474
break
7575
}
7676
}
7777
if !foundRunningPod {
78-
if err := dppr.deleteLabel(ctx, nodeName, labelName); err != nil {
78+
if err := utils.DeleteNodeLabel(ctx, dppr.client, nodeName, labelName); err != nil {
7979
return ctrl.Result{}, fmt.Errorf("could not unlabel node %s with label %s: %v",
8080
nodeName, labelName, err)
8181
}
@@ -89,7 +89,7 @@ func (dppr *DevicePluginPodReconciler) Reconcile(ctx context.Context, pod *v1.Po
8989
// the specified Pod has already been deleted. By ignoring NotFound errors we ensure
9090
// that no additional, unnecessary reconciliation request will be queued (since a
9191
// reconciliation result with a non-nil error will be requeued).
92-
if err := dppr.deleteFinalizer(ctx, pod); client.IgnoreNotFound(err) != nil {
92+
if err := utils.DeletePodFinalizer(ctx, dppr.client, pod); client.IgnoreNotFound(err) != nil {
9393
return ctrl.Result{}, fmt.Errorf("could not delete the pod finalizer: %v", err)
9494
}
9595
}
@@ -99,7 +99,7 @@ func (dppr *DevicePluginPodReconciler) Reconcile(ctx context.Context, pod *v1.Po
9999

100100
logger.Info("Labeling node")
101101

102-
if err := dppr.addLabel(ctx, nodeName, labelName); err != nil {
102+
if err := utils.AddNodeLabel(ctx, dppr.client, nodeName, labelName); err != nil {
103103
return ctrl.Result{}, fmt.Errorf("could not label node %s with %s: %v", nodeName, labelName, err)
104104
}
105105

@@ -126,6 +126,10 @@ func (dppr *DevicePluginPodReconciler) SetupWithManager(mgr ctrl.Manager) error
126126
return false
127127
})
128128

129+
isNotDRAPod := predicate.NewPredicateFuncs(func(o client.Object) bool {
130+
return o.GetLabels()[constants.DaemonSetRole] != constants.DRARoleLabelValue
131+
})
132+
129133
p := predicate.And(
130134
predicate.Or(
131135
filter.PodReadinessChangedPredicate(
@@ -135,6 +139,7 @@ func (dppr *DevicePluginPodReconciler) SetupWithManager(mgr ctrl.Manager) error
135139
),
136140
filter.HasLabel(constants.ModuleNameLabel),
137141
isDaemonSetPod,
142+
isNotDRAPod,
138143
)
139144

140145
return ctrl.
@@ -147,42 +152,3 @@ func (dppr *DevicePluginPodReconciler) SetupWithManager(mgr ctrl.Manager) error
147152
)
148153
}
149154

150-
func (dppr *DevicePluginPodReconciler) addLabel(ctx context.Context, nodeName string, labelName string) error {
151-
node := v1.Node{}
152-
153-
if err := dppr.client.Get(ctx, types.NamespacedName{Name: nodeName}, &node); err != nil {
154-
return fmt.Errorf("could not get node %s: %v", nodeName, err)
155-
}
156-
157-
nodeCopy := node.DeepCopy()
158-
159-
if node.Labels == nil {
160-
node.Labels = make(map[string]string)
161-
}
162-
163-
node.Labels[labelName] = ""
164-
165-
return dppr.client.Patch(ctx, &node, client.MergeFrom(nodeCopy))
166-
}
167-
168-
func (dppr *DevicePluginPodReconciler) deleteFinalizer(ctx context.Context, pod *v1.Pod) error {
169-
podCopy := pod.DeepCopy()
170-
171-
controllerutil.RemoveFinalizer(pod, constants.NodeLabelerFinalizer)
172-
173-
return dppr.client.Patch(ctx, pod, client.MergeFrom(podCopy))
174-
}
175-
176-
func (dppr *DevicePluginPodReconciler) deleteLabel(ctx context.Context, nodeName string, labelName string) error {
177-
node := v1.Node{}
178-
179-
if err := dppr.client.Get(ctx, types.NamespacedName{Name: nodeName}, &node); err != nil {
180-
return fmt.Errorf("could not get node %s: %v", nodeName, err)
181-
}
182-
183-
nodeCopy := node.DeepCopy()
184-
185-
delete(node.Labels, labelName)
186-
187-
return dppr.client.Patch(ctx, &node, client.MergeFrom(nodeCopy))
188-
}

internal/controllers/device_plugin_pod_reconciler_test.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,61 @@ var _ = Describe("DevicePluginPodReconciler_Reconcile", func() {
126126
Expect(err).NotTo(HaveOccurred())
127127
})
128128

129+
It("should unlabel the node when the only ready pod is a DRA pod", func() {
130+
var (
131+
labelSelector = client.MatchingLabels{constants.ModuleNameLabel: moduleName}
132+
fieldSelector = client.MatchingFields{"spec.nodeName": nodeName}
133+
)
134+
135+
gomock.InOrder(
136+
kubeClient.EXPECT().List(ctx, gomock.Any(), labelSelector, fieldSelector).Do(
137+
func(_ interface{}, modulePodsList *v1.PodList, _ ...client.ListOption) {
138+
modulePodsList.Items = []v1.Pod{
139+
{
140+
ObjectMeta: metav1.ObjectMeta{
141+
Labels: map[string]string{
142+
constants.ModuleNameLabel: moduleName,
143+
constants.DaemonSetRole: constants.DRARoleLabelValue,
144+
},
145+
},
146+
Status: v1.PodStatus{
147+
Conditions: []v1.PodCondition{
148+
{
149+
Type: v1.PodReady,
150+
Status: v1.ConditionTrue,
151+
},
152+
},
153+
},
154+
},
155+
}
156+
},
157+
),
158+
kubeClient.EXPECT().Get(ctx, gomock.Any(), gomock.Any()).Do(
159+
func(_ interface{}, _ interface{}, node *v1.Node, _ ...client.GetOption) {
160+
node.SetLabels(map[string]string{utils.GetDevicePluginNodeLabel(podNamespace, moduleName): ""})
161+
},
162+
),
163+
kubeClient.EXPECT().Patch(ctx, gomock.Any(), gomock.Any()).Do(
164+
func(_ interface{}, node *v1.Node, p client.Patch, _ ...client.GetOption) {
165+
Expect(p.Type()).To(Equal(types.MergePatchType))
166+
Expect(p.Data(node)).To(Equal([]byte(`{"metadata":{"labels":null}}`)))
167+
},
168+
),
169+
)
170+
171+
pod := &v1.Pod{
172+
ObjectMeta: metav1.ObjectMeta{
173+
Labels: map[string]string{constants.ModuleNameLabel: moduleName},
174+
Name: podName,
175+
Namespace: podNamespace,
176+
},
177+
Spec: v1.PodSpec{NodeName: nodeName},
178+
}
179+
180+
_, err := r.Reconcile(ctx, pod)
181+
Expect(err).NotTo(HaveOccurred())
182+
})
183+
129184
now := metav1.Now()
130185

131186
patchRemoveFinalizerFunc := func(_ interface{}, pod *v1.Pod, p client.Patch, _ ...client.GetOption) {
Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
/*
2+
Copyright 2022.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package controllers
18+
19+
import (
20+
"context"
21+
"fmt"
22+
23+
"github.com/kubernetes-sigs/kernel-module-management/internal/constants"
24+
"github.com/kubernetes-sigs/kernel-module-management/internal/filter"
25+
"github.com/kubernetes-sigs/kernel-module-management/internal/utils"
26+
v1 "k8s.io/api/core/v1"
27+
"k8s.io/kubectl/pkg/util/podutils"
28+
ctrl "sigs.k8s.io/controller-runtime"
29+
"sigs.k8s.io/controller-runtime/pkg/client"
30+
"sigs.k8s.io/controller-runtime/pkg/predicate"
31+
"sigs.k8s.io/controller-runtime/pkg/reconcile"
32+
)
33+
34+
const DRAPodReconcilerName = "DRAPod"
35+
36+
type DRAPodReconciler struct {
37+
client client.Client
38+
}
39+
40+
func NewDRAPodReconciler(client client.Client) *DRAPodReconciler {
41+
return &DRAPodReconciler{client: client}
42+
}
43+
44+
func (r *DRAPodReconciler) Reconcile(ctx context.Context, pod *v1.Pod) (ctrl.Result, error) {
45+
logger := ctrl.LoggerFrom(ctx)
46+
47+
nodeName := pod.Spec.NodeName
48+
49+
moduleName, ok := pod.Labels[constants.ModuleNameLabel]
50+
if !ok {
51+
return ctrl.Result{}, fmt.Errorf("pod %s/%s has no %q label", pod.Namespace, pod.Name, constants.ModuleNameLabel)
52+
}
53+
54+
labelName := utils.GetDRANodeLabel(pod.Namespace, moduleName)
55+
56+
logger = logger.WithValues(
57+
"node name", nodeName,
58+
"module name", moduleName,
59+
"label name", labelName,
60+
)
61+
62+
if !podutils.IsPodReady(pod) || !pod.DeletionTimestamp.IsZero() {
63+
if nodeName != "" {
64+
logger.Info("Unlabeling node")
65+
66+
labelSelector := client.MatchingLabels{
67+
constants.ModuleNameLabel: moduleName,
68+
constants.DaemonSetRole: constants.DRARoleLabelValue,
69+
}
70+
fieldSelector := client.MatchingFields{"spec.nodeName": nodeName}
71+
var draPodsList v1.PodList
72+
err := r.client.List(ctx, &draPodsList, labelSelector, fieldSelector)
73+
if err != nil {
74+
return ctrl.Result{}, fmt.Errorf("failed to get list of all DRA pods for module %s on node %s: %v", moduleName, nodeName, err)
75+
}
76+
var foundRunningPod bool
77+
for _, p := range draPodsList.Items {
78+
if podutils.IsPodReady(&p) && p.DeletionTimestamp.IsZero() {
79+
foundRunningPod = true
80+
break
81+
}
82+
}
83+
if !foundRunningPod {
84+
if err := utils.DeleteNodeLabel(ctx, r.client, nodeName, labelName); err != nil {
85+
return ctrl.Result{}, fmt.Errorf("could not unlabel node %s with label %s: %v",
86+
nodeName, labelName, err)
87+
}
88+
}
89+
}
90+
91+
if !pod.DeletionTimestamp.IsZero() {
92+
logger.Info("Pod deletion requested; removing finalizer")
93+
94+
if err := utils.DeletePodFinalizer(ctx, r.client, pod); client.IgnoreNotFound(err) != nil {
95+
return ctrl.Result{}, fmt.Errorf("could not delete the pod finalizer: %v", err)
96+
}
97+
}
98+
99+
return ctrl.Result{}, nil
100+
}
101+
102+
logger.Info("Labeling node")
103+
104+
if err := utils.AddNodeLabel(ctx, r.client, nodeName, labelName); err != nil {
105+
return ctrl.Result{}, fmt.Errorf("could not label node %s with %s: %v", nodeName, labelName, err)
106+
}
107+
108+
return ctrl.Result{}, nil
109+
}
110+
111+
func (r *DRAPodReconciler) SetupWithManager(mgr ctrl.Manager) error {
112+
isDaemonSetPod := predicate.NewPredicateFuncs(func(o client.Object) bool {
113+
ownerReferences := o.GetOwnerReferences()
114+
for _, ownerReference := range ownerReferences {
115+
if ownerReference.Kind == "DaemonSet" {
116+
return true
117+
}
118+
}
119+
return false
120+
})
121+
122+
isDRAPod := predicate.NewPredicateFuncs(func(o client.Object) bool {
123+
return o.GetLabels()[constants.DaemonSetRole] == constants.DRARoleLabelValue
124+
})
125+
126+
p := predicate.And(
127+
predicate.Or(
128+
filter.PodReadinessChangedPredicate(
129+
mgr.GetLogger().WithName("dra-pod-readiness-changed"),
130+
),
131+
filter.DeletingPredicate(),
132+
),
133+
filter.HasLabel(constants.ModuleNameLabel),
134+
isDaemonSetPod,
135+
isDRAPod,
136+
)
137+
138+
return ctrl.
139+
NewControllerManagedBy(mgr).
140+
Named(DRAPodReconcilerName).
141+
For(&v1.Pod{}).
142+
WithEventFilter(p).
143+
Complete(
144+
reconcile.AsReconciler[*v1.Pod](r.client, r),
145+
)
146+
}
147+

0 commit comments

Comments
 (0)