Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions api/v2/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -761,6 +761,10 @@ func (h *OpenAPIV2) ResumeChangefeed(c *gin.Context) {
c.Abort()
return
}
if err = validateResumeChangefeedState(cfInfo.State); err != nil {
_ = c.Error(err)
return
}

// Reject a changefeed if the downstream is the same TiDB logical cluster as the upstream.
isSame, err := check.IsSameUpstreamDownstream(ctx, h.server.GetPdClient(), cfInfo.ToChangefeedConfig())
Expand Down Expand Up @@ -1059,6 +1063,15 @@ func (h *OpenAPIV2) UpdateChangefeed(c *gin.Context) {
c.JSON(getStatus(c), CfInfoToAPIModel(oldCfInfo, status, nil))
}

// validateResumeChangefeedState checks whether a changefeed in the given state
// may be resumed. It returns nil when state.IsResumable() reports true, and an
// ErrChangefeedUpdateRefused error naming the current state otherwise.
func validateResumeChangefeedState(state config.FeedState) error {
	if !state.IsResumable() {
		reason := fmt.Sprintf("can only resume changefeed when it is stopped, failed, or finished, but current state is %s", state)
		return errors.ErrChangefeedUpdateRefused.GenWithStackByArgs(reason)
	}
	return nil
}
Comment on lines +1066 to +1073
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This helper function is only used once in ResumeChangefeed. While it's not a bug, for consistency with other handlers in this file (like UpdateChangefeed at line 930), you might consider inlining this check or refactoring it into a shared location in pkg/config since the same logic and error message are repeated in the CLI and Coordinator layers.


// verifyResumeChangefeedConfig verifies the changefeed config before resuming a changefeed.
// overrideCheckpointTs is the checkpointTs of the changefeed that is specified by the user,
// or it is the checkpointTs of the changefeed before it was paused.
Expand Down
162 changes: 162 additions & 0 deletions api/v2/changefeed_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
// Copyright 2026 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package v2

import (
"context"
"net/http"
"net/http/httptest"
"testing"

"github.com/gin-gonic/gin"
"github.com/pingcap/kvproto/pkg/keyspacepb"
"github.com/pingcap/ticdc/maintainer"
"github.com/pingcap/ticdc/pkg/api"
"github.com/pingcap/ticdc/pkg/common"
"github.com/pingcap/ticdc/pkg/config"
cerror "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/etcd"
"github.com/pingcap/ticdc/pkg/liveness"
"github.com/pingcap/ticdc/pkg/node"
"github.com/pingcap/ticdc/pkg/server"
"github.com/stretchr/testify/require"
pd "github.com/tikv/pd/client"
)

// TestValidateResumeChangefeedState covers the API-side guard that runs before
// resume GC safepoint/barrier setup. Stopped, failed and finished states must
// be accepted; normal, warning and pending states must be refused with
// ErrChangefeedUpdateRefused and an error text that mentions the state.
//
// Every assertion carries the state under test in its failure message so a
// failing iteration can be identified without re-running under a debugger.
func TestValidateResumeChangefeedState(t *testing.T) {
	accepted := []config.FeedState{config.StateStopped, config.StateFailed, config.StateFinished}
	for _, state := range accepted {
		require.NoError(t, validateResumeChangefeedState(state), "state %q should be resumable", state)
	}

	refused := []config.FeedState{config.StateNormal, config.StateWarning, config.StatePending}
	for _, state := range refused {
		err := validateResumeChangefeedState(state)
		require.True(t, cerror.ErrChangefeedUpdateRefused.Equal(err),
			"state %q should be refused with ErrChangefeedUpdateRefused, got %v", state, err)
		require.Contains(t, err.Error(), string(state),
			"error for state %q should mention the state", state)
	}
}

// TestResumeChangefeedRejectsNormalBeforeGC covers the HTTP resume regression:
// when the coordinator reports the changefeed as StateNormal, the handler must
// record exactly one ErrChangefeedUpdateRefused error on the gin context
// without asking the server for its PD or etcd client and without invoking the
// coordinator's ResumeChangefeed.
func TestResumeChangefeedRejectsNormalBeforeGC(t *testing.T) {
	gin.SetMode(gin.TestMode)

	// Test doubles that record which dependencies the handler touched.
	fakeCoordinator := &resumeNormalCoordinator{}
	fakeServer := &resumeNormalServer{coordinator: fakeCoordinator}
	handler := &OpenAPIV2{server: fakeServer}

	recorder := httptest.NewRecorder()
	ginCtx, _ := gin.CreateTestContext(recorder)

	req := httptest.NewRequest(http.MethodPost, "/api/v2/changefeeds/test/resume?keyspace=default", nil)
	ginCtx.Request = req
	ginCtx.Params = gin.Params{{Key: api.APIOpVarChangefeedID, Value: "test"}}

	keyspaceMeta := &keyspacepb.KeyspaceMeta{
		Id:    common.DefaultKeyspaceID,
		State: keyspacepb.KeyspaceState_ENABLED,
	}
	ginCtx.Set("ctx-keyspace", keyspaceMeta)

	handler.ResumeChangefeed(ginCtx)

	require.Len(t, ginCtx.Errors, 1)
	require.True(t, cerror.ErrChangefeedUpdateRefused.Equal(ginCtx.Errors.Last().Err))
	require.False(t, fakeServer.pdClientRequested)
	require.False(t, fakeServer.etcdClientRequested)
	require.False(t, fakeCoordinator.resumeCalled)
}

// resumeNormalServer is a test double for the server used by OpenAPIV2. It
// hands out a fixed coordinator and records whether the PD or etcd client was
// requested; every other method is an inert stub.
type resumeNormalServer struct {
	coordinator         server.Coordinator
	pdClientRequested   bool
	etcdClientRequested bool
}

// Run is a no-op stub.
func (s *resumeNormalServer) Run(_ context.Context) error {
	return nil
}

// Close is a no-op stub.
func (s *resumeNormalServer) Close() {}

// SelfInfo is a stub that returns no node info and no error.
func (s *resumeNormalServer) SelfInfo() (*node.Info, error) {
	return nil, nil
}

// Liveness always reports liveness.CaptureAlive.
func (s *resumeNormalServer) Liveness() liveness.Liveness {
	return liveness.CaptureAlive
}

// GetCoordinator returns the coordinator injected by the test.
func (s *resumeNormalServer) GetCoordinator() (server.Coordinator, error) { return s.coordinator, nil }

// IsCoordinator always reports true.
func (s *resumeNormalServer) IsCoordinator() bool {
	return true
}

// GetCoordinatorInfo is a stub that returns no node info and no error.
func (s *resumeNormalServer) GetCoordinatorInfo(_ context.Context) (*node.Info, error) { return nil, nil }

// GetPdClient records that the PD client was requested and returns nil.
func (s *resumeNormalServer) GetPdClient() pd.Client {
	s.pdClientRequested = true
	return nil
}

// GetEtcdClient records that the etcd client was requested and returns nil.
func (s *resumeNormalServer) GetEtcdClient() etcd.CDCEtcdClient {
	s.etcdClientRequested = true
	return nil
}

// GetMaintainerManager is a stub that returns nil.
func (s *resumeNormalServer) GetMaintainerManager() *maintainer.Manager {
	return nil
}

// resumeNormalCoordinator is a test double for server.Coordinator. Its
// GetChangefeed always reports a changefeed in StateNormal, and it records
// whether ResumeChangefeed was invoked; every other method is an inert stub.
type resumeNormalCoordinator struct {
	resumeCalled bool
}

// Stop is a no-op stub.
func (c *resumeNormalCoordinator) Stop() {}

// Run is a no-op stub.
func (c *resumeNormalCoordinator) Run(_ context.Context) error {
	return nil
}

