-
Notifications
You must be signed in to change notification settings - Fork 32
Expand file tree
/
Copy pathmain.py
More file actions
223 lines (184 loc) · 7.93 KB
/
main.py
File metadata and controls
223 lines (184 loc) · 7.93 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
# main.py
import asyncio
import re
from astrbot.api import logger
from astrbot.api.event import filter
from astrbot.api.star import Context, Star
from astrbot.core import AstrBotConfig
from astrbot.core.message.components import At, Image, Json
from astrbot.core.platform.astr_message_event import AstrMessageEvent
from astrbot.core.platform.sources.aiocqhttp.aiocqhttp_message_event import (
AiocqhttpMessageEvent,
)
from .core.arbiter import ArbiterContext, EmojiLikeArbiter
from .core.clean import CacheCleaner
from .core.config import PluginConfig
from .core.debounce import Debouncer
from .core.download import Downloader
from .core.parsers import BaseParser, BilibiliParser
from .core.render import Renderer
from .core.sender import MessageSender
from .core.utils import extract_json_url
class ParserPlugin(Star):
def __init__(self, context: Context, config: AstrBotConfig):
super().__init__(context)
self.cfg = PluginConfig(config, context=context)
# 渲染器
self.renderer = Renderer(self.cfg)
# 下载器
self.downloader = Downloader(self.cfg)
# 防抖器
self.debouncer = Debouncer(self.cfg)
# 仲裁器
self.arbiter = EmojiLikeArbiter()
# 消息发送器
self.sender = MessageSender(self.cfg, self.renderer)
# 缓存清理器
self.cleaner = CacheCleaner(self.cfg)
# 关键词 -> Parser 映射
self.parser_map: dict[str, BaseParser] = {}
# 关键词 -> 正则 列表
self.key_pattern_list: list[tuple[str, re.Pattern[str]]] = []
async def initialize(self):
"""加载、重载插件时触发"""
# 加载渲染器资源
await asyncio.to_thread(Renderer.load_resources)
# 注册解析器
self._register_parser()
async def terminate(self):
"""插件卸载时触发"""
# 关下载器里的会话
await self.downloader.close()
# 关所有解析器里的会话 (去重后的实例)
unique_parsers = set(self.parser_map.values())
for parser in unique_parsers:
await parser.close_session()
# 关缓存清理器
await self.cleaner.stop()
def _register_parser(self):
"""注册解析器(以 parser.enable 为唯一启用来源)"""
# 所有 Parser 子类
all_subclass = BaseParser.get_all_subclass()
enabled_platforms = set(self.cfg.parser.enabled_platforms())
enabled_classes: list[type[BaseParser]] = []
enabled_names: list[str] = []
for cls in all_subclass:
platform_name = cls.platform.name
if platform_name not in enabled_platforms:
logger.debug(f"[parser] 平台未启用或未配置: {platform_name}")
continue
enabled_classes.append(cls)
enabled_names.append(platform_name)
# 一个平台一个 parser 实例
parser = cls(self.cfg, self.downloader)
# 关键词 → parser
for keyword, _ in cls._key_patterns:
self.parser_map[keyword] = parser
logger.debug(f"启用平台: {'、'.join(enabled_names) if enabled_names else '无'}")
# -------- 关键词-正则表(统一生成) --------
patterns: list[tuple[str, re.Pattern[str]]] = []
for cls in enabled_classes:
for kw, pat in cls._key_patterns:
patterns.append((kw, re.compile(pat) if isinstance(pat, str) else pat))
# 长关键词优先,避免短词抢匹配
patterns.sort(key=lambda x: -len(x[0]))
self.key_pattern_list = patterns
logger.debug(f"[parser] 关键词-正则对已生成: {[kw for kw, _ in patterns]}")
def _get_parser_by_type(self, parser_type):
for parser in self.parser_map.values():
if isinstance(parser, parser_type):
return parser
raise ValueError(f"未找到类型为 {parser_type} 的 parser 实例")
@filter.event_message_type(filter.EventMessageType.ALL)
async def on_message(self, event: AstrMessageEvent):
"""消息的统一入口"""
umo = event.unified_msg_origin
# 白名单
if self.cfg.whitelist and umo not in self.cfg.whitelist:
return
# 黑名单
if self.cfg.blacklist and umo in self.cfg.blacklist:
return
# 消息链
chain = event.get_messages()
if not chain:
return
seg1 = chain[0]
text = event.message_str
# 卡片解析:解析Json组件,提取URL
if isinstance(seg1, Json):
text = extract_json_url(seg1.data)
logger.debug(f"解析Json组件: {text}")
if not text:
return
self_id = event.get_self_id()
# 指定机制:专门@其他bot的消息不解析
if isinstance(seg1, At) and str(seg1.qq) != self_id:
return
# 核心匹配逻辑 :关键词 + 正则双重判定,汇集了所有解析器的正则对。
keyword: str = ""
searched: re.Match[str] | None = None
for kw, pat in self.key_pattern_list:
if kw not in text:
continue
if m := pat.search(text):
keyword, searched = kw, m
break
if searched is None:
return
logger.debug(f"匹配结果: {keyword}, {searched}")
# 仲裁机制
if isinstance(event, AiocqhttpMessageEvent) and not event.is_private_chat():
raw = event.message_obj.raw_message
if not isinstance(raw, dict):
logger.warning(f"Unexpected raw_message type: {type(raw)}")
return
is_win = await self.arbiter.compete(
bot=event.bot,
ctx=ArbiterContext(
message_id=int(raw["message_id"]),
msg_time=int(raw["time"]),
self_id=int(raw["self_id"]),
),
)
if not is_win:
logger.debug("Bot在仲裁中输了, 跳过解析")
return
logger.debug("Bot在仲裁中胜出, 准备解析...")
# 基于link防抖
link = searched.group(0)
if self.debouncer.hit_link(umo, link):
logger.warning(f"[链接防抖] 链接 {link} 在防抖时间内,跳过解析")
return
# 解析
parse_res = await self.parser_map[keyword].parse(keyword, searched)
# 基于资源ID防抖
resource_id = parse_res.get_resource_id()
if self.debouncer.hit_resource(umo, resource_id):
logger.warning(f"[资源防抖] 资源 {resource_id} 在防抖时间内,跳过发送")
return
# 发送
await self.sender.send_parse_result(event, parse_res)
@filter.permission_type(filter.PermissionType.ADMIN)
@filter.command("开启解析")
async def open_parser(self, event: AstrMessageEvent):
"""开启当前会话的解析"""
umo = event.unified_msg_origin
self.cfg.remove_blacklist(umo)
yield event.plain_result("当前会话的解析已开启")
@filter.permission_type(filter.PermissionType.ADMIN)
@filter.command("关闭解析")
async def close_parser(self, event: AstrMessageEvent):
"""关闭当前会话的解析"""
umo = event.unified_msg_origin
self.cfg.add_blacklist(umo)
yield event.plain_result("当前会话的解析已关闭")
@filter.permission_type(filter.PermissionType.ADMIN)
@filter.command("登录B站", alias={"blogin", "登录b站"})
async def login_bilibili(self, event: AstrMessageEvent):
"""扫码登录B站"""
parser: BilibiliParser = self._get_parser_by_type(BilibiliParser) # type: ignore
qrcode = await parser.login.login_with_qrcode()
yield event.chain_result([Image.fromBytes(qrcode)])
async for msg in parser.login.check_qr_state():
yield event.plain_result(msg)