
Commit 1db9f85

feat: add job history and retention

1 parent 839f42e

File tree

12 files changed: +1589 -124 lines changed

.changelog/redis-knex-migration.md

Lines changed: 163 additions & 0 deletions
# Redis + Knex migration notes

> **Breaking Change**: This release introduces a new storage layout for both the Redis and Knex adapters. Existing data is incompatible and requires migration.

## New features

### Job retention

Jobs can now be kept in history after completion or failure using the `removeOnComplete` and `removeOnFail` options:

```typescript
// Global configuration
await QueueManager.init({
  // ...
  defaultJobOptions: {
    removeOnComplete: false, // Keep all completed jobs
    removeOnFail: { count: 100 }, // Keep last 100 failed jobs
  },
})

// Per-queue configuration
await QueueManager.init({
  // ...
  queues: {
    critical: {
      defaultJobOptions: {
        removeOnFail: { age: '7d', count: 1000 }, // Keep for 7 days, max 1000
      },
    },
  },
})

// Per-job configuration (via Job class)
class MyJob extends Job {
  static options = {
    removeOnComplete: { count: 50 },
  }
}
```

Retention options:

- `true` (default): remove the job immediately after completion/failure
- `false`: keep the job in history indefinitely
- `{ age?: Duration, count?: number }`: keep the job, pruning history by age and/or count
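The three option shapes all reduce to a single pruning rule. A minimal sketch of how an adapter might normalize them (`normalizeRetention` and `parseDuration` are hypothetical helpers for illustration, not part of the public API; the real `Duration` type may accept more formats than shown):

```typescript
// Shape of the retention option described above.
type JobRetention = boolean | { age?: string; count?: number }

// Hypothetical helper: translate a retention option into a pruning rule.
// maxCount 0 means "delete immediately"; Infinity means "keep forever".
function normalizeRetention(option: JobRetention = true): { maxCount: number; maxAgeMs?: number } {
  if (option === true) return { maxCount: 0 } // default: remove right away
  if (option === false) return { maxCount: Infinity } // keep indefinitely
  return {
    maxCount: option.count ?? Infinity,
    maxAgeMs: option.age ? parseDuration(option.age) : undefined,
  }
}

// Minimal parser for the '7d' style duration used above (assumption).
function parseDuration(input: string): number {
  const match = input.match(/^(\d+)(ms|s|m|h|d)$/)
  if (!match) throw new Error(`Invalid duration: ${input}`)
  const units: Record<string, number> = { ms: 1, s: 1000, m: 60_000, h: 3_600_000, d: 86_400_000 }
  return Number(match[1]) * units[match[2]]
}
```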
### Job status API

A new `getJob` method allows retrieving job status and data:

```typescript
const adapter = QueueManager.getAdapter()
const record = await adapter.getJob('job-id', 'queue-name')

if (record) {
  console.log(record.status) // 'pending' | 'active' | 'delayed' | 'completed' | 'failed'
  console.log(record.data) // Original job data
  console.log(record.finishedAt) // Timestamp (for completed/failed)
  console.log(record.error) // Error message (for failed)
}
```
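Because `getJob` returns a point-in-time snapshot, one way to await a job's outcome is to poll it until it reaches a terminal state. A sketch under stated assumptions: `waitForJob` and its timing defaults are illustrative, not part of the library, and the interfaces below are minimal stand-ins for the documented shapes:

```typescript
// Minimal stand-ins for the documented record and adapter shapes (assumption).
interface JobRecordLike {
  status: 'pending' | 'active' | 'delayed' | 'completed' | 'failed'
  data?: unknown
  finishedAt?: number
  error?: string
}
interface AdapterLike {
  getJob(jobId: string, queue: string): Promise<JobRecordLike | null>
}

// Hypothetical helper: poll until the job is completed or failed.
async function waitForJob(
  adapter: AdapterLike,
  jobId: string,
  queue: string,
  { intervalMs = 250, timeoutMs = 30_000 } = {}
): Promise<JobRecordLike> {
  const deadline = Date.now() + timeoutMs
  while (Date.now() < deadline) {
    const record = await adapter.getJob(jobId, queue)
    if (!record) throw new Error(`Job ${jobId} not found in ${queue}`)
    if (record.status === 'completed' || record.status === 'failed') return record
    await new Promise((resolve) => setTimeout(resolve, intervalMs))
  }
  throw new Error(`Timed out waiting for job ${jobId}`)
}
```

Polling keeps the helper adapter-agnostic; an event-based notification would need adapter support that this release does not describe.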
## Redis adapter migration

### New storage layout

The Redis adapter now stores job payloads in a dedicated hash and tracks queue state in separate sets/hashes:

| Key | Type | Description |
|-----|------|-------------|
| `jobs::<queue>::data` | Hash | jobId -> job payload JSON |
| `jobs::<queue>::pending` | Sorted Set | jobId -> priority score |
| `jobs::<queue>::delayed` | Sorted Set | jobId -> executeAt timestamp |
| `jobs::<queue>::active` | Hash | jobId -> { workerId, acquiredAt } |
| `jobs::<queue>::completed` | Hash | jobId -> { finishedAt } |
| `jobs::<queue>::completed::index` | Sorted Set | jobId -> finishedAt (for pruning) |
| `jobs::<queue>::failed` | Hash | jobId -> { finishedAt, error } |
| `jobs::<queue>::failed::index` | Sorted Set | jobId -> finishedAt (for pruning) |
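The `::index` sorted sets exist so history can be pruned without scanning every payload: scoring by `finishedAt` makes both age and count limits cheap range operations. The selection logic itself is simple; a pure sketch of it, operating on in-memory pairs rather than a live connection (an adapter would likely use `ZRANGE`/`ZREMRANGEBYSCORE` on the index instead, which is an assumption about the implementation):

```typescript
// [jobId, finishedAt] pairs, mirroring the contents of a `::index` sorted set.
type IndexEntry = [jobId: string, finishedAt: number]

// Illustrative pruning rule: given retention limits, return the job IDs
// that should be removed from the history hash and its index.
function selectPrunable(
  entries: IndexEntry[],
  { maxAgeMs, maxCount, now = Date.now() }: { maxAgeMs?: number; maxCount?: number; now?: number }
): string[] {
  // Newest first, so rank 0 is the most recently finished job
  const sorted = [...entries].sort((a, b) => b[1] - a[1])
  const doomed = new Set<string>()
  sorted.forEach(([jobId, finishedAt], rank) => {
    if (maxCount !== undefined && rank >= maxCount) doomed.add(jobId)
    if (maxAgeMs !== undefined && now - finishedAt > maxAgeMs) doomed.add(jobId)
  })
  return [...doomed]
}
```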
### Migration steps

**Option 1: Flush existing data (recommended for alpha users)**

```bash
redis-cli KEYS "jobs::*" | xargs redis-cli DEL
```

**Option 2: Wait for existing jobs to complete**

1. Stop pushing new jobs
2. Wait for all workers to drain existing queues
3. Deploy the new version
4. Clean up old keys:

```bash
# Remove old-format keys (adjust the pattern to your prefix)
redis-cli KEYS "jobs::*" | grep -v "::data\|::pending\|::delayed\|::active\|::completed\|::failed" | xargs redis-cli DEL
```
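Note that `KEYS` blocks Redis while it walks the whole keyspace; on a busy production instance, `redis-cli --scan --pattern "jobs::*"` is an incremental alternative that pipes into the same filter. Either way, it is worth dry-running the `grep` filter before appending `| xargs redis-cli DEL`. A quick check against simulated key names (the queue name `mail` is hypothetical), where only old-format keys should survive the filter:

```shell
printf 'jobs::mail\njobs::mail::data\njobs::mail::pending\njobs::mail::failed::index\n' \
  | grep -v "::data\|::pending\|::delayed\|::active\|::completed\|::failed"
```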
## Knex adapter migration

### Schema changes

The Knex adapter now persists completed/failed job state. Existing tables need these changes:

| Column | Type | Description |
|--------|------|-------------|
| `finished_at` | BIGINT (nullable) | Completion/failure timestamp |
| `error` | TEXT (nullable) | Error message for failed jobs |
| `status` | ENUM | Add `completed` and `failed` values |

New index for pruning queries:

- `(queue, status, finished_at)`
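With those columns in place, a fetched row maps directly onto the record shape returned by `getJob`. A sketch of that mapping (column names come from the table above; the exact row shape and `payload` column name are assumptions about the adapter's internals):

```typescript
// Raw row as selected from queue_jobs (snake_case, per the schema above;
// the payload column name is an assumption).
interface QueueJobRow {
  id: string
  queue: string
  status: 'pending' | 'active' | 'delayed' | 'completed' | 'failed'
  payload: string // JSON-encoded job data
  finished_at: number | null
  error: string | null
}

// Illustrative mapping from a DB row to the camelCase record exposed by getJob.
function rowToJobRecord(row: QueueJobRow) {
  return {
    id: row.id,
    queue: row.queue,
    status: row.status,
    data: JSON.parse(row.payload),
    finishedAt: row.finished_at ?? undefined,
    error: row.error ?? undefined,
  }
}
```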
### Migration SQL

**PostgreSQL:**

```sql
-- Add new columns
ALTER TABLE queue_jobs ADD COLUMN finished_at BIGINT;
ALTER TABLE queue_jobs ADD COLUMN error TEXT;

-- Add new enum values (PostgreSQL specific; note that before PostgreSQL 12,
-- ALTER TYPE ... ADD VALUE cannot run inside a transaction block)
ALTER TYPE queue_jobs_status ADD VALUE 'completed';
ALTER TYPE queue_jobs_status ADD VALUE 'failed';

-- Add index for pruning
CREATE INDEX idx_queue_jobs_finished ON queue_jobs (queue, status, finished_at);
```

**MySQL:**

```sql
-- Add new columns
ALTER TABLE queue_jobs ADD COLUMN finished_at BIGINT UNSIGNED NULL;
ALTER TABLE queue_jobs ADD COLUMN error TEXT NULL;

-- Modify the enum to include the new values
ALTER TABLE queue_jobs MODIFY COLUMN status ENUM('pending', 'active', 'delayed', 'completed', 'failed') NOT NULL;

-- Add index for pruning
CREATE INDEX idx_queue_jobs_finished ON queue_jobs (queue, status, finished_at);
```

**SQLite:**

```sql
-- Add new columns (SQLite doesn't enforce enums, so existing status values keep working)
ALTER TABLE queue_jobs ADD COLUMN finished_at INTEGER;
ALTER TABLE queue_jobs ADD COLUMN error TEXT;

-- Add index for pruning
CREATE INDEX idx_queue_jobs_finished ON queue_jobs (queue, status, finished_at);
```
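The `(queue, status, finished_at)` index exists to make history pruning cheap: a pruning query filters by exactly those columns, in that order, so the planner can satisfy it with a single range scan. As an illustration of the query shape the index serves (the table and column names are from the schema above; the builder helper itself is hypothetical):

```typescript
// Build a parameterized DELETE that prunes finished jobs older than a cutoff.
// The WHERE clause matches the index column order exactly.
function buildPruneByAgeQuery(queue: string, status: 'completed' | 'failed', cutoff: number) {
  return {
    sql: 'DELETE FROM queue_jobs WHERE queue = ? AND status = ? AND finished_at < ?',
    bindings: [queue, status, cutoff] as const,
  }
}
```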
### Fresh install

For new installations, drop and recreate the table to get the new schema automatically:

```sql
DROP TABLE IF EXISTS queue_jobs;
-- The table will be recreated on first adapter use
```

package.json

Lines changed: 2 additions & 1 deletion

```diff
@@ -23,7 +23,8 @@
     "lint": "eslint .",
     "prepublishOnly": "yarn build",
     "release": "yarn dlx release-it",
-    "test": "c8 node --import=@poppinss/ts-exec --enable-source-maps bin/test.ts",
+    "test": "node --import=@poppinss/ts-exec --enable-source-maps bin/test.ts --reporters=dot",
+    "test:coverage": "c8 yarn test",
     "typecheck": "tsc --noEmit"
   },
   "dependencies": {
```

src/contracts/adapter.ts

Lines changed: 21 additions & 3 deletions

```diff
@@ -1,4 +1,11 @@
-import type { JobData, ScheduleConfig, ScheduleData, ScheduleListOptions } from '../types/main.js'
+import type {
+  JobData,
+  JobRecord,
+  JobRetention,
+  ScheduleConfig,
+  ScheduleData,
+  ScheduleListOptions,
+} from '../types/main.js'

 /**
  * A job that has been acquired by a worker for processing.
@@ -76,17 +83,19 @@ export interface Adapter {
    *
    * @param jobId - The job ID to complete
    * @param queue - The queue the job belongs to
+   * @param removeOnComplete - Optional retention policy for completed jobs
    */
-  completeJob(jobId: string, queue: string): Promise<void>
+  completeJob(jobId: string, queue: string, removeOnComplete?: JobRetention): Promise<void>

   /**
    * Mark a job as failed permanently and remove it from the queue.
    *
    * @param jobId - The job ID to fail
    * @param queue - The queue the job belongs to
    * @param error - Optional error that caused the failure
+   * @param removeOnFail - Optional retention policy for failed jobs
    */
-  failJob(jobId: string, queue: string, error?: Error): Promise<void>
+  failJob(jobId: string, queue: string, error?: Error, removeOnFail?: JobRetention): Promise<void>

   /**
    * Retry a job by moving it back to pending with incremented attempts.
@@ -97,6 +106,15 @@ export interface Adapter {
    */
   retryJob(jobId: string, queue: string, retryAt?: Date): Promise<void>

+  /**
+   * Get a job record by id.
+   *
+   * @param jobId - The job ID to retrieve
+   * @param queue - The queue the job belongs to
+   * @returns The job record, or null if not found
+   */
+  getJob(jobId: string, queue: string): Promise<JobRecord | null>
+
   /**
    * Push a job to the default queue for immediate processing.
    *
```