// ListChangefeeds is a stub that returns no changefeeds and no error.
func (c *resumeNormalCoordinator) ListChangefeeds(_ context.Context, _ string) ([]*config.ChangeFeedInfo, []*config.ChangeFeedStatus, error) {
	return nil, nil, nil
}

// GetChangefeed returns a changefeed built from the requested display name,
// always in StateNormal with CheckpointTs 123.
func (c *resumeNormalCoordinator) GetChangefeed(_ context.Context, changefeedDisplayName common.ChangeFeedDisplayName) (*config.ChangeFeedInfo, *config.ChangeFeedStatus, error) {
	info := &config.ChangeFeedInfo{
		ChangefeedID: common.NewChangeFeedIDWithName(changefeedDisplayName.Name, changefeedDisplayName.Keyspace),
		State:        config.StateNormal,
	}
	status := &config.ChangeFeedStatus{CheckpointTs: 123}
	return info, status, nil
}

// CreateChangefeed is a no-op stub.
func (c *resumeNormalCoordinator) CreateChangefeed(_ context.Context, _ *config.ChangeFeedInfo) error {
	return nil
}

// RemoveChangefeed is a stub that returns a zero value and no error.
func (c *resumeNormalCoordinator) RemoveChangefeed(_ context.Context, _ common.ChangeFeedID) (uint64, error) {
	return 0, nil
}

// PauseChangefeed is a no-op stub.
func (c *resumeNormalCoordinator) PauseChangefeed(_ context.Context, _ common.ChangeFeedID) error {
	return nil
}

// ResumeChangefeed records that it was called and reports success.
func (c *resumeNormalCoordinator) ResumeChangefeed(_ context.Context, _ common.ChangeFeedID, _ uint64, _ bool) error {
	c.resumeCalled = true
	return nil
}

// UpdateChangefeed is a no-op stub.
func (c *resumeNormalCoordinator) UpdateChangefeed(_ context.Context, _ *config.ChangeFeedInfo) error {
	return nil
}

// RequestResolvedTsFromLogCoordinator is a no-op stub.
func (c *resumeNormalCoordinator) RequestResolvedTsFromLogCoordinator(_ context.Context, _ common.ChangeFeedDisplayName) {
}

// Initialized always reports true.
func (c *resumeNormalCoordinator) Initialized() bool {
	return true
}
6 changes: 6 additions & 0 deletions cmd/cdc/cli/cli_changefeed_resume.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package cli

