diff --git a/kubernetes/internal/controller/allocator.go b/kubernetes/internal/controller/allocator.go index 008b14669..73dd61a8b 100644 --- a/kubernetes/internal/controller/allocator.go +++ b/kubernetes/internal/controller/allocator.go @@ -460,6 +460,21 @@ func (allocator *defaultAllocator) Schedule(ctx context.Context, spec *AllocSpec return nil, err } + // Build a set of live pool pod names for dead-pod detection during allocation requests. + // Running, Pending, and Unknown pods WITHOUT DeletionTimestamp are considered live. + // Terminating pods (DeletionTimestamp set), terminal pods (Failed/Succeeded), or + // deleted pods (object gone) should trigger re-allocation. + livePodSet := make(map[string]struct{}, len(spec.Pods)) + // allPodSet tracks all pods that still have an object (including terminal/terminating ones). + // Used to distinguish "deleted" (object gone) from "terminal/terminating" (object exists). + allPodSet := make(map[string]struct{}, len(spec.Pods)) + for _, p := range spec.Pods { + allPodSet[p.Name] = struct{}{} + if p.Status.Phase != corev1.PodFailed && p.Status.Phase != corev1.PodSucceeded && p.DeletionTimestamp.IsZero() { + livePodSet[p.Name] = struct{}{} + } + } + // Fetch pool allocation once and reuse it for both stale-sandbox cleanup and available-pod filtering. // This avoids a double store read on every reconcile. podAllocation, err := allocator.GetPoolAllocation(ctx, spec.Pool) @@ -473,7 +488,7 @@ func (allocator *defaultAllocator) Schedule(ctx context.Context, spec *AllocSpec // handles them without any special-casing outside this function. // Terminating sandboxes are handled inside getSandboxRequest: they receive no new supplement and // all unreleased pods are queued for release. - allRequest, err := allocator.getAllRequest(ctx, spec.Sandboxes, podAllocation) + allRequest, err := allocator.getAllRequest(ctx, spec.Sandboxes, podAllocation, livePodSet, allPodSet) if err != nil { return nil, err } @@ -494,13 +509,13 @@ func (allocator *defaultAllocator) Schedule(ctx context.Context, spec *AllocSpec // orphan entries for pods in podAllocation whose sandbox is no longer in the sandboxes list // (e.g. force-deleted). Orphan entries carry PodSupplement=0 and ToRelease set to the orphan // pods so the normal recycle path handles them without special-casing in the caller. -func (allocator *defaultAllocator) getAllRequest(ctx context.Context, sandboxes []*sandboxv1alpha1.BatchSandbox, podAllocation map[string]string) ([]*algorithm.SandboxRequest, error) { +func (allocator *defaultAllocator) getAllRequest(ctx context.Context, sandboxes []*sandboxv1alpha1.BatchSandbox, podAllocation map[string]string, livePodSet map[string]struct{}, allPodSet map[string]struct{}) ([]*algorithm.SandboxRequest, error) { log := logf.FromContext(ctx) existingSandboxes := make(map[string]struct{}, len(sandboxes)) allRequest := make([]*algorithm.SandboxRequest, 0, len(sandboxes)) for _, sandbox := range sandboxes { existingSandboxes[sandbox.Name] = struct{}{} - request, err := allocator.getSandboxRequest(ctx, sandbox) + request, err := allocator.getSandboxRequest(ctx, sandbox, livePodSet, allPodSet) if err != nil { return nil, err } @@ -523,7 +538,7 @@ func (allocator *defaultAllocator) getAllRequest(ctx context.Context, sandboxes return allRequest, nil } -func (allocator *defaultAllocator) getSandboxRequest(ctx context.Context, sandbox *sandboxv1alpha1.BatchSandbox) (*algorithm.SandboxRequest, error) { +func (allocator *defaultAllocator) getSandboxRequest(ctx context.Context, sandbox *sandboxv1alpha1.BatchSandbox, livePodSet map[string]struct{}, allPodSet map[string]struct{}) (*algorithm.SandboxRequest, error) { log := logf.FromContext(ctx) allocated, err := allocator.GetSandboxAllocation(ctx, sandbox) if err != nil { @@ -539,15 +554,64 @@ func (allocator *defaultAllocator) getSandboxRequest(ctx context.Context, sandbo releasedSet[r] = struct{}{} } + // Filter allocated pods into categories: + // - liveAllocated: unreleased live pods + released-but-still-alive pods (count toward replica slots) + // - deadPods: pods whose object is gone (externally deleted), queue for release + // - terminalPods: pods that are terminal/terminating but still exist, queue for release + // to ensure alloc-status cleanup and finalizer removal + // Released pods that are no longer alive don't count (pool already freed them). + // Released pods that are still alive keep their slot to avoid premature supplement + // during active release operations (the recycle flow will eventually remove them). + liveAllocated := make([]string, 0, len(allocated)) + deadPods := make([]string, 0) + terminalPods := make([]string, 0) + for _, p := range allocated { + _, isReleased := releasedSet[p] + _, isAlive := livePodSet[p] + _, exists := allPodSet[p] + if isReleased && !isAlive { + // Released and gone/terminal — no longer occupies a slot. + continue + } + if isReleased && isAlive { + // Released but still alive — keep slot to avoid supplement during active release. + liveAllocated = append(liveAllocated, p) + continue + } + // Not released: + if !isAlive { + if !exists { + // Pod object is completely gone — externally deleted. + deadPods = append(deadPods, p) + } else { + // Exists but terminal/terminating — needs release for alloc-status cleanup. + terminalPods = append(terminalPods, p) + } + continue + } + liveAllocated = append(liveAllocated, p) + } + if len(deadPods) > 0 { + log.Info("Detected dead allocated pods, queuing for release to trigger re-allocation", + "sandbox", sandbox.Name, "deadPods", deadPods) + } + if len(terminalPods) > 0 { + log.Info("Detected terminal/terminating allocated pods, queuing for release", + "sandbox", sandbox.Name, "terminalPods", terminalPods) + } + // Terminating sandboxes should not receive new allocations. - // Queue all unreleased allocated pods for release and set supplement to zero. + // Queue all unreleased allocated pods (live, dead, and terminal) for release and set supplement to zero. + // Terminal pods must also be released to ensure the pool-allocation finalizer can be removed. if !sandbox.DeletionTimestamp.IsZero() { toRelease := make([]string, 0) - for _, p := range allocated { + for _, p := range liveAllocated { if _, ok := releasedSet[p]; !ok { toRelease = append(toRelease, p) } } + toRelease = append(toRelease, deadPods...) + toRelease = append(toRelease, terminalPods...) if len(toRelease) > 0 { log.Info("Queuing terminating sandbox pods for release", "sandbox", sandbox.Name, "pods", toRelease) } @@ -566,9 +630,27 @@ func (allocator *defaultAllocator) getSandboxRequest(ctx context.Context, sandbo } toRelease := make([]string, 0) + toReleaseSet := make(map[string]struct{}) for _, r := range release { if _, exists := releasedSet[r]; !exists { toRelease = append(toRelease, r) + toReleaseSet[r] = struct{}{} + } + } + // Queue dead pods for release so their allocation records are cleaned up. + // Deduplicate against pods already in alloc-release to avoid duplicate recycle operations. + for _, dp := range deadPods { + if _, exists := toReleaseSet[dp]; !exists { + toRelease = append(toRelease, dp) + toReleaseSet[dp] = struct{}{} + } + } + // Queue terminal/terminating pods for release to clean them from alloc-status/store. + // Without this, terminal pods accumulate as stale allocated slots forever. + for _, tp := range terminalPods { + if _, exists := toReleaseSet[tp]; !exists { + toRelease = append(toRelease, tp) + toReleaseSet[tp] = struct{}{} } } @@ -577,9 +659,11 @@ func (allocator *defaultAllocator) getSandboxRequest(ctx context.Context, sandbo replica = *sandbox.Spec.Replicas } + // Use liveAllocated count (excluding dead pods) to compute supplement, + // so deleted pods trigger re-allocation from the pool. supplement := int32(0) - if replica-int32(len(allocated)) > 0 { - supplement = replica - int32(len(allocated)) + if replica-int32(len(liveAllocated)) > 0 { + supplement = replica - int32(len(liveAllocated)) } return &algorithm.SandboxRequest{ diff --git a/kubernetes/internal/controller/allocator_test.go b/kubernetes/internal/controller/allocator_test.go index b102ea837..6e09cbddf 100644 --- a/kubernetes/internal/controller/allocator_test.go +++ b/kubernetes/internal/controller/allocator_test.go @@ -236,6 +236,78 @@ func TestSchedule(t *testing.T) { PodSupplement: 0, }, }, + { + name: "dead pod triggers re-allocation - allocated pod deleted externally", + spec: &AllocSpec{ + Pods: []*corev1.Pod{ + // pod1 was allocated but is now gone (not in Pods list). + // pod2 is a new available pod in the pool. + {ObjectMeta: metav1.ObjectMeta{Name: "pod2"}, Status: corev1.PodStatus{Phase: corev1.PodRunning, Conditions: []corev1.PodCondition{{Type: corev1.PodReady, Status: corev1.ConditionTrue}}}}, + }, + Pool: &sandboxv1alpha1.Pool{ObjectMeta: metav1.ObjectMeta{Name: "pool1"}}, + Sandboxes: []*sandboxv1alpha1.BatchSandbox{ + {ObjectMeta: metav1.ObjectMeta{Name: "sbx1"}, Spec: sandboxv1alpha1.BatchSandboxSpec{Replicas: &replica1}}, + }, + }, + poolAlloc: &PoolAllocation{PodAllocation: map[string]string{"pod1": "sbx1"}}, + sandboxAllocs: map[string]*SandboxAllocation{"sbx1": {Pods: []string{"pod1"}}}, + releases: map[string]*AllocationRelease{"sbx1": {Pods: []string{}}}, + released: map[string]*AllocationReleased{"sbx1": {Pods: []string{}}}, + wantAction: &algorithm.AllocAction{ + ToAllocate: map[string][]string{"sbx1": {"pod2"}}, + ToRelease: map[string][]string{"sbx1": {"pod1"}}, + PodSupplement: 0, + }, + }, + { + name: "released pod does not block second re-allocation", + spec: &AllocSpec{ + Pods: []*corev1.Pod{ + // pod1 was previously allocated and released (recycled). + // pod2 was the replacement but is now also deleted. + // pod3 is a new available pod. + {ObjectMeta: metav1.ObjectMeta{Name: "pod3"}, Status: corev1.PodStatus{Phase: corev1.PodRunning, Conditions: []corev1.PodCondition{{Type: corev1.PodReady, Status: corev1.ConditionTrue}}}}, + }, + Pool: &sandboxv1alpha1.Pool{ObjectMeta: metav1.ObjectMeta{Name: "pool1"}}, + Sandboxes: []*sandboxv1alpha1.BatchSandbox{ + {ObjectMeta: metav1.ObjectMeta{Name: "sbx1"}, Spec: sandboxv1alpha1.BatchSandboxSpec{Replicas: &replica1}}, + }, + }, + poolAlloc: &PoolAllocation{PodAllocation: map[string]string{"pod1": "sbx1", "pod2": "sbx1"}}, + sandboxAllocs: map[string]*SandboxAllocation{"sbx1": {Pods: []string{"pod1", "pod2"}}}, + releases: map[string]*AllocationRelease{"sbx1": {Pods: []string{}}}, + released: map[string]*AllocationReleased{"sbx1": {Pods: []string{"pod1"}}}, + wantAction: &algorithm.AllocAction{ + ToAllocate: map[string][]string{"sbx1": {"pod3"}}, + ToRelease: map[string][]string{"sbx1": {"pod2"}}, + PodSupplement: 0, + }, + }, + { + name: "terminal pod (Failed) triggers re-allocation", + spec: &AllocSpec{ + Pods: []*corev1.Pod{ + // pod1 exists but is in Failed state — should not count as live. + {ObjectMeta: metav1.ObjectMeta{Name: "pod1"}, Status: corev1.PodStatus{Phase: corev1.PodFailed}}, + // pod2 is ready and available. + {ObjectMeta: metav1.ObjectMeta{Name: "pod2"}, Status: corev1.PodStatus{Phase: corev1.PodRunning, Conditions: []corev1.PodCondition{{Type: corev1.PodReady, Status: corev1.ConditionTrue}}}}, + }, + Pool: &sandboxv1alpha1.Pool{ObjectMeta: metav1.ObjectMeta{Name: "pool1"}}, + Sandboxes: []*sandboxv1alpha1.BatchSandbox{ + {ObjectMeta: metav1.ObjectMeta{Name: "sbx1"}, Spec: sandboxv1alpha1.BatchSandboxSpec{Replicas: &replica1}}, + }, + }, + poolAlloc: &PoolAllocation{PodAllocation: map[string]string{"pod1": "sbx1"}}, + sandboxAllocs: map[string]*SandboxAllocation{"sbx1": {Pods: []string{"pod1"}}}, + releases: map[string]*AllocationRelease{"sbx1": {Pods: []string{}}}, + released: map[string]*AllocationReleased{"sbx1": {Pods: []string{}}}, + wantAction: &algorithm.AllocAction{ + // Terminal pod is queued for release to clean alloc-status/store. + ToAllocate: map[string][]string{"sbx1": {"pod2"}}, + ToRelease: map[string][]string{"sbx1": {"pod1"}}, + PodSupplement: 0, + }, + }, { name: "orphan sandbox - pods in store but sandbox no longer in spec", spec: &AllocSpec{