diff --git a/internal/controller/numaresourcesscheduler_controller.go b/internal/controller/numaresourcesscheduler_controller.go index 56e352b161..8b6588fe15 100644 --- a/internal/controller/numaresourcesscheduler_controller.go +++ b/internal/controller/numaresourcesscheduler_controller.go @@ -244,11 +244,6 @@ func isDeploymentRunning(ctx context.Context, c client.Client, key nropv1.Namesp } func (r *NUMAResourcesSchedulerReconciler) computeSchedulerReplicas(ctx context.Context, schedSpec nropv1.NUMAResourcesSchedulerSpec) (*int32, error) { - // do not autodetect if explicitly set by the user - if schedSpec.Replicas != nil { - return schedSpec.Replicas, nil - } - var labelSelector map[string]string var nodeRoleDescription string @@ -276,6 +271,14 @@ func (r *NUMAResourcesSchedulerReconciler) computeSchedulerReplicas(ctx context. replicaCount := int32(len(nodeList.Items)) + if schedSpec.Replicas != nil { + // should not allow more replicas than max found nodes + if *schedSpec.Replicas > replicaCount { + return nil, fmt.Errorf("explicitly set replicas count should not be greater than the number of the nodes with role %s", nodeRoleDescription) + } + return schedSpec.Replicas, nil + } + // check if no nodes are found for the target role if replicaCount == 0 { return nil, fmt.Errorf("failed to compute scheduler replicas: no %s nodes found for platform %s", nodeRoleDescription, r.PlatformInfo.Platform) @@ -361,7 +364,7 @@ func (r *NUMAResourcesSchedulerReconciler) syncNUMASchedulerResources(ctx contex return nropv1.NUMAResourcesSchedulerStatus{}, err } - if err := schedupdate.DeploymentAffinityWithStrategy(r.SchedulerManifests.Deployment, instance.Spec); err != nil { + if err := schedupdate.DeploymentTopologySpreadConstraints(r.SchedulerManifests.Deployment); err != nil { return nropv1.NUMAResourcesSchedulerStatus{}, err } diff --git a/internal/controller/numaresourcesscheduler_controller_test.go b/internal/controller/numaresourcesscheduler_controller_test.go index 0fc708a1cb..28af976bb5 100644 --- a/internal/controller/numaresourcesscheduler_controller_test.go +++ b/internal/controller/numaresourcesscheduler_controller_test.go @@ -33,7 +33,6 @@ import ( "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/client-go/kubernetes/scheme" "k8s.io/klog/v2" "k8s.io/utils/ptr" @@ -1031,192 +1030,27 @@ var _ = Describe("Test NUMAResourcesScheduler Reconcile", func() { Expect(dp.Spec.Template.Spec.Containers[0].Args).To(ContainElement("--tls-min-version=" + updatedSettings.MinVersion)) Expect(dp.Spec.Template.Spec.Containers[0].Args).To(ContainElement("--tls-cipher-suites=" + updatedSettings.CipherSuites)) }) - }) - Context("when setting the scheduler PodAntiAffinity", func() { - var ( - expectedPodAntiAff *corev1.PodAntiAffinity - expectedStrategy appsv1.DeploymentStrategy - ) + It("should set the TopologySpreadConstraints in the deployment by default", func(ctx context.Context) { + _, err := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: nrsKey}) + Expect(err).ToNot(HaveOccurred()) - BeforeEach(func() { - expectedPodAntiAff = &corev1.PodAntiAffinity{ - RequiredDuringSchedulingIgnoredDuringExecution: []corev1.PodAffinityTerm{ - { - LabelSelector: &metav1.LabelSelector{ - MatchLabels: map[string]string{ - "app": "secondary-scheduler", - }, - }, - TopologyKey: "kubernetes.io/hostname", - }, - }, - } - expectedStrategy = appsv1.DeploymentStrategy{ - Type: appsv1.RollingUpdateDeploymentStrategyType, - RollingUpdate: &appsv1.RollingUpdateDeployment{ - MaxUnavailable: ptr.To(intstr.FromInt(1)), - MaxSurge: ptr.To(intstr.FromInt(0)), + Expect(reconciler.Client.Get(ctx, nrsKey, nrs)).To(Succeed()) + dpKey := client.ObjectKey{Namespace: nrs.Status.Deployment.Namespace, Name: nrs.Status.Deployment.Name} + dp := &appsv1.Deployment{} + Expect(reconciler.Client.Get(ctx, dpKey, dp)).To(Succeed()) + Expect(dp.Spec.Template.Spec.TopologySpreadConstraints).To(HaveLen(1)) + + expectedConstraint := corev1.TopologySpreadConstraint{ + MaxSkew: 1, + TopologyKey: "kubernetes.io/hostname", + WhenUnsatisfiable: corev1.DoNotSchedule, + MatchLabelKeys: []string{"pod-template-hash"}, + LabelSelector: &metav1.LabelSelector{ + MatchLabels: dp.Spec.Template.Labels, }, } - }) - - Context("set different replicas counts", func() { - type testCase struct { - replicasCount *int32 - expectPodAntiAffinity bool - } - - DescribeTable("should set PodAntiAffinity for scheduler Deployment according to replicas count", func(ctx context.Context, tc testCase) { - nrs := testobjs.NewNUMAResourcesScheduler("numaresourcesscheduler", "some/url:latest", testSchedulerName, 11*time.Second) - nrs.Spec.Replicas = tc.replicasCount - initObjects := []runtime.Object{nrs} - initObjects = append(initObjects, fakeNodes(3, 0)...) - reconciler, err := NewFakeNUMAResourcesSchedulerReconciler(initObjects...) - Expect(err).ToNot(HaveOccurred()) - // Baseline node affinity comes from the scheduler deployment manifest; reconcile must preserve it. - Expect(reconciler.SchedulerManifests.Deployment.Spec.Template.Spec.Affinity).ToNot(BeNil()) - expectedNodeAff := reconciler.SchedulerManifests.Deployment.Spec.Template.Spec.Affinity.NodeAffinity.DeepCopy() - - key := client.ObjectKeyFromObject(nrs) - _, err = reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: key}) - Expect(err).ToNot(HaveOccurred()) - - dp := &appsv1.Deployment{} - dpKey := client.ObjectKey{Namespace: testNamespace, Name: "secondary-scheduler"} - Expect(reconciler.Client.Get(ctx, dpKey, dp)).To(Succeed()) - affinity := dp.Spec.Template.Spec.Affinity - Expect(affinity).ToNot(BeNil()) - Expect(affinity.NodeAffinity).To(Equal(expectedNodeAff)) - - if !tc.expectPodAntiAffinity { - Expect(affinity.PodAntiAffinity).To(BeNil()) - Expect(dp.Spec.Strategy).To(Equal(appsv1.DeploymentStrategy{})) - - // no changes expected on second reconcile with same input - _, err = reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: key}) - Expect(err).ToNot(HaveOccurred()) - Expect(reconciler.Client.Get(ctx, dpKey, dp)).To(Succeed()) - affinity = dp.Spec.Template.Spec.Affinity - Expect(affinity).ToNot(BeNil()) - Expect(affinity.NodeAffinity).To(Equal(expectedNodeAff)) - Expect(affinity.PodAntiAffinity).To(BeNil()) - Expect(dp.Spec.Strategy).To(Equal(appsv1.DeploymentStrategy{})) - return - } - - Expect(affinity.PodAntiAffinity).ToNot(BeNil()) - Expect(affinity.PodAntiAffinity).To(Equal(expectedPodAntiAff)) - Expect(dp.Spec.Strategy).To(Equal(expectedStrategy)) - - // no changes expected on second reconcile with same input - _, err = reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: key}) - Expect(err).ToNot(HaveOccurred()) - Expect(reconciler.Client.Get(ctx, dpKey, dp)).To(Succeed()) - affinity = dp.Spec.Template.Spec.Affinity - Expect(affinity).ToNot(BeNil()) - Expect(affinity.NodeAffinity).To(Equal(expectedNodeAff)) - Expect(affinity.PodAntiAffinity).ToNot(BeNil()) - Expect(affinity.PodAntiAffinity).To(Equal(expectedPodAntiAff)) - Expect(dp.Spec.Strategy).To(Equal(expectedStrategy)) - }, - Entry("when replicas are not set, expected podAntiAffinity", testCase{expectPodAntiAffinity: true}), - Entry("when replicas are set and zero, no podAntiAffinity is set", testCase{replicasCount: ptr.To(int32(0)), expectPodAntiAffinity: false}), - Entry("when replicas are set and non-zero, no podAntiAffinity is set", testCase{replicasCount: ptr.To(int32(1)), expectPodAntiAffinity: false}), - ) - }) - - Context("switch between replicas counts", func() { - It("should reset podAntiAffinity and strategy - transition from autodetect to explicit replicas count", func(ctx context.Context) { - nrs := testobjs.NewNUMAResourcesScheduler("numaresourcesscheduler", "some/url:latest", testSchedulerName, 11*time.Second) - nrs.Spec.Replicas = nil // autodetect - initObjects := []runtime.Object{nrs} - initObjects = append(initObjects, fakeNodes(3, 0)...) - reconciler, err := NewFakeNUMAResourcesSchedulerReconciler(initObjects...) - Expect(err).ToNot(HaveOccurred()) - - key := client.ObjectKeyFromObject(nrs) - _, err = reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: key}) - Expect(err).ToNot(HaveOccurred()) - - dp := &appsv1.Deployment{} - dpKey := client.ObjectKey{Namespace: testNamespace, Name: "secondary-scheduler"} - - Expect(reconciler.Client.Get(ctx, dpKey, dp)).To(Succeed()) - affinity := dp.Spec.Template.Spec.Affinity - Expect(affinity).ToNot(BeNil()) - Expect(affinity.PodAntiAffinity).To(Equal(expectedPodAntiAff)) - Expect(dp.Spec.Strategy).To(Equal(expectedStrategy)) - - // reconcile with non-nil replicas count - Eventually(func(g Gomega) { - g.Expect(reconciler.Client.Get(ctx, key, nrs)).ToNot(HaveOccurred()) - nrs.Spec.Replicas = ptr.To(int32(2)) - g.Expect(reconciler.Client.Update(ctx, nrs)).ToNot(HaveOccurred()) - }).WithPolling(5 * time.Second).WithTimeout(30 * time.Second).Should(Succeed()) - - _, err = reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: key}) - Expect(err).ToNot(HaveOccurred()) - - Expect(reconciler.Client.Get(ctx, dpKey, dp)).To(Succeed()) - affinity = dp.Spec.Template.Spec.Affinity - Expect(affinity).ToNot(BeNil()) - Expect(affinity.PodAntiAffinity).To(BeNil()) - Expect(dp.Spec.Strategy).To(Equal(appsv1.DeploymentStrategy{})) - - // reconcile with a different non-nil replicas count should keep the same result - Eventually(func(g Gomega) { - g.Expect(reconciler.Client.Get(ctx, key, nrs)).ToNot(HaveOccurred()) - nrs.Spec.Replicas = ptr.To(int32(0)) - g.Expect(reconciler.Client.Update(ctx, nrs)).ToNot(HaveOccurred()) - }).WithPolling(5 * time.Second).WithTimeout(30 * time.Second).Should(Succeed()) - - _, err = reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: key}) - Expect(err).ToNot(HaveOccurred()) - - Expect(reconciler.Client.Get(ctx, dpKey, dp)).To(Succeed()) - affinity = dp.Spec.Template.Spec.Affinity - Expect(affinity).ToNot(BeNil()) - Expect(affinity.PodAntiAffinity).To(BeNil()) - Expect(dp.Spec.Strategy).To(Equal(appsv1.DeploymentStrategy{})) - - }) - - It("should reset podAntiAffinity and strategy - transition from explicit replicas count to autodetect", func(ctx context.Context) { - nrs := testobjs.NewNUMAResourcesScheduler("numaresourcesscheduler", "some/url:latest", testSchedulerName, 11*time.Second) - nrs.Spec.Replicas = ptr.To(int32(3)) - initObjects := []runtime.Object{nrs} - initObjects = append(initObjects, fakeNodes(3, 0)...) - reconciler, err := NewFakeNUMAResourcesSchedulerReconciler(initObjects...) - Expect(err).ToNot(HaveOccurred()) - - key := client.ObjectKeyFromObject(nrs) - _, err = reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: key}) - Expect(err).ToNot(HaveOccurred()) - - dp := &appsv1.Deployment{} - dpKey := client.ObjectKey{Namespace: testNamespace, Name: "secondary-scheduler"} - Expect(reconciler.Client.Get(ctx, dpKey, dp)).To(Succeed()) - affinity := dp.Spec.Template.Spec.Affinity - Expect(affinity).ToNot(BeNil()) - Expect(affinity.PodAntiAffinity).To(BeNil()) - Expect(dp.Spec.Strategy).To(Equal(appsv1.DeploymentStrategy{})) - - Eventually(func(g Gomega) { - g.Expect(reconciler.Client.Get(ctx, key, nrs)).ToNot(HaveOccurred()) - nrs.Spec.Replicas = nil // autodetect - g.Expect(reconciler.Client.Update(ctx, nrs)).ToNot(HaveOccurred()) - }).WithPolling(5 * time.Second).WithTimeout(30 * time.Second).Should(Succeed()) - - _, err = reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: key}) - Expect(err).ToNot(HaveOccurred()) - - Expect(reconciler.Client.Get(ctx, dpKey, dp)).To(Succeed()) - affinity = dp.Spec.Template.Spec.Affinity - Expect(affinity).ToNot(BeNil()) - Expect(affinity.PodAntiAffinity).To(Equal(expectedPodAntiAff)) - Expect(dp.Spec.Strategy).To(Equal(expectedStrategy)) - }) + Expect(dp.Spec.Template.Spec.TopologySpreadConstraints[0]).To(Equal(expectedConstraint)) }) }) }) @@ -1293,36 +1127,157 @@ var _ = Describe("Test computeSchedulerReplicas", func() { Entry("HyperShift: 3 control-plane + 5 worker should return 3 replicas (uses worker)", platform.HyperShift, 3, 5, ptr.To(int32(3)), false), ) - Context("when replicas are explicitly set", func() { - It("should return the explicitly set replicas without checking nodes", func() { - // setup reconciler with HyperShift platform (uses worker nodes) - reconciler.PlatformInfo = platforminfo.PlatformInfo{ - Platform: platform.HyperShift, - Version: "v4.14.0", - } + When("replicas count is explicitly set", func() { + type testCase struct { + platform platform.Platform + numControlPlane int + numWorker int + explicitReplicas *int32 + expectedReplicas *int32 + expectError string + } + DescribeTable( + "should not allow more replicas than the number of nodes", func(ctx context.Context, tc testCase) { + nrs := testobjs.NewNUMAResourcesScheduler("numaresourcesscheduler", "some/url:latest", testSchedulerName, 11*time.Second) + nrs.Spec.Replicas = tc.explicitReplicas + initObjects := []runtime.Object{nrs} + initObjects = append(initObjects, fakeNodes(tc.numControlPlane, tc.numWorker)...) + reconciler, err := NewFakeNUMAResourcesSchedulerReconciler(initObjects...) + Expect(err).ToNot(HaveOccurred()) + reconciler.PlatformInfo = platforminfo.PlatformInfo{ + Platform: tc.platform, + Version: "v4.14.0", + } - // create scenario with no worker nodes but explicit replicas - nodes := fakeNodes(3, 0) // only control-plane nodes - reconciler, err := NewFakeNUMAResourcesSchedulerReconciler(nodes...) + key := client.ObjectKeyFromObject(nrs) + _, err = reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: key}) + if tc.expectError == "" { + Expect(err).ToNot(HaveOccurred()) + } else { + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring(tc.expectError)) + } + + Expect(reconciler.Client.Get(ctx, key, nrs)).To(Succeed()) + + if tc.expectError != "" { + degradedCondition := getConditionByType(nrs.Status.Conditions, status.ConditionDegraded) + Expect(degradedCondition.Status).To(Equal(metav1.ConditionTrue)) + Expect(degradedCondition.Reason).To(Equal(status.ReasonInternalError)) + Expect(degradedCondition.Message).To(ContainSubstring(tc.expectError)) + return + } + + dp := &appsv1.Deployment{} + dpKey := client.ObjectKey{Namespace: nrs.Status.Deployment.Namespace, Name: nrs.Status.Deployment.Name} + Expect(reconciler.Client.Get(ctx, dpKey, dp)).To(Succeed()) + Expect(*dp.Spec.Replicas).To(Equal(*tc.expectedReplicas)) + }, + // openshift scenarios + Entry("OpenShift: 3 control-plane with explicit replicas count equal to the number of nodes", + testCase{ + platform: platform.OpenShift, + numControlPlane: 3, + numWorker: 5, + explicitReplicas: ptr.To(int32(3)), + expectedReplicas: ptr.To(int32(3)), + }), + Entry("OpenShift: 3 control-plane with explicit replicas count less than the number of nodes", + testCase{ + platform: platform.OpenShift, + numControlPlane: 3, + numWorker: 0, + explicitReplicas: ptr.To(int32(2)), + expectedReplicas: ptr.To(int32(2)), + }), + Entry("OpenShift: 3 control-plane with explicit replicas count greater than the number of nodes", + testCase{ + platform: platform.OpenShift, + numControlPlane: 3, + numWorker: 5, + explicitReplicas: ptr.To(int32(5)), + expectError: "explicitly set replicas count should not be greater than the number of the nodes with role", + }), + + // hypershift scenarios + Entry("HyperShift: 3 control-plane with explicit replicas count equal to the number of nodes", + testCase{ + platform: platform.HyperShift, + numControlPlane: 0, + numWorker: 3, + explicitReplicas: ptr.To(int32(3)), + expectedReplicas: ptr.To(int32(3)), + }), + Entry("HyperShift: 3 control-plane with explicit replicas count less than the number of nodes", + testCase{ + platform: platform.HyperShift, + numControlPlane: 0, + numWorker: 3, + explicitReplicas: ptr.To(int32(2)), + expectedReplicas: ptr.To(int32(2)), + }), + Entry("HyperShift: 3 control-plane with explicit replicas count greater than the number of nodes", + testCase{ + platform: platform.HyperShift, + numControlPlane: 0, + numWorker: 5, + explicitReplicas: ptr.To(int32(6)), + expectError: "explicitly set replicas count should not be greater than the number of the nodes with role", + }), + ) + + It("should degrade when nodes are scaled down with higher number of explicit replicas", func(ctx context.Context) { + nrs := testobjs.NewNUMAResourcesScheduler("numaresourcesscheduler", "some/url:latest", testSchedulerName, 11*time.Second) + initialReplicas := int32(3) + nrs.Spec.Replicas = ptr.To(initialReplicas) + initObjects := []runtime.Object{nrs} + initialNodes := fakeNodes(4, 4) + initObjects = append(initObjects, initialNodes...) + reconciler, err := NewFakeNUMAResourcesSchedulerReconciler(initObjects...) Expect(err).ToNot(HaveOccurred()) reconciler.PlatformInfo = platforminfo.PlatformInfo{ - Platform: platform.HyperShift, + Platform: platform.OpenShift, Version: "v4.14.0", } - // create test scheduler spec with explicit replicas - explicitReplicas := int32(5) - schedSpec := nropv1.NUMAResourcesSchedulerSpec{ - Replicas: &explicitReplicas, - } + key := client.ObjectKeyFromObject(nrs) + _, err = reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: key}) + Expect(err).ToNot(HaveOccurred()) - // call the function under test - result, err := reconciler.computeSchedulerReplicas(context.TODO(), schedSpec) + Expect(reconciler.Client.Get(ctx, key, nrs)).To(Succeed()) + dp := &appsv1.Deployment{} + dpKey := client.ObjectKey{Namespace: nrs.Status.Deployment.Namespace, Name: nrs.Status.Deployment.Name} + Expect(reconciler.Client.Get(ctx, dpKey, dp)).To(Succeed()) + Expect(*dp.Spec.Replicas).To(Equal(initialReplicas)) - // should return the explicit value without error + // scale down the nodes by one but keep it same as the explicit replicas count + Eventually(func(g Gomega) { + g.Expect(reconciler.Client.Delete(ctx, initialNodes[0].(*corev1.Node))).To(Succeed()) + }).WithPolling(5 * time.Second).WithTimeout(30 * time.Second).Should(Succeed()) + + _, err = reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: key}) Expect(err).ToNot(HaveOccurred()) - Expect(result).ToNot(BeNil()) - Expect(*result).To(Equal(explicitReplicas)) + + Expect(reconciler.Client.Get(ctx, key, nrs)).To(Succeed()) + Expect(reconciler.Client.Get(ctx, dpKey, dp)).To(Succeed()) + Expect(*dp.Spec.Replicas).To(Equal(initialReplicas)) + + // scale down the nodes by one but keep it same as the explicit replicas count + Eventually(func(g Gomega) { + g.Expect(reconciler.Client.Delete(ctx, initialNodes[1].(*corev1.Node))).To(Succeed()) + }).WithPolling(5 * time.Second).WithTimeout(30 * time.Second).Should(Succeed()) + + errMessage := "explicitly set replicas count should not be greater than the number of the nodes with role" + _, err = reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: key}) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring(errMessage)) + + Expect(reconciler.Client.Get(ctx, key, nrs)).To(Succeed()) + + degradedCondition := getConditionByType(nrs.Status.Conditions, status.ConditionDegraded) + Expect(degradedCondition.Status).To(Equal(metav1.ConditionTrue)) + Expect(degradedCondition.Reason).To(Equal(status.ReasonInternalError)) + Expect(degradedCondition.Message).To(ContainSubstring(errMessage)) }) }) }) diff --git a/internal/wait/deployment.go b/internal/wait/deployment.go index 210862e99c..d232257cbf 100644 --- a/internal/wait/deployment.go +++ b/internal/wait/deployment.go @@ -24,11 +24,14 @@ import ( "k8s.io/klog/v2" ) +// ForDeploymentComplete waits for the deployment to be complete with the number of replicas specified in the deployment spec. func (wt Waiter) ForDeploymentComplete(ctx context.Context, dp *appsv1.Deployment) (*appsv1.Deployment, error) { - // This function waits for the readiness of the pods under the deployment. The best use of this check is for - // completely new deployments. If the deployment exists on the cluster and simply updated, this check is - // not enough to guarantee that the deployment is ready with the NEW replica, thus need to cover that by - // additional checks as the context requires + return wt.ForDeploymentCompleteWithReplicas(ctx, dp, *(dp.Spec.Replicas)) +} + +// ForDeploymentCompleteWithReplicas waits for the deployment to be complete and have the specified number of replicas. +// Use this function when the deployment is expected to have new number of replicas after an update. +func (wt Waiter) ForDeploymentCompleteWithReplicas(ctx context.Context, dp *appsv1.Deployment, newExpectedReplicas int32) (*appsv1.Deployment, error) { key := ObjectKeyFromObject(dp) updatedDp := &appsv1.Deployment{} immediate := true @@ -39,7 +42,7 @@ func (wt Waiter) ForDeploymentComplete(ctx context.Context, dp *appsv1.Deploymen return false, err } - if !IsDeploymentComplete(dp, &updatedDp.Status) { + if !IsDeploymentComplete(dp.Generation, &updatedDp.Status, newExpectedReplicas) { klog.Warningf("deployment %s not yet complete", key.String()) return false, nil } @@ -51,14 +54,28 @@ func (wt Waiter) ForDeploymentComplete(ctx context.Context, dp *appsv1.Deploymen } func areDeploymentReplicasAvailable(newStatus *appsv1.DeploymentStatus, replicas int32) bool { - return newStatus.UpdatedReplicas == replicas && - newStatus.Replicas == replicas && - newStatus.AvailableReplicas == replicas + if newStatus.Replicas != replicas { + klog.InfoS("newStatus.Replicas", "expected", replicas, "found", newStatus.Replicas) + return false + } + if newStatus.UpdatedReplicas != replicas { + klog.InfoS("newStatus.UpdatedReplicas", "expected", replicas, "found", newStatus.UpdatedReplicas) + return false + } + if newStatus.AvailableReplicas != replicas { + klog.InfoS("newStatus.AvailableReplicas", "expected", replicas, "found", newStatus.AvailableReplicas) + return false + } + return true } -func IsDeploymentComplete(dp *appsv1.Deployment, newStatus *appsv1.DeploymentStatus) bool { - return areDeploymentReplicasAvailable(newStatus, *(dp.Spec.Replicas)) && - newStatus.ObservedGeneration >= dp.Generation +func IsDeploymentComplete(oldGeneration int64, newStatus *appsv1.DeploymentStatus, expectedReplicas int32) bool { + if newStatus.ObservedGeneration < oldGeneration { + klog.InfoS("generation is older than expected to indicate the deployment is complete", "old", oldGeneration, "new", newStatus.ObservedGeneration) + return false + } + + return areDeploymentReplicasAvailable(newStatus, expectedReplicas) } func (wt Waiter) ForDeploymentReplicasCreation(ctx context.Context, dp *appsv1.Deployment, expectedReplicas int32) (*appsv1.Deployment, error) { diff --git a/pkg/objectupdate/sched/sched.go b/pkg/objectupdate/sched/sched.go index ff3c2fec64..388e168739 100644 --- a/pkg/objectupdate/sched/sched.go +++ b/pkg/objectupdate/sched/sched.go @@ -17,16 +17,13 @@ package sched import ( - "errors" "fmt" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/klog/v2" - "k8s.io/utils/ptr" "github.com/k8stopologyawareschedwg/deployer/pkg/flagcodec" k8swgmanifests "github.com/k8stopologyawareschedwg/deployer/pkg/manifests" @@ -106,52 +103,30 @@ func DeploymentTLSSettings(dp *appsv1.Deployment, tlsSettings objtls.Settings) e return nil } -// DeploymentAffinityWithStrategy configures required pod anti-affinity on the scheduler -// deployment only when the CR leaves replica count to autodetection (Replicas unset). -// An explicit non-nil Replicas value means the user chose a replica count and keeps -// full control (no required pod anti-affinity). -func DeploymentAffinityWithStrategy(dp *appsv1.Deployment, spec nropv1.NUMAResourcesSchedulerSpec) error { - if spec.Replicas != nil { - if dp.Spec.Template.Spec.Affinity != nil { - dp.Spec.Template.Spec.Affinity.PodAntiAffinity = nil - } - dp.Spec.Strategy = appsv1.DeploymentStrategy{} - return nil - } - +func DeploymentTopologySpreadConstraints(dp *appsv1.Deployment) error { labels := dp.Spec.Template.Labels if len(labels) == 0 { - return errors.New("no labels provided for PodAntiAffinity") - } - - if dp.Spec.Template.Spec.Affinity == nil { - dp.Spec.Template.Spec.Affinity = &corev1.Affinity{} + return fmt.Errorf("no labels found in deployment template") } - dp.Spec.Template.Spec.Affinity.PodAntiAffinity = &corev1.PodAntiAffinity{ - RequiredDuringSchedulingIgnoredDuringExecution: []corev1.PodAffinityTerm{ - { - LabelSelector: &metav1.LabelSelector{ - MatchLabels: labels, - }, - TopologyKey: "kubernetes.io/hostname", - }, + schedConstr := corev1.TopologySpreadConstraint{ + LabelSelector: &metav1.LabelSelector{ + MatchLabels: labels, }, - } - klog.V(3).InfoS("Scheduler Deployment affinity", "podAntiAffinity", dp.Spec.Template.Spec.Affinity.PodAntiAffinity.String()) - // NOTE: With replicas=1 and no PodAntiAffinity, the default strategy (surge 1, then remove old) - // works as expected — the new pod can schedule anywhere, no constraints block it. - dp.Spec.Strategy = appsv1.DeploymentStrategy{ - Type: appsv1.RollingUpdateDeploymentStrategyType, - RollingUpdate: &appsv1.RollingUpdateDeployment{ - MaxUnavailable: ptr.To(intstr.FromInt(1)), - MaxSurge: ptr.To(intstr.FromInt(0)), - }, - } - klog.V(3).InfoS("Scheduler Deployment Rollout Strategy", "rolloutStrategy", dp.Spec.Strategy.String()) + MaxSkew: 1, + TopologyKey: "kubernetes.io/hostname", + WhenUnsatisfiable: corev1.DoNotSchedule, + // the below label is set by the deployment controller and is needed for safe rollouts to allow ignoring + // the old replicaset and calculates the spread purely based on the new replicaset, ensuring the final + // state is perfectly balanced without stalling: + // https://kubernetes.io/docs/concepts/workloads/controllers/deployment/#pod-template-hash-label + MatchLabelKeys: []string{"pod-template-hash"}, + } + + dp.Spec.Template.Spec.TopologySpreadConstraints = []corev1.TopologySpreadConstraint{schedConstr} + klog.V(3).InfoS("scheduler deployment topology spread constraints", "constraints", schedConstr.String()) return nil } - func SchedulerConfig(cm *corev1.ConfigMap, name string, params *k8swgmanifests.ConfigParams) error { if cm.Data == nil { return fmt.Errorf("no data found in ConfigMap: %s/%s", cm.Namespace, cm.Name) diff --git a/pkg/objectupdate/sched/sched_test.go b/pkg/objectupdate/sched/sched_test.go index 7446fb45fd..a2e3a678c6 100644 --- a/pkg/objectupdate/sched/sched_test.go +++ b/pkg/objectupdate/sched/sched_test.go @@ -30,8 +30,6 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/intstr" - "k8s.io/utils/ptr" k8swgmanifests "github.com/k8stopologyawareschedwg/deployer/pkg/manifests" @@ -635,122 +633,40 @@ func TestDeploymentTLSSettingsRepeated(t *testing.T) { } } -func TestDeploymentAffinityWithStrategyNoLabels(t *testing.T) { +func TestDeploymentTopologySpreadConstraintsNoLabels(t *testing.T) { dp := dpMinimal.DeepCopy() - err := DeploymentAffinityWithStrategy(dp, nropv1.NUMAResourcesSchedulerSpec{}) - if err == nil { - t.Fatalf("expected error but received nil") + dp.Spec.Template.Labels = nil + if err := DeploymentTopologySpreadConstraints(dp); err == nil { + t.Fatalf("expected error but got nil") } -} -func TestDeploymentAffinityWithStrategyExplicitNonNilReplicasCount(t *testing.T) { - testcases := []struct { - name string - nonNilReplicasCount *int32 - }{ - { - name: "replicas count is positive int", - nonNilReplicasCount: ptr.To(int32(2)), - }, - { - name: "replicas count is zero", - nonNilReplicasCount: ptr.To(int32(0)), - }, - } - for _, tc := range testcases { - t.Run(tc.name, func(t *testing.T) { - dp := dpMinimal.DeepCopy() - dp.Spec.Template.ObjectMeta.Labels = map[string]string{"app": "scheduler"} - initialNodeAffinity := &corev1.NodeAffinity{ - RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{ - NodeSelectorTerms: []corev1.NodeSelectorTerm{ - { - MatchExpressions: []corev1.NodeSelectorRequirement{ - { - Key: "node-role.kubernetes.io/worker", - Operator: corev1.NodeSelectorOpExists, - Values: []string{""}, - }, - }, - }, - }, - }, - } - dp.Spec.Template.Spec.Affinity = &corev1.Affinity{ - NodeAffinity: initialNodeAffinity, - PodAntiAffinity: &corev1.PodAntiAffinity{ - RequiredDuringSchedulingIgnoredDuringExecution: []corev1.PodAffinityTerm{ - { - LabelSelector: &metav1.LabelSelector{ - MatchLabels: map[string]string{"app": "scheduler"}, - }, - }, - }, - }, - } - err := DeploymentAffinityWithStrategy(dp, nropv1.NUMAResourcesSchedulerSpec{Replicas: tc.nonNilReplicasCount}) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - // check that the node affinity is preserved - if diff := cmp.Diff(initialNodeAffinity, dp.Spec.Template.Spec.Affinity.NodeAffinity); diff != "" { - t.Errorf("should preserve existing NodeAffinity: affinity mismatch (-expected +got):\n%s", diff) - } - if dp.Spec.Template.Spec.Affinity.PodAntiAffinity != nil { - t.Fatalf("expected podAntiAffinity to be reset but got %v", dp.Spec.Template.Spec.Affinity.PodAntiAffinity) - } - if dp.Spec.Strategy != (appsv1.DeploymentStrategy{}) { - t.Fatalf("expected strategy to be reset but got %v", dp.Spec.Strategy) - } - }) + dp.Spec.Template.Labels = map[string]string{} + if err := DeploymentTopologySpreadConstraints(dp); err == nil { + t.Fatalf("expected error but got nil") } } -func TestDeploymentAffinityWithStrategyWithOverride(t *testing.T) { - labels := map[string]string{"app": "scheduler"} - expectedStrategy := appsv1.DeploymentStrategy{ - Type: appsv1.RollingUpdateDeploymentStrategyType, - RollingUpdate: &appsv1.RollingUpdateDeployment{ - MaxUnavailable: ptr.To(intstr.FromInt(1)), - MaxSurge: ptr.To(intstr.FromInt(0)), - }, - } - expectedPodAntiAffinity := &corev1.PodAntiAffinity{ - RequiredDuringSchedulingIgnoredDuringExecution: []corev1.PodAffinityTerm{ - { - LabelSelector: &metav1.LabelSelector{ - MatchLabels: labels, - }, - TopologyKey: "kubernetes.io/hostname", - }, - }, - } +func TestDeploymentTopologySpreadConstraints(t *testing.T) { dp := dpMinimal.DeepCopy() - dp.Spec.Template.ObjectMeta.Labels = labels - - err := DeploymentAffinityWithStrategy(dp, nropv1.NUMAResourcesSchedulerSpec{}) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - - if diff := cmp.Diff(expectedPodAntiAffinity, dp.Spec.Template.Spec.Affinity.PodAntiAffinity); diff != "" { - t.Errorf("affinity mismatch (-expected +got):\n%s", diff) - } - if diff := cmp.Diff(expectedStrategy, dp.Spec.Strategy); diff != "" { - t.Errorf("strategy mismatch (-expected +got):\n%s", diff) + dp.Spec.Template.Labels = map[string]string{ + "app": "numaresources-scheduler", } - - // check reset works - err = DeploymentAffinityWithStrategy(dp, nropv1.NUMAResourcesSchedulerSpec{Replicas: ptr.To(int32(2))}) - if err != nil { + if err := DeploymentTopologySpreadConstraints(dp); err != nil { t.Fatalf("unexpected error: %v", err) } - - if dp.Spec.Template.Spec.Affinity != nil && dp.Spec.Template.Spec.Affinity.PodAntiAffinity != nil { - t.Fatalf("Override failed: expected no podAntiAffinity but got %v", dp.Spec.Template.Spec.Affinity.PodAntiAffinity) + expectedConstraints := []corev1.TopologySpreadConstraint{ + { + LabelSelector: &metav1.LabelSelector{ + MatchLabels: dp.Spec.Template.Labels, + }, + MaxSkew: 1, + TopologyKey: "kubernetes.io/hostname", + WhenUnsatisfiable: corev1.DoNotSchedule, + MatchLabelKeys: []string{"pod-template-hash"}, + }, } - if dp.Spec.Strategy != (appsv1.DeploymentStrategy{}) { - t.Fatalf("expected strategy to be reset but got %v", dp.Spec.Strategy) + if diff := cmp.Diff(dp.Spec.Template.Spec.TopologySpreadConstraints, expectedConstraints); diff != "" { + t.Errorf("constraints mismatch\n%s", diff) } } diff --git a/test/e2e/sched/sched_test.go b/test/e2e/sched/sched_test.go index c5c4a61ed7..914e45b989 100644 --- a/test/e2e/sched/sched_test.go +++ b/test/e2e/sched/sched_test.go @@ -26,10 +26,15 @@ import ( appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" + klog "k8s.io/klog/v2" + "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/client" + "github.com/k8stopologyawareschedwg/deployer/pkg/deployer/platform" "github.com/k8stopologyawareschedwg/deployer/pkg/manifests" nropv1 "github.com/openshift-kni/numaresources-operator/api/v1" @@ -37,6 +42,8 @@ import ( "github.com/openshift-kni/numaresources-operator/internal/wait" schedstate "github.com/openshift-kni/numaresources-operator/pkg/numaresourcesscheduler/objectstate/sched" e2eclient "github.com/openshift-kni/numaresources-operator/test/internal/clients" + "github.com/openshift-kni/numaresources-operator/test/internal/configuration" + e2efixture "github.com/openshift-kni/numaresources-operator/test/internal/fixture" e2eimages "github.com/openshift-kni/numaresources-operator/test/internal/images" "github.com/openshift-kni/numaresources-operator/test/internal/objects" @@ -44,7 +51,7 @@ import ( . "github.com/onsi/gomega" ) -var _ = Describe("[Scheduler] imageReplacement", func() { +var _ = Describe("[Scheduler] scheduler object updates", func() { var initialized bool nroSchedObj := &nropv1.NUMAResourcesScheduler{} BeforeEach(func() { @@ -54,6 +61,7 @@ var _ = Describe("[Scheduler] imageReplacement", func() { initialized = true nroSchedKey := objects.NROSchedObjectKey() Expect(e2eclient.Client.Get(context.TODO(), nroSchedKey, nroSchedObj)).ToNot(HaveOccurred(), "cannot get %q in the cluster", nroSchedKey.String()) + initialNroSchedObj := nroSchedObj.DeepCopy() DeferCleanup(func() { Eventually(func() error { @@ -62,14 +70,14 @@ var _ = Describe("[Scheduler] imageReplacement", func() { return err } - nroSchedObj.Spec = objects.TestNROScheduler().Spec + nroSchedObj.Spec = initialNroSchedObj.Spec return e2eclient.Client.Update(context.TODO(), nroSchedObj) }).Should(Succeed(), "failed to revert changes to %q during cleanup", nroSchedKey) dp, err := podlist.With(e2eclient.Client).DeploymentByOwnerReference(context.TODO(), nroSchedObj.UID) Expect(err).ToNot(HaveOccurred(), "unable to get deployment by owner reference") - _, err = wait.With(e2eclient.Client).Timeout(5*time.Minute).Interval(10*time.Second).ForDeploymentComplete(context.TODO(), dp) + _, err = wait.With(e2eclient.Client).Timeout(5*time.Minute).Interval(10*time.Second).ForDeploymentCompleteWithReplicas(context.TODO(), dp, *dp.Spec.Replicas) Expect(err).ToNot(HaveOccurred()) }) }) @@ -124,7 +132,7 @@ var _ = Describe("[Scheduler] imageReplacement", func() { dp, err := podlist.With(e2eclient.Client).DeploymentByOwnerReference(context.TODO(), nroSchedObj.GetUID()) Expect(err).ToNot(HaveOccurred()) - _, err = wait.With(e2eclient.Client).Interval(30*time.Second).Timeout(2*time.Minute).ForDeploymentComplete(context.TODO(), dp) + _, err = wait.With(e2eclient.Client).Interval(30*time.Second).Timeout(2*time.Minute).ForDeploymentCompleteWithReplicas(context.TODO(), dp, *dp.Spec.Replicas) Expect(err).ToNot(HaveOccurred()) }) @@ -262,7 +270,7 @@ var _ = Describe("[Scheduler] imageReplacement", func() { dp, err := podlist.With(e2eclient.Client).DeploymentByOwnerReference(context.TODO(), nroSchedObj.UID) Expect(err).ToNot(HaveOccurred(), "unable to get deployment by owner reference") - dp, err = wait.With(e2eclient.Client).Timeout(5*time.Minute).Interval(10*time.Second).ForDeploymentComplete(context.TODO(), dp) + dp, err = wait.With(e2eclient.Client).Timeout(5*time.Minute).Interval(10*time.Second).ForDeploymentCompleteWithReplicas(context.TODO(), dp, *dp.Spec.Replicas) Expect(err).ToNot(HaveOccurred()) podList, err = podlist.With(e2eclient.Client).ByDeployment(context.TODO(), *dp) @@ -273,4 +281,384 @@ var _ = Describe("[Scheduler] imageReplacement", func() { } }) }) + + When("testing deployment TopologySpreadConstraints", func() { + + var ( + nroSchedKey client.ObjectKey + autoDetectedReplicasCount *int32 + schedDp *appsv1.Deployment + ) + + BeforeEach(func(ctx context.Context) { + nroSchedKey = client.ObjectKeyFromObject(nroSchedObj) + Expect(e2eclient.Client.Get(ctx, nroSchedKey, nroSchedObj)).To(Succeed()) + currentReplicas := nroSchedObj.Spec.Replicas + var err error + schedDp, err = podlist.With(e2eclient.Client).DeploymentByOwnerReference(ctx, nroSchedObj.UID) + Expect(err).ToNot(HaveOccurred()) + + if currentReplicas != nil { + By("compute expected autodetected replicas count") + var labelSelector map[string]string + switch configuration.Plat { + case platform.HyperShift: + labelSelector = map[string]string{ + "node-role.kubernetes.io/worker": "", + } + case platform.OpenShift: + labelSelector = map[string]string{ + "node-role.kubernetes.io/control-plane": "", + } + default: + Fail("unknown platform") + } + + nodeList := &corev1.NodeList{} + Expect(e2eclient.Client.List(ctx, nodeList, &client.ListOptions{LabelSelector: labels.SelectorFromSet(labelSelector)})).To(Succeed()) + Expect(nodeList.Items).ToNot(BeEmpty(), "no nodes found with control-plane or worker role") + expectedAutodetectedCount := int32(len(nodeList.Items)) + if expectedAutodetectedCount > 3 { + // this is how it is capped in the controller + expectedAutodetectedCount = 3 + } + + e2efixture.By("configure NRS replicas for autodetection: current=%d desired=nil", *currentReplicas) + Eventually(func(g Gomega) { + g.Expect(e2eclient.Client.Get(ctx, nroSchedKey, nroSchedObj)).To(Succeed()) + nroSchedObj.Spec.Replicas = nil + g.Expect(e2eclient.Client.Update(ctx, nroSchedObj)).To(Succeed()) + }).WithTimeout(5*time.Minute).WithPolling(10*time.Second).Should(Succeed(), "failed to update NRS with autodetection mode for replicas") + + schedDp, err = wait.With(e2eclient.Client).Timeout(5*time.Minute).Interval(10*time.Second).ForDeploymentCompleteWithReplicas(ctx, schedDp, expectedAutodetectedCount) + if err != nil { + infoUponDeploymentCompleteFailure(ctx, schedDp) + } + Expect(err).ToNot(HaveOccurred()) + + DeferCleanup(func(ctx context.Context) { + e2efixture.By("revert NRS replicas to %d", *currentReplicas) + Eventually(func(g Gomega) { + g.Expect(e2eclient.Client.Get(ctx, nroSchedKey, nroSchedObj)).To(Succeed()) + nroSchedObj.Spec.Replicas = currentReplicas + g.Expect(e2eclient.Client.Update(ctx, nroSchedObj)).To(Succeed()) + }).WithTimeout(5*time.Minute).WithPolling(10*time.Second).Should(Succeed(), "failed to update NRS with autodetection mode for replicas") + schedDp, err := wait.With(e2eclient.Client).Timeout(5*time.Minute).Interval(10*time.Second).ForDeploymentCompleteWithReplicas(ctx, schedDp, *currentReplicas) + if err != nil { + infoUponDeploymentCompleteFailure(ctx, schedDp) + } + Expect(err).ToNot(HaveOccurred()) + }) + } + + expectedTopologySpreadConstraints := []corev1.TopologySpreadConstraint{ + { + MaxSkew: 1, + TopologyKey: "kubernetes.io/hostname", + WhenUnsatisfiable: corev1.DoNotSchedule, + MatchLabelKeys: []string{"pod-template-hash"}, + LabelSelector: &metav1.LabelSelector{ + MatchLabels: schedDp.Spec.Template.Labels, + }, + }, + } + + By("verify spread constraints are set by default") + Expect(schedDp.Spec.Template.Spec.TopologySpreadConstraints).To(Equal(expectedTopologySpreadConstraints), "topology spread constraints mismatch") + + autoDetectedReplicasCount = schedDp.Spec.Replicas + Expect(autoDetectedReplicasCount).ToNot(BeNil()) // must never happen + }) + + It("should have each replica running on a different node", func(ctx context.Context) { + pods, err := podlist.With(e2eclient.Client).ByDeployment(ctx, *schedDp) + Expect(err).NotTo(HaveOccurred()) + Expect(pods).To(HaveLen(int(*autoDetectedReplicasCount))) + + nodeNames := sets.New[string]() + for _, pod := range pods { + Expect(pod.Status.Phase).To(Equal(corev1.PodRunning)) + nodeNames.Insert(pod.Spec.NodeName) + } + Expect(nodeNames).To(HaveLen(len(pods)), "expected each replica on a different node") + }) + + It("should allow updating replicas to 0 and properly switch back to autodetection mode", func(ctx context.Context) { + By("update NRS to use explicit 0 replicas") + Eventually(func(g Gomega) { + g.Expect(e2eclient.Client.Get(ctx, nroSchedKey, nroSchedObj)).To(Succeed()) + nroSchedObj.Spec.Replicas = ptr.To[int32](0) + g.Expect(e2eclient.Client.Update(ctx, nroSchedObj)).To(Succeed()) + }).WithTimeout(5*time.Minute).WithPolling(10*time.Second).Should(Succeed(), "failed to update NRS with explicit replicas count") + + schedDp, err := wait.With(e2eclient.Client).Timeout(10*time.Minute).Interval(10*time.Second).ForDeploymentCompleteWithReplicas(ctx, schedDp, 0) + if err != nil { + infoUponDeploymentCompleteFailure(ctx, schedDp) + } + Expect(err).ToNot(HaveOccurred()) + + By("verify the running pods are scaled down to 0") + Eventually(func(g Gomega) { + updatedPodListExplicit, err := podlist.With(e2eclient.Client).ByDeployment(ctx, *schedDp) + g.Expect(err).NotTo(HaveOccurred()) + runningPods := make([]*corev1.Pod, 0, len(updatedPodListExplicit)) + for _, pod := range updatedPodListExplicit { + if pod.Status.Phase == corev1.PodRunning && pod.DeletionTimestamp == nil { + runningPods = append(runningPods, &pod) + } + } + g.Expect(runningPods).To(BeEmpty(), "expected 0 running pods, got %d", len(runningPods)) + }).WithTimeout(5*time.Minute).WithPolling(10*time.Second).Should(Succeed(), "failed to have the expected number of pods running") + + By("switch back to autodetection mode and verify pods are running") + Eventually(func(g Gomega) { + g.Expect(e2eclient.Client.Get(ctx, nroSchedKey, nroSchedObj)).To(Succeed()) + nroSchedObj.Spec.Replicas = nil + g.Expect(e2eclient.Client.Update(ctx, nroSchedObj)).To(Succeed()) + }).WithTimeout(5*time.Minute).WithPolling(10*time.Second).Should(Succeed(), "failed to update NRS with autodetection mode for replicas") + + schedDp, err = wait.With(e2eclient.Client).Timeout(10*time.Minute).Interval(10*time.Second).ForDeploymentCompleteWithReplicas(ctx, schedDp, *autoDetectedReplicasCount) + if err != nil { + infoUponDeploymentCompleteFailure(ctx, schedDp) + } + Expect(err).ToNot(HaveOccurred()) + + By("verifying new pods are created and running") + Eventually(func(g Gomega) { + updatedPodListAutodetection, err := podlist.With(e2eclient.Client).ByDeployment(ctx, *schedDp) + g.Expect(err).NotTo(HaveOccurred()) + for _, pod := range updatedPodListAutodetection { + g.Expect(pod.Status.Phase).To(Equal(corev1.PodRunning)) + } + g.Expect(updatedPodListAutodetection).To(HaveLen(int(*autoDetectedReplicasCount)), "expected %d new pods, got %d", *autoDetectedReplicasCount, len(updatedPodListAutodetection)) + }).WithTimeout(5*time.Minute).WithPolling(10*time.Second).Should(Succeed(), "failed to have the expected number of pods running") + }) + + It("should allow updating replicas and keep deployment the same when explicit replicas count set similar to autodetected count", func(ctx context.Context) { + newCount := *autoDetectedReplicasCount + By(fmt.Sprintf("update NRS to use explicit %d replicas", newCount)) + Eventually(func(g Gomega) { + g.Expect(e2eclient.Client.Get(ctx, nroSchedKey, nroSchedObj)).To(Succeed()) + nroSchedObj.Spec.Replicas = &newCount + g.Expect(e2eclient.Client.Update(ctx, nroSchedObj)).To(Succeed()) + }).WithTimeout(5*time.Minute).WithPolling(10*time.Second).Should(Succeed(), "failed to update NRS with explicit replicas count") + + By("wait enough to ensure the deployment is kept the same") + Consistently(func(g Gomega) { + schedDpPostUpdate, err := podlist.With(e2eclient.Client).DeploymentByOwnerReference(ctx, nroSchedObj.UID) + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(schedDpPostUpdate.UID).To(Equal(schedDp.UID), "deployment UID mismatch") + g.Expect(schedDpPostUpdate.Generation).To(Equal(schedDp.Generation), "deployment generation mismatch") + }).WithTimeout(2*time.Minute).WithPolling(10*time.Second).Should(Succeed(), "failed to verify the deployment is kept the same") + + By("switch back to autodetection mode") + Eventually(func(g Gomega) { + g.Expect(e2eclient.Client.Get(ctx, nroSchedKey, nroSchedObj)).To(Succeed()) + nroSchedObj.Spec.Replicas = nil + g.Expect(e2eclient.Client.Update(ctx, nroSchedObj)).To(Succeed()) + }).WithTimeout(5*time.Minute).WithPolling(10*time.Second).Should(Succeed(), "failed to update NRS with autodetection mode for replicas") + + By("wait enough to ensure the deployment is kept the same") + Consistently(func(g Gomega) { + schedDpPostUpdate, err := podlist.With(e2eclient.Client).DeploymentByOwnerReference(ctx, nroSchedObj.UID) + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(schedDpPostUpdate.UID).To(Equal(schedDp.UID), "deployment UID mismatch") + g.Expect(schedDpPostUpdate.Generation).To(Equal(schedDp.Generation), "deployment generation mismatch") + }).WithTimeout(2*time.Minute).WithPolling(10*time.Second).Should(Succeed(), "failed to verify the deployment is kept the same") + }) + + It("should allow updating replicas to 1 and have pods running properly", func(ctx context.Context) { + if *autoDetectedReplicasCount == 1 { + Skip("skipping test for single replica deployment") + } + + By("update NRS to use explicit 1 replicas") + Eventually(func(g Gomega) { + g.Expect(e2eclient.Client.Get(ctx, nroSchedKey, nroSchedObj)).To(Succeed()) + nroSchedObj.Spec.Replicas = ptr.To[int32](1) + g.Expect(e2eclient.Client.Update(ctx, nroSchedObj)).To(Succeed()) + }).WithTimeout(5*time.Minute).WithPolling(10*time.Second).Should(Succeed(), "failed to update NRS with explicit replicas count") + + schedDp, err := wait.With(e2eclient.Client).Timeout(10*time.Minute).Interval(10*time.Second).ForDeploymentCompleteWithReplicas(ctx, schedDp, 1) + if err != nil { + infoUponDeploymentCompleteFailure(ctx, schedDp) + } + Expect(err).ToNot(HaveOccurred()) + + By("verify the running pods are scaled down to 1") + Eventually(func(g Gomega) { + updatedPodListExplicit, err := podlist.With(e2eclient.Client).ByDeployment(ctx, *schedDp) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(updatedPodListExplicit).To(HaveLen(1), "expected 1 pod, got %d", len(updatedPodListExplicit)) + By("verifying the pod is running") + g.Expect(updatedPodListExplicit[0].Status.Phase).To(Equal(corev1.PodRunning)) + }).WithTimeout(5*time.Minute).WithPolling(10*time.Second).Should(Succeed(), "failed to have the expected number of pods running") + + By("switch back to autodetection mode") + Eventually(func(g Gomega) { + g.Expect(e2eclient.Client.Get(ctx, nroSchedKey, nroSchedObj)).To(Succeed()) + nroSchedObj.Spec.Replicas = nil + g.Expect(e2eclient.Client.Update(ctx, nroSchedObj)).To(Succeed()) + }).WithTimeout(5*time.Minute).WithPolling(10*time.Second).Should(Succeed(), "failed to update NRS with autodetection mode for replicas") + + schedDp, err = wait.With(e2eclient.Client).Timeout(10*time.Minute).Interval(10*time.Second).ForDeploymentCompleteWithReplicas(ctx, schedDp, *autoDetectedReplicasCount) + if err != nil { + infoUponDeploymentCompleteFailure(ctx, schedDp) + } + Expect(err).ToNot(HaveOccurred()) + + By("verifying new pods are created and running") + Eventually(func(g Gomega) { + newPods, err := podlist.With(e2eclient.Client).ByDeployment(ctx, *schedDp) + g.Expect(err).NotTo(HaveOccurred()) + for _, pod := range newPods { + g.Expect(pod.Status.Phase).To(Equal(corev1.PodRunning)) + } + g.Expect(newPods).To(HaveLen(int(*autoDetectedReplicasCount)), "expected %d new pods, got %d", *autoDetectedReplicasCount, len(newPods)) + }).WithTimeout(5*time.Minute).WithPolling(10*time.Second).Should(Succeed(), "failed to have the expected number of pods running") + }) + + When("one host of a scheduler pod is down", func() { + It("should keep the associated pod pending when deleted for balanced spread", func(ctx context.Context) { + var targetNodeName string + var targetPod *corev1.Pod + By("verify pods are spread across nodes") + // the deployment has a soft node affinity to pin the pods to control-plane nodes, thus if a pod landed + // on a worker node that is also valid. Hence we only check here that the hosts are different. + pods, err := podlist.With(e2eclient.Client).ByDeployment(ctx, *schedDp) + Expect(err).NotTo(HaveOccurred()) + Expect(pods).To(HaveLen(int(*autoDetectedReplicasCount)), "expected %d pods, got %d", *autoDetectedReplicasCount, len(pods)) + nodeNames := sets.New[string]() + for idx, pod := range pods { + Expect(pod.Status.Phase).To(Equal(corev1.PodRunning), "pod %s/%s is not running", pod.Namespace, pod.Name) + nodeNames.Insert(pod.Spec.NodeName) + klog.InfoS("running pod", "namespace", pod.Namespace, "name", pod.Name, "nodeName", pod.Spec.NodeName) + // any sample pod is fine as target + if idx == 0 { + targetNodeName = pod.Spec.NodeName + targetPod = &pod + } + } + Expect(nodeNames).To(HaveLen(int(*autoDetectedReplicasCount)), "expected unique host per pod") + + // remove the target node to know what nodes to keep untouched + nodeNames.Delete(targetNodeName) + + // for compact cluster, it's enough to mark only one of the nodes as unschedulable to get a pending pod. + // however, for multi-node cluster, we need to mark all nodes except for the ones who have a scheduler pod -1. + // the reason for that is because the NodeAffinity to pin the pod to control-plane is best effort, if no + // control-plane is found fit, the scheduler will try to schedule the pod on other workernodes. + e2efixture.By("list all cluster nodes and mark them unschedulable except for %d nodes who have a scheduler pod", len(nodeNames)) + allNodes := &corev1.NodeList{} + Expect(e2eclient.Client.List(ctx, allNodes)).To(Succeed()) + nodesToUpdate := sets.New[string]() + for _, node := range allNodes.Items { + if nodeNames.Has(node.Name) { + continue + } + + if node.Spec.Unschedulable { + continue + } + + e2efixture.By("cordon node %s", node.Name) + Eventually(func(g Gomega) { + var nd corev1.Node + g.Expect(e2eclient.Client.Get(ctx, client.ObjectKey{Name: node.Name}, &nd)).To(Succeed()) + nd.Spec.Unschedulable = true + g.Expect(e2eclient.Client.Update(ctx, &nd)).To(Succeed()) + }).WithTimeout(5*time.Minute).WithPolling(10*time.Second).Should(Succeed(), "failed to cordon node %s", node.Name) + + nodesToUpdate.Insert(node.Name) + } + + DeferCleanup(func(ctx context.Context) { + if nodesToUpdate.Len() == 0 { + return + } + + By("REVERT: uncordon nodes") + for nodeName := range nodesToUpdate { + klog.InfoS("uncordoning node", "nodeName", nodeName) + var node corev1.Node + Eventually(func(g Gomega) { + g.Expect(e2eclient.Client.Get(ctx, client.ObjectKey{Name: nodeName}, &node)).To(Succeed()) + node.Spec.Unschedulable = false + g.Expect(e2eclient.Client.Update(ctx, &node)).To(Succeed()) + }).WithTimeout(5*time.Minute).WithPolling(10*time.Second).Should(Succeed(), "failed to uncordon node %s", nodeName) + nodesToUpdate.Delete(nodeName) + } + }) + + e2efixture.By("delete pod %s/%s to trigger its recreation", targetPod.Namespace, targetPod.Name) + Expect(e2eclient.Client.Delete(ctx, targetPod)).To(Succeed()) + Expect(wait.With(e2eclient.Client).Timeout(5*time.Minute).ForPodDeleted(ctx, targetPod.Namespace, targetPod.Name)).To(Succeed()) + + By("fetch the recreated pod") + var recreatedPod *corev1.Pod + Eventually(func(g Gomega) { + recreatedPod = nil + newPods, err := podlist.With(e2eclient.Client).ByDeployment(ctx, *schedDp) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(newPods).To(HaveLen(int(*autoDetectedReplicasCount)), "expected %d pods, got %d", *autoDetectedReplicasCount, len(newPods)) + for _, pod := range newPods { + if pod.Status.Phase == corev1.PodPending { + // this is for sure the pod that was recreated because we know for sure there are exactly + // len(control-plane) pods that were running and each is on a different node, and only one was deleted + // after node cordoning + recreatedPod = &pod + break + } + } + g.Expect(recreatedPod).ToNot(BeNil(), "failed to find recreated pod") + klog.InfoS("recreated pod", "namespace", recreatedPod.Namespace, "name", recreatedPod.Name) + }).WithTimeout(20*time.Second).WithPolling(2*time.Second).Should(Succeed(), "failed to have the expected number of pods running") + + By("wait enough to ensure the pod keeps pending as long as there is no schedulable node") + Consistently(func(g Gomega) { + g.Expect(e2eclient.Client.Get(ctx, client.ObjectKey{Namespace: recreatedPod.Namespace, Name: recreatedPod.Name}, recreatedPod)).To(Succeed()) + g.Expect(recreatedPod.Status.Phase).To(Equal(corev1.PodPending), "pod %s/%s is not pending", recreatedPod.Namespace, recreatedPod.Name) + }).WithTimeout(2*time.Minute).WithPolling(10*time.Second).Should(Succeed(), "failed to have the pod pending") + + Expect(recreatedPod.Status.Phase).To(Equal(corev1.PodPending), "pod %s/%s is not pending", recreatedPod.Namespace, recreatedPod.Name) + Expect(recreatedPod.Status.Conditions).To(HaveLen(1), "pod %s/%s should have only one condition", recreatedPod.Namespace, recreatedPod.Name) + Expect(recreatedPod.Status.Conditions[0].Type).To(Equal(corev1.PodScheduled)) + Expect(recreatedPod.Status.Conditions[0].Status).To(Equal(corev1.ConditionFalse)) + Expect(recreatedPod.Status.Conditions[0].Reason).To(Equal("Unschedulable")) + occupiedNodes := *autoDetectedReplicasCount - 1 + Expect(recreatedPod.Status.Conditions[0].Message).To(ContainSubstring(fmt.Sprintf("%d node(s) didn't match pod topology spread constraints", occupiedNodes))) + + By("uncordon the nodes") + for nodeName := range nodesToUpdate { + klog.InfoS("uncordoning node", "nodeName", nodeName) + var node corev1.Node + Eventually(func(g Gomega) { + g.Expect(e2eclient.Client.Get(ctx, client.ObjectKey{Name: nodeName}, &node)).To(Succeed()) + node.Spec.Unschedulable = false + g.Expect(e2eclient.Client.Update(ctx, &node)).To(Succeed()) + }).WithTimeout(5*time.Minute).WithPolling(10*time.Second).Should(Succeed(), "failed to uncordon node %s", nodeName) + nodesToUpdate.Delete(nodeName) + } + + By("verify the pod goes back to running") + Eventually(func(g Gomega) { + g.Expect(e2eclient.Client.Get(ctx, client.ObjectKey{Namespace: recreatedPod.Namespace, Name: recreatedPod.Name}, recreatedPod)).To(Succeed()) + g.Expect(recreatedPod.Status.Phase).To(Equal(corev1.PodRunning), "pod %s/%s is not running", recreatedPod.Namespace, recreatedPod.Name) + }).WithTimeout(5*time.Minute).WithPolling(10*time.Second).Should(Succeed(), "failed to have the pod running") + }) + }) + }) }) + +func infoUponDeploymentCompleteFailure(ctx context.Context, schedDp *appsv1.Deployment) { + klog.InfoS("deployment status", "status", schedDp.Status) + pods, err := podlist.With(e2eclient.Client).ByDeployment(ctx, *schedDp) + if err != nil { + klog.ErrorS(err, "failed to list deployment pods", "namespace", schedDp.Namespace, "name", schedDp.Name) + return + } + + for _, pod := range pods { + klog.InfoS("pod status", "podName", pod.Name, "podStatus", pod.Status.String()) + } +} diff --git a/test/e2e/serial/tests/configuration.go b/test/e2e/serial/tests/configuration.go index abcf65ccf3..995e224b97 100644 --- a/test/e2e/serial/tests/configuration.go +++ b/test/e2e/serial/tests/configuration.go @@ -1000,7 +1000,7 @@ var _ = Describe("[serial][disruptive] numaresources configuration management", e2efixture.By("ensuring the deployment %q keep being pending %d/%d", deployment.Name, step+1, maxStep) err = fxt.Client.Get(ctx, client.ObjectKeyFromObject(deployment), &updatedDeployment) Expect(err).ToNot(HaveOccurred()) - Expect(wait.IsDeploymentComplete(deployment, &updatedDeployment.Status)).To(BeFalse(), "deployment %q become ready", deployment.Name) + Expect(wait.IsDeploymentComplete(deployment.Generation, &updatedDeployment.Status, *(deployment.Spec.Replicas))).To(BeFalse(), "deployment %q become ready", deployment.Name) } By("checking the deployment pod has failed scheduling and its at the pending status") diff --git a/test/e2e/serial/tests/scheduler_removal.go b/test/e2e/serial/tests/scheduler_removal.go index 8875f0dcbb..2c00b46588 100644 --- a/test/e2e/serial/tests/scheduler_removal.go +++ b/test/e2e/serial/tests/scheduler_removal.go @@ -95,7 +95,7 @@ var _ = Describe("[serial][disruptive][scheduler][schedrst] numaresources schedu err = fxt.Client.Get(context.TODO(), client.ObjectKeyFromObject(dp), updatedDp) Expect(err).ToNot(HaveOccurred()) - Expect(wait.IsDeploymentComplete(dp, &updatedDp.Status)).To(BeTrue(), "deployment %q become unready", dp.Name) + Expect(wait.IsDeploymentComplete(dp.Generation, &updatedDp.Status, *dp.Spec.Replicas)).To(BeTrue(), "deployment %q become unready", dp.Name) } }) @@ -122,7 +122,7 @@ var _ = Describe("[serial][disruptive][scheduler][schedrst] numaresources schedu err = fxt.Client.Get(context.TODO(), client.ObjectKeyFromObject(dp), updatedDp) Expect(err).ToNot(HaveOccurred()) - Expect(wait.IsDeploymentComplete(dp, &updatedDp.Status)).To(BeFalse(), "deployment %q become ready", dp.Name) + Expect(wait.IsDeploymentComplete(dp.Generation, &updatedDp.Status, *dp.Spec.Replicas)).To(BeFalse(), "deployment %q become ready", dp.Name) } }) }) @@ -153,7 +153,7 @@ var _ = Describe("[serial][disruptive][scheduler][schedrst] numaresources schedu err = fxt.Client.Get(ctx, client.ObjectKeyFromObject(dp), updatedDp) Expect(err).ToNot(HaveOccurred()) - Expect(wait.IsDeploymentComplete(dp, &updatedDp.Status)).To(BeFalse(), "deployment %q become ready", dp.Name) + Expect(wait.IsDeploymentComplete(dp.Generation, &updatedDp.Status, *dp.Spec.Replicas)).To(BeFalse(), "deployment %q become ready", dp.Name) } restoreScheduler(fxt, nroSchedObj) diff --git a/test/e2e/serial/tests/workload_placement_nodelabel.go b/test/e2e/serial/tests/workload_placement_nodelabel.go index 90444890a8..e24e0aa16e 100644 --- a/test/e2e/serial/tests/workload_placement_nodelabel.go +++ b/test/e2e/serial/tests/workload_placement_nodelabel.go @@ -347,7 +347,7 @@ var _ = Describe("[serial][disruptive][scheduler] numaresources workload placeme updatedDp := &appsv1.Deployment{} err := fxt.Client.Get(context.TODO(), client.ObjectKeyFromObject(deployment), updatedDp) Expect(err).ToNot(HaveOccurred()) - return wait.IsDeploymentComplete(deployment, &updatedDp.Status) + return wait.IsDeploymentComplete(deployment.Generation, &updatedDp.Status, *deployment.Spec.Replicas) }).WithTimeout(time.Second*30).WithPolling(time.Second*5).Should(BeTrue(), "deployment %q became unready", deployment.Name) }, Entry("[test_id:47597] should be able to schedule pod with affinity property requiredDuringSchedulingIgnoredDuringExecution on the available node with feasible numa zone", createNodeAffinityRequiredDuringSchedulingIgnoredDuringExecution),