1+ import asyncpg
2+ import asyncio
3+ import os
4+ from dotenv import load_dotenv
5+ from gmail_service import send_email
6+ from datetime import datetime , timedelta , time
7+ import pytz
8+ import logging
9+
10+ local_tz = pytz .timezone ("Europe/Rome" )
11+
12+ load_dotenv ()
13+
14+ DATABASE_URL = os .getenv ("DATABASE_URL" )
15+
16+ async def handle_notify (connection , pid , channel_name , payload ):
17+ # Handles notifications when they arrive on the user_approval channel
18+ user_email , username , status = payload .split ("," )
19+
20+ if channel_name == "user_status" :
21+ if status == "active" :
22+ subject = "Welcome to SchemaLink! Your account has been approved"
23+ message = (
24+ f"Hi { username } ,\n \n "
25+ "Great news! Your SchemaLink account has been approved.\n \n "
26+ "You have been granted a Trial policy that allows up to 10 intelligent requests "
27+ "within the next 24 hours. After this period, your access may be limited unless you upgrade "
28+ "to a higher tier.\n \n "
29+ "Thank you for joining SchemaLink, and we hope you enjoy using the platform!\n \n "
30+ f"Best regards,\n "
31+ f"The SchemaLink Team"
32+ )
33+ now = datetime .now (local_tz )
34+ end_date = now + timedelta (hours = 24 )
35+
36+ if now .time () <= time (12 , 0 ): # From 00:00:01 to 12:00:00
37+ end_date = (now + timedelta (days = 1 )).replace (hour = 12 , minute = 0 , second = 0 , microsecond = 0 )
38+ else : # From 12:00:01 to 23:59:59
39+ end_date = (now + timedelta (days = 2 )).replace (hour = 0 , minute = 0 , second = 0 , microsecond = 0 )
40+
41+ try :
42+ await connection .execute ("""
43+ INSERT INTO UserSubscribesPolicy (
44+ username, startDate, endDate, requestDate, status, policyName
45+ ) VALUES ($1, $2, $3, $4, $5, $6)
46+ """ , username , now , end_date , now , 'active' , 'trial' )
47+
48+ print (f"Assigned 'trial' policy to user { username } " )
49+ except Exception as e :
50+ print (f"Error assigning policy to user { username } : { e } " )
51+ elif status == "blocked" :
52+ subject = "SchemaLink account blocked"
53+ message = (
54+ f"Hi { username } ,\n \n "
55+ "Your SchemaLink account has been blocked. \n \n "
56+ f"Best regards,\n "
57+ f"The SchemaLink Team"
58+ )
59+ elif status == "disabled" :
60+ subject = "SchemaLink account deleted"
61+ message = (
62+ f"Hi { username } ,\n \n "
63+ "Your SchemaLink account has been successfully deleted. \n \n "
64+ f"Best regards,\n "
65+ f"The SchemaLink Team"
66+ )
67+ elif channel_name == "policy_status" :
68+ if status == "active" :
69+ subject = "Your SchemaLink policy has been approved"
70+ message = (
71+ f"Hi { username } ,\n \n "
72+ "Your policy request has been approved. You can now enjoy the benefits associated with your policy.\n \n "
73+ f"Best regards,\n "
74+ f"The SchemaLink Team"
75+ )
76+ elif status == "rejected" :
77+ subject = "Your SchemaLink policy request was rejected"
78+ message = (
79+ f"Hi { username } ,\n \n "
80+ "We regret to inform you that your policy request has been rejected. \n \n "
81+ f"Best regards,\n "
82+ f"The SchemaLink Team"
83+ )
84+ elif status == "expired" :
85+ subject = "Your SchemaLink policy has expired"
86+ message = (
87+ f"Hi { username } ,\n \n "
88+ "Your current SchemaLink policy has expired. To continue enjoying uninterrupted service, "
89+ "please renew or upgrade your subscription.\n \n "
90+ f"Best regards,\n "
91+ f"The SchemaLink Team"
92+ )
93+
94+ # Send the email asynchronously in a separate thread
95+ if subject and message :
96+ await asyncio .to_thread (send_email , user_email , subject , message )
97+
98+
99+ async def listen_notifications ():
100+ # Listens for user approval notifications and sends emails
101+ conn = None
102+
103+ try :
104+ conn = await asyncpg .connect (DATABASE_URL )
105+ # Add the listener to the 'user_status' channel
106+ await conn .add_listener ("user_status" , handle_notify )
107+ await conn .add_listener ("policy_status" , handle_notify )
108+ print ("Listening on 'user_status' and 'policy_status'..." )
109+
110+ # Keep listening for notifications
111+ while True :
112+ await asyncio .sleep (60 ) # Keep the listener active without blocking
113+ except Exception as e :
114+ print (f"Error connecting to the database: { e } " )
115+
116+ finally :
117+ if conn :
118+ await conn .close ()
119+ print ("Database connection closed." )
0 commit comments