Skip to content

Commit c1563b5

Browse files
committed
feat: add jobFactory option for dependency injection support
1 parent 592f4b9 commit c1563b5

File tree

5 files changed

+329
-29
lines changed

5 files changed

+329
-29
lines changed

README.md

Lines changed: 76 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ const config = {
7272

7373
worker: {
7474
concurrency: 5,
75-
pollingInterval: '10ms',
75+
idleDelay: '2s',
7676
},
7777

7878
locations: ['./app/jobs/**/*.ts'],
@@ -242,7 +242,7 @@ export default class UrgentJob extends Job<Payload> {
242242
static readonly jobName = 'UrgentJob'
243243

244244
static options: JobOptions = {
245-
priority: 1, // Processed before default priority (5)
245+
priority: 1, // Processed before default priority (5)
246246
}
247247

248248
async execute(): Promise<void> {
@@ -264,12 +264,13 @@ export default class ReliableJob extends Job<Payload> {
264264
static options: JobOptions = {
265265
maxRetries: 5,
266266
retry: {
267-
backoff: () => exponentialBackoff({
268-
baseDelay: '1s',
269-
maxDelay: '1m',
270-
multiplier: 2,
271-
jitter: true,
272-
}),
267+
backoff: () =>
268+
exponentialBackoff({
269+
baseDelay: '1s',
270+
maxDelay: '1m',
271+
multiplier: 2,
272+
jitter: true,
273+
}),
273274
},
274275
}
275276

@@ -309,25 +310,73 @@ You can also set a global timeout in the worker configuration:
309310
```typescript
310311
const config = {
311312
worker: {
312-
timeout: '1m', // Default timeout for all jobs
313+
timeout: '1m', // Default timeout for all jobs
313314
},
314315
}
315316
```
316317

318+
## Dependency Injection
319+
320+
Use the `jobFactory` option to integrate with IoC containers for dependency injection. This allows your jobs to receive injected services in their constructor.
321+
322+
```typescript
323+
import { Worker } from '@boringnode/queue'
324+
import type { JobFactory } from '@boringnode/queue'
325+
326+
const worker = new Worker({
327+
default: 'redis',
328+
adapters: { redis: redis(connection) },
329+
worker: {
330+
jobFactory: async (JobClass, payload) => {
331+
// Use your IoC container to instantiate jobs
332+
return app.container.make(JobClass, [payload])
333+
},
334+
},
335+
})
336+
```
337+
338+
Example with injected dependencies:
339+
340+
```typescript
341+
import { Job } from '@boringnode/queue'
342+
343+
interface SendEmailPayload {
344+
to: string
345+
subject: string
346+
}
347+
348+
export default class SendEmailJob extends Job<SendEmailPayload> {
349+
static readonly jobName = 'SendEmailJob'
350+
351+
constructor(
352+
payload: SendEmailPayload,
353+
private mailer: MailerService, // Injected by IoC container
354+
private logger: Logger // Injected by IoC container
355+
) {
356+
super(payload)
357+
}
358+
359+
async execute(): Promise<void> {
360+
this.logger.info(`Sending email to ${this.payload.to}`)
361+
await this.mailer.send(this.payload)
362+
}
363+
}
364+
```
365+
366+
Without a `jobFactory`, jobs are instantiated with `new JobClass(payload)`.
367+
317368
## Job Discovery
318369

319370
The queue manager automatically discovers and registers jobs from the specified locations:
320371

321372
```typescript
322373
const config = {
323-
locations: [
324-
'./app/jobs/**/*.ts',
325-
'./modules/**/jobs/**/*.ts',
326-
],
374+
locations: ['./app/jobs/**/*.ts', './modules/**/jobs/**/*.ts'],
327375
}
328376
```
329377

330378
Jobs must:
379+
331380
- Extend the `Job` class
332381
- Have a static `jobName` property
333382
- Implement the `execute` method
@@ -357,20 +406,20 @@ By default, a simple console logger is used that only outputs warnings and error
357406

358407
Performance comparison with BullMQ using realistic jobs (5ms simulated work per job):
359408

360-
| Jobs | Concurrency | @boringnode/queue | BullMQ | Diff |
361-
|------|-------------|-------------------|--------|--------------|
362-
| 100 | 1 | 562ms | 596ms | 5.7% faster |
363-
| 100 | 5 | 116ms | 117ms | ~same |
364-
| 100 | 10 | 62ms | 62ms | ~same |
365-
| 500 | 1 | 2728ms | 2798ms | 2.5% faster |
366-
| 500 | 5 | 565ms | 565ms | ~same |
367-
| 500 | 10 | 287ms | 288ms | ~same |
368-
| 1000 | 1 | 5450ms | 5547ms | 1.7% faster |
369-
| 1000 | 5 | 1096ms | 1116ms | 1.8% faster |
370-
| 1000 | 10 | 565ms | 579ms | 2.4% faster |
371-
| 100K | 5 | 110.5s | 112.3s | 1.5% faster |
372-
| 100K | 10 | 56.2s | 57.5s | 2.1% faster |
373-
| 100K | 20 | 29.1s | 29.6s | 1.7% faster |
409+
| Jobs | Concurrency | @boringnode/queue | BullMQ | Diff |
410+
|------|-------------|-------------------|--------|-------------|
411+
| 100 | 1 | 562ms | 596ms | 5.7% faster |
412+
| 100 | 5 | 116ms | 117ms | ~same |
413+
| 100 | 10 | 62ms | 62ms | ~same |
414+
| 500 | 1 | 2728ms | 2798ms | 2.5% faster |
415+
| 500 | 5 | 565ms | 565ms | ~same |
416+
| 500 | 10 | 287ms | 288ms | ~same |
417+
| 1000 | 1 | 5450ms | 5547ms | 1.7% faster |
418+
| 1000 | 5 | 1096ms | 1116ms | 1.8% faster |
419+
| 1000 | 10 | 565ms | 579ms | 2.4% faster |
420+
| 100K | 5 | 110.5s | 112.3s | 1.5% faster |
421+
| 100K | 10 | 56.2s | 57.5s | 2.1% faster |
422+
| 100K | 20 | 29.1s | 29.6s | 1.7% faster |
374423

375424
Run benchmarks yourself:
376425

index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,5 @@ export {
99
fixedBackoff,
1010
} from './src/strategies/backoff_strategy.js'
1111
export * as errors from './src/exceptions.js'
12+
13+
export type { JobFactory } from './src/types/main.js'

src/types/main.ts

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,31 @@ export interface JobOptions {
2929

3030
export type JobClass<T extends Job = Job> = (new (payload: any) => T) & { options?: JobOptions }
3131

32+
/**
33+
* Factory function for custom job instantiation.
34+
*
35+
* Use this to integrate with IoC containers for dependency injection.
36+
* The factory receives the job class and payload, and must return
37+
* a job instance (or a Promise that resolves to one).
38+
*
39+
* @param JobClass - The job class to instantiate
40+
* @param payload - The payload data for the job
41+
* @returns The job instance, or a Promise resolving to the instance
42+
*
43+
* @example
44+
* ```typescript
45+
* // With AdonisJS IoC container
46+
* const worker = new Worker({
47+
* worker: {
48+
* jobFactory: async (JobClass, payload) => {
49+
* return app.container.make(JobClass, [payload])
50+
* }
51+
* }
52+
* })
53+
* ```
54+
*/
55+
export type JobFactory = (JobClass: JobClass, payload: any) => Job | Promise<Job>
56+
3257
export interface RetryConfig {
3358
maxRetries?: number
3459
backoff?: () => BackoffStrategyClass
@@ -103,6 +128,26 @@ export interface WorkerConfig {
103128
* Called before the worker starts stopping.
104129
*/
105130
onShutdownSignal?: () => void | Promise<void>
131+
132+
/**
133+
* Custom factory function for job instantiation.
134+
*
135+
* Use this to integrate with IoC containers for dependency injection.
136+
* When provided, this factory is called instead of `new JobClass(payload)`.
137+
*
138+
* @example
139+
* ```typescript
140+
* const worker = new Worker({
141+
* worker: {
142+
* jobFactory: async (JobClass, payload) => {
143+
* // Inject dependencies via IoC container
144+
* return app.container.make(JobClass, [payload])
145+
* }
146+
* }
147+
* })
148+
* ```
149+
*/
150+
jobFactory?: JobFactory
106151
}
107152

108153
export type WorkerCycle =

src/worker.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import * as errors from './exceptions.js'
66
import { QueueManager } from './queue_manager.js'
77
import { JobPool } from './job_pool.js'
88
import type { Adapter, AcquiredJob } from './contracts/adapter.js'
9-
import type { QueueManagerConfig, WorkerCycle } from './types/main.js'
9+
import type { JobFactory, QueueManagerConfig, WorkerCycle } from './types/main.js'
1010
import { Locator } from './locator.js'
1111
import type { JobOptions } from './types/main.js'
1212
import type { Job } from './job.js'
@@ -58,6 +58,7 @@ export class Worker {
5858
readonly #concurrency: number
5959
readonly #gracefulShutdown: boolean
6060
readonly #onShutdownSignal?: () => void | Promise<void>
61+
readonly #jobFactory?: JobFactory
6162

6263
#adapter!: Adapter
6364
#running = false
@@ -89,6 +90,7 @@ export class Worker {
8990
this.#concurrency = config.worker?.concurrency ?? 1
9091
this.#gracefulShutdown = config.worker?.gracefulShutdown ?? true
9192
this.#onShutdownSignal = config.worker?.onShutdownSignal
93+
this.#jobFactory = config.worker?.jobFactory
9294

9395
debug('created worker with id %s and config %O', this.#id, config)
9496
}
@@ -371,7 +373,9 @@ export class Worker {
371373
): Promise<{ instance: Job; options: JobOptions; timeout: number | undefined }> {
372374
try {
373375
const JobClass = Locator.getOrThrow(job.name)
374-
const instance = new JobClass(job.payload)
376+
const instance = this.#jobFactory
377+
? await this.#jobFactory(JobClass, job.payload)
378+
: new JobClass(job.payload)
375379
const options = JobClass.options || {}
376380
const timeout = this.#getJobTimeout(options)
377381

0 commit comments

Comments
 (0)