Skip to content

Commit 71fddf7

Browse files
committed
chore: wip
1 parent ab2cb74 commit 71fddf7

7 files changed

Lines changed: 368 additions & 2 deletions

File tree

package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,8 @@
6868
"example:priority-queue": "bun examples/priority-queue.ts",
6969
"example:key-rate-limiting": "bun examples/key-rate-limiting.ts",
7070
"example:distributed-locks": "bun examples/distributed-locks.ts",
71-
"example:cron-jobs": "bun examples/cron-jobs.ts"
71+
"example:cron-jobs": "bun examples/cron-jobs.ts",
72+
"example:dead-letter-queue": "bun examples/dead-letter-queue.ts"
7273
},
7374
"devDependencies": {
7475
"@stacksjs/docs": "^0.70.23",

src/dead-letter-queue.ts

Lines changed: 247 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,247 @@
1+
import type { Queue } from './queue'
2+
import type { DeadLetterQueueOptions } from './types'
3+
import { Job } from './job'
4+
import { createLogger } from './logger'
5+
6+
export class DeadLetterQueue<T = any> {
7+
private queue: Queue<T>
8+
private deadLetterQueueName: string
9+
private options: DeadLetterQueueOptions
10+
private logger = createLogger('dead-letter-queue')
11+
12+
constructor(queue: Queue<T>, options: DeadLetterQueueOptions = {}) {
13+
this.queue = queue
14+
this.options = {
15+
queueSuffix: options.queueSuffix || '-dead-letter',
16+
maxRetries: options.maxRetries || 3,
17+
processFailed: options.processFailed || false,
18+
removeFromOriginalQueue: options.removeFromOriginalQueue !== false,
19+
}
20+
this.deadLetterQueueName = `${queue.name}${this.options.queueSuffix}`
21+
this.logger.debug(`Dead letter queue initialized for queue ${queue.name}`)
22+
}
23+
24+
/**
25+
* Get the name of the dead letter queue
26+
*/
27+
getQueueName(): string {
28+
return this.deadLetterQueueName
29+
}
30+
31+
/**
32+
* Move a job to the dead letter queue
33+
*/
34+
async moveToDeadLetter(job: Job<T>, reason: string): Promise<string> {
35+
const deadLetterQueueKey = `${this.queue.prefix}:${this.deadLetterQueueName}`
36+
const deadLetterKey = `${deadLetterQueueKey}:${job.id}`
37+
38+
try {
39+
// Begin transaction
40+
await this.queue.redisClient.send('MULTI', [])
41+
42+
// Store the job data in the dead letter queue
43+
await this.queue.redisClient.send('HMSET', [
44+
deadLetterKey,
45+
'id',
46+
job.id,
47+
'originalQueue',
48+
job.name,
49+
'data',
50+
JSON.stringify(job.data),
51+
'failedReason',
52+
reason || job.failedReason || '',
53+
'attemptsMade',
54+
job.attemptsMade.toString(),
55+
'stacktrace',
56+
JSON.stringify(job.stacktrace || []),
57+
'timestamp',
58+
Date.now().toString(),
59+
'originalTimestamp',
60+
job.timestamp.toString(),
61+
])
62+
63+
// Add to dead letter list
64+
await this.queue.redisClient.send('LPUSH', [deadLetterQueueKey, job.id])
65+
66+
// If configured, remove from failed list
67+
if (this.options.removeFromOriginalQueue) {
68+
await this.queue.redisClient.send('LREM', [this.queue.getKey('failed'), '0', job.id])
69+
}
70+
71+
// Execute transaction
72+
await this.queue.redisClient.send('EXEC', [])
73+
74+
this.logger.info(`Job ${job.id} moved to dead letter queue ${this.deadLetterQueueName}`)
75+
76+
// Emit dead letter event
77+
if (this.queue.events) {
78+
this.queue.events.emitJobMovedToDeadLetter(job.id, this.deadLetterQueueName, reason)
79+
}
80+
81+
return job.id
82+
}
83+
catch (err) {
84+
this.logger.error(`Failed to move job ${job.id} to dead letter queue: ${(err as Error).message}`)
85+
throw err
86+
}
87+
}
88+
89+
/**
90+
* Get all jobs in the dead letter queue
91+
*/
92+
async getJobs(start = 0, end = -1): Promise<Job<T>[]> {
93+
const deadLetterQueueKey = `${this.queue.prefix}:${this.deadLetterQueueName}`
94+
const jobIds = await this.queue.redisClient.send('LRANGE', [deadLetterQueueKey, start.toString(), end.toString()])
95+
96+
if (!jobIds || jobIds.length === 0) {
97+
return []
98+
}
99+
100+
const jobs: Job<T>[] = []
101+
for (const jobId of jobIds) {
102+
try {
103+
const deadLetterKey = `${deadLetterQueueKey}:${jobId}`
104+
const jobData = await this.queue.redisClient.send('HGETALL', [deadLetterKey])
105+
106+
if (jobData && Array.isArray(jobData) && jobData.length > 0) {
107+
// Convert array to object
108+
const jobObj: Record<string, string> = {}
109+
for (let i = 0; i < jobData.length; i += 2) {
110+
jobObj[jobData[i]] = jobData[i + 1]
111+
}
112+
113+
// Create job instance
114+
const job = new Job<T>(this.queue, jobId as string)
115+
job.data = JSON.parse(jobObj.data || '{}')
116+
job.name = jobObj.originalQueue
117+
job.timestamp = Number.parseInt(jobObj.timestamp || '0', 10)
118+
job.attemptsMade = Number.parseInt(jobObj.attemptsMade || '0', 10)
119+
job.stacktrace = JSON.parse(jobObj.stacktrace || '[]')
120+
job.failedReason = jobObj.failedReason
121+
122+
jobs.push(job)
123+
}
124+
}
125+
catch (err) {
126+
this.logger.error(`Error fetching job ${jobId} from dead letter queue: ${(err as Error).message}`)
127+
}
128+
}
129+
130+
return jobs
131+
}
132+
133+
/**
134+
* Republish a job from the dead letter queue back to its original queue
135+
*/
136+
async republishJob(jobId: string, options: { resetRetries?: boolean } = {}): Promise<Job<T> | null> {
137+
const deadLetterQueueKey = `${this.queue.prefix}:${this.deadLetterQueueName}`
138+
const deadLetterKey = `${deadLetterQueueKey}:${jobId}`
139+
140+
try {
141+
// Get job data from dead letter queue
142+
const jobData = await this.queue.redisClient.send('HGETALL', [deadLetterKey])
143+
144+
if (!jobData || !Array.isArray(jobData) || jobData.length === 0) {
145+
this.logger.warn(`Job ${jobId} not found in dead letter queue`)
146+
return null
147+
}
148+
149+
// Convert array to object
150+
const jobObj: Record<string, string> = {}
151+
for (let i = 0; i < jobData.length; i += 2) {
152+
jobObj[jobData[i]] = jobData[i + 1]
153+
}
154+
155+
// Parse job data
156+
const data = JSON.parse(jobObj.data || '{}')
157+
const queueName = jobObj.originalQueue
158+
159+
// Add job back to original queue
160+
const jobOptions: any = {
161+
jobId,
162+
}
163+
164+
if (options.resetRetries) {
165+
jobOptions.attempts = 0
166+
}
167+
168+
// Add job back to original queue (assuming the queue exists)
169+
const newJob = await this.queue.add(data, jobOptions)
170+
171+
// Remove from dead letter queue
172+
await this.queue.redisClient.send('MULTI', [])
173+
await this.queue.redisClient.send('LREM', [deadLetterQueueKey, '0', jobId])
174+
await this.queue.redisClient.send('DEL', [deadLetterKey])
175+
await this.queue.redisClient.send('EXEC', [])
176+
177+
this.logger.info(`Job ${jobId} republished from dead letter queue to ${queueName}`)
178+
179+
// Emit republish event
180+
if (this.queue.events) {
181+
this.queue.events.emitJobRepublishedFromDeadLetter(jobId, queueName)
182+
}
183+
184+
return newJob
185+
}
186+
catch (err) {
187+
this.logger.error(`Failed to republish job ${jobId} from dead letter queue: ${(err as Error).message}`)
188+
throw err
189+
}
190+
}
191+
192+
/**
193+
* Remove a job from the dead letter queue
194+
*/
195+
async removeJob(jobId: string): Promise<boolean> {
196+
const deadLetterQueueKey = `${this.queue.prefix}:${this.deadLetterQueueName}`
197+
const deadLetterKey = `${deadLetterQueueKey}:${jobId}`
198+
199+
try {
200+
await this.queue.redisClient.send('MULTI', [])
201+
await this.queue.redisClient.send('LREM', [deadLetterQueueKey, '0', jobId])
202+
await this.queue.redisClient.send('DEL', [deadLetterKey])
203+
await this.queue.redisClient.send('EXEC', [])
204+
205+
this.logger.info(`Job ${jobId} removed from dead letter queue`)
206+
return true
207+
}
208+
catch (err) {
209+
this.logger.error(`Failed to remove job ${jobId} from dead letter queue: ${(err as Error).message}`)
210+
return false
211+
}
212+
}
213+
214+
/**
215+
* Clear the entire dead letter queue
216+
*/
217+
async clear(): Promise<void> {
218+
const deadLetterQueueKey = `${this.queue.prefix}:${this.deadLetterQueueName}`
219+
220+
try {
221+
// Get all job IDs
222+
const jobIds = await this.queue.redisClient.send('LRANGE', [deadLetterQueueKey, '0', '-1'])
223+
224+
if (!jobIds || jobIds.length === 0) {
225+
return
226+
}
227+
228+
// Remove each job key
229+
await this.queue.redisClient.send('MULTI', [])
230+
231+
for (const jobId of jobIds) {
232+
const deadLetterKey = `${deadLetterQueueKey}:${jobId}`
233+
await this.queue.redisClient.send('DEL', [deadLetterKey])
234+
}
235+
236+
// Clear the list
237+
await this.queue.redisClient.send('DEL', [deadLetterQueueKey])
238+
await this.queue.redisClient.send('EXEC', [])
239+
240+
this.logger.info(`Dead letter queue ${this.deadLetterQueueName} cleared (${jobIds.length} jobs)`)
241+
}
242+
catch (err) {
243+
this.logger.error(`Failed to clear dead letter queue: ${(err as Error).message}`)
244+
throw err
245+
}
246+
}
247+
}

src/events.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,22 @@ class JobEvents extends EventEmitter {
154154
this.logger.debug(`Observable ${observableId} stopped for queue ${this.queueName}`)
155155
this.emit('observableStopped', observableId)
156156
}
157+
158+
/**
159+
* Emit job moved to dead letter queue event
160+
*/
161+
emitJobMovedToDeadLetter(jobId: string, deadLetterQueueName: string, reason: string): void {
162+
this.logger.debug(`Job ${jobId} moved to dead letter queue ${deadLetterQueueName}: ${reason}`)
163+
this.emit('jobMovedToDeadLetter', jobId, deadLetterQueueName, reason)
164+
}
165+
166+
/**
167+
* Emit job republished from dead letter queue event
168+
*/
169+
emitJobRepublishedFromDeadLetter(jobId: string, originalQueueName: string): void {
170+
this.logger.debug(`Job ${jobId} republished from dead letter queue to ${originalQueueName}`)
171+
this.emit('jobRepublishedFromDeadLetter', jobId, originalQueueName)
172+
}
157173
}
158174

