Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ let package = Package(
dependencies: [
// LK-Prefixed Dynamic WebRTC XCFramework
.package(url: "https://github.com/livekit/webrtc-xcframework.git", exact: "144.7559.03"),
.package(url: "https://github.com/livekit/livekit-uniffi-xcframework.git", exact: "0.0.6"),
.package(name: "LiveKitUniFFI", path: "../../rust-sdks/livekit-uniffi/packages/swift/LiveKitUniFFI"),
.package(url: "https://github.com/apple/swift-protobuf.git", from: "1.31.0"),
// Only used for DocC generation
.package(url: "https://github.com/apple/swift-docc-plugin.git", from: "1.3.0"),
Expand All @@ -34,7 +34,7 @@ let package = Package(
name: "LiveKit",
dependencies: [
.product(name: "LiveKitWebRTC", package: "webrtc-xcframework"),
.product(name: "LiveKitUniFFI", package: "livekit-uniffi-xcframework"),
.product(name: "LiveKitUniFFI", package: "LiveKitUniFFI"),
.product(name: "SwiftProtobuf", package: "swift-protobuf"),
"LKObjCHelpers",
],
Expand Down
4 changes: 2 additions & 2 deletions Package@swift-6.2.swift
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ let package = Package(
dependencies: [
// LK-Prefixed Dynamic WebRTC XCFramework
.package(url: "https://github.com/livekit/webrtc-xcframework.git", exact: "144.7559.03"),
.package(url: "https://github.com/livekit/livekit-uniffi-xcframework.git", exact: "0.0.6"),
.package(name: "LiveKitUniFFI", path: "../../rust-sdks/livekit-uniffi/packages/swift/LiveKitUniFFI"),
.package(url: "https://github.com/apple/swift-protobuf.git", from: "1.31.0"),
// Only used for DocC generation
.package(url: "https://github.com/apple/swift-docc-plugin.git", from: "1.3.0"),
Expand All @@ -35,7 +35,7 @@ let package = Package(
name: "LiveKit",
dependencies: [
.product(name: "LiveKitWebRTC", package: "webrtc-xcframework"),
.product(name: "LiveKitUniFFI", package: "livekit-uniffi-xcframework"),
.product(name: "LiveKitUniFFI", package: "LiveKitUniFFI"),
.product(name: "SwiftProtobuf", package: "swift-protobuf"),
"LKObjCHelpers",
],
Expand Down
135 changes: 135 additions & 0 deletions Sources/LiveKit/Core/Room+DataTrack.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
* Copyright 2026 LiveKit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import Foundation

internal import LiveKitUniFFI
internal import LiveKitWebRTC

// MARK: - Data Track Manager Properties

extension Room {
func setupDataTrackManagers() {
let localBridge = LocalDataTrackBridge(room: self)
localDataTrackManager = LocalDataTrackManager(
delegate: localBridge,
encryptionProvider: nil // TODO: E2EE bridge in Phase 1f
)

let remoteBridge = RemoteDataTrackBridge(room: self)
remoteDataTrackManager = RemoteDataTrackManager(
delegate: remoteBridge,
decryptionProvider: nil // TODO: E2EE bridge in Phase 1f
)
}

func cleanUpDataTrackManagers() {
localDataTrackManager = nil
remoteDataTrackManager = nil
}
}

// MARK: - Subscriber Data Track Channel

extension Room {
func configureSubscriberDataTrackChannel(_ dataChannel: LKRTCDataChannel) {
log("Setting subscriber data track channel")
subscriberDataTrackChannel = dataChannel
dataChannel.delegate = subscriberDataTrackChannelDelegate
}
}

// MARK: - Local Data Track Bridge

final class LocalDataTrackBridge: LocalDataTrackManagerDelegate, @unchecked Sendable {
private weak var room: Room?

init(room: Room) {
self.room = room
}

func onSignalRequest(request: Data) {
guard let room else { return }
guard let signalRequest = try? Livekit_SignalRequest(serializedBytes: request) else {
room.log("Failed to decode data track signal request", .warning)
return
}
Task {
try? await room.signalClient.sendRequest(signalRequest)
}
}

func onPacketsAvailable(packets: [Data]) {
guard let room, let channel = room.publisherDataTrackChannel else { return }
for packet in packets {
let buffer = RTC.createDataBuffer(data: packet)
DispatchQueue.liveKitWebRTC.sync {
channel.sendData(buffer)
}
}
}
}

// MARK: - Remote Data Track Bridge

final class RemoteDataTrackBridge: RemoteDataTrackManagerDelegate, @unchecked Sendable {
private weak var room: Room?

init(room: Room) {
self.room = room
}

func onSignalRequest(request: Data) {
guard let room else { return }
guard let signalRequest = try? Livekit_SignalRequest(serializedBytes: request) else {
room.log("Failed to decode data track signal request", .warning)
return
}
Task {
try? await room.signalClient.sendRequest(signalRequest)
}
}

func onTrackPublished(track: RemoteDataTrack) {
guard let room else { return }
room.dataTrackDelegates.notify(label: { "room.didPublishDataTrack" }) {
$0.room(room, didPublishDataTrack: track)
}
}

func onTrackUnpublished(sid: String) {
guard let room else { return }
room.dataTrackDelegates.notify(label: { "room.didUnpublishDataTrack" }) {
$0.room(room, didUnpublishDataTrack: sid)
}
}
}

// MARK: - Subscriber Data Track Channel Delegate

final class SubscriberDataTrackChannelDelegate: NSObject, LKRTCDataChannelDelegate, @unchecked Sendable {
private weak var room: Room?

init(room: Room) {
self.room = room
}

func dataChannelDidChangeState(_: LKRTCDataChannel) {}

func dataChannel(_: LKRTCDataChannel, didReceiveMessageWith buffer: LKRTCDataBuffer) {
room?.remoteDataTrackManager?.handlePacketReceived(packet: buffer.data)
}
}
15 changes: 15 additions & 0 deletions Sources/LiveKit/Core/Room+Engine.swift
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ extension Room {
publisherDataChannel.reset()
subscriberDataChannel.reset()

// Clean up data track channels and managers
publisherDataTrackChannel = nil
subscriberDataTrackChannel = nil
cleanUpDataTrackManagers()

await _state.transport?.close()

// Reset publish state
Expand Down Expand Up @@ -178,8 +183,13 @@ extension Room {
publisherDataChannel.set(reliable: reliableDataChannel)
publisherDataChannel.set(lossy: lossyDataChannel)

// Data track channel (unordered, unreliable — DTP handles its own sequencing)
publisherDataTrackChannel = await publisher.dataChannel(for: LKRTCDataChannel.Labels.dataTrack,
configuration: RTC.createDataChannelConfiguration(ordered: false, maxRetransmits: 0))

log("dataChannel.\(String(describing: reliableDataChannel?.label)) : \(String(describing: reliableDataChannel?.channelId))")
log("dataChannel.\(String(describing: lossyDataChannel?.label)) : \(String(describing: lossyDataChannel?.channelId))")
log("dataChannel.\(String(describing: publisherDataTrackChannel?.label)) : \(String(describing: publisherDataTrackChannel?.channelId))")

let subscriber = isSinglePC ? nil : try Transport(config: rtcConfiguration,
target: .subscriber,
Expand Down Expand Up @@ -447,9 +457,14 @@ extension Room {
if case .quick = mode {
try await quickReconnectSequence()
self.log("[Connect] Quick reconnect succeeded for attempt \(currentAttempt)")
// Resend data track subscription state after quick reconnect
self.remoteDataTrackManager?.resendSubscriptionUpdates()
} else if case .full = mode {
try await fullReconnectSequence()
self.log("[Connect] Full reconnect succeeded for attempt \(currentAttempt)")
// Republish data tracks after full reconnect
self.localDataTrackManager?.republishTracks()
self.remoteDataTrackManager?.resendSubscriptionUpdates()
}
} catch {
self.log("[Connect] Reconnect mode: \(mode) failed with error: \(error)", .error)
Expand Down
14 changes: 14 additions & 0 deletions Sources/LiveKit/Core/Room+SignalClientDelegate.swift
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,20 @@ extension Room: SignalClientDelegate {
}
}

func signalClient(_: SignalClient, didReceiveRawResponse data: Data) async {
// Each manager has specific handler methods per message type.
// Try them all — they return UnsupportedType for messages they don't handle.
try? localDataTrackManager?.handleSfuRequestResponse(res: data)
try? localDataTrackManager?.handleSfuPublishResponse(res: data)
try? remoteDataTrackManager?.handleSubscriberHandles(res: data)
if let identity = localParticipant.identity?.stringValue {
try? remoteDataTrackManager?.handleSfuParticipantUpdate(
res: data,
localParticipantIdentity: identity
)
}
}

func signalClient(_: SignalClient, didReceiveMediaSectionsRequirement requirement: Livekit_MediaSectionsRequirement) async {
guard case let .publisherOnly(publisher) = _state.transport else { return }

Expand Down
1 change: 1 addition & 0 deletions Sources/LiveKit/Core/Room+TransportDelegate.swift
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ extension Room: TransportDelegate {
switch dataChannel.label {
case LKRTCDataChannel.Labels.reliable: subscriberDataChannel.set(reliable: dataChannel)
case LKRTCDataChannel.Labels.lossy: subscriberDataChannel.set(lossy: dataChannel)
case LKRTCDataChannel.Labels.dataTrack: configureSubscriberDataTrackChannel(dataChannel)
default: log("Unknown data channel label \(dataChannel.label)", .warning)
}
}
Expand Down
14 changes: 14 additions & 0 deletions Sources/LiveKit/Core/Room.swift
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
import Combine
import Foundation

internal import LiveKitUniFFI
internal import LiveKitWebRTC

#if canImport(Network)
import Network
#endif
Expand Down Expand Up @@ -128,6 +131,15 @@ public class Room: NSObject, @unchecked Sendable, ObservableObject, Loggable {
self?.e2eeManager?.dataChannelEncryptionType ?? .none
}

// MARK: - Data Tracks

var localDataTrackManager: LocalDataTrackManager?
var remoteDataTrackManager: RemoteDataTrackManager?
var publisherDataTrackChannel: LKRTCDataChannel?
var subscriberDataTrackChannel: LKRTCDataChannel?
lazy var subscriberDataTrackChannelDelegate = SubscriberDataTrackChannelDelegate(room: self)
let dataTrackDelegates = MulticastDelegate<DataTrackDelegate>(label: "DataTrackDelegate")

// MARK: - PreConnect

lazy var preConnectBuffer = PreConnectAudioBuffer(room: self)
Expand Down Expand Up @@ -433,6 +445,8 @@ public class Room: NSObject, @unchecked Sendable, ObservableObject, Loggable {
// Final check if cancelled, don't fire connected events
try Task.checkCancellation()

setupDataTrackManagers()

_state.mutate {
$0.connectedUrl = finalUrl
$0.connectionState = .connected
Expand Down
10 changes: 10 additions & 0 deletions Sources/LiveKit/Core/SignalClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,12 @@ private extension SignalClient {
return
}

// Forward serialized protobuf bytes to delegate for data track managers.
// Re-serialize if the message arrived as JSON (string) rather than binary.
if let rawData = try? response.serializedData() {
_delegate.notifyDetached { await $0.signalClient(self, didReceiveRawResponse: rawData) }
}

Task.detached {
let alwaysProcess = switch response.message {
case .join, .reconnect, .leave: true
Expand Down Expand Up @@ -416,6 +422,10 @@ extension SignalClient {
// MARK: - Send methods

extension SignalClient {
func sendRequest(_ request: Livekit_SignalRequest) async throws {
try await _sendRequest(request)
}

func send(offer: LKRTCSessionDescription, offerId: UInt32) async throws {
let r = Livekit_SignalRequest.with {
$0.offer = offer.toPBType(offerId: offerId)
Expand Down
1 change: 1 addition & 0 deletions Sources/LiveKit/Extensions/RTCDataChannel+Util.swift
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ extension LKRTCDataChannel {
enum Labels {
static let reliable = "_reliable"
static let lossy = "_lossy"
static let dataTrack = "_data_track"
}

func toLKInfoType() -> Livekit_DataChannelInfo {
Expand Down
82 changes: 82 additions & 0 deletions Sources/LiveKit/Participant/LocalParticipant+DataTrack.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright 2026 LiveKit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import Foundation

internal import LiveKitUniFFI

// MARK: - Data Track Publishing

extension LocalParticipant {
/// Publishes a data track with the given name.
///
/// - Parameter name: Track name visible to other participants. Must be unique per publisher.
/// - Returns: A ``LocalDataTrack`` that can be used to push frames to subscribers.
/// - Throws: ``PublishError`` if the track cannot be published.
func publishDataTrack(name: String) async throws -> LocalDataTrack {
guard let manager = _room?.localDataTrackManager else {
throw LiveKitError(.invalidState, message: "Not connected to a room")
}
return try await manager.publishTrack(options: DataTrackOptions(name: name))
}

/// Publishes a data track for the duration of the given closure, then unpublishes automatically.
///
/// - Parameters:
/// - name: Track name visible to other participants.
/// - body: Closure that receives the published track. The track is unpublished when the closure returns or throws.
/// - Returns: The value returned by `body`.
func withDataTrack<T>(name: String, body: (LocalDataTrack) async throws -> T) async throws -> T {
let track = try await publishDataTrack(name: name)
return try await withTaskCancellationHandler {
defer { track.unpublish() }
return try await body(track)
} onCancel: {
track.unpublish()
}
}
}

// MARK: - Frame Drop Policy

enum FrameDropPolicy {
/// Propagate the error to the caller.
case `throw`
/// Silently skip the frame.
case drop
}

// MARK: - Sending AsyncSequence to a Track

extension LocalDataTrack {
/// Sends frames from the source until it ends or the track is unpublished.
func send<S: AsyncSequence>(
contentsOf source: S,
onQueueFull: FrameDropPolicy = .drop
) async throws where S.Element == DataTrackFrame {
for try await frame in source {
guard isPublished() else { break }
do {
try tryPush(frame: frame)
} catch PushFrameErrorReason.QueueFull {
switch onQueueFull {
case .throw: throw PushFrameErrorReason.QueueFull
case .drop: continue
}
}
}
}
}
Loading
Loading