Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion api/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.19
require (
github.com/blang/semver/v4 v4.0.0
github.com/go-errors/errors v1.5.1
github.com/google/go-cmp v0.6.0
github.com/matrixorigin/controller-runtime v0.0.0-20240909085031-5f706d779ec6
github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826
github.com/onsi/gomega v1.27.7
Expand All @@ -31,7 +32,6 @@ require (
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/google/gnostic v0.5.7-v3refs // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/pprof v0.0.0-20230510103437-eeec1cb781c3 // indirect
github.com/google/uuid v1.6.0 // indirect
Expand Down
82 changes: 62 additions & 20 deletions pkg/controllers/cnclaim/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,13 @@ func (r *Actor) Sync(ctx *recon.Context[*v1alpha1.CNClaim]) error {
return recon.ErrReSync("pod status may be not update to date, wait", waitCacheTimeout)
}
c.Status.Phase = v1alpha1.CNClaimPhaseLost
if err := ctx.Patch(c, func() error {
c.Spec.PodName = ""
c.Spec.NodeName = ""
return nil
}); err != nil {
return errors.WrapPrefix(err, "error clearing lost claim spec", 0)
}
return nil
}
}
Expand Down Expand Up @@ -357,17 +364,24 @@ func (r *Actor) Finalize(ctx *recon.Context[*v1alpha1.CNClaim]) (bool, error) {
}
for i := range ownedCNs {
cn := ownedCNs[i]
// skip reclaim if another CNClaim still references this pod via spec.podName
// if another CNClaim references this pod via spec.podName, release our
// claimed-by label so the owning claim can take full ownership
if holder, ok := claimIndex[cn.Name]; ok {
ctx.Log.Info("skip reclaim, pod still claimed by other CNClaim", "pod", cn.Name, "holder", holder)
ctx.Log.Info("release pod label, pod claimed by other CNClaim", "pod", cn.Name, "holder", holder)
if err := ctx.Patch(&cn, func() error {
delete(cn.Labels, v1alpha1.PodClaimedByLabel)
return nil
}); err != nil {
return false, errors.WrapPrefix(err, "error releasing pod label", 0)
}
continue
}
ctx.Log.Info("finalize CNClaim, reclaim bound CN", "cn", cn.Name)
if err := r.reclaimCN(ctx, &cn); err != nil {
return false, err
}
}
return false, nil
return true, nil
}

