From 54c826c80c2bfe09056396c2a21f8241f9d2898b Mon Sep 17 00:00:00 2001 From: Google Team Member Date: Fri, 9 Jan 2026 12:50:43 -0800 Subject: [PATCH] feat: Integrate event compaction in Java ADK runner This change integrates the event compaction mechanism into the ADK Runner. The Contents class is updated to filter out events that fall within the timestamp range of a compaction event, replacing them with the compacted summary. The Runner now uses a SlidingWindowEventCompactor to periodically summarize events based on the provided EventsCompactionConfig. Tests are added to verify the compaction logic in both Contents and Runner. PiperOrigin-RevId: 854310265 --- .../google/adk/flows/llmflows/Contents.java | 115 ++++++++++++++++-- .../java/com/google/adk/runner/Runner.java | 79 +++++++++++- .../SlidingWindowEventCompactor.java | 13 +- .../adk/flows/llmflows/ContentsTest.java | 103 ++++++++++++++++ .../com/google/adk/runner/RunnerTest.java | 41 +++++++ .../SlidingWindowEventCompactorTest.java | 21 ++++ .../com/google/adk/testing/TestUtils.java | 4 +- 7 files changed, 361 insertions(+), 15 deletions(-) diff --git a/core/src/main/java/com/google/adk/flows/llmflows/Contents.java b/core/src/main/java/com/google/adk/flows/llmflows/Contents.java index 91651a92d..32600dea8 100644 --- a/core/src/main/java/com/google/adk/flows/llmflows/Contents.java +++ b/core/src/main/java/com/google/adk/flows/llmflows/Contents.java @@ -23,9 +23,11 @@ import com.google.adk.agents.InvocationContext; import com.google.adk.agents.LlmAgent; import com.google.adk.events.Event; +import com.google.adk.events.EventCompaction; import com.google.adk.models.LlmRequest; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; import com.google.genai.types.Content; import com.google.genai.types.FunctionCall; import com.google.genai.types.FunctionResponse; @@ -36,6 +38,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.ListIterator; import java.util.Map; import java.util.Optional; import java.util.Set; @@ -99,24 +102,25 @@ private ImmutableList getCurrentTurnContents( private ImmutableList getContents( Optional currentBranch, List events, String agentName, String modelName) { List filteredEvents = new ArrayList<>(); + boolean hasCompactEvent = false; // Filter the events, leaving the contents and the function calls and responses from the current // agent. for (Event event : events) { - // Skip events without content, or generated neither by user nor by model or has empty text. - // E.g. events purely for mutating session states. - if (event.content().isEmpty()) { + if (event.actions().compaction().isPresent()) { + // Always include the compaction event for the later processCompactionEvent call. + // The compaction event is used to filter out normal events that are covered by the + // compaction event. + hasCompactEvent = true; + filteredEvents.add(event); continue; } - var content = event.content().get(); - if (content.role().isEmpty() - || content.role().get().isEmpty() - || content.parts().isEmpty() - || content.parts().get().isEmpty() - || content.parts().get().get(0).text().map(String::isEmpty).orElse(false)) { + + // Skip events without content, or generated neither by user nor by model or has empty text. + // E.g. events purely for mutating session states. + if (isEmptyContent(event)) { continue; } - if (!isEventBelongsToBranch(currentBranch, event)) { continue; } @@ -133,6 +137,10 @@ private ImmutableList getContents( } } + if (hasCompactEvent) { + filteredEvents = processCompactionEvent(filteredEvents); + } + List resultEvents = rearrangeEventsForLatestFunctionResponse(filteredEvents); resultEvents = rearrangeEventsForAsyncFunctionResponsesInHistory(resultEvents, modelName); @@ -142,6 +150,93 @@ private ImmutableList getContents( .collect(toImmutableList()); } + /** + * Check if an event has missing or empty content. + * + *

This can happen to the events that only changed session state. When both content and + * transcriptions are empty, the event will be considered as empty. The content is considered + * empty if none of its parts contain text, inline data, file data, function call, or function + * response. Parts with only thoughts are also considered empty. + * + * @param event the event to check. + * @return {@code true} if the event is considered to have empty content, {@code false} otherwise. + */ + private boolean isEmptyContent(Event event) { + if (event.content().isEmpty()) { + return true; + } + var content = event.content().get(); + return (content.role().isEmpty() + || content.role().get().isEmpty() + || content.parts().isEmpty() + || content.parts().get().isEmpty() + || content.parts().get().get(0).text().map(String::isEmpty).orElse(false)); + } + + /** + * Filters events that are covered by compaction events by identifying compacted ranges and + * filters out events that are covered by compaction summaries + * + *

Example of input + * + *

+   * [
+   *   event_1(timestamp=1),
+   *   event_2(timestamp=2),
+   *   compaction_1(event_1, event_2, timestamp=3, content=summary_1_2, startTime=1, endTime=2),
+   *   event_3(timestamp=4),
+   *   compaction_2(event_2, event_3, timestamp=5, content=summary_2_3, startTime=2, endTime=3),
+   *   event_4(timestamp=6)
+   * ]
+   * 
+ * + * Will result in the following events output + * + *
+   * [
+   *   compaction_1,
+   *   compaction_2
+   *   event_4
+   * ]
+   * 
+ * + * Compaction events are always strictly in order based on event timestamp. + * + * @param events the list of event to filter. + * @return a new list with compaction applied. + */ + private List processCompactionEvent(List events) { + List result = new ArrayList<>(); + ListIterator iter = events.listIterator(events.size()); + Long lastCompactionStartTime = null; + + while (iter.hasPrevious()) { + Event event = iter.previous(); + EventCompaction compaction = event.actions().compaction().orElse(null); + if (compaction == null) { + if (lastCompactionStartTime == null || event.timestamp() < lastCompactionStartTime) { + result.add(event); + } + continue; + } + // Create a new event for the compaction event in the result. + result.add( + Event.builder() + .timestamp(compaction.endTimestamp()) + .author("model") + .content(compaction.compactedContent()) + .branch(event.branch()) + .invocationId(event.invocationId()) + .actions(event.actions()) + .build()); + lastCompactionStartTime = + lastCompactionStartTime == null + ? compaction.startTimestamp() + : Long.min(lastCompactionStartTime, compaction.startTimestamp()); + } + return Lists.reverse(result); + } + /** Whether the event is a reply from another agent. */ private static boolean isOtherAgentReply(String agentName, Event event) { return !agentName.isEmpty() diff --git a/core/src/main/java/com/google/adk/runner/Runner.java b/core/src/main/java/com/google/adk/runner/Runner.java index c774ff361..289c5503f 100644 --- a/core/src/main/java/com/google/adk/runner/Runner.java +++ b/core/src/main/java/com/google/adk/runner/Runner.java @@ -29,11 +29,15 @@ import com.google.adk.events.EventActions; import com.google.adk.flows.llmflows.ResumabilityConfig; import com.google.adk.memory.BaseMemoryService; +import com.google.adk.models.Model; import com.google.adk.plugins.BasePlugin; import com.google.adk.plugins.PluginManager; import com.google.adk.sessions.BaseSessionService; import com.google.adk.sessions.InMemorySessionService; import com.google.adk.sessions.Session; +import com.google.adk.summarizer.EventsCompactionConfig; +import com.google.adk.summarizer.LlmEventSummarizer; +import com.google.adk.summarizer.SlidingWindowEventCompactor; import com.google.adk.tools.BaseTool; import com.google.adk.tools.FunctionTool; import com.google.adk.utils.CollectionUtils; @@ -68,6 +72,7 @@ public class Runner { @Nullable private final BaseMemoryService memoryService; private final PluginManager pluginManager; private final ResumabilityConfig resumabilityConfig; + @Nullable private final EventsCompactionConfig eventsCompactionConfig; /** Builder for {@link Runner}. */ public static class Builder { @@ -78,6 +83,7 @@ public static class Builder { @Nullable private BaseMemoryService memoryService = null; private List plugins = ImmutableList.of(); private ResumabilityConfig resumabilityConfig = new ResumabilityConfig(); + @Nullable private EventsCompactionConfig eventsCompactionConfig; @CanIgnoreReturnValue public Builder agent(BaseAgent agent) { @@ -121,6 +127,12 @@ public Builder resumabilityConfig(ResumabilityConfig resumabilityConfig) { return this; } + @CanIgnoreReturnValue + public Builder eventsCompactionConfig(EventsCompactionConfig eventsCompactionConfig) { + this.eventsCompactionConfig = eventsCompactionConfig; + return this; + } + public Runner build() { if (agent == null) { throw new IllegalStateException("Agent must be provided."); @@ -141,7 +153,8 @@ public Runner build() { sessionService, memoryService, plugins, - resumabilityConfig); + resumabilityConfig, + eventsCompactionConfig); } } @@ -208,6 +221,32 @@ public Runner( @Nullable BaseMemoryService memoryService, List plugins, ResumabilityConfig resumabilityConfig) { + this( + agent, + appName, + artifactService, + sessionService, + memoryService, + plugins, + resumabilityConfig, + null); + } + + /** + * Creates a new {@code Runner} with a list of plugins and resumability config. + * + * @deprecated Use {@link Runner.Builder} instead. + */ + @Deprecated + protected Runner( + BaseAgent agent, + String appName, + BaseArtifactService artifactService, + BaseSessionService sessionService, + @Nullable BaseMemoryService memoryService, + List plugins, + ResumabilityConfig resumabilityConfig, + @Nullable EventsCompactionConfig eventsCompactionConfig) { this.agent = agent; this.appName = appName; this.artifactService = artifactService; @@ -215,6 +254,10 @@ public Runner( this.memoryService = memoryService; this.pluginManager = new PluginManager(plugins); this.resumabilityConfig = resumabilityConfig; + this.eventsCompactionConfig = + Optional.ofNullable(eventsCompactionConfig) + .map(c -> createEventsCompactionConfig(agent, c)) + .orElse(null); } /** @@ -493,7 +536,10 @@ public Flowable runAsync( Completable.defer( () -> pluginManager.runAfterRunCallback( - contextWithUpdatedSession))); + contextWithUpdatedSession))) + .concatWith( + Completable.defer( + () -> compactEvents(updatedSession))); }); })) .doOnError( @@ -509,6 +555,13 @@ public Flowable runAsync( } } + private Completable compactEvents(Session session) { + return Optional.ofNullable(eventsCompactionConfig) + .map(SlidingWindowEventCompactor::new) + .map(c -> c.compact(session, sessionService)) + .orElse(Completable.complete()); + } + private void copySessionStates(Session source, Session target) { // TODO: remove this hack when deprecating all runAsync with Session. for (var entry : source.state().entrySet()) { @@ -740,5 +793,27 @@ private boolean hasLiveRequestQueueParameter(FunctionTool functionTool) { .anyMatch(parameter -> parameter.getType().equals(LiveRequestQueue.class)); } + /** + * Creates a new {@link EventsCompactionConfig} based on the given configuration. If the {@link + * com.google.adk.summarizer.BaseEventSummarizer} is missing, it will be default to the {@link + * LlmEventSummarizer} using the same model as the LLM base agent. + */ + private static EventsCompactionConfig createEventsCompactionConfig( + BaseAgent agent, EventsCompactionConfig config) { + return new EventsCompactionConfig( + config.compactionInterval(), + config.overlapSize(), + config + .summarizer() + .or( + () -> + Optional.of(agent) + .filter(LlmAgent.class::isInstance) + .map(LlmAgent.class::cast) + .map(LlmAgent::resolvedModel) + .flatMap(Model::model) + .map(LlmEventSummarizer::new))); + } + // TODO: run statelessly } diff --git a/core/src/main/java/com/google/adk/summarizer/SlidingWindowEventCompactor.java b/core/src/main/java/com/google/adk/summarizer/SlidingWindowEventCompactor.java index c59e656ca..09eebbacd 100644 --- a/core/src/main/java/com/google/adk/summarizer/SlidingWindowEventCompactor.java +++ b/core/src/main/java/com/google/adk/summarizer/SlidingWindowEventCompactor.java @@ -12,6 +12,8 @@ import java.util.List; import java.util.ListIterator; import java.util.Set; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * This class performs events compaction in a sliding window fashion based on the {@link @@ -19,13 +21,18 @@ */ public final class SlidingWindowEventCompactor implements EventCompactor { + private static final Logger logger = LoggerFactory.getLogger(SlidingWindowEventCompactor.class); + private final EventsCompactionConfig config; private final BaseEventSummarizer summarizer; public SlidingWindowEventCompactor(EventsCompactionConfig config) { this.config = config; - // TODO default to LLM summarizer - this.summarizer = config.summarizer().orElseThrow(); + this.summarizer = + config + .summarizer() + .orElseThrow( + () -> new IllegalArgumentException("Summarizer is required for event compaction.")); } /** @@ -80,6 +87,8 @@ public SlidingWindowEventCompactor(EventsCompactionConfig config) { */ @Override public Completable compact(Session session, BaseSessionService sessionService) { + logger.debug("Running event compaction for session {}", session.id()); + return Completable.fromMaybe( getCompactionEvents(session) .flatMap(summarizer::summarizeEvents) diff --git a/core/src/test/java/com/google/adk/flows/llmflows/ContentsTest.java b/core/src/test/java/com/google/adk/flows/llmflows/ContentsTest.java index 1cb0c8771..86ecda7cb 100644 --- a/core/src/test/java/com/google/adk/flows/llmflows/ContentsTest.java +++ b/core/src/test/java/com/google/adk/flows/llmflows/ContentsTest.java @@ -17,12 +17,15 @@ package com.google.adk.flows.llmflows; import static com.google.common.collect.ImmutableList.toImmutableList; +import static com.google.common.truth.Correspondence.transforming; import static com.google.common.truth.Truth.assertThat; import static org.junit.Assert.assertThrows; import com.google.adk.agents.InvocationContext; import com.google.adk.agents.LlmAgent; import com.google.adk.events.Event; +import com.google.adk.events.EventActions; +import com.google.adk.events.EventCompaction; import com.google.adk.models.LlmRequest; import com.google.adk.models.Model; import com.google.adk.sessions.Session; @@ -487,6 +490,77 @@ public void rearrangeHistory_gemini3interleavedFCFR_groupsFcAndFr() { .hasValue("tool2"); } + @Test + public void processRequest_singleCompaction() { + ImmutableList events = + ImmutableList.of( + createUserEvent("env1", "content 1", "inv1", 1), + createUserEvent("env2", "content 2", "inv2", 2), + createCompactedEvent(1, 2, "Summary 1-2"), + createUserEvent("env3", "content 3", "inv3", 3)); + + List contents = runContentsProcessor(events); + assertThat(contents) + .comparingElementsUsing( + transforming((Content c) -> c.parts().get().get(0).text().get(), "content text")) + .containsExactly("Summary 1-2", "content 3"); + } + + @Test + public void processRequest_startsWithCompaction() { + ImmutableList events = + ImmutableList.of( + createCompactedEvent(1, 2, "Summary 1-2"), + createUserEvent("env3", "content 3", "inv3", 3), + createUserEvent("env4", "content 4", "inv4", 4)); + + List contents = runContentsProcessor(events); + assertThat(contents) + .comparingElementsUsing( + transforming((Content c) -> c.parts().get().get(0).text().get(), "content text")) + .containsExactly("Summary 1-2", "content 3", "content 4"); + } + + @Test + public void processRequest_endsWithCompaction() { + ImmutableList events = + ImmutableList.of( + createUserEvent("env1", "content 1", "inv1", 1), + createUserEvent("env2", "content 2", "inv2", 2), + createUserEvent("env3", "content 3", "inv3", 2), + createCompactedEvent(2, 3, "Summary 2-3")); + + List contents = runContentsProcessor(events); + assertThat(contents) + .comparingElementsUsing( + transforming((Content c) -> c.parts().get().get(0).text().get(), "content text")) + .containsExactly("content 1", "Summary 2-3"); + } + + @Test + public void processRequest_multipleCompactions() { + ImmutableList events = + ImmutableList.of( + createUserEvent("env1", "content 1", "inv1", 1), + createUserEvent("env2", "content 2", "inv2", 2), + createUserEvent("env3", "content 3", "inv3", 3), + createUserEvent("env4", "content 4", "inv4", 4), + createCompactedEvent(1, 4, "Summary 1-4"), + createUserEvent("env5", "content 5", "inv5", 5), + createUserEvent("env6", "content 6", "inv6", 6), + createUserEvent("env7-1", "content 7-1", "inv7", 7), + createUserEvent("env7-2", "content 7-2", "inv8", 8), + createUserEvent("env9", "content 9", "inv9", 9), + createCompactedEvent(6, 9, "Summary 6-9"), + createUserEvent("env10", "content 10", "inv10", 10)); + + List contents = runContentsProcessor(events); + assertThat(contents) + .comparingElementsUsing( + transforming((Content c) -> c.parts().get().get(0).text().get(), "content text")) + .containsExactly("Summary 1-4", "content 5", "Summary 6-9", "content 10"); + } + private static Event createUserEvent(String id, String text) { return Event.builder() .id(id) @@ -496,6 +570,17 @@ private static Event createUserEvent(String id, String text) { .build(); } + private static Event createUserEvent( + String id, String text, String invocationId, long timestamp) { + return Event.builder() + .id(id) + .author(USER) + .content(Optional.of(Content.fromParts(Part.fromText(text)))) + .invocationId(invocationId) + .timestamp(timestamp) + .build(); + } + private static Event createAgentEvent(String id, String text) { return createAgentEvent(AGENT, id, text); } @@ -684,4 +769,22 @@ private static ImmutableList eventsToContents(List events) { .map(Optional::get) .collect(toImmutableList()); } + + private Event createCompactedEvent(long startTimestamp, long endTimestamp, String content) { + return Event.builder() + .actions( + EventActions.builder() + .compaction( + EventCompaction.builder() + .startTimestamp(startTimestamp) + .endTimestamp(endTimestamp) + .compactedContent( + Content.builder() + .role("model") + .parts(Part.builder().text(content).build()) + .build()) + .build()) + .build()) + .build(); + } } diff --git a/core/src/test/java/com/google/adk/runner/RunnerTest.java b/core/src/test/java/com/google/adk/runner/RunnerTest.java index 52218a0e9..7a734deb2 100644 --- a/core/src/test/java/com/google/adk/runner/RunnerTest.java +++ b/core/src/test/java/com/google/adk/runner/RunnerTest.java @@ -17,6 +17,7 @@ package com.google.adk.runner; import static com.google.adk.testing.TestUtils.createLlmResponse; +import static com.google.adk.testing.TestUtils.createTestAgent; import static com.google.adk.testing.TestUtils.createTestAgentBuilder; import static com.google.adk.testing.TestUtils.createTestLlm; import static com.google.adk.testing.TestUtils.simplifyEvents; @@ -38,6 +39,7 @@ import com.google.adk.models.LlmResponse; import com.google.adk.plugins.BasePlugin; import com.google.adk.sessions.Session; +import com.google.adk.summarizer.EventsCompactionConfig; import com.google.adk.testing.TestLlm; import com.google.adk.testing.TestUtils; import com.google.adk.testing.TestUtils.EchoTool; @@ -119,6 +121,45 @@ public void tearDown() { Telemetry.setTracerForTesting(originalTracer); } + @Test + public void eventsCompaction_enabled() { + LlmAgent agent = + createTestAgent( + createTestLlm( + createLlmResponse(createContent("llm 1")), + createLlmResponse(createContent("summary 1")), + createLlmResponse(createContent("llm 2")), + createLlmResponse(createContent("summary 2")))); + + Runner runner = + Runner.builder() + .eventsCompactionConfig(new EventsCompactionConfig(1, 0)) + .agent(agent) + .sessionService(this.runner.sessionService()) + .appName(this.runner.appName()) + .build(); + var events = + runner.runAsync("user", session.id(), createContent("user 1")).toList().blockingGet(); + assertThat(simplifyEvents(events)).containsExactly("test agent: llm 1"); + + events = runner.runAsync("user", session.id(), createContent("user 2")).toList().blockingGet(); + assertThat(simplifyEvents(events)).containsExactly("test agent: llm 2"); + + Session updatedSession = + runner + .sessionService() + .getSession(session.appName(), session.userId(), session.id(), Optional.empty()) + .blockingGet(); + assertThat(simplifyEvents(updatedSession.events())) + .containsExactly( + "user: user 1", + "test agent: llm 1", + "user: summary 1", + "user: user 2", + "test agent: llm 2", + "user: summary 2"); + } + @Test public void pluginDoesNothing() { var events = diff --git a/core/src/test/java/com/google/adk/summarizer/SlidingWindowEventCompactorTest.java b/core/src/test/java/com/google/adk/summarizer/SlidingWindowEventCompactorTest.java index 8fcc104ec..6ec2fb42c 100644 --- a/core/src/test/java/com/google/adk/summarizer/SlidingWindowEventCompactorTest.java +++ b/core/src/test/java/com/google/adk/summarizer/SlidingWindowEventCompactorTest.java @@ -2,6 +2,7 @@ import static com.google.common.truth.Truth.assertThat; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; @@ -63,6 +64,26 @@ public void compaction_notEnoughInvocations() { verify(mockSessionService, never()).appendEvent(any(), any()); } + @Test + public void compaction_notEnoughInvocationsAfterCompact() { + EventCompactor compactor = + new SlidingWindowEventCompactor( + new EventsCompactionConfig(2, 0, Optional.of(mockSummarizer))); + Session session = + Session.builder("id") + .events( + ImmutableList.of( + Event.builder().invocationId("1").timestamp(1).build(), + Event.builder().invocationId("2").timestamp(2).build(), + createCompactedEvent(1, 2, "Summary 1-2"), + Event.builder().invocationId("3").timestamp(3).build())) + .build(); + + compactor.compact(session, mockSessionService).blockingSubscribe(); + verify(mockSummarizer, never()).summarizeEvents(anyList()); + verify(mockSessionService, never()).appendEvent(any(), any()); + } + @Test public void compaction_firstCompaction() { EventCompactor compactor = diff --git a/core/src/test/java/com/google/adk/testing/TestUtils.java b/core/src/test/java/com/google/adk/testing/TestUtils.java index e4c2949eb..d21a19f46 100644 --- a/core/src/test/java/com/google/adk/testing/TestUtils.java +++ b/core/src/test/java/com/google/adk/testing/TestUtils.java @@ -26,6 +26,7 @@ import com.google.adk.agents.RunConfig; import com.google.adk.events.Event; import com.google.adk.events.EventActions; +import com.google.adk.events.EventCompaction; import com.google.adk.models.BaseLlm; import com.google.adk.models.LlmResponse; import com.google.adk.sessions.InMemorySessionService; @@ -89,7 +90,8 @@ public static ImmutableList simplifyEvents(List events) { private static String formatEventContent(Event event) { return event .content() - .flatMap(content -> content.parts()) + .or(() -> event.actions().compaction().map(EventCompaction::compactedContent)) + .flatMap(Content::parts) .map( parts -> { if (parts.size() == 1) {