1717package dataflowaffinity
1818
1919import (
20+ "context"
21+
22+ . "github.com/onsi/ginkgo/v2"
23+ . "github.com/onsi/gomega"
24+
2025 datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1"
2126 "github.com/fluid-cloudnative/fluid/pkg/common"
2227 "github.com/fluid-cloudnative/fluid/pkg/utils/fake"
2328 batchv1 "k8s.io/api/batch/v1"
2429 v1 "k8s.io/api/core/v1"
2530 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2631 "k8s.io/apimachinery/pkg/runtime"
27- "reflect "
28- "testing "
32+ "k8s.io/apimachinery/pkg/types "
33+ "sigs.k8s.io/controller-runtime/pkg/reconcile "
2934)
3035
31- func TestDataOpJobReconciler_injectPodNodeLabelsToJob (t * testing.T ) {
32- type args struct {
33- job * batchv1.Job
34- pods * v1.Pod
35- node * v1.Node
36- }
37- tests := []struct {
38- name string
39- args args
40- wantAnnotations map [string ]string
41- wantErr bool
42- }{
43- {
44- name : "job with succeed pods" ,
45- args : args {
46- job : & batchv1.Job {
36+ var _ = Describe ("DataOpJobReconciler" , func () {
37+ var testScheme * runtime.Scheme
38+
39+ BeforeEach (func () {
40+ testScheme = runtime .NewScheme ()
41+ Expect (v1 .AddToScheme (testScheme )).To (Succeed ())
42+ Expect (batchv1 .AddToScheme (testScheme )).To (Succeed ())
43+ Expect (datav1alpha1 .AddToScheme (testScheme )).To (Succeed ())
44+ })
45+
46+ Describe ("ControllerName" , func () {
47+ It ("returns the controller name constant" , func () {
48+ f := & DataOpJobReconciler {Log : fake .NullLogger ()}
49+ Expect (f .ControllerName ()).To (Equal (DataOpJobControllerName ))
50+ })
51+ })
52+
53+ Describe ("ManagedResource" , func () {
54+ It ("returns a batchv1.Job object" , func () {
55+ f := & DataOpJobReconciler {Log : fake .NullLogger ()}
56+ obj := f .ManagedResource ()
57+ Expect (obj ).To (BeAssignableToTypeOf (& batchv1.Job {}))
58+ })
59+ })
60+
61+ Describe ("NewDataOpJobReconciler" , func () {
62+ It ("constructs a reconciler with the given client, logger, and recorder" , func () {
63+ c := fake .NewFakeClientWithScheme (testScheme )
64+ logger := fake .NullLogger ()
65+ r := NewDataOpJobReconciler (c , logger , nil )
66+ Expect (r ).NotTo (BeNil ())
67+ Expect (r .Client ).To (Equal (c ))
68+ })
69+ })
70+
71+ Describe ("Reconcile" , func () {
72+ Context ("when the job does not exist" , func () {
73+ It ("returns an error (not-found propagates)" , func () {
74+ c := fake .NewFakeClientWithScheme (testScheme )
75+ f := & DataOpJobReconciler {Client : c , Log : fake .NullLogger ()}
76+ _ , err := f .Reconcile (context .TODO (), reconcile.Request {
77+ NamespacedName : types.NamespacedName {Name : "missing-job" , Namespace : "default" },
78+ })
79+ Expect (err ).To (HaveOccurred ())
80+ })
81+ })
82+
83+ Context ("when job should not be in queue (cronjob label)" , func () {
84+ It ("returns no-requeue without error" , func () {
85+ job := & batchv1.Job {
86+ ObjectMeta : metav1.ObjectMeta {
87+ Name : "cron-job" ,
88+ Namespace : "default" ,
89+ Labels : map [string ]string {
90+ common .LabelAnnotationManagedBy : common .Fluid ,
91+ "cronjob" : "something" ,
92+ },
93+ },
94+ }
95+ c := fake .NewFakeClientWithScheme (testScheme , job )
96+ f := & DataOpJobReconciler {Client : c , Log : fake .NullLogger ()}
97+ result , err := f .Reconcile (context .TODO (), reconcile.Request {
98+ NamespacedName : types.NamespacedName {Name : "cron-job" , Namespace : "default" },
99+ })
100+ Expect (err ).NotTo (HaveOccurred ())
101+ Expect (result .Requeue ).To (BeFalse ())
102+ })
103+ })
104+
105+ Context ("when job is a valid fluid job without affinity annotation" , func () {
106+ It ("injects the dataflow affinity annotation and returns no-requeue" , func () {
107+ job := & batchv1.Job {
108+ ObjectMeta : metav1.ObjectMeta {
109+ Name : "test-job" ,
110+ Namespace : "default" ,
111+ Labels : map [string ]string {
112+ common .LabelAnnotationManagedBy : common .Fluid ,
113+ },
114+ },
115+ }
116+ c := fake .NewFakeClientWithScheme (testScheme , job )
117+ f := & DataOpJobReconciler {Client : c , Log : fake .NullLogger ()}
118+ result , err := f .Reconcile (context .TODO (), reconcile.Request {
119+ NamespacedName : types.NamespacedName {Name : "test-job" , Namespace : "default" },
120+ })
121+ Expect (err ).NotTo (HaveOccurred ())
122+ Expect (result .Requeue ).To (BeFalse ())
123+ })
124+ })
125+
126+ Context ("when job is complete and has a succeeded pod" , func () {
127+ It ("injects node labels and returns no-requeue" , func () {
128+ job := & batchv1.Job {
129+ ObjectMeta : metav1.ObjectMeta {
130+ Name : "complete-job" ,
131+ Namespace : "default" ,
132+ Labels : map [string ]string {
133+ common .LabelAnnotationManagedBy : common .Fluid ,
134+ },
135+ Annotations : map [string ]string {
136+ common .AnnotationDataFlowAffinityInject : "true" ,
137+ },
138+ },
139+ Spec : batchv1.JobSpec {
140+ Selector : & metav1.LabelSelector {
141+ MatchLabels : map [string ]string {
142+ "controller-uid" : "abc-123" ,
143+ },
144+ },
145+ },
146+ Status : batchv1.JobStatus {
147+ Conditions : []batchv1.JobCondition {
148+ {Type : batchv1 .JobComplete },
149+ },
150+ },
151+ }
152+ pod := & v1.Pod {
153+ ObjectMeta : metav1.ObjectMeta {
154+ Name : "complete-pod" ,
155+ Namespace : "default" ,
156+ Labels : map [string ]string {
157+ "controller-uid" : "abc-123" ,
158+ },
159+ },
160+ Spec : v1.PodSpec {
161+ NodeName : "node01" ,
162+ },
163+ Status : v1.PodStatus {
164+ Phase : v1 .PodSucceeded ,
165+ },
166+ }
167+ node := & v1.Node {
168+ ObjectMeta : metav1.ObjectMeta {
169+ Name : "node01" ,
170+ Labels : map [string ]string {
171+ common .K8sNodeNameLabelKey : "node01" ,
172+ common .K8sRegionLabelKey : "region01" ,
173+ common .K8sZoneLabelKey : "zone01" ,
174+ },
175+ },
176+ }
177+ c := fake .NewFakeClientWithScheme (testScheme , job , pod , node )
178+ f := & DataOpJobReconciler {Client : c , Log : fake .NullLogger ()}
179+ result , err := f .Reconcile (context .TODO (), reconcile.Request {
180+ NamespacedName : types.NamespacedName {Name : "complete-job" , Namespace : "default" },
181+ })
182+ Expect (err ).NotTo (HaveOccurred ())
183+ Expect (result .Requeue ).To (BeFalse ())
184+ })
185+ })
186+ })
187+
188+ Describe ("injectPodNodeLabelsToJob" , func () {
189+ Context ("when job has a succeeded pod" , func () {
190+ It ("should inject node labels as annotations onto the job" , func () {
191+ job := & batchv1.Job {
47192 ObjectMeta : metav1.ObjectMeta {
48193 Name : "test-job" ,
49194 Labels : map [string ]string {
@@ -57,8 +202,8 @@ func TestDataOpJobReconciler_injectPodNodeLabelsToJob(t *testing.T) {
57202 },
58203 },
59204 },
60- },
61- pods : & v1.Pod {
205+ }
206+ pod := & v1.Pod {
62207 ObjectMeta : metav1.ObjectMeta {
63208 Name : "test-pod" ,
64209 Labels : map [string ]string {
@@ -91,8 +236,8 @@ func TestDataOpJobReconciler_injectPodNodeLabelsToJob(t *testing.T) {
91236 Status : v1.PodStatus {
92237 Phase : v1 .PodSucceeded ,
93238 },
94- },
95- node : & v1.Node {
239+ }
240+ node := & v1.Node {
96241 ObjectMeta : metav1.ObjectMeta {
97242 Name : "node01" ,
98243 Labels : map [string ]string {
@@ -102,29 +247,42 @@ func TestDataOpJobReconciler_injectPodNodeLabelsToJob(t *testing.T) {
102247 "k8s.gpu" : "true" ,
103248 },
104249 },
105- },
106- },
107- wantAnnotations : map [string ]string {
108- common .AnnotationDataFlowCustomizedAffinityPrefix + common .K8sNodeNameLabelKey : "node01" ,
109- common .AnnotationDataFlowCustomizedAffinityPrefix + common .K8sRegionLabelKey : "region01" ,
110- common .AnnotationDataFlowCustomizedAffinityPrefix + common .K8sZoneLabelKey : "zone01" ,
111- common .AnnotationDataFlowCustomizedAffinityPrefix + "k8s.gpu" : "true" ,
112- },
113- wantErr : false ,
114- },
115- {
116- name : "job with failed pods" ,
117- args : args {
118- job : & batchv1.Job {
250+ }
251+
252+ c := fake .NewFakeClientWithScheme (testScheme , job , pod , node )
253+ f := & DataOpJobReconciler {
254+ Client : c ,
255+ Log : fake .NullLogger (),
256+ }
257+
258+ err := f .injectPodNodeLabelsToJob (job )
259+ Expect (err ).NotTo (HaveOccurred ())
260+
261+ wantAnnotations := map [string ]string {
262+ common .AnnotationDataFlowCustomizedAffinityPrefix + common .K8sNodeNameLabelKey : "node01" ,
263+ common .AnnotationDataFlowCustomizedAffinityPrefix + common .K8sRegionLabelKey : "region01" ,
264+ common .AnnotationDataFlowCustomizedAffinityPrefix + common .K8sZoneLabelKey : "zone01" ,
265+ common .AnnotationDataFlowCustomizedAffinityPrefix + "k8s.gpu" : "true" ,
266+ }
267+ Expect (job .Annotations ).To (Equal (wantAnnotations ))
268+ })
269+ })
270+
271+ Context ("when job has only a failed pod" , func () {
272+ It ("should return an error" , func () {
273+ job := & batchv1.Job {
274+ ObjectMeta : metav1.ObjectMeta {
275+ Name : "test-job-failed" ,
276+ },
119277 Spec : batchv1.JobSpec {
120278 Selector : & metav1.LabelSelector {
121279 MatchLabels : map [string ]string {
122280 "controller-uid" : "455afc34-93b1-4e75-a6fa-8e13d2c6ca06" ,
123281 },
124282 },
125283 },
126- },
127- pods : & v1.Pod {
284+ }
285+ pod := & v1.Pod {
128286 ObjectMeta : metav1.ObjectMeta {
129287 Name : "test-pod" ,
130288 Labels : map [string ]string {
@@ -134,41 +292,25 @@ func TestDataOpJobReconciler_injectPodNodeLabelsToJob(t *testing.T) {
134292 Status : v1.PodStatus {
135293 Phase : v1 .PodFailed ,
136294 },
137- },
138- node : & v1.Node {
295+ }
296+ node := & v1.Node {
139297 ObjectMeta : metav1.ObjectMeta {
140298 Name : "node01" ,
141299 Labels : map [string ]string {
142300 common .K8sNodeNameLabelKey : "node01" ,
143- common .K8sRegionLabelKey : "region01" ,
144- common .K8sZoneLabelKey : "zone01" ,
145- "k8s.gpu" : "true" ,
146301 },
147302 },
148- },
149- },
150- wantErr : true ,
151- },
152- }
153- testScheme := runtime .NewScheme ()
154- _ = v1 .AddToScheme (testScheme )
155- _ = batchv1 .AddToScheme (testScheme )
156- _ = datav1alpha1 .AddToScheme (testScheme )
157- for _ , tt := range tests {
158- t .Run (tt .name , func (t * testing.T ) {
159- var c = fake .NewFakeClientWithScheme (testScheme , tt .args .job , tt .args .pods , tt .args .node )
160-
161- f := & DataOpJobReconciler {
162- Client : c ,
163- Log : fake .NullLogger (),
164- }
165- err := f .injectPodNodeLabelsToJob (tt .args .job )
166- if (err != nil ) != tt .wantErr {
167- t .Errorf ("injectPodNodeLabelsToJob() error = %v, wantErr %v" , err , tt .wantErr )
168- }
169- if err == nil && ! reflect .DeepEqual (tt .args .job .Annotations , tt .wantAnnotations ) {
170- t .Errorf ("injectPodNodeLabelsToJob() got = %v, want %v" , tt .args .job .Labels , tt .wantAnnotations )
171- }
303+ }
304+
305+ c := fake .NewFakeClientWithScheme (testScheme , job , pod , node )
306+ f := & DataOpJobReconciler {
307+ Client : c ,
308+ Log : fake .NullLogger (),
309+ }
310+
311+ err := f .injectPodNodeLabelsToJob (job )
312+ Expect (err ).To (HaveOccurred ())
313+ })
172314 })
173- }
174- }
315+ })
316+ })
0 commit comments