Skip to content

Commit 78bed50

Browse files
fix(admin): use redis.from_url() consistently for queue inspection and purge
- Replaced Kombu channel.client with redis.from_url() in both get_celery_queue_details() and purge_celery_queue_by_task_name() so both functions use the same connection mechanism - Batched pipeline approach for per-task purge to avoid hitting Valkey's max query buffer limit on large queues
1 parent 557ad0d commit 78bed50

1 file changed

Lines changed: 34 additions & 35 deletions

File tree

dojo/utils.py

Lines changed: 34 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1233,41 +1233,40 @@ def get_celery_queue_details():
12331233
"""Per-task breakdown of the celery queue. O(N) — expensive for large queues."""
12341234
tasks = {}
12351235
try:
1236-
with Connection(settings.CELERY_BROKER_URL) as conn, conn.channel() as channel:
1237-
messages_raw = channel.client.lrange("celery", 0, -1)
1238-
for i, msg_raw in enumerate(messages_raw):
1239-
try:
1240-
msg = json.loads(msg_raw)
1241-
headers = msg.get("headers", {})
1242-
task_name = headers.get("task", "unknown")
1243-
eta = headers.get("eta")
1244-
expires = headers.get("expires")
1245-
except Exception:
1246-
task_name = "unknown"
1247-
eta = None
1248-
expires = None
1249-
if task_name not in tasks:
1250-
tasks[task_name] = {
1251-
"task_name": task_name,
1252-
"count": 0,
1253-
"oldest_position": i + 1,
1254-
"newest_position": i + 1,
1255-
"oldest_eta": eta,
1256-
"newest_eta": eta,
1257-
"earliest_expires": expires,
1258-
"latest_expires": expires,
1259-
}
1260-
else:
1261-
tasks[task_name]["newest_position"] = i + 1
1262-
tasks[task_name]["newest_eta"] = eta
1263-
# ISO 8601 strings are lexicographically sortable
1264-
if expires is not None:
1265-
cur = tasks[task_name]
1266-
if cur["earliest_expires"] is None or expires < cur["earliest_expires"]:
1267-
cur["earliest_expires"] = expires
1268-
if cur["latest_expires"] is None or expires > cur["latest_expires"]:
1269-
cur["latest_expires"] = expires
1270-
tasks[task_name]["count"] += 1
1236+
client = redis_lib.from_url(settings.CELERY_BROKER_URL)
1237+
for i, msg_raw in enumerate(client.lrange("celery", 0, -1)):
1238+
try:
1239+
msg = json.loads(msg_raw)
1240+
headers = msg.get("headers", {})
1241+
task_name = headers.get("task", "unknown")
1242+
eta = headers.get("eta")
1243+
expires = headers.get("expires")
1244+
except Exception:
1245+
task_name = "unknown"
1246+
eta = None
1247+
expires = None
1248+
if task_name not in tasks:
1249+
tasks[task_name] = {
1250+
"task_name": task_name,
1251+
"count": 0,
1252+
"oldest_position": i + 1,
1253+
"newest_position": i + 1,
1254+
"oldest_eta": eta,
1255+
"newest_eta": eta,
1256+
"earliest_expires": expires,
1257+
"latest_expires": expires,
1258+
}
1259+
else:
1260+
tasks[task_name]["newest_position"] = i + 1
1261+
tasks[task_name]["newest_eta"] = eta
1262+
# ISO 8601 strings are lexicographically sortable
1263+
if expires is not None:
1264+
cur = tasks[task_name]
1265+
if cur["earliest_expires"] is None or expires < cur["earliest_expires"]:
1266+
cur["earliest_expires"] = expires
1267+
if cur["latest_expires"] is None or expires > cur["latest_expires"]:
1268+
cur["latest_expires"] = expires
1269+
tasks[task_name]["count"] += 1
12711270
except Exception:
12721271
return None
12731272
return sorted(tasks.values(), key=operator.itemgetter("oldest_position"))

0 commit comments

Comments
 (0)