Skip to content

Commit fdf6634

Browse files
pkg/clusteragent/autoscaling/cluster/spot: fix the race between pod updates and workload opt-out (#49880)
### What does this PR do? Refactor podTracker to use up-to-date spot config and untrack a workload when it is no longer spot eligible (opted out). ### Motivation This fixes the race between pod and workload updates, where pod updates may arrive after the workload was opted out and silently recreate the pod set in the tracker. ### Describe how you validated your changes Reproduced the race in main via: ``` while go test -tags "kubeapiserver test" -count=1 ./pkg/clusteragent/autoscaling/cluster/spot/... ; do echo OK ; done ``` and verified it does not reproduce with the changes. ### Additional Notes To reproduce the race in tests, WLM updates are delayed. Test scenarios use a fake clock to speed up on-demand fallback and rebalancing. The rebalancer skips an iteration when there are pending pods or in-flight admissions; therefore, the tests are updated to step the fake clock only after workload updates have settled. Also refactors manual pod creation/deletion into the fakeDeployment Reconcile and ScaleDown methods. Updates https://datadoghq.atlassian.net/browse/CASCL-1274 Co-authored-by: alexander.yastrebov <alexander.yastrebov@datadoghq.com>
1 parent faaf02e commit fdf6634

5 files changed

Lines changed: 266 additions & 98 deletions

File tree

pkg/clusteragent/autoscaling/cluster/spot/cluster_test.go

Lines changed: 53 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ type fakeDeployment struct {
7777
name string
7878
existingReplicaSet string
7979
podSelector map[string]string
80+
replicas int
8081
}
8182

8283
// newFakeCluster creates a fakeCluster.
@@ -135,8 +136,8 @@ func (c *fakeCluster) OnPodDeleted(hook deletionHook) {
135136
c.podDeletedHooks = append(c.podDeletedHooks, hook)
136137
}
137138

138-
// CreatePod runs all registered admission hooks on the pod then creates it as Pending.
139-
func (c *fakeCluster) CreatePod(pod *corev1.Pod) {
139+
// createPod runs all registered admission hooks on the pod then creates it as Pending.
140+
func (c *fakeCluster) createPod(pod *corev1.Pod) {
140141
unmodifiedCopy := pod.DeepCopy()
141142
for _, hook := range c.podCreatedHooks {
142143
updated, err := hook(pod)
@@ -218,9 +219,10 @@ func (c *fakeCluster) T() *testing.T {
218219
return c.t
219220
}
220221

221-
// WLM returns the underlying workloadmeta mock store.
222-
func (c *fakeCluster) WLM() workloadmetamock.Mock {
223-
return c.wlm
222+
// WLM returns the workloadmeta component used by the scheduler.
223+
func (c *fakeCluster) WLM() workloadmeta.Component {
224+
// Delay updates delivery to expose race conditions
225+
return newDelayedWLM(c.wlm, 50*time.Millisecond)
224226
}
225227

226228
func (c *fakeCluster) DynamicClient() dynamic.Interface {
@@ -416,16 +418,61 @@ func (d *fakeDeployment) rolloutWithDelay(replicas int) string {
416418
// A new ReplicaSet created
417419
newReplicaSet := replicaSetName(d.name)
418420
for range replicas {
419-
d.cluster.CreatePod(newPod(d.namespace, kubernetes.ReplicaSetKind, newReplicaSet, d.podSelector))
421+
d.cluster.createPod(newPod(d.namespace, kubernetes.ReplicaSetKind, newReplicaSet, d.podSelector))
420422
}
421423
// Existing ReplicaSet is scaled down
422424
if d.existingReplicaSet != "" {
423425
d.cluster.DeleteOwnerPods(kubernetes.ReplicaSetKind, d.namespace, d.existingReplicaSet)
424426
}
425427
d.existingReplicaSet = newReplicaSet
428+
d.replicas = replicas
426429
return newReplicaSet
427430
}
428431

432+
// Reconcile creates pods to bring the current ReplicaSet back to d.replicas,
433+
// counting existing non-terminal pods and creating only the missing number.
434+
func (d *fakeDeployment) Reconcile() {
435+
if d.existingReplicaSet == "" {
436+
return
437+
}
438+
pods := d.cluster.ListOwnerPods(kubernetes.ReplicaSetKind, d.namespace, d.existingReplicaSet)
439+
active := 0
440+
for _, pod := range pods {
441+
phase := corev1.PodPhase(pod.Phase)
442+
if phase != corev1.PodSucceeded && phase != corev1.PodFailed {
443+
active++
444+
}
445+
}
446+
for range max(0, d.replicas-active) {
447+
d.cluster.createPod(newPod(d.namespace, kubernetes.ReplicaSetKind, d.existingReplicaSet, d.podSelector))
448+
}
449+
}
450+
451+
// ScaleDown deletes pods selected by deleteFilter and updates the replica count to the number of remaining pods.
452+
func (d *fakeDeployment) ScaleDown(deleteFilter func([]*workloadmeta.KubernetesPod) []*workloadmeta.KubernetesPod) {
453+
t := d.cluster.t
454+
t.Helper()
455+
rs := d.ReplicaSet()
456+
pods := d.cluster.ListOwnerPods(kubernetes.ReplicaSetKind, d.namespace, rs)
457+
toDelete := deleteFilter(pods)
458+
459+
d.replicas = len(pods) - len(toDelete)
460+
461+
deleted := make(map[string]struct{}, len(toDelete))
462+
for _, pod := range toDelete {
463+
d.cluster.DeletePod(pod)
464+
deleted[pod.ID] = struct{}{}
465+
}
466+
require.Eventually(t, func() bool {
467+
for _, pod := range d.cluster.ListOwnerPods(kubernetes.ReplicaSetKind, d.namespace, rs) {
468+
if _, ok := deleted[pod.ID]; ok {
469+
return false
470+
}
471+
}
472+
return true
473+
}, 5*time.Second, 100*time.Millisecond)
474+
}
475+
429476
func async(f func(workloadmeta.Entity), e workloadmeta.Entity) {
430477
go func() {
431478
time.Sleep(time.Duration(10+rand.N(40)) * time.Millisecond)
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
// Unless explicitly stated otherwise all files in this repository are licensed
2+
// under the Apache License Version 2.0.
3+
// This product includes software developed at Datadog (https://www.datadoghq.com/).
4+
// Copyright 2026-present Datadog, Inc.
5+
6+
//go:build kubeapiserver
7+
8+
package spot_test
9+
10+
import (
11+
"sync"
12+
"time"
13+
14+
workloadmeta "github.com/DataDog/datadog-agent/comp/core/workloadmeta/def"
15+
)
16+
17+
// delayedWLM wraps a workloadmeta.Component and introduces a fixed delay before
18+
// delivering each event bundle to subscribers.
19+
type delayedWLM struct {
20+
workloadmeta.Component
21+
delay time.Duration
22+
23+
mu sync.Mutex
24+
channels map[chan workloadmeta.EventBundle]delayedSubscription
25+
}
26+
27+
type delayedSubscription struct {
28+
realCh chan workloadmeta.EventBundle
29+
done chan struct{}
30+
}
31+
32+
func newDelayedWLM(component workloadmeta.Component, delay time.Duration) *delayedWLM {
33+
return &delayedWLM{
34+
Component: component,
35+
delay: delay,
36+
channels: make(map[chan workloadmeta.EventBundle]delayedSubscription),
37+
}
38+
}
39+
40+
// Subscribe returns a channel that receives the same event bundles as the real
41+
// WLM channel but delayed by d.delay.
42+
func (d *delayedWLM) Subscribe(name string, priority workloadmeta.SubscriberPriority, filter *workloadmeta.Filter) chan workloadmeta.EventBundle {
43+
realCh := d.Component.Subscribe(name, priority, filter)
44+
wrappedCh := make(chan workloadmeta.EventBundle, 100)
45+
done := make(chan struct{})
46+
47+
d.mu.Lock()
48+
d.channels[wrappedCh] = delayedSubscription{realCh: realCh, done: done}
49+
d.mu.Unlock()
50+
51+
go func() {
52+
defer close(wrappedCh)
53+
for {
54+
select {
55+
case bundle, ok := <-realCh:
56+
if !ok {
57+
return
58+
}
59+
time.Sleep(d.delay)
60+
select {
61+
case wrappedCh <- bundle:
62+
case <-done:
63+
return
64+
}
65+
case <-done:
66+
return
67+
}
68+
}
69+
}()
70+
71+
return wrappedCh
72+
}
73+
74+
// Unsubscribe stops the forwarding goroutine and unsubscribes from the real WLM.
75+
func (d *delayedWLM) Unsubscribe(ch chan workloadmeta.EventBundle) {
76+
d.mu.Lock()
77+
sub, ok := d.channels[ch]
78+
if ok {
79+
delete(d.channels, ch)
80+
}
81+
d.mu.Unlock()
82+
83+
if ok {
84+
close(sub.done)
85+
d.Component.Unsubscribe(sub.realCh)
86+
}
87+
}

pkg/clusteragent/autoscaling/cluster/spot/pod_tracker.go

Lines changed: 45 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,10 @@ func (t *podTracker) admitNewPod(o podOwnership) bool {
7676
t.mu.Lock()
7777
defer t.mu.Unlock()
7878

79-
ps := t.getOrCreatePodSetLocked(o)
80-
t.refreshConfigLocked(o.topLevelOwner, ps)
79+
ps, ok := t.getPodSetLocked(o)
80+
if !ok {
81+
return false
82+
}
8183

8284
total := ps.totalCount()
8385
spot := ps.spotCount()
@@ -103,7 +105,9 @@ func (t *podTracker) admitNewOnDemandPod(o podOwnership) {
103105
t.mu.Lock()
104106
defer t.mu.Unlock()
105107

106-
t.getOrCreatePodSetLocked(o).admit(false)
108+
if ps, ok := t.getPodSetLocked(o); ok {
109+
ps.admit(false)
110+
}
107111
}
108112

109113
// addedOrUpdated updates tracking state when a pod is added or updated.
@@ -128,7 +132,11 @@ func (t *podTracker) addedOrUpdated(pod *workloadmeta.KubernetesPod) {
128132
return
129133
}
130134

131-
t.getOrCreatePodSetLocked(o).track(pod.ID, isSpot, podInfo{name: pod.Name, phase: pod.Phase}, t.clock.Now())
135+
ps, ok := t.getPodSetLocked(o)
136+
if !ok {
137+
return
138+
}
139+
ps.track(pod.ID, isSpot, podInfo{name: pod.Name, phase: pod.Phase}, t.clock.Now())
132140

133141
if isSpot {
134142
if pod.Phase == string(corev1.PodPending) {
@@ -182,28 +190,31 @@ func (t *podTracker) deletePodLocked(o podOwnership, uid string) {
182190
delete(t.pendingSpotPods, uid)
183191
}
184192

185-
// refreshConfigLocked refreshes the spot config for the ownerPodSet from the configSource.
193+
// getPodSetLocked returns the ownerPodSet for the given ownership, creating it if absent,
194+
// and always applies the latest config from configSource to the returned pod set.
195+
// When configSource has no entry for the top-level owner it removes
196+
// tracking state for the owner and returns (nil, false).
186197
// Must be called with t.mu held.
187-
func (t *podTracker) refreshConfigLocked(topLevelOwner objectRef, ps *ownerPodSet) {
188-
if cfg, ok := t.configSource(topLevelOwner); ok {
189-
ps.config = cfg
198+
func (t *podTracker) getPodSetLocked(o podOwnership) (*ownerPodSet, bool) {
199+
cfg, hasConfig := t.configSource(o.topLevelOwner)
200+
if !hasConfig {
201+
t.untrackLocked(o.topLevelOwner)
202+
return nil, false
190203
}
191-
}
192204

193-
// getOrCreatePodSetLocked returns the ownerPodSet for the given ownership, creating it if absent.
194-
// Must be called with t.mu held.
195-
func (t *podTracker) getOrCreatePodSetLocked(o podOwnership) *ownerPodSet {
196205
owners, ok := t.podSets[o.topLevelOwner]
197206
if !ok {
198207
owners = make(map[objectRef]*ownerPodSet)
199208
t.podSets[o.topLevelOwner] = owners
200209
}
201-
if ps, ok := owners[o.directOwner]; ok {
202-
return ps
210+
211+
ps, exists := owners[o.directOwner]
212+
if !exists {
213+
ps = t.newOwnerPodSet()
214+
owners[o.directOwner] = ps
203215
}
204-
ps := t.newOwnerPodSet()
205-
owners[o.directOwner] = ps
206-
return ps
216+
ps.config = cfg
217+
return ps, true
207218
}
208219

209220
// getPodToDelete returns the uid, name, and namespace of a pod to delete to make progress toward
@@ -217,8 +228,13 @@ func (t *podTracker) getPodToDelete(rebalanceStabilizationPeriod time.Duration)
217228
now := t.clock.Now()
218229
lastUpdatedBefore := now.Add(-rebalanceStabilizationPeriod)
219230
for topLevel, owners := range t.podSets {
231+
cfg, ok := t.configSource(topLevel)
232+
if !ok {
233+
t.untrackLocked(topLevel)
234+
continue
235+
}
220236
for owner, ps := range owners {
221-
t.refreshConfigLocked(topLevel, ps)
237+
ps.config = cfg
222238
if ps.config.isDisabled(now) {
223239
continue
224240
}
@@ -256,7 +272,12 @@ func (t *podTracker) deletePendingSpotPod(uid string) {
256272
func (t *podTracker) untrack(topLevelOwner objectRef) {
257273
t.mu.Lock()
258274
defer t.mu.Unlock()
275+
t.untrackLocked(topLevelOwner)
276+
}
259277

278+
// untrackLocked removes all tracking state for the given top-level owner.
279+
// Must be called with t.mu held.
280+
func (t *podTracker) untrackLocked(topLevelOwner objectRef) {
260281
delete(t.podSets, topLevelOwner)
261282
for uid, p := range t.pendingSpotPods {
262283
if p.topLevelOwner == topLevelOwner {
@@ -306,7 +327,7 @@ func (ps *ownerPodSet) track(uid string, isSpot bool, info podInfo, now time.Tim
306327
// getPodToDelete returns the uid and name of a pod to delete to make progress toward the desired config.
307328
// It returns empty strings if no deletion is needed.
308329
func (ps *ownerPodSet) getPodToDelete(lastUpdatedBefore time.Time) (string, string) {
309-
if ps.admissionSpotCount > 0 || ps.admissionOnDemandCount > 0 {
330+
if ps.hasAdmissions() {
310331
return "", ""
311332
}
312333

@@ -340,6 +361,11 @@ func (ps *ownerPodSet) getPodToDelete(lastUpdatedBefore time.Time) (string, stri
340361
return "", ""
341362
}
342363

364+
// hasAdmissions returns true if pod set has admitted but not yet tracked pods.
365+
func (ps *ownerPodSet) hasAdmissions() bool {
366+
return ps.admissionSpotCount > 0 || ps.admissionOnDemandCount > 0
367+
}
368+
343369
// hasPending returns true if any tracked pod is in PodPending phase.
344370
func (ps *ownerPodSet) hasPending() bool {
345371
const pending = string(corev1.PodPending)

0 commit comments

Comments (0)