Skip to content
This repository was archived by the owner on Feb 9, 2024. It is now read-only.

Commit c2edf63

Browse files
author
Ben Asher
authored
Support IO-bound work with its own thread (#69)
1 parent 7f81011 commit c2edf63

6 files changed

Lines changed: 125 additions & 33 deletions

File tree

build.gradle.kts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ val coroutinesVersion = "1.3.9"
44
val atomicfuVersion = "0.14.4"
55

66
plugins {
7-
kotlin("multiplatform") version "1.4.0"
7+
kotlin("multiplatform") version "1.4.10"
88
id("org.jetbrains.dokka") version "0.10.0"
99
id("maven-publish")
1010
id("signing")

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
VERSION=0.6.1
1+
VERSION=0.6.2

src/nativeMain/kotlin/com/autodesk/coroutineworker/BackgroundCoroutineWorkQueueExecutor.kt

Lines changed: 52 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,11 @@ internal class BackgroundCoroutineWorkQueueExecutor<WorkItem : CoroutineWorkItem
4545
*/
4646
private val queueThread = Worker.start()
4747

48+
/**
49+
* Special worker for IO work
50+
*/
51+
private val ioWorker = Worker.start(name = ioWorkerName)
52+
4853
/**
4954
* The wrapped (allow freezing and mutable access on single thread) queue of WorkItems
5055
*/
@@ -84,32 +89,52 @@ internal class BackgroundCoroutineWorkQueueExecutor<WorkItem : CoroutineWorkItem
8489
}.result
8590

8691
/**
87-
* Queues an item to be executed
92+
* Queues an item to be executed in the general worker pool
8893
*/
89-
fun enqueueWork(item: WorkItem) = queueThread.executeAfter(
90-
operation = {
91-
queue.enqueue(item)
92-
// start a worker if we have more workers to start
93-
val activeWorkerCount = _numActiveWorkers.value
94-
if (activeWorkerCount < numWorkers) {
95-
pool.performWork {
94+
fun enqueueWork(item: WorkItem, isIoWork: Boolean) {
95+
if (isIoWork) {
96+
ioWorker.executeAfter(
97+
operation = {
9698
runBlocking {
97-
// error if we accidentally freeze coroutine internals
9899
this.ensureNeverFrozen()
99-
processWorkItems()
100+
performWorkHandlingExceptions(item, this)
100101
}
101-
}
102-
_numActiveWorkers.increment()
103-
}
104-
}.freeze()
105-
)
102+
}.freeze()
103+
)
104+
} else {
105+
queueThread.executeAfter(
106+
operation = {
107+
queue.enqueue(item)
108+
// start a worker if we have more workers to start
109+
val activeWorkerCount = _numActiveWorkers.value
110+
if (activeWorkerCount < numWorkers) {
111+
pool.performWork {
112+
runBlocking {
113+
// error if we accidentally freeze coroutine internals
114+
this.ensureNeverFrozen()
115+
processWorkItems(this)
116+
}
117+
}
118+
_numActiveWorkers.increment()
119+
}
120+
}.freeze()
121+
)
122+
}
123+
}
106124

107-
suspend fun CoroutineScope.processWorkItems() {
125+
private suspend fun processWorkItems(scope: CoroutineScope) {
108126
val workItem = dequeueWork() ?: return
109127

128+
performWorkHandlingExceptions(workItem, scope)
129+
130+
// execute a coroutine to attempt to process the next work item, if possible
131+
scope.launch { processWorkItems(scope) }
132+
}
133+
134+
private suspend fun performWorkHandlingExceptions(workItem: WorkItem, scope: CoroutineScope) {
110135
// Execute the work in a job that can be cancelled
111136
try {
112-
async {
137+
scope.async {
113138
workItem.work(this)
114139
}.await()
115140
} catch (_: CancellationException) {
@@ -122,9 +147,6 @@ internal class BackgroundCoroutineWorkQueueExecutor<WorkItem : CoroutineWorkItem
122147
throw e
123148
}
124149
}
125-
126-
// execute a coroutine to attempt to process the next work item, if possible
127-
launch { processWorkItems() }
128150
}
129151

130152
init { freeze() }
@@ -136,6 +158,16 @@ internal class BackgroundCoroutineWorkQueueExecutor<WorkItem : CoroutineWorkItem
136158
internal fun setUnhandledExceptionHook(handler: (Throwable) -> Unit) {
137159
UNHANDLED_EXCEPTION_HOOK.value = handler.freeze()
138160
}
161+
162+
/**
163+
* The name of the IO worker
164+
*/
165+
private const val ioWorkerName = "com.autodesk.coroutineworker.ioworker"
166+
167+
/**
168+
* Returns whether we're already running on the IO thread
169+
*/
170+
internal fun shouldPerformIoWorkInline() = Worker.current.name == ioWorkerName
139171
}
140172
}
141173

src/nativeMain/kotlin/com/autodesk/coroutineworker/CoroutineUtils.kt

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
package com.autodesk.coroutineworker
22

