Skip to content

Commit 11a9681

Browse files
committed
Deactivate onetime pipeline after first run
- To rerun onetime pipeline, reset `is_active` to `True` via a migration, pipeline will run one more time and then deactivate. - To convert a onetime pipeline to a regular pipeline, set the `run_once` class variable to `False` and reset the `is_active` field to `True` via a migration. Signed-off-by: Keshav Priyadarshi <git@keshav.space>
1 parent 0dbd545 commit 11a9681

File tree

3 files changed

+34
-15
lines changed

3 files changed

+34
-15
lines changed

vulnerabilities/management/commands/run_scheduler.py

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,21 @@
1717

1818

1919
def init_pipeline_scheduled():
20-
"""Initialize schedule jobs for active PipelineSchedule."""
21-
active_pipeline_qs = models.PipelineSchedule.objects.filter(is_active=True).order_by(
22-
"created_date"
23-
)
24-
for pipeline_schedule in active_pipeline_qs:
25-
if scheduled_job_exists(pipeline_schedule.schedule_work_id):
26-
continue
27-
new_id = pipeline_schedule.create_new_job()
28-
pipeline_schedule.schedule_work_id = new_id
29-
pipeline_schedule.save(update_fields=["schedule_work_id"])
20+
"""
21+
Initialize schedule jobs for active PipelineSchedule.
22+
- Create new schedule if there is no schedule for active pipeline
23+
- Create new schedule if schedule is corrupted for an active pipeline
24+
- Delete schedule for inactive pipeline
25+
"""
26+
pipeline_qs = models.PipelineSchedule.objects.order_by("created_date")
27+
for pipeline in pipeline_qs:
28+
reset_schedule = pipeline.is_active != bool(pipeline.schedule_work_id)
29+
if not scheduled_job_exists(pipeline.schedule_work_id):
30+
reset_schedule = True
31+
32+
if reset_schedule:
33+
pipeline.schedule_work_id = pipeline.create_new_job()
34+
pipeline.save(update_fields=["schedule_work_id"])
3035

3136

3237
class Command(rqscheduler.Command):

vulnerabilities/schedules.py

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,15 @@ def update_pipeline_schedule():
8989
from vulnerabilities.improvers import IMPROVERS_REGISTRY
9090
from vulnerabilities.models import PipelineSchedule
9191

92-
pipeline_ids = [*IMPORTERS_REGISTRY.keys(), *IMPROVERS_REGISTRY.keys()]
93-
94-
PipelineSchedule.objects.exclude(pipeline_id__in=pipeline_ids).delete()
95-
[PipelineSchedule.objects.get_or_create(pipeline_id=id) for id in pipeline_ids]
92+
pipelines = IMPORTERS_REGISTRY | IMPROVERS_REGISTRY
93+
94+
PipelineSchedule.objects.exclude(pipeline_id__in=pipelines.keys()).delete()
95+
for id, pipeline_class in pipelines.items():
96+
run_once = getattr(pipeline_class, "run_once", False)
97+
98+
PipelineSchedule.objects.get_or_create(
99+
pipeline_id=id,
100+
defaults={
101+
"is_run_once": run_once,
102+
},
103+
)

vulnerabilities/tasks.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
from vulnerabilities import models
1818
from vulnerabilities.importer import Importer
1919
from vulnerabilities.improver import Improver
20-
from vulnerablecode.settings import VULNERABLECODE_PIPELINE_TIMEOUT
2120

2221
logger = logging.getLogger(__name__)
2322

@@ -48,6 +47,13 @@ def execute_pipeline(pipeline_id, run_id):
4847
exitcode = 1
4948

5049
run.set_run_ended(exitcode=exitcode, output=output)
50+
51+
# Onetime pipeline are inactive after first execution.
52+
pipeline = run.pipeline
53+
if pipeline.is_run_once:
54+
pipeline.is_active = False
55+
pipeline.save()
56+
5157
logger.info("Update Run instance with exitcode, output, and end_date")
5258

5359

0 commit comments

Comments
 (0)