Skip to content
This repository was archived by the owner on Dec 30, 2019. It is now read-only.

Commit 7b2d29d

Browse files
Fix two timeout races
1 parent 4b9669e commit 7b2d29d

File tree

2 files changed

+136
-24
lines changed

2 files changed

+136
-24
lines changed

index.js

Lines changed: 45 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,6 @@ const EventEmitter = require('events').EventEmitter
33

44
const NOOP = function () { }
55

6-
const remove = (list, value) => {
7-
const i = list.indexOf(value)
8-
9-
if (i !== -1) {
10-
list.splice(i, 1)
11-
}
12-
}
13-
146
const removeWhere = (list, predicate) => {
157
const i = list.findIndex(predicate)
168

@@ -26,6 +18,12 @@ class IdleItem {
2618
}
2719
}
2820

21+
class PendingItem {
22+
constructor (callback) {
23+
this.callback = callback
24+
}
25+
}
26+
2927
function throwOnRelease () {
3028
throw new Error('Release called on client which has already been released to the pool.')
3129
}
@@ -85,6 +83,7 @@ class Pool extends EventEmitter {
8583
this._pendingQueue = []
8684
this._endCallback = undefined
8785
this.ending = false
86+
this.ended = false
8887
}
8988

9089
_isFull () {
@@ -93,6 +92,10 @@ class Pool extends EventEmitter {
9392

9493
_pulseQueue () {
9594
this.log('pulse queue')
95+
if (this.ended) {
96+
this.log('pulse queue ended')
97+
return
98+
}
9699
if (this.ending) {
97100
this.log('pulse queue on ending')
98101
if (this._idle.length) {
@@ -101,6 +104,7 @@ class Pool extends EventEmitter {
101104
})
102105
}
103106
if (!this._clients.length) {
107+
this.ended = true
104108
this._endCallback()
105109
}
106110
return
@@ -121,10 +125,10 @@ class Pool extends EventEmitter {
121125
const client = idleItem.client
122126
client.release = release.bind(this, client)
123127
this.emit('acquire', client)
124-
return waiter(undefined, client, client.release)
128+
return waiter.callback(undefined, client, client.release)
125129
}
126130
if (!this._isFull()) {
127-
return this.connect(waiter)
131+
return this.newClient(waiter)
128132
}
129133
throw new Error('unexpected condition')
130134
}
@@ -150,18 +154,18 @@ class Pool extends EventEmitter {
150154
return cb ? cb(err) : this.Promise.reject(err)
151155
}
152156

157+
const response = promisify(this.Promise, cb)
158+
const result = response.result
159+
153160
// if we don't have to connect a new client, don't do so
154161
if (this._clients.length >= this.options.max || this._idle.length) {
155-
const response = promisify(this.Promise, cb)
156-
const result = response.result
157-
158162
// if we have idle clients schedule a pulse immediately
159163
if (this._idle.length) {
160164
process.nextTick(() => this._pulseQueue())
161165
}
162166

163167
if (!this.options.connectionTimeoutMillis) {
164-
this._pendingQueue.push(response.callback)
168+
this._pendingQueue.push(new PendingItem(response.callback))
165169
return result
166170
}
167171

@@ -170,18 +174,27 @@ class Pool extends EventEmitter {
170174
response.callback(err, res, done)
171175
}
172176

177+
const pendingItem = new PendingItem(queueCallback)
178+
173179
// set connection timeout on checking out an existing client
174180
const tid = setTimeout(() => {
175181
// remove the callback from pending waiters because
176182
// we're going to call it with a timeout error
177-
remove(this._pendingQueue, queueCallback)
183+
removeWhere(this._pendingQueue, (i) => i.callback === queueCallback)
184+
pendingItem.timedOut = true
178185
response.callback(new Error('timeout exceeded when trying to connect'))
179186
}, this.options.connectionTimeoutMillis)
180187

181-
this._pendingQueue.push(queueCallback)
188+
this._pendingQueue.push(pendingItem)
182189
return result
183190
}
184191

192+
this.newClient(new PendingItem(response.callback))
193+
194+
return result
195+
}
196+
197+
newClient (pendingItem) {
185198
const client = new this.Client(this.options)
186199
this._clients.push(client)
187200
const idleListener = (err) => {
@@ -210,9 +223,6 @@ class Pool extends EventEmitter {
210223
}, this.options.connectionTimeoutMillis)
211224
}
212225

213-
const response = promisify(this.Promise, cb)
214-
cb = response.callback
215-
216226
this.log('connecting new client')
217227
client.connect((err) => {
218228
if (tid) {
@@ -223,23 +233,34 @@ class Pool extends EventEmitter {
223233
this.log('client failed to connect', err)
224234
// remove the dead client from our list of clients
225235
this._clients = this._clients.filter(c => c !== client)
236+
process.nextTick(() => this._pulseQueue())
226237
if (timeoutHit) {
227238
err.message = 'Connection terminated due to connection timeout'
228239
}
229-
cb(err, undefined, NOOP)
240+
241+
if (!pendingItem.timedOut) {
242+
pendingItem.callback(err, undefined, NOOP)
243+
}
230244
} else {
231245
this.log('new client connected')
232246
client.release = release.bind(this, client)
233247
this.emit('connect', client)
234248
this.emit('acquire', client)
235-
if (this.options.verify) {
236-
this.options.verify(client, cb)
249+
if (!pendingItem.timedOut) {
250+
if (this.options.verify) {
251+
this.options.verify(client, pendingItem.callback)
252+
} else {
253+
pendingItem.callback(undefined, client, client.release)
254+
}
237255
} else {
238-
cb(undefined, client, client.release)
256+
if (this.options.verify) {
257+
this.options.verify(client, client.release)
258+
} else {
259+
client.release()
260+
}
239261
}
240262
}
241263
})
242-
return response.result
243264
}
244265

245266
query (text, values, cb) {

test/connection-timeout.js

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ const describe = require('mocha').describe
77
const it = require('mocha').it
88
const before = require('mocha').before
99
const after = require('mocha').after
10+
const connectionFailure = new Error('Temporary connection failure')
1011

1112
const Pool = require('../')
1213

@@ -126,4 +127,94 @@ describe('connection timeout', () => {
126127
})
127128
})
128129
})
130+
131+
it('continues processing after a connection failure', (done) => {
132+
const Client = require('pg').Client
133+
const orgConnect = Client.prototype.connect
134+
let called = false
135+
136+
Client.prototype.connect = function (cb) {
137+
// Simulate a failure on first call
138+
if (!called) {
139+
called = true
140+
141+
return setTimeout(() => {
142+
cb(connectionFailure)
143+
}, 100)
144+
}
145+
// And pass-through the second call
146+
orgConnect.call(this, cb)
147+
}
148+
149+
const pool = new Pool({
150+
Client: Client,
151+
connectionTimeoutMillis: 1000,
152+
max: 1
153+
})
154+
155+
pool.connect((err, client, release) => {
156+
expect(err).to.be(connectionFailure)
157+
158+
pool.query('select $1::text as name', ['brianc'], (err, res) => {
159+
expect(err).to.be(undefined)
160+
expect(res.rows).to.have.length(1)
161+
pool.end(done)
162+
})
163+
})
164+
})
165+
166+
it('releases newly connected clients if the queued already timed out', (done) => {
167+
const Client = require('pg').Client
168+
169+
const orgConnect = Client.prototype.connect
170+
171+
let connection = 0
172+
173+
Client.prototype.connect = function (cb) {
174+
// Simulate a failure on first call
175+
if (connection === 0) {
176+
connection++
177+
178+
return setTimeout(() => {
179+
cb(connectionFailure)
180+
}, 300)
181+
}
182+
183+
// And second connect taking > connection timeout
184+
if (connection === 1) {
185+
connection++
186+
187+
return setTimeout(() => {
188+
orgConnect.call(this, cb)
189+
}, 1000)
190+
}
191+
192+
orgConnect.call(this, cb)
193+
}
194+
195+
const pool = new Pool({
196+
Client: Client,
197+
connectionTimeoutMillis: 1000,
198+
max: 1
199+
})
200+
201+
// Direct connect
202+
pool.connect((err, client, release) => {
203+
expect(err).to.be(connectionFailure)
204+
})
205+
206+
// Queued
207+
let called = 0
208+
pool.connect((err, client, release) => {
209+
// Verify the callback is only called once
210+
expect(called++).to.be(0)
211+
expect(err).to.be.an(Error)
212+
213+
pool.query('select $1::text as name', ['brianc'], (err, res) => {
214+
expect(err).to.be(undefined)
215+
expect(res.rows).to.have.length(1)
216+
pool.end(done)
217+
})
218+
})
219+
})
129220
})

0 commit comments

Comments
 (0)