Skip to content

Commit 09f158e

Browse files
authored
Merge pull request #2256 from aboutcode-org/2253-priority
feat: add high priority queue and run version range unfurling pipeline more frequently
2 parents 4ab8b2f + 41484a5 commit 09f158e

File tree

10 files changed

+294
-25
lines changed

10 files changed

+294
-25
lines changed

docker-compose.yml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,17 @@ services:
5757
- db
5858
- vulnerablecode
5959

60+
vulnerablecode_rqworker_high:
61+
build: .
62+
command: wait-for-it web:8000 -- python ./manage.py rqworker high
63+
env_file:
64+
- docker.env
65+
volumes:
66+
- /etc/vulnerablecode/:/etc/vulnerablecode/
67+
depends_on:
68+
- vulnerablecode_redis
69+
- db
70+
- vulnerablecode
6071

6172
nginx:
6273
image: nginx
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
# Generated by Django 5.2.11 on 2026-04-08 10:48
2+
3+
from django.db import migrations, models
4+
5+
6+
class Migration(migrations.Migration):
7+
8+
dependencies = [
9+
("vulnerabilities", "0119_remove_advisoryset_identifiers_and_more"),
10+
]
11+
12+
operations = [
13+
migrations.AddField(
14+
model_name="impactedpackage",
15+
name="last_range_unfurl_at",
16+
field=models.DateTimeField(
17+
blank=True,
18+
db_index=True,
19+
help_text="Timestamp of the last vers range unfurl.",
20+
null=True,
21+
),
22+
),
23+
migrations.AddField(
24+
model_name="impactedpackage",
25+
name="last_successful_range_unfurl_at",
26+
field=models.DateTimeField(
27+
blank=True,
28+
db_index=True,
29+
help_text="Timestamp of the last successful vers range unfurl.",
30+
null=True,
31+
),
32+
),
33+
migrations.AddField(
34+
model_name="pipelineschedule",
35+
name="run_priority",
36+
field=models.IntegerField(
37+
choices=[(1, "high"), (2, "default")],
38+
default=2,
39+
help_text="Select the pipeline execution priority",
40+
),
41+
),
42+
]

vulnerabilities/models.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2262,6 +2262,10 @@ def requeue(self):
22622262
class PipelineSchedule(models.Model):
22632263
"""The Database representation of a pipeline schedule."""
22642264

2265+
class ExecutionPriority(models.IntegerChoices):
2266+
HIGH = 1, "high"
2267+
DEFAULT = 2, "default"
2268+
22652269
pipeline_id = models.CharField(
22662270
max_length=600,
22672271
help_text=("Identify a registered Pipeline class."),
@@ -2306,6 +2310,14 @@ class PipelineSchedule(models.Model):
23062310
help_text=("Number of hours to wait between run of this pipeline."),
23072311
)
23082312

2313+
run_priority = models.IntegerField(
2314+
null=False,
2315+
blank=False,
2316+
choices=ExecutionPriority.choices,
2317+
default=ExecutionPriority.DEFAULT,
2318+
help_text=("Select the pipeline execution priority"),
2319+
)
2320+
23092321
schedule_work_id = models.CharField(
23102322
max_length=255,
23112323
unique=True,
@@ -3240,6 +3252,20 @@ class ImpactedPackage(models.Model):
32403252
help_text="Timestamp indicating when this impact was added.",
32413253
)
32423254

3255+
last_range_unfurl_at = models.DateTimeField(
3256+
blank=True,
3257+
null=True,
3258+
db_index=True,
3259+
help_text="Timestamp of the last vers range unfurl.",
3260+
)
3261+
3262+
last_successful_range_unfurl_at = models.DateTimeField(
3263+
blank=True,
3264+
null=True,
3265+
db_index=True,
3266+
help_text="Timestamp of the last successful vers range unfurl.",
3267+
)
3268+
32433269
def to_dict(self):
32443270
from vulnerabilities.utils import purl_to_dict
32453271

