Skip to content

Commit d99f294

Browse files
authored
tiproxy: add spec field gracefulShutdownDeleteDelaySeconds to gracefully mark unhealthy before deleting the pods (#6829)
1 parent 86cb65a commit d99f294

13 files changed

Lines changed: 1184 additions & 26 deletions

File tree

api/core/v1alpha1/common_types.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,13 @@ const (
164164
// Last instance template is recorded to check whether the pod should be restarted because of changes of instance template
165165
AnnoKeyLastInstanceTemplate = AnnoKeyPrefix + "last-instance-template"
166166

167+
// TiProxy graceful shutdown begin time is recorded on the pod when graceful shutdown begins.
168+
AnnoKeyTiProxyGracefulShutdownBeginTime = AnnoKeyPrefix + "tiproxy-graceful-shutdown-begin-time"
169+
170+
// TiProxy graceful shutdown delete delay controls how long operator waits before deleting a TiProxy pod
171+
// after it has been marked unhealthy during graceful shutdown.
172+
AnnoKeyTiProxyGracefulShutdownDeleteDelaySeconds = AnnoKeyPrefix + "tiproxy-graceful-shutdown-delete-delay-seconds"
173+
167174
// Features is recorded to check whether the pod should be restarted because of changes of features
168175
AnnoKeyFeatures = AnnoKeyPrefix + "features"
169176

pkg/controllers/tiproxy/builder.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ func (r *Reconciler) NewRunner(state *tasks.ReconcileContext, reporter task.Task
3737
common.TaskContextCluster[scope.TiProxy](state, r.Client),
3838
// if it's paused just return
3939
task.IfBreak(common.CondClusterIsPaused(state)),
40+
common.TaskContextPod[scope.TiProxy](state, r.Client),
4041
// if the cluster is deleting, del all subresources and remove the finalizer directly
4142
task.IfBreak(common.CondClusterIsDeleting(state),
4243
common.TaskInstanceFinalizerDel[scope.TiProxy](state, r.Client, common.DefaultInstanceSubresourceLister),
@@ -45,7 +46,10 @@ func (r *Reconciler) NewRunner(state *tasks.ReconcileContext, reporter task.Task
4546
task.IfBreak(common.CondClusterPDAddrIsNotRegistered(state)),
4647

4748
task.IfBreak(common.CondObjectIsDeleting[scope.TiProxy](state),
48-
common.TaskInstanceFinalizerDel[scope.TiProxy](state, r.Client, common.DefaultInstanceSubresourceLister),
49+
tasks.TaskDrainPodForDelete(state, r.Client),
50+
task.If(task.CondFunc(func() bool { return state.Pod() == nil }),
51+
common.TaskInstanceFinalizerDel[scope.TiProxy](state, r.Client, common.DefaultInstanceSubresourceLister),
52+
),
4953
// TODO(liubo02): if the finalizer has been removed, no need to update status
5054
common.TaskInstanceConditionSynced[scope.TiProxy](state),
5155
common.TaskInstanceConditionReady[scope.TiProxy](state),
@@ -54,8 +58,7 @@ func (r *Reconciler) NewRunner(state *tasks.ReconcileContext, reporter task.Task
5458
),
5559
common.TaskFinalizerAdd[scope.TiProxy](state, r.Client),
5660

57-
// get pod and check whether the cluster is suspending
58-
common.TaskContextPod[scope.TiProxy](state, r.Client),
61+
// check whether the cluster is suspending
5962
task.IfBreak(common.CondClusterIsSuspending(state),
6063
common.TaskSuspendPod(state, r.Client),
6164
common.TaskInstanceConditionSuspended[scope.TiProxy](state),
Lines changed: 208 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,208 @@
1+
// Copyright 2024 PingCAP, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package tasks
16+
17+
import (
18+
"context"
19+
"crypto/tls"
20+
"strconv"
21+
"time"
22+
23+
"github.com/go-logr/logr"
24+
corev1 "k8s.io/api/core/v1"
25+
apierrors "k8s.io/apimachinery/pkg/api/errors"
26+
27+
"github.com/pingcap/tidb-operator/api/v2/core/v1alpha1"
28+
"github.com/pingcap/tidb-operator/v2/pkg/apicall"
29+
coreutil "github.com/pingcap/tidb-operator/v2/pkg/apiutil/core/v1alpha1"
30+
"github.com/pingcap/tidb-operator/v2/pkg/client"
31+
"github.com/pingcap/tidb-operator/v2/pkg/runtime/scope"
32+
"github.com/pingcap/tidb-operator/v2/pkg/tiproxyapi/v1"
33+
"github.com/pingcap/tidb-operator/v2/pkg/utils/task/v3"
34+
)
35+
36+
func TaskDrainPodForDelete(state State, c client.Client) task.Task {
37+
return task.NameTaskFunc("DrainPodForDelete", func(ctx context.Context) task.Result {
38+
pod := state.Pod()
39+
if pod == nil {
40+
return task.Complete().With("pod doesn't exist")
41+
}
42+
43+
retryAfter, err := drainOrDeletePod(ctx, c, state, pod)
44+
if err != nil {
45+
return task.Fail().With("cannot delete pod of tiproxy: %v", err)
46+
}
47+
if retryAfter > 0 {
48+
return task.Retry(retryAfter).With("wait for tiproxy pod to be deleted")
49+
}
50+
return task.Complete().With("pod is deleted")
51+
})
52+
}
53+
54+
func drainOrDeletePod(ctx context.Context, c client.Client, state State, pod *corev1.Pod) (time.Duration, error) {
55+
logger := logr.FromContextOrDiscard(ctx)
56+
tiproxy := state.Object()
57+
58+
if !pod.GetDeletionTimestamp().IsZero() {
59+
return task.DefaultRequeueAfter, nil
60+
}
61+
62+
seconds, ok, err := gracefulShutdownDeleteDelaySeconds(tiproxy)
63+
if err != nil {
64+
return 0, err
65+
}
66+
if !ok || seconds <= 0 {
67+
return deleteTiProxyPod(ctx, c, pod)
68+
}
69+
70+
startAt, ok := gracefulShutdownBeginTime(pod)
71+
if !ok {
72+
if !ensureTiProxyMarkedUnhealthy(ctx, state, c, logger) {
73+
return task.DefaultRequeueAfter, nil
74+
}
75+
startAt = time.Now()
76+
if err := markGracefulShutdownBeginTime(ctx, c, pod, startAt); err != nil {
77+
return 0, err
78+
}
79+
}
80+
81+
remaining := time.Until(startAt.Add(time.Duration(seconds) * time.Second))
82+
if remaining > task.DefaultRequeueAfter {
83+
remaining = task.DefaultRequeueAfter
84+
}
85+
if remaining > 0 {
86+
return remaining, nil
87+
}
88+
89+
return deleteTiProxyPod(ctx, c, pod)
90+
}
91+
92+
func gracefulShutdownDeleteDelaySeconds(tiproxy *v1alpha1.TiProxy) (seconds int32, ok bool, err error) {
93+
raw := tiproxy.Annotations[v1alpha1.AnnoKeyTiProxyGracefulShutdownDeleteDelaySeconds]
94+
if raw == "" {
95+
return 0, false, nil
96+
}
97+
98+
parsed, err := strconv.ParseInt(raw, 10, 32)
99+
if err != nil {
100+
return 0, false, err
101+
}
102+
return int32(parsed), true, nil
103+
}
104+
105+
func gracefulShutdownBeginTime(pod *corev1.Pod) (time.Time, bool) {
106+
raw := pod.Annotations[v1alpha1.AnnoKeyTiProxyGracefulShutdownBeginTime]
107+
if raw == "" {
108+
return time.Time{}, false
109+
}
110+
111+
startAt, err := time.Parse(time.RFC3339Nano, raw)
112+
if err != nil {
113+
return time.Time{}, false
114+
}
115+
return startAt, true
116+
}
117+
118+
func markGracefulShutdownBeginTime(ctx context.Context, c client.Client, pod *corev1.Pod, startAt time.Time) error {
119+
newPod := pod.DeepCopy()
120+
if newPod.Annotations == nil {
121+
newPod.Annotations = map[string]string{}
122+
}
123+
newPod.Annotations[v1alpha1.AnnoKeyTiProxyGracefulShutdownBeginTime] = startAt.Format(time.RFC3339Nano)
124+
return c.Update(ctx, newPod)
125+
}
126+
127+
func deleteTiProxyPod(ctx context.Context, c client.Client, pod *corev1.Pod) (time.Duration, error) {
128+
if err := c.Delete(ctx, pod); err != nil && !apierrors.IsNotFound(err) {
129+
return 0, err
130+
}
131+
return task.DefaultRequeueAfter, nil
132+
}
133+
134+
func ensureTiProxyMarkedUnhealthy(ctx context.Context, state State, c client.Client, logger logr.Logger) bool {
135+
tiproxy := state.Object()
136+
137+
tpClient, err := newTiProxyDeleteClient(ctx, state, c)
138+
if err != nil {
139+
logger.Info(
140+
"failed to build TiProxy API client before graceful delete, continue retrying",
141+
"namespace", tiproxy.Namespace,
142+
"name", tiproxy.Name,
143+
"error", err,
144+
)
145+
return false
146+
}
147+
148+
healthy, err := tpClient.IsHealthy(ctx)
149+
if err != nil {
150+
logger.Info(
151+
"failed to query TiProxy health before graceful delete retry, continue retrying",
152+
"namespace", tiproxy.Namespace,
153+
"name", tiproxy.Name,
154+
"error", err,
155+
)
156+
return false
157+
}
158+
if !healthy {
159+
return true
160+
}
161+
162+
if err := tpClient.MarkUnhealthy(ctx); err != nil {
163+
logger.Info(
164+
"failed to mark TiProxy unhealthy before graceful delete, continue retrying",
165+
"namespace", tiproxy.Namespace,
166+
"name", tiproxy.Name,
167+
"error", err,
168+
)
169+
return false
170+
}
171+
172+
healthy, err = tpClient.IsHealthy(ctx)
173+
if err != nil {
174+
logger.Info(
175+
"failed to re-check TiProxy health after graceful delete action, continue retrying",
176+
"namespace", tiproxy.Namespace,
177+
"name", tiproxy.Name,
178+
"error", err,
179+
)
180+
return false
181+
}
182+
if healthy {
183+
logger.Info(
184+
"TiProxy health is still healthy after graceful delete action, continue retrying",
185+
"namespace", tiproxy.Namespace,
186+
"name", tiproxy.Name,
187+
)
188+
return false
189+
}
190+
return true
191+
}
192+
193+
func newTiProxyDeleteClient(ctx context.Context, state State, c client.Client) (tiproxyapi.TiProxyClient, error) {
194+
ck := state.Cluster()
195+
196+
var tlsConfig *tls.Config
197+
if coreutil.IsTiProxyHTTPServerTLSEnabled(ck, state.Object()) {
198+
var err error
199+
tlsConfig, err = apicall.GetClientTLSConfig(ctx, c, ck)
200+
if err != nil {
201+
return nil, err
202+
}
203+
}
204+
205+
tiproxy := state.TiProxy()
206+
addr := coreutil.InstanceAdvertiseAddress[scope.TiProxy](ck, tiproxy, coreutil.TiProxyAPIPort(tiproxy))
207+
return tiproxyapi.NewTiProxyClient(addr, tiproxyRequestTimeout, tlsConfig), nil
208+
}

0 commit comments

Comments
 (0)