Skip to content

Commit 3c6a882

Browse files
authored
downstreamadapter: preserve remove upgrade during close (#4815)
close #4825
1 parent 0bbe316 commit 3c6a882

22 files changed

Lines changed: 251 additions & 88 deletions

File tree

cmd/storage-consumer/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ func main() {
101101
deferFunc := func() int {
102102
stop()
103103
if consumer != nil {
104-
consumer.sink.Close(false)
104+
consumer.sink.Close()
105105
}
106106
if err != nil && err != context.Canceled {
107107
return 1

downstreamadapter/dispatcher/mock_sink_helper_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ func newDispatcherTestSink(t *testing.T, sinkType common.SinkType) *dispatcherTe
7272
}).AnyTimes()
7373
testSink.sink.EXPECT().AddCheckpointTs(gomock.Any()).AnyTimes()
7474
testSink.sink.EXPECT().SetTableSchemaStore(gomock.Any()).AnyTimes()
75-
testSink.sink.EXPECT().Close(gomock.Any()).AnyTimes()
75+
testSink.sink.EXPECT().Close().AnyTimes()
7676
testSink.sink.EXPECT().Run(gomock.Any()).Return(nil).AnyTimes()
7777
return testSink
7878
}

downstreamadapter/dispatchermanager/dispatcher_manager.go

Lines changed: 69 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -133,8 +133,19 @@ type DispatcherManager struct {
133133

134134
closing atomic.Bool
135135
closed atomic.Bool
136-
cancel context.CancelFunc
137-
wg sync.WaitGroup
136+
// removeChangefeedRequested is sticky once any close request asks for removed=true.
137+
// A later removed=false request must not downgrade the final cleanup semantics.
138+
removeChangefeedRequested atomic.Bool
139+
// removeChangefeedCleaned records whether the best-effort remove-only cleanup has finished.
140+
// It is intentionally not part of the TryClose success condition so the close contract
141+
// stays compatible with the historical behavior.
142+
removeChangefeedCleaned atomic.Bool
143+
// removeChangefeedCleanupRunning prevents duplicate background cleanup runs
144+
// while late remove requests or retries keep asking for remove semantics after
145+
// the base close path ends.
146+
removeChangefeedCleanupRunning atomic.Bool
147+
cancel context.CancelFunc
148+
wg sync.WaitGroup
138149

139150
// removeTaskHandles stores the task handles for async dispatcher removal
140151
// map[common.DispatcherID]*threadpool.TaskHandle
@@ -855,19 +866,23 @@ func (e *DispatcherManager) mergeEventDispatcher(dispatcherIDs []common.Dispatch
855866
// ==== remove and clean related functions ====
856867

857868
func (e *DispatcherManager) TryClose(removeChangefeed bool) bool {
869+
if removeChangefeed {
870+
e.removeChangefeedRequested.Store(true)
871+
}
858872
if e.closed.Load() {
873+
e.tryScheduleRemoveChangefeedCleanup()
859874
return true
860875
}
861876
if e.closing.Load() {
862877
return e.closed.Load()
863878
}
864879

865880
e.closing.Store(true)
866-
go e.close(removeChangefeed)
881+
go e.close()
867882
return false
868883
}
869884

870-
func (e *DispatcherManager) close(removeChangefeed bool) {
885+
func (e *DispatcherManager) close() {
871886
log.Info("closing event dispatcher manager",
872887
zap.Stringer("changefeedID", e.changefeedID))
873888

@@ -919,11 +934,9 @@ func (e *DispatcherManager) close(removeChangefeed bool) {
919934
log.Info("shared info closed", zap.Stringer("changefeedID", e.changefeedID))
920935

921936
if e.IsRedoEnabled() {
922-
e.redoSink.Close(removeChangefeed)
923-
// FIXME: cleanup redo log when remove the changefeed
924-
e.closeRedoMeta(removeChangefeed)
937+
e.redoSink.Close()
925938
}
926-
e.sink.Close(removeChangefeed)
939+
e.sink.Close()
927940
log.Info("sink closed", zap.Stringer("changefeedID", e.changefeedID))
928941

929942
e.wg.Wait()
@@ -937,10 +950,58 @@ func (e *DispatcherManager) close(removeChangefeed bool) {
937950
e.cleanMetrics()
938951

939952
e.closed.Store(true)
953+
e.tryScheduleRemoveChangefeedCleanup()
940954
log.Info("event dispatcher manager closed",
941955
zap.Stringer("changefeedID", e.changefeedID))
942956
}
943957

958+
func (e *DispatcherManager) tryScheduleRemoveChangefeedCleanup() {
959+
if !e.removeChangefeedRequested.Load() {
960+
return
961+
}
962+
if e.removeChangefeedCleaned.Load() {
963+
return
964+
}
965+
if !e.removeChangefeedCleanupRunning.CompareAndSwap(false, true) {
966+
return
967+
}
968+
969+
go func() {
970+
defer e.removeChangefeedCleanupRunning.Store(false)
971+
972+
if err := e.runRemoveChangefeedCleanup(); err != nil {
973+
log.Warn("failed to cleanup removed changefeed",
974+
zap.Stringer("changefeedID", e.changefeedID),
975+
zap.Error(err))
976+
return
977+
}
978+
e.removeChangefeedCleaned.Store(true)
979+
}()
980+
}
981+
982+
func (e *DispatcherManager) runRemoveChangefeedCleanup() error {
983+
if !e.removeChangefeedRequested.Load() || e.removeChangefeedCleaned.Load() {
984+
return nil
985+
}
986+
987+
if e.IsRedoEnabled() {
988+
// Redo meta cleanup is the remove-only step for redo mode. It is safe to retry
989+
// because removeChangefeedCleaned is only set after all remove-only cleanup succeeds.
990+
if err := e.closeRedoMeta(true); err != nil {
991+
return errors.Trace(err)
992+
}
993+
}
994+
995+
if mysqlSink, ok := e.sink.(*mysql.Sink); ok {
996+
// MySQL sink may still need ddl_ts cleanup after the base close path has already
997+
// released the long-lived DB connection, so keep the retryable remove step here.
998+
if err := mysqlSink.CleanupRemovedChangefeed(); err != nil {
999+
return errors.Trace(err)
1000+
}
1001+
}
1002+
return nil
1003+
}
1004+
9441005
// cleanEventDispatcher is called when the event dispatcher is removed successfully.
9451006
func (e *DispatcherManager) cleanEventDispatcher(id common.DispatcherID, schemaID int64) {
9461007
e.dispatcherMap.Delete(id)

downstreamadapter/dispatchermanager/dispatcher_manager_redo.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -257,16 +257,19 @@ func (e *DispatcherManager) cleanRedoDispatcher(id common.DispatcherID, schemaID
257257
)
258258
}
259259

260-
func (e *DispatcherManager) closeRedoMeta(removeChangefeed bool) {
260+
func (e *DispatcherManager) closeRedoMeta(removeChangefeed bool) error {
261261
if d := e.GetTableTriggerRedoDispatcher(); d != nil {
262262
redoMeta := d.GetRedoMeta()
263263
if redoMeta != nil {
264264
redoMeta.CleanupMetrics()
265265
if removeChangefeed {
266-
redoMeta.Cleanup(context.Background())
266+
if err := redoMeta.Cleanup(context.Background()); err != nil {
267+
return errors.Trace(err)
268+
}
267269
}
268270
}
269271
}
272+
return nil
270273
}
271274

272275
func (e *DispatcherManager) InitalizeTableTriggerRedoDispatcher(schemaInfo []*heartbeatpb.SchemaInfo) error {

downstreamadapter/dispatchermanager/dispatcher_manager_test.go

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"github.com/pingcap/ticdc/downstreamadapter/eventcollector"
2525
"github.com/pingcap/ticdc/downstreamadapter/sink"
2626
"github.com/pingcap/ticdc/downstreamadapter/sink/mock"
27+
"github.com/pingcap/ticdc/downstreamadapter/sink/mysql"
2728
"github.com/pingcap/ticdc/heartbeatpb"
2829
"github.com/pingcap/ticdc/pkg/common"
2930
appcontext "github.com/pingcap/ticdc/pkg/common/context"
@@ -33,6 +34,7 @@ import (
3334
"github.com/pingcap/ticdc/pkg/metrics"
3435
"github.com/pingcap/ticdc/pkg/node"
3536
"github.com/pingcap/ticdc/pkg/pdutil"
37+
mysqlcfg "github.com/pingcap/ticdc/pkg/sink/mysql"
3638
"github.com/pingcap/ticdc/pkg/util"
3739
"github.com/pingcap/ticdc/utils/threadpool"
3840
"github.com/stretchr/testify/require"
@@ -53,7 +55,7 @@ func newDispatcherManagerTestSink(t *testing.T, sinkType common.SinkType) sink.S
5355
}).AnyTimes()
5456
mockSink.EXPECT().AddCheckpointTs(gomock.Any()).AnyTimes()
5557
mockSink.EXPECT().SetTableSchemaStore(gomock.Any()).AnyTimes()
56-
mockSink.EXPECT().Close(gomock.Any()).AnyTimes()
58+
mockSink.EXPECT().Close().AnyTimes()
5759
mockSink.EXPECT().Run(gomock.Any()).Return(nil).AnyTimes()
5860
return mockSink
5961
}
@@ -274,6 +276,36 @@ func TestMergeDispatcherInvalidIDs(t *testing.T) {
274276
require.False(t, exists)
275277
}
276278

279+
func TestTryCloseRemovedRequestAfterClosedReturnsImmediatelyAndTriggersCleanup(t *testing.T) {
280+
changefeedID := common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceName)
281+
mysqlConfig := mysqlcfg.New()
282+
mysqlConfig.EnableDDLTs = false
283+
mysqlSink := mysql.NewMySQLSink(
284+
context.Background(),
285+
changefeedID,
286+
mysqlConfig,
287+
nil,
288+
false,
289+
false,
290+
time.Minute,
291+
)
292+
manager := &DispatcherManager{
293+
changefeedID: changefeedID,
294+
sink: mysqlSink,
295+
}
296+
manager.closed.Store(true)
297+
298+
// Preserve the historical close contract: once the manager is already closed,
299+
// late remove requests should not delay TryClose success.
300+
closed := manager.TryClose(true)
301+
require.True(t, closed)
302+
require.True(t, manager.removeChangefeedRequested.Load())
303+
require.Eventually(t, func() bool {
304+
return manager.removeChangefeedCleaned.Load()
305+
}, time.Second, 10*time.Millisecond)
306+
require.True(t, manager.TryClose(true))
307+
}
308+
277309
func TestMergeDispatcherExistingID(t *testing.T) {
278310
manager := createTestManager(t)
279311

downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -119,12 +119,11 @@ func getPendingMessageKey(msg *messaging.TargetMessage) (pendingMessageKey, bool
119119
// handleMessages processes messages from the queue
120120
func (m *DispatcherOrchestrator) handleMessages() {
121121
for {
122-
key, ok := m.msgQueue.Pop()
122+
msg, ok := m.msgQueue.Pop()
123123
if !ok {
124124
log.Info("dispatcher orchestrator is shutting down, exit handleMessages")
125125
return
126126
}
127-
msg := m.msgQueue.Get(key)
128127

129128
// Process the message
130129
switch req := msg.Message[0].(type) {
@@ -146,8 +145,6 @@ func (m *DispatcherOrchestrator) handleMessages() {
146145
zap.String("type", msg.Type.String()),
147146
zap.Any("message", msg.Message))
148147
}
149-
150-
m.msgQueue.Done(key)
151148
}
152149
}
153150

downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator_test.go

Lines changed: 72 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import (
2424
"github.com/stretchr/testify/require"
2525
)
2626

27-
func TestPendingMessageQueue_TryEnqueueDropsDuplicatesUntilDone(t *testing.T) {
27+
func TestPendingMessageQueue_TryEnqueueDropsDuplicatesOnlyWhileQueued(t *testing.T) {
2828
t.Parallel()
2929

3030
q := newPendingMessageQueue()
@@ -38,14 +38,18 @@ func TestPendingMessageQueue_TryEnqueueDropsDuplicatesUntilDone(t *testing.T) {
3838
require.True(t, q.TryEnqueue(key, msg))
3939
require.False(t, q.TryEnqueue(key, msg))
4040

41-
poppedKey, ok := q.Pop()
41+
poppedMsg, ok := q.Pop()
4242
require.True(t, ok)
43-
require.Equal(t, key, poppedKey)
43+
require.Same(t, msg, poppedMsg)
4444

45-
// The key remains pending while being processed.
45+
// Once the request is popped, allow one queued retry for the next round.
46+
require.True(t, q.TryEnqueue(key, msg))
4647
require.False(t, q.TryEnqueue(key, msg))
4748

48-
q.Done(key)
49+
nextMsg, ok := q.Pop()
50+
require.True(t, ok)
51+
require.Same(t, msg, nextMsg)
52+
4953
require.True(t, q.TryEnqueue(key, msg))
5054
}
5155

@@ -62,15 +66,13 @@ func TestPendingMessageQueue_OrderPreservedAcrossKeys(t *testing.T) {
6266
require.True(t, q.TryEnqueue(key1, &messaging.TargetMessage{Type: key1.msgType}))
6367
require.True(t, q.TryEnqueue(key2, &messaging.TargetMessage{Type: key2.msgType}))
6468

65-
poppedKey, ok := q.Pop()
69+
poppedMsg, ok := q.Pop()
6670
require.True(t, ok)
67-
require.Equal(t, key1, poppedKey)
68-
q.Done(poppedKey)
71+
require.Equal(t, key1.msgType, poppedMsg.Type)
6972

70-
poppedKey, ok = q.Pop()
73+
poppedMsg, ok = q.Pop()
7174
require.True(t, ok)
72-
require.Equal(t, key2, poppedKey)
73-
q.Done(poppedKey)
75+
require.Equal(t, key2.msgType, poppedMsg.Type)
7476
}
7577

7678
func TestPendingMessageQueue_PopReturnsAfterClose(t *testing.T) {
@@ -118,17 +120,14 @@ func TestPendingMessageQueue_CloseRequestRemovedTrueOverridesPendingFalse(t *tes
118120
require.True(t, q.TryEnqueue(key, msgFalse))
119121
require.True(t, q.TryEnqueue(key, msgTrue))
120122

121-
poppedKey, ok := q.Pop()
123+
poppedMsg, ok := q.Pop()
122124
require.True(t, ok)
123-
require.Equal(t, key, poppedKey)
124-
poppedMsg := q.Get(poppedKey)
125125
require.NotNil(t, poppedMsg)
126126
req := poppedMsg.Message[0].(*heartbeatpb.MaintainerCloseRequest)
127127
require.True(t, req.Removed)
128-
q.Done(key)
129128
}
130129

131-
func TestPendingMessageQueue_CloseRequestUpgradeBetweenPopAndGet(t *testing.T) {
130+
func TestPendingMessageQueue_CloseRequestUpgradeAfterPopKeepsReturnedMessageStable(t *testing.T) {
132131
t.Parallel()
133132

134133
q := newPendingMessageQueue()
@@ -150,16 +149,67 @@ func TestPendingMessageQueue_CloseRequestUpgradeBetweenPopAndGet(t *testing.T) {
150149
)
151150

152151
require.True(t, q.TryEnqueue(key, msgFalse))
153-
poppedKey, ok := q.Pop()
152+
poppedMsg, ok := q.Pop()
154153
require.True(t, ok)
155-
require.Equal(t, key, poppedKey)
154+
require.NotNil(t, poppedMsg)
156155

157156
require.True(t, q.TryEnqueue(key, msgTrue))
158-
poppedMsg := q.Get(poppedKey)
159-
require.NotNil(t, poppedMsg)
160157
req2 := poppedMsg.Message[0].(*heartbeatpb.MaintainerCloseRequest)
161-
require.True(t, req2.Removed)
162-
q.Done(key)
158+
require.False(t, req2.Removed)
159+
}
160+
161+
func TestPendingMessageQueue_CloseRequestUpgradeAfterPopRequeuesNextRound(t *testing.T) {
162+
t.Parallel()
163+
164+
q := newPendingMessageQueue()
165+
cfID := common.NewChangeFeedIDWithName("cf", "default")
166+
key := pendingMessageKey{
167+
changefeedID: cfID,
168+
msgType: messaging.TypeMaintainerCloseRequest,
169+
}
170+
171+
msgFalse := messaging.NewSingleTargetMessage(
172+
node.ID("to"),
173+
messaging.DispatcherManagerManagerTopic,
174+
&heartbeatpb.MaintainerCloseRequest{ChangefeedID: cfID.ToPB(), Removed: false},
175+
)
176+
msgTrue := messaging.NewSingleTargetMessage(
177+
node.ID("to"),
178+
messaging.DispatcherManagerManagerTopic,
179+
&heartbeatpb.MaintainerCloseRequest{ChangefeedID: cfID.ToPB(), Removed: true},
180+
)
181+
182+
require.True(t, q.TryEnqueue(key, msgFalse))
183+
184+
poppedMsg, ok := q.Pop()
185+
require.True(t, ok)
186+
187+
require.NotNil(t, poppedMsg)
188+
req := poppedMsg.Message[0].(*heartbeatpb.MaintainerCloseRequest)
189+
require.False(t, req.Removed)
190+
191+
require.True(t, q.TryEnqueue(key, msgTrue))
192+
193+
type popResult struct {
194+
msg *messaging.TargetMessage
195+
ok bool
196+
}
197+
resultCh := make(chan popResult, 1)
198+
go func() {
199+
nextMsg, nextOK := q.Pop()
200+
resultCh <- popResult{msg: nextMsg, ok: nextOK}
201+
}()
202+
203+
select {
204+
case result := <-resultCh:
205+
require.True(t, result.ok)
206+
require.NotNil(t, result.msg)
207+
nextReq := result.msg.Message[0].(*heartbeatpb.MaintainerCloseRequest)
208+
require.True(t, nextReq.Removed)
209+
case <-time.After(time.Second):
210+
q.Close()
211+
require.FailNow(t, "upgraded close request was not requeued after the first pop")
212+
}
163213
}
164214

165215
func TestGetPendingMessageKey_SupportedTypes(t *testing.T) {

0 commit comments

Comments
 (0)