-
Notifications
You must be signed in to change notification settings - Fork 386
Expand file tree
/
Copy pathaudio_stream.py
More file actions
152 lines (135 loc) · 4.69 KB
/
audio_stream.py
File metadata and controls
152 lines (135 loc) · 4.69 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
# audio_stream.py
# -*- coding: utf-8 -*-
import asyncio
from dataclasses import dataclass
from typing import Optional, Set, List, Tuple, Any, Dict
from fastapi import Request
from fastapi.responses import StreamingResponse
# ===== Downlink WAV stream: basic parameters =====
STREAM_SR = 8000  # sample rate in Hz; switched to 8 kHz, which the ESP32 supports
STREAM_CH = 1     # channel count (mono)
STREAM_SW = 2     # sample width in bytes (16-bit PCM)
# Bytes in one 20 ms frame of the stream. The channel count is part of the
# product so the value stays correct if STREAM_CH is ever raised above 1.
# NOTE: the "_16K" suffix is historical; the value follows STREAM_SR
# (320 bytes at 8 kHz / mono / 16-bit).
BYTES_PER_20MS_16K = STREAM_SR * STREAM_CH * STREAM_SW * 20 // 1000
# ===== Master switch for the AI playback task =====
# Handle of the AI voice task considered "current", or None when there is none.
# cancel_current_ai() clears and cancels it; is_playing_now() inspects it.
# NOTE(review): nothing in this file assigns a task here — presumably the code
# that starts playback does; confirm against the callers.
current_ai_task: Optional[asyncio.Task] = None
async def cancel_current_ai():
    """Cancel the current AI voice task, if any, and wait until it has exited.

    The module-level ``current_ai_task`` slot is cleared *before* the task is
    cancelled, so the slot is already empty while the old task unwinds.

    This coroutine never propagates an error raised by the cancelled task:
    the expected ``CancelledError`` is ignored, and any other exception the
    task raised while shutting down is reported on stdout instead of being
    silently discarded.
    """
    global current_ai_task
    task, current_ai_task = current_ai_task, None

    # Nothing to do when there is no task or it has already finished.
    if task is None or task.done():
        return

    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        # Normal outcome of the cancel() request above.
        pass
    except Exception as exc:
        # Still best-effort (callers rely on this not raising), but make the
        # failure visible rather than hiding a crash inside the AI task.
        print(f"[AI-CANCEL] task exited with error: {exc!r}")
def is_playing_now() -> bool:
    """Report whether an AI playback task is registered and has not finished yet."""
    task = current_ai_task  # snapshot the module-level slot once
    if task is None:
        return False
    return not task.done()
# ===== /stream.wav connection management =====
@dataclass(frozen=True)
class StreamClient:
    """One connected /stream.wav listener: its chunk queue plus an abort flag.

    Instances are frozen (and therefore hashable), which lets them be stored
    in the module-level ``stream_clients`` set.
    """
    # Outgoing audio chunks for this connection; broadcast_pcm16_realtime()
    # puts PCM pieces here and the /stream.wav response generator reads them.
    q: asyncio.Queue
    # Set to tell the connection to stop streaming (hard_reset_audio() and a
    # newly arriving /stream.wav request both set it on existing clients).
    abort_event: asyncio.Event
# Registry of the currently connected /stream.wav clients.
stream_clients: "Set[StreamClient]" = set()
# Maximum number of queued chunks per client.
STREAM_QUEUE_MAX = 96 # small buffer, to avoid a backlog building up
def _wav_header_unknown_size(sr=16000, ch=1, sw=2) -> bytes:
import struct
byte_rate = sr * ch * sw
block_align = ch * sw
data_size = 0x7FFFFFF0
riff_size = 36 + data_size
return struct.pack(
"<4sI4s4sIHHIIHH4sI",
b"RIFF", riff_size, b"WAVE",
b"fmt ", 16,
1, ch, sr, byte_rate, block_align, sw * 8,
b"data", data_size
)
async def hard_reset_audio(reason: str = ""):
    """Clear the audio pipeline in one step.

    Flags every registered stream client as aborted and empties the registry,
    then cancels the current AI task, so stale audio has nowhere left to go
    and no task keeps producing more of it.

    Args:
        reason: optional text; when non-empty it is printed as a log line.
    """
    # Step 1: disconnect every HTTP connection that is currently playing.
    doomed = list(stream_clients)
    for client in doomed:
        try:
            client.abort_event.set()
        except Exception:
            pass
    stream_clients.clear()

    # Step 2: cancel the current AI task.
    await cancel_current_ai()

    # Step 3: log, only when a reason was supplied.
    if not reason:
        return
    print(f"[HARD-RESET] {reason}")
