Skip to content

Commit 79e74a3

Browse files
committed
add a fetch_concurrently helper function
1 parent f26229e commit 79e74a3

3 files changed

Lines changed: 54 additions & 34 deletions

File tree

errata/sources/distros/debian.py

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,8 @@
2828
from packages.utils import find_evr, get_or_create_package
2929
from patchman.signals import pbar_start, pbar_update
3030
from util import (
31-
extract, fetch_content, get_setting_of_type, get_url, run_concurrently,
31+
extract, fetch_concurrently, fetch_content, get_setting_of_type, get_url,
32+
run_concurrently,
3233
)
3334
from util.logging import clear_forked_pbar, error_message, warning_message
3435

@@ -47,7 +48,7 @@ def update_debian_errata(concurrent_processing=True, max_workers=25):
4748
advisories = dsas + dlas
4849
fetch_dscs_from_debian_package_file_maps()
4950
accepted_codenames = get_accepted_debian_codenames()
50-
errata = parse_debian_errata(advisories, accepted_codenames)
51+
errata = parse_debian_errata(advisories, accepted_codenames, concurrent_processing, max_workers)
5152
create_debian_errata(errata, accepted_codenames, concurrent_processing, max_workers)
5253

5354

@@ -109,7 +110,7 @@ def parse_debian_package_file_map(data, repo):
109110
parsing_dsc = False
110111

111112

112-
def parse_debian_errata(advisories, accepted_codenames):
113+
def parse_debian_errata(advisories, accepted_codenames, concurrent_processing=True, max_workers=25):
113114
""" Parse Debian DSA/DLA files for security advisories
114115
"""
115116
distro_pattern = re.compile(r'^\t\[(.+?)\] - .*')
@@ -145,7 +146,7 @@ def parse_debian_errata(advisories, accepted_codenames):
145146
e['packages'][release].append(None)
146147
# add the last one
147148
errata = add_errata_by_codename(errata, e, accepted_codenames)
148-
fetch_debian_dsc_package_lists(dsc_fetches)
149+
fetch_debian_dsc_package_lists(dsc_fetches, concurrent_processing, max_workers)
149150
return errata
150151

151152

@@ -232,13 +233,23 @@ def process_debian_erratum(erratum, accepted_codenames):
232233
error_message(text=exc)
233234

234235

235-
def fetch_debian_dsc_package_lists(dsc_fetches):
236+
def fetch_debian_dsc_package_lists(dsc_fetches, concurrent_processing=True, max_workers=25):
236237
""" Fetch DSC package lists with a progress bar
237238
"""
238-
pbar_start.send(sender=None, ptext=f'Fetching {len(dsc_fetches)} Debian DSC files', plen=len(dsc_fetches))
239-
for i, (package, version) in enumerate(dsc_fetches):
240-
fetch_debian_dsc_package_list(package, version)
241-
pbar_update.send(sender=None, index=i + 1)
239+
flen = len(dsc_fetches)
240+
pbar_start.send(sender=None, ptext=f'Fetching {flen} Debian DSC files', plen=flen)
241+
if concurrent_processing:
242+
for i, _ in enumerate(fetch_concurrently(fetch_dsc_worker, dsc_fetches, max_workers)):
243+
pbar_update.send(sender=None, index=i + 1)
244+
else:
245+
for i, (package, version) in enumerate(dsc_fetches):
246+
fetch_debian_dsc_package_list(package, version)
247+
pbar_update.send(sender=None, index=i + 1)
248+
249+
250+
def fetch_dsc_worker(item, session):
251+
package, version = item
252+
fetch_debian_dsc_package_list(package, version, session=session)
242253

243254

244255
def get_debian_dsc_package_list(package, version):
@@ -251,14 +262,14 @@ def get_debian_dsc_package_list(package, version):
251262
return package_list
252263

253264

254-
def fetch_debian_dsc_package_list(package, version):
265+
def fetch_debian_dsc_package_list(package, version, session=None):
255266
""" Fetch the package list from a DSC file for a given source package/version
256267
"""
257268
if not DSCs.get(package) or not DSCs[package].get(version):
258269
warning_message(text=f'No DSC found for {package} {version}')
259270
return
260271
source_url = DSCs[package][version]['url']
261-
res = get_url(source_url)
272+
res = get_url(source_url, session=session)
262273
data = res.content
263274
dsc = Dsc(data.decode())
264275
package_list = dsc.get('package-list')

