Skip to content

Commit 6e6f4ab

Browse files
committed
feat: Expose the cron features of graphile_worker
You can call worker.define_job<T>.add_cron_schedule() to add a cron task. This is a very thin wrapper over what's available in the graphile worker crate. We also re-expose the types from its crontab parser crate in case those should come in handy.
1 parent 0e19faf commit 6e6f4ab

7 files changed

Lines changed: 251 additions & 52 deletions

File tree

AGENTS.md

Lines changed: 0 additions & 37 deletions
This file was deleted.

Cargo.lock

Lines changed: 14 additions & 13 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ axum = ["dep:axum"]
1717
[dependencies]
1818
chrono = { version = "0.4", features = ["serde"] }
1919
graphile_worker = "0.8"
20+
graphile_worker_crontab_parser = "0.5"
2021
log = "0.4.28"
2122
metrics = "0.24"
2223
serde = { version = "1.0", features = ["derive"] }

src/errors.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ pub enum BackfillError {
3535
#[error("Worker runtime error: {0}")]
3636
WorkerRuntime(String),
3737
#[error(transparent)]
38+
CrontabParse(#[from] graphile_worker_crontab_parser::CrontabParseError),
39+
#[error(transparent)]
3840
SqlxError(#[from] sqlx::Error),
3941
#[error(transparent)]
4042
JsonError(#[from] serde_json::Error),

src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ use chrono::{DateTime, Utc};
6363
// Re-export commonly used types from graphile_worker
6464
pub use graphile_worker::{IntoTaskHandlerResult, JobKeyMode, TaskHandler, WorkerContext, WorkerOptions};
6565
use graphile_worker::{Job, JobSpec as GraphileJobSpec, JobSpecBuilder};
66+
// Re-export crontab parsing types for advanced usage
67+
pub use graphile_worker_crontab_parser::{CrontabParseError, parse_crontab};
6668
use serde::Serialize;
6769

6870
mod client;

src/worker.rs

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,7 @@ pub struct WorkerOptionsBuilder {
213213
pub(crate) concurrency: usize,
214214
pub(crate) queue_name: Option<String>,
215215
pub(crate) job_handlers: Vec<JobHandlerConfig>,
216+
pub(crate) crontabs: Vec<String>,
216217
}
217218

218219
/// Configuration for a job handler that can be recreated
@@ -251,6 +252,7 @@ impl WorkerOptionsBuilder {
251252
concurrency,
252253
queue_name,
253254
job_handlers: Vec::new(),
255+
crontabs: Vec::new(),
254256
})
255257
}
256258

@@ -278,6 +280,55 @@ impl WorkerOptionsBuilder {
278280
self.queue_name = queue_name;
279281
self
280282
}
283+
284+
/// Add a cron schedule for periodic job execution
285+
///
286+
/// # Syntax
287+
/// The crontab format is: `<timer> <task_identifier> ?<options>
288+
/// {<payload>}`
289+
///
290+
/// - **Timer**: Standard 5-field cron syntax (minute hour day month
291+
/// day-of-week)
292+
/// - **Task**: Must match a registered job handler's IDENTIFIER
293+
/// - **Options**: Query string format for job configuration
294+
/// - `fill`: Backfill period (e.g., `?fill=10m` to execute missed runs
295+
/// within 10 minutes)
296+
/// - `job_key`: Unique identifier for deduplication
297+
/// - `job_key_mode`: How to handle duplicates (`replace`,
298+
/// `preserve_run_at`, etc.)
299+
/// - `priority`: Job priority (default: 0)
300+
/// - `max`: Maximum attempts (default: 25)
301+
/// - `queue`: Queue name (default: task identifier)
302+
/// - **Payload**: Optional JSON object to pass to the job handler
303+
///
304+
/// # Examples
305+
/// ```rust,no_run
306+
/// # use backfill::{WorkerRunner, WorkerConfig};
307+
/// # async fn example() -> Result<(), backfill::BackfillError> {
308+
/// # let config = WorkerConfig::default();
309+
/// WorkerRunner::builder(config).await?
310+
/// // Every 5 minutes
311+
/// .add_cron_schedule("*/5 * * * * cleanup_task")?
312+
/// // Daily at 2:00 AM with backfill
313+
/// .add_cron_schedule("0 2 * * * backup_task ?fill=1h")?
314+
/// // With payload
315+
/// .add_cron_schedule(r#"0 * * * * report_task {\"format\":\"pdf\"}"#)?
316+
/// .build().await?;
317+
/// # Ok(())
318+
/// # }
319+
/// ```
320+
///
321+
/// # Errors
322+
/// Returns `BackfillError::CrontabParse` if the cron syntax is invalid.
323+
pub fn add_cron_schedule(mut self, spec: &str) -> Result<Self, crate::BackfillError> {
324+
// Validate the crontab spec by attempting to parse it
325+
use graphile_worker_crontab_parser::parse_crontab;
326+
parse_crontab(spec)?;
327+
328+
// Store the validated spec
329+
self.crontabs.push(spec.to_string());
330+
Ok(self)
331+
}
281332
}
282333

283334
impl From<WorkerOptionsBuilder> for WorkerOptions {
@@ -299,6 +350,13 @@ impl From<WorkerOptionsBuilder> for WorkerOptions {
299350
worker_options = (handler_config.builder_fn)(worker_options);
300351
}
301352

353+
// Add all cron schedules
354+
for crontab_spec in builder.crontabs {
355+
worker_options = worker_options
356+
.with_crontab(&crontab_spec)
357+
.expect("Crontab already validated in add_cron_schedule");
358+
}
359+
302360
worker_options
303361
}
304362
}
@@ -381,6 +439,44 @@ impl WorkerRunnerBuilder {
381439
self
382440
}
383441

