Skip to content

Commit 0eee474

Browse files
authored
feat: add lifecycle hooks plugin support (#5)
Exposed graphile_worker's lifecycle hooks via add_plugin() builder method. This lets you use the feature to implement metrics or other observability options, or whatever you want. This change is implemented in advance of graphile_worker publishing this feature, so I am not landing the branch yet. Changes: - Added add_plugin<H: LifecycleHooks>() to WorkerRunnerBuilder - Re-exported all lifecycle hook types from graphile_worker - Removed JobMetrics helper (BREAKING: use lifecycle hooks instead) - Added examples/metrics_plugin.rs with comprehensive metrics example - Added tests/plugin_tests.rs with full integration test coverage - Added docs/07-plugins.md with detailed plugin guide Breaking Changes: - Removed JobMetrics struct and manual job instrumentation functions - You should implement metrics via lifecycle hook plugins instead - Automatic client-side metrics (enqueue, DLQ, DB ops) unchanged
1 parent 7853ef9 commit 0eee474

14 files changed

Lines changed: 1864 additions & 478 deletions

Cargo.lock

Lines changed: 365 additions & 60 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@ axum = ["dep:axum"]
1616

1717
[dependencies]
1818
chrono = { version = "0.4", features = ["serde"] }
19-
graphile_worker = "0.8"
20-
graphile_worker_crontab_parser = "0.5"
19+
graphile_worker = "0.9.2"
20+
graphile_worker_crontab_parser = "0.5.12"
2121
log = "0.4.28"
2222
metrics = "0.24"
2323
serde = { version = "1.0", features = ["derive"] }
@@ -37,6 +37,7 @@ axum = { version = "0.8", optional = true }
3737
env_logger = "0.11"
3838
axum = "0.8"
3939
fastrand = "2.0"
40+
metrics-exporter-prometheus = "0.18.1"
4041
tower = "0.5"
4142
tower-http = { version = "0.6", features = ["cors", "trace"] }
4243

docs/03-metrics.md

Lines changed: 111 additions & 111 deletions
Original file line numberDiff line numberDiff line change
@@ -4,22 +4,23 @@ The backfill library provides comprehensive metrics using the [`metrics`](https:
44

55
## Metrics Philosophy
66

7-
**Automatic where possible, easy when manual:**
8-
-**Automatic metrics** for operations the library controls (enqueueing, DLQ, database)
9-
- 🛠️ **Easy manual instrumentation** for job lifecycle (helper utilities provided)
7+
**Automatic where possible, plugins for job lifecycle:**
8+
-**Automatic metrics** for operations the library controls (enqueueing, DLQ, database, worker lifecycle)
9+
- 🔌 **Lifecycle hook plugins** for job-level metrics (start, complete, fail, duration)
1010
- 🎯 **Bring your own backend** - install any metrics recorder (Prometheus, StatsD, etc.)
1111
- 📊 **Zero overhead** when no recorder is installed
1212

1313
This design keeps the library backend-agnostic while providing comprehensive observability.
1414

15-
## Automatic vs Manual Metrics
15+
## Automatic vs Plugin-Based Metrics
1616

1717
### ✅ Automatically Emitted
1818

1919
The library automatically emits these metrics without any user code:
2020

2121
**Job Operations:**
2222
- **`backfill_jobs_enqueued`** - Recorded when you call `client.enqueue()`
23+
- **`backfill_jobs_already_in_progress`** - Enqueue skipped due to duplicate job key
2324

2425
**DLQ Operations:**
2526
- **`backfill_dlq_jobs_added`** - Jobs moved to DLQ
@@ -36,28 +37,21 @@ The library automatically emits these metrics without any user code:
3637

3738
**You get these for free** - just install a metrics recorder!
3839

39-
### 🛠️ Manual Instrumentation Required
40+
### 🔌 Plugin-Based Metrics (Lifecycle Hooks)
4041

41-
Due to GraphileWorker's architecture, these metrics require manual instrumentation in your task handlers:
42+
For job lifecycle metrics, use lifecycle hook plugins instead of manual instrumentation:
4243

43-
- **`backfill_jobs_started/completed/failed`** - Job lifecycle events
44-
- **`backfill_jobs_duration_seconds`** - Job execution time
44+
- **`jobs_started/completed/failed`** - Job lifecycle events
45+
- **`job_duration_seconds`** - Job execution time (automatically tracked!)
46+
- **`job_wait_time_seconds`** - Queue latency (time from enqueue to start)
4547

46-
**Don't worry** - we provide easy-to-use helpers! See [Manual Instrumentation](#manual-instrumentation) below.
48+
**Benefits of lifecycle hooks:**
49+
- ✨ Automatic duration tracking (no timers needed!)
50+
- ✨ Rich context (will_retry flag, attempt number, job metadata)
51+
- ✨ No code in job handlers (centralized metrics)
52+
- ✨ Consistent across all jobs
4753

48-
### ⏳ Future Enhancements (Require GraphileWorker Hooks)
49-
50-
These metrics are planned but require either GraphileWorker to add lifecycle hooks or using a fork with hooks:
51-
52-
- **`backfill_jobs_wait_time_seconds`** - Queue latency (enqueue to start)
53-
- **`backfill_queue_depth`** - Current queue depth by queue
54-
- **`backfill_queue_active_jobs`** - Active jobs being processed
55-
- **`backfill_worker_utilization`** - Worker utilization percentage
56-
- **`backfill_worker_polls`** - Worker poll operation results
57-
- **`backfill_retries_attempted`** - Retry attempt tracking
58-
- **`backfill_retries_exhausted`** - Jobs that exceeded max retries
59-
60-
These require access to GraphileWorker's internal state that isn't currently exposed. For now, you can approximate some of these with database queries or application-level tracking.
54+
See `docs/07-plugins.md` and `examples/metrics_plugin.rs` for how to implement a metrics plugin.
6155

6256
## Quick Start
6357

@@ -83,122 +77,128 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
8377
}
8478
```
8579

86-
### 2. Add Manual Instrumentation to Jobs (Optional but Recommended)
80+
### 2. Add a Metrics Plugin for Job Lifecycle (Optional but Recommended)
8781

8882
```rust
89-
use backfill::{TaskHandler, WorkerContext, IntoTaskHandlerResult};
83+
use backfill::{LifecycleHooks, JobCompleteContext, JobFailContext, WorkerRunner};
9084

91-
struct MyJob {
92-
data: String,
93-
}
85+
#[derive(Clone)]
86+
struct MetricsPlugin;
9487

95-
impl TaskHandler for MyJob {
96-
const IDENTIFIER: &'static str = "my_job";
97-
98-
async fn run(self, ctx: WorkerContext) -> impl IntoTaskHandlerResult {
99-
// Option A: Use the JobMetrics helper (easiest!)
100-
backfill::metrics::JobMetrics::new("fast", Self::IDENTIFIER, &ctx)
101-
.instrument(|| async {
102-
// Your job logic here
103-
process_data(&self.data).await
104-
})
105-
.await
88+
impl LifecycleHooks for MetricsPlugin {
89+
async fn on_job_complete(&self, ctx: JobCompleteContext) {
90+
let task = &ctx.job.task_identifier;
91+
let duration = ctx.duration.as_secs_f64();
92+
93+
metrics::counter!("jobs_completed", "task" => task.clone()).increment(1);
94+
metrics::histogram!("job_duration_seconds",
95+
"task" => task.clone(),
96+
"status" => "success"
97+
).record(duration);
10698
}
107-
}
10899

109-
async fn process_data(data: &str) -> Result<(), Box<dyn std::error::Error>> {
110-
// Do work...
111-
Ok(())
100+
async fn on_job_fail(&self, ctx: JobFailContext) {
101+
let status = if ctx.will_retry { "retrying" } else { "failed" };
102+
metrics::counter!("jobs_failed",
103+
"task" => ctx.job.task_identifier.clone(),
104+
"status" => status
105+
).increment(1);
106+
}
112107
}
113-
```
114-
115-
That's it! You now have comprehensive observability.
116108

117-
## Manual Instrumentation
109+
// Add plugin when building worker
110+
let worker = WorkerRunner::builder(config).await?
111+
.define_job::<MyJob>()
112+
.add_plugin(MetricsPlugin) // Handles ALL jobs automatically!
113+
.build().await?;
114+
```
118115

119-
For job lifecycle metrics, you need to add instrumentation to your task handlers. We provide two approaches:
116+
That's it! You now have comprehensive observability. See `examples/metrics_plugin.rs` for a complete example.
120117

121-
### Approach 1: JobMetrics Helper (Recommended)
118+
## Implementing Job Metrics with Lifecycle Hooks
122119

123-
The `JobMetrics` helper automatically handles all metrics for you:
120+
Instead of manual instrumentation in each job handler, implement a metrics plugin once and it applies to all jobs:
124121

125122
```rust
126-
use backfill::{TaskHandler, WorkerContext, IntoTaskHandlerResult, metrics::JobMetrics};
127-
128-
impl TaskHandler for MyJob {
129-
const IDENTIFIER: &'static str = "my_job";
130-
131-
async fn run(self, ctx: WorkerContext) -> impl IntoTaskHandlerResult {
132-
JobMetrics::new("fast", Self::IDENTIFIER, &ctx)
133-
.instrument(|| async {
134-
// Your job logic
135-
do_work().await
136-
})
137-
.await
123+
use backfill::{
124+
LifecycleHooks, JobStartContext, JobCompleteContext,
125+
JobFailContext, JobPermanentlyFailContext,
126+
};
127+
128+
#[derive(Clone)]
129+
struct MetricsPlugin;
130+
131+
impl LifecycleHooks for MetricsPlugin {
132+
async fn on_job_start(&self, ctx: JobStartContext) {
133+
metrics::counter!("jobs_started",
134+
"task" => ctx.job.task_identifier.clone()
135+
).increment(1);
138136
}
139-
}
140-
```
141137

142-
**What it does:**
143-
- ✅ Records job start (`backfill_jobs_started`)
144-
- ✅ Records completion/failure (`backfill_jobs_completed` / `backfill_jobs_failed`)
145-
- ✅ Records duration (`backfill_jobs_duration_seconds`)
146-
- ✅ Records retry attempts (`backfill_retries_attempted`) if applicable
147-
- ✅ Classifies error types automatically
138+
async fn on_job_complete(&self, ctx: JobCompleteContext) {
139+
let task = &ctx.job.task_identifier;
140+
let duration = ctx.duration.as_secs_f64(); // Duration tracked automatically!
148141

149-
### Approach 2: Manual Calls
142+
metrics::counter!("jobs_completed",
143+
"task" => task.clone(),
144+
"attempt" => ctx.job.attempts.to_string()
145+
).increment(1);
150146

151-
For more control, call the metric functions directly:
152-
153-
```rust
154-
use backfill::{TaskHandler, WorkerContext, IntoTaskHandlerResult, metrics};
155-
156-
impl TaskHandler for MyJob {
157-
const IDENTIFIER: &'static str = "my_job";
147+
metrics::histogram!("job_duration_seconds",
148+
"task" => task.clone(),
149+
"status" => "success"
150+
).record(duration);
151+
}
158152

159-
async fn run(self, ctx: WorkerContext) -> impl IntoTaskHandlerResult {
160-
let start = std::time::Instant::now();
161-
let attempt = *ctx.job().attempts();
153+
async fn on_job_fail(&self, ctx: JobFailContext) {
154+
let task = &ctx.job.task_identifier;
162155

163-
// Record start
164-
metrics::record_job_started("fast", Self::IDENTIFIER);
156+
// Use will_retry flag for better metrics!
157+
let status = if ctx.will_retry { "retrying" } else { "failed" };
165158

166-
// Do work
167-
let result = do_work().await;
159+
metrics::counter!("jobs_failed",
160+
"task" => task.clone(),
161+
"will_retry" => status,
162+
"attempt" => ctx.job.attempts.to_string()
163+
).increment(1);
168164

169-
// Record completion
170-
let duration = start.elapsed().as_secs_f64();
171-
match &result {
172-
Ok(_) => {
173-
metrics::record_job_completed("fast", Self::IDENTIFIER, attempt);
174-
metrics::record_job_duration("fast", Self::IDENTIFIER, "success", duration);
175-
}
176-
Err(e) => {
177-
let error_type = metrics::classify_error_for_metrics(e.as_ref());
178-
metrics::record_job_failed("fast", Self::IDENTIFIER, error_type, attempt);
179-
metrics::record_job_duration("fast", Self::IDENTIFIER, "failed", duration);
180-
}
181-
}
165+
// Classify the error
166+
let error_type = classify_error(&ctx.error);
167+
metrics::counter!("job_errors_by_type",
168+
"task" => task.clone(),
169+
"error_type" => error_type
170+
).increment(1);
171+
}
182172

183-
result
173+
async fn on_job_permanently_fail(&self, ctx: JobPermanentlyFailContext) {
174+
metrics::counter!("jobs_permanently_failed",
175+
"task" => ctx.job.task_identifier.clone()
176+
).increment(1);
184177
}
185178
}
186-
```
187179

188-
### Error Classification
189-
190-
The `classify_error_for_metrics()` function automatically categorizes errors into standard types:
180+
fn classify_error(error: &str) -> &'static str {
181+
let msg = error.to_lowercase();
182+
if msg.contains("timeout") { "timeout" }
183+
else if msg.contains("network") { "network" }
184+
else if msg.contains("not found") { "not_found" }
185+
else if msg.contains("unauthorized") { "unauthorized" }
186+
else if msg.contains("forbidden") { "forbidden" }
187+
else if msg.contains("validation") { "validation" }
188+
else if msg.contains("rate limit") { "rate_limit" }
189+
else if msg.contains("unavailable") { "unavailable" }
190+
else { "unknown" }
191+
}
192+
```
191193

192-
- `timeout` - Timeout errors
193-
- `network` - Network/connection errors
194-
- `not_found` - 404-style errors
195-
- `unauthorized` / `forbidden` - Auth errors
196-
- `validation` - Validation failures
197-
- `rate_limit` - Rate limit errors
198-
- `unavailable` - Service unavailable
199-
- `unknown` - Everything else
194+
**Advantages over manual instrumentation:**
195+
- ✨ Duration automatically tracked (no timers needed!)
196+
-`will_retry` flag tells you if failure is transient
197+
- ✨ No code in job handlers (cleaner separation)
198+
- ✨ Consistent metrics across all jobs
199+
- ✨ Rich job metadata available (attempts, priority, created_at, etc.)
200200

201-
You can also pass your own error type string for more specific classification.
201+
See `examples/metrics_plugin.rs` for a complete working example and `docs/07-plugins.md` for comprehensive documentation.
202202

203203
## Metric Categories
204204

0 commit comments

Comments
 (0)