import (
"context"
"fmt"
"strconv"
"strings"

Expand Down Expand Up @@ -161,6 +162,11 @@ func (o *resumeChangefeedOptions) validateParams(ctx context.Context) error {
return err
}
o.changefeedDetail = detail
if !detail.State.IsResumable() {
return cerror.ErrChangefeedUpdateRefused.GenWithStackByArgs(
fmt.Sprintf("can only resume changefeed when it is stopped, failed, or finished, but current state is %s", detail.State),
)
}

tso, err := o.getTSO(ctx)
if err != nil {
Expand Down
33 changes: 33 additions & 0 deletions cmd/cdc/cli/cli_changefeed_resume_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pingcap/errors"
v2 "github.com/pingcap/ticdc/api/v2"
"github.com/pingcap/ticdc/pkg/api"
"github.com/pingcap/ticdc/pkg/config"
cerror "github.com/pingcap/ticdc/pkg/errors"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"
Expand All @@ -43,6 +44,7 @@ func TestChangefeedResumeCli(t *testing.T) {
ID: "abc",
CheckpointTime: api.JSONTime{},
Error: nil,
State: config.StateStopped,
}, nil)
f.tso.EXPECT().Query(gomock.Any(), gomock.Any()).Return(&v2.Tso{
Timestamp: time.Now().Unix() * 1000,
Expand All @@ -68,6 +70,7 @@ func TestChangefeedResumeCli(t *testing.T) {
ID: "abc",
CheckpointTime: api.JSONTime{},
CheckpointTs: 2,
State: config.StateStopped,
}, nil)
f.tso.EXPECT().Query(gomock.Any(), gomock.Any()).Return(nil, errors.New("test")).AnyTimes()
require.NotNil(t, o.run(cmd))
Expand All @@ -80,6 +83,7 @@ func TestChangefeedResumeCli(t *testing.T) {
ID: "abc",
CheckpointTime: api.JSONTime{},
CheckpointTs: 2,
State: config.StateStopped,
}, nil)
f.tso.EXPECT().Query(gomock.Any(), gomock.Any()).Return(&v2.Tso{
Timestamp: time.Now().Unix() * 1000,
Expand Down Expand Up @@ -117,6 +121,7 @@ func TestChangefeedResumeWithNewCheckpointTs(t *testing.T) {
CheckpointTs: 2,
CheckpointTime: api.JSONTime{},
Error: nil,
State: config.StateStopped,
}, nil).Times(2)
tso := &v2.Tso{
Timestamp: time.Now().Unix() * 1000,
Expand All @@ -139,6 +144,7 @@ func TestChangefeedResumeWithNewCheckpointTs(t *testing.T) {
ID: "abc",
CheckpointTime: api.JSONTime{},
Error: nil,
State: config.StateStopped,
}, nil)
f.tso.EXPECT().Query(gomock.Any(), gomock.Any()).Return(tso, nil).AnyTimes()
o.noConfirm = true
Expand All @@ -153,6 +159,7 @@ func TestChangefeedResumeWithNewCheckpointTs(t *testing.T) {
ID: "abc",
CheckpointTime: api.JSONTime{},
Error: nil,
State: config.StateStopped,
}, nil)
f.tso.EXPECT().Query(gomock.Any(), gomock.Any()).Return(tso, nil).AnyTimes()
o.overwriteCheckpointTs = "18446744073709551615"
Expand All @@ -167,6 +174,7 @@ func TestChangefeedResumeWithNewCheckpointTs(t *testing.T) {
CheckpointTs: 2,
CheckpointTime: api.JSONTime{},
Error: nil,
State: config.StateStopped,
}, nil).Times(2)
tso = &v2.Tso{
Timestamp: 1,
Expand All @@ -180,3 +188,28 @@ func TestChangefeedResumeWithNewCheckpointTs(t *testing.T) {
o.overwriteCheckpointTs = "262144"
require.NotNil(t, o.run(cmd))
}

// TestChangefeedResumeRejectsNormalState checks the CLI-side state guard: when
// the changefeed fetched before resuming is in StateNormal, run must return
// ErrChangefeedUpdateRefused with a message that names the state. Only the Get
// call is expected on the mock; no resume expectation is registered.
func TestChangefeedResumeRejectsNormalState(t *testing.T) {
	ctrl := gomock.NewController(t)
	defer ctrl.Finish()

	factory := newMockFactory(ctrl)
	opts := newResumeChangefeedOptions()
	require.NoError(t, opts.complete(factory))
	cmd := newCmdResumeChangefeed(factory)

	// The changefeed the CLI reads back is still running.
	runningFeed := &v2.ChangeFeedInfo{
		UpstreamID:     1,
		Keyspace:       "default",
		ID:             "abc",
		CheckpointTime: api.JSONTime{},
		State:          config.StateNormal,
	}
	factory.changefeeds.EXPECT().
		Get(gomock.Any(), gomock.Any(), "abc").
		Return(runningFeed, nil)

	opts.noConfirm = true
	opts.changefeedID = "abc"

	runErr := opts.run(cmd)
	require.True(t, cerror.ErrChangefeedUpdateRefused.Equal(runErr))
	require.Contains(t, runErr.Error(), string(config.StateNormal))
}
12 changes: 7 additions & 5 deletions coordinator/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package coordinator

import (
"context"
"fmt"
"sync"
"time"

Expand Down Expand Up @@ -795,12 +796,13 @@ func (c *Controller) ResumeChangefeed(
}

state := cf.GetInfo().State
switch state {
case config.StateFailed, config.StateStopped, config.StateFinished:
default:
log.Warn("ignore resume the changefeed",
if !state.IsResumable() {
err := errors.ErrChangefeedUpdateRefused.GenWithStackByArgs(
fmt.Sprintf("can only resume changefeed when it is stopped, failed, or finished, but current state is %s", state),
)
log.Warn("refuse to resume the changefeed",
zap.Stringer("changefeedID", id), zap.Any("state", state))
return nil
return err
}

if err := c.backend.ResumeChangefeed(ctx, id, newCheckpointTs); err != nil {
Expand Down
5 changes: 3 additions & 2 deletions coordinator/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,8 @@ func TestResumeChangefeed(t *testing.T) {
}

func TestResumeChangefeedNormalState(t *testing.T) {
// Scenario: a running changefeed receives a resume request. The controller
// should reject it before touching backend state, so the epoch remains unchanged.
ctrl := gomock.NewController(t)
backend := mock_changefeed.NewMockBackend(ctrl)
changefeedDB := changefeed.NewChangefeedDB(1216)
Expand All @@ -167,9 +169,8 @@ func TestResumeChangefeedNormalState(t *testing.T) {
changefeedDB.AddReplicatingMaintainer(cf, "node1")

err := controller.ResumeChangefeed(context.Background(), cfID, 12, true)
require.NoError(t, err)
require.True(t, errors.ErrChangefeedUpdateRefused.Equal(err))

// The resume operation is skipped, so the epoch is not updated and should remain its original value.
changefeed := controller.changefeedDB.GetByID(cfID)
require.Equal(t, changefeed.GetInfo().Epoch, uint64(233))
}
Expand Down
10 changes: 10 additions & 0 deletions pkg/config/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,16 @@ func (s FeedState) IsRunning() bool {
return s == StateNormal || s == StateWarning
}

// IsResumable reports whether a changefeed in this state may be resumed.
// Only StateFailed, StateStopped and StateFinished qualify; every other state
// yields false.
func (s FeedState) IsResumable() bool {
	return s == StateFailed || s == StateStopped || s == StateFinished
}
Comment on lines +114 to +121
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The error message associated with this state check is duplicated in three different places (api/v2/changefeed.go, coordinator/controller.go, and cmd/cdc/cli/cli_changefeed_resume.go).

To improve maintainability and ensure consistency if the list of resumable states ever changes, consider defining a shared constant or a helper method on FeedState that returns the allowed states as a string, or even a method that returns the formatted error message.


// RunningError represents some running error from cdc components, such as processor.
type RunningError struct {
Time time.Time `json:"time"`
Expand Down
22 changes: 22 additions & 0 deletions pkg/config/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,28 @@ import (
"github.com/stretchr/testify/require"
)

// TestFeedStateIsResumable verifies the shared state predicate used by the
// resume paths. Only stopped, failed and finished states should be accepted;
// every other listed state must be rejected.
//
// The state is included in the assertion message (quoted, so an empty state
// value stays visible) to make the failing table row obvious.
func TestFeedStateIsResumable(t *testing.T) {
	tests := []struct {
		state     FeedState
		resumable bool
	}{
		{state: StateStopped, resumable: true},
		{state: StateFailed, resumable: true},
		{state: StateFinished, resumable: true},
		{state: StateNormal, resumable: false},
		{state: StateWarning, resumable: false},
		{state: StatePending, resumable: false},
		{state: StateRemoved, resumable: false},
		{state: StateUnInitialized, resumable: false},
	}

	for _, tt := range tests {
		require.Equal(t, tt.resumable, tt.state.IsResumable(),
			"unexpected IsResumable result for state %q", tt.state)
	}
}

// TestChangeFeedInfoToChangefeedConfigBatchFields ensures the maintainer-facing
// changefeed config keeps the optional event collector batch overrides.
func TestChangeFeedInfoToChangefeedConfigBatchFields(t *testing.T) {
Expand Down
Loading