From 74e4f20c1fdf15f86bbf76d8e82da1eee09b25f5 Mon Sep 17 00:00:00 2001 From: Harsh Date: Tue, 24 Mar 2026 15:16:31 +0530 Subject: [PATCH 1/3] test(datamigrate): migrate controller tests to Ginkgo v2 Signed-off-by: Harsh --- .../datamigrate_controller_test.go | 94 +++ .../v1alpha1/datamigrate/implement_test.go | 468 +++++++++-- .../datamigrate/status_handler_test.go | 746 +++++++++++------- .../v1alpha1/datamigrate/suite_test.go | 39 + 4 files changed, 1000 insertions(+), 347 deletions(-) create mode 100644 pkg/controllers/v1alpha1/datamigrate/datamigrate_controller_test.go create mode 100644 pkg/controllers/v1alpha1/datamigrate/suite_test.go diff --git a/pkg/controllers/v1alpha1/datamigrate/datamigrate_controller_test.go b/pkg/controllers/v1alpha1/datamigrate/datamigrate_controller_test.go new file mode 100644 index 00000000000..c5f735eae6f --- /dev/null +++ b/pkg/controllers/v1alpha1/datamigrate/datamigrate_controller_test.go @@ -0,0 +1,94 @@ +/* +Copyright 2026 The Fluid Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package datamigrate + +import ( + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/tools/record" + + datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" + "github.com/fluid-cloudnative/fluid/pkg/utils/fake" +) + +var _ = Describe("DataMigrateReconciler", func() { + Describe("NewDataMigrateReconciler", func() { + It("should create a reconciler with the provided fields", func() { + scheme := runtime.NewScheme() + fakeClient := fake.NewFakeClientWithScheme(scheme) + log := fake.NullLogger() + recorder := record.NewFakeRecorder(10) + + r := NewDataMigrateReconciler(fakeClient, log, scheme, recorder) + + Expect(r).NotTo(BeNil()) + Expect(r.Scheme).To(Equal(scheme)) + Expect(r.OperationReconciler).NotTo(BeNil()) + }) + }) + + Describe("ControllerName", func() { + It("should return DataMigrateReconciler", func() { + scheme := runtime.NewScheme() + fakeClient := fake.NewFakeClientWithScheme(scheme) + log := fake.NullLogger() + recorder := record.NewFakeRecorder(10) + + r := NewDataMigrateReconciler(fakeClient, log, scheme, recorder) + + Expect(r.ControllerName()).To(Equal("DataMigrateReconciler")) + }) + }) + + Describe("Build", func() { + It("should return a dataMigrateOperation when given a DataMigrate object", func() { + scheme := runtime.NewScheme() + fakeClient := fake.NewFakeClientWithScheme(scheme) + log := fake.NullLogger() + recorder := record.NewFakeRecorder(10) + + r := NewDataMigrateReconciler(fakeClient, log, scheme, recorder) + + dm := &datav1alpha1.DataMigrate{} + dm.Name = "test-migrate" + dm.Namespace = "default" + + op, err := r.Build(dm) + + Expect(err).NotTo(HaveOccurred()) + Expect(op).NotTo(BeNil()) + }) + + It("should return an error when given a non-DataMigrate object", func() { + scheme := runtime.NewScheme() + fakeClient := fake.NewFakeClientWithScheme(scheme) + log := fake.NullLogger() + recorder := record.NewFakeRecorder(10) + + r := NewDataMigrateReconciler(fakeClient, log, scheme, recorder) + + notDM := &datav1alpha1.DataLoad{} + + op, err := r.Build(notDM) + + Expect(err).To(HaveOccurred()) + Expect(op).To(BeNil()) + }) + }) +}) diff --git a/pkg/controllers/v1alpha1/datamigrate/implement_test.go b/pkg/controllers/v1alpha1/datamigrate/implement_test.go index 1a619b49eb5..46403d06587 100644 --- a/pkg/controllers/v1alpha1/datamigrate/implement_test.go +++ b/pkg/controllers/v1alpha1/datamigrate/implement_test.go @@ -1,83 +1,435 @@ /* - Copyright 2024 The Fluid Authors. +Copyright 2026 The Fluid Authors. - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ package datamigrate import ( + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + k8sruntime "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" "github.com/fluid-cloudnative/fluid/pkg/common" + cdatamigrate "github.com/fluid-cloudnative/fluid/pkg/datamigrate" + "github.com/fluid-cloudnative/fluid/pkg/dataoperation" "github.com/fluid-cloudnative/fluid/pkg/runtime" - "github.com/go-logr/logr" - "k8s.io/client-go/tools/record" - "sigs.k8s.io/controller-runtime/pkg/client" - "testing" + "github.com/fluid-cloudnative/fluid/pkg/utils" + "github.com/fluid-cloudnative/fluid/pkg/utils/fake" ) -func TestDataMigrateOperation_Validate(t *testing.T) { - type fields struct { - Client client.Client - Log logr.Logger - Recorder record.EventRecorder - dataMigrate *datav1alpha1.DataMigrate - } - type args struct { - ctx runtime.ReconcileRequestContext - } - tests := []struct { - name string - fields fields - args args - reason string - wantErr bool - }{ - { - name: "ssh secret not set when parallel migrate", - fields: fields{ +var _ = Describe("dataMigrateOperation", func() { + Describe("Validate", func() { + It("should error when SSH secret is not set for parallel migrate", func() { + op := &dataMigrateOperation{ dataMigrate: &datav1alpha1.DataMigrate{ Spec: datav1alpha1.DataMigrateSpec{ Parallelism: 2, ParallelOptions: map[string]string{}, }, }, - }, - args: args{ - ctx: runtime.ReconcileRequestContext{ - Dataset: nil, + } + ctx := runtime.ReconcileRequestContext{ + Dataset: nil, + } + + got, err := op.Validate(ctx) + + Expect(err).To(HaveOccurred()) + Expect(got).NotTo(BeEmpty()) + Expect(got[0].Reason).To(Equal(common.TargetSSHSecretNameNotSet)) + }) + + It("should error when datamigrate namespace differs from dataset namespace", func() { + op := &dataMigrateOperation{ + dataMigrate: &datav1alpha1.DataMigrate{ + Spec: datav1alpha1.DataMigrateSpec{ + Parallelism: 1, + }, }, - }, - reason: common.TargetSSHSecretNameNotSet, - wantErr: true, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - r := &dataMigrateOperation{ - Client: tt.fields.Client, - Log: tt.fields.Log, - Recorder: tt.fields.Recorder, - dataMigrate: tt.fields.dataMigrate, } - got, err := r.Validate(tt.args.ctx) - if (err != nil) != tt.wantErr { - t.Errorf("Validate() error = %v, wantErr %v", err, tt.wantErr) - return + op.dataMigrate.Namespace = "ns-a" + + dataset := &datav1alpha1.Dataset{} + dataset.Namespace = "ns-b" + + ctx := runtime.ReconcileRequestContext{ + Dataset: dataset, } - if tt.wantErr && got[0].Reason != tt.reason { - t.Errorf("Validate() error reason got = %v, want %v", got[0].Reason, tt.reason) + + got, err := op.Validate(ctx) + + Expect(err).To(HaveOccurred()) + Expect(got).NotTo(BeEmpty()) + Expect(got[0].Reason).To(Equal(common.TargetDatasetNamespaceNotSame)) + }) + + It("should return nil when validation passes", func() { + op := &dataMigrateOperation{ + dataMigrate: &datav1alpha1.DataMigrate{ + Spec: datav1alpha1.DataMigrateSpec{ + Parallelism: 1, + }, + }, } + op.dataMigrate.Namespace = "default" + + dataset := &datav1alpha1.Dataset{} + dataset.Namespace = "default" + + ctx := runtime.ReconcileRequestContext{ + Dataset: dataset, + } + + got, err := op.Validate(ctx) + + Expect(err).NotTo(HaveOccurred()) + Expect(got).To(BeNil()) + }) + }) + + Describe("GetOperationObject", func() { + It("should return the dataMigrate object", func() { + dm := &datav1alpha1.DataMigrate{} + dm.Name = "test-migrate" + op := &dataMigrateOperation{dataMigrate: dm} + + Expect(op.GetOperationObject()).To(Equal(dm)) + }) + }) + + Describe("HasPrecedingOperation", func() { + It("should return false when RunAfter is nil", func() { + op := &dataMigrateOperation{ + dataMigrate: &datav1alpha1.DataMigrate{ + Spec: datav1alpha1.DataMigrateSpec{}, + }, + } + Expect(op.HasPrecedingOperation()).To(BeFalse()) + }) + + It("should return true when RunAfter is set", func() { + op := &dataMigrateOperation{ + dataMigrate: &datav1alpha1.DataMigrate{ + Spec: datav1alpha1.DataMigrateSpec{ + RunAfter: &datav1alpha1.OperationRef{}, + }, + }, + } + Expect(op.HasPrecedingOperation()).To(BeTrue()) + }) + }) + + Describe("GetPossibleTargetDatasetNamespacedNames", func() { + It("should return empty when no datasets are set", func() { + op := &dataMigrateOperation{ + dataMigrate: &datav1alpha1.DataMigrate{ + Spec: datav1alpha1.DataMigrateSpec{}, + }, + } + + result := op.GetPossibleTargetDatasetNamespacedNames() + + Expect(result).To(BeEmpty()) + }) + + It("should return To dataset when set", func() { + op := &dataMigrateOperation{ + dataMigrate: &datav1alpha1.DataMigrate{ + Spec: datav1alpha1.DataMigrateSpec{ + To: datav1alpha1.DataToMigrate{ + DataSet: &datav1alpha1.DatasetToMigrate{ + Name: "dest-dataset", + Namespace: "ns-dest", + }, + }, + }, + }, + } + + result := op.GetPossibleTargetDatasetNamespacedNames() + + Expect(result).To(HaveLen(1)) + Expect(result[0]).To(Equal(types.NamespacedName{Namespace: "ns-dest", Name: "dest-dataset"})) + }) + + It("should fall back to dataMigrate namespace when dataset namespace is empty", func() { + op := &dataMigrateOperation{ + dataMigrate: &datav1alpha1.DataMigrate{ + Spec: datav1alpha1.DataMigrateSpec{ + From: datav1alpha1.DataToMigrate{ + DataSet: &datav1alpha1.DatasetToMigrate{ + Name: "src-dataset", + }, + }, + }, + }, + } + op.dataMigrate.Namespace = "default" + + result := op.GetPossibleTargetDatasetNamespacedNames() + + Expect(result).To(HaveLen(1)) + Expect(result[0]).To(Equal(types.NamespacedName{Namespace: "default", Name: "src-dataset"})) + }) + }) + + Describe("GetReleaseNameSpacedName", func() { + It("should return correct namespaced name for release", func() { + op := &dataMigrateOperation{ + dataMigrate: &datav1alpha1.DataMigrate{}, + } + op.dataMigrate.Name = "my-migrate" + op.dataMigrate.Namespace = "my-ns" + + result := op.GetReleaseNameSpacedName() + + expected := utils.GetDataMigrateReleaseName("my-migrate") + Expect(result.Namespace).To(Equal("my-ns")) + Expect(result.Name).To(Equal(expected)) + }) + }) + + Describe("GetChartsDirectory", func() { + It("should return charts directory containing datamigrate chart name", func() { + op := &dataMigrateOperation{dataMigrate: &datav1alpha1.DataMigrate{}} + result := op.GetChartsDirectory() + Expect(result).To(ContainSubstring(cdatamigrate.DataMigrateChart)) + }) + }) + + Describe("GetOperationType", func() { + It("should return DataMigrateType", func() { + op := &dataMigrateOperation{dataMigrate: &datav1alpha1.DataMigrate{}} + Expect(op.GetOperationType()).To(Equal(dataoperation.DataMigrateType)) + }) + }) + + Describe("UpdateStatusInfoForCompleted", func() { + It("should return nil (no-op)", func() { + op := &dataMigrateOperation{dataMigrate: &datav1alpha1.DataMigrate{}} + Expect(op.UpdateStatusInfoForCompleted(nil)).To(Succeed()) + }) + }) + + Describe("SetTargetDatasetStatusInProgress", func() { + It("should set dataset phase to DataMigrating", func() { + op := &dataMigrateOperation{dataMigrate: &datav1alpha1.DataMigrate{}} + dataset := &datav1alpha1.Dataset{} + + op.SetTargetDatasetStatusInProgress(dataset) + + Expect(dataset.Status.Phase).To(Equal(datav1alpha1.DataMigrating)) + }) + }) + + Describe("RemoveTargetDatasetStatusInProgress", func() { + It("should set dataset phase to BoundDatasetPhase", func() { + op := &dataMigrateOperation{dataMigrate: &datav1alpha1.DataMigrate{}} + dataset := &datav1alpha1.Dataset{} + + op.RemoveTargetDatasetStatusInProgress(dataset) + + Expect(dataset.Status.Phase).To(Equal(datav1alpha1.BoundDatasetPhase)) + }) + }) + + Describe("GetStatusHandler", func() { + It("should return OnceStatusHandler for Once policy", func() { + op := &dataMigrateOperation{ + dataMigrate: &datav1alpha1.DataMigrate{ + Spec: datav1alpha1.DataMigrateSpec{ + Policy: datav1alpha1.Once, + }, + }, + } + handler := op.GetStatusHandler() + Expect(handler).To(BeAssignableToTypeOf(&OnceStatusHandler{})) + }) + + It("should return CronStatusHandler for Cron policy", func() { + op := &dataMigrateOperation{ + dataMigrate: &datav1alpha1.DataMigrate{ + Spec: datav1alpha1.DataMigrateSpec{ + Policy: datav1alpha1.Cron, + }, + }, + } + handler := op.GetStatusHandler() + Expect(handler).To(BeAssignableToTypeOf(&CronStatusHandler{})) + }) + + It("should return OnEventStatusHandler for OnEvent policy", func() { + op := &dataMigrateOperation{ + dataMigrate: &datav1alpha1.DataMigrate{ + Spec: datav1alpha1.DataMigrateSpec{ + Policy: datav1alpha1.OnEvent, + }, + }, + } + handler := op.GetStatusHandler() + Expect(handler).To(BeAssignableToTypeOf(&OnEventStatusHandler{})) + }) + + It("should return nil for unknown policy", func() { + op := &dataMigrateOperation{ + dataMigrate: &datav1alpha1.DataMigrate{ + Spec: datav1alpha1.DataMigrateSpec{ + Policy: "Unknown", + }, + }, + } + handler := op.GetStatusHandler() + Expect(handler).To(BeNil()) + }) + }) + + Describe("GetTTL", func() { + It("should return TTL for Once policy", func() { + ttlVal := int32(300) + op := &dataMigrateOperation{ + dataMigrate: &datav1alpha1.DataMigrate{ + Spec: datav1alpha1.DataMigrateSpec{ + Policy: datav1alpha1.Once, + TTLSecondsAfterFinished: &ttlVal, + }, + }, + } + ttl, err := op.GetTTL() + Expect(err).NotTo(HaveOccurred()) + Expect(ttl).NotTo(BeNil()) + Expect(*ttl).To(Equal(ttlVal)) + }) + + It("should return nil TTL for Cron policy", func() { + op := &dataMigrateOperation{ + dataMigrate: &datav1alpha1.DataMigrate{ + Spec: datav1alpha1.DataMigrateSpec{ + Policy: datav1alpha1.Cron, + }, + }, + } + ttl, err := op.GetTTL() + Expect(err).NotTo(HaveOccurred()) + Expect(ttl).To(BeNil()) + }) + + It("should return nil TTL for OnEvent policy", func() { + op := &dataMigrateOperation{ + dataMigrate: &datav1alpha1.DataMigrate{ + Spec: datav1alpha1.DataMigrateSpec{ + Policy: datav1alpha1.OnEvent, + }, + }, + } + ttl, err := op.GetTTL() + Expect(err).NotTo(HaveOccurred()) + Expect(ttl).To(BeNil()) + }) + + It("should return error for unknown policy", func() { + op := &dataMigrateOperation{ + dataMigrate: &datav1alpha1.DataMigrate{ + Spec: datav1alpha1.DataMigrateSpec{ + Policy: "UnknownPolicy", + }, + }, + } + ttl, err := op.GetTTL() + Expect(err).To(HaveOccurred()) + Expect(ttl).To(BeNil()) + }) + }) + + Describe("GetParallelTaskNumber", func() { + It("should return Parallelism from spec", func() { + op := &dataMigrateOperation{ + dataMigrate: &datav1alpha1.DataMigrate{ + Spec: datav1alpha1.DataMigrateSpec{ + Parallelism: 5, + }, + }, + } + Expect(op.GetParallelTaskNumber()).To(Equal(int32(5))) + }) + }) + + Describe("GetTargetDataset", func() { + It("should return the dataset when it exists and is referenced by datamigrate", func() { + testScheme := k8sruntime.NewScheme() + Expect(datav1alpha1.AddToScheme(testScheme)).To(Succeed()) + + dataset := &datav1alpha1.Dataset{} + dataset.Name = "my-dataset" + dataset.Namespace = "default" + + dm := &datav1alpha1.DataMigrate{ + ObjectMeta: v1.ObjectMeta{ + Name: "test-migrate", + Namespace: "default", + }, + Spec: datav1alpha1.DataMigrateSpec{ + To: datav1alpha1.DataToMigrate{ + DataSet: &datav1alpha1.DatasetToMigrate{ + Name: "my-dataset", + Namespace: "default", + }, + }, + }, + } + + fakeClient := fake.NewFakeClientWithScheme(testScheme, dataset, dm) + op := &dataMigrateOperation{ + Client: fakeClient, + dataMigrate: dm, + } + + result, err := op.GetTargetDataset() + + Expect(err).NotTo(HaveOccurred()) + Expect(result).NotTo(BeNil()) + Expect(result.Name).To(Equal("my-dataset")) + }) + }) + + Describe("UpdateOperationApiStatus", func() { + It("should update status without error", func() { + testScheme := k8sruntime.NewScheme() + Expect(datav1alpha1.AddToScheme(testScheme)).To(Succeed()) + + dm := &datav1alpha1.DataMigrate{ + ObjectMeta: v1.ObjectMeta{ + Name: "test-migrate", + Namespace: "default", + }, + } + fakeClient := fake.NewFakeClientWithScheme(testScheme, dm) + op := &dataMigrateOperation{ + Client: fakeClient, + dataMigrate: dm, + } + + newStatus := &datav1alpha1.OperationStatus{ + Phase: common.PhaseComplete, + } + err := op.UpdateOperationApiStatus(newStatus) + + Expect(err).NotTo(HaveOccurred()) }) - } -} + }) +}) diff --git a/pkg/controllers/v1alpha1/datamigrate/status_handler_test.go b/pkg/controllers/v1alpha1/datamigrate/status_handler_test.go index 90fbf728b53..7a516ffbdfc 100644 --- a/pkg/controllers/v1alpha1/datamigrate/status_handler_test.go +++ b/pkg/controllers/v1alpha1/datamigrate/status_handler_test.go @@ -1,285 +1,433 @@ /* - Copyright 2023 The Fluid Authors. +Copyright 2026 The Fluid Authors. - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ package datamigrate import ( - "github.com/fluid-cloudnative/fluid/pkg/utils" - "github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient" - appsv1 "k8s.io/api/apps/v1" - "reflect" - "testing" "time" - "github.com/fluid-cloudnative/fluid/api/v1alpha1" - "github.com/fluid-cloudnative/fluid/pkg/common" - cruntime "github.com/fluid-cloudnative/fluid/pkg/runtime" - "github.com/fluid-cloudnative/fluid/pkg/utils/fake" + "github.com/agiledragon/gomonkey/v2" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + appsv1 "k8s.io/api/apps/v1" batchv1 "k8s.io/api/batch/v1" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + + "github.com/fluid-cloudnative/fluid/api/v1alpha1" + "github.com/fluid-cloudnative/fluid/pkg/common" + cruntime "github.com/fluid-cloudnative/fluid/pkg/runtime" + "github.com/fluid-cloudnative/fluid/pkg/utils" + "github.com/fluid-cloudnative/fluid/pkg/utils/compatibility" + "github.com/fluid-cloudnative/fluid/pkg/utils/fake" + "github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient" ) -func TestOnceGetOperationStatus(t *testing.T) { - testScheme := runtime.NewScheme() - _ = v1alpha1.AddToScheme(testScheme) - _ = batchv1.AddToScheme(testScheme) - - mockDataMigrate := v1alpha1.DataMigrate{ - ObjectMeta: v1.ObjectMeta{ - Name: "test", - Namespace: "default", - }, - Spec: v1alpha1.DataMigrateSpec{}, - } - - mockJob := batchv1.Job{ - ObjectMeta: v1.ObjectMeta{ - Name: "test-migrate-migrate", - Namespace: "default", - }, - Status: batchv1.JobStatus{ - Conditions: []batchv1.JobCondition{ - { - Type: batchv1.JobComplete, - LastProbeTime: v1.NewTime(time.Now()), - LastTransitionTime: v1.NewTime(time.Now()), - }, +var _ = Describe("OnceStatusHandler", func() { + var ( + testScheme *runtime.Scheme + mockDataMigrate v1alpha1.DataMigrate + mockJob batchv1.Job + mockFailedJob batchv1.Job + ) + + BeforeEach(func() { + testScheme = runtime.NewScheme() + Expect(v1alpha1.AddToScheme(testScheme)).To(Succeed()) + Expect(batchv1.AddToScheme(testScheme)).To(Succeed()) + + mockDataMigrate = v1alpha1.DataMigrate{ + ObjectMeta: v1.ObjectMeta{ + Name: "test", + Namespace: "default", }, - }, - } - - mockFailedJob := batchv1.Job{ - ObjectMeta: v1.ObjectMeta{ - Name: "test-migrate-migrate", - Namespace: "default", - }, - Status: batchv1.JobStatus{ - Conditions: []batchv1.JobCondition{ - { - Type: batchv1.JobFailed, - LastProbeTime: v1.NewTime(time.Now()), - LastTransitionTime: v1.NewTime(time.Now()), + Spec: v1alpha1.DataMigrateSpec{}, + } + + mockJob = batchv1.Job{ + ObjectMeta: v1.ObjectMeta{ + Name: "test-migrate-migrate", + Namespace: "default", + }, + Status: batchv1.JobStatus{ + Conditions: []batchv1.JobCondition{ + { + Type: batchv1.JobComplete, + LastProbeTime: v1.NewTime(time.Now()), + LastTransitionTime: v1.NewTime(time.Now()), + }, }, }, - }, - } - - testcases := []struct { - name string - job batchv1.Job - expectedPhase common.Phase - }{ - { - name: "job success", - job: mockJob, - expectedPhase: common.PhaseComplete, - }, - { - name: "job failed", - job: mockFailedJob, - expectedPhase: common.PhaseFailed, - }, - } - - for _, testcase := range testcases { - client := fake.NewFakeClientWithScheme(testScheme, &mockDataMigrate, &testcase.job) - onceStatusHandler := &OnceStatusHandler{Client: client, dataMigrate: &mockDataMigrate} - ctx := cruntime.ReconcileRequestContext{ - NamespacedName: types.NamespacedName{ + } + + mockFailedJob = batchv1.Job{ + ObjectMeta: v1.ObjectMeta{ + Name: "test-migrate-migrate", Namespace: "default", - Name: "", }, - Log: fake.NullLogger(), - } - opStatus, err := onceStatusHandler.GetOperationStatus(ctx, &mockDataMigrate.Status) - if err != nil { - t.Errorf("fail to GetOperationStatus with error %v", err) - } - if opStatus.Phase != testcase.expectedPhase { - t.Error("Failed to GetOperationStatus", "expected phase", testcase.expectedPhase, "get", opStatus.Phase) - } - } -} - -func TestCronGetOperationStatus(t *testing.T) { - testScheme := runtime.NewScheme() - _ = v1alpha1.AddToScheme(testScheme) - _ = batchv1.AddToScheme(testScheme) - - startTime := time.Date(2023, 8, 1, 12, 0, 0, 0, time.Local) - lastScheduleTime := v1.NewTime(startTime) - lastSuccessfulTime := v1.NewTime(startTime.Add(time.Second * 10)) - - mockCronDataMigrate := v1alpha1.DataMigrate{ - ObjectMeta: v1.ObjectMeta{ - Name: "test", - Namespace: "default", - }, - Spec: v1alpha1.DataMigrateSpec{ - Policy: "Cron", - Schedule: "* * * * *", - }, - Status: v1alpha1.OperationStatus{ - Phase: common.PhaseComplete, - }, - } - - mockCronJob := batchv1.CronJob{ - ObjectMeta: v1.ObjectMeta{ - Name: "test-migrate-migrate", - Namespace: "default", - }, - Spec: batchv1.CronJobSpec{ - Schedule: "* * * * *", - }, - Status: batchv1.CronJobStatus{ - LastScheduleTime: &lastScheduleTime, - LastSuccessfulTime: &lastSuccessfulTime, - }, - } - - mockJob := batchv1.Job{ - ObjectMeta: v1.ObjectMeta{ - Name: "test-migrate-migrate-1", - Namespace: "default", - Labels: map[string]string{ - "cronjob": "test-migrate-migrate", + Status: batchv1.JobStatus{ + Conditions: []batchv1.JobCondition{ + { + Type: batchv1.JobFailed, + LastProbeTime: v1.NewTime(time.Now()), + LastTransitionTime: v1.NewTime(time.Now()), + }, + }, }, - CreationTimestamp: lastScheduleTime, - }, - Status: batchv1.JobStatus{ - Conditions: []batchv1.JobCondition{ - { - Type: batchv1.JobComplete, - LastProbeTime: lastSuccessfulTime, - LastTransitionTime: lastSuccessfulTime, + } + }) + + Describe("GetOperationStatus", func() { + It("should return PhaseComplete when job succeeded", func() { + fakeClient := fake.NewFakeClientWithScheme(testScheme, &mockDataMigrate, &mockJob) + handler := &OnceStatusHandler{Client: fakeClient, dataMigrate: &mockDataMigrate} + ctx := cruntime.ReconcileRequestContext{ + NamespacedName: types.NamespacedName{ + Namespace: "default", + Name: "", + }, + Log: fake.NullLogger(), + } + + opStatus, err := handler.GetOperationStatus(ctx, &mockDataMigrate.Status) + + Expect(err).NotTo(HaveOccurred()) + Expect(opStatus.Phase).To(Equal(common.PhaseComplete)) + }) + + It("should return PhaseFailed when job failed", func() { + fakeClient := fake.NewFakeClientWithScheme(testScheme, &mockDataMigrate, &mockFailedJob) + handler := &OnceStatusHandler{Client: fakeClient, dataMigrate: &mockDataMigrate} + ctx := cruntime.ReconcileRequestContext{ + NamespacedName: types.NamespacedName{ + Namespace: "default", + Name: "", + }, + Log: fake.NullLogger(), + } + + opStatus, err := handler.GetOperationStatus(ctx, &mockDataMigrate.Status) + + Expect(err).NotTo(HaveOccurred()) + Expect(opStatus.Phase).To(Equal(common.PhaseFailed)) + }) + + It("should return original status without change when job is still running (no finished condition)", func() { + runningJob := batchv1.Job{ + ObjectMeta: v1.ObjectMeta{ + Name: "test-migrate-migrate", + Namespace: "default", + }, + Status: batchv1.JobStatus{}, + } + fakeClient := fake.NewFakeClientWithScheme(testScheme, &mockDataMigrate, &runningJob) + handler := &OnceStatusHandler{Client: fakeClient, dataMigrate: &mockDataMigrate} + ctx := cruntime.ReconcileRequestContext{ + NamespacedName: types.NamespacedName{ + Namespace: "default", + Name: "", + }, + Log: fake.NullLogger(), + } + + opStatus, err := handler.GetOperationStatus(ctx, &mockDataMigrate.Status) + + // job still running; GetOperationStatus returns nil result (early return) + Expect(err).NotTo(HaveOccurred()) + Expect(opStatus).NotTo(BeNil()) + }) + + It("should return PhaseComplete and set NodeAffinity for single-parallelism succeed job", func() { + parallelism1DataMigrate := v1alpha1.DataMigrate{ + ObjectMeta: v1.ObjectMeta{ + Name: "test", + Namespace: "default", }, + Spec: v1alpha1.DataMigrateSpec{ + Parallelism: 1, + }, + } + // Job with node selector labels to trigger node affinity generation + jobWithNode := batchv1.Job{ + ObjectMeta: v1.ObjectMeta{ + Name: "test-migrate-migrate", + Namespace: "default", + }, + Status: batchv1.JobStatus{ + Conditions: []batchv1.JobCondition{ + { + Type: batchv1.JobComplete, + LastProbeTime: v1.NewTime(time.Now()), + LastTransitionTime: v1.NewTime(time.Now()), + }, + }, + }, + } + fakeClient := fake.NewFakeClientWithScheme(testScheme, ¶llelism1DataMigrate, &jobWithNode) + handler := &OnceStatusHandler{Client: fakeClient, dataMigrate: ¶llelism1DataMigrate} + ctx := cruntime.ReconcileRequestContext{ + NamespacedName: types.NamespacedName{ + Namespace: "default", + Name: "", + }, + Log: fake.NullLogger(), + } + + opStatus, err := handler.GetOperationStatus(ctx, ¶llelism1DataMigrate.Status) + + Expect(err).NotTo(HaveOccurred()) + Expect(opStatus.Phase).To(Equal(common.PhaseComplete)) + }) + }) +}) + +var _ = Describe("CronStatusHandler", func() { + var ( + testScheme *runtime.Scheme + startTime time.Time + lastScheduleTime v1.Time + lastSuccessfulTime v1.Time + mockCronDataMigrate v1alpha1.DataMigrate + mockCronJob batchv1.CronJob + patch *gomonkey.Patches + ) + + BeforeEach(func() { + testScheme = runtime.NewScheme() + Expect(v1alpha1.AddToScheme(testScheme)).To(Succeed()) + Expect(batchv1.AddToScheme(testScheme)).To(Succeed()) + + startTime = time.Date(2023, 8, 1, 12, 0, 0, 0, time.Local) + lastScheduleTime = v1.NewTime(startTime) + lastSuccessfulTime = v1.NewTime(startTime.Add(time.Second * 10)) + + mockCronDataMigrate = v1alpha1.DataMigrate{ + ObjectMeta: v1.ObjectMeta{ + Name: "test", + Namespace: "default", }, - }, - } - - mockRunningJob := batchv1.Job{ - ObjectMeta: v1.ObjectMeta{ - Name: "test-migrate-migrate-1", - Namespace: "default", - Labels: map[string]string{ - "cronjob": "test-migrate-migrate", + Spec: v1alpha1.DataMigrateSpec{ + Policy: "Cron", + Schedule: "* * * * *", + }, + Status: v1alpha1.OperationStatus{ + Phase: common.PhaseComplete, }, - CreationTimestamp: lastScheduleTime, - }, - } - - testcases := []struct { - name string - job batchv1.Job - expectedPhase common.Phase - }{ - { - name: "job complete", - job: mockJob, - expectedPhase: common.PhaseComplete, - }, - { - name: "job running", - job: mockRunningJob, - expectedPhase: common.PhasePending, - }, - } - - for _, testcase := range testcases { - client := fake.NewFakeClientWithScheme(testScheme, &mockCronDataMigrate, &mockCronJob, &testcase.job) - cronStatusHandler := &CronStatusHandler{Client: client, dataMigrate: &mockCronDataMigrate} - ctx := cruntime.ReconcileRequestContext{Log: fake.NullLogger()} - opStatus, err := cronStatusHandler.GetOperationStatus(ctx, &mockCronDataMigrate.Status) - if err != nil { - t.Errorf("fail to GetOperationStatus with error %v", err) } - if !reflect.DeepEqual(opStatus.LastScheduleTime, &lastScheduleTime) || !reflect.DeepEqual(opStatus.LastSuccessfulTime, &lastSuccessfulTime) { - t.Error("fail to get correct Operation Status", "expected LastScheduleTime", lastScheduleTime, "expected LastSuccessfulTime", lastSuccessfulTime, "get", opStatus) + + mockCronJob = batchv1.CronJob{ + ObjectMeta: v1.ObjectMeta{ + Name: "test-migrate-migrate", + Namespace: "default", + }, + Spec: batchv1.CronJobSpec{ + Schedule: "* * * * *", + }, + Status: batchv1.CronJobStatus{ + LastScheduleTime: &lastScheduleTime, + LastSuccessfulTime: &lastSuccessfulTime, + }, } - if opStatus.Phase != testcase.expectedPhase { - t.Error("Failed to GetOperationStatus", "expected phase", testcase.expectedPhase, "get", opStatus.Phase) + + patch = gomonkey.ApplyFunc(compatibility.IsBatchV1CronJobSupported, func() bool { + return true + }) + }) + + AfterEach(func() { + if patch != nil { + patch.Reset() } - } -} - -func TestCronGetOperationStatusWithParallelTasks(t1 *testing.T) { - testScheme := runtime.NewScheme() - _ = v1alpha1.AddToScheme(testScheme) - _ = batchv1.AddToScheme(testScheme) - _ = appsv1.AddToScheme(testScheme) - - startTime := time.Date(2023, 8, 1, 12, 0, 0, 0, time.Local) - lastScheduleTime := v1.NewTime(startTime) - lastSuccessfulTime := v1.NewTime(startTime.Add(time.Second * 10)) - - mockCronDataMigrate := v1alpha1.DataMigrate{ - ObjectMeta: v1.ObjectMeta{ - Name: "test", - Namespace: "default", - }, - Spec: v1alpha1.DataMigrateSpec{ - Policy: "Cron", - Schedule: "* * * * *", - Parallelism: 3, - }, - Status: v1alpha1.OperationStatus{ - Phase: common.PhaseComplete, - }, - } - - mockCronJob := batchv1.CronJob{ - ObjectMeta: v1.ObjectMeta{ - Name: "test-migrate-migrate", - Namespace: "default", - }, - Spec: batchv1.CronJobSpec{ - Schedule: "* * * * *", - }, - Status: batchv1.CronJobStatus{ - LastScheduleTime: &lastScheduleTime, - LastSuccessfulTime: &lastSuccessfulTime, - }, - } - - defaultStsReplicas := mockCronDataMigrate.Spec.Parallelism - 1 - sts := appsv1.StatefulSet{ - ObjectMeta: v1.ObjectMeta{ - Name: utils.GetParallelOperationWorkersName(utils.GetDataMigrateReleaseName(mockCronDataMigrate.Name)), - Namespace: mockCronDataMigrate.Namespace, - }, - Spec: appsv1.StatefulSetSpec{ - Replicas: &defaultStsReplicas, - }, - Status: appsv1.StatefulSetStatus{}, - } - - trueFlag := true - - testcases := []struct { - name string - job batchv1.Job - expectedPhase common.Phase - stsReplicas int32 - }{ - { - name: "job complete, suspend is false", - job: batchv1.Job{ + }) + + Describe("GetOperationStatus (non-parallel)", func() { + It("should return PhaseComplete and correct timestamps when job completes", func() { + mockJob := batchv1.Job{ + ObjectMeta: v1.ObjectMeta{ + Name: "test-migrate-migrate-1", + Namespace: "default", + Labels: map[string]string{ + "cronjob": "test-migrate-migrate", + }, + CreationTimestamp: lastScheduleTime, + }, + Status: batchv1.JobStatus{ + Conditions: []batchv1.JobCondition{ + { + Type: batchv1.JobComplete, + LastProbeTime: lastSuccessfulTime, + LastTransitionTime: lastSuccessfulTime, + }, + }, + }, + } + + fakeClient := fake.NewFakeClientWithScheme(testScheme, &mockCronDataMigrate, &mockCronJob, &mockJob) + handler := &CronStatusHandler{Client: fakeClient, dataMigrate: &mockCronDataMigrate} + ctx := cruntime.ReconcileRequestContext{Log: fake.NullLogger()} + + opStatus, err := handler.GetOperationStatus(ctx, &mockCronDataMigrate.Status) + + Expect(err).NotTo(HaveOccurred()) + Expect(opStatus.LastScheduleTime).To(Equal(&lastScheduleTime)) + Expect(opStatus.LastSuccessfulTime).To(Equal(&lastSuccessfulTime)) + Expect(opStatus.Phase).To(Equal(common.PhaseComplete)) + }) + + It("should return PhasePending when job is still running", func() { + mockRunningJob := batchv1.Job{ + ObjectMeta: v1.ObjectMeta{ + Name: "test-migrate-migrate-1", + Namespace: "default", + Labels: map[string]string{ + "cronjob": "test-migrate-migrate", + }, + CreationTimestamp: lastScheduleTime, + }, + } + + fakeClient := fake.NewFakeClientWithScheme(testScheme, &mockCronDataMigrate, &mockCronJob, &mockRunningJob) + handler := &CronStatusHandler{Client: fakeClient, dataMigrate: &mockCronDataMigrate} + ctx := cruntime.ReconcileRequestContext{Log: fake.NullLogger()} + + opStatus, err := handler.GetOperationStatus(ctx, &mockCronDataMigrate.Status) + + Expect(err).NotTo(HaveOccurred()) + Expect(opStatus.LastScheduleTime).To(Equal(&lastScheduleTime)) + Expect(opStatus.LastSuccessfulTime).To(Equal(&lastSuccessfulTime)) + Expect(opStatus.Phase).To(Equal(common.PhasePending)) + }) + + It("should return PhaseFailed when cron job's latest job failed", func() { + mockFailedCronJob := batchv1.Job{ + ObjectMeta: v1.ObjectMeta{ + Name: "test-migrate-migrate-1", + Namespace: "default", + Labels: map[string]string{ + "cronjob": "test-migrate-migrate", + }, + CreationTimestamp: lastScheduleTime, + }, + Status: batchv1.JobStatus{ + Conditions: []batchv1.JobCondition{ + { + Type: batchv1.JobFailed, + LastProbeTime: lastSuccessfulTime, + LastTransitionTime: lastSuccessfulTime, + }, + }, + }, + } + + fakeClient := fake.NewFakeClientWithScheme(testScheme, &mockCronDataMigrate, &mockCronJob, &mockFailedCronJob) + handler := &CronStatusHandler{Client: fakeClient, dataMigrate: &mockCronDataMigrate} + ctx := cruntime.ReconcileRequestContext{Log: fake.NullLogger()} + + opStatus, err := handler.GetOperationStatus(ctx, &mockCronDataMigrate.Status) + + Expect(err).NotTo(HaveOccurred()) + Expect(opStatus.Phase).To(Equal(common.PhaseFailed)) + }) + + It("should skip and return current status when no current job matches schedule time", func() { + // Job with CreationTimestamp BEFORE lastScheduleTime — won't match + oldJob := batchv1.Job{ + ObjectMeta: v1.ObjectMeta{ + Name: "test-migrate-migrate-1", + Namespace: "default", + Labels: map[string]string{ + "cronjob": "test-migrate-migrate", + }, + CreationTimestamp: v1.NewTime(startTime.Add(-time.Hour)), + }, + } + + fakeClient := fake.NewFakeClientWithScheme(testScheme, &mockCronDataMigrate, &mockCronJob, &oldJob) + handler := &CronStatusHandler{Client: fakeClient, dataMigrate: &mockCronDataMigrate} + ctx := cruntime.ReconcileRequestContext{Log: fake.NullLogger()} + + opStatus, err := handler.GetOperationStatus(ctx, &mockCronDataMigrate.Status) + + Expect(err).NotTo(HaveOccurred()) + // currentJob is nil — returns early with the current opStatus copy + Expect(opStatus).NotTo(BeNil()) + }) + }) + + Describe("GetOperationStatus (parallel tasks)", func() { + var ( + parallelScheme *runtime.Scheme + parallelDataMigrate v1alpha1.DataMigrate + parallelCronJob batchv1.CronJob + sts appsv1.StatefulSet + defaultStsReplicas int32 + ) + + BeforeEach(func() { + parallelScheme = runtime.NewScheme() + Expect(v1alpha1.AddToScheme(parallelScheme)).To(Succeed()) + Expect(batchv1.AddToScheme(parallelScheme)).To(Succeed()) + Expect(appsv1.AddToScheme(parallelScheme)).To(Succeed()) + + parallelDataMigrate = v1alpha1.DataMigrate{ + ObjectMeta: v1.ObjectMeta{ + Name: "test", + Namespace: "default", + }, + Spec: v1alpha1.DataMigrateSpec{ + Policy: "Cron", + Schedule: "* * * * *", + Parallelism: 3, + }, + Status: v1alpha1.OperationStatus{ + Phase: common.PhaseComplete, + }, + } + + parallelCronJob = batchv1.CronJob{ + ObjectMeta: v1.ObjectMeta{ + Name: "test-migrate-migrate", + Namespace: "default", + }, + Spec: batchv1.CronJobSpec{ + Schedule: "* * * * *", + }, + Status: batchv1.CronJobStatus{ + LastScheduleTime: &lastScheduleTime, + LastSuccessfulTime: &lastSuccessfulTime, + }, + } + + defaultStsReplicas = parallelDataMigrate.Spec.Parallelism - 1 + sts = appsv1.StatefulSet{ + ObjectMeta: v1.ObjectMeta{ + Name: utils.GetParallelOperationWorkersName(utils.GetDataMigrateReleaseName(parallelDataMigrate.Name)), + Namespace: parallelDataMigrate.Namespace, + }, + Spec: appsv1.StatefulSetSpec{ + Replicas: &defaultStsReplicas, + }, + Status: appsv1.StatefulSetStatus{}, + } + }) + + It("should return PhaseComplete and leave StatefulSet replicas unchanged when job completes without suspend", func() { + job := batchv1.Job{ ObjectMeta: v1.ObjectMeta{ Name: "test-migrate-migrate-1", Namespace: "default", @@ -303,13 +451,27 @@ func TestCronGetOperationStatusWithParallelTasks(t1 *testing.T) { }, }, }, - }, - expectedPhase: common.PhaseComplete, - stsReplicas: *sts.Spec.Replicas, - }, - { - name: "job start, suspend is true", - job: batchv1.Job{ + } + + fakeClient := fake.NewFakeClientWithScheme(parallelScheme, ¶llelDataMigrate, ¶llelCronJob, &job, &sts) + handler := &CronStatusHandler{Client: fakeClient, dataMigrate: ¶llelDataMigrate} + ctx := cruntime.ReconcileRequestContext{Log: fake.NullLogger()} + + opStatus, err := handler.GetOperationStatus(ctx, ¶llelDataMigrate.Status) + + Expect(err).NotTo(HaveOccurred()) + Expect(opStatus.LastScheduleTime).To(Equal(&lastScheduleTime)) + Expect(opStatus.LastSuccessfulTime).To(Equal(&lastSuccessfulTime)) + Expect(opStatus.Phase).To(Equal(common.PhaseComplete)) + + updatedSts, err := kubeclient.GetStatefulSet(fakeClient, sts.Name, sts.Namespace) + Expect(err).NotTo(HaveOccurred()) + Expect(*updatedSts.Spec.Replicas).To(Equal(defaultStsReplicas)) + }) + + It("should return PhasePending and scale StatefulSet when job starts with suspend=true", func() { + trueFlag := true + job := batchv1.Job{ ObjectMeta: v1.ObjectMeta{ Name: "test-migrate-migrate-1", Namespace: "default", @@ -330,36 +492,42 @@ func TestCronGetOperationStatusWithParallelTasks(t1 *testing.T) { }, }, }, - }, - expectedPhase: common.PhasePending, - stsReplicas: *sts.Spec.Replicas, - }, - } - - for _, testcase := range testcases { - t1.Run(testcase.name, func(t *testing.T) { - client := fake.NewFakeClientWithScheme(testScheme, &mockCronDataMigrate, &mockCronJob, &testcase.job, &sts) - cronStatusHandler := &CronStatusHandler{Client: client, dataMigrate: &mockCronDataMigrate} - ctx := cruntime.ReconcileRequestContext{Log: fake.NullLogger()} - opStatus, err := cronStatusHandler.GetOperationStatus(ctx, &mockCronDataMigrate.Status) - if err != nil { - t.Errorf("fail to GetOperationStatus with error %v", err) - } - if !reflect.DeepEqual(opStatus.LastScheduleTime, &lastScheduleTime) || !reflect.DeepEqual(opStatus.LastSuccessfulTime, &lastSuccessfulTime) { - t.Error("fail to get correct Operation Status", "expected LastScheduleTime", lastScheduleTime, "expected LastSuccessfulTime", lastSuccessfulTime, "get", opStatus) - } - if opStatus.Phase != testcase.expectedPhase { - t.Error("Failed to GetOperationStatus", "expected phase", testcase.expectedPhase, "get", opStatus.Phase) } - updatedSts, err := kubeclient.GetStatefulSet(client, sts.Name, sts.Namespace) - if err != nil { - t.Error("Failed to GetStatefulSet", err) - } + fakeClient := fake.NewFakeClientWithScheme(parallelScheme, ¶llelDataMigrate, ¶llelCronJob, &job, &sts) + handler := &CronStatusHandler{Client: fakeClient, dataMigrate: ¶llelDataMigrate} + ctx := cruntime.ReconcileRequestContext{Log: fake.NullLogger()} - if *updatedSts.Spec.Replicas != testcase.stsReplicas { - t.Error("Failed to GetOperationStatus", "expected replicas", testcase.stsReplicas, "get", *sts.Spec.Replicas) - } + opStatus, err := handler.GetOperationStatus(ctx, ¶llelDataMigrate.Status) + + Expect(err).NotTo(HaveOccurred()) + Expect(opStatus.LastScheduleTime).To(Equal(&lastScheduleTime)) + Expect(opStatus.LastSuccessfulTime).To(Equal(&lastSuccessfulTime)) + Expect(opStatus.Phase).To(Equal(common.PhasePending)) + + updatedSts, err := kubeclient.GetStatefulSet(fakeClient, sts.Name, sts.Namespace) + Expect(err).NotTo(HaveOccurred()) + Expect(*updatedSts.Spec.Replicas).To(Equal(defaultStsReplicas)) + }) + }) +}) + +var _ = Describe("OnEventStatusHandler", func() { + Describe("GetOperationStatus", func() { + It("should return nil result and nil error (stub)", func() { + testScheme := runtime.NewScheme() + Expect(v1alpha1.AddToScheme(testScheme)).To(Succeed()) + dm := &v1alpha1.DataMigrate{} + dm.Name = "test" + dm.Namespace = "default" + fakeClient := fake.NewFakeClientWithScheme(testScheme, dm) + handler := &OnEventStatusHandler{Client: fakeClient, dataMigrate: dm} + ctx := cruntime.ReconcileRequestContext{Log: fake.NullLogger()} + + result, err := handler.GetOperationStatus(ctx, &dm.Status) + + Expect(err).NotTo(HaveOccurred()) + Expect(result).To(BeNil()) }) - } -} + }) +}) diff --git a/pkg/controllers/v1alpha1/datamigrate/suite_test.go b/pkg/controllers/v1alpha1/datamigrate/suite_test.go new file mode 100644 index 00000000000..a0c17f8e8cb --- /dev/null +++ b/pkg/controllers/v1alpha1/datamigrate/suite_test.go @@ -0,0 +1,39 @@ +/* +Copyright 2026 The Fluid Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package datamigrate + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + logf "sigs.k8s.io/controller-runtime/pkg/log" + + "github.com/fluid-cloudnative/fluid/pkg/utils/fake" +) + +// These tests use Ginkgo (BDD-style Go testing framework). Refer to +// http://onsi.github.io/ginkgo/ to learn more about Ginkgo. + +func TestDataMigrate(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "DataMigrate Controller Suite") +} + +var _ = BeforeSuite(func() { + logf.SetLogger(fake.NullLogger()) +}) From b6652dfe56188213b3b3db39309c118247f56724 Mon Sep 17 00:00:00 2001 From: Harsh Date: Tue, 24 Mar 2026 15:25:02 +0530 Subject: [PATCH 2/3] test(datamigrate): address Copilot review findings - Fix misleading comment on running-job test (returns DeepCopy, not nil) - Rename NodeAffinity test to reflect actual assertion scope - Improve suspend test: start STS at 0 replicas for observable scaling, assert job.Spec.Suspend is flipped to false Signed-off-by: Harsh --- .../datamigrate/status_handler_test.go | 20 ++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/pkg/controllers/v1alpha1/datamigrate/status_handler_test.go b/pkg/controllers/v1alpha1/datamigrate/status_handler_test.go index 7a516ffbdfc..d8a66bad1fb 100644 --- a/pkg/controllers/v1alpha1/datamigrate/status_handler_test.go +++ b/pkg/controllers/v1alpha1/datamigrate/status_handler_test.go @@ -17,6 +17,7 @@ limitations under the License. package datamigrate import ( + "context" "time" "github.com/agiledragon/gomonkey/v2" @@ -147,12 +148,12 @@ var _ = Describe("OnceStatusHandler", func() { opStatus, err := handler.GetOperationStatus(ctx, &mockDataMigrate.Status) - // job still running; GetOperationStatus returns nil result (early return) + // job still running; GetOperationStatus returns a deep-copied status without modification Expect(err).NotTo(HaveOccurred()) Expect(opStatus).NotTo(BeNil()) }) - It("should return PhaseComplete and set NodeAffinity for single-parallelism succeed job", func() { + It("should return PhaseComplete for single-parallelism succeed job", func() { parallelism1DataMigrate := v1alpha1.DataMigrate{ ObjectMeta: v1.ObjectMeta{ Name: "test", @@ -469,7 +470,7 @@ var _ = Describe("CronStatusHandler", func() { Expect(*updatedSts.Spec.Replicas).To(Equal(defaultStsReplicas)) }) - It("should return PhasePending and scale StatefulSet when job starts with suspend=true", func() { + It("should return PhasePending, scale StatefulSet, and unsuspend job when job starts with suspend=true", func() { trueFlag := true job := batchv1.Job{ ObjectMeta: v1.ObjectMeta{ @@ -494,6 +495,10 @@ var _ = Describe("CronStatusHandler", func() { }, } + // Start StatefulSet at 0 replicas so ScaleStatefulSet has an observable effect + zeroReplicas := int32(0) + sts.Spec.Replicas = &zeroReplicas + fakeClient := fake.NewFakeClientWithScheme(parallelScheme, ¶llelDataMigrate, ¶llelCronJob, &job, &sts) handler := &CronStatusHandler{Client: fakeClient, dataMigrate: ¶llelDataMigrate} ctx := cruntime.ReconcileRequestContext{Log: fake.NullLogger()} @@ -505,9 +510,18 @@ var _ = Describe("CronStatusHandler", func() { Expect(opStatus.LastSuccessfulTime).To(Equal(&lastSuccessfulTime)) Expect(opStatus.Phase).To(Equal(common.PhasePending)) + // Verify StatefulSet was scaled up to Parallelism-1 updatedSts, err := kubeclient.GetStatefulSet(fakeClient, sts.Name, sts.Namespace) Expect(err).NotTo(HaveOccurred()) Expect(*updatedSts.Spec.Replicas).To(Equal(defaultStsReplicas)) + + // Verify job was unsuspended + updatedJob := &batchv1.Job{} + Expect(fakeClient.Get(context.TODO(), types.NamespacedName{ + Name: job.Name, + Namespace: job.Namespace, + }, updatedJob)).To(Succeed()) + Expect(*updatedJob.Spec.Suspend).To(BeFalse()) }) }) }) From 53ff1c97f3ff29eb4ab7b2fbd88b4c7bf1a2f04b Mon Sep 17 00:00:00 2001 From: Harsh Date: Tue, 24 Mar 2026 16:36:30 +0530 Subject: [PATCH 3/3] test(datamigrate): deduplicate repeated test literals Signed-off-by: Harsh --- .../v1alpha1/datamigrate/implement_test.go | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/pkg/controllers/v1alpha1/datamigrate/implement_test.go b/pkg/controllers/v1alpha1/datamigrate/implement_test.go index 46403d06587..90d71e2de97 100644 --- a/pkg/controllers/v1alpha1/datamigrate/implement_test.go +++ b/pkg/controllers/v1alpha1/datamigrate/implement_test.go @@ -33,6 +33,11 @@ import ( "github.com/fluid-cloudnative/fluid/pkg/utils/fake" ) +const ( + testDataMigrateName = "test-migrate" + testDatasetName = "my-dataset" +) + var _ = Describe("dataMigrateOperation", func() { Describe("Validate", func() { It("should error when SSH secret is not set for parallel migrate", func() { @@ -106,7 +111,7 @@ var _ = Describe("dataMigrateOperation", func() { Describe("GetOperationObject", func() { It("should return the dataMigrate object", func() { dm := &datav1alpha1.DataMigrate{} - dm.Name = "test-migrate" + dm.Name = testDataMigrateName op := &dataMigrateOperation{dataMigrate: dm} Expect(op.GetOperationObject()).To(Equal(dm)) @@ -375,18 +380,18 @@ var _ = Describe("dataMigrateOperation", func() { Expect(datav1alpha1.AddToScheme(testScheme)).To(Succeed()) dataset := &datav1alpha1.Dataset{} - dataset.Name = "my-dataset" + dataset.Name = testDatasetName dataset.Namespace = "default" dm := &datav1alpha1.DataMigrate{ ObjectMeta: v1.ObjectMeta{ - Name: "test-migrate", + Name: testDataMigrateName, Namespace: "default", }, Spec: datav1alpha1.DataMigrateSpec{ To: datav1alpha1.DataToMigrate{ DataSet: &datav1alpha1.DatasetToMigrate{ - Name: "my-dataset", + Name: testDatasetName, Namespace: "default", }, }, @@ -403,7 +408,7 @@ var _ = Describe("dataMigrateOperation", func() { Expect(err).NotTo(HaveOccurred()) Expect(result).NotTo(BeNil()) - Expect(result.Name).To(Equal("my-dataset")) + Expect(result.Name).To(Equal(testDatasetName)) }) }) @@ -414,7 +419,7 @@ var _ = Describe("dataMigrateOperation", func() { dm := &datav1alpha1.DataMigrate{ ObjectMeta: v1.ObjectMeta{ - Name: "test-migrate", + Name: testDataMigrateName, Namespace: "default", }, }