Skip to content

Commit f5f0f1b

Browse files
committed
update local video example to publish user_data
1 parent 7d2ade7 commit f5f0f1b

4 files changed

Lines changed: 298 additions & 9 deletions

File tree

examples/local_video/src/publisher.rs

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ use yuv_sys;
3333
mod argus;
3434
mod codec_display;
3535
mod test_pattern;
36+
mod user_data;
3637
mod timestamp_burn;
3738
mod video_display;
3839
mod viewport_aspect;
@@ -250,6 +251,13 @@ struct Args {
250251
#[arg(long, default_value_t = false)]
251252
attach_frame_id: bool,
252253

254+
/// Attach keyboard-controlled 6-channel data (6x int16 fixed-point, 12 bytes)
255+
/// as the per-frame user_data trailer field. Control the channels from the
256+
/// preview window: Q/A=CH1, W/S=CH2, E/D=CH3, R/F=CH4, T/G=CH5, Y/H=CH6.
257+
/// Requires --display-video (the window provides keyboard focus).
258+
#[arg(long, default_value_t = false, requires = "display_video")]
259+
attach_user_data: bool,
260+
253261
/// Open a window that displays the video frames being published
254262
#[arg(long, default_value_t = false)]
255263
display_video: bool,
@@ -1181,6 +1189,7 @@ async fn run(args: Args, ctrl_c_received: Arc<AtomicBool>) -> Result<()> {
11811189
let mut frame_metadata_features = FrameMetadataFeatures::default();
11821190
frame_metadata_features.user_timestamp = args.attach_timestamp;
11831191
frame_metadata_features.frame_id = args.attach_frame_id;
1192+
frame_metadata_features.user_data = args.attach_user_data;
11841193

11851194
let publish_opts = |codec: VideoCodec| TrackPublishOptions {
11861195
source: TrackSource::Camera,
@@ -1222,6 +1231,12 @@ async fn run(args: Args, ctrl_c_received: Arc<AtomicBool>) -> Result<()> {
12221231
display_timing: args.display_timing,
12231232
};
12241233

1234+
// Shared keyboard-controlled channel values, written by the preview window
1235+
// and read by the capture loop to fill the user_data trailer.
1236+
let user_data_channels = args
1237+
.attach_user_data
1238+
.then(|| Arc::new(Mutex::new([0.0f32; user_data::NUM_CHANNELS])));
1239+
12251240
let publish_stats_task =
12261241
tokio::spawn(update_publisher_video_stats(track.clone(), ctrl_c_received.clone()));
12271242

@@ -1235,6 +1250,7 @@ async fn run(args: Args, ctrl_c_received: Arc<AtomicBool>) -> Result<()> {
12351250
session,
12361251
width,
12371252
height,
1253+
user_data_channels.clone(),
12381254
)
12391255
.await;
12401256
let _ = publish_stats_task.await;
@@ -1264,13 +1280,15 @@ async fn run(args: Args, ctrl_c_received: Arc<AtomicBool>) -> Result<()> {
12641280
height,
12651281
Some(shared.clone()),
12661282
publish_timing_state.clone(),
1283+
user_data_channels.clone(),
12671284
));
12681285

12691286
let display_result = video_display::run_display(
12701287
"LiveKit Video Publisher",
12711288
shared,
12721289
ctrl_c_received.clone(),
12731290
Some(width as f32 / height as f32),
1291+
user_data_channels.clone(),
12741292
);
12751293

12761294
let capture_result = capture_task.await?;
@@ -1289,6 +1307,7 @@ async fn run(args: Args, ctrl_c_received: Arc<AtomicBool>) -> Result<()> {
12891307
height,
12901308
None,
12911309
publish_timing_state.clone(),
1310+
user_data_channels.clone(),
12921311
)
12931312
.await;
12941313
let _ = publish_stats_task.await;
@@ -1310,6 +1329,7 @@ async fn run_capture_loop(
13101329
height: u32,
13111330
display_shared: Option<Arc<Mutex<SharedYuv>>>,
13121331
publish_timing_state: Option<Arc<Mutex<PublisherTimingState>>>,
1332+
user_data_channels: Option<Arc<Mutex<[f32; user_data::NUM_CHANNELS]>>>,
13131333
) -> Result<()> {
13141334
// Pace publishing at the requested FPS (not the camera-reported FPS) to hit desired cadence
13151335
let pace_fps = config.fps as f64;
@@ -1573,8 +1593,10 @@ async fn run_capture_loop(
15731593
if burned_timestamp_us.is_some() {
15741594
debug_assert_eq!(burned_timestamp_us, Some(capture_wall_time_us));
15751595
}
1576-
frame.frame_metadata = if user_ts.is_some() || fid.is_some() {
1577-
Some(FrameMetadata { user_timestamp: user_ts, frame_id: fid, user_data: None })
1596+
let user_data =
1597+
user_data_channels.as_ref().map(|targets| user_data::encode(&targets.lock()));
1598+
frame.frame_metadata = if user_ts.is_some() || fid.is_some() || user_data.is_some() {
1599+
Some(FrameMetadata { user_timestamp: user_ts, frame_id: fid, user_data })
15781600
} else {
15791601
None
15801602
};
@@ -1702,6 +1724,7 @@ async fn run_argus_capture_loop(
17021724
session: argus::ArgusCaptureSession,
17031725
width: u32,
17041726
height: u32,
1727+
user_data_channels: Option<Arc<Mutex<[f32; user_data::NUM_CHANNELS]>>>,
17051728
) -> Result<()> {
17061729
let capture_handle = std::thread::Builder::new()
17071730
.name("mipi-capture".into())
@@ -1818,8 +1841,10 @@ async fn run_argus_capture_loop(
18181841
} else {
18191842
None
18201843
};
1821-
let frame_metadata = if user_ts.is_some() || fid.is_some() {
1822-
Some(FrameMetadata { user_timestamp: user_ts, frame_id: fid, user_data: None })
1844+
let user_data =
1845+
user_data_channels.as_ref().map(|targets| user_data::encode(&targets.lock()));
1846+
let frame_metadata = if user_ts.is_some() || fid.is_some() || user_data.is_some() {
1847+
Some(FrameMetadata { user_timestamp: user_ts, frame_id: fid, user_data })
18231848
} else {
18241849
None
18251850
};

examples/local_video/src/subscriber.rs

Lines changed: 106 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use livekit_api::access_token;
1313
use log::{debug, info};
1414
use parking_lot::Mutex;
1515
use std::{
16-
collections::HashMap,
16+
collections::{HashMap, VecDeque},
1717
env,
1818
sync::OnceLock,
1919
sync::{
@@ -24,6 +24,7 @@ use std::{
2424
};
2525

2626
mod codec_display;
27+
mod user_data;
2728
mod subscriber_timing;
2829
mod viewport_aspect;
2930

@@ -946,7 +947,7 @@ async fn handle_track_subscribed(
946947
if drained_frames > 0 {
947948
debug!("Dropped {drained_frames} stale decoded frames before render upload");
948949
}
949-
if let Some(metadata) = frame.frame_metadata {
950+
if let Some(metadata) = &frame.frame_metadata {
950951
if let Some(capture_timestamp_us) = metadata.user_timestamp {
951952
subscriber_timing_sink.record_frame_received_by_sink(
952953
capture_timestamp_us,
@@ -1120,6 +1121,81 @@ fn subscriber_overlay_lines(
11201121
Some(lines)
11211122
}
11221123

1124+
/// Render a live line graph of the six decoded channel values (top-right overlay).
1125+
/// Each trace is normalized so ±`VALUE_RANGE` spans the plot height.
1126+
fn paint_channel_graph(ctx: &egui::Context, history: &VecDeque<[f32; user_data::NUM_CHANNELS]>) {
1127+
if history.is_empty() {
1128+
return;
1129+
}
1130+
let latest = *history.back().unwrap();
1131+
1132+
egui::Area::new("channel_graph".into())
1133+
.anchor(egui::Align2::RIGHT_TOP, egui::vec2(-10.0, 10.0))
1134+
.interactable(false)
1135+
.show(ctx, |ui| {
1136+
egui::Frame::NONE
1137+
.fill(egui::Color32::from_black_alpha(180))
1138+
.corner_radius(egui::CornerRadius::same(4))
1139+
.inner_margin(egui::Margin::same(8))
1140+
.show(ui, |ui| {
1141+
let plot_size = egui::vec2(360.0, 160.0);
1142+
// Pin the panel to the plot width so the legend wraps within it
1143+
// instead of stretching the frame wider than the graph.
1144+
ui.set_max_width(plot_size.x);
1145+
1146+
ui.label(
1147+
egui::RichText::new("user_data channels")
1148+
.monospace()
1149+
.size(12.0)
1150+
.color(egui::Color32::WHITE),
1151+
);
1152+
1153+
let (rect, _) = ui.allocate_exact_size(plot_size, egui::Sense::hover());
1154+
let painter = ui.painter_at(rect);
1155+
1156+
// Zero axis.
1157+
painter.hline(
1158+
rect.x_range(),
1159+
rect.center().y,
1160+
egui::Stroke::new(1.0, egui::Color32::from_gray(90)),
1161+
);
1162+
1163+
let n = history.len();
1164+
let denom = (n.saturating_sub(1)).max(1) as f32;
1165+
let half_h = rect.height() / 2.0 - 2.0;
1166+
for j in 0..user_data::NUM_CHANNELS {
1167+
let points: Vec<egui::Pos2> = history
1168+
.iter()
1169+
.enumerate()
1170+
.map(|(i, sample)| {
1171+
let x = rect.left() + (i as f32 / denom) * rect.width();
1172+
let norm = (sample[j] / user_data::VALUE_RANGE)
1173+
.clamp(-1.0, 1.0);
1174+
let y = rect.center().y - norm * half_h;
1175+
egui::pos2(x, y)
1176+
})
1177+
.collect();
1178+
painter.add(egui::Shape::line(
1179+
points,
1180+
egui::Stroke::new(1.5, CHANNEL_COLORS[j]),
1181+
));
1182+
}
1183+
1184+
// Legend: current value per channel.
1185+
ui.horizontal_wrapped(|ui| {
1186+
for (j, value) in latest.iter().enumerate() {
1187+
ui.label(
1188+
egui::RichText::new(format!("CH{}: {:>+6.2}", j + 1, value))
1189+
.monospace()
1190+
.size(11.0)
1191+
.color(CHANNEL_COLORS[j]),
1192+
);
1193+
}
1194+
});
1195+
});
1196+
});
1197+
}
1198+
11231199
fn paint_subscriber_overlay(ctx: &egui::Context, lines: &[String]) {
11241200
egui::Area::new("subscriber_overlay".into())
11251201
.anchor(egui::Align2::LEFT_TOP, egui::vec2(10.0, 10.0))
@@ -1182,6 +1258,19 @@ fn handle_track_unpublished(
11821258
clear_hud_and_simulcast(shared, frame_slot, video_size, simulcast, subscriber_timing);
11831259
}
11841260

1261+
/// Number of channel samples retained for the live graph (~10s at 30fps).
1262+
const CHANNEL_HISTORY_LEN: usize = 300;
1263+
1264+
/// Distinct colors for the six channel traces.
1265+
const CHANNEL_COLORS: [egui::Color32; user_data::NUM_CHANNELS] = [
1266+
egui::Color32::from_rgb(0xef, 0x53, 0x50), // red
1267+
egui::Color32::from_rgb(0xff, 0xa7, 0x26), // orange
1268+
egui::Color32::from_rgb(0xff, 0xee, 0x58), // yellow
1269+
egui::Color32::from_rgb(0x66, 0xbb, 0x6a), // green
1270+
egui::Color32::from_rgb(0x42, 0xa5, 0xf5), // blue
1271+
egui::Color32::from_rgb(0xab, 0x47, 0xbc), // purple
1272+
];
1273+
11851274
struct VideoApp {
11861275
shared: Arc<Mutex<SharedYuv>>,
11871276
frame_slot: Arc<LatestRenderFrameSlot>,
@@ -1192,6 +1281,8 @@ struct VideoApp {
11921281
ctrl_c_received: Arc<AtomicBool>,
11931282
viewport: AspectConstrainedViewport,
11941283
display_timestamp: bool,
1284+
/// Rolling history of decoded channel values from the user_data trailer.
1285+
channel_history: VecDeque<[f32; user_data::NUM_CHANNELS]>,
11951286
}
11961287

11971288
impl eframe::App for VideoApp {
@@ -1208,14 +1299,21 @@ impl eframe::App for VideoApp {
12081299

12091300
let render_frame = self.frame_slot.take();
12101301
if let Some(frame) = render_frame.as_ref() {
1211-
if let Some(metadata) = frame.frame_metadata {
1302+
if let Some(metadata) = &frame.frame_metadata {
12121303
if let Some(capture_timestamp_us) = metadata.user_timestamp {
12131304
self.subscriber_timing.record_frame_selected_for_render(
12141305
capture_timestamp_us,
12151306
metadata.frame_id,
12161307
current_timestamp_us(),
12171308
);
12181309
}
1310+
// Decode the 6 user_data channel values for the live graph.
1311+
if let Some(values) = metadata.user_data.as_deref().and_then(user_data::decode) {
1312+
if self.channel_history.len() >= CHANNEL_HISTORY_LEN {
1313+
self.channel_history.pop_front();
1314+
}
1315+
self.channel_history.push_back(values);
1316+
}
12191317
}
12201318
}
12211319

@@ -1255,6 +1353,8 @@ impl eframe::App for VideoApp {
12551353
paint_subscriber_overlay(ctx, lines);
12561354
}
12571355

1356+
paint_channel_graph(ctx, &self.channel_history);
1357+
12581358
// Simulcast layer controls: bottom-left overlay
12591359
egui::Area::new("simulcast_controls".into())
12601360
.anchor(egui::Align2::LEFT_BOTTOM, egui::vec2(10.0, -10.0))
@@ -1450,6 +1550,7 @@ async fn run(args: Args, ctrl_c_received: Arc<AtomicBool>) -> Result<()> {
14501550
ctrl_c_received: ctrl_c_received.clone(),
14511551
viewport,
14521552
display_timestamp: args.display_timestamp,
1553+
channel_history: VecDeque::with_capacity(CHANNEL_HISTORY_LEN),
14531554
};
14541555
let native_options = viewport_aspect::native_options(None);
14551556
eframe::run_native(
@@ -1797,8 +1898,8 @@ impl CallbackTrait for YuvPaintCallback {
17971898

17981899
let frame_for_upload = self.render_frame.lock().take().map(|frame| {
17991900
let prepare_timestamp_us = current_timestamp_us();
1800-
let frame_id = frame.frame_metadata.and_then(|m| m.frame_id);
1801-
let sample = frame.frame_metadata.and_then(|metadata| {
1901+
let frame_id = frame.frame_metadata.as_ref().and_then(|m| m.frame_id);
1902+
let sample = frame.frame_metadata.as_ref().and_then(|metadata| {
18021903
metadata.user_timestamp.map(|capture_timestamp_us| PendingPaintSample {
18031904
frame_id,
18041905
capture_timestamp_us,
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
//! Shared 6-channel codec for the `--attach-user-data` demo.
2+
//!
3+
//! Six channel values are encoded as little-endian `int16` fixed-point, 2 bytes
4+
//! per channel = 12 bytes total, and shipped in the `user_data` frame-metadata
5+
//! trailer field. The full `int16` range maps to `±VALUE_RANGE`, giving
6+
//! ~1/32767 of the range in resolution — well within the ~232-byte trailer
7+
//! budget.
8+
//!
9+
//! Both the `publisher` and `subscriber` binaries include this file via
10+
//! `mod user_data;` so they agree on the wire format.
11+
12+
/// Number of channels carried in the user_data payload.
13+
pub const NUM_CHANNELS: usize = 6;
14+
15+
/// Encoded payload size in bytes (2 bytes per channel).
16+
pub const ENCODED_LEN: usize = NUM_CHANNELS * 2;
17+
18+
/// Value that maps to `i16::MAX`. Channel values are normalized to
19+
/// `±VALUE_RANGE` before quantization.
20+
pub const VALUE_RANGE: f32 = 1.0;
21+
22+
/// Value units per `int16` step.
23+
fn scale() -> f32 {
24+
VALUE_RANGE / i16::MAX as f32
25+
}
26+
27+
/// Clamp a channel value to the encodable `±VALUE_RANGE` range.
28+
pub fn clamp_value(value: f32) -> f32 {
29+
value.clamp(-VALUE_RANGE, VALUE_RANGE)
30+
}
31+
32+
/// Encode 6 channel values into 12 little-endian `int16` bytes.
33+
pub fn encode(values: &[f32; NUM_CHANNELS]) -> Vec<u8> {
34+
let s = scale();
35+
let mut buf = Vec::with_capacity(ENCODED_LEN);
36+
for &v in values {
37+
let q = (v / s).round().clamp(i16::MIN as f32, i16::MAX as f32) as i16;
38+
buf.extend_from_slice(&q.to_le_bytes());
39+
}
40+
buf
41+
}
42+
43+
/// Decode 6 channel values from the `user_data` payload. Returns `None` if the
44+
/// buffer is too short to hold all six values.
45+
pub fn decode(buf: &[u8]) -> Option<[f32; NUM_CHANNELS]> {
46+
if buf.len() < ENCODED_LEN {
47+
return None;
48+
}
49+
let s = scale();
50+
let mut out = [0.0f32; NUM_CHANNELS];
51+
for (i, chunk) in buf.chunks_exact(2).take(NUM_CHANNELS).enumerate() {
52+
out[i] = i16::from_le_bytes([chunk[0], chunk[1]]) as f32 * s;
53+
}
54+
Some(out)
55+
}
56+
57+
#[cfg(test)]
58+
mod tests {
59+
use super::*;
60+
61+
#[test]
62+
fn round_trips_within_quantization_error() {
63+
let values = [0.0, 0.5, -0.75, 1.0, -0.1, 0.9];
64+
let decoded = decode(&encode(&values)).unwrap();
65+
for (v, d) in values.iter().zip(decoded.iter()) {
66+
assert!((v - d).abs() <= scale(), "got {d}, expected ~{v}");
67+
}
68+
}
69+
70+
#[test]
71+
fn clamp_keeps_within_range() {
72+
assert_eq!(clamp_value(100.0), VALUE_RANGE);
73+
assert_eq!(clamp_value(-100.0), -VALUE_RANGE);
74+
assert_eq!(clamp_value(0.5), 0.5);
75+
}
76+
77+
#[test]
78+
fn decode_rejects_short_buffer() {
79+
assert!(decode(&[0u8; ENCODED_LEN - 1]).is_none());
80+
}
81+
}

0 commit comments

Comments
 (0)