Skip to content

Commit 7bd3ca4

Browse files
committed
refactor(platform):harden-dedup-and-extract-event-deduplicator
1 parent ea2d9aa commit 7bd3ca4

File tree

3 files changed

+128
-96
lines changed

3 files changed

+128
-96
lines changed

astrbot/core/event_bus.py

Lines changed: 3 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,10 @@
1515

1616
from astrbot.core import logger
1717
from astrbot.core.astrbot_config_mgr import AstrBotConfigManager
18-
from astrbot.core.message.utils import (
19-
build_content_dedup_key,
20-
build_message_id_dedup_key,
21-
)
2218
from astrbot.core.pipeline.scheduler import PipelineScheduler
2319
from astrbot.core.utils.number_utils import safe_positive_float
24-
from astrbot.core.utils.ttl_registry import TTLKeyRegistry
2520

21+
from .event_dedup import EventDeduplicator
2622
from .platform import AstrMessageEvent
2723

2824

@@ -47,65 +43,13 @@ def __init__(
4743
),
4844
default=0.5,
4945
)
50-
self._dedup_registry = TTLKeyRegistry(ttl_seconds=dedup_ttl_seconds)
51-
52-
@staticmethod
53-
def _build_event_content_key(event: AstrMessageEvent) -> str:
54-
return build_content_dedup_key(
55-
platform_id=str(event.get_platform_id() or ""),
56-
unified_msg_origin=str(event.unified_msg_origin or ""),
57-
sender_id=str(event.get_sender_id() or ""),
58-
text=str(event.get_message_str() or ""),
59-
components=event.get_messages(),
60-
)
61-
62-
@staticmethod
63-
def _build_event_message_id_key(event: AstrMessageEvent) -> str | None:
64-
message_id = getattr(event.message_obj, "message_id", "") or getattr(
65-
event.message_obj,
66-
"id",
67-
"",
68-
)
69-
return build_message_id_dedup_key(
70-
platform_id=str(event.get_platform_id() or ""),
71-
unified_msg_origin=str(event.unified_msg_origin or ""),
72-
message_id=str(message_id or ""),
73-
)
74-
75-
def _is_duplicate(self, event: AstrMessageEvent) -> bool:
76-
if self._dedup_registry.ttl_seconds == 0:
77-
return False
78-
79-
message_id_key = self._build_event_message_id_key(event)
80-
if message_id_key is not None:
81-
if self._dedup_registry.contains(message_id_key):
82-
logger.debug(
83-
"Skip duplicate event in event_bus (by message_id): umo=%s, sender=%s",
84-
event.unified_msg_origin,
85-
event.get_sender_id(),
86-
)
87-
return True
88-
self._dedup_registry.add(message_id_key)
89-
90-
content_key = self._build_event_content_key(event)
91-
if self._dedup_registry.contains(content_key):
92-
logger.debug(
93-
"Skip duplicate event in event_bus (by content): umo=%s, sender=%s",
94-
event.unified_msg_origin,
95-
event.get_sender_id(),
96-
)
97-
if message_id_key is not None:
98-
self._dedup_registry.discard(message_id_key)
99-
return True
100-
101-
self._dedup_registry.add(content_key)
102-
return False
46+
self._deduplicator = EventDeduplicator(ttl_seconds=dedup_ttl_seconds)
10347

10448
async def dispatch(self) -> None:
10549
# event_queue 由单一消费者处理;去重结构不是线程安全的,按设计仅在此循环中使用。
10650
while True:
10751
event: AstrMessageEvent = await self.event_queue.get()
108-
if self._is_duplicate(event):
52+
if self._deduplicator.is_duplicate(event):
10953
continue
11054
conf_info = self.astrbot_config_mgr.get_conf_info(event.unified_msg_origin)
11155
conf_id = conf_info["id"]

