Skip to content

Commit cd00457

Browse files
authored
feat(workflow): add serial/parallel execute mode for DMS job (#4600)
* feat(workflow): add serial/parallel execute mode for DMS job Signed-off-by: YuTang Song <2313186065@qq.com> * fix(dms): make parallel execution fail-fast on first order failure Signed-off-by: YuTang Song <2313186065@qq.com> * feat(dms): support serial/parallel execution modes for DMS job controller Signed-off-by: YuTang Song <2313186065@qq.com> --------- Signed-off-by: YuTang Song <2313186065@qq.com>
1 parent 316664e commit cd00457

5 files changed

Lines changed: 115 additions & 12 deletions

File tree

pkg/microservice/aslan/config/consts.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -276,6 +276,13 @@ const (
276276
DBInstanceTypeMariaDB DBInstanceType = "mariadb"
277277
)
278278

279+
type DMSJobExecuteMode string
280+
281+
const (
282+
DMSJobExecuteModeParallel DMSJobExecuteMode = "parallel"
283+
DMSJobExecuteModeSerial DMSJobExecuteMode = "serial"
284+
)
285+
279286
type ObservabilityType string
280287

281288
const (

pkg/microservice/aslan/core/common/repository/models/wokflow_task_v4.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,7 @@ type JobTaskDeploySpec struct {
234234
ReplaceResources []Resource `bson:"replace_resources" json:"replace_resources" yaml:"replace_resources"`
235235
RelatedPodLabels []map[string]string `bson:"-" json:"-" yaml:"-"`
236236
// overrideResource is used to do a full yaml override instead of a 2-way merge patching for all the resources
237-
OverrideResource bool `bson:"override_resource" json:"override_resource" yaml:"override_resource"`
237+
OverrideResource bool `bson:"override_resource" json:"override_resource" yaml:"override_resource"`
238238
// for compatibility
239239
ServiceModule string `bson:"service_module" json:"service_module" yaml:"-"`
240240
Image string `bson:"image" json:"image" yaml:"-"`
@@ -266,7 +266,7 @@ type JobTaskDeployRevertSpec struct {
266266
OverrideKVs string `bson:"override_kvs" json:"override_kvs" yaml:"override_kvs"`
267267
Revision int64 `bson:"revision" json:"revision" yaml:"revision"`
268268
RevisionCreateTime int64 `bson:"revision_create_time" json:"revision_create_time" yaml:"revision_create_time"`
269-
OverrideResource bool `bson:"override_resource" json:"override_resource" yaml:"override_resource"`
269+
OverrideResource bool `bson:"override_resource" json:"override_resource" yaml:"override_resource"`
270270
}
271271

272272
type DeployServiceModule struct {
@@ -614,8 +614,9 @@ type SQLExecResult struct {
614614
}
615615

616616
type JobTaskDMSSpec struct {
617-
ID string `bson:"id" json:"id" yaml:"id"`
618-
Orders []*DMSTaskOrder `bson:"orders" json:"orders" yaml:"orders"`
617+
ID string `bson:"id" json:"id" yaml:"id"`
618+
ExecuteMode string `bson:"execute_mode" json:"execute_mode" yaml:"execute_mode"`
619+
Orders []*DMSTaskOrder `bson:"orders" json:"orders" yaml:"orders"`
619620
}
620621

621622
type DMSTaskOrder struct {

pkg/microservice/aslan/core/common/repository/models/workflow_v4.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1011,6 +1011,7 @@ type SQLJobSpec struct {
10111011
type DMSJobSpec struct {
10121012
ID string `bson:"id" json:"id" yaml:"id"`
10131013
RemarkTemplate string `bson:"remark_template" json:"remark_template" yaml:"remark_template"`
1014+
ExecuteMode string `bson:"execute_mode" json:"execute_mode" yaml:"execute_mode"`
10141015
Orders []*DMSOrder `bson:"orders" json:"orders" yaml:"orders"`
10151016
}
10161017

pkg/microservice/aslan/core/common/service/workflowcontroller/jobcontroller/job_dms.go

Lines changed: 85 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package jobcontroller
1818

1919
import (
2020
"context"
21+
"strings"
2122
"time"
2223

2324
dms "github.com/alibabacloud-go/dms-enterprise-20181101/v3/client"
@@ -72,9 +73,18 @@ func (c *DMSJobCtl) Run(ctx context.Context) {
7273
return
7374
}
7475

76+
switch normalizeDMSJobExecuteMode(c.jobTaskSpec.ExecuteMode) {
77+
case config.DMSJobExecuteModeSerial:
78+
c.runSerial(ctx, client)
79+
default:
80+
c.runParallel(ctx, client)
81+
}
82+
}
83+
84+
func (c *DMSJobCtl) runParallel(ctx context.Context, client *dms.Client) {
7585
failed := false
7686
for _, order := range c.jobTaskSpec.Orders {
77-
err = execDMSDataCorrectOrder(ctx, client, order.ID)
87+
err := execDMSDataCorrectOrder(ctx, client, order.ID)
7888
if err != nil {
7989
failed = true
8090
order.Error = err.Error()
@@ -86,21 +96,18 @@ func (c *DMSJobCtl) Run(ctx context.Context) {
8696
for {
8797
c.ack()
8898

89-
select {
90-
case <-ctx.Done():
91-
c.job.Status = config.StatusCancelled
92-
logError(c.job, "job cancelled", c.logger)
99+
if c.checkCancelled(ctx) {
93100
return
94-
default:
95101
}
96102

97103
allDone := true
98104
for _, order := range c.jobTaskSpec.Orders {
99105
if order.Error != "" {
106+
failed = true
100107
continue
101108
}
102109

103-
if order.JobStatus == "FAIL" || order.JobStatus == "SUCCESS" || order.JobStatus == "DELETE" {
110+
if isDMSOrderDone(order.JobStatus) {
104111
if order.JobStatus == "FAIL" {
105112
failed = true
106113
}
@@ -118,6 +125,9 @@ func (c *DMSJobCtl) Run(ctx context.Context) {
118125
}
119126

120127
order.JobStatus = tea.StringValue(taskDetail.GetJobStatus())
128+
if order.JobStatus == "FAIL" {
129+
failed = true
130+
}
121131
}
122132

123133
if allDone {
@@ -133,6 +143,74 @@ func (c *DMSJobCtl) Run(ctx context.Context) {
133143
}
134144
}
135145

146+
func (c *DMSJobCtl) runSerial(ctx context.Context, client *dms.Client) {
147+
for _, order := range c.jobTaskSpec.Orders {
148+
if c.checkCancelled(ctx) {
149+
return
150+
}
151+
152+
err := execDMSDataCorrectOrder(ctx, client, order.ID)
153+
if err != nil {
154+
order.Error = err.Error()
155+
logError(c.job, err.Error(), c.logger)
156+
c.job.Status = config.StatusFailed
157+
return
158+
}
159+
160+
for {
161+
c.ack()
162+
163+
if c.checkCancelled(ctx) {
164+
return
165+
}
166+
167+
taskDetail, err := getDMSDataCorrectTaskDetail(ctx, client, order.ID)
168+
if err != nil {
169+
order.Error = err.Error()
170+
logError(c.job, err.Error(), c.logger)
171+
c.job.Status = config.StatusFailed
172+
return
173+
}
174+
175+
order.JobStatus = tea.StringValue(taskDetail.GetJobStatus())
176+
if !isDMSOrderDone(order.JobStatus) {
177+
time.Sleep(time.Second * 3)
178+
continue
179+
}
180+
if order.JobStatus == "FAIL" {
181+
c.job.Status = config.StatusFailed
182+
return
183+
}
184+
break
185+
}
186+
}
187+
c.job.Status = config.StatusPassed
188+
}
189+
190+
func (c *DMSJobCtl) checkCancelled(ctx context.Context) bool {
191+
select {
192+
case <-ctx.Done():
193+
c.job.Status = config.StatusCancelled
194+
logError(c.job, "job cancelled", c.logger)
195+
return true
196+
default:
197+
return false
198+
}
199+
}
200+
201+
func isDMSOrderDone(status string) bool {
202+
return status == "FAIL" || status == "SUCCESS" || status == "DELETE"
203+
}
204+
205+
func normalizeDMSJobExecuteMode(mode string) config.DMSJobExecuteMode {
206+
switch strings.ToLower(mode) {
207+
case string(config.DMSJobExecuteModeSerial):
208+
return config.DMSJobExecuteModeSerial
209+
default:
210+
return config.DMSJobExecuteModeParallel
211+
}
212+
}
213+
136214
func (c *DMSJobCtl) SaveInfo(ctx context.Context) error {
137215
return mongodb.NewJobInfoColl().Create(context.TODO(), &commonmodels.JobInfo{
138216
Type: c.job.JobType,

pkg/microservice/aslan/core/workflow/service/workflow/controller/job/job_dms.go

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,11 @@ func (j DMSJobController) Validate(isExecution bool) error {
8080
if j.jobSpec.ID != currJobSpec.ID {
8181
return fmt.Errorf("given apollo job spec does not match current apollo job")
8282
}
83+
if mode := strings.ToLower(j.jobSpec.ExecuteMode); mode != "" &&
84+
mode != string(config.DMSJobExecuteModeParallel) &&
85+
mode != string(config.DMSJobExecuteModeSerial) {
86+
return fmt.Errorf("invalid dms execute mode: %s", j.jobSpec.ExecuteMode)
87+
}
8388

8489
if isExecution {
8590
}
@@ -109,6 +114,7 @@ func (j DMSJobController) Update(useUserInput bool, ticket *commonmodels.Approva
109114

110115
j.jobSpec.ID = currJobSpec.ID
111116
j.jobSpec.RemarkTemplate = currJobSpec.RemarkTemplate
117+
j.jobSpec.ExecuteMode = normalizeDMSExecuteMode(currJobSpec.ExecuteMode)
112118

113119
return nil
114120
}
@@ -138,7 +144,8 @@ func (j DMSJobController) ToTask(taskID int64) ([]*commonmodels.JobTask, error)
138144
},
139145
JobType: string(config.JobDMS),
140146
Spec: &commonmodels.JobTaskDMSSpec{
141-
ID: j.jobSpec.ID,
147+
ID: j.jobSpec.ID,
148+
ExecuteMode: normalizeDMSExecuteMode(j.jobSpec.ExecuteMode),
142149
Orders: func() (list []*commonmodels.DMSTaskOrder) {
143150
for _, order := range j.jobSpec.Orders {
144151
list = append(list, &commonmodels.DMSTaskOrder{
@@ -188,3 +195,12 @@ func (j DMSJobController) RenderDynamicVariableOptions(key string, option *Rende
188195
func (j DMSJobController) IsServiceTypeJob() bool {
189196
return false
190197
}
198+
199+
func normalizeDMSExecuteMode(mode string) string {
200+
switch strings.ToLower(mode) {
201+
case string(config.DMSJobExecuteModeSerial):
202+
return string(config.DMSJobExecuteModeSerial)
203+
default:
204+
return string(config.DMSJobExecuteModeParallel)
205+
}
206+
}

0 commit comments

Comments
 (0)