Skip to content

Commit 03c9ac1

Browse files
committed
Add idle body write detection to Netty HTTP client
1 parent d97564a commit 03c9ac1

File tree

8 files changed

+376
-4
lines changed

8 files changed

+376
-4
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "bugfix",
3+
"category": "Netty NIO HTTP Client",
4+
"contributor": "",
5+
"description": "Added idle body write detection to proactively close connections when no request body data is written within the write timeout period."
6+
}

http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/HandlerRemovingChannelPoolListener.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
package software.amazon.awssdk.http.nio.netty.internal;
1717

18+
import static software.amazon.awssdk.http.nio.netty.internal.utils.ChannelUtils.WRITE_IDLE_STATE_HANDLER_NAME;
1819
import static software.amazon.awssdk.http.nio.netty.internal.utils.ChannelUtils.removeIfExists;
1920

2021
import io.netty.channel.Channel;
@@ -52,7 +53,9 @@ public void channelReleased(Channel channel) {
5253
FlushOnReadHandler.class,
5354
ResponseHandler.class,
5455
ReadTimeoutHandler.class,
55-
WriteTimeoutHandler.class);
56+
WriteTimeoutHandler.class,
57+
WriteIdleTimeoutHandler.class);
58+
removeIfExists(channel.pipeline(), WRITE_IDLE_STATE_HANDLER_NAME);
5659
}
5760
}
5861
}

http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/NettyRequestExecutor.java

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.RESPONSE_DATA_READ;
2828
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.STREAMING_COMPLETE_KEY;
2929
import static software.amazon.awssdk.http.nio.netty.internal.NettyRequestMetrics.measureTimeTaken;
30+
import static software.amazon.awssdk.http.nio.netty.internal.utils.ChannelUtils.WRITE_IDLE_STATE_HANDLER_NAME;
3031

3132
import io.netty.buffer.ByteBuf;
3233
import io.netty.buffer.Unpooled;
@@ -39,6 +40,7 @@
3940
import io.netty.handler.codec.http.HttpMethod;
4041
import io.netty.handler.codec.http.HttpRequest;
4142
import io.netty.handler.codec.http.HttpVersion;
43+
import io.netty.handler.timeout.IdleStateHandler;
4244
import io.netty.handler.timeout.ReadTimeoutHandler;
4345
import io.netty.handler.timeout.WriteTimeoutHandler;
4446
import io.netty.util.Attribute;
@@ -238,14 +240,14 @@ private void makeRequest() {
238240
}
239241

