Skip to content

Commit 02f5146

Browse files
author
m.bronnikov
committed
feat: Add multihost support for native js driver
1 parent c78b302 commit 02f5146

File tree

4 files changed

+627
-53
lines changed

4 files changed

+627
-53
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,3 +10,4 @@ dist
1010
/.eslintcache
1111
.vscode/
1212
manually-test-on-heroku.js
13+
.history

packages/pg/lib/client.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ class Client extends EventEmitter {
8181
keepAlive: c.keepAlive || false,
8282
keepAliveInitialDelayMillis: c.keepAliveInitialDelayMillis || 0,
8383
encoding: this.connectionParameters.client_encoding || 'utf8',
84+
targetSessionAttrs: c.targetSessionAttrs || null,
8485
})
8586
this._queryQueue = []
8687
this.binary = c.binary || defaults.binary

packages/pg/lib/connection.js

Lines changed: 200 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,14 @@ class Connection extends EventEmitter {
1515
super()
1616
config = config || {}
1717

18-
this.stream = config.stream || getStream(config.ssl)
19-
if (typeof this.stream === 'function') {
20-
this.stream = this.stream(config)
18+
if (typeof config.stream === 'function') {
19+
this._streamFactory = config.stream
20+
this._config = config
21+
this.stream = config.stream(config)
22+
} else {
23+
this._streamFactory = null
24+
this._config = null
25+
this.stream = config.stream || getStream(config.ssl)
2126
}
2227

2328
this._keepAlive = config.keepAlive
@@ -26,6 +31,7 @@ class Connection extends EventEmitter {
2631
this.ssl = config.ssl || false
2732
this._ending = false
2833
this._emitMessage = false
34+
this._targetSessionAttrs = config.targetSessionAttrs || null
2935
const self = this
3036
this.on('newListener', function (eventName) {
3137
if (eventName === 'message') {
@@ -34,76 +40,200 @@ class Connection extends EventEmitter {
3440
})
3541
}
3642

43+
_newStream() {
44+
return this._streamFactory ? this._streamFactory(this._config) : getStream(this.ssl)
45+
}
46+
3747
connect(port, host) {
3848
const self = this
3949

50+
const hosts = Array.isArray(host) ? host : [host]
51+
const ports = Array.isArray(port) ? port : [port]
52+
let hostIndex = 0
53+
4054
this._connecting = true
41-
this.stream.setNoDelay(true)
42-
this.stream.connect(port, host)
4355

44-
this.stream.once('connect', function () {
45-
if (self._keepAlive) {
46-
self.stream.setKeepAlive(true, self._keepAliveInitialDelayMillis)
56+
const targetAttrs = this._targetSessionAttrs
57+
58+
if (targetAttrs && targetAttrs !== 'any') {
59+
let backendParams = {}
60+
let fetchingState = false
61+
let fetchStateRows = []
62+
let fetchStateError = false
63+
64+
const origEmit = EventEmitter.prototype.emit.bind(self)
65+
66+
const tryNextOrFail = () => {
67+
backendParams = {}
68+
fetchingState = false
69+
fetchStateRows = []
70+
fetchStateError = false
71+
if (hostIndex + 1 < hosts.length) {
72+
hostIndex++
73+
74+
self.stream.removeAllListeners()
75+
self.stream.destroy()
76+
self.stream = self._newStream()
77+
attemptConnect()
78+
} else {
79+
self.emit = origEmit
80+
origEmit('error', new Error(`None of the hosts satisfy the target_session_attrs requirement: ${targetAttrs}`))
81+
}
4782
}
48-
self.emit('connect')
49-
})
5083

51-
const reportStreamError = function (error) {
52-
// errors about disconnections should be ignored during disconnect
53-
if (self._ending && (error.code === 'ECONNRESET' || error.code === 'EPIPE')) {
54-
return
84+
self.emit = function (eventName, ...args) {
85+
if (eventName === 'parameterStatus') {
86+
const msg = args[0]
87+
if (msg) backendParams[msg.parameterName] = msg.parameterValue
88+
return origEmit(eventName, ...args)
89+
}
90+
91+
if (fetchingState) {
92+
if (eventName === 'dataRow') {
93+
fetchStateRows.push(args[0])
94+
return
95+
}
96+
if (eventName === 'rowDescription' || eventName === 'commandComplete') {
97+
return
98+
}
99+
if (eventName === 'errorMessage') {
100+
fetchStateError = true
101+
return
102+
}
103+
if (eventName === 'readyForQuery') {
104+
fetchingState = false
105+
if (!fetchStateError && fetchStateRows.length >= 2) {
106+
const txReadOnly = fetchStateRows[0].fields[0]?.toString('utf8') ?? null
107+
const isRecovery = fetchStateRows[1].fields[0]?.toString('utf8') ?? null
108+
if (txReadOnly !== null) backendParams.default_transaction_read_only = txReadOnly
109+
if (isRecovery !== null) backendParams.in_hot_standby = isRecovery === 't' ? 'on' : 'off'
110+
}
111+
fetchStateRows = []
112+
fetchStateError = false
113+
if (notHostMatchTargetSessionAttrs(targetAttrs, backendParams, hostIndex, hosts)) {
114+
tryNextOrFail()
115+
} else {
116+
self.emit = origEmit
117+
origEmit('readyForQuery', args[0])
118+
}
119+
return
120+
}
121+
}
122+
123+
if (eventName === 'readyForQuery') {
124+
if (!backendParams.in_hot_standby || !backendParams.default_transaction_read_only) {
125+
fetchingState = true
126+
fetchStateRows = []
127+
self.query('SHOW transaction_read_only; SELECT pg_catalog.pg_is_in_recovery()')
128+
return
129+
}
130+
if (notHostMatchTargetSessionAttrs(targetAttrs, backendParams, hostIndex, hosts)) {
131+
tryNextOrFail()
132+
return
133+
}
134+
self.emit = origEmit
135+
return origEmit('readyForQuery', args[0])
136+
}
137+
138+
return origEmit(eventName, ...args)
55139
}
56-
self.emit('error', error)
57140
}
58-
this.stream.on('error', reportStreamError)
59141

60-
this.stream.on('close', function () {
61-
self.emit('end')
62-
})
142+
const attemptConnect = () => {
143+
const currentHost = hosts[hostIndex]
144+
const currentPort = ports[Math.min(hostIndex, ports.length - 1)]
145+
let connected = false
63146

64-
if (!this.ssl) {
65-
return this.attachListeners(this.stream)
66-
}
147+
self.stream.setNoDelay(true)
148+
self.stream.connect(currentPort, currentHost)
67149

68-
this.stream.once('data', function (buffer) {
69-
const responseCode = buffer.toString('utf8')
70-
switch (responseCode) {
71-
case 'S': // Server supports SSL connections, continue with a secure connection
72-
break
73-
case 'N': // Server does not support SSL connections
74-
self.stream.end()
75-
return self.emit('error', new Error('The server does not support SSL connections'))
76-
default:
77-
// Any other response byte, including 'E' (ErrorResponse) indicating a server error
78-
self.stream.end()
79-
return self.emit('error', new Error('There was an error establishing an SSL connection'))
80-
}
81-
const options = {
82-
socket: self.stream,
83-
}
150+
self.stream.once('connect', function () {
151+
connected = true
152+
if (self._keepAlive) {
153+
self.stream.setKeepAlive(true, self._keepAliveInitialDelayMillis)
154+
}
155+
self.emit('connect')
156+
})
84157

85-
if (self.ssl !== true) {
86-
Object.assign(options, self.ssl)
158+
const reportStreamError = function (error) {
159+
// errors about disconnections should be ignored during disconnect
160+
if (self._ending && (error.code === 'ECONNRESET' || error.code === 'EPIPE')) {
161+
return
162+
}
87163

88-
if ('key' in self.ssl) {
89-
options.key = self.ssl.key
164+
if (!connected && hostIndex + 1 < hosts.length) {
165+
hostIndex++
166+
self.stream.removeAllListeners()
167+
self.stream.destroy()
168+
self.stream = self._newStream()
169+
attemptConnect()
170+
return
90171
}
172+
self.emit('error', error)
91173
}
92174

93-
const net = require('net')
94-
if (net.isIP && net.isIP(host) === 0) {
95-
options.servername = host
175+
self.stream.on('error', reportStreamError)
176+
177+
const onClose = function () {
178+
self.emit('end')
96179
}
97-
try {
98-
self.stream = getSecureStream(options)
99-
} catch (err) {
100-
return self.emit('error', err)
180+
self.stream.on('close', onClose)
181+
182+
if (!self.ssl) {
183+
return self.attachListeners(self.stream)
101184
}
102-
self.attachListeners(self.stream)
103-
self.stream.on('error', reportStreamError)
104185

105-
self.emit('sslconnect')
106-
})
186+
self.stream.once('data', function (buffer) {
187+
const responseCode = buffer.toString('utf8')
188+
switch (responseCode) {
189+
case 'S': // Server supports SSL connections, continue with a secure connection
190+
break
191+
case 'N': // Server does not support SSL connections
192+
self.stream.end()
193+
return self.emit('error', new Error('The server does not support SSL connections'))
194+
default:
195+
// Any other response byte, including 'E' (ErrorResponse) indicating a server error
196+
self.stream.end()
197+
return self.emit('error', new Error('There was an error establishing an SSL connection'))
198+
}
199+
const options = {
200+
socket: self.stream,
201+
}
202+
203+
if (self.ssl !== true) {
204+
Object.assign(options, self.ssl)
205+
206+
if ('key' in self.ssl) {
207+
options.key = self.ssl.key
208+
}
209+
}
210+
211+
const net = require('net')
212+
if (net.isIP && net.isIP(currentHost) === 0) {
213+
options.servername = currentHost
214+
}
215+
216+
// Remove the close listener from the TCP socket before upgrading to TLS.
217+
// Without this, destroying the TLS stream (during host failover) closes the
218+
// underlying TCP socket, which fires 'close' → 'end' even though we are
219+
// still mid-connection to the next host.
220+
const tcpStream = self.stream
221+
tcpStream.removeListener('close', onClose)
222+
tcpStream.removeListener('error', reportStreamError)
223+
try {
224+
self.stream = getSecureStream(options)
225+
} catch (err) {
226+
return self.emit('error', err)
227+
}
228+
self.attachListeners(self.stream)
229+
self.stream.on('error', reportStreamError)
230+
self.stream.on('close', onClose)
231+
232+
self.emit('sslconnect')
233+
})
234+
}
235+
236+
attemptConnect()
107237
}
108238

109239
attachListeners(stream) {
@@ -218,4 +348,21 @@ class Connection extends EventEmitter {
218348
}
219349
}
220350

351+
function notHostMatchTargetSessionAttrs(targetAttrs, params, hostIndex, hosts) {
352+
switch (targetAttrs) {
353+
case 'read-write':
354+
return params.in_hot_standby === 'on' || params.default_transaction_read_only === 'on'
355+
case 'read-only':
356+
return params.in_hot_standby !== 'on' && params.default_transaction_read_only !== 'on'
357+
case 'primary':
358+
return params.in_hot_standby === 'on'
359+
case 'standby':
360+
return params.in_hot_standby === 'off'
361+
case 'prefer-standby':
362+
return params.in_hot_standby === 'off' && hostIndex + 1 < hosts.length
363+
default:
364+
return false
365+
}
366+
}
367+
221368
module.exports = Connection

0 commit comments

Comments
 (0)