Skip to content

Commit 799baec

Browse files
Improve pinned participant state on rejoin with new session (#1683)
* update sfu models * feat: add pins param in ParticipantJoinedEvent * feat: add logic with logs * feat: remove logs * fix: correct the jetpack compose logic to render pinned icon * fix: remove logs * chore: revert changes * chore: revert changes * chore: remove finally * fix: fix unit tests * feat: update codebase with develop * chore: align with JS logic * fix: refactor pins ownership, move them to PinManager * fix: add unit-tests * fix: remove duplicate code * chore: add logic to updateServer Pins from list of participants with pin information * chore: refactor code * chore: Replace PinUpdateAtTime with PinEntry --------- Co-authored-by: Aleksandar Apostolov <apostolov.alexandar@gmail.com>
1 parent 56e8e46 commit 799baec

31 files changed

Lines changed: 6992 additions & 3080 deletions

generate-sfu.sh

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
#!/usr/bin/env bash
2+
3+
set -e
4+
5+
REPO_URL="git@github.com:GetStream/protocol.git"
6+
REFERENCE_TYPE="branch"
7+
REFERENCE_VALUE="main"
8+
9+
PROJECT_ROOT="$(dirname "$(realpath "$0")")/"
10+
BUILD_DIR="$PROJECT_ROOT/build"
11+
CLONE_DIR="$BUILD_DIR/protocol-repo"
12+
OUTPUT_CLIENT_PATH="$PROJECT_ROOT/stream-video-android-core/src/main/proto/video/sfu/"
13+
14+
# Step 1: Delete OUTPUT_CLIENT_PATH if exists else create an empty directory
15+
echo "🧹 Preparing output directory: $OUTPUT_CLIENT_PATH"
16+
rm -rf "$OUTPUT_CLIENT_PATH"
17+
mkdir -p "$OUTPUT_CLIENT_PATH"
18+
19+
# Step 2: Clone the repository with shallow depth
20+
echo "🚀 Cloning repository: $REPO_URL (Type: $REFERENCE_TYPE, Value: $REFERENCE_VALUE)..."
21+
rm -rf "$CLONE_DIR"
22+
mkdir -p "$BUILD_DIR"
23+
git clone --depth=1 --branch "$REFERENCE_VALUE" "$REPO_URL" "$CLONE_DIR"
24+
25+
cd "$CLONE_DIR"
26+
27+
# Step 3: Checkout to the correct branch, tag, or commit
28+
if [ "$REFERENCE_TYPE" == "branch" ]; then
29+
git checkout "$REFERENCE_VALUE"
30+
elif [ "$REFERENCE_TYPE" == "tag" ]; then
31+
git fetch --tags
32+
git checkout "tags/$REFERENCE_VALUE"
33+
elif [ "$REFERENCE_TYPE" == "commit" ]; then
34+
git fetch --depth=1 origin "$REFERENCE_VALUE"
35+
git checkout "$REFERENCE_VALUE"
36+
else
37+
echo "❌ ERROR: Invalid reference type '$REFERENCE_TYPE'. Use 'branch', 'tag', or 'commit'."
38+
exit 1
39+
fi
40+
41+
# Step 4: Copy content from CLONE_DIR/protobuf/video/sfu to OUTPUT_CLIENT_PATH
42+
echo "📦 Copying proto files..."
43+
SOURCE_PROTO_PATH="$CLONE_DIR/protobuf/video/sfu"
44+
45+
if [ ! -d "$SOURCE_PROTO_PATH" ]; then
46+
echo "❌ ERROR: Source proto directory does not exist: $SOURCE_PROTO_PATH"
47+
exit 1
48+
fi
49+
50+
cp -R "$SOURCE_PROTO_PATH/"* "$OUTPUT_CLIENT_PATH"
51+
52+
# Step 5: Run Spotless (from project root, not inside cloned repo)
53+
echo "✨ Running Spotless..."
54+
cd "$PROJECT_ROOT"
55+
./gradlew spotlessApply
56+
57+
# Step 6: Delete CLONE_DIR
58+
echo "🗑 Cleaning up cloned repo..."
59+
rm -rf "$CLONE_DIR"
60+
61+
echo "✅ Done."

stream-video-android-core/api/stream-video-android-core.api

Lines changed: 186 additions & 19 deletions
Large diffs are not rendered by default.

stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/Call.kt

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1937,9 +1937,13 @@ public class Call(
19371937
sessionId,
19381938
)
19391939

1940-
fun isServerPin(sessionId: String): Boolean = state._serverPins.value.containsKey(sessionId)
1940+
fun isServerPin(sessionId: String): Boolean = state.pinManager.serverPins.value.containsKey(
1941+
sessionId,
1942+
)
19411943

1942-
fun isLocalPin(sessionId: String): Boolean = state._localPins.value.containsKey(sessionId)
1944+
fun isLocalPin(sessionId: String): Boolean = state.pinManager.localPins.value.containsKey(
1945+
sessionId,
1946+
)
19431947

19441948
fun hasCapability(vararg capability: OwnCapability): Boolean {
19451949
val elements = capability.toList()

stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/CallState.kt

Lines changed: 28 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -118,8 +118,8 @@ import io.getstream.video.android.core.notifications.internal.service.CallServic
118118
import io.getstream.video.android.core.notifications.internal.telecom.jetpack.JetpackTelecomRepository
119119
import io.getstream.video.android.core.notifications.internal.telecom.jetpack.TelecomCall
120120
import io.getstream.video.android.core.permission.PermissionRequest
121-
import io.getstream.video.android.core.pinning.PinType
122-
import io.getstream.video.android.core.pinning.PinUpdateAtTime
121+
import io.getstream.video.android.core.pinning.PinEntry
122+
import io.getstream.video.android.core.pinning.PinManager
123123
import io.getstream.video.android.core.socket.common.scope.ClientScope
124124
import io.getstream.video.android.core.socket.common.scope.UserScope
125125
import io.getstream.video.android.core.sorting.SortPreset
@@ -167,6 +167,8 @@ import java.util.UUID
167167
import java.util.concurrent.ConcurrentHashMap
168168
import java.util.concurrent.atomic.AtomicBoolean
169169
import java.util.concurrent.atomic.AtomicReference
170+
import kotlin.collections.map
171+
import kotlin.collections.toMutableList
170172
import kotlin.time.Duration
171173
import kotlin.time.DurationUnit
172174
import kotlin.time.toDuration
@@ -210,6 +212,9 @@ public sealed interface RealtimeConnection {
210212
public data object Disconnected : RealtimeConnection // normal disconnect by the app
211213
}
212214

215+
internal typealias SessionId = String
216+
internal typealias IsPinned = Boolean
217+
213218
/**
214219
* The CallState class keeps all state for a call
215220
* It's available on every call object
@@ -269,7 +274,7 @@ public class CallState(
269274
it is RealtimeConnection.Reconnecting
270275
}
271276

272-
private val internalParticipants = ConcurrentHashMap<String, ParticipantState>()
277+
private val internalParticipants = ConcurrentHashMap<SessionId, ParticipantState>()
273278
private val _participants = MutableStateFlow<Map<String, ParticipantState>>(emptyMap())
274279

275280
/**
@@ -323,34 +328,31 @@ public class CallState(
323328
private val _dominantSpeaker: MutableStateFlow<ParticipantState?> = MutableStateFlow(null)
324329
public val dominantSpeaker: StateFlow<ParticipantState?> = _dominantSpeaker
325330

326-
internal val _localPins: MutableStateFlow<Map<String, PinUpdateAtTime>> =
327-
MutableStateFlow(emptyMap())
328-
internal val _serverPins: MutableStateFlow<Map<String, PinUpdateAtTime>> =
329-
MutableStateFlow(emptyMap())
331+
internal val pinManager = PinManager() { internalParticipants }
330332

331333
/**
332-
* Combined pin state preserving the underlying [PinUpdateAtTime] (which carries
334+
* Combined pin state preserving the underlying [PinEntry] (which carries
333335
* [io.getstream.video.android.core.pinning.PinType] and the original timestamp).
334336
* Internal; the sorter consumes this to apply local-pin > server-pin > recency.
335337
* Local pins win on key collision.
336338
*/
337-
internal val _pinnedParticipantsDetailed: StateFlow<Map<String, PinUpdateAtTime>> =
338-
combine(_localPins, _serverPins) { local, server ->
339-
val combined = mutableMapOf<String, PinUpdateAtTime>()
339+
internal val _pinnedParticipantsDetailed: StateFlow<Map<SessionId, PinEntry>> =
340+
combine(pinManager.localPins, pinManager.serverPins) { local, server ->
341+
val combined = mutableMapOf<String, PinEntry>()
340342
combined.putAll(server)
341343
combined.putAll(local) // local overrides server on key collision
342344
combined.toMap()
343345
}.stateIn(scope, SharingStarted.Eagerly, emptyMap())
344346

345-
internal val _pinnedParticipants: StateFlow<Map<String, OffsetDateTime>> =
347+
internal val _pinnedParticipants: StateFlow<Map<SessionId, OffsetDateTime>> =
346348
_pinnedParticipantsDetailed.mapState { detailed ->
347349
detailed.mapValues { it.value.at }
348350
}
349351

350352
/**
351353
* Pinned participants, combined value both from server and local pins.
352354
*/
353-
val pinnedParticipants: StateFlow<Map<String, OffsetDateTime>> = _pinnedParticipants
355+
val pinnedParticipants: StateFlow<Map<SessionId, OffsetDateTime>> = _pinnedParticipants
354356

355357
val stats = CallStats(call, scope)
356358

@@ -755,7 +757,8 @@ public class CallState(
755757
*/
756758
val ccMode: StateFlow<ClosedCaptionMode> = closedCaptionManager.ccMode
757759

758-
private val pendingParticipantsJoined = ConcurrentHashMap<String, Participant>()
760+
private val pendingParticipantsJoined =
761+
ConcurrentHashMap<SessionId, Pair<Participant, IsPinned>>()
759762

760763
/**
761764
* We re-create notification more than 1 times, so we don't want to
@@ -805,7 +808,7 @@ public class CallState(
805808
}
806809

807810
is PinsUpdatedEvent -> {
808-
updateServerSidePins(event.pins)
811+
pinManager.setServerPins(event.pins)
809812
}
810813

811814
is UnblockedUserEvent -> {
@@ -1098,7 +1101,7 @@ public class CallState(
10981101
} else {
10991102
_ringingState.value = RingingState.Outgoing(acceptedByCallee = true)
11001103
}
1101-
updateServerSidePins(
1104+
pinManager.setServerPins(
11021105
event.callState.pins.map {
11031106
PinUpdate(it.user_id, it.session_id)
11041107
},
@@ -1109,13 +1112,17 @@ public class CallState(
11091112
try {
11101113
if (participants.value.size < 8) {
11111114
getOrCreateParticipant(event.participant)
1115+
pinManager.onParticipantJoined(event)
11121116
} else {
1113-
pendingParticipantsJoined[event.participant.session_id] = event.participant
1117+
pendingParticipantsJoined[event.participant.session_id] = Pair(event.participant, event.isPinned)
11141118
participantsUpdate.schedule(scope, participantsUpdateConfig) {
11151119
logger.d {
11161120
"[ParticipantJoinedEvent] #participants; #debounce; participants: ${participants.value.size}"
11171121
}
1118-
getOrCreateParticipants(pendingParticipantsJoined.values.toList())
1122+
val pendingParticipants = pendingParticipantsJoined.values.map { it.first }.toList()
1123+
getOrCreateParticipants(pendingParticipants)
1124+
val pendingParticipantsWithPin = pendingParticipantsJoined.values.toList()
1125+
pinManager.onParticipantsJoined(pendingParticipantsWithPin)
11191126
}
11201127
}
11211128
} catch (e: Exception) {
@@ -1136,16 +1143,8 @@ public class CallState(
11361143
if (current?.participant?.sessionId == sessionId) {
11371144
_screenSharingSession.value = null
11381145
}
1139-
if (_localPins.value.containsKey(sessionId)) {
1140-
// Remove any pins for the participant
1141-
unpin(sessionId)
1142-
}
11431146

1144-
if (_serverPins.value.containsKey(sessionId)) {
1145-
scope.launch {
1146-
call.unpinForEveryone(sessionId, event.participant.user_id)
1147-
}
1148-
}
1147+
pinManager.onParticipantLeft(sessionId)
11491148
}
11501149

11511150
is SubscriberOfferEvent -> {
@@ -1295,19 +1294,6 @@ public class CallState(
12951294
}
12961295
}
12971296

1298-
private fun updateServerSidePins(pins: List<PinUpdate>) {
1299-
// Update participants that are still in the call
1300-
val pinnedInCall = pins.filter {
1301-
internalParticipants.containsKey(it.sessionId)
1302-
}
1303-
_serverPins.value = pinnedInCall.associate {
1304-
Pair(
1305-
it.sessionId,
1306-
PinUpdateAtTime(it, OffsetDateTime.now(Clock.systemUTC()), PinType.Server),
1307-
)
1308-
}
1309-
}
1310-
13111297
private fun updateRingingState(rejectReason: RejectReason? = null) {
13121298
when (ringingState.value) {
13131299
RingingState.TimeoutNoAnswer, RingingState.RejectedByAll -> {
@@ -1701,19 +1687,11 @@ public class CallState(
17011687
}
17021688

17031689
fun pin(userId: String, sessionId: String) {
1704-
val pins = _localPins.value.toMutableMap()
1705-
pins[sessionId] = PinUpdateAtTime(
1706-
PinUpdate(userId, sessionId),
1707-
OffsetDateTime.now(Clock.systemUTC()),
1708-
PinType.Local,
1709-
)
1710-
_localPins.value = pins
1690+
pinManager.pin(userId, sessionId)
17111691
}
17121692

17131693
fun unpin(sessionId: String) {
1714-
val pins = _localPins.value.toMutableMap()
1715-
pins.remove(sessionId)
1716-
_localPins.value = pins
1694+
pinManager.unpin(sessionId)
17171695
}
17181696

17191697
fun updateFromResponse(result: StopLiveResponse) {

stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/signal/socket/RTCEventMapper.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ public object RTCEventMapper {
9595
}
9696

9797
event.participant_joined != null -> with(event.participant_joined) {
98-
ParticipantJoinedEvent(participant!!, call_cid)
98+
ParticipantJoinedEvent(participant!!, call_cid, is_pinned)
9999
}
100100

101101
event.participant_left != null -> with(event.participant_left) {

stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/utils/RetryableSignalingServiceDecorator.kt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ import stream.video.sfu.signal.ICERestartResponse
2727
import stream.video.sfu.signal.ICETrickleResponse
2828
import stream.video.sfu.signal.SendAnswerRequest
2929
import stream.video.sfu.signal.SendAnswerResponse
30+
import stream.video.sfu.signal.SendMetricsRequest
31+
import stream.video.sfu.signal.SendMetricsResponse
3032
import stream.video.sfu.signal.SendStatsRequest
3133
import stream.video.sfu.signal.SendStatsResponse
3234
import stream.video.sfu.signal.SetPublisherRequest
@@ -117,6 +119,9 @@ internal class RetryableSignalingServiceDecorator(
117119
override suspend fun sendStats(sendStatsRequest: SendStatsRequest): SendStatsResponse =
118120
retryCall({ it.error }) { decorated.sendStats(sendStatsRequest) }
119121

122+
override suspend fun sendMetrics(sendMetricsRequest: SendMetricsRequest): SendMetricsResponse =
123+
retryCall({ null }) { decorated.sendMetrics(sendMetricsRequest) }
124+
120125
override suspend fun startNoiseCancellation(startNoiseCancellationRequest: StartNoiseCancellationRequest): StartNoiseCancellationResponse =
121126
retryCall({ it.error }) { decorated.startNoiseCancellation(startNoiseCancellationRequest) }
122127

stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/events/SfuDataEvent.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,9 +126,9 @@ public data class TrackUnpublishedEvent(
126126
) : SfuDataEvent()
127127

128128
public data class ParticipantJoinedEvent(
129-
130129
val participant: Participant,
131130
val callCid: String,
131+
val isPinned: Boolean,
132132
) : SfuDataEvent()
133133

134134
public data class ParticipantLeftEvent(
@@ -168,6 +168,7 @@ public data class PinsUpdatedEvent(
168168
val pins: List<PinUpdate>,
169169
) : SfuDataEvent()
170170

171+
// Todo Rahul, should rename it to PinTarget in v2
171172
public data class PinUpdate(val userId: String, val sessionId: String)
172173

173174
public data class UnknownEvent(val event: Any?) : SfuDataEvent()

0 commit comments

Comments
 (0)