Skip to content

Commit 2a339c8

Browse files
committed
feat(controller): add configurable restart timeout and exclude restarting pods from allocation #452
- Add restart-timeout flag to controller with default 90s for pod restart operations - Pass restartTimeout value to restartTracker for managing pod lifecycle - Modify restartTracker to use configurable restartTimeout instead of constant - Exclude pods in restarting state from allocator’s available pod list - Add unit test to verify allocator excludes restarting pods during scheduling - Update e2e test to deploy controller with restart-timeout=10s for timeout testing - Add setup and teardown steps in e2e test for namespace, CRDs, and controller deployment - Reduce pod restart timeout wait in e2e test from 4 minutes to 1 minute for faster feedback
1 parent 6d0211b commit 2a339c8

6 files changed

Lines changed: 149 additions & 18 deletions

File tree

kubernetes/cmd/controller/main.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"flag"
2020
"os"
2121
"path/filepath"
22+
"time"
2223

2324
// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
2425
// to ensure that exec-entrypoint and run can make use of them.
@@ -78,6 +79,9 @@ func main() {
7879
var kubeClientQPS float64
7980
var kubeClientBurst int
8081

82+
// Restart timeout configuration
83+
var restartTimeout time.Duration
84+
8185
flag.StringVar(&metricsAddr, "metrics-bind-address", "0", "The address the metrics endpoint binds to. "+
8286
"Use :8443 for HTTPS or :8080 for HTTP, or leave as 0 to disable the metrics service.")
8387
flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
@@ -105,6 +109,7 @@ func main() {
105109
flag.BoolVar(&logCompress, "log-compress", true, "Compress determines if the rotated log files should be compressed using gzip")
106110
flag.Float64Var(&kubeClientQPS, "kube-client-qps", 100, "QPS for Kubernetes client rate limiter.")
107111
flag.IntVar(&kubeClientBurst, "kube-client-burst", 200, "Burst for Kubernetes client rate limiter.")
112+
flag.DurationVar(&restartTimeout, "restart-timeout", 90*time.Second, "Timeout for Pod restart operations. If a Pod fails to restart within this duration, it will be deleted.")
108113

109114
opts := zap.Options{}
110115
opts.BindFlags(flag.CommandLine)
@@ -261,7 +266,7 @@ func main() {
261266
os.Exit(1)
262267
}
263268
kubeClient := kubernetes.NewForConfigOrDie(mgr.GetConfig())
264-
restartTracker := controller.NewRestartTracker(mgr.GetClient(), kubeClient, mgr.GetConfig())
269+
restartTracker := controller.NewRestartTracker(mgr.GetClient(), kubeClient, mgr.GetConfig(), restartTimeout)
265270
if err := (&controller.PoolReconciler{
266271
Client: mgr.GetClient(),
267272
Scheme: mgr.GetScheme(),

kubernetes/internal/controller/allocator.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,9 @@ func (allocator *defaultAllocator) Schedule(ctx context.Context, spec *AllocSpec
227227
if pod.Status.Phase != corev1.PodRunning {
228228
continue
229229
}
230+
if isRestarting(pod) {
231+
continue
232+
}
230233
availablePods = append(availablePods, pod.Name)
231234
}
232235
log.V(1).Info("Schedule init", "existingAllocations", len(status.PodAllocation), "availablePods", len(availablePods))

kubernetes/internal/controller/allocator_test.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package controller
1616

1717
import (
1818
"context"
19+
"encoding/json"
1920
"reflect"
2021
"testing"
2122

@@ -494,3 +495,70 @@ func TestSyncSandboxAllocationError(t *testing.T) {
494495
err := allocator.SyncSandboxAllocation(context.Background(), sandbox, pods)
495496
assert.Error(t, err)
496497
}
498+
499+
func TestScheduleExcludesRestartingPods(t *testing.T) {
500+
ctrl := gomock.NewController(t)
501+
defer ctrl.Finish()
502+
store := NewMockAllocationStore(ctrl)
503+
syncer := NewMockAllocationSyncer(ctrl)
504+
allocator := &defaultAllocator{
505+
store: store,
506+
syncer: syncer,
507+
}
508+
replica1 := int32(1)
509+
510+
// Create pods: one normal, one restarting (should be excluded from allocation)
511+
restartingMeta := PodRecycleMeta{
512+
State: RecycleStateRestarting,
513+
TriggeredAt: 1234567890,
514+
}
515+
restartingMetaJSON, _ := json.Marshal(restartingMeta)
516+
517+
pods := []*corev1.Pod{
518+
{
519+
ObjectMeta: metav1.ObjectMeta{
520+
Name: "pod-normal",
521+
},
522+
Status: corev1.PodStatus{Phase: corev1.PodRunning},
523+
},
524+
{
525+
ObjectMeta: metav1.ObjectMeta{
526+
Name: "pod-restarting",
527+
Annotations: map[string]string{
528+
AnnoPodRecycleMeta: string(restartingMetaJSON),
529+
},
530+
},
531+
Status: corev1.PodStatus{Phase: corev1.PodRunning},
532+
},
533+
}
534+
sandboxes := []*sandboxv1alpha1.BatchSandbox{
535+
{
536+
ObjectMeta: metav1.ObjectMeta{Name: "sbx1"},
537+
Spec: sandboxv1alpha1.BatchSandboxSpec{Replicas: &replica1},
538+
},
539+
{
540+
ObjectMeta: metav1.ObjectMeta{Name: "sbx2"},
541+
Spec: sandboxv1alpha1.BatchSandboxSpec{Replicas: &replica1},
542+
},
543+
}
544+
spec := &AllocSpec{
545+
Pods: pods,
546+
Sandboxes: sandboxes,
547+
Pool: &sandboxv1alpha1.Pool{ObjectMeta: metav1.ObjectMeta{Name: "pool1"}},
548+
}
549+
550+
store.EXPECT().GetAllocation(gomock.Any(), gomock.Any()).Return(&PoolAllocation{PodAllocation: map[string]string{}}, nil).Times(1)
551+
syncer.EXPECT().GetAllocation(gomock.Any(), gomock.Any()).Return(&SandboxAllocation{Pods: []string{}}, nil).Times(2)
552+
syncer.EXPECT().GetRelease(gomock.Any(), gomock.Any()).Return(&AllocationRelease{Pods: []string{}}, nil).Times(2)
553+
554+
status, pendingSyncs, poolDirty, err := allocator.Schedule(context.Background(), spec)
555+
556+
assert.NoError(t, err)
557+
assert.True(t, poolDirty)
558+
// Only the normal pod should be allocated, sbx2 should have no pod
559+
assert.Contains(t, status.PodAllocation, "pod-normal")
560+
assert.NotContains(t, status.PodAllocation, "pod-restarting")
561+
// sbx2 should need supplement since restarting pod is excluded
562+
assert.Equal(t, int32(1), status.PodSupplement)
563+
assert.Len(t, pendingSyncs, 1)
564+
}

kubernetes/internal/controller/restart_tracker.go

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,8 @@ import (
3333

3434
// Restart timeout configurations
3535
const (
36-
restartTimeout = 90 * time.Second
37-
killTimeout = 30 * time.Second
36+
defaultRestartTimeout = 90 * time.Second
37+
killTimeout = 30 * time.Second
3838
)
3939

4040
// restartTracker manages the Pod restart lifecycle as part of the PoolReconciler.
@@ -48,22 +48,28 @@ const (
4848
// all restarted & ready → None (clear annotation, reuse)
4949
// timeout / CrashLoop → delete Pod
5050
type restartTracker struct {
51-
client client.Client
52-
kubeClient kubernetes.Interface
53-
restConfig *rest.Config
51+
client client.Client
52+
kubeClient kubernetes.Interface
53+
restConfig *rest.Config
54+
restartTimeout time.Duration
5455
}
5556

5657
type RestartTracker interface {
5758
HandleRestart(ctx context.Context, pod *corev1.Pod) error
5859
}
5960

60-
// NewRestartTracker creates a new restartTracker.
61-
func NewRestartTracker(c client.Client, kubeClient kubernetes.Interface, restConfig *rest.Config) RestartTracker {
62-
return &restartTracker{
63-
client: c,
64-
kubeClient: kubeClient,
65-
restConfig: restConfig,
61+
// NewRestartTracker creates a new restartTracker with custom restart timeout.
62+
func NewRestartTracker(c client.Client, kubeClient kubernetes.Interface, restConfig *rest.Config, restartTimeout time.Duration) RestartTracker {
63+
r := &restartTracker{
64+
client: c,
65+
kubeClient: kubeClient,
66+
restConfig: restConfig,
67+
restartTimeout: restartTimeout,
6668
}
69+
if r.restartTimeout == 0 {
70+
r.restartTimeout = defaultRestartTimeout
71+
}
72+
return r
6773
}
6874

6975
// HandleRestart handles the Restart recycle policy for a Pod.
@@ -206,9 +212,9 @@ func (t *restartTracker) checkRestartStatus(ctx context.Context, pod *corev1.Pod
206212
return t.client.Delete(ctx, pod)
207213
}
208214

209-
if elapsed > restartTimeout {
215+
if elapsed > t.restartTimeout {
210216
log.Info("Pod restart timeout, deleting", "pod", pod.Name,
211-
"elapsed", elapsed, "timeout", restartTimeout,
217+
"elapsed", elapsed, "timeout", t.restartTimeout,
212218
"allRestarted", allRestarted, "allReady", allReady)
213219
return t.client.Delete(ctx, pod)
214220
}

kubernetes/internal/controller/restart_tracker_test.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ func TestRestartTracker_CheckRestartStatus_Timeout(t *testing.T) {
134134

135135
meta := &PodRecycleMeta{
136136
State: RecycleStateRestarting,
137-
TriggeredAt: time.Now().UnixMilli() - (restartTimeout.Milliseconds() + 1000),
137+
TriggeredAt: time.Now().UnixMilli() - (defaultRestartTimeout.Milliseconds() + 1000),
138138
InitialRestartCounts: map[string]int32{
139139
"c1": 0,
140140
},
@@ -143,7 +143,8 @@ func TestRestartTracker_CheckRestartStatus_Timeout(t *testing.T) {
143143

144144
client := fake.NewClientBuilder().WithScheme(scheme).WithObjects(pod).Build()
145145
tracker := &restartTracker{
146-
client: client,
146+
client: client,
147+
restartTimeout: defaultRestartTimeout,
147148
}
148149

149150
err := tracker.checkRestartStatus(context.Background(), pod)
@@ -229,7 +230,8 @@ func TestRestartTracker_CheckRestartStatus_StillRestarting(t *testing.T) {
229230

230231
client := fake.NewClientBuilder().WithScheme(scheme).WithObjects(pod).Build()
231232
tracker := &restartTracker{
232-
client: client,
233+
client: client,
234+
restartTimeout: defaultRestartTimeout,
233235
}
234236

235237
err := tracker.checkRestartStatus(context.Background(), pod)

kubernetes/test/e2e/pod_recycle_policy_test.go

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,39 @@ var _ = Describe("Pod Recycle Policy", Ordered, func() {
3535
const testNamespace = "default"
3636

3737
BeforeAll(func() {
38+
By("creating manager namespace")
39+
cmd := exec.Command("kubectl", "create", "ns", namespace)
40+
_, err := utils.Run(cmd)
41+
Expect(err).NotTo(HaveOccurred(), "Failed to create namespace")
42+
43+
By("labeling the namespace to enforce the restricted security policy")
44+
cmd = exec.Command("kubectl", "label", "--overwrite", "ns", namespace,
45+
"pod-security.kubernetes.io/enforce=restricted")
46+
_, err = utils.Run(cmd)
47+
Expect(err).NotTo(HaveOccurred(), "Failed to label namespace with restricted policy")
48+
49+
By("installing CRDs")
50+
cmd = exec.Command("make", "install")
51+
_, err = utils.Run(cmd)
52+
Expect(err).NotTo(HaveOccurred(), "Failed to install CRDs")
53+
54+
By("deploying the controller-manager")
55+
cmd = exec.Command("make", "deploy", fmt.Sprintf("CONTROLLER_IMG=%s", utils.ControllerImage))
56+
_, err = utils.Run(cmd)
57+
Expect(err).NotTo(HaveOccurred(), "Failed to deploy the controller-manager")
58+
59+
By("patching controller deployment with restart-timeout for testing")
60+
cmd = exec.Command("kubectl", "patch", "deployment", "opensandbox-controller-manager", "-n", namespace,
61+
"--type", "json", "-p",
62+
`[{"op": "add", "path": "/spec/template/spec/containers/0/args/-", "value": "--restart-timeout=10s"}]`)
63+
_, err = utils.Run(cmd)
64+
Expect(err).NotTo(HaveOccurred(), "Failed to patch controller deployment")
65+
66+
By("waiting for controller rollout to complete")
67+
cmd = exec.Command("kubectl", "rollout", "status", "deployment/opensandbox-controller-manager", "-n", namespace, "--timeout=60s")
68+
_, err = utils.Run(cmd)
69+
Expect(err).NotTo(HaveOccurred(), "Failed to wait for controller rollout")
70+
3871
By("waiting for controller to be ready")
3972
Eventually(func(g Gomega) {
4073
cmd := exec.Command("kubectl", "get", "pods", "-l", "control-plane=controller-manager",
@@ -45,6 +78,20 @@ var _ = Describe("Pod Recycle Policy", Ordered, func() {
4578
}, 2*time.Minute).Should(Succeed())
4679
})
4780

81+
AfterAll(func() {
82+
By("undeploying the controller-manager")
83+
cmd := exec.Command("make", "undeploy")
84+
_, _ = utils.Run(cmd)
85+
86+
By("uninstalling CRDs")
87+
cmd = exec.Command("make", "uninstall")
88+
_, _ = utils.Run(cmd)
89+
90+
By("removing manager namespace")
91+
cmd = exec.Command("kubectl", "delete", "ns", namespace)
92+
_, _ = utils.Run(cmd)
93+
})
94+
4895
SetDefaultEventuallyTimeout(3 * time.Minute)
4996
SetDefaultEventuallyPollingInterval(2 * time.Second)
5097

@@ -411,7 +458,7 @@ var _ = Describe("Pod Recycle Policy", Ordered, func() {
411458
_, err := utils.Run(cmd)
412459
g.Expect(err).To(HaveOccurred(), "Pod should be deleted after timeout")
413460
g.Expect(err.Error()).To(ContainSubstring("not found"))
414-
}, 4*time.Minute).Should(Succeed()) // restartTimeout (90s) + buffer
461+
}, 1*time.Minute).Should(Succeed())
415462

416463
By("cleaning up")
417464
cmd = exec.Command("kubectl", "delete", "pool", poolName, "-n", testNamespace)

0 commit comments

Comments
 (0)