Skip to content

Commit 8ed4311

Browse files
committed
The analytics-kotlin library runs blocking OkHttp calls (client.newCall(request).execute()) inside coroutines via withContext and launch, but without runInterruptible.
This means Job.cancel() cannot interrupt in-flight HTTP calls, the coroutine only cancels at the next suspension point, not during the blocking I/O. Wrapping in runInterruptible causes coroutine cancellation to trigger Thread.interrupt(), which OkHttp handles by aborting the connection immediately. Signed-off-by: Olivier Lamy <olamy@apache.org>
1 parent ac040af commit 8ed4311

3 files changed

Lines changed: 25 additions & 17 deletions

File tree

core/src/main/java/com/segment/analytics/kotlin/core/Settings.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import com.segment.analytics.kotlin.core.platform.plugins.logger.log
66
import com.segment.analytics.kotlin.core.utilities.LenientJson
77
import com.segment.analytics.kotlin.core.utilities.safeJsonObject
88
import kotlinx.coroutines.launch
9+
import kotlinx.coroutines.runInterruptible
910
import kotlinx.coroutines.withContext
1011
import kotlinx.serialization.DeserializationStrategy
1112
import kotlinx.serialization.Serializable
@@ -89,7 +90,7 @@ suspend fun Analytics.checkSettings() {
8990

9091
val settingsObj = withContext(networkIODispatcher) {
9192
log("Fetching settings on ${Thread.currentThread().name}")
92-
return@withContext fetchSettings(writeKey, cdnHost)
93+
return@withContext runInterruptible { fetchSettings(writeKey, cdnHost) }
9394
}
9495

9596
settingsObj?.let {

core/src/main/java/com/segment/analytics/kotlin/core/Telemetry.kt

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -263,15 +263,19 @@ object Telemetry: Subscriber {
263263
// We're using this to leave off the 'log' parameter if unset.
264264
val payload = Json.encodeToString(mapOf("series" to sendQueue))
265265

266-
val connection = httpClient.upload(host)
267-
connection.outputStream?.use { outputStream ->
268-
// Write the JSON string to the outputStream.
269-
outputStream.write(payload.toByteArray(Charsets.UTF_8))
270-
outputStream.flush() // Ensure all data is written
266+
runBlocking {
267+
runInterruptible {
268+
val connection = httpClient.upload(host)
269+
connection.outputStream?.use { outputStream ->
270+
// Write the JSON string to the outputStream.
271+
outputStream.write(payload.toByteArray(Charsets.UTF_8))
272+
outputStream.flush() // Ensure all data is written
273+
}
274+
connection.inputStream?.close()
275+
connection.outputStream?.close()
276+
connection.close()
277+
}
271278
}
272-
connection.inputStream?.close()
273-
connection.outputStream?.close()
274-
connection.close()
275279
} catch (e: HTTPException) {
276280
errorHandler?.invoke(e)
277281
if (e.responseCode == 429) {

core/src/main/java/com/segment/analytics/kotlin/core/platform/EventPipeline.kt

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import kotlinx.coroutines.channels.Channel
1010
import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
1111
import kotlinx.coroutines.channels.consumeEach
1212
import kotlinx.coroutines.launch
13+
import kotlinx.coroutines.runInterruptible
1314
import kotlinx.coroutines.withContext
1415
import kotlinx.serialization.encodeToString
1516
import kotlinx.serialization.json.Json
@@ -135,14 +136,16 @@ open class EventPipeline(
135136
var shouldCleanup = true
136137
storage.readAsStream(url)?.use { data ->
137138
try {
138-
val connection = httpClient.upload(apiHost)
139-
connection.outputStream?.let {
140-
// Write the payloads into the OutputStream
141-
data.copyTo(connection.outputStream)
142-
connection.outputStream.close()
143-
144-
// Upload the payloads.
145-
connection.close()
139+
runInterruptible {
140+
val connection = httpClient.upload(apiHost)
141+
connection.outputStream?.let {
142+
// Write the payloads into the OutputStream
143+
data.copyTo(connection.outputStream)
144+
connection.outputStream.close()
145+
146+
// Upload the payloads.
147+
connection.close()
148+
}
146149
}
147150
// Cleanup uploaded payloads
148151
analytics.log("$logTag uploaded $url")

0 commit comments

Comments
 (0)