@@ -21,19 +21,28 @@ import io.github.davidepianca98.MQTTClient
2121import io.github.davidepianca98.mqtt.MQTTVersion
2222import io.github.davidepianca98.mqtt.Subscription
2323import io.github.davidepianca98.mqtt.packets.Qos
24+ import io.github.davidepianca98.mqtt.packets.mqttv5.ReasonCode
2425import io.github.davidepianca98.mqtt.packets.mqttv5.SubscriptionOptions
2526import io.github.davidepianca98.socket.tls.TLSClientSettings
2627import kotlinx.coroutines.CoroutineScope
28+ import kotlinx.coroutines.Dispatchers
29+ import kotlinx.coroutines.IO
2730import kotlinx.coroutines.Job
2831import kotlinx.coroutines.SupervisorJob
2932import kotlinx.coroutines.channels.awaitClose
33+ import kotlinx.coroutines.delay
3034import kotlinx.coroutines.flow.Flow
3135import kotlinx.coroutines.flow.callbackFlow
3236import kotlinx.coroutines.flow.first
37+ import kotlinx.coroutines.isActive
3338import kotlinx.coroutines.launch
39+ import kotlinx.coroutines.sync.Mutex
3440import kotlinx.coroutines.sync.Semaphore
41+ import kotlinx.coroutines.sync.withLock
3542import kotlinx.coroutines.sync.withPermit
43+ import kotlinx.serialization.SerializationException
3644import kotlinx.serialization.json.Json
45+ import okio.ByteString
3746import okio.ByteString.Companion.toByteString
3847import org.koin.core.annotation.Single
3948import org.meshtastic.core.model.MqttJsonPayload
@@ -46,7 +55,6 @@ import org.meshtastic.proto.MqttClientProxyMessage
4655class MQTTRepositoryImpl (
4756 private val radioConfigRepository : RadioConfigRepository ,
4857 private val nodeRepository : NodeRepository ,
49- dispatchers : org.meshtastic.core.di.CoroutineDispatchers ,
5058) : MQTTRepository {
5159
5260 companion object {
@@ -58,9 +66,11 @@ class MQTTRepositoryImpl(
5866
5967 private var client: MQTTClient ? = null
6068 private val json = Json { ignoreUnknownKeys = true }
61- private val scope = CoroutineScope (dispatchers.default + SupervisorJob ())
69+ private val scope = CoroutineScope (Dispatchers . Default + SupervisorJob ())
6270 private var clientJob: Job ? = null
6371 private val publishSemaphore = Semaphore (20 )
72+ private val reconnectMutex = Mutex ()
73+ private var reconnectAttempt = 0
6474
6575 override fun disconnect () {
6676 Logger .i { " MQTT Disconnecting" }
@@ -70,6 +80,10 @@ class MQTTRepositoryImpl(
7080 }
7181
7282 @OptIn(ExperimentalUnsignedTypes ::class )
83+ /* *
84+ * Cold flow. MUST be collected by exactly one subscriber. Multiple collectors will create duplicate MQTT clients
85+ * with same clientId, causing broker to disconnect previous connections.
86+ */
7387 override val proxyMessageFlow: Flow <MqttClientProxyMessage > = callbackFlow {
7488 val ownerId = " MeshtasticAndroidMqttProxy-${nodeRepository.myId.value ? : " unknown" } "
7589 val channelSet = radioConfigRepository.channelSetFlow.first()
@@ -82,64 +96,7 @@ class MQTTRepositoryImpl(
8296 it[0 ] to (it.getOrNull(1 )?.toIntOrNull() ? : if (mqttConfig?.tls_enabled == true ) 8883 else 1883 )
8397 }
8498
85- val newClient =
86- MQTTClient (
87- mqttVersion = MQTTVersion .MQTT5 ,
88- address = host,
89- port = port,
90- tls = if (mqttConfig?.tls_enabled == true ) TLSClientSettings () else null ,
91- userName = mqttConfig?.username,
92- password = mqttConfig?.password?.encodeToByteArray()?.toUByteArray(),
93- clientId = ownerId,
94- publishReceived = { packet ->
95- val topic = packet.topicName
96- val payload = packet.payload?.toByteArray()
97- Logger .d { " MQTT received message on topic $topic (size: ${payload?.size ? : 0 } bytes)" }
98-
99- if (topic.contains(" /json/" )) {
100- try {
101- val jsonStr = payload?.decodeToString() ? : " "
102- // Validate JSON by parsing it
103- json.decodeFromString<MqttJsonPayload >(jsonStr)
104- Logger .d { " MQTT parsed JSON payload successfully" }
105-
106- trySend(MqttClientProxyMessage (topic = topic, text = jsonStr, retained = packet.retain))
107- } catch (e: kotlinx.serialization.SerializationException ) {
108- Logger .e(e) { " Failed to parse MQTT JSON: ${e.message} " }
109- } catch (e: IllegalArgumentException ) {
110- Logger .e(e) { " Failed to parse MQTT JSON: ${e.message} " }
111- }
112- } else {
113- trySend(
114- MqttClientProxyMessage (
115- topic = topic,
116- data_ = payload?.toByteString() ? : okio.ByteString .EMPTY ,
117- retained = packet.retain,
118- ),
119- )
120- }
121- },
122- )
123-
124- client = newClient
125-
126- clientJob = scope.launch {
127- try {
128- Logger .i { " MQTT Starting client loop for $host :$port " }
129- newClient.runSuspend()
130- } catch (e: io.github.davidepianca98.mqtt.MQTTException ) {
131- Logger .e(e) { " MQTT Client loop error (MQTT)" }
132- close(e)
133- } catch (e: io.github.davidepianca98.socket.IOException ) {
134- Logger .e(e) { " MQTT Client loop error (IO)" }
135- close(e)
136- } catch (e: kotlinx.coroutines.CancellationException ) {
137- Logger .i { " MQTT Client loop cancelled" }
138- throw e
139- }
140- }
141-
142- // Subscriptions
99+ // Subscriptions (out of loop)
143100 val subscriptions = mutableListOf<Subscription >()
144101 channelSet.subscribeList.forEach { globalId ->
145102 subscriptions.add(
@@ -153,10 +110,135 @@ class MQTTRepositoryImpl(
153110 }
154111 subscriptions.add(Subscription (" $rootTopic${DEFAULT_TOPIC_LEVEL } PKI/+" , SubscriptionOptions (Qos .AT_LEAST_ONCE )))
155112
156- if (subscriptions.isNotEmpty()) {
157- Logger .d { " MQTT subscribing to ${subscriptions.size} topics" }
158- newClient.subscribe(subscriptions)
159- }
113+ // Using IO-dispatcher since we use blocking MQTTClient.run()
114+ clientJob =
115+ scope.launch(Dispatchers .IO ) {
116+ val baseDelay = 2_000L // Base backoff value
117+ val maxDelay = 64_000L // Maximal backoff value
118+
119+ // Reconnection loop
120+ while (isActive) {
121+ val attempt =
122+ reconnectMutex.withLock {
123+ ++ reconnectAttempt // Don't really think we will ever get overflow here since it will take
124+ // 4300 years
125+ }
126+
127+ // Exponential backoff
128+ val delayMs =
129+ when {
130+ attempt == 1 -> 0
131+ attempt >= 7 -> maxDelay
132+ else -> baseDelay * (1L shl (attempt - 2 )) // Backoff 2→4→8→16→32→64 seconds
133+ }
134+
135+ if (delayMs > 0 ) {
136+ Logger .w { " MQTT reconnect #$attempt in ${delayMs / 1000 } s" }
137+ delay(delayMs)
138+ }
139+
140+ // Creating client on each iteration
141+ var newClient: MQTTClient ? = null
142+ try {
143+ newClient =
144+ MQTTClient (
145+ mqttVersion = MQTTVersion .MQTT5 ,
146+ address = host,
147+ port = port,
148+ tls = if (mqttConfig?.tls_enabled == true ) TLSClientSettings () else null ,
149+ userName = mqttConfig?.username,
150+ password = mqttConfig?.password?.encodeToByteArray()?.toUByteArray(),
151+ clientId = ownerId,
152+ onConnected = {
153+ Logger .i { " MQTT connected" }
154+ scope.launch {
155+ // Reset backoff
156+ reconnectMutex.withLock { reconnectAttempt = 0 }
157+ }
158+ },
159+ onDisconnected = { Logger .w { " MQTT disconnected" } },
160+ publishReceived = { packet ->
161+ val topic = packet.topicName
162+ val payload = packet.payload?.toByteArray()
163+ Logger .d {
164+ " MQTT received message on topic $topic (size: ${payload?.size ? : 0 } bytes)"
165+ }
166+
167+ val result =
168+ trySend(
169+ (
170+ if (topic.contains(" /json/" )) {
171+ try {
172+ val jsonStr = payload?.decodeToString() ? : " "
173+ // Validate JSON by parsing it
174+ json.decodeFromString<MqttJsonPayload >(jsonStr)
175+ Logger .d { " MQTT parsed JSON payload successfully" }
176+ MqttClientProxyMessage (
177+ topic = topic,
178+ text = jsonStr,
179+ retained = packet.retain,
180+ )
181+ } catch (e: SerializationException ) {
182+ Logger .e(e) { " Failed to parse MQTT JSON: ${e.message} " }
183+ } catch (e: IllegalArgumentException ) {
184+ Logger .e(e) { " Failed to parse MQTT JSON: ${e.message} " }
185+ }
186+ } else {
187+ MqttClientProxyMessage (
188+ topic = topic,
189+ data_ = payload?.toByteString() ? : ByteString .EMPTY ,
190+ retained = packet.retain,
191+ )
192+ }
193+ )
194+ as MqttClientProxyMessage ,
195+ )
196+ if (result.isFailure) {
197+ Logger .w { " MQTT message dropped: flow channel closed" }
198+ }
199+ },
200+ )
201+
202+ if (subscriptions.isNotEmpty()) {
203+ Logger .d { " MQTT subscribing to ${subscriptions.size} topics" }
204+ newClient.subscribe(subscriptions)
205+ }
206+
207+ // Renew client for publish()
208+ client = newClient
209+
210+ Logger .i { " MQTT run loop start ($host :$port )" }
211+
212+ // Note: run() is blocking. Cancellation via clientJob.cancel()
213+ // will be processed when run() returns or if the library checks for interruption.
214+ // Test with actual network disconnect to verify timely shutdown.
215+ newClient.run () // Blocking
216+
217+ Logger .w { " MQTT run() exited normally — reconnecting" }
218+ } catch (e: io.github.davidepianca98.mqtt.MQTTException ) {
219+ Logger .e(e) { " MQTT protocol error (attempt #$attempt )" }
220+ // Continue loop
221+ } catch (e: io.github.davidepianca98.socket.IOException ) {
222+ Logger .e(e) { " MQTT IO error (attempt #$attempt )" }
223+ // Continue loop
224+ } catch (e: kotlinx.coroutines.CancellationException ) {
225+ Logger .i { " MQTT reconnect loop cancelled" }
226+ throw e // Stop
227+ } catch (e: Exception ) {
228+ Logger .e(e) { " MQTT unexpected error (attempt #$attempt ): ${e::class .simpleName} " }
229+ } finally {
230+ // Cleanup
231+ newClient?.let {
232+ if (client == = it) {
233+ client = null
234+ }
235+ try {
236+ newClient.disconnect(ReasonCode .SUCCESS ) // Success?
237+ } catch (_: Exception ) {}
238+ }
239+ }
240+ }
241+ }
160242
161243 awaitClose { disconnect() }
162244 }
@@ -165,13 +247,25 @@ class MQTTRepositoryImpl(
165247 override fun publish (topic : String , data : ByteArray , retained : Boolean ) {
166248 Logger .d { " MQTT publishing message to topic $topic (size: ${data.size} bytes, retained: $retained )" }
167249 scope.launch {
250+ val c = client
251+
252+ if (c == null || ! c.isConnackReceived()) {
253+ Logger .w { " MQTT not connected, dropping message" }
254+ return @launch
255+ }
256+
168257 publishSemaphore.withPermit {
169- client?.publish(
170- retain = retained,
171- qos = Qos .AT_LEAST_ONCE ,
172- topic = topic,
173- payload = data.toUByteArray(),
174- )
258+ try {
259+ c.publish(
260+ retain = retained,
261+ qos = Qos .AT_LEAST_ONCE ,
262+ topic = topic,
263+ payload = data.toUByteArray(),
264+ ) // Potential TOCTOU.
265+ Logger .d { " MQTT publish succeeded" }
266+ } catch (e: Exception ) {
267+ Logger .w(e) { " MQTT publish failed" }
268+ }
175269 }
176270 }
177271 }
0 commit comments