Skip to content

Commit 1eed953

Browse files
committed
refactor: migrate to new graphile_worker Plugin API
The graphile_worker_lifecycle_hooks crate changed from a trait-based API (LifecycleHooks) to an event-registration API (Plugin trait with HookRegistry). Changes: - Update lib.rs re-exports to include Plugin, HookRegistry, and event types - Migrate DlqCleanupPlugin to use hooks.on(JobComplete, ...) pattern - Update worker.rs add_plugin signature and documentation - Migrate MetricsPlugin example to new API - Migrate all test plugins (CountingPlugin, DurationCheckPlugin, etc.) - Fix hardcoded test database URLs to use consistent defaults
1 parent 4cf4fb9 commit 1eed953

8 files changed

Lines changed: 398 additions & 292 deletions

File tree

Cargo.lock

Lines changed: 160 additions & 128 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ axum = ["dep:axum"]
1616

1717
[dependencies]
1818
chrono = { version = "0.4", features = ["serde"] }
19-
graphile_worker = "0.9.2"
19+
graphile_worker = "0.11.1"
2020
graphile_worker_crontab_parser = "0.5.12"
2121
log = "0.4.28"
2222
metrics = "0.24"
@@ -32,6 +32,7 @@ uuid = { version = "1.0", features = ["v4", "v7", "serde"] }
3232

3333
# Optional dependencies for admin API
3434
axum = { version = "0.8", optional = true }
35+
graphile_worker_lifecycle_hooks = "0.2.2"
3536

3637
[dev-dependencies]
3738
env_logger = "0.11"

examples/metrics_plugin.rs

Lines changed: 95 additions & 92 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,10 @@
2020
use std::time::Duration;
2121

2222
use backfill::{
23-
BackfillClient, BackfillError, IntoTaskHandlerResult, JobCompleteContext, JobFailContext,
24-
JobPermanentlyFailContext, JobStartContext, LifecycleHooks, TaskHandler, WorkerConfig, WorkerContext, WorkerRunner,
25-
WorkerShutdownContext, WorkerStartContext, enqueue_fast,
23+
BackfillClient, BackfillError, HookRegistry, IntoTaskHandlerResult, JobComplete, JobCompleteContext, JobFail,
24+
JobFailContext, JobPermanentlyFail, JobPermanentlyFailContext, JobStart, JobStartContext, Plugin, TaskHandler,
25+
WorkerConfig, WorkerContext, WorkerRunner, WorkerShutdown, WorkerShutdownContext, WorkerStart, WorkerStartContext,
26+
enqueue_fast,
2627
};
2728
use serde::{Deserialize, Serialize};
2829
use tokio_util::sync::CancellationToken;
@@ -37,95 +38,97 @@ use tokio_util::sync::CancellationToken;
3738
#[derive(Clone)]
3839
struct MetricsPlugin;
3940