errata/utils.py

Lines changed: 11 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,11 @@
1414
# You should have received a copy of the GNU General Public License
1515
# along with Patchman. If not, see <http://www.gnu.org/licenses/>
1616

17-
import concurrent.futures
18-
19-
import requests
20-
from requests.adapters import HTTPAdapter
21-
2217
from errata.models import Erratum
2318
from packages.models import PackageUpdate
2419
from patchman.signals import pbar_start, pbar_update
25-
from util import run_concurrently, tz_aware_datetime
26-
from util.logging import error_message, warning_message
20+
from util import fetch_concurrently, run_concurrently, tz_aware_datetime
21+
from util.logging import warning_message
2722

2823

2924
def get_or_create_erratum(name, e_type, issue_date, synopsis):
@@ -105,22 +100,11 @@ def enrich_errata(concurrent_processing=True, max_workers=25):
105100
pbar_start.send(sender=None, ptext=f'Fetching osv.dev data for {elen} Errata', plen=elen)
106101
results = []
107102
if concurrent_processing:
108-
session = requests.Session()
109-
adapter = HTTPAdapter(pool_connections=max_workers, pool_maxsize=max_workers)
110-
session.mount('https://', adapter)
111-
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
112-
futures = {executor.submit(e.fetch_osv_dev_data, session): e for e in errata}
113-
for i, future in enumerate(concurrent.futures.as_completed(futures)):
114-
erratum = futures[future]
115-
try:
116-
osv_data = future.result()
117-
except Exception as e:
118-
error_message(text=f'Error fetching osv.dev data for {erratum}: {e}')
119-
pbar_update.send(sender=None, index=i + 1)
120-
continue
121-
if osv_data is not None:
122-
results.append((erratum, osv_data))
123-
pbar_update.send(sender=None, index=i + 1)
103+
for i, result in enumerate(fetch_concurrently(fetch_osv_worker, errata, max_workers)):
104+
erratum, osv_data = result
105+
if osv_data is not None:
106+
results.append((erratum, osv_data))
107+
pbar_update.send(sender=None, index=i + 1)
124108
else:
125109
for i, e in enumerate(errata):
126110
osv_data = e.fetch_osv_dev_data()
@@ -135,3 +119,7 @@ def enrich_errata(concurrent_processing=True, max_workers=25):
135119
for i, (erratum, osv_data) in enumerate(results):
136120
erratum.parse_osv_dev_data(osv_data)
137121
pbar_update.send(sender=None, index=i + 1)
122+
123+
124+
def fetch_osv_worker(erratum, session):
125+
return (erratum, erratum.fetch_osv_dev_data(session))

util/__init__.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -300,6 +300,27 @@ def get_datetime_now():
300300
return datetime.now().astimezone().replace(microsecond=0)
301301

302302

303+
def fetch_concurrently(func, items, max_workers=25):
304+
""" Run func across items using threads with pooled HTTP sessions,
305+
yielding results as they complete. Ideal for I/O-bound work
306+
(network fetches). func(item, session) receives a shared
307+
requests.Session with connection pooling.
308+
"""
309+
import concurrent.futures
310+
311+
from requests.adapters import HTTPAdapter
312+
313+
session = requests.Session()
314+
adapter = HTTPAdapter(pool_connections=max_workers, pool_maxsize=max_workers)
315+
session.mount('https://', adapter)
316+
session.mount('http://', adapter)
317+
items = list(items)
318+
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
319+
futures = {executor.submit(func, item, session): item for item in items}
320+
for future in concurrent.futures.as_completed(futures):
321+
yield future.result()
322+
323+
303324
def run_concurrently(func, items, max_workers=25):
304325
""" Run func across items using multiprocessing, yielding results as
305326
they complete. Uses multiprocessing.Pool on Python < 3.12 to avoid

0 commit comments

Comments
 (0)