Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion packages/pg-pool/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,15 @@ class Pool extends EventEmitter {
this.emit('release', err, client)

// TODO(bmc): expose a proper, public interface _queryable and _ending
if (err || this.ending || !client._queryable || client._ending || client._poolUseCount >= this.options.maxUses) {
if (
err ||
this.ending ||
!client._queryable ||
client._ending ||
client._txStatus === 'T' ||
client._txStatus === 'E' ||
client._poolUseCount >= this.options.maxUses
) {
if (client._poolUseCount >= this.options.maxUses) {
this.log('remove expended client')
}
Expand Down
82 changes: 82 additions & 0 deletions packages/pg-pool/test/poison-pool.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
'use strict'

const expect = require('expect.js')
const describe = require('mocha').describe
const it = require('mocha').it
const Pool = require('..')

describe('poison connection pool defense (_txStatus check)', function () {
it('removes a client with an open transaction on release', async function () {
const pool = new Pool({ max: 1 })
const client = await pool.connect()
await client.query('BEGIN')
expect(client._txStatus).to.be('T')

client.release()
expect(pool.totalCount).to.be(0)
expect(pool.idleCount).to.be(0)

// pool should still work by creating a fresh connection
const { rows } = await pool.query('SELECT 1 as num')
expect(rows[0].num).to.be(1)
await pool.end()
})

it('removes a client in a failed transaction state on release', async function () {
const pool = new Pool({ max: 1 })
const client = await pool.connect()
await client.query('BEGIN')
try {
await client.query('SELECT invalid_column FROM nonexistent_table')
} catch (e) {
// swallow the error to avoid pool close the connection
}
// The ReadyForQuery message with status 'E' may arrive on a separate I/O event.
// Issue a follow-up query to ensure it has been processed — this will also fail
// (since the transaction is aborted) but guarantees _txStatus is updated.
try {
await client.query('SELECT 1')
} catch (e) {
// expected — "current transaction is aborted"
}
expect(client._txStatus).to.be('E')

client.release()
expect(pool.totalCount).to.be(0)
expect(pool.idleCount).to.be(0)

// pool should still work
const { rows } = await pool.query('SELECT 1 as num')
expect(rows[0].num).to.be(1)
await pool.end()
})

it('only removes connections with open transactions, keeps idle ones', async function () {
const pool = new Pool({ max: 3 })
const clientA = await pool.connect()
const clientB = await pool.connect()
const clientC = await pool.connect()

// Client A: open transaction (poisoned)
await clientA.query('BEGIN')
expect(clientA._txStatus).to.be('T')

// Client B: normal query (idle)
await clientB.query('SELECT 1')
expect(clientB._txStatus).to.be('I')

// Client C: committed transaction (idle)
await clientC.query('BEGIN')
await clientC.query('COMMIT')
expect(clientC._txStatus).to.be('I')

clientA.release()
clientB.release()
clientC.release()

// A was removed, B and C kept
expect(pool.totalCount).to.be(2)
expect(pool.idleCount).to.be(2)
await pool.end()
})
})
2 changes: 2 additions & 0 deletions packages/pg/lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ class Client extends EventEmitter {
this._connectionError = false
this._queryable = true
this._activeQuery = null
this._txStatus = null

this.enableChannelBinding = Boolean(c.enableChannelBinding) // set true to use SCRAM-SHA-256-PLUS when offered
this.connection =
Expand Down Expand Up @@ -356,6 +357,7 @@ class Client extends EventEmitter {
}
const activeQuery = this._getActiveQuery()
this._activeQuery = null
this._txStatus = msg?.status ?? null
Comment thread
panga marked this conversation as resolved.
this.readyForQuery = true
if (activeQuery) {
activeQuery.handleReadyForQuery(this.connection)
Expand Down