Skip to content

Commit 6bebf15

Browse files
committed
feat: per-call DDP registry for call waiting
Replace singleton DDP client with VoipPerCallDdpRegistry keyed by callId so busy reject and second incoming call do not disconnect the first call's listener. Add isLiveClient guards for stale callbacks; stopClient(callId) and stopAllClients for scoped and full teardown. Add unit tests for registry isolation and logged-in state.
1 parent d7bffcd commit 6bebf15

File tree

6 files changed

+297
-84
lines changed

6 files changed

+297
-84
lines changed

android/app/src/main/java/chat/rocket/reactnative/voip/VoipNotification.kt

Lines changed: 47 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,13 @@ class VoipNotification(private val context: Context) {
7676
private val SUPPORTED_VOIP_FEATURES = JSONArray().apply { put("audio") }
7777
private val timeoutHandler = Handler(Looper.getMainLooper())
7878
private val timeoutCallbacks = mutableMapOf<String, Runnable>()
79-
private var ddpClient: DDPClient? = null
80-
private var isDdpLoggedIn = false
79+
private val ddpRegistry = VoipPerCallDdpRegistry<DDPClient> { client ->
80+
client.clearQueuedMethodCalls()
81+
client.disconnect()
82+
}
83+
84+
/** False when [callId] was reassigned or torn down (stale DDP callback). */
85+
private fun isLiveClient(callId: String, client: DDPClient) = ddpRegistry.clientFor(callId) === client
8186

8287
/**
8388
* Cancels a VoIP notification by ID.
@@ -135,7 +140,7 @@ class VoipNotification(private val context: Context) {
135140
putExtras(payload.toBundle())
136141
}
137142
)
138-
stopDDPClientInternal()
143+
ddpRegistry.stopClient(payload.callId)
139144
Log.d(TAG, "Timed out incoming VoIP call: ${payload.callId}")
140145
}
141146

@@ -147,7 +152,7 @@ class VoipNotification(private val context: Context) {
147152
fun handleDeclineAction(context: Context, payload: VoipPayload) {
148153
Log.d(TAG, "Decline action triggered for callId: ${payload.callId}")
149154
cancelTimeout(payload.callId)
150-
if (isDdpLoggedIn) {
155+
if (ddpRegistry.isLoggedIn(payload.callId)) {
151156
sendRejectSignal(context, payload)
152157
} else {
153158
queueRejectSignal(context, payload)
@@ -239,7 +244,7 @@ class VoipNotification(private val context: Context) {
239244
fun finish(ddpSuccess: Boolean) {
240245
if (!finished.compareAndSet(false, true)) return
241246
timeoutRunnable?.let { timeoutHandler.removeCallbacks(it) }
242-
stopDDPClientInternal()
247+
ddpRegistry.stopClient(payload.callId)
243248
if (ddpSuccess) {
244249
answerIncomingCall(payload.callId)
245250
VoipModule.storeInitialEvents(payload)
@@ -266,14 +271,14 @@ class VoipNotification(private val context: Context) {
266271
timeoutRunnable = postedTimeout
267272
timeoutHandler.postDelayed(postedTimeout, 10_000L)
268273

269-
val client = ddpClient
274+
val client = ddpRegistry.clientFor(payload.callId)
270275
if (client == null) {
271276
Log.d(TAG, "Native DDP client unavailable for accept ${payload.callId}")
272277
finish(false)
273278
return
274279
}
275280

276-
if (isDdpLoggedIn) {
281+
if (ddpRegistry.isLoggedIn(payload.callId)) {
277282
sendAcceptSignal(context, payload) { success ->
278283
finish(success)
279284
}
@@ -342,7 +347,7 @@ class VoipNotification(private val context: Context) {
342347
}
343348

344349
private fun sendRejectSignal(context: Context, payload: VoipPayload) {
345-
val client = ddpClient
350+
val client = ddpRegistry.clientFor(payload.callId)
346351
if (client == null) {
347352
Log.d(TAG, "Native DDP client unavailable, cannot send reject for ${payload.callId}")
348353
return
@@ -352,12 +357,12 @@ class VoipNotification(private val context: Context) {
352357

353358
client.callMethod("stream-notify-user", params) { success ->
354359
Log.d(TAG, "Native reject signal result for ${payload.callId}: $success")
355-
stopDDPClientInternal()
360+
ddpRegistry.stopClient(payload.callId)
356361
}
357362
}
358363

359364
private fun queueRejectSignal(context: Context, payload: VoipPayload) {
360-
val client = ddpClient
365+
val client = ddpRegistry.clientFor(payload.callId)
361366
if (client == null) {
362367
Log.d(TAG, "Native DDP client unavailable, cannot queue reject for ${payload.callId}")
363368
return
@@ -367,13 +372,13 @@ class VoipNotification(private val context: Context) {
367372

368373
client.queueMethodCall("stream-notify-user", params) { success ->
369374
Log.d(TAG, "Queued native reject signal result for ${payload.callId}: $success")
370-
stopDDPClientInternal()
375+
ddpRegistry.stopClient(payload.callId)
371376
}
372377
Log.d(TAG, "Queued native reject signal for ${payload.callId}")
373378
}
374379

375-
private fun flushPendingQueuedSignalsIfNeeded(): Boolean {
376-
val client = ddpClient ?: return false
380+
private fun flushPendingQueuedSignalsIfNeeded(callId: String): Boolean {
381+
val client = ddpRegistry.clientFor(callId) ?: return false
377382
if (!client.hasQueuedMethodCalls()) {
378383
return false
379384
}
@@ -387,7 +392,7 @@ class VoipNotification(private val context: Context) {
387392
payload: VoipPayload,
388393
onComplete: (Boolean) -> Unit
389394
) {
390-
val client = ddpClient
395+
val client = ddpRegistry.clientFor(payload.callId)
391396
if (client == null) {
392397
Log.d(TAG, "Native DDP client unavailable, cannot send accept for ${payload.callId}")
393398
onComplete(false)
@@ -410,7 +415,7 @@ class VoipNotification(private val context: Context) {
410415
payload: VoipPayload,
411416
onComplete: (Boolean) -> Unit
412417
) {
413-
val client = ddpClient
418+
val client = ddpRegistry.clientFor(payload.callId)
414419
if (client == null) {
415420
Log.d(TAG, "Native DDP client unavailable, cannot queue accept for ${payload.callId}")
416421
onComplete(false)
@@ -440,13 +445,13 @@ class VoipNotification(private val context: Context) {
440445
val userId = ejson.userId()
441446
if (userId.isNullOrEmpty()) {
442447
Log.d(TAG, "Missing userId, cannot build stream-notify-user params for ${payload.callId}")
443-
stopDDPClientInternal()
448+
ddpRegistry.stopClient(payload.callId)
444449
return null
445450
}
446451
val deviceId = Settings.Secure.getString(context.contentResolver, Settings.Secure.ANDROID_ID)
447452
if (deviceId.isNullOrEmpty()) {
448453
Log.d(TAG, "Missing deviceId, cannot build stream-notify-user params for ${payload.callId}")
449-
stopDDPClientInternal()
454+
ddpRegistry.stopClient(payload.callId)
450455
return null
451456
}
452457
return VoipMediaCallIdentity(userId, deviceId)
@@ -538,7 +543,7 @@ class VoipNotification(private val context: Context) {
538543
Log.d(TAG, "Rejected busy call ${payload.callId} — user already on a call")
539544
cancelTimeout(payload.callId)
540545
startListeningForCallEnd(context, payload)
541-
if (isDdpLoggedIn) {
546+
if (ddpRegistry.isLoggedIn(payload.callId)) {
542547
sendRejectSignal(context, payload)
543548
} else {
544549
queueRejectSignal(context, payload)
@@ -549,8 +554,6 @@ class VoipNotification(private val context: Context) {
549554

550555
@JvmStatic
551556
fun startListeningForCallEnd(context: Context, payload: VoipPayload) {
552-
stopDDPClientInternal()
553-
554557
val ejson = Ejson()
555558
ejson.host = payload.host
556559
val userId = ejson.userId()
@@ -564,12 +567,14 @@ class VoipNotification(private val context: Context) {
564567
val deviceId = Settings.Secure.getString(context.contentResolver, Settings.Secure.ANDROID_ID)
565568
val callId = payload.callId
566569
val client = DDPClient()
567-
ddpClient = client
568-
isDdpLoggedIn = false
570+
ddpRegistry.putClient(callId, client)
569571

570572
Log.d(TAG, "Starting DDP listener for call $callId")
571573

572-
client.onCollectionMessage = { message ->
574+
client.onCollectionMessage = collector@{ message ->
575+
if (!isLiveClient(callId, client)) {
576+
return@collector
577+
}
573578
Log.d(TAG, "DDP received message: $message")
574579
val fields = message.optJSONObject("fields")
575580
if (fields != null) {
@@ -593,6 +598,9 @@ class VoipNotification(private val context: Context) {
593598
)) {
594599
val appContext = context.applicationContext
595600
Handler(Looper.getMainLooper()).post {
601+
if (!isLiveClient(callId, client)) {
602+
return@post
603+
}
596604
cancelTimeout(callId)
597605
disconnectIncomingCall(callId, false)
598606
cancelById(appContext, payload.notificationId)
@@ -601,7 +609,7 @@ class VoipNotification(private val context: Context) {
601609
putExtras(payload.toBundle())
602610
}
603611
)
604-
stopDDPClientInternal()
612+
ddpRegistry.stopClient(callId)
605613
}
606614
}
607615
}
@@ -611,21 +619,27 @@ class VoipNotification(private val context: Context) {
611619
}
612620

613621
client.connect(payload.host) { connected ->
622+
if (!isLiveClient(callId, client)) {
623+
return@connect
624+
}
614625
if (!connected) {
615626
Log.d(TAG, "DDP connection failed")
616-
stopDDPClientInternal()
627+
ddpRegistry.stopClient(callId)
617628
return@connect
618629
}
619630

620631
client.login(token) { loggedIn ->
632+
if (!isLiveClient(callId, client)) {
633+
return@login
634+
}
621635
if (!loggedIn) {
622636
Log.d(TAG, "DDP login failed")
623-
stopDDPClientInternal()
637+
ddpRegistry.stopClient(callId)
624638
return@login
625639
}
626640

627-
isDdpLoggedIn = true
628-
if (flushPendingQueuedSignalsIfNeeded()) {
641+
ddpRegistry.markLoggedIn(callId)
642+
if (flushPendingQueuedSignalsIfNeeded(callId)) {
629643
return@login
630644
}
631645

@@ -638,9 +652,12 @@ class VoipNotification(private val context: Context) {
638652
}
639653

640654
client.subscribe("stream-notify-user", params) { subscribed ->
655+
if (!isLiveClient(callId, client)) {
656+
return@subscribe
657+
}
641658
Log.d(TAG, "DDP subscribe result: $subscribed")
642659
if (!subscribed) {
643-
stopDDPClientInternal()
660+
ddpRegistry.stopClient(callId)
644661
}
645662
}
646663
}
@@ -651,14 +668,7 @@ class VoipNotification(private val context: Context) {
651668
@JvmStatic
652669
fun stopDDPClient() {
653670
Log.d(TAG, "stopDDPClient called from JS")
654-
stopDDPClientInternal()
655-
}
656-
657-
private fun stopDDPClientInternal() {
658-
isDdpLoggedIn = false
659-
ddpClient?.clearQueuedMethodCalls()
660-
ddpClient?.disconnect()
661-
ddpClient = null
671+
ddpRegistry.stopAllClients()
662672
}
663673
}
664674

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package chat.rocket.reactnative.voip
2+
3+
/**
4+
* Per-call DDP client slots: each [callId] maps to at most one [DDPClient] in production.
5+
* Isolates teardown so a busy-call reject (call B) does not disconnect call A's listener.
6+
*/
7+
internal class VoipPerCallDdpRegistry<T : Any>(
8+
private val releaseClient: (T) -> Unit
9+
) {
10+
private val lock = Any()
11+
private val clients = mutableMapOf<String, T>()
12+
private val loggedInCallIds = mutableSetOf<String>()
13+
14+
fun clientFor(callId: String): T? = synchronized(lock) { clients[callId] }
15+
16+
fun isLoggedIn(callId: String): Boolean = synchronized(lock) { loggedInCallIds.contains(callId) }
17+
18+
fun putClient(callId: String, client: T) {
19+
synchronized(lock) {
20+
clients.remove(callId)?.let(releaseClient)
21+
clients[callId] = client
22+
loggedInCallIds.remove(callId)
23+
}
24+
}
25+
26+
fun markLoggedIn(callId: String) {
27+
synchronized(lock) {
28+
loggedInCallIds.add(callId)
29+
}
30+
}
31+
32+
fun stopClient(callId: String) {
33+
synchronized(lock) {
34+
loggedInCallIds.remove(callId)
35+
clients.remove(callId)?.let(releaseClient)
36+
}
37+
}
38+
39+
fun stopAllClients() {
40+
synchronized(lock) {
41+
loggedInCallIds.clear()
42+
clients.values.forEach(releaseClient)
43+
clients.clear()
44+
}
45+
}
46+
47+
fun clientCount(): Int = synchronized(lock) { clients.size }
48+
49+
fun clientIds(): Set<String> = synchronized(lock) { clients.keys.toSet() }
50+
}
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
package chat.rocket.reactnative.voip
2+
3+
import org.junit.Assert.assertEquals
4+
import org.junit.Assert.assertFalse
5+
import org.junit.Assert.assertNull
6+
import org.junit.Assert.assertTrue
7+
import org.junit.Test
8+
9+
class VoipPerCallDdpRegistryTest {
10+
11+
private fun registry(): Pair<MutableList<String>, VoipPerCallDdpRegistry<String>> {
12+
val released = mutableListOf<String>()
13+
return released to VoipPerCallDdpRegistry { released.add(it) }
14+
}
15+
16+
@Test
17+
fun `stopClient removes only the named callId`() {
18+
val (released, reg) = registry()
19+
reg.putClient("callA", "clientA")
20+
reg.putClient("callB", "clientB")
21+
reg.stopClient("callA")
22+
assertEquals(listOf("clientA"), released)
23+
assertEquals(setOf("callB"), reg.clientIds())
24+
assertEquals("clientB", reg.clientFor("callB"))
25+
assertNull(reg.clientFor("callA"))
26+
}
27+
28+
@Test
29+
fun `stopAllClients disconnects every entry`() {
30+
val (released, reg) = registry()
31+
reg.putClient("callA", "clientA")
32+
reg.putClient("callB", "clientB")
33+
reg.stopAllClients()
34+
assertEquals(2, released.size)
35+
assertTrue(released.containsAll(listOf("clientA", "clientB")))
36+
assertEquals(0, reg.clientCount())
37+
assertTrue(reg.clientIds().isEmpty())
38+
}
39+
40+
@Test
41+
fun `starting a second listener for another callId does not release the first`() {
42+
val (released, reg) = registry()
43+
reg.putClient("callA", "clientA")
44+
reg.putClient("callB", "clientB")
45+
assertTrue(released.isEmpty())
46+
assertEquals(setOf("callA", "callB"), reg.clientIds())
47+
}
48+
49+
@Test
50+
fun `putClient for the same callId releases the previous client`() {
51+
val (released, reg) = registry()
52+
reg.putClient("callA", "first")
53+
reg.putClient("callA", "second")
54+
assertEquals(listOf("first"), released)
55+
assertEquals("second", reg.clientFor("callA"))
56+
}
57+
58+
@Test
59+
fun `loggedIn state is per callId`() {
60+
val (_, reg) = registry()
61+
reg.putClient("callA", "a")
62+
reg.putClient("callB", "b")
63+
assertFalse(reg.isLoggedIn("callA"))
64+
reg.markLoggedIn("callA")
65+
assertTrue(reg.isLoggedIn("callA"))
66+
assertFalse(reg.isLoggedIn("callB"))
67+
}
68+
69+
@Test
70+
fun `stopClient clears loggedIn for that callId`() {
71+
val (_, reg) = registry()
72+
reg.putClient("callA", "a")
73+
reg.markLoggedIn("callA")
74+
reg.stopClient("callA")
75+
assertFalse(reg.isLoggedIn("callA"))
76+
}
77+
}

0 commit comments

Comments
 (0)