Skip to content

Commit 99ff200

Browse files
committed
add concurrent processing with deadlock workaround and progress bar fixes
- add run_concurrently() helper with multiprocessing.Pool fallback for python < 3.12 (CPython #105829) - add CONCURRENT_PROCESSING and CONCURRENT_WORKERS settings - add max_workers param and serial fallback to all errata sources - switch osv.dev enrichment to two-phase fetch/parse with connection pooling - add exception handling for threadpool futures in enrich_errata - switch arch to ThreadPoolExecutor, default serial to avoid rate limiting - add null guards for arch errata fetch failures - disable tqdm TMonitor thread with monitor_interval = 0 - add clear_forked_pbar to wrapper functions for forked workers - add prefetch_related to alma modules, batch module adds for rocky
1 parent 847964a commit 99ff200

12 files changed

Lines changed: 266 additions & 153 deletions

File tree

errata/models.py

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -91,15 +91,16 @@ def _mark_updates_security(self, updates):
9191
error_message(text=e)
9292
update.delete()
9393

94-
def fetch_osv_dev_data(self):
94+
def fetch_osv_dev_data(self, session=None):
95+
""" Fetch osv.dev JSON for this erratum. Returns parsed JSON or None.
96+
"""
9597
osv_dev_url = f'https://api.osv.dev/v1/vulns/{self.name}'
96-
res = get_url(osv_dev_url)
98+
res = get_url(osv_dev_url, session=session)
99+
if res is None:
100+
return None
97101
if res.status_code == 404:
98-
error_message(text=f'404 - Skipping {self.name} - {osv_dev_url}')
99-
return
100-
data = res.content
101-
osv_dev_json = json.loads(data)
102-
self.parse_osv_dev_data(osv_dev_json)
102+
return None
103+
return json.loads(res.content)
103104

104105
def parse_osv_dev_data(self, osv_dev_json):
105106
from django.db.models import Q

errata/sources/distros/alma.py

Lines changed: 21 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -14,19 +14,21 @@
1414
# You should have received a copy of the GNU General Public License
1515
# along with Patchman. If not, see <http://www.gnu.org/licenses/>
1616

17-
import concurrent.futures
1817
import json
1918

20-
from django.db import connections
21-
22-
from operatingsystems.utils import get_or_create_osrelease
19+
from errata.utils import get_or_create_erratum
20+
from modules.utils import get_matching_modules
21+
from operatingsystems.utils import (
22+
get_or_create_osrelease, normalize_el_osrelease,
23+
)
2324
from packages.models import Package
2425
from packages.utils import get_or_create_package, parse_package_string
2526
from patchman.signals import pbar_start, pbar_update
26-
from util import fetch_content, get_setting_of_type, get_url
27+
from util import fetch_content, get_setting_of_type, get_url, run_concurrently
28+
from util.logging import clear_forked_pbar
2729

2830

29-
def update_alma_errata(concurrent_processing=True):
31+
def update_alma_errata(concurrent_processing=True, max_workers=25):
3032
""" Update Alma Linux advisories from errata.almalinux.org:
3133
https://errata.almalinux.org/8/errata.full.json
3234
https://errata.almalinux.org/9/errata.full.json
@@ -40,7 +42,7 @@ def update_alma_errata(concurrent_processing=True):
4042
)
4143
for release in alma_releases:
4244
advisories = fetch_alma_advisories(release)
43-
process_alma_errata(release, advisories, concurrent_processing)
45+
process_alma_errata(release, advisories, concurrent_processing, max_workers)
4446

4547

4648
def fetch_alma_advisories(release):
@@ -54,11 +56,11 @@ def fetch_alma_advisories(release):
5456
return advisories
5557

5658

57-
def process_alma_errata(release, advisories, concurrent_processing):
59+
def process_alma_errata(release, advisories, concurrent_processing, max_workers=25):
5860
""" Process Alma Linux Errata
5961
"""
6062
if concurrent_processing:
61-
process_alma_errata_concurrently(release, advisories)
63+
process_alma_errata_concurrently(release, advisories, max_workers)
6264
else:
6365
process_alma_errata_serially(release, advisories)
6466

@@ -73,24 +75,24 @@ def process_alma_errata_serially(release, advisories):
7375
pbar_update.send(sender=None, index=i + 1)
7476

7577

76-
def process_alma_errata_concurrently(release, advisories):
78+
def process_alma_errata_concurrently(release, advisories, max_workers=25):
7779
""" Process Alma Linux Errata concurrently
7880
"""
79-
connections.close_all()
8081
elen = len(advisories)
8182
pbar_start.send(sender=None, ptext=f'Processing {elen} Alma {release} Errata', plen=elen)
82-
i = 0
83-
with concurrent.futures.ProcessPoolExecutor(max_workers=25) as executor:
84-
futures = [executor.submit(process_alma_erratum, release, advisory) for advisory in advisories]
85-
for future in concurrent.futures.as_completed(futures):
86-
i += 1
87-
pbar_update.send(sender=None, index=i + 1)
83+
args = [(release, advisory) for advisory in advisories]
84+
for i, _ in enumerate(run_concurrently(_process_alma_erratum_wrapper, args, max_workers)):
85+
pbar_update.send(sender=None, index=i + 1)
86+
87+
88+
def _process_alma_erratum_wrapper(args):
89+
clear_forked_pbar()
90+
return process_alma_erratum(*args)
8891

8992

9093
def process_alma_erratum(release, advisory):
9194
""" Process a single Alma Linux Erratum
9295
"""
93-
from errata.utils import get_or_create_erratum
9496
erratum_name = advisory.get('id')
9597
issue_date = advisory.get('issued_date')
9698
synopsis = advisory.get('title')
@@ -110,7 +112,6 @@ def process_alma_erratum(release, advisory):
110112
def add_alma_erratum_osreleases(e, release):
111113
""" Update OS Release for Alma Linux errata
112114
"""
113-
from operatingsystems.utils import normalize_el_osrelease
114115
osrelease_name = normalize_el_osrelease(f'Alma Linux {release}')
115116
osrelease = get_or_create_osrelease(name=osrelease_name)
116117
e.osreleases.add(osrelease)
@@ -150,7 +151,6 @@ def add_alma_erratum_packages(e, advisory):
150151
def add_alma_erratum_modules(e, advisory):
151152
""" Parse and add modules for Alma Linux errata
152153
"""
153-
from modules.utils import get_matching_modules
154154
fixed_packages = set()
155155
modules = advisory.get('modules')
156156
for module in modules:
@@ -160,8 +160,7 @@ def add_alma_erratum_modules(e, advisory):
160160
stream = module.get('stream')
161161
version = module.get('version')
162162
matching_modules = get_matching_modules(name, stream, version, context, arch)
163-
for match in matching_modules:
163+
for match in matching_modules.prefetch_related('packages'):
164164
for fixed_package in match.packages.all():
165-
match.packages.add(fixed_package)
166165
fixed_packages.add(fixed_package)
167166
e.add_fixed_packages(fixed_packages)

errata/sources/distros/arch.py

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,25 +17,25 @@
1717
import concurrent.futures
1818
import json
1919

20-
from django.db import connections
21-
20+
from errata.utils import get_or_create_erratum
2221
from operatingsystems.utils import get_or_create_osrelease
2322
from packages.models import Package
2423
from packages.utils import (
2524
find_evr, get_matching_packages, get_or_create_package,
2625
)
2726
from patchman.signals import pbar_start, pbar_update
2827
from util import fetch_content, get_url
29-
from util.logging import error_message
28+
from util.logging import clear_forked_pbar, error_message
3029

3130

32-
def update_arch_errata(concurrent_processing=False):
31+
def update_arch_errata(concurrent_processing=False, max_workers=25):
3332
""" Update Arch Linux Errata from the following sources:
3433
https://security.archlinux.org/advisories.json
3534
"""
3635
add_arch_linux_osrelease()
3736
advisories = fetch_arch_errata()
38-
parse_arch_errata(advisories, concurrent_processing)
37+
if advisories:
38+
parse_arch_errata(advisories, concurrent_processing, max_workers)
3939

4040

4141
def fetch_arch_errata():
@@ -44,14 +44,16 @@ def fetch_arch_errata():
4444
"""
4545
res = get_url('https://security.archlinux.org/advisories.json')
4646
advisories = fetch_content(res, 'Fetching Arch Advisories')
47+
if advisories is None:
48+
return None
4749
return json.loads(advisories)
4850

4951

50-
def parse_arch_errata(advisories, concurrent_processing):
52+
def parse_arch_errata(advisories, concurrent_processing, max_workers=25):
5153
""" Parse Arch Linux Errata Advisories
5254
"""
5355
if concurrent_processing:
54-
parse_arch_errata_concurrently(advisories)
56+
parse_arch_errata_concurrently(advisories, max_workers)
5557
else:
5658
parse_arch_errata_serially(advisories)
5759

@@ -67,15 +69,14 @@ def parse_arch_errata_serially(advisories):
6769
pbar_update.send(sender=None, index=i + 1)
6870

6971

70-
def parse_arch_errata_concurrently(advisories):
72+
def parse_arch_errata_concurrently(advisories, max_workers=25):
7173
""" Parse Arch Linux Errata Advisories concurrently
7274
"""
7375
osrelease = get_or_create_osrelease(name='Arch Linux')
74-
connections.close_all()
7576
elen = len(advisories)
7677
pbar_start.send(sender=None, ptext=f'Processing {elen} Arch Advisories', plen=elen)
7778
i = 0
78-
with concurrent.futures.ProcessPoolExecutor(max_workers=25) as executor:
79+
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
7980
futures = [executor.submit(process_arch_erratum, advisory, osrelease) for advisory in advisories]
8081
for future in concurrent.futures.as_completed(futures):
8182
i += 1
@@ -85,7 +86,7 @@ def parse_arch_errata_concurrently(advisories):
8586
def process_arch_erratum(advisory, osrelease):
8687
""" Process a single Arch Linux Erratum
8788
"""
88-
from errata.utils import get_or_create_erratum
89+
clear_forked_pbar()
8990
try:
9091
name = advisory.get('name')
9192
issue_date = advisory.get('date')
@@ -121,6 +122,8 @@ def add_arch_erratum_references(e, advisory):
121122
e.add_reference('ASA', url)
122123
raw_url = f'{url}/raw'
123124
res = get_url(raw_url)
125+
if res is None:
126+
return
124127
data = res.content
125128
parse_arch_erratum_raw(e, data.decode())
126129

@@ -152,6 +155,8 @@ def add_arch_erratum_packages(e, advisory):
152155
group_id = advisory.get('group')
153156
group_url = f'https://security.archlinux.org/group/{group_id}.json'
154157
res = get_url(group_url)
158+
if res is None:
159+
return
155160
data = res.content
156161
group = json.loads(data)
157162
packages = group.get('packages')

errata/sources/distros/debian.py

Lines changed: 37 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -14,27 +14,28 @@
1414
# You should have received a copy of the GNU General Public License
1515
# along with Patchman. If not, see <http://www.gnu.org/licenses/>
1616

17-
import concurrent.futures
1817
import csv
1918
import re
2019
from datetime import datetime
2120
from io import StringIO
2221

2322
from debian.deb822 import Dsc
24-
from django.db import connections
2523

24+
from errata.utils import get_or_create_erratum
2625
from operatingsystems.models import OSRelease
2726
from operatingsystems.utils import get_or_create_osrelease
2827
from packages.models import Package
2928
from packages.utils import find_evr, get_or_create_package
3029
from patchman.signals import pbar_start, pbar_update
31-
from util import extract, fetch_content, get_setting_of_type, get_url
32-
from util.logging import error_message, warning_message
30+
from util import (
31+
extract, fetch_content, get_setting_of_type, get_url, run_concurrently,
32+
)
33+
from util.logging import clear_forked_pbar, error_message, warning_message
3334

3435
DSCs = {}
3536

3637

37-
def update_debian_errata(concurrent_processing=True):
38+
def update_debian_errata(concurrent_processing=True, max_workers=25):
3839
""" Update Debian errata using:
3940
https://salsa.debian.org/security-tracker-team/security-tracker/raw/master/data/DSA/list
4041
https://salsa.debian.org/security-tracker-team/security-tracker/raw/master/data/DSA/list
@@ -47,7 +48,7 @@ def update_debian_errata(concurrent_processing=True):
4748
fetch_dscs_from_debian_package_file_maps()
4849
accepted_codenames = get_accepted_debian_codenames()
4950
errata = parse_debian_errata(advisories, accepted_codenames)
50-
create_debian_errata(errata, accepted_codenames, concurrent_processing)
51+
create_debian_errata(errata, accepted_codenames, concurrent_processing, max_workers)
5152

5253

5354
def fetch_debian_dsa_advisories():
@@ -113,7 +114,9 @@ def parse_debian_errata(advisories, accepted_codenames):
113114
"""
114115
distro_pattern = re.compile(r'^\t\[(.+?)\] - .*')
115116
title_pattern = re.compile(r'^\[(.+?)\] (.+?) (.+?)[ ]+[-]+ (.*)')
117+
distro_package_pattern = re.compile(r'^\t\[(.+?)\] - (.+?) (.*)')
116118
errata = []
119+
dsc_fetches = []
117120
e = {'packages': {}, 'cve_ids': [], 'releases': []}
118121
for line in advisories.splitlines():
119122
if line.startswith('['):
@@ -132,9 +135,17 @@ def parse_debian_errata(advisories, accepted_codenames):
132135
e['releases'].append(release)
133136
if not e.get('packages').get(release):
134137
e['packages'][release] = []
135-
e['packages'][release].append(parse_debian_erratum_package(line, accepted_codenames))
138+
pkg_match = re.match(distro_package_pattern, line)
139+
if pkg_match and pkg_match.group(1) in accepted_codenames:
140+
source_package = pkg_match.group(2)
141+
source_version = pkg_match.group(3)
142+
dsc_fetches.append((source_package, source_version))
143+
e['packages'][release].append((source_package, source_version))
144+
else:
145+
e['packages'][release].append(None)
136146
# add the last one
137147
errata = add_errata_by_codename(errata, e, accepted_codenames)
148+
fetch_debian_dsc_package_lists(dsc_fetches)
138149
return errata
139150

140151

@@ -162,11 +173,11 @@ def parse_debian_erratum_advisory(e, match):
162173
return e
163174

164175

165-
def create_debian_errata(errata, accepted_codenames, concurrent_processing):
176+
def create_debian_errata(errata, accepted_codenames, concurrent_processing, max_workers=25):
166177
""" Create Debian Errata
167178
"""
168179
if concurrent_processing:
169-
create_debian_errata_concurrently(errata, accepted_codenames)
180+
create_debian_errata_concurrently(errata, accepted_codenames, max_workers)
170181
else:
171182
create_debian_errata_serially(errata, accepted_codenames)
172183

@@ -181,25 +192,25 @@ def create_debian_errata_serially(errata, accepted_codenames):
181192
pbar_update.send(sender=None, index=i + 1)
182193

183194

184-
def create_debian_errata_concurrently(errata, accepted_codenames):
195+
def create_debian_errata_concurrently(errata, accepted_codenames, max_workers=25):
185196
""" Create Debian Errata concurrently
186197
"""
187-
connections.close_all()
188198
elen = len(errata)
189199
pbar_start.send(sender=None, ptext=f'Processing {elen} Debian Errata', plen=elen)
190-
i = 0
191-
with concurrent.futures.ProcessPoolExecutor(max_workers=25) as executor:
192-
futures = [executor.submit(process_debian_erratum, erratum, accepted_codenames) for erratum in errata]
193-
for future in concurrent.futures.as_completed(futures):
194-
i += 1
195-
pbar_update.send(sender=None, index=i + 1)
200+
args = [(erratum, accepted_codenames) for erratum in errata]
201+
for i, _ in enumerate(run_concurrently(_process_debian_erratum_wrapper, args, max_workers)):
202+
pbar_update.send(sender=None, index=i + 1)
203+
204+
205+
def _process_debian_erratum_wrapper(args):
206+
clear_forked_pbar()
207+
return process_debian_erratum(*args)
196208

197209

198210
def process_debian_erratum(erratum, accepted_codenames):
199211
""" Process a single Debian Erratum
200212
"""
201213
try:
202-
from errata.utils import get_or_create_erratum
203214
erratum_name = erratum.get('name')
204215
e, created = get_or_create_erratum(
205216
name=erratum_name,
@@ -221,19 +232,13 @@ def process_debian_erratum(erratum, accepted_codenames):
221232
error_message(text=exc)
222233

223234

224-
def parse_debian_erratum_package(line, accepted_codenames):
225-
""" Parse the codename and source package from a DSA/DLA file
226-
Returns the source package and source version
235+
def fetch_debian_dsc_package_lists(dsc_fetches):
236+
""" Fetch DSC package lists with a progress bar
227237
"""
228-
distro_package_pattern = re.compile(r'^\t\[(.+?)\] - (.+?) (.*)')
229-
match = re.match(distro_package_pattern, line)
230-
if match:
231-
codename = match.group(1)
232-
if codename in accepted_codenames:
233-
source_package = match.group(2)
234-
source_version = match.group(3)
235-
fetch_debian_dsc_package_list(source_package, source_version)
236-
return source_package, source_version
238+
pbar_start.send(sender=None, ptext=f'Fetching {len(dsc_fetches)} Debian DSC files', plen=len(dsc_fetches))
239+
for i, (package, version) in enumerate(dsc_fetches):
240+
fetch_debian_dsc_package_list(package, version)
241+
pbar_update.send(sender=None, index=i + 1)
237242

238243

239244
def get_debian_dsc_package_list(package, version):
@@ -301,6 +306,8 @@ def create_debian_os_releases(codename_to_version):
301306
def process_debian_erratum_fixed_packages(e, package_data):
302307
""" Process packages fixed in a Debian errata
303308
"""
309+
if not package_data:
310+
return
304311
source_package, source_version = package_data
305312
epoch, ver, rel = find_evr(source_version)
306313
package_list = get_debian_dsc_package_list(source_package, source_version)

0 commit comments

Comments
 (0)