Skip to content

Commit bb631ff

Browse files
committed
Merge branch '3.6.x'
2 parents 0403dd0 + 63a0ff9 commit bb631ff

File tree

27 files changed

+982
-87
lines changed

27 files changed

+982
-87
lines changed

framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/CompleteContext.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ public class CompleteContext extends FlowContext {
2727
*/
2828
public CompleteContext(FlowContext context, String position) {
2929
super(context.getStreamId(), context.getRootId(), null, context.getTraceId(), position,
30-
context.getParallel(), context.getParallelMode(), context.getSession());
30+
context.getParallel(), context.getParallelMode(), context.getSession(), context.getCreateAt());
3131
this.batchId = context.getBatchId();
3232
this.setIndex(Constants.NOT_PRESERVED_INDEX);
3333
}

framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/FlowContext.java

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,13 @@ public class FlowContext<T> extends IdGenerator implements StateContext {
142142
@Setter
143143
private Integer index;
144144

145+
/**
146+
* 当前context接下来要走到位置:可以是连线或者节点id
147+
*/
148+
@Setter
149+
@Getter
150+
private String nextPositionId;
151+
145152
/**
146153
* 创建一个 {@link FlowContext} 实例。
147154
*
@@ -154,7 +161,7 @@ public class FlowContext<T> extends IdGenerator implements StateContext {
154161
*/
155162
public FlowContext(String streamId, String rootId, T data, Set<String> traceId, String position,
156163
FlowSession session) {
157-
this(streamId, rootId, data, traceId, position, "", "", session);
164+
this(streamId, rootId, data, traceId, position, "", "", session, LocalDateTime.now());
158165
}
159166

160167
/**
@@ -167,10 +174,11 @@ public FlowContext(String streamId, String rootId, T data, Set<String> traceId,
167174
* @param position 表示上下文当前所处的位置的 {@link String}。
168175
* @param parallel 表示并行节点唯一标识的 {@link String}。
169176
* @param parallelMode 表示并行模式的 {@link String}。
177+
* @param createAt 表示创建时间的 {@link LocalDateTime}。
170178
* @param session 表示上下文会话信息的 {@link FlowSession}。
171179
*/
172180
public FlowContext(String streamId, String rootId, T data, Set<String> traceId, String position, String parallel,
173-
String parallelMode, FlowSession session) {
181+
String parallelMode, FlowSession session, LocalDateTime createAt) {
174182
this.streamId = streamId;
175183
this.rootId = rootId;
176184
this.data = data;
@@ -179,7 +187,7 @@ public FlowContext(String streamId, String rootId, T data, Set<String> traceId,
179187
this.position = position;
180188
this.parallel = parallel;
181189
this.parallelMode = parallelMode;
182-
this.createAt = LocalDateTime.now();
190+
this.createAt = createAt;
183191
this.session = session;
184192
this.index = this.createIndex(); // 0起始,说明保序
185193
}
@@ -266,17 +274,19 @@ public FlowContext<T> toBatch(String toBatchId) {
266274
* @param <R> 表示返回值类型的泛型参数。
267275
* @param data 表示处理后数据的 {@link R}。
268276
* @param position 表示处理后所处的节点的 {@link String}。
277+
* @param createAt 表示创建时间的 {@link LocalDateTime}。
269278
* @return 表示新的上下文的 {@link FlowContext}{@code <}{@link R}{@code >}。
270279
*/
271-
public <R> FlowContext<R> generate(R data, String position) {
280+
public <R> FlowContext<R> generate(R data, String position, LocalDateTime createAt) {
272281
FlowContext<R> context = new FlowContext<>(this.streamId,
273282
this.rootId,
274283
data,
275284
this.traceId,
276285
this.position,
277286
this.parallel,
278287
this.parallelMode,
279-
this.session);
288+
this.session,
289+
createAt);
280290
context.position = position;
281291
context.previous = this.id;
282292
context.batchId = this.batchId;
@@ -293,7 +303,9 @@ public <R> FlowContext<R> generate(R data, String position) {
293303
* @return 表示新的上下文的 {@link List}{@code <}{@link FlowContext}{@code <}{@link R}{@code >}{@code >}。
294304
*/
295305
public <R> List<FlowContext<R>> generate(List<R> dataList, String position) {
296-
return dataList.stream().map(data -> this.generate(data, position)).collect(Collectors.toList());
306+
return dataList.stream()
307+
.map(data -> this.generate(data, position, LocalDateTime.now()))
308+
.collect(Collectors.toList());
297309
}
298310

299311
/**
@@ -312,7 +324,8 @@ public <R> FlowContext<R> convertData(R data, String id) {
312324
this.position,
313325
this.parallel,
314326
this.parallelMode,
315-
this.session);
327+
this.session,
328+
LocalDateTime.now());
316329
context.previous = this.previous;
317330
context.status = this.status;
318331
context.id = id;

framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/FlowSession.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ public class FlowSession extends IdGenerator implements StateContext {
7676
* @param preserved 是否保序
7777
*/
7878
public FlowSession(boolean preserved) {
79-
this(UUID.randomUUID().toString(), preserved);
79+
this.preserved = preserved;
8080
}
8181

8282
/**
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*---------------------------------------------------------------------------------------------
2+
* Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved.
3+
* This file is a part of the ModelEngine Project.
4+
* Licensed under the MIT License. See License.txt in the project root for license information.
5+
*--------------------------------------------------------------------------------------------*/
6+
7+
package modelengine.fit.waterflow.domain.context;
8+
9+
import java.util.List;
10+
import java.util.Set;
11+
import java.util.concurrent.locks.Lock;
12+
13+
/**
14+
* 提供trace的归属服务
15+
*
16+
* @author 夏斐
17+
* @since 2024/4/29
18+
*/
19+
public interface TraceOwner {
20+
/**
21+
* own
22+
*
23+
* @param traceId traceId
24+
* @param transId transId
25+
*/
26+
void own(String traceId, String transId);
27+
28+
/**
29+
* tryOwn
30+
*
31+
* @param traceId traceId
32+
* @param transId transId
33+
* @return boolean
34+
*/
35+
boolean tryOwn(String traceId, String transId);
36+
37+
/**
38+
* release
39+
*
40+
* @param traceId traceId
41+
*/
42+
void release(String traceId);
43+
44+
/**
45+
* isOwn
46+
*
47+
* @param traceId traceId
48+
* @return boolean
49+
*/
50+
boolean isOwn(String traceId);
51+
52+
/**
53+
* trace map中包含任意一个trace列表的值,返回true
54+
*
55+
* @param traceIds trace id列表
56+
* @return true or false
57+
*/
58+
boolean isAnyOwn(Set<String> traceIds);
59+
60+
/**
61+
* 获取链路标识列表。
62+
*
63+
* @return 链路标识列表。
64+
*/
65+
List<String> getTraces();
66+
67+
/**
68+
* 获取链路标识列表。
69+
*
70+
* @param targetTransId 实例标识。
71+
* @return 链路标识列表。
72+
*/
73+
List<String> getTraces(String targetTransId);
74+
75+
/**
76+
* 移除所有失效的链路标识。
77+
*
78+
* @param invalidLock 失效的链路标识锁。
79+
*/
80+
void removeInvalidTrace(Lock invalidLock);
81+
82+
/**
83+
* 判断trace是否在初始化保护期
84+
* 针对首次offer trace先加入到内存,但是实际数据库中还未插入时的情况使用
85+
*
86+
* @param traceId traceId
87+
* @return true-处于保护时间,false-超过保护时间
88+
*/
89+
boolean isInProtectTime(String traceId);
90+
}

framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/Window.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,7 @@ private void fire() {
255255
cs.add(completeContext);
256256
List contexts = node.getProcessMode().process(node, cs);
257257
if (node instanceof Processor) {
258-
((Processor<?, ?>) node).offer(contexts);
258+
((Processor<?, ?>) node).offer(contexts, __ -> {});
259259
}
260260
completeContext = null;
261261
}

framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/repo/flowcontext/FlowContextMemoMessenger.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ public <I> void send(String nodeId, List<FlowContext<I>> contexts) {
3939
* @param <I> 流程实例执行时的入参数据类型,用于泛型推倒
4040
*/
4141
@Override
42-
public <I> void sendCallback(List<FlowContext<I>> contexts) {
42+
public <I> void sendCallback(Object callback, List<FlowContext<I>> contexts) {
4343
LOG.warn("FlowEngine memo messenger does not support sending events.");
4444
}
4545
}

framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/repo/flowcontext/FlowContextMemoRepo.java

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,16 +8,20 @@
88

99
import modelengine.fit.waterflow.domain.context.FlowContext;
1010
import modelengine.fit.waterflow.domain.context.FlowTrace;
11+
import modelengine.fit.waterflow.domain.context.TraceOwner;
1112
import modelengine.fit.waterflow.domain.enums.FlowNodeStatus;
1213
import modelengine.fit.waterflow.domain.stream.operators.Operators;
14+
import modelengine.fitframework.util.ObjectUtils;
1315

1416
import java.util.ArrayList;
1517
import java.util.Deque;
1618
import java.util.List;
1719
import java.util.Map;
1820
import java.util.Objects;
21+
import java.util.Set;
1922
import java.util.concurrent.ConcurrentHashMap;
2023
import java.util.concurrent.ConcurrentLinkedDeque;
24+
import java.util.concurrent.locks.Lock;
2125
import java.util.function.Function;
2226
import java.util.stream.Collectors;
2327
import java.util.stream.Stream;
@@ -33,6 +37,55 @@ public class FlowContextMemoRepo implements FlowContextRepo {
3337

3438
private final boolean isReserveTerminal;
3539

40+
private final TraceOwner traceOwner = new TraceOwner() {
41+
@Override
42+
public void own(String traceId, String transId) {
43+
}
44+
45+
@Override
46+
public boolean tryOwn(String traceId, String transId) {
47+
return true;
48+
}
49+
50+
@Override
51+
public void release(String traceId) {
52+
}
53+
54+
@Override
55+
public boolean isOwn(String traceId) {
56+
return true;
57+
}
58+
59+
@Override
60+
public boolean isAnyOwn(Set<String> traceIds) {
61+
return true;
62+
}
63+
64+
@Override
65+
public List<String> getTraces() {
66+
return List.of();
67+
}
68+
69+
@Override
70+
public List<String> getTraces(String targetTransId) {
71+
return List.of();
72+
}
73+
74+
@Override
75+
public void removeInvalidTrace(Lock invalidLock) {
76+
}
77+
78+
@Override
79+
public boolean isInProtectTime(String traceId) {
80+
return false;
81+
}
82+
};
83+
84+
@Override
85+
public TraceOwner getTraceOwner() {
86+
return this.traceOwner;
87+
}
88+
3689
/**
3790
* 构造方法
3891
*/
@@ -74,6 +127,11 @@ public <T> List<FlowContext<T>> getContextsByPosition(String streamId, String po
74127
.filter(context -> context.getStatus().toString().equals(status)));
75128
}
76129

130+
@Override
131+
public <T> List<FlowContext<T>> findWithoutFlowDataByTraceId(String traceId) {
132+
throw new IllegalStateException("Not support");
133+
}
134+
77135
@Override
78136
public <T> List<FlowContext<T>> getContextsByTrace(String traceId) {
79137
return query(stream -> stream
@@ -95,6 +153,21 @@ public synchronized <T> void save(List<FlowContext<T>> contexts) {
95153
});
96154
}
97155

156+
@Override
157+
public <T> void updateFlowDataAndToBatch(List<FlowContext<T>> contexts) {
158+
save(contexts);
159+
}
160+
161+
@Override
162+
public synchronized <T> void updateFlowData(Map<String, T> flowDataList) {
163+
flowDataList.forEach((contextId, data) -> {
164+
FlowContext<T> flowContext = ObjectUtils.cast(this.contextsMap.get(contextId));
165+
if (flowContext != null) {
166+
flowContext.setData(data);
167+
}
168+
});
169+
}
170+
98171
@Override
99172
public <T> void updateToSent(List<FlowContext<T>> contexts) {
100173
save(contexts);
@@ -125,6 +198,14 @@ public <T> List<FlowContext<T>> getByIds(List<String> ids) {
125198
return ids.stream().map(i -> (FlowContext<T>) contextsMap.get(i)).collect(Collectors.toList());
126199
}
127200

201+
@Override
202+
public <T> List<FlowContext<T>> getByToBatch(List<String> toBatchIds) {
203+
return query(stream -> stream
204+
.filter(context -> context.getStatus().equals(FlowNodeStatus.PENDING))
205+
.filter(FlowContext::isSent)
206+
.filter(context -> toBatchIds.contains(context.getToBatch())));
207+
}
208+
128209
@Override
129210
public <T> List<FlowContext<T>> requestMappingContext(String streamId, List<String> subscriptions,
130211
Map<String, Integer> sessions) {

framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/repo/flowcontext/FlowContextMessenger.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,10 +50,11 @@ default <I> void send(ProcessType type, Subscriber<I, ?> subscriber, List<FlowCo
5050
/**
5151
* 发送回调函数事件到引擎外部
5252
*
53+
* @param callback 回调函数.
5354
* @param contexts 流程实例执行过程产生的contexts
5455
* @param <I> 流程实例执行时的入参数据类型,用于泛型推倒
5556
*/
56-
<I> void sendCallback(List<FlowContext<I>> contexts);
57+
<I> void sendCallback(Object callback, List<FlowContext<I>> contexts);
5758

5859
/**
5960
* Directly processes a list of flow contexts through the specified subscriber.

0 commit comments

Comments
 (0)