diff --git a/errata/tasks.py b/errata/tasks.py index 8ded3a79..9d1f3ed9 100644 --- a/errata/tasks.py +++ b/errata/tasks.py @@ -29,7 +29,7 @@ from util.logging import error_message, warning_message -@shared_task +@shared_task(priority=1) def update_yum_repo_errata(repo_id=None, force=False): """ Update all yum repos errata """ @@ -41,7 +41,7 @@ def update_yum_repo_errata(repo_id=None, force=False): repo.refresh_errata(force) -@shared_task +@shared_task(priority=1) def update_errata(erratum_type=None, force=False, repo=None): """ Update all distros errata """ @@ -85,7 +85,7 @@ def update_errata(erratum_type=None, force=False, repo=None): warning_message('Already updating Errata, skipping task.') -@shared_task +@shared_task(priority=2) def update_errata_and_cves(): """ Task to update all errata """ diff --git a/etc/patchman/local_settings.py b/etc/patchman/local_settings.py index 40dd2efd..15fcb60a 100644 --- a/etc/patchman/local_settings.py +++ b/etc/patchman/local_settings.py @@ -56,15 +56,15 @@ # Whether to run patchman under the gunicorn web server RUN_GUNICORN = False -# Set the default timeout to e.g. 30 seconds to enable UI caching -# Note that the UI results may be out of date for this amount of time CACHES = { 'default': { 'BACKEND': 'django.core.cache.backends.redis.RedisCache', 'LOCATION': 'redis://127.0.0.1:6379', - 'TIMEOUT': 0, } } +# Set the default timeout to e.g. 30 seconds to enable UI caching +# Note that the UI results may be out of date for this amount of time +CACHE_MIDDLEWARE_SECONDS = 0 from datetime import timedelta # noqa diff --git a/hosts/tasks.py b/hosts/tasks.py index 226652f6..f186760f 100755 --- a/hosts/tasks.py +++ b/hosts/tasks.py @@ -22,7 +22,7 @@ from util.logging import info_message -@shared_task +@shared_task(priority=0) def find_host_updates(host_id): """ Task to find updates for a host """ @@ -30,7 +30,7 @@ def find_host_updates(host_id): host.find_updates() -@shared_task +@shared_task(priority=1) def find_all_host_updates(): """ Task to find updates for all hosts """ @@ -38,7 +38,7 @@ def find_all_host_updates(): find_host_updates.delay(host.id) -@shared_task +@shared_task(priority=1) def find_all_host_updates_homogenous(): """ Task to find updates for all hosts where hosts are expected to be homogenous """ diff --git a/patchman/settings.py b/patchman/settings.py index 557e8c68..23e932c4 100644 --- a/patchman/settings.py +++ b/patchman/settings.py @@ -13,6 +13,7 @@ MIDDLEWARE = [ 'django.middleware.security.SecurityMiddleware', 'django.middleware.cache.UpdateCacheMiddleware', + 'patchman.middleware.NeverCacheMiddleware', 'django.middleware.http.ConditionalGetMiddleware', 'django.contrib.sessions.middleware.SessionMiddleware', 'django.middleware.common.CommonMiddleware', @@ -109,6 +110,10 @@ TAGGIT_CASE_INSENSITIVE = True CELERY_BROKER_URL = 'redis://127.0.0.1:6379/0' +CELERY_BROKER_TRANSPORT_OPTIONS = { + 'queue_order_strategy': 'priority', +} +CELERY_WORKER_PREFETCH_MULTIPLIER = 1 LOGIN_REDIRECT_URL = '/patchman/' LOGOUT_REDIRECT_URL = '/patchman/login/' diff --git a/reports/tasks.py b/reports/tasks.py index 07fb3004..6bf16e7c 100755 --- a/reports/tasks.py +++ b/reports/tasks.py @@ -24,25 +24,50 @@ from util.logging import info_message, warning_message -@shared_task(bind=True, autoretry_for=(OperationalError,), retry_backoff=True, retry_kwargs={'max_retries': 5}) +@shared_task( + bind=True, + priority=0, + autoretry_for=(OperationalError,), + retry_backoff=True, + retry_kwargs={'max_retries': 5} +) def process_report(self, report_id): """ Task to process a single report """ report = Report.objects.get(id=report_id) - lock_key = f'process_report_lock_{report_id}' - # lock will expire after 1 hour - lock_expire = 60 * 60 + report_id_lock_key = f'process_report_id_lock_{report_id}' + if report.host: + report_host_lock_key = f'process_report_host_lock_{report.host}' + else: + report_host_lock_key = f'process_report_host_lock_{report.report_ip}' + # locks will expire after 2 hours + lock_expire = 60 * 60 * 2 - if cache.add(lock_key, 'true', lock_expire): + if cache.add(report_id_lock_key, 'true', lock_expire): try: - report.process() + processing_report_id = cache.get(report_host_lock_key) + if processing_report_id: + if processing_report_id > report.id: + warning_message(f'Currently processing a newer report for {report.host} or {report.report_ip}, \ + marking report {report.id} as processed.') + report.processed = True + report.save() + else: + warning_message(f'Currently processing an older report for {report.host} or {report.report_ip}, \ + will skip processing this report.') + else: + try: + cache.set(report_host_lock_key, report.id, lock_expire) + report.process() + finally: + cache.delete(report_host_lock_key) finally: - cache.delete(lock_key) + cache.delete(report_id_lock_key) else: warning_message(f'Already processing report {report_id}, skipping task.') -@shared_task +@shared_task(priority=1) def process_reports(): """ Task to process all unprocessed reports """ @@ -51,9 +76,9 @@ def process_reports(): process_report.delay(report.id) -@shared_task -def clean_reports_with_no_hosts(): - """ Task to clean processed reports where the host no longer exists +@shared_task(priority=2) +def remove_reports_with_no_hosts(): + """ Task to remove processed reports where the host no longer exists """ for report in Report.objects.filter(processed=True): if not Host.objects.filter(hostname=report.host).exists(): diff --git a/repos/tasks.py b/repos/tasks.py index bc177653..a9fdd5f4 100644 --- a/repos/tasks.py +++ b/repos/tasks.py @@ -21,15 +21,25 @@ from util.logging import warning_message -@shared_task +@shared_task(priority=0) def refresh_repo(repo_id, force=False): """ Refresh metadata for a single repo """ - repo = Repository.objects.get(id=repo_id) - repo.refresh(force) + repo_id_lock_key = f'refresh_repos_{repo_id}_lock' + # lock will expire after 1 day + lock_expire = 60 * 60 * 24 + + if cache.add(repo_id_lock_key, 'true', lock_expire): + try: + repo = Repository.objects.get(id=repo_id) + repo.refresh(force) + finally: + cache.delete(repo_id_lock_key) + else: + warning_message(f'Already refreshing repo {repo_id}, skipping task.') -@shared_task +@shared_task(priority=1) def refresh_repos(force=False): """ Refresh metadata for all enabled repos """ diff --git a/sbin/patchman b/sbin/patchman index c415abec..b76272a2 100755 --- a/sbin/patchman +++ b/sbin/patchman @@ -43,7 +43,7 @@ from packages.utils import ( clean_packagenames, clean_packages, clean_packageupdates, ) from reports.models import Report -from reports.tasks import clean_reports_with_no_hosts +from reports.tasks import remove_reports_with_no_hosts from repos.models import Repository from repos.utils import clean_repos from security.utils import update_cves, update_cwes @@ -156,7 +156,7 @@ def clean_reports(hoststr=None): host.clean_reports() if not hoststr: - clean_reports_with_no_hosts() + remove_reports_with_no_hosts() def host_updates_alt(host=None): diff --git a/security/tasks.py b/security/tasks.py index 7bff4149..0cfbc2f1 100644 --- a/security/tasks.py +++ b/security/tasks.py @@ -21,15 +21,26 @@ from util.logging import warning_message -@shared_task +@shared_task(priority=3) def update_cve(cve_id): """ Task to update a CVE """ - cve = CVE.objects.get(id=cve_id) - cve.fetch_cve_data() + cve_id_lock_key = f'update_cve_id_lock_{cve_id}' + + # lock will expire after 1 week + lock_expire = 60 * 60 * 168 + + if cache.add(cve_id_lock_key, 'true', lock_expire): + try: + cve = CVE.objects.get(id=cve_id) + cve.fetch_cve_data() + finally: + cache.delete(cve_id_lock_key) + else: + warning_message(f'Already updating CVE {cve_id}, skipping task.') -@shared_task +@shared_task(priority=2) def update_cves(): """ Task to update all CVEs """ @@ -47,15 +58,26 @@ def update_cves(): warning_message('Already updating CVEs, skipping task.') -@shared_task +@shared_task(priority=3) def update_cwe(cwe_id): """ Task to update a CWE """ - cwe = CWE.objects.get(id=cwe_id) - cwe.fetch_cwe_data() + cwe_id_lock_key = f'update_cwe_id_lock_{cwe_id}' + + # lock will expire after 1 week + lock_expire = 60 * 60 * 168 + + if cache.add(cwe_id_lock_key, 'true', lock_expire): + try: + cwe = CWE.objects.get(id=cwe_id) + cwe.fetch_cwe_data() + finally: + cache.delete(cwe_id_lock_key) + else: + warning_message(f'Already updating CWE {cwe_id}, skipping task.') -@shared_task +@shared_task(priority=2) def update_cwes(): """ Task to update all CWEs """ diff --git a/util/tasks.py b/util/tasks.py index bd76bac6..12825a8c 100644 --- a/util/tasks.py +++ b/util/tasks.py @@ -24,7 +24,7 @@ from repos.utils import clean_repos, remove_mirror_trailing_slashes -@shared_task +@shared_task(priority=1) def clean_database(remove_duplicate_packages=False): """ Task to check the database and remove orphaned objects Runs all clean_* functions to check database consistency