Skip to content

Commit a09018e

Browse files
committed
feat(scheduler): add cron/interval scheduling with persistent schedules
1 parent c7d445e commit a09018e

29 files changed

+2441
-1244
lines changed

.beads/interactions.jsonl

Whitespace-only changes.

.beads/metadata.json

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
{
2+
"database": "beads.db",
3+
"jsonl_export": "issues.jsonl"
4+
}

README.md

Lines changed: 79 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ npm install @boringnode/queue
2929
- **Priority Queues**: Process high-priority jobs first
3030
- **Retry with Backoff**: Automatic retries with exponential, linear, or fixed backoff strategies
3131
- **Job Timeout**: Automatically fail or retry jobs that exceed a time limit
32-
- **Repeating Jobs**: Schedule jobs to repeat at fixed intervals
32+
- **Scheduled Jobs**: Cron-based or interval-based job scheduling with pause/resume support
3333

3434
## Quick Start
3535

@@ -355,18 +355,15 @@ export default class MyJob extends Job<Payload> {
355355

356356
### Context Properties
357357

358-
| Property | Type | Description |
359-
|-------------------|---------------------|---------------------------------------------------|
360-
| `jobId` | string | Unique identifier for this job |
361-
| `name` | string | Job class name |
362-
| `attempt` | number | Current attempt number (1-based) |
363-
| `queue` | string | Queue name this job is being processed from |
364-
| `priority` | number | Job priority (lower = higher priority) |
365-
| `acquiredAt` | Date | When this job was acquired by the worker |
366-
| `stalledCount` | number | Times this job was recovered from stalled state |
367-
| `isRepeating` | boolean | Whether this job is configured to repeat |
368-
| `repeatRemaining` | number \| undefined | Remaining repetitions (undefined = infinite) |
369-
| `repeatId` | string \| undefined | Unique ID for the repeat chain (for cancellation) |
358+
| Property | Type | Description |
359+
|----------------|--------|-------------------------------------------------|
360+
| `jobId` | string | Unique identifier for this job |
361+
| `name` | string | Job class name |
362+
| `attempt` | number | Current attempt number (1-based) |
363+
| `queue` | string | Queue name this job is being processed from |
364+
| `priority` | number | Job priority (lower = higher priority) |
365+
| `acquiredAt` | Date | When this job was acquired by the worker |
366+
| `stalledCount` | number | Times this job was recovered from stalled state |
370367

371368
## Dependency Injection
372369

@@ -417,87 +414,96 @@ export default class SendEmailJob extends Job<SendEmailPayload> {
417414

418415
Without a `jobFactory`, jobs are instantiated with `new JobClass(payload, context)`.
419416

420-
## Repeating Jobs
417+
## Scheduled Jobs
421418

422-
Schedule jobs to repeat automatically at fixed intervals:
419+
Schedule jobs to run on a recurring basis using cron expressions or fixed intervals. Schedules are persisted and survive worker restarts.
423420

424-
```typescript
425-
// Repeat every 5 seconds indefinitely
426-
await SyncJob.dispatch({ source: 'api' }).every('5s')
427-
428-
// Repeat every hour, 10 times total
429-
await CleanupJob.dispatch({ days: 30 }).every('1h').times(10)
421+
### Creating a Schedule
430422

431-
// Combine with delay (start after 30 seconds, then repeat every minute)
432-
await ReportJob.dispatch({ type: 'daily' }).in('30s').every('1m')
423+
```typescript
424+
import { Schedule } from '@boringnode/queue'
425+
426+
// Run every 10 seconds (uses job name as schedule ID by default)
427+
const { scheduleId } = await MetricsJob.schedule({ endpoint: '/api/health' }).every('10s').run()
428+
429+
// Run on a cron schedule with custom ID
430+
await CleanupJob.schedule({ days: 30 })
431+
.id('daily-cleanup') // Custom ID (optional, defaults to job name)
432+
.cron('0 * * * *') // Every hour at minute 0
433+
.timezone('Europe/Paris') // Optional timezone (default: UTC)
434+
.run()
435+
436+
// Schedule with constraints
437+
await ReportJob.schedule({ type: 'weekly' })
438+
.id('weekly-report')
439+
.cron('0 9 * * MON') // Every Monday at 9am
440+
.from(new Date('2024-01-01')) // Start date
441+
.to(new Date('2024-12-31')) // End date
442+
.limit(52) // Maximum 52 runs
443+
.run()
433444
```
434445

435-
### Cancelling a Repeating Job
436-
437-
When dispatching a repeating job, you receive a `repeatId` that can be used to cancel the entire repeat chain from anywhere:
446+
### Managing Schedules
438447

439448
```typescript
440-
import { QueueManager } from '@boringnode/queue'
449+
import { Schedule } from '@boringnode/queue'
441450

442-
// Dispatch returns jobId and repeatId
443-
const { jobId, repeatId } = await SyncJob.dispatch({ source: 'api' }).every('5s')
451+
// Find a schedule by ID
452+
const schedule = await Schedule.find('health-check')
444453

445-
console.log(`Started repeating job ${jobId} with repeat chain ${repeatId}`)
454+
if (schedule) {
455+
console.log(`Status: ${schedule.status}`) // 'active' or 'paused'
456+
console.log(`Run count: ${schedule.runCount}`)
457+
console.log(`Next run: ${schedule.nextRunAt}`)
458+
console.log(`Last run: ${schedule.lastRunAt}`)
446459

447-
// Later, cancel the repeat chain from anywhere
448-
if (repeatId) {
449-
await QueueManager.cancelRepeat(repeatId)
450-
}
451-
```
452-
453-
The `repeatId` is also available inside the job via `this.context.repeatId`.
454-
455-
### Stopping from Within the Job
460+
// Pause the schedule
461+
await schedule.pause()
456462

457-
A job can stop its own repetition by calling `this.stopRepeating()`:
463+
// Resume the schedule
464+
await schedule.resume()
458465

459-
```typescript
460-
import { Job } from '@boringnode/queue'
461-
import type { JobContext } from '@boringnode/queue/types'
466+
// Trigger an immediate run (outside of the normal schedule)
467+
await schedule.trigger()
462468

463-
export default class SyncJob extends Job<SyncPayload> {
464-
static readonly jobName = 'SyncJob'
465-
466-
async execute(): Promise<void> {
467-
const result = await this.syncData()
468-
469-
// Stop repeating when sync is complete
470-
if (result.isComplete) {
471-
this.stopRepeating()
472-
}
473-
}
469+
// Delete the schedule
470+
await schedule.delete()
474471
}
475472
```
476473

477-
### Repeat Context
478-
479-
Jobs have access to repeat information via `this.context`:
474+
### Listing Schedules
480475

481476
```typescript
482-
async execute(): Promise<void> {
483-
if (this.context.isRepeating) {
484-
console.log(`Repeating job, ${this.context.repeatRemaining ?? 'infinite'} runs remaining`)
485-
}
486-
}
487-
```
477+
import { Schedule } from '@boringnode/queue'
488478

489-
| Property | Type | Description |
490-
|-------------------|---------------------|---------------------------------------------------|
491-
| `isRepeating` | boolean | Whether this job is configured to repeat |
492-
| `repeatRemaining` | number \| undefined | Remaining repetitions (undefined = infinite) |
493-
| `repeatId` | string \| undefined | Unique ID for the repeat chain (for cancellation) |
479+
// List all schedules
480+
const all = await Schedule.list()
494481

495-
### How Repeating Works
482+
// Filter by status
483+
const active = await Schedule.list({ status: 'active' })
484+
const paused = await Schedule.list({ status: 'paused' })
485+
```
496486

497-
- Each repeat creates a **new job** with a new ID
498-
- The payload is **preserved** across repeats
499-
- Failed jobs do **not** repeat (only successful completions trigger the next run)
500-
- The repeat interval is the delay **between** job completions
487+
### Schedule Options
488+
489+
| Method | Description |
490+
|----------------------|-------------------------------------------------|
491+
| `.id(string)` | Unique identifier (defaults to job name) |
492+
| `.every(duration)` | Run at fixed intervals ('5s', '1m', '1h', '1d') |
493+
| `.cron(expression)` | Run on a cron schedule |
494+
| `.timezone(tz)` | Timezone for cron expressions (default: 'UTC') |
495+
| `.from(date)` | Don't run before this date |
496+
| `.to(date)` | Don't run after this date |
497+
| `.between(from, to)` | Shorthand for `.from().to()` |
498+
| `.limit(n)` | Maximum number of runs |
499+
500+
### How Scheduling Works
501+
502+
- Schedules are **persisted** in the database (via the adapter)
503+
- The **Worker** polls for due schedules and dispatches jobs automatically
504+
- Each schedule run creates a **new job** with a unique ID
505+
- Multiple workers can run concurrently - only one will claim each due schedule
506+
- Failed jobs do **not** affect the schedule (the next run will still occur)
501507

502508
## Job Discovery
503509

examples/app.ts

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { config } from './config.js'
22
import { QueueManager } from '../src/queue_manager.js'
3+
import { Schedule } from '../src/schedule.js'
34
import SendEmailJob from './jobs/send_email_job.js'
45
import SyncJob from './jobs/sync_job.js'
56
import MetricsJob from './jobs/metrics_job.js'
@@ -14,13 +15,25 @@ for (let i = 0; i < 10; i++) {
1415
await SendEmailJob.dispatch({ to: 'romain.lanz@pm.me' + i })
1516
}
1617

17-
// Example: Dispatch a repeating job and get the repeatId for later cancellation
18-
const { jobId, repeatId } = await MetricsJob.dispatch({ endpoint: '/api/health' }).every('10s')
18+
// Example: Schedule a recurring metrics job every 10 seconds
19+
// By default, the schedule ID is the job name ('MetricsJob')
20+
const { scheduleId } = await MetricsJob.schedule({ endpoint: '/api/health' }).every('10s').run()
1921

20-
console.log(`Started metrics collection job ${jobId}`)
21-
console.log(`To cancel this repeating job, use: await QueueManager.cancelRepeat('${repeatId}')`)
22+
console.log(`Created schedule: ${scheduleId}`) // 'MetricsJob'
2223

23-
// Example: Cancel a repeating job after some condition
24-
// await QueueManager.cancelRepeat(repeatId)
24+
// Manage the schedule using its ID
25+
const schedule = await Schedule.find('MetricsJob')
26+
if (schedule) {
27+
console.log(`Schedule status: ${schedule.status}, run count: ${schedule.runCount}`)
28+
29+
// Pause, resume, or delete
30+
// await schedule.pause()
31+
// await schedule.resume()
32+
// await schedule.delete()
33+
}
34+
35+
// List all active schedules
36+
const activeSchedules = await Schedule.list({ status: 'active' })
37+
console.log(`Active schedules: ${activeSchedules.length}`)
2538

2639
await QueueManager.destroy()

examples/jobs/metrics_job.ts

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,15 @@ interface MetricsJobPayload {
66
}
77

88
/**
9-
* Example job that collects metrics at regular intervals.
10-
* Demonstrates repeating jobs with external cancellation.
9+
* Example job that collects metrics.
10+
* For scheduled/repeating execution, use the Schedule API:
11+
*
12+
* ```typescript
13+
* await MetricsJob.schedule({ endpoint: '/api/health' })
14+
* .id('health-check')
15+
* .every('10s')
16+
* .run()
17+
* ```
1118
*/
1219
export default class MetricsJob extends Job<MetricsJobPayload> {
1320
static readonly jobName = 'MetricsJob'
@@ -17,13 +24,7 @@ export default class MetricsJob extends Job<MetricsJobPayload> {
1724
}
1825

1926
async execute(): Promise<void> {
20-
const repeatInfo = this.context.isRepeating
21-
? ` (repeat ${this.context.repeatRemaining ?? '∞'} remaining, repeatId: ${this.context.repeatId})`
22-
: ''
23-
24-
console.log(
25-
`[Job ${this.context.jobId}] Collecting metrics from ${this.payload.endpoint}${repeatInfo}`
26-
)
27+
console.log(`[Job ${this.context.jobId}] Collecting metrics from ${this.payload.endpoint}`)
2728

2829
// Simulate metrics collection
2930
const metrics = {

index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ export { Job } from './src/job.js'
22
export { Worker } from './src/worker.js'
33
export { QueueManager } from './src/queue_manager.js'
44
export { Locator } from './src/locator.js'
5+
export { Schedule } from './src/schedule.js'
6+
export { ScheduleBuilder } from './src/schedule_builder.js'
57
export {
68
customBackoff,
79
linearBackoff,

package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,8 @@
2828
},
2929
"dependencies": {
3030
"@lukeed/ms": "^2.0.2",
31-
"@poppinss/utils": "^6.10.1"
31+
"@poppinss/utils": "^6.10.1",
32+
"cron-parser": "^5.4.0"
3233
},
3334
"devDependencies": {
3435
"@adonisjs/eslint-config": "^2.1.2",

src/contracts/adapter.ts

Lines changed: 52 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import type { JobData } from '../types/main.js'
1+
import type { JobData, ScheduleConfig, ScheduleData, ScheduleListOptions } from '../types/main.js'
22

33
/**
44
* A job that has been acquired by a worker for processing.
@@ -151,20 +151,62 @@ export interface Adapter {
151151
destroy(): Promise<void>
152152

153153
/**
154-
* Cancel a repeating job chain.
154+
* Create or update a schedule.
155155
*
156-
* After calling this, `isRepeatCancelled` will return true for this groupId,
157-
* and the worker will not re-dispatch jobs with this groupId.
156+
* If a schedule with the given id exists, it will be updated (upsert).
157+
* Otherwise, a new schedule is created.
158158
*
159-
* @param groupId - The repeat chain identifier (from RepeatConfig.groupId)
159+
* @param config - The schedule configuration
160+
* @returns The schedule ID
160161
*/
161-
cancelRepeat(groupId: string): Promise<void>
162+
createSchedule(config: ScheduleConfig): Promise<string>
162163

163164
/**
164-
* Check if a repeat chain has been cancelled.
165+
* Get a schedule by ID.
165166
*
166-
* @param groupId - The repeat chain identifier to check
167-
* @returns True if the repeat chain has been cancelled
167+
* @param id - The schedule ID
168+
* @returns The schedule data, or null if not found
168169
*/
169-
isRepeatCancelled(groupId: string): Promise<boolean>
170+
getSchedule(id: string): Promise<ScheduleData | null>
171+
172+
/**
173+
* List all schedules matching the given options.
174+
*
175+
* @param options - Optional filters for listing
176+
* @returns Array of schedule data
177+
*/
178+
listSchedules(options?: ScheduleListOptions): Promise<ScheduleData[]>
179+
180+
/**
181+
* Update a schedule's status or run metadata.
182+
*
183+
* @param id - The schedule ID
184+
* @param updates - The fields to update
185+
*/
186+
updateSchedule(
187+
id: string,
188+
updates: Partial<Pick<ScheduleData, 'status' | 'nextRunAt' | 'lastRunAt' | 'runCount'>>
189+
): Promise<void>
190+
191+
/**
192+
* Delete a schedule permanently.
193+
*
194+
* @param id - The schedule ID to delete
195+
*/
196+
deleteSchedule(id: string): Promise<void>
197+
198+
/**
199+
* Atomically claim a due schedule for execution.
200+
*
201+
* This method:
202+
* 1. Finds ONE schedule where nextRunAt <= now AND status = 'active'
203+
* 2. Calculates and updates its nextRunAt to the next occurrence
204+
* 3. Increments runCount and sets lastRunAt
205+
* 4. Returns the schedule data for job dispatching
206+
*
207+
* The atomic nature prevents multiple workers from claiming the same schedule.
208+
*
209+
* @returns The claimed schedule, or null if no schedules are due
210+
*/
211+
claimDueSchedule(): Promise<ScheduleData | null>
170212
}

0 commit comments

Comments
 (0)