fix(console/worker): simulation CLI threading, snake_case session reports, IPC init hardening#1804
Conversation
🦋 Changeset detectedLatest commit: 7cabea4 The changes in this PR will be included in the next version bump. This PR includes changesets to release 35 packages
Not sure what this means? Click here to learn what changesets are. Click here if you're a maintainer who wants to add another changeset to this PR |
54dd2ee to
ece7c2e
Compare
ece7c2e to
5a0804f
Compare
ecd0e71 to
1b962a3
Compare
| override async close(): Promise<void> { | ||
| this.closed = true; | ||
| await this.channel.close(); | ||
| await super.close(); | ||
| } |
There was a problem hiding this comment.
🟡 TcpAudioInput.close() does not close its AudioResampler, leaking native resources
TcpAudioInput creates an AudioResampler instance at agents/src/voice/console_io.ts:53 but its close() method at line 68-72 never calls this.resampler.close(). The codebase has an established pattern of explicitly closing AudioResampler instances to free native resources (e.g. agents/src/voice/generation.ts:924, agents/src/utils.ts:771), and a prior fix for this exact issue was merged in PR #1453 ("fix audio resampler memory leak"). The TcpAudioOutput at line 85 has the same issue but lacks a close() method entirely, so the fix is less straightforward there.
| override async close(): Promise<void> { | |
| this.closed = true; | |
| await this.channel.close(); | |
| await super.close(); | |
| } | |
| override async close(): Promise<void> { | |
| this.closed = true; | |
| this.resampler.close(); | |
| await this.channel.close(); | |
| await super.close(); | |
| } |
Was this helpful? React with 👍 or 👎 to provide feedback.
Chat-item toJSON() emitted camelCase keys (newAgentId, callId, args, isError, createdAt, ...), but the session report uploaded to LiveKit Cloud is parsed with the Python chat_context models, which use snake_case with no alias. Required fields therefore appeared missing — the simulation summary failed on agent_handoff.new_agent_id (and any function_call would have failed on call_id/arguments). Emit snake_case for all defined fields across ChatMessage, FunctionCall, FunctionCallOutput, AgentHandoffItem, AgentConfigUpdate, and image content, matching Python's to_dict(). Constructor APIs stay camelCase; snapshots regenerated.
Stamp the agent name attribute last so it takes precedence (#1816). Co-authored-by: u9g <u9g@users.noreply.github.com>
The session report's events[] was spread raw (camelCase) and usage[] kept camelCase keys, but session_report.json is parsed with the Python pydantic models (snake_case, no alias) — so the same fields that the chat-item fix addressed were still dropped for events (old_state, is_final, function_calls, created_at, ...) and usage (input_tokens, session_duration, ...). Add eventToJSON (mirrors event.model_dump(): drops speechHandle, reduces error source/error like the Python field serializers) and modelUsageToJSON (snake_case, durations in seconds to match the proto wire format and Python model). Also emit audio_recording_path, audio_recording_started_at, sdk_version, and options.user_away_timeout to match SessionReport.to_dict().
handleRunInput routed run_input through the registered text-input callback, which is fire-and-forget (interrupt + generateReply) and never surfaces the resulting chat items, so the remote driver always received empty responses even though the agent replied. Drive the reply through session.run() directly and capture result.events, mirroring the Python SessionHost. Remove the now-unused SessionHost.registerTextInput; participant text input is handled by RoomIO itself.
Keep chat-item toJSON() emitting the framework's native camelCase shape and do the snake_case wire conversion in report.ts. toSnakeCaseDeep now recurses into toJSON() output (also fixing camelCase metrics keys), special-cases args->arguments, and passes extra dicts through verbatim.
…agent name override) Add a --simulation CLI flag (on the start command, set by `lk simulate`) that threads through to worker options. In simulation mode: skip worker HTTP server creation/start/close so concurrent runs don't collide on a fixed port, disable the worker load limit (loadThreshold = Infinity) so runs can saturate the agent, and resolve agentName from LIVEKIT_AGENT_NAME_OVERRIDE first so lk simulate dispatch matches. Ported from livekit/agents#6055
Pin the js->python field mapping the report layer applies (args->arguments, callId->call_id, isError->is_error, createdAt->created_at, groupId->group_id, thoughtSignature->thought_signature) across message, function_call, and function_call_output items, complementing the camelCase chat_context snapshot.
2b92830 to
c3fa1cb
Compare
uploadSessionReport stringified chatHistory.toJSON() directly, which emits camelCase; the snake_case conversion lives only in sessionReportToJSON. The uploaded chat_history therefore carried camelCase keys and failed the Python consumer's pydantic validation (call_id/arguments/is_error/new_agent_id reported missing). Reuse sessionReportToJSON(report).chat_history so the two serializations stay in sync, and extend the wire-format snapshot to cover agent_handoff (new_agent_id/old_agent_id).
There was a problem hiding this comment.
🚩 simulation=true loadThreshold can be overridden by workerToken cloud guard
When simulation=true, ServerOptions sets loadThreshold=Infinity (agents/src/worker.ts:255). However, the AgentServer constructor's cloud guard (agents/src/worker.ts:361-367) checks if (opts.workerToken) and overrides loadThreshold to Default.loadThreshold(production) (0.7) if it differs. If a simulation is run with a worker token (Cloud deployment), the infinite load threshold would be reset to 0.7, defeating the simulation's purpose of allowing unlimited jobs. This scenario is unlikely in practice since lk simulate probably doesn't use Cloud worker tokens, but worth being aware of.
(Refers to lines 361-367)
Was this helpful? React with 👍 or 👎 to provide feedback.
| return new InferenceProcExecutor({ | ||
| runners: InferenceRunner.registeredRunners, | ||
| // 5 minutes, matching python: loading model files into the child can be | ||
| // slow on first run. |
There was a problem hiding this comment.
🟡 Worker inference initialization timeout silently increased from 30s to 5min
The InferenceProcExecutor.createIfNeeded() refactoring unified the initialization timeout at 5 minutes for both the worker and console paths, but the worker previously used 30 seconds (agents/src/worker.ts old code: initializeTimeout: 30000). The console path already used 5 minutes because it's an interactive tool where first-run model downloads are expected. The worker path intentionally used a shorter 30-second timeout suited for production, where models should already be downloaded (via npx livekit-agents download-files). A broken inference process will now block the worker startup for up to 5 minutes instead of 30 seconds before timing out — a 10× increase in failure detection time in production. None of the PR's changesets mention this behavioral change.
Prompt for agents
The createIfNeeded() static factory consolidates InferenceProcExecutor construction for both the worker (agents/src/worker.ts) and the console (agents/src/console.ts). However, the old worker code used initializeTimeout: 30000 (30 seconds) while the console used 5 * 60 * 1000 (5 minutes). The shared method adopted the console's 5-minute timeout for both paths.
Consider either:
1. Adding an optional parameter to createIfNeeded() (e.g. initializeTimeout) so callers can pass the appropriate timeout for their context, or
2. Keeping the 5-minute timeout if the Python worker also uses 5 minutes and this alignment is intentional, but documenting the change in a changeset so users are aware of the production behavior difference.
Was this helpful? React with 👍 or 👎 to provide feedback.
| function modelUsageToJSON(usage: ModelUsage): Record<string, unknown> { | ||
| const filtered = filterZeroValues(usage) as Record<string, unknown>; | ||
| const out: Record<string, unknown> = {}; | ||
| for (const [k, v] of Object.entries(filtered)) { | ||
| if (v === undefined) { | ||
| continue; | ||
| } | ||
| if (k === 'sessionDurationMs') { | ||
| out.session_duration = (v as number) / 1000; | ||
| } else if (k === 'audioDurationMs') { | ||
| out.audio_duration = (v as number) / 1000; | ||
| } else { | ||
| out[jsToPythonFieldName(k)] = v; | ||
| } | ||
| } | ||
| return out; | ||
| } |
There was a problem hiding this comment.
🚩 modelUsageToJSON converts ms durations to seconds, but event durations remain in ms
The modelUsageToJSON function (agents/src/voice/report.ts:195-211) correctly converts audioDurationMs and sessionDurationMs from milliseconds to seconds for the Python wire format. However, event fields like EotPredictionEvent.inferenceDurationMs and EotPredictionEvent.delayMs are serialized via the generic eventToJSON/toSnakeCaseDeep path which does NOT convert ms to seconds — they become inference_duration_ms and delay_ms with values still in milliseconds. This is likely correct (the Python field names include _ms), but worth verifying against the Python event schema if that exists.
Was this helpful? React with 👍 or 👎 to provide feedback.
The lk.chat_ctx attribute on LLM-generation and EOU-prediction spans was stringified from chatCtx.toJSON() (camelCase), diverging from the Python framework which serializes the same attribute via chat_ctx.to_dict() (snake_case). Route both sites through the shared toSnakeCaseDeep helper (now exported from the report layer) and align timestamp exclusion with Python via toJSON() defaults.
| if (!text) { | ||
| error = 'empty run_input text'; | ||
| } else { | ||
| // Drive the reply through session.run() directly and capture its events, | ||
| // ignoring any registered text-input callback. The room's default text | ||
| // input callback is fire-and-forget (interrupt + generateReply) and never | ||
| // surfaces the resulting chat items, so routing run_input through it would | ||
| // always return empty responses to the remote driver. This mirrors the | ||
| // Python SessionHost behavior. | ||
| try { | ||
| await this.session!.interrupt({ force: true }).await; | ||
| } catch { | ||
| // ignore | ||
| } | ||
|
|
||
| const result = this.session!.run({ userInput: text }); | ||
| try { | ||
| await result.wait(); | ||
| } catch (e) { | ||
| error = e instanceof Error ? e.message : String(e); | ||
| } | ||
| items = chatItemsToProto(result.events.map((ev) => ev.item)); | ||
| const result = this.session!.run({ userInput: text }); | ||
| try { | ||
| await result.wait(); | ||
| } catch (e) { | ||
| error = e instanceof Error ? e.message : String(e); | ||
| } | ||
| items = chatItemsToProto(result.events.map((ev) => ev.item)); | ||
|
|
||
| if (items.length === 0 && !error) { | ||
| error = 'agent produced no response items'; | ||
| } |
There was a problem hiding this comment.
🚩 SessionHost.handleRunInput now bypasses custom textInputCallback
The SessionHost.handleRunInput at agents/src/voice/remote_session.ts:1031-1056 was refactored to always call session.run() directly, bypassing any custom textInputCallback set via RoomInputOptions.textInputCallback. Previously, registerTextInput wired the callback so run_input requests from the remote driver would use it. Users who relied on custom text input callbacks for the SessionHost path will see different behavior — their callback is now only invoked for text arriving through RoomIO's room text stream (room_io.ts:355). The inline comment explains the rationale: the default callback was fire-and-forget and never surfaced response items.
Was this helpful? React with 👍 or 👎 to provide feedback.
Description
Layers simulation/CLI fixes and session-report serialization parity on top of the console CLI work, plus IPC init-failure hardening.
Changes Made
--simulationCLI arg threaded through and read from subcommand options; HTTP server disabled in simulation modeLIVEKIT_AGENT_NAME_OVERRIDEfor simulation dispatch and stamps the agent name on the accepted job participanttoJSONserialized in snake_caseinitialize()fast and kill the child when it dies during startup; use Python's 5-minute inference init timeoutInferenceProcExecutorconstruction unified into acreateIfNeededfactory--recordforces audio recording (Python parity)run_inputreturns agent reply itemsPre-Review Checklist
Testing
restaurant_agent.tsandrealtime_agent.tswork properly (for major changes)Additional Notes
Targets
brian/node-console-cli. Early console work (#1740) is already squash-merged into the base, so this PR shows only the newer fixes on top.Note to reviewers: Please ensure the pre-review checklist is completed before starting your review.