Skip to content

Commit 9d7fc0f

Browse files
author
qq_62395577
committed
修改后端代码,实现条件流程校验阶段对多节点的支持
1 parent 3dd9244 commit 9d7fc0f

6 files changed

Lines changed: 254 additions & 31 deletions

File tree

app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/context/FlowContext.java

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ public final class FlowContext<T> extends IdGenerator {
2626
/**
2727
* 通过from.offer(data)而不是.offer(context)发起的数据会新增一个trace,这个trace会延续到flow end
2828
*/
29+
2930
@Getter
3031
private final Set<String> traceId;
3132

@@ -265,6 +266,29 @@ public <R> List<FlowContext<R>> generate(List<R> data, String position) {
265266
return data.stream().map(d -> this.generate(d, position, LocalDateTime.now())).collect(Collectors.toList());
266267
}
267268

269+
/**
270+
* fork一个新的context用于一拖多分支,继承当前context的运行元数据,但生成新的contextId。
271+
*
272+
* @return 新的分支context
273+
*/
274+
public FlowContext<T> fork() {
275+
return this.convertData(this.data);
276+
}
277+
278+
/**
279+
* convertData
280+
*
281+
* @param <R> 转换后的数据类型
282+
* @param data 转换后的数据
283+
* @return 转换后的context
284+
*/
285+
public <R> FlowContext<R> convertData(R data) {
286+
FlowContext<R> context = this.copyContext(data);
287+
context.previous = this.id;
288+
context.nextPositionId = this.nextPositionId;
289+
return context;
290+
}
291+
268292
/**
269293
* 用于when.convert数据时候的转换context,除了包裹的数据类型不一样,所有其他信息都一样
270294
*
@@ -274,12 +298,17 @@ public <R> List<FlowContext<R>> generate(List<R> data, String position) {
274298
* @return 转换后的context
275299
*/
276300
public <R> FlowContext<R> convertData(R data, String id) {
301+
FlowContext<R> context = this.copyContext(data);
302+
context.previous = this.previous;
303+
context.id = id;
304+
return context;
305+
}
306+
307+
private <R> FlowContext<R> copyContext(R data) {
277308
FlowContext<R> context = new FlowContext<>(this.streamId, this.rootId, data, this.traceId, this.position,
278309
this.parallel, this.parallelMode, LocalDateTime.now());
279-
context.previous = this.previous;
280310
context.status = this.status;
281311
context.trans = this.trans;
282-
context.id = id;
283312
context.batchId = this.batchId;
284313
context.toBatch = this.toBatch;
285314
context.createAt = this.createAt;

app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/definitions/nodes/FlowStateNode.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ public class FlowStateNode extends FlowNode {
5454
*/
5555
@Override
5656
public FitStream.Processor<FlowData, FlowData> getProcessor(String streamId, FlowContextRepo<FlowData> repo,
57-
FlowContextMessenger messenger, FlowLocks locks) {
57+
FlowContextMessenger messenger, FlowLocks locks) {
5858
if (!Optional.ofNullable(this.processor).isPresent()) {
5959
Node<FlowData, FlowData> node = new Node<>(streamId, this.metaId, this::stateProduce, repo, messenger,
6060
locks, this.type);

app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/streams/From.java

Lines changed: 49 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import java.util.List;
4343
import java.util.Optional;
4444
import java.util.Set;
45+
import java.util.concurrent.locks.Lock;
4546
import java.util.function.Consumer;
4647
import java.util.stream.Collectors;
4748

@@ -322,26 +323,64 @@ public void offer(List<FlowContext<I>> contexts, Consumer<PreSendCallbackInfo<I>
322323
// qualifiedWhens表示的与from节点连接的所有事件,条件节点符合条件的事件在这里筛选,在事件上处理需要下发的context
323324
java.util.Map<FitStream.Subscription<I, ?>, List<FlowContext<I>>> matchedContexts = new LinkedHashMap<>();
324325
Set<FlowContext<I>> matchedContextSet = new HashSet<>();
325-
qualifiedWhens.forEach(
326-
w -> {
327-
List<FlowContext<I>> afterContexts = contexts
328-
.stream()
329-
.filter(c -> w.getWhether().is(c))
330-
.peek(c -> c.setNextPositionId(w.getId()))
331-
.collect(Collectors.toList());
332-
matchedContexts.put(w, afterContexts);
333-
matchedContextSet.addAll(afterContexts);
326+
List<FlowContext<I>> forkedContexts = new ArrayList<>();
327+
for (FlowContext<I> contextItem : contexts) {
328+
List<FitStream.Subscription<I, ?>> matchedSubscriptions = qualifiedWhens.stream()
329+
.filter(w -> w.getWhether().is(contextItem))
330+
.collect(Collectors.toList());
331+
if (CollectionUtils.isEmpty(matchedSubscriptions)) {
332+
continue;
333+
}
334+
matchedContextSet.add(contextItem);
335+
for (int index = 0; index < matchedSubscriptions.size(); index++) {
336+
FitStream.Subscription<I, ?> matchedSubscription = matchedSubscriptions.get(index);
337+
FlowContext<I> branchContext = index == 0 ? contextItem : contextItem.fork();
338+
branchContext.setNextPositionId(matchedSubscription.getId());
339+
matchedContexts.computeIfAbsent(matchedSubscription, key -> new ArrayList<>()).add(branchContext);
340+
if (index > 0) {
341+
forkedContexts.add(branchContext);
334342
}
335-
);
343+
}
344+
}
345+
qualifiedWhens.forEach(w -> matchedContexts.computeIfAbsent(w, key -> new ArrayList<>()));
336346
List<FlowContext<I>> unMatchedContexts = contexts
337347
.stream()
338348
.filter(c -> !matchedContextSet.contains(c))
339349
.collect(Collectors.toList());
340350
PreSendCallbackInfo<I> callbackInfo = new PreSendCallbackInfo<>(matchedContexts, unMatchedContexts);
341351
preSendCallback.accept(callbackInfo);
352+
persistForkedContexts(forkedContexts, matchedContexts);
342353
matchedContexts.forEach(FitStream.Subscription::cache);
343354
}
344355

356+
private void persistForkedContexts(List<FlowContext<I>> forkedContexts,
357+
java.util.Map<FitStream.Subscription<I, ?>, List<FlowContext<I>>> matchedContexts) {
358+
if (CollectionUtils.isEmpty(forkedContexts)) {
359+
return;
360+
}
361+
Set<String> forkedIds = forkedContexts.stream().map(FlowContext::getId).collect(Collectors.toSet());
362+
List<FlowContext<I>> effectiveForkedContexts = matchedContexts.values()
363+
.stream()
364+
.flatMap(List::stream)
365+
.filter(context -> forkedIds.contains(context.getId()))
366+
.collect(Collectors.toList());
367+
if (CollectionUtils.isEmpty(effectiveForkedContexts)) {
368+
return;
369+
}
370+
Set<String> traces = effectiveForkedContexts.stream()
371+
.flatMap(context -> context.getTraceId().stream())
372+
.collect(Collectors.toSet());
373+
Lock lock = this.locks.getDistributedLock(this.locks.streamNodeLockKey(this.streamId, this.id,
374+
"ForkContextPool"));
375+
lock.lock();
376+
try {
377+
this.repo.updateContextPool(effectiveForkedContexts, traces);
378+
this.repo.save(effectiveForkedContexts);
379+
} finally {
380+
lock.unlock();
381+
}
382+
}
383+
345384
/**
346385
* 是否有publisher目标
347386
* 用于stream闭环时将没有subscribed的publisher关闭到close subscriber

0 commit comments

Comments
 (0)