Skip to content

Commit 53937ec

Browse files
committed
feat: return EnqueueOutcome instead of error for duplicate job keys
When queuing a job with a duplicate id (job key), as you might do if you are queuing jobs with deterministic ids, subsequent attempts fail: add_job() responds with a `RowNotFound` error because it cannot find any newly added job. We now catch this error and handle it gracefully.

The EnqueueOutcome enum lets us distinguish between the job being added successfully and the job not being added because it was a duplicate:

```rust
pub enum EnqueueOutcome {
    // success gives you the job
    Enqueued(Box<Job>),
    // duplicate gives you the key back
    AlreadyInProgress { job_key: String },
}
```

This is arguably a bug in graphile_worker: it should use fetch_optional(), not fetch_one(). On the other hand, maybe it's intended.

This is a breaking change. Instead of `Result<Job, Error>`, `enqueue()` now returns `Result<EnqueueOutcome, Error>`. The upside is that you can use idempotent job-enqueueing patterns where duplicate submissions are normal and not errors. This happens to be the use case I wrote this library for, so yay.
1 parent 7b3fa44 commit 53937ec

11 files changed

Lines changed: 306 additions & 74 deletions

File tree

Cargo.lock

Lines changed: 6 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

examples/admin_server.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -97,11 +97,14 @@ async fn send_notification(
9797
};
9898

