Skip to content

Commit 7f6e3e4

Browse files
authored
Merge pull request #1569 from multiversx/ws-custom-subscriptions
Ws custom subscriptions
2 parents f680cd2 + 7b992d7 commit 7f6e3e4

30 files changed

Lines changed: 1086 additions & 35 deletions

config/config.devnet.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ features:
2323
websocketSubscription:
2424
enabled: false
2525
port: 6002
26+
maxSubscriptionsPerInstance: 10000
27+
maxSubscriptionsPerClient: 5
2628
eventsNotifier:
2729
enabled: false
2830
port: 5674

config/config.e2e-mocked.mainnet.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ features:
88
websocketSubscription:
99
enabled: false
1010
port: 6002
11+
maxSubscriptionsPerInstance: 10000
12+
maxSubscriptionsPerClient: 5
1113
dataApi:
1214
enabled: false
1315
serviceUrl: 'https://data-api.multiversx.com'

config/config.e2e.mainnet.yaml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,10 @@ flags:
2121
collectionPropertiesFromGateway: false
2222
features:
2323
websocketSubscription:
24-
enabled: false
24+
enabled: true
2525
port: 6002
26+
maxSubscriptionsPerInstance: 10000
27+
maxSubscriptionsPerClient: 5
2628
eventsNotifier:
2729
enabled: false
2830
port: 5674

config/config.mainnet.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ features:
2323
websocketSubscription:
2424
enabled: false
2525
port: 6002
26+
maxSubscriptionsPerInstance: 10000
27+
maxSubscriptionsPerClient: 5
2628
eventsNotifier:
2729
enabled: false
2830
port: 5674

config/config.testnet.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ features:
2323
websocketSubscription:
2424
enabled: false
2525
port: 6002
26+
maxSubscriptionsPerInstance: 10000
27+
maxSubscriptionsPerClient: 5
2628
eventsNotifier:
2729
enabled: false
2830
port: 5674

src/common/api-config/api.config.service.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1010,6 +1010,14 @@ export class ApiConfigService {
10101010
return port;
10111011
}
10121012

