Skip to content

Commit 7fa13ed

Browse files
wuliang229 and copybara-github
authored and committed
fix(live): Buffer tool calls and emit them together upon turn completion
The `receive` method now accumulates function calls from multiple `LiveServerMessage` instances. These accumulated tool calls are yielded as a single `LlmResponse` containing all function call parts only when a turn_complete message is received. Without the change, tool_1's response is sent to the model as soon as it is generated, triggering a second call for tool_2. Upon receiving two consecutive responses for tool_2, the model utters the same message twice. Fixes issue #4902 Co-authored-by: Liang Wu <wuliang@google.com> PiperOrigin-RevId: 893197482
1 parent 642d337 commit 7fa13ed

File tree

5 files changed

+227
-18
lines changed

5 files changed

+227
-18
lines changed
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
# Simple Live (Bidi-Streaming) Agent with Parallel Tools
2+
This project provides a basic example of a live, [bidirectional streaming](https://google.github.io/adk-docs/streaming/) agent that demonstrates parallel tool execution.
3+
4+
## Getting Started
5+
6+
Follow these steps to get the agent up and running:
7+
8+
1. **Start the ADK Web Server**
9+
Open your terminal, navigate to the root directory that contains the
10+
`live_bidi_streaming_parallel_tools_agent` folder, and execute the following
11+
command:
12+
```bash
13+
adk web
14+
```
15+
16+
2. **Access the ADK Web UI**
17+
Once the server is running, open your web browser and navigate to the URL
18+
provided in the terminal (it will typically be `http://localhost:8000`).
19+
20+
3. **Select the Agent**
21+
In the top-left corner of the ADK Web UI, use the dropdown menu to select
22+
this agent (`live_bidi_streaming_parallel_tools_agent`).
23+
24+
4. **Start Streaming**
25+
Click on the **Audio** icon located near the chat input
26+
box to begin the streaming session.
27+
28+
5. **Interact with the Agent**
29+
You can now begin talking to the agent, and it will respond in real-time.
30+
Try asking it to perform multiple actions at once, for example: "Turn on the
31+
lights and the TV at the same time." The agent will be able to invoke both
32+
`turn_on_lights` and `turn_on_tv` tools in parallel.
33+
34+
## Usage Notes
35+
36+
* You only need to click the **Audio** button once to initiate the
37+
stream. The current version does not support stopping and restarting the stream
38+
by clicking the button again during a session.
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
# Copyright 2026 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from . import agent
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
# Copyright 2026 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
16+
from google.adk.agents.llm_agent import Agent
17+
18+
19+
def turn_on_lights():
    """Turn on the lights."""
    # Log the invocation so parallel tool calls are visible in the console.
    print("turn_on_lights")
    result = {"status": "OK"}
    return result
23+
24+
25+
def turn_on_tv():
    """Turn on the tv."""
    # Log the invocation so parallel tool calls are visible in the console.
    print("turn_on_tv")
    result = {"status": "OK"}
    return result
29+
30+
31+
# Live (bidi-streaming) demo agent exposing two simple home-automation tools
# so that parallel tool calls can be exercised in a single turn.
root_agent = Agent(
    name="Home_helper",
    model="gemini-live-2.5-flash-native-audio",
    instruction="Be polite and answer all user's questions.",
    tools=[turn_on_lights, turn_on_tv],
)

src/google/adk/models/gemini_llm_connection.py

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,7 @@ async def receive(self) -> AsyncGenerator[LlmResponse, None]:
203203
"""
204204

205205
text = ''
206+
tool_call_parts = []
206207
async with Aclosing(self._gemini_session.receive()) as agen:
207208
# TODO(b/440101573): Reuse StreamingResponseAggregator to accumulate
208209
# partial content and emit responses as needed.
@@ -332,6 +333,13 @@ async def receive(self) -> AsyncGenerator[LlmResponse, None]:
332333
if text:
333334
yield self.__build_full_text_response(text)
334335
text = ''
336+
if tool_call_parts:
337+
logger.debug('Returning aggregated tool_call_parts')
338+
yield LlmResponse(
339+
content=types.Content(role='model', parts=tool_call_parts),
340+
model_version=self._model_version,
341+
)
342+
tool_call_parts = []
335343
yield LlmResponse(
336344
turn_complete=True,
337345
interrupted=message.server_content.interrupted,
@@ -353,17 +361,14 @@ async def receive(self) -> AsyncGenerator[LlmResponse, None]:
353361
model_version=self._model_version,
354362
)
355363
if message.tool_call:
364+
logger.debug('Received tool call: %s', message.tool_call)
356365
if text:
357366
yield self.__build_full_text_response(text)
358367
text = ''
359-
parts = [
368+
tool_call_parts.extend([
360369
types.Part(function_call=function_call)
361370
for function_call in message.tool_call.function_calls
362-
]
363-
yield LlmResponse(
364-
content=types.Content(role='model', parts=parts),
365-
model_version=self._model_version,
366-
)
371+
])
367372
if message.session_resumption_update:
368373
logger.debug('Received session resumption message: %s', message)
369374
yield (
@@ -372,6 +377,12 @@ async def receive(self) -> AsyncGenerator[LlmResponse, None]:
372377
model_version=self._model_version,
373378
)
374379
)
380+
if tool_call_parts:
381+
logger.debug('Exited loop with pending tool_call_parts')
382+
yield LlmResponse(
383+
content=types.Content(role='model', parts=tool_call_parts),
384+
model_version=self._model_version,
385+
)
375386

376387
async def close(self):
377388
"""Closes the llm server connection."""

tests/unittests/models/test_gemini_llm_connection.py

Lines changed: 121 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -933,33 +933,142 @@ async def test_receive_tool_call_and_grounding_metadata_with_native_audio(
933933
mock_metadata_msg.tool_call = None
934934
mock_metadata_msg.session_resumption_update = None
935935

936+
# 3. Message with turn_complete
937+
mock_turn_complete_content = mock.create_autospec(
938+
types.LiveServerContent, instance=True
939+
)
940+
mock_turn_complete_content.model_turn = None
941+
mock_turn_complete_content.grounding_metadata = None
942+
mock_turn_complete_content.turn_complete = True
943+
mock_turn_complete_content.interrupted = False
944+
mock_turn_complete_content.input_transcription = None
945+
mock_turn_complete_content.output_transcription = None
946+
947+
mock_turn_complete_msg = mock.create_autospec(
948+
types.LiveServerMessage, instance=True
949+
)
950+
mock_turn_complete_msg.usage_metadata = None
951+
mock_turn_complete_msg.server_content = mock_turn_complete_content
952+
mock_turn_complete_msg.tool_call = None
953+
mock_turn_complete_msg.session_resumption_update = None
954+
936955
async def mock_receive_generator():
937956
yield mock_tool_call_msg
938957
yield mock_metadata_msg
958+
yield mock_turn_complete_msg
939959

940960
receive_mock = mock.Mock(return_value=mock_receive_generator())
941961
mock_gemini_session.receive = receive_mock
942962

943963
responses = [resp async for resp in connection.receive()]
944964

945-
assert len(responses) == 2
965+
assert len(responses) == 3
946966

947-
# First response: the tool call
967+
# First response: the audio content and grounding metadata
968+
assert responses[0].grounding_metadata == grounding_metadata
969+
assert responses[0].content == mock_content
948970
assert responses[0].content is not None
949971
assert responses[0].content.parts is not None
950-
assert responses[0].content.parts[0].function_call is not None
972+
assert responses[0].content.parts[0].inline_data == audio_blob
973+
974+
# Second response: the tool call, buffered until turn_complete
975+
assert responses[1].content is not None
976+
assert responses[1].content.parts is not None
977+
assert responses[1].content.parts[0].function_call is not None
951978
assert (
952-
responses[0].content.parts[0].function_call.name
979+
responses[1].content.parts[0].function_call.name
953980
== 'enterprise_web_search'
954981
)
955-
assert responses[0].content.parts[0].function_call.args == {
982+
assert responses[1].content.parts[0].function_call.args == {
956983
'query': 'Google stock price today'
957984
}
958-
assert responses[0].grounding_metadata is None
985+
assert responses[1].grounding_metadata is None
959986

960-
# Second response: the audio content and grounding metadata
961-
assert responses[1].grounding_metadata == grounding_metadata
962-
assert responses[1].content == mock_content
963-
assert responses[1].content is not None
964-
assert responses[1].content.parts is not None
965-
assert responses[1].content.parts[0].inline_data == audio_blob
987+
# Third response: the turn_complete
988+
assert responses[2].turn_complete is True
989+
990+
991+
@pytest.mark.asyncio
async def test_receive_multiple_tool_calls_buffered_until_turn_complete(
    gemini_connection, mock_gemini_session
):
  """Test receive buffers multiple tool call messages until turn complete.

  Two separate LiveServerMessage tool calls must be aggregated into a single
  LlmResponse (with one part per function call) that is emitted only when the
  turn_complete message arrives — not one LlmResponse per tool call message.
  """
  # First tool call message: no server content or usage metadata, only a
  # single function call named 'tool_1'.
  mock_tool_call_msg1 = mock.create_autospec(
      types.LiveServerMessage, instance=True
  )
  mock_tool_call_msg1.usage_metadata = None
  mock_tool_call_msg1.server_content = None
  mock_tool_call_msg1.session_resumption_update = None

  function_call1 = types.FunctionCall(
      name='tool_1',
      args={'arg': 'value1'},
  )
  mock_tool_call1 = mock.create_autospec(
      types.LiveServerToolCall, instance=True
  )
  mock_tool_call1.function_calls = [function_call1]
  mock_tool_call_msg1.tool_call = mock_tool_call1

  # Second tool call message, carrying 'tool_2' — arrives before the turn
  # completes, so it must be buffered together with 'tool_1'.
  mock_tool_call_msg2 = mock.create_autospec(
      types.LiveServerMessage, instance=True
  )
  mock_tool_call_msg2.usage_metadata = None
  mock_tool_call_msg2.server_content = None
  mock_tool_call_msg2.session_resumption_update = None

  function_call2 = types.FunctionCall(
      name='tool_2',
      args={'arg': 'value2'},
  )
  mock_tool_call2 = mock.create_autospec(
      types.LiveServerToolCall, instance=True
  )
  mock_tool_call2.function_calls = [function_call2]
  mock_tool_call_msg2.tool_call = mock_tool_call2

  # Turn complete message: signals end of turn with no model content, which
  # is what should trigger emission of the buffered tool calls.
  mock_turn_complete_content = mock.create_autospec(
      types.LiveServerContent, instance=True
  )
  mock_turn_complete_content.model_turn = None
  mock_turn_complete_content.grounding_metadata = None
  mock_turn_complete_content.turn_complete = True
  mock_turn_complete_content.interrupted = False
  mock_turn_complete_content.input_transcription = None
  mock_turn_complete_content.output_transcription = None

  mock_turn_complete_msg = mock.create_autospec(
      types.LiveServerMessage, instance=True
  )
  mock_turn_complete_msg.usage_metadata = None
  mock_turn_complete_msg.server_content = mock_turn_complete_content
  mock_turn_complete_msg.tool_call = None
  mock_turn_complete_msg.session_resumption_update = None

  # Stream the three messages in order: tool_1, tool_2, then turn_complete.
  async def mock_receive_generator():
    yield mock_tool_call_msg1
    yield mock_tool_call_msg2
    yield mock_turn_complete_msg

  receive_mock = mock.Mock(return_value=mock_receive_generator())
  mock_gemini_session.receive = receive_mock

  responses = [resp async for resp in gemini_connection.receive()]

  # Expected: One LlmResponse with both tool calls, then one with turn_complete
  assert len(responses) == 2

  # First response: single LlmResponse carrying both function calls
  assert responses[0].content is not None
  parts = responses[0].content.parts
  assert len(parts) == 2
  assert parts[0].function_call.name == 'tool_1'
  assert parts[0].function_call.args == {'arg': 'value1'}
  assert parts[1].function_call.name == 'tool_2'
  assert parts[1].function_call.args == {'arg': 'value2'}

  # Second response: turn_complete True
  assert responses[1].turn_complete is True

0 commit comments

Comments
 (0)