Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
269 changes: 158 additions & 111 deletions pkg/ddc/juicefs/sync_runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"strings"
"time"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/retry"
Expand Down Expand Up @@ -190,148 +191,194 @@ func (j *JuiceFSEngine) syncWorkerSpec(ctx cruntime.ReconcileRequestContext, run
return
}

func (j *JuiceFSEngine) syncFuseSpec(ctx cruntime.ReconcileRequestContext, runtime *datav1alpha1.JuiceFSRuntime, value *JuiceFS) (changed bool, err error) {
func (j *JuiceFSEngine) syncFuseSpec(ctx cruntime.ReconcileRequestContext, runtime *datav1alpha1.JuiceFSRuntime, latestValue *JuiceFS) (bool, error) {
j.Log.V(1).Info("syncFuseSpec")
var cmdChanged bool
err = retry.RetryOnConflict(retry.DefaultBackoff, func() error {

//1. check if fuse cmd configmap needs to update
if err := j.updateFuseCmdConfigmapOnChanged(latestValue); err != nil {
return false, err
}

//2. check if fuse needs to update
var fuseChanged, fuseGenerationNeedIncrease bool
err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
fuses, err := kubeclient.GetDaemonset(j.Client, j.getFuseName(), j.namespace)
if err != nil {
return err
}

fusesToUpdate := fuses.DeepCopy()

// nodeSelector
if nodeSelectorChanged, newSelector := j.isNodeSelectorChanged(fusesToUpdate.Spec.Template.Spec.NodeSelector, value.Fuse.NodeSelector); nodeSelectorChanged {
j.Log.Info("syncFuseSpec nodeSelectorChanged")
fusesToUpdate.Spec.Template.Spec.NodeSelector = newSelector
changed = true
currentValue, err := j.GetValueFromConfigmap()
if err != nil {
return err
}

// volumes
if volumeChanged, newVolumes := j.isVolumesChanged(fusesToUpdate.Spec.Template.Spec.Volumes, value.Fuse.Volumes); volumeChanged {
j.Log.Info("syncFuseSpec volumeChanged")
fusesToUpdate.Spec.Template.Spec.Volumes = newVolumes
changed = true
fuseChanged, fuseGenerationNeedIncrease = j.checkAndSetFuseChanges(currentValue, latestValue, runtime, fusesToUpdate)
if !fuseChanged {
j.Log.V(1).Info("The fuse is not changed")
return nil
}

// labels
if labelChanged, newLabels := j.isLabelsChanged(fusesToUpdate.Spec.Template.ObjectMeta.Labels, value.Fuse.Labels); labelChanged {
j.Log.Info("syncFuseSpec labelChanged")
fusesToUpdate.Spec.Template.ObjectMeta.Labels = newLabels
changed = true
if reflect.DeepEqual(fuses, fusesToUpdate) {
fuseChanged = false
j.Log.V(1).Info("The fuse is not changed, skip")
return nil
}
j.Log.Info("The fuse changes, need to update")

// annotations
if annoChanged, newAnnos := j.isAnnotationsChanged(fusesToUpdate.Spec.Template.ObjectMeta.Annotations, value.Fuse.Annotations); annoChanged {
j.Log.Info("syncFuseSpec annoChanged")
fusesToUpdate.Spec.Template.ObjectMeta.Annotations = newAnnos
changed = true
if fuseGenerationNeedIncrease {
err := j.increaseFuseGeneration(fusesToUpdate)
if err != nil {
j.Log.Error(err, "Failed to update the fuse generation")
return err
}
}

// options -> configmap
fuseCommand, err := j.getFuseCommand()
if err != nil || fuseCommand == "" {
j.Log.Error(err, "Failed to get fuse command")
if err := j.Client.Update(context.TODO(), fusesToUpdate); err != nil {
j.Log.Error(err, "Failed to update the ds spec")
return err
}
cmdChanged, _ = j.isCommandChanged(fuseCommand, value.Fuse.Command)

if len(fusesToUpdate.Spec.Template.Spec.Containers) == 1 {
// resource
if resourcesChanged, newResources := j.isResourcesChanged(fusesToUpdate.Spec.Template.Spec.Containers[0].Resources, runtime.Spec.Fuse.Resources); resourcesChanged {
j.Log.Info("syncFuseSpec resourcesChanged")
fusesToUpdate.Spec.Template.Spec.Containers[0].Resources = newResources
changed = true
}
if err := j.SaveValueToConfigmap(latestValue); err != nil {
j.Log.Error(err, "Failed to update the ds spec")
return err
}

// env
if envChanged, newEnvs := j.isEnvsChanged(fusesToUpdate.Spec.Template.Spec.Containers[0].Env, value.Fuse.Envs); envChanged {
j.Log.Info("syncFuseSpec envChanged")
fusesToUpdate.Spec.Template.Spec.Containers[0].Env = newEnvs
changed = true
}
return nil
})
if err != nil {
if fluiderrs.IsDeprecated(err) {
j.Log.Info("Warning: the current runtime is created by runtime controller before v0.7.0, update specs are not supported. To support these features, please create a new dataset", "details", err)
return false, nil
}
return false, err
}

// volumeMounts
if volumeMountChanged, newVolumeMounts := j.isVolumeMountsChanged(fusesToUpdate.Spec.Template.Spec.Containers[0].VolumeMounts, value.Fuse.VolumeMounts); volumeMountChanged {
j.Log.Info("syncFuseSpec volumeMountChanged")
fusesToUpdate.Spec.Template.Spec.Containers[0].VolumeMounts = newVolumeMounts
changed = true
}
return fuseChanged, nil
}

// image
fuseImage := value.Fuse.Image
if value.ImageTag != "" {
fuseImage = fuseImage + ":" + value.Fuse.ImageTag
}
if imageChanged, newImage := j.isImageChanged(fusesToUpdate.Spec.Template.Spec.Containers[0].Image, fuseImage); imageChanged {
j.Log.Info("syncFuseSpec imageChanged")
fusesToUpdate.Spec.Template.Spec.Containers[0].Image = newImage
changed = true
}
func (j *JuiceFSEngine) checkAndSetFuseChanges(currentValue, latestValue *JuiceFS, runtime *datav1alpha1.JuiceFSRuntime, fusesToUpdate *appsv1.DaemonSet) (bool, bool) {
var fuseChanged, fuseGenerationNeedUpdate bool
// nodeSelector
if nodeSelectorChanged, newSelector := j.isNodeSelectorChanged(currentValue.Fuse.NodeSelector, latestValue.Fuse.NodeSelector); nodeSelectorChanged {
j.Log.Info("syncFuseSpec nodeSelectorChanged")
fusesToUpdate.Spec.Template.Spec.NodeSelector = newSelector
fuseChanged = true
}

// volumes
if volumeChanged, newVolumes := j.isVolumesChanged(currentValue.Fuse.Volumes, latestValue.Fuse.Volumes); volumeChanged {
j.Log.Info("syncFuseSpec volumeChanged")
fusesToUpdate.Spec.Template.Spec.Volumes = newVolumes
fuseChanged = true
}

// labels
if labelChanged, newLabels := j.isLabelsChanged(currentValue.Fuse.Labels, latestValue.Fuse.Labels); labelChanged {
j.Log.Info("syncFuseSpec labelChanged")
fusesToUpdate.Spec.Template.ObjectMeta.Labels = newLabels
fuseChanged = true
}

// annotations
if annoChanged, newAnnos := j.isAnnotationsChanged(currentValue.Fuse.Annotations, latestValue.Fuse.Annotations); annoChanged {
j.Log.Info("syncFuseSpec annoChanged")
fusesToUpdate.Spec.Template.ObjectMeta.Annotations = newAnnos
fuseChanged = true
}

if len(fusesToUpdate.Spec.Template.Spec.Containers) == 1 {
// resource
if resourcesChanged, newResources := j.isResourcesChanged(fusesToUpdate.Spec.Template.Spec.Containers[0].Resources, runtime.Spec.Fuse.Resources); resourcesChanged {
j.Log.Info("syncFuseSpec resourcesChanged")
fusesToUpdate.Spec.Template.Spec.Containers[0].Resources = newResources
fuseChanged = true
}

if cmdChanged {
j.Log.Info("The fuse config is updated")
err = j.updateFuseScript(value.Fuse.Command)
if err != nil {
j.Log.Error(err, "Failed to update the ds config")
return err
}
} else {
j.Log.V(1).Info("The fuse config is not changed")
// env
if envChanged, newEnvs := j.isEnvsChanged(currentValue.Fuse.Envs, latestValue.Fuse.Envs); envChanged {
j.Log.Info("syncFuseSpec envChanged")
fusesToUpdate.Spec.Template.Spec.Containers[0].Env = newEnvs
fuseChanged = true
}

if changed {
if reflect.DeepEqual(fuses, fusesToUpdate) {
changed = false
j.Log.V(1).Info("The fuse is not changed, skip")
return nil
}
j.Log.Info("The fuse is updated")

if currentGeneration, exist := fusesToUpdate.Spec.Template.Labels[common.LabelRuntimeFuseGeneration]; exist {
currentGenerationInt, err := strconv.Atoi(currentGeneration)
if err != nil {
j.Log.Error(err, "Failed to parse current fuse generation from the ds label")
return nil
}
newGeneration := strconv.FormatInt(int64(currentGenerationInt+1), 10)
fusesToUpdate.Spec.Template.Labels[common.LabelRuntimeFuseGeneration] = newGeneration

pvc, err := kubeclient.GetPersistentVolumeClaim(j.Client, j.name, j.namespace)
if err != nil {
return err
}

labelsToModify := common.LabelsToModify{}
if _, exist := pvc.Labels[common.LabelRuntimeFuseGeneration]; exist {
labelsToModify.Update(common.LabelRuntimeFuseGeneration, newGeneration)
} else {
labelsToModify.Add(common.LabelRuntimeFuseGeneration, newGeneration)
}

if _, err = utils.PatchLabels(j.Client, pvc, labelsToModify); err != nil {
j.Log.Error(err, fmt.Sprintf("imageChanged but failed to update image info on pvc %s/%s", j.namespace, j.name))
}
}
err = j.Client.Update(context.TODO(), fusesToUpdate)
if err != nil {
j.Log.Error(err, "Failed to update the ds spec")
}
// volumeMounts
if volumeMountChanged, newVolumeMounts := j.isVolumeMountsChanged(currentValue.Fuse.VolumeMounts, latestValue.Fuse.VolumeMounts); volumeMountChanged {
j.Log.Info("syncFuseSpec volumeMountChanged")
fusesToUpdate.Spec.Template.Spec.Containers[0].VolumeMounts = newVolumeMounts
fuseChanged = true
}

} else {
j.Log.V(1).Info("The fuse is not changed")
// image
latestFuseImage := latestValue.Fuse.Image
if latestValue.ImageTag != "" {
latestFuseImage = latestFuseImage + ":" + latestValue.Fuse.ImageTag
}

currentFuseImage := currentValue.Fuse.Image
if currentValue.ImageTag != "" {
currentFuseImage = currentFuseImage + ":" + currentValue.Fuse.ImageTag
}

if imageChanged, newImage := j.isImageChanged(currentFuseImage, latestFuseImage); imageChanged {
j.Log.Info("syncFuseSpec imageChanged")
fusesToUpdate.Spec.Template.Spec.Containers[0].Image = newImage
fuseChanged, fuseGenerationNeedUpdate = true, true
}
}

return fuseChanged, fuseGenerationNeedUpdate
}

func (j *JuiceFSEngine) updateFuseCmdConfigmapOnChanged(latestValue *JuiceFS) error {
// options -> configmap
fuseCommand, err := j.getFuseCommand()
if err != nil || fuseCommand == "" {
j.Log.Error(err, "Failed to get fuse command")
return err
}

if cmdChanged, _ := j.isCommandChanged(fuseCommand, latestValue.Fuse.Command); cmdChanged {
j.Log.Info("The fuse config is updated")
if err := j.updateFuseScript(latestValue.Fuse.Command); err != nil {
j.Log.Error(err, "Failed to update the ds config")
return err
}
return nil
}
j.Log.V(1).Info("The fuse config is not changed")
return nil
}

func (j *JuiceFSEngine) increaseFuseGeneration(fusesToUpdate *appsv1.DaemonSet) error {
newGeneration := "1"
currentGeneration, exist := fusesToUpdate.Spec.Template.Labels[common.LabelRuntimeFuseGeneration]
if exist {
currentGenerationInt, err := strconv.Atoi(currentGeneration)
if err != nil {
j.Log.Error(err, "Failed to parse current fuse generation from the ds label")
return nil
}
newGeneration = strconv.FormatInt(int64(currentGenerationInt+1), 10)
}

fusesToUpdate.Spec.Template.Labels[common.LabelRuntimeFuseGeneration] = newGeneration
pvc, err := kubeclient.GetPersistentVolumeClaim(j.Client, j.name, j.namespace)
if err != nil {
return err
})
}

if fluiderrs.IsDeprecated(err) {
j.Log.Info("Warning: the current runtime is created by runtime controller before v0.7.0, update specs are not supported. To support these features, please create a new dataset", "details", err)
return false, nil
labelsToModify := common.LabelsToModify{}
if _, exist := pvc.Labels[common.LabelRuntimeFuseGeneration]; exist {
labelsToModify.Update(common.LabelRuntimeFuseGeneration, newGeneration)
} else {
labelsToModify.Add(common.LabelRuntimeFuseGeneration, newGeneration)
}
return

if _, err = utils.PatchLabels(j.Client, pvc, labelsToModify); err != nil {
j.Log.Error(err, fmt.Sprintf("imageChanged but failed to update image info on pvc %s/%s", j.namespace, j.name))
}
return nil
}

func (j *JuiceFSEngine) isVolumeMountsChanged(crtVolumeMounts, runtimeVolumeMounts []corev1.VolumeMount) (changed bool, newVolumeMounts []corev1.VolumeMount) {
Expand Down
41 changes: 36 additions & 5 deletions pkg/ddc/juicefs/sync_runtime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ func TestJuiceFSxEngine_syncFuseSpec(t *testing.T) {
res := resource.MustParse("320Gi")
type fields struct {
runtime *datav1alpha1.JuiceFSRuntime
helmValue *corev1.ConfigMap
name string
namespace string
}
Expand All @@ -286,6 +287,15 @@ func TestJuiceFSxEngine_syncFuseSpec(t *testing.T) {
name: "emtpy",
namespace: "default",
runtime: &datav1alpha1.JuiceFSRuntime{},
helmValue: &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "emtpy-juicefs-values",
Namespace: "default",
},
Data: map[string]string{
"data": "",
},
},
}, args: args{
fuse: &appsv1.DaemonSet{
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -302,6 +312,15 @@ func TestJuiceFSxEngine_syncFuseSpec(t *testing.T) {
name: "nofuse",
namespace: "default",
runtime: &datav1alpha1.JuiceFSRuntime{},
helmValue: &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "nofuse-juicefs-values",
Namespace: "default",
},
Data: map[string]string{
"data": "",
},
},
}, args: args{
fuse: &appsv1.DaemonSet{
ObjectMeta: metav1.ObjectMeta{
Expand Down Expand Up @@ -337,6 +356,15 @@ func TestJuiceFSxEngine_syncFuseSpec(t *testing.T) {
},
},
},
helmValue: &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "same-juicefs-values",
Namespace: "default",
},
Data: map[string]string{
"data": "",
},
},
}, args: args{
fuse: &appsv1.DaemonSet{
ObjectMeta: metav1.ObjectMeta{
Expand Down Expand Up @@ -379,18 +407,21 @@ func TestJuiceFSxEngine_syncFuseSpec(t *testing.T) {
tt.fields.runtime.SetNamespace(tt.fields.namespace)
s.AddKnownTypes(appsv1.SchemeGroupVersion, tt.args.fuse)
s.AddKnownTypes(datav1alpha1.GroupVersion, tt.fields.runtime)
s.AddKnownTypes(corev1.SchemeGroupVersion, tt.fields.helmValue)

_ = corev1.AddToScheme(s)
runtimeObjs = append(runtimeObjs, tt.fields.runtime)
runtimeObjs = append(runtimeObjs, tt.fields.helmValue)
runtimeObjs = append(runtimeObjs, tt.args.fuse)
client := fake.NewFakeClientWithScheme(s, runtimeObjs...)

e := &JuiceFSEngine{
runtime: tt.fields.runtime,
name: tt.fields.name,
namespace: tt.fields.namespace,
Log: fake.NullLogger(),
Client: client,
runtime: tt.fields.runtime,
name: tt.fields.name,
namespace: tt.fields.namespace,
Log: fake.NullLogger(),
Client: client,
engineImpl: "juicefs",
}
value := &JuiceFS{
Fuse: Fuse{},
Expand Down
Loading
Loading