-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsend_message.py
More file actions
270 lines (232 loc) · 9.27 KB
/
send_message.py
File metadata and controls
270 lines (232 loc) · 9.27 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
# Copyright 2025 AptS-1547
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
OneBot V11 客户端
本模块提供了一个 OneBot V11 客户端的实现,支持通过 WebSocket 和 HTTP 发送消息。
作者:AptS:1547
版本:0.1.0-alpha
日期:2025-04-17
本程序遵循 Apache License 2.0 许可证
"""
import json
import logging
from typing import Union, List, Dict, Any
import aiohttp
logger = logging.getLogger(__name__)
class WSConnectionException(Exception):
"""WebSocket 连接异常"""
class OnebotClient: # pylint: disable=too-few-public-methods
"""OneBot 客户端基类"""
def __init__(self, onebot_url: str, access_token: str = ""):
"""
初始化 OneBot 客户端
参数:
onebot_url: OneBot 实现的 URL 地址
access_token: 鉴权 token,如果有的话
"""
self.onebot_url = onebot_url
self.access_token = access_token
async def send_message(
self,
onebot_type: str,
onebot_id: int,
message: Union[str, List[Dict[str, Any]]],
auto_escape: bool = False
) -> Dict[str, Any]:
"""
发送消息的抽象方法,子类需要实现
参数:
onebot_type: 消息类型,例如 "group" 或 "private"
onebot_id: 消息接收者的 ID
message: 要发送的消息,可以是字符串或消息段列表
auto_escape: 是否转义 CQ 码,默认为 False
返回:
API 响应
"""
raise NotImplementedError("send_message 方法需要在子类中实现")
class OneBotWebSocketClient(OnebotClient): # pylint: disable=too-few-public-methods
"""基于 aiohttp 的 OneBot V11 WebSocket 客户端,仅用于发送消息"""
async def send_message(
self,
onebot_type: str,
onebot_id: int,
message: Union[str, List[Dict[str, Any]]],
auto_escape: bool = False
) -> Dict[str, Any]:
"""
发送群消息
参数:
user_id: 发送消息的用户 ID
onebot_id: 发送消息的群 ID
message: 要发送的消息,可以是字符串或消息段列表
auto_escape: 是否转义 CQ 码,默认为 False
返回:
API 响应
"""
request = {
"action": "send_msg",
"params": {
"message_type": onebot_type,
"message": message,
"auto_escape": auto_escape
},
"echo": "The_ESAP_Project_Github_Notification"
}
if onebot_type == "group":
request["params"]["group_id"] = onebot_id
elif onebot_type == "private":
request["params"]["user_id"] = onebot_id
# 准备 headers
headers = {}
if self.access_token:
headers["Authorization"] = f"Bearer {self.access_token}"
try:
# 使用 aiohttp 的 ClientSession
async with aiohttp.ClientSession() as session:
# 创建 WebSocket 连接
async with session.ws_connect(
self.onebot_url,
headers=headers
) as ws:
# 发送请求
await ws.send_str(json.dumps(request))
# 接收响应
response = await ws.receive()
# 处理消息类型
if response.type == aiohttp.WSMsgType.TEXT:
return json.loads(response.data)
if response.type == aiohttp.WSMsgType.CLOSED:
logger.debug("WebSocket 连接已关闭")
return {"status": "ok", "retcode": 0, "data": {"message_id": -1}}
if response.type == aiohttp.WSMsgType.ERROR:
logger.error("WebSocket 连接错误: %s", ws.exception())
raise WSConnectionException("WebSocket 连接错误")
logger.warning("收到未知类型的消息: %s", response.type)
return {"status": "error", "retcode": -1, "message": f"未知响应类型: {response.type}"} # pylint: disable=line-too-long
except aiohttp.ClientError as e:
logger.error("aiohttp 客户端错误: %s", e)
raise
except Exception as e:
logger.error("发送群消息时出错: %s", e)
raise
class OneBotHTTPClient(OnebotClient): # pylint: disable=too-few-public-methods
"""基于 aiohttp 的 OneBot V11 HTTP 客户端,仅用于发送消息"""
async def send_message(
self,
onebot_type: str,
onebot_id: int,
message: Union[str, List[Dict[str, Any]]],
auto_escape: bool = False
) -> Dict[str, Any]:
"""
发送群消息
参数:
user_id: 发送消息的用户 ID
onebot_id: 发送消息的群 ID
message: 要发送的消息,可以是字符串或消息段列表
auto_escape: 是否转义 CQ 码,默认为 False
返回:
API 响应
"""
request = {
"message_type": onebot_type,
"message": message,
"auto_escape": auto_escape
}
if onebot_type == "group":
request["group_id"] = onebot_id
elif onebot_type == "private":
request["user_id"] = onebot_id
# 准备 headers
headers = {}
if self.access_token:
headers["Authorization"] = f"Bearer {self.access_token}"
headers["Content-Type"] = "application/json"
try:
# 使用 aiohttp 的 ClientSession
async with aiohttp.ClientSession() as session:
# 发送请求
async with session.post(
self.onebot_url + "/send_msg",
json=request,
headers=headers
) as response:
# 检查响应状态码
if response.status != 200:
logger.error("HTTP 请求失败,状态码: %s", response.status)
return {"status": "error", "retcode": -1, "message": f"HTTP 错误: {response.status}"} # pylint: disable=line-too-long
# 解析 JSON 响应
data = await response.json()
return data
except aiohttp.ClientError as e:
logger.error("aiohttp 客户端错误: %s", e)
raise
except Exception as e:
logger.error("发送群消息时出错: %s", e)
raise
# 消息段工具函数
def text(content: str) -> Dict[str, Any]:
"""纯文本消息"""
return {"type": "text", "data": {"text": content}}
def at(qq: int) -> Dict[str, Any]:
"""@某人"""
return {"type": "at", "data": {"qq": str(qq)}}
def image(file: str) -> Dict[str, Any]:
"""图片消息"""
return {"type": "image", "data": {"file": file}}
async def send_github_notification( # pylint: disable=too-many-arguments,too-many-locals,too-many-positional-arguments
onebot_type: str,
onebot_url: str,
access_token: str,
repo_name: str,
branch: str,
pusher: str,
commit_count: int,
commits: List[Dict],
onebot_send_type: str = "group",
onebot_id: int = 0
):
"""发送 GitHub 推送通知"""
if onebot_type == "ws":
client = OneBotWebSocketClient(onebot_url, access_token)
elif onebot_type == "http":
client = OneBotHTTPClient(onebot_url, access_token)
else:
logger.error("不支持的 OneBot 连接类型: %s", onebot_type)
return None
# 构建消息
message = [
text("📢 GitHub 推送通知\n"),
text(f"仓库:{repo_name}\n"),
text(f"分支:{branch}\n"),
text(f"推送者:{pusher}\n"),
text(f"提交数量:{commit_count}\n\n")
]
# 添加最近的提交信息(最多3条)
for i, commit in enumerate(commits[:3]):
commit_id = commit.get("id", "")[:7]
commit_msg = commit.get("message", "").split("\n")[0] # 只取第一行
author = commit.get("author", {}).get("name", "")
message.append(text(f"[{i+1}] {commit_id} by {author}\n"))
message.append(text(f" {commit_msg}\n"))
# 发送消息
try:
if onebot_send_type in ["group", "private"]:
result = await client.send_message(onebot_send_type, onebot_id, message)
else:
logger.error("不支持的 OneBot 类型: %s", onebot_send_type)
return None
return result
except Exception as e: # pylint: disable=broad-except
logger.error("发送 GitHub 通知失败: %s", e)
return None