1013+
getWebsocketMaxSubscriptionsPerInstance(): number {
1014+
return this.configService.get<number>('features.websocketSubscription.maxSubscriptionsPerInstance') ?? 10_000;
1015+
}
1016+
1017+
getWebsocketMaxSubscriptionsPerClient(): number {
1018+
return this.configService.get<number>('features.websocketSubscription.maxSubscriptionsPerClient') ?? 5;
1019+
}
1020+
10131021
getHeadersForCustomUrl(url: string): Record<string, string> | undefined {
10141022
let customUrlConfigs = this.configService.get<any>('customUrlHeaders');
10151023

src/common/metrics/api.metrics.service.ts

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ export class ApiMetricsService {
2323
private static transactionsPendingResultsCounter: Counter<string>;
2424
private static batchUpdatesCounter: Counter<string>;
2525
private static subscriptionsConnectionsGauge: Gauge<string>;
26+
private static subscriptionsTopicConnectionsGauge: Gauge<string>;
2627

2728
constructor(
2829
private readonly apiConfigService: ApiConfigService,
@@ -40,6 +41,14 @@ export class ApiMetricsService {
4041
});
4142
}
4243

44+
if (!ApiMetricsService.subscriptionsTopicConnectionsGauge) {
45+
ApiMetricsService.subscriptionsTopicConnectionsGauge = new Gauge({
46+
name: 'websocket_subscriptions_topic_connections',
47+
help: 'Unique websocket clients per topic',
48+
labelNames: ['topic'],
49+
});
50+
}
51+
4352
if (!ApiMetricsService.vmQueriesHistogram) {
4453
ApiMetricsService.vmQueriesHistogram = new Histogram({
4554
name: 'vm_query',
@@ -191,10 +200,16 @@ export class ApiMetricsService {
191200
}
192201

193202
@OnEvent(MetricsEvents.SetWebsocketMetrics)
194-
setWebsocketSubscriptionsMetrics(payload: { connectedClients: number }) {
195-
const { connectedClients } = payload;
203+
setWebsocketSubscriptionsMetrics(payload: { connectedClients: number; topics?: Record<string, number> }) {
204+
const { connectedClients, topics } = payload;
196205

197206
ApiMetricsService.subscriptionsConnectionsGauge.set(connectedClients);
207+
208+
if (topics) {
209+
for (const [topic, count] of Object.entries(topics)) {
210+
ApiMetricsService.subscriptionsTopicConnectionsGauge.set({ topic }, count);
211+
}
212+
}
198213
}
199214

200215

src/crons/websocket/blocks.gateway.ts

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,37 +4,59 @@ import { BlockService } from '../../endpoints/blocks/block.service';
44
import { BlockFilter } from '../../endpoints/blocks/entities/block.filter';
55
import { QueryPagination } from 'src/common/entities/query.pagination';
66
import { BlockSubscribePayload } from '../../endpoints/blocks/entities/block.subscribe';
7-
import { UseFilters } from '@nestjs/common';
7+
import { UseFilters, UseGuards } from '@nestjs/common';
88
import { WebsocketExceptionsFilter } from 'src/utils/ws-exceptions.filter';
99
import { WsValidationPipe } from 'src/utils/ws-validation.pipe';
1010
import { OriginLogger } from '@multiversx/sdk-nestjs-common';
11+
import { WsSubscriptionLimiterGuard } from 'src/utils/ws.subscription.limiter';
1112

1213
@UseFilters(WebsocketExceptionsFilter)
1314
@WebSocketGateway({ cors: { origin: '*' }, path: '/ws/subscription' })
1415
export class BlocksGateway {
1516
private readonly logger = new OriginLogger(BlocksGateway.name);
17+
static readonly keyPrefix = 'blocks-';
1618

1719
@WebSocketServer()
1820
server!: Server;
1921

2022
constructor(private readonly blockService: BlockService) { }
2123

24+
@UseGuards(WsSubscriptionLimiterGuard)
2225
@SubscribeMessage('subscribeBlocks')
2326
async handleSubscription(
2427
@ConnectedSocket() client: Socket,
2528
@MessageBody(new WsValidationPipe()) payload: BlockSubscribePayload
2629
) {
2730
const filterIdentifier = JSON.stringify(payload);
28-
await client.join(`blocks-${filterIdentifier}`);
31+
const roomName = `${BlocksGateway.keyPrefix}${filterIdentifier}`;
32+
33+
if (!client.rooms.has(roomName)) {
34+
await client.join(roomName);
35+
}
2936

3037
return { status: 'success' };
3138
}
3239

40+
@SubscribeMessage('unsubscribeBlocks')
41+
async handleUnsubscribe(
42+
@ConnectedSocket() client: Socket,
43+
@MessageBody(new WsValidationPipe()) payload: BlockSubscribePayload
44+
) {
45+
const filterIdentifier = JSON.stringify(payload);
46+
const roomName = `${BlocksGateway.keyPrefix}${filterIdentifier}`;
47+
48+
if (client.rooms.has(roomName)) {
49+
await client.leave(roomName);
50+
}
51+
52+
return { status: 'unsubscribed' };
53+
}
54+
3355
async pushBlocksForRoom(roomName: string): Promise<void> {
34-
if (!roomName.startsWith("blocks-")) return;
56+
if (!roomName.startsWith(BlocksGateway.keyPrefix)) return;
3557

3658
try {
37-
const filterIdentifier = roomName.replace("blocks-", "");
59+
const filterIdentifier = roomName.replace(BlocksGateway.keyPrefix, "");
3860
const filter: BlockSubscribePayload = JSON.parse(filterIdentifier);
3961

4062
const blockFilter = new BlockFilter({

src/crons/websocket/connection.handler.ts

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,18 @@ export class ConnectionHandler implements OnGatewayDisconnect, OnGatewayConnecti
1515
handleDisconnect(_client: Socket) { }
1616

1717
handleConnection(client: Socket, ..._args: any[]) {
18-
client.setMaxListeners(12);
18+
client.setMaxListeners(16);
19+
}
20+
21+
hasSubscriptionsWithPrefixes(prefixes: string[]): boolean {
22+
const rooms = this.server.sockets.adapter.rooms;
23+
24+
for (const roomName of rooms.keys()) {
25+
if (prefixes.some(prefix => roomName.startsWith(prefix))) {
26+
return true;
27+
}
28+
}
29+
30+
return false;
1931
}
2032
}
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
import { WebSocketGateway, WebSocketServer, SubscribeMessage, ConnectedSocket, MessageBody } from '@nestjs/websockets';
2+
import { Server, Socket } from 'socket.io';
3+
import { UseFilters, UseGuards } from '@nestjs/common';
4+
import { OriginLogger } from '@multiversx/sdk-nestjs-common';
5+
6+
import { QueryPagination } from 'src/common/entities/query.pagination';
7+
import { WsValidationPipe } from 'src/utils/ws-validation.pipe';
8+
import { WebsocketExceptionsFilter } from 'src/utils/ws-exceptions.filter';
9+
import { WsSubscriptionLimiterGuard } from 'src/utils/ws.subscription.limiter';
10+
import { RoomKeyGenerator } from './room.key.generator';
11+
import { EventsService } from 'src/endpoints/events/events.service';
12+
import { EventsCustomSubscribePayload } from 'src/endpoints/events/entities/events.custom.subscribe';
13+
import { EventsFilter } from 'src/endpoints/events/entities/events.filter';
14+
import { Events } from 'src/endpoints/events/entities/events';
15+
16+
17+
@UseFilters(WebsocketExceptionsFilter)
18+
@WebSocketGateway({ cors: { origin: '*' }, path: '/ws/subscription' })
19+
export class EventsCustomGateway {
20+
private readonly logger = new OriginLogger(EventsCustomGateway.name);
21+
22+
static keyPrefix = 'custom-events-';
23+
24+
@WebSocketServer()
25+
server!: Server;
26+
27+
constructor(
28+
private readonly eventsService: EventsService,
29+
) { }
30+
31+
@UseGuards(WsSubscriptionLimiterGuard)
32+
@SubscribeMessage('subscribeCustomEvents')
33+
async handleCustomSubscription(
34+
@ConnectedSocket() client: Socket,
35+
@MessageBody(new WsValidationPipe()) payload: EventsCustomSubscribePayload
36+
) {
37+
const filterIdentifier = RoomKeyGenerator.deterministicStringify(payload);
38+
const roomName = `${EventsCustomGateway.keyPrefix}${filterIdentifier}`;
39+
40+
if (!client.rooms.has(roomName)) {
41+
await client.join(roomName);
42+
}
43+
44+
return { status: 'success' };
45+
}
46+
47+
@SubscribeMessage('unsubscribeCustomEvents')
48+
async handleCustomUnsubscribe(
49+
@ConnectedSocket() client: Socket,
50+
@MessageBody(new WsValidationPipe()) payload: EventsCustomSubscribePayload
51+
) {
52+
const filterIdentifier = RoomKeyGenerator.deterministicStringify(payload);
53+
const roomName = `${EventsCustomGateway.keyPrefix}${filterIdentifier}`;
54+
55+
if (client.rooms.has(roomName)) {
56+
await client.leave(roomName);
57+
}
58+
59+
return { status: 'unsubscribed' };
60+
}
61+
62+
async pushEventsForTimestampMs(timestampMs: number): Promise<void> {
63+
try {
64+
const allEvents = await this.eventsService.getEvents(
65+
new QueryPagination({ size: 10000 }),
66+
new EventsFilter({ before: timestampMs, after: timestampMs }),
67+
);
68+
69+
const eventsFilteredForBroadcast: Map<string, Events[]> = new Map();
70+
71+
for (const event of allEvents) {
72+
const roomKeys = RoomKeyGenerator.generate(
73+
EventsCustomGateway.keyPrefix,
74+
event,
75+
EventsCustomSubscribePayload,
76+
);
77+
78+
for (const roomKey of roomKeys) {
79+
if (this.server.sockets.adapter.rooms.has(roomKey)) {
80+
if (!eventsFilteredForBroadcast.has(roomKey)) {
81+
eventsFilteredForBroadcast.set(roomKey, []);
82+
}
83+
eventsFilteredForBroadcast.get(roomKey)!.push(event);
84+
}
85+
}
86+
}
87+
88+
for (const [roomName] of eventsFilteredForBroadcast) {
89+
this.server.to(roomName).emit("customEventUpdate", {
90+
events: eventsFilteredForBroadcast.get(roomName),
91+
timestampMs,
92+
});
93+
}
94+
} catch (error) {
95+
this.logger.error(error);
96+
}
97+
}
98+
}

0 commit comments

Comments
 (0)