From 4d8f8deb4e069388d51933c382e8802e102acc87 Mon Sep 17 00:00:00 2001
From: Marcus Furlong
Date: Wed, 15 Apr 2026 18:09:33 -0400
Subject: [PATCH 1/2] add concurrent processing with deadlock workaround and progress bar fixes

- add run_concurrently() helper with multiprocessing.Pool fallback for python < 3.12 (CPython #105829)
- add CONCURRENT_PROCESSING and CONCURRENT_WORKERS settings
- add max_workers param and serial fallback to all errata sources
- switch osv.dev enrichment to two-phase fetch/parse with connection pooling
- add exception handling for threadpool futures in enrich_errata
- switch arch to ThreadPoolExecutor, default serial to avoid rate limiting
- add null guards for arch errata fetch failures
- disable tqdm TMonitor thread with monitor_interval = 0
- add clear_forked_pbar to wrapper functions for forked workers
- add prefetch_related to alma modules, batch module adds for rocky
---
 errata/models.py                 | 15 ++++---
 errata/sources/distros/alma.py   | 43 +++++++++----------
 errata/sources/distros/arch.py   | 27 +++++++-----
 errata/sources/distros/debian.py | 67 ++++++++++++++++-------------
 errata/sources/distros/rocky.py  | 54 +++++++++++++----------
 errata/sources/distros/ubuntu.py | 34 +++++++--------
 errata/sources/repos/yum.py      | 26 ++++++------
 errata/tasks.py                  | 18 ++++++--
 errata/utils.py                  | 73 +++++++++++++++++++++++---------
 sbin/patchman                    | 15 ++++++-
 util/__init__.py                 | 28 +++++++++++-
 util/logging.py                  | 15 +++++++
 12 files changed, 263 insertions(+), 152 deletions(-)

diff --git a/errata/models.py b/errata/models.py
index 8f6a6f74d..ba4baec78 100644
--- a/errata/models.py
+++ b/errata/models.py
@@ -91,15 +91,16 @@ def _mark_updates_security(self, updates):
                 error_message(text=e)
                 update.delete()
 
-    def fetch_osv_dev_data(self):
+    def fetch_osv_dev_data(self, session=None):
+        """ Fetch osv.dev JSON for this erratum. Returns parsed JSON or None.
+        """
         osv_dev_url = f'https://api.osv.dev/v1/vulns/{self.name}'
-        res = get_url(osv_dev_url)
+        res = get_url(osv_dev_url, session=session)
+        if res is None:
+            return None
         if res.status_code == 404:
-            error_message(text=f'404 - Skipping {self.name} - {osv_dev_url}')
-            return
-        data = res.content
-        osv_dev_json = json.loads(data)
-        self.parse_osv_dev_data(osv_dev_json)
+            return None
+        return json.loads(res.content)
 
     def parse_osv_dev_data(self, osv_dev_json):
         from django.db.models import Q
diff --git a/errata/sources/distros/alma.py b/errata/sources/distros/alma.py
index d9e1dbf9e..8b7b3163d 100644
--- a/errata/sources/distros/alma.py
+++ b/errata/sources/distros/alma.py
@@ -14,19 +14,21 @@
 # You should have received a copy of the GNU General Public License
 # along with Patchman. If not, see <http://www.gnu.org/licenses/>
 
-import concurrent.futures
 import json
 
-from django.db import connections
-
-from operatingsystems.utils import get_or_create_osrelease
+from errata.utils import get_or_create_erratum
+from modules.utils import get_matching_modules
+from operatingsystems.utils import (
+    get_or_create_osrelease, normalize_el_osrelease,
+)
 from packages.models import Package
 from packages.utils import get_or_create_package, parse_package_string
 from patchman.signals import pbar_start, pbar_update
-from util import fetch_content, get_setting_of_type, get_url
+from util import fetch_content, get_setting_of_type, get_url, run_concurrently
+from util.logging import clear_forked_pbar
 
 
-def update_alma_errata(concurrent_processing=True):
+def update_alma_errata(concurrent_processing=True, max_workers=25):
     """ Update Alma Linux advisories from errata.almalinux.org:
         https://errata.almalinux.org/8/errata.full.json
         https://errata.almalinux.org/9/errata.full.json
@@ -40,7 +42,7 @@
     )
     for release in alma_releases:
         advisories = fetch_alma_advisories(release)
-        process_alma_errata(release, advisories, concurrent_processing)
+        process_alma_errata(release, advisories, concurrent_processing, max_workers)
 
 
 def fetch_alma_advisories(release):
@@ -54,11 +56,11 @@
     return advisories
 
 
-def process_alma_errata(release, advisories, concurrent_processing):
+def process_alma_errata(release, advisories, concurrent_processing, max_workers=25):
     """ Process Alma Linux Errata
     """
     if concurrent_processing:
-        process_alma_errata_concurrently(release, advisories)
+        process_alma_errata_concurrently(release, advisories, max_workers)
     else:
         process_alma_errata_serially(release, advisories)
@@ -73,24 +75,24 @@
         pbar_update.send(sender=None, index=i + 1)
 
 
-def process_alma_errata_concurrently(release, advisories):
+def process_alma_errata_concurrently(release, advisories, max_workers=25):
     """ Process Alma Linux Errata concurrently
     """
-    connections.close_all()
     elen = len(advisories)
     pbar_start.send(sender=None, ptext=f'Processing {elen} Alma {release} Errata', plen=elen)
-    i = 0
-    with concurrent.futures.ProcessPoolExecutor(max_workers=25) as executor:
-        futures = [executor.submit(process_alma_erratum, release, advisory) for advisory in advisories]
-        for future in concurrent.futures.as_completed(futures):
-            i += 1
-            pbar_update.send(sender=None, index=i + 1)
+    args = [(release, advisory) for advisory in advisories]
+    for i, _ in enumerate(run_concurrently(process_alma_erratum_wrapper, args, max_workers)):
+        pbar_update.send(sender=None, index=i + 1)
+
+
+def process_alma_erratum_wrapper(args):
+    clear_forked_pbar()
+    return process_alma_erratum(*args)
 
 
 def process_alma_erratum(release, advisory):
     """ Process a single Alma Linux Erratum
     """
-    from errata.utils import get_or_create_erratum
     erratum_name = advisory.get('id')
     issue_date = advisory.get('issued_date')
     synopsis = advisory.get('title')
@@ -110,7 +112,6 @@ def process_alma_erratum(release, advisory):
 
 def add_alma_erratum_osreleases(e, release):
     """ Update OS Release for Alma Linux errata
     """
-    from operatingsystems.utils import normalize_el_osrelease
     osrelease_name = normalize_el_osrelease(f'Alma Linux {release}')
     osrelease = get_or_create_osrelease(name=osrelease_name)
     e.osreleases.add(osrelease)
@@ -150,7 +151,6 @@ def add_alma_erratum_packages(e, advisory):
 
 def add_alma_erratum_modules(e, advisory):
     """ Parse and add modules for Alma Linux errata
     """
-    from modules.utils import get_matching_modules
     fixed_packages = set()
     modules = advisory.get('modules')
     for module in modules:
@@ -160,8 +160,7 @@
         stream = module.get('stream')
         version = module.get('version')
         matching_modules = get_matching_modules(name, stream, version, context, arch)
-        for match in matching_modules:
+        for match in matching_modules.prefetch_related('packages'):
             for fixed_package in match.packages.all():
-                match.packages.add(fixed_package)
                 fixed_packages.add(fixed_package)
     e.add_fixed_packages(fixed_packages)
diff --git a/errata/sources/distros/arch.py b/errata/sources/distros/arch.py
index e22de4034..696c9efb9 100644
--- a/errata/sources/distros/arch.py
+++ b/errata/sources/distros/arch.py
@@ -17,8 +17,7 @@
 import concurrent.futures
 import json
 
-from django.db import connections
-
+from errata.utils import get_or_create_erratum
 from operatingsystems.utils import get_or_create_osrelease
 from packages.models import Package
 from packages.utils import (
@@ -26,16 +25,17 @@
 )
 from patchman.signals import pbar_start, pbar_update
 from util import fetch_content, get_url
-from util.logging import error_message
+from util.logging import clear_forked_pbar, error_message
 
 
-def update_arch_errata(concurrent_processing=False):
+def update_arch_errata(concurrent_processing=False, max_workers=25):
     """ Update Arch Linux Errata from the following sources:
         https://security.archlinux.org/advisories.json
     """
     add_arch_linux_osrelease()
     advisories = fetch_arch_errata()
-    parse_arch_errata(advisories, concurrent_processing)
+    if advisories:
+        parse_arch_errata(advisories, concurrent_processing, max_workers)
 
 
 def fetch_arch_errata():
@@ -44,14 +44,16 @@
     """
     res = get_url('https://security.archlinux.org/advisories.json')
     advisories = fetch_content(res, 'Fetching Arch Advisories')
+    if advisories is None:
+        return None
     return json.loads(advisories)
 
 
-def parse_arch_errata(advisories, concurrent_processing):
+def parse_arch_errata(advisories, concurrent_processing, max_workers=25):
     """ Parse Arch Linux Errata Advisories
     """
     if concurrent_processing:
-        parse_arch_errata_concurrently(advisories)
+        parse_arch_errata_concurrently(advisories, max_workers)
     else:
         parse_arch_errata_serially(advisories)
@@ -67,15 +69,14 @@
         pbar_update.send(sender=None, index=i + 1)
 
 
-def parse_arch_errata_concurrently(advisories):
+def parse_arch_errata_concurrently(advisories, max_workers=25):
     """ Parse Arch Linux Errata Advisories concurrently
     """
     osrelease = get_or_create_osrelease(name='Arch Linux')
-    connections.close_all()
     elen = len(advisories)
     pbar_start.send(sender=None, ptext=f'Processing {elen} Arch Advisories', plen=elen)
     i = 0
-    with concurrent.futures.ProcessPoolExecutor(max_workers=25) as executor:
+    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
         futures = [executor.submit(process_arch_erratum, advisory, osrelease) for advisory in advisories]
         for future in concurrent.futures.as_completed(futures):
             i += 1
@@ -85,7 +86,7 @@
 def process_arch_erratum(advisory, osrelease):
     """ Process a single Arch Linux Erratum
     """
-    from errata.utils import get_or_create_erratum
+    clear_forked_pbar()
     try:
         name = advisory.get('name')
         issue_date = advisory.get('date')
@@ -121,6 +122,8 @@
         e.add_reference('ASA', url)
         raw_url = f'{url}/raw'
         res = get_url(raw_url)
+        if res is None:
+            return
         data = res.content
         parse_arch_erratum_raw(e, data.decode())
@@ -152,6 +155,8 @@ def add_arch_erratum_packages(e, advisory):
     group_id = advisory.get('group')
     group_url = f'https://security.archlinux.org/group/{group_id}.json'
     res = get_url(group_url)
+    if res is None:
+        return
     data = res.content
     group = json.loads(data)
     packages = group.get('packages')
diff --git a/errata/sources/distros/debian.py b/errata/sources/distros/debian.py
index 8025b1bf4..d508c325e 100644
--- a/errata/sources/distros/debian.py
+++ b/errata/sources/distros/debian.py
@@ -14,27 +14,28 @@
 # You should have received a copy of the GNU General Public License
 # along with Patchman. If not, see <http://www.gnu.org/licenses/>
 
-import concurrent.futures
 import csv
 import re
 from datetime import datetime
 from io import StringIO
 
 from debian.deb822 import Dsc
-from django.db import connections
 
+from errata.utils import get_or_create_erratum
 from operatingsystems.models import OSRelease
 from operatingsystems.utils import get_or_create_osrelease
 from packages.models import Package
 from packages.utils import find_evr, get_or_create_package
 from patchman.signals import pbar_start, pbar_update
-from util import extract, fetch_content, get_setting_of_type, get_url
-from util.logging import error_message, warning_message
+from util import (
+    extract, fetch_content, get_setting_of_type, get_url, run_concurrently,
+)
+from util.logging import clear_forked_pbar, error_message, warning_message
 
 DSCs = {}
 
 
-def update_debian_errata(concurrent_processing=True):
+def update_debian_errata(concurrent_processing=True, max_workers=25):
     """ Update Debian errata using:
         https://salsa.debian.org/security-tracker-team/security-tracker/raw/master/data/DSA/list
        https://salsa.debian.org/security-tracker-team/security-tracker/raw/master/data/DLA/list
@@ -47,7 +48,7 @@
     fetch_dscs_from_debian_package_file_maps()
     accepted_codenames = get_accepted_debian_codenames()
     errata = parse_debian_errata(advisories, accepted_codenames)
-    create_debian_errata(errata, accepted_codenames, concurrent_processing)
+    create_debian_errata(errata, accepted_codenames, concurrent_processing, max_workers)
 
 
 def fetch_debian_dsa_advisories():
@@ -113,7 +114,9 @@
     """
     distro_pattern = re.compile(r'^\t\[(.+?)\] - .*')
     title_pattern = re.compile(r'^\[(.+?)\] (.+?) (.+?)[ ]+[-]+ (.*)')
+    distro_package_pattern = re.compile(r'^\t\[(.+?)\] - (.+?) (.*)')
     errata = []
+    dsc_fetches = []
     e = {'packages': {}, 'cve_ids': [], 'releases': []}
     for line in advisories.splitlines():
         if line.startswith('['):
@@ -132,9 +135,17 @@
             e['releases'].append(release)
             if not e.get('packages').get(release):
                 e['packages'][release] = []
-            e['packages'][release].append(parse_debian_erratum_package(line, accepted_codenames))
+            pkg_match = re.match(distro_package_pattern, line)
+            if pkg_match and pkg_match.group(1) in accepted_codenames:
+                source_package = pkg_match.group(2)
+                source_version = pkg_match.group(3)
+                dsc_fetches.append((source_package, source_version))
+                e['packages'][release].append((source_package, source_version))
+            else:
+                e['packages'][release].append(None)
 
     # add the last one
     errata = add_errata_by_codename(errata, e, accepted_codenames)
+    fetch_debian_dsc_package_lists(dsc_fetches)
     return errata
@@ -162,11 +173,11 @@ def parse_debian_erratum_advisory(e, match):
     return e
 
 
-def create_debian_errata(errata, accepted_codenames, concurrent_processing):
+def create_debian_errata(errata, accepted_codenames, concurrent_processing, max_workers=25):
     """ Create Debian Errata
     """
     if concurrent_processing:
-        create_debian_errata_concurrently(errata, accepted_codenames)
+        create_debian_errata_concurrently(errata, accepted_codenames, max_workers)
     else:
         create_debian_errata_serially(errata, accepted_codenames)
@@ -181,25 +192,25 @@
         pbar_update.send(sender=None, index=i + 1)
 
 
-def create_debian_errata_concurrently(errata, accepted_codenames):
+def create_debian_errata_concurrently(errata, accepted_codenames, max_workers=25):
     """ Create Debian Errata concurrently
     """
-    connections.close_all()
     elen = len(errata)
     pbar_start.send(sender=None, ptext=f'Processing {elen} Debian Errata', plen=elen)
-    i = 0
-    with concurrent.futures.ProcessPoolExecutor(max_workers=25) as executor:
-        futures = [executor.submit(process_debian_erratum, erratum, accepted_codenames) for erratum in errata]
-        for future in concurrent.futures.as_completed(futures):
-            i += 1
-            pbar_update.send(sender=None, index=i + 1)
+    args = [(erratum, accepted_codenames) for erratum in errata]
+    for i, _ in enumerate(run_concurrently(process_debian_erratum_wrapper, args, max_workers)):
+        pbar_update.send(sender=None, index=i + 1)
+
+
+def process_debian_erratum_wrapper(args):
+    clear_forked_pbar()
+    return process_debian_erratum(*args)
 
 
 def process_debian_erratum(erratum, accepted_codenames):
     """ Process a single Debian Erratum
     """
     try:
-        from errata.utils import get_or_create_erratum
         erratum_name = erratum.get('name')
         e, created = get_or_create_erratum(
             name=erratum_name,
@@ -221,19 +232,13 @@
         error_message(text=exc)
 
 
-def parse_debian_erratum_package(line, accepted_codenames):
-    """ Parse the codename and source package from a DSA/DLA file
-        Returns the source package and source version
+def fetch_debian_dsc_package_lists(dsc_fetches):
+    """ Fetch DSC package lists with a progress bar
     """
-    distro_package_pattern = re.compile(r'^\t\[(.+?)\] - (.+?) (.*)')
-    match = re.match(distro_package_pattern, line)
-    if match:
-        codename = match.group(1)
-        if codename in accepted_codenames:
-            source_package = match.group(2)
-            source_version = match.group(3)
-            fetch_debian_dsc_package_list(source_package, source_version)
-            return source_package, source_version
+    pbar_start.send(sender=None, ptext=f'Fetching {len(dsc_fetches)} Debian DSC files', plen=len(dsc_fetches))
+    for i, (package, version) in enumerate(dsc_fetches):
+        fetch_debian_dsc_package_list(package, version)
+        pbar_update.send(sender=None, index=i + 1)
 
 
 def get_debian_dsc_package_list(package, version):
@@ -301,6 +306,8 @@ def create_debian_os_releases(codename_to_version):
 
 def process_debian_erratum_fixed_packages(e, package_data):
     """ Process packages fixed in a Debian errata
     """
+    if not package_data:
+        return
     source_package, source_version = package_data
     epoch, ver, rel = find_evr(source_version)
     package_list = get_debian_dsc_package_list(source_package, source_version)
diff --git a/errata/sources/distros/rocky.py b/errata/sources/distros/rocky.py
index 272ac2a7e..034e043f0 100644
--- a/errata/sources/distros/rocky.py
+++ b/errata/sources/distros/rocky.py
@@ -17,28 +17,33 @@
 import concurrent.futures
 import json
 
-from django.db import connections
 from django.db.utils import OperationalError
 from tenacity import (
     retry, retry_if_exception_type, stop_after_attempt, wait_exponential,
 )
 
+from errata.utils import get_or_create_erratum
+from modules.models import Module
+from modules.utils import get_matching_modules
 from operatingsystems.utils import get_or_create_osrelease
 from packages.models import Package
 from packages.utils import get_or_create_package, parse_package_string
 from patchman.signals import pbar_start, pbar_update
-from util import fetch_content, get_url
-from util.logging import error_message, info_message
+from util import fetch_content, get_url, run_concurrently
+from util.logging import clear_forked_pbar, error_message, info_message
 
 
-def update_rocky_errata(concurrent_processing=True):
+def update_rocky_errata(concurrent_processing=True, max_workers=25):
     """ Update Rocky Linux errata
     """
     rocky_errata_api_host = 'https://apollo.build.resf.org'
     rocky_errata_api_url = '/api/v3/'
     if check_rocky_errata_endpoint_health(rocky_errata_api_host):
-        advisories = fetch_rocky_advisories(rocky_errata_api_host, rocky_errata_api_url, concurrent_processing)
-        process_rocky_errata(advisories, concurrent_processing)
+        advisories = fetch_rocky_advisories(
+            rocky_errata_api_host, rocky_errata_api_url,
+            concurrent_processing, max_workers,
+        )
+        process_rocky_errata(advisories, concurrent_processing, max_workers)
 
 
 def check_rocky_errata_endpoint_health(rocky_errata_api_host):
@@ -66,11 +71,11 @@
     return False
 
 
-def fetch_rocky_advisories(rocky_errata_api_host, rocky_errata_api_url, concurrent_processing):
+def fetch_rocky_advisories(rocky_errata_api_host, rocky_errata_api_url, concurrent_processing, max_workers=25):
     """ Fetch Rocky Linux advisories and return the list
     """
     if concurrent_processing:
-        return fetch_rocky_advisories_concurrently(rocky_errata_api_host, rocky_errata_api_url)
+        return fetch_rocky_advisories_concurrently(rocky_errata_api_host, rocky_errata_api_url, max_workers)
     else:
         return fetch_rocky_advisories_serially(rocky_errata_api_host, rocky_errata_api_url)
@@ -103,7 +108,7 @@ def fetch_rocky_advisories_serially(rocky_errata_api_host, rocky_errata_api_url)
     return advisories
 
 
-def fetch_rocky_advisories_concurrently(rocky_errata_api_host, rocky_errata_api_url):
+def fetch_rocky_advisories_concurrently(rocky_errata_api_host, rocky_errata_api_url, max_workers=25):
     """ Fetch Rocky Linux advisories concurrently and return the list
     """
     rocky_errata_advisories_url = rocky_errata_api_host + rocky_errata_api_url + 'advisories/'
@@ -119,7 +124,7 @@
     ptext = 'Fetching Rocky Advisories'
     pbar_start.send(sender=None, ptext=ptext, plen=pages)
     i = 0
-    with concurrent.futures.ThreadPoolExecutor(max_workers=100) as executor:
+    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
         futures = [executor.submit(get_rocky_advisory, rocky_errata_advisories_url, page)
                    for page in range(1, pages + 1)]
         for future in concurrent.futures.as_completed(futures):
@@ -140,11 +145,11 @@
     return advisories_dict.get('advisories')
 
 
-def process_rocky_errata(advisories, concurrent_processing):
+def process_rocky_errata(advisories, concurrent_processing, max_workers=25):
     """ Process Rocky Linux Errata
     """
     if concurrent_processing:
-        process_rocky_errata_concurrently(advisories)
+        process_rocky_errata_concurrently(advisories, max_workers)
     else:
         process_rocky_errata_serially(advisories)
@@ -159,18 +164,18 @@
         pbar_update.send(sender=None, index=i + 1)
 
 
-def process_rocky_errata_concurrently(advisories):
+def process_rocky_errata_concurrently(advisories, max_workers=25):
     """ Process Rocky Linux errata concurrently
     """
-    connections.close_all()
     elen = len(advisories)
     pbar_start.send(sender=None, ptext=f'Processing {elen} Rocky Errata', plen=elen)
-    i = 0
-    with concurrent.futures.ProcessPoolExecutor(max_workers=25) as executor:
-        futures = [executor.submit(process_rocky_erratum, advisory) for advisory in advisories]
-        for future in concurrent.futures.as_completed(futures):
-            i += 1
-            pbar_update.send(sender=None, index=i + 1)
+    for i, _ in enumerate(run_concurrently(process_rocky_erratum_wrapper, advisories, max_workers)):
+        pbar_update.send(sender=None, index=i + 1)
+
+
+def process_rocky_erratum_wrapper(advisory):
+    clear_forked_pbar()
+    return process_rocky_erratum(advisory)
 
 
 @retry(
@@ -181,7 +186,6 @@
 )
 def process_rocky_erratum(advisory):
     """ Process a single Rocky Linux erratum
     """
-    from errata.utils import get_or_create_erratum
     try:
         erratum_name = advisory.get('name')
         e_type = advisory.get('kind').lower().replace(' ', '')
@@ -230,9 +234,9 @@
 
 def add_rocky_erratum_packages(e, advisory):
     """ Parse and add packages for Rocky Linux errata
     """
-    from modules.utils import get_matching_modules
     packages = advisory.get('packages')
     fixed_packages = set()
+    module_package_adds = {}
     for package in packages:
         package_name = package.get('nevra')
         if package_name:
@@ -253,5 +257,9 @@
                     arch,
                 )
                 for match in matching_modules:
-                    match.packages.add(fixed_package)
+                    if match.pk not in module_package_adds:
+                        module_package_adds[match.pk] = set()
+                    module_package_adds[match.pk].add(fixed_package)
+    for module_pk, pkgs in module_package_adds.items():
+        Module.objects.get(pk=module_pk).packages.add(*pkgs)
     e.add_fixed_packages(fixed_packages)
diff --git a/errata/sources/distros/ubuntu.py b/errata/sources/distros/ubuntu.py
index 5616331fc..ec6036fc7 100644
--- a/errata/sources/distros/ubuntu.py
+++ b/errata/sources/distros/ubuntu.py
@@ -14,15 +14,13 @@
 # You should have received a copy of the GNU General Public License
 # along with Patchman. If not, see <http://www.gnu.org/licenses/>
 
-import concurrent.futures
 import csv
 import json
 import os
 from io import StringIO
 from urllib.parse import urlparse
 
-from django.db import connections
-
+from errata.utils import get_or_create_erratum
 from operatingsystems.models import OSRelease, OSVariant
 from operatingsystems.utils import get_or_create_osrelease
 from packages.models import Package
 from packages.utils import (
@@ -33,11 +31,12 @@
 )
 from patchman.signals import pbar_start, pbar_update
 from util import (
     bunzip2, fetch_content, get_setting_of_type, get_sha256, get_url,
+    run_concurrently,
 )
-from util.logging import error_message
+from util.logging import clear_forked_pbar, error_message
 
 
-def update_ubuntu_errata(concurrent_processing=False):
+def update_ubuntu_errata(concurrent_processing=True, max_workers=25):
     """ Update Ubuntu errata
     """
     codenames = retrieve_ubuntu_codenames()
@@ -47,7 +46,7 @@
         expected_checksum = fetch_ubuntu_usn_db_checksum()
         actual_checksum = get_sha256(data)
         if actual_checksum == expected_checksum:
-            parse_usn_data(data, concurrent_processing)
+            parse_usn_data(data, concurrent_processing, max_workers)
         else:
             e = 'Ubuntu USN DB checksum mismatch, skipping Ubuntu errata parsing\n'
             e += f'{actual_checksum} (actual) != {expected_checksum} (expected)'
@@ -70,14 +69,14 @@
     return fetch_content(res, 'Fetching Ubuntu Errata Checksum').decode().split()[0]
 
 
-def parse_usn_data(data, concurrent_processing):
+def parse_usn_data(data, concurrent_processing, max_workers=25):
     """ Parse the Ubuntu USN data
     """
     accepted_releases = get_accepted_ubuntu_codenames()
     extracted = bunzip2(data).decode()
     advisories = json.loads(extracted)
     if concurrent_processing:
-        parse_usn_data_concurrently(advisories, accepted_releases)
+        parse_usn_data_concurrently(advisories, accepted_releases, max_workers)
     else:
         parse_usn_data_serially(advisories, accepted_releases)
@@ -92,25 +91,24 @@
         pbar_update.send(sender=None, index=i + 1)
 
 
-def parse_usn_data_concurrently(advisories, accepted_releases):
+def parse_usn_data_concurrently(advisories, accepted_releases, max_workers=25):
     """ Parse the Ubuntu USN data concurrently
     """
-    connections.close_all()
     elen = len(advisories)
     pbar_start.send(sender=None, ptext=f'Processing {elen} Ubuntu Errata', plen=elen)
-    i = 0
-    with concurrent.futures.ProcessPoolExecutor(max_workers=25) as executor:
-        futures = [executor.submit(process_usn, usn_id, advisory, accepted_releases)
-                   for usn_id, advisory in advisories.items()]
-        for future in concurrent.futures.as_completed(futures):
-            i += 1
-            pbar_update.send(sender=None, index=i + 1)
+    args = [(usn_id, advisory, accepted_releases) for usn_id, advisory in advisories.items()]
+    for i, _ in enumerate(run_concurrently(process_usn_wrapper, args, max_workers)):
+        pbar_update.send(sender=None, index=i + 1)
+
+
+def process_usn_wrapper(args):
+    clear_forked_pbar()
+    return process_usn(*args)
 
 
 def process_usn(usn_id, advisory, accepted_releases):
     """ Process a single USN advisory
     """
-    from errata.utils import get_or_create_erratum
     try:
         affected_releases = advisory.get('releases', {}).keys()
         if not release_is_affected(affected_releases, accepted_releases):
diff --git a/errata/sources/repos/yum.py b/errata/sources/repos/yum.py
index 728134360..424e688dd 100644
--- a/errata/sources/repos/yum.py
+++ b/errata/sources/repos/yum.py
@@ -14,11 +14,9 @@
 # You should have received a copy of the GNU General Public License
 # along with Patchman. If not, see <http://www.gnu.org/licenses/>
+        if rlen > 0:
+            pbar_start.send(sender=None, ptext=f'Parsing osv.dev data for {rlen} Errata', plen=rlen)
+        for i, (erratum, osv_data) in enumerate(results):
+            erratum.parse_osv_dev_data(osv_data)
             pbar_update.send(sender=None, index=i + 1)
-            i += 1
diff --git a/sbin/patchman b/sbin/patchman
index ea81801a7..7040f228a 100755
--- a/sbin/patchman
+++ b/sbin/patchman
@@ -46,6 +46,7 @@
 from reports.tasks import remove_reports_with_no_hosts
 from repos.models import Repository
 from repos.utils import clean_repos
 from security.utils import update_cves, update_cwes
+from util import get_setting_of_type
 from util.logging import info_message, set_quiet_mode
 
 
@@ -482,10 +483,20 @@ def process_args(args):
         dns_checks(args.host)
         showhelp = False
     if args.update_errata:
+        concurrent = get_setting_of_type(
+            setting_name='CONCURRENT_PROCESSING',
+            setting_type=bool,
+            default=True,
+        )
+        max_workers = get_setting_of_type(
+            setting_name='CONCURRENT_WORKERS',
+            setting_type=int,
+            default=25,
+        )
         update_errata(args.erratum_type, args.force, args.repo)
         scan_package_updates_for_affected_packages()
-        mark_errata_security_updates()
-        enrich_errata()
+        mark_errata_security_updates(concurrent, max_workers)
+        enrich_errata(concurrent, max_workers)
         showhelp = False
     if args.update_cves:
         update_cves(args.cve, args.fetch_nist_data)
diff --git a/util/__init__.py b/util/__init__.py
index 7797f5222..f9463ebc1 100644
--- a/util/__init__.py
+++ b/util/__init__.py
@@ -101,7 +101,7 @@ def fetch_content(response, text='', ljust=35):
     wait=wait_exponential(multiplier=1, min=1, max=10),
     reraise=False,
 )
-def get_url(url, headers=None, params=None):
+def get_url(url, headers=None, params=None, session=None):
     """ Perform a http GET on a URL. Return None on error.
     """
     response = None
@@ -109,9 +109,10 @@
         headers = {}
     if not params:
         params = {}
+    requester = session or requests
     try:
         debug_message(text=f'Trying {url} headers:{headers} params:{params}')
-        response = requests.get(url, headers=headers, params=params, stream=True, proxies=proxies, timeout=30)
+        response = requester.get(url, headers=headers, params=params, stream=True, proxies=proxies, timeout=30)
         debug_message(text=f'{response.status_code}: {response.headers}')
         if response.status_code in [403, 404]:
             return response
@@ -297,3 +298,26 @@ def get_datetime_now():
     """ Return the current timezone-aware datetime removing microseconds
     """
     return datetime.now().astimezone().replace(microsecond=0)
+
+
+def run_concurrently(func, items, max_workers=25):
+    """ Run func across items using multiprocessing, yielding results as
+    they complete. Uses multiprocessing.Pool on Python < 3.12 to avoid
+    ProcessPoolExecutor deadlock (CPython #105829).
+ """ + import concurrent.futures + import multiprocessing + import sys + + from django.db import connections + connections.close_all() + items = list(items) + if sys.version_info >= (3, 12): + with concurrent.futures.ProcessPoolExecutor(max_workers=max_workers) as executor: + futures = [executor.submit(func, item) for item in items] + for future in concurrent.futures.as_completed(futures): + yield future.result() + else: + with multiprocessing.Pool(processes=max_workers) as pool: + for result in pool.imap_unordered(func, items, chunksize=1): + yield result diff --git a/util/logging.py b/util/logging.py index bf532e549..44644ecbe 100644 --- a/util/logging.py +++ b/util/logging.py @@ -16,6 +16,7 @@ import logging +import os from django.conf import settings from tqdm import tqdm @@ -24,6 +25,8 @@ debug_message_s, error_message_s, info_message_s, warning_message_s, ) +tqdm.monitor_interval = 0 + log_format = '[%(asctime)s] %(levelname)s: %(message)s' if settings.DEBUG: logging_level = logging.DEBUG @@ -37,6 +40,18 @@ pbar = None +def clear_forked_pbar(): + """ Clear any tqdm instances inherited from a parent process via fork. + Prevents subprocess tqdm.write() from redrawing a stale progress bar + on the parent's terminal. Only clears if running in a child process. + """ + if os.getpid() != _main_pid and tqdm._instances: + tqdm._instances.clear() + + +_main_pid = os.getpid() + + def get_quiet_mode(): """ Get the global quiet_mode """ From 1bdb18192b23c941238389b64fa5b38bd1353d87 Mon Sep 17 00:00:00 2001 From: Marcus Furlong Date: Wed, 22 Apr 2026 10:41:59 -0400 Subject: [PATCH 2/2] add fetch_concurrently helper function --- errata/sources/distros/debian.py | 33 ++++++++++++++++++++----------- errata/utils.py | 34 +++++++++++--------------------- util/__init__.py | 21 ++++++++++++++++++++ 3 files changed, 54 insertions(+), 34 deletions(-) diff --git a/errata/sources/distros/debian.py b/errata/sources/distros/debian.py index d508c325e..f14e37998 100644 --- a/errata/sources/distros/debian.py +++ b/errata/sources/distros/debian.py @@ -28,7 +28,8 @@ from packages.utils import find_evr, get_or_create_package from patchman.signals import pbar_start, pbar_update from util import ( - extract, fetch_content, get_setting_of_type, get_url, run_concurrently, + extract, fetch_concurrently, fetch_content, get_setting_of_type, get_url, + run_concurrently, ) from util.logging import clear_forked_pbar, error_message, warning_message @@ -47,7 +48,7 @@ def update_debian_errata(concurrent_processing=True, max_workers=25): advisories = dsas + dlas fetch_dscs_from_debian_package_file_maps() accepted_codenames = get_accepted_debian_codenames() - errata = parse_debian_errata(advisories, accepted_codenames) + errata = parse_debian_errata(advisories, accepted_codenames, concurrent_processing, max_workers) create_debian_errata(errata, accepted_codenames, concurrent_processing, max_workers) @@ -109,7 +110,7 @@ def parse_debian_package_file_map(data, repo): parsing_dsc = False -def parse_debian_errata(advisories, accepted_codenames): +def parse_debian_errata(advisories, accepted_codenames, concurrent_processing=True, max_workers=25): """ Parse Debian DSA/DLA files for security advisories """ distro_pattern = re.compile(r'^\t\[(.+?)\] - .*') @@ -145,7 +146,7 @@ def parse_debian_errata(advisories, accepted_codenames): e['packages'][release].append(None) # add the last one errata = add_errata_by_codename(errata, e, accepted_codenames) - fetch_debian_dsc_package_lists(dsc_fetches) + fetch_debian_dsc_package_lists(dsc_fetches, 
concurrent_processing, max_workers) return errata @@ -232,13 +233,23 @@ def process_debian_erratum(erratum, accepted_codenames): error_message(text=exc) -def fetch_debian_dsc_package_lists(dsc_fetches): +def fetch_debian_dsc_package_lists(dsc_fetches, concurrent_processing=True, max_workers=25): """ Fetch DSC package lists with a progress bar """ - pbar_start.send(sender=None, ptext=f'Fetching {len(dsc_fetches)} Debian DSC files', plen=len(dsc_fetches)) - for i, (package, version) in enumerate(dsc_fetches): - fetch_debian_dsc_package_list(package, version) - pbar_update.send(sender=None, index=i + 1) + flen = len(dsc_fetches) + pbar_start.send(sender=None, ptext=f'Fetching {flen} Debian DSC files', plen=flen) + if concurrent_processing: + for i, _ in enumerate(fetch_concurrently(fetch_dsc_worker, dsc_fetches, max_workers)): + pbar_update.send(sender=None, index=i + 1) + else: + for i, (package, version) in enumerate(dsc_fetches): + fetch_debian_dsc_package_list(package, version) + pbar_update.send(sender=None, index=i + 1) + + +def fetch_dsc_worker(item, session): + package, version = item + fetch_debian_dsc_package_list(package, version, session=session) def get_debian_dsc_package_list(package, version): @@ -251,14 +262,14 @@ def get_debian_dsc_package_list(package, version): return package_list -def fetch_debian_dsc_package_list(package, version): +def fetch_debian_dsc_package_list(package, version, session=None): """ Fetch the package list from a DSC file for a given source package/version """ if not DSCs.get(package) or not DSCs[package].get(version): warning_message(text=f'No DSC found for {package} {version}') return source_url = DSCs[package][version]['url'] - res = get_url(source_url) + res = get_url(source_url, session=session) data = res.content dsc = Dsc(data.decode()) package_list = dsc.get('package-list') diff --git a/errata/utils.py b/errata/utils.py index debc5ad69..7023e1fb3 100644 --- a/errata/utils.py +++ b/errata/utils.py @@ -14,16 +14,11 @@ # You should have received a copy of the GNU General Public License # along with Patchman. 
If not, see -import concurrent.futures - -import requests -from requests.adapters import HTTPAdapter - from errata.models import Erratum from packages.models import PackageUpdate from patchman.signals import pbar_start, pbar_update -from util import run_concurrently, tz_aware_datetime -from util.logging import error_message, warning_message +from util import fetch_concurrently, run_concurrently, tz_aware_datetime +from util.logging import warning_message def get_or_create_erratum(name, e_type, issue_date, synopsis): @@ -105,22 +100,11 @@ def enrich_errata(concurrent_processing=True, max_workers=25): pbar_start.send(sender=None, ptext=f'Fetching osv.dev data for {elen} Errata', plen=elen) results = [] if concurrent_processing: - session = requests.Session() - adapter = HTTPAdapter(pool_connections=max_workers, pool_maxsize=max_workers) - session.mount('https://', adapter) - with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: - futures = {executor.submit(e.fetch_osv_dev_data, session): e for e in errata} - for i, future in enumerate(concurrent.futures.as_completed(futures)): - erratum = futures[future] - try: - osv_data = future.result() - except Exception as e: - error_message(text=f'Error fetching osv.dev data for {erratum}: {e}') - pbar_update.send(sender=None, index=i + 1) - continue - if osv_data is not None: - results.append((erratum, osv_data)) - pbar_update.send(sender=None, index=i + 1) + for i, result in enumerate(fetch_concurrently(fetch_osv_worker, errata, max_workers)): + erratum, osv_data = result + if osv_data is not None: + results.append((erratum, osv_data)) + pbar_update.send(sender=None, index=i + 1) else: for i, e in enumerate(errata): osv_data = e.fetch_osv_dev_data() @@ -135,3 +119,7 @@ def enrich_errata(concurrent_processing=True, max_workers=25): for i, (erratum, osv_data) in enumerate(results): erratum.parse_osv_dev_data(osv_data) pbar_update.send(sender=None, index=i + 1) + + +def fetch_osv_worker(erratum, session): + return (erratum, erratum.fetch_osv_dev_data(session)) diff --git a/util/__init__.py b/util/__init__.py index f9463ebc1..c0632d098 100644 --- a/util/__init__.py +++ b/util/__init__.py @@ -300,6 +300,27 @@ def get_datetime_now(): return datetime.now().astimezone().replace(microsecond=0) +def fetch_concurrently(func, items, max_workers=25): + """ Run func across items using threads with pooled HTTP sessions, + yielding results as they complete. Ideal for I/O-bound work + (network fetches). func(item, session) receives a shared + requests.Session with connection pooling. + """ + import concurrent.futures + + from requests.adapters import HTTPAdapter + + session = requests.Session() + adapter = HTTPAdapter(pool_connections=max_workers, pool_maxsize=max_workers) + session.mount('https://', adapter) + session.mount('http://', adapter) + items = list(items) + with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: + futures = {executor.submit(func, item, session): item for item in items} + for future in concurrent.futures.as_completed(futures): + yield future.result() + + def run_concurrently(func, items, max_workers=25): """ Run func across items using multiprocessing, yielding results as they complete. Uses multiprocessing.Pool on Python < 3.12 to avoid
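
For reviewers: a minimal standalone sketch of the dispatch pattern that util.run_concurrently() implements, so the version gate can be exercised outside patchman (the toy square() worker and the max_workers default here are illustrative, not part of the patch). On Python 3.12+ the helper can rely on concurrent.futures.ProcessPoolExecutor directly; on earlier interpreters it routes around the executor deadlock (CPython #105829) via multiprocessing.Pool. In both cases the worker must be a picklable module-level callable, which is why the patches add named *_wrapper functions rather than lambdas or closures:

    import concurrent.futures
    import multiprocessing
    import sys


    def square(n):
        # Workers must be defined at module level so that both pool
        # implementations can pickle them into the child processes.
        return n * n


    def run_concurrently(func, items, max_workers=4):
        # Yield results as they complete, selecting the pool type by
        # interpreter version, mirroring util.run_concurrently().
        items = list(items)
        if sys.version_info >= (3, 12):
            with concurrent.futures.ProcessPoolExecutor(max_workers=max_workers) as executor:
                futures = [executor.submit(func, item) for item in items]
                for future in concurrent.futures.as_completed(futures):
                    yield future.result()
        else:
            with multiprocessing.Pool(processes=max_workers) as pool:
                # chunksize=1 trades some IPC overhead for prompt
                # completion-order results, keeping progress bars smooth.
                yield from pool.imap_unordered(func, items, chunksize=1)


    if __name__ == '__main__':
        # Results arrive in completion order, hence the sort.
        print(sorted(run_concurrently(square, range(10))))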