Skip to content
11 changes: 5 additions & 6 deletions pkg/application/inject/fuse/injector.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,9 +162,9 @@ func (s *Injector) inject(in runtime.Object, runtimeInfos map[string]base.Runtim
continue
}

platform := s.getServerlessPlatformFromMeta(podSpecs.MetaObj)
if len(platform) == 0 {
return out, fmt.Errorf("can't find any supported platform-specific mutator in pod's metadata")
platform, err := s.getServerlessPlatformFromMeta(podSpecs.MetaObj)
if err != nil {
return out, fmt.Errorf("fail to get serverless platform from pod's meta: %v", err)
}

mutatorBuildArgs := mutator.MutatorBuildArgs{
Expand All @@ -173,7 +173,6 @@ func (s *Injector) inject(in runtime.Object, runtimeInfos map[string]base.Runtim
Specs: podSpecs,
Options: common.FuseSidecarInjectOption{
EnableCacheDir: utils.InjectCacheDirEnabled(podSpecs.MetaObj.Labels),
EnableUnprivilegedSidecar: utils.FuseSidecarUnprivileged(podSpecs.MetaObj.Labels),
SkipSidecarPostStartInject: utils.SkipSidecarPostStartInject(podSpecs.MetaObj.Labels),
},
ExtraArgs: mutator.FindExtraArgsFromMetadata(podSpecs.MetaObj, platform),
Expand Down Expand Up @@ -257,6 +256,6 @@ func (s *Injector) shouldInject(pod common.FluidObject) (should bool, err error)
return should, nil
}

func (s *Injector) getServerlessPlatformFromMeta(metaObj metav1.ObjectMeta) string {
return utils.GetServerlessPlatform(metaObj.Labels)
func (s *Injector) getServerlessPlatformFromMeta(metaObj metav1.ObjectMeta) (string, error) {
return utils.GetServerlessPlatform(metaObj)
}
4 changes: 2 additions & 2 deletions pkg/application/inject/fuse/mutator/mutator.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ func (args MutatorBuildArgs) String() string {
}

var mutatorBuildFn map[string]func(MutatorBuildArgs) Mutator = map[string]func(MutatorBuildArgs) Mutator{
utils.PlatformDefault: NewDefaultMutator,
utils.PlatformUnprivileged: NewUnprivilegedMutator,
utils.ServerlessPlatformDefault: NewDefaultMutator,
utils.ServerlessPlatformUnprivileged: NewUnprivilegedMutator,
}

func BuildMutator(args MutatorBuildArgs, platform string) (Mutator, error) {
Expand Down
9 changes: 5 additions & 4 deletions pkg/common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,10 +197,11 @@ const (
)

const (
EnvServerlessPlatformKey = "KEY_SERVERLESS_PLATFORM"
EnvServerlessPlatformVal = "VALUE_SERVERLESS_PLATFORM"
EnvDisableApplicationController = "KEY_DISABLE_APP_CONTROLLER"
EnvImagePullSecretsKey = "IMAGE_PULL_SECRETS"
// DEPRECATED: env variable for Fluid webhook to determine the serverless platform.
// Use commmon.AnnotationServerlessPlatform instead.
DeprecatedEnvServerlessPlatformKey = "KEY_SERVERLESS_PLATFORM"
EnvDisableApplicationController = "KEY_DISABLE_APP_CONTROLLER"
EnvImagePullSecretsKey = "IMAGE_PULL_SECRETS"
)

const (
Expand Down
6 changes: 6 additions & 0 deletions pkg/common/label.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,12 @@ const (
AnnotationDataFlowCustomizedAffinityPrefix = "affinity.dataflow.fluid.io."
)

const (
// AnnotationServerlessPlatform is an annotation key name for the platform type of serverless.
// i.e. serverless.fluid.io/platform
AnnotationServerlessPlatform = "serverless." + LabelAnnotationPrefix + "platform"
)

var (
// LabelAnnotationPodSchedRegex is the fluid cache label for scheduling pod, format: 'fluid.io/dataset.{dataset name}.sched]'
// use string literal to meet security check.
Expand Down
4 changes: 1 addition & 3 deletions pkg/common/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,14 +169,12 @@ type FuseMountInfo struct {
// FuseSidecarInjectOption are options for webhook to inject fuse sidecar containers
type FuseSidecarInjectOption struct {
EnableCacheDir bool
EnableUnprivilegedSidecar bool
SkipSidecarPostStartInject bool
}

func (f FuseSidecarInjectOption) String() string {
return fmt.Sprintf("EnableCacheDir=%v;EnableUnprivilegedSidecar=%v;SkipSidecarPostStartInject=%v",
return fmt.Sprintf("EnableCacheDir=%v;SkipSidecarPostStartInject=%v",
f.EnableCacheDir,
f.EnableUnprivilegedSidecar,
f.SkipSidecarPostStartInject)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/ctrl/watch/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func ShouldInQueue(pod *corev1.Pod) bool {
}

// ignore if it's not fluid label pod
if !utils.FuseSidecarPrivileged(pod.Labels) {
if !utils.FuseSidecarPrivileged(pod.ObjectMeta) {
log.Info("Privileged fuse sidecar is not enabled.", "labels", pod.Labels)
return false
}
Expand Down
124 changes: 64 additions & 60 deletions pkg/utils/annotations.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,25 @@ limitations under the License.
package utils

import (
"fmt"
stdlog "log"
"os"

"github.com/fluid-cloudnative/fluid/pkg/common"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

var (
ServerlessPlatformKey string = ""
ServerlessPlatformVal string = ""
disableApplicationController string = ""
// DEPRECATED: the label key for Fluid webhook to determine serverless platform.
// It's replaced by commmon.AnnotationServerlessPlatform.
DeprecatedServerlessPlatformKey string = ""
disableApplicationController string = ""
)

func init() {
if envVal, exists := os.LookupEnv(common.EnvServerlessPlatformKey); exists {
ServerlessPlatformKey = envVal
stdlog.Printf("Found %s value %s, using it as ServerlessPlatformLabelKey", common.EnvServerlessPlatformKey, envVal)
}
if envVal, exists := os.LookupEnv(common.EnvServerlessPlatformVal); exists {
ServerlessPlatformVal = envVal
stdlog.Printf("Found %s value %s, using it as ServerlessPlatformLabelValue", common.EnvServerlessPlatformVal, envVal)
if envVal, exists := os.LookupEnv(common.DeprecatedEnvServerlessPlatformKey); exists {
DeprecatedServerlessPlatformKey = envVal
stdlog.Printf("Found %s value %s, using it as ServerlessPlatformLabelKey", common.DeprecatedEnvServerlessPlatformKey, envVal)
}
if envVal, exists := os.LookupEnv(common.EnvDisableApplicationController); exists {
disableApplicationController = envVal
Expand All @@ -62,7 +61,7 @@ func InjectCacheDirEnabled(infos map[string]string) (match bool) {
}

func SkipSidecarPostStartInject(infos map[string]string) (match bool) {
return matchedValue(infos, common.InjectSidecarPostStart, common.False)
return KeyValueMatched(infos, common.InjectSidecarPostStart, common.False)
}

func AppContainerPostStartInjectEnabled(infos map[string]string) (match bool) {
Expand All @@ -71,62 +70,89 @@ func AppContainerPostStartInjectEnabled(infos map[string]string) (match bool) {

// ---- Utils functions to decide serverless platform ----
const (
PlatformDefault = "Default"
PlatformUnprivileged = "Unprivileged"
ServerlessPlatformDefault = "default"
ServerlessPlatformUnprivileged = "unprivileged"
)

func GetServerlessPlatform(infos map[string]string) (platform string) {
if matchedKey(infos, ServerlessPlatformKey) {
return infos[ServerlessPlatformKey]
func GetServerlessPlatform(metaObj metav1.ObjectMeta) (platform string, err error) {
metaLabels := metaObj.Labels
metaAnnotations := metaObj.Annotations

// Setting both DeprecatedServerlessPlatformKey and common.InjectServerless is not allowed
if KeyMatched(metaLabels, DeprecatedServerlessPlatformKey) && enabled(metaLabels, common.InjectServerless) {
err = fmt.Errorf("\"%s\" and \"%s\" is not allowed to set together, remove \"%s\" and retry", DeprecatedServerlessPlatformKey, common.InjectServerless, DeprecatedServerlessPlatformKey)
return
}

// handle deprecated serverless platform key.
if KeyMatched(metaLabels, DeprecatedServerlessPlatformKey) {
platform = metaLabels[DeprecatedServerlessPlatformKey]
return
}

if enabled(infos, common.InjectServerless) || enabled(infos, common.InjectFuseSidecar) {
if enabled(infos, common.InjectUnprivilegedFuseSidecar) {
return PlatformUnprivileged
// handle deprecated common.InjectFuseSidecar. In this case,
// only two platforms are supported: PlatformDefault and PlatformUnprivileged
if enabled(metaLabels, common.InjectFuseSidecar) {
if enabled(metaLabels, common.InjectUnprivilegedFuseSidecar) {
platform = ServerlessPlatformUnprivileged
} else {
return PlatformDefault
platform = ServerlessPlatformDefault
}
return
}

if enabled(metaLabels, common.InjectServerless) {
if enabled(metaLabels, common.InjectUnprivilegedFuseSidecar) {
platform = ServerlessPlatformUnprivileged
return
}

// Setting common.InjectServerless in labels and common.AnnotationServerlessPlatform in annotations
// together to indicate the serverless platform
if KeyMatched(metaAnnotations, common.AnnotationServerlessPlatform) {
platform = metaAnnotations[common.AnnotationServerlessPlatform]
return
}

platform = ServerlessPlatformDefault
return
}

// default to an empty platform, meaning no platform is found
return ""
return "", fmt.Errorf("no serverless platform can be found from Pod's metadata")
}

// ServerlessEnabled decides if fuse sidecar should be injected, whether privileged or unprivileged
// - serverlessPlatform implies injecting unprivileged fuse sidecar
// - serverless.fluid.io/inject=true implies injecting (privileged/unprivileged) fuse sidecar,
// We don't have to know which serverless platform it is using here.
// - serverless.fluid.io/inject=true implies injecting fuse sidecar.
// - [deprecated] serverlessPlatform implies injecting fuse sidecar according to the deprecated env variable. It's deprecated by common.AnnotationServerlessPlatform.
// - [deprecated] fuse.sidecar.fluid.io/inject=true is the deprecated version of serverless.fluid.io/inject=true
func ServerlessEnabled(infos map[string]string) (match bool) {
return serverlessPlatformMatched(infos) || enabled(infos, common.InjectServerless) || enabled(infos, common.InjectFuseSidecar)
}

// FuseSidecarUnprivileged decides if the injected fuse sidecar should be unprivileged, only used when fuse sidecar should be injected
// - serverlessPlatform implies injecting unprivileged fuse sidecar
// - serverless.fluid.io/inject=true + unprivileged.sidecar.fluid.io/inject=true implies injecting unprivileged fuse sidecar,
func FuseSidecarUnprivileged(infos map[string]string) (match bool) {
return serverlessPlatformMatched(infos) || (ServerlessEnabled(infos) && enabled(infos, common.InjectUnprivilegedFuseSidecar))
return enabled(infos, common.InjectServerless) || serverlessPlatformMatched(infos) || enabled(infos, common.InjectFuseSidecar)
}

// FuseSidecarPrivileged decides if the injected fuse sidecar should be privileged, only used when fuse sidecar should be injected
// - sidecar is privileged only when setting serverless.fluid.io/inject=true without unprivileged.sidecar.fluid.io/inject=true
func FuseSidecarPrivileged(infos map[string]string) (match bool) {
return enabled(infos, common.InjectServerless) && !(enabled(infos, common.InjectUnprivilegedFuseSidecar))
// TODO: The func is used for Fluid App controller to determine if it's a pod should be watched. It could be better to use another way(e.g. a special label)to indicate this.
func FuseSidecarPrivileged(metaObj metav1.ObjectMeta) (match bool) {
// error can be ignored here because platform equals to "" when error is not nil
platform, _ := GetServerlessPlatform(metaObj)
return platform == ServerlessPlatformDefault
}

func InjectSidecarDone(infos map[string]string) (match bool) {
return enabled(infos, common.InjectSidecarDone)
}

func AppControllerDisabled(info map[string]string) (match bool) {
return matchedKey(info, disableApplicationController)
return KeyMatched(info, disableApplicationController)
}

func serverlessPlatformMatched(infos map[string]string) (match bool) {
if len(ServerlessPlatformKey) == 0 {
if len(DeprecatedServerlessPlatformKey) == 0 {
return
}

return matchedKey(infos, ServerlessPlatformKey)
return KeyMatched(infos, DeprecatedServerlessPlatformKey)
}

func SkipPrecheckEnable(infos map[string]string) (match bool) {
Expand All @@ -135,27 +161,5 @@ func SkipPrecheckEnable(infos map[string]string) (match bool) {

// enabled checks if the given name has a value of "true"
func enabled(infos map[string]string, name string) (match bool) {
return matchedValue(infos, name, common.True)
}

// matchedValue checks if the given name has the expected value
func matchedValue(infos map[string]string, name string, val string) (match bool) {
for key, value := range infos {
if key == name && value == val {
match = true
break
}
}
return
}

// matchedKey checks if the given name exists
func matchedKey(infos map[string]string, name string) (match bool) {
for key := range infos {
if key == name {
match = true
break
}
}
return
return KeyValueMatched(infos, name, common.True)
}
Loading
Loading