Skip to content

Commit 708074e

Browse files
committed
fix(worker)!: keep adapter lifecycle owned by QueueManager
1 parent 1432791 commit 708074e

File tree

2 files changed

+66
-6
lines changed

2 files changed

+66
-6
lines changed

src/worker.ts

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,8 @@ export class Worker {
169169
/**
170170
* Stop the worker gracefully.
171171
*
172-
* Waits for all running jobs to complete before shutting down.
172+
* Waits for all running jobs to complete before stopping job consumption.
173+
* Adapter cleanup remains the responsibility of `QueueManager.destroy()`.
173174
* Called automatically on SIGINT/SIGTERM if gracefulShutdown is enabled.
174175
*/
175176
async stop() {
@@ -182,10 +183,6 @@ export class Worker {
182183
await this.#pool.drain()
183184
}
184185

185-
if (this.#adapter) {
186-
await this.#adapter.destroy()
187-
}
188-
189186
this.#removeShutdownHandlers()
190187
}
191188

tests/worker.spec.ts

Lines changed: 64 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,37 @@
11
import { test } from '@japa/runner'
22
import { setTimeout } from 'node:timers/promises'
33
import { Worker } from '../src/worker.js'
4-
import { memory } from './_mocks/memory_adapter.js'
4+
import { MemoryAdapter, memory } from './_mocks/memory_adapter.js'
55
import { ChaosAdapter } from './_mocks/chaos_adapter.js'
66
import type { QueueManagerConfig } from '../src/types/main.js'
77
import { Locator } from '../src/locator.js'
88
import { Job } from '../src/job.js'
9+
import { QueueManager } from '../src/queue_manager.js'
910
import * as errors from '../src/exceptions.js'
1011

1112
const config = {
1213
default: 'memory',
1314
adapters: { memory: memory() },
1415
} satisfies QueueManagerConfig
1516

17+
class DestroyAwareMemoryAdapter extends MemoryAdapter {
18+
destroyed = false
19+
20+
override async pushOn(...args: Parameters<MemoryAdapter['pushOn']>) {
21+
if (this.destroyed) {
22+
throw new Error('adapter is destroyed')
23+
}
24+
25+
return super.pushOn(...args)
26+
}
27+
28+
override destroy(): Promise<void> {
29+
this.destroyed = true
30+
31+
return super.destroy()
32+
}
33+
}
34+
1635
test.group('Worker', () => {
1736
test('should create a worker with a unique worker ID', ({ assert, cleanup }) => {
1837
const worker1 = new Worker(config)
@@ -773,6 +792,50 @@ test.group('Worker', () => {
773792
assert.isTrue(jobCompleted, 'Job should have completed before worker stopped')
774793
})
775794

795+
test('should not destroy the shared adapter when stopping', async ({ assert, cleanup }) => {
796+
const adapters: DestroyAwareMemoryAdapter[] = []
797+
798+
const localConfig = {
799+
default: 'memory',
800+
adapters: {
801+
memory: () => {
802+
const adapter = new DestroyAwareMemoryAdapter()
803+
adapters.push(adapter)
804+
return adapter
805+
},
806+
},
807+
}
808+
809+
const worker = new Worker(localConfig)
810+
811+
cleanup(async () => {
812+
await QueueManager.destroy()
813+
})
814+
815+
await worker.init()
816+
817+
const firstAdapter = QueueManager.use()
818+
819+
await worker.stop()
820+
821+
const secondAdapter = QueueManager.use()
822+
823+
assert.strictEqual(secondAdapter, firstAdapter)
824+
assert.equal(adapters.length, 1)
825+
assert.isFalse(adapters[0].destroyed)
826+
827+
await secondAdapter.pushOn('default', {
828+
id: 'post-stop-job',
829+
name: 'TestJob',
830+
payload: {},
831+
attempts: 0,
832+
})
833+
834+
await QueueManager.destroy()
835+
836+
assert.isTrue(adapters[0].destroyed)
837+
})
838+
776839
test('should handle job that fails permanently', async ({ assert, cleanup }) => {
777840
let failedCalled = false
778841

0 commit comments

Comments
 (0)