Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 57 additions & 28 deletions Sources/LiveKit/Core/RPC.swift
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,36 @@ public struct RpcError: Error {
}
}

/*
* Maximum payload size for RPC requests and responses. If a payload exceeds this size,
* the RPC call will fail with a REQUEST_PAYLOAD_TOO_LARGE(1402) or RESPONSE_PAYLOAD_TOO_LARGE(1504) error.
*/
/// Maximum payload size for RPC v1 requests and responses. v2 (data-stream-based) payloads
/// have no size limit. Cross-version interactions still go through v1 packets and are
/// subject to this limit.
let MAX_RPC_PAYLOAD_BYTES = 15360 // 15 KB

// MARK: - Client protocol versioning

/// Legacy client. Only supports RPC v1 (inline `RpcRequest`/`RpcResponse` packets).
public let CLIENT_PROTOCOL_DEFAULT: Int32 = 0

/// Supports RPC v2 — request and response payloads transported over data streams,
/// lifting the v1 15 KB payload size limit.
public let CLIENT_PROTOCOL_DATA_STREAM_RPC: Int32 = 1

// MARK: - RPC v2 stream constants

enum RpcStreamTopic {
static let request = "lk.rpc_request"
static let response = "lk.rpc_response"
}

enum RpcStreamAttribute {
static let requestId = "lk.rpc_request_id"
static let method = "lk.rpc_request_method"
static let timeoutMs = "lk.rpc_request_response_timeout_ms"
static let version = "lk.rpc_request_version"
}

let RPC_STREAM_VERSION = "2"

/// A handler that processes an RPC request and returns a string
/// that will be sent back to the requester.
///
Expand Down Expand Up @@ -141,32 +165,11 @@ struct PendingRpcResponse {
let onResolve: @Sendable (_ payload: String?, _ error: RpcError?) -> Void
}

