Skip to content

Commit 999f13a

Browse files
committed
Add DRA pod reconciler and node labeling
Add DRAPodReconciler that watches DRA-role pods and labels/unlabels nodes based on pod readiness, mirroring DevicePluginPodReconciler. - Add DRAVersionLabelPrefix constant and DRA label utilities (GetDRANodeLabel, GetDRAVersionLabelName, IsDRAVersionLabel) - Extend IsVersionLabel and GetNodesVersionLabels for DRA labels - Create DRAPodReconciler with NodeLabelerFinalizer support - Add isNotDRAPod predicate to DevicePluginPodReconciler to exclude DRA pods from event delivery - Exclude DRA pods from DevicePluginPodReconciler replacement check to prevent a ready DRA pod from blocking device-plugin-ready label removal - Register DRAPodReconciler in manager
1 parent 40f1f16 commit 999f13a

8 files changed

Lines changed: 556 additions & 2 deletions

File tree

cmd/manager/main.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,10 @@ func main() {
170170
cmd.FatalError(setupLogger, err, "unable to create controller", "name", controllers.DevicePluginPodReconcilerName)
171171
}
172172

173+
if err = controllers.NewDRAPodReconciler(client).SetupWithManager(mgr); err != nil {
174+
cmd.FatalError(setupLogger, err, "unable to create controller", "name", controllers.DRAPodReconcilerName)
175+
}
176+
173177
if err = controllers.NewDRAReconciler(client, nodeAPI, scheme).SetupWithManager(mgr); err != nil {
174178
cmd.FatalError(setupLogger, err, "unable to create controller", "name", controllers.DRAReconcilerName)
175179
}

