diff --git a/api/core/v1alpha1/tikv_types.go b/api/core/v1alpha1/tikv_types.go
index 57619816a3..e5adc0d350 100644
--- a/api/core/v1alpha1/tikv_types.go
+++ b/api/core/v1alpha1/tikv_types.go
@@ -45,9 +45,12 @@ const (
     // condition
     TiKVCondLeadersEvicted = "LeadersEvicted"
     // reason
-    ReasonNotEvicted = "NotEvicted"
-    ReasonEvicting   = "Evicting"
-    ReasonEvicted    = "Evicted"
+    ReasonNotEvicted    = "NotEvicted"
+    ReasonEvicting      = "Evicting"
+    ReasonEvicted       = "Evicted"
+    ReasonStoreNotExist = "StoreNotExist"
+
+    // Deprecated: replaced by ReasonStoreNotExist
     ReasonStoreIsRemoved = "StoreIsRemoved"
 )
diff --git a/pkg/controllers/tikv/builder.go b/pkg/controllers/tikv/builder.go
index 16d913524c..77cdc799f4 100644
--- a/pkg/controllers/tikv/builder.go
+++ b/pkg/controllers/tikv/builder.go
@@ -88,7 +88,7 @@ func (r *Reconciler) NewRunner(state *tasks.ReconcileContext, reporter task.Task
         tasks.TaskOfflineStore(state, r.PDClientManager),
         tasks.TaskConfigMap(state, r.Client),
         common.TaskPVC[scope.TiKV](state, r.Client, r.VolumeModifierFactory, tasks.PVCNewer()),
-        tasks.TaskPod(state, r.Client),
+        tasks.TaskPod(state, r.Client, r.PDClientManager),
         tasks.TaskStoreLabels(state, r.Client, r.PDClientManager),
         tasks.TaskEvictLeader(state, r.PDClientManager),
         common.TaskInstanceConditionSynced[scope.TiKV](state),
diff --git a/pkg/controllers/tikv/tasks/ctx.go b/pkg/controllers/tikv/tasks/ctx.go
index 0f7145133f..4b7a1c1b26 100644
--- a/pkg/controllers/tikv/tasks/ctx.go
+++ b/pkg/controllers/tikv/tasks/ctx.go
@@ -32,11 +32,13 @@ import (
 type ReconcileContext struct {
     State
 
-    LeaderEvicting bool
+    ShouldEvictLeader bool
+    LeaderEvicting    bool
 
-    Store    *pdv1.Store
     PDSynced bool
 
+    Store *pdv1.Store
+
     // IsStoreReady will be set only when pd is synced and the store is ok
     // It may be outdated so the tikv is healthy only when the pod is also available
     // If it's true and the pod is ready but not available,
@@ -97,6 +99,7 @@ func TaskContextInfoFromPD(state *ReconcileContext, cm pdm.PDClientManager) task
     if coreutil.ShouldSuspendCompute(state.Cluster()) {
         return task.Complete().With("cluster is suspending")
     }
+
     scheduler, err := c.Underlay().GetEvictLeaderScheduler(ctx, state.Store.ID)
     if err != nil {
         return task.Fail().With("pd is unexpectedly crashed: %v", err)
diff --git a/pkg/controllers/tikv/tasks/evict_leader.go b/pkg/controllers/tikv/tasks/evict_leader.go
index 2557e83805..9777e11756 100644
--- a/pkg/controllers/tikv/tasks/evict_leader.go
+++ b/pkg/controllers/tikv/tasks/evict_leader.go
@@ -16,7 +16,13 @@ package tasks
 
 import (
     "context"
+    "fmt"
 
+    "k8s.io/apimachinery/pkg/api/meta"
+    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+
+    "github.com/pingcap/tidb-operator/api/v2/core/v1alpha1"
+    pdv1 "github.com/pingcap/tidb-operator/v2/pkg/timanager/apis/pd/v1"
     pdm "github.com/pingcap/tidb-operator/v2/pkg/timanager/pd"
     "github.com/pingcap/tidb-operator/v2/pkg/utils/task/v3"
 )
@@ -27,26 +33,33 @@ func TaskEvictLeader(state *ReconcileContext, m pdm.PDClientManager) task.Task {
         if !ok {
             return task.Wait().With("wait if pd client is not registered")
         }
-        switch {
-        case !state.PDSynced:
-            return task.Wait().With("pd is unsynced")
-        case state.Store == nil:
+        if state.Store == nil {
+            if syncLeadersEvictedCond(state.TiKV(), nil, state.LeaderEvicting) {
+                state.SetStatusChanged()
+            }
             return task.Complete().With("store has been deleted or not created")
-        case state.Instance().IsOffline() || state.IsPodTerminating():
-            if !state.LeaderEvicting {
-                if err := pc.Underlay().BeginEvictLeader(ctx, state.Store.ID); err != nil {
-                    return task.Fail().With("cannot add evict leader scheduler: %v", err)
-                }
+        }
+
+        if state.ShouldEvictLeader && !state.LeaderEvicting {
+            if err := pc.Underlay().BeginEvictLeader(ctx, state.Store.ID); err != nil {
+                return task.Fail().With("cannot add evict leader scheduler: %v", err)
             }
-            return task.Complete().With("ensure evict leader scheduler exists")
-        default:
-            if state.LeaderEvicting {
-                if err := pc.Underlay().EndEvictLeader(ctx, state.Store.ID); err != nil {
-                    return task.Fail().With("cannot remove evict leader scheduler: %v", err)
-                }
+            state.LeaderEvicting = true
+        }
+
+        if state.LeaderEvicting && !state.ShouldEvictLeader {
+            if err := pc.Underlay().EndEvictLeader(ctx, state.Store.ID); err != nil {
+                return task.Fail().With("cannot remove evict leader scheduler: %v", err)
             }
-            return task.Complete().With("ensure evict leader scheduler doesn't exist")
+            state.LeaderEvicting = false
         }
+
+        needUpdate := syncLeadersEvictedCond(state.TiKV(), state.Store, state.LeaderEvicting)
+        if needUpdate {
+            state.SetStatusChanged()
+        }
+
+        return task.Complete().With("sync evict leader scheduler, expected: %v, actual: %v", state.ShouldEvictLeader, state.LeaderEvicting)
     })
 }
@@ -69,3 +82,32 @@ func TaskEndEvictLeader(state *ReconcileContext, m pdm.PDClientManager) task.Tas
         return task.Complete().With(msg)
     })
 }
+
+// The status of this condition can only transition as described below
+func syncLeadersEvictedCond(tikv *v1alpha1.TiKV, store *pdv1.Store, isEvicting bool) bool {
+    status := metav1.ConditionFalse
+    reason := v1alpha1.ReasonNotEvicted
+    msg := "leaders are not all evicted"
+    switch {
+    case store == nil:
+        status = metav1.ConditionTrue
+        reason = v1alpha1.ReasonStoreNotExist
+        msg = "store does not exist"
+    case isEvicting && store.LeaderCount == 0:
+        status = metav1.ConditionTrue
+        reason = v1alpha1.ReasonEvicted
+        msg = "all leaders are evicted"
+    case isEvicting:
+        status = metav1.ConditionFalse
+        reason = v1alpha1.ReasonEvicting
+        msg = fmt.Sprintf("not all leaders are evicted, still: %v", store.LeaderCount)
+    }
+
+    return meta.SetStatusCondition(&tikv.Status.Conditions, metav1.Condition{
+        Type:               v1alpha1.TiKVCondLeadersEvicted,
+        Status:             status,
+        ObservedGeneration: tikv.Generation,
+        Reason:             reason,
+        Message:            msg,
+    })
+}
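Note on the condition flow above: syncLeadersEvictedCond only ever moves the LeadersEvicted condition NotEvicted → Evicting → Evicted, or straight to StoreNotExist once the store record is gone. A minimal table-driven sketch of those transitions — assuming it sits in the same tasks package as evict_leader.go; the test name and case data are illustrative, not part of this PR:

```go
package tasks

import (
	"testing"

	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	"github.com/pingcap/tidb-operator/api/v2/core/v1alpha1"
	pdv1 "github.com/pingcap/tidb-operator/v2/pkg/timanager/apis/pd/v1"
)

func TestSyncLeadersEvictedCondTransitions(t *testing.T) {
	cases := []struct {
		store      *pdv1.Store
		isEvicting bool
		wantStatus metav1.ConditionStatus
		wantReason string
	}{
		// store gone: condition becomes true immediately
		{store: nil, isEvicting: false, wantStatus: metav1.ConditionTrue, wantReason: v1alpha1.ReasonStoreNotExist},
		// eviction requested but leaders remain: still in progress
		{store: &pdv1.Store{LeaderCount: 3}, isEvicting: true, wantStatus: metav1.ConditionFalse, wantReason: v1alpha1.ReasonEvicting},
		// eviction requested and the store is drained: done
		{store: &pdv1.Store{LeaderCount: 0}, isEvicting: true, wantStatus: metav1.ConditionTrue, wantReason: v1alpha1.ReasonEvicted},
		// no eviction in flight: the default branch
		{store: &pdv1.Store{LeaderCount: 3}, isEvicting: false, wantStatus: metav1.ConditionFalse, wantReason: v1alpha1.ReasonNotEvicted},
	}
	for _, c := range cases {
		tikv := &v1alpha1.TiKV{}
		syncLeadersEvictedCond(tikv, c.store, c.isEvicting)
		cond := meta.FindStatusCondition(tikv.Status.Conditions, v1alpha1.TiKVCondLeadersEvicted)
		if cond == nil || cond.Status != c.wantStatus || cond.Reason != c.wantReason {
			t.Fatalf("unexpected condition: %+v", cond)
		}
	}
}
```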
diff --git a/pkg/controllers/tikv/tasks/evict_leader_test.go b/pkg/controllers/tikv/tasks/evict_leader_test.go
new file mode 100644
index 0000000000..e791065946
--- /dev/null
+++ b/pkg/controllers/tikv/tasks/evict_leader_test.go
@@ -0,0 +1,156 @@
+// Copyright 2024 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package tasks
+
+import (
+    "context"
+    "testing"
+
+    "github.com/stretchr/testify/assert"
+    "github.com/stretchr/testify/require"
+    "go.uber.org/mock/gomock"
+    corev1 "k8s.io/api/core/v1"
+    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+
+    "github.com/pingcap/tidb-operator/api/v2/core/v1alpha1"
+    pdapi "github.com/pingcap/tidb-operator/v2/pkg/pdapi/v1"
+    stateutil "github.com/pingcap/tidb-operator/v2/pkg/state"
+    pdv1 "github.com/pingcap/tidb-operator/v2/pkg/timanager/apis/pd/v1"
+    pdm "github.com/pingcap/tidb-operator/v2/pkg/timanager/pd"
+    "github.com/pingcap/tidb-operator/v2/pkg/utils/fake"
+    "github.com/pingcap/tidb-operator/v2/pkg/utils/task/v3"
+)
+
+func TestTaskEvictLeader(t *testing.T) {
+    cases := []struct {
+        desc           string
+        state          *ReconcileContext
+        expectBegin    bool
+        expectEnd      bool
+        expectEvicting bool
+        expectedStatus task.Status
+    }{
+        {
+            desc: "begin evict leader when requested",
+            state: &ReconcileContext{
+                State: &state{
+                    tikv: fake.FakeObj[v1alpha1.TiKV]("aaa-xxx"),
+                    pod:  fake.FakeObj[corev1.Pod]("aaa-tikv-xxx"),
+                },
+                ShouldEvictLeader: true,
+                PDSynced:          true,
+                Store: &pdv1.Store{
+                    ID: "1",
+                },
+            },
+            expectBegin:    true,
+            expectEvicting: true,
+            expectedStatus: task.SComplete,
+        },
+        {
+            desc: "end evict leader when not requested",
+            state: &ReconcileContext{
+                State: &state{
+                    tikv: fake.FakeObj[v1alpha1.TiKV]("aaa-xxx"),
+                    pod:  fake.FakeObj[corev1.Pod]("aaa-tikv-xxx"),
+                },
+                PDSynced:       true,
+                LeaderEvicting: true,
+                Store: &pdv1.Store{
+                    ID: "1",
+                },
+            },
+            expectEnd:      true,
+            expectEvicting: false,
+            expectedStatus: task.SComplete,
+        },
+        {
+            desc: "sync leaders evicted condition when store is absent",
+            state: &ReconcileContext{
+                State: &state{
+                    tikv: fake.FakeObj("aaa-xxx", func(obj *v1alpha1.TiKV) *v1alpha1.TiKV {
+                        obj.Generation = 3
+                        return obj
+                    }),
+                    pod: fake.FakeObj[corev1.Pod]("aaa-tikv-xxx"),
+                },
+                PDSynced:       true,
+                LeaderEvicting: true,
+                Store:          nil,
+            },
+            expectEvicting: true,
+            expectedStatus: task.SComplete,
+        },
+    }
+
+    for i := range cases {
+        c := &cases[i]
+        t.Run(c.desc, func(tt *testing.T) {
+            tt.Parallel()
+
+            ctrl := gomock.NewController(tt)
+            mockPDClient := pdm.NewMockPDClient(ctrl)
+            mockUnderlay := pdapi.NewMockPDClient(ctrl)
+            mockPDClient.EXPECT().Underlay().Return(mockUnderlay).AnyTimes()
+            if c.expectBegin {
+                mockUnderlay.EXPECT().BeginEvictLeader(gomock.Any(), "1").Return(nil)
+            }
+            if c.expectEnd {
+                mockUnderlay.EXPECT().EndEvictLeader(gomock.Any(), "1").Return(nil)
+            }
+
+            s := c.state.State.(*state)
+            s.IPDClient = &stubPDClientState{client: mockPDClient}
+
+            res, done := task.RunTask(context.Background(), TaskEvictLeader(c.state, nil))
+            assert.Equal(tt, c.expectedStatus, res.Status())
+            assert.False(tt, done)
+            assert.Equal(tt, c.expectEvicting, c.state.LeaderEvicting)
+            if c.state.Store == nil {
+                cond := findCondition(c.state.TiKV().Status.Conditions, v1alpha1.TiKVCondLeadersEvicted)
+                require.NotNil(tt, cond)
+                assert.Equal(tt, metav1.ConditionTrue, cond.Status)
+                assert.Equal(tt, v1alpha1.ReasonStoreNotExist, cond.Reason)
+            }
+        })
+    }
+}
+
+type stubPDClientState struct {
+    client pdm.PDClient
+}
+
+func (s *stubPDClientState) GetPDClient(pdm.PDClientManager) (pdm.PDClient, bool) {
+    return s.client, true
+}
+
+var _ stateutil.IPDClient = (*stubPDClientState)(nil)
+
+type stubPDClientUnavailableState struct{}
+
+func (s *stubPDClientUnavailableState) GetPDClient(pdm.PDClientManager) (pdm.PDClient, bool) {
+    return nil, false
+}
+
+var _ stateutil.IPDClient = (*stubPDClientUnavailableState)(nil)
+
+func findCondition(conds []metav1.Condition, typ string) *metav1.Condition {
+    for i := range conds {
+        if conds[i].Type == typ {
+            return &conds[i]
+        }
+    }
+    return nil
+}
diff --git a/pkg/controllers/tikv/tasks/offline.go b/pkg/controllers/tikv/tasks/offline.go
index 51b4d16238..07eaff60e6 100644
--- a/pkg/controllers/tikv/tasks/offline.go
+++ b/pkg/controllers/tikv/tasks/offline.go
@@ -18,11 +18,6 @@ import (
     "context"
     "time"
 
-    "k8s.io/apimachinery/pkg/api/meta"
-    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-    "k8s.io/utils/ptr"
-
-    "github.com/pingcap/tidb-operator/api/v2/core/v1alpha1"
     coreutil "github.com/pingcap/tidb-operator/v2/pkg/apiutil/core/v1alpha1"
     "github.com/pingcap/tidb-operator/v2/pkg/controllers/common"
     "github.com/pingcap/tidb-operator/v2/pkg/runtime/scope"
@@ -47,18 +42,11 @@ func TaskOfflineStore(state *ReconcileContext, m pdm.PDClientManager) task.Task
     // If the store is nil, it means the store has been deleted or not created yet.
     // No need to check if leaders are evicted.
     if state.Store != nil && isOffline {
-        var reason string
-        beginTime := getBeginEvictLeaderTime(tikv)
-        switch {
-        case state.LeaderEvicting && state.GetLeaderCount() == 0:
-            reason = "leaders have been all evicted"
-        case beginTime != nil && beginTime.Add(defaultLeaderEvictTimeout).Before(time.Now()):
-            reason = "leader eviction timeout"
-        }
+        state.ShouldEvictLeader = true
 
-        if reason == "" {
+        if err := CheckTiKVLeadersEvictedOrTimeout(tikv, defaultLeaderEvictTimeout); err != nil {
             return task.Retry(defaultLeaderEvictTimeout + jitter).
-                With("waiting for leaders evicted or timeout, current leader count: %d", state.GetLeaderCount())
+                With("waiting for leaders evicted or timeout: %v", err)
         }
     }
 
@@ -87,13 +75,3 @@ func TaskOfflineStore(state *ReconcileContext, m pdm.PDClientManager) task.Task
     return task.Complete().With("offline is completed or no need, spec.offline: %v", isOffline)
     })
 }
-
-// getBeginEvictLeaderTime returns the time when the leader eviction started.
-// If the condition is not found or the status is not False, it returns nil.
-func getBeginEvictLeaderTime(tikv *v1alpha1.TiKV) *time.Time {
-    cond := meta.FindStatusCondition(tikv.Status.Conditions, v1alpha1.TiKVCondLeadersEvicted)
-    if cond != nil && cond.Status == metav1.ConditionFalse {
-        return ptr.To(cond.LastTransitionTime.Time)
-    }
-    return nil
-}
diff --git a/pkg/controllers/tikv/tasks/pod.go b/pkg/controllers/tikv/tasks/pod.go
index 0349627ea0..ea6ba23836 100644
--- a/pkg/controllers/tikv/tasks/pod.go
+++ b/pkg/controllers/tikv/tasks/pod.go
@@ -33,9 +33,11 @@ import (
     "github.com/pingcap/tidb-operator/v2/pkg/features"
     "github.com/pingcap/tidb-operator/v2/pkg/image"
     "github.com/pingcap/tidb-operator/v2/pkg/overlay"
+    pdapi "github.com/pingcap/tidb-operator/v2/pkg/pdapi/v1"
     "github.com/pingcap/tidb-operator/v2/pkg/reloadable"
     "github.com/pingcap/tidb-operator/v2/pkg/runtime/scope"
     pdv1 "github.com/pingcap/tidb-operator/v2/pkg/timanager/apis/pd/v1"
+    pdm "github.com/pingcap/tidb-operator/v2/pkg/timanager/pd"
     "github.com/pingcap/tidb-operator/v2/pkg/utils/k8s"
     maputil "github.com/pingcap/tidb-operator/v2/pkg/utils/map"
     "github.com/pingcap/tidb-operator/v2/pkg/utils/task/v3"
@@ -68,10 +70,11 @@ func TaskSuspendPod(state *ReconcileContext, c client.Client) task.Task {
     })
 }
 
-func TaskPod(state *ReconcileContext, c client.Client) task.Task {
+func TaskPod(state *ReconcileContext, c client.Client, cm pdm.PDClientManager) task.Task {
     return task.NameTaskFunc("Pod", func(ctx context.Context) task.Result {
         logger := logr.FromContextOrDiscard(ctx)
         expected := newPod(state.Cluster(), state.TiKV(), state.Store, state.FeatureGates())
+
         pod := state.Pod()
         if pod == nil {
             if err := c.Apply(ctx, expected); err != nil {
@@ -91,6 +94,21 @@
         }
 
         if !reloadable.CheckTiKVPod(state.TiKV(), pod) {
+            state.ShouldEvictLeader = true
+
+            pc, ok := state.GetPDClient(cm)
+            if !ok {
+                return task.Wait().With("wait if pd client is not registered")
+            }
+
+            if err := checkDownPeerCountIsZero(ctx, state.Store, pc); err != nil {
+                return task.Retry(defaultTaskWaitDuration).With("cannot recreate pod, check down peer: %v", err)
+            }
+
+            if err := CheckTiKVLeadersEvicted(state.TiKV()); err != nil {
+                return task.Retry(defaultTaskWaitDuration).With("cannot recreate pod, check leader count: %v", err)
+            }
+
             logger.Info("will recreate the pod")
             regionCount := 0
             if state.Store != nil {
@@ -116,6 +134,47 @@
     })
 }
 
+func checkDownPeerCountIsZero(ctx context.Context, store *pdv1.Store, pc pdm.PDClient) error {
+    downPeerInfo, err := pc.Underlay().GetDownPeerRegions(ctx)
+    if err != nil {
+        return fmt.Errorf("failed to get down peer info: %w", err)
+    }
+    downPeerCount := countNonSelfDownPeers(downPeerInfo, store)
+    if downPeerCount != 0 {
+        return fmt.Errorf("down peer is not zero: %v", downPeerCount)
+    }
+
+    return nil
+}
+
+func countNonSelfDownPeers(downPeerInfo *pdapi.RegionsCheckInfo, store *pdv1.Store) int {
+    if downPeerInfo == nil {
+        return 0
+    }
+    if store == nil || store.ID == "" {
+        return downPeerInfo.Count
+    }
+    remainingDownPeerCount := downPeerInfo.Count
+    for _, region := range downPeerInfo.Regions {
+        if region == nil || len(region.DownPeers) == 0 {
+            continue
+        }
+        for _, downPeer := range region.DownPeers {
+            if downPeer == nil || downPeer.Peer == nil {
+                continue
+            }
+            if fmt.Sprint(downPeer.Peer.StoreId) == store.ID {
+                remainingDownPeerCount--
+            }
+        }
+    }
+
+    if remainingDownPeerCount < 0 {
+        return 0
+    }
+    return remainingDownPeerCount
+}
+
 func newPod(cluster *v1alpha1.Cluster, tikv *v1alpha1.TiKV, store *pdv1.Store, g features.Gates) *corev1.Pod {
     vols := []corev1.Volume{
         {
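The restart precheck above hinges on countNonSelfDownPeers: down peers hosted on the store being restarted are expected and subtracted from the count, while any other down peer blocks pod recreation. A small usage sketch, assuming the same tasks package as pod.go; the example name and data are illustrative:

```go
package tasks

import (
	"fmt"

	"github.com/pingcap/kvproto/pkg/metapb"

	pdapi "github.com/pingcap/tidb-operator/v2/pkg/pdapi/v1"
	pdv1 "github.com/pingcap/tidb-operator/v2/pkg/timanager/apis/pd/v1"
)

func ExampleCountNonSelfDownPeers() {
	info := &pdapi.RegionsCheckInfo{
		Count: 2,
		Regions: []*pdapi.RegionCheckEntry{
			// one down peer on store 1 (the store being restarted), one on store 2
			{ID: 101, DownPeers: []*pdapi.RegionPeerStat{{Peer: &metapb.Peer{StoreId: 1}}}},
			{ID: 102, DownPeers: []*pdapi.RegionPeerStat{{Peer: &metapb.Peer{StoreId: 2}}}},
		},
	}
	// the self down peer is subtracted from the region count
	fmt.Println(countNonSelfDownPeers(info, &pdv1.Store{ID: "1"}))
	// without a store identity, the raw count is returned unchanged
	fmt.Println(countNonSelfDownPeers(info, nil))
	// Output:
	// 1
	// 2
}
```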
diff --git a/pkg/controllers/tikv/tasks/pod_test.go b/pkg/controllers/tikv/tasks/pod_test.go
index 6f1c0ce912..6b3ecda9b7 100644
--- a/pkg/controllers/tikv/tasks/pod_test.go
+++ b/pkg/controllers/tikv/tasks/pod_test.go
@@ -19,8 +19,10 @@ import (
     "fmt"
     "testing"
 
+    "github.com/pingcap/kvproto/pkg/metapb"
     "github.com/stretchr/testify/assert"
     "github.com/stretchr/testify/require"
+    "go.uber.org/mock/gomock"
     corev1 "k8s.io/api/core/v1"
     "k8s.io/apimachinery/pkg/api/errors"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -29,9 +31,12 @@ import (
     "github.com/pingcap/tidb-operator/api/v2/core/v1alpha1"
     "github.com/pingcap/tidb-operator/v2/pkg/client"
     "github.com/pingcap/tidb-operator/v2/pkg/features"
+    pdapi "github.com/pingcap/tidb-operator/v2/pkg/pdapi/v1"
+    "github.com/pingcap/tidb-operator/v2/pkg/reloadable"
     "github.com/pingcap/tidb-operator/v2/pkg/runtime/scope"
     stateutil "github.com/pingcap/tidb-operator/v2/pkg/state"
     pdv1 "github.com/pingcap/tidb-operator/v2/pkg/timanager/apis/pd/v1"
+    pdm "github.com/pingcap/tidb-operator/v2/pkg/timanager/pd"
     "github.com/pingcap/tidb-operator/v2/pkg/utils/fake"
     "github.com/pingcap/tidb-operator/v2/pkg/utils/task/v3"
 )
@@ -45,15 +50,19 @@ const (
 func TestTaskPod(t *testing.T) {
     now := metav1.Now()
     cases := []struct {
-        desc  string
-        state *ReconcileContext
-        objs  []client.Object
+        desc          string
+        state         *ReconcileContext
+        objs          []client.Object
+        downPeerCount int
+        downPeerInfo  *pdapi.RegionsCheckInfo
+        pdClientReady bool
 
         // if true, cannot apply pod
         unexpectedErr    bool
         expectUpdatedPod bool
 
         expectedPodIsTerminating bool
         expectedStatus           task.Status
+        expectedShouldEvict      bool
     }{
         {
             desc: "no pod",
             state: &ReconcileContext{
                 State: &state{
                     tikv: fake.FakeObj("aaa-xxx", func(obj *v1alpha1.TiKV) *v1alpha1.TiKV {
                         obj.Spec.Version = fakeVersion
+                        setLeadersEvicted(obj)
                         return obj
                     }),
                     cluster: fake.FakeObj[v1alpha1.Cluster]("aaa"),
@@ -69,6 +79,7 @@
 
             expectUpdatedPod: true,
             expectedStatus:   task.SComplete,
+            pdClientReady:    false,
         },
         {
             desc: "no pod, failed to apply",
@@ -76,6 +87,7 @@
                 State: &state{
                     tikv: fake.FakeObj("aaa-xxx", func(obj *v1alpha1.TiKV) *v1alpha1.TiKV {
                         obj.Spec.Version = fakeVersion
+                        setLeadersEvicted(obj)
                         return obj
                     }),
                     cluster: fake.FakeObj[v1alpha1.Cluster]("aaa"),
@@ -84,6 +96,7 @@
 
             unexpectedErr: true,
             expectedStatus: task.SFail,
+            pdClientReady: false,
         },
         {
             desc: "pod is deleting",
@@ -108,6 +121,8 @@
 
             expectedPodIsTerminating: true,
             expectedStatus:           task.SWait,
+            expectedShouldEvict:      false,
+            pdClientReady:            false,
         },
         {
             desc: "version is changed",
@@ -115,6 +130,7 @@
                 State: &state{
                     tikv: fake.FakeObj("aaa-xxx", func(obj *v1alpha1.TiKV) *v1alpha1.TiKV {
                         obj.Spec.Version = fakeVersion
+                        setLeadersEvicted(obj)
                         return obj
                     }),
                     cluster: fake.FakeObj[v1alpha1.Cluster]("aaa"),
@@ -128,8 +144,11 @@
                 },
             },
 
+            downPeerCount:            0,
             expectedPodIsTerminating: true,
             expectedStatus:           task.SWait,
+            expectedShouldEvict:      true,
+            pdClientReady:            true,
         },
         {
             desc: "version is changed, failed to delete",
@@ -137,6 +156,7 @@
                 State: &state{
                     tikv: fake.FakeObj("aaa-xxx", func(obj *v1alpha1.TiKV) *v1alpha1.TiKV {
                         obj.Spec.Version = fakeVersion
+                        setLeadersEvicted(obj)
                         return obj
                     }),
                     cluster: fake.FakeObj[v1alpha1.Cluster]("aaa"),
@@ -149,9 +169,12 @@
                     ),
                 },
             },
+            downPeerCount: 0,
 
             unexpectedErr: true,
-            expectedStatus: task.SFail,
+            expectedStatus:      task.SFail,
+            expectedShouldEvict: true,
+            pdClientReady:       true,
         },
         {
             desc: "config changed, hot reload policy",
@@ -177,6 +200,7 @@
 
             expectUpdatedPod: true,
             expectedStatus:   task.SComplete,
+            pdClientReady:    false,
         },
         {
             desc: "config changed, restart policy",
@@ -185,6 +209,7 @@
                 tikv: fake.FakeObj("aaa-xxx", func(obj *v1alpha1.TiKV) *v1alpha1.TiKV {
                     obj.Spec.Version = fakeVersion
                     obj.Spec.UpdateStrategy.Config = v1alpha1.ConfigUpdateStrategyRestart
+                    setLeadersEvicted(obj)
                     return obj
                 }),
                 cluster: fake.FakeObj[v1alpha1.Cluster]("aaa"),
@@ -200,8 +225,170 @@
                 },
             },
 
+            downPeerCount:            0,
             expectedPodIsTerminating: true,
             expectedStatus:           task.SWait,
+            expectedShouldEvict:      true,
+            pdClientReady:            true,
+        },
+        {
+            desc: "version is changed but restart precheck is blocked by leader count",
+            state: &ReconcileContext{
+                State: &state{
+                    tikv: fake.FakeObj("aaa-xxx", func(obj *v1alpha1.TiKV) *v1alpha1.TiKV {
+                        obj.Spec.Version = fakeVersion
+                        return obj
+                    }),
+                    cluster: fake.FakeObj[v1alpha1.Cluster]("aaa"),
+                    pod: fakePod(
+                        fake.FakeObj[v1alpha1.Cluster]("aaa"),
+                        fake.FakeObj("aaa-xxx", func(obj *v1alpha1.TiKV) *v1alpha1.TiKV {
+                            obj.Spec.Version = fakeNewVersion
+                            return obj
+                        }),
+                    ),
+                    leaderCount: 1,
+                },
+                Store: &pdv1.Store{RegionCount: 400},
+            },
+
+            downPeerCount:            0,
+            expectedPodIsTerminating: false,
+            expectedStatus:           task.SRetry,
+            expectedShouldEvict:      true,
+            pdClientReady:            true,
+        },
+        {
+            desc: "version is changed but restart precheck is blocked by down peer count",
+            state: &ReconcileContext{
+                State: &state{
+                    tikv: fake.FakeObj("aaa-xxx", func(obj *v1alpha1.TiKV) *v1alpha1.TiKV {
+                        obj.Spec.Version = fakeVersion
+                        setLeadersEvicted(obj)
+                        return obj
+                    }),
+                    cluster: fake.FakeObj[v1alpha1.Cluster]("aaa"),
+                    pod: fakePod(
+                        fake.FakeObj[v1alpha1.Cluster]("aaa"),
+                        fake.FakeObj("aaa-xxx", func(obj *v1alpha1.TiKV) *v1alpha1.TiKV {
+                            obj.Spec.Version = fakeNewVersion
+                            return obj
+                        }),
+                    ),
+                },
+                Store: &pdv1.Store{ID: "1", RegionCount: 400},
+            },
+
+            downPeerCount: 2,
+            downPeerInfo: &pdapi.RegionsCheckInfo{
+                Count: 2,
+                Regions: []*pdapi.RegionCheckEntry{
+                    {
+                        ID: 101,
+                        DownPeers: []*pdapi.RegionPeerStat{
+                            {Peer: &metapb.Peer{StoreId: 2}},
+                            {Peer: &metapb.Peer{StoreId: 3}},
+                        },
+                    },
+                },
+            },
+            expectedPodIsTerminating: false,
+            expectedStatus:           task.SRetry,
+            expectedShouldEvict:      true,
+            pdClientReady:            true,
+        },
+        {
+            desc: "version is changed but restart precheck ignores self down peer",
+            state: &ReconcileContext{
+                State: &state{
+                    tikv: fake.FakeObj("aaa-xxx", func(obj *v1alpha1.TiKV) *v1alpha1.TiKV {
+                        obj.Spec.Version = fakeVersion
+                        setLeadersEvicted(obj)
+                        return obj
+                    }),
+                    cluster: fake.FakeObj[v1alpha1.Cluster]("aaa"),
+                    pod: fakePod(
+                        fake.FakeObj[v1alpha1.Cluster]("aaa"),
+                        fake.FakeObj("aaa-xxx", func(obj *v1alpha1.TiKV) *v1alpha1.TiKV {
+                            obj.Spec.Version = fakeNewVersion
+                            return obj
+                        }),
+                    ),
+                },
+                Store: &pdv1.Store{ID: "1", RegionCount: 400},
+            },
+
+            downPeerCount: 1,
+            downPeerInfo: &pdapi.RegionsCheckInfo{
+                Count: 1,
+                Regions: []*pdapi.RegionCheckEntry{
+                    {
+                        ID: 101,
+                        DownPeers: []*pdapi.RegionPeerStat{
+                            {Peer: &metapb.Peer{StoreId: 1}},
+                        },
+                    },
+                },
+            },
+            expectedPodIsTerminating: true,
+            expectedStatus:           task.SWait,
+            expectedShouldEvict:      true,
+            pdClientReady:            true,
+        },
+        {
+            desc: "version is changed but pd client is not ready",
+            state: &ReconcileContext{
+                State: &state{
+                    tikv: fake.FakeObj("aaa-xxx", func(obj *v1alpha1.TiKV) *v1alpha1.TiKV {
+                        obj.Spec.Version = fakeVersion
+                        setLeadersEvicted(obj)
+                        return obj
+                    }),
+                    cluster: fake.FakeObj[v1alpha1.Cluster]("aaa"),
+                    pod: fakePod(
+                        fake.FakeObj[v1alpha1.Cluster]("aaa"),
+                        fake.FakeObj("aaa-xxx", func(obj *v1alpha1.TiKV) *v1alpha1.TiKV {
+                            obj.Spec.Version = fakeNewVersion
+                            return obj
+                        }),
+                    ),
+                },
+            },
+
+            expectedPodIsTerminating: false,
+            expectedStatus:           task.SWait,
+            expectedShouldEvict:      true,
+            pdClientReady:            false,
+        },
+        {
+            desc: "version is changed but down peer details are missing",
+            state: &ReconcileContext{
+                State: &state{
+                    tikv: fake.FakeObj("aaa-xxx", func(obj *v1alpha1.TiKV) *v1alpha1.TiKV {
+                        obj.Spec.Version = fakeVersion
+                        setLeadersEvicted(obj)
+                        return obj
+                    }),
+                    cluster: fake.FakeObj[v1alpha1.Cluster]("aaa"),
+                    pod: fakePod(
+                        fake.FakeObj[v1alpha1.Cluster]("aaa"),
+                        fake.FakeObj("aaa-xxx", func(obj *v1alpha1.TiKV) *v1alpha1.TiKV {
+                            obj.Spec.Version = fakeNewVersion
+                            return obj
+                        }),
+                    ),
+                },
+                Store: &pdv1.Store{ID: "1", RegionCount: 400},
+            },
+
+            downPeerCount: 1,
+            downPeerInfo: &pdapi.RegionsCheckInfo{
+                Count: 1,
+            },
+            expectedPodIsTerminating: false,
+            expectedStatus:           task.SRetry,
+            expectedShouldEvict:      true,
+            pdClientReady:            true,
         },
         {
             desc: "pod labels changed, config not changed",
@@ -233,8 +420,10 @@
                 },
             },
 
-            expectUpdatedPod: true,
-            expectedStatus:   task.SComplete,
+            expectUpdatedPod:    true,
+            expectedStatus:      task.SComplete,
+            expectedShouldEvict: false,
+            pdClientReady:       false,
         },
         {
             desc: "pod labels changed, config not changed, apply failed",
@@ -267,7 +456,9 @@
             },
 
             unexpectedErr: true,
-            expectedStatus: task.SFail,
+            expectedStatus:      task.SFail,
+            expectedShouldEvict: false,
+            pdClientReady:       false,
         },
         {
             desc: "all are not changed",
@@ -290,7 +481,9 @@
                 },
             },
 
-            expectedStatus: task.SComplete,
+            expectedStatus:      task.SComplete,
+            expectedShouldEvict: false,
+            pdClientReady:       false,
         },
     }
@@ -312,17 +505,42 @@
             s := c.state.State.(*state)
             s.IFeatureGates = stateutil.NewFeatureGates[scope.TiKV](s)
 
+            ctrl := gomock.NewController(tt)
+            if c.pdClientReady {
+                mockPDClient := pdm.NewMockPDClient(ctrl)
+                mockUnderlay := pdapi.NewMockPDClient(ctrl)
+                mockPDClient.EXPECT().Underlay().Return(mockUnderlay).AnyTimes()
+                s.IPDClient = &stubPDClientState{client: mockPDClient}
+
+                shouldQueryDownPeer := c.state.Pod() != nil &&
+                    c.state.Pod().GetDeletionTimestamp().IsZero() &&
+                    !reloadable.CheckTiKVPod(c.state.TiKV(), c.state.Pod())
+                if shouldQueryDownPeer {
+                    mockUnderlay.EXPECT().
+                        GetDownPeerRegions(gomock.Any()).
+                        Return(func() *pdapi.RegionsCheckInfo {
+                            if c.downPeerInfo != nil {
+                                return c.downPeerInfo
+                            }
+                            return &pdapi.RegionsCheckInfo{Count: c.downPeerCount}
+                        }(), nil)
+                }
+            } else {
+                s.IPDClient = &stubPDClientUnavailableState{}
+            }
+
             if c.unexpectedErr {
                 // cannot update pod
                 fc.WithError("patch", "*", errors.NewInternalError(fmt.Errorf("fake internal err")))
                 fc.WithError("delete", "*", errors.NewInternalError(fmt.Errorf("fake internal err")))
             }
 
-            res, done := task.RunTask(ctx, TaskPod(c.state, fc))
+            res, done := task.RunTask(ctx, TaskPod(c.state, fc, nil))
             assert.Equal(tt, c.expectedStatus.String(), res.Status().String(), res.Message())
             assert.False(tt, done, c.desc)
             assert.Equal(tt, c.expectedPodIsTerminating, c.state.IsPodTerminating(), c.desc)
+            assert.Equal(tt, c.expectedShouldEvict, c.state.ShouldEvictLeader, c.desc)
 
             if c.expectUpdatedPod {
                 expectedPod := newPod(c.state.Cluster(), c.state.TiKV(), nil, c.state.FeatureGates())
@@ -339,3 +557,11 @@
 
 func fakePod(c *v1alpha1.Cluster, tikv *v1alpha1.TiKV) *corev1.Pod {
     return newPod(c, tikv, nil, features.NewFromFeatures(nil))
 }
+
+func setLeadersEvicted(tikv *v1alpha1.TiKV) {
+    tikv.Status.Conditions = append(tikv.Status.Conditions, metav1.Condition{
+        Type:   v1alpha1.TiKVCondLeadersEvicted,
+        Status: metav1.ConditionTrue,
+        Reason: v1alpha1.ReasonEvicted,
+    })
+}
diff --git a/pkg/controllers/tikv/tasks/status.go b/pkg/controllers/tikv/tasks/status.go
index 6d13693479..978ecd4b9e 100644
--- a/pkg/controllers/tikv/tasks/status.go
+++ b/pkg/controllers/tikv/tasks/status.go
@@ -26,7 +26,6 @@ import (
     coreutil "github.com/pingcap/tidb-operator/v2/pkg/apiutil/core/v1alpha1"
     "github.com/pingcap/tidb-operator/v2/pkg/client"
     "github.com/pingcap/tidb-operator/v2/pkg/runtime/scope"
-    pdv1 "github.com/pingcap/tidb-operator/v2/pkg/timanager/apis/pd/v1"
     "github.com/pingcap/tidb-operator/v2/pkg/utils/compare"
     "github.com/pingcap/tidb-operator/v2/pkg/utils/task/v3"
     "github.com/pingcap/tidb-operator/v2/third_party/kubernetes/pkg/controller/statefulset"
@@ -48,9 +47,6 @@ func TaskStatus(state *ReconcileContext, c client.Client) task.Task {
         ready := coreutil.IsReady[scope.TiKV](tikv)
 
         needUpdate = syncSuspendCond(tikv) || needUpdate
-        if state.PDSynced {
-            needUpdate = syncLeadersEvictedCond(tikv, state.Store, state.LeaderEvicting) || needUpdate
-        }
         if state.Store != nil {
             needUpdate = compare.SetIfNotEmptyAndChanged(&tikv.Status.ID, state.Store.ID) || needUpdate
         }
@@ -86,10 +82,6 @@ func TaskStatus(state *ReconcileContext, c client.Client) task.Task {
             return task.Retry(defaultTaskWaitDuration).With("pod is terminating, retry after it's terminated")
         }
 
-        if state.LeaderEvicting {
-            return task.Wait().With("tikv is evicting leader, wait")
-        }
-
         // TODO: use a condition to refactor it
         if !ready || tikv.Status.ID == "" {
             return task.Wait().With("tikv may not be ready, wait")
@@ -109,28 +101,3 @@ func syncSuspendCond(tikv *v1alpha1.TiKV) bool {
         Message: "instance is not suspended",
     })
 }
-
-// Status of this condition can only transfer as the below
-func syncLeadersEvictedCond(tikv *v1alpha1.TiKV, store *pdv1.Store, isEvicting bool) bool {
-    status := metav1.ConditionFalse
-    reason := v1alpha1.ReasonNotEvicted
-    msg := "leaders are not all evicted"
-    switch {
-    case store == nil:
-        status = metav1.ConditionTrue
-        reason = v1alpha1.ReasonStoreIsRemoved
-        msg = "store does not exist"
-    case isEvicting && store.LeaderCount == 0:
-        status = metav1.ConditionTrue
-        reason = v1alpha1.ReasonEvicted
-        msg = "all leaders are evicted"
-    }
-
-    return meta.SetStatusCondition(&tikv.Status.Conditions, metav1.Condition{
-        Type:               v1alpha1.TiKVCondLeadersEvicted,
-        Status:             status,
-        ObservedGeneration: tikv.Generation,
-        Reason:             reason,
-        Message:            msg,
-    })
-}
diff --git a/pkg/controllers/tikv/tasks/status_test.go b/pkg/controllers/tikv/tasks/status_test.go
index 7e8e36d688..0bd6d176ae 100644
--- a/pkg/controllers/tikv/tasks/status_test.go
+++ b/pkg/controllers/tikv/tasks/status_test.go
@@ -93,13 +93,6 @@ func TestTaskStatus(t *testing.T) {
                     Reason:  v1alpha1.ReasonUnsuspended,
                     Message: "instance is not suspended",
                 },
-                {
-                    Type:               v1alpha1.TiKVCondLeadersEvicted,
-                    Status:             metav1.ConditionFalse,
-                    ObservedGeneration: 3,
-                    Reason:             "NotEvicted",
-                    Message:            "leaders are not all evicted",
-                },
             }
 
             return obj
@@ -168,13 +161,6 @@ func TestTaskStatus(t *testing.T) {
                     Reason:  v1alpha1.ReasonUnsuspended,
                     Message: "instance is not suspended",
                 },
-                {
-                    Type:               v1alpha1.TiKVCondLeadersEvicted,
-                    Status:             metav1.ConditionFalse,
-                    ObservedGeneration: 3,
-                    Reason:             "NotEvicted",
-                    Message:            "leaders are not all evicted",
-                },
             }
 
             return obj
@@ -230,13 +216,6 @@ func TestTaskStatus(t *testing.T) {
                     Reason:  v1alpha1.ReasonUnsuspended,
                     Message: "instance is not suspended",
                 },
-                {
-                    Type:               v1alpha1.TiKVCondLeadersEvicted,
-                    Status:             metav1.ConditionFalse,
-                    ObservedGeneration: 3,
-                    Reason:             "NotEvicted",
-                    Message:            "leaders are not all evicted",
-                },
             }
 
             return obj
@@ -282,7 +261,7 @@ func TestTaskStatus(t *testing.T) {
                 PDSynced: true,
             },
 
-            expectedStatus: task.SWait,
+            expectedStatus: task.SComplete,
             expectedObj: fake.FakeObj(fakeTiKVName, func(obj *v1alpha1.TiKV) *v1alpha1.TiKV {
                 obj.Generation = 3
                 obj.Labels = map[string]string{
@@ -307,13 +286,6 @@ func TestTaskStatus(t *testing.T) {
                     Reason:  v1alpha1.ReasonUnsuspended,
                     Message: "instance is not suspended",
                 },
-                {
-                    Type:               v1alpha1.TiKVCondLeadersEvicted,
-                    Status:             metav1.ConditionTrue,
-                    ObservedGeneration: 3,
-                    Reason:             "Evicted",
-                    Message:            "all leaders are evicted",
-                },
             }
 
             return obj
@@ -413,13 +385,6 @@ func TestTaskStatus(t *testing.T) {
                     Reason:  v1alpha1.ReasonUnsuspended,
                     Message: "instance is not suspended",
                 },
-                {
-                    Type:               v1alpha1.TiKVCondLeadersEvicted,
-                    Status:             metav1.ConditionFalse,
-                    ObservedGeneration: 3,
-                    Reason:             "NotEvicted",
-                    Message:            "leaders are not all evicted",
-                },
             }
 
             return obj
diff --git a/pkg/controllers/tikv/tasks/util.go b/pkg/controllers/tikv/tasks/util.go
index 28b123ba3b..2ccd7529ce 100644
--- a/pkg/controllers/tikv/tasks/util.go
+++ b/pkg/controllers/tikv/tasks/util.go
@@ -16,11 +16,15 @@ package tasks
 
 import (
     "context"
+    "fmt"
+    "time"
 
     corev1 "k8s.io/api/core/v1"
+    "k8s.io/apimachinery/pkg/api/meta"
+    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 
     "github.com/pingcap/tidb-operator/api/v2/core/v1alpha1"
-    meta "github.com/pingcap/tidb-operator/api/v2/meta/v1alpha1"
+    metav1alpha1 "github.com/pingcap/tidb-operator/api/v2/meta/v1alpha1"
     "github.com/pingcap/tidb-operator/v2/pkg/client"
 )
@@ -56,7 +60,7 @@ func CalcGracePeriod(regionCount int) int64 {
 // VolumeName returns the real spec.volumes[*].name of pod
 // TODO(liubo02): extract to namer pkg
 func VolumeName(volName string) string {
-    return meta.VolNamePrefix + volName
+    return metav1alpha1.VolNamePrefix + volName
 }
 
 func VolumeMount(name string, mount *v1alpha1.VolumeMount) *corev1.VolumeMount {
@@ -73,3 +77,38 @@
     return vm
 }
+
+func CheckTiKVLeadersEvicted(tikv *v1alpha1.TiKV) error {
+    cond := meta.FindStatusCondition(tikv.Status.Conditions, v1alpha1.TiKVCondLeadersEvicted)
+    if cond == nil {
+        return fmt.Errorf("tikv leaders are not evicting")
+    }
+    if cond.Status == metav1.ConditionTrue {
+        return nil
+    }
+
+    return fmt.Errorf("tikv leaders are not evicted: %v, %v", cond.Reason, cond.Message)
+}
+
+func CheckTiKVLeadersEvictedOrTimeout(tikv *v1alpha1.TiKV, timeout time.Duration) error {
+    cond := meta.FindStatusCondition(tikv.Status.Conditions, v1alpha1.TiKVCondLeadersEvicted)
+    if cond == nil {
+        return fmt.Errorf("tikv leaders are not evicting")
+    }
+    // all leaders are evicted
+    if cond.Status == metav1.ConditionTrue {
+        return nil
+    }
+
+    // TODO: this is not a stable way, we should not use reason directly
+    if cond.Reason != v1alpha1.ReasonEvicting {
+        return fmt.Errorf("tikv leaders are not evicted: %v", cond.Reason)
+    }
+
+    t := cond.LastTransitionTime.Time
+    if t.Add(timeout).Before(time.Now()) {
+        return nil
+    }
+
+    return fmt.Errorf("tikv leaders are evicting: %v, %v", cond.Reason, cond.Message)
+}
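CheckTiKVLeadersEvictedOrTimeout treats the condition's LastTransitionTime as the moment eviction started, so a store stuck in ReasonEvicting longer than the budget is allowed to proceed. A hedged sketch of that behavior, assuming the same tasks package as util.go; the timestamps are illustrative:

```go
package tasks

import (
	"fmt"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	"github.com/pingcap/tidb-operator/api/v2/core/v1alpha1"
)

func ExampleCheckTiKVLeadersEvictedOrTimeout() {
	tikv := &v1alpha1.TiKV{}
	tikv.Status.Conditions = []metav1.Condition{{
		Type:   v1alpha1.TiKVCondLeadersEvicted,
		Status: metav1.ConditionFalse,
		Reason: v1alpha1.ReasonEvicting,
		// eviction started ten minutes ago
		LastTransitionTime: metav1.NewTime(time.Now().Add(-10 * time.Minute)),
	}}

	// a 5-minute budget has already elapsed, so the check passes (nil error)
	fmt.Println(CheckTiKVLeadersEvictedOrTimeout(tikv, 5*time.Minute) == nil)
	// a 30-minute budget has not, so the caller keeps retrying
	fmt.Println(CheckTiKVLeadersEvictedOrTimeout(tikv, 30*time.Minute) == nil)
	// Output:
	// true
	// false
}
```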
diff --git a/pkg/pdapi/v1/client.go b/pkg/pdapi/v1/client.go
index b3ac85e218..44d2a8f1a5 100644
--- a/pkg/pdapi/v1/client.go
+++ b/pkg/pdapi/v1/client.go
@@ -87,6 +87,8 @@ type PDClient interface {
     // GetTombStoneStores() (*StoresInfo, error)
     // GetStore gets a TiKV/TiFlash store for a specific store id of the cluster.
     GetStore(ctx context.Context, storeID string) (*StoreInfo, error)
+    // GetDownPeerRegions gets regions with down peers.
+    GetDownPeerRegions(ctx context.Context) (*RegionsCheckInfo, error)
     // GetEvictLeaderScheduler gets leader eviction schedulers for stores.
     GetEvictLeaderScheduler(ctx context.Context, storeID string) (string, error)
 
@@ -124,9 +126,10 @@ const (
     membersPrefix      = "pd/api/v1/members"
     microServicePrefix = "pd/api/v2/ms"
 
-    storesPrefix       = "pd/api/v1/stores"
-    storePrefix        = "pd/api/v1/store"
-    storeUpStatePrefix = "pd/api/v1/store/%v/state?state=Up"
+    storesPrefix          = "pd/api/v1/stores"
+    storePrefix           = "pd/api/v1/store"
+    storeUpStatePrefix    = "pd/api/v1/store/%v/state?state=Up"
+    downPeerRegionsPrefix = "pd/api/v1/regions/check/down-peer"
 
     pdReplicationPrefix = "pd/api/v1/config/replicate"
 
@@ -290,6 +293,19 @@ func (c *pdClient) GetStore(ctx context.Context, storeID string) (*StoreInfo, er
     return storeInfo, nil
 }
 
+func (c *pdClient) GetDownPeerRegions(ctx context.Context) (*RegionsCheckInfo, error) {
+    apiURL := fmt.Sprintf("%s/%s", c.url, downPeerRegionsPrefix)
+    body, err := httputil.GetBodyOK(ctx, c.httpClient, apiURL)
+    if err != nil {
+        return nil, err
+    }
+    info := &RegionsCheckInfo{}
+    if err := json.Unmarshal(body, info); err != nil {
+        return nil, err
+    }
+    return info, nil
+}
+
 func (c *pdClient) getStores(ctx context.Context, apiURL string, states ...metapb.StoreState) (*StoresInfo, error) {
     if len(states) != 0 {
         var q []string
diff --git a/pkg/pdapi/v1/client_test.go b/pkg/pdapi/v1/client_test.go
index 4d320a4513..09e41cc1a1 100644
--- a/pkg/pdapi/v1/client_test.go
+++ b/pkg/pdapi/v1/client_test.go
@@ -525,6 +525,36 @@ func TestPDClient_GetStore(t *testing.T) {
     assert.Equal(t, "Up", store.Store.StateName)
 }
 
+func TestPDClient_GetDownPeerRegions(t *testing.T) {
+    jsonStr := `
+{
+  "count": 2,
+  "regions": [
+    {
+      "id": 101
+    },
+    {
+      "id": 102
+    }
+  ]
+}`
+    server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+        assert.Equal(t, "/pd/api/v1/regions/check/down-peer", r.URL.Path)
+        w.Header().Set("Content-Type", "application/json")
+        _, err := w.Write([]byte(jsonStr))
+        assert.NoError(t, err)
+    }))
+    defer server.Close()
+
+    client := NewPDClient(server.URL, time.Second, nil)
+    info, err := client.GetDownPeerRegions(context.Background())
+    require.NoError(t, err)
+    require.NotNil(t, info)
+    assert.Equal(t, 2, info.Count)
+    require.Len(t, info.Regions, 2)
+    assert.Equal(t, uint64(101), info.Regions[0].ID)
+}
+
 func TestPDClinet_SetStoreLabels(t *testing.T) {
     server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
         assert.Equal(t, "/pd/api/v1/store/1/label", r.URL.Path)
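A possible caller-side view of the new PD endpoint wrapper — the PD address below is a placeholder and error handling is reduced to a log; only NewPDClient and GetDownPeerRegions from this diff are assumed:

```go
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	pdapi "github.com/pingcap/tidb-operator/v2/pkg/pdapi/v1"
)

func main() {
	// nil TLS config, as in the test above; the address is illustrative
	c := pdapi.NewPDClient("http://127.0.0.1:2379", 10*time.Second, nil)

	info, err := c.GetDownPeerRegions(context.Background())
	if err != nil {
		log.Fatalf("query down-peer regions: %v", err)
	}
	fmt.Printf("regions with down peers: %d\n", info.Count)
	for _, r := range info.Regions {
		fmt.Printf("  region %d has %d down peers\n", r.ID, len(r.DownPeers))
	}
}
```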
diff --git a/pkg/pdapi/v1/mock_generated.go b/pkg/pdapi/v1/mock_generated.go
index 57837df620..c5e2d07e83 100644
--- a/pkg/pdapi/v1/mock_generated.go
+++ b/pkg/pdapi/v1/mock_generated.go
@@ -151,6 +151,21 @@ func (mr *MockPDClientMockRecorder) GetConfig(ctx any) *gomock.Call {
     return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetConfig", reflect.TypeOf((*MockPDClient)(nil).GetConfig), ctx)
 }
 
+// GetDownPeerRegions mocks base method.
+func (m *MockPDClient) GetDownPeerRegions(ctx context.Context) (*RegionsCheckInfo, error) {
+    m.ctrl.T.Helper()
+    ret := m.ctrl.Call(m, "GetDownPeerRegions", ctx)
+    ret0, _ := ret[0].(*RegionsCheckInfo)
+    ret1, _ := ret[1].(error)
+    return ret0, ret1
+}
+
+// GetDownPeerRegions indicates an expected call of GetDownPeerRegions.
+func (mr *MockPDClientMockRecorder) GetDownPeerRegions(ctx any) *gomock.Call {
+    mr.mock.ctrl.T.Helper()
+    return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetDownPeerRegions", reflect.TypeOf((*MockPDClient)(nil).GetDownPeerRegions), ctx)
+}
+
 // GetEvictLeaderScheduler mocks base method.
 func (m *MockPDClient) GetEvictLeaderScheduler(ctx context.Context, storeID string) (string, error) {
     m.ctrl.T.Helper()
diff --git a/pkg/pdapi/v1/types.go b/pkg/pdapi/v1/types.go
index b3aa414976..7efba6e670 100644
--- a/pkg/pdapi/v1/types.go
+++ b/pkg/pdapi/v1/types.go
@@ -89,6 +89,22 @@ type StoresInfo struct {
     Stores []*StoreInfo `json:"stores"`
 }
 
+// RegionsCheckInfo is regions check info returned from PD RESTful interface.
+type RegionsCheckInfo struct {
+    Count   int                 `json:"count"`
+    Regions []*RegionCheckEntry `json:"regions,omitempty"`
+}
+
+type RegionCheckEntry struct {
+    ID        uint64            `json:"id"`
+    DownPeers []*RegionPeerStat `json:"down_peers,omitempty"`
+}
+
+type RegionPeerStat struct {
+    Peer        *metapb.Peer `json:"peer,omitempty"`
+    DownSeconds uint64       `json:"down_seconds,omitempty"`
+}
+
 // SchedulerInfo is a single scheduler info returned from PD RESTful interface.
 type SchedulerInfo struct {
     Name string `json:"name"`
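For reference, the new types decode the payload of /pd/api/v1/regions/check/down-peer directly. A minimal decode sketch with an illustrative payload — the store_id and down_seconds keys are assumed to follow the metapb JSON tags, which is worth verifying against a live PD:

```go
package main

import (
	"encoding/json"
	"fmt"

	pdapi "github.com/pingcap/tidb-operator/v2/pkg/pdapi/v1"
)

func main() {
	// illustrative payload in the shape the endpoint returns
	payload := []byte(`{
	  "count": 1,
	  "regions": [
	    {"id": 101, "down_peers": [{"peer": {"store_id": 2}, "down_seconds": 30}]}
	  ]
	}`)

	var info pdapi.RegionsCheckInfo
	if err := json.Unmarshal(payload, &info); err != nil {
		panic(err)
	}
	fmt.Println(info.Count, info.Regions[0].ID, info.Regions[0].DownPeers[0].DownSeconds)
	// prints: 1 101 30
}
```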