Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
ff47c71
Pin protocol to ladvoc/schema-metadata
ladvoc Jun 5, 2026
d28a823
Expose schema and frame encoding
ladvoc Jun 10, 2026
419e094
Clean up conversion
ladvoc Jun 10, 2026
2099af1
Add unit test
ladvoc Jun 10, 2026
0a48860
Document encoding enum cases
ladvoc Jun 10, 2026
d5832d8
Export under api module
ladvoc Jun 10, 2026
327140d
Define conversion to data blob key
ladvoc Jun 10, 2026
0099197
Prototype
ladvoc Jun 10, 2026
48cd8a1
Expose over FFI
ladvoc Jun 10, 2026
df2d4f7
Changeset
ladvoc Jun 10, 2026
2248728
Add E2E test cases
ladvoc Jun 10, 2026
682a63c
Document core types
ladvoc Jun 10, 2026
ea5c92e
Document store and get methods
ladvoc Jun 10, 2026
b3e6fb7
generated protobuf
github-actions[bot] Jun 10, 2026
8f74a90
Update protocol
ladvoc Jun 11, 2026
49b4fd9
Wire up request/response
ladvoc Jun 11, 2026
fc19418
Update protocol
ladvoc Jun 15, 2026
5ab44b2
Move data blob methods to local participant
ladvoc Jun 15, 2026
ea63124
Update protocol
ladvoc Jun 16, 2026
ab8d654
Derive eq and hash on data blob proto types
ladvoc Jun 16, 2026
c31b6fd
Wire up
ladvoc Jun 16, 2026
e56d21b
Note to make internal
ladvoc Jun 16, 2026
125a7d4
Use request ID
ladvoc Jun 17, 2026
a3b39ca
Make data blobs private, test schema storage only
ladvoc Jun 17, 2026
33baaf7
Add accessors for schema and frame encoding
ladvoc Jun 17, 2026
18381f2
Test publish with metadata
ladvoc Jun 17, 2026
62bf4bb
Don't log error
ladvoc Jun 17, 2026
a271c9d
Expose data track fields over FFI
ladvoc Jun 17, 2026
14af23c
Merge remote-tracking branch 'origin/main' into ladvoc/schema-metadata
ladvoc Jun 23, 2026
24236c3
Merge remote-tracking branch 'origin/main' into ladvoc/schema-metadata
ladvoc Jun 23, 2026
d560373
Pin protocol
ladvoc Jun 23, 2026
aed1fed
generated protobuf
github-actions[bot] Jun 23, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changeset/data-track-schemas-ffi.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
livekit: patch
livekit-datatrack: patch
livekit-ffi: patch
---

Add schema metadata support for data tracks.
5 changes: 4 additions & 1 deletion livekit-datatrack/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
/// Common types for local and remote tracks.
mod track;

/// Schema and frame encoding metadata for typed tracks.
mod schema;

/// Local track publication.
mod local;

Expand All @@ -40,7 +43,7 @@ mod error;

/// Public APIs re-exported by client SDKs.
pub mod api {
pub use crate::{error::*, frame::*, local::*, remote::*, track::*};
pub use crate::{error::*, frame::*, local::*, remote::*, schema::*, track::*};
}

/// Internal APIs used within client SDKs to power data tracks functionality.
Expand Down
3 changes: 3 additions & 0 deletions livekit-datatrack/src/local/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use crate::{
api::{DataTrackInfo, DataTrackOptions, LocalDataTrack, PublishError},
packet::Handle,
schema::{DataTrackFrameEncoding, DataTrackSchemaId},
};
use bytes::Bytes;
use from_variants::FromVariants;
Expand Down Expand Up @@ -124,6 +125,8 @@ pub struct SfuPublishRequest {
pub handle: Handle,
pub name: String,
pub uses_e2ee: bool,
pub schema: Option<DataTrackSchemaId>,
pub frame_encoding: Option<DataTrackFrameEncoding>,
}

