|
logger = logging.getLogger(__name__)

# Queue handles are resolved once at import time.
# "default" serves regularly scheduled pipeline runs.
default_queue = django_rq.get_queue("default")
# "live" is a dedicated queue for ad-hoc (one-off) pipeline executions,
# so they do not compete with scheduled work on the default queue.
live_queue = django_rq.get_queue("live")
|
27 | 28 | def execute_pipeline(pipeline_id, run_id, inputs=None): |
@@ -132,35 +133,59 @@ def enqueue_pipeline(pipeline_id): |
132 | 133 | ) |
133 | 134 |
|
134 | 135 |
|
def enqueue_ad_hoc_pipeline(pipeline_ids, *, inputs=None):
    """Enqueue one-off executions for the given pipeline_ids with optional inputs.

    When multiple pipeline IDs are provided, this will create a single
    LivePipelineRun and attach each created PipelineRun to it.

    If a single pipeline ID (str) is provided, it will be wrapped into a list.

    Returns a tuple of (live_run_id, run_ids).
    """
    inputs = inputs or {}
    # Normalize so a single pipeline id can be passed as a plain string.
    if isinstance(pipeline_ids, str):
        pipeline_ids = [pipeline_ids]

    # Group these ad-hoc runs under a single LivePipelineRun, keyed by the
    # optional "purl" input when one is given.
    purl_val = inputs.get("purl")
    try:
        # Accept a PackageURL instance as well as a plain string.
        purl_str = str(purl_val) if purl_val is not None else None
    except Exception:
        # Best-effort: a purl that cannot be stringified should not block
        # the run, but the failure must not be silently swallowed either.
        logger.exception("Cannot convert purl %r to string", purl_val)
        purl_str = None

    live_run = models.LivePipelineRun.objects.create(purl=purl_str)

    run_ids = []
    for pipeline_id in pipeline_ids:
        # Reuse an existing schedule entry, or atomically create an inactive
        # one so an ad-hoc run never turns into a recurring scheduled run.
        # get_or_create avoids the get/DoesNotExist race of two concurrent
        # callers both trying to create the same schedule.
        pipeline_schedule, _ = models.PipelineSchedule.objects.get_or_create(
            pipeline_id=pipeline_id,
            defaults={"is_active": False},
        )

        run = models.PipelineRun.objects.create(
            pipeline=pipeline_schedule,
            live_pipeline=live_run,
        )

        # Enqueue on the dedicated live queue; the job id mirrors the run id
        # so the job can later be located (e.g. by dequeue_job).
        live_queue.enqueue(
            execute_pipeline,
            pipeline_id,
            run.run_id,
            inputs,
            job_id=str(run.run_id),
            on_failure=set_run_failure,
            job_timeout=f"{pipeline_schedule.execution_timeout}h",
        )
        run_ids.append(run.run_id)

    return live_run.run_id, run_ids
|
162 | 185 |
|
def dequeue_job(job_id):
    """Remove a job from its queue if it hasn't been executed yet.

    Checks both the default and the live queue, since ad-hoc runs are
    enqueued on the dedicated "live" queue.

    NOTE: ``Queue.jobs`` yields Job instances, so membership must be tested
    against ``Queue.job_ids`` — ``job_id in queue.jobs`` compares a string
    to Job objects and never matches, leaving the job enqueued.
    """
    for queue in (default_queue, live_queue):
        if job_id in queue.job_ids:
            queue.remove(job_id)
0 commit comments