Skip to content

Commit cfd6040

Browse files
committed
支持多渠道消息推送
1 parent 4859d3d commit cfd6040

8 files changed

Lines changed: 1070 additions & 104 deletions

File tree

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -447,7 +447,7 @@ A: 在 `notifier.py` 中添加新的发送方法,并在 `models.py` 中添加
447447
- [x] 移动端响应式适配与触摸优化
448448
- [x] 拖拽调整任务时间(日历视图)
449449
- [x] 日历订阅及同步功能
450-
- [ ] 通知群组/广播,允许将多个渠道打包成一个群组,实现一次任务多渠道分发
450+
- [x] 支持多渠道消息推送
451451
- [ ] 失败自动重试
452452
- [ ] 重复任务暂停功能
453453
- [ ] 数据导入/导出

app.py

Lines changed: 90 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -123,17 +123,29 @@ def create_task():
123123
"""
124124
创建通知任务
125125
126-
请求体示例:
126+
支持单渠道和多渠道两种模式:
127+
128+
单渠道模式 (向后兼容):
127129
{
128130
"title": "测试通知",
129131
"content": "这是一条测试通知",
130132
"channel": "wecom_webhook",
133+
"channel_config": {"webhook_url": "..."},
131134
"scheduled_time": "2024-12-01T10:00:00",
132-
"channel_config": {
133-
"webhook_url": "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=xxx"
135+
"is_recurring": false
136+
}
137+
138+
多渠道模式:
139+
{
140+
"title": "测试通知",
141+
"content": "这是一条测试通知",
142+
"channels": ["wecom_webhook", "pushplus"],
143+
"channels_config": {
144+
"wecom_webhook": {"webhook_url": "..."},
145+
"pushplus": {"token": "..."}
134146
},
135-
"is_recurring": false,
136-
"cron_expression": null
147+
"scheduled_time": "2024-12-01T10:00:00",
148+
"is_recurring": false
137149
}
138150
"""
139151
try:
@@ -143,11 +155,20 @@ def create_task():
143155
is_recurring = bool(data.get('is_recurring', False))
144156
cron_expression = data.get('cron_expression')
145157

158+
# 检测是多渠道模式还是单渠道模式
159+
is_multi_channel = 'channels' in data
160+
146161
# 验证必填字段
147-
required_fields = ['title', 'content', 'channel', 'channel_config']
162+
required_fields = ['title', 'content']
163+
if is_multi_channel:
164+
required_fields.extend(['channels', 'channels_config'])
165+
else:
166+
required_fields.extend(['channel', 'channel_config'])
167+
148168
# 非重复任务必须提供 scheduled_time
149169
if not is_recurring:
150170
required_fields.append('scheduled_time')
171+
151172
for field in required_fields:
152173
if field not in data:
153174
return jsonify({'error': f'缺少必填字段: {field}'}), 400
@@ -172,25 +193,48 @@ def create_task():
172193
except ValueError:
173194
return jsonify({'error': '时间格式错误,请使用 ISO 格式,如: 2024-12-01T10:00:00'}), 400
174195

175-
# 验证通知渠道
176-
try:
177-
channel = NotifyChannel(data['channel'])
178-
except ValueError:
179-
valid_channels = [c.value for c in NotifyChannel]
180-
return jsonify({'error': f'无效的通知渠道,支持的渠道: {valid_channels}'}), 400
181-
182196
# 创建任务
183197
with get_db() as db:
184198
task = NotifyTask(
185199
user_id=request.current_user.id,
186200
title=data['title'],
187201
content=data['content'],
188-
channel=channel,
189202
scheduled_time=scheduled_time,
190-
channel_config=json.dumps(data['channel_config'], ensure_ascii=False),
191203
is_recurring=is_recurring,
192204
cron_expression=cron_expression if is_recurring else None
193205
)
206+
207+
if is_multi_channel:
208+
# 多渠道模式
209+
channels = data['channels']
210+
channels_config = data['channels_config']
211+
212+
# 验证所有渠道类型
213+
if not isinstance(channels, list) or len(channels) == 0:
214+
return jsonify({'error': 'channels 必须是非空数组'}), 400
215+
216+
valid_channels = [c.value for c in NotifyChannel]
217+
for ch in channels:
218+
if ch not in valid_channels:
219+
return jsonify({'error': f'无效的通知渠道: {ch},支持的渠道: {valid_channels}'}), 400
220+
221+
# 验证每个渠道都有配置
222+
for ch in channels:
223+
if ch not in channels_config:
224+
return jsonify({'error': f'渠道 {ch} 缺少配置信息'}), 400
225+
226+
task.channels_json = json.dumps(channels, ensure_ascii=False)
227+
task.channels_config_json = json.dumps(channels_config, ensure_ascii=False)
228+
else:
229+
# 单渠道模式(向后兼容)
230+
try:
231+
channel = NotifyChannel(data['channel'])
232+
except ValueError:
233+
valid_channels = [c.value for c in NotifyChannel]
234+
return jsonify({'error': f'无效的通知渠道,支持的渠道: {valid_channels}'}), 400
235+
236+
task.channel = channel
237+
task.channel_config = json.dumps(data['channel_config'], ensure_ascii=False)
194238

