Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
97f19f4
update sfu models
rahul-lohra Apr 14, 2026
8c09bdf
feat: add pins param in ParticipantJoinedEvent
rahul-lohra Apr 27, 2026
56a0249
Merge branch 'develop' into feature/rahullohra/pinned_users
rahul-lohra Apr 27, 2026
a8921c7
feat: add logic with logs
rahul-lohra Apr 29, 2026
70bfcc8
feat: remove logs
rahul-lohra Apr 29, 2026
1c763bf
fix: correct the jetpack compose logic to render pinned icon
rahul-lohra Apr 29, 2026
dada793
Merge branch 'develop' into feature/rahullohra/pinned_users
rahul-lohra Apr 29, 2026
24ad3a4
fix: remove logs
rahul-lohra Apr 29, 2026
bd10d0e
chore: revert changes
rahul-lohra Apr 29, 2026
350a430
chore: revert changes
rahul-lohra Apr 29, 2026
4056ffd
chore: remove finally
rahul-lohra Apr 29, 2026
c72056d
fix: fix unit tests
rahul-lohra Apr 29, 2026
6530dcf
Merge branch 'develop' into feature/rahullohra/pinned_users
rahul-lohra May 1, 2026
cd0b197
Merge branch 'develop' into feature/rahullohra/pinned_users
rahul-lohra May 5, 2026
63b30fe
feat: update codebase with develop
rahul-lohra May 5, 2026
c5e5c00
Merge branch 'develop' into feature/rahullohra/pinned_users
rahul-lohra May 5, 2026
9395d66
Merge branch 'develop' into feature/rahullohra/pinned_users
rahul-lohra May 12, 2026
e372c92
chore: align with JS logic
rahul-lohra May 14, 2026
8941650
fix: refactor pins ownership, move them to PinManager
rahul-lohra May 15, 2026
34bd1f5
fix: add unit-tests
rahul-lohra May 15, 2026
162721a
Merge branch 'develop' into feature/rahullohra/pinned-users-2
rahul-lohra May 15, 2026
0a6ea1a
fix: remove duplicate code
rahul-lohra May 15, 2026
60f03ef
chore: add logic to updateServer Pins from list of participants with …
rahul-lohra May 20, 2026
e72368e
chore: refactor code
rahul-lohra May 20, 2026
91d66a4
Merge branch 'develop' into feature/rahullohra/pinned-users-2
rahul-lohra May 20, 2026
731de62
chore: Replace PinUpdateAtTime with PinEntry
rahul-lohra May 20, 2026
dd1d510
Merge branch 'develop' into feature/rahullohra/pinned-users-2
aleksandar-apostolov May 20, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 61 additions & 0 deletions generate-sfu.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
#!/usr/bin/env bash

set -e

REPO_URL="git@github.com:GetStream/protocol.git"
REFERENCE_TYPE="branch"
REFERENCE_VALUE="main"

PROJECT_ROOT="$(dirname "$(realpath "$0")")/"
BUILD_DIR="$PROJECT_ROOT/build"
CLONE_DIR="$BUILD_DIR/protocol-repo"
OUTPUT_CLIENT_PATH="$PROJECT_ROOT/stream-video-android-core/src/main/proto/video/sfu/"

# Step 1: Delete OUTPUT_CLIENT_PATH if exists else create an empty directory
echo "🧹 Preparing output directory: $OUTPUT_CLIENT_PATH"
rm -rf "$OUTPUT_CLIENT_PATH"
mkdir -p "$OUTPUT_CLIENT_PATH"

# Step 2: Clone the repository with shallow depth
echo "🚀 Cloning repository: $REPO_URL (Type: $REFERENCE_TYPE, Value: $REFERENCE_VALUE)..."
rm -rf "$CLONE_DIR"
mkdir -p "$BUILD_DIR"
git clone --depth=1 --branch "$REFERENCE_VALUE" "$REPO_URL" "$CLONE_DIR"

cd "$CLONE_DIR"

# Step 3: Checkout to the correct branch, tag, or commit
if [ "$REFERENCE_TYPE" == "branch" ]; then
git checkout "$REFERENCE_VALUE"
elif [ "$REFERENCE_TYPE" == "tag" ]; then
git fetch --tags
git checkout "tags/$REFERENCE_VALUE"
elif [ "$REFERENCE_TYPE" == "commit" ]; then
git fetch --depth=1 origin "$REFERENCE_VALUE"
git checkout "$REFERENCE_VALUE"
else
echo "❌ ERROR: Invalid reference type '$REFERENCE_TYPE'. Use 'branch', 'tag', or 'commit'."
exit 1
fi

