3333// - Uses `tokio` (TCP, MPSC channels, tasks) and `tokio-tungstenite`.
3434// - Manages internal state (`CONNECTIONS`, `NEXT_CONN_ID`).
3535// - Emits Tauri events (`mist_client_connected`, `mist_client_disconnected`,
36+
3637// `mist://message`).
3738// - `send_message_to_client` can be called by handlers (e.g.,
39+
3840// `handlers::mist::handle_ws_send`) or effects to push data to the frontend.
3941// --------------------------------------------------------------------------------------------
4042
@@ -54,31 +56,40 @@ use serde_json::{Value, json};
5456use tauri:: { AppHandle , Manager , Runtime } ;
5557use tokio:: {
5658 net:: { TcpListener , TcpStream } ,
57- sync:: mpsc, // Use tokio's MPSC channels for inter-task communication
59+
60+ // Use tokio's MPSC channels for inter-task communication
61+ sync:: mpsc,
5862} ;
5963use tokio_tungstenite:: {
6064 WebSocketStream ,
6165 accept_async,
6266 tungstenite:: { Error as WsError , Message as WsMessage } ,
6367} ;
6468
65- use crate :: { track, vine} ; // Include track/vine if needed for error types or future interactions
69+ // Include track/vine if needed for error types or future interactions
70+ use crate :: { track, vine} ;
6671
6772// --- Mist Error Type (Specific for this module) ---
6873#[ derive( Debug , thiserror:: Error ) ]
6974pub enum MistError {
7075 #[ error( "WebSocket listener failed: {0}" ) ]
7176 ListenError ( String ) ,
77+
7278 #[ error( "Failed to accept TCP connection: {0}" ) ]
7379 AcceptError ( std:: io:: Error ) ,
80+
7481 #[ error( "WebSocket handshake error: {0}" ) ]
7582 HandshakeError ( WsError ) ,
83+
7684 #[ error( "Failed to send message to client {0}: {1}" ) ]
7785 SendError ( u32 , String ) ,
86+
7887 #[ error( "Failed to receive message from client {0}: {1}" ) ]
7988 ReceiveError ( u32 , WsError ) ,
89+
8090 #[ error( "Client connection {0} not found" ) ]
8191 ConnectionNotFound ( u32 ) ,
92+
8293 #[ error( "Serialization failed: {0}" ) ]
8394 SerializationError ( #[ from] serde_json:: Error ) ,
8495}
@@ -88,10 +99,12 @@ pub enum MistError {
8899// Type alias for the map storing senders to client tasks.
89100// Key: Connection ID (u32), Value: Sender channel to the connection's writer
90101// task.
91- type ConnectionMap = Arc < StdMutex < HashMap < u32 , mpsc:: Sender < WsMessage > > > > ; // Send WsMessage directly
102+ // Send WsMessage directly
103+ type ConnectionMap = Arc < StdMutex < HashMap < u32 , mpsc:: Sender < WsMessage > > > > ;
92104
93105// Global map holding communication channels to active WebSocket clients.
94106static CONNECTIONS : Lazy < ConnectionMap > = Lazy :: new ( Default :: default) ;
107+
95108// Atomic counter for assigning unique connection IDs.
96109static NEXT_CONN_ID : Lazy < AtomicU32 > = Lazy :: new ( || AtomicU32 :: new ( 1 ) ) ;
97110
@@ -102,32 +115,39 @@ static NEXT_CONN_ID:Lazy<AtomicU32> = Lazy::new(|| AtomicU32::new(1));
102115pub async fn start_websocket_server < R : Runtime > ( app_handle : AppHandle < R > ) -> Result < ( ) , MistError > {
103116 // TODO: Make port configurable via environment or AppState
104117 let port = 9001 ;
118+
105119 let addr = format ! ( "127.0.0.1:{}" , port) ;
106120
107121 // Bind the TCP listener.
108122 let listener = TcpListener :: bind ( & addr)
109123 . await
110124 . map_err ( |e| MistError :: ListenError ( format ! ( "Failed to bind to {}: {}" , addr, e) ) ) ?;
125+
111126 println ! ( "[Mist] Native WebSocket server listening on {}" , addr) ;
112127
113128 // Accept connections in a loop.
114129 loop {
115130 match listener. accept ( ) . await {
116131 Ok ( ( stream, peer_addr) ) => {
117132 println ! ( "[Mist] Accepted new TCP connection from: {}" , peer_addr) ;
133+
118134 let app_handle_clone = app_handle. clone ( ) ;
135+
119136 // Spawn a dedicated task to handle the WebSocket handshake and communication.
120137 tokio:: spawn ( handle_connection ( stream, peer_addr, app_handle_clone) ) ;
121138 } ,
139+
122140 Err ( e) => {
123141 // Log error but continue accepting connections if possible.
124142 eprintln ! ( "[Mist] Failed to accept TCP connection: {}" , e) ;
143+
125144 // Consider adding a small delay here if accept fails rapidly.
126145 // If the listener error is fatal, this loop might need to break
127146 // or return the error. For now, assume we can continue.
128147 } ,
129148 }
130149 }
150+
131151 // Note: In this simple form, the server loop never exits unless the
132152 // listener fails fatally. Ok(())
133153}
@@ -142,6 +162,7 @@ async fn handle_connection<R:Runtime>(stream:TcpStream, peer_addr:SocketAddr, ap
142162 Ok ( ws_stream) => {
143163 // Assign a unique ID to this connection.
144164 let conn_id = NEXT_CONN_ID . fetch_add ( 1 , Ordering :: Relaxed ) ;
165+
145166 println ! ( "[Mist] WebSocket handshake successful for {} [ID={}]" , peer_addr, conn_id) ;
146167
147168 // Split the WebSocket stream into a sender and receiver.
@@ -156,7 +177,9 @@ async fn handle_connection<R:Runtime>(stream:TcpStream, peer_addr:SocketAddr, ap
156177 {
157178 // Scope the lock guard.
158179 let mut conns = CONNECTIONS . lock ( ) . unwrap ( ) ;
180+
159181 conns. insert ( conn_id, tx_to_client) ;
182+
160183 println ! ( "[Mist] Registered sender for Conn ID {}" , conn_id) ;
161184 }
162185
@@ -170,6 +193,7 @@ async fn handle_connection<R:Runtime>(stream:TcpStream, peer_addr:SocketAddr, ap
170193
171194 // --- Task: Reading messages FROM the client ---
172195 let app_handle_reader = app_handle. clone ( ) ;
196+
173197 let reader_task = tokio:: spawn ( async move {
174198 loop {
175199 match ws_receiver. next ( ) . await {
@@ -186,6 +210,7 @@ async fn handle_connection<R:Runtime>(stream:TcpStream, peer_addr:SocketAddr, ap
186210 "[Mist Rx][Conn {}] Failed to parse text message as JSON: {}" ,
187211 conn_id, e
188212 ) ;
213+
189214 json ! ( { "parseError" : e. to_string( ) , "original" : msg. to_text( ) . unwrap_or_default( ) } )
190215 } )
191216 } else {
@@ -197,7 +222,8 @@ async fn handle_connection<R:Runtime>(stream:TcpStream, peer_addr:SocketAddr, ap
197222 // This is generally safer than directly calling Track from the network task.
198223 // Frontend or a dedicated Tauri-side listener can handle the event.
199224 if let Err ( e) = app_handle_reader. emit_all (
200- "mist://message" , // Define a clear event name
225+ // Define a clear event name
226+ "mist://message" ,
201227 json ! ( { "connId" : conn_id, "payload" : payload_value} ) ,
202228 ) {
203229 eprintln ! ( "[Mist Rx][Conn {}] Failed to emit Tauri event: {}" , conn_id, e) ;
@@ -208,29 +234,42 @@ async fn handle_connection<R:Runtime>(stream:TcpStream, peer_addr:SocketAddr, ap
208234 // here.
209235 } else if msg. is_close ( ) {
210236 println ! ( "[Mist Rx][Conn {}] Received WebSocket close frame." , conn_id) ;
211- break ; // Exit loop on close frame
237+
238+ // Exit loop on close frame
239+ break ;
212240 } else if msg. is_ping ( ) {
213241 println ! ( "[Mist Rx][Conn {}] Received ping." , conn_id) ;
242+
214243 // tokio-tungstenite handles sending pongs
215244 // automatically.
216245 } else if msg. is_pong ( ) {
217246 println ! ( "[Mist Rx][Conn {}] Received pong." , conn_id) ;
218247 }
219248 } ,
249+
220250 Some ( Err ( WsError :: ConnectionClosed ) ) => {
221251 println ! ( "[Mist Rx][Conn {}] Connection closed normally by peer." , conn_id) ;
222- break ; // Exit loop
252+
253+ // Exit loop
254+ break ;
223255 } ,
256+
224257 Some ( Err ( e) ) => {
225258 eprintln ! ( "[Mist Rx][Conn {}] Error receiving message: {}" , conn_id, e) ;
226- break ; // Exit loop on error
259+
260+ // Exit loop on error
261+ break ;
227262 } ,
263+
228264 None => {
229265 println ! ( "[Mist Rx][Conn {}] WebSocket receiver stream ended." , conn_id) ;
230- break ; // Exit loop if stream ends
266+
267+ // Exit loop if stream ends
268+ break ;
231269 } ,
232270 }
233271 }
272+
234273 println ! ( "[Mist Rx][Conn {}] Reader task finished." , conn_id) ;
235274 } ) ;
236275
@@ -255,42 +294,53 @@ async fn handle_connection<R:Runtime>(stream:TcpStream, peer_addr:SocketAddr, ap
255294
256295 if let Err ( e) = ws_sender. send ( message_to_send) . await {
257296 eprintln ! ( "[Mist Tx][Conn {}] Error sending message: {}" , conn_id, e) ;
297+
258298 // If sending fails, the connection is likely broken. Stop the task.
259299 break ;
260300 }
261301 }
302+
262303 println ! (
263304 "[Mist Tx][Conn {}] Writer task exiting (channel closed or send error)." ,
264305 conn_id
265306 ) ;
307+
266308 // Attempt graceful close on exit?
267309 // let _ = ws_sender.close().await;
268310 } ) ;
269311
270312 // Wait for either the reader or writer task to finish.
271313 // This indicates the connection is closing or has errored.
272314 tokio:: select! {
315+
273316 _ = reader_task => { println!( "[Mist][Conn {}] Reader task completed." , conn_id) ; } ,
317+
274318 _ = writer_task => { println!( "[Mist][Conn {}] Writer task completed." , conn_id) ; } ,
319+
275320 }
276321
277322 // --- Cleanup ---
278323 println ! ( "[Mist] Cleaning up connection state for Conn ID {}" , conn_id) ;
324+
279325 {
280326 // Remove the sender channel from the global map.
281327 let mut conns = CONNECTIONS . lock ( ) . unwrap ( ) ;
328+
282329 if conns. remove ( & conn_id) . is_some ( ) {
283330 println ! ( "[Mist] Unregistered sender for Conn ID {}" , conn_id) ;
284331 }
285332 }
333+
286334 // Emit event signalling client disconnection
287335 app_handle
288336 . emit_all ( "mist_client_disconnected" , json ! ( { "connId" : conn_id } ) )
289337 . ok ( ) ;
290338
291339 // WebSocket stream (`ws_sender`, `ws_receiver`) is dropped here,
340+
292341 // closing the connection.
293342 } ,
343+
294344 Err ( e) => {
295345 // Handshake failed.
296346 eprintln ! ( "[Mist] WebSocket handshake error with {}: {}" , peer_addr, e) ;
@@ -305,12 +355,15 @@ async fn handle_connection<R:Runtime>(stream:TcpStream, peer_addr:SocketAddr, ap
305355pub async fn send_message_to_client ( conn_id : u32 , message : String ) -> Result < ( ) , MistError > {
306356 let sender = {
307357 let conns_guard = CONNECTIONS . lock ( ) . unwrap ( ) ;
308- conns_guard. get ( & conn_id) . cloned ( ) // Clone the mpsc::Sender
358+
359+ // Clone the mpsc::Sender
360+ conns_guard. get ( & conn_id) . cloned ( )
309361 } ;
310362
311363 if let Some ( tx) = sender {
312364 // Wrap the string message into a tungstenite Text message.
313365 let ws_msg = WsMessage :: Text ( message) ;
366+
314367 // Send it via the channel to the client's writer task.
315368 tx. send ( ws_msg)
316369 . await
@@ -324,12 +377,17 @@ pub async fn send_message_to_client(conn_id:u32, message:String) -> Result<(), M
324377// This demonstrates how Track might use `send_message_to_client`.
325378pub async fn handle_ws_send < R : Runtime > (
326379 _app : AppHandle < R > ,
380+
327381 _window : Window < R > ,
328- args : Vec < Value > , // Assuming Track provides args as Vec<Value>
382+
383+ // Assuming Track provides args as Vec<Value>
384+ args : Vec < Value > ,
329385) -> Result < Value , String > {
330386 println ! ( "[Mist Handler] Handling ws_send request: {:?}" , args) ;
387+
331388 // TODO: Robust arg parsing
332389 let conn_id_val = args. get ( 0 ) . ok_or ( "Missing connection ID argument" . to_string ( ) ) ?;
390+
333391 let payload = args. get ( 1 ) . cloned ( ) . ok_or ( "Missing payload argument" . to_string ( ) ) ?;
334392
335393 let conn_id = conn_id_val. as_u64 ( ) . ok_or ( "Connection ID must be a number" . to_string ( ) ) ? as u32 ;
@@ -338,8 +396,11 @@ pub async fn handle_ws_send<R:Runtime>(
338396 let message_string = serde_json:: to_string ( & payload) . map_err ( |e| e. to_string ( ) ) ?;
339397
340398 println ! ( "[Mist Handler] Sending payload to Conn ID {}: {}" , conn_id, message_string) ;
399+
341400 send_message_to_client ( conn_id, message_string)
342401 . await
343- . map ( |_| Value :: Null ) // Return null on success
344- . map_err ( |e| e. to_string ( ) ) // Map MistError to String
402+ // Return null on success
403+ . map ( |_| Value :: Null )
404+ // Map MistError to String
405+ . map_err ( |e| e. to_string ( ) )
345406}
0 commit comments