Skip to content

Commit 01e0556

Browse files
aleclarsonbrianc
andauthored
fix(pg-query-stream): invoke this.callback on cursor end/error (#2810)
* fix(pg-query-stream): invoke `this.callback` on cursor end/error Closes #2013 * fix(Client): respect `callback` argument for `Submittable` case * Add tests * Remove log * Fix lint --------- Co-authored-by: Brian Carlson <brian.m.carlson@gmail.com>
1 parent e6e3692 commit 01e0556

3 files changed

Lines changed: 31 additions & 2 deletions

File tree

packages/pg-query-stream/src/index.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ class QueryStream extends Readable implements Submittable {
1313
cursor: any
1414
_result: any
1515

16+
callback: Function
1617
handleRowDescription: Function
1718
handleDataRow: Function
1819
handlePortalSuspended: Function
@@ -26,6 +27,13 @@ class QueryStream extends Readable implements Submittable {
2627

2728
super({ objectMode: true, autoDestroy: true, highWaterMark: batchSize || highWaterMark })
2829
this.cursor = new Cursor(text, values, config)
30+
this.cursor
31+
.on('end', (result) => {
32+
this.callback && this.callback(null, result)
33+
})
34+
.on('error', (err) => {
35+
this.callback && this.callback(err)
36+
})
2937

3038
// delegate Submittable callbacks to cursor
3139
this.handleRowDescription = this.cursor.handleRowDescription.bind(this.cursor)
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
import { Pool } from 'pg'
2+
import QueryStream from '../src'
3+
4+
describe('pool', function () {
5+
it('works', async function () {
6+
const pool = new Pool()
7+
const query = new QueryStream('SELECT * FROM generate_series(0, 10) num', [])
8+
const q = pool.query(query)
9+
query.on('data', (row) => {
10+
// just consume the whole stream
11+
})
12+
await q
13+
query.on('end', () => {
14+
pool.end()
15+
})
16+
})
17+
})

packages/pg/lib/client.js

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -609,8 +609,12 @@ class Client extends EventEmitter {
609609
} else if (typeof config.submit === 'function') {
610610
readTimeout = config.query_timeout || this.connectionParameters.query_timeout
611611
result = query = config
612-
if (typeof values === 'function') {
613-
query.callback = query.callback || values
612+
if (!query.callback) {
613+
if (typeof values === 'function') {
614+
query.callback = values
615+
} else if (callback) {
616+
query.callback = callback
617+
}
614618
}
615619
} else {
616620
readTimeout = config.query_timeout || this.connectionParameters.query_timeout

0 commit comments

Comments
 (0)