Skip to content

Commit e6c1164

Browse files
2ndelementclaude
andauthored
perf(QQ Official API): improve streaming message delivery reliability and proactive media sending (AstrBotDevs#6131)
* fix(qqofficial): fix streaming message delivery for C2C * fix(qqofficial): rewrite send_streaming for C2C vs non-C2C split * fix(qqofficial): add _extract_response_message_id for safe id extraction * fix(qqofficial): flush stream segment on tool-call break signal * fix(qqofficial): downgrade rich-media to non-stream send in C2C * fix(qqofficial): auto-append \n to final stream chunk (state=10) * fix(qqofficial): propagate stream param to all _send_with_markdown_fallback call sites * fix(qqofficial): retry on STREAM_MARKDOWN_NEWLINE_ERROR with newline fix * fix(qqofficial): handle None/non-dict response in post_c2c_message gracefully * fix(qqofficial): remove msg_id from video/file media payloads in send_by_session QQ API rejects msg_id on proactive media (video/file, msg_type=7) messages sent via the tool-call path, returning "请求参数msg_id无效或越权". The msg_id passive-reply credential is consumed by the first send and cannot be reused for subsequent media uploads in the same session. Remove msg_id from the payload after setting msg_type=7 for video and file sends, for both FRIEND_MESSAGE (C2C) and GROUP_MESSAGE paths. * fix(qqofficial): replace deprecated get_event_loop() with get_running_loop() asyncio.get_event_loop() is deprecated since Python 3.10 and raises a DeprecationWarning (or errors) when called from inside a running coroutine without a current event loop set on the thread. Replace both call-sites in the streaming throttle logic with asyncio.get_running_loop(), which is the correct API to use inside an already-running async context. Co-Authored-By: Claude Sonnet <noreply@anthropic.com> --------- Co-authored-by: 2ndelement <2ndelement@users.noreply.github.com> Co-authored-by: Claude Sonnet <noreply@anthropic.com>
1 parent 89cc8a1 commit e6c1164

2 files changed

Lines changed: 132 additions & 22 deletions

File tree

astrbot/core/platform/sources/qqofficial/qqofficial_message_event.py

Lines changed: 126 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ class QQOfficialMessageEvent(AstrMessageEvent):
5151
VIDEO_FILE_TYPE = 2
5252
VOICE_FILE_TYPE = 3
5353
FILE_FILE_TYPE = 4
54+
STREAM_MARKDOWN_NEWLINE_ERROR = "流式消息md分片需要\\n结束"
5455

5556
def __init__(
5657
self,
@@ -69,45 +70,94 @@ async def send(self, message: MessageChain) -> None:
6970
await self._post_send()
7071

7172
async def send_streaming(self, generator, use_fallback: bool = False):
72-
"""流式输出仅支持消息列表私聊"""
73+
"""流式输出仅支持消息列表私聊(C2C),其他消息源退化为普通发送"""
74+
# 先标记事件层“已执行发送操作”,避免异常路径遗漏
75+
await super().send_streaming(generator, use_fallback)
76+
# QQ C2C 流式协议:开始/中间分片使用 state=1,结束分片使用 state=10
7377
stream_payload = {"state": 1, "id": None, "index": 0, "reset": False}
74-
last_edit_time = 0 # 上次编辑消息的时间
75-
throttle_interval = 1 # 编辑消息的间隔时间 (秒)
78+
last_edit_time = 0 # 上次发送分片的时间
79+
throttle_interval = 1 # 分片间最短间隔 (秒)
7680
ret = None
81+
source = (
82+
self.message_obj.raw_message
83+
) # 提前获取,避免 generator 为空时 NameError
7784
try:
7885
async for chain in generator:
7986
source = self.message_obj.raw_message
87+
88+
if not isinstance(source, botpy.message.C2CMessage):
89+
# 非 C2C 场景:直接累积,最后统一发
90+
if not self.send_buffer:
91+
self.send_buffer = chain
92+
else:
93+
self.send_buffer.chain.extend(chain.chain)
94+
continue
95+
96+
# ---- C2C 流式场景 ----
97+
98+
# tool_call break 信号:工具开始执行,先把已有 buffer 以 state=10 结束当前流式段
99+
if chain.type == "break":
100+
if self.send_buffer:
101+
stream_payload["state"] = 10
102+
ret = await self._post_send(stream=stream_payload)
103+
ret_id = self._extract_response_message_id(ret)
104+
if ret_id is not None:
105+
stream_payload["id"] = ret_id
106+
# 重置 stream_payload,为下一段流式做准备
107+
stream_payload = {
108+
"state": 1,
109+
"id": None,
110+
"index": 0,
111+
"reset": False,
112+
}
113+
last_edit_time = 0
114+
continue
115+
116+
# 累积内容
80117
if not self.send_buffer:
81118
self.send_buffer = chain
82119
else:
83120
self.send_buffer.chain.extend(chain.chain)
84121

85-
if isinstance(source, botpy.message.C2CMessage):
86-
# 真流式传输
87-
current_time = asyncio.get_running_loop().time()
88-
time_since_last_edit = current_time - last_edit_time
89-
90-
if time_since_last_edit >= throttle_interval:
91-
ret = cast(
92-
message.Message,
93-
await self._post_send(stream=stream_payload),
94-
)
95-
stream_payload["index"] += 1
96-
stream_payload["id"] = ret["id"]
97-
last_edit_time = asyncio.get_running_loop().time()
122+
# 节流:按时间间隔发送中间分片
123+
current_time = asyncio.get_running_loop().time()
124+
if current_time - last_edit_time >= throttle_interval:
125+
ret = cast(
126+
message.Message,
127+
await self._post_send(stream=stream_payload),
128+
)
129+
stream_payload["index"] += 1
130+
ret_id = self._extract_response_message_id(ret)
131+
if ret_id is not None:
132+
stream_payload["id"] = ret_id
133+
last_edit_time = asyncio.get_running_loop().time()
134+
self.send_buffer = None # 清空已发送的分片,避免下次重复发送旧内容
98135

99136
if isinstance(source, botpy.message.C2CMessage):
100-
# 结束流式对话,并且传输 buffer 中剩余的消息
137+
# 结束流式对话,发送 buffer 中剩余内容
101138
stream_payload["state"] = 10
102139
ret = await self._post_send(stream=stream_payload)
103140
else:
104141
ret = await self._post_send()
105142

106143
except Exception as e:
107144
logger.error(f"发送流式消息时出错: {e}", exc_info=True)
145+
# 避免累计内容在异常后被整包重复发送:仅清理缓存,不做非流式整包兜底
146+
# 如需兜底,应该只发送未发送 delta(后续可继续优化)
108147
self.send_buffer = None
109148

110-
return await super().send_streaming(generator, use_fallback)
149+
return None
150+
151+
@staticmethod
152+
def _extract_response_message_id(ret) -> str | None:
153+
"""兼容 qq-botpy 返回 Message 对象或 dict 两种形态。"""
154+
if ret is None:
155+
return None
156+
if isinstance(ret, dict):
157+
ret_id = ret.get("id")
158+
return str(ret_id) if ret_id is not None else None
159+
ret_id = getattr(ret, "id", None)
160+
return str(ret_id) if ret_id is not None else None
111161

112162
async def _post_send(self, stream: dict | None = None):
113163
if not self.send_buffer:
@@ -135,6 +185,11 @@ async def _post_send(self, stream: dict | None = None):
135185
file_name,
136186
) = await QQOfficialMessageEvent._parse_to_qqofficial(self.send_buffer)
137187

188+
# C2C 流式仅用于文本分片,富媒体时降级为普通发送,避免平台侧流式校验报错。
189+
if stream and (image_base64 or record_file_path):
190+
logger.debug("[QQOfficial] 检测到富媒体,降级为非流式发送。")
191+
stream = None
192+
138193
if (
139194
not plain_text
140195
and not image_base64
@@ -145,6 +200,17 @@ async def _post_send(self, stream: dict | None = None):
145200
):
146201
return None
147202

203+
# QQ C2C 流式 API 说明:
204+
# - 开始/中间分片(state=1):增量追加内容,不需要 \n(加了会导致强制换行)
205+
# - 最终分片(state=10):结束流,content 必须以 \n 结尾(QQ API 要求)
206+
if (
207+
stream
208+
and stream.get("state") == 10
209+
and plain_text
210+
and not plain_text.endswith("\n")
211+
):
212+
plain_text = plain_text + "\n"
213+
148214
payload: dict = {
149215
# "content": plain_text,
150216
"markdown": MarkdownPayload(content=plain_text) if plain_text else None,
@@ -214,6 +280,7 @@ async def _post_send(self, stream: dict | None = None):
214280
),
215281
payload=payload,
216282
plain_text=plain_text,
283+
stream=stream,
217284
)
218285

