55from models import NotifyTask , NotifyStatus , get_db
66from notifier import NotificationSender , parse_config
77import logging
8+ import queue
89
910logging .basicConfig (level = logging .INFO )
1011logger = logging .getLogger (__name__ )
1112
1213
14+ class EventManager :
15+ def __init__ (self ):
16+ # listeners: list of (queue, user_id)
17+ self .listeners = []
18+
19+ def listen (self , user_id ):
20+ q = queue .Queue (maxsize = 10 )
21+ self .listeners .append ((q , user_id ))
22+ return q
23+
24+ def announce (self , user_id , msg ):
25+ for i in range (len (self .listeners ) - 1 , - 1 , - 1 ):
26+ q , uid = self .listeners [i ]
27+ if uid == user_id :
28+ try :
29+ q .put_nowait (msg )
30+ except queue .Full :
31+ del self .listeners [i ]
32+
33+ event_manager = EventManager ()
34+
35+
36+ def get_cron_trigger (expression ):
37+ """根据 cron 表达式获取触发器,支持 5 位 (分时日月周) 和 6 位 (秒分时日月周)"""
38+ values = expression .strip ().split ()
39+ if len (values ) == 6 :
40+ return CronTrigger (
41+ second = values [0 ],
42+ minute = values [1 ],
43+ hour = values [2 ],
44+ day = values [3 ],
45+ month = values [4 ],
46+ day_of_week = values [5 ]
47+ )
48+ return CronTrigger .from_crontab (expression )
49+
50+
1351class NotifyScheduler :
1452 """通知调度器"""
1553
@@ -27,23 +65,36 @@ def add_task(self, task: NotifyTask):
2765 """
2866 if task .is_recurring and task .cron_expression :
2967 # 重复任务,使用 cron 表达式
30- trigger = CronTrigger .from_crontab (task .cron_expression )
31- job_id = f"recurring_task_{ task .id } "
68+ try :
69+ trigger = get_cron_trigger (task .cron_expression )
70+ job_id = f"recurring_task_{ task .id } "
71+
72+ self .scheduler .add_job (
73+ func = self ._execute_task ,
74+ trigger = trigger ,
75+ args = [task .id ],
76+ id = job_id ,
77+ replace_existing = True ,
78+ misfire_grace_time = 60 # 错过时间窗口60秒内仍执行
79+ )
80+ logger .info (f"任务 { task .id } 已添加到调度器,计划执行时间: { task .scheduled_time } " )
81+ except Exception as e :
82+ logger .error (f"添加任务 { task .id } 失败,Cron 表达式无效: { e } " )
3283 else :
3384 # 一次性任务,使用指定时间
3485 trigger = DateTrigger (run_date = task .scheduled_time )
3586 job_id = f"task_{ task .id } "
3687
37- self .scheduler .add_job (
38- func = self ._execute_task ,
39- trigger = trigger ,
40- args = [task .id ],
41- id = job_id ,
42- replace_existing = True ,
43- misfire_grace_time = 60 # 错过时间窗口60秒内仍执行
44- )
45-
46- logger .info (f"任务 { task .id } 已添加到调度器,计划执行时间: { task .scheduled_time } " )
88+ self .scheduler .add_job (
89+ func = self ._execute_task ,
90+ trigger = trigger ,
91+ args = [task .id ],
92+ id = job_id ,
93+ replace_existing = True ,
94+ misfire_grace_time = 60 # 错过时间窗口60秒内仍执行
95+ )
96+
97+ logger .info (f"任务 { task .id } 已添加到调度器,计划执行时间: { task .scheduled_time } " )
4798
4899 def remove_task (self , task_id : int , is_recurring : bool = False ):
49100 """
@@ -108,8 +159,7 @@ def _execute_task(self, task_id: int):
108159 # 关键:重复任务执行成功后,滚动更新下一次执行时间(用于列表展示)
109160 if task .is_recurring and task .cron_expression :
110161 try :
111- from apscheduler .triggers .cron import CronTrigger
112- trigger = CronTrigger .from_crontab (task .cron_expression )
162+ trigger = get_cron_trigger (task .cron_expression )
113163 # 以"本次实际执行时间"为基准,计算下一次
114164 base_time = datetime .now ()
115165 next_run = trigger .get_next_fire_time (None , base_time )
@@ -120,12 +170,30 @@ def _execute_task(self, task_id: int):
120170 logger .warning (f"任务 { task_id } 更新下一次执行时间失败: { str (e )} " )
121171
122172 logger .info (f"任务 { task_id } 执行成功" )
173+
174+ # 通知前端
175+ event_manager .announce (task .user_id , {
176+ 'type' : 'task_executed' ,
177+ 'task_id' : task .id ,
178+ 'title' : task .title ,
179+ 'status' : 'sent' ,
180+ 'message' : '发送成功'
181+ })
123182
124183 except Exception as e :
125184 # 更新任务状态为失败
126185 task .status = NotifyStatus .FAILED
127186 task .error_msg = str (e )
128187 logger .error (f"任务 { task_id } 执行失败: { str (e )} " )
188+
189+ # 通知前端
190+ event_manager .announce (task .user_id , {
191+ 'type' : 'task_executed' ,
192+ 'task_id' : task .id ,
193+ 'title' : task .title ,
194+ 'status' : 'failed' ,
195+ 'message' : str (e )
196+ })
129197
130198 db .commit ()
131199
@@ -155,7 +223,7 @@ def load_pending_tasks(self):
155223 # 如果是重复任务且计划时间已过,重新计算下一次执行时间
156224 if task .is_recurring and task .cron_expression and task .scheduled_time < datetime .now ():
157225 try :
158- trigger = CronTrigger . from_crontab (task .cron_expression )
226+ trigger = get_cron_trigger (task .cron_expression )
159227 next_run = trigger .get_next_fire_time (None , datetime .now ())
160228 if next_run :
161229 task .scheduled_time = next_run
0 commit comments