Skip to content

Commit 05eb6d2

Browse files
committed
feat(dispatcher): add ability to wait for job completion
1 parent 83db97d commit 05eb6d2

File tree

9 files changed

+153
-77
lines changed

9 files changed

+153
-77
lines changed

src/contracts/adapter.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,14 @@ export interface Adapter {
8484
* @param jobId - The job ID to complete
8585
* @param queue - The queue the job belongs to
8686
* @param removeOnComplete - Optional retention policy for completed jobs
87+
* @param output - Optional output returned by the job
8788
*/
88-
completeJob(jobId: string, queue: string, removeOnComplete?: JobRetention): Promise<void>
89+
completeJob(
90+
jobId: string,
91+
queue: string,
92+
removeOnComplete?: JobRetention,
93+
output?: any
94+
): Promise<void>
8995

9096
/**
9197
* Mark a job as failed permanently and remove it from the queue.

src/drivers/knex_adapter.ts

Lines changed: 41 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -117,9 +117,7 @@ export class KnexAdapter implements Adapter {
117117

118118
// Update job to active status
119119
// For SQLite (no SKIP LOCKED), add status='pending' guard to prevent double-claim
120-
const updateQuery = trx(this.#jobsTable)
121-
.where('id', job.id)
122-
.where('queue', queue)
120+
const updateQuery = trx(this.#jobsTable).where('id', job.id).where('queue', queue)
123121

124122
if (!this.#supportsSkipLocked()) {
125123
updateQuery.where('status', 'pending')
@@ -178,19 +176,21 @@ export class KnexAdapter implements Adapter {
178176
const priority = jobData.priority ?? DEFAULT_PRIORITY
179177
const score = calculateScore(priority, now)
180178

181-
await trx(this.#jobsTable)
182-
.where('id', job.id)
183-
.where('queue', queue)
184-
.update({
185-
status: 'pending',
186-
score,
187-
execute_at: null,
188-
})
179+
await trx(this.#jobsTable).where('id', job.id).where('queue', queue).update({
180+
status: 'pending',
181+
score,
182+
execute_at: null,
183+
})
189184
}
190185
})
191186
}
192187

193-
async completeJob(jobId: string, queue: string, removeOnComplete?: JobRetention): Promise<void> {
188+
async completeJob(
189+
jobId: string,
190+
queue: string,
191+
removeOnComplete?: JobRetention,
192+
output?: any
193+
): Promise<void> {
194194
const { keep, maxAge, maxCount } = resolveRetention(removeOnComplete)
195195

196196
if (!keep) {
@@ -213,6 +213,7 @@ export class KnexAdapter implements Adapter {
213213
worker_id: null,
214214
acquired_at: null,
215215
finished_at: now,
216+
output: output ? JSON.stringify(output) : null,
216217
})
217218

218219
if (!updated) {
@@ -276,6 +277,7 @@ export class KnexAdapter implements Adapter {
276277
status: row.status as JobStatus,
277278
data: jobData,
278279
finishedAt: row.finished_at ? Number(row.finished_at) : undefined,
280+
output: row.output ? JSON.parse(row.output) : undefined,
279281
error: row.error || undefined,
280282
}
281283
}
@@ -331,33 +333,27 @@ export class KnexAdapter implements Adapter {
331333

332334
if (retryAt && retryAt.getTime() > now) {
333335
// Move to delayed
334-
await this.#connection(this.#jobsTable)
335-
.where('id', jobId)
336-
.where('queue', queue)
337-
.update({
338-
status: 'delayed',
339-
data: updatedData,
340-
worker_id: null,
341-
acquired_at: null,
342-
score: null,
343-
execute_at: retryAt.getTime(),
344-
})
336+
await this.#connection(this.#jobsTable).where('id', jobId).where('queue', queue).update({
337+
status: 'delayed',
338+
data: updatedData,
339+
worker_id: null,
340+
acquired_at: null,
341+
score: null,
342+
execute_at: retryAt.getTime(),
343+
})
345344
} else {
346345
// Move back to pending
347346
const priority = jobData.priority ?? DEFAULT_PRIORITY
348347
const score = calculateScore(priority, now)
349348

350-
await this.#connection(this.#jobsTable)
351-
.where('id', jobId)
352-
.where('queue', queue)
353-
.update({
354-
status: 'pending',
355-
data: updatedData,
356-
worker_id: null,
357-
acquired_at: null,
358-
score,
359-
execute_at: null,
360-
})
349+
await this.#connection(this.#jobsTable).where('id', jobId).where('queue', queue).update({
350+
status: 'pending',
351+
data: updatedData,
352+
worker_id: null,
353+
acquired_at: null,
354+
score,
355+
execute_at: null,
356+
})
361357
}
362358
}
363359

@@ -458,10 +454,7 @@ export class KnexAdapter implements Adapter {
458454

459455
if (currentStalledCount >= maxStalledCount) {
460456
// Fail permanently - remove the job
461-
await trx(this.#jobsTable)
462-
.where('id', row.id)
463-
.where('queue', queue)
464-
.delete()
457+
await trx(this.#jobsTable).where('id', row.id).where('queue', queue).delete()
465458
} else {
466459
// Recover: increment stalledCount and put back in pending
467460
jobData.stalledCount = currentStalledCount + 1
@@ -534,9 +527,9 @@ export class KnexAdapter implements Adapter {
534527
}
535528

536529
async getSchedule(id: string): Promise<ScheduleData | null> {
537-
const row = (await this.#connection(this.#schedulesTable)
538-
.where('id', id)
539-
.first()) as ScheduleRow | undefined
530+
const row = (await this.#connection(this.#schedulesTable).where('id', id).first()) as
531+
| ScheduleRow
532+
| undefined
540533
if (!row) return null
541534

542535
return this.#rowToScheduleData(row)
@@ -565,16 +558,12 @@ export class KnexAdapter implements Adapter {
565558
if (updates.runCount !== undefined) data.run_count = updates.runCount
566559

567560
if (Object.keys(data).length > 0) {
568-
await this.#connection(this.#schedulesTable)
569-
.where('id', id)
570-
.update(data)
561+
await this.#connection(this.#schedulesTable).where('id', id).update(data)
571562
}
572563
}
573564

574565
async deleteSchedule(id: string): Promise<void> {
575-
await this.#connection(this.#schedulesTable)
576-
.where('id', id)
577-
.delete()
566+
await this.#connection(this.#schedulesTable).where('id', id).delete()
578567
}
579568

580569
async claimDueSchedule(): Promise<ScheduleData | null> {
@@ -629,13 +618,11 @@ export class KnexAdapter implements Adapter {
629618
}
630619

631620
// Update atomically
632-
await trx(this.#schedulesTable)
633-
.where('id', row.id)
634-
.update({
635-
next_run_at: nextRunAt,
636-
last_run_at: now,
637-
run_count: newRunCount,
638-
})
621+
await trx(this.#schedulesTable).where('id', row.id).update({
622+
next_run_at: nextRunAt,
623+
last_run_at: now,
624+
run_count: newRunCount,
625+
})
639626

640627
// Return schedule data (before update state for payload)
641628
return this.#rowToScheduleData(row)

src/exceptions.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,16 @@ export const E_JOB_NOT_FOUND = createError<[jobName: string]>(
3535
'E_JOB_NOT_FOUND'
3636
)
3737

38+
export const E_JOB_EXECUTION_NOT_FOUND = createError<[jobId: string]>(
39+
'The job execution "%s" could not be found',
40+
'E_JOB_EXECUTION_NOT_FOUND'
41+
)
42+
43+
export const E_JOB_EXECUTION_FAILED = createError<[jobId: string]>(
44+
'The job execution "%s" failed',
45+
'E_JOB_EXECUTION_FAILED'
46+
)
47+
3848
export const E_JOB_MAX_ATTEMPTS_REACHED = createError<[jobName: string]>(
3949
'The job "%s" has reached the maximum number of retry attempts',
4050
'E_JOB_MAX_ATTEMPTS_REACHED'

src/job.ts

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ import type { JobContext, JobOptions } from './types/main.js'
4646
* }
4747
* ```
4848
*/
49-
export abstract class Job<Payload = any> {
49+
export abstract class Job<Payload = any, Output = any> {
5050
#payload!: Payload
5151
#context!: JobContext
5252
#signal?: AbortSignal
@@ -173,12 +173,15 @@ export abstract class Job<Payload = any> {
173173
static dispatch<T extends Job>(
174174
this: abstract new (...args: any[]) => T,
175175
payload: T extends Job<infer P> ? P : never
176-
): JobDispatcher<T extends Job<infer P> ? P : never> {
176+
): JobDispatcher<T extends Job<infer P> ? P : never, T extends Job<any, infer O> ? O : never> {
177177
const jobClass = this as unknown as { options?: JobOptions; name: string }
178178
const options = jobClass.options || {}
179179
const jobName = options.name || this.name
180180

181-
const dispatcher = new JobDispatcher<T extends Job<infer P> ? P : never>(jobName, payload)
181+
const dispatcher = new JobDispatcher<
182+
T extends Job<infer P> ? P : never,
183+
T extends Job<any, infer O> ? O : never
184+
>(jobName, payload)
182185

183186
if (options.queue) {
184187
dispatcher.toQueue(options.queue)
@@ -305,7 +308,7 @@ export abstract class Job<Payload = any> {
305308
* }
306309
* ```
307310
*/
308-
abstract execute(): Promise<void>
311+
abstract execute(): Promise<Output>
309312

310313
/**
311314
* Called when the job has permanently failed (after all retries exhausted).

src/job_dispatcher.ts

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@ import { dispatchChannel } from './tracing_channels.js'
55
import type { Adapter } from './contracts/adapter.js'
66
import type { DispatchResult, Duration } from './types/main.js'
77
import type { JobDispatchMessage } from './types/tracing_channels.js'
8+
import { setTimeout } from 'node:timers/promises'
89
import { parse } from './utils.js'
10+
import { E_JOB_EXECUTION_FAILED, E_JOB_EXECUTION_NOT_FOUND } from './exceptions.ts'
911

1012
/**
1113
* Fluent builder for dispatching jobs to the queue.
@@ -39,9 +41,9 @@ import { parse } from './utils.js'
3941
* await ReminderJob.dispatch({ userId: 123 }).in('24h')
4042
* ```
4143
*/
42-
export class JobDispatcher<T> {
44+
export class JobDispatcher<TPayload, TOutput> {
4345
readonly #name: string
44-
readonly #payload: T
46+
readonly #payload: TPayload
4547
#queue: string = 'default'
4648
#adapter?: string | (() => Adapter)
4749
#delay?: Duration
@@ -54,7 +56,7 @@ export class JobDispatcher<T> {
5456
* @param name - The job class name (used to locate the class at runtime)
5557
* @param payload - The data to pass to the job
5658
*/
57-
constructor(name: string, payload: T) {
59+
constructor(name: string, payload: TPayload) {
5860
this.#name = name
5961
this.#payload = payload
6062
}
@@ -211,6 +213,39 @@ export class JobDispatcher<T> {
211213
return { jobId: id }
212214
}
213215

216+
/**
217+
* Dispatch the job to the queue and
218+
* await for job to complete or fail.
219+
*
220+
* @param pollingInterval - Interval between each check
221+
* @param signal - Optional signal to abort waiting
222+
* @returns The job output
223+
*/
224+
async wait(pollingInterval: Duration = 2000, signal?: AbortSignal): Promise<TOutput> {
225+
const adapter = this.#getAdapterInstance()
226+
const dispatchResult = await this.run()
227+
228+
while (true) {
229+
signal?.throwIfAborted()
230+
231+
await setTimeout(parse(pollingInterval))
232+
233+
const job = await adapter.getJob(dispatchResult.jobId, this.#queue)
234+
235+
if (!job) {
236+
throw new E_JOB_EXECUTION_NOT_FOUND([dispatchResult.jobId])
237+
}
238+
239+
if (job.status === 'completed') {
240+
return job.output
241+
}
242+
243+
if (job.status === 'failed') {
244+
throw new E_JOB_EXECUTION_FAILED([dispatchResult.jobId], { cause: job.error })
245+
}
246+
}
247+
}
248+
214249
/**
215250
* Thenable implementation for auto-dispatch when awaited.
216251
*

src/job_runtime.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ export class JobExecutionRuntime {
7575
/**
7676
* Execute a hydrated job instance and enforce the configured timeout.
7777
*/
78-
async execute(instance: Job, payload: unknown, context: JobContext): Promise<void> {
78+
async execute(instance: Job, payload: unknown, context: JobContext): Promise<unknown> {
7979
if (this.#timeout === undefined) {
8080
instance.$hydrate(payload, context)
8181
return instance.execute()
@@ -90,7 +90,7 @@ export class JobExecutionRuntime {
9090
)
9191

9292
try {
93-
await Promise.race([instance.execute(), abortPromise])
93+
return Promise.race([instance.execute(), abortPromise])
9494
} finally {
9595
cleanupAbortListener()
9696
}

src/services/queue_schema.ts

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ export class QueueSchemaService {
2020
table.string('queue', 255).notNullable()
2121
table.enu('status', ['pending', 'active', 'delayed', 'completed', 'failed']).notNullable()
2222
table.text('data').notNullable()
23+
table.text('output').nullable()
2324
table.bigint('score').unsigned().nullable()
2425
table.string('worker_id', 255).nullable()
2526
table.bigint('acquired_at').unsigned().nullable()
@@ -57,10 +58,7 @@ export class QueueSchemaService {
5758
table.integer('run_count').unsigned().notNullable().defaultTo(0)
5859
table.timestamp('next_run_at').nullable()
5960
table.timestamp('last_run_at').nullable()
60-
table
61-
.timestamp('created_at')
62-
.notNullable()
63-
.defaultTo(this.#connection.fn.now())
61+
table.timestamp('created_at').notNullable().defaultTo(this.#connection.fn.now())
6462
table.index(['status', 'next_run_at'])
6563

6664
extend?.(table)

src/types/main.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,8 @@ export interface JobRecord {
141141
finishedAt?: number
142142
/** Error message (for failed jobs) */
143143
error?: string
144+
/** Serialized job output */
145+
output?: any
144146
}
145147

146148
/**

0 commit comments

Comments
 (0)