3535from rest_framework .parsers import MultiPartParser
3636from rest_framework .permissions import DjangoModelPermissions , IsAuthenticated
3737from rest_framework .response import Response
38- from rest_framework .views import APIView
3938
4039import dojo .finding .helper as finding_helper
4140import dojo .jira_link .helper as jira_helper
@@ -3266,20 +3265,21 @@ def get_queryset(self):
32663265 return System_Settings .objects .all ().order_by ("id" )
32673266
32683267
3269- @extend_schema (
3270- responses = serializers .CeleryStatusSerializer ,
3271- summary = "Get Celery worker and queue status" ,
3272- description = (
3273- "Returns Celery worker liveness, pending queue length, and the active task "
3274- "timeout/expiry configuration. Uses the Celery control channel (pidbox) for "
3275- "worker status so it works correctly even when the task queue is clogged."
3276- ),
3277- )
3278- class CeleryStatusView (APIView ):
3268+ class CeleryViewSet (viewsets .ViewSet ):
32793269 permission_classes = (permissions .IsSuperUser , DjangoModelPermissions )
32803270 queryset = System_Settings .objects .none ()
32813271
3282- def get (self , request ):
3272+ @extend_schema (
3273+ responses = serializers .CeleryStatusSerializer ,
3274+ summary = "Get Celery worker and queue status" ,
3275+ description = (
3276+ "Returns Celery worker liveness, pending queue length, and the active task "
3277+ "timeout/expiry configuration. Uses the Celery control channel (pidbox) for "
3278+ "worker status so it works correctly even when the task queue is clogged."
3279+ ),
3280+ )
3281+ @action (detail = False , methods = ["get" ], url_path = "status" )
3282+ def status (self , request ):
32833283 queue_length = get_celery_queue_length ()
32843284 data = {
32853285 "worker_status" : get_celery_worker_status (),
@@ -3291,56 +3291,44 @@ def get(self, request):
32913291 }
32923292 return Response (serializers .CeleryStatusSerializer (data ).data )
32933293
3294-
3295- @extend_schema (
3296- request = None ,
3297- responses = {200 : {"type" : "object" , "properties" : {"purged" : {"type" : "integer" }}}},
3298- summary = "Purge all pending Celery tasks from the queue" ,
3299- description = (
3300- "Removes all pending tasks from the default Celery queue. Tasks already being "
3301- "executed by workers are not affected. Note: if deduplication tasks were queued, "
3302- "you may need to re-run deduplication manually via `python manage.py dedupe`."
3303- ),
3304- )
3305- class CeleryQueuePurgeView (APIView ):
3306- permission_classes = (permissions .IsSuperUser , DjangoModelPermissions )
3307- queryset = System_Settings .objects .none ()
3308-
3309- def post (self , request ):
3294+ @extend_schema (
3295+ request = None ,
3296+ responses = {200 : {"type" : "object" , "properties" : {"purged" : {"type" : "integer" }}}},
3297+ summary = "Purge all pending Celery tasks from the queue" ,
3298+ description = (
3299+ "Removes all pending tasks from the default Celery queue. Tasks already being "
3300+ "executed by workers are not affected. Note: if deduplication tasks were queued, "
3301+ "you may need to re-run deduplication manually via `python manage.py dedupe`."
3302+ ),
3303+ )
3304+ @action (detail = False , methods = ["post" ], url_path = "queue/purge" )
3305+ def queue_purge (self , request ):
33103306 purged = purge_celery_queue ()
33113307 return Response ({"purged" : purged })
33123308
3313-
3314- @extend_schema (
3315- responses = serializers .CeleryQueueTaskDetailSerializer (many = True ),
3316- summary = "Get per-task breakdown of the Celery queue" ,
3317- description = (
3318- "Scans every message in the queue (O(N)) and returns task name, count, and "
3319- "oldest/newest queue positions. May be slow for large queues."
3320- ),
3321- )
3322- class CeleryQueueDetailsView (APIView ):
3323- permission_classes = (permissions .IsSuperUser , DjangoModelPermissions )
3324- queryset = System_Settings .objects .none ()
3325-
3326- def get (self , request ):
3309+ @extend_schema (
3310+ responses = serializers .CeleryQueueTaskDetailSerializer (many = True ),
3311+ summary = "Get per-task breakdown of the Celery queue" ,
3312+ description = (
3313+ "Scans every message in the queue (O(N)) and returns task name, count, and "
3314+ "oldest/newest queue positions. May be slow for large queues."
3315+ ),
3316+ )
3317+ @action (detail = False , methods = ["get" ], url_path = "queue/details" )
3318+ def queue_details (self , request ):
33273319 details = get_celery_queue_details ()
33283320 if details is None :
33293321 return Response ({"error" : "Unable to read queue details." }, status = 503 )
33303322 return Response (serializers .CeleryQueueTaskDetailSerializer (details , many = True ).data )
33313323
3332-
3333- @extend_schema (
3334- request = {"application/json" : {"type" : "object" , "properties" : {"task_name" : {"type" : "string" }}, "required" : ["task_name" ]}},
3335- responses = {200 : {"type" : "object" , "properties" : {"purged" : {"type" : "integer" }}}},
3336- summary = "Purge all queued tasks with a given task name" ,
3337- description = "Removes all pending tasks matching the given task name from the default Celery queue." ,
3338- )
3339- class CeleryQueueTaskPurgeView (APIView ):
3340- permission_classes = (permissions .IsSuperUser , DjangoModelPermissions )
3341- queryset = System_Settings .objects .none ()
3342-
3343- def post (self , request ):
3324+ @extend_schema (
3325+ request = {"application/json" : {"type" : "object" , "properties" : {"task_name" : {"type" : "string" }}, "required" : ["task_name" ]}},
3326+ responses = {200 : {"type" : "object" , "properties" : {"purged" : {"type" : "integer" }}}},
3327+ summary = "Purge all queued tasks with a given task name" ,
3328+ description = "Removes all pending tasks matching the given task name from the default Celery queue." ,
3329+ )
3330+ @action (detail = False , methods = ["post" ], url_path = "queue/task/purge" )
3331+ def queue_task_purge (self , request ):
33443332 task_name = request .data .get ("task_name" , "" ).strip ()
33453333 if not task_name :
33463334 return Response ({"error" : "task_name is required." }, status = 400 )
0 commit comments