Skip to content

Commit 889ce70

Browse files
committed
[Fix][Zeta] Fix NPE in JobInfoService
1 parent 7704bb7 commit 889ce70

2 files changed

Lines changed: 152 additions & 5 deletions

File tree

seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/JobInfoService.java

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -61,25 +61,35 @@ public JsonObject getJobInfoJson(Long jobId) {
6161
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_INFO);
6262
JobInfo jobInfo = (JobInfo) jobInfoMap.get(jobId);
6363

64-
IMap<Object, Object> finishedJobStateMap =
65-
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_FINISHED_JOB_STATE);
66-
JobState finishedJobState = (JobState) finishedJobStateMap.get(jobId);
67-
6864
if (jobInfo != null) {
6965
return convertToJson(jobInfo, jobId);
70-
} else if (finishedJobState != null) {
66+
}
67+
68+
JobState finishedJobState =
69+
(JobState)
70+
nodeEngine
71+
.getHazelcastInstance()
72+
.getMap(Constant.IMAP_FINISHED_JOB_STATE)
73+
.get(jobId);
74+
75+
if (finishedJobState != null) {
7176
JobMetrics finishedJobMetrics =
7277
(JobMetrics)
7378
nodeEngine
7479
.getHazelcastInstance()
7580
.getMap(Constant.IMAP_FINISHED_JOB_METRICS)
7681
.get(jobId);
82+
if (finishedJobMetrics == null) {
83+
finishedJobMetrics = JobMetrics.empty();
84+
}
85+
7786
JobDAGInfo finishedJobDAGInfo =
7887
(JobDAGInfo)
7988
nodeEngine
8089
.getHazelcastInstance()
8190
.getMap(Constant.IMAP_FINISHED_JOB_VERTEX_INFO)
8291
.get(jobId);
92+
8393
return getJobInfoJson(
8494
finishedJobState, finishedJobMetrics.toJsonString(), finishedJobDAGInfo);
8595
} else {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.engine.server.rest.service;
19+
20+
import org.apache.seatunnel.api.common.metrics.JobMetrics;
21+
import org.apache.seatunnel.engine.common.Constant;
22+
import org.apache.seatunnel.engine.common.job.JobStatus;
23+
import org.apache.seatunnel.engine.core.job.JobDAGInfo;
24+
import org.apache.seatunnel.engine.server.master.JobHistoryService;
25+
import org.apache.seatunnel.engine.server.rest.RestConstant;
26+
27+
import org.junit.jupiter.api.Assertions;
28+
import org.junit.jupiter.api.BeforeEach;
29+
import org.junit.jupiter.api.Test;
30+
31+
import com.hazelcast.core.HazelcastInstance;
32+
import com.hazelcast.internal.json.JsonObject;
33+
import com.hazelcast.map.IMap;
34+
import com.hazelcast.spi.impl.NodeEngineImpl;
35+
36+
import java.util.Collections;
37+
38+
import static org.mockito.Mockito.mock;
39+
import static org.mockito.Mockito.when;
40+
41+
class JobInfoServiceNullSafetyTest {
42+
43+
private final Long jobId = 1L;
44+
45+
private JobInfoService jobInfoService;
46+
private NodeEngineImpl nodeEngine;
47+
private HazelcastInstance hazelcastInstance;
48+
49+
private IMap<Object, Object> runningJobInfoMap;
50+
private IMap<Object, Object> finishedJobStateMap;
51+
private IMap<Object, Object> finishedJobMetricsMap;
52+
private IMap<Object, Object> finishedJobVertexInfoMap;
53+
54+
@BeforeEach
55+
void setUp() {
56+
nodeEngine = mock(NodeEngineImpl.class);
57+
hazelcastInstance = mock(HazelcastInstance.class);
58+
59+
runningJobInfoMap = mock(IMap.class);
60+
finishedJobStateMap = mock(IMap.class);
61+
finishedJobMetricsMap = mock(IMap.class);
62+
finishedJobVertexInfoMap = mock(IMap.class);
63+
64+
when(nodeEngine.getHazelcastInstance()).thenReturn(hazelcastInstance);
65+
when(hazelcastInstance.getMap(Constant.IMAP_RUNNING_JOB_INFO))
66+
.thenReturn(runningJobInfoMap);
67+
when(hazelcastInstance.getMap(Constant.IMAP_FINISHED_JOB_STATE))
68+
.thenReturn(finishedJobStateMap);
69+
when(hazelcastInstance.getMap(Constant.IMAP_FINISHED_JOB_METRICS))
70+
.thenReturn(finishedJobMetricsMap);
71+
when(hazelcastInstance.getMap(Constant.IMAP_FINISHED_JOB_VERTEX_INFO))
72+
.thenReturn(finishedJobVertexInfoMap);
73+
74+
jobInfoService = new JobInfoService(nodeEngine);
75+
}
76+
77+
private JobHistoryService.JobState buildJobState(Long jobId, Long startTime, Long finishTime) {
78+
return new JobHistoryService.JobState(
79+
jobId,
80+
"test-job",
81+
JobStatus.FAILED,
82+
System.currentTimeMillis(),
83+
startTime,
84+
finishTime,
85+
Collections.emptyMap(),
86+
null);
87+
}
88+
89+
@Test
90+
void shouldReturnJobIdOnlyWhenFinishedMetricsIsMissing() {
91+
JobHistoryService.JobState jobState = buildJobState(jobId, 1000L, 2000L);
92+
93+
when(runningJobInfoMap.get(jobId)).thenReturn(null);
94+
when(finishedJobStateMap.get(jobId)).thenReturn(jobState);
95+
when(finishedJobMetricsMap.get(jobId)).thenReturn(null);
96+
97+
JsonObject result = jobInfoService.getJobInfoJson(jobId);
98+
99+
Assertions.assertNotNull(result);
100+
Assertions.assertEquals(jobId.toString(), result.getString(RestConstant.JOB_ID, null));
101+
}
102+
103+
@Test
104+
void shouldReturnFinishedJobInfoWhenFinishedStateAndMetricsExist() {
105+
JobHistoryService.JobState jobState = buildJobState(jobId, 1000L, 2000L);
106+
107+
JobMetrics jobMetrics = mock(JobMetrics.class);
108+
when(jobMetrics.toJsonString()).thenReturn("{}");
109+
110+
JobDAGInfo dagInfo = mock(JobDAGInfo.class);
111+
JsonObject dagJson = new JsonObject().add("key", "value");
112+
when(dagInfo.toJsonObject()).thenReturn(dagJson);
113+
114+
when(runningJobInfoMap.get(jobId)).thenReturn(null);
115+
when(finishedJobStateMap.get(jobId)).thenReturn(jobState);
116+
when(finishedJobMetricsMap.get(jobId)).thenReturn(jobMetrics);
117+
when(finishedJobVertexInfoMap.get(jobId)).thenReturn(dagInfo);
118+
119+
JsonObject result = jobInfoService.getJobInfoJson(jobId);
120+
121+
Assertions.assertNotNull(result);
122+
Assertions.assertEquals(jobId.toString(), result.getString(RestConstant.JOB_ID, null));
123+
Assertions.assertEquals(dagJson.toString(), result.get(RestConstant.JOB_DAG).toString());
124+
}
125+
126+
@Test
127+
void shouldReturnJobIdOnlyWhenFinishedStateDoesNotExist() {
128+
when(runningJobInfoMap.get(jobId)).thenReturn(null);
129+
when(finishedJobStateMap.get(jobId)).thenReturn(null);
130+
when(finishedJobMetricsMap.get(jobId)).thenReturn(null);
131+
132+
JsonObject result = jobInfoService.getJobInfoJson(jobId);
133+
134+
Assertions.assertNotNull(result);
135+
Assertions.assertEquals(jobId.toString(), result.getString(RestConstant.JOB_ID, null));
136+
}
137+
}

0 commit comments

Comments
 (0)