From 77eece8e462cd3a2502cfd87ee6f7272221bc0f0 Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 21 Jun 2026 01:40:33 +0000 Subject: [PATCH 1/5] feat(moq-net): sync lite-05 wire with drafts #34/#35 (drop cache hint, add broadcast epoch, rename ANNOUNCE) Brings the moq-lite-05 (Lite05Wip) wire and naming in line with the two latest drafts commits: - #34: remove the Publisher Cache hint from TRACK_INFO. Retention is now a best-effort cache, not a guaranteed minimum, so the field no longer rides the wire. The local retention window (model TrackInfo.cache) stays and falls back to DEFAULT_CACHE on the subscriber. - #35: rename ANNOUNCE -> ANNOUNCE_BROADCAST and ANNOUNCE_INTEREST -> ANNOUNCE_REQUEST (wire names only; stream/message IDs unchanged), and add a per-broadcast Epoch. The epoch is carried end-to-end: BroadcastInfo.epoch is a SystemTime defaulting to now (the instant the instance was created), encoded on the wire as milliseconds since 2020-01-01 UTC. The origin's route selection now prefers the newer instance (larger epoch) among equal-length routes, then falls back to the existing deterministic hash tie-break, so a cluster still converges. Shortest hop chain still wins first to preserve routing. Co-Authored-By: Claude Opus 4.8 (1M context) Claude-Session: https://claude.ai/code/session_0118VcLtfuukFERQ8ito5dUG --- rs/moq-net/src/ietf/subscriber.rs | 4 +- rs/moq-net/src/lite/announce.rs | 122 +++++++++++++++++++++++++----- rs/moq-net/src/lite/publisher.rs | 66 ++++++++++------ rs/moq-net/src/lite/subscriber.rs | 38 +++++++--- rs/moq-net/src/lite/track.rs | 7 -- rs/moq-net/src/lite/version.rs | 12 +++ rs/moq-net/src/model/broadcast.rs | 56 +++++++++++++- rs/moq-net/src/model/origin.rs | 34 ++++++--- 8 files changed, 264 insertions(+), 75 deletions(-) diff --git a/rs/moq-net/src/ietf/subscriber.rs b/rs/moq-net/src/ietf/subscriber.rs index 97964fab7..5c06be6e5 100644 --- a/rs/moq-net/src/ietf/subscriber.rs +++ b/rs/moq-net/src/ietf/subscriber.rs @@ -460,7 +460,9 @@ impl Subscriber { let mut hops = crate::OriginList::new(); hops.push(self.session_origin) .expect("an empty hop chain has room for one entry"); - let broadcast = BroadcastInfo { hops }.produce(); + // moq-transport carries no broadcast epoch on the wire; stamp the current + // time so the instance is still ordered against any future re-announce. + let broadcast = BroadcastInfo { hops, ..Default::default() }.produce(); // Create the dynamic handler BEFORE publishing so consumers see // dynamic >= 1 the moment they receive the announce. Otherwise a diff --git a/rs/moq-net/src/lite/announce.rs b/rs/moq-net/src/lite/announce.rs index 086422682..fc117af2c 100644 --- a/rs/moq-net/src/lite/announce.rs +++ b/rs/moq-net/src/lite/announce.rs @@ -16,27 +16,34 @@ pub fn restart_supported(version: Version) -> bool { ) } -/// Sent by the publisher to announce the availability of a track. -/// The payload contains the contents of the wildcard. +/// ANNOUNCE_BROADCAST: sent by the publisher to advertise (or retract) a broadcast. +/// +/// Carries the broadcast path suffix, its instance [`epoch`](crate::BroadcastInfo::epoch) +/// (lite-05+), and the hop chain. Renamed from ANNOUNCE in lite-05. #[derive(Clone, Debug, PartialEq, Eq)] #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] -pub enum Announce<'a> { +pub enum AnnounceBroadcast<'a> { Active { #[cfg_attr(feature = "serde", serde(borrow))] suffix: Path<'a>, + /// Broadcast instance epoch (lite-05+); `0` on older versions. + epoch: u64, hops: OriginList, }, Ended { #[cfg_attr(feature = "serde", serde(borrow))] suffix: Path<'a>, + /// Epoch of the instance that ended (lite-05+); `0` on older versions. + epoch: u64, hops: OriginList, }, } -impl Message for Announce<'_> { +impl Message for AnnounceBroadcast<'_> { fn decode_msg(r: &mut R, version: Version) -> Result { let status = AnnounceStatus::decode(r, version)?; let suffix = Path::decode(r, version)?; + let epoch = decode_epoch(r, version)?; let hops = match version { Version::Lite01 | Version::Lite02 => OriginList::new(), Version::Lite03 => { @@ -53,28 +60,30 @@ impl Message for Announce<'_> { }; Ok(match status { - AnnounceStatus::Active => Self::Active { suffix, hops }, - AnnounceStatus::Ended => Self::Ended { suffix, hops }, + AnnounceStatus::Active => Self::Active { suffix, epoch, hops }, + AnnounceStatus::Ended => Self::Ended { suffix, epoch, hops }, // We encode a restart as a duplicate ANNOUNCE (a second `Active`), but on versions that // support restart we also accept the draft's explicit `restart` status and treat it the // same. For an already-announced path the subscriber turns it into a restart; for an // unknown path it's a fresh announce. Older versions never defined this status, so it's // an invalid value there. - AnnounceStatus::Restart if restart_supported(version) => Self::Active { suffix, hops }, + AnnounceStatus::Restart if restart_supported(version) => Self::Active { suffix, epoch, hops }, AnnounceStatus::Restart => return Err(DecodeError::InvalidValue), }) } fn encode_msg(&self, w: &mut W, version: Version) -> Result<(), EncodeError> { match self { - Self::Active { suffix, hops } => { + Self::Active { suffix, epoch, hops } => { AnnounceStatus::Active.encode(w, version)?; suffix.encode(w, version)?; + encode_epoch(w, version, *epoch)?; encode_hops(w, version, hops)?; } - Self::Ended { suffix, hops } => { + Self::Ended { suffix, epoch, hops } => { AnnounceStatus::Ended.encode(w, version)?; suffix.encode(w, version)?; + encode_epoch(w, version, *epoch)?; encode_hops(w, version, hops)?; } } @@ -83,6 +92,23 @@ impl Message for Announce<'_> { } } +/// Encode the broadcast Epoch varint, present only on versions that carry it (lite-05+). +fn encode_epoch(w: &mut W, version: Version, epoch: u64) -> Result<(), EncodeError> { + if version.has_broadcast_epoch() { + epoch.encode(w, version)?; + } + Ok(()) +} + +/// Decode the broadcast Epoch varint, defaulting to `0` on versions that omit it. +fn decode_epoch(r: &mut R, version: Version) -> Result { + if version.has_broadcast_epoch() { + u64::decode(r, version) + } else { + Ok(0) + } +} + fn encode_hops(w: &mut W, version: Version, hops: &OriginList) -> Result<(), EncodeError> { match version { Version::Lite01 | Version::Lite02 => Ok(()), @@ -91,9 +117,10 @@ fn encode_hops(w: &mut W, version: Version, hops: &OriginList) } } -/// Sent by the subscriber to request ANNOUNCE messages. +/// ANNOUNCE_REQUEST: sent by the subscriber to request ANNOUNCE_BROADCAST messages +/// for a path prefix. Renamed from ANNOUNCE_INTEREST in lite-05. #[derive(Clone, Debug)] -pub struct AnnounceInterest<'a> { +pub struct AnnounceRequest<'a> { // Request tracks with this prefix. pub prefix: Path<'a>, // If non-zero, the publisher SHOULD skip announces whose hop IDs contain this value. @@ -101,7 +128,7 @@ pub struct AnnounceInterest<'a> { pub exclude_hop: u64, } -impl Message for AnnounceInterest<'_> { +impl Message for AnnounceRequest<'_> { fn decode_msg(r: &mut R, version: Version) -> Result { let prefix = Path::decode(r, version)?; let exclude_hop = match version { @@ -245,12 +272,13 @@ mod tests { use super::*; use bytes::Buf; - // Forge an ANNOUNCE with the draft's explicit `restart` status (2) for the given version. + // Forge an ANNOUNCE_BROADCAST with the draft's explicit `restart` status (2) for the given version. fn encode_forged_restart(version: Version) -> bytes::Bytes { // Encode a normal Active, then flip its status byte (1 -> 2). let mut buf = bytes::BytesMut::new(); - Announce::Active { + AnnounceBroadcast::Active { suffix: Path::new("foo/bar"), + epoch: 0, hops: OriginList::new(), } .encode(&mut buf, version) @@ -273,10 +301,10 @@ mod tests { fn decodes_explicit_restart_status_as_active_on_lite05() { let version = Version::Lite05Wip; let mut slice = encode_forged_restart(version); - let decoded = Announce::decode(&mut slice, version).expect("explicit restart must decode"); + let decoded = AnnounceBroadcast::decode(&mut slice, version).expect("explicit restart must decode"); assert!(!slice.has_remaining(), "trailing bytes after decode"); assert!( - matches!(decoded, Announce::Active { .. }), + matches!(decoded, AnnounceBroadcast::Active { .. }), "restart should decode as Active" ); } @@ -287,7 +315,7 @@ mod tests { let version = Version::Lite04; let mut slice = encode_forged_restart(version); assert!( - matches!(Announce::decode(&mut slice, version), Err(DecodeError::InvalidValue)), + matches!(AnnounceBroadcast::decode(&mut slice, version), Err(DecodeError::InvalidValue)), "restart status must be rejected before lite-05" ); } @@ -345,23 +373,77 @@ mod tests { } #[test] - fn announce_interest_exclude_hop_round_trip() { + fn announce_request_exclude_hop_round_trip() { // A value above the old 62-bit varint ceiling only survives the fixed-width lite-05 path. for &exclude_hop in &[0u64, 7, 1u64 << 53, u64::MAX] { - let msg = AnnounceInterest { + let msg = AnnounceRequest { prefix: Path::new("foo/bar"), exclude_hop, }; let mut buf = bytes::BytesMut::new(); msg.encode(&mut buf, Version::Lite05Wip).unwrap(); let mut slice = &buf[..]; - let got = AnnounceInterest::decode(&mut slice, Version::Lite05Wip).unwrap(); + let got = AnnounceRequest::decode(&mut slice, Version::Lite05Wip).unwrap(); assert!(slice.is_empty(), "trailing bytes after decode"); assert_eq!(got.exclude_hop, exclude_hop); assert_eq!(got.prefix, msg.prefix); } } + fn broadcast_round_trip(msg: &AnnounceBroadcast, version: Version) -> AnnounceBroadcast<'static> { + let mut buf = bytes::BytesMut::new(); + msg.encode(&mut buf, version).unwrap(); + let mut slice = &buf[..]; + let got = AnnounceBroadcast::decode(&mut slice, version).unwrap(); + assert!(slice.is_empty(), "trailing bytes after decode"); + // Decode borrows from `buf`; re-own so the value can outlive this frame. + match got { + AnnounceBroadcast::Active { suffix, epoch, hops } => AnnounceBroadcast::Active { + suffix: suffix.to_owned(), + epoch, + hops, + }, + AnnounceBroadcast::Ended { suffix, epoch, hops } => AnnounceBroadcast::Ended { + suffix: suffix.to_owned(), + epoch, + hops, + }, + } + } + + #[test] + fn announce_broadcast_epoch_round_trip_on_lite05() { + let mut hops = OriginList::new(); + hops.push(Origin { id: 7 }).unwrap(); + for epoch in [0u64, 1, 1_700_000_000_000, u64::MAX >> 2] { + let msg = AnnounceBroadcast::Active { + suffix: Path::new("room/cam"), + epoch, + hops: hops.clone(), + }; + assert_eq!(broadcast_round_trip(&msg, Version::Lite05Wip), msg); + + let ended = AnnounceBroadcast::Ended { + suffix: Path::new("room/cam"), + epoch, + hops: OriginList::new(), + }; + assert_eq!(broadcast_round_trip(&ended, Version::Lite05Wip), ended); + } + } + + #[test] + fn announce_broadcast_epoch_omitted_before_lite05() { + // Pre-lite-05 carries no epoch on the wire, so a nonzero epoch decodes back as 0. + let msg = AnnounceBroadcast::Active { + suffix: Path::new("room/cam"), + epoch: 42, + hops: OriginList::new(), + }; + let got = broadcast_round_trip(&msg, Version::Lite04); + assert!(matches!(got, AnnounceBroadcast::Active { epoch: 0, .. })); + } + #[test] fn announce_ok_rejects_old_versions() { let msg = AnnounceOk { diff --git a/rs/moq-net/src/lite/publisher.rs b/rs/moq-net/src/lite/publisher.rs index 040025c69..b2660045a 100644 --- a/rs/moq-net/src/lite/publisher.rs +++ b/rs/moq-net/src/lite/publisher.rs @@ -147,7 +147,7 @@ impl Publisher { } pub async fn recv_announce(&self, mut stream: Stream) -> Result<(), Error> { - let interest = stream.reader.decode::().await?; + let interest = stream.reader.decode::().await?; let prefix = interest.prefix.to_owned(); let exclude_hop = interest.exclude_hop; @@ -211,6 +211,12 @@ impl Publisher { let mut stats_guards: std::collections::HashMap = std::collections::HashMap::new(); + // Last advertised epoch per active path (wire value). An ANNOUNCE_BROADCAST + // `ended` carries the epoch of the instance that ended, but the origin's Ended + // event doesn't carry the broadcast, so we stash it here on every Active/Restart + // and read it back when the path goes away. + let mut epochs: std::collections::HashMap = std::collections::HashMap::new(); + match version { Version::Lite01 | Version::Lite02 => { let mut init = Vec::new(); @@ -246,10 +252,10 @@ impl Publisher { } Version::Lite05Wip => { // Drain the current active set synchronously (like the Lite01/02 path), - // stashing suffix+hops so we can both COUNT them for AnnounceOk and re-send - // them afterward. The receiver stamps our origin onto each hop chain, so we - // forward the stored chain as-is (no self push here). - let mut initial: Vec<(crate::PathOwned, OriginList)> = Vec::new(); + // stashing suffix+epoch+hops so we can both COUNT them for AnnounceOk and + // re-send them afterward. The receiver stamps our origin onto each hop chain, + // so we forward the stored chain as-is (no self push here). + let mut initial: Vec<(crate::PathOwned, u64, OriginList)> = Vec::new(); while let Some((path, event)) = announced.try_next() { let suffix = path .strip_prefix(&prefix) @@ -259,7 +265,8 @@ impl Publisher { match event.broadcast() { Some(broadcast) => { - let hops = &broadcast.info().hops; + let info = broadcast.info(); + let hops = &info.hops; // Apply the same exclude_hop and reflected-announce skips as the live // loop so the count matches exactly what we send (minus the self push). if exclude_hop != 0 && hops.iter().any(|h| h.id == exclude_hop) { @@ -268,17 +275,20 @@ impl Publisher { if hops.contains(&self_origin) { continue; } + let epoch = info.epoch_wire(); tracing::debug!(broadcast = %absolute, "announce"); let guard = stats.broadcast(&absolute).publisher(); - stats_guards.entry(absolute).or_insert(guard); - initial.retain(|(s, _)| s != &suffix); - initial.push((suffix, hops.clone())); + stats_guards.entry(absolute.clone()).or_insert(guard); + epochs.insert(absolute, epoch); + initial.retain(|(s, _, _)| s != &suffix); + initial.push((suffix, epoch, hops.clone())); } None => { // A potential race: a just-announced path already unannounced. tracing::debug!(broadcast = %absolute, "unannounce"); stats_guards.remove(&absolute); - initial.retain(|(s, _)| s != &suffix); + epochs.remove(&absolute); + initial.retain(|(s, _, _)| s != &suffix); } } } @@ -291,8 +301,11 @@ impl Publisher { }; stream.writer.encode(&ok).await?; - for (suffix, hops) in initial { - stream.writer.encode(&lite::Announce::Active { suffix, hops }).await?; + for (suffix, epoch, hops) in initial { + stream + .writer + .encode(&lite::AnnounceBroadcast::Active { suffix, epoch, hops }) + .await?; } } _ => { @@ -316,41 +329,49 @@ impl Publisher { match event { crate::Announced::Active(active) => { - let Some(hops) = Self::prepare_active_hops(&active.info().hops, self_origin, exclude_hop, version, &absolute) else { + let info = active.info(); + let Some(hops) = Self::prepare_active_hops(&info.hops, self_origin, exclude_hop, version, &absolute) else { continue; }; + let epoch = info.epoch_wire(); tracing::debug!(broadcast = %absolute, "announce"); let guard = stats.broadcast(&absolute).publisher(); - let prev = stats_guards.insert(absolute, guard); + let prev = stats_guards.insert(absolute.clone(), guard); debug_assert!(prev.is_none(), "origin announced a path that was already active"); - stream.writer.encode(&lite::Announce::Active { suffix, hops }).await?; + epochs.insert(absolute, epoch); + stream.writer.encode(&lite::AnnounceBroadcast::Active { suffix, epoch, hops }).await?; } crate::Announced::Restart(active) => { // On lite-05+ a restart travels as a duplicate ANNOUNCE (a second // `Active` for an already-announced path). Older versions never defined // that, so split it into an unannounce followed by a fresh announce. - match Self::prepare_active_hops(&active.info().hops, self_origin, exclude_hop, version, &absolute) { + let info = active.info(); + match Self::prepare_active_hops(&info.hops, self_origin, exclude_hop, version, &absolute) { Some(hops) => { + let epoch = info.epoch_wire(); tracing::debug!(broadcast = %absolute, "restart"); + epochs.insert(absolute.clone(), epoch); // Continuity: keep the existing stats guard (no close + reopen). if lite::restart_supported(version) { - stream.writer.encode(&lite::Announce::Active { suffix, hops }).await?; + stream.writer.encode(&lite::AnnounceBroadcast::Active { suffix, epoch, hops }).await?; } else { stream .writer - .encode(&lite::Announce::Ended { + .encode(&lite::AnnounceBroadcast::Ended { suffix: suffix.clone(), + epoch, hops: OriginList::new(), }) .await?; - stream.writer.encode(&lite::Announce::Active { suffix, hops }).await?; + stream.writer.encode(&lite::AnnounceBroadcast::Active { suffix, epoch, hops }).await?; } } None => { // The replacement loops back to us; from this peer's view the broadcast is gone. tracing::debug!(broadcast = %absolute, "restart replacement looped; unannouncing"); stats_guards.remove(&absolute); - stream.writer.encode(&lite::Announce::Ended { suffix, hops: OriginList::new() }).await?; + let epoch = epochs.remove(&absolute).unwrap_or(0); + stream.writer.encode(&lite::AnnounceBroadcast::Ended { suffix, epoch, hops: OriginList::new() }).await?; } } } @@ -358,7 +379,9 @@ impl Publisher { tracing::debug!(broadcast = %absolute, "unannounce"); stats_guards.remove(&absolute); // An ended announce doesn't need hops; the receiver matches on path only. - stream.writer.encode(&lite::Announce::Ended { suffix, hops: OriginList::new() }).await?; + // It carries the epoch of the instance that ended (0 if never seen). + let epoch = epochs.remove(&absolute).unwrap_or(0); + stream.writer.encode(&lite::AnnounceBroadcast::Ended { suffix, epoch, hops: OriginList::new() }).await?; } } } @@ -447,7 +470,6 @@ impl Publisher { .encode(&lite::TrackInfo { priority: info.priority, ordered: info.ordered, - cache: info.cache, timescale, compression, }) diff --git a/rs/moq-net/src/lite/subscriber.rs b/rs/moq-net/src/lite/subscriber.rs index 90c506a99..18f5c9ad9 100644 --- a/rs/moq-net/src/lite/subscriber.rs +++ b/rs/moq-net/src/lite/subscriber.rs @@ -174,7 +174,7 @@ impl Subscriber { // Ask the peer to filter out announces that already passed through us, so // reflected announces (the simple loop case) never hit the wire. Lite03 // peers ignore this field, in which case start_announce below still drops. - let msg = lite::AnnounceInterest { + let msg = lite::AnnounceRequest { prefix: prefix.as_path(), exclude_hop: self.self_origin.id, }; @@ -214,8 +214,9 @@ impl Subscriber { for suffix in msg.suffixes { let path = prefix.join(&suffix); let abs = self.origin.absolute(&path).to_owned(); - // Lite01/02 don't carry hop information; the broadcast starts with an empty chain. - if self.start_announce(path.clone(), crate::OriginList::new(), responder_origin, &mut producers)? { + // Lite01/02 don't carry hop information or an epoch; the broadcast starts with + // an empty chain and a zero epoch (decoded as the 2020 base instant). + if self.start_announce(path.clone(), crate::OriginList::new(), 0, responder_origin, &mut producers)? { stats_guards.insert(abs.clone(), self.stats.broadcast(&abs).subscriber()); } } @@ -245,16 +246,16 @@ impl Subscriber { } }; - while let Some(announce) = stream.reader.decode_maybe::().await? { + while let Some(announce) = stream.reader.decode_maybe::().await? { match announce { - lite::Announce::Active { suffix, hops } => { + lite::AnnounceBroadcast::Active { suffix, epoch, hops } => { let path = prefix.join(&suffix); let abs = self.origin.absolute(&path).to_owned(); if lite::restart_supported(self.version) && producers.contains_key(&path) { // lite-05+ only: a duplicate ANNOUNCE for an already-announced path is a RESTART; // atomically replace the broadcast. Older versions fall through to start_announce, // which rejects the duplicate (Error::Duplicate). - if self.restart_announce(path.clone(), hops, responder_origin, &mut producers)? { + if self.restart_announce(path.clone(), hops, epoch, responder_origin, &mut producers)? { // Continuity: keep the existing stats guard if present. stats_guards .entry(abs.clone()) @@ -262,7 +263,7 @@ impl Subscriber { } else { stats_guards.remove(&abs); } - } else if self.start_announce(path.clone(), hops, responder_origin, &mut producers)? { + } else if self.start_announce(path.clone(), hops, epoch, responder_origin, &mut producers)? { stats_guards.insert(abs.clone(), self.stats.broadcast(&abs).subscriber()); } // The first `initial_count` Active messages are the initial set; once @@ -274,7 +275,7 @@ impl Subscriber { } } } - lite::Announce::Ended { suffix, .. } => { + lite::AnnounceBroadcast::Ended { suffix, .. } => { let path = prefix.join(&suffix); tracing::debug!(broadcast = %self.log_path(&path), "unannounced"); @@ -353,6 +354,8 @@ impl Subscriber { &mut self, path: PathOwned, mut hops: crate::OriginList, + // Broadcast instance epoch from ANNOUNCE_BROADCAST (wire value, 0 on pre-lite-05). + epoch: u64, // Lite05+: the announce sender's origin id (from AnnounceOk). The sender no // longer stamps itself onto the chain, so we append it here to reconstruct // the full `[src...sender]` chain Lite04 stored. None for older versions, @@ -409,7 +412,11 @@ impl Subscriber { tracing::debug!(broadcast = %self.log_path(&path), hops = hops.len(), "announce"); - let broadcast = BroadcastInfo { hops }.produce(); + let broadcast = BroadcastInfo { + hops, + epoch: BroadcastInfo::epoch_from_wire(epoch), + } + .produce(); // Create the dynamic handler BEFORE publishing, so that consumers // see dynamic >= 1 immediately when they receive the announcement. @@ -441,6 +448,8 @@ impl Subscriber { &mut self, path: PathOwned, mut hops: crate::OriginList, + // Broadcast instance epoch from ANNOUNCE_BROADCAST (wire value, 0 on pre-lite-05). + epoch: u64, // Lite05+: the announce sender's origin id (from AnnounceOk), appended here to // rebuild the full chain since the sender no longer stamps itself. None for older // versions. See `start_announce`. @@ -461,7 +470,11 @@ impl Subscriber { tracing::debug!(broadcast = %self.log_path(&path), hops = hops.len(), "restart"); - let broadcast = BroadcastInfo { hops }.produce(); + let broadcast = BroadcastInfo { + hops, + epoch: BroadcastInfo::epoch_from_wire(epoch), + } + .produce(); let dynamic = broadcast.dynamic(); // Publish the replacement first so the origin restarts atomically; the old broadcast is @@ -950,10 +963,13 @@ impl TrackServe { // The publisher FINs after TRACK_INFO; FIN our side too and let the stream drop. let _ = stream.writer.finish(); + // The wire no longer carries a cache hint (the publisher's retention is now + // best-effort, not a guarantee), so the local retention window falls back to + // the model default. let model = crate::TrackInfo { compress: info.compression != Compression::None, timescale: info.timescale, - cache: info.cache, + cache: crate::DEFAULT_CACHE, priority: info.priority, ordered: info.ordered, }; diff --git a/rs/moq-net/src/lite/track.rs b/rs/moq-net/src/lite/track.rs index 523c7f521..2470635e2 100644 --- a/rs/moq-net/src/lite/track.rs +++ b/rs/moq-net/src/lite/track.rs @@ -51,8 +51,6 @@ pub struct TrackInfo { pub priority: u8, /// The publisher's group ordering preference (newest-first when `false`). pub ordered: bool, - /// The minimum age the publisher guarantees to keep an old group around. - pub cache: std::time::Duration, /// Per-frame timestamp scale, or `None` if frames carry no timestamps. On the /// wire `None` is `0` and `Some(n)` is `n`. pub timescale: Option, @@ -68,14 +66,12 @@ impl Message for TrackInfo { let priority = u8::decode(r, version)?; let ordered = u8::decode(r, version)? != 0; - let cache = std::time::Duration::decode(r, version)?; let timescale = Timescale::new(u64::decode(r, version)?).ok(); let compression = Compression::from_code(u64::decode(r, version)?).map_err(|_| DecodeError::InvalidValue)?; Ok(Self { priority, ordered, - cache, timescale, compression, }) @@ -88,7 +84,6 @@ impl Message for TrackInfo { self.priority.encode(w, version)?; (self.ordered as u8).encode(w, version)?; - self.cache.encode(w, version)?; self.timescale.map(u64::from).unwrap_or(0).encode(w, version)?; self.compression.to_code().encode(w, version)?; Ok(()) @@ -103,7 +98,6 @@ mod test { TrackInfo { priority: 7, ordered: false, - cache: std::time::Duration::from_secs(10), timescale: Some(Timescale::MICRO), compression: Compression::Deflate, } @@ -121,7 +115,6 @@ mod test { let got = info_roundtrip(Version::Lite05Wip, &info_sample()); assert_eq!(got.priority, 7); assert!(!got.ordered); - assert_eq!(got.cache, std::time::Duration::from_secs(10)); assert_eq!(got.timescale, Some(Timescale::MICRO)); assert_eq!(got.compression, Compression::Deflate); } diff --git a/rs/moq-net/src/lite/version.rs b/rs/moq-net/src/lite/version.rs index 70c327c4f..fb58361aa 100644 --- a/rs/moq-net/src/lite/version.rs +++ b/rs/moq-net/src/lite/version.rs @@ -43,6 +43,18 @@ impl Version { _ => true, } } + + /// Whether ANNOUNCE_BROADCAST carries a per-broadcast Epoch varint (after the + /// suffix, before the hop chain). Added in lite-05 so a consumer can tell a newer + /// instance of a broadcast from an older one. Older versions omit the field. + #[allow(clippy::match_like_matches_macro)] + pub fn has_broadcast_epoch(self) -> bool { + // Match form so future versions default forward (CLAUDE.md convention). + match self { + Self::Lite01 | Self::Lite02 | Self::Lite03 | Self::Lite04 => false, + _ => true, + } + } } impl fmt::Display for Version { diff --git a/rs/moq-net/src/model/broadcast.rs b/rs/moq-net/src/model/broadcast.rs index 308bcdbe5..2c18c6929 100644 --- a/rs/moq-net/src/model/broadcast.rs +++ b/rs/moq-net/src/model/broadcast.rs @@ -2,29 +2,81 @@ use std::{ collections::{HashMap, VecDeque, hash_map}, sync::Arc, task::{Poll, ready}, + time::{Duration, SystemTime, UNIX_EPOCH}, }; use crate::{Error, TrackConsumer, TrackProducer, TrackRequest, TrackWeak}; use super::{OriginList, TrackInfo}; +/// Wall-clock base for the broadcast epoch: 2020-01-01T00:00:00 UTC, expressed as +/// seconds since the Unix epoch. The wire carries milliseconds since this base +/// (smaller than a Unix-epoch value, and good past the year 2500 in a varint). +const EPOCH_BASE_SECS: u64 = 1_577_836_800; + /// A collection of media tracks that can be published and subscribed to. /// /// Create via [`BroadcastInfo::produce`] to obtain both [`BroadcastProducer`] and [`BroadcastConsumer`] pair. -#[derive(Clone, Debug, Default)] +#[derive(Clone, Debug)] pub struct BroadcastInfo { /// The chain of origins the broadcast has traversed. Each relay appends its own /// [`crate::Origin`] when forwarding, so the list is used for loop detection and /// shortest-path preference. pub hops: OriginList, + + /// Wall-clock instant identifying this instance of the broadcast. + /// + /// A newer instance of the same broadcast carries a later time than an older one, + /// so a consumer can prefer the newest when the same broadcast is advertised over + /// multiple routes. Defaults to [`SystemTime::now`] (the moment the broadcast was + /// created). Origin-assigned and forwarded unchanged by relays. On the wire + /// (ANNOUNCE_BROADCAST, lite-05+) it is encoded as milliseconds since 2020-01-01 UTC. + pub epoch: SystemTime, +} + +impl Default for BroadcastInfo { + fn default() -> Self { + Self { + hops: OriginList::new(), + // A fresh broadcast instance is stamped with the current wall clock. + epoch: SystemTime::now(), + } + } } impl BroadcastInfo { - /// Create a new broadcast with an empty hop chain. + /// Create a new broadcast with an empty hop chain and an epoch of [`SystemTime::now`]. pub fn new() -> Self { Self::default() } + /// Set the broadcast instance epoch, returning `self` for chaining. + /// + /// A newer instance of the same broadcast must use a later time. See [`Self::epoch`]. + pub fn with_epoch(mut self, epoch: SystemTime) -> Self { + self.epoch = epoch; + self + } + + /// The epoch as the wire value: whole milliseconds since 2020-01-01 UTC. + /// + /// Saturates to `0` for any instant at or before the base (e.g. a skewed clock). + pub(crate) fn epoch_wire(&self) -> u64 { + self.epoch + .duration_since(Self::epoch_base()) + .map(|d| d.as_millis() as u64) + .unwrap_or(0) + } + + /// Reconstruct an epoch [`SystemTime`] from the wire value (ms since 2020-01-01 UTC). + pub(crate) fn epoch_from_wire(wire: u64) -> SystemTime { + Self::epoch_base() + Duration::from_millis(wire) + } + + fn epoch_base() -> SystemTime { + UNIX_EPOCH + Duration::from_secs(EPOCH_BASE_SECS) + } + /// Consume this [BroadcastInfo] to create a producer that carries its metadata /// (including the hop chain). pub fn produce(self) -> BroadcastProducer { diff --git a/rs/moq-net/src/model/origin.rs b/rs/moq-net/src/model/origin.rs index 53f9e409e..634a1cb50 100644 --- a/rs/moq-net/src/model/origin.rs +++ b/rs/moq-net/src/model/origin.rs @@ -257,12 +257,14 @@ struct OriginBroadcast { /// Ordering key used to pick the active route among broadcasts at the same path. /// -/// Lower wins. Shorter hop chains sort first; equal-length chains are broken by a -/// deterministic hash of the broadcast name and hop chain, so every node in the -/// cluster, given the same candidate routes, converges on the same winner instead -/// of relying on arrival order. Mixing the name in spreads equal-length routes -/// across different upstreams rather than funneling every broadcast onto one. -fn route_key(name: &Path, hops: &OriginList) -> (usize, u64) { +/// Lower wins. Shorter hop chains sort first (routing prefers the shortest path); +/// among equal-length chains the newer broadcast instance (larger +/// [`epoch`](BroadcastInfo::epoch)) wins, and any remaining ties break on a +/// deterministic hash of the broadcast name and hop chain. Every node in the cluster, +/// given the same candidate routes, converges on the same winner: the epoch and hops +/// are forwarded unchanged, and the hash is build-stable. Mixing the name in spreads +/// equal routes across different upstreams rather than funneling onto one. +fn route_key(name: &Path, info: &BroadcastInfo) -> (usize, std::cmp::Reverse, u64) { // FNV-1a, not the std hasher: its output is fixed across Rust versions and // builds, which matters when nodes run mismatched binaries during a rolling // deploy and still need to agree on the same route. SEED is a custom basis @@ -275,13 +277,14 @@ fn route_key(name: &Path, hops: &OriginList) -> (usize, u64) { for &byte in name.as_str().as_bytes() { hash = (hash ^ u64::from(byte)).wrapping_mul(FNV_PRIME); } - for hop in hops { + for hop in &info.hops { for &byte in &hop.id.to_le_bytes() { hash = (hash ^ u64::from(byte)).wrapping_mul(FNV_PRIME); } } - (hops.len(), hash) + // Reverse the epoch so a larger (newer) instance sorts lower, i.e. wins. + (info.hops.len(), std::cmp::Reverse(info.epoch_wire()), hash) } /// One coalesced update queued for an `AnnounceConsumer`. @@ -507,7 +510,7 @@ impl OriginNode { return; } - if route_key(&full, &broadcast.info().hops) < route_key(&full, &existing.active.info().hops) { + if route_key(&full, broadcast.info()) < route_key(&full, existing.active.info()) { let old = existing.active.clone(); existing.active = broadcast.clone(); existing.backup.push_back(old); @@ -601,7 +604,7 @@ impl OriginNode { .backup .iter() .enumerate() - .min_by_key(|(_, b)| route_key(&full, &b.info().hops)) + .min_by_key(|(_, b)| route_key(&full, b.info())) .map(|(i, _)| i); if let Some(idx) = best { let active = entry.backup.remove(idx).expect("index in range"); @@ -1863,6 +1866,7 @@ mod tests { // `a` carries one hop; `b` has none, so `b` wins the route and replaces it. let a = BroadcastInfo { hops: OriginList::try_from(vec![Origin::from(1u64)]).unwrap(), + ..Default::default() } .produce(); let b = BroadcastInfo::new().produce(); @@ -1884,6 +1888,7 @@ mod tests { // `a` carries one hop; `b` has none, so `b` wins the route and replaces it. let a = BroadcastInfo { hops: OriginList::try_from(vec![Origin::from(1u64)]).unwrap(), + ..Default::default() } .produce(); let b = BroadcastInfo::new().produce(); @@ -1926,10 +1931,15 @@ mod tests { async fn test_deterministic_tiebreak() { tokio::time::pause(); - // Build a broadcast carrying a specific hop chain. + // Build a broadcast carrying a specific hop chain. All routes share one epoch so + // this test isolates the hash tie-break (a differing epoch would decide first). fn route(ids: &[u64]) -> BroadcastProducer { let hops = OriginList::try_from(ids.iter().copied().map(Origin::from).collect::>()).unwrap(); - BroadcastInfo { hops }.produce() + BroadcastInfo { + hops, + epoch: BroadcastInfo::epoch_from_wire(0), + } + .produce() } // Resolve the active route for "test" after publishing both routes in the given order. From b4e947907ad38499184ca4236b530cd976e4279e Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 21 Jun 2026 01:52:19 +0000 Subject: [PATCH 2/5] feat(moq-net): lite-05 SETUP stream with negotiated probe + path (drafts #26/#27) Adds the lite-05 SETUP message exchanged on a unidirectional Setup Stream (DataType::Setup = 1). Each endpoint opens the stream once, sends a single SETUP advertising its capabilities, and FINs. The two SETUPs are independent: neither side blocks on the peer's before opening other streams. - #26: SETUP message + negotiated Probe capability levels (None/Report/Increase). The SETUP body reuses the parameter framing (Parameters gained typed varint/ bytes accessors). The subscriber now gates opening a PROBE stream on the peer having advertised Probe >= Report in its SETUP; unknown future levels saturate to Increase. Older versions have no Setup Stream and keep probing unconditionally. - #27: Path Setup Parameter for URI-less transports. Client::with_path sets the request path advertised in the client's SETUP (native QUIC / qmux over TCP/TLS); servers never send one. The peer's SETUP (incl. path) is decoded and recorded; wiring the received path into relay routing is left for a follow-up. Co-Authored-By: Claude Opus 4.8 (1M context) Claude-Session: https://claude.ai/code/session_0118VcLtfuukFERQ8ito5dUG --- js/net/src/lite/announce.test.ts | 64 ++++++++++ js/net/src/lite/announce.ts | 68 +++++++--- js/net/src/lite/connection.ts | 4 +- js/net/src/lite/publisher.ts | 38 ++++-- js/net/src/lite/stream.ts | 4 +- js/net/src/lite/subscriber.ts | 22 ++-- js/net/src/lite/track.test.ts | 2 - js/net/src/lite/track.ts | 10 +- js/net/src/lite/version.ts | 16 +++ rs/moq-net/src/client.rs | 25 ++++ rs/moq-net/src/lite/mod.rs | 3 + rs/moq-net/src/lite/parameters.rs | 35 ++++++ rs/moq-net/src/lite/session.rs | 38 +++++- rs/moq-net/src/lite/setup.rs | 203 ++++++++++++++++++++++++++++++ rs/moq-net/src/lite/stream.rs | 3 + rs/moq-net/src/lite/subscriber.rs | 27 ++++ rs/moq-net/src/lite/version.rs | 12 ++ rs/moq-net/src/server.rs | 12 ++ 18 files changed, 540 insertions(+), 46 deletions(-) create mode 100644 js/net/src/lite/announce.test.ts create mode 100644 rs/moq-net/src/lite/setup.rs diff --git a/js/net/src/lite/announce.test.ts b/js/net/src/lite/announce.test.ts new file mode 100644 index 000000000..af277f3c1 --- /dev/null +++ b/js/net/src/lite/announce.test.ts @@ -0,0 +1,64 @@ +import { expect, test } from "bun:test"; +import * as Path from "../path.ts"; +import { Reader, Writer } from "../stream.ts"; +import { AnnounceBroadcast } from "./announce.ts"; +import { OriginSchema } from "./origin.ts"; +import { Version } from "./version.ts"; + +function concat(chunks: Uint8Array[]): Uint8Array { + const total = chunks.reduce((sum, c) => sum + c.byteLength, 0); + const out = new Uint8Array(total); + let offset = 0; + for (const c of chunks) { + out.set(c, offset); + offset += c.byteLength; + } + return out; +} + +async function bytes(f: (w: Writer) => Promise): Promise { + const written: Uint8Array[] = []; + const writer = new Writer( + new WritableStream({ write: (chunk) => void written.push(new Uint8Array(chunk)) }), + ); + await f(writer); + writer.close(); + await writer.closed; + return concat(written); +} + +async function roundTrip(msg: AnnounceBroadcast, version: Version): Promise { + const reader = new Reader(undefined, await bytes((w) => msg.encode(w, version))); + return AnnounceBroadcast.decode(reader, version); +} + +test("AnnounceBroadcast epoch round-trips on draft-05", async () => { + const hops = [OriginSchema.parse(7n)]; + // 1_700_000_000_000 is ~ms since 2020 in 2023; the others probe the edges of u53. + for (const epoch of [0, 1, 1_700_000_000_000, 2 ** 52]) { + const active = new AnnounceBroadcast({ suffix: Path.from("room/cam"), active: true, epoch, hops }); + const gotActive = await roundTrip(active, Version.DRAFT_05_WIP); + expect(gotActive.active).toBe(true); + expect(gotActive.epoch).toBe(epoch); + expect(gotActive.suffix).toBe(Path.from("room/cam")); + expect(gotActive.hops).toEqual(hops); + + const ended = new AnnounceBroadcast({ suffix: Path.from("room/cam"), active: false, epoch }); + const gotEnded = await roundTrip(ended, Version.DRAFT_05_WIP); + expect(gotEnded.active).toBe(false); + expect(gotEnded.epoch).toBe(epoch); + } +}); + +test("AnnounceBroadcast epoch is omitted before draft-05", async () => { + // Pre-lite-05 carries no epoch on the wire, so a nonzero epoch decodes back as 0. + const msg = new AnnounceBroadcast({ + suffix: Path.from("room/cam"), + active: true, + epoch: 42, + hops: [OriginSchema.parse(7n)], + }); + const got = await roundTrip(msg, Version.DRAFT_04); + expect(got.epoch).toBe(0); + expect(got.suffix).toBe(Path.from("room/cam")); +}); diff --git a/js/net/src/lite/announce.ts b/js/net/src/lite/announce.ts index 111a05242..44509cd46 100644 --- a/js/net/src/lite/announce.ts +++ b/js/net/src/lite/announce.ts @@ -2,21 +2,49 @@ import * as Path from "../path.ts"; import type { Reader, Writer } from "../stream.ts"; import * as Message from "./message.ts"; import { type Origin, OriginSchema } from "./origin.ts"; -import { hopsFixedWidth, Version } from "./version.ts"; +import { hasBroadcastEpoch, hopsFixedWidth, Version } from "./version.ts"; // Must match the MAX_HOPS in Rust's model/origin.rs. Broadcasts with longer // hop chains are rejected; this keeps loop-detection bounded and rejects // pathological announcements across clusters with unbounded forwarding. export const MAX_HOPS = 32; -export class Announce { +/** + * Seconds between the Unix epoch and 2020-01-01T00:00:00 UTC. + * + * Broadcast epochs ride the wire as milliseconds since this base (smaller than a + * Unix-epoch value, and good past the year 2500 in a varint). See {@link epochNow}. + */ +export const EPOCH_BASE_SECONDS = 1_577_836_800; + +/** + * The current wall clock as a broadcast epoch: whole milliseconds since + * 2020-01-01 UTC (the wire value). Saturates to `0` for a clock before the base. + */ +export function epochNow(): number { + return Math.max(0, Math.floor(Date.now() - EPOCH_BASE_SECONDS * 1000)); +} + +/** + * ANNOUNCE_BROADCAST: sent by the publisher to advertise (or retract) a broadcast. + * + * Carries the broadcast path suffix, its instance {@link epoch} (lite-05+), and the + * hop chain. Renamed from `Announce` in lite-05. + */ +export class AnnounceBroadcast { suffix: Path.Valid; active: boolean; + /** + * Broadcast instance epoch: milliseconds since 2020-01-01 UTC (see {@link epochNow}). + * Only carried on the wire for lite-05+; `0` on older versions. + */ + epoch: number; hops: Origin[]; - constructor(props: { suffix: Path.Valid; active: boolean; hops?: Origin[] }) { + constructor(props: { suffix: Path.Valid; active: boolean; epoch?: number; hops?: Origin[] }) { this.suffix = props.suffix; this.active = props.active; + this.epoch = props.epoch ?? 0; this.hops = props.hops ?? []; if (this.hops.length > MAX_HOPS) { throw new Error(`hop count ${this.hops.length} exceeds maximum ${MAX_HOPS}`); @@ -27,6 +55,11 @@ export class Announce { await w.bool(this.active); await w.string(this.suffix); + // Lite05+: the epoch varint sits after the suffix and before the hop chain. + if (hasBroadcastEpoch(version)) { + await w.u53(this.epoch); + } + switch (version) { case Version.DRAFT_01: case Version.DRAFT_02: @@ -49,10 +82,13 @@ export class Announce { } } - static async #decode(r: Reader, version: Version): Promise { + static async #decode(r: Reader, version: Version): Promise { const active = await r.bool(); const suffix = Path.from(await r.string()); + // Lite05+ carries the epoch after the suffix; older versions default it to 0. + const epoch = hasBroadcastEpoch(version) ? await r.u53() : 0; + let hops: Origin[] = []; switch (version) { case Version.DRAFT_01: @@ -80,23 +116,27 @@ export class Announce { } } - return new Announce({ suffix, active, hops }); + return new AnnounceBroadcast({ suffix, active, epoch, hops }); } async encode(w: Writer, version: Version): Promise { return Message.encode(w, (w) => this.#encode(w, version)); } - static async decode(r: Reader, version: Version): Promise { - return Message.decode(r, (r) => Announce.#decode(r, version)); + static async decode(r: Reader, version: Version): Promise { + return Message.decode(r, (r) => AnnounceBroadcast.#decode(r, version)); } - static async decodeMaybe(r: Reader, version: Version): Promise { - return Message.decodeMaybe(r, (r) => Announce.#decode(r, version)); + static async decodeMaybe(r: Reader, version: Version): Promise { + return Message.decodeMaybe(r, (r) => AnnounceBroadcast.#decode(r, version)); } } -export class AnnounceInterest { +/** + * ANNOUNCE_REQUEST: sent by the subscriber to request ANNOUNCE_BROADCAST messages + * for a path prefix. Renamed from `AnnounceInterest` in lite-05. + */ +export class AnnounceRequest { prefix: Path.Valid; // Hop ID of the peer asking for announces. Zero means "no exclusion". // Must be a bigint: peer origins are up to 64 bits and overflow u53. @@ -125,7 +165,7 @@ export class AnnounceInterest { } } - static async #decode(r: Reader, version: Version): Promise { + static async #decode(r: Reader, version: Version): Promise { const prefix = Path.from(await r.string()); let excludeHop = 0n; switch (version) { @@ -137,15 +177,15 @@ export class AnnounceInterest { excludeHop = hopsFixedWidth(version) ? await r.u64() : await r.u62(); break; } - return new AnnounceInterest(prefix, excludeHop); + return new AnnounceRequest(prefix, excludeHop); } async encode(w: Writer, version: Version): Promise { return Message.encode(w, (w) => this.#encode(w, version)); } - static async decode(r: Reader, version: Version): Promise { - return Message.decode(r, (r) => AnnounceInterest.#decode(r, version)); + static async decode(r: Reader, version: Version): Promise { + return Message.decode(r, (r) => AnnounceRequest.#decode(r, version)); } } diff --git a/js/net/src/lite/connection.ts b/js/net/src/lite/connection.ts index 8399b9fe0..f24a41de0 100644 --- a/js/net/src/lite/connection.ts +++ b/js/net/src/lite/connection.ts @@ -6,7 +6,7 @@ import type { Established } from "../connection/established.ts"; import * as Path from "../path.ts"; import { type Reader, Readers, Stream } from "../stream.ts"; import type * as Time from "../time.ts"; -import { AnnounceInterest } from "./announce.ts"; +import { AnnounceRequest } from "./announce.ts"; import { Goaway } from "./goaway.ts"; import { Group } from "./group.ts"; import { type Origin, randomOrigin } from "./origin.ts"; @@ -180,7 +180,7 @@ export class Connection implements Established { if (typ === StreamId.Session) { throw new Error("duplicate session stream"); } else if (typ === StreamId.Announce) { - const msg = await AnnounceInterest.decode(stream.reader, this.#version); + const msg = await AnnounceRequest.decode(stream.reader, this.#version); await this.#publisher.runAnnounce(msg, stream); } else if (typ === StreamId.Subscribe) { const msg = await Subscribe.decode(stream.reader, this.#version); diff --git a/js/net/src/lite/publisher.ts b/js/net/src/lite/publisher.ts index 62741c844..f7c4690c8 100644 --- a/js/net/src/lite/publisher.ts +++ b/js/net/src/lite/publisher.ts @@ -6,7 +6,7 @@ import * as Path from "../path.ts"; import { type Stream, Writer } from "../stream.ts"; import type { TrackSubscriber } from "../track.ts"; import { error } from "../util/error.ts"; -import { Announce, AnnounceInit, type AnnounceInterest, AnnounceOk } from "./announce.ts"; +import { AnnounceBroadcast, AnnounceInit, AnnounceOk, type AnnounceRequest, epochNow } from "./announce.ts"; import { Group as GroupMessage } from "./group.ts"; import type { Origin } from "./origin.ts"; import { Probe } from "./probe.ts"; @@ -60,6 +60,13 @@ export class Publisher { // It's a signal so we can live update any announce streams. #broadcasts = new Signal | undefined>(new Map()); + // Per-broadcast epoch (ms since 2020-01-01 UTC), stamped when the instance is + // published. Mirrors the Rust `BroadcastInfo.epoch` (SystemTime::now at creation): + // a newer instance of the same path carries a later epoch, so a consumer can prefer + // the newest route. Sent in every ANNOUNCE_BROADCAST (lite-05+), including the + // unannounce, so the peer can match the instance that ended. + #epochs = new Map(); + // TRACK_INFO is immutable per track, so resolve it from the application once // (via a throwaway subscribe whose info() resolves when the app calls accept) // and reuse it for every later TRACK request of the same track. Keyed by @@ -85,12 +92,17 @@ export class Publisher { * @param name - The broadcast to publish */ publish(path: Path.Valid, broadcast: Broadcast) { + // Stamp the instance epoch at publish time (mirrors Rust's SystemTime::now default). + this.#epochs.set(path, epochNow()); this.#broadcasts.mutate((broadcasts) => { if (!broadcasts) throw new Error("closed"); broadcasts.set(path, broadcast); }); - // Remove the broadcast from the lookup when it's closed. + // Remove the broadcast from the lookup when it's closed. Keep the epoch around: + // the unannounce announce (sent from the per-stream diff loop after the map change) + // still needs it to identify the instance that ended. It's overwritten on the next + // publish to the same path, so it can't go stale. void broadcast.closed.finally(() => { this.#broadcasts.mutate((broadcasts) => { broadcasts?.delete(path); @@ -98,6 +110,11 @@ export class Publisher { }); } + // The epoch of the broadcast published at `path`, or 0 if it was never seen. + #epoch(path: Path.Valid): number { + return this.#epochs.get(path) ?? 0; + } + /** * Handles an announce interest message. * @param msg - The announce interest message @@ -105,7 +122,7 @@ export class Publisher { * * @internal */ - async runAnnounce(msg: AnnounceInterest, stream: Stream) { + async runAnnounce(msg: AnnounceRequest, stream: Stream) { console.debug(`announce: prefix=${msg.prefix}`); // Send initial announcements @@ -134,7 +151,8 @@ export class Publisher { const ok = new AnnounceOk(this.origin, active.size); await ok.encode(stream.writer, this.version); for (const suffix of active) { - const wire = new Announce({ suffix, active: true }); + const epoch = this.#epoch(Path.join(msg.prefix, suffix)); + const wire = new AnnounceBroadcast({ suffix, active: true, epoch }); await wire.encode(stream.writer, this.version); } break; @@ -142,7 +160,7 @@ export class Publisher { default: // Draft03/04: send individual Announce messages, stamping our origin as a hop. for (const suffix of active) { - const wire = new Announce({ suffix, active: true, hops: [this.origin] }); + const wire = new AnnounceBroadcast({ suffix, active: true, hops: [this.origin] }); await wire.encode(stream.writer, this.version); } break; @@ -175,15 +193,18 @@ export class Publisher { for (const added of newActive.difference(active)) { console.debug(`announce: broadcast=${added} active=true`); const hops = this.version === Version.DRAFT_05_WIP ? [] : [this.origin]; - const wire = new Announce({ suffix: added, active: true, hops }); + const epoch = this.#epoch(Path.join(msg.prefix, added)); + const wire = new AnnounceBroadcast({ suffix: added, active: true, epoch, hops }); await wire.encode(stream.writer, this.version); } // Announce any removed broadcasts. // Ended announces don't need hops — the peer matches on path only. + // They carry the epoch of the instance that ended so the peer can match it. for (const removed of active.difference(newActive)) { console.debug(`announce: broadcast=${removed} active=false`); - const wire = new Announce({ suffix: removed, active: false }); + const epoch = this.#epoch(Path.join(msg.prefix, removed)); + const wire = new AnnounceBroadcast({ suffix: removed, active: false, epoch }); await wire.encode(stream.writer, this.version); } @@ -348,10 +369,11 @@ export class Publisher { if (!published) throw new Error("not found"); const info = await published.track(track).info(); + // The wire no longer carries a cache hint (retention is best-effort, not a + // guarantee); the local `info.cache` stays a purely local retention window. return new TrackInfoMessage({ priority: info.priority, ordered: info.ordered, - cache: info.cache, // This implementation doesn't produce per-frame timestamps yet. timescale: 0, compression: info.compress ? Compression.Deflate : Compression.None, diff --git a/js/net/src/lite/stream.ts b/js/net/src/lite/stream.ts index ad2418b3d..fa4749501 100644 --- a/js/net/src/lite/stream.ts +++ b/js/net/src/lite/stream.ts @@ -1,11 +1,11 @@ -import type { AnnounceInterest } from "./announce.ts"; +import type { AnnounceRequest } from "./announce.ts"; import type { Goaway } from "./goaway.ts"; import type { Group } from "./group.ts"; import type { SessionClient } from "./session.ts"; import type { Subscribe } from "./subscribe.ts"; import type { Track } from "./track.ts"; -export type StreamBi = SessionClient | AnnounceInterest | Subscribe | Track | Goaway; +export type StreamBi = SessionClient | AnnounceRequest | Subscribe | Track | Goaway; export type StreamUni = Group; export const StreamId = { diff --git a/js/net/src/lite/subscriber.ts b/js/net/src/lite/subscriber.ts index eb2effa20..cd1bbe286 100644 --- a/js/net/src/lite/subscriber.ts +++ b/js/net/src/lite/subscriber.ts @@ -7,10 +7,10 @@ import { Group } from "../group.ts"; import * as Path from "../path.ts"; import { type Reader, Stream } from "../stream.ts"; import * as Time from "../time.ts"; -import type { TrackProducer } from "../track.ts"; +import { DEFAULT_CACHE_MS, type TrackProducer } from "../track.ts"; import { error } from "../util/error.ts"; import { withTimeout } from "../util/timeout.ts"; -import { Announce, AnnounceInit, AnnounceInterest, AnnounceOk } from "./announce.ts"; +import { AnnounceBroadcast, AnnounceInit, AnnounceOk, AnnounceRequest } from "./announce.ts"; import type { Group as GroupMessage } from "./group.ts"; import type { Origin } from "./origin.ts"; import { Probe } from "./probe.ts"; @@ -133,7 +133,7 @@ export class Subscriber { // Send our own session-level origin id so the peer can skip announces // whose hop chain already passed through us. Matches the Rust subscriber's // `exclude_hop: self.self_origin.id` in `run_announce_prefix`. - const msg = new AnnounceInterest(prefix, this.origin); + const msg = new AnnounceRequest(prefix, this.origin); try { // Open a stream and send the announce interest. @@ -172,7 +172,7 @@ export class Subscriber { // Receive announce updates (for Draft03, this includes initial state) for (;;) { const announce = await Promise.race([ - Announce.decodeMaybe(stream.reader, this.version), + AnnounceBroadcast.decodeMaybe(stream.reader, this.version), announced.closed, ]); if (!announce) break; @@ -189,7 +189,11 @@ export class Subscriber { const path = Path.join(prefix, announce.suffix); - console.debug(`announced: broadcast=${path} active=${announce.active}`); + // `announce.epoch` (lite-05+) identifies the broadcast instance. The Rust + // origin uses it to prefer the newest of several routes to the same path; the + // JS side has no multi-route origin tree (the announced queue is flat), so + // there is nothing to tie-break here and the epoch is currently unused. + console.debug(`announced: broadcast=${path} active=${announce.active} epoch=${announce.epoch}`); announced.append({ path, active: announce.active }); } @@ -217,7 +221,9 @@ export class Subscriber { const info = await this.#trackInfo(path, name); return { compress: info.compression !== Compression.None, - cache: info.cache, + // The wire no longer carries a cache hint (retention is best-effort), + // so the local retention window falls back to the model default. + cache: DEFAULT_CACHE_MS, priority: info.priority, ordered: info.ordered, }; @@ -334,7 +340,9 @@ export class Subscriber { const info = await this.#trackInfo(msg.broadcast, msg.track); producer = request.accept({ compress: info.compression !== Compression.None, - cache: info.cache, + // The wire no longer carries a cache hint (retention is best-effort), + // so the local retention window falls back to the model default. + cache: DEFAULT_CACHE_MS, priority: info.priority, ordered: info.ordered, }); diff --git a/js/net/src/lite/track.test.ts b/js/net/src/lite/track.test.ts index 39617e97a..14e4deeb1 100644 --- a/js/net/src/lite/track.test.ts +++ b/js/net/src/lite/track.test.ts @@ -31,7 +31,6 @@ test("TrackInfo round-trips on draft-05", async () => { const info = new TrackInfo({ priority: 7, ordered: false, - cache: 10000, timescale: 90000, compression: Compression.Deflate, }); @@ -39,7 +38,6 @@ test("TrackInfo round-trips on draft-05", async () => { const got = await TrackInfo.decode(reader, Version.DRAFT_05_WIP); expect(got.priority).toBe(7); expect(got.ordered).toBe(false); - expect(got.cache).toBe(10000); expect(got.timescale).toBe(90000); expect(got.compression).toBe(Compression.Deflate); }); diff --git a/js/net/src/lite/track.ts b/js/net/src/lite/track.ts index c5730c5c1..69be5c7d1 100644 --- a/js/net/src/lite/track.ts +++ b/js/net/src/lite/track.ts @@ -1,7 +1,6 @@ import { Compression, compressionFromCode } from "../compression.ts"; import * as Path from "../path.ts"; import type { Reader, Writer } from "../stream.ts"; -import { DEFAULT_CACHE_MS } from "../track.ts"; import * as Message from "./message.ts"; import { Version } from "./version.ts"; @@ -61,8 +60,6 @@ export class Track { export class TrackInfo { priority: number; ordered: boolean; - /** How long (milliseconds) the publisher keeps old groups available. */ - cache: number; /** * Per-frame timestamp scale (units per second). `0` means frames carry no * per-frame timestamps on the wire. @@ -74,19 +71,16 @@ export class TrackInfo { constructor({ priority = 0, ordered = true, - cache = DEFAULT_CACHE_MS, timescale = 0, compression = Compression.None, }: { priority?: number; ordered?: boolean; - cache?: number; timescale?: number; compression?: Compression; }) { this.priority = priority; this.ordered = ordered; - this.cache = cache; this.timescale = timescale; this.compression = compression; } @@ -94,7 +88,6 @@ export class TrackInfo { async #encode(w: Writer) { await w.u8(this.priority); await w.bool(this.ordered); - await w.u53(this.cache); await w.u53(this.timescale); await w.u53(this.compression); } @@ -102,10 +95,9 @@ export class TrackInfo { static async #decode(r: Reader): Promise { const priority = await r.u8(); const ordered = await r.bool(); - const cache = await r.u53(); const timescale = await r.u53(); const compression = compressionFromCode(await r.u53()); - return new TrackInfo({ priority, ordered, cache, timescale, compression }); + return new TrackInfo({ priority, ordered, timescale, compression }); } async encode(w: Writer, version: Version): Promise { diff --git a/js/net/src/lite/version.ts b/js/net/src/lite/version.ts index ce2e49a05..69a32239a 100644 --- a/js/net/src/lite/version.ts +++ b/js/net/src/lite/version.ts @@ -28,6 +28,22 @@ export function hopsFixedWidth(version: Version): boolean { } } +/// Whether ANNOUNCE_BROADCAST carries a per-broadcast Epoch varint (after the suffix, +/// before the hop chain). Added in lite-05 so a consumer can tell a newer instance of a +/// broadcast from an older one. Older versions omit the field. +export function hasBroadcastEpoch(version: Version): boolean { + // Explicitly list older versions so future versions default to carrying the epoch. + switch (version) { + case Version.DRAFT_01: + case Version.DRAFT_02: + case Version.DRAFT_03: + case Version.DRAFT_04: + return false; + default: + return true; + } +} + /// The WebTransport subprotocol identifier for moq-lite. /// Version negotiation still happens via SETUP when this is used. export const ALPN = "moql"; diff --git a/rs/moq-net/src/client.rs b/rs/moq-net/src/client.rs index 146cfbe9d..00e6e2396 100644 --- a/rs/moq-net/src/client.rs +++ b/rs/moq-net/src/client.rs @@ -12,6 +12,7 @@ pub struct Client { subscribe: Option, stats: StatsHandle, versions: Versions, + setup_path: Option, } impl Client { @@ -67,6 +68,17 @@ impl Client { self } + /// Set the request path to advertise in the lite-05 SETUP message. + /// + /// Required on transports that carry no request URI (native QUIC, qmux over + /// TCP/TLS) so the server learns which path the client wants; omit it on bindings + /// that already carry a URI (WebTransport). Ignored by the IETF transport and by + /// pre-lite-05 versions, which have no SETUP message. + pub fn with_path(mut self, path: impl Into) -> Self { + self.setup_path = Some(path.into()); + self + } + /// Perform the MoQ handshake as a client negotiating the version. pub async fn connect(&self, session: S) -> Result { if self.publish.is_none() && self.subscribe.is_none() { @@ -144,6 +156,13 @@ impl Client { .select(Version::Lite(lite::Version::Lite05Wip)) .ok_or(Error::Version)?; + // Advertise our capabilities (we report send bitrate; we don't pad) plus + // the request path on URI-less transports. + let our_setup = lite::Setup { + probe: lite::ProbeLevel::Report, + path: self.setup_path.clone(), + }; + let (recv_bw, connecting) = lite::start( session.clone(), None, @@ -151,6 +170,7 @@ impl Client { self.subscribe.clone(), self.stats.clone(), lite::Version::Lite05Wip, + our_setup, )?; // Block until the initial announce set has landed (Lite05 reports it @@ -172,6 +192,7 @@ impl Client { self.subscribe.clone(), self.stats.clone(), lite::Version::Lite04, + lite::Setup::default(), )?; // Lite04 has no initial-set boundary, so this resolves immediately. @@ -192,6 +213,7 @@ impl Client { self.subscribe.clone(), self.stats.clone(), lite::Version::Lite03, + lite::Setup::default(), )?; // Lite03 has no initial-set boundary, so this resolves immediately. @@ -241,6 +263,9 @@ impl Client { self.subscribe.clone(), self.stats.clone(), v, + // This path only handles versions negotiated via the bidi SETUP exchange + // (pre-lite-05), which have no Setup Stream. + lite::Setup::default(), )?; // Block until the initial announce set has landed (for versions that diff --git a/rs/moq-net/src/lite/mod.rs b/rs/moq-net/src/lite/mod.rs index 4df1bb449..78dc16177 100644 --- a/rs/moq-net/src/lite/mod.rs +++ b/rs/moq-net/src/lite/mod.rs @@ -16,6 +16,7 @@ mod priority; mod probe; mod publisher; mod session; +mod setup; mod stream; mod subscribe; mod subscriber; @@ -35,6 +36,8 @@ pub use parameters::*; pub use probe::*; use publisher::*; pub(super) use session::*; +#[allow(unused_imports)] +pub use setup::*; pub use stream::*; pub use subscribe::*; use subscriber::*; diff --git a/rs/moq-net/src/lite/parameters.rs b/rs/moq-net/src/lite/parameters.rs index 1236da79d..f175beb00 100644 --- a/rs/moq-net/src/lite/parameters.rs +++ b/rs/moq-net/src/lite/parameters.rs @@ -6,9 +6,44 @@ use super::Version; const MAX_PARAMS: u64 = 64; +/// A bag of `id -> raw bytes` parameters, the body shared by SETUP (and any other +/// parameterized message). Encoded as a varint count followed by `id, length, value` +/// triples; duplicate ids are rejected on decode. #[derive(Default, Debug, Clone)] pub struct Parameters(HashMap>); +impl Parameters { + /// Set a parameter to a raw byte value, replacing any existing entry. + pub fn set_bytes(&mut self, id: u64, value: Vec) { + self.0.insert(id, value); + } + + /// Borrow a parameter's raw byte value, if present. + pub fn get_bytes(&self, id: u64) -> Option<&[u8]> { + self.0.get(&id).map(Vec::as_slice) + } + + /// Set a parameter to a varint value, replacing any existing entry. + pub fn set_varint(&mut self, id: u64, value: u64) { + let mut buf = Vec::new(); + // Infallible: writing into a Vec never runs short. + value.encode(&mut buf, Version::Lite05Wip).expect("varint encode into Vec"); + self.0.insert(id, buf); + } + + /// Decode a parameter as a single varint, if present. Errors if trailing bytes remain. + pub fn get_varint(&self, id: u64) -> Result, DecodeError> { + let Some(mut bytes) = self.0.get(&id).map(Vec::as_slice) else { + return Ok(None); + }; + let value = u64::decode(&mut bytes, Version::Lite05Wip)?; + if !bytes.is_empty() { + return Err(DecodeError::Long); + } + Ok(Some(value)) + } +} + impl Decode for Parameters { fn decode(mut r: &mut R, version: Version) -> Result { let mut map = HashMap::new(); diff --git a/rs/moq-net/src/lite/session.rs b/rs/moq-net/src/lite/session.rs index 21e749c22..fda4bab4c 100644 --- a/rs/moq-net/src/lite/session.rs +++ b/rs/moq-net/src/lite/session.rs @@ -1,9 +1,10 @@ use crate::{ - BandwidthConsumer, BandwidthProducer, Error, Origin, OriginConsumer, OriginProducer, StatsHandle, coding::Stream, + BandwidthConsumer, BandwidthProducer, Error, Origin, OriginConsumer, OriginProducer, StatsHandle, + coding::{Stream, Writer}, lite::SessionInfo, }; -use super::{Connecting, Publisher, PublisherConfig, Subscriber, SubscriberConfig, Version}; +use super::{Connecting, PeerSetup, Publisher, PublisherConfig, Setup, Subscriber, SubscriberConfig, Version}; /// Start a lite session. /// @@ -24,6 +25,9 @@ pub fn start( stats: StatsHandle, // The version of the protocol to use. version: Version, + // The capabilities (and optional request path) we advertise in our SETUP message. + // Only sent on versions with a Setup Stream (lite-05+); ignored otherwise. + our_setup: Setup, ) -> Result<(Option, Connecting), Error> { let recv_bw = BandwidthProducer::new(); @@ -63,6 +67,21 @@ pub fn start( // stamped onto outbound hops and checked against incoming hops, so it // must be stable across every session that shares the local origin. // Required for cross-session cluster loop detection. + // Shared slot for the peer's SETUP (lite-05+). The subscriber writes it when it + // reads the peer's Setup stream; capability-gated streams (PROBE) wait on it. + let peer_setup = PeerSetup::default(); + + // Advertise our own capabilities on a uni Setup Stream, then FIN. Best-effort: + // a failure here just means the peer falls back to "no capabilities" for us. + if version.has_setup_stream() { + let session = session.clone(); + web_async::spawn(async move { + if let Err(err) = send_setup(&session, our_setup, version).await { + tracing::debug!(%err, "failed to send setup"); + } + }); + } + let publisher = Publisher::new(PublisherConfig { session: session.clone(), origin: publish, @@ -75,6 +94,7 @@ pub fn start( recv_bandwidth: recv_bw_for_sub, stats, version, + peer_setup, }); web_async::spawn(async move { @@ -103,6 +123,20 @@ pub fn start( Ok((recv_bw_consumer, connecting)) } +/// Open a unidirectional Setup Stream, send our single SETUP message, and FIN. +async fn send_setup( + session: &S, + setup: Setup, + version: Version, +) -> Result<(), Error> { + let stream = session.open_uni().await.map_err(Error::from_transport)?; + let mut writer = Writer::new(stream, version); + writer.encode(&super::DataType::Setup).await?; + writer.encode(&setup).await?; + writer.finish()?; + writer.closed().await +} + // TODO do something useful with this async fn run_session(stream: Option>) -> Result<(), Error> { if let Some(mut stream) = stream { diff --git a/rs/moq-net/src/lite/setup.rs b/rs/moq-net/src/lite/setup.rs new file mode 100644 index 000000000..d12ab7d86 --- /dev/null +++ b/rs/moq-net/src/lite/setup.rs @@ -0,0 +1,203 @@ +//! The lite-05 SETUP message: each endpoint advertises its capabilities once, as +//! the sole message on a unidirectional Setup Stream, then closes it. + +use crate::coding::*; + +use super::{Message, Parameters, Version}; + +/// Setup Parameter id for the Probe capability level. +const PARAM_PROBE: u64 = 0x1; +/// Setup Parameter id for the request Path (client-only, URI-less transports). +const PARAM_PATH: u64 = 0x2; + +/// The probe capability an endpoint advertises in SETUP. +/// +/// Monotonic: a higher level implies every lower one. An unknown (future) value +/// decodes as the highest level we understand, so a peer that gains a new level is +/// treated as at least [`Increase`](Self::Increase). +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Default)] +pub enum ProbeLevel { + /// No probing. Equivalent to omitting the parameter. + #[default] + None, + /// The publisher can measure and periodically report its estimated bitrate. + Report, + /// The publisher can additionally pad the connection (or send redundant data). + Increase, +} + +impl ProbeLevel { + /// Map the wire value to a level, saturating unknown values to [`Increase`](Self::Increase). + fn from_code(code: u64) -> Self { + match code { + 0 => Self::None, + 1 => Self::Report, + _ => Self::Increase, + } + } + + /// The wire value for this level. + fn to_code(self) -> u64 { + match self { + Self::None => 0, + Self::Report => 1, + Self::Increase => 2, + } + } +} + +/// The SETUP message, sent once per endpoint on the unidirectional Setup Stream. +/// +/// lite-05+ only. The two endpoints' SETUP messages are independent: neither side +/// blocks on the peer's before opening other streams, but a stream whose encoding +/// depends on a negotiated capability (e.g. PROBE) must wait for it. +#[derive(Debug, Clone, Default, PartialEq, Eq)] +pub struct Setup { + /// The probe capability this endpoint supports. [`ProbeLevel::None`] when absent. + pub probe: ProbeLevel, + /// The request path, for transports that carry no request URI (native QUIC, + /// qmux over TCP/TLS). Sent only by the client; a server never sends one and a + /// relay never forwards it. `None` on URI-carrying bindings. + pub path: Option, +} + +impl Message for Setup { + fn decode_msg(r: &mut R, version: Version) -> Result { + if !version.has_setup_stream() { + return Err(DecodeError::Version); + } + + let params = Parameters::decode(r, version)?; + let probe = params.get_varint(PARAM_PROBE)?.map(ProbeLevel::from_code).unwrap_or_default(); + let path = match params.get_bytes(PARAM_PATH) { + Some(bytes) => { + let s = std::str::from_utf8(bytes).map_err(|_| DecodeError::InvalidValue)?; + if s.is_empty() { + return Err(DecodeError::InvalidValue); + } + Some(s.to_string()) + } + None => None, + }; + + Ok(Self { probe, path }) + } + + fn encode_msg(&self, w: &mut W, version: Version) -> Result<(), EncodeError> { + if !version.has_setup_stream() { + return Err(EncodeError::Version); + } + + let mut params = Parameters::default(); + // None is the wire default, so omit it to keep the message empty when nothing is set. + if self.probe != ProbeLevel::None { + params.set_varint(PARAM_PROBE, self.probe.to_code()); + } + if let Some(path) = &self.path { + params.set_bytes(PARAM_PATH, path.as_bytes().to_vec()); + } + + params.encode(w, version) + } +} + +/// Shared slot for the peer's SETUP, written once when its Setup stream is read. +/// +/// Streams whose encoding depends on a negotiated capability (e.g. the PROBE +/// stream) wait on this before deciding what to do. Cheap to clone: every handle +/// shares the same watch channel. +#[derive(Clone)] +pub(crate) struct PeerSetup(tokio::sync::watch::Sender>); + +impl Default for PeerSetup { + fn default() -> Self { + Self(tokio::sync::watch::channel(None).0) + } +} + +impl PeerSetup { + /// Record the peer's SETUP. + pub fn set(&self, setup: Setup) { + // Ignored if every receiver has dropped; nothing is waiting on it then. + let _ = self.0.send(Some(setup)); + } + + /// Await the peer's advertised probe level, blocking until its SETUP arrives. + /// + /// The peer MUST send exactly one SETUP, so this resolves once that stream is read. + pub async fn probe_level(&self) -> ProbeLevel { + let mut rx = self.0.subscribe(); + loop { + // Clone out of the borrow before awaiting so no guard crosses the await point. + if let Some(setup) = rx.borrow_and_update().clone() { + return setup.probe; + } + if rx.changed().await.is_err() { + // Sender dropped before sending: treat as no probe support. + return ProbeLevel::default(); + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn round_trip(msg: &Setup) -> Setup { + let mut buf = bytes::BytesMut::new(); + msg.encode(&mut buf, Version::Lite05Wip).unwrap(); + let mut slice = &buf[..]; + let got = Setup::decode(&mut slice, Version::Lite05Wip).unwrap(); + assert!(bytes::Buf::remaining(&slice) == 0, "trailing bytes after decode"); + got + } + + #[test] + fn empty_round_trip() { + let msg = Setup::default(); + assert_eq!(round_trip(&msg), msg); + } + + #[test] + fn probe_levels_round_trip() { + for probe in [ProbeLevel::None, ProbeLevel::Report, ProbeLevel::Increase] { + let msg = Setup { probe, path: None }; + assert_eq!(round_trip(&msg), msg); + } + } + + #[test] + fn path_round_trip() { + let msg = Setup { + probe: ProbeLevel::Report, + path: Some("/room/123".to_string()), + }; + assert_eq!(round_trip(&msg), msg); + } + + #[test] + fn unknown_probe_level_saturates_to_increase() { + // Frame a SETUP message carrying an unknown probe level (99) by hand: the + // parameters body, prefixed with its length (the lite Message size prefix). + let mut params = Parameters::default(); + params.set_varint(PARAM_PROBE, 99); + let mut body = Vec::new(); + params.encode(&mut body, Version::Lite05Wip).unwrap(); + + let mut buf = bytes::BytesMut::new(); + body.len().encode(&mut buf, Version::Lite05Wip).unwrap(); + buf.extend_from_slice(&body); + + let mut slice = &buf[..]; + let got = Setup::decode(&mut slice, Version::Lite05Wip).unwrap(); + assert_eq!(got.probe, ProbeLevel::Increase); + } + + #[test] + fn rejects_before_lite05() { + let msg = Setup::default(); + let mut buf = bytes::BytesMut::new(); + assert!(matches!(msg.encode(&mut buf, Version::Lite04), Err(EncodeError::Version))); + } +} diff --git a/rs/moq-net/src/lite/stream.rs b/rs/moq-net/src/lite/stream.rs index 394005721..519ea3ef9 100644 --- a/rs/moq-net/src/lite/stream.rs +++ b/rs/moq-net/src/lite/stream.rs @@ -34,7 +34,10 @@ impl Encode for ControlType { #[derive(Debug, PartialEq, Clone, Copy, IntoPrimitive, TryFromPrimitive)] #[repr(u64)] pub enum DataType { + /// A group of frames (the only data stream on every version). Group = 0, + /// The lite-05+ SETUP stream: one SETUP message, then FIN. + Setup = 1, } impl Decode for DataType { diff --git a/rs/moq-net/src/lite/subscriber.rs b/rs/moq-net/src/lite/subscriber.rs index 18f5c9ad9..c4e229f98 100644 --- a/rs/moq-net/src/lite/subscriber.rs +++ b/rs/moq-net/src/lite/subscriber.rs @@ -39,6 +39,9 @@ pub(super) struct SubscriberConfig { /// to opt out. pub stats: StatsHandle, pub version: Version, + /// Shared slot for the peer's SETUP (lite-05+). Written when the peer's Setup + /// stream is read; the probe stream waits on it before opening. + pub peer_setup: super::PeerSetup, } #[derive(Clone)] @@ -66,6 +69,8 @@ pub(super) struct Subscriber { subscribes: Lock>, next_id: Arc, version: Version, + /// The peer's advertised SETUP (lite-05+), set when its Setup stream is read. + peer_setup: super::PeerSetup, } #[derive(Clone)] @@ -97,6 +102,7 @@ impl Subscriber { subscribes: Default::default(), next_id: Default::default(), version: config.version, + peer_setup: config.peer_setup, } } @@ -134,6 +140,7 @@ impl Subscriber { let res = match kind { lite::DataType::Group => self.recv_group(&mut stream).await, + lite::DataType::Setup => self.recv_setup(&mut stream).await, }; if let Err(err) = res { @@ -143,6 +150,18 @@ impl Subscriber { Ok(()) } + /// Read the peer's single SETUP message off its Setup Stream and record it, so + /// capability-gated streams (PROBE) can consult it. lite-05+ only. + async fn recv_setup(&self, stream: &mut Reader) -> Result<(), Error> { + if !self.version.has_setup_stream() { + return Err(Error::UnexpectedStream); + } + let setup = stream.decode::().await?; + tracing::debug!(?setup, "received peer setup"); + self.peer_setup.set(setup); + Ok(()) + } + async fn run_announce(self, connecting: Option) -> Result<(), Error> { let prefixes: Vec = self.origin.allowed().map(|p| p.to_owned()).collect(); @@ -310,6 +329,14 @@ impl Subscriber { return Ok(()); }; + // lite-05+ negotiates probing: only open a PROBE stream if the peer advertised it + // (Report or higher) in its SETUP. Older versions have no SETUP, so probe is always + // available there. + if self.version.has_setup_stream() && self.peer_setup.probe_level().await < lite::ProbeLevel::Report { + tracing::debug!("peer does not support probing; skipping probe stream"); + return Ok(()); + } + loop { // Wait until at least one consumer is interested in the estimate. if bandwidth.used().await.is_err() { diff --git a/rs/moq-net/src/lite/version.rs b/rs/moq-net/src/lite/version.rs index fb58361aa..627bc7913 100644 --- a/rs/moq-net/src/lite/version.rs +++ b/rs/moq-net/src/lite/version.rs @@ -55,6 +55,18 @@ impl Version { _ => true, } } + + /// Whether the session opens a unidirectional Setup Stream carrying a single SETUP + /// message (capabilities + optional Path). Added in lite-05; the older bidirectional + /// setup exchange (Lite01/02) and the no-setup drafts (Lite03/04) don't use it. + #[allow(clippy::match_like_matches_macro)] + pub fn has_setup_stream(self) -> bool { + // Match form so future versions default forward (CLAUDE.md convention). + match self { + Self::Lite01 | Self::Lite02 | Self::Lite03 | Self::Lite04 => false, + _ => true, + } + } } impl fmt::Display for Version { diff --git a/rs/moq-net/src/server.rs b/rs/moq-net/src/server.rs index 6de97f763..41f4c10fd 100644 --- a/rs/moq-net/src/server.rs +++ b/rs/moq-net/src/server.rs @@ -136,6 +136,12 @@ impl Server { .select(Version::Lite(lite::Version::Lite05Wip)) .ok_or(Error::Version)?; + // We report send bitrate; a server never advertises a request Path. + let our_setup = lite::Setup { + probe: lite::ProbeLevel::Report, + path: None, + }; + // Server side never blocks on the initial set; discard the synced receiver. let (recv_bw, _connecting) = lite::start( session.clone(), @@ -144,6 +150,7 @@ impl Server { self.subscribe.clone(), self.stats.clone(), lite::Version::Lite05Wip, + our_setup, )?; return Ok(Session::new(session, lite::Version::Lite05Wip.into(), recv_bw)); @@ -160,6 +167,7 @@ impl Server { self.subscribe.clone(), self.stats.clone(), lite::Version::Lite04, + lite::Setup::default(), )?; return Ok(Session::new(session, lite::Version::Lite04.into(), recv_bw)); @@ -177,6 +185,7 @@ impl Server { self.subscribe.clone(), self.stats.clone(), lite::Version::Lite03, + lite::Setup::default(), )?; return Ok(Session::new(session, lite::Version::Lite03.into(), recv_bw)); @@ -227,6 +236,9 @@ impl Server { self.subscribe.clone(), self.stats.clone(), v, + // This path only handles versions negotiated via the bidi SETUP exchange + // (pre-lite-05), which have no Setup Stream. + lite::Setup::default(), )?; recv_bw } From 905e659f49433aad4bb182646b653939c3b7a421 Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 21 Jun 2026 01:54:00 +0000 Subject: [PATCH 3/5] docs(concept): update moq-lite for lite-05 setup, epoch, and best-effort cache Aligns doc/concept/layer/moq-lite.md with the drafts #26/#27/#34/#35 sync: - SETUP now advertises per-endpoint capabilities (e.g. bitrate probing) and, on URI-less transports, the request path. The two SETUPs are independent. - Broadcasts carry an epoch so duplicate/republished advertisements converge on the newest instance. - The per-track cache is described as a best-effort local retention window, no longer announced on the wire (it previously referenced the removed SUBSCRIBE_OK field). Also drops the stale per-hop wording in line with drafts #28. Co-Authored-By: Claude Opus 4.8 (1M context) Claude-Session: https://claude.ai/code/session_0118VcLtfuukFERQ8ito5dUG --- doc/concept/layer/moq-lite.md | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/doc/concept/layer/moq-lite.md b/doc/concept/layer/moq-lite.md index ca4a73152..4003cc225 100644 --- a/doc/concept/layer/moq-lite.md +++ b/doc/concept/layer/moq-lite.md @@ -51,7 +51,9 @@ Here's a list of currently supported ALPNs: See the Compatibility section below for more details about `moq-transport` support. Once the QUIC or WebTransport connection is established, there is a minimal MoQ handshake. -The `SETUP` message is primarily used to negotiate extensions, then you're off to the races! +Each endpoint sends a single `SETUP` message advertising its capabilities (for example whether it can probe the available bitrate), then you're off to the races. +The two `SETUP` messages are independent, so neither side waits for the other before getting started. +Transports that don't carry a request URI (native QUIC, or qmux over TCP/TLS) also use `SETUP` to carry the path the client wants to reach. ### Announcements @@ -67,6 +69,9 @@ The [moq-relay clustering](/bin/relay/cluster) feature actually uses this to dis The peer first replies with the set of broadcasts that are currently live, then streams updates as they change. This initial set is a discrete batch: the latest draft reports how many entries to expect up front, so a freshly connected session can wait until that snapshot has fully arrived before listing what's available, rather than racing the gossip. +Each broadcast also carries an **epoch** identifying its instance. +When the same broadcast is announced over multiple routes (or republished after going away), the epoch lets everyone converge on the newest instance instead of picking arbitrarily. + ### Subscriptions All data transfers are initiated by subscriptions. @@ -102,8 +107,8 @@ Each Subscription consists of a few properties: - **Group Order**: The order in which groups are delivered. Defaults to descending; higher IDs are delivered first. - **Group Timeout**: The maximum duration to keep old groups in cache/transit. Defaults to 30 seconds. -The publisher also caps how long it retains old groups via a per-track **cache** age, announced in `SUBSCRIBE_OK` so relays re-serve with the same window. -A subscriber's Group Timeout can only be smaller than this cache age, since a group can't be waited for longer than it's kept around. +The publisher also keeps old groups around for a best-effort **cache** window so relays and late subscribers can still fetch them. +This is a local hint rather than a guarantee carried on the wire, and a subscriber's Group Timeout is bounded by it: a group can't be waited for longer than it's actually kept around. By utilizing these properties, you can choose how your application behaves during congestion. For example, consider a conference room with Alice and Bob: From 1cc9c7e6c5b6ffb2bff1680b3b783b504982cf37 Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 21 Jun 2026 01:58:31 +0000 Subject: [PATCH 4/5] feat(@moq/net): mirror lite-05 SETUP stream (drafts #26/#27) Browser-side counterpart to the Rust lite-05 SETUP work, required for browser<->relay interop now that each endpoint exchanges a SETUP on a uni stream (DataType.Setup = 1): - New lite/setup.ts: ProbeLevel (None/Report/Increase, unknown saturates to Increase) and a size-prefixed Setup message over a private Parameters bag (varint count + id/length/value triples), gated to DRAFT_05_WIP+. - On lite-05 session start the connection sends our SETUP (probe = Report; path undefined, since WebTransport carries the URI), decodes the peer's SETUP in the uni dispatch, and threads it to the Subscriber. - runProbe() now waits for and gates on the peer's advertised probe level on lite-05+; older drafts probe unconditionally as before. (Written by Claude) Co-Authored-By: Claude Opus 4.8 (1M context) Claude-Session: https://claude.ai/code/session_0118VcLtfuukFERQ8ito5dUG --- js/net/src/lite/connection.ts | 48 ++++++++- js/net/src/lite/index.ts | 1 + js/net/src/lite/setup.test.ts | 97 +++++++++++++++++ js/net/src/lite/setup.ts | 190 ++++++++++++++++++++++++++++++++++ js/net/src/lite/stream.ts | 14 +++ js/net/src/lite/subscriber.ts | 26 +++++ js/net/src/lite/version.ts | 15 +++ 7 files changed, 386 insertions(+), 5 deletions(-) create mode 100644 js/net/src/lite/setup.test.ts create mode 100644 js/net/src/lite/setup.ts diff --git a/js/net/src/lite/connection.ts b/js/net/src/lite/connection.ts index f24a41de0..d50acd8d2 100644 --- a/js/net/src/lite/connection.ts +++ b/js/net/src/lite/connection.ts @@ -4,7 +4,7 @@ import { type Bandwidth, createBandwidth } from "../bandwidth.ts"; import type { Broadcast } from "../broadcast.ts"; import type { Established } from "../connection/established.ts"; import * as Path from "../path.ts"; -import { type Reader, Readers, Stream } from "../stream.ts"; +import { type Reader, Readers, Stream, Writer } from "../stream.ts"; import type * as Time from "../time.ts"; import { AnnounceRequest } from "./announce.ts"; import { Goaway } from "./goaway.ts"; @@ -12,11 +12,12 @@ import { Group } from "./group.ts"; import { type Origin, randomOrigin } from "./origin.ts"; import { Publisher } from "./publisher.ts"; import { SessionInfo } from "./session.ts"; -import { StreamId } from "./stream.ts"; +import { ProbeLevel, Setup } from "./setup.ts"; +import { DataType, StreamId } from "./stream.ts"; import { Subscribe } from "./subscribe.ts"; import { Subscriber } from "./subscriber.ts"; import { Track as TrackMessage } from "./track.ts"; -import { Version, versionName } from "./version.ts"; +import { hasSetupStream, Version, versionName } from "./version.ts"; const SEND_BW_POLL_INTERVAL = 100; // ms @@ -60,6 +61,11 @@ export class Connection implements Established { * chains) and Subscriber (available for optional self-filtering on announces). */ readonly origin: Origin; + // The peer's SETUP, recorded once its Setup stream is read (lite-05+). Streams whose + // encoding depends on a negotiated capability (e.g. PROBE) wait on this. undefined + // until the peer's SETUP arrives; stays undefined forever on older drafts. + #peerSetup = new Signal(undefined); + /** * Creates a new Connection instance. * @param url - The URL of the connection @@ -92,7 +98,14 @@ export class Connection implements Established { this.origin = randomOrigin(); this.#publisher = new Publisher(this.#quic, this.#version, this.origin); - this.#subscriber = new Subscriber(this.#quic, this.#version, this.origin, this.recvBandwidth, this.rtt); + this.#subscriber = new Subscriber( + this.#quic, + this.#version, + this.origin, + this.recvBandwidth, + this.rtt, + this.#peerSetup, + ); this.#run(); } @@ -115,6 +128,10 @@ export class Connection implements Established { async #run(): Promise { const tasks: Promise[] = [this.#runSession(), this.#runBidis(), this.#runUnis()]; + if (hasSetupStream(this.#version)) { + tasks.push(this.#sendSetup()); + } + if (this.sendBandwidth) { tasks.push(this.#runSendBandwidth(this.sendBandwidth)); } @@ -159,6 +176,22 @@ export class Connection implements Established { } } + // Open the unidirectional Setup Stream, send our single SETUP, and FIN (lite-05+). + // The browser uses WebTransport, which carries the request URI, so we advertise no + // path and leave routing to the URL. We advertise probe = Report (we measure and + // report bitrate over the PROBE stream, but don't actively pad the connection). + async #sendSetup(): Promise { + const writer = await Writer.open(this.#quic); + try { + await writer.u8(DataType.Setup); + await new Setup(ProbeLevel.Report).encode(writer, this.#version); + writer.close(); + } catch (err: unknown) { + writer.reset(err); + throw err; + } + } + async #runBidis() { for (;;) { const stream = await Stream.accept(this.#quic); @@ -217,9 +250,14 @@ export class Connection implements Established { async #runUni(stream: Reader) { const typ = await stream.u8(); - if (typ === 0) { + if (typ === DataType.Group) { const msg = await Group.decode(stream); await this.#subscriber.runGroup(msg, stream); + } else if (typ === DataType.Setup) { + // The peer sends exactly one SETUP, then FINs. Record it so capability-gated + // streams (e.g. PROBE) can react, then drain to the FIN. + const setup = await Setup.decode(stream, this.#version); + this.#peerSetup.set(setup); } else { throw new Error(`unknown stream type: ${typ.toString()}`); } diff --git a/js/net/src/lite/index.ts b/js/net/src/lite/index.ts index f65bd5caa..515977527 100644 --- a/js/net/src/lite/index.ts +++ b/js/net/src/lite/index.ts @@ -5,6 +5,7 @@ export * from "./goaway.ts"; export * from "./group.ts"; export * from "./probe.ts"; export * from "./session.ts"; +export * from "./setup.ts"; export * from "./stream.ts"; export * from "./subscribe.ts"; export * from "./track.ts"; diff --git a/js/net/src/lite/setup.test.ts b/js/net/src/lite/setup.test.ts new file mode 100644 index 000000000..e0f763920 --- /dev/null +++ b/js/net/src/lite/setup.test.ts @@ -0,0 +1,97 @@ +import { expect, test } from "bun:test"; +import { Reader, Writer } from "../stream.ts"; +import * as Varint from "../varint.ts"; +import { ProbeLevel, Setup } from "./setup.ts"; +import { Version } from "./version.ts"; + +function concat(chunks: Uint8Array[]): Uint8Array { + const total = chunks.reduce((sum, c) => sum + c.byteLength, 0); + const out = new Uint8Array(total); + let offset = 0; + for (const c of chunks) { + out.set(c, offset); + offset += c.byteLength; + } + return out; +} + +async function bytes(f: (w: Writer) => Promise): Promise { + const written: Uint8Array[] = []; + const writer = new Writer( + new WritableStream({ write: (chunk) => void written.push(new Uint8Array(chunk)) }), + ); + await f(writer); + writer.close(); + await writer.closed; + return concat(written); +} + +async function roundTrip(msg: Setup): Promise { + const reader = new Reader(undefined, await bytes((w) => msg.encode(w, Version.DRAFT_05_WIP))); + const got = await Setup.decode(reader, Version.DRAFT_05_WIP); + expect(await reader.done()).toBe(true); + return got; +} + +test("empty SETUP round-trips on draft-05", async () => { + const got = await roundTrip(new Setup()); + expect(got.probe).toBe(ProbeLevel.None); + expect(got.path).toBeUndefined(); +}); + +test("each probe level round-trips on draft-05", async () => { + for (const probe of [ProbeLevel.None, ProbeLevel.Report, ProbeLevel.Increase]) { + const got = await roundTrip(new Setup(probe)); + expect(got.probe).toBe(probe); + expect(got.path).toBeUndefined(); + } +}); + +test("SETUP with path round-trips on draft-05", async () => { + const got = await roundTrip(new Setup(ProbeLevel.Report, "/room/123")); + expect(got.probe).toBe(ProbeLevel.Report); + expect(got.path).toBe("/room/123"); +}); + +test("unknown probe level saturates to Increase", async () => { + // Hand-frame a SETUP body carrying an unknown probe level (99): a 1-parameter bag + // (PROBE id 0x1) whose value is the varint 99, prefixed with the Message size. + const value = Varint.encode(99); + const body = await bytes(async (w) => { + await w.u53(1); // parameter count + await w.u62(0x1n); // PARAM_PROBE + await w.u53(value.byteLength); + await w.write(value); + }); + + const framed = await bytes(async (w) => { + await w.u53(body.byteLength); // Message size prefix + await w.write(body); + }); + + const got = await Setup.decode(new Reader(undefined, framed), Version.DRAFT_05_WIP); + expect(got.probe).toBe(ProbeLevel.Increase); +}); + +test("SETUP is rejected before draft-05", async () => { + await expect(bytes((w) => new Setup().encode(w, Version.DRAFT_04))).rejects.toThrow(); +}); + +test("SETUP decode is rejected before draft-05", async () => { + const framed = await bytes((w) => new Setup().encode(w, Version.DRAFT_05_WIP)); + await expect(Setup.decode(new Reader(undefined, framed), Version.DRAFT_04)).rejects.toThrow(); +}); + +test("empty path is rejected on decode", async () => { + // Hand-frame a SETUP with a zero-length PATH parameter. + const body = await bytes(async (w) => { + await w.u53(1); // parameter count + await w.u62(0x2n); // PARAM_PATH + await w.u53(0); // zero-length value + }); + const framed = await bytes(async (w) => { + await w.u53(body.byteLength); + await w.write(body); + }); + await expect(Setup.decode(new Reader(undefined, framed), Version.DRAFT_05_WIP)).rejects.toThrow(); +}); diff --git a/js/net/src/lite/setup.ts b/js/net/src/lite/setup.ts new file mode 100644 index 000000000..92dc8d4b0 --- /dev/null +++ b/js/net/src/lite/setup.ts @@ -0,0 +1,190 @@ +/** + * The lite-05 SETUP message: each endpoint advertises its capabilities once, as the + * sole message on a unidirectional Setup Stream ({@link DataType.Setup}), then closes it. + * + * @module + */ + +import type { Reader, Writer } from "../stream.ts"; +import * as Varint from "../varint.ts"; +import * as Message from "./message.ts"; +import { hasSetupStream, type Version } from "./version.ts"; + +/** Setup Parameter id for the Probe capability level. */ +const PARAM_PROBE = 0x1n; +/** Setup Parameter id for the request Path (client-only, URI-less transports). */ +const PARAM_PATH = 0x2n; + +/** Cap on the number of parameters in a bag, matching the Rust decoder. */ +const MAX_PARAMS = 64; + +/** + * The probe capability an endpoint advertises in SETUP. + * + * Monotonic: a higher level implies every lower one. An unknown (future) value + * decodes as the highest level we understand, so a peer that gains a new level is + * treated as at least {@link ProbeLevel.Increase}. + */ +export const ProbeLevel = { + /** No probing. Equivalent to omitting the parameter. */ + None: 0, + /** The publisher can measure and periodically report its estimated bitrate. */ + Report: 1, + /** The publisher can additionally pad the connection (or send redundant data). */ + Increase: 2, +} as const; + +/** A probe capability level. See {@link ProbeLevel}. */ +export type ProbeLevel = (typeof ProbeLevel)[keyof typeof ProbeLevel]; + +/** Map a wire value to a level, saturating unknown values to {@link ProbeLevel.Increase}. */ +function probeFromCode(code: bigint): ProbeLevel { + switch (code) { + case 0n: + return ProbeLevel.None; + case 1n: + return ProbeLevel.Report; + default: + return ProbeLevel.Increase; + } +} + +/** + * A bag of `id -> raw bytes` parameters, the body shared by SETUP. Encoded as a varint + * count followed by `id, length, value` triples; duplicate ids are rejected on decode. + */ +class Parameters { + #entries = new Map(); + + /** Set a parameter to a raw byte value, replacing any existing entry. */ + setBytes(id: bigint, value: Uint8Array) { + this.#entries.set(id, value); + } + + /** Return a parameter's raw byte value, if present. */ + getBytes(id: bigint): Uint8Array | undefined { + return this.#entries.get(id); + } + + /** Set a parameter to a varint value, replacing any existing entry. */ + setVarint(id: bigint, value: number | bigint) { + this.#entries.set(id, Varint.encode(Number(value))); + } + + /** Decode a parameter as a single varint, if present. Throws if trailing bytes remain. */ + getVarint(id: bigint): bigint | undefined { + const bytes = this.#entries.get(id); + if (bytes === undefined) return undefined; + const [value, remain] = Varint.decode(bytes); + if (remain.byteLength !== 0) { + throw new Error("trailing bytes after varint parameter"); + } + return BigInt(value); + } + + async encode(w: Writer) { + if (this.#entries.size > MAX_PARAMS) { + throw new Error("too many parameters"); + } + + await w.u53(this.#entries.size); + for (const [id, value] of this.#entries) { + await w.u62(id); + await w.u53(value.byteLength); + await w.write(value); + } + } + + static async decode(r: Reader): Promise { + const params = new Parameters(); + + const count = await r.u53(); + if (count > MAX_PARAMS) { + throw new Error("too many parameters"); + } + + for (let i = 0; i < count; i++) { + const id = await r.u62(); + if (params.#entries.has(id)) { + throw new Error(`duplicate parameter id: ${id.toString()}`); + } + const size = await r.u53(); + const value = await r.read(size); + params.#entries.set(id, value); + } + + return params; + } +} + +/** + * The SETUP message, sent once per endpoint on the unidirectional Setup Stream. + * + * lite-05+ only. The two endpoints' SETUP messages are independent: neither side + * blocks on the peer's before opening other streams, but a stream whose encoding + * depends on a negotiated capability (e.g. PROBE) must wait for it. + */ +export class Setup { + /** The probe capability this endpoint supports. {@link ProbeLevel.None} when absent. */ + probe: ProbeLevel; + + /** + * The request path, for transports that carry no request URI (native QUIC, qmux over + * TCP/TLS). Sent only by the client; a server never sends one and a relay never forwards + * it. `undefined` on URI-carrying bindings such as WebTransport. + */ + path?: string; + + constructor(probe: ProbeLevel = ProbeLevel.None, path?: string) { + this.probe = probe; + this.path = path; + } + + static #guard(version: Version) { + if (!hasSetupStream(version)) { + throw new Error("setup stream not supported for this version"); + } + } + + async #encode(w: Writer) { + const params = new Parameters(); + // None is the wire default, so omit it to keep the message empty when nothing is set. + if (this.probe !== ProbeLevel.None) { + params.setVarint(PARAM_PROBE, this.probe); + } + if (this.path !== undefined) { + params.setBytes(PARAM_PATH, new TextEncoder().encode(this.path)); + } + await params.encode(w); + } + + static async #decode(r: Reader): Promise { + const params = await Parameters.decode(r); + + const probeCode = params.getVarint(PARAM_PROBE); + const probe = probeCode === undefined ? ProbeLevel.None : probeFromCode(probeCode); + + const pathBytes = params.getBytes(PARAM_PATH); + let path: string | undefined; + if (pathBytes !== undefined) { + path = new TextDecoder().decode(pathBytes); + if (path.length === 0) { + throw new Error("empty path parameter"); + } + } + + return new Setup(probe, path); + } + + /** Encode the SETUP message with its size prefix. Throws on pre-lite-05 versions. */ + async encode(w: Writer, version: Version): Promise { + Setup.#guard(version); + return Message.encode(w, this.#encode.bind(this)); + } + + /** Decode a SETUP message with its size prefix. Throws on pre-lite-05 versions. */ + static async decode(r: Reader, version: Version): Promise { + Setup.#guard(version); + return Message.decode(r, Setup.#decode); + } +} diff --git a/js/net/src/lite/stream.ts b/js/net/src/lite/stream.ts index fa4749501..972565a9f 100644 --- a/js/net/src/lite/stream.ts +++ b/js/net/src/lite/stream.ts @@ -19,3 +19,17 @@ export const StreamId = { ClientCompat: 0x20, ServerCompat: 0x21, } as const; + +/** + * The type prefix on a unidirectional data stream, read/written as a single byte. + * + * `Group` is the per-group frame stream. `Setup` (lite-05+) carries the single SETUP + * message advertising this endpoint's capabilities. + */ +export const DataType = { + Group: 0, + Setup: 1, +} as const; + +/** A unidirectional data-stream type. See {@link DataType}. */ +export type DataType = (typeof DataType)[keyof typeof DataType]; diff --git a/js/net/src/lite/subscriber.ts b/js/net/src/lite/subscriber.ts index cd1bbe286..008bf2f77 100644 --- a/js/net/src/lite/subscriber.ts +++ b/js/net/src/lite/subscriber.ts @@ -14,6 +14,7 @@ import { AnnounceBroadcast, AnnounceInit, AnnounceOk, AnnounceRequest } from "./ import type { Group as GroupMessage } from "./group.ts"; import type { Origin } from "./origin.ts"; import { Probe } from "./probe.ts"; +import { ProbeLevel, type Setup } from "./setup.ts"; import { StreamId } from "./stream.ts"; import { decodeSubscribeResponse, decodeSubscribeResponseMaybe, Subscribe, SubscribeUpdate } from "./subscribe.ts"; import { TrackInfo, Track as TrackMessage } from "./track.ts"; @@ -92,6 +93,10 @@ export class Subscriber { // RTT producer (Lite04+ only). #rtt?: Signal; + // The peer's SETUP (lite-05+), undefined until it arrives. Gates opening the PROBE + // stream on the peer having advertised Probe >= Report. + #peerSetup?: Signal; + /** * Creates a new Subscriber instance. * @param quic - The WebTransport session to use @@ -99,6 +104,7 @@ export class Subscriber { * @param origin - Origin id shared with the Publisher * @param recvBandwidth - Optional bandwidth producer for PROBE * @param rtt - Optional RTT signal for PROBE + * @param peerSetup - Optional peer SETUP slot for capability gating (lite-05+) * * @internal */ @@ -108,12 +114,14 @@ export class Subscriber { origin: Origin, recvBandwidth?: Bandwidth, rtt?: Signal, + peerSetup?: Signal, ) { this.#quic = quic; this.version = version; this.origin = origin; this.#recvBandwidth = recvBandwidth; this.#rtt = rtt; + this.#peerSetup = peerSetup; } /** @@ -531,10 +539,28 @@ export class Subscriber { * * @internal */ + // Await the peer's advertised probe level, blocking until its SETUP arrives. The peer + // MUST send exactly one SETUP, so this resolves once that stream is read. + async #peerProbeLevel(peerSetup: Signal): Promise { + let setup = peerSetup.peek(); + while (setup === undefined) { + setup = await peerSetup.next(); + } + return setup.probe; + } + async runProbe(): Promise { if (!this.#recvBandwidth) return; if (this.version === Version.DRAFT_01 || this.version === Version.DRAFT_02) return; + // Lite-05+ gates the PROBE stream on the peer advertising Probe >= Report in its + // SETUP. Wait for the SETUP, then bail if the peer can't report bitrate. Older + // drafts have no SETUP, so they keep probing unconditionally. + if (this.#peerSetup) { + const probe = await this.#peerProbeLevel(this.#peerSetup); + if (probe < ProbeLevel.Report) return; + } + // Probe is best-effort: any failure (stream reset by peer, missing peer support, // transport hiccup) MUST NOT tear down the connection. On error, drop the // bandwidth/RTT estimates so consumers know they're stale. diff --git a/js/net/src/lite/version.ts b/js/net/src/lite/version.ts index 69a32239a..f0aac601d 100644 --- a/js/net/src/lite/version.ts +++ b/js/net/src/lite/version.ts @@ -44,6 +44,21 @@ export function hasBroadcastEpoch(version: Version): boolean { } } +/// Whether the session opens a unidirectional Setup Stream carrying a single SETUP message +/// (capabilities + optional Path). Added in lite-05; older drafts have no Setup Stream. +export function hasSetupStream(version: Version): boolean { + // Explicitly list older versions so future versions default to having the stream. + switch (version) { + case Version.DRAFT_01: + case Version.DRAFT_02: + case Version.DRAFT_03: + case Version.DRAFT_04: + return false; + default: + return true; + } +} + /// The WebTransport subprotocol identifier for moq-lite. /// Version negotiation still happens via SETUP when this is used. export const ALPN = "moql"; From 83da61232de6069900a0f5600c36b90fa1bd09a6 Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 21 Jun 2026 01:59:05 +0000 Subject: [PATCH 5/5] style(moq-net): cargo fmt Co-Authored-By: Claude Opus 4.8 (1M context) Claude-Session: https://claude.ai/code/session_0118VcLtfuukFERQ8ito5dUG --- rs/moq-net/src/ietf/subscriber.rs | 6 +++++- rs/moq-net/src/lite/announce.rs | 5 ++++- rs/moq-net/src/lite/parameters.rs | 4 +++- rs/moq-net/src/lite/session.rs | 6 +----- rs/moq-net/src/lite/setup.rs | 10 ++++++++-- rs/moq-net/src/lite/subscriber.rs | 8 +++++++- 6 files changed, 28 insertions(+), 11 deletions(-) diff --git a/rs/moq-net/src/ietf/subscriber.rs b/rs/moq-net/src/ietf/subscriber.rs index 5c06be6e5..c97aa4399 100644 --- a/rs/moq-net/src/ietf/subscriber.rs +++ b/rs/moq-net/src/ietf/subscriber.rs @@ -462,7 +462,11 @@ impl Subscriber { .expect("an empty hop chain has room for one entry"); // moq-transport carries no broadcast epoch on the wire; stamp the current // time so the instance is still ordered against any future re-announce. - let broadcast = BroadcastInfo { hops, ..Default::default() }.produce(); + let broadcast = BroadcastInfo { + hops, + ..Default::default() + } + .produce(); // Create the dynamic handler BEFORE publishing so consumers see // dynamic >= 1 the moment they receive the announce. Otherwise a diff --git a/rs/moq-net/src/lite/announce.rs b/rs/moq-net/src/lite/announce.rs index fc117af2c..b0cb18f1e 100644 --- a/rs/moq-net/src/lite/announce.rs +++ b/rs/moq-net/src/lite/announce.rs @@ -315,7 +315,10 @@ mod tests { let version = Version::Lite04; let mut slice = encode_forged_restart(version); assert!( - matches!(AnnounceBroadcast::decode(&mut slice, version), Err(DecodeError::InvalidValue)), + matches!( + AnnounceBroadcast::decode(&mut slice, version), + Err(DecodeError::InvalidValue) + ), "restart status must be rejected before lite-05" ); } diff --git a/rs/moq-net/src/lite/parameters.rs b/rs/moq-net/src/lite/parameters.rs index f175beb00..94c378138 100644 --- a/rs/moq-net/src/lite/parameters.rs +++ b/rs/moq-net/src/lite/parameters.rs @@ -27,7 +27,9 @@ impl Parameters { pub fn set_varint(&mut self, id: u64, value: u64) { let mut buf = Vec::new(); // Infallible: writing into a Vec never runs short. - value.encode(&mut buf, Version::Lite05Wip).expect("varint encode into Vec"); + value + .encode(&mut buf, Version::Lite05Wip) + .expect("varint encode into Vec"); self.0.insert(id, buf); } diff --git a/rs/moq-net/src/lite/session.rs b/rs/moq-net/src/lite/session.rs index fda4bab4c..5d807b5f1 100644 --- a/rs/moq-net/src/lite/session.rs +++ b/rs/moq-net/src/lite/session.rs @@ -124,11 +124,7 @@ pub fn start( } /// Open a unidirectional Setup Stream, send our single SETUP message, and FIN. -async fn send_setup( - session: &S, - setup: Setup, - version: Version, -) -> Result<(), Error> { +async fn send_setup(session: &S, setup: Setup, version: Version) -> Result<(), Error> { let stream = session.open_uni().await.map_err(Error::from_transport)?; let mut writer = Writer::new(stream, version); writer.encode(&super::DataType::Setup).await?; diff --git a/rs/moq-net/src/lite/setup.rs b/rs/moq-net/src/lite/setup.rs index d12ab7d86..31a6530d0 100644 --- a/rs/moq-net/src/lite/setup.rs +++ b/rs/moq-net/src/lite/setup.rs @@ -68,7 +68,10 @@ impl Message for Setup { } let params = Parameters::decode(r, version)?; - let probe = params.get_varint(PARAM_PROBE)?.map(ProbeLevel::from_code).unwrap_or_default(); + let probe = params + .get_varint(PARAM_PROBE)? + .map(ProbeLevel::from_code) + .unwrap_or_default(); let path = match params.get_bytes(PARAM_PATH) { Some(bytes) => { let s = std::str::from_utf8(bytes).map_err(|_| DecodeError::InvalidValue)?; @@ -198,6 +201,9 @@ mod tests { fn rejects_before_lite05() { let msg = Setup::default(); let mut buf = bytes::BytesMut::new(); - assert!(matches!(msg.encode(&mut buf, Version::Lite04), Err(EncodeError::Version))); + assert!(matches!( + msg.encode(&mut buf, Version::Lite04), + Err(EncodeError::Version) + )); } } diff --git a/rs/moq-net/src/lite/subscriber.rs b/rs/moq-net/src/lite/subscriber.rs index c4e229f98..5b0fff8d0 100644 --- a/rs/moq-net/src/lite/subscriber.rs +++ b/rs/moq-net/src/lite/subscriber.rs @@ -235,7 +235,13 @@ impl Subscriber { let abs = self.origin.absolute(&path).to_owned(); // Lite01/02 don't carry hop information or an epoch; the broadcast starts with // an empty chain and a zero epoch (decoded as the 2020 base instant). - if self.start_announce(path.clone(), crate::OriginList::new(), 0, responder_origin, &mut producers)? { + if self.start_announce( + path.clone(), + crate::OriginList::new(), + 0, + responder_origin, + &mut producers, + )? { stats_guards.insert(abs.clone(), self.stats.broadcast(&abs).subscriber()); } }