-
Notifications
You must be signed in to change notification settings - Fork 2.8k
Expand file tree
/
Copy pathsignal_handler.py
More file actions
76 lines (62 loc) · 2.49 KB
/
signal_handler.py
File metadata and controls
76 lines (62 loc) · 2.49 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# -*- coding: utf-8 -*-
#
import logging
import os
from common.init import init_template
from celery import subtask
from celery.signals import (
worker_ready, worker_shutdown, after_setup_logger, task_revoked, task_prerun
)
from django.core.cache import cache
from django_celery_beat.models import PeriodicTask
from .decorator import get_after_app_ready_tasks, get_after_app_shutdown_clean_tasks
from .logger import CeleryThreadTaskFileHandler
logger = logging.getLogger(__file__)
safe_str = lambda x: x
@worker_ready.connect
def on_app_ready(sender=None, headers=None, **kwargs):
if cache.get("CELERY_APP_READY", 0) == 1:
return
cache.set("CELERY_APP_READY", 1, 10)
tasks = get_after_app_ready_tasks()
logger.debug("Work ready signal recv")
logger.debug("Start need start task: [{}]".format(", ".join(tasks)))
for task in tasks:
periodic_task = PeriodicTask.objects.filter(task=task).first()
if periodic_task and not periodic_task.enabled:
logger.debug("Periodic task [{}] is disabled!".format(task))
continue
subtask(task).delay()
init_template.run()
def delete_files(directory):
if os.path.isdir(directory):
for filename in os.listdir(directory):
file_path = os.path.join(directory, filename)
if os.path.isfile(file_path):
os.remove(file_path)
@worker_shutdown.connect
def after_app_shutdown_periodic_tasks(sender=None, **kwargs):
if cache.get("CELERY_APP_SHUTDOWN", 0) == 1:
return
cache.set("CELERY_APP_SHUTDOWN", 1, 10)
tasks = get_after_app_shutdown_clean_tasks()
logger.debug("Worker shutdown signal recv")
logger.debug("Clean period tasks: [{}]".format(', '.join(tasks)))
PeriodicTask.objects.filter(name__in=tasks).delete()
@after_setup_logger.connect
def add_celery_logger_handler(sender=None, logger=None, loglevel=None, format=None, **kwargs):
if not logger:
return
task_handler = CeleryThreadTaskFileHandler()
task_handler.setLevel(loglevel)
formatter = logging.Formatter(format)
task_handler.setFormatter(formatter)
logger.addHandler(task_handler)
@task_revoked.connect
def on_task_revoked(request, terminated, signum, expired, **kwargs):
print('task_revoked', terminated)
@task_prerun.connect
def on_taskaa_start(sender, task_id, **kwargs):
pass
# sender.update_state(state='REVOKED',
# meta={'exc_type': 'Exception', 'exc': 'Exception', 'message': '暂停任务', 'exc_message': ''})