Skip to content

Commit 274d7ac

Browse files
committed
multiple routing queues OK
1 parent c16d623 commit 274d7ac

File tree

1 file changed

+160
-78
lines changed

1 file changed

+160
-78
lines changed

src/apps/competitions/tasks.py

Lines changed: 160 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,8 @@
116116
]
117117
MAX_EXECUTION_TIME_LIMIT = int(os.environ.get('MAX_EXECUTION_TIME_LIMIT', 600)) # time limit of the default queue
118118

119-
def _send_to_compute_worker(submission, is_scoring):
119+
120+
def _send_to_compute_worker(submission, is_scoring, target_group=None):
120121
logger.info("Site Worker ==> STARTING")
121122

122123
run_args = {
@@ -130,25 +131,19 @@ def _send_to_compute_worker(submission, is_scoring):
130131
}
131132

132133
if not submission.detailed_result.name and submission.phase.competition.enable_detailed_results:
133-
submission.detailed_result.save('detailed_results.html', ContentFile(''.encode())) # must encode here for GCS
134+
submission.detailed_result.save('detailed_results.html', ContentFile(''.encode()))
134135
submission.save(update_fields=['detailed_result'])
135136
if not submission.prediction_result.name:
136-
submission.prediction_result.save('prediction_result.zip', ContentFile(''.encode())) # must encode here for GCS
137+
submission.prediction_result.save('prediction_result.zip', ContentFile(''.encode()))
137138
submission.save(update_fields=['prediction_result'])
138139
if not submission.scoring_result.name:
139-
submission.scoring_result.save('scoring_result.zip', ContentFile(''.encode())) # must encode here for GCS
140+
submission.scoring_result.save('scoring_result.zip', ContentFile(''.encode()))
140141
submission.save(update_fields=['scoring_result'])
141142

142143
submission = Submission.objects.get(id=submission.id)
143144
task = submission.task
144145

145-
# priority of scoring tasks is higher, we don't want to wait around for
146-
# many submissions to be scored while we're waiting for results
147-
if is_scoring:
148-
# higher numbers are higher priority
149-
priority = 10
150-
else:
151-
priority = 0
146+
priority = 10 if is_scoring else 0
152147