9999
match state.backfill.enqueue("send_email", &email_req, spec).await {
100-
Ok(job) => Ok(Json(serde_json::json!({
101-
"status": "enqueued",
102-
"job_id": job.id(),
103-
"message": format!("Email queued for {}", email_req.to)
104-
}))),
100+
Ok(outcome) => {
101+
let job = outcome.unwrap();
102+
Ok(Json(serde_json::json!({
103+
"status": "enqueued",
104+
"job_id": job.id(),
105+
"message": format!("Email queued for {}", email_req.to)
106+
})))
107+
}
105108
Err(e) => {
106109
error!("Failed to enqueue email: {}", e);
107110
Err(StatusCode::INTERNAL_SERVER_ERROR)

examples/enqueue_jobs.rs

Lines changed: 25 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
7777
should_fail: Some(false),
7878
};
7979

80-
let job = client
80+
let outcome = client
8181
.enqueue(
8282
"example_job",
8383
&example_job,
@@ -92,7 +92,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
9292
println!(
9393
"✉️ Enqueued ExampleJob: {} (job_id: {})",
9494
example_job.message,
95-
job.id()
95+
outcome.unwrap().id()
9696
);
9797

9898
// Enqueue a high-priority email job using the convenience function
@@ -103,14 +103,18 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
103103
template: Some("welcome".to_string()),
104104
};
105105

106-
let job = enqueue_fast(
106+
let outcome = enqueue_fast(
107107
&client,
108108
"send_email",
109109
&email_job,
110110
Some("welcome-email-user123".to_string()),
111111
)
112112
.await?;
113-
println!("📧 Enqueued SendEmailJob to: {} (job_id: {})", email_job.to, job.id());
113+
println!(
114+
"📧 Enqueued SendEmailJob to: {} (job_id: {})",
115+
email_job.to,
116+
outcome.unwrap().id()
117+
);
114118

115119
// Enqueue a bulk processing job
116120
let process_job = ProcessUserDataJob {
@@ -119,7 +123,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
119123
batch_size: Some(100),
120124
};
121125

122-
let job = enqueue_bulk(
126+
let outcome = enqueue_bulk(
123127
&client,
124128
"process_user_data",
125129
&process_job,
@@ -129,7 +133,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
129133
println!(
130134
"📊 Enqueued ProcessUserDataJob for user: {} (job_id: {})",
131135
process_job.user_id,
132-
job.id()
136+
outcome.unwrap().id()
133137
);
134138

135139
// Enqueue a scheduled report generation job (delayed by 30 seconds)
@@ -140,7 +144,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
140144
recipients: vec!["admin@example.com".to_string(), "manager@example.com".to_string()],
141145
};
142146

143-
let job = client
147+
let outcome = client
144148
.enqueue(
145149
"generate_report",
146150
&report_job,
@@ -156,7 +160,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
156160
println!(
157161
"📈 Enqueued GenerateReportJob (scheduled for 30s): {} report (job_id: {})",
158162
report_job.report_type,
159-
job.id()
163+
outcome.unwrap().id()
160164
);
161165

162166
// Enqueue a job that will fail (for testing error handling)
@@ -166,7 +170,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
166170
should_fail: Some(true),
167171
};
168172

169-
let job = client
173+
let outcome = client
170174
.enqueue(
171175
"example_job",
172176
&failing_job,
@@ -180,7 +184,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
180184
.await?;
181185
println!(
182186
"💥 Enqueued failing ExampleJob for error testing (job_id: {})",
183-
job.id()
187+
outcome.unwrap().id()
184188
);
185189

186190
println!("\n🔄 Demonstrating exponential backoff retry policies...");
@@ -194,7 +198,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
194198
template: Some("alert".to_string()),
195199
};
196200

197-
let job = enqueue_critical(
201+
let outcome = enqueue_critical(
198202
&client,
199203
"send_email",
200204
&critical_job,
@@ -203,7 +207,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
203207
.await?;
204208
println!(
205209
"🚨 Enqueued critical alert with aggressive retries (job_id: {})",
206-
job.id()
210+
outcome.unwrap().id()
207211
);
208212

209213
// Enqueue a job with fast retries for quick turnaround
@@ -213,7 +217,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
213217
should_fail: Some(false),
214218
};
215219

216-
let job = enqueue_fast_with_retries(
220+
let outcome = enqueue_fast_with_retries(
217221
&client,
218222
"example_job",
219223
&notification_job,
@@ -222,7 +226,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
222226
.await?;
223227
println!(
224228
"⚡ Enqueued fast notification with quick retries (job_id: {})",
225-
job.id()
229+
outcome.unwrap().id()
226230
);
227231

228232
// Enqueue a bulk job with conservative retries
@@ -232,14 +236,17 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
232236
batch_size: Some(1000),
233237
};
234238

235-
let job = enqueue_bulk_with_retries(
239+
let outcome = enqueue_bulk_with_retries(
236240
&client,
237241
"process_user_data",
238242
&bulk_job,
239243
Some("bulk-export-789".to_string()),
240244
)
241245
.await?;
242-
println!("📦 Enqueued bulk job with conservative retries (job_id: {})", job.id());
246+
println!(
247+
"📦 Enqueued bulk job with conservative retries (job_id: {})",
248+
outcome.unwrap().id()
249+
);
243250

244251
// Enqueue a job with custom retry policy
245252
let custom_retry_policy = RetryPolicy::new(
@@ -257,7 +264,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
257264
recipients: vec!["data-team@example.com".to_string()],
258265
};
259266

260-
let job = client
267+
let outcome = client
261268
.enqueue(
262269
"generate_report",
263270
&custom_job,
@@ -272,7 +279,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
272279
.await?;
273280
println!(
274281
"📊 Enqueued analytics job with custom retry policy (job_id: {})",
275-
job.id()
282+
outcome.unwrap().id()
276283
);
277284

278285
println!("\n🎯 All jobs enqueued successfully!");

src/admin.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -257,7 +257,7 @@ where
257257
}
258258

259259
match client.enqueue(&req.task_identifier, &req.payload, spec).await {
260-
Ok(job) => {
260+
Ok(crate::EnqueueOutcome::Enqueued(job)) => {
261261
let response = EnqueueJobResponse {
262262
job_id: job.id().to_string(),
263263
status: "enqueued".to_string(),
@@ -267,6 +267,17 @@ where
267267
info!("Successfully enqueued job: id={}", job.id());
268268
Ok(Json(response))
269269
}
270+
Ok(crate::EnqueueOutcome::AlreadyInProgress { job_key }) => {
271+
// Return 409 Conflict for already in progress
272+
warn!("Job already in progress: job_key={}", job_key);
273+
Err((
274+
StatusCode::CONFLICT,
275+
Json(ErrorResponse::new(
276+
format!("Job with key '{}' is already in progress", job_key),
277+
"JOB_ALREADY_IN_PROGRESS",
278+
)),
279+
))
280+
}
270281
Err(e) => {
271282
error!("Failed to enqueue job: {}", e);
272283
Err((

src/client/dlq.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -329,10 +329,20 @@ impl BackfillClient {
329329
};
330330

331331
// Enqueue the job
332-
let job = self
332+
let outcome = self
333333
.enqueue(&dlq_job.task_identifier, &dlq_job.payload, spec.clone())
334334
.await?;
335335

336+
let job = match outcome {
337+
crate::EnqueueOutcome::Enqueued(job) => job,
338+
crate::EnqueueOutcome::AlreadyInProgress { job_key } => {
339+
return Err(BackfillError::RuntimeError(format!(
340+
"Cannot requeue DLQ job {}: a job with key '{}' is already in progress",
341+
dlq_id, job_key
342+
)));
343+
}
344+
};
345+
336346
// Record metrics
337347
crate::metrics::record_dlq_job_requeued(&dlq_job.task_identifier, spec.queue.as_str());
338348

@@ -361,7 +371,7 @@ impl BackfillClient {
361371
.execute(&self.pool)
362372
.await?;
363373

364-
Ok(job)
374+
Ok(*job)
365375
}
366376

367377
/// Delete a job from the DLQ permanently.

src/client/enqueue.rs

Lines changed: 57 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,20 @@
11
//! The queue functions implementations for backfill client.
22
3-
use graphile_worker::{Job, TaskHandler, WorkerUtils};
3+
use graphile_worker::{TaskHandler, WorkerUtils};
44
use serde::Serialize;
55
use sqlx::PgPool;
66
use sqlx::postgres::PgPoolOptions;
77

88
use super::BackfillClient;
9-
use crate::{BackfillError, JobSpec};
9+
use crate::{BackfillError, EnqueueOutcome, JobSpec};
10+
11+
/// Check if a GraphileWorkerError is a RowNotFound error.
12+
/// This happens when add_job returns NULL because a job with the same
13+
/// job_key is currently locked by a worker.
14+
fn is_row_not_found(e: &graphile_worker::errors::GraphileWorkerError) -> bool {
15+
use graphile_worker::errors::GraphileWorkerError;
16+
matches!(e, GraphileWorkerError::SqlError(sqlx::Error::RowNotFound))
17+
}
1018

1119
impl BackfillClient {
1220
/// Create a new BackfillClient with the given database URL.
@@ -67,13 +75,21 @@ impl BackfillClient {
6775
/// * `spec` - Job specification including priority, scheduling, etc.
6876
///
6977
/// # Returns
70-
/// The Job struct containing the job ID and metadata.
71-
pub async fn enqueue<T>(&self, task_identifier: &str, payload: &T, spec: JobSpec) -> Result<Job, BackfillError>
78+
/// `EnqueueOutcome::Enqueued(Job)` if the job was created or updated,
79+
/// `EnqueueOutcome::AlreadyInProgress { job_key }` if a job with the same
80+
/// key is currently being processed.
81+
pub async fn enqueue<T>(
82+
&self,
83+
task_identifier: &str,
84+
payload: &T,
85+
spec: JobSpec,
86+
) -> Result<EnqueueOutcome, BackfillError>
7287
where
7388
T: Serialize,
7489
{
7590
let start = std::time::Instant::now();
7691
let utils = self.utils();
92+
let job_key = spec.job_key.clone();
7793

7894
let result = utils
7995
.add_raw_job(task_identifier, serde_json::to_value(payload)?, spec.clone().into())
@@ -96,12 +112,35 @@ impl BackfillClient {
96112
spec.priority.0
97113
);
98114

99-
Ok(job)
115+
Ok(EnqueueOutcome::Enqueued(Box::new(job)))
100116
}
101117
Err(e) => {
102-
crate::metrics::record_db_operation("enqueue", "error");
103-
log::error!("Failed to enqueue job (task: {}, error: {})", task_identifier, e);
104-
Err(e.into())
118+
// Check if this is a RowNotFound error with a job_key
119+
// This indicates the job_key is already locked by a worker
120+
if is_row_not_found(&e) && job_key.is_some() {
121+
// Job with this key is already locked/in progress
122+
crate::metrics::record_db_operation("enqueue", "already_in_progress");
123+
crate::metrics::record_db_operation_duration("enqueue", start.elapsed().as_secs_f64());
124+
125+
// Record the already_in_progress metric
126+
crate::metrics::record_job_already_in_progress(spec.queue.as_str(), task_identifier);
127+
128+
let key = job_key.clone().unwrap_or_else(|| "<unknown>".to_string());
129+
log::debug!(
130+
"Job already in progress (job_key: {}, task: {}, queue: {})",
131+
key,
132+
task_identifier,
133+
spec.queue.as_str()
134+
);
135+
136+
Ok(EnqueueOutcome::AlreadyInProgress {
137+
job_key: job_key.unwrap_or_default(),
138+
})
139+
} else {
140+
crate::metrics::record_db_operation("enqueue", "error");
141+
log::error!("Failed to enqueue job (task: {}, error: {})", task_identifier, e);
142+
Err(e.into())
143+
}
105144
}
106145
}
107146
}
@@ -110,13 +149,20 @@ impl BackfillClient {
110149
///
111150
/// This method uses the task's IDENTIFIER constant and ensures the payload
112151
/// type matches the expected task type.
113-
pub async fn enqueue_task<T>(&self, task: T, spec: JobSpec) -> Result<Job, BackfillError>
152+
pub async fn enqueue_task<T>(&self, task: T, spec: JobSpec) -> Result<EnqueueOutcome, BackfillError>
114153
where
115154
T: TaskHandler + Serialize,
116155
{
117156
let utils = self.utils();
118-
let job = utils.add_job(task, spec.into()).await?;
119-
Ok(job)
157+
let job_key = spec.job_key.clone();
158+
159+
match utils.add_job(task, spec.into()).await {
160+
Ok(job) => Ok(EnqueueOutcome::Enqueued(Box::new(job))),
161+
Err(e) if is_row_not_found(&e) && job_key.is_some() => Ok(EnqueueOutcome::AlreadyInProgress {
162+
job_key: job_key.unwrap_or_default(),
163+
}),
164+
Err(e) => Err(e.into()),
165+
}
120166
}
121167

122168
/// Remove a job by its unique key.

0 commit comments

Comments
 (0)