diff --git a/pkg/ddc/juicefs/sync_runtime.go b/pkg/ddc/juicefs/sync_runtime.go index dec69f41e93..0523e720b05 100644 --- a/pkg/ddc/juicefs/sync_runtime.go +++ b/pkg/ddc/juicefs/sync_runtime.go @@ -24,6 +24,7 @@ import ( "strings" "time" + appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/util/retry" @@ -190,148 +191,194 @@ func (j *JuiceFSEngine) syncWorkerSpec(ctx cruntime.ReconcileRequestContext, run return } -func (j *JuiceFSEngine) syncFuseSpec(ctx cruntime.ReconcileRequestContext, runtime *datav1alpha1.JuiceFSRuntime, value *JuiceFS) (changed bool, err error) { +func (j *JuiceFSEngine) syncFuseSpec(ctx cruntime.ReconcileRequestContext, runtime *datav1alpha1.JuiceFSRuntime, latestValue *JuiceFS) (bool, error) { j.Log.V(1).Info("syncFuseSpec") - var cmdChanged bool - err = retry.RetryOnConflict(retry.DefaultBackoff, func() error { + + //1. check if fuse cmd configmap needs to update + if err := j.updateFuseCmdConfigmapOnChanged(latestValue); err != nil { + return false, err + } + + //2. check if fuse needs to update + var fuseChanged, fuseGenerationNeedIncrease bool + err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { fuses, err := kubeclient.GetDaemonset(j.Client, j.getFuseName(), j.namespace) if err != nil { return err } - fusesToUpdate := fuses.DeepCopy() - // nodeSelector - if nodeSelectorChanged, newSelector := j.isNodeSelectorChanged(fusesToUpdate.Spec.Template.Spec.NodeSelector, value.Fuse.NodeSelector); nodeSelectorChanged { - j.Log.Info("syncFuseSpec nodeSelectorChanged") - fusesToUpdate.Spec.Template.Spec.NodeSelector = newSelector - changed = true + currentValue, err := j.GetValueFromConfigmap() + if err != nil { + return err } - // volumes - if volumeChanged, newVolumes := j.isVolumesChanged(fusesToUpdate.Spec.Template.Spec.Volumes, value.Fuse.Volumes); volumeChanged { - j.Log.Info("syncFuseSpec volumeChanged") - fusesToUpdate.Spec.Template.Spec.Volumes = newVolumes - changed = true + fuseChanged, fuseGenerationNeedIncrease = j.checkAndSetFuseChanges(currentValue, latestValue, runtime, fusesToUpdate) + if !fuseChanged { + j.Log.V(1).Info("The fuse is not changed") + return nil } - // labels - if labelChanged, newLabels := j.isLabelsChanged(fusesToUpdate.Spec.Template.ObjectMeta.Labels, value.Fuse.Labels); labelChanged { - j.Log.Info("syncFuseSpec labelChanged") - fusesToUpdate.Spec.Template.ObjectMeta.Labels = newLabels - changed = true + if reflect.DeepEqual(fuses, fusesToUpdate) { + fuseChanged = false + j.Log.V(1).Info("The fuse is not changed, skip") + return nil } + j.Log.Info("The fuse changes, need to update") - // annotations - if annoChanged, newAnnos := j.isAnnotationsChanged(fusesToUpdate.Spec.Template.ObjectMeta.Annotations, value.Fuse.Annotations); annoChanged { - j.Log.Info("syncFuseSpec annoChanged") - fusesToUpdate.Spec.Template.ObjectMeta.Annotations = newAnnos - changed = true + if fuseGenerationNeedIncrease { + err := j.increaseFuseGeneration(fusesToUpdate) + if err != nil { + j.Log.Error(err, "Failed to update the fuse generation") + return err + } } - // options -> configmap - fuseCommand, err := j.getFuseCommand() - if err != nil || fuseCommand == "" { - j.Log.Error(err, "Failed to get fuse command") + if err := j.Client.Update(context.TODO(), fusesToUpdate); err != nil { + j.Log.Error(err, "Failed to update the ds spec") return err } - cmdChanged, _ = j.isCommandChanged(fuseCommand, value.Fuse.Command) - if len(fusesToUpdate.Spec.Template.Spec.Containers) == 1 { - // resource - if resourcesChanged, newResources := j.isResourcesChanged(fusesToUpdate.Spec.Template.Spec.Containers[0].Resources, runtime.Spec.Fuse.Resources); resourcesChanged { - j.Log.Info("syncFuseSpec resourcesChanged") - fusesToUpdate.Spec.Template.Spec.Containers[0].Resources = newResources - changed = true - } + if err := j.SaveValueToConfigmap(latestValue); err != nil { + j.Log.Error(err, "Failed to update the ds spec") + return err + } - // env - if envChanged, newEnvs := j.isEnvsChanged(fusesToUpdate.Spec.Template.Spec.Containers[0].Env, value.Fuse.Envs); envChanged { - j.Log.Info("syncFuseSpec envChanged") - fusesToUpdate.Spec.Template.Spec.Containers[0].Env = newEnvs - changed = true - } + return nil + }) + if err != nil { + if fluiderrs.IsDeprecated(err) { + j.Log.Info("Warning: the current runtime is created by runtime controller before v0.7.0, update specs are not supported. To support these features, please create a new dataset", "details", err) + return false, nil + } + return false, err + } - // volumeMounts - if volumeMountChanged, newVolumeMounts := j.isVolumeMountsChanged(fusesToUpdate.Spec.Template.Spec.Containers[0].VolumeMounts, value.Fuse.VolumeMounts); volumeMountChanged { - j.Log.Info("syncFuseSpec volumeMountChanged") - fusesToUpdate.Spec.Template.Spec.Containers[0].VolumeMounts = newVolumeMounts - changed = true - } + return fuseChanged, nil +} - // image - fuseImage := value.Fuse.Image - if value.ImageTag != "" { - fuseImage = fuseImage + ":" + value.Fuse.ImageTag - } - if imageChanged, newImage := j.isImageChanged(fusesToUpdate.Spec.Template.Spec.Containers[0].Image, fuseImage); imageChanged { - j.Log.Info("syncFuseSpec imageChanged") - fusesToUpdate.Spec.Template.Spec.Containers[0].Image = newImage - changed = true - } +func (j *JuiceFSEngine) checkAndSetFuseChanges(currentValue, latestValue *JuiceFS, runtime *datav1alpha1.JuiceFSRuntime, fusesToUpdate *appsv1.DaemonSet) (bool, bool) { + var fuseChanged, fuseGenerationNeedUpdate bool + // nodeSelector + if nodeSelectorChanged, newSelector := j.isNodeSelectorChanged(currentValue.Fuse.NodeSelector, latestValue.Fuse.NodeSelector); nodeSelectorChanged { + j.Log.Info("syncFuseSpec nodeSelectorChanged") + fusesToUpdate.Spec.Template.Spec.NodeSelector = newSelector + fuseChanged = true + } + + // volumes + if volumeChanged, newVolumes := j.isVolumesChanged(currentValue.Fuse.Volumes, latestValue.Fuse.Volumes); volumeChanged { + j.Log.Info("syncFuseSpec volumeChanged") + fusesToUpdate.Spec.Template.Spec.Volumes = newVolumes + fuseChanged = true + } + + // labels + if labelChanged, newLabels := j.isLabelsChanged(currentValue.Fuse.Labels, latestValue.Fuse.Labels); labelChanged { + j.Log.Info("syncFuseSpec labelChanged") + fusesToUpdate.Spec.Template.ObjectMeta.Labels = newLabels + fuseChanged = true + } + + // annotations + if annoChanged, newAnnos := j.isAnnotationsChanged(currentValue.Fuse.Annotations, latestValue.Fuse.Annotations); annoChanged { + j.Log.Info("syncFuseSpec annoChanged") + fusesToUpdate.Spec.Template.ObjectMeta.Annotations = newAnnos + fuseChanged = true + } + + if len(fusesToUpdate.Spec.Template.Spec.Containers) == 1 { + // resource + if resourcesChanged, newResources := j.isResourcesChanged(fusesToUpdate.Spec.Template.Spec.Containers[0].Resources, runtime.Spec.Fuse.Resources); resourcesChanged { + j.Log.Info("syncFuseSpec resourcesChanged") + fusesToUpdate.Spec.Template.Spec.Containers[0].Resources = newResources + fuseChanged = true } - if cmdChanged { - j.Log.Info("The fuse config is updated") - err = j.updateFuseScript(value.Fuse.Command) - if err != nil { - j.Log.Error(err, "Failed to update the ds config") - return err - } - } else { - j.Log.V(1).Info("The fuse config is not changed") + // env + if envChanged, newEnvs := j.isEnvsChanged(currentValue.Fuse.Envs, latestValue.Fuse.Envs); envChanged { + j.Log.Info("syncFuseSpec envChanged") + fusesToUpdate.Spec.Template.Spec.Containers[0].Env = newEnvs + fuseChanged = true } - if changed { - if reflect.DeepEqual(fuses, fusesToUpdate) { - changed = false - j.Log.V(1).Info("The fuse is not changed, skip") - return nil - } - j.Log.Info("The fuse is updated") - - if currentGeneration, exist := fusesToUpdate.Spec.Template.Labels[common.LabelRuntimeFuseGeneration]; exist { - currentGenerationInt, err := strconv.Atoi(currentGeneration) - if err != nil { - j.Log.Error(err, "Failed to parse current fuse generation from the ds label") - return nil - } - newGeneration := strconv.FormatInt(int64(currentGenerationInt+1), 10) - fusesToUpdate.Spec.Template.Labels[common.LabelRuntimeFuseGeneration] = newGeneration - - pvc, err := kubeclient.GetPersistentVolumeClaim(j.Client, j.name, j.namespace) - if err != nil { - return err - } - - labelsToModify := common.LabelsToModify{} - if _, exist := pvc.Labels[common.LabelRuntimeFuseGeneration]; exist { - labelsToModify.Update(common.LabelRuntimeFuseGeneration, newGeneration) - } else { - labelsToModify.Add(common.LabelRuntimeFuseGeneration, newGeneration) - } - - if _, err = utils.PatchLabels(j.Client, pvc, labelsToModify); err != nil { - j.Log.Error(err, fmt.Sprintf("imageChanged but failed to update image info on pvc %s/%s", j.namespace, j.name)) - } - } - err = j.Client.Update(context.TODO(), fusesToUpdate) - if err != nil { - j.Log.Error(err, "Failed to update the ds spec") - } + // volumeMounts + if volumeMountChanged, newVolumeMounts := j.isVolumeMountsChanged(currentValue.Fuse.VolumeMounts, latestValue.Fuse.VolumeMounts); volumeMountChanged { + j.Log.Info("syncFuseSpec volumeMountChanged") + fusesToUpdate.Spec.Template.Spec.Containers[0].VolumeMounts = newVolumeMounts + fuseChanged = true + } - } else { - j.Log.V(1).Info("The fuse is not changed") + // image + latestFuseImage := latestValue.Fuse.Image + if latestValue.ImageTag != "" { + latestFuseImage = latestFuseImage + ":" + latestValue.Fuse.ImageTag + } + + currentFuseImage := currentValue.Fuse.Image + if currentValue.ImageTag != "" { + currentFuseImage = currentFuseImage + ":" + currentValue.Fuse.ImageTag + } + + if imageChanged, newImage := j.isImageChanged(currentFuseImage, latestFuseImage); imageChanged { + j.Log.Info("syncFuseSpec imageChanged") + fusesToUpdate.Spec.Template.Spec.Containers[0].Image = newImage + fuseChanged, fuseGenerationNeedUpdate = true, true + } + } + + return fuseChanged, fuseGenerationNeedUpdate +} + +func (j *JuiceFSEngine) updateFuseCmdConfigmapOnChanged(latestValue *JuiceFS) error { + // options -> configmap + fuseCommand, err := j.getFuseCommand() + if err != nil || fuseCommand == "" { + j.Log.Error(err, "Failed to get fuse command") + return err + } + + if cmdChanged, _ := j.isCommandChanged(fuseCommand, latestValue.Fuse.Command); cmdChanged { + j.Log.Info("The fuse config is updated") + if err := j.updateFuseScript(latestValue.Fuse.Command); err != nil { + j.Log.Error(err, "Failed to update the ds config") + return err } + return nil + } + j.Log.V(1).Info("The fuse config is not changed") + return nil +} +func (j *JuiceFSEngine) increaseFuseGeneration(fusesToUpdate *appsv1.DaemonSet) error { + newGeneration := "1" + currentGeneration, exist := fusesToUpdate.Spec.Template.Labels[common.LabelRuntimeFuseGeneration] + if exist { + currentGenerationInt, err := strconv.Atoi(currentGeneration) + if err != nil { + j.Log.Error(err, "Failed to parse current fuse generation from the ds label") + return nil + } + newGeneration = strconv.FormatInt(int64(currentGenerationInt+1), 10) + } + + fusesToUpdate.Spec.Template.Labels[common.LabelRuntimeFuseGeneration] = newGeneration + pvc, err := kubeclient.GetPersistentVolumeClaim(j.Client, j.name, j.namespace) + if err != nil { return err - }) + } - if fluiderrs.IsDeprecated(err) { - j.Log.Info("Warning: the current runtime is created by runtime controller before v0.7.0, update specs are not supported. To support these features, please create a new dataset", "details", err) - return false, nil + labelsToModify := common.LabelsToModify{} + if _, exist := pvc.Labels[common.LabelRuntimeFuseGeneration]; exist { + labelsToModify.Update(common.LabelRuntimeFuseGeneration, newGeneration) + } else { + labelsToModify.Add(common.LabelRuntimeFuseGeneration, newGeneration) } - return + + if _, err = utils.PatchLabels(j.Client, pvc, labelsToModify); err != nil { + j.Log.Error(err, fmt.Sprintf("imageChanged but failed to update image info on pvc %s/%s", j.namespace, j.name)) + } + return nil } func (j *JuiceFSEngine) isVolumeMountsChanged(crtVolumeMounts, runtimeVolumeMounts []corev1.VolumeMount) (changed bool, newVolumeMounts []corev1.VolumeMount) { diff --git a/pkg/ddc/juicefs/sync_runtime_test.go b/pkg/ddc/juicefs/sync_runtime_test.go index 507181eafba..136ecd15069 100644 --- a/pkg/ddc/juicefs/sync_runtime_test.go +++ b/pkg/ddc/juicefs/sync_runtime_test.go @@ -265,6 +265,7 @@ func TestJuiceFSxEngine_syncFuseSpec(t *testing.T) { res := resource.MustParse("320Gi") type fields struct { runtime *datav1alpha1.JuiceFSRuntime + helmValue *corev1.ConfigMap name string namespace string } @@ -286,6 +287,15 @@ func TestJuiceFSxEngine_syncFuseSpec(t *testing.T) { name: "emtpy", namespace: "default", runtime: &datav1alpha1.JuiceFSRuntime{}, + helmValue: &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "emtpy-juicefs-values", + Namespace: "default", + }, + Data: map[string]string{ + "data": "", + }, + }, }, args: args{ fuse: &appsv1.DaemonSet{ ObjectMeta: metav1.ObjectMeta{ @@ -302,6 +312,15 @@ func TestJuiceFSxEngine_syncFuseSpec(t *testing.T) { name: "nofuse", namespace: "default", runtime: &datav1alpha1.JuiceFSRuntime{}, + helmValue: &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "nofuse-juicefs-values", + Namespace: "default", + }, + Data: map[string]string{ + "data": "", + }, + }, }, args: args{ fuse: &appsv1.DaemonSet{ ObjectMeta: metav1.ObjectMeta{ @@ -337,6 +356,15 @@ func TestJuiceFSxEngine_syncFuseSpec(t *testing.T) { }, }, }, + helmValue: &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "same-juicefs-values", + Namespace: "default", + }, + Data: map[string]string{ + "data": "", + }, + }, }, args: args{ fuse: &appsv1.DaemonSet{ ObjectMeta: metav1.ObjectMeta{ @@ -379,18 +407,21 @@ func TestJuiceFSxEngine_syncFuseSpec(t *testing.T) { tt.fields.runtime.SetNamespace(tt.fields.namespace) s.AddKnownTypes(appsv1.SchemeGroupVersion, tt.args.fuse) s.AddKnownTypes(datav1alpha1.GroupVersion, tt.fields.runtime) + s.AddKnownTypes(corev1.SchemeGroupVersion, tt.fields.helmValue) _ = corev1.AddToScheme(s) runtimeObjs = append(runtimeObjs, tt.fields.runtime) + runtimeObjs = append(runtimeObjs, tt.fields.helmValue) runtimeObjs = append(runtimeObjs, tt.args.fuse) client := fake.NewFakeClientWithScheme(s, runtimeObjs...) e := &JuiceFSEngine{ - runtime: tt.fields.runtime, - name: tt.fields.name, - namespace: tt.fields.namespace, - Log: fake.NullLogger(), - Client: client, + runtime: tt.fields.runtime, + name: tt.fields.name, + namespace: tt.fields.namespace, + Log: fake.NullLogger(), + Client: client, + engineImpl: "juicefs", } value := &JuiceFS{ Fuse: Fuse{}, diff --git a/pkg/ddc/juicefs/utils.go b/pkg/ddc/juicefs/utils.go index 93cb9bfd1c9..7e22ab68229 100644 --- a/pkg/ddc/juicefs/utils.go +++ b/pkg/ddc/juicefs/utils.go @@ -19,6 +19,7 @@ package juicefs import ( "context" "fmt" + "k8s.io/client-go/util/retry" "regexp" "strconv" "strings" @@ -210,6 +211,43 @@ func (j *JuiceFSEngine) GetValuesConfigMap() (cm *corev1.ConfigMap, err error) { return } +func (j *JuiceFSEngine) GetValueFromConfigmap() (*JuiceFS, error) { + helmValueConfigMap, err := j.GetValuesConfigMap() + if err != nil { + return nil, err + } + if helmValueConfigMap == nil { + return nil, fmt.Errorf("helm value %s not found", j.getHelmValuesConfigMapName()) + } + helmValue, exist := helmValueConfigMap.Data["data"] + if !exist { + return nil, fmt.Errorf("data in helm value %s do not exist", j.getHelmValuesConfigMapName()) + } + var currentValue JuiceFS + if err := yaml.Unmarshal([]byte(helmValue), ¤tValue); err != nil { + return nil, err + } + return ¤tValue, nil +} + +func (j *JuiceFSEngine) SaveValueToConfigmap(value *JuiceFS) error { + return retry.RetryOnConflict(retry.DefaultBackoff, func() error { + helmValueConfigMap, err := j.GetValuesConfigMap() + if err != nil { + return err + } + if helmValueConfigMap == nil { + return fmt.Errorf("helm value %s not found", j.getHelmValuesConfigMapName()) + } + data, err := yaml.Marshal(value) + if err != nil { + return err + } + helmValueConfigMap.Data["data"] = string(data) + return kubeclient.UpdateConfigMap(j.Client, helmValueConfigMap) + }) +} + func (j *JuiceFSEngine) GetEdition() (edition string) { cm, err := j.GetValuesConfigMap() if err != nil || cm == nil { @@ -415,31 +453,33 @@ func (j JuiceFSEngine) updateWorkerScript(command string) error { } func (j JuiceFSEngine) updateFuseScript(command string) error { - cm, err := kubeclient.GetConfigmapByName(j.Client, j.getFuseScriptName(), j.namespace) - if err != nil { - return err - } - if cm == nil { - j.Log.Info("value configMap not found") - return nil - } - data := cm.Data - script := data["script.sh"] - - newScript := script - newScripts := strings.Split(newScript, "\n") - // mount command is the last one, replace it - for i := len(newScripts) - 1; i >= 0; i-- { - if newScripts[i] != "" { - newScripts[i] = command - break + return retry.RetryOnConflict(retry.DefaultBackoff, func() error { + cm, err := kubeclient.GetConfigmapByName(j.Client, j.getFuseScriptName(), j.namespace) + if err != nil { + return err + } + if cm == nil { + j.Log.Info("value configMap not found") + return nil + } + data := cm.Data + script := data["script.sh"] + + newScript := script + newScripts := strings.Split(newScript, "\n") + // mount command is the last one, replace it + for i := len(newScripts) - 1; i >= 0; i-- { + if newScripts[i] != "" { + newScripts[i] = command + break + } } - } - newValues := make(map[string]string) - newValues["script.sh"] = strings.Join(newScripts, "\n") - cm.Data = newValues - return j.Client.Update(context.Background(), cm) + newValues := make(map[string]string) + newValues["script.sh"] = strings.Join(newScripts, "\n") + cm.Data = newValues + return j.Client.Update(context.Background(), cm) + }) } func (j *JuiceFSEngine) getWorkerScriptName() string {