Skip to content

Commit ffdab10

Browse files
committed
fix: 分支规则格式适配schedulis#AI Commit#
1 parent 3033442 commit ffdab10

5 files changed

Lines changed: 209 additions & 44 deletions

File tree

db/version-update/dss_1.21.0_update.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ INSERT INTO dss_workflow_node_to_ui (workflow_node_id,ui_id) values (
2828

2929

3030
INSERT INTO `dss_workflow_node_ui`(`key`,`description`,`description_en`,`lable_name`,`lable_name_en`,`ui_type`,`required`,`value`,`default_value`,`is_hidden`,`condition`,`is_advanced`,`order`,`node_menu_type`,`is_base_info`,`position`)
31-
values ('branch.rules','请填写分支规则,每行一条,格式如amount>100=节点A','Please enter one branch rule per line, such as amount>100=NodeA','分支规则','Branch rules','Text',1,NULL,NULL,0,NULL,0,2,1,0,'special');
31+
values ('branch.rules','请填写分支规则,格式如condition.1=amount>100;on.success.1=节点A;on.failure.1=节点B','Please enter branch rules, such as condition.1=amount>100;on.success.1=NodeA;on.failure.1=NodeB','分支规则','Branch rules','Text',1,NULL,NULL,0,NULL,0,2,1,0,'special');
3232

3333
insert into `dss_workflow_node_to_ui`(`workflow_node_id`,`ui_id`) values (
3434
(select id from dss_workflow_node where node_type = 'workflow.branch' limit 1),

dss-appconn/appconns/dss-schedulis-appconn/src/main/java/com/webank/wedatasphere/dss/appconn/schedulis/linkisjob/LinkisJobConverter.java

Lines changed: 70 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -162,33 +162,75 @@ private void putDecisionRules(LinkisJob job, String branchRuleText) {
162162
int index = i + 1;
163163
job.getConf().put(BranchSchedulisConstant.DECISION_CONDITION_PREFIX + index, rule.condition);
164164
job.getConf().put(BranchSchedulisConstant.DECISION_ON_SUCCESS_PREFIX + index, rule.targetJobName);
165-
job.getConf().put(BranchSchedulisConstant.DECISION_ON_FAILURE_PREFIX + index, "");
165+
job.getConf().put(BranchSchedulisConstant.DECISION_ON_FAILURE_PREFIX + index, rule.onFailure);
166166
}
167167
}
168168

169169
private List<DecisionRule> parseDecisionRules(String branchRuleText) {
170-
List<DecisionRule> rules = new ArrayList<>();
170+
Map<Integer, DecisionRuleBuilder> decisionRuleMap = new java.util.TreeMap<>();
171171
for (String ruleText : branchRuleText.split("[\\r\\n;]+")) {
172172
if (StringUtils.isBlank(ruleText)) {
173173
continue;
174174
}
175-
int separatorIndex = ruleText.lastIndexOf('=');
176-
if (separatorIndex <= 0 || separatorIndex >= ruleText.length() - 1) {
175+
int separatorIndex = ruleText.indexOf('=');
176+
if (separatorIndex <= 0) {
177177
continue;
178178
}
179-
String condition = ruleText.substring(0, separatorIndex).trim();
180-
String targetJobName = ruleText.substring(separatorIndex + 1).trim();
181-
if (StringUtils.isBlank(condition) || StringUtils.isBlank(targetJobName)) {
179+
String key = ruleText.substring(0, separatorIndex).trim();
180+
String value = ruleText.substring(separatorIndex + 1);
181+
IndexedRuleKey indexedRuleKey = parseIndexedRuleKey(key);
182+
if (indexedRuleKey == null) {
182183
continue;
183184
}
184-
if (isUnsupportedDefaultKeyword(condition)) {
185+
DecisionRuleBuilder builder = decisionRuleMap.computeIfAbsent(indexedRuleKey.index, ignored -> new DecisionRuleBuilder());
186+
if ("condition".equals(indexedRuleKey.ruleType)) {
187+
builder.condition = value.trim();
188+
} else if ("on.success".equals(indexedRuleKey.ruleType)) {
189+
builder.targetJobName = value.trim();
190+
} else if ("on.failure".equals(indexedRuleKey.ruleType)) {
191+
builder.onFailure = value == null ? "" : value.trim();
192+
}
193+
}
194+
List<DecisionRule> rules = new ArrayList<>();
195+
for (DecisionRuleBuilder builder : decisionRuleMap.values()) {
196+
if (StringUtils.isBlank(builder.condition) || StringUtils.isBlank(builder.targetJobName)) {
185197
continue;
186198
}
187-
rules.add(new DecisionRule(condition, targetJobName));
199+
if (isUnsupportedDefaultKeyword(builder.condition)) {
200+
continue;
201+
}
202+
rules.add(new DecisionRule(builder.condition, builder.targetJobName, StringUtils.defaultString(builder.onFailure)));
188203
}
189204
return rules;
190205
}
191206

207+
private IndexedRuleKey parseIndexedRuleKey(String key) {
208+
if (StringUtils.isBlank(key)) {
209+
return null;
210+
}
211+
if (key.startsWith("condition.")) {
212+
return buildIndexedRuleKey("condition", key.substring("condition.".length()));
213+
}
214+
if (key.startsWith("on.success.")) {
215+
return buildIndexedRuleKey("on.success", key.substring("on.success.".length()));
216+
}
217+
if (key.startsWith("on.failure.")) {
218+
return buildIndexedRuleKey("on.failure", key.substring("on.failure.".length()));
219+
}
220+
return null;
221+
}
222+
223+
private IndexedRuleKey buildIndexedRuleKey(String ruleType, String rawIndex) {
224+
if (StringUtils.isBlank(rawIndex)) {
225+
return null;
226+
}
227+
try {
228+
return new IndexedRuleKey(ruleType, Integer.parseInt(rawIndex.trim()));
229+
} catch (NumberFormatException ignored) {
230+
return null;
231+
}
232+
}
233+
192234
private boolean isUnsupportedDefaultKeyword(String condition) {
193235
if (StringUtils.isBlank(condition)) {
194236
return false;
@@ -228,10 +270,28 @@ private void convertJobCommand(WorkflowNode workflowNode, LinkisJob job){
228270
private static class DecisionRule {
229271
private final String condition;
230272
private final String targetJobName;
273+
private final String onFailure;
231274

232-
private DecisionRule(String condition, String targetJobName) {
275+
private DecisionRule(String condition, String targetJobName, String onFailure) {
233276
this.condition = condition;
234277
this.targetJobName = targetJobName;
278+
this.onFailure = onFailure;
279+
}
280+
}
281+
282+
private static class DecisionRuleBuilder {
283+
private String condition;
284+
private String targetJobName;
285+
private String onFailure;
286+
}
287+
288+
private static class IndexedRuleKey {
289+
private final String ruleType;
290+
private final int index;
291+
292+
private IndexedRuleKey(String ruleType, int index) {
293+
this.ruleType = ruleType;
294+
this.index = index;
235295
}
236296
}
237297
}

dss-orchestrator/orchestrators/dss-workflow/dss-flow-execution-server/src/main/scala/com/webank/wedatasphere/dss/flow/execution/entrance/node/BranchNodeRunner.scala

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ class BranchNodeRunner(flow: Workflow) extends NodeRunner with Logging {
9999
throw new IllegalStateException(s"Branch node ${node.getName} must define branch.rules.")
100100
}
101101
val parsedRules = BranchExpressionUtils.parseBranchRules(branchRuleText)
102-
logInfo(s"Branch node ${node.getName} parsed rules: ${parsedRules.map(rule => s"${rule.condition}=>${rule.targetName}").mkString(", ")}")
102+
logInfo(s"Branch node ${node.getName} parsed rules: ${parsedRules.map(rule => s"${rule.condition}=>success=${rule.targetName.getOrElse("")},failure=${rule.onFailureTarget.getOrElse("")}").mkString(", ")}")
103103
val selectedEdge = selectEdgeByRules(outgoingEdges, parsedRules, context)
104104
if (selectedEdge.isEmpty) {
105105
throw new IllegalStateException(s"No branch rule matched for node ${node.getName}.")
@@ -132,15 +132,16 @@ class BranchNodeRunner(flow: Workflow) extends NodeRunner with Logging {
132132
}
133133
}
134134
}
135-
rules.find { rule =>
135+
rules.iterator.map { rule =>
136136
val matched = BranchExpressionUtils.evaluateCondition(rule.condition, context)
137-
logInfo(s"Branch node ${node.getName} rule evaluated: ${rule.condition} => ${rule.targetName}, matched=$matched")
138-
matched
139-
}.flatMap { rule =>
140-
val target = matchEdge(rule.targetName)
141-
logInfo(s"Branch node ${node.getName} matched rule target lookup: ${rule.targetName}, found=${target.isDefined}")
142-
target
143-
}
137+
val selectedTargetName = if (matched) rule.targetName else rule.onFailureTarget
138+
logInfo(s"Branch node ${node.getName} rule evaluated: ${rule.condition}, matched=$matched, selectedTarget=${selectedTargetName.getOrElse("")}")
139+
selectedTargetName.flatMap { targetName =>
140+
val target = matchEdge(targetName)
141+
logInfo(s"Branch node ${node.getName} target lookup: ${targetName}, found=${target.isDefined}")
142+
target
143+
}
144+
}.find(_.isDefined).flatten
144145
}
145146

146147
private def describeEdges(edges: Seq[DSSEdge]): String = {

dss-orchestrator/orchestrators/dss-workflow/dss-flow-execution-server/src/main/scala/com/webank/wedatasphere/dss/flow/execution/entrance/utils/BranchExpressionUtils.scala

Lines changed: 56 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,18 @@ import com.webank.wedatasphere.dss.workflow.core.entity.WorkflowNode
2424
import org.apache.linkis.common.utils.Logging
2525

2626
import scala.collection.JavaConversions._
27+
import scala.collection.mutable
2728

2829
object BranchExpressionUtils extends Logging {
2930

3031
val BranchNodeType = "workflow.branch"
3132
val BranchRuleKey = "branch.rules"
33+
private val ConditionPrefix = "condition."
34+
private val OnSuccessPrefix = "on.success."
35+
private val OnFailurePrefix = "on.failure."
3236

33-
case class BranchRule(condition: String, targetName: String)
37+
case class BranchRule(condition: String, targetName: Option[String], onFailureTarget: Option[String])
38+
private case class IndexedBranchRule(condition: Option[String] = None, targetName: Option[String] = None, onFailure: Option[String] = None)
3439

3540
def isBranchNode(node: WorkflowNode): Boolean = node != null && BranchNodeType.equalsIgnoreCase(node.getNodeType)
3641

@@ -77,28 +82,68 @@ object BranchExpressionUtils extends Logging {
7782
}
7883

7984
def parseBranchRules(raw: String): Seq[BranchRule] = {
85+
val indexedRules = mutable.LinkedHashMap[Int, IndexedBranchRule]()
8086
Option(raw).map(_.split("[\\r\\n;]+").toSeq).getOrElse(Seq.empty)
8187
.map(_.trim)
8288
.filter(_.nonEmpty)
83-
.flatMap(parseBranchRule)
89+
.foreach { line =>
90+
parseBranchRuleEntry(line).foreach { case (ruleType, index, ruleValue) =>
91+
val current = indexedRules.getOrElse(index, IndexedBranchRule())
92+
val updated = ruleType match {
93+
case ConditionPrefix => current.copy(condition = Some(ruleValue))
94+
case OnSuccessPrefix => current.copy(targetName = Some(ruleValue))
95+
case OnFailurePrefix => current.copy(onFailure = Some(ruleValue))
96+
}
97+
indexedRules.put(index, updated)
98+
}
99+
}
100+
indexedRules.toSeq.sortBy(_._1).flatMap { case (_, rule) =>
101+
(rule.condition.map(_.trim), rule.targetName.map(_.trim).filter(_.nonEmpty), rule.onFailure.map(_.trim).filter(_.nonEmpty)) match {
102+
case (Some(condition), targetName, onFailureTarget)
103+
if condition.nonEmpty && !isUnsupportedDefaultKeyword(condition) && (targetName.isDefined || onFailureTarget.isDefined) =>
104+
Some(BranchRule(condition, targetName, onFailureTarget))
105+
case _ => None
106+
}
107+
}
84108
}
85109

86-
private def parseBranchRule(line: String): Option[BranchRule] = {
87-
val separatorIndex = Option(line).map(_.lastIndexOf('=')).getOrElse(-1)
88-
if (separatorIndex <= 0 || separatorIndex >= line.length - 1) {
110+
private def parseBranchRuleEntry(line: String): Option[(String, Int, String)] = {
111+
val separatorIndex = Option(line).map(_.indexOf('=')).getOrElse(-1)
112+
if (separatorIndex <= 0) {
89113
warn(s"Invalid branch rule syntax: $line")
90114
None
91115
} else {
92-
val condition = line.substring(0, separatorIndex).trim
93-
val targetName = line.substring(separatorIndex + 1).trim
94-
if (condition.nonEmpty && targetName.nonEmpty && !isUnsupportedDefaultKeyword(condition)) Some(BranchRule(condition, targetName))
95-
else {
96-
warn(s"Invalid branch rule syntax: $line")
97-
None
116+
val key = line.substring(0, separatorIndex).trim
117+
val value = line.substring(separatorIndex + 1)
118+
parseIndexedRuleKey(key) match {
119+
case Some((ruleType, index)) => Some((ruleType, index, value))
120+
case None =>
121+
warn(s"Invalid branch rule syntax: $line")
122+
None
98123
}
99124
}
100125
}
101126

127+
private def parseIndexedRuleKey(key: String): Option[(String, Int)] = {
128+
if (key.startsWith(ConditionPrefix)) {
129+
parseRuleIndex(key.substring(ConditionPrefix.length)).map(index => (ConditionPrefix, index))
130+
} else if (key.startsWith(OnSuccessPrefix)) {
131+
parseRuleIndex(key.substring(OnSuccessPrefix.length)).map(index => (OnSuccessPrefix, index))
132+
} else if (key.startsWith(OnFailurePrefix)) {
133+
parseRuleIndex(key.substring(OnFailurePrefix.length)).map(index => (OnFailurePrefix, index))
134+
} else {
135+
None
136+
}
137+
}
138+
139+
private def parseRuleIndex(rawIndex: String): Option[Int] = {
140+
try {
141+
Some(rawIndex.trim.toInt)
142+
} catch {
143+
case _: Throwable => None
144+
}
145+
}
146+
102147
def evaluateCondition(condition: String, context: Map[String, String]): Boolean = {
103148
val normalized = Option(condition).map(_.trim).getOrElse("")
104149
if (normalized.isEmpty) {

dss-orchestrator/orchestrators/dss-workflow/dss-workflow-server/src/main/java/com/webank/wedatasphere/dss/workflow/service/impl/DSSFlowServiceImpl.java

Lines changed: 72 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -543,14 +543,16 @@ private void validateBranchNodeConfig(String jsonFlow) throws DSSErrorException
543543
if (StringUtils.isBlank(branchRuleText)) {
544544
throw new DSSErrorException(80001, "Branch node [" + branchNodeName + "] must define branch rules on the node properties.");
545545
}
546-
List<String[]> branchRules = parseBranchRules(branchRuleText);
546+
List<BranchRuleHolder> branchRules = parseBranchRules(branchRuleText);
547547
if (branchRules.isEmpty()) {
548548
throw new DSSErrorException(80001, "Branch node [" + branchNodeName + "] has invalid branch rules.");
549549
}
550-
for (String[] branchRule : branchRules) {
551-
String targetName = branchRule[1];
552-
if (!targetNames.contains(targetName) && !targetIds.contains(targetName)) {
553-
throw new DSSErrorException(80001, "Branch node [" + branchNodeName + "] references a non-outgoing target node [" + targetName + "].");
550+
for (BranchRuleHolder branchRule : branchRules) {
551+
if (StringUtils.isNotBlank(branchRule.targetName) && !targetNames.contains(branchRule.targetName) && !targetIds.contains(branchRule.targetName)) {
552+
throw new DSSErrorException(80001, "Branch node [" + branchNodeName + "] references a non-outgoing target node [" + branchRule.targetName + "].");
553+
}
554+
if (StringUtils.isNotBlank(branchRule.onFailure) && !targetNames.contains(branchRule.onFailure) && !targetIds.contains(branchRule.onFailure)) {
555+
throw new DSSErrorException(80001, "Branch node [" + branchNodeName + "] references a non-outgoing failure target node [" + branchRule.onFailure + "].");
554556
}
555557
}
556558
}
@@ -578,8 +580,9 @@ private String getBranchRuleText(JsonObject branchNode) {
578580
return special.get("branch.rules").getAsString();
579581
}
580582

581-
private List<String[]> parseBranchRules(String branchRuleText) {
582-
List<String[]> branchRules = new ArrayList<>();
583+
private List<BranchRuleHolder> parseBranchRules(String branchRuleText) {
584+
Map<Integer, BranchRuleHolder> branchRuleMap = new java.util.TreeMap<>();
585+
List<BranchRuleHolder> branchRules = new ArrayList<>();
583586
if (StringUtils.isBlank(branchRuleText)) {
584587
return branchRules;
585588
}
@@ -588,19 +591,59 @@ private List<String[]> parseBranchRules(String branchRuleText) {
588591
if (StringUtils.isBlank(rawRule)) {
589592
continue;
590593
}
591-
int separatorIndex = rawRule.lastIndexOf('=');
592-
if (separatorIndex <= 0 || separatorIndex >= rawRule.length() - 1) {
594+
int separatorIndex = rawRule.indexOf('=');
595+
if (separatorIndex <= 0) {
596+
continue;
597+
}
598+
String key = rawRule.substring(0, separatorIndex).trim();
599+
String value = rawRule.substring(separatorIndex + 1);
600+
IndexedBranchRuleKey indexedRuleKey = parseIndexedBranchRuleKey(key);
601+
if (indexedRuleKey == null) {
593602
continue;
594603
}
595-
String condition = rawRule.substring(0, separatorIndex).trim();
596-
String targetName = rawRule.substring(separatorIndex + 1).trim();
597-
if (StringUtils.isNotBlank(condition) && StringUtils.isNotBlank(targetName) && !isUnsupportedDefaultKeyword(condition)) {
598-
branchRules.add(new String[]{condition, targetName});
604+
BranchRuleHolder holder = branchRuleMap.computeIfAbsent(indexedRuleKey.index, ignored -> new BranchRuleHolder());
605+
if ("condition".equals(indexedRuleKey.ruleType)) {
606+
holder.condition = value == null ? null : value.trim();
607+
} else if ("on.success".equals(indexedRuleKey.ruleType)) {
608+
holder.targetName = value == null ? null : value.trim();
609+
} else if ("on.failure".equals(indexedRuleKey.ruleType)) {
610+
holder.onFailure = value == null ? "" : value.trim();
611+
}
612+
}
613+
for (BranchRuleHolder holder : branchRuleMap.values()) {
614+
if (StringUtils.isNotBlank(holder.condition) && !isUnsupportedDefaultKeyword(holder.condition) && (StringUtils.isNotBlank(holder.targetName) || StringUtils.isNotBlank(holder.onFailure))) {
615+
branchRules.add(holder);
599616
}
600617
}
601618
return branchRules;
602619
}
603620

621+
private IndexedBranchRuleKey parseIndexedBranchRuleKey(String key) {
622+
if (StringUtils.isBlank(key)) {
623+
return null;
624+
}
625+
if (key.startsWith("condition.")) {
626+
return buildIndexedBranchRuleKey("condition", key.substring("condition.".length()));
627+
}
628+
if (key.startsWith("on.success.")) {
629+
return buildIndexedBranchRuleKey("on.success", key.substring("on.success.".length()));
630+
}
631+
if (key.startsWith("on.failure.")) {
632+
return buildIndexedBranchRuleKey("on.failure", key.substring("on.failure.".length()));
633+
}
634+
return null;
635+
}
636+
637+
private IndexedBranchRuleKey buildIndexedBranchRuleKey(String ruleType, String rawIndex) {
638+
if (StringUtils.isBlank(rawIndex)) {
639+
return null;
640+
}
641+
try {
642+
return new IndexedBranchRuleKey(ruleType, Integer.parseInt(rawIndex.trim()));
643+
} catch (NumberFormatException ignored) {
644+
return null;
645+
}
646+
}
604647

605648
private boolean isUnsupportedDefaultKeyword(String condition) {
606649
if (StringUtils.isBlank(condition)) {
@@ -609,6 +652,22 @@ private boolean isUnsupportedDefaultKeyword(String condition) {
609652
String normalized = condition.trim().toLowerCase();
610653
return "default".equals(normalized) || "else".equals(normalized) || "*".equals(normalized);
611654
}
655+
656+
private static class BranchRuleHolder {
657+
private String condition;
658+
private String targetName;
659+
private String onFailure;
660+
}
661+
662+
private static class IndexedBranchRuleKey {
663+
private final String ruleType;
664+
private final int index;
665+
666+
private IndexedBranchRuleKey(String ruleType, int index) {
667+
this.ruleType = ruleType;
668+
this.index = index;
669+
}
670+
}
612671
private boolean parseEdgeDefault(JsonObject edge) {
613672
if (edge == null || !edge.has("isDefault") || edge.get("isDefault").isJsonNull()) {
614673
return false;

0 commit comments

Comments
 (0)