Skip to content

Commit 2bda813

Browse files
committed
[fel] add key examples for backpressure and concurrency
1 parent aae4738 commit 2bda813

2 files changed

Lines changed: 129 additions & 0 deletions

File tree

framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/flows/Conversation.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,9 @@ public class Conversation<D, R> {
4747
*/
4848
public Conversation(AiProcessFlow<D, R> flow, FlowSession session) {
4949
this.flow = Validation.notNull(flow, "Flow cannot be null.");
50+
if (session != null) {
51+
session.begin();
52+
}
5053
this.session =
5154
(session == null) ? this.setConverseListener(new FlowSession(true)) : this.setSubConverseListener(session);
5255
this.session.begin();
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
/*---------------------------------------------------------------------------------------------
2+
* Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved.
3+
* This file is a part of the ModelEngine Project.
4+
* Licensed under the MIT License. See License.txt in the project root for license information.
5+
*--------------------------------------------------------------------------------------------*/
6+
7+
package modelengine.fel.engine;
8+
9+
import modelengine.fel.core.chat.ChatMessage;
10+
import modelengine.fel.core.chat.ChatOption;
11+
import modelengine.fel.core.chat.support.AiMessage;
12+
import modelengine.fel.core.util.Tip;
13+
import modelengine.fel.engine.flows.AiFlows;
14+
import modelengine.fel.engine.flows.AiProcessFlow;
15+
import modelengine.fel.engine.flows.ConverseLatch;
16+
import modelengine.fel.engine.operators.models.ChatFlowModel;
17+
import modelengine.fel.engine.operators.prompts.Prompts;
18+
import modelengine.fit.waterflow.domain.context.FlowSession;
19+
import modelengine.fit.waterflow.domain.utils.SleepUtil;
20+
import modelengine.fitframework.flowable.Choir;
21+
22+
import org.junit.jupiter.api.Assertions;
23+
import org.junit.jupiter.api.Nested;
24+
import org.junit.jupiter.api.Test;
25+
26+
import java.util.concurrent.atomic.AtomicInteger;
27+
28+
/**
29+
* Test cases demonstrating different flow control scenarios in AI processing pipelines.
30+
* Contains nested test classes for specific flow control mechanisms.
31+
*
32+
* @author 宋永坦
33+
* @since 2025-06-11
34+
*/
35+
public class AiFlowCaseTest {
36+
/**
37+
* Simulates a backpressure scenario where:
38+
* <ol>
39+
* <li>The LLM generates data (50ms per item) faster than the TTS can process it.</li>
40+
* <li>TTS processing is constrained to a single thread.</li>
41+
* <li>TTS processing speed is artificially slowed (100ms per item).</li>
42+
* </ol>
43+
*/
44+
@Nested
45+
class BackPressureCase {
46+
private final ChatFlowModel model = new ChatFlowModel((prompt, chatOption) -> Choir.create(emitter -> {
47+
for (int i = 0; i < 10; i++) {
48+
emitter.emit(new AiMessage(String.valueOf(i)));
49+
SleepUtil.sleep(50);
50+
}
51+
emitter.complete();
52+
System.out.printf("time:%s, generate completed.\n", System.currentTimeMillis());
53+
}), ChatOption.custom().model("modelName").stream(true).build());
54+
55+
private final AiProcessFlow<Tip, String> flow = AiFlows.<Tip>create()
56+
.prompt(Prompts.human("{{0}}"))
57+
.generate(model)
58+
.map(this::mockTTS).concurrency(1) // Limit processing to 1 concurrent thread
59+
.close();
60+
61+
@Test
62+
void run() {
63+
AtomicInteger counter = new AtomicInteger(0);
64+
long startTime = System.currentTimeMillis();
65+
System.out.printf("time:%s, start.\n", startTime);
66+
ConverseLatch<String> result = flow.converse(new FlowSession(false)).doOnConsume(answer -> {
67+
System.out.printf("time:%s, chunk=%s\n", System.currentTimeMillis(), answer);
68+
counter.incrementAndGet();
69+
}).offer(Tip.fromArray("hi"));
70+
result.await();
71+
System.out.printf("time:%s, cost=%s\n", System.currentTimeMillis(), System.currentTimeMillis() - startTime);
72+
Assertions.assertEquals(10, counter.get());
73+
}
74+
75+
private String mockTTS(ChatMessage chunk) {
76+
// Simulate time-consuming operation with a delay.
77+
SleepUtil.sleep(100);
78+
return chunk.text();
79+
}
80+
}
81+
82+
/**
83+
* Demonstrates concurrent processing with balanced throughput where:
84+
* <ol>
85+
* <li>LLM generates data at moderate pace (50ms per item)</li>
86+
* <li>Downstream processing runs with 3 concurrent threads</li>
87+
* <li>Processing speed is slightly slower than generation (150ms vs 50ms)</li>
88+
* </ol>
89+
*/
90+
@Nested
91+
class ConcurrencyCase {
92+
private final ChatFlowModel model = new ChatFlowModel((prompt, chatOption) -> Choir.create(emitter -> {
93+
for (int i = 0; i < 10; i++) {
94+
emitter.emit(new AiMessage(String.valueOf(i)));
95+
SleepUtil.sleep(50);
96+
}
97+
emitter.complete();
98+
}), ChatOption.custom().model("modelName").stream(true).build());
99+
100+
private final AiProcessFlow<Tip, String> flow = AiFlows.<Tip>create()
101+
.prompt(Prompts.human("{{0}}"))
102+
.generate(model)
103+
.map(this::mockDesensitize).concurrency(3) // Set processing to 3 concurrent thread
104+
.close();
105+
106+
@Test
107+
void run() {
108+
AtomicInteger counter = new AtomicInteger(0);
109+
long startTime = System.currentTimeMillis();
110+
System.out.printf("time:%s, start.\n", startTime);
111+
ConverseLatch<String> result = flow.converse(new FlowSession(false)).doOnConsume(answer -> {
112+
System.out.printf("time:%s, chunk=%s\n", System.currentTimeMillis(), answer);
113+
counter.incrementAndGet();
114+
}).offer(Tip.fromArray("hi"));
115+
result.await();
116+
System.out.printf("time:%s, cost=%s\n", System.currentTimeMillis(), System.currentTimeMillis() - startTime);
117+
Assertions.assertEquals(10, counter.get());
118+
}
119+
120+
private String mockDesensitize(ChatMessage chunk) {
121+
// Simulate slower processing at 1/3 speed of LLM generation.
122+
SleepUtil.sleep(150);
123+
return chunk.text().replace("3", "*");
124+
}
125+
}
126+
}

0 commit comments

Comments
 (0)