219286
case botpy.message.C2CMessage():
@@ -270,6 +337,7 @@ async def _post_send(self, stream: dict | None = None):
270337
),
271338
payload=payload,
272339
plain_text=plain_text,
340+
stream=stream,
273341
)
274342
else:
275343
ret = await self._send_with_markdown_fallback(
@@ -279,6 +347,7 @@ async def _post_send(self, stream: dict | None = None):
279347
),
280348
payload=payload,
281349
plain_text=plain_text,
350+
stream=stream,
282351
)
283352
logger.debug(f"Message sent to C2C: {ret}")
284353

@@ -294,6 +363,7 @@ async def _post_send(self, stream: dict | None = None):
294363
),
295364
payload=payload,
296365
plain_text=plain_text,
366+
stream=stream,
297367
)
298368

299369
case botpy.message.DirectMessage():
@@ -308,6 +378,7 @@ async def _post_send(self, stream: dict | None = None):
308378
),
309379
payload=payload,
310380
plain_text=plain_text,
381+
stream=stream,
311382
)
312383

313384
case _:
@@ -324,10 +395,31 @@ async def _send_with_markdown_fallback(
324395
send_func,
325396
payload: dict,
326397
plain_text: str,
398+
stream: dict | None = None,
327399
):
328400
try:
329401
return await send_func(payload)
330402
except botpy.errors.ServerError as err:
403+
# QQ 流式 markdown 分片校验:内容必须以换行结尾。
404+
# 某些边界场景服务端仍可能判定失败,这里做一次修正重试。
405+
if stream and self.STREAM_MARKDOWN_NEWLINE_ERROR in str(err):
406+
retry_payload = payload.copy()
407+
408+
markdown_payload = retry_payload.get("markdown")
409+
if isinstance(markdown_payload, dict):
410+
md_content = cast(str, markdown_payload.get("content", "") or "")
411+
if md_content and not md_content.endswith("\n"):
412+
retry_payload["markdown"] = {"content": md_content + "\n"}
413+
414+
content = cast(str | None, retry_payload.get("content"))
415+
if content and not content.endswith("\n"):
416+
retry_payload["content"] = content + "\n"
417+
418+
logger.warning(
419+
"[QQOfficial] 流式 markdown 分片换行校验失败,已修正后重试一次。"
420+
)
421+
return await send_func(retry_payload)
422+
331423
if (
332424
self.MARKDOWN_NOT_ALLOWED_ERROR not in str(err)
333425
or not payload.get("markdown")
@@ -339,10 +431,14 @@ async def _send_with_markdown_fallback(
339431
"[QQOfficial] markdown 发送被拒绝,回退到 content 模式重试。"
340432
)
341433
fallback_payload = payload.copy()
342-
fallback_payload["markdown"] = None
434+
fallback_payload.pop("markdown", None)
343435
fallback_payload["content"] = plain_text
344436
if fallback_payload.get("msg_type") == 2:
345437
fallback_payload["msg_type"] = 0
438+
if stream:
439+
fallback_content = cast(str, fallback_payload.get("content") or "")
440+
if fallback_content and not fallback_content.endswith("\n"):
441+
fallback_payload["content"] = fallback_content + "\n"
346442
return await send_func(fallback_payload)
347443

348444
async def upload_group_and_c2c_image(
@@ -460,13 +556,21 @@ async def post_c2c_message(
460556
) -> message.Message:
461557
payload = locals()
462558
payload.pop("self", None)
559+
# QQ API does not accept stream.id=None; remove it when not yet assigned
560+
if "stream" in payload and payload["stream"] is not None:
561+
stream_data = dict(payload["stream"])
562+
if stream_data.get("id") is None:
563+
stream_data.pop("id", None)
564+
payload["stream"] = stream_data
463565
route = Route("POST", "/v2/users/{openid}/messages", openid=openid)
464566
result = await self.bot.api._http.request(route, json=payload)
465567

568+
if result is None:
569+
logger.warning("[QQOfficial] post_c2c_message: API 返回 None,跳过本次发送")
570+
return None
466571
if not isinstance(result, dict):
467-
raise RuntimeError(
468-
f"Failed to post c2c message, response is not dict: {result}"
469-
)
572+
logger.error(f"[QQOfficial] post_c2c_message: 响应不是 dict: {result}")
573+
return None
470574

471575
return message.Message(**result)
472576

astrbot/core/platform/sources/qqofficial/qqofficial_platform_adapter.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,7 @@ async def _send_by_session_common(
212212
if media:
213213
payload["media"] = media
214214
payload["msg_type"] = 7
215+
payload.pop("msg_id", None)
215216
if file_source:
216217
media = await QQOfficialMessageEvent.upload_group_and_c2c_media(
217218
send_helper, # type: ignore
@@ -223,6 +224,7 @@ async def _send_by_session_common(
223224
if media:
224225
payload["media"] = media
225226
payload["msg_type"] = 7
227+
payload.pop("msg_id", None)
226228
ret = await self.client.api.post_group_message(
227229
group_openid=session.session_id,
228230
**payload,
@@ -266,6 +268,9 @@ async def _send_by_session_common(
266268
if media:
267269
payload["media"] = media
268270
payload["msg_type"] = 7
271+
# QQ API rejects msg_id for media (video/file) messages sent
272+
# via the proactive tool-call path; remove it to avoid 越权 error.
273+
payload.pop("msg_id", None)
269274
if file_source:
270275
media = await QQOfficialMessageEvent.upload_group_and_c2c_media(
271276
send_helper, # type: ignore
@@ -277,6 +282,7 @@ async def _send_by_session_common(
277282
if media:
278283
payload["media"] = media
279284
payload["msg_type"] = 7
285+
payload.pop("msg_id", None)
280286

281287
ret = await QQOfficialMessageEvent.post_c2c_message(
282288
send_helper, # type: ignore

0 commit comments

Comments
 (0)