Skip to content

Commit 0bbe316

Browse files
authored
orchestrator: clear stale capture tombstones on re-register (#4696)
close #4695
1 parent 0213a79 commit 0bbe316

5 files changed

Lines changed: 179 additions & 35 deletions

File tree

coordinator/coordinator_test.go

Lines changed: 35 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -373,8 +373,9 @@ func TestCoordinatorScheduling(t *testing.T) {
373373
}
374374

375375
func TestScaleNode(t *testing.T) {
376-
ctx := context.Background()
377-
info := node.NewInfo("127.0.0.1:28300", "")
376+
ctx, cancel := context.WithCancel(context.Background())
377+
t.Cleanup(cancel)
378+
info, lis1 := newMaintainerNodeForTest(t)
378379
etcdClient := newMockEtcdClient(string(info.ID))
379380
nodeManager := watcher.NewNodeManager(nil, etcdClient)
380381
appcontext.SetService(watcher.NodeManagerName, nodeManager)
@@ -384,13 +385,10 @@ func TestScaleNode(t *testing.T) {
384385
cfg := config.NewDefaultMessageCenterConfig(info.AdvertiseAddr)
385386
mc1 := messaging.NewMessageCenter(ctx, info.ID, cfg, nil)
386387
mc1.Run(ctx)
387-
defer func() {
388-
mc1.Close()
389-
log.Info("close message center 1")
390-
}()
391388

392389
appcontext.SetService(appcontext.MessageCenter, mc1)
393-
startMaintainerNode(ctx, info, mc1, nodeManager)
390+
node1 := startMaintainerNode(ctx, info, mc1, nodeManager, lis1)
391+
t.Cleanup(node1.stop)
394392

395393
serviceID := "default"
396394

@@ -428,23 +426,16 @@ func TestScaleNode(t *testing.T) {
428426
}, waitTime, time.Millisecond*5)
429427

430428
// add two nodes
431-
info2 := node.NewInfo("127.0.0.1:28400", "")
429+
info2, lis2 := newMaintainerNodeForTest(t)
432430
mc2 := messaging.NewMessageCenter(ctx, info2.ID, config.NewDefaultMessageCenterConfig(info2.AdvertiseAddr), nil)
433431
mc2.Run(ctx)
434-
defer func() {
435-
mc2.Close()
436-
log.Info("close message center 2")
437-
}()
438-
startMaintainerNode(ctx, info2, mc2, nodeManager)
439-
info3 := node.NewInfo("127.0.0.1:28500", "")
432+
node2 := startMaintainerNode(ctx, info2, mc2, nodeManager, lis2)
433+
t.Cleanup(node2.stop)
434+
info3, lis3 := newMaintainerNodeForTest(t)
440435
mc3 := messaging.NewMessageCenter(ctx, info3.ID, config.NewDefaultMessageCenterConfig(info3.AdvertiseAddr), nil)
441436
mc3.Run(ctx)
442-
defer func() {
443-
mc3.Close()
444-
log.Info("close message center 3")
445-
}()
446-
447-
startMaintainerNode(ctx, info3, mc3, nodeManager)
437+
node3 := startMaintainerNode(ctx, info3, mc3, nodeManager, lis3)
438+
t.Cleanup(node3.stop)
448439

449440
log.Info("Start maintainer node",
450441
zap.Stringer("id", info3.ID),
@@ -490,7 +481,7 @@ func TestScaleNode(t *testing.T) {
490481
func TestBootstrapWithUnStoppedChangefeed(t *testing.T) {
491482
ctx, cancel := context.WithCancel(context.Background())
492483
defer cancel()
493-
info := node.NewInfo("127.0.0.1:28301", "")
484+
info, lis := newMaintainerNodeForTest(t)
494485
etcdClient := newMockEtcdClient(string(info.ID))
495486
nodeManager := watcher.NewNodeManager(nil, etcdClient)
496487
appcontext.SetService(watcher.NodeManagerName, nodeManager)
@@ -500,10 +491,10 @@ func TestBootstrapWithUnStoppedChangefeed(t *testing.T) {
500491

501492
mc1 := messaging.NewMessageCenter(ctx, info.ID, config.NewDefaultMessageCenterConfig(info.AdvertiseAddr), nil)
502493
mc1.Run(ctx)
503-
defer mc1.Close()
504494

505495
appcontext.SetService(appcontext.MessageCenter, mc1)
506-
mNode := startMaintainerNode(ctx, info, mc1, nodeManager)
496+
mNode := startMaintainerNode(ctx, info, mc1, nodeManager, lis)
497+
defer mNode.stop()
507498

508499
removingCf1 := &changefeed.ChangefeedMetaWrapper{
509500
Info: &config.ChangeFeedInfo{
@@ -712,40 +703,51 @@ type maintainNode struct {
712703
cancel context.CancelFunc
713704
mc messaging.MessageCenter
714705
manager *mockMaintainerManager
706+
wg sync.WaitGroup
715707
}
716708

717709
func (d *maintainNode) stop() {
718-
d.mc.Close()
719710
d.cancel()
711+
d.wg.Wait()
712+
d.mc.Close()
713+
}
714+
715+
// newMaintainerNodeForTest opens a TCP listener on the loopback interface and
// returns a node.Info built from the listener's address, together with the
// listener itself. The test fails immediately if the listen call returns an error.
func newMaintainerNodeForTest(t *testing.T) (*node.Info, net.Listener) {
716+
t.Helper()
717+
718+
// Port 0 asks the OS to choose a free port; the chosen address is read back
// from the listener below.
lis, err := net.Listen("tcp", "127.0.0.1:0")
719+
require.NoError(t, err)
720+
721+
return node.NewInfo(lis.Addr().String(), ""), lis
720722
}
721723

722724
func startMaintainerNode(ctx context.Context,
723725
node *node.Info, mc messaging.MessageCenter,
724726
nodeManager *watcher.NodeManager,
727+
lis net.Listener,
725728
) *maintainNode {
726729
nodeManager.RegisterNodeChangeHandler(node.ID, mc.OnNodeChanges)
727730
ctx, cancel := context.WithCancel(ctx)
728731
maintainerM := NewMaintainerManager(mc)
732+
res := &maintainNode{
733+
cancel: cancel,
734+
mc: mc,
735+
manager: maintainerM,
736+
}
737+
res.wg.Add(1)
729738
go func() {
739+
defer res.wg.Done()
730740
var opts []grpc.ServerOption
731741
grpcServer := grpc.NewServer(opts...)
732742
mcs := messaging.NewMessageCenterServer(mc)
733743
proto.RegisterMessageServiceServer(grpcServer, mcs)
734-
lis, err := net.Listen("tcp", node.AdvertiseAddr)
735-
if err != nil {
736-
panic(err)
737-
}
738744
go func() {
739745
_ = grpcServer.Serve(lis)
740746
}()
741747
_ = maintainerM.Run(ctx)
742748
grpcServer.Stop()
743749
}()
744-
return &maintainNode{
745-
cancel: cancel,
746-
mc: mc,
747-
manager: maintainerM,
748-
}
750+
return res
749751
}
750752

751753
type mockEtcdClient struct {

coordinator/operator/operator_move.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,12 +98,13 @@ func (m *MoveMaintainerOperator) OnNodeRemove(n node.ID) {
9898
m.lck.Lock()
9999
defer m.lck.Unlock()
100100

101-
if m.finished || m.canceled {
101+
if m.canceled {
102102
return
103103
}
104104

105105
if n == m.dest {
106-
// the origin node is finished, we must mark the maintainer as absent to reschedule it again
106+
// Node removal must win over a just-finished move. Otherwise PostFinish can still
107+
// mark the changefeed replicating on a node that has already been removed.
107108
if m.originNodeStopped {
108109
log.Info("dest node is stopped, mark changefeed absent",
109110
zap.String("changefeed", m.changefeed.ID.String()),
@@ -123,6 +124,10 @@ func (m *MoveMaintainerOperator) OnNodeRemove(n node.ID) {
123124
m.db.BindChangefeedToNode(m.dest, m.origin, m.changefeed)
124125
m.bind = true
125126
m.originNodeStopped = true
127+
return
128+
}
129+
if m.finished {
130+
return
126131
}
127132
if n == m.origin {
128133
log.Info("origin node is stopped",

coordinator/operator/operator_move_test.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"github.com/pingcap/ticdc/heartbeatpb"
2121
"github.com/pingcap/ticdc/pkg/common"
2222
"github.com/pingcap/ticdc/pkg/config"
23+
"github.com/pingcap/ticdc/pkg/node"
2324
"github.com/stretchr/testify/require"
2425
)
2526

@@ -106,3 +107,35 @@ func TestMoveMaintainerOperator_CheckRequiresDestBootstrapDone(t *testing.T) {
106107
require.True(t, op.finished)
107108
require.Nil(t, op.Schedule())
108109
}
110+
111+
// TestMoveMaintainerOperator_OnNodeRemoveAfterFinishMarksAbsent checks that
// removing the destination node after a move operator has already finished
// cancels the operator and leaves the changefeed absent, and that a later
// PostFinish call does not change that outcome.
func TestMoveMaintainerOperator_OnNodeRemoveAfterFinishMarksAbsent(t *testing.T) {
112+
changefeedDB := changefeed.NewChangefeedDB(1216)
113+
cfID := common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceName)
114+
cf := changefeed.NewChangefeed(cfID, &config.ChangeFeedInfo{
115+
ChangefeedID: cfID,
116+
Config: config.GetDefaultReplicaConfig(),
117+
SinkURI: "mysql://127.0.0.1:3306",
118+
},
119+
1, true)
120+
// Start with the changefeed registered as replicating on the origin node "n1".
changefeedDB.AddReplicatingMaintainer(cf, "n1")
121+
122+
// Move n1 -> n2: report the origin as stopped, then expect a scheduling
// message and the changefeed to be bound to "n2".
op := NewMoveMaintainerOperator(changefeedDB, cf, "n1", "n2")
123+
op.Check("n1", &heartbeatpb.MaintainerStatus{State: heartbeatpb.ComponentState_Stopped})
124+
require.NotNil(t, op.Schedule())
125+
require.Equal(t, node.ID("n2"), cf.GetNodeID())
126+
127+
// A working, bootstrapped status from the destination finishes the operator.
op.Check("n2", &heartbeatpb.MaintainerStatus{
128+
State: heartbeatpb.ComponentState_Working,
129+
BootstrapDone: true,
130+
})
131+
require.True(t, op.finished)
132+
133+
// Removing the destination after the finish must still cancel the operator
// and move the changefeed to the absent set.
op.OnNodeRemove("n2")
134+
require.True(t, op.canceled)
135+
require.Equal(t, 1, changefeedDB.GetAbsentSize())
136+
require.Len(t, changefeedDB.GetByNodeID("n2"), 0)
137+
138+
// PostFinish must not re-attach the changefeed to the removed node.
op.PostFinish()
139+
require.Equal(t, 1, changefeedDB.GetAbsentSize())
140+
require.Len(t, changefeedDB.GetByNodeID("n2"), 0)
141+
}

pkg/orchestrator/reactor_state.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,8 @@ func (s *GlobalReactorState) Update(key util.EtcdKey, value []byte, _ bool) erro
115115
}
116116

117117
log.Info("remote capture online", zap.Any("info", newCaptureInfo), zap.String("role", s.Role))
118+
// A fresh online event supersedes any pending delayed removal for the same capture.
119+
delete(s.toRemoveCaptures, k.CaptureID)
118120
if s.onCaptureAdded != nil {
119121
s.onCaptureAdded(k.CaptureID, newCaptureInfo.AdvertiseAddr)
120122
}
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
// Copyright 2026 PingCAP, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
package orchestrator
15+
16+
import (
17+
"testing"
18+
"time"
19+
20+
"github.com/pingcap/ticdc/pkg/config"
21+
"github.com/pingcap/ticdc/pkg/etcd"
22+
"github.com/pingcap/ticdc/pkg/orchestrator/util"
23+
"github.com/stretchr/testify/require"
24+
)
25+
26+
// TestGlobalReactorStateKeepsCaptureAfterReRegister checks that a capture which
// is deleted and then registered again is not dropped by UpdatePendingChange,
// even when its pending-removal timestamp is older than captureRemoveTTL.
func TestGlobalReactorStateKeepsCaptureAfterReRegister(t *testing.T) {
27+
t.Parallel()
28+
29+
state := NewGlobalState(etcd.DefaultCDCClusterID, 0)
30+
// NOTE(review): presumably a TTL in seconds, given the 11-second backdating
// below; confirm against the field's definition.
state.captureRemoveTTL = 10
31+
32+
captureID := config.CaptureID("capture-1")
33+
// Record every capture ID passed to the removal callback.
var removed []config.CaptureID
34+
state.SetOnCaptureRemoved(func(id config.CaptureID) {
35+
removed = append(removed, id)
36+
})
37+
38+
// Register, delete, backdate the pending-removal entry, then register again
// with a different advertise address.
mustUpdateCapture(t, state, captureID, "127.0.0.1:8300")
39+
mustDeleteCapture(t, state, captureID)
40+
state.toRemoveCaptures[captureID] = time.Now().Add(-11 * time.Second)
41+
mustUpdateCapture(t, state, captureID, "127.0.0.1:8301")
42+
43+
state.UpdatePendingChange()
44+
45+
// The re-registered capture survives with its new address, the removal
// callback never fired, and no pending-removal entry is left behind.
require.Contains(t, state.Captures, captureID)
46+
require.Equal(t, "127.0.0.1:8301", state.Captures[captureID].AdvertiseAddr)
47+
require.Empty(t, removed)
48+
require.NotContains(t, state.toRemoveCaptures, captureID)
49+
}
50+
51+
// TestGlobalReactorStateRemovesCaptureAfterTombstoneExpires checks that a
// deleted capture stays in state.Captures on the first UpdatePendingChange
// call, and is removed (with the removal callback invoked once) after its
// pending-removal timestamp has been backdated by 11 seconds.
func TestGlobalReactorStateRemovesCaptureAfterTombstoneExpires(t *testing.T) {
52+
t.Parallel()
53+
54+
state := NewGlobalState(etcd.DefaultCDCClusterID, 0)
55+
// NOTE(review): presumably a TTL in seconds, given the 11-second backdating
// below; confirm against the field's definition.
state.captureRemoveTTL = 10
56+
57+
captureID := config.CaptureID("capture-1")
58+
// Record every capture ID passed to the removal callback.
var removed []config.CaptureID
59+
state.SetOnCaptureRemoved(func(id config.CaptureID) {
60+
removed = append(removed, id)
61+
})
62+
63+
mustUpdateCapture(t, state, captureID, "127.0.0.1:8300")
64+
mustDeleteCapture(t, state, captureID)
65+
66+
// Immediately after the delete the capture is still listed and the callback
// has not run.
state.UpdatePendingChange()
67+
require.Contains(t, state.Captures, captureID)
68+
require.Empty(t, removed)
69+
70+
// Backdate the pending-removal entry, then apply pending changes again.
state.toRemoveCaptures[captureID] = time.Now().Add(-11 * time.Second)
71+
state.UpdatePendingChange()
72+
73+
// Now the capture is gone, the callback saw exactly this ID, and the
// pending-removal entry has been cleared.
require.NotContains(t, state.Captures, captureID)
74+
require.Equal(t, []config.CaptureID{captureID}, removed)
75+
require.NotContains(t, state.toRemoveCaptures, captureID)
76+
}
77+
78+
// mustUpdateCapture marshals a CaptureInfo with the given ID and advertise
// address and passes it to state.Update under the capture-info etcd key for
// state.ClusterID. The test fails if marshalling or the update returns an error.
func mustUpdateCapture(
79+
t *testing.T,
80+
state *GlobalReactorState,
81+
captureID config.CaptureID,
82+
advertiseAddr string,
83+
) {
84+
t.Helper()
85+
86+
info := &config.CaptureInfo{
87+
ID: captureID,
88+
AdvertiseAddr: advertiseAddr,
89+
}
90+
data, err := info.Marshal()
91+
require.NoError(t, err)
92+
93+
err = state.Update(util.NewEtcdKey(etcd.GetEtcdKeyCaptureInfo(state.ClusterID, string(captureID))), data, false)
94+
require.NoError(t, err)
95+
}
96+
97+
// mustDeleteCapture calls state.Update with a nil value under the capture-info
// etcd key for the given capture, and fails the test if that returns an error.
// NOTE(review): a nil value presumably signals key deletion to Update; confirm
// against GlobalReactorState.Update.
func mustDeleteCapture(t *testing.T, state *GlobalReactorState, captureID config.CaptureID) {
98+
t.Helper()
99+
100+
err := state.Update(util.NewEtcdKey(etcd.GetEtcdKeyCaptureInfo(state.ClusterID, string(captureID))), nil, false)
101+
require.NoError(t, err)
102+
}

0 commit comments

Comments
 (0)