Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion pkg/microservice/aslan/config/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,13 @@ const (
DBInstanceTypeMariaDB DBInstanceType = "mariadb"
)

type DMSJobExecuteMode string

const (
DMSJobExecuteModeParallel DMSJobExecuteMode = "parallel"
DMSJobExecuteModeSerial DMSJobExecuteMode = "serial"
)

type ObservabilityType string

const (
Expand Down Expand Up @@ -761,7 +768,7 @@ type ValueMergeStrategy string

const (
ValueMergeStrategyReuseValue ValueMergeStrategy = "reuse-values"
ValueMergeStrategyOverride ValueMergeStrategy= "override"
ValueMergeStrategyOverride ValueMergeStrategy = "override"
)

type YAMLMergeStrategy string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ type JobTaskDeploySpec struct {
ReplaceResources []Resource `bson:"replace_resources" json:"replace_resources" yaml:"replace_resources"`
RelatedPodLabels []map[string]string `bson:"-" json:"-" yaml:"-"`
// overrideResource is used to do a full yaml override instead of a 2-way merge patching for all the resources
OverrideResource bool `bson:"override_resource" json:"override_resource" yaml:"override_resource"`
OverrideResource bool `bson:"override_resource" json:"override_resource" yaml:"override_resource"`
// for compatibility
ServiceModule string `bson:"service_module" json:"service_module" yaml:"-"`
Image string `bson:"image" json:"image" yaml:"-"`
Expand Down Expand Up @@ -266,7 +266,7 @@ type JobTaskDeployRevertSpec struct {
OverrideKVs string `bson:"override_kvs" json:"override_kvs" yaml:"override_kvs"`
Revision int64 `bson:"revision" json:"revision" yaml:"revision"`
RevisionCreateTime int64 `bson:"revision_create_time" json:"revision_create_time" yaml:"revision_create_time"`
OverrideResource bool `bson:"override_resource" json:"override_resource" yaml:"override_resource"`
OverrideResource bool `bson:"override_resource" json:"override_resource" yaml:"override_resource"`
}

type DeployServiceModule struct {
Expand Down Expand Up @@ -614,8 +614,9 @@ type SQLExecResult struct {
}

type JobTaskDMSSpec struct {
ID string `bson:"id" json:"id" yaml:"id"`
Orders []*DMSTaskOrder `bson:"orders" json:"orders" yaml:"orders"`
ID string `bson:"id" json:"id" yaml:"id"`
ExecuteMode string `bson:"execute_mode" json:"execute_mode" yaml:"execute_mode"`
Orders []*DMSTaskOrder `bson:"orders" json:"orders" yaml:"orders"`
}

type DMSTaskOrder struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1011,6 +1011,7 @@ type SQLJobSpec struct {
type DMSJobSpec struct {
ID string `bson:"id" json:"id" yaml:"id"`
RemarkTemplate string `bson:"remark_template" json:"remark_template" yaml:"remark_template"`
ExecuteMode string `bson:"execute_mode" json:"execute_mode" yaml:"execute_mode"`
Orders []*DMSOrder `bson:"orders" json:"orders" yaml:"orders"`
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package jobcontroller

import (
"context"
"strings"
"time"

dms "github.com/alibabacloud-go/dms-enterprise-20181101/v3/client"
Expand Down Expand Up @@ -72,9 +73,18 @@ func (c *DMSJobCtl) Run(ctx context.Context) {
return
}

switch normalizeDMSJobExecuteMode(c.jobTaskSpec.ExecuteMode) {
case config.DMSJobExecuteModeSerial:
c.runSerial(ctx, client)
default:
c.runParallel(ctx, client)
}
}

func (c *DMSJobCtl) runParallel(ctx context.Context, client *dms.Client) {
failed := false
for _, order := range c.jobTaskSpec.Orders {
err = execDMSDataCorrectOrder(ctx, client, order.ID)
err := execDMSDataCorrectOrder(ctx, client, order.ID)
if err != nil {
failed = true
order.Error = err.Error()
Expand All @@ -86,21 +96,18 @@ func (c *DMSJobCtl) Run(ctx context.Context) {
for {
c.ack()

select {
case <-ctx.Done():
c.job.Status = config.StatusCancelled
logError(c.job, "job cancelled", c.logger)
if c.checkCancelled(ctx) {
return
default:
}

allDone := true
for _, order := range c.jobTaskSpec.Orders {
if order.Error != "" {
failed = true
continue
}

if order.JobStatus == "FAIL" || order.JobStatus == "SUCCESS" || order.JobStatus == "DELETE" {
if isDMSOrderDone(order.JobStatus) {
if order.JobStatus == "FAIL" {
failed = true
}
Expand All @@ -118,6 +125,9 @@ func (c *DMSJobCtl) Run(ctx context.Context) {
}

order.JobStatus = tea.StringValue(taskDetail.GetJobStatus())
if order.JobStatus == "FAIL" {
failed = true
}
}

if allDone {
Expand All @@ -133,6 +143,74 @@ func (c *DMSJobCtl) Run(ctx context.Context) {
}
}

func (c *DMSJobCtl) runSerial(ctx context.Context, client *dms.Client) {
for _, order := range c.jobTaskSpec.Orders {
if c.checkCancelled(ctx) {
return
}

err := execDMSDataCorrectOrder(ctx, client, order.ID)
if err != nil {
order.Error = err.Error()
logError(c.job, err.Error(), c.logger)
c.job.Status = config.StatusFailed
return
}

for {
c.ack()

if c.checkCancelled(ctx) {
return
}

taskDetail, err := getDMSDataCorrectTaskDetail(ctx, client, order.ID)
if err != nil {
order.Error = err.Error()
logError(c.job, err.Error(), c.logger)
c.job.Status = config.StatusFailed
return
}

order.JobStatus = tea.StringValue(taskDetail.GetJobStatus())
if !isDMSOrderDone(order.JobStatus) {
time.Sleep(time.Second * 3)
continue
}
if order.JobStatus == "FAIL" {
c.job.Status = config.StatusFailed
return
}
break
}
}
c.job.Status = config.StatusPassed
}

func (c *DMSJobCtl) checkCancelled(ctx context.Context) bool {
select {
case <-ctx.Done():
c.job.Status = config.StatusCancelled
logError(c.job, "job cancelled", c.logger)
return true
default:
return false
}
}

func isDMSOrderDone(status string) bool {
return status == "FAIL" || status == "SUCCESS" || status == "DELETE"
}

func normalizeDMSJobExecuteMode(mode string) config.DMSJobExecuteMode {
switch strings.ToLower(mode) {
case string(config.DMSJobExecuteModeSerial):
return config.DMSJobExecuteModeSerial
default:
return config.DMSJobExecuteModeParallel
}
}

func (c *DMSJobCtl) SaveInfo(ctx context.Context) error {
return mongodb.NewJobInfoColl().Create(context.TODO(), &commonmodels.JobInfo{
Type: c.job.JobType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ func (j DMSJobController) Validate(isExecution bool) error {
if j.jobSpec.ID != currJobSpec.ID {
return fmt.Errorf("given apollo job spec does not match current apollo job")
}
if mode := strings.ToLower(j.jobSpec.ExecuteMode); mode != "" &&
mode != string(config.DMSJobExecuteModeParallel) &&
mode != string(config.DMSJobExecuteModeSerial) {
return fmt.Errorf("invalid dms execute mode: %s", j.jobSpec.ExecuteMode)
}

if isExecution {
}
Expand Down Expand Up @@ -109,6 +114,7 @@ func (j DMSJobController) Update(useUserInput bool, ticket *commonmodels.Approva

j.jobSpec.ID = currJobSpec.ID
j.jobSpec.RemarkTemplate = currJobSpec.RemarkTemplate
j.jobSpec.ExecuteMode = normalizeDMSExecuteMode(currJobSpec.ExecuteMode)

return nil
}
Expand Down Expand Up @@ -138,7 +144,8 @@ func (j DMSJobController) ToTask(taskID int64) ([]*commonmodels.JobTask, error)
},
JobType: string(config.JobDMS),
Spec: &commonmodels.JobTaskDMSSpec{
ID: j.jobSpec.ID,
ID: j.jobSpec.ID,
ExecuteMode: normalizeDMSExecuteMode(j.jobSpec.ExecuteMode),
Orders: func() (list []*commonmodels.DMSTaskOrder) {
for _, order := range j.jobSpec.Orders {
list = append(list, &commonmodels.DMSTaskOrder{
Expand Down Expand Up @@ -188,3 +195,12 @@ func (j DMSJobController) RenderDynamicVariableOptions(key string, option *Rende
func (j DMSJobController) IsServiceTypeJob() bool {
return false
}

func normalizeDMSExecuteMode(mode string) string {
switch strings.ToLower(mode) {
case string(config.DMSJobExecuteModeSerial):
return string(config.DMSJobExecuteModeSerial)
default:
return string(config.DMSJobExecuteModeParallel)
}
}
Loading