Skip to content

Commit ea2d9aa

Browse files
committed
fix(platform):streamline-dedup-architecture-and-logging
1 parent e35a26b commit ea2d9aa

File tree

4 files changed

+102
-100
lines changed

4 files changed

+102
-100
lines changed

astrbot/core/event_bus.py

Lines changed: 55 additions & 59 deletions
Original file line number | Diff line number | Diff line change
@@ -16,8 +16,8 @@
1616
from astrbot.core import logger
1717
from astrbot.core.astrbot_config_mgr import AstrBotConfigManager
1818
from astrbot.core.message.utils import (
19-
build_event_content_dedup_key,
20-
build_event_message_id_dedup_key,
19+
build_content_dedup_key,
20+
build_message_id_dedup_key,
2121
)
2222
from astrbot.core.pipeline.scheduler import PipelineScheduler
2323
from astrbot.core.utils.number_utils import safe_positive_float
@@ -26,90 +26,86 @@
2626
from .platform import AstrMessageEvent
2727

2828

29-
class EventDeduplicator:
30-
"""Event deduplicator using TTL-based registry.
31-
32-
This class handles deduplication of events based on content fingerprint
33-
and message ID, with configurable TTL window.
34-
"""
29+
class EventBus:
30+
"""用于处理事件的分发和处理"""
3531

36-
def __init__(self, ttl_seconds: float = 0.5) -> None:
37-
self._registry = TTLKeyRegistry(ttl_seconds)
32+
def __init__(
33+
self,
34+
event_queue: Queue,
35+
pipeline_scheduler_mapping: dict[str, PipelineScheduler],
36+
astrbot_config_mgr: AstrBotConfigManager,
37+
) -> None:
38+
self.event_queue = event_queue # 事件队列
39+
# abconf uuid -> scheduler
40+
self.pipeline_scheduler_mapping = pipeline_scheduler_mapping
41+
self.astrbot_config_mgr = astrbot_config_mgr
42+
dedup_ttl_seconds = safe_positive_float(
43+
self.astrbot_config_mgr.g(
44+
None,
45+
"event_bus_dedup_ttl_seconds",
46+
0.5,
47+
),
48+
default=0.5,
49+
)
50+
self._dedup_registry = TTLKeyRegistry(ttl_seconds=dedup_ttl_seconds)
51+
52+
@staticmethod
53+
def _build_event_content_key(event: AstrMessageEvent) -> str:
54+
return build_content_dedup_key(
55+
platform_id=str(event.get_platform_id() or ""),
56+
unified_msg_origin=str(event.unified_msg_origin or ""),
57+
sender_id=str(event.get_sender_id() or ""),
58+
text=str(event.get_message_str() or ""),
59+
components=event.get_messages(),
60+
)
3861

39-
def is_duplicate(self, event: AstrMessageEvent) -> bool:
40-
"""Check if the event is a duplicate.
62+
@staticmethod
63+
def _build_event_message_id_key(event: AstrMessageEvent) -> str | None:
64+
message_id = getattr(event.message_obj, "message_id", "") or getattr(
65+
event.message_obj,
66+
"id",
67+
"",
68+
)
69+
return build_message_id_dedup_key(
70+
platform_id=str(event.get_platform_id() or ""),
71+
unified_msg_origin=str(event.unified_msg_origin or ""),
72+
message_id=str(message_id or ""),
73+
)
4174

42-
Returns False immediately if TTL is 0 (deduplication disabled).
43-
Short-circuits on message_id key to avoid expensive attachment signature computation.
44-
"""
45-
# TTL of 0 means deduplication is disabled
46-
if self._registry.ttl_seconds == 0:
75+
def _is_duplicate(self, event: AstrMessageEvent) -> bool:
76+
if self._dedup_registry.ttl_seconds == 0:
4777
return False
4878

49-
# Short-circuit: check message_id first (cheap) before computing full content key (expensive)
50-
message_id_key = build_event_message_id_dedup_key(event)
79+
message_id_key = self._build_event_message_id_key(event)
5180
if message_id_key is not None:
52-
if self._registry.contains(message_id_key):
81+
if self._dedup_registry.contains(message_id_key):
5382
logger.debug(
5483
"Skip duplicate event in event_bus (by message_id): umo=%s, sender=%s",
5584
event.unified_msg_origin,
5685
event.get_sender_id(),
5786
)
5887
return True
59-
# Register message_id key since we'll process the event
60-
self._registry.add(message_id_key)
88+
self._dedup_registry.add(message_id_key)
6189

