Skip to content

Commit 0ca054a

Browse files
authored
fix: replace stale pool clients under connection limit (#5145)
1 parent 7af90e9 commit 0ca054a

5 files changed

Lines changed: 89 additions & 13 deletions

File tree

lib/dispatcher/pool-base.js

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -189,12 +189,12 @@ class PoolBase extends DispatcherBase {
189189
}
190190

191191
[kRemoveClient] (client) {
192-
client.close(() => {
193-
const idx = this[kClients].indexOf(client)
194-
if (idx !== -1) {
195-
this[kClients].splice(idx, 1)
196-
}
197-
})
192+
const idx = this[kClients].indexOf(client)
193+
if (idx !== -1) {
194+
this[kClients].splice(idx, 1)
195+
}
196+
197+
client.close(() => {})
198198

199199
this[kNeedDrain] = this[kClients].some(dispatcher => (
200200
!dispatcher[kNeedDrain] &&

lib/dispatcher/pool.js

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,10 +97,13 @@ class Pool extends PoolBase {
9797

9898
[kGetDispatcher] () {
9999
const clientTtlOption = this[kOptions].clientTtl
100-
for (const client of this[kClients]) {
100+
for (let i = 0; i < this[kClients].length; i++) {
101+
const client = this[kClients][i]
102+
101103
// check ttl of client and if it's stale, remove it from the pool
102104
if (clientTtlOption != null && clientTtlOption > 0 && client.ttl && ((Date.now() - client.ttl) > clientTtlOption)) {
103105
this[kRemoveClient](client)
106+
i--
104107
} else if (!client[kNeedDrain]) {
105108
return client
106109
}

lib/dispatcher/round-robin-pool.js

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -92,25 +92,24 @@ class RoundRobinPool extends PoolBase {
9292

9393
[kGetDispatcher] () {
9494
const clientTtlOption = this[kOptions].clientTtl
95-
const clientsLength = this[kClients].length
9695

9796
// If we have no clients yet, create one
98-
if (clientsLength === 0) {
97+
if (this[kClients].length === 0) {
9998
const dispatcher = this[kFactory](this[kUrl], this[kOptions])
10099
this[kAddClient](dispatcher)
101100
return dispatcher
102101
}
103102

104103
// Round-robin through existing clients
105104
let checked = 0
106-
while (checked < clientsLength) {
107-
this[kIndex] = (this[kIndex] + 1) % clientsLength
105+
while (checked < this[kClients].length) {
106+
this[kIndex] = (this[kIndex] + 1) % this[kClients].length
108107
const client = this[kClients][this[kIndex]]
109108

110109
// Check if client is stale (TTL expired)
111110
if (clientTtlOption != null && clientTtlOption > 0 && client.ttl && ((Date.now() - client.ttl) > clientTtlOption)) {
112111
this[kRemoveClient](client)
113-
checked++
112+
this[kIndex]--
114113
continue
115114
}
116115

@@ -123,7 +122,7 @@ class RoundRobinPool extends PoolBase {
123122
}
124123

125124
// All clients are busy, create a new one if we haven't reached the limit
126-
if (!this[kConnections] || clientsLength < this[kConnections]) {
125+
if (!this[kConnections] || this[kClients].length < this[kConnections]) {
127126
const dispatcher = this[kFactory](this[kUrl], this[kOptions])
128127
this[kAddClient](dispatcher)
129128
return dispatcher

test/pool.js

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -652,6 +652,43 @@ test('pool connect with clientTtl specified', async (t) => {
652652
await t.completed
653653
})
654654

655+
test('pool replaces stale client when connections limit is reached', async (t) => {
656+
t = tspl(t, { plan: 2 })
657+
658+
const server = createServer((req, res) => {
659+
res.end('ok')
660+
})
661+
after(() => server.close())
662+
663+
await new Promise(resolve => server.listen(0, resolve))
664+
665+
const client = new Pool(`http://localhost:${server.address().port}`, {
666+
connections: 1,
667+
clientTtl: 1
668+
})
669+
after(() => client.destroy())
670+
671+
async function request () {
672+
const { statusCode, body } = await client.request({
673+
path: '/',
674+
method: 'GET'
675+
})
676+
await body.text()
677+
return statusCode
678+
}
679+
680+
t.strictEqual(await request(), 200)
681+
await new Promise(resolve => setTimeout(resolve, 20))
682+
683+
const statusCode = await Promise.race([
684+
request(),
685+
new Promise((_resolve, reject) => {
686+
setTimeout(() => reject(new Error('second request hung')), 1000)
687+
})
688+
])
689+
t.strictEqual(statusCode, 200)
690+
})
691+
655692
test('pool dispatch', async (t) => {
656693
t = tspl(t, { plan: 2 })
657694

test/round-robin-pool.js

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,43 @@ test('basic get', async (t) => {
9696
await t.completed
9797
})
9898

99+
test('replaces stale client when connections limit is reached', async (t) => {
100+
t = tspl(t, { plan: 2 })
101+
102+
const server = createServer((req, res) => {
103+
res.end('ok')
104+
})
105+
after(() => server.close())
106+
107+
await new Promise(resolve => server.listen(0, resolve))
108+
109+
const pool = new RoundRobinPool(`http://localhost:${server.address().port}`, {
110+
connections: 1,
111+
clientTtl: 1
112+
})
113+
after(() => pool.destroy())
114+
115+
async function request () {
116+
const { statusCode, body } = await pool.request({
117+
path: '/',
118+
method: 'GET'
119+
})
120+
await body.text()
121+
return statusCode
122+
}
123+
124+
t.strictEqual(await request(), 200)
125+
await new Promise(resolve => setTimeout(resolve, 20))
126+
127+
const statusCode = await Promise.race([
128+
request(),
129+
new Promise((_resolve, reject) => {
130+
setTimeout(() => reject(new Error('second request hung')), 1000)
131+
})
132+
])
133+
t.strictEqual(statusCode, 200)
134+
})
135+
99136
test('connect/disconnect event(s)', async (t) => {
100137
const clients = 2
101138

0 commit comments

Comments
 (0)