Skip to content

Commit 6893b1c

Browse files
kixelated and claude authored
fix(moq-mux): author DTS for B-frame MPEG-TS export (#1843)
Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
1 parent 84c0551 commit 6893b1c

10 files changed

Lines changed: 444 additions & 56 deletions

File tree

rs/moq-mux/src/codec/av1/import.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::container::jitter::MinFrameDuration;
1+
use crate::container::jitter::Jitter;
22

33
use anyhow::Context;
44
use bytes::BytesMut;
@@ -26,7 +26,7 @@ pub struct Import {
2626
zero: Option<tokio::time::Instant>,
2727

2828
// Tracks the minimum frame duration and updates the catalog `jitter` field.
29-
jitter: MinFrameDuration,
29+
jitter: Jitter,
3030
}
3131

3232
#[derive(Default)]
@@ -45,7 +45,7 @@ impl Import {
4545
config: None,
4646
current: Default::default(),
4747
zero: None,
48-
jitter: MinFrameDuration::new(),
48+
jitter: Jitter::new(),
4949
}
5050
}
5151

@@ -57,7 +57,7 @@ impl Import {
5757
config: None,
5858
current: Default::default(),
5959
zero: None,
60-
jitter: MinFrameDuration::new(),
60+
jitter: Jitter::new(),
6161
}
6262
}
6363

rs/moq-mux/src/codec/h264/import.rs

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use tokio::io::{AsyncRead, AsyncReadExt};
1414
use super::Sps;
1515
use crate::catalog::hang::CatalogExt;
1616
use crate::codec::annexb::{NalIterator, START_CODE};
17-
use crate::container::jitter::MinFrameDuration;
17+
use crate::container::jitter::Jitter;
1818

1919
/// The wire shape an [`Import`] is processing.
2020
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
@@ -37,7 +37,7 @@ pub struct Import<E: CatalogExt = ()> {
3737
config: Option<hang::catalog::VideoConfig>,
3838
state: State,
3939
zero: Option<tokio::time::Instant>,
40-
jitter: MinFrameDuration,
40+
jitter: Jitter,
4141
}
4242

4343
enum State {
@@ -78,7 +78,7 @@ impl<E: CatalogExt> Import<E> {
7878
config: None,
7979
state: State::Pending { mode_hint: None },
8080
zero: None,
81-
jitter: MinFrameDuration::new(),
81+
jitter: Jitter::new(),
8282
}
8383
}
8484

@@ -90,7 +90,7 @@ impl<E: CatalogExt> Import<E> {
9090
config: None,
9191
state: State::Pending { mode_hint: None },
9292
zero: None,
93-
jitter: MinFrameDuration::new(),
93+
jitter: Jitter::new(),
9494
}
9595
}
9696

@@ -258,6 +258,22 @@ impl<E: CatalogExt> Import<E> {
258258
}
259259
}
260260

261+
/// Record a frame's reorder delay (`PTS - DTS`) so the catalog `jitter` reflects the
262+
/// B-frame reorder depth (the decode buffer a transmuxer/player must hold). The container
263+
/// supplies this since the elementary stream alone carries no decode time. The delay is
264+
/// always accumulated; only the catalog update is skipped until the track exists.
265+
pub fn observe_reorder(&mut self, reorder: crate::container::Timestamp) {
266+
let Some(jitter) = self.jitter.observe_reorder(reorder) else {
267+
return;
268+
};
269+
let Some(track) = self.track.as_ref() else {
270+
return;
271+
};
272+
if let Some(c) = self.catalog.lock().video.renditions.get_mut(&track.name) {
273+
c.jitter = Some(jitter);
274+
}
275+
}
276+
261277
fn decode_avc1<T: Buf + AsRef<[u8]>>(
262278
&mut self,
263279
buf: &mut T,
@@ -407,7 +423,13 @@ impl<E: CatalogExt> Import<E> {
407423
// The avc3 track was created eagerly in initialize_avc3; just publish
408424
// (or republish) the catalog rendition with the latest config.
409425
let track_name = self.track.as_ref().context("avc3 track not created")?.name.clone();
410-
self.catalog.lock().video.renditions.insert(track_name, config.clone());
426+
// Seed jitter from whatever has accumulated: a dirty start feeds frames before
427+
// this first rendition exists, so those per-frame updates would otherwise be
428+
// lost. Keep the cached `config` jitter-free so a later jitter change is not
429+
// mistaken for a codec reconfiguration.
430+
let mut published = config.clone();
431+
published.jitter = self.jitter.current();
432+
self.catalog.lock().video.renditions.insert(track_name, published);
411433
self.config = Some(config);
412434
Ok(reconfigured)
413435
}

rs/moq-mux/src/codec/h265/import.rs

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use crate::catalog::hang::CatalogExt;
22
use crate::codec::annexb::{NalIterator, START_CODE};
3-
use crate::container::jitter::MinFrameDuration;
3+
use crate::container::jitter::Jitter;
44

55
use anyhow::Context;
66
use bytes::{Buf, Bytes, BytesMut};
@@ -37,7 +37,7 @@ pub struct Import<E: CatalogExt = ()> {
3737
pps: Vec<Bytes>,
3838

3939
// Tracks the larger of the minimum frame duration and the reorder delay, and updates the catalog `jitter` field.
40-
jitter: MinFrameDuration,
40+
jitter: Jitter,
4141
}
4242

4343
impl<E: CatalogExt> Import<E> {
@@ -52,7 +52,7 @@ impl<E: CatalogExt> Import<E> {
5252
vps: Vec::new(),
5353
sps: Vec::new(),
5454
pps: Vec::new(),
55-
jitter: MinFrameDuration::new(),
55+
jitter: Jitter::new(),
5656
}
5757
}
5858

@@ -67,7 +67,7 @@ impl<E: CatalogExt> Import<E> {
6767
vps: Vec::new(),
6868
sps: Vec::new(),
6969
pps: Vec::new(),
70-
jitter: MinFrameDuration::new(),
70+
jitter: Jitter::new(),
7171
}
7272
}
7373