240242
private void writeRequest(HttpRequest request) {
241-
channel.pipeline().addFirst(new WriteTimeoutHandler(context.configuration().writeTimeoutMillis(),
242-
TimeUnit.MILLISECONDS));
243+
addWriteTimeoutHandlers();
243244
StreamedRequest streamedRequest = new StreamedRequest(request,
244245
context.executeRequest().requestContentPublisher());
246+
245247
channel.writeAndFlush(streamedRequest)
246248
.addListener(wireCall -> {
247249
// Done writing so remove the idle write timeout handler
248-
ChannelUtils.removeIfExists(channel.pipeline(), WriteTimeoutHandler.class);
250+
removeWriteTimeoutHandlers();
249251
if (wireCall.isSuccess()) {
250252
NettyRequestMetrics.publishHttp2StreamMetrics(context.metricCollector(), channel);
251253

@@ -281,6 +283,23 @@ private void writeRequest(HttpRequest request) {
281283
}
282284
}
283285

286+
private void removeWriteTimeoutHandlers() {
287+
ChannelUtils.removeIfExists(channel.pipeline(), WriteTimeoutHandler.class,
288+
WriteIdleTimeoutHandler.class);
289+
ChannelUtils.removeIfExists(channel.pipeline(), WRITE_IDLE_STATE_HANDLER_NAME);
290+
}
291+
292+
private void addWriteTimeoutHandlers() {
293+
channel.pipeline().addFirst(new WriteTimeoutHandler(context.configuration().writeTimeoutMillis(),
294+
TimeUnit.MILLISECONDS));
295+
String httpStreamsName = channel.pipeline().context(HttpStreamsClientHandler.class).name();
296+
channel.pipeline().addBefore(httpStreamsName, WRITE_IDLE_STATE_HANDLER_NAME,
297+
new IdleStateHandler(0, context.configuration().writeTimeoutMillis(), 0,
298+
TimeUnit.MILLISECONDS));
299+
channel.pipeline().addBefore(httpStreamsName, null,
300+
new WriteIdleTimeoutHandler());
301+
}
302+
284303
/**
285304
* It should explicitly trigger Read for the following situations:
286305
*
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.http.nio.netty.internal;
17+
18+
import static software.amazon.awssdk.http.nio.netty.internal.utils.NettyUtils.errorMessageWithChannelDiagnostics;
19+
20+
import io.netty.channel.ChannelDuplexHandler;
21+
import io.netty.channel.ChannelHandlerContext;
22+
import io.netty.handler.timeout.IdleState;
23+
import io.netty.handler.timeout.IdleStateEvent;
24+
import java.io.IOException;
25+
import software.amazon.awssdk.annotations.SdkInternalApi;
26+
27+
/**
28+
* Handles writer idle events from IdleStateHandler to detect idle body write gaps.
29+
*/
30+
@SdkInternalApi
31+
public final class WriteIdleTimeoutHandler extends ChannelDuplexHandler {
32+
private boolean closed;
33+
34+
@Override
35+
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
36+
if (evt instanceof IdleStateEvent) {
37+
IdleStateEvent e = (IdleStateEvent) evt;
38+
if (e.state() == IdleState.WRITER_IDLE) {
39+
writeTimeout(ctx);
40+
}
41+
}
42+
super.userEventTriggered(ctx, evt);
43+
}
44+
45+
private void writeTimeout(ChannelHandlerContext ctx) throws Exception {
46+
if (!closed) {
47+
IOException exception = new IOException(
48+
errorMessageWithChannelDiagnostics(ctx.channel(), "No data was written to the request body for the configured "
49+
+ "write timeout duration. "
50+
+ "This can occur if the request body publisher is slow to "
51+
+ "produce data, "
52+
+ "for example when using AsyncRequestBody.fromInputStream() "
53+
+ "with an executor "
54+
+ "that has fewer threads than concurrent requests. "
55+
+ "If applicable, consider increasing the executor's thread "
56+
+ "pool size or "
57+
+ "investigating what is preventing the request body from "
58+
+ "being written."));
59+
ctx.fireExceptionCaught(exception);
60+
ctx.close();
61+
closed = true;
62+
}
63+
}
64+
}

http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/utils/ChannelUtils.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626

2727
@SdkInternalApi
2828
public final class ChannelUtils {
29+
public static final String WRITE_IDLE_STATE_HANDLER_NAME = "WriteIdleStateHandler";
2930
private ChannelUtils() {
3031
}
3132

@@ -50,6 +51,20 @@ public static void removeIfExists(ChannelPipeline pipeline, Class<? extends Chan
5051
}
5152
}
5253

54+
public static void removeIfExists(ChannelPipeline pipeline, String... handlers) {
55+
for (String handler: handlers) {
56+
if (pipeline.get(handler) != null) {
57+
try {
58+
pipeline.remove(handler);
59+
} catch (NoSuchElementException exception) {
60+
// There could still be race condition when channel gets
61+
// closed right after removeIfExists is invoked. Ignoring
62+
// NoSuchElementException for that edge case.
63+
}
64+
}
65+
}
66+
}
67+
5368
/**
5469
* Retrieve optional attribute of the channel
5570
*
Lines changed: 198 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,198 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.http.nio.netty.fault;
17+
18+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
19+
import static software.amazon.awssdk.http.SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES;
20+
21+
import io.netty.bootstrap.ServerBootstrap;
22+
import io.netty.channel.Channel;
23+
import io.netty.channel.ChannelInitializer;
24+
import io.netty.channel.ChannelPipeline;
25+
import io.netty.channel.nio.NioEventLoopGroup;
26+
import io.netty.channel.socket.ServerSocketChannel;
27+
import io.netty.channel.socket.nio.NioServerSocketChannel;
28+
import io.netty.handler.ssl.SslContext;
29+
import io.netty.handler.ssl.SslContextBuilder;
30+
import io.netty.handler.ssl.util.SelfSignedCertificate;
31+
import java.io.IOException;
32+
import java.nio.ByteBuffer;
33+
import java.time.Duration;
34+
import java.util.Optional;
35+
import java.util.concurrent.CompletableFuture;
36+
import java.util.concurrent.ExecutionException;
37+
import java.util.concurrent.TimeUnit;
38+
import java.util.concurrent.TimeoutException;
39+
import org.junit.jupiter.api.AfterEach;
40+
import org.junit.jupiter.api.BeforeEach;
41+
import org.junit.jupiter.api.Test;
42+
import org.reactivestreams.Publisher;
43+
import org.reactivestreams.Subscriber;
44+
import org.reactivestreams.Subscription;
45+
import software.amazon.awssdk.http.Protocol;
46+
import software.amazon.awssdk.http.SdkHttpFullRequest;
47+
import software.amazon.awssdk.http.SdkHttpMethod;
48+
import software.amazon.awssdk.http.SdkHttpResponse;
49+
import software.amazon.awssdk.http.async.AsyncExecuteRequest;
50+
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
51+
import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler;
52+
import software.amazon.awssdk.http.async.SdkHttpContentPublisher;
53+
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
54+
import software.amazon.awssdk.http.nio.netty.SdkEventLoopGroup;
55+
import software.amazon.awssdk.utils.AttributeMap;
56+
57+
/**
58+
* Tests that the write idle timeout handler detects when no request body data is written
59+
* and proactively closes the connection.
60+
*/
61+
public class WriteIdleTimeoutTest {
62+
63+
private SdkAsyncHttpClient netty;
64+
private Server server;
65+
66+
@BeforeEach
67+
public void setup() throws Exception {
68+
server = new Server();
69+
server.init();
70+
71+
netty = NettyNioAsyncHttpClient.builder()
72+
.writeTimeout(Duration.ofMillis(500))
73+
.readTimeout(Duration.ofSeconds(5))
74+
.eventLoopGroup(SdkEventLoopGroup.builder().numberOfThreads(2).build())
75+
.protocol(Protocol.HTTP1_1)
76+
.buildWithDefaults(AttributeMap.builder()
77+
.put(TRUST_ALL_CERTIFICATES, true)
78+
.build());
79+
}
80+
81+
@AfterEach
82+
public void teardown() throws InterruptedException {
83+
if (server != null) {
84+
server.shutdown();
85+
}
86+
if (netty != null) {
87+
netty.close();
88+
}
89+
}
90+
91+
/**
92+
* A request body publisher that subscribes but never produces any data, simulating
93+
* the thread starvation scenario from the customer issue.
94+
*/
95+
@Test
96+
public void stalledBodyPublisher_shouldTriggerWriteIdleTimeout() throws InterruptedException, TimeoutException {
97+
SdkHttpFullRequest request = SdkHttpFullRequest.builder()
98+
.method(SdkHttpMethod.PUT)
99+
.protocol("https")
100+
.host("localhost")
101+
.port(server.port())
102+
.putHeader("Content-Length", "1024")
103+
.build();
104+
105+
CompletableFuture<Void> future = sendRequest(request, new NeverWritesContentPublisher(1024));
106+
107+
assertThatThrownBy(() -> future.get(5, TimeUnit.SECONDS))
108+
.hasCauseInstanceOf(IOException.class)
109+
.hasStackTraceContaining("No data was written to the request body");
110+
}
111+
112+
private CompletableFuture<Void> sendRequest(SdkHttpFullRequest request, SdkHttpContentPublisher contentPublisher) {
113+
return netty.execute(AsyncExecuteRequest.builder()
114+
.responseHandler(new SdkAsyncHttpResponseHandler() {
115+
@Override
116+
public void onHeaders(SdkHttpResponse headers) {
117+
}
118+
119+
@Override
120+
public void onStream(Publisher<ByteBuffer> stream) {
121+
}
122+
123+
@Override
124+
public void onError(Throwable error) {
125+
}
126+
})
127+
.request(request)
128+
.requestContentPublisher(contentPublisher)
129+
.build());
130+
}
131+
132+
/**
133+
* A content publisher that accepts a subscription but never calls onNext/onComplete,
134+
* simulating a stalled body write.
135+
*/
136+
private static class NeverWritesContentPublisher implements SdkHttpContentPublisher {
137+
private final long contentLength;
138+
139+
NeverWritesContentPublisher(long contentLength) {
140+
this.contentLength = contentLength;
141+
}
142+
143+
@Override
144+
public Optional<Long> contentLength() {
145+
return Optional.of(contentLength);
146+
}
147+
148+
@Override
149+
public void subscribe(Subscriber<? super ByteBuffer> s) {
150+
// Request subscription but never produce data
151+
s.onSubscribe(new Subscription() {
152+
@Override
153+
public void request(long n) {
154+
// intentionally do nothing
155+
}
156+
157+
@Override
158+
public void cancel() {
159+
}
160+
});
161+
}
162+
}
163+
164+
private static class Server extends ChannelInitializer<Channel> {
165+
private ServerBootstrap bootstrap;
166+
private ServerSocketChannel serverSock;
167+
private final NioEventLoopGroup group = new NioEventLoopGroup();
168+
private SslContext sslCtx;
169+
170+
void init() throws Exception {
171+
SelfSignedCertificate ssc = new SelfSignedCertificate();
172+
sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
173+
174+
bootstrap = new ServerBootstrap()
175+
.channel(NioServerSocketChannel.class)
176+
.group(group)
177+
.childHandler(this);
178+
179+
serverSock = (ServerSocketChannel) bootstrap.bind(0).sync().channel();
180+
}
181+
182+
@Override
183+
protected void initChannel(Channel ch) {
184+
ChannelPipeline pipeline = ch.pipeline();
185+
pipeline.addLast(sslCtx.newHandler(ch.alloc()));
186+
}
187+
188+
void shutdown() throws InterruptedException {
189+
group.shutdownGracefully().await();
190+
serverSock.close();
191+
}
192+
193+
int port() {
194+
return serverSock.localAddress().getPort();
195+
}
196+
197+
}
198+
}

0 commit comments

Comments
 (0)