Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from livekit.plugins.google.realtime.api_proto import ClientEvents, LiveAPIModels, Voice

from ..log import logger
from ..utils import create_tools_config, get_tool_results_for_realtime
from ..utils import create_function_response, create_tools_config, get_tool_results_for_realtime
from ..version import __version__

INPUT_AUDIO_SAMPLE_RATE = 16000
Expand All @@ -44,6 +44,9 @@

lk_google_debug = int(os.getenv("LK_GOOGLE_DEBUG", 0))

# stop rejecting tool calls after this many in a row to avoid a loop (tool_choice="none")
MAX_TOOL_CALL_REJECTIONS = 3

# Known VertexAI models for the Live API
# See: https://docs.cloud.google.com/vertex-ai/generative-ai/docs/live-api
KNOWN_VERTEXAI_MODELS: frozenset[str] = frozenset(
Expand Down Expand Up @@ -148,6 +151,7 @@ class _RealtimeOptions:
api_version: NotGivenOr[str] = NOT_GIVEN
tool_behavior: NotGivenOr[types.Behavior] = NOT_GIVEN
tool_response_scheduling: NotGivenOr[types.FunctionResponseScheduling] = NOT_GIVEN
tool_choice: NotGivenOr[llm.ToolChoice | None] = NOT_GIVEN
thinking_config: NotGivenOr[types.ThinkingConfig] = NOT_GIVEN
session_resumption: NotGivenOr[types.SessionResumptionConfig] = NOT_GIVEN
credentials: google.auth.credentials.Credentials | None = None
Expand Down Expand Up @@ -488,6 +492,10 @@ def __init__(self, realtime_model: RealtimeModel) -> None:
self._session_should_close = asyncio.Event()
self._response_created_futures: dict[str, asyncio.Future[llm.GenerationCreatedEvent]] = {}
self._pending_generation_fut: asyncio.Future[llm.GenerationCreatedEvent] | None = None
# number of tool calls rejected in the current tool_choice="none" turn; non-zero also
# means we're draining that turn's trailing events (which have no generation to attach
# to). reset when the next generation starts.
self._rejected_tool_calls = 0

self._session_resumption_handle: str | None = (
self._opts.session_resumption.handle
Expand Down Expand Up @@ -554,7 +562,19 @@ def update_options(
# no need to restart

if is_given(tool_choice):
logger.warning("tool_choice is not supported by the Google Realtime API.")
# no per-response tool_choice on Gemini; "none" is emulated by rejecting any tool
# call emitted during the turn (see _reject_tool_calls).
self._opts.tool_choice = tool_choice
if tool_choice == "none":
logger.warning(
"the Google Realtime API has no tool_choice='none'; tool calls emitted "
"this turn will be rejected so the model replies directly."
)
elif tool_choice not in (None, "auto"):
logger.warning(
f"tool_choice='{tool_choice}' is not supported by the Google Realtime API, "
"falling back to 'auto'."
)

if should_restart:
self._mark_restart_needed()
Expand Down Expand Up @@ -1016,6 +1036,13 @@ async def _recv_task(self, session: AsyncSession) -> None:
part["inline_data"] = "<audio>"
logger.debug("<<< received response", extra={"response": resp_copy})

if response.tool_call and self._opts.tool_choice == "none":
# reject without opening a generation, so the pending generate_reply
# stays bound to the model's eventual reply and tools stay suppressed
# for the whole turn.
self._reject_tool_calls(response.tool_call.function_calls or [])
continue
Comment on lines +1039 to +1044

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚩 The continue on rejected tool calls skips processing of co-occurring response fields

When tool_choice="none" and a response.tool_call is present, the continue at line 1044 skips ALL other processing for that response — including session_resumption_update, tool_call_cancellation, usage_metadata, and critically go_away (which signals an upcoming server disconnection). If any of these fields co-occur with tool_call in the same LiveServerMessage, they would be silently dropped. In practice, the Gemini API likely sends these as separate messages, but if go_away ever accompanies a tool_call, the session wouldn't prepare for disconnection. The usage_metadata case is partially mitigated by the _rejected_tool_calls guard in _handle_usage_metadata.

Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.


if not self._current_generation or self._current_generation._done:
if (sc := response.server_content) and sc.interrupted:
# two cases an interrupted event is sent without an active generation
Expand Down Expand Up @@ -1133,6 +1160,7 @@ def _build_connect_config(self) -> types.LiveConnectConfig:
return conf

def _start_new_generation(self) -> None:
self._rejected_tool_calls = 0
if self._current_generation and not self._current_generation._done:
logger.warning("starting new generation while another is active. Finalizing previous.")
self._mark_current_generation_done()
Expand Down Expand Up @@ -1184,7 +1212,13 @@ def _start_new_generation(self) -> None:
def _handle_server_content(self, server_content: types.LiveServerContent) -> None:
current_gen = self._current_generation
if not current_gen:
logger.warning("received server content but no active generation.")
if self._rejected_tool_calls:
logger.debug(
"ignoring server content from a rejected tool call turn",
extra={"server_content": server_content.model_dump_json(exclude_none=True)},
)
else:
logger.warning("received server content but no active generation.")
return

if model_turn := server_content.model_turn:
Expand Down Expand Up @@ -1302,6 +1336,38 @@ def _handle_input_speech_stopped(self) -> None:
llm.InputSpeechStoppedEvent(user_transcription_enabled=False),
)

def _reject_tool_calls(self, function_calls: list[types.FunctionCall]) -> None:
if not function_calls:
return

self._rejected_tool_calls += 1
extra = {"functions": [fnc_call.name for fnc_call in function_calls]}
if self._rejected_tool_calls > MAX_TOOL_CALL_REJECTIONS:
# stop responding to break the loop; the user can still interrupt by voice
if self._rejected_tool_calls == MAX_TOOL_CALL_REJECTIONS + 1:
logger.error(
"model keeps calling tools despite tool_choice='none'; "
f"stopping after {MAX_TOOL_CALL_REJECTIONS} rejections to avoid a loop",
extra=extra,
)
return

logger.warning("rejecting tool call requested while tool_choice='none'", extra=extra)
responses = [
create_function_response(
llm.FunctionCallOutput(
name=fnc_call.name or "",
call_id=fnc_call.id or "",
output="Tool calls are disabled for this turn, respond to the user directly.",
is_error=True,
),
vertexai=self._opts.vertexai,
tool_response_scheduling=self._opts.tool_response_scheduling,
)
for fnc_call in function_calls
]
self._send_client_event(types.LiveClientToolResponse(function_responses=responses))

def _handle_tool_calls(self, tool_call: types.LiveServerToolCall) -> None:
if not self._current_generation:
logger.warning("received tool call but no active generation.")
Expand Down Expand Up @@ -1331,7 +1397,10 @@ def _handle_tool_call_cancellation(
def _handle_usage_metadata(self, usage_metadata: types.UsageMetadata) -> None:
current_gen = self._current_generation
if not current_gen:
logger.warning("no active generation to report metrics for")
if self._rejected_tool_calls:
logger.debug("ignoring usage metadata from a rejected tool call turn")
else:
logger.warning("no active generation to report metrics for")
return

ttft = (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,28 +48,40 @@ def create_tools_config(
return gemini_tools


def create_function_response(
output: llm.FunctionCallOutput,
*,
vertexai: bool = False,
tool_response_scheduling: NotGivenOr[types.FunctionResponseScheduling] = NOT_GIVEN,
) -> types.FunctionResponse:
res = types.FunctionResponse(
name=output.name,
response={"error": output.output} if output.is_error else {"output": output.output},
Comment thread
longcw marked this conversation as resolved.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚩 Behavioral change in error response format for get_tool_results_for_realtime

The refactoring of create_function_response introduces a behavioral change: when is_error=True, the function response dict key changes from {"output": msg} (old behavior in get_tool_results_for_realtime) to {"error": msg} (new behavior). This affects all tool execution failures sent via update_chat_ctxget_tool_results_for_realtime. While likely intentional (and arguably more correct since it signals errors differently to the model), this is a semantic change to an existing code path that could subtly affect model behavior for error-case tool responses. The Gemini API's FunctionResponse.response field is a generic dict, so the key name is what the model "sees" — changing it from "output" to "error" may change how the model interprets failed tool calls.

Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

)
if is_given(tool_response_scheduling):
# vertexai currently doesn't support the scheduling parameter, gemini api defaults to idle
# it's the user's responsibility to avoid this parameter when using vertexai
res.scheduling = tool_response_scheduling
if not vertexai:
# vertexai does not support id in FunctionResponse
# see: https://github.com/googleapis/python-genai/blob/85e00bc/google/genai/_live_converters.py#L1435
res.id = output.call_id
return res


def get_tool_results_for_realtime(
chat_ctx: llm.ChatContext,
*,
vertexai: bool = False,
tool_response_scheduling: NotGivenOr[types.FunctionResponseScheduling] = NOT_GIVEN,
) -> types.LiveClientToolResponse | None:
function_responses: list[types.FunctionResponse] = []
for msg in chat_ctx.items:
if msg.type == "function_call_output":
res = types.FunctionResponse(
name=msg.name,
response={"output": msg.output},
)
if is_given(tool_response_scheduling):
# vertexai currently doesn't support the scheduling parameter, gemini api defaults to idle
# it's the user's responsibility to avoid this parameter when using vertexai
res.scheduling = tool_response_scheduling
if not vertexai:
# vertexai does not support id in FunctionResponse
# see: https://github.com/googleapis/python-genai/blob/85e00bc/google/genai/_live_converters.py#L1435
res.id = msg.call_id
function_responses.append(res)
function_responses = [
create_function_response(
msg, vertexai=vertexai, tool_response_scheduling=tool_response_scheduling
)
for msg in chat_ctx.items
if msg.type == "function_call_output"
]
return (
types.LiveClientToolResponse(function_responses=function_responses)
if function_responses
Expand Down