api,cli: fix running changefeed resume handling#4894
api,cli: fix running changefeed resume handling#4894wlwilliamx wants to merge 1 commit intopingcap:masterfrom
Conversation
Signed-off-by: wlwilliamx <wlwilliamx@gmail.com>
|
[APPROVALNOTIFIER] This PR is NOT APPROVED This pull-request has been approved by: The full list of commands accepted by this bot can be found here. DetailsNeeds approval from an approver in each of these files:Approvers can indicate their approval by writing |
📝 WalkthroughWalkthroughEarly state validation is added to CLI and API resume handlers to verify changefeeds are in a resumable state (stopped/failed/finished) before proceeding with resume operations. A new Changes
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~20 minutes Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Warning There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure. 🔧 golangci-lint (2.11.4)Command failed Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Code Review
This pull request introduces a centralized IsResumable check for changefeeds within the pkg/config package and integrates it into the API, CLI, and Coordinator layers to ensure that only changefeeds in Stopped, Failed, or Finished states can be resumed. The changes include updated error handling in the Coordinator and comprehensive test coverage across all affected components. Feedback from the review suggests further centralizing the validation logic and error messages to reduce duplication across the different layers and improve maintainability.
| func (s FeedState) IsResumable() bool { | ||
| switch s { | ||
| case StateFailed, StateStopped, StateFinished: | ||
| return true | ||
| default: | ||
| return false | ||
| } | ||
| } |
There was a problem hiding this comment.
The error message associated with this state check is duplicated in three different places (api/v2/changefeed.go, coordinator/controller.go, and cmd/cdc/cli/cli_changefeed_resume.go).
To improve maintainability and ensure consistency if the list of resumable states ever changes, consider defining a shared constant or a helper method on FeedState that returns the allowed states as a string, or even a method that returns the formatted error message.
| func validateResumeChangefeedState(state config.FeedState) error { | ||
| if state.IsResumable() { | ||
| return nil | ||
| } | ||
| return errors.ErrChangefeedUpdateRefused.GenWithStackByArgs( | ||
| fmt.Sprintf("can only resume changefeed when it is stopped, failed, or finished, but current state is %s", state), | ||
| ) | ||
| } |
There was a problem hiding this comment.
This helper function is only used once in ResumeChangefeed. While it's not a bug, for consistency with other handlers in this file (like UpdateChangefeed at line 930), you might consider inlining this check or refactoring it into a shared location in pkg/config since the same logic and error message are repeated in the CLI and Coordinator layers.
There was a problem hiding this comment.
🧹 Nitpick comments (5)
coordinator/controller.go (1)
799-806: LGTM — refusal is now explicit.Returning
ErrChangefeedUpdateRefusedinstead of the previous silent no-op correctly surfaces the rejection to HTTP/CLI callers, and aligns with the new CLI/API early guards. The early return beforebackend.ResumeChangefeedandmoveChangefeedToSchedulingQueuealso avoids the temporary resuming GC guard problem described in the issue.Nit:
zap.Any("state", state)can bezap.String("state", string(state))sinceFeedStateis a string alias — cheaper and a more precise field type.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@coordinator/controller.go` around lines 799 - 806, Replace the zap.Any usage for the changefeed state in the warning log with a string field for efficiency and precision: where the code checks state.IsResumable() and returns errors.ErrChangefeedUpdateRefused.GenWithStackByArgs, update the log.Warn call to use zap.String("state", string(state)) instead of zap.Any("state", state) so the FeedState (string alias) is logged as a concrete string field.api/v2/changefeed.go (1)
757-767: Consider hoisting the state check above the keyspace state check.The resumable-state guard is placed after
keyspaceMeta.State != KeyspaceState_ENABLED, but it only needscfInfo.Statewhich is already available from line 743. Moving thevalidateResumeChangefeedStatecall to immediately after theco.GetChangefeedblock (before the keyspaceMeta check, or at least before any further network work) would make the early-exit intent clearer and consistent with the CLI guard placement. Functionally equivalent today, but a cleaner ordering.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@api/v2/changefeed.go` around lines 757 - 767, Move the resumable-state guard so it runs immediately after obtaining cfInfo (i.e., right after the co.GetChangefeed result) by calling validateResumeChangefeedState(cfInfo.State) before checking keyspaceMeta.State; specifically, hoist the validateResumeChangefeedState call up so it executes prior to the keyspaceMeta/state check and returns the same error via c.Error when invalid, keeping the existing behavior of c.IndentedJSON and c.Abort for the keyspaceMeta check intact.pkg/config/changefeed_test.go (1)
41-44: Minor: include the state in the assertion message for easier failure triage.
require.Equalon each loop iteration will report a mismatch but not which state failed. Addingtt.stateas a msgAndArgs makes the failure actionable at a glance.Suggested tweak
- for _, tt := range tests { - require.Equal(t, tt.resumable, tt.state.IsResumable()) - } + for _, tt := range tests { + require.Equal(t, tt.resumable, tt.state.IsResumable(), "state=%s", tt.state) + }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/config/changefeed_test.go` around lines 41 - 44, Update the loop assertion so failures include the state under test: when iterating over tests, pass tt.state (or a description like tt.state.String() if available) as the msgAndArgs to require.Equal so the assertion reads require.Equal(t, tt.resumable, tt.state.IsResumable(), tt.state) — this ensures any mismatch prints which tt.state caused the failure; reference the loop variables tests and tt and the IsResumable() method to locate where to add the msg.pkg/config/changefeed.go (1)
113-121: Consider exposing the user-facing error message alongsideIsResumable.The predicate centralizes the rule well, but the human-readable message "can only resume changefeed when it is stopped, failed, or finished, but current state is %s" is now duplicated verbatim at three call sites (
api/v2/changefeed.go:1071,cmd/cdc/cli/cli_changefeed_resume.go:167,coordinator/controller.go:801). If the set of resumable states ever changes, the three messages will drift. Optional: expose a helper onFeedState(e.g.,ResumableErrorMessage(s FeedState) string) or a package-level format string so the message tracks the predicate.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/config/changefeed.go` around lines 113 - 121, The IsResumable predicate is fine but the user-facing error string is duplicated at call sites; add a single source of truth (either a method on FeedState like ResumableErrorMessage() or a package-level format string constant such as ResumableErrorFmt) and update callers that currently check IsResumable() to use that helper when constructing the error ("can only resume changefeed when it is stopped, failed, or finished, but current state is %s"), ensuring FeedState and IsResumable remain the canonical state logic while the new ResumableErrorMessage/ResumableErrorFmt produces the formatted human message so changes to resumable states automatically keep the message in sync.api/v2/changefeed_test.go (1)
80-163: Consider extracting the fake server/coordinator to a shared test helper.
resumeNormalServerandresumeNormalCoordinatoreach implement ~10 interface methods just to exercise one handler. If additional v2 handler tests need similar fakes (likely, given the PR adds early-guard patterns), extracting these into achangefeed_test_helpers.go(or using a mock generated viagomock/mockery) would reduce boilerplate and keep the interface-satisfaction code out of the test body. Not blocking.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@api/v2/changefeed_test.go` around lines 80 - 163, Extract the test fakes resumeNormalServer and resumeNormalCoordinator into a shared test helper (e.g., changefeed_test_helpers.go) so other v2 handler tests can reuse them; move the type definitions and all their methods (Run, Close, SelfInfo, Liveness, GetCoordinator, IsCoordinator, GetCoordinatorInfo, GetPdClient, GetEtcdClient, GetMaintainerManager on resumeNormalServer and Stop, Run, ListChangefeeds, GetChangefeed, CreateChangefeed, RemoveChangefeed, PauseChangefeed, ResumeChangefeed, UpdateChangefeed, RequestResolvedTsFromLogCoordinator, Initialized on resumeNormalCoordinator) into that file (or replace with generated mocks via gomock/mockery) and update tests to import/use resumeNormalServer and resumeNormalCoordinator from the helper to remove boilerplate in changefeed_test.go.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@api/v2/changefeed_test.go`:
- Around line 80-163: Extract the test fakes resumeNormalServer and
resumeNormalCoordinator into a shared test helper (e.g.,
changefeed_test_helpers.go) so other v2 handler tests can reuse them; move the
type definitions and all their methods (Run, Close, SelfInfo, Liveness,
GetCoordinator, IsCoordinator, GetCoordinatorInfo, GetPdClient, GetEtcdClient,
GetMaintainerManager on resumeNormalServer and Stop, Run, ListChangefeeds,
GetChangefeed, CreateChangefeed, RemoveChangefeed, PauseChangefeed,
ResumeChangefeed, UpdateChangefeed, RequestResolvedTsFromLogCoordinator,
Initialized on resumeNormalCoordinator) into that file (or replace with
generated mocks via gomock/mockery) and update tests to import/use
resumeNormalServer and resumeNormalCoordinator from the helper to remove
boilerplate in changefeed_test.go.
In `@api/v2/changefeed.go`:
- Around line 757-767: Move the resumable-state guard so it runs immediately
after obtaining cfInfo (i.e., right after the co.GetChangefeed result) by
calling validateResumeChangefeedState(cfInfo.State) before checking
keyspaceMeta.State; specifically, hoist the validateResumeChangefeedState call
up so it executes prior to the keyspaceMeta/state check and returns the same
error via c.Error when invalid, keeping the existing behavior of c.IndentedJSON
and c.Abort for the keyspaceMeta check intact.
In `@coordinator/controller.go`:
- Around line 799-806: Replace the zap.Any usage for the changefeed state in the
warning log with a string field for efficiency and precision: where the code
checks state.IsResumable() and returns
errors.ErrChangefeedUpdateRefused.GenWithStackByArgs, update the log.Warn call
to use zap.String("state", string(state)) instead of zap.Any("state", state) so
the FeedState (string alias) is logged as a concrete string field.
In `@pkg/config/changefeed_test.go`:
- Around line 41-44: Update the loop assertion so failures include the state
under test: when iterating over tests, pass tt.state (or a description like
tt.state.String() if available) as the msgAndArgs to require.Equal so the
assertion reads require.Equal(t, tt.resumable, tt.state.IsResumable(), tt.state)
— this ensures any mismatch prints which tt.state caused the failure; reference
the loop variables tests and tt and the IsResumable() method to locate where to
add the msg.
In `@pkg/config/changefeed.go`:
- Around line 113-121: The IsResumable predicate is fine but the user-facing
error string is duplicated at call sites; add a single source of truth (either a
method on FeedState like ResumableErrorMessage() or a package-level format
string constant such as ResumableErrorFmt) and update callers that currently
check IsResumable() to use that helper when constructing the error ("can only
resume changefeed when it is stopped, failed, or finished, but current state is
%s"), ensuring FeedState and IsResumable remain the canonical state logic while
the new ResumableErrorMessage/ResumableErrorFmt produces the formatted human
message so changes to resumable states automatically keep the message in sync.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: b3654cb8-198a-4f2c-922c-67b15d7c4d9a
📒 Files selected for processing (8)
api/v2/changefeed.goapi/v2/changefeed_test.gocmd/cdc/cli/cli_changefeed_resume.gocmd/cdc/cli/cli_changefeed_resume_test.gocoordinator/controller.gocoordinator/controller_test.gopkg/config/changefeed.gopkg/config/changefeed_test.go
What problem does this PR solve?
Issue Number: close #4893
What is changed and how it works?
This PR rejects
changefeed resumerequests when the changefeed is already in a running state.The change adds a shared
FeedState.IsResumablepredicate and uses it in:cdc cli changefeed resumefails before printing a misleading success message or sending the resume request.ErrChangefeedUpdateRefused.This prevents running changefeeds from reporting resume success and avoids creating unnecessary resuming GC guards for invalid resume requests.
Check List
Tests
go test ./api/v2 ./coordinator ./pkg/config ./cmd/cdc/cli -count=1Questions
Will it cause performance regression or break compatibility?
No performance regression. Invalid resume requests for running changefeeds now return an explicit error instead of success/no-op.
Do you need to update user documentation, design documentation or monitoring documentation?
No.
Release note
Summary by CodeRabbit
Bug Fixes
Tests