Skip to content

Commit 0f6a1db

Browse files
authored
fix(autocontext): avoid blocking pre-reasoning compression (#1235)
## Summary - move AutoContextMemory compression off Reactor non-blocking threads before `PreReasoningEvent` continues - add an async compression entrypoint for hook integration while preserving the existing synchronous compression API - add regression coverage for hook execution and direct async compression on non-blocking schedulers ## Why this fix `AutoContextHook` currently calls `compressIfNeeded()` inline. When the hook runs on a Reactor non-blocking thread, the internal summarization flow eventually reaches `block()` inside AutoContextMemory and fails before reasoning can proceed. Running the compression work on a bounded-elastic scheduler keeps the existing compression logic intact while avoiding reactive-thread blocking errors. ## Validation - `mvn -pl agentscope-extensions/agentscope-extensions-autocontext-memory -am '-Dtest=AutoContextHookTest,AutoContextMemoryTest' test`
1 parent 9c0ff69 commit 0f6a1db

4 files changed

Lines changed: 240 additions & 11 deletions

File tree

agentscope-extensions/agentscope-extensions-autocontext-memory/src/main/java/io/agentscope/core/memory/autocontext/AutoContextHook.java

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.slf4j.Logger;
3434
import org.slf4j.LoggerFactory;
3535
import reactor.core.publisher.Mono;
36+
import reactor.core.scheduler.Schedulers;
3637

3738
/**
3839
* Hook for automatically registering AutoContextMemory integration with ReActAgent.
@@ -214,27 +215,32 @@ private Mono<PreCallEvent> handlePreCall(PreCallEvent event) {
214215
private Mono<PreReasoningEvent> handlePreReasoning(PreReasoningEvent event) {
215216
Agent agent = event.getAgent();
216217

217-
// Only process ReActAgent instances
218218
if (!(agent instanceof ReActAgent reActAgent)) {
219219
return Mono.just(event);
220220
}
221221

222-
// Get memory from agent and verify it's an AutoContextMemory instance
223222
Memory memory = reActAgent.getMemory();
224223
if (!(memory instanceof AutoContextMemory autoContextMemory)) {
225224
return Mono.just(event);
226225
}
227226

228-
// Trigger compression if needed (this modifies workingMemoryStorage in place)
229-
autoContextMemory.compressIfNeeded();
227+
return autoContextMemory
228+
.compressIfNeededAsync()
229+
.map(
230+
ignored -> {
231+
event.setInputMessages(buildInputMessages(event, autoContextMemory));
232+
return event;
233+
})
234+
.subscribeOn(Schedulers.boundedElastic());
235+
}
230236

231-
// Always append system prompt instruction about compressed messages
237+
private List<Msg> buildInputMessages(
238+
PreReasoningEvent event, AutoContextMemory autoContextMemory) {
232239
List<Msg> originalInputMessages = event.getInputMessages();
233240
List<Msg> newInputMessages = new ArrayList<>();
234241

235242
if (!originalInputMessages.isEmpty()
236243
&& originalInputMessages.get(0).getRole() == MsgRole.SYSTEM) {
237-
// Append instruction to existing system prompt
238244
Msg originalSystemMsg = originalInputMessages.get(0);
239245
String originalSystemText = originalSystemMsg.getTextContent();
240246
String appendedInstruction =
@@ -260,7 +266,6 @@ private Mono<PreReasoningEvent> handlePreReasoning(PreReasoningEvent event) {
260266

261267
newInputMessages.add(updatedSystemMsg);
262268
} else {
263-
// No system message exists, create a new one with the instruction
264269
String instruction =
265270
"You may see compressed messages containing <!-- CONTEXT_OFFLOAD uuid=..."
266271
+ " -->.\n"
@@ -276,10 +281,7 @@ private Mono<PreReasoningEvent> handlePreReasoning(PreReasoningEvent event) {
276281
.build());
277282
}
278283

279-
// Add memory messages (compressed or not)
280284
newInputMessages.addAll(autoContextMemory.getMessages());
281-
event.setInputMessages(newInputMessages);
282-
283-
return Mono.just(event);
285+
return newInputMessages;
284286
}
285287
}

agentscope-extensions/agentscope-extensions-autocontext-memory/src/main/java/io/agentscope/core/memory/autocontext/AutoContextMemory.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import org.slf4j.Logger;
4444
import org.slf4j.LoggerFactory;
4545
import reactor.core.publisher.Mono;
46+
import reactor.core.scheduler.Schedulers;
4647

4748
/**
4849
* AutoContextMemory - Intelligent context memory management system.
@@ -304,6 +305,10 @@ public boolean compressIfNeeded() {
304305
return false;
305306
}
306307

308+
Mono<Boolean> compressIfNeededAsync() {
309+
return Mono.fromCallable(this::compressIfNeeded).subscribeOn(Schedulers.boundedElastic());
310+
}
311+
307312
private List<Msg> replaceWorkingMessage(List<Msg> newMessages) {
308313
workingMemoryStorage.clear();
309314
for (Msg msg : newMessages) {

agentscope-extensions/agentscope-extensions-autocontext-memory/src/test/java/io/agentscope/core/memory/autocontext/AutoContextHookTest.java

Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@
4848
import org.mockito.Mock;
4949
import org.mockito.junit.jupiter.MockitoExtension;
5050
import reactor.core.publisher.Flux;
51+
import reactor.core.publisher.Mono;
52+
import reactor.core.scheduler.Schedulers;
5153

5254
/**
5355
* Unit tests for AutoContextHook.
@@ -543,6 +545,145 @@ void testPreReasoningEventMultipleCalls() {
543545
assertNotNull(event2);
544546
}
545547

548+
@Test
549+
@DisplayName("Should handle PreReasoningEvent on non-blocking scheduler")
550+
void testPreReasoningEventOnNonBlockingScheduler() {
551+
AutoContextMemory compressionMemory =
552+
new AutoContextMemory(
553+
createCompressionConfig(), new TestModel("Compressed summary"));
554+
populateCompressibleConversation(compressionMemory, 3);
555+
compressionMemory.addMessage(
556+
Msg.builder()
557+
.role(MsgRole.USER)
558+
.name("user")
559+
.content(TextBlock.builder().text("Latest request").build())
560+
.build());
561+
562+
ReActAgent agent =
563+
ReActAgent.builder()
564+
.name("TestAgent")
565+
.model(mockModel)
566+
.memory(compressionMemory)
567+
.toolkit(toolkit)
568+
.build();
569+
570+
PreReasoningEvent event =
571+
new PreReasoningEvent(
572+
agent,
573+
"test-model",
574+
null,
575+
new ArrayList<>(compressionMemory.getMessages()));
576+
577+
PreReasoningEvent result =
578+
Mono.defer(() -> hook.onEvent(event)).subscribeOn(Schedulers.parallel()).block();
579+
580+
assertNotNull(result);
581+
assertEquals(MsgRole.SYSTEM, result.getInputMessages().get(0).getRole());
582+
assertTrue(
583+
result.getInputMessages().get(0).getTextContent().contains("context_reload"),
584+
"System prompt should include compressed-context instructions");
585+
assertEquals(
586+
compressionMemory.getMessages().size() + 1,
587+
result.getInputMessages().size(),
588+
"Result should include system prompt plus compressed memory messages");
589+
}
590+
591+
@Test
592+
@DisplayName("Should preserve existing system prompt on non-blocking scheduler")
593+
void testPreReasoningEventPreservesSystemPromptOnNonBlockingScheduler() {
594+
AutoContextMemory compressionMemory =
595+
new AutoContextMemory(
596+
createCompressionConfig(), new TestModel("Compressed summary"));
597+
populateCompressibleConversation(compressionMemory, 3);
598+
compressionMemory.addMessage(
599+
Msg.builder()
600+
.role(MsgRole.USER)
601+
.name("user")
602+
.content(TextBlock.builder().text("Latest request").build())
603+
.build());
604+
605+
ReActAgent agent =
606+
ReActAgent.builder()
607+
.name("TestAgent")
608+
.model(mockModel)
609+
.memory(compressionMemory)
610+
.toolkit(toolkit)
611+
.build();
612+
613+
List<Msg> inputMessages = new ArrayList<>();
614+
inputMessages.add(
615+
Msg.builder()
616+
.role(MsgRole.SYSTEM)
617+
.name("system")
618+
.content(TextBlock.builder().text("Base system prompt").build())
619+
.build());
620+
inputMessages.addAll(compressionMemory.getMessages());
621+
622+
PreReasoningEvent event = new PreReasoningEvent(agent, "test-model", null, inputMessages);
623+
PreReasoningEvent result =
624+
Mono.defer(() -> hook.onEvent(event)).subscribeOn(Schedulers.parallel()).block();
625+
626+
assertNotNull(result);
627+
String updatedSystemPrompt = result.getInputMessages().get(0).getTextContent();
628+
assertTrue(updatedSystemPrompt.startsWith("Base system prompt"));
629+
assertTrue(updatedSystemPrompt.contains("context_reload"));
630+
}
631+
632+
private AutoContextConfig createCompressionConfig() {
633+
return AutoContextConfig.builder()
634+
.msgThreshold(5)
635+
.maxToken(10000)
636+
.tokenRatio(0.9)
637+
.lastKeep(2)
638+
.minConsecutiveToolMessages(10)
639+
.largePayloadThreshold(10000)
640+
.minCompressionTokenThreshold(0)
641+
.build();
642+
}
643+
644+
private void populateCompressibleConversation(AutoContextMemory memory, int rounds) {
645+
for (int i = 0; i < rounds; i++) {
646+
memory.addMessage(
647+
Msg.builder()
648+
.role(MsgRole.USER)
649+
.name("user")
650+
.content(TextBlock.builder().text("User message " + i).build())
651+
.build());
652+
memory.addMessage(
653+
Msg.builder()
654+
.role(MsgRole.ASSISTANT)
655+
.name("assistant")
656+
.content(
657+
ToolUseBlock.builder()
658+
.name("test_tool")
659+
.id("call_" + i)
660+
.input(new HashMap<>())
661+
.build())
662+
.build());
663+
memory.addMessage(
664+
Msg.builder()
665+
.role(MsgRole.TOOL)
666+
.name("test_tool")
667+
.content(
668+
ToolResultBlock.builder()
669+
.name("test_tool")
670+
.id("call_" + i)
671+
.output(
672+
List.of(
673+
TextBlock.builder()
674+
.text("Result " + i)
675+
.build()))
676+
.build())
677+
.build());
678+
memory.addMessage(
679+
Msg.builder()
680+
.role(MsgRole.ASSISTANT)
681+
.name("assistant")
682+
.content(TextBlock.builder().text("Assistant response " + i).build())
683+
.build());
684+
}
685+
}
686+
546687
/**
547688
* Simple Model implementation for testing compression.
548689
*/

agentscope-extensions/agentscope-extensions-autocontext-memory/src/test/java/io/agentscope/core/memory/autocontext/AutoContextMemoryTest.java

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@
4848
import org.junit.jupiter.api.DisplayName;
4949
import org.junit.jupiter.api.Test;
5050
import reactor.core.publisher.Flux;
51+
import reactor.core.publisher.Mono;
52+
import reactor.core.scheduler.Schedulers;
5153

5254
/**
5355
* Unit tests for AutoContextMemory.
@@ -968,8 +970,87 @@ void testCompressToolsInvocationFullCoverage() {
968970
"Should have tool compression event with plan-aware hint");
969971
}
970972

973+
@Test
974+
@DisplayName("Should compress asynchronously on non-blocking scheduler")
975+
void testCompressIfNeededAsyncOnNonBlockingScheduler() {
976+
AutoContextConfig asyncConfig =
977+
AutoContextConfig.builder()
978+
.msgThreshold(5)
979+
.maxToken(10000)
980+
.tokenRatio(0.9)
981+
.lastKeep(2)
982+
.minConsecutiveToolMessages(10)
983+
.largePayloadThreshold(10000)
984+
.minCompressionTokenThreshold(0)
985+
.build();
986+
TestModel asyncModel = new TestModel("Conversation summary");
987+
AutoContextMemory asyncMemory = new AutoContextMemory(asyncConfig, asyncModel);
988+
989+
addCompressibleConversation(asyncMemory, 3);
990+
asyncMemory.addMessage(createTextMessage("Latest request", MsgRole.USER));
991+
int initialCount = asyncMemory.getMessages().size();
992+
993+
Boolean compressed =
994+
Mono.defer(asyncMemory::compressIfNeededAsync)
995+
.subscribeOn(Schedulers.parallel())
996+
.block();
997+
998+
assertTrue(Boolean.TRUE.equals(compressed));
999+
assertTrue(asyncModel.getCallCount() >= 1, "Compression should invoke the model");
1000+
assertTrue(asyncMemory.getMessages().size() < initialCount);
1001+
assertFalse(asyncMemory.getOffloadContext().isEmpty());
1002+
}
1003+
1004+
@Test
1005+
@DisplayName("Should allow repeated async compression calls")
1006+
void testCompressIfNeededAsyncMultipleCalls() {
1007+
AutoContextConfig asyncConfig =
1008+
AutoContextConfig.builder()
1009+
.msgThreshold(5)
1010+
.maxToken(10000)
1011+
.tokenRatio(0.9)
1012+
.lastKeep(2)
1013+
.minConsecutiveToolMessages(10)
1014+
.largePayloadThreshold(10000)
1015+
.minCompressionTokenThreshold(0)
1016+
.build();
1017+
TestModel asyncModel = new TestModel("Conversation summary");
1018+
AutoContextMemory asyncMemory = new AutoContextMemory(asyncConfig, asyncModel);
1019+
1020+
addCompressibleConversation(asyncMemory, 3);
1021+
asyncMemory.addMessage(createTextMessage("Latest request", MsgRole.USER));
1022+
Boolean firstCompressed =
1023+
Mono.defer(asyncMemory::compressIfNeededAsync)
1024+
.subscribeOn(Schedulers.parallel())
1025+
.block();
1026+
1027+
addCompressibleConversation(asyncMemory, 2);
1028+
asyncMemory.addMessage(createTextMessage("Another request", MsgRole.USER));
1029+
Boolean secondCompressed =
1030+
Mono.defer(asyncMemory::compressIfNeededAsync)
1031+
.subscribeOn(Schedulers.parallel())
1032+
.block();
1033+
1034+
assertTrue(Boolean.TRUE.equals(firstCompressed));
1035+
assertTrue(Boolean.TRUE.equals(secondCompressed));
1036+
assertTrue(
1037+
asyncModel.getCallCount() >= 2, "Both async compressions should reach the model");
1038+
assertFalse(asyncMemory.getMessages().isEmpty());
1039+
}
1040+
9711041
// Helper methods
9721042

1043+
private void addCompressibleConversation(AutoContextMemory targetMemory, int rounds) {
1044+
for (int i = 0; i < rounds; i++) {
1045+
String callId = "call_" + i + "_" + targetMemory.getMessages().size();
1046+
targetMemory.addMessage(createTextMessage("User message " + i, MsgRole.USER));
1047+
targetMemory.addMessage(createToolUseMessage("test_tool", callId));
1048+
targetMemory.addMessage(createToolResultMessage("test_tool", callId, "Result " + i));
1049+
targetMemory.addMessage(
1050+
createTextMessage("Assistant response " + i, MsgRole.ASSISTANT));
1051+
}
1052+
}
1053+
9731054
private Msg createTextMessage(String text, MsgRole role) {
9741055
return Msg.builder()
9751056
.role(role)

0 commit comments

Comments
 (0)