Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"type": "bugfix",
"category": "Netty NIO HTTP Client",
"contributor": "",
"description": "Added idle body write detection to proactively close connections when no request body data is written within the write timeout period."
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package software.amazon.awssdk.http.nio.netty.internal;

import static software.amazon.awssdk.http.nio.netty.internal.utils.ChannelUtils.WRITE_IDLE_STATE_HANDLER_NAME;
import static software.amazon.awssdk.http.nio.netty.internal.utils.ChannelUtils.removeIfExists;

import io.netty.channel.Channel;
Expand Down Expand Up @@ -52,7 +53,9 @@ public void channelReleased(Channel channel) {
FlushOnReadHandler.class,
ResponseHandler.class,
ReadTimeoutHandler.class,
WriteTimeoutHandler.class);
WriteTimeoutHandler.class,
WriteIdleTimeoutHandler.class);
removeIfExists(channel.pipeline(), WRITE_IDLE_STATE_HANDLER_NAME);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.RESPONSE_DATA_READ;
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.STREAMING_COMPLETE_KEY;
import static software.amazon.awssdk.http.nio.netty.internal.NettyRequestMetrics.measureTimeTaken;
import static software.amazon.awssdk.http.nio.netty.internal.utils.ChannelUtils.WRITE_IDLE_STATE_HANDLER_NAME;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
Expand All @@ -39,6 +40,7 @@
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.handler.timeout.WriteTimeoutHandler;
import io.netty.util.Attribute;
Expand Down Expand Up @@ -238,14 +240,14 @@ private void makeRequest() {
}

