Skip to content

Commit afbfe3d

Browse files
Added ShrimpyWsClient to connect to the Shrimpy Websocket API
1 parent d07ee46 commit afbfe3d

11 files changed

Lines changed: 261 additions & 9 deletions

README.md

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -439,4 +439,40 @@ const assetDominance = await client.getAssetDominance();
439439
* [`getAssetPopularity`](https://developers.shrimpy.io/docs/#get-asset-popularity)
440440
```js
441441
const assetPopularity = await client.getAssetPopularity();
442-
```
442+
```
443+
444+
## Websocket
445+
446+
Users can access the Shrimpy websocket feed using the [`ShrimpyWsClient`](https://github.com/shrimpy-dev/shrimpy-node/blob/master/lib/client/shrimpy-ws-client.ts) class. A handler must be passed in on subscription
447+
that is responsible for processing incoming messages from the websocket stream. It is recommended that you simply send the message
448+
to another processing thread from your custom handler to prevent blocking the incoming message stream.
449+
450+
The client handles pings to the Shrimpy server based on the [`API Documentation`](https://developers.shrimpy.io/docs/#websocket)
451+
452+
```js
453+
import { ShrimpyWsClient, ISubscriptionRequest, IWebsocketMessage, IErrorMessage } from 'shrimpy-node';
454+
455+
let errorHandler = (error: IErrorMessage) => { console.log(error) };
456+
let client = new ShrimpyWsClient(errorHandler);
457+
458+
const subscribeData: ISubscriptionRequest = {
459+
"type": "subscribe",
460+
"pair": "btc-usd",
461+
"exchange": "coinbasepro",
462+
"channel": "trade"
463+
};
464+
465+
const unsubscribeData : ISubscriptionRequest = {
466+
"type": "unsubscribe",
467+
"pair": "btc-usd",
468+
"exchange": "coinbasepro",
469+
"channel": "trade"
470+
};
471+
472+
let handler = (msg: IWebsocketMessage) => { console.log(msg); };
473+
474+
client.connect();
475+
client.subscribe(subscribeData, handler);
476+
client.unsubscribe(unsubscribeData);
477+
client.forceDisconnect();
478+
```

lib/client/index.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
1-

2-
export * from './authentication-provider';
3-
export * from './shrimpy-api-client';
1+
export * from './authentication-provider';
2+
export * from './shrimpy-api-client';
3+
export * from './shrimpy-ws-client';

lib/client/shrimpy-ws-client.ts

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
import * as WebSocket from 'ws';
2+
import { ISubscriptionRequest, IWebsocketMessage, IPingMessage, IErrorMessage } from '../models';
3+
4+
5+
export class ShrimpyWsClient {
6+
private _websocket: WebSocket;
7+
private _subscriptionCallbacks: {
8+
[subscription: string]: (data: IWebsocketMessage) => void
9+
} = {};
10+
11+
private _websocketErrorCallback: (error: IErrorMessage) => void;
12+
13+
constructor (errorCallback: (error: IErrorMessage) => void) {
14+
this._websocket = new WebSocket('wss://ws-feed.shrimpy.io/');
15+
this._websocketErrorCallback = errorCallback;
16+
}
17+
18+
public connect() {
19+
20+
this._websocket.on('open', function open() {
21+
// Open the connection
22+
});
23+
24+
this._websocket.on('error', (error: Error) => {
25+
const wsError : IErrorMessage = {
26+
'type' : 'WebsocketClientError',
27+
'code' : 2404,
28+
'message' : error.message
29+
};
30+
this._websocketErrorCallback(wsError);
31+
});
32+
33+
this._websocket.on('message', (message) => {
34+
const parsedMessage: IWebsocketMessage = JSON.parse(message.toString());
35+
const topic = this._getTopic(parsedMessage);
36+
37+
if (topic === 'ping') {
38+
// Handle Ping
39+
this._pong(parsedMessage as IPingMessage);
40+
return;
41+
}
42+
43+
if (topic === 'error') {
44+
this._websocketErrorCallback(parsedMessage as IErrorMessage);
45+
return;
46+
}
47+
48+
const successCallback = this._subscriptionCallbacks[topic];
49+
if (successCallback !== undefined) {
50+
successCallback(parsedMessage);
51+
}
52+
});
53+
54+
this._websocket.on('close', () => {
55+
// Connection has been closed, delete all callbacks
56+
this._subscriptionCallbacks = {};
57+
});
58+
}
59+
60+
public disconnect() {
61+
if (this._websocket !== undefined) {
62+
this._websocket.close();
63+
}
64+
}
65+
66+
public forceDisconnect() {
67+
if (this._websocket !== undefined) {
68+
this._websocket.terminate();
69+
}
70+
}
71+
72+
public subscribe(
73+
subscriptionRequest: ISubscriptionRequest,
74+
successCallback: (data: IWebsocketMessage) => void,
75+
) {
76+
if (this._websocket.OPEN) {
77+
const topic = this._getTopic(subscriptionRequest);
78+
this._subscriptionCallbacks[topic] = successCallback;
79+
this._websocket.send(JSON.stringify(subscriptionRequest));
80+
}
81+
}
82+
83+
public unsubscribe(
84+
unsubscriptionRequest: ISubscriptionRequest
85+
) {
86+
const topic = this._getTopic(unsubscriptionRequest);
87+
delete this._subscriptionCallbacks[topic];
88+
this._websocket.send(JSON.stringify(unsubscriptionRequest));
89+
}
90+
91+
private _getTopic(message: any): string {
92+
93+
if (message.hasOwnProperty('type')) {
94+
const messageType = message['type'];
95+
if (messageType.indexOf('subscribe') === -1) {
96+
return messageType.toLowerCase();
97+
}
98+
}
99+
100+
const exchange = message['exchange'];
101+
const pair = message['pair'];
102+
const channel = message['channel'];
103+
104+
const rawKeys = [exchange.toLowerCase(), pair.toLowerCase(), channel.toLowerCase()];
105+
const nonNullKeys = rawKeys.filter((k) => {
106+
return k !== undefined;
107+
});
108+
109+
return nonNullKeys.join('-');
110+
}
111+
112+
private _pong(parsedData: IPingMessage) {
113+
const pong = {
114+
'type': 'pong',
115+
'data': parsedData.data
116+
};
117+
this._websocket.send(JSON.stringify(pong));
118+
}
119+
}
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
export enum SubscriptionTypeEnum {
2+
3+
}

lib/models/index.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,5 +27,8 @@ export * from './itotal-balance-history-item';
2727
export * from './itrade';
2828
export * from './itrade-changes';
2929
export * from './itrade-fill';
30+
export * from './itrade-item';
3031
export * from './itrading-pair';
32+
export * from './isubscription-request';
3133
export * from './iuser';
34+
export * from './iwebsocket-message';
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
export type ISubscriptionRequest = IExchangeSubscription | IOrderSubscription;
2+
3+
export interface IExchangeSubscription {
4+
type: 'subscribe' | 'unsubscribe';
5+
exchange: string;
6+
pair: string;
7+
channel: string;
8+
}
9+
10+
export interface IOrderSubscription {
11+
channel: string;
12+
}

lib/models/itrade-item.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
import Decimal from "decimal.js";
2+
3+
export interface ITradeItem {
4+
id: number;
5+
price: Decimal;
6+
quantity: Decimal;
7+
time: Date;
8+
btcValue: Decimal;
9+
usdValue: Decimal;
10+
}

lib/models/iwebsocket-message.ts

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
import { IOrderBookItem } from "./iorder-book-item";
2+
import { ITradeItem } from "./itrade-item";
3+
4+
export type IWebsocketMessage = IExchangePairMessage | IOrderMessage | IErrorMessage | IPingMessage;
5+
6+
export interface IExchangePairMessage {
7+
exchange: string;
8+
pair: string;
9+
channel: 'bbo' | 'orderbook' | 'trades'
10+
content: WebsocketContent
11+
}
12+
13+
export interface IOrderMessage {
14+
channel: 'orders';
15+
content: WebsocketContent;
16+
}
17+
18+
export interface IErrorMessage {
19+
type: string;
20+
code: number;
21+
message: string;
22+
}
23+
24+
export interface IPingMessage {
25+
type: string;
26+
data: number;
27+
}
28+
29+
export type WebsocketContent = OrderBookContent | TradeContent | OrdersContent;
30+
31+
export interface OrderBookContent {
32+
sequence: number;
33+
asks: IOrderBookItem[];
34+
bids: IOrderBookItem[];
35+
}
36+
37+
export interface TradeContent {
38+
trades: ITradeItem[];
39+
}
40+
41+
export type OrdersContent = string[];

lib/public-api.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11

2-
export { ShrimpyApiClient } from './client';
2+
export { ShrimpyApiClient, ShrimpyWsClient } from './client';
33
export * from './models';

package-lock.json

Lines changed: 28 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)