Skip to content

Commit 987dca1

Browse files
authored
feat(chat): Enhance message content retrieval and streaming output (#193)
1 parent bcc2dcd commit 987dca1

6 files changed

Lines changed: 186 additions & 39 deletions

File tree

agentscope-core/src/main/java/io/agentscope/core/session/JsonSession.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.HashMap;
2525
import java.util.List;
2626
import java.util.Map;
27+
import java.util.stream.Collectors;
2728
import java.util.stream.Stream;
2829
import reactor.core.publisher.Mono;
2930
import reactor.core.scheduler.Schedulers;
@@ -228,7 +229,7 @@ public List<String> listSessions() {
228229
// extension
229230
})
230231
.sorted()
231-
.collect(java.util.stream.Collectors.toList());
232+
.collect(Collectors.toList());
232233
}
233234
} catch (IOException e) {
234235
throw new RuntimeException("Failed to list sessions", e);

agentscope-core/src/test/java/io/agentscope/core/agent/test/TestUtils.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import io.agentscope.core.message.ToolUseBlock;
2424
import java.util.HashMap;
2525
import java.util.Map;
26+
import java.util.stream.Collectors;
2627

2728
/**
2829
* Utility methods for Agent module tests.
@@ -115,7 +116,7 @@ public static String extractTextContent(Msg msg) {
115116
return "";
116117
})
117118
.filter(s -> !s.isEmpty())
118-
.collect(java.util.stream.Collectors.joining("\n"));
119+
.collect(Collectors.joining("\n"));
119120
}
120121

121122
/**

agentscope-core/src/test/java/io/agentscope/core/tool/method/MethodToolExample.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import io.agentscope.core.message.TextBlock;
2323
import io.agentscope.core.tool.ExampleConfig;
2424
import io.agentscope.core.tool.Toolkit;
25+
import java.util.stream.Collectors;
2526
import org.slf4j.Logger;
2627
import org.slf4j.LoggerFactory;
2728

@@ -86,7 +87,7 @@ public static void main(String[] args) {
8687
response.getContent().stream()
8788
.filter(block -> block instanceof TextBlock)
8889
.map(block -> ((TextBlock) block).getText())
89-
.collect(java.util.stream.Collectors.joining("\n")));
90+
.collect(Collectors.joining("\n")));
9091
}
9192

9293
log.info("✅ Tool example completed successfully!");

agentscope-examples/quickstart/src/main/java/io/agentscope/examples/quickstart/ExampleUtils.java

Lines changed: 145 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -16,23 +16,29 @@
1616
package io.agentscope.examples.quickstart;
1717

1818
import io.agentscope.core.agent.Agent;
19+
import io.agentscope.core.message.ContentBlock;
1920
import io.agentscope.core.message.Msg;
2021
import io.agentscope.core.message.MsgRole;
2122
import io.agentscope.core.message.TextBlock;
23+
import io.agentscope.core.message.ThinkingBlock;
2224
import io.agentscope.examples.quickstart.util.MsgUtils;
2325
import java.io.BufferedReader;
2426
import java.io.IOException;
2527
import java.io.InputStreamReader;
28+
import java.util.concurrent.atomic.AtomicBoolean;
29+
import java.util.concurrent.atomic.AtomicReference;
30+
import java.util.stream.Collectors;
2631

2732
/**
2833
* Utility class providing common functionality for examples.
2934
*
30-
* <p>Features:
35+
* <p>
36+
* Features:
3137
*
3238
* <ul>
33-
* <li>Interactive API key configuration
34-
* <li>Chat loop implementation
35-
* <li>Helper methods for user interaction
39+
* <li>Interactive API key configuration
40+
* <li>Chat loop implementation
41+
* <li>Helper methods for user interaction
3642
* </ul>
3743
*/
3844
public class ExampleUtils {
@@ -64,9 +70,9 @@ public static String getOpenAIApiKey() throws IOException {
6470
/**
6571
* Get API key from environment variable or interactive input.
6672
*
67-
* @param envVarName environment variable name
73+
* @param envVarName environment variable name
6874
* @param serviceName service name for display
69-
* @param helpUrl URL to get API key
75+
* @param helpUrl URL to get API key
7076
* @return API key
7177
* @throws IOException if input fails
7278
*/
@@ -144,15 +150,96 @@ public static void startChat(Agent agent) throws IOException {
144150
.content(TextBlock.builder().text(input).build())
145151
.build();
146152

147-
Msg response = agent.call(userMsg).block();
153+
System.out.print("Agent> ");
148154

149-
if (response != null) {
150-
System.out.println("Agent> " + MsgUtils.getTextContent(response) + "\n");
151-
} else {
152-
System.out.println("Agent> [No response]\n");
155+
try {
156+
// Try to use stream() first for real-time output
157+
AtomicBoolean hasPrintedThinkingHeader = new AtomicBoolean(false);
158+
AtomicBoolean hasPrintedTextHeader = new AtomicBoolean(false);
159+
AtomicBoolean hasPrintedTextSeparator = new AtomicBoolean(false);
160+
AtomicReference<String> lastThinkingContent = new AtomicReference<>("");
161+
AtomicReference<String> lastTextContent = new AtomicReference<>("");
162+
163+
agent.stream(userMsg)
164+
.doOnNext(
165+
event -> {
166+
Msg msg = event.getMessage();
167+
for (ContentBlock block : msg.getContent()) {
168+
if (block instanceof ThinkingBlock) {
169+
printStreamContent(
170+
((ThinkingBlock) block).getThinking(),
171+
lastThinkingContent,
172+
hasPrintedThinkingHeader,
173+
"> Thinking: ",
174+
null);
175+
} else if (block instanceof TextBlock) {
176+
printStreamContent(
177+
((TextBlock) block).getText(),
178+
lastTextContent,
179+
hasPrintedTextHeader,
180+
"Text: ",
181+
() -> {
182+
if (hasPrintedThinkingHeader.get()
183+
&& !hasPrintedTextSeparator
184+
.get()) {
185+
System.out.print("\n\n");
186+
hasPrintedTextSeparator.set(true);
187+
}
188+
});
189+
}
190+
}
191+
})
192+
.blockLast();
193+
} catch (Exception e) {
194+
// Fallback to call() if streaming is not supported or fails
195+
if (e instanceof UnsupportedOperationException) {
196+
System.err.println(
197+
"\n[Info] Streaming not supported by this agent. Falling back to"
198+
+ " call().");
199+
} else {
200+
System.err.println(
201+
"\n[Warning] Exception during streaming: " + e.getMessage());
202+
e.printStackTrace();
203+
System.err.println("[Info] Falling back to call().");
204+
}
205+
206+
Msg response = agent.call(userMsg).block();
207+
if (response != null) {
208+
// Extract thinking and text separately to match streaming format
209+
String thinking =
210+
response.getContent().stream()
211+
.filter(block -> block instanceof ThinkingBlock)
212+
.map(block -> ((ThinkingBlock) block).getThinking())
213+
.collect(Collectors.joining("\n"));
214+
215+
String text =
216+
response.getContent().stream()
217+
.filter(block -> block instanceof TextBlock)
218+
.map(block -> ((TextBlock) block).getText())
219+
.collect(Collectors.joining("\n"));
220+
221+
boolean hasContent = false;
222+
if (!thinking.isEmpty()) {
223+
System.out.print("> Thinking: " + thinking);
224+
hasContent = true;
225+
}
226+
if (!text.isEmpty()) {
227+
if (hasContent) {
228+
System.out.print("\n\n");
229+
}
230+
System.out.print("Text: " + text);
231+
hasContent = true;
232+
}
233+
if (!hasContent) {
234+
System.out.print("[No response]");
235+
}
236+
}
153237
}
238+
239+
System.out.println("\n");
240+
154241
} catch (Exception e) {
155-
System.err.println("Error: " + e.getMessage());
242+
System.err.println("\nError: " + e.getMessage());
156243
e.printStackTrace();
157244
}
158245
}
@@ -171,7 +258,7 @@ public static String readLine() throws IOException {
171258
/**
172259
* Print a welcome banner.
173260
*
174-
* @param title example title
261+
* @param title example title
175262
* @param description example description
176263
*/
177264
public static void printWelcome(String title, String description) {
@@ -189,4 +276,49 @@ public static void printWelcome(String title, String description) {
189276
public static String extractTextFromMsg(Msg msg) {
190277
return MsgUtils.getTextContent(msg);
191278
}
279+
280+
/**
281+
* Helper method to print streaming content.
282+
*
283+
* @param content content to print
284+
* @param lastContentRef reference to the last content for delta
285+
* calculation
286+
* @param hasPrintedHeaderRef reference to whether the header has been printed
287+
* @param header header to print
288+
* @param prePrintAction action to run before printing (e.g., adding
289+
* separators)
290+
*/
291+
private static void printStreamContent(
292+
String content,
293+
AtomicReference<String> lastContentRef,
294+
AtomicBoolean hasPrintedHeaderRef,
295+
String header,
296+
Runnable prePrintAction) {
297+
String lastContent = lastContentRef.get();
298+
String toPrint;
299+
300+
// Detect if cumulative or incremental
301+
if (content.startsWith(lastContent)) {
302+
// Cumulative: print only new part
303+
toPrint = content.substring(lastContent.length());
304+
lastContentRef.set(content);
305+
} else {
306+
// Incremental: print as-is and append
307+
toPrint = content;
308+
lastContentRef.set(lastContent + content);
309+
}
310+
311+
if (!toPrint.isEmpty()) {
312+
if (prePrintAction != null) {
313+
prePrintAction.run();
314+
}
315+
316+
if (!hasPrintedHeaderRef.get()) {
317+
System.out.print(header);
318+
hasPrintedHeaderRef.set(true);
319+
}
320+
System.out.print(toPrint);
321+
System.out.flush();
322+
}
323+
}
192324
}

agentscope-examples/quickstart/src/main/java/io/agentscope/examples/quickstart/util/MsgUtils.java

Lines changed: 33 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -26,31 +26,42 @@
2626
import java.util.stream.Collectors;
2727

2828
/**
29-
* Utility methods for working with Msg in examples. These are convenience methods for common
29+
* Utility methods for working with Msg in examples. These are convenience
30+
* methods for common
3031
* operations.
3132
*/
3233
public class MsgUtils {
3334

3435
/**
35-
* Extract text content from a message. Concatenates text from all text-containing blocks
36+
* Extract text content from a message. Concatenates text from all
37+
* text-containing blocks
3638
* (TextBlock and ThinkingBlock).
3739
*
3840
* @param msg The message to extract text from
3941
* @return Concatenated text content or empty string if not available
4042
*/
4143
public static String getTextContent(Msg msg) {
42-
return msg.getContent().stream()
43-
.map(
44-
block -> {
45-
if (block instanceof TextBlock) {
46-
return "Text: " + ((TextBlock) block).getText();
47-
} else if (block instanceof ThinkingBlock) {
48-
return "Thinking: " + ((ThinkingBlock) block).getThinking();
49-
}
50-
return "";
51-
})
52-
.filter(s -> !s.isEmpty())
53-
.collect(Collectors.joining("\n"));
44+
String thinking =
45+
msg.getContent().stream()
46+
.filter(block -> block instanceof ThinkingBlock)
47+
.map(block -> ((ThinkingBlock) block).getThinking())
48+
.collect(Collectors.joining("\n"));
49+
50+
String text =
51+
msg.getContent().stream()
52+
.filter(block -> block instanceof TextBlock)
53+
.map(block -> ((TextBlock) block).getText())
54+
.collect(Collectors.joining("\n"));
55+
56+
if (!thinking.isEmpty() && !text.isEmpty()) {
57+
return thinking + "\n\n" + text;
58+
} else if (!thinking.isEmpty()) {
59+
return thinking;
60+
} else if (!text.isEmpty()) {
61+
return text;
62+
} else {
63+
return "[No response]";
64+
}
5465
}
5566

5667
/**
@@ -98,8 +109,8 @@ public static Msg textMsg(String name, MsgRole role, String text) {
98109
/**
99110
* Create a message with image content (convenience method).
100111
*
101-
* @param name Sender name
102-
* @param role Message role
112+
* @param name Sender name
113+
* @param role Message role
103114
* @param source Image source
104115
* @return Message with image content
105116
*/
@@ -114,8 +125,8 @@ public static Msg imageMsg(String name, MsgRole role, Source source) {
114125
/**
115126
* Create a message with audio content (convenience method).
116127
*
117-
* @param name Sender name
118-
* @param role Message role
128+
* @param name Sender name
129+
* @param role Message role
119130
* @param source Audio source
120131
* @return Message with audio content
121132
*/
@@ -130,8 +141,8 @@ public static Msg audioMsg(String name, MsgRole role, Source source) {
130141
/**
131142
* Create a message with video content (convenience method).
132143
*
133-
* @param name Sender name
134-
* @param role Message role
144+
* @param name Sender name
145+
* @param role Message role
135146
* @param source Video source
136147
* @return Message with video content
137148
*/
@@ -146,8 +157,8 @@ public static Msg videoMsg(String name, MsgRole role, Source source) {
146157
/**
147158
* Create a message with thinking content (convenience method).
148159
*
149-
* @param name Sender name
150-
* @param role Message role
160+
* @param name Sender name
161+
* @param role Message role
151162
* @param thinking Thinking content
152163
* @return Message with thinking content
153164
*/

agentscope-extensions/agentscope-extensions-rag-simple/src/test/java/io/agentscope/core/rag/e2e/TestUtils.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import io.agentscope.core.message.ToolUseBlock;
2424
import java.util.HashMap;
2525
import java.util.Map;
26+
import java.util.stream.Collectors;
2627

2728
/**
2829
* Utility methods for Agent module tests.
@@ -115,7 +116,7 @@ public static String extractTextContent(Msg msg) {
115116
return "";
116117
})
117118
.filter(s -> !s.isEmpty())
118-
.collect(java.util.stream.Collectors.joining("\n"));
119+
.collect(Collectors.joining("\n"));
119120
}
120121

121122
/**

0 commit comments

Comments
 (0)