159175
export { JobEvents }

src/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ export { BatchProcessor } from './batch'
22
export { type CleanupOptions, CleanupService } from './cleanup'
33
export * from './config'
44
export { type CronJobOptions, CronScheduler } from './cron-scheduler'
5+
export { DeadLetterQueue } from './dead-letter-queue'
56
export { DistributedLock, type LockOptions } from './distributed-lock'
67
export { JobEvents } from './events'
78
export { QueueGroup } from './group'
@@ -15,5 +16,6 @@ export { RateLimiter } from './rate-limiter'
1516
export type { RateLimitResult } from './rate-limiter'
1617
export { StalledJobChecker } from './stalled-checker'
1718
export * from './types'
19+
export type { DeadLetterQueueOptions } from './types'
1820
export * from './utils'
1921
export { Worker } from './worker'

src/queue.ts

Lines changed: 72 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
import type { RedisClient } from 'bun'
22
import type { CronJobOptions } from './cron-scheduler'
33
import type { RateLimitResult } from './rate-limiter'
4-
import type { JobOptions, JobStatus, QueueConfig } from './types'
4+
import type { DeadLetterQueueOptions, JobOptions, JobStatus, QueueConfig } from './types'
55
import { CleanupService } from './cleanup'
66
import { scriptLoader } from './commands'
77
import { config } from './config'
88
import { CronScheduler } from './cron-scheduler'
9+
import { DeadLetterQueue } from './dead-letter-queue'
910
import { DistributedLock } from './distributed-lock'
1011
import { JobEvents } from './events'
1112
import { Job } from './job'
@@ -31,6 +32,8 @@ export class Queue<T = any> {
3132
private defaultJobOptions: JobOptions | undefined
3233
private lock: DistributedLock | null = null
3334
private cronScheduler: CronScheduler | null = null
35+
private deadLetterQueue: DeadLetterQueue<T> | null = null
36+
private defaultDeadLetterOptions: DeadLetterQueueOptions | undefined
3437

3538
constructor(name: string, options?: QueueConfig) {
3639
this.name = name
@@ -79,6 +82,14 @@ export class Queue<T = any> {
7982

8083
// Initialize scripts
8184
this.init()
85+
86+
this.defaultDeadLetterOptions = options?.defaultDeadLetterOptions
87+
88+
// Initialize dead letter queue if enabled by default
89+
if (options?.defaultDeadLetterOptions?.enabled) {
90+
this.deadLetterQueue = new DeadLetterQueue<T>(this, options.defaultDeadLetterOptions)
91+
this.logger.debug(`Dead letter queue initialized for queue ${name}`)
92+
}
8293
}
8394

8495
/**
@@ -635,4 +646,64 @@ export class Queue<T = any> {
635646

636647
return this.cronScheduler.unschedule(jobId)
637648
}
649+
650+
getDeadLetterQueue(): DeadLetterQueue<T> {
651+
if (!this.deadLetterQueue) {
652+
this.deadLetterQueue = new DeadLetterQueue<T>(this, this.defaultDeadLetterOptions)
653+
}
654+
return this.deadLetterQueue
655+
}
656+
657+
/**
658+
* Get default dead letter queue options
659+
*/
660+
getDefaultDeadLetterOptions(): DeadLetterQueueOptions | undefined {
661+
return this.defaultDeadLetterOptions
662+
}
663+
664+
/**
665+
* Move a job to the dead letter queue
666+
*/
667+
async moveToDeadLetter(jobId: string, reason: string): Promise<boolean> {
668+
const job = await this.getJob(jobId)
669+
if (!job) {
670+
return false
671+
}
672+
673+
const dlq = this.getDeadLetterQueue()
674+
await dlq.moveToDeadLetter(job, reason)
675+
return true
676+
}
677+
678+
/**
679+
* Get jobs from the dead letter queue
680+
*/
681+
async getDeadLetterJobs(start = 0, end = -1): Promise<Job<T>[]> {
682+
const dlq = this.getDeadLetterQueue()
683+
return dlq.getJobs(start, end)
684+
}
685+
686+
/**
687+
* Republish a job from the dead letter queue
688+
*/
689+
async republishDeadLetterJob(jobId: string, options: { resetRetries?: boolean } = {}): Promise<Job<T> | null> {
690+
const dlq = this.getDeadLetterQueue()
691+
return dlq.republishJob(jobId, options)
692+
}
693+
694+
/**
695+
* Remove a job from the dead letter queue
696+
*/
697+
async removeDeadLetterJob(jobId: string): Promise<boolean> {
698+
const dlq = this.getDeadLetterQueue()
699+
return dlq.removeJob(jobId)
700+
}
701+
702+
/**
703+
* Clear the dead letter queue
704+
*/
705+
async clearDeadLetterQueue(): Promise<void> {
706+
const dlq = this.getDeadLetterQueue()
707+
return dlq.clear()
708+
}
638709
}

0 commit comments

Comments
 (0)