Skip to content

Commit 4203d8a

Browse files
committed
feat: implement admin API monitoring endpoints
Implement real database queries for admin API monitoring: - system_status() - returns actual queue statistics from database - list_queues() - discovers queues dynamically - queue_stats() - returns real-time stats for specific queue Queries _private_jobs table for: - pending jobs (not locked, ready to run) - active jobs (currently locked by workers) - failed jobs (attempts >= max_attempts) - completed_jobs returns 0 (GraphileWorker deletes completed jobs)
1 parent 9d36602 commit 4203d8a

2 files changed

Lines changed: 190 additions & 102 deletions

File tree

docs/implementation/incomplete-features.md

Lines changed: 3 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -61,52 +61,7 @@ These are all feature-gated behind the `axum` feature flag.
6161

6262
**Notes:** May need to research GraphileWorker's cancellation mechanism
6363

64-
### 4. Admin API: system_status()
65-
**Location:** `src/admin.rs:185-213`
66-
67-
**Current State:** Returns hardcoded static data with all zeros
68-
69-
**What Needs to be Implemented:**
70-
- Query database for actual queue statistics
71-
- Calculate pending_jobs, active_jobs, completed_jobs, failed_jobs per queue
72-
73-
**Endpoint:** GET /status
74-
75-
**Current Return:**
76-
```rust
77-
QueueStatus {
78-
queue_name: "fast" | "bulk",
79-
pending_jobs: 0, // hardcoded
80-
active_jobs: 0, // hardcoded
81-
completed_jobs: 0, // hardcoded
82-
failed_jobs: 0, // hardcoded
83-
}
84-
```
85-
86-
### 5. Admin API: list_queues()
87-
**Location:** `src/admin.rs:321-346`
88-
89-
**Current State:** Returns hardcoded list of "fast" and "bulk" queues with zero counts
90-
91-
**What Needs to be Implemented:**
92-
- Query database for actual queue list (dynamic queue discovery)
93-
- Get real statistics for each queue
94-
- Consider custom queues beyond just "fast" and "bulk"
95-
96-
**Endpoint:** GET /queues
97-
98-
### 6. Admin API: queue_stats()
99-
**Location:** `src/admin.rs:349-368`
100-
101-
**Current State:** Returns hardcoded zeros for all metrics
102-
103-
**What Needs to be Implemented:**
104-
- Query database for specific queue statistics
105-
- Return actual counts for pending/active/completed/failed jobs
106-
107-
**Endpoint:** GET /queues/:queue_name/stats
108-
109-
### 7. Admin API: dlq_cleanup()
64+
### 4. Admin API: dlq_cleanup()
11065
**Location:** `src/admin.rs:545-562`
11166

