Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

/**
* Hook for automatically registering AutoContextMemory integration with ReActAgent.
Expand Down Expand Up @@ -230,8 +229,7 @@ private Mono<PreReasoningEvent> handlePreReasoning(PreReasoningEvent event) {
ignored -> {
event.setInputMessages(buildInputMessages(event, autoContextMemory));
return event;
})
.subscribeOn(Schedulers.boundedElastic());
});
}

private List<Msg> buildInputMessages(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,15 @@ public boolean compressIfNeeded() {
return false;
}

Mono<Boolean> compressIfNeededAsync() {
/**
 * Reactive counterpart of {@link #compressIfNeeded()}.
 *
 * <p>The synchronous compression call is wrapped in a callable and subscribed on
 * {@code Schedulers.boundedElastic()}, so the work does not execute on the subscriber's
 * thread. Reactive callers such as WebFlux pipelines should prefer this entry point,
 * because the synchronous compression logic contains blocking model-stream waits.
 *
 * @return a Mono that emits the boolean result of {@link #compressIfNeeded()}, i.e. whether
 *     compression was performed
 */
public Mono<Boolean> compressIfNeededAsync() {
    // Nothing runs until subscription; fromCallable defers the blocking call.
    Mono<Boolean> blockingCompression = Mono.fromCallable(this::compressIfNeeded);
    return blockingCompression.subscribeOn(Schedulers.boundedElastic());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,25 @@ void testCompressionTriggeredByMessageCount() {
"Messages should be compressed or model should be called");
}

/**
 * Verifies that the async compression API can be subscribed from the parallel scheduler and
 * still yields a non-null result.
 *
 * <p>NOTE(review): whether compression actually triggers depends on the configuration of the
 * shared {@code memory} fixture, which is set up outside this method; the final assertion
 * therefore accepts any one of three signals.
 */
@Test
@DisplayName("Should expose async compression API for non-blocking callers")
void testCompressIfNeededAsyncOnNonBlockingScheduler() {
    // Seed the memory with 12 user/assistant exchanges, i.e. 24 addMessage calls.
    int round = 0;
    while (round < 12) {
        memory.addMessage(createTextMessage("User message " + round, MsgRole.USER));
        memory.addMessage(createTextMessage("Assistant response " + round, MsgRole.ASSISTANT));
        round++;
    }

    // Subscribe on the parallel scheduler, then wait for the outcome on the test thread.
    Mono<Boolean> asyncCompression =
            Mono.defer(memory::compressIfNeededAsync).subscribeOn(Schedulers.parallel());
    Boolean compressed = asyncCompression.block();

    assertNotNull(compressed);

    // Accept an explicit "compressed" result, a model invocation, or fewer stored messages.
    boolean compressionObserved =
            compressed || testModel.getCallCount() > 0 || memory.getMessages().size() < 24;
    assertTrue(
            compressionObserved,
            "Async compression should run successfully from a non-blocking scheduler");
}

@Test
@DisplayName("Should call summaryPreviousRoundConversation when summarizing previous rounds")
void testSummaryPreviousRoundConversation() {
Expand Down Expand Up @@ -970,37 +989,6 @@ void testCompressToolsInvocationFullCoverage() {
"Should have tool compression event with plan-aware hint");
}

/**
 * Builds a dedicated memory and model, triggers async compression from the parallel
 * scheduler, and checks that compression ran: the result is {@code true}, the model was
 * called at least once, the message count dropped, and the offload context is non-empty.
 */
@Test
@DisplayName("Should compress asynchronously on non-blocking scheduler")
void testCompressIfNeededAsyncOnNonBlockingScheduler() {
    // Dedicated fixtures so this test does not depend on the shared memory instance.
    TestModel asyncModel = new TestModel("Conversation summary");
    AutoContextConfig.Builder configBuilder = AutoContextConfig.builder();
    AutoContextConfig asyncConfig =
            configBuilder
                    .msgThreshold(5)
                    .maxToken(10000)
                    .tokenRatio(0.9)
                    .lastKeep(2)
                    .minConsecutiveToolMessages(10)
                    .largePayloadThreshold(10000)
                    .minCompressionTokenThreshold(0)
                    .build();
    AutoContextMemory asyncMemory = new AutoContextMemory(asyncConfig, asyncModel);

    // Populate the memory, then record the size before compression for later comparison.
    addCompressibleConversation(asyncMemory, 3);
    asyncMemory.addMessage(createTextMessage("Latest request", MsgRole.USER));
    int initialCount = asyncMemory.getMessages().size();

    // Subscribe on the parallel scheduler and wait for the outcome on the test thread.
    Mono<Boolean> asyncCompression =
            Mono.defer(asyncMemory::compressIfNeededAsync).subscribeOn(Schedulers.parallel());
    Boolean compressed = asyncCompression.block();

    assertTrue(Boolean.TRUE.equals(compressed));
    assertTrue(asyncModel.getCallCount() >= 1, "Compression should invoke the model");
    int finalCount = asyncMemory.getMessages().size();
    assertTrue(finalCount < initialCount);
    assertFalse(asyncMemory.getOffloadContext().isEmpty());
}

@Test
@DisplayName("Should allow repeated async compression calls")
void testCompressIfNeededAsyncMultipleCalls() {
Expand Down
Loading