Skip to content

Commit 3f1fd39

Browse files
authored
fix(ktor): resolve client hang on streaming path with Ktor 3.5.0 (#18781)
1 parent 6daaf4d commit 3f1fd39

4 files changed

Lines changed: 80 additions & 16 deletions

File tree

instrumentation/ktor/ktor-2.0/testing/src/main/kotlin/io/opentelemetry/instrumentation/ktor/v2_0/AbstractKtorHttpClientTest.kt

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,9 @@ import io.ktor.client.*
99
import io.ktor.client.engine.cio.*
1010
import io.ktor.client.plugins.*
1111
import io.ktor.client.request.*
12+
import io.ktor.client.statement.*
1213
import io.ktor.http.*
14+
import io.opentelemetry.api.trace.SpanKind
1315
import io.opentelemetry.context.Context
1416
import io.opentelemetry.extension.kotlin.asContextElement
1517
import io.opentelemetry.instrumentation.testing.junit.http.AbstractHttpClientTest
@@ -21,8 +23,11 @@ import kotlinx.coroutines.CoroutineScope
2123
import kotlinx.coroutines.Dispatchers
2224
import kotlinx.coroutines.launch
2325
import kotlinx.coroutines.runBlocking
26+
import kotlinx.coroutines.withTimeout
2427
import org.junit.jupiter.api.AfterAll
28+
import org.junit.jupiter.api.Test
2529
import java.net.URI
30+
import kotlin.time.Duration.Companion.seconds
2631

2732
abstract class AbstractKtorHttpClientTest : AbstractHttpClientTest<HttpRequestBuilder>() {
2833

@@ -57,6 +62,34 @@ abstract class AbstractKtorHttpClientTest : AbstractHttpClientTest<HttpRequestBu
5762
client.request(request).status.value
5863
}
5964

65+
fun sendStreamingRequest(request: HttpRequestBuilder) = runBlocking {
66+
// withTimeout ensures requests complete before the HttpTimeout fires. The bug this guards
67+
// against caused the instrumentation to prevent timely completion of streaming requests.
68+
withTimeout(5.seconds) {
69+
client.prepareRequest(request).execute { response ->
70+
response.bodyAsText()
71+
response.status.value
72+
}
73+
}
74+
}
75+
76+
@Test
77+
fun streamingRequestCompletesPromptly() {
78+
val uri = resolveAddress("/success")
79+
val request = buildRequest("GET", uri, mutableMapOf())
80+
81+
sendStreamingRequest(request)
82+
83+
testing.waitAndAssertTraces(
84+
{ trace ->
85+
trace.hasSpansSatisfyingExactly(
86+
{ span -> span.hasName("GET").hasKind(SpanKind.CLIENT).hasNoParent() },
87+
{ span -> assertServerSpan(span).hasParent(trace.getSpan(0)) },
88+
)
89+
}
90+
)
91+
}
92+
6093
override fun sendRequestWithCallback(
6194
request: HttpRequestBuilder,
6295
method: String,

instrumentation/ktor/ktor-3.0/testing/src/main/kotlin/io/opentelemetry/instrumentation/ktor/v3_0/AbstractKtorHttpClientTest.kt

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,9 @@ import io.ktor.client.*
99
import io.ktor.client.engine.cio.*
1010
import io.ktor.client.plugins.*
1111
import io.ktor.client.request.*
12+
import io.ktor.client.statement.*
1213
import io.ktor.http.*
14+
import io.opentelemetry.api.trace.SpanKind
1315
import io.opentelemetry.context.Context
1416
import io.opentelemetry.extension.kotlin.asContextElement
1517
import io.opentelemetry.instrumentation.testing.junit.http.AbstractHttpClientTest
@@ -21,14 +23,21 @@ import kotlinx.coroutines.CoroutineScope
2123
import kotlinx.coroutines.Dispatchers
2224
import kotlinx.coroutines.launch
2325
import kotlinx.coroutines.runBlocking
26+
import kotlinx.coroutines.withTimeout
2427
import org.junit.jupiter.api.AfterAll
28+
import org.junit.jupiter.api.Test
2529
import java.net.URI
30+
import kotlin.time.Duration.Companion.seconds
2631

2732
abstract class AbstractKtorHttpClientTest : AbstractHttpClientTest<HttpRequestBuilder>() {
2833

2934
private val client = HttpClient(CIO) {
3035
install(HttpRedirect)
3136

37+
// HttpTimeout adds a child job to the call job, causing a hang in job.join() based span-end
38+
// implementations on the streaming path
39+
install(HttpTimeout) { requestTimeoutMillis = 30_000 }
40+
3241
installTracing()
3342
}
3443
private val singleConnectionClient = HttpClient(CIO) {
@@ -57,6 +66,34 @@ abstract class AbstractKtorHttpClientTest : AbstractHttpClientTest<HttpRequestBu
5766
client.request(request).status.value
5867
}
5968

69+
fun sendStreamingRequest(request: HttpRequestBuilder) = runBlocking {
70+
// withTimeout ensures requests complete before the HttpTimeout fires. The bug this guards
71+
// against caused the instrumentation to prevent timely completion of streaming requests.
72+
withTimeout(5.seconds) {
73+
client.prepareRequest(request).execute { response ->
74+
response.bodyAsText()
75+
response.status.value
76+
}
77+
}
78+
}
79+
80+
@Test
81+
fun streamingRequestCompletesPromptly() {
82+
val uri = resolveAddress("/success")
83+
val request = buildRequest("GET", uri, mutableMapOf())
84+
85+
sendStreamingRequest(request)
86+
87+
testing.waitAndAssertTraces(
88+
{ trace ->
89+
trace.hasSpansSatisfyingExactly(
90+
{ span -> span.hasName("GET").hasKind(SpanKind.CLIENT).hasNoParent() },
91+
{ span -> assertServerSpan(span).hasParent(trace.getSpan(0)) },
92+
)
93+
}
94+
)
95+
}
96+
6097
override fun sendRequestWithCallback(
6198
request: HttpRequestBuilder,
6299
method: String,

instrumentation/ktor/ktor-common-2.0/library/src/main/kotlin/io/opentelemetry/instrumentation/ktor/common/v2_0/AbstractKtorClientTelemetry.kt

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,7 @@ abstract class AbstractKtorClientTelemetry(
1717
private val propagators: ContextPropagators,
1818
) {
1919

20-
internal fun createSpan(requestBuilder: HttpRequestBuilder): Context? {
21-
val parentContext = Context.current()
20+
internal fun createSpan(requestBuilder: HttpRequestBuilder, parentContext: Context): Context? {
2221
val requestData = requestBuilder.build()
2322

2423
return if (instrumenter.shouldStart(parentContext, requestData)) {

instrumentation/ktor/ktor-common-2.0/library/src/main/kotlin/io/opentelemetry/instrumentation/ktor/common/v2_0/internal/KtorClientTelemetryUtil.kt

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,7 @@ import io.opentelemetry.context.Context
1414
import io.opentelemetry.extension.kotlin.asContextElement
1515
import io.opentelemetry.instrumentation.api.semconv.http.HttpClientRequestResendCount
1616
import io.opentelemetry.instrumentation.ktor.common.v2_0.AbstractKtorClientTelemetry
17-
import kotlinx.coroutines.InternalCoroutinesApi
1817
import kotlinx.coroutines.job
19-
import kotlinx.coroutines.launch
2018
import kotlinx.coroutines.withContext
2119

2220
/**
@@ -25,6 +23,7 @@ import kotlinx.coroutines.withContext
2523
*/
2624
object KtorClientTelemetryUtil {
2725
private val OPEN_TELEMETRY_CONTEXT_KEY = AttributeKey<Context>("OpenTelemetry")
26+
private val OPEN_TELEMETRY_PARENT_CONTEXT_KEY = AttributeKey<Context>("OpenTelemetryParent")
2827

2928
fun install(plugin: AbstractKtorClientTelemetry, scope: HttpClient) {
3029
installSpanCreation(plugin, scope)
@@ -37,15 +36,17 @@ object KtorClientTelemetryUtil {
3736

3837
scope.requestPipeline.intercept(initializeRequestPhase) {
3938
val openTelemetryContext = HttpClientRequestResendCount.initialize(Context.current())
40-
withContext(openTelemetryContext.asContextElement()) { proceed() }
39+
context.attributes.put(OPEN_TELEMETRY_PARENT_CONTEXT_KEY, openTelemetryContext)
40+
proceed()
4141
}
4242

4343
val createSpanPhase = PipelinePhase("OpenTelemetryCreateSpan")
4444
scope.sendPipeline.insertPhaseAfter(HttpSendPipeline.State, createSpanPhase)
4545

4646
scope.sendPipeline.intercept(createSpanPhase) {
4747
val requestBuilder = context
48-
val openTelemetryContext = plugin.createSpan(requestBuilder)
48+
val parentContext = requestBuilder.attributes[OPEN_TELEMETRY_PARENT_CONTEXT_KEY]
49+
val openTelemetryContext = plugin.createSpan(requestBuilder, parentContext)
4950

5051
if (openTelemetryContext != null) {
5152
try {
@@ -63,7 +64,6 @@ object KtorClientTelemetryUtil {
6364
}
6465
}
6566

66-
@OptIn(InternalCoroutinesApi::class)
6767
private fun installSpanEnd(plugin: AbstractKtorClientTelemetry, scope: HttpClient) {
6868
val endSpanPhase = PipelinePhase("OpenTelemetryEndSpan")
6969
scope.receivePipeline.insertPhaseBefore(HttpReceivePipeline.State, endSpanPhase)
@@ -72,16 +72,11 @@ object KtorClientTelemetryUtil {
7272
val openTelemetryContext = it.call.attributes.getOrNull(OPEN_TELEMETRY_CONTEXT_KEY)
7373
openTelemetryContext ?: return@intercept
7474

75-
scope.launch {
76-
val job = it.call.coroutineContext.job
77-
job.join()
78-
val cause = if (!job.isCancelled) {
79-
null
80-
} else {
81-
kotlin.runCatching { job.getCancellationException() }.getOrNull()
82-
}
75+
val job = it.call.coroutineContext.job
76+
val call = it.call
8377

84-
plugin.endSpan(openTelemetryContext, it.call, cause)
78+
job.invokeOnCompletion { cause ->
79+
plugin.endSpan(openTelemetryContext, call, cause)
8580
}
8681
}
8782
}

0 commit comments

Comments
 (0)