Skip to content

Commit 067bd41

Browse files
amrfarid140solcott
andauthored
Avoid deadlock in RealMutableStore (#658)
* Add test case Signed-off-by: Amr Yousef <amr.farid140@gmail.com> * Always Release storeLock Signed-off-by: Amr Yousef <amr.farid140@gmail.com> * Update kermit to 2.0.4 (#655) Fixes #653 and #654 Signed-off-by: Scott Olcott <scottolcott@gmail.com> Signed-off-by: Amr Yousef <amr.farid140@gmail.com> * Revert "Update kermit to 2.0.4 (#655)" This reverts commit 76f34d4. Signed-off-by: Amr Yousef <amr.farid140@gmail.com> --------- Signed-off-by: Amr Yousef <amr.farid140@gmail.com> Signed-off-by: Scott Olcott <scottolcott@gmail.com> Co-authored-by: Scott Olcott <scottolcott@gmail.com>
1 parent 22bb733 commit 067bd41

2 files changed

Lines changed: 99 additions & 4 deletions

File tree

store/src/commonMain/kotlin/org/mobilenativefoundation/store/store5/impl/RealMutableStore.kt

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -203,10 +203,13 @@ internal class RealMutableStore<Key : Any, Network : Any, Output : Any, Local :
203203
block: suspend ThreadSafety.() -> Output,
204204
): Output {
205205
storeLock.lock()
206-
val threadSafety = requireNotNull(keyToThreadSafety[key])
207-
val output = threadSafety.block()
208-
storeLock.unlock()
209-
return output
206+
try {
207+
val threadSafety = requireNotNull(keyToThreadSafety[key])
208+
val output = threadSafety.block()
209+
return output
210+
} finally {
211+
storeLock.unlock()
212+
}
210213
}
211214

212215
private suspend fun conflictsMightExist(key: Key): Boolean {

store/src/commonTest/kotlin/org/mobilenativefoundation/store/store5/StoreWithInMemoryCacheTests.kt

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,21 @@
11
package org.mobilenativefoundation.store.store5
22

3+
import kotlinx.coroutines.CoroutineScope
4+
import kotlinx.coroutines.Dispatchers
35
import kotlinx.coroutines.ExperimentalCoroutinesApi
46
import kotlinx.coroutines.FlowPreview
7+
import kotlinx.coroutines.Job
8+
import kotlinx.coroutines.async
9+
import kotlinx.coroutines.awaitAll
10+
import kotlinx.coroutines.cancel
11+
import kotlinx.coroutines.flow.*
512
import kotlinx.coroutines.test.TestScope
613
import kotlinx.coroutines.test.runTest
714
import org.mobilenativefoundation.store.store5.impl.extensions.get
815
import kotlin.test.Test
916
import kotlin.test.assertEquals
17+
import kotlin.test.assertIs
18+
import kotlin.test.assertNotNull
1019
import kotlin.time.Duration.Companion.hours
1120

1221
@FlowPreview
@@ -39,4 +48,87 @@ class StoreWithInMemoryCacheTests {
3948
assertEquals("result", c)
4049
assertEquals("result", d)
4150
}
51+
52+
@Test
53+
fun storeDeadlock() =
54+
testScope.runTest {
55+
repeat(1000) {
56+
val store =
57+
StoreBuilder
58+
.from(
59+
fetcher = Fetcher.of { key: Int -> "fetcher_${key}" },
60+
sourceOfTruth = SourceOfTruth.Companion.of(
61+
reader = { key ->
62+
flow<String> {
63+
emit("source_of_truth_${key}")
64+
}
65+
},
66+
writer = { key: Int, local: String ->
67+
68+
}
69+
)
70+
)
71+
.disableCache()
72+
.toMutableStoreBuilder(
73+
converter = object : Converter<String, String, String> {
74+
override fun fromNetworkToLocal(network: String): String {
75+
return network
76+
}
77+
78+
override fun fromOutputToLocal(output: String): String {
79+
return output
80+
}
81+
},
82+
)
83+
.build(
84+
updater = object : Updater<Int, String, Unit> {
85+
var callCount = -1
86+
override suspend fun post(key: Int, value: String): UpdaterResult {
87+
callCount += 1
88+
if (callCount % 2 == 0) {
89+
throw IllegalArgumentException(key.toString() + "value:$value")
90+
} else {
91+
return UpdaterResult.Success.Untyped("")
92+
}
93+
}
94+
95+
override val onCompletion: OnUpdaterCompletion<Unit>?
96+
get() = null
97+
98+
}
99+
)
100+
101+
val jobs = mutableListOf<Job>()
102+
jobs.add(
103+
store.stream<Nothing>(StoreReadRequest.cached(1, refresh = true))
104+
.mapNotNull { it.dataOrNull() }
105+
.launchIn(CoroutineScope(Dispatchers.Default))
106+
)
107+
val job1 = store.stream<Nothing>(StoreReadRequest.cached(0, refresh = true))
108+
.mapNotNull { it.dataOrNull() }
109+
.launchIn(CoroutineScope(Dispatchers.Default))
110+
jobs.add(
111+
store.stream<Nothing>(StoreReadRequest.cached(2, refresh = true))
112+
.mapNotNull { it.dataOrNull() }
113+
.launchIn(CoroutineScope(Dispatchers.Default)))
114+
jobs.add(
115+
store.stream<Nothing>(StoreReadRequest.cached(3, refresh = true))
116+
.mapNotNull { it.dataOrNull() }
117+
.launchIn(CoroutineScope(Dispatchers.Default)))
118+
job1.cancel()
119+
assertEquals(
120+
expected = "source_of_truth_0",
121+
actual = store.stream<Nothing>(StoreReadRequest.cached(0, refresh = true))
122+
.mapNotNull { it.dataOrNull() }.first()
123+
)
124+
jobs.forEach {
125+
it.cancel()
126+
assertEquals(
127+
expected = "source_of_truth_0",
128+
actual = store.stream<Nothing>(StoreReadRequest.cached(0, refresh = true))
129+
.mapNotNull { it.dataOrNull() }.first()
130+
)
131+
}
132+
}
133+
}
42134
}

0 commit comments

Comments
 (0)