# Patch summary (text before the first "diff --git" header is ignored by patch tools).
#
# AutoContextHook.java
#   - Removes the reactor.core.scheduler.Schedulers import.
#   - Drops the trailing .subscribeOn(Schedulers.boundedElastic()) from the chain that ends
#     handlePreReasoning.
# AutoContextMemory.java
#   - Widens compressIfNeededAsync() from package-private to public and adds Javadoc.
#     The method body (unchanged) wraps compressIfNeeded() in Mono.fromCallable and
#     subscribes it on Schedulers.boundedElastic().
# AutoContextMemoryTest.java
#   - Adds testCompressIfNeededAsyncOnNonBlockingScheduler near line 131, using the
#     memory / testModel fields of the test class.
#   - Removes the earlier test of the same name near line 970, which built its own
#     AutoContextConfig, TestModel and AutoContextMemory.
#
# NOTE(review): the added test passes if ANY of "compressed", "model was called" or
#   "fewer than 24 messages remain" holds. The removed test required compressed == true,
#   at least one model call, a reduced message count AND a non-empty offload context.
#   Confirm the loss of assertion strength is intended.
# NOTE(review): this copy of the patch appears to have lost angle-bracketed text (generic
#   type arguments on Mono / List, and probably a Javadoc paragraph tag before "This is the
#   preferred entry point") as well as the original indentation. Line structure below is
#   rebuilt from the hunk line counts and indentation is conventional 4-space Java; verify
#   against the repository before applying.
diff --git a/agentscope-extensions/agentscope-extensions-autocontext-memory/src/main/java/io/agentscope/core/memory/autocontext/AutoContextHook.java b/agentscope-extensions/agentscope-extensions-autocontext-memory/src/main/java/io/agentscope/core/memory/autocontext/AutoContextHook.java
index d94844988..354939229 100644
--- a/agentscope-extensions/agentscope-extensions-autocontext-memory/src/main/java/io/agentscope/core/memory/autocontext/AutoContextHook.java
+++ b/agentscope-extensions/agentscope-extensions-autocontext-memory/src/main/java/io/agentscope/core/memory/autocontext/AutoContextHook.java
@@ -33,7 +33,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import reactor.core.publisher.Mono;
-import reactor.core.scheduler.Schedulers;
 
 /**
  * Hook for automatically registering AutoContextMemory integration with ReActAgent.
@@ -230,8 +229,7 @@ private Mono handlePreReasoning(PreReasoningEvent event) {
                         ignored -> {
                             event.setInputMessages(buildInputMessages(event, autoContextMemory));
                             return event;
-                        })
-                .subscribeOn(Schedulers.boundedElastic());
+                        });
     }
 
     private List buildInputMessages(
diff --git a/agentscope-extensions/agentscope-extensions-autocontext-memory/src/main/java/io/agentscope/core/memory/autocontext/AutoContextMemory.java b/agentscope-extensions/agentscope-extensions-autocontext-memory/src/main/java/io/agentscope/core/memory/autocontext/AutoContextMemory.java
index cb3442f97..3f8d9ae80 100644
--- a/agentscope-extensions/agentscope-extensions-autocontext-memory/src/main/java/io/agentscope/core/memory/autocontext/AutoContextMemory.java
+++ b/agentscope-extensions/agentscope-extensions-autocontext-memory/src/main/java/io/agentscope/core/memory/autocontext/AutoContextMemory.java
@@ -305,7 +305,15 @@ public boolean compressIfNeeded() {
         return false;
     }
 
-    Mono compressIfNeededAsync() {
+    /**
+     * Asynchronously compresses the working memory on a bounded elastic scheduler.
+     *
+     * This is the preferred entry point for reactive callers such as WebFlux pipelines,
+     * because the synchronous compression logic contains blocking model-stream waits.
+     *
+     * @return a Mono that completes with whether compression was performed
+     */
+    public Mono compressIfNeededAsync() {
         return Mono.fromCallable(this::compressIfNeeded).subscribeOn(Schedulers.boundedElastic());
     }
 
diff --git a/agentscope-extensions/agentscope-extensions-autocontext-memory/src/test/java/io/agentscope/core/memory/autocontext/AutoContextMemoryTest.java b/agentscope-extensions/agentscope-extensions-autocontext-memory/src/test/java/io/agentscope/core/memory/autocontext/AutoContextMemoryTest.java
index e6fe2c9a9..be40306f1 100644
--- a/agentscope-extensions/agentscope-extensions-autocontext-memory/src/test/java/io/agentscope/core/memory/autocontext/AutoContextMemoryTest.java
+++ b/agentscope-extensions/agentscope-extensions-autocontext-memory/src/test/java/io/agentscope/core/memory/autocontext/AutoContextMemoryTest.java
@@ -131,6 +131,25 @@ void testCompressionTriggeredByMessageCount() {
                 "Messages should be compressed or model should be called");
     }
 
+    @Test
+    @DisplayName("Should expose async compression API for non-blocking callers")
+    void testCompressIfNeededAsyncOnNonBlockingScheduler() {
+        for (int i = 0; i < 12; i++) {
+            memory.addMessage(createTextMessage("User message " + i, MsgRole.USER));
+            memory.addMessage(createTextMessage("Assistant response " + i, MsgRole.ASSISTANT));
+        }
+
+        Boolean compressed =
+                Mono.defer(memory::compressIfNeededAsync)
+                        .subscribeOn(Schedulers.parallel())
+                        .block();
+
+        assertNotNull(compressed);
+        assertTrue(
+                compressed || testModel.getCallCount() > 0 || memory.getMessages().size() < 24,
+                "Async compression should run successfully from a non-blocking scheduler");
+    }
+
     @Test
     @DisplayName("Should call summaryPreviousRoundConversation when summarizing previous rounds")
     void testSummaryPreviousRoundConversation() {
@@ -970,37 +989,6 @@ void testCompressToolsInvocationFullCoverage() {
                 "Should have tool compression event with plan-aware hint");
     }
 
-    @Test
-    @DisplayName("Should compress asynchronously on non-blocking scheduler")
-    void testCompressIfNeededAsyncOnNonBlockingScheduler() {
-        AutoContextConfig asyncConfig =
-                AutoContextConfig.builder()
-                        .msgThreshold(5)
-                        .maxToken(10000)
-                        .tokenRatio(0.9)
-                        .lastKeep(2)
-                        .minConsecutiveToolMessages(10)
-                        .largePayloadThreshold(10000)
-                        .minCompressionTokenThreshold(0)
-                        .build();
-        TestModel asyncModel = new TestModel("Conversation summary");
-        AutoContextMemory asyncMemory = new AutoContextMemory(asyncConfig, asyncModel);
-
-        addCompressibleConversation(asyncMemory, 3);
-        asyncMemory.addMessage(createTextMessage("Latest request", MsgRole.USER));
-        int initialCount = asyncMemory.getMessages().size();
-
-        Boolean compressed =
-                Mono.defer(asyncMemory::compressIfNeededAsync)
-                        .subscribeOn(Schedulers.parallel())
-                        .block();
-
-        assertTrue(Boolean.TRUE.equals(compressed));
-        assertTrue(asyncModel.getCallCount() >= 1, "Compression should invoke the model");
-        assertTrue(asyncMemory.getMessages().size() < initialCount);
-        assertFalse(asyncMemory.getOffloadContext().isEmpty());
-    }
-
     @Test
     @DisplayName("Should allow repeated async compression calls")
     void testCompressIfNeededAsyncMultipleCalls() {