Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 92 additions & 8 deletions kubernetes/internal/controller/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,21 @@ func (allocator *defaultAllocator) Schedule(ctx context.Context, spec *AllocSpec
return nil, err
}

// Build a set of live pool pod names for dead-pod detection during allocation requests.
// Running, Pending, and Unknown pods WITHOUT DeletionTimestamp are considered live.
// Terminating pods (DeletionTimestamp set), terminal pods (Failed/Succeeded), or
// deleted pods (object gone) should trigger re-allocation.
livePodSet := make(map[string]struct{}, len(spec.Pods))
// allPodSet tracks all pods that still have an object (including terminal/terminating ones).
// Used to distinguish "deleted" (object gone) from "terminal/terminating" (object exists).
allPodSet := make(map[string]struct{}, len(spec.Pods))
for _, p := range spec.Pods {
allPodSet[p.Name] = struct{}{}
if p.Status.Phase != corev1.PodFailed && p.Status.Phase != corev1.PodSucceeded && p.DeletionTimestamp.IsZero() {
livePodSet[p.Name] = struct{}{}
}
}

// Fetch pool allocation once and reuse it for both stale-sandbox cleanup and available-pod filtering.
// This avoids a double store read on every reconcile.
podAllocation, err := allocator.GetPoolAllocation(ctx, spec.Pool)
Expand All @@ -473,7 +488,7 @@ func (allocator *defaultAllocator) Schedule(ctx context.Context, spec *AllocSpec
// handles them without any special-casing outside this function.
// Terminating sandboxes are handled inside getSandboxRequest: they receive no new supplement and
// all unreleased pods are queued for release.
allRequest, err := allocator.getAllRequest(ctx, spec.Sandboxes, podAllocation)
allRequest, err := allocator.getAllRequest(ctx, spec.Sandboxes, podAllocation, livePodSet, allPodSet)
if err != nil {
return nil, err
}
Expand All @@ -494,13 +509,13 @@ func (allocator *defaultAllocator) Schedule(ctx context.Context, spec *AllocSpec
// orphan entries for pods in podAllocation whose sandbox is no longer in the sandboxes list
// (e.g. force-deleted). Orphan entries carry PodSupplement=0 and ToRelease set to the orphan
// pods so the normal recycle path handles them without special-casing in the caller.
func (allocator *defaultAllocator) getAllRequest(ctx context.Context, sandboxes []*sandboxv1alpha1.BatchSandbox, podAllocation map[string]string) ([]*algorithm.SandboxRequest, error) {
func (allocator *defaultAllocator) getAllRequest(ctx context.Context, sandboxes []*sandboxv1alpha1.BatchSandbox, podAllocation map[string]string, livePodSet map[string]struct{}, allPodSet map[string]struct{}) ([]*algorithm.SandboxRequest, error) {
log := logf.FromContext(ctx)
existingSandboxes := make(map[string]struct{}, len(sandboxes))
allRequest := make([]*algorithm.SandboxRequest, 0, len(sandboxes))
for _, sandbox := range sandboxes {
existingSandboxes[sandbox.Name] = struct{}{}
request, err := allocator.getSandboxRequest(ctx, sandbox)
request, err := allocator.getSandboxRequest(ctx, sandbox, livePodSet, allPodSet)
if err != nil {
return nil, err
}
Expand All @@ -523,7 +538,7 @@ func (allocator *defaultAllocator) getAllRequest(ctx context.Context, sandboxes
return allRequest, nil
}

func (allocator *defaultAllocator) getSandboxRequest(ctx context.Context, sandbox *sandboxv1alpha1.BatchSandbox) (*algorithm.SandboxRequest, error) {
func (allocator *defaultAllocator) getSandboxRequest(ctx context.Context, sandbox *sandboxv1alpha1.BatchSandbox, livePodSet map[string]struct{}, allPodSet map[string]struct{}) (*algorithm.SandboxRequest, error) {
log := logf.FromContext(ctx)
allocated, err := allocator.GetSandboxAllocation(ctx, sandbox)
if err != nil {
Expand All @@ -539,15 +554,64 @@ func (allocator *defaultAllocator) getSandboxRequest(ctx context.Context, sandbo
releasedSet[r] = struct{}{}
}

// Filter allocated pods into categories:
// - liveAllocated: unreleased live pods + released-but-still-alive pods (count toward replica slots)
// - deadPods: pods whose object is gone (externally deleted), queue for release
// - terminalPods: pods that are terminal/terminating but still exist, queue for release
// to ensure alloc-status cleanup and finalizer removal
// Released pods that are no longer alive don't count (pool already freed them).
// Released pods that are still alive keep their slot to avoid premature supplement
// during active release operations (the recycle flow will eventually remove them).
liveAllocated := make([]string, 0, len(allocated))
deadPods := make([]string, 0)
terminalPods := make([]string, 0)
for _, p := range allocated {
_, isReleased := releasedSet[p]
_, isAlive := livePodSet[p]
_, exists := allPodSet[p]
if isReleased && !isAlive {
// Released and gone/terminal — no longer occupies a slot.
continue
Comment thread
longsuizhi marked this conversation as resolved.
Comment thread
longsuizhi marked this conversation as resolved.
}
if isReleased && isAlive {
// Released but still alive — keep slot to avoid supplement during active release.
liveAllocated = append(liveAllocated, p)
continue
}
// Not released:
if !isAlive {
if !exists {
// Pod object is completely gone — externally deleted.
deadPods = append(deadPods, p)
} else {
// Exists but terminal/terminating — needs release for alloc-status cleanup.
terminalPods = append(terminalPods, p)
}
continue
}
liveAllocated = append(liveAllocated, p)
}
if len(deadPods) > 0 {
log.Info("Detected dead allocated pods, queuing for release to trigger re-allocation",
"sandbox", sandbox.Name, "deadPods", deadPods)
}
if len(terminalPods) > 0 {
log.Info("Detected terminal/terminating allocated pods, queuing for release",
"sandbox", sandbox.Name, "terminalPods", terminalPods)
}

// Terminating sandboxes should not receive new allocations.
// Queue all unreleased allocated pods for release and set supplement to zero.
// Queue all unreleased allocated pods (live, dead, and terminal) for release and set supplement to zero.
// Terminal pods must also be released to ensure the pool-allocation finalizer can be removed.
if !sandbox.DeletionTimestamp.IsZero() {
toRelease := make([]string, 0)
for _, p := range allocated {
for _, p := range liveAllocated {
Comment thread
longsuizhi marked this conversation as resolved.
if _, ok := releasedSet[p]; !ok {
toRelease = append(toRelease, p)
}
}
toRelease = append(toRelease, deadPods...)
toRelease = append(toRelease, terminalPods...)
if len(toRelease) > 0 {
log.Info("Queuing terminating sandbox pods for release", "sandbox", sandbox.Name, "pods", toRelease)
}
Expand All @@ -566,9 +630,27 @@ func (allocator *defaultAllocator) getSandboxRequest(ctx context.Context, sandbo
}

toRelease := make([]string, 0)
toReleaseSet := make(map[string]struct{})
for _, r := range release {
if _, exists := releasedSet[r]; !exists {
toRelease = append(toRelease, r)
toReleaseSet[r] = struct{}{}
}
}
// Queue dead pods for release so their allocation records are cleaned up.
// Deduplicate against pods already in alloc-release to avoid duplicate recycle operations.
for _, dp := range deadPods {
if _, exists := toReleaseSet[dp]; !exists {
toRelease = append(toRelease, dp)
toReleaseSet[dp] = struct{}{}
}
}
// Queue terminal/terminating pods for release to clean them from alloc-status/store.
// Without this, terminal pods accumulate as stale allocated slots forever.
for _, tp := range terminalPods {
if _, exists := toReleaseSet[tp]; !exists {
toRelease = append(toRelease, tp)
toReleaseSet[tp] = struct{}{}
Comment thread
longsuizhi marked this conversation as resolved.
}
}

Expand All @@ -577,9 +659,11 @@ func (allocator *defaultAllocator) getSandboxRequest(ctx context.Context, sandbo
replica = *sandbox.Spec.Replicas
}

// Use liveAllocated count (excluding dead pods) to compute supplement,
// so deleted pods trigger re-allocation from the pool.
supplement := int32(0)
if replica-int32(len(allocated)) > 0 {
supplement = replica - int32(len(allocated))
if replica-int32(len(liveAllocated)) > 0 {
supplement = replica - int32(len(liveAllocated))
}

return &algorithm.SandboxRequest{
Expand Down
72 changes: 72 additions & 0 deletions kubernetes/internal/controller/allocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,78 @@ func TestSchedule(t *testing.T) {
PodSupplement: 0,
},
},
{
name: "dead pod triggers re-allocation - allocated pod deleted externally",
spec: &AllocSpec{
Pods: []*corev1.Pod{
// pod1 was allocated but is now gone (not in Pods list).
// pod2 is a new available pod in the pool.
{ObjectMeta: metav1.ObjectMeta{Name: "pod2"}, Status: corev1.PodStatus{Phase: corev1.PodRunning, Conditions: []corev1.PodCondition{{Type: corev1.PodReady, Status: corev1.ConditionTrue}}}},
},
Pool: &sandboxv1alpha1.Pool{ObjectMeta: metav1.ObjectMeta{Name: "pool1"}},
Sandboxes: []*sandboxv1alpha1.BatchSandbox{
{ObjectMeta: metav1.ObjectMeta{Name: "sbx1"}, Spec: sandboxv1alpha1.BatchSandboxSpec{Replicas: &replica1}},
},
},
poolAlloc: &PoolAllocation{PodAllocation: map[string]string{"pod1": "sbx1"}},
sandboxAllocs: map[string]*SandboxAllocation{"sbx1": {Pods: []string{"pod1"}}},
releases: map[string]*AllocationRelease{"sbx1": {Pods: []string{}}},
released: map[string]*AllocationReleased{"sbx1": {Pods: []string{}}},
wantAction: &algorithm.AllocAction{
ToAllocate: map[string][]string{"sbx1": {"pod2"}},
ToRelease: map[string][]string{"sbx1": {"pod1"}},
PodSupplement: 0,
},
},
{
name: "released pod does not block second re-allocation",
spec: &AllocSpec{
Pods: []*corev1.Pod{
// pod1 was previously allocated and released (recycled).
// pod2 was the replacement but is now also deleted.
// pod3 is a new available pod.
{ObjectMeta: metav1.ObjectMeta{Name: "pod3"}, Status: corev1.PodStatus{Phase: corev1.PodRunning, Conditions: []corev1.PodCondition{{Type: corev1.PodReady, Status: corev1.ConditionTrue}}}},
},
Pool: &sandboxv1alpha1.Pool{ObjectMeta: metav1.ObjectMeta{Name: "pool1"}},
Sandboxes: []*sandboxv1alpha1.BatchSandbox{
{ObjectMeta: metav1.ObjectMeta{Name: "sbx1"}, Spec: sandboxv1alpha1.BatchSandboxSpec{Replicas: &replica1}},
},
},
poolAlloc: &PoolAllocation{PodAllocation: map[string]string{"pod1": "sbx1", "pod2": "sbx1"}},
sandboxAllocs: map[string]*SandboxAllocation{"sbx1": {Pods: []string{"pod1", "pod2"}}},
releases: map[string]*AllocationRelease{"sbx1": {Pods: []string{}}},
released: map[string]*AllocationReleased{"sbx1": {Pods: []string{"pod1"}}},
wantAction: &algorithm.AllocAction{
ToAllocate: map[string][]string{"sbx1": {"pod3"}},
ToRelease: map[string][]string{"sbx1": {"pod2"}},
PodSupplement: 0,
},
},
{
name: "terminal pod (Failed) triggers re-allocation",
spec: &AllocSpec{
Pods: []*corev1.Pod{
// pod1 exists but is in Failed state — should not count as live.
{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}, Status: corev1.PodStatus{Phase: corev1.PodFailed}},
// pod2 is ready and available.
{ObjectMeta: metav1.ObjectMeta{Name: "pod2"}, Status: corev1.PodStatus{Phase: corev1.PodRunning, Conditions: []corev1.PodCondition{{Type: corev1.PodReady, Status: corev1.ConditionTrue}}}},
},
Pool: &sandboxv1alpha1.Pool{ObjectMeta: metav1.ObjectMeta{Name: "pool1"}},
Sandboxes: []*sandboxv1alpha1.BatchSandbox{
{ObjectMeta: metav1.ObjectMeta{Name: "sbx1"}, Spec: sandboxv1alpha1.BatchSandboxSpec{Replicas: &replica1}},
},
},
poolAlloc: &PoolAllocation{PodAllocation: map[string]string{"pod1": "sbx1"}},
sandboxAllocs: map[string]*SandboxAllocation{"sbx1": {Pods: []string{"pod1"}}},
releases: map[string]*AllocationRelease{"sbx1": {Pods: []string{}}},
released: map[string]*AllocationReleased{"sbx1": {Pods: []string{}}},
wantAction: &algorithm.AllocAction{
// Terminal pod is queued for release to clean alloc-status/store.
ToAllocate: map[string][]string{"sbx1": {"pod2"}},
ToRelease: map[string][]string{"sbx1": {"pod1"}},
PodSupplement: 0,
},
},
{
name: "orphan sandbox - pods in store but sandbox no longer in spec",
spec: &AllocSpec{
Expand Down
Loading