Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,5 @@ dist/
.tidbcloud-cli.toml
.CHANGELOG.md
node_modules/
.gomodcache/
.gocache/
43 changes: 23 additions & 20 deletions internal/cli/serverless/migration/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,26 +86,28 @@ func CreateCmd(h *internal.Helper) *cobra.Command {
}
definitionStr := string(definitionBytes)

sources, target, mode, err := parseMigrationDefinition(definitionStr)
sources, target, mode, binlogFilterRule, err := parseMigrationDefinition(definitionStr)
if err != nil {
return err
}

if dryRun {
precheckBody := &pkgmigration.MigrationServicePrecheckBody{
DisplayName: name,
Sources: sources,
Target: target,
Mode: mode,
DisplayName: name,
Sources: sources,
Target: target,
Mode: mode,
BinlogFilterRule: binlogFilterRule,
}
return runMigrationPrecheck(ctx, d, clusterID, precheckBody, h)
}

createBody := &pkgmigration.MigrationServiceCreateMigrationBody{
DisplayName: name,
Sources: sources,
Target: target,
Mode: mode,
DisplayName: name,
Sources: sources,
Target: target,
Mode: mode,
BinlogFilterRule: binlogFilterRule,
}

resp, err := d.CreateMigration(ctx, clusterID, createBody)
Expand Down Expand Up @@ -247,34 +249,35 @@ func shouldPrintPrecheckItem(status *pkgmigration.PrecheckItemStatus) bool {
}
}

