Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import javax.net.ssl.TrustManager
* Created by pedro on 12/10/23.
*/
class RtmpStreamClient(
private val rtmpClient: RtmpClient,
private val rtmpClient: RtmpClient,
private val streamClientListener: StreamClientListener?
): StreamBaseClient() {

Expand Down Expand Up @@ -174,4 +174,8 @@ class RtmpStreamClient(
override fun setSocketType(type: SocketType) {
rtmpClient.socketType = type
}

fun setCustomAmfObject(amfObject: Map<String, Any>) {
rtmpClient.setCustomAmfObject(amfObject)
}
}
26 changes: 26 additions & 0 deletions rtmp/src/main/java/com/pedro/rtmp/amf/v0/AmfObject.kt
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,32 @@ open class AmfObject(private val properties: LinkedHashMap<AmfString, AmfData> =
bodySize += data.getSize() + 1
}

open fun setProperty(name: String, data: Any) {
val newValue: AmfData = when (data) {
is String -> AmfString(data)
is Int -> AmfNumber(data.toDouble())
is Long -> AmfNumber(data.toDouble())
is Double -> AmfNumber(data)
is Float -> AmfNumber(data.toDouble())
is Boolean -> AmfBoolean(data)
is AmfData -> data
else -> throw IllegalArgumentException(
"Unsupported value type: ${data::class.java.name}"
)
}

val existingEntry = properties.entries.find { it.key.value == name }

if (existingEntry != null) {
properties[existingEntry.key] = newValue
bodySize += newValue.getSize() - existingEntry.value.getSize()
} else {
val key = AmfString(name)
properties[key] = newValue
bodySize += key.getSize() + newValue.getSize() + 1
}
}

fun getProperties() = properties

@Throws(IOException::class)
Expand Down
1 change: 1 addition & 0 deletions rtmp/src/main/java/com/pedro/rtmp/rtmp/CommandsManager.kt
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ abstract class CommandsManager {
var readChunkSize = RtmpConfig.DEFAULT_CHUNK_SIZE
var audioDisabled = false
var videoDisabled = false
var customAmfObject: Map<String, Any> = emptyMap()
private var bytesRead = 0
private var acknowledgementSequence = 0

Expand Down
21 changes: 19 additions & 2 deletions rtmp/src/main/java/com/pedro/rtmp/rtmp/CommandsManagerAmf0.kt
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,17 @@ class CommandsManagerAmf0: CommandsManager() {
val connect = CommandAmf0("connect", ++commandId, getCurrentTimestamp(), streamId,
BasicHeader(ChunkType.TYPE_0, ChunkStreamId.OVER_CONNECTION.mark))
val connectInfo = AmfObject()
connectInfo.setProperty("app", appName + auth)

// Prefer custom app/tcUrl if provided, and always append auth to keep auth flow working
val customApp = customAmfObject["app"]?.toString()
val customTcUrl = customAmfObject["tcUrl"]?.toString()
val finalApp = (customApp ?: appName) + auth
val finalTcUrl = (customTcUrl ?: tcUrl) + auth

connectInfo.setProperty("app", finalApp)
connectInfo.setProperty("flashVer", flashVersion)
connectInfo.setProperty("tcUrl", tcUrl + auth)
connectInfo.setProperty("tcUrl", finalTcUrl)

if (!videoDisabled) {
if (videoCodec == VideoCodec.H265) {
val list = mutableListOf<AmfData>()
Expand All @@ -56,6 +64,15 @@ class CommandsManagerAmf0: CommandsManager() {
}
}
connectInfo.setProperty("objectEncoding", 0.0)

// Inject other custom AMF fields as-is; skip app/tcUrl since we've handled them with auth
customAmfObject.forEach { (key, value) ->
when (key) {
"app" -> connectInfo.setProperty("app", finalApp)
"tcUrl" -> connectInfo.setProperty("tcUrl", finalTcUrl)
else -> connectInfo.setProperty(key, value)
}
}
connect.addData(connectInfo)

connect.writeHeader(socket)
Expand Down
30 changes: 25 additions & 5 deletions rtmp/src/main/java/com/pedro/rtmp/rtmp/RtmpClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,10 @@ class RtmpClient(private val connectChecker: ConnectChecker) {
}
}

fun setCustomAmfObject(amfObject: Map<String, Any>) {
commandsManager.customAmfObject = amfObject
}

fun setAuthorization(user: String?, password: String?) {
commandsManager.setAuth(user, password)
}
Expand Down Expand Up @@ -248,12 +252,28 @@ class RtmpClient(private val connectChecker: ConnectChecker) {

tunneled = urlParser.scheme.startsWith("rtmpt")
tlsEnabled = urlParser.scheme.endsWith("s")
commandsManager.host = urlParser.host
val defaultPort = if (tlsEnabled) 443 else if (tunneled) 80 else 1935
commandsManager.port = urlParser.port ?: defaultPort
commandsManager.appName = urlParser.getAppName()
commandsManager.streamName = urlParser.getStreamName()
commandsManager.tcUrl = urlParser.getTcUrl()

// Prefer values from customAmfObject; fallback to URL parsed values
val co = commandsManager.customAmfObject
commandsManager.host = (co["host"] as? String) ?: urlParser.host
commandsManager.port = (co["port"] as? Int) ?: (urlParser.port ?: defaultPort)
commandsManager.appName = (co["app"] as? String) ?: urlParser.getAppName()
commandsManager.streamName = (co["streamName"] as? String) ?: urlParser.getStreamName()
commandsManager.tcUrl = (co["tcUrl"] as? String) ?: urlParser.getTcUrl()

// Print connection summary for visibility
runCatching {
val used = mapOf(
"host" to (co["host"] != null),
"port" to (co["port"] != null),
"app" to (co["app"] != null),
"streamName" to (co["streamName"] != null),
"tcUrl" to (co["tcUrl"] != null)
)
Log.i(TAG, "connect config => host=${commandsManager.host}, port=${commandsManager.port}, app=${commandsManager.appName}, stream=${commandsManager.streamName}, tcUrl=${commandsManager.tcUrl}, tls=$tlsEnabled, tunneled=$tunneled, fromCustom=$used")
}

if (commandsManager.appName.isEmpty()) {
isStreaming = false
onMainThread {
Expand Down