Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/user-turn-exceeded-hook.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@livekit/agents': patch
---

Support `onUserTurnExceeded` as a hook option of `Agent.create()` / `AgentTask.create()`, in addition to the existing subclass override. The callback is now skipped while speech scheduling is paused or new turns are blocked. This is checked both when the event arrives (start guard) and again after the wait phase (post-wait re-check), matching the Python reference, so the callback no longer fires on the outgoing agent during a handoff.
69 changes: 69 additions & 0 deletions agents/src/voice/agent.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { initializeLogger } from '../log.js';
import { Task } from '../utils.js';
import { Agent, AgentTask, _setActivityTaskInfo } from './agent.js';
import { AgentActivity, agentActivityStorage } from './agent_activity.js';
import { createUserTurnExceededEvent } from './events.js';
import { defaultEndpointingOptions } from './turn_config/endpointing.js';
import { defaultInterruptionOptions } from './turn_config/interruption.js';

Expand Down Expand Up @@ -146,6 +147,52 @@ describe('Agent', () => {
expect(calls).toEqual(['enter', 'exit', 'turn']);
});

it('routes onUserTurnExceeded to the provided hook', async () => {
  // Snapshots of the event fields observed by the hook, in call order.
  const seen: Array<{ transcript: string; accumulatedWordCount: number }> = [];

  const agent = Agent.create({
    instructions: 'factory instructions',
    onUserTurnExceeded: (ctx, exceeded) => {
      // The hook context must expose the agent produced by the factory.
      expect(ctx.agent).toBe(agent);
      seen.push({
        transcript: exceeded.transcript,
        accumulatedWordCount: exceeded.accumulatedWordCount,
      });
    },
  });

  const event = createUserTurnExceededEvent({
    transcript: 'one two three four five',
    accumulatedTranscript: 'one two three four five',
    accumulatedWordCount: 5,
    duration: 1000,
  });

  // Calling the agent-level method should dispatch to the hook exactly once.
  await agent.onUserTurnExceeded(event);

  const expected = [{ transcript: 'one two three four five', accumulatedWordCount: 5 }];
  expect(seen).toEqual(expected);
});

it('falls back to the base onUserTurnExceeded when no hook is provided', async () => {
  // No onUserTurnExceeded hook is supplied, so the adapter must take the fallback path.
  const agent = Agent.create({ instructions: 'factory instructions' });

  // Stub the base-class implementation so the fallback call can be observed.
  const baseHandler = vi
    .spyOn(Agent.prototype, 'onUserTurnExceeded')
    .mockResolvedValue(undefined);

  const event = createUserTurnExceededEvent({
    transcript: 'hello world',
    accumulatedTranscript: 'hello world',
    accumulatedWordCount: 2,
    duration: 500,
  });

  try {
    await agent.onUserTurnExceeded(event);
    expect(baseHandler).toHaveBeenCalledWith(event);
  } finally {
    // Always undo the prototype stub so other tests see the real method.
    baseHandler.mockRestore();
  }
});

