Skip to content

Commit 8d93670

Browse files
dmitrysulmansdeleuze
authored andcommitted
Support Micrometer context propagation in Kotlin Flow
See spring-projectsgh-36427 Closes spring-projectsgh-36667 Signed-off-by: Dmitry Sulman <dmitry.sulman@gmail.com>
1 parent 29a7402 commit 8d93670

3 files changed

Lines changed: 59 additions & 9 deletions

File tree

spring-core/src/main/java/org/springframework/core/ReactiveAdapterRegistry.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,13 +71,16 @@ public class ReactiveAdapterRegistry {
7171

7272
private static final boolean MUTINY_PRESENT;
7373

74+
private static final boolean CONTEXT_PROPAGATION_PRESENT;
75+
7476
static {
7577
ClassLoader classLoader = ReactiveAdapterRegistry.class.getClassLoader();
7678
REACTIVE_STREAMS_PRESENT = ClassUtils.isPresent("org.reactivestreams.Publisher", classLoader);
7779
REACTOR_PRESENT = ClassUtils.isPresent("reactor.core.publisher.Flux", classLoader);
7880
RXJAVA_3_PRESENT = ClassUtils.isPresent("io.reactivex.rxjava3.core.Flowable", classLoader);
7981
COROUTINES_REACTOR_PRESENT = ClassUtils.isPresent("kotlinx.coroutines.reactor.MonoKt", classLoader);
8082
MUTINY_PRESENT = ClassUtils.isPresent("io.smallrye.mutiny.Multi", classLoader);
83+
CONTEXT_PROPAGATION_PRESENT = ClassUtils.isPresent("io.micrometer.context.ContextSnapshotFactory", classLoader);
8184
}
8285

8386
private final List<ReactiveAdapter> adapters = new ArrayList<>();
@@ -356,7 +359,9 @@ void registerAdapters(ReactiveAdapterRegistry registry) {
356359

357360
registry.registerReactiveType(
358361
ReactiveTypeDescriptor.multiValue(kotlinx.coroutines.flow.Flow.class, kotlinx.coroutines.flow.FlowKt::emptyFlow),
359-
source -> kotlinx.coroutines.reactor.ReactorFlowKt.asFlux((kotlinx.coroutines.flow.Flow<?>) source),
362+
CONTEXT_PROPAGATION_PRESENT ?
363+
source -> kotlinx.coroutines.reactor.ReactorFlowKt.asFlux((kotlinx.coroutines.flow.Flow<?>) source, new PropagationContextElement()) :
364+
source -> kotlinx.coroutines.reactor.ReactorFlowKt.asFlux((kotlinx.coroutines.flow.Flow<?>) source),
360365
kotlinx.coroutines.reactive.ReactiveFlowKt::asFlow);
361366
}
362367
}

spring-core/src/test/kotlin/org/springframework/core/ReactiveAdapterRegistryKotlinTests.kt

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,18 @@
1616

1717
package org.springframework.core
1818

