From 8b83f10a9f6913afa2becd3729b63acd0a6e73d0 Mon Sep 17 00:00:00 2001 From: wlwilliamx Date: Thu, 23 Apr 2026 15:18:29 +0800 Subject: [PATCH] api,cli: reject resume for running changefeeds Signed-off-by: wlwilliamx --- api/v2/changefeed.go | 13 ++ api/v2/changefeed_test.go | 162 ++++++++++++++++++++++ cmd/cdc/cli/cli_changefeed_resume.go | 6 + cmd/cdc/cli/cli_changefeed_resume_test.go | 33 +++++ coordinator/controller.go | 12 +- coordinator/controller_test.go | 5 +- pkg/config/changefeed.go | 10 ++ pkg/config/changefeed_test.go | 22 +++ 8 files changed, 256 insertions(+), 7 deletions(-) create mode 100644 api/v2/changefeed_test.go diff --git a/api/v2/changefeed.go b/api/v2/changefeed.go index 29b7c8bf2c..e3cc3f1930 100644 --- a/api/v2/changefeed.go +++ b/api/v2/changefeed.go @@ -761,6 +761,10 @@ func (h *OpenAPIV2) ResumeChangefeed(c *gin.Context) { c.Abort() return } + if err = validateResumeChangefeedState(cfInfo.State); err != nil { + _ = c.Error(err) + return + } // Reject a changefeed if the downstream is the same TiDB logical cluster as the upstream. isSame, err := check.IsSameUpstreamDownstream(ctx, h.server.GetPdClient(), cfInfo.ToChangefeedConfig()) @@ -1059,6 +1063,15 @@ func (h *OpenAPIV2) UpdateChangefeed(c *gin.Context) { c.JSON(getStatus(c), CfInfoToAPIModel(oldCfInfo, status, nil)) } +func validateResumeChangefeedState(state config.FeedState) error { + if state.IsResumable() { + return nil + } + return errors.ErrChangefeedUpdateRefused.GenWithStackByArgs( + fmt.Sprintf("can only resume changefeed when it is stopped, failed, or finished, but current state is %s", state), + ) +} + // verifyResumeChangefeedConfig verifies the changefeed config before resuming a changefeed // overrideCheckpointTs is the checkpointTs of the changefeed that specified by the user. // or it is the checkpointTs of the changefeed before it is paused. 
diff --git a/api/v2/changefeed_test.go b/api/v2/changefeed_test.go new file mode 100644 index 0000000000..cfd7d0ba46 --- /dev/null +++ b/api/v2/changefeed_test.go @@ -0,0 +1,162 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package v2 + +import ( + "context" + "net/http" + "net/http/httptest" + "testing" + + "github.com/gin-gonic/gin" + "github.com/pingcap/kvproto/pkg/keyspacepb" + "github.com/pingcap/ticdc/maintainer" + "github.com/pingcap/ticdc/pkg/api" + "github.com/pingcap/ticdc/pkg/common" + "github.com/pingcap/ticdc/pkg/config" + cerror "github.com/pingcap/ticdc/pkg/errors" + "github.com/pingcap/ticdc/pkg/etcd" + "github.com/pingcap/ticdc/pkg/liveness" + "github.com/pingcap/ticdc/pkg/node" + "github.com/pingcap/ticdc/pkg/server" + "github.com/stretchr/testify/require" + pd "github.com/tikv/pd/client" +) + +// TestValidateResumeChangefeedState covers the API-side guard that runs before +// resume GC safepoint/barrier setup. Non-resumable states (normal, warning, pending) +// must fail fast, while stopped, failed, or finished states proceed to later validation. 
+func TestValidateResumeChangefeedState(t *testing.T) { + for _, state := range []config.FeedState{config.StateStopped, config.StateFailed, config.StateFinished} { + require.NoError(t, validateResumeChangefeedState(state)) + } + + for _, state := range []config.FeedState{config.StateNormal, config.StateWarning, config.StatePending} { + err := validateResumeChangefeedState(state) + require.True(t, cerror.ErrChangefeedUpdateRefused.Equal(err)) + require.Contains(t, err.Error(), string(state)) + } +} + +// TestResumeChangefeedRejectsNormalBeforeGC covers the HTTP resume regression: +// a normal changefeed must fail before the handler requests PD/etcd clients for +// GC safepoint/barrier setup or calls the coordinator resume path. +func TestResumeChangefeedRejectsNormalBeforeGC(t *testing.T) { + gin.SetMode(gin.TestMode) + + co := &resumeNormalCoordinator{} + srv := &resumeNormalServer{coordinator: co} + h := &OpenAPIV2{server: srv} + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request = httptest.NewRequest(http.MethodPost, "/api/v2/changefeeds/test/resume?keyspace=default", nil) + c.Params = gin.Params{{Key: api.APIOpVarChangefeedID, Value: "test"}} + c.Set("ctx-keyspace", &keyspacepb.KeyspaceMeta{ + Id: common.DefaultKeyspaceID, + State: keyspacepb.KeyspaceState_ENABLED, + }) + + h.ResumeChangefeed(c) + + require.Len(t, c.Errors, 1) + require.True(t, cerror.ErrChangefeedUpdateRefused.Equal(c.Errors.Last().Err)) + require.False(t, srv.pdClientRequested) + require.False(t, srv.etcdClientRequested) + require.False(t, co.resumeCalled) +} + +type resumeNormalServer struct { + coordinator server.Coordinator + pdClientRequested bool + etcdClientRequested bool +} + +func (s *resumeNormalServer) Run(ctx context.Context) error { return nil } + +func (s *resumeNormalServer) Close() {} + +func (s *resumeNormalServer) SelfInfo() (*node.Info, error) { return nil, nil } + +func (s *resumeNormalServer) Liveness() liveness.Liveness { return 
liveness.CaptureAlive } + +func (s *resumeNormalServer) GetCoordinator() (server.Coordinator, error) { + return s.coordinator, nil +} + +func (s *resumeNormalServer) IsCoordinator() bool { return true } + +func (s *resumeNormalServer) GetCoordinatorInfo(ctx context.Context) (*node.Info, error) { + return nil, nil +} + +func (s *resumeNormalServer) GetPdClient() pd.Client { + s.pdClientRequested = true + return nil +} + +func (s *resumeNormalServer) GetEtcdClient() etcd.CDCEtcdClient { + s.etcdClientRequested = true + return nil +} + +func (s *resumeNormalServer) GetMaintainerManager() *maintainer.Manager { return nil } + +type resumeNormalCoordinator struct { + resumeCalled bool +} + +func (c *resumeNormalCoordinator) Stop() {} + +func (c *resumeNormalCoordinator) Run(ctx context.Context) error { return nil } + +func (c *resumeNormalCoordinator) ListChangefeeds(ctx context.Context, keyspace string) ([]*config.ChangeFeedInfo, []*config.ChangeFeedStatus, error) { + return nil, nil, nil +} + +func (c *resumeNormalCoordinator) GetChangefeed(ctx context.Context, changefeedDisplayName common.ChangeFeedDisplayName) (*config.ChangeFeedInfo, *config.ChangeFeedStatus, error) { + changefeedID := common.NewChangeFeedIDWithName(changefeedDisplayName.Name, changefeedDisplayName.Keyspace) + return &config.ChangeFeedInfo{ + ChangefeedID: changefeedID, + State: config.StateNormal, + }, &config.ChangeFeedStatus{ + CheckpointTs: 123, + }, nil +} + +func (c *resumeNormalCoordinator) CreateChangefeed(ctx context.Context, info *config.ChangeFeedInfo) error { + return nil +} + +func (c *resumeNormalCoordinator) RemoveChangefeed(ctx context.Context, id common.ChangeFeedID) (uint64, error) { + return 0, nil +} + +func (c *resumeNormalCoordinator) PauseChangefeed(ctx context.Context, id common.ChangeFeedID) error { + return nil +} + +func (c *resumeNormalCoordinator) ResumeChangefeed(ctx context.Context, id common.ChangeFeedID, newCheckpointTs uint64, overwriteCheckpointTs bool) error { + 
c.resumeCalled = true + return nil +} + +func (c *resumeNormalCoordinator) UpdateChangefeed(ctx context.Context, change *config.ChangeFeedInfo) error { + return nil +} + +func (c *resumeNormalCoordinator) RequestResolvedTsFromLogCoordinator(ctx context.Context, changefeedDisplayName common.ChangeFeedDisplayName) { +} + +func (c *resumeNormalCoordinator) Initialized() bool { return true } diff --git a/cmd/cdc/cli/cli_changefeed_resume.go b/cmd/cdc/cli/cli_changefeed_resume.go index 4565b3b47a..1d8a1c8212 100644 --- a/cmd/cdc/cli/cli_changefeed_resume.go +++ b/cmd/cdc/cli/cli_changefeed_resume.go @@ -15,6 +15,7 @@ package cli import ( "context" + "fmt" "strconv" "strings" @@ -161,6 +162,11 @@ func (o *resumeChangefeedOptions) validateParams(ctx context.Context) error { return err } o.changefeedDetail = detail + if !detail.State.IsResumable() { + return cerror.ErrChangefeedUpdateRefused.GenWithStackByArgs( + fmt.Sprintf("can only resume changefeed when it is stopped, failed, or finished, but current state is %s", detail.State), + ) + } tso, err := o.getTSO(ctx) if err != nil { diff --git a/cmd/cdc/cli/cli_changefeed_resume_test.go b/cmd/cdc/cli/cli_changefeed_resume_test.go index 1af5034509..7338e05ee5 100644 --- a/cmd/cdc/cli/cli_changefeed_resume_test.go +++ b/cmd/cdc/cli/cli_changefeed_resume_test.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/errors" v2 "github.com/pingcap/ticdc/api/v2" "github.com/pingcap/ticdc/pkg/api" + "github.com/pingcap/ticdc/pkg/config" cerror "github.com/pingcap/ticdc/pkg/errors" "github.com/stretchr/testify/require" "github.com/tikv/client-go/v2/oracle" @@ -43,6 +44,7 @@ func TestChangefeedResumeCli(t *testing.T) { ID: "abc", CheckpointTime: api.JSONTime{}, Error: nil, + State: config.StateStopped, }, nil) f.tso.EXPECT().Query(gomock.Any(), gomock.Any()).Return(&v2.Tso{ Timestamp: time.Now().Unix() * 1000, @@ -68,6 +70,7 @@ func TestChangefeedResumeCli(t *testing.T) { ID: "abc", CheckpointTime: api.JSONTime{}, CheckpointTs: 2, + State: 
config.StateStopped, }, nil) f.tso.EXPECT().Query(gomock.Any(), gomock.Any()).Return(nil, errors.New("test")).AnyTimes() require.NotNil(t, o.run(cmd)) @@ -80,6 +83,7 @@ func TestChangefeedResumeCli(t *testing.T) { ID: "abc", CheckpointTime: api.JSONTime{}, CheckpointTs: 2, + State: config.StateStopped, }, nil) f.tso.EXPECT().Query(gomock.Any(), gomock.Any()).Return(&v2.Tso{ Timestamp: time.Now().Unix() * 1000, @@ -117,6 +121,7 @@ func TestChangefeedResumeWithNewCheckpointTs(t *testing.T) { CheckpointTs: 2, CheckpointTime: api.JSONTime{}, Error: nil, + State: config.StateStopped, }, nil).Times(2) tso := &v2.Tso{ Timestamp: time.Now().Unix() * 1000, @@ -139,6 +144,7 @@ func TestChangefeedResumeWithNewCheckpointTs(t *testing.T) { ID: "abc", CheckpointTime: api.JSONTime{}, Error: nil, + State: config.StateStopped, }, nil) f.tso.EXPECT().Query(gomock.Any(), gomock.Any()).Return(tso, nil).AnyTimes() o.noConfirm = true @@ -153,6 +159,7 @@ func TestChangefeedResumeWithNewCheckpointTs(t *testing.T) { ID: "abc", CheckpointTime: api.JSONTime{}, Error: nil, + State: config.StateStopped, }, nil) f.tso.EXPECT().Query(gomock.Any(), gomock.Any()).Return(tso, nil).AnyTimes() o.overwriteCheckpointTs = "18446744073709551615" @@ -167,6 +174,7 @@ func TestChangefeedResumeWithNewCheckpointTs(t *testing.T) { CheckpointTs: 2, CheckpointTime: api.JSONTime{}, Error: nil, + State: config.StateStopped, }, nil).Times(2) tso = &v2.Tso{ Timestamp: 1, @@ -180,3 +188,28 @@ func TestChangefeedResumeWithNewCheckpointTs(t *testing.T) { o.overwriteCheckpointTs = "262144" require.NotNil(t, o.run(cmd)) } + +func TestChangefeedResumeRejectsNormalState(t *testing.T) { + // Scenario: the CLI reads a running changefeed before sending resume. It + // should return a state error immediately and never call the resume API. 
+ ctrl := gomock.NewController(t) + defer ctrl.Finish() + f := newMockFactory(ctrl) + o := newResumeChangefeedOptions() + require.NoError(t, o.complete(f)) + cmd := newCmdResumeChangefeed(f) + + f.changefeeds.EXPECT().Get(gomock.Any(), gomock.Any(), "abc").Return(&v2.ChangeFeedInfo{ + UpstreamID: 1, + Keyspace: "default", + ID: "abc", + CheckpointTime: api.JSONTime{}, + State: config.StateNormal, + }, nil) + + o.noConfirm = true + o.changefeedID = "abc" + err := o.run(cmd) + require.True(t, cerror.ErrChangefeedUpdateRefused.Equal(err)) + require.Contains(t, err.Error(), string(config.StateNormal)) +} diff --git a/coordinator/controller.go b/coordinator/controller.go index 7e38cf2fd7..827747f503 100644 --- a/coordinator/controller.go +++ b/coordinator/controller.go @@ -15,6 +15,7 @@ package coordinator import ( "context" + "fmt" "sync" "time" @@ -795,12 +796,13 @@ func (c *Controller) ResumeChangefeed( } state := cf.GetInfo().State - switch state { - case config.StateFailed, config.StateStopped, config.StateFinished: - default: - log.Warn("ignore resume the changefeed", + if !state.IsResumable() { + err := errors.ErrChangefeedUpdateRefused.GenWithStackByArgs( + fmt.Sprintf("can only resume changefeed when it is stopped, failed, or finished, but current state is %s", state), + ) + log.Warn("refuse to resume the changefeed", zap.Stringer("changefeedID", id), zap.Any("state", state)) - return nil + return err } if err := c.backend.ResumeChangefeed(ctx, id, newCheckpointTs); err != nil { diff --git a/coordinator/controller_test.go b/coordinator/controller_test.go index 6702501cd5..6b8dccd75a 100644 --- a/coordinator/controller_test.go +++ b/coordinator/controller_test.go @@ -149,6 +149,8 @@ func TestResumeChangefeed(t *testing.T) { } func TestResumeChangefeedNormalState(t *testing.T) { + // Scenario: a running changefeed receives a resume request. The controller + // should reject it before touching backend state, so the epoch remains unchanged. 
ctrl := gomock.NewController(t) backend := mock_changefeed.NewMockBackend(ctrl) changefeedDB := changefeed.NewChangefeedDB(1216) @@ -167,9 +169,8 @@ func TestResumeChangefeedNormalState(t *testing.T) { changefeedDB.AddReplicatingMaintainer(cf, "node1") err := controller.ResumeChangefeed(context.Background(), cfID, 12, true) - require.NoError(t, err) + require.True(t, errors.ErrChangefeedUpdateRefused.Equal(err)) - // The resume operation is skipped, so the epoch is not updated and should remain its original value. changefeed := controller.changefeedDB.GetByID(cfID) require.Equal(t, changefeed.GetInfo().Epoch, uint64(233)) } diff --git a/pkg/config/changefeed.go b/pkg/config/changefeed.go index a7c9364f97..40d8e27366 100644 --- a/pkg/config/changefeed.go +++ b/pkg/config/changefeed.go @@ -110,6 +110,16 @@ func (s FeedState) IsRunning() bool { return s == StateNormal || s == StateWarning } +// IsResumable returns true if the feedState can be resumed to StateNormal. +func (s FeedState) IsResumable() bool { + switch s { + case StateFailed, StateStopped, StateFinished: + return true + default: + return false + } +} + // RunningError represents some running error from cdc components, such as processor. type RunningError struct { Time time.Time `json:"time"` diff --git a/pkg/config/changefeed_test.go b/pkg/config/changefeed_test.go index 02c1d006c6..6f03b9e9f3 100644 --- a/pkg/config/changefeed_test.go +++ b/pkg/config/changefeed_test.go @@ -21,6 +21,28 @@ import ( "github.com/stretchr/testify/require" ) +// TestFeedStateIsResumable verifies the state predicate shared by the CLI, API, and +// coordinator resume paths. Only stopped, failed, or finished states are resumable. 
+func TestFeedStateIsResumable(t *testing.T) { + tests := []struct { + state FeedState + resumable bool + }{ + {state: StateStopped, resumable: true}, + {state: StateFailed, resumable: true}, + {state: StateFinished, resumable: true}, + {state: StateNormal, resumable: false}, + {state: StateWarning, resumable: false}, + {state: StatePending, resumable: false}, + {state: StateRemoved, resumable: false}, + {state: StateUnInitialized, resumable: false}, + } + + for _, tt := range tests { + require.Equal(t, tt.resumable, tt.state.IsResumable()) + } +} + // TestChangeFeedInfoToChangefeedConfigBatchFields ensures the maintainer-facing // changefeed config keeps the optional event collector batch overrides. func TestChangeFeedInfoToChangefeedConfigBatchFields(t *testing.T) {