diff --git a/errata/models.py b/errata/models.py
index 8f6a6f74..ba4baec7 100644
--- a/errata/models.py
+++ b/errata/models.py
@@ -91,15 +91,16 @@ def _mark_updates_security(self, updates):
error_message(text=e)
update.delete()
- def fetch_osv_dev_data(self):
+ def fetch_osv_dev_data(self, session=None):
+ """ Fetch osv.dev JSON for this erratum. Returns parsed JSON or None.
+ """
osv_dev_url = f'https://api.osv.dev/v1/vulns/{self.name}'
- res = get_url(osv_dev_url)
+ res = get_url(osv_dev_url, session=session)
+ if res is None:
+ return None
if res.status_code == 404:
- error_message(text=f'404 - Skipping {self.name} - {osv_dev_url}')
- return
- data = res.content
- osv_dev_json = json.loads(data)
- self.parse_osv_dev_data(osv_dev_json)
+ return None
+ return json.loads(res.content)
def parse_osv_dev_data(self, osv_dev_json):
from django.db.models import Q
diff --git a/errata/sources/distros/alma.py b/errata/sources/distros/alma.py
index d9e1dbf9..8b7b3163 100644
--- a/errata/sources/distros/alma.py
+++ b/errata/sources/distros/alma.py
@@ -14,19 +14,21 @@
# You should have received a copy of the GNU General Public License
# along with Patchman. If not, see
-import concurrent.futures
import json
-from django.db import connections
-
-from operatingsystems.utils import get_or_create_osrelease
+from errata.utils import get_or_create_erratum
+from modules.utils import get_matching_modules
+from operatingsystems.utils import (
+ get_or_create_osrelease, normalize_el_osrelease,
+)
from packages.models import Package
from packages.utils import get_or_create_package, parse_package_string
from patchman.signals import pbar_start, pbar_update
-from util import fetch_content, get_setting_of_type, get_url
+from util import fetch_content, get_setting_of_type, get_url, run_concurrently
+from util.logging import clear_forked_pbar
-def update_alma_errata(concurrent_processing=True):
+def update_alma_errata(concurrent_processing=True, max_workers=25):
""" Update Alma Linux advisories from errata.almalinux.org:
https://errata.almalinux.org/8/errata.full.json
https://errata.almalinux.org/9/errata.full.json
@@ -40,7 +42,7 @@ def update_alma_errata(concurrent_processing=True):
)
for release in alma_releases:
advisories = fetch_alma_advisories(release)
- process_alma_errata(release, advisories, concurrent_processing)
+ process_alma_errata(release, advisories, concurrent_processing, max_workers)
def fetch_alma_advisories(release):
@@ -54,11 +56,11 @@ def fetch_alma_advisories(release):
return advisories
-def process_alma_errata(release, advisories, concurrent_processing):
+def process_alma_errata(release, advisories, concurrent_processing, max_workers=25):
""" Process Alma Linux Errata
"""
if concurrent_processing:
- process_alma_errata_concurrently(release, advisories)
+ process_alma_errata_concurrently(release, advisories, max_workers)
else:
process_alma_errata_serially(release, advisories)
@@ -73,24 +75,24 @@ def process_alma_errata_serially(release, advisories):
pbar_update.send(sender=None, index=i + 1)
-def process_alma_errata_concurrently(release, advisories):
+def process_alma_errata_concurrently(release, advisories, max_workers=25):
""" Process Alma Linux Errata concurrently
"""
- connections.close_all()
elen = len(advisories)
pbar_start.send(sender=None, ptext=f'Processing {elen} Alma {release} Errata', plen=elen)
- i = 0
- with concurrent.futures.ProcessPoolExecutor(max_workers=25) as executor:
- futures = [executor.submit(process_alma_erratum, release, advisory) for advisory in advisories]
- for future in concurrent.futures.as_completed(futures):
- i += 1
- pbar_update.send(sender=None, index=i + 1)
+ args = [(release, advisory) for advisory in advisories]
+ for i, _ in enumerate(run_concurrently(process_alma_erratum_wrapper, args, max_workers)):
+ pbar_update.send(sender=None, index=i + 1)
+
+
+def process_alma_erratum_wrapper(args):
+ clear_forked_pbar()
+ return process_alma_erratum(*args)
def process_alma_erratum(release, advisory):
""" Process a single Alma Linux Erratum
"""
- from errata.utils import get_or_create_erratum
erratum_name = advisory.get('id')
issue_date = advisory.get('issued_date')
synopsis = advisory.get('title')
@@ -110,7 +112,6 @@ def process_alma_erratum(release, advisory):
def add_alma_erratum_osreleases(e, release):
""" Update OS Release for Alma Linux errata
"""
- from operatingsystems.utils import normalize_el_osrelease
osrelease_name = normalize_el_osrelease(f'Alma Linux {release}')
osrelease = get_or_create_osrelease(name=osrelease_name)
e.osreleases.add(osrelease)
@@ -150,7 +151,6 @@ def add_alma_erratum_packages(e, advisory):
def add_alma_erratum_modules(e, advisory):
""" Parse and add modules for Alma Linux errata
"""
- from modules.utils import get_matching_modules
fixed_packages = set()
modules = advisory.get('modules')
for module in modules:
@@ -160,8 +160,7 @@ def add_alma_erratum_modules(e, advisory):
stream = module.get('stream')
version = module.get('version')
matching_modules = get_matching_modules(name, stream, version, context, arch)
- for match in matching_modules:
+ for match in matching_modules.prefetch_related('packages'):
for fixed_package in match.packages.all():
- match.packages.add(fixed_package)
fixed_packages.add(fixed_package)
e.add_fixed_packages(fixed_packages)
diff --git a/errata/sources/distros/arch.py b/errata/sources/distros/arch.py
index e22de403..696c9efb 100644
--- a/errata/sources/distros/arch.py
+++ b/errata/sources/distros/arch.py
@@ -17,8 +17,7 @@
import concurrent.futures
import json
-from django.db import connections
-
+from errata.utils import get_or_create_erratum
from operatingsystems.utils import get_or_create_osrelease
from packages.models import Package
from packages.utils import (
@@ -26,16 +25,17 @@
)
from patchman.signals import pbar_start, pbar_update
from util import fetch_content, get_url
-from util.logging import error_message
+from util.logging import clear_forked_pbar, error_message
-def update_arch_errata(concurrent_processing=False):
+def update_arch_errata(concurrent_processing=False, max_workers=25):
""" Update Arch Linux Errata from the following sources:
https://security.archlinux.org/advisories.json
"""
add_arch_linux_osrelease()
advisories = fetch_arch_errata()
- parse_arch_errata(advisories, concurrent_processing)
+ if advisories:
+ parse_arch_errata(advisories, concurrent_processing, max_workers)
def fetch_arch_errata():
@@ -44,14 +44,16 @@ def fetch_arch_errata():
"""
res = get_url('https://security.archlinux.org/advisories.json')
advisories = fetch_content(res, 'Fetching Arch Advisories')
+ if advisories is None:
+ return None
return json.loads(advisories)
-def parse_arch_errata(advisories, concurrent_processing):
+def parse_arch_errata(advisories, concurrent_processing, max_workers=25):
""" Parse Arch Linux Errata Advisories
"""
if concurrent_processing:
- parse_arch_errata_concurrently(advisories)
+ parse_arch_errata_concurrently(advisories, max_workers)
else:
parse_arch_errata_serially(advisories)
@@ -67,15 +69,14 @@ def parse_arch_errata_serially(advisories):
pbar_update.send(sender=None, index=i + 1)
-def parse_arch_errata_concurrently(advisories):
+def parse_arch_errata_concurrently(advisories, max_workers=25):
""" Parse Arch Linux Errata Advisories concurrently
"""
osrelease = get_or_create_osrelease(name='Arch Linux')
- connections.close_all()
elen = len(advisories)
pbar_start.send(sender=None, ptext=f'Processing {elen} Arch Advisories', plen=elen)
i = 0
- with concurrent.futures.ProcessPoolExecutor(max_workers=25) as executor:
+ with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = [executor.submit(process_arch_erratum, advisory, osrelease) for advisory in advisories]
for future in concurrent.futures.as_completed(futures):
i += 1
@@ -85,7 +86,7 @@ def parse_arch_errata_concurrently(advisories):
def process_arch_erratum(advisory, osrelease):
""" Process a single Arch Linux Erratum
"""
- from errata.utils import get_or_create_erratum
+ clear_forked_pbar()
try:
name = advisory.get('name')
issue_date = advisory.get('date')
@@ -121,6 +122,8 @@ def add_arch_erratum_references(e, advisory):
e.add_reference('ASA', url)
raw_url = f'{url}/raw'
res = get_url(raw_url)
+ if res is None:
+ return
data = res.content
parse_arch_erratum_raw(e, data.decode())
@@ -152,6 +155,8 @@ def add_arch_erratum_packages(e, advisory):
group_id = advisory.get('group')
group_url = f'https://security.archlinux.org/group/{group_id}.json'
res = get_url(group_url)
+ if res is None:
+ return
data = res.content
group = json.loads(data)
packages = group.get('packages')
diff --git a/errata/sources/distros/debian.py b/errata/sources/distros/debian.py
index 8025b1bf..f14e3799 100644
--- a/errata/sources/distros/debian.py
+++ b/errata/sources/distros/debian.py
@@ -14,27 +14,29 @@
# You should have received a copy of the GNU General Public License
# along with Patchman. If not, see
-import concurrent.futures
import csv
import re
from datetime import datetime
from io import StringIO
from debian.deb822 import Dsc
-from django.db import connections
+from errata.utils import get_or_create_erratum
from operatingsystems.models import OSRelease
from operatingsystems.utils import get_or_create_osrelease
from packages.models import Package
from packages.utils import find_evr, get_or_create_package
from patchman.signals import pbar_start, pbar_update
-from util import extract, fetch_content, get_setting_of_type, get_url
-from util.logging import error_message, warning_message
+from util import (
+ extract, fetch_concurrently, fetch_content, get_setting_of_type, get_url,
+ run_concurrently,
+)
+from util.logging import clear_forked_pbar, error_message, warning_message
DSCs = {}
-def update_debian_errata(concurrent_processing=True):
+def update_debian_errata(concurrent_processing=True, max_workers=25):
""" Update Debian errata using:
https://salsa.debian.org/security-tracker-team/security-tracker/raw/master/data/DSA/list
https://salsa.debian.org/security-tracker-team/security-tracker/raw/master/data/DSA/list
@@ -46,8 +48,8 @@ def update_debian_errata(concurrent_processing=True):
advisories = dsas + dlas
fetch_dscs_from_debian_package_file_maps()
accepted_codenames = get_accepted_debian_codenames()
- errata = parse_debian_errata(advisories, accepted_codenames)
- create_debian_errata(errata, accepted_codenames, concurrent_processing)
+ errata = parse_debian_errata(advisories, accepted_codenames, concurrent_processing, max_workers)
+ create_debian_errata(errata, accepted_codenames, concurrent_processing, max_workers)
def fetch_debian_dsa_advisories():
@@ -108,12 +110,14 @@ def parse_debian_package_file_map(data, repo):
parsing_dsc = False
-def parse_debian_errata(advisories, accepted_codenames):
+def parse_debian_errata(advisories, accepted_codenames, concurrent_processing=True, max_workers=25):
""" Parse Debian DSA/DLA files for security advisories
"""
distro_pattern = re.compile(r'^\t\[(.+?)\] - .*')
title_pattern = re.compile(r'^\[(.+?)\] (.+?) (.+?)[ ]+[-]+ (.*)')
+ distro_package_pattern = re.compile(r'^\t\[(.+?)\] - (.+?) (.*)')
errata = []
+ dsc_fetches = []
e = {'packages': {}, 'cve_ids': [], 'releases': []}
for line in advisories.splitlines():
if line.startswith('['):
@@ -132,9 +136,17 @@ def parse_debian_errata(advisories, accepted_codenames):
e['releases'].append(release)
if not e.get('packages').get(release):
e['packages'][release] = []
- e['packages'][release].append(parse_debian_erratum_package(line, accepted_codenames))
+ pkg_match = re.match(distro_package_pattern, line)
+ if pkg_match and pkg_match.group(1) in accepted_codenames:
+ source_package = pkg_match.group(2)
+ source_version = pkg_match.group(3)
+ dsc_fetches.append((source_package, source_version))
+ e['packages'][release].append((source_package, source_version))
+ else:
+ e['packages'][release].append(None)
# add the last one
errata = add_errata_by_codename(errata, e, accepted_codenames)
+ fetch_debian_dsc_package_lists(dsc_fetches, concurrent_processing, max_workers)
return errata
@@ -162,11 +174,11 @@ def parse_debian_erratum_advisory(e, match):
return e
-def create_debian_errata(errata, accepted_codenames, concurrent_processing):
+def create_debian_errata(errata, accepted_codenames, concurrent_processing, max_workers=25):
""" Create Debian Errata
"""
if concurrent_processing:
- create_debian_errata_concurrently(errata, accepted_codenames)
+ create_debian_errata_concurrently(errata, accepted_codenames, max_workers)
else:
create_debian_errata_serially(errata, accepted_codenames)
@@ -181,25 +193,25 @@ def create_debian_errata_serially(errata, accepted_codenames):
pbar_update.send(sender=None, index=i + 1)
-def create_debian_errata_concurrently(errata, accepted_codenames):
+def create_debian_errata_concurrently(errata, accepted_codenames, max_workers=25):
""" Create Debian Errata concurrently
"""
- connections.close_all()
elen = len(errata)
pbar_start.send(sender=None, ptext=f'Processing {elen} Debian Errata', plen=elen)
- i = 0
- with concurrent.futures.ProcessPoolExecutor(max_workers=25) as executor:
- futures = [executor.submit(process_debian_erratum, erratum, accepted_codenames) for erratum in errata]
- for future in concurrent.futures.as_completed(futures):
- i += 1
- pbar_update.send(sender=None, index=i + 1)
+ args = [(erratum, accepted_codenames) for erratum in errata]
+ for i, _ in enumerate(run_concurrently(process_debian_erratum_wrapper, args, max_workers)):
+ pbar_update.send(sender=None, index=i + 1)
+
+
+def process_debian_erratum_wrapper(args):
+ clear_forked_pbar()
+ return process_debian_erratum(*args)
def process_debian_erratum(erratum, accepted_codenames):
""" Process a single Debian Erratum
"""
try:
- from errata.utils import get_or_create_erratum
erratum_name = erratum.get('name')
e, created = get_or_create_erratum(
name=erratum_name,
@@ -221,19 +233,23 @@ def process_debian_erratum(erratum, accepted_codenames):
error_message(text=exc)
-def parse_debian_erratum_package(line, accepted_codenames):
- """ Parse the codename and source package from a DSA/DLA file
- Returns the source package and source version
+def fetch_debian_dsc_package_lists(dsc_fetches, concurrent_processing=True, max_workers=25):
+ """ Fetch DSC package lists with a progress bar
"""
- distro_package_pattern = re.compile(r'^\t\[(.+?)\] - (.+?) (.*)')
- match = re.match(distro_package_pattern, line)
- if match:
- codename = match.group(1)
- if codename in accepted_codenames:
- source_package = match.group(2)
- source_version = match.group(3)
- fetch_debian_dsc_package_list(source_package, source_version)
- return source_package, source_version
+ flen = len(dsc_fetches)
+ pbar_start.send(sender=None, ptext=f'Fetching {flen} Debian DSC files', plen=flen)
+ if concurrent_processing:
+ for i, _ in enumerate(fetch_concurrently(fetch_dsc_worker, dsc_fetches, max_workers)):
+ pbar_update.send(sender=None, index=i + 1)
+ else:
+ for i, (package, version) in enumerate(dsc_fetches):
+ fetch_debian_dsc_package_list(package, version)
+ pbar_update.send(sender=None, index=i + 1)
+
+
+def fetch_dsc_worker(item, session):
+ package, version = item
+ fetch_debian_dsc_package_list(package, version, session=session)
def get_debian_dsc_package_list(package, version):
@@ -246,14 +262,14 @@ def get_debian_dsc_package_list(package, version):
return package_list
-def fetch_debian_dsc_package_list(package, version):
+def fetch_debian_dsc_package_list(package, version, session=None):
""" Fetch the package list from a DSC file for a given source package/version
"""
if not DSCs.get(package) or not DSCs[package].get(version):
warning_message(text=f'No DSC found for {package} {version}')
return
source_url = DSCs[package][version]['url']
- res = get_url(source_url)
+ res = get_url(source_url, session=session)
data = res.content
dsc = Dsc(data.decode())
package_list = dsc.get('package-list')
@@ -301,6 +317,8 @@ def create_debian_os_releases(codename_to_version):
def process_debian_erratum_fixed_packages(e, package_data):
""" Process packages fixed in a Debian errata
"""
+ if not package_data:
+ return
source_package, source_version = package_data
epoch, ver, rel = find_evr(source_version)
package_list = get_debian_dsc_package_list(source_package, source_version)
diff --git a/errata/sources/distros/rocky.py b/errata/sources/distros/rocky.py
index 272ac2a7..034e043f 100644
--- a/errata/sources/distros/rocky.py
+++ b/errata/sources/distros/rocky.py
@@ -17,28 +17,33 @@
import concurrent.futures
import json
-from django.db import connections
from django.db.utils import OperationalError
from tenacity import (
retry, retry_if_exception_type, stop_after_attempt, wait_exponential,
)
+from errata.utils import get_or_create_erratum
+from modules.models import Module
+from modules.utils import get_matching_modules
from operatingsystems.utils import get_or_create_osrelease
from packages.models import Package
from packages.utils import get_or_create_package, parse_package_string
from patchman.signals import pbar_start, pbar_update
-from util import fetch_content, get_url
-from util.logging import error_message, info_message
+from util import fetch_content, get_url, run_concurrently
+from util.logging import clear_forked_pbar, error_message, info_message
-def update_rocky_errata(concurrent_processing=True):
+def update_rocky_errata(concurrent_processing=True, max_workers=25):
""" Update Rocky Linux errata
"""
rocky_errata_api_host = 'https://apollo.build.resf.org'
rocky_errata_api_url = '/api/v3/'
if check_rocky_errata_endpoint_health(rocky_errata_api_host):
- advisories = fetch_rocky_advisories(rocky_errata_api_host, rocky_errata_api_url, concurrent_processing)
- process_rocky_errata(advisories, concurrent_processing)
+ advisories = fetch_rocky_advisories(
+ rocky_errata_api_host, rocky_errata_api_url,
+ concurrent_processing, max_workers,
+ )
+ process_rocky_errata(advisories, concurrent_processing, max_workers)
def check_rocky_errata_endpoint_health(rocky_errata_api_host):
@@ -66,11 +71,11 @@ def check_rocky_errata_endpoint_health(rocky_errata_api_host):
return False
-def fetch_rocky_advisories(rocky_errata_api_host, rocky_errata_api_url, concurrent_processing):
+def fetch_rocky_advisories(rocky_errata_api_host, rocky_errata_api_url, concurrent_processing, max_workers=25):
""" Fetch Rocky Linux advisories and return the list
"""
if concurrent_processing:
- return fetch_rocky_advisories_concurrently(rocky_errata_api_host, rocky_errata_api_url)
+ return fetch_rocky_advisories_concurrently(rocky_errata_api_host, rocky_errata_api_url, max_workers)
else:
return fetch_rocky_advisories_serially(rocky_errata_api_host, rocky_errata_api_url)
@@ -103,7 +108,7 @@ def fetch_rocky_advisories_serially(rocky_errata_api_host, rocky_errata_api_url)
return advisories
-def fetch_rocky_advisories_concurrently(rocky_errata_api_host, rocky_errata_api_url):
+def fetch_rocky_advisories_concurrently(rocky_errata_api_host, rocky_errata_api_url, max_workers=25):
""" Fetch Rocky Linux advisories concurrently and return the list
"""
rocky_errata_advisories_url = rocky_errata_api_host + rocky_errata_api_url + 'advisories/'
@@ -119,7 +124,7 @@ def fetch_rocky_advisories_concurrently(rocky_errata_api_host, rocky_errata_api_
ptext = 'Fetching Rocky Advisories'
pbar_start.send(sender=None, ptext=ptext, plen=pages)
i = 0
- with concurrent.futures.ThreadPoolExecutor(max_workers=100) as executor:
+ with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = [executor.submit(get_rocky_advisory, rocky_errata_advisories_url, page)
for page in range(1, pages + 1)]
for future in concurrent.futures.as_completed(futures):
@@ -140,11 +145,11 @@ def get_rocky_advisory(rocky_errata_advisories_url, page):
return advisories_dict.get('advisories')
-def process_rocky_errata(advisories, concurrent_processing):
+def process_rocky_errata(advisories, concurrent_processing, max_workers=25):
""" Process Rocky Linux Errata
"""
if concurrent_processing:
- process_rocky_errata_concurrently(advisories)
+ process_rocky_errata_concurrently(advisories, max_workers)
else:
process_rocky_errata_serially(advisories)
@@ -159,18 +164,18 @@ def process_rocky_errata_serially(advisories):
pbar_update.send(sender=None, index=i + 1)
-def process_rocky_errata_concurrently(advisories):
+def process_rocky_errata_concurrently(advisories, max_workers=25):
""" Process Rocky Linux errata concurrently
"""
- connections.close_all()
elen = len(advisories)
pbar_start.send(sender=None, ptext=f'Processing {elen} Rocky Errata', plen=elen)
- i = 0
- with concurrent.futures.ProcessPoolExecutor(max_workers=25) as executor:
- futures = [executor.submit(process_rocky_erratum, advisory) for advisory in advisories]
- for future in concurrent.futures.as_completed(futures):
- i += 1
- pbar_update.send(sender=None, index=i + 1)
+ for i, _ in enumerate(run_concurrently(process_rocky_erratum_wrapper, advisories, max_workers)):
+ pbar_update.send(sender=None, index=i + 1)
+
+
+def process_rocky_erratum_wrapper(advisory):
+ clear_forked_pbar()
+ return process_rocky_erratum(advisory)
@retry(
@@ -181,7 +186,6 @@ def process_rocky_errata_concurrently(advisories):
def process_rocky_erratum(advisory):
""" Process a single Rocky Linux erratum
"""
- from errata.utils import get_or_create_erratum
try:
erratum_name = advisory.get('name')
e_type = advisory.get('kind').lower().replace(' ', '')
@@ -230,9 +234,9 @@ def add_rocky_erratum_oses(e, advisory):
def add_rocky_erratum_packages(e, advisory):
""" Parse and add packages for Rocky Linux errata
"""
- from modules.utils import get_matching_modules
packages = advisory.get('packages')
fixed_packages = set()
+ module_package_adds = {}
for package in packages:
package_name = package.get('nevra')
if package_name:
@@ -253,5 +257,9 @@ def add_rocky_erratum_packages(e, advisory):
arch,
)
for match in matching_modules:
- match.packages.add(fixed_package)
+ if match.pk not in module_package_adds:
+ module_package_adds[match.pk] = set()
+ module_package_adds[match.pk].add(fixed_package)
+ for module_pk, pkgs in module_package_adds.items():
+ Module.objects.get(pk=module_pk).packages.add(*pkgs)
e.add_fixed_packages(fixed_packages)
diff --git a/errata/sources/distros/ubuntu.py b/errata/sources/distros/ubuntu.py
index 5616331f..ec6036fc 100644
--- a/errata/sources/distros/ubuntu.py
+++ b/errata/sources/distros/ubuntu.py
@@ -14,15 +14,13 @@
# You should have received a copy of the GNU General Public License
# along with Patchman. If not, see
-import concurrent.futures
import csv
import json
import os
from io import StringIO
from urllib.parse import urlparse
-from django.db import connections
-
+from errata.utils import get_or_create_erratum
from operatingsystems.models import OSRelease, OSVariant
from operatingsystems.utils import get_or_create_osrelease
from packages.models import Package
@@ -33,11 +31,12 @@
from patchman.signals import pbar_start, pbar_update
from util import (
bunzip2, fetch_content, get_setting_of_type, get_sha256, get_url,
+ run_concurrently,
)
-from util.logging import error_message
+from util.logging import clear_forked_pbar, error_message
-def update_ubuntu_errata(concurrent_processing=False):
+def update_ubuntu_errata(concurrent_processing=True, max_workers=25):
""" Update Ubuntu errata
"""
codenames = retrieve_ubuntu_codenames()
@@ -47,7 +46,7 @@ def update_ubuntu_errata(concurrent_processing=False):
expected_checksum = fetch_ubuntu_usn_db_checksum()
actual_checksum = get_sha256(data)
if actual_checksum == expected_checksum:
- parse_usn_data(data, concurrent_processing)
+ parse_usn_data(data, concurrent_processing, max_workers)
else:
e = 'Ubuntu USN DB checksum mismatch, skipping Ubuntu errata parsing\n'
e += f'{actual_checksum} (actual) != {expected_checksum} (expected)'
@@ -70,14 +69,14 @@ def fetch_ubuntu_usn_db_checksum():
return fetch_content(res, 'Fetching Ubuntu Errata Checksum').decode().split()[0]
-def parse_usn_data(data, concurrent_processing):
+def parse_usn_data(data, concurrent_processing, max_workers=25):
""" Parse the Ubuntu USN data
"""
accepted_releases = get_accepted_ubuntu_codenames()
extracted = bunzip2(data).decode()
advisories = json.loads(extracted)
if concurrent_processing:
- parse_usn_data_concurrently(advisories, accepted_releases)
+ parse_usn_data_concurrently(advisories, accepted_releases, max_workers)
else:
parse_usn_data_serially(advisories, accepted_releases)
@@ -92,25 +91,24 @@ def parse_usn_data_serially(advisories, accepted_releases):
pbar_update.send(sender=None, index=i + 1)
-def parse_usn_data_concurrently(advisories, accepted_releases):
+def parse_usn_data_concurrently(advisories, accepted_releases, max_workers=25):
""" Parse the Ubuntu USN data concurrently
"""
- connections.close_all()
elen = len(advisories)
pbar_start.send(sender=None, ptext=f'Processing {elen} Ubuntu Errata', plen=elen)
- i = 0
- with concurrent.futures.ProcessPoolExecutor(max_workers=25) as executor:
- futures = [executor.submit(process_usn, usn_id, advisory, accepted_releases)
- for usn_id, advisory in advisories.items()]
- for future in concurrent.futures.as_completed(futures):
- i += 1
- pbar_update.send(sender=None, index=i + 1)
+ args = [(usn_id, advisory, accepted_releases) for usn_id, advisory in advisories.items()]
+ for i, _ in enumerate(run_concurrently(process_usn_wrapper, args, max_workers)):
+ pbar_update.send(sender=None, index=i + 1)
+
+
+def process_usn_wrapper(args):
+ clear_forked_pbar()
+ return process_usn(*args)
def process_usn(usn_id, advisory, accepted_releases):
""" Process a single USN advisory
"""
- from errata.utils import get_or_create_erratum
try:
affected_releases = advisory.get('releases', {}).keys()
if not release_is_affected(affected_releases, accepted_releases):
diff --git a/errata/sources/repos/yum.py b/errata/sources/repos/yum.py
index 72813436..424e688d 100644
--- a/errata/sources/repos/yum.py
+++ b/errata/sources/repos/yum.py
@@ -14,11 +14,9 @@
# You should have received a copy of the GNU General Public License
# along with Patchman. If not, see
-import concurrent.futures
-
-from django.db import connections
-
from errata.models import Erratum
from packages.models import PackageUpdate
from patchman.signals import pbar_start, pbar_update
-from util import tz_aware_datetime
+from util import fetch_concurrently, run_concurrently, tz_aware_datetime
from util.logging import warning_message
@@ -62,19 +58,25 @@ def get_or_create_erratum(name, e_type, issue_date, synopsis):
return e, created
-def mark_errata_security_updates():
+def mark_errata_security_updates(concurrent_processing=True, max_workers=25):
""" For each set of erratum packages, modify any PackageUpdate that
should be marked as a security update.
"""
- connections.close_all()
elen = Erratum.objects.count()
pbar_start.send(sender=None, ptext=f'Scanning {elen} Errata for security updates', plen=elen)
- i = 0
- with concurrent.futures.ProcessPoolExecutor(max_workers=25) as executor:
- futures = [executor.submit(e.scan_for_security_updates) for e in Erratum.objects.all()]
- for future in concurrent.futures.as_completed(futures):
+ if concurrent_processing:
+ pks = list(Erratum.objects.values_list('pk', flat=True))
+ for i, _ in enumerate(run_concurrently(scan_security_worker, pks, max_workers)):
+ pbar_update.send(sender=None, index=i + 1)
+ else:
+ for i, e in enumerate(Erratum.objects.all()):
+ e.scan_for_security_updates()
pbar_update.send(sender=None, index=i + 1)
- i += 1
+
+
+def scan_security_worker(pk):
+ e = Erratum.objects.get(pk=pk)
+ return e.scan_for_security_updates()
def scan_package_updates_for_affected_packages():
@@ -88,15 +90,36 @@ def scan_package_updates_for_affected_packages():
e.affected_packages.add(pu.oldpackage)
-def enrich_errata():
+def enrich_errata(concurrent_processing=True, max_workers=25):
""" Enrich Errata with data from osv.dev
"""
- connections.close_all()
- elen = Erratum.objects.count()
- pbar_start.send(sender=None, ptext=f'Adding osv.dev data to {elen} Errata', plen=elen)
- i = 0
- with concurrent.futures.ProcessPoolExecutor(max_workers=25) as executor:
- futures = [executor.submit(e.fetch_osv_dev_data) for e in Erratum.objects.all()]
- for future in concurrent.futures.as_completed(futures):
+ errata = list(Erratum.objects.all())
+ elen = len(errata)
+
+ # phase 1: fetch osv.dev data
+ pbar_start.send(sender=None, ptext=f'Fetching osv.dev data for {elen} Errata', plen=elen)
+ results = []
+ if concurrent_processing:
+ for i, result in enumerate(fetch_concurrently(fetch_osv_worker, errata, max_workers)):
+ erratum, osv_data = result
+ if osv_data is not None:
+ results.append((erratum, osv_data))
+ pbar_update.send(sender=None, index=i + 1)
+ else:
+ for i, e in enumerate(errata):
+ osv_data = e.fetch_osv_dev_data()
+ if osv_data is not None:
+ results.append((e, osv_data))
+ pbar_update.send(sender=None, index=i + 1)
+
+ # phase 2: parse and write to db (serial, no lock contention)
+ rlen = len(results)
+ if rlen > 0:
+ pbar_start.send(sender=None, ptext=f'Parsing osv.dev data for {rlen} Errata', plen=rlen)
+ for i, (erratum, osv_data) in enumerate(results):
+ erratum.parse_osv_dev_data(osv_data)
pbar_update.send(sender=None, index=i + 1)
- i += 1
+
+
+def fetch_osv_worker(erratum, session):
+ return (erratum, erratum.fetch_osv_dev_data(session))
diff --git a/sbin/patchman b/sbin/patchman
index ea81801a..7040f228 100755
--- a/sbin/patchman
+++ b/sbin/patchman
@@ -46,6 +46,7 @@ from reports.tasks import remove_reports_with_no_hosts
from repos.models import Repository
from repos.utils import clean_repos
from security.utils import update_cves, update_cwes
+from util import get_setting_of_type
from util.logging import info_message, set_quiet_mode
@@ -482,10 +483,20 @@ def process_args(args):
dns_checks(args.host)
showhelp = False
if args.update_errata:
+ concurrent = get_setting_of_type(
+ setting_name='CONCURRENT_PROCESSING',
+ setting_type=bool,
+ default=True,
+ )
+ max_workers = get_setting_of_type(
+ setting_name='CONCURRENT_WORKERS',
+ setting_type=int,
+ default=25,
+ )
update_errata(args.erratum_type, args.force, args.repo)
scan_package_updates_for_affected_packages()
- mark_errata_security_updates()
- enrich_errata()
+ mark_errata_security_updates(concurrent, max_workers)
+ enrich_errata(concurrent, max_workers)
showhelp = False
if args.update_cves:
update_cves(args.cve, args.fetch_nist_data)
diff --git a/util/__init__.py b/util/__init__.py
index 7797f522..c0632d09 100644
--- a/util/__init__.py
+++ b/util/__init__.py
@@ -101,7 +101,7 @@ def fetch_content(response, text='', ljust=35):
wait=wait_exponential(multiplier=1, min=1, max=10),
reraise=False,
)
-def get_url(url, headers=None, params=None):
+def get_url(url, headers=None, params=None, session=None):
""" Perform a http GET on a URL. Return None on error.
"""
response = None
@@ -109,9 +109,10 @@ def get_url(url, headers=None, params=None):
headers = {}
if not params:
params = {}
+ requester = session or requests
try:
debug_message(text=f'Trying {url} headers:{headers} params:{params}')
- response = requests.get(url, headers=headers, params=params, stream=True, proxies=proxies, timeout=30)
+ response = requester.get(url, headers=headers, params=params, stream=True, proxies=proxies, timeout=30)
debug_message(text=f'{response.status_code}: {response.headers}')
if response.status_code in [403, 404]:
return response
@@ -297,3 +298,47 @@ def get_datetime_now():
""" Return the current timezone-aware datetime removing microseconds
"""
return datetime.now().astimezone().replace(microsecond=0)
+
+
+def fetch_concurrently(func, items, max_workers=25):
+ """ Run func across items using threads with pooled HTTP sessions,
+ yielding results as they complete. Ideal for I/O-bound work
+ (network fetches). func(item, session) receives a shared
+ requests.Session with connection pooling.
+ """
+ import concurrent.futures
+
+ from requests.adapters import HTTPAdapter
+
+ session = requests.Session()
+ adapter = HTTPAdapter(pool_connections=max_workers, pool_maxsize=max_workers)
+ session.mount('https://', adapter)
+ session.mount('http://', adapter)
+ items = list(items)
+ with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
+ futures = {executor.submit(func, item, session): item for item in items}
+ for future in concurrent.futures.as_completed(futures):
+ yield future.result()
+
+
+def run_concurrently(func, items, max_workers=25):
+ """ Run func across items using multiprocessing, yielding results as
+ they complete. Uses multiprocessing.Pool on Python < 3.12 to avoid
+ ProcessPoolExecutor deadlock (CPython #105829).
+ """
+ import concurrent.futures
+ import multiprocessing
+ import sys
+
+ from django.db import connections
+ connections.close_all()
+ items = list(items)
+ if sys.version_info >= (3, 12):
+ with concurrent.futures.ProcessPoolExecutor(max_workers=max_workers) as executor:
+ futures = [executor.submit(func, item) for item in items]
+ for future in concurrent.futures.as_completed(futures):
+ yield future.result()
+ else:
+ with multiprocessing.Pool(processes=max_workers) as pool:
+ for result in pool.imap_unordered(func, items, chunksize=1):
+ yield result
diff --git a/util/logging.py b/util/logging.py
index bf532e54..44644ecb 100644
--- a/util/logging.py
+++ b/util/logging.py
@@ -16,6 +16,7 @@
import logging
+import os
from django.conf import settings
from tqdm import tqdm
@@ -24,6 +25,8 @@
debug_message_s, error_message_s, info_message_s, warning_message_s,
)
+tqdm.monitor_interval = 0
+
log_format = '[%(asctime)s] %(levelname)s: %(message)s'
if settings.DEBUG:
logging_level = logging.DEBUG
@@ -37,6 +40,18 @@
pbar = None
+def clear_forked_pbar():
+ """ Clear any tqdm instances inherited from a parent process via fork.
+ Prevents subprocess tqdm.write() from redrawing a stale progress bar
+ on the parent's terminal. Only clears if running in a child process.
+ """
+ if os.getpid() != _main_pid and tqdm._instances:
+ tqdm._instances.clear()
+
+
+_main_pid = os.getpid()
+
+
def get_quiet_mode():
""" Get the global quiet_mode
"""