# Step 4: Copy content from CLONE_DIR/protobuf/video/sfu to OUTPUT_CLIENT_PATH
echo "📦 Copying proto files..."
SOURCE_PROTO_PATH="$CLONE_DIR/protobuf/video/sfu"

if [ ! -d "$SOURCE_PROTO_PATH" ]; then
echo "❌ ERROR: Source proto directory does not exist: $SOURCE_PROTO_PATH"
exit 1
fi

cp -R "$SOURCE_PROTO_PATH/"* "$OUTPUT_CLIENT_PATH"

# Step 5: Run Spotless (from project root, not inside cloned repo)
echo "✨ Running Spotless..."
cd "$PROJECT_ROOT"
./gradlew spotlessApply

# Step 6: Delete CLONE_DIR
echo "🗑 Cleaning up cloned repo..."
rm -rf "$CLONE_DIR"

echo "✅ Done."
205 changes: 186 additions & 19 deletions stream-video-android-core/api/stream-video-android-core.api

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@
"fastReconnectDeadlineSeconds. This constant will be removed in a future release.",
level = DeprecationLevel.WARNING,
)
const val sfuReconnectTimeoutMillis = 30_000

Check warning on line 140 in stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/Call.kt

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Do not forget to remove this deprecated code someday.

See more on https://sonarcloud.io/project/issues?id=GetStream_stream-video-android&issues=AZ4qtcaKrDdcKSm8aAks&open=AZ4qtcaKrDdcKSm8aAks&pullRequest=1683

/**
* Outcome of a single reconnect attempt. Each reconnect method returns one of
Expand Down Expand Up @@ -264,8 +264,8 @@
*/
private var isDestroyed = false

/** Session handles all real time communication for video and audio */
internal val session: MutableStateFlow<RtcSession?> = MutableStateFlow(null)

Check warning on line 268 in stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/Call.kt

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Don't expose mutable flow types.

See more on https://sonarcloud.io/project/issues?id=GetStream_stream-video-android&issues=AZ4qtcaKrDdcKSm8aAkr&open=AZ4qtcaKrDdcKSm8aAkr&pullRequest=1683