153148
if not is_scoring:
154149
run_args['prediction_result'] = make_url_sassy(
@@ -196,91 +191,91 @@ def _send_to_compute_worker(submission, is_scoring):
196191
run_args[detail_name] = create_detailed_output_file(detail_name, submission)
197192

198193
logger.info(f"Task data for submission id = {submission.id}")
199-
logger.info(run_args)
194+
logger.debug(run_args)
200195

201196
# Pad timelimit so worker has time to cleanup
202197
time_padding = 60 * 20 # 20 minutes
203198
time_limit = submission.phase.execution_time_limit + time_padding
204199

200+
# Determine routing: prefer explicitly passed target_group, else fallback to competition/group resolution
201+
target_vhost = None
205202
try:
206-
competition = submission.phase.competition
207-
208-
user_group_ids = list(submission.owner.groups.values_list("id", flat=True))
209-
logger.debug("User %s groups ids: %s", submission.owner.pk, user_group_ids)
210-
211-
comp_user_groups_qs = (
212-
competition.participant_groups
213-
.select_related("queue")
214-
.filter(id__in=user_group_ids)
215-
)
216-
217-
group = comp_user_groups_qs.filter(queue__isnull=False).first() or comp_user_groups_qs.first()
218-
219-
if group:
220-
logger.info(
221-
"Submission %s candidate group(s) in competition %s: chosen group=%s queue=%s",
222-
submission.pk,
223-
competition.pk,
224-
group.pk,
225-
getattr(group.queue, "name", None),
226-
)
227-
228-
if group.queue:
229-
run_args["queue"] = group.queue.name
230-
competition.queue = group.queue
231-
logger.info(f"Group Found = {group.name}")
232-
203+
if target_group:
204+
if getattr(target_group, 'queue', None):
205+
run_args['queue'] = target_group.queue.name
206+
target_vhost = getattr(target_group.queue, 'vhost', None)
207+
logger.info("Submission %s forced to group %s queue=%s", submission.pk,
208+
getattr(target_group, 'pk', None), run_args.get('queue'))
233209
else:
234-
logger.debug(
235-
"Submission %s owner %s: no intersection between user's groups %s and competition %s participant_groups",
236-
submission.pk,
237-
submission.owner.pk,
238-
user_group_ids,
239-
competition.pk,
210+
# Legacy behavior
211+
competition = submission.phase.competition
212+
user_group_ids = list(submission.owner.groups.values_list("id", flat=True))
213+
logger.debug("User %s groups ids: %s", submission.owner.pk, user_group_ids)
214+
215+
comp_user_groups_qs = (
216+
competition.participant_groups
217+
.select_related("queue")
218+
.filter(id__in=user_group_ids)
240219
)
241220

221+
group = comp_user_groups_qs.filter(queue__isnull=False).first() or comp_user_groups_qs.first()
222+
if group and group.queue:
223+
run_args["queue"] = group.queue.name
224+
target_vhost = getattr(group.queue, "vhost", None)
225+
logger.info("Submission %s chosen group=%s queue=%s", submission.pk, group.pk, group.queue.name)
226+
else:
227+
logger.debug("Submission %s owner %s: no matching group with queue for competition %s",
228+
submission.pk, submission.owner.pk, competition.pk)
242229
except Exception:
243230
logger.exception("Error while resolving competition/group for submission %s", submission.pk)
244231

232+
# If no group vhost, fallback to competition-level queue vhost
233+
if target_vhost is None:
234+
comp_queue = getattr(submission.phase.competition, 'queue', None)
235+
if comp_queue:
236+
run_args['queue'] = getattr(comp_queue, 'name', None)
237+
target_vhost = getattr(comp_queue, 'vhost', None)
245238

246-
if submission.phase.competition.queue: # if the competition is running on a custom queue, not the default queue
247-
submission.queue_name = submission.phase.competition.queue.name or ''
248-
run_args['execution_time_limit'] = submission.phase.execution_time_limit # use the competition time limit
249-
submission.save()
250-
251-
# Send to special queue? Using `celery_app` var name here since we'd be overriding the imported `app`
252-
# variable above
253-
celery_app = app_or_default()
254-
with celery_app.connection() as new_connection:
255-
new_connection.virtual_host = str(submission.phase.competition.queue.vhost)
256-
task = celery_app.send_task(
239+
# Send the task to the compute-worker
240+
task_obj = None
241+
try:
242+
if target_vhost:
243+
celery_app = app_or_default()
244+
with celery_app.connection() as new_connection:
245+
new_connection.virtual_host = str(target_vhost)
246+
task_obj = celery_app.send_task(
247+
'compute_worker_run',
248+
args=(run_args,),
249+
queue='compute-worker',
250+
soft_time_limit=time_limit,
251+
connection=new_connection,
252+
priority=priority,
253+
)
254+
else:
255+
task_obj = app.send_task(
257256
'compute_worker_run',
258257
args=(run_args,),
259258
queue='compute-worker',
260259
soft_time_limit=time_limit,
261-
connection=new_connection,
262260
priority=priority,
263261
)
264-
else:
265-
task = app.send_task(
266-
'compute_worker_run',
267-
args=(run_args,),
268-
queue='compute-worker',
269-
soft_time_limit=time_limit,
270-
priority=priority,
271-
)
272-
submission.celery_task_id = task.id
262+
except Exception:
263+
logger.exception("Failed to enqueue compute_worker_run for submission %s", submission.pk)
264+
task_obj = None
265+
266+
if task_obj:
267+
submission.celery_task_id = getattr(task_obj, 'id', None)
273268

274269
if submission.status == Submission.SUBMITTING:
275-
# Don't want to mark an already-prepared submission as "submitted" again, so
276-
# only do this if we were previously "SUBMITTING"
277270
submission.status = Submission.SUBMITTED
278271

279-
submission.save()
272+
try:
273+
submission.save()
274+
except Exception:
275+
logger.exception("Failed to save submission after enqueue for submission %s", submission.pk)
280276

281277

282278
def create_detailed_output_file(detail_name, submission):
    """Create an empty detail-log file for a submission and return a writable URL.

    A ``SubmissionDetails`` row named ``detail_name`` is created for
    ``submission``, an empty ``<detail_name>.txt`` file is saved onto its
    ``data_file`` field, and the stored file name is passed to
    ``make_url_sassy`` with write permission.

    :param detail_name: name of the detail log; also used as the file stem.
    :param submission: the submission the detail record is attached to.
    :return: whatever ``make_url_sassy`` returns for the stored file name
        (presumably a signed, writable URL -- confirm against that helper).
    """
    # Detail logs like stdout/etc.
    detail_record = SubmissionDetails.objects.create(
        submission=submission,
        name=detail_name,
    )
    empty_payload = ContentFile(''.encode())  # must encode here for GCS
    detail_record.data_file.save(f'{detail_name}.txt', empty_payload)
    stored_name = detail_record.data_file.name
    return make_url_sassy(stored_name, permission="w")
@@ -346,6 +341,65 @@ def _run_submission(submission_pk, task_pks=None, is_scoring=False):
346341

347342
tasks = tasks.order_by('pk')
348343

344+
'''New section for group submission child '''
345+
try:
346+
assigned_group = None
347+
348+
if submission.parent is None:
349+
groups_with_queue_qs = resolve_competition_groups(submission)
350+
groups_with_queue = list(groups_with_queue_qs)
351+
352+
if len(groups_with_queue) > 1:
353+
submission.has_children = True
354+
submission.save()
355+
send_parent_status(submission)
356+
357+
# If there are multiple tasks, create one child per (group, task); otherwise create one child per group.
358+
for group in groups_with_queue:
359+
if len(tasks) > 1:
360+
for task_obj in tasks:
361+
child_sub = Submission(
362+
owner=submission.owner,
363+
phase=submission.phase,
364+
data=submission.data,
365+
participant=submission.participant,
366+
parent=submission,
367+
task=task_obj,
368+
fact_sheet_answers=submission.fact_sheet_answers
369+
)
370+
child_sub.save(ignore_submission_limit=True)
371+
send_child_id(submission, child_sub.id)
372+
373+
try:
374+
_send_to_compute_worker(child_sub, is_scoring, target_group=group)
375+
except Exception:
376+
logger.exception("Failed to send child submission %s to compute worker for group %s", child_sub.pk, getattr(group, 'pk', None))
377+
else:
378+
child_sub = Submission(
379+
owner=submission.owner,
380+
phase=submission.phase,
381+
data=submission.data,
382+
participant=submission.participant,
383+
parent=submission,
384+
task=tasks[0],
385+
fact_sheet_answers=submission.fact_sheet_answers
386+
)
387+
child_sub.save(ignore_submission_limit=True)
388+
send_child_id(submission, child_sub.id)
389+
try:
390+
_send_to_compute_worker(child_sub, is_scoring, target_group=group)
391+
except Exception:
392+
logger.exception("Failed to send child submission %s to compute worker for group %s", child_sub.pk, getattr(group, 'pk', None))
393+
return
394+
395+
if len(groups_with_queue) == 1:
396+
assigned_group = groups_with_queue[0]
397+
398+
except Exception:
399+
logger.exception("Error resolving participant groups for submission %s", submission.pk)
400+
'''END BLOCK'''
401+
402+
349403
if len(tasks) > 1:
350404
# The initial submission object becomes the parent submission and we create children for each task
351405
submission.has_children = True
@@ -361,19 +415,34 @@ def _run_submission(submission_pk, task_pks=None, is_scoring=False):
361415
data=submission.data,
362416
participant=submission.participant,
363417
parent=submission,
364-
task=task,
418+
task=task[0],
365419
fact_sheet_answers=submission.fact_sheet_answers
366420
)
367421
child_sub.save(ignore_submission_limit=True)
368-
_send_to_compute_worker(child_sub, is_scoring=False)
422+
_send_to_compute_worker(child_sub, is_scoring=False, target_group=assigned_group)
369423
send_child_id(submission, child_sub.id)
370-
else:
371-
# The initial submission object is the only submission
372-
if not submission.task:
373-
submission.task = tasks[0]
374-
submission.save()
375-
_send_to_compute_worker(submission, is_scoring)
424+
else:
425+
child_sub = Submission(
426+
owner=submission.owner,
427+
phase=submission.phase,
428+
data=submission.data,
429+
participant=submission.participant,
430+
parent=submission,
431+
task=tasks[0],
432+
fact_sheet_answers=submission.fact_sheet_answers
433+
)
434+
child_sub.save(ignore_submission_limit=True)
376435

436+
send_child_id(submission, child_sub.id)
437+
438+
try:
439+
_send_to_compute_worker(child_sub, is_scoring, target_group=group)
440+
except Exception:
441+
logger.exception(
442+
"Failed to send child submission %s to compute worker for group %s",
443+
child_sub.pk,
444+
getattr(group, 'pk', None)
445+
)
377446

378447
@app.task(queue='site-worker', soft_time_limit=60 * 60) # 1 hour timeout
379448
def unpack_competition(status_pk):
@@ -822,3 +891,16 @@ def submission_status_cleanup():
822891
sub.parent.cancel(status=Submission.FAILED)
823892
else:
824893
sub.cancel(status=Submission.FAILED)
894+
895+
896+
def resolve_competition_groups(submission):
    """Return the owner's participant groups in the competition that have a queue.

    Starting from the ``participant_groups`` of the submission's competition
    (``submission.phase.competition``), the result is narrowed to groups whose
    id is among the ids of ``submission.owner.groups`` and that do not have a
    null ``queue``. The ``queue`` relation is requested through
    ``select_related``.

    :param submission: submission whose owner and competition are inspected.
    :return: the result of the query chain, not evaluated here (presumably a
        Django QuerySet of group objects -- confirm against the model).
    """
    competition = submission.phase.competition
    owner_group_ids = submission.owner.groups.values_list("id", flat=True)

    # Build the query step by step: owner's groups first, then drop the
    # ones that have no queue attached.
    groups_with_queue_loaded = competition.participant_groups.select_related("queue")
    owner_groups = groups_with_queue_loaded.filter(id__in=owner_group_ids)
    return owner_groups.exclude(queue__isnull=True)

0 commit comments

Comments
 (0)