Skip to content

Commit 20956e7

Browse files
author
longsuizhi
committed
fix(allocator): address CR feedback - exclude released and terminal pods from live allocation
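- P1: Released pods that are no longer live do not count toward liveAllocated, preventing stale released entries from blocking subsequent re-allocations. Released pods that are still alive keep their slot until the recycle flow removes them.
- P2: Terminal pods (Failed, including Evicted, or Succeeded) and terminating pods (DeletionTimestamp set) are excluded from livePodSet, so pods in these states whose object is still present also trigger re-allocation. Running, Pending, and Unknown pods without a DeletionTimestamp remain live.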
1 parent d4b8e95 commit 20956e7

2 files changed

Lines changed: 117 additions & 16 deletions

File tree

kubernetes/internal/controller/allocator.go

Lines changed: 68 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -461,9 +461,18 @@ func (allocator *defaultAllocator) Schedule(ctx context.Context, spec *AllocSpec
461461
}
462462

463463
// Build a set of live pool pod names for dead-pod detection during allocation requests.
464+
// Running, Pending, and Unknown pods WITHOUT DeletionTimestamp are considered live.
465+
// Terminating pods (DeletionTimestamp set), terminal pods (Failed/Succeeded), or
466+
// deleted pods (object gone) should trigger re-allocation.
464467
livePodSet := make(map[string]struct{}, len(spec.Pods))
468+
// allPodSet tracks all pods that still have an object (including terminal/terminating ones).
469+
// Used to distinguish "deleted" (object gone) from "terminal/terminating" (object exists).
470+
allPodSet := make(map[string]struct{}, len(spec.Pods))
465471
for _, p := range spec.Pods {
466-
livePodSet[p.Name] = struct{}{}
472+
allPodSet[p.Name] = struct{}{}
473+
if p.Status.Phase != corev1.PodFailed && p.Status.Phase != corev1.PodSucceeded && p.DeletionTimestamp.IsZero() {
474+
livePodSet[p.Name] = struct{}{}
475+
}
467476
}
468477

469478
// Fetch pool allocation once and reuse it for both stale-sandbox cleanup and available-pod filtering.
@@ -479,7 +488,7 @@ func (allocator *defaultAllocator) Schedule(ctx context.Context, spec *AllocSpec
479488
// handles them without any special-casing outside this function.
480489
// Terminating sandboxes are handled inside getSandboxRequest: they receive no new supplement and
481490
// all unreleased pods are queued for release.
482-
allRequest, err := allocator.getAllRequest(ctx, spec.Sandboxes, podAllocation, livePodSet)
491+
allRequest, err := allocator.getAllRequest(ctx, spec.Sandboxes, podAllocation, livePodSet, allPodSet)
483492
if err != nil {
484493
return nil, err
485494
}
@@ -500,13 +509,13 @@ func (allocator *defaultAllocator) Schedule(ctx context.Context, spec *AllocSpec
500509
// orphan entries for pods in podAllocation whose sandbox is no longer in the sandboxes list
501510
// (e.g. force-deleted). Orphan entries carry PodSupplement=0 and ToRelease set to the orphan
502511
// pods so the normal recycle path handles them without special-casing in the caller.
503-
func (allocator *defaultAllocator) getAllRequest(ctx context.Context, sandboxes []*sandboxv1alpha1.BatchSandbox, podAllocation map[string]string, livePodSet map[string]struct{}) ([]*algorithm.SandboxRequest, error) {
512+
func (allocator *defaultAllocator) getAllRequest(ctx context.Context, sandboxes []*sandboxv1alpha1.BatchSandbox, podAllocation map[string]string, livePodSet map[string]struct{}, allPodSet map[string]struct{}) ([]*algorithm.SandboxRequest, error) {
504513
log := logf.FromContext(ctx)
505514
existingSandboxes := make(map[string]struct{}, len(sandboxes))
506515
allRequest := make([]*algorithm.SandboxRequest, 0, len(sandboxes))
507516
for _, sandbox := range sandboxes {
508517
existingSandboxes[sandbox.Name] = struct{}{}
509-
request, err := allocator.getSandboxRequest(ctx, sandbox, livePodSet)
518+
request, err := allocator.getSandboxRequest(ctx, sandbox, livePodSet, allPodSet)
510519
if err != nil {
511520
return nil, err
512521
}
@@ -529,7 +538,7 @@ func (allocator *defaultAllocator) getAllRequest(ctx context.Context, sandboxes
529538
return allRequest, nil
530539
}
531540

532-
func (allocator *defaultAllocator) getSandboxRequest(ctx context.Context, sandbox *sandboxv1alpha1.BatchSandbox, livePodSet map[string]struct{}) (*algorithm.SandboxRequest, error) {
541+
func (allocator *defaultAllocator) getSandboxRequest(ctx context.Context, sandbox *sandboxv1alpha1.BatchSandbox, livePodSet map[string]struct{}, allPodSet map[string]struct{}) (*algorithm.SandboxRequest, error) {
533542
log := logf.FromContext(ctx)
534543
allocated, err := allocator.GetSandboxAllocation(ctx, sandbox)
535544
if err != nil {
@@ -545,29 +554,55 @@ func (allocator *defaultAllocator) getSandboxRequest(ctx context.Context, sandbo
545554
releasedSet[r] = struct{}{}
546555
}
547556

548-
// Filter out pods that no longer exist in the pool (e.g. externally deleted).
549-
// Dead pods are treated as released so the sandbox can receive replacement allocations.
557+
// Filter allocated pods into categories:
558+
// - liveAllocated: unreleased live pods + released-but-still-alive pods (count toward replica slots)
559+
// - deadPods: unreleased pods whose object is gone (externally deleted); queued for release
560+
// - terminalPods: unreleased pods that are terminal/terminating but still exist; queued for release
561+
// to ensure alloc-status cleanup and finalizer removal
562+
// Released pods that are no longer alive don't count (pool already freed them).
563+
// Released pods that are still alive keep their slot to avoid premature supplement
564+
// during active release operations (the recycle flow will eventually remove them).
550565
liveAllocated := make([]string, 0, len(allocated))
551566
deadPods := make([]string, 0)
567+
terminalPods := make([]string, 0)
552568
for _, p := range allocated {
553-
if _, exists := releasedSet[p]; exists {
554-
// Already released, keep in allocated for bookkeeping consistency.
555-
liveAllocated = append(liveAllocated, p)
569+
_, isReleased := releasedSet[p]
570+
_, isAlive := livePodSet[p]
571+
_, exists := allPodSet[p]
572+
if isReleased && !isAlive {
573+
// Released and gone/terminal — no longer occupies a slot.
556574
continue
557575
}
558-
if _, alive := livePodSet[p]; alive {
576+
if isReleased && isAlive {
577+
// Released but still alive — keep slot to avoid supplement during active release.
559578
liveAllocated = append(liveAllocated, p)
560-
} else {
561-
deadPods = append(deadPods, p)
579+
continue
580+
}
581+
// Not released:
582+
if !isAlive {
583+
if !exists {
584+
// Pod object is completely gone — externally deleted.
585+
deadPods = append(deadPods, p)
586+
} else {
587+
// Exists but terminal/terminating — needs release for alloc-status cleanup.
588+
terminalPods = append(terminalPods, p)
589+
}
590+
continue
562591
}
592+
liveAllocated = append(liveAllocated, p)
563593
}
564594
if len(deadPods) > 0 {
565595
log.Info("Detected dead allocated pods, queuing for release to trigger re-allocation",
566596
"sandbox", sandbox.Name, "deadPods", deadPods)
567597
}
598+
if len(terminalPods) > 0 {
599+
log.Info("Detected terminal/terminating allocated pods, queuing for release",
600+
"sandbox", sandbox.Name, "terminalPods", terminalPods)
601+
}
568602

569603
// Terminating sandboxes should not receive new allocations.
570-
// Queue all unreleased allocated pods for release and set supplement to zero.
604+
// Queue all unreleased allocated pods (live, dead, and terminal) for release and set supplement to zero.
605+
// Terminal pods must also be released to ensure the pool-allocation finalizer can be removed.
571606
if !sandbox.DeletionTimestamp.IsZero() {
572607
toRelease := make([]string, 0)
573608
for _, p := range liveAllocated {
@@ -576,6 +611,7 @@ func (allocator *defaultAllocator) getSandboxRequest(ctx context.Context, sandbo
576611
}
577612
}
578613
toRelease = append(toRelease, deadPods...)
614+
toRelease = append(toRelease, terminalPods...)
579615
if len(toRelease) > 0 {
580616
log.Info("Queuing terminating sandbox pods for release", "sandbox", sandbox.Name, "pods", toRelease)
581617
}
@@ -594,13 +630,29 @@ func (allocator *defaultAllocator) getSandboxRequest(ctx context.Context, sandbo
594630
}
595631

596632
toRelease := make([]string, 0)
633+
toReleaseSet := make(map[string]struct{})
597634
for _, r := range release {
598635
if _, exists := releasedSet[r]; !exists {
599636
toRelease = append(toRelease, r)
637+
toReleaseSet[r] = struct{}{}
638+
}
639+
}
640+
// Queue dead pods for release so their allocation records are cleaned up.
641+
// Deduplicate against pods already in alloc-release to avoid duplicate recycle operations.
642+
for _, dp := range deadPods {
643+
if _, exists := toReleaseSet[dp]; !exists {
644+
toRelease = append(toRelease, dp)
645+
toReleaseSet[dp] = struct{}{}
646+
}
647+
}
648+
// Queue terminal/terminating pods for release to clean them from alloc-status/store.
649+
// Without this, terminal pods accumulate as stale allocated slots forever.
650+
for _, tp := range terminalPods {
651+
if _, exists := toReleaseSet[tp]; !exists {
652+
toRelease = append(toRelease, tp)
653+
toReleaseSet[tp] = struct{}{}
600654
}
601655
}
602-
// Also queue dead pods for release so their allocation records are cleaned up.
603-
toRelease = append(toRelease, deadPods...)
604656

605657
replica := int32(0)
606658
if sandbox.Spec.Replicas != nil {

kubernetes/internal/controller/allocator_test.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,55 @@ func TestSchedule(t *testing.T) {
259259
PodSupplement: 0,
260260
},
261261
},
262+
{
263+
name: "released pod does not block second re-allocation",
264+
spec: &AllocSpec{
265+
Pods: []*corev1.Pod{
266+
// pod1 was previously allocated and released (recycled).
267+
// pod2 was the replacement but is now also deleted.
268+
// pod3 is a new available pod.
269+
{ObjectMeta: metav1.ObjectMeta{Name: "pod3"}, Status: corev1.PodStatus{Phase: corev1.PodRunning, Conditions: []corev1.PodCondition{{Type: corev1.PodReady, Status: corev1.ConditionTrue}}}},
270+
},
271+
Pool: &sandboxv1alpha1.Pool{ObjectMeta: metav1.ObjectMeta{Name: "pool1"}},
272+
Sandboxes: []*sandboxv1alpha1.BatchSandbox{
273+
{ObjectMeta: metav1.ObjectMeta{Name: "sbx1"}, Spec: sandboxv1alpha1.BatchSandboxSpec{Replicas: &replica1}},
274+
},
275+
},
276+
poolAlloc: &PoolAllocation{PodAllocation: map[string]string{"pod1": "sbx1", "pod2": "sbx1"}},
277+
sandboxAllocs: map[string]*SandboxAllocation{"sbx1": {Pods: []string{"pod1", "pod2"}}},
278+
releases: map[string]*AllocationRelease{"sbx1": {Pods: []string{}}},
279+
released: map[string]*AllocationReleased{"sbx1": {Pods: []string{"pod1"}}},
280+
wantAction: &algorithm.AllocAction{
281+
ToAllocate: map[string][]string{"sbx1": {"pod3"}},
282+
ToRelease: map[string][]string{"sbx1": {"pod2"}},
283+
PodSupplement: 0,
284+
},
285+
},
286+
{
287+
name: "terminal pod (Failed) triggers re-allocation",
288+
spec: &AllocSpec{
289+
Pods: []*corev1.Pod{
290+
// pod1 exists but is in Failed state — should not count as live.
291+
{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}, Status: corev1.PodStatus{Phase: corev1.PodFailed}},
292+
// pod2 is ready and available.
293+
{ObjectMeta: metav1.ObjectMeta{Name: "pod2"}, Status: corev1.PodStatus{Phase: corev1.PodRunning, Conditions: []corev1.PodCondition{{Type: corev1.PodReady, Status: corev1.ConditionTrue}}}},
294+
},
295+
Pool: &sandboxv1alpha1.Pool{ObjectMeta: metav1.ObjectMeta{Name: "pool1"}},
296+
Sandboxes: []*sandboxv1alpha1.BatchSandbox{
297+
{ObjectMeta: metav1.ObjectMeta{Name: "sbx1"}, Spec: sandboxv1alpha1.BatchSandboxSpec{Replicas: &replica1}},
298+
},
299+
},
300+
poolAlloc: &PoolAllocation{PodAllocation: map[string]string{"pod1": "sbx1"}},
301+
sandboxAllocs: map[string]*SandboxAllocation{"sbx1": {Pods: []string{"pod1"}}},
302+
releases: map[string]*AllocationRelease{"sbx1": {Pods: []string{}}},
303+
released: map[string]*AllocationReleased{"sbx1": {Pods: []string{}}},
304+
wantAction: &algorithm.AllocAction{
305+
// Terminal pod is queued for release to clean alloc-status/store.
306+
ToAllocate: map[string][]string{"sbx1": {"pod2"}},
307+
ToRelease: map[string][]string{"sbx1": {"pod1"}},
308+
PodSupplement: 0,
309+
},
310+
},
262311
{
263312
name: "orphan sandbox - pods in store but sandbox no longer in spec",
264313
spec: &AllocSpec{

0 commit comments

Comments
 (0)