Skip to content

Commit 05e241b

Browse files
authored
fix: handle locking tasks in retry context (#20)
* fix: handle locking tasks in retry context * lint: clippy
1 parent 9894feb commit 05e241b

9 files changed

Lines changed: 27 additions & 67 deletions

File tree

.sqlx/query-3b0fccfab61f95863ef5a2e5c4ca4a888ce3c14e404ace182b985b8a16999f71.json renamed to .sqlx/query-5fd3913bd5d12b5571141da1d5fc6ddf0bd6d1af82bccf97b2b39827daf0e4ca.json

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

examples/basic.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use std::time::Duration;
22

3-
use apalis::prelude::*;
3+
use apalis::{layers::retry::RetryPolicy, prelude::*};
44
use apalis_postgres::*;
55
use futures::stream::{self, StreamExt};
66

@@ -24,12 +24,18 @@ async fn main() {
2424
.take(10);
2525
backend.push_all(&mut items).await.unwrap();
2626

27-
async fn send_reminder(_item: usize, _wrk: WorkerContext) -> Result<(), BoxDynError> {
27+
async fn send_reminder(item: usize, wrk: WorkerContext) -> Result<(), BoxDynError> {
28+
if item % 3 == 0 {
29+
println!("Reminding about item: {} but failing", item);
30+
return Err("Failed to send reminder".into());
31+
}
32+
println!("Reminding about item: {}", item);
2833
Ok(())
2934
}
3035

3136
let worker = WorkerBuilder::new("worker-1")
3237
.backend(backend)
38+
.retry(RetryPolicy::retries(1))
3339
.build(send_reminder);
3440
worker.run().await.unwrap();
3541
}

queries/task/lock_by_id.sql

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,17 @@
1-
UPDATE apalis.jobs
2-
SET
1+
UPDATE
2+
apalis.jobs
3+
SET
34
status = 'Running',
45
lock_at = now(),
56
lock_by = $2
6-
WHERE
7-
status = 'Queued'
7+
WHERE
8+
(
9+
status = 'Pending'
10+
OR status = 'Queued'
11+
OR (
12+
status = 'Failed'
13+
AND attempts < max_attempts
14+
)
15+
)
816
AND run_at < now()
9-
AND id = ANY($1)
10-
RETURNING *;
17+
AND id = ANY($1) RETURNING *;

src/ack.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use apalis_core::{
2+
error::AbortError,
23
error::BoxDynError,
34
layers::{Layer, Service},
45
task::{Parts, status::Status},
@@ -148,7 +149,9 @@ where
148149
};
149150
let fut = self.inner.call(req);
150151
async move {
151-
lock_task(&pool, &task_id, &worker_id).await.unwrap();
152+
lock_task(&pool, &task_id, &worker_id)
153+
.await
154+
.map_err(AbortError::new)?;
152155
fut.await.map_err(|e| e.into())
153156
}
154157
.boxed()

src/queries/task/ack.sql

Lines changed: 0 additions & 10 deletions
This file was deleted.

src/queries/task/fetch_next.sql

Lines changed: 0 additions & 4 deletions
This file was deleted.

src/queries/task/lock_by_id.sql

Lines changed: 0 additions & 10 deletions
This file was deleted.

src/queries/task/sink.sql

Lines changed: 0 additions & 20 deletions
This file was deleted.

src/queries/worker/register.sql

Lines changed: 0 additions & 12 deletions
This file was deleted.

0 commit comments

Comments
 (0)