Skip to content

Commit b3b43ab

Browse files
committed
Alternate content id migration pipeline
Signed-off-by: Keshav Priyadarshi <git@keshav.space>
1 parent 3eab471 commit b3b43ab

File tree

1 file changed

+70
-58
lines changed

1 file changed

+70
-58
lines changed

vulnerabilities/pipelines/remove_duplicate_advisories.py

Lines changed: 70 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,7 @@
77
# See https://aboutcode.org for more information about nexB OSS projects.
88
#
99

10-
import logging
11-
from itertools import groupby
12-
1310
from aboutcode.pipeline import LoopProgress
14-
from django.db.models import Count
15-
from django.db.models import Q
1611

1712
from vulnerabilities.models import Advisory
1813
from vulnerabilities.pipelines import VulnerableCodePipeline
@@ -26,69 +21,86 @@ class RemoveDuplicateAdvisoriesPipeline(VulnerableCodePipeline):
2621

2722
@classmethod
def steps(cls):
    """Return the ordered pipeline steps: a single de-duplication pass."""
    pipeline_steps = (cls.remove_duplicates,)
    return pipeline_steps
3325

3426
def remove_duplicates(self):
    """
    Recompute content id and remove advisories with the same content and keep only the latest one.

    Advisories are walked newest-first (ordered by descending id), so the first
    advisory seen for a given content id is the one kept; every later (older)
    advisory that recomputes to the same content id is deleted. Content ids
    that changed are written back in batches via ``bulk_update_advisory``;
    duplicates are deleted in batches via ``delete_advisories``.
    """
    advisories_count = Advisory.objects.all().count()
    self.log(f"Computing new content id for {advisories_count} and removing duplicates.")

    update_batch_size = 500
    delete_batch_size = 1000
    chunk_size = 50000
    deleted_advisory_count = 0
    updated_advisory_count = 0
    duplicate_advisory_id = []  # advisory ids pending batched deletion
    updated_advisory = []  # advisories with a changed content id, pending bulk update
    content_ids = set()  # every distinct content id seen so far (grows with table size)

    # Newest first so the most recent advisory wins for each content id.
    advisories = Advisory.objects.all().order_by("-id").paginated(per_page=chunk_size)
    progress = LoopProgress(
        total_iterations=advisories_count,
        logger=self.log,
        progress_step=1,
    )

    for advisory in progress.iter(advisories):
        content_id = compute_content_id(advisory.to_advisory_data())
        if content_id in content_ids:
            duplicate_advisory_id.append(advisory.id)
        else:
            if advisory.unique_content_id != content_id:
                advisory.unique_content_id = content_id
                updated_advisory.append(advisory)
            content_ids.add(content_id)
        # Flush batches as they fill; the helpers clear the lists they consume.
        if len(duplicate_advisory_id) > delete_batch_size:
            deleted_advisory_count += delete_advisories(
                advisory_ids=duplicate_advisory_id,
                logger=self.log,
            )
        if len(updated_advisory) > update_batch_size:
            updated_advisory_count += bulk_update_advisory(
                items=updated_advisory,
                fields=["unique_content_id"],
                logger=self.log,
            )

    # Flush whatever remains after the last (partial) batch.
    deleted_advisory_count += delete_advisories(
        advisory_ids=duplicate_advisory_id,
        logger=self.log,
    )
    updated_advisory_count += bulk_update_advisory(
        items=updated_advisory,
        fields=["unique_content_id"],
        logger=self.log,
    )

    self.log(f"Removed {deleted_advisory_count} duplicates advisories.")
    # Bug fix: report the number of UPDATED advisories, not the deleted count.
    self.log(f"Updated content id for {updated_advisory_count} advisories.")
83+
84+
85+
def bulk_update_advisory(items, fields, logger):
    """
    Bulk-update ``fields`` on the Advisory objects in ``items``.

    Best-effort: a failed ``bulk_update`` is logged through ``logger`` and the
    batch is dropped. ``items`` is always cleared in place so the caller can
    keep reusing the same list as a batch buffer. Return the number of
    advisories successfully updated (0 on failure or empty input).
    """
    if not items:
        return 0
    updated_count = 0
    try:
        Advisory.objects.bulk_update(objs=items, fields=fields)
        updated_count = len(items)
    except Exception as e:
        logger(f"Error updating Advisory: {e}")
    items.clear()
    return updated_count
95+
96+
97+
def delete_advisories(advisory_ids, logger):
    """
    Delete the Advisory rows whose ids are in ``advisory_ids``.

    Best-effort: a failed delete is logged through ``logger`` and the batch is
    dropped. ``advisory_ids`` is always cleared in place so the caller can keep
    reusing the same list as a batch buffer. Return the number of advisories
    submitted for deletion (0 on failure or empty input).
    """
    if not advisory_ids:
        return 0
    deleted_count = 0
    try:
        Advisory.objects.filter(id__in=advisory_ids).delete()
        deleted_count = len(advisory_ids)
    except Exception as e:
        logger(f"Error deleting Advisory: {e}")
    advisory_ids.clear()
    return deleted_count

0 commit comments

Comments
 (0)