11267
**Current State:** Returns HTTP 501 NOT_IMPLEMENTED
@@ -204,12 +159,12 @@ pub(crate) struct JobHandlerConfig {
204159
- WorkerRunner::process_available_jobs() - batch processing use case
205160

206161
**Medium Priority (admin API features):**
207-
- 6 admin endpoints returning stub/static data
162+
- 3 admin endpoints returning stub/static data
208163
- All feature-gated, so not blocking main library use
209164

210165
**Low Priority (workarounds exist):**
211166
- 1 known limitation with acceptable workaround
212167
- 2 documentation TODOs
213168
- 1 dead code cleanup
214169

215-
**Total Items:** 11
170+
**Total Items:** 8

src/admin.rs

Lines changed: 187 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,129 @@ pub struct SystemStatus {
8585
pub total_jobs: i64,
8686
}
8787

88+
/// Get statistics for a specific queue from the database
89+
async fn get_queue_stats(
90+
pool: &sqlx::PgPool,
91+
schema: &str,
92+
queue_name: &str,
93+
) -> Result<QueueStatus, (StatusCode, String)> {
94+
// Query for pending jobs (not locked, ready to run)
95+
let pending_query = format!(
96+
r#"
97+
SELECT COUNT(*)
98+
FROM {}._private_jobs j
99+
LEFT JOIN {}._private_job_queues q ON j.job_queue_id = q.id
100+
WHERE COALESCE(q.queue_name, 'default') = $1
101+
AND j.locked_at IS NULL
102+
AND j.run_at <= NOW()
103+
AND j.attempts < j.max_attempts
104+
"#,
105+
schema, schema
106+
);
107+
108+
let pending_jobs = sqlx::query_scalar::<_, i64>(&pending_query)
109+
.bind(queue_name)
110+
.fetch_one(pool)
111+
.await
112+
.map_err(|e| {
113+
(
114+
StatusCode::INTERNAL_SERVER_ERROR,
115+
format!("Failed to query pending jobs: {}", e),
116+
)
117+
})?;
118+
119+
// Query for active jobs (currently locked by workers)
120+
let active_query = format!(
121+
r#"
122+
SELECT COUNT(*)
123+
FROM {}._private_jobs j
124+
LEFT JOIN {}._private_job_queues q ON j.job_queue_id = q.id
125+
WHERE COALESCE(q.queue_name, 'default') = $1
126+
AND j.locked_at IS NOT NULL
127+
"#,
128+
schema, schema
129+
);
130+
131+
let active_jobs = sqlx::query_scalar::<_, i64>(&active_query)
132+
.bind(queue_name)
133+
.fetch_one(pool)
134+
.await
135+
.map_err(|e| {
136+
(
137+
StatusCode::INTERNAL_SERVER_ERROR,
138+
format!("Failed to query active jobs: {}", e),
139+
)
140+
})?;
141+
142+
// Query for failed jobs (attempts >= max_attempts)
143+
let failed_query = format!(
144+
r#"
145+
SELECT COUNT(*)
146+
FROM {}._private_jobs j
147+
LEFT JOIN {}._private_job_queues q ON j.job_queue_id = q.id
148+
WHERE COALESCE(q.queue_name, 'default') = $1
149+
AND j.attempts >= j.max_attempts
150+
AND j.max_attempts > 0
151+
"#,
152+
schema, schema
153+
);
154+
155+
let failed_jobs = sqlx::query_scalar::<_, i64>(&failed_query)
156+
.bind(queue_name)
157+
.fetch_one(pool)
158+
.await
159+
.map_err(|e| {
160+
(
161+
StatusCode::INTERNAL_SERVER_ERROR,
162+
format!("Failed to query failed jobs: {}", e),
163+
)
164+
})?;
165+
166+
Ok(QueueStatus {
167+
queue_name: queue_name.to_string(),
168+
pending_jobs,
169+
active_jobs,
170+
completed_jobs: 0, // GraphileWorker deletes completed jobs - use metrics system
171+
failed_jobs,
172+
})
173+
}
174+
175+
/// Get all queue names from the database
176+
async fn get_all_queue_names(pool: &sqlx::PgPool, schema: &str) -> Result<Vec<String>, (StatusCode, String)> {
177+
let query = format!(
178+
"SELECT DISTINCT queue_name FROM {}._private_job_queues ORDER BY queue_name",
179+
schema
180+
);
181+
182+
let queue_names = sqlx::query_scalar::<_, String>(&query)
183+
.fetch_all(pool)
184+
.await
185+
.map_err(|e| {
186+
(
187+
StatusCode::INTERNAL_SERVER_ERROR,
188+
format!("Failed to query queue names: {}", e),
189+
)
190+
})?;
191+
192+
// Include "default" if there are any jobs without a queue
193+
let has_default_jobs_query = format!(
194+
"SELECT EXISTS(SELECT 1 FROM {}._private_jobs WHERE job_queue_id IS NULL)",
195+
schema
196+
);
197+
198+
let has_default_jobs = sqlx::query_scalar::<_, bool>(&has_default_jobs_query)
199+
.fetch_one(pool)
200+
.await
201+
.unwrap_or(false);
202+
203+
let mut all_queues = queue_names;
204+
if has_default_jobs && !all_queues.contains(&"default".to_string()) {
205+
all_queues.insert(0, "default".to_string());
206+
}
207+
208+
Ok(all_queues)
209+
}
210+
88211
/// Job enqueueing request
89212
#[derive(Deserialize)]
90213
pub struct EnqueueJobRequest {
@@ -181,35 +304,51 @@ async fn health_check() -> Result<Json<HealthResponse>, StatusCode> {
181304
Ok(Json(response))
182305
}
183306

184-
/// System status endpoint - GET /status
307+
/// System status endpoint - GET /status
185308
async fn system_status<S>(State(state): State<S>) -> Result<Json<SystemStatus>, (StatusCode, Json<ErrorResponse>)>
186309
where
187310
S: BackfillAdminState,
188311
{
189-
let _client = state.backfill_client();
312+
let client = state.backfill_client();
313+
let pool = client.pool();
314+
let schema = client.schema();
315+
316+
// Get all queue names
317+
let queue_names = get_all_queue_names(pool, schema)
318+
.await
319+
.map_err(|(status, msg)| (status, Json(ErrorResponse::new(msg, "QUERY_ERROR"))))?;
320+
321+
// Get stats for each queue
322+
let mut queue_stats = Vec::new();
323+
for queue_name in queue_names {
324+
let stats = get_queue_stats(pool, schema, &queue_name)
325+
.await
326+
.map_err(|(status, msg)| (status, Json(ErrorResponse::new(msg, "QUERY_ERROR"))))?;
327+
queue_stats.push(stats);
328+
}
329+
330+
// Get DLQ stats
331+
let dlq_stats = client.dlq_stats().await.map_err(|e| {
332+
(
333+
StatusCode::INTERNAL_SERVER_ERROR,
334+
Json(ErrorResponse::new(
335+
format!("Failed to get DLQ stats: {}", e),
336+
"DLQ_ERROR",
337+
)),
338+
)
339+
})?;
340+
341+
// Calculate total jobs across all queues
342+
let total_jobs: i64 = queue_stats
343+
.iter()
344+
.map(|q| q.pending_jobs + q.active_jobs + q.failed_jobs)
345+
.sum();
190346

191-
// For now, return a basic status. In a real implementation, you'd query
192-
// the database for actual queue statistics
193347
let status = SystemStatus {
194-
queues: vec![
195-
QueueStatus {
196-
queue_name: "fast".to_string(),
197-
pending_jobs: 0,
198-
active_jobs: 0,
199-
completed_jobs: 0,
200-
failed_jobs: 0,
201-
},
202-
QueueStatus {
203-
queue_name: "bulk".to_string(),
204-
pending_jobs: 0,
205-
active_jobs: 0,
206-
completed_jobs: 0,
207-
failed_jobs: 0,
208-
},
209-
],
348+
queues: queue_stats,
210349
dlq_enabled: true,
211-
dlq_job_count: 0,
212-
total_jobs: 0,
350+
dlq_job_count: dlq_stats.total_jobs as i64,
351+
total_jobs,
213352
};
214353

215354
info!("Retrieved system status: {} queues", status.queues.len());
@@ -329,50 +468,44 @@ where
329468
}
330469

331470
/// List queues endpoint - GET /queues
332-
async fn list_queues<S>(State(_state): State<S>) -> Result<Json<Vec<QueueStatus>>, (StatusCode, Json<ErrorResponse>)>
471+
async fn list_queues<S>(State(state): State<S>) -> Result<Json<Vec<QueueStatus>>, (StatusCode, Json<ErrorResponse>)>
333472
where
334473
S: BackfillAdminState,
335474
{
336-
// This would query the actual queue statistics from the database
337-
// For now, return static data
338-
let queues = vec![
339-
QueueStatus {
340-
queue_name: "fast".to_string(),
341-
pending_jobs: 0,
342-
active_jobs: 0,
343-
completed_jobs: 0,
344-
failed_jobs: 0,
345-
},
346-
QueueStatus {
347-
queue_name: "bulk".to_string(),
348-
pending_jobs: 0,
349-
active_jobs: 0,
350-
completed_jobs: 0,
351-
failed_jobs: 0,
352-
},
353-
];
354-
355-
info!("Listed {} queues", queues.len());
356-
Ok(Json(queues))
475+
let client = state.backfill_client();
476+
let pool = client.pool();
477+
let schema = client.schema();
478+
479+
// Get all queue names
480+
let queue_names = get_all_queue_names(pool, schema)
481+
.await
482+
.map_err(|(status, msg)| (status, Json(ErrorResponse::new(msg, "QUERY_ERROR"))))?;
483+
484+
// Get stats for each queue
485+
let mut queue_stats = Vec::new();
486+
for queue_name in queue_names {
487+
let stats = get_queue_stats(pool, schema, &queue_name)
488+
.await
489+
.map_err(|(status, msg)| (status, Json(ErrorResponse::new(msg, "QUERY_ERROR"))))?;
490+
queue_stats.push(stats);
491+
}
492+
493+
info!("Listed {} queues", queue_stats.len());
494+
Ok(Json(queue_stats))
357495
}
358496

359497
/// Queue stats endpoint - GET /queues/:queue_name/stats
360498
async fn queue_stats<S>(
361-
State(_state): State<S>,
499+
State(state): State<S>,
362500
Path(queue_name): Path<String>,
363501
) -> Result<Json<QueueStatus>, (StatusCode, Json<ErrorResponse>)>
364502
where
365503
S: BackfillAdminState,
366504
{
367-
// This would query the actual queue statistics from the database
368-
// For now, return static data
369-
let stats = QueueStatus {
370-
queue_name: queue_name.clone(),
371-
pending_jobs: 0,
372-
active_jobs: 0,
373-
completed_jobs: 0,
374-
failed_jobs: 0,
375-
};
505+
let client = state.backfill_client();
506+
let stats = get_queue_stats(client.pool(), client.schema(), &queue_name)
507+
.await
508+
.map_err(|(status, msg)| (status, Json(ErrorResponse::new(msg, "QUERY_ERROR"))))?;
376509

377510
info!("Retrieved stats for queue: {}", queue_name);
378511
Ok(Json(stats))

0 commit comments

Comments
 (0)