@@ -26,6 +26,7 @@ import com.github.umercodez.sensorspot.data.sensoreventprovider.SensorEventProvi
2626import kotlinx.coroutines.CoroutineDispatcher
2727import kotlinx.coroutines.CoroutineScope
2828import kotlinx.coroutines.Dispatchers
29+ import kotlinx.coroutines.Job
2930import kotlinx.coroutines.flow.MutableSharedFlow
3031import kotlinx.coroutines.flow.SharedFlow
3132import kotlinx.coroutines.flow.asSharedFlow
@@ -67,6 +68,8 @@ class SensorPublisher(
6768 private var mqttAsyncClient: MqttAsyncClient ? = null
6869 private var memoryPersistence: MemoryPersistence ? = null
6970 private var connectionOptions: MqttConnectionOptions ? = null
71+ private var sensorEventCollectionJob: Job ? = null
72+ private var gpsDataCollectionJob: Job ? = null
7073
7174 private val clock = Clock ()
7275 val elapsedTime : SharedFlow <ElapsedTime > get() = clock.time
@@ -92,13 +95,15 @@ class SensorPublisher(
9295 }
9396 suspend fun connectAndPublish (mqttConfig : MqttConfig ) = withContext(ioDispatcher){
9497
95- scope.launch {
96- sensorEventProvider.events.collect{ sensorEvent ->
98+ sensorEventCollectionJob = scope.launch {
99+ sensorEventProvider.events.collect { sensorEvent ->
97100
98101 try {
99102
100- if (mqttAsyncClient?.isConnected == true ) {
101- val message = MqttMessage (sensorEvent.toJson(! mqttConfig.dedicatedTopics).toByteArray()).apply {
103+ if (mqttAsyncClient?.isConnected == true ) {
104+ val message = MqttMessage (
105+ sensorEvent.toJson(! mqttConfig.dedicatedTopics).toByteArray()
106+ ).apply {
102107 qos = mqttConfig.qos
103108 }
104109 mqttAsyncClient?.publish(getTopic(mqttConfig, sensorEvent.type), message)
@@ -110,13 +115,15 @@ class SensorPublisher(
110115 }
111116 }
112117
113- scope.launch {
118+ gpsDataCollectionJob = scope.launch {
114119 gpsDataProvider.gpsData.collect { gpsData ->
115120
116121 try {
117122
118- if (mqttAsyncClient?.isConnected == true ) {
119- val message = MqttMessage (gpsData.toJson(! mqttConfig.dedicatedTopics).toByteArray()).apply {
123+ if (mqttAsyncClient?.isConnected == true ) {
124+ val message = MqttMessage (
125+ gpsData.toJson(! mqttConfig.dedicatedTopics).toByteArray()
126+ ).apply {
120127 qos = mqttConfig.qos
121128 }
122129 mqttAsyncClient?.publish(getTopic(mqttConfig, gpsData.type), message)
@@ -245,6 +252,8 @@ class SensorPublisher(
245252 _mqttConnectionState .emit(MqttConnectionState .Disconnected )
246253 sensorEventProvider.stopProvidingEvents()
247254 gpsDataProvider.stopProvidingGpsData()
255+ sensorEventCollectionJob?.cancel()
256+ gpsDataCollectionJob?.cancel()
248257 clock.reset()
249258
250259 } catch (e: Exception ) {
0 commit comments