11"""This module contains Celery tasks and functions associated with them."""
22import os
33import json
4- import random
54import time
5+ from datetime import datetime , timedelta , timezone
66import requests
77import numpy as np
8+ from sqlalchemy import desc
89from sqlalchemy .sql .functions import func
910from celery .utils .log import get_task_logger
1011from app import celery , db
1112from app .emails import send_email
1213from app .lists import MailChimpList , MailChimpImportError , do_async_import
13- from app .models import ListStats
14+ from app .models import EmailList , ListStats
1415from app .dbops import associate_user_with_list
1516from app .visualizations import (
1617 draw_bar , draw_stacked_horizontal_bar , draw_histogram , draw_donuts )
@@ -81,12 +82,8 @@ def import_analyze_store_list(list_data, org_id, user_email=None):
8182 mailing_list .calc_high_open_rate_pct ()
8283 mailing_list .calc_cur_yr_stats ()
8384
84- # Create a list object
85+ # Create a set of stats
8586 list_stats = ListStats (
86- list_id = list_data ['list_id' ],
87- list_name = list_data ['list_name' ],
88- api_key = list_data ['key' ],
89- data_center = list_data ['data_center' ],
9087 frequency = mailing_list .frequency ,
9188 subscribers = mailing_list .subscribers ,
9289 open_rate = mailing_list .open_rate ,
@@ -97,13 +94,23 @@ def import_analyze_store_list(list_data, org_id, user_email=None):
9794 pending_pct = mailing_list .pending_pct ,
9895 high_open_rt_pct = mailing_list .high_open_rt_pct ,
9996 cur_yr_inactive_pct = mailing_list .cur_yr_inactive_pct ,
100- store_aggregates = list_data ['store_aggregates' ],
101- monthly_updates = list_data ['monthly_updates' ],
102- org_id = org_id )
97+ list_id = list_data ['list_id' ])
10398
104- # If the user gave their permission, store the object in the database
99+ # If the user gave their permission, store the stats in the database
105100 if list_data ['monthly_updates' ] or list_data ['store_aggregates' ]:
106- list_stats = db .session .merge (list_stats )
101+
102+ # Create a list object to go with the set of stats
103+ email_list = EmailList (
104+ list_id = list_data ['list_id' ],
105+ list_name = list_data ['list_name' ],
106+ api_key = list_data ['key' ],
107+ data_center = list_data ['data_center' ],
108+ store_aggregates = list_data ['store_aggregates' ],
109+ monthly_updates = list_data ['monthly_updates' ],
110+ org_id = org_id )
111+ email_list = db .session .merge (email_list )
112+
113+ db .session .add (list_stats )
107114 try :
108115 db .session .commit ()
109116 except :
@@ -113,7 +120,7 @@ def import_analyze_store_list(list_data, org_id, user_email=None):
113120 return list_stats
114121
115122def send_report (stats , list_id , list_name , user_email_or_emails ):
116- """Generates charts using Pygal and emails them to the user.
123+ """Generates charts using Plotly and emails them to the user.
117124
118125 Args:
119126 stats: a dictionary containing analysis results for a list.
@@ -122,22 +129,30 @@ def send_report(stats, list_id, list_name, user_email_or_emails):
122129 user_email_or_emails: a list of emails to send the report to.
123130 """
124131
125- # Generate aggregates for the database
126- # Only include lists where we have permission
132+ # This subquery selects the most recent stats
133+ # for each unique list_id in the database
134+ # whose associated list has store_aggregates set to True
135+ subquery = ListStats .query .filter (
136+ ListStats .list .has (store_aggregates = True )).order_by ('list_id' , desc (
137+ 'analysis_timestamp' )).distinct (ListStats .list_id ).subquery ()
138+
139+ # Generate aggregates within the subquery
127140 agg_stats = db .session .query (
128- func .avg (ListStats .subscribers ),
129- func .avg (ListStats .subscribed_pct ),
130- func .avg (ListStats .unsubscribed_pct ),
131- func .avg (ListStats .cleaned_pct ),
132- func .avg (ListStats .pending_pct ),
133- func .avg (ListStats .open_rate ),
134- func .avg (ListStats .high_open_rt_pct ),
135- func .avg (ListStats .cur_yr_inactive_pct )).filter_by (
136- store_aggregates = True ).first ()
141+ func .avg (subquery .columns .subscribers ),
142+ func .avg (subquery .columns .subscribed_pct ),
143+ func .avg (subquery .columns .unsubscribed_pct ),
144+ func .avg (subquery .columns .cleaned_pct ),
145+ func .avg (subquery .columns .pending_pct ),
146+ func .avg (subquery .columns .open_rate ),
147+ func .avg (subquery .columns .high_open_rt_pct ),
148+ func .avg (subquery .columns .cur_yr_inactive_pct )).first ()
137149
138150 # Make sure we have no 'None' values
139151 agg_stats = [agg if agg else 0 for agg in agg_stats ]
140152
153+ # Convert subscribers average to an integer
154+ agg_stats [0 ] = int (agg_stats [0 ])
155+
141156 # Generate epoch time (to get around image caching in webmail)
142157 epoch_time = str (int (time .time ()))
143158
@@ -202,7 +217,7 @@ def send_report(stats, list_id, list_name, user_email_or_emails):
202217 os .environ .get ('SES_CONFIGURATION_SET' ) or None ))
203218
204219def extract_stats (list_object ):
205- """Extracts a stats dictionary from a list object from the database ."""
220+ """Extracts a stats dictionary from a SQLAlchemy ListStats object ."""
206221 stats = {'subscribers' : list_object .subscribers ,
207222 'open_rate' : list_object .open_rate ,
208223 'hist_bin_counts' : json .loads (list_object .hist_bin_counts ),
@@ -218,10 +233,12 @@ def extract_stats(list_object):
218233def init_list_analysis (user_data , list_data , org_id ):
219234 """Celery task wrapper for each stage of analyzing a list.
220235
221- First checks if the list stats are cached, i.e. already in the
236+ First checks whether a cached analysis exists, i.e. one is already in the
222237 database. If not, calls import_analyze_store_list() to generate
223- them. Then checks if the user is already associated with the list,
224- if not, create the relationship. Finally, generates a benchmarking
238+ the ListStats and an associated EmailList. Next updates the user's
239+ privacy options (e.g. store_aggregates, monthly_updates) if the list was
240+ cached. Then, if the user selected monthly updates, associates the
241+ user with the list. Finally, generates a benchmarking
225242 report with the stats.
226243
227244 Args:
@@ -230,60 +247,88 @@ def init_list_analysis(user_data, list_data, org_id):
230247 org_id: the id of the organization associated with the list.
231248 """
232249
233- # Try to pull the list stats from database
250+ # Try to pull the most recent ListStats from the database
234251 # Otherwise generate them
235- list_object = (ListStats .query .filter_by (
236- list_id = list_data ['list_id' ]).first () or
237- import_analyze_store_list (
238- list_data , org_id , user_data ['email' ]))
239-
240- # Associate the list with the user who requested the analysis
241- # If that user requested monthly updates
242- if list_data ['monthly_updates' ]:
243- associate_user_with_list (user_data ['user_id' ], list_object )
244-
245- stats = extract_stats (list_object )
252+ most_recent_analysis = (ListStats .query .filter_by (
253+ list_id = list_data ['list_id' ]).order_by (desc (
254+ 'analysis_timestamp' )).first () or import_analyze_store_list (
255+ list_data , org_id , user_data ['email' ]))
256+
257+ # If the user chose to store their data, there will be an associated
258+ # EmailList object
259+ list_object = EmailList .query .filter_by (
260+ list_id = list_data ['list_id' ]).first ()
261+
262+ if list_object :
263+
264+ # Update the privacy options if they differ from previous selection
265+ if (list_object .monthly_updates != list_data ['monthly_updates' ]
266+ or list_object .store_aggregates != list_data ['store_aggregates' ]):
267+ list_object .monthly_updates = list_data ['monthly_updates' ]
268+ list_object .store_aggregates = list_data ['store_aggregates' ]
269+ list_object = db .session .merge (list_object )
270+ try :
271+ db .session .commit ()
272+ except :
273+ db .session .rollback ()
274+ raise
275+
276+ # Associate the list with the user who requested the analysis
277+ # If that user requested monthly updates
278+ if list_data ['monthly_updates' ]:
279+ associate_user_with_list (user_data ['user_id' ], list_object )
280+
281+ # Convert the ListStats object to an easier-to-use dictionary
282+ stats = extract_stats (most_recent_analysis )
246283 send_report (stats , list_data ['list_id' ],
247284 list_data ['list_name' ], [user_data ['email' ]])
248285
249286@celery .task
250287def update_stored_data ():
251288 """Celery task which goes through the database
252- and updates calculations using the most recent data .
289+ and generates a new set of calculations for each list whose latest analysis is over 30 days old.
253290
254291 This task is called by Celery Beat, see the schedule in config.py.
255292 """
256-
257- # Get the logger
258293 logger = get_task_logger (__name__ )
259294
260- # Grab what we have in the database
261- list_objects = ListStats .query .with_entities (
262- ListStats .list_id , ListStats .list_name , ListStats .org_id ,
263- ListStats .api_key , ListStats .data_center ,
264- ListStats .store_aggregates , ListStats .monthly_updates ).all ()
295+ # Grab the most recent analyses in the database
296+ list_analyses = ListStats .query .order_by (
297+ 'list_id' , desc ('analysis_timestamp' )).distinct (
298+ ListStats .list_id ).all ()
265299
266- if not list_objects :
267- logger .info ('No lists to update!' )
300+ if not list_analyses :
301+ logger .warning ('No lists in the database!' )
302+ return
303+
304+ # Create a list of analyses which are more than 30 days old
305+ now = datetime .now (timezone .utc )
306+ one_month_ago = now - timedelta (days = 30 )
307+ analyses_to_update = [
308+ analysis for analysis in list_analyses
309+ if (analysis .analysis_timestamp .replace (
310+ tzinfo = timezone .utc )) < one_month_ago ]
311+
312+ if not analyses_to_update :
313+ logger .info ('No old lists to update!' )
268314 return
269315
270316 # Placeholder for lists which failed during the update process
271317 failed_updates = []
272318
273- # Update 1/30th of the lists in the database (such that every list
274- # is updated about once per month, on average).
275- lists_to_update = random .sample (
276- list_objects , len (list_objects ) // 31 if len (list_objects ) // 31 else 1 )
277-
278319 # Update each list's calculations in sequence
279- for list_to_update in lists_to_update :
320+ for analysis in analyses_to_update :
321+
322+ logger .info ('Updating list %s!' , analysis .list_id )
280323
281- logger .info ('Updating list %s!' , list_to_update .list_id )
324+ # Get the list object associated with the analysis
325+ associated_list_object = analysis .list
282326
283327 # Pull information about the list from the API
284328 # This may have changed since we originally pulled the list data
285329 request_uri = ('https://{}.api.mailchimp.com/3.0/lists/{}' .format (
286- list_to_update .data_center , list_to_update .list_id ))
330+ associated_list_object .data_center ,
331+ associated_list_object .list_id ))
287332 params = (
288333 ('fields' , 'stats.member_count,'
289334 'stats.unsubscribe_count,'
@@ -294,31 +339,31 @@ def update_stored_data():
294339 )
295340 response = requests .get (
296341 request_uri , params = params ,
297- auth = ('shorenstein' , list_to_update .api_key ))
342+ auth = ('shorenstein' , associated_list_object .api_key ))
298343 response_body = response .json ()
299344 response_stats = response_body ['stats' ]
300345 count = (response_stats ['member_count' ] +
301346 response_stats ['unsubscribe_count' ] +
302347 response_stats ['cleaned_count' ])
303348
304349 # Create a dictionary of list data
305- list_data = {'list_id' : list_to_update .list_id ,
306- 'list_name' : list_to_update .list_name ,
307- 'key' : list_to_update .api_key ,
308- 'data_center' : list_to_update .data_center ,
309- 'monthly_updates' : list_to_update .monthly_updates ,
310- 'store_aggregates' : list_to_update .store_aggregates ,
350+ list_data = {'list_id' : analysis .list_id ,
351+ 'list_name' : associated_list_object .list_name ,
352+ 'key' : associated_list_object .api_key ,
353+ 'data_center' : associated_list_object .data_center ,
354+ 'monthly_updates' : associated_list_object .monthly_updates ,
355+ 'store_aggregates' : associated_list_object .store_aggregates ,
311356 'total_count' : count ,
312357 'open_rate' : response_stats ['open_rate' ],
313358 'date_created' : response_body ['date_created' ],
314359 'campaign_count' : response_stats ['campaign_count' ]}
315360
316361 # Then re-run the calculations and update the database
317362 try :
318- import_analyze_store_list (list_data , list_to_update .org_id )
363+ import_analyze_store_list (list_data , associated_list_object .org_id )
319364 except MailChimpImportError :
320- logger .error ('Error updating list %s.' , list_to_update .list_id )
321- failed_updates .append (list_to_update .list_id )
365+ logger .error ('Error updating list %s.' , analysis .list_id )
366+ failed_updates .append (analysis .list_id )
322367
323368 # If any updates failed, raise an exception to send an error email
324369 if failed_updates :
@@ -336,7 +381,7 @@ def send_monthly_reports():
336381 logger = get_task_logger (__name__ )
337382
338383 # Grab info from the database
339- monthly_report_lists = ListStats .query .filter_by (
384+ monthly_report_lists = EmailList .query .filter_by (
340385 monthly_updates = True ).all ()
341386
342387 # Send an email report for each list
@@ -352,8 +397,13 @@ def send_monthly_reports():
352397 monthly_report_list .list_name ,
353398 monthly_report_list .list_id )
354399
400+ # Get the most recent analysis for the list
401+ stats_object = ListStats .query .filter_by (
402+ list_id = monthly_report_list .list_id ).order_by (
403+ desc ('analysis_timestamp' )).first ()
404+
355405 # Extract stats from the most recent analysis
356- stats = extract_stats (monthly_report_list )
406+ stats = extract_stats (stats_object )
357407 send_report (stats , monthly_report_list .list_id ,
358408 monthly_report_list .list_name ,
359409 users_to_email )
0 commit comments