diff --git a/Cargo.lock b/Cargo.lock index d7aa47c50..445e2d918 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3762,6 +3762,7 @@ dependencies = [ "moq-mux", "moq-native", "moq-net", + "moq-video", "thiserror 2.0.18", "tokio", "tracing", @@ -4399,6 +4400,7 @@ dependencies = [ "bytes", "cudarc", "dispatch2", + "hang", "moq-mux", "moq-net", "moq-vaapi", diff --git a/rs/libmoq/CHANGELOG.md b/rs/libmoq/CHANGELOG.md index 108f5bcb6..85948e6e6 100644 --- a/rs/libmoq/CHANGELOG.md +++ b/rs/libmoq/CHANGELOG.md @@ -7,6 +7,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- Native video decode C API: `moq_consume_video_raw` (+ `_close`, `_frame`, + `_frame_free`) subscribes to an H.264 track and hands back decoded I420 frames, + the video counterpart to `moq_consume_audio_raw`. Decoding happens inside + libmoq (VideoToolbox / openh264), so consumers no longer need ffmpeg. + ## [0.3.7](https://github.com/moq-dev/moq/compare/libmoq-v0.3.6...libmoq-v0.3.7) - 2026-06-19 ### Fixed diff --git a/rs/libmoq/Cargo.toml b/rs/libmoq/Cargo.toml index 275bb6d79..d0c9792ee 100644 --- a/rs/libmoq/Cargo.toml +++ b/rs/libmoq/Cargo.toml @@ -24,6 +24,7 @@ moq-audio = { workspace = true } moq-mux = { workspace = true } moq-native = { workspace = true, default-features = true } moq-net = { workspace = true, features = ["serde"] } +moq-video = { workspace = true } thiserror = "2" tokio = { workspace = true, features = ["macros"] } tracing = "0.1" diff --git a/rs/libmoq/src/consume.rs b/rs/libmoq/src/consume.rs index 93006dc15..c4e40303d 100644 --- a/rs/libmoq/src/consume.rs +++ b/rs/libmoq/src/consume.rs @@ -492,6 +492,25 @@ impl Consume { Ok(()) } + /// Look up a video rendition by catalog index, returning the + /// (broadcast, config, name) tuple needed to subscribe — mirrors + /// the index-based selection in `video_ordered`. 
+ pub fn video_rendition( + &self, + catalog: Id, + index: usize, + ) -> Result<(moq_net::BroadcastConsumer, hang::catalog::VideoConfig, String), Error> { + let consume = self.catalog.get(catalog).ok_or(Error::CatalogNotFound)?; + let (name, config) = consume + .catalog + .video + .renditions + .iter() + .nth(index) + .ok_or(Error::NoIndex)?; + Ok((consume.broadcast.clone(), config.clone(), name.clone())) + } + /// Look up an audio rendition by catalog index, returning the /// (broadcast, config, name) tuple needed to subscribe — mirrors /// the index-based selection in `audio_ordered`. diff --git a/rs/libmoq/src/error.rs b/rs/libmoq/src/error.rs index 9801f4be8..147df1295 100644 --- a/rs/libmoq/src/error.rs +++ b/rs/libmoq/src/error.rs @@ -142,6 +142,10 @@ pub enum Error { /// Error from the moq-audio codec layer. #[error("audio error: {0}")] Audio(Arc), + + /// Error from the moq-video codec layer. + #[error("video error: {0}")] + Video(Arc), } impl From for Error { @@ -150,6 +154,12 @@ impl From for Error { } } +impl From for Error { + fn from(err: moq_video::Error) -> Self { + Error::Video(Arc::new(err)) + } +} + impl From for Error { fn from(err: tracing::metadata::ParseLevelError) -> Self { Error::Level(Arc::new(err)) @@ -195,6 +205,7 @@ impl ffi::ReturnCode for Error { Error::Native(_) => -33, Error::Unauthorized => -34, Error::Forbidden => -35, + Error::Video(_) => -36, } } } diff --git a/rs/libmoq/src/lib.rs b/rs/libmoq/src/lib.rs index 58f3bde4c..8b4fbb0d9 100644 --- a/rs/libmoq/src/lib.rs +++ b/rs/libmoq/src/lib.rs @@ -26,11 +26,13 @@ mod origin; mod publish; mod session; mod state; +mod video; pub use api::*; pub use audio::*; pub use error::*; pub use id::*; +pub use video::*; pub(crate) use consume::*; pub(crate) use origin::*; diff --git a/rs/libmoq/src/state.rs b/rs/libmoq/src/state.rs index a4e988528..31c0585dd 100644 --- a/rs/libmoq/src/state.rs +++ b/rs/libmoq/src/state.rs @@ -1,6 +1,6 @@ use std::sync::{LazyLock, Mutex, MutexGuard}; -use 
crate::{Consume, Origin, Publish, Session, audio::Audio}; +use crate::{Consume, Origin, Publish, Session, audio::Audio, video::Video}; pub struct State { pub session: Session, @@ -8,6 +8,7 @@ pub struct State { pub publish: Publish, pub consume: Consume, pub audio: Audio, + pub video: Video, } impl State { @@ -18,6 +19,7 @@ impl State { publish: Publish::default(), consume: Consume::default(), audio: Audio::default(), + video: Video::default(), } } diff --git a/rs/libmoq/src/test.rs b/rs/libmoq/src/test.rs index 2b9665a79..00c9e4893 100644 --- a/rs/libmoq/src/test.rs +++ b/rs/libmoq/src/test.rs @@ -926,6 +926,106 @@ fn video_publish_consume() { assert_eq!(moq_origin_close(origin), 0); } +/// End-to-end native decode: publish real H.264 (encoded by moq-video) and +/// consume it through `moq_consume_video_raw`, asserting decoded I420 frames. +#[test] +fn video_raw_decode() { + // Encode a few gray frames to Annex-B (avc3, SPS/PPS inline on the keyframe). + let mut config = moq_video::encode::Config::new(320, 240, 30); + config.kind = moq_video::encode::Kind::Software; + let mut encoder = moq_video::encode::Encoder::new(&config).expect("openh264 encoder"); + let gray = vec![0x80u8; 320 * 240 * 4]; + let mut frames: Vec = Vec::new(); + for i in 0..5 { + frames.extend(encoder.encode_rgba(&gray, 320, 240, i == 0).unwrap()); + } + frames.extend(encoder.finish().unwrap()); + assert!(!frames.is_empty(), "encoder produced no frames"); + + let origin = id(moq_origin_create()); + let broadcast = id(moq_publish_create()); + + // The init's SPS/PPS only seed catalog metadata; avc3 frames carry their own + // inline parameter sets, so the decoder reads the true 320x240 from the wire. 
+ let init = h264_init(); + let format = b"avc3"; + let media = id(unsafe { + moq_publish_media_ordered( + broadcast, + format.as_ptr() as *const c_char, + format.len(), + init.as_ptr(), + init.len(), + ) + }); + + let path = b"video-raw-test"; + let _publish = id(unsafe { moq_origin_publish(origin, path.as_ptr() as *const c_char, path.len(), broadcast) }); + + let consume = request_broadcast(origin, path); + let catalog_cb = Callback::new(); + let catalog_task = id(unsafe { moq_consume_catalog(consume, Some(channel_callback), catalog_cb.ptr) }); + let catalog_id = id(catalog_cb.recv()); + + // Subscribe + decode before publishing frames so the keyframe group is delivered. + let output = moq_video_decoder_output { latency_max_ms: 10_000 }; + let frame_cb = Callback::new(); + let consumer = id(unsafe { moq_consume_video_raw(catalog_id, 0, &output, Some(channel_callback), frame_cb.ptr) }); + + for (i, frame) in frames.iter().enumerate() { + assert_eq!( + unsafe { moq_publish_media_frame(media, frame.as_ptr(), frame.len(), (i as u64) * 33_000) }, + 0 + ); + } + + // First decoded frame: packed I420 at the encoder resolution. + let frame_id = id(frame_cb.recv()); + let mut frame = moq_video_frame { + timestamp_us: 0, + width: 0, + height: 0, + data: std::ptr::null(), + data_size: 0, + }; + assert_eq!(unsafe { moq_consume_video_raw_frame(frame_id, &mut frame) }, 0); + assert_eq!(frame.width, 320); + assert_eq!(frame.height, 240); + assert_eq!(frame.data_size, 320 * 240 * 3 / 2, "tightly-packed I420"); + assert!(!frame.data.is_null()); + + assert_eq!(moq_consume_video_raw_frame_free(frame_id), 0); + assert_eq!(moq_consume_video_raw_close(consumer), 0); + + // Drain any other decoded frames already queued, then expect the terminal 0. 
+ loop { + let code = frame_cb.recv(); + if code > 0 { + assert_eq!(moq_consume_video_raw_frame_free(id(code)), 0); + } else { + assert_eq!(code, 0, "raw video close delivers terminal 0"); + break; + } + } + assert_eq!(moq_consume_catalog_free(catalog_id), 0); + assert_eq!(moq_consume_catalog_close(catalog_task), 0); + // The publisher may emit more than one catalog snapshot (e.g. as the track's + // stats settle), so drain any extra snapshots before the terminal. + loop { + let code = catalog_cb.recv(); + if code > 0 { + assert_eq!(moq_consume_catalog_free(id(code)), 0); + } else { + assert_eq!(code, 0, "catalog close delivers terminal 0"); + break; + } + } + assert_eq!(moq_consume_close(consume), 0); + assert_eq!(moq_publish_media_close(media), 0); + assert_eq!(moq_publish_close(broadcast), 0); + assert_eq!(moq_origin_close(origin), 0); +} + #[test] fn multiple_frames_ordering() { let origin = id(moq_origin_create()); diff --git a/rs/libmoq/src/video.rs b/rs/libmoq/src/video.rs new file mode 100644 index 000000000..1c0ecb2dd --- /dev/null +++ b/rs/libmoq/src/video.rs @@ -0,0 +1,252 @@ +//! Native video decode via [`moq_video`]. +//! +//! The video counterpart to [`audio`](crate::audio)'s decoder: subscribe to an +//! H.264 track and hand back decoded raw frames, with the decode happening +//! inside the FFI boundary (VideoToolbox on macOS, openh264 elsewhere; no +//! ffmpeg). Sibling to `moq_consume_video_ordered`, which delivers the +//! still-encoded frames for a caller that brings its own decoder. +//! +//! Only H.264 is supported. A non-H.264 rendition fails the subscribe with a +//! terminal error on the callback. + +use std::ffi::c_void; +use std::time::Duration; + +use tokio::sync::oneshot; + +use crate::ffi::OnStatus; +use crate::{Error, Id, NonZeroSlab, State, ffi}; + +// ---- C-visible types ---- + +/// Decode-side configuration the caller passes to [`moq_consume_video_raw`]. 
+/// +/// Output is always tightly-packed I420 (see [`moq_video_frame`]); there is no +/// format/resolution knob yet. The struct exists so future options (a pixel +/// format, a target size) stay additive. +#[repr(C)] +#[allow(non_camel_case_types)] +pub struct moq_video_decoder_output { + /// Upper bound on buffering before skipping a stalled group, in + /// milliseconds. Same congestion-control knob as + /// `moq_consume_video_ordered`'s `max_latency_ms`. 0 = skip aggressively + /// (the moq-mux default); set to your playout buffer for a softer skip. + pub latency_max_ms: u64, +} + +/// One decoded video frame: packed I420 plus a presentation timestamp. +/// +/// `data` is `width * height * 3 / 2` bytes: the Y plane (`width * height`), +/// then U, then V (`width/2 * height/2` each), no row padding. It's BT.601 +/// limited range. `width` and `height` are even. `data` is owned by the consume +/// slab and stays valid until the same id is released with +/// [`moq_consume_video_raw_frame_free`]. +#[repr(C)] +#[allow(non_camel_case_types)] +pub struct moq_video_frame { + pub timestamp_us: u64, + pub width: u32, + pub height: u32, + pub data: *const u8, + pub data_size: usize, +} + +// ---- State extension (used internally by lib.rs) ---- + +/// Raw-video consume state: decoder tasks plus their buffered decoded frames. +#[derive(Default)] +pub struct Video { + consumer_tasks: NonZeroSlab>, + frames: NonZeroSlab, +} + +/// A spawned task entry: `close` signals shutdown, `callback` delivers status. +/// +/// Same lifetime contract as the audio decoder: the task delivers one final +/// terminal callback and then removes itself, so `user_data` stays valid until +/// that callback fires. `close` is an `Option` so `consume_close` can drop just +/// the sender without removing the entry. 
+struct VideoTaskEntry { + close: Option>, + callback: OnStatus, +} + +impl Video { + pub fn consume( + &mut self, + broadcast: &moq_net::BroadcastConsumer, + catalog: &hang::catalog::VideoConfig, + name: &str, + config: moq_video::decode::Config, + on_frame: OnStatus, + ) -> Result { + let broadcast = broadcast.clone(); + let catalog = catalog.clone(); + let name = name.to_string(); + + let channel = oneshot::channel(); + let entry = VideoTaskEntry { + close: Some(channel.0), + callback: on_frame, + }; + let id = self.consumer_tasks.insert(Some(entry))?; + + // `Consumer::new` subscribes (blocking on SUBSCRIBE_OK), so run it inside + // the task to keep this entrypoint non-blocking. + tokio::spawn(async move { + let res = async move { + let consumer = moq_video::decode::Consumer::new(&broadcast, &catalog, name, config).await?; + Self::run(on_frame, consumer, channel.1).await + } + .await; + + // Deliver one final terminal callback (code <= 0), then drop the entry. + // Pull it out from under the lock so the callback never runs while held. + let entry = State::lock().video.consumer_tasks.remove(id).flatten(); + if let Some(entry) = entry { + entry.callback.call(res); + } + }); + + Ok(id) + } + + async fn run( + callback: OnStatus, + mut consumer: moq_video::decode::Consumer, + mut close: oneshot::Receiver<()>, + ) -> Result<(), Error> { + loop { + // `biased` so a pending close always wins over a ready frame. + let frame = tokio::select! { + biased; + _ = &mut close => return Ok(()), + frame = consumer.read() => match frame? { + Some(frame) => frame, + None => return Ok(()), + }, + }; + + // Hold the lock only to buffer the frame; release it before the callback. + let frame_id = State::lock().video.frames.insert(frame)?; + callback.call(Ok(frame_id)); + } + } + + pub fn consume_close(&mut self, id: Id) -> Result<(), Error> { + // Signal shutdown; the task delivers a final callback and removes itself. 
+ self.consumer_tasks + .get_mut(id) + .and_then(|entry| entry.as_mut()) + .ok_or(Error::TrackNotFound)? + .close + .take() + .ok_or(Error::TrackNotFound)?; + Ok(()) + } + + pub fn frame_info(&self, id: Id, dst: &mut moq_video_frame) -> Result<(), Error> { + let frame = self.frames.get(id).ok_or(Error::FrameNotFound)?; + *dst = moq_video_frame { + timestamp_us: frame.timestamp_us, + width: frame.width, + height: frame.height, + data: frame.data.as_ptr(), + data_size: frame.data.len(), + }; + Ok(()) + } + + pub fn frame_free(&mut self, id: Id) -> Result<(), Error> { + self.frames.remove(id).ok_or(Error::FrameNotFound)?; + Ok(()) + } +} + +// ---- C entry points ---- + +/// Subscribe to a video track and decode it into raw I420 frames. +/// +/// The catalog `index` selects which video rendition to subscribe to, matching +/// the existing `moq_consume_video_ordered` selection model. Only H.264 is +/// supported; a non-H.264 rendition fails on the terminal callback. +/// +/// Returns a non-zero handle on success or a negative error code. +/// +/// `on_frame` is called with a positive frame id per decoded frame, then exactly +/// once more with a terminal code: `0` (closed cleanly) or a negative error. +/// After the terminal (`<= 0`) callback, `on_frame` is never called again and +/// `user_data` is never touched again, so release `user_data` there. The terminal +/// callback fires even after [`moq_consume_video_raw_close`]. +/// +/// # Safety +/// - `output` must point to a valid [`moq_video_decoder_output`]. +/// - `user_data` must stay valid until the terminal (`<= 0`) `on_frame` callback. 
+#[unsafe(no_mangle)] +pub unsafe extern "C" fn moq_consume_video_raw( + catalog: u32, + index: u32, + output: *const moq_video_decoder_output, + on_frame: Option, + user_data: *mut c_void, +) -> i32 { + ffi::enter(move || { + let catalog = ffi::parse_id(catalog)?; + let raw = unsafe { output.as_ref() }.ok_or(Error::InvalidPointer)?; + + let mut config = moq_video::decode::Config::new(); + config.latency_max = if raw.latency_max_ms == 0 { + None + } else { + Some(Duration::from_millis(raw.latency_max_ms)) + }; + let on_frame = unsafe { OnStatus::new(user_data, on_frame) }; + + let mut state = State::lock(); + let (broadcast, video_cfg, name) = state.consume.video_rendition(catalog, index as usize)?; + + let State { video, .. } = &mut *state; + video.consume(&broadcast, &video_cfg, &name, config, on_frame) + }) +} + +/// Stop a video (raw) consumer's background task. +/// +/// Returns immediately: zero on success, or a negative code if already closed. +/// Does NOT free `user_data`; the on-frame callback still fires once more with a +/// terminal `0` (or a negative error), which is where `user_data` should be +/// released. Frame ids already delivered are likewise not freed; release each +/// with [`moq_consume_video_raw_frame_free`]. +#[unsafe(no_mangle)] +pub extern "C" fn moq_consume_video_raw_close(consumer: u32) -> i32 { + ffi::enter(move || { + let consumer = ffi::parse_id(consumer)?; + State::lock().video.consume_close(consumer) + }) +} + +/// Copy a delivered frame's metadata into `dst`. +/// +/// The written `dst->data` pointer remains valid until the same `id` is released +/// with [`moq_consume_video_raw_frame_free`]. +/// +/// # Safety +/// - `dst` must point to a writable [`moq_video_frame`]. 
+#[unsafe(no_mangle)] +pub unsafe extern "C" fn moq_consume_video_raw_frame(id: u32, dst: *mut moq_video_frame) -> i32 { + ffi::enter(move || { + let id = ffi::parse_id(id)?; + let dst = unsafe { dst.as_mut() }.ok_or(Error::InvalidPointer)?; + State::lock().video.frame_info(id, dst) + }) +} + +/// Free a frame previously delivered through the consume callback. Required for +/// every delivered frame id; closing the parent consumer is not enough. +#[unsafe(no_mangle)] +pub extern "C" fn moq_consume_video_raw_frame_free(id: u32) -> i32 { + ffi::enter(move || { + let id = ffi::parse_id(id)?; + State::lock().video.frame_free(id) + }) +} diff --git a/rs/moq-video/CHANGELOG.md b/rs/moq-video/CHANGELOG.md index ba9671a29..a4f3d1658 100644 --- a/rs/moq-video/CHANGELOG.md +++ b/rs/moq-video/CHANGELOG.md @@ -7,6 +7,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- Native H.264 decode: a `decode` module mirroring `encode`, with a + `decode::Consumer` (the counterpart to `moq-audio`'s `AudioConsumer`) that + subscribes to an H.264 track and returns raw I420 frames. Backends are + VideoToolbox (macOS) and openh264 (portable software fallback); no ffmpeg. + ## [0.0.4](https://github.com/moq-dev/moq/compare/moq-video-v0.0.3...moq-video-v0.0.4) - 2026-06-16 ### Other diff --git a/rs/moq-video/Cargo.toml b/rs/moq-video/Cargo.toml index 3ec9a3a8d..6e287cc3f 100644 --- a/rs/moq-video/Cargo.toml +++ b/rs/moq-video/Cargo.toml @@ -39,6 +39,8 @@ vaapi = ["dep:moq-vaapi"] anyhow = "1" bytes = "1" cudarc = { version = "0.19", optional = true, default-features = false, features = ["driver"] } +# Catalog types (VideoConfig / VideoCodec) the decode consumer reads. +hang = { workspace = true } moq-mux = { workspace = true } moq-net = { workspace = true } # VAAPI H.264 encoder, gated behind the `vaapi` feature + a linux cfg. 
Listed here diff --git a/rs/moq-video/src/decode/backend/mod.rs b/rs/moq-video/src/decode/backend/mod.rs new file mode 100644 index 000000000..42f1557e8 --- /dev/null +++ b/rs/moq-video/src/decode/backend/mod.rs @@ -0,0 +1,81 @@ +//! Pluggable H.264 decoder backends. +//! +//! The mirror of [`encode::backend`](crate::encode). [`Backend`] is the seam +//! between the access-unit prep (avc1 -> Annex-B conversion + keyframe gating, +//! owned by [`Decoder`](super::Decoder)) and the codec itself. Every backend +//! takes one **Annex-B** H.264 access unit (SPS/PPS inline ahead of each +//! keyframe) and returns zero or more decoded [`I420`] frames. +//! +//! [`open`] picks the best backend for a [`Kind`](super::Kind), trying hardware +//! candidates (platform-gated) before the openh264 software fallback, exactly +//! like the encode side. + +use bytes::Bytes; + +use super::decoder::Kind; +use crate::Error; +use crate::frame::I420; + +mod openh264; + +#[cfg(target_os = "macos")] +mod videotoolbox; + +/// An opened H.264 decoder. Feed it Annex-B access units in decode order; get +/// back zero or more raw I420 frames (zero while the decoder is still buffering, +/// e.g. before the first keyframe's parameter sets). +pub(crate) trait Backend: Send { + /// Decode one Annex-B access unit. `keyframe` marks an IDR (parameter sets + /// are inline ahead of it). Takes an owned [`Bytes`] so a backend can split + /// out NAL slices without copying. + fn decode(&mut self, access_unit: Bytes, keyframe: bool) -> Result, Error>; + + /// The decoder name in use, e.g. `"videotoolbox"` (for logging). + fn name(&self) -> &str; +} + +/// A backend constructor: name plus an opener that tries to start it. +struct Candidate { + name: &'static str, + open: fn() -> Result, Error>, +} + +/// Hardware backends, in priority order. Platform-gated so only the ones that +/// could plausibly work on this target are even listed. 
+const HARDWARE: &[Candidate] = &[ + #[cfg(target_os = "macos")] + Candidate { + name: videotoolbox::NAME, + open: videotoolbox::VideoToolbox::open, + }, +]; + +const SOFTWARE: Candidate = Candidate { + name: openh264::NAME, + open: openh264::Openh264::open, +}; + +/// Open the best decoder for `kind`, trying candidates in priority order and +/// falling back until one succeeds. +pub(crate) fn open(kind: &Kind) -> Result, Error> { + let candidates: Vec<&Candidate> = match kind { + Kind::Auto => HARDWARE.iter().chain(std::iter::once(&SOFTWARE)).collect(), + Kind::Hardware => HARDWARE.iter().collect(), + Kind::Software => vec![&SOFTWARE], + Kind::Named(name) => { + let all = HARDWARE.iter().chain(std::iter::once(&SOFTWARE)); + all.filter(|c| c.name == name).collect() + } + }; + + let mut tried = Vec::new(); + for candidate in candidates { + tried.push(candidate.name); + match (candidate.open)() { + Ok(backend) => return Ok(backend), + Err(e) => tracing::debug!(decoder = candidate.name, error = %e, "decoder unavailable, trying next"), + } + } + + Err(Error::NoDecoder(tried.join(", "))) +} diff --git a/rs/moq-video/src/decode/backend/openh264.rs b/rs/moq-video/src/decode/backend/openh264.rs new file mode 100644 index 000000000..e0f336943 --- /dev/null +++ b/rs/moq-video/src/decode/backend/openh264.rs @@ -0,0 +1,68 @@ +//! Software H.264 decode backend via Cisco's openh264 (vendored, statically linked). +//! +//! The portable fallback when no hardware decoder is available, and the only +//! backend on Linux/Windows for now. Accepts Annex-B access units (SPS/PPS +//! inline ahead of each keyframe) and returns packed I420. 
+ +use bytes::Bytes; +use openh264::OpenH264API; +use openh264::decoder::{Decoder, DecoderConfig}; +use openh264::formats::YUVSource; + +use super::Backend; +use crate::Error; +use crate::frame::I420; + +pub(crate) const NAME: &str = "openh264"; + +pub(crate) struct Openh264 { + decoder: Decoder, +} + +impl Openh264 { + pub(crate) fn open() -> Result, Error> { + let decoder = Decoder::with_api_config(OpenH264API::from_source(), DecoderConfig::new()) + .map_err(|e| Error::Codec(anyhow::anyhow!("openh264 decoder init: {e}")))?; + + tracing::info!(decoder = NAME, "opened H.264 decoder"); + Ok(Box::new(Self { decoder })) + } +} + +impl Backend for Openh264 { + fn decode(&mut self, access_unit: Bytes, _keyframe: bool) -> Result, Error> { + let decoded = self + .decoder + .decode(&access_unit) + .map_err(|e| Error::Codec(anyhow::anyhow!("openh264 decode: {e}")))?; + + // `None` means the decoder buffered the access unit but has no picture + // yet (e.g. parameter sets only, or it needs more data). + let Some(yuv) = decoded else { + return Ok(Vec::new()); + }; + + let (width, height) = yuv.dimensions(); + if width % 2 != 0 || height % 2 != 0 { + return Err(Error::Codec(anyhow::anyhow!( + "decoded frame has odd dimensions {width}x{height}, expected 4:2:0" + ))); + } + let (y_stride, uv_stride, _) = yuv.strides(); + + let frame = I420::from_planes( + yuv.y(), + yuv.u(), + yuv.v(), + y_stride, + uv_stride, + width as u32, + height as u32, + ); + Ok(vec![frame]) + } + + fn name(&self) -> &str { + NAME + } +} diff --git a/rs/moq-video/src/decode/backend/videotoolbox.rs b/rs/moq-video/src/decode/backend/videotoolbox.rs new file mode 100644 index 000000000..bb15fa7ad --- /dev/null +++ b/rs/moq-video/src/decode/backend/videotoolbox.rs @@ -0,0 +1,344 @@ +//! Hardware H.264 decode backend via Apple VideoToolbox (`VTDecompressionSession`). +//! +//! The inverse of the encode VideoToolbox backend. We receive Annex-B access +//! 
units (SPS/PPS inline ahead of each keyframe), so we: +//! - pull SPS/PPS out of the stream and build a `CMVideoFormatDescription`, +//! (re)creating the decompression session whenever the parameter sets change; +//! - repackage the slice NALs as AVCC (4-byte length-prefixed) in a +//! `CMSampleBuffer`, the form VideoToolbox decodes; +//! - request NV12 output and download it to packed I420 (reusing the same +//! `CVPixelBuffer` download as the capture path). +//! +//! Hand-written on the raw `objc2-video-toolbox` bindings; there's no +//! higher-level crate we trust. Decoding is synchronous (no async flag), so the +//! output callback fires from within `decode_frame` before it returns, which is +//! what lets the `!Send` CoreFoundation handles stay thread-confined. + +use std::ffi::c_void; +use std::ptr::{self, NonNull}; + +use bytes::Bytes; +use moq_mux::codec::annexb::NalIterator; +use objc2_core_foundation::{CFDictionary, CFNumber, CFNumberType, CFRetained, CFString}; +use objc2_core_media::{ + CMBlockBuffer, CMFormatDescription, CMSampleBuffer, CMTime, CMVideoFormatDescriptionCreateFromH264ParameterSets, + kCMBlockBufferAssureMemoryNowFlag, +}; +use objc2_core_video::{ + CVImageBuffer, CVPixelBuffer, CVPixelBufferGetHeight, CVPixelBufferGetWidth, kCVPixelBufferPixelFormatTypeKey, + kCVPixelFormatType_420YpCbCr8BiPlanarVideoRange, +}; +use objc2_video_toolbox::{ + VTDecodeFrameFlags, VTDecodeInfoFlags, VTDecompressionOutputCallbackRecord, VTDecompressionSession, +}; + +use super::Backend; +use crate::Error; +use crate::frame::I420; + +pub(crate) const NAME: &str = "videotoolbox"; + +const NAL_TYPE_SPS: u8 = 7; +const NAL_TYPE_PPS: u8 = 8; + +/// Where the C output callback drops decoded frames, drained after each +/// `decode_frame`. Boxed so its address is a stable refcon for the session. 
+#[derive(Default)] +struct Sink { + frames: Vec, + error: Option, +} + +pub(crate) struct VideoToolbox { + /// Built lazily once the first SPS+PPS arrive, rebuilt if they change. + session: Option>, + /// Format description the current session + samples use (kept in lockstep + /// with `session`). + format: Option>, + /// Latest SPS/PPS seen, persisted across access units (a delta frame carries + /// neither). `built_from` records which pair the live session was built from, + /// so a mid-stream parameter-set change triggers a rebuild. + sps: Option, + pps: Option, + built_from: Option<(Bytes, Bytes)>, + sink: Box, +} + +// The session and its CoreFoundation handles are only ever touched from the one +// decode task (the consumer's `read` loop, single-threaded per consumer). +unsafe impl Send for VideoToolbox {} + +impl VideoToolbox { + pub(crate) fn open() -> Result, Error> { + tracing::info!(decoder = NAME, "opened H.264 decoder"); + Ok(Box::new(Self { + session: None, + format: None, + sps: None, + pps: None, + built_from: None, + sink: Box::new(Sink::default()), + })) + } + + /// (Re)build the decompression session when the parameter sets first appear + /// or change. Returns `false` if we still don't have both SPS and PPS. + fn ensure_session(&mut self, sps: Option, pps: Option) -> Result { + if let Some(sps) = sps { + self.sps = Some(sps); + } + if let Some(pps) = pps { + self.pps = Some(pps); + } + let (Some(sps), Some(pps)) = (self.sps.clone(), self.pps.clone()) else { + return Ok(false); + }; + + // Reuse the existing session if it was built from these exact sets. 
+ if self.session.is_some() && self.built_from.as_ref() == Some(&(sps.clone(), pps.clone())) { + return Ok(true); + } + + let format = create_format_description(&sps, &pps)?; + let attrs = nv12_output_attributes()?; + + let refcon = (&mut *self.sink as *mut Sink).cast::(); + let record = VTDecompressionOutputCallbackRecord { + decompressionOutputCallback: Some(output_callback), + decompressionOutputRefCon: refcon, + }; + + let mut session_ptr: *mut VTDecompressionSession = ptr::null_mut(); + let status = unsafe { + VTDecompressionSession::create( + None, + &format, + None, + Some(&attrs), + &record, + NonNull::new(&mut session_ptr).unwrap(), + ) + }; + let session = NonNull::new(session_ptr) + .filter(|_| status == 0) + .map(|p| unsafe { CFRetained::from_raw(p) }) + .ok_or_else(|| Error::Codec(anyhow::anyhow!("VTDecompressionSessionCreate failed: {status}")))?; + + self.session = Some(session); + self.format = Some(format); + self.built_from = Some((sps, pps)); + Ok(true) + } +} + +impl Backend for VideoToolbox { + fn decode(&mut self, access_unit: Bytes, _keyframe: bool) -> Result, Error> { + // Split the Annex-B access unit, pull out any parameter sets, and gather + // the VCL slices into AVCC (4-byte length-prefixed) form. `NalIterator` + // yields the parameter-set NALs as zero-copy `Bytes` (sub-slices of + // `access_unit`), so SPS/PPS need no copy. + let mut sps = None; + let mut pps = None; + let mut avcc: Vec = Vec::with_capacity(access_unit.len()); + let mut handle = |nal: Bytes| match nal_unit_type(&nal) { + NAL_TYPE_SPS => sps = Some(nal), + NAL_TYPE_PPS => pps = Some(nal), + _ => { + avcc.extend_from_slice(&(nal.len() as u32).to_be_bytes()); + avcc.extend_from_slice(&nal); + } + }; + + // `NalIterator` yields every NAL except the last (it has no trailing start + // code); `flush` returns that final one. 
+ let mut buf = access_unit; + let mut nals = NalIterator::new(&mut buf); + for nal in nals.by_ref() { + handle(nal.map_err(moq_mux::Error::from)?); + } + if let Some(nal) = nals.flush().map_err(moq_mux::Error::from)? { + handle(nal); + } + + if !self.ensure_session(sps, pps)? { + // No parameter sets yet (e.g. a delta frame before the first keyframe). + return Ok(Vec::new()); + } + if avcc.is_empty() { + // Parameter-set-only access unit: nothing to decode. + return Ok(Vec::new()); + } + + let format = self.format.as_ref().expect("format ensured above"); + let sample = make_sample_buffer(&avcc, format)?; + let session = self.session.as_ref().expect("session ensured above"); + + self.sink.frames.clear(); + self.sink.error = None; + + let status = unsafe { session.decode_frame(&sample, VTDecodeFrameFlags(0), ptr::null_mut(), ptr::null_mut()) }; + if status != 0 { + return Err(Error::Codec(anyhow::anyhow!( + "VTDecompressionSessionDecodeFrame failed: {status}" + ))); + } + + if let Some(error) = self.sink.error.take() { + return Err(Error::Codec(anyhow::anyhow!( + "VideoToolbox decode callback failed: {error}" + ))); + } + Ok(std::mem::take(&mut self.sink.frames)) + } + + fn name(&self) -> &str { + NAME + } +} + +/// C callback VideoToolbox invokes (synchronously, from `decode_frame`) for each +/// decoded frame. Downloads the NV12 pixel buffer to packed I420. +unsafe extern "C-unwind" fn output_callback( + refcon: *mut c_void, + _source_frame_refcon: *mut c_void, + status: i32, + _flags: VTDecodeInfoFlags, + image_buffer: *mut CVImageBuffer, + _pts: CMTime, + _duration: CMTime, +) { + let sink = unsafe { &mut *(refcon as *mut Sink) }; + if status != 0 { + sink.error = Some(format!("decode status {status}")); + return; + } + let Some(image) = NonNull::new(image_buffer) else { + return; // dropped frame + }; + + // The decoded image buffer is a CVPixelBuffer; retain it (the callback only + // borrows) and download NV12 -> I420 with the shared capture-path code. 
+ let pixel_buffer = unsafe { CFRetained::retain(image.cast::()) }; + let width = CVPixelBufferGetWidth(&pixel_buffer) as u32; + let height = CVPixelBufferGetHeight(&pixel_buffer) as u32; + let surface = crate::frame::macos::Surface::new(pixel_buffer, width, height); + + match surface.download_i420() { + Ok(i420) => sink.frames.push(i420), + Err(e) => sink.error = Some(e.to_string()), + } +} + +/// Build a `CMVideoFormatDescription` from raw SPS and PPS NAL units. +fn create_format_description(sps: &[u8], pps: &[u8]) -> Result, Error> { + let pointers: [NonNull; 2] = [ + NonNull::new(sps.as_ptr() as *mut u8).ok_or_else(|| Error::Codec(anyhow::anyhow!("empty SPS")))?, + NonNull::new(pps.as_ptr() as *mut u8).ok_or_else(|| Error::Codec(anyhow::anyhow!("empty PPS")))?, + ]; + let sizes: [usize; 2] = [sps.len(), pps.len()]; + + let mut format_ptr: *const CMFormatDescription = ptr::null(); + let status = unsafe { + CMVideoFormatDescriptionCreateFromH264ParameterSets( + None, + 2, + NonNull::new(pointers.as_ptr() as *mut NonNull).unwrap(), + NonNull::new(sizes.as_ptr() as *mut usize).unwrap(), + 4, // 4-byte NAL length prefixes (AVCC), matching make_sample_buffer + NonNull::new(&mut format_ptr).unwrap(), + ) + }; + NonNull::new(format_ptr as *mut CMFormatDescription) + .filter(|_| status == 0) + .map(|p| unsafe { CFRetained::from_raw(p) }) + .ok_or_else(|| { + Error::Codec(anyhow::anyhow!( + "CMVideoFormatDescriptionCreateFromH264ParameterSets failed: {status}" + )) + }) +} + +/// Wrap an AVCC (length-prefixed) access unit in a `CMSampleBuffer` for decode. +/// The block buffer owns a fresh copy of the bytes, so the sample outlives `avcc`. 
+fn make_sample_buffer(avcc: &[u8], format: &CMFormatDescription) -> Result, Error> { + let mut block_ptr: *mut CMBlockBuffer = ptr::null_mut(); + let status = unsafe { + CMBlockBuffer::create_with_memory_block( + None, + ptr::null_mut(), + avcc.len(), + None, + ptr::null(), + 0, + avcc.len(), + kCMBlockBufferAssureMemoryNowFlag, + NonNull::new(&mut block_ptr).unwrap(), + ) + }; + let block = NonNull::new(block_ptr) + .filter(|_| status == 0) + .map(|p| unsafe { CFRetained::from_raw(p) }) + .ok_or_else(|| Error::Codec(anyhow::anyhow!("CMBlockBufferCreateWithMemoryBlock failed: {status}")))?; + + let status = unsafe { + CMBlockBuffer::replace_data_bytes( + NonNull::new(avcc.as_ptr() as *mut c_void).unwrap(), + &block, + 0, + avcc.len(), + ) + }; + if status != 0 { + return Err(Error::Codec(anyhow::anyhow!( + "CMBlockBufferReplaceDataBytes failed: {status}" + ))); + } + + let sizes: [usize; 1] = [avcc.len()]; + let mut sample_ptr: *mut CMSampleBuffer = ptr::null_mut(); + let status = unsafe { + CMSampleBuffer::create_ready( + None, + Some(&block), + Some(format), + 1, + 0, + ptr::null(), + 1, + sizes.as_ptr(), + NonNull::new(&mut sample_ptr).unwrap(), + ) + }; + NonNull::new(sample_ptr) + .filter(|_| status == 0) + .map(|p| unsafe { CFRetained::from_raw(p) }) + .ok_or_else(|| Error::Codec(anyhow::anyhow!("CMSampleBufferCreateReady failed: {status}"))) +} + +/// Build the destination attributes requesting NV12 output, so the download path +/// (which expects NV12) always gets it regardless of the decoder's native format. 
/// Extract the NAL unit type: the low five bits of the first byte of `nal`.
/// An empty slice yields `0`.
fn nal_unit_type(nal: &[u8]) -> u8 {
    match nal {
        [header, ..] => header & 0x1f,
        [] => 0,
    }
}
+ pending: VecDeque, +} + +impl Consumer { + /// Subscribe to `name` in `broadcast`, decoding it per the catalog entry. + /// Errors if the rendition isn't H.264. + pub async fn new( + broadcast: &moq_net::BroadcastConsumer, + catalog: &VideoConfig, + name: impl Into, + config: Config, + ) -> Result { + let decoder = Decoder::new(catalog, &config.kind)?; + + let name = name.into(); + let track = broadcast.track(&name)?.subscribe(None)?.await?; + let mut track = moq_mux::container::Consumer::new(track, moq_mux::container::legacy::Wire); + if let Some(latency) = config.latency_max { + track = track.with_latency(latency); + } + + Ok(Self { + decoder, + track, + pending: VecDeque::new(), + }) + } + + /// The decoder backend name in use, e.g. `"videotoolbox"` or `"openh264"`. + pub fn name(&self) -> &str { + self.decoder.name() + } + + /// Read the next decoded I420 frame, or `None` when the track ends. + pub async fn read(&mut self) -> Result, Error> { + loop { + if let Some(frame) = self.pending.pop_front() { + return Ok(Some(frame)); + } + + let Some(mux_frame) = self.track.read().await? else { + return Ok(None); + }; + let timestamp_us: u64 = mux_frame + .timestamp + .as_micros() + .try_into() + .map_err(|_| moq_net::TimeOverflow)?; + + for i420 in self.decoder.decode(&mux_frame.payload, mux_frame.keyframe)? { + self.pending.push_back(Frame { + timestamp_us, + width: i420.width, + height: i420.height, + data: Bytes::from(i420.data), + }); + } + } + } +} diff --git a/rs/moq-video/src/decode/decoder.rs b/rs/moq-video/src/decode/decoder.rs new file mode 100644 index 000000000..6efb4c253 --- /dev/null +++ b/rs/moq-video/src/decode/decoder.rs @@ -0,0 +1,190 @@ +//! H.264 decoder front end. +//! +//! Prepares each container frame for a [`Backend`](super::backend::Backend): +//! converts avc1 (length-prefixed, out-of-band avcC) payloads to Annex-B and +//! injects the SPS/PPS ahead of keyframes, leaving avc3 (already Annex-B inline) +//! payloads untouched. 
/// Selects the decoder implementation. Marked `#[non_exhaustive]` so further
/// selection strategies can be added without breaking external `match`es.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
#[non_exhaustive]
pub enum Kind {
    /// Try a platform hardware decoder first and fall back to software.
    #[default]
    Auto,
    /// Hardware decoders only; it is an error if none is available.
    Hardware,
    /// The software (openh264) decoder only.
    Software,
    /// One specific backend selected by name, e.g. `"videotoolbox"` or `"openh264"`.
    Named(String),
}

/// Settings for the decoder.
///
/// Marked `#[non_exhaustive]`: obtain a value from [`Config::new`] (or
/// `default()`) and then assign the optional fields, so that options added
/// later do not break callers.
#[derive(Clone, Debug, Default)]
#[non_exhaustive]
pub struct Config {
    /// The backend selection strategy.
    pub kind: Kind,
    /// Upper bound on buffering before a stalled group is skipped. `None` keeps
    /// the moq-mux default (skip aggressively); set it to your playout buffer
    /// for a softer skip. Forwarded to the container consumer's `with_latency`.
    pub latency_max: Option<Duration>,
}

impl Config {
    /// The default configuration: automatic backend selection and default latency.
    pub fn new() -> Self {
        Self {
            kind: Kind::default(),
            latency_max: None,
        }
    }
}
+ Avc1 { length_size: usize, keyframe_prefix: Bytes }, +} + +/// Drives a [`Backend`] from container frames. +pub(crate) struct Decoder { + backend: Box, + conversion: Conversion, + got_keyframe: bool, +} + +impl Decoder { + /// Build a decoder for the catalog's video config. Errors if the codec is not + /// H.264 (the only codec the native backends support). + pub(crate) fn new(catalog: &VideoConfig, kind: &Kind) -> Result { + let VideoCodec::H264(h264) = &catalog.codec else { + return Err(Error::UnsupportedCodec(catalog.codec.to_string())); + }; + + let conversion = if h264.inline { + Conversion::Annexb + } else { + let avcc = catalog + .description + .as_ref() + .ok_or_else(|| Error::Codec(anyhow::anyhow!("avc1 H.264 track is missing its avcC description")))?; + let params = h264::parse_avcc_param_sets(avcc).map_err(moq_mux::Error::from)?; + let keyframe_prefix = annexb::build_prefix(params.sps.iter().chain(params.pps.iter())); + Conversion::Avc1 { + length_size: params.length_size, + keyframe_prefix, + } + }; + + let backend = backend::open(kind)?; + tracing::debug!(decoder = backend.name(), "opened video decoder"); + Ok(Self { + backend, + conversion, + got_keyframe: false, + }) + } + + /// The decoder backend name in use, e.g. `"videotoolbox"`. + pub(crate) fn name(&self) -> &str { + self.backend.name() + } + + /// Decode one container frame, returning zero or more raw I420 frames. + pub(crate) fn decode(&mut self, payload: &Bytes, keyframe: bool) -> Result, Error> { + // Wait for the first keyframe: a decoder started mid-GOP can't decode + // delta frames, and the parameter sets ride along with the keyframe. + if !self.got_keyframe { + if !keyframe { + return Ok(Vec::new()); + } + self.got_keyframe = true; + } + + let annexb = match &self.conversion { + // Cheap refcount bump; the backend splits NAL slices off this buffer. 
+ Conversion::Annexb => payload.clone(), + Conversion::Avc1 { + length_size, + keyframe_prefix, + } => { + let prefix = keyframe.then(|| keyframe_prefix.as_ref()); + annexb::from_length_prefixed(payload, *length_size, prefix).map_err(moq_mux::Error::from)? + } + }; + + self.backend.decode(annexb, keyframe) + } +} + +#[cfg(test)] +mod tests { + use super::backend; + use crate::encode::{Config as EncodeConfig, Encoder, Kind as EncodeKind}; + + /// A mid-gray RGBA frame: encodable without a camera. + fn gray_rgba(width: u32, height: u32) -> Vec { + vec![0x80u8; width as usize * height as usize * 4] + } + + /// Encode synthetic frames to Annex-B, decode them back, and assert the + /// backend hands us a correctly-sized I420 picture. Exercises the whole + /// software path (openh264 encode -> openh264 decode), keyframe gating + /// included (the first packet is an IDR with inline SPS/PPS). + fn round_trip(decode_kind: &super::Kind, expect_name: &str) { + let mut encoder = Encoder::new(&EncodeConfig { + kind: EncodeKind::Software, + ..EncodeConfig::new(320, 240, 30) + }) + .expect("openh264 encoder"); + + let mut decoder = backend::open(decode_kind).expect("decoder available"); + assert_eq!(decoder.name(), expect_name); + + let frame = gray_rgba(320, 240); + let mut decoded = Vec::new(); + for i in 0..10 { + let keyframe = i == 0; + for packet in encoder.encode_rgba(&frame, 320, 240, keyframe).unwrap() { + decoded.extend(decoder.decode(packet, keyframe).unwrap()); + } + } + + assert!(!decoded.is_empty(), "decoder produced no frames"); + for i420 in &decoded { + assert_eq!(i420.width, 320); + assert_eq!(i420.height, 240); + // Tightly-packed I420: luma + two quarter-size chroma planes. 
+ assert_eq!(i420.data.len(), 320 * 240 * 3 / 2); + } + } + + #[test] + fn openh264_round_trip() { + round_trip(&super::Kind::Software, "openh264"); + } + + #[cfg(target_os = "macos")] + #[test] + fn videotoolbox_round_trip() { + round_trip(&super::Kind::Named("videotoolbox".into()), "videotoolbox"); + } +} diff --git a/rs/moq-video/src/decode/mod.rs b/rs/moq-video/src/decode/mod.rs new file mode 100644 index 000000000..594edaa75 --- /dev/null +++ b/rs/moq-video/src/decode/mod.rs @@ -0,0 +1,35 @@ +//! Subscribe to an H.264 track and decode it to raw frames. +//! +//! The decode counterpart to [`encode`](crate::encode), and the mirror of +//! `moq-audio`'s `AudioConsumer`. [`Consumer`] subscribes to a moq-mux H.264 +//! track and hands back decoded [`Frame`]s; a native backend does the work +//! (VideoToolbox on macOS, openh264 everywhere as the software fallback). +//! +//! Only H.264 is supported: it's symmetric with what [`encode`](crate::encode) +//! produces. A non-H.264 rendition yields [`Error::UnsupportedCodec`](crate::Error). + +use bytes::Bytes; + +mod backend; +mod consumer; +mod decoder; + +pub use consumer::Consumer; +pub use decoder::{Config, Kind}; + +/// A decoded raw video frame in tightly-packed I420 (YUV 4:2:0), BT.601 limited +/// range (studio swing, what H.264 carries by default). +/// +/// `data` holds the three planes contiguously: Y (`width * height` bytes), then +/// U, then V (`width/2 * height/2` bytes each), no row padding. `width` and +/// `height` are even. +pub struct Frame { + /// Presentation timestamp in microseconds (from the container). + pub timestamp_us: u64, + /// Frame width in pixels (even). + pub width: u32, + /// Frame height in pixels (even). + pub height: u32, + /// Packed I420 plane data (Y, then U, then V). 
+ pub data: Bytes, +} diff --git a/rs/moq-video/src/encode/mod.rs b/rs/moq-video/src/encode/mod.rs index 435dbac08..e48c528a4 100644 --- a/rs/moq-video/src/encode/mod.rs +++ b/rs/moq-video/src/encode/mod.rs @@ -11,8 +11,8 @@ //! - [`Producer`] alone publishes packets you already encoded. //! //! [`Options`] / [`Kind`] / [`Config`] configure them. The decode/consume -//! counterpart (mirror of `moq-audio`'s consumer) will land in a sibling -//! `decode` module. +//! counterpart (mirror of `moq-audio`'s consumer) lives in the sibling +//! [`decode`](crate::decode) module. mod backend; mod encoder; diff --git a/rs/moq-video/src/error.rs b/rs/moq-video/src/error.rs index 1135bd772..33c5bc810 100644 --- a/rs/moq-video/src/error.rs +++ b/rs/moq-video/src/error.rs @@ -7,6 +7,15 @@ pub enum Error { #[error("no usable video encoder found (tried: {0})")] NoEncoder(String), + /// No decoder matching the requested hardware preference could be opened + /// (none compiled in, or none available on this machine). + #[error("no usable H.264 decoder found (tried: {0})")] + NoDecoder(String), + + /// A track's codec is not H.264, the only codec the native decoders support. + #[error("unsupported codec for native decode: {0} (only H.264 is supported)")] + UnsupportedCodec(String), + /// The configured framerate was zero (would divide by zero / produce a /// degenerate codec time base). #[error("invalid framerate: {0} (must be non-zero)")] diff --git a/rs/moq-video/src/frame.rs b/rs/moq-video/src/frame.rs index 5bf441cb4..c42269c03 100644 --- a/rs/moq-video/src/frame.rs +++ b/rs/moq-video/src/frame.rs @@ -97,6 +97,38 @@ impl I420 { Ok(Self::pack(&planar, width, height)) } + /// Pack strided Y/U/V planes (4:2:0, full-size luma, half-size chroma) into a + /// tightly-packed I420 buffer. `y_stride` / `uv_stride` are the source row + /// strides, which a decoder may pad wider than the visible width. Used by the + /// software H.264 decode backend, whose `DecodedYUV` exposes strided planes. 
+ /// Width and height must be even (4:2:0 chroma). + pub(crate) fn from_planes( + y: &[u8], + u: &[u8], + v: &[u8], + y_stride: usize, + uv_stride: usize, + width: u32, + height: u32, + ) -> Self { + let (w, h) = (width as usize, height as usize); + let (cw, ch) = (w / 2, h / 2); + + let mut data = vec![0u8; Self::len(width, height)]; + let (luma, chroma) = data.split_at_mut(w * h); + let (u_dst, v_dst) = chroma.split_at_mut(cw * ch); + + for row in 0..h { + luma[row * w..row * w + w].copy_from_slice(&y[row * y_stride..row * y_stride + w]); + } + for row in 0..ch { + u_dst[row * cw..row * cw + cw].copy_from_slice(&u[row * uv_stride..row * uv_stride + cw]); + v_dst[row * cw..row * cw + cw].copy_from_slice(&v[row * uv_stride..row * uv_stride + cw]); + } + + Self { width, height, data } + } + /// Convert tightly-packed RGB (`width * height * 3` bytes) to I420, BT.601 /// limited range. Used for MJPEG capture (Linux V4L2), which decodes to RGB. #[cfg(target_os = "linux")] diff --git a/rs/moq-video/src/lib.rs b/rs/moq-video/src/lib.rs index a28f544f3..1412a8eb3 100644 --- a/rs/moq-video/src/lib.rs +++ b/rs/moq-video/src/lib.rs @@ -17,20 +17,22 @@ //! front, but the camera opens only while a subscriber is watching and is //! released when the last one leaves. //! - [`encode::Producer`] publishes packets you encoded yourself. -//! -//! The decode/consume side (the mirror of `moq-audio`'s `AudioConsumer`) is -//! not implemented yet; native subscribers can keep using `moq_mux` directly. +//! - [`decode`] subscribes to an H.264 track and decodes it to raw I420 frames +//! with a native backend (VideoToolbox / openh264). [`decode::Consumer`] is the +//! mirror of `moq-audio`'s `AudioConsumer`. //! //! ## API stability //! //! The public API is codec-agnostic: no public type, signature, or error //! variant names a backend (openh264 / VideoToolbox / NVENC) or a capture -//! implementation. [`encode::Encoder`] takes raw RGBA bytes, and the camera -//! 
capture path stays internal. So swapping or bumping any backend crate is not -//! a breaking change for consumers. Config structs are `#[non_exhaustive]`: -//! build them via `default()`/`new()` and set fields, so new options stay additive. +//! implementation. [`encode::Encoder`] takes raw RGBA bytes, [`decode::Consumer`] +//! returns raw I420, and the camera capture path stays internal. So swapping or +//! bumping any backend crate is not a breaking change for consumers. Config +//! structs are `#[non_exhaustive]`: build them via `default()`/`new()` and set +//! fields, so new options stay additive. pub mod capture; +pub mod decode; pub mod encode; mod error;