diff --git a/docs/docs/api/Pool.md b/docs/docs/api/Pool.md index ee0a0d3fe81..c83a37d084d 100644 --- a/docs/docs/api/Pool.md +++ b/docs/docs/api/Pool.md @@ -20,6 +20,7 @@ Extends: [`ClientOptions`](/docs/docs/api/Client.md#parameter-clientoptions) * **factory** `(origin: URL, opts: Object) => Dispatcher` - Default: `(origin, opts) => new Client(origin, opts)` * **connections** `number | null` (optional) - Default: `null` - The number of `Client` instances to create. When set to `null`, the `Pool` instance will create an unlimited amount of `Client` instances. * **clientTtl** `number | null` (optional) - Default: `null` - The amount of time before a `Client` instance is removed from the `Pool` and closed. When set to `null`, `Client` instances will not be removed or closed based on age. +* **maxIdle** `number` (optional) - Default: `Infinity` - Limits the number of open connections that do not have an active request. ## Instance Properties diff --git a/lib/core/symbols.js b/lib/core/symbols.js index fd7af0c10e1..80227ef7755 100644 --- a/lib/core/symbols.js +++ b/lib/core/symbols.js @@ -70,5 +70,6 @@ module.exports = { kPingInterval: Symbol('ping interval'), kNoProxyAgent: Symbol('no proxy agent'), kHttpProxyAgent: Symbol('http proxy agent'), - kHttpsProxyAgent: Symbol('https proxy agent') + kHttpsProxyAgent: Symbol('https proxy agent'), + kMaxIdle: Symbol('max idle') } diff --git a/lib/dispatcher/pool-base.js b/lib/dispatcher/pool-base.js index 6c1f2388766..a36a298e613 100644 --- a/lib/dispatcher/pool-base.js +++ b/lib/dispatcher/pool-base.js @@ -3,7 +3,7 @@ const { PoolStats } = require('../util/stats.js') const DispatcherBase = require('./dispatcher-base') const FixedQueue = require('./fixed-queue') -const { kConnected, kSize, kRunning, kPending, kQueued, kBusy, kFree, kUrl, kClose, kDestroy, kDispatch } = require('../core/symbols') +const { kConnected, kSize, kRunning, kPending, kQueued, kBusy, kFree, kUrl, kClose, kDestroy, kDispatch, kMaxIdle } = require('../core/symbols') const kClients = Symbol('clients') const kNeedDrain = Symbol('needDrain') @@ -40,6 +40,17 @@ class PoolBase extends DispatcherBase { needDrain = !client.dispatch(item.opts, item.handler) } + if (this[kMaxIdle] < Infinity && this[kFree] > this[kMaxIdle]) { + for (let i = 0; i < this[kClients].length; i++) { + const client = this[kClients][i] + if (!client[kNeedDrain]) { + if (client.ttl && client.ttl > Date.now()) continue + this[kRemoveClient](client) + } + if (this[kFree] <= this[kMaxIdle]) break + } + } + client[kNeedDrain] = needDrain if (!needDrain && this[kNeedDrain]) { diff --git a/lib/dispatcher/pool.js b/lib/dispatcher/pool.js index 77fd5326696..964decd1219 100644 --- a/lib/dispatcher/pool.js +++ b/lib/dispatcher/pool.js @@ -13,7 +13,7 @@ const { InvalidArgumentError } = require('../core/errors') const util = require('../core/util') -const { kUrl } = require('../core/symbols') +const { kUrl, kMaxIdle } = require('../core/symbols') const buildConnector = require('../core/connect') const kOptions = Symbol('options') @@ -37,6 +37,7 @@ class Pool extends PoolBase { autoSelectFamilyAttemptTimeout, allowH2, clientTtl, + maxIdle = Infinity, ...options } = {}) { if (connections != null && (!Number.isFinite(connections) || connections < 0)) { @@ -51,6 +52,10 @@ class Pool extends PoolBase { throw new InvalidArgumentError('connect must be a function or an object') } + if (typeof maxIdle !== 'number' || Number.isNaN(maxIdle) || maxIdle <= 0) { + throw new InvalidArgumentError('maxIdle must be a number greater than 0') + } + if (typeof connect !== 'function') { connect = buildConnector({ ...tls, @@ -67,16 +72,17 @@ class Pool extends PoolBase { this[kConnections] = connections || null this[kUrl] = util.parseOrigin(origin) - this[kOptions] = { ...util.deepClone(options), connect, allowH2, clientTtl } + this[kOptions] = { ...util.deepClone(options), connect, allowH2, clientTtl, maxIdle } this[kOptions].interceptors = options.interceptors ? { ...options.interceptors } : undefined this[kFactory] = factory + this[kMaxIdle] = maxIdle this.on('connect', (origin, targets) => { if (clientTtl != null && clientTtl > 0) { for (const target of targets) { - Object.assign(target, { ttl: Date.now() }) + Object.assign(target, { ttl: Date.now() + clientTtl }) } } }) @@ -100,7 +106,7 @@ class Pool extends PoolBase { const clientTtlOption = this[kOptions].clientTtl for (const client of this[kClients]) { // check ttl of client and if it's stale, remove it from the pool - if (clientTtlOption != null && clientTtlOption > 0 && client.ttl && ((Date.now() - client.ttl) > clientTtlOption)) { + if (clientTtlOption != null && clientTtlOption > 0 && client.ttl && (client.ttl <= Date.now())) { this[kRemoveClient](client) } else if (!client[kNeedDrain]) { return client diff --git a/lib/dispatcher/round-robin-pool.js b/lib/dispatcher/round-robin-pool.js index b113aa9d039..c89d7f0e3f3 100644 --- a/lib/dispatcher/round-robin-pool.js +++ b/lib/dispatcher/round-robin-pool.js @@ -78,7 +78,7 @@ class RoundRobinPool extends PoolBase { this.on('connect', (origin, targets) => { if (clientTtl != null && clientTtl > 0) { for (const target of targets) { - Object.assign(target, { ttl: Date.now() }) + Object.assign(target, { ttl: Date.now() + clientTtl }) } } }) @@ -111,7 +111,7 @@ class RoundRobinPool extends PoolBase { const client = this[kClients][this[kIndex]] // Check if client is stale (TTL expired) - if (clientTtlOption != null && clientTtlOption > 0 && client.ttl && ((Date.now() - client.ttl) > clientTtlOption)) { + if (clientTtlOption != null && clientTtlOption > 0 && client.ttl && (client.ttl <= Date.now())) { this[kRemoveClient](client) checked++ continue diff --git a/test/client.js b/test/client.js index 1627ad4fe99..2f7385f775d 100644 --- a/test/client.js +++ b/test/client.js @@ -1671,6 +1671,35 @@ test('emit disconnect after destroy', async t => { await t.completed }) +test('emit idle', async t => { + t = tspl(t, { plan: 1 }) + + const server = createServer({ joinDuplicateHeaders: true }, (req, res) => { + req.pipe(res) + }) + after(() => server.close()) + + server.listen(0, () => { + const url = new URL(`http://localhost:${server.address().port}`) + const client = new Client(url) + + t.strictEqual(client[kConnected], false) + client[kConnect](() => { + t.strictEqual(client[kConnected], true) + let idle = false + client.on('idle', () => { + idle = true + t.ok(true, 'pass') + }) + client.destroy(() => { + t.strictEqual(idle, true) + }) + }) + }) + + await t.completed +}) + test('end response before request', async t => { t = tspl(t, { plan: 2 }) diff --git a/test/node-test/agent.js b/test/node-test/agent.js index d9aee46b597..a8aba59d8d7 100644 --- a/test/node-test/agent.js +++ b/test/node-test/agent.js @@ -17,7 +17,7 @@ const { getGlobalDispatcher } = require('../..') const { tspl } = require('@matteo.collina/tspl') -const { closeServerAsPromise } = require('../utils/node-http') +const { closeServerAsPromise, closeClientAndServerAsPromise } = require('../utils/node-http') describe('setGlobalDispatcher', () => { after(() => { @@ -77,6 +77,112 @@ test('Agent enforces maxOrigins', async (t) => { await p.completed }) +test('Agent passes maxIdle to each Pool (per-origin limit)', async (t) => { + const p = tspl(t, { plan: 2 }) + const dispatcher = new Agent({ + maxIdle: 3, + keepAliveTimeout: 1000 + }) + + const handler = (_req, res) => res.end('ok') + const server1 = http.createServer({ joinDuplicateHeaders: true }, handler) + const server2 = http.createServer({ joinDuplicateHeaders: true }, handler) + + server1.listen(0) + await once(server1, 'listening') + t.after(closeServerAsPromise(server1)) + + server2.listen(0) + await once(server2, 'listening') + t.after(closeServerAsPromise(server2)) + + const host1 = `http://localhost:${server1.address().port}` + const host2 = `http://localhost:${server2.address().port}` + + await Promise.all([ + request(host1, { dispatcher }), + request(host1, { dispatcher }), + request(host1, { dispatcher }), + request(host1, { dispatcher }), + request(host1, { dispatcher }), + request(host2, { dispatcher }), + request(host2, { dispatcher }), + request(host2, { dispatcher }), + request(host2, { dispatcher }), + request(host2, { dispatcher }) + ]) + + await new Promise(resolve => setTimeout(resolve, 10)) + + const host1Free = dispatcher.stats[host1]?.free ?? 0 + p.ok(host1Free <= 3, `Host 1 should have 3 or fewer free connections, found ${host1Free}`) + const host2Free = dispatcher.stats[host2]?.free ?? 0 + p.ok(host2Free <= 3, `Host 2 should have 3 or fewer free connections, found ${host2Free}`) + + await p.completed +}) + +test('Pool enforces maxIdle', async (t) => { + const p = tspl(t, { plan: 1 }) + + const handler = (_req, res) => res.end('ok') + const server = http.createServer({ joinDuplicateHeaders: true }, handler) + server.listen(0) + await once(server, 'listening') + + const host = `http://localhost:${server.address().port}` + const dispatcher = new Pool(host, { + maxIdle: 3, + keepAliveTimeout: 1000 + }) + + t.after(closeClientAndServerAsPromise(dispatcher, server)) + + await Promise.all([ + request(host, { dispatcher }), + request(host, { dispatcher }), + request(host, { dispatcher }), + request(host, { dispatcher }), + request(host, { dispatcher }) + ]) + + await new Promise(resolve => setTimeout(resolve, 1)) + p.ok(dispatcher.stats.free <= 3, `Should be 3 or fewer free connections, found ${dispatcher.stats.free}`) + + await p.completed +}) + +test('maxIdle respects clientTtl', async (t) => { + const p = tspl(t, { plan: 1 }) + + const handler = (_req, res) => res.end('ok') + const server = http.createServer({ joinDuplicateHeaders: true }, handler) + server.listen(0) + await once(server, 'listening') + + const host = `http://localhost:${server.address().port}` + const dispatcher = new Pool(host, { + maxIdle: 3, + clientTtl: 2000, + keepAliveTimeout: 1000 + }) + + t.after(closeClientAndServerAsPromise(dispatcher, server)) + + await Promise.all([ + request(host, { dispatcher }), + request(host, { dispatcher }), + request(host, { dispatcher }), + request(host, { dispatcher }), + request(host, { dispatcher }) + ]) + + await new Promise(resolve => setTimeout(resolve, 1)) + p.equal(dispatcher.stats.free, 5) + + await p.completed +}) + test('agent should call callback after closing internal pools', async (t) => { const p = tspl(t, { plan: 2 }) @@ -698,13 +804,23 @@ test('stream: fails with invalid onInfo', async (t) => { } }) -test('constructor validations', t => { +test('Agent constructor validations', t => { const p = tspl(t, { plan: 3 }) p.throws(() => new Agent({ factory: 'ASD' }), errors.InvalidArgumentError, 'throws on invalid opts argument') p.throws(() => new Agent({ maxOrigins: -1 }), errors.InvalidArgumentError, 'maxOrigins must be a number greater than 0') p.throws(() => new Agent({ maxOrigins: 'foo' }), errors.InvalidArgumentError, 'maxOrigins must be a number greater than 0') }) +test('Pool constructor validatons', t => { + const p = tspl(t, { plan: 6 }) + p.throws(() => new Pool({ connections: 'ASD' }), errors.InvalidArgumentError, 'throws on non-number') + p.throws(() => new Pool({ connect: 'ASD' }), errors.InvalidArgumentError, 'throws if not a function or object') + p.throws(() => new Pool({ factory: 'ASD' }), errors.InvalidArgumentError, 'throws on non-function') + p.throws(() => new Pool({ maxIdle: -1 }), errors.InvalidArgumentError, 'throws on negative number') + p.throws(() => new Pool({ maxIdle: 'foo' }), errors.InvalidArgumentError, 'throws on non-number') + p.throws(() => new Pool({ maxIdle: NaN }), errors.InvalidArgumentError, 'throws on NaN') +}) + test('dispatch validations', async t => { const dispatcher = new Agent() diff --git a/types/pool.d.ts b/types/pool.d.ts index 120bb8b25ef..4a7832830a1 100644 --- a/types/pool.d.ts +++ b/types/pool.d.ts @@ -35,6 +35,8 @@ declare namespace Pool { connections?: number | null; /** The amount of time before a client is removed from the pool and closed. `null` if no time limit. Default `null` */ clientTtl?: number | null; + /** The max number of idle connections to keep open per origin. Default `Infinity`. */ + maxIdle?: number; interceptors?: { Pool?: readonly Dispatcher.DispatchInterceptor[] } & Client.Options['interceptors'] }