vulnerabilities/pipelines/__init__.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
from vulnerabilities.improver import MAX_CONFIDENCE
2525
from vulnerabilities.models import Advisory
2626
from vulnerabilities.models import PipelineRun
27+
from vulnerabilities.models import PipelineSchedule
2728
from vulnerabilities.pipes.advisory import import_advisory
2829
from vulnerabilities.pipes.advisory import insert_advisory
2930
from vulnerabilities.pipes.advisory import insert_advisory_v2
@@ -144,6 +145,9 @@ class VulnerableCodePipeline(PipelineDefinition, BasePipelineRun):
144145
# When set to true pipeline is run only once.
145146
# To rerun onetime pipeline reset is_active field to True via migration.
146147
run_once = False
148+
# Interval between runs, in hours.
149+
run_interval = 24
150+
run_priority = PipelineSchedule.ExecutionPriority.DEFAULT
147151

148152
def on_failure(self):
149153
"""
@@ -176,6 +180,9 @@ class VulnerableCodeBaseImporterPipeline(VulnerableCodePipeline):
176180
# When set to true pipeline is run only once.
177181
# To rerun onetime pipeline reset is_active field to True via migration.
178182
run_once = False
183+
# Interval between runs, in hours.
184+
run_interval = 24
185+
run_priority = PipelineSchedule.ExecutionPriority.DEFAULT
179186

180187
@classmethod
181188
def steps(cls):
@@ -277,6 +284,9 @@ class VulnerableCodeBaseImporterPipelineV2(VulnerableCodePipeline):
277284
# When set to true pipeline is run only once.
278285
# To rerun onetime pipeline reset is_active field to True via migration.
279286
run_once = False
287+
# Interval between runs, in hours.
288+
run_interval = 24
289+
run_priority = PipelineSchedule.ExecutionPriority.DEFAULT
280290

281291
@classmethod
282292
def steps(cls):

vulnerabilities/pipelines/v2_improvers/unfurl_version_range.py

Lines changed: 72 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,13 @@
88
#
99

1010
import logging
11+
from datetime import timedelta
1112
from traceback import format_exc as traceback_format_exc
1213

1314
from aboutcode.pipeline import LoopProgress
15+
from django.db.models import F
16+
from django.db.models import Q
17+
from django.utils import timezone
1418
from fetchcode.package_versions import SUPPORTED_ECOSYSTEMS as FETCHCODE_SUPPORTED_ECOSYSTEMS
1519
from packageurl import PackageURL
1620
from univers.version_range import RANGE_CLASS_BY_SCHEMES
@@ -19,44 +23,64 @@
1923
from vulnerabilities.models import ImpactedPackage
2024
from vulnerabilities.models import ImpactedPackageAffecting
2125
from vulnerabilities.models import PackageV2
26+
from vulnerabilities.models import PipelineSchedule
2227
from vulnerabilities.pipelines import VulnerableCodePipeline
2328
from vulnerabilities.pipes.fetchcode_utils import get_versions
2429
from vulnerabilities.utils import update_purl_version
2530

2631

2732
class UnfurlVersionRangePipeline(VulnerableCodePipeline):
33+
"""
34+
Unfurl affected version ranges by first processing those that have
35+
never been unfurled and then handling ranges that were last unfurled
36+
two or more days ago.
37+
"""
2838

2939
pipeline_id = "unfurl_version_range_v2"
3040

41+
run_interval = 2
42+
run_priority = PipelineSchedule.ExecutionPriority.HIGH
43+
44+
# Days elapsed before version range is re-unfurled
45+
reunfurl_after_days = 2
46+
3147
@classmethod
3248
def steps(cls):
3349
return (cls.unfurl_version_range,)
3450

3551
def unfurl_version_range(self):
36-
impacted_packages = ImpactedPackage.objects.all().order_by("-created_at")
37-
impacted_packages_count = impacted_packages.count()
38-
3952
processed_impacted_packages_count = 0
4053
processed_affected_packages_count = 0
4154
cached_versions = {}
55+
update_unfurl_date = []
56+
update_successful_unfurl_date = []
57+
update_batch_size = 5000
58+
chunk_size = 5000
59+
60+
impacted_packages = impacted_package_qs(cutoff_day=self.reunfurl_after_days)
61+
impacted_packages_count = impacted_packages.count()
4262
self.log(f"Unfurl affected vers range for {impacted_packages_count:,d} ImpactedPackage.")
43-
progress = LoopProgress(total_iterations=impacted_packages_count, logger=self.log)
44-
for impact in progress.iter(impacted_packages):
63+
64+
progress = LoopProgress(
65+
total_iterations=impacted_packages_count, progress_step=5, logger=self.log
66+
)
67+
for impact in progress.iter(impacted_packages.iterator(chunk_size=chunk_size)):
68+
update_unfurl_date.append(impact.pk)
4569
purl = PackageURL.from_string(impact.base_purl)
4670
if not impact.affecting_vers or not any(
4771
c in impact.affecting_vers for c in ("<", ">", "!")
4872
):
73+
update_successful_unfurl_date.append(impact.pk)
4974
continue
5075
if purl.type not in FETCHCODE_SUPPORTED_ECOSYSTEMS:
5176
continue
5277
if purl.type not in RANGE_CLASS_BY_SCHEMES:
5378
continue
5479

55-
versions = get_purl_versions(purl, cached_versions) or []
80+
versions = get_purl_versions(purl, cached_versions, self.log) or []
5681
affected_purls = get_affected_purls(
5782
versions=versions,
58-
affecting_vers=impact.affecting_vers,
59-
base_purl=purl,
83+
impact=impact,
6084
logger=self.log,
6185
)
6286
if not affected_purls:
@@ -68,14 +92,31 @@ def unfurl_version_range(self):
6892
relation=ImpactedPackageAffecting,
6993
logger=self.log,
7094
)
95+
update_successful_unfurl_date.append(impact.pk)
7196
processed_impacted_packages_count += 1
7297

98+
if len(update_unfurl_date) > update_batch_size:
99+
ImpactedPackage.objects.filter(pk__in=update_unfurl_date).update(
100+
last_range_unfurl_at=timezone.now()
101+
)
102+
ImpactedPackage.objects.filter(pk__in=update_successful_unfurl_date).update(
103+
last_successful_range_unfurl_at=timezone.now()
104+
)
105+
update_unfurl_date.clear()
106+
update_successful_unfurl_date.clear()
107+
108+
ImpactedPackage.objects.filter(pk__in=update_unfurl_date).update(
109+
last_range_unfurl_at=timezone.now()
110+
)
111+
ImpactedPackage.objects.filter(pk__in=update_successful_unfurl_date).update(
112+
last_successful_range_unfurl_at=timezone.now()
113+
)
73114
self.log(f"Successfully processed {processed_impacted_packages_count:,d} ImpactedPackage.")
74115
self.log(f"{processed_affected_packages_count:,d} new Impact-Package relation created.")
75116

76117

77-
def get_affected_purls(versions, affecting_vers, base_purl, logger):
78-
affecting_version_range = VersionRange.from_string(affecting_vers)
118+
def get_affected_purls(versions, impact, logger):
119+
affecting_version_range = VersionRange.from_string(impact.affecting_vers)
79120
version_class = affecting_version_range.version_class
80121

81122
try:
@@ -84,7 +125,7 @@ def get_affected_purls(versions, affecting_vers, base_purl, logger):
84125
versions = [version_class(v) for v in versions]
85126
except Exception as e:
86127
logger(
87-
f"Error while parsing versions for {base_purl!s}: {e!r} \n {traceback_format_exc()}",
128+
f"Error while parsing versions for {impact.base_purl!s}: {e!r} \n {traceback_format_exc()}",
88129
level=logging.ERROR,
89130
)
90131
return
@@ -95,21 +136,24 @@ def get_affected_purls(versions, affecting_vers, base_purl, logger):
95136
if version in affecting_version_range:
96137
affected_purls.append(
97138
update_purl_version(
98-
purl=base_purl,
139+
purl=impact.base_purl,
99140
version=str(version),
100141
)
101142
)
102143
except Exception as e:
103144
logger(
104-
f"Error while checking {version!s} in {affecting_version_range!s}: {e!r} \n {traceback_format_exc()}",
145+
(
146+
f"Error while checking {version!s} in {affecting_version_range!s} for "
147+
f"advisory {impact.advisory.avid}: {e!r} \n {traceback_format_exc()}"
148+
),
105149
level=logging.ERROR,
106150
)
107151
return affected_purls
108152

109153

def get_purl_versions(purl, cached_versions, logger):
    """
    Return the list of known versions for ``purl``, memoizing results in the
    ``cached_versions`` dict so the same package is fetched only once.

    A failed fetch (``get_versions`` returning None) is deliberately not
    cached, so a later call for the same purl retries the fetch.
    Returns an empty list when no versions are known.
    """
    # Idiomatic membership test ("x not in y" rather than "not x in y").
    if purl not in cached_versions:
        versions = get_versions(purl, logger)
        if versions is not None:
            cached_versions[purl] = versions
    return cached_versions.get(purl) or []
@@ -135,3 +179,16 @@ def bulk_create_with_m2m(purls, impact, relation, logger):
135179
return 0
136180

137181
return len(relations)
182+
183+
184+
def impacted_package_qs(cutoff_day=2):
185+
cutoff = timezone.now() - timedelta(days=cutoff_day)
186+
return (
187+
ImpactedPackage.objects.filter(
188+
(Q(last_range_unfurl_at__isnull=True) | Q(last_range_unfurl_at__lte=cutoff))
189+
& Q(affecting_vers__isnull=False)
190+
& ~Q(affecting_vers="")
191+
)
192+
.order_by(F("last_range_unfurl_at").asc(nulls_first=True))
193+
.only("pk", "affecting_vers", "advisory", "base_purl")
194+
)

vulnerabilities/schedules.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,10 +95,21 @@ def update_pipeline_schedule():
9595
PipelineSchedule.objects.exclude(pipeline_id__in=pipelines.keys()).delete()
9696
for id, pipeline_class in pipelines.items():
9797
run_once = getattr(pipeline_class, "run_once", False)
98+
run_interval = getattr(pipeline_class, "run_interval", 24)
99+
run_priority = getattr(
100+
pipeline_class, "run_priority", PipelineSchedule.ExecutionPriority.DEFAULT
101+
)
98102

99-
PipelineSchedule.objects.get_or_create(
103+
pipeline, created = PipelineSchedule.objects.get_or_create(
100104
pipeline_id=id,
101105
defaults={
102106
"is_run_once": run_once,
107+
"run_interval": run_interval,
108+
"run_priority": run_priority,
103109
},
104110
)
111+
112+
if not created:
113+
pipeline.run_priority = run_priority
114+
pipeline.run_interval = run_interval
115+
pipeline.save()

vulnerabilities/tasks.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,13 @@
2020

2121
logger = logging.getLogger(__name__)
2222

23-
queue = django_rq.get_queue("default")
23+
default_queue = django_rq.get_queue("default")
24+
high_queue = django_rq.get_queue("high")
25+
26+
queues = {
27+
"default": django_rq.get_queue("default"),
28+
"high": django_rq.get_queue("high"),
29+
}
2430

2531

2632
def execute_pipeline(pipeline_id, run_id):
@@ -112,6 +118,8 @@ def set_run_failure(job, connection, type, value, traceback):
112118

113119
def enqueue_pipeline(pipeline_id):
114120
pipeline_schedule = models.PipelineSchedule.objects.get(pipeline_id=pipeline_id)
121+
queue = queues.get(pipeline_schedule.get_run_priority_display())
122+
115123
if pipeline_schedule.status in [
116124
models.PipelineRun.Status.RUNNING,
117125
models.PipelineRun.Status.QUEUED,
@@ -139,5 +147,7 @@ def enqueue_pipeline(pipeline_id):
139147

140148
def dequeue_job(job_id):
141149
"""Remove a job from queue if it hasn't been executed yet."""
142-
if job_id in queue.jobs:
143-
queue.remove(job_id)
150+
151+
for queue in queues.values():
152+
if job_id in queue.jobs:
153+
queue.remove(job_id)

vulnerabilities/templates/pipeline_dashboard.html

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ <h1>Pipeline Dashboard</h1>
6262
<div class="columns is-mobile is-vcentered">
6363
<div class="column is-one-quarter">Pipeline ID</div>
6464
<div class="column is-one-eighth">Active</div>
65+
<div class="column is-one-eighth">Priority</div>
6566
<div class="column is-one-eighth">Interval</div>
6667
<div class="column is-one-eighth">Status</div>
6768
<div class="column is-one-fifth">Last Run End Time</div>
@@ -79,6 +80,7 @@ <h1>Pipeline Dashboard</h1>
7980
<div class="columns px-1 is-mobile is-vcentered">
8081
<div class="column is-one-quarter">{{ schedule.pipeline_id }}</div>
8182
<div class="column is-one-eighth has-text-grey">{{ schedule.is_active|yesno:"Yes,No" }}</div>
83+
<div class="column is-one-eighth has-text-grey">{{ schedule.get_run_priority_display|capfirst}}</div>
8284
<div class="column is-one-eighth has-text-grey">
8385
{% if schedule.is_run_once %}
8486
Once

0 commit comments

Comments
 (0)