Skip to content

Commit 0785222

Browse files
authored
Merge pull request #4429 from oceanbase/tianke_202502bp1_upgradesdk
fix(dlm): upgrade sdk version to 1.2.3.1
2 parents 99bd0d5 + 76e37e7 commit 0785222

7 files changed

Lines changed: 60 additions & 21 deletions

File tree

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@
119119
<mina.version>2.1.6</mina.version>
120120

121121
<!-- data-lifecycle-manager version -->
122-
<data-lifecycle-manager.version>1.2.3</data-lifecycle-manager.version>
122+
<data-lifecycle-manager.version>1.2.3.1</data-lifecycle-manager.version>
123123

124124
<!-- plugin version -->
125125
<formatter-maven-plugin.version>2.11.0</formatter-maven-plugin.version>

server/odc-service/src/main/java/com/oceanbase/odc/service/dlm/DLMJobStore.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,7 @@ public void storeTaskGenerator(TaskGenerator taskGenerator) throws SQLException
147147
ps.setString(9, taskGenerator.getPartitionSavePoint());
148148
ps.setString(10, JsonUtils.toJson(taskGenerator.getPartName2MinKey()));
149149
ps.setString(11, JsonUtils.toJson(taskGenerator.getPartName2MaxKey()));
150+
ps.execute();
150151
}
151152
}
152153
}
@@ -219,6 +220,7 @@ public void storeTaskMeta(TaskMeta taskMeta) throws SQLException {
219220
ps.setString(7,
220221
taskMeta.getCursorPrimaryKey() == null ? "" : taskMeta.getCursorPrimaryKey().toSqlString());
221222
ps.setString(8, taskMeta.getPartitionName());
223+
ps.execute();
222224
}
223225
}
224226
}

