Skip to content

Commit 3033442

Browse files
committed
fix: 去除分支节点默认选项#AI Commit#
1 parent b244e73 commit 3033442

6 files changed

Lines changed: 50 additions & 40 deletions

File tree

dss-appconn/appconns/dss-schedulis-appconn/src/main/java/com/webank/wedatasphere/dss/appconn/schedulis/linkisjob/LinkisJobConverter.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ private void putDecisionRules(LinkisJob job, String branchRuleText) {
168168

169169
private List<DecisionRule> parseDecisionRules(String branchRuleText) {
170170
List<DecisionRule> rules = new ArrayList<>();
171-
for (String ruleText : branchRuleText.split(";")) {
171+
for (String ruleText : branchRuleText.split("[\\r\\n;]+")) {
172172
if (StringUtils.isBlank(ruleText)) {
173173
continue;
174174
}
@@ -181,14 +181,21 @@ private List<DecisionRule> parseDecisionRules(String branchRuleText) {
181181
if (StringUtils.isBlank(condition) || StringUtils.isBlank(targetJobName)) {
182182
continue;
183183
}
184-
if ("default".equalsIgnoreCase(condition)) {
185-
condition = "true";
184+
if (isUnsupportedDefaultKeyword(condition)) {
185+
continue;
186186
}
187187
rules.add(new DecisionRule(condition, targetJobName));
188188
}
189189
return rules;
190190
}
191191

192+
private boolean isUnsupportedDefaultKeyword(String condition) {
193+
if (StringUtils.isBlank(condition)) {
194+
return false;
195+
}
196+
String normalized = condition.trim().toLowerCase();
197+
return "default".equals(normalized) || "else".equals(normalized) || "*".equals(normalized);
198+
}
192199
private void putBranchConf(LinkisJob job, Map<String, Object> params, String key) {
193200
Object value = params.get(key);
194201
if (value != null) {

dss-orchestrator/orchestrators/dss-workflow/dss-flow-execution-server/src/main/scala/com/webank/wedatasphere/dss/flow/execution/entrance/node/BranchNodeRunner.scala

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -133,23 +133,13 @@ class BranchNodeRunner(flow: Workflow) extends NodeRunner with Logging {
133133
}
134134
}
135135
rules.find { rule =>
136-
if (BranchExpressionUtils.isDefaultRule(rule)) {
137-
false
138-
} else {
139-
val matched = BranchExpressionUtils.evaluateCondition(rule.condition, context)
140-
logInfo(s"Branch node ${node.getName} rule evaluated: ${rule.condition} => ${rule.targetName}, matched=$matched")
141-
matched
142-
}
136+
val matched = BranchExpressionUtils.evaluateCondition(rule.condition, context)
137+
logInfo(s"Branch node ${node.getName} rule evaluated: ${rule.condition} => ${rule.targetName}, matched=$matched")
138+
matched
143139
}.flatMap { rule =>
144140
val target = matchEdge(rule.targetName)
145141
logInfo(s"Branch node ${node.getName} matched rule target lookup: ${rule.targetName}, found=${target.isDefined}")
146142
target
147-
}.orElse {
148-
rules.find(BranchExpressionUtils.isDefaultRule).flatMap { rule =>
149-
val target = matchEdge(rule.targetName)
150-
logInfo(s"Branch node ${node.getName} use default rule target lookup: ${rule.targetName}, found=${target.isDefined}")
151-
target
152-
}
153143
}
154144
}
155145

dss-orchestrator/orchestrators/dss-workflow/dss-flow-execution-server/src/main/scala/com/webank/wedatasphere/dss/flow/execution/entrance/utils/BranchExpressionUtils.scala

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ object BranchExpressionUtils extends Logging {
2929

3030
val BranchNodeType = "workflow.branch"
3131
val BranchRuleKey = "branch.rules"
32-
val DefaultRuleValues = Set("default", "else", "*")
3332

3433
case class BranchRule(condition: String, targetName: String)
3534

@@ -92,16 +91,14 @@ object BranchExpressionUtils extends Logging {
9291
} else {
9392
val condition = line.substring(0, separatorIndex).trim
9493
val targetName = line.substring(separatorIndex + 1).trim
95-
if (condition.nonEmpty && targetName.nonEmpty) Some(BranchRule(condition, targetName))
94+
if (condition.nonEmpty && targetName.nonEmpty && !isUnsupportedDefaultKeyword(condition)) Some(BranchRule(condition, targetName))
9695
else {
9796
warn(s"Invalid branch rule syntax: $line")
9897
None
9998
}
10099
}
101100
}
102101

103-
def isDefaultRule(rule: BranchRule): Boolean = DefaultRuleValues.contains(Option(rule.condition).map(_.trim.toLowerCase).getOrElse(""))
104-
105102
def evaluateCondition(condition: String, context: Map[String, String]): Boolean = {
106103
val normalized = Option(condition).map(_.trim).getOrElse("")
107104
if (normalized.isEmpty) {
@@ -110,6 +107,7 @@ object BranchExpressionUtils extends Logging {
110107
val expr = stripExpressionWrapper(normalized)
111108
if (expr.equalsIgnoreCase("true")) return true
112109
if (expr.equalsIgnoreCase("false")) return false
110+
if (isUnsupportedDefaultKeyword(expr)) return false
113111
val operators = Seq("==", "!=", ">=", "<=", ">", "<")
114112
operators.collectFirst {
115113
case operator if expr.contains(operator) =>
@@ -173,6 +171,10 @@ object BranchExpressionUtils extends Logging {
173171
}
174172

175173
private def getStringValue(value: Any): Option[String] = Option(value).map(_.toString.trim).filter(_.nonEmpty)
174+
private def isUnsupportedDefaultKeyword(condition: String): Boolean = {
175+
val normalized = Option(condition).map(_.trim.toLowerCase).getOrElse("")
176+
normalized == "default" || normalized == "else" || normalized == "*"
177+
}
176178

177179
private def compare(left: String, right: String, operator: String): Boolean = {
178180
(toBigDecimal(left), toBigDecimal(right)) match {
@@ -205,3 +207,4 @@ object BranchExpressionUtils extends Logging {
205207
}
206208
}
207209

210+

dss-orchestrator/orchestrators/dss-workflow/dss-workflow-server/src/main/java/com/webank/wedatasphere/dss/workflow/service/impl/DSSFlowServiceImpl.java

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -588,14 +588,27 @@ private List<String[]> parseBranchRules(String branchRuleText) {
588588
if (StringUtils.isBlank(rawRule)) {
589589
continue;
590590
}
591-
String[] parts = rawRule.split("=", 2);
592-
if (parts.length == 2 && StringUtils.isNotBlank(parts[0]) && StringUtils.isNotBlank(parts[1])) {
593-
branchRules.add(new String[]{parts[0].trim(), parts[1].trim()});
591+
int separatorIndex = rawRule.lastIndexOf('=');
592+
if (separatorIndex <= 0 || separatorIndex >= rawRule.length() - 1) {
593+
continue;
594+
}
595+
String condition = rawRule.substring(0, separatorIndex).trim();
596+
String targetName = rawRule.substring(separatorIndex + 1).trim();
597+
if (StringUtils.isNotBlank(condition) && StringUtils.isNotBlank(targetName) && !isUnsupportedDefaultKeyword(condition)) {
598+
branchRules.add(new String[]{condition, targetName});
594599
}
595600
}
596601
return branchRules;
597602
}
598603

604+
605+
private boolean isUnsupportedDefaultKeyword(String condition) {
606+
if (StringUtils.isBlank(condition)) {
607+
return false;
608+
}
609+
String normalized = condition.trim().toLowerCase();
610+
return "default".equals(normalized) || "else".equals(normalized) || "*".equals(normalized);
611+
}
599612
private boolean parseEdgeDefault(JsonObject edge) {
600613
if (edge == null || !edge.has("isDefault") || edge.get("isDefault").isJsonNull()) {
601614
return false;
@@ -4106,3 +4119,4 @@ private void handleWhiteNodeParams(EditFlowRequest editFlowRequest,DSSOrchestrat
41064119

41074120

41084121

4122+

plugins/azkaban/linkis-jobtype/src/main/java/com/webank/wedatasphere/dss/plugins/azkaban/linkis/jobtype/job/BranchExpressionUtils.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,6 @@ public static List<BranchRule> parseBranchRules(String raw) {
3030
return rules;
3131
}
3232

33-
public static boolean isDefaultRule(BranchRule rule) {
34-
return rule != null && DEFAULT_RULE_VALUES.contains(normalize(rule.getCondition()));
35-
}
3633

3734
public static boolean evaluateCondition(String condition, Map<String, String> context) {
3835
String normalized = trimToEmpty(condition);
@@ -46,6 +43,9 @@ public static boolean evaluateCondition(String condition, Map<String, String> co
4643
if ("false".equalsIgnoreCase(expr)) {
4744
return false;
4845
}
46+
if (isUnsupportedDefaultKeyword(expr)) {
47+
return false;
48+
}
4949
String[] operators = new String[]{"==", "!=", ">=", "<=", ">", "<"};
5050
for (String operator : operators) {
5151
int index = expr.indexOf(operator);
@@ -74,7 +74,7 @@ private static BranchRule parseBranchRule(String line) {
7474
}
7575
String condition = line.substring(0, separatorIndex).trim();
7676
String targetName = line.substring(separatorIndex + 1).trim();
77-
if (condition.isEmpty() || targetName.isEmpty()) {
77+
if (condition.isEmpty() || targetName.isEmpty() || isUnsupportedDefaultKeyword(condition)) {
7878
return null;
7979
}
8080
return new BranchRule(condition, targetName);
@@ -167,6 +167,13 @@ private static boolean isLiteralToken(String token) {
167167
return "true".equalsIgnoreCase(token) || "false".equalsIgnoreCase(token) || toBigDecimal(token) != null;
168168
}
169169

170+
private static boolean isUnsupportedDefaultKeyword(String condition) {
171+
if (isBlank(condition)) {
172+
return false;
173+
}
174+
String normalized = trimToEmpty(condition).toLowerCase();
175+
return "default".equals(normalized) || "else".equals(normalized) || "*".equals(normalized);
176+
}
170177
private static boolean isBlank(String value) {
171178
return value == null || value.trim().isEmpty();
172179
}
@@ -175,9 +182,6 @@ private static String trimToEmpty(String value) {
175182
return value == null ? "" : value.trim();
176183
}
177184

178-
private static String normalize(String value) {
179-
return trimToEmpty(value).toLowerCase();
180-
}
181185

182186
public static class BranchRule {
183187
private final String condition;

plugins/azkaban/linkis-jobtype/src/main/java/com/webank/wedatasphere/dss/plugins/azkaban/linkis/jobtype/job/BranchRouteExecutor.java

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -88,22 +88,14 @@ private void mergeMissingVariables(Map<String, String> context, Map<String, Stri
8888
private String selectTarget(String branchRuleText, Map<String, String> targetNameToId, Map<String, String> context) {
8989
List<BranchExpressionUtils.BranchRule> rules = BranchExpressionUtils.parseBranchRules(branchRuleText);
9090
for (BranchExpressionUtils.BranchRule rule : rules) {
91-
if (!BranchExpressionUtils.isDefaultRule(rule) && BranchExpressionUtils.evaluateCondition(rule.getCondition(), context)) {
91+
if (BranchExpressionUtils.evaluateCondition(rule.getCondition(), context)) {
9292
String targetId = targetNameToId.get(rule.getTargetName());
9393
if (!isBlank(targetId)) {
9494
return targetId;
9595
}
9696
}
9797
}
98-
for (BranchExpressionUtils.BranchRule rule : rules) {
99-
if (BranchExpressionUtils.isDefaultRule(rule)) {
100-
String targetId = targetNameToId.get(rule.getTargetName());
101-
if (!isBlank(targetId)) {
102-
return targetId;
103-
}
104-
}
105-
}
106-
throw new IllegalStateException("No branch rule matched and no default rule was resolved.");
98+
throw new IllegalStateException("No branch rule matched.");
10799
}
108100

109101
private List<Map<String, Object>> parseTargetDefinitions(String json) throws Exception {

0 commit comments

Comments
 (0)