From bf4e26b9e7e5b1bfda41e1c2d445b8454f05bac8 Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Sat, 20 Jun 2026 17:30:05 -0700 Subject: [PATCH 1/6] fix(moq-mux): author DTS for B-frame MPEG-TS export The TS exporter set `dts: None` on every PES, so B-frame streams carried no decode timeline. Players saw PTS-only video with out-of-order decode times and needed `ffplay -fflags +igndts` to play (reproduced on a real CNN 1080i contribution feed through `moq pub | relay | moq sub`). MoQ groups and frames are delivered in decode order, so the only job is keeping DTS monotonic. A forward-only decode clock runs DTS_RESERVE ticks (0.25 s) behind the PTS and never goes backwards: a reordered frame whose PTS dips below the clock is nudged one tick past the last DTS. The reserve gives the reorder room to land under each frame's own PTS (DTS <= PTS) for broadcast B-frame spans; it is decode lead time, not presentation latency (frames still show at their PTS), so it is effectively free. No buffering, no estimation, no startup latency. PCR follows the decode clock too. The PES optional header grows by 5 bytes when it carries a DTS. No public API or wire/format change. Closes #1836 Co-Authored-By: Claude Opus 4.8 --- rs/moq-mux/src/container/ts/export.rs | 131 +++++++++++++++++++-- rs/moq-mux/src/container/ts/export_test.rs | 69 +++++++++++ 2 files changed, 193 insertions(+), 7 deletions(-) diff --git a/rs/moq-mux/src/container/ts/export.rs b/rs/moq-mux/src/container/ts/export.rs index f8a891d3b..361c8045b 100644 --- a/rs/moq-mux/src/container/ts/export.rs +++ b/rs/moq-mux/src/container/ts/export.rs @@ -74,6 +74,9 @@ struct Track { /// PMT ES-level descriptors to re-announce, captured verbatim on import (language, /// registration, ...). Empty for non-TS sources; AC-3/E-AC-3 then synthesize one. descriptors: Vec, + /// Last decode timestamp (90 kHz) authored for this track, so the decode clock stays + /// monotonic across reordered (B-frame) video. Only video uses it. + last_dts: Option, } #[derive(Clone)] @@ -117,6 +120,9 @@ struct PesUnit { is_video: bool, keyframe: bool, timestamp: crate::container::Timestamp, + /// Authored decode timestamp (90 kHz ticks) for a reordered (B-frame) video frame. + /// `Some` only when it differs from the PTS; the PES then carries both PTS and DTS. + dts: Option, /// Explicit PES stream_id (verbatim PES); `None` derives it from `is_video`. stream_id: Option, } @@ -405,6 +411,7 @@ impl Export { pid, kind, descriptors, + last_dts: None, }, ); } @@ -592,6 +599,15 @@ impl Export { } => None, }; + // Author a monotonic decode timeline for reordered video (B-frames). Other kinds + // never reorder, so DTS == PTS and the PES stays PTS-only. + let dts = if is_video { + let pts = to_ts_timestamp(frame.timestamp)?.as_u64(); + author_dts(pts, &mut self.tracks.get_mut(name).context("missing track")?.last_dts) + } else { + None + }; + let mut out = Vec::with_capacity(TsPacket::SIZE); // Refresh PSI at keyframes or after the interval lapses. @@ -626,6 +642,7 @@ impl Export { is_video, keyframe: frame.keyframe, timestamp: frame.timestamp, + dts, stream_id, }; self.write_pes(&mut out, &unit, &es_payload)?; @@ -637,6 +654,11 @@ impl Export { /// Packetize a PES payload into 188-byte TS packets. fn write_pes(&mut self, out: &mut Vec, unit: &PesUnit, payload: &[u8]) -> anyhow::Result<()> { let pts = to_ts_timestamp(unit.timestamp)?; + // A reordered video frame carries DTS (90 kHz ticks) alongside PTS; else PTS-only. + let dts = unit + .dts + .map(|t| TsTimestamp::new(t).map_err(anyhow::Error::msg)) + .transpose()?; let stream_id = match unit.stream_id { Some(id) => StreamId::new(id), None if unit.is_video => StreamId::new(StreamId::VIDEO_MIN), @@ -649,19 +671,25 @@ impl Export { copyright: false, original_or_copy: false, pts: Some(pts), - dts: None, + dts, escr: None, }; + // The optional PES header grows by 5 bytes when it also carries a DTS. + let optional_len = PES_OPTIONAL_LEN + if dts.is_some() { PES_DTS_LEN } else { 0 }; + // `pes_packet_len` counts the optional header plus the payload (not the // 6-byte fixed prefix). Unbounded for video (0); bounded for audio when // it fits a u16. let pes_packet_len = if unit.is_video { 0 } else { - u16::try_from(PES_OPTIONAL_LEN + payload.len()).unwrap_or(0) + u16::try_from(optional_len + payload.len()).unwrap_or(0) }; + // PCR follows the decode clock, so a B-frame stream advertises DTS (not PTS) here. + let pcr = dts.unwrap_or(pts); + let mut offset = 0; let mut first = true; loop { @@ -670,7 +698,7 @@ impl Export { discontinuity_indicator: false, random_access_indicator: unit.keyframe, es_priority_indicator: false, - pcr: if unit.is_pcr { Some(pts.into()) } else { None }, + pcr: if unit.is_pcr { Some(pcr.into()) } else { None }, opcr: None, splice_countdown: None, transport_private_data: Vec::new(), @@ -680,7 +708,7 @@ impl Export { None }; - let header_len = if first { PES_HEADER_LEN } else { 0 }; + let header_len = if first { 6 + optional_len } else { 0 }; let af_len = adaptation.as_ref().map(adaptation_size).unwrap_or(0); let avail = TsBytes::MAX_SIZE - header_len - af_len; let take = avail.min(payload.len() - offset); @@ -781,8 +809,14 @@ impl Export { /// Optional PES header region carrying PTS only: 2 flag bytes + 1 length byte + 5 PTS bytes. const PES_OPTIONAL_LEN: usize = 3 + 5; -/// Full on-wire PES header for the first packet: 6-byte fixed prefix + optional region. -const PES_HEADER_LEN: usize = 6 + PES_OPTIONAL_LEN; +/// Extra bytes when the optional region also carries a DTS (5 DTS bytes). +const PES_DTS_LEN: usize = 5; +/// Decode-clock reserve (90 kHz ticks, 0.25 s): how far before its PTS a video frame is +/// scheduled to decode, leaving room for reordered (B-)frames to slot under their PTS. See +/// [`author_dts`]. This is decode lead time, not presentation latency (frames still show at +/// their PTS), so it is nearly free; it only needs to exceed the largest reorder span +/// (max consecutive B-frames x frame duration), which it does for broadcast contribution. +const DTS_RESERVE: u64 = 90_000 / 4; fn psi_interval() -> crate::container::Timestamp { crate::container::Timestamp::try_from(PSI_INTERVAL).unwrap_or(crate::container::Timestamp::ZERO) @@ -888,9 +922,92 @@ fn ensure_raw(container: &Container, kind: &str, name: &str) -> anyhow::Result<( } } +/// Author a monotonic decode timestamp (DTS, 90 kHz) for a video frame. +/// +/// [`Frame`] carries only a presentation timestamp (PTS) and frames reach the muxer in +/// decode order (MoQ groups and frames are delivered in decode order), so a B-frame stream +/// arrives with valid but non-monotonic PTS and no decode time. MPEG-TS players need a +/// monotonic DTS to schedule decoding; without it they choke on the out-of-order PTS (the +/// `ffplay -fflags +igndts` workaround). +/// +/// Since decode order is already the delivery order, the only job is to keep DTS strictly +/// increasing. We run a decode clock [`DTS_RESERVE`] ticks behind the PTS and never let it +/// go backwards: a reordered frame whose PTS dips below the clock is nudged one tick past +/// the last DTS instead. The reserve gives the reorder room to land under each frame's own +/// PTS (`DTS <= PTS`) for broadcast B-frame spans. `last` is the previous DTS, updated in +/// place. Returns `None` when the DTS equals the PTS, so the PES stays PTS-only. +fn author_dts(pts: u64, last: &mut Option) -> Option { + let mut dts = pts.saturating_sub(DTS_RESERVE); + if let Some(prev) = *last + && dts <= prev + { + dts = prev + 1; + } + // Wrap into the 33-bit field, matching the PTS in `to_ts_timestamp`. + dts &= (1 << 33) - 1; + *last = Some(dts); + (dts != pts).then_some(dts) +} + #[cfg(test)] mod tests { - use super::is_complete_section; + use super::{DTS_RESERVE, author_dts, is_complete_section}; + + /// Push a decode-order PTS stream (90 kHz) through the decode clock and return the + /// effective DTS per frame (the authored DTS, or the PTS when none is authored). + fn run_clock(pts: &[u64]) -> Vec { + let mut last = None; + pts.iter().map(|&p| author_dts(p, &mut last).unwrap_or(p)).collect() + } + + /// Decode-order PTS for a constant-frame-rate display timeline with `b` B-frames between + /// each pair of reference frames (the common broadcast structure: references pulled ahead + /// of the B-frames they predict). `base` keeps the timeline off zero, like a real feed's + /// initial PTS offset. + fn decode_order(refs: usize, b: usize, dur: u64, base: u64) -> Vec { + let pts = |display: usize| base + display as u64 * dur; + let span = b + 1; + let mut out = vec![pts(0)]; // first reference (keyframe) at display 0 + for g in 1..refs { + let reference = g * span; + out.push(pts(reference)); // reference, decoded before its B-frames + for j in 1..=b { + out.push(pts(reference - span + j)); // the B-frames between the two references + } + } + out + } + + #[test] + fn dts_is_monotonic_and_under_pts() { + // 25 fps, 10 s offset; cover single B-frames and a 3-deep run (kyrion-like). + for b in [1, 3] { + let pts = decode_order(40, b, 3_600, 10_000_000); + let dts = run_clock(&pts); + + // The fixture genuinely reorders (PTS dips in decode order). + assert!(pts.windows(2).any(|w| w[1] < w[0]), "b={b}: stream must reorder PTS"); + // Decode timeline is strictly increasing and never after presentation. + for (i, win) in dts.windows(2).enumerate() { + assert!(win[1] > win[0], "b={b}: DTS not strictly increasing at {i}: {win:?}"); + } + for (i, (&d, &p)) in dts.iter().zip(pts.iter()).enumerate() { + assert!(d <= p, "b={b}: DTS {d} after PTS {p} at {i}"); + } + } + } + + #[test] + fn dts_without_reorder_trails_pts_by_the_reserve() { + // A monotonic (no-B) stream: every frame decodes one reserve ahead of its PTS, which + // is decode lead time, not added latency (presentation is still at the PTS). + let pts: Vec = (0..40).map(|i| 10_000_000 + i * 3_600).collect(); + let mut last = None; + for &p in &pts { + let dts = author_dts(p, &mut last).expect("reserve makes DTS differ from PTS"); + assert_eq!(dts, p - DTS_RESERVE, "DTS should trail PTS by exactly the reserve"); + } + } #[test] fn section_validation() { diff --git a/rs/moq-mux/src/container/ts/export_test.rs b/rs/moq-mux/src/container/ts/export_test.rs index d8f16a53b..bcd07fa62 100644 --- a/rs/moq-mux/src/container/ts/export_test.rs +++ b/rs/moq-mux/src/container/ts/export_test.rs @@ -334,6 +334,75 @@ async fn export_avc1_out_of_band_reassembles() { assert_eq!(reassembled.as_slice(), annexb(&[SPS, PPS, &idr]).as_ref()); } +/// A real broadcast contribution feed (Ateme Kyrion, H.264 1080i with ~86 B-frames) +/// must come out of the exporter with an authored decode timeline. Without it players +/// see PTS-only video and choke on the out-of-order decode times (`ffplay +igndts`). +/// Assert the video PES carry a DTS that is strictly increasing in decode order and +/// never after the PTS, and that the reorder was real (some authored DTS, non-monotonic +/// PTS in the source). +#[tokio::test(start_paused = true)] +async fn export_bframe_video_authors_dts() { + let data = include_bytes!("test_data/scte35/kyrion_dirtystart.ts"); + + let mut broadcast = moq_net::Broadcast::new().produce(); + let consumer = broadcast.consume(); + let catalog = crate::catalog::Producer::new(&mut broadcast).unwrap(); + let mut import = crate::container::ts::Import::new(broadcast, catalog.clone()); + import.decode(&mut BytesMut::from(&data[..])).unwrap(); + import.finish().unwrap(); + + // `import` and `catalog` stay alive: retained tracks the exporter subscribes to. + let ts = drain(consumer).await; + assert_packet_aligned(&ts); + + // Collect (pts, dts) for the H.264 video PID in transport (decode) order. + let mut reader = TsPacketReader::new(Cursor::new(ts.as_ref())); + let mut video_pid = None; + let mut pts = Vec::new(); + let mut authored = 0usize; + let mut effective = Vec::new(); + while let Some(packet) = reader.read_ts_packet().unwrap() { + match packet.payload { + Some(TsPayload::Pmt(pmt)) => { + if video_pid.is_none() { + video_pid = pmt + .es_info + .iter() + .find(|e| e.stream_type == StreamType::H264) + .map(|e| e.elementary_pid); + } + } + Some(TsPayload::PesStart(pes)) if Some(packet.header.pid) == video_pid => { + let p = pes.header.pts.expect("video PES carried no PTS").as_u64(); + let d = pes.header.dts.map(|t| t.as_u64()); + if d.is_some() { + authored += 1; + } + effective.push(d.unwrap_or(p)); + pts.push(p); + } + _ => {} + } + } + + assert!(video_pid.is_some(), "missing H.264 video PMT entry"); + assert!(pts.len() > 50, "expected the full feed, got {} frames", pts.len()); + // The source is genuinely reordered: PTS dips in decode order (B-frames). + assert!( + pts.windows(2).any(|w| w[1] < w[0]), + "fixture must carry reordered B-frames" + ); + // The exporter authored a decode timeline (the decode clock trails the PTS). + assert!(authored > 0, "no DTS authored for a B-frame stream"); + // That timeline is strictly increasing and never after presentation. + for (i, win) in effective.windows(2).enumerate() { + assert!(win[1] > win[0], "DTS not strictly increasing at frame {i}: {win:?}"); + } + for (i, (&d, &p)) in effective.iter().zip(pts.iter()).enumerate() { + assert!(d <= p, "DTS {d} after PTS {p} at frame {i}"); + } +} + /// Full SCTE-35 round-trip: import `bbb.ts` (real H.264 + AAC) into a broadcast /// that also carries a `.scte35` cue track, export to TS, re-import, and assert /// the splice_info_section came back byte-for-byte. The PMT must advertise the From e59ec42e42569683d83f7f32b12cd8c0846cc17a Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Sat, 20 Jun 2026 18:54:53 -0700 Subject: [PATCH 2/6] fix(moq-mux): run the DTS clock in continuous ticks; reserve 1s Run the decode clock in the continuous (unwrapped) 90 kHz tick domain that the source timestamps already use, and wrap into the 33-bit wire field only at emission. Previously the clock ran in wrapped 33-bit space, so a 24/7 feed broke at the ~26.5 h wrap: PTS dropped to ~0, the monotonic bump kept DTS high, and DTS > PTS. The clock now never wraps mid-stream, so DTS <= PTS holds across the boundary; PTS and DTS still wrap together on the wire. Also bump DTS_RESERVE from 0.25 s to 1 s so the reserve covers any broadcast B-frame structure. It is decode lead time, not presentation latency. Adds a wrap-boundary unit test. Co-Authored-By: Claude Opus 4.8 --- rs/moq-mux/src/container/ts/export.rs | 67 ++++++++++++++++++++------- 1 file changed, 50 insertions(+), 17 deletions(-) diff --git a/rs/moq-mux/src/container/ts/export.rs b/rs/moq-mux/src/container/ts/export.rs index 361c8045b..648ed7997 100644 --- a/rs/moq-mux/src/container/ts/export.rs +++ b/rs/moq-mux/src/container/ts/export.rs @@ -120,8 +120,9 @@ struct PesUnit { is_video: bool, keyframe: bool, timestamp: crate::container::Timestamp, - /// Authored decode timestamp (90 kHz ticks) for a reordered (B-frame) video frame. - /// `Some` only when it differs from the PTS; the PES then carries both PTS and DTS. + /// Authored decode timestamp for a reordered (B-frame) video frame, in continuous + /// (unwrapped) 90 kHz ticks (wrapped to the wire field in `write_pes`). `Some` only when + /// it differs from the PTS; the PES then carries both PTS and DTS. dts: Option, /// Explicit PES stream_id (verbatim PES); `None` derives it from `is_video`. stream_id: Option, @@ -602,7 +603,7 @@ impl Export { // Author a monotonic decode timeline for reordered video (B-frames). Other kinds // never reorder, so DTS == PTS and the PES stays PTS-only. let dts = if is_video { - let pts = to_ts_timestamp(frame.timestamp)?.as_u64(); + let pts = to_ticks(frame.timestamp); author_dts(pts, &mut self.tracks.get_mut(name).context("missing track")?.last_dts) } else { None @@ -654,10 +655,11 @@ impl Export { /// Packetize a PES payload into 188-byte TS packets. fn write_pes(&mut self, out: &mut Vec, unit: &PesUnit, payload: &[u8]) -> anyhow::Result<()> { let pts = to_ts_timestamp(unit.timestamp)?; - // A reordered video frame carries DTS (90 kHz ticks) alongside PTS; else PTS-only. + // A reordered video frame carries DTS alongside PTS; else PTS-only. The decode clock + // is continuous ticks, so wrap into the 33-bit wire field here, like the PTS. let dts = unit .dts - .map(|t| TsTimestamp::new(t).map_err(anyhow::Error::msg)) + .map(|t| TsTimestamp::new(t & TS_TIMESTAMP_MASK).map_err(anyhow::Error::msg)) .transpose()?; let stream_id = match unit.stream_id { Some(id) => StreamId::new(id), @@ -811,12 +813,12 @@ impl Export { const PES_OPTIONAL_LEN: usize = 3 + 5; /// Extra bytes when the optional region also carries a DTS (5 DTS bytes). const PES_DTS_LEN: usize = 5; -/// Decode-clock reserve (90 kHz ticks, 0.25 s): how far before its PTS a video frame is +/// Decode-clock reserve (90 kHz ticks, 1 s): how far before its PTS a video frame is /// scheduled to decode, leaving room for reordered (B-)frames to slot under their PTS. See /// [`author_dts`]. This is decode lead time, not presentation latency (frames still show at /// their PTS), so it is nearly free; it only needs to exceed the largest reorder span -/// (max consecutive B-frames x frame duration), which it does for broadcast contribution. -const DTS_RESERVE: u64 = 90_000 / 4; +/// (max consecutive B-frames x frame duration), and 1 s covers any broadcast structure. +const DTS_RESERVE: u64 = 90_000; fn psi_interval() -> crate::container::Timestamp { crate::container::Timestamp::try_from(PSI_INTERVAL).unwrap_or(crate::container::Timestamp::ZERO) @@ -828,11 +830,19 @@ fn adaptation_size(af: &AdaptationField) -> usize { 2 + if af.pcr.is_some() { 6 } else { 0 } } +/// The 33-bit wire timestamp field (90 kHz). DTS and PTS both wrap into it. +const TS_TIMESTAMP_MASK: u64 = (1 << 33) - 1; + +/// Continuous (unwrapped) 90 kHz tick count for a media timestamp. The decode clock runs in +/// this domain so it never wraps mid-stream (the source timestamps are already unwrapped); +/// [`to_ts_timestamp`] masks to the 33-bit wire field only at emission. +fn to_ticks(timestamp: crate::container::Timestamp) -> u64 { + (timestamp.as_micros() * 90_000 / 1_000_000) as u64 +} + fn to_ts_timestamp(timestamp: crate::container::Timestamp) -> anyhow::Result { - // micros -> 90 kHz, wrapped into the 33-bit field. - let micros = timestamp.as_micros(); - let ticks = (micros * 90_000 / 1_000_000) as u64 & ((1 << 33) - 1); - TsTimestamp::new(ticks).map_err(anyhow::Error::msg) + // Continuous 90 kHz ticks, wrapped into the 33-bit field. + TsTimestamp::new(to_ticks(timestamp) & TS_TIMESTAMP_MASK).map_err(anyhow::Error::msg) } fn video_kind(config: &VideoConfig, name: &str) -> anyhow::Result { @@ -922,7 +932,7 @@ fn ensure_raw(container: &Container, kind: &str, name: &str) -> anyhow::Result<( } } -/// Author a monotonic decode timestamp (DTS, 90 kHz) for a video frame. +/// Author a monotonic decode timestamp (DTS) for a video frame, in continuous 90 kHz ticks. /// /// [`Frame`] carries only a presentation timestamp (PTS) and frames reach the muxer in /// decode order (MoQ groups and frames are delivered in decode order), so a B-frame stream @@ -934,8 +944,12 @@ fn ensure_raw(container: &Container, kind: &str, name: &str) -> anyhow::Result<( /// increasing. We run a decode clock [`DTS_RESERVE`] ticks behind the PTS and never let it /// go backwards: a reordered frame whose PTS dips below the clock is nudged one tick past /// the last DTS instead. The reserve gives the reorder room to land under each frame's own -/// PTS (`DTS <= PTS`) for broadcast B-frame spans. `last` is the previous DTS, updated in -/// place. Returns `None` when the DTS equals the PTS, so the PES stays PTS-only. +/// PTS (`DTS <= PTS`) for broadcast B-frame spans. +/// +/// `pts` and `last` are continuous (unwrapped) ticks, so the clock never wraps mid-stream; +/// the 33-bit wire wrap happens once at emission in [`write_pes`]. `last` is the previous +/// DTS, updated in place. Returns `None` when the DTS equals the PTS, so the PES stays +/// PTS-only. fn author_dts(pts: u64, last: &mut Option) -> Option { let mut dts = pts.saturating_sub(DTS_RESERVE); if let Some(prev) = *last @@ -943,8 +957,6 @@ fn author_dts(pts: u64, last: &mut Option) -> Option { { dts = prev + 1; } - // Wrap into the 33-bit field, matching the PTS in `to_ts_timestamp`. - dts &= (1 << 33) - 1; *last = Some(dts); (dts != pts).then_some(dts) } @@ -997,6 +1009,27 @@ mod tests { } } + #[test] + fn dts_clock_survives_33bit_wrap() { + // The decode clock runs in continuous ticks, so it must stay monotonic and under PTS + // even as the source timeline crosses the 33-bit wire boundary (~26.5 h). The wrap is + // applied only at emission, so here the authored DTS keeps climbing past 1 << 33. + let wrap = 1u64 << 33; + let pts = decode_order(40, 3, 3_600, wrap - 20 * 3_600); + let dts = run_clock(&pts); + + assert!(pts.iter().any(|&p| p >= wrap), "test must cross the wrap boundary"); + for (i, win) in dts.windows(2).enumerate() { + assert!( + win[1] > win[0], + "DTS not strictly increasing across wrap at {i}: {win:?}" + ); + } + for (i, (&d, &p)) in dts.iter().zip(pts.iter()).enumerate() { + assert!(d <= p, "DTS {d} after PTS {p} across wrap at {i}"); + } + } + #[test] fn dts_without_reorder_trails_pts_by_the_reserve() { // A monotonic (no-B) stream: every frame decodes one reserve ahead of its PTS, which From 41fad1843ba5b4f40b16c78d61376a677fceede3 Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Sat, 20 Jun 2026 19:19:13 -0700 Subject: [PATCH 3/6] fix(moq-mux): size the DTS reserve in frames, not wall-clock Replace the fixed 1s DTS_RESERVE with a frame-based reserve so the advertised decode buffer scales with the frame rate instead of a flat wall-clock slab. DtsClock learns the frame duration from the smallest PTS gap (adjacent frames are one frame apart) and runs the decode clock RESERVE_FRAMES frame-durations behind the PTS. The reserve must exceed the stream's reorder depth, which open-GOP / B-pyramid structures push past the raw consecutive-B count: the kyrion 1080i contribution feed reorders 5 frames deep with only 3 consecutive B-frames. RESERVE_FRAMES = 8 covers that plus typical pyramids; a deeper stream still gets a monotonic DTS (the +igndts fix), just without a guaranteed DTS <= PTS. The reserve is decode lead time, not presentation latency (frames still show at their PTS). Co-Authored-By: Claude Opus 4.8 --- rs/moq-mux/src/container/ts/export.rs | 123 +++++++++++++++++--------- 1 file changed, 82 insertions(+), 41 deletions(-) diff --git a/rs/moq-mux/src/container/ts/export.rs b/rs/moq-mux/src/container/ts/export.rs index 648ed7997..5e592d8e0 100644 --- a/rs/moq-mux/src/container/ts/export.rs +++ b/rs/moq-mux/src/container/ts/export.rs @@ -74,9 +74,9 @@ struct Track { /// PMT ES-level descriptors to re-announce, captured verbatim on import (language, /// registration, ...). Empty for non-TS sources; AC-3/E-AC-3 then synthesize one. descriptors: Vec, - /// Last decode timestamp (90 kHz) authored for this track, so the decode clock stays - /// monotonic across reordered (B-frame) video. Only video uses it. - last_dts: Option, + /// Decode-clock state for reordered (B-frame) video, authoring a monotonic DTS. Only + /// video drives it; audio and verbatim streams leave it untouched. + dts_clock: DtsClock, } #[derive(Clone)] @@ -412,7 +412,7 @@ impl Export { pid, kind, descriptors, - last_dts: None, + dts_clock: DtsClock::default(), }, ); } @@ -604,7 +604,11 @@ impl Export { // never reorder, so DTS == PTS and the PES stays PTS-only. let dts = if is_video { let pts = to_ticks(frame.timestamp); - author_dts(pts, &mut self.tracks.get_mut(name).context("missing track")?.last_dts) + self.tracks + .get_mut(name) + .context("missing track")? + .dts_clock + .author(pts) } else { None }; @@ -813,12 +817,15 @@ impl Export { const PES_OPTIONAL_LEN: usize = 3 + 5; /// Extra bytes when the optional region also carries a DTS (5 DTS bytes). const PES_DTS_LEN: usize = 5; -/// Decode-clock reserve (90 kHz ticks, 1 s): how far before its PTS a video frame is -/// scheduled to decode, leaving room for reordered (B-)frames to slot under their PTS. See -/// [`author_dts`]. This is decode lead time, not presentation latency (frames still show at -/// their PTS), so it is nearly free; it only needs to exceed the largest reorder span -/// (max consecutive B-frames x frame duration), and 1 s covers any broadcast structure. -const DTS_RESERVE: u64 = 90_000; +/// Decode-clock lead, in frames: how far ahead of its PTS each video frame is scheduled to +/// decode. It must exceed the stream's reorder depth (how many frames a reference is decoded +/// ahead of the B-frames that depend on it), which open-GOP / B-pyramid structures push past +/// the raw consecutive-B count: a real 1080i contribution feed (kyrion) reorders 5 frames +/// deep with only 3 consecutive B-frames. 8 covers that plus typical pyramids; a deeper +/// stream still gets a monotonic DTS (the `+igndts` fix), just not guaranteed `DTS <= PTS`. +/// Sizing the reserve in frames (via the measured frame duration) keeps the advertised decode +/// buffer to a few frames instead of a fixed wall-clock slab. See [`DtsClock`]. +const RESERVE_FRAMES: u64 = 8; fn psi_interval() -> crate::container::Timestamp { crate::container::Timestamp::try_from(PSI_INTERVAL).unwrap_or(crate::container::Timestamp::ZERO) @@ -932,7 +939,7 @@ fn ensure_raw(container: &Container, kind: &str, name: &str) -> anyhow::Result<( } } -/// Author a monotonic decode timestamp (DTS) for a video frame, in continuous 90 kHz ticks. +/// Authors a monotonic decode timestamp (DTS) for a reordered (B-frame) video track. /// /// [`Frame`] carries only a presentation timestamp (PTS) and frames reach the muxer in /// decode order (MoQ groups and frames are delivered in decode order), so a B-frame stream @@ -941,35 +948,58 @@ fn ensure_raw(container: &Container, kind: &str, name: &str) -> anyhow::Result<( /// `ffplay -fflags +igndts` workaround). /// /// Since decode order is already the delivery order, the only job is to keep DTS strictly -/// increasing. We run a decode clock [`DTS_RESERVE`] ticks behind the PTS and never let it -/// go backwards: a reordered frame whose PTS dips below the clock is nudged one tick past -/// the last DTS instead. The reserve gives the reorder room to land under each frame's own -/// PTS (`DTS <= PTS`) for broadcast B-frame spans. +/// increasing. The clock runs [`RESERVE_FRAMES`] frame-durations behind the PTS and never +/// goes backwards: a reordered frame whose PTS dips below the clock is nudged one tick past +/// the last DTS. The reserve gives the reorder room to land under each frame's own PTS +/// (`DTS <= PTS`); sizing it in frames (from the measured duration) keeps the advertised +/// decode buffer to a few frames rather than a fixed wall-clock slab. /// -/// `pts` and `last` are continuous (unwrapped) ticks, so the clock never wraps mid-stream; -/// the 33-bit wire wrap happens once at emission in [`write_pes`]. `last` is the previous -/// DTS, updated in place. Returns `None` when the DTS equals the PTS, so the PES stays -/// PTS-only. -fn author_dts(pts: u64, last: &mut Option) -> Option { - let mut dts = pts.saturating_sub(DTS_RESERVE); - if let Some(prev) = *last - && dts <= prev - { - dts = prev + 1; +/// All values are continuous (unwrapped) 90 kHz ticks, so the clock never wraps mid-stream; +/// the 33-bit wire wrap happens once at emission in [`write_pes`]. +#[derive(Default)] +struct DtsClock { + /// Previous PTS, to measure the frame duration from successive gaps. + last_pts: Option, + /// Smallest positive PTS gap seen = one frame duration (adjacent frames differ by one). + frame_dur: Option, + /// Previous authored DTS, keeping the clock strictly increasing. + last_dts: Option, +} + +impl DtsClock { + /// Author the DTS for the next frame (PTS in continuous ticks, decode order). Returns + /// `None` when the DTS equals the PTS, so the PES stays PTS-only. + fn author(&mut self, pts: u64) -> Option { + // Learn the frame duration from the smallest positive gap between successive PTS. + if let Some(prev) = self.last_pts { + let gap = pts.abs_diff(prev); + if gap > 0 { + self.frame_dur = Some(self.frame_dur.map_or(gap, |dur| dur.min(gap))); + } + } + self.last_pts = Some(pts); + + let reserve = self.frame_dur.map_or(0, |dur| RESERVE_FRAMES * dur); + let mut dts = pts.saturating_sub(reserve); + if let Some(prev) = self.last_dts + && dts <= prev + { + dts = prev + 1; + } + self.last_dts = Some(dts); + (dts != pts).then_some(dts) } - *last = Some(dts); - (dts != pts).then_some(dts) } #[cfg(test)] mod tests { - use super::{DTS_RESERVE, author_dts, is_complete_section}; + use super::{DtsClock, RESERVE_FRAMES, is_complete_section}; /// Push a decode-order PTS stream (90 kHz) through the decode clock and return the /// effective DTS per frame (the authored DTS, or the PTS when none is authored). fn run_clock(pts: &[u64]) -> Vec { - let mut last = None; - pts.iter().map(|&p| author_dts(p, &mut last).unwrap_or(p)).collect() + let mut clock = DtsClock::default(); + pts.iter().map(|&p| clock.author(p).unwrap_or(p)).collect() } /// Decode-order PTS for a constant-frame-rate display timeline with `b` B-frames between @@ -992,8 +1022,9 @@ mod tests { #[test] fn dts_is_monotonic_and_under_pts() { - // 25 fps, 10 s offset; cover single B-frames and a 3-deep run (kyrion-like). - for b in [1, 3] { + // 25 fps, 10 s offset; cover up to RESERVE_FRAMES - 1 consecutive B-frames (the most + // the reserve is sized for), which spans broadcast contribution (kyrion is 3-deep). + for b in 1..RESERVE_FRAMES as usize { let pts = decode_order(40, b, 3_600, 10_000_000); let dts = run_clock(&pts); @@ -1031,15 +1062,25 @@ mod tests { } #[test] - fn dts_without_reorder_trails_pts_by_the_reserve() { - // A monotonic (no-B) stream: every frame decodes one reserve ahead of its PTS, which - // is decode lead time, not added latency (presentation is still at the PTS). - let pts: Vec = (0..40).map(|i| 10_000_000 + i * 3_600).collect(); - let mut last = None; - for &p in &pts { - let dts = author_dts(p, &mut last).expect("reserve makes DTS differ from PTS"); - assert_eq!(dts, p - DTS_RESERVE, "DTS should trail PTS by exactly the reserve"); + fn dts_without_reorder_settles_to_frame_based_reserve() { + // A monotonic (no-B) stream stays strictly increasing and under PTS, and once the + // frame duration is learned the clock settles to trailing the PTS by RESERVE_FRAMES + // frames (decode lead time, not added latency: presentation is still at the PTS). + let dur = 3_600u64; + let pts: Vec = (0..40).map(|i| 10_000_000 + i * dur).collect(); + let dts = run_clock(&pts); + + for (i, win) in dts.windows(2).enumerate() { + assert!(win[1] > win[0], "DTS not strictly increasing at {i}: {win:?}"); } + for (i, (&d, &p)) in dts.iter().zip(pts.iter()).enumerate() { + assert!(d <= p, "DTS {d} after PTS {p} at {i}"); + } + assert_eq!( + pts.last().unwrap() - dts.last().unwrap(), + RESERVE_FRAMES * dur, + "settled decode lead should be RESERVE_FRAMES frames" + ); } #[test] From 0103929b64d4508a22ea670984d2344b0aec5983 Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Sat, 20 Jun 2026 19:25:02 -0700 Subject: [PATCH 4/6] fix(moq-mux): bump DTS reserve to 16 frames for deep-pyramid headroom Co-Authored-By: Claude Opus 4.8 --- rs/moq-mux/src/container/ts/export.rs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/rs/moq-mux/src/container/ts/export.rs b/rs/moq-mux/src/container/ts/export.rs index 5e592d8e0..9db08753f 100644 --- a/rs/moq-mux/src/container/ts/export.rs +++ b/rs/moq-mux/src/container/ts/export.rs @@ -821,11 +821,12 @@ const PES_DTS_LEN: usize = 5; /// decode. It must exceed the stream's reorder depth (how many frames a reference is decoded /// ahead of the B-frames that depend on it), which open-GOP / B-pyramid structures push past /// the raw consecutive-B count: a real 1080i contribution feed (kyrion) reorders 5 frames -/// deep with only 3 consecutive B-frames. 8 covers that plus typical pyramids; a deeper -/// stream still gets a monotonic DTS (the `+igndts` fix), just not guaranteed `DTS <= PTS`. -/// Sizing the reserve in frames (via the measured frame duration) keeps the advertised decode -/// buffer to a few frames instead of a fixed wall-clock slab. See [`DtsClock`]. -const RESERVE_FRAMES: u64 = 8; +/// deep with only 3 consecutive B-frames. 16 covers that with wide headroom for deep pyramids; +/// a deeper stream still gets a monotonic DTS (the `+igndts` fix), just not guaranteed +/// `DTS <= PTS`. This is decode lead time, not presentation latency (frames still show at their +/// PTS), and sizing it in frames (via the measured frame duration) keeps the advertised decode +/// buffer proportional to the frame rate rather than a fixed wall-clock slab. See [`DtsClock`]. +const RESERVE_FRAMES: u64 = 16; fn psi_interval() -> crate::container::Timestamp { crate::container::Timestamp::try_from(PSI_INTERVAL).unwrap_or(crate::container::Timestamp::ZERO) From b0bc241477742e69cd3772372657692827a237e0 Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Sat, 20 Jun 2026 19:34:13 -0700 Subject: [PATCH 5/6] fix(moq-mux): use a fixed 16-tick DTS reserve (monotonic, near-zero lead) Revert the frame-scaled reserve to a fixed 16-tick (90 kHz) DTS_RESERVE: just a strict-monotonic nudge so the decode timeline stays strictly increasing (the +igndts fix) with negligible decode lead. Drops the frame-duration tracking. This trades the DTS <= PTS guarantee: at 16 ticks the reserve cannot cover the reorder span, so a B-frame's DTS sits above its own PTS. Monotonic DTS is what players need to schedule decoding; exact DTS <= PTS is the faithful wire-DTS follow-up. Tests assert monotonicity accordingly. Co-Authored-By: Claude Opus 4.8 --- rs/moq-mux/src/container/ts/export.rs | 135 +++++++-------------- rs/moq-mux/src/container/ts/export_test.rs | 10 +- 2 files changed, 50 insertions(+), 95 deletions(-) diff --git a/rs/moq-mux/src/container/ts/export.rs b/rs/moq-mux/src/container/ts/export.rs index 9db08753f..73e281b4d 100644 --- a/rs/moq-mux/src/container/ts/export.rs +++ b/rs/moq-mux/src/container/ts/export.rs @@ -74,9 +74,9 @@ struct Track { /// PMT ES-level descriptors to re-announce, captured verbatim on import (language, /// registration, ...). Empty for non-TS sources; AC-3/E-AC-3 then synthesize one. descriptors: Vec, - /// Decode-clock state for reordered (B-frame) video, authoring a monotonic DTS. Only - /// video drives it; audio and verbatim streams leave it untouched. - dts_clock: DtsClock, + /// Last decode timestamp (continuous 90 kHz ticks) authored for this track, keeping the + /// decode clock monotonic across reordered (B-frame) video. Only video uses it. + last_dts: Option, } #[derive(Clone)] @@ -412,7 +412,7 @@ impl Export { pid, kind, descriptors, - dts_clock: DtsClock::default(), + last_dts: None, }, ); } @@ -604,11 +604,7 @@ impl Export { // never reorder, so DTS == PTS and the PES stays PTS-only. let dts = if is_video { let pts = to_ticks(frame.timestamp); - self.tracks - .get_mut(name) - .context("missing track")? - .dts_clock - .author(pts) + author_dts(pts, &mut self.tracks.get_mut(name).context("missing track")?.last_dts) } else { None }; @@ -817,16 +813,14 @@ impl Export { const PES_OPTIONAL_LEN: usize = 3 + 5; /// Extra bytes when the optional region also carries a DTS (5 DTS bytes). const PES_DTS_LEN: usize = 5; -/// Decode-clock lead, in frames: how far ahead of its PTS each video frame is scheduled to -/// decode. It must exceed the stream's reorder depth (how many frames a reference is decoded -/// ahead of the B-frames that depend on it), which open-GOP / B-pyramid structures push past -/// the raw consecutive-B count: a real 1080i contribution feed (kyrion) reorders 5 frames -/// deep with only 3 consecutive B-frames. 16 covers that with wide headroom for deep pyramids; -/// a deeper stream still gets a monotonic DTS (the `+igndts` fix), just not guaranteed -/// `DTS <= PTS`. This is decode lead time, not presentation latency (frames still show at their -/// PTS), and sizing it in frames (via the measured frame duration) keeps the advertised decode -/// buffer proportional to the frame rate rather than a fixed wall-clock slab. See [`DtsClock`]. -const RESERVE_FRAMES: u64 = 16; +/// Decode-clock reserve in 90 kHz ticks: each video frame is scheduled to decode this far +/// before its PTS. At 16 ticks (~0.18 ms) this is effectively just a strict-monotonic nudge: +/// it keeps DTS strictly increasing across reordered (B-frame) decode order, which is what +/// players need to schedule decoding (the `ffplay -fflags +igndts` fix). It does NOT keep +/// `DTS <= PTS` for B-frames, since covering the reorder span would need a frame-scale reserve +/// (and the matching decode lead); exact `DTS <= PTS` is the faithful wire-DTS follow-up. +/// See [`author_dts`]. +const DTS_RESERVE: u64 = 16; fn psi_interval() -> crate::container::Timestamp { crate::container::Timestamp::try_from(PSI_INTERVAL).unwrap_or(crate::container::Timestamp::ZERO) @@ -940,7 +934,7 @@ fn ensure_raw(container: &Container, kind: &str, name: &str) -> anyhow::Result<( } } -/// Authors a monotonic decode timestamp (DTS) for a reordered (B-frame) video track. +/// Author a monotonic decode timestamp (DTS) for a reordered (B-frame) video frame. /// /// [`Frame`] carries only a presentation timestamp (PTS) and frames reach the muxer in /// decode order (MoQ groups and frames are delivered in decode order), so a B-frame stream @@ -949,58 +943,35 @@ fn ensure_raw(container: &Container, kind: &str, name: &str) -> anyhow::Result<( /// `ffplay -fflags +igndts` workaround). /// /// Since decode order is already the delivery order, the only job is to keep DTS strictly -/// increasing. The clock runs [`RESERVE_FRAMES`] frame-durations behind the PTS and never -/// goes backwards: a reordered frame whose PTS dips below the clock is nudged one tick past -/// the last DTS. The reserve gives the reorder room to land under each frame's own PTS -/// (`DTS <= PTS`); sizing it in frames (from the measured duration) keeps the advertised -/// decode buffer to a few frames rather than a fixed wall-clock slab. +/// increasing. The clock runs [`DTS_RESERVE`] ticks behind the PTS and never goes backwards: +/// a reordered frame whose PTS dips below the clock is nudged one tick past the last DTS. With +/// the small reserve this keeps DTS monotonic but lets it sit above a B-frame's own PTS; a +/// frame-scale reserve (or the faithful wire DTS) would be needed for `DTS <= PTS`. /// -/// All values are continuous (unwrapped) 90 kHz ticks, so the clock never wraps mid-stream; -/// the 33-bit wire wrap happens once at emission in [`write_pes`]. -#[derive(Default)] -struct DtsClock { - /// Previous PTS, to measure the frame duration from successive gaps. - last_pts: Option, - /// Smallest positive PTS gap seen = one frame duration (adjacent frames differ by one). - frame_dur: Option, - /// Previous authored DTS, keeping the clock strictly increasing. - last_dts: Option, -} - -impl DtsClock { - /// Author the DTS for the next frame (PTS in continuous ticks, decode order). Returns - /// `None` when the DTS equals the PTS, so the PES stays PTS-only. - fn author(&mut self, pts: u64) -> Option { - // Learn the frame duration from the smallest positive gap between successive PTS. - if let Some(prev) = self.last_pts { - let gap = pts.abs_diff(prev); - if gap > 0 { - self.frame_dur = Some(self.frame_dur.map_or(gap, |dur| dur.min(gap))); - } - } - self.last_pts = Some(pts); - - let reserve = self.frame_dur.map_or(0, |dur| RESERVE_FRAMES * dur); - let mut dts = pts.saturating_sub(reserve); - if let Some(prev) = self.last_dts - && dts <= prev - { - dts = prev + 1; - } - self.last_dts = Some(dts); - (dts != pts).then_some(dts) +/// `pts` and `last` are continuous (unwrapped) 90 kHz ticks, so the clock never wraps +/// mid-stream; the 33-bit wire wrap happens once at emission in [`write_pes`]. `last` is the +/// previous DTS, updated in place. Returns `None` when the DTS equals the PTS (PES stays +/// PTS-only). +fn author_dts(pts: u64, last: &mut Option) -> Option { + let mut dts = pts.saturating_sub(DTS_RESERVE); + if let Some(prev) = *last + && dts <= prev + { + dts = prev + 1; } + *last = Some(dts); + (dts != pts).then_some(dts) } #[cfg(test)] mod tests { - use super::{DtsClock, RESERVE_FRAMES, is_complete_section}; + use super::{DTS_RESERVE, author_dts, is_complete_section}; /// Push a decode-order PTS stream (90 kHz) through the decode clock and return the /// effective DTS per frame (the authored DTS, or the PTS when none is authored). fn run_clock(pts: &[u64]) -> Vec { - let mut clock = DtsClock::default(); - pts.iter().map(|&p| clock.author(p).unwrap_or(p)).collect() + let mut last = None; + pts.iter().map(|&p| author_dts(p, &mut last).unwrap_or(p)).collect() } /// Decode-order PTS for a constant-frame-rate display timeline with `b` B-frames between @@ -1022,30 +993,27 @@ mod tests { } #[test] - fn dts_is_monotonic_and_under_pts() { - // 25 fps, 10 s offset; cover up to RESERVE_FRAMES - 1 consecutive B-frames (the most - // the reserve is sized for), which spans broadcast contribution (kyrion is 3-deep). - for b in 1..RESERVE_FRAMES as usize { + fn dts_is_monotonic_across_reorder() { + // 25 fps, 10 s offset; a few B-frame depths. The small reserve guarantees a strictly + // increasing decode timeline (the `+igndts` fix); it does not keep DTS <= PTS. + for b in [1, 3, 5] { let pts = decode_order(40, b, 3_600, 10_000_000); let dts = run_clock(&pts); // The fixture genuinely reorders (PTS dips in decode order). assert!(pts.windows(2).any(|w| w[1] < w[0]), "b={b}: stream must reorder PTS"); - // Decode timeline is strictly increasing and never after presentation. + // The decode timeline is strictly increasing. for (i, win) in dts.windows(2).enumerate() { assert!(win[1] > win[0], "b={b}: DTS not strictly increasing at {i}: {win:?}"); } - for (i, (&d, &p)) in dts.iter().zip(pts.iter()).enumerate() { - assert!(d <= p, "b={b}: DTS {d} after PTS {p} at {i}"); - } } } #[test] fn dts_clock_survives_33bit_wrap() { - // The decode clock runs in continuous ticks, so it must stay monotonic and under PTS - // even as the source timeline crosses the 33-bit wire boundary (~26.5 h). The wrap is - // applied only at emission, so here the authored DTS keeps climbing past 1 << 33. + // The decode clock runs in continuous ticks, so it stays strictly increasing even as + // the source timeline crosses the 33-bit wire boundary (~26.5 h). The wrap is applied + // only at emission, so here the authored DTS keeps climbing past 1 << 33. let wrap = 1u64 << 33; let pts = decode_order(40, 3, 3_600, wrap - 20 * 3_600); let dts = run_clock(&pts); @@ -1057,31 +1025,22 @@ mod tests { "DTS not strictly increasing across wrap at {i}: {win:?}" ); } - for (i, (&d, &p)) in dts.iter().zip(pts.iter()).enumerate() { - assert!(d <= p, "DTS {d} after PTS {p} across wrap at {i}"); - } } #[test] - fn dts_without_reorder_settles_to_frame_based_reserve() { - // A monotonic (no-B) stream stays strictly increasing and under PTS, and once the - // frame duration is learned the clock settles to trailing the PTS by RESERVE_FRAMES - // frames (decode lead time, not added latency: presentation is still at the PTS). - let dur = 3_600u64; - let pts: Vec = (0..40).map(|i| 10_000_000 + i * dur).collect(); + fn dts_without_reorder_trails_pts_by_the_reserve() { + // A monotonic (no-B) stream stays strictly increasing and one reserve under its PTS, + // which at 16 ticks is a negligible decode nudge (and not added latency: presentation + // is still at the PTS). + let pts: Vec = (0..40).map(|i| 10_000_000 + i * 3_600).collect(); let dts = run_clock(&pts); for (i, win) in dts.windows(2).enumerate() { assert!(win[1] > win[0], "DTS not strictly increasing at {i}: {win:?}"); } for (i, (&d, &p)) in dts.iter().zip(pts.iter()).enumerate() { - assert!(d <= p, "DTS {d} after PTS {p} at {i}"); + assert_eq!(d, p - DTS_RESERVE, "DTS should trail PTS by exactly the reserve at {i}"); } - assert_eq!( - pts.last().unwrap() - dts.last().unwrap(), - RESERVE_FRAMES * dur, - "settled decode lead should be RESERVE_FRAMES frames" - ); } #[test] diff --git a/rs/moq-mux/src/container/ts/export_test.rs b/rs/moq-mux/src/container/ts/export_test.rs index bcd07fa62..63f4f087e 100644 --- a/rs/moq-mux/src/container/ts/export_test.rs +++ b/rs/moq-mux/src/container/ts/export_test.rs @@ -337,9 +337,8 @@ async fn export_avc1_out_of_band_reassembles() { /// A real broadcast contribution feed (Ateme Kyrion, H.264 1080i with ~86 B-frames) /// must come out of the exporter with an authored decode timeline. Without it players /// see PTS-only video and choke on the out-of-order decode times (`ffplay +igndts`). -/// Assert the video PES carry a DTS that is strictly increasing in decode order and -/// never after the PTS, and that the reorder was real (some authored DTS, non-monotonic -/// PTS in the source). +/// Assert the video PES carry a strictly increasing DTS in decode order (the fix), and +/// that the reorder was real (some authored DTS, non-monotonic PTS in the source). #[tokio::test(start_paused = true)] async fn export_bframe_video_authors_dts() { let data = include_bytes!("test_data/scte35/kyrion_dirtystart.ts"); @@ -394,13 +393,10 @@ async fn export_bframe_video_authors_dts() { ); // The exporter authored a decode timeline (the decode clock trails the PTS). assert!(authored > 0, "no DTS authored for a B-frame stream"); - // That timeline is strictly increasing and never after presentation. + // That timeline is strictly increasing, which is what removes the `+igndts` requirement. for (i, win) in effective.windows(2).enumerate() { assert!(win[1] > win[0], "DTS not strictly increasing at frame {i}: {win:?}"); } - for (i, (&d, &p)) in effective.iter().zip(pts.iter()).enumerate() { - assert!(d <= p, "DTS {d} after PTS {p} at frame {i}"); - } } /// Full SCTE-35 round-trip: import `bbb.ts` (real H.264 + AAC) into a broadcast From 6f0d63d30c95325f1c046247d84ad382ba61fc6d Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Sun, 21 Jun 2026 13:23:25 -0700 Subject: [PATCH 6/6] feat(moq-mux): carry B-frame reorder depth as catalog jitter (exact DTS reserve) (#1857) Co-authored-by: Claude Opus 4.8 --- rs/moq-mux/src/codec/av1/import.rs | 8 +- rs/moq-mux/src/codec/h264/import.rs | 32 ++++++- rs/moq-mux/src/codec/h265/import.rs | 33 ++++++- rs/moq-mux/src/codec/vp8/import.rs | 8 +- rs/moq-mux/src/codec/vp9/import.rs | 8 +- rs/moq-mux/src/container/jitter.rs | 71 ++++++++++---- rs/moq-mux/src/container/ts/export.rs | 103 ++++++++++++++------- rs/moq-mux/src/container/ts/export_test.rs | 14 ++- rs/moq-mux/src/container/ts/import.rs | 35 ++++++- rs/moq-mux/src/container/ts/import_test.rs | 18 ++++ 10 files changed, 251 insertions(+), 79 deletions(-) diff --git a/rs/moq-mux/src/codec/av1/import.rs b/rs/moq-mux/src/codec/av1/import.rs index 0df8ef832..8ea9f9fbb 100644 --- a/rs/moq-mux/src/codec/av1/import.rs +++ b/rs/moq-mux/src/codec/av1/import.rs @@ -1,4 +1,4 @@ -use crate::container::jitter::MinFrameDuration; +use crate::container::jitter::Jitter; use anyhow::Context; use bytes::BytesMut; @@ -26,7 +26,7 @@ pub struct Import { zero: Option, // Tracks the minimum frame duration and updates the catalog `jitter` field. - jitter: MinFrameDuration, + jitter: Jitter, } #[derive(Default)] @@ -45,7 +45,7 @@ impl Import { config: None, current: Default::default(), zero: None, - jitter: MinFrameDuration::new(), + jitter: Jitter::new(), } } @@ -57,7 +57,7 @@ impl Import { config: None, current: Default::default(), zero: None, - jitter: MinFrameDuration::new(), + jitter: Jitter::new(), } } diff --git a/rs/moq-mux/src/codec/h264/import.rs b/rs/moq-mux/src/codec/h264/import.rs index ffffcfe57..42a39c167 100644 --- a/rs/moq-mux/src/codec/h264/import.rs +++ b/rs/moq-mux/src/codec/h264/import.rs @@ -14,7 +14,7 @@ use tokio::io::{AsyncRead, AsyncReadExt}; use super::Sps; use crate::catalog::hang::CatalogExt; use crate::codec::annexb::{NalIterator, START_CODE}; -use crate::container::jitter::MinFrameDuration; +use crate::container::jitter::Jitter; /// The wire shape an [`Import`] is processing. #[derive(Clone, Copy, Debug, Eq, PartialEq)] @@ -37,7 +37,7 @@ pub struct Import { config: Option, state: State, zero: Option, - jitter: MinFrameDuration, + jitter: Jitter, } enum State { @@ -78,7 +78,7 @@ impl Import { config: None, state: State::Pending { mode_hint: None }, zero: None, - jitter: MinFrameDuration::new(), + jitter: Jitter::new(), } } @@ -90,7 +90,7 @@ impl Import { config: None, state: State::Pending { mode_hint: None }, zero: None, - jitter: MinFrameDuration::new(), + jitter: Jitter::new(), } } @@ -258,6 +258,22 @@ impl Import { } } + /// Record a frame's reorder delay (`PTS - DTS`) so the catalog `jitter` reflects the + /// B-frame reorder depth (the decode buffer a transmuxer/player must hold). The container + /// supplies this since the elementary stream alone carries no decode time. No-op until the + /// track exists. + pub fn observe_reorder(&mut self, reorder: crate::container::Timestamp) { + let Some(jitter) = self.jitter.observe_reorder(reorder) else { + return; + }; + let Some(track) = self.track.as_ref() else { + return; + }; + if let Some(c) = self.catalog.lock().video.renditions.get_mut(&track.name) { + c.jitter = Some(jitter); + } + } + fn decode_avc1>( &mut self, buf: &mut T, @@ -407,7 +423,13 @@ impl Import { // The avc3 track was created eagerly in initialize_avc3; just publish // (or republish) the catalog rendition with the latest config. let track_name = self.track.as_ref().context("avc3 track not created")?.name.clone(); - self.catalog.lock().video.renditions.insert(track_name, config.clone()); + // Seed jitter from whatever has accumulated: a dirty start feeds frames before + // this first rendition exists, so those per-frame updates would otherwise be + // lost. Keep the cached `config` jitter-free so a later jitter change is not + // mistaken for a codec reconfiguration. + let mut published = config.clone(); + published.jitter = self.jitter.current(); + self.catalog.lock().video.renditions.insert(track_name, published); self.config = Some(config); Ok(reconfigured) } diff --git a/rs/moq-mux/src/codec/h265/import.rs b/rs/moq-mux/src/codec/h265/import.rs index dac453bdf..981fd8f89 100644 --- a/rs/moq-mux/src/codec/h265/import.rs +++ b/rs/moq-mux/src/codec/h265/import.rs @@ -1,6 +1,6 @@ use crate::catalog::hang::CatalogExt; use crate::codec::annexb::{NalIterator, START_CODE}; -use crate::container::jitter::MinFrameDuration; +use crate::container::jitter::Jitter; use anyhow::Context; use bytes::{Buf, Bytes, BytesMut}; @@ -37,7 +37,7 @@ pub struct Import { pps: Vec, // Tracks the minimum frame duration and updates the catalog `jitter` field. - jitter: MinFrameDuration, + jitter: Jitter, } impl Import { @@ -52,7 +52,7 @@ impl Import { vps: Vec::new(), sps: Vec::new(), pps: Vec::new(), - jitter: MinFrameDuration::new(), + jitter: Jitter::new(), } } @@ -67,7 +67,7 @@ impl Import { vps: Vec::new(), sps: Vec::new(), pps: Vec::new(), - jitter: MinFrameDuration::new(), + jitter: Jitter::new(), } } @@ -101,6 +101,11 @@ impl Import { } let reconfigured = self.config.is_some(); + // Seed jitter from whatever has accumulated: a dirty start feeds frames before this + // first rendition exists, so those per-frame updates would otherwise be lost. The + // cached `config` stays jitter-free so a later jitter change is not mistaken for a + // codec reconfiguration. + let jitter = self.jitter.current(); let mut catalog = self.catalog.lock(); if self.track.is_some() && self.tracks.is_fixed() { @@ -114,7 +119,9 @@ impl Import { let track = self.tracks.create()?; tracing::debug!(name = ?track.name, ?config, "starting track"); - catalog.video.renditions.insert(track.name.clone(), config.clone()); + let mut published = config.clone(); + published.jitter = jitter; + catalog.video.renditions.insert(track.name.clone(), published); self.config = Some(config); self.track = @@ -173,6 +180,22 @@ impl Import { /// This can also be used when EOF is detected to flush the final frame. /// /// NOTE: The next decode will fail if it doesn't begin with a start code. + /// Record a frame's reorder delay (`PTS - DTS`) so the catalog `jitter` reflects the + /// B-frame reorder depth (the decode buffer a transmuxer/player must hold). The container + /// supplies this since the elementary stream alone carries no decode time. No-op until the + /// track exists. + pub fn observe_reorder(&mut self, reorder: crate::container::Timestamp) { + let Some(jitter) = self.jitter.observe_reorder(reorder) else { + return; + }; + let Some(track) = self.track.as_ref() else { + return; + }; + if let Some(c) = self.catalog.lock().video.renditions.get_mut(&track.name) { + c.jitter = Some(jitter); + } + } + pub fn decode_frame>( &mut self, buf: &mut T, diff --git a/rs/moq-mux/src/codec/vp8/import.rs b/rs/moq-mux/src/codec/vp8/import.rs index e1a1bd48f..8d5a94250 100644 --- a/rs/moq-mux/src/codec/vp8/import.rs +++ b/rs/moq-mux/src/codec/vp8/import.rs @@ -1,7 +1,7 @@ use anyhow::Context; use bytes::Buf; -use crate::container::jitter::MinFrameDuration; +use crate::container::jitter::Jitter; use super::FrameHeader; @@ -28,7 +28,7 @@ pub struct Import { zero: Option, // Tracks the minimum frame duration and updates the catalog `jitter` field. - jitter: MinFrameDuration, + jitter: Jitter, } impl Import { @@ -39,7 +39,7 @@ impl Import { track: None, config: None, zero: None, - jitter: MinFrameDuration::new(), + jitter: Jitter::new(), } } @@ -50,7 +50,7 @@ impl Import { track: None, config: None, zero: None, - jitter: MinFrameDuration::new(), + jitter: Jitter::new(), } } diff --git a/rs/moq-mux/src/codec/vp9/import.rs b/rs/moq-mux/src/codec/vp9/import.rs index 9ba89ca66..e7ae4fb0d 100644 --- a/rs/moq-mux/src/codec/vp9/import.rs +++ b/rs/moq-mux/src/codec/vp9/import.rs @@ -1,7 +1,7 @@ use anyhow::Context; use bytes::Buf; -use crate::container::jitter::MinFrameDuration; +use crate::container::jitter::Jitter; use super::FrameHeader; @@ -28,7 +28,7 @@ pub struct Import { zero: Option, // Tracks the minimum frame duration and updates the catalog `jitter` field. - jitter: MinFrameDuration, + jitter: Jitter, } impl Import { @@ -39,7 +39,7 @@ impl Import { track: None, config: None, zero: None, - jitter: MinFrameDuration::new(), + jitter: Jitter::new(), } } @@ -50,7 +50,7 @@ impl Import { track: None, config: None, zero: None, - jitter: MinFrameDuration::new(), + jitter: Jitter::new(), } } diff --git a/rs/moq-mux/src/container/jitter.rs b/rs/moq-mux/src/container/jitter.rs index 4d3e67299..d99c0c732 100644 --- a/rs/moq-mux/src/container/jitter.rs +++ b/rs/moq-mux/src/container/jitter.rs @@ -1,36 +1,71 @@ use crate::container::Timestamp; -/// Tracks the minimum duration between consecutive frames. +/// Tracks the catalog `jitter` for a video/audio track: the maximum delay before a frame can +/// be emitted, so a player sizes its buffer to at least this much. /// -/// This is the value reported as `jitter` in the catalog: a player should -/// buffer at least this much before emitting frames. Despite the name "jitter", -/// what we actually record is the *minimum frame duration* observed so far. +/// It reports whichever is larger of two contributions: +/// - the minimum frame duration (the steady inter-frame spacing), and +/// - the reorder delay (`max(PTS - DTS)`), which is non-zero only for reordered (B-frame) +/// streams and which a transmuxer also reuses as the decode-clock reserve. +/// +/// A non-reordered stream reports the frame duration; a B-frame stream reports the deeper +/// reorder delay (e.g. up to 3 consecutive B-frames is 3x the frame duration). #[derive(Default)] -pub struct MinFrameDuration { +pub struct Jitter { last_timestamp: Option, min_duration: Option, + max_reorder: Timestamp, + /// Last value handed back from [`observe`](Self::observe) / + /// [`observe_reorder`](Self::observe_reorder), so they only report on a change. + reported: Option, } -impl MinFrameDuration { +impl Jitter { pub fn new() -> Self { Self::default() } - /// Record a new frame timestamp. - /// - /// Returns the new minimum-frame-duration as a `moq_net::Time` if it - /// changed, so the caller can persist it on the catalog rendition. Returns - /// `None` when this is the first observation, the timestamps are - /// non-monotonic, or the new gap is no smaller than the recorded minimum. + /// Record a frame's presentation timestamp (decode order), updating the minimum frame + /// duration. Returns the new jitter as a [`moq_net::Time`] if it changed, else `None`. + /// The first observation and non-monotonic timestamps (B-frames) only update state. pub fn observe(&mut self, ts: Timestamp) -> Option { - let last = self.last_timestamp.replace(ts)?; - let duration = ts.checked_sub(last).ok()?; + if let Some(last) = self.last_timestamp.replace(ts) + && let Ok(duration) = ts.checked_sub(last) + && !duration.is_zero() + && duration < self.min_duration.unwrap_or(Timestamp::MAX) + { + self.min_duration = Some(duration); + } + self.report() + } + + /// Record a frame's reorder delay (`PTS - DTS`), updating the maximum. Returns the new + /// jitter as a [`moq_net::Time`] if it changed, else `None`. + pub fn observe_reorder(&mut self, reorder: Timestamp) -> Option { + self.max_reorder = self.max_reorder.max(reorder); + self.report() + } - if duration >= self.min_duration.unwrap_or(Timestamp::MAX) { + /// The current jitter (the larger of the frame duration and the reorder delay), without + /// the change-detection of [`observe`](Self::observe). Used to seed a freshly created + /// catalog rendition with whatever has accumulated, since per-frame updates before the + /// rendition exists would otherwise be lost. + pub fn current(&self) -> Option { + let jitter = self.combined(); + (!jitter.is_zero()).then(|| jitter.convert().ok()).flatten() + } + + fn combined(&self) -> Timestamp { + self.min_duration.unwrap_or(Timestamp::ZERO).max(self.max_reorder) + } + + /// Report the current jitter only when it changes. + fn report(&mut self) -> Option { + let jitter = self.combined(); + if jitter.is_zero() || self.reported == Some(jitter) { return None; } - - self.min_duration = Some(duration); - duration.convert().ok() + self.reported = Some(jitter); + jitter.convert().ok() } } diff --git a/rs/moq-mux/src/container/ts/export.rs b/rs/moq-mux/src/container/ts/export.rs index 73e281b4d..47c26e674 100644 --- a/rs/moq-mux/src/container/ts/export.rs +++ b/rs/moq-mux/src/container/ts/export.rs @@ -77,6 +77,10 @@ struct Track { /// Last decode timestamp (continuous 90 kHz ticks) authored for this track, keeping the /// decode clock monotonic across reordered (B-frame) video. Only video uses it. last_dts: Option, + /// Decode-clock reserve (90 kHz ticks): how far ahead of its PTS each frame decodes. Taken + /// from the catalog `jitter` (the reorder depth) so it is large enough for `DTS <= PTS`, + /// or [`DEFAULT_DTS_RESERVE`] when the catalog declares none. Only video uses it. + dts_reserve: u64, } #[derive(Clone)] @@ -337,16 +341,20 @@ impl Export { let kind = video_kind(config, name)?; let descriptors = track_descriptors(&mpegts, name); let pid = pids[name]; + // The catalog `jitter` carries the reorder depth (max PTS - DTS), so use it as the + // decode-clock reserve; it may arrive in a later snapshot, so refresh it each time. + let reserve = dts_reserve(config); match old.remove(name) { Some(mut track) => { track.pid = pid; track.kind = kind; track.descriptors = descriptors; + track.dts_reserve = reserve; self.tracks.insert(name.clone(), track); } None => { let source = ExportSource::for_video(&self.broadcast, name, config, self.latency)?; - self.insert_track(name, source, pid, kind, descriptors); + self.insert_track(name, source, pid, kind, descriptors, reserve); } } } @@ -363,7 +371,7 @@ impl Export { } None => { let source = ExportSource::for_audio(&self.broadcast, name, config, self.latency)?; - self.insert_track(name, source, pid, kind, descriptors); + self.insert_track(name, source, pid, kind, descriptors, DEFAULT_DTS_RESERVE); } } } @@ -387,7 +395,7 @@ impl Export { } None => { let source = ExportSource::for_stream(&self.broadcast, name, self.latency)?; - self.insert_track(name, source, pid, kind, descriptors); + self.insert_track(name, source, pid, kind, descriptors, DEFAULT_DTS_RESERVE); } } } @@ -402,6 +410,7 @@ impl Export { pid: u16, kind: Kind, descriptors: Vec, + dts_reserve: u64, ) { self.tracks.insert( name.to_string(), @@ -413,6 +422,7 @@ impl Export { kind, descriptors, last_dts: None, + dts_reserve, }, ); } @@ -604,7 +614,8 @@ impl Export { // never reorder, so DTS == PTS and the PES stays PTS-only. let dts = if is_video { let pts = to_ticks(frame.timestamp); - author_dts(pts, &mut self.tracks.get_mut(name).context("missing track")?.last_dts) + let track = self.tracks.get_mut(name).context("missing track")?; + author_dts(pts, track.dts_reserve, &mut track.last_dts) } else { None }; @@ -813,14 +824,13 @@ impl Export { const PES_OPTIONAL_LEN: usize = 3 + 5; /// Extra bytes when the optional region also carries a DTS (5 DTS bytes). const PES_DTS_LEN: usize = 5; -/// Decode-clock reserve in 90 kHz ticks: each video frame is scheduled to decode this far -/// before its PTS. At 16 ticks (~0.18 ms) this is effectively just a strict-monotonic nudge: -/// it keeps DTS strictly increasing across reordered (B-frame) decode order, which is what -/// players need to schedule decoding (the `ffplay -fflags +igndts` fix). It does NOT keep -/// `DTS <= PTS` for B-frames, since covering the reorder span would need a frame-scale reserve -/// (and the matching decode lead); exact `DTS <= PTS` is the faithful wire-DTS follow-up. -/// See [`author_dts`]. -const DTS_RESERVE: u64 = 16; +/// Fallback decode-clock reserve in 90 kHz ticks when the catalog declares no `jitter`. At +/// 16 ticks (~0.18 ms) it is just a strict-monotonic nudge: it keeps DTS strictly increasing +/// across reordered (B-frame) decode order (the `ffplay -fflags +igndts` fix) but does not +/// keep `DTS <= PTS`. When the catalog carries `jitter` (the reorder depth, populated on +/// import), the track uses that instead, which is large enough to keep `DTS <= PTS`. See +/// [`author_dts`] and [`Track::dts_reserve`]. +const DEFAULT_DTS_RESERVE: u64 = 16; fn psi_interval() -> crate::container::Timestamp { crate::container::Timestamp::try_from(PSI_INTERVAL).unwrap_or(crate::container::Timestamp::ZERO) @@ -948,12 +958,13 @@ fn ensure_raw(container: &Container, kind: &str, name: &str) -> anyhow::Result<( /// the small reserve this keeps DTS monotonic but lets it sit above a B-frame's own PTS; a /// frame-scale reserve (or the faithful wire DTS) would be needed for `DTS <= PTS`. /// -/// `pts` and `last` are continuous (unwrapped) 90 kHz ticks, so the clock never wraps -/// mid-stream; the 33-bit wire wrap happens once at emission in [`write_pes`]. `last` is the -/// previous DTS, updated in place. Returns `None` when the DTS equals the PTS (PES stays +/// `reserve` is how far behind the PTS to run the clock (the catalog reorder depth, or the +/// fallback). `pts` and `last` are continuous (unwrapped) 90 kHz ticks, so the clock never +/// wraps mid-stream; the 33-bit wire wrap happens once at emission in [`write_pes`]. `last` is +/// the previous DTS, updated in place. Returns `None` when the DTS equals the PTS (PES stays /// PTS-only). -fn author_dts(pts: u64, last: &mut Option) -> Option { - let mut dts = pts.saturating_sub(DTS_RESERVE); +fn author_dts(pts: u64, reserve: u64, last: &mut Option) -> Option { + let mut dts = pts.saturating_sub(reserve); if let Some(prev) = *last && dts <= prev { @@ -963,15 +974,27 @@ fn author_dts(pts: u64, last: &mut Option) -> Option { (dts != pts).then_some(dts) } +/// The decode-clock reserve for a video rendition: its catalog `jitter` (the reorder depth) +/// in 90 kHz ticks, or [`DEFAULT_DTS_RESERVE`] when none is declared. +fn dts_reserve(config: &VideoConfig) -> u64 { + config + .jitter + .map(|t| t.as_scale(90_000) as u64) + .filter(|&ticks| ticks > 0) + .unwrap_or(DEFAULT_DTS_RESERVE) +} + #[cfg(test)] mod tests { - use super::{DTS_RESERVE, author_dts, is_complete_section}; + use super::{DEFAULT_DTS_RESERVE, author_dts, is_complete_section}; - /// Push a decode-order PTS stream (90 kHz) through the decode clock and return the - /// effective DTS per frame (the authored DTS, or the PTS when none is authored). - fn run_clock(pts: &[u64]) -> Vec { + /// Push a decode-order PTS stream (90 kHz) through the decode clock with a given reserve and + /// return the effective DTS per frame (the authored DTS, or the PTS when none is authored). + fn run_clock(pts: &[u64], reserve: u64) -> Vec { let mut last = None; - pts.iter().map(|&p| author_dts(p, &mut last).unwrap_or(p)).collect() + pts.iter() + .map(|&p| author_dts(p, reserve, &mut last).unwrap_or(p)) + .collect() } /// Decode-order PTS for a constant-frame-rate display timeline with `b` B-frames between @@ -994,21 +1017,39 @@ mod tests { #[test] fn dts_is_monotonic_across_reorder() { - // 25 fps, 10 s offset; a few B-frame depths. The small reserve guarantees a strictly - // increasing decode timeline (the `+igndts` fix); it does not keep DTS <= PTS. + // 25 fps, 10 s offset. Even with the tiny fallback reserve the decode timeline is + // strictly increasing (the `+igndts` fix); it just may sit above PTS for B-frames. for b in [1, 3, 5] { let pts = decode_order(40, b, 3_600, 10_000_000); - let dts = run_clock(&pts); + let dts = run_clock(&pts, DEFAULT_DTS_RESERVE); // The fixture genuinely reorders (PTS dips in decode order). assert!(pts.windows(2).any(|w| w[1] < w[0]), "b={b}: stream must reorder PTS"); - // The decode timeline is strictly increasing. for (i, win) in dts.windows(2).enumerate() { assert!(win[1] > win[0], "b={b}: DTS not strictly increasing at {i}: {win:?}"); } } } + #[test] + fn sufficient_reserve_keeps_dts_under_pts() { + // With a reserve covering the reorder span (the catalog `jitter` carries it), the decode + // timeline is both strictly increasing and never after the PTS. + let dur = 3_600; + for b in [1, 3, 5] { + let reserve = (b as u64 + 1) * dur; // one frame past the b-frame run + let pts = decode_order(40, b, dur, 10_000_000); + let dts = run_clock(&pts, reserve); + + for (i, win) in dts.windows(2).enumerate() { + assert!(win[1] > win[0], "b={b}: DTS not strictly increasing at {i}: {win:?}"); + } + for (i, (&d, &p)) in dts.iter().zip(pts.iter()).enumerate() { + assert!(d <= p, "b={b}: DTS {d} after PTS {p} at {i}"); + } + } + } + #[test] fn dts_clock_survives_33bit_wrap() { // The decode clock runs in continuous ticks, so it stays strictly increasing even as @@ -1016,7 +1057,7 @@ mod tests { // only at emission, so here the authored DTS keeps climbing past 1 << 33. let wrap = 1u64 << 33; let pts = decode_order(40, 3, 3_600, wrap - 20 * 3_600); - let dts = run_clock(&pts); + let dts = run_clock(&pts, DEFAULT_DTS_RESERVE); assert!(pts.iter().any(|&p| p >= wrap), "test must cross the wrap boundary"); for (i, win) in dts.windows(2).enumerate() { @@ -1029,17 +1070,15 @@ mod tests { #[test] fn dts_without_reorder_trails_pts_by_the_reserve() { - // A monotonic (no-B) stream stays strictly increasing and one reserve under its PTS, - // which at 16 ticks is a negligible decode nudge (and not added latency: presentation - // is still at the PTS). + // A monotonic (no-B) stream stays strictly increasing and one reserve under its PTS. let pts: Vec = (0..40).map(|i| 10_000_000 + i * 3_600).collect(); - let dts = run_clock(&pts); + let dts = run_clock(&pts, DEFAULT_DTS_RESERVE); for (i, win) in dts.windows(2).enumerate() { assert!(win[1] > win[0], "DTS not strictly increasing at {i}: {win:?}"); } for (i, (&d, &p)) in dts.iter().zip(pts.iter()).enumerate() { - assert_eq!(d, p - DTS_RESERVE, "DTS should trail PTS by exactly the reserve at {i}"); + assert_eq!(d, p - DEFAULT_DTS_RESERVE, "DTS should trail PTS by the reserve at {i}"); } } diff --git a/rs/moq-mux/src/container/ts/export_test.rs b/rs/moq-mux/src/container/ts/export_test.rs index 63f4f087e..43dbb4fa4 100644 --- a/rs/moq-mux/src/container/ts/export_test.rs +++ b/rs/moq-mux/src/container/ts/export_test.rs @@ -335,10 +335,10 @@ async fn export_avc1_out_of_band_reassembles() { } /// A real broadcast contribution feed (Ateme Kyrion, H.264 1080i with ~86 B-frames) -/// must come out of the exporter with an authored decode timeline. Without it players -/// see PTS-only video and choke on the out-of-order decode times (`ffplay +igndts`). -/// Assert the video PES carry a strictly increasing DTS in decode order (the fix), and -/// that the reorder was real (some authored DTS, non-monotonic PTS in the source). +/// must come out of the exporter with an authored decode timeline. The importer publishes +/// the reorder depth as the catalog `jitter`, and the exporter sizes its decode-clock reserve +/// from it, so the video PES carry a DTS that is both strictly increasing and never after the +/// PTS in decode order. Also assert the reorder was real (non-monotonic PTS in the source). #[tokio::test(start_paused = true)] async fn export_bframe_video_authors_dts() { let data = include_bytes!("test_data/scte35/kyrion_dirtystart.ts"); @@ -393,10 +393,14 @@ async fn export_bframe_video_authors_dts() { ); // The exporter authored a decode timeline (the decode clock trails the PTS). assert!(authored > 0, "no DTS authored for a B-frame stream"); - // That timeline is strictly increasing, which is what removes the `+igndts` requirement. + // Strictly increasing (removes the `+igndts` requirement) and never after presentation + // (the catalog jitter sized the reserve to the reorder depth). for (i, win) in effective.windows(2).enumerate() { assert!(win[1] > win[0], "DTS not strictly increasing at frame {i}: {win:?}"); } + for (i, (&d, &p)) in effective.iter().zip(pts.iter()).enumerate() { + assert!(d <= p, "DTS {d} after PTS {p} at frame {i}"); + } } /// Full SCTE-35 round-trip: import `bbb.ts` (real H.264 + AAC) into a broadcast diff --git a/rs/moq-mux/src/container/ts/import.rs b/rs/moq-mux/src/container/ts/import.rs index 94f3bda31..f6feea0fd 100644 --- a/rs/moq-mux/src/container/ts/import.rs +++ b/rs/moq-mux/src/container/ts/import.rs @@ -472,6 +472,7 @@ impl Import { let data_len = pes_data_len(&pes.header, pes.pes_packet_len); let mut pending = Pending { pts: pes.header.pts.map(|t| t.as_u64()), + dts: pes.header.dts.map(|t| t.as_u64()), stream_id: pes.header.stream_id.as_u8(), data: Vec::with_capacity(pes.data.len()), data_len, @@ -586,6 +587,9 @@ impl Import { struct Pending { /// Raw 90 kHz PTS, before wrap-unwrapping. pts: Option, + /// Raw 90 kHz DTS, before wrap-unwrapping. Present on reordered (B-frame) video; its + /// distance below the PTS is the reorder delay published as the catalog jitter. + dts: Option, /// PES stream_id, preserved for verbatim PES carriage. stream_id: u8, data: Vec, @@ -981,12 +985,23 @@ impl Stream { fn write(&mut self, pending: Pending, burst: Option) -> anyhow::Result<()> { match self { Stream::H264 { import, unwrap } => { + let reorder = reorder_delay(pending.pts, pending.dts); let pts = unwrap_pts(unwrap, pending.pts)?; - import.decode_frame(&mut pending.data.as_slice(), pts) + import.decode_frame(&mut pending.data.as_slice(), pts)?; + // After decode_frame, so the track (and its catalog rendition) exists. + if let Some(reorder) = reorder { + import.observe_reorder(reorder); + } + Ok(()) } Stream::H265 { import, unwrap } => { + let reorder = reorder_delay(pending.pts, pending.dts); let pts = unwrap_pts(unwrap, pending.pts)?; - import.decode_frame(&mut pending.data.as_slice(), pts) + import.decode_frame(&mut pending.data.as_slice(), pts)?; + if let Some(reorder) = reorder { + import.observe_reorder(reorder); + } + Ok(()) } Stream::Aac(stream) => stream.write(pending, burst), Stream::Legacy(stream) => stream.write(pending), @@ -1298,6 +1313,22 @@ fn unwrap_pts(unwrap: &mut PtsUnwrap, pts: Option) -> anyhow::Result