Skip to content

Commit 7ed7bf0

Browse files
authored
KTOR-8906 Bypass Dispatchers.IO for OutputStreamContent on servlet engines (#5594)
* KTOR-8906 Bypass Dispatchers.IO for OutputStreamContent on servlet engines Restores Ktor 2.x behavior where serialization on servlet engines ran in-place on the request thread without consuming Dispatchers.IO threads. In 3.x, PollersKt was removed from ktor-io, causing the withBlocking bridge to always redispatch to Dispatchers.IO. Combined with the ByteWriteChannel drain coroutine also needing IO, this created a circular thread-pool dependency that permanently deadlocked under concurrent load. Servlet-based engines (Jetty, Tomcat) now write OutputStreamContent directly to the native OutputStream via a new @internalapi overload, bypassing both the IO dispatch and the runBlocking bridge entirely. Also removes the dead isParkingAllowed reflection that has been a no-op since the PollersKt class was deleted in the ktor-io 3.x rewrite, along with the redundant ServerJacksonBlockingTest/ServerGsonBlockingTest tests that existed solely to exercise that mechanism. * Address coderabbit comments * Address bjhham comments * Fix StackOverflowError in KtorTargets.filterTargets The filter lambda captured itself by reference, causing infinite recursion when resolved. Capture previous value before reassignment.
1 parent 4e4dfbb commit 7ed7bf0

15 files changed

Lines changed: 214 additions & 329 deletions

File tree

build-logic/src/main/kotlin/ktorbuild/targets/KtorTargets.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,8 @@ abstract class KtorTargets @Inject internal constructor(
122122
/** Applies a filter to the target list. The given [predicate] is combined with the existing filter. */
123123
internal fun filterTargets(predicate: (String) -> Boolean) {
124124
check(!filterFrozen) { "Can't change filter after targets have been finalized." }
125-
filter = { filter(it) && predicate(it) }
125+
val previousFilter = filter
126+
filter = { previousFilter(it) && predicate(it) }
126127
}
127128

128129
/**

ktor-http/api/ktor-http.api

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1512,6 +1512,7 @@ public final class io/ktor/http/content/OutputStreamContent : io/ktor/http/conte
15121512
public fun getContentType ()Lio/ktor/http/ContentType;
15131513
public fun getStatus ()Lio/ktor/http/HttpStatusCode;
15141514
public fun writeTo (Lio/ktor/utils/io/ByteWriteChannel;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
1515+
public final fun writeTo (Ljava/io/OutputStream;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
15151516
}
15161517

15171518
public abstract class io/ktor/http/content/PartData {

ktor-http/jvm/src/io/ktor/http/content/BlockingBridge.kt

Lines changed: 7 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,46 +1,20 @@
11
/*
2-
* Copyright 2014-2021 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
3-
*/
2+
* Copyright 2014-2026 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
3+
*/
44

55
package io.ktor.http.content
66

77
import kotlinx.coroutines.Dispatchers
88
import kotlinx.coroutines.withContext
9-
import java.lang.reflect.Method
10-
11-
private val isParkingAllowedFunction: Method? by lazy {
12-
try {
13-
Class.forName("io.ktor.utils.io.jvm.javaio.PollersKt")
14-
.getMethod("isParkingAllowed")
15-
} catch (cause: Throwable) {
16-
null
17-
}
18-
}
199

2010
/**
21-
* Execute [block] function either directly or redispatch on [Dispatchers.IO].
22-
* Redispatch is usually required when running on a thread that does not allow blocking
23-
* because it handles an event loop and/or epoll/kqueue/select operations.
24-
* Note that coroutines event loop thread usually can handle some blocking operations
25-
* so no need to redispatch.
11+
* Redispatches [block] onto [Dispatchers.IO] for blocking I/O.
12+
*
13+
* This is used by non-blocking engines (CIO, Netty) where the calling thread is an event-loop
14+
* thread that must not block. Servlet-based engines bypass this entirely via
15+
* [OutputStreamContent.writeTo] with a native [java.io.OutputStream].
2616
*/
2717
internal suspend fun withBlocking(block: suspend () -> Unit) {
28-
if (safeToRunInPlace()) {
29-
return block()
30-
}
31-
32-
return withBlockingAndRedispatch(block)
33-
}
34-
35-
private fun safeToRunInPlace(): Boolean {
36-
return try {
37-
isParkingAllowedFunction?.invoke(null) == true
38-
} catch (cause: Throwable) {
39-
false
40-
}
41-
}
42-
43-
private suspend fun withBlockingAndRedispatch(block: suspend () -> Unit) {
4418
withContext(Dispatchers.IO) {
4519
block()
4620
}

ktor-http/jvm/src/io/ktor/http/content/OutputStreamContent.kt

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,4 +31,19 @@ public class OutputStreamContent(
3131
}
3232
}
3333
}
34+
35+
/**
36+
* Writes the content body directly to the given [stream], bypassing the [ByteWriteChannel] intermediary.
37+
*
38+
* Engine implementations that have access to a native blocking [OutputStream] (e.g. servlet engines
39+
* backed by a thread-per-request model) should call this method instead of [writeTo(ByteWriteChannel)]
40+
* to avoid dispatching to [kotlinx.coroutines.Dispatchers.IO] and the `runBlocking` bridge inside
41+
* [ByteWriteChannel.toOutputStream].
42+
*
43+
* The caller is responsible for closing the [stream] after this method returns.
44+
*/
45+
@InternalAPI
46+
public suspend fun writeTo(stream: OutputStream) {
47+
stream.body()
48+
}
3449
}

