Skip to content

Commit c8c8906

Browse files
committed
feat(android): add broadcast subscriptions and catalog playback
1 parent 8e220c2 commit c8c8906

16 files changed

Lines changed: 1169 additions & 446 deletions

File tree

android/moqkit/moqkit/build.gradle.kts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,10 @@ android {
3131
sourceCompatibility = JavaVersion.VERSION_11
3232
targetCompatibility = JavaVersion.VERSION_11
3333
}
34+
35+
testOptions {
36+
unitTests.isReturnDefaultValues = true
37+
}
3438
}
3539

3640
mavenPublishing {
Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,2 @@
11
<?xml version="1.0" encoding="utf-8"?>
2-
<manifest xmlns:android="http://schemas.android.com/apk/res/android">
3-
4-
</manifest>
2+
<manifest xmlns:android="http://schemas.android.com/apk/res/android" />

android/moqkit/moqkit/src/main/java/com/swmansion/moqkit/Session.kt

Lines changed: 127 additions & 152 deletions
Large diffs are not rendered by default.

android/moqkit/moqkit/src/main/java/com/swmansion/moqkit/publish/source/MicrophoneCapture.kt

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,21 @@
11
package com.swmansion.moqkit.publish.source
22

3+
import android.Manifest
34
import android.media.AudioFormat
45
import android.media.AudioRecord
56
import android.media.MediaRecorder
67
import android.os.SystemClock
78
import android.util.Log
9+
import androidx.annotation.RequiresPermission
810

911
private const val TAG = "MicrophoneCapture"
1012

13+
/**
14+
* Pulls PCM frames from the device microphone.
15+
*
16+
* Calling apps must both request and declare `RECORD_AUDIO` in their own manifest. The moqkit
17+
* library does not add that permission transitively.
18+
*/
1119
class MicrophoneCapture(
1220
private val sampleRate: Int = 48_000,
1321
private val channels: Int = 1,
@@ -19,6 +27,7 @@ class MicrophoneCapture(
1927
private var recordThread: Thread? = null
2028
@Volatile private var running = false
2129

30+
@RequiresPermission(Manifest.permission.RECORD_AUDIO)
2231
fun start() {
2332
val channelConfig = if (channels == 1) AudioFormat.CHANNEL_IN_MONO else AudioFormat.CHANNEL_IN_STEREO
2433
val minBufSize = AudioRecord.getMinBufferSize(

android/moqkit/moqkit/src/main/java/com/swmansion/moqkit/subscribe/AudioTrackInfo.kt

Lines changed: 0 additions & 17 deletions
This file was deleted.
Lines changed: 320 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,320 @@
1+
package com.swmansion.moqkit.subscribe
2+
3+
import android.util.Log
4+
import kotlinx.coroutines.CancellationException
5+
import kotlinx.coroutines.flow.Flow
6+
import kotlinx.coroutines.flow.flow
7+
import uniffi.moq.MoqAnnounced
8+
import uniffi.moq.MoqAudio
9+
import uniffi.moq.MoqBroadcastConsumer
10+
import uniffi.moq.MoqCatalog
11+
import uniffi.moq.MoqCatalogConsumer
12+
import uniffi.moq.MoqOriginConsumer
13+
import uniffi.moq.MoqVideo
14+
15+
private const val TAG = "Broadcast"
16+
17+
/**
18+
* A pair of pixel dimensions used to describe video resolution or display ratio.
19+
*/
20+
data class VideoSize(
21+
val width: UInt,
22+
val height: UInt,
23+
)
24+
25+
/**
26+
* Codec and format parameters for a video rendition.
27+
*/
28+
data class VideoTrackConfig(
29+
val codec: String,
30+
val coded: VideoSize?,
31+
val displayRatio: VideoSize?,
32+
val bitrate: ULong?,
33+
val framerate: Double?,
34+
)
35+
36+
/**
37+
* Codec and format parameters for an audio rendition.
38+
*/
39+
data class AudioTrackConfig(
40+
val codec: String,
41+
val sampleRate: UInt,
42+
val channelCount: UInt,
43+
val bitrate: ULong?,
44+
)
45+
46+
/** Base interface for a single media track within a broadcast. */
47+
interface TrackInfo {
48+
/** Track name as announced in the catalog (e.g. `"video/high"`, `"audio/main"`). */
49+
val name: String
50+
}
51+
52+
/**
53+
* A video track discovered from a broadcast catalog.
54+
*/
55+
data class VideoTrackInfo internal constructor(
56+
override val name: String,
57+
val config: VideoTrackConfig,
58+
internal val rawConfig: MoqVideo,
59+
) : TrackInfo
60+
61+
/**
62+
* An audio track discovered from a broadcast catalog.
63+
*/
64+
data class AudioTrackInfo internal constructor(
65+
override val name: String,
66+
val config: AudioTrackConfig,
67+
internal val rawConfig: MoqAudio,
68+
) : TrackInfo
69+
70+
/**
71+
* The latest playable track metadata for a single broadcast path.
72+
*/
73+
class Catalog internal constructor(
74+
val path: String,
75+
val videoTracks: List<VideoTrackInfo>,
76+
val audioTracks: List<AudioTrackInfo>,
77+
internal val owner: BroadcastOwner,
78+
) {
79+
internal constructor(path: String, catalog: MoqCatalog, owner: BroadcastOwner) : this(
80+
path = path,
81+
videoTracks = catalog.video.map { (name, rendition) ->
82+
VideoTrackInfo(name = name, config = rendition.toTrackConfig(), rawConfig = rendition)
83+
},
84+
audioTracks = catalog.audio.map { (name, rendition) ->
85+
AudioTrackInfo(name = name, config = rendition.toTrackConfig(), rawConfig = rendition)
86+
},
87+
owner = owner,
88+
)
89+
90+
internal fun retainBroadcastOwner(): BroadcastOwner = owner.retain()
91+
}
92+
93+
/**
94+
* A live broadcast announcement surfaced by a [BroadcastSubscription].
95+
*
96+
* Keep this broadcast open for as long as you need to create players or observe catalog updates.
97+
* A [Player] created from one of this broadcast's catalogs retains the underlying broadcast handle
98+
* until the player is closed.
99+
*/
100+
class Broadcast internal constructor(
101+
val path: String,
102+
private val owner: BroadcastOwner,
103+
) : AutoCloseable {
104+
private val lock = Any()
105+
private var closed = false
106+
107+
/**
108+
* Streams catalog updates for this broadcast until the catalog track ends.
109+
*/
110+
fun catalogs(): Flow<Catalog> = flow {
111+
val broadcast = owner.consumer()
112+
var catalogConsumer: MoqCatalogConsumer? = null
113+
try {
114+
catalogConsumer = broadcast.subscribeCatalog()
115+
while (true) {
116+
val catalog = catalogConsumer.next() ?: break
117+
emit(Catalog(path = path, catalog = catalog, owner = owner))
118+
}
119+
} catch (e: CancellationException) {
120+
throw e
121+
} catch (e: Exception) {
122+
if (!owner.isClosed()) {
123+
throw e
124+
}
125+
} finally {
126+
try {
127+
catalogConsumer?.cancel()
128+
} catch (_: Exception) {
129+
}
130+
try {
131+
catalogConsumer?.close()
132+
} catch (_: Exception) {
133+
}
134+
}
135+
}
136+
137+
override fun close() {
138+
val shouldRelease = synchronized(lock) {
139+
if (closed) {
140+
false
141+
} else {
142+
closed = true
143+
true
144+
}
145+
}
146+
if (shouldRelease) {
147+
owner.release()
148+
}
149+
}
150+
}
151+
152+
/**
153+
* A prefix-based subscription created by [com.swmansion.moqkit.Session.subscribe].
154+
*/
155+
class BroadcastSubscription internal constructor(
156+
val prefix: String,
157+
private var originConsumer: MoqOriginConsumer?,
158+
private var announced: MoqAnnounced?,
159+
private val onClosed: () -> Unit,
160+
) : AutoCloseable {
161+
private val lock = Any()
162+
private var closed = false
163+
private var collectionStarted = false
164+
165+
/**
166+
* Emits broadcasts announced under [prefix].
167+
*
168+
* A subscription supports a single active collector because it is backed by one UniFFI
169+
* announcement stream.
170+
*/
171+
val broadcasts: Flow<Broadcast> = flow {
172+
markCollectionStarted()
173+
174+
val currentAnnounced = synchronized(lock) { announced }
175+
?: throw IllegalStateException("Broadcast subscription for prefix '$prefix' is closed")
176+
177+
try {
178+
while (true) {
179+
val announcement = try {
180+
currentAnnounced.next()
181+
} catch (e: CancellationException) {
182+
throw e
183+
} catch (e: Exception) {
184+
if (isClosed) {
185+
break
186+
}
187+
throw e
188+
} ?: break
189+
val path: String
190+
val consumer: MoqBroadcastConsumer
191+
try {
192+
path = announcement.path()
193+
consumer = announcement.broadcast()
194+
} finally {
195+
try {
196+
announcement.close()
197+
} catch (_: Exception) {
198+
}
199+
}
200+
201+
val owner = BroadcastOwner(path = path, consumer = consumer)
202+
val broadcast = Broadcast(path = path, owner = owner)
203+
try {
204+
emit(broadcast)
205+
} catch (t: Throwable) {
206+
owner.release()
207+
throw t
208+
}
209+
}
210+
} finally {
211+
finish()
212+
}
213+
}
214+
215+
val isClosed: Boolean
216+
get() = synchronized(lock) { closed }
217+
218+
override fun close() {
219+
finish()
220+
}
221+
222+
private fun markCollectionStarted() {
223+
synchronized(lock) {
224+
check(!closed) { "Broadcast subscription for prefix '$prefix' is closed" }
225+
check(!collectionStarted) {
226+
"Broadcast subscription for prefix '$prefix' supports only a single collector"
227+
}
228+
collectionStarted = true
229+
}
230+
}
231+
232+
private fun finish() {
233+
val resources = synchronized(lock) {
234+
if (closed) {
235+
null
236+
} else {
237+
closed = true
238+
val currentAnnounced = announced
239+
announced = null
240+
val currentOriginConsumer = originConsumer
241+
originConsumer = null
242+
currentAnnounced to currentOriginConsumer
243+
}
244+
} ?: return
245+
246+
try {
247+
resources.first?.cancel()
248+
} catch (_: Exception) {
249+
}
250+
251+
try {
252+
resources.first?.close()
253+
} catch (_: Exception) {
254+
}
255+
256+
try {
257+
resources.second?.close()
258+
} catch (_: Exception) {
259+
}
260+
261+
onClosed()
262+
}
263+
}
264+
265+
internal class BroadcastOwner(
266+
private val path: String,
267+
consumer: MoqBroadcastConsumer,
268+
) {
269+
private val lock = Any()
270+
private var refCount = 1
271+
private var consumer: MoqBroadcastConsumer? = consumer
272+
273+
fun retain(): BroadcastOwner = synchronized(lock) {
274+
check(refCount > 0 && consumer != null) { "Broadcast '$path' is already closed" }
275+
refCount += 1
276+
this
277+
}
278+
279+
fun consumer(): MoqBroadcastConsumer = synchronized(lock) {
280+
consumer ?: throw IllegalStateException("Broadcast '$path' is already closed")
281+
}
282+
283+
fun isClosed(): Boolean = synchronized(lock) { consumer == null }
284+
285+
fun release() {
286+
val consumerToClose = synchronized(lock) {
287+
check(refCount > 0) { "Broadcast '$path' has already been released" }
288+
refCount -= 1
289+
if (refCount == 0) {
290+
consumer.also { consumer = null }
291+
} else {
292+
null
293+
}
294+
}
295+
296+
if (consumerToClose != null) {
297+
Log.d(TAG, "Closing broadcast '$path'")
298+
try {
299+
consumerToClose.close()
300+
} catch (e: Exception) {
301+
Log.w(TAG, "Failed to close broadcast '$path'", e)
302+
}
303+
}
304+
}
305+
}
306+
307+
private fun MoqVideo.toTrackConfig(): VideoTrackConfig = VideoTrackConfig(
308+
codec = codec,
309+
coded = coded?.let { VideoSize(width = it.width, height = it.height) },
310+
displayRatio = displayRatio?.let { VideoSize(width = it.width, height = it.height) },
311+
bitrate = bitrate,
312+
framerate = framerate,
313+
)
314+
315+
private fun MoqAudio.toTrackConfig(): AudioTrackConfig = AudioTrackConfig(
316+
codec = codec,
317+
sampleRate = sampleRate,
318+
channelCount = channelCount,
319+
bitrate = bitrate,
320+
)

0 commit comments

Comments
 (0)