it('adapts stream node hooks between ReadableStream and AsyncIterable', async () => {
const audioFrame = 'audio' as unknown as AudioFrame;
const agent = Agent.create({
Expand Down Expand Up @@ -448,6 +495,28 @@ describe('Agent', () => {
await expect(wrapper.result).resolves.toBe('ok');
});

it('routes onUserTurnExceeded to the provided hook', async () => {
  // Transcripts observed by the hook, in call order.
  const transcripts: string[] = [];

  const task = AgentTask.create<string>({
    instructions: 'factory task',
    onUserTurnExceeded: (ctx, exceeded) => {
      // The hook context must expose the task produced by the factory.
      expect(ctx.agent).toBe(task);
      transcripts.push(exceeded.transcript);
    },
  });

  const event = createUserTurnExceededEvent({
    transcript: 'too many words here',
    accumulatedTranscript: 'too many words here',
    accumulatedWordCount: 4,
    duration: 800,
  });

  // Calling the task-level method should dispatch to the hook exactly once.
  await task.onUserTurnExceeded(event);

  expect(transcripts).toEqual(['too many words here']);
});

it('adapts stream node hooks between ReadableStream and AsyncIterable', async () => {
const audioFrame = 'audio' as unknown as AudioFrame;
const task = AgentTask.create<string>({
Expand Down
14 changes: 14 additions & 0 deletions agents/src/voice/agent_activity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1626,6 +1626,14 @@ export class AgentActivity implements RecognitionHooks {
}

onUserTurnExceeded(ev: UserTurnExceededEvent): void {
if (this.schedulingPaused || this.newTurnsBlocked) {
this.logger.warn(
{ numWords: ev.accumulatedWordCount, duration: ev.duration },
'skipping user turn exceeded, speech scheduling is paused',
);
return;
}

if (this.userTurnExceededLocked) {
return;
}
Expand Down Expand Up @@ -1689,6 +1697,12 @@ export class AgentActivity implements RecognitionHooks {
}
}

// re-check after the wait phase: if a handoff started in the meantime,
// don't fire the callback on this now-stale activity.
if (this.schedulingPaused || this.newTurnsBlocked) {
return;
}

this.logger.debug(
{ numWords: ev.accumulatedWordCount, duration: ev.duration },
'user turn limit exceeded',
Expand Down
91 changes: 79 additions & 12 deletions agents/src/voice/agent_activity_handoff.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -362,9 +362,19 @@ describe('AgentActivity blockNewTurns (handoff transition)', () => {
expect(activity.createSpeechTask).toHaveBeenCalledTimes(1);
});

// The bot's port wrongly gated the user-turn-exceeded callback on newTurnsBlocked;
// Python never does. onUserTurnExceeded must stay independent of the handoff flag.
it('onUserTurnExceeded is independent of newTurnsBlocked', () => {
const exceededEvent: UserTurnExceededEvent = {
type: 'user_turn_exceeded',
transcript: 'hi',
accumulatedTranscript: 'hi',
accumulatedWordCount: 10,
duration: 5000,
createdAt: Date.now(),
};

// Mirrors Python's test_skipped_when_new_turns_blocked: while new turns are blocked
// (handoff transition window), onUserTurnExceeded must not schedule a callback on the
// outgoing activity — otherwise the old agent responds and delays the handoff.
it('onUserTurnExceeded skips scheduling while new turns are blocked', () => {
const activity = createBareActivity();
activity._schedulingPaused = false;
activity.newTurnsBlocked = false;
Expand All @@ -373,17 +383,74 @@ describe('AgentActivity blockNewTurns (handoff transition)', () => {

(activity as AgentActivity).blockNewTurns();

const ev: UserTurnExceededEvent = {
type: 'user_turn_exceeded',
transcript: 'hi',
accumulatedTranscript: 'hi',
accumulatedWordCount: 10,
duration: 5000,
createdAt: Date.now(),
(activity as AgentActivity).onUserTurnExceeded(exceededEvent);

expect(activity.createSpeechTask).not.toHaveBeenCalled();
expect(activity.userTurnExceededTask).toBeUndefined();
expect(activity.logger.warn).toHaveBeenCalledTimes(1);
});

// The start guard also covers schedulingPaused (drain already started).
it('onUserTurnExceeded skips scheduling while scheduling is paused', () => {
  const activity = createBareActivity();

  // Only the scheduling-paused flag is set; every other gate is left open.
  Object.assign(activity, {
    _schedulingPaused: true,
    newTurnsBlocked: false,
    userTurnExceededLocked: false,
    userTurnExceededTask: undefined,
  });

  (activity as AgentActivity).onUserTurnExceeded(exceededEvent);

  // No speech task is created and no exceeded-task handle is stored.
  expect(activity.createSpeechTask).not.toHaveBeenCalled();
  expect(activity.userTurnExceededTask).toBeUndefined();
});

// Mirrors Python's test_inflight_task_aborts_when_handoff_starts: if the task is in its
// wait phase when a handoff flips newTurnsBlocked, it must self-abort at the post-wait
// re-check before invoking the user callback.
it('runUserTurnExceededTask aborts before the callback when a handoff starts mid-wait', async () => {
const activity = createBareActivity();
activity._schedulingPaused = false;
activity.newTurnsBlocked = false;
activity.userTurnExceededLocked = false;
const onUserTurnExceeded = vi.fn(async () => {});
activity.agent = { onUserTurnExceeded };
activity.agentSession = {
agentState: 'listening',
on: vi.fn(),
off: vi.fn(),
};
(activity as AgentActivity).onUserTurnExceeded(ev);
// wait phase resolves immediately; flip the handoff flag so the post-wait re-check trips.
activity.waitForInactive = vi.fn(async () => {
activity.newTurnsBlocked = true;
});

expect(activity.createSpeechTask).toHaveBeenCalledTimes(1);
const controller = new AbortController();
await (activity as any).runUserTurnExceededTask(exceededEvent, controller.signal);

expect(onUserTurnExceeded).not.toHaveBeenCalled();
});

// Positive control: when nothing is blocked, the user callback is invoked.
it('runUserTurnExceededTask invokes the callback when not blocked', async () => {
  const activity = createBareActivity();

  // Every gate is open: not paused, not blocked, not locked, no task in flight.
  Object.assign(activity, {
    _schedulingPaused: false,
    newTurnsBlocked: false,
    userTurnExceededLocked: false,
    userTurnExceededTask: undefined,
  });

  // Stub agent whose callback invocation is what this test observes.
  const onUserTurnExceeded = vi.fn(async () => {});
  activity.agent = { onUserTurnExceeded };

  // Minimal session stub: the agent is listening, and listener (un)registration is a no-op.
  activity.agentSession = {
    agentState: 'listening',
    on: vi.fn(),
    off: vi.fn(),
  };

  // The wait phase resolves immediately and leaves all flags untouched.
  activity.waitForInactive = vi.fn(async () => {});

  const { signal } = new AbortController();
  await (activity as any).runUserTurnExceededTask(exceededEvent, signal);

  expect(onUserTurnExceeded).toHaveBeenCalledTimes(1);
  expect(onUserTurnExceeded).toHaveBeenCalledWith(exceededEvent);
});

// When new turns are blocked before the turn completes, the reply must be skipped
Expand Down
26 changes: 26 additions & 0 deletions agents/src/voice/agent_v2.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import { readStream, toStream } from '../utils.js';
import type { VAD } from '../vad.js';
import type { Agent, AgentOptions, AgentTask, AgentTaskOptions, ModelSettings } from './agent.js';
import type { AgentSession } from './agent_session.js';
import type { UserTurnExceededEvent } from './events.js';
import type { TurnHandlingOptions } from './turn_config/turn_handling.js';

/** Context passed to hooks created with `Agent.create()`. */
Expand Down Expand Up @@ -68,6 +69,8 @@ export interface AgentHooks<
chatCtx: ChatContext,
newMessage: ChatMessage,
) => Promise<void> | void;
/** Called when the user turn has exceeded the configured user turn limit. */
onUserTurnExceeded?: (ctx: ContextT, ev: UserTurnExceededEvent) => Promise<void> | void;
/** Transforms incoming audio into speech events or transcript text for the agent. */
sttNode?: (
ctx: ContextT,
Expand Down Expand Up @@ -130,6 +133,7 @@ export function createAgentV2<UserData>(
onEnter,
onExit,
onUserTurnCompleted,
onUserTurnExceeded,
sttNode,
llmNode,
ttsNode,
Expand All @@ -146,6 +150,7 @@ export function createAgentV2<UserData>(
onEnter,
onExit,
onUserTurnCompleted,
onUserTurnExceeded,
sttNode,
llmNode,
ttsNode,
Expand All @@ -172,6 +177,10 @@ export function createAgentV2<UserData>(
);
}

/**
 * Forwards the user-turn-exceeded event to the hook adapter, handing it the
 * superclass implementation as the fallback to run.
 */
override async onUserTurnExceeded(ev: UserTurnExceededEvent): Promise<void> {
  // Arrow function keeps `super` bound to this class's parent.
  const runBaseHandler = () => super.onUserTurnExceeded(ev);
  return this.hookAdapter.onUserTurnExceeded(ev, runBaseHandler);
}

override async sttNode(
audio: ReadableStream<AudioFrame>,
modelSettings: ModelSettings,
Expand Down Expand Up @@ -224,6 +233,7 @@ export function createAgentTaskV2<ResultT, UserData>(
onEnter,
onExit,
onUserTurnCompleted,
onUserTurnExceeded,
sttNode,
llmNode,
ttsNode,
Expand All @@ -240,6 +250,7 @@ export function createAgentTaskV2<ResultT, UserData>(
onEnter,
onExit,
onUserTurnCompleted,
onUserTurnExceeded,
sttNode,
llmNode,
ttsNode,
Expand All @@ -266,6 +277,10 @@ export function createAgentTaskV2<ResultT, UserData>(
);
}

/**
 * Forwards the user-turn-exceeded event to the hook adapter, handing it the
 * superclass implementation as the fallback to run.
 */
override async onUserTurnExceeded(ev: UserTurnExceededEvent): Promise<void> {
  // Arrow function keeps `super` bound to this class's parent.
  const runBaseHandler = () => super.onUserTurnExceeded(ev);
  return this.hookAdapter.onUserTurnExceeded(ev, runBaseHandler);
}

override async sttNode(
audio: ReadableStream<AudioFrame>,
modelSettings: ModelSettings,
Expand Down Expand Up @@ -341,6 +356,17 @@ class AgentHookAdapter<UserData, ContextT extends AgentContext<UserData>> {
return this.hooks.onUserTurnCompleted(this.context, chatCtx, newMessage);
}

/**
 * Dispatches a user-turn-exceeded event.
 *
 * @param ev - The event describing the exceeded user turn.
 * @param fallback - Invoked instead when no `onUserTurnExceeded` hook was supplied.
 * @returns Whatever the hook or the fallback resolves to.
 */
async onUserTurnExceeded(
  ev: UserTurnExceededEvent,
  fallback: () => Promise<void>,
): Promise<void> {
  if (this.hooks.onUserTurnExceeded) {
    // Call through `this.hooks` so the hook keeps its original receiver.
    return this.hooks.onUserTurnExceeded(this.context, ev);
  }

  return fallback();
}

async sttNode(
audio: ReadableStream<AudioFrame>,
modelSettings: ModelSettings,
Expand Down