Skip to content

Commit ab3ef77

Browse files
committed
feat(pg-native): pipeline mode
1 parent f332f28 commit ab3ef77

3 files changed

Lines changed: 556 additions & 4 deletions

File tree

packages/pg-native/index.js

Lines changed: 322 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ const Client = (module.exports = function (config) {
1717
this.pq = new Libpq()
1818
this._reading = false
1919
this._read = this._read.bind(this)
20+
this._readPipeline = this._readPipeline.bind(this)
2021

2122
// allow custom type conversion to be passed in
2223
this._types = config.types || types
@@ -28,6 +29,14 @@ const Client = (module.exports = function (config) {
2829
this._rows = undefined
2930
this._results = undefined
3031

32+
// Pipeline mode configuration
33+
this._pipelineMode = config.pipelineMode || false
34+
this._pipelineMaxQueries = config.pipelineMaxQueries || 1000
35+
this._pipelineEnabled = false
36+
this._pipelineQueue = [] // Queue of pending queries with callbacks
37+
this._pipelinePendingCount = 0 // Count of sent but not yet resolved queries
38+
this._pipelineCallbacks = [] // Callbacks for sent queries awaiting results
39+
3140
// lazy start the reader if notifications are listened for
3241
// this way if you only run sync queries you wont block
3342
// the event loop artificially
@@ -43,20 +52,46 @@ const Client = (module.exports = function (config) {
4352
util.inherits(Client, EventEmitter)
4453

4554
Client.prototype.connect = function (params, cb) {
46-
this.pq.connect(params, cb)
55+
if (typeof params === 'function') {
56+
cb = params
57+
params = undefined
58+
}
59+
60+
const self = this
61+
this.pq.connect(params, function (err) {
62+
if (err) return cb(err)
63+
64+
// enter pipeline mode if enabled and supported
65+
if (self._pipelineMode && self._pipelineModeSupported()) {
66+
self._enterPipelineMode()
67+
self._startPipelineReading()
68+
}
69+
70+
cb()
71+
})
4772
}
4873

4974
Client.prototype.connectSync = function (params) {
5075
this.pq.connectSync(params)
76+
77+
// enter pipeline mode if enabled and supported
78+
if (this._pipelineMode && this._pipelineModeSupported()) {
79+
this._enterPipelineMode()
80+
this._startPipelineReading()
81+
}
5182
}
5283

5384
Client.prototype.query = function (text, values, cb) {
54-
let queryFn
55-
5685
if (typeof values === 'function') {
5786
cb = values
87+
values = undefined
88+
}
89+
90+
if (this._pipelineMode) {
91+
return this._pipelineQuery(text, values, cb)
5892
}
5993

94+
let queryFn
6095
if (Array.isArray(values)) {
6196
queryFn = () => {
6297
return this.pq.sendQueryParams(text, values)
@@ -329,3 +364,287 @@ Client.prototype._onReadyForQuery = function () {
329364
cb(err, rows || [], results)
330365
}
331366
}
367+
368+
// Check if pipeline mode is supported
369+
Client.prototype._pipelineModeSupported = function () {
370+
return this.pq.pipelineModeSupported()
371+
}
372+
373+
// Enter pipeline mode - allows sending multiple queries without waiting for results
374+
Client.prototype._enterPipelineMode = function () {
375+
if (!this._pipelineModeSupported()) {
376+
throw new Error('Pipeline mode is not supported. Requires PostgreSQL 14+')
377+
}
378+
const result = this.pq.enterPipelineMode()
379+
if (result) {
380+
this._pipelineEnabled = true
381+
this.pq.setNonBlocking(true)
382+
}
383+
return result
384+
}
385+
386+
// Exit pipeline mode
387+
Client.prototype._exitPipelineMode = function () {
388+
if (!this._pipelineEnabled) {
389+
return true
390+
}
391+
const result = this.pq.exitPipelineMode()
392+
if (result) {
393+
this._pipelineEnabled = false
394+
this._pipelineQueue = []
395+
this._pipelinePendingCount = 0
396+
this._pipelineCallbacks = []
397+
}
398+
return result
399+
}
400+
401+
// Get current pipeline status (0=off, 1=on, 2=aborted)
402+
Client.prototype._pipelineStatus = function () {
403+
return this.pq.pipelineStatus()
404+
}
405+
406+
// Send a sync point in the pipeline to trigger result delivery
407+
Client.prototype._pipelineSync = function () {
408+
return this.pq.pipelineSync()
409+
}
410+
411+
// Execute a query in pipeline mode
412+
// Called by query() when pipelineMode is enabled
413+
Client.prototype._pipelineQuery = function (text, values, cb) {
414+
// if pipeline mode is not enabled but was configured, auto-enter
415+
if (this._pipelineMode && !this._pipelineEnabled) {
416+
this._enterPipelineMode()
417+
this._startPipelineReading()
418+
}
419+
420+
if (!this._pipelineEnabled) {
421+
const err = new Error('Pipeline mode is not enabled. Set pipelineMode: true in config.')
422+
if (cb) return setImmediate(() => cb(err))
423+
return Promise.reject(err)
424+
}
425+
426+
// Check if we need to apply backpressure
427+
if (this._pipelinePendingCount >= this._pipelineMaxQueries) {
428+
// Queue the query for later execution
429+
const queued = { text, values, cb }
430+
this._pipelineQueue.push(queued)
431+
432+
if (!cb) {
433+
return new Promise((resolve, reject) => {
434+
queued.cb = (err, rows, result) => {
435+
if (err) reject(err)
436+
else resolve(rows)
437+
}
438+
})
439+
}
440+
return
441+
}
442+
443+
// Send the query - always use sendQueryParams in pipeline mode
444+
// (sendQuery is not allowed in pipeline mode)
445+
const params = Array.isArray(values) ? values : []
446+
const sent = this.pq.sendQueryParams(text, params)
447+
448+
if (!sent) {
449+
const err = new Error(this.pq.errorMessage() || 'Failed to send query in pipeline mode')
450+
if (cb) return setImmediate(() => cb(err))
451+
return Promise.reject(err)
452+
}
453+
454+
this._pipelinePendingCount++
455+
456+
// Send sync to mark this query and trigger result delivery
457+
this._pipelineSync()
458+
459+
// Flush to ensure the query and sync are sent
460+
this.pq.flush()
461+
462+
if (cb) {
463+
this._pipelineCallbacks.push({ cb, resultCount: 0, rows: undefined, results: undefined, error: undefined })
464+
return
465+
}
466+
467+
return new Promise((resolve, reject) => {
468+
this._pipelineCallbacks.push({
469+
cb: (err, rows, result) => {
470+
if (err) reject(err)
471+
else resolve(rows)
472+
},
473+
resultCount: 0,
474+
rows: undefined,
475+
results: undefined,
476+
error: undefined,
477+
})
478+
})
479+
}
480+
481+
// Flush and wait for all pending pipeline queries to complete
482+
Client.prototype._pipelineFlush = function (cb) {
483+
if (!this._pipelineEnabled) {
484+
if (cb) return setImmediate(cb)
485+
return Promise.resolve()
486+
}
487+
488+
// Send sync to mark end of current batch
489+
this._pipelineSync()
490+
this.pq.flush()
491+
492+
// Add a sync callback marker
493+
if (cb) {
494+
this._pipelineCallbacks.push({ cb, isSync: true })
495+
return
496+
}
497+
498+
return new Promise((resolve, reject) => {
499+
this._pipelineCallbacks.push({
500+
cb: (err) => {
501+
if (err) reject(err)
502+
else resolve()
503+
},
504+
isSync: true,
505+
})
506+
})
507+
}
508+
509+
// Start reading for pipeline mode
510+
Client.prototype._startPipelineReading = function () {
511+
if (this._reading) return
512+
this._reading = true
513+
this.pq.on('readable', this._readPipeline)
514+
this.pq.startReader()
515+
}
516+
517+
// Stop reading for pipeline mode
518+
Client.prototype._stopPipelineReading = function () {
519+
if (!this._reading) return
520+
this._reading = false
521+
this.pq.stopReader()
522+
this.pq.removeListener('readable', this._readPipeline)
523+
}
524+
525+
// Read handler specificaly for pipeline mode
526+
Client.prototype._readPipeline = function () {
527+
const pq = this.pq
528+
529+
// Read waiting data from the socket
530+
if (!pq.consumeInput()) {
531+
// Read error and notify all pending callbacks
532+
const err = new Error(pq.errorMessage())
533+
this._pipelineCallbacks.forEach((pending) => {
534+
if (pending.cb) pending.cb(err)
535+
})
536+
this._pipelineCallbacks = []
537+
this._pipelinePendingCount = 0
538+
return
539+
}
540+
541+
// Process all available results
542+
// In pipeline mode, getResult() returns false for NULL markers between results
543+
// We should only break when isBusy() becomes true
544+
let loopCount = 0
545+
const maxLoops = 1000 // Safety limit
546+
while (!pq.isBusy() && loopCount < maxLoops) {
547+
loopCount++
548+
const hasResult = pq.getResult()
549+
if (!hasResult) {
550+
// NULL result - this is normal between query results and sync markers
551+
// Check if there are more pending callbacks, if so try again
552+
if (this._pipelineCallbacks.length === 0) {
553+
break
554+
}
555+
// Try one more time in case there's another result
556+
continue
557+
}
558+
559+
const status = pq.resultStatus()
560+
561+
// Handle pipeline sync point
562+
if (status === 'PGRES_PIPELINE_SYNC') {
563+
// Find and call the sync callback
564+
const idx = this._pipelineCallbacks.findIndex((p) => p.isSync)
565+
if (idx !== -1) {
566+
const syncCb = this._pipelineCallbacks.splice(idx, 1)[0]
567+
if (syncCb.cb) syncCb.cb()
568+
}
569+
continue
570+
}
571+
572+
// Get the next pending callbacks
573+
if (this._pipelineCallbacks.length === 0) {
574+
continue
575+
}
576+
577+
const pending = this._pipelineCallbacks[0]
578+
if (pending.isSync) {
579+
continue
580+
}
581+
582+
if (status === 'PGRES_FATAL_ERROR') {
583+
pending.error = new Error(pq.resultErrorMessage())
584+
} else if (status === 'PGRES_TUPLES_OK' || status === 'PGRES_COMMAND_OK' || status === 'PGRES_EMPTY_QUERY') {
585+
const result = this._consumeQueryResults(pq)
586+
if (pending.resultCount === 0) {
587+
pending.rows = result.rows
588+
pending.results = result
589+
} else if (pending.resultCount === 1) {
590+
pending.rows = [pending.rows, result.rows]
591+
pending.results = [pending.results, result]
592+
} else {
593+
pending.rows.push(result.rows)
594+
pending.results.push(result)
595+
}
596+
pending.resultCount++
597+
}
598+
599+
// Check if we need to get more results for this query (null result marks end)
600+
// For pipeline mode, each query gets exactly one result set typically
601+
// We complete the callback and move to the next one
602+
this._pipelineCallbacks.shift()
603+
this._pipelinePendingCount--
604+
605+
if (pending.cb) {
606+
pending.cb(pending.error, pending.rows || [], pending.results)
607+
}
608+
609+
// Process queued queries now that we have room
610+
this._processQueuedPipelineQueries()
611+
}
612+
613+
// Check for notifications
614+
let notice = pq.notifies()
615+
while (notice) {
616+
this.emit('notification', notice)
617+
notice = pq.notifies()
618+
}
619+
}
620+
621+
// Process queued pipeline queries when there's room
622+
Client.prototype._processQueuedPipelineQueries = function () {
623+
while (this._pipelineQueue.length > 0 && this._pipelinePendingCount < this._pipelineMaxQueries) {
624+
const queued = this._pipelineQueue.shift()
625+
626+
// Always use sendQueryParams in pipeline mode
627+
const params = Array.isArray(queued.values) ? queued.values : []
628+
const sent = this.pq.sendQueryParams(queued.text, params)
629+
630+
if (!sent) {
631+
const err = new Error(this.pq.errorMessage() || 'Failed to send queued query in pipeline mode')
632+
if (queued.cb) queued.cb(err)
633+
continue
634+
}
635+
636+
this._pipelinePendingCount++
637+
638+
// Send sync to trigger result delivery
639+
this._pipelineSync()
640+
this.pq.flush()
641+
642+
this._pipelineCallbacks.push({
643+
cb: queued.cb,
644+
resultCount: 0,
645+
rows: undefined,
646+
results: undefined,
647+
error: undefined,
648+
})
649+
}
650+
}

packages/pg-native/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
},
3535
"homepage": "https://github.com/brianc/node-postgres/tree/master/packages/pg-native",
3636
"dependencies": {
37-
"libpq": "^1.8.15",
37+
"libpq": "^1.9.0",
3838
"pg-types": "2.2.0"
3939
},
4040
"devDependencies": {

0 commit comments

Comments
 (0)