Skip to content

Commit 60aa783

Browse files
Merge pull request #1574 from multiversx/add-ws-subscriptions-metrics
added metric per topics
2 parents bd08586 + 820466f commit 60aa783

3 files changed

Lines changed: 71 additions & 18 deletions

File tree

src/common/metrics/api.metrics.service.ts

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ export class ApiMetricsService {
2323
private static transactionsPendingResultsCounter: Counter<string>;
2424
private static batchUpdatesCounter: Counter<string>;
2525
private static subscriptionsConnectionsGauge: Gauge<string>;
26+
private static subscriptionsTopicConnectionsGauge: Gauge<string>;
2627

2728
constructor(
2829
private readonly apiConfigService: ApiConfigService,
@@ -40,6 +41,14 @@ export class ApiMetricsService {
4041
});
4142
}
4243

44+
if (!ApiMetricsService.subscriptionsTopicConnectionsGauge) {
45+
ApiMetricsService.subscriptionsTopicConnectionsGauge = new Gauge({
46+
name: 'websocket_subscriptions_topic_connections',
47+
help: 'Unique websocket clients per topic',
48+
labelNames: ['topic'],
49+
});
50+
}
51+
4352
if (!ApiMetricsService.vmQueriesHistogram) {
4453
ApiMetricsService.vmQueriesHistogram = new Histogram({
4554
name: 'vm_query',
@@ -191,10 +200,16 @@ export class ApiMetricsService {
191200
}
192201

193202
@OnEvent(MetricsEvents.SetWebsocketMetrics)
194-
setWebsocketSubscriptionsMetrics(payload: { connectedClients: number }) {
195-
const { connectedClients } = payload;
203+
setWebsocketSubscriptionsMetrics(payload: { connectedClients: number; topics?: Record<string, number> }) {
204+
const { connectedClients, topics } = payload;
196205

197206
ApiMetricsService.subscriptionsConnectionsGauge.set(connectedClients);
207+
208+
if (topics) {
209+
for (const [topic, count] of Object.entries(topics)) {
210+
ApiMetricsService.subscriptionsTopicConnectionsGauge.set({ topic }, count);
211+
}
212+
}
198213
}
199214

200215

src/crons/websocket/connection.handler.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ export class ConnectionHandler implements OnGatewayDisconnect, OnGatewayConnecti
1515
handleDisconnect(_client: Socket) { }
1616

1717
handleConnection(client: Socket, ..._args: any[]) {
18-
client.setMaxListeners(12);
18+
client.setMaxListeners(16);
1919
}
2020

2121
hasSubscriptionsWithPrefixes(prefixes: string[]): boolean {

src/crons/websocket/websocket.cron.service.ts

Lines changed: 53 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -45,21 +45,6 @@ export class WebsocketCronService {
4545
private readonly apiConfigService: ApiConfigService,
4646
) { }
4747

48-
@Cron('*/1 * * * * *')
49-
handleWebsocketMetrics() {
50-
const connectedClients = this.server.sockets.sockets.size ?? 0;
51-
// TODO: add more metrics in the future
52-
// const subscriptions: Record<string, number> = {};
53-
54-
// this.server.sockets.adapter.rooms.forEach((socketsSet, roomName) => {
55-
// subscriptions[roomName] = socketsSet.size;
56-
// });
57-
58-
this.eventEmitter.emit(MetricsEvents.SetWebsocketMetrics, {
59-
connectedClients,
60-
});
61-
}
62-
6348
@Cron('*/6 * * * * *')
6449
@Lock({ name: 'Push transactions to subscribers', verbose: true })
6550
async handleTransactionsUpdate() {
@@ -128,6 +113,59 @@ export class WebsocketCronService {
128113
);
129114
}
130115

116+
@Cron('*/10 * * * * *')
117+
@Lock({ name: 'Push websocket subscriptions metrics', verbose: true })
118+
handleWebsocketMetrics() {
119+
const connectedClients = this.server.sockets.sockets.size ?? 0;
120+
121+
// Efficient unique-listener counts per topic
122+
const adapter = this.server.sockets.adapter as any;
123+
const sids: Map<string, Set<string>> = adapter?.sids ?? new Map();
124+
125+
const topicPrefixes: Array<{ key: string; prefix?: string; room?: string }> = [
126+
{ key: 'tx', prefix: TransactionsGateway.keyPrefix },
127+
{ key: 'customTx', prefix: TransactionsCustomGateway.keyPrefix },
128+
{ key: 'events', prefix: EventsGateway.keyPrefix },
129+
{ key: 'customEvents', prefix: EventsCustomGateway.keyPrefix },
130+
{ key: 'blocks', prefix: BlocksGateway.keyPrefix },
131+
{ key: 'pool', prefix: PoolGateway.keyPrefix },
132+
{ key: 'stats', room: 'statsRoom' },
133+
];
134+
135+
const topics: Record<string, number> = {};
136+
for (const { key } of topicPrefixes) topics[key] = 0;
137+
138+
// Count unique sockets per prefix-based topic by scanning socket -> rooms map once
139+
if (sids && sids.size > 0) {
140+
for (const [, rooms] of sids) {
141+
// Track whether this socket has been counted for a given topic key
142+
const matched: Record<string, boolean> = {};
143+
144+
for (const roomName of rooms) {
145+
for (const { key, prefix } of topicPrefixes) {
146+
if (!prefix || matched[key]) continue;
147+
if (roomName.startsWith(prefix)) {
148+
topics[key] += 1;
149+
matched[key] = true;
150+
}
151+
}
152+
}
153+
}
154+
}
155+
156+
// Handle exact-room topics (like statsRoom) directly from rooms map
157+
const rooms: Map<string, Set<string>> = adapter?.rooms ?? new Map();
158+
const statsRoomSet = rooms.get('statsRoom');
159+
if (statsRoomSet) {
160+
topics['stats'] = statsRoomSet.size;
161+
}
162+
163+
this.eventEmitter.emit(MetricsEvents.SetWebsocketMetrics, {
164+
connectedClients,
165+
topics,
166+
});
167+
}
168+
131169
private async getLatestRoundOnChainData() {
132170
const rounds = await this.roundService.getRounds(new RoundFilter({ size: 1 }));
133171
return rounds[0];

0 commit comments

Comments
 (0)