Skip to content

Commit 376e2a4

Browse files
committed
fix: prevent duplicate CNClaim binding to same CN Pod during migration
Add a podClaimedByOthers() check to selectCN() and Finalize() that verifies no other CNClaim references a Pod via spec.podName before the Pod is bound or reclaimed. This prevents a race during migration in which Pod labels become stale while spec.podName still holds the true binding relationship. Fixes #581, fixes #582, fixes #585.
1 parent 3de66ab commit 376e2a4

2 files changed

Lines changed: 160 additions & 0 deletions

File tree

pkg/controllers/cnclaim/controller.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,13 @@ func (r *Actor) selectCN(ctx *recon.Context[*v1alpha1.CNClaim], orphans []corev1
161161
sortCNByPriority(c, idleCNs)
162162
for i := range idleCNs {
163163
pod := &idleCNs[i]
164+
// skip pod already referenced by another CNClaim's spec.podName
165+
if claimed, err := podClaimedByOthers(ctx, c.Namespace, pod.Name, c.Name); err != nil {
166+
return nil, errors.WrapPrefix(err, "error checking pod claims", 0)
167+
} else if claimed {
168+
ctx.Log.Info("skip pod claimed by other CNClaim", "podName", pod.Name)
169+
continue
170+
}
164171
if err := r.ensureOwnership(ctx, pod); err != nil {
165172
if apierrors.IsConflict(err) {
166173
ctx.Log.Info("CN pod is not up to date, try next", "podName", pod.Name)
@@ -342,6 +349,13 @@ func (r *Actor) Finalize(ctx *recon.Context[*v1alpha1.CNClaim]) (bool, error) {
342349
}
343350
for i := range ownedCNs {
344351
cn := ownedCNs[i]
352+
// skip reclaim if another CNClaim still references this pod via spec.podName
353+
if claimed, err := podClaimedByOthers(ctx, c.Namespace, cn.Name, c.Name); err != nil {
354+
return false, errors.WrapPrefix(err, "error checking pod claims", 0)
355+
} else if claimed {
356+
ctx.Log.Info("skip reclaim, pod still claimed by other CNClaim", "pod", cn.Name)
357+
continue
358+
}
345359
ctx.Log.Info("finalize CNClaim, reclaim bound CN", "cn", cn.Name)
346360
if err := r.reclaimCN(ctx, &cn); err != nil {
347361
return false, err
@@ -350,6 +364,25 @@ func (r *Actor) Finalize(ctx *recon.Context[*v1alpha1.CNClaim]) (bool, error) {
350364
return false, nil
351365
}
352366

367+
// podClaimedByOthers checks if the given pod is referenced by any CNClaim's
368+
// spec.podName other than excludeClaim in the same namespace.
369+
func podClaimedByOthers(cli recon.KubeClient, namespace, podName, excludeClaim string) (bool, error) {
370+
claimList := &v1alpha1.CNClaimList{}
371+
if err := cli.List(claimList, client.InNamespace(namespace)); err != nil {
372+
return false, err
373+
}
374+
for i := range claimList.Items {
375+
claim := &claimList.Items[i]
376+
if claim.Name == excludeClaim {
377+
continue
378+
}
379+
if claim.Spec.PodName == podName {
380+
return true, nil
381+
}
382+
}
383+
return false, nil
384+
}
385+
353386
func (r *Actor) patchStore(ctx *recon.Context[*v1alpha1.CNClaim], pod *corev1.Pod, req logpb.CNStateLabel) (*metadata.CNService, error) {
354387
cs, err := common.ResolveCNSet(ctx, pod)
355388
if err != nil {

pkg/controllers/cnclaim/controller_test.go

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,17 +15,144 @@
1515
package cnclaim
1616

1717
import (
18+
"context"
1819
"math/rand"
1920
"testing"
2021

2122
"github.com/matrixorigin/matrixone-operator/api/core/v1alpha1"
2223
corev1 "k8s.io/api/core/v1"
2324
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
25+
"k8s.io/apimachinery/pkg/runtime"
2426
"k8s.io/utils/pointer"
27+
"sigs.k8s.io/controller-runtime/pkg/client"
28+
"sigs.k8s.io/controller-runtime/pkg/client/fake"
2529

2630
. "github.com/onsi/gomega"
2731
)
2832

33+
func newFakeClient(objs ...client.Object) client.Client {
34+
scheme := runtime.NewScheme()
35+
_ = v1alpha1.SchemeBuilder.AddToScheme(scheme)
36+
_ = corev1.AddToScheme(scheme)
37+
return fake.NewClientBuilder().WithScheme(scheme).WithObjects(objs...).Build()
38+
}
39+
40+
// fakeKubeClient adapts client.Client to recon.KubeClient for testing.
41+
type fakeKubeClient struct {
42+
client.Client
43+
}
44+
45+
func (f *fakeKubeClient) Create(obj client.Object, opts ...client.CreateOption) error {
46+
return f.Client.Create(context.TODO(), obj, opts...)
47+
}
48+
func (f *fakeKubeClient) CreateOwned(obj client.Object, opts ...client.CreateOption) error {
49+
return f.Client.Create(context.TODO(), obj, opts...)
50+
}
51+
func (f *fakeKubeClient) Get(objKey client.ObjectKey, obj client.Object) error {
52+
return f.Client.Get(context.TODO(), objKey, obj)
53+
}
54+
func (f *fakeKubeClient) Update(obj client.Object, opts ...client.UpdateOption) error {
55+
return f.Client.Update(context.TODO(), obj, opts...)
56+
}
57+
func (f *fakeKubeClient) UpdateStatus(obj client.Object, opts ...client.SubResourceUpdateOption) error {
58+
return f.Client.Status().Update(context.TODO(), obj, opts...)
59+
}
60+
func (f *fakeKubeClient) Delete(obj client.Object, opts ...client.DeleteOption) error {
61+
return f.Client.Delete(context.TODO(), obj, opts...)
62+
}
63+
func (f *fakeKubeClient) List(objList client.ObjectList, opts ...client.ListOption) error {
64+
return f.Client.List(context.TODO(), objList, opts...)
65+
}
66+
func (f *fakeKubeClient) Patch(obj client.Object, mutateFn func() error, opts ...client.PatchOption) error {
67+
return mutateFn()
68+
}
69+
func (f *fakeKubeClient) Exist(objKey client.ObjectKey, kind client.Object) (bool, error) {
70+
err := f.Client.Get(context.TODO(), objKey, kind)
71+
if err != nil {
72+
return false, client.IgnoreNotFound(err)
73+
}
74+
return true, nil
75+
}
76+
77+
func Test_podClaimedByOthers(t *testing.T) {
78+
tests := []struct {
79+
name string
80+
claims []client.Object
81+
podName string
82+
excludeClaim string
83+
want bool
84+
}{
85+
{
86+
name: "no claims exist",
87+
claims: nil,
88+
podName: "pod-1",
89+
excludeClaim: "claim-a",
90+
want: false,
91+
},
92+
{
93+
name: "pod only claimed by self",
94+
claims: []client.Object{
95+
&v1alpha1.CNClaim{
96+
ObjectMeta: metav1.ObjectMeta{Name: "claim-a", Namespace: "ns"},
97+
Spec: v1alpha1.CNClaimSpec{ClaimPodRef: v1alpha1.ClaimPodRef{PodName: "pod-1"}},
98+
},
99+
},
100+
podName: "pod-1",
101+
excludeClaim: "claim-a",
102+
want: false,
103+
},
104+
{
105+
name: "pod claimed by another claim",
106+
claims: []client.Object{
107+
&v1alpha1.CNClaim{
108+
ObjectMeta: metav1.ObjectMeta{Name: "claim-a", Namespace: "ns"},
109+
Spec: v1alpha1.CNClaimSpec{ClaimPodRef: v1alpha1.ClaimPodRef{PodName: "pod-1"}},
110+
},
111+
&v1alpha1.CNClaim{
112+
ObjectMeta: metav1.ObjectMeta{Name: "claim-b", Namespace: "ns"},
113+
Spec: v1alpha1.CNClaimSpec{ClaimPodRef: v1alpha1.ClaimPodRef{PodName: "pod-1"}},
114+
},
115+
},
116+
podName: "pod-1",
117+
excludeClaim: "claim-a",
118+
want: true,
119+
},
120+
{
121+
name: "pod not claimed by anyone",
122+
claims: []client.Object{
123+
&v1alpha1.CNClaim{
124+
ObjectMeta: metav1.ObjectMeta{Name: "claim-a", Namespace: "ns"},
125+
Spec: v1alpha1.CNClaimSpec{ClaimPodRef: v1alpha1.ClaimPodRef{PodName: "pod-2"}},
126+
},
127+
},
128+
podName: "pod-1",
129+
excludeClaim: "claim-a",
130+
want: false,
131+
},
132+
{
133+
name: "claim with empty podName is ignored",
134+
claims: []client.Object{
135+
&v1alpha1.CNClaim{
136+
ObjectMeta: metav1.ObjectMeta{Name: "claim-pending", Namespace: "ns"},
137+
Spec: v1alpha1.CNClaimSpec{},
138+
},
139+
},
140+
podName: "pod-1",
141+
excludeClaim: "claim-a",
142+
want: false,
143+
},
144+
}
145+
for _, tt := range tests {
146+
t.Run(tt.name, func(t *testing.T) {
147+
g := NewGomegaWithT(t)
148+
cli := newFakeClient(tt.claims...)
149+
got, err := podClaimedByOthers(&fakeKubeClient{cli}, "ns", tt.podName, tt.excludeClaim)
150+
g.Expect(err).NotTo(HaveOccurred())
151+
g.Expect(got).To(Equal(tt.want))
152+
})
153+
}
154+
}
155+
29156
func Test_sortCNByPriority(t *testing.T) {
30157
tests := []struct {
31158
name string

0 commit comments

Comments
 (0)