19+
import io.micrometer.observation.Observation
20+
import io.micrometer.observation.tck.TestObservationRegistry
1921
import kotlinx.coroutines.Deferred
2022
import kotlinx.coroutines.DelicateCoroutinesApi
23+
import kotlinx.coroutines.Dispatchers
2124
import kotlinx.coroutines.GlobalScope
2225
import kotlinx.coroutines.async
2326
import kotlinx.coroutines.flow.Flow
2427
import kotlinx.coroutines.flow.flow
2528
import kotlinx.coroutines.flow.toList
29+
import kotlinx.coroutines.reactive.awaitSingle
30+
import kotlinx.coroutines.runBlocking
2631
import org.assertj.core.api.Assertions.assertThat
2732
import org.junit.jupiter.api.Test
2833
import org.reactivestreams.Publisher
@@ -40,6 +45,8 @@ import kotlin.reflect.KClass
4045
@OptIn(DelicateCoroutinesApi::class)
4146
class ReactiveAdapterRegistryKotlinTests {
4247

48+
private val observationRegistry = TestObservationRegistry.create()
49+
4350
private val registry = ReactiveAdapterRegistry.getSharedInstance()
4451

4552
@Test
@@ -82,6 +89,23 @@ class ReactiveAdapterRegistryKotlinTests {
8289
assertThat((target as Flow<*>).toList()).contains(1, 2, 3)
8390
}
8491

92+
@Test
93+
fun propagateMicrometerContextToFlow() {
94+
val source = flow {
95+
val currentObservation = observationRegistry.currentObservation
96+
assertThat(currentObservation).isNotNull
97+
emit(currentObservation?.context?.name)
98+
}
99+
val observation = Observation.createNotStarted("coroutine", observationRegistry)
100+
observation.observe {
101+
val target: Publisher<String> = getAdapter(Flow::class).toPublisher(source)
102+
val result = runBlocking(Dispatchers.IO) {
103+
target.awaitSingle()
104+
}
105+
assertThat(result).isEqualTo("coroutine")
106+
}
107+
}
108+
85109
private fun getAdapter(reactiveType: KClass<*>): ReactiveAdapter {
86110
return this.registry.getAdapter(reactiveType.java)!!
87111
}

spring-webflux/src/test/kotlin/org/springframework/web/reactive/result/method/annotation/CoroutinesIntegrationTests.kt

Lines changed: 29 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,16 @@
1616

1717
package org.springframework.web.reactive.result.method.annotation
1818

19-
import kotlinx.coroutines.Deferred
20-
import kotlinx.coroutines.DelicateCoroutinesApi
21-
import kotlinx.coroutines.GlobalScope
22-
import kotlinx.coroutines.async
23-
import kotlinx.coroutines.delay
19+
import io.micrometer.observation.ObservationRegistry
20+
import io.micrometer.observation.tck.TestObservationRegistry
21+
import kotlinx.coroutines.*
2422
import kotlinx.coroutines.flow.Flow
2523
import kotlinx.coroutines.flow.flow
2624
import org.assertj.core.api.Assertions.assertThat
2725
import org.assertj.core.api.Assertions.assertThatExceptionOfType
28-
import org.junit.jupiter.api.Assumptions.assumeFalse
2926
import org.springframework.context.ApplicationContext
3027
import org.springframework.context.annotation.AnnotationConfigApplicationContext
28+
import org.springframework.context.annotation.Bean
3129
import org.springframework.context.annotation.ComponentScan
3230
import org.springframework.context.annotation.Configuration
3331
import org.springframework.http.HttpHeaders
@@ -86,6 +84,15 @@ class CoroutinesIntegrationTests : AbstractRequestMappingIntegrationTests() {
8684
assertThat(entity.body).isEqualTo("foobar")
8785
}
8886

87+
@ParameterizedHttpServerTest
88+
fun `Handler method returning Flow with observation`(httpServer: HttpServer) {
89+
startServer(httpServer)
90+
91+
val entity = performGet("/flow-observation", HttpHeaders.EMPTY, String::class.java)
92+
assertThat(entity.statusCode).isEqualTo(HttpStatus.OK)
93+
assertThat(entity.body).isEqualTo("http.server.requests")
94+
}
95+
8996
@ParameterizedHttpServerTest
9097
fun `Suspending handler method returning Flow`(httpServer: HttpServer) {
9198
startServer(httpServer)
@@ -135,11 +142,16 @@ class CoroutinesIntegrationTests : AbstractRequestMappingIntegrationTests() {
135142
@Configuration
136143
@EnableWebFlux
137144
@ComponentScan(resourcePattern = "**/CoroutinesIntegrationTests*")
138-
open class WebConfig
145+
open class WebConfig {
146+
147+
@Bean
148+
open fun observationRegistry(): ObservationRegistry = TestObservationRegistry.create()
149+
150+
}
139151

140152
@OptIn(DelicateCoroutinesApi::class)
141153
@RestController
142-
class CoroutinesController {
154+
class CoroutinesController(private val observationRegistry: ObservationRegistry) {
143155

144156
@GetMapping("/suspend")
145157
suspend fun suspendingEndpoint(): String {
@@ -167,6 +179,15 @@ class CoroutinesIntegrationTests : AbstractRequestMappingIntegrationTests() {
167179
delay(1)
168180
}
169181

182+
@GetMapping("/flow-observation")
183+
fun flowObservationEndpoint(): Flow<String?> {
184+
return flow {
185+
val currentObservation = observationRegistry.currentObservation
186+
assertThat(currentObservation).isNotNull
187+
emit(currentObservation?.context?.name)
188+
}
189+
}
190+
170191
@GetMapping("/suspending-flow")
171192
suspend fun suspendingFlowEndpoint(): Flow<String> {
172193
delay(1)

0 commit comments

Comments
 (0)