Skip to content

Commit 6761b47

Browse files
jamesarichclaude
andauthored
fix(node): chart local-node air-quality telemetry (orphaning + zero-suppression) (#5793)
Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent 269bab0 commit 6761b47

6 files changed

Lines changed: 425 additions & 61 deletions

File tree

core/data/src/commonMain/kotlin/org/meshtastic/core/data/manager/MeshMessageProcessorImpl.kt

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ package org.meshtastic.core.data.manager
1919
import co.touchlab.kermit.Logger
2020
import kotlinx.coroutines.CoroutineScope
2121
import kotlinx.coroutines.Job
22+
import kotlinx.coroutines.flow.combine
23+
import kotlinx.coroutines.flow.distinctUntilChanged
2224
import kotlinx.coroutines.flow.launchIn
2325
import kotlinx.coroutines.flow.onEach
2426
import kotlinx.coroutines.launch
@@ -80,10 +82,17 @@ class MeshMessageProcessorImpl(
8082
}
8183

8284
init {
83-
nodeManager.isNodeDbReady
84-
.onEach { ready ->
85-
if (ready) {
86-
flushEarlyReceivedPackets("dbReady")
85+
// Flush buffered packets only once BOTH the node DB is ready AND our own node number is known. Processing a
86+
// received packet while myNodeNum is still null would key a local packet under its raw from_num instead of
87+
// NODE_NUM_LOCAL (see [handleReceivedMeshPacket] / [processReceivedMeshPacket]), orphaning it from per-node
88+
// queries. Emit the (ready, myNodeNum) pair — not the derived boolean — so distinctUntilChanged re-fires on
89+
// every underlying state change (including a reconnect where myNodeNum transitions null -> value) rather than
90+
// collapsing distinct states that happen to map to the same boolean.
91+
combine(nodeManager.isNodeDbReady, nodeManager.myNodeNum) { ready, myNodeNum -> ready to myNodeNum }
92+
.distinctUntilChanged()
93+
.onEach { (ready, myNodeNum) ->
94+
if (ready && myNodeNum != null) {
95+
flushEarlyReceivedPackets("ready")
8796
}
8897
}
8998
.launchIn(scope)
@@ -157,7 +166,12 @@ class MeshMessageProcessorImpl(
157166
}
158167
val preparedPacket = packet.copy(rx_time = rxTime)
159168

160-
if (nodeManager.isNodeDbReady.value) {
169+
// Require myNodeNum to be known before storing: processReceivedMeshPacket only keys a local packet under
170+
// NODE_NUM_LOCAL when packet.from == myNodeNum. If myNodeNum is still null (early in a (re)connect, before
171+
// MyNodeInfo resolves), a local packet would be stored under its raw from_num and become invisible to
172+
// per-node chart queries while still appearing in the unfiltered Debug log. Buffer until both are ready;
173+
// the init combine flushes the buffer once myNodeNum resolves.
174+
if (nodeManager.isNodeDbReady.value && myNodeNum != null) {
161175
processReceivedMeshPacket(preparedPacket, myNodeNum)
162176
} else {
163177
scope.launch {
@@ -175,6 +189,10 @@ class MeshMessageProcessorImpl(
175189

176190
private fun flushEarlyReceivedPackets(reason: String) {
177191
scope.launch {
192+
// Resolve and null-check myNodeNum BEFORE draining the buffer: if it regressed to null between the flush
193+
// trigger and here, leave the packets buffered for the next resolution rather than draining and keying
194+
// them under their raw from_num. The captured non-null value is used for the whole batch.
195+
val myNodeNum = nodeManager.myNodeNum.value ?: return@launch
178196
val packets =
179197
earlyMutex.withLock {
180198
if (earlyReceivedPackets.isEmpty()) return@withLock emptyList<MeshPacket>()
@@ -185,7 +203,6 @@ class MeshMessageProcessorImpl(
185203
if (packets.isEmpty()) return@launch
186204

187205
Logger.d { "replayEarlyPackets reason=$reason count=${packets.size}" }
188-
val myNodeNum = nodeManager.myNodeNum.value
189206
packets.forEach { processReceivedMeshPacket(it, myNodeNum) }
190207
}
191208
}

core/data/src/commonTest/kotlin/org/meshtastic/core/data/manager/MeshMessageProcessorImplTest.kt

Lines changed: 48 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import dev.mokkery.every
2222
import dev.mokkery.matcher.any
2323
import dev.mokkery.mock
2424
import dev.mokkery.verify
25+
import dev.mokkery.verify.VerifyMode
2526
import dev.mokkery.verifySuspend
2627
import kotlinx.coroutines.CoroutineScope
2728
import kotlinx.coroutines.ExperimentalCoroutinesApi
@@ -287,10 +288,13 @@ class MeshMessageProcessorImplTest {
287288
// No crash, no emitMeshPacket call (decoded is null so processReceivedMeshPacket returns early)
288289
}
289290

290-
// ---------- handleReceivedMeshPacket: null myNodeNum ----------
291+
// ---------- handleReceivedMeshPacket: myNodeNum not yet known ----------
291292

292293
@Test
293-
fun `processReceivedMeshPacket with null myNodeNum skips node updates`() = runTest(testDispatcher) {
294+
fun `packets received before myNodeNum is known are buffered until it resolves`() = runTest(testDispatcher) {
295+
// Our own node number is not yet known, even though the node DB is ready.
296+
val myNodeNumFlow = MutableStateFlow<Int?>(null)
297+
every { nodeManager.myNodeNum } returns myNodeNumFlow
294298
processor = createProcessor(backgroundScope)
295299
isNodeDbReady.value = true
296300

@@ -305,7 +309,48 @@ class MeshMessageProcessorImplTest {
305309
processor.handleReceivedMeshPacket(packet, null)
306310
advanceUntilIdle()
307311

308-
// emitMeshPacket should still be called, but node updates should be skipped
312+
// Buffered, not processed: storing now could key a local packet under its raw from_num and orphan it from
313+
// per-node chart queries. Neither the DB insert nor the downstream emit should have happened yet.
314+
verifySuspend(mode = VerifyMode.not) { meshLogRepository.insert(any()) }
315+
verifySuspend(mode = VerifyMode.not) { serviceRepository.emitMeshPacket(any()) }
316+
317+
// Once myNodeNum resolves, the buffer flushes and the packet is processed (stored + emitted).
318+
myNodeNumFlow.value = 4321
319+
advanceUntilIdle()
320+
verifySuspend { meshLogRepository.insert(any()) }
321+
verifySuspend { serviceRepository.emitMeshPacket(any()) }
322+
}
323+
324+
@Test
325+
fun `buffer survives a reconnect toggle and flushes only once myNodeNum resolves`() = runTest(testDispatcher) {
326+
val myNodeNumFlow = MutableStateFlow<Int?>(null)
327+
every { nodeManager.myNodeNum } returns myNodeNumFlow
328+
processor = createProcessor(backgroundScope)
329+
isNodeDbReady.value = true // DB ready, our node number still unknown
330+
331+
val packet =
332+
MeshPacket(
333+
id = 11,
334+
from = 999,
335+
decoded = Data(portnum = PortNum.TEXT_MESSAGE_APP, payload = ByteString.EMPTY),
336+
rx_time = 1700000000,
337+
)
338+
processor.handleReceivedMeshPacket(packet, null)
339+
advanceUntilIdle()
340+
verifySuspend(mode = VerifyMode.not) { serviceRepository.emitMeshPacket(any()) }
341+
342+
// Reconnect: isNodeDbReady toggles false -> true while myNodeNum stays null. The packet must stay buffered
343+
//
344+
// flushing now would key it under its raw from_num.
345+
isNodeDbReady.value = false
346+
advanceUntilIdle()
347+
isNodeDbReady.value = true
348+
advanceUntilIdle()
349+
verifySuspend(mode = VerifyMode.not) { serviceRepository.emitMeshPacket(any()) }
350+
351+
// Only once myNodeNum finally resolves does the buffer flush.
352+
myNodeNumFlow.value = 4321
353+
advanceUntilIdle()
309354
verifySuspend { serviceRepository.emitMeshPacket(any()) }
310355
}
311356

Lines changed: 230 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,230 @@
1+
/*
2+
* Copyright (c) 2026 Meshtastic LLC
3+
*
4+
* This program is free software: you can redistribute it and/or modify
5+
* it under the terms of the GNU General Public License as published by
6+
* the Free Software Foundation, either version 3 of the License, or
7+
* (at your option) any later version.
8+
*
9+
* This program is distributed in the hope that it will be useful,
10+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
* GNU General Public License for more details.
13+
*
14+
* You should have received a copy of the GNU General Public License
15+
* along with this program. If not, see <https://www.gnu.org/licenses/>.
16+
*/
17+
package org.meshtastic.core.data.repository
18+
19+
import dev.mokkery.MockMode
20+
import dev.mokkery.answering.returns
21+
import dev.mokkery.every
22+
import dev.mokkery.mock
23+
import kotlinx.coroutines.flow.MutableStateFlow
24+
import kotlinx.coroutines.flow.first
25+
import kotlinx.coroutines.test.UnconfinedTestDispatcher
26+
import kotlinx.coroutines.test.advanceUntilIdle
27+
import kotlinx.coroutines.test.runTest
28+
import okio.ByteString.Companion.toByteString
29+
import org.meshtastic.core.data.datasource.NodeInfoReadDataSource
30+
import org.meshtastic.core.data.manager.MeshMessageProcessorImpl
31+
import org.meshtastic.core.database.entity.MyNodeEntity
32+
import org.meshtastic.core.di.CoroutineDispatchers
33+
import org.meshtastic.core.model.MeshLog
34+
import org.meshtastic.core.repository.FromRadioPacketHandler
35+
import org.meshtastic.core.repository.MeshDataHandler
36+
import org.meshtastic.core.repository.NodeManager
37+
import org.meshtastic.core.repository.ServiceStateWriter
38+
import org.meshtastic.core.testing.FakeDatabaseProvider
39+
import org.meshtastic.core.testing.FakeMeshLogPrefs
40+
import org.meshtastic.proto.AirQualityMetrics
41+
import org.meshtastic.proto.Data
42+
import org.meshtastic.proto.FromRadio
43+
import org.meshtastic.proto.MeshPacket
44+
import org.meshtastic.proto.PortNum
45+
import org.meshtastic.proto.Telemetry
46+
import kotlin.test.AfterTest
47+
import kotlin.test.Test
48+
import kotlin.test.assertEquals
49+
import kotlin.test.assertTrue
50+
51+
/**
52+
* Repro for the field report: a node's air-quality telemetry shows up in the in-app Debug log but never appears in the
53+
* Air Quality chart (PR #5701).
54+
*
55+
* Uses Brian's real packet: a TRANSPORT_INTERNAL TELEMETRY_APP packet from the locally-connected node (num [localNum])
56+
* carrying non-zero PM (pm10_standard=1, pm25_standard=2, pm100_standard=2).
57+
*
58+
* The chart reads [MeshLogRepositoryImpl.getTelemetryFrom], which resolves the viewed node through `effectiveLogId` to
59+
* [MeshLog.NODE_NUM_LOCAL] and filters `WHERE from_num = :fromNum`. The Debug screen reads the unfiltered
60+
* `getAllLogsUnbounded`. So the chart is sensitive to the stored `from_num` column; the Debug log is not.
61+
*
62+
* `MeshMessageProcessorImpl.processReceivedMeshPacket` stores `fromNum = if (packet.from == myNodeNum) NODE_NUM_LOCAL
63+
* else packet.from`, where the insert-time `myNodeNum` is `nodeManager.myNodeNum.value` read at packet arrival
64+
* (MeshServiceOrchestrator). That StateFlow starts null and is only set once MyNodeInfo is processed, so a local packet
65+
* that arrives during the null window is stored under its raw `from_num` and orphaned from the chart while still
66+
* visible in the Debug log.
67+
*/
68+
class AirQualityChartReproTest {
69+
70+
private lateinit var dbProvider: FakeDatabaseProvider
71+
private lateinit var meshLogPrefs: FakeMeshLogPrefs
72+
private lateinit var nodeInfoReadDataSource: NodeInfoReadDataSource
73+
private val testDispatcher = UnconfinedTestDispatcher()
74+
private val dispatchers = CoroutineDispatchers(main = testDispatcher, io = testDispatcher, default = testDispatcher)
75+
private lateinit var repository: MeshLogRepositoryImpl
76+
77+
private val nowMillis = 1_000_000_000L
78+
79+
/** Brian's connected node number, taken verbatim from his Debug log (`from=-93009324`). */
80+
private val localNum = -93009324
81+
82+
private fun setup(myNodeNum: Int?) {
83+
dbProvider = FakeDatabaseProvider()
84+
meshLogPrefs = FakeMeshLogPrefs().apply { setLoggingEnabled(true) }
85+
nodeInfoReadDataSource = mock(MockMode.autofill)
86+
every { nodeInfoReadDataSource.myNodeInfoFlow() } returns MutableStateFlow(myNodeNum?.let(::myNodeEntity))
87+
repository = MeshLogRepositoryImpl(dbProvider, dispatchers, meshLogPrefs, nodeInfoReadDataSource)
88+
}
89+
90+
@AfterTest
91+
fun tearDown() {
92+
if (::dbProvider.isInitialized) dbProvider.close()
93+
}
94+
95+
private fun myNodeEntity(num: Int) = MyNodeEntity(
96+
myNodeNum = num,
97+
model = "model",
98+
firmwareVersion = "1.0",
99+
couldUpdate = false,
100+
shouldUpdate = false,
101+
currentPacketId = 0L,
102+
messageTimeoutMsec = 0,
103+
minAppVersion = 0,
104+
maxChannels = 0,
105+
hasWifi = false,
106+
)
107+
108+
/** Brian's real reading: pm10_standard=1, pm25_standard=2, pm100_standard=2. */
109+
private fun brianAirQualityTelemetry() = Telemetry(
110+
air_quality_metrics =
111+
AirQualityMetrics(
112+
pm10_standard = 1,
113+
pm25_standard = 2,
114+
pm100_standard = 2,
115+
pm10_environmental = 1,
116+
pm25_environmental = 2,
117+
pm100_environmental = 2,
118+
),
119+
)
120+
121+
private fun airQualityPacket() = MeshPacket(
122+
from = localNum,
123+
rx_time = 1_700_000_000,
124+
decoded =
125+
Data(payload = brianAirQualityTelemetry().encode().toByteString(), portnum = PortNum.TELEMETRY_APP),
126+
)
127+
128+
private fun airQualityLog(fromNum: Int) = MeshLog(
129+
uuid = "aq-$fromNum",
130+
message_type = "Packet",
131+
received_date = nowMillis,
132+
raw_message = "",
133+
fromNum = fromNum,
134+
portNum = PortNum.TELEMETRY_APP.value,
135+
fromRadio = FromRadio(packet = airQualityPacket()),
136+
)
137+
138+
/** Checkpoint 1: the parse + query round-trip preserves the air-quality payload (rules out content loss). */
139+
@Test
140+
fun `checkpoint 1 - air quality payload survives the telemetry round-trip`() = runTest(testDispatcher) {
141+
setup(myNodeNum = null) // effectiveLogId(0) -> 0
142+
repository.insert(airQualityLog(fromNum = 0))
143+
144+
val result = repository.getTelemetryFrom(0).first()
145+
146+
assertEquals(1, result.size, "the air-quality telemetry row should round-trip")
147+
assertEquals(2, result[0].air_quality_metrics?.pm25_standard, "pm25_standard must survive decode")
148+
}
149+
150+
/** Checkpoint 2: local-node air-quality stored under NODE_NUM_LOCAL is returned to the chart (the happy path). */
151+
@Test
152+
fun `checkpoint 2 - local AQ stored under NODE_NUM_LOCAL is charted`() = runTest(testDispatcher) {
153+
setup(myNodeNum = localNum) // viewing the local node -> effectiveLogId -> NODE_NUM_LOCAL
154+
// myNodeNum known at insert -> processReceivedMeshPacket would store NODE_NUM_LOCAL.
155+
repository.insert(airQualityLog(fromNum = MeshLog.NODE_NUM_LOCAL))
156+
157+
val result = repository.getTelemetryFrom(localNum).first()
158+
159+
assertEquals(1, result.size, "AQ stored under NODE_NUM_LOCAL should be visible to the local node's chart")
160+
}
161+
162+
/**
163+
* Checkpoint 3 (query invariant — the rationale for the insert-side fix): the per-node chart query keys the local
164+
* node on NODE_NUM_LOCAL, so a row stored under the raw myNodeNum is not returned (though the unfiltered Debug log
165+
* still shows it). This is *why* the insert path must key local packets under NODE_NUM_LOCAL — verified in
166+
* checkpoint 4. This query behavior is intentional and unchanged by the fix.
167+
*/
168+
@Test
169+
fun `checkpoint 3 - query keys local node on NODE_NUM_LOCAL not the raw from_num`() = runTest(testDispatcher) {
170+
setup(myNodeNum = localNum) // viewing the local node -> effectiveLogId -> NODE_NUM_LOCAL
171+
repository.insert(airQualityLog(fromNum = localNum)) // a hypothetical mis-keyed row
172+
173+
// Debug screen (unfiltered) sees it:
174+
assertEquals(
175+
1,
176+
repository.getAllLogsUnbounded().first().size,
177+
"Debug log shows rows regardless of from_num",
178+
)
179+
180+
// Chart query (from_num = NODE_NUM_LOCAL) does not — hence the insert must never produce a raw-keyed local
181+
// row:
182+
val charted = repository.getTelemetryFrom(localNum).first()
183+
assertTrue(charted.isEmpty(), "the local-node query only matches NODE_NUM_LOCAL")
184+
}
185+
186+
/**
187+
* Checkpoint 4 (regression test for the fix, end-to-end through the real insert path): a local air-quality packet
188+
* received while myNodeNum is still null is BUFFERED (not stored under its raw from_num). Once myNodeNum resolves,
189+
* the buffer flushes and the packet is stored under NODE_NUM_LOCAL, so the local node's Air Quality chart sees it.
190+
*
191+
* Before the fix this packet was stored immediately under its raw from_num and orphaned from the chart.
192+
*/
193+
@Test
194+
fun `checkpoint 4 - local AQ received before myNodeNum resolves is buffered then charted`() =
195+
runTest(testDispatcher) {
196+
setup(myNodeNum = localNum) // query side: phone is connected to localNum
197+
198+
val myNodeNumFlow = MutableStateFlow<Int?>(null) // not yet resolved
199+
val nodeManager = mock<NodeManager>(MockMode.autofill)
200+
every { nodeManager.isNodeDbReady } returns MutableStateFlow(true)
201+
every { nodeManager.myNodeNum } returns myNodeNumFlow
202+
203+
val processor =
204+
MeshMessageProcessorImpl(
205+
nodeManager = nodeManager,
206+
serviceStateWriter = mock<ServiceStateWriter>(MockMode.autofill),
207+
meshLogRepository = lazy { repository },
208+
dataHandler = lazy { mock<MeshDataHandler>(MockMode.autofill) },
209+
fromRadioDispatcher = mock<FromRadioPacketHandler>(MockMode.autofill),
210+
scope = backgroundScope,
211+
)
212+
213+
// Arrives before MyNodeInfo resolves -> buffered, NOT written to the log table (so not orphaned in the DB).
214+
processor.handleReceivedMeshPacket(airQualityPacket(), myNodeNum = null)
215+
advanceUntilIdle()
216+
assertEquals(0, repository.getAllLogsUnbounded().first().size, "packet should be buffered, not yet stored")
217+
218+
// MyNodeInfo resolves -> the buffer flushes and the packet is stored under NODE_NUM_LOCAL.
219+
myNodeNumFlow.value = localNum
220+
advanceUntilIdle()
221+
222+
val charted = repository.getTelemetryFrom(localNum).first()
223+
assertEquals(1, charted.size, "after myNodeNum resolves, the local AQ packet must reach the chart")
224+
assertEquals(
225+
2,
226+
charted[0].air_quality_metrics?.pm25_standard,
227+
"the real reading (pm25_standard=2) survives",
228+
)
229+
}
230+
}

0 commit comments

Comments
 (0)