Skip to content

Commit 19aea6a

Browse files
authored
Add shardMode support to serverless migration CLI (#352)
1 parent 0e003a8 commit 19aea6a

10 files changed

Lines changed: 278 additions & 14 deletions

File tree

internal/cli/serverless/migration/create.go

Lines changed: 35 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ func CreateCmd(h *internal.Helper) *cobra.Command {
8686
}
8787
definitionStr := string(definitionBytes)
8888

89-
sources, target, mode, err := parseMigrationDefinition(definitionStr)
89+
sources, target, mode, shardMode, err := parseMigrationDefinition(definitionStr)
9090
if err != nil {
9191
return err
9292
}
@@ -97,6 +97,7 @@ func CreateCmd(h *internal.Helper) *cobra.Command {
9797
Sources: sources,
9898
Target: target,
9999
Mode: mode,
100+
ShardMode: shardMode,
100101
}
101102
return runMigrationPrecheck(ctx, d, clusterID, precheckBody, h)
102103
}
@@ -106,6 +107,7 @@ func CreateCmd(h *internal.Helper) *cobra.Command {
106107
Sources: sources,
107108
Target: target,
108109
Mode: mode,
110+
ShardMode: shardMode,
109111
}
110112

111113
resp, err := d.CreateMigration(ctx, clusterID, createBody)
@@ -247,34 +249,39 @@ func shouldPrintPrecheckItem(status *pkgmigration.PrecheckItemStatus) bool {
247249
}
248250
}
249251

