Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions errata/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from util.logging import error_message, warning_message


@shared_task
@shared_task(priority=1)
def update_yum_repo_errata(repo_id=None, force=False):
""" Update all yum repos errata
"""
Expand All @@ -41,7 +41,7 @@ def update_yum_repo_errata(repo_id=None, force=False):
repo.refresh_errata(force)


@shared_task
@shared_task(priority=1)
def update_errata(erratum_type=None, force=False, repo=None):
""" Update all distros errata
"""
Expand Down Expand Up @@ -85,7 +85,7 @@ def update_errata(erratum_type=None, force=False, repo=None):
warning_message('Already updating Errata, skipping task.')


@shared_task
@shared_task(priority=2)
def update_errata_and_cves():
""" Task to update all errata
"""
Expand Down
6 changes: 3 additions & 3 deletions etc/patchman/local_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,15 @@
# Whether to run patchman under the gunicorn web server
RUN_GUNICORN = False

# Set the default timeout to e.g. 30 seconds to enable UI caching
# Note that the UI results may be out of date for this amount of time
CACHES = {
'default': {
'BACKEND': 'django.core.cache.backends.redis.RedisCache',
'LOCATION': 'redis://127.0.0.1:6379',
'TIMEOUT': 0,
}
}
# Set the default timeout to e.g. 30 seconds to enable UI caching
# Note that the UI results may be out of date for this amount of time
CACHE_MIDDLEWARE_SECONDS = 0

from datetime import timedelta # noqa

Expand Down
6 changes: 3 additions & 3 deletions hosts/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,23 @@
from util.logging import info_message


@shared_task
@shared_task(priority=0)
def find_host_updates(host_id):
""" Task to find updates for a host
"""
host = Host.objects.get(id=host_id)
host.find_updates()


@shared_task
@shared_task(priority=1)
def find_all_host_updates():
""" Task to find updates for all hosts
"""
for host in Host.objects.all():
find_host_updates.delay(host.id)


