Skip to content

Commit ba551bc

Browse files
Merge pull request #94 from pitangainnovare/better-tasks-2
Melhora tasks de validação e processamento de logs
2 parents d57887b + b73fe51 commit ba551bc

File tree

3 files changed

+88
-35
lines changed

3 files changed

+88
-35
lines changed

VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
1.8.0
1+
1.9.0

log_manager/tasks.py

Lines changed: 43 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -100,8 +100,6 @@ def task_validate_log_files(self, collections=[], from_date=None, until_date=Non
100100
username (str, optional): The username of the user initiating the task. Defaults to None.
101101
ignore_date (bool, optional): If True, ignore the date of the log file. Defaults to False.
102102
"""
103-
user = _get_user(self.request, username=username, user_id=user_id)
104-
105103
logging.info(f'Validating log files for collections: {collections}.')
106104

107105
visible_dates = _get_visible_dates(from_date, until_date, days_to_go_back)
@@ -113,34 +111,53 @@ def task_validate_log_files(self, collections=[], from_date=None, until_date=Non
113111
for log_file in models.LogFile.objects.filter(status=choices.LOG_FILE_STATUS_CREATED, collection__acron3=col):
114112
file_ctime = date_utils.get_date_obj_from_timestamp(log_file.stat_result[LOGFILE_STAT_RESULT_CTIME_INDEX])
115113
if file_ctime in visible_dates or ignore_date:
116-
logging.info(f'Validating log file {log_file.path} for collection {log_file.collection.acron3}.')
114+
task_validate_log_file.apply_async(args=(log_file.hash, user_id, username))
117115

118-
buffer_size, sample_size = _fetch_validation_parameters(col)
119-
120-
val_result = utils.validate_file(path=log_file.path, buffer_size=buffer_size, sample_size=sample_size)
121-
if 'datetimes' in val_result.get('content', {}).get('summary', {}):
122-
del val_result['content']['summary']['datetimes']
123116

124-
try:
125-
log_file.validation['result'] = json.dumps(val_result, cls=DjangoJSONEncoder) if val_result else {}
126-
log_file.validation['parameters'] = {'buffer_size': buffer_size, 'sample_size': sample_size}
127-
except json.JSONDecodeError as e:
128-
logging.error(f'Error serializing validation result: {e}')
129-
log_file.validation = {}
117+
@celery_app.task(bind=True, name=_('Validate log file'), timelimit=-1)
def task_validate_log_file(self, log_file_hash, user_id=None, username=None):
    """
    Task to validate a specific log file.

    Parameters:
        log_file_hash (str): The hash identifying the log file to validate.
        user_id (int, optional): The ID of the user initiating the task. Defaults to None.
        username (str, optional): The username of the user initiating the task. Defaults to None.
    """
    user = _get_user(self.request, username=username, user_id=user_id)
    log_file = models.LogFile.objects.get(hash=log_file_hash)
    collection = log_file.collection.acron3

    # Validation parameters (buffer/sample sizes) are configured per collection.
    buffer_size, sample_size = _fetch_validation_parameters(collection)

    logging.info(f'Validating log file {log_file.path}.')
    val_result = utils.validate_file(path=log_file.path, buffer_size=buffer_size, sample_size=sample_size)
    # Drop the per-datetime breakdown from the summary before persisting the result.
    if 'datetimes' in val_result.get('content', {}).get('summary', {}):
        del val_result['content']['summary']['datetimes']

    # Normalize the detected date to its string form before storing it.
    if 'probably_date' in val_result:
        val_result['probably_date'] = date_utils.get_date_str(val_result['probably_date'])

    try:
        log_file.validation = val_result
        log_file.validation.update({'buffer_size': buffer_size, 'sample_size': sample_size})
    # NOTE(review): plain dict assignment/update does not raise JSONDecodeError;
    # this handler only fires if `LogFile.validation` is a property that decodes
    # JSON under the hood — confirm against the model definition.
    except json.JSONDecodeError as e:
        logging.error(f'Error serializing validation result: {e}')
        log_file.validation = {}

    # A file that passed every check gets its detected date recorded and is
    # queued for parsing; anything else is marked invalidated.
    if val_result.get('is_valid', {}).get('all', False):
        models.LogFileDate.create_or_update(
            user=user,
            log_file=log_file,
            date=val_result.get('probably_date', ''),
        )
        log_file.status = choices.LOG_FILE_STATUS_QUEUED

    else:
        log_file.status = choices.LOG_FILE_STATUS_INVALIDATED

    logging.info(f'Log file {log_file.path} ({log_file.collection.acron3}) has status {log_file.status}.')
    log_file.save()
144161

145162

146163
def _fetch_validation_parameters(collection, default_buffer_size=0.1, default_sample_size=2048):
@@ -199,8 +216,7 @@ def _check_missing_logs_for_date(user, collection, date):
199216
@celery_app.task(bind=True, name=_('Generate log files count report'))
200217
def task_log_files_count_status_report(self, collections=[], from_date=None, until_date=None, user_id=None, username=None):
201218
from_date, until_date = date_utils.get_date_range_str(from_date, until_date)
202-
possible_dates_n = len(date_utils.get_date_objs_from_date_range(from_date, until_date))
203-
219+
204220
from_date_obj = date_utils.get_date_obj(from_date)
205221
until_date_obj = date_utils.get_date_obj(until_date)
206222

metrics/tasks.py

Lines changed: 44 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
from core.utils.utils import _get_user
1111
from core.utils.date_utils import (
12+
get_date_obj,
1213
get_date_str,
1314
get_date_range_str,
1415
get_date_objs_from_date_range,
@@ -40,27 +41,63 @@
4041
from .models import UserAgent, UserSession, Item, ItemAccess
4142

4243
import logging
44+
import json
4345

4446

4547
User = get_user_model()
4648

49+
4750
@celery_app.task(bind=True, name=_('Compute access'), timelimit=-1)
def task_parse_logs(self, collections=None, from_date=None, until_date=None, user_id=None, username=None):
    """
    Queue a parsing task for every queued log file of the given collections.

    Args:
        collections (list, optional): Collection acronyms to parse logs for.
            A falsy value (None or empty list) means all collections.
        from_date (str, optional): Start date for log parsing in 'YYYY-MM-DD' format. Defaults to None.
        until_date (str, optional): End date for log parsing in 'YYYY-MM-DD' format. Defaults to None.
        user_id (int, optional): The ID of the user initiating the task. Defaults to None.
        username (str, optional): The username of the user initiating the task. Defaults to None.

    Returns:
        None.
    """
    # Fix: default was a mutable list (collections=[]); None is behavior-identical
    # here because the loop below already falls back on a falsy value.
    from_date, until_date = get_date_range_str(from_date, until_date)

    from_date_obj = get_date_obj(from_date)
    until_date_obj = get_date_obj(until_date)

    for collection in collections or Collection.acron3_list():
        for lf in LogFile.objects.filter(status=choices.LOG_FILE_STATUS_QUEUED, collection__acron3=collection):
            probably_date = _extract_date_from_validation(lf.validation)
            if not probably_date:
                logging.debug(f'Log file {lf.path} does not have a valid probably date.')
                continue

            # NOTE(review): both bounds are exclusive (<=/>=), so files dated
            # exactly from_date or until_date are skipped — confirm intended.
            if probably_date <= from_date_obj or probably_date >= until_date_obj:
                continue

            logging.info(f'PARSING file {lf.path}')
            task_parse_log.apply_async(args=(lf.hash, user_id, username))
83+
84+
def _extract_date_from_validation(validation):
    """
    Extract the probable date recorded in a log file's validation dict.

    Args:
        validation (dict): Validation data attached to a log file.

    Returns:
        datetime.date: The parsed date, or None when it cannot be extracted
        (missing key, unparsable value, or a non-dict input).
    """
    try:
        # Best-effort: any failure (absent key, bad format, wrong type)
        # is logged and reported as "no date available".
        return get_date_obj(validation.get('probably_date'), '%Y-%m-%d')
    except Exception as e:
        logging.error(f"Failed to extract date from validation: {e}")
        return None
100+
64101

65102
@celery_app.task(bind=True, name=_('Parse one log'), timelimit=-1)
66103
def task_parse_log(self, log_file_hash, user_id=None, username=None):

0 commit comments

Comments
 (0)