Skip to content

Commit 67897b3

Browse files
committed
Add idle body write detection to Netty HTTP client
1 parent d97564a commit 67897b3

7 files changed

Lines changed: 354 additions & 4 deletions

File tree

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "bugfix",
3+
"category": "Netty NIO HTTP Client",
4+
"contributor": "",
5+
"description": "Added idle body write detection to proactively close connections when no request body data is written within the write timeout period."
6+
}

http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/HandlerRemovingChannelPoolListener.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import io.netty.channel.Channel;
2121
import io.netty.channel.ChannelHandler;
22+
import io.netty.handler.timeout.IdleStateHandler;
2223
import io.netty.handler.timeout.ReadTimeoutHandler;
2324
import io.netty.handler.timeout.WriteTimeoutHandler;
2425
import software.amazon.awssdk.annotations.SdkInternalApi;
@@ -52,7 +53,9 @@ public void channelReleased(Channel channel) {
5253
FlushOnReadHandler.class,
5354
ResponseHandler.class,
5455
ReadTimeoutHandler.class,
55-
WriteTimeoutHandler.class);
56+
WriteTimeoutHandler.class,
57+
IdleStateHandler.class,
58+
WriteIdleTimeoutHandler.class);
5659
}
5760
}
5861
}

http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/NettyRequestExecutor.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import io.netty.handler.codec.http.HttpMethod;
4040
import io.netty.handler.codec.http.HttpRequest;
4141
import io.netty.handler.codec.http.HttpVersion;
42+
import io.netty.handler.timeout.IdleStateHandler;
4243
import io.netty.handler.timeout.ReadTimeoutHandler;
4344
import io.netty.handler.timeout.WriteTimeoutHandler;
4445
import io.netty.util.Attribute;
@@ -238,14 +239,15 @@ private void makeRequest() {
238239
}
239240

240241
private void writeRequest(HttpRequest request) {
241-
channel.pipeline().addFirst(new WriteTimeoutHandler(context.configuration().writeTimeoutMillis(),
242-
TimeUnit.MILLISECONDS));
242+
addWriteTimeoutHandlers();
243243
StreamedRequest streamedRequest = new StreamedRequest(request,
244244
context.executeRequest().requestContentPublisher());
245+
245246
channel.writeAndFlush(streamedRequest)
246247
.addListener(wireCall -> {
247248
// Done writing, so remove the write timeout handler and the idle body write detection handlers
248-
ChannelUtils.removeIfExists(channel.pipeline(), WriteTimeoutHandler.class);
249+
ChannelUtils.removeIfExists(channel.pipeline(), WriteTimeoutHandler.class,
250+
IdleStateHandler.class, WriteIdleTimeoutHandler.class);
249251
if (wireCall.isSuccess()) {
250252
NettyRequestMetrics.publishHttp2StreamMetrics(context.metricCollector(), channel);
251253

@@ -281,6 +283,17 @@ private void writeRequest(HttpRequest request) {
281283
}
282284
}
283285

286+
private void addWriteTimeoutHandlers() {
287+
channel.pipeline().addFirst(new WriteTimeoutHandler(context.configuration().writeTimeoutMillis(),
288+
TimeUnit.MILLISECONDS));
289+
String httpStreamsName = channel.pipeline().context(HttpStreamsClientHandler.class).name();
290+
channel.pipeline().addBefore(httpStreamsName, null,
291+
new IdleStateHandler(0, context.configuration().writeTimeoutMillis(), 0,
292+
TimeUnit.MILLISECONDS));
293+
channel.pipeline().addBefore(httpStreamsName, null,
294+
new WriteIdleTimeoutHandler());
295+
}
296+
284297
/**
285298
* It should explicitly trigger Read for the following situations:
286299
*
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.http.nio.netty.internal;
17+
18+
import static software.amazon.awssdk.http.nio.netty.internal.utils.NettyUtils.errorMessageWithChannelDiagnostics;
19+
20+
import io.netty.channel.ChannelDuplexHandler;
21+
import io.netty.channel.ChannelHandlerContext;
22+
import io.netty.handler.timeout.IdleState;
23+
import io.netty.handler.timeout.IdleStateEvent;
24+
import java.io.IOException;
25+
import software.amazon.awssdk.annotations.SdkInternalApi;
26+
27+
/**
28+
* Handles writer idle events from IdleStateHandler to detect idle body write gaps.
29+
*/
30+
@SdkInternalApi
31+
public final class WriteIdleTimeoutHandler extends ChannelDuplexHandler {
32+
private boolean closed;
33+
34+
@Override
35+
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
36+
if (evt instanceof IdleStateEvent) {
37+
IdleStateEvent e = (IdleStateEvent) evt;
38+
if (e.state() == IdleState.WRITER_IDLE) {
39+
writeTimeout(ctx);
40+
}
41+
}
42+
super.userEventTriggered(ctx, evt);
43+
}
44+
45+
private void writeTimeout(ChannelHandlerContext ctx) throws Exception {
46+
if (!closed) {
47+
IOException exception = new IOException(
48+
errorMessageWithChannelDiagnostics(ctx.channel(), "No data was written to the request body for the configured "
49+
+ "write timeout duration. "
50+
+ "This can occur if the request body publisher is slow to "
51+
+ "produce data, "
52+
+ "for example when using AsyncRequestBody.fromInputStream() "
53+
+ "with an executor "
54+
+ "that has fewer threads than concurrent requests. "
55+
+ "If applicable, consider increasing the executor's thread "
56+
+ "pool size or "
57+
+ "investigating what is preventing the request body from "
58+
+ "being written."));
59+
ctx.fireExceptionCaught(exception);
60+
ctx.close();
61+
closed = true;
62+
}
63+
}
64+
}
Lines changed: 198 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,198 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.http.nio.netty.fault;
17+
18+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
19+
import static software.amazon.awssdk.http.SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES;
20+
21+
import io.netty.bootstrap.ServerBootstrap;
22+
import io.netty.channel.Channel;
23+
import io.netty.channel.ChannelInitializer;
24+
import io.netty.channel.ChannelPipeline;
25+
import io.netty.channel.nio.NioEventLoopGroup;
26+
import io.netty.channel.socket.ServerSocketChannel;
27+
import io.netty.channel.socket.nio.NioServerSocketChannel;
28+
import io.netty.handler.ssl.SslContext;
29+
import io.netty.handler.ssl.SslContextBuilder;
30+
import io.netty.handler.ssl.util.SelfSignedCertificate;
31+
import java.io.IOException;
32+
import java.nio.ByteBuffer;
33+
import java.time.Duration;
34+
import java.util.Optional;
35+
import java.util.concurrent.CompletableFuture;
36+
import java.util.concurrent.ExecutionException;
37+
import java.util.concurrent.TimeUnit;
38+
import java.util.concurrent.TimeoutException;
39+
import org.junit.jupiter.api.AfterEach;
40+
import org.junit.jupiter.api.BeforeEach;
41+
import org.junit.jupiter.api.Test;
42+
import org.reactivestreams.Publisher;
43+
import org.reactivestreams.Subscriber;
44+
import org.reactivestreams.Subscription;
45+
import software.amazon.awssdk.http.Protocol;
46+
import software.amazon.awssdk.http.SdkHttpFullRequest;
47+
import software.amazon.awssdk.http.SdkHttpMethod;
48+
import software.amazon.awssdk.http.SdkHttpResponse;
49+
import software.amazon.awssdk.http.async.AsyncExecuteRequest;
50+
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
51+
import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler;
52+
import software.amazon.awssdk.http.async.SdkHttpContentPublisher;
53+
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
54+
import software.amazon.awssdk.http.nio.netty.SdkEventLoopGroup;
55+
import software.amazon.awssdk.utils.AttributeMap;
56+
57+
/**
58+
* Tests that the write idle timeout handler detects when no request body data is written
59+
* and proactively closes the connection.
60+
*/
61+
public class WriteIdleTimeoutTest {
62+
63+
private SdkAsyncHttpClient netty;
64+
private Server server;
65+
66+
@BeforeEach
67+
public void setup() throws Exception {
68+
server = new Server();
69+
server.init();
70+
71+
netty = NettyNioAsyncHttpClient.builder()
72+
.writeTimeout(Duration.ofMillis(500))
73+
.readTimeout(Duration.ofSeconds(5))
74+
.eventLoopGroup(SdkEventLoopGroup.builder().numberOfThreads(2).build())
75+
.protocol(Protocol.HTTP1_1)
76+
.buildWithDefaults(AttributeMap.builder()
77+
.put(TRUST_ALL_CERTIFICATES, true)
78+
.build());
79+
}
80+
81+
@AfterEach
82+
public void teardown() throws InterruptedException {
83+
if (server != null) {
84+
server.shutdown();
85+
}
86+
if (netty != null) {
87+
netty.close();
88+
}
89+
}
90+
91+
/**
92+
* A request body publisher that subscribes but never produces any data, simulating
93+
* the thread starvation scenario from the customer issue.
94+
*/
95+
@Test
96+
public void stalledBodyPublisher_shouldTriggerWriteIdleTimeout() throws InterruptedException, TimeoutException {
97+
SdkHttpFullRequest request = SdkHttpFullRequest.builder()
98+
.method(SdkHttpMethod.PUT)
99+
.protocol("https")
100+
.host("localhost")
101+
.port(server.port())
102+
.putHeader("Content-Length", "1024")
103+
.build();
104+
105+
CompletableFuture<Void> future = sendRequest(request, new NeverWritesContentPublisher(1024));
106+
107+
assertThatThrownBy(() -> future.get(5, TimeUnit.SECONDS))
108+
.hasCauseInstanceOf(IOException.class)
109+
.hasStackTraceContaining("No data was written to the request body");
110+
}
111+
112+
private CompletableFuture<Void> sendRequest(SdkHttpFullRequest request, SdkHttpContentPublisher contentPublisher) {
113+
return netty.execute(AsyncExecuteRequest.builder()
114+
.responseHandler(new SdkAsyncHttpResponseHandler() {
115+
@Override
116+
public void onHeaders(SdkHttpResponse headers) {
117+
}
118+
119+
@Override
120+
public void onStream(Publisher<ByteBuffer> stream) {
121+
}
122+
123+
@Override
124+
public void onError(Throwable error) {
125+
}
126+
})
127+
.request(request)
128+
.requestContentPublisher(contentPublisher)
129+
.build());
130+
}
131+
132+
/**
133+
* A content publisher that accepts a subscription but never calls onNext/onComplete,
134+
* simulating a stalled body write.
135+
*/
136+
private static class NeverWritesContentPublisher implements SdkHttpContentPublisher {
137+
private final long contentLength;
138+
139+
NeverWritesContentPublisher(long contentLength) {
140+
this.contentLength = contentLength;
141+
}
142+
143+
@Override
144+
public Optional<Long> contentLength() {
145+
return Optional.of(contentLength);
146+
}
147+
148+
@Override
149+
public void subscribe(Subscriber<? super ByteBuffer> s) {
150+
// Request subscription but never produce data
151+
s.onSubscribe(new Subscription() {
152+
@Override
153+
public void request(long n) {
154+
// intentionally do nothing
155+
}
156+
157+
@Override
158+
public void cancel() {
159+
}
160+
});
161+
}
162+
}
163+
164+
private static class Server extends ChannelInitializer<Channel> {
165+
private ServerBootstrap bootstrap;
166+
private ServerSocketChannel serverSock;
167+
private final NioEventLoopGroup group = new NioEventLoopGroup();
168+
private SslContext sslCtx;
169+
170+
void init() throws Exception {
171+
SelfSignedCertificate ssc = new SelfSignedCertificate();
172+
sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
173+
174+
bootstrap = new ServerBootstrap()
175+
.channel(NioServerSocketChannel.class)
176+
.group(group)
177+
.childHandler(this);
178+
179+
serverSock = (ServerSocketChannel) bootstrap.bind(0).sync().channel();
180+
}
181+
182+
@Override
183+
protected void initChannel(Channel ch) {
184+
ChannelPipeline pipeline = ch.pipeline();
185+
pipeline.addLast(sslCtx.newHandler(ch.alloc()));
186+
}
187+
188+
void shutdown() throws InterruptedException {
189+
group.shutdownGracefully().await();
190+
serverSock.close();
191+
}
192+
193+
int port() {
194+
return serverSock.localAddress().getPort();
195+
}
196+
197+
}
198+
}

http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/internal/HandlerRemovingChannelPoolListenerTest.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import io.netty.channel.nio.NioEventLoopGroup;
2727
import io.netty.handler.logging.LogLevel;
2828
import io.netty.handler.logging.LoggingHandler;
29+
import io.netty.handler.timeout.IdleStateHandler;
2930
import io.netty.handler.timeout.ReadTimeoutHandler;
3031
import io.netty.handler.timeout.WriteTimeoutHandler;
3132
import org.junit.After;
@@ -71,6 +72,8 @@ public void setup() throws Exception {
7172
pipeline.addLast(ResponseHandler.getInstance());
7273
pipeline.addLast(new ReadTimeoutHandler(10));
7374
pipeline.addLast(new WriteTimeoutHandler(10));
75+
pipeline.addLast(new IdleStateHandler(0, 10, 0));
76+
pipeline.addLast(new WriteIdleTimeoutHandler());
7477
handler = HandlerRemovingChannelPoolListener.create();
7578
}
7679

@@ -111,12 +114,16 @@ private void assertHandlersRemoved() {
111114
assertThat(pipeline.get(ResponseHandler.class)).isNull();
112115
assertThat(pipeline.get(ReadTimeoutHandler.class)).isNull();
113116
assertThat(pipeline.get(WriteTimeoutHandler.class)).isNull();
117+
assertThat(pipeline.get(IdleStateHandler.class)).isNull();
118+
assertThat(pipeline.get(WriteIdleTimeoutHandler.class)).isNull();
114119
}
115120

116121
private void assertHandlersNotRemoved() {
117122
assertThat(pipeline.get(HttpStreamsClientHandler.class)).isNotNull();
118123
assertThat(pipeline.get(ResponseHandler.class)).isNotNull();
119124
assertThat(pipeline.get(ReadTimeoutHandler.class)).isNotNull();
120125
assertThat(pipeline.get(WriteTimeoutHandler.class)).isNotNull();
126+
assertThat(pipeline.get(IdleStateHandler.class)).isNotNull();
127+
assertThat(pipeline.get(WriteIdleTimeoutHandler.class)).isNotNull();
121128
}
122129
}

0 commit comments

Comments
 (0)