62-
# Only compute full content key if we get past message_id check
63-
content_key = build_event_content_dedup_key(event)
64-
if self._registry.contains(content_key):
90+
content_key = self._build_event_content_key(event)
91+
if self._dedup_registry.contains(content_key):
6592
logger.debug(
6693
"Skip duplicate event in event_bus (by content): umo=%s, sender=%s",
6794
event.unified_msg_origin,
6895
event.get_sender_id(),
6996
)
70-
# If content duplicate, also remove message_id to preserve existing behavior
7197
if message_id_key is not None:
72-
self._registry.discard(message_id_key)
98+
self._dedup_registry.discard(message_id_key)
7399
return True
74100

75-
# Register content key
76-
self._registry.add(content_key)
101+
self._dedup_registry.add(content_key)
77102
return False
78103

79-
80-
class EventBus:
81-
"""用于处理事件的分发和处理"""
82-
83-
def __init__(
84-
self,
85-
event_queue: Queue,
86-
pipeline_scheduler_mapping: dict[str, PipelineScheduler],
87-
astrbot_config_mgr: AstrBotConfigManager,
88-
) -> None:
89-
self.event_queue = event_queue # 事件队列
90-
# abconf uuid -> scheduler
91-
self.pipeline_scheduler_mapping = pipeline_scheduler_mapping
92-
self.astrbot_config_mgr = astrbot_config_mgr
93-
dedup_ttl_seconds = safe_positive_float(
94-
self.astrbot_config_mgr.g(
95-
None,
96-
"event_bus_dedup_ttl_seconds",
97-
0.5,
98-
),
99-
default=0.5,
100-
)
101-
self._deduplicator = EventDeduplicator(ttl_seconds=dedup_ttl_seconds)
102-
103104
async def dispatch(self) -> None:
104105
# event_queue 由单一消费者处理;去重结构不是线程安全的,按设计仅在此循环中使用。
105106
while True:
106107
event: AstrMessageEvent = await self.event_queue.get()
107-
if self._deduplicator.is_duplicate(event):
108-
logger.debug(
109-
"Skip duplicate event in event_bus, umo=%s, sender=%s",
110-
event.unified_msg_origin,
111-
event.get_sender_id(),
112-
)
108+
if self._is_duplicate(event):
113109
continue
114110
conf_info = self.astrbot_config_mgr.get_conf_info(event.unified_msg_origin)
115111
conf_id = conf_info["id"]

astrbot/core/message/utils.py

Lines changed: 26 additions & 27 deletions
Original file line number | Diff line number | Diff line change
@@ -2,14 +2,9 @@
22

33
import hashlib
44
from collections.abc import Iterable
5-
from typing import TYPE_CHECKING
65

76
from astrbot.core.message.components import BaseMessageComponent, File, Image
87

9-
if TYPE_CHECKING:
10-
from astrbot.core.platform import AstrMessageEvent
11-
12-
138
_MAX_RAW_TEXT_FINGERPRINT_LEN = 256
149

1510

@@ -57,46 +52,50 @@ def build_sender_content_dedup_key(content: str, sender_id: str) -> str | None:
5752
return f"{sender_id}:{content_hash}"
5853

5954

60-
def build_event_content_dedup_key(event: "AstrMessageEvent") -> str:
61-
"""Build a content fingerprint key for EventBus deduplication."""
62-
msg_text = str(event.get_message_str() or "").strip()
55+
def build_content_dedup_key(
56+
*,
57+
platform_id: str,
58+
unified_msg_origin: str,
59+
sender_id: str,
60+
text: str,
61+
components: Iterable[BaseMessageComponent],
62+
) -> str:
63+
"""Build a content fingerprint key for event deduplication."""
64+
msg_text = str(text or "").strip()
6365
if len(msg_text) <= _MAX_RAW_TEXT_FINGERPRINT_LEN:
6466
msg_sig = msg_text
6567
else:
6668
msg_hash = hashlib.sha1(msg_text.encode("utf-8")).hexdigest()[:16]
6769
msg_sig = f"h:{len(msg_text)}:{msg_hash}"
6870

69-
attach_sig = build_component_dedup_signature(event.get_messages())
70-
platform_id = str(event.get_platform_id() or "")
71-
unified_msg_origin = str(event.unified_msg_origin or "")
72-
sender_id = str(event.get_sender_id() or "")
71+
attach_sig = build_component_dedup_signature(components)
7372
return "|".join(
7473
[
7574
"content",
76-
platform_id,
77-
unified_msg_origin,
78-
sender_id,
75+
str(platform_id or ""),
76+
str(unified_msg_origin or ""),
77+
str(sender_id or ""),
7978
msg_sig,
8079
attach_sig,
8180
]
8281
)
8382

8483

