Skip to content

Commit f9b3216

Browse files
committed
fix(platform): prevent qq_official duplicate message consumption (#5848)
Co-authored-by: VISIR <2561719535@qq.com>
1 parent 551c956 commit f9b3216

File tree

8 files changed

+518
-0
lines changed

8 files changed

+518
-0
lines changed

astrbot/core/config/default.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -248,6 +248,7 @@
248248
},
249249
},
250250
"platform": [],
251+
"event_bus_dedup_ttl_seconds": 0.5,
251252
"platform_specific": {
252253
# 平台特异配置:按平台分类,平台下按功能分组
253254
"lark": {
@@ -328,6 +329,9 @@ class ChatProviderTemplate(TypedDict):
328329
"secret": "",
329330
"enable_group_c2c": True,
330331
"enable_guild_direct_message": True,
332+
"dedup_message_id_ttl_seconds": 1800.0,
333+
"dedup_content_key_ttl_seconds": 3.0,
334+
"dedup_cleanup_interval_seconds": 1.0,
331335
},
332336
"QQ 官方机器人(Webhook)": {
333337
"id": "default",
@@ -774,6 +778,21 @@ class ChatProviderTemplate(TypedDict):
774778
"type": "bool",
775779
"hint": "启用后,机器人可以接收到频道的私聊消息。",
776780
},
781+
"dedup_message_id_ttl_seconds": {
782+
"description": "消息 ID 去重窗口(秒)",
783+
"type": "float",
784+
"hint": "QQ 官方适配器中 message_id 去重窗口,默认 1800 秒。",
785+
},
786+
"dedup_content_key_ttl_seconds": {
787+
"description": "内容键去重窗口(秒)",
788+
"type": "float",
789+
"hint": "QQ 官方适配器中 sender+content hash 去重窗口,默认 3 秒。",
790+
},
791+
"dedup_cleanup_interval_seconds": {
792+
"description": "去重缓存清理间隔(秒)",
793+
"type": "float",
794+
"hint": "QQ 官方适配器去重缓存的增量清理间隔,默认 1 秒。",
795+
},
777796
"ws_reverse_host": {
778797
"description": "反向 Websocket 主机",
779798
"type": "string",

astrbot/core/event_bus.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@
1616
from astrbot.core import logger
1717
from astrbot.core.astrbot_config_mgr import AstrBotConfigManager
1818
from astrbot.core.pipeline.scheduler import PipelineScheduler
19+
from astrbot.core.utils.number_utils import safe_positive_float
1920

21+
from .event_dedup import EventDeduplicator
2022
from .platform import AstrMessageEvent
2123

2224

@@ -33,10 +35,22 @@ def __init__(
3335
# abconf uuid -> scheduler
3436
self.pipeline_scheduler_mapping = pipeline_scheduler_mapping
3537
self.astrbot_config_mgr = astrbot_config_mgr
38+
dedup_ttl_seconds = safe_positive_float(
39+
self.astrbot_config_mgr.g(
40+
None,
41+
"event_bus_dedup_ttl_seconds",
42+
0.5,
43+
),
44+
default=0.5,
45+
)
46+
self._deduplicator = EventDeduplicator(ttl_seconds=dedup_ttl_seconds)
3647

3748
async def dispatch(self) -> None:
49+
# event_queue 由单一消费者处理;去重结构不是线程安全的,按设计仅在此循环中使用。
3850
while True:
3951
event: AstrMessageEvent = await self.event_queue.get()
52+
if self._deduplicator.is_duplicate(event):
53+
continue
4054
conf_info = self.astrbot_config_mgr.get_conf_info(event.unified_msg_origin)
4155
conf_id = conf_info["id"]
4256
conf_name = conf_info.get("name") or conf_id

astrbot/core/event_dedup.py

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
from astrbot.core import logger
2+
from astrbot.core.message.utils import (
3+
build_content_dedup_key,
4+
build_message_id_dedup_key,
5+
)
6+
from astrbot.core.utils.ttl_registry import TTLKeyRegistry
7+
8+
from .platform import AstrMessageEvent
9+
10+
11+
class EventDeduplicator:
    """Duplicate filter applied to events before they are dispatched.

    Every event is fingerprinted by message id (when one can be derived) and
    by content. Fingerprints live in a ``TTLKeyRegistry`` created with the
    given ``ttl_seconds``; an event whose fingerprint is still present in the
    registry is reported as a duplicate.
    """

    def __init__(self, ttl_seconds: float = 0.5) -> None:
        """Create the backing registry.

        Args:
            ttl_seconds: Forwarded to ``TTLKeyRegistry``. When the registry
                reports a TTL of exactly 0, deduplication is disabled.
        """
        self._registry = TTLKeyRegistry(ttl_seconds=ttl_seconds)

    def is_duplicate(self, event: AstrMessageEvent) -> bool:
        """Return True if ``event`` matches a fingerprint already registered.

        Fingerprints of events that are not duplicates are registered as a
        side effect, so the call is not idempotent.
        """
        seen = self._registry
        if seen.ttl_seconds == 0:
            # Deduplication switched off.
            return False

        id_key = self._build_message_id_key(event)
        if id_key is not None and seen.contains(id_key):
            logger.debug(
                "Skip duplicate event in event_bus (by message_id): umo=%s, sender=%s",
                event.unified_msg_origin,
                event.get_sender_id(),
            )
            return True
        if id_key is not None:
            seen.add(id_key)

        fingerprint = self._build_content_key(event)
        if not seen.contains(fingerprint):
            seen.add(fingerprint)
            return False

        logger.debug(
            "Skip duplicate event in event_bus (by content): umo=%s, sender=%s",
            event.unified_msg_origin,
            event.get_sender_id(),
        )
        # The event is dropped, so the message-id key registered a moment ago
        # is removed again.
        if id_key is not None:
            seen.discard(id_key)
        return True

    @staticmethod
    def _build_content_key(event: AstrMessageEvent) -> str:
        """Build the content fingerprint key for ``event``."""
        platform = str(event.get_platform_id() or "")
        origin = str(event.unified_msg_origin or "")
        sender = str(event.get_sender_id() or "")
        plain_text = str(event.get_message_str() or "")
        return build_content_dedup_key(
            platform_id=platform,
            unified_msg_origin=origin,
            sender_id=sender,
            text=plain_text,
            components=event.get_messages(),
        )

    @staticmethod
    def _build_message_id_key(event: AstrMessageEvent) -> str | None:
        """Build the message-id fingerprint key, or None without a usable id.

        The id is read from ``message_obj.message_id``; when that is missing
        or falsy, ``message_obj.id`` is tried instead.
        """
        raw_id = ""
        for attr_name in ("message_id", "id"):
            raw_id = getattr(event.message_obj, attr_name, "")
            if raw_id:
                break
        platform = str(event.get_platform_id() or "")
        origin = str(event.unified_msg_origin or "")
        return build_message_id_dedup_key(
            platform_id=platform,
            unified_msg_origin=origin,
            message_id=str(raw_id or ""),
        )

astrbot/core/message/utils.py

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
"""Message utilities for deduplication and component handling."""
2+
3+
import hashlib
4+
from collections.abc import Iterable
5+
6+
from astrbot.core.message.components import BaseMessageComponent, File, Image
7+
8+
# Stripped message texts up to this many characters are embedded verbatim in
# the content dedup key; longer texts are replaced by a length-tagged hash.
_MAX_RAW_TEXT_FINGERPRINT_LEN = 256
9+
10+
11+
def build_component_dedup_signature(
    components: Iterable[BaseMessageComponent],
) -> str:
    """Build a deduplication signature from message components.

    Only ``Image`` and ``File`` components contribute. For each one the first
    non-empty reference is used (``url``, then ``file``, then ``file_unique``
    for images or ``name`` for files); components of any other type, and
    components without a reference, are ignored.

    Args:
        components: An iterable of message components to analyze.

    Returns:
        The first 16 hex characters of the SHA-1 digest of the collected
        references, or an empty string if no component contributed.
    """
    parts: list[str] = []
    for component in components:
        if isinstance(component, Image):
            prefix = "img"
            ref = component.url or component.file or component.file_unique
        elif isinstance(component, File):
            prefix = "file"
            ref = component.url or component.file or component.name
        else:
            # Future component types can be added here.
            continue
        if ref:
            parts.append(f"{prefix}:{ref}")

    if not parts:
        return ""

    payload = "|".join(parts)
    # SHA-1 is only a fingerprint here; marking it as not used for security
    # keeps the call working on builds that restrict insecure hashes.
    digest = hashlib.sha1(payload.encode("utf-8"), usedforsecurity=False)
    return digest.hexdigest()[:16]
45+
46+
47+
def build_sender_content_dedup_key(content: str, sender_id: str) -> str | None:
48+
"""Build a sender+content hash key for short-window deduplication."""
49+
if not (content and sender_id):
50+
return None
51+
content_hash = hashlib.sha1(content.encode("utf-8")).hexdigest()[:16]
52+
return f"{sender_id}:{content_hash}"
53+
54+
55+
def build_content_dedup_key(
    *,
    platform_id: str,
    unified_msg_origin: str,
    sender_id: str,
    text: str,
    components: Iterable[BaseMessageComponent],
) -> str:
    """Build a content fingerprint key for event deduplication.

    The key is the ``|``-joined sequence ``content``, platform id, unified
    message origin, sender id, text signature and attachment signature.

    Args:
        platform_id: Identifier of the platform the message came from.
        unified_msg_origin: Unified message origin of the event.
        sender_id: Identifier of the message sender.
        text: Plain text of the message; surrounding whitespace is ignored.
        components: Message components, used for the attachment signature.

    Returns:
        The fingerprint key. Falsy identifier fields are rendered as empty
        strings.
    """
    msg_text = str(text or "").strip()
    if len(msg_text) <= _MAX_RAW_TEXT_FINGERPRINT_LEN:
        # Short texts are embedded as-is.
        msg_sig = msg_text
    else:
        # Long texts are replaced by their length plus a truncated digest to
        # bound the key size. SHA-1 is only a fingerprint here; marking it as
        # not used for security keeps the call working on builds that
        # restrict insecure hashes.
        digest = hashlib.sha1(msg_text.encode("utf-8"), usedforsecurity=False)
        msg_sig = f"h:{len(msg_text)}:{digest.hexdigest()[:16]}"

    attach_sig = build_component_dedup_signature(components)
    return "|".join(
        [
            "content",
            str(platform_id or ""),
            str(unified_msg_origin or ""),
            str(sender_id or ""),
            msg_sig,
            attach_sig,
        ]
    )
82+
83+
84+
def build_message_id_dedup_key(
85+
*,
86+
platform_id: str,
87+
unified_msg_origin: str,
88+
message_id: str,
89+
) -> str | None:
90+
"""Build a message_id fingerprint key for event deduplication."""
91+
normalized_message_id = str(message_id or "")
92+
if not normalized_message_id:
93+
return None
94+
return "|".join(
95+
[
96+
"message_id",
97+
str(platform_id or ""),
98+
str(unified_msg_origin or ""),
99+
normalized_message_id,
100+
]
101+
)

astrbot/core/platform/manager.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,15 @@ async def load_platform(self, platform_config: dict) -> None:
122122
)
123123
return
124124

125+
# 防御式处理:避免同一平台 ID 被重复加载导致消息重复消费。
126+
if platform_id in self._inst_map:
127+
logger.warning(
128+
"平台 %s(%s) 已存在实例,先终止旧实例再重载。",
129+
platform_config["type"],
130+
platform_id,
131+
)
132+
await self.terminate_platform(platform_id)
133+
125134
logger.info(
126135
f"载入 {platform_config['type']}({platform_config['id']}) 平台适配器 ...",
127136
)

0 commit comments

Comments
 (0)