Skip to content

Commit 29ecbf0

Browse files
authored
Add idle body write detection to Netty HTTP client (#6844)
* Add idle body write detection to Netty HTTP client * Address feedback * Address feedback
1 parent f608643 commit 29ecbf0

8 files changed

Lines changed: 382 additions & 9 deletions

File tree

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "bugfix",
3+
"category": "Netty NIO HTTP Client",
4+
"contributor": "",
5+
"description": "Added idle body write detection to proactively close connections when no request body data is written within the write timeout period."
6+
}

http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/HandlerRemovingChannelPoolListener.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
package software.amazon.awssdk.http.nio.netty.internal;
1717

18+
import static software.amazon.awssdk.http.nio.netty.internal.utils.ChannelUtils.WRITE_IDLE_STATE_HANDLER_NAME;
1819
import static software.amazon.awssdk.http.nio.netty.internal.utils.ChannelUtils.removeIfExists;
1920

2021
import io.netty.channel.Channel;
@@ -52,7 +53,9 @@ public void channelReleased(Channel channel) {
5253
FlushOnReadHandler.class,
5354
ResponseHandler.class,
5455
ReadTimeoutHandler.class,
55-
WriteTimeoutHandler.class);
56+
WriteTimeoutHandler.class,
57+
WriteIdleTimeoutHandler.class);
58+
removeIfExists(channel.pipeline(), WRITE_IDLE_STATE_HANDLER_NAME);
5659
}
5760
}
5861
}

http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/NettyRequestExecutor.java

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.RESPONSE_DATA_READ;
2828
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.STREAMING_COMPLETE_KEY;
2929
import static software.amazon.awssdk.http.nio.netty.internal.NettyRequestMetrics.measureTimeTaken;
30+
import static software.amazon.awssdk.http.nio.netty.internal.utils.ChannelUtils.HTTP_STREAMS_HANDLER_NAME;
31+
import static software.amazon.awssdk.http.nio.netty.internal.utils.ChannelUtils.WRITE_IDLE_STATE_HANDLER_NAME;
3032

