@@ -61,54 +61,83 @@ export default class SocketBroker implements IWebSocketBroker {
6161 this . clients [ client . id ] = client ;
6262 }
6363 client . onMessage ( async ( message ) => {
64- if ( message . toString ( ) === 'ping' ) {
64+ const messageText = message . toString ( ) ;
65+
66+ if ( ! messageText . trim ( ) ) {
67+ return ;
68+ }
69+
70+ if ( messageText === 'ping' ) {
6571 client . send ( 'pong' ) ;
6672 client . lastPing = Date . now ( ) ;
67- } else {
68- const data = JSON . parse ( message ) ;
69- if ( data . type === 'subscribe' ) {
70- if ( ! data . topic . startsWith ( '/opentopic/' ) ) {
71- if ( this . adminforth . config . auth . websocketTopicAuth ) {
72- let authResult = false ;
73- try {
74- authResult = await this . adminforth . config . auth . websocketTopicAuth ( data . topic , client . adminUser ) ;
75- } catch ( e ) {
76- afLogger . error ( `Error in websocketTopicAuth, assuming connection not allowed ${ e } ` ) ;
77- }
78- if ( ! authResult ) {
79- client . send ( JSON . stringify ( { type : 'error' , message : 'Unauthorized' } ) ) ;
80- return ;
81- }
73+ return ;
74+ }
75+
76+ let data : unknown ;
77+
78+ try {
79+ data = JSON . parse ( messageText ) ;
80+ } catch ( e ) {
81+ client . send ( JSON . stringify ( { type : 'error' , message : 'Invalid websocket message JSON' } ) ) ;
82+ return ;
83+ }
84+
85+ if ( ! data || typeof data !== 'object' || Array . isArray ( data ) ) {
86+ client . send ( JSON . stringify ( { type : 'error' , message : 'Invalid websocket message format' } ) ) ;
87+ return ;
88+ }
89+
90+ const payload = data as { type ?: unknown ; topic ?: unknown } ;
91+
92+ if ( payload . type !== 'subscribe' && payload . type !== 'unsubscribe' ) {
93+ client . send ( JSON . stringify ( { type : 'error' , message : 'Unknown websocket message type' } ) ) ;
94+ return ;
95+ }
96+
97+ if ( typeof payload . topic !== 'string' || ! payload . topic ) {
98+ client . send ( JSON . stringify ( { type : 'error' , message : 'No topic provided' } ) ) ;
99+ return ;
100+ }
101+
102+ const topic = payload . topic ;
103+
104+ if ( payload . type === 'subscribe' ) {
105+ if ( ! topic . startsWith ( '/opentopic/' ) ) {
106+ if ( this . adminforth . config . auth . websocketTopicAuth ) {
107+ let authResult = false ;
108+ try {
109+ authResult = await this . adminforth . config . auth . websocketTopicAuth ( topic , client . adminUser ) ;
110+ } catch ( e ) {
111+ afLogger . error ( `Error in websocketTopicAuth, assuming connection not allowed ${ e } ` ) ;
112+ }
113+ if ( ! authResult ) {
114+ client . send ( JSON . stringify ( { type : 'error' , message : 'Unauthorized' } ) ) ;
115+ return ;
82116 }
83117 }
84- if ( ! data . topic ) {
85- client . send ( JSON . stringify ( { type : 'error' , message : 'No topic provided' } ) ) ;
86- }
87- if ( ! this . topics [ data . topic ] ) {
88- this . topics [ data . topic ] = [ ] ;
89- }
90- this . topics [ data . topic ] . push ( client ) ;
91- client . topics . add ( data . topic ) ;
92- if ( this . adminforth . config . auth . websocketSubscribed ) {
93- ( async ( ) => {
94- try {
95- await this . adminforth . config . auth . websocketSubscribed ( data . topic , client . adminUser ) ;
96- } catch ( e ) {
97- afLogger . error ( `Error in websocketSubscribed for topic ${ data . topic } , ${ e } ` ) ;
98- }
99- } ) ( ) ; // run in background
100- }
101- } else if ( data . type === 'unsubscribe' ) {
102- if ( ! data . topic ) {
103- client . send ( JSON . stringify ( { type : 'error' , message : 'No topic provided' } ) ) ;
104- }
105-
106- this . deleteClientFromTopic ( client , data . topic ) ;
107- this . cleanupTopicIfEmpty ( data . topic ) ;
108-
109- client . topics . delete ( data . topic ) ;
110118 }
119+ if ( ! this . topics [ topic ] ) {
120+ this . topics [ topic ] = [ ] ;
121+ }
122+ if ( ! this . topics [ topic ] . includes ( client ) ) {
123+ this . topics [ topic ] . push ( client ) ;
124+ }
125+ client . topics . add ( topic ) ;
126+ if ( this . adminforth . config . auth . websocketSubscribed ) {
127+ ( async ( ) => {
128+ try {
129+ await this . adminforth . config . auth . websocketSubscribed ( topic , client . adminUser ) ;
130+ } catch ( e ) {
131+ afLogger . error ( `Error in websocketSubscribed for topic ${ topic } , ${ e } ` ) ;
132+ }
133+ } ) ( ) ; // run in background
134+ }
135+ return ;
111136 }
137+
138+ this . deleteClientFromTopic ( client , topic ) ;
139+ this . cleanupTopicIfEmpty ( topic ) ;
140+ client . topics . delete ( topic ) ;
112141 } ) ;
113142
114143 client . onClose ( ( ) => {
@@ -141,4 +170,4 @@ export default class SocketBroker implements IWebSocketBroker {
141170 }
142171 }
143172
144- }
173+ }
0 commit comments