Skip to content

Commit b935c8e

Browse files
zqburdeSunPengWan
authored andcommitted
fix: 适配调度系统#AI Commit#
1 parent 1781565 commit b935c8e

7 files changed

Lines changed: 334 additions & 559 deletions

File tree

Lines changed: 106 additions & 174 deletions
Original file line numberDiff line numberDiff line change
@@ -1,188 +1,120 @@
11
# 工作流分支节点设计文档
22

33
## 1. 文档信息
4-
- 版本:1.20.0
4+
- 版本:1.21.0
55
- 状态:已更新
6-
- 实现方案:节点属性 `branch.rules` 驱动
6+
- 设计范围:DSS 发布层分支适配 + 语义对齐要求
77

88
## 2. 设计结论
9-
当前实现采用:
10-
- 分支节点类型:`workflow.branch`
11-
- 规则配置位置:分支节点属性 `branch.rules`
12-
- 上游变量映射位置:上游节点属性 `branch.output.mapping`
13-
- 运行时控制:记录选中的目标节点,仅放行该下游,其余直接下游跳过
14-
15-
## 3. 核心架构
16-
### 3.1 元数据层
17-
通过节点元数据表下发分支节点和相关属性,不新增分支专用业务表。
18-
19-
涉及元数据:
20-
- `workflow.branch` 节点定义
21-
- `branch.rules` 节点 UI 属性
22-
- `branch.output.mapping` 节点 UI 属性
23-
24-
实现位置:
25-
- `db/dss_dml.sql`
26-
27-
### 3.2 运行时链路
28-
1. 用户通过工作流执行接口提交流程。
29-
2. `FlowJobNodeParser` 为每个节点创建 runner,并把共享 `flow_var_map` 注入节点参数。
30-
3. 上游普通节点成功后,`FlowEntranceJob` 从结果集提取变量并回写到共享 `flow_var_map`
31-
4. 分支节点执行时,`BranchNodeRunner` 读取 `branch.rules`,计算命中的目标下游节点。
32-
5. `FlowEntranceJob.recordBranchSelection` 记录分支节点最终选中的 target。
33-
6. `FlowDependencyResolverImpl` 解析 pending 节点时:
34-
- 命中的直接下游转为 `scheduled`
35-
- 未命中的兄弟直接下游转为 `skipped`
36-
37-
## 4. 关键实现类
38-
### 4.1 执行入口
39-
文件:
40-
- `dss-flow-execution-server/.../restful/FlowEntranceRestfulApi.java`
41-
42-
职责:
43-
- 接收执行请求
44-
- 提交流程到 `entranceServer`
45-
46-
说明:
47-
- 不直接决定分支下游是否执行
48-
- 仅是工作流执行入口
49-
50-
### 4.2 节点解析
51-
文件:
52-
- `dss-flow-execution-server/.../job/parser/FlowJobNodeParser.scala`
53-
54-
职责:
55-
- 初始化共享 `flow_var_map`
56-
-`workflow.branch` 绑定 `BranchNodeRunner`
57-
- 为普通节点绑定 `DefaultNodeRunner`
58-
59-
### 4.3 变量收集
60-
文件:
61-
- `dss-flow-execution-server/.../job/FlowEntranceJob.scala`
62-
63-
职责:
64-
- 在上游节点成功后采集输出变量
65-
- 支持读取上游结果集变量
66-
- 支持 `branch.output.mapping` 映射别名
67-
68-
### 4.4 规则解析
69-
文件:
70-
- `dss-flow-execution-server/.../utils/BranchExpressionUtils.scala`
71-
72-
职责:
73-
- 判断节点是否为分支节点
74-
- 读取 `branch.rules`
75-
- 将规则文本解析为 `condition -> targetName`
76-
- 计算条件表达式
77-
78-
当前规则:
79-
- 多行规则
80-
- 按顺序匹配
81-
- 支持默认规则关键字:`default``else``*`
82-
83-
### 4.5 分支节点执行
84-
文件:
85-
- `dss-flow-execution-server/.../node/BranchNodeRunner.scala`
86-
87-
职责:
88-
- 读取共享变量上下文
89-
- 解析并执行 `branch.rules`
90-
- 将命中的 target 记录到 `FlowEntranceJob`
91-
- 自身不提交 Linkis 作业
92-
- 成功后触发后续调度
93-
94-
### 4.6 下游节点放行与跳过
95-
文件:
96-
- `dss-flow-execution-server/.../resolver/FlowDependencyResolverImpl.scala`
97-
98-
职责:
99-
- 判断某个 pending 节点是否是分支命中的 target
100-
- 命中则 `scheduled`
101-
- 未命中则 `skipped`
102-
103-
这是“控制下游节点要不要执行”的真正实现位置。
104-
105-
### 4.7 保存校验
106-
文件:
107-
- `dss-workflow-server/.../service/impl/DSSFlowServiceImpl.java`
108-
109-
职责:
110-
- 保存工作流时校验分支节点配置
111-
112-
校验规则:
113-
- 至少两条直接出边
114-
- `branch.rules` 非空
115-
- 规则中的目标节点名必须是直接下游节点名
116-
117-
## 5. 规则语法
118-
### 5.1 基本格式
119-
```text
120-
条件=下游节点名
9+
本版本采用“DSS 规则配置 + 发布为 decision + 语义对齐 DSS FlowExecution”的方案:
10+
- DSS 节点类型保持 `workflow.branch`
11+
- 规则配置保持 `branch.rules`
12+
- 发布到 Schedulis 时分支节点 `.job` 转换为 `type=decision`
13+
- Schedulis 运行期必须按 DSS FlowExecution 语义判定分支下游。
14+
15+
## 3. 总体架构
16+
17+
### 3.1 关键模块
18+
- `BranchWorkflowRewriteConverter`
19+
- 提取分支规则并补充 route 元数据。
20+
- `BranchRouteJobTuning`
21+
- 将分支节点类型调优为 `decision`
22+
- `LinkisJobConverter`
23+
- 生成 decision 节点属性:`condition.N/on.success.N/on.failure.N`
24+
- `AzkabanDssJobType`
25+
- 仅保留动态变量提取与 props 输出,不承担 guard/route 分支裁剪。
26+
27+
### 3.2 架构示意图
28+
```mermaid
29+
flowchart LR
30+
A[workflow.branch + branch.rules] --> B[BranchWorkflowRewriteConverter]
31+
B --> C[LinkisJobConverter]
32+
C --> D[BranchRouteJobTuning]
33+
D --> E[type=decision .job]
34+
E --> F[Schedulis]
35+
F --> G[按 DSS FlowExecution 语义调度]
36+
```
37+
38+
## 4. 核心设计
39+
40+
### 4.1 决策节点发布设计
41+
分支节点发布规则:
42+
1. 读取 `wds.branch.route.rule.text`
43+
2.`;` 切分规则。
44+
3. 每条规则按最后一个 `=` 切分条件与目标。
45+
4. 输出:
46+
- `condition.N`
47+
- `on.success.N`
48+
- `on.failure.N=`
49+
5. `default=xxx` 转换为 `condition.N=true`
50+
51+
### 4.2 字段过滤设计
52+
`type=decision` 时,不输出 linkis 专属字段:
53+
- `linkis.version`
54+
- `linkistype`
55+
- `command`
56+
57+
### 4.3 分支语义对齐设计(重点)
58+
对齐依据:DSS `FlowDependencyResolverImpl`
59+
60+
关键语义:
61+
1. 父节点未全部完成 -> 等待。
62+
2. 存在分支父节点未命中当前节点 -> `skip`
63+
3. 所有分支父节点命中且普通父节点完成无失败 -> 可执行。
64+
4. 普通父节点失败 -> 失败链路。
65+
66+
特别说明:
67+
- 混合上游场景(分支未命中 + 普通父成功)必须 `skip`
68+
- 该语义优先于“任一父成功即可执行”。
69+
70+
### 4.4 状态判定流程图
71+
```mermaid
72+
flowchart TD
73+
A[候选节点X] --> B{父节点是否全部完成?}
74+
B -- 否 --> W[等待]
75+
B -- 是 --> C{存在普通父失败?}
76+
C -- 是 --> F[失败链路]
77+
C -- 否 --> D{存在分支父未命中X?}
78+
D -- 是 --> S[Skip]
79+
D -- 否 --> E[Scheduled/Execute]
12180
```
12281

82+
## 5. 关键场景补充
83+
84+
### 5.1 编号10场景(混合父节点)
85+
- 条件:节点 X 有普通父 A(成功)和分支父 B(未命中 X)。
86+
- 结果:X 必须 `skip`
87+
- 原因:分支未命中判定优先,严格对齐 DSS FlowExecution。
88+
12389
### 5.2 示例
90+
输入:
12491
```text
125-
amount>100=节点A
126-
amount<10=节点B
127-
default=节点C
92+
branch.rules = b==200=sql_1345;b<50=sql_9000
12893
```
12994

130-
### 5.3 语义
131-
- 条件成立时,选择右侧节点名对应的直接下游节点
132-
- 默认规则在前面所有规则都未命中时生效
133-
- 若无规则命中且无默认规则,则抛异常
134-
135-
## 6. 变量模型
136-
### 6.1 共享变量
137-
共享变量保存在 `flow_var_map`
138-
139-
来源包括:
140-
- 工作流全局属性
141-
- 工作流启动参数
142-
- 上游节点结果集变量
143-
- `branch.output.mapping` 映射后的别名变量
144-
145-
### 6.2 上游结果集约束
146-
当前版本主要支持:
147-
- 单结果集
148-
- 简单列名到值的映射
149-
150-
建议上游输出示例:
151-
```sql
152-
select sum(price) as amount from order_info
95+
输出:
96+
```properties
97+
type=decision
98+
condition.1=b==200
99+
on.success.1=sql_1345
100+
on.failure.1=
101+
condition.2=b<50
102+
on.success.2=sql_9000
103+
on.failure.2=
153104
```
154105

