Skip to content

Commit 0efae24

Browse files
fix(dlq): preserve parallel queue type when adding jobs to DLQ (#9)
## Summary - Fix parallel jobs being converted to serial when requeued from DLQ - Use empty string instead of "default" for parallel jobs in DLQ ## Problem When parallel jobs (`Queue::Parallel`) fail and are added to the DLQ, they were being stored with `queue_name = "default"`. When requeued, the requeue logic sees a non-empty string and creates `Queue::Serial("default")` instead of `Queue::Parallel`. The requeue logic already correctly handles this distinction: ```rust let queue = if dlq_job.queue_name.is_empty() { Queue::Parallel // ← Empty string = parallel } else { Queue::Serial(dlq_job.queue_name.clone()) // ← Non-empty = serial }; ``` But `add_to_dlq()` and `process_failed_jobs()` were defaulting to `"default"` instead of empty string for parallel jobs (which have no `job_queue_id`). ## Fix - `add_to_dlq()`: Use `String::new()` when `job_queue_id()` is `None` (parallel job) - `process_failed_jobs()`: Use `unwrap_or_default()` when `queue_name` is `NULL` This ensures parallel jobs stay parallel when requeued from DLQ. ## Test plan - [x] `cargo check` passes - [x] `just fmt` - no changes needed 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 3a2d681 commit 0efae24

2 files changed

Lines changed: 63 additions & 3 deletions

File tree

src/client/dlq.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -493,7 +493,9 @@ impl BackfillClient {
493493
) -> Result<DlqJob, BackfillError> {
494494
let start = std::time::Instant::now();
495495

496-
// Query queue_name from job_queue_id
496+
// Query queue_name from job_queue_id.
497+
// For parallel jobs (no queue_id), use empty string so requeue preserves
498+
// parallel execution. The requeue logic treats empty string as Queue::Parallel.
497499
let queue_name = if let Some(queue_id) = original_job.job_queue_id() {
498500
let query = format!(
499501
"SELECT queue_name FROM {}._private_job_queues WHERE id = $1",
@@ -505,7 +507,9 @@ impl BackfillClient {
505507
.await?
506508
.unwrap_or_else(|| "default".to_string())
507509
} else {
508-
"default".to_string()
510+
// Parallel jobs have no queue_id - use empty string to preserve
511+
// parallel execution when requeued (is_empty() check in requeue_dlq_job)
512+
String::new()
509513
};
510514

511515
// Use UPSERT to handle the case where a requeued job fails again.
@@ -618,7 +622,9 @@ impl BackfillClient {
618622
let task_identifier: String = job_row.get("task_identifier");
619623
let payload: serde_json::Value = job_row.get("payload");
620624
let queue_name: Option<String> = job_row.get("queue_name");
621-
let queue_name = queue_name.unwrap_or_else(|| "default".to_string());
625+
// Use empty string for parallel jobs (NULL queue_name) to preserve
626+
// parallel execution when requeued
627+
let queue_name = queue_name.unwrap_or_default();
622628
let priority: i16 = job_row.get("priority");
623629
let job_key: Option<String> = job_row.get("job_key");
624630
let max_attempts: i16 = job_row.get("max_attempts");

tests/dlq_tests.rs

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -290,6 +290,60 @@ async fn test_add_to_dlq_preserves_queue_name() {
290290
assert_eq!(dlq_job.queue_name, "fast");
291291
}
292292

293+
#[tokio::test]
294+
async fn test_parallel_job_stays_parallel_through_dlq() {
295+
let client = setup_test_client("dlq_parallel_roundtrip").await;
296+
client.init_dlq().await.expect("DLQ init should work");
297+
298+
let test_job = TestJob {
299+
message: "parallel job test".to_string(),
300+
number: 777,
301+
};
302+
303+
// Enqueue as parallel job (default)
304+
let outcome = client
305+
.enqueue(
306+
"test_job",
307+
&test_job,
308+
JobSpec {
309+
job_key: Some("parallel_dlq_test".to_string()),
310+
..Default::default() // Queue::Parallel is the default
311+
},
312+
)
313+
.await
314+
.expect("should enqueue");
315+
316+
let job = outcome.unwrap();
317+
318+
// Verify original job has no queue_id (parallel)
319+
assert!(job.job_queue_id().is_none(), "parallel job should have no queue_id");
320+
321+
// Add to DLQ
322+
let dlq_job = client
323+
.add_to_dlq(&job, "Test failure", None)
324+
.await
325+
.expect("should add to DLQ");
326+
327+
// Verify queue_name is empty string (not "default")
328+
assert!(
329+
dlq_job.queue_name.is_empty(),
330+
"parallel job should have empty queue_name in DLQ, got: '{}'",
331+
dlq_job.queue_name
332+
);
333+
334+
// Requeue the job
335+
let requeued = client
336+
.requeue_dlq_job(dlq_job.id, Some("Testing parallel preservation".to_string()))
337+
.await
338+
.expect("should requeue");
339+
340+
// Verify requeued job is still parallel (no queue_id)
341+
assert!(
342+
requeued.job_queue_id().is_none(),
343+
"requeued parallel job should still have no queue_id"
344+
);
345+
}
346+
293347
#[tokio::test]
294348
async fn test_dlq_list_with_filtering() {
295349
let client = setup_test_client("dlq_list_filter").await;

0 commit comments

Comments
 (0)