Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changeset/frame-metadata-user-data-addition.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
livekit: minor
livekit-ffi: minor
livekit-protocol: minor
---

Add `user_data` support to frame metadata, allowing arbitrary application-supplied bytes to be attached to a video frame via the `PTF_USER_DATA` packet trailer feature.
32 changes: 28 additions & 4 deletions examples/local_video/src/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ mod argus;
mod codec_display;
mod test_pattern;
mod timestamp_burn;
mod user_data;
mod video_display;
mod viewport_aspect;

Expand Down Expand Up @@ -250,6 +251,13 @@ struct Args {
#[arg(long, default_value_t = false)]
attach_frame_id: bool,

/// Attach keyboard-controlled 6-channel data (6x int16 fixed-point, 12 bytes)
/// as the per-frame user_data trailer field. Control the channels from the
/// preview window: Q/A=CH1, W/S=CH2, E/D=CH3, R/F=CH4, T/G=CH5, Y/H=CH6.
/// Requires --display-video (the window provides keyboard focus).
#[arg(long, default_value_t = false, requires = "display_video")]
attach_user_data: bool,

/// Open a window that displays the video frames being published
#[arg(long, default_value_t = false)]
display_video: bool,
Expand Down Expand Up @@ -1181,6 +1189,7 @@ async fn run(args: Args, ctrl_c_received: Arc<AtomicBool>) -> Result<()> {
let mut frame_metadata_features = FrameMetadataFeatures::default();
frame_metadata_features.user_timestamp = args.attach_timestamp;
frame_metadata_features.frame_id = args.attach_frame_id;
frame_metadata_features.user_data = args.attach_user_data;

let publish_opts = |codec: VideoCodec| TrackPublishOptions {
source: TrackSource::Camera,
Expand Down Expand Up @@ -1222,6 +1231,11 @@ async fn run(args: Args, ctrl_c_received: Arc<AtomicBool>) -> Result<()> {
display_timing: args.display_timing,
};

// Shared keyboard-controlled channel values, written by the preview window
// and read by the capture loop to fill the user_data trailer.
let user_data_channels =
args.attach_user_data.then(|| Arc::new(Mutex::new([0.0f32; user_data::NUM_CHANNELS])));

let publish_stats_task =
tokio::spawn(update_publisher_video_stats(track.clone(), ctrl_c_received.clone()));

Expand All @@ -1235,6 +1249,7 @@ async fn run(args: Args, ctrl_c_received: Arc<AtomicBool>) -> Result<()> {
session,
width,
height,
user_data_channels.clone(),
)
.await;
let _ = publish_stats_task.await;
Expand Down Expand Up @@ -1264,13 +1279,15 @@ async fn run(args: Args, ctrl_c_received: Arc<AtomicBool>) -> Result<()> {
height,
Some(shared.clone()),
publish_timing_state.clone(),
user_data_channels.clone(),
));

let display_result = video_display::run_display(
"LiveKit Video Publisher",
shared,
ctrl_c_received.clone(),
Some(width as f32 / height as f32),
user_data_channels.clone(),
);

let capture_result = capture_task.await?;
Expand All @@ -1289,6 +1306,7 @@ async fn run(args: Args, ctrl_c_received: Arc<AtomicBool>) -> Result<()> {
height,
None,
publish_timing_state.clone(),
user_data_channels.clone(),
)
.await;
let _ = publish_stats_task.await;
Expand All @@ -1310,6 +1328,7 @@ async fn run_capture_loop(
height: u32,
display_shared: Option<Arc<Mutex<SharedYuv>>>,
publish_timing_state: Option<Arc<Mutex<PublisherTimingState>>>,
user_data_channels: Option<Arc<Mutex<[f32; user_data::NUM_CHANNELS]>>>,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion(non-blocking): Calling this data channel might imply this is related to WebRTC data channels, not frame metadata.

) -> Result<()> {
// Pace publishing at the requested FPS (not the camera-reported FPS) to hit desired cadence
let pace_fps = config.fps as f64;
Expand Down Expand Up @@ -1573,8 +1592,10 @@ async fn run_capture_loop(
if burned_timestamp_us.is_some() {
debug_assert_eq!(burned_timestamp_us, Some(capture_wall_time_us));
}
frame.frame_metadata = if user_ts.is_some() || fid.is_some() {
Some(FrameMetadata { user_timestamp: user_ts, frame_id: fid })
let user_data =
user_data_channels.as_ref().map(|targets| user_data::encode(&targets.lock()));
frame.frame_metadata = if user_ts.is_some() || fid.is_some() || user_data.is_some() {
Some(FrameMetadata { user_timestamp: user_ts, frame_id: fid, user_data })
} else {
None
};
Expand Down Expand Up @@ -1702,6 +1723,7 @@ async fn run_argus_capture_loop(
session: argus::ArgusCaptureSession,
width: u32,
height: u32,
user_data_channels: Option<Arc<Mutex<[f32; user_data::NUM_CHANNELS]>>>,
) -> Result<()> {
let capture_handle = std::thread::Builder::new()
.name("mipi-capture".into())
Expand Down Expand Up @@ -1818,8 +1840,10 @@ async fn run_argus_capture_loop(
} else {
None
};
let frame_metadata = if user_ts.is_some() || fid.is_some() {
Some(FrameMetadata { user_timestamp: user_ts, frame_id: fid })
let user_data =
user_data_channels.as_ref().map(|targets| user_data::encode(&targets.lock()));
let frame_metadata = if user_ts.is_some() || fid.is_some() || user_data.is_some() {
Some(FrameMetadata { user_timestamp: user_ts, frame_id: fid, user_data })
} else {
None
};
Expand Down
110 changes: 105 additions & 5 deletions examples/local_video/src/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use livekit_api::access_token;
use log::{debug, info};
use parking_lot::Mutex;
use std::{
collections::HashMap,
collections::{HashMap, VecDeque},
env,
sync::OnceLock,
sync::{
Expand All @@ -25,6 +25,7 @@ use std::{

mod codec_display;
mod subscriber_timing;
mod user_data;
mod viewport_aspect;

use codec_display::{codec_from_mime, codec_with_implementation};
Expand Down Expand Up @@ -946,7 +947,7 @@ async fn handle_track_subscribed(
if drained_frames > 0 {
debug!("Dropped {drained_frames} stale decoded frames before render upload");
}
if let Some(metadata) = frame.frame_metadata {
if let Some(metadata) = &frame.frame_metadata {
if let Some(capture_timestamp_us) = metadata.user_timestamp {
subscriber_timing_sink.record_frame_received_by_sink(
capture_timestamp_us,
Expand Down Expand Up @@ -1120,6 +1121,80 @@ fn subscriber_overlay_lines(
Some(lines)
}

/// Render a live line graph of the six decoded channel values (top-right overlay).
/// Each trace is normalized so ±`VALUE_RANGE` spans the plot height.
fn paint_channel_graph(ctx: &egui::Context, history: &VecDeque<[f32; user_data::NUM_CHANNELS]>) {
if history.is_empty() {
return;
}
let latest = *history.back().unwrap();

egui::Area::new("channel_graph".into())
.anchor(egui::Align2::RIGHT_TOP, egui::vec2(-10.0, 10.0))
.interactable(false)
.show(ctx, |ui| {
egui::Frame::NONE
.fill(egui::Color32::from_black_alpha(180))
.corner_radius(egui::CornerRadius::same(4))
.inner_margin(egui::Margin::same(8))
.show(ui, |ui| {
let plot_size = egui::vec2(360.0, 160.0);
// Pin the panel to the plot width so the legend wraps within it
// instead of stretching the frame wider than the graph.
ui.set_max_width(plot_size.x);

ui.label(
egui::RichText::new("user_data channels")
.monospace()
.size(12.0)
.color(egui::Color32::WHITE),
);

let (rect, _) = ui.allocate_exact_size(plot_size, egui::Sense::hover());
let painter = ui.painter_at(rect);

// Zero axis.
painter.hline(
rect.x_range(),
rect.center().y,
egui::Stroke::new(1.0, egui::Color32::from_gray(90)),
);

let n = history.len();
let denom = (n.saturating_sub(1)).max(1) as f32;
let half_h = rect.height() / 2.0 - 2.0;
for j in 0..user_data::NUM_CHANNELS {
let points: Vec<egui::Pos2> = history
.iter()
.enumerate()
.map(|(i, sample)| {
let x = rect.left() + (i as f32 / denom) * rect.width();
let norm = (sample[j] / user_data::VALUE_RANGE).clamp(-1.0, 1.0);
let y = rect.center().y - norm * half_h;
egui::pos2(x, y)
})
.collect();
painter.add(egui::Shape::line(
points,
egui::Stroke::new(1.5, CHANNEL_COLORS[j]),
));
}

// Legend: current value per channel.
ui.horizontal_wrapped(|ui| {
for (j, value) in latest.iter().enumerate() {
ui.label(
egui::RichText::new(format!("CH{}: {:>+6.2}", j + 1, value))
.monospace()
.size(11.0)
.color(CHANNEL_COLORS[j]),
);
}
});
});
});
}

fn paint_subscriber_overlay(ctx: &egui::Context, lines: &[String]) {
egui::Area::new("subscriber_overlay".into())
.anchor(egui::Align2::LEFT_TOP, egui::vec2(10.0, 10.0))
Expand Down Expand Up @@ -1182,6 +1257,19 @@ fn handle_track_unpublished(
clear_hud_and_simulcast(shared, frame_slot, video_size, simulcast, subscriber_timing);
}

/// Number of channel samples retained for the live graph (~10s at 30fps).
const CHANNEL_HISTORY_LEN: usize = 300;

/// Distinct colors for the six channel traces.
const CHANNEL_COLORS: [egui::Color32; user_data::NUM_CHANNELS] = [
egui::Color32::from_rgb(0xef, 0x53, 0x50), // red
egui::Color32::from_rgb(0xff, 0xa7, 0x26), // orange
egui::Color32::from_rgb(0xff, 0xee, 0x58), // yellow
egui::Color32::from_rgb(0x66, 0xbb, 0x6a), // green
egui::Color32::from_rgb(0x42, 0xa5, 0xf5), // blue
egui::Color32::from_rgb(0xab, 0x47, 0xbc), // purple
];

struct VideoApp {
shared: Arc<Mutex<SharedYuv>>,
frame_slot: Arc<LatestRenderFrameSlot>,
Expand All @@ -1192,6 +1280,8 @@ struct VideoApp {
ctrl_c_received: Arc<AtomicBool>,
viewport: AspectConstrainedViewport,
display_timestamp: bool,
/// Rolling history of decoded channel values from the user_data trailer.
channel_history: VecDeque<[f32; user_data::NUM_CHANNELS]>,
}

impl eframe::App for VideoApp {
Expand All @@ -1208,14 +1298,21 @@ impl eframe::App for VideoApp {

let render_frame = self.frame_slot.take();
if let Some(frame) = render_frame.as_ref() {
if let Some(metadata) = frame.frame_metadata {
if let Some(metadata) = &frame.frame_metadata {
if let Some(capture_timestamp_us) = metadata.user_timestamp {
self.subscriber_timing.record_frame_selected_for_render(
capture_timestamp_us,
metadata.frame_id,
current_timestamp_us(),
);
}
// Decode the 6 user_data channel values for the live graph.
if let Some(values) = metadata.user_data.as_deref().and_then(user_data::decode) {
if self.channel_history.len() >= CHANNEL_HISTORY_LEN {
self.channel_history.pop_front();
}
self.channel_history.push_back(values);
}
}
}

Expand Down Expand Up @@ -1255,6 +1352,8 @@ impl eframe::App for VideoApp {
paint_subscriber_overlay(ctx, lines);
}

paint_channel_graph(ctx, &self.channel_history);

// Simulcast layer controls: bottom-left overlay
egui::Area::new("simulcast_controls".into())
.anchor(egui::Align2::LEFT_BOTTOM, egui::vec2(10.0, -10.0))
Expand Down Expand Up @@ -1450,6 +1549,7 @@ async fn run(args: Args, ctrl_c_received: Arc<AtomicBool>) -> Result<()> {
ctrl_c_received: ctrl_c_received.clone(),
viewport,
display_timestamp: args.display_timestamp,
channel_history: VecDeque::with_capacity(CHANNEL_HISTORY_LEN),
};
let native_options = viewport_aspect::native_options(None);
eframe::run_native(
Expand Down Expand Up @@ -1797,8 +1897,8 @@ impl CallbackTrait for YuvPaintCallback {

let frame_for_upload = self.render_frame.lock().take().map(|frame| {
let prepare_timestamp_us = current_timestamp_us();
let frame_id = frame.frame_metadata.and_then(|m| m.frame_id);
let sample = frame.frame_metadata.and_then(|metadata| {
let frame_id = frame.frame_metadata.as_ref().and_then(|m| m.frame_id);
let sample = frame.frame_metadata.as_ref().and_then(|metadata| {
metadata.user_timestamp.map(|capture_timestamp_us| PendingPaintSample {
frame_id,
capture_timestamp_us,
Expand Down
81 changes: 81 additions & 0 deletions examples/local_video/src/user_data.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
//! Shared 6-channel codec for the `--attach-user-data` demo.
//!
//! Six channel values are encoded as little-endian `int16` fixed-point, 2 bytes
//! per channel = 12 bytes total, and shipped in the `user_data` frame-metadata
//! trailer field. The full `int16` range maps to `±VALUE_RANGE`, giving
//! ~1/32767 of the range in resolution — well within the ~232-byte trailer
//! budget.
//!
//! Both the `publisher` and `subscriber` binaries include this file via
//! `mod user_data;` so they agree on the wire format.

/// Number of channels carried in the user_data payload.
pub const NUM_CHANNELS: usize = 6;

/// Encoded payload size in bytes (2 bytes per channel).
pub const ENCODED_LEN: usize = NUM_CHANNELS * 2;

/// Value that maps to `i16::MAX`. Channel values are normalized to
/// `±VALUE_RANGE` before quantization.
pub const VALUE_RANGE: f32 = 1.0;

/// Value units per `int16` step.
fn scale() -> f32 {
VALUE_RANGE / i16::MAX as f32
}

/// Clamp a channel value to the encodable `±VALUE_RANGE` range.
pub fn clamp_value(value: f32) -> f32 {
value.clamp(-VALUE_RANGE, VALUE_RANGE)
}

/// Encode 6 channel values into 12 little-endian `int16` bytes.
pub fn encode(values: &[f32; NUM_CHANNELS]) -> Vec<u8> {
let s = scale();
let mut buf = Vec::with_capacity(ENCODED_LEN);
for &v in values {
let q = (v / s).round().clamp(i16::MIN as f32, i16::MAX as f32) as i16;
buf.extend_from_slice(&q.to_le_bytes());
}
buf
}

/// Decode 6 channel values from the `user_data` payload. Returns `None` if the
/// buffer is too short to hold all six values.
pub fn decode(buf: &[u8]) -> Option<[f32; NUM_CHANNELS]> {
if buf.len() < ENCODED_LEN {
return None;
}
let s = scale();
let mut out = [0.0f32; NUM_CHANNELS];
for (i, chunk) in buf.chunks_exact(2).take(NUM_CHANNELS).enumerate() {
out[i] = i16::from_le_bytes([chunk[0], chunk[1]]) as f32 * s;
}
Some(out)
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn round_trips_within_quantization_error() {
let values = [0.0, 0.5, -0.75, 1.0, -0.1, 0.9];
let decoded = decode(&encode(&values)).unwrap();
for (v, d) in values.iter().zip(decoded.iter()) {
assert!((v - d).abs() <= scale(), "got {d}, expected ~{v}");
}
}

#[test]
fn clamp_keeps_within_range() {
assert_eq!(clamp_value(100.0), VALUE_RANGE);
assert_eq!(clamp_value(-100.0), -VALUE_RANGE);
assert_eq!(clamp_value(0.5), 0.5);
}

#[test]
fn decode_rejects_short_buffer() {
assert!(decode(&[0u8; ENCODED_LEN - 1]).is_none());
}
}
Loading
Loading