diff --git a/xset/api/well_knowns.go b/xset/api/well_knowns.go index 76199f01..2ace1048 100644 --- a/xset/api/well_knowns.go +++ b/xset/api/well_knowns.go @@ -88,6 +88,12 @@ const ( // XExcludeIndicationLabelKey is used to indicate a target is excluded by xset XExcludeIndicationLabelKey + // SubResourcePvcTemplateLabelKey is used to attach pvc template name to pvc resources + SubResourcePvcTemplateLabelKey + + // SubResourcePvcTemplateHashLabelKey is used to attach hash of pvc template to pvc subresource + SubResourcePvcTemplateHashLabelKey + // LastXStatusAnnotationKey is used to record the last status of a target by xset LastXStatusAnnotationKey @@ -110,18 +116,20 @@ var defaultXSetLabelAnnotationManager = map[XSetLabelAnnotationEnum]string{ ServiceAvailableLabel: appsv1alpha1.PodServiceAvailableLabel, PreparingDeleteLabel: appsv1alpha1.PodPreparingDeleteLabel, - ControlledByXSetLabel: appsv1alpha1.ControlledByKusionStackLabelKey, - XInstanceIdLabelKey: appsv1alpha1.PodInstanceIDLabelKey, - XSetUpdateIndicationLabelKey: appsv1alpha1.CollaSetUpdateIndicateLabelKey, - XDeletionIndicationLabelKey: appsv1alpha1.PodDeletionIndicationLabelKey, - XReplaceIndicationLabelKey: appsv1alpha1.PodReplaceIndicationLabelKey, - XReplacePairNewId: appsv1alpha1.PodReplacePairNewId, - XReplacePairOriginName: appsv1alpha1.PodReplacePairOriginName, - XReplaceByReplaceUpdateLabelKey: appsv1alpha1.PodReplaceByReplaceUpdateLabelKey, - XOrphanedIndicationLabelKey: appsv1alpha1.PodOrphanedIndicateLabelKey, - XCreatingLabel: appsv1alpha1.PodCreatingLabel, - XCompletingLabel: appsv1alpha1.PodCompletingLabel, - XExcludeIndicationLabelKey: appsv1alpha1.PodExcludeIndicationLabelKey, + ControlledByXSetLabel: appsv1alpha1.ControlledByKusionStackLabelKey, + XInstanceIdLabelKey: appsv1alpha1.PodInstanceIDLabelKey, + XSetUpdateIndicationLabelKey: appsv1alpha1.CollaSetUpdateIndicateLabelKey, + XDeletionIndicationLabelKey: appsv1alpha1.PodDeletionIndicationLabelKey, + XReplaceIndicationLabelKey: appsv1alpha1.PodReplaceIndicationLabelKey, + XReplacePairNewId: appsv1alpha1.PodReplacePairNewId, + XReplacePairOriginName: appsv1alpha1.PodReplacePairOriginName, + XReplaceByReplaceUpdateLabelKey: appsv1alpha1.PodReplaceByReplaceUpdateLabelKey, + XOrphanedIndicationLabelKey: appsv1alpha1.PodOrphanedIndicateLabelKey, + XCreatingLabel: appsv1alpha1.PodCreatingLabel, + XCompletingLabel: appsv1alpha1.PodCompletingLabel, + XExcludeIndicationLabelKey: appsv1alpha1.PodExcludeIndicationLabelKey, + SubResourcePvcTemplateLabelKey: appsv1alpha1.PvcTemplateLabelKey, + SubResourcePvcTemplateHashLabelKey: appsv1alpha1.PvcTemplateHashLabelKey, LastXStatusAnnotationKey: appsv1alpha1.LastPodStatusAnnotationKey, } diff --git a/xset/api/xset_controller_types.go b/xset/api/xset_controller_types.go index 361fff68..ee2ca2f1 100644 --- a/xset/api/xset_controller_types.go +++ b/xset/api/xset_controller_types.go @@ -20,6 +20,7 @@ import ( "context" appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -56,6 +57,7 @@ type XOperation interface { CheckScheduled(object client.Object) bool CheckReadyTime(object client.Object) (bool, *metav1.Time) CheckAvailable(object client.Object) bool + CheckInactive(object client.Object) bool GetXOpsPriority(ctx context.Context, c client.Client, object client.Object) (*OpsPriority, error) } @@ -74,3 +76,22 @@ type ResourceContextAdapterGetter interface { type LabelAnnotationManagerGetter interface { GetLabelManagerAdapter() XSetLabelAnnotationManager } + +// SubResourcePvcAdapter is used to manage pvc subresource for X, which are declared on XSet, e.g., spec.volumeClaimTemplate. +// Once adapter is implemented, XSetController will automatically manage pvc: (1) create pvcs from GetXSetPvcTemplate for each +// X object and attach theses pvcs with same instance-id, (2) upgrade pvcs and recreate X object pvcs when PvcTemplateChanged, +// (3) retain pvcs when XSet is deleted or scaledIn according to RetainPvcWhenXSetDeleted and RetainPvcWhenXSetScaled. +type SubResourcePvcAdapter interface { + // RetainPvcWhenXSetDeleted returns true if pvc should be retained when XSet is deleted. + RetainPvcWhenXSetDeleted(object XSetObject) bool + // RetainPvcWhenXSetScaled returns true if pvc should be retained when XSet replicas is scaledIn. + RetainPvcWhenXSetScaled(object XSetObject) bool + // GetXSetPvcTemplate returns pvc template from XSet object. + GetXSetPvcTemplate(object XSetObject) []corev1.PersistentVolumeClaim + // GetXSpecVolumes returns spec.volumes from X object. + GetXSpecVolumes(object client.Object) []corev1.Volume + // GetXVolumeMounts returns containers volumeMounts from X (pod) object. + GetXVolumeMounts(object client.Object) []corev1.VolumeMount + // SetXSpecVolumes sets spec.volumes to X object. + SetXSpecVolumes(object client.Object, pvcs []corev1.Volume) +} diff --git a/xset/resourcecontexts/default_adapters.go b/xset/resourcecontexts/default_adapters.go index 73a43872..240f6f1d 100644 --- a/xset/resourcecontexts/default_adapters.go +++ b/xset/resourcecontexts/default_adapters.go @@ -36,6 +36,12 @@ var defaultResourceContextKeys = map[api.ResourceContextKeyEnum]string{ api.EnumReplaceOriginTargetIDContextDataKey: "ReplaceOriginTargetID", } +type ResourceContextAdapterGetter struct{} + +func (r *ResourceContextAdapterGetter) GetResourceContextAdapter() api.ResourceContextAdapter { + return &DefaultResourceContextAdapter{} +} + // DefaultResourceContextAdapter is the adapter to api apps.kusionstack.io.resourcecontexts type DefaultResourceContextAdapter struct{} diff --git a/xset/subresources/getter.go b/xset/subresources/getter.go new file mode 100644 index 00000000..3c9349ff --- /dev/null +++ b/xset/subresources/getter.go @@ -0,0 +1,24 @@ +/* + * Copyright 2024-2025 KusionStack Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package subresources + +import "kusionstack.io/kube-utils/xset/api" + +func GetSubresourcePvcAdapter(control api.XSetController) (adapter api.SubResourcePvcAdapter, enabled bool) { + adapter, enabled = control.(api.SubResourcePvcAdapter) + return adapter, enabled +} diff --git a/xset/subresources/pvc_control.go b/xset/subresources/pvc_control.go new file mode 100644 index 00000000..eca2021a --- /dev/null +++ b/xset/subresources/pvc_control.go @@ -0,0 +1,557 @@ +/* + * Copyright 2024-2025 KusionStack Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package subresources + +import ( + "context" + "encoding/json" + "fmt" + "hash/fnv" + "strings" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/rand" + "k8s.io/apimachinery/pkg/util/sets" + appsv1alpha1 "kusionstack.io/kube-api/apps/v1alpha1" + "sigs.k8s.io/controller-runtime/pkg/cache" + "sigs.k8s.io/controller-runtime/pkg/client" + + kubeutilclient "kusionstack.io/kube-utils/client" + "kusionstack.io/kube-utils/controller/expectations" + "kusionstack.io/kube-utils/controller/mixin" + refmanagerutil "kusionstack.io/kube-utils/controller/refmanager" + "kusionstack.io/kube-utils/xset/api" +) + +const ( + FieldIndexOwnerRefUID = "ownerRefUID" +) + +var PVCGvk = corev1.SchemeGroupVersion.WithKind("PersistentVolumeClaim") + +type PvcControl interface { + GetFilteredPvcs(context.Context, api.XSetObject) ([]*corev1.PersistentVolumeClaim, error) + CreateTargetPvcs(context.Context, api.XSetObject, client.Object, []*corev1.PersistentVolumeClaim) error + DeleteTargetPvcs(context.Context, api.XSetObject, client.Object, []*corev1.PersistentVolumeClaim) error + DeleteTargetUnusedPvcs(context.Context, api.XSetObject, client.Object, []*corev1.PersistentVolumeClaim) error + OrphanPvc(context.Context, api.XSetObject, *corev1.PersistentVolumeClaim) error + AdoptPvc(context.Context, api.XSetObject, *corev1.PersistentVolumeClaim) error + AdoptPvcsLeftByRetainPolicy(context.Context, api.XSetObject) ([]*corev1.PersistentVolumeClaim, error) + IsTargetPvcTmpChanged(api.XSetObject, client.Object, []*corev1.PersistentVolumeClaim) (bool, error) + RetainPvcWhenXSetDeleted(xset api.XSetObject) bool + RetainPvcWhenXSetScaled(xset api.XSetObject) bool +} + +type RealPvcControl struct { + client client.Client + scheme *runtime.Scheme + pvcAdapter api.SubResourcePvcAdapter + expectations *expectations.CacheExpectations + xsetLabelAnnoMgr api.XSetLabelAnnotationManager + xsetController api.XSetController +} + +func NewRealPvcControl(mixin *mixin.ReconcilerMixin, expectations *expectations.CacheExpectations, xsetLabelAnnoMgr api.XSetLabelAnnotationManager, xsetController api.XSetController) (PvcControl, error) { + // requires implementation of SubResourcePvcAdapter + pvcAdapter, ok := GetSubresourcePvcAdapter(xsetController) + if !ok { + return nil, nil + } + // here we go, set up cache and return real pvc control + if err := setUpCache(mixin.Cache, xsetController); err != nil { + return nil, err + } + return &RealPvcControl{ + client: mixin.Client, + scheme: mixin.Scheme, + pvcAdapter: pvcAdapter, + expectations: expectations, + xsetLabelAnnoMgr: xsetLabelAnnoMgr, + xsetController: xsetController, + }, nil +} + +func (pc *RealPvcControl) GetFilteredPvcs(ctx context.Context, xset api.XSetObject) ([]*corev1.PersistentVolumeClaim, error) { + // list pvcs using ownerReference + var filteredPvcs []*corev1.PersistentVolumeClaim + ownedPvcList := &corev1.PersistentVolumeClaimList{} + if err := pc.client.List(ctx, ownedPvcList, &client.ListOptions{ + Namespace: xset.GetNamespace(), + FieldSelector: fields.OneTermEqualSelector(FieldIndexOwnerRefUID, string(xset.GetUID())), + }); err != nil { + return nil, err + } + + for i := range ownedPvcList.Items { + pvc := &ownedPvcList.Items[i] + if pvc.DeletionTimestamp == nil { + filteredPvcs = append(filteredPvcs, pvc) + } + } + return filteredPvcs, nil +} + +func (pc *RealPvcControl) CreateTargetPvcs(ctx context.Context, xset api.XSetObject, x client.Object, existingPvcs []*corev1.PersistentVolumeClaim) error { + id, exist := pc.xsetLabelAnnoMgr.Get(x.GetLabels(), api.XInstanceIdLabelKey) + if !exist { + return nil + } + + // provision pvcs related to pod using pvc template, and reuse + // pvcs if "instance-id" and "pvc-template-hash" label matched + pvcsMap, err := pc.provisionUpdatedPvc(ctx, id, xset, existingPvcs) + if err != nil { + return err + } + + newVolumes := make([]corev1.Volume, 0, len(pvcsMap)) + // mount updated pvcs to target + for name, pvc := range pvcsMap { + volume := corev1.Volume{ + Name: name, + VolumeSource: corev1.VolumeSource{ + PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ + ClaimName: pvc.Name, + ReadOnly: false, + }, + }, + } + newVolumes = append(newVolumes, volume) + } + + // append legacy pvcs + currentVolumes := pc.pvcAdapter.GetXSpecVolumes(x) + for i := range currentVolumes { + currentVolume := currentVolumes[i] + if _, ok := pvcsMap[currentVolume.Name]; !ok { + newVolumes = append(newVolumes, currentVolume) + } + } + pc.pvcAdapter.SetXSpecVolumes(x, newVolumes) + return nil +} + +func (pc *RealPvcControl) provisionUpdatedPvc(ctx context.Context, id string, xset api.XSetObject, existingPvcs []*corev1.PersistentVolumeClaim) (map[string]*corev1.PersistentVolumeClaim, error) { + updatedPvcs, _, err := pc.classifyTargetPvcs(id, xset, existingPvcs) + if err != nil { + return nil, err + } + + templates := pc.pvcAdapter.GetXSetPvcTemplate(xset) + for i := range templates { + pvcTmp := templates[i] + // reuse pvc + if _, exist := updatedPvcs[pvcTmp.Name]; exist { + continue + } + // create new pvc + claim, err := pc.buildPvcWithHash(id, xset, &pvcTmp) + if err != nil { + return nil, err + } + + if err := pc.client.Create(ctx, claim); err != nil { + return nil, fmt.Errorf("fail to create pvc for id %s: %w", id, err) + } + + if err := pc.expectations.ExpectCreation( + kubeutilclient.ObjectKeyString(xset), + PVCGvk, + claim.Namespace, + claim.Name, + ); err != nil { + return nil, err + } + + updatedPvcs[pvcTmp.Name] = claim + } + return updatedPvcs, nil +} + +func (pc *RealPvcControl) DeleteTargetPvcs(ctx context.Context, xset api.XSetObject, x client.Object, pvcs []*corev1.PersistentVolumeClaim) error { + for _, pvc := range pvcs { + if pvc.Labels == nil || x.GetLabels() == nil { + continue + } + + // only delete pvcs used by target + pvcId, _ := pc.xsetLabelAnnoMgr.Get(pvc.Labels, api.XInstanceIdLabelKey) + targetId, _ := pc.xsetLabelAnnoMgr.Get(x.GetLabels(), api.XInstanceIdLabelKey) + if pvcId != targetId { + continue + } + + if err := deletePvcWithExpectations(ctx, pc.client, xset, pc.expectations, pvc); err != nil { + return err + } + } + return nil +} + +func (pc *RealPvcControl) DeleteTargetUnusedPvcs(ctx context.Context, xset api.XSetObject, x client.Object, existingPvcs []*corev1.PersistentVolumeClaim) error { + if x.GetLabels() == nil { + return nil + } + id, exist := pc.xsetLabelAnnoMgr.Get(x.GetLabels(), api.XInstanceIdLabelKey) + if !exist { + return nil + } + + newPvcs, oldPvcs, err := pc.classifyTargetPvcs(id, xset, existingPvcs) + if err != nil { + return err + } + + volumeMounts := pc.pvcAdapter.GetXVolumeMounts(x) + mountedVolumeTmps := sets.String{} + for i := range volumeMounts { + mountedVolumeTmps.Insert(volumeMounts[i].Name) + } + + // delete pvc which is not claimed in templates + if err := pc.deleteUnclaimedPvcs(ctx, xset, oldPvcs, mountedVolumeTmps); err != nil { + return err + } + // delete old pvc if new pvc is provisioned and not RetainPVCWhenXSetScaled + if !pc.pvcAdapter.RetainPvcWhenXSetScaled(xset) { + return pc.deleteOldPvcs(ctx, xset, newPvcs, oldPvcs) + } + return nil +} + +func (pc *RealPvcControl) AdoptPvc(ctx context.Context, xset api.XSetObject, pvc *corev1.PersistentVolumeClaim) error { + xsetSpec := pc.xsetController.GetXSetSpec(xset) + if xsetSpec.Selector.MatchLabels == nil { + return nil + } + refWriter := refmanagerutil.NewOwnerRefWriter(pc.client) + matcher, err := refmanagerutil.LabelSelectorAsMatch(xsetSpec.Selector) + if err != nil { + return fmt.Errorf("fail to create labelSelector matcher: %s", err.Error()) + } + refManager := refmanagerutil.NewObjectControllerRefManager(refWriter, xset, xset.GetObjectKind().GroupVersionKind(), matcher) + + if _, err := refManager.Claim(ctx, pvc); err != nil { + return fmt.Errorf("failed to adopt pvc: %s", err.Error()) + } + return nil +} + +func (pc *RealPvcControl) OrphanPvc(ctx context.Context, xset api.XSetObject, pvc *corev1.PersistentVolumeClaim) error { + xsetSpec := pc.xsetController.GetXSetSpec(xset) + if xsetSpec.Selector.MatchLabels == nil { + return nil + } + if pvc.Labels == nil { + pvc.Labels = make(map[string]string) + } + if pvc.Annotations == nil { + pvc.Annotations = make(map[string]string) + } + + refWriter := refmanagerutil.NewOwnerRefWriter(pc.client) + if err := refWriter.Release(ctx, xset, pvc); err != nil { + return fmt.Errorf("failed to orphan target: %s", err.Error()) + } + return nil +} + +func (pc *RealPvcControl) AdoptPvcsLeftByRetainPolicy(ctx context.Context, xset api.XSetObject) ([]*corev1.PersistentVolumeClaim, error) { + xsetSpec := pc.xsetController.GetXSetSpec(xset) + ownerSelector := xsetSpec.Selector.DeepCopy() + if ownerSelector.MatchLabels == nil { + ownerSelector.MatchLabels = map[string]string{} + } + ownerSelector.MatchLabels[pc.xsetLabelAnnoMgr.Value(api.ControlledByXSetLabel)] = "true" + ownerSelector.MatchExpressions = append(ownerSelector.MatchExpressions, metav1.LabelSelectorRequirement{ // nolint + Key: pc.xsetLabelAnnoMgr.Value(api.XOrphanedIndicationLabelKey), // should not be excluded pvcs + Operator: metav1.LabelSelectorOpDoesNotExist, + }) + ownerSelector.MatchExpressions = append(ownerSelector.MatchExpressions, metav1.LabelSelectorRequirement{ + Key: pc.xsetLabelAnnoMgr.Value(api.XInstanceIdLabelKey), // instance-id label should exist + Operator: metav1.LabelSelectorOpExists, + }) + ownerSelector.MatchExpressions = append(ownerSelector.MatchExpressions, metav1.LabelSelectorRequirement{ + Key: pc.xsetLabelAnnoMgr.Value(api.SubResourcePvcTemplateHashLabelKey), // pvc-hash label should exist + Operator: metav1.LabelSelectorOpExists, + }) + + selector, err := metav1.LabelSelectorAsSelector(ownerSelector) + if err != nil { + return nil, err + } + + orphanedPvcList := &corev1.PersistentVolumeClaimList{} + if err := pc.client.List(ctx, orphanedPvcList, &client.ListOptions{Namespace: xset.GetNamespace(), LabelSelector: selector}); err != nil { + return nil, err + } + + // adopt orphaned pvcs + var claims []*corev1.PersistentVolumeClaim + for i := range orphanedPvcList.Items { + pvc := orphanedPvcList.Items[i] + if pvc.OwnerReferences != nil && len(pvc.OwnerReferences) > 0 { + continue + } + if pvc.Labels == nil { + pvc.Labels = make(map[string]string) + } + if pvc.Annotations == nil { + pvc.Annotations = make(map[string]string) + } + + claims = append(claims, &pvc) + } + for i := range claims { + if err := pc.AdoptPvc(ctx, xset, claims[i]); err != nil { + return nil, err + } + } + return claims, nil +} + +func (pc *RealPvcControl) IsTargetPvcTmpChanged(xset api.XSetObject, x client.Object, existingPvcs []*corev1.PersistentVolumeClaim) (bool, error) { + pvcTemplates := pc.pvcAdapter.GetXSetPvcTemplate(xset) + xSpecVolumes := pc.pvcAdapter.GetXSpecVolumes(x) + // get pvc template hash values + newHashMapping, err := PvcTmpHashMapping(pvcTemplates) + if err != nil { + return false, err + } + + // get existing x pvcs hash values + existingPvcHash := map[string]string{} + for _, pvc := range existingPvcs { + if pvc.Labels == nil || x.GetLabels() == nil { + continue + } + pvcId, _ := pc.xsetLabelAnnoMgr.Get(pvc.Labels, api.XInstanceIdLabelKey) + targetId, _ := pc.xsetLabelAnnoMgr.Get(x.GetLabels(), api.XInstanceIdLabelKey) + if pvcId != targetId { + continue + } + if _, exist := pc.xsetLabelAnnoMgr.Get(pvc.Labels, api.SubResourcePvcTemplateHashLabelKey); !exist { + continue + } + existingPvcHash[pvc.Name] = pvc.Labels[appsv1alpha1.PvcTemplateHashLabelKey] + } + + // check mounted pvcs changed + for i := range xSpecVolumes { + volume := xSpecVolumes[i] + if volume.PersistentVolumeClaim == nil || volume.PersistentVolumeClaim.ClaimName == "" { + continue + } + pvcName := volume.PersistentVolumeClaim.ClaimName + TmpName := volume.Name + if newHashMapping[TmpName] != existingPvcHash[pvcName] { + return true, nil + } + } + return false, nil +} + +func (pc *RealPvcControl) RetainPvcWhenXSetDeleted(xset api.XSetObject) bool { + return pc.pvcAdapter.RetainPvcWhenXSetDeleted(xset) +} + +func (pc *RealPvcControl) RetainPvcWhenXSetScaled(xset api.XSetObject) bool { + return pc.pvcAdapter.RetainPvcWhenXSetScaled(xset) +} + +func (pc *RealPvcControl) deleteUnclaimedPvcs(ctx context.Context, xset api.XSetObject, oldPvcs map[string]*corev1.PersistentVolumeClaim, mountedPvcNames sets.String) error { + inUsedPvcNames := sets.String{} + templates := pc.pvcAdapter.GetXSetPvcTemplate(xset) + for i := range templates { + inUsedPvcNames.Insert(templates[i].Name) + } + for pvcTmpName, pvc := range oldPvcs { + // if pvc is still mounted on target, keep it + if mountedPvcNames.Has(pvcTmpName) { + continue + } + + // is pvc is claimed in pvc templates, keep it + if inUsedPvcNames.Has(pvcTmpName) { + continue + } + + if err := deletePvcWithExpectations(ctx, pc.client, xset, pc.expectations, pvc); err != nil { + return err + } + } + return nil +} + +func (pc *RealPvcControl) deleteOldPvcs(ctx context.Context, xset api.XSetObject, newPvcs, oldPvcs map[string]*corev1.PersistentVolumeClaim) error { + for pvcTmpName, pvc := range oldPvcs { + if _, newPvcExist := newPvcs[pvcTmpName]; !newPvcExist { + continue + } + if err := deletePvcWithExpectations(ctx, pc.client, xset, pc.expectations, pvc); err != nil { + return err + } + } + return nil +} + +func (pc *RealPvcControl) buildPvcWithHash(id string, xset api.XSetObject, pvcTmp *corev1.PersistentVolumeClaim) (*corev1.PersistentVolumeClaim, error) { + claim := pvcTmp.DeepCopy() + claim.Name = "" + claim.GenerateName = fmt.Sprintf("%s-%s-", xset.GetName(), pvcTmp.Name) + claim.Namespace = xset.GetNamespace() + xsetMeta := pc.xsetController.XSetMeta() + xsetGvk := xsetMeta.GroupVersionKind() + claim.OwnerReferences = append(claim.OwnerReferences, + *metav1.NewControllerRef(xset, xsetGvk)) + + if claim.Labels == nil { + claim.Labels = map[string]string{} + } + xsetSpec := pc.xsetController.GetXSetSpec(xset) + for k, v := range xsetSpec.Selector.MatchLabels { + claim.Labels[k] = v + } + pc.xsetLabelAnnoMgr.Set(claim, api.ControlledByXSetLabel, "true") + + hash, err := PvcTmpHash(pvcTmp) + if err != nil { + return nil, err + } + pc.xsetLabelAnnoMgr.Set(claim, api.SubResourcePvcTemplateHashLabelKey, hash) + pc.xsetLabelAnnoMgr.Set(claim, api.XInstanceIdLabelKey, id) + pc.xsetLabelAnnoMgr.Set(claim, api.SubResourcePvcTemplateLabelKey, pvcTmp.Name) + return claim, nil +} + +// classify pvcs into old and new ones +func (pc *RealPvcControl) classifyTargetPvcs(id string, xset api.XSetObject, existingPvcs []*corev1.PersistentVolumeClaim) (map[string]*corev1.PersistentVolumeClaim, map[string]*corev1.PersistentVolumeClaim, error) { + newPvcs := map[string]*corev1.PersistentVolumeClaim{} + oldPvcs := map[string]*corev1.PersistentVolumeClaim{} + + newPvcTemplates := pc.pvcAdapter.GetXSetPvcTemplate(xset) + newTmpHash, err := PvcTmpHashMapping(newPvcTemplates) + if err != nil { + return newPvcs, oldPvcs, err + } + + for _, pvc := range existingPvcs { + if pvc.DeletionTimestamp != nil { + continue + } + + if pvc.Labels == nil { + continue + } + + if val, exist := pc.xsetLabelAnnoMgr.Get(pvc.Labels, api.XInstanceIdLabelKey); !exist { + continue + } else if val != id { + continue + } + + if _, exist := pc.xsetLabelAnnoMgr.Get(pvc.Labels, api.SubResourcePvcTemplateHashLabelKey); !exist { + continue + } + hash, _ := pc.xsetLabelAnnoMgr.Get(pvc.Labels, api.SubResourcePvcTemplateHashLabelKey) + pvcTmpName, err := pc.extractPvcTmpName(xset, pvc) + if err != nil { + return nil, nil, err + } + + // classify into updated and old pvcs + if newTmpHash[pvcTmpName] == hash { + newPvcs[pvcTmpName] = pvc + } else { + oldPvcs[pvcTmpName] = pvc + } + } + + return newPvcs, oldPvcs, nil +} + +func (pc *RealPvcControl) extractPvcTmpName(xset api.XSetObject, pvc *corev1.PersistentVolumeClaim) (string, error) { + if pvcTmpName, exist := pc.xsetLabelAnnoMgr.Get(pvc.Labels, api.SubResourcePvcTemplateLabelKey); exist { + return pvcTmpName, nil + } + lastDashIndex := strings.LastIndex(pvc.Name, "-") + if lastDashIndex == -1 { + return "", fmt.Errorf("pvc %s has no postfix", pvc.Name) + } + + rest := pvc.Name[:lastDashIndex] + if !strings.HasPrefix(rest, xset.GetName()+"-") { + return "", fmt.Errorf("malformed pvc name %s, expected a part of CollaSet name %s", pvc.Name, xset.GetName()) + } + + return strings.TrimPrefix(rest, xset.GetName()+"-"), nil +} + +func PvcTmpHash(pvc *corev1.PersistentVolumeClaim) (string, error) { + bytes, err := json.Marshal(pvc) + if err != nil { + return "", fmt.Errorf("fail to marshal pvc template: %w", err) + } + + hf := fnv.New32() + if _, err = hf.Write(bytes); err != nil { + return "", fmt.Errorf("fail to calculate pvc template hash: %w", err) + } + + return rand.SafeEncodeString(fmt.Sprint(hf.Sum32())), nil +} + +func PvcTmpHashMapping(pvcTmps []corev1.PersistentVolumeClaim) (map[string]string, error) { + pvcHashMapping := map[string]string{} + for i := range pvcTmps { + pvcTmp := pvcTmps[i] + hash, err := PvcTmpHash(&pvcTmp) + if err != nil { + return nil, err + } + pvcHashMapping[pvcTmp.Name] = hash + } + return pvcHashMapping, nil +} + +func deletePvcWithExpectations(ctx context.Context, client client.Client, xset api.XSetObject, expectations *expectations.CacheExpectations, pvc *corev1.PersistentVolumeClaim) error { + if err := client.Delete(ctx, pvc); err != nil { + return err + } + + // expect deletion + if err := expectations.ExpectDeletion(kubeutilclient.ObjectKeyString(xset), PVCGvk, pvc.GetNamespace(), pvc.GetName()); err != nil { + return err + } + return nil +} + +func setUpCache(cache cache.Cache, controller api.XSetController) error { + if err := cache.IndexField(context.TODO(), &corev1.PersistentVolumeClaim{}, FieldIndexOwnerRefUID, func(object client.Object) []string { + ownerRef := metav1.GetControllerOf(object) + if ownerRef == nil || ownerRef.Kind != controller.XSetMeta().Kind { + return nil + } + return []string{string(ownerRef.UID)} + }); err != nil { + return fmt.Errorf("failed to index by field for pvc->xset %s: %s", FieldIndexOwnerRefUID, err.Error()) + } + return nil +} diff --git a/xset/synccontrols/sync_control.go b/xset/synccontrols/sync_control.go index 66ae9a46..aa7b4ac0 100644 --- a/xset/synccontrols/sync_control.go +++ b/xset/synccontrols/sync_control.go @@ -44,6 +44,7 @@ import ( "kusionstack.io/kube-utils/xset/api" "kusionstack.io/kube-utils/xset/opslifecycle" "kusionstack.io/kube-utils/xset/resourcecontexts" + "kusionstack.io/kube-utils/xset/subresources" "kusionstack.io/kube-utils/xset/xcontrol" ) @@ -64,6 +65,7 @@ type SyncControl interface { func NewRealSyncControl(reconcileMixIn *mixin.ReconcilerMixin, xsetController api.XSetController, xControl xcontrol.TargetControl, + pvcControl subresources.PvcControl, xsetLabelAnnoManager api.XSetLabelAnnotationManager, resourceContexts resourcecontexts.ResourceContextControl, cacheExpectations expectations.CacheExpectationsInterface, @@ -93,6 +95,7 @@ func NewRealSyncControl(reconcileMixIn *mixin.ReconcilerMixin, xsetLabelAnnoMgr: xsetLabelAnnoManager, resourceContextControl: resourceContexts, xControl: xControl, + pvcControl: pvcControl, updateConfig: updateConfig, cacheExpectations: cacheExpectations, @@ -109,6 +112,7 @@ var _ SyncControl = &RealSyncControl{} type RealSyncControl struct { mixin.ReconcilerMixin xControl xcontrol.TargetControl + pvcControl subresources.PvcControl xsetController api.XSetController xsetLabelAnnoMgr api.XSetLabelAnnotationManager resourceContextControl resourcecontexts.ResourceContextControl @@ -137,8 +141,22 @@ func (r *RealSyncControl) SyncTargets(ctx context.Context, instance api.XSetObje return false, fmt.Errorf("fail to get filtered Targets: %w", err) } - // TODO list, adopt and retain pvcs pvcs (for pods) + // sync subresource + // 1. list pvcs using ownerReference + // 2. adopt and retain orphaned pvcs according to PVC retention policy + if _, enabled := subresources.GetSubresourcePvcAdapter(r.xsetController); enabled { + var existingPvcs, adoptedPvcs []*corev1.PersistentVolumeClaim + if existingPvcs, err = r.pvcControl.GetFilteredPvcs(ctx, instance); err != nil { + return false, fmt.Errorf("fail to get filtered subresource PVCs: %w", err) + } + if adoptedPvcs, err = r.pvcControl.AdoptPvcsLeftByRetainPolicy(ctx, instance); err != nil { + return false, fmt.Errorf("fail to adopt orphaned left by whenDelete retention policy PVCs: %w", err) + } + syncContext.ExistingPvcs = append(syncContext.ExistingPvcs, existingPvcs...) + syncContext.ExistingPvcs = append(syncContext.ExistingPvcs, adoptedPvcs...) + } + // sync include exclude targets toExcludeTargetNames, toIncludeTargetNames, err := r.dealIncludeExcludeTargets(ctx, instance, syncContext.FilteredTarget) if err != nil { return false, fmt.Errorf("fail to deal with include exclude targets: %s", err.Error()) @@ -194,7 +212,14 @@ func (r *RealSyncControl) SyncTargets(ctx context.Context, instance api.XSetObje } } - // TODO delete unused pvcs (for pods) + // delete unused pvcs + if _, enabled := subresources.GetSubresourcePvcAdapter(r.xsetController); enabled { + err = r.pvcControl.DeleteTargetUnusedPvcs(ctx, instance, target, syncContext.ExistingPvcs) + if err != nil { + return false, fmt.Errorf("fail to delete unused pvcs %w", err) + } + } + targetWrappers = append(targetWrappers, &targetWrapper{ Object: target, ID: id, @@ -315,7 +340,7 @@ func (r *RealSyncControl) allowIncludeExcludeTargets(ctx context.Context, xset a continue } else if err != nil { r.Recorder.Eventf(xset, corev1.EventTypeWarning, "ExcludeIncludeFailed", fmt.Sprintf("failed to find target %s: %s", targetNames[i], err.Error())) - return + return allowTargets, notAllowTargets, err } // check allowance for target @@ -325,7 +350,36 @@ func (r *RealSyncControl) allowIncludeExcludeTargets(ctx context.Context, xset a notAllowTargets.Insert(targetName) continue } - allowTargets.Insert(targetName) + + // check allowance for subresource + pvcsAllowed := true + if adapter, enabled := subresources.GetSubresourcePvcAdapter(r.xsetController); enabled { + volumes := adapter.GetXSpecVolumes(target) + for i := range volumes { + volume := volumes[i] + if volume.PersistentVolumeClaim == nil { + continue + } + pvc := &corev1.PersistentVolumeClaim{} + err = r.Client.Get(ctx, types.NamespacedName{Namespace: target.GetNamespace(), Name: volume.PersistentVolumeClaim.ClaimName}, pvc) + // if pvc not found, ignore it. In case of pvc is filtered by controller-mesh + if apierrors.IsNotFound(err) { + continue + } else if err != nil { + r.Recorder.Eventf(target, corev1.EventTypeWarning, "ExcludeIncludeNotAllowed", fmt.Sprintf("failed to check allowed to exclude/include from/to xset %s/%s: %s", xset.GetNamespace(), xset.GetName(), err.Error())) + pvcsAllowed = false + } + if allowed, reason := fn(pvc, xset.GetName(), xset.GetObjectKind().GroupVersionKind().Kind, labelMgr); !allowed { + r.Recorder.Eventf(target, corev1.EventTypeWarning, "ExcludeIncludeNotAllowed", fmt.Sprintf("failed to check allowed to exclude/include from/to xset %s/%s: %s", xset.GetNamespace(), xset.GetName(), reason)) + pvcsAllowed = false + } + } + } + if pvcsAllowed { + allowTargets.Insert(targetName) + } else { + notAllowTargets.Insert(targetName) + } } return allowTargets, notAllowTargets, nil } @@ -465,8 +519,13 @@ func (r *RealSyncControl) Scale(ctx context.Context, xsetObject api.XSetObject, if err != nil { return fmt.Errorf("fail to new Target from revision %s: %w", revision.GetName(), err) } - - // TODO create pvcs for targets (pod) + // create pvcs for targets (pod) + if _, enabled := subresources.GetSubresourcePvcAdapter(r.xsetController); enabled { + err = r.pvcControl.CreateTargetPvcs(ctx, xsetObject, target, syncContext.ExistingPvcs) + if err != nil { + return fmt.Errorf("fail to create PVCs for target %s: %w", target.GetName(), err) + } + } newTarget := target.DeepCopyObject().(client.Object) logger.Info("try to create Target with revision of "+r.xsetGVK.Kind, "revision", revision.GetName()) if target, err = r.xControl.CreateTarget(ctx, newTarget); err != nil { @@ -590,8 +649,19 @@ func (r *RealSyncControl) Scale(ctx context.Context, xsetObject api.XSetObject, } r.Recorder.Eventf(xsetObject, corev1.EventTypeNormal, "TargetDeleted", "succeed to scale in Target %s/%s", target.GetNamespace(), target.GetName()) - // TODO delete pvcs if target is in update replace, or retention policy is "Deleted" - return r.cacheExpectations.ExpectDeletion(clientutil.ObjectKeyString(xsetObject), r.targetGVK, target.GetNamespace(), target.GetName()) + if err := r.cacheExpectations.ExpectDeletion(clientutil.ObjectKeyString(xsetObject), r.targetGVK, target.GetNamespace(), target.GetName()); err != nil { + return err + } + + // delete pvcs if target is in update replace, or retention policy is "Deleted" + if _, enabled := subresources.GetSubresourcePvcAdapter(r.xsetController); enabled { + _, replaceOrigin := r.xsetLabelAnnoMgr.Get(target.Object.GetLabels(), api.XReplacePairOriginName) + _, replaceNew := r.xsetLabelAnnoMgr.Get(target.Object.GetLabels(), api.XReplacePairNewId) + if replaceOrigin || replaceNew || !r.pvcControl.RetainPvcWhenXSetScaled(xsetObject) { + return r.pvcControl.DeleteTargetPvcs(ctx, xsetObject, target.Object, syncContext.ExistingPvcs) + } + } + return nil }) scaling = scaling || succCount > 0 @@ -643,7 +713,10 @@ func (r *RealSyncControl) Update(ctx context.Context, xsetObject api.XSetObject, } // 1. scan and analysis targets update info for active targets and PlaceHolder targets - targetUpdateInfos := r.attachTargetUpdateInfo(xsetObject, syncContext) + targetUpdateInfos, err := r.attachTargetUpdateInfo(xsetObject, syncContext) + if err != nil { + return false, recordedRequeueAfter, err + } // 2. decide Target update candidates candidates := r.decideTargetToUpdate(r.xsetController, xsetObject, targetUpdateInfos) @@ -655,7 +728,7 @@ func (r *RealSyncControl) Update(ctx context.Context, xsetObject api.XSetObject, // 3. filter already updated revision, for i, targetInfo := range targetToUpdate { // TODO check decoration and pvc template changed - if targetInfo.IsUpdatedRevision { + if targetInfo.IsUpdatedRevision && !targetInfo.PvcTmpHashChanged { continue } diff --git a/xset/synccontrols/types.go b/xset/synccontrols/types.go index e84969f1..23542f02 100644 --- a/xset/synccontrols/types.go +++ b/xset/synccontrols/types.go @@ -20,6 +20,7 @@ import ( "time" appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/sets" "sigs.k8s.io/controller-runtime/pkg/client" @@ -40,9 +41,15 @@ type SyncContext struct { CurrentIDs sets.Int OwnedIds map[int]*api.ContextDetail + SubResources + NewStatus *api.XSetStatus } +type SubResources struct { + ExistingPvcs []*corev1.PersistentVolumeClaim +} + type targetWrapper struct { // parameters must be set during creation client.Object @@ -75,6 +82,7 @@ type targetUpdateInfo struct { UpdateRevision *appsv1.ControllerRevision // TODO decoration revisions + SubResourcesChanged // indicates operate is allowed for TargetOpsLifecycle. IsAllowUpdateOps bool @@ -92,3 +100,8 @@ type targetUpdateInfo struct { // replace origin target ReplacePairOriginTargetName string } + +type SubResourcesChanged struct { + // indicate if the pvc template changed + PvcTmpHashChanged bool +} diff --git a/xset/synccontrols/x_replace.go b/xset/synccontrols/x_replace.go index fc95d4c3..1d02e577 100644 --- a/xset/synccontrols/x_replace.go +++ b/xset/synccontrols/x_replace.go @@ -36,6 +36,7 @@ import ( controllerutils "kusionstack.io/kube-utils/controller/utils" "kusionstack.io/kube-utils/xset/api" "kusionstack.io/kube-utils/xset/opslifecycle" + "kusionstack.io/kube-utils/xset/subresources" ) func (r *RealSyncControl) cleanReplaceTargetLabels( @@ -162,7 +163,13 @@ func (r *RealSyncControl) replaceOriginTargets( r.xsetLabelAnnoMgr.Set(newTarget, api.XCreatingLabel, strconv.FormatInt(time.Now().UnixNano(), 10)) r.resourceContextControl.Put(newTargetContext, api.EnumRevisionContextDataKey, replaceRevision.GetName()) - // TODO create pvcs for new target (pod) + // create pvcs for new target + if _, enabled := subresources.GetSubresourcePvcAdapter(r.xsetController); enabled { + err = r.pvcControl.CreateTargetPvcs(ctx, instance, newTarget, syncContext.ExistingPvcs) + if err != nil { + return fmt.Errorf("fail to create PVCs for target %s: %w", newTarget.GetName(), err) + } + } if newCreatedTarget, err := r.xControl.CreateTarget(ctx, newTarget); err == nil { r.Recorder.Eventf(originTarget, diff --git a/xset/synccontrols/x_scale.go b/xset/synccontrols/x_scale.go index 605633ca..3901e49b 100644 --- a/xset/synccontrols/x_scale.go +++ b/xset/synccontrols/x_scale.go @@ -22,6 +22,8 @@ import ( "sort" "strconv" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" @@ -33,10 +35,11 @@ import ( controllerutils "kusionstack.io/kube-utils/controller/utils" "kusionstack.io/kube-utils/xset/api" "kusionstack.io/kube-utils/xset/opslifecycle" + "kusionstack.io/kube-utils/xset/subresources" ) // getTargetsToDelete -// 1. finds number of diff targets from filteredPods to do scaleIn +// 1. finds number of diff targets from filteredTargets to do scaleIn // 2. finds targets allowed to scale in out of diff func (r *RealSyncControl) getTargetsToDelete(xsetObject api.XSetObject, filteredTargets []*targetWrapper, replaceMapping map[string]*targetWrapper, diff int) []*targetWrapper { var countedTargets []*targetWrapper @@ -55,7 +58,7 @@ func (r *RealSyncControl) getTargetsToDelete(xsetObject api.XSetObject, filtered // 2. select targets to delete in second round according to replace, delete, exclude var needDeleteTargets []*targetWrapper for i, target := range countedTargets { - // find pods to be scaleIn out of diff, is allowed to ops + // find targets to be scaleIn out of diff, is allowed to ops spec := r.xsetController.GetXSetSpec(xsetObject) _, allowed := opslifecycle.AllowOps(r.updateConfig.xsetLabelAnnoMgr, r.scaleInLifecycleAdapter, ptr.Deref(spec.ScaleStrategy.OperationDelaySeconds, 0), target) if i >= diff && !allowed { @@ -160,8 +163,35 @@ func (r *RealSyncControl) excludeTarget(ctx context.Context, xsetObject api.XSet return err } + // exclude subresource + if adapter, enabled := subresources.GetSubresourcePvcAdapter(r.xsetController); enabled { + volumes := adapter.GetXSpecVolumes(target) + for i := range volumes { + volume := volumes[i] + if volume.PersistentVolumeClaim == nil { + continue + } + pvc := &corev1.PersistentVolumeClaim{} + err := r.Client.Get(ctx, types.NamespacedName{Namespace: target.GetNamespace(), Name: volume.PersistentVolumeClaim.ClaimName}, pvc) + // If pvc not found, ignore it. In case of pvc is filtered out by controller-mesh + if apierrors.IsNotFound(err) { + continue + } else if err != nil { + return err + } + + r.xsetLabelAnnoMgr.Set(pvc, api.XOrphanedIndicationLabelKey, "true") + if err := r.pvcControl.OrphanPvc(ctx, xsetObject, pvc); err != nil { + return err + } + } + } + r.xsetLabelAnnoMgr.Set(target, api.XOrphanedIndicationLabelKey, "true") - return r.xControl.OrphanTarget(xsetObject, target) + if err := r.xControl.OrphanTarget(xsetObject, target); err != nil { + return err + } + return r.cacheExpectations.ExpectUpdation(clientutil.ObjectKeyString(xsetObject), r.targetGVK, target.GetNamespace(), target.GetName(), target.GetResourceVersion()) } // includeTarget try to include a target into xset @@ -171,9 +201,37 @@ func (r *RealSyncControl) includeTarget(ctx context.Context, xsetObject api.XSet return err } + // exclude subresource + if adapter, enabled := subresources.GetSubresourcePvcAdapter(r.xsetController); enabled { + volumes := adapter.GetXSpecVolumes(target) + for i := range volumes { + volume := volumes[i] + if volume.PersistentVolumeClaim == nil { + continue + } + pvc := &corev1.PersistentVolumeClaim{} + err := r.Client.Get(ctx, types.NamespacedName{Namespace: target.GetNamespace(), Name: volume.PersistentVolumeClaim.ClaimName}, pvc) + // If pvc not found, ignore it. In case of pvc is filtered out by controller-mesh + if apierrors.IsNotFound(err) { + continue + } else if err != nil { + return err + } + + r.xsetLabelAnnoMgr.Set(pvc, api.XInstanceIdLabelKey, instanceId) + r.xsetLabelAnnoMgr.Delete(pvc.GetLabels(), api.XOrphanedIndicationLabelKey) + if err := r.pvcControl.AdoptPvc(ctx, xsetObject, pvc); err != nil { + return err + } + } + } + r.xsetLabelAnnoMgr.Set(target, api.XInstanceIdLabelKey, instanceId) r.xsetLabelAnnoMgr.Delete(target.GetLabels(), api.XOrphanedIndicationLabelKey) - return r.xControl.AdoptTarget(xsetObject, target) + if err := r.xControl.AdoptTarget(xsetObject, target); err != nil { + return err + } + return r.cacheExpectations.ExpectUpdation(clientutil.ObjectKeyString(xsetObject), r.targetGVK, target.GetNamespace(), target.GetName(), target.GetResourceVersion()) } // reclaimScaleStrategy updates targetToDelete, targetToExclude, targetToInclude in scaleStrategy diff --git a/xset/synccontrols/x_update.go b/xset/synccontrols/x_update.go index 73bf256b..01df9abc 100644 --- a/xset/synccontrols/x_update.go +++ b/xset/synccontrols/x_update.go @@ -41,12 +41,13 @@ import ( "kusionstack.io/kube-utils/xset/api" "kusionstack.io/kube-utils/xset/opslifecycle" "kusionstack.io/kube-utils/xset/resourcecontexts" + "kusionstack.io/kube-utils/xset/subresources" "kusionstack.io/kube-utils/xset/xcontrol" ) const UnknownRevision = "__unknownRevision__" -func (r *RealSyncControl) attachTargetUpdateInfo(xsetObject api.XSetObject, syncContext *SyncContext) []*targetUpdateInfo { +func (r *RealSyncControl) attachTargetUpdateInfo(xsetObject api.XSetObject, syncContext *SyncContext) ([]*targetUpdateInfo, error) { activeTargets := FilterOutActiveTargetWrappers(syncContext.TargetWrappers) targetUpdateInfoList := make([]*targetUpdateInfo, len(activeTargets)) @@ -88,10 +89,17 @@ func (r *RealSyncControl) attachTargetUpdateInfo(xsetObject api.XSetObject, sync "target is going to be updated by recreate because: (1) controller-revision-hash label not found, or (2) not found in history revisions") } + var err error spec := r.xsetController.GetXSetSpec(xsetObject) // decide whether the TargetOpsLifecycle is during ops or not updateInfo.RequeueForOperationDelay, updateInfo.IsAllowUpdateOps = opslifecycle.AllowOps(r.updateConfig.xsetLabelAnnoMgr, r.updateLifecycleAdapter, ptr.Deref(spec.UpdateStrategy.OperationDelaySeconds, 0), target) - // TODO check pvc template changed + // check subresource pvc template changed + if _, enabled := subresources.GetSubresourcePvcAdapter(r.xsetController); enabled { + updateInfo.PvcTmpHashChanged, err = r.pvcControl.IsTargetPvcTmpChanged(xsetObject, target.Object, syncContext.ExistingPvcs) + if err != nil { + return nil, err + } + } targetUpdateInfoList[i] = updateInfo } @@ -143,7 +151,7 @@ func (r *RealSyncControl) attachTargetUpdateInfo(xsetObject api.XSetObject, sync targetUpdateInfoList = append(targetUpdateInfoList, updateInfo) } - return targetUpdateInfoList + return targetUpdateInfoList, nil } func filterOutPlaceHolderUpdateInfos(targets []*targetUpdateInfo) []*targetUpdateInfo { @@ -367,7 +375,7 @@ func (u *GenericTargetUpdater) FilterAllowOpsTargets(ctx context.Context, candid targetInfo.IsAllowUpdateOps = true - if targetInfo.IsUpdatedRevision { + if targetInfo.IsUpdatedRevision && !targetInfo.PvcTmpHashChanged { continue } @@ -486,6 +494,10 @@ func (u *inPlaceIfPossibleUpdater) FulfillTargetUpdatedInfo(_ context.Context, r return fmt.Errorf("fail to build Target from updated revision %s: %v", targetUpdateInfo.UpdateRevision.GetName(), err.Error()) } + if targetUpdateInfo.PvcTmpHashChanged { + targetUpdateInfo.InPlaceUpdateSupport, targetUpdateInfo.OnlyMetadataChanged = false, false + } + newUpdatedTarget := targetUpdateInfo.targetWrapper.Object.DeepCopyObject().(client.Object) if err = merge.ThreeWayMergeToTarget(currentTarget, UpdatedTarget, newUpdatedTarget, u.xsetController.NewXObject()); err != nil { return fmt.Errorf("fail to patch Target %s/%s: %v", targetUpdateInfo.GetNamespace(), targetUpdateInfo.GetName(), err.Error()) @@ -615,7 +627,7 @@ func (u *replaceUpdateTargetUpdater) BeginUpdateTarget(ctx context.Context, sync func (u *replaceUpdateTargetUpdater) FilterAllowOpsTargets(_ context.Context, candidates []*targetUpdateInfo, _ map[int]*api.ContextDetail, _ *SyncContext, targetCh chan *targetUpdateInfo) (requeueAfter *time.Duration, err error) { activeTargetToUpdate := filterOutPlaceHolderUpdateInfos(candidates) for i, targetInfo := range activeTargetToUpdate { - if targetInfo.IsUpdatedRevision { + if targetInfo.IsUpdatedRevision && !targetInfo.PvcTmpHashChanged { continue } diff --git a/xset/xcontrol/target_control.go b/xset/xcontrol/target_control.go index 0c1661f1..a2a9fe33 100644 --- a/xset/xcontrol/target_control.go +++ b/xset/xcontrol/target_control.go @@ -97,7 +97,7 @@ func (r *targetControl) GetFilteredTargets(ctx context.Context, selector *metav1 return nil, fmt.Errorf("target list items is invalid") } - // todo filterOutInactiveTargets + items = filterOutInactiveTargets(r.xsetController, items) targets, err := r.getTargets(items, selector, owner) return targets, err } @@ -195,7 +195,19 @@ func setUpCache(cache cache.Cache, controller api.XSetController) error { } return []string{string(ownerRef.UID)} }); err != nil { - return fmt.Errorf("failed to index by field %s: %s", FieldIndexOwnerRefUID, err.Error()) + return fmt.Errorf("failed to index by field for x->xset %s: %s", FieldIndexOwnerRefUID, err.Error()) } return nil } + +func filterOutInactiveTargets(xsetController api.XSetController, targets []client.Object) []client.Object { + var filteredTarget []client.Object + for i := range targets { + target := targets[i] + if xsetController.CheckInactive(target) { + continue + } + filteredTarget = append(filteredTarget, target) + } + return filteredTarget +} diff --git a/xset/xset_controller.go b/xset/xset_controller.go index d4dc0951..0cf30b09 100644 --- a/xset/xset_controller.go +++ b/xset/xset_controller.go @@ -23,6 +23,7 @@ import ( "time" "github.com/go-logr/logr" + corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" @@ -44,6 +45,7 @@ import ( "kusionstack.io/kube-utils/xset/api/validation" "kusionstack.io/kube-utils/xset/resourcecontexts" "kusionstack.io/kube-utils/xset/revisionowner" + "kusionstack.io/kube-utils/xset/subresources" "kusionstack.io/kube-utils/xset/synccontrols" "kusionstack.io/kube-utils/xset/xcontrol" ) @@ -59,6 +61,7 @@ type xSetCommonReconciler struct { // reconcile logic helpers cacheExpectations *expectations.CacheExpectations targetControl xcontrol.TargetControl + pvcControl subresources.PvcControl syncControl synccontrols.SyncControl revisionManager history.HistoryManager resourceContextControl resourcecontexts.ResourceContextControl @@ -87,7 +90,11 @@ func SetUpWithManager(mgr ctrl.Manager, xsetController api.XSetController) error } cacheExpectations := expectations.NewxCacheExpectations(reconcilerMixin.Client, reconcilerMixin.Scheme, clock.RealClock{}) resourceContextControl := resourcecontexts.NewRealResourceContextControl(reconcilerMixin.Client, xsetController, resourceContextAdapter, resourceContextGVK, cacheExpectations) - syncControl := synccontrols.NewRealSyncControl(reconcilerMixin, xsetController, targetControl, xsetLabelManager, resourceContextControl, cacheExpectations) + pvcControl, err := subresources.NewRealPvcControl(reconcilerMixin, cacheExpectations, xsetLabelManager, xsetController) + if err != nil { + return errors.New("failed to create pvc control") + } + syncControl := synccontrols.NewRealSyncControl(reconcilerMixin, xsetController, targetControl, pvcControl, xsetLabelManager, resourceContextControl, cacheExpectations) revisionControl := history.NewRevisionControl(reconcilerMixin.Client, reconcilerMixin.Client) revisionOwner := revisionowner.NewRevisionOwner(xsetController, targetControl) revisionManager := history.NewHistoryManager(revisionControl, revisionOwner) @@ -98,6 +105,7 @@ func SetUpWithManager(mgr ctrl.Manager, xsetController api.XSetController) error XSetController: xsetController, meta: xsetController.XSetMeta(), finalizerName: xsetController.FinalizerName(), + pvcControl: pvcControl, syncControl: syncControl, revisionManager: revisionManager, resourceContextControl: resourceContextControl, @@ -165,21 +173,21 @@ func (r *xSetCommonReconciler) Reconcile(ctx context.Context, req reconcile.Requ } if instance.GetDeletionTimestamp() != nil { - if err := r.ensureReclaimTargetsDeletion(ctx, instance); err != nil { - // reclaim targets deletion before remove finalizers - return ctrl.Result{}, err - } if controllerutil.ContainsFinalizer(instance, r.finalizerName) { // reclaim owner IDs in ResourceContextControl if err := r.resourceContextControl.UpdateToTargetContext(ctx, instance, nil); err != nil { return ctrl.Result{}, err } - if err := clientutil.RemoveFinalizerAndUpdate(ctx, r.Client, instance, r.finalizerName); err != nil { + if cleaned, err := r.ensureReclaimTargetsDeletion(ctx, instance); !cleaned || err != nil { + // reclaim targets deletion before remove finalizers + return ctrl.Result{}, err + } + // reclaim target sub resources before remove finalizers + if err := r.ensureReclaimTargetSubResources(ctx, instance); err != nil { return ctrl.Result{}, err } } - - return ctrl.Result{}, nil + return ctrl.Result{}, clientutil.RemoveFinalizerAndUpdate(ctx, r.Client, instance, r.finalizerName) } if !controllerutil.ContainsFinalizer(instance, r.finalizerName) { @@ -238,13 +246,61 @@ func (r *xSetCommonReconciler) doSync(ctx context.Context, instance api.XSetObje return scaleRequeueAfter, err } -func (r *xSetCommonReconciler) ensureReclaimTargetsDeletion(ctx context.Context, instance api.XSetObject) error { +func (r *xSetCommonReconciler) ensureReclaimTargetSubResources(ctx context.Context, xset api.XSetObject) error { + if _, enabled := subresources.GetSubresourcePvcAdapter(r.XSetController); enabled { + err := r.ensureReclaimPvcs(ctx, xset) + if err != nil { + return err + } + } + return nil +} + +// ensureReclaimPvcs removes xset ownerReference from pvcs if RetainPvcWhenXSetDeleted. +// This allows pvcs to be retained for other xsets with same pvc template. +func (r *xSetCommonReconciler) ensureReclaimPvcs(ctx context.Context, xset api.XSetObject) error { + if !r.pvcControl.RetainPvcWhenXSetDeleted(xset) { + return nil + } + var needReclaimPvcs []*corev1.PersistentVolumeClaim + pvcs, err := r.pvcControl.GetFilteredPvcs(ctx, xset) + if err != nil { + return err + } + // reclaim pvcs if RetainPvcWhenXSetDeleted + for i := range pvcs { + owned := pvcs[i].OwnerReferences != nil && len(pvcs[i].OwnerReferences) > 0 + if owned { + needReclaimPvcs = append(needReclaimPvcs, pvcs[i]) + } + } + for i := range needReclaimPvcs { + if err := r.pvcControl.OrphanPvc(ctx, xset, needReclaimPvcs[i]); err != nil { + return err + } + } + return nil +} + +func (r *xSetCommonReconciler) ensureReclaimTargetsDeletion(ctx context.Context, instance api.XSetObject) (bool, error) { xSetSpec := r.XSetController.GetXSetSpec(instance) targets, err := r.targetControl.GetFilteredTargets(ctx, xSetSpec.Selector, instance) if err != nil { - return fmt.Errorf("fail to get filtered Targets: %s", err.Error()) + return false, fmt.Errorf("fail to get filtered Targets: %s", err.Error()) + } + // if targets are deleted, return true + if len(targets) == 0 { + return true, nil + } + // wait for all targets are terminating + for i := range targets { + target := targets[i] + if target.GetDeletionTimestamp() == nil { + r.Recorder.Eventf(instance, corev1.EventTypeNormal, "TargetsDeleted", "waiting for models to be deleted gracefully before xset deleted %s/%s", instance.GetNamespace(), instance.GetName()) + return false, r.syncControl.BatchDeleteTargetsByLabel(ctx, r.targetControl, targets) + } } - return r.syncControl.BatchDeleteTargetsByLabel(ctx, r.targetControl, targets) + return true, nil } func (r *xSetCommonReconciler) updateStatus(ctx context.Context, instance api.XSetObject, status *api.XSetStatus) error {