From 1316a12fed8447242c77a99fce879c86aa7598ee Mon Sep 17 00:00:00 2001 From: Myk Melez Date: Tue, 16 Jun 2026 16:19:35 -0700 Subject: [PATCH 1/7] fix(deepgram): recover Flux STT WebSocket closes --- .changeset/deepgram-flux-retryable-close.md | 5 + plugins/deepgram/src/stt_v2.test.ts | 159 +++++++++++++++++++- plugins/deepgram/src/stt_v2.ts | 71 ++++++--- 3 files changed, 209 insertions(+), 26 deletions(-) create mode 100644 .changeset/deepgram-flux-retryable-close.md diff --git a/.changeset/deepgram-flux-retryable-close.md b/.changeset/deepgram-flux-retryable-close.md new file mode 100644 index 000000000..8d8704289 --- /dev/null +++ b/.changeset/deepgram-flux-retryable-close.md @@ -0,0 +1,5 @@ +--- +'@livekit/agents-plugin-deepgram': patch +--- + +Recover Deepgram Flux streaming STT after unexpected WebSocket closes by surfacing them as retryable connection errors. diff --git a/plugins/deepgram/src/stt_v2.test.ts b/plugins/deepgram/src/stt_v2.test.ts index c722689e6..473b72c86 100644 --- a/plugins/deepgram/src/stt_v2.test.ts +++ b/plugins/deepgram/src/stt_v2.test.ts @@ -1,16 +1,165 @@ // SPDX-FileCopyrightText: 2025 LiveKit, Inc. // // SPDX-License-Identifier: Apache-2.0 -import { VAD } from '@livekit/agents-plugin-silero'; -import { stt } from '@livekit/agents-plugins-test'; -import { describe, it } from 'vitest'; -import { STTv2 } from './stt_v2.js'; +import type { stt } from '@livekit/agents'; +import { describe, expect, it } from 'vitest'; +import { type WebSocket, WebSocketServer } from 'ws'; +import { STTv2, type STTv2Options } from './stt_v2.js'; + +const TEST_CONN_OPTIONS = { maxRetry: 1, retryIntervalMs: 1, timeoutMs: 1000 }; + +async function startWebSocketServer(): Promise<{ + wss: WebSocketServer; + endpointUrl: string; +}> { + const wss = new WebSocketServer({ host: '127.0.0.1', port: 0 }); + await new Promise((resolve) => wss.once('listening', resolve)); + + const address = wss.address(); + if (address === null || typeof address === 'string') { + throw new Error('failed to bind test WebSocket server'); + } + + return { + wss, + endpointUrl: `ws://127.0.0.1:${address.port}/v2/listen`, + }; +} + +async function closeWebSocketServer(wss: WebSocketServer): Promise { + for (const client of wss.clients) { + client.terminate(); + } + + await new Promise((resolve, reject) => { + wss.close((error) => { + if (error) { + reject(error); + } else { + resolve(); + } + }); + }); +} + +function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +async function waitFor(condition: () => boolean, message: string): Promise { + const startedAt = Date.now(); + + while (Date.now() - startedAt < 1000) { + if (condition()) return; + await sleep(5); + } + + throw new Error(message); +} + +function createStream(endpointUrl: string, maxRetry = 1) { + return new STTv2({ apiKey: 'test-api-key', endpointUrl }).stream({ + connOptions: { ...TEST_CONN_OPTIONS, maxRetry }, + }); +} + +describe('Deepgram STTv2 WebSocket recovery', () => { + it('reconnects when Deepgram closes the WebSocket unexpectedly', async () => { + const { wss, endpointUrl } = await startWebSocketServer(); + const connections: WebSocket[] = []; + const stream = createStream(endpointUrl); + + wss.on('connection', (ws) => { + connections.push(ws); + + if (connections.length === 1) { + setTimeout(() => ws.close(1011, 'unexpected close'), 10); + } + }); + + try { + await waitFor( + () => connections.length === 2, + `expected retry to open a second WebSocket, saw ${connections.length}`, + ); + } finally { + stream.close(); + await closeWebSocketServer(wss); + } + }); + + it('does not reconnect after normal input end closes the WebSocket', async () => { + const { wss, endpointUrl } = await startWebSocketServer(); + const connections: WebSocket[] = []; + const messages: Array> = []; + const stream = createStream(endpointUrl); + + wss.on('connection', (ws) => { + connections.push(ws); + + ws.on('message', (data) => { + messages.push(JSON.parse(data.toString()) as Record); + ws.close(1000, 'stream complete'); + }); + }); + + try { + await waitFor(() => connections.length === 1, 'expected initial WebSocket connection'); + + stream.endInput(); + + await waitFor( + () => messages.some((message) => message.type === 'CloseStream'), + 'expected client to send CloseStream', + ); + await sleep(50); + + expect(connections).toHaveLength(1); + } finally { + stream.close(); + await closeWebSocketServer(wss); + } + }); + + it('treats option-update reconnects as intentional WebSocket closes', async () => { + const { wss, endpointUrl } = await startWebSocketServer(); + const connections: WebSocket[] = []; + const stream = createStream(endpointUrl, 0) as stt.SpeechStream & { + updateOptions(opts: Partial): void; + }; + + wss.on('connection', (ws) => { + connections.push(ws); + }); + + try { + await waitFor(() => connections.length === 1, 'expected initial WebSocket connection'); + + stream.updateOptions({ keyterms: ['updated'] }); + + await waitFor( + () => connections.length === 2, + `expected option update to reconnect once, saw ${connections.length}`, + ); + } finally { + stream.close(); + await closeWebSocketServer(wss); + } + }); +}); const hasDeepgramApiKey = Boolean(process.env.DEEPGRAM_API_KEY); if (hasDeepgramApiKey) { describe('Deepgram STTv2 (Flux)', async () => { - await stt(new STTv2(), await VAD.load(), { nonStreaming: false }); + const sileroPackage = '@livekit/agents-plugin-silero'; + const pluginsTestPackage = '@livekit/agents-plugins-test'; + const [{ VAD }, { stt: runSttTests }] = await Promise.all([ + import(/* @vite-ignore */ sileroPackage), + import(/* @vite-ignore */ pluginsTestPackage), + ]); + + await runSttTests(new STTv2(), await VAD.load(), { nonStreaming: false }); }); } else { describe('Deepgram STTv2 (Flux)', () => { diff --git a/plugins/deepgram/src/stt_v2.ts b/plugins/deepgram/src/stt_v2.ts index 2558bc787..c7756f726 100644 --- a/plugins/deepgram/src/stt_v2.ts +++ b/plugins/deepgram/src/stt_v2.ts @@ -3,6 +3,7 @@ // SPDX-License-Identifier: Apache-2.0 import { type APIConnectOptions, + APIConnectionError, AudioByteStream, Event, calculateAudioDurationSeconds, @@ -214,6 +215,7 @@ class SpeechStreamv2 extends stt.SpeechStream { #opts: STTv2Options & { apiKey: string }; #logger = log(); #ws: WebSocket | null = null; + #closingWs = new WeakSet(); #audioDurationCollector: PeriodicCollector; #requestId = ''; @@ -281,8 +283,12 @@ class SpeechStreamv2 extends stt.SpeechStream { }); // 2. Run Concurrent Tasks (Send & Receive) - const sendPromise = this.#sendTask(); - const recvPromise = this.#recvTask(); + const ws = this.#ws; + if (!ws) throw new Error('WebSocket not initialized'); + + const wsClosedEvent = new Event(); + const sendPromise = this.#sendTask(ws, wsClosedEvent); + const recvPromise = this.#recvTask(ws, wsClosedEvent); const reconnectWait = this.#reconnectEvent.wait(); // 3. Race: Normal Completion vs Reconnect Signal @@ -294,6 +300,7 @@ class SpeechStreamv2 extends stt.SpeechStream { if (result === 'RECONNECT') { this.#logger.debug('Reconnecting stream due to option update...'); // Close current socket; loop will restart and open a new one + this.#expectWsClose(this.#ws); this.#ws.close(); } else { // Normal finish (Stream ended or Error thrown) @@ -304,6 +311,7 @@ class SpeechStreamv2 extends stt.SpeechStream { throw error; // Let Base Class handle retry logic } finally { if (this.#ws?.readyState === WebSocket.OPEN) { + this.#expectWsClose(this.#ws); this.#ws.close(); } } @@ -311,9 +319,7 @@ class SpeechStreamv2 extends stt.SpeechStream { this.close(); } - async #sendTask() { - if (!this.#ws) return; - + async #sendTask(ws: WebSocket, wsClosedEvent: Event) { // Buffer audio into 50ms chunks (Parity) const samples50ms = Math.floor(this.#opts.sampleRate / 20); const audioBstream = new AudioByteStream(this.#opts.sampleRate, 1, samples50ms); @@ -325,8 +331,10 @@ class SpeechStreamv2 extends stt.SpeechStream { while (true) { const nextPromise = iterator.next(); - // If reconnect signal fires, abort the wait - const abortPromise = this.#reconnectEvent.wait().then(() => ({ abort: true }) as const); + // If reconnect or WebSocket close fires, abort the wait + const abortPromise = Promise.race([this.#reconnectEvent.wait(), wsClosedEvent.wait()]).then( + () => ({ abort: true }) as const, + ); const result = await Promise.race([nextPromise, abortPromise]); @@ -358,8 +366,8 @@ class SpeechStreamv2 extends stt.SpeechStream { for (const frame of frames) { this.#audioDurationCollector.push(calculateAudioDurationSeconds(frame)); - if (this.#ws!.readyState === WebSocket.OPEN) { - this.#ws!.send(frame.data); + if (ws.readyState === WebSocket.OPEN) { + ws.send(frame.data); } if (hasEnded) { @@ -373,19 +381,17 @@ class SpeechStreamv2 extends stt.SpeechStream { } // Only send CloseStream if we are exiting normally (not reconnecting) - if (!this.#reconnectEvent.isSet && this.#ws!.readyState === WebSocket.OPEN) { + if (!this.#reconnectEvent.isSet && !wsClosedEvent.isSet && ws.readyState === WebSocket.OPEN) { this.#logger.debug('Sending CloseStream message to Deepgram'); - this.#ws!.send(_CLOSE_MSG); + ws.send(_CLOSE_MSG); } } - async #recvTask() { - if (!this.#ws) return; + async #recvTask(ws: WebSocket, wsClosedEvent: Event) { + return new Promise((resolve, reject) => { + let wsError: Error | undefined; - return new Promise((resolve) => { - if (!this.#ws) return resolve(); - - this.#ws.on('message', (data: Buffer, isBinary: boolean) => { + ws.on('message', (data: Buffer, isBinary: boolean) => { if (isBinary) { this.#logger.warn('Received unexpected binary message from Deepgram'); return; @@ -398,13 +404,29 @@ class SpeechStreamv2 extends stt.SpeechStream { } }); - this.#ws.on('close', (code, reason) => { + ws.on('close', (code, reason) => { + wsClosedEvent.set(); this.#logger.debug(`Deepgram WebSocket closed: ${code} ${reason}`); - resolve(); + + if (this.#closingWs.has(ws) || this.closed || this.input.closed) { + resolve(); + return; + } + + const reasonText = reason.toString(); + const message = reasonText + ? `Deepgram WebSocket closed unexpectedly: ${code} ${reasonText}` + : `Deepgram WebSocket closed unexpectedly: ${code}`; + reject( + new APIConnectionError({ + message: wsError ? `${message}: ${wsError.message}` : message, + }), + ); }); - // Errors are caught by run() listener, resolve here to clean up task - this.#ws.on('error', () => resolve()); + ws.on('error', (error) => { + wsError = error; + }); }); } @@ -502,7 +524,14 @@ class SpeechStreamv2 extends stt.SpeechStream { return `${baseUrl}?${qs}`; } + #expectWsClose(ws: WebSocket | null) { + if (ws) { + this.#closingWs.add(ws); + } + } + override close() { + this.#expectWsClose(this.#ws); super.close(); this.#ws?.close(); } From e9c7b86639354d8d1ab3141117edae78e1929fe3 Mon Sep 17 00:00:00 2001 From: Myk Melez Date: Tue, 16 Jun 2026 17:49:39 -0700 Subject: [PATCH 2/7] fix(deepgram): treat Flux flush close as expected --- plugins/deepgram/src/stt_v2.test.ts | 42 ++++++++++++++++++++++++++++- plugins/deepgram/src/stt_v2.ts | 11 ++++---- 2 files changed, 46 insertions(+), 7 deletions(-) diff --git a/plugins/deepgram/src/stt_v2.test.ts b/plugins/deepgram/src/stt_v2.test.ts index 473b72c86..2361e5936 100644 --- a/plugins/deepgram/src/stt_v2.test.ts +++ b/plugins/deepgram/src/stt_v2.test.ts @@ -97,7 +97,9 @@ describe('Deepgram STTv2 WebSocket recovery', () => { wss.on('connection', (ws) => { connections.push(ws); - ws.on('message', (data) => { + ws.on('message', (data, isBinary) => { + if (isBinary) return; + messages.push(JSON.parse(data.toString()) as Record); ws.close(1000, 'stream complete'); }); @@ -121,6 +123,44 @@ describe('Deepgram STTv2 WebSocket recovery', () => { } }); + it('does not reconnect after flush sends CloseStream', async () => { + const { wss, endpointUrl } = await startWebSocketServer(); + const connections: WebSocket[] = []; + const messages: Array> = []; + const stream = createStream(endpointUrl); + + wss.on('connection', (ws) => { + connections.push(ws); + + ws.on('message', (data, isBinary) => { + if (isBinary) return; + + const message = JSON.parse(data.toString()) as Record; + messages.push(message); + if (message.type === 'CloseStream') { + ws.close(1000, 'stream complete'); + } + }); + }); + + try { + await waitFor(() => connections.length === 1, 'expected initial WebSocket connection'); + + stream.flush(); + + await waitFor( + () => messages.some((message) => message.type === 'CloseStream'), + 'expected client to send CloseStream', + ); + await sleep(50); + + expect(connections).toHaveLength(1); + } finally { + stream.close(); + await closeWebSocketServer(wss); + } + }); + it('treats option-update reconnects as intentional WebSocket closes', async () => { const { wss, endpointUrl } = await startWebSocketServer(); const connections: WebSocket[] = []; diff --git a/plugins/deepgram/src/stt_v2.ts b/plugins/deepgram/src/stt_v2.ts index c7756f726..0b8154534 100644 --- a/plugins/deepgram/src/stt_v2.ts +++ b/plugins/deepgram/src/stt_v2.ts @@ -369,20 +369,19 @@ class SpeechStreamv2 extends stt.SpeechStream { if (ws.readyState === WebSocket.OPEN) { ws.send(frame.data); } - - if (hasEnded) { - this.#audioDurationCollector.flush(); - hasEnded = false; - } } } - if (hasEnded) break; + if (hasEnded) { + this.#audioDurationCollector.flush(); + break; + } } // Only send CloseStream if we are exiting normally (not reconnecting) if (!this.#reconnectEvent.isSet && !wsClosedEvent.isSet && ws.readyState === WebSocket.OPEN) { this.#logger.debug('Sending CloseStream message to Deepgram'); + this.#expectWsClose(ws); ws.send(_CLOSE_MSG); } } From 3bb2b99ab9fb6f4d1aabb8dc9ce70e34fc1b2459 Mon Sep 17 00:00:00 2001 From: Myk Melez Date: Tue, 16 Jun 2026 18:08:56 -0700 Subject: [PATCH 3/7] fix(deepgram): preserve Flux flush stream --- plugins/deepgram/src/stt_v2.test.ts | 8 ++++- plugins/deepgram/src/stt_v2.ts | 49 ++++++++++------------------- 2 files changed, 23 insertions(+), 34 deletions(-) diff --git a/plugins/deepgram/src/stt_v2.test.ts b/plugins/deepgram/src/stt_v2.test.ts index 2361e5936..43e25d5b3 100644 --- a/plugins/deepgram/src/stt_v2.test.ts +++ b/plugins/deepgram/src/stt_v2.test.ts @@ -123,7 +123,7 @@ describe('Deepgram STTv2 WebSocket recovery', () => { } }); - it('does not reconnect after flush sends CloseStream', async () => { + it('does not reconnect or close after flush before normal input end', async () => { const { wss, endpointUrl } = await startWebSocketServer(); const connections: WebSocket[] = []; const messages: Array> = []; @@ -148,6 +148,12 @@ describe('Deepgram STTv2 WebSocket recovery', () => { stream.flush(); + await sleep(50); + expect(messages.some((message) => message.type === 'CloseStream')).toBe(false); + expect(connections).toHaveLength(1); + + stream.endInput(); + await waitFor( () => messages.some((message) => message.type === 'CloseStream'), 'expected client to send CloseStream', diff --git a/plugins/deepgram/src/stt_v2.ts b/plugins/deepgram/src/stt_v2.ts index 0b8154534..6627b7164 100644 --- a/plugins/deepgram/src/stt_v2.ts +++ b/plugins/deepgram/src/stt_v2.ts @@ -324,10 +324,17 @@ class SpeechStreamv2 extends stt.SpeechStream { const samples50ms = Math.floor(this.#opts.sampleRate / 20); const audioBstream = new AudioByteStream(this.#opts.sampleRate, 1, samples50ms); - let hasEnded = false; - // Manual Iterator to allow racing against Reconnect Signal const iterator = this.input[Symbol.asyncIterator](); + const sendFrames = (frames: AudioFrame[]) => { + for (const frame of frames) { + this.#audioDurationCollector.push(calculateAudioDurationSeconds(frame)); + + if (ws.readyState === WebSocket.OPEN) { + ws.send(frame.data); + } + } + }; while (true) { const nextPromise = iterator.next(); @@ -340,42 +347,18 @@ class SpeechStreamv2 extends stt.SpeechStream { // Check if we need to abort (Reconnect) or if stream is done if ('abort' in result || result.done) { - if (!('abort' in result) && result.done) { - // Normal stream end - hasEnded = true; - } else { - // Reconnect triggered - break loop immediately - break; - } + break; } - // If we broke above, we don't process data. If not, 'result' is IteratorResult - if (hasEnded && result.value === undefined) { - // Process flush below - } else if ('value' in result) { - const data = result.value; - const frames: AudioFrame[] = []; + const data = result.value; - if (data === SpeechStreamv2.FLUSH_SENTINEL) { - frames.push(...audioBstream.flush()); - hasEnded = true; - } else { - frames.push(...audioBstream.write((data as AudioFrame).data.buffer as ArrayBuffer)); - } - - for (const frame of frames) { - this.#audioDurationCollector.push(calculateAudioDurationSeconds(frame)); - - if (ws.readyState === WebSocket.OPEN) { - ws.send(frame.data); - } - } - } - - if (hasEnded) { + if (data === SpeechStreamv2.FLUSH_SENTINEL) { + sendFrames(audioBstream.flush()); this.#audioDurationCollector.flush(); - break; + continue; } + + sendFrames(audioBstream.write((data as AudioFrame).data.buffer as ArrayBuffer)); } // Only send CloseStream if we are exiting normally (not reconnecting) From 71bf6626e239593e01e0601c9618d746e18da6c1 Mon Sep 17 00:00:00 2001 From: Myk Melez Date: Tue, 16 Jun 2026 21:53:31 -0700 Subject: [PATCH 4/7] fix(deepgram): preserve STT timebase across reconnect After an unexpected reconnect the fresh Deepgram socket restarts audio_window at 0, but startTimeOffset was left unchanged, so post-reconnect transcripts were timestamped near the start of the session. Downstream consumers that gate on absolute timing (e.g. drop finals that land before the current answer's audio) would discard them, so long sessions could still stall after the first reconnect. Track the audio already streamed (#sentAudioSec) and snapshot it as a per-connection base at each connect, offsetting that connection's window times by it so transcript timestamps stay monotonic across reconnects. Adds a regression test asserting the second turn's final timestamp does not reset below the first across an unexpected reconnect. --- .changeset/deepgram-flux-retryable-close.md | 2 +- plugins/deepgram/src/stt_v2.test.ts | 83 ++++++++++++++++++++- plugins/deepgram/src/stt_v2.ts | 23 +++++- 3 files changed, 104 insertions(+), 4 deletions(-) diff --git a/.changeset/deepgram-flux-retryable-close.md b/.changeset/deepgram-flux-retryable-close.md index 8d8704289..ed8b343e8 100644 --- a/.changeset/deepgram-flux-retryable-close.md +++ b/.changeset/deepgram-flux-retryable-close.md @@ -2,4 +2,4 @@ '@livekit/agents-plugin-deepgram': patch --- -Recover Deepgram Flux streaming STT after unexpected WebSocket closes by surfacing them as retryable connection errors. +Recover Deepgram Flux streaming STT after unexpected WebSocket closes by surfacing them as retryable connection errors, and preserve the transcript timebase across the reconnect so timestamps stay monotonic. diff --git a/plugins/deepgram/src/stt_v2.test.ts b/plugins/deepgram/src/stt_v2.test.ts index 43e25d5b3..50c7aa35b 100644 --- a/plugins/deepgram/src/stt_v2.test.ts +++ b/plugins/deepgram/src/stt_v2.test.ts @@ -1,7 +1,8 @@ // SPDX-FileCopyrightText: 2025 LiveKit, Inc. // // SPDX-License-Identifier: Apache-2.0 -import type { stt } from '@livekit/agents'; +import { stt } from '@livekit/agents'; +import { AudioFrame } from '@livekit/rtc-node'; import { describe, expect, it } from 'vitest'; import { type WebSocket, WebSocketServer } from 'ws'; import { STTv2, type STTv2Options } from './stt_v2.js'; @@ -192,6 +193,86 @@ describe('Deepgram STTv2 WebSocket recovery', () => { await closeWebSocketServer(wss); } }); + + it('keeps transcript timestamps monotonic across an unexpected reconnect', async () => { + const { wss, endpointUrl } = await startWebSocketServer(); + const connections: WebSocket[] = []; + let firstConnAudioBytes = 0; + const stream = createStream(endpointUrl); + + wss.on('connection', (ws) => { + const index = connections.push(ws); + if (index === 1) { + ws.on('message', (data, isBinary) => { + if (isBinary) firstConnAudioBytes += (data as Buffer).length; + }); + } + }); + + const startOfTurn = JSON.stringify({ type: 'TurnInfo', event: 'StartOfTurn', transcript: '' }); + const endOfTurn = (audioWindowEnd: number) => + JSON.stringify({ + type: 'TurnInfo', + event: 'EndOfTurn', + transcript: 'hello', + audio_window_start: Math.max(0, audioWindowEnd - 0.5), + audio_window_end: audioWindowEnd, + words: [ + { + word: 'hello', + start: Math.max(0, audioWindowEnd - 0.5), + end: audioWindowEnd, + confidence: 0.9, + }, + ], + }); + + const finalEndTimes: number[] = []; + const consume = (async () => { + for await (const event of stream) { + if (event.type === stt.SpeechEventType.FINAL_TRANSCRIPT && event.alternatives?.[0]) { + finalEndTimes.push(event.alternatives[0].endTime); + } + } + })(); + + try { + await waitFor(() => connections.length === 1, 'expected initial WebSocket connection'); + + // Stream ~2s of 16 kHz mono audio so the first connection advances the timeline. + const oneSecond = () => new AudioFrame(new Int16Array(16_000), 16_000, 1, 16_000); + stream.pushFrame(oneSecond()); + stream.pushFrame(oneSecond()); + // 16 kHz * 2 bytes/sample * 1.5s = 48_000 bytes => at least 1.5s reached the socket. + await waitFor( + () => firstConnAudioBytes >= 48_000, + `expected ~2s of audio at the first connection, saw ${firstConnAudioBytes} bytes`, + ); + + // First turn ends 1.0s into the first connection's audio window. + connections[0]!.send(startOfTurn); + connections[0]!.send(endOfTurn(1.0)); + await waitFor(() => finalEndTimes.length === 1, 'expected first final transcript'); + + // Unexpected close -> base-class retry reconnects. + connections[0]!.close(1011, 'unexpected close'); + await waitFor(() => connections.length === 2, 'expected reconnect after unexpected close'); + + // The fresh Deepgram socket restarts audio_window near 0; this turn ends at + // 0.5 within the NEW connection's window. + connections[1]!.send(startOfTurn); + connections[1]!.send(endOfTurn(0.5)); + await waitFor(() => finalEndTimes.length === 2, 'expected second final transcript'); + + // Without timebase preservation the second final lands at ~0.5s — earlier than + // the first — and downstream "before answer audio" logic would drop it. + expect(finalEndTimes[1]!).toBeGreaterThan(finalEndTimes[0]!); + } finally { + stream.close(); + await consume.catch(() => {}); + await closeWebSocketServer(wss); + } + }); }); const hasDeepgramApiKey = Boolean(process.env.DEEPGRAM_API_KEY); diff --git a/plugins/deepgram/src/stt_v2.ts b/plugins/deepgram/src/stt_v2.ts index 6627b7164..48a1d62d3 100644 --- a/plugins/deepgram/src/stt_v2.ts +++ b/plugins/deepgram/src/stt_v2.ts @@ -221,6 +221,14 @@ class SpeechStreamv2 extends stt.SpeechStream { #requestId = ''; #speaking = false; + // Monotonic timestamp base across reconnects. Deepgram's audio_window restarts + // at 0 on every new socket, so each connection's window times are offset by the + // audio already streamed to prior connections (#sentAudioSec snapshotted at + // connect into #connectionTimeBaseSec). Without this, transcripts after a + // reconnect would be timestamped near the start of the session. + #sentAudioSec = 0; + #connectionTimeBaseSec = 0; + // Parity: _reconnect_event - using existing Event class from @livekit/agents #reconnectEvent = new Event(); @@ -282,6 +290,10 @@ class SpeechStreamv2 extends stt.SpeechStream { this.#ws.once('error', onError); }); + // Snapshot the timeline base for this connection: the fresh socket's + // audio_window starts at 0, so offset it by the audio already streamed. + this.#connectionTimeBaseSec = this.#sentAudioSec; + // 2. Run Concurrent Tasks (Send & Receive) const ws = this.#ws; if (!ws) throw new Error('WebSocket not initialized'); @@ -328,7 +340,10 @@ class SpeechStreamv2 extends stt.SpeechStream { const iterator = this.input[Symbol.asyncIterator](); const sendFrames = (frames: AudioFrame[]) => { for (const frame of frames) { - this.#audioDurationCollector.push(calculateAudioDurationSeconds(frame)); + const durationSec = calculateAudioDurationSeconds(frame); + this.#audioDurationCollector.push(durationSec); + // Track total audio consumed so reconnects can preserve the timeline. + this.#sentAudioSec += durationSec; if (ws.readyState === WebSocket.OPEN) { ws.send(frame.data); @@ -457,7 +472,11 @@ class SpeechStreamv2 extends stt.SpeechStream { } #sendTranscriptEvent(eventType: stt.SpeechEventType, data: Record) { - const alts = parseTranscription(this.#opts.language || 'en', data, this.startTimeOffset); + const alts = parseTranscription( + this.#opts.language || 'en', + data, + this.startTimeOffset + this.#connectionTimeBaseSec, + ); if (alts.length > 0) { this.queue.put({ From 3862af7902650aa72cb5d59004339cdfc5af6c88 Mon Sep 17 00:00:00 2001 From: Myk Melez Date: Tue, 16 Jun 2026 22:15:45 -0700 Subject: [PATCH 5/7] docs(deepgram): note the sttNode timebase contract --- plugins/deepgram/src/stt_v2.ts | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/plugins/deepgram/src/stt_v2.ts b/plugins/deepgram/src/stt_v2.ts index 48a1d62d3..6d6c3a0db 100644 --- a/plugins/deepgram/src/stt_v2.ts +++ b/plugins/deepgram/src/stt_v2.ts @@ -226,6 +226,10 @@ class SpeechStreamv2 extends stt.SpeechStream { // audio already streamed to prior connections (#sentAudioSec snapshotted at // connect into #connectionTimeBaseSec). Without this, transcripts after a // reconnect would be timestamped near the start of the session. + // + // The SDK sets startTimeOffset once at stream creation (voice/agent.ts sttNode) + // and relies on the plugin to keep audio_window continuous across its own + // reconnects ("linear timestamps across reconnections") — this preserves that. #sentAudioSec = 0; #connectionTimeBaseSec = 0; From a0f6d4168e738cc6e1de9b7c33125e53e4d4375a Mon Sep 17 00:00:00 2001 From: Myk Melez Date: Wed, 17 Jun 2026 08:20:51 -0700 Subject: [PATCH 6/7] fix(deepgram): reset Flux turn state on reconnect --- plugins/deepgram/src/stt_v2.test.ts | 105 ++++++++++++++++++++++++++++ plugins/deepgram/src/stt_v2.ts | 34 ++++++--- 2 files changed, 131 insertions(+), 8 deletions(-) diff --git a/plugins/deepgram/src/stt_v2.test.ts b/plugins/deepgram/src/stt_v2.test.ts index 50c7aa35b..11b256b0d 100644 --- a/plugins/deepgram/src/stt_v2.test.ts +++ b/plugins/deepgram/src/stt_v2.test.ts @@ -273,6 +273,111 @@ describe('Deepgram STTv2 WebSocket recovery', () => { await closeWebSocketServer(wss); } }); + + it('emits start of speech after reconnecting during an active turn', async () => { + const { wss, endpointUrl } = await startWebSocketServer(); + const connections: WebSocket[] = []; + const stream = createStream(endpointUrl); + const eventTypes: stt.SpeechEventType[] = []; + + wss.on('connection', (ws) => { + connections.push(ws); + }); + + const consume = (async () => { + for await (const event of stream) { + eventTypes.push(event.type); + } + })(); + + const startOfTurn = JSON.stringify({ type: 'TurnInfo', event: 'StartOfTurn', transcript: '' }); + + try { + await waitFor(() => connections.length === 1, 'expected initial WebSocket connection'); + + connections[0]!.send(startOfTurn); + await waitFor( + () => + eventTypes.filter((eventType) => eventType === stt.SpeechEventType.START_OF_SPEECH) + .length === 1, + 'expected first start of speech', + ); + + connections[0]!.close(1011, 'unexpected close during speech'); + await waitFor(() => connections.length === 2, 'expected reconnect after unexpected close'); + + connections[1]!.send(startOfTurn); + await waitFor( + () => + eventTypes.filter((eventType) => eventType === stt.SpeechEventType.START_OF_SPEECH) + .length === 2, + 'expected second start of speech after reconnect', + ); + } finally { + stream.close(); + await consume.catch(() => {}); + await closeWebSocketServer(wss); + } + }); + + it('starts speech from a non-empty update after reconnecting when StartOfTurn is absent', async () => { + const { wss, endpointUrl } = await startWebSocketServer(); + const connections: WebSocket[] = []; + const stream = createStream(endpointUrl); + const events: Array<{ type: stt.SpeechEventType; text?: string }> = []; + + wss.on('connection', (ws) => { + connections.push(ws); + }); + + const consume = (async () => { + for await (const event of stream) { + events.push({ + type: event.type, + text: event.alternatives?.[0]?.text, + }); + } + })(); + + const startOfTurn = JSON.stringify({ type: 'TurnInfo', event: 'StartOfTurn', transcript: '' }); + const update = JSON.stringify({ + type: 'TurnInfo', + event: 'Update', + transcript: 'after reconnect', + audio_window_start: 0, + audio_window_end: 0.5, + words: [ + { word: 'after', start: 0, end: 0.25, confidence: 0.9 }, + { word: 'reconnect', start: 0.25, end: 0.5, confidence: 0.9 }, + ], + }); + + try { + await waitFor(() => connections.length === 1, 'expected initial WebSocket connection'); + + connections[0]!.send(startOfTurn); + await waitFor( + () => + events.filter((event) => event.type === stt.SpeechEventType.START_OF_SPEECH).length === 1, + 'expected first start of speech', + ); + + connections[0]!.close(1011, 'unexpected close during speech'); + await waitFor(() => connections.length === 2, 'expected reconnect after unexpected close'); + + connections[1]!.send(update); + await waitFor( + () => + events.filter((event) => event.type === stt.SpeechEventType.START_OF_SPEECH).length === + 2 && events.some((event) => event.text === 'after reconnect'), + 'expected synthesized start of speech and update transcript after reconnect', + ); + } finally { + stream.close(); + await consume.catch(() => {}); + await closeWebSocketServer(wss); + } + }); }); const hasDeepgramApiKey = Boolean(process.env.DEEPGRAM_API_KEY); diff --git a/plugins/deepgram/src/stt_v2.ts b/plugins/deepgram/src/stt_v2.ts index 6d6c3a0db..c67e3027a 100644 --- a/plugins/deepgram/src/stt_v2.ts +++ b/plugins/deepgram/src/stt_v2.ts @@ -297,6 +297,9 @@ class SpeechStreamv2 extends stt.SpeechStream { // Snapshot the timeline base for this connection: the fresh socket's // audio_window starts at 0, so offset it by the audio already streamed. this.#connectionTimeBaseSec = this.#sentAudioSec; + // Provider turn state is scoped to one WebSocket. A reconnect during an + // active turn must not make the next connection suppress StartOfTurn. + this.#speaking = false; // 2. Run Concurrent Tasks (Send & Receive) const ws = this.#ws; @@ -442,23 +445,20 @@ class SpeechStreamv2 extends stt.SpeechStream { if (eventType === 'StartOfTurn') { if (this.#speaking) return; - this.#speaking = true; - this.queue.put({ - type: stt.SpeechEventType.START_OF_SPEECH, - requestId: this.#requestId, - }); + this.#startSpeech(); this.#sendTranscriptEvent(stt.SpeechEventType.INTERIM_TRANSCRIPT, data); } else if (eventType === 'Update') { - if (!this.#speaking) return; + if (!this.#speaking && !this.#startSpeechFromTranscript(data)) return; this.#sendTranscriptEvent(stt.SpeechEventType.INTERIM_TRANSCRIPT, data); } else if (eventType === 'EagerEndOfTurn') { - if (!this.#speaking) return; + if (!this.#speaking && !this.#startSpeechFromTranscript(data)) return; this.#sendTranscriptEvent(stt.SpeechEventType.PREFLIGHT_TRANSCRIPT, data); } else if (eventType === 'TurnResumed') { + if (!this.#speaking) this.#startSpeechFromTranscript(data); this.#sendTranscriptEvent(stt.SpeechEventType.INTERIM_TRANSCRIPT, data); } else if (eventType === 'EndOfTurn') { - if (!this.#speaking) return; + if (!this.#speaking && !this.#startSpeechFromTranscript(data)) return; this.#speaking = false; this.#sendTranscriptEvent(stt.SpeechEventType.FINAL_TRANSCRIPT, data); @@ -475,6 +475,24 @@ class SpeechStreamv2 extends stt.SpeechStream { } } + #startSpeech() { + this.#speaking = true; + this.queue.put({ + type: stt.SpeechEventType.START_OF_SPEECH, + requestId: this.#requestId, + }); + } + + #startSpeechFromTranscript(data: Record) { + const transcript = data.transcript; + if (typeof transcript !== 'string' || transcript.trim().length === 0) { + return false; + } + + this.#startSpeech(); + return true; + } + #sendTranscriptEvent(eventType: stt.SpeechEventType, data: Record) { const alts = parseTranscription( this.#opts.language || 'en', From 549371252d0a179e1f1f56fc9617d0739da1c6f1 Mon Sep 17 00:00:00 2001 From: Myk Melez Date: Wed, 17 Jun 2026 11:24:33 -0700 Subject: [PATCH 7/7] fix(deepgram): align Flux time unit names --- plugins/deepgram/src/stt_v2.ts | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/plugins/deepgram/src/stt_v2.ts b/plugins/deepgram/src/stt_v2.ts index c67e3027a..9ce781bde 100644 --- a/plugins/deepgram/src/stt_v2.ts +++ b/plugins/deepgram/src/stt_v2.ts @@ -223,15 +223,15 @@ class SpeechStreamv2 extends stt.SpeechStream { // Monotonic timestamp base across reconnects. Deepgram's audio_window restarts // at 0 on every new socket, so each connection's window times are offset by the - // audio already streamed to prior connections (#sentAudioSec snapshotted at - // connect into #connectionTimeBaseSec). Without this, transcripts after a + // audio already streamed to prior connections (#sentAudioInS snapshotted at + // connect into #connectionTimeBaseInS). Without this, transcripts after a // reconnect would be timestamped near the start of the session. // // The SDK sets startTimeOffset once at stream creation (voice/agent.ts sttNode) // and relies on the plugin to keep audio_window continuous across its own // reconnects ("linear timestamps across reconnections") — this preserves that. - #sentAudioSec = 0; - #connectionTimeBaseSec = 0; + #sentAudioInS = 0; + #connectionTimeBaseInS = 0; // Parity: _reconnect_event - using existing Event class from @livekit/agents #reconnectEvent = new Event(); @@ -296,7 +296,7 @@ class SpeechStreamv2 extends stt.SpeechStream { // Snapshot the timeline base for this connection: the fresh socket's // audio_window starts at 0, so offset it by the audio already streamed. - this.#connectionTimeBaseSec = this.#sentAudioSec; + this.#connectionTimeBaseInS = this.#sentAudioInS; // Provider turn state is scoped to one WebSocket. A reconnect during an // active turn must not make the next connection suppress StartOfTurn. this.#speaking = false; @@ -347,10 +347,10 @@ class SpeechStreamv2 extends stt.SpeechStream { const iterator = this.input[Symbol.asyncIterator](); const sendFrames = (frames: AudioFrame[]) => { for (const frame of frames) { - const durationSec = calculateAudioDurationSeconds(frame); - this.#audioDurationCollector.push(durationSec); + const durationInS = calculateAudioDurationSeconds(frame); + this.#audioDurationCollector.push(durationInS); // Track total audio consumed so reconnects can preserve the timeline. - this.#sentAudioSec += durationSec; + this.#sentAudioInS += durationInS; if (ws.readyState === WebSocket.OPEN) { ws.send(frame.data); @@ -497,7 +497,7 @@ class SpeechStreamv2 extends stt.SpeechStream { const alts = parseTranscription( this.#opts.language || 'en', data, - this.startTimeOffset + this.#connectionTimeBaseSec, + this.startTimeOffset + this.#connectionTimeBaseInS, ); if (alts.length > 0) {