11import logging
22import uuid
3- from typing import Optional
43
54from django .contrib .auth .models import User
5+ from django .db import transaction
66from django .db .models import Count
77from django .template .loader import render_to_string
88from django .utils import timezone
@@ -21,7 +21,7 @@ def send_alert_email_notification(
2121 load_item : LoadItem ,
2222 user : User ,
2323 subscription : AlertSubscription ,
24- thread : Optional [ AlertEmailThread ] ,
24+ thread : AlertEmailThread ,
2525 is_reply : bool = False ,
2626) -> None :
2727 """Helper function to send email and create log entry"""
@@ -38,12 +38,12 @@ def send_alert_email_notification(
3838
3939 try :
4040 if is_reply :
41- subject = f"Re: Hazard Alert: { load_item .event_title } "
41+ subject = f"Re:Hazard Alert: { load_item .event_title } "
4242 template = "email/alert_system/alert_notification_reply.html"
4343 email_type = "Alert Email Notification Reply"
4444 in_reply_to = thread .root_email_message_id
4545 else :
46- subject = f"New Hazard Alert: { load_item .event_title } "
46+ subject = f"Hazard Alert: { load_item .event_title } "
4747 template = "email/alert_system/alert_notification.html"
4848 email_type = "Alert Email Notification"
4949 in_reply_to = None
@@ -58,39 +58,30 @@ def send_alert_email_notification(
5858 html = email_body ,
5959 mailtype = email_type ,
6060 )
61- # Create thread for initial emails
61+
6262 email_log .status = AlertEmailLog .Status .SENT
6363 email_log .email_sent_at = timezone .now ()
6464
6565 if not is_reply :
66- thread , created = AlertEmailThread . objects . get_or_create (
67- user = user ,
68- parent_event_id = load_item . parent_event_id ,
69- defaults = {
70- "root_email_message_id" : message_id ,
71- "root_message_sent_at" : timezone . now (),
72- },
66+ thread . root_email_message_id = message_id
67+ thread . root_message_sent_at = timezone . now ()
68+ thread . save ( update_fields = [ "root_email_message_id" , "root_message_sent_at" ])
69+
70+ logger . info (
71+ f"Alert email thread updated for user [ { user . get_full_name () } ] "
72+ f"with parent event [ { load_item . parent_event_id } ]"
7373 )
74- email_log .thread = thread
75- email_log .save (update_fields = ["status" , "email_sent_at" , "thread" ])
76-
77- if created :
78- logger .info (
79- f"Alert Email thread created for user [{ user .get_full_name ()} ] "
80- f"with parent event [{ load_item .parent_event_id } ]"
81- )
82- else :
83- logger .info (
84- f"Existing thread found for user [{ user .get_full_name ()} ] " f"with parent event [{ load_item .parent_event_id } ]"
85- )
86- else :
87- email_log .save (update_fields = ["status" , "email_sent_at" ])
74+
75+ email_log .save (update_fields = ["status" , "email_sent_at" , "thread" ])
8876 logger .info (f"Alert email sent to [{ user .get_full_name ()} ] for LoadItem ID [{ load_item .id } ]" )
8977
9078 except Exception :
9179 email_log .status = AlertEmailLog .Status .FAILED
9280 email_log .save (update_fields = ["status" ])
93- logger .warning (f"Alert email failed for [{ user .get_full_name ()} ] LoadItem ID [{ load_item .id } ]" , exc_info = True )
81+ logger .warning (
82+ f"Alert email failed for [{ user .get_full_name ()} ] LoadItem ID [{ load_item .id } ]" ,
83+ exc_info = True ,
84+ )
9485
9586
9687def process_email_alert (load_item_id : int ) -> None :
@@ -122,34 +113,24 @@ def process_email_alert(load_item_id: int) -> None:
122113 )
123114 daily_count_map = {(item ["user_id" ], item ["subscription_id" ]): item ["sent_count" ] for item in daily_counts }
124115
125- # Emails already sent for this item (per user)
116+ # NOTE: Include PROCESSING status to block concurrent duplicate sends.
126117 already_sent = set (
127118 AlertEmailLog .objects .filter (
128119 user_id__in = user_ids ,
129120 subscription_id__in = subscription_ids ,
130121 item_id = load_item_id ,
131- status = AlertEmailLog .Status .SENT ,
122+ status__in = [
123+ AlertEmailLog .Status .SENT ,
124+ AlertEmailLog .Status .PROCESSING ,
125+ ],
132126 ).values_list ("user_id" , "subscription_id" )
133127 )
134128
135- # Existing threads for this correlation_id
136- existing_threads = {
137- thread .user_id : thread
138- for thread in AlertEmailThread .objects .filter (
139- parent_event_id = load_item .parent_event_id ,
140- user_id__in = user_ids ,
141- )
142- }
143-
144129 for subscription in subscriptions :
145130 user = subscription .user
146131 user_id : int = user .id
147132 subscription_id : int = subscription .id
148133
149- # Reply if this specific user has an existing thread
150- thread = existing_threads .get (user_id )
151- is_reply : bool = thread is not None
152-
153134 # Skip duplicate emails for same item
154135 if (user_id , subscription_id ) in already_sent :
155136 logger .info (f"Duplicate alert skipped for user [{ user .get_full_name ()} ] " f"with LoadItem ID [{ load_item_id } ]" )
@@ -162,4 +143,23 @@ def process_email_alert(load_item_id: int) -> None:
162143 logger .info (f"Daily alert limit reached for user [{ user .get_full_name ()} ]" )
163144 continue
164145
165- send_alert_email_notification (load_item = load_item , user = user , subscription = subscription , thread = thread , is_reply = is_reply )
146+ # NOTE: root_email_message_id is None until the first email is sent successfully.
147+ with transaction .atomic ():
148+ thread , _ = AlertEmailThread .objects .select_for_update ().get_or_create (
149+ user = user ,
150+ parent_event_id = load_item .parent_event_id ,
151+ defaults = {
152+ "root_email_message_id" : None ,
153+ "root_message_sent_at" : None ,
154+ },
155+ )
156+
157+ is_reply : bool = thread .root_email_message_id is not None
158+
159+ send_alert_email_notification (
160+ load_item = load_item ,
161+ user = user ,
162+ subscription = subscription ,
163+ thread = thread ,
164+ is_reply = is_reply ,
165+ )
0 commit comments