Skip to content

Commit 1aacec9

Browse files
author
Simone Nigro
committed
feat(pipeline-mode): add backpressure support with configurable query limits
1 parent ad32396 commit 1aacec9

4 files changed

Lines changed: 487 additions & 0 deletions

File tree

docs/pages/features/queries.mdx

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,37 @@ await client.query('INSERT INTO t VALUES (2)') // also fails: transaction aborte
211211
await client.query('COMMIT') // returns ROLLBACK command
212212
```
213213

214+
### Backpressure
215+
216+
When submitting a large number of queries in pipeline mode, you may want to limit how many queries are pending at once to prevent memory issues. The client supports configurable backpressure:
217+
218+
```js
219+
const client = new Client({
220+
pipelineMode: true,
221+
pipelineMaxQueries: 100, // default is 1000
222+
})
223+
```
224+
225+
When the number of pending queries reaches `pipelineMaxQueries`, new queries are queued internally and will be sent once space becomes available. The client emits events to help you monitor backpressure:
226+
227+
```js
228+
client.on('pipelineFull', () => {
229+
console.log('Pipeline is full, new queries will wait')
230+
})
231+
232+
client.on('pipelineDrain', () => {
233+
console.log('Pipeline has drained, accepting new queries')
234+
})
235+
```
236+
237+
You can also check the current pipeline depth:
238+
239+
```js
240+
console.log(client.pendingQueryCount) // number of queries waiting for results
241+
```
242+
243+
The `pipelineDrain` event is emitted when the pending query count drops below 75% of `pipelineMaxQueries` (the "low water mark").
244+
214245
### When to Use Pipeline Mode
215246

216247
Pipeline mode is most beneficial when:

packages/pg/lib/client.js

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,12 @@ class Client extends EventEmitter {
8484
// Pipeline mode configuration
8585
this._pipelineMode = Boolean(c.pipelineMode) || false
8686

87+
// Backpressure configuration for pipeline mode
88+
this._pipelineMaxQueries = c.pipelineMaxQueries || 1000
89+
this._lowWaterMark = Math.floor(this._pipelineMaxQueries * 0.75)
90+
this._pipelinePaused = false
91+
this._waitingForDrain = []
92+
8793
// Queue for tracking pending query results in pipeline mode
8894
this._pendingQueries = []
8995

@@ -132,6 +138,11 @@ class Client extends EventEmitter {
132138
return this._pipelineMode
133139
}
134140

141+
// Property for current pipeline depth (number of pending queries)
142+
get pendingQueryCount() {
143+
return this._pendingQueries.length + this._queryQueue.length
144+
}
145+
135146
_errorAllQueries(err) {
136147
const enqueueError = (query) => {
137148
process.nextTick(() => {
@@ -394,6 +405,9 @@ class Client extends EventEmitter {
394405
) {
395406
this._pendingQueries.shift()
396407
currentQuery.handleReadyForQuery(this.connection)
408+
409+
// Check if backpressure should be released after query completion
410+
this._checkBackpressureResume()
397411
}
398412

399413
// Check if more queries to send
@@ -774,6 +788,20 @@ class Client extends EventEmitter {
774788
return this._pendingQueries[0]
775789
}
776790

791+
// Called when queries complete to check if backpressure should be released
792+
// Resumes accepting new queries when pipeline depth drops below low water mark
793+
_checkBackpressureResume() {
794+
if (this._pipelinePaused && this.pendingQueryCount < this._lowWaterMark) {
795+
this._pipelinePaused = false
796+
this.emit('pipelineDrain')
797+
798+
// Resume all waiting queries
799+
const waiting = this._waitingForDrain
800+
this._waitingForDrain = []
801+
waiting.forEach((resolve) => resolve())
802+
}
803+
}
804+
777805
query(config, values, callback) {
778806
// can take in strings, config object or query object
779807
let query
@@ -875,11 +903,37 @@ class Client extends EventEmitter {
875903
return result
876904
}
877905

906+
// Backpressure handling for pipeline mode
907+
// we queue the query and delay its execution
908+
// the query is added to the queue immediately, but _pulseQueryQueue respects backpressure
909+
if (this._pipelineMode && this.pendingQueryCount >= this._pipelineMaxQueries) {
910+
// Emit pipelineFull event if not already paused
911+
if (!this._pipelinePaused) {
912+
this._pipelinePaused = true
913+
this.emit('pipelineFull')
914+
}
915+
// Queue the query with backpressure - it will be processed when space is available
916+
this._queueQueryWithBackpressure(query)
917+
return result
918+
}
919+
878920
this._queryQueue.push(query)
879921
this._pulseQueryQueue()
880922
return result
881923
}
882924

925+
// Internal method to queue a query when backpressure is active
926+
// The query waits for drain before being added to the main queue
927+
_queueQueryWithBackpressure(query) {
928+
const waitForDrain = new this._Promise((resolve) => {
929+
this._waitingForDrain.push(resolve)
930+
})
931+
waitForDrain.then(() => {
932+
this._queryQueue.push(query)
933+
this._pulseQueryQueue()
934+
})
935+
}
936+
883937
ref() {
884938
this.connection.ref()
885939
}

packages/pg/test/integration/client/pipeline-mode-tests.js

Lines changed: 204 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1655,3 +1655,207 @@ suite.test('pipeline mode - Pool with size 1 maintains transaction isolation', (
16551655
pool.end().then(() => done(err))
16561656
})
16571657
})
1658+
1659+
suite.test('pipeline mode - pipelineFull event is emitted when pipelineMaxQueries is reached', (done) => {
1660+
// Use a small pipelineMaxQueries to trigger backpressure quickly
1661+
const client = new Client({ pipelineMode: true, pipelineMaxQueries: 10 })
1662+
1663+
let pipelineFullEmitted = false
1664+
client.on('pipelineFull', () => {
1665+
pipelineFullEmitted = true
1666+
})
1667+
1668+
client.connect((err) => {
1669+
if (err) return done(err)
1670+
1671+
// Submit more queries than pipelineMaxQueries to trigger backpressure
1672+
const promises = []
1673+
for (let i = 0; i < 15; i++) {
1674+
promises.push(client.query('SELECT $1::int as num, pg_sleep(0.01)', [i]))
1675+
}
1676+
1677+
Promise.all(promises)
1678+
.then((results) => {
1679+
// Verify all queries completed successfully
1680+
assert.equal(results.length, 15, 'All 15 queries should complete')
1681+
1682+
// Verify pipelineFull event was emitted
1683+
assert.ok(pipelineFullEmitted, 'pipelineFull event should have been emitted')
1684+
1685+
client.end(done)
1686+
})
1687+
.catch((err) => {
1688+
client.end(() => done(err))
1689+
})
1690+
})
1691+
})
1692+
1693+
suite.test('pipeline mode - pipelineDrain event is emitted when pipeline depth drops below low water mark', (done) => {
1694+
// Use a small pipelineMaxQueries to trigger backpressure quickly
1695+
// lowWaterMark will be 75% of 10 = 7
1696+
const client = new Client({ pipelineMode: true, pipelineMaxQueries: 10 })
1697+
1698+
let pipelineFullEmitted = false
1699+
let pipelineDrainEmitted = false
1700+
1701+
client.on('pipelineFull', () => {
1702+
pipelineFullEmitted = true
1703+
})
1704+
1705+
client.on('pipelineDrain', () => {
1706+
pipelineDrainEmitted = true
1707+
})
1708+
1709+
client.connect((err) => {
1710+
if (err) return done(err)
1711+
1712+
// Submit more queries than pipelineMaxQueries to trigger backpressure
1713+
// Then wait for them to complete and verify pipelineDrain is emitted
1714+
const promises = []
1715+
for (let i = 0; i < 15; i++) {
1716+
promises.push(client.query('SELECT $1::int as num, pg_sleep(0.01)', [i]))
1717+
}
1718+
1719+
Promise.all(promises)
1720+
.then((results) => {
1721+
// Verify all queries completed successfully
1722+
assert.equal(results.length, 15, 'All 15 queries should complete')
1723+
1724+
// Verify both events were emitted
1725+
assert.ok(pipelineFullEmitted, 'pipelineFull event should have been emitted')
1726+
assert.ok(
1727+
pipelineDrainEmitted,
1728+
'pipelineDrain event should have been emitted when pipeline depth dropped below low water mark'
1729+
)
1730+
1731+
client.end(done)
1732+
})
1733+
.catch((err) => {
1734+
client.end(() => done(err))
1735+
})
1736+
})
1737+
})
1738+
1739+
suite.test('pipeline mode - backpressure activation with real PostgreSQL connection', (done) => {
1740+
// Test that backpressure correctly pauses query submission when pipelineMaxQueries is reached
1741+
const pipelineMaxQueries = 5
1742+
const client = new Client({ pipelineMode: true, pipelineMaxQueries: pipelineMaxQueries })
1743+
1744+
let pipelineFullCount = 0
1745+
let maxObservedPendingCount = 0
1746+
1747+
client.on('pipelineFull', () => {
1748+
pipelineFullCount++
1749+
// Record the pending count when pipelineFull is emitted
1750+
if (client.pendingQueryCount > maxObservedPendingCount) {
1751+
maxObservedPendingCount = client.pendingQueryCount
1752+
}
1753+
})
1754+
1755+
client.connect((err) => {
1756+
if (err) return done(err)
1757+
1758+
// Submit many queries - backpressure should prevent pendingQueryCount from exceeding pipelineMaxQueries
1759+
const numQueries = 20
1760+
const promises = []
1761+
1762+
for (let i = 0; i < numQueries; i++) {
1763+
promises.push(
1764+
client.query('SELECT $1::int as num, pg_sleep(0.02)', [i]).then((r) => ({ success: true, num: r.rows[0].num }))
1765+
)
1766+
}
1767+
1768+
Promise.all(promises)
1769+
.then((results) => {
1770+
// All queries should complete successfully
1771+
assert.equal(results.length, numQueries, `All ${numQueries} queries should complete`)
1772+
results.forEach((r, idx) => {
1773+
assert.equal(r.success, true, `Query ${idx} should succeed`)
1774+
assert.equal(r.num, idx.toString(), `Query ${idx} should return correct value`)
1775+
})
1776+
1777+
// pipelineFull should have been emitted at least once
1778+
assert.ok(pipelineFullCount >= 1, 'pipelineFull event should have been emitted at least once')
1779+
1780+
// pendingQueryCount should never have exceeded pipelineMaxQueries significantly
1781+
// (there may be slight timing variations, so we allow a small buffer)
1782+
assert.ok(
1783+
maxObservedPendingCount <= pipelineMaxQueries + 2,
1784+
`maxObservedPendingCount (${maxObservedPendingCount}) should not significantly exceed pipelineMaxQueries (${pipelineMaxQueries})`
1785+
)
1786+
1787+
client.end(done)
1788+
})
1789+
.catch((err) => {
1790+
client.end(() => done(err))
1791+
})
1792+
})
1793+
})
1794+
1795+
suite.test('pipeline mode - backpressure release and query resumption', (done) => {
1796+
// Test that queries resume correctly after backpressure is released
1797+
const pipelineMaxQueries = 5
1798+
const client = new Client({ pipelineMode: true, pipelineMaxQueries: pipelineMaxQueries })
1799+
1800+
let pipelineFullEmitted = false
1801+
let pipelineDrainEmitted = false
1802+
const eventOrder = []
1803+
1804+
client.on('pipelineFull', () => {
1805+
pipelineFullEmitted = true
1806+
eventOrder.push('full')
1807+
})
1808+
1809+
client.on('pipelineDrain', () => {
1810+
pipelineDrainEmitted = true
1811+
eventOrder.push('drain')
1812+
})
1813+
1814+
client.connect((err) => {
1815+
if (err) return done(err)
1816+
1817+
// Submit queries that will trigger backpressure and then drain
1818+
const numQueries = 15
1819+
const promises = []
1820+
const completionOrder = []
1821+
1822+
for (let i = 0; i < numQueries; i++) {
1823+
promises.push(
1824+
client.query('SELECT $1::int as num, pg_sleep(0.01)', [i]).then((r) => {
1825+
completionOrder.push(parseInt(r.rows[0].num))
1826+
return r.rows[0].num
1827+
})
1828+
)
1829+
}
1830+
1831+
Promise.all(promises)
1832+
.then((results) => {
1833+
// All queries should complete
1834+
assert.equal(results.length, numQueries, `All ${numQueries} queries should complete`)
1835+
1836+
// Verify pipelineFull was emitted (backpressure activated)
1837+
assert.ok(pipelineFullEmitted, 'pipelineFull event should have been emitted')
1838+
1839+
// Verify pipelineDrain was emitted (backpressure released)
1840+
assert.ok(pipelineDrainEmitted, 'pipelineDrain event should have been emitted')
1841+
1842+
// Verify event order: full should come before drain
1843+
const fullIndex = eventOrder.indexOf('full')
1844+
const drainIndex = eventOrder.indexOf('drain')
1845+
assert.ok(fullIndex < drainIndex, 'pipelineFull should be emitted before pipelineDrain')
1846+
1847+
// Verify queries completed in submission order (pipeline preserves order)
1848+
for (let i = 0; i < completionOrder.length; i++) {
1849+
assert.equal(completionOrder[i], i, `Query ${i} should complete in order`)
1850+
}
1851+
1852+
// Verify pendingQueryCount is 0 after all queries complete
1853+
assert.equal(client.pendingQueryCount, 0, 'pendingQueryCount should be 0 after all queries complete')
1854+
1855+
client.end(done)
1856+
})
1857+
.catch((err) => {
1858+
client.end(() => done(err))
1859+
})
1860+
})
1861+
})

0 commit comments

Comments
 (0)