// podClaimedByOthers checks if the given pod is referenced by any CNClaim's
Expand Down Expand Up @@ -453,27 +467,55 @@ func (r *Actor) patchStore(ctx *recon.Context[*v1alpha1.CNClaim], pod *corev1.Po
func (r *Actor) Start(mgr manager.Manager) error {
return recon.Setup(&v1alpha1.CNClaim{}, "cn-claim-manager", mgr, r,
recon.WithPredicate(predicate.ResourceVersionChangedPredicate{}),
recon.WithBuildFn(watchPodChange),
recon.WithBuildFn(watchPodChangeFn(mgr.GetClient())),
)
}

func watchPodChange(b *builder.Builder) {
b.Watches(&corev1.Pod{}, handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, object client.Object) []reconcile.Request {
pod, ok := object.(*corev1.Pod)
if !ok {
return nil
}
claimName, ok := pod.Labels[v1alpha1.PodClaimedByLabel]
if !ok {
return nil
// watchPodChangeFn returns a builder hook that registers a Pod watch on the
// controller. A Pod event is mapped to reconcile requests for:
//   - the CNClaim named by the Pod's claimed-by label, if the label is set;
//   - every CNClaim in the Pod's namespace whose spec.podName equals the Pod name.
//
// cli is used to list the CNClaims of the Pod's namespace on each event. The
// watch is filtered by common.PodStatusChangedPredicate.
func watchPodChangeFn(cli client.Reader) func(*builder.Builder) {
	mapPodToClaims := func(ctx context.Context, object client.Object) []reconcile.Request {
		pod, isPod := object.(*corev1.Pod)
		if !isPod {
			return nil
		}

		var reqs []reconcile.Request

		// Claim recorded on the Pod itself through the claimed-by label.
		if owner, labeled := pod.Labels[v1alpha1.PodClaimedByLabel]; labeled {
			key := types.NamespacedName{Namespace: pod.Namespace, Name: owner}
			reqs = append(reqs, reconcile.Request{NamespacedName: key})
		}

		// Claims that point at the Pod through spec.podName.
		claims := &v1alpha1.CNClaimList{}
		if err := cli.List(ctx, claims, client.InNamespace(pod.Namespace)); err != nil {
			// NOTE(review): the list error is dropped here, so only the
			// label-based request (if any) is enqueued when listing fails.
			return reqs
		}
		for i := range claims.Items {
			if claims.Items[i].Spec.PodName != pod.Name {
				continue
			}
			key := types.NamespacedName{Namespace: pod.Namespace, Name: claims.Items[i].Name}
			candidate := reconcile.Request{NamespacedName: key}
			// The label-based claim may also reference the Pod by name; enqueue it once.
			if containsRequest(reqs, candidate) {
				continue
			}
			reqs = append(reqs, candidate)
		}
		return reqs
	}

	return func(b *builder.Builder) {
		b.Watches(
			&corev1.Pod{},
			handler.EnqueueRequestsFromMapFunc(mapPodToClaims),
			builder.WithPredicates(common.PodStatusChangedPredicate{}),
		)
	}
}

func containsRequest(reqs []reconcile.Request, req reconcile.Request) bool {
for _, r := range reqs {
if r.NamespacedName == req.NamespacedName {
return true
}
return []reconcile.Request{{
NamespacedName: types.NamespacedName{
Namespace: pod.Namespace,
Name: claimName,
},
}}
}), builder.WithPredicates(common.PodStatusChangedPredicate{}))
}
return false
}

func toStoreStatus(cn *metadata.CNService, pod *corev1.Pod) v1alpha1.CNStoreStatus {
Expand Down
187 changes: 187 additions & 0 deletions pkg/controllers/cnclaim/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/pointer"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

. "github.com/onsi/gomega"
)
Expand Down Expand Up @@ -198,6 +200,191 @@ func Test_buildPodClaimIndex(t *testing.T) {
g.Expect(index).To(Equal(map[string]string{"pod-2": "other"}))
}

// Test_Finalize_releasesLabelWhenPodClaimedByOther replays, against a fake
// client, the steps Finalize takes for a pod that is labelled as claimed by
// one CNClaim while another CNClaim references it via spec.podName: list the
// owned pods, build the claim index, and drop the claimed-by label.
// The test does not invoke Finalize itself.
func Test_Finalize_releasesLabelWhenPodClaimedByOther(t *testing.T) {
	g := NewGomegaWithT(t)

	// Setup: claim-a stands for the claim being finalized and owns pod-1 via label.
	// claim-b references pod-1 via spec.podName (migration target).
	pod := &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "pod-1",
			Namespace: "ns",
			Labels: map[string]string{
				v1alpha1.CNPodPhaseLabel:   v1alpha1.CNPodPhaseBound,
				v1alpha1.PodClaimedByLabel: "claim-a",
			},
		},
	}
	claimA := &v1alpha1.CNClaim{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "claim-a",
			Namespace: "ns",
		},
		Spec: v1alpha1.CNClaimSpec{ClaimPodRef: v1alpha1.ClaimPodRef{PodName: "pod-1"}},
	}
	claimB := &v1alpha1.CNClaim{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "claim-b",
			Namespace: "ns",
		},
		Spec: v1alpha1.CNClaimSpec{ClaimPodRef: v1alpha1.ClaimPodRef{PodName: "pod-1"}},
	}

	cli := newFakeClient(pod, claimA, claimB)
	kube := &fakeKubeClient{cli}

	// Simulate what Finalize does: list owned pods, build claim index, release label.
	podList := &corev1.PodList{}
	g.Expect(kube.List(podList, client.InNamespace("ns"), client.MatchingLabels{
		v1alpha1.CNPodPhaseLabel:   v1alpha1.CNPodPhaseBound,
		v1alpha1.PodClaimedByLabel: "claim-a",
	})).To(Succeed())
	// Take the listed items directly; no placeholder slice is needed beforehand.
	ownedCNs := podList.Items
	g.Expect(ownedCNs).To(HaveLen(1))

	// Build claim index excluding self (claim-a).
	claimIndex, err := buildPodClaimIndex(kube, "ns", "claim-a")
	g.Expect(err).NotTo(HaveOccurred())
	// claim-b holds pod-1.
	g.Expect(claimIndex).To(HaveKeyWithValue("pod-1", "claim-b"))

	// The Finalize fix: when pod is in claimIndex, release the claimed-by label.
	cn := ownedCNs[0]
	_, inIndex := claimIndex[cn.Name]
	g.Expect(inIndex).To(BeTrue())

	// Simulate ctx.Patch: release the label.
	g.Expect(kube.Patch(&cn, func() error {
		delete(cn.Labels, v1alpha1.PodClaimedByLabel)
		return nil
	})).To(Succeed())

	// Verify: the patched pod object no longer has the claimed-by label.
	g.Expect(cn.Labels).NotTo(HaveKey(v1alpha1.PodClaimedByLabel))

	// After all pods processed, Finalize should return (true, nil); that return
	// value is not exercised here because Finalize is not called.
}

