Skip to content

Commit 14e86b9

Browse files
jamesarichCopilot
andauthored
feat(mqtt): adopt mqttastic-client-kmp 0.2.0 — disconnect reasons + Test Connection (#5181)
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent ef0e159 commit 14e86b9

12 files changed

Lines changed: 425 additions & 55 deletions

File tree

core/data/src/commonMain/kotlin/org/meshtastic/core/data/manager/MqttManagerImpl.kt

Lines changed: 57 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,17 @@ import kotlinx.coroutines.flow.stateIn
3131
import org.koin.core.annotation.Named
3232
import org.koin.core.annotation.Single
3333
import org.meshtastic.core.model.MqttConnectionState
34+
import org.meshtastic.core.model.MqttProbeStatus
3435
import org.meshtastic.core.network.repository.MQTTRepository
36+
import org.meshtastic.core.network.repository.resolveEndpoint
3537
import org.meshtastic.core.repository.MqttManager
3638
import org.meshtastic.core.repository.PacketHandler
3739
import org.meshtastic.core.repository.ServiceRepository
3840
import org.meshtastic.mqtt.ConnectionState
41+
import org.meshtastic.mqtt.MqttClient
3942
import org.meshtastic.mqtt.MqttException
43+
import org.meshtastic.mqtt.ProbeResult
44+
import org.meshtastic.mqtt.probe
4045
import org.meshtastic.proto.MqttClientProxyMessage
4146
import org.meshtastic.proto.ToRadio
4247

@@ -52,9 +57,9 @@ class MqttManagerImpl(
5257

5358
override val mqttConnectionState: StateFlow<MqttConnectionState> =
5459
combine(proxyActive, mqttRepository.connectionState) { active, libState ->
55-
if (!active) MqttConnectionState.INACTIVE else libState.toAppState()
60+
if (!active) MqttConnectionState.Inactive else libState.toAppState()
5661
}
57-
.stateIn(scope, SharingStarted.Eagerly, MqttConnectionState.INACTIVE)
62+
.stateIn(scope, SharingStarted.Eagerly, MqttConnectionState.Inactive)
5863

5964
override fun startProxy(enabled: Boolean, proxyToClientEnabled: Boolean) {
6065
if (mqttMessageFlow?.isActive == true) return
@@ -102,9 +107,55 @@ class MqttManagerImpl(
102107
}
103108

104109
private fun ConnectionState.toAppState(): MqttConnectionState = when (this) {
105-
ConnectionState.DISCONNECTED -> MqttConnectionState.DISCONNECTED
106-
ConnectionState.CONNECTING -> MqttConnectionState.CONNECTING
107-
ConnectionState.CONNECTED -> MqttConnectionState.CONNECTED
108-
ConnectionState.RECONNECTING -> MqttConnectionState.RECONNECTING
110+
is ConnectionState.Connecting -> MqttConnectionState.Connecting
111+
is ConnectionState.Connected -> MqttConnectionState.Connected
112+
is ConnectionState.Reconnecting ->
113+
MqttConnectionState.Reconnecting(attempt = attempt, lastError = lastError?.message)
114+
is ConnectionState.Disconnected ->
115+
reason?.let { MqttConnectionState.Disconnected(reason = it.message) }
116+
?: MqttConnectionState.Disconnected.Idle
117+
}
118+
119+
override suspend fun probe(
120+
address: String,
121+
tlsEnabled: Boolean,
122+
username: String?,
123+
password: String?,
124+
): MqttProbeStatus {
125+
val endpoint = resolveEndpoint(address, tlsEnabled)
126+
val result =
127+
MqttClient.probe(endpoint = endpoint) {
128+
val user = username?.takeUnless { it.isEmpty() }
129+
val pass = password?.takeUnless { it.isEmpty() }
130+
if (user != null) this.username = user
131+
if (pass != null) password(pass)
132+
}
133+
return result.toAppStatus()
134+
}
135+
136+
private fun ProbeResult.toAppStatus(): MqttProbeStatus = when (this) {
137+
is ProbeResult.Success -> {
138+
val info = serverInfo
139+
val summary =
140+
buildList {
141+
info.assignedClientIdentifier?.let { add("client=$it") }
142+
info.maximumQosOrdinal?.let { add("maxQoS=$it") }
143+
info.serverKeepAliveSeconds?.let { add("keepalive=${it}s") }
144+
}
145+
.joinToString(", ")
146+
.ifEmpty { null }
147+
MqttProbeStatus.Success(serverInfo = summary)
148+
}
149+
is ProbeResult.Rejected ->
150+
MqttProbeStatus.Rejected(
151+
reasonCode = reasonCode.value,
152+
reason = message,
153+
serverReference = serverReference,
154+
)
155+
is ProbeResult.DnsFailure -> MqttProbeStatus.DnsFailure(message = cause.message)
156+
is ProbeResult.TcpFailure -> MqttProbeStatus.TcpFailure(message = cause.message)
157+
is ProbeResult.TlsFailure -> MqttProbeStatus.TlsFailure(message = cause.message)
158+
is ProbeResult.Timeout -> MqttProbeStatus.Timeout(timeoutMs = durationMs)
159+
is ProbeResult.Other -> MqttProbeStatus.Other(message = cause.message)
109160
}
110161
}

core/model/src/commonMain/kotlin/org/meshtastic/core/model/MqttConnectionState.kt

Lines changed: 31 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,20 +16,41 @@
1616
*/
1717
package org.meshtastic.core.model
1818

19-
/** App-level MQTT proxy connection state, decoupled from the MQTT library's internal type. */
20-
enum class MqttConnectionState {
19+
/**
20+
* App-level MQTT proxy connection state, decoupled from the MQTT library's internal type.
21+
*
22+
* Modeled as a sealed class so disconnect / reconnect events can carry diagnostic context — the user-facing reason for
23+
* an unexpected disconnect, or the most recent reconnect attempt failure — without requiring downstream consumers to
24+
* depend on the MQTT library's exception types.
25+
*/
26+
sealed class MqttConnectionState {
2127
/** The MQTT proxy has not been started (disabled or not yet initialized). */
22-
INACTIVE,
23-
24-
/** The MQTT client is not connected to the broker. */
25-
DISCONNECTED,
28+
data object Inactive : MqttConnectionState()
2629

2730
/** The MQTT client is actively connecting to the broker. */
28-
CONNECTING,
31+
data object Connecting : MqttConnectionState()
2932

3033
/** The MQTT client is connected and subscribed to topics. */
31-
CONNECTED,
34+
data object Connected : MqttConnectionState()
35+
36+
/**
37+
* The MQTT client lost connection and is attempting to reconnect.
38+
*
39+
* @property attempt 1-based attempt counter for the current reconnect loop.
40+
* @property lastError Localized message from the most recent reconnect failure, if any.
41+
*/
42+
data class Reconnecting(val attempt: Int = 0, val lastError: String? = null) : MqttConnectionState()
3243

33-
/** The MQTT client lost connection and is attempting to reconnect. */
34-
RECONNECTING,
44+
/**
45+
* The MQTT client is not connected to the broker.
46+
*
47+
* @property reason Localized failure message for an unexpected disconnect, or `null` for the idle / initial /
48+
* intentional-close case (use [Idle]).
49+
*/
50+
data class Disconnected(val reason: String? = null) : MqttConnectionState() {
51+
companion object {
52+
/** Singleton for the idle / no-reason disconnected state. */
53+
val Idle: Disconnected = Disconnected(reason = null)
54+
}
55+
}
3556
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Copyright (c) 2026 Meshtastic LLC
3+
*
4+
* This program is free software: you can redistribute it and/or modify
5+
* it under the terms of the GNU General Public License as published by
6+
* the Free Software Foundation, either version 3 of the License, or
7+
* (at your option) any later version.
8+
*
9+
* This program is distributed in the hope that it will be useful,
10+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
* GNU General Public License for more details.
13+
*
14+
* You should have received a copy of the GNU General Public License
15+
* along with this program. If not, see <https://www.gnu.org/licenses/>.
16+
*/
17+
package org.meshtastic.core.model
18+
19+
/**
20+
* UI-friendly outcome of a one-shot MQTT broker reachability probe.
21+
*
22+
* Mirrors the failure shapes of `org.meshtastic.mqtt.ProbeResult` but stays in the model module so feature/UI code can
23+
* consume the result without depending on the MQTT library.
24+
*/
25+
sealed class MqttProbeStatus {
26+
/** Probe is currently in flight. */
27+
data object Probing : MqttProbeStatus()
28+
29+
/**
30+
* Broker accepted the connection. [serverInfo] is a short human-readable summary of any CONNACK properties that are
31+
* useful to surface to the user.
32+
*/
33+
data class Success(val serverInfo: String?) : MqttProbeStatus()
34+
35+
/** Broker rejected the connection (CONNACK with non-zero reason code). */
36+
data class Rejected(val reasonCode: Int, val reason: String?, val serverReference: String?) : MqttProbeStatus()
37+
38+
/** DNS lookup failed. */
39+
data class DnsFailure(val message: String?) : MqttProbeStatus()
40+
41+
/** TCP socket could not be opened. */
42+
data class TcpFailure(val message: String?) : MqttProbeStatus()
43+
44+
/** TLS handshake failed. */
45+
data class TlsFailure(val message: String?) : MqttProbeStatus()
46+
47+
/** Probe exceeded its timeout. */
48+
data class Timeout(val timeoutMs: Long) : MqttProbeStatus()
49+
50+
/** Any other / unclassified failure. */
51+
data class Other(val message: String?) : MqttProbeStatus()
52+
}

core/network/src/commonMain/kotlin/org/meshtastic/core/network/repository/MQTTRepositoryImpl.kt

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,6 @@ class MQTTRepositoryImpl(
6565
private const val DEFAULT_TOPIC_LEVEL = "/2/e/"
6666
private const val JSON_TOPIC_LEVEL = "/2/json/"
6767
private const val DEFAULT_SERVER_ADDRESS = "mqtt.meshtastic.org"
68-
private const val WEBSOCKET_PATH = "/mqtt"
6968
private const val KEEPALIVE_SECONDS = 30
7069
private const val INITIAL_RECONNECT_DELAY_MS = 1000L
7170
private const val MAX_RECONNECT_DELAY_MS = 30_000L
@@ -74,7 +73,7 @@ class MQTTRepositoryImpl(
7473

7574
@Volatile private var client: MqttClient? = null
7675

77-
private val _connectionState = MutableStateFlow(ConnectionState.DISCONNECTED)
76+
private val _connectionState = MutableStateFlow<ConnectionState>(ConnectionState.Disconnected.Idle)
7877
override val connectionState: StateFlow<ConnectionState> = _connectionState.asStateFlow()
7978

8079
@OptIn(ExperimentalSerializationApi::class)
@@ -89,7 +88,7 @@ class MQTTRepositoryImpl(
8988
Logger.i { "MQTT Disconnecting" }
9089
val c = client
9190
client = null
92-
_connectionState.value = ConnectionState.DISCONNECTED
91+
_connectionState.value = ConnectionState.Disconnected.Idle
9392
scope.launch { safeCatching { c?.close() }.onFailure { e -> Logger.w(e) { "MQTT clean disconnect failed" } } }
9493
}
9594

@@ -102,14 +101,7 @@ class MQTTRepositoryImpl(
102101
val rootTopic = mqttConfig?.root?.ifEmpty { DEFAULT_TOPIC_ROOT } ?: DEFAULT_TOPIC_ROOT
103102

104103
val rawAddress = mqttConfig?.address ?: DEFAULT_SERVER_ADDRESS
105-
val endpoint =
106-
if (rawAddress.contains("://")) {
107-
MqttEndpoint.parse(rawAddress)
108-
} else {
109-
// Use WebSocket transport on all platforms for firewall/CDN compatibility.
110-
val scheme = if (mqttConfig?.tls_enabled == true) "wss" else "ws"
111-
MqttEndpoint.parse("$scheme://$rawAddress$WEBSOCKET_PATH")
112-
}
104+
val endpoint = resolveEndpoint(rawAddress, mqttConfig?.tls_enabled == true)
113105

114106
val newClient =
115107
MqttClient(ownerId) {
@@ -226,3 +218,26 @@ class MQTTRepositoryImpl(
226218
}
227219
}
228220
}
221+
222+
/**
223+
* Resolve a user-supplied broker address into an [MqttEndpoint].
224+
*
225+
* Address resolution rules:
226+
* - If [rawAddress] already contains a URI scheme (`scheme://…`), parse it directly via [MqttEndpoint.parse] and
227+
* respect whatever transport / port the user encoded.
228+
* - Otherwise wrap it as a WebSocket endpoint (`ws[s]://host${WEBSOCKET_PATH}`) so the proxy works over CDNs and
229+
* firewall-restricted networks where raw 1883/8883 may be blocked. The scheme is `wss` when [tlsEnabled] is `true`,
230+
* `ws` otherwise.
231+
*
232+
* Extracted as a top-level function so [MQTTRepositoryImplTest] can exercise every branch without spinning up the full
233+
* repository, and so `MqttManagerImpl` (in `:core:data`) can reuse the same parsing rules for the probe API. Visibility
234+
* is `public` because Kotlin's `internal` is scoped per Gradle module.
235+
*/
236+
fun resolveEndpoint(rawAddress: String, tlsEnabled: Boolean): MqttEndpoint = if (rawAddress.contains("://")) {
237+
MqttEndpoint.parse(rawAddress)
238+
} else {
239+
val scheme = if (tlsEnabled) "wss" else "ws"
240+
MqttEndpoint.parse("$scheme://$rawAddress$WEBSOCKET_PATH")
241+
}
242+
243+
private const val WEBSOCKET_PATH = "/mqtt"

core/network/src/commonTest/kotlin/org/meshtastic/core/network/repository/MQTTRepositoryImplTest.kt

Lines changed: 69 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,25 +18,82 @@ package org.meshtastic.core.network.repository
1818

1919
import kotlinx.serialization.json.Json
2020
import org.meshtastic.core.model.MqttJsonPayload
21+
import org.meshtastic.mqtt.MqttEndpoint
2122
import kotlin.test.Test
2223
import kotlin.test.assertEquals
24+
import kotlin.test.assertIs
2325
import kotlin.test.assertTrue
2426

2527
class MQTTRepositoryImplTest {
2628

29+
// region resolveEndpoint — every behavioral branch of address parsing.
30+
31+
@Test
32+
fun `bare host without scheme is wrapped as ws WebSocket on the standard port`() {
33+
val endpoint = resolveEndpoint(rawAddress = "broker.example.com", tlsEnabled = false)
34+
35+
val ws = assertIs<MqttEndpoint.WebSocket>(endpoint)
36+
assertEquals("ws://broker.example.com/mqtt", ws.url)
37+
}
38+
2739
@Test
28-
fun `test address parsing logic`() {
29-
val address1 = "mqtt.example.com:1883"
30-
val (host1, port1) = address1.split(":", limit = 2).let { it[0] to (it.getOrNull(1)?.toIntOrNull() ?: 1883) }
31-
assertEquals("mqtt.example.com", host1)
32-
assertEquals(1883, port1)
33-
34-
val address2 = "mqtt.example.com"
35-
val (host2, port2) = address2.split(":", limit = 2).let { it[0] to (it.getOrNull(1)?.toIntOrNull() ?: 1883) }
36-
assertEquals("mqtt.example.com", host2)
37-
assertEquals(1883, port2)
40+
fun `bare host with TLS enabled is upgraded to wss`() {
41+
val endpoint = resolveEndpoint(rawAddress = "broker.example.com", tlsEnabled = true)
42+
43+
val ws = assertIs<MqttEndpoint.WebSocket>(endpoint)
44+
assertEquals("wss://broker.example.com/mqtt", ws.url)
3845
}
3946

47+
@Test
48+
fun `host with explicit port is preserved when wrapped`() {
49+
val endpoint = resolveEndpoint(rawAddress = "broker.example.com:9001", tlsEnabled = false)
50+
51+
val ws = assertIs<MqttEndpoint.WebSocket>(endpoint)
52+
assertEquals("ws://broker.example.com:9001/mqtt", ws.url)
53+
}
54+
55+
@Test
56+
fun `address with ws scheme is parsed as-is and tls flag is ignored`() {
57+
// tlsEnabled is intentionally true here — when the user supplies a full URL we
58+
// must honor whatever scheme they provided, not silently upgrade it.
59+
val endpoint = resolveEndpoint(rawAddress = "ws://broker.example.com:8080/custom-path", tlsEnabled = true)
60+
61+
val ws = assertIs<MqttEndpoint.WebSocket>(endpoint)
62+
assertEquals("ws://broker.example.com:8080/custom-path", ws.url)
63+
}
64+
65+
@Test
66+
fun `address with wss scheme is parsed as-is`() {
67+
val endpoint = resolveEndpoint(rawAddress = "wss://broker.example.com/secure-mqtt", tlsEnabled = false)
68+
69+
val ws = assertIs<MqttEndpoint.WebSocket>(endpoint)
70+
assertEquals("wss://broker.example.com/secure-mqtt", ws.url)
71+
}
72+
73+
@Test
74+
fun `address with mqtt tcp scheme is parsed as Tcp endpoint`() {
75+
val endpoint = resolveEndpoint(rawAddress = "mqtt://broker.example.com:1883", tlsEnabled = false)
76+
77+
val tcp = assertIs<MqttEndpoint.Tcp>(endpoint)
78+
assertEquals("broker.example.com", tcp.host)
79+
assertEquals(1883, tcp.port)
80+
assertEquals(false, tcp.tls)
81+
}
82+
83+
@Test
84+
fun `address with mqtts tcp scheme is parsed as Tcp endpoint with tls true`() {
85+
val endpoint = resolveEndpoint(rawAddress = "mqtts://broker.example.com:8883", tlsEnabled = false)
86+
87+
val tcp = assertIs<MqttEndpoint.Tcp>(endpoint)
88+
assertEquals("broker.example.com", tcp.host)
89+
assertEquals(8883, tcp.port)
90+
assertEquals(true, tcp.tls)
91+
}
92+
93+
// endregion
94+
95+
// region MqttJsonPayload — keep the existing JSON contract tests.
96+
4097
@Test
4198
fun `test json payload parsing`() {
4299
val jsonStr =
@@ -72,4 +129,6 @@ class MQTTRepositoryImplTest {
72129
assertTrue(jsonStr.contains("\"from\":12345678"))
73130
assertTrue(jsonStr.contains("\"payload\":\"Hello World\""))
74131
}
132+
133+
// endregion
75134
}

core/repository/src/commonMain/kotlin/org/meshtastic/core/repository/MqttManager.kt

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package org.meshtastic.core.repository
1818

1919
import kotlinx.coroutines.flow.StateFlow
2020
import org.meshtastic.core.model.MqttConnectionState
21+
import org.meshtastic.core.model.MqttProbeStatus
2122
import org.meshtastic.proto.MqttClientProxyMessage
2223

2324
/** Interface for managing MQTT proxy communication. */
@@ -33,4 +34,15 @@ interface MqttManager {
3334

3435
/** Handles an MQTT proxy message from the radio. */
3536
fun handleMqttProxyMessage(message: MqttClientProxyMessage)
37+
38+
/**
39+
* Probe an MQTT broker to verify connectivity and credentials without joining the proxy lifecycle. Intended for UI
40+
* "Test Connection" affordances.
41+
*
42+
* @param address Raw broker address as the user would type it (host, host:port, or full URL).
43+
* @param tlsEnabled `true` to upgrade bare addresses to `wss://` (ignored when [address] already has a scheme).
44+
* @param username Optional MQTT username.
45+
* @param password Optional MQTT password.
46+
*/
47+
suspend fun probe(address: String, tlsEnabled: Boolean, username: String?, password: String?): MqttProbeStatus
3648
}

0 commit comments

Comments
 (0)