Skip to content

Commit 1773702

Browse files
feat(Mountain): add native WebSocket server implementation with client management
- Introduces `mist.rs` as a feature-gated (`mist_native`) native WebSocket server within Mountain's Rust backend - Implements TCP listener with tokio+tungstenite for handling WebSocket connections, eliminating need for separate Node.js sidecar - Manages client connections via atomic IDs and global MPSC channel map (`CONNECTIONS`) for efficient message routing - Spawns async reader/writer tasks per connection to handle bidirectional communication: - Reader parses incoming messages (text/binary), emits `mist://message` Tauri events for frontend/extension handling - Writer sends messages from Mountain components via channel-based queue - Provides `send_message_to_client` API for Track handlers/effects to push data to specific clients - Emits connection lifecycle events (`mist_client_connected/disconnected`) for state tracking - Integrates with Land's architecture by: - Using Tauri's event system for Sky frontend communication - Preparing foundation for direct extension host communication via WebSocket - Following Mountain's async runtime patterns (tokio tasks, atomic state) - Reduces external dependencies by implementing WebSocket layer natively in Rust, aligning with Land's performance/efficiency goals
1 parent c90dfc0 commit 1773702

1 file changed

Lines changed: 345 additions & 0 deletions

File tree

Source/mist.rs

Lines changed: 345 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,345 @@
1+
// ---------------------------------------------------------------------------------------------
2+
// Mountain Mist - Native WebSocket Server (mist.rs) [Feature Gated:
3+
// mist_native]
4+
// --------------------------------------------------------------------------------------------
5+
// Implements an optional native WebSocket server within Mountain, allowing
6+
// direct connections from clients (like the Sky frontend) if the `mist_native`
7+
// feature flag is enabled during build. This serves as an alternative to
8+
// requiring a separate Node.js-based Mist sidecar.
9+
//
10+
// Responsibilities:
11+
// - Starting a `tokio::net::TcpListener` on a configured port.
12+
// - Accepting incoming TCP connections.
13+
// - Performing WebSocket handshakes using `tokio-tungstenite`.
14+
// - Managing active client connections:
15+
// - Assigning unique connection IDs.
16+
// - Storing sender channels (`mpsc::Sender<WsMessage>`) for each connection
17+
// in a global map (`CONNECTIONS`).
18+
// - Spawning tasks (`handle_connection`, reader/writer tasks) for each client
19+
// to:
20+
// - Read incoming WebSocket messages (`ws_receiver.next().await`).
21+
// - Process received messages (e.g., parse JSON, emit Tauri events like
22+
// `mist://message`).
23+
// - Send outgoing messages received via the MPSC channel
24+
// (`rx_from_mountain.recv().await`).
25+
// - Providing a public function (`send_message_to_client`) for other Mountain
26+
// components to send string messages to specific clients via their connection
27+
// ID.
28+
// - Handling connection cleanup (removing from `CONNECTIONS`, emitting
29+
// disconnect event) when a client disconnects or an error occurs.
30+
//
31+
// Key Interactions:
32+
// - Started conditionally in `main.rs` via `tokio::spawn`.
33+
// - Uses `tokio` (TCP, MPSC channels, tasks) and `tokio-tungstenite`.
34+
// - Manages internal state (`CONNECTIONS`, `NEXT_CONN_ID`).
35+
// - Emits Tauri events (`mist_client_connected`, `mist_client_disconnected`,
36+
// `mist://message`).
37+
// - `send_message_to_client` can be called by handlers (e.g.,
38+
// `handlers::mist::handle_ws_send`) or effects to push data to the frontend.
39+
// --------------------------------------------------------------------------------------------
40+
41+
use std::{
42+
collections::HashMap,
43+
net::SocketAddr,
44+
sync::{
45+
Arc,
46+
Mutex as StdMutex,
47+
atomic::{AtomicU32, Ordering},
48+
},
49+
};
50+
51+
use futures_util::{SinkExt, StreamExt};
52+
use once_cell::sync::Lazy;
53+
use serde_json::{Value, json};
54+
use tauri::{AppHandle, Manager, Runtime};
55+
use tokio::{
56+
net::{TcpListener, TcpStream},
57+
sync::mpsc, // Use tokio's MPSC channels for inter-task communication
58+
};
59+
use tokio_tungstenite::{
60+
WebSocketStream,
61+
accept_async,
62+
tungstenite::{Error as WsError, Message as WsMessage},
63+
};
64+
65+
use crate::{track, vine}; // Include track/vine if needed for error types or future interactions
66+
67+
// --- Mist Error Type (Specific for this module) ---
68+
#[derive(Debug, thiserror::Error)]
69+
pub enum MistError {
70+
#[error("WebSocket listener failed: {0}")]
71+
ListenError(String),
72+
#[error("Failed to accept TCP connection: {0}")]
73+
AcceptError(std::io::Error),
74+
#[error("WebSocket handshake error: {0}")]
75+
HandshakeError(WsError),
76+
#[error("Failed to send message to client {0}: {1}")]
77+
SendError(u32, String),
78+
#[error("Failed to receive message from client {0}: {1}")]
79+
ReceiveError(u32, WsError),
80+
#[error("Client connection {0} not found")]
81+
ConnectionNotFound(u32),
82+
#[error("Serialization failed: {0}")]
83+
SerializationError(#[from] serde_json::Error),
84+
}
85+
86+
// --- Server State ---
87+
88+
// Type alias for the map storing senders to client tasks.
89+
// Key: Connection ID (u32), Value: Sender channel to the connection's writer
90+
// task.
91+
type ConnectionMap = Arc<StdMutex<HashMap<u32, mpsc::Sender<WsMessage>>>>; // Send WsMessage directly
92+
93+
// Global map holding communication channels to active WebSocket clients.
94+
static CONNECTIONS:Lazy<ConnectionMap> = Lazy::new(Default::default);
95+
// Atomic counter for assigning unique connection IDs.
96+
static NEXT_CONN_ID:Lazy<AtomicU32> = Lazy::new(|| AtomicU32::new(1));
97+
98+
// --- Server Initialization ---
99+
100+
/// Starts the native WebSocket server, listening for incoming connections.
101+
/// This function should be called once during Mountain's setup.
102+
pub async fn start_websocket_server<R:Runtime>(app_handle:AppHandle<R>) -> Result<(), MistError> {
103+
// TODO: Make port configurable via environment or AppState
104+
let port = 9001;
105+
let addr = format!("127.0.0.1:{}", port);
106+
107+
// Bind the TCP listener.
108+
let listener = TcpListener::bind(&addr)
109+
.await
110+
.map_err(|e| MistError::ListenError(format!("Failed to bind to {}: {}", addr, e)))?;
111+
println!("[Mist] Native WebSocket server listening on {}", addr);
112+
113+
// Accept connections in a loop.
114+
loop {
115+
match listener.accept().await {
116+
Ok((stream, peer_addr)) => {
117+
println!("[Mist] Accepted new TCP connection from: {}", peer_addr);
118+
let app_handle_clone = app_handle.clone();
119+
// Spawn a dedicated task to handle the WebSocket handshake and communication.
120+
tokio::spawn(handle_connection(stream, peer_addr, app_handle_clone));
121+
},
122+
Err(e) => {
123+
// Log error but continue accepting connections if possible.
124+
eprintln!("[Mist] Failed to accept TCP connection: {}", e);
125+
// Consider adding a small delay here if accept fails rapidly.
126+
// If the listener error is fatal, this loop might need to break
127+
// or return the error. For now, assume we can continue.
128+
},
129+
}
130+
}
131+
// Note: In this simple form, the server loop never exits unless the
132+
// listener fails fatally. Ok(())
133+
}
134+
135+
// --- Connection Handling ---
136+
137+
/// Handles an individual WebSocket connection after the TCP connection is
138+
/// accepted.
139+
async fn handle_connection<R:Runtime>(stream:TcpStream, peer_addr:SocketAddr, app_handle:AppHandle<R>) {
140+
// Perform the WebSocket handshake.
141+
match accept_async(stream).await {
142+
Ok(ws_stream) => {
143+
// Assign a unique ID to this connection.
144+
let conn_id = NEXT_CONN_ID.fetch_add(1, Ordering::Relaxed);
145+
println!("[Mist] WebSocket handshake successful for {} [ID={}]", peer_addr, conn_id);
146+
147+
// Split the WebSocket stream into a sender and receiver.
148+
let (mut ws_sender, mut ws_receiver) = ws_stream.split();
149+
150+
// Create an MPSC channel for sending messages *to* this client *from* other
151+
// parts of Mountain. We send pre-formatted WsMessage objects for
152+
// flexibility.
153+
let (tx_to_client, mut rx_from_mountain) = mpsc::channel::<WsMessage>(100);
154+
155+
// Register the sender channel in the global map.
156+
{
157+
// Scope the lock guard.
158+
let mut conns = CONNECTIONS.lock().unwrap();
159+
conns.insert(conn_id, tx_to_client);
160+
println!("[Mist] Registered sender for Conn ID {}", conn_id);
161+
}
162+
163+
// Emit a Tauri event signalling a new client connection
164+
app_handle
165+
.emit_all(
166+
"mist_client_connected",
167+
json!({ "connId": conn_id, "peerAddr": peer_addr.to_string() }),
168+
)
169+
.ok();
170+
171+
// --- Task: Reading messages FROM the client ---
172+
let app_handle_reader = app_handle.clone();
173+
let reader_task = tokio::spawn(async move {
174+
loop {
175+
match ws_receiver.next().await {
176+
Some(Ok(msg)) => {
177+
// Process different message types
178+
if msg.is_text() || msg.is_binary() {
179+
println!("[Mist Rx][Conn {}] Received message ({} bytes)", conn_id, msg.len());
180+
181+
// --- Message Processing Logic ---
182+
// TODO: Parse the message payload (e.g., assume JSON text)
183+
let payload_value:Value = if msg.is_text() {
184+
serde_json::from_str(msg.to_text().unwrap_or_default()).unwrap_or_else(|e| {
185+
eprintln!(
186+
"[Mist Rx][Conn {}] Failed to parse text message as JSON: {}",
187+
conn_id, e
188+
);
189+
json!({ "parseError": e.to_string(), "original": msg.to_text().unwrap_or_default() })
190+
})
191+
} else {
192+
// Handle binary data if needed, maybe base64 encode for event?
193+
json!({ "binaryDataLength": msg.len() })
194+
};
195+
196+
// --- Option A: Emit Tauri event ---
197+
// This is generally safer than directly calling Track from the network task.
198+
// Frontend or a dedicated Tauri-side listener can handle the event.
199+
if let Err(e) = app_handle_reader.emit_all(
200+
"mist://message", // Define a clear event name
201+
json!({"connId": conn_id, "payload": payload_value}),
202+
) {
203+
eprintln!("[Mist Rx][Conn {}] Failed to emit Tauri event: {}", conn_id, e);
204+
}
205+
206+
// --- Option B: Send to a central queue/actor
207+
// (More complex) --- Not implemented
208+
// here.
209+
} else if msg.is_close() {
210+
println!("[Mist Rx][Conn {}] Received WebSocket close frame.", conn_id);
211+
break; // Exit loop on close frame
212+
} else if msg.is_ping() {
213+
println!("[Mist Rx][Conn {}] Received ping.", conn_id);
214+
// tokio-tungstenite handles sending pongs
215+
// automatically.
216+
} else if msg.is_pong() {
217+
println!("[Mist Rx][Conn {}] Received pong.", conn_id);
218+
}
219+
},
220+
Some(Err(WsError::ConnectionClosed)) => {
221+
println!("[Mist Rx][Conn {}] Connection closed normally by peer.", conn_id);
222+
break; // Exit loop
223+
},
224+
Some(Err(e)) => {
225+
eprintln!("[Mist Rx][Conn {}] Error receiving message: {}", conn_id, e);
226+
break; // Exit loop on error
227+
},
228+
None => {
229+
println!("[Mist Rx][Conn {}] WebSocket receiver stream ended.", conn_id);
230+
break; // Exit loop if stream ends
231+
},
232+
}
233+
}
234+
println!("[Mist Rx][Conn {}] Reader task finished.", conn_id);
235+
});
236+
237+
// --- Task: Sending messages TO the client ---
238+
let writer_task = tokio::spawn(async move {
239+
while let Some(message_to_send) = rx_from_mountain.recv().await {
240+
if message_to_send.is_text() {
241+
// Log text messages carefully
242+
println!(
243+
"[Mist Tx][Conn {}] Sending message: {}",
244+
conn_id,
245+
message_to_send.to_text().unwrap_or("").chars().take(100).collect::<String>() /* Log truncated text */
246+
);
247+
} else {
248+
println!(
249+
"[Mist Tx][Conn {}] Sending non-text message (type: {:?}, len: {})",
250+
conn_id,
251+
message_to_send.type_id(),
252+
message_to_send.len()
253+
);
254+
}
255+
256+
if let Err(e) = ws_sender.send(message_to_send).await {
257+
eprintln!("[Mist Tx][Conn {}] Error sending message: {}", conn_id, e);
258+
// If sending fails, the connection is likely broken. Stop the task.
259+
break;
260+
}
261+
}
262+
println!(
263+
"[Mist Tx][Conn {}] Writer task exiting (channel closed or send error).",
264+
conn_id
265+
);
266+
// Attempt graceful close on exit?
267+
// let _ = ws_sender.close().await;
268+
});
269+
270+
// Wait for either the reader or writer task to finish.
271+
// This indicates the connection is closing or has errored.
272+
tokio::select! {
273+
_ = reader_task => { println!("[Mist][Conn {}] Reader task completed.", conn_id); },
274+
_ = writer_task => { println!("[Mist][Conn {}] Writer task completed.", conn_id); },
275+
}
276+
277+
// --- Cleanup ---
278+
println!("[Mist] Cleaning up connection state for Conn ID {}", conn_id);
279+
{
280+
// Remove the sender channel from the global map.
281+
let mut conns = CONNECTIONS.lock().unwrap();
282+
if conns.remove(&conn_id).is_some() {
283+
println!("[Mist] Unregistered sender for Conn ID {}", conn_id);
284+
}
285+
}
286+
// Emit event signalling client disconnection
287+
app_handle
288+
.emit_all("mist_client_disconnected", json!({ "connId": conn_id }))
289+
.ok();
290+
291+
// WebSocket stream (`ws_sender`, `ws_receiver`) is dropped here,
292+
// closing the connection.
293+
},
294+
Err(e) => {
295+
// Handshake failed.
296+
eprintln!("[Mist] WebSocket handshake error with {}: {}", peer_addr, e);
297+
},
298+
}
299+
}
300+
301+
// --- Public API for Sending Messages ---
302+
303+
/// Sends a message (as a String) to a specific connected WebSocket client.
304+
/// Called by other parts of Mountain (e.g., Track handlers).
305+
pub async fn send_message_to_client(conn_id:u32, message:String) -> Result<(), MistError> {
306+
let sender = {
307+
let conns_guard = CONNECTIONS.lock().unwrap();
308+
conns_guard.get(&conn_id).cloned() // Clone the mpsc::Sender
309+
};
310+
311+
if let Some(tx) = sender {
312+
// Wrap the string message into a tungstenite Text message.
313+
let ws_msg = WsMessage::Text(message);
314+
// Send it via the channel to the client's writer task.
315+
tx.send(ws_msg)
316+
.await
317+
.map_err(|e| MistError::SendError(conn_id, format!("MPSC channel send error: {}", e)))
318+
} else {
319+
Err(MistError::ConnectionNotFound(conn_id))
320+
}
321+
}
322+
323+
// --- Example handler called by Track for ws_send command (Illustrative) ---
324+
// This demonstrates how Track might use `send_message_to_client`.
325+
pub async fn handle_ws_send<R:Runtime>(
326+
_app:AppHandle<R>,
327+
_window:Window<R>,
328+
args:Vec<Value>, // Assuming Track provides args as Vec<Value>
329+
) -> Result<Value, String> {
330+
println!("[Mist Handler] Handling ws_send request: {:?}", args);
331+
// TODO: Robust arg parsing
332+
let conn_id_val = args.get(0).ok_or("Missing connection ID argument".to_string())?;
333+
let payload = args.get(1).cloned().ok_or("Missing payload argument".to_string())?;
334+
335+
let conn_id = conn_id_val.as_u64().ok_or("Connection ID must be a number".to_string())? as u32;
336+
337+
// Serialize the payload back to a JSON string to send
338+
let message_string = serde_json::to_string(&payload).map_err(|e| e.to_string())?;
339+
340+
println!("[Mist Handler] Sending payload to Conn ID {}: {}", conn_id, message_string);
341+
send_message_to_client(conn_id, message_string)
342+
.await
343+
.map(|_| Value::Null) // Return null on success
344+
.map_err(|e| e.to_string()) // Map MistError to String
345+
}

0 commit comments

Comments
 (0)