Skip to content

Commit f5979f8

Browse files
committed
Merge branch 'dev-1.20.0' of https://github.com/WeDataSphere/DataSphereStudio into dev-1.20.0
2 parents 8a30ba5 + f6b5ff8 commit f5979f8

4 files changed

Lines changed: 83 additions & 14 deletions

File tree

dss-orchestrator/orchestrators/dss-workflow/dss-flow-execution-server/src/main/scala/com/webank/wedatasphere/dss/flow/execution/entrance/job/FlowEntranceJob.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,8 @@ class FlowEntranceJob(persistManager: PersistenceManager) extends EntranceExecut
160160
}
161161
}
162162
}
163-
info(s"Collected output variables from node($nodeName): ${outputVariables.keySet().mkString(",")}")
163+
info(s"Collected output variables from node($nodeName): ${outputVariables.toSeq.sortBy(_._1).map { case (k, v) => s"$k=$v" }.mkString(", ")}")
164+
info(s"Current flow variables after node($nodeName): ${this.flowVariables.toSeq.sortBy(_._1).map { case (k, v) => s"$k=$v" }.mkString(", ")}")
164165
}
165166
}
166167

dss-orchestrator/orchestrators/dss-workflow/dss-flow-execution-server/src/main/scala/com/webank/wedatasphere/dss/flow/execution/entrance/job/parser/FlowJobNodeParser.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ class FlowJobNodeParser extends FlowEntranceJobParser with Logging{
128128
val runTodayHStd = new CustomHourType(curHour, true)
129129
flowVar.put("run_today_h_std", runTodayHStd.toString())
130130
flowVar.put("run_today_hour_std", runTodayHStd.toString())
131+
info(s"Initialized flow variables for job(${flowEntranceJob.getId}): ${flowVar.toSeq.sortBy(_._1).map { case (k, v) => s"$k=$v" }.mkString(", ")}")
131132
}
132133
flowVar
133134
}

dss-orchestrator/orchestrators/dss-workflow/dss-flow-execution-server/src/main/scala/com/webank/wedatasphere/dss/flow/execution/entrance/node/BranchNodeRunner.scala

Lines changed: 35 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,11 @@ import com.webank.wedatasphere.dss.flow.execution.entrance.node.NodeExecutionSta
2424
import com.webank.wedatasphere.dss.flow.execution.entrance.utils.BranchExpressionUtils
2525
import com.webank.wedatasphere.dss.linkis.node.execution.job.LinkisJob
2626
import com.webank.wedatasphere.dss.workflow.core.entity.{Workflow, WorkflowNode}
27+
import org.apache.linkis.common.utils.Logging
2728

2829
import scala.collection.JavaConversions._
2930

30-
class BranchNodeRunner(flow: Workflow) extends NodeRunner {
31+
class BranchNodeRunner(flow: Workflow) extends NodeRunner with Logging {
3132

3233
private var node: WorkflowNode = _
3334
private var canceled: Boolean = false
@@ -91,14 +92,20 @@ class BranchNodeRunner(flow: Workflow) extends NodeRunner {
9192
)
9293
val context = BranchExpressionUtils.buildEvaluationContext(node)
9394
val branchRuleText = BranchExpressionUtils.getBranchRuleText(node)
95+
info(s"Branch node ${node.getName} start evaluating. context=${context.toSeq.sortBy(_._1).map { case (k, v) => s"$k=$v" }.mkString(", ")}")
96+
info(s"Branch node ${node.getName} rules: ${Option(branchRuleText).getOrElse("")}")
97+
info(s"Branch node ${node.getName} outgoing targets: ${describeEdges(outgoingEdges)}")
9498
if (!Option(branchRuleText).exists(_.trim.nonEmpty)) {
9599
throw new IllegalStateException(s"Branch node ${node.getName} must define branch.rules.")
96100
}
97-
val selectedEdge = selectEdgeByRules(outgoingEdges, BranchExpressionUtils.parseBranchRules(branchRuleText), context)
101+
val parsedRules = BranchExpressionUtils.parseBranchRules(branchRuleText)
102+
info(s"Branch node ${node.getName} parsed rules: ${parsedRules.map(rule => s"${rule.condition}=>${rule.targetName}").mkString(", ")}")
103+
val selectedEdge = selectEdgeByRules(outgoingEdges, parsedRules, context)
98104
if (selectedEdge.isEmpty) {
99105
throw new IllegalStateException(s"No branch rule matched for node ${node.getName}.")
100106
}
101107
val selectedTarget = selectedEdge.map(_.getTarget).orNull
108+
info(s"Branch node ${node.getName} selected target id: ${Option(selectedTarget).getOrElse("")}")
102109
getNodeRunnerListener match {
103110
case flowEntranceJob: FlowEntranceJob =>
104111
flowEntranceJob.recordBranchSelection(currentNodeId, selectedTarget)
@@ -125,9 +132,32 @@ class BranchNodeRunner(flow: Workflow) extends NodeRunner {
125132
}
126133
}
127134
}
128-
rules.find(rule => !BranchExpressionUtils.isDefaultRule(rule) && BranchExpressionUtils.evaluateCondition(rule.condition, context))
129-
.flatMap(rule => matchEdge(rule.targetName))
130-
.orElse(rules.find(BranchExpressionUtils.isDefaultRule).flatMap(rule => matchEdge(rule.targetName)))
135+
rules.find { rule =>
136+
if (BranchExpressionUtils.isDefaultRule(rule)) {
137+
false
138+
} else {
139+
val matched = BranchExpressionUtils.evaluateCondition(rule.condition, context)
140+
info(s"Branch node ${node.getName} rule evaluated: ${rule.condition} => ${rule.targetName}, matched=$matched")
141+
matched
142+
}
143+
}.flatMap { rule =>
144+
val target = matchEdge(rule.targetName)
145+
info(s"Branch node ${node.getName} matched rule target lookup: ${rule.targetName}, found=${target.isDefined}")
146+
target
147+
}.orElse {
148+
rules.find(BranchExpressionUtils.isDefaultRule).flatMap { rule =>
149+
val target = matchEdge(rule.targetName)
150+
info(s"Branch node ${node.getName} use default rule target lookup: ${rule.targetName}, found=${target.isDefined}")
151+
target
152+
}
153+
}
131154
}
132155