var sessionId = UUID.randomUUID().toString()
internal val unifiedSessionId = UUID.randomUUID().toString()
Expand Down Expand Up @@ -853,7 +853,7 @@
* @param strategy the initial reconnection strategy requested by the caller.
* @param reason a human-readable reason for logging / tracing.
*/
internal suspend fun reconnect(

Check failure on line 856 in stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/Call.kt

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor this method to reduce its Cognitive Complexity from 31 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=GetStream_stream-video-android&issues=AZ4qtcaKrDdcKSm8aAku&open=AZ4qtcaKrDdcKSm8aAku&pullRequest=1683
strategy: WebsocketReconnectStrategy,
reason: String,
) {
Expand Down Expand Up @@ -1870,7 +1870,7 @@
/**
* Should outlive both the call scope and the service scope and needs to be executed in the client-level scope.
* Because the call scope or service scope may be cancelled or finished while the network request is still in flight
* TODO: Run this in clientImpl.scope internally

Check warning on line 1873 in stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/Call.kt

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Complete the task associated to this TODO comment.

See more on https://sonarcloud.io/project/issues?id=GetStream_stream-video-android&issues=AZ4qtcaKrDdcKSm8aAkt&open=AZ4qtcaKrDdcKSm8aAkt&pullRequest=1683
*/
suspend fun reject(reason: RejectReason? = null): Result<RejectCallResponse> {
logger.d { "[reject] #ringing; rejectReason: $reason, call_id:$id" }
Expand Down Expand Up @@ -1937,9 +1937,13 @@
sessionId,
)

fun isServerPin(sessionId: String): Boolean = state._serverPins.value.containsKey(sessionId)
fun isServerPin(sessionId: String): Boolean = state.pinManager.serverPins.value.containsKey(
sessionId,
)

fun isLocalPin(sessionId: String): Boolean = state._localPins.value.containsKey(sessionId)
fun isLocalPin(sessionId: String): Boolean = state.pinManager.localPins.value.containsKey(
sessionId,
)

fun hasCapability(vararg capability: OwnCapability): Boolean {
val elements = capability.toList()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,8 @@
import io.getstream.video.android.core.notifications.internal.telecom.jetpack.JetpackTelecomRepository
import io.getstream.video.android.core.notifications.internal.telecom.jetpack.TelecomCall
import io.getstream.video.android.core.permission.PermissionRequest
import io.getstream.video.android.core.pinning.PinType
import io.getstream.video.android.core.pinning.PinUpdateAtTime
import io.getstream.video.android.core.pinning.PinEntry
import io.getstream.video.android.core.pinning.PinManager
import io.getstream.video.android.core.socket.common.scope.ClientScope
import io.getstream.video.android.core.socket.common.scope.UserScope
import io.getstream.video.android.core.sorting.SortPreset
Expand Down Expand Up @@ -167,6 +167,8 @@
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicReference
import kotlin.collections.map
import kotlin.collections.toMutableList
import kotlin.time.Duration
import kotlin.time.DurationUnit
import kotlin.time.toDuration
Expand Down Expand Up @@ -210,6 +212,9 @@
public data object Disconnected : RealtimeConnection // normal disconnect by the app
}

internal typealias SessionId = String
internal typealias IsPinned = Boolean

/**
* The CallState class keeps all state for a call
* It's available on every call object
Expand Down Expand Up @@ -269,7 +274,7 @@
it is RealtimeConnection.Reconnecting
}

private val internalParticipants = ConcurrentHashMap<String, ParticipantState>()
private val internalParticipants = ConcurrentHashMap<SessionId, ParticipantState>()
private val _participants = MutableStateFlow<Map<String, ParticipantState>>(emptyMap())

/**
Expand Down Expand Up @@ -323,34 +328,31 @@
private val _dominantSpeaker: MutableStateFlow<ParticipantState?> = MutableStateFlow(null)
public val dominantSpeaker: StateFlow<ParticipantState?> = _dominantSpeaker

internal val _localPins: MutableStateFlow<Map<String, PinUpdateAtTime>> =
MutableStateFlow(emptyMap())
internal val _serverPins: MutableStateFlow<Map<String, PinUpdateAtTime>> =
MutableStateFlow(emptyMap())
internal val pinManager = PinManager() { internalParticipants }

/**
* Combined pin state preserving the underlying [PinUpdateAtTime] (which carries
* Combined pin state preserving the underlying [PinEntry] (which carries
* [io.getstream.video.android.core.pinning.PinType] and the original timestamp).
* Internal; the sorter consumes this to apply local-pin > server-pin > recency.
* Local pins win on key collision.
*/
internal val _pinnedParticipantsDetailed: StateFlow<Map<String, PinUpdateAtTime>> =
combine(_localPins, _serverPins) { local, server ->
val combined = mutableMapOf<String, PinUpdateAtTime>()
internal val _pinnedParticipantsDetailed: StateFlow<Map<SessionId, PinEntry>> =
combine(pinManager.localPins, pinManager.serverPins) { local, server ->
val combined = mutableMapOf<String, PinEntry>()
combined.putAll(server)
combined.putAll(local) // local overrides server on key collision
combined.toMap()
}.stateIn(scope, SharingStarted.Eagerly, emptyMap())

internal val _pinnedParticipants: StateFlow<Map<String, OffsetDateTime>> =
internal val _pinnedParticipants: StateFlow<Map<SessionId, OffsetDateTime>> =
_pinnedParticipantsDetailed.mapState { detailed ->
detailed.mapValues { it.value.at }
}

/**
* Pinned participants, combined value both from server and local pins.
*/
val pinnedParticipants: StateFlow<Map<String, OffsetDateTime>> = _pinnedParticipants
val pinnedParticipants: StateFlow<Map<SessionId, OffsetDateTime>> = _pinnedParticipants

val stats = CallStats(call, scope)

Expand Down Expand Up @@ -471,7 +473,7 @@
replaceWith = ReplaceWith("participants"),
level = DeprecationLevel.WARNING,
)
val sortedParticipants: Flow<List<ParticipantState>>

Check warning on line 476 in stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/CallState.kt

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Do not forget to remove this deprecated code someday.

See more on https://sonarcloud.io/project/issues?id=GetStream_stream-video-android&issues=AZ5ERb4OxHKSlhRod5zJ&open=AZ5ERb4OxHKSlhRod5zJ&pullRequest=1683
get() = participants

/**
Expand Down Expand Up @@ -755,7 +757,8 @@
*/
val ccMode: StateFlow<ClosedCaptionMode> = closedCaptionManager.ccMode

private val pendingParticipantsJoined = ConcurrentHashMap<String, Participant>()
private val pendingParticipantsJoined =
ConcurrentHashMap<SessionId, Pair<Participant, IsPinned>>()

/**
* We re-create notification more than 1 times, so we don't want to
Expand Down Expand Up @@ -805,7 +808,7 @@
}

is PinsUpdatedEvent -> {
updateServerSidePins(event.pins)
pinManager.setServerPins(event.pins)
}

is UnblockedUserEvent -> {
Expand Down Expand Up @@ -1098,7 +1101,7 @@
} else {
_ringingState.value = RingingState.Outgoing(acceptedByCallee = true)
}
updateServerSidePins(
pinManager.setServerPins(
event.callState.pins.map {
PinUpdate(it.user_id, it.session_id)
},
Expand All @@ -1109,13 +1112,17 @@
try {
if (participants.value.size < 8) {
getOrCreateParticipant(event.participant)
pinManager.onParticipantJoined(event)
} else {
pendingParticipantsJoined[event.participant.session_id] = event.participant
pendingParticipantsJoined[event.participant.session_id] = Pair(event.participant, event.isPinned)
participantsUpdate.schedule(scope, participantsUpdateConfig) {
logger.d {
"[ParticipantJoinedEvent] #participants; #debounce; participants: ${participants.value.size}"
}
getOrCreateParticipants(pendingParticipantsJoined.values.toList())
val pendingParticipants = pendingParticipantsJoined.values.map { it.first }.toList()
getOrCreateParticipants(pendingParticipants)
val pendingParticipantsWithPin = pendingParticipantsJoined.values.toList()
pinManager.onParticipantsJoined(pendingParticipantsWithPin)
}
}
} catch (e: Exception) {
Expand All @@ -1136,16 +1143,8 @@
if (current?.participant?.sessionId == sessionId) {
_screenSharingSession.value = null
}
if (_localPins.value.containsKey(sessionId)) {
// Remove any pins for the participant
unpin(sessionId)
}

if (_serverPins.value.containsKey(sessionId)) {
scope.launch {
call.unpinForEveryone(sessionId, event.participant.user_id)
}
}
pinManager.onParticipantLeft(sessionId)
}

is SubscriberOfferEvent -> {
Expand Down Expand Up @@ -1295,19 +1294,6 @@
}
}

private fun updateServerSidePins(pins: List<PinUpdate>) {
// Update participants that are still in the call
val pinnedInCall = pins.filter {
internalParticipants.containsKey(it.sessionId)
}
_serverPins.value = pinnedInCall.associate {
Pair(
it.sessionId,
PinUpdateAtTime(it, OffsetDateTime.now(Clock.systemUTC()), PinType.Server),
)
}
}

private fun updateRingingState(rejectReason: RejectReason? = null) {
when (ringingState.value) {
RingingState.TimeoutNoAnswer, RingingState.RejectedByAll -> {
Expand All @@ -1331,7 +1317,7 @@
_session.value?.participants?.find { it.user.id == client.userId } != null
val outgoingMembersCount = _members.value.filter { it.value.user.id != client.userId }.size
val isCallEnded: Boolean = _endedAt.value != null
val createdBySelf = createdBy?.id == client.userId

Check warning on line 1320 in stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/CallState.kt

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove this unused "createdBySelf" local variable.

See more on https://sonarcloud.io/project/issues?id=GetStream_stream-video-android&issues=AZ4qtcbdrDdcKSm8aAky&open=AZ4qtcbdrDdcKSm8aAky&pullRequest=1683

ringingLogger.d { "Current: ${_ringingState.value}, call_id: ${call.cid}" }

Expand All @@ -1348,7 +1334,7 @@
ringingLogger.d { "call_id: ${call.cid}, Flags: $ringingStateLogs" }

// no members - call is empty, we can join
val state: RingingState = if (hasActiveCall && !isJoinAndRingInProgress.get()) {

Check warning on line 1337 in stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/CallState.kt

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Merge chained "if" statements into a single "when" statement.

See more on https://sonarcloud.io/project/issues?id=GetStream_stream-video-android&issues=AZ4qtcbdrDdcKSm8aAkz&open=AZ4qtcbdrDdcKSm8aAkz&pullRequest=1683
/**
* Normal join, not joinAndRing
*/
Expand Down Expand Up @@ -1701,19 +1687,11 @@
}

