diff --git a/.changeset/user-turn-exceeded-hook.md b/.changeset/user-turn-exceeded-hook.md new file mode 100644 index 000000000..51e9078f3 --- /dev/null +++ b/.changeset/user-turn-exceeded-hook.md @@ -0,0 +1,5 @@ +--- +'@livekit/agents': patch +--- + +Support `onUserTurnExceeded` as an `Agent.create()` / `AgentTask.create()` hook, in addition to the existing subclass override. The callback is now gated on scheduling-paused / new-turns-blocked (start guard plus a post-wait re-check) to match the Python reference, so it is skipped during agent handoffs. diff --git a/agents/src/voice/agent.test.ts b/agents/src/voice/agent.test.ts index e644ca043..5c3b245f6 100644 --- a/agents/src/voice/agent.test.ts +++ b/agents/src/voice/agent.test.ts @@ -10,6 +10,7 @@ import { initializeLogger } from '../log.js'; import { Task } from '../utils.js'; import { Agent, AgentTask, _setActivityTaskInfo } from './agent.js'; import { AgentActivity, agentActivityStorage } from './agent_activity.js'; +import { createUserTurnExceededEvent } from './events.js'; import { defaultEndpointingOptions } from './turn_config/endpointing.js'; import { defaultInterruptionOptions } from './turn_config/interruption.js'; @@ -146,6 +147,52 @@ describe('Agent', () => { expect(calls).toEqual(['enter', 'exit', 'turn']); }); + it('routes onUserTurnExceeded to the provided hook', async () => { + const received: Array<{ transcript: string; accumulatedWordCount: number }> = []; + const agent = Agent.create({ + instructions: 'factory instructions', + onUserTurnExceeded: (ctx, ev) => { + expect(ctx.agent).toBe(agent); + received.push({ + transcript: ev.transcript, + accumulatedWordCount: ev.accumulatedWordCount, + }); + }, + }); + + const ev = createUserTurnExceededEvent({ + transcript: 'one two three four five', + accumulatedTranscript: 'one two three four five', + accumulatedWordCount: 5, + duration: 1000, + }); + + await agent.onUserTurnExceeded(ev); + + expect(received).toEqual([ + { transcript: 'one two three four five', accumulatedWordCount: 5 }, + ]); + }); + + it('falls back to the base onUserTurnExceeded when no hook is provided', async () => { + const agent = Agent.create({ instructions: 'factory instructions' }); + const fallback = vi.spyOn(Agent.prototype, 'onUserTurnExceeded').mockResolvedValue(undefined); + + const ev = createUserTurnExceededEvent({ + transcript: 'hello world', + accumulatedTranscript: 'hello world', + accumulatedWordCount: 2, + duration: 500, + }); + + try { + await agent.onUserTurnExceeded(ev); + expect(fallback).toHaveBeenCalledWith(ev); + } finally { + fallback.mockRestore(); + } + }); + it('adapts stream node hooks between ReadableStream and AsyncIterable', async () => { const audioFrame = 'audio' as unknown as AudioFrame; const agent = Agent.create({ @@ -448,6 +495,28 @@ describe('Agent', () => { await expect(wrapper.result).resolves.toBe('ok'); }); + it('routes onUserTurnExceeded to the provided hook', async () => { + const received: string[] = []; + const task = AgentTask.create({ + instructions: 'factory task', + onUserTurnExceeded: (ctx, ev) => { + expect(ctx.agent).toBe(task); + received.push(ev.transcript); + }, + }); + + const ev = createUserTurnExceededEvent({ + transcript: 'too many words here', + accumulatedTranscript: 'too many words here', + accumulatedWordCount: 4, + duration: 800, + }); + + await task.onUserTurnExceeded(ev); + + expect(received).toEqual(['too many words here']); + }); + it('adapts stream node hooks between ReadableStream and AsyncIterable', async () => { const audioFrame = 'audio' as unknown as AudioFrame; const task = AgentTask.create({ diff --git a/agents/src/voice/agent_activity.ts b/agents/src/voice/agent_activity.ts index 16184861a..eb82437e0 100644 --- a/agents/src/voice/agent_activity.ts +++ b/agents/src/voice/agent_activity.ts @@ -1626,6 +1626,14 @@ export class AgentActivity implements RecognitionHooks { } onUserTurnExceeded(ev: UserTurnExceededEvent): void { + if (this.schedulingPaused || this.newTurnsBlocked) { + this.logger.warn( + { numWords: ev.accumulatedWordCount, duration: ev.duration }, + 'skipping user turn exceeded, speech scheduling is paused', + ); + return; + } + if (this.userTurnExceededLocked) { return; } @@ -1689,6 +1697,12 @@ export class AgentActivity implements RecognitionHooks { } } + // re-check after the wait phase: if a handoff started in the meantime, + // don't fire the callback on this now-stale activity. + if (this.schedulingPaused || this.newTurnsBlocked) { + return; + } + this.logger.debug( { numWords: ev.accumulatedWordCount, duration: ev.duration }, 'user turn limit exceeded', diff --git a/agents/src/voice/agent_activity_handoff.test.ts b/agents/src/voice/agent_activity_handoff.test.ts index 0bd985d62..48cf83802 100644 --- a/agents/src/voice/agent_activity_handoff.test.ts +++ b/agents/src/voice/agent_activity_handoff.test.ts @@ -362,9 +362,19 @@ describe('AgentActivity blockNewTurns (handoff transition)', () => { expect(activity.createSpeechTask).toHaveBeenCalledTimes(1); }); - // The bot's port wrongly gated the user-turn-exceeded callback on newTurnsBlocked; - // Python never does. onUserTurnExceeded must stay independent of the handoff flag. - it('onUserTurnExceeded is independent of newTurnsBlocked', () => { + const exceededEvent: UserTurnExceededEvent = { + type: 'user_turn_exceeded', + transcript: 'hi', + accumulatedTranscript: 'hi', + accumulatedWordCount: 10, + duration: 5000, + createdAt: Date.now(), + }; + + // Mirrors Python's test_skipped_when_new_turns_blocked: while new turns are blocked + // (handoff transition window), onUserTurnExceeded must not schedule a callback on the + // outgoing activity — otherwise the old agent responds and delays the handoff. + it('onUserTurnExceeded skips scheduling while new turns are blocked', () => { const activity = createBareActivity(); activity._schedulingPaused = false; activity.newTurnsBlocked = false; @@ -373,17 +383,74 @@ describe('AgentActivity blockNewTurns (handoff transition)', () => { (activity as AgentActivity).blockNewTurns(); - const ev: UserTurnExceededEvent = { - type: 'user_turn_exceeded', - transcript: 'hi', - accumulatedTranscript: 'hi', - accumulatedWordCount: 10, - duration: 5000, - createdAt: Date.now(), + (activity as AgentActivity).onUserTurnExceeded(exceededEvent); + + expect(activity.createSpeechTask).not.toHaveBeenCalled(); + expect(activity.userTurnExceededTask).toBeUndefined(); + expect(activity.logger.warn).toHaveBeenCalledTimes(1); + }); + + // The start guard also covers schedulingPaused (drain already started). + it('onUserTurnExceeded skips scheduling while scheduling is paused', () => { + const activity = createBareActivity(); + activity._schedulingPaused = true; + activity.newTurnsBlocked = false; + activity.userTurnExceededLocked = false; + activity.userTurnExceededTask = undefined; + + (activity as AgentActivity).onUserTurnExceeded(exceededEvent); + + expect(activity.createSpeechTask).not.toHaveBeenCalled(); + expect(activity.userTurnExceededTask).toBeUndefined(); + }); + + // Mirrors Python's test_inflight_task_aborts_when_handoff_starts: if the task is in its + // wait phase when a handoff flips newTurnsBlocked, it must self-abort at the post-wait + // re-check before invoking the user callback. + it('runUserTurnExceededTask aborts before the callback when a handoff starts mid-wait', async () => { + const activity = createBareActivity(); + activity._schedulingPaused = false; + activity.newTurnsBlocked = false; + activity.userTurnExceededLocked = false; + const onUserTurnExceeded = vi.fn(async () => {}); + activity.agent = { onUserTurnExceeded }; + activity.agentSession = { + agentState: 'listening', + on: vi.fn(), + off: vi.fn(), }; - (activity as AgentActivity).onUserTurnExceeded(ev); + // wait phase resolves immediately; flip the handoff flag so the post-wait re-check trips. + activity.waitForInactive = vi.fn(async () => { + activity.newTurnsBlocked = true; + }); - expect(activity.createSpeechTask).toHaveBeenCalledTimes(1); + const controller = new AbortController(); + await (activity as any).runUserTurnExceededTask(exceededEvent, controller.signal); + + expect(onUserTurnExceeded).not.toHaveBeenCalled(); + }); + + // Positive control: when nothing is blocked, the user callback is invoked. + it('runUserTurnExceededTask invokes the callback when not blocked', async () => { + const activity = createBareActivity(); + activity._schedulingPaused = false; + activity.newTurnsBlocked = false; + activity.userTurnExceededLocked = false; + activity.userTurnExceededTask = undefined; + const onUserTurnExceeded = vi.fn(async () => {}); + activity.agent = { onUserTurnExceeded }; + activity.agentSession = { + agentState: 'listening', + on: vi.fn(), + off: vi.fn(), + }; + activity.waitForInactive = vi.fn(async () => {}); + + const controller = new AbortController(); + await (activity as any).runUserTurnExceededTask(exceededEvent, controller.signal); + + expect(onUserTurnExceeded).toHaveBeenCalledTimes(1); + expect(onUserTurnExceeded).toHaveBeenCalledWith(exceededEvent); }); // When new turns are blocked before the turn completes, the reply must be skipped diff --git a/agents/src/voice/agent_v2.ts b/agents/src/voice/agent_v2.ts index 2f00402ce..26ab13b85 100644 --- a/agents/src/voice/agent_v2.ts +++ b/agents/src/voice/agent_v2.ts @@ -19,6 +19,7 @@ import { readStream, toStream } from '../utils.js'; import type { VAD } from '../vad.js'; import type { Agent, AgentOptions, AgentTask, AgentTaskOptions, ModelSettings } from './agent.js'; import type { AgentSession } from './agent_session.js'; +import type { UserTurnExceededEvent } from './events.js'; import type { TurnHandlingOptions } from './turn_config/turn_handling.js'; /** Context passed to hooks created with `Agent.create()`. */ @@ -68,6 +69,8 @@ export interface AgentHooks< chatCtx: ChatContext, newMessage: ChatMessage, ) => Promise | void; + /** Called when the user turn has exceeded the configured user turn limit. */ + onUserTurnExceeded?: (ctx: ContextT, ev: UserTurnExceededEvent) => Promise | void; /** Transforms incoming audio into speech events or transcript text for the agent. */ sttNode?: ( ctx: ContextT, @@ -130,6 +133,7 @@ export function createAgentV2( onEnter, onExit, onUserTurnCompleted, + onUserTurnExceeded, sttNode, llmNode, ttsNode, @@ -146,6 +150,7 @@ export function createAgentV2( onEnter, onExit, onUserTurnCompleted, + onUserTurnExceeded, sttNode, llmNode, ttsNode, @@ -172,6 +177,10 @@ export function createAgentV2( ); } + override async onUserTurnExceeded(ev: UserTurnExceededEvent): Promise { + return this.hookAdapter.onUserTurnExceeded(ev, () => super.onUserTurnExceeded(ev)); + } + override async sttNode( audio: ReadableStream, modelSettings: ModelSettings, @@ -224,6 +233,7 @@ export function createAgentTaskV2( onEnter, onExit, onUserTurnCompleted, + onUserTurnExceeded, sttNode, llmNode, ttsNode, @@ -240,6 +250,7 @@ export function createAgentTaskV2( onEnter, onExit, onUserTurnCompleted, + onUserTurnExceeded, sttNode, llmNode, ttsNode, @@ -266,6 +277,10 @@ export function createAgentTaskV2( ); } + override async onUserTurnExceeded(ev: UserTurnExceededEvent): Promise { + return this.hookAdapter.onUserTurnExceeded(ev, () => super.onUserTurnExceeded(ev)); + } + override async sttNode( audio: ReadableStream, modelSettings: ModelSettings, @@ -341,6 +356,17 @@ class AgentHookAdapter> { return this.hooks.onUserTurnCompleted(this.context, chatCtx, newMessage); } + async onUserTurnExceeded( + ev: UserTurnExceededEvent, + fallback: () => Promise, + ): Promise { + if (!this.hooks.onUserTurnExceeded) { + return fallback(); + } + + return this.hooks.onUserTurnExceeded(this.context, ev); + } + async sttNode( audio: ReadableStream, modelSettings: ModelSettings,