async def broadcast_pcm16_realtime(pcm16: bytes):
    """Send ``pcm16`` to every live stream client, paced at one frame per 20 ms.

    The buffer is cut into ``BYTES_PER_20MS_16K``-byte pieces (the last piece
    may be shorter). Each piece is put on every registered client's queue,
    then the coroutine sleeps until the next 20 ms tick so delivery tracks
    real time. When a client's queue is full, its oldest queued chunk is
    dropped to make room, which keeps playback close to real time.

    Clients whose ``abort_event`` is set, or whose queue operation fails, are
    removed from ``stream_clients``.

    Args:
        pcm16: raw PCM audio bytes to broadcast.
    """
    # Record the audio as a whole, before it is sliced into pieces.
    # Recording is best-effort: any failure is ignored so playback continues.
    try:
        import sync_recorder
        sync_recorder.record_audio(pcm16, text="[Omni对话]")
    except Exception:
        pass

    # get_running_loop() is the documented way to reach the loop from inside
    # a coroutine; it supplies the monotonic clock used for pacing.
    loop = asyncio.get_running_loop()
    frame_seconds = 0.020
    next_tick = loop.time()

    for off in range(0, len(pcm16), BYTES_PER_20MS_16K):
        piece = pcm16[off:off + BYTES_PER_20MS_16K]
        dead: List[StreamClient] = []

        # Iterate over a snapshot: the registry can change while we await.
        for sc in list(stream_clients):
            if sc.abort_event.is_set():
                dead.append(sc)
                continue
            try:
                if sc.q.full():
                    # Drop the oldest chunk rather than letting latency grow.
                    try:
                        sc.q.get_nowait()
                    except asyncio.QueueEmpty:
                        pass
                sc.q.put_nowait(piece)
            except Exception:
                # Any queue failure means this client is no longer usable.
                dead.append(sc)

        stream_clients.difference_update(dead)

        # Pace the output: wait for the next tick, or re-anchor the schedule
        # to "now" when running late instead of bursting to catch up.
        next_tick += frame_seconds
        now = loop.time()
        if now < next_tick:
            await asyncio.sleep(next_tick - now)
        else:
            next_tick = now
# ===== FastAPI route registrar =====
def register_stream_route(app):
    """Register the ``GET /stream.wav`` endpoint on ``app``.

    Each request to the endpoint aborts and unregisters all previously
    registered stream clients, registers a fresh one, and answers with a
    streaming ``audio/wav`` response: a WAV header followed by whatever
    chunks are placed on the new client's queue.
    """

    @app.get("/stream.wav")
    async def stream_wav(_: Request):
        # Enforce a single connection: shut the gate on every older client first.
        for stale in list(stream_clients):
            try:
                stale.abort_event.set()
            except Exception:
                pass
        stream_clients.clear()

        # Register the new connection.
        abort_event = asyncio.Event()
        queue: asyncio.Queue[bytes | None] = asyncio.Queue(maxsize=STREAM_QUEUE_MAX)
        client = StreamClient(q=queue, abort_event=abort_event)
        stream_clients.add(client)

        async def body():
            # The header goes out first, then chunks until aborted or told to stop.
            yield _wav_header_unknown_size(STREAM_SR, STREAM_CH, STREAM_SW)
            try:
                while not abort_event.is_set():
                    try:
                        # Short timeout so the abort flag is re-checked regularly.
                        chunk = await asyncio.wait_for(queue.get(), timeout=0.5)
                    except asyncio.TimeoutError:
                        continue
                    # Stop on abort, or on the None end-of-stream sentinel.
                    if abort_event.is_set() or chunk is None:
                        break
                    if chunk:  # skip empty chunks
                        yield chunk
            finally:
                # Always unregister, however the stream ends.
                stream_clients.discard(client)

        return StreamingResponse(body(), media_type="audio/wav")