actor RpcStateManager: Loggable {
private var handlers: [String: RpcHandler] = [:] // methodName to handler
/// Caller-side RPC state: tracks pending acks and pending responses for outgoing requests.
actor RpcClientManager: Loggable {
private var pendingAcks: Set<String> = Set()
private var pendingResponses: [String: PendingRpcResponse] = [:] // requestId to pending response

func registerHandler(_ method: String, handler: @escaping RpcHandler) throws {
guard !isRpcMethodRegistered(method) else {
throw LiveKitError(.invalidState, message: "RPC method '\(method)' already registered")
}
handlers[method] = handler
}

func unregisterHandler(_ method: String) {
if handlers.removeValue(forKey: method) == nil {
log("No handler registered for RPC method '\(method)'", .warning)
}
}

func isRpcMethodRegistered(_ method: String) -> Bool {
handlers[method] != nil
}

func getHandler(for method: String) -> RpcHandler? {
handlers[method]
}

func addPendingAck(_ requestId: String) {
pendingAcks.insert(requestId)
}
Expand All @@ -189,8 +192,34 @@ actor RpcStateManager: Loggable {
pendingResponses.removeValue(forKey: requestId)
}

func removeAllPending(_ requestId: String) async {
func removeAllPending(_ requestId: String) {
pendingAcks.remove(requestId)
pendingResponses.removeValue(forKey: requestId)
}
}

/// Handler-side RPC state: tracks registered method handlers for incoming requests.
actor RpcServerManager: Loggable {
private var handlers: [String: RpcHandler] = [:] // methodName to handler

func registerHandler(_ method: String, handler: @escaping RpcHandler) throws {
guard !isRpcMethodRegistered(method) else {
throw LiveKitError(.invalidState, message: "RPC method '\(method)' already registered")
}
handlers[method] = handler
}

func unregisterHandler(_ method: String) {
if handlers.removeValue(forKey: method) == nil {
log("No handler registered for RPC method '\(method)'", .warning)
}
}

func isRpcMethodRegistered(_ method: String) -> Bool {
handlers[method] != nil
}

func getHandler(for method: String) -> RpcHandler? {
handlers[method]
}
}
16 changes: 16 additions & 0 deletions Sources/LiveKit/Core/Room+DataStream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public extension Room {
/// the remote participant who initiated the stream.
///
func registerByteStreamHandler(for topic: String, onNewStream: @escaping ByteStreamHandler) async throws {
try Self.checkReserved(topic: topic)
try await incomingStreamManager.registerByteStreamHandler(for: topic, onNewStream)
}

Expand All @@ -42,6 +43,7 @@ public extension Room {
/// the remote participant who initiated the stream.
///
func registerTextStreamHandler(for topic: String, onNewStream: @escaping TextStreamHandler) async throws {
try Self.checkReserved(topic: topic)
try await incomingStreamManager.registerTextStreamHandler(for: topic, onNewStream)
}

Expand All @@ -58,6 +60,20 @@ public extension Room {
}
}

extension Room {
private static let reservedStreamTopics: Set<String> = [
RpcStreamTopic.request,
RpcStreamTopic.response,
]

static func checkReserved(topic: String) throws {
guard !reservedStreamTopics.contains(topic) else {
throw LiveKitError(.invalidParameter,
message: "Stream topic '\(topic)' is reserved for internal SDK use")
}
}
}

// MARK: - Objective-C Compatibility

public extension Room {
Expand Down
6 changes: 3 additions & 3 deletions Sources/LiveKit/Core/Room+RPC.swift
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public extension Room {
func registerRpcMethod(_ method: String,
handler: @escaping RpcHandler) async throws
{
try await rpcState.registerHandler(method, handler: handler)
try await rpcServer.registerHandler(method, handler: handler)
}

/// Unregisters a previously registered RPC method.
Expand All @@ -58,7 +58,7 @@ public extension Room {
/// - Parameter method: The name of the RPC method to unregister
///
func unregisterRpcMethod(_ method: String) async {
await rpcState.unregisterHandler(method)
await rpcServer.unregisterHandler(method)
}

/// Checks whether or not a handler has been registered for an RPC method.
Expand All @@ -69,7 +69,7 @@ public extension Room {
/// - Returns: `true` if a handler has been registered, otherwise `false`.
///
func isRpcMethodRegistered(_ method: String) async -> Bool {
await rpcState.isRpcMethodRegistered(method)
await rpcServer.isRpcMethodRegistered(method)
}
}

Expand Down
19 changes: 18 additions & 1 deletion Sources/LiveKit/Core/Room.swift
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,8 @@ public class Room: NSObject, @unchecked Sendable, ObservableObject, Loggable {

// MARK: - RPC

let rpcState = RpcStateManager()
let rpcClient = RpcClientManager()
let rpcServer = RpcServerManager()

// MARK: - State

Expand Down Expand Up @@ -258,6 +259,22 @@ public class Room: NSObject, @unchecked Sendable, ObservableObject, Loggable {
await metricsManager.register(room: self)
}

// Register internal handlers for RPC v2 data streams. Topics are reserved and
// user code is rejected from registering handlers for them via the public API.
Task { [weak self] in
guard let self else { return }
do {
try await incomingStreamManager.registerTextStreamHandler(for: RpcStreamTopic.request) { [weak self] reader, identity in
await self?.localParticipant.handleIncomingRpcRequestStream(reader: reader, callerIdentity: identity)
}
try await incomingStreamManager.registerTextStreamHandler(for: RpcStreamTopic.response) { [weak self] reader, identity in
await self?.localParticipant.handleIncomingRpcResponseStream(reader: reader, senderIdentity: identity)
}
} catch {
log("[Rpc] Failed to register internal RPC stream handlers: \(error)", .error)
}
}

// trigger events when state mutates
_state.onDidMutate = { [weak self] newState, oldState in
guard let self else { return }
Expand Down
Loading
Loading