Skip to content

Commit d16ccba

Browse files
committed
fix: 修复节点执行跳过问题
1 parent 55b0c6d commit d16ccba

4 files changed

Lines changed: 62 additions & 88 deletions

File tree

dss-orchestrator/orchestrators/dss-workflow/dss-flow-execution-server/src/main/scala/com/webank/wedatasphere/dss/flow/execution/entrance/FlowContext.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,10 @@ trait FlowContext {
4040
def getFlowStatus: SchedulerEventState
4141

4242
def isNodeCompleted(nodeName: String): Boolean
43+
44+
def isNodeSkipped(nodeName: String): Boolean
45+
46+
def isNodeSucceed(nodeName: String): Boolean
4347
}
4448

4549
object FlowContext {
@@ -81,4 +85,4 @@ object FlowContext {
8185
}
8286
nodes
8387
}
84-
}
88+
}

dss-orchestrator/orchestrators/dss-workflow/dss-flow-execution-server/src/main/scala/com/webank/wedatasphere/dss/flow/execution/entrance/FlowContextImpl.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,11 @@ class FlowContextImpl extends FlowContext with Logging {
5555
override def getFlowStatus: SchedulerEventState = this.flowStatus
5656

5757
override def isNodeCompleted(nodeName: String): Boolean = {
58-
getSkippedNodes.containsKey(nodeName) || getSucceedNodes.containsKey(nodeName) || getFailedNodes.containsKey(nodeName)
58+
isNodeSkipped(nodeName) || isNodeSucceed(nodeName) || getFailedNodes.containsKey(nodeName)
5959
}
6060

61-
}
61+
override def isNodeSkipped(nodeName: String): Boolean = getSkippedNodes.containsKey(nodeName)
62+
63+
override def isNodeSucceed(nodeName: String): Boolean = getSucceedNodes.containsKey(nodeName)
64+
65+
}

dss-orchestrator/orchestrators/dss-workflow/dss-flow-execution-server/src/main/scala/com/webank/wedatasphere/dss/flow/execution/entrance/resolver/FlowDependencyResolverImpl.scala

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,10 @@ class FlowDependencyResolverImpl extends FlowDependencyResolver with Logging {
4848
true
4949
}
5050

