Skip to content

Commit 14e4284

Browse files
committed
feat(dispatcher): add ability to wait for job completion
1 parent bc8dc23 commit 14e4284

File tree

9 files changed

+111
-70
lines changed

9 files changed

+111
-70
lines changed

src/contracts/adapter.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,14 @@ export interface Adapter {
8484
* @param jobId - The job ID to complete
8585
* @param queue - The queue the job belongs to
8686
* @param removeOnComplete - Optional retention policy for completed jobs
87+
* @param output - Optional output returned by the job
8788
*/
88-
completeJob(jobId: string, queue: string, removeOnComplete?: JobRetention): Promise<void>
89+
completeJob(
90+
jobId: string,
91+
queue: string,
92+
removeOnComplete?: JobRetention,
93+
output?: any
94+
): Promise<void>
8995

9096
/**
9197
* Mark a job as failed permanently and remove it from the queue.

src/drivers/knex_adapter.ts

Lines changed: 41 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -117,9 +117,7 @@ export class KnexAdapter implements Adapter {
117117

118118
// Update job to active status
119119
// For SQLite (no SKIP LOCKED), add status='pending' guard to prevent double-claim
120-
const updateQuery = trx(this.#jobsTable)
121-
.where('id', job.id)
122-
.where('queue', queue)
120+
const updateQuery = trx(this.#jobsTable).where('id', job.id).where('queue', queue)
123121

124122
if (!this.#supportsSkipLocked()) {
125123
updateQuery.where('status', 'pending')
@@ -178,19 +176,21 @@ export class KnexAdapter implements Adapter {
178176
const priority = jobData.priority ?? DEFAULT_PRIORITY
179177
const score = calculateScore(priority, now)
180178

181-
await trx(this.#jobsTable)
182-
.where('id', job.id)
183-
.where('queue', queue)
184-
.update({
185-
status: 'pending',
186-
score,
187-
execute_at: null,
188-
})
179+
await trx(this.#jobsTable).where('id', job.id).where('queue', queue).update({
180+
status: 'pending',
181+
score,
182+
execute_at: null,
183+
})
189184
}
190185
})
191186
}
192187

193-
async completeJob(jobId: string, queue: string, removeOnComplete?: JobRetention): Promise<void> {
188+
async completeJob(
189+
jobId: string,
190+
queue: string,
191+
removeOnComplete?: JobRetention,
192+
output?: any
193+
): Promise<void> {
194194
const { keep, maxAge, maxCount } = resolveRetention(removeOnComplete)
195195

196196
if (!keep) {
@@ -213,6 +213,7 @@ export class KnexAdapter implements Adapter {
213213
worker_id: null,
214214
acquired_at: null,
215215
finished_at: now,
216+
output: output ? JSON.stringify(output) : null,
216217
})
217218

218219
if (!updated) {
@@ -276,6 +277,7 @@ export class KnexAdapter implements Adapter {
276277
status: row.status as JobStatus,
277278
data: jobData,
278279
finishedAt: row.finished_at ? Number(row.finished_at) : undefined,
280+
output: row.output ? JSON.parse(row.output) : undefined,
279281
error: row.error || undefined,
280282
}
281283
}
@@ -331,33 +333,27 @@ export class KnexAdapter implements Adapter {
331333

332334
if (retryAt && retryAt.getTime() > now) {
333335
// Move to delayed
334-
await this.#connection(this.#jobsTable)
335-
.where('id', jobId)
336-
.where('queue', queue)
337-
.update({
338-
status: 'delayed',
339-
data: updatedData,
340-
worker_id: null,
341-
acquired_at: null,
342-
score: null,
343-
execute_at: retryAt.getTime(),
344-
})
336+
await this.#connection(this.#jobsTable).where('id', jobId).where('queue', queue).update({
337+
status: 'delayed',
338+
data: updatedData,
339+
worker_id: null,
340+
acquired_at: null,
341+
score: null,
342+
execute_at: retryAt.getTime(),
343+
})
345344
} else {
346345
// Move back to pending
347346
const priority = jobData.priority ?? DEFAULT_PRIORITY
348347
const score = calculateScore(priority, now)
349348

350-
await this.#connection(this.#jobsTable)
351-
.where('id', jobId)
352-
.where('queue', queue)
353-
.update({
354-
status: 'pending',
355-
data: updatedData,
356-
worker_id: null,
357-
acquired_at: null,
358-
score,
359-
execute_at: null,
360-
})
349+
await this.#connection(this.#jobsTable).where('id', jobId).where('queue', queue).update({
350+
status: 'pending',
351+
data: updatedData,
352+
worker_id: null,
353+
acquired_at: null,
354+
score,
355+
execute_at: null,
356+
})
361357
}
362358
}
363359

@@ -458,10 +454,7 @@ export class KnexAdapter implements Adapter {
458454

459455
if (currentStalledCount >= maxStalledCount) {
460456
// Fail permanently - remove the job
461-
await trx(this.#jobsTable)
462-
.where('id', row.id)
463-
.where('queue', queue)
464-
.delete()
457+
await trx(this.#jobsTable).where('id', row.id).where('queue', queue).delete()
465458
} else {
466459
// Recover: increment stalledCount and put back in pending
467460
jobData.stalledCount = currentStalledCount + 1
@@ -534,9 +527,9 @@ export class KnexAdapter implements Adapter {
534527
}
535528

536529
async getSchedule(id: string): Promise<ScheduleData | null> {
537-
const row = (await this.#connection(this.#schedulesTable)
538-
.where('id', id)
539-
.first()) as ScheduleRow | undefined
530+
const row = (await this.#connection(this.#schedulesTable).where('id', id).first()) as
531+
| ScheduleRow
532+
| undefined
540533
if (!row) return null
541534

542535
return this.#rowToScheduleData(row)
@@ -565,16 +558,12 @@ export class KnexAdapter implements Adapter {
565558
if (updates.runCount !== undefined) data.run_count = updates.runCount
566559

567560
if (Object.keys(data).length > 0) {
568-
await this.#connection(this.#schedulesTable)
569-
.where('id', id)
570-
.update(data)
561+
await this.#connection(this.#schedulesTable).where('id', id).update(data)
571562
}
572563
}
573564

574565
async deleteSchedule(id: string): Promise<void> {
575-
await this.#connection(this.#schedulesTable)
576-
.where('id', id)
577-
.delete()
566+
await this.#connection(this.#schedulesTable).where('id', id).delete()
578567
}
579568

580569
async claimDueSchedule(): Promise<ScheduleData | null> {
@@ -629,13 +618,11 @@ export class KnexAdapter implements Adapter {
629618
}
630619

631620
// Update atomically
632-
await trx(this.#schedulesTable)
633-
.where('id', row.id)
634-
.update({
635-
next_run_at: nextRunAt,
636-
last_run_at: now,
637-
run_count: newRunCount,
638-
})
621+
await trx(this.#schedulesTable).where('id', row.id).update({
622+
next_run_at: nextRunAt,
623+
last_run_at: now,
624+
run_count: newRunCount,
625+
})
639626

640627
// Return schedule data (before update state for payload)
641628
return this.#rowToScheduleData(row)

src/exceptions.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,16 @@ export const E_JOB_NOT_FOUND = createError<[jobName: string]>(
3535
'E_JOB_NOT_FOUND'
3636
)
3737

38+
export const E_JOB_EXECUTION_NOT_FOUND = createError<[jobId: string]>(
39+
'The job execution "%s" could not be found',
40+
'E_JOB_EXECUTION_NOT_FOUND'
41+
)
42+
43+
export const E_JOB_EXECUTION_FAILED = createError<[jobId: string]>(
44+
'The job execution "%s" failed',
45+
'E_JOB_EXECUTION_FAILED'
46+
)
47+
3848
export const E_JOB_MAX_ATTEMPTS_REACHED = createError<[jobName: string]>(
3949
'The job "%s" has reached the maximum number of retry attempts',
4050
'E_JOB_MAX_ATTEMPTS_REACHED'

src/job.ts

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ import type { JobContext, JobOptions } from './types/main.js'
4646
* }
4747
* ```
4848
*/
49-
export abstract class Job<Payload = any> {
49+
export abstract class Job<Payload = any, Output = any> {
5050
#payload!: Payload
5151
#context!: JobContext
5252
#signal?: AbortSignal
@@ -173,12 +173,15 @@ export abstract class Job<Payload = any> {
173173
static dispatch<T extends Job>(
174174
this: abstract new (...args: any[]) => T,
175175
payload: T extends Job<infer P> ? P : never
176-
): JobDispatcher<T extends Job<infer P> ? P : never> {
176+
): JobDispatcher<T extends Job<infer P> ? P : never, T extends Job<any, infer O> ? O : never> {
177177
const jobClass = this as unknown as { options?: JobOptions; name: string }
178178
const options = jobClass.options || {}
179179
const jobName = options.name || this.name
180180

181-
const dispatcher = new JobDispatcher<T extends Job<infer P> ? P : never>(jobName, payload)
181+
const dispatcher = new JobDispatcher<
182+
T extends Job<infer P> ? P : never,
183+
T extends Job<any, infer O> ? O : never
184+
>(jobName, payload)
182185

183186
if (options.queue) {
184187
dispatcher.toQueue(options.queue)
@@ -305,7 +308,7 @@ export abstract class Job<Payload = any> {
305308
* }
306309
* ```
307310
*/
308-
abstract execute(): Promise<void>
311+
abstract execute(): Promise<Output>
309312

310313
/**
311314
* Called when the job has permanently failed (after all retries exhausted).

src/job_dispatcher.ts

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@ import { randomUUID } from 'node:crypto'
33
import { QueueManager } from './queue_manager.js'
44
import type { Adapter } from './contracts/adapter.js'
55
import type { DispatchResult, Duration } from './types/main.js'
6+
import { setTimeout } from 'node:timers/promises'
67
import { parse } from './utils.js'
8+
import { E_JOB_EXECUTION_FAILED, E_JOB_EXECUTION_NOT_FOUND } from './exceptions.ts'
79

810
/**
911
* Fluent builder for dispatching jobs to the queue.
@@ -37,9 +39,9 @@ import { parse } from './utils.js'
3739
* await ReminderJob.dispatch({ userId: 123 }).in('24h')
3840
* ```
3941
*/
40-
export class JobDispatcher<T> {
42+
export class JobDispatcher<TPayload, TOutput> {
4143
readonly #name: string
42-
readonly #payload: T
44+
readonly #payload: TPayload
4345
#queue: string = 'default'
4446
#adapter?: string | (() => Adapter)
4547
#delay?: Duration
@@ -52,7 +54,7 @@ export class JobDispatcher<T> {
5254
* @param name - The job class name (used to locate the class at runtime)
5355
* @param payload - The data to pass to the job
5456
*/
55-
constructor(name: string, payload: T) {
57+
constructor(name: string, payload: TPayload) {
5658
this.#name = name
5759
this.#payload = payload
5860
}
@@ -207,6 +209,39 @@ export class JobDispatcher<T> {
207209
}
208210
}
209211

212+
/**
213+
* Dispatch the job to the queue and
214+
* await for job to complete or fail.
215+
*
216+
* @param pollingInterval - Interval between each check
217+
* @param signal - Optional signal to abort waiting
218+
* @returns The job output
219+
*/
220+
async wait(pollingInterval: Duration = 2000, signal?: AbortSignal): Promise<TOutput> {
221+
const adapter = this.#getAdapterInstance()
222+
const dispatchResult = await this.run()
223+
224+
while (true) {
225+
signal?.throwIfAborted()
226+
227+
await setTimeout(parse(pollingInterval))
228+
229+
const job = await adapter.getJob(dispatchResult.jobId, this.#queue)
230+
231+
if (!job) {
232+
throw new E_JOB_EXECUTION_NOT_FOUND([dispatchResult.jobId])
233+
}
234+
235+
if (job.status === 'completed') {
236+
return job.output
237+
}
238+
239+
if (job.status === 'failed') {
240+
throw new E_JOB_EXECUTION_FAILED([dispatchResult.jobId], { cause: job.error })
241+
}
242+
}
243+
}
244+
210245
/**
211246
* Thenable implementation for auto-dispatch when awaited.
212247
*

src/job_runtime.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ export class JobExecutionRuntime {
7575
/**
7676
* Execute a hydrated job instance and enforce the configured timeout.
7777
*/
78-
async execute(instance: Job, payload: unknown, context: JobContext): Promise<void> {
78+
async execute(instance: Job, payload: unknown, context: JobContext): Promise<unknown> {
7979
if (this.#timeout === undefined) {
8080
instance.$hydrate(payload, context)
8181
return instance.execute()
@@ -90,7 +90,7 @@ export class JobExecutionRuntime {
9090
)
9191

9292
try {
93-
await Promise.race([instance.execute(), abortPromise])
93+
return Promise.race([instance.execute(), abortPromise])
9494
} finally {
9595
cleanupAbortListener()
9696
}

src/services/queue_schema.ts

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ export class QueueSchemaService {
2020
table.string('queue', 255).notNullable()
2121
table.enu('status', ['pending', 'active', 'delayed', 'completed', 'failed']).notNullable()
2222
table.text('data').notNullable()
23+
table.text('output').nullable()
2324
table.bigint('score').unsigned().nullable()
2425
table.string('worker_id', 255).nullable()
2526
table.bigint('acquired_at').unsigned().nullable()
@@ -57,10 +58,7 @@ export class QueueSchemaService {
5758
table.integer('run_count').unsigned().notNullable().defaultTo(0)
5859
table.timestamp('next_run_at').nullable()
5960
table.timestamp('last_run_at').nullable()
60-
table
61-
.timestamp('created_at')
62-
.notNullable()
63-
.defaultTo(this.#connection.fn.now())
61+
table.timestamp('created_at').notNullable().defaultTo(this.#connection.fn.now())
6462
table.index(['status', 'next_run_at'])
6563

6664
extend?.(table)

src/types/main.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,8 @@ export interface JobRecord {
134134
finishedAt?: number
135135
/** Error message (for failed jobs) */
136136
error?: string
137+
/** Serialized job output */
138+
output?: any
137139
}
138140

139141
/**

src/worker.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -338,8 +338,8 @@ export class Worker {
338338
})
339339

340340
try {
341-
await runtime.execute(instance, payload, context)
342-
await this.#adapter.completeJob(job.id, queue, retention.removeOnComplete)
341+
const output = await runtime.execute(instance, payload, context)
342+
await this.#adapter.completeJob(job.id, queue, retention.removeOnComplete, output)
343343

344344
const duration = (performance.now() - startTime).toFixed(2)
345345
debug('worker %s: successfully executed job %s in %dms', this.#id, job.id, duration)

0 commit comments

Comments
 (0)