From 3e0c9b5b666e03833984d9d5347a9452bfc99a99 Mon Sep 17 00:00:00 2001 From: longsuizhi <95574295+longsuizhi@users.noreply.github.com> Date: Thu, 28 May 2026 15:55:00 +0800 Subject: [PATCH 1/3] =?UTF-8?q?fix:=20=E4=BF=AE=E5=A4=8D=20Pool=20?= =?UTF-8?q?=E6=A8=A1=E5=BC=8F=E4=B8=8B=20Pod=20=E8=A2=AB=E5=88=A0=E9=99=A4?= =?UTF-8?q?=E5=90=8E=20BatchSandbox=20=E4=B8=8D=E4=BC=9A=E9=87=8D=E6=96=B0?= =?UTF-8?q?=E5=88=86=E9=85=8D=E6=96=B0=20Pod=20=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 当 Pool 中已分配给 BatchSandbox 的 Pod 被外部删除时,alloc-status 注解中仍保留 已删除 Pod 的名称,导致 supplement 计算为 0,无法触发重新分配。 本次修复在 getSandboxRequest 中增加了存活检测:将已删除的 Pod 从有效分配中 排除并加入 ToRelease 队列,使 supplement > 0 从而触发 Pool 重新分配新 Pod。 --- kubernetes/internal/controller/allocator.go | 46 +++++++++++++++++---- 1 file changed, 39 insertions(+), 7 deletions(-) diff --git a/kubernetes/internal/controller/allocator.go b/kubernetes/internal/controller/allocator.go index 008b14669..436c22242 100644 --- a/kubernetes/internal/controller/allocator.go +++ b/kubernetes/internal/controller/allocator.go @@ -460,6 +460,12 @@ func (allocator *defaultAllocator) Schedule(ctx context.Context, spec *AllocSpec return nil, err } + // Build a set of live pool pod names for dead-pod detection during allocation requests. + livePodSet := make(map[string]struct{}, len(spec.Pods)) + for _, p := range spec.Pods { + livePodSet[p.Name] = struct{}{} + } + // Fetch pool allocation once and reuse it for both stale-sandbox cleanup and available-pod filtering. // This avoids a double store read on every reconcile. podAllocation, err := allocator.GetPoolAllocation(ctx, spec.Pool) @@ -473,7 +479,7 @@ func (allocator *defaultAllocator) Schedule(ctx context.Context, spec *AllocSpec // handles them without any special-casing outside this function. // Terminating sandboxes are handled inside getSandboxRequest: they receive no new supplement and // all unreleased pods are queued for release. - allRequest, err := allocator.getAllRequest(ctx, spec.Sandboxes, podAllocation) + allRequest, err := allocator.getAllRequest(ctx, spec.Sandboxes, podAllocation, livePodSet) if err != nil { return nil, err } @@ -494,13 +500,13 @@ func (allocator *defaultAllocator) Schedule(ctx context.Context, spec *AllocSpec // orphan entries for pods in podAllocation whose sandbox is no longer in the sandboxes list // (e.g. force-deleted). Orphan entries carry PodSupplement=0 and ToRelease set to the orphan // pods so the normal recycle path handles them without special-casing in the caller. -func (allocator *defaultAllocator) getAllRequest(ctx context.Context, sandboxes []*sandboxv1alpha1.BatchSandbox, podAllocation map[string]string) ([]*algorithm.SandboxRequest, error) { +func (allocator *defaultAllocator) getAllRequest(ctx context.Context, sandboxes []*sandboxv1alpha1.BatchSandbox, podAllocation map[string]string, livePodSet map[string]struct{}) ([]*algorithm.SandboxRequest, error) { log := logf.FromContext(ctx) existingSandboxes := make(map[string]struct{}, len(sandboxes)) allRequest := make([]*algorithm.SandboxRequest, 0, len(sandboxes)) for _, sandbox := range sandboxes { existingSandboxes[sandbox.Name] = struct{}{} - request, err := allocator.getSandboxRequest(ctx, sandbox) + request, err := allocator.getSandboxRequest(ctx, sandbox, livePodSet) if err != nil { return nil, err } @@ -523,7 +529,7 @@ func (allocator *defaultAllocator) getAllRequest(ctx context.Context, sandboxes return allRequest, nil } -func (allocator *defaultAllocator) getSandboxRequest(ctx context.Context, sandbox *sandboxv1alpha1.BatchSandbox) (*algorithm.SandboxRequest, error) { +func (allocator *defaultAllocator) getSandboxRequest(ctx context.Context, sandbox *sandboxv1alpha1.BatchSandbox, livePodSet map[string]struct{}) (*algorithm.SandboxRequest, error) { log := logf.FromContext(ctx) allocated, err := allocator.GetSandboxAllocation(ctx, sandbox) if err != nil { @@ -539,15 +545,37 @@ func (allocator *defaultAllocator) getSandboxRequest(ctx context.Context, sandbo releasedSet[r] = struct{}{} } + // Filter out pods that no longer exist in the pool (e.g. externally deleted). + // Dead pods are treated as released so the sandbox can receive replacement allocations. + liveAllocated := make([]string, 0, len(allocated)) + deadPods := make([]string, 0) + for _, p := range allocated { + if _, exists := releasedSet[p]; exists { + // Already released, keep in allocated for bookkeeping consistency. + liveAllocated = append(liveAllocated, p) + continue + } + if _, alive := livePodSet[p]; alive { + liveAllocated = append(liveAllocated, p) + } else { + deadPods = append(deadPods, p) + } + } + if len(deadPods) > 0 { + log.Info("Detected dead allocated pods, queuing for release to trigger re-allocation", + "sandbox", sandbox.Name, "deadPods", deadPods) + } + // Terminating sandboxes should not receive new allocations. // Queue all unreleased allocated pods for release and set supplement to zero. if !sandbox.DeletionTimestamp.IsZero() { toRelease := make([]string, 0) - for _, p := range allocated { + for _, p := range liveAllocated { if _, ok := releasedSet[p]; !ok { toRelease = append(toRelease, p) } } + toRelease = append(toRelease, deadPods...) if len(toRelease) > 0 { log.Info("Queuing terminating sandbox pods for release", "sandbox", sandbox.Name, "pods", toRelease) } @@ -571,15 +599,19 @@ func (allocator *defaultAllocator) getSandboxRequest(ctx context.Context, sandbo toRelease = append(toRelease, r) } } + // Also queue dead pods for release so their allocation records are cleaned up. + toRelease = append(toRelease, deadPods...) replica := int32(0) if sandbox.Spec.Replicas != nil { replica = *sandbox.Spec.Replicas } + // Use liveAllocated count (excluding dead pods) to compute supplement, + // so deleted pods trigger re-allocation from the pool. supplement := int32(0) - if replica-int32(len(allocated)) > 0 { - supplement = replica - int32(len(allocated)) + if replica-int32(len(liveAllocated)) > 0 { + supplement = replica - int32(len(liveAllocated)) } return &algorithm.SandboxRequest{ From a9ff4c90cd8763207198676b0968b0a967bd7214 Mon Sep 17 00:00:00 2001 From: longsuizhi <95574295+longsuizhi@users.noreply.github.com> Date: Thu, 28 May 2026 16:02:30 +0800 Subject: [PATCH 2/3] test(allocator): add unit test for dead pod re-allocation --- .../internal/controller/allocator_test.go | 23 +++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/kubernetes/internal/controller/allocator_test.go b/kubernetes/internal/controller/allocator_test.go index b102ea837..172fdee8a 100644 --- a/kubernetes/internal/controller/allocator_test.go +++ b/kubernetes/internal/controller/allocator_test.go @@ -236,6 +236,29 @@ func TestSchedule(t *testing.T) { PodSupplement: 0, }, }, + { + name: "dead pod triggers re-allocation - allocated pod deleted externally", + spec: &AllocSpec{ + Pods: []*corev1.Pod{ + // pod1 was allocated but is now gone (not in Pods list). + // pod2 is a new available pod in the pool. + {ObjectMeta: metav1.ObjectMeta{Name: "pod2"}, Status: corev1.PodStatus{Phase: corev1.PodRunning, Conditions: []corev1.PodCondition{{Type: corev1.PodReady, Status: corev1.ConditionTrue}}}}, + }, + Pool: &sandboxv1alpha1.Pool{ObjectMeta: metav1.ObjectMeta{Name: "pool1"}}, + Sandboxes: []*sandboxv1alpha1.BatchSandbox{ + {ObjectMeta: metav1.ObjectMeta{Name: "sbx1"}, Spec: sandboxv1alpha1.BatchSandboxSpec{Replicas: &replica1}}, + }, + }, + poolAlloc: &PoolAllocation{PodAllocation: map[string]string{"pod1": "sbx1"}}, + sandboxAllocs: map[string]*SandboxAllocation{"sbx1": {Pods: []string{"pod1"}}}, + releases: map[string]*AllocationRelease{"sbx1": {Pods: []string{}}}, + released: map[string]*AllocationReleased{"sbx1": {Pods: []string{}}}, + wantAction: &algorithm.AllocAction{ + ToAllocate: map[string][]string{"sbx1": {"pod2"}}, + ToRelease: map[string][]string{"sbx1": {"pod1"}}, + PodSupplement: 0, + }, + }, { name: "orphan sandbox - pods in store but sandbox no longer in spec", spec: &AllocSpec{ From 46d6222c71726ebd75c67c0350628d908700b8c6 Mon Sep 17 00:00:00 2001 From: longsuizhi <95574295+longsuizhi@users.noreply.github.com> Date: Thu, 28 May 2026 16:08:39 +0800 Subject: [PATCH 3/3] fix(allocator): address CR feedback - exclude released and terminal pods from live allocation - P1: Released pods no longer count toward liveAllocated, preventing stale released entries from blocking subsequent re-allocations. - P2: Only Running+Ready pods are added to livePodSet, so terminal pods (Failed/Evicted) that still have their object present also trigger re-allocation. --- kubernetes/internal/controller/allocator.go | 84 +++++++++++++++---- .../internal/controller/allocator_test.go | 49 +++++++++++ 2 files changed, 117 insertions(+), 16 deletions(-) diff --git a/kubernetes/internal/controller/allocator.go b/kubernetes/internal/controller/allocator.go index 436c22242..73dd61a8b 100644 --- a/kubernetes/internal/controller/allocator.go +++ b/kubernetes/internal/controller/allocator.go @@ -461,9 +461,18 @@ func (allocator *defaultAllocator) Schedule(ctx context.Context, spec *AllocSpec } // Build a set of live pool pod names for dead-pod detection during allocation requests. + // Running, Pending, and Unknown pods WITHOUT DeletionTimestamp are considered live. + // Terminating pods (DeletionTimestamp set), terminal pods (Failed/Succeeded), or + // deleted pods (object gone) should trigger re-allocation. livePodSet := make(map[string]struct{}, len(spec.Pods)) + // allPodSet tracks all pods that still have an object (including terminal/terminating ones). + // Used to distinguish "deleted" (object gone) from "terminal/terminating" (object exists). + allPodSet := make(map[string]struct{}, len(spec.Pods)) for _, p := range spec.Pods { - livePodSet[p.Name] = struct{}{} + allPodSet[p.Name] = struct{}{} + if p.Status.Phase != corev1.PodFailed && p.Status.Phase != corev1.PodSucceeded && p.DeletionTimestamp.IsZero() { + livePodSet[p.Name] = struct{}{} + } } // Fetch pool allocation once and reuse it for both stale-sandbox cleanup and available-pod filtering. @@ -479,7 +488,7 @@ func (allocator *defaultAllocator) Schedule(ctx context.Context, spec *AllocSpec // handles them without any special-casing outside this function. // Terminating sandboxes are handled inside getSandboxRequest: they receive no new supplement and // all unreleased pods are queued for release. - allRequest, err := allocator.getAllRequest(ctx, spec.Sandboxes, podAllocation, livePodSet) + allRequest, err := allocator.getAllRequest(ctx, spec.Sandboxes, podAllocation, livePodSet, allPodSet) if err != nil { return nil, err } @@ -500,13 +509,13 @@ func (allocator *defaultAllocator) Schedule(ctx context.Context, spec *AllocSpec // orphan entries for pods in podAllocation whose sandbox is no longer in the sandboxes list // (e.g. force-deleted). Orphan entries carry PodSupplement=0 and ToRelease set to the orphan // pods so the normal recycle path handles them without special-casing in the caller. -func (allocator *defaultAllocator) getAllRequest(ctx context.Context, sandboxes []*sandboxv1alpha1.BatchSandbox, podAllocation map[string]string, livePodSet map[string]struct{}) ([]*algorithm.SandboxRequest, error) { +func (allocator *defaultAllocator) getAllRequest(ctx context.Context, sandboxes []*sandboxv1alpha1.BatchSandbox, podAllocation map[string]string, livePodSet map[string]struct{}, allPodSet map[string]struct{}) ([]*algorithm.SandboxRequest, error) { log := logf.FromContext(ctx) existingSandboxes := make(map[string]struct{}, len(sandboxes)) allRequest := make([]*algorithm.SandboxRequest, 0, len(sandboxes)) for _, sandbox := range sandboxes { existingSandboxes[sandbox.Name] = struct{}{} - request, err := allocator.getSandboxRequest(ctx, sandbox, livePodSet) + request, err := allocator.getSandboxRequest(ctx, sandbox, livePodSet, allPodSet) if err != nil { return nil, err } @@ -529,7 +538,7 @@ func (allocator *defaultAllocator) getAllRequest(ctx context.Context, sandboxes return allRequest, nil } -func (allocator *defaultAllocator) getSandboxRequest(ctx context.Context, sandbox *sandboxv1alpha1.BatchSandbox, livePodSet map[string]struct{}) (*algorithm.SandboxRequest, error) { +func (allocator *defaultAllocator) getSandboxRequest(ctx context.Context, sandbox *sandboxv1alpha1.BatchSandbox, livePodSet map[string]struct{}, allPodSet map[string]struct{}) (*algorithm.SandboxRequest, error) { log := logf.FromContext(ctx) allocated, err := allocator.GetSandboxAllocation(ctx, sandbox) if err != nil { @@ -545,29 +554,55 @@ func (allocator *defaultAllocator) getSandboxRequest(ctx context.Context, sandbo releasedSet[r] = struct{}{} } - // Filter out pods that no longer exist in the pool (e.g. externally deleted). - // Dead pods are treated as released so the sandbox can receive replacement allocations. + // Filter allocated pods into categories: + // - liveAllocated: unreleased live pods + released-but-still-alive pods (count toward replica slots) + // - deadPods: pods whose object is gone (externally deleted), queue for release + // - terminalPods: pods that are terminal/terminating but still exist, queue for release + // to ensure alloc-status cleanup and finalizer removal + // Released pods that are no longer alive don't count (pool already freed them). + // Released pods that are still alive keep their slot to avoid premature supplement + // during active release operations (the recycle flow will eventually remove them). liveAllocated := make([]string, 0, len(allocated)) deadPods := make([]string, 0) + terminalPods := make([]string, 0) for _, p := range allocated { - if _, exists := releasedSet[p]; exists { - // Already released, keep in allocated for bookkeeping consistency. - liveAllocated = append(liveAllocated, p) + _, isReleased := releasedSet[p] + _, isAlive := livePodSet[p] + _, exists := allPodSet[p] + if isReleased && !isAlive { + // Released and gone/terminal — no longer occupies a slot. continue } - if _, alive := livePodSet[p]; alive { + if isReleased && isAlive { + // Released but still alive — keep slot to avoid supplement during active release. liveAllocated = append(liveAllocated, p) - } else { - deadPods = append(deadPods, p) + continue + } + // Not released: + if !isAlive { + if !exists { + // Pod object is completely gone — externally deleted. + deadPods = append(deadPods, p) + } else { + // Exists but terminal/terminating — needs release for alloc-status cleanup. + terminalPods = append(terminalPods, p) + } + continue } + liveAllocated = append(liveAllocated, p) } if len(deadPods) > 0 { log.Info("Detected dead allocated pods, queuing for release to trigger re-allocation", "sandbox", sandbox.Name, "deadPods", deadPods) } + if len(terminalPods) > 0 { + log.Info("Detected terminal/terminating allocated pods, queuing for release", + "sandbox", sandbox.Name, "terminalPods", terminalPods) + } // Terminating sandboxes should not receive new allocations. - // Queue all unreleased allocated pods for release and set supplement to zero. + // Queue all unreleased allocated pods (live, dead, and terminal) for release and set supplement to zero. + // Terminal pods must also be released to ensure the pool-allocation finalizer can be removed. if !sandbox.DeletionTimestamp.IsZero() { toRelease := make([]string, 0) for _, p := range liveAllocated { @@ -576,6 +611,7 @@ func (allocator *defaultAllocator) getSandboxRequest(ctx context.Context, sandbo } } toRelease = append(toRelease, deadPods...) + toRelease = append(toRelease, terminalPods...) if len(toRelease) > 0 { log.Info("Queuing terminating sandbox pods for release", "sandbox", sandbox.Name, "pods", toRelease) } @@ -594,13 +630,29 @@ func (allocator *defaultAllocator) getSandboxRequest(ctx context.Context, sandbo } toRelease := make([]string, 0) + toReleaseSet := make(map[string]struct{}) for _, r := range release { if _, exists := releasedSet[r]; !exists { toRelease = append(toRelease, r) + toReleaseSet[r] = struct{}{} + } + } + // Queue dead pods for release so their allocation records are cleaned up. + // Deduplicate against pods already in alloc-release to avoid duplicate recycle operations. + for _, dp := range deadPods { + if _, exists := toReleaseSet[dp]; !exists { + toRelease = append(toRelease, dp) + toReleaseSet[dp] = struct{}{} + } + } + // Queue terminal/terminating pods for release to clean them from alloc-status/store. + // Without this, terminal pods accumulate as stale allocated slots forever. + for _, tp := range terminalPods { + if _, exists := toReleaseSet[tp]; !exists { + toRelease = append(toRelease, tp) + toReleaseSet[tp] = struct{}{} } } - // Also queue dead pods for release so their allocation records are cleaned up. - toRelease = append(toRelease, deadPods...) replica := int32(0) if sandbox.Spec.Replicas != nil { diff --git a/kubernetes/internal/controller/allocator_test.go b/kubernetes/internal/controller/allocator_test.go index 172fdee8a..6e09cbddf 100644 --- a/kubernetes/internal/controller/allocator_test.go +++ b/kubernetes/internal/controller/allocator_test.go @@ -259,6 +259,55 @@ func TestSchedule(t *testing.T) { PodSupplement: 0, }, }, + { + name: "released pod does not block second re-allocation", + spec: &AllocSpec{ + Pods: []*corev1.Pod{ + // pod1 was previously allocated and released (recycled). + // pod2 was the replacement but is now also deleted. + // pod3 is a new available pod. + {ObjectMeta: metav1.ObjectMeta{Name: "pod3"}, Status: corev1.PodStatus{Phase: corev1.PodRunning, Conditions: []corev1.PodCondition{{Type: corev1.PodReady, Status: corev1.ConditionTrue}}}}, + }, + Pool: &sandboxv1alpha1.Pool{ObjectMeta: metav1.ObjectMeta{Name: "pool1"}}, + Sandboxes: []*sandboxv1alpha1.BatchSandbox{ + {ObjectMeta: metav1.ObjectMeta{Name: "sbx1"}, Spec: sandboxv1alpha1.BatchSandboxSpec{Replicas: &replica1}}, + }, + }, + poolAlloc: &PoolAllocation{PodAllocation: map[string]string{"pod1": "sbx1", "pod2": "sbx1"}}, + sandboxAllocs: map[string]*SandboxAllocation{"sbx1": {Pods: []string{"pod1", "pod2"}}}, + releases: map[string]*AllocationRelease{"sbx1": {Pods: []string{}}}, + released: map[string]*AllocationReleased{"sbx1": {Pods: []string{"pod1"}}}, + wantAction: &algorithm.AllocAction{ + ToAllocate: map[string][]string{"sbx1": {"pod3"}}, + ToRelease: map[string][]string{"sbx1": {"pod2"}}, + PodSupplement: 0, + }, + }, + { + name: "terminal pod (Failed) triggers re-allocation", + spec: &AllocSpec{ + Pods: []*corev1.Pod{ + // pod1 exists but is in Failed state — should not count as live. + {ObjectMeta: metav1.ObjectMeta{Name: "pod1"}, Status: corev1.PodStatus{Phase: corev1.PodFailed}}, + // pod2 is ready and available. + {ObjectMeta: metav1.ObjectMeta{Name: "pod2"}, Status: corev1.PodStatus{Phase: corev1.PodRunning, Conditions: []corev1.PodCondition{{Type: corev1.PodReady, Status: corev1.ConditionTrue}}}}, + }, + Pool: &sandboxv1alpha1.Pool{ObjectMeta: metav1.ObjectMeta{Name: "pool1"}}, + Sandboxes: []*sandboxv1alpha1.BatchSandbox{ + {ObjectMeta: metav1.ObjectMeta{Name: "sbx1"}, Spec: sandboxv1alpha1.BatchSandboxSpec{Replicas: &replica1}}, + }, + }, + poolAlloc: &PoolAllocation{PodAllocation: map[string]string{"pod1": "sbx1"}}, + sandboxAllocs: map[string]*SandboxAllocation{"sbx1": {Pods: []string{"pod1"}}}, + releases: map[string]*AllocationRelease{"sbx1": {Pods: []string{}}}, + released: map[string]*AllocationReleased{"sbx1": {Pods: []string{}}}, + wantAction: &algorithm.AllocAction{ + // Terminal pod is queued for release to clean alloc-status/store. + ToAllocate: map[string][]string{"sbx1": {"pod2"}}, + ToRelease: map[string][]string{"sbx1": {"pod1"}}, + PodSupplement: 0, + }, + }, { name: "orphan sandbox - pods in store but sandbox no longer in spec", spec: &AllocSpec{