Skip to content

Commit 0c7040e

Browse files
authored
Add onConnect callback which allows "setup" of a pooled client (#3620)
* wip * Initial connect lifecycle working * Connect hook working * Rename 'hooks.bla' to 'onBla' * Add more tests * More cleanup testing * Use promise.try
1 parent ad36e3c commit 0c7040e

3 files changed

Lines changed: 211 additions & 17 deletions

File tree

packages/pg-pool/index.js

Lines changed: 47 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,14 @@ class Pool extends EventEmitter {
108108
this.ended = false
109109
}
110110

111+
_promiseTry(f) {
112+
const Promise = this.Promise
113+
if (typeof Promise.try === 'function') {
114+
return Promise.try(f)
115+
}
116+
return new Promise((resolve) => resolve(f()))
117+
}
118+
111119
_isFull() {
112120
return this._clients.length >= this.options.max
113121
}
@@ -277,30 +285,52 @@ class Pool extends EventEmitter {
277285
} else {
278286
this.log('new client connected')
279287

280-
if (this.options.maxLifetimeSeconds !== 0) {
281-
const maxLifetimeTimeout = setTimeout(() => {
282-
this.log('ending client due to expired lifetime')
283-
this._expired.add(client)
284-
const idleIndex = this._idle.findIndex((idleItem) => idleItem.client === client)
285-
if (idleIndex !== -1) {
286-
this._acquireClient(
287-
client,
288-
new PendingItem((err, client, clientRelease) => clientRelease()),
289-
idleListener,
290-
false
291-
)
288+
if (this.options.onConnect) {
289+
this._promiseTry(() => this.options.onConnect(client)).then(
290+
() => {
291+
this._afterConnect(client, pendingItem, idleListener)
292+
},
293+
(hookErr) => {
294+
this._clients = this._clients.filter((c) => c !== client)
295+
client.end(() => {
296+
this._pulseQueue()
297+
if (!pendingItem.timedOut) {
298+
pendingItem.callback(hookErr, undefined, NOOP)
299+
}
300+
})
292301
}
293-
}, this.options.maxLifetimeSeconds * 1000)
294-
295-
maxLifetimeTimeout.unref()
296-
client.once('end', () => clearTimeout(maxLifetimeTimeout))
302+
)
303+
return
297304
}
298305

299-
return this._acquireClient(client, pendingItem, idleListener, true)
306+
return this._afterConnect(client, pendingItem, idleListener)
300307
}
301308
})
302309
}
303310

