-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcelery_worker.py
More file actions
35 lines (27 loc) · 879 Bytes
/
celery_worker.py
File metadata and controls
35 lines (27 loc) · 879 Bytes
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
from celery import Celery
from celery.schedules import crontab
import os
from dotenv import load_dotenv
# Read variables from a local .env file into the process environment so the
# REDIS_URL lookups below can see them.
load_dotenv()

# The same REDIS_URL value is used for both the message broker and the
# result backend.
# NOTE(review): if REDIS_URL is unset, both arguments are None -- confirm
# that falling back to Celery's own defaults is intended in that case.
celery_app = Celery(
    __name__,
    broker=os.getenv("REDIS_URL"),
    backend=os.getenv("REDIS_URL"),
)

# NOTE(review): the scheduled task below is named under app.tasks.log_task;
# confirm that autodiscovery of "app.tasks" actually imports that module.
celery_app.autodiscover_tasks(["app.tasks"])

# Celery Beat configuration: one periodic entry whose crontab fires at
# hour 0, minute 0 every day, with the timezone set to UTC.
celery_app.conf.update(
    timezone="UTC",
    beat_schedule={
        "log-tasks-daily": {
            "task": "app.tasks.log_task.log_tasks_daily",
            "schedule": crontab(minute=0, hour=0),
        },
    },
)
def init_celery(app):
    """Bind the module-level ``celery_app`` to the given application object.

    Copies ``app.config`` into the Celery configuration, then replaces
    ``celery_app.Task`` with a subclass whose ``__call__`` runs the task body
    inside ``app.app_context()``.

    Args:
        app: Application object exposing ``config`` and ``app_context()``
            (presumably a Flask application -- confirm against the caller).
    """
    celery_app.conf.update(app.config)

    # Capture the current base class so the wrapper below extends whatever
    # task class the Celery app is using at the time of this call.
    task_base = celery_app.Task

    class ContextTask(task_base):
        # Enter the application context for the duration of each task call.
        def __call__(self, *args, **kwargs):
            context = app.app_context()
            with context:
                return self.run(*args, **kwargs)

    celery_app.Task = ContextTask