Commit de4ecd8

docs(retries): mark dead RetryPolicy timing fields as deprecated
`RetryPolicy.{initial_delay, max_delay, backoff_multiplier, jitter_factor}` were stored on the struct but never reached graphile_worker — only `max_attempts` is forwarded by `From<JobSpec> for GraphileJobSpec`. graphile_worker applies a hard-coded SQL formula of `exp(min(attempts, 10))` seconds to every retry. So `RetryPolicy::fast()` and `RetryPolicy::conservative()` produced identical retry timing in practice even though the docs promised "100ms-30s delays" vs. "1 min - 8 hour delays".

This commit makes the promise match the fact:

- Marks the unused math helpers (`RetryPolicy::new`, `with_jitter`, `calculate_delay`, `calculate_retry_time`, and `JobSpec::calculate_retry_time`) as `#[deprecated(since = "1.2.0")]` with notes pointing users at `RetryPolicy { max_attempts: n, ..Default::default() }` or the presets.
- Rewrites the rustdoc on `RetryPolicy`, on each preset, on the `with_*_retries` builders, and on the `enqueue_*_with_retries` convenience helpers to describe what actually happens (only `max_attempts` differs across presets, fixed exp-backoff timing).
- Updates the lib.rs module-level rustdoc.
- Migrates `examples/enqueue_jobs.rs` to the recommended pattern so it doesn't trigger the new deprecation warnings.
- Updates README.md to drop the false delay-range claims and replace the wrong "Pre-configured Fast/Bulk queues" / "Custom(name)" listing with the actual `Queue::Parallel` / `Queue::Serial(name)` enum.
- Updates docs/02-dlq.md to mark the post-#9 "queue_name shows as default" warning as resolved (it was the change in v1.1.1 that fixed this).

The struct fields themselves stay public for source-compatibility with existing struct-literal construction. Per-job backoff customization needs upstream graphile_worker support and is deferred.
1 parent cc7f981 commit de4ecd8
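
The gap described in the commit message can be pictured with a small sketch. The field names come from the message itself; the field types, the trimmed-down `JobSpec`, and the shape of `GraphileJobSpec` are assumptions made for illustration and are not the crate's real definitions.

```rust
use std::time::Duration;

// Stand-in for the crate's RetryPolicy. Field names are taken from the
// commit message; the field types are assumptions for this sketch.
#[derive(Clone, Debug)]
pub struct RetryPolicy {
    pub max_attempts: i32,
    pub initial_delay: Duration,
    pub max_delay: Duration,
    pub backoff_multiplier: f64,
    pub jitter_factor: f64,
}

// Hypothetical, trimmed-down JobSpec holding only the retry-related part.
#[derive(Clone, Debug)]
pub struct JobSpec {
    pub retry_policy: RetryPolicy,
}

// Hypothetical stand-in for the spec that is handed to graphile_worker.
#[derive(Debug, PartialEq)]
pub struct GraphileJobSpec {
    pub max_attempts: i32,
}

impl From<JobSpec> for GraphileJobSpec {
    fn from(spec: JobSpec) -> Self {
        // Only the attempt cap crosses the boundary. The four timing
        // fields are dropped at this point.
        GraphileJobSpec {
            max_attempts: spec.retry_policy.max_attempts,
        }
    }
}
```

Under these assumptions, two policies that differ only in their timing fields convert to equal `GraphileJobSpec` values, which is why the presets could not differ in retry timing.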

5 files changed

Lines changed: 224 additions & 72 deletions

README.md

Lines changed: 56 additions & 24 deletions
@@ -1,7 +1,6 @@
 # backfill

 [![CI](https://github.com/ceejbot/backfill/workflows/CI/badge.svg)](https://github.com/ceejbot/backfill/actions)
-[![Coverage](https://img.shields.io/badge/coverage-64.67%25-yellow)](https://github.com/ceejbot/backfill/actions)
 [![Security](https://github.com/ceejbot/backfill/actions/workflows/security.yml/badge.svg)](https://github.com/ceejbot/backfill/actions/workflows/security.yml)

 A boringly-named priority queue system for doing async work. This library and work process wrap the the [graphile_worker crate](https://lib.rs/crates/graphile_worker) to do things the way I want to do them. It's unlikely you'll want to do things exactly this way, but perhaps you can learn by reading the code, or get a jumpstart by borrowing open-source code, or heck, maybe this will do what you need.
@@ -10,37 +9,70 @@ A boringly-named priority queue system for doing async work. This library and wo

 This is a postgres-backed async work queue library that is a set of conveniences and features on top of the rust port of Graphile Worker. It gives you a library you can integrate with your own project to handle background tasks.

-> **Status**: Core features are complete and tested (64.67%% test coverage, 55 tests). The library is suitable for production use for job enqueueing, worker processing, and DLQ management. The Admin API (feature-gated) is experimental. See [CHANGELOG.md](CHANGELOG.md) for details and [Known Limitations](docs/02-dlq.md#known-limitations).
+> **Status**: Core features are complete and covered by an integration test
+> suite. The library is suitable for production use for job enqueueing,
+> worker processing, and DLQ management. The Admin API (feature-gated) is
+> experimental. See [CHANGELOG.md](CHANGELOG.md) for details and
+> [Known Limitations](docs/02-dlq.md#known-limitations).

 ### What's New Over graphile_worker

-Built on top of `graphile_worker` (v0.8.6), backfill adds these production-ready features:
-
-- 🎯 **Priority System** - Six-level priority queue (EMERGENCY to BULK_LOWEST) with numeric priority values
-- 📦 **Named Queues** - Pre-configured Fast/Bulk queues plus custom queue support
-- 🔄 **Smart Retry Policies** - Exponential backoff with jitter (fast/aggressive/conservative presets)
-- 💀 **Dead Letter Queue (DLQ)** - Automatic failed job handling with query/requeue/deletion APIs
-- 📊 **Comprehensive Metrics** - Prometheus-compatible metrics for jobs, DLQ, and database operations
-- 🛠️ **High-Level Client API** - `BackfillClient` with ergonomic enqueueing helpers
-- 🏃 **Flexible Worker Patterns** - `WorkerRunner` supporting tokio::select!, background tasks, and one-shot processing
-- 🔧 **Admin API** - Optional Axum router for HTTP-based job management (experimental)
-- 📝 **Convenience Functions** - `enqueue_fast()`, `enqueue_bulk()`, `enqueue_critical()`, etc.
-- 🧹 **Stale Lock Cleanup** - Automatic cleanup of orphaned locks from crashed workers (startup + periodic)
+Built on top of `graphile_worker` (v0.11.x), backfill adds these production-ready features:
+
+- 🎯 **Priority System** — Six-level priority enum (EMERGENCY=-20 down through
+  BULK_LOWEST=10), mapped through to graphile_worker's `priority asc` fetch
+  ordering.
+- 📦 **Parallel + Serial queues** — `Queue::Parallel` (default, jobs run
+  concurrently across workers) or `Queue::Serial(name)` (one-job-at-a-time
+  per named queue, for rate limiting or per-entity ordering).
+- 🔄 **Retry policy presets** — `fast` / `aggressive` / `conservative`
+  presets that differ in `max_attempts`. Note: backoff *timing* is fixed by
+  graphile_worker (see [`docs/02-dlq.md`](docs/02-dlq.md)) — only the
+  attempt count is configurable.
+- 💀 **Dead Letter Queue (DLQ)** — Automatic failed-job handling with
+  query/requeue/deletion APIs. Includes a permanent-failure short-circuit
+  plugin: handlers that return non-retryable `WorkerError` variants land in
+  the DLQ on the first failure rather than waiting for `max_attempts` to
+  exhaust.
+- 📊 **Comprehensive Metrics** — Prometheus-compatible metrics for jobs,
+  DLQ, and database operations.
+- 🛠️ **High-Level Client API** — `BackfillClient` with ergonomic enqueueing
+  helpers.
+- 🏃 **Flexible Worker Patterns** — `WorkerRunner` supporting
+  `tokio::select!`, background tasks, and one-shot processing.
+- 🔧 **Admin API** — Optional Axum router for HTTP-based job management
+  (experimental).
+- 📝 **Convenience Functions** — `enqueue_fast()`, `enqueue_bulk()`,
+  `enqueue_critical()`, etc.
+- 🧹 **Stale Lock Cleanup** — Automatic cleanup of orphaned locks from
+  crashed workers (startup + periodic). Ordered correctly with the DLQ
+  scanner so failed jobs aren't lost across restarts.

 All built on graphile_worker's rock-solid foundation of PostgreSQL SKIP LOCKED and LISTEN/NOTIFY.

 ### Features

-- **Priority queues**: EMERGENCY, FAST_HIGH, FAST_DEFAULT, BULK_DEFAULT, BULK_LOW, BULK_LOWEST
-- **Named queues**: Fast, Bulk, DeadLetter, Custom(name)
-- **Scheduling**: Immediate or delayed execution with `run_at`
-- **Idempotency**: Use `job_key` for deduplication
-- **Exponential backoff**: Built-in retry policies with jitter to prevent thundering herds
-- **Dead letter queue**: Handling jobs that experience un-retryable failures or exceed their retry limits
-- **Error handling**: Automatic retry classification
-- **Metrics**: Comprehensive metrics via the `metrics` crate - bring your own exporter (Prometheus, StatsD, etc.)
-- **Monitoring**: Structured logging and tracing throughout
-- **Building blocks for an axum admin api**: via a router you can mount on your own axum api server
+- **Priority queues**: EMERGENCY (-20), FAST_HIGH (-10), FAST_DEFAULT (-5),
+  BULK_DEFAULT (0), BULK_LOW (5), BULK_LOWEST (10) — lower number = higher
+  priority.
+- **Queue types**: `Queue::Parallel` (default), `Queue::Serial(name)` — plus
+  `Queue::serial_for(entity, id)` for per-entity ordering.
+- **Scheduling**: Immediate or delayed execution with `run_at`.
+- **Idempotency**: Use `job_key` for deduplication.
+- **Retries**: Configurable `max_attempts` per job; graphile_worker handles
+  the exponential-backoff schedule (`exp(min(attempts, 10))` seconds, capped
+  at ~6h per retry).
+- **Dead letter queue**: Automatic capture of jobs that exceed their retry
+  limits or return non-retryable errors. Includes a synchronous startup
+  pre-move so DLQ doesn't lose jobs across worker restarts.
+- **Error classification**: `WorkerError` variants split into retryable and
+  non-retryable; non-retryable errors short-circuit retries to DLQ via an
+  auto-registered lifecycle plugin.
+- **Metrics**: Comprehensive metrics via the `metrics` crate — bring your
+  own exporter (Prometheus, StatsD, etc.).
+- **Monitoring**: Structured logging and tracing throughout.
+- **Building blocks for an axum admin api**: via a router you can mount on
+  your own axum api server.

 Look at the `examples/` directory and the readme there for practical usage examples.
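
The "lower number = higher priority" rule in the new README text can be sketched as follows. The numeric values are the ones listed in the README; the enum shape, the `value` method, and the `fetch_order` helper are hypothetical and only imitate what a `priority asc` fetch would return.

```rust
// Hypothetical stand-in for the crate's priority levels. The numbers are
// the ones listed in the README; the enum shape is an assumption.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Priority {
    Emergency,
    FastHigh,
    FastDefault,
    BulkDefault,
    BulkLow,
    BulkLowest,
}

impl Priority {
    /// Numeric priority; a lower number is fetched earlier.
    pub fn value(self) -> i16 {
        match self {
            Priority::Emergency => -20,
            Priority::FastHigh => -10,
            Priority::FastDefault => -5,
            Priority::BulkDefault => 0,
            Priority::BulkLow => 5,
            Priority::BulkLowest => 10,
        }
    }
}

/// Order task names the way an ascending sort on priority would:
/// lowest number first. The sort is stable, so ties keep insertion order.
pub fn fetch_order(mut jobs: Vec<(&'static str, Priority)>) -> Vec<&'static str> {
    jobs.sort_by_key(|(_, priority)| priority.value());
    jobs.into_iter().map(|(name, _)| name).collect()
}
```

With this ordering an `Emergency` job sorts ahead of a `FastDefault` job, which in turn sorts ahead of a `BulkLowest` job, regardless of enqueue order.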

docs/02-dlq.md

Lines changed: 13 additions & 8 deletions
@@ -880,16 +880,21 @@ The DLQ system is fully functional for production use, but has a few known limit

 ### 1. Queue Name Tracking

-**Issue**: DLQ entries may show `queue_name` as `"default"` even if the job ran in a different queue (e.g., "fast" or "bulk").
+**Status**: Fixed in v1.1.1 (PR #9).

-**Cause**: The GraphileWorker `Job` struct doesn't expose the queue name field, so when jobs are moved to the DLQ, the queue name defaults to `"default"`.
+DLQ entries now correctly preserve the queue type of the original job:

-**Workarounds**:
-- The `task_identifier` field is always accurate and can be used for filtering
-- Job priority is preserved, which often correlates with queue assignment
-- For critical workflows, track queue assignment in your application logs or metrics
+- Parallel jobs (`Queue::Parallel`) are stored with an empty `queue_name`.
+  When requeued, they go back to `Queue::Parallel` — concurrent execution
+  is preserved.
+- Serial jobs (`Queue::Serial(name)`) are stored with their queue name.
+  When requeued, they go back to `Queue::Serial(name)` — single-job-at-a-
+  time semantics are preserved.

-**Future**: This will be resolved when GraphileWorker exposes queue_name on the Job struct, or when we implement direct database querying.
+The DDL retains a `DEFAULT 'default'` clause for the `queue_name` column for
+schema compatibility, but it is never used by the production code path —
+`add_to_dlq` and `process_failed_jobs` always pass an explicit value
+(possibly the empty string for parallel jobs).

 ### 2. Payload Visibility

@@ -929,7 +934,7 @@ These limitations are minor and don't affect the core DLQ functionality:
 - **Error message capture** - Fully functional
 - **Requeuing workflows** - Production-ready
 - **Statistics and monitoring** - Complete
-- ⚠️ **Queue name tracking** - Shows "default" for all queues
+- **Queue name tracking** - Fixed in v1.1.1 (parallel ↔ serial round-trip preserved)
 - ⚠️ **Payload inspection** - Requires direct DB access
 - ⚠️ **Job cancellation** - Not yet implemented
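
The parallel/serial round-trip described in the updated DLQ docs can be sketched like this. The `Queue` variants follow the docs; the two helper functions are hypothetical names for this illustration and are not the crate's `add_to_dlq` or requeue code.

```rust
// Stand-in for the crate's queue type; variant names follow the docs.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Queue {
    Parallel,
    Serial(String),
}

/// Hypothetical helper: the value written to the DLQ `queue_name` column.
/// Parallel jobs are stored as the empty string, serial jobs by name.
pub fn dlq_queue_name(queue: &Queue) -> String {
    match queue {
        Queue::Parallel => String::new(),
        Queue::Serial(name) => name.clone(),
    }
}

/// Hypothetical helper: rebuild the queue when a DLQ entry is requeued.
pub fn queue_from_dlq(queue_name: &str) -> Queue {
    if queue_name.is_empty() {
        Queue::Parallel
    } else {
        Queue::Serial(queue_name.to_string())
    }
}
```

One consequence of this encoding, if it matches the real implementation, is that a serial queue cannot be named with the empty string, since that value is reserved for parallel jobs.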

examples/enqueue_jobs.rs

Lines changed: 9 additions & 8 deletions
@@ -248,14 +248,15 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
         outcome.expect("outcome should contain a job").id()
     );

-    // Enqueue a job with custom retry policy
-    let custom_retry_policy = RetryPolicy::new(
-        6, // 6 attempts
-        std::time::Duration::from_millis(500), // Start with 500ms
-        std::time::Duration::from_secs(60), // Cap at 1 minute
-        1.8, // 1.8x multiplier
-    )
-    .with_jitter(0.2); // 20% jitter
+    // Enqueue a job with custom retry settings.
+    // Note: only `max_attempts` affects runtime behaviour. graphile_worker
+    // schedules retries via a fixed `exp(min(attempts, 10))` second formula,
+    // so the timing fields on RetryPolicy are not honored. See the docs on
+    // `RetryPolicy` for the full story.
+    let custom_retry_policy = RetryPolicy {
+        max_attempts: 6,
+        ..Default::default()
+    };

     let custom_job = GenerateReportJob {
         report_type: "analytics_summary".to_string(),

src/lib.rs

Lines changed: 54 additions & 16 deletions
@@ -9,8 +9,14 @@
 //! - **Parallel execution** by default - jobs run concurrently across all
 //!   workers
 //! - **Serial queues** when you need ordering or rate limiting
-//! - **Exponential backoff** with jitter to prevent thundering herds
-//! - **Flexible retry policies** (fast, aggressive, conservative, or custom)
+//! - **Exponential backoff retries** via graphile_worker (timing is
+//!   `exp(min(attempts, 10))` seconds; fixed, not per-job tunable — see
+//!   [`RetryPolicy`])
+//! - **Configurable max-attempts per job** with `fast`, `aggressive`, and
+//!   `conservative` presets
+//! - **Permanent-failure short-circuit** — non-retryable `WorkerError`
+//!   variants land in the DLQ on the first failure instead of waiting for
+//!   `max_attempts` to exhaust
 //! - **Dead letter queue** handling for failed jobs
 //! - **Type-safe job handlers** using Rust's type system
 //! - **Low-latency execution** via PostgreSQL LISTEN/NOTIFY
@@ -362,46 +368,70 @@ impl Default for JobSpec {
 }

 impl JobSpec {
-    /// Create a JobSpec with exponential backoff retry policy
+    /// Attach a [`RetryPolicy`] to this JobSpec.
+    ///
+    /// Sets `max_attempts` from the policy. Note that backoff timing fields
+    /// on the policy are stored but not honored at runtime — see
+    /// [`RetryPolicy`].
     pub fn with_retry_policy(mut self, retry_policy: RetryPolicy) -> Self {
         self.max_attempts = Some(retry_policy.max_attempts);
         self.retry_policy = Some(retry_policy);
         self
     }

-    /// Create a JobSpec optimized for fast retries
+    /// Configure for the `fast` preset: `max_attempts = 3`.
+    ///
+    /// In practice this differs from [`with_aggressive_retries`] and
+    /// [`with_conservative_retries`] only in the attempt count — see
+    /// [`RetryPolicy`].
     pub fn with_fast_retries(mut self) -> Self {
         let policy = RetryPolicy::fast();
         self.max_attempts = Some(policy.max_attempts);
         self.retry_policy = Some(policy);
         self
     }

-    /// Create a JobSpec optimized for aggressive retries
+    /// Configure for the `aggressive` preset: `max_attempts = 12`.
+    ///
+    /// At graphile_worker's fixed exp-backoff schedule, 12 attempts gives
+    /// roughly half a day of cumulative retry coverage before DLQ.
     pub fn with_aggressive_retries(mut self) -> Self {
         let policy = RetryPolicy::aggressive();
         self.max_attempts = Some(policy.max_attempts);
         self.retry_policy = Some(policy);
         self
     }

-    /// Create a JobSpec optimized for conservative retries
+    /// Configure for the `conservative` preset: `max_attempts = 5`.
     pub fn with_conservative_retries(mut self) -> Self {
         let policy = RetryPolicy::conservative();
         self.max_attempts = Some(policy.max_attempts);
         self.retry_policy = Some(policy);
         self
     }

-    /// Get the effective retry policy (returns default if none specified)
+    /// Get the effective retry policy (returns default if none specified).
+    ///
+    /// **Note:** Only `max_attempts` from the returned policy reaches
+    /// graphile_worker. See [`RetryPolicy`] for details.
     pub fn effective_retry_policy(&self) -> RetryPolicy {
         self.retry_policy.clone().unwrap_or_default()
     }

-    /// Calculate the next retry time for a failed job
+    /// Calculate what the next retry time *would* be under this spec's
+    /// policy.
+    ///
+    /// **Not used at runtime.** graphile_worker schedules retries via a
+    /// fixed SQL formula. This method is preserved as a utility but has no
+    /// effect on actual job behaviour.
+    #[deprecated(
+        since = "1.2.0",
+        note = "graphile_worker computes retry timing in SQL and ignores this method. Returns a value but has no runtime effect."
+    )]
     pub fn calculate_retry_time(&self, attempt: i32, failed_at: DateTime<Utc>) -> Option<DateTime<Utc>> {
         let policy = self.effective_retry_policy();
         if policy.should_retry(attempt) {
+            #[allow(deprecated)]
             Some(policy.calculate_retry_time(attempt, failed_at))
         } else {
             None // No more retries
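
The deprecation pattern used in this hunk, a `#[deprecated]` item that is still called internally under `#[allow(deprecated)]`, can be reduced to a minimal sketch. `Policy`, `calculate_delay_secs`, and `preview_delay_secs` are hypothetical names for this illustration and do not appear in the crate.

```rust
// Hypothetical unit struct standing in for a policy type.
pub struct Policy;

impl Policy {
    /// Kept for source compatibility. External callers get a compile-time
    /// deprecation warning that carries the note below.
    #[deprecated(
        since = "1.2.0",
        note = "timing is computed elsewhere; this value has no runtime effect"
    )]
    pub fn calculate_delay_secs(&self, attempt: i32) -> f64 {
        // exp(min(attempt, 10)), mirroring the formula quoted in the docs.
        f64::from(attempt.min(10)).exp()
    }

    /// Internal caller that still needs the deprecated helper.
    pub fn preview_delay_secs(&self, attempt: i32) -> f64 {
        // Silence the lint for this one statement only, so other uses of
        // deprecated items in the crate keep producing warnings.
        #[allow(deprecated)]
        let delay = self.calculate_delay_secs(attempt);
        delay
    }
}
```

Scoping the `allow` to a single statement or expression, as the diff does, keeps the warning active everywhere else in the crate.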
@@ -504,10 +534,12 @@ where
     client.enqueue(task_identifier, payload, spec).await
 }

-/// Enqueue a high-priority job with fast exponential backoff retries.
+/// Enqueue a high-priority job configured for a low retry count (3 attempts).
 ///
-/// Best for high-priority jobs that need quick retries (3 attempts, 100ms-30s
-/// delays).
+/// Use for jobs where rapid failure-to-DLQ is preferred over many retries.
+/// graphile_worker's retry timing is fixed at `exp(min(attempts, 10))` seconds
+/// regardless of policy — see [`RetryPolicy`] — so the only difference between
+/// this and other `_with_retries` helpers is the attempt cap.
 pub async fn enqueue_fast_with_retries<T>(
     client: &BackfillClient,
     task_identifier: &str,
@@ -527,9 +559,12 @@ where
     client.enqueue(task_identifier, payload, spec).await
 }

-/// Enqueue a critical job with aggressive exponential backoff retries.
+/// Enqueue a critical job with a high retry count (12 attempts).
 ///
-/// Best for critical jobs that must succeed (12 attempts, up to 4 hour delays).
+/// Use for jobs that must eventually succeed if at all possible. graphile_worker
+/// retries on a fixed `exp(min(attempts, 10))` second schedule, capping at
+/// ~6h per retry — so 12 attempts gives roughly half a day of total retry
+/// coverage. See [`RetryPolicy`] for the full timing.
 pub async fn enqueue_critical<T>(
     client: &BackfillClient,
     task_identifier: &str,
@@ -549,10 +584,13 @@ where
     client.enqueue(task_identifier, payload, spec).await
 }

-/// Enqueue a bulk job with conservative exponential backoff retries.
+/// Enqueue a bulk job with a moderate retry count (5 attempts via the
+/// `conservative` preset).
 ///
-/// Best for background jobs where consistency matters more than speed
-/// (8 attempts, 1 min - 8 hour delays).
+/// Use for background jobs that should be retried but where you don't want
+/// many attempts. graphile_worker's retry timing is `exp(min(attempts, 10))`
+/// seconds — see [`RetryPolicy`] — so this gives roughly 1s, 3s, 7s, 20s,
+/// 55s before the job lands in DLQ.
 pub async fn enqueue_bulk_with_retries<T>(
     client: &BackfillClient,
     task_identifier: &str,
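
The delay figures quoted in these doc comments can be checked with a few lines of arithmetic. This is a sketch of the formula as the docs state it, `exp(min(attempts, 10))` seconds; `backoff_secs` and `cumulative_secs` are hypothetical helper names, and which attempt counter graphile_worker uses for the first retry is left as a parameter rather than asserted here.

```rust
/// Delay in seconds for a given attempt counter, following the formula
/// quoted in the doc comments: exp(min(attempts, 10)).
pub fn backoff_secs(attempts: u32) -> f64 {
    f64::from(attempts.min(10)).exp()
}

/// Sum of the delays for attempt counters `first..first + retries`.
/// The caller decides which counter the first retry starts from.
pub fn cumulative_secs(first: u32, retries: u32) -> f64 {
    (first..first + retries).map(backoff_secs).sum()
}
```

For counters 0 through 4 the formula evaluates to about 1.0, 2.7, 7.4, 20.1 and 54.6 seconds, which lines up with the "1s, 3s, 7s, 20s, 55s" listed for the `conservative` preset. From counter 10 onward the delay stays at `exp(10)`, about 22026 seconds or 6.1 hours, matching the "~6h per retry" cap.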
