33import re
44import traceback
55import zipfile
6+ import json
67from datetime import timedelta , datetime
7-
8+ from django . conf import settings
89from io import BytesIO
910from tempfile import TemporaryDirectory , NamedTemporaryFile
1011
1112import oyaml as yaml
1213import requests
1314from celery ._state import app_or_default
14- from django . conf import settings
15+ from django_redis import get_redis_connection
1516from django .core .exceptions import ObjectDoesNotExist
1617from django .core .files .base import ContentFile
1718from django .db .models import Subquery , OuterRef , Count , Case , When , Value , F
4243from asgiref .sync import async_to_sync
4344
4445import logging
45-
4646logger = logging .getLogger (__name__ )
4747
# Module-level Redis client for the "default" django_redis alias. It is created
# when this module is imported and is used further down for the r.set / r.hset
# calls that record compute-worker state.
48+ r = get_redis_connection ("default" )
# Name of the Redis hash written with r.hset; each field is a worker hostname
# and each value is a JSON document describing that worker.
49+ WORKERS_REGISTRY_KEY = "compute_workers_registry"
# Expiry passed as `ex=` to r.set for each per-worker heartbeat key, so a
# heartbeat that is not refreshed disappears on its own (redis-py `ex` is in
# seconds).
50+ WORKER_HEARTBEAT_TTL = 35
51+
4852COMPETITION_FIELDS = [
4953 "title" ,
5054 "description" ,
@@ -914,36 +918,61 @@ def refresh_compute_worker_health():
914918 stats = inspector .stats () or {}
915919 active = inspector .active () or {}
916920 reserved = inspector .reserved () or {}
921+ active_queues = inspector .active_queues () or {}
917922 except Exception :
918923 logger .exception ("Unable to inspect Celery workers" )
919924 return
920925
921926 for worker_name in stats .keys ():
922- if not worker_name .startswith ("compute-worker" ):
927+ queues = active_queues .get (worker_name , []) or []
928+ queue_names = []
929+
930+ for q in queues :
931+ if isinstance (q , dict ) and q .get ("name" ):
932+ queue_names .append (q ["name" ])
933+
934+ is_compute_worker = (
935+ "compute-worker" in queue_names
936+ or worker_name .startswith ("compute-worker" )
937+ or worker_name .startswith ("CW" )
938+ )
939+
940+ if not is_compute_worker :
923941 continue
924942
925- raw_running_jobs = len (active .get (worker_name , [])) + len (
926- reserved .get (worker_name , [])
943+ running_jobs = (
944+ len (active .get (worker_name , []))
945+ + len (reserved .get (worker_name , []))
927946 )
928- status = "busy" if raw_running_jobs > 0 else "available"
947+ status = "busy" if running_jobs > 0 else "available"
929948
930949 payload = {
931950 "hostname" : worker_name ,
932951 "status" : status ,
933- "running_jobs" : raw_running_jobs ,
952+ "running_jobs" : running_jobs ,
934953 "timestamp" : now ().timestamp (),
935954 }
936955
937- r .set (f"worker:{ worker_name } :heartbeat" , json .dumps (payload ), ex = 35 )
956+ heartbeat_key = f"worker:{ worker_name } :heartbeat"
957+
958+ r .set (
959+ heartbeat_key ,
960+ json .dumps (payload ),
961+ ex = WORKER_HEARTBEAT_TTL ,
962+ )
963+
938964 r .hset (
939965 WORKERS_REGISTRY_KEY ,
940966 worker_name ,
941- json .dumps (
942- {
943- "hostname " : worker_name ,
944- "last_seen " : payload [ "timestamp" ] ,
945- }
946- ),
967+ json .dumps ({
968+ "hostname" : worker_name ,
969+ "status " : status ,
970+ "running_jobs " : running_jobs ,
971+ "last_seen" : payload [ "timestamp" ],
972+ } ),
947973 )
948974
949975 _broadcast_worker_state (payload )
976+ logger .info (
977+ f"[WORKER-HEALTH] { worker_name } status={ status } jobs={ running_jobs } "
978+ )
0 commit comments