Skip to content

Commit 6b61e7e

Browse files
committed
Separate pipelines for recomputing and removing duplicates
Signed-off-by: Tushar Goel <tushar.goel.dav@gmail.com>
1 parent 060af18 commit 6b61e7e

File tree

6 files changed

+612
-137
lines changed

6 files changed

+612
-137
lines changed

vulnerabilities/improvers/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@
1818
from vulnerabilities.pipelines import enhance_with_kev
1919
from vulnerabilities.pipelines import enhance_with_metasploit
2020
from vulnerabilities.pipelines import flag_ghost_packages
21+
from vulnerabilities.pipelines import recompute_content_ids
2122
from vulnerabilities.pipelines import remove_duplicate_advisories
22-
2323
IMPROVERS_REGISTRY = [
2424
valid_versions.GitHubBasicImprover,
2525
valid_versions.GitLabBasicImprover,
@@ -46,6 +46,7 @@
4646
compute_package_version_rank.ComputeVersionRankPipeline,
4747
collect_commits.CollectFixCommitsPipeline,
4848
add_cvss31_to_CVEs.CVEAdvisoryMappingPipeline,
49+
recompute_content_ids.RecomputeContentIDPipeline,
4950
remove_duplicate_advisories.RemoveDuplicateAdvisoriesPipeline,
5051
]
5152

Lines changed: 197 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,197 @@
1+
#
2+
# Copyright (c) nexB Inc. and others. All rights reserved.
3+
# VulnerableCode is a trademark of nexB Inc.
4+
# SPDX-License-Identifier: Apache-2.0
5+
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
6+
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
7+
# See https://aboutcode.org for more information about nexB OSS projects.
8+
#
9+
10+
import logging
11+
import multiprocessing
12+
import os
13+
import warnings
14+
from concurrent import futures
15+
16+
from aboutcode.pipeline import LoopProgress
17+
from django.core.paginator import Paginator
18+
from django.db import transaction
19+
20+
from vulnerabilities.models import Advisory
21+
from vulnerabilities.pipelines import VulnerableCodePipeline
22+
from vulnerabilities.utils import compute_content_id
23+
from vulnerablecode import settings
24+
25+
# Module-level logger.
# NOTE(review): the "scanpipe.pipes" logger name looks copy-pasted from
# ScanCode.io — confirm this is the intended logging namespace for
# VulnerableCode pipelines.
logger = logging.getLogger("scanpipe.pipes")
26+
27+
28+
def get_max_workers(keep_available=4):
    """
    Return the number of parallel workers to use.

    Return the `VULNERABLECODE_PROCESSES` setting directly when it is one of
    the sentinel values -1, 0, or 1 (-1 disables multiprocessing and
    threading, 0 disables multiprocessing).

    On operating systems where the multiprocessing start method is not "fork",
    but for example "spawn", such as on macOS, multiprocessing and threading
    are disabled by default, returning -1 `max_workers`.

    Otherwise return the available CPU count minus `keep_available`, capped
    below by 1, and capped above by `VULNERABLECODE_PROCESSES` when that
    setting is defined and small enough.
    """
    processes_from_settings = settings.VULNERABLECODE_PROCESSES
    if processes_from_settings in [-1, 0, 1]:
        return processes_from_settings

    if multiprocessing.get_start_method() != "fork":
        return -1

    max_workers = os.cpu_count() - keep_available
    if max_workers < 1:
        return 1

    if processes_from_settings is not None:
        if processes_from_settings <= max_workers:
            return processes_from_settings
        else:
            # Bug fix: the warning previously named SCANCODEIO_PROCESSES, but
            # the setting actually read above is VULNERABLECODE_PROCESSES.
            msg = (
                f"The value {processes_from_settings} specified in VULNERABLECODE_PROCESSES"
                f" exceeds the number of available CPUs on this machine."
                f" {max_workers} CPUs will be used instead for multiprocessing."
            )
            warnings.warn(msg, ResourceWarning)

    return max_workers
61+
62+
63+
class InsufficientResourcesError(Exception):
    """Raised when a worker pool breaks, likely from resource exhaustion."""
