Skip to content

Commit 5872a9d

Browse files
committed
feat: add DLQ batch operations to admin API
Implements three batch endpoints for efficient DLQ management: - POST /dlq/cleanup - batch cleanup by age/filter - POST /dlq/batch-requeue - batch requeue with throttling - POST /dlq/batch-delete - batch delete by filter All endpoints support dry-run mode and safety limits.
1 parent 4203d8a commit 5872a9d

2 files changed

Lines changed: 294 additions & 25 deletions

File tree

docs/implementation/incomplete-features.md

Lines changed: 2 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -61,20 +61,6 @@ These are all feature-gated behind the `axum` feature flag.
6161

6262
**Notes:** May need to research GraphileWorker's cancellation mechanism
6363

64-
### 4. Admin API: dlq_cleanup()
65-
**Location:** `src/admin.rs:545-562`
66-
67-
**Current State:** Returns HTTP 501 NOT_IMPLEMENTED
68-
69-
**What Needs to be Implemented:**
70-
- Batch cleanup of old DLQ jobs
71-
- Parameters for cleanup criteria (age, status, task type, etc.)
72-
- Return count of cleaned up jobs
73-
74-
**Endpoint:** POST /dlq/cleanup
75-
76-
**Notes:** Should probably accept query parameters or JSON body to specify cleanup criteria
77-
7864
---
7965

8066
## Low Priority - Known Limitations/Workarounds
@@ -159,12 +145,12 @@ pub(crate) struct JobHandlerConfig {
159145
- WorkerRunner::process_available_jobs() - batch processing use case
160146

161147
**Medium Priority (admin API features):**
162-
- 3 admin endpoints returning stub/static data
148+
- 2 admin endpoints returning stub/static data
163149
- All feature-gated, so not blocking main library use
164150

165151
**Low Priority (workarounds exist):**
166152
- 1 known limitation with acceptable workaround
167153
- 2 documentation TODOs
168154
- 1 dead code cleanup
169155

170-
**Total Items:** 8
156+
**Total Items:** 7

src/admin.rs

Lines changed: 292 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,75 @@ impl ErrorResponse {
255255
}
256256
}
257257

258+
/// DLQ cleanup request
259+
#[derive(Deserialize)]
260+
pub struct DlqCleanupRequest {
261+
pub older_than_days: Option<i64>,
262+
pub task_identifier: Option<String>,
263+
pub queue_name: Option<String>,
264+
pub max_jobs: Option<i32>,
265+
pub dry_run: Option<bool>,
266+
}
267+
268+
/// DLQ cleanup response
269+
#[derive(Serialize)]
270+
pub struct DlqCleanupResponse {
271+
pub deleted_count: u32,
272+
pub dry_run: bool,
273+
pub cutoff_date: chrono::DateTime<chrono::Utc>,
274+
pub affected_tasks: Vec<String>,
275+
}
276+
277+
/// DLQ batch requeue request
278+
#[derive(Deserialize)]
279+
pub struct DlqBatchRequeueRequest {
280+
pub task_identifier: Option<String>,
281+
pub queue_name: Option<String>,
282+
pub failed_after: Option<chrono::DateTime<chrono::Utc>>,
283+
pub failed_before: Option<chrono::DateTime<chrono::Utc>>,
284+
pub notes: Option<String>,
285+
pub max_jobs: Option<i32>,
286+
pub dry_run: Option<bool>,
287+
pub throttle_ms: Option<u64>,
288+
}
289+
290+
/// Batch operation error details
291+
#[derive(Serialize)]
292+
pub struct BatchOperationError {
293+
pub dlq_id: i64,
294+
pub task_identifier: String,
295+
pub error: String,
296+
}
297+
298+
/// DLQ batch requeue response
299+
#[derive(Serialize)]
300+
pub struct DlqBatchRequeueResponse {
301+
pub requeued_count: u32,
302+
pub failed_count: u32,
303+
pub dry_run: bool,
304+
pub affected_tasks: Vec<String>,
305+
pub errors: Vec<BatchOperationError>,
306+
}
307+
308+
/// DLQ batch delete request
309+
#[derive(Deserialize)]
310+
pub struct DlqBatchDeleteRequest {
311+
pub task_identifier: Option<String>,
312+
pub queue_name: Option<String>,
313+
pub failed_after: Option<chrono::DateTime<chrono::Utc>>,
314+
pub failed_before: Option<chrono::DateTime<chrono::Utc>>,
315+
pub max_jobs: Option<i32>,
316+
pub dry_run: Option<bool>,
317+
}
318+
319+
/// DLQ batch delete response
320+
#[derive(Serialize)]
321+
pub struct DlqBatchDeleteResponse {
322+
pub deleted_count: u32,
323+
pub dry_run: bool,
324+
pub affected_tasks: Vec<String>,
325+
}
326+
258327
/// Create the admin router that can be mounted in any Axum application
259328
///
260329
/// This router provides comprehensive backfill management endpoints:
@@ -291,6 +360,8 @@ where
291360
.route("/dlq/:dlq_id", delete(delete_dlq_job::<S>))
292361
.route("/dlq/:dlq_id/requeue", post(requeue_dlq_job::<S>))
293362
.route("/dlq/cleanup", post(cleanup_dlq::<S>))
363+
.route("/dlq/batch-requeue", post(batch_requeue_dlq_jobs::<S>))
364+
.route("/dlq/batch-delete", post(batch_delete_dlq_jobs::<S>))
294365
}
295366

