Skip to content

Commit 7e7aeb0

Browse files
committed
fix(dataprocess): use finished job condition for status updates
Signed-off-by: Harsh <harshmastic@gmail.com>
1 parent d14dcc7 commit 7e7aeb0

2 files changed

Lines changed: 117 additions & 1 deletion

File tree

pkg/controllers/v1alpha1/dataprocess/status_handler.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ func (handler *OnceStatusHandler) GetOperationStatus(ctx runtime.ReconcileReques
7878
}
7979

8080
// job either failed or complete, update DataLoad's phase status
81-
jobCondition := job.Status.Conditions[0]
81+
jobCondition := *finishedJobCondition
8282

8383
result.Conditions = []datav1alpha1.Condition{
8484
{

pkg/controllers/v1alpha1/dataprocess/status_handler_test.go

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,14 @@ import (
2020
"testing"
2121
"time"
2222

23+
"github.com/agiledragon/gomonkey/v2"
2324
"github.com/fluid-cloudnative/fluid/api/v1alpha1"
2425
"github.com/fluid-cloudnative/fluid/pkg/common"
2526
cruntime "github.com/fluid-cloudnative/fluid/pkg/runtime"
2627
"github.com/fluid-cloudnative/fluid/pkg/utils/fake"
28+
"github.com/fluid-cloudnative/fluid/pkg/utils/helm"
2729
batchv1 "k8s.io/api/batch/v1"
30+
corev1 "k8s.io/api/core/v1"
2831
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2932
"k8s.io/apimachinery/pkg/runtime"
3033
"k8s.io/apimachinery/pkg/types"
@@ -111,3 +114,116 @@ func TestOnceGetOperationStatus(t *testing.T) {
111114
}
112115
}
113116
}
117+
118+
func TestOnceGetOperationStatusIgnoresMissingJobAfterHelmCleanup(t *testing.T) {
119+
testScheme := runtime.NewScheme()
120+
_ = v1alpha1.AddToScheme(testScheme)
121+
_ = batchv1.AddToScheme(testScheme)
122+
123+
mockDataProcess := v1alpha1.DataProcess{
124+
ObjectMeta: v1.ObjectMeta{
125+
Name: "test",
126+
Namespace: "default",
127+
},
128+
Spec: v1alpha1.DataProcessSpec{},
129+
}
130+
131+
patch := gomonkey.ApplyFunc(helm.DeleteReleaseIfExists, func(_ string, _ string) error { return nil })
132+
defer patch.Reset()
133+
134+
client := fake.NewFakeClientWithScheme(testScheme, &mockDataProcess)
135+
handler := &OnceStatusHandler{Client: client, dataProcess: &mockDataProcess}
136+
ctx := cruntime.ReconcileRequestContext{
137+
NamespacedName: types.NamespacedName{Namespace: "default", Name: "test"},
138+
Log: fake.NullLogger(),
139+
}
140+
141+
opStatus := &v1alpha1.OperationStatus{Phase: common.PhaseExecuting}
142+
result, err := handler.GetOperationStatus(ctx, opStatus)
143+
if err != nil {
144+
t.Fatalf("expected missing job to be handled without error, got %v", err)
145+
}
146+
if result == nil {
147+
t.Fatal("expected non-nil result")
148+
}
149+
if result == opStatus {
150+
t.Fatal("expected deep-copied status result")
151+
}
152+
if result.Phase != common.PhaseExecuting {
153+
t.Fatalf("expected phase %q, got %q", common.PhaseExecuting, result.Phase)
154+
}
155+
}
156+
157+
func TestOnceGetOperationStatusUsesFinishedConditionWhenConditionsOutOfOrder(t *testing.T) {
158+
testScheme := runtime.NewScheme()
159+
_ = v1alpha1.AddToScheme(testScheme)
160+
_ = batchv1.AddToScheme(testScheme)
161+
162+
createdAt := time.Now().Add(-2 * time.Minute)
163+
suspendedAt := createdAt.Add(30 * time.Second)
164+
failedAt := createdAt.Add(90 * time.Second)
165+
166+
mockDataProcess := v1alpha1.DataProcess{
167+
ObjectMeta: v1.ObjectMeta{
168+
Name: "test",
169+
Namespace: "default",
170+
},
171+
Spec: v1alpha1.DataProcessSpec{},
172+
}
173+
174+
mockJob := batchv1.Job{
175+
ObjectMeta: v1.ObjectMeta{
176+
Name: "test-processor-job",
177+
Namespace: "default",
178+
CreationTimestamp: v1.NewTime(createdAt),
179+
},
180+
Status: batchv1.JobStatus{
181+
Conditions: []batchv1.JobCondition{
182+
{
183+
Type: batchv1.JobSuspended,
184+
Status: corev1.ConditionTrue,
185+
LastProbeTime: v1.NewTime(suspendedAt),
186+
LastTransitionTime: v1.NewTime(suspendedAt),
187+
},
188+
{
189+
Type: batchv1.JobFailed,
190+
Status: corev1.ConditionTrue,
191+
Reason: "FailedReason",
192+
Message: "failed after resume",
193+
LastProbeTime: v1.NewTime(failedAt),
194+
LastTransitionTime: v1.NewTime(failedAt),
195+
},
196+
},
197+
},
198+
}
199+
200+
client := fake.NewFakeClientWithScheme(testScheme, &mockDataProcess, &mockJob)
201+
handler := &OnceStatusHandler{Client: client, dataProcess: &mockDataProcess}
202+
ctx := cruntime.ReconcileRequestContext{
203+
NamespacedName: types.NamespacedName{Namespace: "default", Name: "test"},
204+
Log: fake.NullLogger(),
205+
}
206+
207+
result, err := handler.GetOperationStatus(ctx, &v1alpha1.OperationStatus{})
208+
if err != nil {
209+
t.Fatalf("expected no error, got %v", err)
210+
}
211+
if result == nil {
212+
t.Fatal("expected non-nil result")
213+
}
214+
if result.Phase != common.PhaseFailed {
215+
t.Fatalf("expected phase %q, got %q", common.PhaseFailed, result.Phase)
216+
}
217+
if len(result.Conditions) != 1 {
218+
t.Fatalf("expected 1 condition, got %d", len(result.Conditions))
219+
}
220+
if result.Conditions[0].Type != common.ConditionType(batchv1.JobFailed) {
221+
t.Fatalf("expected failed condition type, got %q", result.Conditions[0].Type)
222+
}
223+
if result.Conditions[0].Reason != "FailedReason" {
224+
t.Fatalf("expected failed reason, got %q", result.Conditions[0].Reason)
225+
}
226+
if result.Duration != failedAt.Sub(createdAt).String() {
227+
t.Fatalf("expected duration %q, got %q", failedAt.Sub(createdAt).String(), result.Duration)
228+
}
229+
}

0 commit comments

Comments
 (0)