65+
66+
67+
def process_advisories(
    advisories,
    advisory_func,
    progress_logger=None,
    batch_size=1000,
):
    """
    Run the `advisory_func` on the advisories of the provided `advisories`.

    Multiprocessing is enabled by default on this pipe, the number of processes can be
    controlled through the `VULNERABLECODE_PROCESSES` setting.
    Multiprocessing can be disabled using `VULNERABLECODE_PROCESSES=0`,
    and threading can also be disabled `VULNERABLECODE_PROCESSES=-1`

    The advisories QuerySet is chunked in `batch_size` results at the time,
    this can result in a significant reduction in memory usage.
    """
    advisories_count = advisories.count()
    logger.info(f"Process {advisories_count} advisories with {advisory_func.__name__}")
    progress = LoopProgress(advisories_count, logger=progress_logger)
    max_workers = get_max_workers(keep_available=4)

    advisory_batches = get_advisory_batches(advisories, batch_size)

    # Sequential fallback: get_max_workers() returns -1 or 0 when
    # multiprocessing/threading is disabled or "fork" is unavailable.
    if max_workers <= 0:
        for advisory_ids in progress.iter(advisory_batches):
            progress.log_progress()
            logger.debug(f"{advisory_func.__name__} len={len(advisory_ids)}")
            # NOTE: advisory_func is always invoked with logger=None here
            # (and below); it must tolerate a missing logger.
            advisory_func(advisory_ids=advisory_ids, logger=None)
        return

    logger.info(f"Starting ProcessPoolExecutor with {max_workers} max_workers")

    with futures.ProcessPoolExecutor(max_workers) as executor:
        # Map each submitted future back to its batch of ids so progress and
        # debug logging can name the batch when the future completes.
        future_to_advisories = {
            executor.submit(advisory_func, advisory_ids, None): advisory_ids
            for advisory_ids in advisory_batches
        }

        future_as_completed = futures.as_completed(future_to_advisories)

        for future in progress.iter(future_as_completed):
            advisory_ids = future_to_advisories[future]
            progress.log_progress()
            logger.debug(f"{advisory_func.__name__} len={len(advisory_ids)}")
            try:
                # Re-raise any exception raised inside the worker process.
                future.result()
            except futures.process.BrokenProcessPool as broken_pool_error:
                # A broken pool usually means a worker was killed — commonly
                # by the OS out-of-memory killer — so surface a resource hint.
                message = (
                    "You may not have enough resources to complete this operation. "
                    "Please ensure that there is at least 2 GB of available memory per "
                    "CPU core for successful execution."
                )
                raise broken_pool_error from InsufficientResourcesError(message)
121+
122+
123+
def get_advisory_batches(advisories, batch_size=1000):
    """Yield lists of advisory ids, each at most `batch_size` long."""
    pages = Paginator(advisories, per_page=batch_size)
    for page_number in pages.page_range:
        yield [advisory.id for advisory in pages.page(page_number).object_list]