156+
private def describeEdges(edges: Seq[DSSEdge]): String = {
157+
val workflowNodesById = flow.getWorkflowNodes.map(node => node.getId -> node).toMap
158+
edges.map { edge =>
159+
val targetName = workflowNodesById.get(edge.getTarget).map(_.getName).getOrElse(edge.getTarget)
160+
s"${edge.getTarget}($targetName)"
161+
}.mkString(", ")
162+
}
133163
}

dss-orchestrator/orchestrators/dss-workflow/dss-flow-execution-server/src/main/scala/com/webank/wedatasphere/dss/flow/execution/entrance/utils/BranchExpressionUtils.scala

Lines changed: 45 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,11 @@ import java.util
2121
import com.webank.wedatasphere.dss.common.entity.node.DSSEdge
2222
import com.webank.wedatasphere.dss.flow.execution.entrance.conf.FlowExecutionEntranceConfiguration
2323
import com.webank.wedatasphere.dss.workflow.core.entity.WorkflowNode
24+
import org.apache.linkis.common.utils.Logging
2425

2526
import scala.collection.JavaConversions._
2627

27-
object BranchExpressionUtils {
28+
object BranchExpressionUtils extends Logging {
2829

2930
val BranchNodeType = "workflow.branch"
3031
val BranchRuleKey = "branch.rules"
@@ -102,10 +103,30 @@ object BranchExpressionUtils {
102103
operators.collectFirst {
103104
case operator if expr.contains(operator) =>
104105
val parts = expr.split(java.util.regex.Pattern.quote(operator), 2).map(_.trim)
105-
if (parts.length != 2) false else compare(resolveValue(parts(0), context), resolveValue(parts(1), context), operator)
106+
if (parts.length != 2) {
107+
warn(s"Invalid branch condition syntax: $condition")
108+
false
109+
} else {
110+
(resolveValue(parts(0), context), resolveValue(parts(1), context)) match {
111+
case (Some(left), Some(right)) =>
112+
val matched = compare(left, right, operator)
113+
info(s"Branch condition evaluated: expr=$expr, left=$left, operator=$operator, right=$right, matched=$matched")
114+
matched
115+
case _ =>
116+
warn(s"Branch condition unresolved token: expr=$expr, leftToken=${parts(0)}, rightToken=${parts(1)}, contextKeys=${context.keys.toSeq.sorted.mkString(",")}")
117+
false
118+
}
119+
}
106120
}.getOrElse {
107-
val resolved = resolveValue(expr, context)
108-
resolved.equalsIgnoreCase("true") || resolved.nonEmpty
121+
resolveValue(expr, context) match {
122+
case Some(resolved) =>
123+
val matched = resolved.equalsIgnoreCase("true") || resolved.nonEmpty
124+
info(s"Branch condition evaluated: expr=$expr, value=$resolved, matched=$matched")
125+
matched
126+
case None =>
127+
warn(s"Branch condition unresolved token: expr=$expr, contextKeys=${context.keys.toSeq.sorted.mkString(",")}")
128+
false
129+
}
109130
}
110131
}
111132
}
@@ -116,13 +137,29 @@ object BranchExpressionUtils {
116137
} else expression
117138
}
118139

119-
private def resolveValue(token: String, context: Map[String, String]): String = {
120-
val normalized = token.trim
121-
val unquoted = normalized.stripPrefix("\"").stripSuffix("\"").stripPrefix("'").stripSuffix("'")
122-
context.getOrElse(normalized, context.getOrElse(unquoted, unquoted))
140+
private def resolveValue(token: String, context: Map[String, String]): Option[String] = {
141+
val normalized = Option(token).map(_.trim).getOrElse("")
142+
if (normalized.isEmpty) {
143+
None
144+
} else {
145+
val unquoted = normalized.stripPrefix("\"").stripSuffix("\"").stripPrefix("'").stripSuffix("'")
146+
if (isQuotedToken(normalized)) {
147+
Some(unquoted)
148+
} else {
149+
context.get(normalized)
150+
.orElse(context.get(unquoted))
151+
.orElse(if (isLiteralToken(unquoted)) Some(unquoted) else None)
152+
}
153+
}
123154
}
124155

156+
private def isQuotedToken(token: String): Boolean = {
157+
(token.startsWith("\"") && token.endsWith("\"")) || (token.startsWith("'") && token.endsWith("'"))
158+
}
125159

160+
private def isLiteralToken(token: String): Boolean = {
161+
token.equalsIgnoreCase("true") || token.equalsIgnoreCase("false") || toBigDecimal(token).nonEmpty
162+
}
126163

127164
private def getStringValue(value: Any): Option[String] = Option(value).map(_.toString.trim).filter(_.nonEmpty)
128165

0 commit comments

Comments
 (0)