Skip to content

Commit e6fb0df

Browse files
fix: failed to keep s3 bucket retain (#595)
* fix: failed to keep s3 bucket retain * chore: tidy go.mod after bucketclaim test dependency change
1 parent b8b1684 commit e6fb0df

6 files changed

Lines changed: 305 additions & 14 deletions

File tree

api/core/v1alpha1/logset_types.go

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -85,20 +85,31 @@ func (l *LogSetSpec) GetStoreFailureTimeout() metav1.Duration {
8585
}
8686

8787
func (l *LogSetSpec) GetPVCRetentionPolicy() PVCRetentionPolicy {
88-
if l.PVCRetentionPolicy == nil {
89-
l.setDefaultRetentionPolicy()
88+
if l.PVCRetentionPolicy != nil {
89+
return *l.PVCRetentionPolicy
9090
}
91-
return *l.PVCRetentionPolicy
91+
// inherit from s3 policy if only s3 is set (e.g. old objects without pvcRetentionPolicy)
92+
if l.SharedStorage.S3 != nil && l.SharedStorage.S3.S3RetentionPolicy != nil {
93+
return *l.SharedStorage.S3.S3RetentionPolicy
94+
}
95+
return PVCRetentionPolicyDelete
9296
}
9397

9498
func (l *LogSetSpec) GetS3RetentionPolicy() *PVCRetentionPolicy {
9599
if l.SharedStorage.S3 == nil {
96100
return nil
97101
}
98-
if l.SharedStorage.S3.S3RetentionPolicy == nil {
99-
l.setDefaultRetentionPolicy()
102+
if l.SharedStorage.S3.S3RetentionPolicy != nil {
103+
p := *l.SharedStorage.S3.S3RetentionPolicy
104+
return &p
105+
}
106+
// inherit from pvc policy if only pvc is set (e.g. old objects without s3RetentionPolicy)
107+
if l.PVCRetentionPolicy != nil {
108+
p := *l.PVCRetentionPolicy
109+
return &p
100110
}
101-
return l.SharedStorage.S3.S3RetentionPolicy
111+
defaultPolicy := PVCRetentionPolicyDelete
112+
return &defaultPolicy
102113
}
103114

104115
type InitialConfig struct {

api/core/v1alpha1/logset_types_test.go

Lines changed: 110 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright 2025 Matrix Origin
1+
// Copyright 2025-2026 Matrix Origin
22
//
33
// Licensed under the Apache License, Version 2.0 (the "License");
44
// you may not use this file except in compliance with the License.
@@ -105,3 +105,112 @@ func TestSetDefaultRetentionPolicy(t *testing.T) {
105105
}
106106
}
107107
}
108+
109+
func TestGetRetentionPolicy(t *testing.T) {
110+
del := PVCRetentionPolicyDelete
111+
retain := PVCRetentionPolicyRetain
112+
s3Tpl := func(policy *PVCRetentionPolicy) SharedStorageProvider {
113+
return SharedStorageProvider{S3: &S3Provider{S3RetentionPolicy: policy}}
114+
}
115+
116+
testCases := []struct {
117+
name string
118+
logset LogSetSpec
119+
pvcPolicy PVCRetentionPolicy
120+
s3Policy *PVCRetentionPolicy
121+
}{
122+
{
123+
name: "both nil with s3 returns delete",
124+
logset: LogSetSpec{SharedStorage: s3Tpl(nil)},
125+
pvcPolicy: del,
126+
s3Policy: &del,
127+
},
128+
{
129+
name: "pvc only delete inherits to s3 getter",
130+
logset: LogSetSpec{PVCRetentionPolicy: &del, SharedStorage: s3Tpl(nil)},
131+
pvcPolicy: del,
132+
s3Policy: &del,
133+
},
134+
{
135+
name: "s3 only retain inherits to pvc getter",
136+
logset: LogSetSpec{SharedStorage: s3Tpl(&retain)},
137+
pvcPolicy: retain,
138+
s3Policy: &retain,
139+
},
140+
{
141+
name: "both set delete and retain independently",
142+
logset: LogSetSpec{
143+
PVCRetentionPolicy: &del,
144+
SharedStorage: s3Tpl(&retain),
145+
},
146+
pvcPolicy: del,
147+
s3Policy: &retain,
148+
},
149+
{
150+
name: "both set retain and delete independently",
151+
logset: LogSetSpec{
152+
PVCRetentionPolicy: &retain,
153+
SharedStorage: s3Tpl(&del),
154+
},
155+
pvcPolicy: retain,
156+
s3Policy: &del,
157+
},
158+
{
159+
name: "both set retain",
160+
logset: LogSetSpec{
161+
PVCRetentionPolicy: &retain,
162+
SharedStorage: s3Tpl(&retain),
163+
},
164+
pvcPolicy: retain,
165+
s3Policy: &retain,
166+
},
167+
{
168+
name: "both set delete",
169+
logset: LogSetSpec{
170+
PVCRetentionPolicy: &del,
171+
SharedStorage: s3Tpl(&del),
172+
},
173+
pvcPolicy: del,
174+
s3Policy: &del,
175+
},
176+
{
177+
name: "no s3 returns nil s3 policy",
178+
logset: LogSetSpec{PVCRetentionPolicy: &retain},
179+
pvcPolicy: retain,
180+
s3Policy: nil,
181+
},
182+
{
183+
name: "no s3 and no pvc returns delete",
184+
logset: LogSetSpec{},
185+
pvcPolicy: del,
186+
s3Policy: nil,
187+
},
188+
}
189+
190+
for _, c := range testCases {
191+
t.Run(c.name, func(t *testing.T) {
192+
spec := c.logset
193+
origPVC := spec.PVCRetentionPolicy
194+
var origS3 *PVCRetentionPolicy
195+
if spec.SharedStorage.S3 != nil {
196+
origS3 = spec.SharedStorage.S3.S3RetentionPolicy
197+
}
198+
199+
assert.Equal(t, c.pvcPolicy, spec.GetPVCRetentionPolicy())
200+
201+
s3Policy := spec.GetS3RetentionPolicy()
202+
if c.s3Policy == nil {
203+
assert.Nil(t, s3Policy)
204+
} else {
205+
assert.NotNil(t, s3Policy)
206+
assert.Equal(t, *c.s3Policy, *s3Policy)
207+
}
208+
209+
// getters must not mutate spec
210+
assert.Equal(t, origPVC, spec.PVCRetentionPolicy)
211+
if spec.SharedStorage.S3 != nil {
212+
assert.Equal(t, origS3, spec.SharedStorage.S3.S3RetentionPolicy)
213+
}
214+
})
215+
}
216+
}

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ require (
2323
github.com/openkruise/kruise-api v1.4.0
2424
github.com/prometheus/client_golang v1.17.0
2525
github.com/samber/lo v1.38.1
26-
github.com/stretchr/testify v1.9.0
2726
go.uber.org/multierr v1.11.0
2827
go.uber.org/zap v1.24.0
2928
golang.org/x/exp v0.0.0-20231006140011-7918f672742d
@@ -161,6 +160,7 @@ require (
161160
github.com/shirou/gopsutil/v3 v3.23.12 // indirect
162161
github.com/sirupsen/logrus v1.9.3 // indirect
163162
github.com/spf13/pflag v1.0.5 // indirect
163+
github.com/stretchr/testify v1.9.0 // indirect
164164
github.com/tidwall/btree v1.6.0 // indirect
165165
github.com/tklauser/go-sysconf v0.3.12 // indirect
166166
github.com/tklauser/numcpus v0.6.1 // indirect

pkg/controllers/bucketclaim/controller.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright 2025 Matrix Origin
1+
// Copyright 2025-2026 Matrix Origin
22
//
33
// Licensed under the Apache License, Version 2.0 (the "License");
44
// you may not use this file except in compliance with the License.
@@ -71,6 +71,16 @@ func (bca *Actor) Finalize(ctx *recon.Context[*v1alpha1.BucketClaim]) (bool, err
7171
return false, ctx.Update(bucket)
7272
}
7373

74+
// if bucket is Released (s3RetentionPolicy=Retain), skip data deletion and release the finalizer directly
75+
if bucket.Status.State == v1alpha1.StatusReleased {
76+
ctx.Log.Info(fmt.Sprintf("skip data deletion for released bucket %v (s3RetentionPolicy=Retain)", client.ObjectKeyFromObject(ctx.Obj)))
77+
controllerutil.RemoveFinalizer(bucket, v1alpha1.BucketDataFinalizer)
78+
if err := ctx.Update(bucket); err != nil {
79+
return false, err
80+
}
81+
return true, nil
82+
}
83+
7484
// if AnnAnyInstanceRunning is not set, indicates that there is no running pod instance found for its mo cluster
7585
// so there is no need to start a job to finalize bucket data.
7686
// NOTE: generally LogSet start successfully is a precondition of starting DN and CN, if no running pod found means that

pkg/controllers/bucketclaim/controller_test.go

Lines changed: 64 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright 2025 Matrix Origin
1+
// Copyright 2025-2026 Matrix Origin
22
//
33
// Licensed under the Apache License, Version 2.0 (the "License");
44
// you may not use this file except in compliance with the License.
@@ -15,12 +15,71 @@
1515
package bucketclaim
1616

1717
import (
18+
"context"
1819
"testing"
20+
"time"
1921

22+
"github.com/golang/mock/gomock"
23+
"github.com/matrixorigin/controller-runtime/pkg/fake"
2024
"github.com/matrixorigin/matrixone-operator/api/core/v1alpha1"
21-
"github.com/stretchr/testify/assert"
25+
batchv1 "k8s.io/api/batch/v1"
26+
corev1 "k8s.io/api/core/v1"
27+
apierrors "k8s.io/apimachinery/pkg/api/errors"
28+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29+
"k8s.io/apimachinery/pkg/runtime"
30+
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
31+
"sigs.k8s.io/controller-runtime/pkg/client"
32+
33+
. "github.com/onsi/gomega"
2234
)
2335

36+
func TestActor_Finalize_ReleasedSkipsS3CleanupJob(t *testing.T) {
37+
g := NewGomegaWithT(t)
38+
39+
now := metav1.NewTime(time.Now())
40+
bucket := &v1alpha1.BucketClaim{
41+
ObjectMeta: metav1.ObjectMeta{
42+
Name: "test-bucket",
43+
Namespace: "default",
44+
Finalizers: []string{v1alpha1.BucketDataFinalizer},
45+
DeletionTimestamp: &now,
46+
Annotations: map[string]string{
47+
v1alpha1.AnnAnyInstanceRunning: "true",
48+
},
49+
},
50+
Spec: v1alpha1.BucketClaimSpec{
51+
S3: &v1alpha1.S3Provider{Path: "minio-bucket/test"},
52+
},
53+
Status: v1alpha1.BucketClaimStatus{
54+
State: v1alpha1.StatusReleased,
55+
},
56+
}
57+
58+
scheme := runtime.NewScheme()
59+
utilruntime.Must(v1alpha1.AddToScheme(scheme))
60+
utilruntime.Must(batchv1.AddToScheme(scheme))
61+
utilruntime.Must(corev1.AddToScheme(scheme))
62+
63+
cli := fake.KubeClientBuilder().WithScheme(scheme).WithObjects(bucket).Build()
64+
65+
mockCtrl := gomock.NewController(t)
66+
eventEmitter := fake.NewMockEventEmitter(mockCtrl)
67+
ctx := fake.NewContext(bucket, cli, eventEmitter)
68+
69+
actor := New()
70+
ok, err := actor.Finalize(ctx)
71+
g.Expect(err).To(Succeed())
72+
g.Expect(ok).To(BeTrue())
73+
74+
updated := &v1alpha1.BucketClaim{}
75+
err = cli.Get(context.TODO(), client.ObjectKeyFromObject(bucket), updated)
76+
g.Expect(apierrors.IsNotFound(err)).To(BeTrue())
77+
78+
jobs := &batchv1.JobList{}
79+
g.Expect(cli.List(context.TODO(), jobs, client.InNamespace(bucket.Namespace))).To(Succeed())
80+
g.Expect(jobs.Items).To(BeEmpty())
81+
}
82+
2483
func TestParseEndpoint(t *testing.T) {
2584
s3 := v1alpha1.S3Provider{
2685
Path: "minio-mo/test",
@@ -56,6 +115,7 @@ else
56115
fi
57116
`
58117
endpoint, err := parseEntrypoint(&s3)
59-
assert.Nil(t, err)
60-
assert.Equal(t, expect, endpoint)
118+
g := NewGomegaWithT(t)
119+
g.Expect(err).To(Succeed())
120+
g.Expect(endpoint).To(Equal(expect))
61121
}

test/e2e/bucket_test.go

Lines changed: 102 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright 2025 Matrix Origin
1+
// Copyright 2025-2026 Matrix Origin
22
//
33
// Licensed under the Apache License, Version 2.0 (the "License");
44
// you may not use this file except in compliance with the License.
@@ -104,6 +104,107 @@ var _ = Describe("Matrix BucketClaim test", func() {
104104
}, waitBucketStatusTimeout, time.Second*2).Should(Succeed())
105105
})
106106

107+
It("Should keep S3 data when deleting a Released BucketClaim with pvc=Delete and s3=Retain", func() {
108+
By("create logset with pvc=Delete and s3=Retain")
109+
minioSecret := e2eutil.MinioSecret(env.Namespace)
110+
Expect(kubeCli.Create(ctx, minioSecret)).To(Succeed())
111+
112+
minioProvider := e2eutil.MinioShareStorage(minioSecret.Name)
113+
policyDelete := v1alpha1.PVCRetentionPolicyDelete
114+
policyRetain := v1alpha1.PVCRetentionPolicyRetain
115+
minioProvider.S3.S3RetentionPolicy = &policyRetain
116+
ls := e2eutil.NewLogSetTpl(env.Namespace, fmt.Sprintf("%s:%s", moImageRepo, moVersion))
117+
ls.Spec.PVCRetentionPolicy = &policyDelete
118+
ls.Spec.SharedStorage = minioProvider
119+
Expect(kubeCli.Create(ctx, ls)).To(Succeed())
120+
121+
var bucket *v1alpha1.BucketClaim
122+
var err error
123+
Eventually(func() error {
124+
bucket, err = v1alpha1.ClaimedBucket(kubeCli, minioProvider.S3)
125+
if err != nil || bucket == nil {
126+
return fmt.Errorf("wait bucket creating for logset %v, %v", client.ObjectKeyFromObject(ls), err)
127+
}
128+
expectedStatus := v1alpha1.BucketClaimStatus{
129+
BindTo: v1alpha1.BucketBindToMark(ls.ObjectMeta),
130+
State: v1alpha1.StatusInUse,
131+
}
132+
if !reflect.DeepEqual(expectedStatus, bucket.Status) {
133+
return fmt.Errorf("bucket status is not inuse, current %v", bucket.Status)
134+
}
135+
return nil
136+
}, waitBucketStatusTimeout, time.Second*2).Should(Succeed())
137+
138+
By("put object to s3 path")
139+
object, err := e2eminio.PutObject(minioProvider.S3.Path)
140+
Expect(err).Should(BeNil())
141+
exist, err := e2eminio.IsObjectExist(object)
142+
Expect(err).Should(BeNil())
143+
Expect(exist).Should(BeTrue())
144+
145+
By("wait logset available to set any-instance-running annotation")
146+
Eventually(func() error {
147+
if err := kubeCli.Get(ctx, client.ObjectKeyFromObject(ls), ls); err != nil {
148+
return err
149+
}
150+
if len(ls.Status.AvailableStores) > 0 {
151+
return nil
152+
}
153+
return fmt.Errorf("wait logset pod in running state")
154+
}, createLogSetTimeout, pollInterval).Should(Succeed())
155+
Eventually(func() error {
156+
if err := kubeCli.Get(ctx, client.ObjectKeyFromObject(bucket), bucket); err != nil {
157+
return err
158+
}
159+
if bucket.Annotations[v1alpha1.AnnAnyInstanceRunning] == "" {
160+
return fmt.Errorf("wait any-instance-running annotation on bucket")
161+
}
162+
return nil
163+
}, waitBucketStatusTimeout, time.Second*2).Should(Succeed())
164+
165+
By("tear down logset cluster")
166+
Expect(kubeCli.Delete(ctx, ls)).To(Succeed())
167+
Eventually(func() error {
168+
return waitLogSetDeleted(ls)
169+
}, teardownClusterTimeout, pollInterval).Should(Succeed())
170+
171+
By("bucket should in released state")
172+
Eventually(func() error {
173+
if err := kubeCli.Get(ctx, client.ObjectKeyFromObject(bucket), bucket); err != nil {
174+
return err
175+
}
176+
if bucket.DeletionTimestamp != nil {
177+
return fmt.Errorf("bucket should not be deleted before explicit bucketclaim deletion")
178+
}
179+
expectedStatus := v1alpha1.BucketClaimStatus{
180+
BindTo: "",
181+
State: v1alpha1.StatusReleased,
182+
}
183+
if !reflect.DeepEqual(expectedStatus, bucket.Status) {
184+
return fmt.Errorf("bucket status is not released, current %v", bucket.Status)
185+
}
186+
return nil
187+
}, waitBucketStatusTimeout, time.Second*2).Should(Succeed())
188+
189+
By("delete released bucket claim with finalizer intact")
190+
Expect(bucket.Finalizers).To(ContainElement(v1alpha1.BucketDataFinalizer))
191+
Expect(kubeCli.Delete(ctx, bucket)).To(Succeed())
192+
193+
By("wait bucket claim been deleted")
194+
Eventually(func() error {
195+
err := kubeCli.Get(ctx, client.ObjectKeyFromObject(bucket), bucket)
196+
if apierrors.IsNotFound(err) {
197+
return nil
198+
}
199+
return fmt.Errorf("bucket should be deleted")
200+
}, waitBucketStatusTimeout, time.Second*2).Should(Succeed())
201+
202+
By("s3 object should still exist")
203+
exist, err = e2eminio.IsObjectExist(object)
204+
Expect(err).Should(BeNil())
205+
Expect(exist).Should(BeTrue())
206+
})
207+
107208
It("Should bucket been deleted use delete retain policy", func() {
108209
By("create logset cluster with minio provider")
109210
minioSecret := e2eutil.MinioSecret(env.Namespace)

0 commit comments

Comments
 (0)