Skip to content

Commit 0575ca6

Browse files
committed
Add support for generic request response
1 parent 224319f commit 0575ca6

10 files changed

Lines changed: 464 additions & 3 deletions

File tree

.changeset/late-lamps-greet.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"client-sdk-android": minor
3+
---
4+
5+
Add support for generic RequestResponse

.changeset/polite-goats-refuse.md

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
---
2+
"client-sdk-android": minor
3+
---
4+
5+
Add the following suspend methods to LocalParticipant:
6+
7+
- `setName`
8+
- `setMetadata`
9+
- `setAttributes`
10+
11+
These replace the following deprecated methods:
12+
13+
- `updateName`
14+
- `updateMetadata`
15+
- `updateAttributes`

livekit-android-sdk/src/main/java/io/livekit/android/ConnectOptions.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ data class ConnectOptions(
5353
/**
5454
* the protocol version to use with the server.
5555
*/
56-
val protocolVersion: ProtocolVersion = ProtocolVersion.v13,
56+
val protocolVersion: ProtocolVersion = ProtocolVersion.v15,
5757

5858
/**
5959
* The client protocol version to advertise to other participants in the room

livekit-android-sdk/src/main/java/io/livekit/android/room/RTCEngine.kt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1036,6 +1036,7 @@ internal constructor(
10361036
fun onLocalTrackUnpublished(trackUnpublished: LivekitRtc.TrackUnpublishedResponse)
10371037
fun onTranscriptionReceived(transcription: LivekitModels.Transcription)
10381038
fun onLocalTrackSubscribed(trackSubscribed: LivekitRtc.TrackSubscribed)
1039+
fun onRequestResponse(response: LivekitRtc.RequestResponse)
10391040
fun onRpcPacketReceived(dp: LivekitModels.DataPacket)
10401041
fun onDataStreamPacket(dp: LivekitModels.DataPacket, encryptionType: LivekitModels.Encryption.Type)
10411042
}
@@ -1275,6 +1276,10 @@ internal constructor(
12751276
listener?.onLocalTrackUnpublished(trackUnpublished)
12761277
}
12771278

1279+
override fun onRequestResponse(response: LivekitRtc.RequestResponse) {
1280+
listener?.onRequestResponse(response)
1281+
}
1282+
12781283
// --------------------------------- DataChannel.Observer ------------------------------------//
12791284

12801285
fun onBufferedAmountChange(dataChannel: DataChannel, previousAmount: Long) {

livekit-android-sdk/src/main/java/io/livekit/android/room/Room.kt

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1422,6 +1422,13 @@ constructor(
14221422
localParticipant.handleSubscribedQualityUpdate(subscribedQualityUpdate)
14231423
}
14241424

1425+
/**
1426+
* @suppress
1427+
*/
1428+
override fun onRequestResponse(response: LivekitRtc.RequestResponse) {
1429+
localParticipant.handleRequestResponse(response)
1430+
}
1431+
14251432
/**
14261433
* @suppress
14271434
*/

livekit-android-sdk/src/main/java/io/livekit/android/room/SignalClient.kt

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ import okhttp3.WebSocketListener
6060
import okio.ByteString
6161
import okio.ByteString.Companion.toByteString
6262
import java.util.Date
63+
import java.util.concurrent.atomic.AtomicInteger
6364
import javax.inject.Inject
6465
import javax.inject.Named
6566
import javax.inject.Singleton
@@ -97,6 +98,7 @@ constructor(
9798
private var lastUrl: String? = null
9899
internal var lastOptions: ConnectOptions? = null
99100
private var lastRoomOptions: RoomOptions? = null
101+
private val nextRequestId = AtomicInteger(0)
100102

101103
// join will always return a JoinResponse.
102104
// reconnect will return a ReconnectResponse or a Unit if a different response was received.
@@ -563,8 +565,16 @@ constructor(
563565
sendRequest(request)
564566
}
565567

566-
fun sendUpdateLocalMetadata(metadata: String?, name: String?, attributes: Map<String, String>? = emptyMap()) {
568+
internal fun allocateRequestId(): Int = nextRequestId.incrementAndGet()
569+
570+
fun sendUpdateLocalMetadata(
571+
metadata: String?,
572+
name: String?,
573+
attributes: Map<String, String> = emptyMap(),
574+
requestId: Int = allocateRequestId(),
575+
): Int {
567576
val update = LivekitRtc.UpdateParticipantMetadata.newBuilder()
577+
.setRequestId(requestId)
568578
.setMetadata(metadata ?: "")
569579
.setName(name ?: "")
570580
.putAllAttributes(attributes)
@@ -574,6 +584,7 @@ constructor(
574584
.build()
575585

576586
sendRequest(request)
587+
return requestId
577588
}
578589

579590
fun sendSyncState(syncState: LivekitRtc.SyncState) {
@@ -837,7 +848,7 @@ constructor(
837848
}
838849

839850
LivekitRtc.SignalResponse.MessageCase.REQUEST_RESPONSE -> {
840-
// TODO
851+
listener?.onRequestResponse(response.requestResponse)
841852
}
842853

843854
LivekitRtc.SignalResponse.MessageCase.MESSAGE_NOT_SET,
@@ -957,6 +968,7 @@ constructor(
957968
fun onRefreshToken(token: String)
958969
fun onLocalTrackUnpublished(trackUnpublished: LivekitRtc.TrackUnpublishedResponse)
959970
fun onLocalTrackSubscribed(trackSubscribed: LivekitRtc.TrackSubscribed)
971+
fun onRequestResponse(response: LivekitRtc.RequestResponse)
960972
}
961973

962974
companion object {
@@ -1024,6 +1036,9 @@ enum class ProtocolVersion(val value: Int) {
10241036

10251037
// new leave request handling
10261038
v13(13),
1039+
1040+
// signal request response handling
1041+
v15(15),
10271042
}
10281043

10291044
/**

livekit-android-sdk/src/main/java/io/livekit/android/room/participant/LocalParticipant.kt

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ import io.livekit.android.room.isSVCCodec
4141
import io.livekit.android.room.rpc.RpcClientManager
4242
import io.livekit.android.room.rpc.RpcManager
4343
import io.livekit.android.room.rpc.RpcServerManager
44+
import io.livekit.android.room.signal.SignalRequestException
4445
import io.livekit.android.room.track.DataPublishReliability
4546
import io.livekit.android.room.track.LocalAudioTrack
4647
import io.livekit.android.room.track.LocalAudioTrackOptions
@@ -59,13 +60,19 @@ import io.livekit.android.room.track.screencapture.ScreenCaptureParams
5960
import io.livekit.android.room.util.EncodingUtils
6061
import io.livekit.android.rpc.RpcError
6162
import io.livekit.android.util.LKLog
63+
import io.livekit.android.util.TimeoutException
6264
import io.livekit.android.util.flow
6365
import io.livekit.android.util.rethrowIfCancellationSignal
66+
import io.livekit.android.util.withDeadline
6467
import io.livekit.android.webrtc.sortVideoCodecPreferences
68+
import kotlinx.coroutines.CancellationException
69+
import kotlinx.coroutines.CompletableDeferred
6570
import kotlinx.coroutines.CoroutineDispatcher
6671
import kotlinx.coroutines.Job
6772
import kotlinx.coroutines.async
6873
import kotlinx.coroutines.coroutineScope
74+
import kotlinx.coroutines.flow.combine
75+
import kotlinx.coroutines.flow.first
6976
import kotlinx.coroutines.launch
7077
import kotlinx.coroutines.sync.Mutex
7178
import kotlinx.coroutines.sync.withLock
@@ -90,6 +97,7 @@ import javax.inject.Named
9097
import kotlin.math.max
9198
import kotlin.math.min
9299
import kotlin.time.Duration
100+
import kotlin.time.Duration.Companion.milliseconds
93101

94102
class LocalParticipant
95103
@AssistedInject
@@ -130,6 +138,8 @@ internal constructor(
130138

131139
private val jobs = mutableMapOf<LocalTrackPublication, Job>()
132140

141+
private val pendingSignalRequests = Collections.synchronizedMap(mutableMapOf<Int, CompletableDeferred<Unit>>())
142+
133143
// For ensuring that only one caller can execute setTrackEnabled at a time.
134144
// Without it, there's a potential to create multiple of the same source,
135145
// Camera has deadlock issues with multiple CameraCapturers trying to activate/stop.
@@ -1109,12 +1119,59 @@ internal constructor(
11091119
}
11101120
}
11111121

1122+
/**
1123+
* Sets and updates the metadata of the local participant.
1124+
* Note: this requires `CanUpdateOwnMetadata` permission encoded in the token.
1125+
*
1126+
* @param metadata the metadata to set
1127+
* @return a [Result] that succeeds when the server confirms the update, or fails with
1128+
* [SignalRequestException] if the server rejects the request or
1129+
* [TimeoutException] if it times out
1130+
*/
1131+
@CheckResult
1132+
suspend fun setMetadata(metadata: String): Result<Unit> {
1133+
return requestMetadataUpdate(metadata = metadata)
1134+
}
1135+
1136+
/**
1137+
* Sets and updates the name of the local participant.
1138+
* Note: this requires `CanUpdateOwnMetadata` permission encoded in the token.
1139+
*
1140+
* @param name the name to set
1141+
* @return a [Result] that succeeds when the server confirms the update, or fails with
1142+
* [SignalRequestException] if the server rejects the request or
1143+
* [TimeoutException] if it times out
1144+
*/
1145+
@CheckResult
1146+
suspend fun setName(name: String): Result<Unit> {
1147+
return requestMetadataUpdate(name = name)
1148+
}
1149+
1150+
/**
1151+
* Set or update participant attributes. It will make updates only to keys that
1152+
* are present in [attributes], and will not override others.
1153+
*
1154+
* To delete a value, set the value to an empty string.
1155+
*
1156+
* Note: this requires `CanUpdateOwnMetadata` permission encoded in the token.
1157+
*
1158+
* @param attributes attributes to update
1159+
* @return a [Result] that succeeds when the server confirms the update, or fails with
1160+
* [SignalRequestException] if the server rejects the request or
1161+
* [TimeoutException] if it times out
1162+
*/
1163+
@CheckResult
1164+
suspend fun setAttributes(attributes: Map<String, String>): Result<Unit> {
1165+
return requestMetadataUpdate(attributes = attributes)
1166+
}
1167+
11121168
/**
11131169
* Updates the metadata of the local participant. Changes will not be reflected until the
11141170
* server responds confirming the update.
11151171
* Note: this requires `CanUpdateOwnMetadata` permission encoded in the token.
11161172
* @param metadata
11171173
*/
1174+
@Deprecated(message = "Use the suspend function setMetadata instead.")
11181175
fun updateMetadata(metadata: String) {
11191176
this.engine.client.sendUpdateLocalMetadata(metadata, name)
11201177
}
@@ -1125,6 +1182,7 @@ internal constructor(
11251182
* Note: this requires `CanUpdateOwnMetadata` permission encoded in the token.
11261183
* @param name
11271184
*/
1185+
@Deprecated(message = "Use the suspend function setName instead.")
11281186
fun updateName(name: String) {
11291187
this.engine.client.sendUpdateLocalMetadata(metadata, name)
11301188
}
@@ -1138,6 +1196,7 @@ internal constructor(
11381196
* Note: this requires `canUpdateOwnMetadata` permission.
11391197
* @param attributes attributes to update
11401198
*/
1199+
@Deprecated(message = "Use the suspend function setAttributes instead.")
11411200
fun updateAttributes(attributes: Map<String, String>) {
11421201
this.engine.client.sendUpdateLocalMetadata(metadata, name, attributes)
11431202
}
@@ -1147,6 +1206,88 @@ internal constructor(
11471206
pub?.muted = muted
11481207
}
11491208

1209+
internal fun handleRequestResponse(response: LivekitRtc.RequestResponse) {
1210+
val deferred = pendingSignalRequests[response.requestId] ?: return
1211+
if (response.reason != LivekitRtc.RequestResponse.Reason.OK) {
1212+
pendingSignalRequests.remove(response.requestId)
1213+
deferred.completeExceptionally(SignalRequestException.fromResponse(response))
1214+
return
1215+
}
1216+
pendingSignalRequests.remove(response.requestId)
1217+
}
1218+
1219+
private suspend fun requestMetadataUpdate(
1220+
metadata: String? = null,
1221+
name: String? = null,
1222+
attributes: Map<String, String>? = null,
1223+
): Result<Unit> {
1224+
val requestId = engine.client.allocateRequestId()
1225+
val deferred = CompletableDeferred<Unit>()
1226+
pendingSignalRequests[requestId] = deferred
1227+
1228+
return try {
1229+
engine.client.sendUpdateLocalMetadata(
1230+
metadata = metadata ?: this.metadata,
1231+
name = name ?: this.name,
1232+
attributes = attributes ?: emptyMap(),
1233+
requestId = requestId,
1234+
)
1235+
withDeadline(METADATA_UPDATE_TIMEOUT) {
1236+
coroutineScope {
1237+
val confirmationJob = launch {
1238+
combine(
1239+
::name.flow,
1240+
::metadata.flow,
1241+
::attributes.flow,
1242+
) { _, _, _ -> }
1243+
.first { isMetadataUpdateConfirmed(metadata, name, attributes) }
1244+
if (!deferred.isCompleted) {
1245+
deferred.complete(Unit)
1246+
}
1247+
}
1248+
try {
1249+
deferred.await()
1250+
} finally {
1251+
confirmationJob.cancel()
1252+
}
1253+
}
1254+
}
1255+
Result.success(Unit)
1256+
} catch (e: TimeoutException) {
1257+
deferred.completeExceptionally(e)
1258+
Result.failure(e)
1259+
} catch (e: CancellationException) {
1260+
deferred.cancel()
1261+
throw e
1262+
} catch (e: Exception) {
1263+
Result.failure(e)
1264+
} finally {
1265+
pendingSignalRequests.remove(requestId)
1266+
}
1267+
}
1268+
1269+
private fun isMetadataUpdateConfirmed(
1270+
metadata: String?,
1271+
name: String?,
1272+
attributes: Map<String, String>?,
1273+
): Boolean {
1274+
if (name != null && this.name != name) {
1275+
return false
1276+
}
1277+
if (metadata != null && this.metadata != metadata) {
1278+
return false
1279+
}
1280+
if (attributes != null &&
1281+
!attributes.all { (key, value) ->
1282+
val current = this.attributes[key]
1283+
current == value || (value.isEmpty() && current.isNullOrEmpty())
1284+
}
1285+
) {
1286+
return false
1287+
}
1288+
return true
1289+
}
1290+
11501291
internal fun handleSubscribedQualityUpdate(subscribedQualityUpdate: LivekitRtc.SubscribedQualityUpdate) {
11511292
if (!dynacast) {
11521293
return
@@ -1319,6 +1460,9 @@ internal constructor(
13191460
* @suppress
13201461
*/
13211462
fun cleanup() {
1463+
pendingSignalRequests.values.forEach { it.cancel() }
1464+
pendingSignalRequests.clear()
1465+
13221466
for (pub in trackPublications.values) {
13231467
val track = pub.track
13241468

@@ -1394,6 +1538,10 @@ internal constructor(
13941538
interface Factory {
13951539
fun create(dynacast: Boolean): LocalParticipant
13961540
}
1541+
1542+
companion object {
1543+
private val METADATA_UPDATE_TIMEOUT = 5_000.milliseconds
1544+
}
13971545
}
13981546

13991547
internal fun LocalParticipant.publishTracksInfo(): List<LivekitRtc.TrackPublishedResponse> {

0 commit comments

Comments
 (0)