195239
db.add(task)
196240
db.commit()
@@ -322,8 +366,9 @@ def update_task(task_id):
322366
"""
323367
更新任务
324368
325-
可更新字段: title, content, scheduled_time, channel_config, status
369+
可更新字段: title, content, scheduled_time, channel_config, channels_config, status
326370
支持重新启用已取消或已执行的任务,以及暂停/恢复重复任务
371+
支持在单渠道和多渠道模式间切换
327372
"""
328373
try:
329374
data = request.get_json()
@@ -386,12 +431,37 @@ def update_task(task_id):
386431
except Exception as e:
387432
return jsonify({'error': f'状态更新失败: {str(e)}'}), 400
388433

389-
# 更新字段
434+
# 更新基本字段
390435
if 'title' in data:
391436
task.title = data['title']
392437
if 'content' in data:
393438
task.content = data['content']
394-
if 'channel_config' in data:
439+
440+
# 处理渠道配置更新(支持单渠道和多渠道模式)
441+
if 'channels' in data and 'channels_config' in data:
442+
# 更新为多渠道模式
443+
channels = data['channels']
444+
channels_config = data['channels_config']
445+
446+
if not isinstance(channels, list) or len(channels) == 0:
447+
return jsonify({'error': 'channels 必须是非空数组'}), 400
448+
449+
valid_channels = [c.value for c in NotifyChannel]
450+
for ch in channels:
451+
if ch not in valid_channels:
452+
return jsonify({'error': f'无效的通知渠道: {ch}'}), 400
453+
454+
for ch in channels:
455+
if ch not in channels_config:
456+
return jsonify({'error': f'渠道 {ch} 缺少配置信息'}), 400
457+
458+
task.channels_json = json.dumps(channels, ensure_ascii=False)
459+
task.channels_config_json = json.dumps(channels_config, ensure_ascii=False)
460+
# 清空单渠道字段
461+
task.channel = None
462+
task.channel_config = None
463+
elif 'channel_config' in data:
464+
# 更新单渠道模式的配置
395465
task.channel_config = json.dumps(data['channel_config'], ensure_ascii=False)
396466

397467
# 处理时间更新
@@ -420,6 +490,8 @@ def update_task(task_id):
420490
task.status = NotifyStatus.PENDING
421491
task.sent_time = None
422492
task.error_msg = None
493+
# 清空多渠道发送结果
494+
task.send_results = None
423495

424496
db.commit()
425497

models.py

Lines changed: 102 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -167,11 +167,16 @@ class NotifyTask(Base):
167167
user_id = Column(Integer, ForeignKey('users.id'), nullable=False, comment="用户ID")
168168
title = Column(String(200), nullable=False, comment="通知标题")
169169
content = Column(Text, nullable=False, comment="通知内容")
170-
channel = Column(Enum(NotifyChannel, values_callable=lambda obj: [e.value for e in NotifyChannel]), nullable=False, comment="通知渠道")
170+
channel = Column(Enum(NotifyChannel, values_callable=lambda obj: [e.value for e in NotifyChannel]), nullable=True, comment="通知渠道(单渠道模式)")
171171
scheduled_time = Column(DateTime, nullable=False, comment="计划发送时间")
172172

173173
# 渠道配置(JSON格式字符串)
174-
channel_config = Column(Text, nullable=False, comment="渠道配置信息")
174+
channel_config = Column(Text, nullable=True, comment="渠道配置信息(单渠道模式)")
175+
176+
# 多渠道支持
177+
channels_json = Column(Text, nullable=True, comment="通知渠道数组(JSON格式,多渠道模式)")
178+
channels_config_json = Column(Text, nullable=True, comment="渠道配置映射(JSON格式,多渠道模式)")
179+
send_results = Column(Text, nullable=True, comment="各渠道发送结果(JSON格式)")
175180

176181
# 状态相关
177182
status = Column(Enum(NotifyStatus, values_callable=lambda obj: [e.value for e in NotifyStatus]), default=NotifyStatus.PENDING, comment="发送状态")
@@ -201,12 +206,28 @@ def to_dict(self):
201206
channel_config = ast.literal_eval(self.channel_config) if self.channel_config else {}
202207
except:
203208
channel_config = {}
209+
210+
# 安全解析多渠道字段
211+
try:
212+
channels = json.loads(self.channels_json) if self.channels_json else None
213+
except (json.JSONDecodeError, TypeError):
214+
channels = None
215+
216+
try:
217+
channels_config = json.loads(self.channels_config_json) if self.channels_config_json else None
218+
except (json.JSONDecodeError, TypeError):
219+
channels_config = None
220+
221+
try:
222+
send_results = json.loads(self.send_results) if self.send_results else None
223+
except (json.JSONDecodeError, TypeError):
224+
send_results = None
204225