131+
132+
133+
def recompute_content_ids(advisory_ids, logger):
    """
    Recompute and store the unique content ID for the advisories in
    `advisory_ids` whose stored ID is not already a 64-character value.

    `logger`, when provided, is a callable accepting `(message, level=...)`
    (a pipeline `log` method), not a `logging.Logger`. It may be None:
    process_advisories() always calls this function with logger=None.
    """
    advisories = Advisory.objects.exclude(unique_content_id__length=64).filter(id__in=advisory_ids)
    total_count = advisories.count()

    if not total_count:
        # Bug fix: guard the callable — this function is invoked with
        # logger=None by process_advisories(), which previously raised
        # TypeError here.
        if logger:
            logger("No advisories need content ID recomputation", level=logging.INFO)
        return

    if logger:
        logger(f"Recomputing content IDs for {total_count} advisories", level=logging.INFO)

    progress = LoopProgress(
        total_iterations=total_count,
        # Keep the step at least 1 so small batches don't produce a zero
        # progress step.
        progress_step=max(total_count // 100, 1),
        logger=logger,
    )

    with transaction.atomic():
        # Bug fix: nowait and skip_locked are mutually exclusive in Django's
        # select_for_update() and raise ValueError when combined. skip_locked
        # alone lets concurrent workers pass over rows locked elsewhere.
        advisories = advisories.select_for_update(skip_locked=True)
        if not advisories.exists():
            return
        advisories_to_update = []
        for advisory in progress.iter(advisories):
            advisory.unique_content_id = compute_content_id(advisory.to_advisory_data())
            advisories_to_update.append(advisory)

        if advisories_to_update:
            Advisory.objects.bulk_update(
                advisories_to_update,
                ["unique_content_id"],
                batch_size=len(advisories_to_update),
            )
            if logger:
                logger(
                    f"Updated content IDs for {len(advisories_to_update)} advisories",
                    level=logging.INFO,
                )
172+
173+
174+
class RecomputeContentIDPipeline(VulnerableCodePipeline):
    # Bug fix: the docstring previously described duplicate removal — a
    # copy-paste from RemoveDuplicateAdvisoriesPipeline.
    """Pipeline to recompute the unique content ID of advisories."""

    pipeline_id = "recompute_content_ids"
    BATCH_SIZE = 1000

    @classmethod
    def steps(cls):
        return (cls.recompute_content_ids,)

    def recompute_content_ids(self):
        """
        Recompute content IDs for all advisories whose stored ID is not a
        64-character value, looping until none remain or a pass makes no
        progress.
        """
        previous_remaining = None
        while True:
            advisories = Advisory.objects.exclude(unique_content_id__length=64)
            remaining = advisories.count()
            if not remaining:
                break

            # Guard against spinning forever when no row can be fixed, e.g.
            # rows skipped because another worker holds their locks, or
            # recomputation still yielding a non-64-character ID.
            if previous_remaining is not None and remaining >= previous_remaining:
                self.log(
                    f"Stopping: {remaining} advisories could not be recomputed",
                    level=logging.WARNING,
                )
                break
            previous_remaining = remaining

            process_advisories(
                advisories=advisories,
                # The module-level recompute_content_ids() worker function,
                # not this method.
                advisory_func=recompute_content_ids,
                progress_logger=self.log,
                # Consistency fix: use the class constant instead of a
                # hard-coded 1000.
                batch_size=self.BATCH_SIZE,
            )

vulnerabilities/pipelines/remove_duplicate_advisories.py

Lines changed: 77 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -11,84 +11,104 @@
1111
from itertools import groupby
1212

1313
from aboutcode.pipeline import LoopProgress
14+
from django.db import transaction
1415
from django.db.models import Count
15-
from django.db.models import Q
1616

1717
from vulnerabilities.models import Advisory
1818
from vulnerabilities.pipelines import VulnerableCodePipeline
19-
from vulnerabilities.utils import compute_content_id
19+
from vulnerabilities.pipelines.recompute_content_ids import process_advisories
20+
21+
22+
def remove_duplicates_batch(advisory_ids, logger=None):
    """
    Process a batch of advisories to remove duplicates.
    Keep only the oldest advisory (by `date_imported`) for each content ID.

    `logger`, when provided, is a callable accepting `(message, level=...)`
    (a pipeline `log` method), not a `logging.Logger`. It may be None:
    process_advisories() always calls this function with logger=None.
    """
    try:
        with transaction.atomic():
            # Bug fix: nowait and skip_locked are mutually exclusive in
            # Django's select_for_update() and raise ValueError when
            # combined. skip_locked alone lets concurrent workers pass over
            # rows locked elsewhere.
            advisories = Advisory.objects.filter(id__in=advisory_ids).select_for_update(
                skip_locked=True
            )
            if not advisories.exists():
                return

            # groupby() only groups consecutive rows, so order by the
            # grouping key first.
            advisories_by_content_id = groupby(
                advisories.order_by("unique_content_id").paginated(),
                key=lambda x: x.unique_content_id,
            )

            progress = LoopProgress(total_iterations=advisories.count(), logger=logger)

            for content_id, group_advisories in progress.iter(advisories_by_content_id):
                group_advisories = list(group_advisories)

                if len(group_advisories) <= 1:
                    continue

                if logger:
                    # Bug fix: `logger` is a callable, not a logging.Logger;
                    # the previous `logger.info(...)` calls would raise
                    # AttributeError when a logger is actually provided.
                    logger(
                        f"Found {len(group_advisories)} duplicates for content ID {content_id}",
                        level=logging.INFO,
                    )

                oldest = min(group_advisories, key=lambda x: x.date_imported)

                advisory_ids_to_delete = [adv.id for adv in group_advisories if adv.id != oldest.id]
                if advisory_ids_to_delete:
                    Advisory.objects.filter(id__in=advisory_ids_to_delete).delete()
                    if logger:
                        logger(
                            f"Kept advisory {oldest.id} and removed "
                            f"{len(advisory_ids_to_delete)} duplicates for content ID {content_id}",
                            level=logging.INFO,
                        )

    except Exception as e:
        if logger:
            logger(
                f"Error processing batch of advisories: {e}",
                level=logging.ERROR,
            )
2070

2171

2272
class RemoveDuplicateAdvisoriesPipeline(VulnerableCodePipeline):
    """Pipeline to remove duplicate advisories based on their content."""

    pipeline_id = "remove_duplicate_advisories"
    BATCH_SIZE = 1000

    @classmethod
    def steps(cls):
        return (cls.remove_duplicates,)

    def remove_duplicates(self):
        """
        Find advisories with the same content and keep only the oldest one.
        Process in parallel batches with proper transaction management.

        Loops until no duplicate content IDs remain, or until a pass makes
        no progress (preventing an infinite loop when duplicates cannot be
        deleted, e.g. rows locked elsewhere or failures swallowed inside
        remove_duplicates_batch()).
        """
        previous_count = None
        while True:
            duplicate_content_ids = (
                Advisory.objects.values("unique_content_id")
                .annotate(count=Count("id"))
                .filter(count__gt=1)
                .values_list("unique_content_id", flat=True)
            )

            # Bug fix: a leftover debug print() of the queryset was removed.
            advisories = Advisory.objects.filter(unique_content_id__in=duplicate_content_ids)

            advisories_count = advisories.count()
            if not advisories_count:
                break

            # Stall guard: stop when the previous pass removed nothing.
            if previous_count is not None and advisories_count >= previous_count:
                self.log(
                    f"Stopping: {advisories_count} duplicated advisories could not be removed",
                    level=logging.WARNING,
                )
                break
            previous_count = advisories_count

            # Bug fix: the previous message said "content IDs" while the
            # value counted is advisories.
            self.log(
                f"Processing {advisories_count} advisories with duplicate content IDs",
                level=logging.INFO,
            )

            process_advisories(
                advisories=advisories,
                advisory_func=remove_duplicates_batch,
                progress_logger=self.log,
                batch_size=self.BATCH_SIZE,
            )

            self.log("Completed duplicate removal batch", level=logging.INFO)

0 commit comments

Comments
 (0)