296367
/// Health check endpoint - GET /health
@@ -690,17 +761,229 @@ where
690761
}
691762

692763
/// Cleanup DLQ endpoint - POST /dlq/cleanup
693-
async fn cleanup_dlq<S>(State(state): State<S>) -> Result<Json<serde_json::Value>, (StatusCode, Json<ErrorResponse>)>
764+
async fn cleanup_dlq<S>(
765+
State(state): State<S>,
766+
Json(req): Json<DlqCleanupRequest>,
767+
) -> Result<Json<DlqCleanupResponse>, (StatusCode, Json<ErrorResponse>)>
694768
where
695769
S: BackfillAdminState,
696770
{
697-
let _client = state.backfill_client();
771+
let client = state.backfill_client();
698772

699-
// This would implement batch cleanup of old DLQ jobs
700-
// For now, return a placeholder
701-
warn!("DLQ cleanup endpoint not yet implemented");
702-
Err((
703-
StatusCode::NOT_IMPLEMENTED,
704-
Json(ErrorResponse::new("DLQ cleanup not yet implemented", "NOT_IMPLEMENTED")),
705-
))
773+
// Default to 90 days if not specified
774+
let days = req.older_than_days.unwrap_or(90);
775+
let cutoff_date = chrono::Utc::now() - chrono::Duration::days(days);
776+
let max_jobs = req.max_jobs.unwrap_or(1000).min(10000);
777+
let dry_run = req.dry_run.unwrap_or(false);
778+
779+
// Build filter for list query
780+
let filter = DlqFilter {
781+
task_identifier: req.task_identifier.clone(),
782+
queue_name: req.queue_name.clone(),
783+
failed_before: Some(cutoff_date),
784+
limit: Some(max_jobs),
785+
..Default::default()
786+
};
787+
788+
// Get jobs to delete
789+
let jobs_to_delete = client.list_dlq_jobs(filter).await.map_err(|e| {
790+
(
791+
StatusCode::INTERNAL_SERVER_ERROR,
792+
Json(ErrorResponse::new(format!("Failed to query DLQ: {}", e), "QUERY_ERROR")),
793+
)
794+
})?;
795+
796+
if dry_run {
797+
let affected_tasks: Vec<String> = jobs_to_delete
798+
.jobs
799+
.iter()
800+
.map(|j| j.task_identifier.clone())
801+
.collect::<std::collections::HashSet<_>>()
802+
.into_iter()
803+
.collect();
804+
805+
return Ok(Json(DlqCleanupResponse {
806+
deleted_count: jobs_to_delete.jobs.len() as u32,
807+
dry_run: true,
808+
cutoff_date,
809+
affected_tasks,
810+
}));
811+
}
812+
813+
// Delete jobs
814+
let mut deleted_count = 0;
815+
let mut affected_tasks = std::collections::HashSet::new();
816+
817+
for job in &jobs_to_delete.jobs {
818+
if client.delete_dlq_job(job.id).await.unwrap_or(false) {
819+
deleted_count += 1;
820+
affected_tasks.insert(job.task_identifier.clone());
821+
}
822+
}
823+
824+
info!("DLQ cleanup completed: deleted {} jobs", deleted_count);
825+
826+
Ok(Json(DlqCleanupResponse {
827+
deleted_count,
828+
dry_run: false,
829+
cutoff_date,
830+
affected_tasks: affected_tasks.into_iter().collect(),
831+
}))
832+
}
833+
834+
/// Batch requeue DLQ jobs endpoint - POST /dlq/batch-requeue
835+
async fn batch_requeue_dlq_jobs<S>(
836+
State(state): State<S>,
837+
Json(req): Json<DlqBatchRequeueRequest>,
838+
) -> Result<Json<DlqBatchRequeueResponse>, (StatusCode, Json<ErrorResponse>)>
839+
where
840+
S: BackfillAdminState,
841+
{
842+
let client = state.backfill_client();
843+
let max_jobs = req.max_jobs.unwrap_or(100).min(1000);
844+
let dry_run = req.dry_run.unwrap_or(false);
845+
let throttle_ms = req.throttle_ms.unwrap_or(50).min(5000);
846+
847+
// Build filter
848+
let filter = DlqFilter {
849+
task_identifier: req.task_identifier.clone(),
850+
queue_name: req.queue_name.clone(),
851+
failed_after: req.failed_after,
852+
failed_before: req.failed_before,
853+
limit: Some(max_jobs),
854+
..Default::default()
855+
};
856+
857+
// Get jobs to requeue
858+
let jobs_to_requeue = client.list_dlq_jobs(filter).await.map_err(|e| {
859+
(
860+
StatusCode::INTERNAL_SERVER_ERROR,
861+
Json(ErrorResponse::new(format!("Failed to query DLQ: {}", e), "QUERY_ERROR")),
862+
)
863+
})?;
864+
865+
if dry_run {
866+
let affected_tasks: Vec<String> = jobs_to_requeue
867+
.jobs
868+
.iter()
869+
.map(|j| j.task_identifier.clone())
870+
.collect::<std::collections::HashSet<_>>()
871+
.into_iter()
872+
.collect();
873+
874+
return Ok(Json(DlqBatchRequeueResponse {
875+
requeued_count: jobs_to_requeue.jobs.len() as u32,
876+
failed_count: 0,
877+
dry_run: true,
878+
affected_tasks,
879+
errors: vec![],
880+
}));
881+
}
882+
883+
// Requeue jobs with throttling
884+
let mut requeued_count = 0;
885+
let mut failed_count = 0;
886+
let mut errors = Vec::new();
887+
let mut affected_tasks = std::collections::HashSet::new();
888+
889+
for job in &jobs_to_requeue.jobs {
890+
match client.requeue_dlq_job(job.id, req.notes.clone()).await {
891+
Ok(_) => {
892+
requeued_count += 1;
893+
affected_tasks.insert(job.task_identifier.clone());
894+
895+
// Throttle to avoid overwhelming the queue
896+
if throttle_ms > 0 {
897+
tokio::time::sleep(tokio::time::Duration::from_millis(throttle_ms)).await;
898+
}
899+
}
900+
Err(e) => {
901+
failed_count += 1;
902+
errors.push(BatchOperationError {
903+
dlq_id: job.id,
904+
task_identifier: job.task_identifier.clone(),
905+
error: e.to_string(),
906+
});
907+
}
908+
}
909+
}
910+
911+
info!(
912+
"DLQ batch requeue completed: {} succeeded, {} failed",
913+
requeued_count, failed_count
914+
);
915+
916+
Ok(Json(DlqBatchRequeueResponse {
917+
requeued_count,
918+
failed_count,
919+
dry_run: false,
920+
affected_tasks: affected_tasks.into_iter().collect(),
921+
errors,
922+
}))
923+
}
924+
925+
/// Batch delete DLQ jobs endpoint - POST /dlq/batch-delete
926+
async fn batch_delete_dlq_jobs<S>(
927+
State(state): State<S>,
928+
Json(req): Json<DlqBatchDeleteRequest>,
929+
) -> Result<Json<DlqBatchDeleteResponse>, (StatusCode, Json<ErrorResponse>)>
930+
where
931+
S: BackfillAdminState,
932+
{
933+
let client = state.backfill_client();
934+
let max_jobs = req.max_jobs.unwrap_or(100).min(1000);
935+
let dry_run = req.dry_run.unwrap_or(false);
936+
937+
// Build filter
938+
let filter = DlqFilter {
939+
task_identifier: req.task_identifier.clone(),
940+
queue_name: req.queue_name.clone(),
941+
failed_after: req.failed_after,
942+
failed_before: req.failed_before,
943+
limit: Some(max_jobs),
944+
..Default::default()
945+
};
946+
947+
// Get jobs to delete
948+
let jobs_to_delete = client.list_dlq_jobs(filter).await.map_err(|e| {
949+
(
950+
StatusCode::INTERNAL_SERVER_ERROR,
951+
Json(ErrorResponse::new(format!("Failed to query DLQ: {}", e), "QUERY_ERROR")),
952+
)
953+
})?;
954+
955+
if dry_run {
956+
let affected_tasks: Vec<String> = jobs_to_delete
957+
.jobs
958+
.iter()
959+
.map(|j| j.task_identifier.clone())
960+
.collect::<std::collections::HashSet<_>>()
961+
.into_iter()
962+
.collect();
963+
964+
return Ok(Json(DlqBatchDeleteResponse {
965+
deleted_count: jobs_to_delete.jobs.len() as u32,
966+
dry_run: true,
967+
affected_tasks,
968+
}));
969+
}
970+
971+
// Delete jobs
972+
let mut deleted_count = 0;
973+
let mut affected_tasks = std::collections::HashSet::new();
974+
975+
for job in &jobs_to_delete.jobs {
976+
if client.delete_dlq_job(job.id).await.unwrap_or(false) {
977+
deleted_count += 1;
978+
affected_tasks.insert(job.task_identifier.clone());
979+
}
980+
}
981+
982+
info!("DLQ batch delete completed: deleted {} jobs", deleted_count);
983+
984+
Ok(Json(DlqBatchDeleteResponse {
985+
deleted_count,
986+
dry_run: false,
987+
affected_tasks: affected_tasks.into_iter().collect(),
988+
}))
706989
}

0 commit comments

Comments
 (0)