Skip to content

Commit 1c6ab8c

Browse files
author
Simone Nigro
committed
perf(client): batch pipeline queries with single cork/uncork
1 parent 7904fea commit 1c6ab8c

1 file changed

Lines changed: 52 additions & 52 deletions

File tree

packages/pg/lib/client.js

Lines changed: 52 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -656,10 +656,19 @@ class Client extends EventEmitter {
656656
return
657657
}
658658
// In pipeline mode, send all queued queries immediately
659-
while (this._queryQueue.length > 0) {
660-
const query = this._queryQueue.shift()
661-
this._pendingQueries.push(query)
662-
this._submitPipelineQuery(query)
659+
// Cork the stream once for all queries to avoid unnecessary cork/uncork per query
660+
if (this._queryQueue.length > 0) {
661+
const connection = this.connection
662+
connection.stream.cork && connection.stream.cork()
663+
try {
664+
while (this._queryQueue.length > 0) {
665+
const query = this._queryQueue.shift()
666+
this._pendingQueries.push(query)
667+
this._submitPipelineQuery(query)
668+
}
669+
} finally {
670+
connection.stream.uncork && connection.stream.uncork()
671+
}
663672
}
664673
} else {
665674
// Existing non-pipeline behavior
@@ -688,62 +697,53 @@ class Client extends EventEmitter {
688697

689698
// Submit a query using the Extended Query Protocol for pipeline mode
690699
// Sends Parse/Bind/Describe/Execute/Sync for each query
700+
// Note: cork/uncork is handled by the caller (_pulseQueryQueue) for batching
691701
_submitPipelineQuery(query) {
692702
const connection = this.connection
693703

694-
// Use cork/uncork to buffer messages before sending them all at once
695-
// This optimizes network performance by avoiding multiple small writes
696-
connection.stream.cork && connection.stream.cork()
697-
try {
698-
// Parse - only if the statement hasn't been parsed before (for named statements)
699-
// In pipeline mode, we also track "in-flight" prepared statements to avoid
700-
// sending duplicate Parse commands for the same named statement
701-
const needsParse = query.name && !query.hasBeenParsed(connection) && !this._pendingParsedStatements[query.name]
702-
703-
if (!query.name || needsParse) {
704-
// For unnamed queries, always parse
705-
// For named queries, only parse if not already parsed or in-flight
706-
if (query.name) {
707-
// Track this statement as "in-flight" to prevent duplicate Parse commands
708-
this._pendingParsedStatements[query.name] = query.text
709-
}
710-
connection.parse({
711-
text: query.text,
712-
name: query.name,
713-
types: query.types,
714-
})
704+
// Parse - only if the statement hasn't been parsed before (for named statements)
705+
// In pipeline mode, we also track "in-flight" prepared statements to avoid
706+
// sending duplicate Parse commands for the same named statement
707+
const needsParse = query.name && !query.hasBeenParsed(connection) && !this._pendingParsedStatements[query.name]
708+
709+
if (!query.name || needsParse) {
710+
// For unnamed queries, always parse
711+
// For named queries, only parse if not already parsed or in-flight
712+
if (query.name) {
713+
// Track this statement as "in-flight" to prevent duplicate Parse commands
714+
this._pendingParsedStatements[query.name] = query.text
715715
}
716-
717-
// Bind - map user values to postgres wire protocol compatible values
718-
// This could throw an exception, so we handle it in the try block
719-
connection.bind({
720-
portal: query.portal,
721-
statement: query.name,
722-
values: query.values,
723-
binary: query.binary,
724-
valueMapper: utils.prepareValue,
716+
connection.parse({
717+
text: query.text,
718+
name: query.name,
719+
types: query.types,
725720
})
721+
}
726722

727-
// Describe - request description of the portal
728-
connection.describe({
729-
type: 'P',
730-
name: query.portal || '',
731-
})
723+
// Bind - map user values to postgres wire protocol compatible values
724+
connection.bind({
725+
portal: query.portal,
726+
statement: query.name,
727+
values: query.values,
728+
binary: query.binary,
729+
valueMapper: utils.prepareValue,
730+
})
732731

733-
// Execute - execute the query
734-
connection.execute({
735-
portal: query.portal,
736-
rows: query.rows,
737-
})
732+
// Describe - request description of the portal
733+
connection.describe({
734+
type: 'P',
735+
name: query.portal || '',
736+
})
738737

739-
// Sync - establishes synchronization point
740-
// This tells the server to process all messages up to this point
741-
connection.sync()
742-
} finally {
743-
// Always uncork the stream, even if an error occurs
744-
// This prevents the client from becoming unresponsive
745-
connection.stream.uncork && connection.stream.uncork()
746-
}
738+
// Execute - execute the query
739+
connection.execute({
740+
portal: query.portal,
741+
rows: query.rows,
742+
})
743+
744+
// Sync - establishes synchronization point
745+
// This tells the server to process all messages up to this point
746+
connection.sync()
747747
}
748748

749749
// Returns the current query being processed in pipeline mode

0 commit comments

Comments
 (0)