Skip to content

Commit 8bcb9ee

Browse files
committed
WIP: MoQ server breaks /reset
1 parent 7f54ad4 commit 8bcb9ee

1 file changed

Lines changed: 19 additions & 4 deletions

File tree

smelter-core/src/pipeline/moq/server.rs

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1-
use std::sync::Arc;
1+
use std::sync::{Arc, Mutex};
22
use std::time::Duration;
33

4-
use moq_lite::{Origin, OriginConsumer, OriginProducer};
4+
use moq_lite::{Origin, OriginConsumer, OriginProducer, Session};
55
use moq_native::{ServerConfig, ServerTlsConfig};
66
use smelter_render::error::ErrorStack;
77
use tracing::{debug, info, warn};
@@ -28,15 +28,23 @@ impl MoqPipelineState {
2828
}
2929
}
3030

31+
type MoqSessions = Arc<Mutex<Vec<Session>>>;
3132
pub struct MoqServerHandle {
3233
tasks: Vec<tokio::task::JoinHandle<()>>,
34+
sessions: MoqSessions,
3335
}
3436

3537
impl Drop for MoqServerHandle {
3638
fn drop(&mut self) {
3739
for task in &self.tasks {
3840
task.abort();
3941
}
42+
tracing::error!("DUPA1");
43+
let mut sessions = self.sessions.lock().unwrap();
44+
tracing::error!("DUPA2");
45+
for session in sessions.iter_mut() {
46+
session.close(moq_lite::Error::Cancel);
47+
}
4048
}
4149
}
4250

@@ -70,7 +78,8 @@ pub async fn spawn_moq_server(
7078
Err(error) => return Err(InitPipelineError::MoqServerInitError(error)),
7179
};
7280

73-
let accept_task = tokio::spawn(run_accept_loop(server));
81+
let moq_sessions: MoqSessions = Arc::new(Mutex::new(vec![]));
82+
let accept_task = tokio::spawn(run_accept_loop(server, moq_sessions.clone()));
7483

7584
let origin_consumer = state.origin.consume();
7685
let moq_inputs = state.inputs.clone();
@@ -80,16 +89,22 @@ pub async fn spawn_moq_server(
8089

8190
Ok(MoqServerHandle {
8291
tasks: vec![accept_task, announce_task],
92+
sessions: Arc::new(Mutex::new(vec![])),
8393
})
8494
}
8595

86-
async fn run_accept_loop(mut server: moq_native::Server) {
96+
async fn run_accept_loop(mut server: moq_native::Server, moq_sessions: MoqSessions) {
8797
while let Some(request) = server.accept().await {
98+
let moq_sessions = moq_sessions.clone();
8899
tokio::spawn(async move {
89100
match request.ok().await {
90101
Ok(session) => {
91102
info!("MoQ session established");
92103
debug!(moq_version=?session.version());
104+
{
105+
let mut sessions = moq_sessions.lock().unwrap();
106+
sessions.push(session.clone());
107+
}
93108
let _ = session.closed().await;
94109
info!("MoQ session closed");
95110
}

0 commit comments

Comments
 (0)