Skip to content

Commit fe899f8

Browse files
committed
[00226] Store Arc<RwLock<AppSession>> in AppSessionStore
- Change sessions map from HashMap<String, ()> to HashMap<String, Arc<RwLock<AppSession>>> - create_session now returns Arc<RwLock<AppSession>> shared between store and handler - Add get_session and connection_ids query methods for admin/monitoring - Add broadcast shutdown channel for graceful shutdown support - Update handle_socket to use Arc<RwLock<AppSession>> with tokio::select! for shutdown - Update existing tests and add 4 new tests (get_session, connection_ids, broadcast_shutdown, arc_lifecycle)
1 parent 76bc62c commit fe899f8

2 files changed

Lines changed: 165 additions & 52 deletions

File tree

rusty/src/server/session.rs

Lines changed: 113 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use std::collections::HashMap;
22
use std::sync::Arc;
3-
use tokio::sync::RwLock;
3+
use tokio::sync::{broadcast, RwLock};
44

55
use crate::core::reconciler::Reconciler;
66
use crate::core::runtime::Runtime;
@@ -15,34 +15,39 @@ pub struct AppSession {
1515
}
1616

1717
/// Manages per-connection AppSessions, keyed by connection ID.
18-
/// Tracks active connections and creates isolated sessions on demand.
18+
/// Stores `Arc<RwLock<AppSession>>` references so both the store and handlers
19+
/// share ownership, enabling admin/monitoring, graceful shutdown, and timeout enforcement.
1920
pub struct AppSessionStore {
20-
sessions: RwLock<HashMap<String, ()>>,
21+
sessions: RwLock<HashMap<String, Arc<RwLock<AppSession>>>>,
2122
root_factory: Arc<dyn Fn() -> Box<dyn View> + Send + Sync>,
23+
shutdown_tx: broadcast::Sender<()>,
2224
}
2325

2426
impl AppSessionStore {
2527
pub fn new(root_factory: Arc<dyn Fn() -> Box<dyn View> + Send + Sync>) -> Self {
28+
let (shutdown_tx, _) = broadcast::channel(16);
2629
AppSessionStore {
2730
sessions: RwLock::new(HashMap::new()),
2831
root_factory,
32+
shutdown_tx,
2933
}
3034
}
3135

3236
/// Create a new session with an isolated Runtime and Reconciler.
33-
/// Registers the connection and returns the session for the handler to own.
34-
pub async fn create_session(&self, connection_id: String) -> AppSession {
37+
/// Registers the connection and returns an Arc reference to the session.
38+
pub async fn create_session(&self, connection_id: String) -> Arc<RwLock<AppSession>> {
3539
let view = (self.root_factory)();
3640
let runtime = Runtime::new(FuncView(view));
3741
let reconciler = Reconciler::new();
42+
let session = Arc::new(RwLock::new(AppSession {
43+
runtime,
44+
reconciler,
45+
}));
3846

3947
let mut sessions = self.sessions.write().await;
40-
sessions.insert(connection_id, ());
48+
sessions.insert(connection_id, session.clone());
4149

42-
AppSession {
43-
runtime,
44-
reconciler,
45-
}
50+
session
4651
}
4752

4853
/// Remove a session on disconnect.
@@ -56,6 +61,28 @@ impl AppSessionStore {
5661
let sessions = self.sessions.read().await;
5762
sessions.len()
5863
}
64+
65+
/// Get a session by connection ID (for admin/monitoring).
66+
pub async fn get_session(&self, connection_id: &str) -> Option<Arc<RwLock<AppSession>>> {
67+
let sessions = self.sessions.read().await;
68+
sessions.get(connection_id).cloned()
69+
}
70+
71+
/// Get all active connection IDs (for monitoring/debug).
72+
pub async fn connection_ids(&self) -> Vec<String> {
73+
let sessions = self.sessions.read().await;
74+
sessions.keys().cloned().collect()
75+
}
76+
77+
/// Subscribe to the shutdown broadcast channel.
78+
pub fn subscribe_shutdown(&self) -> broadcast::Receiver<()> {
79+
self.shutdown_tx.subscribe()
80+
}
81+
82+
/// Broadcast a shutdown signal to all subscribers.
83+
pub fn broadcast_shutdown(&self) {
84+
let _ = self.shutdown_tx.send(());
85+
}
5986
}
6087

6188
#[cfg(test)]
@@ -110,12 +137,12 @@ mod tests {
110137
}));
111138

