Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions internal/controller/numaresourcesscheduler_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,11 +244,6 @@ func isDeploymentRunning(ctx context.Context, c client.Client, key nropv1.Namesp
}

func (r *NUMAResourcesSchedulerReconciler) computeSchedulerReplicas(ctx context.Context, schedSpec nropv1.NUMAResourcesSchedulerSpec) (*int32, error) {
// do not autodetect if explicitly set by the user
if schedSpec.Replicas != nil {
return schedSpec.Replicas, nil
}

var labelSelector map[string]string
var nodeRoleDescription string

Expand Down Expand Up @@ -276,6 +271,14 @@ func (r *NUMAResourcesSchedulerReconciler) computeSchedulerReplicas(ctx context.

replicaCount := int32(len(nodeList.Items))

if schedSpec.Replicas != nil {
// should not allow more replicas than max found nodes
if *schedSpec.Replicas > replicaCount {
return nil, fmt.Errorf("explicitly set replicas count should not be greater than the number of the nodes with role %s", nodeRoleDescription)
}
return schedSpec.Replicas, nil
}
Comment thread
shajmakh marked this conversation as resolved.

// check if no nodes are found for the target role
if replicaCount == 0 {
return nil, fmt.Errorf("failed to compute scheduler replicas: no %s nodes found for platform %s", nodeRoleDescription, r.PlatformInfo.Platform)
Expand Down Expand Up @@ -361,7 +364,7 @@ func (r *NUMAResourcesSchedulerReconciler) syncNUMASchedulerResources(ctx contex
return nropv1.NUMAResourcesSchedulerStatus{}, err
}

if err := schedupdate.DeploymentAffinityWithStrategy(r.SchedulerManifests.Deployment, instance.Spec); err != nil {
if err := schedupdate.DeploymentTopologySpreadConstraints(r.SchedulerManifests.Deployment); err != nil {
return nropv1.NUMAResourcesSchedulerStatus{}, err
}

Expand Down
363 changes: 159 additions & 204 deletions internal/controller/numaresourcesscheduler_controller_test.go

Large diffs are not rendered by default.

39 changes: 28 additions & 11 deletions internal/wait/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,14 @@ import (
"k8s.io/klog/v2"
)

// ForDeploymentComplete waits for the deployment to be complete with the number of replicas specified in the deployment spec.
func (wt Waiter) ForDeploymentComplete(ctx context.Context, dp *appsv1.Deployment) (*appsv1.Deployment, error) {
// This function waits for the readiness of the pods under the deployment. The best use of this check is for
// completely new deployments. If the deployment exists on the cluster and simply updated, this check is
// not enough to guarantee that the deployment is ready with the NEW replica, thus need to cover that by
// additional checks as the context requires
return wt.ForDeploymentCompleteWithReplicas(ctx, dp, *(dp.Spec.Replicas))
}

// ForDeploymentCompleteWithReplicas waits for the deployment to be complete and have the specified number of replicas.
// Use this function when the deployment is expected to have new number of replicas after an update.
func (wt Waiter) ForDeploymentCompleteWithReplicas(ctx context.Context, dp *appsv1.Deployment, newExpectedReplicas int32) (*appsv1.Deployment, error) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good addition. If most of the codebase actually uses the pattern

ForDeploymentCompleteWithReplicas(ctx, dp, *dp.Spec.Replicas)

and at glance it seems the case, it's worth keeping the old API ForDeploymentComplete

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I intentionally updated the codebase because ForDeploymentComplete without having the correct expected replicas count does not sound complete and can be confusing. Internally in the function, replicas check is a core one and in order to avoid using the incorrect function for deployment waiter I overrode the old API. This was actually an old bug as noted in the omitted comment; the new version is expected to handle all deployments now.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, so let's just rename ForDeploymentCompleteWithReplicas to ForDeploymentComplete

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in the latest push I restored ForDeploymentComplete to keep the diff as less as possible to reduce conflicts while backporting. I can address changing the whole code base in a followup PR including #4177 (comment)

key := ObjectKeyFromObject(dp)
updatedDp := &appsv1.Deployment{}
immediate := true
Expand All @@ -39,7 +42,7 @@ func (wt Waiter) ForDeploymentComplete(ctx context.Context, dp *appsv1.Deploymen
return false, err
}

if !IsDeploymentComplete(dp, &updatedDp.Status) {
if !IsDeploymentComplete(dp.Generation, &updatedDp.Status, newExpectedReplicas) {
klog.Warningf("deployment %s not yet complete", key.String())
return false, nil
}
Expand All @@ -51,14 +54,28 @@ func (wt Waiter) ForDeploymentComplete(ctx context.Context, dp *appsv1.Deploymen
}

func areDeploymentReplicasAvailable(newStatus *appsv1.DeploymentStatus, replicas int32) bool {
return newStatus.UpdatedReplicas == replicas &&
newStatus.Replicas == replicas &&
newStatus.AvailableReplicas == replicas
if newStatus.Replicas != replicas {
klog.InfoS("newStatus.Replicas", "expected", replicas, "found", newStatus.Replicas)
return false
}
if newStatus.UpdatedReplicas != replicas {
klog.InfoS("newStatus.UpdatedReplicas", "expected", replicas, "found", newStatus.UpdatedReplicas)
return false
}
if newStatus.AvailableReplicas != replicas {
klog.InfoS("newStatus.AvailableReplicas", "expected", replicas, "found", newStatus.AvailableReplicas)
return false
}
return true
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}

func IsDeploymentComplete(dp *appsv1.Deployment, newStatus *appsv1.DeploymentStatus) bool {
return areDeploymentReplicasAvailable(newStatus, *(dp.Spec.Replicas)) &&
newStatus.ObservedGeneration >= dp.Generation
func IsDeploymentComplete(oldGeneration int64, newStatus *appsv1.DeploymentStatus, expectedReplicas int32) bool {
Comment thread
ffromani marked this conversation as resolved.
if newStatus.ObservedGeneration < oldGeneration {
klog.InfoS("generation is older than expected to indicate the deployment is complete", "old", oldGeneration, "new", newStatus.ObservedGeneration)
return false
}

return areDeploymentReplicasAvailable(newStatus, expectedReplicas)
}

func (wt Waiter) ForDeploymentReplicasCreation(ctx context.Context, dp *appsv1.Deployment, expectedReplicas int32) (*appsv1.Deployment, error) {
Expand Down
59 changes: 17 additions & 42 deletions pkg/objectupdate/sched/sched.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,13 @@
package sched

import (
"errors"
"fmt"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"

"github.com/k8stopologyawareschedwg/deployer/pkg/flagcodec"
k8swgmanifests "github.com/k8stopologyawareschedwg/deployer/pkg/manifests"
Expand Down Expand Up @@ -106,52 +103,30 @@ func DeploymentTLSSettings(dp *appsv1.Deployment, tlsSettings objtls.Settings) e
return nil
}

// DeploymentAffinityWithStrategy configures required pod anti-affinity on the scheduler
// deployment only when the CR leaves replica count to autodetection (Replicas unset).
// An explicit non-nil Replicas value means the user chose a replica count and keeps
// full control (no required pod anti-affinity).
func DeploymentAffinityWithStrategy(dp *appsv1.Deployment, spec nropv1.NUMAResourcesSchedulerSpec) error {
if spec.Replicas != nil {
if dp.Spec.Template.Spec.Affinity != nil {
dp.Spec.Template.Spec.Affinity.PodAntiAffinity = nil
}
dp.Spec.Strategy = appsv1.DeploymentStrategy{}
return nil
}

func DeploymentTopologySpreadConstraints(dp *appsv1.Deployment) error {
labels := dp.Spec.Template.Labels
if len(labels) == 0 {
return errors.New("no labels provided for PodAntiAffinity")
}

if dp.Spec.Template.Spec.Affinity == nil {
dp.Spec.Template.Spec.Affinity = &corev1.Affinity{}
return fmt.Errorf("no labels found in deployment template")
}
Comment thread
shajmakh marked this conversation as resolved.

dp.Spec.Template.Spec.Affinity.PodAntiAffinity = &corev1.PodAntiAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: []corev1.PodAffinityTerm{
{
LabelSelector: &metav1.LabelSelector{
MatchLabels: labels,
},
TopologyKey: "kubernetes.io/hostname",
},
schedConstr := corev1.TopologySpreadConstraint{
LabelSelector: &metav1.LabelSelector{
MatchLabels: labels,
},
}
klog.V(3).InfoS("Scheduler Deployment affinity", "podAntiAffinity", dp.Spec.Template.Spec.Affinity.PodAntiAffinity.String())
// NOTE: With replicas=1 and no PodAntiAffinity, the default strategy (surge 1, then remove old)
// works as expected — the new pod can schedule anywhere, no constraints block it.
dp.Spec.Strategy = appsv1.DeploymentStrategy{
Type: appsv1.RollingUpdateDeploymentStrategyType,
RollingUpdate: &appsv1.RollingUpdateDeployment{
MaxUnavailable: ptr.To(intstr.FromInt(1)),
MaxSurge: ptr.To(intstr.FromInt(0)),
},
}
klog.V(3).InfoS("Scheduler Deployment Rollout Strategy", "rolloutStrategy", dp.Spec.Strategy.String())
MaxSkew: 1,
TopologyKey: "kubernetes.io/hostname",
WhenUnsatisfiable: corev1.DoNotSchedule,
// the below label is set by the deployment controller and is needed for safe rollouts to allow ignoring
// the old replicaset and calculates the spread purely based on the new replicaset, ensuring the final
// state is perfectly balanced without stalling:
// https://kubernetes.io/docs/concepts/workloads/controllers/deployment/#pod-template-hash-label
MatchLabelKeys: []string{"pod-template-hash"},
Comment thread
shajmakh marked this conversation as resolved.
}

dp.Spec.Template.Spec.TopologySpreadConstraints = []corev1.TopologySpreadConstraint{schedConstr}
klog.V(3).InfoS("scheduler deployment topology spread constraints", "constraints", schedConstr.String())
return nil
}

func SchedulerConfig(cm *corev1.ConfigMap, name string, params *k8swgmanifests.ConfigParams) error {
if cm.Data == nil {
return fmt.Errorf("no data found in ConfigMap: %s/%s", cm.Namespace, cm.Name)
Expand Down
130 changes: 23 additions & 107 deletions pkg/objectupdate/sched/sched_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/utils/ptr"

k8swgmanifests "github.com/k8stopologyawareschedwg/deployer/pkg/manifests"

Expand Down Expand Up @@ -635,122 +633,40 @@ func TestDeploymentTLSSettingsRepeated(t *testing.T) {
}
}

func TestDeploymentAffinityWithStrategyNoLabels(t *testing.T) {
func TestDeploymentTopologySpreadConstraintsNoLabels(t *testing.T) {
dp := dpMinimal.DeepCopy()
err := DeploymentAffinityWithStrategy(dp, nropv1.NUMAResourcesSchedulerSpec{})
if err == nil {
t.Fatalf("expected error but received nil")
dp.Spec.Template.Labels = nil
if err := DeploymentTopologySpreadConstraints(dp); err == nil {
t.Fatalf("expected error but got nil")
}
}

func TestDeploymentAffinityWithStrategyExplicitNonNilReplicasCount(t *testing.T) {
testcases := []struct {
name string
nonNilReplicasCount *int32
}{
{
name: "replicas count is positive int",
nonNilReplicasCount: ptr.To(int32(2)),
},
{
name: "replicas count is zero",
nonNilReplicasCount: ptr.To(int32(0)),
},
}
for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
dp := dpMinimal.DeepCopy()
dp.Spec.Template.ObjectMeta.Labels = map[string]string{"app": "scheduler"}
initialNodeAffinity := &corev1.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{
NodeSelectorTerms: []corev1.NodeSelectorTerm{
{
MatchExpressions: []corev1.NodeSelectorRequirement{
{
Key: "node-role.kubernetes.io/worker",
Operator: corev1.NodeSelectorOpExists,
Values: []string{""},
},
},
},
},
},
}
dp.Spec.Template.Spec.Affinity = &corev1.Affinity{
NodeAffinity: initialNodeAffinity,
PodAntiAffinity: &corev1.PodAntiAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: []corev1.PodAffinityTerm{
{
LabelSelector: &metav1.LabelSelector{
MatchLabels: map[string]string{"app": "scheduler"},
},
},
},
},
}
err := DeploymentAffinityWithStrategy(dp, nropv1.NUMAResourcesSchedulerSpec{Replicas: tc.nonNilReplicasCount})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
// check that the node affinity is preserved
if diff := cmp.Diff(initialNodeAffinity, dp.Spec.Template.Spec.Affinity.NodeAffinity); diff != "" {
t.Errorf("should preserve existing NodeAffinity: affinity mismatch (-expected +got):\n%s", diff)
}
if dp.Spec.Template.Spec.Affinity.PodAntiAffinity != nil {
t.Fatalf("expected podAntiAffinity to be reset but got %v", dp.Spec.Template.Spec.Affinity.PodAntiAffinity)
}
if dp.Spec.Strategy != (appsv1.DeploymentStrategy{}) {
t.Fatalf("expected strategy to be reset but got %v", dp.Spec.Strategy)
}
})
dp.Spec.Template.Labels = map[string]string{}
if err := DeploymentTopologySpreadConstraints(dp); err == nil {
t.Fatalf("expected error but got nil")
}
}

func TestDeploymentAffinityWithStrategyWithOverride(t *testing.T) {
labels := map[string]string{"app": "scheduler"}
expectedStrategy := appsv1.DeploymentStrategy{
Type: appsv1.RollingUpdateDeploymentStrategyType,
RollingUpdate: &appsv1.RollingUpdateDeployment{
MaxUnavailable: ptr.To(intstr.FromInt(1)),
MaxSurge: ptr.To(intstr.FromInt(0)),
},
}
expectedPodAntiAffinity := &corev1.PodAntiAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: []corev1.PodAffinityTerm{
{
LabelSelector: &metav1.LabelSelector{
MatchLabels: labels,
},
TopologyKey: "kubernetes.io/hostname",
},
},
}
func TestDeploymentTopologySpreadConstraints(t *testing.T) {
dp := dpMinimal.DeepCopy()
dp.Spec.Template.ObjectMeta.Labels = labels

err := DeploymentAffinityWithStrategy(dp, nropv1.NUMAResourcesSchedulerSpec{})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

if diff := cmp.Diff(expectedPodAntiAffinity, dp.Spec.Template.Spec.Affinity.PodAntiAffinity); diff != "" {
t.Errorf("affinity mismatch (-expected +got):\n%s", diff)
}
if diff := cmp.Diff(expectedStrategy, dp.Spec.Strategy); diff != "" {
t.Errorf("strategy mismatch (-expected +got):\n%s", diff)
dp.Spec.Template.Labels = map[string]string{
"app": "numaresources-scheduler",
}

// check reset works
err = DeploymentAffinityWithStrategy(dp, nropv1.NUMAResourcesSchedulerSpec{Replicas: ptr.To(int32(2))})
if err != nil {
if err := DeploymentTopologySpreadConstraints(dp); err != nil {
t.Fatalf("unexpected error: %v", err)
}

if dp.Spec.Template.Spec.Affinity != nil && dp.Spec.Template.Spec.Affinity.PodAntiAffinity != nil {
t.Fatalf("Override failed: expected no podAntiAffinity but got %v", dp.Spec.Template.Spec.Affinity.PodAntiAffinity)
expectedConstraints := []corev1.TopologySpreadConstraint{
{
LabelSelector: &metav1.LabelSelector{
MatchLabels: dp.Spec.Template.Labels,
},
MaxSkew: 1,
TopologyKey: "kubernetes.io/hostname",
WhenUnsatisfiable: corev1.DoNotSchedule,
MatchLabelKeys: []string{"pod-template-hash"},
},
}
if dp.Spec.Strategy != (appsv1.DeploymentStrategy{}) {
t.Fatalf("expected strategy to be reset but got %v", dp.Spec.Strategy)
if diff := cmp.Diff(dp.Spec.Template.Spec.TopologySpreadConstraints, expectedConstraints); diff != "" {
t.Errorf("constraints mismatch\n%s", diff)
}
}

Expand Down
Loading
Loading