Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
5239843
add websockets for blocks and txs
stefangutica Aug 20, 2025
6be90b6
add support for subscribe to stats
stefangutica Aug 21, 2025
1442509
use websockets rooms
stefangutica Aug 22, 2025
7d322ee
remove logs
stefangutica Aug 22, 2025
62974c7
add lock on crons
stefangutica Aug 22, 2025
8006d5e
check stats room exists
stefangutica Aug 22, 2025
d09f688
fix indent spaces
stefangutica Aug 22, 2025
62fa109
add validation pipes + filters
stefangutica Aug 22, 2025
a7e71ea
add try catch + class validator fixes
stefangutica Aug 22, 2025
58be392
fix linter
stefangutica Aug 22, 2025
cb92d2b
add pool subscription + reduce filters combinations for subscriptions
stefangutica Sep 2, 2025
344330e
lint
stefangutica Sep 2, 2025
404b18f
add support for events subscription
stefangutica Sep 2, 2025
d12208d
lint
stefangutica Sep 2, 2025
c0833a7
separate subscription websocket into separate app
stefangutica Sep 3, 2025
4f9741d
add config
stefangutica Sep 3, 2025
889a75b
add path
stefangutica Sep 3, 2025
cdfc711
fix
stefangutica Sep 3, 2025
788284b
add path for events + config default settings
stefangutica Sep 3, 2025
207ef83
temp logs
bogdan-rosianu Sep 4, 2025
524fd9e
temp logs 2
bogdan-rosianu Sep 4, 2025
e644794
added missing configs + remove temp logs
bogdan-rosianu Sep 4, 2025
c5dc598
enable andromeda in config
stefangutica Sep 4, 2025
cad49bc
add metrics on subscription
stefangutica Sep 8, 2025
8e92d63
remove async + reschedule
stefangutica Sep 8, 2025
578303a
refresh metrics every second
stefangutica Sep 8, 2025
6679b1f
set max listeners to 12
stefangutica Sep 10, 2025
703a19e
Merge branch 'development' into websockets-poc
cfaur09 Sep 16, 2025
f6f747b
add EOL
cfaur09 Sep 16, 2025
d86907f
add count on update + parallel broadcast to rooms
stefangutica Sep 25, 2025
5ceec0e
Merge branch 'development' into websockets-poc
bogdan-rosianu Oct 9, 2025
bdb82de
lower ttl for blocks count cache
stefangutica Oct 16, 2025
341cbe3
remove comments
stefangutica Oct 16, 2025
e0c2295
remove comments
stefangutica Oct 16, 2025
a4dfb21
renaming
stefangutica Oct 16, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion config/config.devnet.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ api:
publicPort: 3001
private: true
privatePort: 4001
websocket: true
websocket: false
cron:
cacheWarmer: true
fastWarm: true
Expand Down
93 changes: 93 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,8 @@
"rxjs": "^7.1.0",
"sharp": "^0.34.2",
"simple-git": "^3.16.0",
"socket.io": "^4.8.1",
"socket.io-client": "^4.8.1",
"swagger-ui-express": "^4.3.0",
"tiny-async-pool": "^1.2.0",
"typeorm": "^0.3.25",
Expand Down
32 changes: 32 additions & 0 deletions src/crons/websocket/websocket.cron.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import { Injectable } from '@nestjs/common';
import { Cron } from '@nestjs/schedule';
import { TransactionsGateway } from '../../endpoints/transactions/transaction.gateway';
import { BlocksGateway } from 'src/endpoints/blocks/blocks.gateway';
import { NetworkGateway } from 'src/endpoints/network/network.gateway';
import { Lock } from "@multiversx/sdk-nestjs-common";
@Injectable()
export class WebsocketCronService {
constructor(
private readonly transactionsGateway: TransactionsGateway,
private readonly blocksGateway: BlocksGateway,
private readonly networkGateway: NetworkGateway,
) { }

@Cron('*/6 * * * * *')
@Lock({ name: 'Push transactions to subscribers', verbose: true })
async handleTransactionsUpdate() {
await this.transactionsGateway.pushTransactions();
}

@Cron('*/6 * * * * *')
@Lock({ name: 'Push blocks to subscribers', verbose: true })
async handleBlocksUpdate() {
await this.blocksGateway.pushBlocks();
}

@Cron('*/6 * * * * *')
@Lock({ name: 'Push stats to subscribers', verbose: true })
async handleStatsUpdate() {
await this.networkGateway.pushStats();
}
}
19 changes: 19 additions & 0 deletions src/crons/websocket/websocket.crons.module.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import { Module } from '@nestjs/common';
import { ScheduleModule } from '@nestjs/schedule';
import { TransactionModule } from 'src/endpoints/transactions/transaction.module';
import { WebsocketCronService } from './websocket.cron.service';
import { BlockModule } from 'src/endpoints/blocks/block.module';
import { NetworkModule } from 'src/endpoints/network/network.module';

