Skip to content

Commit ff0a97c

Browse files
pkg/clusteragent/autoscaling/cluster/spot: wait for tracker to settle before advancing fake clock
Test scenarios use a fake clock to speed up on-demand fallback and rebalancing. The rebalancer skips an iteration when there are pending pods or in-flight admissions; therefore, update tests to step the fake clock only after workload updates have settled. Also refactor manual pod creation/deletion into fakeDeployment Reconcile and ScaleDown methods. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 34e34bb commit ff0a97c

4 files changed

Lines changed: 136 additions & 77 deletions

File tree

pkg/clusteragent/autoscaling/cluster/spot/cluster_test.go

Lines changed: 49 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ type fakeDeployment struct {
7777
name string
7878
existingReplicaSet string
7979
podSelector map[string]string
80+
replicas int
8081
}
8182

8283
// newFakeCluster creates a fakeCluster.
@@ -135,8 +136,8 @@ func (c *fakeCluster) OnPodDeleted(hook deletionHook) {
135136
c.podDeletedHooks = append(c.podDeletedHooks, hook)
136137
}
137138

138-
// CreatePod runs all registered admission hooks on the pod then creates it as Pending.
139-
func (c *fakeCluster) CreatePod(pod *corev1.Pod) {
139+
// createPod runs all registered admission hooks on the pod then creates it as Pending.
140+
func (c *fakeCluster) createPod(pod *corev1.Pod) {
140141
unmodifiedCopy := pod.DeepCopy()
141142
for _, hook := range c.podCreatedHooks {
142143
updated, err := hook(pod)
@@ -417,16 +418,61 @@ func (d *fakeDeployment) rolloutWithDelay(replicas int) string {
417418
// A new ReplicaSet created
418419
newReplicaSet := replicaSetName(d.name)
419420
for range replicas {
420-
d.cluster.CreatePod(newPod(d.namespace, kubernetes.ReplicaSetKind, newReplicaSet, d.podSelector))
421+
d.cluster.createPod(newPod(d.namespace, kubernetes.ReplicaSetKind, newReplicaSet, d.podSelector))
421422
}
422423
// Existing ReplicaSet is scaled down
423424
if d.existingReplicaSet != "" {
424425
d.cluster.DeleteOwnerPods(kubernetes.ReplicaSetKind, d.namespace, d.existingReplicaSet)
425426
}
426427
d.existingReplicaSet = newReplicaSet
428+
d.replicas = replicas
427429
return newReplicaSet
428430
}
429431

432+
// Reconcile creates pods to bring the current ReplicaSet back to d.replicas,
433+
// counting existing non-terminal pods and creating only the missing number.
434+
func (d *fakeDeployment) Reconcile() {
435+
if d.existingReplicaSet == "" {
436+
return
437+
}
438+
pods := d.cluster.ListOwnerPods(kubernetes.ReplicaSetKind, d.namespace, d.existingReplicaSet)
439+
active := 0
440+
for _, pod := range pods {
441+
phase := corev1.PodPhase(pod.Phase)
442+
if phase != corev1.PodSucceeded && phase != corev1.PodFailed {
443+
active++
444+
}
445+
}
446+
for range max(0, d.replicas-active) {
447+
d.cluster.createPod(newPod(d.namespace, kubernetes.ReplicaSetKind, d.existingReplicaSet, d.podSelector))
448+
}
449+
}
450+
451+
// ScaleDown deletes pods selected by deleteFilter and updates the replica count to the number of remaining pods.
452+
func (d *fakeDeployment) ScaleDown(deleteFilter func([]*workloadmeta.KubernetesPod) []*workloadmeta.KubernetesPod) {
453+
t := d.cluster.t
454+
t.Helper()
455+
rs := d.ReplicaSet()
456+
pods := d.cluster.ListOwnerPods(kubernetes.ReplicaSetKind, d.namespace, rs)
457+
toDelete := deleteFilter(pods)
458+
459+
d.replicas = len(pods) - len(toDelete)
460+
461+
deleted := make(map[string]struct{}, len(toDelete))
462+
for _, pod := range toDelete {
463+
d.cluster.DeletePod(pod)
464+
deleted[pod.ID] = struct{}{}
465+
}
466+
require.Eventually(t, func() bool {
467+
for _, pod := range d.cluster.ListOwnerPods(kubernetes.ReplicaSetKind, d.namespace, rs) {
468+
if _, ok := deleted[pod.ID]; ok {
469+
return false
470+
}
471+
}
472+
return true
473+
}, 5*time.Second, 100*time.Millisecond)
474+
}
475+
430476
func async(f func(workloadmeta.Entity), e workloadmeta.Entity) {
431477
go func() {
432478
time.Sleep(time.Duration(10+rand.N(40)) * time.Millisecond)

pkg/clusteragent/autoscaling/cluster/spot/pod_tracker.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -306,7 +306,7 @@ func (ps *ownerPodSet) track(uid string, isSpot bool, info podInfo, now time.Tim
306306
// getPodToDelete returns the uid and name of a pod to delete to make progress toward the desired config.
307307
// It returns empty strings if no deletion is needed.
308308
func (ps *ownerPodSet) getPodToDelete(lastUpdatedBefore time.Time) (string, string) {
309-
if ps.admissionSpotCount > 0 || ps.admissionOnDemandCount > 0 {
309+
if ps.hasAdmissions() {
310310
return "", ""
311311
}
312312

@@ -340,6 +340,11 @@ func (ps *ownerPodSet) getPodToDelete(lastUpdatedBefore time.Time) (string, stri
340340
return "", ""
341341
}
342342

343+
// hasAdmissions returns true if pod set has admitted but not yet tracked pods.
344+
func (ps *ownerPodSet) hasAdmissions() bool {
345+
return ps.admissionSpotCount > 0 || ps.admissionOnDemandCount > 0
346+
}
347+
343348
// hasPending returns true if any tracked pod is in PodPending phase.
344349
func (ps *ownerPodSet) hasPending() bool {
345350
const pending = string(corev1.PodPending)

pkg/clusteragent/autoscaling/cluster/spot/scheduler_test.go

Lines changed: 63 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -208,34 +208,32 @@ func TestScenarios(t *testing.T) {
208208

209209
// When
210210
// ReplicaSet recreates pods
211-
for range 6 {
212-
cluster.CreatePod(newPod("default", kubernetes.ReplicaSetKind, rs, nil))
213-
}
211+
d.Reconcile()
214212

215213
// Then
216214
// Fallback to on-demand
217215
requireOwnerPods(cluster, kubernetes.ReplicaSetKind, "default", rs, expectRunningOnDemand(10))
218216

219217
// When
220218
cluster.AddSpotNode("new-spot")
221-
// Advance past disabled interval to re-enable spot scheduling
222-
clk.Step(s.Config().FallbackDuration)
219+
220+
// Advance past disabled interval to re-enable spot scheduling.
221+
stepClockAfterUpdatesSettled(t, s, clk, s.Config().FallbackDuration, "apps", kubernetes.DeploymentKind, d.namespace, d.name)
222+
223223
requireEventually(t, func() bool {
224224
return !s.IsSpotSchedulingDisabled("apps", kubernetes.DeploymentKind, d.namespace, d.name)
225225
})
226226

227227
// Rebalancing
228228
for i := range 6 {
229229
// When
230-
clk.Step(s.Config().RebalanceStabilizationPeriod)
230+
stepClockAfterUpdatesSettled(t, s, clk, s.Config().RebalanceStabilizationPeriod, "apps", kubernetes.DeploymentKind, d.namespace, d.name)
231231

232232
// Then: excess on-demand pod evicted
233233
requireOwnerPods(cluster, kubernetes.ReplicaSetKind, "default", rs, expectRunningOnDemand(10-1-i))
234234

235235
// ReplicaSet recreates pod
236-
cluster.CreatePod(newPod("default", kubernetes.ReplicaSetKind, rs, nil))
237-
// Important: wait for it to be Running before next step
238-
requireOwnerPods(cluster, kubernetes.ReplicaSetKind, "default", rs, expectRunningSpot(i+1))
236+
d.Reconcile()
239237
}
240238

241239
// Then
@@ -275,13 +273,12 @@ func TestScenarios(t *testing.T) {
275273

276274
cluster.DeletePod(pod)
277275
deleted[pod.ID] = struct{}{}
278-
279-
cluster.CreatePod(newPod("default", kubernetes.ReplicaSetKind, rs, nil))
280276
}
281-
282277
// Important: wait until deletion is complete before checking expectations to avoid counting deleted pods.
283278
requireOwnerPods(cluster, kubernetes.ReplicaSetKind, "default", rs, expectHasNoneOf(deleted))
284279

280+
d.Reconcile()
281+
285282
// Then
286283
requireOwnerPods(cluster, kubernetes.ReplicaSetKind, "default", rs, expectRunningSpot(expectedSpot))
287284
requireOwnerPods(cluster, kubernetes.ReplicaSetKind, "default", rs, expectRunningOnDemand(expectedOnDemand))
@@ -302,14 +299,15 @@ func TestScenarios(t *testing.T) {
302299
requireOwnerPods(cluster, kubernetes.ReplicaSetKind, "default", rs, expectRunningOnDemand(4))
303300

304301
// When: scale down to 5 replicas leaving 2 spot / 3 on-demand — ratio is off
305-
scaleDown(t, cluster, kubernetes.ReplicaSetKind, "default", rs, 2, 3)
302+
d.ScaleDown(keep(2, 3))
303+
304+
stepClockAfterUpdatesSettled(t, s, clk, s.Config().RebalanceStabilizationPeriod, "apps", kubernetes.DeploymentKind, d.namespace, d.name)
306305

307-
// When: rebalancing evicts the excess on-demand pod
308-
clk.Step(s.Config().RebalanceStabilizationPeriod)
306+
// Then: excess on-demand pod evicted
309307
requireOwnerPods(cluster, kubernetes.ReplicaSetKind, "default", rs, expectRunningOnDemand(2))
310308

311309
// ReplicaSet recreates the evicted pod as spot
312-
cluster.CreatePod(newPod("default", kubernetes.ReplicaSetKind, rs, nil))
310+
d.Reconcile()
313311

314312
// Then: 3 spot / 2 on-demand (60% of 5, minOnDemand=2 satisfied)
315313
requireOwnerPods(cluster, kubernetes.ReplicaSetKind, "default", rs, expectRunningSpot(3))
@@ -330,17 +328,19 @@ func TestScenarios(t *testing.T) {
330328
requireOwnerPods(cluster, kubernetes.ReplicaSetKind, "default", rs, expectRunningOnDemand(4))
331329

332330
// When: scale down to 5 replicas leaving 5 spot / 0 on-demand — on-demand count is below minOnDemand=2
333-
scaleDown(t, cluster, kubernetes.ReplicaSetKind, "default", rs, 5, 0)
331+
d.ScaleDown(keep(5, 0))
334332

335333
// Rebalancing evicts spot pods until on-demand count reaches minOnDemand=2.
336334
// Each evicted spot pod is recreated by the ReplicaSet as on-demand.
337335
for i := range 2 {
338-
clk.Step(s.Config().RebalanceStabilizationPeriod)
336+
// When
337+
stepClockAfterUpdatesSettled(t, s, clk, s.Config().RebalanceStabilizationPeriod, "apps", kubernetes.DeploymentKind, d.namespace, d.name)
338+
339+
// Then: excess spot pod evicted
339340
requireOwnerPods(cluster, kubernetes.ReplicaSetKind, "default", rs, expectRunningSpot(4-i))
340341

341342
// ReplicaSet recreates the evicted pod as on-demand (on-demand count still below minOnDemand)
342-
cluster.CreatePod(newPod("default", kubernetes.ReplicaSetKind, rs, nil))
343-
requireOwnerPods(cluster, kubernetes.ReplicaSetKind, "default", rs, expectRunningOnDemand(i+1))
343+
d.Reconcile()
344344
}
345345

346346
// Then: 3 spot / 2 on-demand (60% of 5, minOnDemand=2 satisfied)
@@ -364,14 +364,15 @@ func TestScenarios(t *testing.T) {
364364

365365
// When: scale down to 5 replicas leaving 4 spot / 1 on-demand — on-demand satisfies minOnDemand=1
366366
// but spot count exceeds the desired 3 (60% of 5).
367-
scaleDown(t, cluster, kubernetes.ReplicaSetKind, "default", rs, 4, 1)
367+
d.ScaleDown(keep(4, 1))
368368

369-
// When: rebalancing evicts the excess spot pod
370-
clk.Step(s.Config().RebalanceStabilizationPeriod)
369+
stepClockAfterUpdatesSettled(t, s, clk, s.Config().RebalanceStabilizationPeriod, "apps", kubernetes.DeploymentKind, d.namespace, d.name)
370+
371+
// Then: excess spot pod evicted
371372
requireOwnerPods(cluster, kubernetes.ReplicaSetKind, "default", rs, expectRunningSpot(3))
372373

373374
// ReplicaSet recreates the evicted pod as on-demand
374-
cluster.CreatePod(newPod("default", kubernetes.ReplicaSetKind, rs, nil))
375+
d.Reconcile()
375376

376377
// Then: 3 spot / 2 on-demand (60% of 5, minOnDemand=1 satisfied)
377378
requireOwnerPods(cluster, kubernetes.ReplicaSetKind, "default", rs, expectRunningSpot(3))
@@ -392,10 +393,11 @@ func TestScenarios(t *testing.T) {
392393
requireOwnerPods(cluster, kubernetes.ReplicaSetKind, "default", rs, expectRunningOnDemand(4))
393394

394395
// When: scale down to 5 replicas preserving the ratio — 3 spot / 2 on-demand (60% of 5)
395-
scaleDown(t, cluster, kubernetes.ReplicaSetKind, "default", rs, 3, 2)
396+
d.ScaleDown(keep(3, 2))
397+
398+
stepClockAfterUpdatesSettled(t, s, clk, s.Config().RebalanceStabilizationPeriod, "apps", kubernetes.DeploymentKind, d.namespace, d.name)
396399

397400
// Then: ratio is already correct; rebalancing does not evict any pod
398-
clk.Step(s.Config().RebalanceStabilizationPeriod)
399401
requireOwnerPods(cluster, kubernetes.ReplicaSetKind, "default", rs, expectRunningSpot(3))
400402
requireOwnerPods(cluster, kubernetes.ReplicaSetKind, "default", rs, expectRunningOnDemand(2))
401403
})
@@ -444,10 +446,13 @@ func TestScenarios(t *testing.T) {
444446

445447
// Then: rebalancer evicts one on-demand pod per cycle; RS recreates it as spot.
446448
for i := range expectedSpot {
447-
clk.Step(s.Config().RebalanceStabilizationPeriod)
449+
// When
450+
stepClockAfterUpdatesSettled(t, s, clk, s.Config().RebalanceStabilizationPeriod, "apps", kubernetes.DeploymentKind, d.namespace, d.name)
451+
452+
// Then: excess on-demand pod evicted
448453
requireOwnerPods(cluster, kubernetes.ReplicaSetKind, "default", rs, expectRunningOnDemand(replicas-1-i))
449-
cluster.CreatePod(newPod("default", kubernetes.ReplicaSetKind, rs, nil))
450-
requireOwnerPods(cluster, kubernetes.ReplicaSetKind, "default", rs, expectRunningSpot(i+1))
454+
455+
d.Reconcile()
451456
}
452457

453458
requireOwnerPods(cluster, kubernetes.ReplicaSetKind, "default", rs, expectRunningSpot(expectedSpot))
@@ -537,15 +542,6 @@ func TestScenarios(t *testing.T) {
537542
total, spot := s.TrackedCounts("apps", kubernetes.DeploymentKind, d.namespace, d.name)
538543
assert.Zero(t, total)
539544
assert.Zero(t, spot)
540-
541-
// When
542-
deleted := make(map[string]struct{}, 5)
543-
for _, pod := range cluster.ListOwnerPods(kubernetes.ReplicaSetKind, "default", rs) {
544-
cluster.DeletePod(pod)
545-
deleted[pod.ID] = struct{}{}
546-
}
547-
// Then
548-
requireOwnerPods(cluster, kubernetes.ReplicaSetKind, "default", rs, expectHasNoneOf(deleted))
549545
})
550546

551547
t.Run("Restarted scheduler tracks existing pods", func(t *testing.T) {
@@ -579,45 +575,37 @@ func TestScenarios(t *testing.T) {
579575
})
580576
}
581577

582-
// scaleDown simulates a Deployment scale-down by deleting pods to reach the expected spot/on-demand counts.
583-
func scaleDown(t *testing.T, cluster *fakeCluster, ownerKind, namespace, name string, expectSpot, expectOnDemand int) {
584-
t.Helper()
585-
pods := cluster.ListOwnerPods(ownerKind, namespace, name)
586-
587-
currentSpot, currentOnDemand := 0, 0
588-
for _, pod := range pods {
589-
if spot.IsSpotAssigned(pod) {
590-
currentSpot++
591-
} else {
592-
currentOnDemand++
593-
}
594-
}
595-
596-
require.GreaterOrEqual(t, currentSpot, expectSpot, "expectSpot=%d exceeds current spot count %d", expectSpot, currentSpot)
597-
require.GreaterOrEqual(t, currentOnDemand, expectOnDemand, "expectOnDemand=%d exceeds current on-demand count %d", expectOnDemand, currentOnDemand)
598-
599-
spotToDelete := currentSpot - expectSpot
600-
onDemandToDelete := currentOnDemand - expectOnDemand
601-
deleted := make(map[string]struct{})
602-
for _, pod := range pods {
603-
if spot.IsSpotAssigned(pod) {
604-
if spotToDelete > 0 {
605-
cluster.DeletePod(pod)
606-
deleted[pod.ID] = struct{}{}
607-
spotToDelete--
608-
}
609-
} else {
610-
if onDemandToDelete > 0 {
611-
cluster.DeletePod(pod)
612-
deleted[pod.ID] = struct{}{}
613-
onDemandToDelete--
578+
// keep returns a filter that retains given number of spot and on-demand pods.
579+
func keep(spotCount, onDemandCount int) func([]*workloadmeta.KubernetesPod) []*workloadmeta.KubernetesPod {
580+
return func(pods []*workloadmeta.KubernetesPod) []*workloadmeta.KubernetesPod {
581+
var toDelete []*workloadmeta.KubernetesPod
582+
for _, pod := range pods {
583+
if spot.IsSpotAssigned(pod) {
584+
if spotCount > 0 {
585+
spotCount--
586+
} else {
587+
toDelete = append(toDelete, pod)
588+
}
589+
} else {
590+
if onDemandCount > 0 {
591+
onDemandCount--
592+
} else {
593+
toDelete = append(toDelete, pod)
594+
}
614595
}
615596
}
597+
return toDelete
616598
}
599+
}
617600

618-
requireOwnerPods(cluster, ownerKind, namespace, name, expectHasNoneOf(deleted))
619-
requireOwnerPods(cluster, ownerKind, namespace, name, expectRunningSpot(expectSpot))
620-
requireOwnerPods(cluster, ownerKind, namespace, name, expectRunningOnDemand(expectOnDemand))
601+
// stepClockAfterUpdatesSettled waits for the pod tracker to have no in-flight admissions or pending pods
602+
// for the given workload, then advances the fake clock by duration.
603+
func stepClockAfterUpdatesSettled(t *testing.T, s *spot.TestScheduler, clk *clocktesting.FakeClock, duration time.Duration, group, kind, namespace, name string) {
604+
t.Helper()
605+
requireEventually(t, func() bool {
606+
return !s.HasAdmissionsOrPending(group, kind, namespace, name)
607+
})
608+
clk.Step(duration)
621609
}
622610

623611
func requireEventually(t *testing.T, condition func() bool, msgAndArgs ...any) {
@@ -647,6 +635,8 @@ func (h *spewStringer[T]) String() string {
647635

648636
// requireOwnerPods checks that all pods owned by ownerKind/namespace/ownerName eventually satisfy check.
649637
func requireOwnerPods(c *fakeCluster, ownerKind, namespace, ownerName string, check func(wlm []*workloadmeta.KubernetesPod) bool) {
638+
c.T().Helper()
639+
650640
pods := new(spewStringer[[]*workloadmeta.KubernetesPod])
651641
requireEventually(c.T(), func() bool {
652642
return check(pods.set(c.ListOwnerPods(ownerKind, namespace, ownerName)))

pkg/clusteragent/autoscaling/cluster/spot/testing_helpers.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,24 @@ func NewTestScheduler(config Config, clk clock.WithTicker, wlm workloadmeta.Comp
3535
return newScheduler(config, clk, wlm, evictorFunc, patcherFunc, dynamicClient, newWLMPodLister(wlm), isLeader)
3636
}
3737

38+
// HasAdmissionsOrPending reports whether the pod tracker has any in-flight admissions or pending pods
39+
// for the given workload. Used in tests to wait for pod tracker updates to propagate before advancing the clock.
40+
func (s *TestScheduler) HasAdmissionsOrPending(group, kind, namespace, name string) bool {
41+
s.tracker.mu.RLock()
42+
defer s.tracker.mu.RUnlock()
43+
w := objectRef{Group: group, Kind: kind, Namespace: namespace, Name: name}
44+
owners, ok := s.tracker.podSets[w]
45+
if !ok {
46+
return true
47+
}
48+
for _, ps := range owners {
49+
if ps.hasAdmissions() || ps.hasPending() {
50+
return true
51+
}
52+
}
53+
return false
54+
}
55+
3856
// TrackedCounts returns the total and spot tracked pod counts (including in-flight admissions) for the given workload.
3957
func (s *TestScheduler) TrackedCounts(group, kind, namespace, name string) (total, spot int) {
4058
s.tracker.mu.RLock()

0 commit comments

Comments
 (0)