Skip to content

Commit 9f9fbd3

Browse files
author
玖宇
committed
Fix unexpected update in juiceruntime
Signed-off-by: 玖宇 <guotongyu.gty@alibaba-inc.com>
1 parent 21c9735 commit 9f9fbd3

3 files changed

Lines changed: 139 additions & 42 deletions

File tree

pkg/ddc/juicefs/sync_runtime.go

Lines changed: 67 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"strings"
2525
"time"
2626

27+
appsv1 "k8s.io/api/apps/v1"
2728
corev1 "k8s.io/api/core/v1"
2829
"k8s.io/apimachinery/pkg/types"
2930
"k8s.io/client-go/util/retry"
@@ -190,40 +191,44 @@ func (j *JuiceFSEngine) syncWorkerSpec(ctx cruntime.ReconcileRequestContext, run
190191
return
191192
}
192193

193-
func (j *JuiceFSEngine) syncFuseSpec(ctx cruntime.ReconcileRequestContext, runtime *datav1alpha1.JuiceFSRuntime, value *JuiceFS) (changed bool, err error) {
194+
func (j *JuiceFSEngine) syncFuseSpec(ctx cruntime.ReconcileRequestContext, runtime *datav1alpha1.JuiceFSRuntime, latestValue *JuiceFS) (changed bool, err error) {
194195
j.Log.V(1).Info("syncFuseSpec")
195-
var cmdChanged bool
196+
var cmdChanged, generationNeedUpdate bool
196197
err = retry.RetryOnConflict(retry.DefaultBackoff, func() error {
197198
fuses, err := kubeclient.GetDaemonset(j.Client, j.getFuseName(), j.namespace)
198199
if err != nil {
199200
return err
200201
}
201-
202202
fusesToUpdate := fuses.DeepCopy()
203203

204+
currentValue, err := j.GetValues()
205+
if err != nil {
206+
return err
207+
}
208+
204209
// nodeSelector
205-
if nodeSelectorChanged, newSelector := j.isNodeSelectorChanged(fusesToUpdate.Spec.Template.Spec.NodeSelector, value.Fuse.NodeSelector); nodeSelectorChanged {
210+
if nodeSelectorChanged, newSelector := j.isNodeSelectorChanged(currentValue.Fuse.NodeSelector, latestValue.Fuse.NodeSelector); nodeSelectorChanged {
206211
j.Log.Info("syncFuseSpec nodeSelectorChanged")
207212
fusesToUpdate.Spec.Template.Spec.NodeSelector = newSelector
208213
changed = true
209214
}
210215

211216
// volumes
212-
if volumeChanged, newVolumes := j.isVolumesChanged(fusesToUpdate.Spec.Template.Spec.Volumes, value.Fuse.Volumes); volumeChanged {
217+
if volumeChanged, newVolumes := j.isVolumesChanged(currentValue.Fuse.Volumes, latestValue.Fuse.Volumes); volumeChanged {
213218
j.Log.Info("syncFuseSpec volumeChanged")
214219
fusesToUpdate.Spec.Template.Spec.Volumes = newVolumes
215220
changed = true
216221
}
217222

218223
// labels
219-
if labelChanged, newLabels := j.isLabelsChanged(fusesToUpdate.Spec.Template.ObjectMeta.Labels, value.Fuse.Labels); labelChanged {
224+
if labelChanged, newLabels := j.isLabelsChanged(currentValue.Fuse.Labels, latestValue.Fuse.Labels); labelChanged {
220225
j.Log.Info("syncFuseSpec labelChanged")
221226
fusesToUpdate.Spec.Template.ObjectMeta.Labels = newLabels
222227
changed = true
223228
}
224229

225230
// annotations
226-
if annoChanged, newAnnos := j.isAnnotationsChanged(fusesToUpdate.Spec.Template.ObjectMeta.Annotations, value.Fuse.Annotations); annoChanged {
231+
if annoChanged, newAnnos := j.isAnnotationsChanged(currentValue.Fuse.Annotations, latestValue.Fuse.Annotations); annoChanged {
227232
j.Log.Info("syncFuseSpec annoChanged")
228233
fusesToUpdate.Spec.Template.ObjectMeta.Annotations = newAnnos
229234
changed = true
@@ -235,7 +240,7 @@ func (j *JuiceFSEngine) syncFuseSpec(ctx cruntime.ReconcileRequestContext, runti
235240
j.Log.Error(err, "Failed to get fuse command")
236241
return err
237242
}
238-
cmdChanged, _ = j.isCommandChanged(fuseCommand, value.Fuse.Command)
243+
cmdChanged, _ = j.isCommandChanged(fuseCommand, latestValue.Fuse.Command)
239244

240245
if len(fusesToUpdate.Spec.Template.Spec.Containers) == 1 {
241246
// resource
@@ -246,34 +251,40 @@ func (j *JuiceFSEngine) syncFuseSpec(ctx cruntime.ReconcileRequestContext, runti
246251
}
247252

248253
// env
249-
if envChanged, newEnvs := j.isEnvsChanged(fusesToUpdate.Spec.Template.Spec.Containers[0].Env, value.Fuse.Envs); envChanged {
254+
if envChanged, newEnvs := j.isEnvsChanged(currentValue.Fuse.Envs, latestValue.Fuse.Envs); envChanged {
250255
j.Log.Info("syncFuseSpec envChanged")
251256
fusesToUpdate.Spec.Template.Spec.Containers[0].Env = newEnvs
252257
changed = true
253258
}
254259

255260
// volumeMounts
256-
if volumeMountChanged, newVolumeMounts := j.isVolumeMountsChanged(fusesToUpdate.Spec.Template.Spec.Containers[0].VolumeMounts, value.Fuse.VolumeMounts); volumeMountChanged {
261+
if volumeMountChanged, newVolumeMounts := j.isVolumeMountsChanged(currentValue.Fuse.VolumeMounts, latestValue.Fuse.VolumeMounts); volumeMountChanged {
257262
j.Log.Info("syncFuseSpec volumeMountChanged")
258263
fusesToUpdate.Spec.Template.Spec.Containers[0].VolumeMounts = newVolumeMounts
259264
changed = true
260265
}
261266

262267
// image
263-
fuseImage := value.Fuse.Image
264-
if value.ImageTag != "" {
265-
fuseImage = fuseImage + ":" + value.Fuse.ImageTag
268+
latestFuseImage := latestValue.Fuse.Image
269+
if latestValue.ImageTag != "" {
270+
latestFuseImage = latestFuseImage + ":" + latestValue.Fuse.ImageTag
271+
}
272+
273+
currentFuseImage := currentValue.Fuse.Image
274+
if currentValue.ImageTag != "" {
275+
currentFuseImage = currentFuseImage + ":" + currentValue.Fuse.ImageTag
266276
}
267-
if imageChanged, newImage := j.isImageChanged(fusesToUpdate.Spec.Template.Spec.Containers[0].Image, fuseImage); imageChanged {
277+
278+
if imageChanged, newImage := j.isImageChanged(currentFuseImage, latestFuseImage); imageChanged {
268279
j.Log.Info("syncFuseSpec imageChanged")
269280
fusesToUpdate.Spec.Template.Spec.Containers[0].Image = newImage
270-
changed = true
281+
changed, generationNeedUpdate = true, true
271282
}
272283
}
273284

274285
if cmdChanged {
275286
j.Log.Info("The fuse config is updated")
276-
err = j.updateFuseScript(value.Fuse.Command)
287+
err = j.updateFuseScript(latestValue.Fuse.Command)
277288
if err != nil {
278289
j.Log.Error(err, "Failed to update the ds config")
279290
return err
@@ -290,36 +301,24 @@ func (j *JuiceFSEngine) syncFuseSpec(ctx cruntime.ReconcileRequestContext, runti
290301
}
291302
j.Log.Info("The fuse is updated")
292303

293-
if currentGeneration, exist := fusesToUpdate.Spec.Template.Labels[common.LabelRuntimeFuseGeneration]; exist {
294-
currentGenerationInt, err := strconv.Atoi(currentGeneration)
295-
if err != nil {
296-
j.Log.Error(err, "Failed to parse current fuse generation from the ds label")
297-
return nil
298-
}
299-
newGeneration := strconv.FormatInt(int64(currentGenerationInt+1), 10)
300-
fusesToUpdate.Spec.Template.Labels[common.LabelRuntimeFuseGeneration] = newGeneration
301-
302-
pvc, err := kubeclient.GetPersistentVolumeClaim(j.Client, j.name, j.namespace)
304+
if generationNeedUpdate {
305+
err := j.updateFuseGeneration(fusesToUpdate)
303306
if err != nil {
307+
j.Log.Error(err, "Failed to update the fuse generation")
304308
return err
305309
}
306-
307-
labelsToModify := common.LabelsToModify{}
308-
if _, exist := pvc.Labels[common.LabelRuntimeFuseGeneration]; exist {
309-
labelsToModify.Update(common.LabelRuntimeFuseGeneration, newGeneration)
310-
} else {
311-
labelsToModify.Add(common.LabelRuntimeFuseGeneration, newGeneration)
312-
}
313-
314-
if _, err = utils.PatchLabels(j.Client, pvc, labelsToModify); err != nil {
315-
j.Log.Error(err, fmt.Sprintf("imageChanged but failed to update image info on pvc %s/%s", j.namespace, j.name))
316-
}
317310
}
311+
318312
err = j.Client.Update(context.TODO(), fusesToUpdate)
319313
if err != nil {
320314
j.Log.Error(err, "Failed to update the ds spec")
321315
}
322316

317+
err = j.SaveValues(latestValue)
318+
if err != nil {
319+
j.Log.Error(err, "Failed to update the ds spec")
320+
}
321+
323322
} else {
324323
j.Log.V(1).Info("The fuse is not changed")
325324
}
@@ -334,6 +333,37 @@ func (j *JuiceFSEngine) syncFuseSpec(ctx cruntime.ReconcileRequestContext, runti
334333
return
335334
}
336335

336+
func (j *JuiceFSEngine) updateFuseGeneration(fusesToUpdate *appsv1.DaemonSet) error {
337+
newGeneration := "1"
338+
currentGeneration, exist := fusesToUpdate.Spec.Template.Labels[common.LabelRuntimeFuseGeneration]
339+
if exist {
340+
currentGenerationInt, err := strconv.Atoi(currentGeneration)
341+
if err != nil {
342+
j.Log.Error(err, "Failed to parse current fuse generation from the ds label")
343+
return nil
344+
}
345+
newGeneration = strconv.FormatInt(int64(currentGenerationInt+1), 10)
346+
}
347+
348+
fusesToUpdate.Spec.Template.Labels[common.LabelRuntimeFuseGeneration] = newGeneration
349+
pvc, err := kubeclient.GetPersistentVolumeClaim(j.Client, j.name, j.namespace)
350+
if err != nil {
351+
return err
352+
}
353+
354+
labelsToModify := common.LabelsToModify{}
355+
if _, exist := pvc.Labels[common.LabelRuntimeFuseGeneration]; exist {
356+
labelsToModify.Update(common.LabelRuntimeFuseGeneration, newGeneration)
357+
} else {
358+
labelsToModify.Add(common.LabelRuntimeFuseGeneration, newGeneration)
359+
}
360+
361+
if _, err = utils.PatchLabels(j.Client, pvc, labelsToModify); err != nil {
362+
j.Log.Error(err, fmt.Sprintf("imageChanged but failed to update image info on pvc %s/%s", j.namespace, j.name))
363+
}
364+
return nil
365+
}
366+
337367
func (j *JuiceFSEngine) isVolumeMountsChanged(crtVolumeMounts, runtimeVolumeMounts []corev1.VolumeMount) (changed bool, newVolumeMounts []corev1.VolumeMount) {
338368
mounts := make(map[string]corev1.VolumeMount)
339369
for _, mount := range crtVolumeMounts {

pkg/ddc/juicefs/sync_runtime_test.go

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,7 @@ func TestJuiceFSxEngine_syncFuseSpec(t *testing.T) {
265265
res := resource.MustParse("320Gi")
266266
type fields struct {
267267
runtime *datav1alpha1.JuiceFSRuntime
268+
helmValue *corev1.ConfigMap
268269
name string
269270
namespace string
270271
}
@@ -286,6 +287,15 @@ func TestJuiceFSxEngine_syncFuseSpec(t *testing.T) {
286287
name: "emtpy",
287288
namespace: "default",
288289
runtime: &datav1alpha1.JuiceFSRuntime{},
290+
helmValue: &corev1.ConfigMap{
291+
ObjectMeta: metav1.ObjectMeta{
292+
Name: "emtpy-juicefs-values",
293+
Namespace: "default",
294+
},
295+
Data: map[string]string{
296+
"data": "",
297+
},
298+
},
289299
}, args: args{
290300
fuse: &appsv1.DaemonSet{
291301
ObjectMeta: metav1.ObjectMeta{
@@ -302,6 +312,15 @@ func TestJuiceFSxEngine_syncFuseSpec(t *testing.T) {
302312
name: "nofuse",
303313
namespace: "default",
304314
runtime: &datav1alpha1.JuiceFSRuntime{},
315+
helmValue: &corev1.ConfigMap{
316+
ObjectMeta: metav1.ObjectMeta{
317+
Name: "nofuse-juicefs-values",
318+
Namespace: "default",
319+
},
320+
Data: map[string]string{
321+
"data": "",
322+
},
323+
},
305324
}, args: args{
306325
fuse: &appsv1.DaemonSet{
307326
ObjectMeta: metav1.ObjectMeta{
@@ -337,6 +356,15 @@ func TestJuiceFSxEngine_syncFuseSpec(t *testing.T) {
337356
},
338357
},
339358
},
359+
helmValue: &corev1.ConfigMap{
360+
ObjectMeta: metav1.ObjectMeta{
361+
Name: "same-juicefs-values",
362+
Namespace: "default",
363+
},
364+
Data: map[string]string{
365+
"data": "",
366+
},
367+
},
340368
}, args: args{
341369
fuse: &appsv1.DaemonSet{
342370
ObjectMeta: metav1.ObjectMeta{
@@ -379,18 +407,21 @@ func TestJuiceFSxEngine_syncFuseSpec(t *testing.T) {
379407
tt.fields.runtime.SetNamespace(tt.fields.namespace)
380408
s.AddKnownTypes(appsv1.SchemeGroupVersion, tt.args.fuse)
381409
s.AddKnownTypes(datav1alpha1.GroupVersion, tt.fields.runtime)
410+
s.AddKnownTypes(corev1.SchemeGroupVersion, tt.fields.helmValue)
382411

383412
_ = corev1.AddToScheme(s)
384413
runtimeObjs = append(runtimeObjs, tt.fields.runtime)
414+
runtimeObjs = append(runtimeObjs, tt.fields.helmValue)
385415
runtimeObjs = append(runtimeObjs, tt.args.fuse)
386416
client := fake.NewFakeClientWithScheme(s, runtimeObjs...)
387417

388418
e := &JuiceFSEngine{
389-
runtime: tt.fields.runtime,
390-
name: tt.fields.name,
391-
namespace: tt.fields.namespace,
392-
Log: fake.NullLogger(),
393-
Client: client,
419+
runtime: tt.fields.runtime,
420+
name: tt.fields.name,
421+
namespace: tt.fields.namespace,
422+
Log: fake.NullLogger(),
423+
Client: client,
424+
engineImpl: "juicefs",
394425
}
395426
value := &JuiceFS{
396427
Fuse: Fuse{},

pkg/ddc/juicefs/utils.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,42 @@ func (j *JuiceFSEngine) GetValuesConfigMap() (cm *corev1.ConfigMap, err error) {
210210
return
211211
}
212212

213+
func (j *JuiceFSEngine) GetValues() (JuiceFS, error) {
214+
helmValueConfigMap, err := j.GetValuesConfigMap()
215+
if err != nil {
216+
return JuiceFS{}, err
217+
}
218+
if helmValueConfigMap == nil {
219+
return JuiceFS{}, fmt.Errorf("helm value %s not found", j.getHelmValuesConfigMapName())
220+
}
221+
helmValue, exist := helmValueConfigMap.Data["data"]
222+
if !exist {
223+
return JuiceFS{}, fmt.Errorf("data in helm value %s do not exist", j.getHelmValuesConfigMapName())
224+
}
225+
var currentValue JuiceFS
226+
if err := yaml.Unmarshal([]byte(helmValue), &currentValue); err != nil {
227+
return JuiceFS{}, err
228+
}
229+
return currentValue, nil
230+
}
231+
232+
func (j *JuiceFSEngine) SaveValues(value *JuiceFS) error {
233+
helmValueConfigMap, err := j.GetValuesConfigMap()
234+
if err != nil {
235+
return err
236+
}
237+
data, err := yaml.Marshal(value)
238+
if err != nil {
239+
return err
240+
}
241+
helmValueConfigMap.Data["data"] = string(data)
242+
err = kubeclient.UpdateConfigMap(j.Client, helmValueConfigMap)
243+
if err != nil {
244+
return err
245+
}
246+
return nil
247+
}
248+
213249
func (j *JuiceFSEngine) GetEdition() (edition string) {
214250
cm, err := j.GetValuesConfigMap()
215251
if err != nil || cm == nil {

0 commit comments

Comments
 (0)