Skip to content

Commit f10d123

Browse files
author
jiuyu
committed
Feat: support Runtime Fuse UpdateStrategy - OnIdle
Signed-off-by: 玖宇 <guotongyu.gty@alibaba-inc.com>
1 parent 13ec658 commit f10d123

45 files changed

Lines changed: 1250 additions & 328 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

api/v1alpha1/constant.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,3 +53,16 @@ const (
5353
// OnRuntimeDeletedCleanPolicy cleans fuse pod only when the cache runtime is deleted
5454
OnRuntimeDeletedCleanPolicy FuseCleanPolicy = "OnRuntimeDeleted"
5555
)
56+
57+
type FuseUpdateStrategy string
58+
59+
const (
60+
// NoneFuseUpdateStrategy is the default update strategy. It will be transformed to OnDeleteFuseUpdateStrategy automatically.
61+
NoneFuseUpdateStrategy FuseUpdateStrategy = ""
62+
63+
// OnDeleteFuseUpdateStrategy updates the fuse pod once the fuse pod on some node is deleted
64+
OnDeleteFuseUpdateStrategy FuseUpdateStrategy = "OnDelete"
65+
66+
// OnIdleFuseUpdateStrategy updates the fuse pod once the fuse pod on some node is idle
67+
OnIdleFuseUpdateStrategy FuseUpdateStrategy = "OnIdle"
68+
)

