Skip to content

Commit 11712b3

Browse files
committed
fix(queue-manager): destroy cached adapters before reinit
1 parent 708074e commit 11712b3

File tree

2 files changed

+77
-0
lines changed

2 files changed

+77
-0
lines changed

src/queue_manager.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,11 @@ class QueueManagerSingleton {
8787

8888
this.#validateConfig(config)
8989

90+
for (const [name, adapter] of this.#adapterInstances) {
91+
debug('destroying adapter "%s" before reinitialization', name)
92+
await adapter.destroy()
93+
}
94+
9095
this.#adapterInstances.clear()
9196

9297
this.#defaultAdapter = config.default

tests/queue_manager.spec.ts

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import * as errors from '../src/exceptions.js'
33
import { QueueManager } from '../src/queue_manager.js'
44
import { sync } from '../src/drivers/sync_adapter.js'
55
import { MemoryLogger } from './_mocks/memory_logger.js'
6+
import type { Adapter } from '../src/contracts/adapter.js'
67

78
test.group('QueueManager', () => {
89
test('should validate adapter presence', async ({ assert }) => {
@@ -166,4 +167,75 @@ test.group('QueueManager', () => {
166167

167168
await QueueManager.destroy()
168169
})
170+
171+
test('should destroy existing adapter instances before reinitializing', async ({
172+
assert,
173+
cleanup,
174+
}) => {
175+
const adapters: Adapter[] = []
176+
let destroyedCount = 0
177+
178+
const createAdapter = (): Adapter => ({
179+
setWorkerId() {},
180+
pop: async () => null,
181+
popFrom: async () => null,
182+
recoverStalledJobs: async () => 0,
183+
completeJob: async () => {},
184+
failJob: async () => {},
185+
retryJob: async () => {},
186+
getJob: async () => null,
187+
push: async () => {},
188+
pushOn: async () => {},
189+
pushLater: async () => {},
190+
pushLaterOn: async () => {},
191+
pushMany: async () => {},
192+
pushManyOn: async () => {},
193+
size: async () => 0,
194+
sizeOf: async () => 0,
195+
destroy: async () => {
196+
destroyedCount++
197+
},
198+
upsertSchedule: async () => 'schedule-id',
199+
createSchedule: async () => 'schedule-id',
200+
getSchedule: async () => null,
201+
listSchedules: async () => [],
202+
updateSchedule: async () => {},
203+
deleteSchedule: async () => {},
204+
claimDueSchedule: async () => null,
205+
})
206+
207+
cleanup(async () => {
208+
await QueueManager.destroy()
209+
})
210+
211+
await QueueManager.init({
212+
default: 'custom',
213+
adapters: {
214+
custom: () => {
215+
const adapter = createAdapter()
216+
adapters.push(adapter)
217+
return adapter
218+
},
219+
},
220+
})
221+
222+
const firstAdapter = QueueManager.use()
223+
224+
await QueueManager.init({
225+
default: 'custom',
226+
adapters: {
227+
custom: () => {
228+
const adapter = createAdapter()
229+
adapters.push(adapter)
230+
return adapter
231+
},
232+
},
233+
})
234+
235+
const secondAdapter = QueueManager.use()
236+
237+
assert.strictEqual(firstAdapter, adapters[0])
238+
assert.strictEqual(secondAdapter, adapters[1])
239+
assert.equal(destroyedCount, 1)
240+
})
169241
})

0 commit comments

Comments
 (0)