Skip to content

Commit 6543094

Browse files
committed
fix: implement RedisLock for task execution to prevent concurrent processing
--bug=1068941@tapd-62980211 --user=刘瑞斌 集群部署时,因mk做了负载而触发器没有加锁,会触发两次 https://www.tapd.cn/62980211/s/1916945
1 parent 0a1df8c commit 6543094

1 file changed

Lines changed: 18 additions & 11 deletions

File tree

apps/trigger/handler/impl/trigger/scheduled_trigger.py

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
# coding=utf-8
2+
from celery_once import QueueOnce
23
from django.db.models import QuerySet
34

5+
from common.utils.lock import RedisLock
46
from common.utils.logger import maxkb_logger
57
from ops import celery_app
68
from trigger.handler.base_trigger import BaseTrigger
@@ -239,17 +241,22 @@ def execute(trigger, **kwargs):
239241
maxkb_logger.warning(f"unsupported task={trigger_task}")
240242
return
241243
source_type = trigger_task["source_type"]
242-
243-
if source_type == "APPLICATION":
244-
from trigger.handler.impl.task.application_task import ApplicationTask
245-
246-
ApplicationTask().execute(trigger_task, **kwargs)
247-
elif source_type == "TOOL":
248-
from trigger.handler.impl.task.tool_task import ToolTask
249-
250-
ToolTask().execute(trigger_task, **kwargs)
251-
else:
252-
maxkb_logger.warning(f"unsupported source_type={source_type}, task_id={trigger_task['id']}")
244+
rlock = RedisLock()
245+
source_id = str(trigger_task["source_id"])
246+
if rlock.try_lock(source_id, 30 * 30):
247+
try:
248+
if source_type == "APPLICATION":
249+
from trigger.handler.impl.task.application_task import ApplicationTask
250+
251+
ApplicationTask().execute(trigger_task, **kwargs)
252+
elif source_type == "TOOL":
253+
from trigger.handler.impl.task.tool_task import ToolTask
254+
255+
ToolTask().execute(trigger_task, **kwargs)
256+
else:
257+
maxkb_logger.warning(f"unsupported source_type={source_type}, task_id={trigger_task['id']}")
258+
finally:
259+
rlock.un_lock(source_id)
253260

254261
def support(self, trigger, **kwargs):
255262
return trigger.get("trigger_type") == "SCHEDULED"

0 commit comments

Comments
 (0)