Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,30 @@ import io.github.davidepianca98.MQTTClient
import io.github.davidepianca98.mqtt.MQTTVersion
import io.github.davidepianca98.mqtt.Subscription
import io.github.davidepianca98.mqtt.packets.Qos
import io.github.davidepianca98.mqtt.packets.mqttv5.ReasonCode
import io.github.davidepianca98.mqtt.packets.mqttv5.SubscriptionOptions
import io.github.davidepianca98.socket.tls.TLSClientSettings
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.IO
Copy link

Copilot AI Apr 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

kotlinx.coroutines.IO is imported but not used anywhere in this file. This will typically fail ktlint/IDE checks for unused imports; please remove it (or use it consistently instead of Dispatchers.IO).

Suggested change
import kotlinx.coroutines.IO

Copilot uses AI. Check for mistakes.
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.sync.withPermit
import kotlinx.serialization.SerializationException
import kotlinx.serialization.json.Json
import okio.ByteString
import okio.ByteString.Companion.toByteString
import org.koin.core.annotation.Single
import org.meshtastic.core.model.MqttJsonPayload
Expand All @@ -46,7 +57,6 @@ import org.meshtastic.proto.MqttClientProxyMessage
class MQTTRepositoryImpl(
private val radioConfigRepository: RadioConfigRepository,
private val nodeRepository: NodeRepository,
dispatchers: org.meshtastic.core.di.CoroutineDispatchers,
) : MQTTRepository {

companion object {
Expand All @@ -58,9 +68,11 @@ class MQTTRepositoryImpl(

private var client: MQTTClient? = null
private val json = Json { ignoreUnknownKeys = true }
private val scope = CoroutineScope(dispatchers.default + SupervisorJob())
private val scope = CoroutineScope(Dispatchers.Default + SupervisorJob())
private var clientJob: Job? = null
Comment on lines 57 to 72
Copy link

Copilot AI Apr 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This repository now hardcodes Dispatchers.Default/Dispatchers.IO and no longer accepts the project’s injected CoroutineDispatchers, which is used widely to keep coroutine contexts testable and consistent (e.g., NetworkRepositoryImpl uses dispatchers.io/dispatchers.default). Consider reintroducing CoroutineDispatchers injection here and using it for scope and the reconnect loop dispatcher.

Copilot uses AI. Check for mistakes.
private val publishSemaphore = Semaphore(20)
private val reconnectMutex = Mutex()
private var reconnectAttempt = 0
Comment on lines 69 to +75
Copy link

Copilot AI Apr 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

client is written from the reconnect loop (IO dispatcher) and read from publish() (Default dispatcher) without any visibility/atomicity guarantees. This can lead to stale reads or racey behavior across threads; consider making it @Volatile, guarding access with a mutex, or using an atomic (kotlinx.atomicfu.atomic) and keeping all access on a single dispatcher.

Copilot uses AI. Check for mistakes.

override fun disconnect() {
Logger.i { "MQTT Disconnecting" }
Expand All @@ -70,7 +82,8 @@ class MQTTRepositoryImpl(
}

@OptIn(ExperimentalUnsignedTypes::class)
override val proxyMessageFlow: Flow<MqttClientProxyMessage> = callbackFlow {
@Suppress("LongMethod", "CyclomaticComplexMethod")
private fun createProxyMessageFlow(): Flow<MqttClientProxyMessage> = callbackFlow {
val ownerId = "MeshtasticAndroidMqttProxy-${nodeRepository.myId.value ?: "unknown"}"
val channelSet = radioConfigRepository.channelSetFlow.first()
val mqttConfig = radioConfigRepository.moduleConfigFlow.first().mqtt
Expand All @@ -79,67 +92,11 @@ class MQTTRepositoryImpl(

val (host, port) =
(mqttConfig?.address ?: DEFAULT_SERVER_ADDRESS).split(":", limit = 2).let {
@Suppress("MagicNumber")
it[0] to (it.getOrNull(1)?.toIntOrNull() ?: if (mqttConfig?.tls_enabled == true) 8883 else 1883)
}

val newClient =
MQTTClient(
mqttVersion = MQTTVersion.MQTT5,
address = host,
port = port,
tls = if (mqttConfig?.tls_enabled == true) TLSClientSettings() else null,
userName = mqttConfig?.username,
password = mqttConfig?.password?.encodeToByteArray()?.toUByteArray(),
clientId = ownerId,
publishReceived = { packet ->
val topic = packet.topicName
val payload = packet.payload?.toByteArray()
Logger.d { "MQTT received message on topic $topic (size: ${payload?.size ?: 0} bytes)" }

if (topic.contains("/json/")) {
try {
val jsonStr = payload?.decodeToString() ?: ""
// Validate JSON by parsing it
json.decodeFromString<MqttJsonPayload>(jsonStr)
Logger.d { "MQTT parsed JSON payload successfully" }

trySend(MqttClientProxyMessage(topic = topic, text = jsonStr, retained = packet.retain))
} catch (e: kotlinx.serialization.SerializationException) {
Logger.e(e) { "Failed to parse MQTT JSON: ${e.message}" }
} catch (e: IllegalArgumentException) {
Logger.e(e) { "Failed to parse MQTT JSON: ${e.message}" }
}
} else {
trySend(
MqttClientProxyMessage(
topic = topic,
data_ = payload?.toByteString() ?: okio.ByteString.EMPTY,
retained = packet.retain,
),
)
}
},
)

client = newClient

clientJob = scope.launch {
try {
Logger.i { "MQTT Starting client loop for $host:$port" }
newClient.runSuspend()
} catch (e: io.github.davidepianca98.mqtt.MQTTException) {
Logger.e(e) { "MQTT Client loop error (MQTT)" }
close(e)
} catch (e: io.github.davidepianca98.socket.IOException) {
Logger.e(e) { "MQTT Client loop error (IO)" }
close(e)
} catch (e: kotlinx.coroutines.CancellationException) {
Logger.i { "MQTT Client loop cancelled" }
throw e
}
}

// Subscriptions
// Subscriptions (out of loop)
val subscriptions = mutableListOf<Subscription>()
channelSet.subscribeList.forEach { globalId ->
subscriptions.add(
Expand All @@ -153,25 +110,195 @@ class MQTTRepositoryImpl(
}
subscriptions.add(Subscription("$rootTopic${DEFAULT_TOPIC_LEVEL}PKI/+", SubscriptionOptions(Qos.AT_LEAST_ONCE)))

if (subscriptions.isNotEmpty()) {
Logger.d { "MQTT subscribing to ${subscriptions.size} topics" }
newClient.subscribe(subscriptions)
}
// Using IO-dispatcher since we use blocking MQTTClient.run()
@Suppress("MagicNumber")
clientJob =
scope.launch(Dispatchers.IO) {
val baseDelay = 2_000L // Base backoff value
val maxDelay = 64_000L // Maximal backoff value

// Reconnection loop
while (isActive) {
val attempt =
reconnectMutex.withLock {
++reconnectAttempt // Don't really think we will ever get overflow here since it will take
// 4300 years
}

// Exponential backoff
val delayMs =
when {
attempt == 1 -> 0
attempt >= 7 -> maxDelay
else -> baseDelay * (1L shl (attempt - 2)) // Backoff 2→4→8→16→32→64 seconds
}

if (delayMs > 0) {
Logger.w { "MQTT reconnect #$attempt in ${delayMs / 1000}s" }
delay(delayMs)
}

// Creating client on each iteration
var newClient: MQTTClient? = null
@Suppress("TooGenericExceptionCaught")
try {
newClient =
MQTTClient(
mqttVersion = MQTTVersion.MQTT5,
address = host,
port = port,
tls = if (mqttConfig?.tls_enabled == true) TLSClientSettings() else null,
userName = mqttConfig?.username,
password = mqttConfig?.password?.encodeToByteArray()?.toUByteArray(),
clientId = ownerId,
onConnected = {
Logger.i { "MQTT connected" }
scope.launch {
// Reset backoff
reconnectMutex.withLock { reconnectAttempt = 0 }
}
},
onDisconnected = { Logger.w { "MQTT disconnected" } },
publishReceived = { packet ->
val topic = packet.topicName
val payload = packet.payload?.toByteArray()
Logger.d {
"MQTT received message on topic $topic (size: ${payload?.size ?: 0} bytes)"
}

val result =
trySend(
(
if (topic.contains("/json/")) {
try {
val jsonStr = payload?.decodeToString() ?: ""
// Validate JSON by parsing it
json.decodeFromString<MqttJsonPayload>(jsonStr)
Logger.d { "MQTT parsed JSON payload successfully" }
MqttClientProxyMessage(
topic = topic,
text = jsonStr,
retained = packet.retain,
)
} catch (e: SerializationException) {
Logger.e(e) { "Failed to parse MQTT JSON: ${e.message}" }
} catch (e: IllegalArgumentException) {
Logger.e(e) { "Failed to parse MQTT JSON: ${e.message}" }
}
} else {
MqttClientProxyMessage(
topic = topic,
data_ = payload?.toByteString() ?: ByteString.EMPTY,
retained = packet.retain,
)
}
)
as MqttClientProxyMessage,
)
if (result.isFailure) {
Logger.w { "MQTT message dropped: flow channel closed" }
Comment on lines +169 to +199
Copy link

Copilot AI Apr 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The JSON branch catches SerializationException/IllegalArgumentException but doesn't return a MqttClientProxyMessage (the catch blocks evaluate to Unit). Because the surrounding code later casts the if expression to MqttClientProxyMessage, any JSON parse failure will crash with a ClassCastException instead of being safely ignored/logged.

Suggested change
val result =
trySend(
(
if (topic.contains("/json/")) {
try {
val jsonStr = payload?.decodeToString() ?: ""
// Validate JSON by parsing it
json.decodeFromString<MqttJsonPayload>(jsonStr)
Logger.d { "MQTT parsed JSON payload successfully" }
MqttClientProxyMessage(
topic = topic,
text = jsonStr,
retained = packet.retain,
)
} catch (e: SerializationException) {
Logger.e(e) { "Failed to parse MQTT JSON: ${e.message}" }
} catch (e: IllegalArgumentException) {
Logger.e(e) { "Failed to parse MQTT JSON: ${e.message}" }
}
} else {
MqttClientProxyMessage(
topic = topic,
data_ = payload?.toByteString() ?: ByteString.EMPTY,
retained = packet.retain,
)
}
)
as MqttClientProxyMessage,
)
if (result.isFailure) {
Logger.w { "MQTT message dropped: flow channel closed" }
val message: MqttClientProxyMessage? =
if (topic.contains("/json/")) {
try {
val jsonStr = payload?.decodeToString() ?: ""
// Validate JSON by parsing it
json.decodeFromString<MqttJsonPayload>(jsonStr)
Logger.d { "MQTT parsed JSON payload successfully" }
MqttClientProxyMessage(
topic = topic,
text = jsonStr,
retained = packet.retain,
)
} catch (e: SerializationException) {
Logger.e(e) { "Failed to parse MQTT JSON: ${e.message}" }
null
} catch (e: IllegalArgumentException) {
Logger.e(e) { "Failed to parse MQTT JSON: ${e.message}" }
null
}
} else {
MqttClientProxyMessage(
topic = topic,
data_ = payload?.toByteString() ?: ByteString.EMPTY,
retained = packet.retain,
)
}
if (message != null) {
val result = trySend(message)
if (result.isFailure) {
Logger.w { "MQTT message dropped: flow channel closed" }
}
} else {
Logger.w { "MQTT message ignored due to invalid JSON payload" }

Copilot uses AI. Check for mistakes.
Comment on lines +169 to +199
Copy link

Copilot AI Apr 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This forced cast as MqttClientProxyMessage is unsafe here because the preceding if (topic.contains("/json/")) { try { ... } catch { ... } } can evaluate to Unit on parse errors. Instead, build a nullable message (or return early) and only call trySend when a valid MqttClientProxyMessage is produced.

Suggested change
val result =
trySend(
(
if (topic.contains("/json/")) {
try {
val jsonStr = payload?.decodeToString() ?: ""
// Validate JSON by parsing it
json.decodeFromString<MqttJsonPayload>(jsonStr)
Logger.d { "MQTT parsed JSON payload successfully" }
MqttClientProxyMessage(
topic = topic,
text = jsonStr,
retained = packet.retain,
)
} catch (e: SerializationException) {
Logger.e(e) { "Failed to parse MQTT JSON: ${e.message}" }
} catch (e: IllegalArgumentException) {
Logger.e(e) { "Failed to parse MQTT JSON: ${e.message}" }
}
} else {
MqttClientProxyMessage(
topic = topic,
data_ = payload?.toByteString() ?: ByteString.EMPTY,
retained = packet.retain,
)
}
)
as MqttClientProxyMessage,
)
if (result.isFailure) {
Logger.w { "MQTT message dropped: flow channel closed" }
val message: MqttClientProxyMessage? =
if (topic.contains("/json/")) {
try {
val jsonStr = payload?.decodeToString() ?: ""
// Validate JSON by parsing it
json.decodeFromString<MqttJsonPayload>(jsonStr)
Logger.d { "MQTT parsed JSON payload successfully" }
MqttClientProxyMessage(
topic = topic,
text = jsonStr,
retained = packet.retain,
)
} catch (e: SerializationException) {
Logger.e(e) { "Failed to parse MQTT JSON: ${e.message}" }
null
} catch (e: IllegalArgumentException) {
Logger.e(e) { "Failed to parse MQTT JSON: ${e.message}" }
null
}
} else {
MqttClientProxyMessage(
topic = topic,
data_ = payload?.toByteString() ?: ByteString.EMPTY,
retained = packet.retain,
)
}
if (message != null) {
val result = trySend(message)
if (result.isFailure) {
Logger.w { "MQTT message dropped: flow channel closed" }
}
} else {
Logger.w { "MQTT message skipped: invalid JSON payload" }

Copilot uses AI. Check for mistakes.
}
},
)

if (subscriptions.isNotEmpty()) {
Logger.d { "MQTT subscribing to ${subscriptions.size} topics" }
newClient.subscribe(subscriptions)
}

// Renew client for publish()
client = newClient

Logger.i { "MQTT run loop start ($host:$port)" }

// Note: run() is blocking. Cancellation via clientJob.cancel()
// will be processed when run() returns or if the library checks for interruption.
// Test with actual network disconnect to verify timely shutdown.
newClient.run() // Blocking

Logger.w { "MQTT run() exited normally — reconnecting" }
} catch (e: io.github.davidepianca98.mqtt.MQTTException) {
Logger.e(e) { "MQTT protocol error (attempt #$attempt)" }
// Continue loop
} catch (e: io.github.davidepianca98.socket.IOException) {
Logger.e(e) { "MQTT IO error (attempt #$attempt)" }
// Continue loop
} catch (e: kotlinx.coroutines.CancellationException) {
Logger.i { "MQTT reconnect loop cancelled" }
throw e // Stop
} catch (e: Exception) {
Logger.e(e) { "MQTT unexpected error (attempt #$attempt): ${e::class.simpleName}" }
} finally {
// Cleanup
newClient?.let {
if (client === it) {
client = null
}
try {
newClient.disconnect(ReasonCode.SUCCESS) // Success?
} catch (_: Exception) {}
}
}
}
}

awaitClose { disconnect() }
}

/**
* Cold flow that creates MQTT client and manages connection lifecycle.
*
* Single collector requirement: This flow MUST be collected by exactly one subscriber. Multiple collectors will
* create duplicate MQTT clients with the same clientId, causing the broker to disconnect previous connections
* (standard MQTT behavior).
*
* Lifecycle fix (shareIn wrapper): Originally, this callbackFlow would close when the collector stopped (e.g., UI
* lifecycle changes, configuration changes, collector errors). This triggered awaitClose { disconnect() }, which
* canceled the reconnect loop entirely.
*
* Symptoms observed:
* - "MQTT message dropped: flow channel closed" logs
* - No reconnection attempts after initial connection loss
* - MQTT permanently dead until app restart
*
* Possible solution: Wrap with shareIn() to create a hot SharedFlow that:
* - Survives temporary collector loss (30s timeout)
* - Keeps reconnect loop running independently of UI/collector lifecycle
* - Only stops when the application process terminates
*/
@OptIn(ExperimentalUnsignedTypes::class)
override val proxyMessageFlow: Flow<MqttClientProxyMessage> =
createProxyMessageFlow()
.shareIn(
scope = CoroutineScope(SupervisorJob() + Dispatchers.IO),
started = SharingStarted.WhileSubscribed(stopTimeoutMillis = 30_000),
replay = 0,
)
Comment on lines +264 to +276
Copy link

Copilot AI Apr 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The KDoc claims shareIn keeps the reconnect loop running independently of collector lifecycle and "only stops when the application process terminates", but SharingStarted.WhileSubscribed(stopTimeoutMillis = 30_000) will cancel the upstream flow (triggering awaitClose { disconnect() }) when there are no subscribers for 30s. If the intent is truly "infinite reconnect" even with zero collectors (e.g., app background), consider SharingStarted.Eagerly or using a process/app lifecycle scope with a start strategy that doesn't stop upstream on unsubscribe.

Copilot uses AI. Check for mistakes.

@OptIn(ExperimentalUnsignedTypes::class)
override fun publish(topic: String, data: ByteArray, retained: Boolean) {
Logger.d { "MQTT publishing message to topic $topic (size: ${data.size} bytes, retained: $retained)" }
scope.launch {
val c = client

if (c == null || !c.isConnackReceived()) {
Logger.w { "MQTT not connected, dropping message" }
return@launch
}

publishSemaphore.withPermit {
client?.publish(
retain = retained,
qos = Qos.AT_LEAST_ONCE,
topic = topic,
payload = data.toUByteArray(),
)
@Suppress("TooGenericExceptionCaught")
try {
c.publish(
retain = retained,
qos = Qos.AT_LEAST_ONCE,
topic = topic,
payload = data.toUByteArray(),
) // Potential TOCTOU.
Logger.d { "MQTT publish succeeded" }
} catch (e: Exception) {
Logger.w(e) { "MQTT publish failed" }
}
}
}
}
Expand Down
Loading
Loading