51-
def hasSkippedParent(node: WorkflowNode): Boolean = {
52-
node.getDependencys != null && node.getDependencys.exists(flowContext.getSkippedNodes.containsKey)
51+
def areAllParentsSkipped(node: WorkflowNode): Boolean = {
52+
node.getDependencys != null &&
53+
!node.getDependencys.isEmpty &&
54+
node.getDependencys.forall(flowContext.isNodeSkipped)
5355
}
5456

5557
def shouldSkipByBranch(node: WorkflowNode): Boolean = {
@@ -64,7 +66,7 @@ class FlowDependencyResolverImpl extends FlowDependencyResolver with Logging {
6466
}
6567

6668
def shouldSkip(node: WorkflowNode): Boolean = {
67-
shouldSkipByBranch(node) || hasSkippedParent(node)
69+
shouldSkipByBranch(node) || areAllParentsSkipped(node)
6870
}
6971

7072
def isBranchRouteMatched(node: WorkflowNode): Boolean = {
@@ -102,3 +104,4 @@ class FlowDependencyResolverImpl extends FlowDependencyResolver with Logging {
102104
info(s"${flowJob.getId} Finished to get executable node(${flowContext.getScheduledNodes.size()})")
103105
}
104106
}
107+

dss-orchestrator/orchestrators/dss-workflow/dss-linkis-node-execution/src/main/java/com/webank/wedatasphere/dss/linkis/node/execution/execution/impl/LinkisNodeExecutionImpl.java

Lines changed: 45 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -349,103 +349,53 @@ public String getResult(Job job, int index, int maxSize) {
349349
@Override
350350
public Map<String, String> getResultVariables(Job job, int maxSize) {
351351
Map<String, String> variables = new LinkedHashMap<>();
352-
Object fileContent = getResultFileContent(job, 0, maxSize);
353-
if (fileContent == null) {
354-
job.getLogObj().warn("Branch variable extraction skipped because result file content is null.");
355-
LOGGER.warn("Branch variable extraction skipped because result file content is null.");
356-
return variables;
357-
}
358-
job.getLogObj().info("Branch variable extraction file content type: " + fileContent.getClass().getName());
359-
LOGGER.info("Branch variable extraction file content type: {}", fileContent.getClass().getName());
360-
if (!(fileContent instanceof ArrayList)) {
361-
job.getLogObj().warn("Branch variable extraction skipped because result file content is not ArrayList: " + fileContent);
362-
LOGGER.warn("Branch variable extraction skipped because result file content is not ArrayList: {}", fileContent);
363-
return variables;
364-
}
365-
ArrayList rows = (ArrayList) fileContent;
366-
if (rows.isEmpty()) {
367-
job.getLogObj().warn("Branch variable extraction skipped because result rows are empty.");
368-
LOGGER.warn("Branch variable extraction skipped because result rows are empty.");
369-
return variables;
370-
}
371-
LOGGER.info("Branch variable extraction rows size: {}", rows.size());
372-
LOGGER.info("Branch variable extraction rows preview: {}", previewRows(rows));
373-
Object firstRow = rows.get(0);
374-
if (firstRow instanceof Map) {
375-
extractVariablesFromMapRows(rows, variables);
376-
} else if (firstRow instanceof ArrayList) {
377-
extractVariablesFromArrayRows(rows, variables);
378-
} else {
379-
job.getLogObj().warn("Branch variable extraction skipped because first row type is unsupported: " + firstRow.getClass().getName());
380-
LOGGER.warn("Branch variable extraction skipped because first row type is unsupported: {}", firstRow.getClass().getName());
352+
int resultSize = 0;
353+
try{
354+
resultSize = getLinkisNodeExecution().getResultSize(job);
355+
}catch(final Throwable t){
356+
LOGGER.error("failed to get result size");
357+
resultSize = -1;
381358
}
382-
job.getLogObj().info("Branch variable extraction result: " + variables);
383-
LOGGER.info("Branch variable extraction result: {}", variables);
384-
return variables;
385-
}
386-
387-
private void extractVariablesFromArrayRows(ArrayList rows, Map<String, String> variables) {
388-
if (rows.size() >= 2 && rows.get(0) instanceof ArrayList && rows.get(1) instanceof ArrayList) {
389-
ArrayList headers = (ArrayList) rows.get(0);
390-
ArrayList values = (ArrayList) rows.get(1);
391-
int size = Math.min(headers.size(), values.size());
392-
for (int i = 0; i < size; i++) {
393-
String key = normalizeCellValue(headers.get(i));
394-
String value = normalizeCellValue(values.get(i));
395-
if (StringUtils.isNotBlank(key) && value != null) {
396-
variables.put(key.trim(), value);
397-
}
359+
for (int i = 0; i < resultSize; i++) {
360+
Object fileContent = getResultFileContent(job, i, maxSize);
361+
if (fileContent == null) {
362+
LOGGER.warn("Branch variable extraction skipped because result file content is null.");
363+
return variables;
398364
}
399-
if (!variables.isEmpty()) {
400-
return;
365+
LOGGER.info("Branch variable extraction file content type: {}", fileContent.getClass().getName());
366+
if (!(fileContent instanceof ArrayList)) {
367+
LOGGER.warn("Branch variable extraction skipped because result file content is not ArrayList: {}", fileContent);
368+
return variables;
401369
}
402-
}
403-
if (rows.size() == 1 && rows.get(0) instanceof ArrayList) {
404-
ArrayList row = (ArrayList) rows.get(0);
405-
if (row.size() == 1) {
406-
LOGGER.warn("Branch variable extraction saw a single-row single-column result: {}. Column name may not be present in fileContent.", row);
407-
}
408-
}
409-
410-
for (Object rowObj : rows) {
411-
if (!(rowObj instanceof ArrayList)) {
412-
continue;
370+
ArrayList rows = (ArrayList) fileContent;
371+
if (rows.isEmpty()) {
372+
LOGGER.warn("Branch variable extraction skipped because result rows are empty.");
373+
return variables;
413374
}
414-
ArrayList row = (ArrayList) rowObj;
415-
if (row.size() >= 2) {
416-
String key = normalizeCellValue(row.get(0));
417-
String value = normalizeCellValue(row.get(1));
418-
if (StringUtils.isNotBlank(key) && value != null) {
419-
variables.put(key.trim(), value);
375+
LOGGER.info("Branch variable extraction rows size: {}", rows.size());
376+
LOGGER.info("Branch variable extraction rows preview: {}", previewRows(rows));
377+
if (rows.size() == 1) {
378+
ArrayList oneRow = (ArrayList) rows.get(0);
379+
if (oneRow.size() == 1) {
380+
Object metadata = getResultMetadata(job, i, maxSize);
381+
ArrayList metadataList = (ArrayList) metadata;
382+
Map metadataMap = (Map) metadataList.get(0);
383+
String columnName = metadataMap.get("columnName").toString();
384+
variables.put(columnName, oneRow.get(0).toString());
420385
}
421386
}
422387
}
423-
}
424388

425-
private void extractVariablesFromMapRows(ArrayList rows, Map<String, String> variables) {
426-
Object rowObj = rows.get(0);
427-
if (!(rowObj instanceof Map)) {
428-
return;
429-
}
430-
Map row = (Map) rowObj;
431-
for (Object entryObj : row.entrySet()) {
432-
Map.Entry entry = (Map.Entry) entryObj;
433-
String key = normalizeCellValue(entry.getKey());
434-
String value = normalizeCellValue(entry.getValue());
435-
if (StringUtils.isNotBlank(key) && value != null) {
436-
variables.put(key.trim(), value);
437-
}
438-
}
389+
LOGGER.info("Branch variable extraction result: {}", variables);
390+
return variables;
439391
}
440392

441393
private String previewRows(ArrayList rows) {
442394
int previewSize = Math.min(rows.size(), 3);
443395
return rows.subList(0, previewSize).toString();
444396
}
445397

446-
private String normalizeCellValue(Object value) {
447-
return value == null ? null : value.toString();
448-
}
398+
449399

450400
private Object getResultFileContent(Job job, int index, int maxSize) {
451401
JobInfoResult jobInfo = getClient(job).getJobInfo(job.getJobExecuteResult());
@@ -458,6 +408,19 @@ private Object getResultFileContent(Job job, int index, int maxSize) {
458408
}
459409
return null;
460410
}
411+
412+
private Object getResultMetadata(Job job, int index, int maxSize) {
413+
JobInfoResult jobInfo = getClient(job).getJobInfo(job.getJobExecuteResult());
414+
String[] resultSetList = jobInfo.getResultSetList(getClient(job));
415+
if (resultSetList != null && resultSetList.length > index) {
416+
return getClient(job).resultSet(ResultSetAction.builder()
417+
.setPath(resultSetList[index])
418+
.setUser(job.getJobExecuteResult().getUser())
419+
.setPageSize(maxSize).build()).getMetadata();
420+
}
421+
return null;
422+
}
423+
461424
@Override
462425
public void onStatusChanged(String fromState, String toState, Job job) {
463426
}

0 commit comments

Comments
 (0)