Skip to content

Commit a64d891

Browse files
committed
fix: preserve job payloads when moving failed jobs to DLQ
1 parent b186c90 commit a64d891

3 files changed

Lines changed: 67 additions & 36 deletions

File tree

docs/implementation/incomplete-features.md

Lines changed: 3 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -147,29 +147,7 @@ let queue_name = "default";
147147
- Query GraphileWorker schema to get queue info
148148
- Check if newer graphile_worker versions expose this field
149149

150-
### 9. DLQ payload limitation
151-
**Location:** `src/client/dlq.rs:542-543`
152-
153-
**Current State:** Uses empty JSON object `{}` as placeholder for payload when moving failed jobs to DLQ
154-
155-
**Issue:** GraphileWorker's jobs view doesn't include the payload field
156-
157-
**Code:**
158-
```rust
159-
// Payload is not available in the jobs view, use empty object as placeholder
160-
let payload = serde_json::json!({});
161-
```
162-
163-
**Affects:** `process_failed_jobs()` method - jobs moved to DLQ will have empty payloads
164-
165-
**Impact:** Cannot inspect actual job payload from DLQ, only metadata
166-
167-
**Possible Solutions:**
168-
- Query the underlying jobs table directly (not the view)
169-
- Store payload in a separate field during initial job processing
170-
- Accept this limitation and document it clearly
171-
172-
### 10. DLQ pagination optimization
150+
### 9. DLQ pagination optimization
173151
**Location:** `src/client/dlq.rs:238`
174152

175153
**Current State:** Uses separate COUNT query for pagination, could be more efficient
@@ -253,8 +231,8 @@ pub(crate) struct JobHandlerConfig {
253231
- All feature-gated, so not blocking main library use
254232

255233
**Low Priority (workarounds exist):**
256-
- 3 known limitations with acceptable workarounds
234+
- 2 known limitations with acceptable workarounds
257235
- 2 documentation TODOs
258236
- 1 dead code cleanup
259237

260-
**Total Items:** 13
238+
**Total Items:** 12

src/client/dlq.rs

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -542,19 +542,22 @@ impl BackfillClient {
542542
pub async fn process_failed_jobs(&self) -> Result<u32, BackfillError> {
543543
// Find jobs that have failed permanently (attempts >= max_attempts)
544544
// and haven't been processed yet
545-
// Note: The jobs view doesn't include payload, so we'll handle this limitation
546545
let find_failed_jobs_query = format!(
547546
r#"
548-
SELECT id, task_identifier, queue_name, priority, key as job_key,
549-
max_attempts, attempts, last_error, created_at, run_at, updated_at
550-
FROM {}.jobs
551-
WHERE attempts >= max_attempts
552-
AND max_attempts > 0
553-
AND id NOT IN (SELECT COALESCE(original_job_id, -1) FROM {}.backfill_dlq)
554-
ORDER BY updated_at ASC
547+
SELECT jobs.id, tasks.identifier AS task_identifier,
548+
job_queues.queue_name, jobs.priority, jobs.key as job_key,
549+
jobs.max_attempts, jobs.attempts, jobs.last_error,
550+
jobs.created_at, jobs.run_at, jobs.updated_at, jobs.payload
551+
FROM {}._private_jobs AS jobs
552+
INNER JOIN {}._private_tasks AS tasks ON tasks.id = jobs.task_id
553+
LEFT JOIN {}._private_job_queues AS job_queues ON job_queues.id = jobs.job_queue_id
554+
WHERE jobs.attempts >= jobs.max_attempts
555+
AND jobs.max_attempts > 0
556+
AND jobs.id NOT IN (SELECT COALESCE(original_job_id, -1) FROM {}.backfill_dlq)
557+
ORDER BY jobs.updated_at ASC
555558
LIMIT 100
556559
"#,
557-
self.schema, self.schema
560+
self.schema, self.schema, self.schema, self.schema
558561
);
559562

560563
let failed_jobs = sqlx::query(&find_failed_jobs_query).fetch_all(&self.pool).await?;
@@ -564,8 +567,7 @@ impl BackfillClient {
564567
for job_row in failed_jobs {
565568
let job_id: i64 = job_row.get("id");
566569
let task_identifier: String = job_row.get("task_identifier");
567-
// Payload is not available in the jobs view, use empty object as placeholder
568-
let payload = serde_json::json!({});
570+
let payload: serde_json::Value = job_row.get("payload");
569571
let queue_name: Option<String> = job_row.get("queue_name");
570572
let queue_name = queue_name.unwrap_or_else(|| "default".to_string());
571573
let priority: i16 = job_row.get("priority");

tests/dlq_tests.rs

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,57 @@ async fn test_dlq_add_job_and_retrieve() {
204204
let _testjob: TestJob = serde_json::from_value(retrieved.payload).expect("the payload should be a test job");
205205
}
206206

207+
#[tokio::test]
208+
async fn test_process_failed_jobs_preserves_payload() {
209+
let client = setup_test_client("dlq_payload_preservation").await;
210+
client.init_dlq().await.expect("DLQ init should work");
211+
212+
// Enqueue a job with specific payload
213+
let test_payload = TestJob {
214+
message: "unique test message for process_failed_jobs".to_string(),
215+
number: 12345,
216+
};
217+
218+
let outcome = client
219+
.enqueue(
220+
"test_job",
221+
&test_payload,
222+
JobSpec {
223+
max_attempts: Some(1),
224+
..Default::default()
225+
},
226+
)
227+
.await
228+
.expect("should enqueue");
229+
230+
let job = outcome.unwrap();
231+
232+
// Simulate job failure by updating attempts to match max_attempts
233+
sqlx::query(&format!(
234+
"UPDATE {}._private_jobs SET attempts = max_attempts WHERE id = $1",
235+
client.schema()
236+
))
237+
.bind(job.id())
238+
.execute(client.pool())
239+
.await
240+
.expect("should update job");
241+
242+
// Process failed jobs
243+
let moved = client.process_failed_jobs().await.expect("should process");
244+
assert_eq!(moved, 1);
245+
246+
// Verify payload was preserved
247+
let dlq_jobs = client.list_dlq_jobs(DlqFilter::default()).await.expect("should list");
248+
assert_eq!(dlq_jobs.jobs.len(), 1);
249+
250+
let dlq_job = &dlq_jobs.jobs[0];
251+
let recovered: TestJob =
252+
serde_json::from_value(dlq_job.payload.clone()).expect("payload should deserialize to TestJob");
253+
254+
assert_eq!(recovered.message, "unique test message for process_failed_jobs");
255+
assert_eq!(recovered.number, 12345);
256+
}
257+
207258
#[tokio::test]
208259
async fn test_dlq_list_with_filtering() {
209260
let client = setup_test_client("dlq_list_filter").await;

0 commit comments

Comments
 (0)