Skip to content

Commit 920e7d6

Browse files
committed
optimize javadoc
1 parent 496aa0b commit 920e7d6

46 files changed

Lines changed: 570 additions & 171 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/CompleteContext.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,15 @@
1616
* @since 1.0
1717
*/
1818
public class CompleteContext extends FlowContext {
19+
/**
20+
* 构造一个 {@link CompleteContext} 实例。
21+
* <p>
22+
* 该构造函数用于在 session window complete 时创建一个结束上下文,通知 reduce 节点结束累积操作。
23+
* </p>
24+
*
25+
* @param context 表示当前上下文的 {@link FlowContext}。
26+
* @param position 表示上下文当前所处位置的 {@link String}。
27+
*/
1928
public CompleteContext(FlowContext context, String position) {
2029
super(context.getStreamId(), context.getRootId(), null, context.getTraceId(), position,
2130
context.getParallel(), context.getParallelMode(), context.getSession());

framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/FlatMapWindow.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,11 @@ public class FlatMapWindow extends Window {
3131
@Getter
3232
private Window source;
3333

34+
/**
35+
* 创建一个flatmap window
36+
*
37+
* @param from flatmap source window
38+
*/
3439
public FlatMapWindow(FlatMapSourceWindow from) {
3540
super();
3641
this.from = from;
@@ -112,6 +117,11 @@ public Object acc() {
112117
return this.from.acc();
113118
}
114119

120+
/**
121+
* 设置acc
122+
*
123+
* @param acc acc
124+
*/
115125
@Override
116126
public void setAcc(Object acc) {
117127
this.from.setAcc(acc);

framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/FlowContext.java

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -150,9 +150,10 @@ public class FlowContext<T> extends IdGenerator implements StateContext {
150150
* @param data 表示上下文里所带数据的 {@link T}。
151151
* @param traceId 表示路径唯一标识的 {@link Set}{@code <}的{@link String}{@code >}。
152152
* @param position 表示上下文当前所处的位置的 {@link String}。
153+
* @param session 表示上下文会话信息的 {@link FlowSession}。
153154
*/
154155
public FlowContext(String streamId, String rootId, T data, Set<String> traceId, String position,
155-
FlowSession session) {
156+
FlowSession session) {
156157
this(streamId, rootId, data, traceId, position, "", "", session);
157158
}
158159

@@ -166,6 +167,7 @@ public FlowContext(String streamId, String rootId, T data, Set<String> traceId,
166167
* @param position 表示上下文当前所处的位置的 {@link String}。
167168
* @param parallel 表示并行节点唯一标识的 {@link String}。
168169
* @param parallelMode 表示并行模式的 {@link String}。
170+
* @param session 表示上下文会话信息的 {@link FlowSession}。
169171
*/
170172
public FlowContext(String streamId, String rootId, T data, Set<String> traceId, String position, String parallel,
171173
String parallelMode, FlowSession session) {
@@ -182,7 +184,7 @@ public FlowContext(String streamId, String rootId, T data, Set<String> traceId,
182184
this.index = this.createIndex(); // 0起始,说明保序
183185
}
184186

185-
private Integer createIndex(){
187+
private Integer createIndex() {
186188
return session.preserved() ? session.getWindow().tokenCount() : -1;
187189
}
188190

@@ -261,13 +263,20 @@ public FlowContext<T> toBatch(String toBatchId) {
261263
/**
262264
* 在 map,reduce,produce 的过程中把大多数上一个 context 的内容复制给下一个。
263265
*
266+
* @param <R> 表示返回值类型的泛型参数。
264267
* @param data 表示处理后数据的 {@link R}。
265268
* @param position 表示处理后所处的节点的 {@link String}。
266269
* @return 表示新的上下文的 {@link FlowContext}{@code <}{@link R}{@code >}。
267270
*/
268271
public <R> FlowContext<R> generate(R data, String position) {
269-
FlowContext<R> context = new FlowContext<>(this.streamId, this.rootId, data, this.traceId, this.position,
270-
this.parallel, this.parallelMode, this.session);
272+
FlowContext<R> context = new FlowContext<>(this.streamId,
273+
this.rootId,
274+
data,
275+
this.traceId,
276+
this.position,
277+
this.parallel,
278+
this.parallelMode,
279+
this.session);
271280
context.position = position;
272281
context.previous = this.id;
273282
context.batchId = this.batchId;
@@ -278,6 +287,7 @@ public <R> FlowContext<R> generate(R data, String position) {
278287
/**
279288
* 在 map,reduce,produce 的过程中把大多数上一个 context 的内容复制给下一个。
280289
*
290+
* @param <R> 表示返回值类型的泛型参数。
281291
* @param dataList 表示处理后数据的 {@link List}{@code <}{@link R}{@code >}。
282292
* @param position 表示处理后所处节点的 {@link String}。
283293
* @return 表示新的上下文的 {@link List}{@code <}{@link FlowContext}{@code <}{@link R}{@code >}{@code >}。
@@ -289,13 +299,20 @@ public <R> List<FlowContext<R>> generate(List<R> dataList, String position) {
289299
/**
290300
* 用于 when.convert 数据时候的转换 context,除了包裹的数据类型不一样,所有其他信息都一样。
291301
*
302+
* @param <R> 表示返回值类型的泛型参数。
292303
* @param data 表示转换后数据的 {@link R}。
293304
* @param id 表示 contextId 的 {@link String}。
294305
* @return 表示转换后的 context 的 {@link FlowContext}{@code <}{@link R}{@code >}。
295306
*/
296307
public <R> FlowContext<R> convertData(R data, String id) {
297-
FlowContext<R> context = new FlowContext<>(this.streamId, this.rootId, data, this.traceId, this.position,
298-
this.parallel, this.parallelMode, this.session);
308+
FlowContext<R> context = new FlowContext<>(this.streamId,
309+
this.rootId,
310+
data,
311+
this.traceId,
312+
this.position,
313+
this.parallel,
314+
this.parallelMode,
315+
this.session);
299316
context.previous = this.previous;
300317
context.status = this.status;
301318
context.id = id;
@@ -336,6 +353,11 @@ public Object keyBy() {
336353
return this.session.keyBy();
337354
}
338355

356+
/**
357+
* 获取window
358+
*
359+
* @return 窗口
360+
*/
339361
public Window getWindow() {
340362
return this.getSession().getWindow();
341363
}

framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/FlowSession.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
import modelengine.fitframework.util.ObjectUtils;
1313

1414
import java.util.Map;
15-
import java.util.Optional;
1615
import java.util.UUID;
1716
import java.util.concurrent.ConcurrentHashMap;
1817
import java.util.function.Consumer;
@@ -99,7 +98,7 @@ public FlowSession(String id) {
9998
/**
10099
* 构造方法,使用指定的 ID 和保序标识。
101100
*
102-
* @param id session 的唯一标识
101+
* @param id session 的唯一标识
103102
* @param preserved 是否保序
104103
*/
105104
public FlowSession(String id, boolean preserved) {
@@ -131,7 +130,7 @@ public FlowSession(FlowSession session) {
131130
* @param session the original {@link FlowSession} to copy properties from.
132131
* @param window the {@link Window} configuration to apply to the new session.
133132
* @return a new {@link FlowSession} instance with properties copied from the original session.
134-
* and the specified window configuration applied
133+
* and the specified window configuration applied
135134
*/
136135
public static FlowSession from(FlowSession session, Window window) {
137136
FlowSession newSession = new FlowSession(session.getId(), session.preserved);
@@ -149,7 +148,7 @@ public static FlowSession from(FlowSession session, Window window) {
149148
* @param session the original {@link FlowSession} to copy state from.
150149
* @param preserved {@code boolean} indicates whether the new session should be created as a preserved session.
151150
* @return a new root-level {@link FlowSession} initialized with the specified preservation state.
152-
* and containing copied state from the original session
151+
* and containing copied state from the original session
153152
*/
154153
public static FlowSession newRootSession(FlowSession session, boolean preserved) {
155154
FlowSession newSession = new FlowSession(preserved);
@@ -187,6 +186,7 @@ public Window getWindow() {
187186
/**
188187
* 设置当前 session 的window对象,并确保window关联到当前 session。
189188
*
189+
* @param <I> 泛型类型,表示上下文的数据类型
190190
* @param window 要设置的 Window 实例
191191
*/
192192
public <I> void setWindow(Window window) {
@@ -258,6 +258,7 @@ public void setState(String key, Object value) {
258258
/**
259259
* 获取指定键的内置上下文数据。
260260
*
261+
* @param <R> 泛型类型,表示上下文的数据类型
261262
* @param key 表示键的 {@link String}。
262263
* @return 上下文数据 {@link R}。
263264
*/
@@ -319,6 +320,7 @@ private void copyState(FlowSession session) {
319320
/**
320321
* 开始当前 session 的窗口,如果窗口尚未初始化,则创建一个新的 Window 实例并关联到当前 session。
321322
*
323+
* @param <I> 泛型类型,表示上下文的数据类型
322324
* @return 当前的 Window 实例
323325
*/
324326
public <I> Window begin() {

framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/FlowTrace.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,9 @@ public class FlowTrace extends IdGenerator {
7474
*/
7575
private FlowTraceStatus status = FlowTraceStatus.READY;
7676

77+
/**
78+
* 默认构造函数
79+
*/
7780
public FlowTrace() {
7881
contextPool = new HashSet<>();
7982
}

framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/MatchWindow.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,27 @@ public class MatchWindow extends Window {
2525

2626
private final Set<MatchWindow> arms = new HashSet<>();
2727

28+
/**
29+
* 创建一个MatchWindow
30+
*
31+
* @param source 源窗口
32+
* @param id 窗口ID
33+
* @param data 窗口数据
34+
*/
2835
public MatchWindow(Window source, UUID id, Object data) {
2936
super(inputs -> false, id);
3037
this.from = source;
3138
source.addTo(this);
3239
}
3340

41+
/**
42+
* 创建一个MatchWindow
43+
*
44+
* @param source 源窗口
45+
* @param id 窗口ID
46+
* @param data 窗口数据
47+
* @return 返回创建的MatchWindow对象
48+
*/
3449
public static synchronized MatchWindow from(Window source, UUID id, Object data) {
3550
MatchWindow window = all.get(id.toString());
3651
if (window == null) {

framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/StateContext.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ public interface StateContext {
1616
* 获取指定key的上下文数据
1717
*
1818
* @param key 指定key
19+
* @param <R> 返回值的类型
1920
* @return 上下文数据
2021
*/
2122
<R> R getState(String key);

framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/Window.java

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -78,23 +78,37 @@ public class Window implements Completable {
7878

7979
private To node = null;
8080

81+
/**
82+
* 创建窗口
83+
*
84+
* @param condition 窗口条件
85+
* @param id 窗口ID
86+
*/
8187
public Window(Operators.WindowCondition condition, UUID id) {
8288
this.condition = condition;
8389
this.id = id;
8490
}
8591

92+
/**
93+
* 创建窗口
94+
*
95+
* @param condition 窗口条件
96+
*/
8697
public Window(Operators.WindowCondition condition) {
8798
this(condition, UUID.randomUUID());
8899
}
89100

101+
/**
102+
* 创建窗口
103+
*/
90104
public Window() {
91105
this(arg -> false);
92106
}
93107

94108
/**
95109
* 待删除
96110
*
97-
* @return
111+
* @return 待删除的token数量
98112
*/
99113
public int getTosSize() {
100114
return tos.size();
@@ -133,7 +147,8 @@ public String id() {
133147
* @return 是否到达
134148
*/
135149
public boolean fulfilled() {
136-
WindowArg arg = new WindowArg(this.isComplete(), this.tokens.size(),
150+
WindowArg arg = new WindowArg(this.isComplete(),
151+
this.tokens.size(),
137152
this.tokens.stream().filter(t -> !t.initialized() && !t.isReduced()).count(),
138153
Duration.between(this.now.get(), LocalDateTime.now()));
139154
// consuming and consumed are all counted
@@ -314,7 +329,8 @@ public <T, R> void setCompleteHook(To<T, R> to, FlowContext context) {
314329
}
315330

316331
/**
317-
* if this session window is closed and all elements have been consumed, then notify listener stream that i'm totally consumed
332+
* if this session window is closed and all elements have been consumed, then notify listener stream that i'm
333+
* totally consumed
318334
**/
319335
public void tryFinish() {
320336
synchronized (this) {
@@ -342,7 +358,7 @@ public Integer tokenCount() {
342358
/**
343359
* 待删除
344360
*
345-
* @return
361+
* @return token数量
346362
*/
347363
public synchronized String debugTokens() {
348364
return this.tokens.hashCode() + "-" + this.tokens.stream()

framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/WindowToken.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,29 @@ public class WindowToken {
1919
* 状态枚举
2020
*/
2121
enum Status {
22+
/**
23+
* 初始化
24+
*/
2225
INITIALIZED,
26+
27+
/**
28+
* 正在处理
29+
*/
2330
CONSUMING,
31+
32+
/**
33+
* 已处理完成
34+
*/
2435
CONSUMED
2536
}
2637

2738
private final Window window;
2839

40+
/**
41+
* 状态
42+
*
43+
* @return 状态
44+
*/
2945
public Status getStatus() {
3046
return this.status;
3147
}
@@ -34,6 +50,11 @@ public Status getStatus() {
3450

3551
private boolean reduced;
3652

53+
/**
54+
* 构造函数
55+
*
56+
* @param window 窗口
57+
*/
3758
public WindowToken(Window window) {
3859
this.window = window;
3960
}

0 commit comments

Comments
 (0)