diff --git a/api/v1alpha1/constant.go b/api/v1alpha1/constant.go
index 824870bfeaf..a0f28b7a64b 100644
--- a/api/v1alpha1/constant.go
+++ b/api/v1alpha1/constant.go
@@ -56,3 +56,19 @@ const (
 	// OnFuseChangedCleanPolicy cleans fuse pod when the fuse in runtime is updated and the fuse pod on some node is not needed
 	OnFuseChangedCleanPolicy FuseCleanPolicy = "OnFuseChanged"
 )
+
+// UpdateStrategy names the policy applied when the worker pods of a runtime are updated.
+type UpdateStrategy string
+
+const (
+	// ReCreate is the default update strategy.
+	ReCreate UpdateStrategy = "ReCreate"
+
+	// InPlace opts the worker pods into in-place updates; they are marked so that
+	// the worker mutating webhook selects them.
+	InPlace UpdateStrategy = "InPlace"
+
+	// InPlaceIfPossible also opts the worker pods into in-place updates.
+	// NOTE(review): how this differs from InPlace is not visible in this change — confirm and document.
+	InPlaceIfPossible UpdateStrategy = "InPlaceIfPossible"
+)
diff --git a/api/v1alpha1/juicefsruntime_types.go b/api/v1alpha1/juicefsruntime_types.go
index c0300dc30e1..688aede73e7 100644
--- a/api/v1alpha1/juicefsruntime_types.go
+++ b/api/v1alpha1/juicefsruntime_types.go
@@ -80,6 +80,12 @@ type JuiceFSRuntimeSpec struct {
 	// RuntimeManagement defines policies when managing the runtime
 	// +optional
 	RuntimeManagement RuntimeManagement `json:"management,omitempty"`
+
+	// UpdateStrategy defines the update policy for worker pod.
+ // +kubebuilder:validation:Enum=ReCreate;InPlace;InPlaceIfPossible + // +kubebuilder:default=ReCreate + // +optional + UpdateStrategy UpdateStrategy `json:"updateStrategy,omitempty"` } // JuiceFSCompTemplateSpec is a description of the JuiceFS components diff --git a/api/v1alpha1/openapi_generated.go b/api/v1alpha1/openapi_generated.go index 388655a6937..83186935b42 100644 --- a/api/v1alpha1/openapi_generated.go +++ b/api/v1alpha1/openapi_generated.go @@ -97,7 +97,12 @@ func GetOpenAPIDefinitions(ref common.ReferenceCallback) map[string]common.OpenA "github.com/fluid-cloudnative/fluid/api/v1alpha1.ObjectRef": schema_fluid_cloudnative_fluid_api_v1alpha1_ObjectRef(ref), "github.com/fluid-cloudnative/fluid/api/v1alpha1.OperationRef": schema_fluid_cloudnative_fluid_api_v1alpha1_OperationRef(ref), "github.com/fluid-cloudnative/fluid/api/v1alpha1.OperationStatus": schema_fluid_cloudnative_fluid_api_v1alpha1_OperationStatus(ref), + "github.com/fluid-cloudnative/fluid/api/v1alpha1.PersistentPodState": schema_fluid_cloudnative_fluid_api_v1alpha1_PersistentPodState(ref), + "github.com/fluid-cloudnative/fluid/api/v1alpha1.PersistentPodStateList": schema_fluid_cloudnative_fluid_api_v1alpha1_PersistentPodStateList(ref), + "github.com/fluid-cloudnative/fluid/api/v1alpha1.PersistentPodStateSpec": schema_fluid_cloudnative_fluid_api_v1alpha1_PersistentPodStateSpec(ref), + "github.com/fluid-cloudnative/fluid/api/v1alpha1.PersistentPodStateStatus": schema_fluid_cloudnative_fluid_api_v1alpha1_PersistentPodStateStatus(ref), "github.com/fluid-cloudnative/fluid/api/v1alpha1.PodMetadata": schema_fluid_cloudnative_fluid_api_v1alpha1_PodMetadata(ref), + "github.com/fluid-cloudnative/fluid/api/v1alpha1.PodState": schema_fluid_cloudnative_fluid_api_v1alpha1_PodState(ref), "github.com/fluid-cloudnative/fluid/api/v1alpha1.Prefer": schema_fluid_cloudnative_fluid_api_v1alpha1_Prefer(ref), "github.com/fluid-cloudnative/fluid/api/v1alpha1.Processor": 
schema_fluid_cloudnative_fluid_api_v1alpha1_Processor(ref), "github.com/fluid-cloudnative/fluid/api/v1alpha1.Require": schema_fluid_cloudnative_fluid_api_v1alpha1_Require(ref), @@ -4540,6 +4545,13 @@ func schema_fluid_cloudnative_fluid_api_v1alpha1_JuiceFSRuntimeSpec(ref common.R Ref: ref("github.com/fluid-cloudnative/fluid/api/v1alpha1.RuntimeManagement"), }, }, + "updateStrategy": { + SchemaProps: spec.SchemaProps{ + Description: "UpdateStrategy defines the update policy for worker pod.", + Type: []string{"string"}, + Format: "", + }, + }, }, }, }, @@ -5134,6 +5146,148 @@ func schema_fluid_cloudnative_fluid_api_v1alpha1_OperationStatus(ref common.Refe } } +func schema_fluid_cloudnative_fluid_api_v1alpha1_PersistentPodState(ref common.ReferenceCallback) common.OpenAPIDefinition { + return common.OpenAPIDefinition{ + Schema: spec.Schema{ + SchemaProps: spec.SchemaProps{ + Description: "PersistentPodState is the Schema for the PersistentPodState API", + Type: []string{"object"}, + Properties: map[string]spec.Schema{ + "kind": { + SchemaProps: spec.SchemaProps{ + Description: "Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds", + Type: []string{"string"}, + Format: "", + }, + }, + "apiVersion": { + SchemaProps: spec.SchemaProps{ + Description: "APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. 
More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources", + Type: []string{"string"}, + Format: "", + }, + }, + "metadata": { + SchemaProps: spec.SchemaProps{ + Default: map[string]interface{}{}, + Ref: ref("k8s.io/apimachinery/pkg/apis/meta/v1.ObjectMeta"), + }, + }, + "spec": { + SchemaProps: spec.SchemaProps{ + Default: map[string]interface{}{}, + Ref: ref("github.com/fluid-cloudnative/fluid/api/v1alpha1.PersistentPodStateSpec"), + }, + }, + "status": { + SchemaProps: spec.SchemaProps{ + Default: map[string]interface{}{}, + Ref: ref("github.com/fluid-cloudnative/fluid/api/v1alpha1.PersistentPodStateStatus"), + }, + }, + }, + }, + }, + Dependencies: []string{ + "github.com/fluid-cloudnative/fluid/api/v1alpha1.PersistentPodStateSpec", "github.com/fluid-cloudnative/fluid/api/v1alpha1.PersistentPodStateStatus", "k8s.io/apimachinery/pkg/apis/meta/v1.ObjectMeta"}, + } +} + +func schema_fluid_cloudnative_fluid_api_v1alpha1_PersistentPodStateList(ref common.ReferenceCallback) common.OpenAPIDefinition { + return common.OpenAPIDefinition{ + Schema: spec.Schema{ + SchemaProps: spec.SchemaProps{ + Description: "PersistentPodStateList contains a list of PersistentPodState", + Type: []string{"object"}, + Properties: map[string]spec.Schema{ + "kind": { + SchemaProps: spec.SchemaProps{ + Description: "Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds", + Type: []string{"string"}, + Format: "", + }, + }, + "apiVersion": { + SchemaProps: spec.SchemaProps{ + Description: "APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. 
More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources", + Type: []string{"string"}, + Format: "", + }, + }, + "metadata": { + SchemaProps: spec.SchemaProps{ + Default: map[string]interface{}{}, + Ref: ref("k8s.io/apimachinery/pkg/apis/meta/v1.ListMeta"), + }, + }, + "items": { + SchemaProps: spec.SchemaProps{ + Type: []string{"array"}, + Items: &spec.SchemaOrArray{ + Schema: &spec.Schema{ + SchemaProps: spec.SchemaProps{ + Default: map[string]interface{}{}, + Ref: ref("github.com/fluid-cloudnative/fluid/api/v1alpha1.PersistentPodState"), + }, + }, + }, + }, + }, + }, + Required: []string{"items"}, + }, + }, + Dependencies: []string{ + "github.com/fluid-cloudnative/fluid/api/v1alpha1.PersistentPodState", "k8s.io/apimachinery/pkg/apis/meta/v1.ListMeta"}, + } +} + +func schema_fluid_cloudnative_fluid_api_v1alpha1_PersistentPodStateSpec(ref common.ReferenceCallback) common.OpenAPIDefinition { + return common.OpenAPIDefinition{ + Schema: spec.Schema{ + SchemaProps: spec.SchemaProps{ + Description: "PersistentPodStateSpec defines the desired state of PersistentPodState", + Type: []string{"object"}, + }, + }, + } +} + +func schema_fluid_cloudnative_fluid_api_v1alpha1_PersistentPodStateStatus(ref common.ReferenceCallback) common.OpenAPIDefinition { + return common.OpenAPIDefinition{ + Schema: spec.Schema{ + SchemaProps: spec.SchemaProps{ + Description: "PersistentPodStateStatus defines the observed state of PersistentPodState", + Type: []string{"object"}, + Properties: map[string]spec.Schema{ + "podStates": { + SchemaProps: spec.SchemaProps{ + Description: "\tPodStates is the pod name mapping to state.", + Type: []string{"object"}, + AdditionalProperties: &spec.SchemaOrBool{ + Allows: true, + Schema: &spec.Schema{ + SchemaProps: spec.SchemaProps{ + Default: map[string]interface{}{}, + Ref: ref("github.com/fluid-cloudnative/fluid/api/v1alpha1.PodState"), + }, + }, + }, + }, + }, + "lastUpdateTime": { + SchemaProps: 
spec.SchemaProps{ + Ref: ref("k8s.io/apimachinery/pkg/apis/meta/v1.Time"), + }, + }, + }, + }, + }, + Dependencies: []string{ + "github.com/fluid-cloudnative/fluid/api/v1alpha1.PodState", "k8s.io/apimachinery/pkg/apis/meta/v1.Time"}, + } +} + func schema_fluid_cloudnative_fluid_api_v1alpha1_PodMetadata(ref common.ReferenceCallback) common.OpenAPIDefinition { return common.OpenAPIDefinition{ Schema: spec.Schema{ @@ -5179,6 +5333,26 @@ func schema_fluid_cloudnative_fluid_api_v1alpha1_PodMetadata(ref common.Referenc } } +func schema_fluid_cloudnative_fluid_api_v1alpha1_PodState(ref common.ReferenceCallback) common.OpenAPIDefinition { + return common.OpenAPIDefinition{ + Schema: spec.Schema{ + SchemaProps: spec.SchemaProps{ + Type: []string{"object"}, + Properties: map[string]spec.Schema{ + "nodeName": { + SchemaProps: spec.SchemaProps{ + Default: "", + Type: []string{"string"}, + Format: "", + }, + }, + }, + Required: []string{"nodeName"}, + }, + }, + } +} + func schema_fluid_cloudnative_fluid_api_v1alpha1_Prefer(ref common.ReferenceCallback) common.OpenAPIDefinition { return common.OpenAPIDefinition{ Schema: spec.Schema{ diff --git a/api/v1alpha1/persistentpodstate_types.go b/api/v1alpha1/persistentpodstate_types.go new file mode 100644 index 00000000000..4105fe8be69 --- /dev/null +++ b/api/v1alpha1/persistentpodstate_types.go @@ -0,0 +1,64 @@ +/* +Copyright 2025 The Fluid Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package v1alpha1 + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// PersistentPodStateSpec defines the desired state of PersistentPodState +type PersistentPodStateSpec struct { +} + +type PodState struct { + NodeName string `json:"nodeName"` +} + +// PersistentPodStateStatus defines the observed state of PersistentPodState +type PersistentPodStateStatus struct { + // INSERT ADDITIONAL STATUS FIELD - define observed state of cluster + // Important: Run "make" to regenerate code after modifying this file + + // PodStates is the pod name mapping to state. + PodStates map[string]PodState `json:"podStates,omitempty"` + LastUpdateTime *metav1.Time `json:"lastUpdateTime,omitempty"` +} + +// PersistentPodState is the Schema for the PersistentPodState API +// +kubebuilder:object:root=true +// +kubebuilder:subresource:status +// +kubebuilder:resource:scope=Namespaced +type PersistentPodState struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty"` + + Spec PersistentPodStateSpec `json:"spec,omitempty"` + Status PersistentPodStateStatus `json:"status,omitempty"` +} + +//+kubebuilder:object:root=true + +// PersistentPodStateList contains a list of PersistentPodState +type PersistentPodStateList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata,omitempty"` + Items []PersistentPodState `json:"items"` +} + +func init() { + SchemeBuilder.Register(&PersistentPodState{}, &PersistentPodStateList{}) +} diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index 289be17d9c1..59f38708174 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -2238,6 +2238,106 @@ func (in *OperationStatus) DeepCopy() *OperationStatus { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
+func (in *PersistentPodState) DeepCopyInto(out *PersistentPodState) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + out.Spec = in.Spec + in.Status.DeepCopyInto(&out.Status) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PersistentPodState. +func (in *PersistentPodState) DeepCopy() *PersistentPodState { + if in == nil { + return nil + } + out := new(PersistentPodState) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *PersistentPodState) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *PersistentPodStateList) DeepCopyInto(out *PersistentPodStateList) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ListMeta.DeepCopyInto(&out.ListMeta) + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]PersistentPodState, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PersistentPodStateList. +func (in *PersistentPodStateList) DeepCopy() *PersistentPodStateList { + if in == nil { + return nil + } + out := new(PersistentPodStateList) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *PersistentPodStateList) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
+func (in *PersistentPodStateSpec) DeepCopyInto(out *PersistentPodStateSpec) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PersistentPodStateSpec. +func (in *PersistentPodStateSpec) DeepCopy() *PersistentPodStateSpec { + if in == nil { + return nil + } + out := new(PersistentPodStateSpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *PersistentPodStateStatus) DeepCopyInto(out *PersistentPodStateStatus) { + *out = *in + if in.PodStates != nil { + in, out := &in.PodStates, &out.PodStates + *out = make(map[string]PodState, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } + if in.LastUpdateTime != nil { + in, out := &in.LastUpdateTime, &out.LastUpdateTime + *out = (*in).DeepCopy() + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PersistentPodStateStatus. +func (in *PersistentPodStateStatus) DeepCopy() *PersistentPodStateStatus { + if in == nil { + return nil + } + out := new(PersistentPodStateStatus) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *PodMetadata) DeepCopyInto(out *PodMetadata) { *out = *in @@ -2267,6 +2367,21 @@ func (in *PodMetadata) DeepCopy() *PodMetadata { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *PodState) DeepCopyInto(out *PodState) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PodState. +func (in *PodState) DeepCopy() *PodState { + if in == nil { + return nil + } + out := new(PodState) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. 
in must be non-nil. func (in *Prefer) DeepCopyInto(out *Prefer) { *out = *in diff --git a/charts/fluid/fluid/crds/data.fluid.io_juicefsruntimes.yaml b/charts/fluid/fluid/crds/data.fluid.io_juicefsruntimes.yaml index 08a3a764d42..5ad1fa2bd10 100644 --- a/charts/fluid/fluid/crds/data.fluid.io_juicefsruntimes.yaml +++ b/charts/fluid/fluid/crds/data.fluid.io_juicefsruntimes.yaml @@ -2882,6 +2882,14 @@ spec: type: object type: array type: object + updateStrategy: + default: ReCreate + description: UpdateStrategy defines the update policy for worker pod. + enum: + - ReCreate + - InPlace + - InPlaceIfPossible + type: string volumes: description: Volumes is the list of Kubernetes volumes that can be mounted by the alluxio runtime components and/or fuses. diff --git a/charts/fluid/fluid/crds/data.fluid.io_persistentpodstates.yaml b/charts/fluid/fluid/crds/data.fluid.io_persistentpodstates.yaml new file mode 100644 index 00000000000..ffc4cfbf0fd --- /dev/null +++ b/charts/fluid/fluid/crds/data.fluid.io_persistentpodstates.yaml @@ -0,0 +1,63 @@ +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + annotations: + controller-gen.kubebuilder.io/version: v0.14.0 + name: persistentpodstates.data.fluid.io +spec: + group: data.fluid.io + names: + kind: PersistentPodState + listKind: PersistentPodStateList + plural: persistentpodstates + singular: persistentpodstate + scope: Namespaced + versions: + - name: v1alpha1 + schema: + openAPIV3Schema: + description: PersistentPodState is the Schema for the PersistentPodState API + properties: + apiVersion: + description: |- + APIVersion defines the versioned schema of this representation of an object. + Servers should convert recognized schemas to the latest internal value, and + may reject unrecognized values. 
+ More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources + type: string + kind: + description: |- + Kind is a string value representing the REST resource this object represents. + Servers may infer this from the endpoint the client submits requests to. + Cannot be updated. + In CamelCase. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds + type: string + metadata: + type: object + spec: + description: PersistentPodStateSpec defines the desired state of PersistentPodState + type: object + status: + description: PersistentPodStateStatus defines the observed state of PersistentPodState + properties: + lastUpdateTime: + format: date-time + type: string + podStates: + additionalProperties: + properties: + nodeName: + type: string + required: + - nodeName + type: object + description: "\tPodStates is the pod name mapping to state." + type: object + type: object + type: object + served: true + storage: true + subresources: + status: {} diff --git a/charts/fluid/fluid/templates/role/alluxio/rbac.yaml b/charts/fluid/fluid/templates/role/alluxio/rbac.yaml index 995eae54fe7..9a2a98d58fe 100644 --- a/charts/fluid/fluid/templates/role/alluxio/rbac.yaml +++ b/charts/fluid/fluid/templates/role/alluxio/rbac.yaml @@ -94,6 +94,8 @@ rules: - alluxiodataloads/status - alluxioruntimes/status - datasets/status + - persistentpodstates + - persistentpodstates/status verbs: - get - list diff --git a/charts/fluid/fluid/templates/role/efc/rbac.yaml b/charts/fluid/fluid/templates/role/efc/rbac.yaml index b763418d551..f64b209bb93 100644 --- a/charts/fluid/fluid/templates/role/efc/rbac.yaml +++ b/charts/fluid/fluid/templates/role/efc/rbac.yaml @@ -93,6 +93,8 @@ rules: - datasets - efcruntimes/status - datasets/status + - persistentpodstates + - persistentpodstates/status verbs: - get - list diff --git a/charts/fluid/fluid/templates/role/goosefs/rbac.yaml 
b/charts/fluid/fluid/templates/role/goosefs/rbac.yaml index 557c88c70dc..f401cd2bbf5 100644 --- a/charts/fluid/fluid/templates/role/goosefs/rbac.yaml +++ b/charts/fluid/fluid/templates/role/goosefs/rbac.yaml @@ -104,6 +104,8 @@ rules: - goosefsdataloads/status - goosefsruntimes/status - datasets/status + - persistentpodstates + - persistentpodstates/status verbs: - get - list diff --git a/charts/fluid/fluid/templates/role/jindo/rbac.yaml b/charts/fluid/fluid/templates/role/jindo/rbac.yaml index 943f1e92e5b..ece8fe61236 100644 --- a/charts/fluid/fluid/templates/role/jindo/rbac.yaml +++ b/charts/fluid/fluid/templates/role/jindo/rbac.yaml @@ -92,6 +92,8 @@ rules: - datasets - jindoruntimes/status - datasets/status + - persistentpodstates + - persistentpodstates/status verbs: - get - list diff --git a/charts/fluid/fluid/templates/role/juicefs/rbac.yaml b/charts/fluid/fluid/templates/role/juicefs/rbac.yaml index 123436ec663..089339fbffb 100644 --- a/charts/fluid/fluid/templates/role/juicefs/rbac.yaml +++ b/charts/fluid/fluid/templates/role/juicefs/rbac.yaml @@ -95,6 +95,8 @@ rules: - datasets - juicefsruntimes/status - datasets/status + - persistentpodstates + - persistentpodstates/status verbs: - get - list diff --git a/charts/fluid/fluid/templates/role/thin/rbac.yaml b/charts/fluid/fluid/templates/role/thin/rbac.yaml index 524dc15f151..1b311d8f8fb 100644 --- a/charts/fluid/fluid/templates/role/thin/rbac.yaml +++ b/charts/fluid/fluid/templates/role/thin/rbac.yaml @@ -102,6 +102,8 @@ rules: - thinruntimes/status - thinruntimeprofiles/status - datasets/status + - persistentpodstates + - persistentpodstates/status verbs: - get - list diff --git a/charts/fluid/fluid/templates/role/vineyard/rbac.yaml b/charts/fluid/fluid/templates/role/vineyard/rbac.yaml index 72a4a20caf0..f53e27fbf16 100644 --- a/charts/fluid/fluid/templates/role/vineyard/rbac.yaml +++ b/charts/fluid/fluid/templates/role/vineyard/rbac.yaml @@ -86,6 +86,8 @@ rules: - datasets - vineyardruntimes/status - 
datasets/status + - persistentpodstates + - persistentpodstates/status verbs: - get - list diff --git a/charts/fluid/fluid/templates/role/webhook/rabc.yaml b/charts/fluid/fluid/templates/role/webhook/rabc.yaml index 9c56c036341..5f6fba8ebb7 100644 --- a/charts/fluid/fluid/templates/role/webhook/rabc.yaml +++ b/charts/fluid/fluid/templates/role/webhook/rabc.yaml @@ -65,6 +65,7 @@ rules: - thinruntimes - efcruntimes - vineyardruntimes + - persistentpodstates verbs: - get - list diff --git a/charts/fluid/fluid/templates/webhook/webhookconfiguration.yaml b/charts/fluid/fluid/templates/webhook/webhookconfiguration.yaml index 99c5e5d23cf..ecbdfb42fae 100644 --- a/charts/fluid/fluid/templates/webhook/webhookconfiguration.yaml +++ b/charts/fluid/fluid/templates/webhook/webhookconfiguration.yaml @@ -46,4 +46,25 @@ webhooks: objectSelector: matchLabels: fuse.serverful.fluid.io/inject: "true" + - name: inplaceupdate.worker.fluid.io + rules: + - apiGroups: [ "" ] + apiVersions: [ "v1" ] + operations: [ "CREATE" ] + resources: [ "pods" ] + clientConfig: + service: + namespace: {{ include "fluid.namespace" . }} + name: fluid-pod-admission-webhook + path: "/mutate-fluid-io-v1alpha1-schedulepod" + port: 9443 + caBundle: Cg== + timeoutSeconds: {{ .Values.webhook.timeoutSeconds }} + failurePolicy: Fail + reinvocationPolicy: {{ .Values.webhook.reinvocationPolicy }} + sideEffects: None + admissionReviewVersions: [ "v1","v1beta1" ] + objectSelector: + matchLabels: + fluid.io/runtime-pod-type: "worker" {{- end }} diff --git a/charts/juicefs/templates/worker/persistent_podstate.yaml b/charts/juicefs/templates/worker/persistent_podstate.yaml new file mode 100644 index 00000000000..0dbd8237a73 --- /dev/null +++ b/charts/juicefs/templates/worker/persistent_podstate.yaml @@ -0,0 +1,20 @@ +apiVersion: data.fluid.io/v1alpha1 +kind: PersistentPodState +metadata: + name: {{ template "juicefs.fullname" . }}-pod-state + labels: + app: {{ template "juicefs.name" . 
}} + chart: {{ template "juicefs.chart" . }} + release: {{ .Release.Name }} + heritage: {{ .Release.Service }} + fluid.io/configmap-type: worker-state + {{- include "library.fluid.labels" . | nindent 4 }} + ownerReferences: + {{- if .Values.owner.enabled }} + - apiVersion: {{ .Values.owner.apiVersion }} + blockOwnerDeletion: {{ .Values.owner.blockOwnerDeletion }} + controller: {{ .Values.owner.controller }} + kind: {{ .Values.owner.kind }} + name: {{ .Values.owner.name }} + uid: {{ .Values.owner.uid }} + {{- end }} \ No newline at end of file diff --git a/config/crd/bases/data.fluid.io_juicefsruntimes.yaml b/config/crd/bases/data.fluid.io_juicefsruntimes.yaml index 08a3a764d42..5ad1fa2bd10 100644 --- a/config/crd/bases/data.fluid.io_juicefsruntimes.yaml +++ b/config/crd/bases/data.fluid.io_juicefsruntimes.yaml @@ -2882,6 +2882,14 @@ spec: type: object type: array type: object + updateStrategy: + default: ReCreate + description: UpdateStrategy defines the update policy for worker pod. + enum: + - ReCreate + - InPlace + - InPlaceIfPossible + type: string volumes: description: Volumes is the list of Kubernetes volumes that can be mounted by the alluxio runtime components and/or fuses. 
diff --git a/config/crd/bases/data.fluid.io_persistentpodstates.yaml b/config/crd/bases/data.fluid.io_persistentpodstates.yaml new file mode 100644 index 00000000000..ffc4cfbf0fd --- /dev/null +++ b/config/crd/bases/data.fluid.io_persistentpodstates.yaml @@ -0,0 +1,63 @@ +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + annotations: + controller-gen.kubebuilder.io/version: v0.14.0 + name: persistentpodstates.data.fluid.io +spec: + group: data.fluid.io + names: + kind: PersistentPodState + listKind: PersistentPodStateList + plural: persistentpodstates + singular: persistentpodstate + scope: Namespaced + versions: + - name: v1alpha1 + schema: + openAPIV3Schema: + description: PersistentPodState is the Schema for the PersistentPodState API + properties: + apiVersion: + description: |- + APIVersion defines the versioned schema of this representation of an object. + Servers should convert recognized schemas to the latest internal value, and + may reject unrecognized values. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources + type: string + kind: + description: |- + Kind is a string value representing the REST resource this object represents. + Servers may infer this from the endpoint the client submits requests to. + Cannot be updated. + In CamelCase. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds + type: string + metadata: + type: object + spec: + description: PersistentPodStateSpec defines the desired state of PersistentPodState + type: object + status: + description: PersistentPodStateStatus defines the observed state of PersistentPodState + properties: + lastUpdateTime: + format: date-time + type: string + podStates: + additionalProperties: + properties: + nodeName: + type: string + required: + - nodeName + type: object + description: "\tPodStates is the pod name mapping to state." 
+                type: object
+            type: object
+        type: object
+    served: true
+    storage: true
+    subresources:
+      status: {}
diff --git a/pkg/common/constants.go b/pkg/common/constants.go
index e715dd5e888..5611e9c04dd 100644
--- a/pkg/common/constants.go
+++ b/pkg/common/constants.go
@@ -197,6 +197,8 @@ const (
 	InjectAppPostStart     = "app.poststart" + inject          // app.poststart.fluid.io/inject
 	InjectSidecarPostStart = "fuse.sidecar.poststart" + inject // fuse.sidecar.poststart.fluid.io/inject
 
+	InjectWorkerPodDone = "done.worker" + inject // done.worker.fluid.io/inject
+
 	injectServerful     = ".serverful" + inject
 	InjectServerfulFuse = "fuse" + injectServerful
 
diff --git a/pkg/common/label.go b/pkg/common/label.go
index 89f3b3f55bf..a544c176308 100644
--- a/pkg/common/label.go
+++ b/pkg/common/label.go
@@ -35,7 +35,7 @@ const (
 	LabelAnnotationDataset = LabelAnnotationPrefix + "dataset"
 
 	// LabelAnnotationDatasetId indicates the uuid of the dataset
-	// i.e. fluid.io/dataset-uuid
+	// i.e. fluid.io/dataset-id
 	LabelAnnotationDatasetId = LabelAnnotationDataset + "-id"
 
 	// LabelAnnotationDatasetNum indicates the number of the dataset in specific node
@@ -125,6 +125,19 @@ var (
 	LabelAnnotationPodSchedRegex = regexp.MustCompile(`^fluid\.io/dataset\.([A-Za-z0-9.-]*)\.sched$`)
 )
 
+const (
+	// RuntimePodType is the label key for runtime pod type,
+	// i.e. fluid.io/runtime-pod-type
+	RuntimePodType = LabelAnnotationPrefix + "runtime-pod-type"
+
+	// RuntimeWorkerPod is the RuntimePodType label value set on worker pods.
+	RuntimeWorkerPod = "worker"
+
+	// AnnotationRuntimeName is the annotation key for the runtime name,
+	// i.e. fluid.io/runtime-name
+	AnnotationRuntimeName = LabelAnnotationPrefix + "runtime-name"
+)
+
 type OperationType string
 
 const (
diff --git a/pkg/ctrl/affinity.go b/pkg/ctrl/affinity.go
index 0d5d72d7452..8be4cd8168e 100644
--- a/pkg/ctrl/affinity.go
+++ b/pkg/ctrl/affinity.go
@@ -81,7 +81,7 @@ func (e *Helper) BuildWorkersAffinity(workers *appsv1.StatefulSet) (workersToUpd
 					LabelSelector: &metav1.LabelSelector{
 						MatchExpressions: []metav1.LabelSelectorRequirement{
 							{
-								Key:      "fluid.io/dataset",
+								Key:      common.LabelAnnotationDataset,
 								Operator:
metav1.LabelSelectorOpExists, }, }, @@ -100,7 +100,7 @@ func (e *Helper) BuildWorkersAffinity(workers *appsv1.StatefulSet) (workersToUpd LabelSelector: &metav1.LabelSelector{ MatchExpressions: []metav1.LabelSelectorRequirement{ { - Key: "fluid.io/dataset", + Key: common.LabelAnnotationDataset, Operator: metav1.LabelSelectorOpExists, }, }, diff --git a/pkg/ddc/base/label.go b/pkg/ddc/base/label.go index c1878926905..5c3b9189ca1 100644 --- a/pkg/ddc/base/label.go +++ b/pkg/ddc/base/label.go @@ -62,3 +62,7 @@ func (info *RuntimeInfo) GetExclusiveLabelValue() string { return utils.GetNamespacedNameValueWithPrefix("", info.namespace, info.name, info.ownerDatasetUID) } + +func (info *RuntimeInfo) GetWorkerPodStateName() string { + return info.name + "-pod-state" +} diff --git a/pkg/ddc/base/runtime.go b/pkg/ddc/base/runtime.go index e9724275c62..7df34eee6f1 100644 --- a/pkg/ddc/base/runtime.go +++ b/pkg/ddc/base/runtime.go @@ -55,6 +55,8 @@ type Conventions interface { GetWorkerStatefulsetName() string GetExclusiveLabelValue() string + + GetWorkerPodStateName() string } // Runtime Information interface defines the interfaces that should be implemented @@ -110,6 +112,10 @@ type RuntimeInfoInterface interface { GetAnnotations() map[string]string GetFuseMetricsScrapeTarget() mountModeSelector + + SetUpdateStrategy(strategy datav1alpha1.UpdateStrategy) + + GetUpdateStrategy() datav1alpha1.UpdateStrategy } var _ RuntimeInfoInterface = &RuntimeInfo{} @@ -145,6 +151,8 @@ type RuntimeInfo struct { annotations map[string]string metadataList []datav1alpha1.Metadata + + updateStrategy datav1alpha1.UpdateStrategy } type Fuse struct { @@ -384,6 +392,17 @@ func (info *RuntimeInfo) SetAPIReader(apiReader client.Reader) { info.apiReader = apiReader } +func (info *RuntimeInfo) SetUpdateStrategy(strategy datav1alpha1.UpdateStrategy) { + info.updateStrategy = strategy +} + +func (info *RuntimeInfo) GetUpdateStrategy() datav1alpha1.UpdateStrategy { + if len(info.updateStrategy) == 0 { + 
return datav1alpha1.ReCreate + } + return info.updateStrategy +} + func convertToTieredstoreInfo(tieredstore datav1alpha1.TieredStore) (TieredStoreInfo, error) { if len(tieredstore.Levels) == 0 { return TieredStoreInfo{}, nil @@ -521,6 +540,7 @@ func GetRuntimeInfo(reader client.Reader, name, namespace string) (runtimeInfo R } runtimeInfo.SetFuseNodeSelector(juicefsRuntime.Spec.Fuse.NodeSelector) runtimeInfo.SetupFuseCleanPolicy(juicefsRuntime.Spec.Fuse.CleanPolicy) + runtimeInfo.SetUpdateStrategy(juicefsRuntime.Spec.UpdateStrategy) case common.ThinRuntime: thinRuntime, err := utils.GetThinRuntime(reader, name, namespace) if err != nil { diff --git a/pkg/ddc/juicefs/transform.go b/pkg/ddc/juicefs/transform.go index 882b71a9b73..e67dd40bed8 100644 --- a/pkg/ddc/juicefs/transform.go +++ b/pkg/ddc/juicefs/transform.go @@ -262,6 +262,15 @@ func (j *JuiceFSEngine) transformPodMetadata(runtime *datav1alpha1.JuiceFSRuntim value.Worker.Annotations = utils.UnionMapsWithOverride(commonAnnotations, runtime.Spec.Worker.PodMetadata.Annotations) value.Fuse.Annotations = utils.UnionMapsWithOverride(commonAnnotations, runtime.Spec.Fuse.PodMetadata.Annotations) + // only Worker Pods requiring in-place updates trigger the mutating webhook + if runtime.Spec.UpdateStrategy == datav1alpha1.InPlaceIfPossible || + runtime.Spec.UpdateStrategy == datav1alpha1.InPlace { + // in-place update for worker pods used in webhook mutating. + value.Worker.Labels[common.RuntimePodType] = common.RuntimeWorkerPod + // the runtime name used in webhook mutating to mutate in-place updated worker pods. + value.Worker.Annotations[common.AnnotationRuntimeName] = runtime.Name + } + return nil } diff --git a/pkg/ddc/juicefs/transform_test.go b/pkg/ddc/juicefs/transform_test.go index 19f2b27567f..0bb012485de 100644 --- a/pkg/ddc/juicefs/transform_test.go +++ b/pkg/ddc/juicefs/transform_test.go @@ -17,6 +17,7 @@ limitations under the License. 
package juicefs import ( + "github.com/fluid-cloudnative/fluid/pkg/common" "reflect" "testing" @@ -183,13 +184,14 @@ func TestJuiceFSEngine_transformPodMetadata(t *testing.T) { Labels: map[string]string{"common-key": "common-value"}, Annotations: map[string]string{"common-annotation": "val"}, }, + UpdateStrategy: datav1alpha1.InPlaceIfPossible, }, }, Value: &JuiceFS{}, wantValue: &JuiceFS{ Worker: Worker{ - Labels: map[string]string{"common-key": "common-value"}, - Annotations: map[string]string{"common-annotation": "val"}, + Labels: map[string]string{"common-key": "common-value", common.RuntimePodType: common.RuntimeWorkerPod}, + Annotations: map[string]string{"common-annotation": "val", common.AnnotationRuntimeName: ""}, }, Fuse: Fuse{ Labels: map[string]string{"common-key": "common-value"}, @@ -205,10 +207,11 @@ func TestJuiceFSEngine_transformPodMetadata(t *testing.T) { Labels: map[string]string{"common-key": "common-value"}, Annotations: map[string]string{"common-annotation": "val"}, }, + UpdateStrategy: datav1alpha1.InPlaceIfPossible, Worker: datav1alpha1.JuiceFSCompTemplateSpec{ PodMetadata: datav1alpha1.PodMetadata{ - Labels: map[string]string{"common-key": "worker-value"}, - Annotations: map[string]string{"common-annotation": "worker-val"}, + Labels: map[string]string{"common-key": "worker-value", common.RuntimePodType: common.RuntimeWorkerPod}, + Annotations: map[string]string{"common-annotation": "worker-val", common.AnnotationRuntimeName: ""}, }, }, }, @@ -216,8 +219,8 @@ func TestJuiceFSEngine_transformPodMetadata(t *testing.T) { Value: &JuiceFS{}, wantValue: &JuiceFS{ Worker: Worker{ - Labels: map[string]string{"common-key": "worker-value"}, - Annotations: map[string]string{"common-annotation": "worker-val"}, + Labels: map[string]string{"common-key": "worker-value", common.RuntimePodType: common.RuntimeWorkerPod}, + Annotations: map[string]string{"common-annotation": "worker-val", common.AnnotationRuntimeName: ""}, }, Fuse: Fuse{ Labels: 
map[string]string{"common-key": "common-value"}, diff --git a/pkg/utils/dataset/lifecycle/node.go b/pkg/utils/dataset/lifecycle/node.go index dcd35bbff15..36ea10e6a23 100644 --- a/pkg/utils/dataset/lifecycle/node.go +++ b/pkg/utils/dataset/lifecycle/node.go @@ -19,7 +19,10 @@ package lifecycle import ( "context" "fmt" + podutil "k8s.io/kubernetes/pkg/api/v1/pod" + "reflect" "strconv" + "strings" "time" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -68,7 +71,6 @@ func SyncScheduleInfoToCacheNodes(runtimeInfo base.RuntimeInfoInterface, client } return err } - workerSelector, err := metav1.LabelSelectorAsSelector(workers.Spec.Selector) workerPods, err := kubeclient.GetPodsForStatefulSet(client, workers, workerSelector) @@ -107,9 +109,77 @@ func SyncScheduleInfoToCacheNodes(runtimeInfo base.RuntimeInfoInterface, client } } + // previous worker pod index to node name. + persistentPodState, err := kubeclient.GetPersistentPodState(client, runtimeInfo.GetWorkerPodStateName(), runtimeInfo.GetNamespace()) + if err != nil { + return err + } + // runtime not support + if persistentPodState == nil { + rootLog.V(1).Info("runtime not support in-place update, skip", "name", runtimeInfo.GetName(), "namespace", runtimeInfo.GetNamespace()) + return nil + } + + updatedPodState := persistentPodState.DeepCopy() + + // update configmap interval >= 1s, wait for next reconcile. + if updatedPodState.Status.LastUpdateTime != nil && time.Since(updatedPodState.Status.LastUpdateTime.Time) < time.Second { + rootLog.V(1).Info("update persistentPodState too fast, will wait for next.", "name", runtimeInfo.GetName(), "namespace", runtimeInfo.GetNamespace()) + return nil + } + + // insert new podState via current worker statefulset pods + for _, pod := range workerPods { + podNodeName := pod.Spec.NodeName + // 1. pod not ready, continue + if !podutil.IsPodReady(&pod) || len(podNodeName) == 0 { + continue + } + podState, ok := updatedPodState.Status.PodStates[pod.Name] + // 2. 
// parseStsPodIndex extracts the ordinal from a pod name of the form
// "<statefulset-name>-<ordinal>", e.g. "demo-worker-3" yields 3.
//
// An error is returned when the name has no "-" separator, when either side
// of the last separator is empty, or when the suffix is not a plain decimal
// number. A leading sign is rejected explicitly because strconv.Atoi would
// otherwise accept "+3".
func parseStsPodIndex(podName string) (int, error) {
	sep := strings.LastIndex(podName, "-")
	// sep < 0: no separator at all; sep == 0: empty statefulset name;
	// sep == len-1: empty ordinal.
	if sep <= 0 || sep == len(podName)-1 {
		return 0, fmt.Errorf("pod name %q is not in the form <statefulset>-<ordinal>", podName)
	}

	ordinal := podName[sep+1:]
	if first := ordinal[0]; first < '0' || first > '9' {
		return 0, fmt.Errorf("pod name %q has a non-numeric ordinal %q", podName, ordinal)
	}

	index, err := strconv.Atoi(ordinal)
	if err != nil {
		return 0, fmt.Errorf("parsing ordinal of pod %q: %w", podName, err)
	}
	return index, nil
}
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kubeclient + +import ( + "context" + "github.com/fluid-cloudnative/fluid/api/v1alpha1" + apierrs "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +// GetPersistentPodState gets PersistentPodState with given name and namespace of the configmap. +func GetPersistentPodState(client client.Client, name, namespace string) (persistentPodState *v1alpha1.PersistentPodState, err error) { + key := types.NamespacedName{ + Name: name, + Namespace: namespace, + } + + persistentPodState = &v1alpha1.PersistentPodState{} + + if err = client.Get(context.TODO(), key, persistentPodState); err != nil { + if apierrs.IsNotFound(err) { + err = nil + persistentPodState = nil + } + return persistentPodState, err + } + + return +} + +func UpdatePersistentPodStateStatus(client client.Client, persistentPodState *v1alpha1.PersistentPodState) (err error) { + err = client.Status().Update(context.TODO(), persistentPodState) + return +} diff --git a/pkg/webhook/handler/mutating/mutating_handler.go b/pkg/webhook/handler/mutating/mutating_handler.go index 1e52805f006..6e8ae3c665e 100644 --- a/pkg/webhook/handler/mutating/mutating_handler.go +++ b/pkg/webhook/handler/mutating/mutating_handler.go @@ -20,6 +20,8 @@ import ( "context" "encoding/json" "fmt" + datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" + "github.com/fluid-cloudnative/fluid/pkg/ddc/base" "net/http" "time" @@ 
-86,32 +88,42 @@ func (a *FluidMutatingHandler) Handle(ctx context.Context, req admission.Request undoNamespaceOverride = true } - // check whether should inject - if common.CheckExpectValue(pod.Labels, common.EnableFluidInjectionFlag, common.False) { - setupLog.Info("skip mutating the pod because injection is disabled", "Pod", pod.Name, "Namespace", pod.Namespace) - return admission.Allowed("skip mutating the pod because injection is disabled") - } - if common.CheckExpectValue(pod.Labels, common.InjectSidecarDone, common.True) { - setupLog.Info("skip mutating the pod because injection is done", "Pod", pod.Name, "Namespace", pod.Namespace) - return admission.Allowed("skip mutating the pod because injection is done") - } - - backupPod := pod.DeepCopy() - if err := a.MutatePod(pod, false); err != nil { - setupLog.Error(err, "failed to mutate pod with cache client", "Pod", pod.Name, "Namespace", pod.Namespace) - if webhookutils.IsNeedRetryWithApiReaderError(err) { - setupLog.Info("retrying with API reader", - "namespace", pod.Namespace, - "pod", pod.Name, - "reason", err.Error(), - ) - pod = backupPod - if err := a.MutatePod(pod, true); err != nil { - return admission.Errored(http.StatusInternalServerError, err) + // mutating runtime worker pod + if common.CheckExpectValue(pod.Labels, common.RuntimePodType, common.RuntimeWorkerPod) { + // mutating app pod, check whether should inject + if common.CheckExpectValue(pod.Labels, common.InjectWorkerPodDone, common.True) { + setupLog.Info("skip mutating the worker pod because injection is done", "Pod", pod.Name, "Namespace", pod.Namespace) + return admission.Allowed("skip mutating the worker pod because injection is done") + } + err = a.MutateRuntimeWorkerPod(pod) + } else { + // mutating app pod, check whether should inject + if common.CheckExpectValue(pod.Labels, common.EnableFluidInjectionFlag, common.False) { + setupLog.Info("skip mutating the pod because injection is disabled", "Pod", pod.Name, "Namespace", 
pod.Namespace) + return admission.Allowed("skip mutating the pod because injection is disabled") + } + if common.CheckExpectValue(pod.Labels, common.InjectSidecarDone, common.True) { + setupLog.Info("skip mutating the pod because injection is done", "Pod", pod.Name, "Namespace", pod.Namespace) + return admission.Allowed("skip mutating the pod because injection is done") + } + backupPod := pod.DeepCopy() + if err = a.MutatePod(pod, false); err != nil { + setupLog.Error(err, "failed to mutate pod with cache client", "Pod", pod.Name, "Namespace", pod.Namespace) + if webhookutils.IsNeedRetryWithApiReaderError(err) { + setupLog.Info("retrying with API reader", + "namespace", pod.Namespace, + "pod", pod.Name, + "reason", err.Error(), + ) + pod = backupPod + err = a.MutatePod(pod, true) } } } + if err != nil { + return admission.Errored(http.StatusInternalServerError, err) + } if undoNamespaceOverride { pod.Namespace = "" } @@ -127,6 +139,94 @@ func (a *FluidMutatingHandler) Handle(ctx context.Context, req admission.Request return resp } +func (a *FluidMutatingHandler) MutateRuntimeWorkerPod(pod *corev1.Pod) (err error) { + if utils.IsTimeTrackerDebugEnabled() { + defer utils.TimeTrack(time.Now(), "AddAffinityToWorkerPod", + "pod.name", pod.GetName(), "pod.namespace", pod.GetNamespace()) + } + var setupLog = ctrl.Log.WithName("AddAffinityToWorkerPod") + + // use annotation for runtime name as the value for label 'fluid.io/dataset' may be the uuid when length >= 64. 
+ runtimeName, ok := pod.Annotations[common.AnnotationRuntimeName] + if !ok { + setupLog.Info("no runtimeName found in pod, skip mutating the pod", "Pod", pod.Name, "Namespace", pod.Namespace) + return + } + + runtimeNamespace := pod.Namespace + runtimeInfo, err := base.GetRuntimeInfo(a.Client, runtimeName, runtimeNamespace) + if err != nil { + setupLog.Info("runtime may not be ready, skip mutating worker pods", "Pod", pod.Name, "Namespace", pod.Namespace, "error", err) + return nil + } + + // add label for injection done + pod.ObjectMeta.Labels[common.InjectWorkerPodDone] = common.True + + updateStrategy := runtimeInfo.GetUpdateStrategy() + if updateStrategy == datav1alpha1.ReCreate { + setupLog.Info("updateStrategy is ReCreate, skip mutating the pod", "Pod", pod.Name, "Namespace", pod.Namespace) + return nil + } + + persistentPodState, err := kubeclient.GetPersistentPodState(a.Client, runtimeInfo.GetWorkerPodStateName(), runtimeNamespace) + if err != nil { + return err + } + // config map not created, the statefulset is being created now. + if persistentPodState == nil { + setupLog.Info("no PersistentPodState found in runtime, skip mutating the pod", "Pod", pod.Name, "Namespace", pod.Namespace) + return nil + } + + // states are not sync yet. 
+ podState, ok := persistentPodState.Status.PodStates[pod.Name] + if !ok { + return + } + + nodeName := podState.NodeName + + setupLog.Info("add terms to pod", "pod", pod) + + if updateStrategy == datav1alpha1.InPlace { + terms := []corev1.NodeSelectorTerm{ + { + MatchExpressions: []corev1.NodeSelectorRequirement{ + { + Key: common.K8sNodeNameLabelKey, + Operator: corev1.NodeSelectorOpIn, + Values: []string{nodeName}, + }, + }, + }, + } + utils.InjectNodeSelectorTerms(terms, pod) + return + } + + if updateStrategy == datav1alpha1.InPlaceIfPossible { + terms := []corev1.PreferredSchedulingTerm{ + { + Preference: corev1.NodeSelectorTerm{ + MatchExpressions: []corev1.NodeSelectorRequirement{ + { + Key: common.K8sNodeNameLabelKey, + Operator: corev1.NodeSelectorOpIn, + Values: []string{nodeName}, + }, + }, + }, + Weight: 100, + }, + } + utils.InjectPreferredSchedulingTerms(terms, pod) + return + } + + return nil +} + // MutatePod will call all plugins to get total prefer info func (a *FluidMutatingHandler) MutatePod(pod *corev1.Pod, useDirectReader bool) (err error) { handlerClient := a.Reader