// Test_Sync_clearsSpecOnPodNotFound replays, against a fake client, the steps
// Sync takes when the pod referenced by a CNClaim cannot be fetched: mark the
// claim Lost and clear spec.podName / spec.nodeName through a patch.
// The test does not invoke Sync itself.
func Test_Sync_clearsSpecOnPodNotFound(t *testing.T) {
	g := NewGomegaWithT(t)

	// Setup: claim references a pod that is not registered with the fake client.
	claim := &v1alpha1.CNClaim{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "claim-lost",
			Namespace: "ns",
		},
		Spec: v1alpha1.CNClaimSpec{
			ClaimPodRef: v1alpha1.ClaimPodRef{
				PodName:  "pod-deleted",
				NodeName: "node-1",
			},
		},
		Status: v1alpha1.CNClaimStatus{
			Phase: v1alpha1.CNClaimPhaseBound,
		},
	}

	cli := newFakeClient(claim)
	kube := &fakeKubeClient{cli}

	// Simulate what Sync does on Pod NotFound:
	// 1. Fetching the pod fails.
	pod := &corev1.Pod{}
	err := kube.Get(types.NamespacedName{Namespace: "ns", Name: "pod-deleted"}, pod)
	g.Expect(err).To(HaveOccurred()) // expected to be NotFound; the error kind is not asserted

	// 2. Set phase to Lost and clear spec via Patch.
	claim.Status.Phase = v1alpha1.CNClaimPhaseLost
	g.Expect(kube.Patch(claim, func() error {
		claim.Spec.PodName = ""
		claim.Spec.NodeName = ""
		return nil
	})).To(Succeed())

	// Verify: spec is cleared, phase is Lost.
	// An empty PodName is the key guarantee of the fix: per the original test
	// notes, Actor.Observe checks ctx.Obj.Spec.PodName == "" to route the claim
	// back to Bind on the next reconcile.
	g.Expect(claim.Spec.PodName).To(BeEmpty())
	g.Expect(claim.Spec.NodeName).To(BeEmpty())
	g.Expect(claim.Status.Phase).To(Equal(v1alpha1.CNClaimPhaseLost))
}

