Skip to content

Commit c9818d1

Browse files
author
玖宇
committed
Fix unexpected update in juiceruntime
Signed-off-by: 玖宇 <guotongyu.gty@alibaba-inc.com>
1 parent 21c9735 commit c9818d1

3 files changed

Lines changed: 257 additions & 139 deletions

File tree

pkg/ddc/juicefs/sync_runtime.go

Lines changed: 158 additions & 111 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"strings"
2525
"time"
2626

27+
appsv1 "k8s.io/api/apps/v1"
2728
corev1 "k8s.io/api/core/v1"
2829
"k8s.io/apimachinery/pkg/types"
2930
"k8s.io/client-go/util/retry"
@@ -190,148 +191,194 @@ func (j *JuiceFSEngine) syncWorkerSpec(ctx cruntime.ReconcileRequestContext, run
190191
return
191192
}
192193

193-
func (j *JuiceFSEngine) syncFuseSpec(ctx cruntime.ReconcileRequestContext, runtime *datav1alpha1.JuiceFSRuntime, value *JuiceFS) (changed bool, err error) {
194+
func (j *JuiceFSEngine) syncFuseSpec(ctx cruntime.ReconcileRequestContext, runtime *datav1alpha1.JuiceFSRuntime, latestValue *JuiceFS) (bool, error) {
194195
j.Log.V(1).Info("syncFuseSpec")
195-
var cmdChanged bool
196-
err = retry.RetryOnConflict(retry.DefaultBackoff, func() error {
196+
197+
//1. check if fuse cmd configmap needs to update
198+
if err := j.updateFuseCmdConfigmapOnChanged(latestValue); err != nil {
199+
return false, err
200+
}
201+
202+
//2. check if fuse needs to update
203+
var fuseChanged, fuseGenerationNeedIncrease bool
204+
err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
197205
fuses, err := kubeclient.GetDaemonset(j.Client, j.getFuseName(), j.namespace)
198206
if err != nil {
199207
return err
200208
}
201-
202209
fusesToUpdate := fuses.DeepCopy()
203210

204-
// nodeSelector
205-
if nodeSelectorChanged, newSelector := j.isNodeSelectorChanged(fusesToUpdate.Spec.Template.Spec.NodeSelector, value.Fuse.NodeSelector); nodeSelectorChanged {
206-
j.Log.Info("syncFuseSpec nodeSelectorChanged")
207-
fusesToUpdate.Spec.Template.Spec.NodeSelector = newSelector
208-
changed = true
211+
currentValue, err := j.GetValues()
212+
if err != nil {
213+
return err
209214
}
210215

211-
// volumes
212-
if volumeChanged, newVolumes := j.isVolumesChanged(fusesToUpdate.Spec.Template.Spec.Volumes, value.Fuse.Volumes); volumeChanged {
213-
j.Log.Info("syncFuseSpec volumeChanged")
214-
fusesToUpdate.Spec.Template.Spec.Volumes = newVolumes
215-
changed = true
216+
fuseChanged, fuseGenerationNeedIncrease = j.checkAndSetFuseChanges(currentValue, latestValue, runtime, fusesToUpdate)
217+
if !fuseChanged {
218+
j.Log.V(1).Info("The fuse is not changed")
219+
return nil
216220
}
217221

218-
// labels
219-
if labelChanged, newLabels := j.isLabelsChanged(fusesToUpdate.Spec.Template.ObjectMeta.Labels, value.Fuse.Labels); labelChanged {
220-
j.Log.Info("syncFuseSpec labelChanged")
221-
fusesToUpdate.Spec.Template.ObjectMeta.Labels = newLabels
222-
changed = true
222+
if reflect.DeepEqual(fuses, fusesToUpdate) {
223+
fuseChanged = false
224+
j.Log.V(1).Info("The fuse is not changed, skip")
225+
return nil
223226
}
227+
j.Log.Info("The fuse changes, need to update")
224228

225-
// annotations
226-
if annoChanged, newAnnos := j.isAnnotationsChanged(fusesToUpdate.Spec.Template.ObjectMeta.Annotations, value.Fuse.Annotations); annoChanged {
227-
j.Log.Info("syncFuseSpec annoChanged")
228-
fusesToUpdate.Spec.Template.ObjectMeta.Annotations = newAnnos
229-
changed = true
229+
if fuseGenerationNeedIncrease {
230+
err := j.increaseFuseGeneration(fusesToUpdate)
231+
if err != nil {
232+
j.Log.Error(err, "Failed to update the fuse generation")
233+
return err
234+
}
230235
}
231236

232-
// options -> configmap
233-
fuseCommand, err := j.getFuseCommand()
234-
if err != nil || fuseCommand == "" {
235-
j.Log.Error(err, "Failed to get fuse command")
237+
if err := j.Client.Update(context.TODO(), fusesToUpdate); err != nil {
238+
j.Log.Error(err, "Failed to update the ds spec")
236239
return err
237240
}
238-
cmdChanged, _ = j.isCommandChanged(fuseCommand, value.Fuse.Command)
239241

240-
if len(fusesToUpdate.Spec.Template.Spec.Containers) == 1 {
241-
// resource
242-
if resourcesChanged, newResources := j.isResourcesChanged(fusesToUpdate.Spec.Template.Spec.Containers[0].Resources, runtime.Spec.Fuse.Resources); resourcesChanged {
243-
j.Log.Info("syncFuseSpec resourcesChanged")
244-
fusesToUpdate.Spec.Template.Spec.Containers[0].Resources = newResources
245-
changed = true
246-
}
242+
if err := j.SaveValues(latestValue); err != nil {
243+
j.Log.Error(err, "Failed to update the ds spec")
244+
return err
245+
}
247246

248-
// env
249-
if envChanged, newEnvs := j.isEnvsChanged(fusesToUpdate.Spec.Template.Spec.Containers[0].Env, value.Fuse.Envs); envChanged {
250-
j.Log.Info("syncFuseSpec envChanged")
251-
fusesToUpdate.Spec.Template.Spec.Containers[0].Env = newEnvs
252-
changed = true
253-
}
247+
return nil
248+
})
249+
if err != nil {
250+
if fluiderrs.IsDeprecated(err) {
251+
j.Log.Info("Warning: the current runtime is created by runtime controller before v0.7.0, update specs are not supported. To support these features, please create a new dataset", "details", err)
252+
return false, nil
253+
}
254+
return false, err
255+
}
254256

255-
// volumeMounts
256-
if volumeMountChanged, newVolumeMounts := j.isVolumeMountsChanged(fusesToUpdate.Spec.Template.Spec.Containers[0].VolumeMounts, value.Fuse.VolumeMounts); volumeMountChanged {
257-
j.Log.Info("syncFuseSpec volumeMountChanged")
258-
fusesToUpdate.Spec.Template.Spec.Containers[0].VolumeMounts = newVolumeMounts
259-
changed = true
260-
}
257+
return fuseChanged, nil
258+
}
261259

262-
// image
263-
fuseImage := value.Fuse.Image
264-
if value.ImageTag != "" {
265-
fuseImage = fuseImage + ":" + value.Fuse.ImageTag
266-
}
267-
if imageChanged, newImage := j.isImageChanged(fusesToUpdate.Spec.Template.Spec.Containers[0].Image, fuseImage); imageChanged {
268-
j.Log.Info("syncFuseSpec imageChanged")
269-
fusesToUpdate.Spec.Template.Spec.Containers[0].Image = newImage
270-
changed = true
271-
}
260+
func (j *JuiceFSEngine) checkAndSetFuseChanges(currentValue, latestValue *JuiceFS, runtime *datav1alpha1.JuiceFSRuntime, fusesToUpdate *appsv1.DaemonSet) (bool, bool) {
261+
var fuseChanged, fuseGenerationNeedUpdate bool
262+
// nodeSelector
263+
if nodeSelectorChanged, newSelector := j.isNodeSelectorChanged(currentValue.Fuse.NodeSelector, latestValue.Fuse.NodeSelector); nodeSelectorChanged {
264+
j.Log.Info("syncFuseSpec nodeSelectorChanged")
265+
fusesToUpdate.Spec.Template.Spec.NodeSelector = newSelector
266+
fuseChanged = true
267+
}
268+
269+
// volumes
270+
if volumeChanged, newVolumes := j.isVolumesChanged(currentValue.Fuse.Volumes, latestValue.Fuse.Volumes); volumeChanged {
271+
j.Log.Info("syncFuseSpec volumeChanged")
272+
fusesToUpdate.Spec.Template.Spec.Volumes = newVolumes
273+
fuseChanged = true
274+
}
275+
276+
// labels
277+
if labelChanged, newLabels := j.isLabelsChanged(currentValue.Fuse.Labels, latestValue.Fuse.Labels); labelChanged {
278+
j.Log.Info("syncFuseSpec labelChanged")
279+
fusesToUpdate.Spec.Template.ObjectMeta.Labels = newLabels
280+
fuseChanged = true
281+
}
282+
283+
// annotations
284+
if annoChanged, newAnnos := j.isAnnotationsChanged(currentValue.Fuse.Annotations, latestValue.Fuse.Annotations); annoChanged {
285+
j.Log.Info("syncFuseSpec annoChanged")
286+
fusesToUpdate.Spec.Template.ObjectMeta.Annotations = newAnnos
287+
fuseChanged = true
288+
}
289+
290+
if len(fusesToUpdate.Spec.Template.Spec.Containers) == 1 {
291+
// resource
292+
if resourcesChanged, newResources := j.isResourcesChanged(fusesToUpdate.Spec.Template.Spec.Containers[0].Resources, runtime.Spec.Fuse.Resources); resourcesChanged {
293+
j.Log.Info("syncFuseSpec resourcesChanged")
294+
fusesToUpdate.Spec.Template.Spec.Containers[0].Resources = newResources
295+
fuseChanged = true
272296
}
273297

274-
if cmdChanged {
275-
j.Log.Info("The fuse config is updated")
276-
err = j.updateFuseScript(value.Fuse.Command)
277-
if err != nil {
278-
j.Log.Error(err, "Failed to update the ds config")
279-
return err
280-
}
281-
} else {
282-
j.Log.V(1).Info("The fuse config is not changed")
298+
// env
299+
if envChanged, newEnvs := j.isEnvsChanged(currentValue.Fuse.Envs, latestValue.Fuse.Envs); envChanged {
300+
j.Log.Info("syncFuseSpec envChanged")
301+
fusesToUpdate.Spec.Template.Spec.Containers[0].Env = newEnvs
302+
fuseChanged = true
283303
}
284304

285-
if changed {
286-
if reflect.DeepEqual(fuses, fusesToUpdate) {
287-
changed = false
288-
j.Log.V(1).Info("The fuse is not changed, skip")
289-
return nil
290-
}
291-
j.Log.Info("The fuse is updated")
292-
293-
if currentGeneration, exist := fusesToUpdate.Spec.Template.Labels[common.LabelRuntimeFuseGeneration]; exist {
294-
currentGenerationInt, err := strconv.Atoi(currentGeneration)
295-
if err != nil {
296-
j.Log.Error(err, "Failed to parse current fuse generation from the ds label")
297-
return nil
298-
}
299-
newGeneration := strconv.FormatInt(int64(currentGenerationInt+1), 10)
300-
fusesToUpdate.Spec.Template.Labels[common.LabelRuntimeFuseGeneration] = newGeneration
301-
302-
pvc, err := kubeclient.GetPersistentVolumeClaim(j.Client, j.name, j.namespace)
303-
if err != nil {
304-
return err
305-
}
306-
307-
labelsToModify := common.LabelsToModify{}
308-
if _, exist := pvc.Labels[common.LabelRuntimeFuseGeneration]; exist {
309-
labelsToModify.Update(common.LabelRuntimeFuseGeneration, newGeneration)
310-
} else {
311-
labelsToModify.Add(common.LabelRuntimeFuseGeneration, newGeneration)
312-
}
313-
314-
if _, err = utils.PatchLabels(j.Client, pvc, labelsToModify); err != nil {
315-
j.Log.Error(err, fmt.Sprintf("imageChanged but failed to update image info on pvc %s/%s", j.namespace, j.name))
316-
}
317-
}
318-
err = j.Client.Update(context.TODO(), fusesToUpdate)
319-
if err != nil {
320-
j.Log.Error(err, "Failed to update the ds spec")
321-
}
305+
// volumeMounts
306+
if volumeMountChanged, newVolumeMounts := j.isVolumeMountsChanged(currentValue.Fuse.VolumeMounts, latestValue.Fuse.VolumeMounts); volumeMountChanged {
307+
j.Log.Info("syncFuseSpec volumeMountChanged")
308+
fusesToUpdate.Spec.Template.Spec.Containers[0].VolumeMounts = newVolumeMounts
309+
fuseChanged = true
310+
}
322311

323-
} else {
324-
j.Log.V(1).Info("The fuse is not changed")
312+
// image
313+
latestFuseImage := latestValue.Fuse.Image
314+
if latestValue.ImageTag != "" {
315+
latestFuseImage = latestFuseImage + ":" + latestValue.Fuse.ImageTag
316+
}
317+
318+
currentFuseImage := currentValue.Fuse.Image
319+
if currentValue.ImageTag != "" {
320+
currentFuseImage = currentFuseImage + ":" + currentValue.Fuse.ImageTag
321+
}
322+
323+
if imageChanged, newImage := j.isImageChanged(currentFuseImage, latestFuseImage); imageChanged {
324+
j.Log.Info("syncFuseSpec imageChanged")
325+
fusesToUpdate.Spec.Template.Spec.Containers[0].Image = newImage
326+
fuseChanged, fuseGenerationNeedUpdate = true, true
327+
}
328+
}
329+
330+
return fuseChanged, fuseGenerationNeedUpdate
331+
}
332+
333+
func (j *JuiceFSEngine) updateFuseCmdConfigmapOnChanged(latestValue *JuiceFS) error {
334+
// options -> configmap
335+
fuseCommand, err := j.getFuseCommand()
336+
if err != nil || fuseCommand == "" {
337+
j.Log.Error(err, "Failed to get fuse command")
338+
return err
339+
}
340+
341+
if cmdChanged, _ := j.isCommandChanged(fuseCommand, latestValue.Fuse.Command); cmdChanged {
342+
j.Log.Info("The fuse config is updated")
343+
if err := j.updateFuseScript(latestValue.Fuse.Command); err != nil {
344+
j.Log.Error(err, "Failed to update the ds config")
345+
return err
325346
}
347+
return nil
348+
}
349+
j.Log.V(1).Info("The fuse config is not changed")
350+
return nil
351+
}
326352

353+
func (j *JuiceFSEngine) increaseFuseGeneration(fusesToUpdate *appsv1.DaemonSet) error {
354+
newGeneration := "1"
355+
currentGeneration, exist := fusesToUpdate.Spec.Template.Labels[common.LabelRuntimeFuseGeneration]
356+
if exist {
357+
currentGenerationInt, err := strconv.Atoi(currentGeneration)
358+
if err != nil {
359+
j.Log.Error(err, "Failed to parse current fuse generation from the ds label")
360+
return nil
361+
}
362+
newGeneration = strconv.FormatInt(int64(currentGenerationInt+1), 10)
363+
}
364+
365+
fusesToUpdate.Spec.Template.Labels[common.LabelRuntimeFuseGeneration] = newGeneration
366+
pvc, err := kubeclient.GetPersistentVolumeClaim(j.Client, j.name, j.namespace)
367+
if err != nil {
327368
return err
328-
})
369+
}
329370

330-
if fluiderrs.IsDeprecated(err) {
331-
j.Log.Info("Warning: the current runtime is created by runtime controller before v0.7.0, update specs are not supported. To support these features, please create a new dataset", "details", err)
332-
return false, nil
371+
labelsToModify := common.LabelsToModify{}
372+
if _, exist := pvc.Labels[common.LabelRuntimeFuseGeneration]; exist {
373+
labelsToModify.Update(common.LabelRuntimeFuseGeneration, newGeneration)
374+
} else {
375+
labelsToModify.Add(common.LabelRuntimeFuseGeneration, newGeneration)
333376
}
334-
return
377+
378+
if _, err = utils.PatchLabels(j.Client, pvc, labelsToModify); err != nil {
379+
j.Log.Error(err, fmt.Sprintf("imageChanged but failed to update image info on pvc %s/%s", j.namespace, j.name))
380+
}
381+
return nil
335382
}
336383

337384
func (j *JuiceFSEngine) isVolumeMountsChanged(crtVolumeMounts, runtimeVolumeMounts []corev1.VolumeMount) (changed bool, newVolumeMounts []corev1.VolumeMount) {

pkg/ddc/juicefs/sync_runtime_test.go

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,7 @@ func TestJuiceFSxEngine_syncFuseSpec(t *testing.T) {
265265
res := resource.MustParse("320Gi")
266266
type fields struct {
267267
runtime *datav1alpha1.JuiceFSRuntime
268+
helmValue *corev1.ConfigMap
268269
name string
269270
namespace string
270271
}
@@ -286,6 +287,15 @@ func TestJuiceFSxEngine_syncFuseSpec(t *testing.T) {
286287
name: "emtpy",
287288
namespace: "default",
288289
runtime: &datav1alpha1.JuiceFSRuntime{},
290+
helmValue: &corev1.ConfigMap{
291+
ObjectMeta: metav1.ObjectMeta{
292+
Name: "emtpy-juicefs-values",
293+
Namespace: "default",
294+
},
295+
Data: map[string]string{
296+
"data": "",
297+
},
298+
},
289299
}, args: args{
290300
fuse: &appsv1.DaemonSet{
291301
ObjectMeta: metav1.ObjectMeta{
@@ -302,6 +312,15 @@ func TestJuiceFSxEngine_syncFuseSpec(t *testing.T) {
302312
name: "nofuse",
303313
namespace: "default",
304314
runtime: &datav1alpha1.JuiceFSRuntime{},
315+
helmValue: &corev1.ConfigMap{
316+
ObjectMeta: metav1.ObjectMeta{
317+
Name: "nofuse-juicefs-values",
318+
Namespace: "default",
319+
},
320+
Data: map[string]string{
321+
"data": "",
322+
},
323+
},
305324
}, args: args{
306325
fuse: &appsv1.DaemonSet{
307326
ObjectMeta: metav1.ObjectMeta{
@@ -337,6 +356,15 @@ func TestJuiceFSxEngine_syncFuseSpec(t *testing.T) {
337356
},
338357
},
339358
},
359+
helmValue: &corev1.ConfigMap{
360+
ObjectMeta: metav1.ObjectMeta{
361+
Name: "same-juicefs-values",
362+
Namespace: "default",
363+
},
364+
Data: map[string]string{
365+
"data": "",
366+
},
367+
},
340368
}, args: args{
341369
fuse: &appsv1.DaemonSet{
342370
ObjectMeta: metav1.ObjectMeta{
@@ -379,18 +407,21 @@ func TestJuiceFSxEngine_syncFuseSpec(t *testing.T) {
379407
tt.fields.runtime.SetNamespace(tt.fields.namespace)
380408
s.AddKnownTypes(appsv1.SchemeGroupVersion, tt.args.fuse)
381409
s.AddKnownTypes(datav1alpha1.GroupVersion, tt.fields.runtime)
410+
s.AddKnownTypes(corev1.SchemeGroupVersion, tt.fields.helmValue)
382411

383412
_ = corev1.AddToScheme(s)
384413
runtimeObjs = append(runtimeObjs, tt.fields.runtime)
414+
runtimeObjs = append(runtimeObjs, tt.fields.helmValue)
385415
runtimeObjs = append(runtimeObjs, tt.args.fuse)
386416
client := fake.NewFakeClientWithScheme(s, runtimeObjs...)
387417

388418
e := &JuiceFSEngine{
389-
runtime: tt.fields.runtime,
390-
name: tt.fields.name,
391-
namespace: tt.fields.namespace,
392-
Log: fake.NullLogger(),
393-
Client: client,
419+
runtime: tt.fields.runtime,
420+
name: tt.fields.name,
421+
namespace: tt.fields.namespace,
422+
Log: fake.NullLogger(),
423+
Client: client,
424+
engineImpl: "juicefs",
394425
}
395426
value := &JuiceFS{
396427
Fuse: Fuse{},

0 commit comments

Comments
 (0)