From 81c39596f44f1a1b54de100a8ff68eed9c6ee410 Mon Sep 17 00:00:00 2001 From: liubo02 Date: Tue, 5 May 2026 21:31:24 +0800 Subject: [PATCH 1/4] feat: support prechecking down peers before restarting tikv pod Signed-off-by: liubo02 --- api/core/v1alpha1/tikv_types.go | 9 +- pkg/controllers/tikv/builder.go | 2 +- pkg/controllers/tikv/tasks/ctx.go | 7 +- pkg/controllers/tikv/tasks/evict_leader.go | 71 ++++++-- .../tikv/tasks/evict_leader_test.go | 114 ++++++++++++ pkg/controllers/tikv/tasks/offline.go | 29 +-- pkg/controllers/tikv/tasks/pod.go | 58 +++++- pkg/controllers/tikv/tasks/pod_test.go | 171 +++++++++++++++++- pkg/controllers/tikv/tasks/status.go | 33 ---- pkg/controllers/tikv/tasks/util.go | 43 ++++- pkg/pdapi/v1/client.go | 22 ++- pkg/pdapi/v1/client_test.go | 30 +++ pkg/pdapi/v1/mock_generated.go | 15 ++ pkg/pdapi/v1/types.go | 16 ++ 14 files changed, 525 insertions(+), 95 deletions(-) create mode 100644 pkg/controllers/tikv/tasks/evict_leader_test.go diff --git a/api/core/v1alpha1/tikv_types.go b/api/core/v1alpha1/tikv_types.go index 57619816a3b..e5adc0d350f 100644 --- a/api/core/v1alpha1/tikv_types.go +++ b/api/core/v1alpha1/tikv_types.go @@ -45,9 +45,12 @@ const ( // condition TiKVCondLeadersEvicted = "LeadersEvicted" // reason - ReasonNotEvicted = "NotEvicted" - ReasonEvicting = "Evicting" - ReasonEvicted = "Evicted" + ReasonNotEvicted = "NotEvicted" + ReasonEvicting = "Evicting" + ReasonEvicted = "Evicted" + ReasonStoreNotExist = "StoreNotExist" + + // Deprecated: replaced by ReasonStoreNotExist ReasonStoreIsRemoved = "StoreIsRemoved" ) diff --git a/pkg/controllers/tikv/builder.go b/pkg/controllers/tikv/builder.go index 16d913524c6..77cdc799f47 100644 --- a/pkg/controllers/tikv/builder.go +++ b/pkg/controllers/tikv/builder.go @@ -88,7 +88,7 @@ func (r *Reconciler) NewRunner(state *tasks.ReconcileContext, reporter task.Task tasks.TaskOfflineStore(state, r.PDClientManager), tasks.TaskConfigMap(state, r.Client), common.TaskPVC[scope.TiKV](state, r.Client, r.VolumeModifierFactory, tasks.PVCNewer()), - tasks.TaskPod(state, r.Client), + tasks.TaskPod(state, r.Client, r.PDClientManager), tasks.TaskStoreLabels(state, r.Client, r.PDClientManager), tasks.TaskEvictLeader(state, r.PDClientManager), common.TaskInstanceConditionSynced[scope.TiKV](state), diff --git a/pkg/controllers/tikv/tasks/ctx.go b/pkg/controllers/tikv/tasks/ctx.go index 0f7145133fb..4b7a1c1b267 100644 --- a/pkg/controllers/tikv/tasks/ctx.go +++ b/pkg/controllers/tikv/tasks/ctx.go @@ -32,11 +32,13 @@ import ( type ReconcileContext struct { State - LeaderEvicting bool + ShouldEvictLeader bool + LeaderEvicting bool - Store *pdv1.Store PDSynced bool + Store *pdv1.Store + // IsStoreReady will be set only when pd is synced and the store is ok // It may be outdated so the tikv is healthy only when the pod is also available // If it's true and the pod is ready but not available, @@ -97,6 +99,7 @@ func TaskContextInfoFromPD(state *ReconcileContext, cm pdm.PDClientManager) task if coreutil.ShouldSuspendCompute(state.Cluster()) { return task.Complete().With("cluster is suspending") } + scheduler, err := c.Underlay().GetEvictLeaderScheduler(ctx, state.Store.ID) if err != nil { return task.Fail().With("pd is unexpectedly crashed: %v", err) diff --git a/pkg/controllers/tikv/tasks/evict_leader.go b/pkg/controllers/tikv/tasks/evict_leader.go index 2557e838051..e30a5a34b32 100644 --- a/pkg/controllers/tikv/tasks/evict_leader.go +++ b/pkg/controllers/tikv/tasks/evict_leader.go @@ -16,7 +16,13 @@ package tasks import ( "context" + "fmt" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/pingcap/tidb-operator/api/v2/core/v1alpha1" + pdv1 "github.com/pingcap/tidb-operator/v2/pkg/timanager/apis/pd/v1" pdm "github.com/pingcap/tidb-operator/v2/pkg/timanager/pd" "github.com/pingcap/tidb-operator/v2/pkg/utils/task/v3" ) @@ -27,26 +33,30 @@ func TaskEvictLeader(state *ReconcileContext, m pdm.PDClientManager) task.Task { if !ok { return task.Wait().With("wait if pd client is not registered") } - switch { - case !state.PDSynced: - return task.Wait().With("pd is unsynced") - case state.Store == nil: + if state.Store == nil { return task.Complete().With("store has been deleted or not created") - case state.Instance().IsOffline() || state.IsPodTerminating(): - if !state.LeaderEvicting { - if err := pc.Underlay().BeginEvictLeader(ctx, state.Store.ID); err != nil { - return task.Fail().With("cannot add evict leader scheduler: %v", err) - } + } + + if state.ShouldEvictLeader && !state.LeaderEvicting { + if err := pc.Underlay().BeginEvictLeader(ctx, state.Store.ID); err != nil { + return task.Fail().With("cannot add evict leader scheduler: %v", err) } - return task.Complete().With("ensure evict leader scheduler exists") - default: - if state.LeaderEvicting { - if err := pc.Underlay().EndEvictLeader(ctx, state.Store.ID); err != nil { - return task.Fail().With("cannot remove evict leader scheduler: %v", err) - } + state.LeaderEvicting = true + } + + if state.LeaderEvicting && !state.ShouldEvictLeader { + if err := pc.Underlay().EndEvictLeader(ctx, state.Store.ID); err != nil { + return task.Fail().With("cannot remove evict leader scheduler: %v", err) } - return task.Complete().With("ensure evict leader scheduler doesn't exist") + state.LeaderEvicting = false } + + needUpdate := syncLeadersEvictedCond(state.TiKV(), state.Store, state.LeaderEvicting) + if needUpdate { + state.SetStatusChanged() + } + + return task.Complete().With("sync evict leader scheduler, expected: %v, actual: %v", state.ShouldEvictLeader, state.LeaderEvicting) }) } @@ -69,3 +79,32 @@ func TaskEndEvictLeader(state *ReconcileContext, m pdm.PDClientManager) task.Tas return task.Complete().With(msg) }) } + +// Status of this condition can only transfer as the below +func syncLeadersEvictedCond(tikv *v1alpha1.TiKV, store *pdv1.Store, isEvicting bool) bool { + status := metav1.ConditionFalse + reason := v1alpha1.ReasonNotEvicted + msg := "leaders are not all evicted" + switch { + case store == nil: + status = metav1.ConditionTrue + reason = v1alpha1.ReasonStoreNotExist + msg = "store does not exist" + case isEvicting && store.LeaderCount == 0: + status = metav1.ConditionTrue + reason = v1alpha1.ReasonEvicted + msg = "all leaders are evicted" + case isEvicting: + status = metav1.ConditionFalse + reason = v1alpha1.ReasonEvicting + msg = fmt.Sprintf("not all leaders are evicted, still: %v", store.LeaderCount) + } + + return meta.SetStatusCondition(&tikv.Status.Conditions, metav1.Condition{ + Type: v1alpha1.TiKVCondLeadersEvicted, + Status: status, + ObservedGeneration: tikv.Generation, + Reason: reason, + Message: msg, + }) +} diff --git a/pkg/controllers/tikv/tasks/evict_leader_test.go b/pkg/controllers/tikv/tasks/evict_leader_test.go new file mode 100644 index 00000000000..5ac4ebbc7fd --- /dev/null +++ b/pkg/controllers/tikv/tasks/evict_leader_test.go @@ -0,0 +1,114 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tasks + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "go.uber.org/mock/gomock" + corev1 "k8s.io/api/core/v1" + + "github.com/pingcap/tidb-operator/api/v2/core/v1alpha1" + pdapi "github.com/pingcap/tidb-operator/v2/pkg/pdapi/v1" + stateutil "github.com/pingcap/tidb-operator/v2/pkg/state" + pdv1 "github.com/pingcap/tidb-operator/v2/pkg/timanager/apis/pd/v1" + pdm "github.com/pingcap/tidb-operator/v2/pkg/timanager/pd" + "github.com/pingcap/tidb-operator/v2/pkg/utils/fake" + "github.com/pingcap/tidb-operator/v2/pkg/utils/task/v3" +) + +func TestTaskEvictLeader(t *testing.T) { + cases := []struct { + desc string + state *ReconcileContext + expectBegin bool + expectEnd bool + expectEvicting bool + expectedStatus task.Status + }{ + { + desc: "begin evict leader when requested", + state: &ReconcileContext{ + State: &state{ + tikv: fake.FakeObj[v1alpha1.TiKV]("aaa-xxx"), + pod: fake.FakeObj[corev1.Pod]("aaa-tikv-xxx"), + }, + ShouldEvictLeader: true, + PDSynced: true, + Store: &pdv1.Store{ + ID: "1", + }, + }, + expectBegin: true, + expectEvicting: true, + expectedStatus: task.SComplete, + }, + { + desc: "end evict leader when annotation is absent", + state: &ReconcileContext{ + State: &state{ + tikv: fake.FakeObj[v1alpha1.TiKV]("aaa-xxx"), + pod: fake.FakeObj[corev1.Pod]("aaa-tikv-xxx"), + }, + PDSynced: true, + LeaderEvicting: true, + Store: &pdv1.Store{ + ID: "1", + }, + }, + expectEnd: true, + expectEvicting: false, + expectedStatus: task.SComplete, + }, + } + + for i := range cases { + c := &cases[i] + t.Run(c.desc, func(tt *testing.T) { + tt.Parallel() + + ctrl := gomock.NewController(tt) + mockPDClient := pdm.NewMockPDClient(ctrl) + mockUnderlay := pdapi.NewMockPDClient(ctrl) + mockPDClient.EXPECT().Underlay().Return(mockUnderlay).AnyTimes() + if c.expectBegin { + mockUnderlay.EXPECT().BeginEvictLeader(gomock.Any(), "1").Return(nil) + } + if c.expectEnd { + mockUnderlay.EXPECT().EndEvictLeader(gomock.Any(), "1").Return(nil) + } + + s := c.state.State.(*state) + s.IPDClient = &stubPDClientState{client: mockPDClient} + + res, done := task.RunTask(context.Background(), TaskEvictLeader(c.state, nil)) + assert.Equal(tt, c.expectedStatus, res.Status()) + assert.False(tt, done) + assert.Equal(tt, c.expectEvicting, c.state.LeaderEvicting) + }) + } +} + +type stubPDClientState struct { + client pdm.PDClient +} + +func (s *stubPDClientState) GetPDClient(pdm.PDClientManager) (pdm.PDClient, bool) { + return s.client, true +} + +var _ stateutil.IPDClient = (*stubPDClientState)(nil) diff --git a/pkg/controllers/tikv/tasks/offline.go b/pkg/controllers/tikv/tasks/offline.go index 51b4d162386..2dbc4e09493 100644 --- a/pkg/controllers/tikv/tasks/offline.go +++ b/pkg/controllers/tikv/tasks/offline.go @@ -18,11 +18,6 @@ import ( "context" "time" - "k8s.io/apimachinery/pkg/api/meta" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/utils/ptr" - - "github.com/pingcap/tidb-operator/api/v2/core/v1alpha1" coreutil "github.com/pingcap/tidb-operator/v2/pkg/apiutil/core/v1alpha1" "github.com/pingcap/tidb-operator/v2/pkg/controllers/common" "github.com/pingcap/tidb-operator/v2/pkg/runtime/scope" @@ -47,19 +42,13 @@ func TaskOfflineStore(state *ReconcileContext, m pdm.PDClientManager) task.Task // If the store is nil, it means the store has been deleted or not created yet. // No need to check if leaders are evicted. if state.Store != nil && isOffline { - var reason string - beginTime := getBeginEvictLeaderTime(tikv) - switch { - case state.LeaderEvicting && state.GetLeaderCount() == 0: - reason = "leaders have been all evicted" - case beginTime != nil && beginTime.Add(defaultLeaderEvictTimeout).Before(time.Now()): - reason = "leader eviction timeout" - } + state.ShouldEvictLeader = true - if reason == "" { + if err := CheckTiKVLeadersEvictedOrTimeout(tikv, defaultLeaderEvictTimeout); err != nil { return task.Retry(defaultLeaderEvictTimeout+jitter). - With("waiting for leaders evicted or timeout, current leader count: %d", state.GetLeaderCount()) + With("waiting for leaders evicted or timeout: %v", err) } + } pc, ok := state.GetPDClient(m) @@ -87,13 +76,3 @@ func TaskOfflineStore(state *ReconcileContext, m pdm.PDClientManager) task.Task return task.Complete().With("offline is completed or no need, spec.offline: %v", isOffline) }) } - -// getBeginEvictLeaderTime returns the time when the leader eviction started. -// If the condition is not found or the status is not False, it returns nil. -func getBeginEvictLeaderTime(tikv *v1alpha1.TiKV) *time.Time { - cond := meta.FindStatusCondition(tikv.Status.Conditions, v1alpha1.TiKVCondLeadersEvicted) - if cond != nil && cond.Status == metav1.ConditionFalse { - return ptr.To(cond.LastTransitionTime.Time) - } - return nil -} diff --git a/pkg/controllers/tikv/tasks/pod.go b/pkg/controllers/tikv/tasks/pod.go index 0349627ea01..9862813f024 100644 --- a/pkg/controllers/tikv/tasks/pod.go +++ b/pkg/controllers/tikv/tasks/pod.go @@ -33,9 +33,11 @@ import ( "github.com/pingcap/tidb-operator/v2/pkg/features" "github.com/pingcap/tidb-operator/v2/pkg/image" "github.com/pingcap/tidb-operator/v2/pkg/overlay" + pdapi "github.com/pingcap/tidb-operator/v2/pkg/pdapi/v1" "github.com/pingcap/tidb-operator/v2/pkg/reloadable" "github.com/pingcap/tidb-operator/v2/pkg/runtime/scope" pdv1 "github.com/pingcap/tidb-operator/v2/pkg/timanager/apis/pd/v1" + pdm "github.com/pingcap/tidb-operator/v2/pkg/timanager/pd" "github.com/pingcap/tidb-operator/v2/pkg/utils/k8s" maputil "github.com/pingcap/tidb-operator/v2/pkg/utils/map" "github.com/pingcap/tidb-operator/v2/pkg/utils/task/v3" @@ -68,10 +70,16 @@ func TaskSuspendPod(state *ReconcileContext, c client.Client) task.Task { }) } -func TaskPod(state *ReconcileContext, c client.Client) task.Task { +func TaskPod(state *ReconcileContext, c client.Client, cm pdm.PDClientManager) task.Task { return task.NameTaskFunc("Pod", func(ctx context.Context) task.Result { logger := logr.FromContextOrDiscard(ctx) expected := newPod(state.Cluster(), state.TiKV(), state.Store, state.FeatureGates()) + + pc, ok := state.GetPDClient(cm) + if !ok { + return task.Wait().With("wait if pd client is not registered") + } + pod := state.Pod() if pod == nil { if err := c.Apply(ctx, expected); err != nil { @@ -91,6 +99,16 @@ func TaskPod(state *ReconcileContext, c client.Client) task.Task { } if !reloadable.CheckTiKVPod(state.TiKV(), pod) { + state.ShouldEvictLeader = true + + if err := checkDownPeerCountIsZero(ctx, state.Store, pc); err != nil { + return task.Wait().With("cannot recreate pod, check down peer: %v", err) + } + + if err := CheckTiKVLeadersEvicted(state.TiKV()); err != nil { + return task.Wait().With("cannot recreate pod, check leader count: %v", err) + } + logger.Info("will recreate the pod") regionCount := 0 if state.Store != nil { @@ -116,6 +134,44 @@ func TaskPod(state *ReconcileContext, c client.Client) task.Task { }) } +func checkDownPeerCountIsZero(ctx context.Context, store *pdv1.Store, pc pdm.PDClient) error { + downPeerInfo, err := pc.Underlay().GetDownPeerRegions(ctx) + if err != nil { + return fmt.Errorf("failed to get down peer info: %v", err) + } + downPeerCount := countNonSelfDownPeers(downPeerInfo, store) + if downPeerCount != 0 { + return fmt.Errorf("down peer is not zero: %v", downPeerCount) + } + + return nil +} + +func countNonSelfDownPeers(downPeerInfo *pdapi.RegionsCheckInfo, store *pdv1.Store) int { + if store == nil || store.ID == "" { + return downPeerInfo.Count + } + if downPeerInfo.Count == 0 { + return 0 + } + + nonSelfDownPeerCount := 0 + for _, region := range downPeerInfo.Regions { + for _, downPeer := range region.DownPeers { + if downPeer == nil || downPeer.Peer == nil { + nonSelfDownPeerCount++ + continue + } + if fmt.Sprint(downPeer.Peer.StoreId) == store.ID { + continue + } + nonSelfDownPeerCount++ + } + } + + return nonSelfDownPeerCount +} + func newPod(cluster *v1alpha1.Cluster, tikv *v1alpha1.TiKV, store *pdv1.Store, g features.Gates) *corev1.Pod { vols := []corev1.Volume{ { diff --git a/pkg/controllers/tikv/tasks/pod_test.go b/pkg/controllers/tikv/tasks/pod_test.go index 6f1c0ce9121..5f6c73bf347 100644 --- a/pkg/controllers/tikv/tasks/pod_test.go +++ b/pkg/controllers/tikv/tasks/pod_test.go @@ -19,8 +19,10 @@ import ( "fmt" "testing" + "github.com/pingcap/kvproto/pkg/metapb" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.uber.org/mock/gomock" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -29,9 +31,12 @@ import ( "github.com/pingcap/tidb-operator/api/v2/core/v1alpha1" "github.com/pingcap/tidb-operator/v2/pkg/client" "github.com/pingcap/tidb-operator/v2/pkg/features" + pdapi "github.com/pingcap/tidb-operator/v2/pkg/pdapi/v1" + "github.com/pingcap/tidb-operator/v2/pkg/reloadable" "github.com/pingcap/tidb-operator/v2/pkg/runtime/scope" stateutil "github.com/pingcap/tidb-operator/v2/pkg/state" pdv1 "github.com/pingcap/tidb-operator/v2/pkg/timanager/apis/pd/v1" + pdm "github.com/pingcap/tidb-operator/v2/pkg/timanager/pd" "github.com/pingcap/tidb-operator/v2/pkg/utils/fake" "github.com/pingcap/tidb-operator/v2/pkg/utils/task/v3" ) @@ -45,15 +50,18 @@ const ( func TestTaskPod(t *testing.T) { now := metav1.Now() cases := []struct { - desc string - state *ReconcileContext - objs []client.Object + desc string + state *ReconcileContext + objs []client.Object + downPeerCount int + downPeerInfo *pdapi.RegionsCheckInfo // if true, cannot apply pod unexpectedErr bool expectUpdatedPod bool expectedPodIsTerminating bool expectedStatus task.Status + expectedShouldEvict bool }{ { desc: "no pod", @@ -61,6 +69,7 @@ func TestTaskPod(t *testing.T) { State: &state{ tikv: fake.FakeObj("aaa-xxx", func(obj *v1alpha1.TiKV) *v1alpha1.TiKV { obj.Spec.Version = fakeVersion + setLeadersEvicted(obj) return obj }), cluster: fake.FakeObj[v1alpha1.Cluster]("aaa"), @@ -76,6 +85,7 @@ func TestTaskPod(t *testing.T) { State: &state{ tikv: fake.FakeObj("aaa-xxx", func(obj *v1alpha1.TiKV) *v1alpha1.TiKV { obj.Spec.Version = fakeVersion + setLeadersEvicted(obj) return obj }), cluster: fake.FakeObj[v1alpha1.Cluster]("aaa"), @@ -108,6 +118,7 @@ func TestTaskPod(t *testing.T) { expectedPodIsTerminating: true, expectedStatus: task.SWait, + expectedShouldEvict: false, }, { desc: "version is changed", @@ -115,6 +126,7 @@ func TestTaskPod(t *testing.T) { State: &state{ tikv: fake.FakeObj("aaa-xxx", func(obj *v1alpha1.TiKV) *v1alpha1.TiKV { obj.Spec.Version = fakeVersion + setLeadersEvicted(obj) return obj }), cluster: fake.FakeObj[v1alpha1.Cluster]("aaa"), @@ -128,8 +140,10 @@ func TestTaskPod(t *testing.T) { }, }, + downPeerCount: 0, expectedPodIsTerminating: true, expectedStatus: task.SWait, + expectedShouldEvict: true, }, { desc: "version is changed, failed to delete", @@ -137,6 +151,7 @@ func TestTaskPod(t *testing.T) { State: &state{ tikv: fake.FakeObj("aaa-xxx", func(obj *v1alpha1.TiKV) *v1alpha1.TiKV { obj.Spec.Version = fakeVersion + setLeadersEvicted(obj) return obj }), cluster: fake.FakeObj[v1alpha1.Cluster]("aaa"), @@ -149,9 +164,11 @@ func TestTaskPod(t *testing.T) { ), }, }, + downPeerCount: 0, unexpectedErr: true, - expectedStatus: task.SFail, + expectedStatus: task.SFail, + expectedShouldEvict: true, }, { desc: "config changed, hot reload policy", @@ -185,6 +202,7 @@ func TestTaskPod(t *testing.T) { tikv: fake.FakeObj("aaa-xxx", func(obj *v1alpha1.TiKV) *v1alpha1.TiKV { obj.Spec.Version = fakeVersion obj.Spec.UpdateStrategy.Config = v1alpha1.ConfigUpdateStrategyRestart + setLeadersEvicted(obj) return obj }), cluster: fake.FakeObj[v1alpha1.Cluster]("aaa"), @@ -200,8 +218,111 @@ func TestTaskPod(t *testing.T) { }, }, + downPeerCount: 0, + expectedPodIsTerminating: true, + expectedStatus: task.SWait, + expectedShouldEvict: true, + }, + { + desc: "version is changed but restart precheck is blocked by leader count", + state: &ReconcileContext{ + State: &state{ + tikv: fake.FakeObj("aaa-xxx", func(obj *v1alpha1.TiKV) *v1alpha1.TiKV { + obj.Spec.Version = fakeVersion + return obj + }), + cluster: fake.FakeObj[v1alpha1.Cluster]("aaa"), + pod: fakePod( + fake.FakeObj[v1alpha1.Cluster]("aaa"), + fake.FakeObj("aaa-xxx", func(obj *v1alpha1.TiKV) *v1alpha1.TiKV { + obj.Spec.Version = fakeNewVersion + return obj + }), + ), + leaderCount: 1, + }, + Store: &pdv1.Store{RegionCount: 400}, + }, + + downPeerCount: 0, + expectedPodIsTerminating: false, + expectedStatus: task.SWait, + expectedShouldEvict: true, + }, + { + desc: "version is changed but restart precheck is blocked by down peer count", + state: &ReconcileContext{ + State: &state{ + tikv: fake.FakeObj("aaa-xxx", func(obj *v1alpha1.TiKV) *v1alpha1.TiKV { + obj.Spec.Version = fakeVersion + setLeadersEvicted(obj) + return obj + }), + cluster: fake.FakeObj[v1alpha1.Cluster]("aaa"), + pod: fakePod( + fake.FakeObj[v1alpha1.Cluster]("aaa"), + fake.FakeObj("aaa-xxx", func(obj *v1alpha1.TiKV) *v1alpha1.TiKV { + obj.Spec.Version = fakeNewVersion + return obj + }), + ), + }, + Store: &pdv1.Store{ID: "1", RegionCount: 400}, + }, + + downPeerCount: 2, + downPeerInfo: &pdapi.RegionsCheckInfo{ + Count: 2, + Regions: []*pdapi.RegionCheckEntry{ + { + ID: 101, + DownPeers: []*pdapi.RegionPeerStat{ + {Peer: &metapb.Peer{StoreId: 2}}, + {Peer: &metapb.Peer{StoreId: 3}}, + }, + }, + }, + }, + expectedPodIsTerminating: false, + expectedStatus: task.SWait, + expectedShouldEvict: true, + }, + { + desc: "version is changed but restart precheck ignores self down peer", + state: &ReconcileContext{ + State: &state{ + tikv: fake.FakeObj("aaa-xxx", func(obj *v1alpha1.TiKV) *v1alpha1.TiKV { + obj.Spec.Version = fakeVersion + setLeadersEvicted(obj) + return obj + }), + cluster: fake.FakeObj[v1alpha1.Cluster]("aaa"), + pod: fakePod( + fake.FakeObj[v1alpha1.Cluster]("aaa"), + fake.FakeObj("aaa-xxx", func(obj *v1alpha1.TiKV) *v1alpha1.TiKV { + obj.Spec.Version = fakeNewVersion + return obj + }), + ), + }, + Store: &pdv1.Store{ID: "1", RegionCount: 400}, + }, + + downPeerCount: 1, + downPeerInfo: &pdapi.RegionsCheckInfo{ + Count: 1, + Regions: []*pdapi.RegionCheckEntry{ + { + ID: 101, + DownPeers: []*pdapi.RegionPeerStat{ + {Peer: &metapb.Peer{StoreId: 1}}, + }, + }, + }, + }, expectedPodIsTerminating: true, expectedStatus: task.SWait, + expectedShouldEvict: true, }, { desc: "pod labels changed, config not changed", @@ -233,8 +354,9 @@ func TestTaskPod(t *testing.T) { }, }, - expectUpdatedPod: true, - expectedStatus: task.SComplete, + expectUpdatedPod: true, + expectedStatus: task.SComplete, + expectedShouldEvict: false, }, { desc: "pod labels changed, config not changed, apply failed", @@ -267,7 +389,8 @@ func TestTaskPod(t *testing.T) { }, unexpectedErr: true, - expectedStatus: task.SFail, + expectedStatus: task.SFail, + expectedShouldEvict: false, }, { desc: "all are not changed", @@ -290,7 +413,8 @@ func TestTaskPod(t *testing.T) { }, }, - expectedStatus: task.SComplete, + expectedStatus: task.SComplete, + expectedShouldEvict: false, }, } @@ -312,17 +436,38 @@ func TestTaskPod(t *testing.T) { s := c.state.State.(*state) s.IFeatureGates = stateutil.NewFeatureGates[scope.TiKV](s) + ctrl := gomock.NewController(tt) + mockPDClient := pdm.NewMockPDClient(ctrl) + mockUnderlay := pdapi.NewMockPDClient(ctrl) + mockPDClient.EXPECT().Underlay().Return(mockUnderlay).AnyTimes() + s.IPDClient = &stubPDClientState{client: mockPDClient} + + shouldQueryDownPeer := c.state.Pod() != nil && + c.state.Pod().GetDeletionTimestamp().IsZero() && + !reloadable.CheckTiKVPod(c.state.TiKV(), c.state.Pod()) + if shouldQueryDownPeer { + mockUnderlay.EXPECT(). + GetDownPeerRegions(gomock.Any()). + Return(func() *pdapi.RegionsCheckInfo { + if c.downPeerInfo != nil { + return c.downPeerInfo + } + return &pdapi.RegionsCheckInfo{Count: c.downPeerCount} + }(), nil) + } + if c.unexpectedErr { // cannot update pod fc.WithError("patch", "*", errors.NewInternalError(fmt.Errorf("fake internal err"))) fc.WithError("delete", "*", errors.NewInternalError(fmt.Errorf("fake internal err"))) } - res, done := task.RunTask(ctx, TaskPod(c.state, fc)) + res, done := task.RunTask(ctx, TaskPod(c.state, fc, nil)) assert.Equal(tt, c.expectedStatus.String(), res.Status().String(), res.Message()) assert.False(tt, done, c.desc) assert.Equal(tt, c.expectedPodIsTerminating, c.state.IsPodTerminating(), c.desc) + assert.Equal(tt, c.expectedShouldEvict, c.state.ShouldEvictLeader, c.desc) if c.expectUpdatedPod { expectedPod := newPod(c.state.Cluster(), c.state.TiKV(), nil, c.state.FeatureGates()) @@ -339,3 +484,11 @@ func TestTaskPod(t *testing.T) { func fakePod(c *v1alpha1.Cluster, tikv *v1alpha1.TiKV) *corev1.Pod { return newPod(c, tikv, nil, features.NewFromFeatures(nil)) } + +func setLeadersEvicted(tikv *v1alpha1.TiKV) { + tikv.Status.Conditions = append(tikv.Status.Conditions, metav1.Condition{ + Type: v1alpha1.TiKVCondLeadersEvicted, + Status: metav1.ConditionTrue, + Reason: v1alpha1.ReasonEvicted, + }) +} diff --git a/pkg/controllers/tikv/tasks/status.go b/pkg/controllers/tikv/tasks/status.go index 6d136934796..978ecd4b9e2 100644 --- a/pkg/controllers/tikv/tasks/status.go +++ b/pkg/controllers/tikv/tasks/status.go @@ -26,7 +26,6 @@ import ( coreutil "github.com/pingcap/tidb-operator/v2/pkg/apiutil/core/v1alpha1" "github.com/pingcap/tidb-operator/v2/pkg/client" "github.com/pingcap/tidb-operator/v2/pkg/runtime/scope" - pdv1 "github.com/pingcap/tidb-operator/v2/pkg/timanager/apis/pd/v1" "github.com/pingcap/tidb-operator/v2/pkg/utils/compare" "github.com/pingcap/tidb-operator/v2/pkg/utils/task/v3" "github.com/pingcap/tidb-operator/v2/third_party/kubernetes/pkg/controller/statefulset" @@ -48,9 +47,6 @@ func TaskStatus(state *ReconcileContext, c client.Client) task.Task { ready := coreutil.IsReady[scope.TiKV](tikv) needUpdate = syncSuspendCond(tikv) || needUpdate - if state.PDSynced { - needUpdate = syncLeadersEvictedCond(tikv, state.Store, state.LeaderEvicting) || needUpdate - } if state.Store != nil { needUpdate = compare.SetIfNotEmptyAndChanged(&tikv.Status.ID, state.Store.ID) || needUpdate } @@ -86,10 +82,6 @@ func TaskStatus(state *ReconcileContext, c client.Client) task.Task { return task.Retry(defaultTaskWaitDuration).With("pod is terminating, retry after it's terminated") } - if state.LeaderEvicting { - return task.Wait().With("tikv is evicting leader, wait") - } - // TODO: use a condition to refactor it if !ready || tikv.Status.ID == "" { return task.Wait().With("tikv may not be ready, wait") @@ -109,28 +101,3 @@ func syncSuspendCond(tikv *v1alpha1.TiKV) bool { Message: "instance is not suspended", }) } - -// Status of this condition can only transfer as the below -func syncLeadersEvictedCond(tikv *v1alpha1.TiKV, store *pdv1.Store, isEvicting bool) bool { - status := metav1.ConditionFalse - reason := v1alpha1.ReasonNotEvicted - msg := "leaders are not all evicted" - switch { - case store == nil: - status = metav1.ConditionTrue - reason = v1alpha1.ReasonStoreIsRemoved - msg = "store does not exist" - case isEvicting && store.LeaderCount == 0: - status = metav1.ConditionTrue - reason = v1alpha1.ReasonEvicted - msg = "all leaders are evicted" - } - - return meta.SetStatusCondition(&tikv.Status.Conditions, metav1.Condition{ - Type: v1alpha1.TiKVCondLeadersEvicted, - Status: status, - ObservedGeneration: tikv.Generation, - Reason: reason, - Message: msg, - }) -} diff --git a/pkg/controllers/tikv/tasks/util.go b/pkg/controllers/tikv/tasks/util.go index 28b123ba3be..2ccd7529ced 100644 --- a/pkg/controllers/tikv/tasks/util.go +++ b/pkg/controllers/tikv/tasks/util.go @@ -16,11 +16,15 @@ package tasks import ( "context" + "fmt" + "time" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/pingcap/tidb-operator/api/v2/core/v1alpha1" - meta "github.com/pingcap/tidb-operator/api/v2/meta/v1alpha1" + metav1alpha1 "github.com/pingcap/tidb-operator/api/v2/meta/v1alpha1" "github.com/pingcap/tidb-operator/v2/pkg/client" ) @@ -56,7 +60,7 @@ func CalcGracePeriod(regionCount int) int64 { // VolumeName returns the real spec.volumes[*].name of pod // TODO(liubo02): extract to namer pkg func VolumeName(volName string) string { - return meta.VolNamePrefix + volName + return metav1alpha1.VolNamePrefix + volName } func VolumeMount(name string, mount *v1alpha1.VolumeMount) *corev1.VolumeMount { @@ -73,3 +77,38 @@ func VolumeMount(name string, mount *v1alpha1.VolumeMount) *corev1.VolumeMount { return vm } + +func CheckTiKVLeadersEvicted(tikv *v1alpha1.TiKV) error { + cond := meta.FindStatusCondition(tikv.Status.Conditions, v1alpha1.TiKVCondLeadersEvicted) + if cond == nil { + return fmt.Errorf("tikv leaders are not evicting") + } + if cond.Status == metav1.ConditionTrue { + return nil + } + + return fmt.Errorf("tikv leaders are not evicted: %v, %v", cond.Reason, cond.Message) +} + +func CheckTiKVLeadersEvictedOrTimeout(tikv *v1alpha1.TiKV, timeout time.Duration) error { + cond := meta.FindStatusCondition(tikv.Status.Conditions, v1alpha1.TiKVCondLeadersEvicted) + if cond == nil { + return fmt.Errorf("tikv leaders are not evicting") + } + // all leaders are evicted + if cond.Status == metav1.ConditionTrue { + return nil + } + + // TODO: this is not a stable way, we should not use reason directly + if cond.Reason != v1alpha1.ReasonEvicting { + return fmt.Errorf("tikv leaders are not evicted: %v", cond.Reason) + } + + t := cond.LastTransitionTime.Time + if t.Add(timeout).Before(time.Now()) { + return nil + } + + return fmt.Errorf("tikv leaders are evicting: %v, %v", cond.Reason, cond.Message) +} diff --git a/pkg/pdapi/v1/client.go b/pkg/pdapi/v1/client.go index b3ac85e218a..44d2a8f1a5e 100644 --- a/pkg/pdapi/v1/client.go +++ b/pkg/pdapi/v1/client.go @@ -87,6 +87,8 @@ type PDClient interface { // GetTombStoneStores() (*StoresInfo, error) // GetStore gets a TiKV/TiFlash store for a specific store id of the cluster. GetStore(ctx context.Context, storeID string) (*StoreInfo, error) + // GetDownPeerRegions gets regions with down peers. + GetDownPeerRegions(ctx context.Context) (*RegionsCheckInfo, error) // GetEvictLeaderScheduler gets leader eviction schedulers for stores. GetEvictLeaderScheduler(ctx context.Context, storeID string) (string, error) @@ -124,9 +126,10 @@ const ( membersPrefix = "pd/api/v1/members" microServicePrefix = "pd/api/v2/ms" - storesPrefix = "pd/api/v1/stores" - storePrefix = "pd/api/v1/store" - storeUpStatePrefix = "pd/api/v1/store/%v/state?state=Up" + storesPrefix = "pd/api/v1/stores" + storePrefix = "pd/api/v1/store" + storeUpStatePrefix = "pd/api/v1/store/%v/state?state=Up" + downPeerRegionsPrefix = "pd/api/v1/regions/check/down-peer" pdReplicationPrefix = "pd/api/v1/config/replicate" @@ -290,6 +293,19 @@ func (c *pdClient) GetStore(ctx context.Context, storeID string) (*StoreInfo, er return storeInfo, nil } +func (c *pdClient) GetDownPeerRegions(ctx context.Context) (*RegionsCheckInfo, error) { + apiURL := fmt.Sprintf("%s/%s", c.url, downPeerRegionsPrefix) + body, err := httputil.GetBodyOK(ctx, c.httpClient, apiURL) + if err != nil { + return nil, err + } + info := &RegionsCheckInfo{} + if err := json.Unmarshal(body, info); err != nil { + return nil, err + } + return info, nil +} + func (c *pdClient) getStores(ctx context.Context, apiURL string, states ...metapb.StoreState) (*StoresInfo, error) { if len(states) != 0 { var q []string diff --git a/pkg/pdapi/v1/client_test.go b/pkg/pdapi/v1/client_test.go index 4d320a4513a..09e41cc1a1b 100644 --- a/pkg/pdapi/v1/client_test.go +++ b/pkg/pdapi/v1/client_test.go @@ -525,6 +525,36 @@ func TestPDClient_GetStore(t *testing.T) { assert.Equal(t, "Up", store.Store.StateName) } +func TestPDClient_GetDownPeerRegions(t *testing.T) { + jsonStr := ` +{ + "count": 2, + "regions": [ + { + "id": 101 + }, + { + "id": 102 + } + ] +}` + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + assert.Equal(t, "/pd/api/v1/regions/check/down-peer", r.URL.Path) + w.Header().Set("Content-Type", "application/json") + _, err := w.Write([]byte(jsonStr)) + assert.NoError(t, err) + })) + defer server.Close() + + client := NewPDClient(server.URL, time.Second, nil) + info, err := client.GetDownPeerRegions(context.Background()) + require.NoError(t, err) + require.NotNil(t, info) + assert.Equal(t, 2, info.Count) + require.Len(t, info.Regions, 2) + assert.Equal(t, uint64(101), info.Regions[0].ID) +} + func TestPDClinet_SetStoreLabels(t *testing.T) { server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { assert.Equal(t, "/pd/api/v1/store/1/label", r.URL.Path) diff --git a/pkg/pdapi/v1/mock_generated.go b/pkg/pdapi/v1/mock_generated.go index 57837df6202..c5e2d07e83d 100644 --- a/pkg/pdapi/v1/mock_generated.go +++ b/pkg/pdapi/v1/mock_generated.go @@ -151,6 +151,21 @@ func (mr *MockPDClientMockRecorder) GetConfig(ctx any) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetConfig", reflect.TypeOf((*MockPDClient)(nil).GetConfig), ctx) } +// GetDownPeerRegions mocks base method. +func (m *MockPDClient) GetDownPeerRegions(ctx context.Context) (*RegionsCheckInfo, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetDownPeerRegions", ctx) + ret0, _ := ret[0].(*RegionsCheckInfo) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetDownPeerRegions indicates an expected call of GetDownPeerRegions. +func (mr *MockPDClientMockRecorder) GetDownPeerRegions(ctx any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetDownPeerRegions", reflect.TypeOf((*MockPDClient)(nil).GetDownPeerRegions), ctx) +} + // GetEvictLeaderScheduler mocks base method. func (m *MockPDClient) GetEvictLeaderScheduler(ctx context.Context, storeID string) (string, error) { m.ctrl.T.Helper() diff --git a/pkg/pdapi/v1/types.go b/pkg/pdapi/v1/types.go index b3aa4149762..7efba6e6702 100644 --- a/pkg/pdapi/v1/types.go +++ b/pkg/pdapi/v1/types.go @@ -89,6 +89,22 @@ type StoresInfo struct { Stores []*StoreInfo `json:"stores"` } +// RegionsCheckInfo is regions check info returned from PD RESTful interface. +type RegionsCheckInfo struct { + Count int `json:"count"` + Regions []*RegionCheckEntry `json:"regions,omitempty"` +} + +type RegionCheckEntry struct { + ID uint64 `json:"id"` + DownPeers []*RegionPeerStat `json:"down_peers,omitempty"` +} + +type RegionPeerStat struct { + Peer *metapb.Peer `json:"peer,omitempty"` + DownSeconds uint64 `json:"down_seconds,omitempty"` +} + // SchedulerInfo is a single scheduler info returned from PD RESTful interface. type SchedulerInfo struct { Name string `json:"name"` From a8dca3c329ad3ed2cec5e5c9bac4b4f414a043ad Mon Sep 17 00:00:00 2001 From: liubo02 Date: Tue, 5 May 2026 21:41:38 +0800 Subject: [PATCH 2/4] fix unit Signed-off-by: liubo02 --- pkg/controllers/tikv/tasks/status_test.go | 37 +---------------------- 1 file changed, 1 insertion(+), 36 deletions(-) diff --git a/pkg/controllers/tikv/tasks/status_test.go b/pkg/controllers/tikv/tasks/status_test.go index 7e8e36d6889..0bd6d176ae9 100644 --- a/pkg/controllers/tikv/tasks/status_test.go +++ b/pkg/controllers/tikv/tasks/status_test.go @@ -93,13 +93,6 @@ func TestTaskStatus(t *testing.T) { Reason: v1alpha1.ReasonUnsuspended, Message: "instance is not suspended", }, - { - Type: v1alpha1.TiKVCondLeadersEvicted, - Status: metav1.ConditionFalse, - ObservedGeneration: 3, - Reason: "NotEvicted", - Message: "leaders are not all evicted", - }, } return obj @@ -168,13 +161,6 @@ func TestTaskStatus(t *testing.T) { Reason: v1alpha1.ReasonUnsuspended, Message: "instance is not suspended", }, - { - Type: v1alpha1.TiKVCondLeadersEvicted, - Status: metav1.ConditionFalse, - ObservedGeneration: 3, - Reason: "NotEvicted", - Message: "leaders are not all evicted", - }, } return obj @@ -230,13 +216,6 @@ func TestTaskStatus(t *testing.T) { Reason: v1alpha1.ReasonUnsuspended, Message: "instance is not suspended", }, - { - Type: v1alpha1.TiKVCondLeadersEvicted, - Status: metav1.ConditionFalse, - ObservedGeneration: 3, - Reason: "NotEvicted", - Message: "leaders are not all evicted", - }, } return obj @@ -282,7 +261,7 @@ func TestTaskStatus(t *testing.T) { PDSynced: true, }, - expectedStatus: task.SWait, + expectedStatus: task.SComplete, expectedObj: fake.FakeObj(fakeTiKVName, func(obj *v1alpha1.TiKV) *v1alpha1.TiKV { obj.Generation = 3 obj.Labels = map[string]string{ @@ -307,13 +286,6 @@ func TestTaskStatus(t *testing.T) { Reason: v1alpha1.ReasonUnsuspended, Message: "instance is not suspended", }, - { - Type: v1alpha1.TiKVCondLeadersEvicted, - Status: metav1.ConditionTrue, - ObservedGeneration: 3, - Reason: "Evicted", - Message: "all leaders are evicted", - }, } return obj @@ -413,13 +385,6 @@ func TestTaskStatus(t *testing.T) { Reason: v1alpha1.ReasonUnsuspended, Message: "instance is not suspended", }, - { - Type: v1alpha1.TiKVCondLeadersEvicted, - Status: metav1.ConditionFalse, - ObservedGeneration: 3, - Reason: "NotEvicted", - Message: "leaders are not all evicted", - }, } return obj From 682c5d88069b5e41c7c4a96c3901f83040942cfa Mon Sep 17 00:00:00 2001 From: liubo02 Date: Wed, 6 May 2026 09:18:05 +0800 Subject: [PATCH 3/4] fix lint Signed-off-by: liubo02 --- pkg/controllers/tikv/tasks/offline.go | 1 - pkg/controllers/tikv/tasks/pod.go | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/controllers/tikv/tasks/offline.go b/pkg/controllers/tikv/tasks/offline.go index 2dbc4e09493..07eaff60e6c 100644 --- a/pkg/controllers/tikv/tasks/offline.go +++ b/pkg/controllers/tikv/tasks/offline.go @@ -48,7 +48,6 @@ func TaskOfflineStore(state *ReconcileContext, m pdm.PDClientManager) task.Task return task.Retry(defaultLeaderEvictTimeout+jitter). With("waiting for leaders evicted or timeout: %v", err) } - } pc, ok := state.GetPDClient(m) diff --git a/pkg/controllers/tikv/tasks/pod.go b/pkg/controllers/tikv/tasks/pod.go index 9862813f024..d2ffe903714 100644 --- a/pkg/controllers/tikv/tasks/pod.go +++ b/pkg/controllers/tikv/tasks/pod.go @@ -137,7 +137,7 @@ func TaskPod(state *ReconcileContext, c client.Client, cm pdm.PDClientManager) t func checkDownPeerCountIsZero(ctx context.Context, store *pdv1.Store, pc pdm.PDClient) error { downPeerInfo, err := pc.Underlay().GetDownPeerRegions(ctx) if err != nil { - return fmt.Errorf("failed to get down peer info: %v", err) + return fmt.Errorf("failed to get down peer info: %w", err) } downPeerCount := countNonSelfDownPeers(downPeerInfo, store) if downPeerCount != 0 { From e0f3507e7c572c545d7400f0feaca78c661be866 Mon Sep 17 00:00:00 2001 From: liubo02 Date: Wed, 6 May 2026 22:06:05 +0800 Subject: [PATCH 4/4] address comments Signed-off-by: liubo02 --- pkg/controllers/tikv/tasks/evict_leader.go | 3 + .../tikv/tasks/evict_leader_test.go | 42 +++++++ pkg/controllers/tikv/tasks/pod.go | 35 +++--- pkg/controllers/tikv/tasks/pod_test.go | 111 +++++++++++++++--- 4 files changed, 156 insertions(+), 35 deletions(-) diff --git a/pkg/controllers/tikv/tasks/evict_leader.go b/pkg/controllers/tikv/tasks/evict_leader.go index e30a5a34b32..9777e11756e 100644 --- a/pkg/controllers/tikv/tasks/evict_leader.go +++ b/pkg/controllers/tikv/tasks/evict_leader.go @@ -34,6 +34,9 @@ func TaskEvictLeader(state *ReconcileContext, m pdm.PDClientManager) task.Task { return task.Wait().With("wait if pd client is not registered") } if state.Store == nil { + if syncLeadersEvictedCond(state.TiKV(), nil, state.LeaderEvicting) { + state.SetStatusChanged() + } return task.Complete().With("store has been deleted or not created") } diff --git a/pkg/controllers/tikv/tasks/evict_leader_test.go b/pkg/controllers/tikv/tasks/evict_leader_test.go index 5ac4ebbc7fd..e7910659461 100644 --- a/pkg/controllers/tikv/tasks/evict_leader_test.go +++ b/pkg/controllers/tikv/tasks/evict_leader_test.go @@ -19,8 +19,10 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.uber.org/mock/gomock" corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/pingcap/tidb-operator/api/v2/core/v1alpha1" pdapi "github.com/pingcap/tidb-operator/v2/pkg/pdapi/v1" @@ -74,6 +76,23 @@ func TestTaskEvictLeader(t *testing.T) { expectEvicting: false, expectedStatus: task.SComplete, }, + { + desc: "sync leaders evicted condition when store is absent", + state: &ReconcileContext{ + State: &state{ + tikv: fake.FakeObj("aaa-xxx", func(obj *v1alpha1.TiKV) *v1alpha1.TiKV { + obj.Generation = 3 + return obj + }), + pod: fake.FakeObj[corev1.Pod]("aaa-tikv-xxx"), + }, + PDSynced: true, + LeaderEvicting: true, + Store: nil, + }, + expectEvicting: true, + expectedStatus: task.SComplete, + }, } for i := range cases { @@ -99,6 +118,12 @@ func TestTaskEvictLeader(t *testing.T) { assert.Equal(tt, c.expectedStatus, res.Status()) assert.False(tt, done) assert.Equal(tt, c.expectEvicting, c.state.LeaderEvicting) + if c.state.Store == nil { + cond := findCondition(c.state.TiKV().Status.Conditions, v1alpha1.TiKVCondLeadersEvicted) + require.NotNil(tt, cond) + assert.Equal(tt, metav1.ConditionTrue, cond.Status) + assert.Equal(tt, v1alpha1.ReasonStoreNotExist, cond.Reason) + } }) } } @@ -112,3 +137,20 @@ func (s *stubPDClientState) GetPDClient(pdm.PDClientManager) (pdm.PDClient, bool } var _ stateutil.IPDClient = (*stubPDClientState)(nil) + +type stubPDClientUnavailableState struct{} + +func (s *stubPDClientUnavailableState) GetPDClient(pdm.PDClientManager) (pdm.PDClient, bool) { + return nil, false +} + +var _ stateutil.IPDClient = (*stubPDClientUnavailableState)(nil) + +func findCondition(conds []metav1.Condition, typ string) *metav1.Condition { + for i := range conds { + if conds[i].Type == typ { + return &conds[i] + } + } + return nil +} diff --git a/pkg/controllers/tikv/tasks/pod.go b/pkg/controllers/tikv/tasks/pod.go index d2ffe903714..ea6ba238368 100644 --- a/pkg/controllers/tikv/tasks/pod.go +++ b/pkg/controllers/tikv/tasks/pod.go @@ -75,11 +75,6 @@ func TaskPod(state *ReconcileContext, c client.Client, cm pdm.PDClientManager) t logger := logr.FromContextOrDiscard(ctx) expected := newPod(state.Cluster(), state.TiKV(), state.Store, state.FeatureGates()) - pc, ok := state.GetPDClient(cm) - if !ok { - return task.Wait().With("wait if pd client is not registered") - } - pod := state.Pod() if pod == nil { if err := c.Apply(ctx, expected); err != nil { @@ -101,12 +96,17 @@ func TaskPod(state *ReconcileContext, c client.Client, cm pdm.PDClientManager) t if !reloadable.CheckTiKVPod(state.TiKV(), pod) { state.ShouldEvictLeader = true + pc, ok := state.GetPDClient(cm) + if !ok { + return task.Wait().With("wait if pd client is not registered") + } + if err := checkDownPeerCountIsZero(ctx, state.Store, pc); err != nil { - return task.Wait().With("cannot recreate pod, check down peer: %v", err) + return task.Retry(defaultTaskWaitDuration).With("cannot recreate pod, check down peer: %v", err) } if err := CheckTiKVLeadersEvicted(state.TiKV()); err != nil { - return task.Wait().With("cannot recreate pod, check leader count: %v", err) + return task.Retry(defaultTaskWaitDuration).With("cannot recreate pod, check leader count: %v", err) } logger.Info("will recreate the pod") @@ -148,28 +148,31 @@ func checkDownPeerCountIsZero(ctx context.Context, store *pdv1.Store, pc pdm.PDC } func countNonSelfDownPeers(downPeerInfo *pdapi.RegionsCheckInfo, store *pdv1.Store) int { + if downPeerInfo == nil { + return 0 + } if store == nil || store.ID == "" { return downPeerInfo.Count } - if downPeerInfo.Count == 0 { - return 0 - } - - nonSelfDownPeerCount := 0 + remainingDownPeerCount := downPeerInfo.Count for _, region := range downPeerInfo.Regions { + if region == nil || len(region.DownPeers) == 0 { + continue + } for _, downPeer := range region.DownPeers { if downPeer == nil || downPeer.Peer == nil { - nonSelfDownPeerCount++ continue } if fmt.Sprint(downPeer.Peer.StoreId) == store.ID { - continue + remainingDownPeerCount-- } - nonSelfDownPeerCount++ } } - return nonSelfDownPeerCount + if remainingDownPeerCount < 0 { + return 0 + } + return remainingDownPeerCount } func newPod(cluster *v1alpha1.Cluster, tikv *v1alpha1.TiKV, store *pdv1.Store, g features.Gates) *corev1.Pod { diff --git a/pkg/controllers/tikv/tasks/pod_test.go b/pkg/controllers/tikv/tasks/pod_test.go index 5f6c73bf347..6b3ecda9b77 100644 --- a/pkg/controllers/tikv/tasks/pod_test.go +++ b/pkg/controllers/tikv/tasks/pod_test.go @@ -55,6 +55,7 @@ func TestTaskPod(t *testing.T) { objs []client.Object downPeerCount int downPeerInfo *pdapi.RegionsCheckInfo + pdClientReady bool // if true, cannot apply pod unexpectedErr bool @@ -78,6 +79,7 @@ func TestTaskPod(t *testing.T) { expectUpdatedPod: true, expectedStatus: task.SComplete, + pdClientReady: false, }, { desc: "no pod, failed to apply", @@ -94,6 +96,7 @@ func TestTaskPod(t *testing.T) { unexpectedErr: true, expectedStatus: task.SFail, + pdClientReady: false, }, { desc: "pod is deleting", @@ -119,6 +122,7 @@ func TestTaskPod(t *testing.T) { expectedPodIsTerminating: true, expectedStatus: task.SWait, expectedShouldEvict: false, + pdClientReady: false, }, { desc: "version is changed", @@ -144,6 +148,7 @@ func TestTaskPod(t *testing.T) { expectedPodIsTerminating: true, expectedStatus: task.SWait, expectedShouldEvict: true, + pdClientReady: true, }, { desc: "version is changed, failed to delete", @@ -169,6 +174,7 @@ func TestTaskPod(t *testing.T) { expectedStatus: task.SFail, expectedShouldEvict: true, + pdClientReady: true, }, { desc: "config changed, hot reload policy", @@ -194,6 +200,7 @@ func TestTaskPod(t *testing.T) { expectUpdatedPod: true, expectedStatus: task.SComplete, + pdClientReady: false, }, { desc: "config changed, restart policy", @@ -222,6 +229,7 @@ func TestTaskPod(t *testing.T) { expectedPodIsTerminating: true, expectedStatus: task.SWait, expectedShouldEvict: true, + pdClientReady: true, }, { desc: "version is changed but restart precheck is blocked by leader count", @@ -246,8 +254,9 @@ func TestTaskPod(t *testing.T) { downPeerCount: 0, expectedPodIsTerminating: false, - expectedStatus: task.SWait, + expectedStatus: task.SRetry, expectedShouldEvict: true, + pdClientReady: true, }, { desc: "version is changed but restart precheck is blocked by down peer count", @@ -284,8 +293,9 @@ func TestTaskPod(t *testing.T) { }, }, expectedPodIsTerminating: false, - expectedStatus: task.SWait, + expectedStatus: task.SRetry, expectedShouldEvict: true, + pdClientReady: true, }, { desc: "version is changed but restart precheck ignores self down peer", @@ -323,6 +333,62 @@ func TestTaskPod(t *testing.T) { expectedPodIsTerminating: true, expectedStatus: task.SWait, expectedShouldEvict: true, + pdClientReady: true, + }, + { + desc: "version is changed but pd client is not ready", + state: &ReconcileContext{ + State: &state{ + tikv: fake.FakeObj("aaa-xxx", func(obj *v1alpha1.TiKV) *v1alpha1.TiKV { + obj.Spec.Version = fakeVersion + setLeadersEvicted(obj) + return obj + }), + cluster: fake.FakeObj[v1alpha1.Cluster]("aaa"), + pod: fakePod( + fake.FakeObj[v1alpha1.Cluster]("aaa"), + fake.FakeObj("aaa-xxx", func(obj *v1alpha1.TiKV) *v1alpha1.TiKV { + obj.Spec.Version = fakeNewVersion + return obj + }), + ), + }, + }, + + expectedPodIsTerminating: false, + expectedStatus: task.SWait, + expectedShouldEvict: true, + pdClientReady: false, + }, + { + desc: "version is changed but down peer details are missing", + state: &ReconcileContext{ + State: &state{ + tikv: fake.FakeObj("aaa-xxx", func(obj *v1alpha1.TiKV) *v1alpha1.TiKV { + obj.Spec.Version = fakeVersion + setLeadersEvicted(obj) + return obj + }), + cluster: fake.FakeObj[v1alpha1.Cluster]("aaa"), + pod: fakePod( + fake.FakeObj[v1alpha1.Cluster]("aaa"), + fake.FakeObj("aaa-xxx", func(obj *v1alpha1.TiKV) *v1alpha1.TiKV { + obj.Spec.Version = fakeNewVersion + return obj + }), + ), + }, + Store: &pdv1.Store{ID: "1", RegionCount: 400}, + }, + + downPeerCount: 1, + downPeerInfo: &pdapi.RegionsCheckInfo{ + Count: 1, + }, + expectedPodIsTerminating: false, + expectedStatus: task.SRetry, + expectedShouldEvict: true, + pdClientReady: true, }, { desc: "pod labels changed, config not changed", @@ -357,6 +423,7 @@ func TestTaskPod(t *testing.T) { expectUpdatedPod: true, expectedStatus: task.SComplete, expectedShouldEvict: false, + pdClientReady: false, }, { desc: "pod labels changed, config not changed, apply failed", @@ -391,6 +458,7 @@ func TestTaskPod(t *testing.T) { expectedStatus: task.SFail, expectedShouldEvict: false, + pdClientReady: false, }, { desc: "all are not changed", @@ -415,6 +483,7 @@ func TestTaskPod(t *testing.T) { expectedStatus: task.SComplete, expectedShouldEvict: false, + pdClientReady: false, }, } @@ -437,23 +506,27 @@ func TestTaskPod(t *testing.T) { s.IFeatureGates = stateutil.NewFeatureGates[scope.TiKV](s) ctrl := gomock.NewController(tt) - mockPDClient := pdm.NewMockPDClient(ctrl) - mockUnderlay := pdapi.NewMockPDClient(ctrl) - mockPDClient.EXPECT().Underlay().Return(mockUnderlay).AnyTimes() - s.IPDClient = &stubPDClientState{client: mockPDClient} - - shouldQueryDownPeer := c.state.Pod() != nil && - c.state.Pod().GetDeletionTimestamp().IsZero() && - !reloadable.CheckTiKVPod(c.state.TiKV(), c.state.Pod()) - if shouldQueryDownPeer { - mockUnderlay.EXPECT(). - GetDownPeerRegions(gomock.Any()). - Return(func() *pdapi.RegionsCheckInfo { - if c.downPeerInfo != nil { - return c.downPeerInfo - } - return &pdapi.RegionsCheckInfo{Count: c.downPeerCount} - }(), nil) + if c.pdClientReady { + mockPDClient := pdm.NewMockPDClient(ctrl) + mockUnderlay := pdapi.NewMockPDClient(ctrl) + mockPDClient.EXPECT().Underlay().Return(mockUnderlay).AnyTimes() + s.IPDClient = &stubPDClientState{client: mockPDClient} + + shouldQueryDownPeer := c.state.Pod() != nil && + c.state.Pod().GetDeletionTimestamp().IsZero() && + !reloadable.CheckTiKVPod(c.state.TiKV(), c.state.Pod()) + if shouldQueryDownPeer { + mockUnderlay.EXPECT(). + GetDownPeerRegions(gomock.Any()). + Return(func() *pdapi.RegionsCheckInfo { + if c.downPeerInfo != nil { + return c.downPeerInfo + } + return &pdapi.RegionsCheckInfo{Count: c.downPeerCount} + }(), nil) + } + } else { + s.IPDClient = &stubPDClientUnavailableState{} } if c.unexpectedErr {