
feat: add bidirectional live streaming to Runner and LlmAgent #325

Open

tSte wants to merge 4 commits into google:main from tSte:feat/runner-run-live-bidi

Conversation

tSte commented May 5, 2026

Please ensure you have read the contribution guide before creating a pull request.

Link to Issue or Description of Change

1. Link to an existing issue (if applicable):

2. Or, if no issue exists, describe the change:

Problem:

adk-js currently lacks real-time bidirectional audio/video streaming, while
adk-python exposes the equivalent feature via Runner.run_live. The TS
codebase has carried // TODO - b/425992518: Implement runLive and related
methods. in runner.ts since the initial release: BaseAgent.runLive throws
"Live mode is not implemented yet.", LlmAgent.runLiveFlow throws
"LlmAgent.runLiveFlow not implemented", and GeminiLlmConnection.receive
throws "Not Implemented.". The RunConfig interface already exposes
responseModalities and speechConfig, so the configuration layer existed;
only the runLive / LiveRequestQueue plumbing was missing.

Solution:

Implement Runner.runLive end-to-end with the same surface area as the
Python counterpart, including the supporting LlmAgent, BaseAgent, and
Gemini live connection plumbing.

  • Runner.runLive drives a live invocation via a LiveRequestQueue,
    defaults responseModalities to AUDIO, auto-enables transcription
    configs for multi-agent setups, and runs the plugin lifecycle
    (beforeRun / onEvent / afterRun).
  • LlmAgent.runLiveFlow runs the same preprocessors as runAsync, opens
    llm.connect, drains the live request queue on a parallel send loop,
    yields events from the receive loop, ferries function responses back
    to the open websocket, and recurses on transfer_to_agent.
  • GeminiLlmConnection bridges the GenAI Session callbacks into an
    AsyncGenerator, aggregating text, transcriptions, tool calls, session
    resumption updates, and turn-complete / interruption signals.
  • GeminiLlmConnection.sendRealtime routes by mime type: audio/* →
    sendRealtimeInput.audio, image/* → sendRealtimeInput.video,
    otherwise the existing media path (see the sketch after this list).
    sendContent forwards a single user text part via
    sendRealtimeInput.text so it interleaves with the audio stream.
    Both are required for the Gemini 3.1 realtime preview, which ignores
    sendClientContent text turns and rejects audio sent on the legacy
    media channel.
  • GeminiLlmConnection.receive no longer terminates on turnComplete.
    A single receive() call now spans an entire conversation: it
    surfaces turnComplete as an in-stream signal and keeps iterating
    until the websocket closes or the consumer closes the connection.
  • Live model audio events with raw inlineData are yielded but skipped
    when persisting (matching Python). Transcription, tool, and usage
    events are persisted as in runAsync.
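
For reviewers skimming the mime routing above, it reduces to roughly the
sketch below. This is illustrative rather than the committed code: the
Session and Blob types and the sendRealtimeInput method come from
@google/genai, while routeRealtimeBlob is a hypothetical free function
standing in for the method on GeminiLlmConnection.

import type {Blob, Session} from '@google/genai';

// Hypothetical stand-in for GeminiLlmConnection.sendRealtime.
function routeRealtimeBlob(session: Session, blob: Blob): void {
  const mimeType = blob.mimeType ?? '';
  if (mimeType.startsWith('audio/')) {
    // The Gemini 3.1 realtime preview expects audio on its dedicated channel.
    session.sendRealtimeInput({audio: blob});
  } else if (mimeType.startsWith('image/')) {
    session.sendRealtimeInput({video: blob});
  } else {
    // Everything else keeps the pre-existing media path.
    session.sendRealtimeInput({media: blob});
  }
}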

Testing Plan

Unit Tests:

  • I have added or updated unit tests for my change.
  • All unit tests pass locally.
$ npm run test:unit
Test Files  115 passed | 2 failed (117)
Tests       1274 passed | 17 failed (1291)

All 18 unit tests added by this PR pass. The 17 failing tests are
pre-existing in core/test/sessions/database_session_service_test.ts
and core/test/sessions/db/operations_test.ts; they fail because the
local sandbox is missing a built sqlite3 native binding and are
unrelated to this change. The same suite is green on machines where
the binding is built.

core/test/runner/run_live_test.ts adds six tests:

  1. runLive throws when liveRequestQueue is missing.
  2. runLive throws when the session does not exist.
  3. Realtime blobs from the queue are forwarded to the connection and
    model events (audio, text, turnComplete) are yielded back.
  4. responseModalities defaults to AUDIO and is propagated into
    liveConnectConfig.
  5. Live model audio events with inlineData are NOT persisted to the
    session, while transcription events ARE persisted.
  6. Tool function calls round-trip: the tool runs and the function
    response is sent back to the model over the open connection.

core/test/models/gemini_llm_connection_test.ts adds twelve tests:

  1. sendRealtime routes audio/* blobs through sendRealtimeInput.audio
    (sketched after this list).
  2. sendRealtime routes image/* blobs through sendRealtimeInput.video.
  3. sendRealtime falls back to sendRealtimeInput.media for other
    mime types.
  4. sendRealtime falls back to media when mime type is missing.
  5. sendContent sends a single user text part via
    sendRealtimeInput.text.
  6. sendContent uses sendClientContent for multi-part user content.
  7. sendContent uses sendClientContent for non-user roles.
  8. sendContent routes function responses through sendToolResponse.
  9. sendContent throws when content has no parts.
  10. receive keeps iterating after turnComplete and surfaces events
    for subsequent turns.
  11. receive terminates cleanly on close.
  12. receive rethrows on websocket error.
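
For a sense of shape, the first of these might look roughly like the
sketch below. It assumes vitest (matching the suite output above) and
assumes GeminiLlmConnection takes the live session in its constructor
and exposes sendRealtime as used here; the committed test may differ.

import {describe, expect, it, vi} from 'vitest';
// Import path is illustrative.
import {GeminiLlmConnection} from '../../src/models/gemini_llm_connection';

describe('GeminiLlmConnection.sendRealtime', () => {
  it('routes audio/* blobs through sendRealtimeInput.audio', async () => {
    // Mock only the session surface the method touches.
    const session = {sendRealtimeInput: vi.fn()};
    const connection = new GeminiLlmConnection(session as never);
    const blob = {data: 'AAAA', mimeType: 'audio/pcm;rate=16000'};

    await connection.sendRealtime(blob);

    expect(session.sendRealtimeInput).toHaveBeenCalledWith({audio: blob});
  });
});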

The full integration suite also passes:

$ npm run test:integration
Test Files  23 passed (23)
Tests       75 passed | 8 skipped (83)

npm run lint, npm run format:check, and npm run docs:check are clean.

Manual End-to-End (E2E) Tests:

The reproducer below mirrors the Python bidi-demo sample but uses the
Gemini 3.1 realtime preview model. It confirms that:

  • Audio chunks pushed onto LiveRequestQueue reach the live model.
  • Audio events from the model are yielded back out of runLive.
  • Transcriptions, tool calls, and turnComplete flow through.
import {
  InMemorySessionService,
  LiveRequestQueue,
  LlmAgent,
  Runner,
} from '@google/adk';
import {Modality} from '@google/genai';
import {readFile} from 'node:fs/promises';

const agent = new LlmAgent({
  name: 'voice_assistant',
  model: 'gemini-3.1-realtime-preview',
  instruction: 'You are a concise voice assistant.',
});

const sessionService = new InMemorySessionService();
const session = await sessionService.createSession({
  appName: 'voice_demo',
  userId: 'user-1',
});

const runner = new Runner({
  appName: 'voice_demo',
  agent,
  sessionService,
});

const queue = new LiveRequestQueue();

// Push 16kHz mono PCM frames captured from a mic (or replay a wav file).
const pcm = await readFile('./hello.pcm');
const frame = 32 * 1024;
for (let i = 0; i < pcm.length; i += frame) {
  queue.sendRealtime({
    data: pcm.subarray(i, i + frame).toString('base64'),
    mimeType: 'audio/pcm;rate=16000',
  });
}
queue.sendActivityEnd();

for await (const event of runner.runLive({
  userId: 'user-1',
  sessionId: session.id,
  liveRequestQueue: queue,
  runConfig: {
    responseModalities: [Modality.AUDIO],
    outputAudioTranscription: {},
    inputAudioTranscription: {},
  },
})) {
  if (event.outputTranscription?.text) {
    console.log('model:', event.outputTranscription.text);
  }
  // Audio frames arrive as event.content.parts[].inlineData (audio/pcm).
  // Pipe them to a speaker / file as desired.
  if (event.turnComplete) break;
}

queue.close();
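
To make the audio-frame comment in the loop concrete, a small helper can
dump the model's PCM replies to disk. This is a hedged sketch: the Part
type and inlineData shape come from @google/genai, while dumpAudioParts
and reply.pcm are hypothetical.

import {appendFile} from 'node:fs/promises';
import type {Part} from '@google/genai';

// Append any model audio frames (base64 PCM in part.inlineData) to a
// raw PCM file for later playback.
async function dumpAudioParts(parts: Part[] | undefined): Promise<void> {
  for (const part of parts ?? []) {
    const inline = part.inlineData;
    if (inline?.data && inline.mimeType?.startsWith('audio/')) {
      await appendFile('./reply.pcm', Buffer.from(inline.data, 'base64'));
    }
  }
}

Calling await dumpAudioParts(event.content?.parts) inside the loop leaves
a raw PCM file that any raw-audio-capable player can replay.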

Run with GOOGLE_GENAI_API_KEY (or Vertex equivalents) set.

Reviewers can also exercise this by porting the Python bidi-demo agent
script to TS — the public Runner.runLive signature is intentionally
compatible.

Checklist

  • I have read the CONTRIBUTING.md document.
  • I have performed a self-review of my own code.
  • I have commented my code, particularly in hard-to-understand areas.
  • I have added tests that prove my fix is effective or that my feature works.
  • New and existing unit tests pass locally with my changes.
  • I have manually tested my changes end-to-end, using the Gemini 3.1
    realtime preview model with audio.
  • Any dependent changes have been merged and published in downstream modules.

Additional context

This brings TS ADK to parity with adk-python for bidi live streaming.
The implementation closely follows the Python design but diverges in two
places that matter for reviewers:

  • GenAI SDK bridging. The Python genai SDK exposes
    session.receive() as an async iterator. The JS SDK only provides
    callbacks (onmessage / onerror / onclose). The new
    IncomingMessageBuffer in gemini_llm_connection.ts adapts the
    callback model into a back-pressured async generator so
    BaseLlmConnection.receive() keeps its existing contract (a sketch
    follows this list).

  • Function response routing. Python pushes function responses back
    through LiveRequestQueue so the user-owned send loop can fan them out
    to active streaming tools. In TS, function responses are sent directly
    via connection.sendContent from the receive loop. This avoids a race
    in which the user-owned queue is closed but the open websocket still
    needs to ferry a tool result to the model. Active streaming tool
    fan-out can be added later without changing the public API.
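
A minimal sketch of that bridging, as promised above. The class name
mirrors the PR's IncomingMessageBuffer, but the body is illustrative:
the committed version also applies back-pressure, which this unbounded
queue omits for brevity.

// Callback-to-async-iterator bridge: onmessage/onerror/onclose push,
// a single consumer awaits. kind: 'close' mirrors the PR's close signal.
type Incoming<T> =
  | {kind: 'message'; value: T}
  | {kind: 'error'; error: Error}
  | {kind: 'close'};

class IncomingMessageBuffer<T> {
  private queue: Incoming<T>[] = [];
  private wakeup?: () => void;

  push(item: Incoming<T>): void {
    this.queue.push(item);
    this.wakeup?.(); // wake a parked consumer, if any
    this.wakeup = undefined;
  }

  async *[Symbol.asyncIterator](): AsyncGenerator<T> {
    for (;;) {
      while (this.queue.length === 0) {
        // Park until push() delivers the next callback.
        await new Promise<void>(resolve => (this.wakeup = resolve));
      }
      const item = this.queue.shift()!;
      if (item.kind === 'close') return; // websocket closed cleanly
      if (item.kind === 'error') throw item.error; // rethrow to receive()
      yield item.value;
    }
  }
}

With a buffer like this, Gemini.connect only has to wire onmessage into
push({kind: 'message', ...}), onerror into push({kind: 'error', ...}),
and onclose into push({kind: 'close'}); receive() then simply iterates.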

Files touched:

  • core/src/runner/runner.ts — adds Runner.runLive.
  • core/src/agents/base_agent.ts — runLive() mirrors runAsync
    (callbacks, abort, otel) and dispatches to runLiveImpl.
  • core/src/agents/llm_agent.ts — implements runLiveFlow,
    runLivePreprocess, runSendLoop, dispatchLiveRequest,
    runReceiveLoop, postprocessLive, isUserAuthoredResponse.
  • core/src/models/base_llm_connection.ts — adds optional
    sendActivityStart / sendActivityEnd.
  • core/src/models/gemini_llm_connection.ts — full receive()
    implementation; bridges Session callbacks via IncomingMessageBuffer;
    Gemini 3.1-compatible sendRealtime mime routing; single-text-part
    user content over sendRealtimeInput.text; receive no longer
    breaks on turnComplete.
  • core/src/models/google_llm.ts — Gemini.connect wires
    onmessage / onerror / onclose into the buffer.
  • core/test/runner/run_live_test.ts — new unit tests for runLive.
  • core/test/models/gemini_llm_connection_test.ts — new unit tests for
    GeminiLlmConnection.


tSte added 2 commits May 5, 2026 15:22

Implements Runner.runLive plus the underlying LlmAgent and connection
plumbing so audio/video bidi sessions work end-to-end, bringing parity
with adk-python's run_live.

- Runner.runLive drives a live invocation via a LiveRequestQueue,
  defaults responseModalities to AUDIO, auto-enables transcription
  configs for multi-agent setups, and runs the plugin lifecycle.
- LlmAgent.runLiveFlow runs the same preprocessors as runAsync, opens
  llm.connect, drains the live request queue on a parallel send loop,
  yields events from the receive loop, ferries function responses back
  to the open websocket, and recurses on transfer_to_agent.
- GeminiLlmConnection bridges the GenAI Session callbacks into an
  AsyncGenerator, aggregates text, transcriptions, tool calls, session
  resumption updates, turn-complete and interruption signals.
- BaseLlmConnection gains optional sendActivityStart/sendActivityEnd
  for manual activity boundary signalling.
- BaseAgent.runLive now mirrors runAsync (before/after callbacks,
  abort handling, otel span) and dispatches to runLiveImpl.
- Live model audio events with inline data are yielded but not
  persisted to the session to avoid storing raw blobs; transcription,
  tool, and usage events are persisted as in runAsync.
- Adds 6 unit tests covering missing queue, missing session, blob
  forwarding, default modalities, audio-blob persistence skip, and
  function-call round-tripping.

Required for Gemini 3.1 realtime preview, which ignores
sendClientContent text turns and expects audio/video routed via
sendRealtimeInput.audio / sendRealtimeInput.video instead of media.

- sendRealtime now branches on blob.mimeType: audio/* uses
  {audio: blob}, image/* uses {video: blob}, otherwise falls back
  to the existing {media: blob} path.
- sendContent forwards a single user text part via
  sendRealtimeInput.text so it interleaves with the audio stream.
  Multi-part user content and non-user content keep using
  sendClientContent.
- receive no longer breaks after turnComplete. The same call now
  spans an entire conversation, surfacing turnComplete as an
  in-stream signal and continuing until the websocket closes
  (kind: 'close') or the consumer closes the connection.

Adds 12 unit tests in core/test/models/gemini_llm_connection_test.ts
covering sendRealtime mime routing, sendContent text/multi-part/
function-response paths, and the multi-turn receive() flow including
close and error termination.
google-cla bot commented May 5, 2026

Thanks for your pull request! It looks like this may be your first contribution to a Google open source project. Before we can look at your pull request, you'll need to sign a Contributor License Agreement (CLA).

View this failed invocation of the CLA check for more information.

For the most up to date status, view the checks section at the bottom of the pull request.


Development

Successfully merging this pull request may close these issues:

Add support for real-time bidirectional audio streaming