205-
return {
226+
result = {
206227
'id': self.id,
207228
'title': self.title,
208229
'content': self.content,
209-
'channel': self.channel.value,
230+
'channel': self.channel.value if self.channel else None,
210231
'scheduled_time': self.scheduled_time.isoformat() if self.scheduled_time else None,
211232
'status': self.status.value,
212233
'sent_time': self.sent_time.isoformat() if self.sent_time else None,
@@ -217,6 +238,14 @@ def to_dict(self):
217238
'channel_config': channel_config,
218239
'external_uid': self.external_uid
219240
}
241+
242+
# 添加多渠道字段(如果存在)
243+
if channels:
244+
result['channels'] = channels
245+
result['channels_config'] = channels_config
246+
result['send_results'] = send_results
247+
248+
return result
220249

221250

222251
# 数据库配置
@@ -240,13 +269,82 @@ def init_db():
240269
except Exception:
241270
print("Migrating: Adding calendar_token to users table...")
242271
conn.execute(text("ALTER TABLE users ADD COLUMN calendar_token VARCHAR(64)"))
272+
conn.commit()
243273

244274
# 2. 检查 notify_tasks.external_uid
245275
try:
246276
conn.execute(text("SELECT external_uid FROM notify_tasks LIMIT 1"))
247277
except Exception:
248278
print("Migrating: Adding external_uid to notify_tasks table...")
249279
conn.execute(text("ALTER TABLE notify_tasks ADD COLUMN external_uid VARCHAR(255)"))
280+
conn.commit()
281+
282+
# 3. 检查 notify_tasks.channels_json(多渠道支持)
283+
try:
284+
conn.execute(text("SELECT channels_json FROM notify_tasks LIMIT 1"))
285+
except Exception:
286+
print("Migrating: Adding multi-channel support fields to notify_tasks table...")
287+
conn.execute(text("ALTER TABLE notify_tasks ADD COLUMN channels_json TEXT"))
288+
conn.execute(text("ALTER TABLE notify_tasks ADD COLUMN channels_config_json TEXT"))
289+
conn.execute(text("ALTER TABLE notify_tasks ADD COLUMN send_results TEXT"))
290+
conn.commit()
291+
292+
# 4. 移除 channel 和 channel_config 的 NOT NULL 约束(多渠道模式需要)
293+
# SQLite 不支持直接修改列约束,需要重建表
294+
try:
295+
# 检查是否已经迁移(通过尝试插入 channel=NULL 的记录)
296+
result = conn.execute(text("SELECT sql FROM sqlite_master WHERE type='table' AND name='notify_tasks'"))
297+
table_schema = result.fetchone()[0]
298+
299+
# 如果表结构中 channel 字段仍有 NOT NULL 约束
300+
if 'channel' in table_schema and 'channel TEXT NOT NULL' in table_schema:
301+
print("Migrating: Removing NOT NULL constraint from channel and channel_config...")
302+
303+
# 创建临时表
304+
conn.execute(text("""
305+
CREATE TABLE notify_tasks_new (
306+
id INTEGER PRIMARY KEY AUTOINCREMENT,
307+
user_id INTEGER NOT NULL,
308+
title VARCHAR(200) NOT NULL,
309+
content TEXT NOT NULL,
310+
channel TEXT,
311+
scheduled_time DATETIME NOT NULL,
312+
channel_config TEXT,
313+
channels_json TEXT,
314+
channels_config_json TEXT,
315+
send_results TEXT,
316+
status TEXT DEFAULT 'pending',
317+
sent_time DATETIME,
318+
error_msg TEXT,
319+
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
320+
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP,
321+
is_recurring BOOLEAN DEFAULT 0,
322+
cron_expression VARCHAR(100),
323+
external_uid VARCHAR(255),
324+
FOREIGN KEY (user_id) REFERENCES users(id)
325+
)
326+
"""))
327+
328+
# 复制数据
329+
conn.execute(text("""
330+
INSERT INTO notify_tasks_new
331+
SELECT id, user_id, title, content, channel, scheduled_time,
332+
channel_config, channels_json, channels_config_json, send_results,
333+
status, sent_time, error_msg, created_at, updated_at,
334+
is_recurring, cron_expression, external_uid
335+
FROM notify_tasks
336+
"""))
337+
338+
# 删除旧表
339+
conn.execute(text("DROP TABLE notify_tasks"))
340+
341+
# 重命名新表
342+
conn.execute(text("ALTER TABLE notify_tasks_new RENAME TO notify_tasks"))
343+
344+
conn.commit()
345+
print("Migration completed: channel and channel_config are now nullable")
346+
except Exception as e:
347+
print(f"Channel nullable migration info: {e}")
250348
except Exception as e:
251349
print(f"Migration warning: {e}")
252350

0 commit comments

Comments
 (0)