Skip to content

Commit 136d56b

Browse files
authored
[FixBug] Fix Exception on Convert.toInt of YarnGateway to track the new JobManager address in flink1.20 (#4382)
1 parent 78ae407 commit 136d56b

8 files changed

Lines changed: 61 additions & 2 deletions

File tree

dinky-client/dinky-client-1.14/src/main/java/org/dinky/utils/FlinkUtil.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121

2222
import org.apache.flink.api.common.JobID;
2323
import org.apache.flink.client.program.ClusterClient;
24+
import org.apache.flink.configuration.Configuration;
25+
import org.apache.flink.configuration.HighAvailabilityOptions;
2426
import org.apache.flink.table.api.TableResult;
2527
import org.apache.flink.table.catalog.CatalogManager;
2628
import org.apache.flink.table.catalog.Column;
@@ -31,6 +33,8 @@
3133
import java.util.Optional;
3234
import java.util.concurrent.ExecutionException;
3335

36+
import cn.hutool.core.convert.Convert;
37+
3438
/**
3539
* FlinkUtil
3640
*
@@ -93,4 +97,8 @@ public static String cancelWithSavepoint(ClusterClient clusterClient, String job
9397
.get()
9498
.toString();
9599
}
100+
101+
public static int getZookeeperSessionTimeout(Configuration configuration) {
102+
return Convert.toInt(configuration.get(HighAvailabilityOptions.ZOOKEEPER_SESSION_TIMEOUT));
103+
}
96104
}

dinky-client/dinky-client-1.15/src/main/java/org/dinky/utils/FlinkUtil.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121

2222
import org.apache.flink.api.common.JobID;
2323
import org.apache.flink.client.program.ClusterClient;
24+
import org.apache.flink.configuration.Configuration;
25+
import org.apache.flink.configuration.HighAvailabilityOptions;
2426
import org.apache.flink.core.execution.SavepointFormatType;
2527
import org.apache.flink.table.api.TableResult;
2628
import org.apache.flink.table.catalog.CatalogManager;
@@ -33,6 +35,8 @@
3335
import java.util.Optional;
3436
import java.util.concurrent.ExecutionException;
3537

38+
import cn.hutool.core.convert.Convert;
39+
3640
/**
3741
* FlinkUtil
3842
*
@@ -86,4 +90,8 @@ public static String cancelWithSavepoint(ClusterClient clusterClient, String job
8690
.get()
8791
.toString();
8892
}
93+
94+
public static int getZookeeperSessionTimeout(Configuration configuration) {
95+
return Convert.toInt(configuration.get(HighAvailabilityOptions.ZOOKEEPER_SESSION_TIMEOUT));
96+
}
8997
}

dinky-client/dinky-client-1.16/src/main/java/org/dinky/utils/FlinkUtil.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121

2222
import org.apache.flink.api.common.JobID;
2323
import org.apache.flink.client.program.ClusterClient;
24+
import org.apache.flink.configuration.Configuration;
25+
import org.apache.flink.configuration.HighAvailabilityOptions;
2426
import org.apache.flink.core.execution.SavepointFormatType;
2527
import org.apache.flink.table.api.TableResult;
2628
import org.apache.flink.table.catalog.CatalogManager;
@@ -33,6 +35,8 @@
3335
import java.util.Optional;
3436
import java.util.concurrent.ExecutionException;
3537

38+
import cn.hutool.core.convert.Convert;
39+
3640
/**
3741
* FlinkUtil
3842
*
@@ -86,4 +90,8 @@ public static String cancelWithSavepoint(ClusterClient clusterClient, String job
8690
.get()
8791
.toString();
8892
}
93+
94+
public static int getZookeeperSessionTimeout(Configuration configuration) {
95+
return Convert.toInt(configuration.get(HighAvailabilityOptions.ZOOKEEPER_SESSION_TIMEOUT));
96+
}
8997
}

dinky-client/dinky-client-1.17/src/main/java/org/dinky/utils/FlinkUtil.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121

2222
import org.apache.flink.api.common.JobID;
2323
import org.apache.flink.client.program.ClusterClient;
24+
import org.apache.flink.configuration.Configuration;
25+
import org.apache.flink.configuration.HighAvailabilityOptions;
2426
import org.apache.flink.core.execution.SavepointFormatType;
2527
import org.apache.flink.table.api.TableResult;
2628
import org.apache.flink.table.catalog.CatalogManager;
@@ -33,6 +35,8 @@
3335
import java.util.Optional;
3436
import java.util.concurrent.ExecutionException;
3537

38+
import cn.hutool.core.convert.Convert;
39+
3640
/**
3741
* FlinkUtil
3842
*
@@ -86,4 +90,8 @@ public static String cancelWithSavepoint(ClusterClient clusterClient, String job
8690
.get()
8791
.toString();
8892
}
93+
94+
public static int getZookeeperSessionTimeout(Configuration configuration) {
95+
return Convert.toInt(configuration.get(HighAvailabilityOptions.ZOOKEEPER_SESSION_TIMEOUT));
96+
}
8997
}

dinky-client/dinky-client-1.18/src/main/java/org/dinky/utils/FlinkUtil.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121

2222
import org.apache.flink.api.common.JobID;
2323
import org.apache.flink.client.program.ClusterClient;
24+
import org.apache.flink.configuration.Configuration;
25+
import org.apache.flink.configuration.HighAvailabilityOptions;
2426
import org.apache.flink.core.execution.SavepointFormatType;
2527
import org.apache.flink.table.api.TableResult;
2628
import org.apache.flink.table.catalog.CatalogManager;
@@ -33,6 +35,8 @@
3335
import java.util.Optional;
3436
import java.util.concurrent.ExecutionException;
3537

38+
import cn.hutool.core.convert.Convert;
39+
3640
/**
3741
* FlinkUtil
3842
*
@@ -86,4 +90,8 @@ public static String cancelWithSavepoint(ClusterClient clusterClient, String job
8690
.get()
8791
.toString();
8892
}
93+
94+
public static int getZookeeperSessionTimeout(Configuration configuration) {
95+
return Convert.toInt(configuration.get(HighAvailabilityOptions.ZOOKEEPER_SESSION_TIMEOUT));
96+
}
8997
}

dinky-client/dinky-client-1.19/src/main/java/org/dinky/utils/FlinkUtil.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121

2222
import org.apache.flink.api.common.JobID;
2323
import org.apache.flink.client.program.ClusterClient;
24+
import org.apache.flink.configuration.Configuration;
25+
import org.apache.flink.configuration.HighAvailabilityOptions;
2426
import org.apache.flink.core.execution.SavepointFormatType;
2527
import org.apache.flink.table.api.TableResult;
2628
import org.apache.flink.table.catalog.CatalogManager;
@@ -33,6 +35,8 @@
3335
import java.util.Optional;
3436
import java.util.concurrent.ExecutionException;
3537

38+
import cn.hutool.core.convert.Convert;
39+
3640
/**
3741
* FlinkUtil
3842
*
@@ -86,4 +90,8 @@ public static String cancelWithSavepoint(ClusterClient clusterClient, String job
8690
.get()
8791
.toString();
8892
}
93+
94+
public static int getZookeeperSessionTimeout(Configuration configuration) {
95+
return Convert.toInt(configuration.get(HighAvailabilityOptions.ZOOKEEPER_SESSION_TIMEOUT));
96+
}
8997
}

dinky-client/dinky-client-1.20/src/main/java/org/dinky/utils/FlinkUtil.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121

2222
import org.apache.flink.api.common.JobID;
2323
import org.apache.flink.client.program.ClusterClient;
24+
import org.apache.flink.configuration.Configuration;
25+
import org.apache.flink.configuration.HighAvailabilityOptions;
2426
import org.apache.flink.core.execution.SavepointFormatType;
2527
import org.apache.flink.table.api.TableResult;
2628
import org.apache.flink.table.catalog.CatalogManager;
@@ -33,6 +35,8 @@
3335
import java.util.Optional;
3436
import java.util.concurrent.ExecutionException;
3537

38+
import cn.hutool.core.convert.Convert;
39+
3640
/**
3741
* FlinkUtil
3842
*
@@ -86,4 +90,11 @@ public static String cancelWithSavepoint(ClusterClient clusterClient, String job
8690
.get()
8791
.toString();
8892
}
93+
94+
public static int getZookeeperSessionTimeout(Configuration configuration) {
95+
return Convert.toInt(configuration
96+
.get(HighAvailabilityOptions.ZOOKEEPER_SESSION_TIMEOUT)
97+
.getSeconds()
98+
* 1000);
99+
}
89100
}

dinky-gateway/src/main/java/org/dinky/gateway/yarn/YarnGateway.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.dinky.gateway.result.YarnResult;
4040
import org.dinky.gateway.utils.RequestKerberosUrlUtils;
4141
import org.dinky.utils.FlinkJsonUtil;
42+
import org.dinky.utils.FlinkUtil;
4243
import org.dinky.utils.ThreadUtil;
4344

4445
import org.apache.commons.lang3.StringUtils;
@@ -94,7 +95,6 @@
9495

9596
import cn.hutool.core.collection.CollUtil;
9697
import cn.hutool.core.collection.CollectionUtil;
97-
import cn.hutool.core.convert.Convert;
9898
import cn.hutool.core.io.FileUtil;
9999
import cn.hutool.core.lang.Assert;
100100
import cn.hutool.core.util.ReUtil;
@@ -513,7 +513,7 @@ public String getLatestJobManageHost(String appId, String oldJobManagerHost) {
513513
+ HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM.key()
514514
+ "'.");
515515
}
516-
int sessionTimeout = Convert.toInt(configuration.get(HighAvailabilityOptions.ZOOKEEPER_SESSION_TIMEOUT));
516+
int sessionTimeout = FlinkUtil.getZookeeperSessionTimeout(configuration);
517517
String root = configuration.getValue(HighAvailabilityOptions.HA_ZOOKEEPER_ROOT);
518518
String namespace = configuration.getValue(HighAvailabilityOptions.HA_CLUSTER_ID);
519519

0 commit comments

Comments
 (0)