Skip to content

Commit 391a74b

Browse files
committed
add support for events
1 parent e8b6dc9 commit 391a74b

5 files changed

Lines changed: 131 additions & 1 deletion

File tree

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
import { WebSocketGateway, WebSocketServer, SubscribeMessage, ConnectedSocket, MessageBody } from '@nestjs/websockets';
2+
import { Server, Socket } from 'socket.io';
3+
import { UseFilters, UseGuards } from '@nestjs/common';
4+
import { OriginLogger } from '@multiversx/sdk-nestjs-common';
5+
6+
import { QueryPagination } from 'src/common/entities/query.pagination';
7+
import { WsValidationPipe } from 'src/utils/ws-validation.pipe';
8+
import { WebsocketExceptionsFilter } from 'src/utils/ws-exceptions.filter';
9+
import { WsSubscriptionLimiterGuard } from 'src/utils/ws.subscription.limiter';
10+
import { RoomKeyGenerator } from './room.key.generator';
11+
import { EventsService } from 'src/endpoints/events/events.service';
12+
import { EventsCustomSubscribePayload } from 'src/endpoints/events/entities/events.custom.subscribe';
13+
import { EventsFilter } from 'src/endpoints/events/entities/events.filter';
14+
import { Events } from 'src/endpoints/events/entities/events';
15+
16+
17+
@UseFilters(WebsocketExceptionsFilter)
18+
@WebSocketGateway({ cors: { origin: '*' }, path: '/ws/subscription' })
19+
export class EventsCustomGateway {
20+
private readonly logger = new OriginLogger(EventsCustomGateway.name);
21+
22+
// Prefix distinct pentru camerele de evenimente pentru a nu se suprapune cu txs
23+
static keyPrefix = 'custom-events-';
24+
25+
@WebSocketServer()
26+
server!: Server;
27+
28+
constructor(
29+
private readonly eventsService: EventsService,
30+
) { }
31+
32+
@UseGuards(WsSubscriptionLimiterGuard)
33+
@SubscribeMessage('subscribeCustomEvents')
34+
async handleCustomSubscription(
35+
@ConnectedSocket() client: Socket,
36+
@MessageBody(new WsValidationPipe()) payload: EventsCustomSubscribePayload
37+
) {
38+
const filterIdentifier = RoomKeyGenerator.deterministicStringify(payload);
39+
const roomName = `${EventsCustomGateway.keyPrefix}${filterIdentifier}`;
40+
41+
if (!client.rooms.has(roomName)) {
42+
await client.join(roomName);
43+
}
44+
45+
return { status: 'success' };
46+
}
47+
48+
@SubscribeMessage('unsubscribeCustomEvents')
49+
async handleCustomUnsubscribe(
50+
@ConnectedSocket() client: Socket,
51+
@MessageBody(new WsValidationPipe()) payload: EventsCustomSubscribePayload
52+
) {
53+
const filterIdentifier = RoomKeyGenerator.deterministicStringify(payload);
54+
const roomName = `${EventsCustomGateway.keyPrefix}${filterIdentifier}`;
55+
56+
if (client.rooms.has(roomName)) {
57+
await client.leave(roomName);
58+
}
59+
60+
return { status: 'unsubscribed' };
61+
}
62+
63+
async pushEventsForTimestampMs(timestampMs: number): Promise<void> {
64+
try {
65+
const timestamp = timestampMs / 1000;
66+
67+
const allEvents = await this.eventsService.getEvents(
68+
new QueryPagination({ size: 10000 }),
69+
new EventsFilter({ before: timestamp, after: timestamp }),
70+
);
71+
72+
const eventsFilteredForBroadcast: Map<string, Events[]> = new Map();
73+
74+
for (const event of allEvents) {
75+
const roomKeys = RoomKeyGenerator.generate(
76+
EventsCustomGateway.keyPrefix,
77+
event,
78+
EventsCustomSubscribePayload,
79+
);
80+
81+
for (const roomKey of roomKeys) {
82+
if (this.server.sockets.adapter.rooms.has(roomKey)) {
83+
if (!eventsFilteredForBroadcast.has(roomKey)) {
84+
eventsFilteredForBroadcast.set(roomKey, []);
85+
}
86+
eventsFilteredForBroadcast.get(roomKey)!.push(event);
87+
}
88+
}
89+
}
90+
91+
for (const [roomName] of eventsFilteredForBroadcast) {
92+
this.server.to(roomName).emit("customEventUpdate", {
93+
events: eventsFilteredForBroadcast.get(roomName),
94+
timestampMs
95+
});
96+
}
97+
} catch (error) {
98+
this.logger.error(error);
99+
}
100+
}
101+
}