40-
impl LifecycleHooks for MetricsPlugin {
41-
async fn on_worker_start(&self, ctx: WorkerStartContext) {
42-
metrics::gauge!("backfill_worker_active").set(1.0);
43-
log::info!("Worker {} started - metrics recording enabled", ctx.worker_id);
44-
}
45-
46-
async fn on_worker_shutdown(&self, ctx: WorkerShutdownContext) {
47-
metrics::gauge!("backfill_worker_active").set(0.0);
48-
log::info!("Worker {} shutdown (reason: {:?})", ctx.worker_id, ctx.reason);
49-
}
50-
51-
async fn on_job_start(&self, ctx: JobStartContext) {
52-
let task = ctx.job.task_identifier();
53-
let attempt = ctx.job.attempts();
54-
55-
metrics::counter!("jobs_started", "task" => task.clone()).increment(1);
56-
57-
log::debug!("Job started: {} (attempt {})", task, attempt);
58-
59-
// Track wait time (time from creation to start)
60-
let created_at = ctx.job.created_at();
61-
let wait_time = chrono::Utc::now().signed_duration_since(*created_at).num_milliseconds() as f64 / 1000.0;
62-
63-
metrics::histogram!("job_wait_time_seconds", "task" => task.clone()).record(wait_time);
64-
}
65-
66-
async fn on_job_complete(&self, ctx: JobCompleteContext) {
67-
let task = ctx.job.task_identifier();
68-
let attempt = ctx.job.attempts();
69-
let duration = ctx.duration.as_secs_f64();
70-
71-
metrics::counter!("jobs_completed", "task" => task.clone(), "attempt" => attempt.to_string()).increment(1);
72-
73-
metrics::histogram!("job_duration_seconds", "task" => task.clone(), "status" => "success").record(duration);
74-
75-
log::info!(
76-
"Job completed: {} (attempt {}, duration: {:.2}s)",
77-
task,
78-
attempt,
79-
duration
80-
);
81-
}
82-
83-
async fn on_job_fail(&self, ctx: JobFailContext) {
84-
let task = ctx.job.task_identifier();
85-
let attempt = ctx.job.attempts();
86-
let will_retry = ctx.will_retry;
87-
88-
// Use will_retry to distinguish between transient failures and final failures
89-
let status = if will_retry { "retrying" } else { "failed" };
90-
91-
metrics::counter!(
92-
"jobs_failed",
93-
"task" => task.clone(),
94-
"attempt" => attempt.to_string(),
95-
"will_retry" => status
96-
)
97-
.increment(1);
98-
99-
// Classify the error type for more detailed metrics
100-
let error_type = classify_error(&ctx.error);
101-
metrics::counter!(
102-
"job_errors_by_type",
103-
"task" => task.clone(),
104-
"error_type" => error_type
105-
)
106-
.increment(1);
107-
108-
log::warn!(
109-
"Job failed: {} (attempt {}, will_retry: {}, error: {})",
110-
task,
111-
attempt,
112-
will_retry,
113-
ctx.error
114-
);
115-
}
116-
117-
async fn on_job_permanently_fail(&self, ctx: JobPermanentlyFailContext) {
118-
let task = ctx.job.task_identifier();
119-
let final_attempt = ctx.job.attempts();
120-
121-
metrics::counter!("jobs_permanently_failed", "task" => task.clone()).increment(1);
122-
123-
log::error!(
124-
"Job permanently failed: {} (final attempt: {}, error: {})",
125-
task,
126-
final_attempt,
127-
ctx.error
128-
);
41+
impl Plugin for MetricsPlugin {
42+
fn register(self, hooks: &mut HookRegistry) {
43+
hooks.on(WorkerStart, |ctx: WorkerStartContext| async move {
44+
metrics::gauge!("backfill_worker_active").set(1.0);
45+
log::info!("Worker {} started - metrics recording enabled", ctx.worker_id);
46+
});
47+
48+
hooks.on(WorkerShutdown, |ctx: WorkerShutdownContext| async move {
49+
metrics::gauge!("backfill_worker_active").set(0.0);
50+
log::info!("Worker {} shutdown (reason: {:?})", ctx.worker_id, ctx.reason);
51+
});
52+
53+
hooks.on(JobStart, |ctx: JobStartContext| async move {
54+
let task = ctx.job.task_identifier();
55+
let attempt = ctx.job.attempts();
56+
57+
metrics::counter!("jobs_started", "task" => task.clone()).increment(1);
58+
59+
log::debug!("Job started: {} (attempt {})", task, attempt);
60+
61+
// Track wait time (time from creation to start)
62+
let created_at = ctx.job.created_at();
63+
let wait_time = chrono::Utc::now().signed_duration_since(*created_at).num_milliseconds() as f64 / 1000.0;
64+
65+
metrics::histogram!("job_wait_time_seconds", "task" => task.clone()).record(wait_time);
66+
});
67+
68+
hooks.on(JobComplete, |ctx: JobCompleteContext| async move {
69+
let task = ctx.job.task_identifier();
70+
let attempt = ctx.job.attempts();
71+
let duration = ctx.duration.as_secs_f64();
72+
73+
metrics::counter!("jobs_completed", "task" => task.clone(), "attempt" => attempt.to_string()).increment(1);
74+
75+
metrics::histogram!("job_duration_seconds", "task" => task.clone(), "status" => "success").record(duration);
76+
77+
log::info!(
78+
"Job completed: {} (attempt {}, duration: {:.2}s)",
79+
task,
80+
attempt,
81+
duration
82+
);
83+
});
84+
85+
hooks.on(JobFail, |ctx: JobFailContext| async move {
86+
let task = ctx.job.task_identifier();
87+
let attempt = ctx.job.attempts();
88+
let will_retry = ctx.will_retry;
89+
90+
// Use will_retry to distinguish between transient failures and final failures
91+
let status = if will_retry { "retrying" } else { "failed" };
92+
93+
metrics::counter!(
94+
"jobs_failed",
95+
"task" => task.clone(),
96+
"attempt" => attempt.to_string(),
97+
"will_retry" => status
98+
)
99+
.increment(1);
100+
101+
// Classify the error type for more detailed metrics
102+
let error_type = classify_error(&ctx.error);
103+
metrics::counter!(
104+
"job_errors_by_type",
105+
"task" => task.clone(),
106+
"error_type" => error_type
107+
)
108+
.increment(1);
109+
110+
log::warn!(
111+
"Job failed: {} (attempt {}, will_retry: {}, error: {})",
112+
task,
113+
attempt,
114+
will_retry,
115+
ctx.error
116+
);
117+
});
118+
119+
hooks.on(JobPermanentlyFail, |ctx: JobPermanentlyFailContext| async move {
120+
let task = ctx.job.task_identifier();
121+
let final_attempt = ctx.job.attempts();
122+
123+
metrics::counter!("jobs_permanently_failed", "task" => task.clone()).increment(1);
124+
125+
log::error!(
126+
"Job permanently failed: {} (final attempt: {}, error: {})",
127+
task,
128+
final_attempt,
129+
ctx.error
130+
);
131+
});
129132
}
130133
}
131134

src/dlq_cleanup_plugin.rs

Lines changed: 31 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
//! The cleanup is based on `job_key` - if a successful job has a `job_key`, any
99
//! DLQ entries with that same `job_key` are deleted.
1010
11-
use graphile_worker::{JobCompleteContext, LifecycleHooks};
11+
use graphile_worker::{HookRegistry, JobComplete, JobCompleteContext, Plugin};
1212

1313
use crate::BackfillClient;
1414

@@ -29,31 +29,37 @@ impl DlqCleanupPlugin {
2929
}
3030
}
3131

32-
impl LifecycleHooks for DlqCleanupPlugin {
33-
async fn on_job_complete(&self, ctx: JobCompleteContext) {
34-
// Only clean up if the job has a job_key
35-
if let Some(job_key) = ctx.job.key() {
36-
match self.client.delete_dlq_by_job_key(job_key).await {
37-
Ok(deleted) if deleted > 0 => {
38-
log::debug!(
39-
"Cleaned up DLQ entry after successful job (job_key: {}, task: {})",
40-
job_key,
41-
ctx.job.task_identifier()
42-
);
43-
}
44-
Ok(_) => {
45-
// No DLQ entry to clean up, this is normal for jobs that
46-
// never failed
47-
}
48-
Err(e) => {
49-
// Log but don't fail - this is a best-effort cleanup
50-
log::warn!(
51-
"Failed to clean up DLQ entry after job success (job_key: {}, error: {})",
52-
job_key,
53-
e
54-
);
32+
impl Plugin for DlqCleanupPlugin {
33+
fn register(self, hooks: &mut HookRegistry) {
34+
let client = self.client;
35+
hooks.on(JobComplete, move |ctx: JobCompleteContext| {
36+
let client = client.clone();
37+
async move {
38+
// Only clean up if the job has a job_key
39+
if let Some(job_key) = ctx.job.key() {
40+
match client.delete_dlq_by_job_key(job_key).await {
41+
Ok(deleted) if deleted > 0 => {
42+
log::debug!(
43+
"Cleaned up DLQ entry after successful job (job_key: {}, task: {})",
44+
job_key,
45+
ctx.job.task_identifier()
46+
);
47+
}
48+
Ok(_) => {
49+
// No DLQ entry to clean up, this is normal for jobs
50+
// that never failed
51+
}
52+
Err(e) => {
53+
// Log but don't fail - this is a best-effort cleanup
54+
log::warn!(
55+
"Failed to clean up DLQ entry after job success (job_key: {}, error: {})",
56+
job_key,
57+
e
58+
);
59+
}
60+
}
5561
}
5662
}
57-
}
63+
});
5864
}
5965
}

src/lib.rs

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -60,29 +60,45 @@
6060
//! ```
6161
6262
use chrono::{DateTime, Utc};
63-
// Lifecycle hooks for plugins
63+
// Lifecycle hooks for plugins - new Plugin API with event registration
6464
pub use graphile_worker::{
65+
// Event types (for hooks.on() registration)
66+
AfterJobRun,
67+
// Context types (for hook handlers)
6568
AfterJobRunContext,
69+
BeforeJobRun,
6670
BeforeJobRunContext,
71+
BeforeJobSchedule,
6772
BeforeJobScheduleContext,
73+
CronJobScheduled,
6874
CronJobScheduledContext,
75+
CronTick,
6976
CronTickContext,
77+
// Plugin trait and registry
78+
HookRegistry,
7079
// Result types
7180
HookResult,
7281
// Job type for accessing job data
7382
Job,
83+
JobComplete,
7484
JobCompleteContext,
85+
JobFail,
7586
JobFailContext,
87+
JobFetch,
7688
JobFetchContext,
89+
JobPermanentlyFail,
7790
JobPermanentlyFailContext,
7891
JobScheduleResult,
92+
JobStart,
7993
JobStartContext,
80-
// Trait
81-
LifecycleHooks,
94+
Plugin,
95+
// Other
8296
ShutdownReason,
83-
// Context types
97+
WorkerInit,
8498
WorkerInitContext,
99+
WorkerShutdown,
85100
WorkerShutdownContext,
101+
WorkerStart,
86102
WorkerStartContext,
87103
};
88104
// Re-export commonly used types from graphile_worker

src/worker.rs

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ use std::time::Duration;
9898
use tokio::task::JoinHandle;
9999
use tokio_util::sync::CancellationToken;
100100

101-
use crate::{BackfillClient, BackfillError, LifecycleHooks, TaskHandler, WorkerOptions};
101+
use crate::{BackfillClient, BackfillError, Plugin, TaskHandler, WorkerOptions};
102102

103103
// Re-export for use in wrapper
104104
/// Configuration for a worker queue
@@ -604,20 +604,28 @@ impl WorkerRunnerBuilder {
604604
/// Multiple plugins can be registered and will be called in registration
605605
/// order.
606606
///
607+
/// # Plugin API
608+
/// Plugins implement the `Plugin` trait which has a single `register`
609+
/// method that receives a `HookRegistry`. Use `hooks.on(EventType,
610+
/// handler)` to register handlers for specific events.
611+
///
607612
/// # Requirements
608613
/// The plugin must implement `Clone` to support worker cloning for
609-
/// background tasks.
614+
/// background tasks. Any state shared between handlers must be wrapped
615+
/// in `Arc`.
610616
///
611617
/// # Example
612618
/// ```rust,no_run
613-
/// use backfill::{WorkerRunner, WorkerConfig, LifecycleHooks, JobCompleteContext};
619+
/// use backfill::{WorkerRunner, WorkerConfig, Plugin, HookRegistry, JobComplete, JobCompleteContext};
614620
///
615621
/// #[derive(Clone)]
616622
/// struct MyPlugin;
617623
///
618-
/// impl LifecycleHooks for MyPlugin {
619-
/// async fn on_job_complete(&self, ctx: JobCompleteContext) {
620-
/// println!("Job {} completed in {:?}", ctx.job.task_identifier(), ctx.duration);
624+
/// impl Plugin for MyPlugin {
625+
/// fn register(self, hooks: &mut HookRegistry) {
626+
/// hooks.on(JobComplete, |ctx: JobCompleteContext| async move {
627+
/// println!("Job {} completed in {:?}", ctx.job.task_identifier(), ctx.duration);
628+
/// });
621629
/// }
622630
/// }
623631
///
@@ -629,7 +637,7 @@ impl WorkerRunnerBuilder {
629637
/// # Ok(())
630638
/// # }
631639
/// ```
632-
pub fn add_plugin<H: LifecycleHooks + Clone + 'static>(mut self, plugin: H) -> Self {
640+
pub fn add_plugin<H: Plugin + Clone + 'static>(mut self, plugin: H) -> Self {
633641
// Store a closure that captures the plugin and applies it to WorkerOptions
634642
let applier = Arc::new(move |opts: WorkerOptions| opts.add_plugin(plugin.clone()));
635643
self.plugin_appliers.push(applier);

tests/integration_tests_clean.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ struct TestJob {
1313

1414
/// Get a test database URL from environment or use default
1515
fn get_test_database_url() -> String {
16-
std::env::var("DATABASE_URL").unwrap_or_else(|_| "postgresql://ceej@localhost:5432/backfill_test".to_string())
16+
std::env::var("DATABASE_URL").unwrap_or_else(|_| "postgresql://localhost:5432/backfill_test".to_string())
1717
}
1818

1919
/// One-time setup of test database

0 commit comments

Comments
 (0)