Skip to content

Commit 2f5c253

Browse files
authored
feat(updater): enhance scaling and update logic for TiCDC instances (#6488)
1 parent 7620fa5 commit 2f5c253

4 files changed

Lines changed: 112 additions & 17 deletions

File tree

go.work.sum

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -320,6 +320,7 @@ github.com/aws/aws-sdk-go-v2/service/sns v1.31.3/go.mod h1:1dn0delSO3J69THuty5iw
320320
github.com/aws/aws-sdk-go-v2/service/sqs v1.34.3/go.mod h1:L0enV3GCRd5iG9B64W35C4/hwsCB00Ib+DKVGTadKHI=
321321
github.com/aws/aws-sdk-go-v2/service/ssm v1.52.4/go.mod h1:v7NIzEFIHBiicOMaMTuEmbnzGnqW0d+6ulNALul6fYE=
322322
github.com/bboreham/go-loser v0.0.0-20230920113527-fcc2c21820a3/go.mod h1:CIWtjkly68+yqLPbvwwR/fjNJA/idrtULjZWh2v1ys0=
323+
github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A=
323324
github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
324325
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
325326
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
@@ -340,6 +341,7 @@ github.com/chzyer/test v1.0.0 h1:p3BQDXSxOhOG0P9z6/hGnII4LGiEPOYBhs8asl/fC04=
340341
github.com/chzyer/test v1.0.0/go.mod h1:2JlltgoNkt4TW/z9V/IzDdFaMTM2JPIi26O1pF38GC8=
341342
github.com/cilium/ebpf v0.11.0/go.mod h1:WE7CZAnqOL2RouJ4f1uyNhqr2P4CCvXFIqdRDUgWsVs=
342343
github.com/client9/misspell v0.3.4 h1:ta993UF76GwbvJcIo3Y68y/M3WxlpEHPWIGDkJYwzJI=
344+
github.com/cloudfoundry/gosigar v1.3.6 h1:gIc08FbB3QPb+nAQhINIK/qhf5REKkY0FTGgRGXkcVc=
343345
github.com/cloudfoundry/gosigar v1.3.6/go.mod h1:lNWstu5g5gw59O09Y+wsMNFzBSnU8a0u+Sfx4dq360E=
344346
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f h1:WBZRG4aNOuI15bLRrCgN8fCq8E5Xuty6jGbmSNEvSsU=
345347
github.com/cncf/udpa/go v0.0.0-20200629203442-efcf912fb354/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
@@ -875,6 +877,7 @@ golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
875877
golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
876878
golang.org/x/telemetry v0.0.0-20240521205824-bda55230c457 h1:zf5N6UOrA487eEFacMePxjXAJctxKmyjKUsjA11Uzuk=
877879
golang.org/x/telemetry v0.0.0-20240521205824-bda55230c457/go.mod h1:pRgIJT+bRLFKnoM1ldnzKoxTIn14Yxz928LQRYYgIN0=
880+
golang.org/x/telemetry v0.0.0-20250807160809-1a19826ec488 h1:3doPGa+Gg4snce233aCWnbZVFsyFMo/dR40KK/6skyE=
878881
golang.org/x/telemetry v0.0.0-20250807160809-1a19826ec488/go.mod h1:fGb/2+tgXXjhjHsTNdVEEMZNWA0quBnfrO+AfoDSAKw=
879882
golang.org/x/term v0.25.0/go.mod h1:RPyXicDX+6vLxogjjRxjgD2TKtmAO6NZBsBRfrOLu7M=
880883
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=

pkg/controllers/ticdcgroup/tasks/updater.go

Lines changed: 34 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -27,6 +27,7 @@ import (
2727
coreutil "github.com/pingcap/tidb-operator/pkg/apiutil/core/v1alpha1"
2828
"github.com/pingcap/tidb-operator/pkg/client"
2929
"github.com/pingcap/tidb-operator/pkg/features"
30+
"github.com/pingcap/tidb-operator/pkg/reloadable"
3031
"github.com/pingcap/tidb-operator/pkg/runtime"
3132
"github.com/pingcap/tidb-operator/pkg/runtime/scope"
3233
"github.com/pingcap/tidb-operator/pkg/updater"
@@ -70,6 +71,18 @@ func TaskUpdater(state *ReconcileContext, c client.Client, af tracker.AllocateFa
7071
return task.Fail().With("invalid topo policy, it should be validated: %w", err)
7172
}
7273

74+
needUpdate, needRestart := precheckInstances(cdcg, runtime.ToTiCDCSlice(cdcs), updateRevision)
75+
if !needUpdate {
76+
return task.Complete().With("all instances are synced")
77+
}
78+
79+
maxSurge, maxUnavailable := 0, 1
80+
noUpdate := false
81+
if needRestart {
82+
maxSurge, maxUnavailable = 1, 0
83+
noUpdate = true
84+
}
85+
7386
var instances []string
7487
for _, in := range cdcs {
7588
instances = append(instances, in.Name)
@@ -81,8 +94,8 @@ func TaskUpdater(state *ReconcileContext, c client.Client, af tracker.AllocateFa
8194
WithInstances(cdcs...).
8295
WithDesired(int(state.Group().Replicas())).
8396
WithClient(c).
84-
WithMaxSurge(0).
85-
WithMaxUnavailable(1).
97+
WithMaxSurge(maxSurge).
98+
WithMaxUnavailable(maxUnavailable).
8699
WithRevision(updateRevision).
87100
WithNewFactory(TiCDCNewer(cdcg, updateRevision, state.FeatureGates())).
88101
WithAddHooks(
@@ -94,6 +107,7 @@ func TaskUpdater(state *ReconcileContext, c client.Client, af tracker.AllocateFa
94107
WithScaleInPreferPolicy(
95108
topoPolicy,
96109
).
110+
WithNoInPaceUpdate(noUpdate).
97111
Build().
98112
Do(ctx)
99113
if err != nil {
@@ -139,3 +153,21 @@ func TiCDCNewer(cdcg *v1alpha1.TiCDCGroup, rev string, fg features.Gates) update
139153
return runtime.FromTiCDC(ticdc)
140154
})
141155
}
156+
157+
func precheckInstances(cdcg *v1alpha1.TiCDCGroup, cdcs []*v1alpha1.TiCDC, updateRevision string) (needUpdate, needRestart bool) {
158+
if len(cdcs) != int(coreutil.Replicas[scope.TiCDCGroup](cdcg)) {
159+
needUpdate = true
160+
}
161+
for _, cdc := range cdcs {
162+
if coreutil.UpdateRevision[scope.TiCDC](cdc) == updateRevision {
163+
continue
164+
}
165+
166+
needUpdate = true
167+
if !reloadable.CheckTiCDC(cdcg, cdc) {
168+
needRestart = true
169+
}
170+
}
171+
172+
return needUpdate, needRestart
173+
}

tests/e2e/cluster/cluster.go

Lines changed: 5 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -872,28 +872,27 @@ var _ = Describe("TiDB Cluster", func() {
872872
})
873873
})
874874

875-
It("pd/tikv/tiflash/ticdc: should perform a rolling update for config change", func() {
875+
It("pd/tikv/tiflash: should perform a rolling update for config change", func() {
876876
pdg := data.NewPDGroup(ns.Name, "pd", tc.Name, ptr.To(int32(3)), nil)
877877
kvg := data.NewTiKVGroup(ns.Name, "tikv", tc.Name, ptr.To(int32(3)), nil)
878878
dbg := data.NewTiDBGroup(ns.Name, "tidb", tc.Name, ptr.To(int32(1)), nil)
879879
flashg := data.NewTiFlashGroup(ns.Name, "flash", tc.Name, ptr.To(int32(3)), nil)
880-
cdcg := data.NewTiCDCGroup(ns.Name, "cdc", tc.Name, ptr.To(int32(3)), nil)
880+
881881
Expect(k8sClient.Create(ctx, pdg)).To(Succeed())
882882
Expect(k8sClient.Create(ctx, kvg)).To(Succeed())
883883
Expect(k8sClient.Create(ctx, dbg)).To(Succeed())
884884
Expect(k8sClient.Create(ctx, flashg)).To(Succeed())
885-
Expect(k8sClient.Create(ctx, cdcg)).To(Succeed())
886885

887886
By("Waiting for the cluster to be ready")
888887
Eventually(func(g Gomega) {
889888
_, ready := utiltidb.IsClusterReady(k8sClient, tc.Name, tc.Namespace)
890889
g.Expect(ready).To(BeTrue())
891890
g.Expect(utiltidb.AreAllInstancesReady(k8sClient, pdg,
892891
[]*v1alpha1.TiKVGroup{kvg}, []*v1alpha1.TiDBGroup{dbg},
893-
[]*v1alpha1.TiFlashGroup{flashg}, []*v1alpha1.TiCDCGroup{cdcg})).To(Succeed())
892+
[]*v1alpha1.TiFlashGroup{flashg}, nil)).To(Succeed())
894893
}).WithTimeout(createClusterTimeout).WithPolling(createClusterPolling).Should(Succeed())
895894

896-
groupNames := []string{pdg.Name, kvg.Name, flashg.Name, cdcg.Name}
895+
groupNames := []string{pdg.Name, kvg.Name, flashg.Name}
897896
outerCtx, cancel := context.WithCancel(ctx)
898897
defer cancel()
899898
for _, groupName := range groupNames {
@@ -972,21 +971,14 @@ var _ = Describe("TiDB Cluster", func() {
972971
flashgGet.Spec.Template.Spec.Config = v1alpha1.ConfigFile(cfg)
973972
updateTime = time.Now()
974973
Expect(k8sClient.Update(ctx, &flashgGet)).To(Succeed())
975-
case "cdc":
976-
cfg = "log-level = 'debug'"
977-
var cdcgGet v1alpha1.TiCDCGroup
978-
Expect(k8sClient.Get(ctx, client.ObjectKey{Namespace: tc.Namespace, Name: groupName}, &cdcgGet)).To(Succeed())
979-
cdcgGet.Spec.Template.Spec.Config = v1alpha1.ConfigFile(cfg)
980-
updateTime = time.Now()
981-
Expect(k8sClient.Update(ctx, &cdcgGet)).To(Succeed())
982974
}
983975

984976
Eventually(func(g Gomega) {
985977
_, ready := utiltidb.IsClusterReady(k8sClient, tc.Name, tc.Namespace)
986978
g.Expect(ready).To(BeTrue())
987979
g.Expect(utiltidb.AreAllInstancesReady(k8sClient, pdg,
988980
[]*v1alpha1.TiKVGroup{kvg}, []*v1alpha1.TiDBGroup{dbg},
989-
[]*v1alpha1.TiFlashGroup{flashg}, []*v1alpha1.TiCDCGroup{cdcg})).To(Succeed())
981+
[]*v1alpha1.TiFlashGroup{flashg}, nil)).To(Succeed())
990982

991983
podList, err := clientSet.CoreV1().Pods(tc.Namespace).List(ctx, listOpts)
992984
g.Expect(err).To(BeNil())

tests/e2e/ticdc/ticdc.go

Lines changed: 70 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -16,8 +16,12 @@ package ticdc
1616

1717
import (
1818
"context"
19+
"time"
1920

2021
"github.com/onsi/ginkgo/v2"
22+
"github.com/pingcap/tidb-operator/pkg/client"
23+
"github.com/pingcap/tidb-operator/tests/e2e/utils/waiter"
24+
"k8s.io/utils/ptr"
2125

2226
"github.com/pingcap/tidb-operator/api/v2/core/v1alpha1"
2327
"github.com/pingcap/tidb-operator/pkg/apicall"
@@ -29,12 +33,14 @@ import (
2933
"github.com/pingcap/tidb-operator/tests/e2e/utils/cert"
3034
)
3135

36+
const (
37+
changedConfig = `log-level = 'warn'`
38+
)
39+
3240
var _ = ginkgo.Describe("TiCDC", label.TiCDC, func() {
3341
f := framework.New()
3442
f.Setup()
3543

36-
// NOTE(liubo02): this case is failed in e2e env because of the cgroup v2.
37-
// Enable it if env is fixed.
3844
ginkgo.DescribeTableSubtree("Leader Eviction", label.P1,
3945
func(enableTLS bool) {
4046
if enableTLS {
@@ -92,4 +98,66 @@ var _ = ginkgo.Describe("TiCDC", label.TiCDC, func() {
9298
ginkgo.Entry(nil, false),
9399
ginkgo.Entry(nil, label.FeatureTLS, true),
94100
)
101+
102+
// Scale and Update groups the specs that change the replica count of a
// TiCDCGroup, alone or together with a config change.
ginkgo.Context("Scale and Update", label.P0, func() {
	// Patches a ready TiCDCGroup to 4 replicas and then to 2, waiting for the
	// group to become ready after each patch.
	ginkgo.It("support scale out/in TiCDC", label.Scale, func(ctx context.Context) {
		pdg := f.MustCreatePD(ctx)
		kvg := f.MustCreateTiKV(ctx)
		dbg := f.MustCreateTiDB(ctx)
		cdcg := f.MustCreateTiCDC(ctx)

		ginkgo.By("Wait for Cluster Ready")
		f.WaitForPDGroupReady(ctx, pdg)
		f.WaitForTiKVGroupReady(ctx, kvg)
		f.WaitForTiDBGroupReady(ctx, dbg)
		f.WaitForTiCDCGroupReady(ctx, cdcg)

		ginkgo.By("Change replica of the TiCDCGroup to 4")
		patch := client.MergeFrom(cdcg.DeepCopy())
		cdcg.Spec.Replicas = ptr.To[int32](4)
		f.Must(f.Client.Patch(ctx, cdcg, patch))
		f.WaitForTiCDCGroupReady(ctx, cdcg)

		ginkgo.By("Change replica of the TiCDCGroup to 2")
		patch = client.MergeFrom(cdcg.DeepCopy())
		cdcg.Spec.Replicas = ptr.To[int32](2)
		f.Must(f.Client.Patch(ctx, cdcg, patch))
		f.WaitForTiCDCGroupReady(ctx, cdcg)
	})

	// Patches replicas (3 -> 2) and config in one request while a background
	// goroutine runs waiter.WaitPodsRollingUpdateOnce on the group.
	ginkgo.It("support scale TiCDC from 3 to 2 and rolling update at same time", label.Scale, label.Update, func(ctx context.Context) {
		pdg := f.MustCreatePD(ctx)
		kvg := f.MustCreateTiKV(ctx)
		dbg := f.MustCreateTiDB(ctx)
		cdcg := f.MustCreateTiCDC(ctx, data.WithReplicas[*runtime.TiCDCGroup](3))

		f.WaitForPDGroupReady(ctx, pdg)
		f.WaitForTiKVGroupReady(ctx, kvg)
		f.WaitForTiDBGroupReady(ctx, dbg)
		f.WaitForTiCDCGroupReady(ctx, cdcg)

		// Build the patch before mutating cdcg so it carries both changes.
		patch := client.MergeFrom(cdcg.DeepCopy())
		cdcg.Spec.Replicas = ptr.To[int32](2)
		cdcg.Spec.Template.Spec.Config = changedConfig

		nctx, cancel := context.WithCancel(ctx)
		// Always release the watcher context. NOTE(review): presumably f.Must
		// aborts the spec on error, in which case the explicit cancel() at the
		// end is never reached and the goroutine below would otherwise keep
		// running until its own timeout. Calling cancel twice is harmless.
		defer cancel()

		ch := make(chan struct{})
		go func() {
			defer close(ch)
			defer ginkgo.GinkgoRecover()
			f.Must(waiter.WaitPodsRollingUpdateOnce(nctx, f.Client, runtime.FromTiCDCGroup(cdcg), 3, 1, waiter.LongTaskTimeout))
		}()

		maxTime, err := waiter.MaxPodsCreateTimestamp(ctx, f.Client, runtime.FromTiCDCGroup(cdcg))
		f.Must(err)
		// Threshold passed to WaitForPodsRecreated: one second after the newest
		// existing pod's creation timestamp.
		changeTime := maxTime.Add(time.Second)

		ginkgo.By("Change config and replicas of the TiCDCGroup")
		f.Must(f.Client.Patch(ctx, cdcg, patch))
		f.Must(waiter.WaitForPodsRecreated(ctx, f.Client, runtime.FromTiCDCGroup(cdcg), changeTime, waiter.LongTaskTimeout))
		f.WaitForTiCDCGroupReady(ctx, cdcg)

		// Stop the watcher and wait for the goroutine to finish before the
		// spec returns.
		cancel()
		<-ch
	})
})
95163
})

0 commit comments

Comments
 (0)