Skip to content

Commit 6f11e0d

Browse files
authored
fix(store): cannot scale in pending pod of store (#6531)
1 parent a908a7a commit 6f11e0d

12 files changed

Lines changed: 219 additions & 62 deletions

File tree

pkg/controllers/common/store_offline.go

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -39,12 +39,16 @@ import (
3939
// - B1: state == Preparing || state == Serving
4040
// - B2: state == Removing
4141
// - B3: state == Removed
42+
// - B4: store does not exist
4243
// 1. (A1, B1): do nothing, waiting for changes of spec.offline
4344
// 2. (A2, B1): call delete store api
4445
// 3. (A1, B2): call cancel api
4546
// 4. (A2, B2): do nothing, waiting for changes of state
4647
// 5. (A1, B3): do nothing, waiting for instance being removed
4748
// 6. (A2, B3): do nothing, waiting for instance being removed
49+
// 7. (A1, B4): do nothing, waiting for pod being created
50+
// 8. (A2, B4): do nothing, waiting for instance being removed
51+
// nolint:gocyclo // refactor later if necessary
4852
func TaskOfflineStore[
4953
S scope.Instance[F, T],
5054
F client.Object,
@@ -57,31 +61,39 @@ func TaskOfflineStore[
5761
storeID string,
5862
state pdv1.NodeState,
5963
) error {
60-
if storeID == "" {
61-
// can do nothing until store id is found
62-
return fmt.Errorf("%w: store does not exists", task.ErrWait)
63-
}
6464
isOffline := coreutil.IsOffline[S](obj)
6565

6666
switch {
67+
case !isOffline && storeID == "":
68+
// do nothing because store does not exist
69+
return nil
70+
71+
case isOffline && storeID == "":
72+
return fmt.Errorf("%w: store does not exist and is offline", task.ErrWait)
73+
6774
case isOffline && (state == pdv1.NodeStatePreparing || state == pdv1.NodeStateServing):
6875
if err := c.DeleteStore(ctx, storeID); err != nil {
6976
return err
7077
}
7178

7279
return fmt.Errorf("%w: node state should be changed to Removing", task.ErrWait)
80+
7381
case isOffline && state == pdv1.NodeStateRemoving:
7482
return fmt.Errorf("%w: node state should be changed to Removed", task.ErrWait)
83+
7584
case isOffline && state == pdv1.NodeStateRemoved:
7685
return nil
86+
7787
case !isOffline && (state == pdv1.NodeStatePreparing || state == pdv1.NodeStateServing):
7888
// do nothing
7989
return nil
90+
8091
case !isOffline && state == pdv1.NodeStateRemoving:
8192
if err := c.CancelDeleteStore(ctx, storeID); err != nil {
8293
return err
8394
}
8495
return fmt.Errorf("%w: node state should be changed to Preparing or Serving", task.ErrWait)
96+
8597
case !isOffline && state == pdv1.NodeStateRemoved:
8698
// store has been removed, cannot cancel the deletion
8799
return nil
@@ -108,19 +120,18 @@ func TaskInstanceConditionOffline[
108120
isOffline := coreutil.IsOffline[S](instance)
109121
state := s.GetStoreState()
110122

111-
if state == "" {
112-
return task.Wait().With("wait for state is synced")
113-
}
114-
115123
var needUpdate, isCompleted bool
116124
var reason string
117125
switch {
126+
case isOffline && state == "":
127+
reason = v1alpha1.ReasonOfflineCompleted
128+
isCompleted = true
118129
case state == pdv1.NodeStateRemoved:
119130
reason = v1alpha1.ReasonOfflineCompleted
120131
isCompleted = true
121132
case isOffline:
122133
reason = v1alpha1.ReasonOfflineProcessing
123-
case !isOffline && (state == pdv1.NodeStatePreparing || state == pdv1.NodeStateServing):
134+
case !isOffline && (state == "" || state == pdv1.NodeStatePreparing || state == pdv1.NodeStateServing):
124135
reason = ""
125136
case !isOffline && state == pdv1.NodeStateRemoving:
126137
reason = v1alpha1.ReasonOfflineCanceling

pkg/controllers/common/store_offline_test.go

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,13 @@ func TestTaskOfflineStore(t *testing.T) {
5656
isWaitErr bool
5757
}{
5858
{
59-
desc: "empty store id",
59+
desc: "spec.offline == false, empty store id",
60+
obj: newFakeStore(false, nil),
61+
hasErr: false,
62+
},
63+
{
64+
desc: "spec.offline == true, empty store id",
65+
obj: newFakeStore(true, nil),
6066
hasErr: true,
6167
isWaitErr: true,
6268
},
@@ -169,11 +175,18 @@ func TestTaskInstanceConditionOffline(t *testing.T) {
169175
expectedObj *v1alpha1.TiKV
170176
}{
171177
{
172-
desc: "empty state",
178+
desc: "spec.offline == false, empty state",
173179
obj: newFakeStore(false, nil),
174-
expectedStatus: task.SWait,
180+
expectedStatus: task.SComplete,
175181
expectedObj: newFakeStore(false, nil),
176182
},
183+
{
184+
desc: "spec.offline == true, empty state",
185+
obj: newFakeStore(true, nil),
186+
expectedStatusChanged: true,
187+
expectedStatus: task.SComplete,
188+
expectedObj: newFakeStore(true, coreutil.Offlined()),
189+
},
177190
{
178191
desc: "spec.offline == false, state == Preparing",
179192
obj: newFakeStore(false, nil),

pkg/controllers/tiflash/builder.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,9 +76,9 @@ func (r *Reconciler) NewRunner(state *tasks.ReconcileContext, reporter task.Task
7676
// only set ready if pd is synced
7777
task.If(PDIsSynced(state),
7878
common.TaskInstanceConditionReady[scope.TiFlash](state),
79+
common.TaskInstanceConditionOffline[scope.TiFlash](state),
7980
),
8081
common.TaskInstanceConditionRunning[scope.TiFlash](state),
81-
common.TaskInstanceConditionOffline[scope.TiFlash](state),
8282
tasks.TaskStatus(state, r.Client),
8383
)
8484

pkg/controllers/tikv/builder.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ func (r *Reconciler) NewRunner(state *tasks.ReconcileContext, reporter task.Task
4343
tasks.TaskFinalizerDel(state, r.Client),
4444
),
4545

46-
// if instance is deleting and store is removed
46+
// if instance is not deleting but store is offlined
4747
task.IfBreak(common.CondObjectIsNotDeletingButOfflined[scope.TiKV](state),
4848
common.TaskDeleteOfflinedStore[scope.TiKV](state, r.Client),
4949
),
@@ -86,9 +86,9 @@ func (r *Reconciler) NewRunner(state *tasks.ReconcileContext, reporter task.Task
8686
// only set ready if pd is synced
8787
task.If(PDIsSynced(state),
8888
common.TaskInstanceConditionReady[scope.TiKV](state),
89+
common.TaskInstanceConditionOffline[scope.TiKV](state),
8990
),
9091
common.TaskInstanceConditionRunning[scope.TiKV](state),
91-
common.TaskInstanceConditionOffline[scope.TiKV](state),
9292
tasks.TaskStatus(state, r.Client),
9393
)
9494

pkg/controllers/tikv/tasks/evict_leader.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ func TaskEvictLeader(state *ReconcileContext) task.Task {
4646
}
4747

4848
// TaskEndEvictLeader only be called when object is deleting and store has been removed
49+
// TODO(liubo02): it's not stable because status.ID may be lost
4950
func TaskEndEvictLeader(state *ReconcileContext) task.Task {
5051
return task.NameTaskFunc("EndEvictLeader", func(ctx context.Context) task.Result {
5152
msg := "ensure evict leader scheduler doesn't exist"

pkg/controllers/tikv/tasks/pod.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -92,9 +92,6 @@ func TaskPod(state *ReconcileContext, c client.Client) task.Task {
9292

9393
if !reloadable.CheckTiKVPod(state.TiKV(), pod) {
9494
logger.Info("will recreate the pod")
95-
if err := k8s.RemoveFinalizer(ctx, c, pod); err != nil {
96-
return task.Fail().With("cannot remove finalizer: %w", err)
97-
}
9895
regionCount := 0
9996
if state.Store != nil {
10097
regionCount = state.Store.RegionCount

pkg/updater/builder.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,8 @@ func (b *builder[T, O, R]) Build() Executor {
7171

7272
updatePolicies := b.updatePreferPolicies
7373
updatePolicies = append(updatePolicies, PreferNotRunning[R](), PreferUnready[R]())
74+
scaleInPolicies := b.scaleInPreferPolicies
75+
scaleInPolicies = append(scaleInPolicies, PreferNotRunning[R](), PreferUnready[R]())
7476
actor := &actor[T, O, R]{
7577
c: b.c,
7678
f: b.f,
@@ -86,7 +88,7 @@ func (b *builder[T, O, R]) Build() Executor {
8688
updateHooks: append(b.updateHooks, KeepName[R](), KeepTopology[R](), KeepResourceVersion[R]()),
8789
delHooks: b.delHooks,
8890

89-
scaleInSelector: NewSelector(b.scaleInPreferPolicies...),
91+
scaleInSelector: NewSelector(scaleInPolicies...),
9092
updateSelector: NewSelector(updatePolicies...),
9193
}
9294
return NewExecutor(

tests/e2e/data/tikv.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
package data
1616

1717
import (
18+
corev1 "k8s.io/api/core/v1"
1819
"k8s.io/apimachinery/pkg/api/resource"
1920
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2021
"k8s.io/utils/ptr"
@@ -108,3 +109,34 @@ s3-region = "local"
108109
`
109110
})
110111
}
112+
113+
func WithTiKVPodAntiAffinity() GroupPatch[*v1alpha1.TiKVGroup] {
114+
return GroupPatchFunc[*v1alpha1.TiKVGroup](func(obj *v1alpha1.TiKVGroup) {
115+
if obj.Spec.Template.Spec.Overlay == nil {
116+
obj.Spec.Template.Spec.Overlay = &v1alpha1.Overlay{}
117+
}
118+
o := obj.Spec.Template.Spec.Overlay
119+
if o.Pod == nil {
120+
o.Pod = &v1alpha1.PodOverlay{}
121+
}
122+
if o.Pod.Spec == nil {
123+
o.Pod.Spec = &corev1.PodSpec{}
124+
}
125+
126+
o.Pod.Spec.Affinity = &corev1.Affinity{
127+
PodAntiAffinity: &corev1.PodAntiAffinity{
128+
RequiredDuringSchedulingIgnoredDuringExecution: []corev1.PodAffinityTerm{
129+
{
130+
LabelSelector: &metav1.LabelSelector{
131+
MatchLabels: map[string]string{
132+
"pingcap.com/component": "tikv",
133+
"pingcap.com/group": obj.GetName(),
134+
},
135+
},
136+
TopologyKey: "kubernetes.io/hostname",
137+
},
138+
},
139+
},
140+
}
141+
})
142+
}

tests/e2e/e2e_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import (
3737
// TODO: move all tests into suite
3838
_ "github.com/pingcap/tidb-operator/tests/e2e/suite/availability"
3939
_ "github.com/pingcap/tidb-operator/tests/e2e/suite/cluster"
40+
_ "github.com/pingcap/tidb-operator/tests/e2e/suite/scale"
4041
)
4142

4243
func TestE2E(t *testing.T) {

tests/e2e/suite/scale/tikv.go

Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
// Copyright 2024 PingCAP, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package scale
16+
17+
import (
18+
"context"
19+
"time"
20+
21+
"github.com/onsi/ginkgo/v2"
22+
"k8s.io/utils/ptr"
23+
24+
"github.com/pingcap/tidb-operator/pkg/client"
25+
"github.com/pingcap/tidb-operator/pkg/runtime"
26+
"github.com/pingcap/tidb-operator/pkg/runtime/scope"
27+
"github.com/pingcap/tidb-operator/tests/e2e/data"
28+
"github.com/pingcap/tidb-operator/tests/e2e/framework"
29+
"github.com/pingcap/tidb-operator/tests/e2e/label"
30+
"github.com/pingcap/tidb-operator/tests/e2e/utils/waiter"
31+
)
32+
33+
var _ = ginkgo.Describe("Scale TiKV", label.TiKV, label.MultipleAZ, label.P0, label.Scale, func() {
34+
f := framework.New()
35+
f.Setup()
36+
37+
ginkgo.It("support scale out from 3 to 4", func(ctx context.Context) {
38+
pdg := f.MustCreatePD(ctx)
39+
kvg := f.MustCreateTiKV(ctx,
40+
data.WithReplicas[scope.TiKVGroup](3),
41+
data.WithTiKVEvenlySpreadPolicy(),
42+
)
43+
44+
f.WaitForPDGroupReady(ctx, pdg)
45+
f.WaitForTiKVGroupReady(ctx, kvg)
46+
47+
f.MustEvenlySpreadTiKV(ctx, kvg)
48+
49+
patch := client.MergeFrom(kvg.DeepCopy())
50+
kvg.Spec.Replicas = ptr.To[int32](4)
51+
52+
f.Must(f.Client.Patch(ctx, kvg, patch))
53+
f.WaitForTiKVGroupReady(ctx, kvg)
54+
f.MustEvenlySpreadTiKV(ctx, kvg)
55+
})
56+
57+
ginkgo.It("support scale in from 4 to 3", func(ctx context.Context) {
58+
pdg := f.MustCreatePD(ctx)
59+
kvg := f.MustCreateTiKV(ctx,
60+
data.WithReplicas[scope.TiKVGroup](4),
61+
data.WithTiKVEvenlySpreadPolicy(),
62+
)
63+
64+
f.WaitForPDGroupReady(ctx, pdg)
65+
f.WaitForTiKVGroupReady(ctx, kvg)
66+
67+
f.MustEvenlySpreadTiKV(ctx, kvg)
68+
69+
patch := client.MergeFrom(kvg.DeepCopy())
70+
kvg.Spec.Replicas = ptr.To[int32](3)
71+
72+
f.Must(f.Client.Patch(ctx, kvg, patch))
73+
f.WaitForTiKVGroupReady(ctx, kvg)
74+
f.MustEvenlySpreadTiKV(ctx, kvg)
75+
})
76+
77+
ginkgo.It("support scale in from 4 to 3 with a pending pod", func(ctx context.Context) {
78+
pdg := f.MustCreatePD(ctx)
79+
kvg := f.MustCreateTiKV(ctx,
80+
data.WithReplicas[scope.TiKVGroup](3),
81+
data.WithTiKVEvenlySpreadPolicy(),
82+
data.WithTiKVPodAntiAffinity(),
83+
)
84+
85+
f.WaitForPDGroupReady(ctx, pdg)
86+
f.WaitForTiKVGroupReady(ctx, kvg)
87+
88+
f.MustEvenlySpreadTiKV(ctx, kvg)
89+
90+
patch := client.MergeFrom(kvg.DeepCopy())
91+
kvg.Spec.Replicas = ptr.To[int32](4)
92+
f.Must(f.Client.Patch(ctx, kvg, patch))
93+
94+
// just wait a few seconds
95+
time.Sleep(time.Second * 15)
96+
97+
patch2 := client.MergeFrom(kvg.DeepCopy())
98+
kvg.Spec.Replicas = ptr.To[int32](3)
99+
100+
f.Must(f.Client.Patch(ctx, kvg, patch2))
101+
f.WaitForTiKVGroupReady(ctx, kvg)
102+
f.MustEvenlySpreadTiKV(ctx, kvg)
103+
})
104+
105+
ginkgo.It("support scale from 3 to 6 and rolling update at same time", ginkgo.Serial, label.Update, func(ctx context.Context) {
106+
pdg := f.MustCreatePD(ctx)
107+
kvg := f.MustCreateTiKV(ctx,
108+
data.WithReplicas[scope.TiKVGroup](3),
109+
data.WithTiKVEvenlySpreadPolicy(),
110+
)
111+
112+
f.WaitForPDGroupReady(ctx, pdg)
113+
f.WaitForTiKVGroupReady(ctx, kvg)
114+
f.MustEvenlySpreadTiKV(ctx, kvg)
115+
116+
patch := client.MergeFrom(kvg.DeepCopy())
117+
kvg.Spec.Replicas = ptr.To[int32](6)
118+
kvg.Spec.Template.Annotations = map[string]string{
119+
"test": "test",
120+
}
121+
122+
nctx, cancel := context.WithCancel(ctx)
123+
ch := make(chan struct{})
124+
go func() {
125+
defer close(ch)
126+
defer ginkgo.GinkgoRecover()
127+
f.Must(waiter.WaitPodsRollingUpdateOnce(nctx, f.Client, runtime.FromTiKVGroup(kvg), 3, 0, waiter.LongTaskTimeout))
128+
}()
129+
130+
changeTime, err := waiter.MaxPodsCreateTimestamp(ctx, f.Client, runtime.FromTiKVGroup(kvg))
131+
f.Must(err)
132+
133+
ginkgo.By("Change config and replicas of the TiKVGroup")
134+
f.Must(f.Client.Patch(ctx, kvg, patch))
135+
f.Must(waiter.WaitForPodsRecreated(ctx, f.Client, runtime.FromTiKVGroup(kvg), *changeTime, waiter.LongTaskTimeout))
136+
f.WaitForTiKVGroupReady(ctx, kvg)
137+
cancel()
138+
<-ch
139+
140+
f.MustEvenlySpreadTiKV(ctx, kvg)
141+
})
142+
})

0 commit comments

Comments (0)