Skip to content

Commit aab0f29

Browse files
damianmomotgooglecopybara-github
authored andcommitted
fix: pre-merge stateDelta before onUserMessageCallback in Runner
Previously, stateDelta passed to runAsync was merged into the session only after pluginManager.onUserMessageCallback was invoked, so plugins observed a stale session state during the user-message callback. PiperOrigin-RevId: 921350398
1 parent 198b2fb commit aab0f29

2 files changed

Lines changed: 42 additions & 0 deletions

File tree

core/src/main/java/com/google/adk/runner/Runner.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -485,6 +485,12 @@ protected Flowable<Event> runAsyncImpl(
485485
BaseAgent rootAgent = this.agent;
486486
String invocationId = InvocationContext.newInvocationContextId();
487487

488+
// Pre-merge stateDelta so onUserMessageCallback can access it.
489+
// Safe: session is a copy; persistence still happens via appendNewMessageToSession.
490+
if (stateDelta != null && !stateDelta.isEmpty()) {
491+
stateDelta.forEach((key, value) -> session.state().put(key, value));
492+
}
493+
488494
// Create initial context
489495
InvocationContext initialContext =
490496
newInvocationContextBuilder(session)

core/src/test/java/com/google/adk/runner/RunnerTest.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1122,6 +1122,42 @@ public void beforeRunCallback_withStateDelta_seesMergedState() {
11221122
assertThat(sessionInCallback.state()).containsEntry("number", 123);
11231123
}
11241124

1125+
@Test
1126+
public void onUserMessageCallback_withStateDelta_seesMergedState() {
1127+
// Snapshot the session state *inside* the callback, otherwise the assertion would
1128+
// observe the post-runAsync state which is mutated by appendEvent regardless of whether
1129+
// the pre-merge in Runner is applied.
1130+
AtomicReference<ConcurrentHashMap<String, Object>> stateInCallback = new AtomicReference<>();
1131+
when(plugin.onUserMessageCallback(any(), any()))
1132+
.thenAnswer(
1133+
invocation -> {
1134+
InvocationContext ctx = invocation.getArgument(0);
1135+
stateInCallback.set(new ConcurrentHashMap<>(ctx.session().state()));
1136+
return Maybe.empty();
1137+
});
1138+
1139+
ImmutableMap<String, Object> stateDelta =
1140+
ImmutableMap.of("callback_key", "callback_value", "number", 123);
1141+
1142+
var unused =
1143+
runner
1144+
.runAsync(
1145+
"user",
1146+
session.id(),
1147+
createContent("test with state"),
1148+
RunConfig.builder().build(),
1149+
stateDelta)
1150+
.toList()
1151+
.blockingGet();
1152+
1153+
// Verify onUserMessageCallback was called
1154+
verify(plugin).onUserMessageCallback(any(), any());
1155+
1156+
// Verify state delta was merged before onUserMessageCallback was invoked
1157+
assertThat(stateInCallback.get()).containsEntry("callback_key", "callback_value");
1158+
assertThat(stateInCallback.get()).containsEntry("number", 123);
1159+
}
1160+
11251161
@Test
11261162
public void runAsync_ensureEventsAreAppendedInOrder() throws Exception {
11271163
Event event1 = TestUtils.createEvent("1");

0 commit comments

Comments
 (0)