Skip to content

Commit ab700ac

Browse files
committed
[fel] demonstrate data anonymization with example
1 parent c0085b0 commit ab700ac

3 files changed

Lines changed: 156 additions & 0 deletions

File tree

framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/activities/AiConditions.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,23 @@ public <O> AiMatchHappen<O, D, I, RF, F> match(Operators.Whether<I> whether,
5757
node -> processor.process(new AiState<>(node, this.flow())).state), this.flow());
5858
}
5959

60+
/**
61+
* 指定条件和处理器创建条件分支。
62+
*
63+
* @param whether 表示匹配条件的 {@link Operators.Whether}{@code <}{@link I}{@code >}。
64+
* @param processor 表示分支处理器的 {@link Operators.Then}{@code <}{@link O}{@code , }{@link D}{@code ,
65+
* }{@link I}{@code , }{@link RF}{@code , }{@link F}{@code >}。
66+
* @param <O> 表示第一个条件分支指定的返回类型。
67+
* @return 表示条件分支的 {@link AiMatchHappen}{@code <}{@link O}{@code , }{@link D}{@code ,
68+
* }{@link I}{@code , }{@link RF}{@code , }{@link F}{@code >}。
69+
* @throws IllegalArgumentException 当 {@code processor} 为 {@code null} 时。
70+
*/
71+
public <O> AiWhenHappen<O, D, I, RF, F> when(Operators.Whether<I> whether,
72+
Operators.Then<I, O> processor) {
73+
Validation.notNull(processor, "Ai branch processor cannot be null.");
74+
return new AiWhenHappen<>(this.conditions.when(whether, processor), this.flow());
75+
}
76+
6077
/**
6178
* 指定条件和对应的处理器创建条件跳转分支。
6279
*
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*---------------------------------------------------------------------------------------------
2+
* Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved.
3+
* This file is a part of the ModelEngine Project.
4+
* Licensed under the MIT License. See License.txt in the project root for license information.
5+
*--------------------------------------------------------------------------------------------*/
6+
7+
package modelengine.fel.engine.activities;
8+
9+
import modelengine.fel.engine.flows.AiFlow;
10+
import modelengine.fit.waterflow.domain.flow.Flow;
11+
import modelengine.fit.waterflow.domain.states.WhenHappen;
12+
import modelengine.fit.waterflow.domain.stream.operators.Operators;
13+
import modelengine.fitframework.inspection.Validation;
14+
15+
/**
16+
* Represents a conditional branch that matches when conditions in an AI processing flow.
17+
* This class handles the branching logic when specific conditions are met in the workflow.
18+
*
19+
* @param <O> The output data type of the current node.
20+
* @param <D> The initial data type of the containing flow.
21+
* @param <I> The input parameter data type.
22+
* @param <RF> The internal flow type, extending {@link Flow}{@code <D>}.
23+
* @param <F> The AI flow type, extending {@link AiFlow}{@code <D, RF>}.
24+
*
25+
* @author 宋永坦
26+
* @since 2025-06-12
27+
*/
28+
public class AiWhenHappen<O, D, I, RF extends Flow<D>, F extends AiFlow<D, RF>> {
29+
private final WhenHappen<O, D, I, RF> matchHappen;
30+
31+
private final F flow;
32+
33+
public AiWhenHappen(WhenHappen<O, D, I, RF> matchHappen, F flow) {
34+
this.matchHappen = Validation.notNull(matchHappen, "WhenHappen cannot be null.");
35+
this.flow = Validation.notNull(flow, "Flow cannot be null.");
36+
}
37+
38+
/**
39+
* Creates a conditional branch with the specified predicate and handler.
40+
*
41+
* @param whether The condition predicate that determines branch activation.
42+
* @param processor The transformation handler to execute when condition is met.
43+
* @return A new {@link AiWhenHappen} instance representing the conditional branch.
44+
* @throws IllegalArgumentException if processor is null.
45+
*/
46+
public AiWhenHappen<O, D, I, RF, F> when(Operators.Whether<I> whether, Operators.Then<I, O> processor) {
47+
Validation.notNull(processor, "Ai branch processor cannot be null.");
48+
return new AiWhenHappen<>(this.matchHappen.when(whether, processor), this.flow);
49+
}
50+
51+
/**
52+
* Provides a default processing logic and terminates the conditional node.
53+
*
54+
* @param processor The handler to process unmatched inputs.
55+
* @return An {@link AiState} representing the terminal node of the conditional flow.
56+
* @throws IllegalArgumentException if processor is null
57+
*/
58+
public AiState<O, D, O, RF, F> others(Operators.Then<I, O> processor) {
59+
Validation.notNull(processor, "Ai branch processor cannot be null.");
60+
return new AiState<>(this.matchHappen.others(processor), this.flow);
61+
}
62+
}

