Skip to content

Commit 66c74f1

Browse files
DoomsCoderdavid-allison
authored andcommitted
feat: decouple consumer/producer in ChangeManager
Primarily designed for the API, where `suspend` methods are not feasible in ContentProviders. Add a synchronous 'publish' method, and use a Channel to decouple the producer and consumer. Fixes 21102 Fixes part of 20907 Assisted-by: Claude Opus 4.7 - self-review Co-authored-by: David Allison <62114487+david-allison@users.noreply.github.com>
1 parent d4232c8 commit 66c74f1

4 files changed

Lines changed: 168 additions & 14 deletions

File tree

AnkiDroid/src/main/java/com/ichi2/anki/observability/ChangeManager.kt

Lines changed: 46 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ import anki.collection.OpChangesWithId
3939
import anki.collection.opChanges
4040
import anki.import_export.ImportResponse
4141
import com.ichi2.anki.common.crashreporting.CrashReportService
42+
import com.ichi2.anki.observability.ChangeManager.publish
43+
import com.ichi2.anki.observability.ChangeManager.toOpChanges
4244
import com.ichi2.anki.utils.ext.ifNotZero
4345
import org.jetbrains.annotations.Contract
4446
import timber.log.Timber
@@ -69,6 +71,30 @@ object ChangeManager {
6971

7072
val subscriberCount get() = subscribers.size
7173

74+
/**
75+
* Delivers changes to [subscribers][ChangeManager.subscribers] off the caller's thread.
76+
*
77+
* @see publish
78+
*/
79+
private val publisher = OpChangesPublisher { changes, handler -> notifySubscribers(changes, handler) }
80+
81+
/**
82+
* Notifies `subscribers` of `changes` without blocking the caller.
83+
*
84+
* Unlike `notifySubscribers`, the caller **does not** wait for subscribers to complete.
85+
*
86+
* This may be called from any thread.
87+
*
88+
* @param changes a raw/wrapped backend [OpChanges] response (see [ChangeManager.toOpChanges])
89+
* @param handler the initiator of the change, or `null`. See [Subscriber.opExecuted]
90+
*/
91+
fun <T : Any> publish(
92+
changes: T,
93+
handler: Any? = null,
94+
) {
95+
publisher.publish(changes.toOpChanges(), handler)
96+
}
97+
7298
/**
7399
* Subscribes to changes.
74100
*
@@ -143,7 +169,9 @@ object ChangeManager {
143169
}
144170

145171
@VisibleForTesting(otherwise = VisibleForTesting.NONE)
172+
@Synchronized
146173
fun resetForTesting() {
174+
publisher.reset()
147175
subscribers.size.ifNotZero { size -> Timber.d("clearing %d subscribers", size) }
148176
subscribers.clear()
149177
}
@@ -152,19 +180,26 @@ object ChangeManager {
152180
changes: T,
153181
initiator: Any?,
154182
) {
155-
val opChanges =
156-
when (changes) {
157-
is OpChanges -> changes
158-
is OpChangesWithCount -> changes.changes
159-
is OpChangesWithId -> changes.changes
160-
is OpChangesAfterUndo -> changes.changes
161-
is OpChangesOnly -> changes.changes
162-
is ImportResponse -> changes.changes
163-
else -> TODO("unhandled change type of class '${changes::class}'")
164-
}
165-
notifySubscribers(opChanges, initiator)
183+
notifySubscribers(changes.toOpChanges(), initiator)
166184
}
167185

186+
/**
187+
* Extracts an [OpChanges] from a backend response.
188+
*
189+
* @throws NotImplementedError if the receiver is not a backend OpChanges type.
190+
*/
191+
// TODO: See if we can add a marker interface
192+
private fun <T : Any> T.toOpChanges(): OpChanges =
193+
when (this) {
194+
is OpChanges -> this
195+
is OpChangesWithCount -> changes
196+
is OpChangesWithId -> changes
197+
is OpChangesAfterUndo -> changes
198+
is OpChangesOnly -> changes
199+
is ImportResponse -> changes
200+
else -> TODO("unhandled change type of class '${this::class}'")
201+
}
202+
168203
fun notifySubscribersAllValuesChanged(handler: Any? = null) {
169204
notifySubscribers(ALL, handler)
170205
}
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
// SPDX-License-Identifier: GPL-3.0-or-later
2+
3+
package com.ichi2.anki.observability
4+
5+
import anki.collection.OpChanges
6+
import kotlinx.coroutines.CoroutineScope
7+
import kotlinx.coroutines.Dispatchers
8+
import kotlinx.coroutines.SupervisorJob
9+
import kotlinx.coroutines.cancel
10+
import kotlinx.coroutines.channels.Channel
11+
import kotlinx.coroutines.launch
12+
13+
/**
14+
* Provides non-blocking delivery of [OpChanges] to the [ChangeManager] subscriber registry.
15+
*
16+
* Use this when `suspend` is not feasible (e.g. `ContentProvider`).
17+
*
18+
* [publish] queues a change onto [changeChannel] and returns immediately.
19+
*
20+
* @param deliver forwards a change to subscribers; invoked on [Dispatchers.Main].
21+
*/
22+
internal class OpChangesPublisher(
23+
private val deliver: (changes: OpChanges, handler: Any?) -> Unit,
24+
) {
25+
private data class ChangeEvent(
26+
val changes: OpChanges,
27+
val handler: Any?,
28+
)
29+
30+
/** CoroutineScope which handles the coroutine consuming [changeChannel]. */
31+
private var scope: CoroutineScope? = null
32+
private val changeChannel =
33+
Channel<ChangeEvent>(
34+
capacity = Channel.UNLIMITED,
35+
)
36+
37+
/** Ensures that [changeChannel] is being processed. */
38+
@Synchronized
39+
private fun ensureCollector() {
40+
if (scope != null) return
41+
scope =
42+
CoroutineScope(Dispatchers.Main + SupervisorJob()).also { s ->
43+
s.launch {
44+
for (event in changeChannel) {
45+
deliver(event.changes, event.handler)
46+
}
47+
}
48+
}
49+
}
50+
51+
/**
52+
* Queues [changes] for non-blocking delivery to subscribers.
53+
*
54+
* May be called on any thread.
55+
*
56+
* @see ChangeManager.publish
57+
*/
58+
fun publish(
59+
changes: OpChanges,
60+
handler: Any?,
61+
) {
62+
ensureCollector()
63+
changeChannel.trySend(ChangeEvent(changes, handler))
64+
}
65+
66+
/** Cancels the collector and discards any queued, undelivered changes. */
67+
@Synchronized
68+
fun reset() {
69+
scope?.cancel()
70+
scope = null
71+
changeChannel.drain()
72+
}
73+
}
74+
75+
private fun <T> Channel<T>.drain() {
76+
while (this.tryReceive().isSuccess) { /* intentionally empty */ }
77+
}

AnkiDroid/src/test/java/com/ichi2/anki/observability/ChangeManagerTest.kt

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,21 +15,29 @@
1515
*/
1616
package com.ichi2.anki.observability
1717

18+
import androidx.test.ext.junit.runners.AndroidJUnit4
1819
import anki.collection.OpChanges
20+
import com.ichi2.testutils.EmptyApplication
1921
import com.ichi2.testutils.JvmTest
22+
import com.ichi2.testutils.subscriberChangeCounter
23+
import kotlinx.coroutines.Dispatchers
24+
import kotlinx.coroutines.test.StandardTestDispatcher
25+
import kotlinx.coroutines.test.advanceUntilIdle
26+
import kotlinx.coroutines.test.setMain
2027
import org.hamcrest.MatcherAssert.assertThat
2128
import org.hamcrest.Matchers.equalTo
2229
import org.hamcrest.Matchers.greaterThan
2330
import org.junit.Test
2431
import org.junit.runner.RunWith
2532
import org.junit.runners.Parameterized
26-
import org.robolectric.RobolectricTestRunner
33+
import org.robolectric.annotation.Config
2734
import kotlin.reflect.KProperty1
2835
import kotlin.reflect.full.memberProperties
2936
import kotlin.reflect.javaType
3037
import kotlin.reflect.jvm.isAccessible
3138

32-
@RunWith(RobolectricTestRunner::class)
39+
// this cannot yet use `EmptyApplication` - `CrashReportService` dependency
40+
@RunWith(AndroidJUnit4::class)
3341
class ChangeManagerExceptionHandlingTest : JvmTest() {
3442
@Test
3543
fun `all subscribers notified even if one throws exception`() {
@@ -100,3 +108,26 @@ class ChangeManagerTest {
100108
}
101109
}
102110
}
111+
112+
@RunWith(AndroidJUnit4::class)
113+
@Config(application = EmptyApplication::class)
114+
class ChangeManagerPublishTest : JvmTest() {
115+
@Test
116+
fun `publish notifies multiple subscribers asynchronously`() =
117+
runTest {
118+
// queue coroutine execution on Dispatchers.Main, to test 'not notified synchronously'
119+
Dispatchers.setMain(StandardTestDispatcher(testScheduler))
120+
121+
val counter1 = subscriberChangeCounter()
122+
val counter2 = subscriberChangeCounter()
123+
124+
ChangeManager.publish(ChangeManager.ALL)
125+
126+
assertThat("not notified synchronously by publish", counter1.hasChanges, equalTo(false))
127+
128+
advanceUntilIdle()
129+
130+
assertThat("First subscriber should be notified", counter1.hasChanges, equalTo(true))
131+
assertThat("Second subscriber should be notified", counter2.hasChanges, equalTo(true))
132+
}
133+
}

AnkiDroid/src/test/java/com/ichi2/testutils/TestChangeSubscriber.kt

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ suspend fun ensureOpWithHandler(
6060
}
6161

6262
// used to ensure a strong reference to the subscription is held
63-
private class ChangeCounter : ChangeManager.Subscriber {
63+
internal class ChangeCounter : ChangeManager.Subscriber {
6464
private var changes = 0
6565
val changeCount get() = changes
6666
val hasChanges get() = changes > 0
@@ -85,3 +85,14 @@ private class ExtractOpHandler : ChangeManager.Subscriber {
8585
this.handler = handler
8686
}
8787
}
88+
89+
/**
90+
* Produces a [ChangeCounter] which is subscribed to [ChangeManager].
91+
*
92+
* Query the result via [.changeCount][ChangeCounter.changeCount] or [.hasChanges][ChangeCounter.hasChanges]
93+
*/
94+
internal fun subscriberChangeCounter(): ChangeCounter {
95+
val counter = ChangeCounter()
96+
ChangeManager.subscribe(counter)
97+
return counter
98+
}

0 commit comments

Comments
 (0)