internal/constants/constants.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ const (
1313

1414
WorkerPodVersionLabelPrefix = "beta.kmm.node.kubernetes.io/version-worker-pod"
1515
DevicePluginVersionLabelPrefix = "beta.kmm.node.kubernetes.io/version-device-plugin"
16+
DRAVersionLabelPrefix = "beta.kmm.node.kubernetes.io/version-dra"
1617
ModuleVersionLabelPrefix = "kmm.node.kubernetes.io/version-module"
1718

1819
GCDelayFinalizer = "kmm.node.kubernetes.io/gc-delay"

internal/controllers/device_plugin_pod_reconciler.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ func (dppr *DevicePluginPodReconciler) Reconcile(ctx context.Context, pod *v1.Po
5959
if nodeName != "" {
6060
logger.Info("Unlabeling node")
6161

62-
// Make sure we don't already have a new running pod before unlabeling the node
62+
// Make sure we don't already have a new running device-plugin pod before unlabeling the node
6363
labelSelector := client.MatchingLabels{constants.ModuleNameLabel: moduleName}
6464
fieldSelector := client.MatchingFields{"spec.nodeName": nodeName}
6565
var modulePodsList v1.PodList
@@ -69,6 +69,9 @@ func (dppr *DevicePluginPodReconciler) Reconcile(ctx context.Context, pod *v1.Po
6969
}
7070
var foundRunningPod bool
7171
for _, p := range modulePodsList.Items {
72+
if p.Labels[constants.DaemonSetRole] == constants.DRARoleLabelValue {
73+
continue
74+
}
7275
if podutils.IsPodReady(&p) && p.DeletionTimestamp.IsZero() {
7376
foundRunningPod = true
7477
break
@@ -126,6 +129,10 @@ func (dppr *DevicePluginPodReconciler) SetupWithManager(mgr ctrl.Manager) error
126129
return false
127130
})
128131

132+
isNotDRAPod := predicate.NewPredicateFuncs(func(o client.Object) bool {
133+
return o.GetLabels()[constants.DaemonSetRole] != constants.DRARoleLabelValue
134+
})
135+
129136
p := predicate.And(
130137
predicate.Or(
131138
filter.PodReadinessChangedPredicate(
@@ -135,6 +142,7 @@ func (dppr *DevicePluginPodReconciler) SetupWithManager(mgr ctrl.Manager) error
135142
),
136143
filter.HasLabel(constants.ModuleNameLabel),
137144
isDaemonSetPod,
145+
isNotDRAPod,
138146
)
139147

140148
return ctrl.

internal/controllers/device_plugin_pod_reconciler_test.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,61 @@ var _ = Describe("DevicePluginPodReconciler_Reconcile", func() {
126126
Expect(err).NotTo(HaveOccurred())
127127
})
128128

129+
It("should unlabel the node when the only ready pod is a DRA pod", func() {
130+
var (
131+
labelSelector = client.MatchingLabels{constants.ModuleNameLabel: moduleName}
132+
fieldSelector = client.MatchingFields{"spec.nodeName": nodeName}
133+
)
134+
135+
gomock.InOrder(
136+
kubeClient.EXPECT().List(ctx, gomock.Any(), labelSelector, fieldSelector).Do(
137+
func(_ interface{}, modulePodsList *v1.PodList, _ ...client.ListOption) {
138+
modulePodsList.Items = []v1.Pod{
139+
{
140+
ObjectMeta: metav1.ObjectMeta{
141+
Labels: map[string]string{
142+
constants.ModuleNameLabel: moduleName,
143+
constants.DaemonSetRole: constants.DRARoleLabelValue,
144+
},
145+
},
146+
Status: v1.PodStatus{
147+
Conditions: []v1.PodCondition{
148+
{
149+
Type: v1.PodReady,
150+
Status: v1.ConditionTrue,
151+
},
152+
},
153+
},
154+
},
155+
}
156+
},
157+
),
158+
kubeClient.EXPECT().Get(ctx, gomock.Any(), gomock.Any()).Do(
159+
func(_ interface{}, _ interface{}, node *v1.Node, _ ...client.GetOption) {
160+
node.SetLabels(map[string]string{utils.GetDevicePluginNodeLabel(podNamespace, moduleName): ""})
161+
},
162+
),
163+
kubeClient.EXPECT().Patch(ctx, gomock.Any(), gomock.Any()).Do(
164+
func(_ interface{}, node *v1.Node, p client.Patch, _ ...client.GetOption) {
165+
Expect(p.Type()).To(Equal(types.MergePatchType))
166+
Expect(p.Data(node)).To(Equal([]byte(`{"metadata":{"labels":null}}`)))
167+
},
168+
),
169+
)
170+
171+
pod := &v1.Pod{
172+
ObjectMeta: metav1.ObjectMeta{
173+
Labels: map[string]string{constants.ModuleNameLabel: moduleName},
174+
Name: podName,
175+
Namespace: podNamespace,
176+
},
177+
Spec: v1.PodSpec{NodeName: nodeName},
178+
}
179+
180+
_, err := r.Reconcile(ctx, pod)
181+
Expect(err).NotTo(HaveOccurred())
182+
})
183+
129184
now := metav1.Now()
130185

131186
patchRemoveFinalizerFunc := func(_ interface{}, pod *v1.Pod, p client.Patch, _ ...client.GetOption) {
Lines changed: 188 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,188 @@
1+
/*
2+
Copyright 2022.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package controllers
18+
19+
import (
20+
"context"
21+
"fmt"
22+
23+
"github.com/kubernetes-sigs/kernel-module-management/internal/constants"
24+
"github.com/kubernetes-sigs/kernel-module-management/internal/filter"
25+
"github.com/kubernetes-sigs/kernel-module-management/internal/utils"
26+
v1 "k8s.io/api/core/v1"
27+
"k8s.io/apimachinery/pkg/types"
28+
"k8s.io/kubectl/pkg/util/podutils"
29+
ctrl "sigs.k8s.io/controller-runtime"
30+
"sigs.k8s.io/controller-runtime/pkg/client"
31+
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
32+
"sigs.k8s.io/controller-runtime/pkg/predicate"
33+
"sigs.k8s.io/controller-runtime/pkg/reconcile"
34+
)
35+
36+
const DRAPodReconcilerName = "DRAPod"
37+
38+
type DRAPodReconciler struct {
39+
client client.Client
40+
}
41+
42+
func NewDRAPodReconciler(client client.Client) *DRAPodReconciler {
43+
return &DRAPodReconciler{client: client}
44+
}
45+
46+
func (r *DRAPodReconciler) Reconcile(ctx context.Context, pod *v1.Pod) (ctrl.Result, error) {
47+
logger := ctrl.LoggerFrom(ctx)
48+
49+
nodeName := pod.Spec.NodeName
50+
51+
moduleName, ok := pod.Labels[constants.ModuleNameLabel]
52+
if !ok {
53+
return ctrl.Result{}, fmt.Errorf("pod %s/%s has no %q label", pod.Namespace, pod.Name, constants.ModuleNameLabel)
54+
}
55+
56+
labelName := utils.GetDRANodeLabel(pod.Namespace, moduleName)
57+
58+
logger = logger.WithValues(
59+
"node name", nodeName,
60+
"module name", moduleName,
61+
"label name", labelName,
62+
)
63+
64+
if !podutils.IsPodReady(pod) || !pod.DeletionTimestamp.IsZero() {
65+
if nodeName != "" {
66+
logger.Info("Unlabeling node")
67+
68+
labelSelector := client.MatchingLabels{
69+
constants.ModuleNameLabel: moduleName,
70+
constants.DaemonSetRole: constants.DRARoleLabelValue,
71+
}
72+
fieldSelector := client.MatchingFields{"spec.nodeName": nodeName}
73+
var draPodsList v1.PodList
74+
err := r.client.List(ctx, &draPodsList, labelSelector, fieldSelector)
75+
if err != nil {
76+
return ctrl.Result{}, fmt.Errorf("failed to get list of all DRA pods for module %s on node %s: %v", moduleName, nodeName, err)
77+
}
78+
var foundRunningPod bool
79+
for _, p := range draPodsList.Items {
80+
if podutils.IsPodReady(&p) && p.DeletionTimestamp.IsZero() {
81+
foundRunningPod = true
82+
break
83+
}
84+
}
85+
if !foundRunningPod {
86+
if err := r.deleteLabel(ctx, nodeName, labelName); err != nil {
87+
return ctrl.Result{}, fmt.Errorf("could not unlabel node %s with label %s: %v",
88+
nodeName, labelName, err)
89+
}
90+
}
91+
}
92+
93+
if !pod.DeletionTimestamp.IsZero() {
94+
logger.Info("Pod deletion requested; removing finalizer")
95+
96+
if err := r.deleteFinalizer(ctx, pod); client.IgnoreNotFound(err) != nil {
97+
return ctrl.Result{}, fmt.Errorf("could not delete the pod finalizer: %v", err)
98+
}
99+
}
100+
101+
return ctrl.Result{}, nil
102+
}
103+
104+
logger.Info("Labeling node")
105+
106+
if err := r.addLabel(ctx, nodeName, labelName); err != nil {
107+
return ctrl.Result{}, fmt.Errorf("could not label node %s with %s: %v", nodeName, labelName, err)
108+
}
109+
110+
return ctrl.Result{}, nil
111+
}
112+
113+
func (r *DRAPodReconciler) SetupWithManager(mgr ctrl.Manager) error {
114+
isDaemonSetPod := predicate.NewPredicateFuncs(func(o client.Object) bool {
115+
ownerReferences := o.GetOwnerReferences()
116+
for _, ownerReference := range ownerReferences {
117+
if ownerReference.Kind == "DaemonSet" {
118+
return true
119+
}
120+
}
121+
return false
122+
})
123+
124+
isDRAPod := predicate.NewPredicateFuncs(func(o client.Object) bool {
125+
return o.GetLabels()[constants.DaemonSetRole] == constants.DRARoleLabelValue
126+
})
127+
128+
p := predicate.And(
129+
predicate.Or(
130+
filter.PodReadinessChangedPredicate(
131+
mgr.GetLogger().WithName("dra-pod-readiness-changed"),
132+
),
133+
filter.DeletingPredicate(),
134+
),
135+
filter.HasLabel(constants.ModuleNameLabel),
136+
isDaemonSetPod,
137+
isDRAPod,
138+
)
139+
140+
return ctrl.
141+
NewControllerManagedBy(mgr).
142+
Named(DRAPodReconcilerName).
143+
For(&v1.Pod{}).
144+
WithEventFilter(p).
145+
Complete(
146+
reconcile.AsReconciler[*v1.Pod](r.client, r),
147+
)
148+
}
149+
150+
func (r *DRAPodReconciler) addLabel(ctx context.Context, nodeName string, labelName string) error {
151+
node := v1.Node{}
152+
153+
if err := r.client.Get(ctx, types.NamespacedName{Name: nodeName}, &node); err != nil {
154+
return fmt.Errorf("could not get node %s: %v", nodeName, err)
155+
}
156+
157+
nodeCopy := node.DeepCopy()
158+
159+
if node.Labels == nil {
160+
node.Labels = make(map[string]string)
161+
}
162+
163+
node.Labels[labelName] = ""
164+
165+
return r.client.Patch(ctx, &node, client.MergeFrom(nodeCopy))
166+
}
167+
168+
func (r *DRAPodReconciler) deleteFinalizer(ctx context.Context, pod *v1.Pod) error {
169+
podCopy := pod.DeepCopy()
170+
171+
controllerutil.RemoveFinalizer(pod, constants.NodeLabelerFinalizer)
172+
173+
return r.client.Patch(ctx, pod, client.MergeFrom(podCopy))
174+
}
175+
176+
func (r *DRAPodReconciler) deleteLabel(ctx context.Context, nodeName string, labelName string) error {
177+
node := v1.Node{}
178+
179+
if err := r.client.Get(ctx, types.NamespacedName{Name: nodeName}, &node); err != nil {
180+
return fmt.Errorf("could not get node %s: %v", nodeName, err)
181+
}
182+
183+
nodeCopy := node.DeepCopy()
184+
185+
delete(node.Labels, labelName)
186+
187+
return r.client.Patch(ctx, &node, client.MergeFrom(nodeCopy))
188+
}

0 commit comments

Comments
 (0)