Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions lib/base/pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,10 @@ class BasePool extends EventEmitter {
}
};
}
while (this._connectionQueue.length > 0) {
const queuedCallback = this._connectionQueue.shift();
process.nextTick(() => queuedCallback(new Error('Pool is closed.')));
}
let calledBack = false;
let closedConnections = 0;
let connection;
Expand Down
81 changes: 81 additions & 0 deletions test/integration/test-pool-end.test.mts
Original file line number Diff line number Diff line change
@@ -1,6 +1,17 @@
import { describe, it, strict } from 'poku';
import { createPool } from '../common.test.mjs';

function timeoutAfter(ms: number, message: string): Promise<never> {
return new Promise((_, reject) => {
setTimeout(() => reject(new Error(message)), ms);
});
}

function getConnectionQueueLength(pool: unknown): number {
// @ts-expect-error: internal access
return pool.pool._connectionQueue.length;
}

await describe('Pool End', async () => {
const pool = createPool();

Expand Down Expand Up @@ -53,3 +64,73 @@ await describe('Pool end should close all connections and mark as closed', async
strict(pool._closed === true, 'pool should be closed');
});
});

await describe('Promise Pool End', async () => {
await it('should reject queued queries when ending a saturated pool', async () => {
const pool = createPool({
connectionLimit: 2,
gracefulEnd: true,
}).promise();

const warmedConnections = await Promise.all([
pool.getConnection(),
pool.getConnection(),
]);

for (const conn of warmedConnections) {
conn.release();
}

let enqueued = 0;
const waitForQueuedQueries = new Promise<void>((resolve) => {
pool.on('enqueue', () => {
enqueued++;

if (enqueued === 2) {
process.nextTick(resolve);
}
});
});

const queries = [
pool.query('SELECT SLEEP(2)'),
pool.query('SELECT SLEEP(2)'),
pool.query('SELECT SLEEP(2)'),
pool.query('SELECT SLEEP(2)'),
];
const activeResultsPromise = Promise.allSettled(queries.slice(0, 2));
const queuedResultsPromise = Promise.allSettled(queries.slice(2));

await waitForQueuedQueries;

strict.equal(getConnectionQueueLength(pool), 2);

const endPromise = pool.end();

const queuedResults = await Promise.race([
queuedResultsPromise,
timeoutAfter(
1000,
`timed out waiting for queued queries to settle; queue length=${getConnectionQueueLength(pool)}`
),
]);

for (const result of queuedResults) {
strict.equal(result.status, 'rejected');

if (result.status === 'rejected') {
strict.equal(result.reason.message, 'Pool is closed.');
}
}

strict.equal(getConnectionQueueLength(pool), 0);

const activeResults = await activeResultsPromise;

for (const result of activeResults) {
strict.equal(result.status, 'fulfilled');
}

await endPromise;
});
});
Loading