Skip to content

Commit 12c46f6

Browse files
committed
fix: improve graceful shutdown in Worker
1 parent ccdc019 commit 12c46f6

File tree

3 files changed

+97
-6
lines changed

3 files changed

+97
-6
lines changed

src/types/main.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,20 @@ export interface WorkerConfig {
8989
* @default 1
9090
*/
9191
maxStalledCount?: number
92+
93+
/**
94+
* Whether to automatically stop the worker on SIGINT/SIGTERM signals.
95+
* When enabled, the worker will wait for running jobs to complete
96+
* before stopping.
97+
* @default true
98+
*/
99+
gracefulShutdown?: boolean
100+
101+
/**
102+
* Callback invoked when a shutdown signal is received.
103+
* Called before the worker starts stopping.
104+
*/
105+
onShutdownSignal?: () => void | Promise<void>
92106
}
93107

94108
export type WorkerCycle =

src/worker.ts

Lines changed: 29 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,16 @@ export class Worker {
1919
readonly #stalledThreshold: number
2020
readonly #maxStalledCount: number
2121
readonly #concurrency: number
22+
readonly #gracefulShutdown: boolean
23+
readonly #onShutdownSignal?: () => void | Promise<void>
2224

2325
#adapter!: Adapter
2426
#running = false
2527
#initialized = false
2628
#generator?: AsyncGenerator<WorkerCycle, void, unknown>
2729
#pool?: JobPool
2830
#lastStalledCheck = 0
31+
#shutdownHandler?: () => Promise<void>
2932

3033
get id() {
3134
return this.#id
@@ -41,6 +44,8 @@ export class Worker {
4144
this.#stalledThreshold = parse(config.worker?.stalledThreshold ?? '30s')
4245
this.#maxStalledCount = config.worker?.maxStalledCount ?? 1
4346
this.#concurrency = config.worker?.concurrency ?? 1
47+
this.#gracefulShutdown = config.worker?.gracefulShutdown ?? true
48+
this.#onShutdownSignal = config.worker?.onShutdownSignal
4449

4550
debug('created worker with id %s and config %O', this.#id, config)
4651
}
@@ -74,7 +79,7 @@ export class Worker {
7479

7580
debug('starting worker %s on queues: %O', this.#id, queues)
7681

77-
await this.#setupGracefulShutdown()
82+
this.#setupGracefulShutdown()
7883

7984
for await (const cycle of this.process(queues)) {
8085
if (['started', 'completed'].includes(cycle.type)) {
@@ -109,6 +114,8 @@ export class Worker {
109114
if (this.#adapter) {
110115
await this.#adapter.destroy()
111116
}
117+
118+
this.#removeShutdownHandlers()
112119
}
113120

114121
async processCycle(queues: string[]): Promise<WorkerCycle | null> {
@@ -317,14 +324,30 @@ export class Worker {
317324
}
318325
}
319326

320-
async #setupGracefulShutdown() {
321-
const shutdown = async () => {
327+
#setupGracefulShutdown() {
328+
if (!this.#gracefulShutdown) {
329+
return
330+
}
331+
332+
this.#shutdownHandler = async () => {
322333
debug('received shutdown signal, stopping worker...')
334+
335+
if (this.#onShutdownSignal) {
336+
await this.#onShutdownSignal()
337+
}
338+
323339
await this.stop()
324-
process.exit(0)
325340
}
326341

327-
process.on('SIGINT', shutdown)
328-
process.on('SIGTERM', shutdown)
342+
process.on('SIGINT', this.#shutdownHandler)
343+
process.on('SIGTERM', this.#shutdownHandler)
344+
}
345+
346+
#removeShutdownHandlers() {
347+
if (this.#shutdownHandler) {
348+
process.off('SIGINT', this.#shutdownHandler)
349+
process.off('SIGTERM', this.#shutdownHandler)
350+
this.#shutdownHandler = undefined
351+
}
329352
}
330353
}

tests/worker.spec.ts

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1051,4 +1051,58 @@ test.group('Worker', () => {
10511051
assert.equal(count, 1, `${jobId} should be executed exactly once`)
10521052
}
10531053
})
1054+
1055+
test('onShutdownSignal callback is invoked on SIGTERM', async ({ assert }) => {
1056+
let callbackInvoked = false
1057+
1058+
const localConfig = {
1059+
default: 'memory',
1060+
adapters: { memory: memory() },
1061+
worker: {
1062+
gracefulShutdown: true,
1063+
onShutdownSignal: () => {
1064+
callbackInvoked = true
1065+
},
1066+
},
1067+
}
1068+
1069+
const worker = new Worker(localConfig)
1070+
const startPromise = worker.start(['default'])
1071+
await setTimeout(10)
1072+
1073+
// Emit SIGTERM to trigger the shutdown handler
1074+
process.emit('SIGTERM')
1075+
1076+
// Wait for the worker to stop
1077+
await Promise.race([startPromise, setTimeout(500)])
1078+
1079+
assert.isTrue(callbackInvoked, 'onShutdownSignal should be called on SIGTERM')
1080+
})
1081+
1082+
test('onShutdownSignal callback is invoked on SIGINT', async ({ assert }) => {
1083+
let callbackInvoked = false
1084+
1085+
const localConfig = {
1086+
default: 'memory',
1087+
adapters: { memory: memory() },
1088+
worker: {
1089+
gracefulShutdown: true,
1090+
onShutdownSignal: () => {
1091+
callbackInvoked = true
1092+
},
1093+
},
1094+
}
1095+
1096+
const worker = new Worker(localConfig)
1097+
const startPromise = worker.start(['default'])
1098+
await setTimeout(10)
1099+
1100+
// Emit SIGINT to trigger the shutdown handler
1101+
process.emit('SIGINT')
1102+
1103+
// Wait for the worker to stop
1104+
await Promise.race([startPromise, setTimeout(500)])
1105+
1106+
assert.isTrue(callbackInvoked, 'onShutdownSignal should be called on SIGINT')
1107+
})
10541108
})

0 commit comments

Comments
 (0)