feat: add bidirectional live streaming to Runner and LlmAgent#325
Open
tSte wants to merge 4 commits into
Open
Conversation
Implements Runner.runLive plus the underlying LlmAgent and connection plumbing so audio/video bidi sessions work end-to-end, bringing parity with adk-python's run_live. - Runner.runLive drives a live invocation via a LiveRequestQueue, defaults responseModalities to AUDIO, auto-enables transcription configs for multi-agent setups, and runs the plugin lifecycle. - LlmAgent.runLiveFlow runs the same preprocessors as runAsync, opens llm.connect, drains the live request queue on a parallel send loop, yields events from the receive loop, ferries function responses back to the open websocket, and recurses on transfer_to_agent. - GeminiLlmConnection bridges the GenAI Session callbacks into an AsyncGenerator, aggregates text, transcriptions, tool calls, session resumption updates, turn-complete and interruption signals. - BaseLlmConnection gains optional sendActivityStart/sendActivityEnd for manual activity boundary signalling. - BaseAgent.runLive now mirrors runAsync (before/after callbacks, abort handling, otel span) and dispatches to runLiveImpl. - Live model audio events with inline data are yielded but not persisted to the session to avoid storing raw blobs; transcription, tool, and usage events are persisted as in runAsync. - Adds 6 unit tests covering missing queue, missing session, blob forwarding, default modalities, audio-blob persistence skip, and function-call round-tripping.
Required for Gemini 3.1 realtime preview, which ignores
sendClientContent text turns and expects audio/video routed via
sendRealtimeInput.audio / sendRealtimeInput.video instead of media.
- sendRealtime now branches on blob.mimeType: audio/* uses
{audio: blob}, image/* uses {video: blob}, otherwise falls back
to the existing {media: blob} path.
- sendContent forwards a single user text part via
sendRealtimeInput.text so it interleaves with the audio stream.
Multi-part user content and non-user content keep using
sendClientContent.
- receive no longer breaks after turnComplete. The same call now
spans an entire conversation, surfacing turnComplete as an
in-stream signal and continuing until the websocket closes
(kind: 'close') or the consumer closes the connection.
Adds 12 unit tests in core/test/models/gemini_llm_connection_test.ts
covering sendRealtime mime routing, sendContent text/multi-part/
function-response paths, and the multi-turn receive() flow including
close and error termination.
|
Thanks for your pull request! It looks like this may be your first contribution to a Google open source project. Before we can look at your pull request, you'll need to sign a Contributor License Agreement (CLA). View this failed invocation of the CLA check for more information. For the most up to date status, view the checks section at the bottom of the pull request. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Please ensure you have read the contribution guide before creating a pull request.
Link to Issue or Description of Change
1. Link to an existing issue (if applicable):
2. Or, if no issue exists, describe the change:
Problem:
adk-jscurrently lacks real-time bidirectional audio/video streaming, whileadk-pythonexposes the equivalent feature viaRunner.run_live. The TScodebase has had
// TODO - b/425992518: Implement runLive and related methods.inrunner.tssince the initial release,BaseAgent.runLivethrows
Live mode is not implemented yet.,LlmAgent.runLiveFlowthrowsLlmAgent.runLiveFlow not implemented, andGeminiLlmConnection.receivethrows
Not Implemented.. TheRunConfiginterface already exposesresponseModalitiesandspeechConfig, so the configuration layer existed— only the
runLive/LiveRequestQueueplumbing was missing.Solution:
Implement
Runner.runLiveend-to-end with the same surface area as thePython counterpart, including the supporting
LlmAgent,BaseAgent, andGemini live connection plumbing.
Runner.runLivedrives a live invocation via aLiveRequestQueue,defaults
responseModalitiestoAUDIO, auto-enables transcriptionconfigs for multi-agent setups, and runs the plugin lifecycle
(
beforeRun/onEvent/afterRun).LlmAgent.runLiveFlowruns the same preprocessors asrunAsync, opensllm.connect, drains the live request queue on a parallel send loop,yields events from the receive loop, ferries function responses back
to the open websocket, and recurses on
transfer_to_agent.GeminiLlmConnectionbridges the GenAISessioncallbacks into anAsyncGenerator, aggregating text, transcriptions, tool calls, sessionresumption updates, and turn-complete / interruption signals.
GeminiLlmConnection.sendRealtimeroutes by mime type —audio/*→sendRealtimeInput.audio,image/*→sendRealtimeInput.video,otherwise the existing
mediapath.sendContentforwards a singleuser text part via
sendRealtimeInput.textso it interleaves with theaudio stream. Both are required for the Gemini 3.1 realtime preview,
which ignores
sendClientContenttext turns and rejects audio senton the legacy
mediachannel.GeminiLlmConnection.receiveno longer terminates onturnComplete.A single
receive()call now spans an entire conversation: itsurfaces
turnCompleteas an in-stream signal and keeps iteratinguntil the websocket closes or the consumer closes the connection.
inlineDataare yielded but skippedwhen persisting (matching Python). Transcription, tool, and usage
events are persisted as in
runAsync.Testing Plan
Unit Tests:
All 18 unit tests added by this PR pass. The 17 failing tests are
pre-existing in
core/test/sessions/database_session_service_test.tsand
core/test/sessions/db/operations_test.ts; they fail because thelocal sandbox is missing a built
sqlite3native binding and areunrelated to this change. The same suite is green on machines where
the binding is built.
core/test/runner/run_live_test.tsadds six tests:runLivethrows whenliveRequestQueueis missing.runLivethrows when the session does not exist.model events (audio, text,
turnComplete) are yielded back.responseModalitiesdefaults toAUDIOand is propagated intoliveConnectConfig.inlineDataare NOT persisted to thesession, while transcription events ARE persisted.
response is sent back to the model over the open connection.
core/test/models/gemini_llm_connection_test.tsadds twelve tests:sendRealtimeroutesaudio/*blobs throughsendRealtimeInput.audio.sendRealtimeroutesimage/*blobs throughsendRealtimeInput.video.sendRealtimefalls back tosendRealtimeInput.mediafor othermime types.
sendRealtimefalls back tomediawhen mime type is missing.sendContentsends a single user text part viasendRealtimeInput.text.sendContentusessendClientContentfor multi-part user content.sendContentusessendClientContentfor non-user roles.sendContentroutes function responses throughsendToolResponse.sendContentthrows when content has no parts.receivekeeps iterating afterturnCompleteand surfaces eventsfor subsequent turns.
receiveterminates cleanly onclose.receiverethrows on websocket error.The full integration suite also passes:
npm run lint,npm run format:check, andnpm run docs:checkare clean.Manual End-to-End (E2E) Tests:
The reproducer below mirrors the Python
bidi-demo
sample but uses the Gemini 3.1 realtime preview model. It confirms that:
LiveRequestQueuereach the live model.runLive.turnCompleteflow through.Run with
GOOGLE_GENAI_API_KEY(or Vertex equivalents) set.Reviewers can also exercise this by porting the Python
bidi-demoagentscript to TS — the public
Runner.runLivesignature is intentionallycompatible.
Checklist
Additional context
This brings TS ADK to parity with
adk-pythonfor bidi live streaming.The implementation closely follows the Python design but diverges in two
places that matter for reviewers:
GenAI SDK bridging. The Python
genaiSDK exposessession.receive()as an async iterator. The JS SDK only providescallbacks (
onmessage/onerror/onclose). The newIncomingMessageBufferingemini_llm_connection.tsadapts thecallback model into a back-pressured async generator so
BaseLlmConnection.receive()keeps its existing contract.Function response routing. Python pushes function responses back
through
LiveRequestQueueso the user-owned send loop can fan them outto active streaming tools. In TS, function responses are sent directly
via
connection.sendContentfrom the receive loop. This avoids a racein which the user-owned queue is closed but the open websocket still
needs to ferry a tool result to the model. Active streaming tool
fan-out can be added later without changing the public API.
Files touched:
core/src/runner/runner.ts— addsRunner.runLive.core/src/agents/base_agent.ts—runLive()mirrorsrunAsync(callbacks, abort, otel) and dispatches to
runLiveImpl.core/src/agents/llm_agent.ts— implementsrunLiveFlow,runLivePreprocess,runSendLoop,dispatchLiveRequest,runReceiveLoop,postprocessLive,isUserAuthoredResponse.core/src/models/base_llm_connection.ts— adds optionalsendActivityStart/sendActivityEnd.core/src/models/gemini_llm_connection.ts— fullreceive()implementation; bridges
Sessioncallbacks viaIncomingMessageBuffer;Gemini 3.1-compatible
sendRealtimemime routing; single-text-partuser content over
sendRealtimeInput.text;receiveno longerbreaks on
turnComplete.core/src/models/google_llm.ts—Gemini.connectwiresonmessage/onerror/oncloseinto the buffer.core/test/runner/run_live_test.ts— new unit tests forrunLive.core/test/models/gemini_llm_connection_test.ts— new unit tests forGeminiLlmConnection.@ScottMansfield