framework/fel/java/fel-flow/src/test/java/modelengine/fel/engine/AiFlowCaseTest.java

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import modelengine.fel.engine.operators.models.ChatFlowModel;
1717
import modelengine.fel.engine.operators.prompts.Prompts;
1818
import modelengine.fit.waterflow.domain.context.FlowSession;
19+
import modelengine.fit.waterflow.domain.context.StateContext;
1920
import modelengine.fit.waterflow.domain.utils.SleepUtil;
2021
import modelengine.fitframework.flowable.Choir;
2122

@@ -33,6 +34,82 @@
3334
* @since 2025-06-11
3435
*/
3536
public class AiFlowCaseTest {
37+
@Nested
38+
class DesensitizeCase {
39+
private final ChatFlowModel model = new ChatFlowModel((prompt, chatOption) -> Choir.create(emitter -> {
40+
emitter.emit(new AiMessage("<think>"));
41+
for (int i = 0; i < 10; i++) {
42+
emitter.emit(new AiMessage(String.valueOf(i)));
43+
SleepUtil.sleep(100);
44+
}
45+
emitter.emit(new AiMessage("</think>"));
46+
for (int i = 100; i < 110; i++) {
47+
emitter.emit(new AiMessage(String.valueOf(i)));
48+
SleepUtil.sleep(100);
49+
}
50+
emitter.complete();
51+
}), ChatOption.custom().model("modelName").stream(true).build());
52+
53+
private final AiProcessFlow<Tip, String> flow = AiFlows.<Tip>create()
54+
.prompt(Prompts.human("{{0}}"))
55+
.generate(model)
56+
.map(this::classic)
57+
.conditions()
58+
.when(chunk -> chunk.isThinkContent, input -> input)
59+
.others(input -> {
60+
this.log(input);
61+
return input;
62+
})
63+
.map(this::mockDesensitize)
64+
.close();
65+
66+
@Test
67+
void run() {
68+
AtomicInteger counter = new AtomicInteger(0);
69+
long startTime = System.currentTimeMillis();
70+
System.out.printf("time:%s, start.\n", startTime);
71+
ConverseLatch<String> result = flow.converse(new FlowSession(true)).doOnConsume(answer -> {
72+
System.out.printf("time:%s, chunk=%s\n", System.currentTimeMillis(), answer);
73+
counter.incrementAndGet();
74+
}).offer(Tip.fromArray("hi"));
75+
result.await();
76+
System.out.printf("time:%s, cost=%s\n", System.currentTimeMillis(), System.currentTimeMillis() - startTime);
77+
Assertions.assertEquals(22, counter.get());
78+
}
79+
80+
private Chunk classic(ChatMessage message, StateContext ctx) {
81+
if (message.text().trim().equals("<think>")) {
82+
ctx.setState("isThinking", true);
83+
return new Chunk(true, message.text());
84+
}
85+
if (message.text().trim().equals("</think>")) {
86+
ctx.setState("isThinking", false);
87+
return new Chunk(true, message.text());
88+
}
89+
if (Boolean.TRUE.equals(ctx.getState("isThinking"))) {
90+
return new Chunk(true, message.text());
91+
}
92+
return new Chunk(false, message.text());
93+
}
94+
95+
private String mockDesensitize(Chunk chunk) {
96+
return chunk.content.replace("3", "*");
97+
}
98+
99+
private void log(Chunk chunk) {
100+
System.out.println("log content:" + chunk.content);
101+
}
102+
103+
private static class Chunk {
104+
private final boolean isThinkContent;
105+
private final String content;
106+
107+
private Chunk(boolean isThinkContent, String content) {this.isThinkContent = isThinkContent;
108+
this.content = content;
109+
}
110+
}
111+
}
112+
36113
/**
37114
* Simulates a backpressure scenario where:
38115
* <ol>

0 commit comments

Comments
 (0)