diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpConnector.java b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpConnector.java index 3b0a14b3cb96..f341473a9a4d 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpConnector.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpConnector.java @@ -162,16 +162,23 @@ public Mono connect(HttpMethod method, URI uri, requestSender = setUri(requestSender, uri); AtomicReference responseRef = new AtomicReference<>(); + AtomicReference connectionRef = new AtomicReference<>(); - return requestSender + return requestSender .send((request, outbound) -> requestCallback.apply(adaptRequest(method, uri, request, outbound))) .responseConnection((response, connection) -> { ReactorClientHttpResponse clientResponse = new ReactorClientHttpResponse(response, connection); responseRef.set(clientResponse); - registerAttributeCallback(connection); + connectionRef.set(connection); return Mono.just((ClientHttpResponse) clientResponse); }) .next() + .doFinally(signal -> { + ReactorClientHttpResponse response = responseRef.get(); + if (response != null) { + clearChannelAttribute(response.connection); + } + }) .doOnCancel(() -> { ReactorClientHttpResponse response = responseRef.get(); if (response != null) { @@ -180,6 +187,10 @@ public Mono connect(HttpMethod method, URI uri, }); } + private static void clearChannelAttribute(Connection connection) { + connection.channel().attr(ATTRIBUTES_KEY).set(null); + } + private static HttpClient.RequestSender setUri(HttpClient.RequestSender requestSender, URI uri) { if (uri.isAbsolute()) { try { diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java index 2e40a36c1fcf..871fe79fe952 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java @@ -1323,7 +1323,6 @@ void retrieveTextDecodedToFlux(ClientHttpConnector connector) throws IOException .verify(Duration.ofSeconds(3)); } - @Disabled("Disabled because it's flaky (gh-36589)") @Test // gh-36158 void reactorNettyAttributes() throws IOException { startServer(new ReactorClientHttpConnector()); @@ -1334,14 +1333,17 @@ void reactorNettyAttributes() throws IOException { AtomicReference channelRef = new AtomicReference<>(); Mono result = this.webClient.get().uri("/greeting") - .httpRequest(request -> { - HttpClientRequest reactorRequest = request.getNativeRequest(); - channelRef.set(((ChannelOperations) reactorRequest).channel()); - }) - .retrieve() - .bodyToMono(String.class); - - StepVerifier.create(result).expectNext("Hello Spring!").expectComplete().verify(Duration.ofSeconds(3)); + .httpRequest(request -> { + HttpClientRequest reactorRequest = request.getNativeRequest(); + channelRef.set(((ChannelOperations) reactorRequest).channel()); + }) + .retrieve() + .bodyToMono(String.class); + + StepVerifier.create(result) + .expectNext("Hello Spring!") + .expectComplete() + .verify(Duration.ofSeconds(3)); assertThat(channelRef.get().attr(ReactorClientHttpConnector.ATTRIBUTES_KEY).get()).isNull(); }