Skip to content

Commit 6d54aa9

Browse files
authored
Merge pull request #600 from swfnotswift/swf
✨ 新功能 实现普通节点的一拖多/多对一的分支并行处理
2 parents 214b09c + 5d6cdd3 commit 6d54aa9

35 files changed

Lines changed: 790 additions & 93 deletions

File tree

agent-flow/src/components/base/jadeNode.jsx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -469,7 +469,7 @@ export const jadeNode = (id, x, y, width, height, parent, drawer) => {
469469
* @returns {number} 连接数。
470470
*/
471471
self.maxNumToLink = () => {
472-
return 1;
472+
return self.graph?.connectionLimitDisabled ? 10 : 1;
473473
};
474474

475475
/**

agent-flow/src/components/base/validator.js

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,9 @@ export class NormalNodeConnectorValidator extends Validator {
3535
validate() {
3636
const nextEvents = this.node.getNextRunnableEvents();
3737
const i18n = this.node.graph.i18n;
38-
if (nextEvents.length !== 1) {
38+
const isConnectionLimitDisabled = Boolean(this.node.graph?.connectionLimitDisabled);
39+
const isValid = isConnectionLimitDisabled ? nextEvents.length >= 1 : nextEvents.length === 1;
40+
if (!isValid) {
3941
return Promise.reject({
4042
errorFields: [{
4143
errors: [`${i18n?.t('node') ?? 'node'} ${this.node.text} ${i18n?.t('problemWithConnection') ?? 'problemWithConnection'}`],

agent-flow/src/components/code/codeNodeState.jsx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ export const codeNodeState = (id, x, y, width, height, parent, drawer) => {
9393
* @override
9494
*/
9595
self.maxNumToLink = () => {
96-
return 10;
96+
return self.graph?.connectionLimitDisabled ? 100 : 10;
9797
};
9898

9999
return self;

agent-flow/src/flow/jadeFlowEntry.jsx

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -378,6 +378,10 @@ const jadeFlowAgent = (graph) => {
378378
graph.destroy();
379379
};
380380

381+
self.setConnectionLimitDisabled = (disabled) => {
382+
graph.connectionLimitDisabled = Boolean(disabled);
383+
};
384+
381385
return self;
382386
};
383387

