diff --git a/core/network/src/commonMain/kotlin/org/meshtastic/core/network/repository/MQTTRepositoryImpl.kt b/core/network/src/commonMain/kotlin/org/meshtastic/core/network/repository/MQTTRepositoryImpl.kt index a429b90ae1..f3b7addb97 100644 --- a/core/network/src/commonMain/kotlin/org/meshtastic/core/network/repository/MQTTRepositoryImpl.kt +++ b/core/network/src/commonMain/kotlin/org/meshtastic/core/network/repository/MQTTRepositoryImpl.kt @@ -21,19 +21,30 @@ import io.github.davidepianca98.MQTTClient import io.github.davidepianca98.mqtt.MQTTVersion import io.github.davidepianca98.mqtt.Subscription import io.github.davidepianca98.mqtt.packets.Qos +import io.github.davidepianca98.mqtt.packets.mqttv5.ReasonCode import io.github.davidepianca98.mqtt.packets.mqttv5.SubscriptionOptions import io.github.davidepianca98.socket.tls.TLSClientSettings import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.IO import kotlinx.coroutines.Job import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.channels.awaitClose +import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.SharingStarted import kotlinx.coroutines.flow.callbackFlow import kotlinx.coroutines.flow.first +import kotlinx.coroutines.flow.shareIn +import kotlinx.coroutines.isActive import kotlinx.coroutines.launch +import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.Semaphore +import kotlinx.coroutines.sync.withLock import kotlinx.coroutines.sync.withPermit +import kotlinx.serialization.SerializationException import kotlinx.serialization.json.Json +import okio.ByteString import okio.ByteString.Companion.toByteString import org.koin.core.annotation.Single import org.meshtastic.core.model.MqttJsonPayload @@ -46,7 +57,6 @@ import org.meshtastic.proto.MqttClientProxyMessage class MQTTRepositoryImpl( private val radioConfigRepository: RadioConfigRepository, private val nodeRepository: NodeRepository, - dispatchers: org.meshtastic.core.di.CoroutineDispatchers, ) : MQTTRepository { companion object { @@ -58,9 +68,11 @@ class MQTTRepositoryImpl( private var client: MQTTClient? = null private val json = Json { ignoreUnknownKeys = true } - private val scope = CoroutineScope(dispatchers.default + SupervisorJob()) + private val scope = CoroutineScope(Dispatchers.Default + SupervisorJob()) private var clientJob: Job? = null private val publishSemaphore = Semaphore(20) + private val reconnectMutex = Mutex() + private var reconnectAttempt = 0 override fun disconnect() { Logger.i { "MQTT Disconnecting" } @@ -70,7 +82,8 @@ class MQTTRepositoryImpl( } @OptIn(ExperimentalUnsignedTypes::class) - override val proxyMessageFlow: Flow = callbackFlow { + @Suppress("LongMethod", "CyclomaticComplexMethod") + private fun createProxyMessageFlow(): Flow = callbackFlow { val ownerId = "MeshtasticAndroidMqttProxy-${nodeRepository.myId.value ?: "unknown"}" val channelSet = radioConfigRepository.channelSetFlow.first() val mqttConfig = radioConfigRepository.moduleConfigFlow.first().mqtt @@ -79,67 +92,11 @@ class MQTTRepositoryImpl( val (host, port) = (mqttConfig?.address ?: DEFAULT_SERVER_ADDRESS).split(":", limit = 2).let { + @Suppress("MagicNumber") it[0] to (it.getOrNull(1)?.toIntOrNull() ?: if (mqttConfig?.tls_enabled == true) 8883 else 1883) } - val newClient = - MQTTClient( - mqttVersion = MQTTVersion.MQTT5, - address = host, - port = port, - tls = if (mqttConfig?.tls_enabled == true) TLSClientSettings() else null, - userName = mqttConfig?.username, - password = mqttConfig?.password?.encodeToByteArray()?.toUByteArray(), - clientId = ownerId, - publishReceived = { packet -> - val topic = packet.topicName - val payload = packet.payload?.toByteArray() - Logger.d { "MQTT received message on topic $topic (size: ${payload?.size ?: 0} bytes)" } - - if (topic.contains("/json/")) { - try { - val jsonStr = payload?.decodeToString() ?: "" - // Validate JSON by parsing it - json.decodeFromString(jsonStr) - Logger.d { "MQTT parsed JSON payload successfully" } - - trySend(MqttClientProxyMessage(topic = topic, text = jsonStr, retained = packet.retain)) - } catch (e: kotlinx.serialization.SerializationException) { - Logger.e(e) { "Failed to parse MQTT JSON: ${e.message}" } - } catch (e: IllegalArgumentException) { - Logger.e(e) { "Failed to parse MQTT JSON: ${e.message}" } - } - } else { - trySend( - MqttClientProxyMessage( - topic = topic, - data_ = payload?.toByteString() ?: okio.ByteString.EMPTY, - retained = packet.retain, - ), - ) - } - }, - ) - - client = newClient - - clientJob = scope.launch { - try { - Logger.i { "MQTT Starting client loop for $host:$port" } - newClient.runSuspend() - } catch (e: io.github.davidepianca98.mqtt.MQTTException) { - Logger.e(e) { "MQTT Client loop error (MQTT)" } - close(e) - } catch (e: io.github.davidepianca98.socket.IOException) { - Logger.e(e) { "MQTT Client loop error (IO)" } - close(e) - } catch (e: kotlinx.coroutines.CancellationException) { - Logger.i { "MQTT Client loop cancelled" } - throw e - } - } - - // Subscriptions + // Subscriptions (out of loop) val subscriptions = mutableListOf() channelSet.subscribeList.forEach { globalId -> subscriptions.add( @@ -153,25 +110,195 @@ class MQTTRepositoryImpl( } subscriptions.add(Subscription("$rootTopic${DEFAULT_TOPIC_LEVEL}PKI/+", SubscriptionOptions(Qos.AT_LEAST_ONCE))) - if (subscriptions.isNotEmpty()) { - Logger.d { "MQTT subscribing to ${subscriptions.size} topics" } - newClient.subscribe(subscriptions) - } + // Using IO-dispatcher since we use blocking MQTTClient.run() + @Suppress("MagicNumber") + clientJob = + scope.launch(Dispatchers.IO) { + val baseDelay = 2_000L // Base backoff value + val maxDelay = 64_000L // Maximal backoff value + + // Reconnection loop + while (isActive) { + val attempt = + reconnectMutex.withLock { + ++reconnectAttempt // Don't really think we will ever get overflow here since it will take + // 4300 years + } + + // Exponential backoff + val delayMs = + when { + attempt == 1 -> 0 + attempt >= 7 -> maxDelay + else -> baseDelay * (1L shl (attempt - 2)) // Backoff 2→4→8→16→32→64 seconds + } + + if (delayMs > 0) { + Logger.w { "MQTT reconnect #$attempt in ${delayMs / 1000}s" } + delay(delayMs) + } + + // Creating client on each iteration + var newClient: MQTTClient? = null + @Suppress("TooGenericExceptionCaught") + try { + newClient = + MQTTClient( + mqttVersion = MQTTVersion.MQTT5, + address = host, + port = port, + tls = if (mqttConfig?.tls_enabled == true) TLSClientSettings() else null, + userName = mqttConfig?.username, + password = mqttConfig?.password?.encodeToByteArray()?.toUByteArray(), + clientId = ownerId, + onConnected = { + Logger.i { "MQTT connected" } + scope.launch { + // Reset backoff + reconnectMutex.withLock { reconnectAttempt = 0 } + } + }, + onDisconnected = { Logger.w { "MQTT disconnected" } }, + publishReceived = { packet -> + val topic = packet.topicName + val payload = packet.payload?.toByteArray() + Logger.d { + "MQTT received message on topic $topic (size: ${payload?.size ?: 0} bytes)" + } + + val result = + trySend( + ( + if (topic.contains("/json/")) { + try { + val jsonStr = payload?.decodeToString() ?: "" + // Validate JSON by parsing it + json.decodeFromString(jsonStr) + Logger.d { "MQTT parsed JSON payload successfully" } + MqttClientProxyMessage( + topic = topic, + text = jsonStr, + retained = packet.retain, + ) + } catch (e: SerializationException) { + Logger.e(e) { "Failed to parse MQTT JSON: ${e.message}" } + } catch (e: IllegalArgumentException) { + Logger.e(e) { "Failed to parse MQTT JSON: ${e.message}" } + } + } else { + MqttClientProxyMessage( + topic = topic, + data_ = payload?.toByteString() ?: ByteString.EMPTY, + retained = packet.retain, + ) + } + ) + as MqttClientProxyMessage, + ) + if (result.isFailure) { + Logger.w { "MQTT message dropped: flow channel closed" } + } + }, + ) + + if (subscriptions.isNotEmpty()) { + Logger.d { "MQTT subscribing to ${subscriptions.size} topics" } + newClient.subscribe(subscriptions) + } + + // Renew client for publish() + client = newClient + + Logger.i { "MQTT run loop start ($host:$port)" } + + // Note: run() is blocking. Cancellation via clientJob.cancel() + // will be processed when run() returns or if the library checks for interruption. + // Test with actual network disconnect to verify timely shutdown. + newClient.run() // Blocking + + Logger.w { "MQTT run() exited normally — reconnecting" } + } catch (e: io.github.davidepianca98.mqtt.MQTTException) { + Logger.e(e) { "MQTT protocol error (attempt #$attempt)" } + // Continue loop + } catch (e: io.github.davidepianca98.socket.IOException) { + Logger.e(e) { "MQTT IO error (attempt #$attempt)" } + // Continue loop + } catch (e: kotlinx.coroutines.CancellationException) { + Logger.i { "MQTT reconnect loop cancelled" } + throw e // Stop + } catch (e: Exception) { + Logger.e(e) { "MQTT unexpected error (attempt #$attempt): ${e::class.simpleName}" } + } finally { + // Cleanup + newClient?.let { + if (client === it) { + client = null + } + try { + newClient.disconnect(ReasonCode.SUCCESS) // Success? + } catch (_: Exception) {} + } + } + } + } awaitClose { disconnect() } } + /** + * Cold flow that creates MQTT client and manages connection lifecycle. + * + * Single collector requirement: This flow MUST be collected by exactly one subscriber. Multiple collectors will + * create duplicate MQTT clients with the same clientId, causing the broker to disconnect previous connections + * (standard MQTT behavior). + * + * Lifecycle fix (shareIn wrapper): Originally, this callbackFlow would close when the collector stopped (e.g., UI + * lifecycle changes, configuration changes, collector errors). This triggered awaitClose { disconnect() }, which + * canceled the reconnect loop entirely. + * + * Symptoms observed: + * - "MQTT message dropped: flow channel closed" logs + * - No reconnection attempts after initial connection loss + * - MQTT permanently dead until app restart + * + * Possible solution: Wrap with shareIn() to create a hot SharedFlow that: + * - Survives temporary collector loss (30s timeout) + * - Keeps reconnect loop running independently of UI/collector lifecycle + * - Only stops when the application process terminates + */ + @OptIn(ExperimentalUnsignedTypes::class) + override val proxyMessageFlow: Flow = + createProxyMessageFlow() + .shareIn( + scope = CoroutineScope(SupervisorJob() + Dispatchers.IO), + started = SharingStarted.WhileSubscribed(stopTimeoutMillis = 30_000), + replay = 0, + ) + @OptIn(ExperimentalUnsignedTypes::class) override fun publish(topic: String, data: ByteArray, retained: Boolean) { Logger.d { "MQTT publishing message to topic $topic (size: ${data.size} bytes, retained: $retained)" } scope.launch { + val c = client + + if (c == null || !c.isConnackReceived()) { + Logger.w { "MQTT not connected, dropping message" } + return@launch + } + publishSemaphore.withPermit { - client?.publish( - retain = retained, - qos = Qos.AT_LEAST_ONCE, - topic = topic, - payload = data.toUByteArray(), - ) + @Suppress("TooGenericExceptionCaught") + try { + c.publish( + retain = retained, + qos = Qos.AT_LEAST_ONCE, + topic = topic, + payload = data.toUByteArray(), + ) // Potential TOCTOU. + Logger.d { "MQTT publish succeeded" } + } catch (e: Exception) { + Logger.w(e) { "MQTT publish failed" } + } } } } diff --git a/core/network/src/commonTest/kotlin/org/meshtastic/core/network/repository/MQTTRepositoryImplTest.kt b/core/network/src/commonTest/kotlin/org/meshtastic/core/network/repository/MQTTRepositoryImplTest.kt index 73e096da9b..6960542a6b 100644 --- a/core/network/src/commonTest/kotlin/org/meshtastic/core/network/repository/MQTTRepositoryImplTest.kt +++ b/core/network/src/commonTest/kotlin/org/meshtastic/core/network/repository/MQTTRepositoryImplTest.kt @@ -16,34 +16,362 @@ */ package org.meshtastic.core.network.repository +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.flow.StateFlow +import kotlinx.coroutines.flow.flowOf import kotlinx.serialization.json.Json import org.meshtastic.core.model.MqttJsonPayload +import org.meshtastic.core.model.MyNodeInfo +import org.meshtastic.core.model.Node +import org.meshtastic.core.model.NodeSortOption +import org.meshtastic.core.repository.NodeRepository +import org.meshtastic.core.repository.RadioConfigRepository +import org.meshtastic.proto.Channel +import org.meshtastic.proto.ChannelSet +import org.meshtastic.proto.ChannelSettings +import org.meshtastic.proto.Config +import org.meshtastic.proto.DeviceMetadata +import org.meshtastic.proto.DeviceProfile +import org.meshtastic.proto.DeviceUIConfig +import org.meshtastic.proto.FileInfo +import org.meshtastic.proto.LocalConfig +import org.meshtastic.proto.LocalModuleConfig +import org.meshtastic.proto.LocalStats +import org.meshtastic.proto.ModuleConfig +import org.meshtastic.proto.User import kotlin.test.Test import kotlin.test.assertEquals +import kotlin.test.assertNotNull import kotlin.test.assertTrue +// ── Fake dependencies ───────────────────────────────────────────────────────── + +private class FakeRadioConfigRepository( + mqttAddress: String = "mqtt.meshtastic.org", + tlsEnabled: Boolean = false, + jsonEnabled: Boolean = false, + channelSettings: List = emptyList(), +) : RadioConfigRepository { + + override val channelSetFlow: Flow = flowOf(ChannelSet(settings = channelSettings)) + + override val moduleConfigFlow: Flow = + flowOf( + LocalModuleConfig( + mqtt = + ModuleConfig.MQTTConfig( + enabled = true, + address = mqttAddress, + tls_enabled = tlsEnabled, + json_enabled = jsonEnabled, + ), + ), + ) + + override val localConfigFlow: Flow = flowOf(LocalConfig()) + override val deviceProfileFlow: Flow = flowOf(DeviceProfile()) + override val deviceUIConfigFlow: Flow = flowOf(null) + override val fileManifestFlow: Flow> = flowOf(emptyList()) + + override suspend fun clearChannelSet() {} + + override suspend fun replaceAllSettings(settingsList: List) {} + + override suspend fun updateChannelSettings(channel: Channel) {} + + override suspend fun clearLocalConfig() {} + + override suspend fun setLocalConfig(config: Config) {} + + override suspend fun clearLocalModuleConfig() {} + + override suspend fun setLocalModuleConfig(config: ModuleConfig) {} + + override suspend fun setDeviceUIConfig(config: DeviceUIConfig) {} + + override suspend fun clearDeviceUIConfig() {} + + override suspend fun addFileInfo(info: FileInfo) {} + + override suspend fun clearFileManifest() {} +} + +private class FakeNodeRepository(nodeId: String? = "!aabbccdd") : NodeRepository { + + override val myId: StateFlow = MutableStateFlow(nodeId) + override val myNodeInfo: StateFlow = MutableStateFlow(null) + override val ourNodeInfo: StateFlow = MutableStateFlow(null) + override val localStats: StateFlow = MutableStateFlow(LocalStats()) + override val nodeDBbyNum: StateFlow> = MutableStateFlow(emptyMap()) + override val onlineNodeCount: Flow = flowOf(0) + override val totalNodeCount: Flow = flowOf(0) + + override fun updateLocalStats(stats: LocalStats) {} + + override fun effectiveLogNodeId(nodeNum: Int): Flow = flowOf(nodeNum) + + override fun getNode(userId: String): Node = Node(0) + + override fun getUser(nodeNum: Int): User = User() + + override fun getUser(userId: String): User = User() + + override fun getNodes( + sort: NodeSortOption, + filter: String, + includeUnknown: Boolean, + onlyOnline: Boolean, + onlyDirect: Boolean, + ): Flow> = flowOf(emptyList()) + + override suspend fun getNodesOlderThan(lastHeard: Int): List = emptyList() + + override suspend fun getUnknownNodes(): List = emptyList() + + override suspend fun clearNodeDB(preserveFavorites: Boolean) {} + + override suspend fun clearMyNodeInfo() {} + + override suspend fun deleteNode(num: Int) {} + + override suspend fun deleteNodes(nodeNums: List) {} + + override suspend fun setNodeNotes(num: Int, notes: String) {} + + override suspend fun upsert(node: Node) {} + + override suspend fun installConfig(mi: MyNodeInfo, nodes: List) {} + + override suspend fun insertMetadata(nodeNum: Int, metadata: DeviceMetadata) {} +} + +// ── Helper functions (mirror of production logic) ───────────────────────────── + +private fun parseAddress(address: String, tlsEnabled: Boolean = false): Pair = + address.split(":", limit = 2).let { it[0] to (it.getOrNull(1)?.toIntOrNull() ?: if (tlsEnabled) 8883 else 1883) } + +private fun backoffDelayMs(attempt: Int): Long { + val baseDelay = 2_000L + val maxDelay = 64_000L + return when { + attempt == 1 -> 0L + attempt >= 7 -> maxDelay + else -> baseDelay * (1L shl (attempt - 2)) + } +} + +// ── Tests ───────────────────────────────────────────────────────────────────── + class MQTTRepositoryImplTest { + // ── Instance creation (covers constructor and field initialization) ──────────── + + @Test + fun `repository can be instantiated with default config`() { + val repo = + MQTTRepositoryImpl( + radioConfigRepository = FakeRadioConfigRepository(), + nodeRepository = FakeNodeRepository(), + ) + assertNotNull(repo) + } + + @Test + fun `proxyMessageFlow is not null after instantiation`() { + val repo = + MQTTRepositoryImpl( + radioConfigRepository = FakeRadioConfigRepository(), + nodeRepository = FakeNodeRepository(), + ) + assertNotNull(repo.proxyMessageFlow) + } + + @Test + fun `disconnect does not throw when called before connect`() { + val repo = + MQTTRepositoryImpl( + radioConfigRepository = FakeRadioConfigRepository(), + nodeRepository = FakeNodeRepository(), + ) + repo.disconnect() // should not throw an exception + } + + @Test + fun `disconnect can be called multiple times without error`() { + val repo = + MQTTRepositoryImpl( + radioConfigRepository = FakeRadioConfigRepository(), + nodeRepository = FakeNodeRepository(), + ) + repo.disconnect() + repo.disconnect() + repo.disconnect() + } + @Test - fun `test address parsing logic`() { - val address1 = "mqtt.example.com:1883" - val (host1, port1) = address1.split(":", limit = 2).let { it[0] to (it.getOrNull(1)?.toIntOrNull() ?: 1883) } - assertEquals("mqtt.example.com", host1) - assertEquals(1883, port1) + fun `publish does not throw when client is null`() { + val repo = + MQTTRepositoryImpl( + radioConfigRepository = FakeRadioConfigRepository(), + nodeRepository = FakeNodeRepository(), + ) + // client == null immediately after creation without connecting + repo.publish("msh/test", byteArrayOf(1, 2, 3), retained = false) + // test passes if no crash + } - val address2 = "mqtt.example.com" - val (host2, port2) = address2.split(":", limit = 2).let { it[0] to (it.getOrNull(1)?.toIntOrNull() ?: 1883) } - assertEquals("mqtt.example.com", host2) - assertEquals(1883, port2) + @Test + fun `publish does not throw with retained flag`() { + val repo = + MQTTRepositoryImpl( + radioConfigRepository = FakeRadioConfigRepository(), + nodeRepository = FakeNodeRepository(), + ) + repo.publish("msh/test", byteArrayOf(0x08, 0x01), retained = true) } @Test - fun `test json payload parsing`() { + fun `publish does not throw with empty payload`() { + val repo = + MQTTRepositoryImpl( + radioConfigRepository = FakeRadioConfigRepository(), + nodeRepository = FakeNodeRepository(), + ) + repo.publish("msh/test", ByteArray(0), retained = false) + } + + @Test + fun `repository works with null nodeId`() { + val repo = + MQTTRepositoryImpl( + radioConfigRepository = FakeRadioConfigRepository(), + nodeRepository = FakeNodeRepository(nodeId = null), + ) + assertNotNull(repo) + repo.disconnect() + } + + @Test + fun `repository works with TLS config`() { + val repo = + MQTTRepositoryImpl( + radioConfigRepository = + FakeRadioConfigRepository(mqttAddress = "broker.example.com", tlsEnabled = true), + nodeRepository = FakeNodeRepository(), + ) + assertNotNull(repo) + } + + @Test + fun `repository works with json enabled config`() { + val repo = + MQTTRepositoryImpl( + radioConfigRepository = FakeRadioConfigRepository(jsonEnabled = true), + nodeRepository = FakeNodeRepository(), + ) + assertNotNull(repo) + } + + @Test + fun `repository works with channel settings`() { + val repo = + MQTTRepositoryImpl( + radioConfigRepository = + FakeRadioConfigRepository( + channelSettings = listOf(ChannelSettings(name = "LongFast", downlink_enabled = true)), + ), + nodeRepository = FakeNodeRepository(), + ) + assertNotNull(repo) + } + + // ── Address parsing ─────────────────────────────────────────────────────────── + + @Test + fun `address with explicit port is parsed correctly`() { + val (host, port) = parseAddress("mqtt.example.com:1883") + assertEquals("mqtt.example.com", host) + assertEquals(1883, port) + } + + @Test + fun `address without port uses default plain port 1883`() { + val (host, port) = parseAddress("mqtt.example.com", tlsEnabled = false) + assertEquals("mqtt.example.com", host) + assertEquals(1883, port) + } + + @Test + fun `address without port uses default TLS port 8883`() { + val (_, port) = parseAddress("broker.local", tlsEnabled = true) + assertEquals(8883, port) + } + + @Test + fun `explicit port overrides TLS default`() { + val (host, port) = parseAddress("broker.local:9999", tlsEnabled = true) + assertEquals("broker.local", host) + assertEquals(9999, port) + } + + // ── Exponential backoff ─────────────────────────────────────────────────────── + + @Test + fun `backoff attempt 1 is immediate`() { + assertEquals(0L, backoffDelayMs(1)) + } + + @Test + fun `backoff attempt 2 is 2 seconds`() { + assertEquals(2_000L, backoffDelayMs(2)) + } + + @Test + fun `backoff attempt 3 is 4 seconds`() { + assertEquals(4_000L, backoffDelayMs(3)) + } + + @Test + fun `backoff attempt 4 is 8 seconds`() { + assertEquals(8_000L, backoffDelayMs(4)) + } + + @Test + fun `backoff attempt 5 is 16 seconds`() { + assertEquals(16_000L, backoffDelayMs(5)) + } + + @Test + fun `backoff attempt 6 is 32 seconds`() { + assertEquals(32_000L, backoffDelayMs(6)) + } + + @Test + fun `backoff is capped at 64 seconds from attempt 7`() { + val cap = 64_000L + assertEquals(cap, backoffDelayMs(7)) + assertEquals(cap, backoffDelayMs(8)) + assertEquals(cap, backoffDelayMs(100)) + } + + @Test + fun `backoff sequence is non-decreasing`() { + val delays = (1..10).map { backoffDelayMs(it) } + for (i in 0 until delays.size - 1) { + assertTrue(delays[i] <= delays[i + 1], "backoff[${i + 1}]=${delays[i + 1]} < backoff[$i]=${delays[i]}") + } + } + + // ── JSON serialization / deserialization ────────────────────────────────────── + + private val json = Json { ignoreUnknownKeys = true } + + @Test + fun `json payload is parsed correctly`() { val jsonStr = """{"type":"text","from":12345678,"to":4294967295,"payload":"Hello World","hop_limit":3,"id":123,"time":1600000000}""" - val json = Json { ignoreUnknownKeys = true } val payload = json.decodeFromString(jsonStr) - assertEquals("text", payload.type) assertEquals(12345678L, payload.from) assertEquals(4294967295L, payload.to) @@ -54,7 +382,7 @@ class MQTTRepositoryImplTest { } @Test - fun `test json payload serialization`() { + fun `json payload is serialized correctly`() { val payload = MqttJsonPayload( type = "text", @@ -65,11 +393,19 @@ class MQTTRepositoryImplTest { id = 123, time = 1600000000, ) - val json = Json { ignoreUnknownKeys = true } - val jsonStr = json.encodeToString(MqttJsonPayload.serializer(), payload) - + val jsonStr = json.encodeToString(payload) assertTrue(jsonStr.contains("\"type\":\"text\"")) assertTrue(jsonStr.contains("\"from\":12345678")) assertTrue(jsonStr.contains("\"payload\":\"Hello World\"")) } + + @Test + fun `json payload with optional fields null is valid`() { + val jsonStr = """{"type":"position","from":999}""" + val payload = json.decodeFromString(jsonStr) + assertEquals("position", payload.type) + assertEquals(999L, payload.from) + assertEquals(null, payload.to) + assertEquals(null, payload.payload) + } }