Skip to content

Commit a147b95

Browse files
committed
feature ok / needs to be tested
1 parent dcb58c2 commit a147b95

File tree

6 files changed

+827
-29
lines changed

6 files changed

+827
-29
lines changed

src/apps/competitions/tasks.py

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@
3232
from utils.data import make_url_sassy
3333
from utils.email import codalab_send_markdown_email
3434

35+
from channels.layers import get_channel_layer
36+
from asgiref.sync import async_to_sync
37+
3538
import logging
3639
logger = logging.getLogger(__name__)
3740

@@ -784,9 +787,66 @@ def submission_status_cleanup():
784787
submissions = Submission.objects.filter(status=Submission.RUNNING, has_children=False).select_related('phase', 'parent')
785788

786789
for sub in submissions:
787-
# Check if the submission has been running for 24 hours longer than execution_time_limit
788790
if sub.started_when < now() - timedelta(milliseconds=(3600000 * 24) + sub.phase.execution_time_limit):
789791
if sub.parent is not None:
790792
sub.parent.cancel(status=Submission.FAILED)
791793
else:
792794
sub.cancel(status=Submission.FAILED)
795+
796+
797+
def _broadcast_worker_state(payload):
798+
channel_layer = get_channel_layer()
799+
if not channel_layer:
800+
return
801+
802+
async_to_sync(channel_layer.group_send)(
803+
"compute_workers",
804+
{
805+
"type": "worker.health",
806+
"worker": payload,
807+
},
808+
)
809+
810+
811+
@app.task(queue="site-worker", soft_time_limit=60)
812+
def refresh_compute_worker_health():
813+
celery_app = app_or_default()
814+
inspector = celery_app.control.inspect(timeout=1)
815+
816+
if inspector is None:
817+
logger.warning("Celery inspect returned None")
818+
return
819+
820+
try:
821+
stats = inspector.stats() or {}
822+
active = inspector.active() or {}
823+
reserved = inspector.reserved() or {}
824+
except Exception:
825+
logger.exception("Unable to inspect Celery workers")
826+
return
827+
828+
for worker_name in stats.keys():
829+
if not worker_name.startswith("compute-worker"):
830+
continue
831+
832+
raw_running_jobs = len(active.get(worker_name, [])) + len(reserved.get(worker_name, []))
833+
status = "busy" if raw_running_jobs > 0 else "available"
834+
835+
payload = {
836+
"hostname": worker_name,
837+
"status": status,
838+
"running_jobs": raw_running_jobs,
839+
"timestamp": now().timestamp(),
840+
}
841+
842+
r.set(f"worker:{worker_name}:heartbeat", json.dumps(payload), ex=35)
843+
r.hset(
844+
WORKERS_REGISTRY_KEY,
845+
worker_name,
846+
json.dumps({
847+
"hostname": worker_name,
848+
"last_seen": payload["timestamp"],
849+
}),
850+
)
851+
852+
_broadcast_worker_state(payload)

src/celery_config.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,3 +36,12 @@ def app_for_vhost(vhost):
3636
vhost_app.conf.task_queues = app.conf.task_queues
3737
_vhost_apps[vhost] = vhost_app
3838
return _vhost_apps[vhost]
39+
40+
41+
42+
app.conf.beat_schedule = {
43+
"refresh-compute-worker-health": {
44+
"task": "chemin.vers.refresh_compute_worker_health",
45+
"schedule": 5.0,
46+
},
47+
}

src/routing.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
from django.urls import re_path
22
from apps.competitions.consumers import SubmissionIOConsumer, SubmissionOutputConsumer
3+
from utils.consumers import ComputeWorkersConsumer
4+
35

46
websocket_urlpatterns = [
57
re_path(r'submission_input/(?P<user_pk>\d+)/(?P<submission_id>\d+)/(?P<secret>[^/]+)/$', SubmissionIOConsumer.as_asgi()),
68
re_path(r'submission_output/$', SubmissionOutputConsumer.as_asgi()),
9+
re_path(r"ws/workers/$", ComputeWorkersConsumer.as_asgi()),
710
]

0 commit comments

Comments
 (0)