@@ -432,6 +436,7 @@ export const JadeFlow = (() => {
432436
div,
433437
tenant,
434438
appId,
439+
connectionLimitDisabled = false,
435440
flowConfigData,
436441
configs,
437442
i18n,
@@ -440,7 +445,7 @@ export const JadeFlow = (() => {
440445
}) => {
441446
const graphDom = getGraphDom(div);
442447
const g = jadeFlowGraph(div, 'jadeFlow');
443-
await configGraph(g, tenant, appId, flowConfigData, configs, i18n, importStatements);
448+
await configGraph(g, tenant, appId, flowConfigData, configs, i18n, importStatements, connectionLimitDisabled);
444449
g.flowType = flowType;
445450
const pageData = g.getPageData(0);
446451
await g.editFlow(0, graphDom, pageData.id);
@@ -470,8 +475,9 @@ export const JadeFlow = (() => {
470475
return jadeFlowAgent(g);
471476
};
472477

473-
const configGraph = async (g, tenant, appId, flowConfigData, configs, i18n, importStatements) => {
478+
const configGraph = async (g, tenant, appId, flowConfigData, configs, i18n, importStatements, connectionLimitDisabled = false) => {
474479
g.collaboration.mute = true;
480+
g.connectionLimitDisabled = Boolean(connectionLimitDisabled);
475481
g.configs = configs;
476482
g.i18n = i18n;
477483
for (let i = 0; i < importStatements.length; i++) {

agent-flow/src/flow/jadeFlowGraph.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ export const jadeFlowGraph = (div, title) => {
7676
const self = defaultGraph(div, title);
7777
self.type = 'jadeFlowGraph';
7878
self.pageType = 'jadeFlowPage';
79+
self.connectionLimitDisabled = false;
7980
self.enableText = false;
8081
self.flowMeta = {
8182
exceptionFitables: ['modelengine.fit.jober.aipp.fitable.AippFlowExceptionHandler'],

agent-flow/src/flow/jadeFlowPage.js

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ export const jadeFlowPage = (div, graph, name, id) => {
4040
self.addEventListener('COPY_SHAPE', shapeChangeListener);
4141
self.addEventListener('DELETE_SHAPE', shapeChangeListener);
4242

43+
const isConnectionLimitDisabled = () => Boolean(self.graph?.connectionLimitDisabled);
44+
4345
/**
4446
* @override
4547
*/
@@ -305,7 +307,7 @@ export const jadeFlowPage = (div, graph, name, id) => {
305307
*/
306308
self.canDragOut = (node, connector) => {
307309
const lines = self.getEvents().filter(s => s.fromShape === node.id && s.getDefinedFromConnector() === connector);
308-
return lines && lines.length < 1;
310+
return lines.length < (isConnectionLimitDisabled() ? 10 : 1);
309311
};
310312

311313
/**
@@ -330,7 +332,9 @@ export const jadeFlowPage = (div, graph, name, id) => {
330332
}
331333
};
332334

333-
return jadeEvent.fromShape !== jadeEvent.toShape && isConnectorAllowToLink() && isConnectorWithinLimit();
335+
return jadeEvent.fromShape !== jadeEvent.toShape
336+
&& isConnectorAllowToLink()
337+
&& (isConnectionLimitDisabled() || isConnectorWithinLimit());
334338
};
335339

336340
/**

app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/context/FlowContext.java

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,29 @@ public <R> List<FlowContext<R>> generate(List<R> data, String position) {
265265
return data.stream().map(d -> this.generate(d, position, LocalDateTime.now())).collect(Collectors.toList());
266266
}
267267

268+
/**
269+
* fork一个新的context用于一拖多分支,继承当前context的运行元数据,但生成新的contextId。
270+
*
271+
* @return 新的分支context
272+
*/
273+
public FlowContext<T> fork() {
274+
return this.convertData(this.data);
275+
}
276+
277+
/**
278+
* convertData
279+
*
280+
* @param <R> 转换后的数据类型
281+
* @param data 转换后的数据
282+
* @return 转换后的context
283+
*/
284+
public <R> FlowContext<R> convertData(R data) {
285+
FlowContext<R> context = this.copyContextWithoutID(data);
286+
context.previous = this.previous;
287+
context.nextPositionId = this.nextPositionId;
288+
return context;
289+
}
290+
268291
/**
269292
* 用于when.convert数据时候的转换context,除了包裹的数据类型不一样,所有其他信息都一样
270293
*
@@ -274,12 +297,17 @@ public <R> List<FlowContext<R>> generate(List<R> data, String position) {
274297
* @return 转换后的context
275298
*/
276299
public <R> FlowContext<R> convertData(R data, String id) {
300+
FlowContext<R> context = this.copyContextWithoutID(data);
301+
context.previous = this.previous;
302+
context.id = id;
303+
return context;
304+
}
305+
306+
private <R> FlowContext<R> copyContextWithoutID(R data) {
277307
FlowContext<R> context = new FlowContext<>(this.streamId, this.rootId, data, this.traceId, this.position,
278308
this.parallel, this.parallelMode, LocalDateTime.now());
279-
context.previous = this.previous;
280309
context.status = this.status;
281310
context.trans = this.trans;
282-
context.id = id;
283311
context.batchId = this.batchId;
284312
context.toBatch = this.toBatch;
285313
context.createAt = this.createAt;

app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/context/repo/flowcontext/FlowContextRepo.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -92,16 +92,16 @@ default void update(List<FlowContext<T>> contexts) {
9292
}
9393

9494
/**
95-
* updateToSent
95+
* 更新context状态为SENT
9696
*
97-
* @param contexts contexts
97+
* @param contexts 上下文列表
9898
*/
9999
void updateToSent(List<FlowContext<T>> contexts);
100100

101101
/**
102-
* updateToReady
102+
* 更新context状态为READY
103103
*
104-
* @param contexts contexts
104+
* @param contexts 上下文列表
105105
*/
106106
void updateToReady(List<FlowContext<T>> contexts);
107107

app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/definitions/nodes/FlowConditionNode.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ public FitStream.Processor<FlowData, FlowData> getProcessor(String streamId, Flo
8080
protected void subscribe(
8181
FitStream.Publisher<FlowData> from, FitStream.Subscriber<FlowData, FlowData> to, FlowEvent event) {
8282
from.subscribe(event.getMetaId(), to, null, this.getWhether(from.getStreamId(), event));
83+
to.setAllMode();
8384
}
8485

8586
private Processors.Whether<FlowData> getWhether(String streamId, FlowEvent event) {

app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/definitions/nodes/FlowForkNode.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import modelengine.fit.waterflow.flowsengine.domain.flows.context.repo.flowcontext.FlowContextRepo;
1515
import modelengine.fit.waterflow.flowsengine.domain.flows.context.repo.flowlock.FlowLocks;
1616
import modelengine.fit.waterflow.flowsengine.domain.flows.streams.FitStream;
17+
import modelengine.fit.waterflow.flowsengine.domain.flows.streams.FlowDataMerger;
1718
import modelengine.fit.waterflow.flowsengine.domain.flows.streams.nodes.Node;
1819
import modelengine.fitframework.log.Logger;
1920

@@ -46,7 +47,8 @@ public class FlowForkNode extends FlowNode {
4647
public FitStream.Processor<FlowData, FlowData> getProcessor(String streamId, FlowContextRepo<FlowData> repo,
4748
FlowContextMessenger messenger, FlowLocks locks) {
4849
if (!Optional.ofNullable(processor).isPresent()) {
49-
this.processor = new Node<>(streamId, this.metaId, this::forkJuster, repo, messenger, locks, this.type);
50+
this.processor = new Node<>(streamId, this.metaId, this::forkJuster, repo, messenger, locks, this.type,
51+
new FlowDataMerger());
5052
this.processor.onError(errorHandler(streamId));
5153
}
5254
return this.processor;

0 commit comments

Comments
 (0)