@shared_task
@shared_task(priority=1)
def find_all_host_updates_homogenous():
""" Task to find updates for all hosts where hosts are expected to be homogenous
"""
Expand Down
5 changes: 5 additions & 0 deletions patchman/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
MIDDLEWARE = [
'django.middleware.security.SecurityMiddleware',
'django.middleware.cache.UpdateCacheMiddleware',
'patchman.middleware.NeverCacheMiddleware',
'django.middleware.http.ConditionalGetMiddleware',
'django.contrib.sessions.middleware.SessionMiddleware',
'django.middleware.common.CommonMiddleware',
Expand Down Expand Up @@ -109,6 +110,10 @@
TAGGIT_CASE_INSENSITIVE = True

CELERY_BROKER_URL = 'redis://127.0.0.1:6379/0'
CELERY_BROKER_TRANSPORT_OPTIONS = {
'queue_order_strategy': 'priority',
}
CELERY_WORKER_PREFETCH_MULTIPLIER = 1

LOGIN_REDIRECT_URL = '/patchman/'
LOGOUT_REDIRECT_URL = '/patchman/login/'
Expand Down
47 changes: 36 additions & 11 deletions reports/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,25 +24,50 @@
from util.logging import info_message, warning_message


@shared_task(bind=True, autoretry_for=(OperationalError,), retry_backoff=True, retry_kwargs={'max_retries': 5})
@shared_task(
bind=True,
priority=0,
autoretry_for=(OperationalError,),
retry_backoff=True,
retry_kwargs={'max_retries': 5}
)
def process_report(self, report_id):
""" Task to process a single report
"""
report = Report.objects.get(id=report_id)
lock_key = f'process_report_lock_{report_id}'
# lock will expire after 1 hour
lock_expire = 60 * 60
report_id_lock_key = f'process_report_id_lock_{report_id}'
if report.host:
report_host_lock_key = f'process_report_host_lock_{report.host}'
else:
report_host_lock_key = f'process_report_host_lock_{report.report_ip}'
# locks will expire after 2 hours
lock_expire = 60 * 60 * 2

if cache.add(lock_key, 'true', lock_expire):
if cache.add(report_id_lock_key, 'true', lock_expire):
try:
report.process()
processing_report_id = cache.get(report_host_lock_key)
if processing_report_id:
if processing_report_id > report.id:
warning_message(f'Currently processing a newer report for {report.host} or {report.report_ip}, \
marking report {report.id} as processed.')
report.processed = True
report.save()
else:
warning_message(f'Currently processing an older report for {report.host} or {report.report_ip}, \
will skip processing this report.')
else:
try:
cache.set(report_host_lock_key, report.id, lock_expire)
report.process()
finally:
cache.delete(report_host_lock_key)
finally:
cache.delete(lock_key)
cache.delete(report_id_lock_key)
else:
warning_message(f'Already processing report {report_id}, skipping task.')


@shared_task
@shared_task(priority=1)
def process_reports():
""" Task to process all unprocessed reports
"""
Expand All @@ -51,9 +76,9 @@ def process_reports():
process_report.delay(report.id)


@shared_task
def clean_reports_with_no_hosts():
""" Task to clean processed reports where the host no longer exists
@shared_task(priority=2)
def remove_reports_with_no_hosts():
""" Task to remove processed reports where the host no longer exists
"""
for report in Report.objects.filter(processed=True):
if not Host.objects.filter(hostname=report.host).exists():
Expand Down
18 changes: 14 additions & 4 deletions repos/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,25 @@
from util.logging import warning_message


@shared_task
@shared_task(priority=0)
def refresh_repo(repo_id, force=False):
""" Refresh metadata for a single repo
"""
repo = Repository.objects.get(id=repo_id)
repo.refresh(force)
repo_id_lock_key = f'refresh_repos_{repo_id}_lock'
# lock will expire after 1 day
lock_expire = 60 * 60 * 24

if cache.add(repo_id_lock_key, 'true', lock_expire):
try:
repo = Repository.objects.get(id=repo_id)
repo.refresh(force)
finally:
cache.delete(repo_id_lock_key)
else:
warning_message(f'Already refreshing repo {repo_id}, skipping task.')


@shared_task
@shared_task(priority=1)
def refresh_repos(force=False):
""" Refresh metadata for all enabled repos
"""
Expand Down
4 changes: 2 additions & 2 deletions sbin/patchman
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ from packages.utils import (
clean_packagenames, clean_packages, clean_packageupdates,
)
from reports.models import Report
from reports.tasks import clean_reports_with_no_hosts
from reports.tasks import remove_reports_with_no_hosts
from repos.models import Repository
from repos.utils import clean_repos
from security.utils import update_cves, update_cwes
Expand Down Expand Up @@ -156,7 +156,7 @@ def clean_reports(hoststr=None):
host.clean_reports()

if not hoststr:
clean_reports_with_no_hosts()
remove_reports_with_no_hosts()


def host_updates_alt(host=None):
Expand Down
38 changes: 30 additions & 8 deletions security/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,26 @@
from util.logging import warning_message


@shared_task
@shared_task(priority=3)
def update_cve(cve_id):
""" Task to update a CVE
"""
cve = CVE.objects.get(id=cve_id)
cve.fetch_cve_data()
cve_id_lock_key = f'update_cve_id_lock_{cve_id}'

# lock will expire after 1 week
lock_expire = 60 * 60 * 168

if cache.add(cve_id_lock_key, 'true', lock_expire):
try:
cve = CVE.objects.get(id=cve_id)
cve.fetch_cve_data()
finally:
cache.delete(cve_id_lock_key)
else:
warning_message(f'Already updating CVE {cve_id}, skipping task.')


@shared_task
@shared_task(priority=2)
def update_cves():
""" Task to update all CVEs
"""
Expand All @@ -47,15 +58,26 @@ def update_cves():
warning_message('Already updating CVEs, skipping task.')


@shared_task
@shared_task(priority=3)
def update_cwe(cwe_id):
""" Task to update a CWE
"""
cwe = CWE.objects.get(id=cwe_id)
cwe.fetch_cwe_data()
cwe_id_lock_key = f'update_cwe_id_lock_{cwe_id}'

# lock will expire after 1 week
lock_expire = 60 * 60 * 168

if cache.add(cwe_id_lock_key, 'true', lock_expire):
try:
cwe = CWE.objects.get(id=cwe_id)
cwe.fetch_cwe_data()
finally:
cache.delete(cwe_id_lock_key)
else:
warning_message(f'Already updating CWE {cwe_id}, skipping task.')


@shared_task
@shared_task(priority=2)
def update_cwes():
""" Task to update all CWEs
"""
Expand Down
2 changes: 1 addition & 1 deletion util/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from repos.utils import clean_repos, remove_mirror_trailing_slashes


@shared_task
@shared_task(priority=1)
def clean_database(remove_duplicate_packages=False):
""" Task to check the database and remove orphaned objects
Runs all clean_* functions to check database consistency
Expand Down
Loading