Skip to content

Commit 3835fbf

Browse files
committed
feat(k8s): Add lifecycle feat in task schedule
1 parent aea52c0 commit 3835fbf

21 files changed

Lines changed: 1555 additions & 24 deletions

kubernetes/apis/sandbox/v1alpha1/batchsandbox_types.go

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,10 @@ type BatchSandboxStatus struct {
100100
TaskPending int32 `json:"taskPending"`
101101
// TaskUnknown is the number of Unknown task
102102
TaskUnknown int32 `json:"taskUnknown"`
103+
// TaskLastErrorMessage holds the most recent error message from a failed task.
104+
// This includes lifecycle hook failures (e.g., ossfs mount timeout or error output).
105+
// +optional
106+
TaskLastErrorMessage string `json:"taskLastErrorMessage,omitempty"`
103107
}
104108

105109
// +genclient
@@ -145,6 +149,17 @@ type TaskTemplateSpec struct {
145149
Spec TaskSpec `json:"spec,omitempty"`
146150
}
147151

152+
// ExecMode defines where a process should be executed.
153+
// +kubebuilder:validation:Enum=Local;Remote
154+
type ExecMode string
155+
156+
const (
157+
// ExecModeLocal executes in the task-executor container.
158+
ExecModeLocal ExecMode = "Local"
159+
// ExecModeRemote executes in the main container via nsenter.
160+
ExecModeRemote ExecMode = "Remote"
161+
)
162+
148163
type TaskSpec struct {
149164
// +optional
150165
Process *ProcessTask `json:"process,omitempty"`
@@ -169,6 +184,49 @@ type ProcessTask struct {
169184
// WorkingDir task working directory.
170185
// +optional
171186
WorkingDir string `json:"workingDir,omitempty"`
187+
// ExecMode controls where the process runs.
188+
// Local: inside task-executor container.
189+
// Remote: inside main container (via nsenter).
190+
// +optional
191+
// +kubebuilder:default=Local
192+
ExecMode ExecMode `json:"execMode,omitempty"`
193+
// Lifecycle defines actions to be executed before and after the main process.
194+
// +optional
195+
Lifecycle *ProcessLifecycle `json:"lifecycle,omitempty"`
196+
}
197+
198+
// ProcessLifecycle defines lifecycle hooks for a process.
199+
type ProcessLifecycle struct {
200+
// PreStart is executed before the main process starts.
201+
// +optional
202+
PreStart *LifecycleHandler `json:"preStart,omitempty"`
203+
// PostStop is executed after the main process stops.
204+
// +optional
205+
PostStop *LifecycleHandler `json:"postStop,omitempty"`
206+
}
207+
208+
// LifecycleHandler defines a lifecycle action.
209+
type LifecycleHandler struct {
210+
// Exec specifies the action to take.
211+
// +optional
212+
Exec *ExecAction `json:"exec,omitempty"`
213+
// ExecMode controls where the action runs.
214+
// +optional
215+
// +kubebuilder:default=Local
216+
ExecMode ExecMode `json:"execMode,omitempty"`
217+
// TimeoutSeconds is the maximum number of seconds the hook may run.
218+
// If the hook does not complete within this time, it is killed and the
219+
// enclosing operation (Start or Stop) is treated as failed.
220+
// If not set or zero, there is no timeout.
221+
// +optional
222+
TimeoutSeconds *int64 `json:"timeoutSeconds,omitempty"`
223+
}
224+
225+
// ExecAction describes a "run in container" action.
226+
type ExecAction struct {
227+
// Command is the command line to execute inside the container.
228+
// +kubebuilder:validation:Required
229+
Command []string `json:"command"`
172230
}
173231

174232
// TaskStatus task status

kubernetes/apis/sandbox/v1alpha1/zz_generated.deepcopy.go

Lines changed: 75 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

kubernetes/config/crd/bases/sandbox.opensandbox.io_batchsandboxes.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,11 @@ spec:
156156
description: TaskFailed is the number of Failed task
157157
format: int32
158158
type: integer
159+
taskLastErrorMessage:
160+
description: |-
161+
TaskLastErrorMessage holds the most recent error message from a failed task.
162+
This includes lifecycle hook failures (e.g., ossfs mount timeout or error output).
163+
type: string
159164
taskPending:
160165
description: TaskPending is the number of Pending task which is unassigned
161166
format: int32

kubernetes/config/manager/kustomization.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ apiVersion: kustomize.config.k8s.io/v1beta1
44
kind: Kustomization
55
images:
66
- name: controller
7-
newName: controller
8-
newTag: dev
7+
newName: example.com/sandbox-k8s
8+
newTag: v0.0.1
99
- name: manager
1010
newName: sandbox-registry.cn-zhangjiakou.cr.aliyuncs.com/opensandbox/controller
1111
newTag: v0.0.1
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
apiVersion: sandbox.opensandbox.io/v1alpha1
2+
kind: BatchSandbox
3+
metadata:
4+
labels:
5+
app.kubernetes.io/name: opensandbox
6+
app.kubernetes.io/managed-by: kustomize
7+
name: batchsandbox-lifecycle-sample
8+
namespace: opensandbox
9+
spec:
10+
replicas: 1
11+
template:
12+
metadata:
13+
labels:
14+
app: lifecycle-example
15+
spec:
16+
containers:
17+
- name: main
18+
image: registry.k8s.io/e2e-test-images/busybox:1.29-4
19+
command:
20+
- tail
21+
- -f
22+
- /dev/null
23+
expireTime: "2025-12-31T23:59:59Z"
24+
taskTemplate:
25+
spec:
26+
process:
27+
command:
28+
- sleep
29+
args:
30+
- "30"
31+
env:
32+
- name: TASK_NAME
33+
value: lifecycle-task
34+
lifecycle:
35+
# preStart hook runs before the main task process starts.
36+
# Useful for preparing the environment, checking dependencies, warming up data, etc.
37+
preStart:
38+
exec:
39+
command:
40+
- /bin/sh
41+
- -c
42+
- |
43+
echo "[preStart] Starting environment preparation..."
44+
mkdir -p /tmp/workdir
45+
echo "$(date): Task starting" > /tmp/workdir/prestart-marker
46+
echo "[preStart] Environment ready!"
47+
timeoutSeconds: 30
48+
# postStop hook runs after the main task process exits.
49+
# Useful for cleaning up resources, uploading logs, sending notifications, etc.
50+
postStop:
51+
exec:
52+
command:
53+
- /bin/sh
54+
- -c
55+
- |
56+
echo "[postStop] Starting cleanup..."
57+
echo "$(date): Task finished" >> /tmp/workdir/poststop-marker
58+
echo "Task logs:" && cat /tmp/workdir/*
59+
echo "[postStop] Cleanup complete!"
60+
timeoutSeconds: 30

kubernetes/internal/controller/batchsandbox_controller.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -368,6 +368,7 @@ func (r *BatchSandboxReconciler) scheduleTasks(ctx context.Context, tSch tasksch
368368
var (
369369
running, failed, succeed, unknown int32
370370
pending int32
371+
lastErrorMessage string
371372
)
372373
for i := range len(tasks) {
373374
task := tasks[i]
@@ -385,6 +386,10 @@ func (r *BatchSandboxReconciler) scheduleTasks(ctx context.Context, tSch tasksch
385386
succeed++
386387
case taskscheduler.FailedTaskState:
387388
failed++
389+
// Capture the most recent error message to surface in status.
390+
if msg := task.GetTerminatedMessage(); msg != "" {
391+
lastErrorMessage = msg
392+
}
388393
case taskscheduler.UnknownTaskState:
389394
unknown++
390395
}
@@ -405,8 +410,14 @@ func (r *BatchSandboxReconciler) scheduleTasks(ctx context.Context, tSch tasksch
405410
newStatus.TaskSucceed = succeed
406411
newStatus.TaskUnknown = unknown
407412
newStatus.TaskPending = pending
413+
// Persist error message from the most recently failed task (e.g. lifecycle hook stderr).
414+
// Only update when a new non-empty message is available, to avoid clearing a previously
415+
// recorded error on subsequent reconcile cycles where the task may have been released.
416+
if lastErrorMessage != "" {
417+
newStatus.TaskLastErrorMessage = lastErrorMessage
418+
}
408419
if !reflect.DeepEqual(newStatus, oldStatus) {
409-
log.Info("To update BatchSandbox status", "replicas", newStatus.Replicas, "task_running", newStatus.TaskRunning, "task_succeed", newStatus.TaskSucceed, "task_failed", newStatus.TaskFailed, "task_unknown", newStatus.TaskUnknown, "task_pending", newStatus.TaskPending)
420+
log.Info("To update BatchSandbox status", "replicas", newStatus.Replicas, "task_running", newStatus.TaskRunning, "task_succeed", newStatus.TaskSucceed, "task_failed", newStatus.TaskFailed, "task_unknown", newStatus.TaskUnknown, "task_pending", newStatus.TaskPending, "last_error", newStatus.TaskLastErrorMessage)
410421
if err := r.updateStatus(batchSbx, newStatus); err != nil {
411422
return err
412423
}

kubernetes/internal/controller/strategy/task_scheduling_strategy_default.go

Lines changed: 51 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -72,21 +72,58 @@ func (s *DefaultTaskSchedulingStrategy) getTaskSpec(idx int) (*api.Task, error)
7272
if err = json.Unmarshal(modified, newTaskTemplate); err != nil {
7373
return nil, fmt.Errorf("batchsandbox: failed to unmarshal %s to TaskTemplateSpec, idx %d, err %w", modified, idx, err)
7474
}
75-
task.Process = &api.Process{
76-
Command: newTaskTemplate.Spec.Process.Command,
77-
Args: newTaskTemplate.Spec.Process.Args,
78-
Env: newTaskTemplate.Spec.Process.Env,
79-
WorkingDir: newTaskTemplate.Spec.Process.WorkingDir,
80-
TimeoutSeconds: s.Spec.TaskTemplate.Spec.TimeoutSeconds,
81-
}
75+
task.Process = convertProcessSpec(newTaskTemplate.Spec.Process, s.Spec.TaskTemplate.Spec.TimeoutSeconds)
8276
} else if s.Spec.TaskTemplate != nil && s.Spec.TaskTemplate.Spec.Process != nil {
83-
task.Process = &api.Process{
84-
Command: s.Spec.TaskTemplate.Spec.Process.Command,
85-
Args: s.Spec.TaskTemplate.Spec.Process.Args,
86-
Env: s.Spec.TaskTemplate.Spec.Process.Env,
87-
WorkingDir: s.Spec.TaskTemplate.Spec.Process.WorkingDir,
88-
TimeoutSeconds: s.Spec.TaskTemplate.Spec.TimeoutSeconds,
89-
}
77+
task.Process = convertProcessSpec(s.Spec.TaskTemplate.Spec.Process, s.Spec.TaskTemplate.Spec.TimeoutSeconds)
9078
}
9179
return task, nil
9280
}
81+
82+
// convertProcessSpec converts sandboxv1alpha1.ProcessTask to api.Process.
83+
func convertProcessSpec(src *sandboxv1alpha1.ProcessTask, timeoutSeconds *int64) *api.Process {
84+
if src == nil {
85+
return nil
86+
}
87+
return &api.Process{
88+
Command: src.Command,
89+
Args: src.Args,
90+
Env: src.Env,
91+
WorkingDir: src.WorkingDir,
92+
TimeoutSeconds: timeoutSeconds,
93+
ExecMode: api.ExecMode(src.ExecMode),
94+
Lifecycle: convertLifecycle(src.Lifecycle),
95+
}
96+
}
97+
98+
// convertLifecycle converts sandboxv1alpha1.ProcessLifecycle to api.ProcessLifecycle.
99+
func convertLifecycle(src *sandboxv1alpha1.ProcessLifecycle) *api.ProcessLifecycle {
100+
if src == nil {
101+
return nil
102+
}
103+
return &api.ProcessLifecycle{
104+
PreStart: convertLifecycleHandler(src.PreStart),
105+
PostStop: convertLifecycleHandler(src.PostStop),
106+
}
107+
}
108+
109+
// convertLifecycleHandler converts sandboxv1alpha1.LifecycleHandler to api.LifecycleHandler.
110+
func convertLifecycleHandler(src *sandboxv1alpha1.LifecycleHandler) *api.LifecycleHandler {
111+
if src == nil {
112+
return nil
113+
}
114+
return &api.LifecycleHandler{
115+
Exec: convertExecAction(src.Exec),
116+
ExecMode: api.ExecMode(src.ExecMode),
117+
TimeoutSeconds: src.TimeoutSeconds,
118+
}
119+
}
120+
121+
// convertExecAction converts sandboxv1alpha1.ExecAction to api.ExecAction.
122+
func convertExecAction(src *sandboxv1alpha1.ExecAction) *api.ExecAction {
123+
if src == nil {
124+
return nil
125+
}
126+
return &api.ExecAction{
127+
Command: src.Command,
128+
}
129+
}

kubernetes/internal/scheduler/default_scheduler.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,19 @@ func (t *taskNode) IsResourceReleased() bool {
7373
return t.sState == stateReleased
7474
}
7575

76+
// GetTerminatedMessage returns the failure message from the task status, including
77+
// lifecycle hook stderr output. It is used by the controller to populate
78+
// BatchSandboxStatus.TaskLastErrorMessage.
79+
func (t *taskNode) GetTerminatedMessage() string {
80+
if t.Status == nil {
81+
return ""
82+
}
83+
if t.Status.ProcessStatus != nil && t.Status.ProcessStatus.Terminated != nil {
84+
return t.Status.ProcessStatus.Terminated.Message
85+
}
86+
return ""
87+
}
88+
7689
func (t *taskNode) isTaskCompleted() bool {
7790
return t.tState == SucceedTaskState || t.tState == FailedTaskState
7891
}

0 commit comments

Comments
 (0)