Skip to content

Commit 55984c1

Browse files
author
jiuyu
committed
Feat: support Runtime Fuse UpdateStrategy - OnIdle
Signed-off-by: 玖宇 <guotongyu.gty@alibaba-inc.com>
1 parent 13ec658 commit 55984c1

46 files changed

Lines changed: 1201 additions & 338 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

api/v1alpha1/constant.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,9 +47,12 @@ const (
4747
// NoneCleanPolicy is the default clean policy. It will be transformed to OnRuntimeDeletedCleanPolicy automatically.
4848
NoneCleanPolicy FuseCleanPolicy = ""
4949

50-
// OnDemandCleanPolicy cleans fuse pod once th fuse pod on some node is not needed
50+
// OnDemandCleanPolicy cleans fuse pod once the fuse pod on some node is not needed
5151
OnDemandCleanPolicy FuseCleanPolicy = "OnDemand"
5252

5353
// OnRuntimeDeletedCleanPolicy cleans fuse pod only when the cache runtime is deleted
5454
OnRuntimeDeletedCleanPolicy FuseCleanPolicy = "OnRuntimeDeleted"
55+
56+
// OnFuseUpdateCleanPolicy cleans fuse pod when the fuse in runtime is updated and the fuse pod on some node is not needed
57+
OnFuseUpdateCleanPolicy FuseCleanPolicy = "OnFuseUpdate"
5558
)

