Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/docs/api/Pool.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ Extends: [`ClientOptions`](/docs/docs/api/Client.md#parameter-clientoptions)
* **factory** `(origin: URL, opts: Object) => Dispatcher` - Default: `(origin, opts) => new Client(origin, opts)`
* **connections** `number | null` (optional) - Default: `null` - The number of `Client` instances to create. When set to `null`, the `Pool` instance will create an unlimited amount of `Client` instances.
* **clientTtl** `number | null` (optional) - Default: `null` - The amount of time before a `Client` instance is removed from the `Pool` and closed. When set to `null`, `Client` instances will not be removed or closed based on age.
* **maxIdle** `number` (optional) - Default: `Infinity` - Limits the number of open connections that do not have an active request.

## Instance Properties

Expand Down
3 changes: 2 additions & 1 deletion lib/core/symbols.js
Original file line number Diff line number Diff line change
Expand Up @@ -70,5 +70,6 @@ module.exports = {
kPingInterval: Symbol('ping interval'),
kNoProxyAgent: Symbol('no proxy agent'),
kHttpProxyAgent: Symbol('http proxy agent'),
kHttpsProxyAgent: Symbol('https proxy agent')
kHttpsProxyAgent: Symbol('https proxy agent'),
kMaxIdle: Symbol('max idle')
}
13 changes: 12 additions & 1 deletion lib/dispatcher/pool-base.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
const { PoolStats } = require('../util/stats.js')
const DispatcherBase = require('./dispatcher-base')
const FixedQueue = require('./fixed-queue')
const { kConnected, kSize, kRunning, kPending, kQueued, kBusy, kFree, kUrl, kClose, kDestroy, kDispatch } = require('../core/symbols')
const { kConnected, kSize, kRunning, kPending, kQueued, kBusy, kFree, kUrl, kClose, kDestroy, kDispatch, kMaxIdle } = require('../core/symbols')

const kClients = Symbol('clients')
const kNeedDrain = Symbol('needDrain')
Expand Down Expand Up @@ -40,6 +40,17 @@ class PoolBase extends DispatcherBase {
needDrain = !client.dispatch(item.opts, item.handler)
}

if (this[kMaxIdle] < Infinity && this[kFree] > this[kMaxIdle]) {
for (let i = 0; i < this[kClients].length; i++) {
const client = this[kClients][i]
if (!client[kNeedDrain]) {
if (client.ttl && client.ttl > Date.now()) continue
this[kRemoveClient](client)
}
if (this[kFree] <= this[kMaxIdle]) break
}
}

client[kNeedDrain] = needDrain

if (!needDrain && this[kNeedDrain]) {
Expand Down
14 changes: 10 additions & 4 deletions lib/dispatcher/pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ const {
InvalidArgumentError
} = require('../core/errors')
const util = require('../core/util')
const { kUrl } = require('../core/symbols')
const { kUrl, kMaxIdle } = require('../core/symbols')
const buildConnector = require('../core/connect')

const kOptions = Symbol('options')
Expand All @@ -37,6 +37,7 @@ class Pool extends PoolBase {
autoSelectFamilyAttemptTimeout,
allowH2,
clientTtl,
maxIdle = Infinity,
...options
} = {}) {
if (connections != null && (!Number.isFinite(connections) || connections < 0)) {
Expand All @@ -51,6 +52,10 @@ class Pool extends PoolBase {
throw new InvalidArgumentError('connect must be a function or an object')
}

if (typeof maxIdle !== 'number' || Number.isNaN(maxIdle) || maxIdle <= 0) {
throw new InvalidArgumentError('maxIdle must be a number greater than 0')
}

if (typeof connect !== 'function') {
connect = buildConnector({
...tls,
Expand All @@ -67,16 +72,17 @@ class Pool extends PoolBase {

this[kConnections] = connections || null
this[kUrl] = util.parseOrigin(origin)
this[kOptions] = { ...util.deepClone(options), connect, allowH2, clientTtl }
this[kOptions] = { ...util.deepClone(options), connect, allowH2, clientTtl, maxIdle }
this[kOptions].interceptors = options.interceptors
? { ...options.interceptors }
: undefined
this[kFactory] = factory
this[kMaxIdle] = maxIdle

this.on('connect', (origin, targets) => {
if (clientTtl != null && clientTtl > 0) {
for (const target of targets) {
Object.assign(target, { ttl: Date.now() })
Object.assign(target, { ttl: Date.now() + clientTtl })
}
}
})
Expand All @@ -100,7 +106,7 @@ class Pool extends PoolBase {
const clientTtlOption = this[kOptions].clientTtl
for (const client of this[kClients]) {
// check ttl of client and if it's stale, remove it from the pool
if (clientTtlOption != null && clientTtlOption > 0 && client.ttl && ((Date.now() - client.ttl) > clientTtlOption)) {
if (clientTtlOption != null && clientTtlOption > 0 && client.ttl && (client.ttl <= Date.now())) {
this[kRemoveClient](client)
} else if (!client[kNeedDrain]) {
return client
Expand Down
4 changes: 2 additions & 2 deletions lib/dispatcher/round-robin-pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class RoundRobinPool extends PoolBase {
this.on('connect', (origin, targets) => {
if (clientTtl != null && clientTtl > 0) {
for (const target of targets) {
Object.assign(target, { ttl: Date.now() })
Object.assign(target, { ttl: Date.now() + clientTtl })
}
}
})
Expand Down Expand Up @@ -111,7 +111,7 @@ class RoundRobinPool extends PoolBase {
const client = this[kClients][this[kIndex]]

// Check if client is stale (TTL expired)
if (clientTtlOption != null && clientTtlOption > 0 && client.ttl && ((Date.now() - client.ttl) > clientTtlOption)) {
if (clientTtlOption != null && clientTtlOption > 0 && client.ttl && (client.ttl <= Date.now())) {
this[kRemoveClient](client)
checked++
continue
Expand Down
29 changes: 29 additions & 0 deletions test/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -1671,6 +1671,35 @@ test('emit disconnect after destroy', async t => {
await t.completed
})

test('emit idle', async t => {
t = tspl(t, { plan: 1 })

const server = createServer({ joinDuplicateHeaders: true }, (req, res) => {
req.pipe(res)
})
after(() => server.close())

server.listen(0, () => {
const url = new URL(`http://localhost:${server.address().port}`)
const client = new Client(url)

t.strictEqual(client[kConnected], false)
client[kConnect](() => {
t.strictEqual(client[kConnected], true)
let idle = false
client.on('idle', () => {
idle = true
t.ok(true, 'pass')
})
client.destroy(() => {
t.strictEqual(idle, true)
})
})
})

await t.completed
})

test('end response before request', async t => {
t = tspl(t, { plan: 2 })

Expand Down
120 changes: 118 additions & 2 deletions test/node-test/agent.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ const {
getGlobalDispatcher
} = require('../..')
const { tspl } = require('@matteo.collina/tspl')
const { closeServerAsPromise } = require('../utils/node-http')
const { closeServerAsPromise, closeClientAndServerAsPromise } = require('../utils/node-http')

describe('setGlobalDispatcher', () => {
after(() => {
Expand Down Expand Up @@ -77,6 +77,112 @@ test('Agent enforces maxOrigins', async (t) => {
await p.completed
})

test('Agent passes maxIdle to each Pool (per-origin limit)', async (t) => {
const p = tspl(t, { plan: 2 })
const dispatcher = new Agent({
maxIdle: 3,
keepAliveTimeout: 1000
})

const handler = (_req, res) => res.end('ok')
const server1 = http.createServer({ joinDuplicateHeaders: true }, handler)
const server2 = http.createServer({ joinDuplicateHeaders: true }, handler)

server1.listen(0)
await once(server1, 'listening')
t.after(closeServerAsPromise(server1))

server2.listen(0)
await once(server2, 'listening')
t.after(closeServerAsPromise(server2))

const host1 = `http://localhost:${server1.address().port}`
const host2 = `http://localhost:${server2.address().port}`

await Promise.all([
request(host1, { dispatcher }),
request(host1, { dispatcher }),
request(host1, { dispatcher }),
request(host1, { dispatcher }),
request(host1, { dispatcher }),
request(host2, { dispatcher }),
request(host2, { dispatcher }),
request(host2, { dispatcher }),
request(host2, { dispatcher }),
request(host2, { dispatcher })
])

await new Promise(resolve => setTimeout(resolve, 10))

const host1Free = dispatcher.stats[host1]?.free ?? 0
p.ok(host1Free <= 3, `Host 1 should have 3 or fewer free connections, found ${host1Free}`)
const host2Free = dispatcher.stats[host2]?.free ?? 0
p.ok(host2Free <= 3, `Host 2 should have 3 or fewer free connections, found ${host2Free}`)

await p.completed
})

test('Pool enforces maxIdle', async (t) => {
const p = tspl(t, { plan: 1 })

const handler = (_req, res) => res.end('ok')
const server = http.createServer({ joinDuplicateHeaders: true }, handler)
server.listen(0)
await once(server, 'listening')

const host = `http://localhost:${server.address().port}`
const dispatcher = new Pool(host, {
maxIdle: 3,
keepAliveTimeout: 1000
})

t.after(closeClientAndServerAsPromise(dispatcher, server))

await Promise.all([
request(host, { dispatcher }),
request(host, { dispatcher }),
request(host, { dispatcher }),
request(host, { dispatcher }),
request(host, { dispatcher })
])

await new Promise(resolve => setTimeout(resolve, 1))
p.ok(dispatcher.stats.free <= 3, `Should be 3 or fewer free connections, found ${dispatcher.stats.free}`)

await p.completed
})

test('maxIdle respects clientTtl', async (t) => {
const p = tspl(t, { plan: 1 })

const handler = (_req, res) => res.end('ok')
const server = http.createServer({ joinDuplicateHeaders: true }, handler)
server.listen(0)
await once(server, 'listening')

const host = `http://localhost:${server.address().port}`
const dispatcher = new Pool(host, {
maxIdle: 3,
clientTtl: 2000,
keepAliveTimeout: 1000
})

t.after(closeClientAndServerAsPromise(dispatcher, server))

await Promise.all([
request(host, { dispatcher }),
request(host, { dispatcher }),
request(host, { dispatcher }),
request(host, { dispatcher }),
request(host, { dispatcher })
])

await new Promise(resolve => setTimeout(resolve, 1))
p.equal(dispatcher.stats.free, 5)

await p.completed
})

test('agent should call callback after closing internal pools', async (t) => {
const p = tspl(t, { plan: 2 })

Expand Down Expand Up @@ -698,13 +804,23 @@ test('stream: fails with invalid onInfo', async (t) => {
}
})

test('constructor validations', t => {
test('Agent constructor validations', t => {
const p = tspl(t, { plan: 3 })
p.throws(() => new Agent({ factory: 'ASD' }), errors.InvalidArgumentError, 'throws on invalid opts argument')
p.throws(() => new Agent({ maxOrigins: -1 }), errors.InvalidArgumentError, 'maxOrigins must be a number greater than 0')
p.throws(() => new Agent({ maxOrigins: 'foo' }), errors.InvalidArgumentError, 'maxOrigins must be a number greater than 0')
})

test('Pool constructor validatons', t => {
const p = tspl(t, { plan: 6 })
p.throws(() => new Pool({ connections: 'ASD' }), errors.InvalidArgumentError, 'throws on non-number')
p.throws(() => new Pool({ connect: 'ASD' }), errors.InvalidArgumentError, 'throws if not a function or object')
p.throws(() => new Pool({ factory: 'ASD' }), errors.InvalidArgumentError, 'throws on non-function')
p.throws(() => new Pool({ maxIdle: -1 }), errors.InvalidArgumentError, 'throws on negative number')
p.throws(() => new Pool({ maxIdle: 'foo' }), errors.InvalidArgumentError, 'throws on non-number')
p.throws(() => new Pool({ maxIdle: NaN }), errors.InvalidArgumentError, 'throws on NaN')
})

test('dispatch validations', async t => {
const dispatcher = new Agent()

Expand Down
2 changes: 2 additions & 0 deletions types/pool.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ declare namespace Pool {
connections?: number | null;
/** The amount of time before a client is removed from the pool and closed. `null` if no time limit. Default `null` */
clientTtl?: number | null;
/** The max number of idle connections to keep open per origin. Default `Infinity`. */
maxIdle?: number;

interceptors?: { Pool?: readonly Dispatcher.DispatchInterceptor[] } & Client.Options['interceptors']
}
Expand Down
Loading