-
Notifications
You must be signed in to change notification settings - Fork 997
Add idle body write detection to Netty HTTP client #6844
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 1 commit
Commits
Show all changes
5 commits
Select commit
Hold shift + click to select a range
61a1147
Add idle body write detection to Netty HTTP client
zoewangg 9ba90bb
Address feedback
zoewangg 54b4b57
Address feedback
zoewangg 97ee3fc
Merge branch 'master' into zoewang/WriteIdleTimeoutHandler
zoewangg 0c1f3b3
Merge branch 'master' into zoewang/WriteIdleTimeoutHandler
zoewangg File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,6 @@ | ||
| { | ||
| "type": "bugfix", | ||
| "category": "Netty NIO HTTP Client", | ||
| "contributor": "", | ||
| "description": "Added idle body write detection to proactively close connections when no request body data is written within the write timeout period." | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -27,6 +27,7 @@ | |
| import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.RESPONSE_DATA_READ; | ||
| import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.STREAMING_COMPLETE_KEY; | ||
| import static software.amazon.awssdk.http.nio.netty.internal.NettyRequestMetrics.measureTimeTaken; | ||
| import static software.amazon.awssdk.http.nio.netty.internal.utils.ChannelUtils.WRITE_IDLE_STATE_HANDLER_NAME; | ||
|
|
||
| import io.netty.buffer.ByteBuf; | ||
| import io.netty.buffer.Unpooled; | ||
|
|
@@ -39,6 +40,7 @@ | |
| import io.netty.handler.codec.http.HttpMethod; | ||
| import io.netty.handler.codec.http.HttpRequest; | ||
| import io.netty.handler.codec.http.HttpVersion; | ||
| import io.netty.handler.timeout.IdleStateHandler; | ||
| import io.netty.handler.timeout.ReadTimeoutHandler; | ||
| import io.netty.handler.timeout.WriteTimeoutHandler; | ||
| import io.netty.util.Attribute; | ||
|
|
@@ -238,14 +240,14 @@ private void makeRequest() { | |
| } | ||
|
|
||
| private void writeRequest(HttpRequest request) { | ||
| channel.pipeline().addFirst(new WriteTimeoutHandler(context.configuration().writeTimeoutMillis(), | ||
| TimeUnit.MILLISECONDS)); | ||
| addWriteTimeoutHandlers(); | ||
| StreamedRequest streamedRequest = new StreamedRequest(request, | ||
| context.executeRequest().requestContentPublisher()); | ||
|
|
||
| channel.writeAndFlush(streamedRequest) | ||
| .addListener(wireCall -> { | ||
| // Done writing so remove the idle write timeout handler | ||
| ChannelUtils.removeIfExists(channel.pipeline(), WriteTimeoutHandler.class); | ||
| removeWriteTimeoutHandlers(); | ||
| if (wireCall.isSuccess()) { | ||
| NettyRequestMetrics.publishHttp2StreamMetrics(context.metricCollector(), channel); | ||
|
|
||
|
|
@@ -281,6 +283,23 @@ private void writeRequest(HttpRequest request) { | |
| } | ||
| } | ||
|
|
||
| private void removeWriteTimeoutHandlers() { | ||
| ChannelUtils.removeIfExists(channel.pipeline(), WriteTimeoutHandler.class, | ||
| WriteIdleTimeoutHandler.class); | ||
| ChannelUtils.removeIfExists(channel.pipeline(), WRITE_IDLE_STATE_HANDLER_NAME); | ||
| } | ||
|
|
||
| private void addWriteTimeoutHandlers() { | ||
| channel.pipeline().addFirst(new WriteTimeoutHandler(context.configuration().writeTimeoutMillis(), | ||
| TimeUnit.MILLISECONDS)); | ||
| String httpStreamsName = channel.pipeline().context(HttpStreamsClientHandler.class).name(); | ||
| channel.pipeline().addBefore(httpStreamsName, WRITE_IDLE_STATE_HANDLER_NAME, | ||
| new IdleStateHandler(0, context.configuration().writeTimeoutMillis(), 0, | ||
| TimeUnit.MILLISECONDS)); | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We have to give it a dedicated name because we use IdleStateHandler elsewhere and removing the handler based on class type would cause those handlers to be removed prematurely |
||
| channel.pipeline().addBefore(httpStreamsName, null, | ||
| new WriteIdleTimeoutHandler()); | ||
| } | ||
|
|
||
| /** | ||
| * It should explicitly trigger Read for the following situations: | ||
| * | ||
|
|
||
64 changes: 64 additions & 0 deletions
64
...src/main/java/software/amazon/awssdk/http/nio/netty/internal/WriteIdleTimeoutHandler.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,64 @@ | ||
| /* | ||
| * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"). | ||
| * You may not use this file except in compliance with the License. | ||
| * A copy of the License is located at | ||
| * | ||
| * http://aws.amazon.com/apache2.0 | ||
| * | ||
| * or in the "license" file accompanying this file. This file is distributed | ||
| * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either | ||
| * express or implied. See the License for the specific language governing | ||
| * permissions and limitations under the License. | ||
| */ | ||
|
|
||
| package software.amazon.awssdk.http.nio.netty.internal; | ||
|
|
||
| import static software.amazon.awssdk.http.nio.netty.internal.utils.NettyUtils.errorMessageWithChannelDiagnostics; | ||
|
|
||
| import io.netty.channel.ChannelDuplexHandler; | ||
| import io.netty.channel.ChannelHandlerContext; | ||
| import io.netty.handler.timeout.IdleState; | ||
| import io.netty.handler.timeout.IdleStateEvent; | ||
| import java.io.IOException; | ||
| import software.amazon.awssdk.annotations.SdkInternalApi; | ||
|
|
||
| /** | ||
| * Handles writer idle events from IdleStateHandler to detect idle body write gaps. | ||
| */ | ||
| @SdkInternalApi | ||
| public final class WriteIdleTimeoutHandler extends ChannelDuplexHandler { | ||
| private boolean closed; | ||
|
|
||
| @Override | ||
| public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { | ||
| if (evt instanceof IdleStateEvent) { | ||
| IdleStateEvent e = (IdleStateEvent) evt; | ||
| if (e.state() == IdleState.WRITER_IDLE) { | ||
| writeTimeout(ctx); | ||
| } | ||
| } | ||
| super.userEventTriggered(ctx, evt); | ||
| } | ||
|
|
||
| private void writeTimeout(ChannelHandlerContext ctx) throws Exception { | ||
| if (!closed) { | ||
| IOException exception = new IOException( | ||
| errorMessageWithChannelDiagnostics(ctx.channel(), "No data was written to the request body for the configured " | ||
| + "write timeout duration. " | ||
| + "This can occur if the request body publisher is slow to " | ||
| + "produce data, " | ||
| + "for example when using AsyncRequestBody.fromInputStream() " | ||
| + "with an executor " | ||
| + "that has fewer threads than concurrent requests. " | ||
| + "If applicable, consider increasing the executor's thread " | ||
| + "pool size or " | ||
| + "investigating what is preventing the request body from " | ||
| + "being written.")); | ||
|
zoewangg marked this conversation as resolved.
Outdated
|
||
| ctx.fireExceptionCaught(exception); | ||
| ctx.close(); | ||
| closed = true; | ||
| } | ||
| } | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
198 changes: 198 additions & 0 deletions
198
...lient/src/test/java/software/amazon/awssdk/http/nio/netty/fault/WriteIdleTimeoutTest.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,198 @@ | ||
| /* | ||
| * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"). | ||
| * You may not use this file except in compliance with the License. | ||
| * A copy of the License is located at | ||
| * | ||
| * http://aws.amazon.com/apache2.0 | ||
| * | ||
| * or in the "license" file accompanying this file. This file is distributed | ||
| * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either | ||
| * express or implied. See the License for the specific language governing | ||
| * permissions and limitations under the License. | ||
| */ | ||
|
|
||
| package software.amazon.awssdk.http.nio.netty.fault; | ||
|
|
||
| import static org.assertj.core.api.Assertions.assertThatThrownBy; | ||
| import static software.amazon.awssdk.http.SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES; | ||
|
|
||
| import io.netty.bootstrap.ServerBootstrap; | ||
| import io.netty.channel.Channel; | ||
| import io.netty.channel.ChannelInitializer; | ||
| import io.netty.channel.ChannelPipeline; | ||
| import io.netty.channel.nio.NioEventLoopGroup; | ||
| import io.netty.channel.socket.ServerSocketChannel; | ||
| import io.netty.channel.socket.nio.NioServerSocketChannel; | ||
| import io.netty.handler.ssl.SslContext; | ||
| import io.netty.handler.ssl.SslContextBuilder; | ||
| import io.netty.handler.ssl.util.SelfSignedCertificate; | ||
| import java.io.IOException; | ||
| import java.nio.ByteBuffer; | ||
| import java.time.Duration; | ||
| import java.util.Optional; | ||
| import java.util.concurrent.CompletableFuture; | ||
| import java.util.concurrent.ExecutionException; | ||
| import java.util.concurrent.TimeUnit; | ||
| import java.util.concurrent.TimeoutException; | ||
| import org.junit.jupiter.api.AfterEach; | ||
| import org.junit.jupiter.api.BeforeEach; | ||
| import org.junit.jupiter.api.Test; | ||
| import org.reactivestreams.Publisher; | ||
| import org.reactivestreams.Subscriber; | ||
| import org.reactivestreams.Subscription; | ||
| import software.amazon.awssdk.http.Protocol; | ||
| import software.amazon.awssdk.http.SdkHttpFullRequest; | ||
| import software.amazon.awssdk.http.SdkHttpMethod; | ||
| import software.amazon.awssdk.http.SdkHttpResponse; | ||
| import software.amazon.awssdk.http.async.AsyncExecuteRequest; | ||
| import software.amazon.awssdk.http.async.SdkAsyncHttpClient; | ||
| import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler; | ||
| import software.amazon.awssdk.http.async.SdkHttpContentPublisher; | ||
| import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient; | ||
| import software.amazon.awssdk.http.nio.netty.SdkEventLoopGroup; | ||
| import software.amazon.awssdk.utils.AttributeMap; | ||
|
|
||
| /** | ||
| * Tests that the write idle timeout handler detects when no request body data is written | ||
| * and proactively closes the connection. | ||
| */ | ||
| public class WriteIdleTimeoutTest { | ||
|
|
||
| private SdkAsyncHttpClient netty; | ||
| private Server server; | ||
|
|
||
| @BeforeEach | ||
| public void setup() throws Exception { | ||
| server = new Server(); | ||
| server.init(); | ||
|
|
||
| netty = NettyNioAsyncHttpClient.builder() | ||
| .writeTimeout(Duration.ofMillis(500)) | ||
| .readTimeout(Duration.ofSeconds(5)) | ||
| .eventLoopGroup(SdkEventLoopGroup.builder().numberOfThreads(2).build()) | ||
| .protocol(Protocol.HTTP1_1) | ||
| .buildWithDefaults(AttributeMap.builder() | ||
| .put(TRUST_ALL_CERTIFICATES, true) | ||
| .build()); | ||
| } | ||
|
|
||
| @AfterEach | ||
| public void teardown() throws InterruptedException { | ||
| if (server != null) { | ||
| server.shutdown(); | ||
| } | ||
| if (netty != null) { | ||
| netty.close(); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * A request body publisher that subscribes but never produces any data, simulating | ||
| * the thread starvation scenario from the customer issue. | ||
| */ | ||
| @Test | ||
| public void stalledBodyPublisher_shouldTriggerWriteIdleTimeout() throws InterruptedException, TimeoutException { | ||
| SdkHttpFullRequest request = SdkHttpFullRequest.builder() | ||
| .method(SdkHttpMethod.PUT) | ||
| .protocol("https") | ||
| .host("localhost") | ||
| .port(server.port()) | ||
| .putHeader("Content-Length", "1024") | ||
| .build(); | ||
|
|
||
| CompletableFuture<Void> future = sendRequest(request, new NeverWritesContentPublisher(1024)); | ||
|
|
||
| assertThatThrownBy(() -> future.get(5, TimeUnit.SECONDS)) | ||
| .hasCauseInstanceOf(IOException.class) | ||
| .hasStackTraceContaining("No data was written to the request body"); | ||
| } | ||
|
|
||
| private CompletableFuture<Void> sendRequest(SdkHttpFullRequest request, SdkHttpContentPublisher contentPublisher) { | ||
| return netty.execute(AsyncExecuteRequest.builder() | ||
| .responseHandler(new SdkAsyncHttpResponseHandler() { | ||
| @Override | ||
| public void onHeaders(SdkHttpResponse headers) { | ||
| } | ||
|
|
||
| @Override | ||
| public void onStream(Publisher<ByteBuffer> stream) { | ||
| } | ||
|
|
||
| @Override | ||
| public void onError(Throwable error) { | ||
| } | ||
| }) | ||
| .request(request) | ||
| .requestContentPublisher(contentPublisher) | ||
| .build()); | ||
| } | ||
|
|
||
| /** | ||
| * A content publisher that accepts a subscription but never calls onNext/onComplete, | ||
| * simulating a stalled body write. | ||
| */ | ||
| private static class NeverWritesContentPublisher implements SdkHttpContentPublisher { | ||
| private final long contentLength; | ||
|
|
||
| NeverWritesContentPublisher(long contentLength) { | ||
| this.contentLength = contentLength; | ||
| } | ||
|
|
||
| @Override | ||
| public Optional<Long> contentLength() { | ||
| return Optional.of(contentLength); | ||
| } | ||
|
|
||
| @Override | ||
| public void subscribe(Subscriber<? super ByteBuffer> s) { | ||
| // Request subscription but never produce data | ||
| s.onSubscribe(new Subscription() { | ||
| @Override | ||
| public void request(long n) { | ||
| // intentionally do nothing | ||
| } | ||
|
|
||
| @Override | ||
| public void cancel() { | ||
| } | ||
| }); | ||
| } | ||
| } | ||
|
|
||
| private static class Server extends ChannelInitializer<Channel> { | ||
| private ServerBootstrap bootstrap; | ||
| private ServerSocketChannel serverSock; | ||
| private final NioEventLoopGroup group = new NioEventLoopGroup(); | ||
| private SslContext sslCtx; | ||
|
|
||
| void init() throws Exception { | ||
| SelfSignedCertificate ssc = new SelfSignedCertificate(); | ||
| sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build(); | ||
|
|
||
| bootstrap = new ServerBootstrap() | ||
| .channel(NioServerSocketChannel.class) | ||
| .group(group) | ||
| .childHandler(this); | ||
|
|
||
| serverSock = (ServerSocketChannel) bootstrap.bind(0).sync().channel(); | ||
| } | ||
|
|
||
| @Override | ||
| protected void initChannel(Channel ch) { | ||
| ChannelPipeline pipeline = ch.pipeline(); | ||
| pipeline.addLast(sslCtx.newHandler(ch.alloc())); | ||
| } | ||
|
|
||
| void shutdown() throws InterruptedException { | ||
| group.shutdownGracefully().await(); | ||
| serverSock.close(); | ||
| } | ||
|
|
||
| int port() { | ||
| return serverSock.localAddress().getPort(); | ||
| } | ||
|
|
||
| } | ||
| } |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.