@@ -24,6 +24,7 @@ import (
2424 "strings"
2525 "time"
2626
27+ appsv1 "k8s.io/api/apps/v1"
2728 corev1 "k8s.io/api/core/v1"
2829 "k8s.io/apimachinery/pkg/types"
2930 "k8s.io/client-go/util/retry"
@@ -190,148 +191,194 @@ func (j *JuiceFSEngine) syncWorkerSpec(ctx cruntime.ReconcileRequestContext, run
190191 return
191192}
192193
193- func (j * JuiceFSEngine ) syncFuseSpec (ctx cruntime.ReconcileRequestContext , runtime * datav1alpha1.JuiceFSRuntime , value * JuiceFS ) (changed bool , err error ) {
194+ func (j * JuiceFSEngine ) syncFuseSpec (ctx cruntime.ReconcileRequestContext , runtime * datav1alpha1.JuiceFSRuntime , latestValue * JuiceFS ) (bool , error ) {
194195 j .Log .V (1 ).Info ("syncFuseSpec" )
195- var cmdChanged bool
196- err = retry .RetryOnConflict (retry .DefaultBackoff , func () error {
196+
197+ //1. check if fuse cmd configmap needs to update
198+ if err := j .updateFuseCmdConfigmapOnChanged (latestValue ); err != nil {
199+ return false , err
200+ }
201+
202+ //2. check if fuse needs to update
203+ var fuseChanged , fuseGenerationNeedIncrease bool
204+ err := retry .RetryOnConflict (retry .DefaultBackoff , func () error {
197205 fuses , err := kubeclient .GetDaemonset (j .Client , j .getFuseName (), j .namespace )
198206 if err != nil {
199207 return err
200208 }
201-
202209 fusesToUpdate := fuses .DeepCopy ()
203210
204- // nodeSelector
205- if nodeSelectorChanged , newSelector := j .isNodeSelectorChanged (fusesToUpdate .Spec .Template .Spec .NodeSelector , value .Fuse .NodeSelector ); nodeSelectorChanged {
206- j .Log .Info ("syncFuseSpec nodeSelectorChanged" )
207- fusesToUpdate .Spec .Template .Spec .NodeSelector = newSelector
208- changed = true
211+ currentValue , err := j .GetValueFromConfigmap ()
212+ if err != nil {
213+ return err
209214 }
210215
211- // volumes
212- if volumeChanged , newVolumes := j .isVolumesChanged (fusesToUpdate .Spec .Template .Spec .Volumes , value .Fuse .Volumes ); volumeChanged {
213- j .Log .Info ("syncFuseSpec volumeChanged" )
214- fusesToUpdate .Spec .Template .Spec .Volumes = newVolumes
215- changed = true
216+ fuseChanged , fuseGenerationNeedIncrease = j .checkAndSetFuseChanges (currentValue , latestValue , runtime , fusesToUpdate )
217+ if ! fuseChanged {
218+ j .Log .V (1 ).Info ("The fuse is not changed" )
219+ return nil
216220 }
217221
218- // labels
219- if labelChanged , newLabels := j .isLabelsChanged (fusesToUpdate .Spec .Template .ObjectMeta .Labels , value .Fuse .Labels ); labelChanged {
220- j .Log .Info ("syncFuseSpec labelChanged" )
221- fusesToUpdate .Spec .Template .ObjectMeta .Labels = newLabels
222- changed = true
222+ if reflect .DeepEqual (fuses , fusesToUpdate ) {
223+ fuseChanged = false
224+ j .Log .V (1 ).Info ("The fuse is not changed, skip" )
225+ return nil
223226 }
227+ j .Log .Info ("The fuse changes, need to update" )
224228
225- // annotations
226- if annoChanged , newAnnos := j .isAnnotationsChanged (fusesToUpdate .Spec .Template .ObjectMeta .Annotations , value .Fuse .Annotations ); annoChanged {
227- j .Log .Info ("syncFuseSpec annoChanged" )
228- fusesToUpdate .Spec .Template .ObjectMeta .Annotations = newAnnos
229- changed = true
229+ if fuseGenerationNeedIncrease {
230+ err := j .increaseFuseGeneration (fusesToUpdate )
231+ if err != nil {
232+ j .Log .Error (err , "Failed to update the fuse generation" )
233+ return err
234+ }
230235 }
231236
232- // options -> configmap
233- fuseCommand , err := j .getFuseCommand ()
234- if err != nil || fuseCommand == "" {
235- j .Log .Error (err , "Failed to get fuse command" )
237+ if err := j .Client .Update (context .TODO (), fusesToUpdate ); err != nil {
238+ j .Log .Error (err , "Failed to update the ds spec" )
236239 return err
237240 }
238- cmdChanged , _ = j .isCommandChanged (fuseCommand , value .Fuse .Command )
239241
240- if len (fusesToUpdate .Spec .Template .Spec .Containers ) == 1 {
241- // resource
242- if resourcesChanged , newResources := j .isResourcesChanged (fusesToUpdate .Spec .Template .Spec .Containers [0 ].Resources , runtime .Spec .Fuse .Resources ); resourcesChanged {
243- j .Log .Info ("syncFuseSpec resourcesChanged" )
244- fusesToUpdate .Spec .Template .Spec .Containers [0 ].Resources = newResources
245- changed = true
246- }
242+ if err := j .SaveValueToConfigmap (latestValue ); err != nil {
243+ j .Log .Error (err , "Failed to update the ds spec" )
244+ return err
245+ }
247246
248- // env
249- if envChanged , newEnvs := j .isEnvsChanged (fusesToUpdate .Spec .Template .Spec .Containers [0 ].Env , value .Fuse .Envs ); envChanged {
250- j .Log .Info ("syncFuseSpec envChanged" )
251- fusesToUpdate .Spec .Template .Spec .Containers [0 ].Env = newEnvs
252- changed = true
253- }
247+ return nil
248+ })
249+ if err != nil {
250+ if fluiderrs .IsDeprecated (err ) {
251+ j .Log .Info ("Warning: the current runtime is created by runtime controller before v0.7.0, update specs are not supported. To support these features, please create a new dataset" , "details" , err )
252+ return false , nil
253+ }
254+ return false , err
255+ }
254256
255- // volumeMounts
256- if volumeMountChanged , newVolumeMounts := j .isVolumeMountsChanged (fusesToUpdate .Spec .Template .Spec .Containers [0 ].VolumeMounts , value .Fuse .VolumeMounts ); volumeMountChanged {
257- j .Log .Info ("syncFuseSpec volumeMountChanged" )
258- fusesToUpdate .Spec .Template .Spec .Containers [0 ].VolumeMounts = newVolumeMounts
259- changed = true
260- }
257+ return fuseChanged , nil
258+ }
261259
262- // image
263- fuseImage := value .Fuse .Image
264- if value .ImageTag != "" {
265- fuseImage = fuseImage + ":" + value .Fuse .ImageTag
266- }
267- if imageChanged , newImage := j .isImageChanged (fusesToUpdate .Spec .Template .Spec .Containers [0 ].Image , fuseImage ); imageChanged {
268- j .Log .Info ("syncFuseSpec imageChanged" )
269- fusesToUpdate .Spec .Template .Spec .Containers [0 ].Image = newImage
270- changed = true
271- }
260+ func (j * JuiceFSEngine ) checkAndSetFuseChanges (currentValue , latestValue * JuiceFS , runtime * datav1alpha1.JuiceFSRuntime , fusesToUpdate * appsv1.DaemonSet ) (bool , bool ) {
261+ var fuseChanged , fuseGenerationNeedUpdate bool
262+ // nodeSelector
263+ if nodeSelectorChanged , newSelector := j .isNodeSelectorChanged (currentValue .Fuse .NodeSelector , latestValue .Fuse .NodeSelector ); nodeSelectorChanged {
264+ j .Log .Info ("syncFuseSpec nodeSelectorChanged" )
265+ fusesToUpdate .Spec .Template .Spec .NodeSelector = newSelector
266+ fuseChanged = true
267+ }
268+
269+ // volumes
270+ if volumeChanged , newVolumes := j .isVolumesChanged (currentValue .Fuse .Volumes , latestValue .Fuse .Volumes ); volumeChanged {
271+ j .Log .Info ("syncFuseSpec volumeChanged" )
272+ fusesToUpdate .Spec .Template .Spec .Volumes = newVolumes
273+ fuseChanged = true
274+ }
275+
276+ // labels
277+ if labelChanged , newLabels := j .isLabelsChanged (currentValue .Fuse .Labels , latestValue .Fuse .Labels ); labelChanged {
278+ j .Log .Info ("syncFuseSpec labelChanged" )
279+ fusesToUpdate .Spec .Template .ObjectMeta .Labels = newLabels
280+ fuseChanged = true
281+ }
282+
283+ // annotations
284+ if annoChanged , newAnnos := j .isAnnotationsChanged (currentValue .Fuse .Annotations , latestValue .Fuse .Annotations ); annoChanged {
285+ j .Log .Info ("syncFuseSpec annoChanged" )
286+ fusesToUpdate .Spec .Template .ObjectMeta .Annotations = newAnnos
287+ fuseChanged = true
288+ }
289+
290+ if len (fusesToUpdate .Spec .Template .Spec .Containers ) == 1 {
291+ // resource
292+ if resourcesChanged , newResources := j .isResourcesChanged (fusesToUpdate .Spec .Template .Spec .Containers [0 ].Resources , runtime .Spec .Fuse .Resources ); resourcesChanged {
293+ j .Log .Info ("syncFuseSpec resourcesChanged" )
294+ fusesToUpdate .Spec .Template .Spec .Containers [0 ].Resources = newResources
295+ fuseChanged = true
272296 }
273297
274- if cmdChanged {
275- j .Log .Info ("The fuse config is updated" )
276- err = j .updateFuseScript (value .Fuse .Command )
277- if err != nil {
278- j .Log .Error (err , "Failed to update the ds config" )
279- return err
280- }
281- } else {
282- j .Log .V (1 ).Info ("The fuse config is not changed" )
298+ // env
299+ if envChanged , newEnvs := j .isEnvsChanged (currentValue .Fuse .Envs , latestValue .Fuse .Envs ); envChanged {
300+ j .Log .Info ("syncFuseSpec envChanged" )
301+ fusesToUpdate .Spec .Template .Spec .Containers [0 ].Env = newEnvs
302+ fuseChanged = true
283303 }
284304
285- if changed {
286- if reflect .DeepEqual (fuses , fusesToUpdate ) {
287- changed = false
288- j .Log .V (1 ).Info ("The fuse is not changed, skip" )
289- return nil
290- }
291- j .Log .Info ("The fuse is updated" )
292-
293- if currentGeneration , exist := fusesToUpdate .Spec .Template .Labels [common .LabelRuntimeFuseGeneration ]; exist {
294- currentGenerationInt , err := strconv .Atoi (currentGeneration )
295- if err != nil {
296- j .Log .Error (err , "Failed to parse current fuse generation from the ds label" )
297- return nil
298- }
299- newGeneration := strconv .FormatInt (int64 (currentGenerationInt + 1 ), 10 )
300- fusesToUpdate .Spec .Template .Labels [common .LabelRuntimeFuseGeneration ] = newGeneration
301-
302- pvc , err := kubeclient .GetPersistentVolumeClaim (j .Client , j .name , j .namespace )
303- if err != nil {
304- return err
305- }
306-
307- labelsToModify := common.LabelsToModify {}
308- if _ , exist := pvc .Labels [common .LabelRuntimeFuseGeneration ]; exist {
309- labelsToModify .Update (common .LabelRuntimeFuseGeneration , newGeneration )
310- } else {
311- labelsToModify .Add (common .LabelRuntimeFuseGeneration , newGeneration )
312- }
313-
314- if _ , err = utils .PatchLabels (j .Client , pvc , labelsToModify ); err != nil {
315- j .Log .Error (err , fmt .Sprintf ("imageChanged but failed to update image info on pvc %s/%s" , j .namespace , j .name ))
316- }
317- }
318- err = j .Client .Update (context .TODO (), fusesToUpdate )
319- if err != nil {
320- j .Log .Error (err , "Failed to update the ds spec" )
321- }
305+ // volumeMounts
306+ if volumeMountChanged , newVolumeMounts := j .isVolumeMountsChanged (currentValue .Fuse .VolumeMounts , latestValue .Fuse .VolumeMounts ); volumeMountChanged {
307+ j .Log .Info ("syncFuseSpec volumeMountChanged" )
308+ fusesToUpdate .Spec .Template .Spec .Containers [0 ].VolumeMounts = newVolumeMounts
309+ fuseChanged = true
310+ }
322311
323- } else {
324- j .Log .V (1 ).Info ("The fuse is not changed" )
312+ // image
313+ latestFuseImage := latestValue .Fuse .Image
314+ if latestValue .ImageTag != "" {
315+ latestFuseImage = latestFuseImage + ":" + latestValue .Fuse .ImageTag
316+ }
317+
318+ currentFuseImage := currentValue .Fuse .Image
319+ if currentValue .ImageTag != "" {
320+ currentFuseImage = currentFuseImage + ":" + currentValue .Fuse .ImageTag
321+ }
322+
323+ if imageChanged , newImage := j .isImageChanged (currentFuseImage , latestFuseImage ); imageChanged {
324+ j .Log .Info ("syncFuseSpec imageChanged" )
325+ fusesToUpdate .Spec .Template .Spec .Containers [0 ].Image = newImage
326+ fuseChanged , fuseGenerationNeedUpdate = true , true
327+ }
328+ }
329+
330+ return fuseChanged , fuseGenerationNeedUpdate
331+ }
332+
333+ func (j * JuiceFSEngine ) updateFuseCmdConfigmapOnChanged (latestValue * JuiceFS ) error {
334+ // options -> configmap
335+ fuseCommand , err := j .getFuseCommand ()
336+ if err != nil || fuseCommand == "" {
337+ j .Log .Error (err , "Failed to get fuse command" )
338+ return err
339+ }
340+
341+ if cmdChanged , _ := j .isCommandChanged (fuseCommand , latestValue .Fuse .Command ); cmdChanged {
342+ j .Log .Info ("The fuse config is updated" )
343+ if err := j .updateFuseScript (latestValue .Fuse .Command ); err != nil {
344+ j .Log .Error (err , "Failed to update the ds config" )
345+ return err
325346 }
347+ return nil
348+ }
349+ j .Log .V (1 ).Info ("The fuse config is not changed" )
350+ return nil
351+ }
326352
353+ func (j * JuiceFSEngine ) increaseFuseGeneration (fusesToUpdate * appsv1.DaemonSet ) error {
354+ newGeneration := "1"
355+ currentGeneration , exist := fusesToUpdate .Spec .Template .Labels [common .LabelRuntimeFuseGeneration ]
356+ if exist {
357+ currentGenerationInt , err := strconv .Atoi (currentGeneration )
358+ if err != nil {
359+ j .Log .Error (err , "Failed to parse current fuse generation from the ds label" )
360+ return nil
361+ }
362+ newGeneration = strconv .FormatInt (int64 (currentGenerationInt + 1 ), 10 )
363+ }
364+
365+ fusesToUpdate .Spec .Template .Labels [common .LabelRuntimeFuseGeneration ] = newGeneration
366+ pvc , err := kubeclient .GetPersistentVolumeClaim (j .Client , j .name , j .namespace )
367+ if err != nil {
327368 return err
328- })
369+ }
329370
330- if fluiderrs .IsDeprecated (err ) {
331- j .Log .Info ("Warning: the current runtime is created by runtime controller before v0.7.0, update specs are not supported. To support these features, please create a new dataset" , "details" , err )
332- return false , nil
371+ labelsToModify := common.LabelsToModify {}
372+ if _ , exist := pvc .Labels [common .LabelRuntimeFuseGeneration ]; exist {
373+ labelsToModify .Update (common .LabelRuntimeFuseGeneration , newGeneration )
374+ } else {
375+ labelsToModify .Add (common .LabelRuntimeFuseGeneration , newGeneration )
333376 }
334- return
377+
378+ if _ , err = utils .PatchLabels (j .Client , pvc , labelsToModify ); err != nil {
379+ j .Log .Error (err , fmt .Sprintf ("imageChanged but failed to update image info on pvc %s/%s" , j .namespace , j .name ))
380+ }
381+ return nil
335382}
336383
337384func (j * JuiceFSEngine ) isVolumeMountsChanged (crtVolumeMounts , runtimeVolumeMounts []corev1.VolumeMount ) (changed bool , newVolumeMounts []corev1.VolumeMount ) {
0 commit comments