250-
func parseMigrationDefinition(value string) ([]pkgmigration.Source, pkgmigration.Target, pkgmigration.TaskMode, error) {
252+
func parseMigrationDefinition(value string) ([]pkgmigration.Source, pkgmigration.Target, pkgmigration.TaskMode, *pkgmigration.TaskShardMode, error) {
251253
trimmed := strings.TrimSpace(value)
252254
if trimmed == "" {
253-
return nil, pkgmigration.Target{}, "", errors.New("migration config is required; use --config-file")
255+
return nil, pkgmigration.Target{}, "", nil, errors.New("migration config is required; use --config-file")
254256
}
255257
var payload struct {
256-
Sources []pkgmigration.Source `json:"sources"`
257-
Target *pkgmigration.Target `json:"target"`
258-
Mode string `json:"mode"`
258+
Sources []pkgmigration.Source `json:"sources"`
259+
Target *pkgmigration.Target `json:"target"`
260+
Mode string `json:"mode"`
261+
ShardMode *string `json:"shardMode,omitempty"`
259262
}
260263
stdJson, err := standardizeJSON([]byte(trimmed))
261264
if err != nil {
262-
return nil, pkgmigration.Target{}, "", errors.Annotate(err, "invalid migration definition JSON")
265+
return nil, pkgmigration.Target{}, "", nil, errors.Annotate(err, "invalid migration definition JSON")
263266
}
264267
if err := json.Unmarshal(stdJson, &payload); err != nil {
265-
return nil, pkgmigration.Target{}, "", errors.Annotate(err, "invalid migration definition JSON")
268+
return nil, pkgmigration.Target{}, "", nil, errors.Annotate(err, "invalid migration definition JSON")
266269
}
267270
if len(payload.Sources) == 0 {
268-
return nil, pkgmigration.Target{}, "", errors.New("migration definition must include at least one source")
271+
return nil, pkgmigration.Target{}, "", nil, errors.New("migration definition must include at least one source")
269272
}
270273
if payload.Target == nil {
271-
return nil, pkgmigration.Target{}, "", errors.New("migration definition must include the target block")
274+
return nil, pkgmigration.Target{}, "", nil, errors.New("migration definition must include the target block")
272275
}
273276
mode, err := parseMigrationMode(payload.Mode)
274277
if err != nil {
275-
return nil, pkgmigration.Target{}, "", err
278+
return nil, pkgmigration.Target{}, "", nil, err
276279
}
277-
return payload.Sources, *payload.Target, mode, nil
280+
shardMode, err := parseMigrationShardMode(payload.ShardMode)
281+
if err != nil {
282+
return nil, pkgmigration.Target{}, "", nil, err
283+
}
284+
return payload.Sources, *payload.Target, mode, shardMode, nil
278285
}
279286

280287
func parseMigrationMode(value string) (pkgmigration.TaskMode, error) {
@@ -290,6 +297,22 @@ func parseMigrationMode(value string) (pkgmigration.TaskMode, error) {
290297
return "", errors.Errorf("invalid mode %q, allowed values: %s", value, pkgmigration.AllowedTaskModeEnumValues)
291298
}
292299

300+
func parseMigrationShardMode(value *string) (*pkgmigration.TaskShardMode, error) {
301+
if value == nil {
302+
return nil, nil
303+
}
304+
trimmed := strings.TrimSpace(*value)
305+
if trimmed == "" {
306+
return nil, errors.Errorf("invalid shardMode %q, allowed values: %s", *value, pkgmigration.AllowedTaskShardModeEnumValues)
307+
}
308+
normalized := strings.ToUpper(trimmed)
309+
mode := pkgmigration.TaskShardMode(normalized)
310+
if slices.Contains(pkgmigration.AllowedTaskShardModeEnumValues, mode) {
311+
return &mode, nil
312+
}
313+
return nil, errors.Errorf("invalid shardMode %q, allowed values: %s", *value, pkgmigration.AllowedTaskShardModeEnumValues)
314+
}
315+
293316
// standardizeJSON accepts JSON With Commas and Comments(JWCC) see
294317
// https://nigeltao.github.io/blog/2021/json-with-commas-comments.html) and
295318
// returns a standard JSON byte slice ready for json.Unmarshal.

internal/cli/serverless/migration/create_test.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,8 @@ func (suite *CreateMigrationSuite) TestCreateMigration() {
7272
return body != nil &&
7373
body.DisplayName == displayName &&
7474
body.Mode == pkgmigration.TASKMODE_ALL &&
75+
body.ShardMode != nil &&
76+
*body.ShardMode == pkgmigration.TASKSHARDMODE_PESSIMISTIC &&
7577
len(body.Sources) == 1 &&
7678
body.Target.User == "migration_user"
7779
}),
@@ -98,6 +100,7 @@ func (suite *CreateMigrationSuite) TestCreateMigrationInvalidInputs() {
98100
blankPath := suite.writeTempConfig(" ")
99101
invalidJSONPath := suite.writeTempConfig("{invalid")
100102
invalidModePath := suite.writeTempConfig(`{ "mode": "invalid", "target": {"user":"u","password":"p"}, "sources": [{"sourceType":"MYSQL","connProfile":{"connType":"PUBLIC","host":"h","port":3306,"user":"u","password":"p"}}] }`)
103+
invalidShardModePath := suite.writeTempConfig(`{ "mode": "ALL", "shardMode": "invalid", "target": {"user":"u","password":"p"}, "sources": [{"sourceType":"MYSQL","connProfile":{"connType":"PUBLIC","host":"h","port":3306,"user":"u","password":"p"}}] }`)
101104

102105
tests := []struct {
103106
name string
@@ -124,6 +127,11 @@ func (suite *CreateMigrationSuite) TestCreateMigrationInvalidInputs() {
124127
args: []string{"--cluster-id", "c1", "--display-name", "name", "--config-file", invalidModePath},
125128
errContains: "invalid mode",
126129
},
130+
{
131+
name: "invalid shard mode",
132+
args: []string{"--cluster-id", "c1", "--display-name", "name", "--config-file", invalidShardModePath},
133+
errContains: "invalid shardMode",
134+
},
127135
}
128136

129137
for _, tt := range tests {
@@ -152,6 +160,7 @@ func (suite *CreateMigrationSuite) writeTempConfig(content string) string {
152160
func validMigrationConfig() string {
153161
return `{
154162
"mode": "ALL",
163+
"shardMode": "PESSIMISTIC",
155164
"target": {
156165
"user": "migration_user",
157166
"password": "Passw0rd!"

internal/cli/serverless/migration/template.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,10 @@ const (
3131
migrationDefinitionAllTemplate = `{
3232
// Required migration mode. Use "ALL" for full + incremental.
3333
"mode": "ALL",
34+
// Optional shard DDL coordination mode. Only set this for shard merge
35+
// (sharded database/table) scenarios. See:
36+
// https://docs.pingcap.com/tidb/stable/dm-shard-merge/
37+
// "shardMode": "PESSIMISTIC",
3438
// Target TiDB Cloud user credentials used by the migration
3539
"target": {
3640
"user": "migration_user",
@@ -106,6 +110,10 @@ const (
106110
migrationDefinitionIncrementalTemplate = `{
107111
// Incremental-only mode keeps the source and target in sync
108112
"mode": "INCREMENTAL",
113+
// Optional shard DDL coordination mode. Only set this for shard merge
114+
// (sharded database/table) scenarios. See:
115+
// https://docs.pingcap.com/tidb/stable/dm-shard-merge/
116+
// "shardMode": "PESSIMISTIC",
109117
// Target TiDB Cloud user credentials used by the migration
110118
"target": {
111119
"user": "migration_user",

pkg/tidbcloud/v1beta1/serverless/dm.swagger.json

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -824,6 +824,14 @@
824824
"$ref": "#/definitions/TaskMode"
825825
}
826826
]
827+
},
828+
"shardMode": {
829+
"description": "Optional shard mode for shard DDL coordination.",
830+
"allOf": [
831+
{
832+
"$ref": "#/definitions/TaskShardMode"
833+
}
834+
]
827835
}
828836
},
829837
"required": ["displayName", "sources", "target", "mode"]
@@ -862,6 +870,14 @@
862870
"$ref": "#/definitions/TaskMode"
863871
}
864872
]
873+
},
874+
"shardMode": {
875+
"description": "Optional shard mode for shard DDL coordination.",
876+
"allOf": [
877+
{
878+
"$ref": "#/definitions/TaskShardMode"
879+
}
880+
]
865881
}
866882
},
867883
"required": ["displayName", "sources", "target", "mode"]
@@ -1150,6 +1166,11 @@
11501166
"type": "string",
11511167
"enum": ["ALL", "INCREMENTAL"],
11521168
"description": "Migration task mode.\n\n - ALL: Full + incremental migration (all phases).\n - INCREMENTAL: Incremental-only migration (replication)."
1169+
},
1170+
"TaskShardMode": {
1171+
"type": "string",
1172+
"enum": ["PESSIMISTIC", "OPTIMISTIC"],
1173+
"description": "Migration shard mode.\n\n - PESSIMISTIC: Use pessimistic shard DDL coordination.\n - OPTIMISTIC: Use optimistic shard DDL coordination."
11531174
}
11541175
}
11551176
}

pkg/tidbcloud/v1beta1/serverless/migration/.openapi-generator/FILES

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,5 +36,6 @@ model_sub_task_step.go
3636
model_sync_detail.go
3737
model_target.go
3838
model_task_mode.go
39+
model_task_shard_mode.go
3940
response.go
4041
utils.go

pkg/tidbcloud/v1beta1/serverless/migration/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@ Class | Method | HTTP request | Description
120120
- [SyncDetail](docs/SyncDetail.md)
121121
- [Target](docs/Target.md)
122122
- [TaskMode](docs/TaskMode.md)
123+
- [TaskShardMode](docs/TaskShardMode.md)
123124

124125

125126
## Documentation For Authorization

pkg/tidbcloud/v1beta1/serverless/migration/api/openapi.yaml

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -778,6 +778,11 @@ components:
778778
- $ref: '#/components/schemas/TaskMode'
779779
description: The migration mode (full+incremental or incremental-only).
780780
type: object
781+
shardMode:
782+
allOf:
783+
- $ref: '#/components/schemas/TaskShardMode'
784+
description: Optional shard mode for shard DDL coordination.
785+
type: object
781786
required:
782787
- displayName
783788
- mode
@@ -807,6 +812,11 @@ components:
807812
- $ref: '#/components/schemas/TaskMode'
808813
description: The migration mode (full+incremental or incremental-only).
809814
type: object
815+
shardMode:
816+
allOf:
817+
- $ref: '#/components/schemas/TaskShardMode'
818+
description: Optional shard mode for shard DDL coordination.
819+
type: object
810820
required:
811821
- displayName
812822
- mode
@@ -1099,4 +1109,14 @@ components:
10991109
- ALL
11001110
- INCREMENTAL
11011111
type: string
1112+
TaskShardMode:
1113+
description: |-
1114+
Migration shard mode.
1115+
1116+
- PESSIMISTIC: Use pessimistic shard DDL coordination.
1117+
- OPTIMISTIC: Use optimistic shard DDL coordination.
1118+
enum:
1119+
- PESSIMISTIC
1120+
- OPTIMISTIC
1121+
type: string
11021122
x-original-swagger-version: "2.0"

pkg/tidbcloud/v1beta1/serverless/migration/model_migration_service_create_migration_body.go

Lines changed: 39 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/tidbcloud/v1beta1/serverless/migration/model_migration_service_precheck_body.go

Lines changed: 39 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)