Skip to content

Commit ab2cb74

Browse files
committed
chore: wip
1 parent 82554ed commit ab2cb74

8 files changed

Lines changed: 78 additions & 55 deletions

File tree

docs/cron-expressions.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,8 @@ For more complex scheduling needs, you might need to combine multiple cron expre
9696
## Using Cron Jobs in Bun Bull
9797

9898
```typescript
99-
import { Queue, type CronJobOptions } from 'bun-queue'
99+
import type { CronJobOptions } from 'bun-queue'
100+
import { Queue } from 'bun-queue'
100101

101102
// Create a queue
102103
const queue = new Queue('my-queue')

src/cron-scheduler.ts

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ export class CronScheduler {
9494
limit: options.limit,
9595
// Need to specify 'every' for backward compatibility
9696
every: 0, // This value is ignored when cron is specified
97-
}
97+
},
9898
}
9999

100100
// Calculate delay until next execution
@@ -107,7 +107,8 @@ export class CronScheduler {
107107
this.logger.info(`Scheduled cron job ${jobId} with expression "${cronExpression}", next run at ${nextRun.toLocaleString()}`)
108108

109109
return jobId
110-
} catch (error) {
110+
}
111+
catch (error) {
111112
this.logger.error(`Error scheduling cron job: ${(error as Error).message}`)
112113
throw error
113114
}
@@ -155,7 +156,7 @@ export class CronScheduler {
155156
for (const part of parts) {
156157
// Handle ranges (e.g. "1-5")
157158
if (part.includes('-')) {
158-
const [start, end] = part.split('-').map(n => parseInt(n, 10))
159+
const [start, end] = part.split('-').map(n => Number.parseInt(n, 10))
159160

160161
if (isNaN(start) || isNaN(end) || start < min || end > max || start > end) {
161162
throw new Error(`Invalid range in cron expression: ${part}`)
@@ -168,17 +169,17 @@ export class CronScheduler {
168169
// Handle steps (e.g. "*/2", "1/2")
169170
else if (part.includes('/')) {
170171
const [range, step] = part.split('/')
171-
const stepValue = parseInt(step, 10)
172+
const stepValue = Number.parseInt(step, 10)
172173

173174
if (isNaN(stepValue) || stepValue <= 0) {
174175
throw new Error(`Invalid step in cron expression: ${part}`)
175176
}
176177

177178
let start = min
178-
let end = max
179+
const end = max
179180

180181
if (range !== '*') {
181-
const rangeValue = parseInt(range, 10)
182+
const rangeValue = Number.parseInt(range, 10)
182183
if (!isNaN(rangeValue)) {
183184
start = rangeValue
184185
}
@@ -190,7 +191,7 @@ export class CronScheduler {
190191
}
191192
// Handle single values
192193
else {
193-
const value = parseInt(part, 10)
194+
const value = Number.parseInt(part, 10)
194195

195196
if (isNaN(value) || value < min || value > max) {
196197
throw new Error(`Invalid value in cron expression: ${part}`)
@@ -226,15 +227,15 @@ export class CronScheduler {
226227
hour: 'numeric',
227228
minute: 'numeric',
228229
second: 'numeric',
229-
hour12: false
230+
hour12: false,
230231
}
231232

232233
const formatter = new Intl.DateTimeFormat('en-US', options)
233234
const parts = formatter.formatToParts(date)
234235

235236
const getValue = (type: string): number => {
236237
const part = parts.find(p => p.type === type)
237-
return part ? parseInt(part.value, 10) : 0
238+
return part ? Number.parseInt(part.value, 10) : 0
238239
}
239240

240241
const year = getValue('year')
@@ -246,7 +247,8 @@ export class CronScheduler {
246247
// Create a new date in the target timezone
247248
date.setFullYear(year, month, day)
248249
date.setHours(hour, minute, 0, 0)
249-
} catch (error) {
250+
}
251+
catch (error) {
250252
this.logger.warn(`Invalid timezone: ${timezone}, using system timezone`)
251253
}
252254
}
@@ -325,7 +327,8 @@ export class CronScheduler {
325327

326328
this.logger.info(`Unscheduled cron job ${jobId}`)
327329
return true
328-
} catch (error) {
330+
}
331+
catch (error) {
329332
this.logger.error(`Error unscheduling cron job ${jobId}: ${(error as Error).message}`)
330333
return false
331334
}

src/distributed-lock.ts

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import type { RedisClient } from 'bun'
2-
import { generateId } from './utils'
32
import { createLogger } from './logger'
3+
import { generateId } from './utils'
44

55
export interface LockOptions {
66
/**
@@ -61,8 +61,8 @@ export class DistributedLock {
6161
const result = await this.redisClient.send('SET', [
6262
lockKey,
6363
token,
64-
'NX', // Only set if key doesn't exist
65-
'PX', // Set expiry in milliseconds
64+
'NX', // Only set if key doesn't exist
65+
'PX', // Set expiry in milliseconds
6666
duration.toString(),
6767
])
6868

@@ -108,7 +108,8 @@ export class DistributedLock {
108108
await this.redisClient.del(lockKey)
109109
this.logger.debug(`Released lock ${resource} with token ${token}`)
110110
return true
111-
} else {
111+
}
112+
else {
112113
this.logger.debug(`Failed to release lock ${resource} with token ${token}`)
113114
return false
114115
}
@@ -123,7 +124,7 @@ export class DistributedLock {
123124
const lockKey = this.getLockKey(resource)
124125
const result = await this.redisClient.exists(lockKey)
125126
// Different Redis clients may return different types
126-
return result ? true : false
127+
return !!result
127128
}
128129

129130
/**
@@ -166,7 +167,8 @@ export class DistributedLock {
166167
clearInterval(autoExtendId)
167168
this.logger.debug(`Stopped auto-extension for lock ${resource}`)
168169
}
169-
} catch (error) {
170+
}
171+
catch (error) {
170172
this.logger.error(`Error extending lock ${resource}: ${(error as Error).message}`)
171173
clearInterval(autoExtendId)
172174
}
@@ -219,8 +221,9 @@ export class DistributedLock {
219221

220222
try {
221223
return await fn()
222-
} finally {
224+
}
225+
finally {
223226
await this.release(resource, token)
224227
}
225228
}
226-
}
229+
}

src/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
export { BatchProcessor } from './batch'
22
export { type CleanupOptions, CleanupService } from './cleanup'
33
export * from './config'
4-
export { CronScheduler, type CronJobOptions } from './cron-scheduler'
4+
export { type CronJobOptions, CronScheduler } from './cron-scheduler'
55
export { DistributedLock, type LockOptions } from './distributed-lock'
66
export { JobEvents } from './events'
77
export { QueueGroup } from './group'

src/priority-queue.ts

Lines changed: 28 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
11
import type { RedisClient } from 'bun'
2-
import type { QueueConfig, JobOptions, PriorityLevel, PriorityQueueOptions, JobStatus } from './types'
3-
import { DEFAULT_PRIORITY_LEVEL, MAX_PRIORITY_LEVELS } from './types'
2+
import type { JobEvents } from './events'
3+
import type { JobOptions, JobStatus, PriorityLevel, PriorityQueueOptions, QueueConfig } from './types'
44
import { Job } from './job'
5-
import { Queue } from './queue'
65
import { createLogger } from './logger'
7-
import { generateId, getRedisClient, mergeOptions } from './utils'
8-
import { Worker } from './worker'
9-
import { JobEvents } from './events'
6+
import { Queue } from './queue'
7+
import { DEFAULT_PRIORITY_LEVEL, MAX_PRIORITY_LEVELS } from './types'
8+
import { generateId } from './utils'
109

1110
/**
1211
* PriorityQueue implements a queue with priority support
@@ -98,15 +97,24 @@ export class PriorityQueue<T = any> {
9897
// Store the job data with priority
9998
await this.redisClient.send('HMSET', [
10099
jobKey,
101-
'id', jobId,
102-
'name', this.name,
103-
'data', JSON.stringify(data),
104-
'timestamp', timestamp.toString(),
105-
'delay', (opts.delay || 0).toString(),
106-
'opts', JSON.stringify(opts),
107-
'attemptsMade', '0',
108-
'progress', '0',
109-
'priority', priority.toString(),
100+
'id',
101+
jobId,
102+
'name',
103+
this.name,
104+
'data',
105+
JSON.stringify(data),
106+
'timestamp',
107+
timestamp.toString(),
108+
'delay',
109+
(opts.delay || 0).toString(),
110+
'opts',
111+
JSON.stringify(opts),
112+
'attemptsMade',
113+
'0',
114+
'progress',
115+
'0',
116+
'priority',
117+
priority.toString(),
110118
])
111119

112120
// Handle dependencies if any
@@ -283,7 +291,7 @@ export class PriorityQueue<T = any> {
283291
for (let priority = 0; priority < this.priorityLevels; priority++) {
284292
const priorityKey = this.getPriorityKey(priority)
285293
const countResult = await this.redisClient.send('LLEN', [priorityKey])
286-
const count = typeof countResult === 'string' ? parseInt(countResult, 10) : Number(countResult)
294+
const count = typeof countResult === 'string' ? Number.parseInt(countResult, 10) : Number(countResult)
287295
priorityJobs += count
288296
}
289297

@@ -319,7 +327,7 @@ export class PriorityQueue<T = any> {
319327

320328
// See if there are any jobs in this priority queue
321329
const lengthResult = await this.redisClient.send('LLEN', [priorityKey])
322-
const length = typeof lengthResult === 'string' ? parseInt(lengthResult, 10) : Number(lengthResult)
330+
const length = typeof lengthResult === 'string' ? Number.parseInt(lengthResult, 10) : Number(lengthResult)
323331

324332
if (length && length > 0) {
325333
// Get all jobs from this priority level
@@ -350,13 +358,13 @@ export class PriorityQueue<T = any> {
350358
private startJobMover(): void {
351359
// Move jobs immediately to start
352360
this.moveJobsToWaiting().catch(err =>
353-
this.logger.error(`Error moving priority jobs to waiting: ${(err as Error).message}`)
361+
this.logger.error(`Error moving priority jobs to waiting: ${(err as Error).message}`),
354362
)
355363

356364
// Then set up interval to do it periodically (faster than the worker tick)
357365
this.jobMoverTimer = setInterval(() => {
358366
this.moveJobsToWaiting().catch(err =>
359-
this.logger.error(`Error moving priority jobs to waiting: ${(err as Error).message}`)
367+
this.logger.error(`Error moving priority jobs to waiting: ${(err as Error).message}`),
360368
)
361369
}, 25) // Faster than the standard worker tick (50ms)
362370
}
@@ -458,4 +466,4 @@ export class PriorityQueue<T = any> {
458466
private getPriorityKey(priority: PriorityLevel): string {
459467
return `${this.keyPrefix}:priority:${priority}`
460468
}
461-
}
469+
}

src/queue.ts

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,20 @@
11
import type { RedisClient } from 'bun'
2+
import type { CronJobOptions } from './cron-scheduler'
3+
import type { RateLimitResult } from './rate-limiter'
24
import type { JobOptions, JobStatus, QueueConfig } from './types'
35
import { CleanupService } from './cleanup'
46
import { scriptLoader } from './commands'
57
import { config } from './config'
6-
import { CronScheduler, type CronJobOptions } from './cron-scheduler'
8+
import { CronScheduler } from './cron-scheduler'
79
import { DistributedLock } from './distributed-lock'
810
import { JobEvents } from './events'
911
import { Job } from './job'
1012
import { createLogger } from './logger'
1113
import { Metrics } from './metrics'
14+
import { RateLimiter } from './rate-limiter'
1215
import { StalledJobChecker } from './stalled-checker'
1316
import { generateId, getRedisClient, mergeOptions } from './utils'
1417
import { Worker } from './worker'
15-
import { RateLimiter } from './rate-limiter'
16-
import type { RateLimitResult } from './rate-limiter'
1718

1819
export class Queue<T = any> {
1920
name: string
@@ -112,7 +113,7 @@ export class Queue<T = any> {
112113
const opts = {
113114
...this.defaultJobOptions,
114115
...options,
115-
delay: limiterResult.resetIn
116+
delay: limiterResult.resetIn,
116117
}
117118

118119
return this.add(data, opts)
@@ -540,7 +541,7 @@ export class Queue<T = any> {
540541
/**
541542
* Check if the queue is rate limited for a specific key
542543
*/
543-
async isRateLimited(key?: string, data?: T): Promise<{ limited: boolean; resetIn: number }> {
544+
async isRateLimited(key?: string, data?: T): Promise<{ limited: boolean, resetIn: number }> {
544545
if (!this.limiter) {
545546
return { limited: false, resetIn: 0 }
546547
}
@@ -550,7 +551,8 @@ export class Queue<T = any> {
550551
if (key) {
551552
// Use explicit key if provided
552553
result = await this.limiter.checkByKey(key)
553-
} else {
554+
}
555+
else {
554556
// Use data with keyPrefix from limiter options
555557
result = await this.limiter.check(data)
556558
}
@@ -570,7 +572,8 @@ export class Queue<T = any> {
570572
// If locks are disabled, just process the job directly
571573
if (!this.lock) {
572574
const job = await this.getJob(jobId)
573-
if (!job) return null
575+
if (!job)
576+
return null
574577
return handler(job)
575578
}
576579

@@ -582,17 +585,19 @@ export class Queue<T = any> {
582585
return await this.lock.withLock(lockResource, async () => {
583586
// Now we have the lock, fetch the job and process it
584587
const job = await this.getJob(jobId)
585-
if (!job) return null
588+
if (!job)
589+
return null
586590

587591
// Process the job with the lock held
588592
this.logger.debug(`Processing job ${jobId} with distributed lock`)
589593
return handler(job)
590594
}, {
591595
duration: 30000, // 30 second lock
592-
retries: 3, // Try 3 times to get the lock
593-
retryDelay: 200 // 200ms between retries
596+
retries: 3, // Try 3 times to get the lock
597+
retryDelay: 200, // 200ms between retries
594598
})
595-
} catch (error) {
599+
}
600+
catch (error) {
596601
this.logger.error(`Failed to acquire lock for job ${jobId}: ${(error as Error).message}`)
597602
throw error
598603
}

src/rate-limiter.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,8 @@ export class RateLimiter {
3131
// Use the function to generate a key from the data
3232
const keyValue = this.options.keyPrefix(data)
3333
identifier = `${this.queue.name}:${keyValue}`
34-
} else {
34+
}
35+
else {
3536
// Use the keyPrefix as a property path in the data object
3637
const keyValue = data[this.options.keyPrefix]
3738
if (keyValue) {

src/worker.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,8 @@ export class Worker<T = any> {
130130
if ('processJobWithLock' in this.queue && typeof this.queue.processJobWithLock === 'function') {
131131
// Process with distributed lock for better concurrency safety
132132
await this.queue.processJobWithLock(jobId, async (job) => {
133-
if (!job) return
133+
if (!job)
134+
return
134135

135136
// Set processedOn timestamp
136137
const jobKey = this.queue.getJobKey(jobId)
@@ -145,7 +146,8 @@ export class Worker<T = any> {
145146
await job.moveToCompleted(result)
146147
return result
147148
})
148-
} else {
149+
}
150+
else {
149151
// Fallback to traditional processing without distributed lock
150152
const job = await this.queue.getJob(jobId)
151153
if (!job)

0 commit comments

Comments
 (0)