-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathtasks.py
More file actions
57 lines (50 loc) · 1.87 KB
/
tasks.py
File metadata and controls
57 lines (50 loc) · 1.87 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
import datetime
from mailing.typing import ContextDataDict, EmailDataToPrepare, MailDataDict
from mailing.utils import prepare_mail_data, send_mass_mail
from procollab.celery import app
from vacancy.mapping import (
CeleryEmailParams,
EmailParamsType,
MessageTypeEnum,
create_text_for_email,
get_link,
message_type_to_button_text,
message_type_to_title,
)
from vacancy.models import Vacancy
@app.task
def send_email(data: EmailParamsType):
context_data = ContextDataDict(
text=create_text_for_email(data),
title=message_type_to_title[data["message_type"]],
button_link=get_link(data),
button_text=message_type_to_button_text[data["message_type"]],
)
mail_data: MailDataDict = prepare_mail_data(
EmailDataToPrepare(
users_ids=[data["user_id"]],
schema_id=data["schema_id"],
subject=message_type_to_title[data["message_type"]],
context_data=context_data,
)
)
send_mass_mail(**mail_data)
@app.task
def email_notificate_vacancy_outdated():
"""Уведомление лидера по email о том, что вакансия просрочилась"""
expiration_check = datetime.datetime.now() - datetime.timedelta(days=30)
outdated_active_vacancies = Vacancy.objects.select_related(
"project", "project__leader"
).filter(is_active=True, datetime_created__lte=expiration_check)
for vacancy in outdated_active_vacancies:
project = vacancy.project
data_to_send = CeleryEmailParams(
message_type=MessageTypeEnum.OUTDATED.value,
user_id=project.leader.id,
project_name=project.name,
project_id=project.id,
vacancy_role=vacancy.role,
schema_id=2,
)
send_email.delay(data_to_send)
outdated_active_vacancies.update(is_active=False)