/// Request sent to the SFU to unpublish a track.
Expand Down
18 changes: 18 additions & 0 deletions livekit-datatrack/src/local/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ impl Manager {
handle,
name: event.options.name,
uses_e2ee: self.encryption_provider.is_some(),
schema: event.options.schema,
frame_encoding: event.options.frame_encoding,
};
_ = self.event_out_tx.send(event.into()).await;
}
Expand Down Expand Up @@ -280,6 +282,8 @@ impl Manager {
handle: info.pub_handle,
name: info.name.clone(),
uses_e2ee: info.uses_e2ee,
schema: info.schema.clone(),
frame_encoding: info.frame_encoding,
};
_ = state_tx.send(PublishState::Republishing);
_ = self.event_out_tx.send(event.into()).await;
Expand Down Expand Up @@ -525,6 +529,8 @@ mod tests {
pub_handle,
name: event.name,
uses_e2ee: event.uses_e2ee,
schema: None,
frame_encoding: None,
};
let event = SfuPublishResponse { handle: event.handle, result: Ok(info) };
_ = input.send(event.into());
Expand Down Expand Up @@ -604,6 +610,8 @@ mod tests {
pub_handle: handle,
name: "test".into(),
uses_e2ee: false,
schema: None,
frame_encoding: None,
};
let event = SfuPublishResponse { handle, result: Ok(info) };
input.send(event.into()).unwrap();
Expand Down Expand Up @@ -634,6 +642,8 @@ mod tests {
pub_handle: event.handle,
name: "secure".into(),
uses_e2ee: true,
schema: None,
frame_encoding: None,
};
let event = SfuPublishResponse { handle: event.handle, result: Ok(info) };
input.send(event.into()).unwrap();
Expand Down Expand Up @@ -674,6 +684,8 @@ mod tests {
pub_handle: handle,
name: track_name.clone(),
uses_e2ee: false,
schema: None,
frame_encoding: None,
};
let event = SfuPublishResponse { handle, result: Ok(info) };
input.send(event.into()).unwrap();
Expand All @@ -699,6 +711,8 @@ mod tests {
pub_handle: handle,
name: track_name.clone(),
uses_e2ee: false,
schema: None,
frame_encoding: None,
};
let event = SfuPublishResponse { handle, result: Ok(info) };
input.send(event.into()).unwrap();
Expand Down Expand Up @@ -728,6 +742,8 @@ mod tests {
pub_handle: event.handle,
name: name.into(),
uses_e2ee: false,
schema: None,
frame_encoding: None,
};
let event = SfuPublishResponse { handle: event.handle, result: Ok(info) };
input.send(event.into()).unwrap();
Expand Down Expand Up @@ -767,6 +783,8 @@ mod tests {
pub_handle: event.handle,
name: "active".into(),
uses_e2ee: false,
schema: None,
frame_encoding: None,
};
let event = SfuPublishResponse { handle: event.handle, result: Ok(info) };
input.send(event.into()).unwrap();
Expand Down
13 changes: 12 additions & 1 deletion livekit-datatrack/src/local/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use crate::{
api::{DataTrack, DataTrackFrame, DataTrackInfo, InternalError},
schema::{DataTrackFrameEncoding, DataTrackSchemaId},
track::DataTrackInner,
};
use std::{fmt, marker::PhantomData, sync::Arc};
Expand Down Expand Up @@ -153,6 +154,8 @@ impl Drop for LocalTrackInner {
#[derive(Clone, Debug)]
pub struct DataTrackOptions {
pub(crate) name: String,
pub(crate) schema: Option<DataTrackSchemaId>,
pub(crate) frame_encoding: Option<DataTrackFrameEncoding>,
}

impl DataTrackOptions {
Expand All @@ -165,7 +168,15 @@ impl DataTrackOptions {
/// - Must be unique per publisher
///
pub fn new(name: impl Into<String>) -> Self {
Self { name: name.into() }
Self { name: name.into(), schema: None, frame_encoding: None }
}

pub fn with_schema(self, schema: DataTrackSchemaId) -> Self {
Self { schema: Some(schema), ..self }
}

pub fn with_frame_encoding(self, encoding: DataTrackFrameEncoding) -> Self {
Self { frame_encoding: Some(encoding), ..self }
}
}

Expand Down
74 changes: 69 additions & 5 deletions livekit-datatrack/src/local/proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,17 @@ impl From<SfuPublishRequest> for proto::PublishDataTrackRequest {
fn from(event: SfuPublishRequest) -> Self {
use proto::encryption::Type;
let encryption = if event.uses_e2ee { Type::Gcm } else { Type::None }.into();
Self { pub_handle: event.handle.into(), name: event.name, encryption }
let schema = event.schema.map(|schema| schema.into());
let frame_encoding = event
.frame_encoding
.map(|encoding| proto::DataTrackFrameEncoding::from(encoding) as i32);
Self {
pub_handle: event.handle.into(),
name: event.name,
encryption,
schema,
frame_encoding,
}
}
}

Expand Down Expand Up @@ -74,8 +84,18 @@ impl TryFrom<proto::DataTrackInfo> for DataTrackInfo {
proto::encryption::Type::Gcm => true,
other => Err(anyhow!("Unsupported E2EE type: {:?}", other))?,
};
let frame_encoding = msg.frame_encoding.map(|_| msg.frame_encoding().into());
let sid: DataTrackSid = msg.sid.try_into().map_err(anyhow::Error::from)?;
Ok(Self { pub_handle: handle, sid: RwLock::new(sid).into(), name: msg.name, uses_e2ee })
let schema = msg.schema.map(|schema| schema.into());

Ok(Self {
pub_handle: handle,
sid: RwLock::new(sid).into(),
name: msg.name,
uses_e2ee,
schema,
frame_encoding,
})
}
}