311+
_afterConnect(client, pendingItem, idleListener) {
312+
if (this.options.maxLifetimeSeconds !== 0) {
313+
const maxLifetimeTimeout = setTimeout(() => {
314+
this.log('ending client due to expired lifetime')
315+
this._expired.add(client)
316+
const idleIndex = this._idle.findIndex((idleItem) => idleItem.client === client)
317+
if (idleIndex !== -1) {
318+
this._acquireClient(
319+
client,
320+
new PendingItem((err, client, clientRelease) => clientRelease()),
321+
idleListener,
322+
false
323+
)
324+
}
325+
}, this.options.maxLifetimeSeconds * 1000)
326+
327+
maxLifetimeTimeout.unref()
328+
client.once('end', () => clearTimeout(maxLifetimeTimeout))
329+
}
330+
331+
return this._acquireClient(client, pendingItem, idleListener, true)
332+
}
333+
304334
// acquire a client for a pending work item
305335
_acquireClient(client, pendingItem, idleListener, isNew) {
306336
if (isNew) {
Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
const describe = require('mocha').describe
2+
const it = require('mocha').it
3+
const expect = require('expect.js')
4+
5+
const Pool = require('..')
6+
7+
describe('lifecycle hooks', () => {
8+
it('are called on connect', async () => {
9+
const pool = new Pool({
10+
onConnect: (client) => {
11+
client.HOOK_CONNECT_COUNT = (client.HOOK_CONNECT_COUNT || 0) + 1
12+
},
13+
})
14+
const client = await pool.connect()
15+
expect(client.HOOK_CONNECT_COUNT).to.equal(1)
16+
client.release()
17+
const client2 = await pool.connect()
18+
expect(client).to.equal(client2)
19+
expect(client2.HOOK_CONNECT_COUNT).to.equal(1)
20+
client.release()
21+
await pool.end()
22+
})
23+
24+
it('are called on connect with an async hook', async () => {
25+
const pool = new Pool({
26+
onConnect: async (client) => {
27+
const res = await client.query('SELECT 1 AS num')
28+
client.HOOK_CONNECT_RESULT = res.rows[0].num
29+
},
30+
})
31+
const client = await pool.connect()
32+
expect(client.HOOK_CONNECT_RESULT).to.equal(1)
33+
const res = await client.query('SELECT 1 AS num')
34+
expect(res.rows[0].num).to.equal(1)
35+
client.release()
36+
const client2 = await pool.connect()
37+
expect(client).to.equal(client2)
38+
expect(client2.HOOK_CONNECT_RESULT).to.equal(1)
39+
client.release()
40+
await pool.end()
41+
})
42+
43+
it('errors out the connect call if the async connect hook rejects', async () => {
44+
const pool = new Pool({
45+
onConnect: async (client) => {
46+
await client.query('SELECT INVALID HERE')
47+
},
48+
})
49+
try {
50+
await pool.connect()
51+
throw new Error('Expected connect to throw')
52+
} catch (err) {
53+
expect(err.message).to.contain('invalid')
54+
}
55+
await pool.end()
56+
})
57+
58+
it('calls onConnect when using pool.query', async () => {
59+
const pool = new Pool({
60+
onConnect: async (client) => {
61+
const res = await client.query('SELECT 1 AS num')
62+
client.HOOK_CONNECT_RESULT = res.rows[0].num
63+
},
64+
})
65+
const res = await pool.query('SELECT $1::text AS name', ['brianc'])
66+
expect(res.rows[0].name).to.equal('brianc')
67+
const client = await pool.connect()
68+
expect(client.HOOK_CONNECT_RESULT).to.equal(1)
69+
client.release()
70+
await pool.end()
71+
})
72+
73+
it('recovers after a hook error', async () => {
74+
let shouldError = true
75+
const pool = new Pool({
76+
onConnect: () => {
77+
if (shouldError) {
78+
throw new Error('connect hook error')
79+
}
80+
},
81+
})
82+
try {
83+
await pool.connect()
84+
throw new Error('Expected connect to throw')
85+
} catch (err) {
86+
expect(err.message).to.equal('connect hook error')
87+
}
88+
shouldError = false
89+
const client = await pool.connect()
90+
const res = await client.query('SELECT 1 AS num')
91+
expect(res.rows[0].num).to.equal(1)
92+
client.release()
93+
await pool.end()
94+
})
95+
96+
it('calls onConnect for each new client', async () => {
97+
let connectCount = 0
98+
const pool = new Pool({
99+
max: 2,
100+
onConnect: async (client) => {
101+
connectCount++
102+
await client.query('SELECT 1')
103+
},
104+
})
105+
const client1 = await pool.connect()
106+
const client2 = await pool.connect()
107+
expect(connectCount).to.equal(2)
108+
expect(client1).to.not.equal(client2)
109+
client1.release()
110+
client2.release()
111+
await pool.end()
112+
})
113+
114+
it('cleans up clients after repeated hook failures', async () => {
115+
let errorCount = 0
116+
const pool = new Pool({
117+
max: 2,
118+
onConnect: () => {
119+
if (errorCount < 10) {
120+
errorCount++
121+
throw new Error('connect hook error')
122+
}
123+
},
124+
})
125+
for (let i = 0; i < 10; i++) {
126+
let threw = false
127+
try {
128+
await pool.connect()
129+
} catch (err) {
130+
threw = true
131+
expect(err.message).to.equal('connect hook error')
132+
}
133+
expect(threw).to.equal(true)
134+
}
135+
expect(errorCount).to.equal(10)
136+
expect(pool.totalCount).to.equal(0)
137+
expect(pool.idleCount).to.equal(0)
138+
const client1 = await pool.connect()
139+
const res1 = await client1.query('SELECT 1 AS num')
140+
expect(res1.rows[0].num).to.equal(1)
141+
const client2 = await pool.connect()
142+
const res2 = await client2.query('SELECT 2 AS num')
143+
expect(res2.rows[0].num).to.equal(2)
144+
expect(pool.totalCount).to.equal(2)
145+
client1.release()
146+
client2.release()
147+
await pool.end()
148+
})
149+
150+
it('errors out the connect call if the connect hook throws', async () => {
151+
const pool = new Pool({
152+
onConnect: () => {
153+
throw new Error('connect hook error')
154+
},
155+
})
156+
try {
157+
await pool.connect()
158+
throw new Error('Expected connect to throw')
159+
} catch (err) {
160+
expect(err.message).to.equal('connect hook error')
161+
}
162+
await pool.end()
163+
})
164+
})

packages/pg-pool/test/timeout.js

Whitespace-only changes.

0 commit comments

Comments
 (0)