// Test_watchPodChangeFn_enqueuesClaimBySpecPodName mirrors the mapping logic of
// watchPodChangeFn for a pod that carries no claimed-by label but is referenced
// by a CNClaim through spec.podName, and expects exactly that claim to be
// enqueued. The mapping steps are reproduced inline rather than called.
func Test_watchPodChangeFn_enqueuesClaimBySpecPodName(t *testing.T) {
	g := NewGomegaWithT(t)

	// Fixture: an unlabelled pod and one claim pointing at it by name.
	pod := &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "pod-1",
			Namespace: "ns",
			Labels:    map[string]string{},
		},
	}
	claim := &v1alpha1.CNClaim{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "claim-refs-pod",
			Namespace: "ns",
		},
		Spec: v1alpha1.CNClaimSpec{
			ClaimPodRef: v1alpha1.ClaimPodRef{PodName: "pod-1"},
		},
	}
	cli := newFakeClient(pod, claim)

	// Step 1: the label lookup contributes nothing because the label is absent.
	var enqueued []reconcile.Request
	owner, labeled := pod.Labels[v1alpha1.PodClaimedByLabel]
	if labeled {
		key := types.NamespacedName{Namespace: pod.Namespace, Name: owner}
		enqueued = append(enqueued, reconcile.Request{NamespacedName: key})
	}
	g.Expect(enqueued).To(BeEmpty())

	// Step 2: scan the namespace's claims for spec.podName matches.
	claims := &v1alpha1.CNClaimList{}
	g.Expect(cli.List(context.TODO(), claims, client.InNamespace("ns"))).To(Succeed())
	for idx := range claims.Items {
		item := &claims.Items[idx]
		if item.Spec.PodName != pod.Name {
			continue
		}
		key := types.NamespacedName{Namespace: pod.Namespace, Name: item.Name}
		candidate := reconcile.Request{NamespacedName: key}
		if containsRequest(enqueued, candidate) {
			continue
		}
		enqueued = append(enqueued, candidate)
	}

	// The claim is enqueued even though the pod has no claimed-by label.
	g.Expect(enqueued).To(HaveLen(1))
	g.Expect(enqueued[0].Name).To(Equal("claim-refs-pod"))
}

// Test_containsRequest checks containsRequest for a request that is present,
// a request that is absent, and a lookup in a nil slice.
func Test_containsRequest(t *testing.T) {
	g := NewGomegaWithT(t)

	// requestFor builds a request for the named claim in namespace "ns".
	requestFor := func(name string) reconcile.Request {
		return reconcile.Request{
			NamespacedName: types.NamespacedName{Namespace: "ns", Name: name},
		}
	}
	known := []reconcile.Request{requestFor("claim-a"), requestFor("claim-b")}

	cases := []struct {
		in     []reconcile.Request
		target reconcile.Request
		want   bool
	}{
		{in: known, target: requestFor("claim-a"), want: true},
		{in: known, target: requestFor("claim-c"), want: false},
		{in: nil, target: requestFor("claim-a"), want: false},
	}
	for _, tc := range cases {
		g.Expect(containsRequest(tc.in, tc.target)).To(Equal(tc.want))
	}
}

func Test_sortCNByPriority(t *testing.T) {
tests := []struct {
name string
Expand Down
10 changes: 9 additions & 1 deletion pkg/controllers/cnclaimset/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,13 @@ func (c *ClaimAndPod) scoreHasPod() int {

func (r *Actor) scaleIn(ctx *recon.Context[*v1alpha1.CNClaimSet], oc *ownedClaims, count int) error {
var cps []ClaimAndPod
var migrating []v1alpha1.CNClaim
for i := range oc.owned {
c := oc.owned[i]
if c.Spec.SourcePod != nil {
migrating = append(migrating, c)
continue
}
pod, err := getClaimedPod(ctx, &c)
if err != nil {
return errors.WrapPrefix(err, "error get claimed Pod", 0)
Expand All @@ -226,8 +231,10 @@ func (r *Actor) scaleIn(ctx *recon.Context[*v1alpha1.CNClaimSet], oc *ownedClaim
Pod: pod,
})
}
if len(migrating) > 0 {
ctx.Log.Info("skip migrating claims from scale-in", "count", len(migrating))
}
if count >= len(cps) {
// simply delete all claims
count = len(cps)
} else {
sortClaimsToDelete(cps)
Expand All @@ -246,6 +253,7 @@ func (r *Actor) scaleIn(ctx *recon.Context[*v1alpha1.CNClaimSet], oc *ownedClaim
for ; i < len(cps); i++ {
left = append(left, *cps[i].Claim)
}
left = append(left, migrating...)
oc.owned = left
return nil
}
Expand Down
Loading
Loading