Skip to content

Commit 9c8e9b3

Browse files
committed
Append the exception analyzer.
1 parent 77094b3 commit 9c8e9b3

1 file changed

Lines changed: 25 additions & 5 deletions

File tree

  • streamis-jobmanager/streamis-job-launcher/streamis-job-launcher-linkis/src/main/scala/com/webank/wedatasphere/streamis/jobmanager/launcher/linkis/job/manager

streamis-jobmanager/streamis-job-launcher/streamis-job-launcher-linkis/src/main/scala/com/webank/wedatasphere/streamis/jobmanager/launcher/linkis/job/manager/FlinkJobLaunchManager.scala

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,14 @@ import com.webank.wedatasphere.streamis.jobmanager.launcher.linkis.conf.JobLaunc
2222
import com.webank.wedatasphere.streamis.jobmanager.launcher.linkis.exception.FlinkJobLaunchErrorException
2323
import com.webank.wedatasphere.streamis.jobmanager.launcher.linkis.job.jobInfo.LinkisJobInfo
2424
import com.webank.wedatasphere.streamis.jobmanager.launcher.linkis.job.client.factory.AbstractJobClientFactory
25+
import com.webank.wedatasphere.streamis.jobmanager.launcher.linkis.job.manager.FlinkJobLaunchManager.EXCEPTION_PATTERN
2526
import org.apache.linkis.common.utils.{Logging, Utils}
2627
import org.apache.linkis.computation.client.once.{OnceJob, SubmittableOnceJob}
2728
import org.apache.linkis.computation.client.utils.LabelKeyUtils
2829
import org.apache.linkis.protocol.utils.TaskUtils
2930

31+
import scala.util.matching.Regex
32+
3033

3134
trait FlinkJobLaunchManager extends LinkisJobLaunchManager with Logging {
3235

@@ -65,9 +68,9 @@ trait FlinkJobLaunchManager extends LinkisJobLaunchManager with Logging {
6568
job.getLabels.get(LabelKeyUtils.ENGINE_TYPE_LABEL_KEY) match {
6669
case engineConnType: String =>
6770
if(!engineConnType.toLowerCase.startsWith(FlinkJobLaunchManager.FLINK_ENGINE_CONN_TYPE))
68-
throw new FlinkJobLaunchErrorException(30401, s"Only ${FlinkJobLaunchManager.FLINK_ENGINE_CONN_TYPE} job is supported to be launched to Linkis, but $engineConnType is found.", null)
71+
throw new FlinkJobLaunchErrorException(30401, s"Only ${FlinkJobLaunchManager.FLINK_ENGINE_CONN_TYPE} job is supported to be launched to Linkis, but $engineConnType is found.(不识别的引擎类型)", null)
6972
//todo add flink and spark
70-
case _ => throw new FlinkJobLaunchErrorException(30401, s"Not exists ${LabelKeyUtils.ENGINE_TYPE_LABEL_KEY}, StreamisJob cannot be submitted to Linkis successfully.", null)
73+
case _ => throw new FlinkJobLaunchErrorException(30401, s"Not exists ${LabelKeyUtils.ENGINE_TYPE_LABEL_KEY}(缺少引擎标签), StreamisJob cannot be submitted to Linkis successfully.", null)
7174
}
7275
Utils.tryCatch {
7376
val onceJob = buildOnceJob(job)
@@ -77,16 +80,16 @@ trait FlinkJobLaunchManager extends LinkisJobLaunchManager with Logging {
7780
throw e
7881
case t: Throwable =>
7982
error(s"${job.getSubmitUser} create jobInfo failed, now stop this EngineConn ${onceJob.getId}.")
80-
Utils.tryAndWarn(onceJob.kill())
81-
throw new FlinkJobLaunchErrorException(-1, "Fail to obtain launched job info", t)
83+
Utils.tryQuietly(onceJob.kill())
84+
throw new FlinkJobLaunchErrorException(-1, exceptionAnalyze("Fail to obtain launched job info(获取任务信息失败,引擎服务可能启动失败)", t), t)
8285
}
8386
val client = AbstractJobClientFactory.getJobManager().createJobClient(onceJob, jobInfo, getJobStateManager)
8487
client
8588
}{
8689
case e: FlinkJobLaunchErrorException => throw e
8790
case t: Throwable =>
8891
error(s"Server Exception in submitting Flink job [${job.getJobName}] to Linkis remote server", t)
89-
throw new FlinkJobLaunchErrorException(-1, s"Exception in submitting Flink job to Linkis remote server (提交至Linkis服务失败,请检查服务及网络)", t)
92+
throw new FlinkJobLaunchErrorException(-1, exceptionAnalyze(s"Exception in submitting Flink job to Linkis remote server (提交至Linkis服务失败,请检查服务及网络)", t), t)
9093
}
9194
}
9295

@@ -122,7 +125,24 @@ trait FlinkJobLaunchManager extends LinkisJobLaunchManager with Logging {
122125
}
123126
}
124127

128+
/**
129+
* Exception analyzer
130+
* @param errorMsg error message
131+
* @param t throwable
132+
* @return
133+
*/
134+
def exceptionAnalyze(errorMsg: String, t: Throwable): String = {
135+
EXCEPTION_PATTERN.findFirstMatchIn(t.getMessage) match {
136+
case Some(m) =>
137+
errorMsg + s", 原因分析[${m.group(1)}]"
138+
case _ => errorMsg
139+
}
140+
}
125141
}
142+
126143
object FlinkJobLaunchManager {
127144
val FLINK_ENGINE_CONN_TYPE = "flink"
145+
146+
val EXCEPTION_PATTERN: Regex = "[\\s\\S]+,desc:([\\s\\S]+?),(ip|port|serviceKind)[\\s\\S]+$".r
147+
128148
}

0 commit comments

Comments
 (0)