Expand Down Expand Up @@ -106,12 +126,19 @@ impl From<DataTrackInfo> for proto::DataTrackInfo {
proto::encryption::Type::Gcm
} else {
proto::encryption::Type::None
};
} as i32;
let sid = info.sid().to_string();
let schema = info.schema.map(|schema| schema.into());
let frame_encoding = info
.frame_encoding
.map(|encoding| proto::DataTrackFrameEncoding::from(encoding) as i32);
Self {
pub_handle: info.pub_handle.into(),
sid: info.sid().to_string(),
sid,
name: info.name,
encryption: encryption as i32,
encryption,
schema,
frame_encoding,
}
}
}
Expand All @@ -128,6 +155,8 @@ pub fn publish_responses_for_sync_state(

#[cfg(test)]
mod tests {
use crate::schema::{DataTrackFrameEncoding, DataTrackSchemaEncoding, DataTrackSchemaId};

use super::*;
use fake::{Fake, Faker};

Expand All @@ -137,6 +166,8 @@ mod tests {
handle: 1u32.try_into().unwrap(),
name: "track".into(),
uses_e2ee: true,
schema: None,
frame_encoding: None,
};
let request: proto::PublishDataTrackRequest = event.into();
assert_eq!(request.pub_handle, 1);
Expand All @@ -159,6 +190,12 @@ mod tests {
sid: "DTR_1234".into(),
name: "track".into(),
encryption: proto::encryption::Type::Gcm.into(),
schema: proto::DataTrackSchemaId {
name: "schema".into(),
encoding: proto::DataTrackSchemaEncoding::JsonSchema.into(),
}
.into(),
frame_encoding: Some(proto::DataTrackFrameEncoding::Json.into()),
}
.into(),
};
Expand All @@ -169,9 +206,36 @@ mod tests {
assert_eq!(info.pub_handle, 1u32.try_into().unwrap());
assert_eq!(*info.sid.read().unwrap(), "DTR_1234".to_string().try_into().unwrap());
assert_eq!(info.name, "track");
assert_eq!(
info.schema,
Some(DataTrackSchemaId::new("schema", DataTrackSchemaEncoding::JsonSchema))
);
assert_eq!(info.frame_encoding, Some(DataTrackFrameEncoding::Json));
assert!(info.uses_e2ee);
}

#[test]
fn test_frame_encoding_mapping() {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

test_empty_frame_encoding ?

let base = proto::DataTrackInfo {
pub_handle: 1,
sid: "DTR_1234".into(),
name: "track".into(),
encryption: proto::encryption::Type::None.into(),
schema: None,
frame_encoding: None,
};

let info: DataTrackInfo = base.clone().try_into().unwrap();
assert_eq!(info.frame_encoding, None);

let unspecified = proto::DataTrackInfo {
frame_encoding: Some(proto::DataTrackFrameEncoding::Unspecified.into()),
..base
};
let info: DataTrackInfo = unspecified.try_into().unwrap();
assert_eq!(info.frame_encoding, Some(DataTrackFrameEncoding::Other));
}

#[test]
fn test_from_request_response() {
use proto::request_response::{Reason, Request};
Expand Down
18 changes: 18 additions & 0 deletions livekit-datatrack/src/remote/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -596,6 +596,8 @@ mod tests {
pub_handle: Faker.fake(), // Pub handle
name: track_name.clone(),
uses_e2ee: false,
schema: None,
frame_encoding: None,
}],
)]),
};
Expand Down Expand Up @@ -658,6 +660,8 @@ mod tests {
pub_handle: Faker.fake(),
name: "test".into(),
uses_e2ee: false,
schema: None,
frame_encoding: None,
};

