Skip to content

Commit 2494022

Browse files
committed
fix: 适配调度系统#AI Commit#
1 parent f12e3c1 commit 2494022

2 files changed

Lines changed: 29 additions & 1 deletion

File tree

plugins/azkaban/linkis-jobtype/src/main/java/com/webank/wedatasphere/dss/plugins/azkaban/linkis/jobtype/AzkabanDssJobType.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ public class AzkabanDssJobType extends AbstractJob {
6161

6262
private Job job;
6363

64+
private volatile Props generatedProperties = new Props();
65+
6466
private boolean isCanceled = false;
6567

6668
public AzkabanDssJobType(String jobId, Props sysProps, Props jobProps, Logger log) {
@@ -151,6 +153,11 @@ public boolean isCanceled() {
151153
return isCanceled;
152154
}
153155

156+
@Override
157+
public Props getJobGeneratedProperties() {
158+
return this.generatedProperties == null ? new Props() : this.generatedProperties;
159+
}
160+
154161
@Override
155162
public double getProgress() throws Exception {
156163
return LinkisNodeExecutionImpl.getLinkisNodeExecution().getProgress(this.job);
@@ -165,6 +172,9 @@ private void collectBranchVariables() {
165172
Map<String, String> resultVariables = LinkisNodeExecutionImpl.getLinkisNodeExecution().getResultVariables(this.job, 128);
166173
Map<String, String> resolvedVariables = resolveBranchOutputVariables(resultVariables);
167174
if (!resolvedVariables.isEmpty()) {
175+
Props props = new Props();
176+
props.putAll(resolvedVariables);
177+
this.generatedProperties = props;
168178
BranchRuntimeStore.mergeFlowVariables(flowExecId, resolvedVariables);
169179
info("Collected branch flow variables: " + resolvedVariables);
170180
}
@@ -297,3 +307,4 @@ private String getRunTodayh(boolean stdFormat) {
297307
return null;
298308
}
299309
}
310+

plugins/azkaban/linkis-jobtype/src/main/java/com/webank/wedatasphere/dss/plugins/azkaban/linkis/jobtype/job/BranchRouteExecutor.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,18 +57,34 @@ public void execute() throws Exception {
5757

5858
private Map<String, String> buildEvaluationContext(String branchNodeId, String branchNodeName) {
5959
Map<String, String> context = new LinkedHashMap<>();
60+
for (Map.Entry<String, String> entry : jobProps.entrySet()) {
61+
if (!isBlank(entry.getKey()) && entry.getValue() != null) {
62+
context.put(entry.getKey(), entry.getValue());
63+
}
64+
}
6065
for (Map.Entry<String, String> entry : jobProps.entrySet()) {
6166
if (entry.getKey().startsWith(LinkisJobTypeConf.FLOW_VARIABLE_PREFIX) && entry.getValue() != null) {
6267
context.put(entry.getKey().substring(LinkisJobTypeConf.FLOW_VARIABLE_PREFIX.length()), entry.getValue());
6368
}
6469
}
65-
context.putAll(BranchRuntimeStore.snapshotFlowVariables(jobProps.get(LinkisJobTypeConf.FLOW_EXEC_ID)));
70+
mergeMissingVariables(context, BranchRuntimeStore.snapshotFlowVariables(jobProps.get(LinkisJobTypeConf.FLOW_EXEC_ID)));
6671
context.put("node.id", branchNodeId);
6772
context.put("node.name", branchNodeName == null ? "" : branchNodeName);
6873
context.put("node.type", LinkisJobTypeConf.BRANCH_ROUTE_LINKIS_TYPE);
6974
return context;
7075
}
7176

77+
private void mergeMissingVariables(Map<String, String> context, Map<String, String> fallbackVariables) {
78+
if (fallbackVariables == null || fallbackVariables.isEmpty()) {
79+
return;
80+
}
81+
for (Map.Entry<String, String> entry : fallbackVariables.entrySet()) {
82+
if (!isBlank(entry.getKey()) && entry.getValue() != null && !context.containsKey(entry.getKey())) {
83+
context.put(entry.getKey(), entry.getValue());
84+
}
85+
}
86+
}
87+
7288
private String selectTarget(String branchRuleText, Map<String, String> targetNameToId, Map<String, String> context) {
7389
List<BranchExpressionUtils.BranchRule> rules = BranchExpressionUtils.parseBranchRules(branchRuleText);
7490
for (BranchExpressionUtils.BranchRule rule : rules) {
@@ -114,3 +130,4 @@ private boolean isBlank(String value) {
114130
return value == null || value.trim().isEmpty();
115131
}
116132
}
133+

0 commit comments

Comments
 (0)