ktor-server/ktor-server-core/jvm/test/io/ktor/tests/hosts/ReceiveBlockingPrimitiveTest.kt

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ import io.ktor.server.request.*
1212
import io.ktor.utils.io.*
1313
import kotlinx.coroutines.*
1414
import java.io.*
15-
import java.lang.reflect.*
1615
import kotlin.concurrent.*
1716
import kotlin.coroutines.*
1817
import kotlin.test.*
@@ -135,13 +134,4 @@ class ReceiveBlockingPrimitiveTest {
135134
application.dispose()
136135
}
137136
}
138-
139-
private val prohibitParkingFunction: Method? by lazy {
140-
Class.forName("io.ktor.utils.io.jvm.javaio.PollersKt")
141-
.getMethod("prohibitParking")
142-
}
143-
144-
private fun markParkingProhibited() {
145-
prohibitParkingFunction?.invoke(null)
146-
}
147137
}

ktor-server/ktor-server-jetty-jakarta/build.gradle.kts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,11 @@ kotlin {
2929
implementation(projects.ktorServerCore)
3030
implementation(projects.ktorServerTestBase)
3131
implementation(projects.ktorServerTestSuites)
32+
implementation(projects.ktorServerContentNegotiation)
33+
implementation(projects.ktorSerializationJackson)
34+
implementation(projects.ktorClientContentNegotiation)
35+
implementation(libs.jackson.databind)
36+
implementation(libs.jackson.module.kotlin)
3237

3338
implementation(libs.jetty.servlet.jakarta)
3439
implementation(libs.jetty.servlet.websocket.jakarta)

ktor-server/ktor-server-jetty-jakarta/jvm/src/io/ktor/server/jetty/jakarta/JettyApplicationCall.kt

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import io.ktor.http.HttpStatusCode
99
import io.ktor.http.Parameters
1010
import io.ktor.http.RequestConnectionPoint
1111
import io.ktor.http.content.OutgoingContent
12+
import io.ktor.http.content.OutputStreamContent
1213
import io.ktor.http.parseQueryString
1314
import io.ktor.http.withEmptyStringForValuelessKeys
1415
import io.ktor.server.application.*
@@ -24,6 +25,7 @@ import io.ktor.utils.io.pool.ByteBufferPool
2425
import kotlinx.coroutines.InternalCoroutinesApi
2526
import kotlinx.coroutines.suspendCancellableCoroutine
2627
import kotlinx.io.InternalIoApi
28+
import org.eclipse.jetty.io.Content
2729
import org.eclipse.jetty.io.EndPoint
2830
import org.eclipse.jetty.server.Request
2931
import org.eclipse.jetty.server.Response
@@ -186,6 +188,16 @@ public class JettyApplicationCall(
186188
}
187189
}
188190

