Skip to content

Commit 17e963b

Browse files
feat(core): opt memory concurrency and exception handling (#196)
1 parent 022b3e5 commit 17e963b

4 files changed

Lines changed: 117 additions & 16 deletions

File tree

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
* Copyright 2024-2025 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.agentscope.core.exception;
17+
18+
import java.util.ArrayList;
19+
import java.util.Collections;
20+
import java.util.List;
21+
22+
public class CompositeAgentException extends RuntimeException {
23+
private final List<AgentExceptionInfo> causes;
24+
25+
public CompositeAgentException(String message, List<AgentExceptionInfo> causes) {
26+
super(message);
27+
this.causes = new ArrayList<>(causes != null ? causes : Collections.emptyList());
28+
}
29+
30+
@Override
31+
public String getMessage() {
32+
String baseMessage = super.getMessage();
33+
if (causes == null || causes.isEmpty()) {
34+
return baseMessage;
35+
}
36+
37+
StringBuilder sb = new StringBuilder(baseMessage);
38+
sb.append(" (caused by: ");
39+
40+
for (int i = 0; i < causes.size(); i++) {
41+
if (i > 0) {
42+
sb.append(", ");
43+
}
44+
45+
AgentExceptionInfo cause = causes.get(i);
46+
47+
if (cause.agentId() != null) {
48+
sb.append("agentId=").append(cause.agentId());
49+
if (cause.agentName() != null) {
50+
sb.append("(").append(cause.agentName()).append(")");
51+
}
52+
} else if (cause.agentName() != null) {
53+
sb.append("agentName=").append(cause.agentName());
54+
}
55+
56+
Throwable throwable = cause.throwable();
57+
if (throwable != null) {
58+
if (cause.agentId() != null || cause.agentName() != null) {
59+
sb.append(": ");
60+
}
61+
sb.append(throwable.getClass().getSimpleName())
62+
.append(": ")
63+
.append(
64+
throwable.getMessage() != null
65+
? throwable.getMessage()
66+
: "No message");
67+
} else {
68+
if (cause.agentId() != null || cause.agentName() != null) {
69+
sb.append(": ");
70+
}
71+
sb.append("Unknown error");
72+
}
73+
}
74+
sb.append(")");
75+
76+
return sb.toString();
77+
}
78+
79+
public List<AgentExceptionInfo> getCauses() {
80+
return Collections.unmodifiableList(causes);
81+
}
82+
83+
public record AgentExceptionInfo(String agentId, String agentName, Throwable throwable) {}
84+
}

agentscope-core/src/main/java/io/agentscope/core/memory/InMemoryMemory.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import java.util.List;
2323
import java.util.Map;
2424
import java.util.Objects;
25-
import java.util.concurrent.CopyOnWriteArrayList;
25+
import java.util.concurrent.ConcurrentLinkedQueue;
2626
import java.util.stream.Collectors;
2727

2828
/**
@@ -36,7 +36,7 @@
3636
*/
3737
public class InMemoryMemory extends StateModuleBase implements Memory {
3838

39-
private final List<Msg> messages = new CopyOnWriteArrayList<>();
39+
private final ConcurrentLinkedQueue<Msg> messages = new ConcurrentLinkedQueue<>();
4040
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
4141

4242
/**

agentscope-core/src/main/java/io/agentscope/core/pipeline/FanoutPipeline.java

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,11 @@
1616
package io.agentscope.core.pipeline;
1717

1818
import io.agentscope.core.agent.AgentBase;
19+
import io.agentscope.core.exception.CompositeAgentException;
1920
import io.agentscope.core.message.Msg;
2021
import java.util.ArrayList;
22+
import java.util.Collections;
2123
import java.util.List;
22-
import java.util.concurrent.atomic.AtomicReference;
2324
import reactor.core.publisher.Flux;
2425
import reactor.core.publisher.Mono;
2526
import reactor.core.scheduler.Schedulers;
@@ -87,7 +88,7 @@ public Mono<List<Msg>> execute(Msg input, Class<?> structuredOutputClass) {
8788

8889
/**
8990
* Execute agents concurrently using reactive merge with true parallelism.
90-
* All agents are executed even if some fail, but the first error is propagated.
91+
* All agents are executed even if some fail, but the all error is propagated.
9192
*
9293
* <p>Implementation: Each agent's call is subscribed on a separate thread from the
9394
* {@link Schedulers#boundedElastic()} scheduler, enabling true concurrent execution
@@ -99,7 +100,9 @@ public Mono<List<Msg>> execute(Msg input, Class<?> structuredOutputClass) {
99100
*/
100101
private Mono<List<Msg>> executeConcurrent(Msg input, Class<?> structuredOutputClass) {
101102
// Collect all agent results and errors
102-
AtomicReference<Throwable> firstError = new AtomicReference<>();
103+
104+
List<CompositeAgentException.AgentExceptionInfo> errors =
105+
Collections.synchronizedList(new ArrayList<>());
103106

104107
List<Mono<Msg>> agentMonos =
105108
agents.stream()
@@ -113,10 +116,13 @@ private Mono<List<Msg>> executeConcurrent(Msg input, Class<?> structuredOutputCl
113116

114117
return mono.subscribeOn(Schedulers.boundedElastic())
115118
.doOnError(
116-
e -> {
117-
// Capture the first error encountered
118-
firstError.compareAndSet(null, e);
119-
})
119+
throwable ->
120+
errors.add(
121+
new CompositeAgentException
122+
.AgentExceptionInfo(
123+
agent.getAgentId(),
124+
agent.getName(),
125+
throwable)))
120126
.onErrorResume(e -> Mono.empty());
121127
})
122128
.toList();
@@ -126,9 +132,11 @@ private Mono<List<Msg>> executeConcurrent(Msg input, Class<?> structuredOutputCl
126132
.flatMap(
127133
results -> {
128134
// If there was an error, propagate the first one
129-
Throwable error = firstError.get();
130-
if (error != null) {
131-
return Mono.error(error);
135+
if (!errors.isEmpty()) {
136+
return Mono.error(
137+
new CompositeAgentException(
138+
"Multiple agent execution failures occurred",
139+
errors));
132140
}
133141
return Mono.just(results);
134142
});

agentscope-core/src/test/java/io/agentscope/core/pipeline/FanoutPipelineTest.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,15 @@
1717

1818
import static org.junit.jupiter.api.Assertions.assertEquals;
1919
import static org.junit.jupiter.api.Assertions.assertFalse;
20+
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
2021
import static org.junit.jupiter.api.Assertions.assertNotNull;
2122
import static org.junit.jupiter.api.Assertions.assertThrows;
23+
import static org.junit.jupiter.api.Assertions.assertTrue;
2224

2325
import io.agentscope.core.ReActAgent;
2426
import io.agentscope.core.agent.test.MockModel;
2527
import io.agentscope.core.agent.test.TestUtils;
28+
import io.agentscope.core.exception.CompositeAgentException;
2629
import io.agentscope.core.memory.InMemoryMemory;
2730
import io.agentscope.core.message.Msg;
2831
import io.agentscope.core.tool.Toolkit;
@@ -131,7 +134,11 @@ void shouldPropagatePartialFailure() {
131134
() -> pipeline.execute(input).block(TIMEOUT),
132135
"Fanout pipeline should surface the failure");
133136

134-
assertEquals("Simulated error", exception.getMessage(), "Unexpected exception message");
137+
assertInstanceOf(CompositeAgentException.class, exception);
138+
CompositeAgentException compositeException = (CompositeAgentException) exception;
139+
assertEquals(1, compositeException.getCauses().size(), "Expected one cause");
140+
assertTrue(
141+
exception.getMessage().contains("Simulated error"), "Unexpected exception message");
135142
assertEquals(1, model1.getCallCount(), "Successful agent should still be invoked");
136143
assertEquals(1, errorModel.getCallCount(), "Failing agent should be invoked once");
137144
}
@@ -154,9 +161,11 @@ void shouldPropagateAllFailures() {
154161
// In concurrent execution, either error could be captured first
155162
String errorMsg = exception.getMessage();
156163
assertNotNull(errorMsg, "Should return an error");
157-
assertEquals(
158-
true,
159-
Set.of("Error 1", "Error 2").contains(errorMsg),
164+
assertTrue(
165+
errorMsg.contains("Error 1"),
166+
"Should return either Error 1 or Error 2, got: " + errorMsg);
167+
assertTrue(
168+
errorMsg.contains("Error 2"),
160169
"Should return either Error 1 or Error 2, got: " + errorMsg);
161170
assertEquals(1, errorModel1.getCallCount(), "First failing agent should be invoked");
162171
assertEquals(1, errorModel2.getCallCount(), "Second failing agent should be invoked");

0 commit comments

Comments
 (0)