Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion api/v1alpha1/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,12 @@ const (
// NoneCleanPolicy is the default clean policy. It will be transformed to OnRuntimeDeletedCleanPolicy automatically.
NoneCleanPolicy FuseCleanPolicy = ""

// OnDemandCleanPolicy cleans fuse pod once th fuse pod on some node is not needed
// OnDemandCleanPolicy cleans fuse pod once the fuse pod on some node is not needed
OnDemandCleanPolicy FuseCleanPolicy = "OnDemand"

// OnRuntimeDeletedCleanPolicy cleans fuse pod only when the cache runtime is deleted
OnRuntimeDeletedCleanPolicy FuseCleanPolicy = "OnRuntimeDeleted"

// OnFuseChangedCleanPolicy cleans fuse pod when the fuse in runtime is updated and the fuse pod on some node is not needed
OnFuseChangedCleanPolicy FuseCleanPolicy = "OnFuseChanged"
)
7 changes: 4 additions & 3 deletions api/v1alpha1/juicefsruntime_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,11 @@ type JuiceFSFuseSpec struct {
VolumeMounts []corev1.VolumeMount `json:"volumeMounts,omitempty"`

// CleanPolicy decides when to clean Juicefs Fuse pods.
// Currently Fluid supports two policies: OnDemand and OnRuntimeDeleted
// OnDemand cleans fuse pod once th fuse pod on some node is not needed
// Currently Fluid supports three policies: OnDemand, OnRuntimeDeleted and OnFuseChangedCleanPolicy
// OnDemand cleans fuse pod once the fuse pod on some node is not needed
// OnRuntimeDeleted cleans fuse pod only when the cache runtime is deleted
// Defaults to OnDemand
// OnFuseChangedCleanPolicy cleans fuse pod once the fuse pod on some node is not needed and the fuse in runtime is updated
// Defaults to OnRuntimeDeleted
// +optional
CleanPolicy FuseCleanPolicy `json:"cleanPolicy,omitempty"`

Expand Down
2 changes: 1 addition & 1 deletion api/v1alpha1/openapi_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions charts/fluid/fluid/crds/data.fluid.io_juicefsruntimes.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,11 @@ spec:
cleanPolicy:
description: |-
CleanPolicy decides when to clean Juicefs Fuse pods.
Currently Fluid supports two policies: OnDemand and OnRuntimeDeleted
OnDemand cleans fuse pod once th fuse pod on some node is not needed
Currently Fluid supports three policies: OnDemand, OnRuntimeDeleted and OnFuseChangedCleanPolicy
OnDemand cleans fuse pod once the fuse pod on some node is not needed
OnRuntimeDeleted cleans fuse pod only when the cache runtime is deleted
Defaults to OnDemand
OnFuseChangedCleanPolicy cleans fuse pod once the fuse pod on some node is not needed and the fuse in runtime is updated
Defaults to OnRuntimeDeleted
type: string
env:
description: Environment variables that will be used by JuiceFS
Expand Down
1 change: 1 addition & 0 deletions charts/fluid/fluid/templates/role/juicefs/rbac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ rules:
- watch
- create
- update
- patch
- delete
- apiGroups:
- ""
Expand Down
1 change: 1 addition & 0 deletions charts/juicefs/templates/fuse/daemonset.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ spec:
{{- end }}
{{- end }}
labels:
fuse.runtime.fluid.io/generation: "1"
sidecar.istio.io/inject: "false"
app: {{ template "juicefs.name" . }}
chart: {{ template "juicefs.chart" . }}
Expand Down
7 changes: 4 additions & 3 deletions config/crd/bases/data.fluid.io_juicefsruntimes.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,11 @@ spec:
cleanPolicy:
description: |-
CleanPolicy decides when to clean Juicefs Fuse pods.
Currently Fluid supports two policies: OnDemand and OnRuntimeDeleted
OnDemand cleans fuse pod once th fuse pod on some node is not needed
Currently Fluid supports three policies: OnDemand, OnRuntimeDeleted and OnFuseChangedCleanPolicy
OnDemand cleans fuse pod once the fuse pod on some node is not needed
OnRuntimeDeleted cleans fuse pod only when the cache runtime is deleted
Defaults to OnDemand
OnFuseChangedCleanPolicy cleans fuse pod once the fuse pod on some node is not needed and the fuse in runtime is updated
Defaults to OnRuntimeDeleted
type: string
env:
description: Environment variables that will be used by JuiceFS
Expand Down
6 changes: 3 additions & 3 deletions pkg/common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,11 +156,11 @@ const (
AccelerateCategory Category = "Accelerate"
)

var (
ExpectedFluidAnnotations = map[string]string{
func GetExpectedFluidAnnotations() map[string]string {
return map[string]string{
"CreatedBy": "fluid",
}
)
}

const (
FluidExclusiveKey string = "fluid_exclusive"
Expand Down
3 changes: 3 additions & 0 deletions pkg/common/label.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ const (
// LabelAnnotationMountingDatasets is a label/annotation key indicating which datasets are currently being used by a pod.
// i.e. fluid.io/datasets-in-use
LabelAnnotationDatasetsInUse = LabelAnnotationPrefix + "datasets-in-use"

// i.e. fuse.runtime.fluid.io/generation
LabelRuntimeFuseGeneration = "fuse.runtime." + LabelAnnotationPrefix + "generation"
Comment thread
Syspretor marked this conversation as resolved.
)

const (
Expand Down
180 changes: 115 additions & 65 deletions pkg/csi/plugins/nodeserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,75 +291,15 @@ func (ns *nodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag
}
defer ns.locks.Release(volumeId)

// 1. get runtime namespace and name
// A nil volumeContext is passed because unlike csi.NodeStageVolumeRequest, csi.NodeUnstageVolumeRequest has
// no volume context attribute.
namespace, name, err := ns.getRuntimeNamespacedName(nil, volumeId)
cleanFuseFunc, err := ns.getCleanFuseFunc(volumeId)
if err != nil {
if utils.IgnoreNotFound(err) == nil {
// For cases like the related persistent volume has been deleted, ignore it and return success
glog.Warningf("NodeUnstageVolume: volume %s not found, maybe it's already cleaned up, ignore it", volumeId)
return &csi.NodeUnstageVolumeResponse{}, nil
}
glog.Errorf("NodeUnstageVolume: can't get runtime namespace and name given (volumeContext: nil, volumeId: %s): %v", volumeId, err)
return nil, errors.Wrapf(err, "NodeUnstageVolume: can't get namespace and name by volume id %s", volumeId)
return nil, err
}

// 2. Check fuse clean policy. If clean policy is set to OnRuntimeDeleted, there is no
// need to clean fuse eagerly.
runtimeInfo, err := base.GetRuntimeInfo(ns.client, name, namespace)
if err != nil {
if utils.IgnoreNotFound(err) == nil {
// For cases like the dataset or runtime has been deleted, ignore it and return success
glog.Warningf("NodeUnstageVolume: dataset or runtime %s/%s not found, maybe it's already cleaned up", namespace, name)
return &csi.NodeUnstageVolumeResponse{}, nil
if cleanFuseFunc != nil {
if err := cleanFuseFunc(); err != nil {
return nil, errors.Wrapf(err, "NodeUnstageVolume: failed to clean fuse for volume %s", volumeId)
}
return nil, errors.Wrapf(err, "NodeUnstageVolume: failed to get runtime info for %s/%s", namespace, name)
}

var shouldCleanFuse bool
cleanPolicy := runtimeInfo.GetFuseCleanPolicy()
glog.Infof("NodeUnstageVolume: Using %s clean policy for runtime %s in namespace %s", cleanPolicy, runtimeInfo.GetName(), runtimeInfo.GetNamespace())
switch cleanPolicy {
case v1alpha1.OnDemandCleanPolicy:
shouldCleanFuse = true
case v1alpha1.OnRuntimeDeletedCleanPolicy:
shouldCleanFuse = false
default:
return nil, errors.Errorf("NodeUnstageVolume: unknown Fuse clean policy: %s", cleanPolicy)
}

if !shouldCleanFuse {
return &csi.NodeUnstageVolumeResponse{}, nil
}

// 3. check if the path is mounted
inUse, err := checkMountInUse(volumeId)
if err != nil {
return nil, errors.Wrap(err, "NodeUnstageVolume: can't check mount in use")
}
if inUse {
return nil, fmt.Errorf("NodeUnstageVolume: can't stop fuse cause it's in use")
}

// 4. remove label on node
// Once the label is removed, fuse pod on corresponding node will be terminated
// since node selector in the fuse daemonSet no longer matches.
fuseLabelKey := utils.GetFuseLabelName(namespace, name, runtimeInfo.GetOwnerDatasetUID())
var labelsToModify common.LabelsToModify
labelsToModify.Delete(fuseLabelKey)

node, err := ns.getNode()
if err != nil {
glog.Errorf("NodeUnstageVolume: can't get node %s: %v", ns.nodeId, err)
return nil, errors.Wrapf(err, "NodeUnstageVolume: can't get node %s", ns.nodeId)
}

// _, err = utils.ChangeNodeLabelWithPatchMode(ns.client, node, labelsToModify)
err = ns.patchNodeWithLabel(node, labelsToModify)
if err != nil {
glog.Errorf("NodeUnstageVolume: error when patching labels on node %s: %v", ns.nodeId, err)
return nil, errors.Wrapf(err, "NodeUnstageVolume: error when patching labels on node %s", ns.nodeId)
}

return &csi.NodeUnstageVolumeResponse{}, nil
Expand Down Expand Up @@ -696,3 +636,113 @@ func checkMountPathExists(ctx context.Context, mountPath string) error {
}
return nil
}

func (ns *nodeServer) getCleanFuseFunc(volumeId string) (func() error, error) {
// 1. get runtime namespace and name from pvc
// A nil volumeContext is passed because unlike csi.NodeStageVolumeRequest, csi.NodeUnstageVolumeRequest has
// no volume context attribute.
pvc, err := volume.GetPVCByVolumeId(ns.apiReader, volumeId)
Comment thread
Syspretor marked this conversation as resolved.
if err != nil {
if utils.IgnoreNotFound(err) == nil {
// For cases like the related persistent volume has been deleted, ignore it and return success
glog.Warningf("NodeUnstageVolume: volume %s not found, maybe it's already cleaned up, ignore it", volumeId)
return nil, nil
}
glog.Errorf("NodeUnstageVolume: can't get runtime namespace and name given (volumeContext: nil, volumeId: %s): %v", volumeId, err)
return nil, errors.Wrapf(err, "NodeUnstageVolume: can't get namespace and name by volume id %s", volumeId)
}
namespace, name := pvc.Namespace, pvc.Name

// get latestFuseGeneration from pvc annotations
var latestFuseGeneration string
if pvc.Labels != nil {
latestFuseGeneration = pvc.Labels[common.LabelRuntimeFuseGeneration]
}

// 2. Check fuse clean policy. If clean policy is set to OnRuntimeDeleted, there is no
// need to clean fuse eagerly.
runtimeInfo, err := base.GetRuntimeInfo(ns.client, name, namespace)
if err != nil {
if utils.IgnoreNotFound(err) == nil {
// For cases like the dataset or runtime has been deleted, ignore it and return success
glog.Warningf("NodeUnstageVolume: dataset or runtime %s/%s not found, maybe it's already cleaned up", namespace, name)
return nil, nil
}
return nil, errors.Wrapf(err, "NodeUnstageVolume: failed to get runtime info for %s/%s", namespace, name)
}

var shouldCleanFuse bool
cleanPolicy := runtimeInfo.GetFuseCleanPolicy()
glog.Infof("NodeUnstageVolume: Using %s clean policy for runtime %s in namespace %s", cleanPolicy, runtimeInfo.GetName(), runtimeInfo.GetNamespace())
switch cleanPolicy {
case v1alpha1.OnDemandCleanPolicy:
shouldCleanFuse = true
case v1alpha1.OnRuntimeDeletedCleanPolicy:
shouldCleanFuse = false
case v1alpha1.OnFuseChangedCleanPolicy:
if checkIfFuseNeedUpdate(runtimeInfo, latestFuseGeneration) {
shouldCleanFuse = true
}
default:
return nil, errors.Errorf("NodeUnstageVolume: unknown Fuse clean policy: %s", cleanPolicy)
}

//if getCleanFuseFunc == true, fuse pod will be deleted and recreate by NodeStage
if !shouldCleanFuse {
return nil, nil
}

return func() error {
// check if the path is mounted
inUse, err := checkMountInUse(volumeId)
if err != nil {
return errors.Wrap(err, "NodeUnstageVolume: can't check mount in use")
}
if inUse {
return fmt.Errorf("NodeUnstageVolume: can't stop fuse cause it's in use")
}

// remove label on node
// Once the label is removed, fuse pod on corresponding node will be terminated
// since node selector in the fuse daemonSet no longer matches.
fuseLabelKey := utils.GetFuseLabelName(runtimeInfo.GetNamespace(), runtimeInfo.GetName(), runtimeInfo.GetOwnerDatasetUID())
var labelsToModify common.LabelsToModify
labelsToModify.Delete(fuseLabelKey)

node, err := ns.getNode()
if err != nil {
glog.Errorf("NodeUnstageVolume: can't get node %s: %v", ns.nodeId, err)
return errors.Wrapf(err, "NodeUnstageVolume: can't get node %s", ns.nodeId)
}

if err := ns.patchNodeWithLabel(node, labelsToModify); err != nil {
glog.Errorf("NodeUnstageVolume: error when patching labels on node %s: %v", ns.nodeId, err)
return errors.Wrapf(err, "NodeUnstageVolume: error when patching labels on node %s", ns.nodeId)
}

return nil
}, nil
}

func checkIfFuseNeedUpdate(runtimeInfo base.RuntimeInfoInterface, latestFuseImageVersion string) (needUpdate bool) {
if len(latestFuseImageVersion) == 0 {
return
}

currentImageVersion, err := utils.LoadCurrentFuseGenerationFromMeta(runtimeInfo.GetNamespace(), runtimeInfo.GetName(), runtimeInfo.GetRuntimeType())
glog.Infof("NodeUnstage GetFuseImageVersionFromMetadata %v, %v", currentImageVersion, err)
if err != nil {
glog.Warningf("NodeUnstage GetFuseImageVersionFromMetadata failed %v, skip to update fuse pod", err)
return
}
if len(currentImageVersion) == 0 {
return
}

glog.Infof("NodeUnstage checkIfFuseNeedUpdate currentImageVersion: %v, latestFuseImageVersion: %v", currentImageVersion, latestFuseImageVersion)
if currentImageVersion != latestFuseImageVersion {
needUpdate = true
return
}
return
}
Loading
Loading