|
15 | 15 | */ |
16 | 16 | package com.oceanbase.odc.service.schedule.job; |
17 | 17 |
|
| 18 | +import java.util.Map; |
18 | 19 | import java.util.Optional; |
19 | 20 |
|
20 | 21 | import org.quartz.JobExecutionContext; |
21 | 22 |
|
| 23 | +import com.fasterxml.jackson.core.type.TypeReference; |
22 | 24 | import com.oceanbase.odc.common.json.JsonUtils; |
23 | 25 | import com.oceanbase.odc.core.shared.constant.TaskStatus; |
24 | 26 | import com.oceanbase.odc.metadb.schedule.ScheduleTaskEntity; |
25 | 27 | import com.oceanbase.odc.service.dlm.model.DataArchiveParameters; |
26 | 28 | import com.oceanbase.odc.service.schedule.model.DataArchiveRollbackParameters; |
| 29 | +import com.oceanbase.odc.service.task.constants.JobParametersKeyConstants; |
27 | 30 | import com.oceanbase.tools.migrator.common.configure.DataSourceInfo; |
28 | 31 | import com.oceanbase.tools.migrator.common.enums.JobType; |
29 | 32 |
|
@@ -58,20 +61,29 @@ public void executeJob(JobExecutionContext context) { |
58 | 61 | DataArchiveParameters dataArchiveParameters = JsonUtils.fromJson(dataArchiveTask.getParametersJson(), |
59 | 62 | DataArchiveParameters.class); |
60 | 63 | // execute in task framework. |
61 | | - DLMJobReq parameters = getDLMJobReq(dataArchiveTask.getJobId()); |
62 | | - parameters.setJobType(JobType.ROLLBACK); |
63 | | - DataSourceInfo tempDataSource = parameters.getSourceDs(); |
64 | | - parameters.setSourceDs(parameters.getTargetDs()); |
65 | | - parameters.setTargetDs(tempDataSource); |
66 | | - parameters.setFireTime(context.getFireTime()); |
| 64 | + DLMJobReq parameters; |
| 65 | + if (taskEntity.getJobId() != null) { |
| 66 | + parameters = JsonUtils.fromJson(JsonUtils.fromJson( |
| 67 | + taskFrameworkService.find(taskEntity.getJobId()).getJobParametersJson(), |
| 68 | + new TypeReference<Map<String, String>>() {}) |
| 69 | + .get(JobParametersKeyConstants.META_TASK_PARAMETER_JSON), |
| 70 | + DLMJobReq.class); |
| 71 | + } else { |
| 72 | + parameters = getDLMJobReq(dataArchiveTask.getJobId()); |
| 73 | + parameters.setJobType(JobType.ROLLBACK); |
| 74 | + DataSourceInfo tempDataSource = parameters.getSourceDs(); |
| 75 | + parameters.setSourceDs(parameters.getTargetDs()); |
| 76 | + parameters.setTargetDs(tempDataSource); |
| 77 | + parameters.setFireTime(context.getFireTime()); |
| 78 | + parameters.getTables().forEach(o -> { |
| 79 | + String temp = o.getTableName(); |
| 80 | + o.setTableName(o.getTargetTableName()); |
| 81 | + o.setTargetTableName(temp); |
| 82 | + }); |
| 83 | + parameters.setScheduleTaskId(taskEntity.getId()); |
| 84 | + } |
67 | 85 | parameters |
68 | 86 | .setRateLimit(limiterService.getByOrderIdOrElseDefaultConfig(Long.parseLong(taskEntity.getJobName()))); |
69 | | - parameters.getTables().forEach(o -> { |
70 | | - String temp = o.getTableName(); |
71 | | - o.setTableName(o.getTargetTableName()); |
72 | | - o.setTargetTableName(temp); |
73 | | - }); |
74 | | - parameters.setScheduleTaskId(taskEntity.getId()); |
75 | 87 | Long jobId = publishJob(parameters, dataArchiveParameters.getTimeoutMillis(), |
76 | 88 | dataArchiveParameters.getSourceDatabaseId()); |
77 | 89 | log.info("Publish DLM job to task framework succeed,scheduleTaskId={},jobIdentity={}", taskEntity.getId(), |
|
0 commit comments