Skip to content

Commit f2c0018

Browse files
author
jefftlin
committed
Adjust the priority of savepoint
1 parent 088fc1e commit f2c0018

1 file changed

Lines changed: 15 additions & 3 deletions

File tree

  • streamis-jobmanager/streamis-job-manager/streamis-job-manager-service/src/main/scala/com/webank/wedatasphere/streamis/jobmanager/manager/transform/impl

streamis-jobmanager/streamis-job-manager/streamis-job-manager-service/src/main/scala/com/webank/wedatasphere/streamis/jobmanager/manager/transform/impl/FlinkSavepointConfigTransform.scala

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package com.webank.wedatasphere.streamis.jobmanager.manager.transform.impl
22
import com.webank.wedatasphere.streamis.jobmanager.launcher.conf.JobConfKeyConstants
33
import com.webank.wedatasphere.streamis.jobmanager.launcher.job.LaunchJob
4+
import com.webank.wedatasphere.streamis.jobmanager.launcher.linkis.conf.JobLauncherConfiguration.VAR_FLINK_SAVEPOINT_PATH
5+
import com.webank.wedatasphere.streamis.jobmanager.manager.transform.impl.FlinkSavepointConfigTransform.SAVE_POINT_PREFIX
46

57
import java.util
68
import scala.collection.JavaConverters._
@@ -19,10 +21,20 @@ class FlinkSavepointConfigTransform extends FlinkConfigTransform {
1921
override protected def configGroup(): String = JobConfKeyConstants.GROUP_PRODUCE.getValue
2022

2123
override protected def transform(valueSet: util.Map[String, Any], job: LaunchJob): LaunchJob = {
22-
transformConfig(valueSet.asScala.filter(_._1.startsWith(JobConfKeyConstants.SAVEPOINT.getValue))
24+
val config: util.Map[String, Any] = valueSet.asScala.filter(_._1.startsWith(JobConfKeyConstants.SAVEPOINT.getValue))
2325
.map{
2426
case (key, value) =>
25-
(FlinkConfigTransform.FLINK_CONFIG_PREFIX + key.replace(JobConfKeyConstants.SAVEPOINT.getValue, "execution.savepoint."), value)
26-
}.asJava, job)
27+
(FlinkConfigTransform.FLINK_CONFIG_PREFIX + key.replace(JobConfKeyConstants.SAVEPOINT.getValue, SAVE_POINT_PREFIX), value)
28+
}.asJava
29+
Option(config.get(SAVE_POINT_PREFIX + "path")) match {
30+
case Some(path) =>
31+
config.put(VAR_FLINK_SAVEPOINT_PATH.getValue, path)
32+
case _ =>
33+
}
34+
transformConfig(config, job)
2735
}
2836
}
37+
38+
object FlinkSavepointConfigTransform{
39+
val SAVE_POINT_PREFIX: String = "execution.savepoint."
40+
}

0 commit comments

Comments
 (0)