Skip to content

Commit 2adc857

Browse files
author
Simone Nigro
committed
refactor(client): extract message target logic and reject custom submittables in pipeline mode (like pg-cursor)
1 parent 1780d1f commit 2adc857

3 files changed

Lines changed: 50 additions & 6 deletions

File tree

docs/pages/features/queries.mdx

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,7 @@ const results = await Promise.all([
194194

195195
- **No multi-statement queries**: Queries with multiple SQL statements separated by `;` are rejected
196196
- **No COPY operations**: COPY commands are not supported in pipeline mode
197+
- **No pg-cursor**: Custom submittables like `pg-cursor` are not supported in pipeline mode
197198
- **JavaScript client only**: Pipeline mode is not available in `pg-native`
198199

199200
### When to Use Pipeline Mode

packages/pg/lib/client.js

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,14 @@ class Client extends EventEmitter {
120120
return this._activeQuery
121121
}
122122

123+
// Returns the query that should receive the current message
124+
_getMessageTarget() {
125+
if (this._pipelineMode) {
126+
return this._getCurrentPipelineQuery()
127+
}
128+
return this._getActiveQuery()
129+
}
130+
123131
get pipelineMode() {
124132
return this._pipelineMode
125133
}
@@ -464,7 +472,7 @@ class Client extends EventEmitter {
464472
}
465473

466474
_handleRowDescription(msg) {
467-
const activeQuery = this._pipelineMode ? this._getCurrentPipelineQuery() : this._getActiveQuery()
475+
const activeQuery = this._getMessageTarget()
468476
if (activeQuery == null) {
469477
const error = new Error('Received unexpected rowDescription message from backend.')
470478
this._handleErrorEvent(error)
@@ -479,7 +487,7 @@ class Client extends EventEmitter {
479487
}
480488

481489
_handleDataRow(msg) {
482-
const activeQuery = this._pipelineMode ? this._getCurrentPipelineQuery() : this._getActiveQuery()
490+
const activeQuery = this._getMessageTarget()
483491
if (activeQuery == null) {
484492
const error = new Error('Received unexpected dataRow message from backend.')
485493
this._handleErrorEvent(error)
@@ -490,7 +498,7 @@ class Client extends EventEmitter {
490498
}
491499

492500
_handlePortalSuspended(msg) {
493-
const activeQuery = this._getActiveQuery()
501+
const activeQuery = this._getMessageTarget()
494502
if (activeQuery == null) {
495503
const error = new Error('Received unexpected portalSuspended message from backend.')
496504
this._handleErrorEvent(error)
@@ -501,7 +509,7 @@ class Client extends EventEmitter {
501509
}
502510

503511
_handleEmptyQuery(msg) {
504-
const activeQuery = this._pipelineMode ? this._getCurrentPipelineQuery() : this._getActiveQuery()
512+
const activeQuery = this._getMessageTarget()
505513
if (activeQuery == null) {
506514
const error = new Error('Received unexpected emptyQuery message from backend.')
507515
this._handleErrorEvent(error)
@@ -516,7 +524,7 @@ class Client extends EventEmitter {
516524
}
517525

518526
_handleCommandComplete(msg) {
519-
const activeQuery = this._pipelineMode ? this._getCurrentPipelineQuery() : this._getActiveQuery()
527+
const activeQuery = this._getMessageTarget()
520528
if (activeQuery == null) {
521529
const error = new Error('Received unexpected commandComplete message from backend.')
522530
this._handleErrorEvent(error)
@@ -531,7 +539,7 @@ class Client extends EventEmitter {
531539
}
532540

533541
_handleParseComplete() {
534-
const activeQuery = this._pipelineMode ? this._getCurrentPipelineQuery() : this._getActiveQuery()
542+
const activeQuery = this._getMessageTarget()
535543
if (activeQuery == null) {
536544
const error = new Error('Received unexpected parseComplete message from backend.')
537545
this._handleErrorEvent(error)
@@ -777,6 +785,15 @@ class Client extends EventEmitter {
777785
if (config === null || config === undefined) {
778786
throw new TypeError('Client was passed a null or undefined query')
779787
} else if (typeof config.submit === 'function') {
788+
// Check if this is a custom submittable (not our Query class)
789+
// Custom submittables like pg-cursor are not supported in pipeline mode
790+
if (this._pipelineMode && config.submit !== Query.prototype.submit) {
791+
const error = new Error('Custom submittables are not supported in pipeline mode')
792+
process.nextTick(() => {
793+
config.handleError(error, this.connection)
794+
})
795+
return config
796+
}
780797
readTimeout = config.query_timeout || this.connectionParameters.query_timeout
781798
result = query = config
782799
if (typeof values === 'function') {

packages/pg/test/integration/client/pipeline-mode-tests.js

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1018,3 +1018,29 @@ suite.test('pipeline mode - rowMode array works', (done) => {
10181018
})
10191019
})
10201020
})
1021+
1022+
suite.test('pipeline mode - pg-cursor is rejected', (done) => {
1023+
let Cursor
1024+
try {
1025+
Cursor = require('pg-cursor')
1026+
} catch (e) {
1027+
console.log(' (skipped - pg-cursor not installed)')
1028+
return done()
1029+
}
1030+
1031+
const client = new Client({ pipelineMode: true })
1032+
client.connect((err) => {
1033+
if (err) return done(err)
1034+
1035+
const cursor = new Cursor('SELECT generate_series(1, 100) as num')
1036+
1037+
// Cursor should receive an error
1038+
cursor.on('error', (err) => {
1039+
assert.ok(err instanceof Error, 'Should receive an error')
1040+
assert.ok(err.message.includes('pipeline'), 'Error should mention pipeline mode')
1041+
client.end(done)
1042+
})
1043+
1044+
client.query(cursor)
1045+
})
1046+
})

0 commit comments

Comments
 (0)