155-
## 7. 前端设计
156-
当前版本前端以节点属性表单为主,不再以边配置为主。
157-
158-
用户操作方式:
159-
1. 拖入分支节点
160-
2. 连出多个下游节点
161-
3. 在节点属性中填写 `branch.rules`
162-
4. 保存并执行
163-
164-
说明:
165-
- 分支规则配置入口应放在分支节点属性面板中
166-
- 不要求边级条件配置弹窗作为正式入口
167-
168-
## 8. 非目标设计
169-
以下方案已废弃:
170-
- `dss_workflow_branch`
171-
- `BranchNodeService`
172-
- `BranchNodeMapper`
173-
- `BranchNodeConfig` 独立实体
174-
- `POST/PUT/GET /workflow/node/branch/...` 配置接口
175-
- `SpEL` 复杂逻辑引擎依赖
176-
- `BranchNodeSkipStrategy` 独立跳过策略实现
177-
178-
## 9. 兼容性
179-
- 不含分支节点的现有工作流执行逻辑不变
180-
- 分支节点仅影响自身直接下游
181-
- 旧版边条件配置不作为正式能力保留
182-
183-
## 10. 验证要点
184-
- 分支节点必须能读取上游变量
185-
- 规则文本必须正确映射到直接下游节点名
186-
- 命中的 target 会执行
187-
- 同级未命中的 target 会跳过
188-
- 无命中且无默认规则时报错
106+
## 6. 实现约束
107+
1. 不再依赖 DSS `guard` 运行时裁剪语义来决定下游执行。
108+
2. 不允许 Schedulis 语义偏离 DSS 矩阵规则。
109+
3. jobtype 仅负责动态变量进入 props,供 decision 条件使用。
110+
111+
## 7. 验证要点
112+
1. 分支节点发布后是 `type=decision`
113+
2. decision 属性展开正确。
114+
3. 混合上游编号10场景验证为 `skip`
115+
4. 与 DSS FlowExecution 语义逐条对齐通过。
116+
117+
## 8. 非目标
118+
- 不引入新的分支 DSL。
119+
- 不新增分支数据库模型。
120+
- 不改动分支节点前端配置入口。

0 commit comments

Comments
 (0)