Skip to content

Commit 1f642b9

Browse files
committed
Merge branch '7.0.x'
2 parents 80c9efd + 8d93670 commit 1f642b9

9 files changed

Lines changed: 116 additions & 22 deletions

File tree

spring-core/src/main/java/org/springframework/core/CoroutinesUtils.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -134,9 +134,10 @@ public static Publisher<?> invokeSuspendingFunction(
134134
Object arg = args[index];
135135
if (!(parameter.isOptional() && arg == null)) {
136136
KType type = parameter.getType();
137-
if (!type.isMarkedNullable() &&
137+
if (!(type.isMarkedNullable() && arg == null) &&
138138
type.getClassifier() instanceof KClass<?> kClass &&
139-
KotlinDetector.isInlineClass(JvmClassMappingKt.getJavaClass(kClass))) {
139+
KotlinDetector.isInlineClass(JvmClassMappingKt.getJavaClass(kClass)) &&
140+
!JvmClassMappingKt.getJavaClass(kClass).isInstance(arg)) {
140141
arg = box(kClass, arg);
141142
}
142143
argMap.put(parameter, arg);
@@ -166,9 +167,10 @@ public static Publisher<?> invokeSuspendingFunction(
166167
private static Object box(KClass<?> kClass, @Nullable Object arg) {
167168
KFunction<?> constructor = Objects.requireNonNull(KClasses.getPrimaryConstructor(kClass));
168169
KType type = constructor.getParameters().get(0).getType();
169-
if (!type.isMarkedNullable() &&
170+
if (!(type.isMarkedNullable() && arg == null) &&
170171
type.getClassifier() instanceof KClass<?> parameterClass &&
171-
KotlinDetector.isInlineClass(JvmClassMappingKt.getJavaClass(parameterClass))) {
172+
KotlinDetector.isInlineClass(JvmClassMappingKt.getJavaClass(parameterClass)) &&
173+
!JvmClassMappingKt.getJavaClass(parameterClass).isInstance(arg)) {
172174
arg = box(parameterClass, arg);
173175
}
174176
if (!KCallablesJvm.isAccessible(constructor)) {

spring-core/src/main/java/org/springframework/core/ReactiveAdapterRegistry.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,13 +71,16 @@ public class ReactiveAdapterRegistry {
7171

7272
private static final boolean MUTINY_PRESENT;
7373

74+
private static final boolean CONTEXT_PROPAGATION_PRESENT;
75+
7476
static {
7577
ClassLoader classLoader = ReactiveAdapterRegistry.class.getClassLoader();
7678
REACTIVE_STREAMS_PRESENT = ClassUtils.isPresent("org.reactivestreams.Publisher", classLoader);
7779
REACTOR_PRESENT = ClassUtils.isPresent("reactor.core.publisher.Flux", classLoader);
7880
RXJAVA_3_PRESENT = ClassUtils.isPresent("io.reactivex.rxjava3.core.Flowable", classLoader);
7981
COROUTINES_REACTOR_PRESENT = ClassUtils.isPresent("kotlinx.coroutines.reactor.MonoKt", classLoader);
8082
MUTINY_PRESENT = ClassUtils.isPresent("io.smallrye.mutiny.Multi", classLoader);
83+
CONTEXT_PROPAGATION_PRESENT = ClassUtils.isPresent("io.micrometer.context.ContextSnapshotFactory", classLoader);
8184
}
8285

8386
private final List<ReactiveAdapter> adapters = new ArrayList<>();
@@ -356,7 +359,9 @@ void registerAdapters(ReactiveAdapterRegistry registry) {
356359

357360
registry.registerReactiveType(
358361
ReactiveTypeDescriptor.multiValue(kotlinx.coroutines.flow.Flow.class, kotlinx.coroutines.flow.FlowKt::emptyFlow),
359-
source -> kotlinx.coroutines.reactor.ReactorFlowKt.asFlux((kotlinx.coroutines.flow.Flow<?>) source),
362+
CONTEXT_PROPAGATION_PRESENT ?
363+
source -> kotlinx.coroutines.reactor.ReactorFlowKt.asFlux((kotlinx.coroutines.flow.Flow<?>) source, new PropagationContextElement()) :
364+
source -> kotlinx.coroutines.reactor.ReactorFlowKt.asFlux((kotlinx.coroutines.flow.Flow<?>) source),
360365
kotlinx.coroutines.reactive.ReactiveFlowKt::asFlow);
361366
}
362367
}

spring-core/src/test/kotlin/org/springframework/core/CoroutinesUtilsTests.kt

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,13 @@ class CoroutinesUtilsTests {
236236
Assertions.assertThat(mono.awaitSingleOrNull()).isEqualTo("foo")
237237
}
238238

239+
@Test
240+
suspend fun invokeSuspendingFunctionWithNullableValueClassParameterAndUnderlyingValue() {
241+
val method = CoroutinesUtilsTests::class.java.declaredMethods.first { it.name.startsWith("suspendingFunctionWithNullableValueClass") }
242+
val mono = CoroutinesUtils.invokeSuspendingFunction(method, this, "foo", null) as Mono
243+
Assertions.assertThat(mono.awaitSingleOrNull()).isEqualTo("foo")
244+
}
245+
239246
@Test
240247
suspend fun invokeSuspendingFunctionWithNullableValueClassParameter() {
241248
val method = CoroutinesUtilsTests::class.java.declaredMethods.first { it.name.startsWith("suspendingFunctionWithNullableValueClass") }

spring-core/src/test/kotlin/org/springframework/core/ReactiveAdapterRegistryKotlinTests.kt

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,18 @@
1616

1717
package org.springframework.core
1818

19+
import io.micrometer.observation.Observation
20+
import io.micrometer.observation.tck.TestObservationRegistry
1921
import kotlinx.coroutines.Deferred
2022
import kotlinx.coroutines.DelicateCoroutinesApi
23+
import kotlinx.coroutines.Dispatchers
2124
import kotlinx.coroutines.GlobalScope
2225
import kotlinx.coroutines.async
2326
import kotlinx.coroutines.flow.Flow
2427
import kotlinx.coroutines.flow.flow
2528
import kotlinx.coroutines.flow.toList
29+
import kotlinx.coroutines.reactive.awaitSingle
30+
import kotlinx.coroutines.runBlocking
2631
import org.assertj.core.api.Assertions.assertThat
2732
import org.junit.jupiter.api.Test
2833
import org.reactivestreams.Publisher
@@ -40,6 +45,8 @@ import kotlin.reflect.KClass
4045
@OptIn(DelicateCoroutinesApi::class)
4146
class ReactiveAdapterRegistryKotlinTests {
4247

48+
private val observationRegistry = TestObservationRegistry.create()
49+
4350
private val registry = ReactiveAdapterRegistry.getSharedInstance()
4451

4552
@Test
@@ -82,6 +89,23 @@ class ReactiveAdapterRegistryKotlinTests {
8289
assertThat((target as Flow<*>).toList()).contains(1, 2, 3)
8390
}
8491

92+
@Test
93+
fun propagateMicrometerContextToFlow() {
94+
val source = flow {
95+
val currentObservation = observationRegistry.currentObservation
96+
assertThat(currentObservation).isNotNull
97+
emit(currentObservation?.context?.name)
98+
}
99+
val observation = Observation.createNotStarted("coroutine", observationRegistry)
100+
observation.observe {
101+
val target: Publisher<String> = getAdapter(Flow::class).toPublisher(source)
102+
val result = runBlocking(Dispatchers.IO) {
103+
target.awaitSingle()
104+
}
105+
assertThat(result).isEqualTo("coroutine")
106+
}
107+
}
108+
85109
private fun getAdapter(reactiveType: KClass<*>): ReactiveAdapter {
86110
return this.registry.getAdapter(reactiveType.java)!!
87111
}

spring-web/src/main/java/org/springframework/web/method/support/InvocableHandlerMethod.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -316,9 +316,10 @@ private static class KotlinDelegate {
316316
Object arg = args[index];
317317
if (!(parameter.isOptional() && arg == null)) {
318318
KType type = parameter.getType();
319-
if (!type.isMarkedNullable() &&
319+
if (!(type.isMarkedNullable() && arg == null) &&
320320
type.getClassifier() instanceof KClass<?> kClass &&
321-
KotlinDetector.isInlineClass(JvmClassMappingKt.getJavaClass(kClass))) {
321+
KotlinDetector.isInlineClass(JvmClassMappingKt.getJavaClass(kClass)) &&
322+
!JvmClassMappingKt.getJavaClass(kClass).isInstance(arg)) {
322323
arg = box(kClass, arg);
323324
}
324325
argMap.put(parameter, arg);
@@ -337,9 +338,10 @@ private static class KotlinDelegate {
337338
private static Object box(KClass<?> kClass, @Nullable Object arg) {
338339
KFunction<?> constructor = Objects.requireNonNull(KClasses.getPrimaryConstructor(kClass));
339340
KType type = constructor.getParameters().get(0).getType();
340-
if (!type.isMarkedNullable() &&
341+
if (!(type.isMarkedNullable() && arg == null) &&
341342
type.getClassifier() instanceof KClass<?> parameterClass &&
342-
KotlinDetector.isInlineClass(JvmClassMappingKt.getJavaClass(parameterClass))) {
343+
KotlinDetector.isInlineClass(JvmClassMappingKt.getJavaClass(parameterClass)) &&
344+
!JvmClassMappingKt.getJavaClass(parameterClass).isInstance(arg)) {
343345
arg = box(parameterClass, arg);
344346
}
345347
if (!KCallablesJvm.isAccessible(constructor)) {

spring-web/src/test/kotlin/org/springframework/web/method/support/InvocableHandlerMethodKotlinTests.kt

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,13 @@ class InvocableHandlerMethodKotlinTests {
155155
Assertions.assertThat(value).isEqualTo(1L)
156156
}
157157

158+
@Test
159+
fun valueClassWithNullableAndUnderlyingValue() {
160+
composite.addResolver(StubArgumentResolver(LongValueClass::class.java, 1L))
161+
val value = getInvocable(ValueClassHandler::valueClassWithNullable.javaMethod!!).invokeForRequest(request, null)
162+
Assertions.assertThat(value).isEqualTo(1L)
163+
}
164+
158165
@Test
159166
fun valueClassWithNullable() {
160167
composite.addResolver(StubArgumentResolver(LongValueClass::class.java, null))
@@ -215,6 +222,14 @@ class InvocableHandlerMethodKotlinTests {
215222
StepVerifier.create(value as Mono<Long>).verifyComplete()
216223
}
217224

225+
@Test
226+
fun suspendingValueClassWithNullableAndUnderlyingValue() {
227+
composite.addResolver(ContinuationHandlerMethodArgumentResolver())
228+
composite.addResolver(StubArgumentResolver(LongValueClass::class.java, 1L))
229+
val value = getInvocable(SuspendingValueClassHandler::valueClassWithNullable.javaMethod!!).invokeForRequest(request, null)
230+
StepVerifier.create(value as Mono<Long>).expectNext(1L).verifyComplete()
231+
}
232+
218233
@Test
219234
fun suspendingValueClassWithPrivateConstructor() {
220235
composite.addResolver(ContinuationHandlerMethodArgumentResolver())

spring-webflux/src/main/java/org/springframework/web/reactive/result/method/InvocableHandlerMethod.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -356,9 +356,10 @@ private static class KotlinDelegate {
356356
Object arg = args[index];
357357
if (!(parameter.isOptional() && arg == null)) {
358358
KType type = parameter.getType();
359-
if (!type.isMarkedNullable() &&
359+
if (!(type.isMarkedNullable() && arg == null) &&
360360
type.getClassifier() instanceof KClass<?> kClass &&
361-
KotlinDetector.isInlineClass(JvmClassMappingKt.getJavaClass(kClass))) {
361+
KotlinDetector.isInlineClass(JvmClassMappingKt.getJavaClass(kClass)) &&
362+
!JvmClassMappingKt.getJavaClass(kClass).isInstance(arg)) {
362363
arg = box(kClass, arg);
363364
}
364365
argMap.put(parameter, arg);
@@ -378,9 +379,10 @@ private static class KotlinDelegate {
378379
private static Object box(KClass<?> kClass, @Nullable Object arg) {
379380
KFunction<?> constructor = Objects.requireNonNull(KClasses.getPrimaryConstructor(kClass));
380381
KType type = constructor.getParameters().get(0).getType();
381-
if (!type.isMarkedNullable() &&
382+
if (!(type.isMarkedNullable() && arg == null) &&
382383
type.getClassifier() instanceof KClass<?> parameterClass &&
383-
KotlinDetector.isInlineClass(JvmClassMappingKt.getJavaClass(parameterClass))) {
384+
KotlinDetector.isInlineClass(JvmClassMappingKt.getJavaClass(parameterClass)) &&
385+
!JvmClassMappingKt.getJavaClass(parameterClass).isInstance(arg)) {
384386
arg = box(parameterClass, arg);
385387
}
386388
if (!KCallablesJvm.isAccessible(constructor)) {

spring-webflux/src/test/kotlin/org/springframework/web/reactive/result/InvocableHandlerMethodKotlinTests.kt

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,14 @@ class InvocableHandlerMethodKotlinTests {
258258
assertHandlerResultValue(result, "1")
259259
}
260260

261+
@Test
262+
fun valueClassWithNullableAndUnderlyingValue() {
263+
this.resolvers.add(stubResolver(1L, LongValueClass::class.java))
264+
val method = ValueClassController::valueClassWithNullable.javaMethod!!
265+
val result = invoke(ValueClassController(), method)
266+
assertHandlerResultValue(result, "1")
267+
}
268+
261269
@Test
262270
fun valueClassWithNullable() {
263271
this.resolvers.add(stubResolver(null, LongValueClass::class.java))
@@ -320,6 +328,14 @@ class InvocableHandlerMethodKotlinTests {
320328
assertHandlerResultValue(result, "null")
321329
}
322330

331+
@Test
332+
fun suspendingValueClassWithNullableAndUnderlyingValue() {
333+
this.resolvers.add(stubResolver(1L, LongValueClass::class.java))
334+
val method = SuspendingValueClassController::valueClassWithNullable.javaMethod!!
335+
val result = invoke(SuspendingValueClassController(), method)
336+
assertHandlerResultValue(result, "1")
337+
}
338+
323339
@Test
324340
fun suspendingValueClassWithPrivateConstructor() {
325341
this.resolvers.add(stubResolver(1L, Long::class.java))
@@ -590,4 +606,4 @@ class InvocableHandlerMethodKotlinTests {
590606
}
591607

592608
class CustomException(message: String) : Throwable(message)
593-
}
609+
}

spring-webflux/src/test/kotlin/org/springframework/web/reactive/result/method/annotation/CoroutinesIntegrationTests.kt

Lines changed: 29 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,16 @@
1616

1717
package org.springframework.web.reactive.result.method.annotation
1818

19-
import kotlinx.coroutines.Deferred
20-
import kotlinx.coroutines.DelicateCoroutinesApi
21-
import kotlinx.coroutines.GlobalScope
22-
import kotlinx.coroutines.async
23-
import kotlinx.coroutines.delay
19+
import io.micrometer.observation.ObservationRegistry
20+
import io.micrometer.observation.tck.TestObservationRegistry
21+
import kotlinx.coroutines.*
2422
import kotlinx.coroutines.flow.Flow
2523
import kotlinx.coroutines.flow.flow
2624
import org.assertj.core.api.Assertions.assertThat
2725
import org.assertj.core.api.Assertions.assertThatExceptionOfType
28-
import org.junit.jupiter.api.Assumptions.assumeFalse
2926
import org.springframework.context.ApplicationContext
3027
import org.springframework.context.annotation.AnnotationConfigApplicationContext
28+
import org.springframework.context.annotation.Bean
3129
import org.springframework.context.annotation.ComponentScan
3230
import org.springframework.context.annotation.Configuration
3331
import org.springframework.http.HttpHeaders
@@ -86,6 +84,15 @@ class CoroutinesIntegrationTests : AbstractRequestMappingIntegrationTests() {
8684
assertThat(entity.body).isEqualTo("foobar")
8785
}
8886

87+
@ParameterizedHttpServerTest
88+
fun `Handler method returning Flow with observation`(httpServer: HttpServer) {
89+
startServer(httpServer)
90+
91+
val entity = performGet("/flow-observation", HttpHeaders.EMPTY, String::class.java)
92+
assertThat(entity.statusCode).isEqualTo(HttpStatus.OK)
93+
assertThat(entity.body).isEqualTo("http.server.requests")
94+
}
95+
8996
@ParameterizedHttpServerTest
9097
fun `Suspending handler method returning Flow`(httpServer: HttpServer) {
9198
startServer(httpServer)
@@ -135,11 +142,16 @@ class CoroutinesIntegrationTests : AbstractRequestMappingIntegrationTests() {
135142
@Configuration
136143
@EnableWebFlux
137144
@ComponentScan(resourcePattern = "**/CoroutinesIntegrationTests*")
138-
open class WebConfig
145+
open class WebConfig {
146+
147+
@Bean
148+
open fun observationRegistry(): ObservationRegistry = TestObservationRegistry.create()
149+
150+
}
139151

140152
@OptIn(DelicateCoroutinesApi::class)
141153
@RestController
142-
class CoroutinesController {
154+
class CoroutinesController(private val observationRegistry: ObservationRegistry) {
143155

144156
@GetMapping("/suspend")
145157
suspend fun suspendingEndpoint(): String {
@@ -167,6 +179,15 @@ class CoroutinesIntegrationTests : AbstractRequestMappingIntegrationTests() {
167179
delay(1)
168180
}
169181

182+
@GetMapping("/flow-observation")
183+
fun flowObservationEndpoint(): Flow<String?> {
184+
return flow {
185+
val currentObservation = observationRegistry.currentObservation
186+
assertThat(currentObservation).isNotNull
187+
emit(currentObservation?.context?.name)
188+
}
189+
}
190+
170191
@GetMapping("/suspending-flow")
171192
suspend fun suspendingFlowEndpoint(): Flow<String> {
172193
delay(1)

0 commit comments

Comments
 (0)