1- import { Injectable } from '@nestjs/common' ;
2- import { Cron } from '@nestjs/schedule' ;
1+ import { Injectable , OnModuleInit } from '@nestjs/common' ;
2+ import { Cron , SchedulerRegistry } from '@nestjs/schedule' ;
33import { TransactionsGateway } from './transaction.gateway' ;
44import { BlocksGateway } from 'src/crons/websocket/blocks.gateway' ;
55import { NetworkGateway } from 'src/crons/websocket/network.gateway' ;
6- import { Lock } from "@multiversx/sdk-nestjs-common" ;
6+ import { Lock , Locker } from "@multiversx/sdk-nestjs-common" ;
77import { PoolGateway } from 'src/crons/websocket/pool.gateway' ;
88import { EventsGateway } from 'src/crons/websocket/events.gateway' ;
99import { WebSocketGateway , WebSocketServer } from '@nestjs/websockets' ;
@@ -21,10 +21,11 @@ import { TransactionsCustomGateway } from './transaction.custom.gateway';
2121import { ConnectionHandler } from './connection.handler' ;
2222import { EventsCustomGateway } from './events.custom.gateway' ;
2323import { TransfersCustomGateway } from './transfers.custom.gateway' ;
24+ import { ApiConfigService } from 'src/common/api-config/api.config.service' ;
2425
2526@Injectable ( )
2627@WebSocketGateway ( { cors : { origin : '*' } , path : '/ws/subscription' } )
27- export class WebsocketCronService {
28+ export class WebsocketCronService implements OnModuleInit {
2829 @WebSocketServer ( )
2930 server ! : Server ;
3031
@@ -43,41 +44,91 @@ export class WebsocketCronService {
4344 private readonly eventsCustomGateway : EventsCustomGateway ,
4445 private readonly connectionHandler : ConnectionHandler ,
4546 private readonly transfersCustomGateway : TransfersCustomGateway ,
47+ private readonly apiConfigService : ApiConfigService ,
48+ private readonly schedulerRegistry : SchedulerRegistry ,
4649 ) { }
4750
48- @Cron ( '*/6 * * * * *' )
49- @Lock ( { name : 'Push transactions to subscribers' , verbose : true } )
51+
52+ onModuleInit ( ) {
53+ const intervalMs = this . apiConfigService . getWebsocketSubscriptionBroadcastIntervalMs ( ) ;
54+
55+ this . registerDynamicInterval (
56+ 'push-transactions' ,
57+ intervalMs ,
58+ 'Push transactions to subscribers' ,
59+ async ( ) => await this . handleTransactionsUpdate ( )
60+ ) ;
61+
62+ this . registerDynamicInterval (
63+ 'push-blocks' ,
64+ intervalMs ,
65+ 'Push blocks to subscribers' ,
66+ async ( ) => await this . handleBlocksUpdate ( )
67+ ) ;
68+
69+ this . registerDynamicInterval (
70+ 'push-stats' ,
71+ intervalMs ,
72+ 'Push stats to subscribers' ,
73+ async ( ) => await this . handleStatsUpdate ( )
74+ ) ;
75+
76+ this . registerDynamicInterval (
77+ 'push-pool' ,
78+ intervalMs ,
79+ 'Push pool transactions to subscribers' ,
80+ async ( ) => await this . handlePoolUpdate ( )
81+ ) ;
82+
83+ this . registerDynamicInterval (
84+ 'push-events' ,
85+ intervalMs ,
86+ 'Push events to subscribers' ,
87+ async ( ) => await this . handleEventsUpdate ( )
88+ ) ;
89+
90+ this . registerDynamicInterval (
91+ 'push-custom-data' ,
92+ intervalMs ,
93+ 'Push custom data to subscribers' ,
94+ async ( ) => await this . handleCustomDataUpdate ( )
95+ ) ;
96+
97+
98+ }
99+
100+ private registerDynamicInterval ( name : string , ms : number , lockMessage : string , callback : ( ) => Promise < void > ) {
101+ const interval = setInterval ( async ( ) => {
102+ await Locker . lock ( lockMessage , async ( ) => {
103+ await callback ( ) ;
104+ } , true ) ;
105+ } , ms ) ;
106+
107+ this . schedulerRegistry . addInterval ( name , interval ) ;
108+ }
109+
50110 async handleTransactionsUpdate ( ) {
51111 await this . transactionsGateway . pushTransactions ( ) ;
52112 }
53113
54- @Cron ( '*/6 * * * * *' )
55- @Lock ( { name : 'Push blocks to subscribers' , verbose : true } )
56114 async handleBlocksUpdate ( ) {
57115 await this . blocksGateway . pushBlocks ( ) ;
58116 }
59117
60- @Cron ( '*/6 * * * * *' )
61- @Lock ( { name : 'Push stats to subscribers' , verbose : true } )
62118 async handleStatsUpdate ( ) {
63119 await this . networkGateway . pushStats ( ) ;
64120 }
65121
66- @Cron ( '*/6 * * * * *' )
67- @Lock ( { name : 'Push pool transactions to subscribers' , verbose : true } )
68- async handlePoolTransactions ( ) {
122+ async handlePoolUpdate ( ) {
69123 await this . poolGateway . pushPool ( ) ;
70124 }
71125
72- @Cron ( '*/6 * * * * *' )
73- @Lock ( { name : 'Push events to subscribers' , verbose : true } )
126+
74127 async handleEventsUpdate ( ) {
75128 await this . eventsGateway . pushEvents ( ) ;
76129 }
77130
78- @Cron ( '*/3 * * * * *' )
79- @Lock ( { name : 'Push custom data to subscribers' , verbose : true } )
80- async handleCustomTransactionsUpdate ( ) {
131+ async handleCustomDataUpdate ( ) {
81132 if ( this . connectionHandler . hasSubscriptionsWithPrefixes ( [ TransactionsCustomGateway . keyPrefix ] ) === false ) {
82133 this . cacheService . deleteLocal ( CacheInfo . WsTimestampMsToProcess ( ) . key ) ;
83134 return ;
0 commit comments