-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcloud_session_manager.rs
More file actions
446 lines (397 loc) · 14.5 KB
/
cloud_session_manager.rs
File metadata and controls
446 lines (397 loc) · 14.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
//! Cloud session manager — Docker-backed sessions with state machine and
//! SQLite persistence.
//!
//! Orchestrates the full lifecycle: container spawn, exec attach, state
//! transitions, per-project concurrency limits, and recovery on restart.
use std::collections::HashMap;
use std::path::Path;
use std::sync::Arc;
use bollard::models::ContainerSummaryStateEnum;
use dashmap::DashMap;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use crate::backends::DockerExecBackend;
use crate::config::Config;
use crate::docker::DockerOrchestrator;
use crate::error::{RelayError, RelayResult};
use crate::project_manager::ProjectManager;
use crate::session::TerminalSession;
use crate::store::{self, Store};
// ---------------------------------------------------------------------------
// Per-project semaphore cache
// ---------------------------------------------------------------------------
/// Thread-safe map of project_id → Semaphore for concurrency control.
///
/// Each semaphore is initialised with `max_sessions` permits (from config).
/// Before creating a session, a permit is acquired; on session stop/fail the
/// permit is released automatically via [`OwnedSemaphorePermit`] drop.
struct ProjectSemaphores {
map: DashMap<String, Arc<Semaphore>>,
max_sessions: usize,
}
#[allow(dead_code)]
impl ProjectSemaphores {
fn new(max_sessions: usize) -> Self {
Self {
map: DashMap::new(),
max_sessions,
}
}
/// Get or create the semaphore for a project.
fn get(&self, project_id: &str) -> Arc<Semaphore> {
self.map
.entry(project_id.to_owned())
.or_insert_with(|| Arc::new(Semaphore::new(self.max_sessions)))
.value()
.clone()
}
}
// ---------------------------------------------------------------------------
// CloudSessionManager
// ---------------------------------------------------------------------------
/// Manages Docker-backed terminal sessions with full lifecycle tracking.
///
/// Responsibilities:
/// - Per-project `max_sessions` enforcement via [`Semaphore`]
/// - State machine: Starting → Running → Stopped / Failed
/// - SQLite persistence of every state transition
/// - Git clone (not worktree) for session isolation
/// - VT snapshot for reconnect via [`TerminalSession`]
/// - Recovery at startup: Running sessions whose containers died → Failed
#[allow(dead_code)]
pub struct CloudSessionManager {
store: Store,
docker: DockerOrchestrator,
project_manager: ProjectManager,
config: Arc<Config>,
semaphores: ProjectSemaphores,
/// Active in-memory sessions keyed by session ID.
sessions: DashMap<String, Arc<TerminalSession>>,
/// Semaphore permits held for active sessions, keyed by session ID.
/// Dropping an entry automatically releases the permit back to the semaphore.
permits: DashMap<String, OwnedSemaphorePermit>,
}
#[allow(dead_code)]
impl CloudSessionManager {
/// Create a new manager.
pub fn new(
store: Store,
docker: DockerOrchestrator,
project_manager: ProjectManager,
config: Arc<Config>,
) -> Self {
let max_sessions = config.max_sessions;
Self {
store,
docker,
project_manager,
config,
semaphores: ProjectSemaphores::new(max_sessions),
sessions: DashMap::new(),
permits: DashMap::new(),
}
}
/// Create a new cloud session for a project.
///
/// Steps:
/// 1. Verify project exists and is Ready
/// 2. Check per-project session limit
/// 3. Create isolated git clone for the branch
/// 4. Create session record in DB (state = Starting)
/// 5. Spawn Docker container + exec attach
/// 6. Transition state to Running
/// 7. Start VT read loop
///
/// On any failure after the DB record is created, state transitions to
/// Failed with the error reason persisted.
pub async fn create_session(
&self,
project_id: &str,
branch: &str,
image: Option<&str>,
profile: Option<&str>,
cols: u16,
rows: u16,
) -> RelayResult<Arc<TerminalSession>> {
// 1. Verify project exists and is Ready.
let project = self
.store
.get_project(project_id)
.await
.map_err(RelayError::Database)?
.ok_or_else(|| RelayError::NotFound {
entity: "Project",
id: project_id.to_owned(),
})?;
if project.state != store::ProjectState::Ready {
return Err(RelayError::Config(format!(
"project {} is not ready (state: {:?})",
project_id, project.state
)));
}
// 2. Check per-project session limit.
let semaphore = self.semaphores.get(project_id);
let permit = semaphore
.try_acquire_owned()
.map_err(|_| RelayError::SessionLimit {
max: self.config.max_sessions,
project_id: project_id.to_owned(),
})?;
// 3. Create isolated git clone for the branch.
let project_dir = self.project_manager.projects_dir().join(project_id);
// Use a unique subdirectory per session to avoid collisions.
let session_clone_id = uuid::Uuid::new_v4().to_string();
let clone_dir = project_dir.join(format!("sessions/{session_clone_id}"));
self.project_manager
.clone_for_session(&project_dir, branch, &clone_dir)
.await?;
// 4. Create session record in DB (state = Starting).
let effective_image = image
.unwrap_or_else(|| self.docker.default_image())
.to_owned();
let profile_name = profile.unwrap_or("default").to_owned();
let db_session = self
.store
.create_session(store::CreateSession {
project_id: project_id.to_owned(),
branch: branch.to_owned(),
image: effective_image.clone(),
profile: profile_name.clone(),
work_dir: String::new(),
})
.await
.map_err(RelayError::Database)?;
let session_id = db_session.id.clone();
let updated_at = db_session.updated_at.to_rfc3339();
tracing::info!(
session_id = %session_id,
project_id,
branch,
"cloud session created (Starting)"
);
// 5. Spawn Docker container + exec attach.
let env = self.build_session_env(&session_id, &clone_dir);
let cmd = vec!["/bin/bash".to_owned()];
let backend = match DockerExecBackend::spawn(
self.docker.clone(),
&session_id,
&effective_image,
env,
cmd,
cols,
rows,
)
.await
{
Ok(b) => b,
Err(e) => {
// Transition to Failed.
let _ = self
.store
.update_session_state(
&session_id,
store::SessionState::Failed,
None,
Some(&e.to_string()),
&updated_at,
)
.await;
// permit drops here, releasing the semaphore slot automatically.
return Err(e);
}
};
let container_id = backend.container_id().to_owned();
// 6. Transition state to Running.
let updated = self
.store
.update_session_state(
&session_id,
store::SessionState::Running,
Some(&container_id),
None,
&updated_at,
)
.await
.map_err(RelayError::Database)?;
if !updated {
tracing::warn!(
session_id = %session_id,
"optimistic concurrency conflict on Starting->Running transition"
);
}
tracing::info!(
session_id = %session_id,
container_id = %container_id,
"cloud session Running"
);
// 7. Create TerminalSession with the docker backend.
let scrollback = self.config.scrollback_lines;
let terminal_session = TerminalSession::new_with_backend(
session_id.clone(),
Box::new(backend),
cols,
rows,
scrollback,
"bash".to_owned(),
);
terminal_session.start_read_loop();
self.sessions
.insert(session_id.clone(), Arc::clone(&terminal_session));
// Store the permit — it is released automatically when removed from the map.
self.permits.insert(session_id.clone(), permit);
Ok(terminal_session)
}
/// Look up an active in-memory session.
pub fn get_session(&self, session_id: &str) -> Option<Arc<TerminalSession>> {
self.sessions.get(session_id).map(|e| Arc::clone(e.value()))
}
/// List all active in-memory sessions.
pub fn list_sessions(&self) -> Vec<Arc<TerminalSession>> {
self.sessions
.iter()
.map(|e| Arc::clone(e.value()))
.collect()
}
/// Stop a session: terminate the backend, transition to Stopped.
pub async fn stop_session(&self, session_id: &str) -> RelayResult<()> {
// Always release the permit, even if the session entry is missing, to
// avoid a permanent leak when the two maps drift out of sync.
self.permits.remove(session_id);
let (_, session) =
self.sessions
.remove(session_id)
.ok_or_else(|| RelayError::NotFound {
entity: "Session",
id: session_id.to_owned(),
})?;
session.terminate();
// Single DB read for state update.
if let Some(s) = self
.store
.get_session(session_id)
.await
.map_err(RelayError::Database)?
{
let _ = self
.store
.update_session_state(
session_id,
store::SessionState::Stopped,
None,
None,
&s.updated_at.to_rfc3339(),
)
.await;
}
tracing::info!(session_id, "cloud session stopped");
Ok(())
}
/// Recover state on server restart.
///
/// All sessions in Starting or Running state whose containers are gone
/// transition to Failed. This is the T-7b acceptance criterion:
/// "Running → Failed (container gone)".
pub async fn recover_sessions(&self) -> RelayResult<()> {
let active = self
.store
.list_active_sessions()
.await
.map_err(RelayError::Database)?;
// Collect running container IDs via non-destructive list (not stop!).
let running_ids: std::collections::HashSet<String> =
match self.docker.list_managed_containers().await {
Ok(containers) => containers
.into_iter()
.filter_map(|c| {
let is_running = c.state == Some(ContainerSummaryStateEnum::RUNNING);
if is_running {
c.id
} else {
None
}
})
.collect(),
Err(e) => {
tracing::error!(error = %e, "recovery: failed to list Docker containers");
return Ok(());
}
};
let mut recovered = 0u32;
for session in &active {
let container_alive = session
.container_id
.as_deref()
.is_some_and(|cid| running_ids.contains(cid));
if !container_alive {
let _ = self
.store
.force_stop_session(&session.id, "container gone after restart")
.await;
recovered += 1;
tracing::info!(
session_id = %session.id,
container_id = ?session.container_id,
"session recovered: Running -> Failed (container gone)"
);
}
}
if recovered > 0 {
tracing::info!(recovered, "cloud session recovery complete");
}
Ok(())
}
/// Terminate all active sessions — for graceful shutdown.
pub async fn terminate_all(&self) {
let count = self.sessions.len();
tracing::info!(count, "terminating all cloud sessions");
let ids: Vec<String> = self.sessions.iter().map(|e| e.key().clone()).collect();
for id in &ids {
if let Err(e) = self.stop_session(id).await {
tracing::warn!(
session_id = %id,
error = %e,
"failed to stop session during shutdown"
);
}
}
tracing::info!("all cloud sessions terminated");
}
/// Build environment variables for a session container.
fn build_session_env(&self, session_id: &str, clone_dir: &Path) -> HashMap<String, String> {
let mut env = HashMap::new();
env.insert("RELAY_SESSION_ID".to_owned(), session_id.to_owned());
env.insert(
"RELAY_WORKSPACE_DIR".to_owned(),
clone_dir.to_string_lossy().into_owned(),
);
env.insert("TERM".to_owned(), "xterm-256color".to_owned());
env
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn semaphore_returns_same_instance_for_same_project() {
let sems = ProjectSemaphores::new(5);
let a = sems.get("proj-1");
let b = sems.get("proj-1");
assert!(Arc::ptr_eq(&a, &b));
}
#[test]
fn semaphore_returns_different_instance_for_different_projects() {
let sems = ProjectSemaphores::new(5);
let a = sems.get("proj-1");
let b = sems.get("proj-2");
assert!(!Arc::ptr_eq(&a, &b));
}
#[test]
fn semaphore_respects_max_sessions() {
let sems = ProjectSemaphores::new(2);
let sem = sems.get("proj-1");
// Acquire 2 permits.
let _p1 = sem.try_acquire().expect("first permit");
let _p2 = sem.try_acquire().expect("second permit");
// Third should fail.
assert!(sem.try_acquire().is_err());
}
}