astrbot/core/event_dedup.py

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
from astrbot.core import logger
2+
from astrbot.core.message.utils import (
3+
build_content_dedup_key,
4+
build_message_id_dedup_key,
5+
)
6+
from astrbot.core.utils.ttl_registry import TTLKeyRegistry
7+
8+
from .platform import AstrMessageEvent
9+
10+
11+
class EventDeduplicator:
12+
def __init__(self, ttl_seconds: float = 0.5) -> None:
13+
self._registry = TTLKeyRegistry(ttl_seconds=ttl_seconds)
14+
15+
def is_duplicate(self, event: AstrMessageEvent) -> bool:
16+
if self._registry.ttl_seconds == 0:
17+
return False
18+
19+
message_id_key = self._build_message_id_key(event)
20+
if message_id_key is not None:
21+
if self._registry.contains(message_id_key):
22+
logger.debug(
23+
"Skip duplicate event in event_bus (by message_id): umo=%s, sender=%s",
24+
event.unified_msg_origin,
25+
event.get_sender_id(),
26+
)
27+
return True
28+
self._registry.add(message_id_key)
29+
30+
content_key = self._build_content_key(event)
31+
if self._registry.contains(content_key):
32+
logger.debug(
33+
"Skip duplicate event in event_bus (by content): umo=%s, sender=%s",
34+
event.unified_msg_origin,
35+
event.get_sender_id(),
36+
)
37+
if message_id_key is not None:
38+
self._registry.discard(message_id_key)
39+
return True
40+
41+
self._registry.add(content_key)
42+
return False
43+
44+
@staticmethod
45+
def _build_content_key(event: AstrMessageEvent) -> str:
46+
return build_content_dedup_key(
47+
platform_id=str(event.get_platform_id() or ""),
48+
unified_msg_origin=str(event.unified_msg_origin or ""),
49+
sender_id=str(event.get_sender_id() or ""),
50+
text=str(event.get_message_str() or ""),
51+
components=event.get_messages(),
52+
)
53+
54+
@staticmethod
55+
def _build_message_id_key(event: AstrMessageEvent) -> str | None:
56+
message_id = getattr(event.message_obj, "message_id", "") or getattr(
57+
event.message_obj,
58+
"id",
59+
"",
60+
)
61+
return build_message_id_dedup_key(
62+
platform_id=str(event.get_platform_id() or ""),
63+
unified_msg_origin=str(event.unified_msg_origin or ""),
64+
message_id=str(message_id or ""),
65+
)

astrbot/core/platform/sources/qqofficial/qqofficial_platform_adapter.py

Lines changed: 60 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import os
66
import random
77
import time
8+
from types import SimpleNamespace
89
from typing import Any, cast
910

1011
import botpy
@@ -90,8 +91,50 @@ def __init__(
9091
)
9192
self._lock = asyncio.Lock()
9293

93-
def _build_content_key(self, content: str, sender_id: str) -> str | None:
94-
return build_sender_content_dedup_key(content, sender_id)
94+
def _id_dedup_enabled(self, message_id: str) -> bool:
95+
return self._message_ids.ttl_seconds > 0 and bool(message_id)
96+
97+
def _content_dedup_enabled(self) -> bool:
98+
return self._content_keys.ttl_seconds > 0
99+
100+
def _register_message_id(self, message_id: str) -> bool:
101+
"""Return True if duplicate by ID, False otherwise (and register)."""
102+
if self._message_ids.contains(message_id):
103+
logger.debug(
104+
"[QQOfficial] Duplicate message detected (by ID): %s...",
105+
message_id[:50],
106+
)
107+
return True
108+
109+
self._message_ids.add(message_id)
110+
return False
111+
112+
def _register_content(
113+
self,
114+
message_id: str,
115+
content: str,
116+
sender_id: str,
117+
id_dedup_enabled: bool,
118+
) -> bool:
119+
"""Return True if duplicate by content, False otherwise (and register)."""
120+
content_key = build_sender_content_dedup_key(content, sender_id)
121+
if content_key is None:
122+
logger.debug("[QQOfficial] New message registered: %s...", message_id[:50])
123+
return False
124+
125+
if self._content_keys.contains(content_key):
126+
logger.debug(
127+
"[QQOfficial] Duplicate message detected (by content): %s",
128+
content_key,
129+
)
130+
# Preserve existing behavior: do not keep message_id on content duplicates
131+
if id_dedup_enabled:
132+
self._message_ids.discard(message_id)
133+
return True
134+
135+
self._content_keys.add(content_key)
136+
logger.debug("[QQOfficial] New message registered: %s...", message_id[:50])
137+
return False
95138