// Simulate track published
Expand Down Expand Up @@ -694,6 +698,8 @@ mod tests {
pub_handle: Faker.fake(),
name: "test".into(),
uses_e2ee: false,
schema: None,
frame_encoding: None,
};

// Simulate three identical publication updates
Expand Down Expand Up @@ -726,6 +732,8 @@ mod tests {
pub_handle: Faker.fake(),
name: "test".into(),
uses_e2ee: false,
schema: None,
frame_encoding: None,
};

// Simulate track published
Expand Down Expand Up @@ -785,6 +793,8 @@ mod tests {
pub_handle: Faker.fake(),
name: "test".into(),
uses_e2ee: true,
schema: None,
frame_encoding: None,
};

// Simulate track published (with e2ee)
Expand Down Expand Up @@ -846,6 +856,8 @@ mod tests {
pub_handle: Faker.fake(),
name: "test".into(),
uses_e2ee: false,
schema: None,
frame_encoding: None,
};

// Simulate track published
Expand Down Expand Up @@ -944,6 +956,8 @@ mod tests {
pub_handle: Faker.fake(),
name: "test".into(),
uses_e2ee: false,
schema: None,
frame_encoding: None,
};

// Simulate track published
Expand Down Expand Up @@ -987,6 +1001,8 @@ mod tests {
pub_handle: Faker.fake(),
name: "test".into(),
uses_e2ee: false,
schema: None,
frame_encoding: None,
};

// Simulate track published
Expand Down Expand Up @@ -1038,6 +1054,8 @@ mod tests {
pub_handle: Faker.fake(),
name: "test".into(),
uses_e2ee: false,
schema: None,
frame_encoding: None,
};

// Simulate track published
Expand Down
2 changes: 2 additions & 0 deletions livekit-datatrack/src/remote/proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,8 @@ mod tests {
sid: "DTR_1234".into(),
name: "track1".into(),
encryption: proto::encryption::Type::Gcm.into(),
schema: None,
frame_encoding: None,
}];
let mut participant_info = proto::ParticipantInfo { data_tracks, ..Default::default() };

Expand Down
Loading