-
-
Notifications
You must be signed in to change notification settings - Fork 447
feat(voice): backend voice picker + advanced controls behind disclosure (#742) #743
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
f82269c
58ebf8e
4a5220e
a0b9cad
edb1f97
5cda364
76dee70
4c0660b
485d349
9030d4b
20ba4bc
34faec2
ad3b853
ff3a6e6
93bb896
c463fbc
392acf3
258c851
35b4a6c
2a1a4e7
dcbdef3
fb2bbf2
2143b4c
1a334a7
a4e8b25
296bcef
08e72af
1d082d0
97935c6
6036734
abe1ef5
d24d993
238ad4c
af72bb5
97fe628
c841ae3
daa838e
cbfbd5b
838527c
f3ab0f3
8b6f013
711c027
a2a6e4c
adf3c30
7d692a3
febb6b2
a9dd039
9d4a5f5
5e2f61b
7a7d2c8
38197cd
45145d0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,13 @@ | ||
| /** Decode ?systemPrompt= from Gemini hub proxy (base64url, UTF-8). */ | ||
| export function decodeVoiceSystemPromptParam(param: string | null | undefined): string | undefined { | ||
| if (!param?.trim()) return undefined | ||
| try { | ||
| const normalized = param.replace(/-/g, '+').replace(/_/g, '/') | ||
| const pad = '='.repeat((4 - (normalized.length % 4)) % 4) | ||
| const decoded = Buffer.from(normalized + pad, 'base64').toString('utf8') | ||
| if (!decoded.trim() || decoded.length > 48_000) return undefined | ||
| return decoded | ||
| } catch { | ||
| return undefined | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,150 @@ | ||
| import { describe, expect, test, beforeEach, afterEach } from 'bun:test' | ||
| import type { ServerWebSocket } from 'bun' | ||
| import { createQwenProxyWebSocketHandler, type WebSocketLike } from './qwenProxyHandler' | ||
|
|
||
| const WS_OPEN = 1 | ||
|
|
||
| class FakeUpstream implements WebSocketLike { | ||
| readyState = WS_OPEN | ||
| sent: Array<string | ArrayBuffer | Uint8Array> = [] | ||
| closed = false | ||
| onmessage: ((event: { data: string | ArrayBuffer | Uint8Array }) => void) | null = null | ||
| onerror: ((event: unknown) => void) | null = null | ||
| onclose: ((event: { code: number; reason: string }) => void) | null = null | ||
|
|
||
| constructor(public url: string, public opts?: unknown) {} | ||
|
|
||
| send(data: string | ArrayBuffer | Uint8Array) { | ||
| this.sent.push(data) | ||
| } | ||
|
|
||
| close(code = 1000, reason = '') { | ||
| this.closed = true | ||
| this.readyState = 3 | ||
| this.onclose?.({ code, reason }) | ||
| } | ||
|
|
||
| deliver(payload: object | string) { | ||
| const text = typeof payload === 'string' ? payload : JSON.stringify(payload) | ||
| this.onmessage?.({ data: text }) | ||
| } | ||
| } | ||
|
|
||
| class FakeClient { | ||
| readyState = WS_OPEN | ||
| sent: string[] = [] | ||
| closeCode: number | null = null | ||
| closeReason: string | null = null | ||
| data: object | ||
|
|
||
| constructor(data: object) { | ||
| this.data = data | ||
| } | ||
|
|
||
| send(payload: string | ArrayBuffer | Uint8Array) { | ||
| this.sent.push(typeof payload === 'string' ? payload : new TextDecoder().decode(payload as Uint8Array)) | ||
| } | ||
|
|
||
| close(code?: number, reason?: string) { | ||
| this.closeCode = code ?? null | ||
| this.closeReason = reason ?? null | ||
| this.readyState = 3 | ||
| } | ||
| } | ||
|
|
||
| let lastUpstream: FakeUpstream | null = null | ||
| const FakeWebSocket = function FakeWebSocket(url: string, opts?: unknown) { | ||
| const u = new FakeUpstream(url, opts) | ||
| lastUpstream = u | ||
| return u | ||
| } as unknown as new (url: string, opts?: unknown) => WebSocketLike | ||
|
|
||
| beforeEach(() => { | ||
| lastUpstream = null | ||
| }) | ||
|
|
||
| afterEach(() => { | ||
| lastUpstream = null | ||
| }) | ||
|
|
||
| function newClient() { | ||
| return new FakeClient({ apiKey: 'k', model: 'qwen3-omni-flash-realtime', language: 'en', voiceName: 'Cherry' }) as unknown as ServerWebSocket<unknown> & FakeClient | ||
| } | ||
|
|
||
| describe('createQwenProxyWebSocketHandler ack-gate', () => { | ||
| test('queues client frames until upstream acks hub-owned session.update with session.updated', () => { | ||
| const handler = createQwenProxyWebSocketHandler(FakeWebSocket) | ||
| const client = newClient() | ||
|
|
||
| handler.open(client) | ||
| const upstream = lastUpstream! | ||
| expect(upstream.sent).toHaveLength(0) | ||
|
|
||
| upstream.deliver({ type: 'session.created' }) | ||
| expect(upstream.sent).toHaveLength(1) | ||
| const hubSetup = JSON.parse(upstream.sent[0] as string) as { type: string; session: { instructions: string } } | ||
| expect(hubSetup.type).toBe('session.update') | ||
| expect(typeof hubSetup.session.instructions).toBe('string') | ||
|
|
||
| handler.message(client, JSON.stringify({ type: 'response.create' })) | ||
| handler.message(client, JSON.stringify({ type: 'conversation.item.create', item: { type: 'message' } })) | ||
| handler.message(client, JSON.stringify({ type: 'session.update', session: { instructions: 'updated' } })) | ||
|
|
||
| expect(upstream.sent).toHaveLength(1) | ||
|
|
||
| upstream.deliver({ type: 'session.updated', session: { instructions: hubSetup.session.instructions } }) | ||
|
|
||
| expect(upstream.sent.length).toBeGreaterThanOrEqual(4) | ||
| const flushedTypes = upstream.sent.slice(1).map(raw => { | ||
| try { return (JSON.parse(raw as string) as { type?: string }).type } | ||
| catch { return undefined } | ||
| }) | ||
| expect(flushedTypes).toEqual(['response.create', 'conversation.item.create', 'session.update']) | ||
| }) | ||
|
|
||
| test('forwards client frames immediately after the gate has flipped', () => { | ||
| const handler = createQwenProxyWebSocketHandler(FakeWebSocket) | ||
| const client = newClient() | ||
|
|
||
| handler.open(client) | ||
| const upstream = lastUpstream! | ||
|
|
||
| upstream.deliver({ type: 'session.created' }) | ||
| upstream.deliver({ type: 'session.updated', session: {} }) | ||
| const sentBefore = upstream.sent.length | ||
|
|
||
| handler.message(client, JSON.stringify({ type: 'input_audio_buffer.append', audio: 'abc' })) | ||
| expect(upstream.sent).toHaveLength(sentBefore + 1) | ||
| }) | ||
|
|
||
| test('rejects client frames that fail the safe-frame allowlist with 1008', () => { | ||
| const handler = createQwenProxyWebSocketHandler(FakeWebSocket) | ||
| const client = newClient() | ||
|
|
||
| handler.open(client) | ||
| const upstream = lastUpstream! | ||
| upstream.deliver({ type: 'session.created' }) | ||
| upstream.deliver({ type: 'session.updated', session: {} }) | ||
|
|
||
| handler.message(client, JSON.stringify({ | ||
| type: 'session.update', | ||
| session: { tools: [{ type: 'function', name: 'evil' }] } | ||
| })) | ||
|
|
||
| expect(client.closeCode).toBe(1008) | ||
| expect(client.closeReason).toContain('instructions') | ||
| }) | ||
|
|
||
| test('clears queued frames on close so no stale data leaks if a new client reuses memory', () => { | ||
| const handler = createQwenProxyWebSocketHandler(FakeWebSocket) | ||
| const client = newClient() | ||
|
|
||
| handler.open(client) | ||
| const upstream = lastUpstream! | ||
| upstream.deliver({ type: 'session.created' }) | ||
| handler.message(client, JSON.stringify({ type: 'response.create' })) | ||
|
|
||
| handler.close(client, 1000, 'bye') | ||
| expect(upstream.closed).toBe(true) | ||
| }) | ||
| }) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,152 @@ | ||
| // Qwen Realtime WebSocket proxy factory — extracted from server.ts so the ack-gate | ||
| // behaviour (queue client frames until DashScope acknowledges the hub-owned session.update) | ||
| // can be unit-tested without spinning up Hono + Bun.serve. | ||
| // | ||
| // The factory accepts an optional WebSocket constructor injection so tests can substitute | ||
| // a deterministic fake for the upstream connection. | ||
|
|
||
| import type { ServerWebSocket } from 'bun' | ||
| import { buildQwenSessionUpdateMessage, isQwenSafeClientFrame } from '@hapi/protocol/voice' | ||
|
|
||
| type WebSocketCtor = new (url: string, opts?: unknown) => WebSocketLike | ||
|
|
||
| export interface WebSocketLike { | ||
| readyState: number | ||
| send(data: string | ArrayBuffer | Uint8Array): void | ||
| close(code?: number, reason?: string): void | ||
| onmessage: ((event: { data: string | ArrayBuffer | Uint8Array }) => void) | null | ||
| onerror: ((event: unknown) => void) | null | ||
| onclose: ((event: { code: number; reason: string }) => void) | null | ||
| } | ||
|
|
||
| export interface QwenProxyHandler { | ||
| open(clientWs: ServerWebSocket<unknown>): void | ||
| message(clientWs: ServerWebSocket<unknown>, message: string | ArrayBuffer | Uint8Array): void | ||
| close(clientWs: ServerWebSocket<unknown>, code: number, reason: string): void | ||
| } | ||
|
|
||
| const QWEN_WS_BASE = 'wss://dashscope-intl.aliyuncs.com/api-ws/v1/realtime' | ||
| const WS_OPEN = 1 | ||
|
|
||
| function toClientCloseCode(code: number): number { | ||
| return code >= 1000 && code <= 4999 && code !== 1005 && code !== 1006 && code !== 1015 | ||
| ? code | ||
| : 1011 | ||
| } | ||
|
|
||
| export function createQwenProxyWebSocketHandler( | ||
| WebSocketImpl: WebSocketCtor = WebSocket as unknown as WebSocketCtor | ||
| ): QwenProxyHandler { | ||
| const upstreamMap = new WeakMap<ServerWebSocket<unknown>, WebSocketLike>() | ||
| // Holds the hub-owned session.update payload until session.created arrives from DashScope. | ||
| // Sending session.update before session.created violates the Qwen Realtime protocol ordering. | ||
| const pendingSetupMap = new WeakMap<ServerWebSocket<unknown>, string>() | ||
| // Tracks whether DashScope has acknowledged the hub-owned session.update with a session.updated | ||
| // frame. Until the ack arrives, client frames are queued, never forwarded - otherwise an | ||
| // authenticated client could push response.create / conversation.item.create / instruction-only | ||
| // session.update before HAPI's tools/voice/instructions are locked into the upstream session. | ||
| const setupAckedMap = new WeakMap<ServerWebSocket<unknown>, boolean>() | ||
| const pendingClientFrames = new WeakMap<ServerWebSocket<unknown>, Array<string | ArrayBuffer | Uint8Array>>() | ||
|
|
||
| return { | ||
| open(clientWs) { | ||
| const data = clientWs.data as { apiKey: string; model: string; language?: string; voiceName?: string } | ||
| const upstreamUrl = `${process.env.QWEN_REALTIME_WS_URL || QWEN_WS_BASE}?model=${encodeURIComponent(data.model)}` | ||
|
|
||
| const upstream = new WebSocketImpl(upstreamUrl, { | ||
| headers: { 'Authorization': `Bearer ${data.apiKey}` } | ||
| }) | ||
|
|
||
| upstreamMap.set(clientWs, upstream) | ||
| pendingSetupMap.set(clientWs, JSON.stringify(buildQwenSessionUpdateMessage(data.language, data.voiceName))) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [Major] Pass the decoded prompt into Qwen's hub-owned setup The server decodes Suggested fix: const data = clientWs.data as {
apiKey: string
model: string
language?: string
voiceName?: string
systemInstruction?: string
}
pendingSetupMap.set(clientWs, JSON.stringify(buildQwenSessionUpdateMessage(
data.language,
data.voiceName,
data.systemInstruction
)))There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [Major] Pass the decoded prompt into Qwen's hub-owned setup The server decodes Suggested fix: const data = clientWs.data as {
apiKey: string
model: string
language?: string
voiceName?: string
systemInstruction?: string
}
pendingSetupMap.set(clientWs, JSON.stringify(buildQwenSessionUpdateMessage(
data.language,
data.voiceName,
data.systemInstruction
)))There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [Major] Pass the decoded prompt into Qwen setup The server decodes Suggested fix: const data = clientWs.data as {
apiKey: string
model: string
language?: string
voiceName?: string
systemInstruction?: string
}
pendingSetupMap.set(clientWs, JSON.stringify(buildQwenSessionUpdateMessage(
data.language,
data.voiceName,
data.systemInstruction
)))There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [Major] Pass the decoded prompt into Qwen setup The server decodes Suggested fix: const data = clientWs.data as {
apiKey: string
model: string
language?: string
voiceName?: string
systemInstruction?: string
}
pendingSetupMap.set(clientWs, JSON.stringify(buildQwenSessionUpdateMessage(
data.language,
data.voiceName,
data.systemInstruction
))) |
||
| setupAckedMap.set(clientWs, false) | ||
|
|
||
| upstream.onmessage = (event) => { | ||
| const raw = event.data | ||
| const text = typeof raw === 'string' | ||
| ? raw | ||
| : new TextDecoder().decode(raw instanceof Uint8Array ? raw : new Uint8Array(raw as ArrayBuffer)) | ||
|
|
||
| const pendingSetup = pendingSetupMap.get(clientWs) | ||
| if (pendingSetup) { | ||
| try { | ||
| const parsed = JSON.parse(text) as { type?: string } | ||
| if (parsed.type === 'session.created') { | ||
| pendingSetupMap.delete(clientWs) | ||
| try { if (clientWs.readyState === 1) clientWs.send(text) } catch { /* client gone */ } | ||
| upstream.send(pendingSetup) | ||
| return | ||
| } | ||
| } catch { /* not JSON */ } | ||
| } | ||
|
|
||
| // Once the upstream acks the hub-owned setup with session.updated, flip the gate | ||
| // and flush any client frames the proxy held while tools/voice/instructions were | ||
| // still being installed. | ||
| if (setupAckedMap.get(clientWs) === false) { | ||
| try { | ||
| const parsed = JSON.parse(text) as { type?: string } | ||
| if (parsed.type === 'session.updated') { | ||
| setupAckedMap.set(clientWs, true) | ||
| const queued = pendingClientFrames.get(clientWs) ?? [] | ||
| pendingClientFrames.delete(clientWs) | ||
| for (const frame of queued) { | ||
| try { upstream.send(frame) } catch { /* upstream gone */ } | ||
| } | ||
| } | ||
| } catch { /* not JSON */ } | ||
| } | ||
|
|
||
| try { | ||
| if (clientWs.readyState === 1) { | ||
| clientWs.send(typeof raw === 'string' ? raw : new Uint8Array(raw as ArrayBuffer)) | ||
| } | ||
| } catch { /* client gone */ } | ||
| } | ||
| upstream.onerror = () => { | ||
| pendingSetupMap.delete(clientWs) | ||
| setupAckedMap.delete(clientWs) | ||
| pendingClientFrames.delete(clientWs) | ||
| upstreamMap.delete(clientWs) | ||
| try { clientWs.close(1011, 'Upstream error') } catch { /* */ } | ||
| } | ||
| upstream.onclose = (event) => { | ||
| pendingSetupMap.delete(clientWs) | ||
| setupAckedMap.delete(clientWs) | ||
| pendingClientFrames.delete(clientWs) | ||
| try { clientWs.close(toClientCloseCode(event.code), event.reason || 'Upstream closed') } catch { /* */ } | ||
| upstreamMap.delete(clientWs) | ||
| } | ||
| }, | ||
| message(clientWs, message) { | ||
| if (!isQwenSafeClientFrame(message)) { | ||
| try { clientWs.close(1008, 'Client session.update may only modify instructions') } catch { /* */ } | ||
| return | ||
| } | ||
| const upstream = upstreamMap.get(clientWs) | ||
| if (upstream?.readyState !== WS_OPEN) return | ||
|
|
||
| // Hold client frames until DashScope has acknowledged the hub-owned session.update. | ||
| // Without this gate, the client could race response.create / conversation.item.create / | ||
| // instruction-only session.update past the lockdown of tools/voice/instructions and run | ||
| // the upstream session under the provider default config or partially-applied state. | ||
| if (setupAckedMap.get(clientWs) !== true) { | ||
| const queue = pendingClientFrames.get(clientWs) ?? [] | ||
| queue.push(message) | ||
| pendingClientFrames.set(clientWs, queue) | ||
| return | ||
| } | ||
| upstream.send(message) | ||
| }, | ||
| close(clientWs, code, reason) { | ||
| pendingSetupMap.delete(clientWs) | ||
| setupAckedMap.delete(clientWs) | ||
| pendingClientFrames.delete(clientWs) | ||
| const upstream = upstreamMap.get(clientWs) | ||
| if (upstream) { | ||
| try { upstream.close(toClientCloseCode(code), (reason || 'Client closed').slice(0, 123)) } catch { /* */ } | ||
| upstreamMap.delete(clientWs) | ||
| } | ||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[Major] Pass the decoded prompt into Qwen's hub-owned setup
The server decodes
?systemPrompt=for/api/voice/qwen-wsand stores it onws.data, and the Qwen client sends that param for the advanced voice prompt. This handler drops it when building the initialsession.update, so Qwen ignores the user-edited identity/character/delivery prompt while Gemini applies it.Suggested fix: