Skip to content

Commit 95441ae

Browse files
committed
Merge branch '4.3.x'
2 parents 0855dd6 + 127f659 commit 95441ae

2 files changed

Lines changed: 50 additions & 0 deletions

File tree

spring-cloud-netflix-eureka-client/src/main/java/org/springframework/cloud/netflix/eureka/http/WebClientTransportClientFactory.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,13 @@
2424
import com.netflix.discovery.shared.transport.TransportClientFactory;
2525
import reactor.core.publisher.Flux;
2626
import reactor.core.publisher.Mono;
27+
import reactor.netty.http.client.HttpClient;
28+
import reactor.netty.resources.ConnectionProvider;
29+
import reactor.netty.resources.LoopResources;
2730

2831
import org.springframework.http.HttpStatus;
2932
import org.springframework.http.MediaType;
33+
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
3034
import org.springframework.http.codec.ClientCodecConfigurer;
3135
import org.springframework.http.codec.json.Jackson2JsonDecoder;
3236
import org.springframework.http.codec.json.Jackson2JsonEncoder;
@@ -54,8 +58,14 @@ public class WebClientTransportClientFactory implements TransportClientFactory {
5458

5559
private final Supplier<WebClient.Builder> builderSupplier;
5660

61+
private final ConnectionProvider connectionProvider;
62+
63+
private final LoopResources loopResources;
64+
5765
public WebClientTransportClientFactory(Supplier<WebClient.Builder> builderSupplier) {
5866
this.builderSupplier = builderSupplier;
67+
this.connectionProvider = ConnectionProvider.create("eureka-webclient");
68+
this.loopResources = LoopResources.create("eureka-webclient");
5969
}
6070

6171
@Override
@@ -65,6 +75,11 @@ public EurekaHttpClient newClient(EurekaEndpoint endpoint) {
6575
setUrl(builder, endpoint.getServiceUrl());
6676
setCodecs(builder);
6777
builder.filter(http4XxErrorExchangeFilterFunction());
78+
// Use dedicated Reactor Netty resources independent of the reactive web server
79+
// to prevent RejectedExecutionException during graceful shutdown when the
80+
// server's event loop terminates before DiscoveryClient deregisters.
81+
builder.clientConnector(
82+
new ReactorClientHttpConnector(HttpClient.create(this.connectionProvider).runOn(this.loopResources)));
6883
return new WebClientEurekaHttpClient(builder.build());
6984
}
7085

@@ -113,6 +128,8 @@ private ExchangeFilterFunction http4XxErrorExchangeFilterFunction() {
113128

114129
@Override
115130
public void shutdown() {
131+
this.connectionProvider.dispose();
132+
this.loopResources.dispose();
116133
}
117134

118135
}

spring-cloud-netflix-eureka-client/src/test/java/org/springframework/cloud/netflix/eureka/http/WebClientEurekaHttpClientTests.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,25 @@
1818

1919
import com.netflix.appinfo.providers.EurekaConfigBasedInstanceInfoProvider;
2020
import com.netflix.discovery.shared.resolver.DefaultEndpoint;
21+
import com.netflix.discovery.shared.transport.EurekaHttpClient;
2122
import org.junit.jupiter.api.BeforeEach;
23+
import org.junit.jupiter.api.Test;
24+
import reactor.netty.http.client.HttpClient;
25+
import reactor.netty.resources.LoopResources;
2226

2327
import org.springframework.beans.factory.annotation.Autowired;
2428
import org.springframework.beans.factory.annotation.Value;
2529
import org.springframework.boot.test.context.SpringBootTest;
2630
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
2731
import org.springframework.cloud.commons.util.InetUtils;
2832
import org.springframework.cloud.netflix.eureka.EurekaInstanceConfigBean;
33+
import org.springframework.http.HttpStatus;
34+
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
2935
import org.springframework.test.annotation.DirtiesContext;
3036
import org.springframework.web.reactive.function.client.WebClient;
3137

38+
import static org.assertj.core.api.Assertions.assertThat;
39+
3240
/**
3341
* @author Daniel Lavoie
3442
*/
@@ -65,4 +73,29 @@ void setup() {
6573
info = new EurekaConfigBasedInstanceInfoProvider(config).get();
6674
}
6775

76+
@Test
77+
void cancelSucceedsAfterSharedReactorResourcesDisposed() {
78+
// Simulate the LoopResources shared between the reactive web server and a
79+
// WebClient (as Spring Boot's ReactorResourceFactory provides by default)
80+
LoopResources sharedResources = LoopResources.create("simulated-server");
81+
WebClient.Builder sharedBuilder = WebClient.builder()
82+
.clientConnector(new ReactorClientHttpConnector(HttpClient.create().runOn(sharedResources)));
83+
84+
WebClientTransportClientFactory factory = new WebClientTransportClientFactory(() -> sharedBuilder);
85+
EurekaHttpClient client = factory.newClient(new DefaultEndpoint(serviceUrl));
86+
87+
// Simulate the reactive web server shutting down (terminates the shared event
88+
// loop)
89+
sharedResources.dispose();
90+
91+
// cancel() must succeed because the factory uses its own dedicated Reactor Netty
92+
// resources that are independent of the shared (now disposed) web server
93+
// resources.
94+
// Without the fix this would throw RejectedExecutionException: event executor
95+
// terminated
96+
assertThat(client.cancel("test", "test").getStatusCode()).isEqualTo(HttpStatus.OK.value());
97+
98+
factory.shutdown();
99+
}
100+
68101
}

0 commit comments

Comments
 (0)