
Commit ecc4e18

fix: use UPSERT to prevent duplicate DLQ entries (#6)
## Summary

Two fixes for worker queue health and DLQ reliability:

### 1. DLQ UPSERT - Prevent duplicate entries

When a job is requeued from the DLQ and fails again, it was creating a new DLQ entry instead of updating the existing one. This caused:

- **Duplicate DLQ entries** for the same logical job (same `job_key`)
- **Stale `failed_at` timestamps** - original entry's timestamp was never updated
- **Infinite requeue loops** - auto-requeue cron jobs would pick up old entries repeatedly
- **Exponential job growth** - duplicates kept multiplying on each requeue cycle

**Fix:**

- Add a **unique partial index** on `job_key` (`WHERE job_key IS NOT NULL`)
- Change `INSERT` to `UPSERT` in `add_to_dlq()` and `process_failed_jobs()`
- On conflict, update `failed_at=NOW()` and increment `failure_count`

### 2. Startup cleanup - Release stale locks and clean up dead jobs

When workers crash or are killed without graceful shutdown, they can leave behind:

- **Stale queue locks** - prevent other workers from processing jobs in affected queues
- **Dead jobs** - jobs with `attempts >= max_attempts` that sit in the main queue forever

**Fix:**

- Add `startup_cleanup()` that runs automatically when `run_until_cancelled()` is called
- `release_stale_queue_locks()` - releases queue locks older than 5 minutes (`DEFAULT_STALE_LOCK_TIMEOUT`)
- `cleanup_permanently_failed_jobs()` - deletes jobs that have exhausted retries

## Behavior After Fix

- **One DLQ entry per logical job** (identified by `job_key`)
- **`failed_at` always reflects the most recent failure** - cooldown periods work correctly
- **`failure_count` accumulates** across all failure cycles
- **Stale locks auto-released** on worker startup
- **Dead jobs cleaned up** on worker startup

## Testing

All 55 tests pass including the admin API DLQ tests.

## Breaking Changes

None - backwards compatible. Existing DLQ entries without `job_key` are unaffected.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
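
The conflict behavior described in the summary can be modeled in memory. The following is a minimal sketch, not the crate's implementation: `DlqEntry` and `DlqModel` are hypothetical types, the row carries only a few of the real columns, and `failed_at` is a caller-supplied number of seconds standing in for `NOW()`. It assumes the semantics stated above: one row per non-NULL `job_key`, with `failure_count` accumulating and `failed_at` refreshed on conflict.

```rust
use std::collections::HashMap;

/// Hypothetical, simplified stand-in for a DLQ row (the real table has more columns).
#[derive(Debug, Clone, PartialEq)]
pub struct DlqEntry {
    pub original_job_id: i64,
    pub job_key: Option<String>,
    pub failure_count: i32,
    /// Seconds on a caller-supplied clock; stands in for NOW().
    pub failed_at: u64,
    pub failure_reason: String,
}

/// In-memory model of a table with a unique partial index on `job_key`
/// (only non-NULL keys participate in the uniqueness check).
#[derive(Default)]
pub struct DlqModel {
    rows: Vec<DlqEntry>,
    by_key: HashMap<String, usize>,
}

impl DlqModel {
    /// Insert a row, or update the existing row when `job_key` conflicts.
    pub fn upsert(&mut self, incoming: DlqEntry) {
        if let Some(key) = incoming.job_key.clone() {
            if let Some(&idx) = self.by_key.get(&key) {
                // Conflict path: mirror the DO UPDATE SET clause.
                let existing = &mut self.rows[idx];
                existing.failed_at = incoming.failed_at;
                existing.failure_count += incoming.failure_count;
                existing.failure_reason = incoming.failure_reason;
                existing.original_job_id = incoming.original_job_id;
                return;
            }
            self.by_key.insert(key, self.rows.len());
        }
        // No key, or first failure for this key: plain insert.
        self.rows.push(incoming);
    }

    pub fn get(&self, job_key: &str) -> Option<&DlqEntry> {
        self.by_key.get(job_key).map(|&idx| &self.rows[idx])
    }

    pub fn len(&self) -> usize {
        self.rows.len()
    }
}
```

In this model, two failures with the same key leave a single row whose `failure_count` is the sum of both, while rows without a key are always appended, which matches the "entries without `job_key` are unaffected" note.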
1 parent 2fc7837 commit ecc4e18

4 files changed

Lines changed: 156 additions & 9 deletions


src/client/cleanup.rs

Lines changed: 117 additions & 0 deletions
@@ -0,0 +1,117 @@
+//! Cleanup utilities for maintaining worker queue health
+//!
+//! Provides functions to clean up stale state that can accumulate when workers
+//! crash or are forcibly terminated without graceful shutdown.
+
+use std::time::Duration;
+
+use super::BackfillClient;
+use crate::BackfillError;
+
+/// Default timeout for considering a queue lock stale.
+///
+/// Queue locks are held briefly during job selection (milliseconds), so any
+/// lock older than this is almost certainly from a crashed worker.
+pub const DEFAULT_STALE_LOCK_TIMEOUT: Duration = Duration::from_secs(300); // 5 minutes
+
+impl BackfillClient {
+    /// Release stale queue locks that were left behind by crashed workers.
+    ///
+    /// When a worker crashes or is killed without graceful shutdown, it may
+    /// leave queue locks behind. These stale locks prevent other workers
+    /// from processing jobs in the affected queues.
+    ///
+    /// This function releases any queue locks older than the specified timeout.
+    ///
+    /// # Arguments
+    /// * `timeout` - Locks older than this duration are considered stale
+    ///
+    /// # Returns
+    /// Number of queue locks that were released
+    pub async fn release_stale_queue_locks(&self, timeout: Duration) -> Result<u64, BackfillError> {
+        let timeout_secs = timeout.as_secs();
+
+        let query = format!(
+            r#"
+            UPDATE {schema}._private_job_queues
+            SET locked_at = NULL, locked_by = NULL
+            WHERE locked_at IS NOT NULL
+              AND locked_at < NOW() - INTERVAL '{timeout_secs} seconds'
+            "#,
+            schema = self.schema,
+            timeout_secs = timeout_secs
+        );
+
+        let result = sqlx::query(&query).execute(&self.pool).await?;
+        let released = result.rows_affected();
+
+        if released > 0 {
+            log::info!(
+                "Released stale queue locks (count: {}, timeout_secs: {})",
+                released,
+                timeout_secs
+            );
+        }
+
+        Ok(released)
+    }
+
+    /// Delete permanently failed jobs from the main queue.
+    ///
+    /// Jobs that have exhausted all retry attempts (attempts >= max_attempts)
+    /// remain in the main queue with `is_available = false`. These jobs
+    /// will never be processed again and should be cleaned up.
+    ///
+    /// Note: These jobs should already be captured to the DLQ by the task
+    /// handler or DLQ processor before reaching this state. This function
+    /// removes the leftover rows from the main queue.
+    ///
+    /// # Returns
+    /// Number of permanently failed jobs that were deleted
+    pub async fn cleanup_permanently_failed_jobs(&self) -> Result<u64, BackfillError> {
+        let query = format!(
+            r#"
+            DELETE FROM {schema}._private_jobs
+            WHERE attempts >= max_attempts
+              AND locked_at IS NULL
+            "#,
+            schema = self.schema
+        );
+
+        let result = sqlx::query(&query).execute(&self.pool).await?;
+        let deleted = result.rows_affected();
+
+        if deleted > 0 {
+            log::info!(
+                "Cleaned up permanently failed jobs from main queue (count: {})",
+                deleted
+            );
+        }
+
+        Ok(deleted)
+    }
+
+    /// Run all startup cleanup tasks.
+    ///
+    /// This should be called when a worker starts to clean up any stale state
+    /// left behind by previous workers. It performs:
+    /// 1. Release stale queue locks (using default timeout)
+    /// 2. Delete permanently failed jobs from main queue
+    ///
+    /// # Returns
+    /// Tuple of (stale_locks_released, failed_jobs_deleted)
+    pub async fn startup_cleanup(&self) -> Result<(u64, u64), BackfillError> {
+        log::info!("Running startup cleanup tasks");
+
+        let locks_released = self.release_stale_queue_locks(DEFAULT_STALE_LOCK_TIMEOUT).await?;
+        let jobs_deleted = self.cleanup_permanently_failed_jobs().await?;
+
+        log::info!(
+            "Startup cleanup completed (stale_locks_released: {}, failed_jobs_deleted: {})",
+            locks_released,
+            jobs_deleted
+        );
+
+        Ok((locks_released, jobs_deleted))
+    }
+}
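
To make the two selection rules in `cleanup.rs` concrete, here is a small in-memory sketch of the same predicates. It is an illustration under stated assumptions, not the crate's code: `QueueRow` and `JobRow` are hypothetical structs, and timestamps are plain seconds supplied by the caller instead of database `NOW()`.

```rust
use std::time::Duration;

/// Hypothetical stand-in for a row in the job queues table.
#[derive(Debug, Clone, PartialEq)]
pub struct QueueRow {
    pub name: String,
    pub locked_at: Option<u64>, // seconds on a caller-supplied clock
    pub locked_by: Option<String>,
}

/// Hypothetical stand-in for a row in the jobs table.
#[derive(Debug, Clone, PartialEq)]
pub struct JobRow {
    pub id: i64,
    pub attempts: i32,
    pub max_attempts: i32,
    pub locked_at: Option<u64>,
}

/// Models `locked_at IS NOT NULL AND locked_at < NOW() - timeout`:
/// clears the lock columns on stale rows and returns how many were released.
pub fn release_stale_locks(queues: &mut [QueueRow], now: u64, timeout: Duration) -> u64 {
    let cutoff = now.saturating_sub(timeout.as_secs());
    let mut released = 0;
    for queue in queues.iter_mut() {
        if matches!(queue.locked_at, Some(t) if t < cutoff) {
            queue.locked_at = None;
            queue.locked_by = None;
            released += 1;
        }
    }
    released
}

/// Models `attempts >= max_attempts AND locked_at IS NULL`:
/// removes exhausted, unlocked jobs and returns how many were deleted.
pub fn cleanup_permanently_failed(jobs: &mut Vec<JobRow>) -> u64 {
    let before = jobs.len();
    jobs.retain(|job| !(job.attempts >= job.max_attempts && job.locked_at.is_none()));
    (before - jobs.len()) as u64
}
```

Note that in this sketch an exhausted job that is still locked is kept, mirroring the `locked_at IS NULL` guard in the `DELETE` statement.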

src/client/dlq.rs

Lines changed: 32 additions & 9 deletions
@@ -166,6 +166,12 @@ impl BackfillClient {
                 "CREATE INDEX IF NOT EXISTS idx_backfill_dlq_job_key ON {}.backfill_dlq (job_key) WHERE job_key IS NOT NULL",
                 self.schema
             ),
+            // Unique constraint on job_key for UPSERT support - prevents duplicate DLQ entries
+            // when a requeued job fails again. Only applies to non-NULL job_keys.
+            format!(
+                "CREATE UNIQUE INDEX IF NOT EXISTS idx_backfill_dlq_job_key_unique ON {}.backfill_dlq (job_key) WHERE job_key IS NOT NULL",
+                self.schema
+            ),
         ];
 
         for index_query in indexes {
@@ -477,19 +483,30 @@ impl BackfillClient {
             "default".to_string()
         };
 
-        let insert_query = format!(
+        // Use UPSERT to handle the case where a requeued job fails again.
+        // If a DLQ entry with the same job_key already exists, update it
+        // instead of creating a duplicate. This ensures one DLQ entry per
+        // logical job and keeps failed_at current for cooldown calculations.
+        let upsert_query = format!(
             r#"
             INSERT INTO {}.backfill_dlq (
                 original_job_id, task_identifier, payload, queue_name, priority,
                 job_key, max_attempts, failure_reason, failure_count, last_error,
                 original_created_at, original_run_at
             ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
+            ON CONFLICT (job_key) WHERE job_key IS NOT NULL DO UPDATE SET
+                failed_at = NOW(),
+                failure_count = {schema}.backfill_dlq.failure_count + EXCLUDED.failure_count,
+                failure_reason = EXCLUDED.failure_reason,
+                last_error = EXCLUDED.last_error,
+                original_job_id = EXCLUDED.original_job_id
             RETURNING *
             "#,
-            self.schema
+            self.schema,
+            schema = self.schema
         );
 
-        let row = sqlx::query(&insert_query)
+        let row = sqlx::query(&upsert_query)
             .bind(original_job.id())
             .bind(original_job.task_identifier())
             .bind(original_job.payload())
@@ -588,21 +605,27 @@ impl BackfillClient {
             // Convert last_error from TEXT to JSONB for DLQ table
             let last_error_json = last_error.map(serde_json::Value::String);
 
-            // Move to DLQ
-            let insert_dlq_query = format!(
+            // Move to DLQ using UPSERT to handle requeued jobs that fail again
+            let upsert_dlq_query = format!(
                 r#"
-                INSERT INTO {}.backfill_dlq (
+                INSERT INTO {schema}.backfill_dlq (
                     original_job_id, task_identifier, payload, queue_name, priority,
                     job_key, max_attempts, failure_reason, failure_count, last_error,
                     original_created_at, original_run_at
                 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
+                ON CONFLICT (job_key) WHERE job_key IS NOT NULL DO UPDATE SET
+                    failed_at = NOW(),
+                    failure_count = {schema}.backfill_dlq.failure_count + EXCLUDED.failure_count,
+                    failure_reason = EXCLUDED.failure_reason,
+                    last_error = EXCLUDED.last_error,
+                    original_job_id = EXCLUDED.original_job_id
                 "#,
-                self.schema
+                schema = self.schema
             );
 
             let failure_reason = format!("Job exceeded maximum retry attempts ({}/{})", attempts, max_attempts);
 
-            let insert_result = sqlx::query(&insert_dlq_query)
+            let upsert_result = sqlx::query(&upsert_dlq_query)
                 .bind(job_id)
                 .bind(&task_identifier)
                 .bind(&payload)
@@ -618,7 +641,7 @@ impl BackfillClient {
                 .execute(&self.pool)
                 .await;
 
-            match insert_result {
+            match upsert_result {
                 Ok(_) => {
                     // Successfully moved to DLQ, now remove from main jobs table
                     let delete_query = format!("DELETE FROM {}._private_jobs WHERE id = $1", self.schema);
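
The first UPSERT hunk mixes a positional `{}` placeholder with a named `{schema}` placeholder in one `format!` call, while the second uses only `{schema}`. The sketch below shows that both styles expand to the same SQL text. It is illustrative only: the function names are hypothetical, the statement is shortened to two columns, and no database is involved.

```rust
/// Builds a shortened UPSERT using one positional and one named placeholder,
/// as in the first hunk. Positional arguments must precede named ones.
pub fn upsert_sql_mixed(schema: &str) -> String {
    format!(
        r#"
        INSERT INTO {}.backfill_dlq (job_key, failure_count)
        VALUES ($1, $2)
        ON CONFLICT (job_key) WHERE job_key IS NOT NULL DO UPDATE SET
            failure_count = {schema}.backfill_dlq.failure_count + EXCLUDED.failure_count
        "#,
        schema,
        schema = schema
    )
}

/// Builds the same statement using only the named placeholder,
/// as in the second hunk. A named argument may be referenced more than once.
pub fn upsert_sql_named(schema: &str) -> String {
    format!(
        r#"
        INSERT INTO {schema}.backfill_dlq (job_key, failure_count)
        VALUES ($1, $2)
        ON CONFLICT (job_key) WHERE job_key IS NOT NULL DO UPDATE SET
            failure_count = {schema}.backfill_dlq.failure_count + EXCLUDED.failure_count
        "#,
        schema = schema
    )
}
```

The `$1`, `$2` bind markers pass through `format!` untouched because `$` is only special inside braces. The table name is qualified in the `SET` clause because an unqualified `failure_count` on the right-hand side would be ambiguous between the existing row and `EXCLUDED`.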

src/client/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
 //! The backfill client, split across a couple of files.
 
+mod cleanup;
 mod dlq;
 mod enqueue;
 
+pub use cleanup::DEFAULT_STALE_LOCK_TIMEOUT;
 pub use dlq::*;
 
 /// High-level client for the backfill job queue system.

src/worker.rs

Lines changed: 5 additions & 0 deletions
@@ -554,6 +554,11 @@ impl WorkerRunner {
             self.config.dlq_processor_interval.is_some()
         );
 
+        // Run startup cleanup to release stale locks and clean up failed jobs
+        if let Err(e) = self.client.startup_cleanup().await {
+            log::warn!("Startup cleanup failed (continuing anyway): {}", e);
+        }
+
         // Record worker starting (increment active worker count)
         crate::metrics::update_worker_active("worker", 1);
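
The hunk above treats startup cleanup as best effort: a failure is logged as a warning and the worker still starts. A minimal synchronous sketch of that control flow follows. Everything in it is hypothetical (`CleanupError`, `start_worker`, and the event strings); it records messages in a `Vec` instead of using the `log` crate so the behavior can be checked without a database.

```rust
use std::fmt;

/// Hypothetical error type standing in for the crate's error.
#[derive(Debug)]
pub struct CleanupError(pub String);

impl fmt::Display for CleanupError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.0)
    }
}

/// Runs `cleanup` once, records its outcome, and starts the worker either way.
/// Returns the recorded events in order.
pub fn start_worker<F>(cleanup: F) -> Vec<String>
where
    F: FnOnce() -> Result<(u64, u64), CleanupError>,
{
    let mut events = Vec::new();
    match cleanup() {
        Ok((locks, jobs)) => events.push(format!(
            "info: startup cleanup completed (stale_locks_released: {}, failed_jobs_deleted: {})",
            locks, jobs
        )),
        // Cleanup is an optimization, so an error must not block startup.
        Err(e) => events.push(format!(
            "warn: startup cleanup failed (continuing anyway): {}",
            e
        )),
    }
    events.push("worker started".to_string());
    events
}
```

The design choice this illustrates is that cleanup failure is tolerated: in this sketch the worker reaches the "started" step whether the cleanup closure returns `Ok` or `Err`.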
