Skip to content

Commit beb1e05

Browse files
committed
fix(pool): skip recycling pods in allocate actions
1 parent 453a039 commit beb1e05

5 files changed

Lines changed: 43 additions & 21 deletions

File tree

kubernetes/internal/controller/allocator.go

Lines changed: 7 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -227,10 +227,6 @@ func (allocator *defaultAllocator) Schedule(ctx context.Context, spec *AllocSpec
227227
if spec.RecyclingPods.Has(pod.Name) {
228228
continue
229229
}
230-
// Exclude pods that are restarting (have recycle-meta annotation)
231-
if isRecycling(pod) {
232-
continue
233-
}
234230
if pod.Status.Phase != corev1.PodRunning {
235231
continue
236232
}
@@ -240,11 +236,11 @@ func (allocator *defaultAllocator) Schedule(ctx context.Context, spec *AllocSpec
240236
for podName, sandboxName := range status.PodAllocation {
241237
sandboxToPods[sandboxName] = append(sandboxToPods[sandboxName], podName)
242238
}
243-
sandboxAlloc, dirtySandboxes, poolAllocate, err := allocator.allocate(ctx, status, sandboxToPods, availablePods, spec.Sandboxes, spec.Pods)
239+
sandboxAlloc, dirtySandboxes, poolAllocate, err := allocator.allocate(ctx, status, sandboxToPods, availablePods, spec.Sandboxes)
244240
if err != nil {
245241
log.Error(err, "allocate failed")
246242
}
247-
poolDeallocate, err := allocator.deallocate(ctx, status, sandboxToPods, spec.Sandboxes)
243+
poolDeallocate, err := allocator.deallocate(ctx, status, sandboxToPods, spec.Sandboxes, spec.RecyclingPods)
248244
if err != nil {
249245
log.Error(err, "deallocate failed")
250246
}
@@ -284,7 +280,7 @@ func (allocator *defaultAllocator) initAllocation(ctx context.Context, spec *All
284280
return status, nil
285281
}
286282

287-
func (allocator *defaultAllocator) allocate(ctx context.Context, status *AllocStatus, sandboxToPods map[string][]string, availablePods []string, sandboxes []*sandboxv1alpha1.BatchSandbox, pods []*corev1.Pod) (map[string][]string, []string, bool, error) {
283+
func (allocator *defaultAllocator) allocate(ctx context.Context, status *AllocStatus, sandboxToPods map[string][]string, availablePods []string, sandboxes []*sandboxv1alpha1.BatchSandbox) (map[string][]string, []string, bool, error) {
288284
errs := make([]error, 0)
289285
sandboxAlloc := make(map[string][]string)
290286
dirtySandboxes := make([]string, 0)
@@ -366,7 +362,7 @@ func (allocator *defaultAllocator) doAllocate(ctx context.Context, status *Alloc
366362
return sandboxAlloc, remainAvailablePods, sandboxDirty, poolAllocate, nil
367363
}
368364

369-
func (allocator *defaultAllocator) deallocate(ctx context.Context, status *AllocStatus, sandboxToPods map[string][]string, sandboxes []*sandboxv1alpha1.BatchSandbox) (bool, error) {
365+
func (allocator *defaultAllocator) deallocate(ctx context.Context, status *AllocStatus, sandboxToPods map[string][]string, sandboxes []*sandboxv1alpha1.BatchSandbox, recycling sets.Set[string]) (bool, error) {
370366
log := logf.FromContext(ctx)
371367
poolDeallocate := false
372368
errs := make([]error, 0)
@@ -393,6 +389,9 @@ func (allocator *defaultAllocator) deallocate(ctx context.Context, status *Alloc
393389
pods := sandboxToPods[name]
394390
log.Info("GC deleted sandbox allocation", "sandbox", name, "podCount", len(pods))
395391
for _, pod := range pods {
392+
if recycling.Has(pod) {
393+
continue
394+
}
396395
delete(status.PodAllocation, pod)
397396
poolDeallocate = true
398397
}

kubernetes/internal/controller/allocator_test.go

Lines changed: 6 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -22,6 +22,7 @@ import (
2222

2323
corev1 "k8s.io/api/core/v1"
2424
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
25+
"k8s.io/apimachinery/pkg/util/sets"
2526

2627
sandboxv1alpha1 "github.com/alibaba/OpenSandbox/sandbox-k8s/apis/sandbox/v1alpha1"
2728
"github.com/golang/mock/gomock"
@@ -324,6 +325,7 @@ func TestAllocatorSchedule(t *testing.T) {
324325
Name: "pool1",
325326
},
326327
},
328+
RecyclingPods: sets.New("pod-deallocated"),
327329
Sandboxes: []*sandboxv1alpha1.BatchSandbox{
328330
{
329331
ObjectMeta: metav1.ObjectMeta{
@@ -610,9 +612,10 @@ func TestScheduleExcludesRestartingPods(t *testing.T) {
610612
},
611613
}
612614
spec := &AllocSpec{
613-
Pods: pods,
614-
Sandboxes: sandboxes,
615-
Pool: &sandboxv1alpha1.Pool{ObjectMeta: metav1.ObjectMeta{Name: "pool1"}},
615+
Pods: pods,
616+
Sandboxes: sandboxes,
617+
Pool: &sandboxv1alpha1.Pool{ObjectMeta: metav1.ObjectMeta{Name: "pool1"}},
618+
RecyclingPods: sets.New("pod-restarting"),
616619
}
617620

618621
store.EXPECT().GetAllocation(gomock.Any(), gomock.Any()).Return(&PoolAllocation{PodAllocation: map[string]string{}}, nil).Times(1)

kubernetes/internal/controller/batchsandbox_controller.go

Lines changed: 7 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -622,8 +622,14 @@ func (r *BatchSandboxReconciler) addDeallocatedFromLabel(ctx context.Context, bs
622622
if err != nil {
623623
return err
624624
}
625+
released, err := parseSandboxReleased(bsx)
626+
if err != nil {
627+
return err
628+
}
629+
pods := sets.NewString(released.Pods...)
630+
pods.Insert(alloc.Pods...)
625631

626-
for _, podName := range alloc.Pods {
632+
for podName := range pods {
627633
pod := &corev1.Pod{}
628634
err = r.Get(ctx, types.NamespacedName{Namespace: bsx.Namespace, Name: podName}, pod)
629635
if errors.IsNotFound(err) {

kubernetes/internal/controller/pool_controller.go

Lines changed: 22 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -158,7 +158,7 @@ func (r *PoolReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.
158158
return r.reconcilePool(ctx, pool, batchSandboxes, pods)
159159
}
160160

161-
func (r *PoolReconciler) collectRecyclingPods(batchSandboxes []*sandboxv1alpha1.BatchSandbox) (sets.Set[string], error) {
161+
func (r *PoolReconciler) collectRecyclingPods(batchSandboxes []*sandboxv1alpha1.BatchSandbox, pods []*corev1.Pod) (sets.Set[string], error) {
162162
recycling := make(sets.Set[string])
163163
for _, batchSandbox := range batchSandboxes {
164164
if batchSandbox.DeletionTimestamp != nil {
@@ -178,6 +178,11 @@ func (r *PoolReconciler) collectRecyclingPods(batchSandboxes []*sandboxv1alpha1.
178178
}
179179
}
180180
}
181+
for _, pod := range pods {
182+
if isRecycling(pod) {
183+
recycling.Insert(pod.Name)
184+
}
185+
}
181186
return recycling, nil
182187
}
183188

@@ -194,7 +199,7 @@ func (r *PoolReconciler) reconcilePool(ctx context.Context, pool *sandboxv1alpha
194199
}
195200

196201
// 2. First, handle Pod Recycle to ensure pods are ready for scheduling
197-
recyclingPods, err := r.collectRecyclingPods(batchSandboxes)
202+
recyclingPods, err := r.collectRecyclingPods(batchSandboxes, pods)
198203
if err != nil {
199204
log.Error(err, "Failed to collect recycling pods")
200205
return err
@@ -250,7 +255,7 @@ func (r *PoolReconciler) reconcilePool(ctx context.Context, pool *sandboxv1alpha
250255
pool: latestPool,
251256
pods: pods,
252257
allocatedCnt: int32(len(scheRes.podAllocation)),
253-
recycling: int32(len(recyclingPods)),
258+
recycling: recyclingPods,
254259
idlePods: latestIdlePods,
255260
redundantPods: deleteOld,
256261
supplyCnt: scheRes.supplySandbox + supplyNew,
@@ -310,6 +315,9 @@ func (r *PoolReconciler) SetupWithManager(mgr ctrl.Manager) error {
310315
if oldObj.Spec.Replicas != newObj.Spec.Replicas {
311316
return true
312317
}
318+
if oldObj.DeletionTimestamp == nil && newObj.DeletionTimestamp != nil {
319+
return true
320+
}
313321
return false
314322
},
315323
DeleteFunc: func(e event.DeleteEvent) bool {
@@ -425,8 +433,8 @@ type scaleArgs struct {
425433
pool *sandboxv1alpha1.Pool
426434
pods []*corev1.Pod
427435
allocatedCnt int32
428-
recycling int32 // pods that are restarting and not available
429-
supplyCnt int32 // to create
436+
recycling sets.Set[string] // pods that are restarting and not available
437+
supplyCnt int32 // to create
430438
idlePods []string
431439
redundantPods []string
432440
}
@@ -446,7 +454,7 @@ func (r *PoolReconciler) scalePool(ctx context.Context, args *scaleArgs) error {
446454
supplyCnt := args.supplyCnt
447455
redundantPods := args.redundantPods
448456
// Buffer count excludes allocated and restarting pods
449-
bufferCnt := totalCnt - allocatedCnt - recycling
457+
bufferCnt := totalCnt - allocatedCnt - int32(len(recycling))
450458

451459
// Calculate desired buffer cnt.
452460
desiredBufferCnt := bufferCnt
@@ -482,7 +490,7 @@ func (r *PoolReconciler) scalePool(ctx context.Context, args *scaleArgs) error {
482490
if desiredTotalCnt < totalCnt {
483491
scaleIn = totalCnt - desiredTotalCnt
484492
}
485-
podsToDelete := r.pickPodsToDelete(pods, args.idlePods, args.redundantPods, scaleIn)
493+
podsToDelete := r.pickPodsToDelete(pods, args.idlePods, args.redundantPods, scaleIn, args.recycling)
486494
log.Info("Scaling down pool", "pool", pool.Name, "scaleIn", scaleIn, "redundantPods", len(redundantPods), "podsToDelete", len(podsToDelete))
487495
for _, pod := range podsToDelete {
488496
log.Info("Deleting pool pod", "pool", pool.Name, "pod", pod.Name)
@@ -532,7 +540,7 @@ func (r *PoolReconciler) updatePoolStatus(ctx context.Context, latestRevision st
532540
return nil
533541
}
534542

535-
func (r *PoolReconciler) pickPodsToDelete(pods []*corev1.Pod, idlePodNames []string, redundantPodNames []string, scaleIn int32) []*corev1.Pod {
543+
func (r *PoolReconciler) pickPodsToDelete(pods []*corev1.Pod, idlePodNames []string, redundantPodNames []string, scaleIn int32, recycling sets.Set[string]) []*corev1.Pod {
536544
var idlePods []*corev1.Pod
537545
podMap := make(map[string]*corev1.Pod)
538546
for _, pod := range pods {
@@ -555,12 +563,18 @@ func (r *PoolReconciler) pickPodsToDelete(pods []*corev1.Pod, idlePodNames []str
555563
if !ok {
556564
continue
557565
}
566+
if recycling.Has(pod.Name) {
567+
continue
568+
}
558569
podsToDelete = append(podsToDelete, pod)
559570
}
560571
for _, pod := range idlePods { // delete pod from pool scale
561572
if scaleIn <= 0 {
562573
break
563574
}
575+
if recycling.Has(pod.Name) {
576+
continue
577+
}
564578
if pod.DeletionTimestamp == nil {
565579
podsToDelete = append(podsToDelete, pod)
566580
}

kubernetes/test/e2e/pod_recycle_policy_test.go

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -57,7 +57,7 @@ var _ = Describe("Pod Recycle Policy", Ordered, func() {
5757
By("patching controller deployment with restart-timeout for testing")
5858
cmd = exec.Command("kubectl", "patch", "deployment", "opensandbox-controller-manager", "-n", namespace,
5959
"--type", "json", "-p",
60-
`[{"op": "add", "path": "/spec/template/spec/containers/0/args/-", "value": "--restart-timeout=20s"}]`)
60+
`[{"op": "add", "path": "/spec/template/spec/containers/0/args/-", "value": "--restart-timeout=60s"}]`)
6161
_, err = utils.Run(cmd)
6262
Expect(err).NotTo(HaveOccurred(), "Failed to patch controller deployment")
6363

0 commit comments

Comments
 (0)