diff --git a/.changeset/deepgram-flux-retryable-close.md b/.changeset/deepgram-flux-retryable-close.md new file mode 100644 index 000000000..ed8b343e8 --- /dev/null +++ b/.changeset/deepgram-flux-retryable-close.md @@ -0,0 +1,5 @@ +--- +'@livekit/agents-plugin-deepgram': patch +--- + +Recover Deepgram Flux streaming STT after unexpected WebSocket closes by surfacing them as retryable connection errors, and preserve the transcript timebase across the reconnect so timestamps stay monotonic. diff --git a/plugins/deepgram/src/stt_v2.test.ts b/plugins/deepgram/src/stt_v2.test.ts index c722689e6..11b256b0d 100644 --- a/plugins/deepgram/src/stt_v2.test.ts +++ b/plugins/deepgram/src/stt_v2.test.ts @@ -1,16 +1,397 @@ // SPDX-FileCopyrightText: 2025 LiveKit, Inc. // // SPDX-License-Identifier: Apache-2.0 -import { VAD } from '@livekit/agents-plugin-silero'; -import { stt } from '@livekit/agents-plugins-test'; -import { describe, it } from 'vitest'; -import { STTv2 } from './stt_v2.js'; +import { stt } from '@livekit/agents'; +import { AudioFrame } from '@livekit/rtc-node'; +import { describe, expect, it } from 'vitest'; +import { type WebSocket, WebSocketServer } from 'ws'; +import { STTv2, type STTv2Options } from './stt_v2.js'; + +const TEST_CONN_OPTIONS = { maxRetry: 1, retryIntervalMs: 1, timeoutMs: 1000 }; + +async function startWebSocketServer(): Promise<{ + wss: WebSocketServer; + endpointUrl: string; +}> { + const wss = new WebSocketServer({ host: '127.0.0.1', port: 0 }); + await new Promise((resolve) => wss.once('listening', resolve)); + + const address = wss.address(); + if (address === null || typeof address === 'string') { + throw new Error('failed to bind test WebSocket server'); + } + + return { + wss, + endpointUrl: `ws://127.0.0.1:${address.port}/v2/listen`, + }; +} + +async function closeWebSocketServer(wss: WebSocketServer): Promise { + for (const client of wss.clients) { + client.terminate(); + } + + await new Promise((resolve, reject) => { + wss.close((error) => { + if (error) { + reject(error); + } else { + resolve(); + } + }); + }); +} + +function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +async function waitFor(condition: () => boolean, message: string): Promise { + const startedAt = Date.now(); + + while (Date.now() - startedAt < 1000) { + if (condition()) return; + await sleep(5); + } + + throw new Error(message); +} + +function createStream(endpointUrl: string, maxRetry = 1) { + return new STTv2({ apiKey: 'test-api-key', endpointUrl }).stream({ + connOptions: { ...TEST_CONN_OPTIONS, maxRetry }, + }); +} + +describe('Deepgram STTv2 WebSocket recovery', () => { + it('reconnects when Deepgram closes the WebSocket unexpectedly', async () => { + const { wss, endpointUrl } = await startWebSocketServer(); + const connections: WebSocket[] = []; + const stream = createStream(endpointUrl); + + wss.on('connection', (ws) => { + connections.push(ws); + + if (connections.length === 1) { + setTimeout(() => ws.close(1011, 'unexpected close'), 10); + } + }); + + try { + await waitFor( + () => connections.length === 2, + `expected retry to open a second WebSocket, saw ${connections.length}`, + ); + } finally { + stream.close(); + await closeWebSocketServer(wss); + } + }); + + it('does not reconnect after normal input end closes the WebSocket', async () => { + const { wss, endpointUrl } = await startWebSocketServer(); + const connections: WebSocket[] = []; + const messages: Array> = []; + const stream = createStream(endpointUrl); + + wss.on('connection', (ws) => { + connections.push(ws); + + ws.on('message', (data, isBinary) => { + if (isBinary) return; + + messages.push(JSON.parse(data.toString()) as Record); + ws.close(1000, 'stream complete'); + }); + }); + + try { + await waitFor(() => connections.length === 1, 'expected initial WebSocket connection'); + + stream.endInput(); + + await waitFor( + () => messages.some((message) => message.type === 'CloseStream'), + 'expected client to send CloseStream', + ); + await sleep(50); + + expect(connections).toHaveLength(1); + } finally { + stream.close(); + await closeWebSocketServer(wss); + } + }); + + it('does not reconnect or close after flush before normal input end', async () => { + const { wss, endpointUrl } = await startWebSocketServer(); + const connections: WebSocket[] = []; + const messages: Array> = []; + const stream = createStream(endpointUrl); + + wss.on('connection', (ws) => { + connections.push(ws); + + ws.on('message', (data, isBinary) => { + if (isBinary) return; + + const message = JSON.parse(data.toString()) as Record; + messages.push(message); + if (message.type === 'CloseStream') { + ws.close(1000, 'stream complete'); + } + }); + }); + + try { + await waitFor(() => connections.length === 1, 'expected initial WebSocket connection'); + + stream.flush(); + + await sleep(50); + expect(messages.some((message) => message.type === 'CloseStream')).toBe(false); + expect(connections).toHaveLength(1); + + stream.endInput(); + + await waitFor( + () => messages.some((message) => message.type === 'CloseStream'), + 'expected client to send CloseStream', + ); + await sleep(50); + + expect(connections).toHaveLength(1); + } finally { + stream.close(); + await closeWebSocketServer(wss); + } + }); + + it('treats option-update reconnects as intentional WebSocket closes', async () => { + const { wss, endpointUrl } = await startWebSocketServer(); + const connections: WebSocket[] = []; + const stream = createStream(endpointUrl, 0) as stt.SpeechStream & { + updateOptions(opts: Partial): void; + }; + + wss.on('connection', (ws) => { + connections.push(ws); + }); + + try { + await waitFor(() => connections.length === 1, 'expected initial WebSocket connection'); + + stream.updateOptions({ keyterms: ['updated'] }); + + await waitFor( + () => connections.length === 2, + `expected option update to reconnect once, saw ${connections.length}`, + ); + } finally { + stream.close(); + await closeWebSocketServer(wss); + } + }); + + it('keeps transcript timestamps monotonic across an unexpected reconnect', async () => { + const { wss, endpointUrl } = await startWebSocketServer(); + const connections: WebSocket[] = []; + let firstConnAudioBytes = 0; + const stream = createStream(endpointUrl); + + wss.on('connection', (ws) => { + const index = connections.push(ws); + if (index === 1) { + ws.on('message', (data, isBinary) => { + if (isBinary) firstConnAudioBytes += (data as Buffer).length; + }); + } + }); + + const startOfTurn = JSON.stringify({ type: 'TurnInfo', event: 'StartOfTurn', transcript: '' }); + const endOfTurn = (audioWindowEnd: number) => + JSON.stringify({ + type: 'TurnInfo', + event: 'EndOfTurn', + transcript: 'hello', + audio_window_start: Math.max(0, audioWindowEnd - 0.5), + audio_window_end: audioWindowEnd, + words: [ + { + word: 'hello', + start: Math.max(0, audioWindowEnd - 0.5), + end: audioWindowEnd, + confidence: 0.9, + }, + ], + }); + + const finalEndTimes: number[] = []; + const consume = (async () => { + for await (const event of stream) { + if (event.type === stt.SpeechEventType.FINAL_TRANSCRIPT && event.alternatives?.[0]) { + finalEndTimes.push(event.alternatives[0].endTime); + } + } + })(); + + try { + await waitFor(() => connections.length === 1, 'expected initial WebSocket connection'); + + // Stream ~2s of 16 kHz mono audio so the first connection advances the timeline. + const oneSecond = () => new AudioFrame(new Int16Array(16_000), 16_000, 1, 16_000); + stream.pushFrame(oneSecond()); + stream.pushFrame(oneSecond()); + // 16 kHz * 2 bytes/sample * 1.5s = 48_000 bytes => at least 1.5s reached the socket. + await waitFor( + () => firstConnAudioBytes >= 48_000, + `expected ~2s of audio at the first connection, saw ${firstConnAudioBytes} bytes`, + ); + + // First turn ends 1.0s into the first connection's audio window. + connections[0]!.send(startOfTurn); + connections[0]!.send(endOfTurn(1.0)); + await waitFor(() => finalEndTimes.length === 1, 'expected first final transcript'); + + // Unexpected close -> base-class retry reconnects. + connections[0]!.close(1011, 'unexpected close'); + await waitFor(() => connections.length === 2, 'expected reconnect after unexpected close'); + + // The fresh Deepgram socket restarts audio_window near 0; this turn ends at + // 0.5 within the NEW connection's window. + connections[1]!.send(startOfTurn); + connections[1]!.send(endOfTurn(0.5)); + await waitFor(() => finalEndTimes.length === 2, 'expected second final transcript'); + + // Without timebase preservation the second final lands at ~0.5s — earlier than + // the first — and downstream "before answer audio" logic would drop it. + expect(finalEndTimes[1]!).toBeGreaterThan(finalEndTimes[0]!); + } finally { + stream.close(); + await consume.catch(() => {}); + await closeWebSocketServer(wss); + } + }); + + it('emits start of speech after reconnecting during an active turn', async () => { + const { wss, endpointUrl } = await startWebSocketServer(); + const connections: WebSocket[] = []; + const stream = createStream(endpointUrl); + const eventTypes: stt.SpeechEventType[] = []; + + wss.on('connection', (ws) => { + connections.push(ws); + }); + + const consume = (async () => { + for await (const event of stream) { + eventTypes.push(event.type); + } + })(); + + const startOfTurn = JSON.stringify({ type: 'TurnInfo', event: 'StartOfTurn', transcript: '' }); + + try { + await waitFor(() => connections.length === 1, 'expected initial WebSocket connection'); + + connections[0]!.send(startOfTurn); + await waitFor( + () => + eventTypes.filter((eventType) => eventType === stt.SpeechEventType.START_OF_SPEECH) + .length === 1, + 'expected first start of speech', + ); + + connections[0]!.close(1011, 'unexpected close during speech'); + await waitFor(() => connections.length === 2, 'expected reconnect after unexpected close'); + + connections[1]!.send(startOfTurn); + await waitFor( + () => + eventTypes.filter((eventType) => eventType === stt.SpeechEventType.START_OF_SPEECH) + .length === 2, + 'expected second start of speech after reconnect', + ); + } finally { + stream.close(); + await consume.catch(() => {}); + await closeWebSocketServer(wss); + } + }); + + it('starts speech from a non-empty update after reconnecting when StartOfTurn is absent', async () => { + const { wss, endpointUrl } = await startWebSocketServer(); + const connections: WebSocket[] = []; + const stream = createStream(endpointUrl); + const events: Array<{ type: stt.SpeechEventType; text?: string }> = []; + + wss.on('connection', (ws) => { + connections.push(ws); + }); + + const consume = (async () => { + for await (const event of stream) { + events.push({ + type: event.type, + text: event.alternatives?.[0]?.text, + }); + } + })(); + + const startOfTurn = JSON.stringify({ type: 'TurnInfo', event: 'StartOfTurn', transcript: '' }); + const update = JSON.stringify({ + type: 'TurnInfo', + event: 'Update', + transcript: 'after reconnect', + audio_window_start: 0, + audio_window_end: 0.5, + words: [ + { word: 'after', start: 0, end: 0.25, confidence: 0.9 }, + { word: 'reconnect', start: 0.25, end: 0.5, confidence: 0.9 }, + ], + }); + + try { + await waitFor(() => connections.length === 1, 'expected initial WebSocket connection'); + + connections[0]!.send(startOfTurn); + await waitFor( + () => + events.filter((event) => event.type === stt.SpeechEventType.START_OF_SPEECH).length === 1, + 'expected first start of speech', + ); + + connections[0]!.close(1011, 'unexpected close during speech'); + await waitFor(() => connections.length === 2, 'expected reconnect after unexpected close'); + + connections[1]!.send(update); + await waitFor( + () => + events.filter((event) => event.type === stt.SpeechEventType.START_OF_SPEECH).length === + 2 && events.some((event) => event.text === 'after reconnect'), + 'expected synthesized start of speech and update transcript after reconnect', + ); + } finally { + stream.close(); + await consume.catch(() => {}); + await closeWebSocketServer(wss); + } + }); +}); const hasDeepgramApiKey = Boolean(process.env.DEEPGRAM_API_KEY); if (hasDeepgramApiKey) { describe('Deepgram STTv2 (Flux)', async () => { - await stt(new STTv2(), await VAD.load(), { nonStreaming: false }); + const sileroPackage = '@livekit/agents-plugin-silero'; + const pluginsTestPackage = '@livekit/agents-plugins-test'; + const [{ VAD }, { stt: runSttTests }] = await Promise.all([ + import(/* @vite-ignore */ sileroPackage), + import(/* @vite-ignore */ pluginsTestPackage), + ]); + + await runSttTests(new STTv2(), await VAD.load(), { nonStreaming: false }); }); } else { describe('Deepgram STTv2 (Flux)', () => { diff --git a/plugins/deepgram/src/stt_v2.ts b/plugins/deepgram/src/stt_v2.ts index 2558bc787..9ce781bde 100644 --- a/plugins/deepgram/src/stt_v2.ts +++ b/plugins/deepgram/src/stt_v2.ts @@ -3,6 +3,7 @@ // SPDX-License-Identifier: Apache-2.0 import { type APIConnectOptions, + APIConnectionError, AudioByteStream, Event, calculateAudioDurationSeconds, @@ -214,11 +215,24 @@ class SpeechStreamv2 extends stt.SpeechStream { #opts: STTv2Options & { apiKey: string }; #logger = log(); #ws: WebSocket | null = null; + #closingWs = new WeakSet(); #audioDurationCollector: PeriodicCollector; #requestId = ''; #speaking = false; + // Monotonic timestamp base across reconnects. Deepgram's audio_window restarts + // at 0 on every new socket, so each connection's window times are offset by the + // audio already streamed to prior connections (#sentAudioInS snapshotted at + // connect into #connectionTimeBaseInS). Without this, transcripts after a + // reconnect would be timestamped near the start of the session. + // + // The SDK sets startTimeOffset once at stream creation (voice/agent.ts sttNode) + // and relies on the plugin to keep audio_window continuous across its own + // reconnects ("linear timestamps across reconnections") — this preserves that. + #sentAudioInS = 0; + #connectionTimeBaseInS = 0; + // Parity: _reconnect_event - using existing Event class from @livekit/agents #reconnectEvent = new Event(); @@ -280,9 +294,20 @@ class SpeechStreamv2 extends stt.SpeechStream { this.#ws.once('error', onError); }); + // Snapshot the timeline base for this connection: the fresh socket's + // audio_window starts at 0, so offset it by the audio already streamed. + this.#connectionTimeBaseInS = this.#sentAudioInS; + // Provider turn state is scoped to one WebSocket. A reconnect during an + // active turn must not make the next connection suppress StartOfTurn. + this.#speaking = false; + // 2. Run Concurrent Tasks (Send & Receive) - const sendPromise = this.#sendTask(); - const recvPromise = this.#recvTask(); + const ws = this.#ws; + if (!ws) throw new Error('WebSocket not initialized'); + + const wsClosedEvent = new Event(); + const sendPromise = this.#sendTask(ws, wsClosedEvent); + const recvPromise = this.#recvTask(ws, wsClosedEvent); const reconnectWait = this.#reconnectEvent.wait(); // 3. Race: Normal Completion vs Reconnect Signal @@ -294,6 +319,7 @@ class SpeechStreamv2 extends stt.SpeechStream { if (result === 'RECONNECT') { this.#logger.debug('Reconnecting stream due to option update...'); // Close current socket; loop will restart and open a new one + this.#expectWsClose(this.#ws); this.#ws.close(); } else { // Normal finish (Stream ended or Error thrown) @@ -304,6 +330,7 @@ class SpeechStreamv2 extends stt.SpeechStream { throw error; // Let Base Class handle retry logic } finally { if (this.#ws?.readyState === WebSocket.OPEN) { + this.#expectWsClose(this.#ws); this.#ws.close(); } } @@ -311,81 +338,64 @@ class SpeechStreamv2 extends stt.SpeechStream { this.close(); } - async #sendTask() { - if (!this.#ws) return; - + async #sendTask(ws: WebSocket, wsClosedEvent: Event) { // Buffer audio into 50ms chunks (Parity) const samples50ms = Math.floor(this.#opts.sampleRate / 20); const audioBstream = new AudioByteStream(this.#opts.sampleRate, 1, samples50ms); - let hasEnded = false; - // Manual Iterator to allow racing against Reconnect Signal const iterator = this.input[Symbol.asyncIterator](); + const sendFrames = (frames: AudioFrame[]) => { + for (const frame of frames) { + const durationInS = calculateAudioDurationSeconds(frame); + this.#audioDurationCollector.push(durationInS); + // Track total audio consumed so reconnects can preserve the timeline. + this.#sentAudioInS += durationInS; + + if (ws.readyState === WebSocket.OPEN) { + ws.send(frame.data); + } + } + }; while (true) { const nextPromise = iterator.next(); - // If reconnect signal fires, abort the wait - const abortPromise = this.#reconnectEvent.wait().then(() => ({ abort: true }) as const); + // If reconnect or WebSocket close fires, abort the wait + const abortPromise = Promise.race([this.#reconnectEvent.wait(), wsClosedEvent.wait()]).then( + () => ({ abort: true }) as const, + ); const result = await Promise.race([nextPromise, abortPromise]); // Check if we need to abort (Reconnect) or if stream is done if ('abort' in result || result.done) { - if (!('abort' in result) && result.done) { - // Normal stream end - hasEnded = true; - } else { - // Reconnect triggered - break loop immediately - break; - } + break; } - // If we broke above, we don't process data. If not, 'result' is IteratorResult - if (hasEnded && result.value === undefined) { - // Process flush below - } else if ('value' in result) { - const data = result.value; - const frames: AudioFrame[] = []; - - if (data === SpeechStreamv2.FLUSH_SENTINEL) { - frames.push(...audioBstream.flush()); - hasEnded = true; - } else { - frames.push(...audioBstream.write((data as AudioFrame).data.buffer as ArrayBuffer)); - } - - for (const frame of frames) { - this.#audioDurationCollector.push(calculateAudioDurationSeconds(frame)); - - if (this.#ws!.readyState === WebSocket.OPEN) { - this.#ws!.send(frame.data); - } + const data = result.value; - if (hasEnded) { - this.#audioDurationCollector.flush(); - hasEnded = false; - } - } + if (data === SpeechStreamv2.FLUSH_SENTINEL) { + sendFrames(audioBstream.flush()); + this.#audioDurationCollector.flush(); + continue; } - if (hasEnded) break; + sendFrames(audioBstream.write((data as AudioFrame).data.buffer as ArrayBuffer)); } // Only send CloseStream if we are exiting normally (not reconnecting) - if (!this.#reconnectEvent.isSet && this.#ws!.readyState === WebSocket.OPEN) { + if (!this.#reconnectEvent.isSet && !wsClosedEvent.isSet && ws.readyState === WebSocket.OPEN) { this.#logger.debug('Sending CloseStream message to Deepgram'); - this.#ws!.send(_CLOSE_MSG); + this.#expectWsClose(ws); + ws.send(_CLOSE_MSG); } } - async #recvTask() { - if (!this.#ws) return; - - return new Promise((resolve) => { - if (!this.#ws) return resolve(); + async #recvTask(ws: WebSocket, wsClosedEvent: Event) { + return new Promise((resolve, reject) => { + let wsError: Error | undefined; - this.#ws.on('message', (data: Buffer, isBinary: boolean) => { + ws.on('message', (data: Buffer, isBinary: boolean) => { if (isBinary) { this.#logger.warn('Received unexpected binary message from Deepgram'); return; @@ -398,13 +408,29 @@ class SpeechStreamv2 extends stt.SpeechStream { } }); - this.#ws.on('close', (code, reason) => { + ws.on('close', (code, reason) => { + wsClosedEvent.set(); this.#logger.debug(`Deepgram WebSocket closed: ${code} ${reason}`); - resolve(); + + if (this.#closingWs.has(ws) || this.closed || this.input.closed) { + resolve(); + return; + } + + const reasonText = reason.toString(); + const message = reasonText + ? `Deepgram WebSocket closed unexpectedly: ${code} ${reasonText}` + : `Deepgram WebSocket closed unexpectedly: ${code}`; + reject( + new APIConnectionError({ + message: wsError ? `${message}: ${wsError.message}` : message, + }), + ); }); - // Errors are caught by run() listener, resolve here to clean up task - this.#ws.on('error', () => resolve()); + ws.on('error', (error) => { + wsError = error; + }); }); } @@ -419,23 +445,20 @@ class SpeechStreamv2 extends stt.SpeechStream { if (eventType === 'StartOfTurn') { if (this.#speaking) return; - this.#speaking = true; - this.queue.put({ - type: stt.SpeechEventType.START_OF_SPEECH, - requestId: this.#requestId, - }); + this.#startSpeech(); this.#sendTranscriptEvent(stt.SpeechEventType.INTERIM_TRANSCRIPT, data); } else if (eventType === 'Update') { - if (!this.#speaking) return; + if (!this.#speaking && !this.#startSpeechFromTranscript(data)) return; this.#sendTranscriptEvent(stt.SpeechEventType.INTERIM_TRANSCRIPT, data); } else if (eventType === 'EagerEndOfTurn') { - if (!this.#speaking) return; + if (!this.#speaking && !this.#startSpeechFromTranscript(data)) return; this.#sendTranscriptEvent(stt.SpeechEventType.PREFLIGHT_TRANSCRIPT, data); } else if (eventType === 'TurnResumed') { + if (!this.#speaking) this.#startSpeechFromTranscript(data); this.#sendTranscriptEvent(stt.SpeechEventType.INTERIM_TRANSCRIPT, data); } else if (eventType === 'EndOfTurn') { - if (!this.#speaking) return; + if (!this.#speaking && !this.#startSpeechFromTranscript(data)) return; this.#speaking = false; this.#sendTranscriptEvent(stt.SpeechEventType.FINAL_TRANSCRIPT, data); @@ -452,8 +475,30 @@ class SpeechStreamv2 extends stt.SpeechStream { } } + #startSpeech() { + this.#speaking = true; + this.queue.put({ + type: stt.SpeechEventType.START_OF_SPEECH, + requestId: this.#requestId, + }); + } + + #startSpeechFromTranscript(data: Record) { + const transcript = data.transcript; + if (typeof transcript !== 'string' || transcript.trim().length === 0) { + return false; + } + + this.#startSpeech(); + return true; + } + #sendTranscriptEvent(eventType: stt.SpeechEventType, data: Record) { - const alts = parseTranscription(this.#opts.language || 'en', data, this.startTimeOffset); + const alts = parseTranscription( + this.#opts.language || 'en', + data, + this.startTimeOffset + this.#connectionTimeBaseInS, + ); if (alts.length > 0) { this.queue.put({ @@ -502,7 +547,14 @@ class SpeechStreamv2 extends stt.SpeechStream { return `${baseUrl}?${qs}`; } + #expectWsClose(ws: WebSocket | null) { + if (ws) { + this.#closingWs.add(ws); + } + } + override close() { + this.#expectWsClose(this.#ws); super.close(); this.#ws?.close(); }