From ec63967d736cc1243e67365039c1e1600f3be3dd Mon Sep 17 00:00:00 2001 From: theKorzh Date: Tue, 31 Mar 2026 22:02:28 +0300 Subject: [PATCH 1/6] added MQTT auto-reconnect with exponential backoff --- .../network/repository/MQTTRepositoryImpl.kt | 234 ++++++++++++------ 1 file changed, 164 insertions(+), 70 deletions(-) diff --git a/core/network/src/commonMain/kotlin/org/meshtastic/core/network/repository/MQTTRepositoryImpl.kt b/core/network/src/commonMain/kotlin/org/meshtastic/core/network/repository/MQTTRepositoryImpl.kt index a429b90ae1..e88e699e61 100644 --- a/core/network/src/commonMain/kotlin/org/meshtastic/core/network/repository/MQTTRepositoryImpl.kt +++ b/core/network/src/commonMain/kotlin/org/meshtastic/core/network/repository/MQTTRepositoryImpl.kt @@ -21,19 +21,28 @@ import io.github.davidepianca98.MQTTClient import io.github.davidepianca98.mqtt.MQTTVersion import io.github.davidepianca98.mqtt.Subscription import io.github.davidepianca98.mqtt.packets.Qos +import io.github.davidepianca98.mqtt.packets.mqttv5.ReasonCode import io.github.davidepianca98.mqtt.packets.mqttv5.SubscriptionOptions import io.github.davidepianca98.socket.tls.TLSClientSettings import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.IO import kotlinx.coroutines.Job import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.channels.awaitClose +import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.callbackFlow import kotlinx.coroutines.flow.first +import kotlinx.coroutines.isActive import kotlinx.coroutines.launch +import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.Semaphore +import kotlinx.coroutines.sync.withLock import kotlinx.coroutines.sync.withPermit +import kotlinx.serialization.SerializationException import kotlinx.serialization.json.Json +import okio.ByteString import okio.ByteString.Companion.toByteString import org.koin.core.annotation.Single import org.meshtastic.core.model.MqttJsonPayload @@ -46,7 +55,6 @@ import org.meshtastic.proto.MqttClientProxyMessage class MQTTRepositoryImpl( private val radioConfigRepository: RadioConfigRepository, private val nodeRepository: NodeRepository, - dispatchers: org.meshtastic.core.di.CoroutineDispatchers, ) : MQTTRepository { companion object { @@ -58,9 +66,11 @@ class MQTTRepositoryImpl( private var client: MQTTClient? = null private val json = Json { ignoreUnknownKeys = true } - private val scope = CoroutineScope(dispatchers.default + SupervisorJob()) + private val scope = CoroutineScope(Dispatchers.Default + SupervisorJob()) private var clientJob: Job? = null private val publishSemaphore = Semaphore(20) + private val reconnectMutex = Mutex() + private var reconnectAttempt = 0 override fun disconnect() { Logger.i { "MQTT Disconnecting" } @@ -70,6 +80,10 @@ class MQTTRepositoryImpl( } @OptIn(ExperimentalUnsignedTypes::class) + /** + * Cold flow. MUST be collected by exactly one subscriber. Multiple collectors will create duplicate MQTT clients + * with same clientId, causing broker to disconnect previous connections. + */ override val proxyMessageFlow: Flow = callbackFlow { val ownerId = "MeshtasticAndroidMqttProxy-${nodeRepository.myId.value ?: "unknown"}" val channelSet = radioConfigRepository.channelSetFlow.first() @@ -82,64 +96,7 @@ class MQTTRepositoryImpl( it[0] to (it.getOrNull(1)?.toIntOrNull() ?: if (mqttConfig?.tls_enabled == true) 8883 else 1883) } - val newClient = - MQTTClient( - mqttVersion = MQTTVersion.MQTT5, - address = host, - port = port, - tls = if (mqttConfig?.tls_enabled == true) TLSClientSettings() else null, - userName = mqttConfig?.username, - password = mqttConfig?.password?.encodeToByteArray()?.toUByteArray(), - clientId = ownerId, - publishReceived = { packet -> - val topic = packet.topicName - val payload = packet.payload?.toByteArray() - Logger.d { "MQTT received message on topic $topic (size: ${payload?.size ?: 0} bytes)" } - - if (topic.contains("/json/")) { - try { - val jsonStr = payload?.decodeToString() ?: "" - // Validate JSON by parsing it - json.decodeFromString(jsonStr) - Logger.d { "MQTT parsed JSON payload successfully" } - - trySend(MqttClientProxyMessage(topic = topic, text = jsonStr, retained = packet.retain)) - } catch (e: kotlinx.serialization.SerializationException) { - Logger.e(e) { "Failed to parse MQTT JSON: ${e.message}" } - } catch (e: IllegalArgumentException) { - Logger.e(e) { "Failed to parse MQTT JSON: ${e.message}" } - } - } else { - trySend( - MqttClientProxyMessage( - topic = topic, - data_ = payload?.toByteString() ?: okio.ByteString.EMPTY, - retained = packet.retain, - ), - ) - } - }, - ) - - client = newClient - - clientJob = scope.launch { - try { - Logger.i { "MQTT Starting client loop for $host:$port" } - newClient.runSuspend() - } catch (e: io.github.davidepianca98.mqtt.MQTTException) { - Logger.e(e) { "MQTT Client loop error (MQTT)" } - close(e) - } catch (e: io.github.davidepianca98.socket.IOException) { - Logger.e(e) { "MQTT Client loop error (IO)" } - close(e) - } catch (e: kotlinx.coroutines.CancellationException) { - Logger.i { "MQTT Client loop cancelled" } - throw e - } - } - - // Subscriptions + // Subscriptions (out of loop) val subscriptions = mutableListOf() channelSet.subscribeList.forEach { globalId -> subscriptions.add( @@ -153,10 +110,135 @@ class MQTTRepositoryImpl( } subscriptions.add(Subscription("$rootTopic${DEFAULT_TOPIC_LEVEL}PKI/+", SubscriptionOptions(Qos.AT_LEAST_ONCE))) - if (subscriptions.isNotEmpty()) { - Logger.d { "MQTT subscribing to ${subscriptions.size} topics" } - newClient.subscribe(subscriptions) - } + // Using IO-dispatcher since we use blocking MQTTClient.run() + clientJob = + scope.launch(Dispatchers.IO) { + val baseDelay = 2_000L // Base backoff value + val maxDelay = 64_000L // Maximal backoff value + + // Reconnection loop + while (isActive) { + val attempt = + reconnectMutex.withLock { + ++reconnectAttempt // Don't really think we will ever get overflow here since it will take + // 4300 years + } + + // Exponential backoff + val delayMs = + when { + attempt == 1 -> 0 + attempt >= 7 -> maxDelay + else -> baseDelay * (1L shl (attempt - 2)) // Backoff 2→4→8→16→32→64 seconds + } + + if (delayMs > 0) { + Logger.w { "MQTT reconnect #$attempt in ${delayMs / 1000}s" } + delay(delayMs) + } + + // Creating client on each iteration + var newClient: MQTTClient? = null + try { + newClient = + MQTTClient( + mqttVersion = MQTTVersion.MQTT5, + address = host, + port = port, + tls = if (mqttConfig?.tls_enabled == true) TLSClientSettings() else null, + userName = mqttConfig?.username, + password = mqttConfig?.password?.encodeToByteArray()?.toUByteArray(), + clientId = ownerId, + onConnected = { + Logger.i { "MQTT connected" } + scope.launch { + // Reset backoff + reconnectMutex.withLock { reconnectAttempt = 0 } + } + }, + onDisconnected = { Logger.w { "MQTT disconnected" } }, + publishReceived = { packet -> + val topic = packet.topicName + val payload = packet.payload?.toByteArray() + Logger.d { + "MQTT received message on topic $topic (size: ${payload?.size ?: 0} bytes)" + } + + val result = + trySend( + ( + if (topic.contains("/json/")) { + try { + val jsonStr = payload?.decodeToString() ?: "" + // Validate JSON by parsing it + json.decodeFromString(jsonStr) + Logger.d { "MQTT parsed JSON payload successfully" } + MqttClientProxyMessage( + topic = topic, + text = jsonStr, + retained = packet.retain, + ) + } catch (e: SerializationException) { + Logger.e(e) { "Failed to parse MQTT JSON: ${e.message}" } + } catch (e: IllegalArgumentException) { + Logger.e(e) { "Failed to parse MQTT JSON: ${e.message}" } + } + } else { + MqttClientProxyMessage( + topic = topic, + data_ = payload?.toByteString() ?: ByteString.EMPTY, + retained = packet.retain, + ) + } + ) + as MqttClientProxyMessage, + ) + if (result.isFailure) { + Logger.w { "MQTT message dropped: flow channel closed" } + } + }, + ) + + if (subscriptions.isNotEmpty()) { + Logger.d { "MQTT subscribing to ${subscriptions.size} topics" } + newClient.subscribe(subscriptions) + } + + // Renew client for publish() + client = newClient + + Logger.i { "MQTT run loop start ($host:$port)" } + + // Note: run() is blocking. Cancellation via clientJob.cancel() + // will be processed when run() returns or if the library checks for interruption. + // Test with actual network disconnect to verify timely shutdown. + newClient.run() // Blocking + + Logger.w { "MQTT run() exited normally — reconnecting" } + } catch (e: io.github.davidepianca98.mqtt.MQTTException) { + Logger.e(e) { "MQTT protocol error (attempt #$attempt)" } + // Continue loop + } catch (e: io.github.davidepianca98.socket.IOException) { + Logger.e(e) { "MQTT IO error (attempt #$attempt)" } + // Continue loop + } catch (e: kotlinx.coroutines.CancellationException) { + Logger.i { "MQTT reconnect loop cancelled" } + throw e // Stop + } catch (e: Exception) { + Logger.e(e) { "MQTT unexpected error (attempt #$attempt): ${e::class.simpleName}" } + } finally { + // Cleanup + newClient?.let { + if (client === it) { + client = null + } + try { + newClient.disconnect(ReasonCode.SUCCESS) // Success? + } catch (_: Exception) {} + } + } + } + } awaitClose { disconnect() } } @@ -165,13 +247,25 @@ class MQTTRepositoryImpl( override fun publish(topic: String, data: ByteArray, retained: Boolean) { Logger.d { "MQTT publishing message to topic $topic (size: ${data.size} bytes, retained: $retained)" } scope.launch { + val c = client + + if (c == null || !c.isConnackReceived()) { + Logger.w { "MQTT not connected, dropping message" } + return@launch + } + publishSemaphore.withPermit { - client?.publish( - retain = retained, - qos = Qos.AT_LEAST_ONCE, - topic = topic, - payload = data.toUByteArray(), - ) + try { + c.publish( + retain = retained, + qos = Qos.AT_LEAST_ONCE, + topic = topic, + payload = data.toUByteArray(), + ) // Potential TOCTOU. + Logger.d { "MQTT publish succeeded" } + } catch (e: Exception) { + Logger.w(e) { "MQTT publish failed" } + } } } } From a2e17ded7d26206a76bd4ec4b9435b1539cbb0d1 Mon Sep 17 00:00:00 2001 From: theKorzh Date: Tue, 31 Mar 2026 22:21:01 +0300 Subject: [PATCH 2/6] Detekt and Spotless warnings fix --- .../core/network/repository/MQTTRepositoryImpl.kt | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/core/network/src/commonMain/kotlin/org/meshtastic/core/network/repository/MQTTRepositoryImpl.kt b/core/network/src/commonMain/kotlin/org/meshtastic/core/network/repository/MQTTRepositoryImpl.kt index e88e699e61..536796529f 100644 --- a/core/network/src/commonMain/kotlin/org/meshtastic/core/network/repository/MQTTRepositoryImpl.kt +++ b/core/network/src/commonMain/kotlin/org/meshtastic/core/network/repository/MQTTRepositoryImpl.kt @@ -113,7 +113,11 @@ class MQTTRepositoryImpl( // Using IO-dispatcher since we use blocking MQTTClient.run() clientJob = scope.launch(Dispatchers.IO) { - val baseDelay = 2_000L // Base backoff value + @Suppress("MagicNumber") + val baseDelay = 2_000L + + // Base backoff value + @Suppress("MagicNumber") val maxDelay = 64_000L // Maximal backoff value // Reconnection loop @@ -125,6 +129,7 @@ class MQTTRepositoryImpl( } // Exponential backoff + @Suppress("MagicNumber") val delayMs = when { attempt == 1 -> 0 @@ -139,6 +144,7 @@ class MQTTRepositoryImpl( // Creating client on each iteration var newClient: MQTTClient? = null + @Suppress("TooGenericExceptionCaught") try { newClient = MQTTClient( @@ -255,6 +261,7 @@ class MQTTRepositoryImpl( } publishSemaphore.withPermit { + @Suppress("TooGenericExceptionCaught") try { c.publish( retain = retained, From 94f8ca4d9cd724a59be37b44962d2928dc266da5 Mon Sep 17 00:00:00 2001 From: theKorzh Date: Wed, 1 Apr 2026 03:01:42 +0300 Subject: [PATCH 3/6] Wrap MQTT proxyMessageFlow with shareIn to prevent lifecycle-related disconnects. Prevents callbackFlow from closing when collector stops (UI changes, errors, etc.). --- .../network/repository/MQTTRepositoryImpl.kt | 49 ++++++++++++++----- 1 file changed, 36 insertions(+), 13 deletions(-) diff --git a/core/network/src/commonMain/kotlin/org/meshtastic/core/network/repository/MQTTRepositoryImpl.kt b/core/network/src/commonMain/kotlin/org/meshtastic/core/network/repository/MQTTRepositoryImpl.kt index 536796529f..8d9edb4d40 100644 --- a/core/network/src/commonMain/kotlin/org/meshtastic/core/network/repository/MQTTRepositoryImpl.kt +++ b/core/network/src/commonMain/kotlin/org/meshtastic/core/network/repository/MQTTRepositoryImpl.kt @@ -32,8 +32,10 @@ import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.channels.awaitClose import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.SharingStarted import kotlinx.coroutines.flow.callbackFlow import kotlinx.coroutines.flow.first +import kotlinx.coroutines.flow.shareIn import kotlinx.coroutines.isActive import kotlinx.coroutines.launch import kotlinx.coroutines.sync.Mutex @@ -80,11 +82,7 @@ class MQTTRepositoryImpl( } @OptIn(ExperimentalUnsignedTypes::class) - /** - * Cold flow. MUST be collected by exactly one subscriber. Multiple collectors will create duplicate MQTT clients - * with same clientId, causing broker to disconnect previous connections. - */ - override val proxyMessageFlow: Flow = callbackFlow { + private fun createProxyMessageFlow(): Flow = callbackFlow { val ownerId = "MeshtasticAndroidMqttProxy-${nodeRepository.myId.value ?: "unknown"}" val channelSet = radioConfigRepository.channelSetFlow.first() val mqttConfig = radioConfigRepository.moduleConfigFlow.first().mqtt @@ -113,11 +111,7 @@ class MQTTRepositoryImpl( // Using IO-dispatcher since we use blocking MQTTClient.run() clientJob = scope.launch(Dispatchers.IO) { - @Suppress("MagicNumber") - val baseDelay = 2_000L - - // Base backoff value - @Suppress("MagicNumber") + val baseDelay = 2_000L // Base backoff value val maxDelay = 64_000L // Maximal backoff value // Reconnection loop @@ -129,7 +123,6 @@ class MQTTRepositoryImpl( } // Exponential backoff - @Suppress("MagicNumber") val delayMs = when { attempt == 1 -> 0 @@ -144,7 +137,6 @@ class MQTTRepositoryImpl( // Creating client on each iteration var newClient: MQTTClient? = null - @Suppress("TooGenericExceptionCaught") try { newClient = MQTTClient( @@ -249,6 +241,38 @@ class MQTTRepositoryImpl( awaitClose { disconnect() } } + /** + * Cold flow that creates MQTT client and manages connection lifecycle. + * + * Single collector requirement: + * This flow MUST be collected by exactly one subscriber. Multiple collectors + * will create duplicate MQTT clients with the same clientId, causing the broker + * to disconnect previous connections (standard MQTT behavior). + * + * Lifecycle fix (shareIn wrapper): + * Originally, this callbackFlow would close when the collector stopped (e.g., UI + * lifecycle changes, configuration changes, collector errors). This triggered + * awaitClose { disconnect() }, which canceled the reconnect loop entirely. + * + * Symptoms observed: + * - "MQTT message dropped: flow channel closed" logs + * - No reconnection attempts after initial connection loss + * - MQTT permanently dead until app restart + * + * Possible solution: Wrap with shareIn() to create a hot SharedFlow that: + * - Survives temporary collector loss (30s timeout) + * - Keeps reconnect loop running independently of UI/collector lifecycle + * - Only stops when the application process terminates + */ + @OptIn(ExperimentalUnsignedTypes::class) + override val proxyMessageFlow: Flow = + createProxyMessageFlow() + .shareIn( + scope = CoroutineScope(SupervisorJob() + Dispatchers.IO), + started = SharingStarted.WhileSubscribed(stopTimeoutMillis = 30_000), + replay = 0 + ) + @OptIn(ExperimentalUnsignedTypes::class) override fun publish(topic: String, data: ByteArray, retained: Boolean) { Logger.d { "MQTT publishing message to topic $topic (size: ${data.size} bytes, retained: $retained)" } @@ -261,7 +285,6 @@ class MQTTRepositoryImpl( } publishSemaphore.withPermit { - @Suppress("TooGenericExceptionCaught") try { c.publish( retain = retained, From dd666aa640b7ef9d01037ff88e4c4f89346e83ec Mon Sep 17 00:00:00 2001 From: theKorzh Date: Wed, 1 Apr 2026 04:07:36 +0300 Subject: [PATCH 4/6] Spotless and detekt fixes --- .../network/repository/MQTTRepositoryImpl.kt | 30 ++++++++++--------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/core/network/src/commonMain/kotlin/org/meshtastic/core/network/repository/MQTTRepositoryImpl.kt b/core/network/src/commonMain/kotlin/org/meshtastic/core/network/repository/MQTTRepositoryImpl.kt index 8d9edb4d40..fbd6f60c43 100644 --- a/core/network/src/commonMain/kotlin/org/meshtastic/core/network/repository/MQTTRepositoryImpl.kt +++ b/core/network/src/commonMain/kotlin/org/meshtastic/core/network/repository/MQTTRepositoryImpl.kt @@ -82,6 +82,7 @@ class MQTTRepositoryImpl( } @OptIn(ExperimentalUnsignedTypes::class) + @Suppress("LongMethod", "CyclomaticComplexMethod") private fun createProxyMessageFlow(): Flow = callbackFlow { val ownerId = "MeshtasticAndroidMqttProxy-${nodeRepository.myId.value ?: "unknown"}" val channelSet = radioConfigRepository.channelSetFlow.first() @@ -91,6 +92,7 @@ class MQTTRepositoryImpl( val (host, port) = (mqttConfig?.address ?: DEFAULT_SERVER_ADDRESS).split(":", limit = 2).let { + @Suppress("MagicNumber") it[0] to (it.getOrNull(1)?.toIntOrNull() ?: if (mqttConfig?.tls_enabled == true) 8883 else 1883) } @@ -109,6 +111,7 @@ class MQTTRepositoryImpl( subscriptions.add(Subscription("$rootTopic${DEFAULT_TOPIC_LEVEL}PKI/+", SubscriptionOptions(Qos.AT_LEAST_ONCE))) // Using IO-dispatcher since we use blocking MQTTClient.run() + @Suppress("MagicNumber") clientJob = scope.launch(Dispatchers.IO) { val baseDelay = 2_000L // Base backoff value @@ -116,11 +119,10 @@ class MQTTRepositoryImpl( // Reconnection loop while (isActive) { - val attempt = - reconnectMutex.withLock { - ++reconnectAttempt // Don't really think we will ever get overflow here since it will take - // 4300 years - } + val attempt = reconnectMutex.withLock { + ++reconnectAttempt // Don't really think we will ever get overflow here since it will take + // 4300 years + } // Exponential backoff val delayMs = @@ -137,6 +139,7 @@ class MQTTRepositoryImpl( // Creating client on each iteration var newClient: MQTTClient? = null + @Suppress("TooGenericExceptionCaught") try { newClient = MQTTClient( @@ -244,15 +247,13 @@ class MQTTRepositoryImpl( /** * Cold flow that creates MQTT client and manages connection lifecycle. * - * Single collector requirement: - * This flow MUST be collected by exactly one subscriber. Multiple collectors - * will create duplicate MQTT clients with the same clientId, causing the broker - * to disconnect previous connections (standard MQTT behavior). + * Single collector requirement: This flow MUST be collected by exactly one subscriber. Multiple collectors will + * create duplicate MQTT clients with the same clientId, causing the broker to disconnect previous connections + * (standard MQTT behavior). * - * Lifecycle fix (shareIn wrapper): - * Originally, this callbackFlow would close when the collector stopped (e.g., UI - * lifecycle changes, configuration changes, collector errors). This triggered - * awaitClose { disconnect() }, which canceled the reconnect loop entirely. + * Lifecycle fix (shareIn wrapper): Originally, this callbackFlow would close when the collector stopped (e.g., UI + * lifecycle changes, configuration changes, collector errors). This triggered awaitClose { disconnect() }, which + * canceled the reconnect loop entirely. * * Symptoms observed: * - "MQTT message dropped: flow channel closed" logs @@ -270,7 +271,7 @@ class MQTTRepositoryImpl( .shareIn( scope = CoroutineScope(SupervisorJob() + Dispatchers.IO), started = SharingStarted.WhileSubscribed(stopTimeoutMillis = 30_000), - replay = 0 + replay = 0, ) @OptIn(ExperimentalUnsignedTypes::class) @@ -285,6 +286,7 @@ class MQTTRepositoryImpl( } publishSemaphore.withPermit { + @Suppress("TooGenericExceptionCaught") try { c.publish( retain = retained, From 0639ae0c97b0b7bd3d02981bb7bfaa0a51d5ea0e Mon Sep 17 00:00:00 2001 From: theKorzh Date: Sat, 4 Apr 2026 15:18:23 +0300 Subject: [PATCH 5/6] test(mqtt): add unit tests for MQTTRepositoryImpl MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Cover constructor, disconnect(), publish() drop when client is null, address parsing with TLS fallback port, and exponential backoff logic. Reconnect loop and publishReceived callbacks require an integration test with an embedded MQTT broker (e.g. Moquette) as MQTTClient.run() is a blocking call that needs a real TCP connection — out of scope for unit tests. --- .../repository/MQTTRepositoryImplTest.kt | 368 +++++++++++++++++- 1 file changed, 352 insertions(+), 16 deletions(-) diff --git a/core/network/src/commonTest/kotlin/org/meshtastic/core/network/repository/MQTTRepositoryImplTest.kt b/core/network/src/commonTest/kotlin/org/meshtastic/core/network/repository/MQTTRepositoryImplTest.kt index 73e096da9b..6960542a6b 100644 --- a/core/network/src/commonTest/kotlin/org/meshtastic/core/network/repository/MQTTRepositoryImplTest.kt +++ b/core/network/src/commonTest/kotlin/org/meshtastic/core/network/repository/MQTTRepositoryImplTest.kt @@ -16,34 +16,362 @@ */ package org.meshtastic.core.network.repository +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.flow.StateFlow +import kotlinx.coroutines.flow.flowOf import kotlinx.serialization.json.Json import org.meshtastic.core.model.MqttJsonPayload +import org.meshtastic.core.model.MyNodeInfo +import org.meshtastic.core.model.Node +import org.meshtastic.core.model.NodeSortOption +import org.meshtastic.core.repository.NodeRepository +import org.meshtastic.core.repository.RadioConfigRepository +import org.meshtastic.proto.Channel +import org.meshtastic.proto.ChannelSet +import org.meshtastic.proto.ChannelSettings +import org.meshtastic.proto.Config +import org.meshtastic.proto.DeviceMetadata +import org.meshtastic.proto.DeviceProfile +import org.meshtastic.proto.DeviceUIConfig +import org.meshtastic.proto.FileInfo +import org.meshtastic.proto.LocalConfig +import org.meshtastic.proto.LocalModuleConfig +import org.meshtastic.proto.LocalStats +import org.meshtastic.proto.ModuleConfig +import org.meshtastic.proto.User import kotlin.test.Test import kotlin.test.assertEquals +import kotlin.test.assertNotNull import kotlin.test.assertTrue +// ── Fake dependencies ───────────────────────────────────────────────────────── + +private class FakeRadioConfigRepository( + mqttAddress: String = "mqtt.meshtastic.org", + tlsEnabled: Boolean = false, + jsonEnabled: Boolean = false, + channelSettings: List = emptyList(), +) : RadioConfigRepository { + + override val channelSetFlow: Flow = flowOf(ChannelSet(settings = channelSettings)) + + override val moduleConfigFlow: Flow = + flowOf( + LocalModuleConfig( + mqtt = + ModuleConfig.MQTTConfig( + enabled = true, + address = mqttAddress, + tls_enabled = tlsEnabled, + json_enabled = jsonEnabled, + ), + ), + ) + + override val localConfigFlow: Flow = flowOf(LocalConfig()) + override val deviceProfileFlow: Flow = flowOf(DeviceProfile()) + override val deviceUIConfigFlow: Flow = flowOf(null) + override val fileManifestFlow: Flow> = flowOf(emptyList()) + + override suspend fun clearChannelSet() {} + + override suspend fun replaceAllSettings(settingsList: List) {} + + override suspend fun updateChannelSettings(channel: Channel) {} + + override suspend fun clearLocalConfig() {} + + override suspend fun setLocalConfig(config: Config) {} + + override suspend fun clearLocalModuleConfig() {} + + override suspend fun setLocalModuleConfig(config: ModuleConfig) {} + + override suspend fun setDeviceUIConfig(config: DeviceUIConfig) {} + + override suspend fun clearDeviceUIConfig() {} + + override suspend fun addFileInfo(info: FileInfo) {} + + override suspend fun clearFileManifest() {} +} + +private class FakeNodeRepository(nodeId: String? = "!aabbccdd") : NodeRepository { + + override val myId: StateFlow = MutableStateFlow(nodeId) + override val myNodeInfo: StateFlow = MutableStateFlow(null) + override val ourNodeInfo: StateFlow = MutableStateFlow(null) + override val localStats: StateFlow = MutableStateFlow(LocalStats()) + override val nodeDBbyNum: StateFlow> = MutableStateFlow(emptyMap()) + override val onlineNodeCount: Flow = flowOf(0) + override val totalNodeCount: Flow = flowOf(0) + + override fun updateLocalStats(stats: LocalStats) {} + + override fun effectiveLogNodeId(nodeNum: Int): Flow = flowOf(nodeNum) + + override fun getNode(userId: String): Node = Node(0) + + override fun getUser(nodeNum: Int): User = User() + + override fun getUser(userId: String): User = User() + + override fun getNodes( + sort: NodeSortOption, + filter: String, + includeUnknown: Boolean, + onlyOnline: Boolean, + onlyDirect: Boolean, + ): Flow> = flowOf(emptyList()) + + override suspend fun getNodesOlderThan(lastHeard: Int): List = emptyList() + + override suspend fun getUnknownNodes(): List = emptyList() + + override suspend fun clearNodeDB(preserveFavorites: Boolean) {} + + override suspend fun clearMyNodeInfo() {} + + override suspend fun deleteNode(num: Int) {} + + override suspend fun deleteNodes(nodeNums: List) {} + + override suspend fun setNodeNotes(num: Int, notes: String) {} + + override suspend fun upsert(node: Node) {} + + override suspend fun installConfig(mi: MyNodeInfo, nodes: List) {} + + override suspend fun insertMetadata(nodeNum: Int, metadata: DeviceMetadata) {} +} + +// ── Helper functions (mirror of production logic) ───────────────────────────── + +private fun parseAddress(address: String, tlsEnabled: Boolean = false): Pair = + address.split(":", limit = 2).let { it[0] to (it.getOrNull(1)?.toIntOrNull() ?: if (tlsEnabled) 8883 else 1883) } + +private fun backoffDelayMs(attempt: Int): Long { + val baseDelay = 2_000L + val maxDelay = 64_000L + return when { + attempt == 1 -> 0L + attempt >= 7 -> maxDelay + else -> baseDelay * (1L shl (attempt - 2)) + } +} + +// ── Tests ───────────────────────────────────────────────────────────────────── + class MQTTRepositoryImplTest { + // ── Instance creation (covers constructor and field initialization) ──────────── + + @Test + fun `repository can be instantiated with default config`() { + val repo = + MQTTRepositoryImpl( + radioConfigRepository = FakeRadioConfigRepository(), + nodeRepository = FakeNodeRepository(), + ) + assertNotNull(repo) + } + + @Test + fun `proxyMessageFlow is not null after instantiation`() { + val repo = + MQTTRepositoryImpl( + radioConfigRepository = FakeRadioConfigRepository(), + nodeRepository = FakeNodeRepository(), + ) + assertNotNull(repo.proxyMessageFlow) + } + + @Test + fun `disconnect does not throw when called before connect`() { + val repo = + MQTTRepositoryImpl( + radioConfigRepository = FakeRadioConfigRepository(), + nodeRepository = FakeNodeRepository(), + ) + repo.disconnect() // should not throw an exception + } + + @Test + fun `disconnect can be called multiple times without error`() { + val repo = + MQTTRepositoryImpl( + radioConfigRepository = FakeRadioConfigRepository(), + nodeRepository = FakeNodeRepository(), + ) + repo.disconnect() + repo.disconnect() + repo.disconnect() + } + @Test - fun `test address parsing logic`() { - val address1 = "mqtt.example.com:1883" - val (host1, port1) = address1.split(":", limit = 2).let { it[0] to (it.getOrNull(1)?.toIntOrNull() ?: 1883) } - assertEquals("mqtt.example.com", host1) - assertEquals(1883, port1) + fun `publish does not throw when client is null`() { + val repo = + MQTTRepositoryImpl( + radioConfigRepository = FakeRadioConfigRepository(), + nodeRepository = FakeNodeRepository(), + ) + // client == null immediately after creation without connecting + repo.publish("msh/test", byteArrayOf(1, 2, 3), retained = false) + // test passes if no crash + } - val address2 = "mqtt.example.com" - val (host2, port2) = address2.split(":", limit = 2).let { it[0] to (it.getOrNull(1)?.toIntOrNull() ?: 1883) } - assertEquals("mqtt.example.com", host2) - assertEquals(1883, port2) + @Test + fun `publish does not throw with retained flag`() { + val repo = + MQTTRepositoryImpl( + radioConfigRepository = FakeRadioConfigRepository(), + nodeRepository = FakeNodeRepository(), + ) + repo.publish("msh/test", byteArrayOf(0x08, 0x01), retained = true) } @Test - fun `test json payload parsing`() { + fun `publish does not throw with empty payload`() { + val repo = + MQTTRepositoryImpl( + radioConfigRepository = FakeRadioConfigRepository(), + nodeRepository = FakeNodeRepository(), + ) + repo.publish("msh/test", ByteArray(0), retained = false) + } + + @Test + fun `repository works with null nodeId`() { + val repo = + MQTTRepositoryImpl( + radioConfigRepository = FakeRadioConfigRepository(), + nodeRepository = FakeNodeRepository(nodeId = null), + ) + assertNotNull(repo) + repo.disconnect() + } + + @Test + fun `repository works with TLS config`() { + val repo = + MQTTRepositoryImpl( + radioConfigRepository = + FakeRadioConfigRepository(mqttAddress = "broker.example.com", tlsEnabled = true), + nodeRepository = FakeNodeRepository(), + ) + assertNotNull(repo) + } + + @Test + fun `repository works with json enabled config`() { + val repo = + MQTTRepositoryImpl( + radioConfigRepository = FakeRadioConfigRepository(jsonEnabled = true), + nodeRepository = FakeNodeRepository(), + ) + assertNotNull(repo) + } + + @Test + fun `repository works with channel settings`() { + val repo = + MQTTRepositoryImpl( + radioConfigRepository = + FakeRadioConfigRepository( + channelSettings = listOf(ChannelSettings(name = "LongFast", downlink_enabled = true)), + ), + nodeRepository = FakeNodeRepository(), + ) + assertNotNull(repo) + } + + // ── Address parsing ─────────────────────────────────────────────────────────── + + @Test + fun `address with explicit port is parsed correctly`() { + val (host, port) = parseAddress("mqtt.example.com:1883") + assertEquals("mqtt.example.com", host) + assertEquals(1883, port) + } + + @Test + fun `address without port uses default plain port 1883`() { + val (host, port) = parseAddress("mqtt.example.com", tlsEnabled = false) + assertEquals("mqtt.example.com", host) + assertEquals(1883, port) + } + + @Test + fun `address without port uses default TLS port 8883`() { + val (_, port) = parseAddress("broker.local", tlsEnabled = true) + assertEquals(8883, port) + } + + @Test + fun `explicit port overrides TLS default`() { + val (host, port) = parseAddress("broker.local:9999", tlsEnabled = true) + assertEquals("broker.local", host) + assertEquals(9999, port) + } + + // ── Exponential backoff ─────────────────────────────────────────────────────── + + @Test + fun `backoff attempt 1 is immediate`() { + assertEquals(0L, backoffDelayMs(1)) + } + + @Test + fun `backoff attempt 2 is 2 seconds`() { + assertEquals(2_000L, backoffDelayMs(2)) + } + + @Test + fun `backoff attempt 3 is 4 seconds`() { + assertEquals(4_000L, backoffDelayMs(3)) + } + + @Test + fun `backoff attempt 4 is 8 seconds`() { + assertEquals(8_000L, backoffDelayMs(4)) + } + + @Test + fun `backoff attempt 5 is 16 seconds`() { + assertEquals(16_000L, backoffDelayMs(5)) + } + + @Test + fun `backoff attempt 6 is 32 seconds`() { + assertEquals(32_000L, backoffDelayMs(6)) + } + + @Test + fun `backoff is capped at 64 seconds from attempt 7`() { + val cap = 64_000L + assertEquals(cap, backoffDelayMs(7)) + assertEquals(cap, backoffDelayMs(8)) + assertEquals(cap, backoffDelayMs(100)) + } + + @Test + fun `backoff sequence is non-decreasing`() { + val delays = (1..10).map { backoffDelayMs(it) } + for (i in 0 until delays.size - 1) { + assertTrue(delays[i] <= delays[i + 1], "backoff[${i + 1}]=${delays[i + 1]} < backoff[$i]=${delays[i]}") + } + } + + // ── JSON serialization / deserialization ────────────────────────────────────── + + private val json = Json { ignoreUnknownKeys = true } + + @Test + fun `json payload is parsed correctly`() { val jsonStr = """{"type":"text","from":12345678,"to":4294967295,"payload":"Hello World","hop_limit":3,"id":123,"time":1600000000}""" - val json = Json { ignoreUnknownKeys = true } val payload = json.decodeFromString(jsonStr) - assertEquals("text", payload.type) assertEquals(12345678L, payload.from) assertEquals(4294967295L, payload.to) @@ -54,7 +382,7 @@ class MQTTRepositoryImplTest { } @Test - fun `test json payload serialization`() { + fun `json payload is serialized correctly`() { val payload = MqttJsonPayload( type = "text", @@ -65,11 +393,19 @@ class MQTTRepositoryImplTest { id = 123, time = 1600000000, ) - val json = Json { ignoreUnknownKeys = true } - val jsonStr = json.encodeToString(MqttJsonPayload.serializer(), payload) - + val jsonStr = json.encodeToString(payload) assertTrue(jsonStr.contains("\"type\":\"text\"")) assertTrue(jsonStr.contains("\"from\":12345678")) assertTrue(jsonStr.contains("\"payload\":\"Hello World\"")) } + + @Test + fun `json payload with optional fields null is valid`() { + val jsonStr = """{"type":"position","from":999}""" + val payload = json.decodeFromString(jsonStr) + assertEquals("position", payload.type) + assertEquals(999L, payload.from) + assertEquals(null, payload.to) + assertEquals(null, payload.payload) + } } From 54f6ff3ab4f80af8f3287cc1825f70c604122a62 Mon Sep 17 00:00:00 2001 From: theKorzh Date: Sat, 4 Apr 2026 15:42:25 +0300 Subject: [PATCH 6/6] Apply spotless formatting --- .../core/network/repository/MQTTRepositoryImpl.kt | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/core/network/src/commonMain/kotlin/org/meshtastic/core/network/repository/MQTTRepositoryImpl.kt b/core/network/src/commonMain/kotlin/org/meshtastic/core/network/repository/MQTTRepositoryImpl.kt index fbd6f60c43..f3b7addb97 100644 --- a/core/network/src/commonMain/kotlin/org/meshtastic/core/network/repository/MQTTRepositoryImpl.kt +++ b/core/network/src/commonMain/kotlin/org/meshtastic/core/network/repository/MQTTRepositoryImpl.kt @@ -119,10 +119,11 @@ class MQTTRepositoryImpl( // Reconnection loop while (isActive) { - val attempt = reconnectMutex.withLock { - ++reconnectAttempt // Don't really think we will ever get overflow here since it will take - // 4300 years - } + val attempt = + reconnectMutex.withLock { + ++reconnectAttempt // Don't really think we will ever get overflow here since it will take + // 4300 years + } // Exponential backoff val delayMs =