From 4a5e70379ae905e8019b57d970bab664459527fb Mon Sep 17 00:00:00 2001 From: gilboom Date: Thu, 7 May 2026 18:14:26 +0800 Subject: [PATCH] fix(pool): reject queued requests on end --- lib/base/pool.js | 4 ++ test/integration/test-pool-end.test.mts | 81 +++++++++++++++++++++++++ 2 files changed, 85 insertions(+) diff --git a/lib/base/pool.js b/lib/base/pool.js index f6d08929de..3bf2fdef32 100644 --- a/lib/base/pool.js +++ b/lib/base/pool.js @@ -185,6 +185,10 @@ class BasePool extends EventEmitter { } }; } + while (this._connectionQueue.length > 0) { + const queuedCallback = this._connectionQueue.shift(); + process.nextTick(() => queuedCallback(new Error('Pool is closed.'))); + } let calledBack = false; let closedConnections = 0; let connection; diff --git a/test/integration/test-pool-end.test.mts b/test/integration/test-pool-end.test.mts index 8ae24f847a..0b1eb5d1ed 100644 --- a/test/integration/test-pool-end.test.mts +++ b/test/integration/test-pool-end.test.mts @@ -1,6 +1,17 @@ import { describe, it, strict } from 'poku'; import { createPool } from '../common.test.mjs'; +function timeoutAfter(ms: number, message: string): Promise { + return new Promise((_, reject) => { + setTimeout(() => reject(new Error(message)), ms); + }); +} + +function getConnectionQueueLength(pool: unknown): number { + // @ts-expect-error: internal access + return pool.pool._connectionQueue.length; +} + await describe('Pool End', async () => { const pool = createPool(); @@ -53,3 +64,73 @@ await describe('Pool end should close all connections and mark as closed', async strict(pool._closed === true, 'pool should be closed'); }); }); + +await describe('Promise Pool End', async () => { + await it('should reject queued queries when ending a saturated pool', async () => { + const pool = createPool({ + connectionLimit: 2, + gracefulEnd: true, + }).promise(); + + const warmedConnections = await Promise.all([ + pool.getConnection(), + pool.getConnection(), + ]); + + for (const conn of warmedConnections) { + conn.release(); + } + + let enqueued = 0; + const waitForQueuedQueries = new Promise((resolve) => { + pool.on('enqueue', () => { + enqueued++; + + if (enqueued === 2) { + process.nextTick(resolve); + } + }); + }); + + const queries = [ + pool.query('SELECT SLEEP(2)'), + pool.query('SELECT SLEEP(2)'), + pool.query('SELECT SLEEP(2)'), + pool.query('SELECT SLEEP(2)'), + ]; + const activeResultsPromise = Promise.allSettled(queries.slice(0, 2)); + const queuedResultsPromise = Promise.allSettled(queries.slice(2)); + + await waitForQueuedQueries; + + strict.equal(getConnectionQueueLength(pool), 2); + + const endPromise = pool.end(); + + const queuedResults = await Promise.race([ + queuedResultsPromise, + timeoutAfter( + 1000, + `timed out waiting for queued queries to settle; queue length=${getConnectionQueueLength(pool)}` + ), + ]); + + for (const result of queuedResults) { + strict.equal(result.status, 'rejected'); + + if (result.status === 'rejected') { + strict.equal(result.reason.message, 'Pool is closed.'); + } + } + + strict.equal(getConnectionQueueLength(pool), 0); + + const activeResults = await activeResultsPromise; + + for (const result of activeResults) { + strict.equal(result.status, 'fulfilled'); + } + + await endPromise; + }); +});