Skip to content

Commit 8b9fc49

Browse files
pkg/clusteragent/autoscaling/cluster/spot: fix the race between pod updates and workload opt-out
Refactor podTracker to always use the up-to-date spot config and to untrack a workload once it is no longer spot-eligible (opted out). Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent ff0a97c commit 8b9fc49

1 file changed

Lines changed: 39 additions & 18 deletions

File tree

pkg/clusteragent/autoscaling/cluster/spot/pod_tracker.go

Lines changed: 39 additions & 18 deletions
Original file line number | Diff line number | Diff line change
@@ -76,8 +76,10 @@ func (t *podTracker) admitNewPod(o podOwnership) bool {
7676
t.mu.Lock()
7777
defer t.mu.Unlock()
7878

79-
ps := t.getOrCreatePodSetLocked(o)
80-
t.refreshConfigLocked(o.topLevelOwner, ps)
79+
ps, ok := t.getPodSetLocked(o)
80+
if !ok {
81+
return false
82+
}
8183

8284
total := ps.totalCount()
8385
spot := ps.spotCount()
@@ -103,7 +105,9 @@ func (t *podTracker) admitNewOnDemandPod(o podOwnership) {
103105
t.mu.Lock()
104106
defer t.mu.Unlock()
105107

106-
t.getOrCreatePodSetLocked(o).admit(false)
108+
if ps, ok := t.getPodSetLocked(o); ok {
109+
ps.admit(false)
110+
}
107111
}
108112

109113
// addedOrUpdated updates tracking state when a pod is added or updated.
@@ -128,7 +132,11 @@ func (t *podTracker) addedOrUpdated(pod *workloadmeta.KubernetesPod) {
128132
return
129133
}
130134

131-
t.getOrCreatePodSetLocked(o).track(pod.ID, isSpot, podInfo{name: pod.Name, phase: pod.Phase}, t.clock.Now())
135+
ps, ok := t.getPodSetLocked(o)
136+
if !ok {
137+
return
138+
}
139+
ps.track(pod.ID, isSpot, podInfo{name: pod.Name, phase: pod.Phase}, t.clock.Now())
132140

133141
if isSpot {
134142
if pod.Phase == string(corev1.PodPending) {
@@ -182,28 +190,31 @@ func (t *podTracker) deletePodLocked(o podOwnership, uid string) {
182190
delete(t.pendingSpotPods, uid)
183191
}
184192

185-
// refreshConfigLocked refreshes the spot config for the ownerPodSet from the configSource.
193+
// getPodSetLocked returns the ownerPodSet for the given ownership, creating it if absent,
194+
// and always applies the latest config from configSource to the returned pod set.
195+
// When configSource has no entry for the top-level owner it removes
196+
// tracking state for the owner and returns (nil, false).
186197
// Must be called with t.mu held.
187-
func (t *podTracker) refreshConfigLocked(topLevelOwner objectRef, ps *ownerPodSet) {
188-
if cfg, ok := t.configSource(topLevelOwner); ok {
189-
ps.config = cfg
198+
func (t *podTracker) getPodSetLocked(o podOwnership) (*ownerPodSet, bool) {
199+
cfg, hasConfig := t.configSource(o.topLevelOwner)
200+
if !hasConfig {
201+
t.untrackLocked(o.topLevelOwner)
202+
return nil, false
190203
}
191-
}
192204

193-
// getOrCreatePodSetLocked returns the ownerPodSet for the given ownership, creating it if absent.
194-
// Must be called with t.mu held.
195-
func (t *podTracker) getOrCreatePodSetLocked(o podOwnership) *ownerPodSet {
196205
owners, ok := t.podSets[o.topLevelOwner]
197206
if !ok {
198207
owners = make(map[objectRef]*ownerPodSet)
199208
t.podSets[o.topLevelOwner] = owners
200209
}
201-
if ps, ok := owners[o.directOwner]; ok {
202-
return ps
210+
211+
ps, exists := owners[o.directOwner]
212+
if !exists {
213+
ps = t.newOwnerPodSet()
214+
owners[o.directOwner] = ps
203215
}
204-
ps := t.newOwnerPodSet()
205-
owners[o.directOwner] = ps
206-
return ps
216+
ps.config = cfg
217+
return ps, true
207218
}
208219

209220
// getPodToDelete returns the uid, name, and namespace of a pod to delete to make progress toward
@@ -217,8 +228,13 @@ func (t *podTracker) getPodToDelete(rebalanceStabilizationPeriod time.Duration)
217228
now := t.clock.Now()
218229
lastUpdatedBefore := now.Add(-rebalanceStabilizationPeriod)
219230
for topLevel, owners := range t.podSets {
231+
cfg, ok := t.configSource(topLevel)
232+
if !ok {
233+
t.untrackLocked(topLevel)
234+
continue
235+
}
220236
for owner, ps := range owners {
221-
t.refreshConfigLocked(topLevel, ps)
237+
ps.config = cfg
222238
if ps.config.isDisabled(now) {
223239
continue
224240
}
@@ -256,7 +272,12 @@ func (t *podTracker) deletePendingSpotPod(uid string) {
256272
func (t *podTracker) untrack(topLevelOwner objectRef) {
257273
t.mu.Lock()
258274
defer t.mu.Unlock()
275+
t.untrackLocked(topLevelOwner)
276+
}
259277

278+
// untrackLocked removes all tracking state for the given top-level owner.
279+
// Must be called with t.mu held.
280+
func (t *podTracker) untrackLocked(topLevelOwner objectRef) {
260281
delete(t.podSets, topLevelOwner)
261282
for uid, p := range t.pendingSpotPods {
262283
if p.topLevelOwner == topLevelOwner {

0 commit comments

Comments
 (0)