Skip to content

Commit bca6f20

Browse files
Internally managed coroutine scope for pollers (#1991)
1 parent 77b7c61 commit bca6f20

7 files changed

Lines changed: 40 additions & 69 deletions

File tree

app/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/BasePoller.kt

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,10 @@ package org.session.libsession.messaging.sending_receiving.pollers
33
import kotlinx.coroutines.CancellationException
44
import kotlinx.coroutines.CoroutineScope
55
import kotlinx.coroutines.Deferred
6+
import kotlinx.coroutines.Dispatchers
7+
import kotlinx.coroutines.SupervisorJob
68
import kotlinx.coroutines.async
9+
import kotlinx.coroutines.cancel
710
import kotlinx.coroutines.channels.Channel
811
import kotlinx.coroutines.channels.SendChannel
912
import kotlinx.coroutines.delay
@@ -39,8 +42,8 @@ abstract class BasePoller<T>(
3942
private val debugLabel: String,
4043
private val networkConnectivity: NetworkConnectivity,
4144
private val appVisibilityManager: AppVisibilityManager,
42-
private val scope: CoroutineScope,
4345
) {
46+
private val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
4447

4548
private val manualPollRequestSender: SendChannel<PollRequestCallback<T>>
4649

@@ -80,8 +83,8 @@ abstract class BasePoller<T>(
8083
pollOnce(pollReason)
8184
}.onSuccess { numConsecutiveFailures = 0 }
8285
.onFailure {
83-
if (it is CancellationException) {
84-
logE("Polling cancelled", it)
86+
if (it is PollerCancelledException) {
87+
log("Polling cancelled", it)
8588
throw it
8689
}
8790
numConsecutiveFailures += 1
@@ -97,6 +100,12 @@ abstract class BasePoller<T>(
97100
}
98101
}
99102

103+
private class PollerCancelledException : CancellationException("Poller is cancelled")
104+
105+
fun cancel() {
106+
scope.cancel(PollerCancelledException())
107+
}
108+
100109
private fun waitForRoutinePoll(minStartAt: TimeMark?): Deferred<Unit> {
101110
return scope.async {
102111
combine(

app/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupPoller.kt

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,11 +58,9 @@ class OpenGroupPoller @AssistedInject constructor(
5858
appVisibilityManager: AppVisibilityManager,
5959
private val json: Json,
6060
@Assisted private val server: String,
61-
@Assisted private val scope: CoroutineScope,
6261
@Assisted private val pollerSemaphore: Semaphore,
6362
): BasePoller<Unit>(
6463
networkConnectivity = networkConnectivity,
65-
scope = scope,
6664
appVisibilityManager = appVisibilityManager,
6765
debugLabel = "OpenGroupPoller($server)"
6866
) {
@@ -336,7 +334,6 @@ class OpenGroupPoller @AssistedInject constructor(
336334
interface Factory {
337335
fun create(
338336
server: String,
339-
scope: CoroutineScope,
340337
pollerSemaphore: Semaphore
341338
): OpenGroupPoller
342339
}

app/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupPollerManager.kt

Lines changed: 8 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,8 @@
11
package org.session.libsession.messaging.sending_receiving.pollers
22

33
import kotlinx.coroutines.CoroutineScope
4-
import kotlinx.coroutines.Dispatchers
5-
import kotlinx.coroutines.SupervisorJob
64
import kotlinx.coroutines.async
75
import kotlinx.coroutines.awaitAll
8-
import kotlinx.coroutines.cancel
96
import kotlinx.coroutines.flow.SharingStarted
107
import kotlinx.coroutines.flow.StateFlow
118
import kotlinx.coroutines.flow.distinctUntilChanged
@@ -48,7 +45,7 @@ class OpenGroupPollerManager @Inject constructor(
4845
) : OnAppStartupComponent {
4946
private val pollerSemaphore = Semaphore(3)
5047

51-
val pollers: StateFlow<Map<String, PollerHandle>> =
48+
val pollers: StateFlow<Map<String, OpenGroupPoller>> =
5249
loginStateRepository.flowWithLoggedInState {
5350
configFactory
5451
.userConfigsChanged(onlyConfigTypes = EnumSet.of(UserConfigType.USER_GROUPS))
@@ -61,25 +58,21 @@ class OpenGroupPollerManager @Inject constructor(
6158
}
6259
}
6360
.distinctUntilChanged()
64-
.scan(emptyMap<String, PollerHandle>()) { acc, value ->
61+
.scan(emptyMap<String, OpenGroupPoller>()) { acc, value ->
6562
if (acc.keys == value) {
6663
acc // No change, return the same map
6764
} else {
6865
val newPollerStates = value.associateWith { baseUrl ->
6966
acc[baseUrl] ?: run {
70-
val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
7167
Log.d(TAG, "Creating new poller for $baseUrl")
72-
PollerHandle(
73-
poller = pollerFactory.create(baseUrl, scope, pollerSemaphore),
74-
pollerScope = scope
75-
)
68+
pollerFactory.create(baseUrl, pollerSemaphore)
7669
}
7770
}
7871

79-
for ((baseUrl, handle) in acc) {
72+
for ((baseUrl, poller) in acc) {
8073
if (baseUrl !in value) {
8174
Log.d(TAG, "Stopping poller for $baseUrl")
82-
handle.pollerScope.cancel()
75+
poller.cancel()
8376
}
8477
}
8578

@@ -90,23 +83,18 @@ class OpenGroupPollerManager @Inject constructor(
9083

9184
val isAllCaughtUp: Boolean
9285
get() = pollers.value.values.all {
93-
it.poller.pollState.value is BasePoller.PollState.Polled
86+
it.pollState.value is BasePoller.PollState.Polled
9487
}
9588

9689

9790
suspend fun pollAllOpenGroupsOnce() {
9891
Log.d(TAG, "Polling all open groups once")
9992
supervisorScope {
100-
pollers.value.map { (_, handle) ->
93+
pollers.value.map { (_, poller) ->
10194
async {
102-
handle.poller.manualPollOnce()
95+
poller.manualPollOnce()
10396
}
10497
}.awaitAll()
10598
}
10699
}
107-
108-
data class PollerHandle(
109-
val poller: OpenGroupPoller,
110-
val pollerScope: CoroutineScope
111-
)
112100
}

app/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/Poller.kt

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,6 @@
11
package org.session.libsession.messaging.sending_receiving.pollers
22

3-
import dagger.assisted.Assisted
4-
import dagger.assisted.AssistedFactory
5-
import dagger.assisted.AssistedInject
63
import kotlinx.coroutines.CancellationException
7-
import kotlinx.coroutines.CoroutineScope
84
import kotlinx.coroutines.async
95
import kotlinx.coroutines.launch
106
import kotlinx.coroutines.supervisorScope
@@ -40,10 +36,11 @@ import org.thoughtcrime.securesms.preferences.PreferenceKey
4036
import org.thoughtcrime.securesms.preferences.PreferenceStorage
4137
import org.thoughtcrime.securesms.util.AppVisibilityManager
4238
import org.thoughtcrime.securesms.util.NetworkConnectivity
39+
import javax.inject.Inject
4340
import kotlin.time.Duration.Companion.days
4441
import kotlin.time.Duration.Companion.seconds
4542

46-
class Poller @AssistedInject constructor(
43+
class Poller @Inject constructor(
4744
private val configFactory: ConfigFactoryProtocol,
4845
private val storage: StorageProtocol,
4946
private val lokiApiDatabase: LokiAPIDatabaseProtocol,
@@ -60,11 +57,9 @@ class Poller @AssistedInject constructor(
6057
private val swarmDirectory: SwarmDirectory,
6158
private val snodeApiExecutor: SnodeApiExecutor,
6259
appVisibilityManager: AppVisibilityManager,
63-
@Assisted scope: CoroutineScope
6460
) : BasePoller<Unit>(
6561
debugLabel = "MainPoller",
6662
networkConnectivity = networkConnectivity,
67-
scope = scope,
6863
appVisibilityManager = appVisibilityManager
6964
) {
7065
private val userPublicKey: String
@@ -76,11 +71,6 @@ class Poller @AssistedInject constructor(
7671
}
7772

7873

79-
@AssistedFactory
80-
interface Factory {
81-
fun create(scope: CoroutineScope): Poller
82-
}
83-
8474
override suspend fun doPollOnce(isFirstPollSinceAppStarted: Boolean) {
8575
// Migrate to multipart config when needed
8676
if (isFirstPollSinceAppStarted && !prefs[hasMigratedToMultiPartConfigKey]) {

app/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/PollerManager.kt

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
package org.session.libsession.messaging.sending_receiving.pollers
22

3-
import kotlinx.coroutines.coroutineScope
43
import kotlinx.coroutines.flow.MutableStateFlow
54
import kotlinx.coroutines.flow.filterNotNull
65
import kotlinx.coroutines.flow.first
76
import org.thoughtcrime.securesms.auth.AuthAwareComponent
87
import org.thoughtcrime.securesms.auth.LoggedInState
98
import javax.inject.Inject
9+
import javax.inject.Provider
1010
import javax.inject.Singleton
1111

1212
/**
@@ -17,15 +17,19 @@ import javax.inject.Singleton
1717
*/
1818
@Singleton
1919
class PollerManager @Inject constructor(
20-
private val provider: Poller.Factory,
20+
private val provider: Provider<Poller>,
2121
) : AuthAwareComponent {
2222
private val currentPoller = MutableStateFlow<Poller?>(null)
2323

24-
override suspend fun doWhileLoggedIn(loggedInState: LoggedInState): Unit = coroutineScope {
25-
val poller = provider.create(this)
24+
override suspend fun doWhileLoggedIn(loggedInState: LoggedInState) {
25+
val poller = provider.get()
2626
currentPoller.value = poller
2727
}
2828

29+
override fun onLoggedOut() {
30+
currentPoller.value?.cancel()
31+
}
32+
2933

3034
val isPolling: Boolean
3135
get() = currentPoller.value?.pollState?.value is BasePoller.PollState.Polling

app/src/main/java/org/thoughtcrime/securesms/groups/GroupPoller.kt

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package org.thoughtcrime.securesms.groups
33
import dagger.assisted.Assisted
44
import dagger.assisted.AssistedFactory
55
import dagger.assisted.AssistedInject
6-
import kotlinx.coroutines.CoroutineScope
76
import kotlinx.coroutines.Deferred
87
import kotlinx.coroutines.async
98
import kotlinx.coroutines.awaitAll
@@ -39,7 +38,6 @@ import kotlin.coroutines.cancellation.CancellationException
3938
import kotlin.time.Duration.Companion.days
4039

4140
class GroupPoller @AssistedInject constructor(
42-
@Assisted scope: CoroutineScope,
4341
@Assisted private val groupId: AccountId,
4442
@Assisted private val pollSemaphore: Semaphore,
4543
private val configFactoryProtocol: ConfigFactoryProtocol,
@@ -58,7 +56,6 @@ class GroupPoller @AssistedInject constructor(
5856
): BasePoller<GroupPoller.GroupPollResult>(
5957
networkConnectivity = networkConnectivity,
6058
appVisibilityManager = appVisibilityManager,
61-
scope = scope,
6259
debugLabel = "GroupPoller(${groupId.truncatedForDisplay()})"
6360
) {
6461
data class GroupPollResult(
@@ -337,6 +334,6 @@ class GroupPoller @AssistedInject constructor(
337334

338335
@AssistedFactory
339336
interface Factory {
340-
fun create(scope: CoroutineScope, groupId: AccountId, pollSemaphore: Semaphore): GroupPoller
337+
fun create(groupId: AccountId, pollSemaphore: Semaphore): GroupPoller
341338
}
342339
}

app/src/main/java/org/thoughtcrime/securesms/groups/GroupPollerManager.kt

Lines changed: 9 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,9 @@
11
package org.thoughtcrime.securesms.groups
22

33
import kotlinx.coroutines.CoroutineScope
4-
import kotlinx.coroutines.Dispatchers
54
import kotlinx.coroutines.ExperimentalCoroutinesApi
6-
import kotlinx.coroutines.SupervisorJob
75
import kotlinx.coroutines.async
86
import kotlinx.coroutines.awaitAll
9-
import kotlinx.coroutines.cancel
107
import kotlinx.coroutines.flow.Flow
118
import kotlinx.coroutines.flow.SharingStarted
129
import kotlinx.coroutines.flow.StateFlow
@@ -64,7 +61,7 @@ class GroupPollerManager @Inject constructor(
6461
private val groupPollerSemaphore = Semaphore(20)
6562

6663
@Suppress("OPT_IN_USAGE")
67-
private val groupPollers: StateFlow<Map<AccountId, GroupPollerHandle>> =
64+
private val groupPollers: StateFlow<Map<AccountId, GroupPoller>> =
6865
combine(
6966
connectivity.networkAvailable.debounce(200L),
7067
loginStateRepository.loggedInState,
@@ -95,12 +92,12 @@ class GroupPollerManager @Inject constructor(
9592
// This scan compares the previous active group pollers with the incoming set of groups
9693
// that should be polled now, to work out which pollers should be started or stopped,
9794
// and finally emits the new state
98-
.scan(emptyMap<AccountId, GroupPollerHandle>()) { previous, newActiveGroupIDs ->
95+
.scan(emptyMap<AccountId, GroupPoller>()) { previous, newActiveGroupIDs ->
9996
// Go through previous pollers and stop those that are not in the new set
10097
for ((groupId, poller) in previous) {
10198
if (groupId !in newActiveGroupIDs) {
10299
Log.d(TAG, "Stopping poller for $groupId")
103-
poller.scope.cancel()
100+
poller.cancel()
104101
}
105102
}
106103

@@ -111,14 +108,9 @@ class GroupPollerManager @Inject constructor(
111108

112109
if (poller == null) {
113110
Log.d(TAG, "Starting poller for $groupId")
114-
val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
115-
poller = GroupPollerHandle(
116-
poller = pollFactory.create(
117-
scope = scope,
118-
groupId = groupId,
119-
pollSemaphore = groupPollerSemaphore,
120-
),
121-
scope = scope
111+
poller = pollFactory.create(
112+
groupId = groupId,
113+
pollSemaphore = groupPollerSemaphore,
122114
)
123115
}
124116

@@ -133,7 +125,7 @@ class GroupPollerManager @Inject constructor(
133125
fun watchGroupPollingState(groupId: AccountId): Flow<BasePoller.PollState<GroupPoller.GroupPollResult>> {
134126
return groupPollers
135127
.flatMapLatest { pollers ->
136-
pollers[groupId]?.poller?.pollState ?: flowOf(BasePoller.PollState.Idle)
128+
pollers[groupId]?.pollState ?: flowOf(BasePoller.PollState.Idle)
137129
}
138130
.distinctUntilChanged()
139131
}
@@ -145,7 +137,7 @@ class GroupPollerManager @Inject constructor(
145137
// Merge all poller states into a single flow of (groupId, state) pairs
146138
merge(
147139
*pollers
148-
.map { (id, poller) -> poller.poller.pollState.map { state -> id to state } }
140+
.map { (id, poller) -> poller.pollState.map { state -> id to state } }
149141
.toTypedArray()
150142
)
151143
}
@@ -155,7 +147,7 @@ class GroupPollerManager @Inject constructor(
155147
supervisorScope {
156148
groupPollers.value.values.map {
157149
async {
158-
it.poller.manualPollOnce()
150+
it.manualPollOnce()
159151
}
160152
}.awaitAll()
161153
}
@@ -170,15 +162,9 @@ class GroupPollerManager @Inject constructor(
170162
suspend fun pollOnce(groupId: AccountId): GroupPoller.GroupPollResult {
171163
return groupPollers.mapNotNull { it[groupId] }
172164
.first()
173-
.poller
174165
.manualPollOnce()
175166
}
176167

177-
data class GroupPollerHandle(
178-
val poller: GroupPoller,
179-
val scope: CoroutineScope,
180-
)
181-
182168
companion object {
183169
private const val TAG = "GroupPollerHandler"
184170
}

0 commit comments

Comments
 (0)