diff --git a/core/src/main/java/com/google/adk/flows/llmflows/Contents.java b/core/src/main/java/com/google/adk/flows/llmflows/Contents.java index 91651a92d..32600dea8 100644 --- a/core/src/main/java/com/google/adk/flows/llmflows/Contents.java +++ b/core/src/main/java/com/google/adk/flows/llmflows/Contents.java @@ -23,9 +23,11 @@ import com.google.adk.agents.InvocationContext; import com.google.adk.agents.LlmAgent; import com.google.adk.events.Event; +import com.google.adk.events.EventCompaction; import com.google.adk.models.LlmRequest; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; import com.google.genai.types.Content; import com.google.genai.types.FunctionCall; import com.google.genai.types.FunctionResponse; @@ -36,6 +38,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.ListIterator; import java.util.Map; import java.util.Optional; import java.util.Set; @@ -99,24 +102,25 @@ private ImmutableList getCurrentTurnContents( private ImmutableList getContents( Optional currentBranch, List events, String agentName, String modelName) { List filteredEvents = new ArrayList<>(); + boolean hasCompactEvent = false; // Filter the events, leaving the contents and the function calls and responses from the current // agent. for (Event event : events) { - // Skip events without content, or generated neither by user nor by model or has empty text. - // E.g. events purely for mutating session states. - if (event.content().isEmpty()) { + if (event.actions().compaction().isPresent()) { + // Always include the compaction event for the later processCompactionEvent call. + // The compaction event is used to filter out normal events that are covered by the + // compaction event. + hasCompactEvent = true; + filteredEvents.add(event); continue; } - var content = event.content().get(); - if (content.role().isEmpty() - || content.role().get().isEmpty() - || content.parts().isEmpty() - || content.parts().get().isEmpty() - || content.parts().get().get(0).text().map(String::isEmpty).orElse(false)) { + + // Skip events without content, or generated neither by user nor by model or has empty text. + // E.g. events purely for mutating session states. + if (isEmptyContent(event)) { continue; } - if (!isEventBelongsToBranch(currentBranch, event)) { continue; } @@ -133,6 +137,10 @@ private ImmutableList getContents( } } + if (hasCompactEvent) { + filteredEvents = processCompactionEvent(filteredEvents); + } + List resultEvents = rearrangeEventsForLatestFunctionResponse(filteredEvents); resultEvents = rearrangeEventsForAsyncFunctionResponsesInHistory(resultEvents, modelName); @@ -142,6 +150,93 @@ private ImmutableList getContents( .collect(toImmutableList()); } + /** + * Check if an event has missing or empty content. + * + *

This can happen to the events that only changed session state. When both content and + * transcriptions are empty, the event will be considered as empty. The content is considered + * empty if none of its parts contain text, inline data, file data, function call, or function + * response. Parts with only thoughts are also considered empty. + * + * @param event the event to check. + * @return {@code true} if the event is considered to have empty content, {@code false} otherwise. + */ + private boolean isEmptyContent(Event event) { + if (event.content().isEmpty()) { + return true; + } + var content = event.content().get(); + return (content.role().isEmpty() + || content.role().get().isEmpty() + || content.parts().isEmpty() + || content.parts().get().isEmpty() + || content.parts().get().get(0).text().map(String::isEmpty).orElse(false)); + } + + /** + * Filters events that are covered by compaction events by identifying compacted ranges and + * filters out events that are covered by compaction summaries + * + *

Example of input + * + *

+   * [
+   *   event_1(timestamp=1),
+   *   event_2(timestamp=2),
+   *   compaction_1(event_1, event_2, timestamp=3, content=summary_1_2, startTime=1, endTime=2),
+   *   event_3(timestamp=4),
+   *   compaction_2(event_2, event_3, timestamp=5, content=summary_2_3, startTime=2, endTime=3),
+   *   event_4(timestamp=6)
+   * ]
+   * 
+ * + * Will result in the following events output + * + *
+   * [
+   *   compaction_1,
+   *   compaction_2
+   *   event_4
+   * ]
+   * 
+ * + * Compaction events are always strictly in order based on event timestamp. + * + * @param events the list of event to filter. + * @return a new list with compaction applied. + */ + private List processCompactionEvent(List events) { + List result = new ArrayList<>(); + ListIterator iter = events.listIterator(events.size()); + Long lastCompactionStartTime = null; + + while (iter.hasPrevious()) { + Event event = iter.previous(); + EventCompaction compaction = event.actions().compaction().orElse(null); + if (compaction == null) { + if (lastCompactionStartTime == null || event.timestamp() < lastCompactionStartTime) { + result.add(event); + } + continue; + } + // Create a new event for the compaction event in the result. + result.add( + Event.builder() + .timestamp(compaction.endTimestamp()) + .author("model") + .content(compaction.compactedContent()) + .branch(event.branch()) + .invocationId(event.invocationId()) + .actions(event.actions()) + .build()); + lastCompactionStartTime = + lastCompactionStartTime == null + ? compaction.startTimestamp() + : Long.min(lastCompactionStartTime, compaction.startTimestamp()); + } + return Lists.reverse(result); + } + /** Whether the event is a reply from another agent. */ private static boolean isOtherAgentReply(String agentName, Event event) { return !agentName.isEmpty() diff --git a/core/src/main/java/com/google/adk/runner/Runner.java b/core/src/main/java/com/google/adk/runner/Runner.java index c774ff361..289c5503f 100644 --- a/core/src/main/java/com/google/adk/runner/Runner.java +++ b/core/src/main/java/com/google/adk/runner/Runner.java @@ -29,11 +29,15 @@ import com.google.adk.events.EventActions; import com.google.adk.flows.llmflows.ResumabilityConfig; import com.google.adk.memory.BaseMemoryService; +import com.google.adk.models.Model; import com.google.adk.plugins.BasePlugin; import com.google.adk.plugins.PluginManager; import com.google.adk.sessions.BaseSessionService; import com.google.adk.sessions.InMemorySessionService; import com.google.adk.sessions.Session; +import com.google.adk.summarizer.EventsCompactionConfig; +import com.google.adk.summarizer.LlmEventSummarizer; +import com.google.adk.summarizer.SlidingWindowEventCompactor; import com.google.adk.tools.BaseTool; import com.google.adk.tools.FunctionTool; import com.google.adk.utils.CollectionUtils; @@ -68,6 +72,7 @@ public class Runner { @Nullable private final BaseMemoryService memoryService; private final PluginManager pluginManager; private final ResumabilityConfig resumabilityConfig; + @Nullable private final EventsCompactionConfig eventsCompactionConfig; /** Builder for {@link Runner}. */ public static class Builder { @@ -78,6 +83,7 @@ public static class Builder { @Nullable private BaseMemoryService memoryService = null; private List plugins = ImmutableList.of(); private ResumabilityConfig resumabilityConfig = new ResumabilityConfig(); + @Nullable private EventsCompactionConfig eventsCompactionConfig; @CanIgnoreReturnValue public Builder agent(BaseAgent agent) { @@ -121,6 +127,12 @@ public Builder resumabilityConfig(ResumabilityConfig resumabilityConfig) { return this; } + @CanIgnoreReturnValue + public Builder eventsCompactionConfig(EventsCompactionConfig eventsCompactionConfig) { + this.eventsCompactionConfig = eventsCompactionConfig; + return this; + } + public Runner build() { if (agent == null) { throw new IllegalStateException("Agent must be provided."); @@ -141,7 +153,8 @@ public Runner build() { sessionService, memoryService, plugins, - resumabilityConfig); + resumabilityConfig, + eventsCompactionConfig); } } @@ -208,6 +221,32 @@ public Runner( @Nullable BaseMemoryService memoryService, List plugins, ResumabilityConfig resumabilityConfig) { + this( + agent, + appName, + artifactService, + sessionService, + memoryService, + plugins, + resumabilityConfig, + null); + } + + /** + * Creates a new {@code Runner} with a list of plugins and resumability config. + * + * @deprecated Use {@link Runner.Builder} instead. + */ + @Deprecated + protected Runner( + BaseAgent agent, + String appName, + BaseArtifactService artifactService, + BaseSessionService sessionService, + @Nullable BaseMemoryService memoryService, + List plugins, + ResumabilityConfig resumabilityConfig, + @Nullable EventsCompactionConfig eventsCompactionConfig) { this.agent = agent; this.appName = appName; this.artifactService = artifactService; @@ -215,6 +254,10 @@ public Runner( this.memoryService = memoryService; this.pluginManager = new PluginManager(plugins); this.resumabilityConfig = resumabilityConfig; + this.eventsCompactionConfig = + Optional.ofNullable(eventsCompactionConfig) + .map(c -> createEventsCompactionConfig(agent, c)) + .orElse(null); } /** @@ -493,7 +536,10 @@ public Flowable runAsync( Completable.defer( () -> pluginManager.runAfterRunCallback( - contextWithUpdatedSession))); + contextWithUpdatedSession))) + .concatWith( + Completable.defer( + () -> compactEvents(updatedSession))); }); })) .doOnError( @@ -509,6 +555,13 @@ public Flowable runAsync( } } + private Completable compactEvents(Session session) { + return Optional.ofNullable(eventsCompactionConfig) + .map(SlidingWindowEventCompactor::new) + .map(c -> c.compact(session, sessionService)) + .orElse(Completable.complete()); + } + private void copySessionStates(Session source, Session target) { // TODO: remove this hack when deprecating all runAsync with Session. for (var entry : source.state().entrySet()) { @@ -740,5 +793,27 @@ private boolean hasLiveRequestQueueParameter(FunctionTool functionTool) { .anyMatch(parameter -> parameter.getType().equals(LiveRequestQueue.class)); } + /** + * Creates a new {@link EventsCompactionConfig} based on the given configuration. If the {@link + * com.google.adk.summarizer.BaseEventSummarizer} is missing, it will be default to the {@link + * LlmEventSummarizer} using the same model as the LLM base agent. + */ + private static EventsCompactionConfig createEventsCompactionConfig( + BaseAgent agent, EventsCompactionConfig config) { + return new EventsCompactionConfig( + config.compactionInterval(), + config.overlapSize(), + config + .summarizer() + .or( + () -> + Optional.of(agent) + .filter(LlmAgent.class::isInstance) + .map(LlmAgent.class::cast) + .map(LlmAgent::resolvedModel) + .flatMap(Model::model) + .map(LlmEventSummarizer::new))); + } + // TODO: run statelessly } diff --git a/core/src/main/java/com/google/adk/summarizer/SlidingWindowEventCompactor.java b/core/src/main/java/com/google/adk/summarizer/SlidingWindowEventCompactor.java index c59e656ca..09eebbacd 100644 --- a/core/src/main/java/com/google/adk/summarizer/SlidingWindowEventCompactor.java +++ b/core/src/main/java/com/google/adk/summarizer/SlidingWindowEventCompactor.java @@ -12,6 +12,8 @@ import java.util.List; import java.util.ListIterator; import java.util.Set; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * This class performs events compaction in a sliding window fashion based on the {@link @@ -19,13 +21,18 @@ */ public final class SlidingWindowEventCompactor implements EventCompactor { + private static final Logger logger = LoggerFactory.getLogger(SlidingWindowEventCompactor.class); + private final EventsCompactionConfig config; private final BaseEventSummarizer summarizer; public SlidingWindowEventCompactor(EventsCompactionConfig config) { this.config = config; - // TODO default to LLM summarizer - this.summarizer = config.summarizer().orElseThrow(); + this.summarizer = + config + .summarizer() + .orElseThrow( + () -> new IllegalArgumentException("Summarizer is required for event compaction.")); } /** @@ -80,6 +87,8 @@ public SlidingWindowEventCompactor(EventsCompactionConfig config) { */ @Override public Completable compact(Session session, BaseSessionService sessionService) { + logger.debug("Running event compaction for session {}", session.id()); + return Completable.fromMaybe( getCompactionEvents(session) .flatMap(summarizer::summarizeEvents) diff --git a/core/src/test/java/com/google/adk/flows/llmflows/ContentsTest.java b/core/src/test/java/com/google/adk/flows/llmflows/ContentsTest.java index 1cb0c8771..86ecda7cb 100644 --- a/core/src/test/java/com/google/adk/flows/llmflows/ContentsTest.java +++ b/core/src/test/java/com/google/adk/flows/llmflows/ContentsTest.java @@ -17,12 +17,15 @@ package com.google.adk.flows.llmflows; import static com.google.common.collect.ImmutableList.toImmutableList; +import static com.google.common.truth.Correspondence.transforming; import static com.google.common.truth.Truth.assertThat; import static org.junit.Assert.assertThrows; import com.google.adk.agents.InvocationContext; import com.google.adk.agents.LlmAgent; import com.google.adk.events.Event; +import com.google.adk.events.EventActions; +import com.google.adk.events.EventCompaction; import com.google.adk.models.LlmRequest; import com.google.adk.models.Model; import com.google.adk.sessions.Session; @@ -487,6 +490,77 @@ public void rearrangeHistory_gemini3interleavedFCFR_groupsFcAndFr() { .hasValue("tool2"); } + @Test + public void processRequest_singleCompaction() { + ImmutableList events = + ImmutableList.of( + createUserEvent("env1", "content 1", "inv1", 1), + createUserEvent("env2", "content 2", "inv2", 2), + createCompactedEvent(1, 2, "Summary 1-2"), + createUserEvent("env3", "content 3", "inv3", 3)); + + List contents = runContentsProcessor(events); + assertThat(contents) + .comparingElementsUsing( + transforming((Content c) -> c.parts().get().get(0).text().get(), "content text")) + .containsExactly("Summary 1-2", "content 3"); + } + + @Test + public void processRequest_startsWithCompaction() { + ImmutableList events = + ImmutableList.of( + createCompactedEvent(1, 2, "Summary 1-2"), + createUserEvent("env3", "content 3", "inv3", 3), + createUserEvent("env4", "content 4", "inv4", 4)); + + List contents = runContentsProcessor(events); + assertThat(contents) + .comparingElementsUsing( + transforming((Content c) -> c.parts().get().get(0).text().get(), "content text")) + .containsExactly("Summary 1-2", "content 3", "content 4"); + } + + @Test + public void processRequest_endsWithCompaction() { + ImmutableList events = + ImmutableList.of( + createUserEvent("env1", "content 1", "inv1", 1), + createUserEvent("env2", "content 2", "inv2", 2), + createUserEvent("env3", "content 3", "inv3", 2), + createCompactedEvent(2, 3, "Summary 2-3")); + + List contents = runContentsProcessor(events); + assertThat(contents) + .comparingElementsUsing( + transforming((Content c) -> c.parts().get().get(0).text().get(), "content text")) + .containsExactly("content 1", "Summary 2-3"); + } + + @Test + public void processRequest_multipleCompactions() { + ImmutableList events = + ImmutableList.of( + createUserEvent("env1", "content 1", "inv1", 1), + createUserEvent("env2", "content 2", "inv2", 2), + createUserEvent("env3", "content 3", "inv3", 3), + createUserEvent("env4", "content 4", "inv4", 4), + createCompactedEvent(1, 4, "Summary 1-4"), + createUserEvent("env5", "content 5", "inv5", 5), + createUserEvent("env6", "content 6", "inv6", 6), + createUserEvent("env7-1", "content 7-1", "inv7", 7), + createUserEvent("env7-2", "content 7-2", "inv8", 8), + createUserEvent("env9", "content 9", "inv9", 9), + createCompactedEvent(6, 9, "Summary 6-9"), + createUserEvent("env10", "content 10", "inv10", 10)); + + List contents = runContentsProcessor(events); + assertThat(contents) + .comparingElementsUsing( + transforming((Content c) -> c.parts().get().get(0).text().get(), "content text")) + .containsExactly("Summary 1-4", "content 5", "Summary 6-9", "content 10"); + } + private static Event createUserEvent(String id, String text) { return Event.builder() .id(id) @@ -496,6 +570,17 @@ private static Event createUserEvent(String id, String text) { .build(); } + private static Event createUserEvent( + String id, String text, String invocationId, long timestamp) { + return Event.builder() + .id(id) + .author(USER) + .content(Optional.of(Content.fromParts(Part.fromText(text)))) + .invocationId(invocationId) + .timestamp(timestamp) + .build(); + } + private static Event createAgentEvent(String id, String text) { return createAgentEvent(AGENT, id, text); } @@ -684,4 +769,22 @@ private static ImmutableList eventsToContents(List events) { .map(Optional::get) .collect(toImmutableList()); } + + private Event createCompactedEvent(long startTimestamp, long endTimestamp, String content) { + return Event.builder() + .actions( + EventActions.builder() + .compaction( + EventCompaction.builder() + .startTimestamp(startTimestamp) + .endTimestamp(endTimestamp) + .compactedContent( + Content.builder() + .role("model") + .parts(Part.builder().text(content).build()) + .build()) + .build()) + .build()) + .build(); + } } diff --git a/core/src/test/java/com/google/adk/runner/RunnerTest.java b/core/src/test/java/com/google/adk/runner/RunnerTest.java index 52218a0e9..7a734deb2 100644 --- a/core/src/test/java/com/google/adk/runner/RunnerTest.java +++ b/core/src/test/java/com/google/adk/runner/RunnerTest.java @@ -17,6 +17,7 @@ package com.google.adk.runner; import static com.google.adk.testing.TestUtils.createLlmResponse; +import static com.google.adk.testing.TestUtils.createTestAgent; import static com.google.adk.testing.TestUtils.createTestAgentBuilder; import static com.google.adk.testing.TestUtils.createTestLlm; import static com.google.adk.testing.TestUtils.simplifyEvents; @@ -38,6 +39,7 @@ import com.google.adk.models.LlmResponse; import com.google.adk.plugins.BasePlugin; import com.google.adk.sessions.Session; +import com.google.adk.summarizer.EventsCompactionConfig; import com.google.adk.testing.TestLlm; import com.google.adk.testing.TestUtils; import com.google.adk.testing.TestUtils.EchoTool; @@ -119,6 +121,45 @@ public void tearDown() { Telemetry.setTracerForTesting(originalTracer); } + @Test + public void eventsCompaction_enabled() { + LlmAgent agent = + createTestAgent( + createTestLlm( + createLlmResponse(createContent("llm 1")), + createLlmResponse(createContent("summary 1")), + createLlmResponse(createContent("llm 2")), + createLlmResponse(createContent("summary 2")))); + + Runner runner = + Runner.builder() + .eventsCompactionConfig(new EventsCompactionConfig(1, 0)) + .agent(agent) + .sessionService(this.runner.sessionService()) + .appName(this.runner.appName()) + .build(); + var events = + runner.runAsync("user", session.id(), createContent("user 1")).toList().blockingGet(); + assertThat(simplifyEvents(events)).containsExactly("test agent: llm 1"); + + events = runner.runAsync("user", session.id(), createContent("user 2")).toList().blockingGet(); + assertThat(simplifyEvents(events)).containsExactly("test agent: llm 2"); + + Session updatedSession = + runner + .sessionService() + .getSession(session.appName(), session.userId(), session.id(), Optional.empty()) + .blockingGet(); + assertThat(simplifyEvents(updatedSession.events())) + .containsExactly( + "user: user 1", + "test agent: llm 1", + "user: summary 1", + "user: user 2", + "test agent: llm 2", + "user: summary 2"); + } + @Test public void pluginDoesNothing() { var events = diff --git a/core/src/test/java/com/google/adk/summarizer/SlidingWindowEventCompactorTest.java b/core/src/test/java/com/google/adk/summarizer/SlidingWindowEventCompactorTest.java index 8fcc104ec..6ec2fb42c 100644 --- a/core/src/test/java/com/google/adk/summarizer/SlidingWindowEventCompactorTest.java +++ b/core/src/test/java/com/google/adk/summarizer/SlidingWindowEventCompactorTest.java @@ -2,6 +2,7 @@ import static com.google.common.truth.Truth.assertThat; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; @@ -63,6 +64,26 @@ public void compaction_notEnoughInvocations() { verify(mockSessionService, never()).appendEvent(any(), any()); } + @Test + public void compaction_notEnoughInvocationsAfterCompact() { + EventCompactor compactor = + new SlidingWindowEventCompactor( + new EventsCompactionConfig(2, 0, Optional.of(mockSummarizer))); + Session session = + Session.builder("id") + .events( + ImmutableList.of( + Event.builder().invocationId("1").timestamp(1).build(), + Event.builder().invocationId("2").timestamp(2).build(), + createCompactedEvent(1, 2, "Summary 1-2"), + Event.builder().invocationId("3").timestamp(3).build())) + .build(); + + compactor.compact(session, mockSessionService).blockingSubscribe(); + verify(mockSummarizer, never()).summarizeEvents(anyList()); + verify(mockSessionService, never()).appendEvent(any(), any()); + } + @Test public void compaction_firstCompaction() { EventCompactor compactor = diff --git a/core/src/test/java/com/google/adk/testing/TestUtils.java b/core/src/test/java/com/google/adk/testing/TestUtils.java index e4c2949eb..d21a19f46 100644 --- a/core/src/test/java/com/google/adk/testing/TestUtils.java +++ b/core/src/test/java/com/google/adk/testing/TestUtils.java @@ -26,6 +26,7 @@ import com.google.adk.agents.RunConfig; import com.google.adk.events.Event; import com.google.adk.events.EventActions; +import com.google.adk.events.EventCompaction; import com.google.adk.models.BaseLlm; import com.google.adk.models.LlmResponse; import com.google.adk.sessions.InMemorySessionService; @@ -89,7 +90,8 @@ public static ImmutableList simplifyEvents(List events) { private static String formatEventContent(Event event) { return event .content() - .flatMap(content -> content.parts()) + .or(() -> event.actions().compaction().map(EventCompaction::compactedContent)) + .flatMap(Content::parts) .map( parts -> { if (parts.size() == 1) {