3434 * @since 2025-06-11
3535 */
3636public class AiFlowCaseTest {
37+ private static final int SPEED = 1 ;
3738 @ Nested
3839 class DesensitizeCase {
3940 private final ChatFlowModel model = new ChatFlowModel ((prompt , chatOption ) -> Choir .create (emitter -> {
4041 emitter .emit (new AiMessage ("<think>" ));
41- for (int i = 0 ; i < 10 ; i ++) {
42+ int takeTime = 10 * SPEED ;
43+ SleepUtil .sleep (takeTime );
44+ for (int i = 0 ; i < 48 ; i ++) {
4245 emitter .emit (new AiMessage (String .valueOf (i )));
43- SleepUtil .sleep (100 );
46+ SleepUtil .sleep (takeTime );
4447 }
4548 emitter .emit (new AiMessage ("</think>" ));
46- for (int i = 100 ; i < 110 ; i ++) {
49+ SleepUtil .sleep (takeTime );
50+ for (int i = 100 ; i < 150 ; i ++) {
4751 emitter .emit (new AiMessage (String .valueOf (i )));
48- SleepUtil .sleep (100 );
52+ SleepUtil .sleep (takeTime );
4953 }
5054 emitter .complete ();
5155 }), ChatOption .custom ().model ("modelName" ).stream (true ).build ());
@@ -60,7 +64,8 @@ class DesensitizeCase {
6064 this .log (input );
6165 return input ;
6266 })
63- .map (this ::mockDesensitize )
67+ .map (this ::mockDesensitize1 )
68+ .map (this ::mockDesensitize2 )
6469 .close ();
6570
6671 @ Test
@@ -74,7 +79,7 @@ void run() {
7479 }).offer (Tip .fromArray ("hi" ));
7580 result .await ();
7681 System .out .printf ("time:%s, cost=%s\n " , System .currentTimeMillis (), System .currentTimeMillis () - startTime );
77- Assertions .assertEquals (22 , counter .get ());
82+ Assertions .assertEquals (100 , counter .get ());
7883 }
7984
8085 private Chunk classic (ChatMessage message , StateContext ctx ) {
@@ -92,10 +97,16 @@ private Chunk classic(ChatMessage message, StateContext ctx) {
9297 return new Chunk (false , message .text ());
9398 }
9499
95- private String mockDesensitize (Chunk chunk ) {
100+ private String mockDesensitize1 (Chunk chunk ) {
101+ SleepUtil .sleep (10 * SPEED );
96102 return chunk .content .replace ("3" , "*" );
97103 }
98104
105+ private String mockDesensitize2 (String chunk ) {
106+ SleepUtil .sleep (10 * SPEED );
107+ return chunk .replace ("4" , "*" );
108+ }
109+
99110 private void log (Chunk chunk ) {
100111 System .out .println ("log content:" + chunk .content );
101112 }
@@ -121,9 +132,9 @@ private static class Chunk {
121132 @ Nested
122133 class BackPressureCase {
123134 private final ChatFlowModel model = new ChatFlowModel ((prompt , chatOption ) -> Choir .create (emitter -> {
124- for (int i = 0 ; i < 10 ; i ++) {
135+ for (int i = 0 ; i < 100 ; i ++) {
125136 emitter .emit (new AiMessage (String .valueOf (i )));
126- SleepUtil .sleep (50 );
137+ SleepUtil .sleep (5 * SPEED );
127138 }
128139 emitter .complete ();
129140 System .out .printf ("time:%s, generate completed.\n " , System .currentTimeMillis ());
@@ -132,6 +143,7 @@ class BackPressureCase {
132143 private final AiProcessFlow <Tip , String > flow = AiFlows .<Tip >create ()
133144 .prompt (Prompts .human ("{{0}}" ))
134145 .generate (model )
146+ .map (this ::mockDesensitize ).concurrency (1 ) // Limit processing to 1 concurrent thread
135147 .map (this ::mockTTS ).concurrency (1 ) // Limit processing to 1 concurrent thread
136148 .close ();
137149
@@ -146,13 +158,19 @@ void run() {
146158 }).offer (Tip .fromArray ("hi" ));
147159 result .await ();
148160 System .out .printf ("time:%s, cost=%s\n " , System .currentTimeMillis (), System .currentTimeMillis () - startTime );
149- Assertions .assertEquals (10 , counter .get ());
161+ Assertions .assertEquals (100 , counter .get ());
162+ }
163+
164+ private String mockDesensitize (ChatMessage chunk ) {
165+ // Simulate time-consuming operation with a delay.
166+ SleepUtil .sleep (10 * SPEED );
167+ return chunk .text ().replace ("3" , "*" );
150168 }
151169
152- private String mockTTS (ChatMessage chunk ) {
170+ private String mockTTS (String chunk ) {
153171 // Simulate time-consuming operation with a delay.
154- SleepUtil .sleep (100 );
155- return chunk . text () ;
172+ SleepUtil .sleep (10 * SPEED );
173+ return chunk ;
156174 }
157175 }
158176
@@ -167,9 +185,9 @@ private String mockTTS(ChatMessage chunk) {
167185 @ Nested
168186 class ConcurrencyCase {
169187 private final ChatFlowModel model = new ChatFlowModel ((prompt , chatOption ) -> Choir .create (emitter -> {
170- for (int i = 0 ; i < 10 ; i ++) {
188+ for (int i = 0 ; i < 100 ; i ++) {
171189 emitter .emit (new AiMessage (String .valueOf (i )));
172- SleepUtil .sleep (50 );
190+ SleepUtil .sleep (10 * SPEED );
173191 }
174192 emitter .complete ();
175193 }), ChatOption .custom ().model ("modelName" ).stream (true ).build ());
@@ -191,12 +209,12 @@ void run() {
191209 }).offer (Tip .fromArray ("hi" ));
192210 result .await ();
193211 System .out .printf ("time:%s, cost=%s\n " , System .currentTimeMillis (), System .currentTimeMillis () - startTime );
194- Assertions .assertEquals (10 , counter .get ());
212+ Assertions .assertEquals (100 , counter .get ());
195213 }
196214
197215 private String mockDesensitize (ChatMessage chunk ) {
198216 // Simulate slower processing at 1/3 speed of LLM generation.
199- SleepUtil .sleep (150 );
217+ SleepUtil .sleep (30 * SPEED );
200218 return chunk .text ().replace ("3" , "*" );
201219 }
202220 }
0 commit comments