Skip to content
This repository was archived by the owner on Jun 11, 2025. It is now read-only.

Commit f03add4

Browse files
committed
feat: adds job tracker functions and job status checks
1 parent e43b8f0 commit f03add4

4 files changed

Lines changed: 110 additions & 8 deletions

File tree

toolkit/job-helper/job-helper.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,15 @@ const (
2323
JobStatusSuccess JobStatus = "job-status-success"
2424
)
2525

26+
type JobPhase string
27+
28+
const (
29+
JobPhasePending JobPhase = "PENDING"
30+
JobPhaseRunning JobPhase = "RUNNING"
31+
JobPhaseFailed JobPhase = "FAILED"
32+
JobPhaseSucceeded JobPhase = "SUCCEEDED"
33+
)
34+
2635
func GetLatestPod(ctx context.Context, cli client.Client, jobNamespace string, jobName string) (*corev1.Pod, error) {
2736
var podList corev1.PodList
2837

toolkit/job-helper/job-runner.go

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
package job_helper
2+
3+
import (
4+
"context"
5+
"errors"
6+
7+
batchv1 "k8s.io/api/batch/v1"
8+
"k8s.io/apimachinery/pkg/types"
9+
"sigs.k8s.io/controller-runtime/pkg/client"
10+
)
11+
12+
type JobTracker struct {
13+
ctx context.Context
14+
client client.Client
15+
16+
job *batchv1.Job
17+
18+
isTargetJob func(job *batchv1.Job) bool
19+
}
20+
21+
type JobTrackerArgs struct {
22+
client client.Client
23+
24+
JobNamespace string
25+
JobName string
26+
27+
IsTargetJob func(job *batchv1.Job) bool
28+
}
29+
30+
func NewJobTracker(ctx context.Context, kcli client.Client, args JobTrackerArgs) (*JobTracker, error) {
31+
var job batchv1.Job
32+
if err := kcli.Get(ctx, types.NamespacedName{Name: args.JobName, Namespace: args.JobNamespace}, &job); err != nil {
33+
return nil, err
34+
}
35+
36+
return &JobTracker{
37+
ctx: ctx,
38+
client: kcli,
39+
job: &job,
40+
isTargetJob: args.IsTargetJob,
41+
}, nil
42+
}
43+
44+
func (jr *JobTracker) HasJobFinished() bool {
45+
for _, v := range jr.job.Status.Conditions {
46+
if v.Type == batchv1.JobComplete && v.Status == "True" {
47+
return true
48+
}
49+
50+
if v.Type == batchv1.JobFailed && v.Status == "True" {
51+
return true
52+
}
53+
54+
if v.Type == batchv1.JobSuspended && v.Status == "True" {
55+
return true
56+
}
57+
}
58+
59+
return false
60+
}
61+
62+
func (jr *JobTracker) Process(ctx context.Context) (phase JobPhase, message string, err error) {
63+
if !jr.isTargetJob(jr.job) {
64+
if !jr.HasJobFinished() {
65+
return JobPhasePending, "waiting for previous jobs to finish execution", nil
66+
}
67+
68+
if err := DeleteJob(jr.ctx, jr.client, jr.job.Namespace, jr.job.Name); err != nil {
69+
return JobPhasePending, "", errors.Join(errors.New("failed to delete job"), err)
70+
}
71+
72+
return JobPhasePending, "waiting for job to start", nil
73+
}
74+
75+
if !jr.HasJobFinished() {
76+
return JobPhaseRunning, "waiting for running job to finish", nil
77+
}
78+
79+
// check.Message = job_manager.GetTerminationLog(ctx, r.Client, job.Namespace, job.Name)
80+
if jr.job.Status.Succeeded > 0 {
81+
return JobPhaseSucceeded, "", nil
82+
}
83+
84+
if jr.job.Status.Active > 0 {
85+
return JobPhaseRunning, "waiting for job to complete", nil
86+
}
87+
88+
if jr.job.Status.Failed > 0 {
89+
return JobPhaseFailed, "", errors.New("install or upgrade job failed")
90+
}
91+
92+
return JobPhasePending, "", nil
93+
}

toolkit/reconciler/checks.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,13 +50,13 @@ func AreChecksEqual(c1 Check, c2 Check) bool {
5050
c1.StartedAt.Sub(c2.StartedAt.Time) == 0
5151
}
5252

53-
type checkWrapper[T Resource] struct {
53+
type CheckWrapper[T Resource] struct {
5454
checkName string
5555
request *Request[T]
5656
Check `json:",inline"`
5757
}
5858

59-
func (cw *checkWrapper[T]) Failed(err error) step_result.Result {
59+
func (cw *CheckWrapper[T]) Failed(err error) step_result.Result {
6060
defer cw.request.LogPostCheck(cw.checkName)
6161

6262
_, file, line, _ := runtime.Caller(1)
@@ -82,7 +82,7 @@ func (cw *checkWrapper[T]) Failed(err error) step_result.Result {
8282
// return cw.request.updateStatus().Continue(false).Err(err)
8383
}
8484

85-
func (cw *checkWrapper[T]) StillRunning(err error) step_result.Result {
85+
func (cw *CheckWrapper[T]) StillRunning(err error) step_result.Result {
8686
defer cw.request.LogPostCheck(cw.checkName)
8787

8888
cw.Check.State = RunningState
@@ -104,7 +104,7 @@ func (cw *checkWrapper[T]) StillRunning(err error) step_result.Result {
104104
// return cw.request.updateStatus().Continue(false).Err(err)
105105
}
106106

107-
func (cw *checkWrapper[T]) Completed() step_result.Result {
107+
func (cw *CheckWrapper[T]) Completed() step_result.Result {
108108
defer cw.request.LogPostCheck(cw.checkName)
109109

110110
cw.Check.State = CompletedState
@@ -119,8 +119,8 @@ func (cw *checkWrapper[T]) Completed() step_result.Result {
119119
return step_result.New().Continue(true)
120120
}
121121

122-
func NewRunningCheck[T Resource](name string, req *Request[T]) *checkWrapper[T] {
123-
cw := &checkWrapper[T]{
122+
func NewRunningCheck[T Resource](name string, req *Request[T]) *CheckWrapper[T] {
123+
cw := &CheckWrapper[T]{
124124
checkName: name,
125125
request: req,
126126
Check: Check{

toolkit/reconciler/request.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -448,7 +448,7 @@ func (r *Request[T]) AddToOwnedResources(refs ...ResourceRef) {
448448
r.resourceRefs = append(r.resourceRefs, refs...)
449449
}
450450

451-
func (r *Request[T]) CleanupOwnedResources(check *checkWrapper[T]) stepResult.Result {
451+
func (r *Request[T]) CleanupOwnedResources(check *CheckWrapper[T]) stepResult.Result {
452452
resources := r.Object.GetStatus().Resources
453453
objects := make([]client.Object, 0, len(resources))
454454
for i := range resources {
@@ -474,7 +474,7 @@ INFO: this should only be used for very specific cases, where there is no other
474474
Like, when deleting ManagedService
475475
- all managed resources should be deleted, but since owner is already getting deleted, there is no point in their proper cleanup
476476
*/
477-
func (r *Request[T]) ForceCleanupOwnedResources(check *checkWrapper[T]) stepResult.Result {
477+
func (r *Request[T]) ForceCleanupOwnedResources(check *CheckWrapper[T]) stepResult.Result {
478478
ctx := r.Context()
479479
resources := r.Object.GetStatus().Resources
480480

0 commit comments

Comments
 (0)