fix(workflows): normalize two-digit years in GetDOBTask #1837
rosetta-livekit-bot[bot] wants to merge 31 commits into
…1525) Co-authored-by: rosetta-livekit-bot[bot] <282703043+rosetta-livekit-bot[bot]@users.noreply.github.com> Co-authored-by: u9g <jason.lernerman@livekit.io>
Agent.llmNode now returns ReadableStream<ChatChunk | string | FlushSentinel>, but the agent_v2 hook overrides and AgentHookAdapter still declared the narrower ChatChunk | string union, so passing super.llmNode as the fallback failed to type-check. Widen the override return types and the adapter's fallback/return signatures to include FlushSentinel. Co-authored-by: Cursor <cursoragent@cursor.com>
Co-authored-by: Brian Yin <brian.yin@livekit.io> Co-authored-by: rosetta-livekit-bot[bot] <282703043+rosetta-livekit-bot[bot]@users.noreply.github.com> Co-authored-by: u9g <jason.lernerman@livekit.io>
Catch end-call close listener errors to avoid unhandled rejections during shutdown, and make public tool type guards return false for null inputs.
Co-authored-by: rosetta-livekit-bot[bot] <282703043+rosetta-livekit-bot[bot]@users.noreply.github.com>
Co-authored-by: rosetta-livekit-bot[bot] <282703043+rosetta-livekit-bot[bot]@users.noreply.github.com>
Co-authored-by: rosetta-livekit-bot[bot] <282703043+rosetta-livekit-bot[bot]@users.noreply.github.com>
…egment (#1760) Co-authored-by: Cursor <cursoragent@cursor.com>
…#1698) Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Co-authored-by: rosetta-livekit-bot[bot] <282703043+rosetta-livekit-bot[bot]@users.noreply.github.com> Co-authored-by: Toubat <brian.yin@livekit.io>
# Conflicts:
#   agents/src/voice/agent_activity.test.ts
#   agents/src/voice/generation.ts
#   agents/src/voice/generation_tts_timeout.test.ts
The test asserted an exact tick count ([0,1,2]) against real timers with a 5ms margin, which flakes on loaded CI runners. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
Co-authored-by: rosetta-livekit-bot[bot] <282703043+rosetta-livekit-bot[bot]@users.noreply.github.com>
Co-authored-by: rosetta-livekit-bot[bot] <282703043+rosetta-livekit-bot[bot]@users.noreply.github.com>
Co-authored-by: rosetta-livekit-bot[bot] <282703043+rosetta-livekit-bot[bot]@users.noreply.github.com>
Co-authored-by: rosetta-livekit-bot[bot] <282703043+rosetta-livekit-bot[bot]@users.noreply.github.com>
Co-authored-by: Long Chen <longch1024@gmail.com>
Co-authored-by: Cursor <cursoragent@cursor.com> Co-authored-by: u9g <jason.lernerman@livekit.io>
Co-authored-by: Claude <noreply@anthropic.com> Co-authored-by: Cursor <cursoragent@cursor.com>
Co-authored-by: u9g <jason.lernerman@livekit.io>
🦋 Changeset detected
Latest commit: 2a64eae
The changes in this PR will be included in the next version bump. This PR includes changesets to release 35 packages.
private async deliverReply(session: AgentSession): Promise<void> {
  try {
    if (this.owningActivity) {
      await this.owningActivity.waitForIdle();
    } else if ('waitForIdle' in session && typeof session.waitForIdle === 'function') {
      await session.waitForIdle();
    }

    const updates = [...this.pendingUpdates];
    this.pendingUpdates = [];
    const pendingItems = updates.flatMap((update) => update.items);
    if (pendingItems.length === 0) return;

    const targetAgent = this.owningActivity?.agent ?? getCurrentAgent(session);

    const itemsToInsert = updates
      .filter((update) => update.target !== targetAgent)
      .flatMap((update) => update.items);

    let chatCtx: ChatContext | undefined;
    if (itemsToInsert.length > 0) {
      chatCtx = targetAgent.chatCtx.copy();
      chatCtx.insert(itemsToInsert);
    }

    const lastItem = pendingItems[pendingItems.length - 1]!;
    const targetItems = targetAgent.chatCtx.items;
    const atTail =
      targetItems.length > 0 && targetItems[targetItems.length - 1]!.id === lastItem.id;
    const callIds = pendingItems
      .filter((item): item is FunctionCallOutput => item.type === 'function_call_output')
      .map((item) => item.callId);
    const instructions = renderTemplate(
      atTail ? this.toolOptions.replyAtTailTemplate : this.toolOptions.replyMaybeCoveredTemplate,
      { callIds },
    );

    session.generateReply({
      instructions,
      toolChoice: 'none',
      chatCtx,
    });
  } finally {
    this._replyTaskDone = true;
  }
}
🔴 Async tool's final result can be permanently lost when a delivery is already in progress
A background tool's final result is silently dropped (enqueueReply at agents/src/voice/tool_executor.ts:529) when the delivery task is still running from a prior update, so the agent never speaks the completed result to the user.
Impact: The user never receives the final answer from a long-running background tool, even though the tool completed successfully.
Race between enqueueReply and deliverReply draining pendingUpdates
The race window:
- Tool calls ctx.update('started') → first enqueueReply starts deliverReply (sets _replyTaskDone = false).
- deliverReply copies and clears pendingUpdates (agents/src/voice/tool_executor.ts:590-591), then awaits waitForIdle() + generateReply().
- While deliverReply is still awaiting, the tool finishes and runTool calls enqueueReply with the final result (agents/src/voice/tool_executor.ts:529).
- Inside enqueueReply, the guard at agents/src/voice/tool_executor.ts:426 checks this._replyTaskDone; it's still false (deliverReply hasn't finished), so no new delivery task is started. The final result is pushed to pendingUpdates.
- deliverReply finishes and sets _replyTaskDone = true at agents/src/voice/tool_executor.ts:625, but never re-checks pendingUpdates.
- No subsequent enqueueReply call ever arrives → the final result sits in pendingUpdates forever.
Prompt for agents
The deliverReply method drains pendingUpdates once and then exits, setting _replyTaskDone = true. But new updates can arrive (via enqueueReply) after the drain but before _replyTaskDone is set. Since _replyTaskDone is still false at that point, enqueueReply does not start a new delivery task, and the update is stranded.
Fix: After setting _replyTaskDone = true in the finally block, check if pendingUpdates has new items. If so, recursively start a new deliverReply (or loop). Alternatively, use a loop inside deliverReply that keeps draining until pendingUpdates is empty, only setting _replyTaskDone = true when there's truly nothing left. The key invariant is: if pendingUpdates is non-empty after _replyTaskDone becomes true, a new delivery must be scheduled.
Relevant code: ToolExecutor.enqueueReply (line 415-441), ToolExecutor.deliverReply (line 582-627), and ToolExecutor.runTool final enqueueReply call (line 529).
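The loop-based variant of the suggested fix could look roughly like the sketch below. It is a simplified stand-in, not the actual ToolExecutor: ReplyQueue, Update, deliver, and whenIdle are hypothetical names, and the real waitForIdle/generateReply logic is collapsed into a single deliver callback. The point it illustrates is the invariant: the done flag only flips back to true at a moment when pendingUpdates is known to be empty.

```typescript
// Hypothetical, simplified model of the delivery bookkeeping (not the real ToolExecutor).
type Update = { items: string[] };

class ReplyQueue {
  private pendingUpdates: Update[] = [];
  private replyTaskDone = true;
  private draining: Promise<void> = Promise.resolve();

  // `deliver` stands in for "wait for idle, then generate a reply".
  constructor(private readonly deliver: (items: string[]) => Promise<void>) {}

  enqueueReply(update: Update): void {
    this.pendingUpdates.push(update);
    if (this.replyTaskDone) {
      this.replyTaskDone = false;
      this.draining = this.drain();
    }
  }

  // Resolves once the current drain loop (if any) has finished.
  whenIdle(): Promise<void> {
    return this.draining;
  }

  private async drain(): Promise<void> {
    try {
      // Keep draining: updates pushed while `deliver` is awaiting are
      // picked up by the next iteration instead of being stranded.
      while (this.pendingUpdates.length > 0) {
        const batch = this.pendingUpdates;
        this.pendingUpdates = [];
        await this.deliver(batch.flatMap((update) => update.items));
      }
    } finally {
      // No await sits between the last length check and this assignment,
      // so on the success path nothing can be enqueued in between.
      this.replyTaskDone = true;
    }
  }
}
```

If deliver throws, the flag is still reset, but updates that arrived during the failed delivery wait for the next enqueueReply call; error handling is deliberately left out of this sketch.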
const unlock = await this.duplicateLock.lock();
try {
  const duplicateResult = await this.checkDuplicate(functionName, {
    onDuplicate: tool.onDuplicate,
    confirmDuplicate,
  });
  if (duplicateResult !== undefined) return duplicateResult;

  if (this.runningTasks.has(callId)) {
    throw new Error(`Task already running for call_id: ${callId}`);
  }

  const firstUpdateFuture = new Future<unknown>();
  runCtx._attachExecutor(this, firstUpdateFuture);

  const controller = new AbortController();
  const abort = () => {
    queueMicrotask(() => {
      controller.abort();
      if (!firstUpdateFuture.done) {
        firstUpdateFuture.reject(new Error('tool call was aborted'));
      }
    });
  };
  abortSignal?.addEventListener('abort', abort, { once: true });

  // Once a tool goes non-blocking (it called ctx.update and detached from its
  // owning speech), a speech interruption must NOT abort it — async tools are
  // meant to survive interruptions and deliver their result later (matches
  // Python, where the exe_task is independent and only cancel()/drain() stop it).
  // Stop forwarding the speech abort to this tool; explicit cancel()/drain()/
  // aclose() still abort it directly via task.controller.
  void firstUpdateFuture.await
    .then(() => {
      if (runCtx.functionCall.extra.__livekit_agents_tool_non_blocking === true) {
        abortSignal?.removeEventListener('abort', abort);
      }
    })
    .catch(() => {});

  const toolPromiseRef: { promise?: Promise<unknown> } = {};
  const promise = this.runTool({
    tool,
    runCtx,
    rawArguments: args as Parameters,
    firstUpdateFuture,
    controller,
    onUserToolStarted,
    toolPromiseRef,
  }).finally(() => {
    this.runningTasks.delete(callId);
    runningTasks.get(runCtx.session)?.delete(callId);
    abortSignal?.removeEventListener('abort', abort);
    runCtx._detachExecutor();
  });

  const task: RunningTask = {
    ctx: runCtx,
    promise,
    controller,
    firstUpdateFuture,
    executor: this,
    allowCancellation: Boolean(tool.flags & ToolFlag.CANCELLABLE),
    toolPromiseRef,
  };
  this.runningTasks.set(callId, task);
  let sessionTasks = runningTasks.get(runCtx.session);
  if (!sessionTasks) {
    sessionTasks = new Map();
    runningTasks.set(runCtx.session, sessionTasks);
  }
  sessionTasks.set(callId, task);

  return firstUpdateFuture.await;
} finally {
  unlock();
}
🚩 Duplicate lock serializes all concurrent tool executions, not just duplicates
The duplicateLock mutex in ToolExecutor.execute (agents/src/voice/tool_executor.ts:214) is acquired before the duplicate check and held until firstUpdateFuture.await resolves at line 287. For blocking tools (those that never call ctx.update()), firstUpdateFuture only resolves when the tool's execute() completes (line 504-506). This means the lock is held for the entire duration of the tool execution, serializing ALL concurrent tool calls through a single mutex — even calls to completely different tools that have no duplicate concern. For example, if the LLM emits two parallel tool calls (getWeather and playMusic), the second one cannot even begin its duplicate check until the first tool finishes. This may be intentional to match Python's behavior, but it's a significant performance constraint for parallel tool calls.
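How long the lock is actually held depends on when the finally block runs relative to the returned promise. In an async function, returning a promise from inside try/finally without the await keyword runs the finally block right away, while return await defers it until the promise settles. The sketch below contrasts the two; it assumes firstUpdateFuture.await is a plain promise-valued property (as the snippet suggests, but not confirmed here), and releaseOrder, toolDone, and finishTool are hypothetical names used only for illustration.

```typescript
// Records the order of events so the two return styles can be compared.
async function releaseOrder(awaitBeforeReturn: boolean): Promise<string[]> {
  const events: string[] = [];
  let finishTool!: () => void;

  // Stand-in for firstUpdateFuture.await: settles only when the "tool" finishes.
  const toolDone = new Promise<void>((resolve) => {
    finishTool = resolve;
  }).then(() => {
    events.push('tool finished');
  });

  async function execute(): Promise<void> {
    try {
      if (awaitBeforeReturn) {
        return await toolDone; // finally runs only after the tool finishes
      }
      return toolDone; // finally runs immediately; the caller still waits for the tool
    } finally {
      events.push('unlock'); // stand-in for unlock()
    }
  }

  const result = execute();
  await Promise.resolve(); // let any already-runnable work complete
  finishTool();
  await result;
  return events;
}

// releaseOrder(false) resolves to ['unlock', 'tool finished']
// releaseOrder(true)  resolves to ['tool finished', 'unlock']
```

Under that assumption, the return firstUpdateFuture.await form in the snippet would release the lock before the tool completes, so it may be worth confirming the serialization with a test before treating it as a performance constraint.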
Summary
- Normalize two-digit years such as 90 to 1990 and 05 to 2005 before date validation
Testing
- pnpm --filter @livekit/agents typecheck
- pnpm --filter @livekit/agents lint (passes with existing warnings)
- pnpm --filter @livekit/agents build
No tests added per porting request.
Ported from livekit/agents#6124
Original PR description
Closes #6067
GetDOBTask's prompt tells the model to normalize two-digit years ("90" likely means 1990), but _update_dob_impl takes year as a raw int with no lower bound. Smaller/faster models often pass the spoken value through literally, and date(90, 5, 15) is a valid Python date (year 90 AD): the future-date check passes, no ToolError is raised, and the task completes with a corrupted birthdate. Because this workflow tends to feed identity/healthcare/fintech intake, it's silent data corruption rather than a visible error.
Fix: normalize year < 100 at the top of _update_dob_impl with a pivot window keyed on the current year, which is exactly what the prompt already promises. 90 -> 1990, 05 -> 2005, 26 -> 2026, 27 -> 1927. Four-digit years are left untouched, so there's no regression. The issue floated a hard floor (raise on year < 1900) as an alternative; I went with normalization since it matches the documented prompt contract and is the smaller behavioral change.
Verified:
- tests/test_dob.py (unit, no LLM). It fails on main (date(90, 5, 15) != date(1990, 5, 15)) and passes with the fix.
- pytest tests/test_dob.py --unit -> 2 passed.
- ruff check and ruff format --check clean on both files.
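For the TypeScript port, the pivot window described above can be sketched as follows. This is an illustration only: normalizeTwoDigitYear and its currentYear parameter are hypothetical names, not taken from either PR, and leaving negative or non-integer input untouched is an assumption of this sketch. The listed examples (26 -> 2026, 27 -> 1927) imply a current year of 2026, which is what the usage lines pass in explicitly.

```typescript
// Hypothetical helper: maps a two-digit year onto the most recent matching
// year that is not in the future. Anything outside 0..99 is returned as-is.
function normalizeTwoDigitYear(
  year: number,
  currentYear: number = new Date().getFullYear(),
): number {
  if (!Number.isInteger(year) || year < 0 || year >= 100) {
    return year; // four-digit (and otherwise out-of-range) values are left untouched
  }
  const century = Math.floor(currentYear / 100) * 100;
  const candidate = century + year;
  // A date of birth cannot be in the future, so fall back one century.
  return candidate > currentYear ? candidate - 100 : candidate;
}

// With the current year pinned to 2026:
normalizeTwoDigitYear(90, 2026); // 1990
normalizeTwoDigitYear(5, 2026); // 2005
normalizeTwoDigitYear(26, 2026); // 2026
normalizeTwoDigitYear(27, 2026); // 1927
normalizeTwoDigitYear(1985, 2026); // 1985
```

Taking the current year as a parameter keeps the function deterministic under test, in the same spirit as the unit test that needs no LLM.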