api/v1alpha1/juicefsruntime_types.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -157,10 +157,11 @@ type JuiceFSFuseSpec struct {
157157
VolumeMounts []corev1.VolumeMount `json:"volumeMounts,omitempty"`
158158

159159
// CleanPolicy decides when to clean Juicefs Fuse pods.
160-
// Currently Fluid supports two policies: OnDemand and OnRuntimeDeleted
161-
// OnDemand cleans fuse pod once th fuse pod on some node is not needed
160+
// Currently Fluid supports three policies: OnDemand, OnRuntimeDeleted and OnFuseUpdate
161+
// OnDemand cleans fuse pod once the fuse pod on some node is not needed
162162
// OnRuntimeDeleted cleans fuse pod only when the cache runtime is deleted
163-
// Defaults to OnDemand
163+
// OnFuseUpdate cleans fuse pod once the fuse pod on some node is not needed and the fuse in runtime is updated
164+
// Defaults to OnRuntimeDeleted
164165
// +optional
165166
CleanPolicy FuseCleanPolicy `json:"cleanPolicy,omitempty"`
166167

api/v1alpha1/openapi_generated.go

Lines changed: 7 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

charts/fluid/fluid/crds/data.fluid.io_juicefsruntimes.yaml

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -109,10 +109,11 @@ spec:
109109
cleanPolicy:
110110
description: |-
111111
CleanPolicy decides when to clean Juicefs Fuse pods.
112-
Currently Fluid supports two policies: OnDemand and OnRuntimeDeleted
113-
OnDemand cleans fuse pod once th fuse pod on some node is not needed
112+
Currently Fluid supports three policies: OnDemand, OnRuntimeDeleted and OnFuseUpdate
113+
OnDemand cleans fuse pod once the fuse pod on some node is not needed
114114
OnRuntimeDeleted cleans fuse pod only when the cache runtime is deleted
115-
Defaults to OnDemand
115+
OnFuseUpdate cleans fuse pod once the fuse pod on some node is not needed and the fuse in runtime is updated
116+
Defaults to OnRuntimeDeleted
116117
type: string
117118
env:
118119
description: Environment variables that will be used by JuiceFS

charts/fluid/fluid/templates/role/juicefs/rbac.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ rules:
3333
- watch
3434
- create
3535
- update
36+
- patch
3637
- delete
3738
- apiGroups:
3839
- ""

charts/juicefs/templates/fuse/daemonset.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ spec:
3939
{{- end }}
4040
{{- end }}
4141
labels:
42+
fuse.runtime.fluid.io/generation: "1"
4243
sidecar.istio.io/inject: "false"
4344
app: {{ template "juicefs.name" . }}
4445
chart: {{ template "juicefs.chart" . }}

config/crd/bases/data.fluid.io_juicefsruntimes.yaml

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -109,10 +109,11 @@ spec:
109109
cleanPolicy:
110110
description: |-
111111
CleanPolicy decides when to clean Juicefs Fuse pods.
112-
Currently Fluid supports two policies: OnDemand and OnRuntimeDeleted
113-
OnDemand cleans fuse pod once th fuse pod on some node is not needed
112+
Currently Fluid supports three policies: OnDemand, OnRuntimeDeleted and OnFuseUpdate
113+
OnDemand cleans fuse pod once the fuse pod on some node is not needed
114114
OnRuntimeDeleted cleans fuse pod only when the cache runtime is deleted
115-
Defaults to OnDemand
115+
OnFuseUpdate cleans fuse pod once the fuse pod on some node is not needed and the fuse in runtime is updated
116+
Defaults to OnRuntimeDeleted
116117
type: string
117118
env:
118119
description: Environment variables that will be used by JuiceFS

pkg/common/constants.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -156,11 +156,11 @@ const (
156156
AccelerateCategory Category = "Accelerate"
157157
)
158158

159-
var (
160-
ExpectedFluidAnnotations = map[string]string{
159+
// GetExpectedFluidAnnotations returns the annotations expected on Fluid-created
// objects: a single entry, "CreatedBy": "fluid".
// The map is built anew on every call, so each caller receives its own copy.
func GetExpectedFluidAnnotations() map[string]string {
160+
return map[string]string{
161161
"CreatedBy": "fluid",
162162
}
163-
)
163+
}
164164

165165
const (
166166
FluidExclusiveKey string = "fluid_exclusive"

pkg/common/label.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,9 @@ const (
8080
// LabelAnnotationDatasetsInUse is a label/annotation key indicating which datasets are currently being used by a pod.
8181
// i.e. fluid.io/datasets-in-use
8282
LabelAnnotationDatasetsInUse = LabelAnnotationPrefix + "datasets-in-use"
83+
84+
// LabelRuntimeFuseGeneration is the label key that records the generation of a runtime's fuse,
// i.e. fuse.runtime.fluid.io/generation
85+
LabelRuntimeFuseGeneration = "fuse.runtime." + LabelAnnotationPrefix + "generation"
8386
)
8487

8588
const (

pkg/csi/plugins/nodeserver.go

Lines changed: 115 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -291,75 +291,15 @@ func (ns *nodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag
291291
}
292292
defer ns.locks.Release(volumeId)
293293

294-
// 1. get runtime namespace and name
295-
// A nil volumeContext is passed because unlike csi.NodeStageVolumeRequest, csi.NodeUnstageVolumeRequest has
296-
// no volume context attribute.
297-
namespace, name, err := ns.getRuntimeNamespacedName(nil, volumeId)
294+
cleanFuseFunc, err := ns.getCleanFuseFunc(volumeId)
298295
if err != nil {
299-
if utils.IgnoreNotFound(err) == nil {
300-
// For cases like the related persistent volume has been deleted, ignore it and return success
301-
glog.Warningf("NodeUnstageVolume: volume %s not found, maybe it's already cleaned up, ignore it", volumeId)
302-
return &csi.NodeUnstageVolumeResponse{}, nil
303-
}
304-
glog.Errorf("NodeUnstageVolume: can't get runtime namespace and name given (volumeContext: nil, volumeId: %s): %v", volumeId, err)
305-
return nil, errors.Wrapf(err, "NodeUnstageVolume: can't get namespace and name by volume id %s", volumeId)
296+
return nil, err
306297
}
307298

308-
// 2. Check fuse clean policy. If clean policy is set to OnRuntimeDeleted, there is no
309-
// need to clean fuse eagerly.
310-
runtimeInfo, err := base.GetRuntimeInfo(ns.client, name, namespace)
311-
if err != nil {
312-
if utils.IgnoreNotFound(err) == nil {
313-
// For cases like the dataset or runtime has been deleted, ignore it and return success
314-
glog.Warningf("NodeUnstageVolume: dataset or runtime %s/%s not found, maybe it's already cleaned up", namespace, name)
315-
return &csi.NodeUnstageVolumeResponse{}, nil
299+
if cleanFuseFunc != nil {
300+
if err := cleanFuseFunc(); err != nil {
301+
return nil, errors.Wrapf(err, "NodeUnstageVolume: failed to clean fuse for volume %s", volumeId)
316302
}
317-
return nil, errors.Wrapf(err, "NodeUnstageVolume: failed to get runtime info for %s/%s", namespace, name)
318-
}
319-
320-
var shouldCleanFuse bool
321-
cleanPolicy := runtimeInfo.GetFuseCleanPolicy()
322-
glog.Infof("NodeUnstageVolume: Using %s clean policy for runtime %s in namespace %s", cleanPolicy, runtimeInfo.GetName(), runtimeInfo.GetNamespace())
323-
switch cleanPolicy {
324-
case v1alpha1.OnDemandCleanPolicy:
325-
shouldCleanFuse = true
326-
case v1alpha1.OnRuntimeDeletedCleanPolicy:
327-
shouldCleanFuse = false
328-
default:
329-
return nil, errors.Errorf("NodeUnstageVolume: unknown Fuse clean policy: %s", cleanPolicy)
330-
}
331-
332-
if !shouldCleanFuse {
333-
return &csi.NodeUnstageVolumeResponse{}, nil
334-
}
335-
336-
// 3. check if the path is mounted
337-
inUse, err := checkMountInUse(volumeId)
338-
if err != nil {
339-
return nil, errors.Wrap(err, "NodeUnstageVolume: can't check mount in use")
340-
}
341-
if inUse {
342-
return nil, fmt.Errorf("NodeUnstageVolume: can't stop fuse cause it's in use")
343-
}
344-
345-
// 4. remove label on node
346-
// Once the label is removed, fuse pod on corresponding node will be terminated
347-
// since node selector in the fuse daemonSet no longer matches.
348-
fuseLabelKey := utils.GetFuseLabelName(namespace, name, runtimeInfo.GetOwnerDatasetUID())
349-
var labelsToModify common.LabelsToModify
350-
labelsToModify.Delete(fuseLabelKey)
351-
352-
node, err := ns.getNode()
353-
if err != nil {
354-
glog.Errorf("NodeUnstageVolume: can't get node %s: %v", ns.nodeId, err)
355-
return nil, errors.Wrapf(err, "NodeUnstageVolume: can't get node %s", ns.nodeId)
356-
}
357-
358-
// _, err = utils.ChangeNodeLabelWithPatchMode(ns.client, node, labelsToModify)
359-
err = ns.patchNodeWithLabel(node, labelsToModify)
360-
if err != nil {
361-
glog.Errorf("NodeUnstageVolume: error when patching labels on node %s: %v", ns.nodeId, err)
362-
return nil, errors.Wrapf(err, "NodeUnstageVolume: error when patching labels on node %s", ns.nodeId)
363303
}
364304

365305
return &csi.NodeUnstageVolumeResponse{}, nil
@@ -696,3 +636,113 @@ func checkMountPathExists(ctx context.Context, mountPath string) error {
696636
}
697637
return nil
698638
}
639+
640+
func (ns *nodeServer) getCleanFuseFunc(volumeId string) (func() error, error) {
641+
// 1. get runtime namespace and name from pvc
642+
// A nil volumeContext is passed because unlike csi.NodeStageVolumeRequest, csi.NodeUnstageVolumeRequest has
643+
// no volume context attribute.
644+
pvc, err := volume.GetPVCByVolumeId(ns.apiReader, volumeId)
645+
if err != nil {
646+
if utils.IgnoreNotFound(err) == nil {
647+
// For cases like the related persistent volume has been deleted, ignore it and return success
648+
glog.Warningf("NodeUnstageVolume: volume %s not found, maybe it's already cleaned up, ignore it", volumeId)
649+
return nil, nil
650+
}
651+
glog.Errorf("NodeUnstageVolume: can't get runtime namespace and name given (volumeContext: nil, volumeId: %s): %v", volumeId, err)
652+
return nil, errors.Wrapf(err, "NodeUnstageVolume: can't get namespace and name by volume id %s", volumeId)
653+
}
654+
namespace, name := pvc.Namespace, pvc.Name
655+
656+
// get latestFuseGeneration from pvc annotations
657+
var latestFuseGeneration string
658+
if pvc.Labels != nil {
659+
latestFuseGeneration = pvc.Labels[common.LabelRuntimeFuseGeneration]
660+
}
661+
662+
// 2. Check fuse clean policy. If clean policy is set to OnRuntimeDeleted, there is no
663+
// need to clean fuse eagerly.
664+
runtimeInfo, err := base.GetRuntimeInfo(ns.client, name, namespace)
665+
if err != nil {
666+
if utils.IgnoreNotFound(err) == nil {
667+
// For cases like the dataset or runtime has been deleted, ignore it and return success
668+
glog.Warningf("NodeUnstageVolume: dataset or runtime %s/%s not found, maybe it's already cleaned up", namespace, name)
669+
return nil, nil
670+
}
671+
return nil, errors.Wrapf(err, "NodeUnstageVolume: failed to get runtime info for %s/%s", namespace, name)
672+
}
673+
674+
var shouldCleanFuse bool
675+
cleanPolicy := runtimeInfo.GetFuseCleanPolicy()
676+
glog.Infof("NodeUnstageVolume: Using %s clean policy for runtime %s in namespace %s", cleanPolicy, runtimeInfo.GetName(), runtimeInfo.GetNamespace())
677+
switch cleanPolicy {
678+
case v1alpha1.OnDemandCleanPolicy:
679+
shouldCleanFuse = true
680+
case v1alpha1.OnRuntimeDeletedCleanPolicy:
681+
shouldCleanFuse = false
682+
case v1alpha1.OnFuseUpdateCleanPolicy:
683+
if checkIfFuseNeedUpdate(runtimeInfo, latestFuseGeneration) {
684+
shouldCleanFuse = true
685+
}
686+
default:
687+
return nil, errors.Errorf("NodeUnstageVolume: unknown Fuse clean policy: %s", cleanPolicy)
688+
}
689+
690+
//if getCleanFuseFunc == true, fuse pod will be deleted and recreate by NodeStage
691+
if !shouldCleanFuse {
692+
return nil, nil
693+
}
694+
695+
return func() error {
696+
// check if the path is mounted
697+
inUse, err := checkMountInUse(volumeId)
698+
if err != nil {
699+
return errors.Wrap(err, "NodeUnstageVolume: can't check mount in use")
700+
}
701+
if inUse {
702+
return fmt.Errorf("NodeUnstageVolume: can't stop fuse cause it's in use")
703+
}
704+
705+
// remove label on node
706+
// Once the label is removed, fuse pod on corresponding node will be terminated
707+
// since node selector in the fuse daemonSet no longer matches.
708+
fuseLabelKey := utils.GetFuseLabelName(runtimeInfo.GetNamespace(), runtimeInfo.GetName(), runtimeInfo.GetOwnerDatasetUID())
709+
var labelsToModify common.LabelsToModify
710+
labelsToModify.Delete(fuseLabelKey)
711+
712+
node, err := ns.getNode()
713+
if err != nil {
714+
glog.Errorf("NodeUnstageVolume: can't get node %s: %v", ns.nodeId, err)
715+
return errors.Wrapf(err, "NodeUnstageVolume: can't get node %s", ns.nodeId)
716+
}
717+
718+
if err := ns.patchNodeWithLabel(node, labelsToModify); err != nil {
719+
glog.Errorf("NodeUnstageVolume: error when patching labels on node %s: %v", ns.nodeId, err)
720+
return errors.Wrapf(err, "NodeUnstageVolume: error when patching labels on node %s", ns.nodeId)
721+
}
722+
723+
return nil
724+
}, nil
725+
}
726+
727+
func checkIfFuseNeedUpdate(runtimeInfo base.RuntimeInfoInterface, latestFuseImageVersion string) (needUpdate bool) {
728+
if len(latestFuseImageVersion) == 0 {
729+
return
730+
}
731+
732+
currentImageVersion, err := utils.LoadCurrentFuseGenerationFromMeta(runtimeInfo.GetNamespace(), runtimeInfo.GetName(), runtimeInfo.GetRuntimeType())
733+
glog.Infof("NodeUnstage GetFuseImageVersionFromMetadata %v, %v", currentImageVersion, err)
734+
if err != nil {
735+
glog.Warningf("NodeUnstage GetFuseImageVersionFromMetadata failed %v, skip to update fuse pod", err)
736+
return
737+
}
738+
if len(currentImageVersion) == 0 {
739+
return
740+
}
741+
742+
glog.Infof("NodeUnstage checkIfFuseNeedUpdate currentImageVersion: %v, latestFuseImageVersion: %v", currentImageVersion, latestFuseImageVersion)
743+
if currentImageVersion != latestFuseImageVersion {
744+
needUpdate = true
745+
return
746+
}
747+
return
748+
}

0 commit comments

Comments
 (0)