Skip to content

Commit 38ab9b8

Browse files
committed
feat: implement scheduling functionality in ScheduledTrigger and enhance trigger deployment logic
1 parent 73f0d94 commit 38ab9b8

File tree

3 files changed

+260
-12
lines changed

3 files changed

+260
-12
lines changed
Lines changed: 235 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,200 @@
11
# coding=utf-8
2-
"""
3-
@project: MaxKB
4-
@Author:虎虎
5-
@file: scheduled_trigger.py
6-
@date:2026/1/14 18:57
7-
@desc:
8-
"""
2+
from __future__ import annotations
3+
4+
import random
5+
6+
from django.db.models import QuerySet
7+
8+
from common.utils.lock import RedisLock
9+
from common.utils.logger import maxkb_logger
10+
from ops import celery_app
911
from trigger.handler.base_trigger import BaseTrigger
12+
from trigger.models import TriggerTask
13+
14+
15+
def _parse_hhmm(value: str) -> tuple[int, int]:
16+
hour_str, minute_str = (value or "").split(":")
17+
hour = int(hour_str)
18+
minute = int(minute_str)
19+
if not (0 <= hour <= 23 and 0 <= minute <= 59):
20+
raise ValueError("hour/minute out of range")
21+
return hour, minute
22+
23+
24+
def _weekday_to_cron(d: int | str) -> str:
25+
mapping = {1: "mon", 2: "tue", 3: "wed", 4: "thu", 5: "fri", 6: "sat", 7: "sun", 0: "sun"}
26+
di = int(d)
27+
if di not in mapping:
28+
raise ValueError("invalid weekday")
29+
return mapping[di]
30+
31+
32+
def _get_active_trigger_tasks(trigger_id: str) -> list[dict]:
33+
return list(
34+
QuerySet(TriggerTask)
35+
.filter(trigger_id=trigger_id, is_active=True)
36+
.values("id", "source_type", "source_id", "parameter")
37+
)
38+
39+
40+
def _deploy_daily(trigger: dict, trigger_tasks: list[dict], setting: dict, trigger_id: str, func) -> None:
41+
from common.job import scheduler
42+
43+
times = setting.get("time") or []
44+
for t in times:
45+
try:
46+
hour, minute = _parse_hhmm(t)
47+
except Exception:
48+
maxkb_logger.warning(f"invalid time={t}, trigger_id={trigger_id}")
49+
continue
50+
51+
for task in trigger_tasks:
52+
job_id = f"trigger:{trigger_id}:task:{task['id']}:daily:{hour:02d}{minute:02d}"
53+
scheduler.add_job(
54+
func,
55+
trigger="cron",
56+
hour=str(hour),
57+
minute=str(minute),
58+
id=job_id,
59+
kwargs={"trigger": trigger, "trigger_tasks": trigger_tasks},
60+
replace_existing=True,
61+
)
62+
63+
64+
def _deploy_weekly(trigger: dict, trigger_tasks: list[dict], setting: dict, trigger_id: str, func) -> None:
65+
from common.job import scheduler
66+
67+
times = setting.get("time") or []
68+
days = setting.get("days") or []
69+
if not times or not days:
70+
maxkb_logger.warning(f"empty weekly setting, trigger_id={trigger_id}")
71+
return
72+
73+
for d in days:
74+
try:
75+
dow = _weekday_to_cron(d)
76+
except Exception:
77+
maxkb_logger.warning(f"invalid weekday={d}, trigger_id={trigger_id}")
78+
continue
79+
80+
for t in times:
81+
try:
82+
hour, minute = _parse_hhmm(t)
83+
except Exception:
84+
maxkb_logger.warning(f"invalid time={t}, trigger_id={trigger_id}")
85+
continue
86+
87+
for task in trigger_tasks:
88+
job_id = f"trigger:{trigger_id}:task:{task['id']}:weekly:{dow}:{hour:02d}{minute:02d}"
89+
scheduler.add_job(
90+
func,
91+
trigger="cron",
92+
day_of_week=dow,
93+
hour=str(hour),
94+
minute=str(minute),
95+
id=job_id,
96+
kwargs={"trigger": trigger, "trigger_tasks": trigger_tasks},
97+
replace_existing=True,
98+
)
99+
100+
101+
def _deploy_monthly(trigger: dict, trigger_tasks: list[dict], setting: dict, trigger_id: str, func) -> None:
102+
from common.job import scheduler
103+
104+
times = setting.get("time") or []
105+
days = setting.get("days") or []
106+
if not times or not days:
107+
maxkb_logger.warning(f"empty monthly setting, trigger_id={trigger_id}")
108+
return
109+
110+
for d in days:
111+
try:
112+
dom = int(d)
113+
if not (1 <= dom <= 31):
114+
raise ValueError("invalid day of month")
115+
except Exception:
116+
maxkb_logger.warning(f"invalid day={d}, trigger_id={trigger_id}")
117+
continue
118+
119+
for t in times:
120+
try:
121+
hour, minute = _parse_hhmm(t)
122+
except Exception:
123+
maxkb_logger.warning(f"invalid time={t}, trigger_id={trigger_id}")
124+
continue
125+
126+
for task in trigger_tasks:
127+
job_id = f"trigger:{trigger_id}:task:{task['id']}:monthly:{dom:02d}:{hour:02d}{minute:02d}"
128+
scheduler.add_job(
129+
func,
130+
trigger="cron",
131+
day=str(dom),
132+
hour=str(hour),
133+
minute=str(minute),
134+
id=job_id,
135+
kwargs={"trigger": trigger, "trigger_tasks": trigger_tasks},
136+
replace_existing=True,
137+
)
138+
139+
140+
def _deploy_interval(trigger: dict, trigger_tasks: list[dict], setting: dict, trigger_id: str, func) -> None:
141+
from common.job import scheduler
142+
143+
unit = (setting.get("interval_unit") or "").strip()
144+
value = setting.get("interval_value")
145+
146+
try:
147+
value_i = int(value)
148+
if value_i <= 0:
149+
raise ValueError("interval_value must be positive")
150+
except Exception:
151+
maxkb_logger.warning(f"invalid interval_value={value}, trigger_id={trigger_id}")
152+
return
153+
154+
if unit not in {"seconds", "minutes", "hours", "days"}:
155+
maxkb_logger.warning(f"invalid interval_unit={unit}, trigger_id={trigger_id}")
156+
return
157+
158+
for task in trigger_tasks:
159+
job_id = f"trigger:{trigger_id}:task:{task['id']}:interval:{unit}:{value_i}"
160+
scheduler.add_job(
161+
func,
162+
trigger="interval",
163+
id=job_id,
164+
kwargs={"trigger": trigger, "trigger_tasks": trigger_tasks},
165+
replace_existing=True,
166+
**{unit: value_i},
167+
)
168+
169+
170+
def _remove_trigger_jobs(trigger_id: str) -> None:
171+
from common.job import scheduler
172+
173+
prefix = f"trigger:{trigger_id}:"
174+
for job in scheduler.get_jobs():
175+
if getattr(job, "id", "").startswith(prefix):
176+
try:
177+
job.remove()
178+
except Exception as e:
179+
maxkb_logger.warning(f"remove job failed, job_id={job.id}, err={e}")
180+
181+
182+
@celery_app.task(name='celery:deploy_scheduled_trigger')
183+
def deploy_scheduled_trigger(trigger: dict, trigger_tasks: list[dict], setting: dict, schedule_type: str, func) -> None:
184+
_remove_trigger_jobs(trigger['id'])
185+
186+
deployers = {
187+
"daily": _deploy_daily,
188+
"weekly": _deploy_weekly,
189+
"monthly": _deploy_monthly,
190+
"interval": _deploy_interval,
191+
}
192+
fn = deployers.get(schedule_type)
193+
if not fn:
194+
maxkb_logger.warning(f"unsupported schedule_type={schedule_type}, trigger_id={trigger['id']}")
195+
return
196+
197+
fn(trigger, trigger_tasks, setting, trigger['id'], func)
10198

