Skip to content

Commit 2f6eb77

Browse files
Merge pull request #4177 from shajmakh/tsc-for-ha-sched
OCPBUGS-79418: ctrl: sched: add topologySpreadConstraints to deployment
2 parents 117a8b7 + f30a924 commit 2f6eb77

9 files changed

Lines changed: 634 additions & 380 deletions

File tree

internal/controller/numaresourcesscheduler_controller.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -244,11 +244,6 @@ func isDeploymentRunning(ctx context.Context, c client.Client, key nropv1.Namesp
244244
}
245245

246246
func (r *NUMAResourcesSchedulerReconciler) computeSchedulerReplicas(ctx context.Context, schedSpec nropv1.NUMAResourcesSchedulerSpec) (*int32, error) {
247-
// do not autodetect if explicitly set by the user
248-
if schedSpec.Replicas != nil {
249-
return schedSpec.Replicas, nil
250-
}
251-
252247
var labelSelector map[string]string
253248
var nodeRoleDescription string
254249

@@ -276,6 +271,14 @@ func (r *NUMAResourcesSchedulerReconciler) computeSchedulerReplicas(ctx context.
276271

277272
replicaCount := int32(len(nodeList.Items))
278273

274+
if schedSpec.Replicas != nil {
275+
// should not allow more replicas than max found nodes
276+
if *schedSpec.Replicas > replicaCount {
277+
return nil, fmt.Errorf("explicitly set replicas count should not be greater than the number of the nodes with role %s", nodeRoleDescription)
278+
}
279+
return schedSpec.Replicas, nil
280+
}
281+
279282
// check if no nodes are found for the target role
280283
if replicaCount == 0 {
281284
return nil, fmt.Errorf("failed to compute scheduler replicas: no %s nodes found for platform %s", nodeRoleDescription, r.PlatformInfo.Platform)
@@ -361,7 +364,7 @@ func (r *NUMAResourcesSchedulerReconciler) syncNUMASchedulerResources(ctx contex
361364
return nropv1.NUMAResourcesSchedulerStatus{}, err
362365
}
363366

364-
if err := schedupdate.DeploymentAffinityWithStrategy(r.SchedulerManifests.Deployment, instance.Spec); err != nil {
367+
if err := schedupdate.DeploymentTopologySpreadConstraints(r.SchedulerManifests.Deployment); err != nil {
365368
return nropv1.NUMAResourcesSchedulerStatus{}, err
366369
}
367370

internal/controller/numaresourcesscheduler_controller_test.go

Lines changed: 159 additions & 204 deletions
Large diffs are not rendered by default.

internal/wait/deployment.go

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,14 @@ import (
2424
"k8s.io/klog/v2"
2525
)
2626

27+
// ForDeploymentComplete waits for the deployment to be complete with the number of replicas specified in the deployment spec.
2728
func (wt Waiter) ForDeploymentComplete(ctx context.Context, dp *appsv1.Deployment) (*appsv1.Deployment, error) {
28-
// This function waits for the readiness of the pods under the deployment. The best use of this check is for
29-
// completely new deployments. If the deployment exists on the cluster and simply updated, this check is
30-
// not enough to guarantee that the deployment is ready with the NEW replica, thus need to cover that by
31-
// additional checks as the context requires
29+
return wt.ForDeploymentCompleteWithReplicas(ctx, dp, *(dp.Spec.Replicas))
30+
}
31+
32+
// ForDeploymentCompleteWithReplicas waits for the deployment to be complete and have the specified number of replicas.
33+
// Use this function when the deployment is expected to have new number of replicas after an update.
34+
func (wt Waiter) ForDeploymentCompleteWithReplicas(ctx context.Context, dp *appsv1.Deployment, newExpectedReplicas int32) (*appsv1.Deployment, error) {
3235
key := ObjectKeyFromObject(dp)
3336
updatedDp := &appsv1.Deployment{}
3437
immediate := true
@@ -39,7 +42,7 @@ func (wt Waiter) ForDeploymentComplete(ctx context.Context, dp *appsv1.Deploymen
3942
return false, err
4043
}
4144

42-
if !IsDeploymentComplete(dp, &updatedDp.Status) {
45+
if !IsDeploymentComplete(dp.Generation, &updatedDp.Status, newExpectedReplicas) {
4346
klog.Warningf("deployment %s not yet complete", key.String())
4447
return false, nil
4548
}
@@ -51,14 +54,28 @@ func (wt Waiter) ForDeploymentComplete(ctx context.Context, dp *appsv1.Deploymen
5154
}
5255

5356
func areDeploymentReplicasAvailable(newStatus *appsv1.DeploymentStatus, replicas int32) bool {
54-
return newStatus.UpdatedReplicas == replicas &&
55-
newStatus.Replicas == replicas &&
56-
newStatus.AvailableReplicas == replicas
57+
if newStatus.Replicas != replicas {
58+
klog.InfoS("newStatus.Replicas", "expected", replicas, "found", newStatus.Replicas)
59+
return false
60+
}
61+
if newStatus.UpdatedReplicas != replicas {
62+
klog.InfoS("newStatus.UpdatedReplicas", "expected", replicas, "found", newStatus.UpdatedReplicas)
63+
return false
64+
}
65+
if newStatus.AvailableReplicas != replicas {
66+
klog.InfoS("newStatus.AvailableReplicas", "expected", replicas, "found", newStatus.AvailableReplicas)
67+
return false
68+
}
69+
return true
5770
}
5871

59-
func IsDeploymentComplete(dp *appsv1.Deployment, newStatus *appsv1.DeploymentStatus) bool {
60-
return areDeploymentReplicasAvailable(newStatus, *(dp.Spec.Replicas)) &&
61-
newStatus.ObservedGeneration >= dp.Generation
72+
func IsDeploymentComplete(oldGeneration int64, newStatus *appsv1.DeploymentStatus, expectedReplicas int32) bool {
73+
if newStatus.ObservedGeneration < oldGeneration {
74+
klog.InfoS("generation is older than expected to indicate the deployment is complete", "old", oldGeneration, "new", newStatus.ObservedGeneration)
75+
return false
76+
}
77+
78+
return areDeploymentReplicasAvailable(newStatus, expectedReplicas)
6279
}
6380

6481
func (wt Waiter) ForDeploymentReplicasCreation(ctx context.Context, dp *appsv1.Deployment, expectedReplicas int32) (*appsv1.Deployment, error) {

pkg/objectupdate/sched/sched.go

Lines changed: 17 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,13 @@
1717
package sched
1818

1919
import (
20-
"errors"
2120
"fmt"
2221

2322
appsv1 "k8s.io/api/apps/v1"
2423
corev1 "k8s.io/api/core/v1"
2524
"k8s.io/apimachinery/pkg/api/resource"
2625
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
27-
"k8s.io/apimachinery/pkg/util/intstr"
2826
"k8s.io/klog/v2"
29-
"k8s.io/utils/ptr"
3027

3128
"github.com/k8stopologyawareschedwg/deployer/pkg/flagcodec"
3229
k8swgmanifests "github.com/k8stopologyawareschedwg/deployer/pkg/manifests"
@@ -106,52 +103,30 @@ func DeploymentTLSSettings(dp *appsv1.Deployment, tlsSettings objtls.Settings) e
106103
return nil
107104
}
108105

109-
// DeploymentAffinityWithStrategy configures required pod anti-affinity on the scheduler
110-
// deployment only when the CR leaves replica count to autodetection (Replicas unset).
111-
// An explicit non-nil Replicas value means the user chose a replica count and keeps
112-
// full control (no required pod anti-affinity).
113-
func DeploymentAffinityWithStrategy(dp *appsv1.Deployment, spec nropv1.NUMAResourcesSchedulerSpec) error {
114-
if spec.Replicas != nil {
115-
if dp.Spec.Template.Spec.Affinity != nil {
116-
dp.Spec.Template.Spec.Affinity.PodAntiAffinity = nil
117-
}
118-
dp.Spec.Strategy = appsv1.DeploymentStrategy{}
119-
return nil
120-
}
121-
106+
func DeploymentTopologySpreadConstraints(dp *appsv1.Deployment) error {
122107
labels := dp.Spec.Template.Labels
123108
if len(labels) == 0 {
124-
return errors.New("no labels provided for PodAntiAffinity")
125-
}
126-
127-
if dp.Spec.Template.Spec.Affinity == nil {
128-
dp.Spec.Template.Spec.Affinity = &corev1.Affinity{}
109+
return fmt.Errorf("no labels found in deployment template")
129110
}
130111

131-
dp.Spec.Template.Spec.Affinity.PodAntiAffinity = &corev1.PodAntiAffinity{
132-
RequiredDuringSchedulingIgnoredDuringExecution: []corev1.PodAffinityTerm{
133-
{
134-
LabelSelector: &metav1.LabelSelector{
135-
MatchLabels: labels,
136-
},
137-
TopologyKey: "kubernetes.io/hostname",
138-
},
112+
schedConstr := corev1.TopologySpreadConstraint{
113+
LabelSelector: &metav1.LabelSelector{
114+
MatchLabels: labels,
139115
},
140-
}
141-
klog.V(3).InfoS("Scheduler Deployment affinity", "podAntiAffinity", dp.Spec.Template.Spec.Affinity.PodAntiAffinity.String())
142-
// NOTE: With replicas=1 and no PodAntiAffinity, the default strategy (surge 1, then remove old)
143-
// works as expected — the new pod can schedule anywhere, no constraints block it.
144-
dp.Spec.Strategy = appsv1.DeploymentStrategy{
145-
Type: appsv1.RollingUpdateDeploymentStrategyType,
146-
RollingUpdate: &appsv1.RollingUpdateDeployment{
147-
MaxUnavailable: ptr.To(intstr.FromInt(1)),
148-
MaxSurge: ptr.To(intstr.FromInt(0)),
149-
},
150-
}
151-
klog.V(3).InfoS("Scheduler Deployment Rollout Strategy", "rolloutStrategy", dp.Spec.Strategy.String())
116+
MaxSkew: 1,
117+
TopologyKey: "kubernetes.io/hostname",
118+
WhenUnsatisfiable: corev1.DoNotSchedule,
119+
// the below label is set by the deployment controller and is needed for safe rollouts to allow ignoring
120+
// the old replicaset and calculates the spread purely based on the new replicaset, ensuring the final
121+
// state is perfectly balanced without stalling:
122+
// https://kubernetes.io/docs/concepts/workloads/controllers/deployment/#pod-template-hash-label
123+
MatchLabelKeys: []string{"pod-template-hash"},
124+
}
125+
126+
dp.Spec.Template.Spec.TopologySpreadConstraints = []corev1.TopologySpreadConstraint{schedConstr}
127+
klog.V(3).InfoS("scheduler deployment topology spread constraints", "constraints", schedConstr.String())
152128
return nil
153129
}
154-
155130
func SchedulerConfig(cm *corev1.ConfigMap, name string, params *k8swgmanifests.ConfigParams) error {
156131
if cm.Data == nil {
157132
return fmt.Errorf("no data found in ConfigMap: %s/%s", cm.Namespace, cm.Name)

pkg/objectupdate/sched/sched_test.go

Lines changed: 23 additions & 107 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,6 @@ import (
3030
corev1 "k8s.io/api/core/v1"
3131
"k8s.io/apimachinery/pkg/api/resource"
3232
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
33-
"k8s.io/apimachinery/pkg/util/intstr"
34-
"k8s.io/utils/ptr"
3533

3634
k8swgmanifests "github.com/k8stopologyawareschedwg/deployer/pkg/manifests"
3735

@@ -635,122 +633,40 @@ func TestDeploymentTLSSettingsRepeated(t *testing.T) {
635633
}
636634
}
637635

638-
func TestDeploymentAffinityWithStrategyNoLabels(t *testing.T) {
636+
func TestDeploymentTopologySpreadConstraintsNoLabels(t *testing.T) {
639637
dp := dpMinimal.DeepCopy()
640-
err := DeploymentAffinityWithStrategy(dp, nropv1.NUMAResourcesSchedulerSpec{})
641-
if err == nil {
642-
t.Fatalf("expected error but received nil")
638+
dp.Spec.Template.Labels = nil
639+
if err := DeploymentTopologySpreadConstraints(dp); err == nil {
640+
t.Fatalf("expected error but got nil")
643641
}
644-
}
645642

646-
func TestDeploymentAffinityWithStrategyExplicitNonNilReplicasCount(t *testing.T) {
647-
testcases := []struct {
648-
name string
649-
nonNilReplicasCount *int32
650-
}{
651-
{
652-
name: "replicas count is positive int",
653-
nonNilReplicasCount: ptr.To(int32(2)),
654-
},
655-
{
656-
name: "replicas count is zero",
657-
nonNilReplicasCount: ptr.To(int32(0)),
658-
},
659-
}
660-
for _, tc := range testcases {
661-
t.Run(tc.name, func(t *testing.T) {
662-
dp := dpMinimal.DeepCopy()
663-
dp.Spec.Template.ObjectMeta.Labels = map[string]string{"app": "scheduler"}
664-
initialNodeAffinity := &corev1.NodeAffinity{
665-
RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{
666-
NodeSelectorTerms: []corev1.NodeSelectorTerm{
667-
{
668-
MatchExpressions: []corev1.NodeSelectorRequirement{
669-
{
670-
Key: "node-role.kubernetes.io/worker",
671-
Operator: corev1.NodeSelectorOpExists,
672-
Values: []string{""},
673-
},
674-
},
675-
},
676-
},
677-
},
678-
}
679-
dp.Spec.Template.Spec.Affinity = &corev1.Affinity{
680-
NodeAffinity: initialNodeAffinity,
681-
PodAntiAffinity: &corev1.PodAntiAffinity{
682-
RequiredDuringSchedulingIgnoredDuringExecution: []corev1.PodAffinityTerm{
683-
{
684-
LabelSelector: &metav1.LabelSelector{
685-
MatchLabels: map[string]string{"app": "scheduler"},
686-
},
687-
},
688-
},
689-
},
690-
}
691-
err := DeploymentAffinityWithStrategy(dp, nropv1.NUMAResourcesSchedulerSpec{Replicas: tc.nonNilReplicasCount})
692-
if err != nil {
693-
t.Fatalf("unexpected error: %v", err)
694-
}
695-
// check that the node affinity is preserved
696-
if diff := cmp.Diff(initialNodeAffinity, dp.Spec.Template.Spec.Affinity.NodeAffinity); diff != "" {
697-
t.Errorf("should preserve existing NodeAffinity: affinity mismatch (-expected +got):\n%s", diff)
698-
}
699-
if dp.Spec.Template.Spec.Affinity.PodAntiAffinity != nil {
700-
t.Fatalf("expected podAntiAffinity to be reset but got %v", dp.Spec.Template.Spec.Affinity.PodAntiAffinity)
701-
}
702-
if dp.Spec.Strategy != (appsv1.DeploymentStrategy{}) {
703-
t.Fatalf("expected strategy to be reset but got %v", dp.Spec.Strategy)
704-
}
705-
})
643+
dp.Spec.Template.Labels = map[string]string{}
644+
if err := DeploymentTopologySpreadConstraints(dp); err == nil {
645+
t.Fatalf("expected error but got nil")
706646
}
707647
}
708648

709-
func TestDeploymentAffinityWithStrategyWithOverride(t *testing.T) {
710-
labels := map[string]string{"app": "scheduler"}
711-
expectedStrategy := appsv1.DeploymentStrategy{
712-
Type: appsv1.RollingUpdateDeploymentStrategyType,
713-
RollingUpdate: &appsv1.RollingUpdateDeployment{
714-
MaxUnavailable: ptr.To(intstr.FromInt(1)),
715-
MaxSurge: ptr.To(intstr.FromInt(0)),
716-
},
717-
}
718-
expectedPodAntiAffinity := &corev1.PodAntiAffinity{
719-
RequiredDuringSchedulingIgnoredDuringExecution: []corev1.PodAffinityTerm{
720-
{
721-
LabelSelector: &metav1.LabelSelector{
722-
MatchLabels: labels,
723-
},
724-
TopologyKey: "kubernetes.io/hostname",
725-
},
726-
},
727-
}
649+
func TestDeploymentTopologySpreadConstraints(t *testing.T) {
728650
dp := dpMinimal.DeepCopy()
729-
dp.Spec.Template.ObjectMeta.Labels = labels
730-
731-
err := DeploymentAffinityWithStrategy(dp, nropv1.NUMAResourcesSchedulerSpec{})
732-
if err != nil {
733-
t.Fatalf("unexpected error: %v", err)
734-
}
735-
736-
if diff := cmp.Diff(expectedPodAntiAffinity, dp.Spec.Template.Spec.Affinity.PodAntiAffinity); diff != "" {
737-
t.Errorf("affinity mismatch (-expected +got):\n%s", diff)
738-
}
739-
if diff := cmp.Diff(expectedStrategy, dp.Spec.Strategy); diff != "" {
740-
t.Errorf("strategy mismatch (-expected +got):\n%s", diff)
651+
dp.Spec.Template.Labels = map[string]string{
652+
"app": "numaresources-scheduler",
741653
}
742-
743-
// check reset works
744-
err = DeploymentAffinityWithStrategy(dp, nropv1.NUMAResourcesSchedulerSpec{Replicas: ptr.To(int32(2))})
745-
if err != nil {
654+
if err := DeploymentTopologySpreadConstraints(dp); err != nil {
746655
t.Fatalf("unexpected error: %v", err)
747656
}
748-
749-
if dp.Spec.Template.Spec.Affinity != nil && dp.Spec.Template.Spec.Affinity.PodAntiAffinity != nil {
750-
t.Fatalf("Override failed: expected no podAntiAffinity but got %v", dp.Spec.Template.Spec.Affinity.PodAntiAffinity)
657+
expectedConstraints := []corev1.TopologySpreadConstraint{
658+
{
659+
LabelSelector: &metav1.LabelSelector{
660+
MatchLabels: dp.Spec.Template.Labels,
661+
},
662+
MaxSkew: 1,
663+
TopologyKey: "kubernetes.io/hostname",
664+
WhenUnsatisfiable: corev1.DoNotSchedule,
665+
MatchLabelKeys: []string{"pod-template-hash"},
666+
},
751667
}
752-
if dp.Spec.Strategy != (appsv1.DeploymentStrategy{}) {
753-
t.Fatalf("expected strategy to be reset but got %v", dp.Spec.Strategy)
668+
if diff := cmp.Diff(dp.Spec.Template.Spec.TopologySpreadConstraints, expectedConstraints); diff != "" {
669+
t.Errorf("constraints mismatch\n%s", diff)
754670
}
755671
}
756672

0 commit comments

Comments
 (0)