112139
// Create two sessions — each gets its own Runtime with a different view
113-
let mut session_a = store.create_session("conn-a".to_string()).await;
114-
let mut session_b = store.create_session("conn-b".to_string()).await;
140+
let session_a = store.create_session("conn-a".to_string()).await;
141+
let session_b = store.create_session("conn-b".to_string()).await;
115142

116143
// Build each session's tree independently
117-
let tree_a = session_a.runtime.build().await;
118-
let tree_b = session_b.runtime.build().await;
144+
let tree_a = session_a.write().await.runtime.build().await;
145+
let tree_b = session_b.write().await.runtime.build().await;
119146

120147
let json_a = serde_json::to_value(&tree_a).unwrap().to_string();
121148
let json_b = serde_json::to_value(&tree_b).unwrap().to_string();
@@ -144,9 +171,9 @@ mod tests {
144171
let store = store.clone();
145172
let handle = tokio::spawn(async move {
146173
let id = format!("conn-{}", i);
147-
let mut session = store.create_session(id.clone()).await;
174+
let session = store.create_session(id.clone()).await;
148175
// Verify we got a valid session by building its tree
149-
let tree = session.runtime.build().await;
176+
let tree = session.write().await.runtime.build().await;
150177
let json = serde_json::to_value(&tree).unwrap().to_string();
151178
assert!(json.contains("concurrent"));
152179
store.remove_session(&id).await;
@@ -160,4 +187,74 @@ mod tests {
160187

161188
assert_eq!(store.session_count().await, 0);
162189
}
190+
191+
#[tokio::test]
192+
async fn test_get_session() {
193+
let store = AppSessionStore::new(Arc::new(|| Box::new(TestView::new("get-test"))));
194+
195+
store.create_session("conn-1".to_string()).await;
196+
197+
// Should return Some for an active session
198+
assert!(store.get_session("conn-1").await.is_some());
199+
200+
// Should return None for a non-existent session
201+
assert!(store.get_session("conn-999").await.is_none());
202+
203+
// Should return None after removal
204+
store.remove_session("conn-1").await;
205+
assert!(store.get_session("conn-1").await.is_none());
206+
}
207+
208+
#[tokio::test]
209+
async fn test_connection_ids() {
210+
let store = AppSessionStore::new(Arc::new(|| Box::new(TestView::new("ids-test"))));
211+
212+
store.create_session("conn-a".to_string()).await;
213+
store.create_session("conn-b".to_string()).await;
214+
store.create_session("conn-c".to_string()).await;
215+
216+
let mut ids = store.connection_ids().await;
217+
ids.sort();
218+
assert_eq!(ids, vec!["conn-a", "conn-b", "conn-c"]);
219+
220+
store.remove_session("conn-b").await;
221+
let mut ids = store.connection_ids().await;
222+
ids.sort();
223+
assert_eq!(ids, vec!["conn-a", "conn-c"]);
224+
}
225+
226+
#[tokio::test]
227+
async fn test_broadcast_shutdown() {
228+
let store = AppSessionStore::new(Arc::new(|| Box::new(TestView::new("shutdown-test"))));
229+
230+
let mut rx1 = store.subscribe_shutdown();
231+
let mut rx2 = store.subscribe_shutdown();
232+
let mut rx3 = store.subscribe_shutdown();
233+
234+
store.broadcast_shutdown();
235+
236+
// All receivers should get the signal
237+
assert!(rx1.recv().await.is_ok());
238+
assert!(rx2.recv().await.is_ok());
239+
assert!(rx3.recv().await.is_ok());
240+
}
241+
242+
#[tokio::test]
243+
async fn test_session_arc_lifecycle() {
244+
let store = AppSessionStore::new(Arc::new(|| Box::new(TestView::new("lifecycle-test"))));
245+
246+
let session_arc = store.create_session("conn-1".to_string()).await;
247+
248+
// Handler holds a clone — simulates what handle_socket does
249+
let handler_clone = session_arc.clone();
250+
251+
// Remove from store — store's reference is dropped
252+
store.remove_session("conn-1").await;
253+
assert!(store.get_session("conn-1").await.is_none());
254+
255+
// Handler's clone is still valid and usable
256+
let tree = handler_clone.write().await.runtime.build().await;
257+
let json = serde_json::to_value(&tree).unwrap().to_string();
258+
assert!(json.contains("lifecycle-test"));
259+
}
163260
}

rusty/src/server/ws.rs

Lines changed: 52 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -150,58 +150,74 @@ async fn handle_socket(socket: WebSocket, state: Arc<AppState>) {
150150

151151
// Generate a unique connection ID and create an isolated session
152152
let connection_id = Uuid::new_v4().to_string();
153-
let mut session = state
153+
let session_arc = state
154154
.session_store
155155
.create_session(connection_id.clone())
156156
.await;
157+
let mut shutdown_rx = state.session_store.subscribe_shutdown();
157158

158159
// Send initial render from this session's own runtime
159-
if let Some(tree) = session.runtime.current_tree().await {
160-
let msg = ServerMessage::Refresh {
161-
widgets: tree.clone(),
162-
};
163-
session.reconciler.reconcile(&tree);
164-
if let Ok(json) = serde_json::to_string(&msg) {
165-
let _ = sender.send(Message::Text(json.into())).await;
160+
{
161+
let mut session = session_arc.write().await;
162+
if let Some(tree) = session.runtime.current_tree().await {
163+
let msg = ServerMessage::Refresh {
164+
widgets: tree.clone(),
165+
};
166+
session.reconciler.reconcile(&tree);
167+
if let Ok(json) = serde_json::to_string(&msg) {
168+
let _ = sender.send(Message::Text(json.into())).await;
169+
}
166170
}
167171
}
168-
let event_tx = session.runtime.event_sender();
172+
let event_tx = session_arc.read().await.runtime.event_sender();
169173

170174
// Process incoming messages using this session's isolated runtime
171-
while let Some(Ok(msg)) = receiver.next().await {
172-
if let Message::Text(text) = msg {
173-
if let Ok(client_msg) = serde_json::from_str::<ClientMessage>(&text) {
174-
match client_msg {
175-
ClientMessage::Event {
176-
widget_id,
177-
event_name,
178-
args,
179-
} => {
180-
let _ = event_tx
181-
.send(RuntimeMessage::Event {
182-
widget_id,
183-
event_name,
184-
args,
185-
})
186-
.await;
187-
188-
// After event, get updated tree from this session's runtime
189-
if let Some(tree) = session.runtime.current_tree().await {
190-
if let Some(patches) = session.reconciler.reconcile(&tree) {
191-
if !patches.is_empty() {
192-
let msg = ServerMessage::Update { patches };
193-
if let Ok(json) = serde_json::to_string(&msg) {
194-
let _ = sender.send(Message::Text(json.into())).await;
175+
loop {
176+
tokio::select! {
177+
msg = receiver.next() => {
178+
match msg {
179+
Some(Ok(Message::Text(text))) => {
180+
if let Ok(client_msg) = serde_json::from_str::<ClientMessage>(&text) {
181+
match client_msg {
182+
ClientMessage::Event {
183+
widget_id,
184+
event_name,
185+
args,
186+
} => {
187+
let _ = event_tx
188+
.send(RuntimeMessage::Event {
189+
widget_id,
190+
event_name,
191+
args,
192+
})
193+
.await;
194+
195+
// After event, get updated tree from this session's runtime
196+
let mut session = session_arc.write().await;
197+
if let Some(tree) = session.runtime.current_tree().await {
198+
if let Some(patches) = session.reconciler.reconcile(&tree) {
199+
if !patches.is_empty() {
200+
let msg = ServerMessage::Update { patches };
201+
if let Ok(json) = serde_json::to_string(&msg) {
202+
let _ = sender.send(Message::Text(json.into())).await;
203+
}
204+
}
205+
}
195206
}
196207
}
208+
ClientMessage::Navigate { .. } => {
209+
// Navigation handling (future)
210+
}
197211
}
198212
}
199213
}
200-
ClientMessage::Navigate { .. } => {
201-
// Navigation handling (future)
202-
}
214+
Some(Ok(_)) => {} // Ignore non-text messages
215+
_ => break, // Connection closed or error
203216
}
204217
}
218+
_ = shutdown_rx.recv() => {
219+
break;
220+
}
205221
}
206222
}
207223

0 commit comments

Comments
 (0)