11199

12200
class ScheduledTrigger(BaseTrigger):
@@ -16,13 +204,49 @@ class ScheduledTrigger(BaseTrigger):
16204

17205
@staticmethod
18206
def execute(trigger, **kwargs):
19-
pass
207+
n = random.randint(1, 1_000_000_000)
208+
209+
maxkb_logger.info(f"scheduled trigger execute, trigger={n}")
20210

21211
def support(self, trigger, **kwargs):
22-
return trigger.get('trigger_type') == 'SCHEDULED'
212+
return trigger.get("trigger_type") == "SCHEDULED"
23213

24214
def deploy(self, trigger, **kwargs):
25-
pass
215+
trigger_id = str(trigger["id"])
216+
setting = trigger.get("trigger_setting") or {}
217+
schedule_type = setting.get("schedule_type")
218+
219+
if not trigger.get("is_active", True):
220+
self.undeploy(trigger, **kwargs)
221+
return
222+
223+
trigger_tasks = _get_active_trigger_tasks(trigger["id"])
224+
if not trigger_tasks:
225+
maxkb_logger.warning(f"no active trigger_tasks, trigger_id={trigger_id}")
226+
self.undeploy(trigger, **kwargs)
227+
return
228+
229+
rlock = RedisLock()
230+
lock_key = f"scheduled_trigger_deploy:{trigger_id}"
231+
if not rlock.try_lock(lock_key, 30):
232+
return
233+
234+
try:
235+
maxkb_logger.debug(f"get lock {lock_key}")
236+
deploy_scheduled_trigger.delay(trigger, trigger_tasks, setting, schedule_type, self.execute)
237+
238+
finally:
239+
rlock.un_lock(lock_key)
26240