3133
import io.netty.buffer.ByteBuf;
3234
import io.netty.buffer.Unpooled;
@@ -39,6 +41,7 @@
3941
import io.netty.handler.codec.http.HttpMethod;
4042
import io.netty.handler.codec.http.HttpRequest;
4143
import io.netty.handler.codec.http.HttpVersion;
44+
import io.netty.handler.timeout.IdleStateHandler;
4245
import io.netty.handler.timeout.ReadTimeoutHandler;
4346
import io.netty.handler.timeout.WriteTimeoutHandler;
4447
import io.netty.util.Attribute;
@@ -221,7 +224,7 @@ private void configurePipeline() throws IOException {
221224
if (protocol == Protocol.HTTP2) {
222225
pipeline.addLast(FlushOnReadHandler.getInstance());
223226
}
224-
pipeline.addLast(new HttpStreamsClientHandler());
227+
pipeline.addLast(HTTP_STREAMS_HANDLER_NAME, new HttpStreamsClientHandler());
225228
pipeline.addLast(ResponseHandler.getInstance());
226229

227230
// It's possible that the channel could become inactive between checking it out from the pool, and adding our response
@@ -238,14 +241,14 @@ private void makeRequest() {
238241
}
239242

240243
private void writeRequest(HttpRequest request) {
241-
channel.pipeline().addFirst(new WriteTimeoutHandler(context.configuration().writeTimeoutMillis(),
242-
TimeUnit.MILLISECONDS));
244+
addWriteTimeoutHandlers();
243245
StreamedRequest streamedRequest = new StreamedRequest(request,
244246
context.executeRequest().requestContentPublisher());
247+
245248
channel.writeAndFlush(streamedRequest)
246249
.addListener(wireCall -> {
247250
// Done writing so remove the idle write timeout handler
248-
ChannelUtils.removeIfExists(channel.pipeline(), WriteTimeoutHandler.class);
251+
removeWriteTimeoutHandlers();
249252
if (wireCall.isSuccess()) {
250253
NettyRequestMetrics.publishHttp2StreamMetrics(context.metricCollector(), channel);
251254

@@ -270,7 +273,7 @@ private void writeRequest(HttpRequest request) {
270273
// Add before HttpStreamsClientHandler so that raw TLS handshake bytes cannot
271274
// prematurely remove this one-time handler. See Expect100ContinueReadTimeoutTest.
272275
channel.pipeline().addBefore(
273-
channel.pipeline().context(HttpStreamsClientHandler.class).name(), null,
276+
HTTP_STREAMS_HANDLER_NAME, null,
274277
new OneTimeReadTimeoutHandler(Duration.ofMillis(context.configuration().readTimeoutMillis())));
275278
} else {
276279
channel.pipeline().addFirst(new ReadTimeoutHandler(context.configuration().readTimeoutMillis(),
@@ -281,6 +284,22 @@ private void writeRequest(HttpRequest request) {
281284
}
282285
}
283286

287+
private void removeWriteTimeoutHandlers() {
288+
ChannelUtils.removeIfExists(channel.pipeline(), WriteTimeoutHandler.class,
289+
WriteIdleTimeoutHandler.class);
290+
ChannelUtils.removeIfExists(channel.pipeline(), WRITE_IDLE_STATE_HANDLER_NAME);
291+
}
292+
293+
private void addWriteTimeoutHandlers() {
294+
channel.pipeline().addFirst(new WriteTimeoutHandler(context.configuration().writeTimeoutMillis(),
295+
TimeUnit.MILLISECONDS));
296+
channel.pipeline().addBefore(HTTP_STREAMS_HANDLER_NAME, WRITE_IDLE_STATE_HANDLER_NAME,
297+
new IdleStateHandler(0, context.configuration().writeTimeoutMillis(), 0,
298+
TimeUnit.MILLISECONDS));
299+
channel.pipeline().addBefore(HTTP_STREAMS_HANDLER_NAME, null,
300+
new WriteIdleTimeoutHandler(context.configuration().writeTimeoutMillis()));
301+
}
302+
284303
/**
285304
* It should explicitly trigger Read for the following situations:
286305
*
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.http.nio.netty.internal;
17+
18+
import static software.amazon.awssdk.http.nio.netty.internal.utils.NettyUtils.errorMessageWithChannelDiagnostics;
19+
20+
import io.netty.channel.ChannelDuplexHandler;
21+
import io.netty.channel.ChannelHandlerContext;
22+
import io.netty.handler.timeout.IdleState;
23+
import io.netty.handler.timeout.IdleStateEvent;
24+
import java.io.IOException;
25+
import software.amazon.awssdk.annotations.SdkInternalApi;
26+
27+
/**
28+
* Handles writer idle events from IdleStateHandler to detect idle body write gaps.
29+
*/
30+
@SdkInternalApi
31+
public final class WriteIdleTimeoutHandler extends ChannelDuplexHandler {
32+
private final long timeoutMillis;
33+
private boolean closed;
34+
35+
public WriteIdleTimeoutHandler(long timeoutMillis) {
36+
this.timeoutMillis = timeoutMillis;
37+
}
38+
39+
@Override
40+
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
41+
if (evt instanceof IdleStateEvent) {
42+
IdleStateEvent e = (IdleStateEvent) evt;
43+
if (e.state() == IdleState.WRITER_IDLE) {
44+
writeTimeout(ctx);
45+
}
46+
}
47+
super.userEventTriggered(ctx, evt);
48+
}
49+
50+
private void writeTimeout(ChannelHandlerContext ctx) {
51+
if (!closed) {
52+
String message = String.format("No data was written to the request body within %dms. "
53+
+ "This can occur if the request body publisher is not producing data, "
54+
+ "for example when using AsyncRequestBody.fromInputStream() with an "
55+
+ "executor that has fewer threads than concurrent requests. "
56+
+ "Verify that the request body publisher is sending data or "
57+
+ "investigate why it may be blocked.", timeoutMillis);
58+
ctx.fireExceptionCaught(new IOException(errorMessageWithChannelDiagnostics(ctx.channel(), message)));
59+
ctx.close();
60+
closed = true;
61+
}
62+
}
63+
}

http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/utils/ChannelUtils.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@
2626

2727
@SdkInternalApi
2828
public final class ChannelUtils {
29+
public static final String WRITE_IDLE_STATE_HANDLER_NAME = "WriteIdleStateHandler";
30+
public static final String HTTP_STREAMS_HANDLER_NAME = "HttpStreamsClientHandler";
31+
2932
private ChannelUtils() {
3033
}
3134

@@ -50,6 +53,20 @@ public static void removeIfExists(ChannelPipeline pipeline, Class<? extends Chan
5053
}
5154
}
5255

56+
public static void removeIfExists(ChannelPipeline pipeline, String... handlers) {
57+
for (String handler: handlers) {
58+
if (pipeline.get(handler) != null) {
59+
try {
60+
pipeline.remove(handler);
61+
} catch (NoSuchElementException exception) {
62+
// There could still be race condition when channel gets
63+
// closed right after removeIfExists is invoked. Ignoring
64+
// NoSuchElementException for that edge case.
65+
}
66+
}
67+
}
68+
}
69+
5370
/**
5471
* Retrieve optional attribute of the channel
5572
*
Lines changed: 198 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,198 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.http.nio.netty.fault;
17+
18+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
19+
import static software.amazon.awssdk.http.SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES;
20+
21+
import io.netty.bootstrap.ServerBootstrap;
22+
import io.netty.channel.Channel;
23+
import io.netty.channel.ChannelInitializer;
24+
import io.netty.channel.ChannelPipeline;
25+
import io.netty.channel.nio.NioEventLoopGroup;
26+
import io.netty.channel.socket.ServerSocketChannel;
27+
import io.netty.channel.socket.nio.NioServerSocketChannel;
28+
import io.netty.handler.ssl.SslContext;
29+
import io.netty.handler.ssl.SslContextBuilder;
30+
import io.netty.handler.ssl.util.SelfSignedCertificate;
31+
import java.io.IOException;
32+
import java.nio.ByteBuffer;
33+
import java.time.Duration;
34+
import java.util.Optional;
35+
import java.util.concurrent.CompletableFuture;
36+
import java.util.concurrent.ExecutionException;
37+
import java.util.concurrent.TimeUnit;
38+
import java.util.concurrent.TimeoutException;
39+
import org.junit.jupiter.api.AfterEach;
40+
import org.junit.jupiter.api.BeforeEach;
41+
import org.junit.jupiter.api.Test;
42+
import org.reactivestreams.Publisher;
43+
import org.reactivestreams.Subscriber;
44+
import org.reactivestreams.Subscription;
45+
import software.amazon.awssdk.http.Protocol;
46+
import software.amazon.awssdk.http.SdkHttpFullRequest;
47+
import software.amazon.awssdk.http.SdkHttpMethod;
48+
import software.amazon.awssdk.http.SdkHttpResponse;
49+
import software.amazon.awssdk.http.async.AsyncExecuteRequest;
50+
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
51+
import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler;
52+
import software.amazon.awssdk.http.async.SdkHttpContentPublisher;
53+
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
54+
import software.amazon.awssdk.http.nio.netty.SdkEventLoopGroup;
55+
import software.amazon.awssdk.utils.AttributeMap;
56+
57+
/**
58+
* Tests that the write idle timeout handler detects when no request body data is written
59+
* and proactively closes the connection.
60+
*/
61+
public class WriteIdleTimeoutTest {
62+
63+
private SdkAsyncHttpClient netty;
64+
private Server server;
65+
66+
@BeforeEach
67+
public void setup() throws Exception {
68+
server = new Server();
69+
server.init();
70+
71+
netty = NettyNioAsyncHttpClient.builder()
72+
.writeTimeout(Duration.ofMillis(500))
73+
.readTimeout(Duration.ofSeconds(5))
74+
.eventLoopGroup(SdkEventLoopGroup.builder().numberOfThreads(2).build())
75+
.protocol(Protocol.HTTP1_1)
76+
.buildWithDefaults(AttributeMap.builder()
77+
.put(TRUST_ALL_CERTIFICATES, true)
78+
.build());
79+
}
80+
81+
@AfterEach
82+
public void teardown() throws InterruptedException {
83+
if (server != null) {
84+
server.shutdown();
85+
}
86+
if (netty != null) {
87+
netty.close();
88+
}
89+
}
90+
91+
/**
92+
* A request body publisher that subscribes but never produces any data, simulating
93+
* the thread starvation scenario from the customer issue.
94+
*/
95+
@Test
96+
public void stalledBodyPublisher_shouldTriggerWriteIdleTimeout() throws InterruptedException, TimeoutException {
97+
SdkHttpFullRequest request = SdkHttpFullRequest.builder()
98+
.method(SdkHttpMethod.PUT)
99+
.protocol("https")
100+
.host("localhost")
101+
.port(server.port())
102+
.putHeader("Content-Length", "1024")
103+
.build();
104+
105+
CompletableFuture<Void> future = sendRequest(request, new NeverWritesContentPublisher(1024));
106+
107+
assertThatThrownBy(() -> future.get(5, TimeUnit.SECONDS))
108+
.hasCauseInstanceOf(IOException.class)
109+
.hasStackTraceContaining("No data was written to the request body within 500ms");
110+
}
111+
112+
private CompletableFuture<Void> sendRequest(SdkHttpFullRequest request, SdkHttpContentPublisher contentPublisher) {
113+
return netty.execute(AsyncExecuteRequest.builder()
114+
.responseHandler(new SdkAsyncHttpResponseHandler() {
115+
@Override
116+
public void onHeaders(SdkHttpResponse headers) {
117+
}
118+
119+
@Override
120+
public void onStream(Publisher<ByteBuffer> stream) {
121+
}
122+
123+
@Override
124+
public void onError(Throwable error) {
125+
}
126+
})
127+
.request(request)
128+
.requestContentPublisher(contentPublisher)
129+
.build());
130+
}
131+
132+
/**
133+
* A content publisher that accepts a subscription but never calls onNext/onComplete,
134+
* simulating a stalled body write.
135+
*/
136+
private static class NeverWritesContentPublisher implements SdkHttpContentPublisher {
137+
private final long contentLength;
138+
139+
NeverWritesContentPublisher(long contentLength) {
140+
this.contentLength = contentLength;
141+
}
142+
143+
@Override
144+
public Optional<Long> contentLength() {
145+
return Optional.of(contentLength);
146+
}
147+
148+
@Override
149+
public void subscribe(Subscriber<? super ByteBuffer> s) {
150+
// Request subscription but never produce data
151+
s.onSubscribe(new Subscription() {
152+
@Override
153+
public void request(long n) {
154+
// intentionally do nothing
155+
}
156+
157+
@Override
158+
public void cancel() {
159+
}
160+
});
161+
}
162+
}
163+
164+
private static class Server extends ChannelInitializer<Channel> {
165+
private ServerBootstrap bootstrap;
166+
private ServerSocketChannel serverSock;
167+
private final NioEventLoopGroup group = new NioEventLoopGroup();
168+
private SslContext sslCtx;
169+
170+
void init() throws Exception {
171+
SelfSignedCertificate ssc = new SelfSignedCertificate();
172+
sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
173+
174+
bootstrap = new ServerBootstrap()
175+
.channel(NioServerSocketChannel.class)
176+
.group(group)
177+
.childHandler(this);
178+
179+
serverSock = (ServerSocketChannel) bootstrap.bind(0).sync().channel();
180+
}
181+
182+
@Override
183+
protected void initChannel(Channel ch) {
184+
ChannelPipeline pipeline = ch.pipeline();
185+
pipeline.addLast(sslCtx.newHandler(ch.alloc()));
186+
}
187+
188+
void shutdown() throws InterruptedException {
189+
group.shutdownGracefully().await();
190+
serverSock.close();
191+
}
192+
193+
int port() {
194+
return serverSock.localAddress().getPort();
195+
}
196+
197+
}
198+
}

0 commit comments

Comments
 (0)