442+
/// Add a cron schedule for periodic job execution
443+
///
444+
/// Schedules a task to run automatically at specified intervals using cron
445+
/// syntax. The task must be registered with `define_job()` before
446+
/// adding a cron schedule.
447+
///
448+
/// # Syntax
449+
/// `<timer> <task_identifier> ?<options> {<payload>}`
450+
///
451+
/// See [`WorkerOptionsBuilder::add_cron_schedule`] for detailed syntax
452+
/// documentation.
453+
///
454+
/// # Examples
455+
/// ```rust,no_run
456+
/// # use backfill::{WorkerRunner, WorkerConfig, TaskHandler, WorkerContext, IntoTaskHandlerResult};
457+
/// # #[derive(Clone)]
458+
/// # struct CleanupTask;
459+
/// # impl TaskHandler for CleanupTask {
460+
/// # const IDENTIFIER: &'static str = "cleanup";
461+
/// # async fn run(self, ctx: WorkerContext) -> impl IntoTaskHandlerResult { Ok(()) }
462+
/// # }
463+
/// # async fn example() -> Result<(), backfill::BackfillError> {
464+
/// # let config = WorkerConfig::default();
465+
/// let worker = WorkerRunner::builder(config).await?
466+
/// .define_job::<CleanupTask>()
467+
/// .add_cron_schedule("*/5 * * * * cleanup")? // Every 5 minutes
468+
/// .build().await?;
469+
/// # Ok(())
470+
/// # }
471+
/// ```
472+
///
473+
/// # Errors
474+
/// Returns `BackfillError::CrontabParse` if the cron syntax is invalid.
475+
pub fn add_cron_schedule(mut self, spec: &str) -> Result<Self, BackfillError> {
476+
self.worker_options_builder = self.worker_options_builder.add_cron_schedule(spec)?;
477+
Ok(self)
478+
}
479+
384480
/// Build the final WorkerRunner
385481
pub async fn build(self) -> Result<WorkerRunner, BackfillError> {
386482
WorkerRunner::from_builder(self.config, self.worker_options_builder).await

0 commit comments

Comments
 (0)