Skip to content

Commit b8e37bc

Browse files
committed
#AI Commit#add log
1 parent 4dc2bea commit b8e37bc

2 files changed

Lines changed: 60 additions & 13 deletions

File tree

  • dss-orchestrator/orchestrators/dss-workflow
    • dss-flow-execution-server/src/main/scala/com/webank/wedatasphere/dss/flow/execution/entrance/node
    • dss-linkis-node-execution/src/main/java/com/webank/wedatasphere/dss/linkis/node/execution/execution/impl

dss-orchestrator/orchestrators/dss-workflow/dss-flow-execution-server/src/main/scala/com/webank/wedatasphere/dss/flow/execution/entrance/node/BranchNodeRunner.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ class BranchNodeRunner(flow: Workflow) extends NodeRunner with Logging {
9393
val context = BranchExpressionUtils.buildEvaluationContext(node)
9494
val branchRuleText = BranchExpressionUtils.getBranchRuleText(node)
9595
logInfo(s"Branch node ${node.getName} start evaluating. context=${context.toSeq.sortBy(_._1).map { case (k, v) => s"$k=$v" }.mkString(", ")}")
96-
logInfo(s"Branch node ${node.getName} rules: ${Option(branchRuleText).getOrElse("")}")
96+
logInfo(s"Branch node ${node.getName} rules are: ${Option(branchRuleText).getOrElse("")}")
9797
logInfo(s"Branch node ${node.getName} outgoing targets: ${describeEdges(outgoingEdges)}")
9898
if (!Option(branchRuleText).exists(_.trim.nonEmpty)) {
9999
throw new IllegalStateException(s"Branch node ${node.getName} must define branch.rules.")

dss-orchestrator/orchestrators/dss-workflow/dss-linkis-node-execution/src/main/java/com/webank/wedatasphere/dss/linkis/node/execution/execution/impl/LinkisNodeExecutionImpl.java

Lines changed: 59 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -346,34 +346,81 @@ public String getResult(Job job, int index, int maxSize) {
346346
public Map<String, String> getResultVariables(Job job, int maxSize) {
347347
Map<String, String> variables = new LinkedHashMap<>();
348348
Object fileContent = getResultFileContent(job, 0, maxSize);
349+
if (fileContent == null) {
350+
job.getLogObj().warn("Branch variable extraction skipped because result file content is null.");
351+
return variables;
352+
}
353+
job.getLogObj().info("Branch variable extraction file content type: " + fileContent.getClass().getName());
349354
if (!(fileContent instanceof ArrayList)) {
355+
job.getLogObj().warn("Branch variable extraction skipped because result file content is not ArrayList: " + fileContent);
350356
return variables;
351357
}
352-
ArrayList<ArrayList<String>> rows = (ArrayList<ArrayList<String>>) fileContent;
353-
if (rows == null || rows.isEmpty()) {
358+
ArrayList rows = (ArrayList) fileContent;
359+
if (rows.isEmpty()) {
360+
job.getLogObj().warn("Branch variable extraction skipped because result rows are empty.");
354361
return variables;
355362
}
356-
if (rows.size() >= 2 && rows.get(0) != null && rows.get(1) != null) {
357-
ArrayList<String> headers = rows.get(0);
358-
ArrayList<String> values = rows.get(1);
363+
Object firstRow = rows.get(0);
364+
if (firstRow instanceof Map) {
365+
extractVariablesFromMapRows(rows, variables);
366+
} else if (firstRow instanceof ArrayList) {
367+
extractVariablesFromArrayRows(rows, variables);
368+
} else {
369+
job.getLogObj().warn("Branch variable extraction skipped because first row type is unsupported: " + firstRow.getClass().getName());
370+
}
371+
job.getLogObj().info("Branch variable extraction result: " + variables);
372+
return variables;
373+
}
374+
375+
private void extractVariablesFromArrayRows(ArrayList rows, Map<String, String> variables) {
376+
if (rows.size() >= 2 && rows.get(0) instanceof ArrayList && rows.get(1) instanceof ArrayList) {
377+
ArrayList headers = (ArrayList) rows.get(0);
378+
ArrayList values = (ArrayList) rows.get(1);
359379
int size = Math.min(headers.size(), values.size());
360380
for (int i = 0; i < size; i++) {
361-
String key = headers.get(i);
362-
String value = values.get(i);
381+
String key = normalizeCellValue(headers.get(i));
382+
String value = normalizeCellValue(values.get(i));
363383
if (StringUtils.isNotBlank(key) && value != null) {
364384
variables.put(key.trim(), value);
365385
}
366386
}
367387
if (!variables.isEmpty()) {
368-
return variables;
388+
return;
369389
}
370390
}
371-
for (ArrayList<String> row : rows) {
372-
if (row != null && row.size() >= 2 && StringUtils.isNotBlank(row.get(0)) && row.get(1) != null) {
373-
variables.put(row.get(0).trim(), row.get(1));
391+
for (Object rowObj : rows) {
392+
if (!(rowObj instanceof ArrayList)) {
393+
continue;
394+
}
395+
ArrayList row = (ArrayList) rowObj;
396+
if (row.size() >= 2) {
397+
String key = normalizeCellValue(row.get(0));
398+
String value = normalizeCellValue(row.get(1));
399+
if (StringUtils.isNotBlank(key) && value != null) {
400+
variables.put(key.trim(), value);
401+
}
374402
}
375403
}
376-
return variables;
404+
}
405+
406+
private void extractVariablesFromMapRows(ArrayList rows, Map<String, String> variables) {
407+
Object rowObj = rows.get(0);
408+
if (!(rowObj instanceof Map)) {
409+
return;
410+
}
411+
Map row = (Map) rowObj;
412+
for (Object entryObj : row.entrySet()) {
413+
Map.Entry entry = (Map.Entry) entryObj;
414+
String key = normalizeCellValue(entry.getKey());
415+
String value = normalizeCellValue(entry.getValue());
416+
if (StringUtils.isNotBlank(key) && value != null) {
417+
variables.put(key.trim(), value);
418+
}
419+
}
420+
}
421+
422+
private String normalizeCellValue(Object value) {
423+
return value == null ? null : value.toString();
377424
}
378425

379426
private Object getResultFileContent(Job job, int index, int maxSize) {

0 commit comments

Comments
 (0)