diff --git a/pom.xml b/pom.xml
index e1e32a9859..42262c105b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -782,11 +782,6 @@
commons-compress
${commons-compress.version}
-
- commons-codec
- commons-codec
- 1.14
-
org.apache.commons
commons-collections4
diff --git a/server/odc-common/src/main/java/com/oceanbase/odc/common/task/RouteLogCallable.java b/server/odc-common/src/main/java/com/oceanbase/odc/common/task/RouteLogCallable.java
index c16f673f29..f7f14aba64 100644
--- a/server/odc-common/src/main/java/com/oceanbase/odc/common/task/RouteLogCallable.java
+++ b/server/odc-common/src/main/java/com/oceanbase/odc/common/task/RouteLogCallable.java
@@ -22,7 +22,7 @@
import org.slf4j.MDC;
public abstract class RouteLogCallable implements Callable {
-
+ public static final String LOG_PATH_PATTERN = "%s/%s/%s/%s.log";
protected static Logger log = LogManager.getLogger(RouteLogCallable.class);
private final String workSpace;
private final String taskId;
diff --git a/server/odc-core/src/main/java/com/oceanbase/odc/core/alarm/AlarmUtils.java b/server/odc-core/src/main/java/com/oceanbase/odc/core/alarm/AlarmUtils.java
index 66aaab9581..fbc13a6c5a 100644
--- a/server/odc-core/src/main/java/com/oceanbase/odc/core/alarm/AlarmUtils.java
+++ b/server/odc-core/src/main/java/com/oceanbase/odc/core/alarm/AlarmUtils.java
@@ -27,6 +27,7 @@ public final class AlarmUtils {
* Base alarm message names
*/
public static final String CLUSTER_NAME = "Cluster";
+ public static final String INSTANCE_NAME = "instanceId";
public static final String TENANT_NAME = "Tenant";
public static final String ORGANIZATION_NAME = "OrganizationId";
public static final String MESSAGE_NAME = "Message";
@@ -42,6 +43,7 @@ public final class AlarmUtils {
public static final String RESOURCE_TYPE = "ResourceType";
public static final String TASK_TYPE_NAME = "TaskType";
public static final String SCHEDULE_ID_NAME = "ScheduleId";
+ public static final String FLOW_INSTANCE_ID_NAME = "FlowInstanceId";
public static final Collection TASK_FRAMEWORK_ALARM_DIGEST_NAMES =
Arrays.asList(CLUSTER_NAME, TENANT_NAME, SCHEDULE_ID_NAME);
diff --git a/server/odc-server/src/main/java/com/oceanbase/odc/server/web/controller/v2/ExportController.java b/server/odc-server/src/main/java/com/oceanbase/odc/server/web/controller/v2/ExportController.java
index 0959ec64dc..d3193a3874 100644
--- a/server/odc-server/src/main/java/com/oceanbase/odc/server/web/controller/v2/ExportController.java
+++ b/server/odc-server/src/main/java/com/oceanbase/odc/server/web/controller/v2/ExportController.java
@@ -15,6 +15,8 @@
*/
package com.oceanbase.odc.server.web.controller.v2;
+import java.util.List;
+
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
@@ -25,7 +27,10 @@
import com.oceanbase.odc.service.common.response.SuccessResponse;
import com.oceanbase.odc.service.schedule.export.ScheduleExportService;
import com.oceanbase.odc.service.schedule.export.model.FileExportResponse;
+import com.oceanbase.odc.service.schedule.export.model.ScheduleExportListView;
import com.oceanbase.odc.service.schedule.export.model.ScheduleTaskExportRequest;
+import com.oceanbase.odc.service.state.model.StateName;
+import com.oceanbase.odc.service.state.model.StatefulRoute;
@RequestMapping("/api/v2/export")
@RestController
@@ -34,8 +39,27 @@ public class ExportController {
@Autowired
private ScheduleExportService scheduleExportService;
- @RequestMapping(value = "/exportScheduleTask", method = RequestMethod.POST)
- public SuccessResponse exportScheduleTask(@RequestBody ScheduleTaskExportRequest request) {
- return Responses.success(scheduleExportService.export(request));
+ @RequestMapping(value = "getExportListView", method = RequestMethod.POST)
+ public SuccessResponse> getExportListView(
+ @RequestBody ScheduleTaskExportRequest request) {
+ return Responses.success(scheduleExportService.getExportListView(request));
+ }
+
+ @RequestMapping(value = "/exportSchedule", method = RequestMethod.POST)
+ public SuccessResponse exportScheduleTask2(@RequestBody ScheduleTaskExportRequest request) {
+ return Responses.success(scheduleExportService.startExport(request));
+ }
+
+ @RequestMapping(value = "/getExportResult", method = RequestMethod.GET)
+ @StatefulRoute(stateName = StateName.UUID_STATEFUL_ID, stateIdExpression = "#exportId")
+ public SuccessResponse exportScheduleTask(String exportId) {
+ return Responses.success(scheduleExportService.getExportResult(exportId));
+ }
+
+ @RequestMapping(value = "/getExportLog", method = RequestMethod.GET)
+ @StatefulRoute(stateName = StateName.UUID_STATEFUL_ID, stateIdExpression = "#exportId")
+ public SuccessResponse getExportLog(String exportId) {
+ return Responses.success(scheduleExportService.getExportLog(exportId));
}
+
}
diff --git a/server/odc-server/src/main/java/com/oceanbase/odc/server/web/controller/v2/FlowInstanceController.java b/server/odc-server/src/main/java/com/oceanbase/odc/server/web/controller/v2/FlowInstanceController.java
index 2e1fd46293..abe2d688fc 100644
--- a/server/odc-server/src/main/java/com/oceanbase/odc/server/web/controller/v2/FlowInstanceController.java
+++ b/server/odc-server/src/main/java/com/oceanbase/odc/server/web/controller/v2/FlowInstanceController.java
@@ -16,6 +16,7 @@
package com.oceanbase.odc.server.web.controller.v2;
import java.io.IOException;
+import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.Map;
@@ -49,6 +50,7 @@
import com.oceanbase.odc.service.common.util.WebResponseUtils;
import com.oceanbase.odc.service.flow.FlowInstanceService;
import com.oceanbase.odc.service.flow.FlowTaskInstanceService;
+import com.oceanbase.odc.service.flow.model.BatchTerminateFlowResult;
import com.oceanbase.odc.service.flow.model.BinaryDataResult;
import com.oceanbase.odc.service.flow.model.CreateFlowInstanceReq;
import com.oceanbase.odc.service.flow.model.FlowInstanceApprovalReq;
@@ -61,6 +63,8 @@
import com.oceanbase.odc.service.partitionplan.model.PartitionPlanConfig;
import com.oceanbase.odc.service.schedule.ScheduleService;
import com.oceanbase.odc.service.session.model.SqlExecuteResult;
+import com.oceanbase.odc.service.state.model.StateName;
+import com.oceanbase.odc.service.state.model.StatefulRoute;
import com.oceanbase.odc.service.task.model.OdcTaskLogLevel;
import io.swagger.annotations.ApiOperation;
@@ -251,4 +255,24 @@ public SuccessResponse getPartitionPlan(@PathVariable Long
return Responses.ok(this.partitionPlanScheduleService.getPartitionPlanByFlowInstanceId(id));
}
+ @ApiOperation(value = "cancelFlowInstance", notes = "批量终止流程")
+ @RequestMapping(value = "/asyncCancel", method = RequestMethod.POST)
+ public SuccessResponse batchCancelFlowInstance(@RequestBody Collection flowInstanceIds) {
+ return Responses.single(flowInstanceService.startBatchCancelFlowInstance(flowInstanceIds));
+ }
+
+ @ApiOperation(value = "getBatchCancelResult", notes = "获取批量终止结果")
+ @RequestMapping(value = "/asyncCancelResult", method = RequestMethod.GET)
+ @StatefulRoute(stateName = StateName.UUID_STATEFUL_ID, stateIdExpression = "#terminateId")
+ public SuccessResponse> getBatchCancelResult(String terminateId) {
+ return Responses.single(flowInstanceService.getBatchCancelResult(terminateId));
+ }
+
+ @ApiOperation(value = "getBatchCancelLog", notes = "获取批量终止日志")
+ @RequestMapping(value = "/asyncCancelLog", method = RequestMethod.GET)
+ @StatefulRoute(stateName = StateName.UUID_STATEFUL_ID, stateIdExpression = "#terminateId")
+ public SuccessResponse getBatchCancelLog(String terminateId) {
+ return Responses.single(flowInstanceService.getBatchCancelLog(terminateId));
+ }
+
}
diff --git a/server/odc-server/src/main/java/com/oceanbase/odc/server/web/controller/v2/ScheduleController.java b/server/odc-server/src/main/java/com/oceanbase/odc/server/web/controller/v2/ScheduleController.java
index ef2b76f005..02ed9279be 100644
--- a/server/odc-server/src/main/java/com/oceanbase/odc/server/web/controller/v2/ScheduleController.java
+++ b/server/odc-server/src/main/java/com/oceanbase/odc/server/web/controller/v2/ScheduleController.java
@@ -41,6 +41,8 @@
import com.oceanbase.odc.service.common.util.WebResponseUtils;
import com.oceanbase.odc.service.dlm.model.RateLimitConfiguration;
import com.oceanbase.odc.service.schedule.ScheduleService;
+import com.oceanbase.odc.service.schedule.export.model.ScheduleTerminateCmd;
+import com.oceanbase.odc.service.schedule.export.model.ScheduleTerminateResult;
import com.oceanbase.odc.service.schedule.model.ChangeScheduleResp;
import com.oceanbase.odc.service.schedule.model.CreateScheduleReq;
import com.oceanbase.odc.service.schedule.model.OperationType;
@@ -59,6 +61,8 @@
import com.oceanbase.odc.service.schedule.model.ScheduleTaskOverview;
import com.oceanbase.odc.service.schedule.model.ScheduleType;
import com.oceanbase.odc.service.schedule.model.UpdateScheduleReq;
+import com.oceanbase.odc.service.state.model.StateName;
+import com.oceanbase.odc.service.state.model.StatefulRoute;
import com.oceanbase.odc.service.task.executor.logger.LogUtils;
import com.oceanbase.odc.service.task.model.OdcTaskLogLevel;
@@ -286,6 +290,23 @@ public SuccessResponse updateLimiterConfig(@PathVariable
return Responses.single(scheduleService.updateDlmRateLimit(id, limiterConfig));
}
+ @RequestMapping(value = "/schedules/asyncTerminate", method = RequestMethod.POST)
+ public SuccessResponse startTerminateScheduleAndTask(@RequestBody ScheduleTerminateCmd cmd) {
+ return Responses.ok(scheduleService.startTerminateScheduleAndTask(cmd));
+ }
+
+ @RequestMapping(value = "/schedules/asyncTerminateResult", method = RequestMethod.GET)
+ @StatefulRoute(stateName = StateName.UUID_STATEFUL_ID, stateIdExpression = "#terminateId")
+ public SuccessResponse> getTerminateScheduleResult(String terminateId) {
+ return Responses.ok(scheduleService.getTerminateScheduleResult(terminateId));
+ }
+
+ @RequestMapping(value = "/schedules/asyncTerminateLog", method = RequestMethod.GET)
+ @StatefulRoute(stateName = StateName.UUID_STATEFUL_ID, stateIdExpression = "#terminateId")
+ public SuccessResponse getTerminateScheduleLog(String terminateId) {
+ return Responses.ok(scheduleService.getTerminateLog(terminateId));
+ }
+
@RequestMapping(value = "/schedules/stats", method = RequestMethod.GET)
public ListResponse getScheduleStats(
@RequestParam(required = false, name = "types") Set types,
diff --git a/server/odc-server/src/main/resources/log4j2.xml b/server/odc-server/src/main/resources/log4j2.xml
index cc19b190f0..b5bb5c0c3f 100644
--- a/server/odc-server/src/main/resources/log4j2.xml
+++ b/server/odc-server/src/main/resources/log4j2.xml
@@ -918,6 +918,10 @@
+
+
+
+
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/config/ScheduleConfiguration.java b/server/odc-service/src/main/java/com/oceanbase/odc/config/ScheduleConfiguration.java
index 048c3ff01b..0f7f6e599a 100644
--- a/server/odc-service/src/main/java/com/oceanbase/odc/config/ScheduleConfiguration.java
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/config/ScheduleConfiguration.java
@@ -311,8 +311,8 @@ public ThreadPoolTaskExecutor queryProfileMonitorExecutor() {
return executor;
}
- @Bean(name = "scheduleImportExecutor")
- public ThreadPoolTaskExecutor scheduleImportExecutor() {
+ @Bean(name = "commonAsyncTaskExecutor")
+ public ThreadPoolTaskExecutor commonAsyncTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
int minPoolSize = Math.max(SystemUtils.availableProcessors(), 4);
executor.setCorePoolSize(minPoolSize);
@@ -324,7 +324,7 @@ public ThreadPoolTaskExecutor scheduleImportExecutor() {
executor.setTaskDecorator(new TraceDecorator<>());
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
executor.initialize();
- log.info("scheduleImportExecutor initialized");
+ log.info("commonAsyncTaskExecutor initialized");
return executor;
}
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/collaboration/project/ProjectService.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/collaboration/project/ProjectService.java
index b89e29400f..0f580227bd 100644
--- a/server/odc-service/src/main/java/com/oceanbase/odc/service/collaboration/project/ProjectService.java
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/collaboration/project/ProjectService.java
@@ -85,6 +85,7 @@
import com.oceanbase.odc.service.iam.ProjectPermissionValidator;
import com.oceanbase.odc.service.iam.ResourceRoleService;
import com.oceanbase.odc.service.iam.UserOrganizationService;
+import com.oceanbase.odc.service.iam.UserService;
import com.oceanbase.odc.service.iam.auth.AuthenticationFacade;
import com.oceanbase.odc.service.iam.auth.AuthorizationFacade;
import com.oceanbase.odc.service.iam.model.User;
@@ -164,7 +165,8 @@ public class ProjectService {
@Autowired
@Lazy
private FlowInstanceService flowInstanceService;
-
+ @Autowired
+ private UserService userService;
@Value("${odc.integration.bastion.enabled:false}")
private boolean bastionEnabled;
@@ -340,7 +342,11 @@ public Page list(@Valid QueryProjectParams params, @NotNull Pageable pa
@SkipAuthorize("odc internal usage")
public List listByIds(@NotEmpty Set ids) {
- return repository.findAllById(ids).stream().map(this::entityToModel).collect(Collectors.toList());
+ List projects =
+ repository.findAllById(ids).stream().map(projectMapper::entityToModel).collect(Collectors.toList());
+ userService.assignInnerUserByCreatorId(projects, c -> c.getCreator().getId(), Project::setCreator);
+ userService.assignInnerUserByCreatorId(projects, c -> c.getLastModifier().getId(), Project::setCreator);
+ return projects;
}
private Page innerList(@Valid QueryProjectParams params, @NotNull Pageable pageable,
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/common/FutureCache.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/common/FutureCache.java
index 91503c0147..24eab9b127 100644
--- a/server/odc-service/src/main/java/com/oceanbase/odc/service/common/FutureCache.java
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/common/FutureCache.java
@@ -32,6 +32,7 @@ public class FutureCache {
private final Cache> tempId2Future =
Caffeine.newBuilder().expireAfterWrite(30, TimeUnit.MINUTES)
+ .maximumSize(1000L)
.removalListener((String key, Future> future, RemovalCause cause) -> {
if (future != null) {
future.cancel(true);
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/connection/database/DatabaseService.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/connection/database/DatabaseService.java
index 4704f56ed0..36591aba14 100644
--- a/server/odc-service/src/main/java/com/oceanbase/odc/service/connection/database/DatabaseService.java
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/connection/database/DatabaseService.java
@@ -38,6 +38,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
+import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -1299,4 +1300,18 @@ private void deleteDatabaseIfInstanceNotExists(Long connectionId, OrganizationTy
}
}
+
+ public void assignDatabaseById(List content, Function databaseIdProvider,
+ BiConsumer databaseSetter) {
+ List databaseIds = content.stream().map(databaseIdProvider).collect(
+ Collectors.toList());
+ List entities = databaseRepository.findByIdIn(databaseIds);
+ List databases = entitiesToModels(new PageImpl<>(entities), false).getContent();
+ Map idDatabaseEntityMap = databases.stream().collect(
+ Collectors.toMap(Database::getId, t -> t));
+ content.forEach(c -> {
+ Database database = idDatabaseEntityMap.get(databaseIdProvider.apply(c));
+ databaseSetter.accept(c, database);
+ });
+ }
}
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/dlm/DLMJobStore.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/dlm/DLMJobStore.java
index 741d050819..63f8c2c39b 100644
--- a/server/odc-service/src/main/java/com/oceanbase/odc/service/dlm/DLMJobStore.java
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/dlm/DLMJobStore.java
@@ -154,6 +154,7 @@ public void storeTaskGenerator(TaskGenerator taskGenerator) throws SQLException
ps.setString(9, taskGenerator.getPartitionSavePoint());
ps.setString(10, JsonUtils.toJson(taskGenerator.getPartName2MinKey()));
ps.setString(11, JsonUtils.toJson(taskGenerator.getPartName2MaxKey()));
+ ps.execute();
}
}
}
@@ -226,6 +227,7 @@ public void storeTaskMeta(TaskMeta taskMeta) throws SQLException {
ps.setString(7,
taskMeta.getCursorPrimaryKey() == null ? "" : taskMeta.getCursorPrimaryKey().toSqlString());
ps.setString(8, taskMeta.getPartitionName());
+ ps.execute();
}
}
}
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/dlm/DLMService.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/dlm/DLMService.java
index f1211a4194..b47ac9994a 100644
--- a/server/odc-service/src/main/java/com/oceanbase/odc/service/dlm/DLMService.java
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/dlm/DLMService.java
@@ -130,6 +130,11 @@ public List findByScheduleTaskId(Long scheduleTaskId) {
@SkipAuthorize("odc internal usage")
public TaskStatus getFinalTaskStatus(Long scheduleTaskId) {
List dlmTableUnits = findByScheduleTaskId(scheduleTaskId);
+ return getFinalTaskStatus(dlmTableUnits);
+ }
+
+ @SkipAuthorize("odc internal usage")
+ public TaskStatus getFinalTaskStatus(List dlmTableUnits) {
Set collect = dlmTableUnits.stream().map(DlmTableUnit::getStatus).collect(
Collectors.toSet());
// If the tables do not exist or any table fails, the task is considered a failure.
@@ -148,4 +153,5 @@ public TaskStatus getFinalTaskStatus(Long scheduleTaskId) {
return TaskStatus.DONE;
}
+
}
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/exporter/impl/JsonExtractor.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/exporter/impl/JsonExtractor.java
index 8ddfaa5b0d..2213a57b69 100644
--- a/server/odc-service/src/main/java/com/oceanbase/odc/service/exporter/impl/JsonExtractor.java
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/exporter/impl/JsonExtractor.java
@@ -143,7 +143,9 @@ public ExportRowDataReader getRowDataReader() throws IOException {
ObjectMapper objectMapper = new ObjectMapper(jsonFactory);
File configJson = getConfigJson(this.tempFilePath);
Verify.notNull(configJson, "Invalid archived file");
- try (InputStream inputStream = Files.newInputStream(configJson.toPath())) {
+ InputStream inputStream = null;
+ try {
+ inputStream = Files.newInputStream(configJson.toPath());
JsonParser jsonParser = jsonFactory.createParser(inputStream);
JsonToken jsonToken = jsonParser.nextToken();
@@ -177,6 +179,12 @@ public ExportRowDataReader getRowDataReader() throws IOException {
throw new IllegalStateException("Expected data to be an Array");
}
return new JsonRowDataReader(properties, jsonParser, objectMapper, exportedFile.getSecret(), tempFilePath);
+ } catch (Exception e) {
+ if (inputStream != null) {
+ inputStream.close();
+ }
+ log.error("Get row data reader failed", e);
+ throw e;
}
}
@@ -231,12 +239,19 @@ public ExportProperties getProperties() {
@Override
public R readRow(Class rowDataClass) throws IOException {
- JsonNode jsonNode = readRow();
- R rowData = objectMapper.convertValue(jsonNode, rowDataClass);
- if (rowData != null && encryptKey != null) {
- rowData.decrypt(encryptKey);
+ try {
+ JsonNode jsonNode = readRow();
+ R rowData = objectMapper.convertValue(jsonNode, rowDataClass);
+ log.info("read rowData={}", rowData);
+ if (rowData != null && encryptKey != null) {
+ rowData.decrypt(encryptKey);
+ }
+ return rowData;
+ } catch (Exception e) {
+ log.error("read row failed", e);
+ throw e;
}
- return rowData;
+
}
@Override
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/FlowInstanceService.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/FlowInstanceService.java
index 220ad353cf..4fdcdd9de3 100644
--- a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/FlowInstanceService.java
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/FlowInstanceService.java
@@ -30,6 +30,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -48,6 +49,7 @@
import org.flowable.engine.history.HistoricProcessInstanceQuery;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
import org.springframework.context.i18n.LocaleContextHolder;
import org.springframework.data.domain.Page;
@@ -59,6 +61,7 @@
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
+import org.springframework.transaction.support.TransactionTemplate;
import org.springframework.validation.annotation.Validated;
import com.fasterxml.jackson.core.type.TypeReference;
@@ -67,6 +70,7 @@
import com.oceanbase.odc.common.i18n.I18n;
import com.oceanbase.odc.common.json.JsonUtils;
import com.oceanbase.odc.common.lang.Holder;
+import com.oceanbase.odc.common.task.RouteLogCallable;
import com.oceanbase.odc.common.util.ObjectUtil;
import com.oceanbase.odc.common.util.StringUtils;
import com.oceanbase.odc.core.authority.util.SkipAuthorize;
@@ -103,6 +107,7 @@
import com.oceanbase.odc.plugin.task.api.datatransfer.model.DataTransferConfig;
import com.oceanbase.odc.service.collaboration.environment.EnvironmentService;
import com.oceanbase.odc.service.collaboration.project.ProjectService;
+import com.oceanbase.odc.service.common.FutureCache;
import com.oceanbase.odc.service.common.response.SuccessResponse;
import com.oceanbase.odc.service.common.util.SpringContextUtil;
import com.oceanbase.odc.service.common.util.SqlUtils;
@@ -131,6 +136,7 @@
import com.oceanbase.odc.service.flow.instance.FlowInstanceConfigurer;
import com.oceanbase.odc.service.flow.instance.FlowTaskInstance;
import com.oceanbase.odc.service.flow.listener.AutoApproveUserTaskListener;
+import com.oceanbase.odc.service.flow.model.BatchTerminateFlowResult;
import com.oceanbase.odc.service.flow.model.CreateFlowInstanceReq;
import com.oceanbase.odc.service.flow.model.ExecutionStrategyConfig;
import com.oceanbase.odc.service.flow.model.FlowInstanceDetailResp;
@@ -158,6 +164,7 @@
import com.oceanbase.odc.service.iam.auth.AuthenticationFacade;
import com.oceanbase.odc.service.iam.model.User;
import com.oceanbase.odc.service.iam.model.UserResourceRole;
+import com.oceanbase.odc.service.iam.util.SecurityContextUtils;
import com.oceanbase.odc.service.integration.IntegrationService;
import com.oceanbase.odc.service.integration.client.ApprovalClient;
import com.oceanbase.odc.service.integration.model.ApprovalProperties;
@@ -186,9 +193,12 @@
import com.oceanbase.odc.service.regulation.risklevel.model.RiskLevelDescriberIdentifier;
import com.oceanbase.odc.service.schedule.ScheduleService;
import com.oceanbase.odc.service.schedule.model.ScheduleStatus;
+import com.oceanbase.odc.service.state.StatefulUuidStateIdGenerator;
import com.oceanbase.odc.service.task.TaskService;
import com.oceanbase.odc.service.task.base.precheck.PreCheckRiskLevel;
+import com.oceanbase.odc.service.task.executor.logger.LogUtils;
import com.oceanbase.odc.service.task.model.ExecutorInfo;
+import com.oceanbase.odc.service.task.service.SpringTransactionManager;
import com.oceanbase.tools.loaddump.common.enums.ObjectType;
import io.micrometer.core.instrument.Tag;
@@ -277,6 +287,12 @@ public class FlowInstanceService {
@Autowired
private EnvironmentService environmentService;
@Autowired
+ private StatefulUuidStateIdGenerator statefulUuidStateIdGenerator;
+ @Autowired
+ private ThreadPoolTaskExecutor commonAsyncTaskExecutor;
+ @Autowired
+ private FutureCache futureCache;
+ @Autowired
private FlowPermissionHelper flowPermissionHelper;
@Autowired
private MeterManager meterManager;
@@ -288,7 +304,10 @@ public class FlowInstanceService {
private ProjectService projectService;
@Autowired
private NamedParameterJdbcTemplate namedParameterJdbcTemplate;
-
+ @Autowired
+ private TransactionTemplate transactionTemplate;
+ @Value("${odc.log.directory:./log}")
+ private String logPath;
private static final long MAX_EXPORT_OBJECT_COUNT = 10000;
private static final String ODC_SITE_URL = "odc.site.url";
private static final int MAX_APPLY_DATABASE_SIZE = 10;
@@ -725,6 +744,53 @@ public FlowInstanceDetailResp cancelWithoutPermission(@NotNull Long id) {
return cancel(flowInstance, true);
}
+ public String startBatchCancelFlowInstance(Collection flowInstanceIds) {
+ String terminateId = statefulUuidStateIdGenerator.generateCurrentUserIdStateId("BatchFlowTerminate");
+ User user = authenticationFacade.currentUser();
+ Future> future = commonAsyncTaskExecutor.submit(
+ new RouteLogCallable>("BatchFlowTerminate", terminateId, "terminate") {
+ @Override
+ public List doCall() {
+ SecurityContextUtils.setCurrentUser(user);
+ List results = new ArrayList<>();
+ for (Long id : flowInstanceIds) {
+ try {
+ new SpringTransactionManager(transactionTemplate)
+ .doInTransactionWithoutResult(() -> cancelWithWritePermission(id, false));
+ results.add(BatchTerminateFlowResult.success(id));
+ log.info("Terminate flow success, flowInstanceId={}", id);
+ } catch (Exception e) {
+ log.info("Terminate flow failed, flowInstanceId={}", id, e);
+ results.add(BatchTerminateFlowResult.failed(id, e.getMessage()));
+ }
+ }
+ return results;
+ }
+ });
+ futureCache.put(terminateId, future);
+ return terminateId;
+ }
+
+ public List getBatchCancelResult(String terminateId) {
+ statefulUuidStateIdGenerator.checkCurrentUserId(terminateId);
+ Future> future =
+ (Future>) futureCache.get(terminateId);
+ if (!future.isDone()) {
+ return Collections.emptyList();
+ }
+ try {
+ futureCache.invalid(terminateId);
+ return future.get();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public String getBatchCancelLog(String terminateId) {
+ statefulUuidStateIdGenerator.checkCurrentUserId(terminateId);
+ return LogUtils.getRouteTaskLog(logPath, "BatchFlowTerminate", terminateId, "terminate");
+ }
+
public Map getStatus(Set ids) {
Specification specification = Specification.where(FlowInstanceSpecs.idIn(ids))
.and(FlowInstanceSpecs.organizationIdEquals(authenticationFacade.currentOrganizationId()));
@@ -927,6 +993,23 @@ public TaskEntity getTaskByFlowInstanceId(Long id) {
return taskService.detail(taskId);
}
+ public Map getTaskByFlowInstanceIds(Collection flowInstanceIds) {
+ List serviceTaskInstanceEntities = serviceTaskRepository
+ .findByFlowInstanceIdIn(flowInstanceIds)
+ .stream()
+ .filter(e -> e.getTaskType() != TaskType.GENERATE_ROLLBACK && e.getTaskType() != TaskType.SQL_CHECK
+ && e.getTaskType() != TaskType.PRE_CHECK)
+ .collect(Collectors.toList());
+ Map flowId2taskIds = serviceTaskInstanceEntities.stream().collect(
+ Collectors.toMap(ServiceTaskInstanceEntity::getFlowInstanceId,
+ ServiceTaskInstanceEntity::getTargetTaskId, (exist, duplicate) -> exist));
+ List taskEntities = taskService.findByIds(flowId2taskIds.values());
+ Map idTaskEntityMap =
+ taskEntities.stream().collect(Collectors.toMap(TaskEntity::getId, t -> t));
+ return flowId2taskIds.entrySet().stream()
+ .collect(Collectors.toMap(Entry::getKey, v -> idTaskEntityMap.get(v.getValue())));
+ }
+
private void checkCreateFlowInstancePermission(CreateFlowInstanceReq req) {
if (authenticationFacade.currentUser().getOrganizationType() == OrganizationType.INDIVIDUAL) {
return;
@@ -1524,7 +1607,7 @@ public Set listAlterScheduleIdsByScheduleIdAndStatus(Long scheduleId, Flow
/**
* This is a temporary method that only uses ODC 4.3.4
- *
+ *
* @param params
* @return
*/
@@ -1582,7 +1665,7 @@ public int getPartitionPlanCount(@NotNull InnerQueryFlowInstanceParams params) {
/**
* This is a temporary method that only uses ODC 4.3.4
- *
+ *
* @param params
* @return
*/
@@ -1613,7 +1696,7 @@ private List listPartitionPlanSubTaskStates(@NonNull InnerQue
/**
* This is a temporary method that only uses ODC 4.3.4
- *
+ *
* @param params
* @return
*/
@@ -1646,7 +1729,7 @@ private List listSqlPlanSubTaskStates(@NonNull InnerQueryFlow
/**
* This is a temporary method that only uses ODC 4.3.4
- *
+ *
* @param params
* @return
*/
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/FlowTaskInstanceService.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/FlowTaskInstanceService.java
index 617e28a50e..daa6d1d2a2 100644
--- a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/FlowTaskInstanceService.java
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/FlowTaskInstanceService.java
@@ -34,6 +34,7 @@
import java.util.function.Predicate;
import java.util.stream.Collectors;
+import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;
import org.apache.commons.collections4.CollectionUtils;
@@ -783,7 +784,10 @@ private void setDownloadUrlsIfNecessary(Long taskId, List extends FlowTaskResu
return;
}
for (FlowTaskResult result : results) {
- TaskDownloadUrls urls = databaseChangeOssUrlCache.get(taskId);
+ TaskDownloadUrls urls = getTaskDownloadUrls(taskId);
+ if (urls == null) {
+ return;
+ }
if (result instanceof AbstractFlowTaskResult) {
((AbstractFlowTaskResult) result)
.setFullLogDownloadUrl(urls.getLogDownloadUrl());
@@ -798,4 +802,14 @@ private void setDownloadUrlsIfNecessary(Long taskId, List extends FlowTaskResu
}
}
+ @Nullable
+ private TaskDownloadUrls getTaskDownloadUrls(Long taskId) {
+ try {
+ return databaseChangeOssUrlCache.get(taskId);
+ } catch (NotFoundException e) {
+ log.warn("Failed to retrieve task download urls from cloud object storage, taskId={}", taskId, e);
+ return null;
+ }
+ }
+
}
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/factory/FlowResponseMapperFactory.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/factory/FlowResponseMapperFactory.java
index 0fbcea2be3..f82367405d 100644
--- a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/factory/FlowResponseMapperFactory.java
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/factory/FlowResponseMapperFactory.java
@@ -40,6 +40,7 @@
import com.google.common.collect.Sets;
import com.oceanbase.odc.common.json.JsonUtils;
import com.oceanbase.odc.common.util.StringUtils;
+import com.oceanbase.odc.core.shared.constant.ResourceRoleName;
import com.oceanbase.odc.core.shared.constant.TaskType;
import com.oceanbase.odc.metadb.connection.ConnectionConfigRepository;
import com.oceanbase.odc.metadb.connection.ConnectionEntity;
@@ -75,9 +76,7 @@
import com.oceanbase.odc.service.databasechange.model.DatabaseChangeDatabase;
import com.oceanbase.odc.service.flow.ApprovalPermissionService;
import com.oceanbase.odc.service.flow.instance.FlowInstance;
-import com.oceanbase.odc.service.flow.model.FlowInstanceDetailResp;
import com.oceanbase.odc.service.flow.model.FlowInstanceDetailResp.FlowInstanceMapper;
-import com.oceanbase.odc.service.flow.model.FlowNodeInstanceDetailResp;
import com.oceanbase.odc.service.flow.model.FlowNodeInstanceDetailResp.FlowNodeInstanceMapper;
import com.oceanbase.odc.service.flow.model.FlowNodeStatus;
import com.oceanbase.odc.service.flow.model.FlowTaskExecutionStrategy;
@@ -113,6 +112,8 @@
@Component
public class FlowResponseMapperFactory {
+ private final ConnectionMapper connectionMapper = ConnectionMapper.INSTANCE;
+ private final RiskLevelMapper riskLevelMapper = RiskLevelMapper.INSTANCE;
@Autowired
private UserTaskInstanceRepository userTaskInstanceRepository;
@Autowired
@@ -148,10 +149,6 @@ public class FlowResponseMapperFactory {
@Autowired
private ProjectService projectService;
- private final ConnectionMapper connectionMapper = ConnectionMapper.INSTANCE;
-
- private final RiskLevelMapper riskLevelMapper = RiskLevelMapper.INSTANCE;
-
public FlowNodeInstanceMapper generateNodeMapperByInstances(@NonNull Collection flowInstances,
boolean skipAuth) {
return generateNodeMapper(getLongSet(flowInstances, FlowInstance::getId),
@@ -254,10 +251,7 @@ private FlowNodeInstanceMapper generateNodeMapper(@NonNull Collection flow
Specification serviceSpec =
Specification.where(ServiceTaskInstanceSpecs.flowInstanceIdIn(flowInstanceIds));
List serviceEntities = serviceTaskRepository.findAll(serviceSpec);
- Set taskIds = serviceEntities.stream().filter(entity -> entity.getTargetTaskId() != null)
- .map(ServiceTaskInstanceEntity::getTargetTaskId).collect(Collectors.toSet());
- Map taskId2TaskEntity = listTasksByTaskIdsWithoutPermissionCheck(taskIds).stream()
- .collect(Collectors.toMap(TaskEntity::getId, taskEntity -> taskEntity));
+ Map taskId2TaskEntity = getTaskEntityMap(serviceEntities);
return FlowNodeInstanceMapper.builder()
.getCandidatesByApprovalId(approvalId2Candidates::get)
@@ -287,50 +281,24 @@ private FlowInstanceMapper generateMapper(@NonNull Collection flowInstance
if (flowInstanceIds.isEmpty()) {
return FlowInstanceMapper.builder().build();
}
- Specification serviceSpec =
- Specification.where(ServiceTaskInstanceSpecs.flowInstanceIdIn(flowInstanceIds));
- List serviceEntities = serviceTaskRepository.findAll(serviceSpec);
-
- Map> flowInstanceId2ExecutionTime = serviceEntities.stream()
- .filter(entity -> entity.getExecutionTime() != null)
- .collect(Collectors.groupingBy(ServiceTaskInstanceEntity::getFlowInstanceId,
- Collectors.mapping(ServiceTaskInstanceEntity::getExecutionTime, Collectors.toList())));
-
- Map> flowInstanceId2ExecutionStrategy = serviceEntities.stream()
- .filter(e -> e.getTaskType().needForExecutionStrategy())
- .collect(Collectors.groupingBy(ServiceTaskInstanceEntity::getFlowInstanceId,
- Collectors.mapping(ServiceTaskInstanceEntity::getStrategy, Collectors.toList())));
-
- Map parentInstanceIdMap = flowInstanceRepository
- .findByParentInstanceIdIn(flowInstanceIds)
- .stream().collect(
- Collectors.toMap(ParentInstanceIdCount::getParentInstanceId, ParentInstanceIdCount::getCount));
-
- Map flowInstanceId2Rollbackable = flowInstanceIds.stream().collect(Collectors
- .toMap(Function.identity(), id -> MoreObjects.firstNonNull(parentInstanceIdMap.get(id), 0) == 0));
-
+ List serviceEntities =
+ serviceTaskRepository.findByFlowInstanceIdIn(new HashSet<>(flowInstanceIds));
+ Map> flowInstanceId2ExecutionTime = mapExecutionTimes(serviceEntities);
+ Map> flowInstanceId2ExecutionStrategy =
+ mapExecutionStrategies(serviceEntities);
+ Map flowInstanceId2Rollbackable = mapRollbackableStatus(flowInstanceIds);
/**
+ *
* In order to improve the interface efficiency, it is necessary to find out the task entity
* corresponding to the process instance at one time
*/
Map> flowInstanceId2Tasks = new HashMap<>();
- Set taskIds = serviceEntities.stream().filter(entity -> entity.getTargetTaskId() != null)
- .map(ServiceTaskInstanceEntity::getTargetTaskId).collect(Collectors.toSet());
- Map taskId2TaskEntity = listTasksByTaskIdsWithoutPermissionCheck(taskIds).stream()
- .collect(Collectors.toMap(TaskEntity::getId, taskEntity -> taskEntity));
- serviceEntities.stream().filter(entity -> entity.getTargetTaskId() != null).forEach(entity -> {
- Set taskEntities =
- flowInstanceId2Tasks.computeIfAbsent(entity.getFlowInstanceId(), id -> new HashSet<>());
- TaskEntity taskEntity = taskId2TaskEntity.get(entity.getTargetTaskId());
- if (taskEntity != null) {
- taskEntities.add(taskEntity);
- }
- });
+ Map taskId2TaskEntity = getTaskEntityMap(serviceEntities);
+ populateTaskMappings(serviceEntities, flowInstanceId2Tasks, taskId2TaskEntity);
/**
* Get Database associated with each TaskEntity
*/
- Map id2Database = new HashMap<>();
Set databaseIds = taskId2TaskEntity.values().stream()
.map(TaskEntity::getDatabaseId)
.filter(Objects::nonNull).collect(Collectors.toSet());
@@ -340,59 +308,23 @@ private FlowInstanceMapper generateMapper(@NonNull Collection flowInstance
databaseIds.addAll(collectApplyDatabasePermissionDatabaseIds(taskId2TaskEntity));
databaseIds.addAll(collectApplyTablePermissionDatabaseIds(taskId2TaskEntity));
Set projectIds = new HashSet<>();
- Map id2Project = new HashMap<>();
-
- if (CollectionUtils.isNotEmpty(databaseIds)) {
- id2Database = databaseService.listDatabasesByIds(databaseIds).stream()
- .collect(Collectors.toMap(Database::getId, database -> database));
- projectIds.addAll(id2Database.values().stream().map(db -> db.getProject().getId())
- .filter(Objects::nonNull).collect(Collectors.toSet()));
- }
- projectIds.addAll(collectApplyProjectIds(taskId2TaskEntity));
- if (CollectionUtils.isNotEmpty(projectIds)) {
- id2Project = projectService.listByIds(projectIds).stream()
- .collect(Collectors.toMap(Project::getId, project -> project, (a, b) -> a));
- }
- /**
- * find the ConnectionConfig associated with each Database
- */
- Set connectionIds = id2Database.values().stream()
- .filter(e -> e.getDataSource() != null && e.getDataSource().getId() != null)
- .map(e -> e.getDataSource().getId()).collect(Collectors.toSet());
- Map id2Connection = listConnectionsByConnectionIdsWithoutPermissionCheck(connectionIds)
- .stream().collect(Collectors.toMap(ConnectionEntity::getId, connectionMapper::entityToModel));
- id2Database.values().forEach(database -> {
- if (id2Connection.containsKey(database.getDataSource().getId())) {
- database.setDataSource(id2Connection.get(database.getDataSource().getId()));
- }
- });
-
+ Map id2Database = getIdDatabaseMapAndFillProjectIds(
+ databaseIds, projectIds, taskId2TaskEntity);
+ Map id2Project = getIdProjectMap(projectIds, skipAuth);
/**
* list candidates
*/
Map> candidatesByFlowInstanceIds =
approvalPermissionService.getCandidatesByFlowInstanceIds(flowInstanceIds);
-
/**
* In order to improve the interface efficiency, it is necessary to find out the user entity
* corresponding to the process instance at one time
*/
- Set userIds = flowInstanceId2Tasks.values().stream()
- .flatMap(Collection::stream)
- .filter(entity -> entity.getCreatorId() != null)
- .map(TaskEntity::getCreatorId).collect(Collectors.toSet());
- userIds.addAll(creatorIds);
- Map userId2User = listUsersByUserIds(userIds)
- .stream().collect(Collectors.toMap(UserEntity::getId, entity -> entity));
-
+ Map userId2User = getUserId2EntityMap(creatorIds,
+ flowInstanceId2Tasks);
Map> userId2Roles = getUserId2Roles(userId2User.keySet(), skipAuth);
+ Set approvableFlowInstanceIds = getApprovableFlowInstanceIds(skipAuth);
- Set approvableFlowInstanceIds = skipAuth ? Sets.newHashSet()
- : approvalPermissionService.getApprovableApprovalInstances()
- .stream()
- .filter(entity -> FlowNodeStatus.EXECUTING == entity.getStatus()
- || entity.getStatus() == FlowNodeStatus.WAIT_FOR_CONFIRM)
- .map(UserTaskInstanceEntity::getFlowInstanceId).collect(Collectors.toSet());
return FlowInstanceMapper.builder()
.ifRollbackable(flowInstanceId2Rollbackable::get)
.ifApprovable(approvableFlowInstanceIds::contains)
@@ -409,6 +341,124 @@ private FlowInstanceMapper generateMapper(@NonNull Collection flowInstance
.build();
}
+ private void populateTaskMappings(List serviceEntities,
+ Map> flowInstanceId2Tasks, Map taskId2TaskEntity) {
+ serviceEntities.stream().filter(entity -> entity.getTargetTaskId() != null).forEach(entity -> {
+ Set taskEntities =
+ flowInstanceId2Tasks.computeIfAbsent(entity.getFlowInstanceId(), id -> new HashSet<>());
+ TaskEntity taskEntity = taskId2TaskEntity.get(entity.getTargetTaskId());
+ if (taskEntity != null) {
+ taskEntities.add(taskEntity);
+ }
+ });
+ }
+
+ private Map> mapExecutionStrategies(
+ List serviceEntities) {
+ return serviceEntities.stream()
+ .filter(e -> e.getTaskType().needForExecutionStrategy())
+ .collect(Collectors.groupingBy(ServiceTaskInstanceEntity::getFlowInstanceId,
+ Collectors.mapping(ServiceTaskInstanceEntity::getStrategy, Collectors.toList())));
+ }
+
+ private Map> mapExecutionTimes(List serviceEntities) {
+ return serviceEntities.stream()
+ .filter(entity -> entity.getExecutionTime() != null)
+ .collect(Collectors.groupingBy(ServiceTaskInstanceEntity::getFlowInstanceId,
+ Collectors.mapping(ServiceTaskInstanceEntity::getExecutionTime, Collectors.toList())));
+ }
+
+ private Map getIdProjectMap(Set projectIds, boolean skipAuth) {
+ Map id2Project = new HashMap<>();
+ if (CollectionUtils.isNotEmpty(projectIds)) {
+ List projects = projectService.listByIds(projectIds);
+ if (!skipAuth) {
+ Map> projectId2ResourceRoleNames =
+ projectService.getProjectId2ResourceRoleNames();
+ projects.forEach(p -> p.setCurrentUserResourceRoles(
+ projectId2ResourceRoleNames.getOrDefault(p.getId(),
+ Collections.emptySet())));
+ id2Project = projects.stream()
+ .collect(Collectors.toMap(Project::getId, project -> project, (a, b) -> a));
+ }
+ }
+ return id2Project;
+ }
+
+ private Map getIdDatabaseMapAndFillProjectIds(Set databaseIds, Set projectIds,
+ Map taskId2TaskEntity) {
+ Map id2Database = new HashMap<>();
+ if (CollectionUtils.isNotEmpty(databaseIds)) {
+ id2Database = databaseService.listDatabasesByIds(databaseIds).stream()
+ .collect(Collectors.toMap(Database::getId, database -> database));
+ populateDatasourceToDatabase(id2Database);
+ projectIds.addAll(id2Database.values().stream().map(db -> db.getProject().getId())
+ .filter(Objects::nonNull).collect(Collectors.toSet()));
+ }
+ projectIds.addAll(collectApplyProjectIds(taskId2TaskEntity));
+ return id2Database;
+ }
+
+ private Set getApprovableFlowInstanceIds(boolean skipAuth) {
+ return skipAuth ? Sets.newHashSet()
+ : approvalPermissionService.getApprovableApprovalInstances()
+ .stream()
+ .filter(entity -> FlowNodeStatus.EXECUTING == entity.getStatus()
+ || entity.getStatus() == FlowNodeStatus.WAIT_FOR_CONFIRM)
+ .map(UserTaskInstanceEntity::getFlowInstanceId).collect(Collectors.toSet());
+ }
+
+ private Map getUserId2EntityMap(Set creatorIds,
+ Map> flowInstanceId2Tasks) {
+ Set userIds = flowInstanceId2Tasks.values().stream()
+ .flatMap(Collection::stream)
+ .map(TaskEntity::getCreatorId)
+ .filter(Objects::nonNull).collect(Collectors.toSet());
+ userIds.addAll(creatorIds);
+ return listUsersByUserIds(userIds)
+ .stream().collect(Collectors.toMap(UserEntity::getId, entity -> entity));
+ }
+
+ private void populateDatasourceToDatabase(Map id2Database) {
+ Set connectionIds = id2Database.values().stream()
+ .filter(e -> e.getDataSource() != null && e.getDataSource().getId() != null)
+ .map(e -> e.getDataSource().getId()).collect(Collectors.toSet());
+ Map id2Connection = listConnectionsByConnectionIdsWithoutPermissionCheck(connectionIds)
+ .stream().collect(Collectors.toMap(ConnectionEntity::getId, connectionMapper::entityToModel));
+ id2Database.values().forEach(database -> {
+ if (id2Connection.containsKey(database.getDataSource().getId())) {
+ database.setDataSource(id2Connection.get(database.getDataSource().getId()));
+ }
+ });
+ }
+
+ private Set getDatabaseIds(Map taskId2TaskEntity) {
+ Set databaseIds = taskId2TaskEntity.values().stream()
+ .map(TaskEntity::getDatabaseId)
+ .filter(Objects::nonNull).collect(Collectors.toSet());
+
+ databaseIds.addAll(collectMultiDatabaseChangeDatabaseIds(taskId2TaskEntity));
+ databaseIds.addAll(collectDBStructureComparisonDatabaseIds(taskId2TaskEntity));
+ return databaseIds;
+ }
+
+ private Map getTaskEntityMap(List serviceEntities) {
+ Set taskIds = serviceEntities.stream().map(ServiceTaskInstanceEntity::getTargetTaskId)
+ .filter(Objects::nonNull).collect(Collectors.toSet());
+ return listTasksByTaskIdsWithoutPermissionCheck(taskIds).stream()
+ .collect(Collectors.toMap(TaskEntity::getId, taskEntity -> taskEntity));
+ }
+
+ private Map mapRollbackableStatus(Collection flowInstanceIds) {
+ Map parentInstanceIdMap = flowInstanceRepository
+ .findByParentInstanceIdIn(flowInstanceIds)
+ .stream().collect(
+ Collectors.toMap(ParentInstanceIdCount::getParentInstanceId, ParentInstanceIdCount::getCount));
+
+ return flowInstanceIds.stream().collect(Collectors
+ .toMap(Function.identity(), id -> MoreObjects.firstNonNull(parentInstanceIdMap.get(id), 0) == 0));
+ }
+
public Map> getUserId2Roles(@NonNull Collection userIds, boolean skipAuth) {
List userRoleEntities = skipAuth ? userRoleRepository.findByRoleIdIn(userIds)
: userRoleRepository.findByOrganizationIdAndUserIdIn(authenticationFacade.currentOrganizationId(),
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/model/BatchTerminateFlowResult.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/model/BatchTerminateFlowResult.java
new file mode 100644
index 0000000000..da107cd5e3
--- /dev/null
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/model/BatchTerminateFlowResult.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright (c) 2023 OceanBase.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.oceanbase.odc.service.flow.model;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class BatchTerminateFlowResult {
+ private Boolean terminateSucceed;
+ private Long flowInstanceId;
+ private String failReason;
+
+ public static BatchTerminateFlowResult success(Long id) {
+ return new BatchTerminateFlowResult(true, id, null);
+ }
+
+ public static BatchTerminateFlowResult failed(Long id, String failReason) {
+ return new BatchTerminateFlowResult(false, id, failReason);
+
+ }
+
+}
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/iam/UserService.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/iam/UserService.java
index 8aa6a87d5c..d9373d24f4 100644
--- a/server/odc-service/src/main/java/com/oceanbase/odc/service/iam/UserService.java
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/iam/UserService.java
@@ -98,6 +98,7 @@
import com.oceanbase.odc.metadb.iam.UserRoleRepository;
import com.oceanbase.odc.metadb.iam.UserSpecs;
import com.oceanbase.odc.service.automation.model.TriggerEvent;
+import com.oceanbase.odc.service.common.model.InnerUser;
import com.oceanbase.odc.service.common.response.CustomPage;
import com.oceanbase.odc.service.common.response.PaginatedData;
import com.oceanbase.odc.service.common.util.SpringContextUtil;
@@ -1082,6 +1083,20 @@ public void assignCreatorNameByCreatorId(Collection elements, Function void assignInnerUserByCreatorId(List content, Function userIdProvider,
+ BiConsumer innerUserSetter) {
+ List userIds = content.stream().map(userIdProvider).collect(
+ Collectors.toList());
+ List entities = userRepository.findByIdIn(userIds);
+ Map idUserEntityMap = entities.stream().collect(
+ Collectors.toMap(UserEntity::getId, t -> t));
+ content.forEach(c -> {
+ UserEntity userEntity = idUserEntityMap.get(userIdProvider.apply(c));
+ innerUserSetter.accept(c, new InnerUser(userEntity, null));
+ });
+ }
+
@SkipAuthorize("odc internal usage")
public List listByBoundOrganizationId(@NotNull Long organizationId) {
return userRepository.findByBoundOrganization(organizationId).stream().map(User::new)
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/onlineschemachange/oscfms/action/oms/ProjectStepResultChecker.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/onlineschemachange/oscfms/action/oms/ProjectStepResultChecker.java
index 3bfec747fc..216db110e5 100644
--- a/server/odc-service/src/main/java/com/oceanbase/odc/service/onlineschemachange/oscfms/action/oms/ProjectStepResultChecker.java
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/onlineschemachange/oscfms/action/oms/ProjectStepResultChecker.java
@@ -171,11 +171,12 @@ protected boolean checkStepFinished(OmsStepName name) {
switch (name) {
case INCR_TRANSFER:
Long chkInTimestamp = progressResponse.getIncrSyncCheckpoint();
- return status == OmsStepStatus.MONITORING && competedFunc.apply(progress)
- // why set check value to 25 seconds. cause oms collect checkpoint every 10 seconds and oms writer
- // save checkpoint
- // every 10 seconds
- // max gap time is 20, we set to 25 to let it pass.
+ return status == OmsStepStatus.MONITORING
+ // why set check value to 25 seconds. cause oms collect checkpoint every 10 seconds and oms
+ // writer
+ // save checkpoint
+ // every 10 seconds
+ // max gap time is 20, we set to 25 to let it pass.
&& (null != chkInTimestamp && (chkInTimestamp > currentSeconds
|| Math.abs(currentSeconds - chkInTimestamp) <= 25));
case FULL_VERIFIER:
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/PartitionPlanScheduleService.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/PartitionPlanScheduleService.java
index 40c0f999d5..4fa93543ae 100644
--- a/server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/PartitionPlanScheduleService.java
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/PartitionPlanScheduleService.java
@@ -28,7 +28,9 @@
import org.apache.commons.lang3.Validate;
import org.quartz.SchedulerException;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
+import org.springframework.transaction.support.TransactionTemplate;
import com.fasterxml.jackson.core.type.TypeReference;
import com.oceanbase.odc.common.json.JsonUtils;
@@ -44,6 +46,7 @@
import com.oceanbase.odc.metadb.partitionplan.PartitionPlanTablePartitionKeyRepository;
import com.oceanbase.odc.metadb.partitionplan.PartitionPlanTableRepository;
import com.oceanbase.odc.metadb.schedule.ScheduleEntity;
+import com.oceanbase.odc.metadb.task.TaskEntity;
import com.oceanbase.odc.service.connection.database.DatabaseService;
import com.oceanbase.odc.service.connection.database.model.Database;
import com.oceanbase.odc.service.flow.FlowInstanceService;
@@ -55,6 +58,8 @@
import com.oceanbase.odc.service.partitionplan.model.PartitionPlanTableConfig;
import com.oceanbase.odc.service.quartz.model.MisfireStrategy;
import com.oceanbase.odc.service.schedule.ScheduleService;
+import com.oceanbase.odc.service.schedule.export.model.ScheduleTerminateCmd;
+import com.oceanbase.odc.service.schedule.export.model.ScheduleTerminateResult;
import com.oceanbase.odc.service.schedule.model.ScheduleStatus;
import com.oceanbase.odc.service.schedule.model.ScheduleType;
import com.oceanbase.odc.service.schedule.model.TriggerConfig;
@@ -87,9 +92,12 @@ public class PartitionPlanScheduleService {
@Autowired
private PartitionPlanTablePartitionKeyRepository partitionPlanTablePartitionKeyRepository;
@Autowired
+ @Lazy
private FlowInstanceService flowInstanceService;
@Autowired
private FlowInstanceRepository flowInstanceRepository;
+ @Autowired
+ private TransactionTemplate transactionTemplate;
public PartitionPlanConfig getPartitionPlanByFlowInstanceId(@NonNull Long flowInstanceId) {
FlowInstanceDetailResp resp = this.flowInstanceService.detail(flowInstanceId);
@@ -208,6 +216,29 @@ public void submit(@NonNull PartitionPlanConfig partitionPlanConfig)
}
}
+ public void processTerminatePartitionPlan(ScheduleTerminateCmd cmd, List results) {
+ Map flowInstanceId2TaskEntity = flowInstanceService.getTaskByFlowInstanceIds(cmd.getIds());
+ for (Map.Entry entry : flowInstanceId2TaskEntity.entrySet()) {
+ Long flowInstanceId = entry.getKey();
+ TaskEntity taskEntity = entry.getValue();
+ try {
+ transactionTemplate.executeWithoutResult((status) -> {
+ try {
+ disablePartitionPlan(taskEntity.getDatabaseId());
+ } catch (Exception e) {
+ status.setRollbackOnly();
+ throw new RuntimeException(e);
+ }
+ });
+ results.add(ScheduleTerminateResult.ofSuccess(cmd.getScheduleType(), flowInstanceId));
+ log.info("PartitionPlan task stop success, flowInstanceId={}", flowInstanceId);
+ } catch (Exception e) {
+ results.add(ScheduleTerminateResult.ofFailed(cmd.getScheduleType(), flowInstanceId, e.getMessage()));
+ log.info("PartitionPlan task stop failed, flowInstanceId={}", flowInstanceId, e);
+ }
+ }
+ }
+
@Transactional(rollbackOn = Exception.class)
public void disablePartitionPlan(@NonNull Long databaseId) throws SchedulerException {
List ppEntities = this.partitionPlanRepository.findByDatabaseIdAndEnabled(
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/quartz/executor/AbstractQuartzJob.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/quartz/executor/AbstractQuartzJob.java
index d37fb4ecef..f9b659b64d 100644
--- a/server/odc-service/src/main/java/com/oceanbase/odc/service/quartz/executor/AbstractQuartzJob.java
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/quartz/executor/AbstractQuartzJob.java
@@ -105,6 +105,15 @@ public void run(JobExecutionContext context) {
public OdcJob getOdcJob(JobExecutionContext context) {
JobKey key = context.getJobDetail().getKey();
ScheduleTaskType taskType = ScheduleTaskType.valueOf(key.getGroup());
+ if (taskType == ScheduleTaskType.DATA_ARCHIVE && context.getResult() != null) {
+ try {
+ ScheduleTaskEntity taskEntity = (ScheduleTaskEntity) context.getResult();
+ taskType = ScheduleTaskType.valueOf(taskEntity.getJobGroup());
+ log.info("Recovery task,scheduleTaskId={}", taskEntity.getId());
+ } catch (Exception e) {
+ log.warn("Load history task failed,jobKey={}", key);
+ }
+ }
switch (taskType) {
case SQL_PLAN:
return new SqlPlanJob();
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/ScheduleService.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/ScheduleService.java
index 716967ee1b..81dd9abcfd 100644
--- a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/ScheduleService.java
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/ScheduleService.java
@@ -32,6 +32,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.function.Function;
@@ -57,6 +58,7 @@
import org.springframework.data.domain.Pageable;
import org.springframework.data.jpa.domain.Specification;
import org.springframework.integration.jdbc.lock.JdbcLockRegistry;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionTemplate;
@@ -65,11 +67,13 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import com.oceanbase.odc.common.json.JsonUtils;
+import com.oceanbase.odc.common.task.RouteLogCallable;
import com.oceanbase.odc.common.util.StringUtils;
import com.oceanbase.odc.config.jpa.OdcJpaRepository;
import com.oceanbase.odc.core.alarm.AlarmUtils;
import com.oceanbase.odc.core.authority.util.SkipAuthorize;
import com.oceanbase.odc.core.shared.PreConditions;
+import com.oceanbase.odc.core.shared.Verify;
import com.oceanbase.odc.core.shared.constant.ErrorCodes;
import com.oceanbase.odc.core.shared.constant.FlowStatus;
import com.oceanbase.odc.core.shared.constant.OrganizationType;
@@ -93,6 +97,7 @@
import com.oceanbase.odc.metadb.schedule.ScheduleTaskRepository;
import com.oceanbase.odc.service.collaboration.project.ProjectService;
import com.oceanbase.odc.service.collaboration.project.model.Project;
+import com.oceanbase.odc.service.common.FutureCache;
import com.oceanbase.odc.service.common.util.SpringContextUtil;
import com.oceanbase.odc.service.connection.ConnectionService;
import com.oceanbase.odc.service.connection.database.DatabaseService;
@@ -114,11 +119,15 @@
import com.oceanbase.odc.service.iam.auth.AuthenticationFacade;
import com.oceanbase.odc.service.iam.model.Organization;
import com.oceanbase.odc.service.iam.model.User;
+import com.oceanbase.odc.service.iam.util.SecurityContextUtils;
import com.oceanbase.odc.service.objectstorage.ObjectStorageFacade;
+import com.oceanbase.odc.service.partitionplan.PartitionPlanScheduleService;
import com.oceanbase.odc.service.quartz.QuartzJobServiceProxy;
import com.oceanbase.odc.service.quartz.model.MisfireStrategy;
import com.oceanbase.odc.service.quartz.util.QuartzCronExpressionUtils;
import com.oceanbase.odc.service.regulation.approval.ApprovalFlowConfigSelector;
+import com.oceanbase.odc.service.schedule.export.model.ScheduleTerminateCmd;
+import com.oceanbase.odc.service.schedule.export.model.ScheduleTerminateResult;
import com.oceanbase.odc.service.schedule.factory.ScheduleResponseMapperFactory;
import com.oceanbase.odc.service.schedule.flowtask.AlterScheduleParameters;
import com.oceanbase.odc.service.schedule.flowtask.ApprovalFlowClient;
@@ -154,12 +163,16 @@
import com.oceanbase.odc.service.schedule.model.TriggerStrategy;
import com.oceanbase.odc.service.schedule.model.UpdateScheduleReq;
import com.oceanbase.odc.service.schedule.processor.ScheduleChangePreprocessor;
+import com.oceanbase.odc.service.schedule.util.BatchSchedulePermissionValidator;
import com.oceanbase.odc.service.schedule.util.ScheduleDescriptionGenerator;
import com.oceanbase.odc.service.sqlplan.model.SqlPlanParameters;
+import com.oceanbase.odc.service.state.StatefulUuidStateIdGenerator;
import com.oceanbase.odc.service.task.constants.JobParametersKeyConstants;
import com.oceanbase.odc.service.task.exception.JobException;
+import com.oceanbase.odc.service.task.executor.logger.LogUtils;
import com.oceanbase.odc.service.task.model.OdcTaskLogLevel;
import com.oceanbase.odc.service.task.schedule.JobScheduler;
+import com.oceanbase.odc.service.task.service.SpringTransactionManager;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.ObjectUtil;
@@ -177,11 +190,11 @@
@SkipAuthorize
public class ScheduleService {
+ private final ScheduleMapper scheduleMapper = ScheduleMapper.INSTANCE;
@Value("${odc.task.trigger.minimum-interval:600}")
private int minInterval;
@Autowired
private ScheduleRepository scheduleRepository;
-
@Autowired
private ScheduleTaskRepository scheduleTaskRepository;
@Autowired
@@ -189,75 +202,68 @@ public class ScheduleService {
@Autowired
@Qualifier("quartzJobServiceProxy")
private QuartzJobServiceProxy quartzJobService;
-
@Autowired
private ObjectStorageFacade objectStorageFacade;
-
@Autowired
private FlowInstanceRepository flowInstanceRepository;
-
+ @Autowired
+ private PartitionPlanScheduleService partitionPlanScheduleService;
@Autowired
private ScheduleTaskService scheduleTaskService;
-
@Autowired
private ScheduleResponseMapperFactory scheduleResponseMapperFactory;
-
@Autowired
@Lazy
private ProjectService projectService;
-
@Autowired
private ProjectPermissionValidator projectPermissionValidator;
-
@Autowired
private ApprovalFlowConfigSelector approvalFlowConfigSelector;
-
@Autowired
private DatabaseService databaseService;
-
@Autowired
private EnvironmentRepository environmentRepository;
-
@Autowired
private ScheduleChangeLogService scheduleChangeLogService;
-
@Autowired
private OrganizationService organizationService;
-
@Autowired
private ScheduleChangePreprocessor preprocessor;
-
@Autowired
private LatestTaskMappingRepository latestTaskMappingRepository;
-
@Autowired
private ConnectionService connectionService;
-
@Autowired
private DlmLimiterService dlmLimiterService;
-
@Autowired
private UserService userService;
-
@Autowired
private ScheduledTaskLoggerService scheduledTaskLoggerService;
-
@Autowired
private JdbcLockRegistry jdbcLockRegistry;
-
@Autowired
private ApprovalFlowClient approvalFlowService;
-
@Autowired
private ScheduleDescriptionGenerator descriptionGenerator;
+ @Autowired
+ private StatefulUuidStateIdGenerator statefulUuidStateIdGenerator;
+ @Autowired
+ private ThreadPoolTaskExecutor commonAsyncTaskExecutor;
+ @Autowired
+ private FutureCache futureCache;
+
+ @Autowired
+ private BatchSchedulePermissionValidator batchSchedulePermissionValidator;
+
+ @Value("${odc.log.directory:./log}")
+ private String logPath;
+
@Autowired
private TransactionTemplate txTemplate;
@Autowired
@Lazy
private FlowInstanceService flowInstanceService;
- private final ScheduleMapper scheduleMapper = ScheduleMapper.INSTANCE;
-
@Transactional(rollbackFor = Exception.class)
public List dispatchCreateSchedule(CreateFlowInstanceReq createReq) {
AlterScheduleParameters parameters = (AlterScheduleParameters) createReq.getParameters();
@@ -721,6 +727,108 @@ public void stopTask(Long scheduleId, Long scheduleTaskId) {
}
}
+ public String startTerminateScheduleAndTask(ScheduleTerminateCmd cmd) {
+ batchSchedulePermissionValidator.checkScheduleIdsPermission(cmd.getScheduleType(), cmd.getIds());
+ User user = authenticationFacade.currentUser();
+ String terminateId = statefulUuidStateIdGenerator.generateCurrentUserIdStateId("ScheduleTerminate");
+ Future> future = commonAsyncTaskExecutor.submit(
+ new RouteLogCallable>("ScheduleTerminate", terminateId, "terminate") {
+ @Override
+ public List doCall() {
+ SecurityContextUtils.setCurrentUser(user);
+ return syncTerminateScheduleAndTask(cmd);
+ }
+ });
+ futureCache.put(terminateId, future);
+ return terminateId;
+ }
+
+ public List getTerminateScheduleResult(String terminateId) {
+ statefulUuidStateIdGenerator.checkCurrentUserId(terminateId);
+ Future> future = futureCache.get(terminateId);
+ if (!future.isDone()) {
+ return Collections.emptyList();
+ }
+ try {
+ futureCache.invalid(terminateId);
+ return (List) future.get();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public String getTerminateLog(String terminateId) {
+ statefulUuidStateIdGenerator.checkCurrentUserId(terminateId);
+ return LogUtils.getRouteTaskLog(logPath, "ScheduleTerminate", terminateId, "terminate");
+ }
+
+ public List syncTerminateScheduleAndTask(ScheduleTerminateCmd cmd) {
+ log.info("Start to terminate schedule, type={}, scheduleIds={}", cmd.getScheduleType(), cmd.getIds());
+ List results = new ArrayList<>();
+ if (ScheduleType.PARTITION_PLAN.equals(cmd.getScheduleType())) {
+ // The partition plan uses Schedule, but it is created through flow, and the front end also displays
+ // it through flow.
+ // To ensure that the customer see the same id, the flowInstanceId is used
+ partitionPlanScheduleService.processTerminatePartitionPlan(cmd, results);
+ return results;
+ }
+ List scheduleEntities = scheduleRepository.findByIdIn(cmd.getIds());
+ Verify.verify(Objects.equals(scheduleEntities.size(), cmd.getIds().size()), "Invalid schedule Ids");
+ for (ScheduleEntity schedule : scheduleEntities) {
+ try {
+ Optional latestTaskEntity =
+ scheduleTaskRepository.getLatestScheduleTaskByJobNameAndJobGroup(
+ String.valueOf(schedule.getId()),
+ schedule.getType().name());
+ if (!latestTaskEntity.isPresent() || !latestTaskEntity.get().getStatus().isProcessing()) {
+ innerTerminateInTx(schedule);
+ log.info("Schedule task stop success, scheduleId={}", schedule.getId());
+ results.add(ScheduleTerminateResult.ofSuccess(schedule.getType(), schedule.getId()));
+ continue;
+ }
+ ScheduleTaskEntity scheduleTaskEntity = latestTaskEntity.get();
+ scheduleTaskService.stop(scheduleTaskEntity.getId());
+ final int maxRetryTimes = 30;
+ int retryTimes = 0;
+ while (retryTimes < maxRetryTimes) {
+ latestTaskEntity = scheduleTaskRepository.getLatestScheduleTaskByJobNameAndJobGroup(
+ String.valueOf(schedule.getId()),
+ schedule.getType().name());
+ if (latestTaskEntity.get().getStatus().isTerminated()) {
+ innerTerminateInTx(schedule);
+ results.add(
+ ScheduleTerminateResult.ofSuccess(schedule.getType(), schedule.getId()));
+ log.info("Schedule task stop success, scheduleId={}", schedule.getId());
+ break;
+ }
+ retryTimes++;
+ Thread.sleep(2000);
+ }
+ log.info(
+ "Wait task 60s, still not terminate, please try again. Schedule task stop Failed, scheduleId={}",
+ schedule.getId());
+ results.add(ScheduleTerminateResult.ofFailed(cmd.getScheduleType(), schedule.getId(),
+ "Wait task 60s, still not terminate, please try again."));
+ } catch (Exception e) {
+ log.error("Terminate schedule task failed,scheduleId={}", schedule.getId(), e);
+ results.add(ScheduleTerminateResult.ofFailed(cmd.getScheduleType(), schedule.getId(),
+ e.getMessage()));
+ }
+ }
+ return results;
+ }
+
+ private void innerTerminateInTx(ScheduleEntity schedule) {
+ new SpringTransactionManager(txTemplate)
+ .doInTransactionWithoutResult(() -> {
+ try {
+ innerTerminate(schedule.getId());
+ } catch (SchedulerException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
/**
* @param scheduleId the task must belong to a valid schedule,so this param is not be null.
* @param scheduleTaskId the task uid. Start a paused or pending task.
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/ScheduleExportService.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/ScheduleExportService.java
index d1a119f2c7..fa692d9617 100644
--- a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/ScheduleExportService.java
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/ScheduleExportService.java
@@ -17,39 +17,43 @@
import java.io.File;
import java.util.List;
-import java.util.Set;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.security.crypto.bcrypt.BCryptPasswordEncoder;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
-import com.oceanbase.odc.common.security.PasswordUtils;
+import com.oceanbase.odc.common.task.RouteLogCallable;
import com.oceanbase.odc.core.authority.util.SkipAuthorize;
-import com.oceanbase.odc.core.shared.PreConditions;
import com.oceanbase.odc.core.shared.SingleOrganizationResource;
-import com.oceanbase.odc.core.shared.constant.OrganizationType;
-import com.oceanbase.odc.core.shared.constant.ResourceRoleName;
import com.oceanbase.odc.core.shared.constant.ResourceType;
import com.oceanbase.odc.metadb.flow.FlowInstanceEntity;
+import com.oceanbase.odc.metadb.schedule.ScheduleEntity;
import com.oceanbase.odc.metadb.schedule.ScheduleRepository;
-import com.oceanbase.odc.service.common.util.OdcFileUtil;
-import com.oceanbase.odc.service.exporter.model.ExportProperties;
-import com.oceanbase.odc.service.exporter.model.ExportedFile;
+import com.oceanbase.odc.metadb.task.TaskEntity;
+import com.oceanbase.odc.service.common.FutureCache;
+import com.oceanbase.odc.service.connection.database.DatabaseService;
import com.oceanbase.odc.service.flow.FlowInstanceService;
import com.oceanbase.odc.service.iam.HorizontalDataPermissionValidator;
import com.oceanbase.odc.service.iam.ProjectPermissionValidator;
+import com.oceanbase.odc.service.iam.UserService;
import com.oceanbase.odc.service.iam.auth.AuthenticationFacade;
+import com.oceanbase.odc.service.iam.model.User;
import com.oceanbase.odc.service.objectstorage.ObjectStorageFacade;
-import com.oceanbase.odc.service.objectstorage.model.ObjectMetadata;
-import com.oceanbase.odc.service.schedule.ScheduleService;
import com.oceanbase.odc.service.schedule.export.model.FileExportResponse;
+import com.oceanbase.odc.service.schedule.export.model.ScheduleExportListView;
import com.oceanbase.odc.service.schedule.export.model.ScheduleTaskExportRequest;
-import com.oceanbase.odc.service.schedule.model.Schedule;
import com.oceanbase.odc.service.schedule.model.ScheduleMapper;
import com.oceanbase.odc.service.schedule.model.ScheduleType;
+import com.oceanbase.odc.service.schedule.util.BatchSchedulePermissionValidator;
+import com.oceanbase.odc.service.state.StatefulUuidStateIdGenerator;
+import com.oceanbase.odc.service.task.executor.logger.LogUtils;
import lombok.AllArgsConstructor;
import lombok.Data;
@@ -76,7 +80,10 @@ public class ScheduleExportService {
private FlowInstanceService flowInstanceService;
@Autowired
- private ScheduleService scheduleService;
+ private BatchSchedulePermissionValidator batchSchedulePermissionValidator;
+
+ @Autowired
+ private UserService userService;
@Autowired
private HorizontalDataPermissionValidator horizontalDataPermissionValidator;
@@ -84,65 +91,95 @@ public class ScheduleExportService {
@Autowired
private ScheduleRepository scheduleRepository;
- private static String getPersonalBucketName(String userIdStr) {
- PreConditions.notEmpty(userIdStr, "userIdStr");
- return ASYNC_TASK_BASE_BUCKET.concat(File.separator).concat(userIdStr);
- }
+ @Autowired
+ private StatefulUuidStateIdGenerator statefulUuidStateIdGenerator;
- public FileExportResponse export(ScheduleTaskExportRequest request) {
- checkRequestIdsPermission(request);
+ @Autowired
+ private ThreadPoolTaskExecutor commonAsyncTaskExecutor;
- ExportProperties properties = scheduleTaskExporter.generateExportProperties();
- String encryptKey = new BCryptPasswordEncoder().encode(PasswordUtils.random());
+ @Autowired
+ private DatabaseService databaseService;
- ExportedFile exportedFile = null;
- if (request.getScheduleType().equals(ScheduleType.PARTITION_PLAN)) {
- exportedFile = scheduleTaskExporter.exportPartitionPlan(encryptKey, properties,
- request.getIds());
- } else {
- exportedFile = scheduleTaskExporter.exportSchedule(request.getScheduleType(), encryptKey,
- properties, request.getIds());
- }
- return mapToFileExportResponse(exportedFile);
+ @Autowired
+ private FutureCache futureCache;
+
+ @Value("${odc.log.directory:./log}")
+ private String logPath;
+
+ private String getPersonalBucketName() {
+ return ASYNC_TASK_BASE_BUCKET.concat(File.separator).concat(authenticationFacade.currentUserIdStr());
}
+ public String startExport(ScheduleTaskExportRequest request) {
+ batchSchedulePermissionValidator.checkScheduleIdsPermission(request.getScheduleType(), request.getIds());
+ String previewId = statefulUuidStateIdGenerator.generateCurrentUserIdStateId("scheduleExport");
+ User user = authenticationFacade.currentUser();
+ Future future = commonAsyncTaskExecutor.submit(
+ new ScheduleTaskExportCallable(previewId, request, user, scheduleTaskExporter,
+ getPersonalBucketName(), objectStorageFacade));
+ futureCache.put(previewId, future);
+ return previewId;
+ }
- private void checkRequestIdsPermission(ScheduleTaskExportRequest request) {
- Set projectIds;
+ public List getExportListView(ScheduleTaskExportRequest request) {
+ batchSchedulePermissionValidator.checkScheduleIdsPermission(request.getScheduleType(), request.getIds());
if (request.getScheduleType().equals(ScheduleType.PARTITION_PLAN)) {
- List flowInstanceEntities = flowInstanceService.listByIds(request.getIds());
- projectIds =
- flowInstanceEntities.stream().map(FlowInstanceEntity::getProjectId).collect(Collectors.toSet());
- List flowOrganizationIsolateds = flowInstanceEntities.stream().map(
- f -> new FlowOrganizationIsolated(f.getOrganizationId(), f.getId())).collect(
- Collectors.toList());
- horizontalDataPermissionValidator.checkCurrentOrganization(flowOrganizationIsolateds);
- } else {
- List scheduleEntities = scheduleRepository.findByIdIn(request.getIds()).stream()
- .map(ScheduleMapper.INSTANCE::entityToModel).collect(
- Collectors.toList());
- horizontalDataPermissionValidator.checkCurrentOrganization(scheduleEntities);
- projectIds = scheduleEntities.stream().map(Schedule::getProjectId).collect(Collectors.toSet());
- }
- if (authenticationFacade.currentOrganization().getType().equals(OrganizationType.TEAM)) {
- projectPermissionValidator.checkProjectRole(projectIds, ResourceRoleName.all());
+ return getPartitionPlanView(request);
}
+ List scheduleEntities = scheduleRepository.findByIdIn(request.getIds());
+ List view = scheduleEntities.stream().map(
+ ScheduleMapper.INSTANCE::toScheduleExportListView).collect(Collectors.toList());
+ databaseService.assignDatabaseById(view, ScheduleExportListView::getDatabaseId,
+ ScheduleExportListView::setDatabase);
+ return view;
}
+ private List getPartitionPlanView(ScheduleTaskExportRequest request) {
+ Map flowIn2TaskMap = flowInstanceService.getTaskByFlowInstanceIds(
+ request.getIds());
+ List flowInstanceEntities = flowInstanceService.listByIds(request.getIds());
+ List views = flowInstanceEntities.stream().map(f -> {
+ TaskEntity taskEntity = flowIn2TaskMap.get(f.getId());
+ ScheduleExportListView scheduleExportListView = new ScheduleExportListView();
+ scheduleExportListView.setScheduleType(ScheduleType.PARTITION_PLAN);
+ scheduleExportListView.setId(f.getId());
+ scheduleExportListView.setCreatorId(f.getCreatorId());
+ scheduleExportListView.setDatabaseId(taskEntity.getDatabaseId());
+ scheduleExportListView.setCreateTime(f.getCreateTime());
+ scheduleExportListView.setDescription(f.getDescription());
+ return scheduleExportListView;
+ }).collect(Collectors.toList());
+ databaseService.assignDatabaseById(views, ScheduleExportListView::getDatabaseId,
+ ScheduleExportListView::setDatabase);
+ userService.assignInnerUserByCreatorId(views, ScheduleExportListView::getCreatorId,
+ ScheduleExportListView::setCreator);
+ return views;
+ }
- private FileExportResponse mapToFileExportResponse(ExportedFile exportedFile) {
- FileExportResponse fileExportResponse = new FileExportResponse();
- fileExportResponse.setSecret(exportedFile.getSecret());
+ public FileExportResponse getExportResult(String exportId) {
+ statefulUuidStateIdGenerator.checkCurrentUserId(exportId);
+ Future> future = futureCache.get(exportId);
+ if (future == null) {
+ return null;
+ }
+ if (!future.isDone()) {
+ return FileExportResponse.exporting();
+ }
try {
- String bucketName = getPersonalBucketName(authenticationFacade.currentUserIdStr());
- objectStorageFacade.createBucketIfNotExists(bucketName);
- ObjectMetadata metadata = objectStorageFacade.putTempObject(bucketName, exportedFile.getFile());
- String downloadUrl = objectStorageFacade.getDownloadUrl(metadata.getBucketName(), metadata.getObjectId());
- fileExportResponse.setDownloadUrl(downloadUrl);
- } finally {
- OdcFileUtil.deleteFiles(exportedFile.getFile());
+ futureCache.invalid(exportId);
+ return (FileExportResponse) future.get();
+ } catch (ExecutionException | InterruptedException e) {
+ throw new RuntimeException(e);
}
- return fileExportResponse;
+ }
+
+ public String getExportLog(String exportId) {
+ statefulUuidStateIdGenerator.checkCurrentUserId(exportId);
+ String filePath = String.format(RouteLogCallable.LOG_PATH_PATTERN, logPath,
+ ScheduleTaskExportCallable.WORK_SPACE, exportId,
+ ScheduleTaskExportCallable.LOG_NAME);
+ File logFile = new File(filePath);
+ return LogUtils.getLatestLogContent(logFile, 10000L, 1048576L);
}
@Data
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/ScheduleTaskExportCallable.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/ScheduleTaskExportCallable.java
new file mode 100644
index 0000000000..21a00cfa86
--- /dev/null
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/ScheduleTaskExportCallable.java
@@ -0,0 +1,96 @@
+/*
+ * Copyright (c) 2023 OceanBase.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.oceanbase.odc.service.schedule.export;
+
+import org.springframework.security.crypto.bcrypt.BCryptPasswordEncoder;
+
+import com.oceanbase.odc.common.json.JsonUtils;
+import com.oceanbase.odc.common.security.PasswordUtils;
+import com.oceanbase.odc.common.task.RouteLogCallable;
+import com.oceanbase.odc.service.common.util.OdcFileUtil;
+import com.oceanbase.odc.service.exporter.model.ExportProperties;
+import com.oceanbase.odc.service.exporter.model.ExportedFile;
+import com.oceanbase.odc.service.iam.model.User;
+import com.oceanbase.odc.service.iam.util.SecurityContextUtils;
+import com.oceanbase.odc.service.objectstorage.ObjectStorageFacade;
+import com.oceanbase.odc.service.objectstorage.model.ObjectMetadata;
+import com.oceanbase.odc.service.schedule.export.model.FileExportResponse;
+import com.oceanbase.odc.service.schedule.export.model.FileExportStatus;
+import com.oceanbase.odc.service.schedule.export.model.ScheduleTaskExportRequest;
+import com.oceanbase.odc.service.schedule.model.ScheduleType;
+
+public class ScheduleTaskExportCallable extends RouteLogCallable {
+
+ public static final String WORK_SPACE = "scheduleTaskExport";
+ public static final String LOG_NAME = "export";
+ private final ScheduleTaskExportRequest request;
+ private final User user;
+ private final ScheduleTaskExporter scheduleTaskExporter;
+ private final String bucketName;
+ private final ObjectStorageFacade objectStorageFacade;
+
+ public ScheduleTaskExportCallable(String taskId,
+ ScheduleTaskExportRequest request, User user,
+ ScheduleTaskExporter scheduleTaskExporter, String bucketName, ObjectStorageFacade objectStorageFacade) {
+ super(WORK_SPACE, taskId, LOG_NAME);
+ this.request = request;
+ this.user = user;
+ this.scheduleTaskExporter = scheduleTaskExporter;
+ this.bucketName = bucketName;
+ this.objectStorageFacade = objectStorageFacade;
+
+ }
+
+ @Override
+ public FileExportResponse doCall() {
+ log.info("Start to export schedule, request={}", JsonUtils.toJson(request));
+ SecurityContextUtils.setCurrentUser(user);
+ try {
+ ExportProperties properties = scheduleTaskExporter.generateExportProperties();
+ String encryptKey = new BCryptPasswordEncoder().encode(PasswordUtils.random());
+
+ ExportedFile exportedFile = null;
+ if (request.getScheduleType().equals(ScheduleType.PARTITION_PLAN)) {
+ exportedFile = scheduleTaskExporter.exportPartitionPlan(encryptKey, properties,
+ request.getIds());
+ } else {
+ exportedFile = scheduleTaskExporter.exportSchedule(request.getScheduleType(), encryptKey,
+ properties, request.getIds());
+ }
+ return mapToFileExportResponse(exportedFile);
+ } catch (Exception e) {
+ log.error("Export schedule failed", e);
+ return FileExportResponse.failed(e.getMessage());
+ }
+
+ }
+
+ private FileExportResponse mapToFileExportResponse(ExportedFile exportedFile) {
+ FileExportResponse fileExportResponse = new FileExportResponse();
+ fileExportResponse.setSecret(exportedFile.getSecret());
+ fileExportResponse.setStatus(FileExportStatus.SUCCESS);
+ fileExportResponse.setFileName(exportedFile.getFile().getName());
+ try {
+ objectStorageFacade.createBucketIfNotExists(bucketName);
+ ObjectMetadata metadata = objectStorageFacade.putTempObject(bucketName, exportedFile.getFile());
+ String downloadUrl = objectStorageFacade.getDownloadUrl(metadata.getBucketName(), metadata.getObjectId());
+ fileExportResponse.setDownloadUrl(downloadUrl);
+ } finally {
+ OdcFileUtil.deleteFiles(exportedFile.getFile());
+ }
+ return fileExportResponse;
+ }
+}
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/ScheduleTaskExporter.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/ScheduleTaskExporter.java
index ec57c9182b..7235d96b16 100644
--- a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/ScheduleTaskExporter.java
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/ScheduleTaskExporter.java
@@ -316,7 +316,7 @@ private ExportedDatabase getExportedDatabase(Long databaseId) {
}
private String getProjectName(Long projectId) {
- if (projectId == null) {
+ if (projectId == null || projectId < 0) {
return null;
}
return projectRepository.findById(projectId).map(ProjectEntity::getName).orElseThrow(NullPointerException::new);
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/ScheduleTaskImportService.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/ScheduleTaskImportService.java
index 5b4e7c657f..19e75c3992 100644
--- a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/ScheduleTaskImportService.java
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/ScheduleTaskImportService.java
@@ -40,14 +40,17 @@
import com.oceanbase.odc.service.state.StatefulUuidStateIdGenerator;
import com.oceanbase.odc.service.task.executor.logger.LogUtils;
+import lombok.extern.slf4j.Slf4j;
+
@Service
+@Slf4j
public class ScheduleTaskImportService {
@Autowired
private FutureCache futureCache;
@Autowired
- private ThreadPoolTaskExecutor scheduleImportExecutor;
+ private ThreadPoolTaskExecutor commonAsyncTaskExecutor;
@Autowired
private ScheduleTaskImporter scheduleTaskImporter;
@@ -65,10 +68,16 @@ public class ScheduleTaskImportService {
public String startPreviewImportTask(ScheduleTaskImportRequest request) {
String previewId = statefulUuidStateIdGenerator.generateCurrentUserIdStateId("scheduleImportReview");
User user = authenticationFacade.currentUser();
- Future> future = scheduleImportExecutor.submit(
+ Future> future = commonAsyncTaskExecutor.submit(
() -> {
- SecurityContextUtils.setCurrentUser(user);
- return scheduleTaskImporter.preview(request);
+ try {
+ SecurityContextUtils.setCurrentUser(user);
+ return scheduleTaskImporter.preview(request);
+ } catch (Exception e) {
+ log.info("Preview Import task failed", e);
+ throw e;
+ }
+
});
futureCache.put(previewId, future);
return previewId;
@@ -103,7 +112,7 @@ public String startImportTask(ScheduleTaskImportRequest request) {
String previewId = statefulUuidStateIdGenerator.generateCurrentUserIdStateId("scheduleImport");
User user = authenticationFacade.currentUser();
- Future> future = scheduleImportExecutor.submit(
+ Future> future = commonAsyncTaskExecutor.submit(
new ScheduleTaskImportCallable(user, previewId, scheduleTaskImporter, request));
futureCache.put(previewId, future);
return previewId;
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/model/BaseScheduleRowData.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/model/BaseScheduleRowData.java
index cef8bd017a..9660acfa73 100644
--- a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/model/BaseScheduleRowData.java
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/model/BaseScheduleRowData.java
@@ -83,6 +83,7 @@ public ScheduleRowPreviewDto preview() {
rowPreviewDto.setOriginProjectName(originProjectName);
rowPreviewDto.setDatabase(database);
rowPreviewDto.setTargetDatabase(targetDatabase);
+ rowPreviewDto.setDescription(description);
return rowPreviewDto;
}
}
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/model/FileExportResponse.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/model/FileExportResponse.java
index e56d5055f7..e1736a1743 100644
--- a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/model/FileExportResponse.java
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/model/FileExportResponse.java
@@ -21,8 +21,29 @@
@Data
public class FileExportResponse {
+ private String taskId;
+
+ private FileExportStatus status;
+
+ private String fileName;
+
private String downloadUrl;
@Exclude
private String secret;
+
+ private String failedReason;
+
+ public static FileExportResponse exporting() {
+ FileExportResponse fileExportResponse = new FileExportResponse();
+ fileExportResponse.setStatus(FileExportStatus.EXPORTING);
+ return fileExportResponse;
+ }
+
+ public static FileExportResponse failed(String failedReason) {
+ FileExportResponse fileExportResponse = new FileExportResponse();
+ fileExportResponse.setStatus(FileExportStatus.FAILED);
+ fileExportResponse.setFailedReason(failedReason);
+ return fileExportResponse;
+ }
}
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/model/FileExportStatus.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/model/FileExportStatus.java
new file mode 100644
index 0000000000..b8c2d2c9bb
--- /dev/null
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/model/FileExportStatus.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright (c) 2023 OceanBase.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.oceanbase.odc.service.schedule.export.model;
+
+public enum FileExportStatus {
+ SUCCESS,
+ FAILED,
+ EXPORTING
+}
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/model/ImportScheduleTaskView.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/model/ImportScheduleTaskView.java
index 207cd5ad0e..65687c9bc0 100644
--- a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/model/ImportScheduleTaskView.java
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/model/ImportScheduleTaskView.java
@@ -17,6 +17,7 @@
import javax.annotation.Nullable;
+import com.oceanbase.odc.common.i18n.Internationalizable;
import com.oceanbase.odc.service.schedule.model.ScheduleType;
import lombok.Data;
@@ -40,6 +41,9 @@ public class ImportScheduleTaskView {
@Nullable
private ScheduleNonImportableType nonImportableType;
+ @Internationalizable
+ private String description;
+
/**
* Schedule id of the system before export
*/
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/model/ScheduleExportListView.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/model/ScheduleExportListView.java
new file mode 100644
index 0000000000..ba7f703ecd
--- /dev/null
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/model/ScheduleExportListView.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright (c) 2023 OceanBase.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.oceanbase.odc.service.schedule.export.model;
+
+import java.util.Date;
+
+import com.oceanbase.odc.common.i18n.Internationalizable;
+import com.oceanbase.odc.service.common.model.InnerUser;
+import com.oceanbase.odc.service.connection.database.model.Database;
+import com.oceanbase.odc.service.schedule.model.ScheduleStatus;
+import com.oceanbase.odc.service.schedule.model.ScheduleType;
+
+import lombok.Data;
+
+@Data
+public class ScheduleExportListView {
+ private Long id;
+ private ScheduleType scheduleType;
+ private Long databaseId;
+ private Database database;
+ @Internationalizable
+ private String description;
+ private Long creatorId;
+ private InnerUser creator;
+ private ScheduleStatus scheduleStatus;
+ private Date createTime;
+}
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/model/ScheduleRowPreviewDto.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/model/ScheduleRowPreviewDto.java
index e74856b948..869751ba36 100644
--- a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/model/ScheduleRowPreviewDto.java
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/model/ScheduleRowPreviewDto.java
@@ -27,6 +27,7 @@ public class ScheduleRowPreviewDto {
private String rowId;
private String originId;
private String originProjectName;
+ private String description;
private ScheduleType type;
private ExportedDatabase database;
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/model/ScheduleTerminateCmd.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/model/ScheduleTerminateCmd.java
new file mode 100644
index 0000000000..735d1c5fe3
--- /dev/null
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/model/ScheduleTerminateCmd.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright (c) 2023 OceanBase.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.oceanbase.odc.service.schedule.export.model;
+
+import java.util.List;
+
+import com.oceanbase.odc.service.schedule.model.ScheduleType;
+
+import lombok.Data;
+
+@Data
+public class ScheduleTerminateCmd {
+ private ScheduleType scheduleType;
+ private List ids;
+}
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/model/ScheduleTerminateResult.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/model/ScheduleTerminateResult.java
new file mode 100644
index 0000000000..34b8e84e39
--- /dev/null
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/model/ScheduleTerminateResult.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright (c) 2023 OceanBase.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.oceanbase.odc.service.schedule.export.model;
+
+import com.oceanbase.odc.service.schedule.model.ScheduleType;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+public class ScheduleTerminateResult {
+
+ private Boolean terminateSucceed;
+
+ private ScheduleType scheduleType;
+
+ private Long id;
+
+ private String failReason;
+
+ public static ScheduleTerminateResult ofSuccess(ScheduleType scheduleType, Long id) {
+ return new ScheduleTerminateResult(true, scheduleType, id, null);
+ }
+
+ public static ScheduleTerminateResult ofFailed(ScheduleType scheduleType, Long id, String failReason) {
+ return new ScheduleTerminateResult(false, scheduleType, id, failReason);
+ }
+}
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/job/DataArchiveDeleteJob.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/job/DataArchiveDeleteJob.java
index 386bec6ccd..3f8829c605 100644
--- a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/job/DataArchiveDeleteJob.java
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/job/DataArchiveDeleteJob.java
@@ -15,15 +15,18 @@
*/
package com.oceanbase.odc.service.schedule.job;
+import java.util.Map;
import java.util.Optional;
import org.quartz.JobExecutionContext;
+import com.fasterxml.jackson.core.type.TypeReference;
import com.oceanbase.odc.common.json.JsonUtils;
import com.oceanbase.odc.core.shared.constant.TaskStatus;
import com.oceanbase.odc.metadb.schedule.ScheduleTaskEntity;
import com.oceanbase.odc.service.dlm.model.DataArchiveParameters;
import com.oceanbase.odc.service.schedule.model.DataArchiveClearParameters;
+import com.oceanbase.odc.service.task.constants.JobParametersKeyConstants;
import com.oceanbase.tools.migrator.common.enums.JobType;
import lombok.extern.slf4j.Slf4j;
@@ -62,11 +65,19 @@ public void executeJob(JobExecutionContext context) {
scheduleTaskRepository.updateStatusById(taskEntity.getId(), TaskStatus.FAILED);
return;
}
-
- DLMJobReq parameters = getDLMJobReqWithArchiveRange(dataArchiveTask.getJobId());
- parameters.setJobType(JobType.DELETE);
- parameters.setFireTime(context.getFireTime());
- parameters.setScheduleTaskId(taskEntity.getId());
+ DLMJobReq parameters;
+ if (taskEntity.getJobId() != null) {
+ parameters = JsonUtils.fromJson(JsonUtils.fromJson(
+ taskFrameworkService.find(taskEntity.getJobId()).getJobParametersJson(),
+ new TypeReference