33
import kotlinx.coroutines.CancellationException
4+
import kotlinx.coroutines.CoroutineDispatcher
45
import kotlinx.coroutines.Delay
6+
import kotlinx.coroutines.Runnable
57
import kotlinx.coroutines.delay
68
import kotlin.coroutines.ContinuationInterceptor
9+
import kotlin.coroutines.CoroutineContext
710
import kotlin.coroutines.coroutineContext
811
import kotlin.native.concurrent.AtomicInt
912
import kotlin.native.concurrent.AtomicReference
@@ -64,3 +67,14 @@ public actual suspend fun <T> threadSafeSuspendCallback(startAsync: (CompletionL
6467
throw e
6568
}
6669
}
70+
71+
/**
72+
* Symbolic dispatcher used to trigger IO-bound-thread-like behavior on native
73+
*/
74+
public object IODispatcher : CoroutineDispatcher() {
75+
override fun dispatch(context: CoroutineContext, block: Runnable) {
76+
throw UnsupportedOperationException(
77+
"This dispatcher is symbolic and should not be used to run anything."
78+
)
79+
}
80+
}

src/nativeMain/kotlin/com/autodesk/coroutineworker/CoroutineWorker.kt

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -50,21 +50,16 @@ public actual class CoroutineWorker internal actual constructor() {
5050
private val executor = BackgroundCoroutineWorkQueueExecutor<WorkItem>(4)
5151

5252
public actual fun execute(block: suspend CoroutineScope.() -> Unit): CoroutineWorker {
53-
return CoroutineWorker().also {
54-
val state = it.state
55-
executor.enqueueWork(
56-
WorkItem(
57-
{ state.cancelled },
58-
{ state.completed = true },
59-
block
60-
)
61-
)
62-
}
53+
return executeInternal(false, block)
6354
}
6455

6556
public actual suspend fun <T> withContext(jvmContext: CoroutineContext, block: suspend CoroutineScope.() -> T): T {
57+
val isIoWork = jvmContext == IODispatcher
58+
if (isIoWork && BackgroundCoroutineWorkQueueExecutor.shouldPerformIoWorkInline()) {
59+
return coroutineScope(block)
60+
}
6661
return threadSafeSuspendCallback<T> { completion ->
67-
val job = execute {
62+
val job = executeInternal(isIoWork) {
6863
val result = runCatching {
6964
block()
7065
}
@@ -74,6 +69,20 @@ public actual class CoroutineWorker internal actual constructor() {
7469
}
7570
}
7671

72+
private fun executeInternal(isIoWork: Boolean, block: suspend CoroutineScope.() -> Unit): CoroutineWorker {
73+
return CoroutineWorker().also {
74+
val state = it.state
75+
executor.enqueueWork(
76+
WorkItem(
77+
{ state.cancelled },
78+
{ state.completed = true },
79+
block
80+
),
81+
isIoWork
82+
)
83+
}
84+
}
85+
7786
/** CoroutineWorker's CoroutineWorkItem class that listens for cancellation */
7887
private class WorkItem(
7988
val cancelled: () -> Boolean,

src/nativeTest/kotlin/com/autodesk/coroutineworker/CoroutineUtilsTest.kt

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
package com.autodesk.coroutineworker
22

3+
import kotlinx.atomicfu.atomic
4+
import kotlinx.coroutines.Dispatchers
35
import kotlin.native.concurrent.isFrozen
46
import kotlin.test.Test
7+
import kotlin.test.assertFalse
58
import kotlin.test.assertTrue
69

710
class CoroutineUtilsTest {
@@ -19,4 +22,38 @@ class CoroutineUtilsTest {
1922
assertTrue(called)
2023
}
2124
}
25+
26+
@Test
27+
fun `withContext with special dispatcher uses special thread`() {
28+
testRunBlocking {
29+
CoroutineWorker.withContext(IODispatcher) {
30+
// should return true because it's on the special thread
31+
assertTrue(BackgroundCoroutineWorkQueueExecutor.shouldPerformIoWorkInline())
32+
}
33+
}
34+
}
35+
36+
@Test
37+
fun `withContext without special dispatcher does not use special thread`() {
38+
testRunBlocking {
39+
CoroutineWorker.withContext(Dispatchers.Default) {
40+
// should return false because it's not on the special thread
41+
assertFalse(BackgroundCoroutineWorkQueueExecutor.shouldPerformIoWorkInline())
42+
}
43+
}
44+
}
45+
46+
@Test
47+
fun `multiple withContext on IODispatcher works without timeout`() {
48+
testRunBlocking {
49+
val ran = atomic(false)
50+
CoroutineWorker.withContext(IODispatcher) {
51+
// should return false because it's not on the special thread
52+
CoroutineWorker.withContext(IODispatcher) {
53+
ran.value = true
54+
}
55+
}
56+
assertTrue(ran.value)
57+
}
58+
}
2259
}

0 commit comments

Comments
 (0)