Skip to content

Commit f2a07bb

Browse files
committed
Wrap MQTT proxyMessageFlow with shareIn to prevent lifecycle-related disconnects.
Prevents callbackFlow from closing when collector stops (UI changes, errors, etc.).
1 parent f245bf8 commit f2a07bb

1 file changed

Lines changed: 36 additions & 13 deletions

File tree

core/network/src/commonMain/kotlin/org/meshtastic/core/network/repository/MQTTRepositoryImpl.kt

Lines changed: 36 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,10 @@ import kotlinx.coroutines.SupervisorJob
3232
import kotlinx.coroutines.channels.awaitClose
3333
import kotlinx.coroutines.delay
3434
import kotlinx.coroutines.flow.Flow
35+
import kotlinx.coroutines.flow.SharingStarted
3536
import kotlinx.coroutines.flow.callbackFlow
3637
import kotlinx.coroutines.flow.first
38+
import kotlinx.coroutines.flow.shareIn
3739
import kotlinx.coroutines.isActive
3840
import kotlinx.coroutines.launch
3941
import kotlinx.coroutines.sync.Mutex
@@ -80,11 +82,7 @@ class MQTTRepositoryImpl(
8082
}
8183

8284
@OptIn(ExperimentalUnsignedTypes::class)
83-
/**
84-
* Cold flow. MUST be collected by exactly one subscriber. Multiple collectors will create duplicate MQTT clients
85-
* with same clientId, causing broker to disconnect previous connections.
86-
*/
87-
override val proxyMessageFlow: Flow<MqttClientProxyMessage> = callbackFlow {
85+
private fun createProxyMessageFlow(): Flow<MqttClientProxyMessage> = callbackFlow {
8886
val ownerId = "MeshtasticAndroidMqttProxy-${nodeRepository.myId.value ?: "unknown"}"
8987
val channelSet = radioConfigRepository.channelSetFlow.first()
9088
val mqttConfig = radioConfigRepository.moduleConfigFlow.first().mqtt
@@ -113,11 +111,7 @@ class MQTTRepositoryImpl(
113111
// Using IO-dispatcher since we use blocking MQTTClient.run()
114112
clientJob =
115113
scope.launch(Dispatchers.IO) {
116-
@Suppress("MagicNumber")
117-
val baseDelay = 2_000L
118-
119-
// Base backoff value
120-
@Suppress("MagicNumber")
114+
val baseDelay = 2_000L // Base backoff value
121115
val maxDelay = 64_000L // Maximal backoff value
122116

123117
// Reconnection loop
@@ -129,7 +123,6 @@ class MQTTRepositoryImpl(
129123
}
130124

131125
// Exponential backoff
132-
@Suppress("MagicNumber")
133126
val delayMs =
134127
when {
135128
attempt == 1 -> 0
@@ -144,7 +137,6 @@ class MQTTRepositoryImpl(
144137

145138
// Creating client on each iteration
146139
var newClient: MQTTClient? = null
147-
@Suppress("TooGenericExceptionCaught")
148140
try {
149141
newClient =
150142
MQTTClient(
@@ -249,6 +241,38 @@ class MQTTRepositoryImpl(
249241
awaitClose { disconnect() }
250242
}
251243

244+
/**
245+
* Cold flow that creates MQTT client and manages connection lifecycle.
246+
*
247+
* Single collector requirement:
248+
* This flow MUST be collected by exactly one subscriber. Multiple collectors
249+
* will create duplicate MQTT clients with the same clientId, causing the broker
250+
* to disconnect previous connections (standard MQTT behavior).
251+
*
252+
* Lifecycle fix (shareIn wrapper):
253+
* Originally, this callbackFlow would close when the collector stopped (e.g., UI
254+
* lifecycle changes, configuration changes, collector errors). This triggered
255+
* awaitClose { disconnect() }, which canceled the reconnect loop entirely.
256+
*
257+
* Symptoms observed:
258+
* - "MQTT message dropped: flow channel closed" logs
259+
* - No reconnection attempts after initial connection loss
260+
* - MQTT permanently dead until app restart
261+
*
262+
* Possible solution: Wrap with shareIn() to create a hot SharedFlow that:
263+
* - Survives temporary collector loss (30s timeout)
264+
* - Keeps reconnect loop running independently of UI/collector lifecycle
265+
* - Only stops when the application process terminates
266+
*/
267+
@OptIn(ExperimentalUnsignedTypes::class)
268+
override val proxyMessageFlow: Flow<MqttClientProxyMessage> =
269+
createProxyMessageFlow()
270+
.shareIn(
271+
scope = CoroutineScope(SupervisorJob() + Dispatchers.IO),
272+
started = SharingStarted.WhileSubscribed(stopTimeoutMillis = 30_000),
273+
replay = 0
274+
)
275+
252276
@OptIn(ExperimentalUnsignedTypes::class)
253277
override fun publish(topic: String, data: ByteArray, retained: Boolean) {
254278
Logger.d { "MQTT publishing message to topic $topic (size: ${data.size} bytes, retained: $retained)" }
@@ -261,7 +285,6 @@ class MQTTRepositoryImpl(
261285
}
262286

263287
publishSemaphore.withPermit {
264-
@Suppress("TooGenericExceptionCaught")
265288
try {
266289
c.publish(
267290
retain = retained,

0 commit comments

Comments
 (0)