Skip to content

Commit dae947f

Browse files
authored
Add ws heartbeat (#642)
* Add ws heartbeat * Address comments
1 parent 9e39280 commit dae947f

5 files changed

Lines changed: 284 additions & 7 deletions

File tree

docs/components/transport-types/websocket-transport.md

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,55 @@ In the example above, after the connection is established a custom authenticatio
109109

110110
As shown in the first example, **builders** object contains two methods, **subscribeMessage** and **unsubscribeMessage**. These methods can be provided for the WS transport to use to send subscription/unsubscription messages to Data Provider. Both accept _params_ which is the current input parameters of the request and should return object or string as payload that will be sent to Data Provider. If the payload is object it will be automatically stringified to JSON.
111111

112+
### Heartbeat messages
113+
114+
Some WebSocket providers require periodic heartbeat messages to keep the connection alive. The `WebSocketTransport` supports sending heartbeat messages automatically at a configurable interval.
115+
116+
To enable heartbeat functionality, provide a `heartbeat` handler in the `handlers` object. The heartbeat will automatically start when the connection is opened and stop when the connection is closed.
117+
118+
The `heartbeat` handler receives the WebSocket connection and adapter context, allowing you to implement any heartbeat logic you need. You can use WebSocket protocol-level ping, send custom messages, or perform any other heartbeat-related operations.
119+
120+
**Using WebSocket protocol-level ping:**
121+
122+
```typescript
123+
handlers: {
124+
heartbeat: (connection) => {
125+
connection.ping()
126+
},
127+
}
128+
```
129+
130+
**Using custom heartbeat message:**
131+
132+
```typescript
133+
handlers: {
134+
heartbeat: (connection, context) => {
135+
connection.send(JSON.stringify({
136+
type: 'ping',
137+
timestamp: Date.now(),
138+
}))
139+
},
140+
}
141+
```
142+
143+
**Using ping with optional data:**
144+
145+
```typescript
146+
handlers: {
147+
heartbeat: (connection) => {
148+
connection.ping('heartbeat-data')
149+
},
150+
}
151+
```
152+
153+
The heartbeat interval is controlled by the `WS_HEARTBEAT_INTERVAL_MS` adapter setting (default: 10000ms). The heartbeat will automatically stop if:
154+
155+
- The connection is closed
156+
- The connection state is no longer `OPEN`
157+
- A new heartbeat is started (replaces the previous one)
158+
159+
**Note:** The heartbeat only starts if the `heartbeat` handler is provided. If you don't need heartbeat functionality, simply omit the `heartbeat` handler.
160+
112161
### Retrieving and storing the response or errors
113162

114163
As shown in the first example, the **handlers** object accepts a function called **message** which will be executed when Data Provider sends a message through the WS connection. It takes this message as its first argument and the adapter context as the second, and should build and return a list of response objects (_ProviderResult_) that will be stored in the response cache for the endpoint.

docs/reference-tables/ea-settings.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,5 +63,6 @@
6363
| TLS_PUBLIC_KEY | string | undefined | Base64 Public Key of TSL/SSL certificate | - Value must be a valid base64 string | |
6464
| WARMUP_SUBSCRIPTION_TTL | number | 300000 | TTL for batch warmer subscriptions | - Value must be an integer<br> - Value must be above the minimum<br> - Value must be below the maximum | 0 | 3600000 |
6565
| WS_CONNECTION_OPEN_TIMEOUT | number | 10000 | The maximum amount of time in milliseconds to wait for the websocket connection to open (including custom open handler) | - Value must be an integer<br> - Value must be above the minimum<br> - Value must be below the maximum | 500 | 30000 |
66+
| WS_HEARTBEAT_INTERVAL_MS | number | 10000 | The number of ms between each hearbeat message that EA sends to server, only works if heartbeat handler is provided | - Value must be an integer<br> - Value must be above the minimum<br> - Value must be below the maximum | 5000 | 300000 |
6667
| WS_SUBSCRIPTION_TTL | number | 120000 | The time in ms a request will live in the subscription set before becoming stale | - Value must be an integer<br> - Value must be above the minimum<br> - Value must be below the maximum | 0 | 3600000 |
6768
| WS_SUBSCRIPTION_UNRESPONSIVE_TTL | number | 120000 | The maximum acceptable time (in milliseconds) since the last message was received and stored in the cache on a WebSocket connection before it is considered unresponsive, causing the adapter to close and attempt to reopen it. | - Value must be an integer<br> - Value must be above the minimum<br> - Value must be below the maximum | 1000 | 180000 |

src/config/index.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,13 @@ export const BaseSettingsDefinition = {
249249
default: 10_000,
250250
validate: validator.integer({ min: 500, max: 30_000 }),
251251
},
252+
WS_HEARTBEAT_INTERVAL_MS: {
253+
description:
254+
'The number of ms between each hearbeat message that EA sends to server, only works if heartbeat handler is provided',
255+
type: 'number',
256+
default: 10_000,
257+
validate: validator.integer({ min: 5_000, max: 300_000 }),
258+
},
252259
CACHE_POLLING_MAX_RETRIES: {
253260
description:
254261
'Max amount of times to attempt to find EA response in the cache after the Transport has been set up',

src/transports/websocket.ts

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,10 +64,20 @@ export interface WebSocketTransportConfig<T extends WebsocketTransportGenerics>
6464
* Note: any listeners set in this method will be cleared after its execution.
6565
*
6666
* @param wsConnection - the WebSocket with an established connection
67+
* @param context - the background context for the Adapter
6768
* @returns an empty Promise, or void
6869
*/
6970
open?: (wsConnection: WebSocket, context: EndpointContext<T>) => Promise<void> | void
7071

72+
/**
73+
* Handles when client is ready to send a heartbeat to server
74+
*
75+
* @param wsConnection - the WebSocket with an established connection
76+
* @param context - the background context for the Adapter
77+
* @returns an empty Promise, or void
78+
*/
79+
heartbeat?: (wsConnection: WebSocket, context: EndpointContext<T>) => Promise<void> | void
80+
7181
/**
7282
* Handles when the websocket connection dispatches an error event
7383
* Optional to let the adapter handle the event in its own way if it decides to
@@ -159,6 +169,7 @@ export class WebSocketTransport<
159169
lastMessageReceivedAt = 0
160170
connectionOpenedAt = 0
161171
streamHandlerInvocationsWithNoConnection = 0
172+
heartbeatInterval?: NodeJS.Timeout
162173

163174
constructor(private config: WebSocketTransportConfig<T>) {
164175
super()
@@ -179,6 +190,39 @@ export class WebSocketTransport<
179190
return JSON.parse(data.toString()) as T['Provider']['WsMessage']
180191
}
181192

193+
startHeartbeat(context: EndpointContext<T>): void {
194+
if (this.config.handlers.heartbeat) {
195+
this.stopHeartbeat()
196+
197+
const intervalId = setInterval(async () => {
198+
if (this.heartbeatInterval !== intervalId) {
199+
clearInterval(intervalId)
200+
return
201+
}
202+
203+
if (this.wsConnection && this.wsConnection.readyState === WebSocket.OPEN) {
204+
try {
205+
logger.debug('Calling heartbeat handler')
206+
await this.config.handlers.heartbeat?.(this.wsConnection, context)
207+
} catch (error) {
208+
logger.warn({ error }, 'Heartbeat handler failed, will be tried later.')
209+
}
210+
} else {
211+
this.stopHeartbeat()
212+
}
213+
}, context.adapterSettings.WS_HEARTBEAT_INTERVAL_MS)
214+
215+
this.heartbeatInterval = intervalId
216+
}
217+
}
218+
219+
stopHeartbeat(): void {
220+
if (this.heartbeatInterval) {
221+
clearInterval(this.heartbeatInterval)
222+
this.heartbeatInterval = undefined
223+
}
224+
}
225+
182226
buildConnectionHandlers(
183227
context: EndpointContext<T>,
184228
connection: WebSocket,
@@ -192,6 +236,7 @@ export class WebSocketTransport<
192236
await this.config.handlers.open(connection, context)
193237
logger.debug('Successfully executed connection opened handler')
194238
}
239+
this.startHeartbeat(context)
195240
connectionReadyResolve(event.target)
196241
},
197242

@@ -262,6 +307,7 @@ export class WebSocketTransport<
262307
this.config.handlers.close(event, context)
263308
logger.debug('Successfully executed connection close handler')
264309
}
310+
this.stopHeartbeat()
265311
},
266312
}
267313
}

0 commit comments

Comments
 (0)