Skip to content

Commit 1780d1f

Browse files
author
Simone Nigro
committed
fix(client): improve pipeline mode query completion tracking
1 parent 6e2f146 commit 1780d1f

2 files changed

Lines changed: 218 additions & 5 deletions

File tree

packages/pg/lib/client.js

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -470,8 +470,10 @@ class Client extends EventEmitter {
470470
this._handleErrorEvent(error)
471471
return
472472
}
473-
// Mark that this query has started receiving results (for pipeline mode)
474-
activeQuery._gotRowDescription = true
473+
// Mark that this query has started receiving results (for pipeline mode completion tracking)
474+
if (this._pipelineMode) {
475+
activeQuery._gotRowDescription = true
476+
}
475477
// delegate rowDescription to active query
476478
activeQuery.handleRowDescription(msg)
477479
}
@@ -499,12 +501,16 @@ class Client extends EventEmitter {
499501
}
500502

501503
_handleEmptyQuery(msg) {
502-
const activeQuery = this._getActiveQuery()
504+
const activeQuery = this._pipelineMode ? this._getCurrentPipelineQuery() : this._getActiveQuery()
503505
if (activeQuery == null) {
504506
const error = new Error('Received unexpected emptyQuery message from backend.')
505507
this._handleErrorEvent(error)
506508
return
507509
}
510+
// Mark that this query has completed (for pipeline mode completion tracking)
511+
if (this._pipelineMode) {
512+
activeQuery._gotCommandComplete = true
513+
}
508514
// delegate emptyQuery to active query
509515
activeQuery.handleEmptyQuery(this.connection)
510516
}
@@ -516,8 +522,10 @@ class Client extends EventEmitter {
516522
this._handleErrorEvent(error)
517523
return
518524
}
519-
// Mark that this query has completed (for pipeline mode)
520-
activeQuery._gotCommandComplete = true
525+
// Mark that this query has completed (for pipeline mode completion tracking)
526+
if (this._pipelineMode) {
527+
activeQuery._gotCommandComplete = true
528+
}
521529
// delegate commandComplete to active query
522530
activeQuery.handleCommandComplete(msg, this.connection)
523531
}
@@ -811,6 +819,14 @@ class Client extends EventEmitter {
811819
this._queryQueue.splice(index, 1)
812820
}
813821

822+
// In pipeline mode, also remove from pending queries
823+
if (this._pipelineMode) {
824+
const pendingIndex = this._pendingQueries.indexOf(query)
825+
if (pendingIndex > -1) {
826+
this._pendingQueries.splice(pendingIndex, 1)
827+
}
828+
}
829+
814830
this._pulseQueryQueue()
815831
}, readTimeout)
816832

packages/pg/test/integration/client/pipeline-mode-tests.js