@@ -101,6 +101,11 @@ impl<E: CatalogExt> Import<E> {
101101
}
102102

103103
let reconfigured = self.config.is_some();
104+
// Seed jitter from whatever has accumulated: a dirty start feeds frames before this
105+
// first rendition exists, so those per-frame updates would otherwise be lost. The
106+
// cached `config` stays jitter-free so a later jitter change is not mistaken for a
107+
// codec reconfiguration.
108+
let jitter = self.jitter.current();
104109
let mut catalog = self.catalog.lock();
105110

106111
if self.track.is_some() && self.tracks.is_fixed() {
@@ -114,7 +119,9 @@ impl<E: CatalogExt> Import<E> {
114119

115120
let track = self.tracks.create()?;
116121
tracing::debug!(name = ?track.name, ?config, "starting track");
117-
catalog.video.renditions.insert(track.name.clone(), config.clone());
122+
let mut published = config.clone();
123+
published.jitter = jitter;
124+
catalog.video.renditions.insert(track.name.clone(), published);
118125

119126
self.config = Some(config);
120127
self.track =
@@ -173,6 +180,22 @@ impl<E: CatalogExt> Import<E> {
173180
/// This can also be used when EOF is detected to flush the final frame.
174181
///
175182
/// NOTE: The next decode will fail if it doesn't begin with a start code.
183+
/// Record a frame's reorder delay (`PTS - DTS`) so the catalog `jitter` reflects the
184+
/// B-frame reorder depth (the decode buffer a transmuxer/player must hold). The container
185+
/// supplies this since the elementary stream alone carries no decode time. The delay is
186+
/// always accumulated; only the catalog update is skipped until the track exists.
187+
pub fn observe_reorder(&mut self, reorder: crate::container::Timestamp) {
188+
let Some(jitter) = self.jitter.observe_reorder(reorder) else {
189+
return;
190+
};
191+
let Some(track) = self.track.as_ref() else {
192+
return;
193+
};
194+
if let Some(c) = self.catalog.lock().video.renditions.get_mut(&track.name) {
195+
c.jitter = Some(jitter);
196+
}
197+
}
198+
176199
pub fn decode_frame<T: Buf + AsRef<[u8]>>(
177200
&mut self,
178201
buf: &mut T,

rs/moq-mux/src/codec/vp8/import.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use anyhow::Context;
22
use bytes::Buf;
33

4-
use crate::container::jitter::MinFrameDuration;
4+
use crate::container::jitter::Jitter;
55

66
use super::FrameHeader;
77

@@ -28,7 +28,7 @@ pub struct Import {
2828
zero: Option<tokio::time::Instant>,
2929

3030
// Tracks the minimum frame duration and updates the catalog `jitter` field.
31-
jitter: MinFrameDuration,
31+
jitter: Jitter,
3232
}
3333

3434
impl Import {
@@ -39,7 +39,7 @@ impl Import {
3939
track: None,
4040
config: None,
4141
zero: None,
42-
jitter: MinFrameDuration::new(),
42+
jitter: Jitter::new(),
4343
}
4444
}
4545

@@ -50,7 +50,7 @@ impl Import {
5050
track: None,
5151
config: None,
5252
zero: None,
53-
jitter: MinFrameDuration::new(),
53+
jitter: Jitter::new(),
5454
}
5555
}
5656

rs/moq-mux/src/codec/vp9/import.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use anyhow::Context;
22
use bytes::Buf;
33

4-
use crate::container::jitter::MinFrameDuration;
4+
use crate::container::jitter::Jitter;
55

66
use super::FrameHeader;
77

@@ -28,7 +28,7 @@ pub struct Import {
2828
zero: Option<tokio::time::Instant>,
2929

3030
// Tracks the minimum frame duration and updates the catalog `jitter` field.
31-
jitter: MinFrameDuration,
31+
jitter: Jitter,
3232
}
3333

3434
impl Import {
@@ -39,7 +39,7 @@ impl Import {
3939
track: None,
4040
config: None,
4141
zero: None,
42-
jitter: MinFrameDuration::new(),
42+
jitter: Jitter::new(),
4343
}
4444
}
4545

@@ -50,7 +50,7 @@ impl Import {
5050
track: None,
5151
config: None,
5252
zero: None,
53-
jitter: MinFrameDuration::new(),
53+
jitter: Jitter::new(),
5454
}
5555
}
5656

rs/moq-mux/src/container/jitter.rs

Lines changed: 53 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,36 +1,71 @@
11
use crate::container::Timestamp;
22

3-
/// Tracks the minimum duration between consecutive frames.
3+
/// Tracks the catalog `jitter` for a video/audio track: the maximum delay before a frame can
4+
/// be emitted, so a player sizes its buffer to at least this much.
45
///
5-
/// This is the value reported as `jitter` in the catalog: a player should
6-
/// buffer at least this much before emitting frames. Despite the name "jitter",
7-
/// what we actually record is the *minimum frame duration* observed so far.
6+
/// It reports whichever is larger of two contributions:
7+
/// - the minimum frame duration (the steady inter-frame spacing), and
8+
/// - the reorder delay (`max(PTS - DTS)`), which is non-zero only for reordered (B-frame)
9+
/// streams and which a transmuxer also reuses as the decode-clock reserve.
10+
///
11+
/// A non-reordered stream reports the frame duration; a B-frame stream reports the deeper
12+
/// reorder delay (e.g. up to 3 consecutive B-frames is 3x the frame duration).
813
#[derive(Default)]
9-
pub struct MinFrameDuration {
14+
pub struct Jitter {
1015
last_timestamp: Option<Timestamp>,
1116
min_duration: Option<Timestamp>,
17+
max_reorder: Timestamp,
18+
/// Last value handed back from [`observe`](Self::observe) /
19+
/// [`observe_reorder`](Self::observe_reorder), so they only report on a change.
20+
reported: Option<Timestamp>,
1221
}
1322

14-
impl MinFrameDuration {
23+
impl Jitter {
1524
pub fn new() -> Self {
1625
Self::default()
1726
}
1827

19-
/// Record a new frame timestamp.
20-
///
21-
/// Returns the new minimum-frame-duration as a `moq_net::Time` if it
22-
/// changed, so the caller can persist it on the catalog rendition. Returns
23-
/// `None` when this is the first observation, the timestamps are
24-
/// non-monotonic, or the new gap is no smaller than the recorded minimum.
28+
/// Record a frame's presentation timestamp (decode order), updating the minimum frame
29+
/// duration. Returns the new jitter as a [`moq_net::Time`] if it changed, else `None`.
30+
/// The first observation and non-monotonic timestamps (B-frames) only update state.
2531
pub fn observe(&mut self, ts: Timestamp) -> Option<moq_net::Time> {
26-
let last = self.last_timestamp.replace(ts)?;
27-
let duration = ts.checked_sub(last).ok()?;
32+
if let Some(last) = self.last_timestamp.replace(ts)
33+
&& let Ok(duration) = ts.checked_sub(last)
34+
&& !duration.is_zero()
35+
&& duration < self.min_duration.unwrap_or(Timestamp::MAX)
36+
{
37+
self.min_duration = Some(duration);
38+
}
39+
self.report()
40+
}
41+
42+
/// Record a frame's reorder delay (`PTS - DTS`), updating the maximum. Returns the new
43+
/// jitter as a [`moq_net::Time`] if it changed, else `None`.
44+
pub fn observe_reorder(&mut self, reorder: Timestamp) -> Option<moq_net::Time> {
45+
self.max_reorder = self.max_reorder.max(reorder);
46+
self.report()
47+
}
2848

29-
if duration >= self.min_duration.unwrap_or(Timestamp::MAX) {
49+
/// The current jitter (the larger of the frame duration and the reorder delay), without
50+
/// the change-detection of [`observe`](Self::observe). Used to seed a freshly created
51+
/// catalog rendition with whatever has accumulated, since per-frame updates before the
52+
/// rendition exists would otherwise be lost.
53+
pub fn current(&self) -> Option<moq_net::Time> {
54+
let jitter = self.combined();
55+
(!jitter.is_zero()).then(|| jitter.convert().ok()).flatten()
56+
}
57+
58+
fn combined(&self) -> Timestamp {
59+
self.min_duration.unwrap_or(Timestamp::ZERO).max(self.max_reorder)
60+
}
61+
62+
/// Report the current jitter only when it changes.
63+
fn report(&mut self) -> Option<moq_net::Time> {
64+
let jitter = self.combined();
65+
if jitter.is_zero() || self.reported == Some(jitter) {
3066
return None;
3167
}
32-
33-
self.min_duration = Some(duration);
34-
duration.convert().ok()
68+
self.reported = Some(jitter);
69+
jitter.convert().ok()
3570
}
3671
}

0 commit comments

Comments
 (0)