Skip to content
11 changes: 11 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,17 @@ services:
- db
- vulnerablecode

# Dedicated RQ worker for the "high" priority queue; pipelines whose
# run_priority is HIGH are enqueued here (see vulnerabilities/tasks.py).
# wait-for-it blocks until the web service answers on port 8000 so the
# worker starts only after the app (and its migrations) are up.
vulnerablecode_rqworker_high:
  build: .
  command: wait-for-it web:8000 -- python ./manage.py rqworker high
  env_file:
    - docker.env
  volumes:
    - /etc/vulnerablecode/:/etc/vulnerablecode/
  depends_on:
    - vulnerablecode_redis
    - db
    - vulnerablecode

nginx:
image: nginx
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# Generated by Django 5.2.11 on 2026-04-08 10:48

from django.db import migrations, models


class Migration(migrations.Migration):
    """
    Add unfurl-tracking timestamps to ImpactedPackage and a run_priority
    field to PipelineSchedule.
    """

    dependencies = [
        ("vulnerabilities", "0119_remove_advisoryset_identifiers_and_more"),
    ]

    operations = [
        # Last time a vers range unfurl was attempted for this impact
        # (indexed: used to select stale rows for re-unfurling).
        migrations.AddField(
            model_name="impactedpackage",
            name="last_range_unfurl_at",
            field=models.DateTimeField(
                blank=True,
                db_index=True,
                help_text="Timestamp of the last vers range unfurl.",
                null=True,
            ),
        ),
        # Last time an unfurl completed successfully for this impact.
        migrations.AddField(
            model_name="impactedpackage",
            name="last_successful_range_unfurl_at",
            field=models.DateTimeField(
                blank=True,
                db_index=True,
                help_text="Timestamp of the last successful vers range unfurl.",
                null=True,
            ),
        ),
        # Mirrors PipelineSchedule.ExecutionPriority: 1=high, 2=default.
        migrations.AddField(
            model_name="pipelineschedule",
            name="run_priority",
            field=models.IntegerField(
                choices=[(1, "high"), (2, "default")],
                default=2,
                help_text="Select the pipeline execution priority",
            ),
        ),
    ]
26 changes: 26 additions & 0 deletions vulnerabilities/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -2262,6 +2262,10 @@ def requeue(self):
class PipelineSchedule(models.Model):
"""The Database representation of a pipeline schedule."""

    class ExecutionPriority(models.IntegerChoices):
        # The labels ("high"/"default") double as RQ queue names:
        # tasks.enqueue_pipeline maps get_run_priority_display() to a queue.
        HIGH = 1, "high"
        DEFAULT = 2, "default"

pipeline_id = models.CharField(
max_length=600,
help_text=("Identify a registered Pipeline class."),
Expand Down Expand Up @@ -2306,6 +2310,14 @@ class PipelineSchedule(models.Model):
help_text=("Number of hours to wait between run of this pipeline."),
)

run_priority = models.IntegerField(
null=False,
blank=False,
choices=ExecutionPriority.choices,
default=ExecutionPriority.DEFAULT,
help_text=("Select the pipeline execution priority"),
)

schedule_work_id = models.CharField(
max_length=255,
unique=True,
Expand Down Expand Up @@ -3240,6 +3252,20 @@ class ImpactedPackage(models.Model):
help_text="Timestamp indicating when this impact was added.",
)

last_range_unfurl_at = models.DateTimeField(
blank=True,
null=True,
db_index=True,
help_text="Timestamp of the last vers range unfurl.",
)

last_successful_range_unfurl_at = models.DateTimeField(
blank=True,
null=True,
db_index=True,
help_text="Timestamp of the last successful vers range unfurl.",
)

def to_dict(self):
from vulnerabilities.utils import purl_to_dict

Expand Down
10 changes: 10 additions & 0 deletions vulnerabilities/pipelines/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from vulnerabilities.improver import MAX_CONFIDENCE
from vulnerabilities.models import Advisory
from vulnerabilities.models import PipelineRun
from vulnerabilities.models import PipelineSchedule
from vulnerabilities.pipes.advisory import import_advisory
from vulnerabilities.pipes.advisory import insert_advisory
from vulnerabilities.pipes.advisory import insert_advisory_v2
Expand Down Expand Up @@ -144,6 +145,9 @@ class VulnerableCodePipeline(PipelineDefinition, BasePipelineRun):
# When set to true pipeline is run only once.
# To rerun onetime pipeline reset is_active field to True via migration.
run_once = False
# Interval between runs in hour.
run_interval = 24
run_priority = PipelineSchedule.ExecutionPriority.DEFAULT

