1+
12use std:: collections:: { HashMap , HashSet } ;
23use crossbeam:: channel:: { Receiver , Sender } ;
34use wg_internal:: network:: NodeId ;
45use wg_internal:: packet:: { NodeType , Packet } ;
56use common:: { FragmentAssembler , RoutingHandler } ;
67use common:: packet_processor:: Processor ;
7- use common:: types:: { ChatCommand , ChatEvent , ChatRequest , ChatResponse , Command , Event , NodeCommand , ServerType } ;
8+ use common:: types:: { ChatCommand , ChatEvent , ChatRequest , ChatResponse , Command , Event , NodeCommand , NodeEvent , ServerType } ;
89
910pub struct ChatServer {
1011 routing_handler : RoutingHandler ,
1112 controller_recv : Receiver < Box < dyn Command > > ,
1213 controller_send : Sender < Box < dyn Event > > ,
1314 packet_recv : Receiver < Packet > ,
14- _id : NodeId ,
15+ id : NodeId ,
1516 assembler : FragmentAssembler ,
1617 registered_clients : HashSet < NodeId > ,
1718}
1819
1920impl ChatServer {
21+ #[ must_use]
2022 pub fn new ( id : NodeId , neighbors : HashMap < NodeId , Sender < Packet > > , packet_recv : Receiver < Packet > , controller_recv : Receiver < Box < dyn Command > > , controller_send : Sender < Box < dyn Event > > ) -> Self {
2123 let router = RoutingHandler :: new ( id, NodeType :: Server , neighbors, controller_send. clone ( ) ) ;
2224 Self {
2325 routing_handler : router,
2426 controller_recv,
2527 controller_send,
2628 packet_recv,
27- _id : id,
29+ id,
2830 assembler : FragmentAssembler :: default ( ) ,
2931 registered_clients : HashSet :: new ( ) ,
3032 }
3133 }
34+ #[ must_use]
3235 pub fn get_registered_clients ( & self ) -> Vec < NodeId > {
33- self . registered_clients . iter ( ) . cloned ( ) . collect ( )
36+ self . registered_clients . iter ( ) . copied ( ) . collect ( )
3437 }
3538}
3639
@@ -52,34 +55,72 @@ impl Processor for ChatServer {
5255 }
5356
5457 fn handle_msg ( & mut self , msg : Vec < u8 > , from : NodeId , session_id : u64 ) {
58+ let _ = self . controller_send . send ( Box :: new ( NodeEvent :: MessageReceived {
59+ notification_from : self . id ,
60+ from
61+ } ) ) ;
5562 if let Ok ( msg) = serde_json:: from_slice :: < ChatRequest > ( & msg) {
5663 match msg {
5764 ChatRequest :: ServerTypeQuery => {
65+ let _ = self . controller_send . send ( Box :: new ( NodeEvent :: ServerTypeQueried {
66+ notification_from : self . id ,
67+ from
68+ } ) ) ;
5869 if let Ok ( res) = serde_json:: to_vec ( & ChatResponse :: ServerType { server_type : ServerType :: ChatServer } ) {
5970 let _ = self . routing_handler . send_message ( & res, from, Some ( session_id) ) ;
71+ let _ = self . controller_send . send ( Box :: new ( NodeEvent :: MessageSent {
72+ notification_from : self . id ,
73+ to : from
74+ } ) ) ;
6075 }
6176 }
6277 ChatRequest :: RegistrationToChat { client_id } => {
6378 self . registered_clients . insert ( client_id) ;
6479 if let Ok ( res) = serde_json:: to_vec ( & ChatResponse :: RegistrationSuccess ) {
6580 let _ = self . routing_handler . send_message ( & res, from, Some ( session_id) ) ;
81+ let _ = self . controller_send . send ( Box :: new ( NodeEvent :: MessageSent {
82+ notification_from : self . id ,
83+ to : from
84+ } ) ) ;
85+ let _ = self . controller_send . send ( Box :: new ( ChatEvent :: ClientRegistered {
86+ client : client_id,
87+ server : self . id
88+ } ) ) ;
6689 }
6790 }
6891 ChatRequest :: ClientListQuery => {
69- let client_list = self . registered_clients . iter ( ) . cloned ( ) . collect :: < Vec < _ > > ( ) ;
92+ let _ = self . controller_send . send ( Box :: new ( ChatEvent :: ClientListQueried {
93+ notification_from : self . id ,
94+ from
95+ } ) ) ;
96+ let client_list = self . registered_clients . iter ( ) . copied ( ) . collect :: < Vec < _ > > ( ) ;
7097 if let Ok ( res) = serde_json:: to_vec ( & ChatResponse :: ClientList { list_of_client_ids : client_list} ) {
7198 let _ = self . routing_handler . send_message ( & res, from, Some ( session_id) ) ;
99+ let _ = self . controller_send . send ( Box :: new ( NodeEvent :: MessageSent {
100+ notification_from : self . id ,
101+ to : from
102+ } ) ) ;
72103 }
73104 }
74105 ChatRequest :: MessageFor { client_id, message } => {
75106 if !self . registered_clients . contains ( & client_id) {
76- if let Ok ( res) = serde_json:: to_vec ( & ChatResponse :: ErrorWrongClientId ) {
107+ if let Ok ( res) = serde_json:: to_vec ( & ChatResponse :: ErrorWrongClientId {
108+ wrong_id : client_id
109+ } ) {
77110 let _ = self . routing_handler . send_message ( & res, from, Some ( session_id) ) ;
111+ let _ = self . controller_send . send ( Box :: new ( NodeEvent :: MessageSent {
112+ notification_from : self . id ,
113+ to : from
114+ } ) ) ;
78115 }
79116 return
80117 }
81118 if let Ok ( res) = serde_json:: to_vec ( & ChatResponse :: MessageFrom { client_id : from, message } ) {
82119 let _ = self . routing_handler . send_message ( & res, client_id, Some ( session_id) ) ;
120+ let _ = self . controller_send . send ( Box :: new ( NodeEvent :: MessageSent {
121+ notification_from : self . id ,
122+ to : client_id
123+ } ) ) ;
83124 }
84125 }
85126 }
@@ -94,22 +135,22 @@ impl Processor for ChatServer {
94135 NodeCommand :: RemoveSender ( node_id) => self . routing_handler . remove_neighbor ( * node_id) ,
95136 NodeCommand :: Shutdown => return true
96137 }
97- } else if let Some ( cmd) = cmd. downcast_ref :: < ChatCommand > ( ) {
98- match cmd {
99- ChatCommand :: GetRegisteredClients => {
100- let registered_clients = self . get_registered_clients ( ) ;
101- if self . controller_send . send ( Box :: new ( ChatEvent :: RegisteredClients ( registered_clients) ) ) . is_err ( ) {
102- return true ;
103- }
104- }
105- _ => { }
138+ } else if let Some ( ChatCommand :: GetRegisteredClients ) = cmd. downcast_ref :: < ChatCommand > ( ) {
139+ let registered_clients = self . get_registered_clients ( ) ;
140+ if self . controller_send . send ( Box :: new ( ChatEvent :: RegisteredClients {
141+ notification_from : self . id ,
142+ list : registered_clients
143+ } ) ) . is_err ( ) {
144+ return true ;
106145 }
146+
107147 }
108148 false
109149 }
110150}
111151
112152mod communication_server_tests {
153+ #[ allow( clippy:: wildcard_imports) ]
113154 use super :: * ;
114155 use crossbeam:: channel:: unbounded;
115156
0 commit comments