Lines changed: 197 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -821,3 +821,200 @@ suite.test('pipeline mode - COPY operations are rejected', (done) => {
821821
})
822822
})
823823
})
824+
825+
suite.test('pipeline mode - empty query handling', (done) => {
826+
const client = new Client({ pipelineMode: true })
827+
client.connect((err) => {
828+
if (err) return done(err)
829+
830+
// Empty query should be handled correctly
831+
client
832+
.query('')
833+
.then((r) => {
834+
// Empty query returns empty result
835+
assert.equal(r.rows.length, 0, 'Empty query should return empty rows')
836+
client.end(done)
837+
})
838+
.catch((err) => {
839+
// Some versions may error on empty query - that's also acceptable
840+
client.end(done)
841+
})
842+
})
843+
})
844+
845+
suite.test('pipeline mode - LISTEN/NOTIFY', (done) => {
846+
const client = new Client({ pipelineMode: true })
847+
client.connect((err) => {
848+
if (err) return done(err)
849+
850+
let notificationReceived = false
851+
852+
client.on('notification', (msg) => {
853+
assert.equal(msg.channel, 'test_channel', 'Should receive notification on correct channel')
854+
assert.equal(msg.payload, 'test_payload', 'Should receive correct payload')
855+
notificationReceived = true
856+
})
857+
858+
client
859+
.query('LISTEN test_channel')
860+
.then(() => client.query("NOTIFY test_channel, 'test_payload'"))
861+
.then(() => {
862+
// Give time for notification to arrive
863+
setTimeout(() => {
864+
assert.ok(notificationReceived, 'Should have received notification')
865+
client.end(done)
866+
}, 100)
867+
})
868+
.catch((err) => {
869+
client.end(() => done(err))
870+
})
871+
})
872+
})
873+
874+
suite.test('pipeline mode - binary mode works', (done) => {
875+
const client = new Client({ pipelineMode: true, binary: true })
876+
client.connect((err) => {
877+
if (err) return done(err)
878+
879+
Promise.all([
880+
client.query('SELECT 1::int4 as num'),
881+
client.query('SELECT 2::int4 as num'),
882+
client.query('SELECT 3::int4 as num'),
883+
])
884+
.then((results) => {
885+
// In binary mode, integers come back as numbers not strings
886+
assert.equal(results[0].rows[0].num, 1)
887+
assert.equal(results[1].rows[0].num, 2)
888+
assert.equal(results[2].rows[0].num, 3)
889+
client.end(done)
890+
})
891+
.catch((err) => {
892+
client.end(() => done(err))
893+
})
894+
})
895+
})
896+
897+
suite.test('pipeline mode - query timeout triggers error', (done) => {
898+
const client = new Client({ pipelineMode: true })
899+
client.connect((err) => {
900+
if (err) return done(err)
901+
902+
// Suppress error event from forced disconnect
903+
client.on('error', () => {})
904+
905+
// Query with very short timeout
906+
const slowQuery = client.query({
907+
text: 'SELECT pg_sleep(10)',
908+
query_timeout: 100, // 100ms timeout
909+
})
910+
911+
slowQuery
912+
.then(() => {
913+
client.connection.stream.destroy()
914+
done(new Error('Query should have timed out'))
915+
})
916+
.catch((err) => {
917+
assert.ok(err.message.includes('timeout'), 'Should be a timeout error')
918+
// Note: In pipeline mode, after a timeout the connection state may be inconsistent
919+
// because PostgreSQL continues processing the query. Force close the connection.
920+
client.connection.stream.destroy()
921+
// Wait for connection to fully close before completing test
922+
client.once('end', () => done())
923+
})
924+
})
925+
})
926+
927+
suite.test('pipeline mode - queries before connect are queued', (done) => {
928+
const client = new Client({ pipelineMode: true })
929+
930+
// Queue queries BEFORE connecting
931+
const p1 = client.query('SELECT 1 as num')
932+
const p2 = client.query('SELECT 2 as num')
933+
const p3 = client.query('SELECT 3 as num')
934+
935+
// Now connect
936+
client.connect((err) => {
937+
if (err) return done(err)
938+
939+
Promise.all([p1, p2, p3])
940+
.then((results) => {
941+
assert.equal(results[0].rows[0].num, '1')
942+
assert.equal(results[1].rows[0].num, '2')
943+
assert.equal(results[2].rows[0].num, '3')
944+
client.end(done)
945+
})
946+
.catch((err) => {
947+
client.end(() => done(err))
948+
})
949+
})
950+
})
951+
952+
suite.test('pipeline mode - notice messages are emitted', (done) => {
953+
const client = new Client({ pipelineMode: true })
954+
client.connect((err) => {
955+
if (err) return done(err)
956+
957+
let noticeReceived = false
958+
959+
client.on('notice', (msg) => {
960+
noticeReceived = true
961+
})
962+
963+
// Create a function that raises a notice
964+
client
965+
.query(
966+
`
967+
DO $$
968+
BEGIN
969+
RAISE NOTICE 'Test notice from pipeline mode';
970+
END $$;
971+
`
972+
)
973+
.then(() => {
974+
assert.ok(noticeReceived, 'Should have received notice')
975+
client.end(done)
976+
})
977+
.catch((err) => {
978+
client.end(() => done(err))
979+
})
980+
})
981+
})
982+
983+
suite.test('pipeline mode - custom type parsers work', (done) => {
984+
const client = new Client({ pipelineMode: true })
985+
986+
// Set custom type parser for int4 (OID 23)
987+
client.setTypeParser(23, (val) => parseInt(val, 10) * 2)
988+
989+
client.connect((err) => {
990+
if (err) return done(err)
991+
992+
client
993+
.query('SELECT 5::int4 as num')
994+
.then((r) => {
995+
assert.equal(r.rows[0].num, 10, 'Custom type parser should double the value')
996+
client.end(done)
997+
})
998+
.catch((err) => {
999+
client.end(() => done(err))
1000+
})
1001+
})
1002+
})
1003+
1004+
suite.test('pipeline mode - rowMode array works', (done) => {
1005+
const client = new Client({ pipelineMode: true })
1006+
client.connect((err) => {
1007+
if (err) return done(err)
1008+
1009+
client
1010+
.query({ text: 'SELECT 1 as a, 2 as b, 3 as c', rowMode: 'array' })
1011+
.then((r) => {
1012+
assert.ok(Array.isArray(r.rows[0]), 'Row should be an array')
1013+
assert.deepEqual(r.rows[0], ['1', '2', '3'], 'Should have correct values')
1014+
client.end(done)
1015+
})
1016+
.catch((err) => {
1017+
client.end(() => done(err))
1018+
})
1019+
})
1020+
})

0 commit comments

Comments
 (0)