Skip to content

Commit 26d8043

Browse files
committed
feat!: expose job context to job instances
1 parent c1563b5 commit 26d8043

File tree

7 files changed

+495
-64
lines changed

7 files changed

+495
-64
lines changed

src/drivers/sync_adapter.ts

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,12 @@
11
import { Locator } from '../locator.js'
2+
import { QueueManager } from '../queue_manager.js'
23
import type { Adapter, AcquiredJob } from '../contracts/adapter.js'
3-
import type { JobData } from '../types/main.js'
4+
import type { JobContext, JobData } from '../types/main.js'
5+
import { DEFAULT_PRIORITY } from '../constants.js'
46

7+
/**
8+
* Create a sync adapter factory.
9+
*/
510
export function sync() {
611
return () => new SyncAdapter()
712
}
@@ -17,17 +22,17 @@ export class SyncAdapter implements Adapter {
1722
return this.pushOn('default', jobData)
1823
}
1924

20-
pushOn(_queue: string, jobData: JobData): Promise<void> {
21-
return this.#execute(jobData.name, jobData.payload)
25+
pushOn(queue: string, jobData: JobData): Promise<void> {
26+
return this.#execute(jobData.name, jobData.payload, queue)
2227
}
2328

2429
pushLater(jobData: JobData, delay: number): Promise<void> {
2530
return this.pushLaterOn('default', jobData, delay)
2631
}
2732

28-
pushLaterOn(_queue: string, jobData: JobData, delay: number): Promise<void> {
33+
pushLaterOn(queue: string, jobData: JobData, delay: number): Promise<void> {
2934
setTimeout(() => {
30-
void this.#execute(jobData.name, jobData.payload)
35+
void this.#execute(jobData.name, jobData.payload, queue)
3136
}, delay)
3237

3338
return Promise.resolve()
@@ -74,14 +79,28 @@ export class SyncAdapter implements Adapter {
7479
return Promise.resolve()
7580
}
7681

77-
async #execute(jobName: string, payload: any): Promise<any> {
82+
async #execute(jobName: string, payload: any, queue: string = 'default'): Promise<any> {
7883
const JobClass = Locator.get(jobName)
7984

8085
if (!JobClass) {
8186
throw new Error(`Job class ${jobName} not found.`)
8287
}
8388

84-
const jobInstance = new JobClass(payload)
89+
const context: JobContext = Object.freeze({
90+
jobId: `sync-${Date.now()}`,
91+
name: jobName,
92+
attempt: 1,
93+
queue,
94+
priority: DEFAULT_PRIORITY,
95+
acquiredAt: new Date(),
96+
stalledCount: 0,
97+
})
98+
99+
const jobFactory = QueueManager.getJobFactory()
100+
const jobInstance = jobFactory
101+
? await jobFactory(JobClass, payload, context)
102+
: new JobClass(payload, context)
103+
85104
await jobInstance.execute()
86105
}
87106
}

src/job.ts

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { JobDispatcher } from './job_dispatcher.js'
2-
import type { JobOptions } from './types/main.js'
2+
import type { JobContext, JobOptions } from './types/main.js'
33

44
/**
55
* Abstract base class for all queue jobs.
@@ -12,6 +12,7 @@ import type { JobOptions } from './types/main.js'
1212
* @example
1313
* ```typescript
1414
* import { Job } from '@boringnode/queue'
15+
* import type { JobContext } from '@boringnode/queue'
1516
*
1617
* interface SendEmailPayload {
1718
* to: string
@@ -25,7 +26,12 @@ import type { JobOptions } from './types/main.js'
2526
* maxRetries: 3,
2627
* }
2728
*
29+
* constructor(payload: SendEmailPayload, context: JobContext) {
30+
* super(payload, context)
31+
* }
32+
*
2833
* async execute() {
34+
* console.log(`Attempt ${this.context.attempt} for job ${this.context.jobId}`)
2935
* await sendEmail(this.payload.to, this.payload.subject, this.payload.body)
3036
* }
3137
*
@@ -37,6 +43,7 @@ import type { JobOptions } from './types/main.js'
3743
*/
3844
export abstract class Job<Payload = any> {
3945
readonly #payload: Payload
46+
readonly #context: JobContext
4047

4148
/** Static options for this job class (queue, retries, timeout, etc.) */
4249
static options: JobOptions = {}
@@ -46,13 +53,35 @@ export abstract class Job<Payload = any> {
4653
return this.#payload
4754
}
4855

56+
/**
57+
* Context information for the current job execution.
58+
*
59+
* Provides metadata such as job ID, current attempt number,
60+
* queue name, priority, and timing information.
61+
*
62+
* @example
63+
* ```typescript
64+
* async execute() {
65+
* if (this.context.attempt > 1) {
66+
* console.log(`Retry attempt ${this.context.attempt}`)
67+
* }
68+
* console.log(`Processing job ${this.context.jobId} on queue ${this.context.queue}`)
69+
* }
70+
* ```
71+
*/
72+
get context(): JobContext {
73+
return this.#context
74+
}
75+
4976
/**
5077
* Create a new job instance.
5178
*
5279
* @param payload - The data to be processed by this job
80+
* @param context - The job execution context (provided by the worker)
5381
*/
54-
constructor(payload: Payload) {
82+
constructor(payload: Payload, context: JobContext) {
5583
this.#payload = payload
84+
this.#context = Object.freeze(context)
5685
}
5786

5887
/**
@@ -79,7 +108,7 @@ export abstract class Job<Payload = any> {
79108
* ```
80109
*/
81110
static dispatch<T extends Job>(
82-
this: new (payload: any) => T,
111+
this: new (payload: any, context: JobContext) => T,
83112
payload: T extends Job<infer P> ? P : never
84113
): JobDispatcher<T extends Job<infer P> ? P : never> {
85114
const dispatcher = new JobDispatcher<T extends Job<infer P> ? P : never>(

src/queue_manager.ts

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,13 @@ import debug from './debug.js'
33
import { Locator } from './locator.js'
44
import { consoleLogger, type Logger } from './logger.js'
55
import type { Adapter } from './contracts/adapter.js'
6-
import type { AdapterFactory, QueueConfig, QueueManagerConfig, RetryConfig } from './types/main.js'
6+
import type {
7+
AdapterFactory,
8+
JobFactory,
9+
QueueConfig,
10+
QueueManagerConfig,
11+
RetryConfig,
12+
} from './types/main.js'
713

814
/**
915
* Central configuration and adapter management for the queue system.
@@ -44,6 +50,7 @@ class QueueManagerSingleton {
4450
#globalRetryConfig?: RetryConfig
4551
#queueConfigs: Map<string, QueueConfig> = new Map()
4652
#logger: Logger = consoleLogger
53+
#jobFactory?: JobFactory
4754

4855
/**
4956
* Initialize the queue system with the given configuration.
@@ -80,6 +87,7 @@ class QueueManagerSingleton {
8087
this.#adapters = config.adapters
8188
this.#globalRetryConfig = config.retry
8289
this.#logger = config.logger ?? consoleLogger
90+
this.#jobFactory = config.jobFactory
8391

8492
if (config.queues) {
8593
for (const [queue, queueConfig] of Object.entries(config.queues)) {
@@ -190,6 +198,15 @@ class QueueManagerSingleton {
190198
return { maxRetries, backoff }
191199
}
192200

201+
/**
202+
* Get the configured job factory for custom instantiation.
203+
*
204+
* @returns The job factory function, or undefined if not configured
205+
*/
206+
getJobFactory(): JobFactory | undefined {
207+
return this.#jobFactory
208+
}
209+
193210
#validateConfig(config: QueueManagerConfig): void {
194211
if (!config.adapters || Object.keys(config.adapters).length === 0) {
195212
throw new errors.E_CONFIGURATION_ERROR(['At least one adapter must be configured'])

src/types/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ export type {
33
JobData,
44
JobOptions,
55
JobClass,
6+
JobContext,
7+
JobFactory,
68
RetryConfig,
79
BackoffStrategy,
810
BackoffConfig,

src/types/main.ts

Lines changed: 70 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -27,32 +27,78 @@ export interface JobOptions {
2727
failOnTimeout?: boolean
2828
}
2929

30-
export type JobClass<T extends Job = Job> = (new (payload: any) => T) & { options?: JobOptions }
30+
/**
31+
* Context information available to a job during execution.
32+
*
33+
* Provides metadata about the current job execution, including
34+
* retry information, queue details, and timing.
35+
*
36+
* @example
37+
* ```typescript
38+
* class MyJob extends Job<Payload> {
39+
* async execute() {
40+
* console.log(`Attempt ${this.context.attempt} of job ${this.context.jobId}`)
41+
* console.log(`Running on queue: ${this.context.queue}`)
42+
* }
43+
* }
44+
* ```
45+
*/
46+
export interface JobContext {
47+
/** Unique identifier for this job */
48+
jobId: string
49+
50+
/** Job class name */
51+
name: string
52+
53+
/** Current attempt number (1-based: first attempt = 1) */
54+
attempt: number
55+
56+
/** Queue name this job is being processed from */
57+
queue: string
58+
59+
/** Job priority (lower number = higher priority) */
60+
priority: number
61+
62+
/** When this job was acquired by the worker for processing */
63+
acquiredAt: Date
64+
65+
/** Number of times this job has been recovered from stalled state */
66+
stalledCount: number
67+
}
68+
69+
export type JobClass<T extends Job = Job> = (new (payload: any, context: JobContext) => T) & {
70+
options?: JobOptions
71+
}
3172

3273
/**
3374
* Factory function for custom job instantiation.
3475
*
3576
* Use this to integrate with IoC containers for dependency injection.
36-
* The factory receives the job class and payload, and must return
77+
* The factory receives the job class, payload, and context, and must return
3778
* a job instance (or a Promise that resolves to one).
3879
*
3980
* @param JobClass - The job class to instantiate
4081
* @param payload - The payload data for the job
82+
* @param context - The job execution context (jobId, attempt, queue, etc.)
4183
* @returns The job instance, or a Promise resolving to the instance
4284
*
4385
* @example
4486
* ```typescript
4587
* // With AdonisJS IoC container
4688
* const worker = new Worker({
4789
* worker: {
48-
* jobFactory: async (JobClass, payload) => {
49-
* return app.container.make(JobClass, [payload])
90+
* jobFactory: async (JobClass, payload, context) => {
91+
* return app.container.make(JobClass, [payload, context])
5092
* }
5193
* }
5294
* })
5395
* ```
5496
*/
55-
export type JobFactory = (JobClass: JobClass, payload: any) => Job | Promise<Job>
97+
export type JobFactory = (
98+
JobClass: JobClass,
99+
payload: any,
100+
context: JobContext
101+
) => Job | Promise<Job>
56102

57103
export interface RetryConfig {
58104
maxRetries?: number
@@ -128,26 +174,6 @@ export interface WorkerConfig {
128174
* Called before the worker starts stopping.
129175
*/
130176
onShutdownSignal?: () => void | Promise<void>
131-
132-
/**
133-
* Custom factory function for job instantiation.
134-
*
135-
* Use this to integrate with IoC containers for dependency injection.
136-
* When provided, this factory is called instead of `new JobClass(payload)`.
137-
*
138-
* @example
139-
* ```typescript
140-
* const worker = new Worker({
141-
* worker: {
142-
* jobFactory: async (JobClass, payload) => {
143-
* // Inject dependencies via IoC container
144-
* return app.container.make(JobClass, [payload])
145-
* }
146-
* }
147-
* })
148-
* ```
149-
*/
150-
jobFactory?: JobFactory
151177
}
152178

153179
export type WorkerCycle =
@@ -166,4 +192,23 @@ export interface QueueManagerConfig {
166192
worker?: WorkerConfig
167193
locations?: string[]
168194
logger?: Logger
195+
196+
/**
197+
* Custom factory function for job instantiation.
198+
*
199+
* Use this to integrate with IoC containers for dependency injection.
200+
* When provided, this factory is called instead of `new JobClass(payload, context)`.
201+
*
202+
* @example
203+
* ```typescript
204+
* await QueueManager.init({
205+
* default: 'redis',
206+
* adapters: { redis: redis() },
207+
* jobFactory: async (JobClass, payload, context) => {
208+
* return app.container.make(JobClass, [payload, context])
209+
* }
210+
* })
211+
* ```
212+
*/
213+
jobFactory?: JobFactory
169214
}

src/worker.ts

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,9 @@ import * as errors from './exceptions.js'
66
import { QueueManager } from './queue_manager.js'
77
import { JobPool } from './job_pool.js'
88
import type { Adapter, AcquiredJob } from './contracts/adapter.js'
9-
import type { JobFactory, QueueManagerConfig, WorkerCycle } from './types/main.js'
9+
import type { JobContext, JobOptions, QueueManagerConfig, WorkerCycle } from './types/main.js'
1010
import { Locator } from './locator.js'
11-
import type { JobOptions } from './types/main.js'
11+
import { DEFAULT_PRIORITY } from './constants.js'
1212
import type { Job } from './job.js'
1313
import {
1414
DEFAULT_IDLE_DELAY,
@@ -58,7 +58,6 @@ export class Worker {
5858
readonly #concurrency: number
5959
readonly #gracefulShutdown: boolean
6060
readonly #onShutdownSignal?: () => void | Promise<void>
61-
readonly #jobFactory?: JobFactory
6261

6362
#adapter!: Adapter
6463
#running = false
@@ -90,7 +89,6 @@ export class Worker {
9089
this.#concurrency = config.worker?.concurrency ?? 1
9190
this.#gracefulShutdown = config.worker?.gracefulShutdown ?? true
9291
this.#onShutdownSignal = config.worker?.onShutdownSignal
93-
this.#jobFactory = config.worker?.jobFactory
9492

9593
debug('created worker with id %s and config %O', this.#id, config)
9694
}
@@ -373,9 +371,21 @@ export class Worker {
373371
): Promise<{ instance: Job; options: JobOptions; timeout: number | undefined }> {
374372
try {
375373
const JobClass = Locator.getOrThrow(job.name)
376-
const instance = this.#jobFactory
377-
? await this.#jobFactory(JobClass, job.payload)
378-
: new JobClass(job.payload)
374+
375+
const context: JobContext = Object.freeze({
376+
jobId: job.id,
377+
name: job.name,
378+
attempt: job.attempts + 1,
379+
queue,
380+
priority: job.priority ?? DEFAULT_PRIORITY,
381+
acquiredAt: new Date(job.acquiredAt),
382+
stalledCount: job.stalledCount ?? 0,
383+
})
384+
385+
const jobFactory = QueueManager.getJobFactory()
386+
const instance = jobFactory
387+
? await jobFactory(JobClass, job.payload, context)
388+
: new JobClass(job.payload, context)
379389
const options = JobClass.options || {}
380390
const timeout = this.#getJobTimeout(options)
381391

0 commit comments

Comments
 (0)