api/v1alpha1/juicefsruntime_types.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,14 @@ type JuiceFSFuseSpec struct {
164164
// +optional
165165
CleanPolicy FuseCleanPolicy `json:"cleanPolicy,omitempty"`
166166

167+
// UpdateStrategy decides when to update Fuse pods.
168+
// Currently Fluid supports two UpdateStrategy: OnDelete and OnIdle
169+
// OnDelete update fuse pod by native daemonset once the fuse pod on some node is deleted
170+
// OnIdle update fuse pod once the fuse pod on some node is in idle
171+
// Defaults to OnDelete
172+
// +optional
173+
UpdateStrategy FuseUpdateStrategy `json:"updateStrategy,omitempty"`
174+
167175
// PodMetadata defines labels and annotations that will be propagated to JuiceFs's pods.
168176
// +optional
169177
PodMetadata PodMetadata `json:"podMetadata,omitempty"`

api/v1alpha1/openapi_generated.go

Lines changed: 7 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

charts/fluid/fluid/crds/data.fluid.io_juicefsruntimes.yaml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,14 @@ spec:
328328
More info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/
329329
type: object
330330
type: object
331+
updateStrategy:
332+
description: |-
333+
UpdateStrategy decides when to update Fuse pods.
334+
Currently Fluid supports two UpdateStrategy: OnDelete and OnIdle
335+
OnDelete update fuse pod by native daemonset once the fuse pod on some node is deleted
336+
OnIdle update fuse pod once the fuse pod on some node is in idle
337+
Defaults to OnDelete
338+
type: string
331339
volumeMounts:
332340
description: VolumeMounts specifies the volumes listed in ".spec.volumes"
333341
to mount into runtime component's filesystem.

charts/fluid/fluid/templates/role/juicefs/rbac.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ rules:
3333
- watch
3434
- create
3535
- update
36+
- patch
3637
- delete
3738
- apiGroups:
3839
- ""

charts/juicefs/templates/fuse/daemonset.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ spec:
3939
{{- end }}
4040
{{- end }}
4141
labels:
42+
fuse.runtime.fluid.io/generation: "1"
4243
sidecar.istio.io/inject: "false"
4344
app: {{ template "juicefs.name" . }}
4445
chart: {{ template "juicefs.chart" . }}

config/crd/bases/data.fluid.io_juicefsruntimes.yaml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,14 @@ spec:
328328
More info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/
329329
type: object
330330
type: object
331+
updateStrategy:
332+
description: |-
333+
UpdateStrategy decides when to update Fuse pods.
334+
Currently Fluid supports two UpdateStrategy: OnDelete and OnIdle
335+
OnDelete update fuse pod by native daemonset once the fuse pod on some node is deleted
336+
OnIdle update fuse pod once the fuse pod on some node is in idle
337+
Defaults to OnDelete
338+
type: string
331339
volumeMounts:
332340
description: VolumeMounts specifies the volumes listed in ".spec.volumes"
333341
to mount into runtime component's filesystem.

pkg/common/constants.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -156,11 +156,11 @@ const (
156156
AccelerateCategory Category = "Accelerate"
157157
)
158158

159-
var (
160-
ExpectedFluidAnnotations = map[string]string{
159+
func GetExpectedFluidAnnotations() map[string]string {
160+
return map[string]string{
161161
"CreatedBy": "fluid",
162162
}
163-
)
163+
}
164164

165165
const (
166166
FluidExclusiveKey string = "fluid_exclusive"

pkg/common/label.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,9 @@ const (
8080
// LabelAnnotationDatasetsInUse is a label/annotation key indicating which datasets are currently being used by a pod.
8181
// i.e. fluid.io/datasets-in-use
8282
LabelAnnotationDatasetsInUse = LabelAnnotationPrefix + "datasets-in-use"
83+
84+
// i.e. fuse.runtime.fluid.io/generation
85+
LabelRuntimeFuseGeneration = "fuse.runtime." + LabelAnnotationPrefix + "generation"
8386
)
8487

8588
const (

pkg/csi/plugins/nodeserver.go

Lines changed: 119 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -291,75 +291,15 @@ func (ns *nodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag
291291
}
292292
defer ns.locks.Release(volumeId)
293293

294-
// 1. get runtime namespace and name
295-
// A nil volumeContext is passed because unlike csi.NodeStageVolumeRequest, csi.NodeUnstageVolumeRequest has
296-
// no volume context attribute.
297-
namespace, name, err := ns.getRuntimeNamespacedName(nil, volumeId)
294+
cleanFuseFunc, err := ns.getCleanFuseFunc(volumeId)
298295
if err != nil {
299-
if utils.IgnoreNotFound(err) == nil {
300-
// For cases like the related persistent volume has been deleted, ignore it and return success
301-
glog.Warningf("NodeUnstageVolume: volume %s not found, maybe it's already cleaned up, ignore it", volumeId)
302-
return &csi.NodeUnstageVolumeResponse{}, nil
303-
}
304-
glog.Errorf("NodeUnstageVolume: can't get runtime namespace and name given (volumeContext: nil, volumeId: %s): %v", volumeId, err)
305-
return nil, errors.Wrapf(err, "NodeUnstageVolume: can't get namespace and name by volume id %s", volumeId)
296+
return nil, err
306297
}
307298

308-
// 2. Check fuse clean policy. If clean policy is set to OnRuntimeDeleted, there is no
309-
// need to clean fuse eagerly.
310-
runtimeInfo, err := base.GetRuntimeInfo(ns.client, name, namespace)
311-
if err != nil {
312-
if utils.IgnoreNotFound(err) == nil {
313-
// For cases like the dataset or runtime has been deleted, ignore it and return success
314-
glog.Warningf("NodeUnstageVolume: dataset or runtime %s/%s not found, maybe it's already cleaned up", namespace, name)
315-
return &csi.NodeUnstageVolumeResponse{}, nil
299+
if cleanFuseFunc != nil {
300+
if err := cleanFuseFunc(); err != nil {
301+
return nil, errors.Wrapf(err, "NodeUnstageVolume: failed to clean fuse for volume %s", volumeId)
316302
}
317-
return nil, errors.Wrapf(err, "NodeUnstageVolume: failed to get runtime info for %s/%s", namespace, name)
318-
}
319-
320-
var shouldCleanFuse bool
321-
cleanPolicy := runtimeInfo.GetFuseCleanPolicy()
322-
glog.Infof("NodeUnstageVolume: Using %s clean policy for runtime %s in namespace %s", cleanPolicy, runtimeInfo.GetName(), runtimeInfo.GetNamespace())
323-
switch cleanPolicy {
324-
case v1alpha1.OnDemandCleanPolicy:
325-
shouldCleanFuse = true
326-
case v1alpha1.OnRuntimeDeletedCleanPolicy:
327-
shouldCleanFuse = false
328-
default:
329-
return nil, errors.Errorf("NodeUnstageVolume: unknown Fuse clean policy: %s", cleanPolicy)
330-
}
331-
332-
if !shouldCleanFuse {
333-
return &csi.NodeUnstageVolumeResponse{}, nil
334-
}
335-
336-
// 3. check if the path is mounted
337-
inUse, err := checkMountInUse(volumeId)
338-
if err != nil {
339-
return nil, errors.Wrap(err, "NodeUnstageVolume: can't check mount in use")
340-
}
341-
if inUse {
342-
return nil, fmt.Errorf("NodeUnstageVolume: can't stop fuse cause it's in use")
343-
}
344-
345-
// 4. remove label on node
346-
// Once the label is removed, fuse pod on corresponding node will be terminated
347-
// since node selector in the fuse daemonSet no longer matches.
348-
fuseLabelKey := utils.GetFuseLabelName(namespace, name, runtimeInfo.GetOwnerDatasetUID())
349-
var labelsToModify common.LabelsToModify
350-
labelsToModify.Delete(fuseLabelKey)
351-
352-
node, err := ns.getNode()
353-
if err != nil {
354-
glog.Errorf("NodeUnstageVolume: can't get node %s: %v", ns.nodeId, err)
355-
return nil, errors.Wrapf(err, "NodeUnstageVolume: can't get node %s", ns.nodeId)
356-
}
357-
358-
// _, err = utils.ChangeNodeLabelWithPatchMode(ns.client, node, labelsToModify)
359-
err = ns.patchNodeWithLabel(node, labelsToModify)
360-
if err != nil {
361-
glog.Errorf("NodeUnstageVolume: error when patching labels on node %s: %v", ns.nodeId, err)
362-
return nil, errors.Wrapf(err, "NodeUnstageVolume: error when patching labels on node %s", ns.nodeId)
363303
}
364304

365305
return &csi.NodeUnstageVolumeResponse{}, nil
@@ -696,3 +636,117 @@ func checkMountPathExists(ctx context.Context, mountPath string) error {
696636
}
697637
return nil
698638
}
639+
640+
func (ns *nodeServer) getCleanFuseFunc(volumeId string) (func() error, error) {
641+
// 1. get runtime namespace and name from pvc
642+
// A nil volumeContext is passed because unlike csi.NodeStageVolumeRequest, csi.NodeUnstageVolumeRequest has
643+
// no volume context attribute.
644+
pvc, err := volume.GetPVCByVolumeId(ns.apiReader, volumeId)
645+
if err != nil {
646+
if utils.IgnoreNotFound(err) == nil {
647+
// For cases like the related persistent volume has been deleted, ignore it and return success
648+
glog.Warningf("NodeUnstageVolume: volume %s not found, maybe it's already cleaned up, ignore it", volumeId)
649+
return nil, nil
650+
}
651+
glog.Errorf("NodeUnstageVolume: can't get runtime namespace and name given (volumeContext: nil, volumeId: %s): %v", volumeId, err)
652+
return nil, errors.Wrapf(err, "NodeUnstageVolume: can't get namespace and name by volume id %s", volumeId)
653+
}
654+
namespace, name := pvc.Namespace, pvc.Name
655+
656+
// get latestFuseGeneration from pvc annotations
657+
var latestFuseGeneration string
658+
if pvc.Annotations != nil {
659+
latestFuseGeneration = pvc.Labels[common.LabelRuntimeFuseGeneration]
660+
}
661+
662+
// 2. Check fuse clean policy. If clean policy is set to OnRuntimeDeleted, there is no
663+
// need to clean fuse eagerly.
664+
runtimeInfo, err := base.GetRuntimeInfo(ns.client, name, namespace)
665+
if err != nil {
666+
if utils.IgnoreNotFound(err) == nil {
667+
// For cases like the dataset or runtime has been deleted, ignore it and return success
668+
glog.Warningf("NodeUnstageVolume: dataset or runtime %s/%s not found, maybe it's already cleaned up", namespace, name)
669+
return nil, nil
670+
}
671+
return nil, errors.Wrapf(err, "NodeUnstageVolume: failed to get runtime info for %s/%s", namespace, name)
672+
}
673+
674+
var shouldCleanFuse bool
675+
cleanPolicy := runtimeInfo.GetFuseCleanPolicy()
676+
glog.Infof("NodeUnstageVolume: Using %s clean policy for runtime %s in namespace %s", cleanPolicy, runtimeInfo.GetName(), runtimeInfo.GetNamespace())
677+
switch cleanPolicy {
678+
case v1alpha1.OnDemandCleanPolicy:
679+
shouldCleanFuse = true
680+
case v1alpha1.OnRuntimeDeletedCleanPolicy:
681+
shouldCleanFuse = false
682+
default:
683+
return nil, errors.Errorf("NodeUnstageVolume: unknown Fuse clean policy: %s", cleanPolicy)
684+
}
685+
686+
//if getCleanFuseFunc == true, fuse pod will be deleted and recreate by NodeStage
687+
updateStrategy := runtimeInfo.GetFuseUpdateStrategy()
688+
glog.Infof("NodeUnstage GetFuseUpdateStrategy %v", updateStrategy)
689+
if updateStrategy == v1alpha1.OnIdleFuseUpdateStrategy {
690+
if checkIfFuseNeedUpdate(runtimeInfo, latestFuseGeneration) {
691+
shouldCleanFuse = true
692+
}
693+
}
694+
695+
if !shouldCleanFuse {
696+
return nil, nil
697+
}
698+
699+
return func() error {
700+
// check if the path is mounted
701+
inUse, err := checkMountInUse(volumeId)
702+
if err != nil {
703+
return errors.Wrap(err, "NodeUnstageVolume: can't check mount in use")
704+
}
705+
if inUse {
706+
return fmt.Errorf("NodeUnstageVolume: can't stop fuse cause it's in use")
707+
}
708+
709+
// remove label on node
710+
// Once the label is removed, fuse pod on corresponding node will be terminated
711+
// since node selector in the fuse daemonSet no longer matches.
712+
fuseLabelKey := utils.GetFuseLabelName(runtimeInfo.GetNamespace(), runtimeInfo.GetName(), runtimeInfo.GetOwnerDatasetUID())
713+
var labelsToModify common.LabelsToModify
714+
labelsToModify.Delete(fuseLabelKey)
715+
716+
node, err := ns.getNode()
717+
if err != nil {
718+
glog.Errorf("NodeUnstageVolume: can't get node %s: %v", ns.nodeId, err)
719+
return errors.Wrapf(err, "NodeUnstageVolume: can't get node %s", ns.nodeId)
720+
}
721+
722+
if err := ns.patchNodeWithLabel(node, labelsToModify); err != nil {
723+
glog.Errorf("NodeUnstageVolume: error when patching labels on node %s: %v", ns.nodeId, err)
724+
return errors.Wrapf(err, "NodeUnstageVolume: error when patching labels on node %s", ns.nodeId)
725+
}
726+
727+
return nil
728+
}, nil
729+
}
730+
731+
func checkIfFuseNeedUpdate(runtimeInfo base.RuntimeInfoInterface, latestFuseImageVersion string) (needUpdate bool) {
732+
if len(latestFuseImageVersion) == 0 {
733+
return
734+
}
735+
736+
currentImageVersion, err := utils.LoadCurrentFuseGenerationFromMeta(runtimeInfo.GetNamespace(), runtimeInfo.GetName(), runtimeInfo.GetRuntimeType())
737+
glog.Infof("NodeUnstage GetFuseImageVersionFromMetadata %v, %v", currentImageVersion, err)
738+
if err != nil {
739+
glog.Warningf("NodeUnstage GetFuseImageVersionFromMetadata failed %v, skip to update fuse pod", err)
740+
return
741+
}
742+
if len(currentImageVersion) == 0 {
743+
return
744+
}
745+
746+
glog.Infof("NodeUnstage checkIfFuseNeedUpdate currentImageVersion: %v, latestFuseImageVersion: %v", currentImageVersion, latestFuseImageVersion)
747+
if currentImageVersion != latestFuseImageVersion {
748+
needUpdate = true
749+
return
750+
}
751+
return
752+
}

0 commit comments

Comments
 (0)