Skip to content

Commit 088fc1e

Browse files
author
jefftlin
committed
Synchronize linkis state to streamis state when stopping operation
1 parent 7865259 commit 088fc1e

1 file changed

Lines changed: 2 additions & 1 deletion

File tree

  • streamis-jobmanager/streamis-job-manager/streamis-job-manager-service/src/main/scala/com/webank/wedatasphere/streamis/jobmanager/manager/service

streamis-jobmanager/streamis-job-manager/streamis-job-manager-service/src/main/scala/com/webank/wedatasphere/streamis/jobmanager/manager/service/DefaultStreamTaskService.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -326,7 +326,8 @@ class DefaultStreamTaskService extends StreamTaskService with Logging{
326326
val streamJob = streamJobMapper.getJobById(finalJobId)
327327
info(s"Try to stop StreamJob [${streamJob.getName} with task(taskId: ${streamTask.getId}, linkisJobId: ${streamTask.getLinkisJobId}).")
328328
val jobClient = getJobLaunchManager(streamTask).connect(streamTask.getLinkisJobId, streamTask.getLinkisJobInfo)
329-
if ("Running".equals(jobClient.getJobInfo.getStatus)) {
329+
val status = JobConf.linkisStatusToStreamisStatus(jobClient.getJobInfo(true).getStatus)
330+
if (!JobConf.isCompleted(status)) {
330331
val jobStateInfo = Utils.tryCatch(jobClient.stop(snapshot)){
331332
case e: Exception =>
332333
val pauseError = new JobPauseErrorException(-1, s"Fail to stop the StreamJob [${streamJob.getName}] " +

0 commit comments

Comments
 (0)