diff --git a/pkg/microservice/aslan/config/consts.go b/pkg/microservice/aslan/config/consts.go index bae6ed987e..29ffc8b71c 100644 --- a/pkg/microservice/aslan/config/consts.go +++ b/pkg/microservice/aslan/config/consts.go @@ -276,6 +276,13 @@ const ( DBInstanceTypeMariaDB DBInstanceType = "mariadb" ) +type DMSJobExecuteMode string + +const ( + DMSJobExecuteModeParallel DMSJobExecuteMode = "parallel" + DMSJobExecuteModeSerial DMSJobExecuteMode = "serial" +) + type ObservabilityType string const ( @@ -761,7 +768,7 @@ type ValueMergeStrategy string const ( ValueMergeStrategyReuseValue ValueMergeStrategy = "reuse-values" - ValueMergeStrategyOverride ValueMergeStrategy= "override" + ValueMergeStrategyOverride ValueMergeStrategy = "override" ) type YAMLMergeStrategy string diff --git a/pkg/microservice/aslan/core/common/repository/models/wokflow_task_v4.go b/pkg/microservice/aslan/core/common/repository/models/wokflow_task_v4.go index af2aafb7b3..6df9c5bfa8 100644 --- a/pkg/microservice/aslan/core/common/repository/models/wokflow_task_v4.go +++ b/pkg/microservice/aslan/core/common/repository/models/wokflow_task_v4.go @@ -234,7 +234,7 @@ type JobTaskDeploySpec struct { ReplaceResources []Resource `bson:"replace_resources" json:"replace_resources" yaml:"replace_resources"` RelatedPodLabels []map[string]string `bson:"-" json:"-" yaml:"-"` // overrideResource is used to do a full yaml override instead of a 2-way merge patching for all the resources - OverrideResource bool `bson:"override_resource" json:"override_resource" yaml:"override_resource"` + OverrideResource bool `bson:"override_resource" json:"override_resource" yaml:"override_resource"` // for compatibility ServiceModule string `bson:"service_module" json:"service_module" yaml:"-"` Image string `bson:"image" json:"image" yaml:"-"` @@ -266,7 +266,7 @@ type JobTaskDeployRevertSpec struct { OverrideKVs string `bson:"override_kvs" json:"override_kvs" yaml:"override_kvs"` Revision int64 `bson:"revision" json:"revision" yaml:"revision"` RevisionCreateTime int64 `bson:"revision_create_time" json:"revision_create_time" yaml:"revision_create_time"` - OverrideResource bool `bson:"override_resource" json:"override_resource" yaml:"override_resource"` + OverrideResource bool `bson:"override_resource" json:"override_resource" yaml:"override_resource"` } type DeployServiceModule struct { @@ -614,8 +614,9 @@ type SQLExecResult struct { } type JobTaskDMSSpec struct { - ID string `bson:"id" json:"id" yaml:"id"` - Orders []*DMSTaskOrder `bson:"orders" json:"orders" yaml:"orders"` + ID string `bson:"id" json:"id" yaml:"id"` + ExecuteMode string `bson:"execute_mode" json:"execute_mode" yaml:"execute_mode"` + Orders []*DMSTaskOrder `bson:"orders" json:"orders" yaml:"orders"` } type DMSTaskOrder struct { diff --git a/pkg/microservice/aslan/core/common/repository/models/workflow_v4.go b/pkg/microservice/aslan/core/common/repository/models/workflow_v4.go index 427c1961b8..ea6efd4ea7 100644 --- a/pkg/microservice/aslan/core/common/repository/models/workflow_v4.go +++ b/pkg/microservice/aslan/core/common/repository/models/workflow_v4.go @@ -1011,6 +1011,7 @@ type SQLJobSpec struct { type DMSJobSpec struct { ID string `bson:"id" json:"id" yaml:"id"` RemarkTemplate string `bson:"remark_template" json:"remark_template" yaml:"remark_template"` + ExecuteMode string `bson:"execute_mode" json:"execute_mode" yaml:"execute_mode"` Orders []*DMSOrder `bson:"orders" json:"orders" yaml:"orders"` } diff --git a/pkg/microservice/aslan/core/common/service/workflowcontroller/jobcontroller/job_dms.go b/pkg/microservice/aslan/core/common/service/workflowcontroller/jobcontroller/job_dms.go index cc540a6a0d..8a4aa7126d 100644 --- a/pkg/microservice/aslan/core/common/service/workflowcontroller/jobcontroller/job_dms.go +++ b/pkg/microservice/aslan/core/common/service/workflowcontroller/jobcontroller/job_dms.go @@ -18,6 +18,7 @@ package jobcontroller import ( "context" + "strings" "time" dms "github.com/alibabacloud-go/dms-enterprise-20181101/v3/client" @@ -72,9 +73,18 @@ func (c *DMSJobCtl) Run(ctx context.Context) { return } + switch normalizeDMSJobExecuteMode(c.jobTaskSpec.ExecuteMode) { + case config.DMSJobExecuteModeSerial: + c.runSerial(ctx, client) + default: + c.runParallel(ctx, client) + } +} + +func (c *DMSJobCtl) runParallel(ctx context.Context, client *dms.Client) { failed := false for _, order := range c.jobTaskSpec.Orders { - err = execDMSDataCorrectOrder(ctx, client, order.ID) + err := execDMSDataCorrectOrder(ctx, client, order.ID) if err != nil { failed = true order.Error = err.Error() @@ -86,21 +96,18 @@ func (c *DMSJobCtl) Run(ctx context.Context) { for { c.ack() - select { - case <-ctx.Done(): - c.job.Status = config.StatusCancelled - logError(c.job, "job cancelled", c.logger) + if c.checkCancelled(ctx) { return - default: } allDone := true for _, order := range c.jobTaskSpec.Orders { if order.Error != "" { + failed = true continue } - if order.JobStatus == "FAIL" || order.JobStatus == "SUCCESS" || order.JobStatus == "DELETE" { + if isDMSOrderDone(order.JobStatus) { if order.JobStatus == "FAIL" { failed = true } @@ -118,6 +125,9 @@ func (c *DMSJobCtl) Run(ctx context.Context) { } order.JobStatus = tea.StringValue(taskDetail.GetJobStatus()) + if order.JobStatus == "FAIL" { + failed = true + } } if allDone { @@ -133,6 +143,74 @@ func (c *DMSJobCtl) Run(ctx context.Context) { } } +func (c *DMSJobCtl) runSerial(ctx context.Context, client *dms.Client) { + for _, order := range c.jobTaskSpec.Orders { + if c.checkCancelled(ctx) { + return + } + + err := execDMSDataCorrectOrder(ctx, client, order.ID) + if err != nil { + order.Error = err.Error() + logError(c.job, err.Error(), c.logger) + c.job.Status = config.StatusFailed + return + } + + for { + c.ack() + + if c.checkCancelled(ctx) { + return + } + + taskDetail, err := getDMSDataCorrectTaskDetail(ctx, client, order.ID) + if err != nil { + order.Error = err.Error() + logError(c.job, err.Error(), c.logger) + c.job.Status = config.StatusFailed + return + } + + order.JobStatus = tea.StringValue(taskDetail.GetJobStatus()) + if !isDMSOrderDone(order.JobStatus) { + time.Sleep(time.Second * 3) + continue + } + if order.JobStatus == "FAIL" { + c.job.Status = config.StatusFailed + return + } + break + } + } + c.job.Status = config.StatusPassed +} + +func (c *DMSJobCtl) checkCancelled(ctx context.Context) bool { + select { + case <-ctx.Done(): + c.job.Status = config.StatusCancelled + logError(c.job, "job cancelled", c.logger) + return true + default: + return false + } +} + +func isDMSOrderDone(status string) bool { + return status == "FAIL" || status == "SUCCESS" || status == "DELETE" +} + +func normalizeDMSJobExecuteMode(mode string) config.DMSJobExecuteMode { + switch strings.ToLower(mode) { + case string(config.DMSJobExecuteModeSerial): + return config.DMSJobExecuteModeSerial + default: + return config.DMSJobExecuteModeParallel + } +} + func (c *DMSJobCtl) SaveInfo(ctx context.Context) error { return mongodb.NewJobInfoColl().Create(context.TODO(), &commonmodels.JobInfo{ Type: c.job.JobType, diff --git a/pkg/microservice/aslan/core/workflow/service/workflow/controller/job/job_dms.go b/pkg/microservice/aslan/core/workflow/service/workflow/controller/job/job_dms.go index 2cc1221c23..a225ed0681 100644 --- a/pkg/microservice/aslan/core/workflow/service/workflow/controller/job/job_dms.go +++ b/pkg/microservice/aslan/core/workflow/service/workflow/controller/job/job_dms.go @@ -80,6 +80,11 @@ func (j DMSJobController) Validate(isExecution bool) error { if j.jobSpec.ID != currJobSpec.ID { return fmt.Errorf("given apollo job spec does not match current apollo job") } + if mode := strings.ToLower(j.jobSpec.ExecuteMode); mode != "" && + mode != string(config.DMSJobExecuteModeParallel) && + mode != string(config.DMSJobExecuteModeSerial) { + return fmt.Errorf("invalid dms execute mode: %s", j.jobSpec.ExecuteMode) + } if isExecution { } @@ -109,6 +114,7 @@ func (j DMSJobController) Update(useUserInput bool, ticket *commonmodels.Approva j.jobSpec.ID = currJobSpec.ID j.jobSpec.RemarkTemplate = currJobSpec.RemarkTemplate + j.jobSpec.ExecuteMode = normalizeDMSExecuteMode(currJobSpec.ExecuteMode) return nil } @@ -138,7 +144,8 @@ func (j DMSJobController) ToTask(taskID int64) ([]*commonmodels.JobTask, error) }, JobType: string(config.JobDMS), Spec: &commonmodels.JobTaskDMSSpec{ - ID: j.jobSpec.ID, + ID: j.jobSpec.ID, + ExecuteMode: normalizeDMSExecuteMode(j.jobSpec.ExecuteMode), Orders: func() (list []*commonmodels.DMSTaskOrder) { for _, order := range j.jobSpec.Orders { list = append(list, &commonmodels.DMSTaskOrder{ @@ -188,3 +195,12 @@ func (j DMSJobController) RenderDynamicVariableOptions(key string, option *Rende func (j DMSJobController) IsServiceTypeJob() bool { return false } + +func normalizeDMSExecuteMode(mode string) string { + switch strings.ToLower(mode) { + case string(config.DMSJobExecuteModeSerial): + return string(config.DMSJobExecuteModeSerial) + default: + return string(config.DMSJobExecuteModeParallel) + } +}