Skip to content

Commit ce04b44

Browse files
committed
fix(agent): preserve cumulative stream history after tool calls
1 parent 60f3909 commit ce04b44

2 files changed

Lines changed: 1454 additions & 1108 deletions

File tree

Lines changed: 217 additions & 163 deletions
Original file line numberDiff line numberDiff line change
@@ -1,163 +1,217 @@
1-
/*
2-
* Copyright 2024-2026 the original author or authors.
3-
*
4-
* Licensed under the Apache License, Version 2.0 (the "License");
5-
* you may not use this file except in compliance with the License.
6-
* You may obtain a copy of the License at
7-
*
8-
* http://www.apache.org/licenses/LICENSE-2.0
9-
*
10-
* Unless required by applicable law or agreed to in writing, software
11-
* distributed under the License is distributed on an "AS IS" BASIS,
12-
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13-
* See the License for the specific language governing permissions and
14-
* limitations under the License.
15-
*/
16-
package io.agentscope.core.agent;
17-
18-
import io.agentscope.core.hook.ActingChunkEvent;
19-
import io.agentscope.core.hook.Hook;
20-
import io.agentscope.core.hook.HookEvent;
21-
import io.agentscope.core.hook.PostActingEvent;
22-
import io.agentscope.core.hook.PostReasoningEvent;
23-
import io.agentscope.core.hook.PostSummaryEvent;
24-
import io.agentscope.core.hook.ReasoningChunkEvent;
25-
import io.agentscope.core.hook.SummaryChunkEvent;
26-
import io.agentscope.core.message.ContentBlock;
27-
import io.agentscope.core.message.Msg;
28-
import io.agentscope.core.message.MsgRole;
29-
import io.agentscope.core.message.ToolResultBlock;
30-
import java.util.ArrayList;
31-
import java.util.HashMap;
32-
import java.util.List;
33-
import java.util.Map;
34-
import reactor.core.publisher.FluxSink;
35-
import reactor.core.publisher.Mono;
36-
37-
/**
38-
* Internal hook implementation for streaming events.
39-
*
40-
* <p>Intercepts hook callbacks and emits {@link Event} instances to a FluxSink. Handles event
41-
* filtering and chunk mode processing.
42-
*/
43-
class StreamingHook implements Hook {
44-
45-
private final FluxSink<Event> sink;
46-
private final StreamOptions options;
47-
48-
// Track previous content for incremental mode
49-
private final Map<String, List<ContentBlock>> previousContent = new HashMap<>();
50-
51-
/**
52-
* Creates a new streaming hook.
53-
*
54-
* @param sink The FluxSink to emit events to
55-
* @param options Configuration options for streaming
56-
*/
57-
StreamingHook(FluxSink<Event> sink, StreamOptions options) {
58-
this.sink = sink;
59-
this.options = options;
60-
}
61-
62-
@Override
63-
public <T extends HookEvent> Mono<T> onEvent(T event) {
64-
if (event instanceof PostReasoningEvent) {
65-
PostReasoningEvent e = (PostReasoningEvent) event;
66-
// postReasoning is called after streaming completes
67-
// This is the last/complete message
68-
if (options.shouldStream(EventType.REASONING)
69-
&& options.shouldIncludeReasoningEmission(false)) {
70-
emitEvent(EventType.REASONING, e.getReasoningMessage(), true);
71-
}
72-
return Mono.just(event);
73-
} else if (event instanceof ReasoningChunkEvent) {
74-
ReasoningChunkEvent e = (ReasoningChunkEvent) event;
75-
// This is an intermediate chunk
76-
if (options.shouldStream(EventType.REASONING)
77-
&& options.shouldIncludeReasoningEmission(true)) {
78-
// Use incremental or accumulated based on StreamOptions
79-
Msg msgToEmit =
80-
options.isIncremental() ? e.getIncrementalChunk() : e.getAccumulated();
81-
emitEvent(EventType.REASONING, msgToEmit, false);
82-
}
83-
return Mono.just(event);
84-
} else if (event instanceof PostActingEvent) {
85-
PostActingEvent e = (PostActingEvent) event;
86-
// Tool execution completed
87-
if (options.shouldStream(EventType.TOOL_RESULT)) {
88-
Msg toolMsg = createToolMessage(e.getToolResult());
89-
emitEvent(EventType.TOOL_RESULT, toolMsg, true);
90-
}
91-
return Mono.just(event);
92-
} else if (event instanceof ActingChunkEvent) {
93-
ActingChunkEvent e = (ActingChunkEvent) event;
94-
// Intermediate tool chunk
95-
if (options.shouldStream(EventType.TOOL_RESULT) && options.isIncludeActingChunk()) {
96-
Msg toolMsg = createToolMessage(e.getChunk());
97-
emitEvent(EventType.TOOL_RESULT, toolMsg, false);
98-
}
99-
return Mono.just(event);
100-
} else if (event instanceof PostSummaryEvent) {
101-
PostSummaryEvent e = (PostSummaryEvent) event;
102-
// Summary generation completed
103-
if (options.shouldStream(EventType.SUMMARY)
104-
&& options.shouldIncludeSummaryEmission(false)) {
105-
emitEvent(EventType.SUMMARY, e.getSummaryMessage(), true);
106-
}
107-
return Mono.just(event);
108-
} else if (event instanceof SummaryChunkEvent) {
109-
SummaryChunkEvent e = (SummaryChunkEvent) event;
110-
// Intermediate summary chunk
111-
if (options.shouldStream(EventType.SUMMARY)
112-
&& options.shouldIncludeSummaryEmission(true)) {
113-
// Use incremental or accumulated based on StreamOptions
114-
Msg msgToEmit =
115-
options.isIncremental() ? e.getIncrementalChunk() : e.getAccumulated();
116-
emitEvent(EventType.SUMMARY, msgToEmit, false);
117-
}
118-
return Mono.just(event);
119-
}
120-
return Mono.just(event);
121-
}
122-
123-
// ========== Helper Methods ==========
124-
125-
/**
126-
* Creates a tool message from a tool result block.
127-
*
128-
* @param toolResultBlock The tool result or chunk
129-
* @return A message with TOOL role containing the result
130-
*/
131-
private Msg createToolMessage(ToolResultBlock toolResultBlock) {
132-
return Msg.builder()
133-
.name("system")
134-
.role(MsgRole.TOOL)
135-
.content(List.of(toolResultBlock))
136-
.build();
137-
}
138-
139-
/**
140-
* Emit an event to the sink.
141-
*
142-
* @param type The event type
143-
* @param msg The message
144-
* @param isLast Whether this is the last/complete message in the stream
145-
*/
146-
private void emitEvent(EventType type, Msg msg, boolean isLast) {
147-
Msg processedMsg = msg;
148-
149-
// For incremental mode, calculate the diff (if needed in the future)
150-
// Currently we directly use the incremental chunk from ReasoningChunkEvent
151-
152-
// Create and emit the event
153-
Event event = new Event(type, processedMsg, isLast);
154-
sink.next(event);
155-
156-
// Update tracking
157-
if (!isLast) {
158-
previousContent.put(msg.getId(), new ArrayList<>(msg.getContent()));
159-
} else {
160-
previousContent.remove(msg.getId());
161-
}
162-
}
163-
}
1+
/*
2+
* Copyright 2024-2026 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.agentscope.core.agent;
17+
18+
import io.agentscope.core.hook.ActingChunkEvent;
19+
import io.agentscope.core.hook.Hook;
20+
import io.agentscope.core.hook.HookEvent;
21+
import io.agentscope.core.hook.PostActingEvent;
22+
import io.agentscope.core.hook.PostReasoningEvent;
23+
import io.agentscope.core.hook.PostSummaryEvent;
24+
import io.agentscope.core.hook.ReasoningChunkEvent;
25+
import io.agentscope.core.hook.SummaryChunkEvent;
26+
import io.agentscope.core.message.ContentBlock;
27+
import io.agentscope.core.message.Msg;
28+
import io.agentscope.core.message.MsgRole;
29+
import io.agentscope.core.message.TextBlock;
30+
import io.agentscope.core.message.ThinkingBlock;
31+
import io.agentscope.core.message.ToolResultBlock;
32+
import io.agentscope.core.message.ToolUseBlock;
33+
import java.util.ArrayList;
34+
import java.util.HashMap;
35+
import java.util.List;
36+
import java.util.Map;
37+
import reactor.core.publisher.FluxSink;
38+
import reactor.core.publisher.Mono;
39+
40+
/**
41+
* Internal hook implementation for streaming events.
42+
*
43+
* <p>Intercepts hook callbacks and emits {@link Event} instances to a FluxSink. Handles event
44+
* filtering and chunk mode processing.
45+
*/
46+
class StreamingHook implements Hook {
47+
48+
private final FluxSink<Event> sink;
49+
private final StreamOptions options;
50+
51+
// Track previous content for incremental mode
52+
private final Map<String, List<ContentBlock>> previousContent = new HashMap<>();
53+
54+
// Track cumulative reasoning content across ReAct reasoning/acting boundaries.
55+
private final List<ContentBlock> cumulativeReasoningContent = new ArrayList<>();
56+
private final Map<String, Integer> cumulativeReasoningPositions = new HashMap<>();
57+
58+
/**
59+
* Creates a new streaming hook.
60+
*
61+
* @param sink The FluxSink to emit events to
62+
* @param options Configuration options for streaming
63+
*/
64+
StreamingHook(FluxSink<Event> sink, StreamOptions options) {
65+
this.sink = sink;
66+
this.options = options;
67+
}
68+
69+
@Override
70+
public <T extends HookEvent> Mono<T> onEvent(T event) {
71+
if (event instanceof PostReasoningEvent) {
72+
PostReasoningEvent e = (PostReasoningEvent) event;
73+
// postReasoning is called after streaming completes
74+
// This is the last/complete message
75+
if (options.shouldStream(EventType.REASONING)
76+
&& options.shouldIncludeReasoningEmission(false)) {
77+
Msg msgToEmit =
78+
options.isIncremental()
79+
? e.getReasoningMessage()
80+
: accumulateReasoning(e.getReasoningMessage());
81+
emitEvent(EventType.REASONING, msgToEmit, true);
82+
}
83+
return Mono.just(event);
84+
} else if (event instanceof ReasoningChunkEvent) {
85+
ReasoningChunkEvent e = (ReasoningChunkEvent) event;
86+
// This is an intermediate chunk
87+
if (options.shouldStream(EventType.REASONING)
88+
&& options.shouldIncludeReasoningEmission(true)) {
89+
// Use incremental or accumulated based on StreamOptions
90+
Msg msgToEmit =
91+
options.isIncremental()
92+
? e.getIncrementalChunk()
93+
: accumulateReasoning(e.getAccumulated());
94+
emitEvent(EventType.REASONING, msgToEmit, false);
95+
}
96+
return Mono.just(event);
97+
} else if (event instanceof PostActingEvent) {
98+
PostActingEvent e = (PostActingEvent) event;
99+
// Tool execution completed
100+
if (options.shouldStream(EventType.TOOL_RESULT)) {
101+
Msg toolMsg = createToolMessage(e.getToolResult());
102+
emitEvent(EventType.TOOL_RESULT, toolMsg, true);
103+
}
104+
return Mono.just(event);
105+
} else if (event instanceof ActingChunkEvent) {
106+
ActingChunkEvent e = (ActingChunkEvent) event;
107+
// Intermediate tool chunk
108+
if (options.shouldStream(EventType.TOOL_RESULT) && options.isIncludeActingChunk()) {
109+
Msg toolMsg = createToolMessage(e.getChunk());
110+
emitEvent(EventType.TOOL_RESULT, toolMsg, false);
111+
}
112+
return Mono.just(event);
113+
} else if (event instanceof PostSummaryEvent) {
114+
PostSummaryEvent e = (PostSummaryEvent) event;
115+
// Summary generation completed
116+
if (options.shouldStream(EventType.SUMMARY)
117+
&& options.shouldIncludeSummaryEmission(false)) {
118+
emitEvent(EventType.SUMMARY, e.getSummaryMessage(), true);
119+
}
120+
return Mono.just(event);
121+
} else if (event instanceof SummaryChunkEvent) {
122+
SummaryChunkEvent e = (SummaryChunkEvent) event;
123+
// Intermediate summary chunk
124+
if (options.shouldStream(EventType.SUMMARY)
125+
&& options.shouldIncludeSummaryEmission(true)) {
126+
// Use incremental or accumulated based on StreamOptions
127+
Msg msgToEmit =
128+
options.isIncremental() ? e.getIncrementalChunk() : e.getAccumulated();
129+
emitEvent(EventType.SUMMARY, msgToEmit, false);
130+
}
131+
return Mono.just(event);
132+
}
133+
return Mono.just(event);
134+
}
135+
136+
// ========== Helper Methods ==========
137+
138+
/**
139+
* Creates a tool message from a tool result block.
140+
*
141+
* @param toolResultBlock The tool result or chunk
142+
* @return A message with TOOL role containing the result
143+
*/
144+
private Msg createToolMessage(ToolResultBlock toolResultBlock) {
145+
return Msg.builder()
146+
.name("system")
147+
.role(MsgRole.TOOL)
148+
.content(List.of(toolResultBlock))
149+
.build();
150+
}
151+
152+
private Msg accumulateReasoning(Msg reasoningMsg) {
153+
for (int index = 0; index < reasoningMsg.getContent().size(); index++) {
154+
ContentBlock block = reasoningMsg.getContent().get(index);
155+
String key = reasoningContentKey(reasoningMsg.getId(), block, index);
156+
Integer position = cumulativeReasoningPositions.get(key);
157+
158+
if (position == null) {
159+
cumulativeReasoningPositions.put(key, cumulativeReasoningContent.size());
160+
cumulativeReasoningContent.add(block);
161+
} else {
162+
cumulativeReasoningContent.set(position, block);
163+
}
164+
}
165+
166+
return Msg.builder()
167+
.id(reasoningMsg.getId())
168+
.name(reasoningMsg.getName())
169+
.role(reasoningMsg.getRole())
170+
.content(new ArrayList<>(cumulativeReasoningContent))
171+
.metadata(new HashMap<>(reasoningMsg.getMetadata()))
172+
.timestamp(reasoningMsg.getTimestamp())
173+
.build();
174+
}
175+
176+
private String reasoningContentKey(String messageId, ContentBlock block, int index) {
177+
if (block instanceof ThinkingBlock) {
178+
return messageId + ":thinking";
179+
}
180+
if (block instanceof TextBlock) {
181+
return messageId + ":text";
182+
}
183+
if (block instanceof ToolUseBlock toolUseBlock) {
184+
String toolCallId = toolUseBlock.getId();
185+
if (toolCallId == null || toolCallId.isBlank()) {
186+
toolCallId = toolUseBlock.getName() + ":" + index;
187+
}
188+
return messageId + ":tool:" + toolCallId;
189+
}
190+
return messageId + ":" + block.getClass().getName() + ":" + index;
191+
}
192+
193+
/**
194+
* Emit an event to the sink.
195+
*
196+
* @param type The event type
197+
* @param msg The message
198+
* @param isLast Whether this is the last/complete message in the stream
199+
*/
200+
private void emitEvent(EventType type, Msg msg, boolean isLast) {
201+
Msg processedMsg = msg;
202+
203+
// For incremental mode, calculate the diff (if needed in the future)
204+
// Currently we directly use the incremental chunk from ReasoningChunkEvent
205+
206+
// Create and emit the event
207+
Event event = new Event(type, processedMsg, isLast);
208+
sink.next(event);
209+
210+
// Update tracking
211+
if (!isLast) {
212+
previousContent.put(msg.getId(), new ArrayList<>(msg.getContent()));
213+
} else {
214+
previousContent.remove(msg.getId());
215+
}
216+
}
217+
}

0 commit comments

Comments
 (0)