fun pin(userId: String, sessionId: String) {
val pins = _localPins.value.toMutableMap()
pins[sessionId] = PinUpdateAtTime(
PinUpdate(userId, sessionId),
OffsetDateTime.now(Clock.systemUTC()),
PinType.Local,
)
_localPins.value = pins
pinManager.pin(userId, sessionId)
}

fun unpin(sessionId: String) {
val pins = _localPins.value.toMutableMap()
pins.remove(sessionId)
_localPins.value = pins
pinManager.unpin(sessionId)
}

fun updateFromResponse(result: StopLiveResponse) {
Expand Down Expand Up @@ -1840,7 +1818,7 @@
}

@Deprecated("Use updateNotification(Int, Notification) instead")
fun updateNotification(notification: Notification) {

Check warning on line 1821 in stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/CallState.kt

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Do not forget to remove this deprecated code someday.

See more on https://sonarcloud.io/project/issues?id=GetStream_stream-video-android&issues=AZ4qtcbdrDdcKSm8aAkx&open=AZ4qtcbdrDdcKSm8aAkx&pullRequest=1683
atomicNotification.set(notification)
}

Expand All @@ -1861,12 +1839,12 @@
private fun observeTelecomHold(repo: JetpackTelecomRepository) {
telecomHoldObserverJob?.cancel()

telecomHoldObserverJob = scope.launch(Dispatchers.Default) {

Check warning on line 1842 in stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/CallState.kt

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Avoid hardcoded dispatchers.

See more on https://sonarcloud.io/project/issues?id=GetStream_stream-video-android&issues=AZ4qtcbdrDdcKSm8aAkw&open=AZ4qtcbdrDdcKSm8aAkw&pullRequest=1683
repo.currentCall
.map { (it as? TelecomCall.Registered)?.isOnHold == true }
.distinctUntilChanged()
.filter { it }
.collect { isOnHold ->

Check warning on line 1847 in stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/CallState.kt

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Use "_" instead of this unused lambda parameter "isOnHold".

See more on https://sonarcloud.io/project/issues?id=GetStream_stream-video-android&issues=AZ4qtcbdrDdcKSm8aAkv&open=AZ4qtcbdrDdcKSm8aAkv&pullRequest=1683
when (ringingState.value) {
is RingingState.Active -> {
call.leave("call-on-hold")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public object RTCEventMapper {
}

event.participant_joined != null -> with(event.participant_joined) {
ParticipantJoinedEvent(participant!!, call_cid)
ParticipantJoinedEvent(participant!!, call_cid, is_pinned)
}

event.participant_left != null -> with(event.participant_left) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import stream.video.sfu.signal.ICERestartResponse
import stream.video.sfu.signal.ICETrickleResponse
import stream.video.sfu.signal.SendAnswerRequest
import stream.video.sfu.signal.SendAnswerResponse
import stream.video.sfu.signal.SendMetricsRequest
import stream.video.sfu.signal.SendMetricsResponse
import stream.video.sfu.signal.SendStatsRequest
import stream.video.sfu.signal.SendStatsResponse
import stream.video.sfu.signal.SetPublisherRequest
Expand Down Expand Up @@ -117,6 +119,9 @@ internal class RetryableSignalingServiceDecorator(
override suspend fun sendStats(sendStatsRequest: SendStatsRequest): SendStatsResponse =
retryCall({ it.error }) { decorated.sendStats(sendStatsRequest) }

override suspend fun sendMetrics(sendMetricsRequest: SendMetricsRequest): SendMetricsResponse =
retryCall({ null }) { decorated.sendMetrics(sendMetricsRequest) }

override suspend fun startNoiseCancellation(startNoiseCancellationRequest: StartNoiseCancellationRequest): StartNoiseCancellationResponse =
retryCall({ it.error }) { decorated.startNoiseCancellation(startNoiseCancellationRequest) }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,9 @@
) : SfuDataEvent()

public data class ParticipantJoinedEvent(

val participant: Participant,
val callCid: String,
val isPinned: Boolean,
) : SfuDataEvent()

public data class ParticipantLeftEvent(
Expand Down Expand Up @@ -168,6 +168,7 @@
val pins: List<PinUpdate>,
) : SfuDataEvent()

// Todo Rahul, should rename it to PinTarget in v2

Check warning on line 171 in stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/events/SfuDataEvent.kt

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Complete the task associated to this TODO comment.

See more on https://sonarcloud.io/project/issues?id=GetStream_stream-video-android&issues=AZ4qtcW6rDdcKSm8aAkq&open=AZ4qtcW6rDdcKSm8aAkq&pullRequest=1683
public data class PinUpdate(val userId: String, val sessionId: String)

public data class UnknownEvent(val event: Any?) : SfuDataEvent()
Expand Down
Loading
Loading