fix(stt): clean up input frame reads#1869
Conversation
|
| export function combineAbortSignals(signals: AbortSignal[]): AbortSignal { | ||
| const controller = new AbortController(); | ||
| const cleanupCallbacks: (() => void)[] = []; | ||
|
|
||
| const abort = () => { | ||
| for (const cleanup of cleanupCallbacks) { | ||
| cleanup(); | ||
| } | ||
| cleanupCallbacks.length = 0; | ||
|
|
||
| if (!controller.signal.aborted) { | ||
| controller.abort(); | ||
| } | ||
| }; | ||
|
|
||
| for (const signal of signals) { | ||
| if (signal.aborted) { | ||
| abort(); | ||
| break; | ||
| } | ||
|
|
||
| signal.addEventListener('abort', abort, { once: true }); | ||
| cleanupCallbacks.push(() => signal.removeEventListener('abort', abort)); | ||
| } | ||
|
|
||
| return controller.signal; | ||
| } |
There was a problem hiding this comment.
🚩 combineAbortSignals duplicates existing combineSignals with different API
The new combineAbortSignals(signals: AbortSignal[]) at agents/src/utils.ts:1414 overlaps significantly with the existing combineSignals(a, b) at agents/src/utils.ts:1455. Key differences: (1) the new function takes an array vs two params, (2) the new function cleans up listeners on other signals when one fires, (3) the new function does NOT propagate the abort reason (controller.abort() with no argument), while the old one does (c.abort((s as any).reason)). Both leak listeners if neither signal ever fires, which is acceptable in practice since signals are eventually aborted. Consider consolidating these in a future PR.
Was this helpful? React with 👍 or 👎 to provide feedback.
Summary
AsyncIterableQueue.next()abort-aware so canceled STT send loops do not leave pending input reads behind.input.next()against abort promises.Notes
Verification
pnpm --filter @livekit/agents --filter @livekit/agents-plugins-test --filter @livekit/agents-plugin-silero --filter @livekit/agents-plugin-openai --filter @livekit/agents-plugin-assemblyai --filter @livekit/agents-plugin-cartesia --filter @livekit/agents-plugin-deepgram --filter @livekit/agents-plugin-elevenlabs --filter @livekit/agents-plugin-inworld --filter @livekit/agents-plugin-sarvam --filter @livekit/agents-plugin-soniox --filter @livekit/agents-plugin-xai buildPorted from livekit/agents#6193
Original PR description
Summary
Fixes #6181.
This cleans up the Google STT streaming request generator when the surrounding gRPC stream is cancelled while it is waiting for the next audio frame. The user-visible streaming behavior is unchanged, but the pending
self._input_ch.recv()task is now cancelled and awaited before the generator exits.Background
PR #5591 added a race between
self._input_ch.recv()and a stop event so Google STT can reconnect without leaving an idle request generator parked on the input channel. That fixed the idle-stream leak, but left a smaller cleanup gap: if the request generator itself is cancelled whileframe_taskis still waiting onChan.recv(), the generator'sfinallyblock only cancelledstop_task.When the input channel later closes, that orphaned task can finish with
ChanClosed, causing:Task exception was never retrievedfuture: <Task finished ... coro=<Chan.recv()> exception=ChanClosed()>Changes
frame_task.stop_taskand any pendingframe_taskviautils.aio.gracefully_cancel.test_plugin_google_stt.pycan run locally without real credentials.Validation
python -m pytest tests/test_plugin_google_stt.py -q16 passedpython -m ruff format --check livekit-plugins/livekit-plugins-google/livekit/plugins/google/stt.py tests/test_plugin_google_stt.pypython -m ruff check livekit-plugins/livekit-plugins-google/livekit/plugins/google/stt.py tests/test_plugin_google_stt.pypython -m py_compile livekit-plugins/livekit-plugins-google/livekit/plugins/google/stt.py tests/test_plugin_google_stt.pygit diff --check