Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions rs/moq-mux/src/codec/av1/import.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::container::jitter::MinFrameDuration;
use crate::container::jitter::Jitter;

use anyhow::Context;
use bytes::BytesMut;
Expand Down Expand Up @@ -26,7 +26,7 @@ pub struct Import {
zero: Option<tokio::time::Instant>,

// Tracks the minimum frame duration and updates the catalog `jitter` field.
jitter: MinFrameDuration,
jitter: Jitter,
}

#[derive(Default)]
Expand All @@ -45,7 +45,7 @@ impl Import {
config: None,
current: Default::default(),
zero: None,
jitter: MinFrameDuration::new(),
jitter: Jitter::new(),
}
}

Expand All @@ -57,7 +57,7 @@ impl Import {
config: None,
current: Default::default(),
zero: None,
jitter: MinFrameDuration::new(),
jitter: Jitter::new(),
}
}

Expand Down
32 changes: 27 additions & 5 deletions rs/moq-mux/src/codec/h264/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use tokio::io::{AsyncRead, AsyncReadExt};
use super::Sps;
use crate::catalog::hang::CatalogExt;
use crate::codec::annexb::{NalIterator, START_CODE};
use crate::container::jitter::MinFrameDuration;
use crate::container::jitter::Jitter;

/// The wire shape an [`Import`] is processing.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
Expand All @@ -37,7 +37,7 @@ pub struct Import<E: CatalogExt = ()> {
config: Option<hang::catalog::VideoConfig>,
state: State,
zero: Option<tokio::time::Instant>,
jitter: MinFrameDuration,
jitter: Jitter,
}

enum State {
Expand Down Expand Up @@ -78,7 +78,7 @@ impl<E: CatalogExt> Import<E> {
config: None,
state: State::Pending { mode_hint: None },
zero: None,
jitter: MinFrameDuration::new(),
jitter: Jitter::new(),
}
}

Expand All @@ -90,7 +90,7 @@ impl<E: CatalogExt> Import<E> {
config: None,
state: State::Pending { mode_hint: None },
zero: None,
jitter: MinFrameDuration::new(),
jitter: Jitter::new(),
}
}

Expand Down Expand Up @@ -258,6 +258,22 @@ impl<E: CatalogExt> Import<E> {
}
}

/// Record a frame's reorder delay (`PTS - DTS`) so the catalog `jitter` reflects the
/// B-frame reorder depth (the decode buffer a transmuxer/player must hold). The container
/// supplies this since the elementary stream alone carries no decode time. No-op until the
/// track exists.
pub fn observe_reorder(&mut self, reorder: crate::container::Timestamp) {
let Some(jitter) = self.jitter.observe_reorder(reorder) else {
return;
};
let Some(track) = self.track.as_ref() else {
return;
};
if let Some(c) = self.catalog.lock().video.renditions.get_mut(&track.name) {
c.jitter = Some(jitter);
}
}

fn decode_avc1<T: Buf + AsRef<[u8]>>(
&mut self,
buf: &mut T,
Expand Down Expand Up @@ -407,7 +423,13 @@ impl<E: CatalogExt> Import<E> {
// The avc3 track was created eagerly in initialize_avc3; just publish
// (or republish) the catalog rendition with the latest config.
let track_name = self.track.as_ref().context("avc3 track not created")?.name.clone();
self.catalog.lock().video.renditions.insert(track_name, config.clone());
// Seed jitter from whatever has accumulated: a dirty start feeds frames before
// this first rendition exists, so those per-frame updates would otherwise be
// lost. Keep the cached `config` jitter-free so a later jitter change is not
// mistaken for a codec reconfiguration.
let mut published = config.clone();
published.jitter = self.jitter.current();
self.catalog.lock().video.renditions.insert(track_name, published);
self.config = Some(config);
Ok(reconfigured)
}
Expand Down
33 changes: 28 additions & 5 deletions rs/moq-mux/src/codec/h265/import.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::catalog::hang::CatalogExt;
use crate::codec::annexb::{NalIterator, START_CODE};
use crate::container::jitter::MinFrameDuration;
use crate::container::jitter::Jitter;

use anyhow::Context;
use bytes::{Buf, Bytes, BytesMut};
Expand Down Expand Up @@ -37,7 +37,7 @@ pub struct Import<E: CatalogExt = ()> {
pps: Vec<Bytes>,

// Tracks the minimum frame duration and updates the catalog `jitter` field.
jitter: MinFrameDuration,
jitter: Jitter,
}

impl<E: CatalogExt> Import<E> {
Expand All @@ -52,7 +52,7 @@ impl<E: CatalogExt> Import<E> {
vps: Vec::new(),
sps: Vec::new(),
pps: Vec::new(),
jitter: MinFrameDuration::new(),
jitter: Jitter::new(),
}
}

Expand All @@ -67,7 +67,7 @@ impl<E: CatalogExt> Import<E> {
vps: Vec::new(),
sps: Vec::new(),
pps: Vec::new(),
jitter: MinFrameDuration::new(),
jitter: Jitter::new(),
}
}

