forked from AI-FanGe/OpenAIglasses_for_Navigation
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathasr_core.py
More file actions
204 lines (180 loc) · 6.76 KB
/
asr_core.py
File metadata and controls
204 lines (180 loc) · 6.76 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
# asr_core.py
# -*- coding: utf-8 -*-
import os, json, asyncio
from typing import Any, Dict, List, Optional, Callable, Tuple
ASR_DEBUG_RAW = os.getenv("ASR_DEBUG_RAW", "0") == "1"
def _shorten(s: str, limit: int = 200) -> str:
if not s:
return ""
return s if len(s) <= limit else (s[:limit] + "…")
def _safe_to_dict(x: Any) -> Dict[str, Any]:
if isinstance(x, dict): return x
for attr in ("to_dict", "model_dump", "__dict__"):
try:
v = getattr(x, attr, None)
except Exception:
v = None
if callable(v):
try:
d = v()
if isinstance(d, dict): return d
except Exception:
pass
elif isinstance(v, dict):
return v
try:
s = str(x)
if s and s.lstrip().startswith("{") and s.rstrip().endswith("}"):
return json.loads(s)
except Exception:
pass
return {"_raw": str(x)}
def _extract_sentence(event_obj: Any) -> Tuple[Optional[str], Optional[bool]]:
d = _safe_to_dict(event_obj)
cands: List[Dict[str, Any]] = [d]
for k in ("output", "data", "result"):
v = d.get(k)
if isinstance(v, dict):
cands.append(v)
for obj in cands:
sent = obj.get("sentence")
if isinstance(sent, dict):
text = sent.get("text")
is_end = sent.get("sentence_end")
if is_end is not None:
is_end = bool(is_end)
return text, is_end
for obj in cands:
if "text" in obj and isinstance(obj.get("text"), str):
return obj.get("text"), None
return None, None
# ====== 仅热词触发的“全清零复位”配置 ======
INTERRUPT_KEYWORDS = set(
os.getenv("INTERRUPT_KEYWORDS", "停下,别说了,停止").split(",")
)
def _normalize_cn(s: str) -> str:
try:
import unicodedata
s = "".join(" " if unicodedata.category(ch) == "Zs" else ch for ch in s)
s = s.strip().lower()
except Exception:
s = (s or "").strip().lower()
return s
# ============ ASR 全局总闸 ============
_current_recognition: Optional[object] = None
_rec_lock = asyncio.Lock()
async def set_current_recognition(r):
global _current_recognition
async with _rec_lock:
_current_recognition = r
async def stop_current_recognition():
global _current_recognition
async with _rec_lock:
r = _current_recognition
_current_recognition = None
if r:
try:
r.stop() # DashScope SDK 的实时识别停止
except Exception:
pass
# ============ ASR 回调 ============
class ASRCallback:
"""
设计目标:
1) “停下 / 别说了 …”等热词一出现 → 立刻全清零复位(恢复到刚启动后的状态)。
2) 除此之外【不接受打断】;AI 正在播报时,用户说话只做展示,不触发新一轮。
3) 不再用 partial 叠加字符串;partial 只用于 UI 临时展示;只有 final sentence 用于驱动 AI。
"""
def __init__(
self,
on_sdk_error: Callable[[str], None],
post: Callable[[asyncio.Future], None],
ui_broadcast_partial,
ui_broadcast_final,
is_playing_now_fn: Callable[[], bool],
start_ai_with_text_fn, # async (text)
full_system_reset_fn, # async (reason)
interrupt_lock: asyncio.Lock,
):
self._on_sdk_error = on_sdk_error
self._post = post
self._last_partial_for_ui: str = "" # 只用于 UI 展示
self._last_final_text: str = "" # 以句末 final 为准
self._hot_interrupted: bool = False # 本句是否因热词触发过复位(防抖)
self._ui_partial = ui_broadcast_partial
self._ui_final = ui_broadcast_final
self._is_playing = is_playing_now_fn
self._start_ai = start_ai_with_text_fn
self._full_reset = full_system_reset_fn
self._interrupt_lock = interrupt_lock
def on_open(self): pass
def on_close(self): pass
def on_complete(self): pass
def on_error(self, err):
try:
self._post(self._ui_partial(""))
self._on_sdk_error(str(err))
except Exception:
pass
def on_result(self, result): self._handle(result)
def on_event(self, event): self._handle(event)
def _has_hotword(self, text: str) -> bool:
t = _normalize_cn(text)
if not t: return False
for w in INTERRUPT_KEYWORDS:
if w and _normalize_cn(w) in t:
return True
return False
def _handle(self, event: Any):
if ASR_DEBUG_RAW:
try:
rawd = _safe_to_dict(event)
print("[ASR EVENT RAW]", json.dumps(rawd, ensure_ascii=False), flush=True)
except Exception:
pass
text, is_end = _extract_sentence(event)
if text is None:
return
text = text.strip()
if not text:
return
# ---------- ① 热词优先:命中就全清零并短路,绝不送 LLM ----------
if not self._hot_interrupted and self._has_hotword(text):
self._hot_interrupted = True
async def _hot_reset():
async with self._interrupt_lock:
print(f"[ASR HOTWORD] '{text}' -> FULL RESET, skip LLM", flush=True)
await self._full_reset("Hotword interrupt")
try:
self._post(_hot_reset())
except Exception:
pass
return
# ---------- ② partial:仅用于 UI 展示 ----------
self._last_partial_for_ui = text
try:
print(f"[ASR PARTIAL] len={len(text)} text='{_shorten(text)}'", flush=True)
self._post(self._ui_partial(self._last_partial_for_ui))
except Exception:
pass
# ---------- ③ final:仅 final 驱动 LLM(若未在播报) ----------
if is_end is True:
final_text = text
try:
print(f"[ASR FINAL] len={len(final_text)} text='{final_text}'", flush=True)
self._post(self._ui_final(final_text))
except Exception:
pass
if (not self._is_playing()) and final_text:
async def _run_final():
async with self._interrupt_lock:
print(f"[LLM INPUT TEXT] {final_text}", flush=True)
await self._start_ai(final_text)
try:
self._post(_run_final())
except Exception:
pass
# 复位进入下一句
self._last_partial_for_ui = ""
self._last_final_text = ""
self._hot_interrupted = False