From c66970818bf0468762433bb1d368a72d57ae8d47 Mon Sep 17 00:00:00 2001
From: Marcus Furlong
Date: Wed, 1 Apr 2026 23:11:38 -0400
Subject: [PATCH] switch osv.dev enrichment to two-phase fetch/parse with
 connection pooling

---
 errata/models.py | 25 +++++++++++++++++++------
 errata/utils.py  | 34 ++++++++++++++++++++++++++--------
 util/__init__.py |  5 +++--
 3 files changed, 48 insertions(+), 16 deletions(-)

diff --git a/errata/models.py b/errata/models.py
index 4b09deeb..4d964538 100644
--- a/errata/models.py
+++ b/errata/models.py
@@ -93,15 +93,28 @@ def scan_for_security_updates(self):
                 # marked as a security update, so delete this one
                 affected_update.delete()
 
-    def fetch_osv_dev_data(self):
+    def fetch_osv_dev_data(self, session=None):
+        """ Fetch osv.dev JSON for this erratum. Returns parsed JSON or None.
+        """
+        from util.logging import debug_message
         osv_dev_url = f'https://api.osv.dev/v1/vulns/{self.name}'
-        res = get_url(osv_dev_url)
+        debug_message(text=f'Trying {osv_dev_url}')
+        try:
+            # always fetch via get_url so its tenacity retries, proxies and
+            # shared error handling apply; get_url uses the pooled session
+            # when one is supplied, otherwise the module-level requests API
+            res = get_url(osv_dev_url, session=session)
+            if res is None:
+                error_message(text=f'No response - Skipping {self.name} - {osv_dev_url}')
+                return None
+        except Exception as e:
+            error_message(text=f'Error fetching {osv_dev_url}: {e}')
+            return None
+        debug_message(text=f'{res.status_code}: {res.headers}')
         if res.status_code == 404:
-            error_message(text=f'404 - Skipping {self.name} - {osv_dev_url}')
-            return
+            return None
         data = res.content
-        osv_dev_json = json.loads(data)
-        self.parse_osv_dev_data(osv_dev_json)
+        return json.loads(data)
 
     def parse_osv_dev_data(self, osv_dev_json):
         name = osv_dev_json.get('id')
diff --git a/errata/utils.py b/errata/utils.py
index e0a5e01b..aa39ac24 100644
--- a/errata/utils.py
+++ b/errata/utils.py
@@ -91,12 +91,30 @@ def scan_package_updates_for_affected_packages():
 
 def enrich_errata():
     """ Enrich Errata with data from osv.dev """
-    connections.close_all()
-    elen = Erratum.objects.count()
-    pbar_start.send(sender=None, ptext=f'Adding osv.dev data to {elen} Errata', plen=elen)
-    i = 0
-    with concurrent.futures.ProcessPoolExecutor(max_workers=25) as executor:
-        futures = [executor.submit(e.fetch_osv_dev_data) for e in Erratum.objects.all()]
-        for future in concurrent.futures.as_completed(futures):
+    import requests
+    from requests.adapters import HTTPAdapter
+    errata = list(Erratum.objects.all())
+    elen = len(errata)
+
+    # phase 1: fetch osv.dev data
+    pbar_start.send(sender=None, ptext=f'Fetching osv.dev data for {elen} Errata', plen=elen)
+    results = []
+    session = requests.Session()
+    adapter = HTTPAdapter(pool_connections=25, pool_maxsize=25)
+    session.mount('https://', adapter)
+    with concurrent.futures.ThreadPoolExecutor(max_workers=25) as executor:
+        futures = {executor.submit(e.fetch_osv_dev_data, session): e for e in errata}
+        for i, future in enumerate(concurrent.futures.as_completed(futures)):
+            erratum = futures[future]
+            osv_data = future.result()
+            if osv_data is not None:
+                results.append((erratum, osv_data))
+            pbar_update.send(sender=None, index=i + 1)
+    session.close()
+    # phase 2: parse and write to db (serial, no lock contention)
+    rlen = len(results)
+    if rlen > 0:
+        pbar_start.send(sender=None, ptext=f'Parsing osv.dev data for {rlen} Errata', plen=rlen)
+        for i, (erratum, osv_data) in enumerate(results):
+            erratum.parse_osv_dev_data(osv_data)
             pbar_update.send(sender=None, index=i + 1)
-            i += 1
diff --git a/util/__init__.py b/util/__init__.py
index 7797f522..fa5c184a 100644
--- a/util/__init__.py
+++ b/util/__init__.py
@@ -101,7 +101,7 @@ def fetch_content(response, text='', ljust=35):
     wait=wait_exponential(multiplier=1, min=1, max=10),
     reraise=False,
 )
-def get_url(url, headers=None, params=None):
+def get_url(url, headers=None, params=None, session=None):
     """ Perform a http GET on a URL. Return None on error.
     """
     response = None
@@ -109,9 +109,10 @@
         headers = {}
     if not params:
         params = {}
+    requester = session or requests
     try:
         debug_message(text=f'Trying {url} headers:{headers} params:{params}')
-        response = requests.get(url, headers=headers, params=params, stream=True, proxies=proxies, timeout=30)
+        response = requester.get(url, headers=headers, params=params, stream=True, proxies=proxies, timeout=30)
         debug_message(text=f'{response.status_code}: {response.headers}')
         if response.status_code in [403, 404]:
             return response