|
41 | 41 | import org.apache.flink.kubernetes.KubernetesClusterClientFactory; |
42 | 42 | import org.apache.flink.kubernetes.KubernetesClusterDescriptor; |
43 | 43 | import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; |
44 | | -import org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient; |
45 | | -import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient; |
46 | 44 | import org.apache.flink.python.PythonOptions; |
47 | 45 |
|
48 | | -import java.lang.reflect.Method; |
| 46 | +import java.lang.reflect.InvocationTargetException; |
49 | 47 | import java.util.Collections; |
50 | 48 | import java.util.Map; |
51 | 49 | import java.util.UUID; |
|
55 | 53 | import cn.hutool.core.io.FileUtil; |
56 | 54 | import cn.hutool.core.lang.Assert; |
57 | 55 | import cn.hutool.core.text.StrFormatter; |
58 | | -import cn.hutool.core.util.ReflectUtil; |
59 | 56 | import cn.hutool.core.util.StrUtil; |
60 | 57 | import io.fabric8.kubernetes.api.model.Pod; |
61 | 58 | import lombok.Data; |
@@ -224,26 +221,37 @@ public TestResult test() { |
224 | 221 | // Test mode no jobName, use uuid . |
225 | 222 | addConfigParas(KubernetesConfigOptions.CLUSTER_ID, UUID.randomUUID().toString()); |
226 | 223 | initConfig(); |
227 | | - FlinkKubeClient client = k8sClientHelper.getClient(); |
228 | | - if (client instanceof Fabric8FlinkKubeClient) { |
229 | | - Object internalClient = ReflectUtil.getFieldValue(client, "internalClient"); |
230 | | - Method method = ReflectUtil.getMethod(internalClient.getClass(), "getVersion"); |
231 | | - Object versionInfo = method.invoke(internalClient); |
232 | | - logger.info( |
233 | | - "k8s cluster link successful ; k8s version: {} ; platform: {}", |
234 | | - ReflectUtil.getFieldValue(versionInfo, "gitVersion"), |
235 | | - ReflectUtil.getFieldValue(versionInfo, "platform")); |
236 | | - } |
| 224 | + String namespace = configuration.get(KubernetesConfigOptions.NAMESPACE); |
| 225 | + k8sClientHelper.getKubernetesClient().pods().inNamespace(namespace).list(); |
| 226 | + logger.info("k8s cluster link successful ; namespace: {}", namespace); |
237 | 227 | return TestResult.success(); |
238 | 228 | } catch (Exception e) { |
239 | 229 | logger.error(Status.GATEWAY_KUBERNETES_TEST_FAILED.getMessage(), e); |
| 230 | + String errorDetail = extractTestErrorDetail(e); |
240 | 231 | return TestResult.fail( |
241 | | - StrFormatter.format("{}:{}", Status.GATEWAY_KUBERNETES_TEST_FAILED.getMessage(), e.getMessage())); |
| 232 | + StrFormatter.format("{} {}", Status.GATEWAY_KUBERNETES_TEST_FAILED.getMessage(), errorDetail)); |
242 | 233 | } finally { |
243 | 234 | close(); |
244 | 235 | } |
245 | 236 | } |
246 | 237 |
|
| 238 | + static String extractTestErrorDetail(Throwable throwable) { |
| 239 | + Throwable rootCause = throwable; |
| 240 | + while (rootCause instanceof InvocationTargetException |
| 241 | + && ((InvocationTargetException) rootCause).getTargetException() != null) { |
| 242 | + rootCause = ((InvocationTargetException) rootCause).getTargetException(); |
| 243 | + } |
| 244 | + while (rootCause.getCause() != null && rootCause.getCause() != rootCause) { |
| 245 | + rootCause = rootCause.getCause(); |
| 246 | + } |
| 247 | + |
| 248 | + String message = rootCause.getMessage(); |
| 249 | + if (StringUtils.isBlank(message)) { |
| 250 | + return rootCause.getClass().getName(); |
| 251 | + } |
| 252 | + return StrFormatter.format("{}: {}", rootCause.getClass().getName(), message); |
| 253 | + } |
| 254 | + |
247 | 255 | @Override |
248 | 256 | public void killCluster() { |
249 | 257 | log.info("Start kill cluster: " + config.getFlinkConfig().getJobName()); |
|
0 commit comments