Skip to content

Commit 47c812a

Browse files
committed
add get rtt and send shouldsendpings in rtmp
1 parent 6f0abf3 commit 47c812a

4 files changed

Lines changed: 53 additions & 0 deletions

File tree

common/src/main/java/com/pedro/common/TimeUtils.kt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,9 @@ object TimeUtils {
3030
@JvmStatic
3131
fun getCurrentTimeMillis(): Long = SystemClock.elapsedRealtime()
3232

33+
@JvmStatic
34+
fun getCurrentTimeSeconds(): Int = (getCurrentTimeMillis() / 1000).toInt()
35+
3336
@JvmStatic
3437
fun getCurrentTimeNano(): Long {
3538
return if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.JELLY_BEAN_MR1) {

library/src/main/java/com/pedro/library/util/streamclient/RtmpStreamClient.kt

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,22 @@ class RtmpStreamClient(
7878
rtmpClient.setWriteChunkSize(chunkSize)
7979
}
8080

81+
/**
82+
* RTT in micro seconds reported by ping-pong commands.
83+
* shouldSendPings must be enabled to work properly.
84+
*/
85+
fun getRtt() = rtmpClient.rtt
86+
87+
/**
88+
* Send ping commands each second to server.
89+
* This allow get a RTT and keep alive the read channel in servers that close it due to inactivity.
90+
*
91+
* Could be useful in combination with shouldFailOnRead to detect connection closed in few servers.
92+
*/
93+
fun shouldSendPings(enabled: Boolean) {
94+
rtmpClient.shouldSendPings(enabled)
95+
}
96+
8197
override fun reTry(delay: Long, reason: String, backupUrl: String?): Boolean {
8298
val result = rtmpClient.shouldRetry(reason)
8399
if (result) {

rtmp/src/main/java/com/pedro/rtmp/rtmp/CommandsManager.kt

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,16 @@ abstract class CommandsManager {
172172
}
173173
}
174174

175+
suspend fun sendPing(socket: RtmpSocket) {
176+
writeSync.withLock {
177+
val ping = UserControl(Type.PING_REQUEST, Event(TimeUtils.getCurrentTimeSeconds()))
178+
ping.writeHeader(socket)
179+
ping.writeBody(socket)
180+
socket.flush()
181+
Log.i(TAG, "send ping")
182+
}
183+
}
184+
175185
@Throws(IOException::class)
176186
suspend fun sendClose(socket: RtmpSocket) {
177187
writeSync.withLock {

rtmp/src/main/java/com/pedro/rtmp/rtmp/RtmpClient.kt

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ import kotlinx.coroutines.withTimeoutOrNull
5959
import java.io.IOException
6060
import java.net.URISyntaxException
6161
import java.nio.ByteBuffer
62+
import java.util.concurrent.atomic.AtomicLong
6263
import javax.net.ssl.TrustManager
6364

6465
/**
@@ -109,6 +110,10 @@ class RtmpClient(private val connectChecker: ConnectChecker) {
109110
var socketType = SocketType.KTOR
110111
var socketTimeout = StreamSocket.DEFAULT_TIMEOUT
111112
var shouldFailOnRead = false
113+
var shouldSendPings = false
114+
var rtt = 0 //in micro
115+
private set
116+
private val pingTs = AtomicLong(0)
112117

113118
/**
114119
* Add certificates for TLS connection
@@ -145,6 +150,10 @@ class RtmpClient(private val connectChecker: ConnectChecker) {
145150
checkServerAlive = enabled
146151
}
147152

153+
fun shouldSendPings(enabled: Boolean) {
154+
shouldSendPings = enabled
155+
}
156+
148157
/**
149158
* Must be called before connect
150159
*/
@@ -286,6 +295,7 @@ class RtmpClient(private val connectChecker: ConnectChecker) {
286295
//Handle all command received and send response for it.
287296
handleMessages()
288297
}
298+
if (shouldSendPings) commandsManager.sendPing(socket)
289299
//read packet because maybe server want send you something while streaming
290300
handleServerPackets()
291301
}.exceptionOrNull()
@@ -387,6 +397,19 @@ class RtmpClient(private val connectChecker: ConnectChecker) {
387397
Type.PING_REQUEST -> {
388398
commandsManager.sendPong(userControl.event, socket)
389399
}
400+
Type.PONG_REPLY -> {
401+
Log.i(TAG, "pong received: ${userControl.event.data}")
402+
if (shouldSendPings) {
403+
rtt = (TimeUtils.getCurrentTimeMicro() - pingTs.get()).toInt()
404+
CoroutineScope(Dispatchers.IO).launch {
405+
delay(1000)
406+
if (isStreaming) {
407+
pingTs.set(TimeUtils.getCurrentTimeMicro())
408+
commandsManager.sendPing(socket)
409+
}
410+
}
411+
}
412+
}
390413
else -> {
391414
Log.i(TAG, "user control command $type ignored")
392415
}
@@ -563,6 +586,7 @@ class RtmpClient(private val connectChecker: ConnectChecker) {
563586
scope = CoroutineScope(Dispatchers.IO)
564587
publishPermitted = false
565588
commandsManager.reset()
589+
rtt = 0
566590
}
567591

568592
fun sendVideo(videoBuffer: ByteBuffer, info: MediaCodec.BufferInfo) {

0 commit comments

Comments
 (0)