def on_failure(self):
"""
Expand Down Expand Up @@ -176,6 +180,9 @@ class VulnerableCodeBaseImporterPipeline(VulnerableCodePipeline):
# When set to true pipeline is run only once.
# To rerun onetime pipeline reset is_active field to True via migration.
run_once = False
# Interval between runs in hour.
run_interval = 24
run_priority = PipelineSchedule.ExecutionPriority.DEFAULT

@classmethod
def steps(cls):
Expand Down Expand Up @@ -277,6 +284,9 @@ class VulnerableCodeBaseImporterPipelineV2(VulnerableCodePipeline):
# When set to true pipeline is run only once.
# To rerun onetime pipeline reset is_active field to True via migration.
run_once = False
# Interval between runs in hour.
run_interval = 24
run_priority = PipelineSchedule.ExecutionPriority.DEFAULT

@classmethod
def steps(cls):
Expand Down
87 changes: 72 additions & 15 deletions vulnerabilities/pipelines/v2_improvers/unfurl_version_range.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,13 @@
#

import logging
from datetime import timedelta
from traceback import format_exc as traceback_format_exc

from aboutcode.pipeline import LoopProgress
from django.db.models import F
from django.db.models import Q
from django.utils import timezone
from fetchcode.package_versions import SUPPORTED_ECOSYSTEMS as FETCHCODE_SUPPORTED_ECOSYSTEMS
from packageurl import PackageURL
from univers.version_range import RANGE_CLASS_BY_SCHEMES
Expand All @@ -19,44 +23,64 @@
from vulnerabilities.models import ImpactedPackage
from vulnerabilities.models import ImpactedPackageAffecting
from vulnerabilities.models import PackageV2
from vulnerabilities.models import PipelineSchedule
from vulnerabilities.pipelines import VulnerableCodePipeline
from vulnerabilities.pipes.fetchcode_utils import get_versions
from vulnerabilities.utils import update_purl_version


class UnfurlVersionRangePipeline(VulnerableCodePipeline):
"""
Unfurl affected version ranges by first processing those that have
never been unfurled and then handling ranges that were last unfurled
two or more days ago.
"""

pipeline_id = "unfurl_version_range_v2"

run_interval = 2
run_priority = PipelineSchedule.ExecutionPriority.HIGH

# Days elapsed before version range is re-unfurled
reunfurl_after_days = 2

    @classmethod
    def steps(cls):
        # Single-step pipeline: all work happens in unfurl_version_range.
        return (cls.unfurl_version_range,)

def unfurl_version_range(self):
impacted_packages = ImpactedPackage.objects.all().order_by("-created_at")
impacted_packages_count = impacted_packages.count()

processed_impacted_packages_count = 0
processed_affected_packages_count = 0
cached_versions = {}
update_unfurl_date = []
update_successful_unfurl_date = []
update_batch_size = 5000
chunk_size = 5000

impacted_packages = impacted_package_qs(cutoff_day=self.reunfurl_after_days)
impacted_packages_count = impacted_packages.count()
self.log(f"Unfurl affected vers range for {impacted_packages_count:,d} ImpactedPackage.")
progress = LoopProgress(total_iterations=impacted_packages_count, logger=self.log)
for impact in progress.iter(impacted_packages):

progress = LoopProgress(
total_iterations=impacted_packages_count, progress_step=5, logger=self.log
)
for impact in progress.iter(impacted_packages.iterator(chunk_size=chunk_size)):
update_unfurl_date.append(impact.pk)
purl = PackageURL.from_string(impact.base_purl)
if not impact.affecting_vers or not any(
c in impact.affecting_vers for c in ("<", ">", "!")
):
update_successful_unfurl_date.append(impact.pk)
continue
if purl.type not in FETCHCODE_SUPPORTED_ECOSYSTEMS:
continue
if purl.type not in RANGE_CLASS_BY_SCHEMES:
continue

versions = get_purl_versions(purl, cached_versions) or []
versions = get_purl_versions(purl, cached_versions, self.log) or []
affected_purls = get_affected_purls(
versions=versions,
affecting_vers=impact.affecting_vers,
base_purl=purl,
impact=impact,
logger=self.log,
)
if not affected_purls:
Expand All @@ -68,14 +92,31 @@ def unfurl_version_range(self):
relation=ImpactedPackageAffecting,
logger=self.log,
)
update_successful_unfurl_date.append(impact.pk)
processed_impacted_packages_count += 1

if len(update_unfurl_date) > update_batch_size:
ImpactedPackage.objects.filter(pk__in=update_unfurl_date).update(
last_range_unfurl_at=timezone.now()
)
ImpactedPackage.objects.filter(pk__in=update_successful_unfurl_date).update(
last_successful_range_unfurl_at=timezone.now()
)
update_unfurl_date.clear()
update_successful_unfurl_date.clear()

ImpactedPackage.objects.filter(pk__in=update_unfurl_date).update(
last_range_unfurl_at=timezone.now()
)
ImpactedPackage.objects.filter(pk__in=update_successful_unfurl_date).update(
last_successful_range_unfurl_at=timezone.now()
)
self.log(f"Successfully processed {processed_impacted_packages_count:,d} ImpactedPackage.")
self.log(f"{processed_affected_packages_count:,d} new Impact-Package relation created.")


def get_affected_purls(versions, affecting_vers, base_purl, logger):
affecting_version_range = VersionRange.from_string(affecting_vers)
def get_affected_purls(versions, impact, logger):
affecting_version_range = VersionRange.from_string(impact.affecting_vers)
version_class = affecting_version_range.version_class

try:
Expand All @@ -84,7 +125,7 @@ def get_affected_purls(versions, affecting_vers, base_purl, logger):
versions = [version_class(v) for v in versions]
except Exception as e:
logger(
f"Error while parsing versions for {base_purl!s}: {e!r} \n {traceback_format_exc()}",
f"Error while parsing versions for {impact.base_purl!s}: {e!r} \n {traceback_format_exc()}",
level=logging.ERROR,
)
return
Expand All @@ -95,21 +136,24 @@ def get_affected_purls(versions, affecting_vers, base_purl, logger):
if version in affecting_version_range:
affected_purls.append(
update_purl_version(
purl=base_purl,
purl=impact.base_purl,
version=str(version),
)
)
except Exception as e:
logger(
f"Error while checking {version!s} in {affecting_version_range!s}: {e!r} \n {traceback_format_exc()}",
(
f"Error while checking {version!s} in {affecting_version_range!s} for "
f"advisory {impact.advisory.avid}: {e!r} \n {traceback_format_exc()}"
),
level=logging.ERROR,
)
return affected_purls


def get_purl_versions(purl, cached_versions):
def get_purl_versions(purl, cached_versions, logger):
    """
    Return the list of known versions for ``purl``, memoizing results in
    the caller-owned ``cached_versions`` dict.

    On a cache miss the versions are fetched with ``get_versions``; a
    ``None`` result (fetch failure) is deliberately not cached so a later
    call for the same purl can retry. Always returns a list (possibly
    empty), never ``None``.
    """
    # "purl not in" is the idiomatic membership test (was "not purl in").
    if purl not in cached_versions:
        versions = get_versions(purl, logger)
        if versions is not None:
            cached_versions[purl] = versions
    # Normalize a cached-but-falsy value (or a miss) to an empty list.
    return cached_versions.get(purl) or []
Expand All @@ -135,3 +179,16 @@ def bulk_create_with_m2m(purls, impact, relation, logger):
return 0

return len(relations)


def impacted_package_qs(cutoff_day=2):
    """
    Return the ImpactedPackage queryset due for a vers range unfurl.

    Selects rows that were never unfurled (``last_range_unfurl_at`` is
    NULL) or were last unfurled at least ``cutoff_day`` days ago, skipping
    rows whose ``affecting_vers`` is NULL or empty. Never-unfurled rows
    sort first, then oldest unfurl timestamps. Only the columns the unfurl
    pipeline reads are loaded.
    """
    stale_before = timezone.now() - timedelta(days=cutoff_day)

    needs_unfurl = Q(last_range_unfurl_at__isnull=True) | Q(
        last_range_unfurl_at__lte=stale_before
    )
    has_affecting_vers = Q(affecting_vers__isnull=False) & ~Q(affecting_vers="")

    queryset = ImpactedPackage.objects.filter(needs_unfurl & has_affecting_vers)
    queryset = queryset.order_by(F("last_range_unfurl_at").asc(nulls_first=True))
    return queryset.only("pk", "affecting_vers", "advisory", "base_purl")
13 changes: 12 additions & 1 deletion vulnerabilities/schedules.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,21 @@ def update_pipeline_schedule():
PipelineSchedule.objects.exclude(pipeline_id__in=pipelines.keys()).delete()
for id, pipeline_class in pipelines.items():
run_once = getattr(pipeline_class, "run_once", False)
run_interval = getattr(pipeline_class, "run_interval", 24)
run_priority = getattr(
pipeline_class, "run_priority", PipelineSchedule.ExecutionPriority.DEFAULT
)

PipelineSchedule.objects.get_or_create(
pipeline, created = PipelineSchedule.objects.get_or_create(
pipeline_id=id,
defaults={
"is_run_once": run_once,
"run_interval": run_interval,
"run_priority": run_priority,
},
)

if not created:
pipeline.run_priority = run_priority
pipeline.run_interval = run_interval
pipeline.save()
16 changes: 13 additions & 3 deletions vulnerabilities/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,13 @@

logger = logging.getLogger(__name__)

queue = django_rq.get_queue("default")
# NOTE(review): default_queue/high_queue are not referenced in this module's
# visible code — presumably kept for importers elsewhere; confirm before removal.
default_queue = django_rq.get_queue("default")
high_queue = django_rq.get_queue("high")

# Maps a PipelineSchedule run_priority display name to the RQ queue that
# pipeline runs with that priority are enqueued on / dequeued from.
queues = {
    "default": django_rq.get_queue("default"),
    "high": django_rq.get_queue("high"),
}


def execute_pipeline(pipeline_id, run_id):
Expand Down Expand Up @@ -112,6 +118,8 @@ def set_run_failure(job, connection, type, value, traceback):

def enqueue_pipeline(pipeline_id):
pipeline_schedule = models.PipelineSchedule.objects.get(pipeline_id=pipeline_id)
queue = queues.get(pipeline_schedule.get_run_priority_display())

if pipeline_schedule.status in [
models.PipelineRun.Status.RUNNING,
models.PipelineRun.Status.QUEUED,
Expand Down Expand Up @@ -139,5 +147,7 @@ def enqueue_pipeline(pipeline_id):

def dequeue_job(job_id):
    """
    Remove the job with ``job_id`` from whichever priority queue still
    holds it, provided it hasn't started executing yet.
    """
    for queue in queues.values():
        # Queue.jobs yields rq Job instances, so a string job_id would
        # never match there; compare against the queue's job IDs instead.
        if job_id in queue.job_ids:
            queue.remove(job_id)
2 changes: 2 additions & 0 deletions vulnerabilities/templates/pipeline_dashboard.html
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ <h1>Pipeline Dashboard</h1>
<div class="columns is-mobile is-vcentered">
<div class="column is-one-quarter">Pipeline ID</div>
<div class="column is-one-eighth">Active</div>
<div class="column is-one-eighth">Priority</div>
<div class="column is-one-eighth">Interval</div>
<div class="column is-one-eighth">Status</div>
<div class="column is-one-fifth">Last Run End Time</div>
Expand All @@ -79,6 +80,7 @@ <h1>Pipeline Dashboard</h1>
<div class="columns px-1 is-mobile is-vcentered">
<div class="column is-one-quarter">{{ schedule.pipeline_id }}</div>
<div class="column is-one-eighth has-text-grey">{{ schedule.is_active|yesno:"Yes,No" }}</div>
<div class="column is-one-eighth has-text-grey">{{ schedule.get_run_priority_display|capfirst}}</div>
<div class="column is-one-eighth has-text-grey">
{% if schedule.is_run_once %}
Once
Expand Down
Loading