server/odc-service/src/main/java/com/oceanbase/odc/service/quartz/executor/AbstractQuartzJob.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,15 @@ public void run(JobExecutionContext context) {
105105
public OdcJob getOdcJob(JobExecutionContext context) {
106106
JobKey key = context.getJobDetail().getKey();
107107
ScheduleTaskType taskType = ScheduleTaskType.valueOf(key.getGroup());
108+
if (taskType == ScheduleTaskType.DATA_ARCHIVE && context.getResult() != null) {
109+
try {
110+
ScheduleTaskEntity taskEntity = (ScheduleTaskEntity) context.getResult();
111+
taskType = ScheduleTaskType.valueOf(taskEntity.getJobGroup());
112+
log.info("Recovery task,scheduleTaskId={}", taskEntity.getId());
113+
} catch (Exception e) {
114+
log.warn("Load history task failed,jobKey={}", key);
115+
}
116+
}
108117
switch (taskType) {
109118
case SQL_PLAN:
110119
return new SqlPlanJob();

server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/job/DataArchiveDeleteJob.java

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,18 @@
1515
*/
1616
package com.oceanbase.odc.service.schedule.job;
1717

18+
import java.util.Map;
1819
import java.util.Optional;
1920

2021
import org.quartz.JobExecutionContext;
2122

23+
import com.fasterxml.jackson.core.type.TypeReference;
2224
import com.oceanbase.odc.common.json.JsonUtils;
2325
import com.oceanbase.odc.core.shared.constant.TaskStatus;
2426
import com.oceanbase.odc.metadb.schedule.ScheduleTaskEntity;
2527
import com.oceanbase.odc.service.dlm.model.DataArchiveParameters;
2628
import com.oceanbase.odc.service.schedule.model.DataArchiveClearParameters;
29+
import com.oceanbase.odc.service.task.constants.JobParametersKeyConstants;
2730
import com.oceanbase.tools.migrator.common.enums.JobType;
2831

2932
import lombok.extern.slf4j.Slf4j;
@@ -62,11 +65,19 @@ public void executeJob(JobExecutionContext context) {
6265
scheduleTaskRepository.updateStatusById(taskEntity.getId(), TaskStatus.FAILED);
6366
return;
6467
}
65-
66-
DLMJobReq parameters = getDLMJobReq(dataArchiveTask.getJobId());
67-
parameters.setJobType(JobType.DELETE);
68-
parameters.setFireTime(context.getFireTime());
69-
parameters.setScheduleTaskId(taskEntity.getId());
68+
DLMJobReq parameters;
69+
if (taskEntity.getJobId() != null) {
70+
parameters = JsonUtils.fromJson(JsonUtils.fromJson(
71+
taskFrameworkService.find(taskEntity.getJobId()).getJobParametersJson(),
72+
new TypeReference<Map<String, String>>() {})
73+
.get(JobParametersKeyConstants.META_TASK_PARAMETER_JSON),
74+
DLMJobReq.class);
75+
} else {
76+
parameters = getDLMJobReq(dataArchiveTask.getJobId());
77+
parameters.setJobType(JobType.DELETE);
78+
parameters.setFireTime(context.getFireTime());
79+
parameters.setScheduleTaskId(taskEntity.getId());
80+
}
7081
parameters
7182
.setRateLimit(limiterService.getByOrderIdOrElseDefaultConfig(Long.parseLong(taskEntity.getJobName())));
7283
Long jobId = publishJob(parameters, dataArchiveParameters.getTimeoutMillis(),

server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/job/DataArchiveRollbackJob.java

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,18 @@
1515
*/
1616
package com.oceanbase.odc.service.schedule.job;
1717

18+
import java.util.Map;
1819
import java.util.Optional;
1920

2021
import org.quartz.JobExecutionContext;
2122

23+
import com.fasterxml.jackson.core.type.TypeReference;
2224
import com.oceanbase.odc.common.json.JsonUtils;
2325
import com.oceanbase.odc.core.shared.constant.TaskStatus;
2426
import com.oceanbase.odc.metadb.schedule.ScheduleTaskEntity;
2527
import com.oceanbase.odc.service.dlm.model.DataArchiveParameters;
2628
import com.oceanbase.odc.service.schedule.model.DataArchiveRollbackParameters;
29+
import com.oceanbase.odc.service.task.constants.JobParametersKeyConstants;
2730
import com.oceanbase.tools.migrator.common.configure.DataSourceInfo;
2831
import com.oceanbase.tools.migrator.common.enums.JobType;
2932

@@ -58,20 +61,29 @@ public void executeJob(JobExecutionContext context) {
5861
DataArchiveParameters dataArchiveParameters = JsonUtils.fromJson(dataArchiveTask.getParametersJson(),
5962
DataArchiveParameters.class);
6063
// execute in task framework.
61-
DLMJobReq parameters = getDLMJobReq(dataArchiveTask.getJobId());
62-
parameters.setJobType(JobType.ROLLBACK);
63-
DataSourceInfo tempDataSource = parameters.getSourceDs();
64-
parameters.setSourceDs(parameters.getTargetDs());
65-
parameters.setTargetDs(tempDataSource);
66-
parameters.setFireTime(context.getFireTime());
64+
DLMJobReq parameters;
65+
if (taskEntity.getJobId() != null) {
66+
parameters = JsonUtils.fromJson(JsonUtils.fromJson(
67+
taskFrameworkService.find(taskEntity.getJobId()).getJobParametersJson(),
68+
new TypeReference<Map<String, String>>() {})
69+
.get(JobParametersKeyConstants.META_TASK_PARAMETER_JSON),
70+
DLMJobReq.class);
71+
} else {
72+
parameters = getDLMJobReq(dataArchiveTask.getJobId());
73+
parameters.setJobType(JobType.ROLLBACK);
74+
DataSourceInfo tempDataSource = parameters.getSourceDs();
75+
parameters.setSourceDs(parameters.getTargetDs());
76+
parameters.setTargetDs(tempDataSource);
77+
parameters.setFireTime(context.getFireTime());
78+
parameters.getTables().forEach(o -> {
79+
String temp = o.getTableName();
80+
o.setTableName(o.getTargetTableName());
81+
o.setTargetTableName(temp);
82+
});
83+
parameters.setScheduleTaskId(taskEntity.getId());
84+
}
6785
parameters
6886
.setRateLimit(limiterService.getByOrderIdOrElseDefaultConfig(Long.parseLong(taskEntity.getJobName())));
69-
parameters.getTables().forEach(o -> {
70-
String temp = o.getTableName();
71-
o.setTableName(o.getTargetTableName());
72-
o.setTargetTableName(temp);
73-
});
74-
parameters.setScheduleTaskId(taskEntity.getId());
7587
Long jobId = publishJob(parameters, dataArchiveParameters.getTimeoutMillis(),
7688
dataArchiveParameters.getSourceDatabaseId());
7789
log.info("Publish DLM job to task framework succeed,scheduleTaskId={},jobIdentity={}", taskEntity.getId(),

server/odc-service/src/main/java/com/oceanbase/odc/service/task/listener/DefaultJobTerminateListener.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.List;
2121
import java.util.Map;
2222
import java.util.Optional;
23+
import java.util.stream.Collectors;
2324

2425
import org.springframework.beans.factory.annotation.Autowired;
2526
import org.springframework.stereotype.Component;
@@ -67,9 +68,12 @@ public void onEvent(JobTerminateEvent event) {
6768
TaskStatus taskStatus = TerminateProcessor.correctTaskStatus(terminateProcessors, jobEntity.getJobType(),
6869
scheduleTask, event.getStatus().convertTaskStatus());
6970
// correct current local variable to right status
71+
scheduleTaskService.updateStatusById(scheduleTask.getId(), taskStatus,
72+
TaskStatus.getProcessingStatus().stream().map(TaskStatus::name).collect(
73+
Collectors.toList()));
74+
log.info("Update schedule task status from {} to {} succeed,scheduleTaskId={}", scheduleTask.getStatus(),
75+
taskStatus, scheduleTask.getId());
7076
scheduleTask.setStatus(taskStatus);
71-
scheduleTaskService.updateStatusById(scheduleTask.getId(), taskStatus);
72-
log.info("Update schedule task status to {} succeed,scheduleTaskId={}", taskStatus, scheduleTask.getId());
7377
// Refresh the schedule status after the task is completed.
7478
scheduleService.refreshScheduleStatus(Long.parseLong(scheduleTask.getJobName()));
7579
// Trigger the alarm if the task is failed or canceled.

server/odc-service/src/main/java/com/oceanbase/odc/service/task/processor/terminate/DLMTerminateProcessor.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,8 @@ public TaskStatus correctTaskStatus(ScheduleTask scheduleTask, TaskStatus curren
5959
}
6060
});
6161
dlmService.createOrUpdateDlmTableUnits(dlmTableUnits);
62-
return dlmService.getFinalTaskStatus(scheduleTask.getId());
62+
return currentStatus == TaskStatus.EXEC_TIMEOUT || currentStatus == TaskStatus.CANCELED ? TaskStatus.CANCELED
63+
: dlmService.getFinalTaskStatus(scheduleTask.getId());
6364
}
6465

6566
@Override

0 commit comments

Comments
 (0)