85-
def build_event_message_id_dedup_key(event: "AstrMessageEvent") -> str | None:
86-
"""Build a message_id fingerprint key for EventBus deduplication."""
87-
message_id = str(getattr(event.message_obj, "message_id", "") or "")
88-
if not message_id:
89-
message_id = str(getattr(event.message_obj, "id", "") or "")
90-
if not message_id:
84+
def build_message_id_dedup_key(
85+
*,
86+
platform_id: str,
87+
unified_msg_origin: str,
88+
message_id: str,
89+
) -> str | None:
90+
"""Build a message_id fingerprint key for event deduplication."""
91+
normalized_message_id = str(message_id or "")
92+
if not normalized_message_id:
9193
return None
92-
93-
platform_id = str(event.get_platform_id() or "")
94-
unified_msg_origin = str(event.unified_msg_origin or "")
9594
return "|".join(
9695
[
9796
"message_id",
98-
platform_id,
99-
unified_msg_origin,
100-
message_id,
97+
str(platform_id or ""),
98+
str(unified_msg_origin or ""),
99+
normalized_message_id,
101100
]
102101
)

astrbot/core/platform/sources/qqofficial/qqofficial_platform_adapter.py

Lines changed: 14 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -150,16 +150,11 @@ class botClient(Client):
150150
def set_platform(self, platform: QQOfficialPlatformAdapter) -> None:
151151
self.platform = platform
152152

153-
async def _should_drop_message(self, message) -> bool:
154-
sender_id = _extract_sender_id(message)
155-
content = getattr(message, "content", "") or ""
156-
return await self.platform._is_duplicate_message(message.id, content, sender_id)
157-
158153
# 收到群消息
159154
async def on_group_at_message_create(
160155
self, message: botpy.message.GroupMessage
161156
) -> None:
162-
if await self._should_drop_message(message):
157+
if not await self.platform.should_handle_raw_message(message):
163158
return
164159
abm = QQOfficialPlatformAdapter._parse_from_qqofficial(
165160
message,
@@ -172,7 +167,7 @@ async def on_group_at_message_create(
172167

173168
# 收到频道消息
174169
async def on_at_message_create(self, message: botpy.message.Message) -> None:
175-
if await self._should_drop_message(message):
170+
if not await self.platform.should_handle_raw_message(message):
176171
return
177172
abm = QQOfficialPlatformAdapter._parse_from_qqofficial(
178173
message,
@@ -187,7 +182,7 @@ async def on_at_message_create(self, message: botpy.message.Message) -> None:
187182
async def on_direct_message_create(
188183
self, message: botpy.message.DirectMessage
189184
) -> None:
190-
if await self._should_drop_message(message):
185+
if not await self.platform.should_handle_raw_message(message):
191186
return
192187
abm = QQOfficialPlatformAdapter._parse_from_qqofficial(
193188
message,
@@ -199,7 +194,7 @@ async def on_direct_message_create(
199194

200195
# 收到 C2C 消息
201196
async def on_c2c_message_create(self, message: botpy.message.C2CMessage) -> None:
202-
if await self._should_drop_message(message):
197+
if not await self.platform.should_handle_raw_message(message):
203198
return
204199
abm = QQOfficialPlatformAdapter._parse_from_qqofficial(
205200
message,
@@ -280,10 +275,16 @@ def __init__(
280275
cleanup_interval_seconds=cleanup_interval_seconds,
281276
)
282277

283-
async def _is_duplicate_message(
284-
self, message_id: str, content: str = "", sender_id: str = ""
285-
) -> bool:
286-
return await self._deduplicator.is_duplicate(message_id, content, sender_id)
278+
async def should_handle_raw_message(self, message: Any) -> bool:
279+
"""Return False if the raw incoming message should be dropped."""
280+
sender_id = _extract_sender_id(message)
281+
content = getattr(message, "content", "") or ""
282+
is_duplicate = await self._deduplicator.is_duplicate(
283+
message.id,
284+
content,
285+
sender_id,
286+
)
287+
return not is_duplicate
287288

288289
async def send_by_session(
289290
self,

astrbot/core/utils/ttl_registry.py

Lines changed: 7 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -5,7 +5,7 @@
55
"""
66

77
import time
8-
from typing import Hashable, Sequence
8+
from collections.abc import Hashable, Sequence
99

1010

1111
class TTLKeyRegistry:
@@ -15,6 +15,12 @@ class TTLKeyRegistry:
1515
deduplication scenarios where old entries should be automatically cleaned up.
1616
Supports optional cleanup interval throttling to avoid per-access full scans.
1717
18+
Concurrency note:
19+
This class is not thread-safe and does not provide internal locking.
20+
It is designed for single-consumer/single-thread usage patterns.
21+
If shared across concurrent tasks/threads, callers must provide
22+
external synchronization.
23+
1824
Example:
1925
registry = TTLKeyRegistry(ttl_seconds=0.5)
2026
if registry.seen("some_key"):

0 commit comments

Comments
 (0)