Skip to content

Commit 35d68e7

Browse files
committed
add concurrent processing with deadlock workaround and progress bar fixes
- add run_concurrently() helper with multiprocessing.Pool fallback for Python < 3.12 (CPython #105829)
- add CONCURRENT_PROCESSING and CONCURRENT_WORKERS settings
- add max_workers param and serial fallback to all errata sources
- switch osv.dev enrichment to two-phase fetch/parse with connection pooling
- add exception handling for thread pool futures in enrich_errata
- switch arch to ThreadPoolExecutor, default to serial to avoid rate limiting
- add null guards for arch errata fetch failures
- disable tqdm TMonitor thread with monitor_interval = 0
- add clear_forked_pbar to wrapper functions for forked workers
- add prefetch_related to alma modules, batch module adds for rocky
- add mininterval=0.5 to tqdm progress bars
- move function-level imports to top-level in errata sources
1 parent 847964a commit 35d68e7

12 files changed

Lines changed: 277 additions & 153 deletions

File tree

errata/models.py

Lines changed: 19 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -25,7 +25,7 @@
2525
from security.models import CVE, Reference
2626
from security.utils import get_or_create_cve, get_or_create_reference
2727
from util import get_url
28-
from util.logging import error_message
28+
from util.logging import debug_message, error_message
2929

3030

3131
class Erratum(models.Model):
@@ -91,15 +91,27 @@ def _mark_updates_security(self, updates):
9191
error_message(text=e)
9292
update.delete()
9393

94-
def fetch_osv_dev_data(self):
94+
def fetch_osv_dev_data(self, session=None):
95+
""" Fetch osv.dev JSON for this erratum. Returns parsed JSON or None.
96+
"""
9597
osv_dev_url = f'https://api.osv.dev/v1/vulns/{self.name}'
96-
res = get_url(osv_dev_url)
98+
debug_message(text=f'Trying {osv_dev_url}')
99+
try:
100+
if session:
101+
res = session.get(osv_dev_url, timeout=30)
102+
else:
103+
res = get_url(osv_dev_url)
104+
if res is None:
105+
error_message(text=f'No response - Skipping {self.name} - {osv_dev_url}')
106+
return None
107+
except Exception as e:
108+
error_message(text=f'Error fetching {osv_dev_url}: {e}')
109+
return None
110+
debug_message(text=f'{res.status_code}: {res.headers}')
97111
if res.status_code == 404:
98-
error_message(text=f'404 - Skipping {self.name} - {osv_dev_url}')
99-
return
112+
return None
100113
data = res.content
101-
osv_dev_json = json.loads(data)
102-
self.parse_osv_dev_data(osv_dev_json)
114+
return json.loads(data)
103115

104116
def parse_osv_dev_data(self, osv_dev_json):
105117
from django.db.models import Q

errata/sources/distros/alma.py

Lines changed: 21 additions & 22 deletions
Original file line number | Diff line number | Diff line change
@@ -14,19 +14,21 @@
1414
# You should have received a copy of the GNU General Public License
1515
# along with Patchman. If not, see <http://www.gnu.org/licenses/>
1616

17-
import concurrent.futures
1817
import json
1918

20-
from django.db import connections
21-
22-
from operatingsystems.utils import get_or_create_osrelease
19+
from errata.utils import get_or_create_erratum
20+
from modules.utils import get_matching_modules
21+
from operatingsystems.utils import (
22+
get_or_create_osrelease, normalize_el_osrelease,
23+
)
2324
from packages.models import Package
2425
from packages.utils import get_or_create_package, parse_package_string
2526
from patchman.signals import pbar_start, pbar_update
26-
from util import fetch_content, get_setting_of_type, get_url
27+
from util import fetch_content, get_setting_of_type, get_url, run_concurrently
28+
from util.logging import clear_forked_pbar
2729

2830

29-
def update_alma_errata(concurrent_processing=True):
31+
def update_alma_errata(concurrent_processing=True, max_workers=25):
3032
""" Update Alma Linux advisories from errata.almalinux.org:
3133
https://errata.almalinux.org/8/errata.full.json
3234
https://errata.almalinux.org/9/errata.full.json
@@ -40,7 +42,7 @@ def update_alma_errata(concurrent_processing=True):
4042
)
4143
for release in alma_releases:
4244
advisories = fetch_alma_advisories(release)
43-
process_alma_errata(release, advisories, concurrent_processing)
45+
process_alma_errata(release, advisories, concurrent_processing, max_workers)
4446

4547

4648
def fetch_alma_advisories(release):
@@ -54,11 +56,11 @@ def fetch_alma_advisories(release):
5456
return advisories
5557

5658

57-
def process_alma_errata(release, advisories, concurrent_processing):
59+
def process_alma_errata(release, advisories, concurrent_processing, max_workers=25):
5860
""" Process Alma Linux Errata
5961
"""
6062
if concurrent_processing:
61-
process_alma_errata_concurrently(release, advisories)
63+
process_alma_errata_concurrently(release, advisories, max_workers)
6264
else:
6365
process_alma_errata_serially(release, advisories)
6466

@@ -73,24 +75,24 @@ def process_alma_errata_serially(release, advisories):
7375
pbar_update.send(sender=None, index=i + 1)
7476

7577

76-
def process_alma_errata_concurrently(release, advisories):
78+
def process_alma_errata_concurrently(release, advisories, max_workers=25):
7779
""" Process Alma Linux Errata concurrently
7880
"""
79-
connections.close_all()
8081
elen = len(advisories)
8182
pbar_start.send(sender=None, ptext=f'Processing {elen} Alma {release} Errata', plen=elen)
82-
i = 0
83-
with concurrent.futures.ProcessPoolExecutor(max_workers=25) as executor:
84-
futures = [executor.submit(process_alma_erratum, release, advisory) for advisory in advisories]
85-
for future in concurrent.futures.as_completed(futures):
86-
i += 1
87-
pbar_update.send(sender=None, index=i + 1)
83+
args = [(release, advisory) for advisory in advisories]
84+
for i, _ in enumerate(run_concurrently(_process_alma_erratum_wrapper, args, max_workers)):
85+
pbar_update.send(sender=None, index=i + 1)
86+
87+
88+
def _process_alma_erratum_wrapper(args):
89+
clear_forked_pbar()
90+
return process_alma_erratum(*args)
8891

8992

9093
def process_alma_erratum(release, advisory):
9194
""" Process a single Alma Linux Erratum
9295
"""
93-
from errata.utils import get_or_create_erratum
9496
erratum_name = advisory.get('id')
9597
issue_date = advisory.get('issued_date')
9698
synopsis = advisory.get('title')
@@ -110,7 +112,6 @@ def process_alma_erratum(release, advisory):
110112
def add_alma_erratum_osreleases(e, release):
111113
""" Update OS Release for Alma Linux errata
112114
"""
113-
from operatingsystems.utils import normalize_el_osrelease
114115
osrelease_name = normalize_el_osrelease(f'Alma Linux {release}')
115116
osrelease = get_or_create_osrelease(name=osrelease_name)
116117
e.osreleases.add(osrelease)
@@ -150,7 +151,6 @@ def add_alma_erratum_packages(e, advisory):
150151
def add_alma_erratum_modules(e, advisory):
151152
""" Parse and add modules for Alma Linux errata
152153
"""
153-
from modules.utils import get_matching_modules
154154
fixed_packages = set()
155155
modules = advisory.get('modules')
156156
for module in modules:
@@ -160,8 +160,7 @@ def add_alma_erratum_modules(e, advisory):
160160
stream = module.get('stream')
161161
version = module.get('version')
162162
matching_modules = get_matching_modules(name, stream, version, context, arch)
163-
for match in matching_modules:
163+
for match in matching_modules.prefetch_related('packages'):
164164
for fixed_package in match.packages.all():
165-
match.packages.add(fixed_package)
166165
fixed_packages.add(fixed_package)
167166
e.add_fixed_packages(fixed_packages)

errata/sources/distros/arch.py

Lines changed: 16 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -17,25 +17,25 @@
1717
import concurrent.futures
1818
import json
1919

20-
from django.db import connections
21-
20+
from errata.utils import get_or_create_erratum
2221
from operatingsystems.utils import get_or_create_osrelease
2322
from packages.models import Package
2423
from packages.utils import (
2524
find_evr, get_matching_packages, get_or_create_package,
2625
)
2726
from patchman.signals import pbar_start, pbar_update
2827
from util import fetch_content, get_url
29-
from util.logging import error_message
28+
from util.logging import clear_forked_pbar, error_message
3029

3130

32-
def update_arch_errata(concurrent_processing=False):
31+
def update_arch_errata(concurrent_processing=False, max_workers=25):
3332
""" Update Arch Linux Errata from the following sources:
3433
https://security.archlinux.org/advisories.json
3534
"""
3635
add_arch_linux_osrelease()
3736
advisories = fetch_arch_errata()
38-
parse_arch_errata(advisories, concurrent_processing)
37+
if advisories:
38+
parse_arch_errata(advisories, concurrent_processing, max_workers)
3939

4040

4141
def fetch_arch_errata():
@@ -44,14 +44,16 @@ def fetch_arch_errata():
4444
"""
4545
res = get_url('https://security.archlinux.org/advisories.json')
4646
advisories = fetch_content(res, 'Fetching Arch Advisories')
47+
if advisories is None:
48+
return None
4749
return json.loads(advisories)
4850

4951

50-
def parse_arch_errata(advisories, concurrent_processing):
52+
def parse_arch_errata(advisories, concurrent_processing, max_workers=25):
5153
""" Parse Arch Linux Errata Advisories
5254
"""
5355
if concurrent_processing:
54-
parse_arch_errata_concurrently(advisories)
56+
parse_arch_errata_concurrently(advisories, max_workers)
5557
else:
5658
parse_arch_errata_serially(advisories)
5759

@@ -67,15 +69,14 @@ def parse_arch_errata_serially(advisories):
6769
pbar_update.send(sender=None, index=i + 1)
6870

6971

70-
def parse_arch_errata_concurrently(advisories):
72+
def parse_arch_errata_concurrently(advisories, max_workers=25):
7173
""" Parse Arch Linux Errata Advisories concurrently
7274
"""
7375
osrelease = get_or_create_osrelease(name='Arch Linux')
74-
connections.close_all()
7576
elen = len(advisories)
7677
pbar_start.send(sender=None, ptext=f'Processing {elen} Arch Advisories', plen=elen)
7778
i = 0
78-
with concurrent.futures.ProcessPoolExecutor(max_workers=25) as executor:
79+
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
7980
futures = [executor.submit(process_arch_erratum, advisory, osrelease) for advisory in advisories]
8081
for future in concurrent.futures.as_completed(futures):
8182
i += 1
@@ -85,7 +86,7 @@ def parse_arch_errata_concurrently(advisories):
8586
def process_arch_erratum(advisory, osrelease):
8687
""" Process a single Arch Linux Erratum
8788
"""
88-
from errata.utils import get_or_create_erratum
89+
clear_forked_pbar()
8990
try:
9091
name = advisory.get('name')
9192
issue_date = advisory.get('date')
@@ -121,6 +122,8 @@ def add_arch_erratum_references(e, advisory):
121122
e.add_reference('ASA', url)
122123
raw_url = f'{url}/raw'
123124
res = get_url(raw_url)
125+
if res is None:
126+
return
124127
data = res.content
125128
parse_arch_erratum_raw(e, data.decode())
126129

@@ -152,6 +155,8 @@ def add_arch_erratum_packages(e, advisory):
152155
group_id = advisory.get('group')
153156
group_url = f'https://security.archlinux.org/group/{group_id}.json'
154157
res = get_url(group_url)
158+
if res is None:
159+
return
155160
data = res.content
156161
group = json.loads(data)
157162
packages = group.get('packages')

0 commit comments

Comments
 (0)