private void writeRequest(HttpRequest request) {
channel.pipeline().addFirst(new WriteTimeoutHandler(context.configuration().writeTimeoutMillis(),
TimeUnit.MILLISECONDS));
addWriteTimeoutHandlers();
StreamedRequest streamedRequest = new StreamedRequest(request,
context.executeRequest().requestContentPublisher());

channel.writeAndFlush(streamedRequest)
.addListener(wireCall -> {
// Done writing so remove the idle write timeout handler
ChannelUtils.removeIfExists(channel.pipeline(), WriteTimeoutHandler.class);
removeWriteTimeoutHandlers();
if (wireCall.isSuccess()) {
NettyRequestMetrics.publishHttp2StreamMetrics(context.metricCollector(), channel);

Expand Down Expand Up @@ -281,6 +283,23 @@ private void writeRequest(HttpRequest request) {
}
}

private void removeWriteTimeoutHandlers() {
ChannelUtils.removeIfExists(channel.pipeline(), WriteTimeoutHandler.class,
WriteIdleTimeoutHandler.class);
ChannelUtils.removeIfExists(channel.pipeline(), WRITE_IDLE_STATE_HANDLER_NAME);
}

private void addWriteTimeoutHandlers() {
channel.pipeline().addFirst(new WriteTimeoutHandler(context.configuration().writeTimeoutMillis(),
TimeUnit.MILLISECONDS));
String httpStreamsName = channel.pipeline().context(HttpStreamsClientHandler.class).name();
Comment thread
zoewangg marked this conversation as resolved.
Outdated
channel.pipeline().addBefore(httpStreamsName, WRITE_IDLE_STATE_HANDLER_NAME,
new IdleStateHandler(0, context.configuration().writeTimeoutMillis(), 0,
TimeUnit.MILLISECONDS));
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have to give it a dedicated name because we use IdleStateHandler elsewhere and removing the handler based on class type would cause those handlers to be removed prematurely

channel.pipeline().addBefore(httpStreamsName, null,
new WriteIdleTimeoutHandler());
}

/**
* It should explicitly trigger Read for the following situations:
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.http.nio.netty.internal;

import static software.amazon.awssdk.http.nio.netty.internal.utils.NettyUtils.errorMessageWithChannelDiagnostics;

import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import java.io.IOException;
import software.amazon.awssdk.annotations.SdkInternalApi;

/**
* Handles writer idle events from IdleStateHandler to detect idle body write gaps.
*/
@SdkInternalApi
public final class WriteIdleTimeoutHandler extends ChannelDuplexHandler {
private boolean closed;

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent e = (IdleStateEvent) evt;
if (e.state() == IdleState.WRITER_IDLE) {
writeTimeout(ctx);
}
}
super.userEventTriggered(ctx, evt);
}

private void writeTimeout(ChannelHandlerContext ctx) throws Exception {
if (!closed) {
IOException exception = new IOException(
errorMessageWithChannelDiagnostics(ctx.channel(), "No data was written to the request body for the configured "
+ "write timeout duration. "
+ "This can occur if the request body publisher is slow to "
+ "produce data, "
+ "for example when using AsyncRequestBody.fromInputStream() "
+ "with an executor "
+ "that has fewer threads than concurrent requests. "
+ "If applicable, consider increasing the executor's thread "
+ "pool size or "
+ "investigating what is preventing the request body from "
+ "being written."));
Comment thread
zoewangg marked this conversation as resolved.
Outdated
ctx.fireExceptionCaught(exception);
ctx.close();
closed = true;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@

@SdkInternalApi
public final class ChannelUtils {
public static final String WRITE_IDLE_STATE_HANDLER_NAME = "WriteIdleStateHandler";

private ChannelUtils() {
}

Expand All @@ -50,6 +52,20 @@ public static void removeIfExists(ChannelPipeline pipeline, Class<? extends Chan
}
}

public static void removeIfExists(ChannelPipeline pipeline, String... handlers) {
for (String handler: handlers) {
if (pipeline.get(handler) != null) {
try {
pipeline.remove(handler);
} catch (NoSuchElementException exception) {
// There could still be race condition when channel gets
// closed right after removeIfExists is invoked. Ignoring
// NoSuchElementException for that edge case.
}
}
}
}

/**
* Retrieve optional attribute of the channel
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.http.nio.netty.fault;

import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static software.amazon.awssdk.http.SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.SelfSignedCertificate;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.http.Protocol;
import software.amazon.awssdk.http.SdkHttpFullRequest;
import software.amazon.awssdk.http.SdkHttpMethod;
import software.amazon.awssdk.http.SdkHttpResponse;
import software.amazon.awssdk.http.async.AsyncExecuteRequest;
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler;
import software.amazon.awssdk.http.async.SdkHttpContentPublisher;
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
import software.amazon.awssdk.http.nio.netty.SdkEventLoopGroup;
import software.amazon.awssdk.utils.AttributeMap;

/**
* Tests that the write idle timeout handler detects when no request body data is written
* and proactively closes the connection.
*/
public class WriteIdleTimeoutTest {

private SdkAsyncHttpClient netty;
private Server server;

@BeforeEach
public void setup() throws Exception {
server = new Server();
server.init();

netty = NettyNioAsyncHttpClient.builder()
.writeTimeout(Duration.ofMillis(500))
.readTimeout(Duration.ofSeconds(5))
.eventLoopGroup(SdkEventLoopGroup.builder().numberOfThreads(2).build())
.protocol(Protocol.HTTP1_1)
.buildWithDefaults(AttributeMap.builder()
.put(TRUST_ALL_CERTIFICATES, true)
.build());
}

@AfterEach
public void teardown() throws InterruptedException {
if (server != null) {
server.shutdown();
}
if (netty != null) {
netty.close();
}
}

/**
* A request body publisher that subscribes but never produces any data, simulating
* the thread starvation scenario from the customer issue.
*/
@Test
public void stalledBodyPublisher_shouldTriggerWriteIdleTimeout() throws InterruptedException, TimeoutException {
SdkHttpFullRequest request = SdkHttpFullRequest.builder()
.method(SdkHttpMethod.PUT)
.protocol("https")
.host("localhost")
.port(server.port())
.putHeader("Content-Length", "1024")
.build();

CompletableFuture<Void> future = sendRequest(request, new NeverWritesContentPublisher(1024));

assertThatThrownBy(() -> future.get(5, TimeUnit.SECONDS))
.hasCauseInstanceOf(IOException.class)
.hasStackTraceContaining("No data was written to the request body");
}

private CompletableFuture<Void> sendRequest(SdkHttpFullRequest request, SdkHttpContentPublisher contentPublisher) {
return netty.execute(AsyncExecuteRequest.builder()
.responseHandler(new SdkAsyncHttpResponseHandler() {
@Override
public void onHeaders(SdkHttpResponse headers) {
}

@Override
public void onStream(Publisher<ByteBuffer> stream) {
}

@Override
public void onError(Throwable error) {
}
})
.request(request)
.requestContentPublisher(contentPublisher)
.build());
}

/**
* A content publisher that accepts a subscription but never calls onNext/onComplete,
* simulating a stalled body write.
*/
private static class NeverWritesContentPublisher implements SdkHttpContentPublisher {
private final long contentLength;

NeverWritesContentPublisher(long contentLength) {
this.contentLength = contentLength;
}

@Override
public Optional<Long> contentLength() {
return Optional.of(contentLength);
}

@Override
public void subscribe(Subscriber<? super ByteBuffer> s) {
// Request subscription but never produce data
s.onSubscribe(new Subscription() {
@Override
public void request(long n) {
// intentionally do nothing
}

@Override
public void cancel() {
}
});
}
}

private static class Server extends ChannelInitializer<Channel> {
private ServerBootstrap bootstrap;
private ServerSocketChannel serverSock;
private final NioEventLoopGroup group = new NioEventLoopGroup();
private SslContext sslCtx;

void init() throws Exception {
SelfSignedCertificate ssc = new SelfSignedCertificate();
sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();

bootstrap = new ServerBootstrap()
.channel(NioServerSocketChannel.class)
.group(group)
.childHandler(this);

serverSock = (ServerSocketChannel) bootstrap.bind(0).sync().channel();
}

@Override
protected void initChannel(Channel ch) {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(sslCtx.newHandler(ch.alloc()));
}

void shutdown() throws InterruptedException {
group.shutdownGracefully().await();
serverSock.close();
}

int port() {
return serverSock.localAddress().getPort();
}

}
}
Loading
Loading