191+
@OptIn(InternalAPI::class)
192+
override suspend fun respondWriteChannelContent(content: OutgoingContent.WriteChannelContent) {
193+
when (content) {
194+
is OutputStreamContent -> Content.Sink.asOutputStream(response).use { stream ->
195+
content.writeTo(stream)
196+
}
197+
else -> super.respondWriteChannelContent(content)
198+
}
199+
}
200+
189201
override suspend fun responseChannel(): ByteWriteChannel =
190202
responseBodyJob.value.channel
191203

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
* Copyright 2014-2026 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package io.ktor.tests.server.jetty.jakarta
6+
7+
import io.ktor.client.*
8+
import io.ktor.client.engine.cio.*
9+
import io.ktor.client.plugins.*
10+
import io.ktor.client.plugins.contentnegotiation.*
11+
import io.ktor.client.request.*
12+
import io.ktor.client.statement.*
13+
import io.ktor.http.*
14+
import io.ktor.serialization.jackson.*
15+
import io.ktor.server.application.*
16+
import io.ktor.server.jetty.jakarta.*
17+
import io.ktor.server.plugins.contentnegotiation.ContentNegotiation
18+
import io.ktor.server.request.*
19+
import io.ktor.server.response.*
20+
import io.ktor.server.routing.*
21+
import io.ktor.server.test.base.*
22+
import kotlinx.coroutines.*
23+
import kotlin.test.*
24+
import kotlin.time.Duration.Companion.seconds
25+
26+
class JettyJacksonIOExhaustionTest :
27+
EngineTestBase<JettyApplicationEngine, JettyApplicationEngineBase.Configuration>(Jetty) {
28+
29+
init {
30+
enableSsl = false
31+
enableHttp2 = false
32+
}
33+
34+
override fun plugins(application: Application, routingConfig: Route.() -> Unit) {
35+
super.plugins(application, routingConfig)
36+
application.install(ContentNegotiation) {
37+
jackson {}
38+
}
39+
}
40+
41+
@Test
42+
fun `concurrent jackson deserialization exhausts Dispatchers IO`() = runTest {
43+
createAndStartServer {
44+
post("/json/jackson") {
45+
val body = call.receive<Map<String, Any>>()
46+
call.respond(body)
47+
}
48+
}
49+
50+
val concurrentRequests = 50
51+
52+
HttpClient(CIO) {
53+
install(io.ktor.client.plugins.contentnegotiation.ContentNegotiation) {
54+
jackson {}
55+
}
56+
install(HttpTimeout) {
57+
requestTimeoutMillis = 60_000
58+
}
59+
}.use { client ->
60+
withTimeout(60.seconds) {
61+
coroutineScope {
62+
val results = (1..concurrentRequests).map { i ->
63+
async(Dispatchers.Default) {
64+
client.post("http://127.0.0.1:$port/json/jackson") {
65+
contentType(ContentType.Application.Json)
66+
setBody(mapOf("request" to i, "data" to "value$i"))
67+
}
68+
}
69+
}.awaitAll()
70+
71+
results.forEach { response ->
72+
assertEquals(HttpStatusCode.OK, response.status)
73+
}
74+
}
75+
}
76+
}
77+
}
78+
}

ktor-server/ktor-server-servlet-jakarta/api/ktor-server-servlet-jakarta.api

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@ public abstract class io/ktor/server/servlet/jakarta/ServletApplicationResponse
118118
protected final fun getCompleted ()Z
119119
public fun getHeaders ()Lio/ktor/server/response/ResponseHeaders;
120120
protected final fun getServletResponse ()Ljakarta/servlet/http/HttpServletResponse;
121+
protected fun respondWriteChannelContent (Lio/ktor/http/content/OutgoingContent$WriteChannelContent;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
121122
public final fun responseChannel (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
122123
protected final fun setCompleted (Z)V
123124
protected fun setStatus (Lio/ktor/http/HttpStatusCode;)V

ktor-server/ktor-server-servlet-jakarta/jvm/src/io/ktor/server/servlet/jakarta/ServletApplicationResponse.kt

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
package io.ktor.server.servlet.jakarta
66

77
import io.ktor.http.*
8+
import io.ktor.http.content.*
89
import io.ktor.server.application.*
910
import io.ktor.server.engine.*
1011
import io.ktor.server.response.*
@@ -48,6 +49,15 @@ public abstract class ServletApplicationResponse(
4849
responseJob.value.channel
4950
}
5051

52+
@OptIn(InternalAPI::class)
53+
override suspend fun respondWriteChannelContent(content: OutgoingContent.WriteChannelContent) {
54+
if (content is OutputStreamContent) {
55+
content.writeTo(servletResponse.outputStream)
56+
return
57+
}
58+
super.respondWriteChannelContent(content)
59+
}
60+
5161
public final override suspend fun responseChannel(): ByteWriteChannel = responseChannel.value
5262

5363
init {

0 commit comments

Comments
 (0)