func parseMigrationDefinition(value string) ([]pkgmigration.Source, pkgmigration.Target, pkgmigration.TaskMode, error) {
func parseMigrationDefinition(value string) ([]pkgmigration.Source, pkgmigration.Target, pkgmigration.TaskMode, *pkgmigration.BinlogFilterRule, error) {
trimmed := strings.TrimSpace(value)
if trimmed == "" {
return nil, pkgmigration.Target{}, "", errors.New("migration config is required; use --config-file")
return nil, pkgmigration.Target{}, "", nil, errors.New("migration config is required; use --config-file")
}
var payload struct {
Sources []pkgmigration.Source `json:"sources"`
Target *pkgmigration.Target `json:"target"`
Mode string `json:"mode"`
Sources []pkgmigration.Source `json:"sources"`
Target *pkgmigration.Target `json:"target"`
Mode string `json:"mode"`
BinlogFilterRule *pkgmigration.BinlogFilterRule `json:"binlogFilterRule"`
}
stdJson, err := standardizeJSON([]byte(trimmed))
if err != nil {
return nil, pkgmigration.Target{}, "", errors.Annotate(err, "invalid migration definition JSON")
return nil, pkgmigration.Target{}, "", nil, errors.Annotate(err, "invalid migration definition JSON")
}
if err := json.Unmarshal(stdJson, &payload); err != nil {
return nil, pkgmigration.Target{}, "", errors.Annotate(err, "invalid migration definition JSON")
return nil, pkgmigration.Target{}, "", nil, errors.Annotate(err, "invalid migration definition JSON")
}
if len(payload.Sources) == 0 {
return nil, pkgmigration.Target{}, "", errors.New("migration definition must include at least one source")
return nil, pkgmigration.Target{}, "", nil, errors.New("migration definition must include at least one source")
}
if payload.Target == nil {
return nil, pkgmigration.Target{}, "", errors.New("migration definition must include the target block")
return nil, pkgmigration.Target{}, "", nil, errors.New("migration definition must include the target block")
}
mode, err := parseMigrationMode(payload.Mode)
if err != nil {
return nil, pkgmigration.Target{}, "", err
return nil, pkgmigration.Target{}, "", nil, err
}
return payload.Sources, *payload.Target, mode, nil
return payload.Sources, *payload.Target, mode, payload.BinlogFilterRule, nil
}

func parseMigrationMode(value string) (pkgmigration.TaskMode, error) {
Expand Down
17 changes: 17 additions & 0 deletions internal/cli/serverless/migration/create_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,10 @@ func (suite *CreateMigrationSuite) writeTempConfig(content string) string {
func validMigrationConfig() string {
return `{
"mode": "ALL",
"binlogFilterRule": {
"ignoreEvent": ["truncate table", "drop database"],
"ignoreSql": ["^DROP\\s+TABLE.*", "^TRUNCATE\\s+TABLE.*"]
},
"target": {
"user": "migration_user",
"password": "Passw0rd!"
Expand All @@ -171,6 +175,19 @@ func validMigrationConfig() string {
}`
}

func TestParseMigrationDefinition_BinlogFilterRule(t *testing.T) {
assert := require.New(t)

sources, target, mode, binlogFilterRule, err := parseMigrationDefinition(validMigrationConfig())
assert.NoError(err)
assert.Equal(pkgmigration.TASKMODE_ALL, mode)
assert.NotNil(binlogFilterRule)
assert.Equal([]string{"truncate table", "drop database"}, binlogFilterRule.IgnoreEvent)
assert.Equal([]string{`^DROP\s+TABLE.*`, `^TRUNCATE\s+TABLE.*`}, binlogFilterRule.IgnoreSql)
assert.NotEmpty(sources)
assert.Equal("migration_user", target.User)
}

func TestCreateMigrationSuite(t *testing.T) {
suite.Run(t, new(CreateMigrationSuite))
}
14 changes: 14 additions & 0 deletions internal/cli/serverless/migration/template.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@ const (
"user": "migration_user",
"password": "Passw0rd!"
},
// Optional global binlog filter rules applied during incremental replication.
"binlogFilterRule": {
// Event types to ignore, see https://docs.pingcap.com/tidb/stable/dm-binlog-event-filter/#parameter-descriptions .
"ignoreEvent": ["truncate table", "drop database"],
// SQL patterns to ignore.
"ignoreSql": ["^DROP\\s+TABLE.*", "^TRUNCATE\\s+TABLE.*"]
},
// List at least one migration source
"sources": [
{
Expand Down Expand Up @@ -111,6 +118,13 @@ const (
"user": "migration_user",
"password": "Passw0rd!"
},
// Optional global binlog filter rules applied during incremental replication.
"binlogFilterRule": {
// Event types to ignore, see https://docs.pingcap.com/tidb/stable/dm-binlog-event-filter/#parameter-descriptions .
"ignoreEvent": ["truncate table", "drop database"],
// SQL patterns to ignore.
"ignoreSql": ["^DROP\\s+TABLE.*", "^TRUNCATE\\s+TABLE.*"]
},
"sources": [
{
// Required: source database type. Supported values: MYSQL, ALICLOUD_RDS_MYSQL, AWS_RDS_MYSQL
Expand Down
136 changes: 121 additions & 15 deletions pkg/tidbcloud/v1beta1/serverless/dm.swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,26 @@
},
"additionalProperties": {}
},
"BinlogFilterRule": {
"type": "object",
"properties": {
"ignoreEvent": {
"type": "array",
"items": {
"type": "string"
},
"description": "Event types to ignore (e.g., \"truncate table\", \"drop database\")."
},
"ignoreSql": {
"type": "array",
"items": {
"type": "string"
},
"description": "SQL patterns to ignore."
}
},
"description": "Binlog filter rules applied during incremental replication."
},
"ConnProfile": {
"type": "object",
"properties": {
Expand Down Expand Up @@ -518,11 +538,19 @@
]
}
},
"required": ["connType", "port", "user", "password"]
"required": [
"connType",
"port",
"user",
"password"
]
},
"ConnType": {
"type": "string",
"enum": ["PUBLIC", "PRIVATE_LINK"],
"enum": [
"PUBLIC",
"PRIVATE_LINK"
],
"description": "The connection type used to connect to the source database.\n\n - PUBLIC: Connect over the public internet.\n - PRIVATE_LINK: Connect via Private Link/Private Endpoint."
},
"CreateMigrationPrecheckResp": {
Expand Down Expand Up @@ -680,12 +708,27 @@
"$ref": "#/definitions/Migration.State"
}
]
},
"binlogFilterRule": {
"description": "Global binlog filter rule applied to the migration.",
"readOnly": true,
"allOf": [
{
"$ref": "#/definitions/BinlogFilterRule"
}
]
}
}
},
"Migration.State": {
"type": "string",
"enum": ["CREATING", "RUNNING", "PAUSED", "FAILED", "DELETING"],
"enum": [
"CREATING",
"RUNNING",
"PAUSED",
"FAILED",
"DELETING"
],
"description": "Overall state of a migration.\n\n - CREATING: Task is being created.\n - RUNNING: Task is actively running.\n - PAUSED: Task is paused.\n - FAILED: Task failed with error.\n - DELETING: Task is being deleted."
},
"MigrationPrecheck": {
Expand Down Expand Up @@ -742,7 +785,13 @@
},
"MigrationPrecheck.Status": {
"type": "string",
"enum": ["RUNNING", "FINISHED", "PENDING", "FAILED", "CANCELED"],
"enum": [
"RUNNING",
"FINISHED",
"PENDING",
"FAILED",
"CANCELED"
],
"description": " - RUNNING: Precheck is in progress.\n - FINISHED: Precheck finished successfully.\n - PENDING: Precheck is pending.\n - FAILED: Precheck failed.\n - CANCELED: Precheck is canceled."
},
"MigrationRule": {
Expand Down Expand Up @@ -778,7 +827,9 @@
"description": "Table pattern of the source, supports wildcards."
}
},
"required": ["schemaPattern"]
"required": [
"schemaPattern"
]
},
"MigrationRule.Table": {
"type": "object",
Expand All @@ -792,7 +843,9 @@
"description": "Table name. Wildcards are not supported. Set empty to use the source table name."
}
},
"required": ["schema"]
"required": [
"schema"
]
},
"MigrationService.CreateMigrationBody": {
"type": "object",
Expand Down Expand Up @@ -824,9 +877,22 @@
"$ref": "#/definitions/TaskMode"
}
]
},
"binlogFilterRule": {
"description": "Global binlog filter rule applied to all migrated tables.",
"allOf": [
{
"$ref": "#/definitions/BinlogFilterRule"
}
]
}
},
"required": ["displayName", "sources", "target", "mode"]
"required": [
"displayName",
"sources",
"target",
"mode"
]
},
"MigrationService.PauseMigrationBody": {
"type": "object",
Expand Down Expand Up @@ -862,9 +928,22 @@
"$ref": "#/definitions/TaskMode"
}
]
},
"binlogFilterRule": {
"description": "Global binlog filter rule applied to all migrated tables.",
"allOf": [
{
"$ref": "#/definitions/BinlogFilterRule"
}
]
}
},
"required": ["displayName", "sources", "target", "mode"]
"required": [
"displayName",
"sources",
"target",
"mode"
]
},
"MigrationService.ResumeMigrationBody": {
"type": "object",
Expand Down Expand Up @@ -915,7 +994,11 @@
},
"PrecheckItem.Status": {
"type": "string",
"enum": ["SUCCESS", "WARNING", "FAILED"],
"enum": [
"SUCCESS",
"WARNING",
"FAILED"
],
"description": " - SUCCESS: Check passed successfully.\n - WARNING: Check resulted in a warning.\n - FAILED: Check failed."
},
"PrecheckItemType": {
Expand Down Expand Up @@ -1009,11 +1092,18 @@
]
}
},
"required": ["connProfile", "sourceType"]
"required": [
"connProfile",
"sourceType"
]
},
"Source.SourceType": {
"type": "string",
"enum": ["MYSQL", "ALICLOUD_RDS_MYSQL", "AWS_RDS_MYSQL"],
"enum": [
"MYSQL",
"ALICLOUD_RDS_MYSQL",
"AWS_RDS_MYSQL"
],
"description": "The source database type.\n\n - MYSQL: Self-managed MySQL.\n - ALICLOUD_RDS_MYSQL: Alibaba Cloud RDS for MySQL.\n - AWS_RDS_MYSQL: Amazon RDS for MySQL."
},
"Status": {
Expand Down Expand Up @@ -1102,12 +1192,22 @@
},
"SubTask.Stage": {
"type": "string",
"enum": ["RUNNING", "PAUSED", "FAILED", "FINISHED", "UNKNOWN"],
"enum": [
"RUNNING",
"PAUSED",
"FAILED",
"FINISHED",
"UNKNOWN"
],
"description": "The high-level lifecycle stage of a subtask.\n\n - RUNNING: Subtask is running.\n - PAUSED: Subtask is paused.\n - FAILED: Subtask failed.\n - FINISHED: Subtask finished successfully.\n - UNKNOWN: Subtask stage is unknown."
},
"SubTask.Step": {
"type": "string",
"enum": ["DUMP", "LOAD", "SYNC"],
"enum": [
"DUMP",
"LOAD",
"SYNC"
],
"description": "The current step within a subtask.\n\n - DUMP: Dump/export data from source.\n - LOAD: Load/import data into target.\n - SYNC: Sync/replicate binlog changes."
},
"SyncDetail": {
Expand Down Expand Up @@ -1144,11 +1244,17 @@
"description": "Target database password."
}
},
"required": ["user", "password"]
"required": [
"user",
"password"
]
},
"TaskMode": {
"type": "string",
"enum": ["ALL", "INCREMENTAL"],
"enum": [
"ALL",
"INCREMENTAL"
],
"description": "Migration task mode.\n\n - ALL: Full + incremental migration (all phases).\n - INCREMENTAL: Incremental-only migration (replication)."
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ client.go
configuration.go
git_push.sh
model_any.go
model_binlog_filter_rule.go
model_conn_profile.go
model_conn_type.go
model_create_migration_precheck_resp.go
Expand Down
Loading
Loading