Skip to content

Commit 139fdfa

Browse files
chore: backport changes from upstream repo (#1138)
2 parents 92a5aea + 9933de9 commit 139fdfa

7 files changed

Lines changed: 398 additions & 14 deletions

File tree

pkg/controllers/updaterun/execution.go

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -170,13 +170,19 @@ func (r *Reconciler) executeUpdatingStage(
170170
}
171171

172172
// Now the cluster has to be updating, the binding should point to the right resource snapshot and the binding should be bound.
173-
if !isBindingSyncedWithClusterStatus(resourceSnapshotName, updateRun, binding, clusterStatus) || binding.Spec.State != placementv1beta1.BindingStateBound ||
174-
!condition.IsConditionStatusTrue(meta.FindStatusCondition(binding.Status.Conditions, string(placementv1beta1.ResourceBindingRolloutStarted)), binding.Generation) {
175-
unexpectedErr := controller.NewUnexpectedBehaviorError(fmt.Errorf("the updating cluster `%s` in the stage %s does not match the cluster status: %+v, binding: %+v, condition: %+v",
176-
clusterStatus.ClusterName, updatingStageStatus.StageName, clusterStatus, binding.Spec, binding.GetCondition(string(placementv1beta1.ResourceBindingRolloutStarted))))
177-
klog.ErrorS(unexpectedErr, "The binding has been changed during updating, please check if there's concurrent clusterStagedUpdateRun", "clusterStagedUpdateRun", updateRunRef)
178-
markClusterUpdatingFailed(clusterStatus, updateRun.Generation, unexpectedErr.Error())
179-
return 0, fmt.Errorf("%w: %s", errStagedUpdatedAborted, unexpectedErr.Error())
173+
inSync := isBindingSyncedWithClusterStatus(resourceSnapshotName, updateRun, binding, clusterStatus)
174+
rolloutStarted := condition.IsConditionStatusTrue(meta.FindStatusCondition(binding.Status.Conditions, string(placementv1beta1.ResourceBindingRolloutStarted)), binding.Generation)
175+
if !inSync || !rolloutStarted || binding.Spec.State != placementv1beta1.BindingStateBound {
176+
// This issue mostly happens when there are concurrent updateRuns referencing the same clusterResourcePlacement but releasing different versions.
177+
// After the 1st updateRun updates the binding, and before the controller re-checks the binding status, the 2nd updateRun updates the same binding, and thus the 1st updateRun is preempted and observes the binding not matching the desired state.
178+
preemptedErr := controller.NewUserError(fmt.Errorf("the clusterResourceBinding of the updating cluster `%s` in the stage `%s` is not up-to-date with the desired status, "+
179+
"please check the status of binding `%s` and see if there is a concurrent updateRun referencing the same clusterResourcePlacement and updating the same cluster",
180+
clusterStatus.ClusterName, updatingStageStatus.StageName, klog.KObj(binding)))
181+
klog.ErrorS(preemptedErr, "The binding has been changed during updating",
182+
"bindingSpecInSync", inSync, "bindingState", binding.Spec.State,
183+
"bindingRolloutStarted", rolloutStarted, "binding", klog.KObj(binding), "clusterStagedUpdateRun", updateRunRef)
184+
markClusterUpdatingFailed(clusterStatus, updateRun.Generation, preemptedErr.Error())
185+
return 0, fmt.Errorf("%w: %s", errStagedUpdatedAborted, preemptedErr.Error())
180186
}
181187

182188
finished, updateErr := checkClusterUpdateResult(binding, clusterStatus, updatingStageStatus, updateRun)

pkg/controllers/workapplier/metrics.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ import (
2929
)
3030

3131
// Note (chenyu1):
32-
// the metrics for the work applier are added the assumptions that:
32+
// the metrics for the work applier are added under the assumptions that:
3333
// a) the applier should provide low-cardinality metrics only to keep the memory footprint low,
3434
// as there might be a high number (up to 10K) of works/manifests to process in a member cluster; and
3535
// b) keep the overhead of metrics collection low, esp. considering that the applier runs in
@@ -47,6 +47,8 @@ const (
4747
manifestDriftOrDiffDetectionStatusNotFound = "NotFound"
4848
)
4949

50+
// trackWorkAndManifestProcessingRequestMetrics tracks the work and manifest processing request metrics.
51+
// It is called right after the status of the work is refreshed.
5052
func trackWorkAndManifestProcessingRequestMetrics(work *fleetv1beta1.Work) {
5153
// Increment the work processing request counter.
5254

pkg/metrics/metrics.go

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -93,12 +93,18 @@ var (
9393
// total number of work object processing requests.
9494
//
9595
// The following labels are available:
96-
// * apply_status: the apply status of the processing request; values can
97-
// be "applied", "failed", or "skipped".
96+
// * apply_status: the apply status of the processing request; see the list of
97+
// work apply condition reasons in the work applier source code
98+
// (pkg/controller/workapplier/controller.go) for possible values.
99+
// if the work object does not need to be applied, the value is "Skipped".
98100
// * availability_status: the availability check status of the processing request;
99-
// values can be "available", "unavailable", or "skipped".
101+
// see the list of availability check condition reasons in the work applier source
102+
// code (pkg/controller/workapplier/controller.go) for possible values.
103+
// if the work object does not need an availability check, the value is "Skipped".
100104
// * diff_reporting_status: the diff reporting status of the processing request;
101-
// values can be "reported", "failed", or "skipped".
105+
// see the list of diff reporting condition reasons in the work applier source
106+
// code (pkg/controller/workapplier/controller.go) for possible values.
107+
// if the work object does not need a diff reporting, the value is "Skipped".
102108
FleetWorkProcessingRequestsTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
103109
Name: "fleet_work_processing_requests_total",
104110
Help: "Total number of processing requests of work objects, including retries and periodic checks",
@@ -110,16 +116,19 @@ var (
110116
// The following labels are available:
111117
// * apply_status: the apply status of the processing request; see the list of
112118
// apply result types in the work applier source code for possible values.
119+
// if the manifest object does not need to be applied, the value is "Skipped".
113120
// * availability_status: the availability check status of the processing request;
114121
// see the list of availability check result types in the work applier source code
115122
// for possible values.
123+
// if the manifest object does not need an availability check, the value is "Skipped".
116124
// * diff_reporting_status: the diff reporting status of the processing request;
117125
// see the list of diff reporting result types in the work applier source code
118126
// for possible values.
127+
// if the manifest object does not need a diff reporting, the value is "Skipped".
119128
// * drift_detection_status: the drift detection status of the processing request;
120-
// values can be "found" and "not_found".
129+
// values can be "Found" and "NotFound".
121130
// * diff_detection_status: the diff detection status of the processing request;
122-
// values can be "found" and "not_found".
131+
// values can be "Found" and "NotFound".
123132
FleetManifestProcessingRequestsTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
124133
Name: "fleet_manifest_processing_requests_total",
125134
Help: "Total number of processing requests of manifest objects, including retries and periodic checks",

pkg/utils/common.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,12 @@ var (
171171
Kind: placementv1beta1.ClusterResourcePlacementKind,
172172
}
173173

174+
ClusterResourcePlacementEvictionMetaGVK = metav1.GroupVersionKind{
175+
Group: placementv1beta1.GroupVersion.Group,
176+
Version: placementv1beta1.GroupVersion.Version,
177+
Kind: placementv1beta1.ClusterResourcePlacementEvictionKind,
178+
}
179+
174180
ConfigMapGVK = schema.GroupVersionKind{
175181
Group: corev1.GroupName,
176182
Version: corev1.SchemeGroupVersion.Version,
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
Copyright 2025 The KubeFleet Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
// Package validator provides utils to validate ClusterResourcePlacementEviction resources.
18+
package validator
19+
20+
import (
21+
"fmt"
22+
23+
"k8s.io/apimachinery/pkg/util/errors"
24+
25+
fleetv1beta1 "go.goms.io/fleet/apis/placement/v1beta1"
26+
)
27+
28+
// ValidateClusterResourcePlacementForEviction validates cluster resource placement fields for eviction and returns error.
29+
func ValidateClusterResourcePlacementForEviction(crp fleetv1beta1.ClusterResourcePlacement) error {
30+
allErr := make([]error, 0)
31+
32+
// Check Cluster Resource Placement is not deleting
33+
if crp.DeletionTimestamp != nil {
34+
allErr = append(allErr, fmt.Errorf("cluster resource placement %s is being deleted", crp.Name))
35+
return errors.NewAggregate(allErr)
36+
}
37+
// Check Cluster Resource Placement Policy
38+
if crp.Spec.Policy != nil {
39+
if crp.Spec.Policy.PlacementType == fleetv1beta1.PickFixedPlacementType {
40+
allErr = append(allErr, fmt.Errorf("cluster resource placement policy type %s is not supported", crp.Spec.Policy.PlacementType))
41+
}
42+
}
43+
44+
return errors.NewAggregate(allErr)
45+
}
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
Copyright 2025 The KubeFleet Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
// Package clusterresourceplacementeviction provides a validating webhook for the clusterresourceplacementeviction custom resource in the KubeFleet API group.
18+
package clusterresourceplacementeviction
19+
20+
import (
21+
"context"
22+
"fmt"
23+
"net/http"
24+
25+
k8serrors "k8s.io/apimachinery/pkg/api/errors"
26+
"k8s.io/apimachinery/pkg/types"
27+
"k8s.io/klog/v2"
28+
"sigs.k8s.io/controller-runtime/pkg/client"
29+
"sigs.k8s.io/controller-runtime/pkg/manager"
30+
"sigs.k8s.io/controller-runtime/pkg/webhook"
31+
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
32+
33+
fleetv1beta1 "go.goms.io/fleet/apis/placement/v1beta1"
34+
"go.goms.io/fleet/pkg/utils"
35+
"go.goms.io/fleet/pkg/utils/condition"
36+
"go.goms.io/fleet/pkg/utils/validator"
37+
)
38+
39+
var (
40+
// ValidationPath is the webhook service path which admission requests are routed to for validating clusterresourceplacementeviction resources.
41+
ValidationPath = fmt.Sprintf(utils.ValidationPathFmt, fleetv1beta1.GroupVersion.Group, fleetv1beta1.GroupVersion.Version, "clusterresourceplacementeviction")
42+
)
43+
44+
// clusterResourcePlacementEvictionValidator validates ClusterResourcePlacementEviction admission requests.
type clusterResourcePlacementEvictionValidator struct {
45+
client client.Client // used to fetch the ClusterResourcePlacement referenced by an eviction
46+
decoder webhook.AdmissionDecoder // decodes admission request payloads into typed objects
47+
}
48+
49+
// Add registers the validating webhook for ClusterResourcePlacementEviction objects with the manager's webhook server.
50+
func Add(mgr manager.Manager) error {
51+
hookServer := mgr.GetWebhookServer()
52+
hookServer.Register(ValidationPath, &webhook.Admission{Handler: &clusterResourcePlacementEvictionValidator{mgr.GetClient(), admission.NewDecoder(mgr.GetScheme())}})
53+
return nil
54+
}
55+
56+
// Handle validates a ClusterResourcePlacementEviction admission request: it denies the request
// if the referenced ClusterResourcePlacement does not exist or fails eviction validation.
57+
func (v *clusterResourcePlacementEvictionValidator) Handle(ctx context.Context, req admission.Request) admission.Response {
58+
var crpe fleetv1beta1.ClusterResourcePlacementEviction
59+
klog.V(2).InfoS("Validating webhook handling cluster resource placement eviction", "operation", req.Operation, "clusterResourcePlacementEviction", req.Name)
60+
if err := v.decoder.Decode(req, &crpe); err != nil {
61+
klog.ErrorS(err, "Failed to decode cluster resource placement eviction object for validating fields", "userName", req.UserInfo.Username, "groups", req.UserInfo.Groups, "clusterResourcePlacementEviction", req.Name)
62+
return admission.Errored(http.StatusBadRequest, err)
63+
}
64+
65+
// Get the ClusterResourcePlacement object referenced by the eviction.
66+
var crp fleetv1beta1.ClusterResourcePlacement
67+
if err := v.client.Get(ctx, types.NamespacedName{Name: crpe.Spec.PlacementName}, &crp); err != nil {
68+
if k8serrors.IsNotFound(err) {
69+
// A missing placement makes the eviction invalid; deny (a clear verdict) rather than error.
klog.V(2).InfoS(condition.EvictionInvalidMissingCRPMessage, "clusterResourcePlacementEviction", crpe.Name, "clusterResourcePlacement", crpe.Spec.PlacementName)
70+
return admission.Denied(err.Error())
71+
}
72+
// NOTE(review): a non-NotFound Get failure is a server-side problem, yet it is reported as
// StatusBadRequest; StatusInternalServerError may be more accurate — confirm intent.
return admission.Errored(http.StatusBadRequest, fmt.Errorf("failed to get clusterResourcePlacement %s for clusterResourcePlacementEviction %s: %w", crpe.Spec.PlacementName, crpe.Name, err))
73+
}
74+
75+
if err := validator.ValidateClusterResourcePlacementForEviction(crp); err != nil {
76+
klog.V(2).ErrorS(err, "ClusterResourcePlacement has invalid fields, request is denied", "operation", req.Operation, "clusterResourcePlacementEviction", crpe.Name)
77+
return admission.Denied(err.Error())
78+
}
79+
80+
klog.V(2).InfoS("ClusterResourcePlacementEviction has valid fields", "clusterResourcePlacementEviction", crpe.Name)
81+
return admission.Allowed("clusterResourcePlacementEviction has valid fields")
82+
}

0 commit comments

Comments
 (0)