96139
async def is_duplicate(
97140
self,
@@ -100,22 +143,14 @@ async def is_duplicate(
100143
sender_id: str = "",
101144
) -> bool:
102145
async with self._lock:
103-
id_dedup_enabled = self._message_ids.ttl_seconds > 0 and bool(message_id)
104-
content_dedup_enabled = self._content_keys.ttl_seconds > 0
146+
id_dedup_enabled = self._id_dedup_enabled(message_id)
147+
content_dedup_enabled = self._content_dedup_enabled()
105148

106149
if not id_dedup_enabled and not content_dedup_enabled:
107150
return False
108151

109-
# 1) ID-based dedup
110-
if id_dedup_enabled:
111-
if self._message_ids.contains(message_id):
112-
logger.debug(
113-
"[QQOfficial] Duplicate message detected (by ID): %s...",
114-
message_id[:50],
115-
)
116-
return True
117-
118-
self._message_ids.add(message_id)
152+
if id_dedup_enabled and self._register_message_id(message_id):
153+
return True
119154

120155
# 2) Content-based dedup
121156
if not content_dedup_enabled:
@@ -124,26 +159,12 @@ async def is_duplicate(
124159
)
125160
return False
126161

127-
content_key = self._build_content_key(content, sender_id)
128-
if content_key is None:
129-
logger.debug(
130-
"[QQOfficial] New message registered: %s...", message_id[:50]
131-
)
132-
return False
133-
134-
if self._content_keys.contains(content_key):
135-
logger.debug(
136-
"[QQOfficial] Duplicate message detected (by content): %s",
137-
content_key,
138-
)
139-
# Preserve existing behavior: do not keep message_id on content duplicates
140-
if id_dedup_enabled:
141-
self._message_ids.discard(message_id)
142-
return True
143-
144-
self._content_keys.add(content_key)
145-
logger.debug("[QQOfficial] New message registered: %s...", message_id[:50])
146-
return False
162+
return self._register_content(
163+
message_id=message_id,
164+
content=content,
165+
sender_id=sender_id,
166+
id_dedup_enabled=id_dedup_enabled,
167+
)
147168

148169

149170
class botClient(Client):
@@ -278,9 +299,11 @@ def __init__(
278299
async def should_handle_raw_message(self, message: Any) -> bool:
279300
"""Return False if the raw incoming message should be dropped."""
280301
sender_id = _extract_sender_id(message)
281-
content = getattr(message, "content", "") or ""
302+
message_id = str(getattr(message, "id", "") or "")
303+
raw_content = getattr(message, "content", "")
304+
content = str(raw_content or "")
282305
is_duplicate = await self._deduplicator.is_duplicate(
283-
message.id,
306+
message_id,
284307
content,
285308
sender_id,
286309
)
@@ -327,7 +350,7 @@ async def _send_by_session_common(
327350

328351
payload: dict[str, Any] = {"content": plain_text, "msg_id": msg_id}
329352
ret: Any = None
330-
send_helper = self.client
353+
send_helper = SimpleNamespace(bot=self.client)
331354

332355
if session.message_type == MessageType.GROUP_MESSAGE:
333356
scene = self._session_scene.get(session.session_id)

0 commit comments

Comments
 (0)