diff --git a/rs/moq-mux/src/codec/av1/import.rs b/rs/moq-mux/src/codec/av1/import.rs index 0df8ef832..8ea9f9fbb 100644 --- a/rs/moq-mux/src/codec/av1/import.rs +++ b/rs/moq-mux/src/codec/av1/import.rs @@ -1,4 +1,4 @@ -use crate::container::jitter::MinFrameDuration; +use crate::container::jitter::Jitter; use anyhow::Context; use bytes::BytesMut; @@ -26,7 +26,7 @@ pub struct Import { zero: Option, // Tracks the minimum frame duration and updates the catalog `jitter` field. - jitter: MinFrameDuration, + jitter: Jitter, } #[derive(Default)] @@ -45,7 +45,7 @@ impl Import { config: None, current: Default::default(), zero: None, - jitter: MinFrameDuration::new(), + jitter: Jitter::new(), } } @@ -57,7 +57,7 @@ impl Import { config: None, current: Default::default(), zero: None, - jitter: MinFrameDuration::new(), + jitter: Jitter::new(), } } diff --git a/rs/moq-mux/src/codec/h264/import.rs b/rs/moq-mux/src/codec/h264/import.rs index ffffcfe57..42a39c167 100644 --- a/rs/moq-mux/src/codec/h264/import.rs +++ b/rs/moq-mux/src/codec/h264/import.rs @@ -14,7 +14,7 @@ use tokio::io::{AsyncRead, AsyncReadExt}; use super::Sps; use crate::catalog::hang::CatalogExt; use crate::codec::annexb::{NalIterator, START_CODE}; -use crate::container::jitter::MinFrameDuration; +use crate::container::jitter::Jitter; /// The wire shape an [`Import`] is processing. #[derive(Clone, Copy, Debug, Eq, PartialEq)] @@ -37,7 +37,7 @@ pub struct Import { config: Option, state: State, zero: Option, - jitter: MinFrameDuration, + jitter: Jitter, } enum State { @@ -78,7 +78,7 @@ impl Import { config: None, state: State::Pending { mode_hint: None }, zero: None, - jitter: MinFrameDuration::new(), + jitter: Jitter::new(), } } @@ -90,7 +90,7 @@ impl Import { config: None, state: State::Pending { mode_hint: None }, zero: None, - jitter: MinFrameDuration::new(), + jitter: Jitter::new(), } } @@ -258,6 +258,22 @@ impl Import { } } + /// Record a frame's reorder delay (`PTS - DTS`) so the catalog `jitter` reflects the + /// B-frame reorder depth (the decode buffer a transmuxer/player must hold). The container + /// supplies this since the elementary stream alone carries no decode time. No-op until the + /// track exists. + pub fn observe_reorder(&mut self, reorder: crate::container::Timestamp) { + let Some(jitter) = self.jitter.observe_reorder(reorder) else { + return; + }; + let Some(track) = self.track.as_ref() else { + return; + }; + if let Some(c) = self.catalog.lock().video.renditions.get_mut(&track.name) { + c.jitter = Some(jitter); + } + } + fn decode_avc1>( &mut self, buf: &mut T, @@ -407,7 +423,13 @@ impl Import { // The avc3 track was created eagerly in initialize_avc3; just publish // (or republish) the catalog rendition with the latest config. let track_name = self.track.as_ref().context("avc3 track not created")?.name.clone(); - self.catalog.lock().video.renditions.insert(track_name, config.clone()); + // Seed jitter from whatever has accumulated: a dirty start feeds frames before + // this first rendition exists, so those per-frame updates would otherwise be + // lost. Keep the cached `config` jitter-free so a later jitter change is not + // mistaken for a codec reconfiguration. + let mut published = config.clone(); + published.jitter = self.jitter.current(); + self.catalog.lock().video.renditions.insert(track_name, published); self.config = Some(config); Ok(reconfigured) } diff --git a/rs/moq-mux/src/codec/h265/import.rs b/rs/moq-mux/src/codec/h265/import.rs index dac453bdf..981fd8f89 100644 --- a/rs/moq-mux/src/codec/h265/import.rs +++ b/rs/moq-mux/src/codec/h265/import.rs @@ -1,6 +1,6 @@ use crate::catalog::hang::CatalogExt; use crate::codec::annexb::{NalIterator, START_CODE}; -use crate::container::jitter::MinFrameDuration; +use crate::container::jitter::Jitter; use anyhow::Context; use bytes::{Buf, Bytes, BytesMut}; @@ -37,7 +37,7 @@ pub struct Import { pps: Vec, // Tracks the minimum frame duration and updates the catalog `jitter` field. - jitter: MinFrameDuration, + jitter: Jitter, } impl Import { @@ -52,7 +52,7 @@ impl Import { vps: Vec::new(), sps: Vec::new(), pps: Vec::new(), - jitter: MinFrameDuration::new(), + jitter: Jitter::new(), } } @@ -67,7 +67,7 @@ impl Import { vps: Vec::new(), sps: Vec::new(), pps: Vec::new(), - jitter: MinFrameDuration::new(), + jitter: Jitter::new(), } } @@ -101,6 +101,11 @@ impl Import { } let reconfigured = self.config.is_some(); + // Seed jitter from whatever has accumulated: a dirty start feeds frames before this + // first rendition exists, so those per-frame updates would otherwise be lost. The + // cached `config` stays jitter-free so a later jitter change is not mistaken for a + // codec reconfiguration. + let jitter = self.jitter.current(); let mut catalog = self.catalog.lock(); if self.track.is_some() && self.tracks.is_fixed() { @@ -114,7 +119,9 @@ impl Import { let track = self.tracks.create()?; tracing::debug!(name = ?track.name, ?config, "starting track"); - catalog.video.renditions.insert(track.name.clone(), config.clone()); + let mut published = config.clone(); + published.jitter = jitter; + catalog.video.renditions.insert(track.name.clone(), published); self.config = Some(config); self.track = @@ -173,6 +180,22 @@ impl Import { /// This can also be used when EOF is detected to flush the final frame. /// /// NOTE: The next decode will fail if it doesn't begin with a start code. + /// Record a frame's reorder delay (`PTS - DTS`) so the catalog `jitter` reflects the + /// B-frame reorder depth (the decode buffer a transmuxer/player must hold). The container + /// supplies this since the elementary stream alone carries no decode time. No-op until the + /// track exists. + pub fn observe_reorder(&mut self, reorder: crate::container::Timestamp) { + let Some(jitter) = self.jitter.observe_reorder(reorder) else { + return; + }; + let Some(track) = self.track.as_ref() else { + return; + }; + if let Some(c) = self.catalog.lock().video.renditions.get_mut(&track.name) { + c.jitter = Some(jitter); + } + } + pub fn decode_frame>( &mut self, buf: &mut T, diff --git a/rs/moq-mux/src/codec/vp8/import.rs b/rs/moq-mux/src/codec/vp8/import.rs index e1a1bd48f..8d5a94250 100644 --- a/rs/moq-mux/src/codec/vp8/import.rs +++ b/rs/moq-mux/src/codec/vp8/import.rs @@ -1,7 +1,7 @@ use anyhow::Context; use bytes::Buf; -use crate::container::jitter::MinFrameDuration; +use crate::container::jitter::Jitter; use super::FrameHeader; @@ -28,7 +28,7 @@ pub struct Import { zero: Option, // Tracks the minimum frame duration and updates the catalog `jitter` field. - jitter: MinFrameDuration, + jitter: Jitter, } impl Import { @@ -39,7 +39,7 @@ impl Import { track: None, config: None, zero: None, - jitter: MinFrameDuration::new(), + jitter: Jitter::new(), } } @@ -50,7 +50,7 @@ impl Import { track: None, config: None, zero: None, - jitter: MinFrameDuration::new(), + jitter: Jitter::new(), } } diff --git a/rs/moq-mux/src/codec/vp9/import.rs b/rs/moq-mux/src/codec/vp9/import.rs index 9ba89ca66..e7ae4fb0d 100644 --- a/rs/moq-mux/src/codec/vp9/import.rs +++ b/rs/moq-mux/src/codec/vp9/import.rs @@ -1,7 +1,7 @@ use anyhow::Context; use bytes::Buf; -use crate::container::jitter::MinFrameDuration; +use crate::container::jitter::Jitter; use super::FrameHeader; @@ -28,7 +28,7 @@ pub struct Import { zero: Option, // Tracks the minimum frame duration and updates the catalog `jitter` field. - jitter: MinFrameDuration, + jitter: Jitter, } impl Import { @@ -39,7 +39,7 @@ impl Import { track: None, config: None, zero: None, - jitter: MinFrameDuration::new(), + jitter: Jitter::new(), } } @@ -50,7 +50,7 @@ impl Import { track: None, config: None, zero: None, - jitter: MinFrameDuration::new(), + jitter: Jitter::new(), } } diff --git a/rs/moq-mux/src/container/jitter.rs b/rs/moq-mux/src/container/jitter.rs index 4d3e67299..d99c0c732 100644 --- a/rs/moq-mux/src/container/jitter.rs +++ b/rs/moq-mux/src/container/jitter.rs @@ -1,36 +1,71 @@ use crate::container::Timestamp; -/// Tracks the minimum duration between consecutive frames. +/// Tracks the catalog `jitter` for a video/audio track: the maximum delay before a frame can +/// be emitted, so a player sizes its buffer to at least this much. /// -/// This is the value reported as `jitter` in the catalog: a player should -/// buffer at least this much before emitting frames. Despite the name "jitter", -/// what we actually record is the *minimum frame duration* observed so far. +/// It reports whichever is larger of two contributions: +/// - the minimum frame duration (the steady inter-frame spacing), and +/// - the reorder delay (`max(PTS - DTS)`), which is non-zero only for reordered (B-frame) +/// streams and which a transmuxer also reuses as the decode-clock reserve. +/// +/// A non-reordered stream reports the frame duration; a B-frame stream reports the deeper +/// reorder delay (e.g. up to 3 consecutive B-frames is 3x the frame duration). #[derive(Default)] -pub struct MinFrameDuration { +pub struct Jitter { last_timestamp: Option, min_duration: Option, + max_reorder: Timestamp, + /// Last value handed back from [`observe`](Self::observe) / + /// [`observe_reorder`](Self::observe_reorder), so they only report on a change. + reported: Option, } -impl MinFrameDuration { +impl Jitter { pub fn new() -> Self { Self::default() } - /// Record a new frame timestamp. - /// - /// Returns the new minimum-frame-duration as a `moq_net::Time` if it - /// changed, so the caller can persist it on the catalog rendition. Returns - /// `None` when this is the first observation, the timestamps are - /// non-monotonic, or the new gap is no smaller than the recorded minimum. + /// Record a frame's presentation timestamp (decode order), updating the minimum frame + /// duration. Returns the new jitter as a [`moq_net::Time`] if it changed, else `None`. + /// The first observation and non-monotonic timestamps (B-frames) only update state. pub fn observe(&mut self, ts: Timestamp) -> Option { - let last = self.last_timestamp.replace(ts)?; - let duration = ts.checked_sub(last).ok()?; + if let Some(last) = self.last_timestamp.replace(ts) + && let Ok(duration) = ts.checked_sub(last) + && !duration.is_zero() + && duration < self.min_duration.unwrap_or(Timestamp::MAX) + { + self.min_duration = Some(duration); + } + self.report() + } + + /// Record a frame's reorder delay (`PTS - DTS`), updating the maximum. Returns the new + /// jitter as a [`moq_net::Time`] if it changed, else `None`. + pub fn observe_reorder(&mut self, reorder: Timestamp) -> Option { + self.max_reorder = self.max_reorder.max(reorder); + self.report() + } - if duration >= self.min_duration.unwrap_or(Timestamp::MAX) { + /// The current jitter (the larger of the frame duration and the reorder delay), without + /// the change-detection of [`observe`](Self::observe). Used to seed a freshly created + /// catalog rendition with whatever has accumulated, since per-frame updates before the + /// rendition exists would otherwise be lost. + pub fn current(&self) -> Option { + let jitter = self.combined(); + (!jitter.is_zero()).then(|| jitter.convert().ok()).flatten() + } + + fn combined(&self) -> Timestamp { + self.min_duration.unwrap_or(Timestamp::ZERO).max(self.max_reorder) + } + + /// Report the current jitter only when it changes. + fn report(&mut self) -> Option { + let jitter = self.combined(); + if jitter.is_zero() || self.reported == Some(jitter) { return None; } - - self.min_duration = Some(duration); - duration.convert().ok() + self.reported = Some(jitter); + jitter.convert().ok() } } diff --git a/rs/moq-mux/src/container/ts/export.rs b/rs/moq-mux/src/container/ts/export.rs index f8a891d3b..47c26e674 100644 --- a/rs/moq-mux/src/container/ts/export.rs +++ b/rs/moq-mux/src/container/ts/export.rs @@ -74,6 +74,13 @@ struct Track { /// PMT ES-level descriptors to re-announce, captured verbatim on import (language, /// registration, ...). Empty for non-TS sources; AC-3/E-AC-3 then synthesize one. descriptors: Vec, + /// Last decode timestamp (continuous 90 kHz ticks) authored for this track, keeping the + /// decode clock monotonic across reordered (B-frame) video. Only video uses it. + last_dts: Option, + /// Decode-clock reserve (90 kHz ticks): how far ahead of its PTS each frame decodes. Taken + /// from the catalog `jitter` (the reorder depth) so it is large enough for `DTS <= PTS`, + /// or [`DEFAULT_DTS_RESERVE`] when the catalog declares none. Only video uses it. + dts_reserve: u64, } #[derive(Clone)] @@ -117,6 +124,10 @@ struct PesUnit { is_video: bool, keyframe: bool, timestamp: crate::container::Timestamp, + /// Authored decode timestamp for a reordered (B-frame) video frame, in continuous + /// (unwrapped) 90 kHz ticks (wrapped to the wire field in `write_pes`). `Some` only when + /// it differs from the PTS; the PES then carries both PTS and DTS. + dts: Option, /// Explicit PES stream_id (verbatim PES); `None` derives it from `is_video`. stream_id: Option, } @@ -330,16 +341,20 @@ impl Export { let kind = video_kind(config, name)?; let descriptors = track_descriptors(&mpegts, name); let pid = pids[name]; + // The catalog `jitter` carries the reorder depth (max PTS - DTS), so use it as the + // decode-clock reserve; it may arrive in a later snapshot, so refresh it each time. + let reserve = dts_reserve(config); match old.remove(name) { Some(mut track) => { track.pid = pid; track.kind = kind; track.descriptors = descriptors; + track.dts_reserve = reserve; self.tracks.insert(name.clone(), track); } None => { let source = ExportSource::for_video(&self.broadcast, name, config, self.latency)?; - self.insert_track(name, source, pid, kind, descriptors); + self.insert_track(name, source, pid, kind, descriptors, reserve); } } } @@ -356,7 +371,7 @@ impl Export { } None => { let source = ExportSource::for_audio(&self.broadcast, name, config, self.latency)?; - self.insert_track(name, source, pid, kind, descriptors); + self.insert_track(name, source, pid, kind, descriptors, DEFAULT_DTS_RESERVE); } } } @@ -380,7 +395,7 @@ impl Export { } None => { let source = ExportSource::for_stream(&self.broadcast, name, self.latency)?; - self.insert_track(name, source, pid, kind, descriptors); + self.insert_track(name, source, pid, kind, descriptors, DEFAULT_DTS_RESERVE); } } } @@ -395,6 +410,7 @@ impl Export { pid: u16, kind: Kind, descriptors: Vec, + dts_reserve: u64, ) { self.tracks.insert( name.to_string(), @@ -405,6 +421,8 @@ impl Export { pid, kind, descriptors, + last_dts: None, + dts_reserve, }, ); } @@ -592,6 +610,16 @@ impl Export { } => None, }; + // Author a monotonic decode timeline for reordered video (B-frames). Other kinds + // never reorder, so DTS == PTS and the PES stays PTS-only. + let dts = if is_video { + let pts = to_ticks(frame.timestamp); + let track = self.tracks.get_mut(name).context("missing track")?; + author_dts(pts, track.dts_reserve, &mut track.last_dts) + } else { + None + }; + let mut out = Vec::with_capacity(TsPacket::SIZE); // Refresh PSI at keyframes or after the interval lapses. @@ -626,6 +654,7 @@ impl Export { is_video, keyframe: frame.keyframe, timestamp: frame.timestamp, + dts, stream_id, }; self.write_pes(&mut out, &unit, &es_payload)?; @@ -637,6 +666,12 @@ impl Export { /// Packetize a PES payload into 188-byte TS packets. fn write_pes(&mut self, out: &mut Vec, unit: &PesUnit, payload: &[u8]) -> anyhow::Result<()> { let pts = to_ts_timestamp(unit.timestamp)?; + // A reordered video frame carries DTS alongside PTS; else PTS-only. The decode clock + // is continuous ticks, so wrap into the 33-bit wire field here, like the PTS. + let dts = unit + .dts + .map(|t| TsTimestamp::new(t & TS_TIMESTAMP_MASK).map_err(anyhow::Error::msg)) + .transpose()?; let stream_id = match unit.stream_id { Some(id) => StreamId::new(id), None if unit.is_video => StreamId::new(StreamId::VIDEO_MIN), @@ -649,19 +684,25 @@ impl Export { copyright: false, original_or_copy: false, pts: Some(pts), - dts: None, + dts, escr: None, }; + // The optional PES header grows by 5 bytes when it also carries a DTS. + let optional_len = PES_OPTIONAL_LEN + if dts.is_some() { PES_DTS_LEN } else { 0 }; + // `pes_packet_len` counts the optional header plus the payload (not the // 6-byte fixed prefix). Unbounded for video (0); bounded for audio when // it fits a u16. let pes_packet_len = if unit.is_video { 0 } else { - u16::try_from(PES_OPTIONAL_LEN + payload.len()).unwrap_or(0) + u16::try_from(optional_len + payload.len()).unwrap_or(0) }; + // PCR follows the decode clock, so a B-frame stream advertises DTS (not PTS) here. + let pcr = dts.unwrap_or(pts); + let mut offset = 0; let mut first = true; loop { @@ -670,7 +711,7 @@ impl Export { discontinuity_indicator: false, random_access_indicator: unit.keyframe, es_priority_indicator: false, - pcr: if unit.is_pcr { Some(pts.into()) } else { None }, + pcr: if unit.is_pcr { Some(pcr.into()) } else { None }, opcr: None, splice_countdown: None, transport_private_data: Vec::new(), @@ -680,7 +721,7 @@ impl Export { None }; - let header_len = if first { PES_HEADER_LEN } else { 0 }; + let header_len = if first { 6 + optional_len } else { 0 }; let af_len = adaptation.as_ref().map(adaptation_size).unwrap_or(0); let avail = TsBytes::MAX_SIZE - header_len - af_len; let take = avail.min(payload.len() - offset); @@ -781,8 +822,15 @@ impl Export { /// Optional PES header region carrying PTS only: 2 flag bytes + 1 length byte + 5 PTS bytes. const PES_OPTIONAL_LEN: usize = 3 + 5; -/// Full on-wire PES header for the first packet: 6-byte fixed prefix + optional region. -const PES_HEADER_LEN: usize = 6 + PES_OPTIONAL_LEN; +/// Extra bytes when the optional region also carries a DTS (5 DTS bytes). +const PES_DTS_LEN: usize = 5; +/// Fallback decode-clock reserve in 90 kHz ticks when the catalog declares no `jitter`. At +/// 16 ticks (~0.18 ms) it is just a strict-monotonic nudge: it keeps DTS strictly increasing +/// across reordered (B-frame) decode order (the `ffplay -fflags +igndts` fix) but does not +/// keep `DTS <= PTS`. When the catalog carries `jitter` (the reorder depth, populated on +/// import), the track uses that instead, which is large enough to keep `DTS <= PTS`. See +/// [`author_dts`] and [`Track::dts_reserve`]. +const DEFAULT_DTS_RESERVE: u64 = 16; fn psi_interval() -> crate::container::Timestamp { crate::container::Timestamp::try_from(PSI_INTERVAL).unwrap_or(crate::container::Timestamp::ZERO) @@ -794,11 +842,19 @@ fn adaptation_size(af: &AdaptationField) -> usize { 2 + if af.pcr.is_some() { 6 } else { 0 } } +/// The 33-bit wire timestamp field (90 kHz). DTS and PTS both wrap into it. +const TS_TIMESTAMP_MASK: u64 = (1 << 33) - 1; + +/// Continuous (unwrapped) 90 kHz tick count for a media timestamp. The decode clock runs in +/// this domain so it never wraps mid-stream (the source timestamps are already unwrapped); +/// [`to_ts_timestamp`] masks to the 33-bit wire field only at emission. +fn to_ticks(timestamp: crate::container::Timestamp) -> u64 { + (timestamp.as_micros() * 90_000 / 1_000_000) as u64 +} + fn to_ts_timestamp(timestamp: crate::container::Timestamp) -> anyhow::Result { - // micros -> 90 kHz, wrapped into the 33-bit field. - let micros = timestamp.as_micros(); - let ticks = (micros * 90_000 / 1_000_000) as u64 & ((1 << 33) - 1); - TsTimestamp::new(ticks).map_err(anyhow::Error::msg) + // Continuous 90 kHz ticks, wrapped into the 33-bit field. + TsTimestamp::new(to_ticks(timestamp) & TS_TIMESTAMP_MASK).map_err(anyhow::Error::msg) } fn video_kind(config: &VideoConfig, name: &str) -> anyhow::Result { @@ -888,9 +944,143 @@ fn ensure_raw(container: &Container, kind: &str, name: &str) -> anyhow::Result<( } } +/// Author a monotonic decode timestamp (DTS) for a reordered (B-frame) video frame. +/// +/// [`Frame`] carries only a presentation timestamp (PTS) and frames reach the muxer in +/// decode order (MoQ groups and frames are delivered in decode order), so a B-frame stream +/// arrives with valid but non-monotonic PTS and no decode time. MPEG-TS players need a +/// monotonic DTS to schedule decoding; without it they choke on the out-of-order PTS (the +/// `ffplay -fflags +igndts` workaround). +/// +/// Since decode order is already the delivery order, the only job is to keep DTS strictly +/// increasing. The clock runs [`DTS_RESERVE`] ticks behind the PTS and never goes backwards: +/// a reordered frame whose PTS dips below the clock is nudged one tick past the last DTS. With +/// the small reserve this keeps DTS monotonic but lets it sit above a B-frame's own PTS; a +/// frame-scale reserve (or the faithful wire DTS) would be needed for `DTS <= PTS`. +/// +/// `reserve` is how far behind the PTS to run the clock (the catalog reorder depth, or the +/// fallback). `pts` and `last` are continuous (unwrapped) 90 kHz ticks, so the clock never +/// wraps mid-stream; the 33-bit wire wrap happens once at emission in [`write_pes`]. `last` is +/// the previous DTS, updated in place. Returns `None` when the DTS equals the PTS (PES stays +/// PTS-only). +fn author_dts(pts: u64, reserve: u64, last: &mut Option) -> Option { + let mut dts = pts.saturating_sub(reserve); + if let Some(prev) = *last + && dts <= prev + { + dts = prev + 1; + } + *last = Some(dts); + (dts != pts).then_some(dts) +} + +/// The decode-clock reserve for a video rendition: its catalog `jitter` (the reorder depth) +/// in 90 kHz ticks, or [`DEFAULT_DTS_RESERVE`] when none is declared. +fn dts_reserve(config: &VideoConfig) -> u64 { + config + .jitter + .map(|t| t.as_scale(90_000) as u64) + .filter(|&ticks| ticks > 0) + .unwrap_or(DEFAULT_DTS_RESERVE) +} + #[cfg(test)] mod tests { - use super::is_complete_section; + use super::{DEFAULT_DTS_RESERVE, author_dts, is_complete_section}; + + /// Push a decode-order PTS stream (90 kHz) through the decode clock with a given reserve and + /// return the effective DTS per frame (the authored DTS, or the PTS when none is authored). + fn run_clock(pts: &[u64], reserve: u64) -> Vec { + let mut last = None; + pts.iter() + .map(|&p| author_dts(p, reserve, &mut last).unwrap_or(p)) + .collect() + } + + /// Decode-order PTS for a constant-frame-rate display timeline with `b` B-frames between + /// each pair of reference frames (the common broadcast structure: references pulled ahead + /// of the B-frames they predict). `base` keeps the timeline off zero, like a real feed's + /// initial PTS offset. + fn decode_order(refs: usize, b: usize, dur: u64, base: u64) -> Vec { + let pts = |display: usize| base + display as u64 * dur; + let span = b + 1; + let mut out = vec![pts(0)]; // first reference (keyframe) at display 0 + for g in 1..refs { + let reference = g * span; + out.push(pts(reference)); // reference, decoded before its B-frames + for j in 1..=b { + out.push(pts(reference - span + j)); // the B-frames between the two references + } + } + out + } + + #[test] + fn dts_is_monotonic_across_reorder() { + // 25 fps, 10 s offset. Even with the tiny fallback reserve the decode timeline is + // strictly increasing (the `+igndts` fix); it just may sit above PTS for B-frames. + for b in [1, 3, 5] { + let pts = decode_order(40, b, 3_600, 10_000_000); + let dts = run_clock(&pts, DEFAULT_DTS_RESERVE); + + // The fixture genuinely reorders (PTS dips in decode order). + assert!(pts.windows(2).any(|w| w[1] < w[0]), "b={b}: stream must reorder PTS"); + for (i, win) in dts.windows(2).enumerate() { + assert!(win[1] > win[0], "b={b}: DTS not strictly increasing at {i}: {win:?}"); + } + } + } + + #[test] + fn sufficient_reserve_keeps_dts_under_pts() { + // With a reserve covering the reorder span (the catalog `jitter` carries it), the decode + // timeline is both strictly increasing and never after the PTS. + let dur = 3_600; + for b in [1, 3, 5] { + let reserve = (b as u64 + 1) * dur; // one frame past the b-frame run + let pts = decode_order(40, b, dur, 10_000_000); + let dts = run_clock(&pts, reserve); + + for (i, win) in dts.windows(2).enumerate() { + assert!(win[1] > win[0], "b={b}: DTS not strictly increasing at {i}: {win:?}"); + } + for (i, (&d, &p)) in dts.iter().zip(pts.iter()).enumerate() { + assert!(d <= p, "b={b}: DTS {d} after PTS {p} at {i}"); + } + } + } + + #[test] + fn dts_clock_survives_33bit_wrap() { + // The decode clock runs in continuous ticks, so it stays strictly increasing even as + // the source timeline crosses the 33-bit wire boundary (~26.5 h). The wrap is applied + // only at emission, so here the authored DTS keeps climbing past 1 << 33. + let wrap = 1u64 << 33; + let pts = decode_order(40, 3, 3_600, wrap - 20 * 3_600); + let dts = run_clock(&pts, DEFAULT_DTS_RESERVE); + + assert!(pts.iter().any(|&p| p >= wrap), "test must cross the wrap boundary"); + for (i, win) in dts.windows(2).enumerate() { + assert!( + win[1] > win[0], + "DTS not strictly increasing across wrap at {i}: {win:?}" + ); + } + } + + #[test] + fn dts_without_reorder_trails_pts_by_the_reserve() { + // A monotonic (no-B) stream stays strictly increasing and one reserve under its PTS. + let pts: Vec = (0..40).map(|i| 10_000_000 + i * 3_600).collect(); + let dts = run_clock(&pts, DEFAULT_DTS_RESERVE); + + for (i, win) in dts.windows(2).enumerate() { + assert!(win[1] > win[0], "DTS not strictly increasing at {i}: {win:?}"); + } + for (i, (&d, &p)) in dts.iter().zip(pts.iter()).enumerate() { + assert_eq!(d, p - DEFAULT_DTS_RESERVE, "DTS should trail PTS by the reserve at {i}"); + } + } #[test] fn section_validation() { diff --git a/rs/moq-mux/src/container/ts/export_test.rs b/rs/moq-mux/src/container/ts/export_test.rs index d8f16a53b..43dbb4fa4 100644 --- a/rs/moq-mux/src/container/ts/export_test.rs +++ b/rs/moq-mux/src/container/ts/export_test.rs @@ -334,6 +334,75 @@ async fn export_avc1_out_of_band_reassembles() { assert_eq!(reassembled.as_slice(), annexb(&[SPS, PPS, &idr]).as_ref()); } +/// A real broadcast contribution feed (Ateme Kyrion, H.264 1080i with ~86 B-frames) +/// must come out of the exporter with an authored decode timeline. The importer publishes +/// the reorder depth as the catalog `jitter`, and the exporter sizes its decode-clock reserve +/// from it, so the video PES carry a DTS that is both strictly increasing and never after the +/// PTS in decode order. Also assert the reorder was real (non-monotonic PTS in the source). +#[tokio::test(start_paused = true)] +async fn export_bframe_video_authors_dts() { + let data = include_bytes!("test_data/scte35/kyrion_dirtystart.ts"); + + let mut broadcast = moq_net::Broadcast::new().produce(); + let consumer = broadcast.consume(); + let catalog = crate::catalog::Producer::new(&mut broadcast).unwrap(); + let mut import = crate::container::ts::Import::new(broadcast, catalog.clone()); + import.decode(&mut BytesMut::from(&data[..])).unwrap(); + import.finish().unwrap(); + + // `import` and `catalog` stay alive: retained tracks the exporter subscribes to. + let ts = drain(consumer).await; + assert_packet_aligned(&ts); + + // Collect (pts, dts) for the H.264 video PID in transport (decode) order. + let mut reader = TsPacketReader::new(Cursor::new(ts.as_ref())); + let mut video_pid = None; + let mut pts = Vec::new(); + let mut authored = 0usize; + let mut effective = Vec::new(); + while let Some(packet) = reader.read_ts_packet().unwrap() { + match packet.payload { + Some(TsPayload::Pmt(pmt)) => { + if video_pid.is_none() { + video_pid = pmt + .es_info + .iter() + .find(|e| e.stream_type == StreamType::H264) + .map(|e| e.elementary_pid); + } + } + Some(TsPayload::PesStart(pes)) if Some(packet.header.pid) == video_pid => { + let p = pes.header.pts.expect("video PES carried no PTS").as_u64(); + let d = pes.header.dts.map(|t| t.as_u64()); + if d.is_some() { + authored += 1; + } + effective.push(d.unwrap_or(p)); + pts.push(p); + } + _ => {} + } + } + + assert!(video_pid.is_some(), "missing H.264 video PMT entry"); + assert!(pts.len() > 50, "expected the full feed, got {} frames", pts.len()); + // The source is genuinely reordered: PTS dips in decode order (B-frames). + assert!( + pts.windows(2).any(|w| w[1] < w[0]), + "fixture must carry reordered B-frames" + ); + // The exporter authored a decode timeline (the decode clock trails the PTS). + assert!(authored > 0, "no DTS authored for a B-frame stream"); + // Strictly increasing (removes the `+igndts` requirement) and never after presentation + // (the catalog jitter sized the reserve to the reorder depth). + for (i, win) in effective.windows(2).enumerate() { + assert!(win[1] > win[0], "DTS not strictly increasing at frame {i}: {win:?}"); + } + for (i, (&d, &p)) in effective.iter().zip(pts.iter()).enumerate() { + assert!(d <= p, "DTS {d} after PTS {p} at frame {i}"); + } +} + /// Full SCTE-35 round-trip: import `bbb.ts` (real H.264 + AAC) into a broadcast /// that also carries a `.scte35` cue track, export to TS, re-import, and assert /// the splice_info_section came back byte-for-byte. The PMT must advertise the diff --git a/rs/moq-mux/src/container/ts/import.rs b/rs/moq-mux/src/container/ts/import.rs index 94f3bda31..f6feea0fd 100644 --- a/rs/moq-mux/src/container/ts/import.rs +++ b/rs/moq-mux/src/container/ts/import.rs @@ -472,6 +472,7 @@ impl Import { let data_len = pes_data_len(&pes.header, pes.pes_packet_len); let mut pending = Pending { pts: pes.header.pts.map(|t| t.as_u64()), + dts: pes.header.dts.map(|t| t.as_u64()), stream_id: pes.header.stream_id.as_u8(), data: Vec::with_capacity(pes.data.len()), data_len, @@ -586,6 +587,9 @@ impl Import { struct Pending { /// Raw 90 kHz PTS, before wrap-unwrapping. pts: Option, + /// Raw 90 kHz DTS, before wrap-unwrapping. Present on reordered (B-frame) video; its + /// distance below the PTS is the reorder delay published as the catalog jitter. + dts: Option, /// PES stream_id, preserved for verbatim PES carriage. stream_id: u8, data: Vec, @@ -981,12 +985,23 @@ impl Stream { fn write(&mut self, pending: Pending, burst: Option) -> anyhow::Result<()> { match self { Stream::H264 { import, unwrap } => { + let reorder = reorder_delay(pending.pts, pending.dts); let pts = unwrap_pts(unwrap, pending.pts)?; - import.decode_frame(&mut pending.data.as_slice(), pts) + import.decode_frame(&mut pending.data.as_slice(), pts)?; + // After decode_frame, so the track (and its catalog rendition) exists. + if let Some(reorder) = reorder { + import.observe_reorder(reorder); + } + Ok(()) } Stream::H265 { import, unwrap } => { + let reorder = reorder_delay(pending.pts, pending.dts); let pts = unwrap_pts(unwrap, pending.pts)?; - import.decode_frame(&mut pending.data.as_slice(), pts) + import.decode_frame(&mut pending.data.as_slice(), pts)?; + if let Some(reorder) = reorder { + import.observe_reorder(reorder); + } + Ok(()) } Stream::Aac(stream) => stream.write(pending, burst), Stream::Legacy(stream) => stream.write(pending), @@ -1298,6 +1313,22 @@ fn unwrap_pts(unwrap: &mut PtsUnwrap, pts: Option) -> anyhow::Result