Skip to content

Commit ffce1e3

Browse files
author
m.bronnikov
committed
feat: Add multihost support for native js driver
1 parent c78b302 commit ffce1e3

File tree

4 files changed

+600
-53
lines changed

4 files changed

+600
-53
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,3 +10,4 @@ dist
1010
/.eslintcache
1111
.vscode/
1212
manually-test-on-heroku.js
13+
.history

packages/pg/lib/client.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ class Client extends EventEmitter {
8181
keepAlive: c.keepAlive || false,
8282
keepAliveInitialDelayMillis: c.keepAliveInitialDelayMillis || 0,
8383
encoding: this.connectionParameters.client_encoding || 'utf8',
84+
targetSessionAttrs: c.targetSessionAttrs || null,
8485
})
8586
this._queryQueue = []
8687
this.binary = c.binary || defaults.binary

packages/pg/lib/connection.js

Lines changed: 198 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,12 @@ class Connection extends EventEmitter {
1515
super()
1616
config = config || {}
1717

18-
this.stream = config.stream || getStream(config.ssl)
19-
if (typeof this.stream === 'function') {
20-
this.stream = this.stream(config)
18+
if (typeof config.stream === 'function') {
19+
this._streamFactory = config.stream
20+
this.stream = config.stream(config)
21+
} else {
22+
this._streamFactory = null
23+
this.stream = config.stream || getStream(config.ssl)
2124
}
2225

2326
this._keepAlive = config.keepAlive
@@ -26,6 +29,7 @@ class Connection extends EventEmitter {
2629
this.ssl = config.ssl || false
2730
this._ending = false
2831
this._emitMessage = false
32+
this._targetSessionAttrs = config.targetSessionAttrs || null
2933
const self = this
3034
this.on('newListener', function (eventName) {
3135
if (eventName === 'message') {
@@ -34,76 +38,200 @@ class Connection extends EventEmitter {
3438
})
3539
}
3640

41+
_newStream() {
42+
return this._streamFactory ? this._streamFactory({}) : getStream(this.ssl)
43+
}
44+
3745
connect(port, host) {
3846
const self = this
3947

48+
const hosts = Array.isArray(host) ? host : [host]
49+
const ports = Array.isArray(port) ? port : [port]
50+
let hostIndex = 0
51+
4052
this._connecting = true
41-
this.stream.setNoDelay(true)
42-
this.stream.connect(port, host)
4353

44-
this.stream.once('connect', function () {
45-
if (self._keepAlive) {
46-
self.stream.setKeepAlive(true, self._keepAliveInitialDelayMillis)
54+
const targetAttrs = this._targetSessionAttrs
55+
56+
if (targetAttrs && targetAttrs !== 'any') {
57+
let backendParams = {}
58+
let fetchingState = false
59+
let fetchStateRows = []
60+
let fetchStateError = false
61+
62+
const origEmit = EventEmitter.prototype.emit.bind(self)
63+
64+
const tryNextOrFail = () => {
65+
backendParams = {}
66+
fetchingState = false
67+
fetchStateRows = []
68+
fetchStateError = false
69+
if (hostIndex + 1 < hosts.length) {
70+
hostIndex++
71+
72+
self.stream.removeAllListeners()
73+
self.stream.destroy()
74+
self.stream = self._newStream()
75+
attemptConnect()
76+
} else {
77+
self.emit = origEmit
78+
origEmit('error', new Error(`None of the hosts satisfy the target_session_attrs requirement: ${targetAttrs}`))
79+
}
4780
}
48-
self.emit('connect')
49-
})
5081

51-
const reportStreamError = function (error) {
52-
// errors about disconnections should be ignored during disconnect
53-
if (self._ending && (error.code === 'ECONNRESET' || error.code === 'EPIPE')) {
54-
return
82+
self.emit = function (eventName, ...args) {
83+
if (eventName === 'parameterStatus') {
84+
const msg = args[0]
85+
if (msg) backendParams[msg.parameterName] = msg.parameterValue
86+
return origEmit(eventName, ...args)
87+
}
88+
89+
if (fetchingState) {
90+
if (eventName === 'dataRow') {
91+
fetchStateRows.push(args[0])
92+
return
93+
}
94+
if (eventName === 'rowDescription' || eventName === 'commandComplete') {
95+
return
96+
}
97+
if (eventName === 'errorMessage') {
98+
fetchStateError = true
99+
return
100+
}
101+
if (eventName === 'readyForQuery') {
102+
fetchingState = false
103+
if (!fetchStateError && fetchStateRows.length >= 2) {
104+
const txReadOnly = fetchStateRows[0].fields[0]?.toString('utf8') ?? null
105+
const isRecovery = fetchStateRows[1].fields[0]?.toString('utf8') ?? null
106+
if (txReadOnly !== null) backendParams.default_transaction_read_only = txReadOnly
107+
if (isRecovery !== null) backendParams.in_hot_standby = isRecovery === 't' ? 'on' : 'off'
108+
}
109+
fetchStateRows = []
110+
fetchStateError = false
111+
if (isHostMatchTargetSessionAttrs(targetAttrs, backendParams, hostIndex, hosts)) {
112+
tryNextOrFail()
113+
} else {
114+
self.emit = origEmit
115+
origEmit('readyForQuery', args[0])
116+
}
117+
return
118+
}
119+
}
120+
121+
if (eventName === 'readyForQuery') {
122+
if (!backendParams.in_hot_standby || !backendParams.default_transaction_read_only) {
123+
fetchingState = true
124+
fetchStateRows = []
125+
self.query('SHOW transaction_read_only; SELECT pg_catalog.pg_is_in_recovery()')
126+
return
127+
}
128+
if (isHostMatchTargetSessionAttrs(targetAttrs, backendParams, hostIndex, hosts)) {
129+
tryNextOrFail()
130+
return
131+
}
132+
self.emit = origEmit
133+
return origEmit('readyForQuery', args[0])
134+
}
135+
136+
return origEmit(eventName, ...args)
55137
}
56-
self.emit('error', error)
57138
}
58-
this.stream.on('error', reportStreamError)
59139

60-
this.stream.on('close', function () {
61-
self.emit('end')
62-
})
140+
const attemptConnect = () => {
141+
const currentHost = hosts[hostIndex]
142+
const currentPort = ports[Math.min(hostIndex, ports.length - 1)]
143+
let connected = false
63144

64-
if (!this.ssl) {
65-
return this.attachListeners(this.stream)
66-
}
145+
self.stream.setNoDelay(true)
146+
self.stream.connect(currentPort, currentHost)
67147

68-
this.stream.once('data', function (buffer) {
69-
const responseCode = buffer.toString('utf8')
70-
switch (responseCode) {
71-
case 'S': // Server supports SSL connections, continue with a secure connection
72-
break
73-
case 'N': // Server does not support SSL connections
74-
self.stream.end()
75-
return self.emit('error', new Error('The server does not support SSL connections'))
76-
default:
77-
// Any other response byte, including 'E' (ErrorResponse) indicating a server error
78-
self.stream.end()
79-
return self.emit('error', new Error('There was an error establishing an SSL connection'))
80-
}
81-
const options = {
82-
socket: self.stream,
83-
}
148+
self.stream.once('connect', function () {
149+
connected = true
150+
if (self._keepAlive) {
151+
self.stream.setKeepAlive(true, self._keepAliveInitialDelayMillis)
152+
}
153+
self.emit('connect')
154+
})
84155

85-
if (self.ssl !== true) {
86-
Object.assign(options, self.ssl)
156+
const reportStreamError = function (error) {
157+
// errors about disconnections should be ignored during disconnect
158+
if (self._ending && (error.code === 'ECONNRESET' || error.code === 'EPIPE')) {
159+
return
160+
}
87161

88-
if ('key' in self.ssl) {
89-
options.key = self.ssl.key
162+
if (!connected && hostIndex + 1 < hosts.length) {
163+
hostIndex++
164+
self.stream.removeAllListeners()
165+
self.stream.destroy()
166+
self.stream = self._newStream()
167+
attemptConnect()
168+
return
90169
}
170+
self.emit('error', error)
91171
}
92172

93-
const net = require('net')
94-
if (net.isIP && net.isIP(host) === 0) {
95-
options.servername = host
173+
self.stream.on('error', reportStreamError)
174+
175+
const onClose = function () {
176+
self.emit('end')
96177
}
97-
try {
98-
self.stream = getSecureStream(options)
99-
} catch (err) {
100-
return self.emit('error', err)
178+
self.stream.on('close', onClose)
179+
180+
if (!self.ssl) {
181+
return self.attachListeners(self.stream)
101182
}
102-
self.attachListeners(self.stream)
103-
self.stream.on('error', reportStreamError)
104183

105-
self.emit('sslconnect')
106-
})
184+
self.stream.once('data', function (buffer) {
185+
const responseCode = buffer.toString('utf8')
186+
switch (responseCode) {
187+
case 'S': // Server supports SSL connections, continue with a secure connection
188+
break
189+
case 'N': // Server does not support SSL connections
190+
self.stream.end()
191+
return self.emit('error', new Error('The server does not support SSL connections'))
192+
default:
193+
// Any other response byte, including 'E' (ErrorResponse) indicating a server error
194+
self.stream.end()
195+
return self.emit('error', new Error('There was an error establishing an SSL connection'))
196+
}
197+
const options = {
198+
socket: self.stream,
199+
}
200+
201+
if (self.ssl !== true) {
202+
Object.assign(options, self.ssl)
203+
204+
if ('key' in self.ssl) {
205+
options.key = self.ssl.key
206+
}
207+
}
208+
209+
const net = require('net')
210+
if (net.isIP && net.isIP(currentHost) === 0) {
211+
options.servername = currentHost
212+
}
213+
214+
// Remove the close listener from the TCP socket before upgrading to TLS.
215+
// Without this, destroying the TLS stream (during host failover) closes the
216+
// underlying TCP socket, which fires 'close' → 'end' even though we are
217+
// still mid-connection to the next host.
218+
const tcpStream = self.stream
219+
tcpStream.removeListener('close', onClose)
220+
tcpStream.removeListener('error', reportStreamError)
221+
try {
222+
self.stream = getSecureStream(options)
223+
} catch (err) {
224+
return self.emit('error', err)
225+
}
226+
self.attachListeners(self.stream)
227+
self.stream.on('error', reportStreamError)
228+
self.stream.on('close', onClose)
229+
230+
self.emit('sslconnect')
231+
})
232+
}
233+
234+
attemptConnect()
107235
}
108236

109237
attachListeners(stream) {
@@ -218,4 +346,21 @@ class Connection extends EventEmitter {
218346
}
219347
}
220348

349+
function isHostMatchTargetSessionAttrs(targetAttrs, params, hostIndex, hosts) {
350+
switch (targetAttrs) {
351+
case 'read-write':
352+
return params.in_hot_standby === 'on' || params.default_transaction_read_only === 'on'
353+
case 'read-only':
354+
return params.in_hot_standby !== 'on' && params.default_transaction_read_only !== 'on'
355+
case 'primary':
356+
return params.in_hot_standby === 'on'
357+
case 'standby':
358+
return params.in_hot_standby === 'off'
359+
case 'prefer-standby':
360+
return params.in_hot_standby === 'off' && hostIndex + 1 < hosts.length
361+
default:
362+
return false
363+
}
364+
}
365+
221366
module.exports = Connection

0 commit comments

Comments
 (0)