Expand Down Expand Up @@ -101,6 +101,11 @@ impl<E: CatalogExt> Import<E> {
}

let reconfigured = self.config.is_some();
// Seed jitter from whatever has accumulated: a dirty start feeds frames before this
// first rendition exists, so those per-frame updates would otherwise be lost. The
// cached `config` stays jitter-free so a later jitter change is not mistaken for a
// codec reconfiguration.
let jitter = self.jitter.current();
let mut catalog = self.catalog.lock();

if self.track.is_some() && self.tracks.is_fixed() {
Expand All @@ -114,7 +119,9 @@ impl<E: CatalogExt> Import<E> {

let track = self.tracks.create()?;
tracing::debug!(name = ?track.name, ?config, "starting track");
catalog.video.renditions.insert(track.name.clone(), config.clone());
let mut published = config.clone();
published.jitter = jitter;
catalog.video.renditions.insert(track.name.clone(), published);

self.config = Some(config);
self.track =
Expand Down Expand Up @@ -173,6 +180,22 @@ impl<E: CatalogExt> Import<E> {
/// This can also be used when EOF is detected to flush the final frame.
///
/// NOTE: The next decode will fail if it doesn't begin with a start code.
/// Record a frame's reorder delay (`PTS - DTS`) so the catalog `jitter` reflects the
/// B-frame reorder depth (the decode buffer a transmuxer/player must hold). The container
/// supplies this since the elementary stream alone carries no decode time. No-op until the
/// track exists.
pub fn observe_reorder(&mut self, reorder: crate::container::Timestamp) {
let Some(jitter) = self.jitter.observe_reorder(reorder) else {
return;
};
let Some(track) = self.track.as_ref() else {
return;
};
if let Some(c) = self.catalog.lock().video.renditions.get_mut(&track.name) {
c.jitter = Some(jitter);
}
}

pub fn decode_frame<T: Buf + AsRef<[u8]>>(
&mut self,
buf: &mut T,
Expand Down
8 changes: 4 additions & 4 deletions rs/moq-mux/src/codec/vp8/import.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use anyhow::Context;
use bytes::Buf;

use crate::container::jitter::MinFrameDuration;
use crate::container::jitter::Jitter;

use super::FrameHeader;

Expand All @@ -28,7 +28,7 @@ pub struct Import {
zero: Option<tokio::time::Instant>,

// Tracks the minimum frame duration and updates the catalog `jitter` field.
jitter: MinFrameDuration,
jitter: Jitter,
}

impl Import {
Expand All @@ -39,7 +39,7 @@ impl Import {
track: None,
config: None,
zero: None,
jitter: MinFrameDuration::new(),
jitter: Jitter::new(),
}
}

Expand All @@ -50,7 +50,7 @@ impl Import {
track: None,
config: None,
zero: None,
jitter: MinFrameDuration::new(),
jitter: Jitter::new(),
}
}

Expand Down
8 changes: 4 additions & 4 deletions rs/moq-mux/src/codec/vp9/import.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use anyhow::Context;
use bytes::Buf;

use crate::container::jitter::MinFrameDuration;
use crate::container::jitter::Jitter;

use super::FrameHeader;

Expand All @@ -28,7 +28,7 @@ pub struct Import {
zero: Option<tokio::time::Instant>,

// Tracks the minimum frame duration and updates the catalog `jitter` field.
jitter: MinFrameDuration,
jitter: Jitter,
}

impl Import {
Expand All @@ -39,7 +39,7 @@ impl Import {
track: None,
config: None,
zero: None,
jitter: MinFrameDuration::new(),
jitter: Jitter::new(),
}
}

Expand All @@ -50,7 +50,7 @@ impl Import {
track: None,
config: None,
zero: None,
jitter: MinFrameDuration::new(),
jitter: Jitter::new(),
}
}

Expand Down
71 changes: 53 additions & 18 deletions rs/moq-mux/src/container/jitter.rs
Original file line number Diff line number Diff line change
@@ -1,36 +1,71 @@
use crate::container::Timestamp;

/// Tracks the minimum duration between consecutive frames.
/// Tracks the catalog `jitter` for a video/audio track: the maximum delay before a frame can
/// be emitted, so a player sizes its buffer to at least this much.
///
/// This is the value reported as `jitter` in the catalog: a player should
/// buffer at least this much before emitting frames. Despite the name "jitter",
/// what we actually record is the *minimum frame duration* observed so far.
/// It reports whichever is larger of two contributions:
/// - the minimum frame duration (the steady inter-frame spacing), and
/// - the reorder delay (`max(PTS - DTS)`), which is non-zero only for reordered (B-frame)
/// streams and which a transmuxer also reuses as the decode-clock reserve.
///
/// A non-reordered stream reports the frame duration; a B-frame stream reports the deeper
/// reorder delay (e.g. up to 3 consecutive B-frames is 3x the frame duration).
#[derive(Default)]
pub struct MinFrameDuration {
pub struct Jitter {
last_timestamp: Option<Timestamp>,
min_duration: Option<Timestamp>,
max_reorder: Timestamp,
/// Last value handed back from [`observe`](Self::observe) /
/// [`observe_reorder`](Self::observe_reorder), so they only report on a change.
reported: Option<Timestamp>,
}

impl MinFrameDuration {
impl Jitter {
pub fn new() -> Self {
Self::default()
}

/// Record a new frame timestamp.
///
/// Returns the new minimum-frame-duration as a `moq_net::Time` if it
/// changed, so the caller can persist it on the catalog rendition. Returns
/// `None` when this is the first observation, the timestamps are
/// non-monotonic, or the new gap is no smaller than the recorded minimum.
/// Record a frame's presentation timestamp (decode order), updating the minimum frame
/// duration. Returns the new jitter as a [`moq_net::Time`] if it changed, else `None`.
/// The first observation and non-monotonic timestamps (B-frames) only update state.
pub fn observe(&mut self, ts: Timestamp) -> Option<moq_net::Time> {
let last = self.last_timestamp.replace(ts)?;
let duration = ts.checked_sub(last).ok()?;
if let Some(last) = self.last_timestamp.replace(ts)
&& let Ok(duration) = ts.checked_sub(last)
&& !duration.is_zero()
&& duration < self.min_duration.unwrap_or(Timestamp::MAX)
{
self.min_duration = Some(duration);
}
self.report()
}

/// Record a frame's reorder delay (`PTS - DTS`), updating the maximum. Returns the new
/// jitter as a [`moq_net::Time`] if it changed, else `None`.
pub fn observe_reorder(&mut self, reorder: Timestamp) -> Option<moq_net::Time> {
self.max_reorder = self.max_reorder.max(reorder);
self.report()
}

if duration >= self.min_duration.unwrap_or(Timestamp::MAX) {
/// The current jitter (the larger of the frame duration and the reorder delay), without
/// the change-detection of [`observe`](Self::observe). Used to seed a freshly created
/// catalog rendition with whatever has accumulated, since per-frame updates before the
/// rendition exists would otherwise be lost.
pub fn current(&self) -> Option<moq_net::Time> {
let jitter = self.combined();
(!jitter.is_zero()).then(|| jitter.convert().ok()).flatten()
}

fn combined(&self) -> Timestamp {
self.min_duration.unwrap_or(Timestamp::ZERO).max(self.max_reorder)
}

/// Report the current jitter only when it changes.
fn report(&mut self) -> Option<moq_net::Time> {
let jitter = self.combined();
if jitter.is_zero() || self.reported == Some(jitter) {
return None;
}

self.min_duration = Some(duration);
duration.convert().ok()
self.reported = Some(jitter);
jitter.convert().ok()
}
}
Loading