Skip to content

Commit 65e8851

Browse files
add priority queues for tasks (#724)
* add priority queues for tasks * Update repos/tasks.py Co-authored-by: code-review-doctor[bot] <72320148+code-review-doctor[bot]@users.noreply.github.com>
1 parent 95a5213 commit 65e8851

File tree

9 files changed

+97
-35
lines changed

9 files changed

+97
-35
lines changed

errata/tasks.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
from util.logging import error_message, warning_message
3030

3131

32-
@shared_task
32+
@shared_task(priority=1)
3333
def update_yum_repo_errata(repo_id=None, force=False):
3434
""" Update all yum repos errata
3535
"""
@@ -41,7 +41,7 @@ def update_yum_repo_errata(repo_id=None, force=False):
4141
repo.refresh_errata(force)
4242

4343

44-
@shared_task
44+
@shared_task(priority=1)
4545
def update_errata(erratum_type=None, force=False, repo=None):
4646
""" Update all distros errata
4747
"""
@@ -85,7 +85,7 @@ def update_errata(erratum_type=None, force=False, repo=None):
8585
warning_message('Already updating Errata, skipping task.')
8686

8787

88-
@shared_task
88+
@shared_task(priority=2)
8989
def update_errata_and_cves():
9090
""" Task to update all errata
9191
"""

etc/patchman/local_settings.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,15 +56,15 @@
5656
# Whether to run patchman under the gunicorn web server
5757
RUN_GUNICORN = False
5858

59-
# Set the default timeout to e.g. 30 seconds to enable UI caching
60-
# Note that the UI results may be out of date for this amount of time
6159
CACHES = {
6260
'default': {
6361
'BACKEND': 'django.core.cache.backends.redis.RedisCache',
6462
'LOCATION': 'redis://127.0.0.1:6379',
65-
'TIMEOUT': 0,
6663
}
6764
}
65+
# Set the default timeout to e.g. 30 seconds to enable UI caching
66+
# Note that the UI results may be out of date for this amount of time
67+
CACHE_MIDDLEWARE_SECONDS = 0
6868

6969
from datetime import timedelta # noqa
7070

hosts/tasks.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,23 +22,23 @@
2222
from util.logging import info_message
2323

2424

25-
@shared_task
25+
@shared_task(priority=0)
2626
def find_host_updates(host_id):
2727
""" Task to find updates for a host
2828
"""
2929
host = Host.objects.get(id=host_id)
3030
host.find_updates()
3131

3232

33-
@shared_task
33+
@shared_task(priority=1)
3434
def find_all_host_updates():
3535
""" Task to find updates for all hosts
3636
"""
3737
for host in Host.objects.all():
3838
find_host_updates.delay(host.id)
3939

4040

41-
@shared_task
41+
@shared_task(priority=1)
4242
def find_all_host_updates_homogenous():
4343
""" Task to find updates for all hosts where hosts are expected to be homogenous
4444
"""

patchman/settings.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
MIDDLEWARE = [
1414
'django.middleware.security.SecurityMiddleware',
1515
'django.middleware.cache.UpdateCacheMiddleware',
16+
'patchman.middleware.NeverCacheMiddleware',
1617
'django.middleware.http.ConditionalGetMiddleware',
1718
'django.contrib.sessions.middleware.SessionMiddleware',
1819
'django.middleware.common.CommonMiddleware',
@@ -109,6 +110,10 @@
109110
TAGGIT_CASE_INSENSITIVE = True
110111

111112
CELERY_BROKER_URL = 'redis://127.0.0.1:6379/0'
113+
CELERY_BROKER_TRANSPORT_OPTIONS = {
114+
'queue_order_strategy': 'priority',
115+
}
116+
CELERY_WORKER_PREFETCH_MULTIPLIER = 1
112117

113118
LOGIN_REDIRECT_URL = '/patchman/'
114119
LOGOUT_REDIRECT_URL = '/patchman/login/'

reports/tasks.py

Lines changed: 36 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -24,25 +24,50 @@
2424
from util.logging import info_message, warning_message
2525

2626

27-
@shared_task(bind=True, autoretry_for=(OperationalError,), retry_backoff=True, retry_kwargs={'max_retries': 5})
27+
@shared_task(
28+
bind=True,
29+
priority=0,
30+
autoretry_for=(OperationalError,),
31+
retry_backoff=True,
32+
retry_kwargs={'max_retries': 5}
33+
)
2834
def process_report(self, report_id):
2935
""" Task to process a single report
3036
"""
3137
report = Report.objects.get(id=report_id)
32-
lock_key = f'process_report_lock_{report_id}'
33-
# lock will expire after 1 hour
34-
lock_expire = 60 * 60
38+
report_id_lock_key = f'process_report_id_lock_{report_id}'
39+
if report.host:
40+
report_host_lock_key = f'process_report_host_lock_{report.host}'
41+
else:
42+
report_host_lock_key = f'process_report_host_lock_{report.report_ip}'
43+
# locks will expire after 2 hours
44+
lock_expire = 60 * 60 * 2
3545

36-
if cache.add(lock_key, 'true', lock_expire):
46+
if cache.add(report_id_lock_key, 'true', lock_expire):
3747
try:
38-
report.process()
48+
processing_report_id = cache.get(report_host_lock_key)
49+
if processing_report_id:
50+
if processing_report_id > report.id:
51+
warning_message(f'Currently processing a newer report for {report.host} or {report.report_ip}, \
52+
marking report {report.id} as processed.')
53+
report.processed = True
54+
report.save()
55+
else:
56+
warning_message(f'Currently processing an older report for {report.host} or {report.report_ip}, \
57+
will skip processing this report.')
58+
else:
59+
try:
60+
cache.set(report_host_lock_key, report.id, lock_expire)
61+
report.process()
62+
finally:
63+
cache.delete(report_host_lock_key)
3964
finally:
40-
cache.delete(lock_key)
65+
cache.delete(report_id_lock_key)
4166
else:
4267
warning_message(f'Already processing report {report_id}, skipping task.')
4368

4469

45-
@shared_task
70+
@shared_task(priority=1)
4671
def process_reports():
4772
""" Task to process all unprocessed reports
4873
"""
@@ -51,9 +76,9 @@ def process_reports():
5176
process_report.delay(report.id)
5277

5378

54-
@shared_task
55-
def clean_reports_with_no_hosts():
56-
""" Task to clean processed reports where the host no longer exists
79+
@shared_task(priority=2)
80+
def remove_reports_with_no_hosts():
81+
""" Task to remove processed reports where the host no longer exists
5782
"""
5883
for report in Report.objects.filter(processed=True):
5984
if not Host.objects.filter(hostname=report.host).exists():

repos/tasks.py

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,25 @@
2121
from util.logging import warning_message
2222

2323

24-
@shared_task
24+
@shared_task(priority=0)
2525
def refresh_repo(repo_id, force=False):
2626
""" Refresh metadata for a single repo
2727
"""
28-
repo = Repository.objects.get(id=repo_id)
29-
repo.refresh(force)
28+
repo_id_lock_key = f'refresh_repos_{repo_id}_lock'
29+
# lock will expire after 1 day
30+
lock_expire = 60 * 60 * 24
31+
32+
if cache.add(repo_id_lock_key, 'true', lock_expire):
33+
try:
34+
repo = Repository.objects.get(id=repo_id)
35+
repo.refresh(force)
36+
finally:
37+
cache.delete(repo_id_lock_key)
38+
else:
39+
warning_message(f'Already refreshing repo {repo_id}, skipping task.')
3040

3141

32-
@shared_task
42+
@shared_task(priority=1)
3343
def refresh_repos(force=False):
3444
""" Refresh metadata for all enabled repos
3545
"""

sbin/patchman

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ from packages.utils import (
4343
clean_packagenames, clean_packages, clean_packageupdates,
4444
)
4545
from reports.models import Report
46-
from reports.tasks import clean_reports_with_no_hosts
46+
from reports.tasks import remove_reports_with_no_hosts
4747
from repos.models import Repository
4848
from repos.utils import clean_repos
4949
from security.utils import update_cves, update_cwes
@@ -156,7 +156,7 @@ def clean_reports(hoststr=None):
156156
host.clean_reports()
157157

158158
if not hoststr:
159-
clean_reports_with_no_hosts()
159+
remove_reports_with_no_hosts()
160160

161161

162162
def host_updates_alt(host=None):

security/tasks.py

Lines changed: 30 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,26 @@
2121
from util.logging import warning_message
2222

2323

24-
@shared_task
24+
@shared_task(priority=3)
2525
def update_cve(cve_id):
2626
""" Task to update a CVE
2727
"""
28-
cve = CVE.objects.get(id=cve_id)
29-
cve.fetch_cve_data()
28+
cve_id_lock_key = f'update_cve_id_lock_{cve_id}'
29+
30+
# lock will expire after 1 week
31+
lock_expire = 60 * 60 * 168
32+
33+
if cache.add(cve_id_lock_key, 'true', lock_expire):
34+
try:
35+
cve = CVE.objects.get(id=cve_id)
36+
cve.fetch_cve_data()
37+
finally:
38+
cache.delete(cve_id_lock_key)
39+
else:
40+
warning_message(f'Already updating CVE {cve_id}, skipping task.')
3041

3142

32-
@shared_task
43+
@shared_task(priority=2)
3344
def update_cves():
3445
""" Task to update all CVEs
3546
"""
@@ -47,15 +58,26 @@ def update_cves():
4758
warning_message('Already updating CVEs, skipping task.')
4859

4960

50-
@shared_task
61+
@shared_task(priority=3)
5162
def update_cwe(cwe_id):
5263
""" Task to update a CWE
5364
"""
54-
cwe = CWE.objects.get(id=cwe_id)
55-
cwe.fetch_cwe_data()
65+
cwe_id_lock_key = f'update_cwe_id_lock_{cwe_id}'
66+
67+
# lock will expire after 1 week
68+
lock_expire = 60 * 60 * 168
69+
70+
if cache.add(cwe_id_lock_key, 'true', lock_expire):
71+
try:
72+
cwe = CWE.objects.get(id=cwe_id)
73+
cwe.fetch_cwe_data()
74+
finally:
75+
cache.delete(cwe_id_lock_key)
76+
else:
77+
warning_message(f'Already updating CWE {cwe_id}, skipping task.')
5678

5779

58-
@shared_task
80+
@shared_task(priority=2)
5981
def update_cwes():
6082
""" Task to update all CWEs
6183
"""

util/tasks.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
from repos.utils import clean_repos, remove_mirror_trailing_slashes
2525

2626

27-
@shared_task
27+
@shared_task(priority=1)
2828
def clean_database(remove_duplicate_packages=False):
2929
""" Task to check the database and remove orphaned objects
3030
Runs all clean_* functions to check database consistency

0 commit comments

Comments
 (0)