27241
def undeploy(self, trigger, **kwargs):
28-
pass
242+
trigger_id = str(trigger["id"])
243+
244+
rlock = RedisLock()
245+
lock_key = f"scheduled_trigger_deploy:{trigger_id}"
246+
if not rlock.try_lock(lock_key, 30):
247+
return
248+
249+
try:
250+
_remove_trigger_jobs(trigger_id)
251+
finally:
252+
rlock.un_lock(lock_key)

apps/trigger/serializers/trigger.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
from knowledge.serializers.common import BatchSerializer
2626
from maxkb.conf import PROJECT_DIR
2727
from tools.models import Tool
28+
from trigger.handler.simple_tools import deploy, undeploy
2829
from trigger.models import TriggerTypeChoices, Trigger, TriggerTaskTypeChoices, TriggerTask, TaskRecord
2930

3031

@@ -401,12 +402,17 @@ def batch_delete(self, instance: Dict, with_valid=True):
401402
self.is_valid(raise_exception=True)
402403
workspace_id = self.data.get("workspace_id")
403404
trigger_id_list = instance.get("id_list")
405+
for trigger_id in trigger_id_list:
406+
trigger = QuerySet(Trigger).filter(id=trigger_id).first()
407+
undeploy(TriggerModelSerializer(trigger).data, **{})
408+
404409
TaskRecord.objects.filter(trigger_id__in=trigger_id_list).delete()
405410
TriggerTask.objects.filter(trigger_id__in=trigger_id_list).delete()
406411
Trigger.objects.filter(workspace_id=workspace_id, id__in=trigger_id_list).delete()
407412

408413
return True
409414

415+
@transaction.atomic
410416
def batch_switch(self, instance: Dict, with_valid=True):
411417
if with_valid:
412418
BatchActiveSerializer(data=instance).is_valid(model=Trigger, raise_exception=True)
@@ -416,6 +422,15 @@ def batch_switch(self, instance: Dict, with_valid=True):
416422
is_active = instance.get("is_active")
417423
Trigger.objects.filter(workspace_id=workspace_id, id__in=trigger_id_list, is_active=not is_active).update(
418424
is_active=is_active)
425+
if is_active:
426+
for trigger_id in trigger_id_list:
427+
trigger = QuerySet(Trigger).filter(id=trigger_id).first()
428+
deploy(TriggerModelSerializer(trigger).data, **{})
429+
else:
430+
for trigger_id in trigger_id_list:
431+
trigger = QuerySet(Trigger).filter(id=trigger_id).first()
432+
undeploy(TriggerModelSerializer(trigger).data, **{})
433+
419434
return True
420435

421436

@@ -464,11 +479,20 @@ def edit(self, instance: Dict, with_valid=True):
464479
) for task_data in trigger_tasks]
465480
TriggerTask.objects.bulk_create(trigger_task_model_list)
466481

482+
# 重新部署触发器任务
483+
if trigger.is_active:
484+
deploy(TriggerModelSerializer(trigger).data, **{})
485+
else:
486+
undeploy(TriggerModelSerializer(trigger).data, **{})
487+
467488
return self.one(with_valid=False)
468489

469490
def delete(self):
470491
self.is_valid(raise_exception=True)
471492
trigger_id = self.data.get('trigger_id')
493+
trigger = QuerySet(Trigger).filter(workspace_id=self.data.get('workspace_id'), id=trigger_id).first()
494+
if trigger:
495+
undeploy(TriggerModelSerializer(trigger).data, **{})
472496
TaskRecord.objects.filter(trigger_id=trigger_id).delete()
473497
TriggerTask.objects.filter(trigger_id=trigger_id).delete()
474498
Trigger.objects.filter(id=trigger_id).delete()

apps/users/apps.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,4 @@ class UsersConfig(AppConfig):
66
name = 'users'
77

88
def ready(self):
9-
from ops.celery import signal_handler
9+
from ops.celery import signal_handler # noqa

0 commit comments

Comments
 (0)