diff --git a/pom.xml b/pom.xml index e1e32a9859..42262c105b 100644 --- a/pom.xml +++ b/pom.xml @@ -782,11 +782,6 @@ commons-compress ${commons-compress.version} - - commons-codec - commons-codec - 1.14 - org.apache.commons commons-collections4 diff --git a/server/odc-common/src/main/java/com/oceanbase/odc/common/task/RouteLogCallable.java b/server/odc-common/src/main/java/com/oceanbase/odc/common/task/RouteLogCallable.java index c16f673f29..f7f14aba64 100644 --- a/server/odc-common/src/main/java/com/oceanbase/odc/common/task/RouteLogCallable.java +++ b/server/odc-common/src/main/java/com/oceanbase/odc/common/task/RouteLogCallable.java @@ -22,7 +22,7 @@ import org.slf4j.MDC; public abstract class RouteLogCallable implements Callable { - + public static final String LOG_PATH_PATTERN = "%s/%s/%s/%s.log"; protected static Logger log = LogManager.getLogger(RouteLogCallable.class); private final String workSpace; private final String taskId; diff --git a/server/odc-core/src/main/java/com/oceanbase/odc/core/alarm/AlarmUtils.java b/server/odc-core/src/main/java/com/oceanbase/odc/core/alarm/AlarmUtils.java index 66aaab9581..fbc13a6c5a 100644 --- a/server/odc-core/src/main/java/com/oceanbase/odc/core/alarm/AlarmUtils.java +++ b/server/odc-core/src/main/java/com/oceanbase/odc/core/alarm/AlarmUtils.java @@ -27,6 +27,7 @@ public final class AlarmUtils { * Base alarm message names */ public static final String CLUSTER_NAME = "Cluster"; + public static final String INSTANCE_NAME = "instanceId"; public static final String TENANT_NAME = "Tenant"; public static final String ORGANIZATION_NAME = "OrganizationId"; public static final String MESSAGE_NAME = "Message"; @@ -42,6 +43,7 @@ public final class AlarmUtils { public static final String RESOURCE_TYPE = "ResourceType"; public static final String TASK_TYPE_NAME = "TaskType"; public static final String SCHEDULE_ID_NAME = "ScheduleId"; + public static final String FLOW_INSTANCE_ID_NAME = "FlowInstanceId"; public static final Collection TASK_FRAMEWORK_ALARM_DIGEST_NAMES = Arrays.asList(CLUSTER_NAME, TENANT_NAME, SCHEDULE_ID_NAME); diff --git a/server/odc-server/src/main/java/com/oceanbase/odc/server/web/controller/v2/ExportController.java b/server/odc-server/src/main/java/com/oceanbase/odc/server/web/controller/v2/ExportController.java index 0959ec64dc..d3193a3874 100644 --- a/server/odc-server/src/main/java/com/oceanbase/odc/server/web/controller/v2/ExportController.java +++ b/server/odc-server/src/main/java/com/oceanbase/odc/server/web/controller/v2/ExportController.java @@ -15,6 +15,8 @@ */ package com.oceanbase.odc.server.web.controller.v2; +import java.util.List; + import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; @@ -25,7 +27,10 @@ import com.oceanbase.odc.service.common.response.SuccessResponse; import com.oceanbase.odc.service.schedule.export.ScheduleExportService; import com.oceanbase.odc.service.schedule.export.model.FileExportResponse; +import com.oceanbase.odc.service.schedule.export.model.ScheduleExportListView; import com.oceanbase.odc.service.schedule.export.model.ScheduleTaskExportRequest; +import com.oceanbase.odc.service.state.model.StateName; +import com.oceanbase.odc.service.state.model.StatefulRoute; @RequestMapping("/api/v2/export") @RestController @@ -34,8 +39,27 @@ public class ExportController { @Autowired private ScheduleExportService scheduleExportService; - @RequestMapping(value = "/exportScheduleTask", method = RequestMethod.POST) - public SuccessResponse exportScheduleTask(@RequestBody ScheduleTaskExportRequest request) { - return Responses.success(scheduleExportService.export(request)); + @RequestMapping(value = "getExportListView", method = RequestMethod.POST) + public SuccessResponse> getExportListView( + @RequestBody ScheduleTaskExportRequest request) { + return Responses.success(scheduleExportService.getExportListView(request)); + } + + @RequestMapping(value = "/exportSchedule", method = RequestMethod.POST) + public SuccessResponse exportScheduleTask2(@RequestBody ScheduleTaskExportRequest request) { + return Responses.success(scheduleExportService.startExport(request)); + } + + @RequestMapping(value = "/getExportResult", method = RequestMethod.GET) + @StatefulRoute(stateName = StateName.UUID_STATEFUL_ID, stateIdExpression = "#exportId") + public SuccessResponse exportScheduleTask(String exportId) { + return Responses.success(scheduleExportService.getExportResult(exportId)); + } + + @RequestMapping(value = "/getExportLog", method = RequestMethod.GET) + @StatefulRoute(stateName = StateName.UUID_STATEFUL_ID, stateIdExpression = "#exportId") + public SuccessResponse getExportLog(String exportId) { + return Responses.success(scheduleExportService.getExportLog(exportId)); } + } diff --git a/server/odc-server/src/main/java/com/oceanbase/odc/server/web/controller/v2/FlowInstanceController.java b/server/odc-server/src/main/java/com/oceanbase/odc/server/web/controller/v2/FlowInstanceController.java index 2e1fd46293..abe2d688fc 100644 --- a/server/odc-server/src/main/java/com/oceanbase/odc/server/web/controller/v2/FlowInstanceController.java +++ b/server/odc-server/src/main/java/com/oceanbase/odc/server/web/controller/v2/FlowInstanceController.java @@ -16,6 +16,7 @@ package com.oceanbase.odc.server.web.controller.v2; import java.io.IOException; +import java.util.Collection; import java.util.Date; import java.util.List; import java.util.Map; @@ -49,6 +50,7 @@ import com.oceanbase.odc.service.common.util.WebResponseUtils; import com.oceanbase.odc.service.flow.FlowInstanceService; import com.oceanbase.odc.service.flow.FlowTaskInstanceService; +import com.oceanbase.odc.service.flow.model.BatchTerminateFlowResult; import com.oceanbase.odc.service.flow.model.BinaryDataResult; import com.oceanbase.odc.service.flow.model.CreateFlowInstanceReq; import com.oceanbase.odc.service.flow.model.FlowInstanceApprovalReq; @@ -61,6 +63,8 @@ import com.oceanbase.odc.service.partitionplan.model.PartitionPlanConfig; import com.oceanbase.odc.service.schedule.ScheduleService; import com.oceanbase.odc.service.session.model.SqlExecuteResult; +import com.oceanbase.odc.service.state.model.StateName; +import com.oceanbase.odc.service.state.model.StatefulRoute; import com.oceanbase.odc.service.task.model.OdcTaskLogLevel; import io.swagger.annotations.ApiOperation; @@ -251,4 +255,24 @@ public SuccessResponse getPartitionPlan(@PathVariable Long return Responses.ok(this.partitionPlanScheduleService.getPartitionPlanByFlowInstanceId(id)); } + @ApiOperation(value = "cancelFlowInstance", notes = "批量终止流程") + @RequestMapping(value = "/asyncCancel", method = RequestMethod.POST) + public SuccessResponse batchCancelFlowInstance(@RequestBody Collection flowInstanceIds) { + return Responses.single(flowInstanceService.startBatchCancelFlowInstance(flowInstanceIds)); + } + + @ApiOperation(value = "getBatchCancelResult", notes = "获取批量终止结果") + @RequestMapping(value = "/asyncCancelResult", method = RequestMethod.GET) + @StatefulRoute(stateName = StateName.UUID_STATEFUL_ID, stateIdExpression = "#terminateId") + public SuccessResponse> getBatchCancelResult(String terminateId) { + return Responses.single(flowInstanceService.getBatchCancelResult(terminateId)); + } + + @ApiOperation(value = "getBatchCancelLog", notes = "获取批量终止日志") + @RequestMapping(value = "/asyncCancelLog", method = RequestMethod.GET) + @StatefulRoute(stateName = StateName.UUID_STATEFUL_ID, stateIdExpression = "#terminateId") + public SuccessResponse getBatchCancelLog(String terminateId) { + return Responses.single(flowInstanceService.getBatchCancelLog(terminateId)); + } + } diff --git a/server/odc-server/src/main/java/com/oceanbase/odc/server/web/controller/v2/ScheduleController.java b/server/odc-server/src/main/java/com/oceanbase/odc/server/web/controller/v2/ScheduleController.java index ef2b76f005..02ed9279be 100644 --- a/server/odc-server/src/main/java/com/oceanbase/odc/server/web/controller/v2/ScheduleController.java +++ b/server/odc-server/src/main/java/com/oceanbase/odc/server/web/controller/v2/ScheduleController.java @@ -41,6 +41,8 @@ import com.oceanbase.odc.service.common.util.WebResponseUtils; import com.oceanbase.odc.service.dlm.model.RateLimitConfiguration; import com.oceanbase.odc.service.schedule.ScheduleService; +import com.oceanbase.odc.service.schedule.export.model.ScheduleTerminateCmd; +import com.oceanbase.odc.service.schedule.export.model.ScheduleTerminateResult; import com.oceanbase.odc.service.schedule.model.ChangeScheduleResp; import com.oceanbase.odc.service.schedule.model.CreateScheduleReq; import com.oceanbase.odc.service.schedule.model.OperationType; @@ -59,6 +61,8 @@ import com.oceanbase.odc.service.schedule.model.ScheduleTaskOverview; import com.oceanbase.odc.service.schedule.model.ScheduleType; import com.oceanbase.odc.service.schedule.model.UpdateScheduleReq; +import com.oceanbase.odc.service.state.model.StateName; +import com.oceanbase.odc.service.state.model.StatefulRoute; import com.oceanbase.odc.service.task.executor.logger.LogUtils; import com.oceanbase.odc.service.task.model.OdcTaskLogLevel; @@ -286,6 +290,23 @@ public SuccessResponse updateLimiterConfig(@PathVariable return Responses.single(scheduleService.updateDlmRateLimit(id, limiterConfig)); } + @RequestMapping(value = "/schedules/asyncTerminate", method = RequestMethod.POST) + public SuccessResponse startTerminateScheduleAndTask(@RequestBody ScheduleTerminateCmd cmd) { + return Responses.ok(scheduleService.startTerminateScheduleAndTask(cmd)); + } + + @RequestMapping(value = "/schedules/asyncTerminateResult", method = RequestMethod.GET) + @StatefulRoute(stateName = StateName.UUID_STATEFUL_ID, stateIdExpression = "#terminateId") + public SuccessResponse> getTerminateScheduleResult(String terminateId) { + return Responses.ok(scheduleService.getTerminateScheduleResult(terminateId)); + } + + @RequestMapping(value = "/schedules/asyncTerminateLog", method = RequestMethod.GET) + @StatefulRoute(stateName = StateName.UUID_STATEFUL_ID, stateIdExpression = "#terminateId") + public SuccessResponse getTerminateScheduleLog(String terminateId) { + return Responses.ok(scheduleService.getTerminateLog(terminateId)); + } + @RequestMapping(value = "/schedules/stats", method = RequestMethod.GET) public ListResponse getScheduleStats( @RequestParam(required = false, name = "types") Set types, diff --git a/server/odc-server/src/main/resources/log4j2.xml b/server/odc-server/src/main/resources/log4j2.xml index cc19b190f0..b5bb5c0c3f 100644 --- a/server/odc-server/src/main/resources/log4j2.xml +++ b/server/odc-server/src/main/resources/log4j2.xml @@ -918,6 +918,10 @@ + + + + diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/config/ScheduleConfiguration.java b/server/odc-service/src/main/java/com/oceanbase/odc/config/ScheduleConfiguration.java index 048c3ff01b..0f7f6e599a 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/config/ScheduleConfiguration.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/config/ScheduleConfiguration.java @@ -311,8 +311,8 @@ public ThreadPoolTaskExecutor queryProfileMonitorExecutor() { return executor; } - @Bean(name = "scheduleImportExecutor") - public ThreadPoolTaskExecutor scheduleImportExecutor() { + @Bean(name = "commonAsyncTaskExecutor") + public ThreadPoolTaskExecutor commonAsyncTaskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); int minPoolSize = Math.max(SystemUtils.availableProcessors(), 4); executor.setCorePoolSize(minPoolSize); @@ -324,7 +324,7 @@ public ThreadPoolTaskExecutor scheduleImportExecutor() { executor.setTaskDecorator(new TraceDecorator<>()); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); executor.initialize(); - log.info("scheduleImportExecutor initialized"); + log.info("commonAsyncTaskExecutor initialized"); return executor; } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/collaboration/project/ProjectService.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/collaboration/project/ProjectService.java index b89e29400f..0f580227bd 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/collaboration/project/ProjectService.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/collaboration/project/ProjectService.java @@ -85,6 +85,7 @@ import com.oceanbase.odc.service.iam.ProjectPermissionValidator; import com.oceanbase.odc.service.iam.ResourceRoleService; import com.oceanbase.odc.service.iam.UserOrganizationService; +import com.oceanbase.odc.service.iam.UserService; import com.oceanbase.odc.service.iam.auth.AuthenticationFacade; import com.oceanbase.odc.service.iam.auth.AuthorizationFacade; import com.oceanbase.odc.service.iam.model.User; @@ -164,7 +165,8 @@ public class ProjectService { @Autowired @Lazy private FlowInstanceService flowInstanceService; - + @Autowired + private UserService userService; @Value("${odc.integration.bastion.enabled:false}") private boolean bastionEnabled; @@ -340,7 +342,11 @@ public Page list(@Valid QueryProjectParams params, @NotNull Pageable pa @SkipAuthorize("odc internal usage") public List listByIds(@NotEmpty Set ids) { - return repository.findAllById(ids).stream().map(this::entityToModel).collect(Collectors.toList()); + List projects = + repository.findAllById(ids).stream().map(projectMapper::entityToModel).collect(Collectors.toList()); + userService.assignInnerUserByCreatorId(projects, c -> c.getCreator().getId(), Project::setCreator); + userService.assignInnerUserByCreatorId(projects, c -> c.getLastModifier().getId(), Project::setCreator); + return projects; } private Page innerList(@Valid QueryProjectParams params, @NotNull Pageable pageable, diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/common/FutureCache.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/common/FutureCache.java index 91503c0147..24eab9b127 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/common/FutureCache.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/common/FutureCache.java @@ -32,6 +32,7 @@ public class FutureCache { private final Cache> tempId2Future = Caffeine.newBuilder().expireAfterWrite(30, TimeUnit.MINUTES) + .maximumSize(1000L) .removalListener((String key, Future future, RemovalCause cause) -> { if (future != null) { future.cancel(true); diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/connection/database/DatabaseService.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/connection/database/DatabaseService.java index 4704f56ed0..36591aba14 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/connection/database/DatabaseService.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/connection/database/DatabaseService.java @@ -38,6 +38,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.locks.Lock; +import java.util.function.BiConsumer; import java.util.function.Function; import java.util.stream.Collectors; @@ -1299,4 +1300,18 @@ private void deleteDatabaseIfInstanceNotExists(Long connectionId, OrganizationTy } } + + public void assignDatabaseById(List content, Function databaseIdProvider, + BiConsumer databaseSetter) { + List databaseIds = content.stream().map(databaseIdProvider).collect( + Collectors.toList()); + List entities = databaseRepository.findByIdIn(databaseIds); + List databases = entitiesToModels(new PageImpl<>(entities), false).getContent(); + Map idDatabaseEntityMap = databases.stream().collect( + Collectors.toMap(Database::getId, t -> t)); + content.forEach(c -> { + Database database = idDatabaseEntityMap.get(databaseIdProvider.apply(c)); + databaseSetter.accept(c, database); + }); + } } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/dlm/DLMJobStore.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/dlm/DLMJobStore.java index 741d050819..63f8c2c39b 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/dlm/DLMJobStore.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/dlm/DLMJobStore.java @@ -154,6 +154,7 @@ public void storeTaskGenerator(TaskGenerator taskGenerator) throws SQLException ps.setString(9, taskGenerator.getPartitionSavePoint()); ps.setString(10, JsonUtils.toJson(taskGenerator.getPartName2MinKey())); ps.setString(11, JsonUtils.toJson(taskGenerator.getPartName2MaxKey())); + ps.execute(); } } } @@ -226,6 +227,7 @@ public void storeTaskMeta(TaskMeta taskMeta) throws SQLException { ps.setString(7, taskMeta.getCursorPrimaryKey() == null ? "" : taskMeta.getCursorPrimaryKey().toSqlString()); ps.setString(8, taskMeta.getPartitionName()); + ps.execute(); } } } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/dlm/DLMService.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/dlm/DLMService.java index f1211a4194..b47ac9994a 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/dlm/DLMService.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/dlm/DLMService.java @@ -130,6 +130,11 @@ public List findByScheduleTaskId(Long scheduleTaskId) { @SkipAuthorize("odc internal usage") public TaskStatus getFinalTaskStatus(Long scheduleTaskId) { List dlmTableUnits = findByScheduleTaskId(scheduleTaskId); + return getFinalTaskStatus(dlmTableUnits); + } + + @SkipAuthorize("odc internal usage") + public TaskStatus getFinalTaskStatus(List dlmTableUnits) { Set collect = dlmTableUnits.stream().map(DlmTableUnit::getStatus).collect( Collectors.toSet()); // If the tables do not exist or any table fails, the task is considered a failure. @@ -148,4 +153,5 @@ public TaskStatus getFinalTaskStatus(Long scheduleTaskId) { return TaskStatus.DONE; } + } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/exporter/impl/JsonExtractor.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/exporter/impl/JsonExtractor.java index 8ddfaa5b0d..2213a57b69 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/exporter/impl/JsonExtractor.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/exporter/impl/JsonExtractor.java @@ -143,7 +143,9 @@ public ExportRowDataReader getRowDataReader() throws IOException { ObjectMapper objectMapper = new ObjectMapper(jsonFactory); File configJson = getConfigJson(this.tempFilePath); Verify.notNull(configJson, "Invalid archived file"); - try (InputStream inputStream = Files.newInputStream(configJson.toPath())) { + InputStream inputStream = null; + try { + inputStream = Files.newInputStream(configJson.toPath()); JsonParser jsonParser = jsonFactory.createParser(inputStream); JsonToken jsonToken = jsonParser.nextToken(); @@ -177,6 +179,12 @@ public ExportRowDataReader getRowDataReader() throws IOException { throw new IllegalStateException("Expected data to be an Array"); } return new JsonRowDataReader(properties, jsonParser, objectMapper, exportedFile.getSecret(), tempFilePath); + } catch (Exception e) { + if (inputStream != null) { + inputStream.close(); + } + log.error("Get row data reader failed", e); + throw e; } } @@ -231,12 +239,19 @@ public ExportProperties getProperties() { @Override public R readRow(Class rowDataClass) throws IOException { - JsonNode jsonNode = readRow(); - R rowData = objectMapper.convertValue(jsonNode, rowDataClass); - if (rowData != null && encryptKey != null) { - rowData.decrypt(encryptKey); + try { + JsonNode jsonNode = readRow(); + R rowData = objectMapper.convertValue(jsonNode, rowDataClass); + log.info("read rowData={}", rowData); + if (rowData != null && encryptKey != null) { + rowData.decrypt(encryptKey); + } + return rowData; + } catch (Exception e) { + log.error("read row failed", e); + throw e; } - return rowData; + } @Override diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/FlowInstanceService.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/FlowInstanceService.java index 220ad353cf..4fdcdd9de3 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/FlowInstanceService.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/FlowInstanceService.java @@ -30,6 +30,7 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.function.Function; @@ -48,6 +49,7 @@ import org.flowable.engine.history.HistoricProcessInstanceQuery; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Lazy; import org.springframework.context.i18n.LocaleContextHolder; import org.springframework.data.domain.Page; @@ -59,6 +61,7 @@ import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Propagation; import org.springframework.transaction.annotation.Transactional; +import org.springframework.transaction.support.TransactionTemplate; import org.springframework.validation.annotation.Validated; import com.fasterxml.jackson.core.type.TypeReference; @@ -67,6 +70,7 @@ import com.oceanbase.odc.common.i18n.I18n; import com.oceanbase.odc.common.json.JsonUtils; import com.oceanbase.odc.common.lang.Holder; +import com.oceanbase.odc.common.task.RouteLogCallable; import com.oceanbase.odc.common.util.ObjectUtil; import com.oceanbase.odc.common.util.StringUtils; import com.oceanbase.odc.core.authority.util.SkipAuthorize; @@ -103,6 +107,7 @@ import com.oceanbase.odc.plugin.task.api.datatransfer.model.DataTransferConfig; import com.oceanbase.odc.service.collaboration.environment.EnvironmentService; import com.oceanbase.odc.service.collaboration.project.ProjectService; +import com.oceanbase.odc.service.common.FutureCache; import com.oceanbase.odc.service.common.response.SuccessResponse; import com.oceanbase.odc.service.common.util.SpringContextUtil; import com.oceanbase.odc.service.common.util.SqlUtils; @@ -131,6 +136,7 @@ import com.oceanbase.odc.service.flow.instance.FlowInstanceConfigurer; import com.oceanbase.odc.service.flow.instance.FlowTaskInstance; import com.oceanbase.odc.service.flow.listener.AutoApproveUserTaskListener; +import com.oceanbase.odc.service.flow.model.BatchTerminateFlowResult; import com.oceanbase.odc.service.flow.model.CreateFlowInstanceReq; import com.oceanbase.odc.service.flow.model.ExecutionStrategyConfig; import com.oceanbase.odc.service.flow.model.FlowInstanceDetailResp; @@ -158,6 +164,7 @@ import com.oceanbase.odc.service.iam.auth.AuthenticationFacade; import com.oceanbase.odc.service.iam.model.User; import com.oceanbase.odc.service.iam.model.UserResourceRole; +import com.oceanbase.odc.service.iam.util.SecurityContextUtils; import com.oceanbase.odc.service.integration.IntegrationService; import com.oceanbase.odc.service.integration.client.ApprovalClient; import com.oceanbase.odc.service.integration.model.ApprovalProperties; @@ -186,9 +193,12 @@ import com.oceanbase.odc.service.regulation.risklevel.model.RiskLevelDescriberIdentifier; import com.oceanbase.odc.service.schedule.ScheduleService; import com.oceanbase.odc.service.schedule.model.ScheduleStatus; +import com.oceanbase.odc.service.state.StatefulUuidStateIdGenerator; import com.oceanbase.odc.service.task.TaskService; import com.oceanbase.odc.service.task.base.precheck.PreCheckRiskLevel; +import com.oceanbase.odc.service.task.executor.logger.LogUtils; import com.oceanbase.odc.service.task.model.ExecutorInfo; +import com.oceanbase.odc.service.task.service.SpringTransactionManager; import com.oceanbase.tools.loaddump.common.enums.ObjectType; import io.micrometer.core.instrument.Tag; @@ -277,6 +287,12 @@ public class FlowInstanceService { @Autowired private EnvironmentService environmentService; @Autowired + private StatefulUuidStateIdGenerator statefulUuidStateIdGenerator; + @Autowired + private ThreadPoolTaskExecutor commonAsyncTaskExecutor; + @Autowired + private FutureCache futureCache; + @Autowired private FlowPermissionHelper flowPermissionHelper; @Autowired private MeterManager meterManager; @@ -288,7 +304,10 @@ public class FlowInstanceService { private ProjectService projectService; @Autowired private NamedParameterJdbcTemplate namedParameterJdbcTemplate; - + @Autowired + private TransactionTemplate transactionTemplate; + @Value("${odc.log.directory:./log}") + private String logPath; private static final long MAX_EXPORT_OBJECT_COUNT = 10000; private static final String ODC_SITE_URL = "odc.site.url"; private static final int MAX_APPLY_DATABASE_SIZE = 10; @@ -725,6 +744,53 @@ public FlowInstanceDetailResp cancelWithoutPermission(@NotNull Long id) { return cancel(flowInstance, true); } + public String startBatchCancelFlowInstance(Collection flowInstanceIds) { + String terminateId = statefulUuidStateIdGenerator.generateCurrentUserIdStateId("BatchFlowTerminate"); + User user = authenticationFacade.currentUser(); + Future> future = commonAsyncTaskExecutor.submit( + new RouteLogCallable>("BatchFlowTerminate", terminateId, "terminate") { + @Override + public List doCall() { + SecurityContextUtils.setCurrentUser(user); + List results = new ArrayList<>(); + for (Long id : flowInstanceIds) { + try { + new SpringTransactionManager(transactionTemplate) + .doInTransactionWithoutResult(() -> cancelWithWritePermission(id, false)); + results.add(BatchTerminateFlowResult.success(id)); + log.info("Terminate flow success, flowInstanceId={}", id); + } catch (Exception e) { + log.info("Terminate flow failed, flowInstanceId={}", id, e); + results.add(BatchTerminateFlowResult.failed(id, e.getMessage())); + } + } + return results; + } + }); + futureCache.put(terminateId, future); + return terminateId; + } + + public List getBatchCancelResult(String terminateId) { + statefulUuidStateIdGenerator.checkCurrentUserId(terminateId); + Future> future = + (Future>) futureCache.get(terminateId); + if (!future.isDone()) { + return Collections.emptyList(); + } + try { + futureCache.invalid(terminateId); + return future.get(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public String getBatchCancelLog(String terminateId) { + statefulUuidStateIdGenerator.checkCurrentUserId(terminateId); + return LogUtils.getRouteTaskLog(logPath, "BatchFlowTerminate", terminateId, "terminate"); + } + public Map getStatus(Set ids) { Specification specification = Specification.where(FlowInstanceSpecs.idIn(ids)) .and(FlowInstanceSpecs.organizationIdEquals(authenticationFacade.currentOrganizationId())); @@ -927,6 +993,23 @@ public TaskEntity getTaskByFlowInstanceId(Long id) { return taskService.detail(taskId); } + public Map getTaskByFlowInstanceIds(Collection flowInstanceIds) { + List serviceTaskInstanceEntities = serviceTaskRepository + .findByFlowInstanceIdIn(flowInstanceIds) + .stream() + .filter(e -> e.getTaskType() != TaskType.GENERATE_ROLLBACK && e.getTaskType() != TaskType.SQL_CHECK + && e.getTaskType() != TaskType.PRE_CHECK) + .collect(Collectors.toList()); + Map flowId2taskIds = serviceTaskInstanceEntities.stream().collect( + Collectors.toMap(ServiceTaskInstanceEntity::getFlowInstanceId, + ServiceTaskInstanceEntity::getTargetTaskId, (exist, duplicate) -> exist)); + List taskEntities = taskService.findByIds(flowId2taskIds.values()); + Map idTaskEntityMap = + taskEntities.stream().collect(Collectors.toMap(TaskEntity::getId, t -> t)); + return flowId2taskIds.entrySet().stream() + .collect(Collectors.toMap(Entry::getKey, v -> idTaskEntityMap.get(v.getValue()))); + } + private void checkCreateFlowInstancePermission(CreateFlowInstanceReq req) { if (authenticationFacade.currentUser().getOrganizationType() == OrganizationType.INDIVIDUAL) { return; @@ -1524,7 +1607,7 @@ public Set listAlterScheduleIdsByScheduleIdAndStatus(Long scheduleId, Flow /** * This is a temporary method that only uses ODC 4.3.4 - * + * * @param params * @return */ @@ -1582,7 +1665,7 @@ public int getPartitionPlanCount(@NotNull InnerQueryFlowInstanceParams params) { /** * This is a temporary method that only uses ODC 4.3.4 - * + * * @param params * @return */ @@ -1613,7 +1696,7 @@ private List listPartitionPlanSubTaskStates(@NonNull InnerQue /** * This is a temporary method that only uses ODC 4.3.4 - * + * * @param params * @return */ @@ -1646,7 +1729,7 @@ private List listSqlPlanSubTaskStates(@NonNull InnerQueryFlow /** * This is a temporary method that only uses ODC 4.3.4 - * + * * @param params * @return */ diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/FlowTaskInstanceService.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/FlowTaskInstanceService.java index 617e28a50e..daa6d1d2a2 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/FlowTaskInstanceService.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/FlowTaskInstanceService.java @@ -34,6 +34,7 @@ import java.util.function.Predicate; import java.util.stream.Collectors; +import javax.annotation.Nullable; import javax.validation.constraints.NotNull; import org.apache.commons.collections4.CollectionUtils; @@ -783,7 +784,10 @@ private void setDownloadUrlsIfNecessary(Long taskId, List flowInstances, boolean skipAuth) { return generateNodeMapper(getLongSet(flowInstances, FlowInstance::getId), @@ -254,10 +251,7 @@ private FlowNodeInstanceMapper generateNodeMapper(@NonNull Collection flow Specification serviceSpec = Specification.where(ServiceTaskInstanceSpecs.flowInstanceIdIn(flowInstanceIds)); List serviceEntities = serviceTaskRepository.findAll(serviceSpec); - Set taskIds = serviceEntities.stream().filter(entity -> entity.getTargetTaskId() != null) - .map(ServiceTaskInstanceEntity::getTargetTaskId).collect(Collectors.toSet()); - Map taskId2TaskEntity = listTasksByTaskIdsWithoutPermissionCheck(taskIds).stream() - .collect(Collectors.toMap(TaskEntity::getId, taskEntity -> taskEntity)); + Map taskId2TaskEntity = getTaskEntityMap(serviceEntities); return FlowNodeInstanceMapper.builder() .getCandidatesByApprovalId(approvalId2Candidates::get) @@ -287,50 +281,24 @@ private FlowInstanceMapper generateMapper(@NonNull Collection flowInstance if (flowInstanceIds.isEmpty()) { return FlowInstanceMapper.builder().build(); } - Specification serviceSpec = - Specification.where(ServiceTaskInstanceSpecs.flowInstanceIdIn(flowInstanceIds)); - List serviceEntities = serviceTaskRepository.findAll(serviceSpec); - - Map> flowInstanceId2ExecutionTime = serviceEntities.stream() - .filter(entity -> entity.getExecutionTime() != null) - .collect(Collectors.groupingBy(ServiceTaskInstanceEntity::getFlowInstanceId, - Collectors.mapping(ServiceTaskInstanceEntity::getExecutionTime, Collectors.toList()))); - - Map> flowInstanceId2ExecutionStrategy = serviceEntities.stream() - .filter(e -> e.getTaskType().needForExecutionStrategy()) - .collect(Collectors.groupingBy(ServiceTaskInstanceEntity::getFlowInstanceId, - Collectors.mapping(ServiceTaskInstanceEntity::getStrategy, Collectors.toList()))); - - Map parentInstanceIdMap = flowInstanceRepository - .findByParentInstanceIdIn(flowInstanceIds) - .stream().collect( - Collectors.toMap(ParentInstanceIdCount::getParentInstanceId, ParentInstanceIdCount::getCount)); - - Map flowInstanceId2Rollbackable = flowInstanceIds.stream().collect(Collectors - .toMap(Function.identity(), id -> MoreObjects.firstNonNull(parentInstanceIdMap.get(id), 0) == 0)); - + List serviceEntities = + serviceTaskRepository.findByFlowInstanceIdIn(new HashSet<>(flowInstanceIds)); + Map> flowInstanceId2ExecutionTime = mapExecutionTimes(serviceEntities); + Map> flowInstanceId2ExecutionStrategy = + mapExecutionStrategies(serviceEntities); + Map flowInstanceId2Rollbackable = mapRollbackableStatus(flowInstanceIds); /** + * * In order to improve the interface efficiency, it is necessary to find out the task entity * corresponding to the process instance at one time */ Map> flowInstanceId2Tasks = new HashMap<>(); - Set taskIds = serviceEntities.stream().filter(entity -> entity.getTargetTaskId() != null) - .map(ServiceTaskInstanceEntity::getTargetTaskId).collect(Collectors.toSet()); - Map taskId2TaskEntity = listTasksByTaskIdsWithoutPermissionCheck(taskIds).stream() - .collect(Collectors.toMap(TaskEntity::getId, taskEntity -> taskEntity)); - serviceEntities.stream().filter(entity -> entity.getTargetTaskId() != null).forEach(entity -> { - Set taskEntities = - flowInstanceId2Tasks.computeIfAbsent(entity.getFlowInstanceId(), id -> new HashSet<>()); - TaskEntity taskEntity = taskId2TaskEntity.get(entity.getTargetTaskId()); - if (taskEntity != null) { - taskEntities.add(taskEntity); - } - }); + Map taskId2TaskEntity = getTaskEntityMap(serviceEntities); + populateTaskMappings(serviceEntities, flowInstanceId2Tasks, taskId2TaskEntity); /** * Get Database associated with each TaskEntity */ - Map id2Database = new HashMap<>(); Set databaseIds = taskId2TaskEntity.values().stream() .map(TaskEntity::getDatabaseId) .filter(Objects::nonNull).collect(Collectors.toSet()); @@ -340,59 +308,23 @@ private FlowInstanceMapper generateMapper(@NonNull Collection flowInstance databaseIds.addAll(collectApplyDatabasePermissionDatabaseIds(taskId2TaskEntity)); databaseIds.addAll(collectApplyTablePermissionDatabaseIds(taskId2TaskEntity)); Set projectIds = new HashSet<>(); - Map id2Project = new HashMap<>(); - - if (CollectionUtils.isNotEmpty(databaseIds)) { - id2Database = databaseService.listDatabasesByIds(databaseIds).stream() - .collect(Collectors.toMap(Database::getId, database -> database)); - projectIds.addAll(id2Database.values().stream().map(db -> db.getProject().getId()) - .filter(Objects::nonNull).collect(Collectors.toSet())); - } - projectIds.addAll(collectApplyProjectIds(taskId2TaskEntity)); - if (CollectionUtils.isNotEmpty(projectIds)) { - id2Project = projectService.listByIds(projectIds).stream() - .collect(Collectors.toMap(Project::getId, project -> project, (a, b) -> a)); - } - /** - * find the ConnectionConfig associated with each Database - */ - Set connectionIds = id2Database.values().stream() - .filter(e -> e.getDataSource() != null && e.getDataSource().getId() != null) - .map(e -> e.getDataSource().getId()).collect(Collectors.toSet()); - Map id2Connection = listConnectionsByConnectionIdsWithoutPermissionCheck(connectionIds) - .stream().collect(Collectors.toMap(ConnectionEntity::getId, connectionMapper::entityToModel)); - id2Database.values().forEach(database -> { - if (id2Connection.containsKey(database.getDataSource().getId())) { - database.setDataSource(id2Connection.get(database.getDataSource().getId())); - } - }); - + Map id2Database = getIdDatabaseMapAndFillProjectIds( + databaseIds, projectIds, taskId2TaskEntity); + Map id2Project = getIdProjectMap(projectIds, skipAuth); /** * list candidates */ Map> candidatesByFlowInstanceIds = approvalPermissionService.getCandidatesByFlowInstanceIds(flowInstanceIds); - /** * In order to improve the interface efficiency, it is necessary to find out the user entity * corresponding to the process instance at one time */ - Set userIds = flowInstanceId2Tasks.values().stream() - .flatMap(Collection::stream) - .filter(entity -> entity.getCreatorId() != null) - .map(TaskEntity::getCreatorId).collect(Collectors.toSet()); - userIds.addAll(creatorIds); - Map userId2User = listUsersByUserIds(userIds) - .stream().collect(Collectors.toMap(UserEntity::getId, entity -> entity)); - + Map userId2User = getUserId2EntityMap(creatorIds, + flowInstanceId2Tasks); Map> userId2Roles = getUserId2Roles(userId2User.keySet(), skipAuth); + Set approvableFlowInstanceIds = getApprovableFlowInstanceIds(skipAuth); - Set approvableFlowInstanceIds = skipAuth ? Sets.newHashSet() - : approvalPermissionService.getApprovableApprovalInstances() - .stream() - .filter(entity -> FlowNodeStatus.EXECUTING == entity.getStatus() - || entity.getStatus() == FlowNodeStatus.WAIT_FOR_CONFIRM) - .map(UserTaskInstanceEntity::getFlowInstanceId).collect(Collectors.toSet()); return FlowInstanceMapper.builder() .ifRollbackable(flowInstanceId2Rollbackable::get) .ifApprovable(approvableFlowInstanceIds::contains) @@ -409,6 +341,124 @@ private FlowInstanceMapper generateMapper(@NonNull Collection flowInstance .build(); } + private void populateTaskMappings(List serviceEntities, + Map> flowInstanceId2Tasks, Map taskId2TaskEntity) { + serviceEntities.stream().filter(entity -> entity.getTargetTaskId() != null).forEach(entity -> { + Set taskEntities = + flowInstanceId2Tasks.computeIfAbsent(entity.getFlowInstanceId(), id -> new HashSet<>()); + TaskEntity taskEntity = taskId2TaskEntity.get(entity.getTargetTaskId()); + if (taskEntity != null) { + taskEntities.add(taskEntity); + } + }); + } + + private Map> mapExecutionStrategies( + List serviceEntities) { + return serviceEntities.stream() + .filter(e -> e.getTaskType().needForExecutionStrategy()) + .collect(Collectors.groupingBy(ServiceTaskInstanceEntity::getFlowInstanceId, + Collectors.mapping(ServiceTaskInstanceEntity::getStrategy, Collectors.toList()))); + } + + private Map> mapExecutionTimes(List serviceEntities) { + return serviceEntities.stream() + .filter(entity -> entity.getExecutionTime() != null) + .collect(Collectors.groupingBy(ServiceTaskInstanceEntity::getFlowInstanceId, + Collectors.mapping(ServiceTaskInstanceEntity::getExecutionTime, Collectors.toList()))); + } + + private Map getIdProjectMap(Set projectIds, boolean skipAuth) { + Map id2Project = new HashMap<>(); + if (CollectionUtils.isNotEmpty(projectIds)) { + List projects = projectService.listByIds(projectIds); + if (!skipAuth) { + Map> projectId2ResourceRoleNames = + projectService.getProjectId2ResourceRoleNames(); + projects.forEach(p -> p.setCurrentUserResourceRoles( + projectId2ResourceRoleNames.getOrDefault(p.getId(), + Collections.emptySet()))); + id2Project = projects.stream() + .collect(Collectors.toMap(Project::getId, project -> project, (a, b) -> a)); + } + } + return id2Project; + } + + private Map getIdDatabaseMapAndFillProjectIds(Set databaseIds, Set projectIds, + Map taskId2TaskEntity) { + Map id2Database = new HashMap<>(); + if (CollectionUtils.isNotEmpty(databaseIds)) { + id2Database = databaseService.listDatabasesByIds(databaseIds).stream() + .collect(Collectors.toMap(Database::getId, database -> database)); + populateDatasourceToDatabase(id2Database); + projectIds.addAll(id2Database.values().stream().map(db -> db.getProject().getId()) + .filter(Objects::nonNull).collect(Collectors.toSet())); + } + projectIds.addAll(collectApplyProjectIds(taskId2TaskEntity)); + return id2Database; + } + + private Set getApprovableFlowInstanceIds(boolean skipAuth) { + return skipAuth ? Sets.newHashSet() + : approvalPermissionService.getApprovableApprovalInstances() + .stream() + .filter(entity -> FlowNodeStatus.EXECUTING == entity.getStatus() + || entity.getStatus() == FlowNodeStatus.WAIT_FOR_CONFIRM) + .map(UserTaskInstanceEntity::getFlowInstanceId).collect(Collectors.toSet()); + } + + private Map getUserId2EntityMap(Set creatorIds, + Map> flowInstanceId2Tasks) { + Set userIds = flowInstanceId2Tasks.values().stream() + .flatMap(Collection::stream) + .map(TaskEntity::getCreatorId) + .filter(Objects::nonNull).collect(Collectors.toSet()); + userIds.addAll(creatorIds); + return listUsersByUserIds(userIds) + .stream().collect(Collectors.toMap(UserEntity::getId, entity -> entity)); + } + + private void populateDatasourceToDatabase(Map id2Database) { + Set connectionIds = id2Database.values().stream() + .filter(e -> e.getDataSource() != null && e.getDataSource().getId() != null) + .map(e -> e.getDataSource().getId()).collect(Collectors.toSet()); + Map id2Connection = listConnectionsByConnectionIdsWithoutPermissionCheck(connectionIds) + .stream().collect(Collectors.toMap(ConnectionEntity::getId, connectionMapper::entityToModel)); + id2Database.values().forEach(database -> { + if (id2Connection.containsKey(database.getDataSource().getId())) { + database.setDataSource(id2Connection.get(database.getDataSource().getId())); + } + }); + } + + private Set getDatabaseIds(Map taskId2TaskEntity) { + Set databaseIds = taskId2TaskEntity.values().stream() + .map(TaskEntity::getDatabaseId) + .filter(Objects::nonNull).collect(Collectors.toSet()); + + databaseIds.addAll(collectMultiDatabaseChangeDatabaseIds(taskId2TaskEntity)); + databaseIds.addAll(collectDBStructureComparisonDatabaseIds(taskId2TaskEntity)); + return databaseIds; + } + + private Map getTaskEntityMap(List serviceEntities) { + Set taskIds = serviceEntities.stream().map(ServiceTaskInstanceEntity::getTargetTaskId) + .filter(Objects::nonNull).collect(Collectors.toSet()); + return listTasksByTaskIdsWithoutPermissionCheck(taskIds).stream() + .collect(Collectors.toMap(TaskEntity::getId, taskEntity -> taskEntity)); + } + + private Map mapRollbackableStatus(Collection flowInstanceIds) { + Map parentInstanceIdMap = flowInstanceRepository + .findByParentInstanceIdIn(flowInstanceIds) + .stream().collect( + Collectors.toMap(ParentInstanceIdCount::getParentInstanceId, ParentInstanceIdCount::getCount)); + + return flowInstanceIds.stream().collect(Collectors + .toMap(Function.identity(), id -> MoreObjects.firstNonNull(parentInstanceIdMap.get(id), 0) == 0)); + } + public Map> getUserId2Roles(@NonNull Collection userIds, boolean skipAuth) { List userRoleEntities = skipAuth ? userRoleRepository.findByRoleIdIn(userIds) : userRoleRepository.findByOrganizationIdAndUserIdIn(authenticationFacade.currentOrganizationId(), diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/model/BatchTerminateFlowResult.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/model/BatchTerminateFlowResult.java new file mode 100644 index 0000000000..da107cd5e3 --- /dev/null +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/model/BatchTerminateFlowResult.java @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2023 OceanBase. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.oceanbase.odc.service.flow.model; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@NoArgsConstructor +@AllArgsConstructor +public class BatchTerminateFlowResult { + private Boolean terminateSucceed; + private Long flowInstanceId; + private String failReason; + + public static BatchTerminateFlowResult success(Long id) { + return new BatchTerminateFlowResult(true, id, null); + } + + public static BatchTerminateFlowResult failed(Long id, String failReason) { + return new BatchTerminateFlowResult(false, id, failReason); + + } + +} diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/iam/UserService.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/iam/UserService.java index 8aa6a87d5c..d9373d24f4 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/iam/UserService.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/iam/UserService.java @@ -98,6 +98,7 @@ import com.oceanbase.odc.metadb.iam.UserRoleRepository; import com.oceanbase.odc.metadb.iam.UserSpecs; import com.oceanbase.odc.service.automation.model.TriggerEvent; +import com.oceanbase.odc.service.common.model.InnerUser; import com.oceanbase.odc.service.common.response.CustomPage; import com.oceanbase.odc.service.common.response.PaginatedData; import com.oceanbase.odc.service.common.util.SpringContextUtil; @@ -1082,6 +1083,20 @@ public void assignCreatorNameByCreatorId(Collection elements, Function void assignInnerUserByCreatorId(List content, Function userIdProvider, + BiConsumer innerUserSetter) { + List userIds = content.stream().map(userIdProvider).collect( + Collectors.toList()); + List entities = userRepository.findByIdIn(userIds); + Map idUserEntityMap = entities.stream().collect( + Collectors.toMap(UserEntity::getId, t -> t)); + content.forEach(c -> { + UserEntity userEntity = idUserEntityMap.get(userIdProvider.apply(c)); + innerUserSetter.accept(c, new InnerUser(userEntity, null)); + }); + } + @SkipAuthorize("odc internal usage") public List listByBoundOrganizationId(@NotNull Long organizationId) { return userRepository.findByBoundOrganization(organizationId).stream().map(User::new) diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/onlineschemachange/oscfms/action/oms/ProjectStepResultChecker.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/onlineschemachange/oscfms/action/oms/ProjectStepResultChecker.java index 3bfec747fc..216db110e5 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/onlineschemachange/oscfms/action/oms/ProjectStepResultChecker.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/onlineschemachange/oscfms/action/oms/ProjectStepResultChecker.java @@ -171,11 +171,12 @@ protected boolean checkStepFinished(OmsStepName name) { switch (name) { case INCR_TRANSFER: Long chkInTimestamp = progressResponse.getIncrSyncCheckpoint(); - return status == OmsStepStatus.MONITORING && competedFunc.apply(progress) - // why set check value to 25 seconds. cause oms collect checkpoint every 10 seconds and oms writer - // save checkpoint - // every 10 seconds - // max gap time is 20, we set to 25 to let it pass. + return status == OmsStepStatus.MONITORING + // why set check value to 25 seconds. cause oms collect checkpoint every 10 seconds and oms + // writer + // save checkpoint + // every 10 seconds + // max gap time is 20, we set to 25 to let it pass. && (null != chkInTimestamp && (chkInTimestamp > currentSeconds || Math.abs(currentSeconds - chkInTimestamp) <= 25)); case FULL_VERIFIER: diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/PartitionPlanScheduleService.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/PartitionPlanScheduleService.java index 40c0f999d5..4fa93543ae 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/PartitionPlanScheduleService.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/PartitionPlanScheduleService.java @@ -28,7 +28,9 @@ import org.apache.commons.lang3.Validate; import org.quartz.SchedulerException; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Service; +import org.springframework.transaction.support.TransactionTemplate; import com.fasterxml.jackson.core.type.TypeReference; import com.oceanbase.odc.common.json.JsonUtils; @@ -44,6 +46,7 @@ import com.oceanbase.odc.metadb.partitionplan.PartitionPlanTablePartitionKeyRepository; import com.oceanbase.odc.metadb.partitionplan.PartitionPlanTableRepository; import com.oceanbase.odc.metadb.schedule.ScheduleEntity; +import com.oceanbase.odc.metadb.task.TaskEntity; import com.oceanbase.odc.service.connection.database.DatabaseService; import com.oceanbase.odc.service.connection.database.model.Database; import com.oceanbase.odc.service.flow.FlowInstanceService; @@ -55,6 +58,8 @@ import com.oceanbase.odc.service.partitionplan.model.PartitionPlanTableConfig; import com.oceanbase.odc.service.quartz.model.MisfireStrategy; import com.oceanbase.odc.service.schedule.ScheduleService; +import com.oceanbase.odc.service.schedule.export.model.ScheduleTerminateCmd; +import com.oceanbase.odc.service.schedule.export.model.ScheduleTerminateResult; import com.oceanbase.odc.service.schedule.model.ScheduleStatus; import com.oceanbase.odc.service.schedule.model.ScheduleType; import com.oceanbase.odc.service.schedule.model.TriggerConfig; @@ -87,9 +92,12 @@ public class PartitionPlanScheduleService { @Autowired private PartitionPlanTablePartitionKeyRepository partitionPlanTablePartitionKeyRepository; @Autowired + @Lazy private FlowInstanceService flowInstanceService; @Autowired private FlowInstanceRepository flowInstanceRepository; + @Autowired + private TransactionTemplate transactionTemplate; public PartitionPlanConfig getPartitionPlanByFlowInstanceId(@NonNull Long flowInstanceId) { FlowInstanceDetailResp resp = this.flowInstanceService.detail(flowInstanceId); @@ -208,6 +216,29 @@ public void submit(@NonNull PartitionPlanConfig partitionPlanConfig) } } + public void processTerminatePartitionPlan(ScheduleTerminateCmd cmd, List results) { + Map flowInstanceId2TaskEntity = flowInstanceService.getTaskByFlowInstanceIds(cmd.getIds()); + for (Map.Entry entry : flowInstanceId2TaskEntity.entrySet()) { + Long flowInstanceId = entry.getKey(); + TaskEntity taskEntity = entry.getValue(); + try { + transactionTemplate.executeWithoutResult((status) -> { + try { + disablePartitionPlan(taskEntity.getDatabaseId()); + } catch (Exception e) { + status.setRollbackOnly(); + throw new RuntimeException(e); + } + }); + results.add(ScheduleTerminateResult.ofSuccess(cmd.getScheduleType(), flowInstanceId)); + log.info("PartitionPlan task stop success, flowInstanceId={}", flowInstanceId); + } catch (Exception e) { + results.add(ScheduleTerminateResult.ofFailed(cmd.getScheduleType(), flowInstanceId, e.getMessage())); + log.info("PartitionPlan task stop failed, flowInstanceId={}", flowInstanceId, e); + } + } + } + @Transactional(rollbackOn = Exception.class) public void disablePartitionPlan(@NonNull Long databaseId) throws SchedulerException { List ppEntities = this.partitionPlanRepository.findByDatabaseIdAndEnabled( diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/quartz/executor/AbstractQuartzJob.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/quartz/executor/AbstractQuartzJob.java index d37fb4ecef..f9b659b64d 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/quartz/executor/AbstractQuartzJob.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/quartz/executor/AbstractQuartzJob.java @@ -105,6 +105,15 @@ public void run(JobExecutionContext context) { public OdcJob getOdcJob(JobExecutionContext context) { JobKey key = context.getJobDetail().getKey(); ScheduleTaskType taskType = ScheduleTaskType.valueOf(key.getGroup()); + if (taskType == ScheduleTaskType.DATA_ARCHIVE && context.getResult() != null) { + try { + ScheduleTaskEntity taskEntity = (ScheduleTaskEntity) context.getResult(); + taskType = ScheduleTaskType.valueOf(taskEntity.getJobGroup()); + log.info("Recovery task,scheduleTaskId={}", taskEntity.getId()); + } catch (Exception e) { + log.warn("Load history task failed,jobKey={}", key); + } + } switch (taskType) { case SQL_PLAN: return new SqlPlanJob(); diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/ScheduleService.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/ScheduleService.java index 716967ee1b..81dd9abcfd 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/ScheduleService.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/ScheduleService.java @@ -32,6 +32,7 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.function.Function; @@ -57,6 +58,7 @@ import org.springframework.data.domain.Pageable; import org.springframework.data.jpa.domain.Specification; import org.springframework.integration.jdbc.lock.JdbcLockRegistry; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.support.TransactionTemplate; @@ -65,11 +67,13 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; import com.oceanbase.odc.common.json.JsonUtils; +import com.oceanbase.odc.common.task.RouteLogCallable; import com.oceanbase.odc.common.util.StringUtils; import com.oceanbase.odc.config.jpa.OdcJpaRepository; import com.oceanbase.odc.core.alarm.AlarmUtils; import com.oceanbase.odc.core.authority.util.SkipAuthorize; import com.oceanbase.odc.core.shared.PreConditions; +import com.oceanbase.odc.core.shared.Verify; import com.oceanbase.odc.core.shared.constant.ErrorCodes; import com.oceanbase.odc.core.shared.constant.FlowStatus; import com.oceanbase.odc.core.shared.constant.OrganizationType; @@ -93,6 +97,7 @@ import com.oceanbase.odc.metadb.schedule.ScheduleTaskRepository; import com.oceanbase.odc.service.collaboration.project.ProjectService; import com.oceanbase.odc.service.collaboration.project.model.Project; +import com.oceanbase.odc.service.common.FutureCache; import com.oceanbase.odc.service.common.util.SpringContextUtil; import com.oceanbase.odc.service.connection.ConnectionService; import com.oceanbase.odc.service.connection.database.DatabaseService; @@ -114,11 +119,15 @@ import com.oceanbase.odc.service.iam.auth.AuthenticationFacade; import com.oceanbase.odc.service.iam.model.Organization; import com.oceanbase.odc.service.iam.model.User; +import com.oceanbase.odc.service.iam.util.SecurityContextUtils; import com.oceanbase.odc.service.objectstorage.ObjectStorageFacade; +import com.oceanbase.odc.service.partitionplan.PartitionPlanScheduleService; import com.oceanbase.odc.service.quartz.QuartzJobServiceProxy; import com.oceanbase.odc.service.quartz.model.MisfireStrategy; import com.oceanbase.odc.service.quartz.util.QuartzCronExpressionUtils; import com.oceanbase.odc.service.regulation.approval.ApprovalFlowConfigSelector; +import com.oceanbase.odc.service.schedule.export.model.ScheduleTerminateCmd; +import com.oceanbase.odc.service.schedule.export.model.ScheduleTerminateResult; import com.oceanbase.odc.service.schedule.factory.ScheduleResponseMapperFactory; import com.oceanbase.odc.service.schedule.flowtask.AlterScheduleParameters; import com.oceanbase.odc.service.schedule.flowtask.ApprovalFlowClient; @@ -154,12 +163,16 @@ import com.oceanbase.odc.service.schedule.model.TriggerStrategy; import com.oceanbase.odc.service.schedule.model.UpdateScheduleReq; import com.oceanbase.odc.service.schedule.processor.ScheduleChangePreprocessor; +import com.oceanbase.odc.service.schedule.util.BatchSchedulePermissionValidator; import com.oceanbase.odc.service.schedule.util.ScheduleDescriptionGenerator; import com.oceanbase.odc.service.sqlplan.model.SqlPlanParameters; +import com.oceanbase.odc.service.state.StatefulUuidStateIdGenerator; import com.oceanbase.odc.service.task.constants.JobParametersKeyConstants; import com.oceanbase.odc.service.task.exception.JobException; +import com.oceanbase.odc.service.task.executor.logger.LogUtils; import com.oceanbase.odc.service.task.model.OdcTaskLogLevel; import com.oceanbase.odc.service.task.schedule.JobScheduler; +import com.oceanbase.odc.service.task.service.SpringTransactionManager; import cn.hutool.core.map.MapUtil; import cn.hutool.core.util.ObjectUtil; @@ -177,11 +190,11 @@ @SkipAuthorize public class ScheduleService { + private final ScheduleMapper scheduleMapper = ScheduleMapper.INSTANCE; @Value("${odc.task.trigger.minimum-interval:600}") private int minInterval; @Autowired private ScheduleRepository scheduleRepository; - @Autowired private ScheduleTaskRepository scheduleTaskRepository; @Autowired @@ -189,75 +202,68 @@ public class ScheduleService { @Autowired @Qualifier("quartzJobServiceProxy") private QuartzJobServiceProxy quartzJobService; - @Autowired private ObjectStorageFacade objectStorageFacade; - @Autowired private FlowInstanceRepository flowInstanceRepository; - + @Autowired + private PartitionPlanScheduleService partitionPlanScheduleService; @Autowired private ScheduleTaskService scheduleTaskService; - @Autowired private ScheduleResponseMapperFactory scheduleResponseMapperFactory; - @Autowired @Lazy private ProjectService projectService; - @Autowired private ProjectPermissionValidator projectPermissionValidator; - @Autowired private ApprovalFlowConfigSelector approvalFlowConfigSelector; - @Autowired private DatabaseService databaseService; - @Autowired private EnvironmentRepository environmentRepository; - @Autowired private ScheduleChangeLogService scheduleChangeLogService; - @Autowired private OrganizationService organizationService; - @Autowired private ScheduleChangePreprocessor preprocessor; - @Autowired private LatestTaskMappingRepository latestTaskMappingRepository; - @Autowired private ConnectionService connectionService; - @Autowired private DlmLimiterService dlmLimiterService; - @Autowired private UserService userService; - @Autowired private ScheduledTaskLoggerService scheduledTaskLoggerService; - @Autowired private JdbcLockRegistry jdbcLockRegistry; - @Autowired private ApprovalFlowClient approvalFlowService; - @Autowired private ScheduleDescriptionGenerator descriptionGenerator; + @Autowired + private StatefulUuidStateIdGenerator statefulUuidStateIdGenerator; + @Autowired + private ThreadPoolTaskExecutor commonAsyncTaskExecutor; + @Autowired + private FutureCache futureCache; + + @Autowired + private BatchSchedulePermissionValidator batchSchedulePermissionValidator; + + @Value("${odc.log.directory:./log}") + private String logPath; + @Autowired private TransactionTemplate txTemplate; @Autowired @Lazy private FlowInstanceService flowInstanceService; - private final ScheduleMapper scheduleMapper = ScheduleMapper.INSTANCE; - @Transactional(rollbackFor = Exception.class) public List dispatchCreateSchedule(CreateFlowInstanceReq createReq) { AlterScheduleParameters parameters = (AlterScheduleParameters) createReq.getParameters(); @@ -721,6 +727,108 @@ public void stopTask(Long scheduleId, Long scheduleTaskId) { } } + public String startTerminateScheduleAndTask(ScheduleTerminateCmd cmd) { + batchSchedulePermissionValidator.checkScheduleIdsPermission(cmd.getScheduleType(), cmd.getIds()); + User user = authenticationFacade.currentUser(); + String terminateId = statefulUuidStateIdGenerator.generateCurrentUserIdStateId("ScheduleTerminate"); + Future> future = commonAsyncTaskExecutor.submit( + new RouteLogCallable>("ScheduleTerminate", terminateId, "terminate") { + @Override + public List doCall() { + SecurityContextUtils.setCurrentUser(user); + return syncTerminateScheduleAndTask(cmd); + } + }); + futureCache.put(terminateId, future); + return terminateId; + } + + public List getTerminateScheduleResult(String terminateId) { + statefulUuidStateIdGenerator.checkCurrentUserId(terminateId); + Future future = futureCache.get(terminateId); + if (!future.isDone()) { + return Collections.emptyList(); + } + try { + futureCache.invalid(terminateId); + return (List) future.get(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public String getTerminateLog(String terminateId) { + statefulUuidStateIdGenerator.checkCurrentUserId(terminateId); + return LogUtils.getRouteTaskLog(logPath, "ScheduleTerminate", terminateId, "terminate"); + } + + public List syncTerminateScheduleAndTask(ScheduleTerminateCmd cmd) { + log.info("Start to terminate schedule, type={}, scheduleIds={}", cmd.getScheduleType(), cmd.getIds()); + List results = new ArrayList<>(); + if (ScheduleType.PARTITION_PLAN.equals(cmd.getScheduleType())) { + // The partition plan uses Schedule, but it is created through flow, and the front end also displays + // it through flow. + // To ensure that the customer see the same id, the flowInstanceId is used + partitionPlanScheduleService.processTerminatePartitionPlan(cmd, results); + return results; + } + List scheduleEntities = scheduleRepository.findByIdIn(cmd.getIds()); + Verify.verify(Objects.equals(scheduleEntities.size(), cmd.getIds().size()), "Invalid schedule Ids"); + for (ScheduleEntity schedule : scheduleEntities) { + try { + Optional latestTaskEntity = + scheduleTaskRepository.getLatestScheduleTaskByJobNameAndJobGroup( + String.valueOf(schedule.getId()), + schedule.getType().name()); + if (!latestTaskEntity.isPresent() || !latestTaskEntity.get().getStatus().isProcessing()) { + innerTerminateInTx(schedule); + log.info("Schedule task stop success, scheduleId={}", schedule.getId()); + results.add(ScheduleTerminateResult.ofSuccess(schedule.getType(), schedule.getId())); + continue; + } + ScheduleTaskEntity scheduleTaskEntity = latestTaskEntity.get(); + scheduleTaskService.stop(scheduleTaskEntity.getId()); + final int maxRetryTimes = 30; + int retryTimes = 0; + while (retryTimes < maxRetryTimes) { + latestTaskEntity = scheduleTaskRepository.getLatestScheduleTaskByJobNameAndJobGroup( + String.valueOf(schedule.getId()), + schedule.getType().name()); + if (latestTaskEntity.get().getStatus().isTerminated()) { + innerTerminateInTx(schedule); + results.add( + ScheduleTerminateResult.ofSuccess(schedule.getType(), schedule.getId())); + log.info("Schedule task stop success, scheduleId={}", schedule.getId()); + break; + } + retryTimes++; + Thread.sleep(2000); + } + log.info( + "Wait task 60s, still not terminate, please try again. Schedule task stop Failed, scheduleId={}", + schedule.getId()); + results.add(ScheduleTerminateResult.ofFailed(cmd.getScheduleType(), schedule.getId(), + "Wait task 60s, still not terminate, please try again.")); + } catch (Exception e) { + log.error("Terminate schedule task failed,scheduleId={}", schedule.getId(), e); + results.add(ScheduleTerminateResult.ofFailed(cmd.getScheduleType(), schedule.getId(), + e.getMessage())); + } + } + return results; + } + + private void innerTerminateInTx(ScheduleEntity schedule) { + new SpringTransactionManager(txTemplate) + .doInTransactionWithoutResult(() -> { + try { + innerTerminate(schedule.getId()); + } catch (SchedulerException e) { + throw new RuntimeException(e); + } + }); + } + /** * @param scheduleId the task must belong to a valid schedule,so this param is not be null. * @param scheduleTaskId the task uid. Start a paused or pending task. diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/ScheduleExportService.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/ScheduleExportService.java index d1a119f2c7..fa692d9617 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/ScheduleExportService.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/ScheduleExportService.java @@ -17,39 +17,43 @@ import java.io.File; import java.util.List; -import java.util.Set; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.security.crypto.bcrypt.BCryptPasswordEncoder; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Service; -import com.oceanbase.odc.common.security.PasswordUtils; +import com.oceanbase.odc.common.task.RouteLogCallable; import com.oceanbase.odc.core.authority.util.SkipAuthorize; -import com.oceanbase.odc.core.shared.PreConditions; import com.oceanbase.odc.core.shared.SingleOrganizationResource; -import com.oceanbase.odc.core.shared.constant.OrganizationType; -import com.oceanbase.odc.core.shared.constant.ResourceRoleName; import com.oceanbase.odc.core.shared.constant.ResourceType; import com.oceanbase.odc.metadb.flow.FlowInstanceEntity; +import com.oceanbase.odc.metadb.schedule.ScheduleEntity; import com.oceanbase.odc.metadb.schedule.ScheduleRepository; -import com.oceanbase.odc.service.common.util.OdcFileUtil; -import com.oceanbase.odc.service.exporter.model.ExportProperties; -import com.oceanbase.odc.service.exporter.model.ExportedFile; +import com.oceanbase.odc.metadb.task.TaskEntity; +import com.oceanbase.odc.service.common.FutureCache; +import com.oceanbase.odc.service.connection.database.DatabaseService; import com.oceanbase.odc.service.flow.FlowInstanceService; import com.oceanbase.odc.service.iam.HorizontalDataPermissionValidator; import com.oceanbase.odc.service.iam.ProjectPermissionValidator; +import com.oceanbase.odc.service.iam.UserService; import com.oceanbase.odc.service.iam.auth.AuthenticationFacade; +import com.oceanbase.odc.service.iam.model.User; import com.oceanbase.odc.service.objectstorage.ObjectStorageFacade; -import com.oceanbase.odc.service.objectstorage.model.ObjectMetadata; -import com.oceanbase.odc.service.schedule.ScheduleService; import com.oceanbase.odc.service.schedule.export.model.FileExportResponse; +import com.oceanbase.odc.service.schedule.export.model.ScheduleExportListView; import com.oceanbase.odc.service.schedule.export.model.ScheduleTaskExportRequest; -import com.oceanbase.odc.service.schedule.model.Schedule; import com.oceanbase.odc.service.schedule.model.ScheduleMapper; import com.oceanbase.odc.service.schedule.model.ScheduleType; +import com.oceanbase.odc.service.schedule.util.BatchSchedulePermissionValidator; +import com.oceanbase.odc.service.state.StatefulUuidStateIdGenerator; +import com.oceanbase.odc.service.task.executor.logger.LogUtils; import lombok.AllArgsConstructor; import lombok.Data; @@ -76,7 +80,10 @@ public class ScheduleExportService { private FlowInstanceService flowInstanceService; @Autowired - private ScheduleService scheduleService; + private BatchSchedulePermissionValidator batchSchedulePermissionValidator; + + @Autowired + private UserService userService; @Autowired private HorizontalDataPermissionValidator horizontalDataPermissionValidator; @@ -84,65 +91,95 @@ public class ScheduleExportService { @Autowired private ScheduleRepository scheduleRepository; - private static String getPersonalBucketName(String userIdStr) { - PreConditions.notEmpty(userIdStr, "userIdStr"); - return ASYNC_TASK_BASE_BUCKET.concat(File.separator).concat(userIdStr); - } + @Autowired + private StatefulUuidStateIdGenerator statefulUuidStateIdGenerator; - public FileExportResponse export(ScheduleTaskExportRequest request) { - checkRequestIdsPermission(request); + @Autowired + private ThreadPoolTaskExecutor commonAsyncTaskExecutor; - ExportProperties properties = scheduleTaskExporter.generateExportProperties(); - String encryptKey = new BCryptPasswordEncoder().encode(PasswordUtils.random()); + @Autowired + private DatabaseService databaseService; - ExportedFile exportedFile = null; - if (request.getScheduleType().equals(ScheduleType.PARTITION_PLAN)) { - exportedFile = scheduleTaskExporter.exportPartitionPlan(encryptKey, properties, - request.getIds()); - } else { - exportedFile = scheduleTaskExporter.exportSchedule(request.getScheduleType(), encryptKey, - properties, request.getIds()); - } - return mapToFileExportResponse(exportedFile); + @Autowired + private FutureCache futureCache; + + @Value("${odc.log.directory:./log}") + private String logPath; + + private String getPersonalBucketName() { + return ASYNC_TASK_BASE_BUCKET.concat(File.separator).concat(authenticationFacade.currentUserIdStr()); } + public String startExport(ScheduleTaskExportRequest request) { + batchSchedulePermissionValidator.checkScheduleIdsPermission(request.getScheduleType(), request.getIds()); + String previewId = statefulUuidStateIdGenerator.generateCurrentUserIdStateId("scheduleExport"); + User user = authenticationFacade.currentUser(); + Future future = commonAsyncTaskExecutor.submit( + new ScheduleTaskExportCallable(previewId, request, user, scheduleTaskExporter, + getPersonalBucketName(), objectStorageFacade)); + futureCache.put(previewId, future); + return previewId; + } - private void checkRequestIdsPermission(ScheduleTaskExportRequest request) { - Set projectIds; + public List getExportListView(ScheduleTaskExportRequest request) { + batchSchedulePermissionValidator.checkScheduleIdsPermission(request.getScheduleType(), request.getIds()); if (request.getScheduleType().equals(ScheduleType.PARTITION_PLAN)) { - List flowInstanceEntities = flowInstanceService.listByIds(request.getIds()); - projectIds = - flowInstanceEntities.stream().map(FlowInstanceEntity::getProjectId).collect(Collectors.toSet()); - List flowOrganizationIsolateds = flowInstanceEntities.stream().map( - f -> new FlowOrganizationIsolated(f.getOrganizationId(), f.getId())).collect( - Collectors.toList()); - horizontalDataPermissionValidator.checkCurrentOrganization(flowOrganizationIsolateds); - } else { - List scheduleEntities = scheduleRepository.findByIdIn(request.getIds()).stream() - .map(ScheduleMapper.INSTANCE::entityToModel).collect( - Collectors.toList()); - horizontalDataPermissionValidator.checkCurrentOrganization(scheduleEntities); - projectIds = scheduleEntities.stream().map(Schedule::getProjectId).collect(Collectors.toSet()); - } - if (authenticationFacade.currentOrganization().getType().equals(OrganizationType.TEAM)) { - projectPermissionValidator.checkProjectRole(projectIds, ResourceRoleName.all()); + return getPartitionPlanView(request); } + List scheduleEntities = scheduleRepository.findByIdIn(request.getIds()); + List view = scheduleEntities.stream().map( + ScheduleMapper.INSTANCE::toScheduleExportListView).collect(Collectors.toList()); + databaseService.assignDatabaseById(view, ScheduleExportListView::getDatabaseId, + ScheduleExportListView::setDatabase); + return view; } + private List getPartitionPlanView(ScheduleTaskExportRequest request) { + Map flowIn2TaskMap = flowInstanceService.getTaskByFlowInstanceIds( + request.getIds()); + List flowInstanceEntities = flowInstanceService.listByIds(request.getIds()); + List views = flowInstanceEntities.stream().map(f -> { + TaskEntity taskEntity = flowIn2TaskMap.get(f.getId()); + ScheduleExportListView scheduleExportListView = new ScheduleExportListView(); + scheduleExportListView.setScheduleType(ScheduleType.PARTITION_PLAN); + scheduleExportListView.setId(f.getId()); + scheduleExportListView.setCreatorId(f.getCreatorId()); + scheduleExportListView.setDatabaseId(taskEntity.getDatabaseId()); + scheduleExportListView.setCreateTime(f.getCreateTime()); + scheduleExportListView.setDescription(f.getDescription()); + return scheduleExportListView; + }).collect(Collectors.toList()); + databaseService.assignDatabaseById(views, ScheduleExportListView::getDatabaseId, + ScheduleExportListView::setDatabase); + userService.assignInnerUserByCreatorId(views, ScheduleExportListView::getCreatorId, + ScheduleExportListView::setCreator); + return views; + } - private FileExportResponse mapToFileExportResponse(ExportedFile exportedFile) { - FileExportResponse fileExportResponse = new FileExportResponse(); - fileExportResponse.setSecret(exportedFile.getSecret()); + public FileExportResponse getExportResult(String exportId) { + statefulUuidStateIdGenerator.checkCurrentUserId(exportId); + Future future = futureCache.get(exportId); + if (future == null) { + return null; + } + if (!future.isDone()) { + return FileExportResponse.exporting(); + } try { - String bucketName = getPersonalBucketName(authenticationFacade.currentUserIdStr()); - objectStorageFacade.createBucketIfNotExists(bucketName); - ObjectMetadata metadata = objectStorageFacade.putTempObject(bucketName, exportedFile.getFile()); - String downloadUrl = objectStorageFacade.getDownloadUrl(metadata.getBucketName(), metadata.getObjectId()); - fileExportResponse.setDownloadUrl(downloadUrl); - } finally { - OdcFileUtil.deleteFiles(exportedFile.getFile()); + futureCache.invalid(exportId); + return (FileExportResponse) future.get(); + } catch (ExecutionException | InterruptedException e) { + throw new RuntimeException(e); } - return fileExportResponse; + } + + public String getExportLog(String exportId) { + statefulUuidStateIdGenerator.checkCurrentUserId(exportId); + String filePath = String.format(RouteLogCallable.LOG_PATH_PATTERN, logPath, + ScheduleTaskExportCallable.WORK_SPACE, exportId, + ScheduleTaskExportCallable.LOG_NAME); + File logFile = new File(filePath); + return LogUtils.getLatestLogContent(logFile, 10000L, 1048576L); } @Data diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/ScheduleTaskExportCallable.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/ScheduleTaskExportCallable.java new file mode 100644 index 0000000000..21a00cfa86 --- /dev/null +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/ScheduleTaskExportCallable.java @@ -0,0 +1,96 @@ +/* + * Copyright (c) 2023 OceanBase. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.oceanbase.odc.service.schedule.export; + +import org.springframework.security.crypto.bcrypt.BCryptPasswordEncoder; + +import com.oceanbase.odc.common.json.JsonUtils; +import com.oceanbase.odc.common.security.PasswordUtils; +import com.oceanbase.odc.common.task.RouteLogCallable; +import com.oceanbase.odc.service.common.util.OdcFileUtil; +import com.oceanbase.odc.service.exporter.model.ExportProperties; +import com.oceanbase.odc.service.exporter.model.ExportedFile; +import com.oceanbase.odc.service.iam.model.User; +import com.oceanbase.odc.service.iam.util.SecurityContextUtils; +import com.oceanbase.odc.service.objectstorage.ObjectStorageFacade; +import com.oceanbase.odc.service.objectstorage.model.ObjectMetadata; +import com.oceanbase.odc.service.schedule.export.model.FileExportResponse; +import com.oceanbase.odc.service.schedule.export.model.FileExportStatus; +import com.oceanbase.odc.service.schedule.export.model.ScheduleTaskExportRequest; +import com.oceanbase.odc.service.schedule.model.ScheduleType; + +public class ScheduleTaskExportCallable extends RouteLogCallable { + + public static final String WORK_SPACE = "scheduleTaskExport"; + public static final String LOG_NAME = "export"; + private final ScheduleTaskExportRequest request; + private final User user; + private final ScheduleTaskExporter scheduleTaskExporter; + private final String bucketName; + private final ObjectStorageFacade objectStorageFacade; + + public ScheduleTaskExportCallable(String taskId, + ScheduleTaskExportRequest request, User user, + ScheduleTaskExporter scheduleTaskExporter, String bucketName, ObjectStorageFacade objectStorageFacade) { + super(WORK_SPACE, taskId, LOG_NAME); + this.request = request; + this.user = user; + this.scheduleTaskExporter = scheduleTaskExporter; + this.bucketName = bucketName; + this.objectStorageFacade = objectStorageFacade; + + } + + @Override + public FileExportResponse doCall() { + log.info("Start to export schedule, request={}", JsonUtils.toJson(request)); + SecurityContextUtils.setCurrentUser(user); + try { + ExportProperties properties = scheduleTaskExporter.generateExportProperties(); + String encryptKey = new BCryptPasswordEncoder().encode(PasswordUtils.random()); + + ExportedFile exportedFile = null; + if (request.getScheduleType().equals(ScheduleType.PARTITION_PLAN)) { + exportedFile = scheduleTaskExporter.exportPartitionPlan(encryptKey, properties, + request.getIds()); + } else { + exportedFile = scheduleTaskExporter.exportSchedule(request.getScheduleType(), encryptKey, + properties, request.getIds()); + } + return mapToFileExportResponse(exportedFile); + } catch (Exception e) { + log.error("Export schedule failed", e); + return FileExportResponse.failed(e.getMessage()); + } + + } + + private FileExportResponse mapToFileExportResponse(ExportedFile exportedFile) { + FileExportResponse fileExportResponse = new FileExportResponse(); + fileExportResponse.setSecret(exportedFile.getSecret()); + fileExportResponse.setStatus(FileExportStatus.SUCCESS); + fileExportResponse.setFileName(exportedFile.getFile().getName()); + try { + objectStorageFacade.createBucketIfNotExists(bucketName); + ObjectMetadata metadata = objectStorageFacade.putTempObject(bucketName, exportedFile.getFile()); + String downloadUrl = objectStorageFacade.getDownloadUrl(metadata.getBucketName(), metadata.getObjectId()); + fileExportResponse.setDownloadUrl(downloadUrl); + } finally { + OdcFileUtil.deleteFiles(exportedFile.getFile()); + } + return fileExportResponse; + } +} diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/ScheduleTaskExporter.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/ScheduleTaskExporter.java index ec57c9182b..7235d96b16 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/ScheduleTaskExporter.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/ScheduleTaskExporter.java @@ -316,7 +316,7 @@ private ExportedDatabase getExportedDatabase(Long databaseId) { } private String getProjectName(Long projectId) { - if (projectId == null) { + if (projectId == null || projectId < 0) { return null; } return projectRepository.findById(projectId).map(ProjectEntity::getName).orElseThrow(NullPointerException::new); diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/ScheduleTaskImportService.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/ScheduleTaskImportService.java index 5b4e7c657f..19e75c3992 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/ScheduleTaskImportService.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/ScheduleTaskImportService.java @@ -40,14 +40,17 @@ import com.oceanbase.odc.service.state.StatefulUuidStateIdGenerator; import com.oceanbase.odc.service.task.executor.logger.LogUtils; +import lombok.extern.slf4j.Slf4j; + @Service +@Slf4j public class ScheduleTaskImportService { @Autowired private FutureCache futureCache; @Autowired - private ThreadPoolTaskExecutor scheduleImportExecutor; + private ThreadPoolTaskExecutor commonAsyncTaskExecutor; @Autowired private ScheduleTaskImporter scheduleTaskImporter; @@ -65,10 +68,16 @@ public class ScheduleTaskImportService { public String startPreviewImportTask(ScheduleTaskImportRequest request) { String previewId = statefulUuidStateIdGenerator.generateCurrentUserIdStateId("scheduleImportReview"); User user = authenticationFacade.currentUser(); - Future> future = scheduleImportExecutor.submit( + Future> future = commonAsyncTaskExecutor.submit( () -> { - SecurityContextUtils.setCurrentUser(user); - return scheduleTaskImporter.preview(request); + try { + SecurityContextUtils.setCurrentUser(user); + return scheduleTaskImporter.preview(request); + } catch (Exception e) { + log.info("Preview Import task failed", e); + throw e; + } + }); futureCache.put(previewId, future); return previewId; @@ -103,7 +112,7 @@ public String startImportTask(ScheduleTaskImportRequest request) { String previewId = statefulUuidStateIdGenerator.generateCurrentUserIdStateId("scheduleImport"); User user = authenticationFacade.currentUser(); - Future> future = scheduleImportExecutor.submit( + Future> future = commonAsyncTaskExecutor.submit( new ScheduleTaskImportCallable(user, previewId, scheduleTaskImporter, request)); futureCache.put(previewId, future); return previewId; diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/model/BaseScheduleRowData.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/model/BaseScheduleRowData.java index cef8bd017a..9660acfa73 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/model/BaseScheduleRowData.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/model/BaseScheduleRowData.java @@ -83,6 +83,7 @@ public ScheduleRowPreviewDto preview() { rowPreviewDto.setOriginProjectName(originProjectName); rowPreviewDto.setDatabase(database); rowPreviewDto.setTargetDatabase(targetDatabase); + rowPreviewDto.setDescription(description); return rowPreviewDto; } } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/model/FileExportResponse.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/model/FileExportResponse.java index e56d5055f7..e1736a1743 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/model/FileExportResponse.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/model/FileExportResponse.java @@ -21,8 +21,29 @@ @Data public class FileExportResponse { + private String taskId; + + private FileExportStatus status; + + private String fileName; + private String downloadUrl; @Exclude private String secret; + + private String failedReason; + + public static FileExportResponse exporting() { + FileExportResponse fileExportResponse = new FileExportResponse(); + fileExportResponse.setStatus(FileExportStatus.EXPORTING); + return fileExportResponse; + } + + public static FileExportResponse failed(String failedReason) { + FileExportResponse fileExportResponse = new FileExportResponse(); + fileExportResponse.setStatus(FileExportStatus.FAILED); + fileExportResponse.setFailedReason(failedReason); + return fileExportResponse; + } } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/model/FileExportStatus.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/model/FileExportStatus.java new file mode 100644 index 0000000000..b8c2d2c9bb --- /dev/null +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/model/FileExportStatus.java @@ -0,0 +1,22 @@ +/* + * Copyright (c) 2023 OceanBase. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.oceanbase.odc.service.schedule.export.model; + +public enum FileExportStatus { + SUCCESS, + FAILED, + EXPORTING +} diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/model/ImportScheduleTaskView.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/model/ImportScheduleTaskView.java index 207cd5ad0e..65687c9bc0 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/model/ImportScheduleTaskView.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/model/ImportScheduleTaskView.java @@ -17,6 +17,7 @@ import javax.annotation.Nullable; +import com.oceanbase.odc.common.i18n.Internationalizable; import com.oceanbase.odc.service.schedule.model.ScheduleType; import lombok.Data; @@ -40,6 +41,9 @@ public class ImportScheduleTaskView { @Nullable private ScheduleNonImportableType nonImportableType; + @Internationalizable + private String description; + /** * Schedule id of the system before export */ diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/model/ScheduleExportListView.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/model/ScheduleExportListView.java new file mode 100644 index 0000000000..ba7f703ecd --- /dev/null +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/model/ScheduleExportListView.java @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2023 OceanBase. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.oceanbase.odc.service.schedule.export.model; + +import java.util.Date; + +import com.oceanbase.odc.common.i18n.Internationalizable; +import com.oceanbase.odc.service.common.model.InnerUser; +import com.oceanbase.odc.service.connection.database.model.Database; +import com.oceanbase.odc.service.schedule.model.ScheduleStatus; +import com.oceanbase.odc.service.schedule.model.ScheduleType; + +import lombok.Data; + +@Data +public class ScheduleExportListView { + private Long id; + private ScheduleType scheduleType; + private Long databaseId; + private Database database; + @Internationalizable + private String description; + private Long creatorId; + private InnerUser creator; + private ScheduleStatus scheduleStatus; + private Date createTime; +} diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/model/ScheduleRowPreviewDto.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/model/ScheduleRowPreviewDto.java index e74856b948..869751ba36 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/model/ScheduleRowPreviewDto.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/model/ScheduleRowPreviewDto.java @@ -27,6 +27,7 @@ public class ScheduleRowPreviewDto { private String rowId; private String originId; private String originProjectName; + private String description; private ScheduleType type; private ExportedDatabase database; diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/model/ScheduleTerminateCmd.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/model/ScheduleTerminateCmd.java new file mode 100644 index 0000000000..735d1c5fe3 --- /dev/null +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/model/ScheduleTerminateCmd.java @@ -0,0 +1,28 @@ +/* + * Copyright (c) 2023 OceanBase. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.oceanbase.odc.service.schedule.export.model; + +import java.util.List; + +import com.oceanbase.odc.service.schedule.model.ScheduleType; + +import lombok.Data; + +@Data +public class ScheduleTerminateCmd { + private ScheduleType scheduleType; + private List ids; +} diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/model/ScheduleTerminateResult.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/model/ScheduleTerminateResult.java new file mode 100644 index 0000000000..34b8e84e39 --- /dev/null +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/model/ScheduleTerminateResult.java @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2023 OceanBase. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.oceanbase.odc.service.schedule.export.model; + +import com.oceanbase.odc.service.schedule.model.ScheduleType; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@AllArgsConstructor +@NoArgsConstructor +public class ScheduleTerminateResult { + + private Boolean terminateSucceed; + + private ScheduleType scheduleType; + + private Long id; + + private String failReason; + + public static ScheduleTerminateResult ofSuccess(ScheduleType scheduleType, Long id) { + return new ScheduleTerminateResult(true, scheduleType, id, null); + } + + public static ScheduleTerminateResult ofFailed(ScheduleType scheduleType, Long id, String failReason) { + return new ScheduleTerminateResult(false, scheduleType, id, failReason); + } +} diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/job/DataArchiveDeleteJob.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/job/DataArchiveDeleteJob.java index 386bec6ccd..3f8829c605 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/job/DataArchiveDeleteJob.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/job/DataArchiveDeleteJob.java @@ -15,15 +15,18 @@ */ package com.oceanbase.odc.service.schedule.job; +import java.util.Map; import java.util.Optional; import org.quartz.JobExecutionContext; +import com.fasterxml.jackson.core.type.TypeReference; import com.oceanbase.odc.common.json.JsonUtils; import com.oceanbase.odc.core.shared.constant.TaskStatus; import com.oceanbase.odc.metadb.schedule.ScheduleTaskEntity; import com.oceanbase.odc.service.dlm.model.DataArchiveParameters; import com.oceanbase.odc.service.schedule.model.DataArchiveClearParameters; +import com.oceanbase.odc.service.task.constants.JobParametersKeyConstants; import com.oceanbase.tools.migrator.common.enums.JobType; import lombok.extern.slf4j.Slf4j; @@ -62,11 +65,19 @@ public void executeJob(JobExecutionContext context) { scheduleTaskRepository.updateStatusById(taskEntity.getId(), TaskStatus.FAILED); return; } - - DLMJobReq parameters = getDLMJobReqWithArchiveRange(dataArchiveTask.getJobId()); - parameters.setJobType(JobType.DELETE); - parameters.setFireTime(context.getFireTime()); - parameters.setScheduleTaskId(taskEntity.getId()); + DLMJobReq parameters; + if (taskEntity.getJobId() != null) { + parameters = JsonUtils.fromJson(JsonUtils.fromJson( + taskFrameworkService.find(taskEntity.getJobId()).getJobParametersJson(), + new TypeReference>() {}) + .get(JobParametersKeyConstants.META_TASK_PARAMETER_JSON), + DLMJobReq.class); + } else { + parameters = getDLMJobReqWithArchiveRange(dataArchiveTask.getJobId()); + parameters.setJobType(JobType.DELETE); + parameters.setFireTime(context.getFireTime()); + parameters.setScheduleTaskId(taskEntity.getId()); + } parameters .setRateLimit(limiterService.getByOrderIdOrElseDefaultConfig(Long.parseLong(taskEntity.getJobName()))); Long jobId = publishJob(parameters, dataArchiveParameters.getTimeoutMillis(), diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/job/DataArchiveRollbackJob.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/job/DataArchiveRollbackJob.java index 2bae16c43a..f77425d0d0 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/job/DataArchiveRollbackJob.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/job/DataArchiveRollbackJob.java @@ -15,15 +15,18 @@ */ package com.oceanbase.odc.service.schedule.job; +import java.util.Map; import java.util.Optional; import org.quartz.JobExecutionContext; +import com.fasterxml.jackson.core.type.TypeReference; import com.oceanbase.odc.common.json.JsonUtils; import com.oceanbase.odc.core.shared.constant.TaskStatus; import com.oceanbase.odc.metadb.schedule.ScheduleTaskEntity; import com.oceanbase.odc.service.dlm.model.DataArchiveParameters; import com.oceanbase.odc.service.schedule.model.DataArchiveRollbackParameters; +import com.oceanbase.odc.service.task.constants.JobParametersKeyConstants; import com.oceanbase.tools.migrator.common.configure.DataSourceInfo; import com.oceanbase.tools.migrator.common.enums.JobType; @@ -58,20 +61,29 @@ public void executeJob(JobExecutionContext context) { DataArchiveParameters dataArchiveParameters = JsonUtils.fromJson(dataArchiveTask.getParametersJson(), DataArchiveParameters.class); // execute in task framework. - DLMJobReq parameters = getDLMJobReqWithArchiveRange(dataArchiveTask.getJobId()); - parameters.setJobType(JobType.ROLLBACK); - DataSourceInfo tempDataSource = parameters.getSourceDs(); - parameters.setSourceDs(parameters.getTargetDs()); - parameters.setTargetDs(tempDataSource); - parameters.setFireTime(context.getFireTime()); + DLMJobReq parameters; + if (taskEntity.getJobId() != null) { + parameters = JsonUtils.fromJson(JsonUtils.fromJson( + taskFrameworkService.find(taskEntity.getJobId()).getJobParametersJson(), + new TypeReference>() {}) + .get(JobParametersKeyConstants.META_TASK_PARAMETER_JSON), + DLMJobReq.class); + } else { + parameters = getDLMJobReqWithArchiveRange(dataArchiveTask.getJobId()); + parameters.setJobType(JobType.ROLLBACK); + DataSourceInfo tempDataSource = parameters.getSourceDs(); + parameters.setSourceDs(parameters.getTargetDs()); + parameters.setTargetDs(tempDataSource); + parameters.setFireTime(context.getFireTime()); + parameters.getTables().forEach(o -> { + String temp = o.getTableName(); + o.setTableName(o.getTargetTableName()); + o.setTargetTableName(temp); + }); + parameters.setScheduleTaskId(taskEntity.getId()); + } parameters .setRateLimit(limiterService.getByOrderIdOrElseDefaultConfig(Long.parseLong(taskEntity.getJobName()))); - parameters.getTables().forEach(o -> { - String temp = o.getTableName(); - o.setTableName(o.getTargetTableName()); - o.setTargetTableName(temp); - }); - parameters.setScheduleTaskId(taskEntity.getId()); Long jobId = publishJob(parameters, dataArchiveParameters.getTimeoutMillis(), dataArchiveParameters.getSourceDatabaseId()); log.info("Publish DLM job to task framework succeed,scheduleTaskId={},jobIdentity={}", taskEntity.getId(), diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/job/PartitionPlanJob.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/job/PartitionPlanJob.java index b4a3d15097..09cb399a1e 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/job/PartitionPlanJob.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/job/PartitionPlanJob.java @@ -16,7 +16,9 @@ package com.oceanbase.odc.service.schedule.job; +import java.text.MessageFormat; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; @@ -25,6 +27,8 @@ import org.quartz.JobExecutionContext; import com.oceanbase.odc.common.json.JsonUtils; +import com.oceanbase.odc.core.alarm.AlarmEventNames; +import com.oceanbase.odc.core.alarm.AlarmUtils; import com.oceanbase.odc.core.session.ConnectionSession; import com.oceanbase.odc.core.shared.constant.TaskErrorStrategy; import com.oceanbase.odc.core.shared.constant.TaskType; @@ -138,7 +142,7 @@ public void execute(JobExecutionContext context) { resps.stream().flatMap(i -> i.getSqls().stream()).collect(Collectors.toList()), paramemters.getTimeoutMillis(), paramemters.getErrorStrategy()); } catch (Exception e) { - log.warn("Failed to execute a partition plan task", e); + doAlarm(e, scheduleEntity, paramemters); if (this.notificationProperties.isEnabled()) { try { Event event = this.eventBuilder.ofFailedTask(this.flowInstanceService @@ -160,6 +164,18 @@ public void execute(JobExecutionContext context) { } } + private static void doAlarm(Exception e, ScheduleEntity scheduleEntity, PartitionPlanConfig paramemters) { + log.warn("Failed to execute a partition plan task", e); + Map eventMessage = AlarmUtils.createAlarmMapBuilder() + .item(AlarmUtils.SCHEDULE_ID_NAME, scheduleEntity.getId().toString()) + .item(AlarmUtils.FLOW_INSTANCE_ID_NAME, paramemters.getFlowInstanceId().toString()) + .item(AlarmUtils.MESSAGE_NAME, + MessageFormat.format( + "PartitionPlan execute failed, msg={0}", e.getMessage())) + .build(); + AlarmUtils.alarm(AlarmEventNames.TASK_EXECUTION_FAILED, eventMessage); + } + private void submitSubDatabaseChangeTask(Long parentFlowInstanceId, Long databaseId, List sqls, long timeoutMillis, TaskErrorStrategy errorStrategy) { if (CollectionUtils.isEmpty(sqls)) { diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/model/ScheduleMapper.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/model/ScheduleMapper.java index 3fd949fe98..ea4dc95b40 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/model/ScheduleMapper.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/model/ScheduleMapper.java @@ -28,6 +28,7 @@ import com.oceanbase.odc.service.loaddata.model.LoadDataParameters; import com.oceanbase.odc.service.onlineschemachange.model.OnlineSchemaChangeParameters; import com.oceanbase.odc.service.partitionplan.model.PartitionPlanConfig; +import com.oceanbase.odc.service.schedule.export.model.ScheduleExportListView; import com.oceanbase.odc.service.sqlplan.model.SqlPlanParameters; /** @@ -47,6 +48,10 @@ public interface ScheduleMapper { ScheduleEntity modelToEntity(Schedule model); + @Mapping(target = "scheduleStatus", source = "entity.status") + @Mapping(target = "scheduleType", source = "entity.type") + ScheduleExportListView toScheduleExportListView(ScheduleEntity entity); + @Named("entityToParameters") default ScheduleTaskParameters entityToParameters(ScheduleEntity entity) { switch (entity.getType()) { diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/util/BatchSchedulePermissionValidator.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/util/BatchSchedulePermissionValidator.java new file mode 100644 index 0000000000..a9bef0c2a5 --- /dev/null +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/util/BatchSchedulePermissionValidator.java @@ -0,0 +1,109 @@ +/* + * Copyright (c) 2023 OceanBase. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.oceanbase.odc.service.schedule.util; + +import java.util.Collection; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Lazy; +import org.springframework.stereotype.Component; + +import com.oceanbase.odc.core.shared.SingleOrganizationResource; +import com.oceanbase.odc.core.shared.constant.OrganizationType; +import com.oceanbase.odc.core.shared.constant.ResourceRoleName; +import com.oceanbase.odc.core.shared.constant.ResourceType; +import com.oceanbase.odc.metadb.flow.FlowInstanceEntity; +import com.oceanbase.odc.metadb.schedule.ScheduleRepository; +import com.oceanbase.odc.service.flow.FlowInstanceService; +import com.oceanbase.odc.service.iam.HorizontalDataPermissionValidator; +import com.oceanbase.odc.service.iam.ProjectPermissionValidator; +import com.oceanbase.odc.service.iam.auth.AuthenticationFacade; +import com.oceanbase.odc.service.schedule.model.Schedule; +import com.oceanbase.odc.service.schedule.model.ScheduleMapper; +import com.oceanbase.odc.service.schedule.model.ScheduleType; + +import lombok.AllArgsConstructor; +import lombok.Data; + +@Component +public class BatchSchedulePermissionValidator { + + @Autowired + private AuthenticationFacade authenticationFacade; + + @Autowired + private ProjectPermissionValidator projectPermissionValidator; + + @Autowired + @Lazy + private FlowInstanceService flowInstanceService; + + @Autowired + private HorizontalDataPermissionValidator horizontalDataPermissionValidator; + + @Autowired + private ScheduleRepository scheduleRepository; + + + public void checkScheduleIdsPermission(ScheduleType scheduleType, Collection ids) { + Set projectIds; + if (scheduleType.equals(ScheduleType.PARTITION_PLAN)) { + List flowInstanceEntities = flowInstanceService.listByIds(ids); + projectIds = + flowInstanceEntities.stream().map(FlowInstanceEntity::getProjectId).collect(Collectors.toSet()); + List flowOrganizationIsolateds = flowInstanceEntities.stream().map( + f -> new FlowOrganizationIsolated(f.getOrganizationId(), f.getId())).collect( + Collectors.toList()); + horizontalDataPermissionValidator.checkCurrentOrganization(flowOrganizationIsolateds); + } else { + List scheduleEntities = scheduleRepository.findByIdIn(ids).stream() + .map(ScheduleMapper.INSTANCE::entityToModel).collect( + Collectors.toList()); + horizontalDataPermissionValidator.checkCurrentOrganization(scheduleEntities); + projectIds = scheduleEntities.stream().map(Schedule::getProjectId).collect(Collectors.toSet()); + } + if (authenticationFacade.currentOrganization().getType().equals(OrganizationType.TEAM)) { + projectPermissionValidator.checkProjectRole(projectIds, ResourceRoleName.all()); + } + } + + + @Data + @AllArgsConstructor + private final static class FlowOrganizationIsolated implements SingleOrganizationResource { + + private Long organizationId; + private Long id; + + @Override + public String resourceType() { + return ResourceType.ODC_FLOW_INSTANCE.name(); + } + + @Override + public Long organizationId() { + return organizationId; + } + + @Override + public Long id() { + return id; + } + } +} diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/task/TaskService.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/task/TaskService.java index 749d34da0a..0cc770d11f 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/task/TaskService.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/task/TaskService.java @@ -18,6 +18,7 @@ import java.io.File; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.Objects; import java.util.Optional; @@ -367,6 +368,10 @@ private TaskEntity nullSafeFindById(Long id) { .orElseThrow(() -> new NotFoundException(ResourceType.ODC_TASK, "id", id)); } + public List findByIds(Collection ids) { + return taskRepository.findByIdIn(ids); + } + private void innerCancel(@NonNull Long id, Object taskResult) { TaskEntity taskEntity = nullSafeFindById(id); taskEntity.setStatus(TaskStatus.CANCELED); diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/task/config/JobSchedulerFactoryBean.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/task/config/JobSchedulerFactoryBean.java index d7fee27141..269f6d6bc3 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/task/config/JobSchedulerFactoryBean.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/task/config/JobSchedulerFactoryBean.java @@ -22,7 +22,6 @@ import com.oceanbase.odc.service.task.listener.DefaultJobProcessUpdateListener; import com.oceanbase.odc.service.task.listener.DefaultJobTerminateListener; -import com.oceanbase.odc.service.task.listener.JobTerminateNotifyListener; import com.oceanbase.odc.service.task.schedule.JobScheduler; import com.oceanbase.odc.service.task.schedule.StdJobScheduler; @@ -37,8 +36,6 @@ public class JobSchedulerFactoryBean implements FactoryBean, Initi private JobScheduler jobScheduler; - @Autowired - private JobTerminateNotifyListener jobTerminateNotifyListener; @Autowired private DefaultJobProcessUpdateListener defaultJobProcessUpdateListener; @Autowired @@ -60,7 +57,6 @@ public Class getObjectType() { @Override public void afterPropertiesSet() throws Exception { jobScheduler = new StdJobScheduler(jobConfiguration); - jobScheduler.getEventPublisher().addEventListener(jobTerminateNotifyListener); jobScheduler.getEventPublisher().addEventListener(defaultJobProcessUpdateListener); jobScheduler.getEventPublisher().addEventListener(defaultJobTerminateListener); } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/task/executor/logger/LogUtils.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/task/executor/logger/LogUtils.java index fd6d66a8e8..4c5903dde6 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/task/executor/logger/LogUtils.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/task/executor/logger/LogUtils.java @@ -16,6 +16,8 @@ package com.oceanbase.odc.service.task.executor.logger; +import static com.oceanbase.odc.common.task.RouteLogCallable.LOG_PATH_PATTERN; + import java.io.File; import java.nio.charset.StandardCharsets; import java.util.LinkedList; @@ -97,4 +99,15 @@ public static String generateScheduleTaskLogFileName(@NonNull Long scheduleId, L return String.format(SCHEDULE_LOG_FILE_NAME_PATTERN, scheduleId, scheduleTaskId); } + public static String getRouteTaskLog(String logPath, String workspace, String taskId, String fileName) { + File logFile = getLogFileFromCurrentMachine(logPath, workspace, taskId, fileName); + return LogUtils.getLatestLogContent(logFile, 10000L, 1048576L); + } + + private static File getLogFileFromCurrentMachine(String logPath, String workspace, String taskId, String fileName) { + String filePath = String.format(LOG_PATH_PATTERN, logPath, workspace, taskId, fileName); + log.info("GetLogFilePath: {}", filePath); + return new File(filePath); + } + } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/task/listener/DefaultJobTerminateListener.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/task/listener/DefaultJobTerminateListener.java index d4921b8c06..6660403695 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/task/listener/DefaultJobTerminateListener.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/task/listener/DefaultJobTerminateListener.java @@ -29,6 +29,9 @@ import com.oceanbase.odc.core.alarm.AlarmUtils; import com.oceanbase.odc.core.shared.constant.TaskStatus; import com.oceanbase.odc.metadb.task.JobEntity; +import com.oceanbase.odc.service.notification.Broker; +import com.oceanbase.odc.service.notification.NotificationProperties; +import com.oceanbase.odc.service.notification.helper.EventBuilder; import com.oceanbase.odc.service.schedule.ScheduleService; import com.oceanbase.odc.service.schedule.ScheduleTaskService; import com.oceanbase.odc.service.schedule.alarm.ScheduleAlarmUtils; @@ -58,6 +61,12 @@ public class DefaultJobTerminateListener extends AbstractEventListener terminateProcessors; + @Autowired + private NotificationProperties notificationProperties; + @Autowired + private Broker broker; + @Autowired + private EventBuilder eventBuilder; @Override public void onEvent(JobTerminateEvent event) { @@ -65,11 +74,11 @@ public void onEvent(JobTerminateEvent event) { scheduleTaskService.findByJobId(jobEntity.getId()).ifPresent(scheduleTask -> { // correct status TaskStatus taskStatus = TerminateProcessor.correctTaskStatus(terminateProcessors, jobEntity.getJobType(), - scheduleTask, event.getStatus().convertTaskStatus()); - // correct current local variable to right status - scheduleTask.setStatus(taskStatus); + scheduleTask, event.getStatus().convertTaskStatus(), event.getTaskResult()); + // correct to final status scheduleTaskService.updateStatusById(scheduleTask.getId(), taskStatus); log.info("Update schedule task status to {} succeed,scheduleTaskId={}", taskStatus, scheduleTask.getId()); + scheduleTask.setStatus(taskStatus); // Refresh the schedule status after the task is completed. scheduleService.refreshScheduleStatus(Long.parseLong(scheduleTask.getJobName())); // Trigger the alarm if the task is failed or canceled. @@ -80,6 +89,7 @@ public void onEvent(JobTerminateEvent event) { if (event.getStatus() == JobStatus.EXEC_TIMEOUT) { ScheduleAlarmUtils.timeout(scheduleTask.getId()); } + notify(scheduleTask); // invoke task related processor doProcessor(jobEntity, scheduleTask); }); @@ -105,4 +115,16 @@ private void doProcessor(JobEntity jobEntity, ScheduleTask scheduleTask) { } } } + + private void notify(ScheduleTask task) { + if (!notificationProperties.isEnabled()) { + return; + } + try { + broker.enqueueEvent(task.getStatus() == TaskStatus.DONE ? eventBuilder.ofSucceededTask(task) + : eventBuilder.ofFailedTask(task)); + } catch (Exception e) { + log.warn("Failed to enqueue event.", e); + } + } } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/task/listener/JobTerminateEvent.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/task/listener/JobTerminateEvent.java index f580a5133e..c4fd8cd61f 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/task/listener/JobTerminateEvent.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/task/listener/JobTerminateEvent.java @@ -19,6 +19,7 @@ import com.oceanbase.odc.common.event.AbstractEvent; import com.oceanbase.odc.service.task.enums.JobStatus; +import com.oceanbase.odc.service.task.executor.TaskResult; import com.oceanbase.odc.service.task.schedule.JobIdentity; import lombok.Getter; @@ -36,6 +37,9 @@ public class JobTerminateEvent extends AbstractEvent { @Getter private final JobStatus status; + @Getter + private TaskResult taskResult; + @Nullable @Getter private String errorMessage; @@ -52,6 +56,11 @@ public JobTerminateEvent(JobIdentity ji, JobStatus status) { this.status = status; } + public JobTerminateEvent(JobIdentity ji, JobStatus status, TaskResult taskResult) { + this(ji, status); + this.taskResult = taskResult; + } + public JobTerminateEvent(JobIdentity ji, JobStatus status, String errorMessage) { this(ji, status); this.errorMessage = errorMessage; diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/task/listener/JobTerminateNotifyListener.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/task/listener/JobTerminateNotifyListener.java deleted file mode 100644 index be4126cca9..0000000000 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/task/listener/JobTerminateNotifyListener.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Copyright (c) 2023 OceanBase. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.oceanbase.odc.service.task.listener; - -import java.util.List; - -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - -import com.oceanbase.odc.common.event.AbstractEventListener; -import com.oceanbase.odc.core.shared.constant.TaskStatus; -import com.oceanbase.odc.metadb.task.JobEntity; -import com.oceanbase.odc.service.notification.Broker; -import com.oceanbase.odc.service.notification.NotificationProperties; -import com.oceanbase.odc.service.notification.helper.EventBuilder; -import com.oceanbase.odc.service.schedule.ScheduleTaskService; -import com.oceanbase.odc.service.task.processor.terminate.TerminateProcessor; -import com.oceanbase.odc.service.task.service.TaskFrameworkService; - -import lombok.extern.slf4j.Slf4j; - -/** - * @author liuyizhuo.lyz - * @date 2024/2/23 - */ -@Slf4j -@Component("jobTerminateNotifyListener") -public class JobTerminateNotifyListener extends AbstractEventListener { - - @Autowired - private TaskFrameworkService taskFrameworkService; - @Autowired - private NotificationProperties notificationProperties; - @Autowired - private Broker broker; - @Autowired - private EventBuilder eventBuilder; - @Autowired - private ScheduleTaskService scheduleTaskService; - @Autowired - private List terminateProcessors; - - @Override - public void onEvent(JobTerminateEvent event) { - if (!notificationProperties.isEnabled()) { - return; - } - try { - JobEntity jobEntity = taskFrameworkService.find(event.getJi().getId()); - scheduleTaskService.findByJobId(jobEntity.getId()) - .ifPresent(task -> { - TaskStatus status = TerminateProcessor.correctTaskStatus(terminateProcessors, - jobEntity.getJobType(), task, jobEntity.getStatus().convertTaskStatus()); - broker.enqueueEvent(status == TaskStatus.DONE ? eventBuilder.ofSucceededTask(task) - : eventBuilder.ofFailedTask(task)); - }); - } catch (Exception e) { - log.warn("Failed to enqueue event.", e); - } - } - -} diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/task/processor/terminate/DLMTerminateProcessor.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/task/processor/terminate/DLMTerminateProcessor.java index dc19b0ff0c..26b467a276 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/task/processor/terminate/DLMTerminateProcessor.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/task/processor/terminate/DLMTerminateProcessor.java @@ -31,6 +31,7 @@ import com.oceanbase.odc.service.schedule.job.DLMJobReq; import com.oceanbase.odc.service.schedule.model.ScheduleTask; import com.oceanbase.odc.service.task.constants.JobParametersKeyConstants; +import com.oceanbase.odc.service.task.executor.TaskResult; import com.oceanbase.odc.service.task.processor.matcher.DLMProcessorMatcher; import com.oceanbase.tools.migrator.common.enums.JobType; @@ -50,16 +51,25 @@ public class DLMTerminateProcessor extends DLMProcessorMatcher implements Termin @Autowired protected ScheduleTaskService scheduleTaskService; - public TaskStatus correctTaskStatus(ScheduleTask scheduleTask, TaskStatus currentStatus) { + public TaskStatus correctTaskStatus(ScheduleTask scheduleTask, TaskStatus currentStatus, TaskResult taskResult) { // correct sub task status - List dlmTableUnits = dlmService.findByScheduleTaskId(scheduleTask.getId()); + List dlmTableUnits; + if (taskResult != null) { + dlmTableUnits = JsonUtils.fromJsonList(taskResult.getResultJson(), DlmTableUnit.class); + } else { + dlmTableUnits = dlmService.findByScheduleTaskId(scheduleTask.getId()); + } dlmTableUnits.forEach(dlmTableUnit -> { if (!dlmTableUnit.getStatus().isTerminated()) { dlmTableUnit.setStatus(TaskStatus.CANCELED); } }); dlmService.createOrUpdateDlmTableUnits(dlmTableUnits); - return dlmService.getFinalTaskStatus(scheduleTask.getId()); + TaskStatus correctStatus = + currentStatus == TaskStatus.EXEC_TIMEOUT || currentStatus == TaskStatus.CANCELED ? TaskStatus.CANCELED + : dlmService.getFinalTaskStatus(dlmTableUnits); + log.info("Correct status to {},scheduleTaskId={}", correctStatus, scheduleTask.getId()); + return correctStatus; } @Override diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/task/processor/terminate/TerminateProcessor.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/task/processor/terminate/TerminateProcessor.java index db5e4bad76..556c53edcf 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/task/processor/terminate/TerminateProcessor.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/task/processor/terminate/TerminateProcessor.java @@ -23,6 +23,7 @@ import com.oceanbase.odc.core.shared.constant.TaskStatus; import com.oceanbase.odc.metadb.task.JobEntity; import com.oceanbase.odc.service.schedule.model.ScheduleTask; +import com.oceanbase.odc.service.task.executor.TaskResult; import com.oceanbase.odc.service.task.processor.ProcessorMatcher; /** @@ -38,7 +39,7 @@ public interface TerminateProcessor extends ProcessorMatcher { * @param currentStatus * @return */ - default TaskStatus correctTaskStatus(ScheduleTask scheduleTask, TaskStatus currentStatus) { + default TaskStatus correctTaskStatus(ScheduleTask scheduleTask, TaskStatus currentStatus, TaskResult taskResult) { return currentStatus; } @@ -48,12 +49,12 @@ default TaskStatus correctTaskStatus(ScheduleTask scheduleTask, TaskStatus curre void process(ScheduleTask scheduleTask, JobEntity jobEntity); static TaskStatus correctTaskStatus(Collection terminateProcessors, String jobType, - ScheduleTask scheduleTask, TaskStatus currentStatus) { + ScheduleTask scheduleTask, TaskStatus currentStatus, TaskResult taskResult) { List correctedStatus = new ArrayList<>(); // return first matched for (TerminateProcessor processor : terminateProcessors) { if (processor.interested(jobType)) { - correctedStatus.add(processor.correctTaskStatus(scheduleTask, currentStatus)); + correctedStatus.add(processor.correctTaskStatus(scheduleTask, currentStatus, taskResult)); } } if (correctedStatus.size() > 1) { diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/task/service/StdTaskFrameworkService.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/task/service/StdTaskFrameworkService.java index ebac2992ec..85bc941a02 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/task/service/StdTaskFrameworkService.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/task/service/StdTaskFrameworkService.java @@ -491,7 +491,7 @@ public void publishEvent(TaskResult result, JobEntity jobEntity, JobStatus expec if (publisher != null && result.getStatus() != null && result.getStatus().isTerminated()) { taskResultPublisherExecutor.execute(() -> publisher - .publishEvent(new JobTerminateEvent(result.getJobIdentity(), expectedJobStatus))); + .publishEvent(new JobTerminateEvent(result.getJobIdentity(), expectedJobStatus, result))); // TODO maybe we can destroy the pod there. if (result.getStatus() == TaskStatus.FAILED) {