Skip to content

Commit df0688c

Browse files
committed
refactor!: extract magic numbers into named constants
1 parent 12c46f6 commit df0688c

File tree

8 files changed

+85
-23
lines changed

8 files changed

+85
-23
lines changed

benchmark/boringnode/harness.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ export async function run(options: BenchmarkOptions): Promise<BenchmarkResult> {
5454
locations: [''],
5555
worker: {
5656
concurrency: options.concurrency,
57-
pollingInterval: 1, // Very short polling for benchmarks
57+
idleDelay: 1, // Very short delay for benchmarks
5858
},
5959
}
6060

examples/config.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ export const config: QueueManagerConfig = {
2828

2929
worker: {
3030
concurrency: 5,
31-
pollingInterval: '10ms',
3231
},
3332

3433
locations: ['./examples/jobs/**/*.ts'],

src/constants.ts

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/**
2+
* Default job priority (1-10 scale, lower = higher priority)
3+
*/
4+
export const DEFAULT_PRIORITY = 5
5+
6+
/**
7+
* Multiplier used in score calculation: priority * multiplier + timestamp
8+
*
9+
* This ensures higher priority jobs are processed first,
10+
* while preserving FIFO order within the same priority.
11+
* The value (1e13) leaves room for ~300 years of millisecond timestamps.
12+
*/
13+
export const PRIORITY_SCORE_MULTIPLIER = 1e13
14+
15+
/**
16+
* Default delay when the worker is idle (no jobs in queue)
17+
*/
18+
export const DEFAULT_IDLE_DELAY = '2s'
19+
20+
/**
21+
* Default interval between stalled job checks
22+
*/
23+
export const DEFAULT_STALLED_INTERVAL = '30s'
24+
25+
/**
26+
* Default threshold after which a job is considered stalled
27+
*/
28+
export const DEFAULT_STALLED_THRESHOLD = '30s'
29+
30+
/**
31+
* Default delay before retrying after an error
32+
*/
33+
export const DEFAULT_ERROR_RETRY_DELAY = '5s'

src/drivers/knex_adapter.ts

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ import KnexPkg from 'knex'
22
import type { Knex } from 'knex'
33
import type { Adapter, AcquiredJob } from '../contracts/adapter.js'
44
import type { JobData } from '../types/main.js'
5+
import { DEFAULT_PRIORITY } from '../constants.js'
6+
import { calculateScore } from '../utils.js'
57

68
export interface KnexAdapterOptions {
79
connection: Knex
@@ -168,8 +170,8 @@ export class KnexAdapter implements Adapter {
168170
// Move them to pending
169171
for (const job of delayedJobs) {
170172
const jobData: JobData = JSON.parse(job.data)
171-
const priority = jobData.priority ?? 5
172-
const score = priority * 1e13 + now
173+
const priority = jobData.priority ?? DEFAULT_PRIORITY
174+
const score = calculateScore(priority, now)
173175

174176
await trx(this.#tableName).where('id', job.id).where('queue', queue).update({
175177
status: 'pending',
@@ -223,8 +225,8 @@ export class KnexAdapter implements Adapter {
223225
})
224226
} else {
225227
// Move back to pending
226-
const priority = jobData.priority ?? 5
227-
const score = priority * 1e13 + now
228+
const priority = jobData.priority ?? DEFAULT_PRIORITY
229+
const score = calculateScore(priority, now)
228230

229231
await this.#connection(this.#tableName).where('id', jobId).where('queue', queue).update({
230232
status: 'pending',
@@ -244,9 +246,9 @@ export class KnexAdapter implements Adapter {
244246
async pushOn(queue: string, jobData: JobData): Promise<void> {
245247
await this.#ensureTableExists()
246248

247-
const priority = jobData.priority ?? 5
249+
const priority = jobData.priority ?? DEFAULT_PRIORITY
248250
const timestamp = Date.now()
249-
const score = priority * 1e13 + timestamp
251+
const score = calculateScore(priority, timestamp)
250252

251253
await this.#connection(this.#tableName).insert({
252254
id: jobData.id,
@@ -327,8 +329,8 @@ export class KnexAdapter implements Adapter {
327329
} else {
328330
// Recover: increment stalledCount and put back in pending
329331
jobData.stalledCount = currentStalledCount + 1
330-
const priority = jobData.priority ?? 5
331-
const score = priority * 1e13 + now
332+
const priority = jobData.priority ?? DEFAULT_PRIORITY
333+
const score = calculateScore(priority, now)
332334

333335
await trx(this.#tableName)
334336
.where('id', row.id)

src/drivers/redis_adapter.ts

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
import { Redis, type RedisOptions } from 'ioredis'
22
import type { Adapter, AcquiredJob } from '../contracts/adapter.js'
33
import type { JobData } from '../types/main.js'
4+
import { DEFAULT_PRIORITY } from '../constants.js'
5+
import { calculateScore } from '../utils.js'
46

57
const redisKey = 'jobs'
68
type RedisConfig = Redis | RedisOptions
@@ -25,6 +27,8 @@ const ACQUIRE_JOB_SCRIPT = `
2527
for i = 1, #ready_jobs do
2628
local job_data = ready_jobs[i]
2729
local job = cjson.decode(job_data)
30+
-- Score = priority * 1e13 + timestamp
31+
-- Lower score = higher priority, FIFO within same priority
2832
local priority = job.priority or 5
2933
local timestamp = tonumber(now)
3034
local score = priority * 10000000000000 + timestamp
@@ -115,6 +119,8 @@ const RETRY_JOB_SCRIPT = `
115119
if retry_at and retry_at > now then
116120
redis.call('ZADD', delayed_key, retry_at, job_data)
117121
else
122+
-- Score = priority * 1e13 + timestamp
123+
-- Lower score = higher priority, FIFO within same priority
118124
local priority = job.priority or 5
119125
local score = priority * 10000000000000 + now
120126
redis.call('ZADD', pending_key, score, job_data)
@@ -164,6 +170,8 @@ const RECOVER_STALLED_JOBS_SCRIPT = `
164170
-- Recover: increment stalledCount and put back in pending
165171
job.stalledCount = current_stalled_count + 1
166172
local job_data = cjson.encode(job)
173+
-- Score = priority * 1e13 + timestamp
174+
-- Lower score = higher priority, FIFO within same priority
167175
local priority = job.priority or 5
168176
local score = priority * 10000000000000 + now
169177
redis.call('ZADD', pending_key, score, job_data)
@@ -297,12 +305,9 @@ export class RedisAdapter implements Adapter {
297305
}
298306

299307
async pushOn(queue: string, jobData: JobData): Promise<void> {
300-
const priority = jobData.priority ?? 5
301-
302-
// Use priority as primary score, add timestamp for FIFO order within same priority
303-
// Date.now() precision is sufficient but perfect FIFO within the same millisecond is not guaranteed
308+
const priority = jobData.priority ?? DEFAULT_PRIORITY
304309
const timestamp = Date.now()
305-
const score = priority * 1e13 + timestamp
310+
const score = calculateScore(priority, timestamp)
306311

307312
await this.#connection.zadd(`${redisKey}::${queue}`, score, JSON.stringify(jobData))
308313
}

src/types/main.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,10 +57,10 @@ export interface WorkerConfig {
5757
concurrency?: number
5858

5959
/**
60-
* How often to poll for new jobs when the queue is empty.
60+
* Delay between queue polls when idle (no jobs available).
6161
* @default '2s'
6262
*/
63-
pollingInterval?: Duration
63+
idleDelay?: Duration
6464

6565
/**
6666
* Maximum duration a job can run before being timed out.

src/utils.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { parse as parseDuration } from '@lukeed/ms'
22
import type { Duration } from './types/main.js'
33
import * as errors from './exceptions.js'
4+
import { PRIORITY_SCORE_MULTIPLIER } from './constants.js'
45

56
export function parse(duration: Duration): number {
67
if (typeof duration === 'number') {
@@ -15,3 +16,15 @@ export function parse(duration: Duration): number {
1516

1617
return milliseconds
1718
}
19+
20+
/**
21+
* Calculate the score for job ordering in the queue.
22+
* Lower scores are processed first.
23+
*
24+
* @param priority - Job priority (1-10, lower = higher priority)
25+
* @param timestamp - Timestamp in milliseconds
26+
* @returns Score for queue ordering
27+
*/
28+
export function calculateScore(priority: number, timestamp: number): number {
29+
return priority * PRIORITY_SCORE_MULTIPLIER + timestamp
30+
}

src/worker.ts

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,17 @@ import type { QueueManagerConfig, WorkerCycle } from './types/main.js'
1010
import { Locator } from './locator.js'
1111
import type { JobOptions } from './types/main.js'
1212
import type { Job } from './job.js'
13+
import {
14+
DEFAULT_IDLE_DELAY,
15+
DEFAULT_STALLED_INTERVAL,
16+
DEFAULT_STALLED_THRESHOLD,
17+
DEFAULT_ERROR_RETRY_DELAY,
18+
} from './constants.js'
1319

1420
export class Worker {
1521
readonly #id: string
1622
readonly #config: QueueManagerConfig
17-
readonly #pollingInterval: number
23+
readonly #idleDelay: number
1824
readonly #stalledInterval: number
1925
readonly #stalledThreshold: number
2026
readonly #maxStalledCount: number
@@ -39,9 +45,9 @@ export class Worker {
3945
this.#id = randomUUID()
4046

4147
// Parse worker config once at construction
42-
this.#pollingInterval = parse(config.worker?.pollingInterval ?? '2s')
43-
this.#stalledInterval = parse(config.worker?.stalledInterval ?? '30s')
44-
this.#stalledThreshold = parse(config.worker?.stalledThreshold ?? '30s')
48+
this.#idleDelay = parse(config.worker?.idleDelay ?? DEFAULT_IDLE_DELAY)
49+
this.#stalledInterval = parse(config.worker?.stalledInterval ?? DEFAULT_STALLED_INTERVAL)
50+
this.#stalledThreshold = parse(config.worker?.stalledThreshold ?? DEFAULT_STALLED_THRESHOLD)
4551
this.#maxStalledCount = config.worker?.maxStalledCount ?? 1
4652
this.#concurrency = config.worker?.concurrency ?? 1
4753
this.#gracefulShutdown = config.worker?.gracefulShutdown ?? true
@@ -148,14 +154,18 @@ export class Worker {
148154
yield* this.#fillPool(queues)
149155

150156
if (this.#pool.isEmpty()) {
151-
yield { type: 'idle', suggestedDelay: this.#pollingInterval }
157+
yield { type: 'idle', suggestedDelay: this.#idleDelay }
152158
continue
153159
}
154160

155161
const completed = await this.#pool.waitForNextCompletion()
156162
yield { type: 'completed', queue: completed.queue, job: completed.job }
157163
} catch (error) {
158-
yield { type: 'error', error: error as Error, suggestedDelay: parse('5s') }
164+
yield {
165+
type: 'error',
166+
error: error as Error,
167+
suggestedDelay: parse(DEFAULT_ERROR_RETRY_DELAY),
168+
}
159169
}
160170
}
161171
}

0 commit comments

Comments
 (0)