Skip to content

Commit 77094b3

Browse files
author
jefftlin
committed
Adjust the priority of savepoint & Enhance create job client
1 parent f2c0018 commit 77094b3

2 files changed

Lines changed: 2 additions & 13 deletions

File tree

  • streamis-jobmanager
    • streamis-job-launcher/streamis-job-launcher-linkis/src/main/scala/com/webank/wedatasphere/streamis/jobmanager/launcher/linkis/job/client/factory
    • streamis-job-manager/streamis-job-manager-service/src/main/scala/com/webank/wedatasphere/streamis/jobmanager/manager/transform/impl

streamis-jobmanager/streamis-job-launcher/streamis-job-launcher-linkis/src/main/scala/com/webank/wedatasphere/streamis/jobmanager/launcher/linkis/job/client/factory/AbstractJobClientFactory.scala

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -38,18 +38,7 @@ class AbstractJobClientFactory extends Logging {
3838
val client = getJobClientFactory(clientType)
3939
.createJobClient(onceJob, jobInfo, jobStateManager)
4040
.asInstanceOf[JobClient[LinkisJobInfo]]
41-
Utils.tryThrow {
42-
Utils.waitUntil(() => {
43-
client.getJobInfo(true).asInstanceOf[EngineConnJobInfo].getApplicationId != null
44-
}, Duration(10, TimeUnit.SECONDS), 100, 1000)
45-
client
46-
} {
47-
case t: TimeoutException => {
48-
logger.warn("Timeout to launch job, cannot get applicationId after deployment")
49-
// Downgraded to yarn call
50-
null
51-
}
52-
}
41+
client
5342
}
5443

5544
/**

streamis-jobmanager/streamis-job-manager/streamis-job-manager-service/src/main/scala/com/webank/wedatasphere/streamis/jobmanager/manager/transform/impl/FlinkSavepointConfigTransform.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ class FlinkSavepointConfigTransform extends FlinkConfigTransform {
2626
case (key, value) =>
2727
(FlinkConfigTransform.FLINK_CONFIG_PREFIX + key.replace(JobConfKeyConstants.SAVEPOINT.getValue, SAVE_POINT_PREFIX), value)
2828
}.asJava
29-
Option(config.get(SAVE_POINT_PREFIX + "path")) match {
29+
Option(config.get(FlinkConfigTransform.FLINK_CONFIG_PREFIX + SAVE_POINT_PREFIX + "path")) match {
3030
case Some(path) =>
3131
config.put(VAR_FLINK_SAVEPOINT_PATH.getValue, path)
3232
case _ =>

0 commit comments

Comments
 (0)