src/crons/websocket/room.key.generator.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { EventsCustomSubscribePayload } from 'src/endpoints/events/entities/events.custom.subscribe';
12
import { TransactionCustomSubscribePayload } from 'src/endpoints/transactions/entities/dtos/transaction.custom.subscribe';
23

34
export class RoomKeyGenerator {
@@ -61,6 +62,8 @@ export class RoomKeyGenerator {
6162
switch (targetClass) {
6263
case TransactionCustomSubscribePayload:
6364
return TransactionCustomSubscribePayload.getClassFields();
65+
case EventsCustomSubscribePayload:
66+
return EventsCustomSubscribePayload.getClassFields();
6467
default:
6568
console.warn(`RoomKeyGenerator: No manual key mapping found for class ${targetClass.name}`);
6669
return [];

src/crons/websocket/websocket.cron.service.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import { NetworkService } from 'src/endpoints/network/network.service';
1919
import { Stats } from 'src/endpoints/network/entities/stats';
2020
import { TransactionsCustomGateway } from './transaction.custom.gateway';
2121
import { ConnectionHandler } from './connection.handler';
22+
import { EventsCustomGateway } from './events.custom.gateway';
2223

2324
@Injectable()
2425
@WebSocketGateway({ cors: { origin: '*' }, path: '/ws/subscription' })
@@ -38,6 +39,7 @@ export class WebsocketCronService {
3839
private readonly elasticService: ElasticService,
3940
private readonly networkService: NetworkService,
4041
private readonly transactionsCustomGateway: TransactionsCustomGateway,
42+
private readonly eventsCustomGateway: EventsCustomGateway,
4143
private readonly connectionHandler: ConnectionHandler,
4244
) { }
4345

@@ -111,7 +113,10 @@ export class WebsocketCronService {
111113
await this.pollUntil(async () => await this.isElasticDataAvailableForTimestampMs(roundToProcessTimestampMs, stats), pollingDelay, pollingMaxAttempts);
112114

113115
// call gateways to process logic for custom subscriptions
114-
await this.transactionsCustomGateway.pushTransactionsForTimestampMs(roundToProcessTimestampMs);
116+
await Promise.all([
117+
this.transactionsCustomGateway.pushTransactionsForTimestampMs(roundToProcessTimestampMs),
118+
this.eventsCustomGateway.pushEventsForTimestampMs(roundToProcessTimestampMs)
119+
]);
115120
roundToProcessTimestampMs += stats.refreshRate;
116121
}
117122
this.cacheService.setLocal(

src/crons/websocket/websocket.subscription.module.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import { EventsGateway } from './events.gateway';
1414
import { ConnectionHandler } from './connection.handler';
1515
import { RoundModule } from 'src/endpoints/rounds/round.module';
1616
import { TransactionsCustomGateway } from './transaction.custom.gateway';
17+
import { EventsCustomGateway } from './events.custom.gateway';
1718

1819
@Module({
1920
imports: [
@@ -34,6 +35,7 @@ import { TransactionsCustomGateway } from './transaction.custom.gateway';
3435
PoolGateway,
3536
EventsGateway,
3637
TransactionsCustomGateway,
38+
EventsCustomGateway,
3739
],
3840
})
3941
export class WebsocketSubscriptionModule { }
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
import { IsOptional, IsString } from 'class-validator';
2+
3+
export class EventsCustomSubscribePayload {
4+
@IsOptional()
5+
@IsString()
6+
address?: string;
7+
8+
@IsOptional()
9+
@IsString()
10+
identifier?: string;
11+
12+
@IsOptional()
13+
@IsString()
14+
logAddress?: string;
15+
16+
public static getClassFields(): string[] {
17+
return ['address', 'identifier', 'logAddress'];
18+
}
19+
}

0 commit comments

Comments
 (0)