@Module({
imports: [
ScheduleModule.forRoot(),
TransactionModule,
BlockModule,
NetworkModule,
],
providers: [
WebsocketCronService,
],
})
export class WebSocketCronModule { }
5 changes: 3 additions & 2 deletions src/endpoints/blocks/block.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { BlsModule } from "../bls/bls.module";
import { IdentitiesModule } from "../identities/identities.module";
import { NodeModule } from "../nodes/node.module";
import { BlockService } from "./block.service";
import { BlocksGateway } from "./blocks.gateway";

@Module({
imports: [
Expand All @@ -11,10 +12,10 @@ import { BlockService } from "./block.service";
forwardRef(() => IdentitiesModule),
],
providers: [
BlockService,
BlockService, BlocksGateway,
],
exports: [
BlockService,
BlockService, BlocksGateway,
],
})
export class BlockModule { }
48 changes: 48 additions & 0 deletions src/endpoints/blocks/blocks.gateway.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import { WebSocketGateway, WebSocketServer, SubscribeMessage, OnGatewayDisconnect } from '@nestjs/websockets';
import { Server, Socket } from 'socket.io';
import { BlockService } from './block.service';
import { BlockFilter } from './entities/block.filter';
import { QueryPagination } from 'src/common/entities/query.pagination';

@WebSocketGateway({ cors: { origin: '*' } })
export class BlocksGateway implements OnGatewayDisconnect {
@WebSocketServer()
server!: Server;

constructor(private readonly blockService: BlockService) { }

@SubscribeMessage('subscribeBlocks')
async handleSubscription(client: Socket, payload: any) {
const filterHash = JSON.stringify(payload);
await client.join(`block-${filterHash}`);
}

async pushBlocks() {
for (const [roomName] of this.server.sockets.adapter.rooms) {
if (!roomName.startsWith("block-")) continue;

const filterHash = roomName.replace("block-", "");
const filter = JSON.parse(filterHash);

const blockFilter = new BlockFilter({
shard: filter.shard,
proposer: filter.proposer,
validator: filter.validator,
epoch: filter.epoch,
nonce: filter.nonce,
hashes: filter.hashes,
order: filter.order,
});

const blocks = await this.blockService.getBlocks(
Comment thread
cfaur09 marked this conversation as resolved.
Outdated
blockFilter,
new QueryPagination({ from: filter.from || 0, size: filter.size || 25 }),
filter.withProposerIdentity,
);

this.server.to(roomName).emit('blocksUpdate', blocks);
}
}

handleDisconnect(_client: Socket) { }
}
25 changes: 25 additions & 0 deletions src/endpoints/network/network.gateway.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import { WebSocketGateway, WebSocketServer, SubscribeMessage, OnGatewayDisconnect } from '@nestjs/websockets';
import { Server, Socket } from 'socket.io';
import { NetworkService } from './network.service';

