Skip to content

Commit c110c70

Browse files
committed
fix: handle native websocket on node >=22
1 parent 688dd9c commit c110c70

6 files changed

Lines changed: 114 additions & 76 deletions

File tree

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@
4444
},
4545
"browser": {
4646
"net": false,
47-
"ws": false
47+
"./dist/src/connection/socket-factory.js": "./dist/src/connection/socket-factory-browser.js"
4848
},
4949
"devDependencies": {
5050
"@istanbuljs/nyc-config-typescript": "^1.0.1",

src/connection/connection.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -284,7 +284,7 @@ export class Connection {
284284
* was initialised with.
285285
*/
286286
private createEndpoint (): void {
287-
this.endpoint = this.services.socketFactory(this.url, this.options.socketOptions, this.options.heartbeatInterval)
287+
this.endpoint = this.services.socketFactory(this.url, this.options.socketOptions, this.options.heartbeatInterval, this.services.logger)
288288

289289
this.endpoint.onopened = this.onOpen.bind(this)
290290
this.endpoint.onerror = this.onError.bind(this)
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
import { JSONObject, TOPIC } from '../constants'
2+
import { wireSocket, hasMeaningfulSocketOptions, SocketFactory } from './socket-factory-shared'
3+
import { Logger } from '../util/logger'
4+
5+
export { SocketFactory } from './socket-factory-shared'
6+
7+
const NativeWS = (window as any).WebSocket || (window as any).MozWebSocket
8+
9+
let warned = false
10+
11+
export const socketFactory: SocketFactory = (url: string, options: JSONObject = { jsonTransportMode: false }, heartBeatInterval: number, logger?: Logger) => {
12+
if (!warned && hasMeaningfulSocketOptions(options)) {
13+
warned = true
14+
const msg = 'Browser WebSocket ignores socketOptions; use the Node build to honor them or provide a custom socketFactory in the client constructor options.'
15+
if (logger) {
16+
logger.warn({ topic: TOPIC.CONNECTION }, undefined, msg)
17+
}
18+
}
19+
return wireSocket(new NativeWS(url, []), options, heartBeatInterval, true)
20+
}
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
import { parse } from '@deepstream/protobuf/dist/src/message-parser'
2+
import { getMessage } from '@deepstream/protobuf/dist/src/message-builder'
3+
import { Socket } from '../deepstream-client'
4+
import { JSONObject, TOPIC, Message, CONNECTION_ACTION } from '../constants'
5+
import { Logger } from '../util/logger'
6+
7+
export type SocketFactory = (url: string, options: JSONObject, heartBeatInterval: number, logger?: Logger) => Socket
8+
9+
export const wireSocket = (socket: any, options: JSONObject, heartBeatInterval: number, isNative: boolean): Socket => {
10+
if (isNative && options.jsonTransportMode !== true) {
11+
socket.binaryType = 'arraybuffer'
12+
}
13+
14+
const buildMessage = options.jsonTransportMode !== true ? getMessage : (message: Message, isAck: boolean) => JSON.stringify({ ...message, isAck })
15+
16+
const pingMessage = buildMessage({ topic: TOPIC.CONNECTION, action: CONNECTION_ACTION.PING }, false)
17+
let pingInterval: number | null = null
18+
let lastRecievedMessageTimestamp = -1
19+
20+
// tslint:disable-next-line:no-empty
21+
socket.onparsedmessage = () => {}
22+
socket.onmessage = (raw: { data: Buffer | ArrayBuffer | string }) => {
23+
lastRecievedMessageTimestamp = Date.now()
24+
let parseResults
25+
if (options.jsonTransportMode !== true) {
26+
parseResults = parse(isNative ? new Buffer(new Uint8Array(raw.data as ArrayBuffer)) : raw.data as Buffer)
27+
} else {
28+
parseResults = [JSON.parse(raw.data as string)]
29+
}
30+
socket.onparsedmessages(parseResults)
31+
}
32+
socket.getTimeSinceLastMessage = () => {
33+
if (lastRecievedMessageTimestamp < 0) return 0
34+
return Date.now() - lastRecievedMessageTimestamp
35+
}
36+
socket.sendParsedMessage = (message: Message): void => {
37+
if (message.topic === TOPIC.CONNECTION && message.action === CONNECTION_ACTION.CLOSING) {
38+
socket.onparsedmessages([{ topic: TOPIC.CONNECTION, action: CONNECTION_ACTION.CLOSED }])
39+
socket.close()
40+
return
41+
}
42+
if (message.parsedData) {
43+
message.data = JSON.stringify(message.parsedData)
44+
}
45+
if (message.data === undefined) {
46+
delete message.data
47+
}
48+
socket.send(buildMessage(message, false))
49+
}
50+
51+
socket.onclosed = null
52+
socket.onclose = () => {
53+
clearInterval(pingInterval!)
54+
socket.onclosed()
55+
}
56+
57+
socket.onopened = null
58+
socket.onopen = () => {
59+
pingInterval = setInterval(() => {
60+
try {
61+
socket.send(pingMessage)
62+
} catch (e) {
63+
clearTimeout(pingInterval!)
64+
}
65+
}, heartBeatInterval) as never as number
66+
socket.onopened()
67+
}
68+
69+
return socket
70+
}
71+
72+
export const hasMeaningfulSocketOptions = (options: JSONObject): boolean => {
73+
if (!options) return false
74+
for (const key of Object.keys(options)) {
75+
if (key !== 'jsonTransportMode') return true
76+
}
77+
return false
78+
}

src/connection/socket-factory.ts

Lines changed: 9 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -1,78 +1,14 @@
1-
import { parse } from '@deepstream/protobuf/dist/src/message-parser'
2-
import { getMessage } from '@deepstream/protobuf/dist/src/message-builder'
3-
import {Socket} from '../deepstream-client'
4-
import { JSONObject, TOPIC, Message, CONNECTION_ACTION } from '../constants'
1+
import { JSONObject } from '../constants'
2+
import { wireSocket, hasMeaningfulSocketOptions, SocketFactory } from './socket-factory-shared'
53

6-
const BrowserWebsocket = (global.WebSocket || global.MozWebSocket) as any
4+
export { SocketFactory } from './socket-factory-shared'
75

8-
export type SocketFactory = (url: string, options: JSONObject, heartBeatInterval: number) => Socket
6+
const NativeWS = (global as any).WebSocket as any
97

10-
export const socketFactory: SocketFactory = (url, options = { jsonTransportMode: false }, heartBeatInterval) => {
11-
const socket = BrowserWebsocket
12-
? new BrowserWebsocket(url, [], options)
13-
: new (require('ws'))(url, options) as any
14-
15-
if (BrowserWebsocket && options.jsonTransportMode !== true) {
16-
socket.binaryType = 'arraybuffer'
17-
}
18-
19-
const buildMessage = options.jsonTransportMode !== true ? getMessage : (message: Message, isAck: boolean) => JSON.stringify({ ...message, isAck })
20-
21-
const pingMessage = buildMessage({ topic: TOPIC.CONNECTION, action: CONNECTION_ACTION.PING }, false)
22-
let pingInterval: number | null = null
23-
let lastRecievedMessageTimestamp = -1
24-
25-
// tslint:disable-next-line:no-empty
26-
socket.onparsedmessage = () => {}
27-
socket.onmessage = (raw: {data: Buffer | string }) => {
28-
lastRecievedMessageTimestamp = Date.now()
29-
let parseResults
30-
if (options.jsonTransportMode !== true) {
31-
parseResults = parse(BrowserWebsocket ? new Buffer(new Uint8Array(raw.data as Buffer)) : raw.data as Buffer)
32-
} else {
33-
parseResults = [JSON.parse(raw.data as string)]
34-
}
35-
socket.onparsedmessages(parseResults)
36-
}
37-
socket.getTimeSinceLastMessage = () => {
38-
if (lastRecievedMessageTimestamp < 0) return 0
39-
return Date.now() - lastRecievedMessageTimestamp
8+
export const socketFactory: SocketFactory = (url: string, options: JSONObject = { jsonTransportMode: false }, heartBeatInterval: number) => {
9+
if (NativeWS && !hasMeaningfulSocketOptions(options)) {
10+
return wireSocket(new NativeWS(url, []), options, heartBeatInterval, true)
4011
}
41-
socket.sendParsedMessage = (message: Message): void => {
42-
if (message.topic === TOPIC.CONNECTION && message.action === CONNECTION_ACTION.CLOSING) {
43-
socket.onparsedmessages([{ topic: TOPIC.CONNECTION, action: CONNECTION_ACTION.CLOSED }])
44-
socket.close()
45-
return
46-
}
47-
if (message.parsedData) {
48-
message.data = JSON.stringify(message.parsedData)
49-
}
50-
// if (message.action !== CONNECTION_ACTIONS.PONG && message.action !== CONNECTION_ACTIONS.PING) {
51-
// console.log('>>>', TOPIC[message.topic], (ACTIONS as any)[message.topic][message.action], message.parsedData, message.data, message.name)
52-
// }
53-
if (message.data === undefined) {
54-
delete message.data
55-
}
56-
socket.send(buildMessage(message, false))
57-
}
58-
59-
socket.onclosed = null
60-
socket.onclose = () => {
61-
clearInterval(pingInterval!)
62-
socket.onclosed()
63-
}
64-
65-
socket.onopened = null
66-
socket.onopen = () => {
67-
pingInterval = setInterval(() => {
68-
try {
69-
socket.send(pingMessage)
70-
} catch (e) {
71-
clearTimeout(pingInterval!)
72-
}
73-
}, heartBeatInterval) as never as number
74-
socket.onopened()
75-
}
76-
77-
return socket
12+
const WS = require('ws')
13+
return wireSocket(new WS(url, options), options, heartBeatInterval, false)
7814
}

webpack.dev.js

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
const path = require('path');
12
const webpack = require('webpack');
23

34
module.exports = {
@@ -12,6 +13,10 @@ module.exports = {
1213
},
1314
resolve: {
1415
extensions: ['.ts', '.js'],
16+
alias: {
17+
[path.resolve(__dirname, 'src/connection/socket-factory.ts')]:
18+
path.resolve(__dirname, 'src/connection/socket-factory-browser.ts')
19+
},
1520
fallback: {
1621
fs: false,
1722
module: false,
@@ -28,7 +33,6 @@ module.exports = {
2833
},
2934
plugins: [
3035
new webpack.IgnorePlugin({resourceRegExp: /url/}),
31-
new webpack.IgnorePlugin({resourceRegExp:/ws/}),
3236
new webpack.IgnorePlugin({resourceRegExp:/node-localstorage/}),
3337
new webpack.ProvidePlugin({Buffer: ['buffer', 'Buffer']})
3438
],

0 commit comments

Comments
 (0)