@@ -82,6 +82,7 @@ class MQTTRepositoryImpl(
8282 }
8383
8484 @OptIn(ExperimentalUnsignedTypes ::class )
85+ @Suppress(" LongMethod" , " CyclomaticComplexMethod" )
8586 private fun createProxyMessageFlow (): Flow <MqttClientProxyMessage > = callbackFlow {
8687 val ownerId = " MeshtasticAndroidMqttProxy-${nodeRepository.myId.value ? : " unknown" } "
8788 val channelSet = radioConfigRepository.channelSetFlow.first()
@@ -91,6 +92,7 @@ class MQTTRepositoryImpl(
9192
9293 val (host, port) =
9394 (mqttConfig?.address ? : DEFAULT_SERVER_ADDRESS ).split(" :" , limit = 2 ).let {
95+ @Suppress(" MagicNumber" )
9496 it[0 ] to (it.getOrNull(1 )?.toIntOrNull() ? : if (mqttConfig?.tls_enabled == true ) 8883 else 1883 )
9597 }
9698
@@ -109,18 +111,18 @@ class MQTTRepositoryImpl(
109111 subscriptions.add(Subscription (" $rootTopic${DEFAULT_TOPIC_LEVEL } PKI/+" , SubscriptionOptions (Qos .AT_LEAST_ONCE )))
110112
111113 // Using IO-dispatcher since we use blocking MQTTClient.run()
114+ @Suppress(" MagicNumber" )
112115 clientJob =
113116 scope.launch(Dispatchers .IO ) {
114117 val baseDelay = 2_000L // Base backoff value
115118 val maxDelay = 64_000L // Maximal backoff value
116119
117120 // Reconnection loop
118121 while (isActive) {
119- val attempt =
120- reconnectMutex.withLock {
121- ++ reconnectAttempt // Don't really think we will ever get overflow here since it will take
122- // 4300 years
123- }
122+ val attempt = reconnectMutex.withLock {
123+ ++ reconnectAttempt // Don't really think we will ever get overflow here since it will take
124+ // 4300 years
125+ }
124126
125127 // Exponential backoff
126128 val delayMs =
@@ -137,6 +139,7 @@ class MQTTRepositoryImpl(
137139
138140 // Creating client on each iteration
139141 var newClient: MQTTClient ? = null
142+ @Suppress(" TooGenericExceptionCaught" )
140143 try {
141144 newClient =
142145 MQTTClient (
@@ -244,15 +247,13 @@ class MQTTRepositoryImpl(
244247 /* *
245248 * Cold flow that creates MQTT client and manages connection lifecycle.
246249 *
247- * Single collector requirement:
248- * This flow MUST be collected by exactly one subscriber. Multiple collectors
249- * will create duplicate MQTT clients with the same clientId, causing the broker
250- * to disconnect previous connections (standard MQTT behavior).
250+ * Single collector requirement: This flow MUST be collected by exactly one subscriber. Multiple collectors will
251+ * create duplicate MQTT clients with the same clientId, causing the broker to disconnect previous connections
252+ * (standard MQTT behavior).
251253 *
252- * Lifecycle fix (shareIn wrapper):
253- * Originally, this callbackFlow would close when the collector stopped (e.g., UI
254- * lifecycle changes, configuration changes, collector errors). This triggered
255- * awaitClose { disconnect() }, which canceled the reconnect loop entirely.
254+ * Lifecycle fix (shareIn wrapper): Originally, this callbackFlow would close when the collector stopped (e.g., UI
255+ * lifecycle changes, configuration changes, collector errors). This triggered awaitClose { disconnect() }, which
256+ * canceled the reconnect loop entirely.
256257 *
257258 * Symptoms observed:
258259 * - "MQTT message dropped: flow channel closed" logs
@@ -270,7 +271,7 @@ class MQTTRepositoryImpl(
270271 .shareIn(
271272 scope = CoroutineScope (SupervisorJob () + Dispatchers .IO ),
272273 started = SharingStarted .WhileSubscribed (stopTimeoutMillis = 30_000 ),
273- replay = 0
274+ replay = 0 ,
274275 )
275276
276277 @OptIn(ExperimentalUnsignedTypes ::class )
@@ -285,6 +286,7 @@ class MQTTRepositoryImpl(
285286 }
286287
287288 publishSemaphore.withPermit {
289+ @Suppress(" TooGenericExceptionCaught" )
288290 try {
289291 c.publish(
290292 retain = retained,
0 commit comments