From 5d1f42461c4719d2bde7ad40d33d1aa9e83abfef Mon Sep 17 00:00:00 2001 From: Manish Date: Fri, 19 Dec 2025 00:12:42 +0530 Subject: [PATCH 01/14] refactor: simplified remote manager --- Cargo.lock | 1 + moq-relay-ietf/Cargo.toml | 2 +- moq-relay-ietf/src/producer.rs | 27 +- moq-relay-ietf/src/relay.rs | 19 +- moq-relay-ietf/src/remote.rs | 567 +++++++++++++-------------------- 5 files changed, 237 insertions(+), 379 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 65af2d8c..cfc98acc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1308,6 +1308,7 @@ dependencies = [ "serde_json", "thiserror 2.0.17", "tokio", + "tokio-util", "tower-http", "tracing", "tracing-subscriber", diff --git a/moq-relay-ietf/Cargo.toml b/moq-relay-ietf/Cargo.toml index 0df99745..773f0ba4 100644 --- a/moq-relay-ietf/Cargo.toml +++ b/moq-relay-ietf/Cargo.toml @@ -30,7 +30,7 @@ url = "2" # Async stuff tokio = { version = "1", features = ["full"] } -# tokio-util = "0.7" +tokio-util = "0.7" futures = "0.3" async-trait = "0.1" diff --git a/moq-relay-ietf/src/producer.rs b/moq-relay-ietf/src/producer.rs index 23ea49f3..8f17352a 100644 --- a/moq-relay-ietf/src/producer.rs +++ b/moq-relay-ietf/src/producer.rs @@ -4,18 +4,18 @@ use moq_transport::{ session::{Publisher, SessionError, Subscribed, TrackStatusRequested}, }; -use crate::{Locals, RemotesConsumer}; +use crate::{Locals, RemoteManager}; /// Producer of tracks to a remote Subscriber #[derive(Clone)] pub struct Producer { publisher: Publisher, locals: Locals, - remotes: Option, + remotes: RemoteManager, } impl Producer { - pub fn new(publisher: Publisher, locals: Locals, remotes: Option) -> Self { + pub fn new(publisher: Publisher, locals: Locals, remotes: RemoteManager) -> Self { Self { publisher, locals, @@ -89,21 +89,16 @@ impl Producer { } } - if let Some(remotes) = self.remotes { - // Check remote tracks second, and serve from remote if possible - match remotes.route(&namespace).await { - Ok(remote) => { - if let Some(remote) = remote { - if let Some(track) = remote.subscribe(&namespace, &track_name)? { - log::info!("serving subscribe from remote: {:?}", track.info); - return Ok(subscribed.serve(track.reader).await?); - } - } - } - Err(e) => { - log::error!("failed to route to remote: {}", e); + match self.remotes.subscribe(&namespace, &track_name).await { + Ok(track) => { + if let Some(track) = track { + log::info!("serving subscribe from remote: {:?}", track.info); + return Ok(subscribed.serve(track).await?); } } + Err(e) => { + log::error!("failed to route to remote: {}", e); + } } // Track not found - close the subscription with not found error let err = ServeError::not_found_ctx(format!( diff --git a/moq-relay-ietf/src/relay.rs b/moq-relay-ietf/src/relay.rs index f40e576f..5d913ec2 100644 --- a/moq-relay-ietf/src/relay.rs +++ b/moq-relay-ietf/src/relay.rs @@ -6,9 +6,7 @@ use futures::{stream::FuturesUnordered, FutureExt, StreamExt}; use moq_native_ietf::quic::{self, Endpoint}; use url::Url; -use crate::{ - Consumer, Coordinator, Locals, Producer, Remotes, RemotesConsumer, RemotesProducer, Session, -}; +use crate::{Consumer, Coordinator, Locals, Producer, RemoteManager, Session}; // A type alias for boxed future type ServerFuture = Pin< @@ -56,7 +54,7 @@ pub struct Relay { announce_url: Option, mlog_dir: Option, locals: Locals, - remotes: Option<(RemotesProducer, RemotesConsumer)>, + remotes: RemoteManager, coordinator: Arc, } @@ -101,18 +99,14 @@ impl Relay { .collect::>(); // Create remote manager - uses coordinator for namespace lookups - let remotes = Remotes { - coordinator: config.coordinator.clone(), - quic: remote_clients[0].clone(), - } - .produce(); + let remotes = RemoteManager::new(config.coordinator.clone(), remote_clients); Ok(Self { quic_endpoints: endpoints, announce_url: config.announce, mlog_dir: config.mlog_dir, locals, - remotes: Some(remotes), + remotes, coordinator: config.coordinator, }) } @@ -122,10 +116,7 @@ impl Relay { let mut tasks = FuturesUnordered::new(); // Split remotes producer/consumer and spawn producer task - let remotes = self.remotes.map(|(producer, consumer)| { - tasks.push(producer.run().boxed()); - consumer - }); + let remotes = self.remotes; // Start the forwarder, if any let forward_producer = if let Some(url) = &self.announce_url { diff --git a/moq-relay-ietf/src/remote.rs b/moq-relay-ietf/src/remote.rs index 05fe407c..1d69bcb0 100644 --- a/moq-relay-ietf/src/remote.rs +++ b/moq-relay-ietf/src/remote.rs @@ -1,416 +1,287 @@ use std::collections::HashMap; - -use std::collections::VecDeque; -use std::fmt; -use std::ops; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; -use std::sync::Weak; -use futures::stream::FuturesUnordered; -use futures::FutureExt; -use futures::StreamExt; use moq_native_ietf::quic; use moq_transport::coding::TrackNamespace; -use moq_transport::serve::{Track, TrackReader, TrackWriter}; -use moq_transport::watch::State; +use moq_transport::serve::{Track, TrackReader}; +use tokio::sync::Mutex; +use tokio_util::sync::CancellationToken; use url::Url; use crate::Coordinator; -/// Information about remote origins. -pub struct Remotes { - /// The client we use to fetch/store origin information. - pub coordinator: Arc, - - // A QUIC endpoint we'll use to fetch from other origins. - pub quic: quic::Client, -} - -impl Remotes { - pub fn produce(self) -> (RemotesProducer, RemotesConsumer) { - let (send, recv) = State::default().split(); - let info = Arc::new(self); - - let producer = RemotesProducer::new(info.clone(), send); - let consumer = RemotesConsumer::new(info, recv); - - (producer, consumer) - } -} - -#[derive(Default)] -struct RemotesState { - lookup: HashMap, - requested: VecDeque, -} - -// Clone for convenience, but there should only be one instance of this +/// Manages connections to remote relays. +/// +/// When a subscription request comes in for a namespace that isn't local, +/// RemoteManager uses the coordinator to find which remote relay serves it, +/// establishes a connection if needed, and subscribes to the track. #[derive(Clone)] -pub struct RemotesProducer { - info: Arc, - state: State, +pub struct RemoteManager { + coordinator: Arc, + clients: Vec, + remotes: Arc>>, } -impl RemotesProducer { - fn new(info: Arc, state: State) -> Self { - Self { info, state } - } - - /// Block until the next remote requested by a consumer. - async fn next(&mut self) -> Option { - loop { - { - let state = self.state.lock(); - if !state.requested.is_empty() { - return state.into_mut()?.requested.pop_front(); - } - - state.modified()? - } - .await; +impl RemoteManager { + /// Create a new RemoteManager. + pub fn new(coordinator: Arc, clients: Vec) -> Self { + Self { + coordinator, + clients, + remotes: Arc::new(Mutex::new(HashMap::new())), } } - /// Run the remotes producer to serve remote requests. - pub async fn run(mut self) -> anyhow::Result<()> { - let mut tasks = FuturesUnordered::new(); - - loop { - tokio::select! { - Some(mut remote) = self.next() => { - let url = remote.url.clone(); - - // Spawn a task to serve the remote - tasks.push(async move { - let info = remote.info.clone(); + /// Subscribe to a track from a remote relay. + /// + /// This will: + /// 1. Use the coordinator to lookup which relay serves the namespace + /// 2. Connect to that relay if not already connected + /// 3. Subscribe to the specific track + /// + /// Returns None if the namespace isn't found in any remote relay. + pub async fn subscribe( + &self, + namespace: &TrackNamespace, + track_name: &str, + ) -> anyhow::Result> { + // Ask coordinator where this namespace lives + let (origin, client) = match self.coordinator.lookup(namespace).await { + Ok((origin, client)) => (origin, client), + Err(e) => { + log::error!("failed to lookup namespace: {}", e); + return Ok(None); // Namespace not found anywhere + } + }; - log::warn!("serving remote: {:?}", info); + let url = origin.url(); - // Run the remote producer - if let Err(err) = remote.run().await { - log::warn!("failed serving remote: {:?}, error: {}", info, err); - } + // Get or create a connection to the remote relay + let remote = match self.get_or_connect(&url, client.as_ref()).await { + Ok(remote) => remote, + Err(e) => { + log::error!("failed to connect to remote relay {}: {}", url, e); + // Remove failed connection from cache + self.remove(&url).await; + return Err(e); + } + }; - url - }); + // Subscribe to the track on the remote + match remote + .subscribe(namespace.clone(), track_name.to_string()) + .await + { + Ok(reader) => Ok(reader), + Err(e) => { + // If subscription fails, check if connection is dead and remove it + if !remote.is_connected() { + log::warn!("remote connection {} is dead, removing from cache", url); + self.remove(&url).await; } - - // Handle finished remote producers - res = tasks.next(), if !tasks.is_empty() => { - let url = res.unwrap(); - - if let Some(mut state) = self.state.lock_mut() { - state.lookup.remove(&url); - } - }, - else => return Ok(()), + Err(e) } } } -} - -impl ops::Deref for RemotesProducer { - type Target = Remotes; - - fn deref(&self) -> &Self::Target { - &self.info - } -} - -#[derive(Clone)] -pub struct RemotesConsumer { - pub info: Arc, - state: State, -} -impl RemotesConsumer { - fn new(info: Arc, state: State) -> Self { - Self { info, state } - } - - /// Route to a remote origin based on the namespace. - pub async fn route( + /// Get an existing remote connection or create a new one. + async fn get_or_connect( &self, - namespace: &TrackNamespace, - ) -> anyhow::Result> { - // Always fetch the origin instead of using the (potentially invalid) cache. - let (origin, client) = self.coordinator.lookup(namespace).await?; - - // Check if we already have a remote for this origin - let state = self.state.lock(); - if let Some(remote) = state.lookup.get(&origin.url()).cloned() { - return Ok(Some(remote)); + url: &Url, + client: Option<&quic::Client>, + ) -> anyhow::Result { + let mut remotes = self.remotes.lock().await; + + // Check if we already have a connection + if let Some(remote) = remotes.get(url) { + if remote.is_connected() { + return Ok(remote.clone()); + } + // Connection is dead, remove it + log::info!("removing dead connection to remote relay: {}", url); + remotes.remove(url); } - // Create a new remote for this origin - let mut state = match state.into_mut() { - Some(state) => state, - None => return Ok(None), + // Get client, with proper error handling for empty clients vec + let client = match client { + Some(c) => c, + None => self.clients.first().ok_or_else(|| { + anyhow::anyhow!("no QUIC clients configured for remote connections") + })?, }; - let remote = Remote { - url: origin.url(), - remotes: self.info.clone(), - client, - }; - - // Produce the remote - let (writer, reader) = remote.produce(); - state.requested.push_back(writer); + // Create a new connection with its own QUIC client + log::info!("connecting to remote relay: {}", url); + let remote = Remote::connect(url.clone(), client).await?; - // Insert the remote into our Map - state.lookup.insert(origin.url(), reader.clone()); + remotes.insert(url.clone(), remote.clone()); - Ok(Some(reader)) + Ok(remote) } -} -impl ops::Deref for RemotesConsumer { - type Target = Remotes; - - fn deref(&self) -> &Self::Target { - &self.info + /// Remove a remote connection (called when connection fails). + pub async fn remove(&self, url: &Url) { + let mut remotes = self.remotes.lock().await; + if let Some(remote) = remotes.remove(url) { + // Cancel the session task when removing + remote.shutdown(); + } } -} - -pub struct Remote { - pub remotes: Arc, - pub url: Url, - pub client: Option, -} -impl fmt::Debug for Remote { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("Remote") - .field("url", &self.url.to_string()) - .finish() + /// Shutdown all remote connections. + pub async fn shutdown(&self) { + let mut remotes = self.remotes.lock().await; + for (url, remote) in remotes.drain() { + log::info!("shutting down remote connection: {}", url); + remote.shutdown(); + } } } -impl ops::Deref for Remote { - type Target = Remotes; - - fn deref(&self) -> &Self::Target { - &self.remotes - } +/// A connection to a single remote relay with its own QUIC client. +#[derive(Clone)] +pub struct Remote { + url: Url, + subscriber: moq_transport::session::Subscriber, + /// Track subscriptions - maps (namespace, track_name) to track reader + tracks: Arc>>, + /// Flag indicating if the connection is still alive + connected: Arc, + /// Cancellation token for the session task + cancel: CancellationToken, } impl Remote { - /// Create a new broadcast. - pub fn produce(self) -> (RemoteProducer, RemoteConsumer) { - let (send, recv) = State::default().split(); - let info = Arc::new(self); - - let consumer = RemoteConsumer::new(info.clone(), recv); - let producer = RemoteProducer::new(info, send); - - (producer, consumer) - } -} - -#[derive(Default)] -struct RemoteState { - tracks: HashMap<(TrackNamespace, String), RemoteTrackWeak>, - requested: VecDeque, -} - -pub struct RemoteProducer { - pub info: Arc, - state: State, -} - -impl RemoteProducer { - fn new(info: Arc, state: State) -> Self { - Self { info, state } - } - - pub async fn run(&mut self) -> anyhow::Result<()> { - let client = if let Some(client) = &self.info.client { - client - } else { - &self.quic - }; - // TODO reuse QUIC and MoQ sessions - let (session, _quic_client_initial_cid) = client.connect(&self.url, None).await?; + /// Connect to a remote relay with a dedicated QUIC client. + async fn connect(url: Url, client: &quic::Client) -> anyhow::Result { + // Connect to the remote relay (DNS resolution happens inside connect) + let (session, _cid) = client.connect(&url, None).await?; let (session, subscriber) = moq_transport::session::Subscriber::connect(session).await?; - // Run the session - let mut session = session.run().boxed(); - let mut tasks = FuturesUnordered::new(); + let connected = Arc::new(AtomicBool::new(true)); + let cancel = CancellationToken::new(); - let mut done = None; + // Spawn a task to run the session + let session_url = url.clone(); + let session_connected = connected.clone(); + let session_cancel = cancel.clone(); - // Serve requested tracks - loop { + tokio::spawn(async move { tokio::select! { - track = self.next(), if done.is_none() => { - let track = match track { - Ok(Some(track)) => track, - Ok(None) => { done = Some(Ok(())); continue }, - Err(err) => { done = Some(Err(err)); continue }, - }; - - let info = track.info.clone(); - let mut subscriber = subscriber.clone(); - - tasks.push(async move { - if let Err(err) = subscriber.subscribe(track).await { - log::warn!("failed serving track: {:?}, error: {}", info, err); - } - }); - } - _ = tasks.next(), if !tasks.is_empty() => {}, - - // Keep running the session - res = &mut session, if !tasks.is_empty() || done.is_none() => return Ok(res?), - - else => return done.unwrap(), - } - } - } - - /// Block until the next track requested by a consumer. - async fn next(&self) -> anyhow::Result> { - loop { - let notify = { - let state = self.state.lock(); - - // Check if we have any requested tracks - if !state.requested.is_empty() { - return Ok(state - .into_mut() - .and_then(|mut state| state.requested.pop_front())); + result = session.run() => { + if let Err(err) = result { + log::warn!("remote session closed: {} - {}", session_url, err); + } else { + log::info!("remote session closed normally: {}", session_url); + } } - - match state.modified() { - Some(notified) => notified, - None => return Ok(None), + _ = session_cancel.cancelled() => { + log::info!("remote session cancelled: {}", session_url); } - }; + } + // Mark connection as dead + session_connected.store(false, Ordering::Release); + }); - notify.await - } + Ok(Self { + url, + subscriber, + tracks: Arc::new(Mutex::new(HashMap::new())), + connected, + cancel, + }) } -} -impl ops::Deref for RemoteProducer { - type Target = Remote; - - fn deref(&self) -> &Self::Target { - &self.info + /// Check if the connection is still alive. + pub fn is_connected(&self) -> bool { + self.connected.load(Ordering::Acquire) } -} - -#[derive(Clone)] -pub struct RemoteConsumer { - pub info: Arc, - state: State, -} -impl RemoteConsumer { - fn new(info: Arc, state: State) -> Self { - Self { info, state } + /// Shutdown the remote connection. + pub fn shutdown(&self) { + self.cancel.cancel(); + self.connected.store(false, Ordering::Release); } - /// Request a track from the broadcast. - pub fn subscribe( + /// Subscribe to a track on this remote relay. + pub async fn subscribe( &self, - namespace: &TrackNamespace, - name: &str, - ) -> anyhow::Result> { - let key = (namespace.clone(), name.to_string()); - let state = self.state.lock(); - if let Some(track) = state.tracks.get(&key) { - if let Some(track) = track.upgrade() { - return Ok(Some(track)); - } + namespace: TrackNamespace, + track_name: String, + ) -> anyhow::Result> { + // Check connection state first + if !self.is_connected() { + return Err(anyhow::anyhow!( + "remote connection to {} is closed", + self.url + )); } - let mut state = match state.into_mut() { - Some(state) => state, - None => return Ok(None), - }; - - let (writer, reader) = Track::new(namespace.clone(), name.to_string()).produce(); - let reader = RemoteTrackReader::new(reader, self.state.clone()); + let key = (namespace.clone(), track_name.clone()); - // Insert the track into our Map so we deduplicate future requests. - state.tracks.insert(key, reader.downgrade()); - state.requested.push_back(writer); + // Hold lock for entire check-and-insert to prevent race conditions + let mut tracks = self.tracks.lock().await; - Ok(Some(reader)) - } -} - -impl ops::Deref for RemoteConsumer { - type Target = Remote; - - fn deref(&self) -> &Self::Target { - &self.info - } -} - -#[derive(Clone)] -pub struct RemoteTrackReader { - pub reader: TrackReader, - drop: Arc, -} - -impl RemoteTrackReader { - fn new(reader: TrackReader, parent: State) -> Self { - let drop = Arc::new(RemoteTrackDrop { - parent, - key: (reader.namespace.clone(), reader.name.clone()), - }); - - Self { reader, drop } - } - - fn downgrade(&self) -> RemoteTrackWeak { - RemoteTrackWeak { - reader: self.reader.clone(), - drop: Arc::downgrade(&self.drop), + // Check if we already have this track + if let Some(reader) = tracks.get(&key) { + return Ok(Some(reader.clone())); } - } -} -impl ops::Deref for RemoteTrackReader { - type Target = TrackReader; - - fn deref(&self) -> &Self::Target { - &self.reader - } -} - -impl ops::DerefMut for RemoteTrackReader { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.reader - } -} + // Create a new track and subscribe + let (writer, reader) = Track::new(namespace.clone(), track_name.clone()).produce(); + + // Insert BEFORE spawning to prevent race with removal in the spawned task + tracks.insert(key.clone(), reader.clone()); + + // Drop lock before spawning async task + drop(tracks); + + // Subscribe to the track on the remote + let mut subscriber = self.subscriber.clone(); + let track_key = key; + let tracks_clone = self.tracks.clone(); + let url = self.url.clone(); + + tokio::spawn(async move { + log::info!( + "subscribing to remote track: {} - {}/{}", + url, + track_key.0, + track_key.1 + ); + + if let Err(err) = subscriber.subscribe(writer).await { + log::warn!( + "failed subscribing to remote track: {} - {}/{} - {}", + url, + track_key.0, + track_key.1, + err + ); + // NOTE(itzmanish): should we assume the connection is bad? + // connected.store(false, Ordering::Release); + } -struct RemoteTrackWeak { - reader: TrackReader, - drop: Weak, -} + // Remove track from map when subscription ends + tracks_clone.lock().await.remove(&track_key); + log::debug!( + "remote track subscription ended: {} - {}/{}", + url, + track_key.0, + track_key.1 + ); + }); -impl RemoteTrackWeak { - fn upgrade(&self) -> Option { - Some(RemoteTrackReader { - reader: self.reader.clone(), - drop: self.drop.upgrade()?, - }) + Ok(Some(reader)) } } -struct RemoteTrackDrop { - parent: State, - key: (TrackNamespace, String), -} - -impl Drop for RemoteTrackDrop { - fn drop(&mut self) { - if let Some(mut parent) = self.parent.lock_mut() { - parent.tracks.remove(&self.key); - } +impl std::fmt::Debug for Remote { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Remote") + .field("url", &self.url.to_string()) + .field("connected", &self.is_connected()) + .finish() } } From cc4f63b754975427d06901a8ffe1c1bc51e6510d Mon Sep 17 00:00:00 2001 From: Scott Godin Date: Mon, 15 Dec 2025 16:47:17 -0500 Subject: [PATCH 02/14] If we receive a stream header with an unknown track alias, then wait for up to 1 second for SubscribeOk to arrive -remove panic if Fetch stream header --- moq-transport/src/session/mod.rs | 3 +- moq-transport/src/session/subscriber.rs | 83 +++++++++++++++++++++---- 2 files changed, 72 insertions(+), 14 deletions(-) diff --git a/moq-transport/src/session/mod.rs b/moq-transport/src/session/mod.rs index e5d9efb6..3b01dbd1 100644 --- a/moq-transport/src/session/mod.rs +++ b/moq-transport/src/session/mod.rs @@ -385,7 +385,8 @@ impl Session { subscriber .as_mut() .ok_or(SessionError::RoleViolation)? - .recv_datagram(datagram)?; + .recv_datagram(datagram) + .await?; } } } diff --git a/moq-transport/src/session/subscriber.rs b/moq-transport/src/session/subscriber.rs index a9fa2e16..955a33cc 100644 --- a/moq-transport/src/session/subscriber.rs +++ b/moq-transport/src/session/subscriber.rs @@ -2,8 +2,11 @@ use std::{ collections::{hash_map, HashMap}, io, sync::{atomic, Arc, Mutex}, + time::Duration, }; +use tokio::sync::Notify; + use crate::{ coding::{Decode, TrackNamespace}, data, @@ -16,6 +19,9 @@ use crate::watch::Queue; use super::{Announced, AnnouncedRecv, Reader, Session, SessionError, Subscribe, SubscribeRecv}; +// Default timeout for waiting for subscribe aliases to become available via SUBSCRIBE_OK (1 second) +const DEFAULT_ALIAS_WAIT_TIME_MS: u64 = 1000; + // TODO remove Clone. #[derive(Clone)] pub struct Subscriber { @@ -31,6 +37,9 @@ pub struct Subscriber { /// Map of track alias to subscription id for quick lookup when receiving streams/datagrams. subscribe_alias_map: Arc>>, + /// Notify when subscribe alias map is updated + subscribe_alias_notify: Arc, + /// The queue we will write any outbound control messages we want to send, the session run_send task /// will process the queue and send the message on the control stream. outgoing: Queue, @@ -60,6 +69,7 @@ impl Subscriber { outgoing, next_requestid, mlog, + subscribe_alias_notify: Arc::new(Notify::new()), } } @@ -204,6 +214,9 @@ impl Subscriber { .unwrap() .insert(msg.track_alias, msg.id); + // Notify waiting tasks that the alias map has been updated + self.subscribe_alias_notify.notify_waiters(); + // Notify the subscribe of the successful subscription subscribe.ok(msg.track_alias)?; } @@ -258,13 +271,47 @@ impl Subscriber { self.announced.lock().unwrap().remove(namespace); } - /// Get a subscribe id by track alias. - fn get_subscribe_id_by_alias(&mut self, track_alias: u64) -> Option { - self.subscribe_alias_map - .lock() - .unwrap() - .get(&track_alias) - .cloned() + /// Get a subscribe id by track alias, waiting up to the specified timeout if not present. + /// If timeout_ms is None, only check if already present and return None if not. + async fn get_subscribe_id_by_alias( + &self, + track_alias: u64, + timeout_ms: Option, + ) -> Option { + // If no timeout specified, don't wait + let timeout_ms = match timeout_ms { + Some(ms) => ms, + None => { + // Just check once + return self + .subscribe_alias_map + .lock() + .unwrap() + .get(&track_alias) + .cloned(); + } + }; + + // Wait for it to appear, checking after each notification + let timeout_duration = Duration::from_millis(timeout_ms); + tokio::time::timeout(timeout_duration, async { + loop { + // Check first + if let Some(id) = self + .subscribe_alias_map + .lock() + .unwrap() + .get(&track_alias) + .cloned() + { + return id; + } + // Then wait for notification + self.subscribe_alias_notify.notified().await; + } + }) + .await + .ok() } /// Handle reception of a new stream from the QUIC session. @@ -282,6 +329,11 @@ impl Subscriber { stream_header.header_type ); + // No fetch support yet + if !stream_header.header_type.is_subgroup() { + return Err(SessionError::unimplemented("non-SUBGROUP stream types")); + } + // Log subgroup header parsed/received if let Some(ref subgroup_header) = stream_header.subgroup_header { if let Some(ref mlog) = self.mlog { @@ -294,7 +346,6 @@ impl Subscriber { } } - // No fetch support yet, so panic if fetch_header for now (via unwrap below) let track_alias = stream_header.subgroup_header.as_ref().unwrap().track_alias; log::trace!( "[SUBSCRIBER] recv_stream: stream for subscription track_alias={}", @@ -309,9 +360,9 @@ impl Subscriber { track_alias, err ); - // The writer is closed, so we should teriminate. + // The writer is closed, so we should terminate. // TODO it would be nice to do this immediately when the Writer is closed. - if let Some(subscribe_id) = self.get_subscribe_id_by_alias(track_alias) { + if let Some(subscribe_id) = self.get_subscribe_id_by_alias(track_alias, None).await { if let Some(subscribe) = self.remove_subscribe(subscribe_id) { subscribe.error(err.clone())?; } @@ -342,7 +393,10 @@ impl Subscriber { let writer = { // Look up the subscribe id for this track alias - if let Some(subscribe_id) = self.get_subscribe_id_by_alias(track_alias) { + if let Some(subscribe_id) = self + .get_subscribe_id_by_alias(track_alias, Some(DEFAULT_ALIAS_WAIT_TIME_MS)) + .await + { // Look up the subscribe by id let mut subscribes = self.subscribes.lock().unwrap(); let subscribe = subscribes.get_mut(&subscribe_id).ok_or_else(|| { @@ -577,7 +631,7 @@ impl Subscriber { } /// Handle reception of a datagram from the QUIC session. - pub fn recv_datagram(&mut self, datagram: bytes::Bytes) -> Result<(), SessionError> { + pub async fn recv_datagram(&mut self, datagram: bytes::Bytes) -> Result<(), SessionError> { let mut cursor = io::Cursor::new(datagram); let datagram = data::Datagram::decode(&mut cursor)?; @@ -627,7 +681,10 @@ impl Subscriber { } // Look up the subscribe id for this track alias - if let Some(subscribe_id) = self.get_subscribe_id_by_alias(datagram.track_alias) { + if let Some(subscribe_id) = self + .get_subscribe_id_by_alias(datagram.track_alias, Some(DEFAULT_ALIAS_WAIT_TIME_MS)) + .await + { // Look up the subscribe by id if let Some(subscribe) = self.subscribes.lock().unwrap().get_mut(&subscribe_id) { log::trace!( From 2fe94606e43842a7927c9c8e038f869e844b75e2 Mon Sep 17 00:00:00 2001 From: Scott Godin Date: Wed, 17 Dec 2025 14:40:17 -0500 Subject: [PATCH 03/14] - register for notification before checking map to avoid race --- moq-transport/src/session/subscriber.rs | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/moq-transport/src/session/subscriber.rs b/moq-transport/src/session/subscriber.rs index 955a33cc..49c59e8d 100644 --- a/moq-transport/src/session/subscriber.rs +++ b/moq-transport/src/session/subscriber.rs @@ -296,7 +296,10 @@ impl Subscriber { let timeout_duration = Duration::from_millis(timeout_ms); tokio::time::timeout(timeout_duration, async { loop { - // Check first + // Register for notification before checking map + let notified = self.subscribe_alias_notify.notified(); + + // Check Map for alias if let Some(id) = self .subscribe_alias_map .lock() @@ -306,8 +309,9 @@ impl Subscriber { { return id; } - // Then wait for notification - self.subscribe_alias_notify.notified().await; + + // Alias not present yet, wait for notification + notified.await; } }) .await From 11d189196a23896eb25f80adb4ae964d2776a600 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Thu, 18 Dec 2025 00:43:07 +0000 Subject: [PATCH 04/14] chore: release --- Cargo.lock | 14 +++---- moq-api/CHANGELOG.md | 6 +++ moq-api/Cargo.toml | 2 +- moq-clock-ietf/CHANGELOG.md | 12 ++++++ moq-clock-ietf/Cargo.toml | 6 +-- moq-native-ietf/CHANGELOG.md | 18 ++++++++ moq-native-ietf/Cargo.toml | 4 +- moq-pub/CHANGELOG.md | 9 ++++ moq-pub/Cargo.toml | 6 +-- moq-relay-ietf/CHANGELOG.md | 23 +++++++++++ moq-relay-ietf/Cargo.toml | 6 +-- moq-sub/CHANGELOG.md | 10 +++++ moq-sub/Cargo.toml | 6 +-- moq-transport/CHANGELOG.md | 80 ++++++++++++++++++++++++++++++++++++ moq-transport/Cargo.toml | 2 +- 15 files changed, 181 insertions(+), 23 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index cfc98acc..a53c8b33 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1203,7 +1203,7 @@ dependencies = [ [[package]] name = "moq-api" -version = "0.2.4" +version = "0.2.5" dependencies = [ "axum", "clap", @@ -1228,7 +1228,7 @@ dependencies = [ [[package]] name = "moq-clock-ietf" -version = "0.6.5" +version = "0.6.6" dependencies = [ "anyhow", "chrono", @@ -1245,7 +1245,7 @@ dependencies = [ [[package]] name = "moq-native-ietf" -version = "0.5.5" +version = "0.6.0" dependencies = [ "anyhow", "clap", @@ -1268,7 +1268,7 @@ dependencies = [ [[package]] name = "moq-pub" -version = "0.8.5" +version = "0.8.6" dependencies = [ "anyhow", "bytes", @@ -1289,7 +1289,7 @@ dependencies = [ [[package]] name = "moq-relay-ietf" -version = "0.7.5" +version = "0.7.6" dependencies = [ "anyhow", "async-trait", @@ -1318,7 +1318,7 @@ dependencies = [ [[package]] name = "moq-sub" -version = "0.3.4" +version = "0.4.0" dependencies = [ "anyhow", "clap", @@ -1337,7 +1337,7 @@ dependencies = [ [[package]] name = "moq-transport" -version = "0.11.0" +version = "0.12.0" dependencies = [ "bytes", "futures", diff --git a/moq-api/CHANGELOG.md b/moq-api/CHANGELOG.md index 391451bc..93bc6109 100644 --- a/moq-api/CHANGELOG.md +++ b/moq-api/CHANGELOG.md @@ -6,6 +6,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## [0.2.5](https://github.com/cloudflare/moq-rs/compare/moq-api-v0.2.4...moq-api-v0.2.5) - 2025-12-18 + +### Other + +- Bump redis to fix deprecation warning during build + ## [0.2.4](https://github.com/englishm/moq-rs/compare/moq-api-v0.2.3...moq-api-v0.2.4) - 2025-09-15 ### Other diff --git a/moq-api/Cargo.toml b/moq-api/Cargo.toml index 7d4072c7..7ea2747a 100644 --- a/moq-api/Cargo.toml +++ b/moq-api/Cargo.toml @@ -5,7 +5,7 @@ authors = ["Luke Curley"] repository = "https://github.com/englishm/moq-rs" license = "MIT OR Apache-2.0" -version = "0.2.4" +version = "0.2.5" edition = "2021" keywords = ["quic", "http3", "webtransport", "media", "live"] diff --git a/moq-clock-ietf/CHANGELOG.md b/moq-clock-ietf/CHANGELOG.md index 1d22f19a..9b285228 100644 --- a/moq-clock-ietf/CHANGELOG.md +++ b/moq-clock-ietf/CHANGELOG.md @@ -6,6 +6,18 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## [0.6.6](https://github.com/cloudflare/moq-rs/compare/moq-clock-ietf-v0.6.5...moq-clock-ietf-v0.6.6) - 2025-12-18 + +### Other + +- Add extension header support to datagrams +- Fix Datagram Support +- Wire Up Track Status Handling +- moq-clock-ietf variable renames and comments added +- Print CID for clock sessions +- Add --qlog-dir CLI argument to QUIC configuration +- -clock demo - task out reception of new streams so we don't need to wait for previous stream to end + ## [0.6.5](https://github.com/englishm/moq-rs/compare/moq-clock-ietf-v0.6.4...moq-clock-ietf-v0.6.5) - 2025-09-15 ### Other diff --git a/moq-clock-ietf/Cargo.toml b/moq-clock-ietf/Cargo.toml index bc97649d..73a52b46 100644 --- a/moq-clock-ietf/Cargo.toml +++ b/moq-clock-ietf/Cargo.toml @@ -5,7 +5,7 @@ authors = ["Luke Curley"] repository = "https://github.com/englishm/moq-rs" license = "MIT OR Apache-2.0" -version = "0.6.5" +version = "0.6.6" edition = "2021" keywords = ["quic", "http3", "webtransport", "media", "live"] @@ -14,8 +14,8 @@ categories = ["multimedia", "network-programming", "web-programming"] # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -moq-native-ietf = { path = "../moq-native-ietf", version = "0.5" } -moq-transport = { path = "../moq-transport", version = "0.11" } +moq-native-ietf = { path = "../moq-native-ietf", version = "0.6" } +moq-transport = { path = "../moq-transport", version = "0.12" } # QUIC url = "2" diff --git a/moq-native-ietf/CHANGELOG.md b/moq-native-ietf/CHANGELOG.md index d9420bee..faa28e8d 100644 --- a/moq-native-ietf/CHANGELOG.md +++ b/moq-native-ietf/CHANGELOG.md @@ -6,6 +6,24 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## [0.6.0](https://github.com/cloudflare/moq-rs/compare/moq-native-ietf-v0.5.5...moq-native-ietf-v0.6.0) - 2025-12-18 + +### Other + +- Update moq-native-ietf/src/quic.rs +- Print CID for clock sessions +- cargo fmt +- Refactor mlog feature for better layering +- First pass of 'mlog' support +- Implement per-connection qlog file generation +- Thread qlog_dir and base_server_config to accept_session +- Store qlog_dir and base_server_config in Server struct +- Validate qlog directory exists at startup +- Add --qlog-dir CLI argument to QUIC configuration +- Enable qlog feature of quinn +- Log QUIC CIDs for accepted connections +- Use newer quinn + ## [0.5.5](https://github.com/englishm/moq-rs/compare/moq-native-ietf-v0.5.4...moq-native-ietf-v0.5.5) - 2025-09-15 ### Other diff --git a/moq-native-ietf/Cargo.toml b/moq-native-ietf/Cargo.toml index 7815678d..cdb22d35 100644 --- a/moq-native-ietf/Cargo.toml +++ b/moq-native-ietf/Cargo.toml @@ -5,14 +5,14 @@ authors = ["Luke Curley"] repository = "https://github.com/englishm/moq-rs" license = "MIT OR Apache-2.0" -version = "0.5.5" +version = "0.6.0" edition = "2021" keywords = ["quic", "http3", "webtransport", "media", "live"] categories = ["multimedia", "network-programming", "web-programming"] [dependencies] -moq-transport = { path = "../moq-transport", version = "0.11" } +moq-transport = { path = "../moq-transport", version = "0.12" } web-transport = { workspace = true } web-transport-quinn = "0.3" diff --git a/moq-pub/CHANGELOG.md b/moq-pub/CHANGELOG.md index 453dccce..5ececa26 100644 --- a/moq-pub/CHANGELOG.md +++ b/moq-pub/CHANGELOG.md @@ -6,6 +6,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## [0.8.6](https://github.com/cloudflare/moq-rs/compare/moq-pub-v0.8.5...moq-pub-v0.8.6) - 2025-12-18 + +### Other + +- moq-transport variable renames and comments added +- Log CID +- Print CID for clock sessions +- Add --qlog-dir CLI argument to QUIC configuration + ## [0.8.5](https://github.com/englishm/moq-rs/compare/moq-pub-v0.8.4...moq-pub-v0.8.5) - 2025-09-15 ### Other diff --git a/moq-pub/Cargo.toml b/moq-pub/Cargo.toml index 23af605d..24198d1e 100644 --- a/moq-pub/Cargo.toml +++ b/moq-pub/Cargo.toml @@ -5,7 +5,7 @@ authors = ["Mike English", "Luke Curley"] repository = "https://github.com/englishm/moq-rs" license = "MIT OR Apache-2.0" -version = "0.8.5" +version = "0.8.6" edition = "2021" keywords = ["quic", "http3", "webtransport", "media", "live"] @@ -14,8 +14,8 @@ categories = ["multimedia", "network-programming", "web-programming"] # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -moq-native-ietf = { path = "../moq-native-ietf", version = "0.5" } -moq-transport = { path = "../moq-transport", version = "0.11" } +moq-native-ietf = { path = "../moq-native-ietf", version = "0.6" } +moq-transport = { path = "../moq-transport", version = "0.12" } moq-catalog = { path = "../moq-catalog", version = "0.2" } url = "2" diff --git a/moq-relay-ietf/CHANGELOG.md b/moq-relay-ietf/CHANGELOG.md index 65ace645..c0e14e1c 100644 --- a/moq-relay-ietf/CHANGELOG.md +++ b/moq-relay-ietf/CHANGELOG.md @@ -6,6 +6,29 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## [0.7.6](https://github.com/cloudflare/moq-rs/compare/moq-relay-ietf-v0.7.5...moq-relay-ietf-v0.7.6) - 2025-12-18 + +### Other + +- Use correlation IDs in errors +- cargo fmt +- Add support for nested namespaces +- Revert "Add support for namespace hierachies" +- Address PR feedback +- cargo fmt +- Add support for namespace hierachies +- Wire Up Track Status Handling +- moq-relay-ietf variable renames and comments added +- Update moq-relay-ietf/src/relay.rs +- Print CID for clock sessions +- Add --mlog-serve +- Refactor mlog feature for better layering +- First pass of 'mlog' support +- Allow either CID or CID_server.qlog paths +- Add --qlog-serve +- Wire qlog_dir CLI argument through moq-relay-ietf +- Add --qlog-dir CLI argument to QUIC configuration + ## [0.7.5](https://github.com/englishm/moq-rs/compare/moq-relay-ietf-v0.7.4...moq-relay-ietf-v0.7.5) - 2025-09-15 ### Other diff --git a/moq-relay-ietf/Cargo.toml b/moq-relay-ietf/Cargo.toml index 773f0ba4..e8a4406f 100644 --- a/moq-relay-ietf/Cargo.toml +++ b/moq-relay-ietf/Cargo.toml @@ -5,7 +5,7 @@ authors = ["Luke Curley", "Manish Kumar Pandit"] repository = "https://github.com/cloudflare/moq-rs" license = "MIT OR Apache-2.0" -version = "0.7.5" +version = "0.7.6" edition = "2021" keywords = ["quic", "http3", "webtransport", "media", "live"] @@ -20,8 +20,8 @@ name = "moq-relay-ietf" path = "src/bin/moq-relay-ietf/main.rs" [dependencies] -moq-transport = { path = "../moq-transport", version = "0.11" } -moq-native-ietf = { path = "../moq-native-ietf", version = "0.5" } +moq-transport = { path = "../moq-transport", version = "0.12" } +moq-native-ietf = { path = "../moq-native-ietf", version = "0.6" } moq-api = { path = "../moq-api", version = "0.2" } web-transport = { workspace = true } diff --git a/moq-sub/CHANGELOG.md b/moq-sub/CHANGELOG.md index 685698a7..9eedc258 100644 --- a/moq-sub/CHANGELOG.md +++ b/moq-sub/CHANGELOG.md @@ -6,6 +6,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## [0.4.0](https://github.com/cloudflare/moq-rs/compare/moq-sub-v0.3.4...moq-sub-v0.4.0) - 2025-12-18 + +### Other + +- Add support for nested namespaces +- Log CID +- Print CID for clock sessions +- Add --qlog-dir CLI argument to QUIC configuration +- Merge branch 'main' into sub-catalog + ## [0.3.4](https://github.com/englishm/moq-rs/compare/moq-sub-v0.3.3...moq-sub-v0.3.4) - 2025-09-15 ### Other diff --git a/moq-sub/Cargo.toml b/moq-sub/Cargo.toml index e87deba6..5fc3514c 100644 --- a/moq-sub/Cargo.toml +++ b/moq-sub/Cargo.toml @@ -5,7 +5,7 @@ authors = [] repository = "https://github.com/englishm/moq-rs" license = "MIT OR Apache-2.0" -version = "0.3.4" +version = "0.4.0" edition = "2021" keywords = ["quic", "http3", "webtransport", "media", "live"] @@ -14,8 +14,8 @@ categories = ["multimedia", "network-programming", "web-programming"] # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -moq-transport = { path = "../moq-transport", version = "0.11" } -moq-native-ietf = { path = "../moq-native-ietf", version = "0.5" } +moq-transport = { path = "../moq-transport", version = "0.12" } +moq-native-ietf = { path = "../moq-native-ietf", version = "0.6" } moq-catalog = { path = "../moq-catalog", version = "0.2" } url = "2" diff --git a/moq-transport/CHANGELOG.md b/moq-transport/CHANGELOG.md index 86b2fd8a..fb02df4b 100644 --- a/moq-transport/CHANGELOG.md +++ b/moq-transport/CHANGELOG.md @@ -6,6 +6,86 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## [0.12.0](https://github.com/cloudflare/moq-rs/compare/moq-transport-v0.11.0...moq-transport-v0.12.0) - 2025-12-18 + +### Fixed + +- fix issues noticed by CoPilot + +### Other + +- - register for notification before checking map to avoid race +- If we receive a stream header with an unknown track alias, then wait for up to 1 second for SubscribeOk to arrive +- cargo fmt +- Improve error expressivity and safety +- Address PR feedback +- Use correlation IDs in errors +- Add error correlation ids +- Align error codes with draft-14 +- Consistently handle unimplemented features +- Add extension header support to datagrams +- Add support for Immutable Extension headers +- Add logging for immutable extension headers +- Fix last commit +- Fixup KVPs Parsing +- Merge pull request #108 from cloudflare/me/more-kvp-parsing-logging +- Merge pull request #102 from sgodin/datagram-logging +- -fix typo in error definition +- - enable trace level logging on fly +- cargo fmt +- Add support for nested namespaces +- Revert "Add support for namespace hierachies" +- Address PR feedback +- cargo fmt +- Add support for namespace hierachies +- Fix Datagram Support +- Wire Up Track Status Handling +- cargo fmt +- Add separators when printing multiple kv pairs +- Print max 16 bytes of BytesValues when debugging +- Cleanup mlog serialization +- Appease Copilot with more comments +- Improve err handling of push_and_wait_until_popped +- Fix comment typo +- Fix comment typo +- Fixup Subscribe Alias Handling +- -fix spelling errors found by CoPilot +- moq-transport variable renames and comments added +- Use FilterType::LargestObject for subscribe +- Fix param types to match draft-14 +- cargo fmt +- Add MoQT qlog events and TODOs for remainder +- cargo fmt +- cargo clippy --fix +- Add more qlog logging to 'mlog' session logs +- Add qlog events for generic logs +- Add some events for subgroup headers and objects +- Add more moqt qlog events +- Emit subscribe and subscribe_ok moqt qlog events +- Add more moqt qlog events +- Refactor mlog feature for better layering +- cargo fmt +- First pass of 'mlog' support +- Initial mlog scaffolding +- Add/bump serde for mlog in moq-transport +- Merge pull request #78 from sgodin/moq-interim-updates-2 +- cargo fmt +- Fix lint nit +- Fix lint nit +- Add extra logging +- cargo fmt +- - send track_alias in SubscribeOk to match what is sent in the stream header +- cargo fmt +- Appease linter +- -clock demo - task out reception of new streams so we don't need to wait for previous stream to end +- Tidy versions test fixture +- Tidy track namespace test fixture +- Tidy tuple test fixture +- Setup message test formatting +- Fix comment placement in Location test +- Fix comment placement in KeyValuePair tests +- VarInt tests - use binary literals for readability + ## [0.11.0](https://github.com/englishm/moq-rs/compare/moq-transport-v0.10.0...moq-transport-v0.11.0) - 2025-09-15 ### Other diff --git a/moq-transport/Cargo.toml b/moq-transport/Cargo.toml index 86756455..b499dad6 100644 --- a/moq-transport/Cargo.toml +++ b/moq-transport/Cargo.toml @@ -5,7 +5,7 @@ authors = ["Luke Curley"] repository = "https://github.com/englishm/moq-rs" license = "MIT OR Apache-2.0" -version = "0.11.0" +version = "0.12.0" edition = "2021" keywords = ["quic", "http3", "webtransport", "media", "live"] From b3a5c29871c79d7e5215fbdb8a745bd9166567c8 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Thu, 18 Dec 2025 18:34:58 +0000 Subject: [PATCH 05/14] chore: release --- Cargo.lock | 14 +++++++------- moq-api/CHANGELOG.md | 6 ++++++ moq-api/Cargo.toml | 2 +- moq-clock-ietf/CHANGELOG.md | 6 ++++++ moq-clock-ietf/Cargo.toml | 4 ++-- moq-native-ietf/CHANGELOG.md | 6 ++++++ moq-native-ietf/Cargo.toml | 2 +- moq-pub/CHANGELOG.md | 6 ++++++ moq-pub/Cargo.toml | 4 ++-- moq-relay-ietf/CHANGELOG.md | 28 ++++++++++++++++++++++++++++ moq-relay-ietf/Cargo.toml | 4 ++-- moq-sub/CHANGELOG.md | 6 ++++++ moq-sub/Cargo.toml | 4 ++-- moq-transport/CHANGELOG.md | 6 ++++++ moq-transport/Cargo.toml | 2 +- 15 files changed, 82 insertions(+), 18 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a53c8b33..e45c4da1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1203,7 +1203,7 @@ dependencies = [ [[package]] name = "moq-api" -version = "0.2.5" +version = "0.2.6" dependencies = [ "axum", "clap", @@ -1228,7 +1228,7 @@ dependencies = [ [[package]] name = "moq-clock-ietf" -version = "0.6.6" +version = "0.6.7" dependencies = [ "anyhow", "chrono", @@ -1245,7 +1245,7 @@ dependencies = [ [[package]] name = "moq-native-ietf" -version = "0.6.0" +version = "0.7.0" dependencies = [ "anyhow", "clap", @@ -1268,7 +1268,7 @@ dependencies = [ [[package]] name = "moq-pub" -version = "0.8.6" +version = "0.8.7" dependencies = [ "anyhow", "bytes", @@ -1289,7 +1289,7 @@ dependencies = [ [[package]] name = "moq-relay-ietf" -version = "0.7.6" +version = "0.7.7" dependencies = [ "anyhow", "async-trait", @@ -1318,7 +1318,7 @@ dependencies = [ [[package]] name = "moq-sub" -version = "0.4.0" +version = "0.4.1" dependencies = [ "anyhow", "clap", @@ -1337,7 +1337,7 @@ dependencies = [ [[package]] name = "moq-transport" -version = "0.12.0" +version = "0.12.1" dependencies = [ "bytes", "futures", diff --git a/moq-api/CHANGELOG.md b/moq-api/CHANGELOG.md index 93bc6109..d4ba5ede 100644 --- a/moq-api/CHANGELOG.md +++ b/moq-api/CHANGELOG.md @@ -6,6 +6,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## [0.2.6](https://github.com/cloudflare/moq-rs/compare/moq-api-v0.2.5...moq-api-v0.2.6) - 2025-12-18 + +### Other + +- update Cargo.lock dependencies + ## [0.2.5](https://github.com/cloudflare/moq-rs/compare/moq-api-v0.2.4...moq-api-v0.2.5) - 2025-12-18 ### Other diff --git a/moq-api/Cargo.toml b/moq-api/Cargo.toml index 7ea2747a..0e3a9a24 100644 --- a/moq-api/Cargo.toml +++ b/moq-api/Cargo.toml @@ -5,7 +5,7 @@ authors = ["Luke Curley"] repository = "https://github.com/englishm/moq-rs" license = "MIT OR Apache-2.0" -version = "0.2.5" +version = "0.2.6" edition = "2021" keywords = ["quic", "http3", "webtransport", "media", "live"] diff --git a/moq-clock-ietf/CHANGELOG.md b/moq-clock-ietf/CHANGELOG.md index 9b285228..ea65e0cf 100644 --- a/moq-clock-ietf/CHANGELOG.md +++ b/moq-clock-ietf/CHANGELOG.md @@ -6,6 +6,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## [0.6.7](https://github.com/cloudflare/moq-rs/compare/moq-clock-ietf-v0.6.6...moq-clock-ietf-v0.6.7) - 2025-12-18 + +### Other + +- Merge pull request #118 from itzmanish/feat/multi-relay + ## [0.6.6](https://github.com/cloudflare/moq-rs/compare/moq-clock-ietf-v0.6.5...moq-clock-ietf-v0.6.6) - 2025-12-18 ### Other diff --git a/moq-clock-ietf/Cargo.toml b/moq-clock-ietf/Cargo.toml index 73a52b46..9614f43a 100644 --- a/moq-clock-ietf/Cargo.toml +++ b/moq-clock-ietf/Cargo.toml @@ -5,7 +5,7 @@ authors = ["Luke Curley"] repository = "https://github.com/englishm/moq-rs" license = "MIT OR Apache-2.0" -version = "0.6.6" +version = "0.6.7" edition = "2021" keywords = ["quic", "http3", "webtransport", "media", "live"] @@ -14,7 +14,7 @@ categories = ["multimedia", "network-programming", "web-programming"] # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -moq-native-ietf = { path = "../moq-native-ietf", version = "0.6" } +moq-native-ietf = { path = "../moq-native-ietf", version = "0.7" } moq-transport = { path = "../moq-transport", version = "0.12" } # QUIC diff --git a/moq-native-ietf/CHANGELOG.md b/moq-native-ietf/CHANGELOG.md index faa28e8d..1e3773dd 100644 --- a/moq-native-ietf/CHANGELOG.md +++ b/moq-native-ietf/CHANGELOG.md @@ -6,6 +6,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## [0.7.0](https://github.com/cloudflare/moq-rs/compare/moq-native-ietf-v0.6.0...moq-native-ietf-v0.7.0) - 2025-12-18 + +### Other + +- Merge pull request #118 from itzmanish/feat/multi-relay + ## [0.6.0](https://github.com/cloudflare/moq-rs/compare/moq-native-ietf-v0.5.5...moq-native-ietf-v0.6.0) - 2025-12-18 ### Other diff --git a/moq-native-ietf/Cargo.toml b/moq-native-ietf/Cargo.toml index cdb22d35..39968516 100644 --- a/moq-native-ietf/Cargo.toml +++ b/moq-native-ietf/Cargo.toml @@ -5,7 +5,7 @@ authors = ["Luke Curley"] repository = "https://github.com/englishm/moq-rs" license = "MIT OR Apache-2.0" -version = "0.6.0" +version = "0.7.0" edition = "2021" keywords = ["quic", "http3", "webtransport", "media", "live"] diff --git a/moq-pub/CHANGELOG.md b/moq-pub/CHANGELOG.md index 5ececa26..4dbd3e9f 100644 --- a/moq-pub/CHANGELOG.md +++ b/moq-pub/CHANGELOG.md @@ -6,6 +6,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## [0.8.7](https://github.com/cloudflare/moq-rs/compare/moq-pub-v0.8.6...moq-pub-v0.8.7) - 2025-12-18 + +### Other + +- Merge pull request #118 from itzmanish/feat/multi-relay + ## [0.8.6](https://github.com/cloudflare/moq-rs/compare/moq-pub-v0.8.5...moq-pub-v0.8.6) - 2025-12-18 ### Other diff --git a/moq-pub/Cargo.toml b/moq-pub/Cargo.toml index 24198d1e..cac56cc0 100644 --- a/moq-pub/Cargo.toml +++ b/moq-pub/Cargo.toml @@ -5,7 +5,7 @@ authors = ["Mike English", "Luke Curley"] repository = "https://github.com/englishm/moq-rs" license = "MIT OR Apache-2.0" -version = "0.8.6" +version = "0.8.7" edition = "2021" keywords = ["quic", "http3", "webtransport", "media", "live"] @@ -14,7 +14,7 @@ categories = ["multimedia", "network-programming", "web-programming"] # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -moq-native-ietf = { path = "../moq-native-ietf", version = "0.6" } +moq-native-ietf = { path = "../moq-native-ietf", version = "0.7" } moq-transport = { path = "../moq-transport", version = "0.12" } moq-catalog = { path = "../moq-catalog", version = "0.2" } diff --git a/moq-relay-ietf/CHANGELOG.md b/moq-relay-ietf/CHANGELOG.md index c0e14e1c..e1afb5fe 100644 --- a/moq-relay-ietf/CHANGELOG.md +++ b/moq-relay-ietf/CHANGELOG.md @@ -6,6 +6,34 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## [0.7.7](https://github.com/cloudflare/moq-rs/compare/moq-relay-ietf-v0.7.6...moq-relay-ietf-v0.7.7) - 2025-12-18 + +### Added + +- add file-based coordinator and rewrote remote for handling remote streams + +### Fixed + +- ci +- linter +- seperate RemoteManager rewrite to different PR +- remove once_cell to pass the test +- clippy unused imports +- clippy warnings +- race and proper task shutdown +- if host is IpAddr construct socket addr else resolve dns +- update lookup signature to return owned Client instead of reference +- prevent file truncation when opening for read/write in FileCoordinator +- add lifetime parameter to lookup method signature for proper borrow checking +- return clients on lookup for coordinator and misc fix + +### Other + +- Merge pull request #118 from itzmanish/feat/multi-relay +- remove track registration from coordinator interface and file implementation +- clarify coordinator file usage in CLI help text and add FIXME for unregister_namespace +- restructure relay into lib/bin and add coordinator interface + ## [0.7.6](https://github.com/cloudflare/moq-rs/compare/moq-relay-ietf-v0.7.5...moq-relay-ietf-v0.7.6) - 2025-12-18 ### Other diff --git a/moq-relay-ietf/Cargo.toml b/moq-relay-ietf/Cargo.toml index e8a4406f..bfd0edcd 100644 --- a/moq-relay-ietf/Cargo.toml +++ b/moq-relay-ietf/Cargo.toml @@ -5,7 +5,7 @@ authors = ["Luke Curley", "Manish Kumar Pandit"] repository = "https://github.com/cloudflare/moq-rs" license = "MIT OR Apache-2.0" -version = "0.7.6" +version = "0.7.7" edition = "2021" keywords = ["quic", "http3", "webtransport", "media", "live"] @@ -21,7 +21,7 @@ path = "src/bin/moq-relay-ietf/main.rs" [dependencies] moq-transport = { path = "../moq-transport", version = "0.12" } -moq-native-ietf = { path = "../moq-native-ietf", version = "0.6" } +moq-native-ietf = { path = "../moq-native-ietf", version = "0.7" } moq-api = { path = "../moq-api", version = "0.2" } web-transport = { workspace = true } diff --git a/moq-sub/CHANGELOG.md b/moq-sub/CHANGELOG.md index 9eedc258..960e76e7 100644 --- a/moq-sub/CHANGELOG.md +++ b/moq-sub/CHANGELOG.md @@ -6,6 +6,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## [0.4.1](https://github.com/cloudflare/moq-rs/compare/moq-sub-v0.4.0...moq-sub-v0.4.1) - 2025-12-18 + +### Other + +- Merge pull request #118 from itzmanish/feat/multi-relay + ## [0.4.0](https://github.com/cloudflare/moq-rs/compare/moq-sub-v0.3.4...moq-sub-v0.4.0) - 2025-12-18 ### Other diff --git a/moq-sub/Cargo.toml b/moq-sub/Cargo.toml index 5fc3514c..cb9f7d71 100644 --- a/moq-sub/Cargo.toml +++ b/moq-sub/Cargo.toml @@ -5,7 +5,7 @@ authors = [] repository = "https://github.com/englishm/moq-rs" license = "MIT OR Apache-2.0" -version = "0.4.0" +version = "0.4.1" edition = "2021" keywords = ["quic", "http3", "webtransport", "media", "live"] @@ -15,7 +15,7 @@ categories = ["multimedia", "network-programming", "web-programming"] [dependencies] moq-transport = { path = "../moq-transport", version = "0.12" } -moq-native-ietf = { path = "../moq-native-ietf", version = "0.6" } +moq-native-ietf = { path = "../moq-native-ietf", version = "0.7" } moq-catalog = { path = "../moq-catalog", version = "0.2" } url = "2" diff --git a/moq-transport/CHANGELOG.md b/moq-transport/CHANGELOG.md index fb02df4b..91606bad 100644 --- a/moq-transport/CHANGELOG.md +++ b/moq-transport/CHANGELOG.md @@ -6,6 +6,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## [0.12.1](https://github.com/cloudflare/moq-rs/compare/moq-transport-v0.12.0...moq-transport-v0.12.1) - 2025-12-18 + +### Other + +- Merge pull request #118 from itzmanish/feat/multi-relay + ## [0.12.0](https://github.com/cloudflare/moq-rs/compare/moq-transport-v0.11.0...moq-transport-v0.12.0) - 2025-12-18 ### Fixed diff --git a/moq-transport/Cargo.toml b/moq-transport/Cargo.toml index b499dad6..9685a2a1 100644 --- a/moq-transport/Cargo.toml +++ b/moq-transport/Cargo.toml @@ -5,7 +5,7 @@ authors = ["Luke Curley"] repository = "https://github.com/englishm/moq-rs" license = "MIT OR Apache-2.0" -version = "0.12.0" +version = "0.12.1" edition = "2021" keywords = ["quic", "http3", "webtransport", "media", "live"] From 8dcd496ad56927d4e6966e6d08fc637e121c38f4 Mon Sep 17 00:00:00 2001 From: Manish Date: Fri, 19 Dec 2025 17:20:04 +0530 Subject: [PATCH 06/14] add Mike in authors list --- moq-relay-ietf/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/moq-relay-ietf/Cargo.toml b/moq-relay-ietf/Cargo.toml index bfd0edcd..771d2d73 100644 --- a/moq-relay-ietf/Cargo.toml +++ b/moq-relay-ietf/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "moq-relay-ietf" description = "Media over QUIC" -authors = ["Luke Curley", "Manish Kumar Pandit"] +authors = ["Luke Curley", "Manish Kumar Pandit", "Mike English"] repository = "https://github.com/cloudflare/moq-rs" license = "MIT OR Apache-2.0" From e7433699bc870ba814a92f68fafeb76413995e23 Mon Sep 17 00:00:00 2001 From: Manish Date: Mon, 4 May 2026 11:31:14 +0530 Subject: [PATCH 07/14] fix: apply suggestions from opencode review --- moq-relay-ietf/Cargo.toml | 5 +- moq-relay-ietf/src/lib.rs | 2 +- moq-relay-ietf/src/producer.rs | 1 - moq-relay-ietf/src/relay.rs | 446 ++++++++++++------------ moq-relay-ietf/src/remote.rs | 194 ++++++++--- moq-transport/src/session/subscribe.rs | 19 + moq-transport/src/session/subscriber.rs | 13 +- 7 files changed, 403 insertions(+), 277 deletions(-) diff --git a/moq-relay-ietf/Cargo.toml b/moq-relay-ietf/Cargo.toml index 1430a943..005fceb6 100644 --- a/moq-relay-ietf/Cargo.toml +++ b/moq-relay-ietf/Cargo.toml @@ -33,7 +33,7 @@ url = "2" # Async stuff tokio = { version = "1", features = ["full"] } -tokio-util = "0.7" +tokio-util = { version = "0.7", default-features = false } futures = "0.3" async-trait = "0.1" @@ -70,9 +70,6 @@ thiserror = "2.0.17" metrics = "0.24" metrics-exporter-prometheus = { version = "0.16", optional = true } -# misc -#once_cell = "1.21.3" - [features] default = [] metrics-prometheus = ["dep:metrics-exporter-prometheus"] diff --git a/moq-relay-ietf/src/lib.rs b/moq-relay-ietf/src/lib.rs index 098fa047..c469a730 100644 --- a/moq-relay-ietf/src/lib.rs +++ b/moq-relay-ietf/src/lib.rs @@ -48,6 +48,6 @@ pub use coordinator::*; pub use local::*; pub use producer::*; pub use relay::*; -pub use remote::*; +pub use remote::RemoteManager; pub use session::*; pub use web::*; diff --git a/moq-relay-ietf/src/producer.rs b/moq-relay-ietf/src/producer.rs index 2f42c106..4a96615a 100644 --- a/moq-relay-ietf/src/producer.rs +++ b/moq-relay-ietf/src/producer.rs @@ -46,7 +46,6 @@ impl Producer { /// Run the producer to serve subscribe requests. pub async fn run(self) -> Result<(), SessionError> { - //let mut tasks = FuturesUnordered::new(); let mut tasks: FuturesUnordered> = FuturesUnordered::new(); diff --git a/moq-relay-ietf/src/relay.rs b/moq-relay-ietf/src/relay.rs index ee088a84..06a43c9e 100644 --- a/moq-relay-ietf/src/relay.rs +++ b/moq-relay-ietf/src/relay.rs @@ -120,239 +120,249 @@ impl Relay { /// Run the relay server. pub async fn run(self) -> anyhow::Result<()> { - let mut tasks = FuturesUnordered::new(); + let Self { + quic_endpoints, + announce_url, + mlog_dir, + locals, + remotes, + coordinator, + } = self; - // Split remotes producer/consumer and spawn producer task - let remotes = self.remotes; + let run_result = async { + let mut tasks = FuturesUnordered::new(); - // Start the forwarder, if any - let forward_producer = if let Some(url) = &self.announce_url { - tracing::info!("forwarding announces to {}", url); + // Use the remote manager for routing to remote relays. + let remote_manager = remotes.clone(); - // Establish a QUIC connection to the forward URL - let (session, _quic_client_initial_cid, transport) = self.quic_endpoints[0] - .client - .connect(url, None) - .await - .context("failed to establish forward connection")?; + // Start the forwarder, if any + let forward_producer = if let Some(url) = &announce_url { + tracing::info!("forwarding announces to {}", url); - // Create the MoQ session over the connection - let (session, publisher, subscriber) = - moq_transport::session::Session::connect(session, None, transport) + // Establish a QUIC connection to the forward URL + let (session, _quic_client_initial_cid, transport) = quic_endpoints[0] + .client + .connect(url, None) .await - .context("failed to establish forward session")?; - - // Use the connection path already validated and stored by Session::connect(). - // The forward session is scoped to whatever path the announce URL specifies. - // - // Note: the forward connection intentionally does not call - // coordinator.resolve_scope(). The announce URL is operator-configured - // (via --announce), not client-supplied, so it doesn't need the same - // auth/permission checks that incoming client connections get. The - // forward session always gets both Producer and Consumer (full - // read-write) since it's acting as a relay peer, not a client. - // - // Limitation: all incoming scopes are forwarded to this single upstream scope. - // Multi-scope forwarding (routing different incoming scopes to different - // upstream paths) would require per-scope forward connections. - let forward_scope = session.connection_path().map(|s| s.to_string()); - - let coordinator = self.coordinator.clone(); - let session = Session { - session, - producer: Some(Producer::new( - publisher, - self.locals.clone(), - remotes.clone(), - forward_scope.clone(), - )), - consumer: Some(Consumer::new( - subscriber, - self.locals.clone(), - coordinator, - None, - forward_scope, - )), - // Forward connections are always full read-write relay peers, - // so no reject loops needed. - reject_publishes: None, - reject_subscribes: None, + .context("failed to establish forward connection")?; + + // Create the MoQ session over the connection + let (session, publisher, subscriber) = + moq_transport::session::Session::connect(session, None, transport) + .await + .context("failed to establish forward session")?; + + // Use the connection path already validated and stored by Session::connect(). + // The forward session is scoped to whatever path the announce URL specifies. + // + // Note: the forward connection intentionally does not call + // coordinator.resolve_scope(). The announce URL is operator-configured + // (via --announce), not client-supplied, so it doesn't need the same + // auth/permission checks that incoming client connections get. The + // forward session always gets both Producer and Consumer (full + // read-write) since it's acting as a relay peer, not a client. + // + // Limitation: all incoming scopes are forwarded to this single upstream scope. + // Multi-scope forwarding (routing different incoming scopes to different + // upstream paths) would require per-scope forward connections. + let forward_scope = session.connection_path().map(|s| s.to_string()); + + let forward_coordinator = coordinator.clone(); + let session = Session { + session, + producer: Some(Producer::new( + publisher, + locals.clone(), + remote_manager.clone(), + forward_scope.clone(), + )), + consumer: Some(Consumer::new( + subscriber, + locals.clone(), + forward_coordinator, + None, + forward_scope, + )), + // Forward connections are always full read-write relay peers, + // so no reject loops needed. + reject_publishes: None, + reject_subscribes: None, + }; + + let forward_producer = session.producer.clone(); + + tasks.push(async move { session.run().await.context("forwarding failed") }.boxed()); + + forward_producer + } else { + None }; - let forward_producer = session.producer.clone(); - - tasks.push(async move { session.run().await.context("forwarding failed") }.boxed()); - - forward_producer - } else { - None - }; - - let servers: Vec = self - .quic_endpoints - .into_iter() - .map(|endpoint| { - endpoint - .server - .context("missing TLS certificate for server") - }) - .collect::>()?; - - // This will hold the futures for all our listening servers. - let mut accepts: FuturesUnordered = FuturesUnordered::new(); - for mut server in servers { - tracing::info!("listening on {}", server.local_addr()?); - - // Create a future, box it, and push it to the collection. - accepts.push( - async move { - let conn = server.accept().await.context("accept failed"); - (conn, server) - } - .boxed(), - ); - } + let servers: Vec = quic_endpoints + .into_iter() + .map(|endpoint| endpoint.server.context("missing TLS certificate for server")) + .collect::>()?; + + // This will hold the futures for all our listening servers. + let mut accepts: FuturesUnordered = FuturesUnordered::new(); + for mut server in servers { + tracing::info!("listening on {}", server.local_addr()?); + + // Create a future, box it, and push it to the collection. + accepts.push( + async move { + let conn = server.accept().await.context("accept failed"); + (conn, server) + } + .boxed(), + ); + } - loop { - tokio::select! { - // This branch polls all the `accept` futures concurrently. - Some((conn_result, mut server)) = accepts.next() => { - // An accept operation has completed. - // First, immediately queue up the next accept() call for this server. - accepts.push( - async move { - let conn = server.accept().await.context("accept failed"); - (conn, server) - } - .boxed(), - ); - - let (conn, connection_id, transport) = conn_result.context("failed to accept QUIC connection")?; - - metrics::counter!("moq_relay_connections_total").increment(1); - - // Construct mlog path from connection ID if mlog directory is configured - let mlog_path = self.mlog_dir.as_ref() - .map(|dir| dir.join(format!("{}_server.mlog", connection_id))); - - let locals = self.locals.clone(); - let remotes = remotes.clone(); - let forward = forward_producer.clone(); - let coordinator = self.coordinator.clone(); - - // Spawn a new task to handle the connection - tasks.push(async move { - // Track active connections - decrements when task completes - let _conn_guard = GaugeGuard::new("moq_relay_active_connections"); - - // Clone the raw connection so we can close it with a proper - // error code if scope resolution fails after the MoQ handshake. - let raw_conn = conn.clone(); - - // Create the MoQ session over the connection (setup handshake etc) - let (session, publisher, subscriber) = match moq_transport::session::Session::accept(conn, mlog_path, transport).await { - Ok(session) => session, - Err(err) => { - tracing::warn!(error = %err, "failed to accept MoQ session: {}", err); - metrics::counter!("moq_relay_connection_errors_total", "stage" => "session_accept").increment(1); - // Maintain invariant: connections_total - connections_closed_total == active_connections - metrics::counter!("moq_relay_connections_closed_total").increment(1); - return Ok(()); + loop { + tokio::select! { + // This branch polls all the `accept` futures concurrently. + Some((conn_result, mut server)) = accepts.next() => { + // An accept operation has completed. + // First, immediately queue up the next accept() call for this server. + accepts.push( + async move { + let conn = server.accept().await.context("accept failed"); + (conn, server) } - }; - - // Create our MoQ relay session - let moq_session = session; - - // Resolve the connection path to a scope (identity + permissions). - // This translates the raw transport-level path into an application-level - // scope_id and determines what the connection is allowed to do. - let scope_info = match coordinator.resolve_scope(moq_session.connection_path()).await { - Ok(info) => info, - Err(err) => { - tracing::warn!( + .boxed(), + ); + + let (conn, connection_id, transport) = conn_result.context("failed to accept QUIC connection")?; + + metrics::counter!("moq_relay_connections_total").increment(1); + + // Construct mlog path from connection ID if mlog directory is configured + let mlog_path = mlog_dir.as_ref() + .map(|dir| dir.join(format!("{}_server.mlog", connection_id))); + + let locals = locals.clone(); + let remotes = remote_manager.clone(); + let forward = forward_producer.clone(); + let coordinator = coordinator.clone(); + + // Spawn a new task to handle the connection + tasks.push(async move { + // Track active connections - decrements when task completes + let _conn_guard = GaugeGuard::new("moq_relay_active_connections"); + + // Clone the raw connection so we can close it with a proper + // error code if scope resolution fails after the MoQ handshake. + let raw_conn = conn.clone(); + + // Create the MoQ session over the connection (setup handshake etc) + let (session, publisher, subscriber) = match moq_transport::session::Session::accept(conn, mlog_path, transport).await { + Ok(session) => session, + Err(err) => { + tracing::warn!(error = %err, "failed to accept MoQ session: {}", err); + metrics::counter!("moq_relay_connection_errors_total", "stage" => "session_accept").increment(1); + // Maintain invariant: connections_total - connections_closed_total == active_connections + metrics::counter!("moq_relay_connections_closed_total").increment(1); + return Ok(()); + } + }; + + // Create our MoQ relay session + let moq_session = session; + + // Resolve the connection path to a scope (identity + permissions). + // This translates the raw transport-level path into an application-level + // scope_id and determines what the connection is allowed to do. + let scope_info = match coordinator.resolve_scope(moq_session.connection_path()).await { + Ok(info) => info, + Err(err) => { + tracing::warn!( + connection_path = moq_session.connection_path(), + error = %err, + "scope resolution failed, rejecting session" + ); + // Close with PROTOCOL_VIOLATION (0x3) so the client + // gets a meaningful error instead of an abrupt reset. + // This is a QUIC APPLICATION_CLOSE, not a MoQT SESSION_CLOSE + // control message. Sending a proper SESSION_CLOSE would require + // running the MoQ session's send loop, which is not warranted + // for a pre-session rejection. The QUIC close code and reason + // string are visible to the client's transport layer. + raw_conn.close(0x3, "scope resolution failed"); + metrics::counter!("moq_relay_connection_errors_total", "stage" => "scope_resolve").increment(1); + metrics::counter!("moq_relay_connections_closed_total").increment(1); + return Ok(()); + } + }; + + let scope_id = scope_info.as_ref().map(|s| s.scope_id.clone()); + let can_publish = scope_info.as_ref().is_none_or(|s| s.permissions.can_publish()); + let can_subscribe = scope_info.as_ref().is_none_or(|s| s.permissions.can_subscribe()); + + if let Some(ref info) = scope_info { + tracing::debug!( connection_path = moq_session.connection_path(), - error = %err, - "scope resolution failed, rejecting session" + scope_id = %info.scope_id, + permissions = ?info.permissions, + "scope resolved" ); - // Close with PROTOCOL_VIOLATION (0x3) so the client - // gets a meaningful error instead of an abrupt reset. - // This is a QUIC APPLICATION_CLOSE, not a MoQT SESSION_CLOSE - // control message. Sending a proper SESSION_CLOSE would require - // running the MoQ session's send loop, which is not warranted - // for a pre-session rejection. The QUIC close code and reason - // string are visible to the client's transport layer. - raw_conn.close(0x3, "scope resolution failed"); - metrics::counter!("moq_relay_connection_errors_total", "stage" => "scope_resolve").increment(1); - metrics::counter!("moq_relay_connections_closed_total").increment(1); - return Ok(()); - } - }; - - let scope_id = scope_info.as_ref().map(|s| s.scope_id.clone()); - let can_publish = scope_info.as_ref().is_none_or(|s| s.permissions.can_publish()); - let can_subscribe = scope_info.as_ref().is_none_or(|s| s.permissions.can_subscribe()); - - if let Some(ref info) = scope_info { - tracing::debug!( - connection_path = moq_session.connection_path(), - scope_id = %info.scope_id, - permissions = ?info.permissions, - "scope resolved" - ); - } - - // Gate Producer/Consumer creation on permissions. - // Note the intentional inversion: - // - Producer serves SUBSCRIBEs → gated on can_subscribe - // - Consumer handles PUBLISH_NAMESPACEs → gated on can_publish - // - // When a half is disabled, we pass its transport counterpart - // to the Session's reject fields so unauthorized messages get - // an explicit error response instead of being silently ignored. - let (producer, reject_subscribes) = if can_subscribe { - (publisher.map(|publisher| Producer::new(publisher, locals.clone(), remotes, scope_id.clone())), None) - } else { - (None, publisher) - }; - - let (consumer, reject_publishes) = if can_publish { - (subscriber.map(|subscriber| Consumer::new(subscriber, locals, coordinator, forward, scope_id)), None) - } else { - (None, subscriber) - }; - - let session = Session { - session: moq_session, - producer, - consumer, - reject_publishes, - reject_subscribes, - }; - - match session.run().await { - Ok(()) => { - // Session ended cleanly (uncommon - usually ends via close) - metrics::counter!("moq_relay_connections_closed_total").increment(1); - } - Err(err) if err.is_graceful_close() => { - // Graceful close - peer sent APPLICATION_CLOSE with code 0 - tracing::debug!("MoQ session closed gracefully"); - metrics::counter!("moq_relay_connections_closed_total").increment(1); } - Err(err) => { - // Actual error - protocol violation, timeout, etc. - tracing::warn!(error = %err, "MoQ session error: {}", err); - metrics::counter!("moq_relay_connection_errors_total", "stage" => "session_run").increment(1); - metrics::counter!("moq_relay_connections_closed_total").increment(1); + + // Gate Producer/Consumer creation on permissions. + // Note the intentional inversion: + // - Producer serves SUBSCRIBEs → gated on can_subscribe + // - Consumer handles PUBLISH_NAMESPACEs → gated on can_publish + // + // When a half is disabled, we pass its transport counterpart + // to the Session's reject fields so unauthorized messages get + // an explicit error response instead of being silently ignored. + let (producer, reject_subscribes) = if can_subscribe { + (publisher.map(|publisher| Producer::new(publisher, locals.clone(), remotes, scope_id.clone())), None) + } else { + (None, publisher) + }; + + let (consumer, reject_publishes) = if can_publish { + (subscriber.map(|subscriber| Consumer::new(subscriber, locals, coordinator, forward, scope_id)), None) + } else { + (None, subscriber) + }; + + let session = Session { + session: moq_session, + producer, + consumer, + reject_publishes, + reject_subscribes, + }; + + match session.run().await { + Ok(()) => { + // Session ended cleanly (uncommon - usually ends via close) + metrics::counter!("moq_relay_connections_closed_total").increment(1); + } + Err(err) if err.is_graceful_close() => { + // Graceful close - peer sent APPLICATION_CLOSE with code 0 + tracing::debug!("MoQ session closed gracefully"); + metrics::counter!("moq_relay_connections_closed_total").increment(1); + } + Err(err) => { + // Actual error - protocol violation, timeout, etc. + tracing::warn!(error = %err, "MoQ session error: {}", err); + metrics::counter!("moq_relay_connection_errors_total", "stage" => "session_run").increment(1); + metrics::counter!("moq_relay_connections_closed_total").increment(1); + } } - } - Ok(()) - }.boxed()); - }, - res = tasks.next(), if !tasks.is_empty() => res.unwrap()?, + Ok(()) + }.boxed()); + }, + res = tasks.next(), if !tasks.is_empty() => res.unwrap()?, + } } } + .await; + + remotes.shutdown().await; + run_result } } diff --git a/moq-relay-ietf/src/remote.rs b/moq-relay-ietf/src/remote.rs index d7120f74..2a517593 100644 --- a/moq-relay-ietf/src/remote.rs +++ b/moq-relay-ietf/src/remote.rs @@ -4,7 +4,7 @@ use std::collections::HashMap; use std::net::SocketAddr; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::Arc; +use std::sync::{Arc, Weak}; use moq_native_ietf::quic; use moq_transport::coding::TrackNamespace; @@ -20,6 +20,8 @@ use crate::{metrics::GaugeGuard, Coordinator, CoordinatorError}; /// Keyed by both URL and destination address so that connections are reused /// only when both match. type RemoteCacheKey = (Url, Option); +type RemoteSlot = Arc>>; +type TrackCacheKey = (TrackNamespace, String); /// Manages connections to remote relays. /// @@ -30,7 +32,7 @@ type RemoteCacheKey = (Url, Option); pub struct RemoteManager { coordinator: Arc, clients: Vec, - remotes: Arc>>, + remotes: Arc>>, } impl RemoteManager { @@ -71,7 +73,6 @@ impl RemoteManager { Ok(remote) => remote, Err(err) => { tracing::error!(remote_url = %url, error = %err, "failed to connect to remote relay: {}", err); - self.remove(&cache_key).await; return Err(err); } }; @@ -82,10 +83,8 @@ impl RemoteManager { { Ok(reader) => Ok(reader), Err(err) => { - if !remote.is_connected() { - tracing::warn!(remote_url = %url, "remote connection is dead, removing from cache"); - self.remove(&cache_key).await; - } + tracing::warn!(remote_url = %url, error = %err, "remote subscribe failed, removing from cache"); + self.remove_if_same_remote(&cache_key, &remote).await; Err(err) } @@ -98,59 +97,98 @@ impl RemoteManager { cache_key: RemoteCacheKey, client: Option<&quic::Client>, ) -> anyhow::Result { - let mut remotes = self.remotes.lock().await; + let client = match client { + Some(client) => client, + None => self.clients.first().ok_or_else(|| { + anyhow::anyhow!("no QUIC clients configured for remote connections") + })?, + }; - if let Some(remote) = remotes.get(&cache_key) { + // The manager lock only protects the map. The per-key slot lock protects + // that key's connection state, so unrelated remotes can connect in parallel. + let slot = { + let mut remotes = self.remotes.lock().await; + remotes + .entry(cache_key.clone()) + .or_insert_with(|| Arc::new(Mutex::new(None))) + .clone() + }; + + let mut cached = slot.lock().await; + + if let Some(remote) = cached.as_ref() { if remote.is_connected() { return Ok(remote.clone()); } tracing::info!(remote_url = %cache_key.0, "removing dead connection to remote relay"); - remotes.remove(&cache_key); - } - - let client = match client { - Some(client) => client, - None => self.clients.first().ok_or_else(|| { - anyhow::anyhow!("no QUIC clients configured for remote connections") - })?, }; - tracing::info!(remote_url = %cache_key.0, "connecting to remote relay"); - let remote = Remote::connect(cache_key.0.clone(), cache_key.1, client).await?; - remotes.insert(cache_key, remote.clone()); + if let Some(remote) = cached.take() { + remote.shutdown().await; + } + tracing::info!(remote_url = %cache_key.0, "connecting to remote relay"); + let remote = Remote::connect( + cache_key.0.clone(), + cache_key.1, + client, + Arc::downgrade(&slot), + ) + .await?; + + *cached = Some(remote.clone()); Ok(remote) } - /// Remove a remote connection (called when connection fails). - async fn remove(&self, cache_key: &RemoteCacheKey) { - let mut remotes = self.remotes.lock().await; - if let Some(remote) = remotes.remove(cache_key) { - remote.shutdown(); + async fn remove_if_same_remote(&self, cache_key: &RemoteCacheKey, remote: &Remote) { + let slot = { + let remotes = self.remotes.lock().await; + remotes.get(cache_key).cloned() + }; + + let removed = if let Some(slot) = slot { + let mut cached = slot.lock().await; + match cached.as_ref() { + Some(current) if current.is_same_connection(remote) => cached.take(), + _ => None, + } + } else { + None + }; + + if let Some(remote) = removed { + remote.shutdown().await; } } /// Shutdown all remote connections. - pub async fn shutdown(&self) { - let mut remotes = self.remotes.lock().await; - for (cache_key, remote) in remotes.drain() { + pub(crate) async fn shutdown(&self) { + let remotes = { + let mut remotes = self.remotes.lock().await; + remotes.drain().collect::>() + }; + + for (cache_key, slot) in remotes { tracing::info!(remote_url = %cache_key.0, "shutting down remote connection"); - remote.shutdown(); + let mut remote = slot.lock().await; + if let Some(remote) = remote.take() { + remote.shutdown().await; + } } } } /// A connection to a single remote relay with its own QUIC client. #[derive(Clone)] -pub struct Remote { +struct Remote { url: Url, subscriber: moq_transport::session::Subscriber, - /// Track subscriptions - maps (namespace, track_name) to track reader - tracks: Arc>>, - /// Flag indicating if the connection is still alive + /// Track subscriptions keyed by full track name. + tracks: Arc>>, + /// Flag indicating if the connection is still alive. connected: Arc, - /// Cancellation token for the session task + /// Cancellation token for the session task. cancel: CancellationToken, } @@ -160,6 +198,7 @@ impl Remote { url: Url, addr: Option, client: &quic::Client, + cache_slot: Weak>>, ) -> anyhow::Result { let (session, _quic_client_initial_cid, transport) = match client.connect(&url, addr).await { @@ -205,6 +244,15 @@ impl Remote { } session_connected.store(false, Ordering::Release); + + if let Some(cache_slot) = cache_slot.upgrade() { + let mut cached = cache_slot.lock().await; + if matches!(cached.as_ref(), Some(remote) if Arc::ptr_eq(&remote.connected, &session_connected)) + { + cached.take(); + tracing::info!(remote_url = %session_url, "cleared closed remote connection from cache"); + } + } }); Ok(Self { @@ -217,18 +265,23 @@ impl Remote { } /// Check if the connection is still alive. - pub fn is_connected(&self) -> bool { + fn is_connected(&self) -> bool { self.connected.load(Ordering::Acquire) } + fn is_same_connection(&self, other: &Self) -> bool { + Arc::ptr_eq(&self.connected, &other.connected) + } + /// Shutdown the remote connection. - pub fn shutdown(&self) { + async fn shutdown(&self) { self.cancel.cancel(); self.connected.store(false, Ordering::Release); + self.tracks.lock().await.clear(); } /// Subscribe to a track on this remote relay. - pub async fn subscribe( + async fn subscribe( &self, namespace: TrackNamespace, track_name: String, @@ -238,29 +291,68 @@ impl Remote { } let key = (namespace.clone(), track_name.clone()); - let mut tracks = self.tracks.lock().await; - if let Some(reader) = tracks.get(&key) { - return Ok(Some(reader.clone())); - } + { + let mut tracks = self.tracks.lock().await; + if let Some(reader) = tracks.get(&key) { + if !reader.is_closed() { + return Ok(Some(reader.clone())); + } - let (writer, reader) = Track::new(namespace, track_name).produce(); - tracks.insert(key.clone(), reader.clone()); - drop(tracks); + tracing::debug!(remote_url = %self.url, namespace = %key.0, track = %key.1, "removing closed remote track from cache"); + tracks.remove(&key); + } + } let mut subscriber = self.subscriber.clone(); - let tracks = self.tracks.clone(); let url = self.url.clone(); + let tracks = Arc::downgrade(&self.tracks); + let cancel = self.cancel.clone(); - tokio::spawn(async move { - tracing::info!(remote_url = %url, namespace = %key.0, track = %key.1, "subscribing to remote track"); + tracing::info!(remote_url = %url, namespace = %key.0, track = %key.1, "subscribing to remote track"); + + let (writer, reader) = Track::new(namespace, track_name).produce(); + let subscribe = subscriber.subscribe_open(writer).await?; - if let Err(err) = subscriber.subscribe(writer).await { - tracing::warn!(remote_url = %url, namespace = %key.0, track = %key.1, error = %err, "failed subscribing to remote track: {}", err); + { + let mut tracks = self.tracks.lock().await; + if let Some(current) = tracks.get(&key) { + if !current.is_closed() { + return Ok(Some(current.clone())); + } + + tracks.remove(&key); + } + + tracks.insert(key.clone(), reader.clone()); + } + + let cleanup_key = key.clone(); + let cleanup_reader = reader.clone(); + tokio::spawn(async move { + tokio::select! { + result = subscribe.closed() => { + match result { + Ok(()) => { + tracing::debug!(remote_url = %url, namespace = %cleanup_key.0, track = %cleanup_key.1, "remote track subscription ended"); + } + Err(err) => { + tracing::warn!(remote_url = %url, namespace = %cleanup_key.0, track = %cleanup_key.1, error = %err, "remote track subscription ended with error: {}", err); + } + } + } + _ = cancel.cancelled() => { + tracing::debug!(remote_url = %url, namespace = %cleanup_key.0, track = %cleanup_key.1, "remote track subscription cancelled"); + } } - tracks.lock().await.remove(&key); - tracing::debug!(remote_url = %url, namespace = %key.0, track = %key.1, "remote track subscription ended"); + if let Some(tracks) = tracks.upgrade() { + let mut tracks = tracks.lock().await; + if matches!(tracks.get(&cleanup_key), Some(current) if Arc::ptr_eq(¤t.info, &cleanup_reader.info)) + { + tracks.remove(&cleanup_key); + } + } }); Ok(Some(reader)) diff --git a/moq-transport/src/session/subscribe.rs b/moq-transport/src/session/subscribe.rs index c98e7370..8052623c 100644 --- a/moq-transport/src/session/subscribe.rs +++ b/moq-transport/src/session/subscribe.rs @@ -140,6 +140,25 @@ impl Subscribe { .await; } } + + pub async fn ok(&self) -> Result<(), ServeError> { + loop { + { + let state = self.state.lock(); + state.closed.clone()?; + + if state.ok { + return Ok(()); + } + + match state.modified() { + Some(notify) => notify, + None => return Err(ServeError::Done), + } + } + .await; + } + } } impl Drop for Subscribe { diff --git a/moq-transport/src/session/subscriber.rs b/moq-transport/src/session/subscriber.rs index af3b51a4..2a0755aa 100644 --- a/moq-transport/src/session/subscriber.rs +++ b/moq-transport/src/session/subscriber.rs @@ -123,11 +123,20 @@ impl Subscriber { /// Subscribe to a track by creating a new subscribe request to the publisher. Block until subscription is closed. pub async fn subscribe(&mut self, track: serve::TrackWriter) -> Result<(), ServeError> { + let subscribe = self.subscribe_open(track).await?; + subscribe.closed().await + } + + /// Subscribe to a track and wait until the publisher acknowledges it. + pub async fn subscribe_open( + &mut self, + track: serve::TrackWriter, + ) -> Result { let request_id = self.get_next_request_id(); let (send, recv) = Subscribe::new(self.clone(), request_id, track); self.subscribes.lock().unwrap().insert(request_id, recv); - - send.closed().await + send.ok().await?; + Ok(send) } /// Send a message to the publisher via the control stream. From b827a2bb7889753adb964e11acfba104e1bc4f05 Mon Sep 17 00:00:00 2001 From: Manish Date: Mon, 4 May 2026 16:13:06 +0530 Subject: [PATCH 08/14] fix: suggestions from opencode reviewers --- moq-relay-ietf/src/remote.rs | 270 +++++++++++++++++++++++------------ 1 file changed, 181 insertions(+), 89 deletions(-) diff --git a/moq-relay-ietf/src/remote.rs b/moq-relay-ietf/src/remote.rs index 2a517593..570163bc 100644 --- a/moq-relay-ietf/src/remote.rs +++ b/moq-relay-ietf/src/remote.rs @@ -22,6 +22,7 @@ use crate::{metrics::GaugeGuard, Coordinator, CoordinatorError}; type RemoteCacheKey = (Url, Option); type RemoteSlot = Arc>>; type TrackCacheKey = (TrackNamespace, String); +type TrackSlot = Arc>>; /// Manages connections to remote relays. /// @@ -104,41 +105,62 @@ impl RemoteManager { })?, }; - // The manager lock only protects the map. The per-key slot lock protects - // that key's connection state, so unrelated remotes can connect in parallel. - let slot = { - let mut remotes = self.remotes.lock().await; - remotes - .entry(cache_key.clone()) - .or_insert_with(|| Arc::new(Mutex::new(None))) - .clone() - }; + loop { + // The manager lock only protects the map. The per-key slot lock protects + // that key's connection state, so unrelated remotes can connect in parallel. + let slot = { + let mut remotes = self.remotes.lock().await; + remotes + .entry(cache_key.clone()) + .or_insert_with(|| Arc::new(Mutex::new(None))) + .clone() + }; + + let mut cached = slot.lock().await; - let mut cached = slot.lock().await; + let is_current_slot = { + let remotes = self.remotes.lock().await; + matches!(remotes.get(&cache_key), Some(current) if Arc::ptr_eq(current, &slot)) + }; - if let Some(remote) = cached.as_ref() { - if remote.is_connected() { - return Ok(remote.clone()); + if !is_current_slot { + continue; } - tracing::info!(remote_url = %cache_key.0, "removing dead connection to remote relay"); - }; + if let Some(remote) = cached.as_ref() { + if remote.is_connected() { + return Ok(remote.clone()); + } - if let Some(remote) = cached.take() { - remote.shutdown().await; - } + tracing::info!(remote_url = %cache_key.0, "removing dead connection to remote relay"); + }; + + if let Some(remote) = cached.take() { + remote.shutdown().await; + } + + tracing::info!(remote_url = %cache_key.0, "connecting to remote relay"); + let remote = match Remote::connect( + cache_key.0.clone(), + cache_key.1, + client, + Arc::downgrade(&self.remotes), + cache_key.clone(), + Arc::downgrade(&slot), + ) + .await + { + Ok(remote) => remote, + Err(err) => { + drop(cached); + remove_empty_remote_slot(&self.remotes, &cache_key, &slot).await; + return Err(err); + } + }; - tracing::info!(remote_url = %cache_key.0, "connecting to remote relay"); - let remote = Remote::connect( - cache_key.0.clone(), - cache_key.1, - client, - Arc::downgrade(&slot), - ) - .await?; - - *cached = Some(remote.clone()); - Ok(remote) + *cached = Some(remote.clone()); + return Ok(remote); + } } async fn remove_if_same_remote(&self, cache_key: &RemoteCacheKey, remote: &Remote) { @@ -147,18 +169,19 @@ impl RemoteManager { remotes.get(cache_key).cloned() }; - let removed = if let Some(slot) = slot { - let mut cached = slot.lock().await; - match cached.as_ref() { - Some(current) if current.is_same_connection(remote) => cached.take(), - _ => None, - } - } else { - None - }; + if let Some(slot) = slot { + let removed = { + let mut cached = slot.lock().await; + match cached.as_ref() { + Some(current) if current.is_same_connection(remote) => cached.take(), + _ => None, + } + }; - if let Some(remote) = removed { - remote.shutdown().await; + if let Some(remote) = removed { + remote.shutdown().await; + remove_empty_remote_slot(&self.remotes, cache_key, &slot).await; + } } } @@ -179,13 +202,45 @@ impl RemoteManager { } } +async fn remove_empty_remote_slot( + remotes: &Arc>>, + cache_key: &RemoteCacheKey, + slot: &RemoteSlot, +) { + let cached = slot.lock().await; + if cached.is_some() { + return; + } + + let mut remotes = remotes.lock().await; + if matches!(remotes.get(cache_key), Some(current) if Arc::ptr_eq(current, slot)) { + remotes.remove(cache_key); + } +} + +async fn remove_empty_track_slot( + tracks: &Arc>>, + key: &TrackCacheKey, + slot: &TrackSlot, +) { + let cached = slot.lock().await; + if cached.is_some() { + return; + } + + let mut tracks = tracks.lock().await; + if matches!(tracks.get(key), Some(current) if Arc::ptr_eq(current, slot)) { + tracks.remove(key); + } +} + /// A connection to a single remote relay with its own QUIC client. #[derive(Clone)] struct Remote { url: Url, subscriber: moq_transport::session::Subscriber, /// Track subscriptions keyed by full track name. - tracks: Arc>>, + tracks: Arc>>, /// Flag indicating if the connection is still alive. connected: Arc, /// Cancellation token for the session task. @@ -198,6 +253,8 @@ impl Remote { url: Url, addr: Option, client: &quic::Client, + remotes: Weak>>, + cache_key: RemoteCacheKey, cache_slot: Weak>>, ) -> anyhow::Result { let (session, _quic_client_initial_cid, transport) = match client.connect(&url, addr).await @@ -246,12 +303,21 @@ impl Remote { session_connected.store(false, Ordering::Release); if let Some(cache_slot) = cache_slot.upgrade() { + let mut cleared = false; let mut cached = cache_slot.lock().await; if matches!(cached.as_ref(), Some(remote) if Arc::ptr_eq(&remote.connected, &session_connected)) { cached.take(); + cleared = true; tracing::info!(remote_url = %session_url, "cleared closed remote connection from cache"); } + drop(cached); + + if cleared { + if let Some(remotes) = remotes.upgrade() { + remove_empty_remote_slot(&remotes, &cache_key, &cache_slot).await; + } + } } }); @@ -286,76 +352,102 @@ impl Remote { namespace: TrackNamespace, track_name: String, ) -> anyhow::Result> { - if !self.is_connected() { - anyhow::bail!("remote connection to {} is closed", self.url); - } - let key = (namespace.clone(), track_name.clone()); - { - let mut tracks = self.tracks.lock().await; - if let Some(reader) = tracks.get(&key) { + loop { + if !self.is_connected() { + anyhow::bail!("remote connection to {} is closed", self.url); + } + + let slot = { + let mut tracks = self.tracks.lock().await; + tracks + .entry(key.clone()) + .or_insert_with(|| Arc::new(Mutex::new(None))) + .clone() + }; + + let mut cached = slot.lock().await; + + let is_current_slot = { + let tracks = self.tracks.lock().await; + matches!(tracks.get(&key), Some(current) if Arc::ptr_eq(current, &slot)) + }; + + if !is_current_slot { + continue; + } + + if let Some(reader) = cached.as_ref() { if !reader.is_closed() { return Ok(Some(reader.clone())); } tracing::debug!(remote_url = %self.url, namespace = %key.0, track = %key.1, "removing closed remote track from cache"); - tracks.remove(&key); } - } - let mut subscriber = self.subscriber.clone(); - let url = self.url.clone(); - let tracks = Arc::downgrade(&self.tracks); - let cancel = self.cancel.clone(); + cached.take(); - tracing::info!(remote_url = %url, namespace = %key.0, track = %key.1, "subscribing to remote track"); + let mut subscriber = self.subscriber.clone(); + let url = self.url.clone(); + let tracks = Arc::downgrade(&self.tracks); + let cancel = self.cancel.clone(); - let (writer, reader) = Track::new(namespace, track_name).produce(); - let subscribe = subscriber.subscribe_open(writer).await?; + tracing::info!(remote_url = %url, namespace = %key.0, track = %key.1, "subscribing to remote track"); - { - let mut tracks = self.tracks.lock().await; - if let Some(current) = tracks.get(&key) { - if !current.is_closed() { - return Ok(Some(current.clone())); + let (writer, reader) = Track::new(namespace.clone(), track_name.clone()).produce(); + let subscribe = match subscriber.subscribe_open(writer).await { + Ok(subscribe) => subscribe, + Err(err) => { + drop(cached); + remove_empty_track_slot(&self.tracks, &key, &slot).await; + return Err(err.into()); } + }; - tracks.remove(&key); + if !self.is_connected() { + drop(cached); + remove_empty_track_slot(&self.tracks, &key, &slot).await; + anyhow::bail!("remote connection to {} is closed", self.url); } - tracks.insert(key.clone(), reader.clone()); - } - - let cleanup_key = key.clone(); - let cleanup_reader = reader.clone(); - tokio::spawn(async move { - tokio::select! { - result = subscribe.closed() => { - match result { - Ok(()) => { - tracing::debug!(remote_url = %url, namespace = %cleanup_key.0, track = %cleanup_key.1, "remote track subscription ended"); - } - Err(err) => { - tracing::warn!(remote_url = %url, namespace = %cleanup_key.0, track = %cleanup_key.1, error = %err, "remote track subscription ended with error: {}", err); + *cached = Some(reader.clone()); + drop(cached); + + let cleanup_key = key.clone(); + let cleanup_reader = reader.clone(); + let cleanup_slot = slot.clone(); + tokio::spawn(async move { + tokio::select! { + result = subscribe.closed() => { + match result { + Ok(()) => { + tracing::debug!(remote_url = %url, namespace = %cleanup_key.0, track = %cleanup_key.1, "remote track subscription ended"); + } + Err(err) => { + tracing::warn!(remote_url = %url, namespace = %cleanup_key.0, track = %cleanup_key.1, error = %err, "remote track subscription ended with error: {}", err); + } } } + _ = cancel.cancelled() => { + tracing::debug!(remote_url = %url, namespace = %cleanup_key.0, track = %cleanup_key.1, "remote track subscription cancelled"); + } } - _ = cancel.cancelled() => { - tracing::debug!(remote_url = %url, namespace = %cleanup_key.0, track = %cleanup_key.1, "remote track subscription cancelled"); - } - } - if let Some(tracks) = tracks.upgrade() { - let mut tracks = tracks.lock().await; - if matches!(tracks.get(&cleanup_key), Some(current) if Arc::ptr_eq(¤t.info, &cleanup_reader.info)) - { - tracks.remove(&cleanup_key); + if let Some(tracks) = tracks.upgrade() { + let mut cached = cleanup_slot.lock().await; + if matches!(cached.as_ref(), Some(current) if Arc::ptr_eq(¤t.info, &cleanup_reader.info)) + { + cached.take(); + } + drop(cached); + + remove_empty_track_slot(&tracks, &cleanup_key, &cleanup_slot).await; } - } - }); + }); - Ok(Some(reader)) + return Ok(Some(reader)); + } } } From f8856524b803325197dd1c9d2b7b44361818f10a Mon Sep 17 00:00:00 2001 From: Manish Date: Tue, 5 May 2026 12:12:02 +0530 Subject: [PATCH 09/14] fix: tokio utils use default features --- moq-relay-ietf/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/moq-relay-ietf/Cargo.toml b/moq-relay-ietf/Cargo.toml index 005fceb6..4fe345f4 100644 --- a/moq-relay-ietf/Cargo.toml +++ b/moq-relay-ietf/Cargo.toml @@ -33,7 +33,7 @@ url = "2" # Async stuff tokio = { version = "1", features = ["full"] } -tokio-util = { version = "0.7", default-features = false } +tokio-util = "0.7" futures = "0.3" async-trait = "0.1" From c7a667b299b8631b2c9f66a23adf25a485f1fb72 Mon Sep 17 00:00:00 2001 From: Manish Date: Wed, 13 May 2026 09:15:10 +0530 Subject: [PATCH 10/14] keep the comments for readability purpose --- moq-relay-ietf/src/producer.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/moq-relay-ietf/src/producer.rs b/moq-relay-ietf/src/producer.rs index 4a96615a..9387b6a1 100644 --- a/moq-relay-ietf/src/producer.rs +++ b/moq-relay-ietf/src/producer.rs @@ -121,6 +121,7 @@ impl Producer { } } + // Check remote tracks second, and serve from remote if possible match self .remotes .subscribe(self.scope.as_deref(), &namespace, &track_name) @@ -130,17 +131,23 @@ impl Producer { if let Some(track) = track { let ns = namespace.to_utf8_path(); tracing::info!(namespace = %ns, track = %track_name, source = "remote", "serving subscribe from remote: {:?}", track.info); + // Update label to indicate remote source, timing recorded on drop timing_guard.set_label("source", "remote"); + // Track active tracks - decrements when serve completes let _track_guard = GaugeGuard::new("moq_relay_active_tracks"); return Ok(subscribed.serve(track).await?); } } Err(e) => { + // Route error = infrastructure failure (couldn't reach coordinator/upstream) + // This is different from "not found" - we don't know if the track exists let ns = namespace.to_utf8_path(); tracing::error!(namespace = %ns, track = %track_name, error = %e, "failed to route to remote: {}", e); timing_guard.set_label("source", "route_error"); metrics::counter!("moq_relay_subscribe_route_errors_total").increment(1); + // Return an internal error rather than "not found" since we couldn't check + // TODO: Consider returning a more specific error to the subscriber let err = ServeError::internal_ctx(format!( "route error for namespace '{}': {}", namespace, e From 4e5352cf1a60b7c838481ba9658829e7ff7a402c Mon Sep 17 00:00:00 2001 From: Manish Date: Wed, 13 May 2026 09:38:42 +0530 Subject: [PATCH 11/14] check for cancelled of cancellation token when waiting for subscribe open --- moq-relay-ietf/src/remote.rs | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/moq-relay-ietf/src/remote.rs b/moq-relay-ietf/src/remote.rs index 570163bc..f86ba374 100644 --- a/moq-relay-ietf/src/remote.rs +++ b/moq-relay-ietf/src/remote.rs @@ -396,7 +396,16 @@ impl Remote { tracing::info!(remote_url = %url, namespace = %key.0, track = %key.1, "subscribing to remote track"); let (writer, reader) = Track::new(namespace.clone(), track_name.clone()).produce(); - let subscribe = match subscriber.subscribe_open(writer).await { + let subscribe_result = tokio::select! { + result = subscriber.subscribe_open(writer) => result, + _ = cancel.cancelled() => { + drop(cached); + remove_empty_track_slot(&self.tracks, &key, &slot).await; + anyhow::bail!("subscribe cancelled, remote connection to {} is closed", self.url); + } + }; + + let subscribe = match subscribe_result { Ok(subscribe) => subscribe, Err(err) => { drop(cached); From e325d1677ae74dec0a60629bacdca150ff4a4795 Mon Sep 17 00:00:00 2001 From: Manish Date: Tue, 19 May 2026 10:23:56 +0530 Subject: [PATCH 12/14] fix the sub script for url --- dev/sub | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dev/sub b/dev/sub index 8e6c114a..1b04863b 100755 --- a/dev/sub +++ b/dev/sub @@ -20,6 +20,6 @@ ADDR="${ADDR:-$HOST:$PORT}" NAME="${NAME:-bbb}" # Combine the host and name into a URL. -URL="${URL:-"https://$ADDR/$NAME"}" +URL="${URL:-"https://$ADDR"}" cargo run --bin moq-sub -- --name "$NAME" "$URL" "$@" | ffplay - From d70abe747648a2638c919cb65ad220ac7c93c311 Mon Sep 17 00:00:00 2001 From: Manish Date: Tue, 19 May 2026 12:41:41 +0530 Subject: [PATCH 13/14] fix: subscribe cleaning on drop --- moq-sub/README.md | 6 +- moq-transport/src/session/subscribe.rs | 1 + moq-transport/src/session/subscriber.rs | 85 ++++++++++++++++++++++++- 3 files changed, 88 insertions(+), 4 deletions(-) diff --git a/moq-sub/README.md b/moq-sub/README.md index c1340d57..695cc68d 100644 --- a/moq-sub/README.md +++ b/moq-sub/README.md @@ -2,9 +2,9 @@ A command line tool for subscribing to media via Media over QUIC (MoQ). -Takes an URL to MoQ relay with a broadcast name in the path part of the URL. It will connect to the relay, subscribe to -the broadcast, and dump the media segments of the first video and first audio track to STDOUT. +Takes a URL to a MoQ relay and a broadcast name via `--name`. It will connect to the relay, subscribe to the broadcast, +and dump the media segments of the first video and first audio track to STDOUT. ``` -moq-sub https://localhost:4443/dev | ffplay - +moq-sub --name dev https://localhost:4443 | ffplay - ``` diff --git a/moq-transport/src/session/subscribe.rs b/moq-transport/src/session/subscribe.rs index 8052623c..ce63e6c3 100644 --- a/moq-transport/src/session/subscribe.rs +++ b/moq-transport/src/session/subscribe.rs @@ -165,6 +165,7 @@ impl Drop for Subscribe { fn drop(&mut self) { self.subscriber .send_message(message::Unsubscribe { id: self.info.id }); + self.subscriber.remove_subscribe(self.info.id); } } diff --git a/moq-transport/src/session/subscriber.rs b/moq-transport/src/session/subscriber.rs index 2a0755aa..2653abee 100644 --- a/moq-transport/src/session/subscriber.rs +++ b/moq-transport/src/session/subscriber.rs @@ -244,7 +244,7 @@ impl Subscriber { } /// Remove a subscribe from our map of active subscribes, and the alias map if present. - fn remove_subscribe(&mut self, id: u64) -> Option { + pub(super) fn remove_subscribe(&mut self, id: u64) -> Option { if let Some(subscribe) = self.subscribes.lock().unwrap().remove(&id) { // Remove from alias map if present if let Some(track_alias) = subscribe.track_alias() { @@ -734,3 +734,86 @@ impl Subscriber { Ok(()) } } + +#[cfg(test)] +mod tests { + use std::{sync::atomic, task::Poll}; + + use super::*; + use crate::{ + message::{self, GroupOrder}, + serve::Track, + }; + + fn subscriber() -> Subscriber { + Subscriber::new(Queue::default(), Arc::new(atomic::AtomicU64::new(0)), None) + } + + #[tokio::test] + async fn subscribe_open_cleans_up_when_cancelled_before_ok() { + let mut subscriber = subscriber(); + let observer = subscriber.clone(); + let (writer, _reader) = + Track::new(TrackNamespace::from_utf8_path("test"), "0.mp4".into()).produce(); + + { + let subscribe = subscriber.subscribe_open(writer); + futures::pin_mut!(subscribe); + + assert!(matches!(futures::poll!(&mut subscribe), Poll::Pending)); + assert_eq!(observer.subscribes.lock().unwrap().len(), 1); + } + + assert!(observer.subscribes.lock().unwrap().is_empty()); + assert!(observer.subscribe_alias_map.lock().unwrap().is_empty()); + } + + #[tokio::test] + async fn dropping_open_subscribe_removes_recv_state() { + let mut subscriber = subscriber(); + let observer = subscriber.clone(); + let (writer, _reader) = + Track::new(TrackNamespace::from_utf8_path("test"), "0.mp4".into()).produce(); + + let subscribe = subscriber.subscribe_open(writer); + futures::pin_mut!(subscribe); + + assert!(matches!(futures::poll!(&mut subscribe), Poll::Pending)); + assert_eq!(observer.subscribes.lock().unwrap().len(), 1); + + let mut receiver = observer.clone(); + receiver + .recv_subscribe_ok(&message::SubscribeOk { + id: 0, + track_alias: 10, + expires: 0, + group_order: GroupOrder::Publisher, + content_exists: false, + largest_location: None, + params: Default::default(), + }) + .unwrap(); + + let subscribe = match futures::poll!(&mut subscribe) { + Poll::Ready(Ok(subscribe)) => subscribe, + Poll::Ready(Err(err)) => panic!("subscribe failed: {err}"), + Poll::Pending => panic!("subscribe remained pending after SubscribeOk"), + }; + + assert_eq!(observer.subscribes.lock().unwrap().len(), 1); + assert_eq!( + observer + .subscribe_alias_map + .lock() + .unwrap() + .get(&10) + .copied(), + Some(0) + ); + + drop(subscribe); + + assert!(observer.subscribes.lock().unwrap().is_empty()); + assert!(observer.subscribe_alias_map.lock().unwrap().is_empty()); + } +} From 9074c26d081cc67db39491a15a62e86b74940d4f Mon Sep 17 00:00:00 2001 From: Manish Date: Tue, 19 May 2026 12:55:31 +0530 Subject: [PATCH 14/14] update time package to latest --- Cargo.lock | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4eab2305..cc6ddb79 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1464,9 +1464,9 @@ dependencies = [ [[package]] name = "num-conv" -version = "0.1.0" +version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9" +checksum = "521739c6d2bac4aa25192232afe6841231376b2b26d4d9fae5ecf8ca5772e441" [[package]] name = "num-integer" @@ -2501,30 +2501,30 @@ dependencies = [ [[package]] name = "time" -version = "0.3.44" +version = "0.3.47" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "91e7d9e3bb61134e77bde20dd4825b97c010155709965fedf0f49bb138e52a9d" +checksum = "743bd48c283afc0388f9b8827b976905fb217ad9e647fae3a379a9283c4def2c" dependencies = [ "deranged", "itoa", "num-conv", "powerfmt", - "serde", + "serde_core", "time-core", "time-macros", ] [[package]] name = "time-core" -version = "0.1.6" +version = "0.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "40868e7c1d2f0b8d73e4a8c7f0ff63af4f6d19be117e90bd73eb1d62cf831c6b" +checksum = "7694e1cfe791f8d31026952abf09c69ca6f6fa4e1a1229e18988f06a04a12dca" [[package]] name = "time-macros" -version = "0.2.24" +version = "0.2.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30cfb0125f12d9c277f35663a0a33f8c30190f4e4574868a330595412d34ebf3" +checksum = "2e70e4c5a0e0a8a4823ad65dfe1a6930e4f4d756dcd9dd7939022b5e8c501215" dependencies = [ "num-conv", "time-core",