Skip to content

Commit 08a23b7

Browse files
authored
feat: pipeline mode
1 parent 9174783 commit 08a23b7

4 files changed

Lines changed: 341 additions & 7 deletions

File tree

packages/pg/lib/client.js

Lines changed: 180 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,9 @@ class Client extends EventEmitter {
8383
this.binary = c.binary || defaults.binary
8484
this.processID = null
8585
this.secretKey = null
86+
this._pipelining = false
87+
this._pipelineQueue = []
88+
this._pipelineActive = false
8689
this.ssl = this.connectionParameters.ssl || false
8790
// As with Password, make SSL->Key (the private key) non-enumerable.
8891
// It won't show up in stack traces
@@ -125,6 +128,12 @@ class Client extends EventEmitter {
125128

126129
this._queryQueue.forEach(enqueueError)
127130
this._queryQueue.length = 0
131+
132+
// Also error all pipeline queries
133+
if (this._pipelineQueue) {
134+
this._pipelineQueue.forEach(enqueueError)
135+
this._pipelineQueue.length = 0
136+
}
128137
}
129138

130139
_connect(callback) {
@@ -354,6 +363,11 @@ class Client extends EventEmitter {
354363
}
355364
this.emit('connect')
356365
}
366+
367+
if (this._pipelining) {
368+
return this._handlePipelineReadyForQuery(msg)
369+
}
370+
357371
const activeQuery = this._getActiveQuery()
358372
this._activeQuery = null
359373
this.readyForQuery = true
@@ -363,6 +377,22 @@ class Client extends EventEmitter {
363377
this._pulseQueryQueue()
364378
}
365379

380+
_handlePipelineReadyForQuery(msg) {
381+
// In pipeline mode, handle completed queries
382+
if (this._pipelineQueue.length > 0) {
383+
const completedQuery = this._pipelineQueue.shift()
384+
if (completedQuery) {
385+
completedQuery.handleReadyForQuery(this.connection)
386+
}
387+
}
388+
389+
// If no more queries in pipeline, we're ready for more
390+
if (this._pipelineQueue.length === 0) {
391+
this.readyForQuery = true
392+
this.emit('drain')
393+
}
394+
}
395+
366396
// if we receive an error event or error message
367397
// during the connection process we handle it here
368398
_handleErrorWhileConnecting(err) {
@@ -408,33 +438,53 @@ class Client extends EventEmitter {
408438

409439
_handleRowDescription(msg) {
410440
// delegate rowDescription to active query
411-
this._getActiveQuery().handleRowDescription(msg)
441+
const query = this._getCurrentQuery()
442+
if (query) {
443+
query.handleRowDescription(msg)
444+
}
412445
}
413446

414447
_handleDataRow(msg) {
415448
// delegate dataRow to active query
416-
this._getActiveQuery().handleDataRow(msg)
449+
const query = this._getCurrentQuery()
450+
if (query) {
451+
query.handleDataRow(msg)
452+
}
417453
}
418454

419455
_handlePortalSuspended(msg) {
420456
// delegate portalSuspended to active query
421-
this._getActiveQuery().handlePortalSuspended(this.connection)
457+
const query = this._getCurrentQuery()
458+
if (query) {
459+
query.handlePortalSuspended(this.connection)
460+
}
422461
}
423462

424463
_handleEmptyQuery(msg) {
425464
// delegate emptyQuery to active query
426-
this._getActiveQuery().handleEmptyQuery(this.connection)
465+
const query = this._getCurrentQuery()
466+
if (query) {
467+
query.handleEmptyQuery(this.connection)
468+
}
427469
}
428470

429471
_handleCommandComplete(msg) {
430-
const activeQuery = this._getActiveQuery()
431-
if (activeQuery == null) {
472+
const query = this._getCurrentQuery()
473+
if (query == null) {
432474
const error = new Error('Received unexpected commandComplete message from backend.')
433475
this._handleErrorEvent(error)
434476
return
435477
}
436478
// delegate commandComplete to active query
437-
activeQuery.handleCommandComplete(msg, this.connection)
479+
query.handleCommandComplete(msg, this.connection)
480+
}
481+
482+
_getCurrentQuery() {
483+
if (this._pipelining) {
484+
// In pipeline mode, return the first query in the pipeline queue
485+
return this._pipelineQueue.length > 0 ? this._pipelineQueue[0] : null
486+
}
487+
return this._getActiveQuery()
438488
}
439489

440490
_handleParseComplete() {
@@ -644,6 +694,10 @@ class Client extends EventEmitter {
644694
return result
645695
}
646696

697+
if (this._pipelining) {
698+
return this._pipelineQuery(query, result)
699+
}
700+
647701
this._queryQueue.push(query)
648702
this._pulseQueryQueue()
649703
return result
@@ -689,6 +743,125 @@ class Client extends EventEmitter {
689743
queryQueueDeprecationNotice()
690744
return this._queryQueue
691745
}
746+
747+
get pipelining() {
748+
return this._pipelining
749+
}
750+
751+
set pipelining(value) {
752+
if (typeof value !== 'boolean') {
753+
throw new TypeError('pipelining must be a boolean')
754+
}
755+
756+
if (this._pipelining === value) {
757+
return
758+
}
759+
760+
if (value && !this._connected) {
761+
throw new Error('Cannot enable pipelining mode before connection is established')
762+
}
763+
764+
if (value && this._getActiveQuery()) {
765+
throw new Error('Cannot enable pipelining mode while a query is active')
766+
}
767+
768+
if (value) {
769+
this._enterPipelineMode()
770+
} else {
771+
this._exitPipelineMode()
772+
}
773+
}
774+
775+
_enterPipelineMode() {
776+
if (this._pipelining) {
777+
return
778+
}
779+
780+
this._pipelining = true
781+
this._pipelineActive = true
782+
this._pipelineQueue = []
783+
784+
// Send pipeline mode command to server
785+
this.connection.enterPipelineMode()
786+
}
787+
788+
_exitPipelineMode() {
789+
if (!this._pipelining) {
790+
return
791+
}
792+
793+
// Process any remaining queries in pipeline
794+
if (this._pipelineQueue.length > 0) {
795+
throw new Error('Cannot exit pipeline mode with pending queries')
796+
}
797+
798+
this._pipelining = false
799+
this._pipelineActive = false
800+
801+
// Send exit pipeline mode command to server
802+
this.connection.exitPipelineMode()
803+
}
804+
805+
_pipelineQuery(query, result) {
806+
// Validate query for pipeline mode
807+
if (query.text && typeof query.text === 'string' && query.text.includes(';')) {
808+
const error = new Error('Multiple SQL commands in a single query are not allowed in pipeline mode')
809+
process.nextTick(() => {
810+
query.handleError(error, this.connection)
811+
})
812+
return result
813+
}
814+
815+
// Disallow simple query protocol in pipeline mode
816+
if (!query.requiresPreparation()) {
817+
const error = new Error('Simple query protocol is not allowed in pipeline mode. Use parameterized queries.')
818+
process.nextTick(() => {
819+
query.handleError(error, this.connection)
820+
})
821+
return result
822+
}
823+
824+
// Add query to pipeline queue
825+
this._pipelineQueue.push(query)
826+
827+
// Submit query using pipeline-specific method
828+
const queryError = this._submitPipelineQuery(query)
829+
if (queryError) {
830+
process.nextTick(() => {
831+
query.handleError(queryError, this.connection)
832+
// Remove from pipeline queue on error
833+
const index = this._pipelineQueue.indexOf(query)
834+
if (index > -1) {
835+
this._pipelineQueue.splice(index, 1)
836+
}
837+
})
838+
}
839+
840+
return result
841+
}
842+
843+
_submitPipelineQuery(query) {
844+
if (typeof query.text !== 'string' && typeof query.name !== 'string') {
845+
return new Error('A query must have either text or a name. Supplying neither is unsupported.')
846+
}
847+
const previous = this.connection.parsedStatements[query.name]
848+
if (query.text && previous && query.text !== previous) {
849+
return new Error(`Prepared statements must be unique - '${query.name}' was used for a different statement`)
850+
}
851+
if (query.values && !Array.isArray(query.values)) {
852+
return new Error('Query values must be an array')
853+
}
854+
855+
// In pipeline mode, we always use extended query protocol
856+
this.connection.stream.cork && this.connection.stream.cork()
857+
try {
858+
query.preparePipeline(this.connection)
859+
} finally {
860+
this.connection.stream.uncork && this.connection.stream.uncork()
861+
}
862+
863+
return null
864+
}
692865
}
693866

694867
// expose a Query constructor

packages/pg/lib/connection.js

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,16 @@ class Connection extends EventEmitter {
217217
sendCopyFail(msg) {
218218
this._send(serialize.copyFail(msg))
219219
}
220+
221+
enterPipelineMode() {
222+
this._pipelineMode = true
223+
}
224+
225+
exitPipelineMode() {
226+
// Send sync to end pipeline mode
227+
this.sync()
228+
this._pipelineMode = false
229+
}
220230
}
221231

222232
module.exports = Connection

packages/pg/lib/query.js

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,41 @@ class Query extends EventEmitter {
247247
handleCopyData(msg, connection) {
248248
// noop
249249
}
250+
251+
preparePipeline(connection) {
252+
if (!this.hasBeenParsed(connection)) {
253+
connection.parse({
254+
text: this.text,
255+
name: this.name,
256+
types: this.types,
257+
})
258+
}
259+
260+
try {
261+
connection.bind({
262+
portal: this.portal,
263+
statement: this.name,
264+
values: this.values,
265+
binary: this.binary,
266+
valueMapper: utils.prepareValue,
267+
})
268+
} catch (err) {
269+
this.handleError(err, connection)
270+
return
271+
}
272+
273+
connection.describe({
274+
type: 'P',
275+
name: this.portal || '',
276+
})
277+
278+
connection.execute({
279+
portal: this.portal,
280+
rows: this.rows,
281+
})
282+
283+
connection.flush()
284+
}
250285
}
251286

252287
module.exports = Query

0 commit comments

Comments
 (0)