Skip to content

Commit 5af3432

Browse files
google-genai-botcopybara-github
authored andcommitted
fix: Fix race condition and stale session in ADK Runner
PiperOrigin-RevId: 896045084
1 parent 51f4d1f commit 5af3432

5 files changed

Lines changed: 363 additions & 20 deletions

File tree

core/src/main/java/com/google/adk/runner/Runner.java

Lines changed: 60 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import com.google.adk.utils.CollectionUtils;
4646
import com.google.common.base.Preconditions;
4747
import com.google.common.collect.ImmutableList;
48+
import com.google.common.collect.MapMaker;
4849
import com.google.errorprone.annotations.CanIgnoreReturnValue;
4950
import com.google.genai.types.AudioTranscriptionConfig;
5051
import com.google.genai.types.Content;
@@ -57,13 +58,16 @@
5758
import io.reactivex.rxjava3.core.Flowable;
5859
import io.reactivex.rxjava3.core.Maybe;
5960
import io.reactivex.rxjava3.core.Single;
61+
import io.reactivex.rxjava3.subjects.CompletableSubject;
62+
import java.time.Instant;
6063
import java.util.ArrayList;
6164
import java.util.Arrays;
6265
import java.util.Collections;
6366
import java.util.List;
6467
import java.util.Map;
6568
import java.util.Optional;
6669
import java.util.concurrent.ConcurrentHashMap;
70+
import java.util.concurrent.ConcurrentMap;
6771
import org.jspecify.annotations.Nullable;
6872

6973
/** The main class for the GenAI Agents runner. */
@@ -76,6 +80,8 @@ public class Runner {
7680
private final PluginManager pluginManager;
7781
@Nullable private final EventsCompactionConfig eventsCompactionConfig;
7882
@Nullable private final ContextCacheConfig contextCacheConfig;
83+
private final ConcurrentMap<String, Completable> activeSessionCompletables =
84+
new MapMaker().weakValues().makeMap();
7985

8086
/** Builder for {@link Runner}. */
8187
public static class Builder {
@@ -380,25 +386,56 @@ public Flowable<Event> runAsync(
380386
Content newMessage,
381387
RunConfig runConfig,
382388
@Nullable Map<String, Object> stateDelta) {
389+
Flowable<Event> result =
390+
Flowable.defer(
391+
() ->
392+
this.sessionService
393+
.getSession(appName, userId, sessionId, Optional.empty())
394+
.switchIfEmpty(
395+
Single.defer(
396+
() -> {
397+
if (runConfig.autoCreateSession()) {
398+
return this.sessionService.createSession(
399+
appName, userId, (Map<String, Object>) null, sessionId);
400+
}
401+
return Single.error(
402+
new IllegalArgumentException(
403+
String.format(
404+
"Session not found: %s for user %s",
405+
sessionId, userId)));
406+
}))
407+
.flatMapPublisher(
408+
session ->
409+
this.runAsyncImpl(session, newMessage, runConfig, stateDelta)))
410+
.compose(Tracing.trace("invocation"));
411+
383412
return Flowable.defer(
384-
() ->
385-
this.sessionService
386-
.getSession(appName, userId, sessionId, Optional.empty())
387-
.switchIfEmpty(
388-
Single.defer(
389-
() -> {
390-
if (runConfig.autoCreateSession()) {
391-
return this.sessionService.createSession(
392-
appName, userId, (Map<String, Object>) null, sessionId);
393-
}
394-
return Single.error(
395-
new IllegalArgumentException(
396-
String.format(
397-
"Session not found: %s for user %s", sessionId, userId)));
398-
}))
399-
.flatMapPublisher(
400-
session -> this.runAsyncImpl(session, newMessage, runConfig, stateDelta)))
401-
.compose(Tracing.trace("invocation"));
413+
() -> {
414+
CompletableSubject requestCompletion = CompletableSubject.create();
415+
416+
Completable[] previousHolder = new Completable[1];
417+
418+
activeSessionCompletables.compute(
419+
sessionId,
420+
(key, current) -> {
421+
previousHolder[0] = current;
422+
return requestCompletion;
423+
});
424+
425+
Completable previous = previousHolder[0];
426+
427+
Flowable<Event> execution =
428+
result.doFinally(
429+
() -> {
430+
requestCompletion.onComplete();
431+
activeSessionCompletables.remove(sessionId, requestCompletion);
432+
});
433+
434+
if (previous == null) {
435+
return execution;
436+
}
437+
return previous.onErrorComplete().andThen(execution);
438+
});
402439
}
403440

404441
/** See {@link #runAsync(String, String, Content, RunConfig, Map)}. */
@@ -540,6 +577,8 @@ private Flowable<Event> runAgentWithUpdatedSession(
540577
registeredEvent -> {
541578
// TODO: remove this hack after deprecating runAsync with Session.
542579
copySessionStates(updatedSession, initialContext.session());
580+
updatedSession.lastUpdateTime(
581+
Instant.ofEpochMilli(registeredEvent.timestamp()));
543582
return contextWithUpdatedSession
544583
.pluginManager()
545584
.onEventCallback(contextWithUpdatedSession, registeredEvent)
@@ -740,6 +779,9 @@ private BaseAgent findAgentToRun(Session session, BaseAgent rootAgent) {
740779

741780
for (Event event : events) {
742781
String author = event.author();
782+
if (author == null) {
783+
continue;
784+
}
743785
if (author.equals("user")) {
744786
continue;
745787
}

core/src/main/java/com/google/adk/sessions/Session.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ public Builder userId(String userId) {
123123
@CanIgnoreReturnValue
124124
@JsonProperty("events")
125125
public Builder events(List<Event> events) {
126-
this.events = Collections.synchronizedList(events);
126+
this.events = Collections.synchronizedList(new ArrayList<>(events));
127127
return this;
128128
}
129129

0 commit comments

Comments
 (0)