@WebSocketGateway({ cors: { origin: '*' } })
export class NetworkGateway implements OnGatewayDisconnect {
@WebSocketServer()
server!: Server;

constructor(private readonly networkService: NetworkService) { }

@SubscribeMessage('subscribeStats')
async handleSubscription(client: Socket) {
await client.join('statsRoom');
}

async pushStats() {
if (this.server.sockets.adapter.rooms.has('statsRoom')) {
Comment thread
cfaur09 marked this conversation as resolved.
Outdated
const stats = await this.networkService.getStats();
this.server.to('statsRoom').emit('statsUpdate', stats);
}
}

handleDisconnect(_client: Socket) { }
}
5 changes: 3 additions & 2 deletions src/endpoints/network/network.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { TokenModule } from "../tokens/token.module";
import { TransactionModule } from "../transactions/transaction.module";
import { VmQueryModule } from "../vm.query/vm.query.module";
import { NetworkService } from "./network.service";
import { NetworkGateway } from "./network.gateway";

@Module({
imports: [
Expand All @@ -21,10 +22,10 @@ import { NetworkService } from "./network.service";
forwardRef(() => SmartContractResultModule),
],
providers: [
NetworkService,
NetworkService, NetworkGateway,
],
exports: [
NetworkService,
NetworkService, NetworkGateway,
],
})
export class NetworkModule { }
74 changes: 74 additions & 0 deletions src/endpoints/transactions/transaction.gateway.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import { WebSocketGateway, WebSocketServer, SubscribeMessage, OnGatewayDisconnect } from '@nestjs/websockets';
import { Server, Socket } from 'socket.io';
import { TransactionService } from './transaction.service';
import { TransactionFilter } from './entities/transaction.filter';
import { QueryPagination } from 'src/common/entities/query.pagination';
import { TransactionQueryOptions } from './entities/transactions.query.options';

@WebSocketGateway({ cors: { origin: '*' } })
export class TransactionsGateway implements OnGatewayDisconnect {
@WebSocketServer()
server!: Server;

constructor(private readonly transactionService: TransactionService) { }

@SubscribeMessage('subscribeTransactions')
async handleSubscription(client: Socket, payload: any) {
const filterHash = JSON.stringify(payload);
await client.join(`tx-${filterHash}`);
}

async pushTransactions() {
for (const [roomName] of this.server.sockets.adapter.rooms) {
if (!roomName.startsWith("tx-")) continue;

const filterHash = roomName.replace("tx-", "");
const filter = JSON.parse(filterHash);

const options = TransactionQueryOptions.applyDefaultOptions(filter.size || 25, {
withScResults: filter.withScResults,
withOperations: filter.withOperations,
withLogs: filter.withLogs,
withScamInfo: filter.withScamInfo,
withUsername: filter.withUsername,
withBlockInfo: filter.withBlockInfo,
withActionTransferValue: filter.withActionTransferValue,
});

const transactionFilter = new TransactionFilter({
sender: filter.sender,
receivers: filter.receiver,
token: filter.token,
functions: filter.functions,
senderShard: filter.senderShard,
receiverShard: filter.receiverShard,
miniBlockHash: filter.miniBlockHash,
hashes: filter.hashes,
status: filter.status,
before: filter.before,
after: filter.after,
condition: filter.condition,
order: filter.order,
relayer: filter.relayer,
isRelayed: filter.isRelayed,
isScCall: filter.isScCall,
round: filter.round,
withRelayedScresults: filter.withRelayedScresults,
});

TransactionFilter.validate(transactionFilter, filter.size || 25);

const txs = await this.transactionService.getTransactions(
transactionFilter,
new QueryPagination({ from: filter.from || 0, size: filter.size || 25 }),
options,
undefined,
filter.fields || [],
);

this.server.to(roomName).emit('transactionUpdate', txs);
}
}

handleDisconnect(_client: Socket) { }
}
Loading
Loading